Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 186 additions & 3 deletions crates/iceberg/src/transaction/overwrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::transaction::snapshot::{
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
};
use crate::transaction::{ActionCommit, TransactionAction};
use crate::{Error, ErrorKind};

/// OverwriteAction is a transaction action for overwriting data files in the table.
///
Expand Down Expand Up @@ -98,6 +99,13 @@ impl OverwriteAction {
#[async_trait]
impl TransactionAction for OverwriteAction {
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
if !self.deleted_data_files.is_empty() && table.metadata().current_snapshot().is_none() {
return Err(Error::new(
ErrorKind::PreconditionFailed,
"Cannot delete data files from a table with no current snapshot",
));
}

let snapshot_producer = SnapshotProducer::new(
table,
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
Expand Down Expand Up @@ -168,15 +176,22 @@ impl SnapshotProduceOperation for OverwriteOperation {
return Ok(manifest_list
.entries()
.iter()
.filter(|entry| entry.has_added_files() || entry.has_existing_files())
.filter(|entry| {
entry.has_added_files()
|| entry.has_existing_files()
|| entry.has_deleted_files()
})
.cloned()
.collect());
}

let mut result = Vec::new();

for manifest_file in manifest_list.entries() {
if !manifest_file.has_added_files() && !manifest_file.has_existing_files() {
if !manifest_file.has_added_files()
&& !manifest_file.has_existing_files()
&& !manifest_file.has_deleted_files()
{
continue;
}

Expand Down Expand Up @@ -224,7 +239,20 @@ impl OverwriteOperation {
Some(self.snapshot_id),
manifest_file.key_metadata.clone(),
table.metadata().current_schema().clone(),
table.metadata().default_partition_spec().as_ref().clone(),
table
.metadata()
.partition_spec_by_id(manifest_file.partition_spec_id)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Partition spec {} not found in table metadata",
manifest_file.partition_spec_id
),
)
})?
.as_ref()
.clone(),
);

let mut writer = match table.metadata().format_version() {
Expand Down Expand Up @@ -527,4 +555,159 @@ mod tests {
"Deleted entry should survive fast_append, entries: {all_entries:?}",
);
}

#[tokio::test]
async fn test_overwrite_unaffected_manifest_passthrough() {
use crate::memory::tests::new_memory_catalog;
use crate::transaction::ApplyTransactionAction;
use crate::transaction::tests::make_v3_minimal_table_in_catalog;

let catalog = new_memory_catalog().await;
let table = make_v3_minimal_table_in_catalog(&catalog).await;
let spec_id = table.metadata().default_partition_spec_id();

// Commit file A in its own manifest.
let file_a = test_data_file("test/a.parquet", spec_id);
let tx = Transaction::new(&table);
let action = tx.fast_append().add_data_files(vec![file_a.clone()]);
let tx = action.apply(tx).unwrap();
let table = tx.commit(&catalog).await.unwrap();

// Commit file B in a second manifest.
let file_b = test_data_file("test/b.parquet", spec_id);
let tx = Transaction::new(&table);
let action = tx.fast_append().add_data_files(vec![file_b.clone()]);
let tx = action.apply(tx).unwrap();
let table = tx.commit(&catalog).await.unwrap();

// Record the manifest paths before the overwrite.
let pre_snapshot = table.metadata().current_snapshot().unwrap();
let pre_manifest_list = pre_snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.unwrap();
assert_eq!(2, pre_manifest_list.entries().len());

// Find which manifest contains file B (the unaffected one).
let mut manifest_b_path = None;
for mf in pre_manifest_list.entries() {
let manifest = mf.load_manifest(table.file_io()).await.unwrap();
if manifest
.entries()
.iter()
.any(|e| e.file_path() == "test/b.parquet")
{
manifest_b_path = Some(mf.manifest_path.clone());
}
}
let manifest_b_path = manifest_b_path.expect("manifest for file B not found");

// Overwrite: delete file A only, no adds (delete-only overwrite).
let tx = Transaction::new(&table);
let action = tx.overwrite().delete_data_files(vec![file_a.clone()]);
let tx = action.apply(tx).unwrap();
let table = tx.commit(&catalog).await.unwrap();

let post_snapshot = table.metadata().current_snapshot().unwrap();
let post_manifest_list = post_snapshot
.load_manifest_list(table.file_io(), table.metadata())
.await
.unwrap();

// Should still have 2 manifests: one rewritten (A deleted), one passed through (B unchanged).
assert_eq!(
2,
post_manifest_list.entries().len(),
"Expected 2 manifests after delete-only overwrite, got {}",
post_manifest_list.entries().len()
);

// The manifest containing file B should be the same path (passthrough).
let passthrough = post_manifest_list
.entries()
.iter()
.find(|mf| mf.manifest_path == manifest_b_path);
assert!(
passthrough.is_some(),
"Manifest for file B should be passed through unchanged, post manifests: {:?}",
post_manifest_list
.entries()
.iter()
.map(|e| &e.manifest_path)
.collect::<Vec<_>>()
);

// File A should be marked deleted, file B should still be alive.
let mut all_entries = vec![];
for mf in post_manifest_list.entries() {
let manifest = mf.load_manifest(table.file_io()).await.unwrap();
for entry in manifest.entries() {
all_entries.push((entry.status(), entry.file_path().to_string()));
}
}
assert!(
all_entries
.iter()
.any(|(s, p)| *s == ManifestStatus::Deleted && p == "test/a.parquet"),
"File A should be Deleted, entries: {all_entries:?}"
);
assert!(
all_entries
.iter()
.any(|(s, p)| *s != ManifestStatus::Deleted && p == "test/b.parquet"),
"File B should still be alive, entries: {all_entries:?}"
);
}

#[tokio::test]
async fn test_delete_only_overwrite_summary() {
use crate::memory::tests::new_memory_catalog;
use crate::transaction::ApplyTransactionAction;
use crate::transaction::tests::make_v3_minimal_table_in_catalog;

let catalog = new_memory_catalog().await;
let table = make_v3_minimal_table_in_catalog(&catalog).await;
let spec_id = table.metadata().default_partition_spec_id();

// Append 3 files.
let file1 = test_data_file("test/f1.parquet", spec_id);
let file2 = test_data_file("test/f2.parquet", spec_id);
let file3 = test_data_file("test/f3.parquet", spec_id);
let tx = Transaction::new(&table);
let action =
tx.fast_append()
.add_data_files(vec![file1.clone(), file2.clone(), file3.clone()]);
let tx = action.apply(tx).unwrap();
let table = tx.commit(&catalog).await.unwrap();

// Delete one file with a delete-only overwrite (no adds).
let tx = Transaction::new(&table);
let action = tx.overwrite().delete_data_files(vec![file1.clone()]);
let tx = action.apply(tx).unwrap();
let table = tx.commit(&catalog).await.unwrap();

let snapshot = table.metadata().current_snapshot().unwrap();
assert_eq!(snapshot.summary().operation, Operation::Overwrite);

let props = &snapshot.summary().additional_properties;

// Overwrite semantics use truncate_table_summary which treats overwrite as
// "replace all": deleted-data-files reflects the previous total (3), not the
// number of explicitly deleted files (1). Without Fix 1, previous_snapshot
// would be None so previous_total=0 and u64 subtraction would underflow.
assert_eq!(
props.get("deleted-data-files").map(|s| s.as_str()),
Some("3"),
"Expected deleted-data-files=3 (overwrite semantics), got: {props:?}"
);

// total-data-files should be 0 after a delete-only overwrite (overwrite
// semantics: 3 removed - 3 previous = 0). Without Fix 1 this would panic
// with u64 underflow in debug mode.
assert_eq!(
props.get("total-data-files").map(|s| s.as_str()),
Some("0"),
"Expected total-data-files=0 (overwrite semantics), got: {props:?}"
);
}
}
5 changes: 1 addition & 4 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,10 +404,7 @@ impl<'a> SnapshotProducer<'a> {
);
}

let previous_snapshot = table_metadata
.snapshot_by_id(self.snapshot_id)
.and_then(|snapshot| snapshot.parent_snapshot_id())
.and_then(|parent_id| table_metadata.snapshot_by_id(parent_id));
let previous_snapshot = table_metadata.current_snapshot();

let mut additional_properties = summary_collector.build();
additional_properties.extend(self.snapshot_properties.clone());
Expand Down
Loading