From a3d0d7ed32d061e0283d835dbb1da1a792d324ff Mon Sep 17 00:00:00 2001 From: Jarek Reynolds Date: Sat, 6 Jun 2026 16:19:01 -0600 Subject: [PATCH] fix(cloud-sdk): reclaim builder sandbox on cancellation --- crates/cloud-sdk/src/sandbox_images.rs | 345 ++++++++++++++++++++++++- 1 file changed, 331 insertions(+), 14 deletions(-) diff --git a/crates/cloud-sdk/src/sandbox_images.rs b/crates/cloud-sdk/src/sandbox_images.rs index a7666cd4..02c15668 100644 --- a/crates/cloud-sdk/src/sandbox_images.rs +++ b/crates/cloud-sdk/src/sandbox_images.rs @@ -337,6 +337,7 @@ where .await?; let sandbox_id = created.sandbox_id.clone(); let routing_hint = created.routing_hint.clone(); + let cleanup = BuilderSandboxCleanup::arm(sandboxes.clone(), sandbox_id.clone()); let result = async { let running_info = wait_for_sandbox_status( @@ -381,9 +382,9 @@ where // sandbox visibly alive while that step is in flight so the Platform // doesn't suspend it out from under us. Aborted as soon as the // builder returns, regardless of outcome. - let keepalive_task = spawn_builder_keepalive(proxy.clone()); + let keepalive = AbortOnDropJoinHandle(spawn_builder_keepalive(proxy.clone())); let builder_result = run_rootfs_builder(&proxy, &prepared.builder.command, &mut emit).await; - keepalive_task.abort(); + drop(keepalive); builder_result?; let metadata = read_build_metadata(&proxy).await?; @@ -408,7 +409,7 @@ where } .await; - if let Err(error) = sandboxes.delete(&sandbox_id).await { + if let Err(error) = cleanup.reclaim().await { emit(SandboxImageBuildEvent::Warning(format!( "Failed to terminate rootfs builder sandbox {} during cleanup: {}", sandbox_id, error @@ -418,6 +419,62 @@ where result } +struct BuilderSandboxCleanup { + handle: tokio::runtime::Handle, + sandboxes: SandboxesClient, + sandbox_id: Option, +} + +impl BuilderSandboxCleanup { + fn arm(sandboxes: SandboxesClient, sandbox_id: String) -> Self { + Self { + handle: tokio::runtime::Handle::current(), + sandboxes, + sandbox_id: Some(sandbox_id), + } + } + + fn spawn_delete(&mut self) -> Option>> { + let sandbox_id = self.sandbox_id.take()?; + let sandboxes = self.sandboxes.clone(); + Some(self.handle.spawn(async move { + match sandboxes.delete(&sandbox_id).await { + Ok(_) => Ok(()), + Err(SdkError::ServerError { + status: StatusCode::NOT_FOUND, + .. + }) => Ok(()), + Err(error) => Err(error.into()), + } + })) + } + + async fn reclaim(mut self) -> Result<()> { + if let Some(task) = self.spawn_delete() { + task.await.map_err(|error| { + SandboxImageBuildError::other(format!( + "builder sandbox cleanup task failed: {error}" + )) + })??; + } + Ok(()) + } +} + +impl Drop for BuilderSandboxCleanup { + fn drop(&mut self) { + self.spawn_delete(); + } +} + +struct AbortOnDropJoinHandle(tokio::task::JoinHandle<()>); + +impl Drop for AbortOnDropJoinHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + async fn resolve_build_context(options: SandboxImageBuildOptions) -> Result { let client = unscoped_client(&options)?; let (organization_id, project_id) = if options.use_scope_headers { @@ -950,10 +1007,7 @@ fn docker_config_json_from_credentials( } /// Spawn a background task that pings the builder sandbox at a fixed cadence -/// to keep it from being suspended due to inactivity / lifetime expiry. The -/// caller MUST `.abort()` the returned handle when the build phase finishes -/// (success or failure) — otherwise the task would outlive the build and keep -/// poking a sandbox we're about to delete. +/// to keep it from being suspended due to inactivity / lifetime expiry. fn spawn_builder_keepalive(proxy: SandboxProxyClient) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { let mut interval = tokio::time::interval(BUILDER_SANDBOX_KEEPALIVE_INTERVAL); @@ -1830,15 +1884,278 @@ fn is_dockerignored( #[cfg(test)] mod tests { use super::{ - CompleteSandboxTemplateBuildRequest, PreparedRootfsBuilder, PreparedRootfsParent, - PreparedSandboxTemplateBuild, build_rootfs_spec, collect_dir_files, - complete_request_from_metadata, default_registered_name, load_dockerfile_plan, - load_dockerfile_text_plan, logical_dockerfile_lines, normalize_posix, rootfs_builder_env, - rootfs_builder_executable, rootfs_disk_bytes, rootfs_disk_bytes_to_mb, - streaming_process_payload, + AbortOnDropJoinHandle, BuilderSandboxCleanup, CompleteSandboxTemplateBuildRequest, + PreparedRootfsBuilder, PreparedRootfsParent, PreparedSandboxTemplateBuild, + build_rootfs_spec, collect_dir_files, complete_request_from_metadata, + default_registered_name, load_dockerfile_plan, load_dockerfile_text_plan, + logical_dockerfile_lines, normalize_posix, rootfs_builder_env, rootfs_builder_executable, + rootfs_disk_bytes, rootfs_disk_bytes_to_mb, streaming_process_payload, }; + use crate::{ClientBuilder, sandboxes::SandboxesClient}; + use reqwest::StatusCode; use serde_json::{Value, json}; - use std::io::Write; + use std::{ + future, + io::Write, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, + time::Duration, + }; + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + task::JoinHandle, + }; + + const DELETE_TIMEOUT: Duration = Duration::from_secs(1); + const SECOND_DELETE_TIMEOUT: Duration = Duration::from_millis(200); + + #[tokio::test] + async fn ok_delete_success() { + let (sandboxes, server) = delete_test_client(StatusCode::NO_CONTENT, false).await; + let cleanup = BuilderSandboxCleanup::arm(sandboxes, "sandbox-1".to_string()); + + cleanup.reclaim().await.expect("reclaim succeeds"); + + let requests = await_delete_requests(server).await; + assert_single_delete(&requests, "sandbox-1"); + } + + #[tokio::test] + async fn drop_spawns_delete() { + let (sandboxes, server) = delete_test_client(StatusCode::NO_CONTENT, false).await; + let cleanup = BuilderSandboxCleanup::arm(sandboxes, "sandbox-1".to_string()); + + drop(cleanup); + + let requests = await_delete_requests(server).await; + assert_single_delete(&requests, "sandbox-1"); + } + + #[tokio::test] + async fn reclaim_deletes_once() { + let (sandboxes, server) = delete_test_client(StatusCode::NO_CONTENT, true).await; + let cleanup = BuilderSandboxCleanup::arm(sandboxes, "sandbox-1".to_string()); + + cleanup.reclaim().await.expect("reclaim succeeds"); + + let requests = await_delete_requests(server).await; + assert_single_delete(&requests, "sandbox-1"); + } + + #[tokio::test] + async fn reclaim_cancellation_completes_delete() { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind test listener"); + let address = listener.local_addr().expect("listener address"); + let delete_started = Arc::new(AtomicBool::new(false)); + let delete_completed = Arc::new(AtomicBool::new(false)); + let server_started = Arc::clone(&delete_started); + let server_completed = Arc::clone(&delete_completed); + let server = tokio::spawn(async move { + let (mut socket, _) = listener.accept().await.expect("accept delete"); + let request = read_http_request(&mut socket).await; + server_started.store(true, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(100)).await; + write_delete_response(&mut socket, StatusCode::NO_CONTENT).await; + server_completed.store(true, Ordering::SeqCst); + request + }); + let sandboxes = test_sandboxes_client(address); + let cleanup = BuilderSandboxCleanup::arm(sandboxes, "sandbox-1".to_string()); + + let reclaim = tokio::spawn(cleanup.reclaim()); + wait_for_flag(&delete_started).await; + reclaim.abort(); + let _ = reclaim.await; + + let request = tokio::time::timeout(DELETE_TIMEOUT, server) + .await + .expect("delete should complete after reclaim cancellation") + .expect("server join"); + assert!(delete_completed.load(Ordering::SeqCst)); + assert_delete_request(&request, "sandbox-1"); + } + + #[tokio::test] + async fn delete_404_is_success() { + let (sandboxes, server) = delete_test_client(StatusCode::NOT_FOUND, false).await; + let cleanup = BuilderSandboxCleanup::arm(sandboxes, "sandbox-1".to_string()); + + cleanup + .reclaim() + .await + .expect("404 is a successful cleanup"); + + let requests = await_delete_requests(server).await; + assert_single_delete(&requests, "sandbox-1"); + } + + #[tokio::test] + async fn non_404_error_propagates() { + let (sandboxes, server) = + delete_test_client(StatusCode::INTERNAL_SERVER_ERROR, false).await; + let cleanup = BuilderSandboxCleanup::arm(sandboxes, "sandbox-1".to_string()); + + let error = cleanup.reclaim().await.expect_err("500 should propagate"); + + assert!(error.to_string().contains("500"), "{error}"); + let requests = await_delete_requests(server).await; + assert_single_delete(&requests, "sandbox-1"); + } + + #[tokio::test] + async fn keepalive_aborts_on_drop() { + let started = Arc::new(AtomicBool::new(false)); + let dropped = Arc::new(AtomicBool::new(false)); + let task_started = Arc::clone(&started); + let task_dropped = Arc::clone(&dropped); + let keepalive = AbortOnDropJoinHandle(tokio::spawn(async move { + let _guard = DropFlag(task_dropped); + task_started.store(true, Ordering::SeqCst); + future::pending::<()>().await; + })); + + wait_for_flag(&started).await; + drop(keepalive); + + wait_for_flag(&dropped).await; + } + + struct DropFlag(Arc); + + impl Drop for DropFlag { + fn drop(&mut self) { + self.0.store(true, Ordering::SeqCst); + } + } + + async fn delete_test_client( + status: StatusCode, + wait_for_second: bool, + ) -> (SandboxesClient, JoinHandle>>) { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .expect("bind test listener"); + let address = listener.local_addr().expect("listener address"); + let server = tokio::spawn(async move { + let mut requests = Vec::new(); + let (mut socket, _) = listener.accept().await.expect("accept delete"); + requests.push(read_http_request(&mut socket).await); + write_delete_response(&mut socket, status).await; + + if wait_for_second + && let Ok(Ok((mut socket, _))) = + tokio::time::timeout(SECOND_DELETE_TIMEOUT, listener.accept()).await + { + requests.push(read_http_request(&mut socket).await); + write_delete_response(&mut socket, status).await; + } + + requests + }); + + (test_sandboxes_client(address), server) + } + + fn test_sandboxes_client(address: std::net::SocketAddr) -> SandboxesClient { + let client = ClientBuilder::new(&format!("http://{address}")) + .build() + .expect("build client"); + SandboxesClient::new(client, "default", false) + } + + async fn await_delete_requests(server: JoinHandle>>) -> Vec> { + tokio::time::timeout(DELETE_TIMEOUT, server) + .await + .expect("delete request timed out") + .expect("server join") + } + + fn assert_single_delete(requests: &[Vec], sandbox_id: &str) { + assert_eq!( + requests.len(), + 1, + "expected exactly one delete request, got {}", + requests.len() + ); + assert_delete_request(&requests[0], sandbox_id); + } + + fn assert_delete_request(request: &[u8], sandbox_id: &str) { + let request_text = String::from_utf8_lossy(request); + let expected = format!("DELETE /sandboxes/{sandbox_id} HTTP/1.1\r\n"); + assert!( + request_text.starts_with(&expected), + "expected request to start with {expected:?}, got {request_text:?}" + ); + } + + async fn read_http_request(socket: &mut TcpStream) -> Vec { + let mut request = Vec::new(); + let mut buf = [0_u8; 4096]; + + loop { + let read = socket.read(&mut buf).await.expect("read request"); + if read == 0 { + break; + } + request.extend_from_slice(&buf[..read]); + + if let Some(headers_end) = request.windows(4).position(|window| window == b"\r\n\r\n") { + let headers = String::from_utf8_lossy(&request[..headers_end + 4]); + let content_length = headers + .lines() + .find_map(|line| { + let (name, value) = line.split_once(':')?; + if name.eq_ignore_ascii_case("content-length") { + value.trim().parse::().ok() + } else { + None + } + }) + .unwrap_or(0); + + if request.len() >= headers_end + 4 + content_length { + break; + } + } + } + + request + } + + async fn write_delete_response(socket: &mut TcpStream, status: StatusCode) { + let body = if status.is_success() { + "" + } else { + "delete failed" + }; + let response = format!( + "HTTP/1.1 {} {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", + status.as_u16(), + status.canonical_reason().unwrap_or("Unknown"), + body.len(), + body + ); + socket + .write_all(response.as_bytes()) + .await + .expect("write response"); + } + + async fn wait_for_flag(flag: &AtomicBool) { + for _ in 0..100 { + if flag.load(Ordering::SeqCst) { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!("timed out waiting for flag"); + } #[test] fn default_registered_name_uses_parent_for_dockerfile() {