Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 66 additions & 6 deletions crates/tower-runtime/src/subprocess.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{App, OutputReceiver, StartOptions, Status};
use async_trait::async_trait;
use std::path::PathBuf;
use std::sync::Arc;
use tmpdir::TmpDir;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::sync::Mutex;
Expand All @@ -37,7 +38,7 @@ impl SubprocessBackend {
mut package_stream: Box<dyn tokio::io::AsyncRead + Send + Unpin>,
) -> Result<Package, Error> {
// Create temp directory for this package
let temp_dir = tmpdir::TmpDir::new("tower-package")
let temp_dir = TmpDir::new("tower-package")
.await
.map_err(|_| Error::PackageCreateFailed)?;

Expand Down Expand Up @@ -78,24 +79,49 @@ impl ExecutionBackend for SubprocessBackend {
_ => self.cache_dir.clone(),
};

// Create a unique temp directory for uv if no cache directory is configured
let (final_cache_dir, uv_temp_dir) = if cache_dir.is_none() {
let temp_path = std::env::temp_dir().join(format!("tower-uv-{}", spec.id));
tokio::fs::create_dir_all(&temp_path)
.await
.map_err(|_| Error::PackageCreateFailed)?;
// Use the temp directory as cache_dir and track it for cleanup
(Some(temp_path.clone()), Some(temp_path))
} else {
// Use provided cache_dir, no temp dir to clean up
(cache_dir, None)
};

// Receive package stream and unpack it
let package = self.receive_and_unpack_package(spec.package_stream).await?;
let mut package = self.receive_and_unpack_package(spec.package_stream).await?;

let unpacked_path = package
.unpacked_path
.clone()
.ok_or(Error::PackageUnpackFailed)?;

// Extract tmp_dir from package for cleanup tracking
// We need to keep this alive until execution completes
let package_tmp_dir = package.tmp_dir.take();

// Set TMPDIR to the same isolated directory to ensure lock files also go there
let mut env_vars = spec.env_vars;
if let Some(ref temp_dir) = uv_temp_dir {
env_vars.insert("TMPDIR".to_string(), temp_dir.to_string_lossy().to_string());
env_vars.insert("TEMP".to_string(), temp_dir.to_string_lossy().to_string());
env_vars.insert("TMP".to_string(), temp_dir.to_string_lossy().to_string());
}
Comment on lines +108 to +113
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to provide configuration for disabling this? It's a great default, but I feel it could cause some problems in certain cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I think I missed this--you're coupling the cache_dir and the overridden temp_dir. I think this is probably OK. It's a bit confusing at first. Here's the logic as I understand it.

  1. If caching is enabled, which is the default, then we'll use the system-default temp directory.
  2. If caching is disabled, we jail the temporary directory and do our own cleanup.

Why not decouple those two things from each other?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I think I see that this is intentional--I think we probably will want to separate these in the future, as it's kind of surprising behavior. But we can move forward with this as-is.


let opts = StartOptions {
ctx: spec.telemetry_ctx,
package: Package::from_unpacked_path(unpacked_path).await?,
cwd: None, // LocalApp determines cwd from package
environment: spec.environment,
secrets: spec.secrets,
parameters: spec.parameters,
env_vars: spec.env_vars,
env_vars,
output_sender: output_sender.clone(),
cache_dir,
cache_dir: final_cache_dir, // UV will use this via --cache-dir flag
};

// Start the LocalApp
Expand All @@ -105,7 +131,8 @@ impl ExecutionBackend for SubprocessBackend {
id: spec.id,
app: Arc::new(Mutex::new(app)),
output_receiver: Arc::new(Mutex::new(output_receiver)),
_package: package, // Keep package alive so temp dir doesn't get cleaned up
package_tmp_dir,
uv_temp_dir,
})
}

