From 8b57332f274197d57384d6e8d3c4a9a43a9078a9 Mon Sep 17 00:00:00 2001 From: Vim Wickramasinghe Date: Wed, 4 Feb 2026 15:01:56 +0100 Subject: [PATCH 1/3] [TOW-1342] Timeout auto cleanup for subprocess --- crates/tower-runtime/src/auto_cleanup.rs | 308 +++++++++++++++++++++++ crates/tower-runtime/src/lib.rs | 8 + crates/tower-runtime/src/subprocess.rs | 64 +++-- 3 files changed, 354 insertions(+), 26 deletions(-) create mode 100644 crates/tower-runtime/src/auto_cleanup.rs diff --git a/crates/tower-runtime/src/auto_cleanup.rs b/crates/tower-runtime/src/auto_cleanup.rs new file mode 100644 index 00000000..fae3504a --- /dev/null +++ b/crates/tower-runtime/src/auto_cleanup.rs @@ -0,0 +1,308 @@ +//! Automatic cleanup timer for subprocess executions +//! +//! This module exists to handle the case where the control plane +//! disconnects and never sends a cleanup call to the runner. Under normal circumstances, +//! the control plane should always call cleanup after a run finishes. +//! +//! **TODO**: Possibly remove this module once the control plane reliability issues are resolved. +//! ref: TOW-1342 + +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tmpdir::TmpDir; +use tokio::sync::Mutex; + +use crate::App; + +/// Spawns a background task that monitors an app and performs automatic cleanup +/// after a timeout if explicit cleanup hasn't been called. +/// +/// This task: +/// 1. Polls the app status every n seconds +/// 2. When the app reaches a terminal state, waits for cleanup_timeout +/// 3. If cleanup_called flag is still false, performs cleanup and logs a warning +pub fn spawn_cleanup_monitor( + run_id: String, + app: Arc>, + package_tmp_dir: Arc>>, + uv_temp_dir: Arc>>, + cleanup_called: Arc, + cleanup_timeout: Duration, +) { + tokio::spawn(async move { + use tower_telemetry::{info, warn}; + + // Wait for terminal state + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + let status = app.lock().await.status().await; + if matches!(status, Ok(s) if s.is_terminal()) { + info!( + "Run {} finished, starting {}s automatic cleanup timer", + run_id, + cleanup_timeout.as_secs() + ); + break; + } + } + + // Wait for cleanup timeout + tokio::time::sleep(cleanup_timeout).await; + + // Check if explicit cleanup was called + if cleanup_called.load(Ordering::Relaxed) { + return; + } + + // Perform automatic cleanup + warn!( + "Automatic cleanup triggered for run {} after {}s (control plane cleanup not received)", + run_id, + cleanup_timeout.as_secs() + ); + + if let Some(temp_dir) = uv_temp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + } + + if let Some(tmp_dir) = package_tmp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(tmp_dir.to_path_buf()).await; + } + + cleanup_called.store(true, Ordering::Relaxed); + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{errors::Error, StartOptions, Status}; + + /// Mock LocalApp for testing that allows controlled status transitions + struct MockLocalApp { + status: Arc>, + } + + impl MockLocalApp { + fn new(initial_status: Status, transition_to_terminal_after: Duration) -> Self { + let status = Arc::new(Mutex::new(initial_status)); + let app = Self { + status: status.clone(), + }; + + // Spawn background task to transition to terminal state after delay + tokio::spawn(async move { + tokio::time::sleep(transition_to_terminal_after).await; + *status.lock().await = Status::Exited; + }); + + app + } + } + + impl crate::App for MockLocalApp { + async fn start(_opts: StartOptions) -> Result { + unimplemented!("MockLocalApp doesn't support start") + } + + async fn terminate(&mut self) -> Result<(), Error> { + Ok(()) + } + + async fn status(&self) -> Result { + Ok(*self.status.lock().await) + } + } + + /// Helper to create temp directories for testing + async fn create_test_dirs() -> (TmpDir, PathBuf) { + let package_tmp = TmpDir::new("test-package") + .await + .expect("Failed to create package temp dir"); + + // Use timestamp for uniqueness + let unique_id = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let uv_temp_path = std::env::temp_dir().join(format!("test-uv-{}", unique_id)); + tokio::fs::create_dir_all(&uv_temp_path) + .await + .expect("Failed to create uv temp dir"); + + (package_tmp, uv_temp_path) + } + + #[tokio::test] + async fn test_automatic_cleanup_triggers_after_timeout() { + // Create a mock app that transitions to Exited after 100ms + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_millis(100), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + // Verify directories exist before cleanup + assert!(tokio::fs::metadata(uv_temp_path.clone()).await.is_ok()); + + // Spawn cleanup monitor with short timeout (500ms) + spawn_cleanup_monitor( + "test-run-1".to_string(), + app, + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + Duration::from_millis(500), + ); + + // Wait for: + // - App to transition (100ms) + // - Polling to detect terminal state (up to 1000ms) + // - Cleanup timeout (500ms) + // - Buffer (500ms) + tokio::time::sleep(Duration::from_millis(2200)).await; + + // Verify cleanup happened + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup flag should be set" + ); + + // Verify directories were removed + assert!( + uv_temp_dir.lock().await.is_none(), + "UV temp dir should be taken" + ); + assert!( + package_tmp_dir.lock().await.is_none(), + "Package temp dir should be taken" + ); + + // Verify actual filesystem cleanup + assert!( + tokio::fs::metadata(uv_temp_path).await.is_err(), + "UV temp directory should be deleted from filesystem" + ); + } + + #[tokio::test] + async fn test_explicit_cleanup_prevents_automatic_cleanup() { + // Create a mock app that transitions to Exited after 100ms + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_millis(100), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + // Spawn cleanup monitor with short timeout (500ms) + spawn_cleanup_monitor( + "test-run-2".to_string(), + app, + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + Duration::from_millis(500), + ); + + // Wait for app to transition + polling to detect it (up to 1100ms) + tokio::time::sleep(Duration::from_millis(1200)).await; + + // Simulate explicit cleanup call before timeout expires + cleanup_called.store(true, Ordering::Relaxed); + + // Manually clean up directories (simulating explicit cleanup) + if let Some(temp_dir) = uv_temp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + } + if let Some(tmp_dir) = package_tmp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(tmp_dir.to_path_buf()).await; + } + + // Wait past the cleanup timeout (already waited 1200ms, need 500ms more + buffer) + tokio::time::sleep(Duration::from_millis(700)).await; + + // Verify cleanup flag is still true + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup flag should remain set" + ); + + // Verify directories were already cleaned up + assert!( + uv_temp_dir.lock().await.is_none(), + "UV temp dir should already be taken" + ); + assert!( + package_tmp_dir.lock().await.is_none(), + "Package temp dir should already be taken" + ); + } + + #[tokio::test] + async fn test_cleanup_waits_for_terminal_state() { + // Create a mock app that takes longer to transition (1500ms) + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_millis(1500), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + // Spawn cleanup monitor with short timeout (200ms) + spawn_cleanup_monitor( + "test-run-3".to_string(), + app.clone(), + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + Duration::from_millis(200), + ); + + // Check status well before transition (500ms) + tokio::time::sleep(Duration::from_millis(500)).await; + + // Cleanup should NOT have happened yet because app is still Running + assert!( + !cleanup_called.load(Ordering::Relaxed), + "Cleanup should not trigger while app is still running" + ); + + // Verify directories still exist + assert!( + uv_temp_dir.lock().await.is_some(), + "UV temp dir should still exist" + ); + + // Wait for: + // - Rest of transition (1000ms more) + // - Polling to detect terminal state (up to 1000ms) + // - Cleanup timeout (200ms) + // - Buffer (300ms) + tokio::time::sleep(Duration::from_millis(2600)).await; + + // Now cleanup should have happened + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup should trigger after app reaches terminal state" + ); + + // Cleanup the temp directory manually if test failed to clean it up + let _ = tokio::fs::remove_dir_all(uv_temp_path).await; + } +} diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 74091edc..b4f56591 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -6,6 +6,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tower_package::Package; +pub mod auto_cleanup; pub mod errors; pub mod execution; pub mod local; @@ -44,6 +45,13 @@ pub enum Status { Crashed { code: i32 }, } +impl Status { + /// Returns true if this status represents a terminal state (run is finished) + pub fn is_terminal(&self) -> bool { + matches!(self, Status::Exited | Status::Crashed { .. }) + } +} + pub type OutputReceiver = UnboundedReceiver; pub type OutputSender = UnboundedSender; diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs index 15603f5b..decdb1b0 100644 --- a/crates/tower-runtime/src/subprocess.rs +++ b/crates/tower-runtime/src/subprocess.rs @@ -1,5 +1,6 @@ //! Subprocess execution backend +use crate::auto_cleanup; use crate::errors::Error; use crate::execution::{ BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec, @@ -10,6 +11,7 @@ use crate::{App, OutputReceiver, StartOptions, Status}; use async_trait::async_trait; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tmpdir::TmpDir; use tokio::fs::File; @@ -18,6 +20,9 @@ use tokio::sync::Mutex; use tokio::time::Duration; use tower_package::Package; +/// Cleanup timeout after a run finishes (5 minutes) +const CLEANUP_TIMEOUT: Duration = Duration::from_secs(5 * 60); + /// SubprocessBackend executes apps as a subprocess pub struct SubprocessBackend { /// Optional default cache directory to use @@ -146,14 +151,29 @@ impl ExecutionBackend for SubprocessBackend { }; // Start the LocalApp - let app = LocalApp::start(opts).await?; + let app = Arc::new(Mutex::new(LocalApp::start(opts).await?)); + + let package_tmp_dir = Arc::new(Mutex::new(package_tmp_dir)); + let uv_temp_dir = Arc::new(Mutex::new(uv_temp_dir)); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + // Spawn automatic cleanup monitor (temporary workaround for disconnected control plane) + auto_cleanup::spawn_cleanup_monitor( + spec.id.clone(), + Arc::clone(&app), + Arc::clone(&package_tmp_dir), + Arc::clone(&uv_temp_dir), + Arc::clone(&cleanup_called), + CLEANUP_TIMEOUT, + ); Ok(SubprocessHandle { id: spec.id, - app: Arc::new(Mutex::new(app)), + app, output_receiver: Arc::new(Mutex::new(output_receiver)), package_tmp_dir, uv_temp_dir, + cleanup_called, }) } @@ -181,22 +201,9 @@ pub struct SubprocessHandle { id: String, app: Arc>, output_receiver: Arc>, - package_tmp_dir: Option, // Track package temp directory for cleanup - uv_temp_dir: Option, // Track UV's temp directory for cleanup -} - -impl Drop for SubprocessHandle { - fn drop(&mut self) { - // Best-effort cleanup of UV temp directory when handle is dropped - if let Some(temp_dir) = self.uv_temp_dir.take() { - let _ = std::fs::remove_dir_all(&temp_dir); - } - - // Best-effort cleanup of package temp directory when handle is dropped - if let Some(tmp_dir) = self.package_tmp_dir.take() { - let _ = std::fs::remove_dir_all(tmp_dir.to_path_buf()); - } - } + package_tmp_dir: Arc>>, + uv_temp_dir: Arc>>, + cleanup_called: Arc, } #[async_trait] @@ -256,23 +263,28 @@ impl ExecutionHandle for SubprocessHandle { } async fn cleanup(&mut self) -> Result<(), Error> { + use tower_telemetry::{debug, info}; + + info!("Explicit cleanup called for run {}", self.id); + + // Mark cleanup as called (prevents timer from running) + self.cleanup_called.store(true, Ordering::Relaxed); + // Ensure the app is terminated self.terminate().await?; - // Clean up uv's temp directory if it was created - if let Some(ref temp_dir) = self.uv_temp_dir { - if let Err(e) = tokio::fs::remove_dir_all(temp_dir).await { - // Log but don't fail - cleanup is best-effort - tower_telemetry::debug!("Failed to clean up uv temp directory: {:?}", e); + // Clean up uv's temp directory + if let Some(temp_dir) = self.uv_temp_dir.lock().await.take() { + if let Err(e) = tokio::fs::remove_dir_all(&temp_dir).await { + debug!("Failed to clean up uv temp directory: {:?}", e); } } // Clean up package temp directory - if let Some(tmp_dir) = self.package_tmp_dir.take() { + if let Some(tmp_dir) = self.package_tmp_dir.lock().await.take() { let path = tmp_dir.to_path_buf(); if let Err(e) = tokio::fs::remove_dir_all(&path).await { - // Log but don't fail - cleanup is best-effort - tower_telemetry::debug!("Failed to clean up package temp directory: {:?}", e); + debug!("Failed to clean up package temp directory: {:?}", e); } } From 59b63e711f5846b91a523572741dbb4ea12ea39d Mon Sep 17 00:00:00 2001 From: Vim Wickramasinghe Date: Wed, 4 Feb 2026 15:03:07 +0100 Subject: [PATCH 2/3] [TOW-1342] Timeout auto cleanup for subprocess --- crates/tower-runtime/src/auto_cleanup.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/tower-runtime/src/auto_cleanup.rs b/crates/tower-runtime/src/auto_cleanup.rs index fae3504a..bfb29d81 100644 --- a/crates/tower-runtime/src/auto_cleanup.rs +++ b/crates/tower-runtime/src/auto_cleanup.rs @@ -3,8 +3,6 @@ //! This module exists to handle the case where the control plane //! disconnects and never sends a cleanup call to the runner. Under normal circumstances, //! the control plane should always call cleanup after a run finishes. -//! -//! **TODO**: Possibly remove this module once the control plane reliability issues are resolved. //! ref: TOW-1342 use std::path::PathBuf; From 0d4152f9c5759d561c8a0475640cfaf641807452 Mon Sep 17 00:00:00 2001 From: Vim Wickramasinghe Date: Wed, 4 Feb 2026 15:11:59 +0100 Subject: [PATCH 3/3] adjust polling interval --- crates/tower-runtime/src/auto_cleanup.rs | 69 +++++++++++++++--------- 1 file changed, 44 insertions(+), 25 deletions(-) diff --git a/crates/tower-runtime/src/auto_cleanup.rs b/crates/tower-runtime/src/auto_cleanup.rs index bfb29d81..992fb23f 100644 --- a/crates/tower-runtime/src/auto_cleanup.rs +++ b/crates/tower-runtime/src/auto_cleanup.rs @@ -14,11 +14,14 @@ use tokio::sync::Mutex; use crate::App; +/// How often to poll the app status to check if it has reached terminal state +const STATUS_POLL_INTERVAL: Duration = Duration::from_secs(5); + /// Spawns a background task that monitors an app and performs automatic cleanup /// after a timeout if explicit cleanup hasn't been called. /// /// This task: -/// 1. Polls the app status every n seconds +/// 1. Polls the app status every STATUS_POLL_INTERVAL /// 2. When the app reaches a terminal state, waits for cleanup_timeout /// 3. If cleanup_called flag is still false, performs cleanup and logs a warning pub fn spawn_cleanup_monitor( @@ -34,7 +37,7 @@ pub fn spawn_cleanup_monitor( // Wait for terminal state loop { - tokio::time::sleep(Duration::from_secs(5)).await; + tokio::time::sleep(STATUS_POLL_INTERVAL).await; let status = app.lock().await.status().await; if matches!(status, Ok(s) if s.is_terminal()) { info!( @@ -150,22 +153,28 @@ mod tests { // Verify directories exist before cleanup assert!(tokio::fs::metadata(uv_temp_path.clone()).await.is_ok()); - // Spawn cleanup monitor with short timeout (500ms) + let cleanup_timeout = Duration::from_secs(1); + + // Spawn cleanup monitor spawn_cleanup_monitor( "test-run-1".to_string(), app, package_tmp_dir.clone(), uv_temp_dir.clone(), cleanup_called.clone(), - Duration::from_millis(500), + cleanup_timeout, ); // Wait for: // - App to transition (100ms) - // - Polling to detect terminal state (up to 1000ms) - // - Cleanup timeout (500ms) - // - Buffer (500ms) - tokio::time::sleep(Duration::from_millis(2200)).await; + // - Polling to detect terminal state (up to STATUS_POLL_INTERVAL) + // - Cleanup timeout + // - Buffer (1000ms) + let wait_time = Duration::from_millis(100) + + STATUS_POLL_INTERVAL + + cleanup_timeout + + Duration::from_secs(1); + tokio::time::sleep(wait_time).await; // Verify cleanup happened assert!( @@ -204,18 +213,22 @@ mod tests { let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); let cleanup_called = Arc::new(AtomicBool::new(false)); - // Spawn cleanup monitor with short timeout (500ms) + let cleanup_timeout = Duration::from_secs(1); + + // Spawn cleanup monitor spawn_cleanup_monitor( "test-run-2".to_string(), app, package_tmp_dir.clone(), uv_temp_dir.clone(), cleanup_called.clone(), - Duration::from_millis(500), + cleanup_timeout, ); - // Wait for app to transition + polling to detect it (up to 1100ms) - tokio::time::sleep(Duration::from_millis(1200)).await; + // Wait for app to transition + polling to detect it + let wait_before_cleanup = + Duration::from_millis(100) + STATUS_POLL_INTERVAL + Duration::from_millis(100); + tokio::time::sleep(wait_before_cleanup).await; // Simulate explicit cleanup call before timeout expires cleanup_called.store(true, Ordering::Relaxed); @@ -228,8 +241,8 @@ mod tests { let _ = tokio::fs::remove_dir_all(tmp_dir.to_path_buf()).await; } - // Wait past the cleanup timeout (already waited 1200ms, need 500ms more + buffer) - tokio::time::sleep(Duration::from_millis(700)).await; + // Wait past the cleanup timeout to ensure automatic cleanup would have triggered + tokio::time::sleep(cleanup_timeout + Duration::from_secs(1)).await; // Verify cleanup flag is still true assert!( @@ -250,10 +263,10 @@ mod tests { #[tokio::test] async fn test_cleanup_waits_for_terminal_state() { - // Create a mock app that takes longer to transition (1500ms) + // Create a mock app that takes longer to transition (6s) let app = Arc::new(Mutex::new(MockLocalApp::new( Status::Running, - Duration::from_millis(1500), + Duration::from_secs(6), ))); // Create temp directories @@ -262,18 +275,20 @@ mod tests { let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); let cleanup_called = Arc::new(AtomicBool::new(false)); - // Spawn cleanup monitor with short timeout (200ms) + let cleanup_timeout = Duration::from_millis(500); + + // Spawn cleanup monitor spawn_cleanup_monitor( "test-run-3".to_string(), app.clone(), package_tmp_dir.clone(), uv_temp_dir.clone(), cleanup_called.clone(), - Duration::from_millis(200), + cleanup_timeout, ); - // Check status well before transition (500ms) - tokio::time::sleep(Duration::from_millis(500)).await; + // Check status well before transition + tokio::time::sleep(Duration::from_secs(2)).await; // Cleanup should NOT have happened yet because app is still Running assert!( @@ -288,11 +303,15 @@ mod tests { ); // Wait for: - // - Rest of transition (1000ms more) - // - Polling to detect terminal state (up to 1000ms) - // - Cleanup timeout (200ms) - // - Buffer (300ms) - tokio::time::sleep(Duration::from_millis(2600)).await; + // - Rest of transition (4s more) + // - Polling to detect terminal state (up to STATUS_POLL_INTERVAL) + // - Cleanup timeout + // - Buffer (1s) + let remaining_wait = Duration::from_secs(4) + + STATUS_POLL_INTERVAL + + cleanup_timeout + + Duration::from_secs(1); + tokio::time::sleep(remaining_wait).await; // Now cleanup should have happened assert!(