Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
345 changes: 331 additions & 14 deletions crates/cloud-sdk/src/sandbox_images.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,7 @@ where
.await?;
let sandbox_id = created.sandbox_id.clone();
let routing_hint = created.routing_hint.clone();
let cleanup = BuilderSandboxCleanup::arm(sandboxes.clone(), sandbox_id.clone());

let result = async {
let running_info = wait_for_sandbox_status(
Expand Down Expand Up @@ -381,9 +382,9 @@ where
// sandbox visibly alive while that step is in flight so the Platform
// doesn't suspend it out from under us. Aborted as soon as the
// builder returns, regardless of outcome.
let keepalive_task = spawn_builder_keepalive(proxy.clone());
let keepalive = AbortOnDropJoinHandle(spawn_builder_keepalive(proxy.clone()));
let builder_result = run_rootfs_builder(&proxy, &prepared.builder.command, &mut emit).await;
keepalive_task.abort();
drop(keepalive);
builder_result?;

let metadata = read_build_metadata(&proxy).await?;
Expand All @@ -408,7 +409,7 @@ where
}
.await;

if let Err(error) = sandboxes.delete(&sandbox_id).await {
if let Err(error) = cleanup.reclaim().await {
emit(SandboxImageBuildEvent::Warning(format!(
"Failed to terminate rootfs builder sandbox {} during cleanup: {}",
sandbox_id, error
Expand All @@ -418,6 +419,62 @@ where
result
}

/// Guard that makes sure a rootfs builder sandbox is deleted.
///
/// The delete is issued either explicitly through `reclaim`, or from the
/// `Drop` impl if the guard goes away while it is still armed.
struct BuilderSandboxCleanup {
/// Runtime handle captured when the guard is armed; the delete task is
/// spawned onto it.
handle: tokio::runtime::Handle,
/// Client used to send the delete request.
sandboxes: SandboxesClient,
/// Sandbox still awaiting deletion; becomes `None` once a delete task has
/// been spawned, so the delete is issued at most once.
sandbox_id: Option<String>,
}

impl BuilderSandboxCleanup {
    /// Arms a cleanup guard for `sandbox_id`.
    ///
    /// The current Tokio runtime handle is captured here so the delete can be
    /// spawned later, including from `Drop`.
    fn arm(sandboxes: SandboxesClient, sandbox_id: String) -> Self {
        let handle = tokio::runtime::Handle::current();
        Self {
            handle,
            sandboxes,
            sandbox_id: Some(sandbox_id),
        }
    }

    /// Spawns the delete request as its own task on the captured runtime.
    ///
    /// Returns `None` when a delete was already spawned (the sandbox id has
    /// been taken). A `404 Not Found` from the server is reported as success;
    /// every other error is forwarded.
    fn spawn_delete(&mut self) -> Option<tokio::task::JoinHandle<Result<()>>> {
        // Taking the id disarms the guard, so later calls become no-ops.
        let sandbox_id = self.sandbox_id.take()?;
        let client = self.sandboxes.clone();
        let task = self.handle.spawn(async move {
            match client.delete(&sandbox_id).await {
                Err(SdkError::ServerError {
                    status: StatusCode::NOT_FOUND,
                    ..
                }) => Ok(()),
                Err(other) => Err(other.into()),
                Ok(_) => Ok(()),
            }
        });
        Some(task)
    }

    /// Deletes the sandbox and waits for the outcome.
    ///
    /// The delete runs in a separately spawned task, so dropping this future
    /// part-way through does not cancel a delete that was already started.
    async fn reclaim(mut self) -> Result<()> {
        let Some(task) = self.spawn_delete() else {
            return Ok(());
        };
        // The first `?` surfaces a join failure of the spawned task; the
        // remaining value is the delete's own result.
        task.await.map_err(|error| {
            SandboxImageBuildError::other(format!(
                "builder sandbox cleanup task failed: {error}"
            ))
        })?
    }
}

impl Drop for BuilderSandboxCleanup {
    /// Fires the delete if the guard is dropped while still armed.
    fn drop(&mut self) {
        // The join handle is discarded on purpose: nobody can await it from
        // `drop`, and the spawned task keeps running on its own.
        drop(self.spawn_delete());
    }
}

/// Wrapper around a task handle that aborts the task when the wrapper is
/// dropped.
struct AbortOnDropJoinHandle(tokio::task::JoinHandle<()>);

impl Drop for AbortOnDropJoinHandle {
    /// Requests cancellation of the wrapped task.
    fn drop(&mut self) {
        let Self(task) = self;
        task.abort();
    }
}

async fn resolve_build_context(options: SandboxImageBuildOptions) -> Result<ResolvedBuildContext> {
let client = unscoped_client(&options)?;
let (organization_id, project_id) = if options.use_scope_headers {
Expand Down Expand Up @@ -950,10 +1007,7 @@ fn docker_config_json_from_credentials(
}

/// Spawn a background task that pings the builder sandbox at a fixed cadence
/// to keep it from being suspended due to inactivity / lifetime expiry. The
/// caller MUST `.abort()` the returned handle when the build phase finishes
/// (success or failure) — otherwise the task would outlive the build and keep
/// poking a sandbox we're about to delete.
/// to keep it from being suspended due to inactivity / lifetime expiry.
fn spawn_builder_keepalive(proxy: SandboxProxyClient) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(BUILDER_SANDBOX_KEEPALIVE_INTERVAL);
Expand Down Expand Up @@ -1830,15 +1884,278 @@ fn is_dockerignored(
#[cfg(test)]
mod tests {
use super::{
CompleteSandboxTemplateBuildRequest, PreparedRootfsBuilder, PreparedRootfsParent,
PreparedSandboxTemplateBuild, build_rootfs_spec, collect_dir_files,
complete_request_from_metadata, default_registered_name, load_dockerfile_plan,
load_dockerfile_text_plan, logical_dockerfile_lines, normalize_posix, rootfs_builder_env,
rootfs_builder_executable, rootfs_disk_bytes, rootfs_disk_bytes_to_mb,
streaming_process_payload,
AbortOnDropJoinHandle, BuilderSandboxCleanup, CompleteSandboxTemplateBuildRequest,
PreparedRootfsBuilder, PreparedRootfsParent, PreparedSandboxTemplateBuild,
build_rootfs_spec, collect_dir_files, complete_request_from_metadata,
default_registered_name, load_dockerfile_plan, load_dockerfile_text_plan,
logical_dockerfile_lines, normalize_posix, rootfs_builder_env, rootfs_builder_executable,
rootfs_disk_bytes, rootfs_disk_bytes_to_mb, streaming_process_payload,
};
use crate::{ClientBuilder, sandboxes::SandboxesClient};
use reqwest::StatusCode;
use serde_json::{Value, json};
use std::io::Write;
use std::{
future,
io::Write,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
time::Duration,
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
task::JoinHandle,
};

/// Upper bound on how long a test waits for the mock server task to finish.
const DELETE_TIMEOUT: Duration = Duration::from_secs(1);
/// How long the mock server keeps listening for an unexpected second delete.
const SECOND_DELETE_TIMEOUT: Duration = Duration::from_millis(200);

/// `reclaim` succeeds when the server answers the delete with 204, and the
/// server sees exactly one delete for the armed sandbox.
#[tokio::test]
async fn ok_delete_success() {
    let (client, server) = delete_test_client(StatusCode::NO_CONTENT, false).await;
    let guard = BuilderSandboxCleanup::arm(client, String::from("sandbox-1"));

    guard.reclaim().await.expect("reclaim succeeds");

    let seen = await_delete_requests(server).await;
    assert_single_delete(&seen, "sandbox-1");
}

/// Dropping an armed guard without calling `reclaim` still sends the delete.
#[tokio::test]
async fn drop_spawns_delete() {
    let (client, server) = delete_test_client(StatusCode::NO_CONTENT, false).await;
    let guard = BuilderSandboxCleanup::arm(client, String::from("sandbox-1"));

    drop(guard);

    let seen = await_delete_requests(server).await;
    assert_single_delete(&seen, "sandbox-1");
}

/// After `reclaim`, the guard's own drop must not send a second delete: the
/// server keeps listening briefly and should still record only one request.
#[tokio::test]
async fn reclaim_deletes_once() {
    let (client, server) = delete_test_client(StatusCode::NO_CONTENT, true).await;
    let guard = BuilderSandboxCleanup::arm(client, String::from("sandbox-1"));

    guard.reclaim().await.expect("reclaim succeeds");

    let seen = await_delete_requests(server).await;
    assert_single_delete(&seen, "sandbox-1");
}

/// Aborting the task that awaits `reclaim` must not cancel the in-flight
/// delete: the server still gets to answer the request.
#[tokio::test]
async fn reclaim_cancellation_completes_delete() {
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("bind test listener");
    let address = listener.local_addr().expect("listener address");

    let delete_started = Arc::new(AtomicBool::new(false));
    let delete_completed = Arc::new(AtomicBool::new(false));

    let server = tokio::spawn({
        let started = Arc::clone(&delete_started);
        let completed = Arc::clone(&delete_completed);
        async move {
            let (mut socket, _) = listener.accept().await.expect("accept delete");
            let request = read_http_request(&mut socket).await;
            started.store(true, Ordering::SeqCst);
            // Hold the response back so the abort below lands mid-request.
            tokio::time::sleep(Duration::from_millis(100)).await;
            write_delete_response(&mut socket, StatusCode::NO_CONTENT).await;
            completed.store(true, Ordering::SeqCst);
            request
        }
    });

    let client = test_sandboxes_client(address);
    let guard = BuilderSandboxCleanup::arm(client, String::from("sandbox-1"));

    let reclaim_task = tokio::spawn(guard.reclaim());
    wait_for_flag(&delete_started).await;
    reclaim_task.abort();
    let _ = reclaim_task.await;

    let received = tokio::time::timeout(DELETE_TIMEOUT, server)
        .await
        .expect("delete should complete after reclaim cancellation")
        .expect("server join");
    assert!(delete_completed.load(Ordering::SeqCst));
    assert_delete_request(&received, "sandbox-1");
}

/// A 404 from the delete endpoint counts as a successful cleanup.
#[tokio::test]
async fn delete_404_is_success() {
    let (client, server) = delete_test_client(StatusCode::NOT_FOUND, false).await;
    let guard = BuilderSandboxCleanup::arm(client, String::from("sandbox-1"));

    let outcome = guard.reclaim().await;
    outcome.expect("404 is a successful cleanup");

    let seen = await_delete_requests(server).await;
    assert_single_delete(&seen, "sandbox-1");
}

/// Errors other than 404 (here a 500) are returned from `reclaim`.
#[tokio::test]
async fn non_404_error_propagates() {
    let (client, server) = delete_test_client(StatusCode::INTERNAL_SERVER_ERROR, false).await;
    let guard = BuilderSandboxCleanup::arm(client, String::from("sandbox-1"));

    let error = guard.reclaim().await.expect_err("500 should propagate");

    let rendered = error.to_string();
    assert!(rendered.contains("500"), "{error}");
    let seen = await_delete_requests(server).await;
    assert_single_delete(&seen, "sandbox-1");
}

/// Dropping an `AbortOnDropJoinHandle` cancels the wrapped task, which is
/// observed through a drop guard living inside that task.
#[tokio::test]
async fn keepalive_aborts_on_drop() {
    let started = Arc::new(AtomicBool::new(false));
    let dropped = Arc::new(AtomicBool::new(false));

    let task = tokio::spawn({
        let started = Arc::clone(&started);
        let dropped = Arc::clone(&dropped);
        async move {
            // Flips `dropped` when the task's future is torn down.
            let _guard = DropFlag(dropped);
            started.store(true, Ordering::SeqCst);
            future::pending::<()>().await;
        }
    });
    let keepalive = AbortOnDropJoinHandle(task);

    wait_for_flag(&started).await;
    drop(keepalive);

    wait_for_flag(&dropped).await;
}

/// Test helper that sets the shared flag to `true` when it is dropped.
struct DropFlag(Arc<AtomicBool>);

impl Drop for DropFlag {
    fn drop(&mut self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }
}

/// Starts a one-shot mock server on a loopback port and returns a client
/// pointed at it together with the server task.
///
/// The server answers the first request with `status`. When
/// `wait_for_second` is set it keeps accepting for `SECOND_DELETE_TIMEOUT`
/// and answers one more request if one arrives. The task resolves to the raw
/// bytes of every request it read.
async fn delete_test_client(
    status: StatusCode,
    wait_for_second: bool,
) -> (SandboxesClient, JoinHandle<Vec<Vec<u8>>>) {
    let listener = TcpListener::bind("127.0.0.1:0")
        .await
        .expect("bind test listener");
    let address = listener.local_addr().expect("listener address");

    let server = tokio::spawn(async move {
        let (mut first, _) = listener.accept().await.expect("accept delete");
        let mut captured = vec![read_http_request(&mut first).await];
        write_delete_response(&mut first, status).await;

        if wait_for_second
            && let Ok(Ok((mut second, _))) =
                tokio::time::timeout(SECOND_DELETE_TIMEOUT, listener.accept()).await
        {
            captured.push(read_http_request(&mut second).await);
            write_delete_response(&mut second, status).await;
        }

        captured
    });

    let client = test_sandboxes_client(address);
    (client, server)
}

/// Builds a `SandboxesClient` that talks plain HTTP to `address`.
fn test_sandboxes_client(address: std::net::SocketAddr) -> SandboxesClient {
    let base_url = format!("http://{address}");
    let http = ClientBuilder::new(&base_url)
        .build()
        .expect("build client");
    SandboxesClient::new(http, "default", false)
}

/// Waits up to `DELETE_TIMEOUT` for the mock server task and returns the
/// requests it captured; panics on timeout or if the task failed.
async fn await_delete_requests(server: JoinHandle<Vec<Vec<u8>>>) -> Vec<Vec<u8>> {
    let joined = tokio::time::timeout(DELETE_TIMEOUT, server)
        .await
        .expect("delete request timed out");
    joined.expect("server join")
}

/// Asserts that exactly one request was captured and that it is a delete for
/// `sandbox_id`.
fn assert_single_delete(requests: &[Vec<u8>], sandbox_id: &str) {
    let count = requests.len();
    assert_eq!(
        count, 1,
        "expected exactly one delete request, got {}",
        count
    );
    let only = &requests[0];
    assert_delete_request(only, sandbox_id);
}

/// Asserts that the raw request bytes begin with the HTTP/1.1 request line
/// `DELETE /sandboxes/{sandbox_id}`.
fn assert_delete_request(request: &[u8], sandbox_id: &str) {
    let expected = format!("DELETE /sandboxes/{sandbox_id} HTTP/1.1\r\n");
    // Lossy decoding keeps the failure message printable for non-UTF-8 input.
    let request_text = String::from_utf8_lossy(request);
    let has_request_line = request_text.starts_with(expected.as_str());
    assert!(
        has_request_line,
        "expected request to start with {expected:?}, got {request_text:?}"
    );
}

/// Reads one HTTP request from `socket` and returns its raw bytes.
///
/// Reading stops once the header terminator has been seen and at least
/// `Content-Length` body bytes (0 when the header is absent or unparsable)
/// have arrived, or when the peer closes the connection.
async fn read_http_request(socket: &mut TcpStream) -> Vec<u8> {
    const HEADER_TERMINATOR: &[u8] = b"\r\n\r\n";

    let mut request = Vec::new();
    let mut chunk = [0_u8; 4096];

    loop {
        let filled = socket.read(&mut chunk).await.expect("read request");
        if filled == 0 {
            // Peer closed the connection; return whatever was received.
            break;
        }
        request.extend_from_slice(&chunk[..filled]);

        // Keep reading until the full header section is available.
        let Some(headers_end) = request
            .windows(HEADER_TERMINATOR.len())
            .position(|window| window == HEADER_TERMINATOR)
        else {
            continue;
        };
        let body_start = headers_end + HEADER_TERMINATOR.len();

        let headers = String::from_utf8_lossy(&request[..body_start]);
        let content_length = headers
            .lines()
            .filter_map(|line| line.split_once(':'))
            .filter(|(name, _)| name.eq_ignore_ascii_case("content-length"))
            .find_map(|(_, value)| value.trim().parse::<usize>().ok())
            .unwrap_or(0);

        if request.len() >= body_start + content_length {
            break;
        }
    }

    request
}

async fn write_delete_response(socket: &mut TcpStream, status: StatusCode) {
let body = if status.is_success() {
""
} else {
"delete failed"
};
let response = format!(
"HTTP/1.1 {} {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
status.as_u16(),
status.canonical_reason().unwrap_or("Unknown"),
body.len(),
body
);
socket
.write_all(response.as_bytes())
.await
.expect("write response");
}

/// Polls `flag` every 10 ms, returning as soon as it is set; panics after
/// 100 unsuccessful polls.
async fn wait_for_flag(flag: &AtomicBool) {
    let mut polls_left = 100;
    while polls_left > 0 {
        if flag.load(Ordering::SeqCst) {
            return;
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
        polls_left -= 1;
    }
    panic!("timed out waiting for flag");
}

#[test]
fn default_registered_name_uses_parent_for_dockerfile() {
Expand Down
Loading