diff --git a/crates/iceberg/src/io/local_fs.rs b/crates/iceberg/src/io/local_fs.rs
index 0a55199f70..fbf23ac3d3 100644
--- a/crates/iceberg/src/io/local_fs.rs
+++ b/crates/iceberg/src/io/local_fs.rs
@@ -29,6 +29,8 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use futures::StreamExt;
+use futures::stream::BoxStream;
 use serde::{Deserialize, Serialize};
 
 use super::{
@@ -200,6 +202,13 @@ impl Storage for LocalFsStorage {
         Ok(())
     }
 
+    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
+        while let Some(path) = paths.next().await {
+            self.delete(&path).await?;
+        }
+        Ok(())
+    }
+
     fn new_input(&self, path: &str) -> Result<InputFile> {
         Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
     }
@@ -534,4 +543,61 @@ mod tests {
 
         assert!(path.exists());
     }
+
+    #[tokio::test]
+    async fn test_local_fs_storage_delete_stream() {
+        use futures::stream;
+
+        let tmp_dir = TempDir::new().unwrap();
+        let storage = LocalFsStorage::new();
+
+        // Create multiple files
+        let file1 = tmp_dir.path().join("file1.txt");
+        let file2 = tmp_dir.path().join("file2.txt");
+        let file3 = tmp_dir.path().join("file3.txt");
+
+        storage
+            .write(file1.to_str().unwrap(), Bytes::from("1"))
+            .await
+            .unwrap();
+        storage
+            .write(file2.to_str().unwrap(), Bytes::from("2"))
+            .await
+            .unwrap();
+        storage
+            .write(file3.to_str().unwrap(), Bytes::from("3"))
+            .await
+            .unwrap();
+
+        // Verify files exist
+        assert!(storage.exists(file1.to_str().unwrap()).await.unwrap());
+        assert!(storage.exists(file2.to_str().unwrap()).await.unwrap());
+        assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
+
+        // Delete multiple files using stream
+        let paths = vec![
+            file1.to_str().unwrap().to_string(),
+            file2.to_str().unwrap().to_string(),
+        ];
+        let path_stream = stream::iter(paths).boxed();
+        storage.delete_stream(path_stream).await.unwrap();
+
+        // Verify deleted files no longer exist
+        assert!(!storage.exists(file1.to_str().unwrap()).await.unwrap());
+        assert!(!storage.exists(file2.to_str().unwrap()).await.unwrap());
+
+        // Verify file3 still exists
+        assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_local_fs_storage_delete_stream_empty() {
+        use futures::stream;
+
+        let storage = LocalFsStorage::new();
+
+        // Delete with empty stream should succeed
+        let path_stream = stream::iter(Vec::<String>::new()).boxed();
+        storage.delete_stream(path_stream).await.unwrap();
+    }
 }
diff --git a/crates/iceberg/src/io/memory.rs b/crates/iceberg/src/io/memory.rs
index 39f1f5db9d..0752003a86 100644
--- a/crates/iceberg/src/io/memory.rs
+++ b/crates/iceberg/src/io/memory.rs
@@ -28,6 +28,8 @@ use std::sync::{Arc, RwLock};
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use futures::StreamExt;
+use futures::stream::BoxStream;
 use serde::{Deserialize, Serialize};
 
 use super::{
@@ -220,6 +222,13 @@ impl Storage for MemoryStorage {
         Ok(())
     }
 
+    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
+        while let Some(path) = paths.next().await {
+            self.delete(&path).await?;
+        }
+        Ok(())
+    }
+
     fn new_input(&self, path: &str) -> Result<InputFile> {
         Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
     }
@@ -594,4 +603,56 @@ mod tests {
         assert_eq!(storage.read("/path/to/file").await.unwrap(), content);
         assert_eq!(storage.read("path/to/file").await.unwrap(), content);
     }
