diff --git a/Cargo.lock b/Cargo.lock index 8af03acc96..596475f27f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3720,6 +3720,7 @@ dependencies = [ "chrono", "criterion", "futures", + "globset", "grep", "grep-matcher", "grep-regex", diff --git a/crates/ragfs-python/src/lib.rs b/crates/ragfs-python/src/lib.rs index b1d4b01e81..f876b9be1f 100644 --- a/crates/ragfs-python/src/lib.rs +++ b/crates/ragfs-python/src/lib.rs @@ -20,7 +20,7 @@ use ragfs::cache::{ use ragfs::core::builder::EncryptionConfig; use ragfs::core::{ build_default_stack, register_builtin_plugins, ConfigValue, FileInfo, FileSystem, - FilesystemStats, FsContext, FsContextInner, FsOperation, GrepResult, MountableFS, + FilesystemStats, FsContext, FsContextInner, FsOperation, GlobPage, GrepResult, MountableFS, OperationStats, PluginConfig, RagfsConfig, StatsWrappedFS, TreeEntry, WriteFlag, FS_CTX, }; @@ -1498,6 +1498,57 @@ impl RAGFSBindingClient { }) } + /// Return one page of flat glob results. + /// + /// Args: + /// path: The root path of the traversal + /// pattern: Glob pattern matched against query-root-relative paths + /// show_hidden: Whether to include hidden files (default: False) + /// page_size: Maximum number of matched entries returned in this page + /// level_limit: Maximum depth relative to query root (default: None) + /// continuation_token: Opaque token returned by the previous page + /// ctx: Optional FsContext dict (e.g. 
{"account_id": ...}) + /// + /// Returns: + /// A dict with keys: entries (list[GlobEntry]), next_token (str | None) + #[pyo3(signature = (path, pattern, show_hidden=false, page_size=None, level_limit=None, continuation_token=None, ctx=None))] + fn glob_directory( + &self, + py: Python<'_>, + path: String, + pattern: String, + show_hidden: bool, + page_size: Option, + level_limit: Option, + continuation_token: Option, + ctx: Option>, + ) -> PyResult> { + let fs_ctx = build_fs_context(ctx); + let top = self.top.clone(); + let page_size = page_size.map(|n| if n < 0 { 0 } else { n as usize }); + let level_limit_usize = level_limit.map(|n| if n < 0 { 0 } else { n as usize }); + + let page: GlobPage = self + .run_scoped(py, fs_ctx, move || async move { + top.glob_directory( + &path, + &pattern, + show_hidden, + page_size, + level_limit_usize, + continuation_token, + ) + .await + }) + .map_err(to_py_err)?; + + Python::attach(|py| { + let value = serde_json::to_value(&page) + .map_err(|err| PyRuntimeError::new_err(err.to_string()))?; + serde_json_to_py(py, &value) + }) + } + /// Query multi-write sync status under a file or directory path. 
/// /// Args: diff --git a/crates/ragfs/Cargo.toml b/crates/ragfs/Cargo.toml index 64c1db03a5..8fee939be4 100644 --- a/crates/ragfs/Cargo.toml +++ b/crates/ragfs/Cargo.toml @@ -69,6 +69,7 @@ lru = "0.12" # Regular expressions for grep regex = "1.10" mime_guess = "2.0" +globset = "0.4" # Encryption (envelope encryption: AES-256-GCM + HKDF-SHA256) aes-gcm = "0.10" diff --git a/crates/ragfs/src/cache/wrapper.rs b/crates/ragfs/src/cache/wrapper.rs index b0d738b182..8a7518cdaf 100644 --- a/crates/ragfs/src/cache/wrapper.rs +++ b/crates/ragfs/src/cache/wrapper.rs @@ -9,7 +9,8 @@ use crate::core::filesystem::{ relative_match_file, }; use crate::core::{ - FileInfo, FileSystem, GrepMatch, GrepResult, MultiWriteWrappedFS, Result, TreeEntry, WriteFlag, + FileInfo, FileSystem, GlobPage, GrepMatch, GrepResult, MultiWriteWrappedFS, Result, + TreeEntry, WriteFlag, }; use async_trait::async_trait; use bytes::Bytes; @@ -1221,6 +1222,27 @@ impl FileSystem for CachedFileSystem { .tree_directory(path, show_hidden, node_limit, level_limit) .await } + + async fn glob_directory( + &self, + path: &str, + pattern: &str, + show_hidden: bool, + page_size: Option, + level_limit: Option, + continuation_token: Option, + ) -> Result { + self.backend + .glob_directory( + path, + pattern, + show_hidden, + page_size, + level_limit, + continuation_token, + ) + .await + } } fn normalize_path(path: &str) -> String { diff --git a/crates/ragfs/src/core/encryption_wrapper.rs b/crates/ragfs/src/core/encryption_wrapper.rs index 98841d663a..6b07186f57 100644 --- a/crates/ragfs/src/core/encryption_wrapper.rs +++ b/crates/ragfs/src/core/encryption_wrapper.rs @@ -20,7 +20,7 @@ use crate::shape::SHAPE_MANIFEST_PATH; use super::context::FsContextView; use super::errors::{Error, Result}; use super::filesystem::{compile_grep_regex, normalize_prefix_path, FileSystem}; -use super::types::{FileInfo, GrepResult, TreeEntry, WriteFlag}; +use super::types::{FileInfo, GlobPage, GrepResult, TreeEntry, WriteFlag}; const 
SYSTEM_ACCOUNT_ID: &str = "_system"; @@ -326,6 +326,31 @@ impl FileSystem for EncryptionWrappedFS { .collect()) } + async fn glob_directory( + &self, + path: &str, + pattern: &str, + show_hidden: bool, + page_size: Option, + level_limit: Option, + continuation_token: Option, + ) -> Result { + let mut page = self + .inner + .glob_directory( + path, + pattern, + show_hidden, + page_size, + level_limit, + continuation_token, + ) + .await?; + page.entries + .retain(|entry| !Self::is_shape_manifest_path(&entry.path)); + Ok(page) + } + async fn ensure_parent_dirs(&self, path: &str, mode: u32) -> Result<()> { self.inner.ensure_parent_dirs(path, mode).await } diff --git a/crates/ragfs/src/core/filesystem.rs b/crates/ragfs/src/core/filesystem.rs index d7fa57132c..92123bd608 100644 --- a/crates/ragfs/src/core/filesystem.rs +++ b/crates/ragfs/src/core/filesystem.rs @@ -9,7 +9,10 @@ use regex::Regex; use std::any::Any; use super::errors::{Error, Result}; -use super::types::{FileInfo, GrepResult, TreeEntry, WriteFlag}; +use super::glob::{ + compare_rel_paths, decode_offset_token, encode_offset_token, purepath_match, validate_pattern, +}; +use super::types::{FileInfo, GlobEntry, GlobPage, GrepResult, TreeEntry, WriteFlag}; /// Normalize a path for prefix comparisons. /// @@ -440,6 +443,65 @@ pub trait FileSystem: Send + Sync + Any { Ok(result) } + /// Return one page of flat glob results under `path`. + /// + /// The default implementation preserves the current Python behavior by + /// reusing `tree_directory()` and matching against the returned `rel_path` + /// values, then slicing matches with an opaque continuation token. 
+ async fn glob_directory( + &self, + path: &str, + pattern: &str, + show_hidden: bool, + page_size: Option, + level_limit: Option, + continuation_token: Option, + ) -> Result { + validate_pattern(pattern)?; + if matches!(page_size, Some(0)) { + return Err(Error::invalid_operation("page_size must be positive")); + } + + let entries = self + .tree_directory(path, show_hidden, None, level_limit) + .await?; + + let mut matched = Vec::new(); + for entry in entries { + if purepath_match(&entry.rel_path, pattern)? { + matched.push(GlobEntry { + path: entry.path, + rel_path: entry.rel_path, + name: entry.info.name, + is_dir: entry.info.is_dir, + }); + } + } + matched.sort_by(|left, right| compare_rel_paths(&left.rel_path, &right.rel_path)); + + let start = decode_offset_token( + continuation_token.as_deref(), + path, + pattern, + show_hidden, + level_limit, + )?; + if start > matched.len() { + return Err(Error::invalid_operation("continuation token out of range")); + } + let end = page_size + .map(|limit| start.saturating_add(limit)) + .unwrap_or(matched.len()) + .min(matched.len()); + let next_token = (end < matched.len()) + .then(|| encode_offset_token(end, path, pattern, show_hidden, level_limit)); + + Ok(GlobPage { + entries: matched[start..end].to_vec(), + next_token, + }) + } + /// Internal recursive helper for tree_directory. /// /// # Arguments @@ -1029,4 +1091,139 @@ mod tests { assert!(names.contains(&"secret.txt".to_string())); assert!(!names.contains(&".hidden_file".to_string())); } + + /// Test helper that calls `glob_directory` with a fixed `/root` query root. + /// + /// Args: + /// - `fs`: The `TreeFS` instance under test. + /// - `pattern`: The glob pattern to match. + /// - `page_size`: The requested page size. + /// - `continuation_token`: The pagination token for the next page. + /// + /// Returns: + /// - A `GlobPage` on success. In tests this helper uses `unwrap()`, so any + /// error fails the test immediately. 
+ async fn root_glob( + fs: &TreeFS, + pattern: &str, + page_size: Option, + continuation_token: Option, + ) -> crate::core::GlobPage { + fs.glob_directory("/root", pattern, false, page_size, None, continuation_token) + .await + .unwrap() + } + + /// Test helper that extracts each entry's `rel_path` from a `GlobPage`. + /// + /// Args: + /// - `page`: The glob page whose relative paths should be collected. + /// + /// Returns: + /// - A list of `rel_path` values in their original order, suitable for + /// result-content and ordering assertions. + fn glob_rel_paths(page: &crate::core::GlobPage) -> Vec { + page.entries + .iter() + .map(|entry| entry.rel_path.clone()) + .collect() + } + + #[tokio::test] + async fn test_glob_directory_matches_basename_suffix_semantics() { + let fs = TreeFS::default() + .with_dir_entries("/root", vec![("sub", true), ("top.md", false)]) + .with_dir_entries( + "/root/sub", + vec![("nested.md", false), ("nested.txt", false)], + ); + + let page = root_glob(&fs, "*.md", None, None).await; + + assert_eq!(glob_rel_paths(&page), vec!["sub/nested.md", "top.md"]); + assert!(page.next_token.is_none()); + } + + #[tokio::test] + async fn test_glob_directory_matches_path_suffix_segments() { + let fs = TreeFS::default() + .with_dir_entries("/root", vec![("a", true), ("x", true)]) + .with_dir_entries("/root/a", vec![("b", true)]) + .with_dir_entries("/root/a/b", vec![("c.md", false)]) + .with_dir_entries("/root/x", vec![("a", true)]) + .with_dir_entries("/root/x/a", vec![("b", true)]) + .with_dir_entries("/root/x/a/b", vec![("c.md", false)]); + + let page = root_glob(&fs, "a/**/*.md", None, None).await; + + assert_eq!(glob_rel_paths(&page), vec!["a/b/c.md", "x/a/b/c.md"]); + } + + #[tokio::test] + async fn test_glob_directory_paginates_with_opaque_offset_tokens() { + let fs = TreeFS::default().with_dir_entries( + "/root", + vec![("a.md", false), ("b.md", false), ("c.md", false)], + ); + + let first = root_glob(&fs, "*.md", Some(2), None).await; + 
assert_eq!(glob_rel_paths(&first), vec!["a.md", "b.md"]); + assert!(first.next_token.is_some()); + + let second = root_glob(&fs, "*.md", Some(2), first.next_token).await; + assert_eq!(glob_rel_paths(&second), vec!["c.md"]); + assert!(second.next_token.is_none()); + } + + #[tokio::test] + async fn test_glob_directory_rejects_token_from_different_query_scope() { + let fs = TreeFS::default().with_dir_entries( + "/root", + vec![("a.md", false), ("b.md", false), ("c.md", false)], + ); + + let first = root_glob(&fs, "*.md", Some(2), None).await; + let err = fs + .glob_directory("/root", "*.txt", false, Some(2), None, first.next_token) + .await + .unwrap_err(); + + assert!(matches!(err, Error::InvalidOperation(_))); + } + + #[tokio::test] + async fn test_glob_directory_empty_pattern_is_invalid() { + let fs = TreeFS::default().with_dir_entries("/root", vec![("a.md", false)]); + + let err = fs + .glob_directory("/root", "", false, None, None, None) + .await + .unwrap_err(); + + assert!(matches!(err, Error::InvalidOperation(_))); + } + + #[tokio::test] + async fn test_glob_directory_empty_pattern_is_invalid_for_empty_directory() { + let fs = TreeFS::default().with_dir_entries("/root", vec![]); + + let err = fs + .glob_directory("/root", "", false, None, None, None) + .await + .unwrap_err(); + + assert!(matches!(err, Error::InvalidOperation(_))); + } + + #[tokio::test] + async fn test_glob_directory_zero_page_size_is_invalid() { + let fs = TreeFS::default().with_dir_entries("/root", vec![("a.md", false)]); + + let err = fs + .glob_directory("/root", "*.md", false, Some(0), None, None) + .await + .unwrap_err(); + + assert!(matches!(err, Error::InvalidOperation(_))); + } } diff --git a/crates/ragfs/src/core/glob.rs b/crates/ragfs/src/core/glob.rs new file mode 100644 index 0000000000..13bea668f2 --- /dev/null +++ b/crates/ragfs/src/core/glob.rs @@ -0,0 +1,246 @@ +//! Glob helpers shared by the default trait implementation and backend +//! overrides. 
+ +use std::cmp::Ordering; + +use globset::GlobBuilder; +use sha2::{Digest, Sha256}; + +use crate::core::{Error, Result}; + +/// Return whether the relative path matches the same glob semantics currently +/// used by `pathlib.PurePath.match()`. +/// +/// The current Python implementation matches path *suffix* segments rather +/// than anchoring the pattern at the query root. For example: +/// +/// - `PurePath("foo/bar.rs").match("*.rs") == True` +/// - `PurePath("a/b/c.md").match("a/**/*.md") == True` +/// - `PurePath("foo").match("**/foo") == False` +/// +/// We preserve that contract here by: +/// +/// 1. Splitting both pattern and path into non-empty `/`-separated segments. +/// 2. When the pattern contains no `/`, matching it against the basename only. +/// 3. Otherwise, matching the pattern segments against the path's trailing +/// segments of equal length. +/// 4. Using `globset` only for *single-segment* matching, so `/` semantics stay +/// under our control. +pub fn purepath_match(rel_path: &str, pattern: &str) -> Result { + validate_pattern(pattern)?; + + let path_segments = split_segments(rel_path); + let pattern_segments = split_segments(pattern); + + if pattern_segments.is_empty() { + return Ok(false); + } + + if pattern_segments.len() == 1 { + let name = path_segments.last().map(String::as_str).unwrap_or(""); + return segment_matches(name, &pattern_segments[0]); + } + + if pattern_segments.len() > path_segments.len() { + return Ok(false); + } + + let offset = path_segments.len() - pattern_segments.len(); + for (path_segment, pattern_segment) in + path_segments[offset..].iter().zip(pattern_segments.iter()) + { + if !segment_matches(path_segment, pattern_segment)? { + return Ok(false); + } + } + Ok(true) +} + +/// Decode the opaque offset token used by the default implementations. 
+pub fn decode_offset_token( + token: Option<&str>, + path: &str, + pattern: &str, + show_hidden: bool, + level_limit: Option, +) -> Result { + match token { + None => Ok(0), + Some(raw) if raw.is_empty() => Err(Error::invalid_operation("empty continuation token")), + Some(raw) => { + let Some((scope, offset)) = raw.split_once(':') else { + return Err(Error::invalid_operation("invalid continuation token")); + }; + if scope != token_scope(path, pattern, show_hidden, level_limit) { + return Err(Error::invalid_operation( + "continuation token scope mismatch", + )); + } + offset + .parse::() + .map_err(|_| Error::invalid_operation("invalid continuation token")) + } + } +} + +/// Encode the opaque offset token used by the default implementations. +pub fn encode_offset_token( + offset: usize, + path: &str, + pattern: &str, + show_hidden: bool, + level_limit: Option, +) -> String { + format!( + "{}:{offset}", + token_scope(path, pattern, show_hidden, level_limit) + ) +} + +/// Validate whether a glob pattern is acceptable for matching. +/// +/// Separator-only patterns such as `"/"` are valid inputs, but they do not +/// match any relative path. +pub fn validate_pattern(pattern: &str) -> Result<()> { + if pattern.is_empty() { + return Err(Error::invalid_operation("empty glob pattern")); + } + + let mut saw_segment = false; + for segment in pattern.split('/') { + if segment.is_empty() { + continue; + } + saw_segment = true; + if segment != "." { + return Ok(()); + } + } + + if saw_segment { + return Err(Error::invalid_operation("empty glob pattern")); + } + + Ok(()) +} + +/// Compare two relative paths lexicographically by path component, ignoring +/// empty segments and `.` segments. 
+pub fn compare_rel_paths(left: &str, right: &str) -> Ordering { + split_segments(left).cmp(&split_segments(right)) +} + +fn split_segments(value: &str) -> Vec { + value + .split('/') + .filter(|segment| !segment.is_empty() && *segment != ".") + .map(ToString::to_string) + .collect() +} + +fn segment_matches(value: &str, pattern: &str) -> Result { + let glob = match GlobBuilder::new(pattern).literal_separator(true).build() { + Ok(glob) => glob, + Err(_) => return Ok(false), + }; + Ok(glob.compile_matcher().is_match(value)) +} + +fn token_scope(path: &str, pattern: &str, show_hidden: bool, level_limit: Option) -> String { + let mut hasher = Sha256::new(); + hasher.update(path.as_bytes()); + hasher.update([0]); + hasher.update(pattern.as_bytes()); + hasher.update([0, show_hidden as u8, 0]); + hasher.update(level_limit.unwrap_or(usize::MAX).to_string().as_bytes()); + format!("{:x}", hasher.finalize()) +} + +#[cfg(test)] +mod tests { + use std::cmp::Ordering; + + use super::{ + compare_rel_paths, decode_offset_token, encode_offset_token, purepath_match, + validate_pattern, + }; + + #[test] + fn test_purepath_match_basename_behavior() { + assert!(purepath_match("foo/bar.rs", "*.rs").unwrap()); + assert!(purepath_match("sub/resource_a", "resource_*").unwrap()); + assert!(!purepath_match("foo/bar.rs", "*.md").unwrap()); + } + + #[test] + fn test_purepath_match_suffix_segments_behavior() { + assert!(purepath_match("a/b/c.md", "a/**/*.md").unwrap()); + assert!(purepath_match("a/b/c.md", "*/c.md").unwrap()); + assert!(purepath_match("foo/bar", "foo/**").unwrap()); + assert!(!purepath_match("foo/bar/baz", "foo/**").unwrap()); + assert!(!purepath_match("foo", "**/foo").unwrap()); + assert!(!purepath_match("a.md", "**/*.md").unwrap()); + assert!(purepath_match("x/a.md", "**/*.md").unwrap()); + } + + #[test] + fn test_purepath_match_empty_pattern_rejected() { + assert!(purepath_match("a", "").is_err()); + } + + #[test] + fn test_purepath_match_root_only_pattern_returns_false() { 
+ assert!(!purepath_match("a", "/").unwrap()); + assert!(!purepath_match("a", "///").unwrap()); + } + + #[test] + fn test_purepath_match_dot_only_pattern_still_rejected() { + assert!(purepath_match("a", ".").is_err()); + assert!(purepath_match("a", "./").is_err()); + assert!(purepath_match("a", "././").is_err()); + } + + #[test] + fn test_purepath_match_invalid_segment_returns_false() { + assert!(!purepath_match("foo", "[").unwrap()); + assert!(!purepath_match("foo", "foo[bar").unwrap()); + assert!(!purepath_match("foo", "[]").unwrap()); + } + + #[test] + fn test_purepath_match_normalizes_dot_segments() { + assert!(purepath_match("a/./b.md", "a/b.md").unwrap()); + assert!(purepath_match("a/b.md", "./b.md").unwrap()); + assert!(purepath_match("./a/b.md", "a/b.md").unwrap()); + } + + #[test] + fn test_offset_token_rejects_scope_mismatch() { + let token = encode_offset_token(2, "/root", "*.md", false, None); + + assert_eq!( + decode_offset_token(Some(&token), "/root", "*.md", false, None).unwrap(), + 2 + ); + assert!(decode_offset_token(Some(&token), "/root", "*.txt", false, None).is_err()); + } + + #[test] + fn test_validate_pattern_distinguishes_root_only_and_dot_only() { + validate_pattern("/").unwrap(); + validate_pattern("///").unwrap(); + assert!(validate_pattern(".").is_err()); + assert!(validate_pattern("./").is_err()); + validate_pattern("a/./b").unwrap(); + } + + #[test] + fn test_compare_rel_paths_uses_path_component_order() { + assert_eq!(compare_rel_paths("a", "a"), Ordering::Equal); + assert_eq!(compare_rel_paths("a", "a/b"), Ordering::Less); + assert_eq!(compare_rel_paths("a/b/c.txt", "a/b.txt"), Ordering::Less); + assert_eq!(compare_rel_paths("a//b", "a/b"), Ordering::Equal); + assert_eq!(compare_rel_paths("./a", "a"), Ordering::Equal); + } +} diff --git a/crates/ragfs/src/core/mod.rs b/crates/ragfs/src/core/mod.rs index b7469ce8b6..6bc43e9397 100644 --- a/crates/ragfs/src/core/mod.rs +++ b/crates/ragfs/src/core/mod.rs @@ -12,6 +12,7 @@ pub mod 
context; pub mod encryption_wrapper; pub mod errors; pub mod filesystem; +pub mod glob; pub mod mountable; pub mod multibackend_wrapper; pub mod plugin; @@ -36,7 +37,7 @@ pub use stats::{FilesystemStats, FsOperation, OperationStats, OperationTimer, St pub use stats_wrapper::StatsWrappedFS; pub use types::{ BackendItemConfig, BackendRole, BackendSyncState, BackendsConfig, ConfigParameter, ConfigValue, - EncryptionConfig, FileInfo, GrepMatch, GrepResult, OperationItemConfig, PluginConfig, - RedirectEntry, RedirectMeta, RedirectPolicy, SyncLogEntry, SyncLogMeta, SyncOp, SyncType, - TreeEntry, WriteFlag, + EncryptionConfig, FileInfo, GlobEntry, GlobPage, GrepMatch, GrepResult, + OperationItemConfig, PluginConfig, RedirectEntry, RedirectMeta, RedirectPolicy, + SyncLogEntry, SyncLogMeta, SyncOp, SyncType, TreeEntry, WriteFlag, }; diff --git a/crates/ragfs/src/core/mountable.rs b/crates/ragfs/src/core/mountable.rs index 15015279b6..9f9438e925 100644 --- a/crates/ragfs/src/core/mountable.rs +++ b/crates/ragfs/src/core/mountable.rs @@ -26,7 +26,9 @@ use super::multibackend_wrapper::MultiWriteWrappedFS; use super::plugin::ServicePlugin; use super::stats::{FilesystemStats, StatsCollector}; use super::stats_wrapper::StatsWrappedFS; -use super::types::{BackendsConfig, FileInfo, GrepResult, PluginConfig, TreeEntry, WriteFlag}; +use super::types::{ + BackendsConfig, FileInfo, GlobPage, GrepResult, PluginConfig, TreeEntry, WriteFlag, +}; #[cfg(feature = "cache")] use crate::cache::{ CacheNamespace, CachePolicy, CacheProvider, CacheTraversalMode, CachedFileSystem, @@ -871,6 +873,56 @@ impl FileSystem for MountableFS { Ok(entries) } + + async fn glob_directory( + &self, + path: &str, + pattern: &str, + show_hidden: bool, + page_size: Option, + level_limit: Option, + continuation_token: Option, + ) -> Result { + let (mount_info, rel_path) = self.find_mount(path).await?; + + let mount_prefix = if mount_info.path == "/" { + String::new() + } else { + mount_info.path.clone() + }; + + 
let mut page = mount_info + .fs + .glob_directory( + &rel_path, + pattern, + show_hidden, + page_size, + level_limit, + continuation_token, + ) + .await?; + + for entry in &mut page.entries { + if !mount_prefix.is_empty() { + entry.path = if entry.path == "/" { + mount_prefix.clone() + } else { + format!("{}{}", mount_prefix, entry.path) + }; + } + } + + page.entries.retain(|entry| { + entry + .path + .rsplit('/') + .next() + .map_or(true, |name| name != PATH_LOCK_FILE) + }); + + Ok(page) + } } #[cfg(test)] diff --git a/crates/ragfs/src/core/multibackend_wrapper.rs b/crates/ragfs/src/core/multibackend_wrapper.rs index dab454415d..6e84c37962 100644 --- a/crates/ragfs/src/core/multibackend_wrapper.rs +++ b/crates/ragfs/src/core/multibackend_wrapper.rs @@ -22,8 +22,8 @@ use super::context::{FsContext, FS_CTX}; use super::errors::{Error, Result}; use super::filesystem::{normalize_prefix_path, relative_match_file, FileSystem}; use super::types::{ - BackendRole, BackendSyncState, FileInfo, GrepResult, OperationItemConfig, RedirectEntry, - RedirectPolicy, SyncLogEntry, SyncOp, SyncType, TreeEntry, WriteFlag, + BackendRole, BackendSyncState, FileInfo, GrepResult, OperationItemConfig, + RedirectEntry, RedirectPolicy, SyncLogEntry, SyncOp, SyncType, TreeEntry, WriteFlag, }; use crate::multibackend::meta::{ current_required_ctx, file_name, parent_dir, DefaultFsContextResolver, FsContextResolver, diff --git a/crates/ragfs/src/core/stats.rs b/crates/ragfs/src/core/stats.rs index 0f36b52a96..0a626c7c59 100644 --- a/crates/ragfs/src/core/stats.rs +++ b/crates/ragfs/src/core/stats.rs @@ -42,6 +42,8 @@ pub enum FsOperation { EnsureParentDirs, /// Tree directory operation TreeDir, + /// Glob directory operation + GlobDir, } impl FsOperation { @@ -63,6 +65,7 @@ impl FsOperation { FsOperation::Grep, FsOperation::EnsureParentDirs, FsOperation::TreeDir, + FsOperation::GlobDir, ] } @@ -84,6 +87,7 @@ impl FsOperation { FsOperation::Grep => "grep", FsOperation::EnsureParentDirs => 
"ensure_parent_dirs", FsOperation::TreeDir => "tree_dir", + FsOperation::GlobDir => "glob_dir", } } } diff --git a/crates/ragfs/src/core/stats_wrapper.rs b/crates/ragfs/src/core/stats_wrapper.rs index 0a32380ea9..56072f3074 100644 --- a/crates/ragfs/src/core/stats_wrapper.rs +++ b/crates/ragfs/src/core/stats_wrapper.rs @@ -7,8 +7,8 @@ use async_trait::async_trait; use std::sync::Arc; use super::{ - FileInfo, FileSystem, FsOperation, GrepResult, OperationTimer, Result, StatsCollector, - TreeEntry, WriteFlag, + FileInfo, FileSystem, FsOperation, GlobPage, GrepResult, OperationTimer, Result, + StatsCollector, TreeEntry, WriteFlag, }; /// A wrapper around FileSystem that automatically collects operation statistics @@ -197,4 +197,29 @@ impl FileSystem for StatsWrappedFS { timer.finish().await; result } + + async fn glob_directory( + &self, + path: &str, + pattern: &str, + show_hidden: bool, + page_size: Option, + level_limit: Option, + continuation_token: Option, + ) -> Result { + let timer = OperationTimer::start(FsOperation::GlobDir, Arc::clone(&self.stats)); + let result = self + .inner + .glob_directory( + path, + pattern, + show_hidden, + page_size, + level_limit, + continuation_token, + ) + .await; + timer.finish().await; + result + } } diff --git a/crates/ragfs/src/core/types.rs b/crates/ragfs/src/core/types.rs index eb98a5be9b..767091b0bb 100644 --- a/crates/ragfs/src/core/types.rs +++ b/crates/ragfs/src/core/types.rs @@ -58,6 +58,37 @@ pub struct TreeEntry { pub extra: HashMap, } +/// Flat glob match entry. +/// +/// Represents one path matched by `glob_directory`, preserving enough metadata +/// for Python-side visibility and URI alias handling without reconstructing a +/// full `TreeEntry`. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GlobEntry { + /// Plugin-root-relative absolute path, matching the `TreeEntry.path` + /// contract after mount rewriting. + pub path: String, + + /// Path relative to the glob query root. 
+ pub rel_path: String, + + /// Final path component. + pub name: String, + + /// Whether the matched entry is a directory. + pub is_dir: bool, +} + +/// One page of glob results. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GlobPage { + /// Matched entries for this page. + pub entries: Vec, + + /// Opaque continuation token for the next page. + pub next_token: Option, +} + impl GrepResult { /// Create a new empty GrepResult pub fn new() -> Self { diff --git a/crates/ragfs/src/lib.rs b/crates/ragfs/src/lib.rs index 47106ddc6b..d07c167b97 100644 --- a/crates/ragfs/src/lib.rs +++ b/crates/ragfs/src/lib.rs @@ -40,8 +40,8 @@ pub mod shape; // Re-export core types for convenience pub use core::{ ConfigParameter, ConfigValue, Error, FileInfo, FileSystem, FilesystemStats, FsOperation, - HealthStatus, MountableFS, OperationStats, OperationTimer, PluginConfig, PluginRegistry, - Result, ServicePlugin, StatsCollector, StatsWrappedFS, WriteFlag, + GlobEntry, GlobPage, HealthStatus, MountableFS, OperationStats, OperationTimer, PluginConfig, + PluginRegistry, Result, ServicePlugin, StatsCollector, StatsWrappedFS, WriteFlag, }; /// Version of RAGFS diff --git a/crates/ragfs/src/plugins/s3fs/client.rs b/crates/ragfs/src/plugins/s3fs/client.rs index c6b405bc0c..dddda45930 100644 --- a/crates/ragfs/src/plugins/s3fs/client.rs +++ b/crates/ragfs/src/plugins/s3fs/client.rs @@ -252,7 +252,7 @@ impl DirectoryMarkerMode { } /// Object metadata from HeadObject -#[derive(Debug, Clone)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ObjectMeta { /// Object key pub key: String, @@ -264,6 +264,13 @@ pub struct ObjectMeta { pub is_dir_marker: bool, } +/// One flat `ListObjectsV2` page used by S3-backed tree/glob traversal. 
+#[derive(Debug, Clone)] +pub struct ListTreePage { + pub objects: Vec, + pub next_continuation_token: Option, +} + /// Result of a ListObjects operation #[derive(Debug)] pub struct ListResult { @@ -800,6 +807,59 @@ impl S3Client { Ok(objects) } + /// List one flat object page under a prefix (no delimiter). + pub async fn list_tree_objects_page( + &self, + prefix: &str, + continuation_token: Option<&str>, + max_keys: usize, + ) -> Result { + let mut req = self + .client + .list_objects_v2() + .bucket(&self.bucket) + .prefix(prefix) + .max_keys(max_keys.min(1000) as i32); + + if let Some(token) = continuation_token { + req = req.continuation_token(token); + } + + let resp = req.send().await.map_err(|e| { + format_sdk_s3_error( + "ListObjectsV2", + &format!("bucket={} prefix={prefix}", self.bucket), + &e, + ) + })?; + + let mut objects = Vec::new(); + for obj in resp.contents() { + let key = obj.key().unwrap_or(""); + if key == prefix { + continue; + } + + let size = obj.size.unwrap_or(0); + let last_modified = obj + .last_modified() + .map(aws_datetime_to_systemtime) + .unwrap_or(UNIX_EPOCH); + + objects.push(ObjectMeta { + key: key.to_string(), + size, + last_modified, + is_dir_marker: key.ends_with('/'), + }); + } + + Ok(ListTreePage { + objects, + next_continuation_token: resp.next_continuation_token().map(|s| s.to_string()), + }) + } + /// Copy an object pub async fn copy_object(&self, src_key: &str, dst_key: &str) -> Result<()> { let copy_source = format!("{}/{}", self.bucket, src_key); diff --git a/crates/ragfs/src/plugins/s3fs/mod.rs b/crates/ragfs/src/plugins/s3fs/mod.rs index 0d3646687a..0c35e39870 100644 --- a/crates/ragfs/src/plugins/s3fs/mod.rs +++ b/crates/ragfs/src/plugins/s3fs/mod.rs @@ -18,21 +18,23 @@ pub mod client; mod tree; use async_trait::async_trait; +use sha2::{Digest, Sha256}; use std::sync::{Arc, Mutex}; use std::time::SystemTime; use cache::{S3ListDirCache, S3StatCache}; -use client::S3Client; +use client::{ListTreePage, S3Client}; use 
futures::stream::{self, StreamExt}; use regex::Regex; use std::sync::atomic::{AtomicUsize, Ordering}; use crate::core::filesystem::{relative_depth, relative_match_file}; +use crate::core::glob::{purepath_match, validate_pattern}; use crate::core::{ - ConfigParameter, Error, FileInfo, FileSystem, GrepMatch, GrepResult, PluginConfig, Result, - ServicePlugin, TreeEntry, WriteFlag, + ConfigParameter, Error, FileInfo, FileSystem, GlobEntry, GlobPage, GrepMatch, GrepResult, + PluginConfig, Result, ServicePlugin, TreeEntry, WriteFlag, }; -use tree::build_tree_entries_from_flat_listing; +use tree::{build_tree_entries_from_flat_listing, build_tree_entries_from_listing_page, rel_parts}; /// Check whether `path` is under `exclude_path` (including itself). fn s3_is_excluded_path(path: &str, exclude_path: &str) -> bool { @@ -59,6 +61,15 @@ struct S3ChunkReader<'a> { key: &'a str, } +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +struct S3GlobToken { + scope: String, + scan_token: Option, + page_entries: Vec, + page_offset: usize, + last_rel_parts: Vec, +} + #[async_trait] impl ChunkReader for S3ChunkReader<'_> { async fn read_chunk(&mut self, offset: u64, size: u64) -> Result> { @@ -237,6 +248,95 @@ impl S3FileSystem { result } + fn decode_glob_token( + token: Option<&str>, + path: &str, + pattern: &str, + show_hidden: bool, + level_limit: Option, + ) -> Result { + let scope = Self::glob_token_scope(path, pattern, show_hidden, level_limit); + match token { + None => Ok(S3GlobToken { + scope, + scan_token: None, + page_entries: Vec::new(), + page_offset: 0, + last_rel_parts: Vec::new(), + }), + Some(raw) if raw.is_empty() => Err(Error::invalid_operation("empty continuation token")), + Some(raw) => { + let state: S3GlobToken = serde_json::from_str(raw) + .map_err(|_| Error::invalid_operation("invalid continuation token"))?; + if state.scope != scope { + return Err(Error::invalid_operation("continuation token scope mismatch")); + } + Ok(state) + } + } + } + + fn 
encode_glob_token(state: &S3GlobToken) -> Result { + // ponytail: the token carries the remaining entries from the current + // page directly to avoid extra state storage; the ceiling is bounded + // by `page_size` and the S3 single-page limit of 1000 keys. + serde_json::to_string(state).map_err(|err| Error::Serialization(err.to_string())) + } + + fn glob_token_scope( + path: &str, + pattern: &str, + show_hidden: bool, + level_limit: Option, + ) -> String { + let mut hasher = Sha256::new(); + hasher.update(path.as_bytes()); + hasher.update([0]); + hasher.update(pattern.as_bytes()); + hasher.update([0, show_hidden as u8, 0]); + hasher.update(level_limit.unwrap_or(usize::MAX).to_string().as_bytes()); + format!("{:x}", hasher.finalize()) + } + + fn glob_entries_from_listing_page( + &self, + query_root: &str, + page: &ListTreePage, + pattern: &str, + show_hidden: bool, + level_limit: Option, + last_rel_parts: &[String], + ) -> Result<(Vec, Vec)> { + let ordered = build_tree_entries_from_listing_page( + query_root, + &page.objects, + show_hidden, + level_limit, + last_rel_parts, + |key| self.client.strip_prefix(key), + )?; + + let mut matched = Vec::new(); + let next_last_rel_parts = ordered + .last() + .map(|entry| rel_parts(&entry.rel_path)) + .unwrap_or_else(|| last_rel_parts.to_vec()); + + for entry in ordered { + if purepath_match(&entry.rel_path, pattern)? 
{ + matched.push(GlobEntry { + path: entry.path, + rel_path: entry.rel_path, + name: entry.info.name, + is_dir: entry.info.is_dir, + }); + } + } + + Ok((matched, next_last_rel_parts)) + } + + /// Get file name from path fn file_name(path: &str) -> String { if path == "/" { @@ -833,6 +933,98 @@ impl FileSystem for S3FileSystem { Ok(result) } + + async fn glob_directory( + &self, + path: &str, + pattern: &str, + show_hidden: bool, + page_size: Option, + level_limit: Option, + continuation_token: Option, + ) -> Result { + validate_pattern(pattern)?; + if matches!(page_size, Some(0)) { + return Err(Error::invalid_operation("page_size must be positive")); + } + + let normalized = Self::normalize_path(path); + let info = self.stat(&normalized).await?; + if !info.is_dir { + return Err(Error::NotADirectory(normalized)); + } + + let prefix = if normalized == "/" { + self.client.build_key("") + } else { + format!("{}/", self.client.build_key(&normalized)) + }; + let mut state = Self::decode_glob_token( + continuation_token.as_deref(), + path, + pattern, + show_hidden, + level_limit, + )?; + let target_limit = page_size.unwrap_or(usize::MAX); + let fetch_size = 1000; + let mut matched = Vec::new(); + + loop { + while state.page_offset < state.page_entries.len() { + let entry = state.page_entries[state.page_offset].clone(); + state.page_offset += 1; + matched.push(entry); + if matched.len() >= target_limit { + let next_token = if state.page_offset < state.page_entries.len() + || state.scan_token.is_some() + { + Some(Self::encode_glob_token(&state)?) 
+ } else { + None + }; + return Ok(GlobPage { + entries: matched, + next_token, + }); + } + } + + state.page_entries.clear(); + state.page_offset = 0; + + if state.scan_token.is_none() && !state.last_rel_parts.is_empty() { + break; + } + + let page = self + .client + .list_tree_objects_page(&prefix, state.scan_token.as_deref(), fetch_size) + .await?; + state.scan_token = page.next_continuation_token.clone(); + let (entries, next_last_rel_parts) = self.glob_entries_from_listing_page( + &normalized, + &page, + pattern, + show_hidden, + level_limit, + &state.last_rel_parts, + )?; + state.last_rel_parts = next_last_rel_parts; + if entries.is_empty() { + if state.scan_token.is_none() { + break; + } + continue; + } + state.page_entries = entries; + } + + Ok(GlobPage { + entries: matched, + next_token: None, + }) + } } /// S3FS Plugin @@ -1397,7 +1589,7 @@ mod tests { .await } - // ── Case 3: 正则无效 ── + // ── Case 3: invalid regex ── #[test] fn test_grep_invalid_regex() { @@ -1411,7 +1603,7 @@ mod tests { ); } - // ── Case 17: case_insensitive 正则构建 ── + // ── Case 17: case-insensitive regex construction ── #[test] fn test_case_insensitive_regex() { @@ -1425,7 +1617,8 @@ mod tests { assert!(re2.is_match("world"), "uppercase pattern matches lowercase"); } - // ── Case 9: 文件大小恰好为 CHUNK_SIZE(单 chunk,is_last 靠 offset+size>=file_size) ── + // ── Case 9: file size exactly equals CHUNK_SIZE (single chunk, with + // `is_last` determined by `offset + size >= file_size`) ── #[tokio::test] async fn test_grep_stream_exact_chunk_single() { @@ -1441,7 +1634,8 @@ mod tests { assert_eq!(matches[1].content, "world"); } - // ── Case 10: 最后 chunk 恰好等于 CHUNK_SIZE(多 chunk 场景) ── + // ── Case 10: last chunk size exactly equals CHUNK_SIZE (multi-chunk + // scenario) ── #[tokio::test] async fn test_grep_stream_last_chunk_exact_size() { @@ -1452,7 +1646,7 @@ mod tests { assert_eq!(matches[3].content, "line4"); } - // ── Case 11: 跨 chunk 边界行的拼接 ── + // ── Case 11: line stitching across chunk 
boundaries ── #[tokio::test] async fn test_grep_stream_cross_chunk_line() { @@ -1464,7 +1658,7 @@ mod tests { assert_eq!(matches[0].content, "lo wo", "stitched across boundary"); } - // ── Case 12: 连续多 chunk 无换行符 ── + // ── Case 12: multiple consecutive chunks with no newline ── #[tokio::test] async fn test_grep_stream_multi_chunk_no_newline() { @@ -1480,7 +1674,7 @@ mod tests { ); } - // ── Case 13: 超长行 > GREP_MAX_PARTIAL_SIZE ── + // ── Case 13: oversized line > GREP_MAX_PARTIAL_SIZE ── #[tokio::test] async fn test_grep_stream_line_exceeds_max_partial() { @@ -1493,7 +1687,7 @@ mod tests { ); } - // ── Case 14: 二进制内容(含无效 UTF-8) ── + // ── Case 14: binary content (including invalid UTF-8) ── #[tokio::test] async fn test_grep_stream_binary_content() { diff --git a/crates/ragfs/src/plugins/s3fs/tree.rs b/crates/ragfs/src/plugins/s3fs/tree.rs index 9dc8bec851..2d0237a955 100644 --- a/crates/ragfs/src/plugins/s3fs/tree.rs +++ b/crates/ragfs/src/plugins/s3fs/tree.rs @@ -135,6 +135,52 @@ where Ok(result) } +/// Build one ordered page of `TreeEntry` values from a single S3 listing page. +/// +/// Compared with `build_tree_entries_from_flat_listing`, this variant drops the +/// leading ancestor entries that were already emitted by a previous listing +/// page, so callers can stitch paginated S3 scans into one stable DFS stream. +pub(super) fn build_tree_entries_from_listing_page( + query_root: &str, + objects: &[client::ObjectMeta], + show_hidden: bool, + level_limit: Option, + prev_rel_parts: &[String], + key_to_path: F, +) -> Result> +where + F: Fn(&str) -> String, +{ + let mut entries = + build_tree_entries_from_flat_listing(query_root, objects, show_hidden, level_limit, key_to_path)?; + if prev_rel_parts.is_empty() || entries.is_empty() { + return Ok(entries); + } + + let mut skip = 0usize; + while skip < entries.len() { + let parts = rel_parts(&entries[skip].rel_path); + if parts.len() <= prev_rel_parts.len() && prev_rel_parts[..parts.len()] == parts[..] 
{ + skip += 1; + continue; + } + break; + } + + if skip > 0 { + entries.drain(..skip); + } + Ok(entries) +} + +pub(super) fn rel_parts(rel_path: &str) -> Vec { + rel_path + .split('/') + .filter(|segment| !segment.is_empty() && *segment != ".") + .map(ToString::to_string) + .collect() +} + #[cfg(test)] mod tests { use super::*; @@ -322,4 +368,57 @@ mod tests { // (`/tenant_x`, `/tenant_x/resources`) and not the query root itself. assert_eq!(paths, vec!["/tenant_x/resources/res_1/res_1.md"]); } + + #[test] + fn test_build_tree_entries_page_drops_cross_page_duplicate_ancestors() { + let first_page = vec![make_meta("a/b/first.txt", 100, false)]; + let first = build_tree_entries_from_listing_page( + "/", + &first_page, + false, + None, + &[], + identity_key_to_path, + ) + .unwrap(); + let last_rel_parts = first + .last() + .unwrap() + .rel_path + .split('/') + .map(str::to_string) + .collect::>(); + + let second_page = vec![make_meta("a/b/second.txt", 100, false)]; + let second = build_tree_entries_from_listing_page( + "/", + &second_page, + false, + None, + &last_rel_parts, + identity_key_to_path, + ) + .unwrap(); + + let paths: Vec<&str> = second.iter().map(|e| e.path.as_str()).collect(); + assert_eq!(paths, vec!["/a/b/second.txt"]); + } + + #[test] + fn test_build_tree_entries_page_keeps_diverging_suffix_after_common_prefix() { + let prev_rel_parts = vec!["a".to_string(), "b".to_string(), "first.txt".to_string()]; + let page = vec![make_meta("a/c/second.txt", 100, false)]; + let result = build_tree_entries_from_listing_page( + "/", + &page, + false, + None, + &prev_rel_parts, + identity_key_to_path, + ) + .unwrap(); + + let paths: Vec<&str> = result.iter().map(|e| e.path.as_str()).collect(); + assert_eq!(paths, vec!["/a/c", "/a/c/second.txt"]); + } } diff --git a/openviking/pyagfs/async_client.py b/openviking/pyagfs/async_client.py index 09e57d43f7..9314799516 100644 --- a/openviking/pyagfs/async_client.py +++ b/openviking/pyagfs/async_client.py @@ -224,6 +224,28 @@ 
    async def glob_directory(
        self,
        path: str,
        pattern: str,
        show_hidden: bool = False,
        page_size: int | None = None,
        level_limit: int | None = None,
        continuation_token: str | None = None,
        *,
        fs_ctx: Dict[str, str] | None = None,
    ) -> Dict[str, Any]:
        """Request one page of glob results through the ``glob_directory`` operation.

        All arguments are forwarded unchanged to ``self.run``; the filesystem
        context passed as ``ctx`` is resolved with
        ``_fs_ctx_or_default(path, fs_ctx)``.

        Args:
            path: Root path of the traversal.
            pattern: Glob pattern to match.
            show_hidden: Whether hidden entries are included.
            page_size: Maximum number of entries in the returned page.
            level_limit: Maximum traversal depth, or ``None``.
            continuation_token: Token from a previous page, or ``None`` for the
                first page.
            fs_ctx: Optional explicit filesystem context.

        Returns:
            Whatever ``self.run`` returns for this operation. NOTE(review): the
            native binding documents a dict with ``entries`` and ``next_token``
            keys; confirm against the binding.
        """
        return await self.run(
            "glob_directory",
            path,
            pattern,
            show_hidden=show_hidden,
            page_size=page_size,
            level_limit=level_limit,
            continuation_token=continuation_token,
            ctx=_fs_ctx_or_default(path, fs_ctx),
        )
