From eeb2e26c48fa3979761ad178b0bfc315f9007ded Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Wed, 4 Feb 2026 15:10:52 +0530 Subject: [PATCH 1/5] Add ReplaceDataFilesAction for compaction support --- crates/iceberg/src/spec/snapshot_summary.rs | 1 + crates/iceberg/src/transaction/mod.rs | 7 + .../src/transaction/replace_data_files.rs | 271 ++++++++++++++++++ 3 files changed, 279 insertions(+) create mode 100644 crates/iceberg/src/transaction/replace_data_files.rs diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs index c67ee37d3e..c4767f31cf 100644 --- a/crates/iceberg/src/spec/snapshot_summary.rs +++ b/crates/iceberg/src/spec/snapshot_summary.rs @@ -339,6 +339,7 @@ pub(crate) fn update_snapshot_summaries( if summary.operation != Operation::Append && summary.operation != Operation::Overwrite && summary.operation != Operation::Delete + && summary.operation != Operation::Replace { return Err(Error::new( ErrorKind::DataInvalid, diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 074c7fefe4..5c7658c382 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -54,6 +54,7 @@ mod action; pub use action::*; mod append; +mod replace_data_files; mod snapshot; mod sort_order; mod update_location; @@ -71,6 +72,7 @@ use crate::spec::TableProperties; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; +use crate::transaction::replace_data_files::ReplaceDataFilesAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; @@ -156,6 +158,11 @@ impl Transaction { UpdateStatisticsAction::new() } + /// Creates a replace data files action (for compaction). + pub fn replace_data_files(&self) -> ReplaceDataFilesAction { + ReplaceDataFilesAction::new() + } + /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { if self.actions.is_empty() { diff --git a/crates/iceberg/src/transaction/replace_data_files.rs b/crates/iceberg/src/transaction/replace_data_files.rs new file mode 100644 index 0000000000..ce42f52357 --- /dev/null +++ b/crates/iceberg/src/transaction/replace_data_files.rs @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; +use crate::table::Table; +use crate::transaction::snapshot::{ + DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, +}; +use crate::transaction::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind}; + +/// Action to replace data files in a table (for compaction/rewrite operations). +pub struct ReplaceDataFilesAction { + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + files_to_delete: Vec, + files_to_add: Vec, +} + +impl ReplaceDataFilesAction { + pub(crate) fn new() -> Self { + Self { + commit_uuid: None, + key_metadata: None, + snapshot_properties: HashMap::default(), + files_to_delete: vec![], + files_to_add: vec![], + } + } + + /// Add files to delete (old files being replaced). + pub fn delete_files(mut self, files: impl IntoIterator) -> Self { + self.files_to_delete.extend(files); + self + } + + /// Add files to add (new files replacing old ones). + pub fn add_files(mut self, files: impl IntoIterator) -> Self { + self.files_to_add.extend(files); + self + } + + /// Set commit UUID. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self + } + + /// Set key metadata for manifest files. + pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } + + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, props: HashMap) -> Self { + self.snapshot_properties = props; + self + } +} + +#[async_trait] +impl TransactionAction for ReplaceDataFilesAction { + async fn commit(self: Arc, table: &Table) -> Result { + if self.files_to_delete.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Replace operation requires files to delete", + )); + } + + if self.files_to_add.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Replace operation requires files to add", + )); + } + + let snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.files_to_add.clone(), + ); + + snapshot_producer.validate_added_data_files()?; + + let replace_op = ReplaceOperation { + files_to_delete: self.files_to_delete.clone(), + }; + + snapshot_producer + .commit(replace_op, DefaultManifestProcess) + .await + } +} + +struct ReplaceOperation { + files_to_delete: Vec, +} + +impl SnapshotProduceOperation for ReplaceOperation { + fn operation(&self) -> Operation { + Operation::Replace + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let files_to_delete: std::collections::HashSet<&str> = self + .files_to_delete + .iter() + .map(|f| f.file_path.as_str()) + .collect(); + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.table.file_io(), + &snapshot_produce.table.metadata_ref(), + ) + .await?; + + // Include existing manifests that don't contain deleted files + let mut result = Vec::new(); + for entry in manifest_list.entries() { + if !entry.has_added_files() && !entry.has_existing_files() { + continue; + } + + let manifest = entry.load_manifest(snapshot_produce.table.file_io()).await?; + let has_deleted_file = manifest + .entries() + .iter() + .any(|e| e.is_alive() && files_to_delete.contains(e.file_path())); + + if !has_deleted_file { + result.push(entry.clone()); + } + } + + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::spec::{ + DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct, + }; + use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::{Transaction, TransactionAction}; + use crate::TableUpdate; + + fn create_data_file(table: &crate::table::Table, path: &str, record_count: u64) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(record_count) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_replace_data_files_basic() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let old_file = create_data_file(&table, "data/old.parquet", 100); + let new_file = create_data_file(&table, "data/new.parquet", 100); + + let action = tx + .replace_data_files() + .delete_files(vec![old_file]) + .add_files(vec![new_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_replace_data_files_empty_deletes_fails() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let new_file = create_data_file(&table, "data/new.parquet", 100); + + let action = tx.replace_data_files().add_files(vec![new_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_replace_data_files_empty_adds_fails() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let old_file = create_data_file(&table, "data/old.parquet", 100); + + let action = tx.replace_data_files().delete_files(vec![old_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_replace_uses_replace_operation() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let old_file = create_data_file(&table, "data/old.parquet", 100); + let new_file = create_data_file(&table, "data/new.parquet", 100); + + let action = tx + .replace_data_files() + .delete_files(vec![old_file]) + .add_files(vec![new_file]); + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + panic!("Expected AddSnapshot"); + }; + + assert_eq!(new_snapshot.summary().operation, Operation::Replace); + } +} From e8900b3f09bd8f9f4f9aba9c895355dc91a8f9f3 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Wed, 4 Feb 2026 15:12:12 +0530 Subject: [PATCH 2/5] Add integration tests for ReplaceDataFilesAction --- .../src/transaction/replace_data_files.rs | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/crates/iceberg/src/transaction/replace_data_files.rs b/crates/iceberg/src/transaction/replace_data_files.rs index ce42f52357..86d3a309de 100644 --- a/crates/iceberg/src/transaction/replace_data_files.rs +++ b/crates/iceberg/src/transaction/replace_data_files.rs @@ -269,3 +269,123 @@ mod tests { assert_eq!(new_snapshot.summary().operation, Operation::Replace); } } + +#[cfg(test)] +mod integration_tests { + use crate::memory::tests::new_memory_catalog; + use crate::spec::{ + DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct, + }; + use crate::transaction::tests::make_v3_minimal_table_in_catalog; + use crate::transaction::{ApplyTransactionAction, Transaction}; + + fn create_file(path: &str, record_count: u64) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(record_count) + .partition(Struct::from_iter([Some(Literal::long(0))])) + .partition_spec_id(0) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_replace_after_append() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + // First append some files + let file1 = create_file("data/file1.parquet", 100); + let file2 = create_file("data/file2.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone(), file2.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + assert_eq!(table.metadata().snapshots().count(), 1); + + // Now replace file1 and file2 with a single compacted file + let compacted = create_file("data/compacted.parquet", 200); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .delete_files(vec![file1, file2]) + .add_files(vec![compacted]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + assert_eq!(table.metadata().snapshots().count(), 2); + let snapshot = table.metadata().current_snapshot().unwrap(); + assert_eq!(snapshot.summary().operation, Operation::Replace); + + // Verify manifest has only the compacted file + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 1); + + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(manifest.entries().len(), 1); + assert_eq!(manifest.entries()[0].file_path(), "data/compacted.parquet"); + } + + #[tokio::test] + async fn test_replace_preserves_unrelated_files() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + // Append file1 + let file1 = create_file("data/file1.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Append file2 in separate transaction + let file2 = create_file("data/file2.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file2.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Replace only file1 + let file1_compacted = create_file("data/file1_compacted.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .delete_files(vec![file1]) + .add_files(vec![file1_compacted]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Verify both file2 manifest and new compacted manifest exist + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + // Should have 2 manifests: one for file2 (preserved), one for compacted + assert_eq!(manifest_list.entries().len(), 2); + + // Collect all file paths + let mut all_files = Vec::new(); + for entry in manifest_list.entries() { + let manifest = entry.load_manifest(table.file_io()).await.unwrap(); + for e in manifest.entries() { + if e.is_alive() { + all_files.push(e.file_path().to_string()); + } + } + } + all_files.sort(); + assert_eq!(all_files, vec!["data/file1_compacted.parquet", "data/file2.parquet"]); + } +} From 72c7c5cbe82a8ffadbd09df47a4027289f28fc1d Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Wed, 4 Feb 2026 16:53:08 +0530 Subject: [PATCH 3/5] Add validate_from_snapshot to ReplaceDataFilesAction --- .../src/transaction/replace_data_files.rs | 113 ++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/crates/iceberg/src/transaction/replace_data_files.rs b/crates/iceberg/src/transaction/replace_data_files.rs index 86d3a309de..ec58680ff3 100644 --- a/crates/iceberg/src/transaction/replace_data_files.rs +++ b/crates/iceberg/src/transaction/replace_data_files.rs @@ -37,6 +37,7 @@ pub struct ReplaceDataFilesAction { snapshot_properties: HashMap, files_to_delete: Vec, files_to_add: Vec, + validate_from_snapshot_id: Option, } impl ReplaceDataFilesAction { @@ -47,6 +48,7 @@ impl ReplaceDataFilesAction { snapshot_properties: HashMap::default(), files_to_delete: vec![], files_to_add: vec![], + validate_from_snapshot_id: None, } } @@ -79,6 +81,12 @@ impl ReplaceDataFilesAction { self.snapshot_properties = props; self } + + /// Validate that files to delete exist from this snapshot. + pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self { + self.validate_from_snapshot_id = Some(snapshot_id); + self + } } #[async_trait] @@ -98,6 +106,11 @@ impl TransactionAction for ReplaceDataFilesAction { )); } + // Validate files exist from specified snapshot + if let Some(snapshot_id) = self.validate_from_snapshot_id { + Self::validate_files_exist(table, snapshot_id, &self.files_to_delete).await?; + } + let snapshot_producer = SnapshotProducer::new( table, self.commit_uuid.unwrap_or_else(Uuid::now_v7), @@ -118,6 +131,53 @@ impl TransactionAction for ReplaceDataFilesAction { } } +impl ReplaceDataFilesAction { + async fn validate_files_exist( + table: &Table, + snapshot_id: i64, + files_to_delete: &[DataFile], + ) -> Result<()> { + let snapshot = table.metadata().snapshot_by_id(snapshot_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot {} not found", snapshot_id), + ) + })?; + + let files_to_find: std::collections::HashSet<&str> = + files_to_delete.iter().map(|f| f.file_path.as_str()).collect(); + + let manifest_list = snapshot + .load_manifest_list(table.file_io(), &table.metadata_ref()) + .await?; + + let mut found_files = std::collections::HashSet::new(); + for entry in manifest_list.entries() { + let manifest = entry.load_manifest(table.file_io()).await?; + for e in manifest.entries() { + if e.is_alive() && files_to_find.contains(e.file_path()) { + found_files.insert(e.file_path().to_string()); + } + } + } + + let missing: Vec<_> = files_to_delete + .iter() + .filter(|f| !found_files.contains(&f.file_path)) + .map(|f| f.file_path.as_str()) + .collect(); + + if !missing.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Files not found in snapshot {}: {}", snapshot_id, missing.join(", ")), + )); + } + + Ok(()) + } +} + struct ReplaceOperation { files_to_delete: Vec, } @@ -388,4 +448,57 @@ mod integration_tests { all_files.sort(); assert_eq!(all_files, vec!["data/file1_compacted.parquet", "data/file2.parquet"]); } + + #[tokio::test] + async fn test_validate_from_snapshot_success() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + let file1 = create_file("data/file1.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id(); + + // Replace with validation from correct snapshot + let compacted = create_file("data/compacted.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .validate_from_snapshot(snapshot_id) + .delete_files(vec![file1]) + .add_files(vec![compacted]); + let tx = action.apply(tx).unwrap(); + let result = tx.commit(&catalog).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_validate_from_snapshot_missing_file() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + let file1 = create_file("data/file1.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id(); + + // Try to delete a file that doesn't exist + let nonexistent = create_file("data/nonexistent.parquet", 100); + let compacted = create_file("data/compacted.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .validate_from_snapshot(snapshot_id) + .delete_files(vec![nonexistent]) + .add_files(vec![compacted]); + let tx = action.apply(tx).unwrap(); + let result = tx.commit(&catalog).await; + assert!(result.is_err()); + } } From 588c3e617e8c4af6ac3caab6e8135fdab7f69cd4 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Wed, 4 Feb 2026 16:56:30 +0530 Subject: [PATCH 4/5] Add data_sequence_number to ReplaceDataFilesAction --- .../src/transaction/replace_data_files.rs | 55 ++++++++++++++++++- crates/iceberg/src/transaction/snapshot.rs | 42 ++++++++++---- 2 files changed, 84 insertions(+), 13 deletions(-) diff --git a/crates/iceberg/src/transaction/replace_data_files.rs b/crates/iceberg/src/transaction/replace_data_files.rs index ec58680ff3..1ef9ad49b8 100644 --- a/crates/iceberg/src/transaction/replace_data_files.rs +++ b/crates/iceberg/src/transaction/replace_data_files.rs @@ -38,6 +38,7 @@ pub struct ReplaceDataFilesAction { files_to_delete: Vec, files_to_add: Vec, validate_from_snapshot_id: Option, + data_sequence_number: Option, } impl ReplaceDataFilesAction { @@ -49,6 +50,7 @@ impl ReplaceDataFilesAction { files_to_delete: vec![], files_to_add: vec![], validate_from_snapshot_id: None, + data_sequence_number: None, } } @@ -87,6 +89,12 @@ impl ReplaceDataFilesAction { self.validate_from_snapshot_id = Some(snapshot_id); self } + + /// Set data sequence number for new files (for handling concurrent equality deletes). + pub fn data_sequence_number(mut self, seq_num: i64) -> Self { + self.data_sequence_number = Some(seq_num); + self + } } #[async_trait] @@ -111,7 +119,7 @@ impl TransactionAction for ReplaceDataFilesAction { Self::validate_files_exist(table, snapshot_id, &self.files_to_delete).await?; } - let snapshot_producer = SnapshotProducer::new( + let mut snapshot_producer = SnapshotProducer::new( table, self.commit_uuid.unwrap_or_else(Uuid::now_v7), self.key_metadata.clone(), @@ -119,6 +127,10 @@ impl TransactionAction for ReplaceDataFilesAction { self.files_to_add.clone(), ); + if let Some(seq_num) = self.data_sequence_number { + snapshot_producer = snapshot_producer.with_data_sequence_number(seq_num); + } + snapshot_producer.validate_added_data_files()?; let replace_op = ReplaceOperation { @@ -501,4 +513,45 @@ mod integration_tests { let result = tx.commit(&catalog).await; assert!(result.is_err()); } + + #[tokio::test] + async fn test_data_sequence_number() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + let file1 = create_file("data/file1.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let original_seq = table.metadata().current_snapshot().unwrap().sequence_number(); + + // Replace with custom sequence number + let compacted = create_file("data/compacted.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .data_sequence_number(original_seq) + .delete_files(vec![file1]) + .add_files(vec![compacted]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Verify the new manifest entry has the custom sequence number + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + for entry in manifest_list.entries() { + let manifest = entry.load_manifest(table.file_io()).await.unwrap(); + for e in manifest.entries() { + if e.file_path() == "data/compacted.parquet" { + assert_eq!(e.sequence_number(), Some(original_seq)); + } + } + } + } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..28abde73c9 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, + // Custom data sequence number for compaction operations. + data_sequence_number: Option, } impl<'a> SnapshotProducer<'a> { @@ -136,9 +138,15 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties, added_data_files, manifest_counter: (0..), + data_sequence_number: None, } } + pub(crate) fn with_data_sequence_number(mut self, seq_num: i64) -> Self { + self.data_sequence_number = Some(seq_num); + self + } + pub(crate) fn validate_added_data_files(&self) -> Result<()> { for data_file in &self.added_data_files { if data_file.content_type() != crate::spec::DataContentType::Data { @@ -300,18 +308,28 @@ impl<'a> SnapshotProducer<'a> { let snapshot_id = self.snapshot_id; let format_version = self.table.metadata().format_version(); - let manifest_entries = added_data_files.into_iter().map(|data_file| { - let builder = ManifestEntry::builder() - .status(crate::spec::ManifestStatus::Added) - .data_file(data_file); - if format_version == FormatVersion::V1 { - builder.snapshot_id(snapshot_id).build() - } else { - // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when - // commit failed. - builder.build() - } - }); + let data_sequence_number = self.data_sequence_number; + let manifest_entries: Vec<_> = added_data_files + .into_iter() + .map(|data_file| { + match (format_version, data_sequence_number) { + (FormatVersion::V1, _) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .snapshot_id(snapshot_id) + .data_file(data_file) + .build(), + (_, Some(seq_num)) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .sequence_number(seq_num) + .data_file(data_file) + .build(), + (_, None) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(data_file) + .build(), + } + }) + .collect(); let mut writer = self.new_manifest_writer(ManifestContentType::Data)?; for entry in manifest_entries { writer.add_entry(entry)?; From c1215fef17dbb0fddfafb688c4c9971b367f129a Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Fri, 6 Feb 2026 00:33:41 +0530 Subject: [PATCH 5/5] Fix formatting and clippy lints --- .../src/transaction/replace_data_files.rs | 50 +++++++++++++------ crates/iceberg/src/transaction/snapshot.rs | 32 ++++++------ 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/crates/iceberg/src/transaction/replace_data_files.rs b/crates/iceberg/src/transaction/replace_data_files.rs index 1ef9ad49b8..9961cfd4a7 100644 --- a/crates/iceberg/src/transaction/replace_data_files.rs +++ b/crates/iceberg/src/transaction/replace_data_files.rs @@ -149,15 +149,20 @@ impl ReplaceDataFilesAction { snapshot_id: i64, files_to_delete: &[DataFile], ) -> Result<()> { - let snapshot = table.metadata().snapshot_by_id(snapshot_id).ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!("Snapshot {} not found", snapshot_id), - ) - })?; - - let files_to_find: std::collections::HashSet<&str> = - files_to_delete.iter().map(|f| f.file_path.as_str()).collect(); + let snapshot = table + .metadata() + .snapshot_by_id(snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot {snapshot_id} not found"), + ) + })?; + + let files_to_find: std::collections::HashSet<&str> = files_to_delete + .iter() + .map(|f| f.file_path.as_str()) + .collect(); let manifest_list = snapshot .load_manifest_list(table.file_io(), &table.metadata_ref()) @@ -182,7 +187,11 @@ impl ReplaceDataFilesAction { if !missing.is_empty() { return Err(Error::new( ErrorKind::DataInvalid, - format!("Files not found in snapshot {}: {}", snapshot_id, missing.join(", ")), + format!( + "Files not found in snapshot {}: {}", + snapshot_id, + missing.join(", ") + ), )); } @@ -234,7 +243,9 @@ impl SnapshotProduceOperation for ReplaceOperation { continue; } - let manifest = entry.load_manifest(snapshot_produce.table.file_io()).await?; + let manifest = entry + .load_manifest(snapshot_produce.table.file_io()) + .await?; let has_deleted_file = manifest .entries() .iter() @@ -253,12 +264,12 @@ impl SnapshotProduceOperation for ReplaceOperation { mod tests { use std::sync::Arc; + use crate::TableUpdate; use crate::spec::{ DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct, }; use crate::transaction::tests::make_v2_minimal_table; use crate::transaction::{Transaction, TransactionAction}; - use crate::TableUpdate; fn create_data_file(table: &crate::table::Table, path: &str, record_count: u64) -> DataFile { DataFileBuilder::default() @@ -373,7 +384,9 @@ mod integration_tests { let file1 = create_file("data/file1.parquet", 100); let file2 = create_file("data/file2.parquet", 100); let tx = Transaction::new(&table); - let action = tx.fast_append().add_data_files(vec![file1.clone(), file2.clone()]); + let action = tx + .fast_append() + .add_data_files(vec![file1.clone(), file2.clone()]); let tx = action.apply(tx).unwrap(); let table = tx.commit(&catalog).await.unwrap(); @@ -458,7 +471,10 @@ mod integration_tests { } } all_files.sort(); - assert_eq!(all_files, vec!["data/file1_compacted.parquet", "data/file2.parquet"]); + assert_eq!(all_files, vec![ + "data/file1_compacted.parquet", + "data/file2.parquet" + ]); } #[tokio::test] @@ -525,7 +541,11 @@ mod integration_tests { let tx = action.apply(tx).unwrap(); let table = tx.commit(&catalog).await.unwrap(); - let original_seq = table.metadata().current_snapshot().unwrap().sequence_number(); + let original_seq = table + .metadata() + .current_snapshot() + .unwrap() + .sequence_number(); // Replace with custom sequence number let compacted = create_file("data/compacted.parquet", 100); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 28abde73c9..10578bbf59 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -311,23 +311,21 @@ impl<'a> SnapshotProducer<'a> { let data_sequence_number = self.data_sequence_number; let manifest_entries: Vec<_> = added_data_files .into_iter() - .map(|data_file| { - match (format_version, data_sequence_number) { - (FormatVersion::V1, _) => ManifestEntry::builder() - .status(crate::spec::ManifestStatus::Added) - .snapshot_id(snapshot_id) - .data_file(data_file) - .build(), - (_, Some(seq_num)) => ManifestEntry::builder() - .status(crate::spec::ManifestStatus::Added) - .sequence_number(seq_num) - .data_file(data_file) - .build(), - (_, None) => ManifestEntry::builder() - .status(crate::spec::ManifestStatus::Added) - .data_file(data_file) - .build(), - } + .map(|data_file| match (format_version, data_sequence_number) { + (FormatVersion::V1, _) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .snapshot_id(snapshot_id) + .data_file(data_file) + .build(), + (_, Some(seq_num)) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .sequence_number(seq_num) + .data_file(data_file) + .build(), + (_, None) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(data_file) + .build(), }) .collect(); let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;