Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions crates/iceberg/src/io/local_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};

use super::{
Expand Down Expand Up @@ -200,6 +202,13 @@ impl Storage for LocalFsStorage {
Ok(())
}

/// Removes every file yielded by `paths`, one path at a time, in stream order.
///
/// The first failing `delete` call aborts the loop and its error is returned;
/// paths after it are not pulled from the stream. Returns `Ok(())` once the
/// stream is exhausted (including when it was empty to begin with).
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
    loop {
        match paths.next().await {
            Some(location) => {
                self.delete(&location).await?;
            }
            None => return Ok(()),
        }
    }
}

fn new_input(&self, path: &str) -> Result<InputFile> {
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
}
Expand Down Expand Up @@ -534,4 +543,61 @@ mod tests {

assert!(path.exists());
}

#[tokio::test]
async fn test_local_fs_storage_delete_stream() {
    use futures::stream;

    let tmp_dir = TempDir::new().unwrap();
    let storage = LocalFsStorage::new();

    // Absolute locations of the three fixture files inside the temp dir.
    let files: Vec<String> = ["file1.txt", "file2.txt", "file3.txt"]
        .iter()
        .map(|name| tmp_dir.path().join(name).to_str().unwrap().to_string())
        .collect();

    // Create every file, each holding its own index as content.
    for (location, content) in files.iter().zip(["1", "2", "3"]) {
        storage
            .write(location, Bytes::from(content))
            .await
            .unwrap();
    }

    // All three must be present before anything is deleted.
    for location in &files {
        assert!(storage.exists(location).await.unwrap());
    }

    // Stream only the first two paths into `delete_stream`.
    let (doomed, survivors) = files.split_at(2);
    let path_stream = stream::iter(doomed.to_vec()).boxed();
    storage.delete_stream(path_stream).await.unwrap();

    // The streamed paths are gone ...
    for location in doomed {
        assert!(!storage.exists(location).await.unwrap());
    }

    // ... while the file that was never streamed is untouched.
    for location in survivors {
        assert!(storage.exists(location).await.unwrap());
    }
}

#[tokio::test]
async fn test_local_fs_storage_delete_stream_empty() {
    use futures::stream;

    let storage = LocalFsStorage::new();

    // A stream that yields no paths must be accepted without error.
    let no_paths = stream::empty::<String>().boxed();
    let outcome = storage.delete_stream(no_paths).await;
    outcome.unwrap();
}
}
61 changes: 61 additions & 0 deletions crates/iceberg/src/io/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};

use super::{
Expand Down Expand Up @@ -220,6 +222,13 @@ impl Storage for MemoryStorage {
Ok(())
}

/// Deletes each path produced by `paths`, sequentially and in stream order.
///
/// Returns the error of the first `delete` call that fails, leaving the rest
/// of the stream unconsumed; otherwise returns `Ok(())` when the stream ends.
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
    loop {
        let Some(location) = paths.next().await else {
            break;
        };
        self.delete(&location).await?;
    }
    Ok(())
}

fn new_input(&self, path: &str) -> Result<InputFile> {
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
}
Expand Down Expand Up @@ -594,4 +603,56 @@ mod tests {
assert_eq!(storage.read("/path/to/file").await.unwrap(), content);
assert_eq!(storage.read("path/to/file").await.unwrap(), content);
}

#[tokio::test]
async fn test_memory_storage_delete_stream() {
    use futures::stream;

    let storage = MemoryStorage::new();
    let locations = [
        "memory://file1.txt",
        "memory://file2.txt",
        "memory://file3.txt",
    ];

    // Create every file, each holding its own index as content.
    for (location, content) in locations.iter().copied().zip(["1", "2", "3"]) {
        storage
            .write(location, Bytes::from(content))
            .await
            .unwrap();
    }

    // All three must be present before anything is deleted.
    for location in locations.iter().copied() {
        assert!(storage.exists(location).await.unwrap());
    }

    // Stream only the first two paths into `delete_stream`.
    let (doomed, survivors) = locations.split_at(2);
    let owned_paths: Vec<String> = doomed.iter().map(|location| location.to_string()).collect();
    let path_stream = stream::iter(owned_paths).boxed();
    storage.delete_stream(path_stream).await.unwrap();

    // The streamed paths are gone ...
    for location in doomed.iter().copied() {
        assert!(!storage.exists(location).await.unwrap());
    }

    // ... while the file that was never streamed is untouched.
    for location in survivors.iter().copied() {
        assert!(storage.exists(location).await.unwrap());
    }
}

#[tokio::test]
async fn test_memory_storage_delete_stream_empty() {
    use futures::stream;

    let storage = MemoryStorage::new();

    // A stream that yields no paths must be accepted without error.
    let no_paths = stream::empty::<String>().boxed();
    let outcome = storage.delete_stream(no_paths).await;
    outcome.unwrap();
}
}
24 changes: 24 additions & 0 deletions crates/iceberg/src/io/opendal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use async_trait::async_trait;
#[cfg(feature = "storage-azdls")]
use azdls::AzureStorageScheme;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use opendal::layers::RetryLayer;
#[cfg(feature = "storage-azdls")]
use opendal::services::AzdlsConfig;
Expand Down Expand Up @@ -429,6 +431,28 @@ impl Storage for OpenDalStorage {
Ok(op.remove_all(&path).await?)
}

/// Deletes every file yielded by `paths` through a single OpenDAL deleter.
///
/// The operator is built from the first path in the stream; an empty stream
/// is a no-op that returns `Ok(())`. Every later path is still resolved with
/// `create_operator`, but only to obtain its relative path.
///
/// NOTE(review): all deletions go through the operator derived from the first
/// path, so a stream whose paths resolve to different operators is not
/// handled here — confirm callers never mix them.
///
/// # Errors
///
/// Returns an error if any path cannot be resolved by `create_operator`, or
/// if the deleter fails to accept a path or to complete on `close`. An
/// unresolvable path is reported as an error rather than a panic.
async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
    // Without at least one path there is nothing to build an operator from.
    let Some(first_path) = paths.next().await else {
        return Ok(());
    };

    let (op, first_relative) = self.create_operator(&first_path)?;

    // Drive the deleter by hand instead of handing OpenDAL a mapped stream:
    // that keeps path resolution fallible, so `?` can surface a bad path.
    let mut deleter = op.deleter().await?;
    deleter.delete(first_relative).await?;

    while let Some(path) = paths.next().await {
        let (_, relative_path) = self.create_operator(&path)?;
        deleter.delete(relative_path).await?;
    }

    // `close` completes the deleter; any failure it reports is returned.
    deleter.close().await?;
    Ok(())
}

#[allow(unreachable_code, unused_variables)]
fn new_input(&self, path: &str) -> Result<InputFile> {
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
Expand Down
4 changes: 4 additions & 0 deletions crates/iceberg/src/io/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;

use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile};
use crate::Result;
Expand Down Expand Up @@ -92,6 +93,9 @@ pub trait Storage: Debug + Send + Sync {
/// Delete all files with the given prefix
async fn delete_prefix(&self, path: &str) -> Result<()>;

/// Delete multiple files from a stream of paths.
///
/// `paths` yields the location of each file to delete. The stream is taken
/// by value and must be `'static`, so it cannot borrow from the caller.
///
/// # Errors
///
/// Returns an error if the implementation fails to delete a path.
async fn delete_stream(&self, paths: BoxStream<'static, String>) -> Result<()>;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some design considerations behind this API; I've written them up here: #2065 (comment)


/// Create a new input file for reading
fn new_input(&self, path: &str) -> Result<InputFile>;

Expand Down
Loading