diff --git a/crates/tower-runtime/src/subprocess.rs b/crates/tower-runtime/src/subprocess.rs index 713722f2..de3147ce 100644 --- a/crates/tower-runtime/src/subprocess.rs +++ b/crates/tower-runtime/src/subprocess.rs @@ -11,6 +11,7 @@ use crate::{App, OutputReceiver, StartOptions, Status}; use async_trait::async_trait; use std::path::PathBuf; use std::sync::Arc; +use tmpdir::TmpDir; use tokio::fs::File; use tokio::io::AsyncWriteExt; use tokio::sync::Mutex; @@ -37,7 +38,7 @@ impl SubprocessBackend { mut package_stream: Box, ) -> Result { // Create temp directory for this package - let temp_dir = tmpdir::TmpDir::new("tower-package") + let temp_dir = TmpDir::new("tower-package") .await .map_err(|_| Error::PackageCreateFailed)?; @@ -78,14 +79,39 @@ impl ExecutionBackend for SubprocessBackend { _ => self.cache_dir.clone(), }; + // Create a unique temp directory for uv if no cache directory is configured + let (final_cache_dir, uv_temp_dir) = if cache_dir.is_none() { + let temp_path = std::env::temp_dir().join(format!("tower-uv-{}", spec.id)); + tokio::fs::create_dir_all(&temp_path) + .await + .map_err(|_| Error::PackageCreateFailed)?; + // Use the temp directory as cache_dir and track it for cleanup + (Some(temp_path.clone()), Some(temp_path)) + } else { + // Use provided cache_dir, no temp dir to clean up + (cache_dir, None) + }; + // Receive package stream and unpack it - let package = self.receive_and_unpack_package(spec.package_stream).await?; + let mut package = self.receive_and_unpack_package(spec.package_stream).await?; let unpacked_path = package .unpacked_path .clone() .ok_or(Error::PackageUnpackFailed)?; + // Extract tmp_dir from package for cleanup tracking + // We need to keep this alive until execution completes + let package_tmp_dir = package.tmp_dir.take(); + + // Set TMPDIR to the same isolated directory to ensure lock files also go there + let mut env_vars = spec.env_vars; + if let Some(ref temp_dir) = uv_temp_dir { + env_vars.insert("TMPDIR".to_string(), temp_dir.to_string_lossy().to_string()); + env_vars.insert("TEMP".to_string(), temp_dir.to_string_lossy().to_string()); + env_vars.insert("TMP".to_string(), temp_dir.to_string_lossy().to_string()); + } + let opts = StartOptions { ctx: spec.telemetry_ctx, package: Package::from_unpacked_path(unpacked_path).await?, @@ -93,9 +119,9 @@ impl ExecutionBackend for SubprocessBackend { environment: spec.environment, secrets: spec.secrets, parameters: spec.parameters, - env_vars: spec.env_vars, + env_vars, output_sender: output_sender.clone(), - cache_dir, + cache_dir: final_cache_dir, // UV will use this via --cache-dir flag }; // Start the LocalApp @@ -105,7 +131,8 @@ impl ExecutionBackend for SubprocessBackend { id: spec.id, app: Arc::new(Mutex::new(app)), output_receiver: Arc::new(Mutex::new(output_receiver)), - _package: package, // Keep package alive so temp dir doesn't get cleaned up + package_tmp_dir, + uv_temp_dir, }) } @@ -133,7 +160,22 @@ pub struct SubprocessHandle { id: String, app: Arc>, output_receiver: Arc>, - _package: Package, // Keep package alive to prevent temp dir cleanup + package_tmp_dir: Option, // Track package temp directory for cleanup + uv_temp_dir: Option, // Track UV's temp directory for cleanup +} + +impl Drop for SubprocessHandle { + fn drop(&mut self) { + // Best-effort cleanup of UV temp directory when handle is dropped + if let Some(temp_dir) = self.uv_temp_dir.take() { + let _ = std::fs::remove_dir_all(&temp_dir); + } + + // Best-effort cleanup of package temp directory when handle is dropped + if let Some(tmp_dir) = self.package_tmp_dir.take() { + let _ = std::fs::remove_dir_all(tmp_dir.to_path_buf()); + } + } } #[async_trait] @@ -195,6 +237,24 @@ impl ExecutionHandle for SubprocessHandle { async fn cleanup(&mut self) -> Result<(), Error> { // Ensure the app is terminated self.terminate().await?; + + // Clean up uv's temp directory if it was created + if let Some(ref temp_dir) = self.uv_temp_dir { + if let Err(e) = tokio::fs::remove_dir_all(temp_dir).await { + // Log but don't fail - cleanup is best-effort + tower_telemetry::debug!("Failed to clean up uv temp directory: {:?}", e); + } + } + + // Clean up package temp directory + if let Some(tmp_dir) = self.package_tmp_dir.take() { + let path = tmp_dir.to_path_buf(); + if let Err(e) = tokio::fs::remove_dir_all(&path).await { + // Log but don't fail - cleanup is best-effort + tower_telemetry::debug!("Failed to clean up package temp directory: {:?}", e); + } + } + Ok(()) } } diff --git a/crates/tower-runtime/tests/subprocess_test.rs b/crates/tower-runtime/tests/subprocess_test.rs new file mode 100644 index 00000000..40a99929 --- /dev/null +++ b/crates/tower-runtime/tests/subprocess_test.rs @@ -0,0 +1,173 @@ +use std::collections::HashMap; +use std::path::PathBuf; + +use tower_runtime::execution::{ + CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionHandle, ExecutionSpec, + ResourceLimits, RuntimeConfig, +}; +use tower_runtime::subprocess::SubprocessBackend; +use tower_runtime::Status; + +use config::Towerfile; +use tower_package::{Package, PackageSpec}; + +fn get_example_app_dir(name: &str) -> PathBuf { + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("tests"); + path.push("example-apps"); + path.push(name); + assert!( + path.exists(), + "Example app directory does not exist: {}", + path.display() + ); + path +} + +async fn build_package_from_dir(dir: &PathBuf) -> Package { + let towerfile = Towerfile::from_path(dir.join("Towerfile")).expect("Failed to load Towerfile"); + let spec = PackageSpec::from_towerfile(&towerfile); + let mut package = Package::build(spec) + .await + .expect("Failed to build package from directory"); + package.unpack().await.expect("Failed to unpack package"); + package +} + +async fn create_execution_spec(id: String, package: Package) -> ExecutionSpec { + let tar_gz_path = package + .package_file_path + .expect("Package should have tar.gz file"); + + let file = tokio::fs::File::open(&tar_gz_path) + .await + .expect("Failed to open package file"); + + ExecutionSpec { + id, + telemetry_ctx: tower_telemetry::Context::new(), + package_stream: Box::new(file), + environment: "test".to_string(), + secrets: HashMap::new(), + parameters: HashMap::new(), + env_vars: HashMap::new(), + runtime: RuntimeConfig { + image: "python:3.11".to_string(), + version: None, + cache: CacheConfig { + enable_bundle_cache: false, + enable_runtime_cache: false, + enable_dependency_cache: false, + backend: CacheBackend::None, + isolation: CacheIsolation::None, + }, + entrypoint: None, + command: None, + }, + resources: ResourceLimits { + cpu_millicores: None, + memory_mb: None, + storage_mb: None, + max_pids: None, + gpu_count: 0, + timeout_seconds: 300, + }, + networking: None, + } +} + +/// Check if specific execution's temp directory exists +fn uv_temp_dir_exists(execution_id: &str) -> bool { + let tmp_dir = std::env::temp_dir(); + let uv_cache_dir = tmp_dir.join(format!("tower-uv-{}", execution_id)); + uv_cache_dir.exists() +} + +#[tokio::test] +async fn test_no_temp_file_accumulation_happy_path() { + let execution_id = "test-happy-cleanup"; + + // Execute app with dependencies + let app_dir = get_example_app_dir("02-use-faker"); + let package = build_package_from_dir(&app_dir).await; + let backend = SubprocessBackend::new(None); + let spec = create_execution_spec(execution_id.to_string(), package).await; + + let mut handle = backend + .create(spec) + .await + .expect("Failed to create execution"); + let status = handle + .wait_for_completion() + .await + .expect("Failed to wait for completion"); + + assert_eq!(status, Status::Exited, "App should exit successfully"); + + // Cleanup + handle.cleanup().await.expect("Failed to cleanup"); + + // Verify this execution's temp directory was cleaned up + assert!( + !uv_temp_dir_exists(execution_id), + "UV temp directory should be cleaned up after execution" + ); +} + +#[tokio::test] +async fn test_no_temp_file_accumulation_on_termination() { + let execution_id = "test-terminate-cleanup"; + + // Execute app with dependencies + let app_dir = get_example_app_dir("02-use-faker"); + let package = build_package_from_dir(&app_dir).await; + let backend = SubprocessBackend::new(None); + let spec = create_execution_spec(execution_id.to_string(), package).await; + + let mut handle = backend + .create(spec) + .await + .expect("Failed to create execution"); + + // Let it start, then terminate + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + handle.terminate().await.expect("Failed to terminate"); + + // Cleanup + handle.cleanup().await.expect("Failed to cleanup"); + + // Verify this execution's temp directory was cleaned up + assert!( + !uv_temp_dir_exists(execution_id), + "UV temp directory should be cleaned up after termination" + ); +} + +#[tokio::test] +async fn test_multiple_executions_no_accumulation() { + // Run multiple executions + for i in 0..3 { + let execution_id = format!("test-multi-cleanup-{}", i); + let app_dir = get_example_app_dir("01-hello-world"); + let package = build_package_from_dir(&app_dir).await; + let backend = SubprocessBackend::new(None); + let spec = create_execution_spec(execution_id.clone(), package).await; + + let mut handle = backend + .create(spec) + .await + .expect("Failed to create execution"); + handle + .wait_for_completion() + .await + .expect("Failed to wait for completion"); + handle.cleanup().await.expect("Failed to cleanup"); + + // Verify each execution's temp directory is cleaned up + assert!( + !uv_temp_dir_exists(&execution_id), + "UV temp directory {} should be cleaned up", + execution_id + ); + } +} diff --git a/crates/tower-uv/src/lib.rs b/crates/tower-uv/src/lib.rs index 33daf16b..91077319 100644 --- a/crates/tower-uv/src/lib.rs +++ b/crates/tower-uv/src/lib.rs @@ -61,14 +61,18 @@ fn normalize_env_vars(env_vars: &HashMap) -> HashMap) -> HashMap