-
Notifications
You must be signed in to change notification settings - Fork 3
[TOW-1394] UV cache & package path cleanup #184
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
5bca9fa
[TOW-1394][WIP] Attempt cache cleanup
sammuti 9561bec
In case of no cache_dir provided create temporary directories for uv …
sammuti a01f013
Correctly set cache and clean
sammuti 6248f10
Track & explicitly remove tmp pacakge dir
sammuti 8a70950
Test cleanup
sammuti File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ use crate::{App, OutputReceiver, StartOptions, Status}; | |
| use async_trait::async_trait; | ||
| use std::path::PathBuf; | ||
| use std::sync::Arc; | ||
| use tmpdir::TmpDir; | ||
| use tokio::fs::File; | ||
| use tokio::io::AsyncWriteExt; | ||
| use tokio::sync::Mutex; | ||
|
|
@@ -37,7 +38,7 @@ impl SubprocessBackend { | |
| mut package_stream: Box<dyn tokio::io::AsyncRead + Send + Unpin>, | ||
| ) -> Result<Package, Error> { | ||
| // Create temp directory for this package | ||
| let temp_dir = tmpdir::TmpDir::new("tower-package") | ||
| let temp_dir = TmpDir::new("tower-package") | ||
| .await | ||
| .map_err(|_| Error::PackageCreateFailed)?; | ||
|
|
||
|
|
@@ -78,24 +79,49 @@ impl ExecutionBackend for SubprocessBackend { | |
| _ => self.cache_dir.clone(), | ||
| }; | ||
|
|
||
| // Create a unique temp directory for uv if no cache directory is configured | ||
| let (final_cache_dir, uv_temp_dir) = if cache_dir.is_none() { | ||
| let temp_path = std::env::temp_dir().join(format!("tower-uv-{}", spec.id)); | ||
| tokio::fs::create_dir_all(&temp_path) | ||
| .await | ||
| .map_err(|_| Error::PackageCreateFailed)?; | ||
| // Use the temp directory as cache_dir and track it for cleanup | ||
| (Some(temp_path.clone()), Some(temp_path)) | ||
| } else { | ||
| // Use provided cache_dir, no temp dir to clean up | ||
| (cache_dir, None) | ||
| }; | ||
|
|
||
| // Receive package stream and unpack it | ||
| let package = self.receive_and_unpack_package(spec.package_stream).await?; | ||
| let mut package = self.receive_and_unpack_package(spec.package_stream).await?; | ||
|
|
||
| let unpacked_path = package | ||
| .unpacked_path | ||
| .clone() | ||
| .ok_or(Error::PackageUnpackFailed)?; | ||
|
|
||
| // Extract tmp_dir from package for cleanup tracking | ||
| // We need to keep this alive until execution completes | ||
| let package_tmp_dir = package.tmp_dir.take(); | ||
|
|
||
| // Set TMPDIR to the same isolated directory to ensure lock files also go there | ||
| let mut env_vars = spec.env_vars; | ||
| if let Some(ref temp_dir) = uv_temp_dir { | ||
| env_vars.insert("TMPDIR".to_string(), temp_dir.to_string_lossy().to_string()); | ||
| env_vars.insert("TEMP".to_string(), temp_dir.to_string_lossy().to_string()); | ||
| env_vars.insert("TMP".to_string(), temp_dir.to_string_lossy().to_string()); | ||
| } | ||
|
|
||
| let opts = StartOptions { | ||
| ctx: spec.telemetry_ctx, | ||
| package: Package::from_unpacked_path(unpacked_path).await?, | ||
| cwd: None, // LocalApp determines cwd from package | ||
| environment: spec.environment, | ||
| secrets: spec.secrets, | ||
| parameters: spec.parameters, | ||
| env_vars: spec.env_vars, | ||
| env_vars, | ||
| output_sender: output_sender.clone(), | ||
| cache_dir, | ||
| cache_dir: final_cache_dir, // UV will use this via --cache-dir flag | ||
| }; | ||
|
|
||
| // Start the LocalApp | ||
|
|
@@ -105,7 +131,8 @@ impl ExecutionBackend for SubprocessBackend { | |
| id: spec.id, | ||
| app: Arc::new(Mutex::new(app)), | ||
| output_receiver: Arc::new(Mutex::new(output_receiver)), | ||
| _package: package, // Keep package alive so temp dir doesn't get cleaned up | ||
| package_tmp_dir, | ||
| uv_temp_dir, | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -133,7 +160,22 @@ pub struct SubprocessHandle { | |
| id: String, | ||
| app: Arc<Mutex<LocalApp>>, | ||
| output_receiver: Arc<Mutex<OutputReceiver>>, | ||
| _package: Package, // Keep package alive to prevent temp dir cleanup | ||
| package_tmp_dir: Option<TmpDir>, // Track package temp directory for cleanup | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why can't we have the package manage this? We're removing the package here. Seems like this logic should be encapsulated in the package? |
||
| uv_temp_dir: Option<PathBuf>, // Track UV's temp directory for cleanup | ||
| } | ||
|
|
||
| impl Drop for SubprocessHandle { | ||
| fn drop(&mut self) { | ||
| // Best-effort cleanup of UV temp directory when handle is dropped | ||
| if let Some(temp_dir) = self.uv_temp_dir.take() { | ||
| let _ = std::fs::remove_dir_all(&temp_dir); | ||
| } | ||
|
|
||
| // Best-effort cleanup of package temp directory when handle is dropped | ||
| if let Some(tmp_dir) = self.package_tmp_dir.take() { | ||
| let _ = std::fs::remove_dir_all(tmp_dir.to_path_buf()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[async_trait] | ||
|
|
@@ -195,6 +237,24 @@ impl ExecutionHandle for SubprocessHandle { | |
| async fn cleanup(&mut self) -> Result<(), Error> { | ||
| // Ensure the app is terminated | ||
| self.terminate().await?; | ||
|
|
||
| // Clean up uv's temp directory if it was created | ||
| if let Some(ref temp_dir) = self.uv_temp_dir { | ||
| if let Err(e) = tokio::fs::remove_dir_all(temp_dir).await { | ||
| // Log but don't fail - cleanup is best-effort | ||
| tower_telemetry::debug!("Failed to clean up uv temp directory: {:?}", e); | ||
| } | ||
| } | ||
|
|
||
| // Clean up package temp directory | ||
| if let Some(tmp_dir) = self.package_tmp_dir.take() { | ||
| let path = tmp_dir.to_path_buf(); | ||
| if let Err(e) = tokio::fs::remove_dir_all(&path).await { | ||
| // Log but don't fail - cleanup is best-effort | ||
| tower_telemetry::debug!("Failed to clean up package temp directory: {:?}", e); | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,173 @@ | ||
| use std::collections::HashMap; | ||
| use std::path::PathBuf; | ||
|
|
||
| use tower_runtime::execution::{ | ||
| CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionHandle, ExecutionSpec, | ||
| ResourceLimits, RuntimeConfig, | ||
| }; | ||
| use tower_runtime::subprocess::SubprocessBackend; | ||
| use tower_runtime::Status; | ||
|
|
||
| use config::Towerfile; | ||
| use tower_package::{Package, PackageSpec}; | ||
|
|
||
| fn get_example_app_dir(name: &str) -> PathBuf { | ||
| let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); | ||
| path.push("tests"); | ||
| path.push("example-apps"); | ||
| path.push(name); | ||
| assert!( | ||
| path.exists(), | ||
| "Example app directory does not exist: {}", | ||
| path.display() | ||
| ); | ||
| path | ||
| } | ||
|
|
||
| async fn build_package_from_dir(dir: &PathBuf) -> Package { | ||
| let towerfile = Towerfile::from_path(dir.join("Towerfile")).expect("Failed to load Towerfile"); | ||
| let spec = PackageSpec::from_towerfile(&towerfile); | ||
| let mut package = Package::build(spec) | ||
| .await | ||
| .expect("Failed to build package from directory"); | ||
| package.unpack().await.expect("Failed to unpack package"); | ||
| package | ||
| } | ||
|
|
||
| async fn create_execution_spec(id: String, package: Package) -> ExecutionSpec { | ||
| let tar_gz_path = package | ||
| .package_file_path | ||
| .expect("Package should have tar.gz file"); | ||
|
|
||
| let file = tokio::fs::File::open(&tar_gz_path) | ||
| .await | ||
| .expect("Failed to open package file"); | ||
|
|
||
| ExecutionSpec { | ||
| id, | ||
| telemetry_ctx: tower_telemetry::Context::new(), | ||
| package_stream: Box::new(file), | ||
| environment: "test".to_string(), | ||
| secrets: HashMap::new(), | ||
| parameters: HashMap::new(), | ||
| env_vars: HashMap::new(), | ||
| runtime: RuntimeConfig { | ||
| image: "python:3.11".to_string(), | ||
| version: None, | ||
| cache: CacheConfig { | ||
| enable_bundle_cache: false, | ||
| enable_runtime_cache: false, | ||
| enable_dependency_cache: false, | ||
| backend: CacheBackend::None, | ||
| isolation: CacheIsolation::None, | ||
| }, | ||
| entrypoint: None, | ||
| command: None, | ||
| }, | ||
| resources: ResourceLimits { | ||
| cpu_millicores: None, | ||
| memory_mb: None, | ||
| storage_mb: None, | ||
| max_pids: None, | ||
| gpu_count: 0, | ||
| timeout_seconds: 300, | ||
| }, | ||
| networking: None, | ||
| } | ||
| } | ||
|
|
||
| /// Check if specific execution's temp directory exists | ||
| fn uv_temp_dir_exists(execution_id: &str) -> bool { | ||
| let tmp_dir = std::env::temp_dir(); | ||
| let uv_cache_dir = tmp_dir.join(format!("tower-uv-{}", execution_id)); | ||
| uv_cache_dir.exists() | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_no_temp_file_accumulation_happy_path() { | ||
| let execution_id = "test-happy-cleanup"; | ||
|
|
||
| // Execute app with dependencies | ||
| let app_dir = get_example_app_dir("02-use-faker"); | ||
| let package = build_package_from_dir(&app_dir).await; | ||
| let backend = SubprocessBackend::new(None); | ||
| let spec = create_execution_spec(execution_id.to_string(), package).await; | ||
|
|
||
| let mut handle = backend | ||
| .create(spec) | ||
| .await | ||
| .expect("Failed to create execution"); | ||
| let status = handle | ||
| .wait_for_completion() | ||
| .await | ||
| .expect("Failed to wait for completion"); | ||
|
|
||
| assert_eq!(status, Status::Exited, "App should exit successfully"); | ||
|
|
||
| // Cleanup | ||
| handle.cleanup().await.expect("Failed to cleanup"); | ||
|
|
||
| // Verify this execution's temp directory was cleaned up | ||
| assert!( | ||
| !uv_temp_dir_exists(execution_id), | ||
| "UV temp directory should be cleaned up after execution" | ||
| ); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_no_temp_file_accumulation_on_termination() { | ||
| let execution_id = "test-terminate-cleanup"; | ||
|
|
||
| // Execute app with dependencies | ||
| let app_dir = get_example_app_dir("02-use-faker"); | ||
| let package = build_package_from_dir(&app_dir).await; | ||
| let backend = SubprocessBackend::new(None); | ||
| let spec = create_execution_spec(execution_id.to_string(), package).await; | ||
|
|
||
| let mut handle = backend | ||
| .create(spec) | ||
| .await | ||
| .expect("Failed to create execution"); | ||
|
|
||
| // Let it start, then terminate | ||
| tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; | ||
| handle.terminate().await.expect("Failed to terminate"); | ||
|
|
||
| // Cleanup | ||
| handle.cleanup().await.expect("Failed to cleanup"); | ||
|
|
||
| // Verify this execution's temp directory was cleaned up | ||
| assert!( | ||
| !uv_temp_dir_exists(execution_id), | ||
| "UV temp directory should be cleaned up after termination" | ||
| ); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_multiple_executions_no_accumulation() { | ||
| // Run multiple executions | ||
| for i in 0..3 { | ||
| let execution_id = format!("test-multi-cleanup-{}", i); | ||
| let app_dir = get_example_app_dir("01-hello-world"); | ||
| let package = build_package_from_dir(&app_dir).await; | ||
| let backend = SubprocessBackend::new(None); | ||
| let spec = create_execution_spec(execution_id.clone(), package).await; | ||
|
|
||
| let mut handle = backend | ||
| .create(spec) | ||
| .await | ||
| .expect("Failed to create execution"); | ||
| handle | ||
| .wait_for_completion() | ||
| .await | ||
| .expect("Failed to wait for completion"); | ||
| handle.cleanup().await.expect("Failed to cleanup"); | ||
|
|
||
| // Verify each execution's temp directory is cleaned up | ||
| assert!( | ||
| !uv_temp_dir_exists(&execution_id), | ||
| "UV temp directory {} should be cleaned up", | ||
| execution_id | ||
| ); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to provide configuration for disabling this? It's a great default, but I feel it could cause some problems in certain cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I think I missed this--you're coupling the
cache_dirand the overriddentemp_dir. I think this is probably OK. It's a bit confusing at first. Here's the logic as I understand it.Why not decouple those two things from each other?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I think I see that this is intentional--I think we probably will want to separate these in the future, as it's kind of surprising behavior. But we can move forward with this as-is.