diff --git a/crates/tower-runtime/src/auto_cleanup.rs b/crates/tower-runtime/src/auto_cleanup.rs new file mode 100644 index 00000000..992fb23f --- /dev/null +++ b/crates/tower-runtime/src/auto_cleanup.rs @@ -0,0 +1,325 @@ +//! Automatic cleanup timer for subprocess executions +//! +//! This module exists to handle the case where the control plane +//! disconnects and never sends a cleanup call to the runner. Under normal circumstances, +//! the control plane should always call cleanup after a run finishes. +//! ref: TOW-1342 + +use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tmpdir::TmpDir; +use tokio::sync::Mutex; + +use crate::App; + +/// How often to poll the app status to check if it has reached terminal state +const STATUS_POLL_INTERVAL: Duration = Duration::from_secs(5); + +/// Spawns a background task that monitors an app and performs automatic cleanup +/// after a timeout if explicit cleanup hasn't been called. +/// +/// This task: +/// 1. Polls the app status every STATUS_POLL_INTERVAL +/// 2. When the app reaches a terminal state, waits for cleanup_timeout +/// 3. If cleanup_called flag is still false, performs cleanup and logs a warning +pub fn spawn_cleanup_monitor( + run_id: String, + app: Arc>, + package_tmp_dir: Arc>>, + uv_temp_dir: Arc>>, + cleanup_called: Arc, + cleanup_timeout: Duration, +) { + tokio::spawn(async move { + use tower_telemetry::{info, warn}; + + // Wait for terminal state + loop { + tokio::time::sleep(STATUS_POLL_INTERVAL).await; + let status = app.lock().await.status().await; + if matches!(status, Ok(s) if s.is_terminal()) { + info!( + "Run {} finished, starting {}s automatic cleanup timer", + run_id, + cleanup_timeout.as_secs() + ); + break; + } + } + + // Wait for cleanup timeout + tokio::time::sleep(cleanup_timeout).await; + + // Check if explicit cleanup was called + if cleanup_called.load(Ordering::Relaxed) { + return; + } + + // Perform automatic cleanup + warn!( + "Automatic cleanup triggered for run {} after {}s (control plane cleanup not received)", + run_id, + cleanup_timeout.as_secs() + ); + + if let Some(temp_dir) = uv_temp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + } + + if let Some(tmp_dir) = package_tmp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(tmp_dir.to_path_buf()).await; + } + + cleanup_called.store(true, Ordering::Relaxed); + }); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{errors::Error, StartOptions, Status}; + + /// Mock LocalApp for testing that allows controlled status transitions + struct MockLocalApp { + status: Arc>, + } + + impl MockLocalApp { + fn new(initial_status: Status, transition_to_terminal_after: Duration) -> Self { + let status = Arc::new(Mutex::new(initial_status)); + let app = Self { + status: status.clone(), + }; + + // Spawn background task to transition to terminal state after delay + tokio::spawn(async move { + tokio::time::sleep(transition_to_terminal_after).await; + *status.lock().await = Status::Exited; + }); + + app + } + } + + impl crate::App for MockLocalApp { + async fn start(_opts: StartOptions) -> Result { + unimplemented!("MockLocalApp doesn't support start") + } + + async fn terminate(&mut self) -> Result<(), Error> { + Ok(()) + } + + async fn status(&self) -> Result { + Ok(*self.status.lock().await) + } + } + + /// Helper to create temp directories for testing + async fn create_test_dirs() -> (TmpDir, PathBuf) { + let package_tmp = TmpDir::new("test-package") + .await + .expect("Failed to create package temp dir"); + + // Use timestamp for uniqueness + let unique_id = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let uv_temp_path = std::env::temp_dir().join(format!("test-uv-{}", unique_id)); + tokio::fs::create_dir_all(&uv_temp_path) + .await + .expect("Failed to create uv temp dir"); + + (package_tmp, uv_temp_path) + } + + #[tokio::test] + async fn test_automatic_cleanup_triggers_after_timeout() { + // Create a mock app that transitions to Exited after 100ms + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_millis(100), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + // Verify directories exist before cleanup + assert!(tokio::fs::metadata(uv_temp_path.clone()).await.is_ok()); + + let cleanup_timeout = Duration::from_secs(1); + + // Spawn cleanup monitor + spawn_cleanup_monitor( + "test-run-1".to_string(), + app, + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + cleanup_timeout, + ); + + // Wait for: + // - App to transition (100ms) + // - Polling to detect terminal state (up to STATUS_POLL_INTERVAL) + // - Cleanup timeout + // - Buffer (1000ms) + let wait_time = Duration::from_millis(100) + + STATUS_POLL_INTERVAL + + cleanup_timeout + + Duration::from_secs(1); + tokio::time::sleep(wait_time).await; + + // Verify cleanup happened + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup flag should be set" + ); + + // Verify directories were removed + assert!( + uv_temp_dir.lock().await.is_none(), + "UV temp dir should be taken" + ); + assert!( + package_tmp_dir.lock().await.is_none(), + "Package temp dir should be taken" + ); + + // Verify actual filesystem cleanup + assert!( + tokio::fs::metadata(uv_temp_path).await.is_err(), + "UV temp directory should be deleted from filesystem" + ); + } + + #[tokio::test] + async fn test_explicit_cleanup_prevents_automatic_cleanup() { + // Create a mock app that transitions to Exited after 100ms + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_millis(100), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + let cleanup_timeout = Duration::from_secs(1); + + // Spawn cleanup monitor + spawn_cleanup_monitor( + "test-run-2".to_string(), + app, + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + cleanup_timeout, + ); + + // Wait for app to transition + polling to detect it + let wait_before_cleanup = + Duration::from_millis(100) + STATUS_POLL_INTERVAL + Duration::from_millis(100); + tokio::time::sleep(wait_before_cleanup).await; + + // Simulate explicit cleanup call before timeout expires + cleanup_called.store(true, Ordering::Relaxed); + + // Manually clean up directories (simulating explicit cleanup) + if let Some(temp_dir) = uv_temp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(&temp_dir).await; + } + if let Some(tmp_dir) = package_tmp_dir.lock().await.take() { + let _ = tokio::fs::remove_dir_all(tmp_dir.to_path_buf()).await; + } + + // Wait past the cleanup timeout to ensure automatic cleanup would have triggered + tokio::time::sleep(cleanup_timeout + Duration::from_secs(1)).await; + + // Verify cleanup flag is still true + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup flag should remain set" + ); + + // Verify directories were already cleaned up + assert!( + uv_temp_dir.lock().await.is_none(), + "UV temp dir should already be taken" + ); + assert!( + package_tmp_dir.lock().await.is_none(), + "Package temp dir should already be taken" + ); + } + + #[tokio::test] + async fn test_cleanup_waits_for_terminal_state() { + // Create a mock app that takes longer to transition (6s) + let app = Arc::new(Mutex::new(MockLocalApp::new( + Status::Running, + Duration::from_secs(6), + ))); + + // Create temp directories + let (package_tmp, uv_temp_path) = create_test_dirs().await; + let package_tmp_dir = Arc::new(Mutex::new(Some(package_tmp))); + let uv_temp_dir = Arc::new(Mutex::new(Some(uv_temp_path.clone()))); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + let cleanup_timeout = Duration::from_millis(500); + + // Spawn cleanup monitor + spawn_cleanup_monitor( + "test-run-3".to_string(), + app.clone(), + package_tmp_dir.clone(), + uv_temp_dir.clone(), + cleanup_called.clone(), + cleanup_timeout, + ); + + // Check status well before transition + tokio::time::sleep(Duration::from_secs(2)).await; + + // Cleanup should NOT have happened yet because app is still Running + assert!( + !cleanup_called.load(Ordering::Relaxed), + "Cleanup should not trigger while app is still running" + ); + + // Verify directories still exist + assert!( + uv_temp_dir.lock().await.is_some(), + "UV temp dir should still exist" + ); + + // Wait for: + // - Rest of transition (4s more) + // - Polling to detect terminal state (up to STATUS_POLL_INTERVAL) + // - Cleanup timeout + // - Buffer (1s) + let remaining_wait = Duration::from_secs(4) + + STATUS_POLL_INTERVAL + + cleanup_timeout + + Duration::from_secs(1); + tokio::time::sleep(remaining_wait).await; + + // Now cleanup should have happened + assert!( + cleanup_called.load(Ordering::Relaxed), + "Cleanup should trigger after app reaches terminal state" + ); + + // Cleanup the temp directory manually if test failed to clean it up + let _ = tokio::fs::remove_dir_all(uv_temp_path).await; + } +} diff --git a/crates/tower-runtime/src/lib.rs b/crates/tower-runtime/src/lib.rs index 74091edc..b4f56591 100644 --- a/crates/tower-runtime/src/lib.rs +++ b/crates/tower-runtime/src/lib.rs @@ -6,6 +6,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tower_package::Package; +pub mod auto_cleanup; pub mod errors; pub mod execution; pub mod local; @@ -44,6 +45,13 @@ pub enum Status { Crashed { code: i32 }, } +impl Status { + /// Returns true if this status represents a terminal state (run is finished) + pub fn is_terminal(&self) -> bool { + matches!(self, Status::Exited | Status::Crashed { .. }) + } +} + pub type OutputReceiver = UnboundedReceiver; pub type OutputSender = UnboundedSender; diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs index 15603f5b..decdb1b0 100644 --- a/crates/tower-runtime/src/subprocess.rs +++ b/crates/tower-runtime/src/subprocess.rs @@ -1,5 +1,6 @@ //! Subprocess execution backend +use crate::auto_cleanup; use crate::errors::Error; use crate::execution::{ BackendCapabilities, CacheBackend, ExecutionBackend, ExecutionHandle, ExecutionSpec, @@ -10,6 +11,7 @@ use crate::{App, OutputReceiver, StartOptions, Status}; use async_trait::async_trait; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tmpdir::TmpDir; use tokio::fs::File; @@ -18,6 +20,9 @@ use tokio::sync::Mutex; use tokio::time::Duration; use tower_package::Package; +/// Cleanup timeout after a run finishes (5 minutes) +const CLEANUP_TIMEOUT: Duration = Duration::from_secs(5 * 60); + /// SubprocessBackend executes apps as a subprocess pub struct SubprocessBackend { /// Optional default cache directory to use @@ -146,14 +151,29 @@ impl ExecutionBackend for SubprocessBackend { }; // Start the LocalApp - let app = LocalApp::start(opts).await?; + let app = Arc::new(Mutex::new(LocalApp::start(opts).await?)); + + let package_tmp_dir = Arc::new(Mutex::new(package_tmp_dir)); + let uv_temp_dir = Arc::new(Mutex::new(uv_temp_dir)); + let cleanup_called = Arc::new(AtomicBool::new(false)); + + // Spawn automatic cleanup monitor (temporary workaround for disconnected control plane) + auto_cleanup::spawn_cleanup_monitor( + spec.id.clone(), + Arc::clone(&app), + Arc::clone(&package_tmp_dir), + Arc::clone(&uv_temp_dir), + Arc::clone(&cleanup_called), + CLEANUP_TIMEOUT, + ); Ok(SubprocessHandle { id: spec.id, - app: Arc::new(Mutex::new(app)), + app, output_receiver: Arc::new(Mutex::new(output_receiver)), package_tmp_dir, uv_temp_dir, + cleanup_called, }) } @@ -181,22 +201,9 @@ pub struct SubprocessHandle { id: String, app: Arc>, output_receiver: Arc>, - package_tmp_dir: Option, // Track package temp directory for cleanup - uv_temp_dir: Option, // Track UV's temp directory for cleanup -} - -impl Drop for SubprocessHandle { - fn drop(&mut self) { - // Best-effort cleanup of UV temp directory when handle is dropped - if let Some(temp_dir) = self.uv_temp_dir.take() { - let _ = std::fs::remove_dir_all(&temp_dir); - } - - // Best-effort cleanup of package temp directory when handle is dropped - if let Some(tmp_dir) = self.package_tmp_dir.take() { - let _ = std::fs::remove_dir_all(tmp_dir.to_path_buf()); - } - } + package_tmp_dir: Arc>>, + uv_temp_dir: Arc>>, + cleanup_called: Arc, } #[async_trait] @@ -256,23 +263,28 @@ impl ExecutionHandle for SubprocessHandle { } async fn cleanup(&mut self) -> Result<(), Error> { + use tower_telemetry::{debug, info}; + + info!("Explicit cleanup called for run {}", self.id); + + // Mark cleanup as called (prevents timer from running) + self.cleanup_called.store(true, Ordering::Relaxed); + // Ensure the app is terminated self.terminate().await?; - // Clean up uv's temp directory if it was created - if let Some(ref temp_dir) = self.uv_temp_dir { - if let Err(e) = tokio::fs::remove_dir_all(temp_dir).await { - // Log but don't fail - cleanup is best-effort - tower_telemetry::debug!("Failed to clean up uv temp directory: {:?}", e); + // Clean up uv's temp directory + if let Some(temp_dir) = self.uv_temp_dir.lock().await.take() { + if let Err(e) = tokio::fs::remove_dir_all(&temp_dir).await { + debug!("Failed to clean up uv temp directory: {:?}", e); } } // Clean up package temp directory - if let Some(tmp_dir) = self.package_tmp_dir.take() { + if let Some(tmp_dir) = self.package_tmp_dir.lock().await.take() { let path = tmp_dir.to_path_buf(); if let Err(e) = tokio::fs::remove_dir_all(&path).await { - // Log but don't fail - cleanup is best-effort - tower_telemetry::debug!("Failed to clean up package temp directory: {:?}", e); + debug!("Failed to clean up package temp directory: {:?}", e); } }