From f5f374dd03fff46f461ca8f0eb8552420549c241 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C5=82gorzata=20Zagajewska?= Date: Sat, 6 Jun 2026 15:31:45 +0200 Subject: [PATCH 1/5] detect and remove invalid model cache entries before re-downloading --- paddler_agent/src/model_source/url.rs | 64 +++++++++++ .../src/cached_downloaded_model.rs | 107 +++++++++++++++++- 2 files changed, 168 insertions(+), 3 deletions(-) diff --git a/paddler_agent/src/model_source/url.rs b/paddler_agent/src/model_source/url.rs index 68d234fa..e5a2e88e 100644 --- a/paddler_agent/src/model_source/url.rs +++ b/paddler_agent/src/model_source/url.rs @@ -190,6 +190,13 @@ async fn resolve_url_into_cache( } }; + if let Err(io_error) = cached.remove_invalid_cache_entry().await { + slot_aggregated_status.reset_download(); + slot_aggregated_status.register_issue(classify_cache_io_error(url_string, &io_error)); + + return Err(anyhow::Error::new(io_error)); + } + let basename = cached .cache_file_path .file_name() @@ -775,4 +782,61 @@ mod tests { )); assert_eq!(tokio::fs::read(&expected_path).await.unwrap(), body); } + + #[tokio::test] + async fn resolve_recovers_from_stale_directory_then_downloads() { + let directory = TempDir::new().unwrap(); + let cache_dir = cache_dir_at(directory.path()); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let url_string = format!("http://127.0.0.1:{port}/model.gguf"); + let body = b"recovered model bytes".to_vec(); + let server = tokio::spawn(serve_single_ok_response(listener, body.clone())); + + let cached = CachedDownloadedModel::new(&cache_dir, &url_string).unwrap(); + let expected_path = cached.cache_file_path.clone(); + cached.ensure_cache_subdir_exists().await.unwrap(); + tokio::fs::create_dir(&expected_path).await.unwrap(); + + let resolution = resolve_url_into_cache(&url_string, &cache_dir, fresh_status()) + .await + .unwrap(); + + server.await.unwrap(); + + assert!(matches!( + resolution, + DesiredModelResolution::Resolved(resolved_path) if resolved_path == expected_path + )); + assert_eq!(tokio::fs::read(&expected_path).await.unwrap(), body); + } + + #[tokio::test] + async fn resolve_reports_unreachable_when_dead_url_with_stale_directory() { + let directory = TempDir::new().unwrap(); + let cache_dir = cache_dir_at(directory.path()); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + drop(listener); + let url_string = format!("http://127.0.0.1:{port}/model.gguf"); + + let cached = CachedDownloadedModel::new(&cache_dir, &url_string).unwrap(); + cached.ensure_cache_subdir_exists().await.unwrap(); + tokio::fs::create_dir(&cached.cache_file_path) + .await + .unwrap(); + + let status = fresh_status(); + let result = resolve_url_into_cache(&url_string, &cache_dir, status.clone()).await; + + assert!(result.is_err(), "a dead URL must produce an Err"); + assert!( + status.has_issue(&AgentIssue::DownloadServerIsUnreachable(ModelPath { + model_path: url_string.clone(), + })), + "recovery must clear the stale dir so the dead-URL download reports unreachable" + ); + } } diff --git a/paddler_cache_dir/src/cached_downloaded_model.rs b/paddler_cache_dir/src/cached_downloaded_model.rs index 2fbd586f..2d565cc0 100644 --- a/paddler_cache_dir/src/cached_downloaded_model.rs +++ b/paddler_cache_dir/src/cached_downloaded_model.rs @@ -1,4 +1,5 @@ use std::fmt::Write as _; +use std::io; use std::path::PathBuf; use anyhow::Result; @@ -44,11 +45,33 @@ impl CachedDownloadedModel { }) } - pub async fn is_cached(&self) -> Result { - fs::try_exists(&self.cache_file_path).await + pub async fn is_cached(&self) -> Result { + match fs::metadata(&self.cache_file_path).await { + Ok(metadata) => Ok(metadata.is_file()), + Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(false), + Err(error) => Err(error), + } + } + + pub async fn remove_invalid_cache_entry(&self) -> Result<(), io::Error> { + let metadata = match fs::symlink_metadata(&self.cache_file_path).await { + Ok(metadata) => metadata, + Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(()), + Err(error) => return Err(error), + }; + + if metadata.is_file() { + return Ok(()); + } + + if metadata.is_dir() { + fs::remove_dir_all(&self.cache_file_path).await + } else { + fs::remove_file(&self.cache_file_path).await + } } - pub async fn ensure_cache_subdir_exists(&self) -> Result<(), std::io::Error> { + pub async fn ensure_cache_subdir_exists(&self) -> Result<(), io::Error> { fs::create_dir_all(&self.cache_subdir).await } @@ -202,6 +225,84 @@ mod tests { assert!(cached.is_cached().await.unwrap()); } + #[tokio::test] + async fn is_cached_returns_false_when_path_is_a_directory() { + let directory = TempDir::new().unwrap(); + let cache_dir = cache_dir_at(directory.path()); + let cached = + CachedDownloadedModel::new(&cache_dir, "https://host.example/leftover.gguf").unwrap(); + + cached.ensure_cache_subdir_exists().await.unwrap(); + tokio::fs::create_dir(&cached.cache_file_path) + .await + .unwrap(); + + assert!( + !cached.is_cached().await.unwrap(), + "a directory occupying the cache file path is not a cached model" + ); + } + + #[tokio::test] + async fn remove_invalid_cache_entry_removes_stale_directory() { + let directory = TempDir::new().unwrap(); + let cache_dir = cache_dir_at(directory.path()); + let cached = + CachedDownloadedModel::new(&cache_dir, "https://host.example/stale.gguf").unwrap(); + + cached.ensure_cache_subdir_exists().await.unwrap(); + tokio::fs::create_dir(&cached.cache_file_path) + .await + .unwrap(); + tokio::fs::write(cached.cache_file_path.join("inner"), b"leftover") + .await + .unwrap(); + + cached.remove_invalid_cache_entry().await.unwrap(); + + assert!( + !tokio::fs::try_exists(&cached.cache_file_path) + .await + .unwrap() + ); + } + + #[tokio::test] + async fn remove_invalid_cache_entry_keeps_regular_file() { + let directory = TempDir::new().unwrap(); + let cache_dir = cache_dir_at(directory.path()); + let cached = + CachedDownloadedModel::new(&cache_dir, "https://host.example/real.gguf").unwrap(); + + cached.ensure_cache_subdir_exists().await.unwrap(); + tokio::fs::write(&cached.cache_file_path, b"real model bytes") + .await + .unwrap(); + + cached.remove_invalid_cache_entry().await.unwrap(); + + assert_eq!( + tokio::fs::read(&cached.cache_file_path).await.unwrap(), + b"real model bytes" + ); + } + + #[tokio::test] + async fn remove_invalid_cache_entry_is_noop_when_absent() { + let directory = TempDir::new().unwrap(); + let cache_dir = cache_dir_at(directory.path()); + let cached = + CachedDownloadedModel::new(&cache_dir, "https://host.example/missing.gguf").unwrap(); + + cached.remove_invalid_cache_entry().await.unwrap(); + + assert!( + !tokio::fs::try_exists(&cached.cache_file_path) + .await + .unwrap() + ); + } + #[tokio::test] async fn try_acquire_download_lock_succeeds_when_uncontested() { let directory = TempDir::new().unwrap(); From 4a5e52ca340a1b7a65b6834a050f6dbad93aede5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ma=C5=82gorzata=20Zagajewska?= Date: Sat, 6 Jun 2026 15:31:45 +0200 Subject: [PATCH 2/5] time out the agents stream watcher so a stalled predicate fails instead of hanging --- paddler_test_cluster_harness/Cargo.toml | 1 + .../src/agents_stream_watcher.rs | 42 ++++++++++++++++--- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/paddler_test_cluster_harness/Cargo.toml b/paddler_test_cluster_harness/Cargo.toml index e97271e6..0b63e5b3 100644 --- a/paddler_test_cluster_harness/Cargo.toml +++ b/paddler_test_cluster_harness/Cargo.toml @@ -26,6 +26,7 @@ url = { workspace = true } [dev-dependencies] http = { workspace = true } +tokio = { workspace = true, features = ["test-util"] } [lints] workspace = true diff --git a/paddler_test_cluster_harness/src/agents_stream_watcher.rs b/paddler_test_cluster_harness/src/agents_stream_watcher.rs index 19d0448a..30f47dcf 100644 --- a/paddler_test_cluster_harness/src/agents_stream_watcher.rs +++ b/paddler_test_cluster_harness/src/agents_stream_watcher.rs @@ -1,4 +1,5 @@ use std::pin::Pin; +use std::time::Duration; use anyhow::Context as _; use anyhow::Result; @@ -8,6 +9,9 @@ use futures_util::Stream; use futures_util::StreamExt as _; use paddler_client::client_management::ClientManagement; use paddler_messaging::agent_controller_pool_snapshot::AgentControllerPoolSnapshot; +use tokio::time::timeout; + +const UNTIL_TIMEOUT: Duration = Duration::from_secs(10); pub struct AgentsStreamWatcher { stream: Pin> + Send>>, @@ -35,6 +39,16 @@ impl AgentsStreamWatcher { Self { stream } } + async fn next_snapshot(&mut self) -> Result> { + match timeout(UNTIL_TIMEOUT, self.stream.next()).await { + Err(_elapsed) => Err(anyhow!( + "agents stream did not satisfy the predicate within {UNTIL_TIMEOUT:?}" + )), + Ok(None) => Ok(None), + Ok(Some(item)) => Ok(Some(item.context("agents stream yielded an error")?)), + } + } + pub async fn until( &mut self, mut predicate: TPredicate, @@ -42,9 +56,7 @@ impl AgentsStreamWatcher { where TPredicate: FnMut(&AgentControllerPoolSnapshot) -> bool, { - while let Some(item) = self.stream.next().await { - let snapshot = item.context("agents stream yielded an error")?; - + while let Some(snapshot) = self.next_snapshot().await? { if predicate(&snapshot) { return Ok(snapshot); } @@ -63,9 +75,7 @@ impl AgentsStreamWatcher { where TPredicate: FnMut(&AgentControllerPoolSnapshot) -> bool, { - while let Some(item) = self.stream.next().await { - let snapshot = item.context("agents stream yielded an error")?; - + while let Some(snapshot) = self.next_snapshot().await? { let agent_present = snapshot .agents .iter() @@ -219,6 +229,26 @@ mod tests { AgentsStreamWatcher::from_stream(Box::pin(stream::iter(snapshots.into_iter().map(Ok)))) } + #[tokio::test(start_paused = true)] + async fn until_times_out_when_predicate_is_never_satisfied() -> Result<()> { + let mut watcher = AgentsStreamWatcher::from_stream(Box::pin(stream::pending::< + Result, + >())); + + let error = watcher + .until(|_snapshot| false) + .await + .err() + .context("until must time out when no snapshot ever arrives")?; + + assert!( + format!("{error:#}").contains("within"), + "timeout error must mention the elapsed bound, got: {error:#}" + ); + + Ok(()) + } + #[tokio::test] async fn until_agent_returns_ok_when_predicate_matches_with_agent_present() -> Result<()> { let agent_id = "agent-x"; From 0ee7120df908a5b6fe01cf0adaa20d8e097d654e Mon Sep 17 00:00:00 2001 From: Mateusz Charytoniuk Date: Mon, 8 Jun 2026 18:43:57 +0200 Subject: [PATCH 3/5] extract cache entry validation and healing into dedicated modules --- paddler_agent/src/model_source/url.rs | 9 +- paddler_cache_dir/src/cache_entry_healer.rs | 139 ++++++++++++++++++ .../src/cache_entry_validator.rs | 80 ++++++++++ .../src/cached_downloaded_model.rs | 129 ---------------- paddler_cache_dir/src/lib.rs | 2 + 5 files changed, 228 insertions(+), 131 deletions(-) create mode 100644 paddler_cache_dir/src/cache_entry_healer.rs create mode 100644 paddler_cache_dir/src/cache_entry_validator.rs diff --git a/paddler_agent/src/model_source/url.rs b/paddler_agent/src/model_source/url.rs index e5a2e88e..825ddff5 100644 --- a/paddler_agent/src/model_source/url.rs +++ b/paddler_agent/src/model_source/url.rs @@ -7,6 +7,8 @@ use async_trait::async_trait; use url::Url; use paddler_cache_dir::cache_dir::CacheDir; +use paddler_cache_dir::cache_entry_healer::CacheEntryHealer; +use paddler_cache_dir::cache_entry_validator::CacheEntryValidator; use paddler_cache_dir::cached_downloaded_model::CachedDownloadedModel; use paddler_cache_dir::download_lock_acquisition_error::DownloadLockAcquisitionError; use paddler_download_manager::download_error::DownloadError; @@ -143,8 +145,9 @@ async fn resolve_url_into_cache( } let cached = CachedDownloadedModel::new(cache_dir, url_string)?; + let cache_entry_validator = CacheEntryValidator::new(cached.cache_file_path.clone()); - let is_cached = match cached.is_cached().await { + let is_cached = match cache_entry_validator.is_valid().await { Ok(value) => value, Err(io_error) => { slot_aggregated_status.reset_download(); @@ -190,7 +193,9 @@ async fn resolve_url_into_cache( } }; - if let Err(io_error) = cached.remove_invalid_cache_entry().await { + let cache_entry_healer = CacheEntryHealer::new(cached.cache_file_path.clone()); + + if let Err(io_error) = cache_entry_healer.remove_if_invalid().await { slot_aggregated_status.reset_download(); slot_aggregated_status.register_issue(classify_cache_io_error(url_string, &io_error)); diff --git a/paddler_cache_dir/src/cache_entry_healer.rs b/paddler_cache_dir/src/cache_entry_healer.rs new file mode 100644 index 00000000..109583f3 --- /dev/null +++ b/paddler_cache_dir/src/cache_entry_healer.rs @@ -0,0 +1,139 @@ +use std::io; +use std::path::PathBuf; + +use tokio::fs; + +pub struct CacheEntryHealer { + path: PathBuf, +} + +impl CacheEntryHealer { + #[must_use] + pub const fn new(path: PathBuf) -> Self { + Self { path } + } + + pub async fn remove_if_invalid(&self) -> Result<(), io::Error> { + let metadata = match fs::symlink_metadata(&self.path).await { + Ok(metadata) => metadata, + Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(()), + Err(error) => return Err(error), + }; + + if metadata.is_file() { + return Ok(()); + } + + if metadata.is_dir() { + fs::remove_dir_all(&self.path).await + } else { + fs::remove_file(&self.path).await + } + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use crate::cache_entry_healer::CacheEntryHealer; + + #[tokio::test] + async fn remove_if_invalid_keeps_regular_file() { + let directory = TempDir::new().unwrap(); + let file_path = directory.path().join("model.gguf"); + tokio::fs::write(&file_path, b"real model bytes") + .await + .unwrap(); + + CacheEntryHealer::new(file_path.clone()) + .remove_if_invalid() + .await + .unwrap(); + + assert_eq!( + tokio::fs::read(&file_path).await.unwrap(), + b"real model bytes" + ); + } + + #[tokio::test] + async fn remove_if_invalid_removes_directory() { + let directory = TempDir::new().unwrap(); + let directory_at_cache_path = directory.path().join("model.gguf"); + tokio::fs::create_dir(&directory_at_cache_path) + .await + .unwrap(); + tokio::fs::write(directory_at_cache_path.join("inner"), b"leftover") + .await + .unwrap(); + + CacheEntryHealer::new(directory_at_cache_path.clone()) + .remove_if_invalid() + .await + .unwrap(); + + assert!( + !tokio::fs::try_exists(&directory_at_cache_path) + .await + .unwrap() + ); + } + + #[cfg(unix)] + #[tokio::test] + async fn remove_if_invalid_removes_symlink() { + use std::os::unix::fs::symlink; + + let directory = TempDir::new().unwrap(); + let symlink_at_cache_path = directory.path().join("model.gguf"); + symlink( + directory.path().join("missing-target"), + &symlink_at_cache_path, + ) + .unwrap(); + + CacheEntryHealer::new(symlink_at_cache_path.clone()) + .remove_if_invalid() + .await + .unwrap(); + + assert!( + tokio::fs::symlink_metadata(&symlink_at_cache_path) + .await + .is_err(), + "the symlink itself must be removed, not followed" + ); + } + + #[tokio::test] + async fn remove_if_invalid_is_noop_when_absent() { + let directory = TempDir::new().unwrap(); + let missing_path = directory.path().join("missing.gguf"); + + CacheEntryHealer::new(missing_path.clone()) + .remove_if_invalid() + .await + .unwrap(); + + assert!(!tokio::fs::try_exists(&missing_path).await.unwrap()); + } + + #[tokio::test] + async fn remove_if_invalid_propagates_error_when_parent_is_a_file() { + let directory = TempDir::new().unwrap(); + let file_in_place_of_parent = directory.path().join("not-a-directory"); + tokio::fs::write(&file_in_place_of_parent, b"blocker") + .await + .unwrap(); + + let result = CacheEntryHealer::new(file_in_place_of_parent.join("child.gguf")) + .remove_if_invalid() + .await; + + assert!( + result.is_err(), + "a non-NotFound stat failure must propagate instead of being treated as healed" + ); + } +} diff --git a/paddler_cache_dir/src/cache_entry_validator.rs b/paddler_cache_dir/src/cache_entry_validator.rs new file mode 100644 index 00000000..05b1f780 --- /dev/null +++ b/paddler_cache_dir/src/cache_entry_validator.rs @@ -0,0 +1,80 @@ +use std::io; +use std::path::PathBuf; + +use tokio::fs; + +pub struct CacheEntryValidator { + path: PathBuf, +} + +impl CacheEntryValidator { + #[must_use] + pub const fn new(path: PathBuf) -> Self { + Self { path } + } + + pub async fn is_valid(&self) -> Result { + match fs::metadata(&self.path).await { + Ok(metadata) => Ok(metadata.is_file()), + Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(false), + Err(error) => Err(error), + } + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use crate::cache_entry_validator::CacheEntryValidator; + + #[tokio::test] + async fn is_valid_returns_true_for_regular_file() { + let directory = TempDir::new().unwrap(); + let file_path = directory.path().join("model.gguf"); + tokio::fs::write(&file_path, b"model bytes").await.unwrap(); + + let validator = CacheEntryValidator::new(file_path); + + assert!(validator.is_valid().await.unwrap()); + } + + #[tokio::test] + async fn is_valid_returns_false_for_directory() { + let directory = TempDir::new().unwrap(); + let directory_at_cache_path = directory.path().join("model.gguf"); + tokio::fs::create_dir(&directory_at_cache_path) + .await + .unwrap(); + + let validator = CacheEntryValidator::new(directory_at_cache_path); + + assert!( + !validator.is_valid().await.unwrap(), + "a directory occupying the cache file path is not a valid cached model" + ); + } + + #[tokio::test] + async fn is_valid_returns_false_when_absent() { + let directory = TempDir::new().unwrap(); + let validator = CacheEntryValidator::new(directory.path().join("missing.gguf")); + + assert!(!validator.is_valid().await.unwrap()); + } + + #[tokio::test] + async fn is_valid_propagates_error_when_parent_is_a_file() { + let directory = TempDir::new().unwrap(); + let file_in_place_of_parent = directory.path().join("not-a-directory"); + tokio::fs::write(&file_in_place_of_parent, b"blocker") + .await + .unwrap(); + let validator = CacheEntryValidator::new(file_in_place_of_parent.join("child.gguf")); + + assert!( + validator.is_valid().await.is_err(), + "a non-NotFound stat failure must propagate instead of reporting a cache miss" + ); + } +} diff --git a/paddler_cache_dir/src/cached_downloaded_model.rs b/paddler_cache_dir/src/cached_downloaded_model.rs index 2d565cc0..a7fc39dd 100644 --- a/paddler_cache_dir/src/cached_downloaded_model.rs +++ b/paddler_cache_dir/src/cached_downloaded_model.rs @@ -45,32 +45,6 @@ impl CachedDownloadedModel { }) } - pub async fn is_cached(&self) -> Result { - match fs::metadata(&self.cache_file_path).await { - Ok(metadata) => Ok(metadata.is_file()), - Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(false), - Err(error) => Err(error), - } - } - - pub async fn remove_invalid_cache_entry(&self) -> Result<(), io::Error> { - let metadata = match fs::symlink_metadata(&self.cache_file_path).await { - Ok(metadata) => metadata, - Err(error) if error.kind() == io::ErrorKind::NotFound => return Ok(()), - Err(error) => return Err(error), - }; - - if metadata.is_file() { - return Ok(()); - } - - if metadata.is_dir() { - fs::remove_dir_all(&self.cache_file_path).await - } else { - fs::remove_file(&self.cache_file_path).await - } - } - pub async fn ensure_cache_subdir_exists(&self) -> Result<(), io::Error> { fs::create_dir_all(&self.cache_subdir).await } @@ -200,109 +174,6 @@ mod tests { ); } - #[tokio::test] - async fn is_cached_returns_false_when_cache_file_absent() { - let directory = TempDir::new().unwrap(); - let cache_dir = cache_dir_at(directory.path()); - let cached = - CachedDownloadedModel::new(&cache_dir, "https://host.example/missing.gguf").unwrap(); - - assert!(!cached.is_cached().await.unwrap()); - } - - #[tokio::test] - async fn is_cached_returns_true_when_cache_file_present() { - let directory = TempDir::new().unwrap(); - let cache_dir = cache_dir_at(directory.path()); - let cached = - CachedDownloadedModel::new(&cache_dir, "https://host.example/present.gguf").unwrap(); - - cached.ensure_cache_subdir_exists().await.unwrap(); - tokio::fs::write(&cached.cache_file_path, b"cached") - .await - .unwrap(); - - assert!(cached.is_cached().await.unwrap()); - } - - #[tokio::test] - async fn is_cached_returns_false_when_path_is_a_directory() { - let directory = TempDir::new().unwrap(); - let cache_dir = cache_dir_at(directory.path()); - let cached = - CachedDownloadedModel::new(&cache_dir, "https://host.example/leftover.gguf").unwrap(); - - cached.ensure_cache_subdir_exists().await.unwrap(); - tokio::fs::create_dir(&cached.cache_file_path) - .await - .unwrap(); - - assert!( - !cached.is_cached().await.unwrap(), - "a directory occupying the cache file path is not a cached model" - ); - } - - #[tokio::test] - async fn remove_invalid_cache_entry_removes_stale_directory() { - let directory = TempDir::new().unwrap(); - let cache_dir = cache_dir_at(directory.path()); - let cached = - CachedDownloadedModel::new(&cache_dir, "https://host.example/stale.gguf").unwrap(); - - cached.ensure_cache_subdir_exists().await.unwrap(); - tokio::fs::create_dir(&cached.cache_file_path) - .await - .unwrap(); - tokio::fs::write(cached.cache_file_path.join("inner"), b"leftover") - .await - .unwrap(); - - cached.remove_invalid_cache_entry().await.unwrap(); - - assert!( - !tokio::fs::try_exists(&cached.cache_file_path) - .await - .unwrap() - ); - } - - #[tokio::test] - async fn remove_invalid_cache_entry_keeps_regular_file() { - let directory = TempDir::new().unwrap(); - let cache_dir = cache_dir_at(directory.path()); - let cached = - CachedDownloadedModel::new(&cache_dir, "https://host.example/real.gguf").unwrap(); - - cached.ensure_cache_subdir_exists().await.unwrap(); - tokio::fs::write(&cached.cache_file_path, b"real model bytes") - .await - .unwrap(); - - cached.remove_invalid_cache_entry().await.unwrap(); - - assert_eq!( - tokio::fs::read(&cached.cache_file_path).await.unwrap(), - b"real model bytes" - ); - } - - #[tokio::test] - async fn remove_invalid_cache_entry_is_noop_when_absent() { - let directory = TempDir::new().unwrap(); - let cache_dir = cache_dir_at(directory.path()); - let cached = - CachedDownloadedModel::new(&cache_dir, "https://host.example/missing.gguf").unwrap(); - - cached.remove_invalid_cache_entry().await.unwrap(); - - assert!( - !tokio::fs::try_exists(&cached.cache_file_path) - .await - .unwrap() - ); - } - #[tokio::test] async fn try_acquire_download_lock_succeeds_when_uncontested() { let directory = TempDir::new().unwrap(); diff --git a/paddler_cache_dir/src/lib.rs b/paddler_cache_dir/src/lib.rs index 45a09025..1dac5b04 100644 --- a/paddler_cache_dir/src/lib.rs +++ b/paddler_cache_dir/src/lib.rs @@ -1,4 +1,6 @@ pub mod cache_dir; +pub mod cache_entry_healer; +pub mod cache_entry_validator; pub mod cached_downloaded_model; pub mod cached_downloaded_model_lock; pub mod download_lock_acquisition_error; From ebcc92d51b313fa49ba28537f9a98a23d7c90441 Mon Sep 17 00:00:00 2001 From: Mateusz Charytoniuk Date: Mon, 8 Jun 2026 19:23:36 +0200 Subject: [PATCH 4/5] treat cache symlinks as corrupted entries --- paddler_agent/src/model_source/url.rs | 41 +++++++++++++++++++ .../src/cache_entry_validator.rs | 23 ++++++++++- 2 files changed, 63 insertions(+), 1 deletion(-) diff --git a/paddler_agent/src/model_source/url.rs b/paddler_agent/src/model_source/url.rs index 825ddff5..88becda6 100644 --- a/paddler_agent/src/model_source/url.rs +++ b/paddler_agent/src/model_source/url.rs @@ -817,6 +817,47 @@ mod tests { assert_eq!(tokio::fs::read(&expected_path).await.unwrap(), body); } + #[cfg(unix)] + #[tokio::test] + async fn resolve_treats_symlink_as_corrupted_then_redownloads() { + use std::os::unix::fs::symlink; + + let directory = TempDir::new().unwrap(); + let cache_dir = cache_dir_at(directory.path()); + + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let port = listener.local_addr().unwrap().port(); + let url_string = format!("http://127.0.0.1:{port}/model.gguf"); + let body = b"redownloaded model bytes".to_vec(); + let server = tokio::spawn(serve_single_ok_response(listener, body.clone())); + + let cached = CachedDownloadedModel::new(&cache_dir, &url_string).unwrap(); + let expected_path = cached.cache_file_path.clone(); + cached.ensure_cache_subdir_exists().await.unwrap(); + let planted_target = directory.path().join("planted-target.gguf"); + tokio::fs::write(&planted_target, b"planted contents") + .await + .unwrap(); + symlink(&planted_target, &expected_path).unwrap(); + + let resolution = resolve_url_into_cache(&url_string, &cache_dir, fresh_status()) + .await + .unwrap(); + + server.await.unwrap(); + + assert!(matches!( + resolution, + DesiredModelResolution::Resolved(resolved_path) if resolved_path == expected_path + )); + assert_eq!(tokio::fs::read(&expected_path).await.unwrap(), body); + assert_eq!( + tokio::fs::read(&planted_target).await.unwrap(), + b"planted contents", + "healing must remove the symlink itself, not the file it points to" + ); + } + #[tokio::test] async fn resolve_reports_unreachable_when_dead_url_with_stale_directory() { let directory = TempDir::new().unwrap(); diff --git a/paddler_cache_dir/src/cache_entry_validator.rs b/paddler_cache_dir/src/cache_entry_validator.rs index 05b1f780..2da74969 100644 --- a/paddler_cache_dir/src/cache_entry_validator.rs +++ b/paddler_cache_dir/src/cache_entry_validator.rs @@ -14,7 +14,7 @@ impl CacheEntryValidator { } pub async fn is_valid(&self) -> Result { - match fs::metadata(&self.path).await { + match fs::symlink_metadata(&self.path).await { Ok(metadata) => Ok(metadata.is_file()), Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(false), Err(error) => Err(error), @@ -55,6 +55,27 @@ mod tests { ); } + #[cfg(unix)] + #[tokio::test] + async fn is_valid_returns_false_for_symlink() { + use std::os::unix::fs::symlink; + + let directory = TempDir::new().unwrap(); + let real_file = directory.path().join("real.gguf"); + tokio::fs::write(&real_file, b"real model bytes") + .await + .unwrap(); + let symlink_at_cache_path = directory.path().join("model.gguf"); + symlink(&real_file, &symlink_at_cache_path).unwrap(); + + let validator = CacheEntryValidator::new(symlink_at_cache_path); + + assert!( + !validator.is_valid().await.unwrap(), + "a symlink at the cache path is corrupted, even when it points to a regular file" + ); + } + #[tokio::test] async fn is_valid_returns_false_when_absent() { let directory = TempDir::new().unwrap(); From 061beea8da755ad3c375aabe060ab214a1577acc Mon Sep 17 00:00:00 2001 From: Mateusz Charytoniuk Date: Mon, 8 Jun 2026 19:23:36 +0200 Subject: [PATCH 5/5] drop the agents stream watcher next-snapshot timeout --- paddler_test_cluster_harness/Cargo.toml | 1 - .../src/agents_stream_watcher.rs | 42 +++---------------- 2 files changed, 6 insertions(+), 37 deletions(-) diff --git a/paddler_test_cluster_harness/Cargo.toml b/paddler_test_cluster_harness/Cargo.toml index 0b63e5b3..e97271e6 100644 --- a/paddler_test_cluster_harness/Cargo.toml +++ b/paddler_test_cluster_harness/Cargo.toml @@ -26,7 +26,6 @@ url = { workspace = true } [dev-dependencies] http = { workspace = true } -tokio = { workspace = true, features = ["test-util"] } [lints] workspace = true diff --git a/paddler_test_cluster_harness/src/agents_stream_watcher.rs b/paddler_test_cluster_harness/src/agents_stream_watcher.rs index 30f47dcf..19d0448a 100644 --- a/paddler_test_cluster_harness/src/agents_stream_watcher.rs +++ b/paddler_test_cluster_harness/src/agents_stream_watcher.rs @@ -1,5 +1,4 @@ use std::pin::Pin; -use std::time::Duration; use anyhow::Context as _; use anyhow::Result; @@ -9,9 +8,6 @@ use futures_util::Stream; use futures_util::StreamExt as _; use paddler_client::client_management::ClientManagement; use paddler_messaging::agent_controller_pool_snapshot::AgentControllerPoolSnapshot; -use tokio::time::timeout; - -const UNTIL_TIMEOUT: Duration = Duration::from_secs(10); pub struct AgentsStreamWatcher { stream: Pin> + Send>>, @@ -39,16 +35,6 @@ impl AgentsStreamWatcher { Self { stream } } - async fn next_snapshot(&mut self) -> Result> { - match timeout(UNTIL_TIMEOUT, self.stream.next()).await { - Err(_elapsed) => Err(anyhow!( - "agents stream did not satisfy the predicate within {UNTIL_TIMEOUT:?}" - )), - Ok(None) => Ok(None), - Ok(Some(item)) => Ok(Some(item.context("agents stream yielded an error")?)), - } - } - pub async fn until( &mut self, mut predicate: TPredicate, @@ -56,7 +42,9 @@ impl AgentsStreamWatcher { where TPredicate: FnMut(&AgentControllerPoolSnapshot) -> bool, { - while let Some(snapshot) = self.next_snapshot().await? { + while let Some(item) = self.stream.next().await { + let snapshot = item.context("agents stream yielded an error")?; + if predicate(&snapshot) { return Ok(snapshot); } @@ -75,7 +63,9 @@ impl AgentsStreamWatcher { where TPredicate: FnMut(&AgentControllerPoolSnapshot) -> bool, { - while let Some(snapshot) = self.next_snapshot().await? { + while let Some(item) = self.stream.next().await { + let snapshot = item.context("agents stream yielded an error")?; + let agent_present = snapshot .agents .iter() @@ -229,26 +219,6 @@ mod tests { AgentsStreamWatcher::from_stream(Box::pin(stream::iter(snapshots.into_iter().map(Ok)))) } - #[tokio::test(start_paused = true)] - async fn until_times_out_when_predicate_is_never_satisfied() -> Result<()> { - let mut watcher = AgentsStreamWatcher::from_stream(Box::pin(stream::pending::< - Result, - >())); - - let error = watcher - .until(|_snapshot| false) - .await - .err() - .context("until must time out when no snapshot ever arrives")?; - - assert!( - format!("{error:#}").contains("within"), - "timeout error must mention the elapsed bound, got: {error:#}" - ); - - Ok(()) - } - #[tokio::test] async fn until_agent_returns_ok_when_predicate_matches_with_agent_present() -> Result<()> { let agent_id = "agent-x";