Expand Down Expand Up @@ -133,7 +160,22 @@ pub struct SubprocessHandle {
id: String,
app: Arc<Mutex<LocalApp>>,
output_receiver: Arc<Mutex<OutputReceiver>>,
_package: Package, // Keep package alive to prevent temp dir cleanup
package_tmp_dir: Option<TmpDir>, // Track package temp directory for cleanup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we have the package manage this? We're removing the package here. Seems like this logic should be encapsulated in the package?

uv_temp_dir: Option<PathBuf>, // Track UV's temp directory for cleanup
}

impl Drop for SubprocessHandle {
    /// Last-chance removal of any temp directories this handle still tracks.
    ///
    /// Uses blocking `std::fs` calls and discards every error: removal here is
    /// best-effort only.
    fn drop(&mut self) {
        // uv scratch directory first, if one is still recorded on the handle.
        let uv_dir = self.uv_temp_dir.take();
        if let Some(path) = uv_dir.as_deref() {
            let _ = std::fs::remove_dir_all(path);
        }

        // Then the unpacked package directory. The `TmpDir` value itself stays alive
        // until after the removal attempt.
        let package_dir = self.package_tmp_dir.take();
        if let Some(dir) = package_dir.as_ref() {
            let _ = std::fs::remove_dir_all(dir.to_path_buf());
        }
    }
}

#[async_trait]
Expand Down Expand Up @@ -195,6 +237,24 @@ impl ExecutionHandle for SubprocessHandle {
/// Terminates the app, then removes the temp directories tracked by this handle.
///
/// Directory removal is best-effort: a failure is logged at debug level and never
/// surfaced to the caller. Only an error from `terminate` is propagated.
async fn cleanup(&mut self) -> Result<(), Error> {
    // Ensure the app is terminated
    self.terminate().await?;

    // Clean up uv's temp directory if it was created. The path is taken out of the
    // handle so that, once it has been removed, neither a second `cleanup` call nor
    // `Drop` deletes the same path again.
    if let Some(temp_dir) = self.uv_temp_dir.take() {
        if let Err(e) = tokio::fs::remove_dir_all(&temp_dir).await {
            // Log but don't fail - cleanup is best-effort
            tower_telemetry::debug!("Failed to clean up uv temp directory: {:?}", e);

            // Hand the path back so `Drop` can make one more attempt, unless the
            // directory is already gone and there is nothing left to retry.
            if e.kind() != std::io::ErrorKind::NotFound {
                self.uv_temp_dir = Some(temp_dir);
            }
        }
    }

    // Clean up package temp directory
    if let Some(tmp_dir) = self.package_tmp_dir.take() {
        let path = tmp_dir.to_path_buf();
        if let Err(e) = tokio::fs::remove_dir_all(&path).await {
            // Log but don't fail - cleanup is best-effort
            tower_telemetry::debug!("Failed to clean up package temp directory: {:?}", e);
        }
    }

    Ok(())
}
}
173 changes: 173 additions & 0 deletions crates/tower-runtime/tests/subprocess_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use std::collections::HashMap;
use std::path::PathBuf;

use tower_runtime::execution::{
CacheBackend, CacheConfig, CacheIsolation, ExecutionBackend, ExecutionHandle, ExecutionSpec,
ResourceLimits, RuntimeConfig,
};
use tower_runtime::subprocess::SubprocessBackend;
use tower_runtime::Status;

use config::Towerfile;
use tower_package::{Package, PackageSpec};

/// Resolves `tests/example-apps/<name>` underneath this crate's manifest directory.
///
/// # Panics
///
/// Panics when the resolved path does not exist on disk.
fn get_example_app_dir(name: &str) -> PathBuf {
    let app_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("tests")
        .join("example-apps")
        .join(name);

    assert!(
        app_dir.exists(),
        "Example app directory does not exist: {}",
        app_dir.display()
    );

    app_dir
}