ctx=ctx) + path: Optional[str] = None + for candidate_path in self._read_paths(uri, ctx=ctx): + if not await self._read_path_visible(uri, candidate_path, primary_path, real_ctx): + continue + if await self._agfs_path_exists(candidate_path): + path = candidate_path + break + if path is None: + if self._is_legacy_session_root_uri(uri): + return {"matches": [], "count": 0} + raise NotFoundError(uri, "directory") + + page_size = self._glob_page_size(node_limit) + continuation_token: Optional[str] = None matches = [] - for entry in entries: - rel_path = entry.get("rel_path", "") - if PurePath(rel_path).match(pattern): - matches.append(f"{base_uri}/{rel_path}") - # Now apply node limit to the filtered matches - if node_limit is not None and node_limit > 0: - matches = matches[:node_limit] + while True: + page = await self._async_agfs.glob_directory( + path, + pattern, + show_hidden=False, + page_size=page_size, + level_limit=None, + continuation_token=continuation_token, + ) + + for entry in page.get("entries", []): + if node_limit is not None and node_limit > 0 and len(matches) >= node_limit: + return {"matches": matches, "count": len(matches)} + if not self._is_path_entry_visible( + entry["path"], + entry.get("name") or entry["path"].rsplit("/", 1)[-1], + path, + real_ctx, + ): + continue + if not await self._read_path_visible(uri, entry["path"], primary_path, real_ctx): + continue + rel_path = entry.get("rel_path", "") + # Re-check with Python to keep the existing PurePath semantics as the final oracle. 
+ if not PurePath(rel_path).match(pattern): + continue + entry_uri = self._alias_uri_for_path( + request_uri=uri, + base_path=path, + entry_path=entry["path"], + ctx=ctx, + ) + matches.append(entry_uri) + + if node_limit is not None and node_limit > 0 and len(matches) >= node_limit: + return {"matches": matches, "count": len(matches)} + continuation_token = page.get("next_token") + if not continuation_token: + break return {"matches": matches, "count": len(matches)} async def _batch_fetch_abstracts( @@ -1664,25 +1712,15 @@ def _ancestor_is_filtered(self, entry_path: str, base_path: str) -> bool: return True return False - def _is_tree_entry_visible( - self, entry: Dict[str, Any], base_path: str, ctx: RequestContext + def _is_path_entry_visible( + self, entry_path: str, name: str, base_path: str, ctx: RequestContext ) -> bool: - """Check visibility for a single TreeEntry returned by Rust tree_directory. - - Applies three layers of filtering: - 1. Ancestor chain — if any ancestor directory would be filtered by _ls_entries, - all descendants are invisible. - 2. Self — the entry's own name must pass _ls_entries at its parent level. - 3. ACL — the entry must be accessible by the requesting context. 
- """ - entry_path = entry["path"] - + """Check visibility for one flattened path entry returned by Rust.""" if self._ancestor_is_filtered(entry_path, base_path): return False entry_parts = [p for p in entry_path.strip("/").split("/") if p] if entry_parts: - name = entry_parts[-1] parent_parts = entry_parts[:-1] parent_path = "/" + "/".join(parent_parts) if parent_parts else "/" if not self._is_name_visible_at_path(name, parent_path): @@ -1694,10 +1732,26 @@ def _is_tree_entry_visible( return True + def _is_tree_entry_visible( + self, entry: Dict[str, Any], base_path: str, ctx: RequestContext + ) -> bool: + """Check visibility for a single TreeEntry returned by Rust tree_directory.""" + entry_path = entry["path"] + entry_info = entry.get("info", {}) + name = entry_info.get("name") or entry_path.rstrip("/").rsplit("/", 1)[-1] + return self._is_path_entry_visible(entry_path, name, base_path, ctx) + # Over-fetch multiplier for bounded tree traversal. When a node_limit is # set, we push down node_limit * this factor as the raw-node limit to Rust, # leaving headroom for ACL/internal-name filtering before re-fetching. 
_TREE_OVERFETCH_FACTOR = 4 + _GLOB_PAGE_SIZE_DEFAULT = 1024 + + def _glob_page_size(self, node_limit: Optional[int]) -> int: + """Return the backend page size used by glob_directory.""" + if node_limit is None or node_limit <= 0: + return self._GLOB_PAGE_SIZE_DEFAULT + return node_limit async def _iter_visible_tree_entries( self, @@ -1922,14 +1976,20 @@ def _alias_uri_for_path( entry_path: str, ctx: Optional[RequestContext], ) -> str: - normalized_request, request_parts = self._normalized_uri_parts(request_uri) - if not request_parts or request_parts[0] not in {"agent", "session"}: - return self._path_to_uri(entry_path, ctx=ctx) base = base_path.rstrip("/") + normalized_request, _request_parts = self._normalized_uri_parts(request_uri) + request_root = ( + normalized_request + if normalized_request == "viking://" + else normalized_request.rstrip("/") + ) rel_path = entry_path[len(base) :].strip("/") if entry_path.startswith(base) else "" if not rel_path: - return normalized_request.rstrip("/") - return f"{normalized_request.rstrip('/')}/{rel_path}" + return request_root + if entry_path.startswith(base): + separator = "" if request_root.endswith("://") else "/" + return f"{request_root}{separator}{rel_path}" + return self._path_to_uri(entry_path, ctx=ctx) async def _agfs_path_exists(self, path: str) -> bool: try: diff --git a/tests/storage/test_viking_fs_glob.py b/tests/storage/test_viking_fs_glob.py new file mode 100644 index 0000000000..1fb6ee31ec --- /dev/null +++ b/tests/storage/test_viking_fs_glob.py @@ -0,0 +1,288 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
+# SPDX-License-Identifier: AGPL-3.0 + +import pytest + +from openviking.server.identity import RequestContext, Role +from openviking.storage.viking_fs import VikingFS +from openviking_cli.exceptions import InvalidArgumentError +from openviking_cli.session.user_id import UserIdentifier + + +class _DummyAgfs: + pass + + +def _default_ctx() -> RequestContext: + return RequestContext(user=UserIdentifier.the_default_user(), role=Role.ROOT) + + +@pytest.fixture +def fs(monkeypatch): + viking_fs = VikingFS(agfs=_DummyAgfs()) + monkeypatch.setattr(viking_fs, "_ctx_or_default", lambda _ctx=None: _default_ctx()) + monkeypatch.setattr( + viking_fs, "_uri_to_path", lambda _uri, **_kwargs: "/local/test_account/resources" + ) + monkeypatch.setattr( + viking_fs, "_read_paths", lambda _uri, **_kwargs: ["/local/test_account/resources"] + ) + monkeypatch.setattr(viking_fs, "_agfs_path_exists", lambda _path: _async_true()) + monkeypatch.setattr(viking_fs, "_read_path_visible", lambda *_args, **_kwargs: _async_true()) + monkeypatch.setattr( + viking_fs, + "_path_to_uri", + lambda path, **_kwargs: path.replace("/local/test_account/", "viking://"), + ) + monkeypatch.setattr(viking_fs, "_is_accessible", lambda _uri, _ctx: True) + return viking_fs + + +async def _async_true(): + return True + + +@pytest.mark.asyncio +async def test_glob_delegates_to_agfs_with_paging_and_visibility(monkeypatch, fs): + calls = [] + + pages = [ + { + "entries": [ + { + "path": "/local/test_account/resources/group/a.md", + "rel_path": "group/a.md", + "name": "a.md", + "is_dir": False, + }, + { + "path": "/local/test_account/resources/_system/secret.md", + "rel_path": "_system/secret.md", + "name": "secret.md", + "is_dir": False, + }, + ], + "next_token": "tok-1", + }, + { + "entries": [ + { + "path": "/local/test_account/resources/group/b.md", + "rel_path": "group/b.md", + "name": "b.md", + "is_dir": False, + } + ], + "next_token": None, + }, + ] + + async def fake_glob_directory(path, pattern, **kwargs): + 
@pytest.mark.asyncio
async def test_glob_stops_after_reaching_limit_at_page_end(monkeypatch, fs):
    """A page that exactly fills ``node_limit`` ends the scan without using its token."""
    seen_requests = []

    async def fake_glob_directory(path, pattern, **kwargs):
        seen_requests.append({"path": path, "pattern": pattern, **kwargs})
        first_entry = {
            "path": "/local/test_account/resources/group/a.md",
            "rel_path": "group/a.md",
            "name": "a.md",
            "is_dir": False,
        }
        second_entry = {
            "path": "/local/test_account/resources/group/b.md",
            "rel_path": "group/b.md",
            "name": "b.md",
            "is_dir": False,
        }
        # The backend still advertises another page; glob must not ask for it.
        return {
            "entries": [first_entry, second_entry],
            "next_token": "tok-should-not-be-used",
        }

    monkeypatch.setattr(fs._async_agfs, "glob_directory", fake_glob_directory)

    result = await fs.glob("*.md", uri="viking://resources", node_limit=2, ctx=_default_ctx())

    expected_matches = [
        "viking://resources/group/a.md",
        "viking://resources/group/b.md",
    ]
    assert result == {"matches": expected_matches, "count": 2}
    assert len(seen_requests) == 1
def test_glob_rejects_empty_pattern(fs): + with pytest.raises(InvalidArgumentError): + await fs.glob("", uri="viking://resources", ctx=_default_ctx()) + + +@pytest.mark.asyncio +async def test_glob_checks_access_before_listing(monkeypatch, fs): + called = False + + def fake_ensure_access(uri, ctx): + nonlocal called + called = True + raise PermissionError(f"denied: {uri}") + + monkeypatch.setattr(fs, "_ensure_access", fake_ensure_access) + + with pytest.raises(PermissionError): + await fs.glob("*.md", uri="viking://resources", ctx=_default_ctx()) + + assert called is True + + +@pytest.mark.asyncio +async def test_glob_preserves_request_uri_alias(monkeypatch, fs): + monkeypatch.setattr(fs, "_uri_to_path", lambda _uri, **_kwargs: "/local/test_account/user") + monkeypatch.setattr(fs, "_read_paths", lambda _uri, **_kwargs: ["/local/test_account/user"]) + monkeypatch.setattr( + fs, "_path_to_uri", lambda _path, **_kwargs: "viking://user/test_account/demo.md" + ) + + async def fake_glob_directory(path, pattern, **kwargs): + return { + "entries": [ + { + "path": "/local/test_account/user/demo.md", + "rel_path": "demo.md", + "name": "demo.md", + "is_dir": False, + } + ], + "next_token": None, + } + + monkeypatch.setattr(fs._async_agfs, "glob_directory", fake_glob_directory) + + result = await fs.glob("*.md", uri="viking://user", ctx=_default_ctx()) + + assert result == {"matches": ["viking://user/demo.md"], "count": 1} + + +@pytest.mark.asyncio +async def test_glob_preserves_root_uri(monkeypatch, fs): + monkeypatch.setattr(fs, "_uri_to_path", lambda _uri, **_kwargs: "/local/test_account") + monkeypatch.setattr(fs, "_read_paths", lambda _uri, **_kwargs: ["/local/test_account"]) + monkeypatch.setattr( + fs, "_path_to_uri", lambda _path, **_kwargs: "viking://resources/should-not-leak.md" + ) + + async def fake_glob_directory(path, pattern, **kwargs): + return { + "entries": [ + { + "path": "/local/test_account/resources/demo.md", + "rel_path": "resources/demo.md", + "name": 
@pytest.mark.asyncio
async def test_glob_keeps_directory_matches(monkeypatch, fs):
    """Directory entries returned by the backend are reported like file matches."""
    folder_entry = {
        "path": "/local/test_account/resources/folder",
        "rel_path": "folder",
        "name": "folder",
        "is_dir": True,
    }

    async def fake_glob_directory(path, pattern, **kwargs):
        # Single, final page containing only a directory.
        return {"entries": [dict(folder_entry)], "next_token": None}

    monkeypatch.setattr(fs._async_agfs, "glob_directory", fake_glob_directory)

    outcome = await fs.glob("*", uri="viking://resources", ctx=_default_ctx())

    assert outcome == {"matches": ["viking://resources/folder"], "count": 1}