+
+    #[tokio::test]
+    async fn test_memory_storage_delete_stream() {
+        use futures::stream;
+
+        let storage = MemoryStorage::new();
+
+        // Create multiple files
+        storage
+            .write("memory://file1.txt", Bytes::from("1"))
+            .await
+            .unwrap();
+        storage
+            .write("memory://file2.txt", Bytes::from("2"))
+            .await
+            .unwrap();
+        storage
+            .write("memory://file3.txt", Bytes::from("3"))
+            .await
+            .unwrap();
+
+        // Verify files exist
+        assert!(storage.exists("memory://file1.txt").await.unwrap());
+        assert!(storage.exists("memory://file2.txt").await.unwrap());
+        assert!(storage.exists("memory://file3.txt").await.unwrap());
+
+        // Delete multiple files using stream
+        let paths = vec![
+            "memory://file1.txt".to_string(),
+            "memory://file2.txt".to_string(),
+        ];
+        let path_stream = stream::iter(paths).boxed();
+        storage.delete_stream(path_stream).await.unwrap();
+
+        // Verify deleted files no longer exist
+        assert!(!storage.exists("memory://file1.txt").await.unwrap());
+        assert!(!storage.exists("memory://file2.txt").await.unwrap());
+
+        // Verify file3 still exists
+        assert!(storage.exists("memory://file3.txt").await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn test_memory_storage_delete_stream_empty() {
+        use futures::stream;
+
+        let storage = MemoryStorage::new();
+
+        // Delete with empty stream should succeed
+        let path_stream = stream::iter(Vec::<String>::new()).boxed();
+        storage.delete_stream(path_stream).await.unwrap();
+    }
 }
diff --git a/crates/iceberg/src/io/opendal/mod.rs b/crates/iceberg/src/io/opendal/mod.rs
index fb49dc9e3f..541b38de03 100644
--- a/crates/iceberg/src/io/opendal/mod.rs
+++ b/crates/iceberg/src/io/opendal/mod.rs
@@ -24,6 +24,8 @@ use async_trait::async_trait;
 #[cfg(feature = "storage-azdls")]
 use azdls::AzureStorageScheme;
 use bytes::Bytes;
+use futures::StreamExt;
+use futures::stream::BoxStream;
 use opendal::layers::RetryLayer;
 #[cfg(feature = "storage-azdls")]
 use opendal::services::AzdlsConfig;
@@ -429,6 +431,29 @@ impl Storage for OpenDalStorage {
         Ok(op.remove_all(&path).await?)
     }
 
+    async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
+        // The first path selects the operator; an empty stream is a no-op.
+        let Some(first_path) = paths.next().await else {
+            return Ok(());
+        };
+
+        // NOTE(review): every path is deleted through the first path's operator,
+        // which presumably requires all paths to share one backend; confirm with callers.
+        let (op, first_relative) = self.create_operator(&first_path)?;
+        let mut deleter = op.deleter().await?;
+        deleter.delete(first_relative.to_string()).await?;
+
+        while let Some(path) = paths.next().await {
+            // Report an unresolvable path as an error instead of panicking mid-stream.
+            let (_, relative_path) = self.create_operator(&path)?;
+            deleter.delete(relative_path.to_string()).await?;
+        }
+
+        // Close the deleter so that any deletions it has queued are carried out.
+        deleter.close().await?;
+        Ok(())
+    }
+
     #[allow(unreachable_code, unused_variables)]
     fn new_input(&self, path: &str) -> Result<InputFile> {
         Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
diff --git a/crates/iceberg/src/io/storage.rs b/crates/iceberg/src/io/storage.rs
index 15cc85ab10..193273bd2e 100644
--- a/crates/iceberg/src/io/storage.rs
+++ b/crates/iceberg/src/io/storage.rs
@@ -22,6 +22,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::Bytes;
+use futures::stream::BoxStream;
 
 use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
 use crate::Result;
@@ -92,6 +93,9 @@ pub trait Storage: Debug + Send + Sync {
     /// Delete all files with the given prefix
     async fn delete_prefix(&self, path: &str) -> Result<()>;
 
+    /// Delete multiple files from a stream of paths.
+    async fn delete_stream(&self, paths: BoxStream<'static, String>) -> Result<()>;
+
     /// Create a new input file for reading
     fn new_input(&self, path: &str) -> Result<InputFile>;
 