/// Builds a [`Package`] from the `Towerfile` found in `dir` and unpacks it.
///
/// Takes a borrowed `Path` rather than `&PathBuf`, so any path-like borrow can be
/// passed; existing `&PathBuf` arguments still coerce.
///
/// # Panics
///
/// Panics if the Towerfile cannot be loaded, or if building or unpacking the package
/// fails.
async fn build_package_from_dir(dir: &std::path::Path) -> Package {
    let towerfile = Towerfile::from_path(dir.join("Towerfile")).expect("Failed to load Towerfile");
    let spec = PackageSpec::from_towerfile(&towerfile);
    let mut package = Package::build(spec)
        .await
        .expect("Failed to build package from directory");
    package.unpack().await.expect("Failed to unpack package");
    package
}

/// Assembles an [`ExecutionSpec`] for `id` whose package stream is the package's
/// tar.gz file opened from disk.
///
/// # Panics
///
/// Panics if the package has no tar.gz path or if that file cannot be opened.
async fn create_execution_spec(id: String, package: Package) -> ExecutionSpec {
    let archive_path = package
        .package_file_path
        .expect("Package should have tar.gz file");
    let archive = tokio::fs::File::open(&archive_path)
        .await
        .expect("Failed to open package file");

    // All cache flags are off, with no cache backend and no isolation.
    let cache = CacheConfig {
        enable_bundle_cache: false,
        enable_runtime_cache: false,
        enable_dependency_cache: false,
        backend: CacheBackend::None,
        isolation: CacheIsolation::None,
    };

    let runtime = RuntimeConfig {
        image: "python:3.11".to_string(),
        version: None,
        cache,
        entrypoint: None,
        command: None,
    };

    // Only the timeout and GPU count carry concrete values; every other limit is unset.
    let resources = ResourceLimits {
        cpu_millicores: None,
        memory_mb: None,
        storage_mb: None,
        max_pids: None,
        gpu_count: 0,
        timeout_seconds: 300,
    };

    ExecutionSpec {
        id,
        telemetry_ctx: tower_telemetry::Context::new(),
        package_stream: Box::new(archive),
        environment: "test".to_string(),
        secrets: HashMap::new(),
        parameters: HashMap::new(),
        env_vars: HashMap::new(),
        runtime,
        resources,
        networking: None,
    }
}

/// Reports whether `tower-uv-<execution_id>` exists under the system temp directory.
fn uv_temp_dir_exists(execution_id: &str) -> bool {
    let dir_name = format!("tower-uv-{}", execution_id);
    std::env::temp_dir().join(dir_name).exists()
}

#[tokio::test]
async fn test_no_temp_file_accumulation_happy_path() {
    let execution_id = "test-happy-cleanup";

    // Package up an example app that has dependencies and hand it to the backend.
    let package = build_package_from_dir(&get_example_app_dir("02-use-faker")).await;
    let backend = SubprocessBackend::new(None);
    let spec = create_execution_spec(execution_id.to_string(), package).await;

    let mut execution = backend
        .create(spec)
        .await
        .expect("Failed to create execution");

    // Let the app run to the end and check how it finished.
    let final_status = execution
        .wait_for_completion()
        .await
        .expect("Failed to wait for completion");
    assert_eq!(final_status, Status::Exited, "App should exit successfully");

    execution.cleanup().await.expect("Failed to cleanup");

    // Nothing belonging to this execution may be left in the temp directory.
    let leftover = uv_temp_dir_exists(execution_id);
    assert!(
        !leftover,
        "UV temp directory should be cleaned up after execution"
    );
}

#[tokio::test]
async fn test_no_temp_file_accumulation_on_termination() {
    let execution_id = "test-terminate-cleanup";

    // Package up an example app that has dependencies and hand it to the backend.
    let package = build_package_from_dir(&get_example_app_dir("02-use-faker")).await;
    let backend = SubprocessBackend::new(None);
    let spec = create_execution_spec(execution_id.to_string(), package).await;

    let mut execution = backend
        .create(spec)
        .await
        .expect("Failed to create execution");

    // Give the app one second to get going before terminating it.
    let startup_grace = tokio::time::Duration::from_secs(1);
    tokio::time::sleep(startup_grace).await;
    execution.terminate().await.expect("Failed to terminate");

    execution.cleanup().await.expect("Failed to cleanup");

    // Nothing belonging to this execution may be left in the temp directory.
    let leftover = uv_temp_dir_exists(execution_id);
    assert!(
        !leftover,
        "UV temp directory should be cleaned up after termination"
    );
}

#[tokio::test]
async fn test_multiple_executions_no_accumulation() {
    // Three back-to-back executions, each under its own id.
    for run in 0..3 {
        let execution_id = format!("test-multi-cleanup-{}", run);

        let package = build_package_from_dir(&get_example_app_dir("01-hello-world")).await;
        let backend = SubprocessBackend::new(None);
        let spec = create_execution_spec(execution_id.clone(), package).await;

        let mut execution = backend
            .create(spec)
            .await
            .expect("Failed to create execution");
        execution
            .wait_for_completion()
            .await
            .expect("Failed to wait for completion");
        execution.cleanup().await.expect("Failed to cleanup");

        // Check after every run, so a leak is pinned to the run that caused it.
        let leftover = uv_temp_dir_exists(&execution_id);
        assert!(
            !leftover,
            "UV temp directory {} should be cleaned up",
            execution_id
        );
    }
}
35 changes: 23 additions & 12 deletions crates/tower-uv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,18 @@ fn normalize_env_vars(env_vars: &HashMap<String, String>) -> HashMap<String, Str
env_vars.insert("SYSTEMROOT".to_string(), systemroot);

// We also need to bring along the TEMP environment variable because Python needs it for
// things like creating temporary files, etc.
let temp = std::env::var("TEMP").unwrap_or_default();
env_vars.insert("TEMP".to_string(), temp);
// things like creating temporary files, etc. But only set if not already provided.
if !env_vars.contains_key("TEMP") {
let temp = std::env::var("TEMP").unwrap_or_default();
env_vars.insert("TEMP".to_string(), temp);
}

// Apparently, according to some random person on Stack Overflow, sometimes the var can be
// TEMP and sometimes it can be TMP. So uh...let's just grab both just in case.
let tmp = std::env::var("TMP").unwrap_or_default();
env_vars.insert("TMP".to_string(), tmp);
if !env_vars.contains_key("TMP") {
let tmp = std::env::var("TMP").unwrap_or_default();
env_vars.insert("TMP".to_string(), tmp);
}
}

#[cfg(not(windows))]
Expand All @@ -83,16 +87,23 @@ fn normalize_env_vars(env_vars: &HashMap<String, String>) -> HashMap<String, Str
env_vars.insert("PATH".to_string(), path);

// On Unix systems, we also want to bring along the TMPDIR environment variable for temp
// files.
let tmpdir = std::env::var("TMPDIR").unwrap_or_default();
env_vars.insert("TMPDIR".to_string(), tmpdir);
// files. But only set it if not already provided (to allow custom temp directories).
if !env_vars.contains_key("TMPDIR") {
let tmpdir = std::env::var("TMPDIR").unwrap_or_default();
env_vars.insert("TMPDIR".to_string(), tmpdir);
}

// Also other potentially-set temp vars. These may not be set.
let temp = std::env::var("TEMP").unwrap_or_default();
env_vars.insert("TEMP".to_string(), temp);
// Only set if not already provided.
if !env_vars.contains_key("TEMP") {
let temp = std::env::var("TEMP").unwrap_or_default();
env_vars.insert("TEMP".to_string(), temp);
}

let tmp = std::env::var("TMP").unwrap_or_default();
env_vars.insert("TMP".to_string(), tmp);
if !env_vars.contains_key("TMP") {
let tmp = std::env::var("TMP").unwrap_or_default();
env_vars.insert("TMP".to_string(), tmp);
}

// Let's pass in TZ as well to propagate that to child processes.
let tz = std::env::var("TZ").unwrap_or_default();
Expand Down