diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs
index c67ee37d3e..c4767f31cf 100644
--- a/crates/iceberg/src/spec/snapshot_summary.rs
+++ b/crates/iceberg/src/spec/snapshot_summary.rs
@@ -339,6 +339,7 @@ pub(crate) fn update_snapshot_summaries(
if summary.operation != Operation::Append
&& summary.operation != Operation::Overwrite
&& summary.operation != Operation::Delete
+ && summary.operation != Operation::Replace
{
return Err(Error::new(
ErrorKind::DataInvalid,
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 074c7fefe4..5c7658c382 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -54,6 +54,7 @@ mod action;
pub use action::*;
mod append;
+mod replace_data_files;
mod snapshot;
mod sort_order;
mod update_location;
@@ -71,6 +72,7 @@ use crate::spec::TableProperties;
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
+use crate::transaction::replace_data_files::ReplaceDataFilesAction;
use crate::transaction::sort_order::ReplaceSortOrderAction;
use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -156,6 +158,11 @@ impl Transaction {
UpdateStatisticsAction::new()
}
+ /// Creates a replace data files action (for compaction).
+ pub fn replace_data_files(&self) -> ReplaceDataFilesAction {
+ ReplaceDataFilesAction::new()
+ }
+
/// Commit transaction.
    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
if self.actions.is_empty() {
diff --git a/crates/iceberg/src/transaction/replace_data_files.rs b/crates/iceberg/src/transaction/replace_data_files.rs
new file mode 100644
index 0000000000..9961cfd4a7
--- /dev/null
+++ b/crates/iceberg/src/transaction/replace_data_files.rs
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
+use crate::table::Table;
+use crate::transaction::snapshot::{
+ DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+};
+use crate::transaction::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind};
+
+/// Action to replace data files in a table (for compaction/rewrite operations).
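+///
+/// A minimal usage sketch (table/catalog setup elided; the file values are
+/// hypothetical), mirroring the flow used in the tests below:
+///
+/// ```ignore
+/// let tx = Transaction::new(&table);
+/// let action = tx
+///     .replace_data_files()
+///     .delete_files(vec![old_file_a, old_file_b])
+///     .add_files(vec![compacted_file]);
+/// let tx = action.apply(tx)?;
+/// let table = tx.commit(&catalog).await?;
+/// ```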
+pub struct ReplaceDataFilesAction {
+    commit_uuid: Option<Uuid>,
+    key_metadata: Option<Vec<u8>>,
+    snapshot_properties: HashMap<String, String>,
+    files_to_delete: Vec<DataFile>,
+    files_to_add: Vec<DataFile>,
+    validate_from_snapshot_id: Option<i64>,
+    data_sequence_number: Option<i64>,
+}
+
+impl ReplaceDataFilesAction {
+ pub(crate) fn new() -> Self {
+ Self {
+ commit_uuid: None,
+ key_metadata: None,
+ snapshot_properties: HashMap::default(),
+ files_to_delete: vec![],
+ files_to_add: vec![],
+ validate_from_snapshot_id: None,
+ data_sequence_number: None,
+ }
+ }
+
+ /// Add files to delete (old files being replaced).
+    pub fn delete_files(mut self, files: impl IntoIterator<Item = DataFile>) -> Self {
+ self.files_to_delete.extend(files);
+ self
+ }
+
+    /// Add the new files that will replace the deleted ones.
+    pub fn add_files(mut self, files: impl IntoIterator<Item = DataFile>) -> Self {
+ self.files_to_add.extend(files);
+ self
+ }
+
+ /// Set commit UUID.
+ pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+ self.commit_uuid = Some(commit_uuid);
+ self
+ }
+
+ /// Set key metadata for manifest files.
+    pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
+ self.key_metadata = Some(key_metadata);
+ self
+ }
+
+ /// Set snapshot summary properties.
+    pub fn set_snapshot_properties(mut self, props: HashMap<String, String>) -> Self {
+ self.snapshot_properties = props;
+ self
+ }
+
+    /// Validate that the files to delete exist in the given snapshot.
+ pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self {
+ self.validate_from_snapshot_id = Some(snapshot_id);
+ self
+ }
+
+    /// Set the data sequence number for new files. Compaction typically reuses
+    /// the sequence number of the replaced files so that equality deletes
+    /// committed concurrently still apply to the rewritten data.
+ pub fn data_sequence_number(mut self, seq_num: i64) -> Self {
+ self.data_sequence_number = Some(seq_num);
+ self
+ }
+}
+
+#[async_trait]
+impl TransactionAction for ReplaceDataFilesAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+ if self.files_to_delete.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Replace operation requires files to delete",
+ ));
+ }
+
+ if self.files_to_add.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ "Replace operation requires files to add",
+ ));
+ }
+
+ // Validate files exist from specified snapshot
+ if let Some(snapshot_id) = self.validate_from_snapshot_id {
+ Self::validate_files_exist(table, snapshot_id, &self.files_to_delete).await?;
+ }
+
+ let mut snapshot_producer = SnapshotProducer::new(
+ table,
+ self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+ self.key_metadata.clone(),
+ self.snapshot_properties.clone(),
+ self.files_to_add.clone(),
+ );
+
+ if let Some(seq_num) = self.data_sequence_number {
+ snapshot_producer = snapshot_producer.with_data_sequence_number(seq_num);
+ }
+
+ snapshot_producer.validate_added_data_files()?;
+
+ let replace_op = ReplaceOperation {
+ files_to_delete: self.files_to_delete.clone(),
+ };
+
+ snapshot_producer
+ .commit(replace_op, DefaultManifestProcess)
+ .await
+ }
+}
+
+impl ReplaceDataFilesAction {
+ async fn validate_files_exist(
+ table: &Table,
+ snapshot_id: i64,
+ files_to_delete: &[DataFile],
+ ) -> Result<()> {
+ let snapshot = table
+ .metadata()
+ .snapshot_by_id(snapshot_id)
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Snapshot {snapshot_id} not found"),
+ )
+ })?;
+
+ let files_to_find: std::collections::HashSet<&str> = files_to_delete
+ .iter()
+ .map(|f| f.file_path.as_str())
+ .collect();
+
+ let manifest_list = snapshot
+ .load_manifest_list(table.file_io(), &table.metadata_ref())
+ .await?;
+
+ let mut found_files = std::collections::HashSet::new();
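+        // Full pass over every manifest in the snapshot: cost grows with the
+        // total number of live entries.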
+ for entry in manifest_list.entries() {
+ let manifest = entry.load_manifest(table.file_io()).await?;
+ for e in manifest.entries() {
+ if e.is_alive() && files_to_find.contains(e.file_path()) {
+ found_files.insert(e.file_path().to_string());
+ }
+ }
+ }
+
+ let missing: Vec<_> = files_to_delete
+ .iter()
+ .filter(|f| !found_files.contains(&f.file_path))
+ .map(|f| f.file_path.as_str())
+ .collect();
+
+ if !missing.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Files not found in snapshot {}: {}",
+ snapshot_id,
+ missing.join(", ")
+ ),
+ ));
+ }
+
+ Ok(())
+ }
+}
+
+struct ReplaceOperation {
+    files_to_delete: Vec<DataFile>,
+}
+
+impl SnapshotProduceOperation for ReplaceOperation {
+ fn operation(&self) -> Operation {
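+        // Per the Iceberg spec, a "replace" snapshot rewrites files without
+        // logically changing the table's contents (e.g. compaction).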
+ Operation::Replace
+ }
+
+ async fn delete_entries(
+ &self,
+ _snapshot_produce: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+ Ok(vec![])
+ }
+
+ async fn existing_manifest(
+ &self,
+ snapshot_produce: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+ let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
+ return Ok(vec![]);
+ };
+
+ let files_to_delete: std::collections::HashSet<&str> = self
+ .files_to_delete
+ .iter()
+ .map(|f| f.file_path.as_str())
+ .collect();
+
+ let manifest_list = snapshot
+ .load_manifest_list(
+ snapshot_produce.table.file_io(),
+ &snapshot_produce.table.metadata_ref(),
+ )
+ .await?;
+
+ // Include existing manifests that don't contain deleted files
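+        // Note: a manifest containing any file to delete is dropped wholesale,
+        // so every live entry in such a manifest must be covered by
+        // files_to_delete (or re-added via files_to_add) to be retained.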
+ let mut result = Vec::new();
+ for entry in manifest_list.entries() {
+ if !entry.has_added_files() && !entry.has_existing_files() {
+ continue;
+ }
+
+ let manifest = entry
+ .load_manifest(snapshot_produce.table.file_io())
+ .await?;
+ let has_deleted_file = manifest
+ .entries()
+ .iter()
+ .any(|e| e.is_alive() && files_to_delete.contains(e.file_path()));
+
+ if !has_deleted_file {
+ result.push(entry.clone());
+ }
+ }
+
+ Ok(result)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use crate::TableUpdate;
+ use crate::spec::{
+ DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct,
+ };
+ use crate::transaction::tests::make_v2_minimal_table;
+ use crate::transaction::{Transaction, TransactionAction};
+
+ fn create_data_file(table: &crate::table::Table, path: &str, record_count: u64) -> DataFile {
+ DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path(path.to_string())
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(record_count)
+ .partition_spec_id(table.metadata().default_partition_spec_id())
+ .partition(Struct::from_iter([Some(Literal::long(300))]))
+ .build()
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_replace_data_files_basic() {
+ let table = make_v2_minimal_table();
+ let tx = Transaction::new(&table);
+
+ let old_file = create_data_file(&table, "data/old.parquet", 100);
+ let new_file = create_data_file(&table, "data/new.parquet", 100);
+
+ let action = tx
+ .replace_data_files()
+ .delete_files(vec![old_file])
+ .add_files(vec![new_file]);
+
+ let result = Arc::new(action).commit(&table).await;
+ assert!(result.is_ok());
+ }
+
+ #[tokio::test]
+ async fn test_replace_data_files_empty_deletes_fails() {
+ let table = make_v2_minimal_table();
+ let tx = Transaction::new(&table);
+
+ let new_file = create_data_file(&table, "data/new.parquet", 100);
+
+ let action = tx.replace_data_files().add_files(vec![new_file]);
+
+ let result = Arc::new(action).commit(&table).await;
+ assert!(result.is_err());
+ }
+
+ #[tokio::test]
+ async fn test_replace_data_files_empty_adds_fails() {
+ let table = make_v2_minimal_table();
+ let tx = Transaction::new(&table);
+
+ let old_file = create_data_file(&table, "data/old.parquet", 100);
+
+ let action = tx.replace_data_files().delete_files(vec![old_file]);
+
+ let result = Arc::new(action).commit(&table).await;
+ assert!(result.is_err());
+ }
+
+ #[tokio::test]
+ async fn test_replace_uses_replace_operation() {
+ let table = make_v2_minimal_table();
+ let tx = Transaction::new(&table);
+
+ let old_file = create_data_file(&table, "data/old.parquet", 100);
+ let new_file = create_data_file(&table, "data/new.parquet", 100);
+
+ let action = tx
+ .replace_data_files()
+ .delete_files(vec![old_file])
+ .add_files(vec![new_file]);
+
+ let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+ let updates = action_commit.take_updates();
+
+ let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
+ snapshot
+ } else {
+ panic!("Expected AddSnapshot");
+ };
+
+ assert_eq!(new_snapshot.summary().operation, Operation::Replace);
+ }
+}
+
+#[cfg(test)]
+mod integration_tests {
+ use crate::memory::tests::new_memory_catalog;
+ use crate::spec::{
+ DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct,
+ };
+ use crate::transaction::tests::make_v3_minimal_table_in_catalog;
+ use crate::transaction::{ApplyTransactionAction, Transaction};
+
+ fn create_file(path: &str, record_count: u64) -> DataFile {
+ DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path(path.to_string())
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(record_count)
+ .partition(Struct::from_iter([Some(Literal::long(0))]))
+ .partition_spec_id(0)
+ .build()
+ .unwrap()
+ }
+
+ #[tokio::test]
+ async fn test_replace_after_append() {
+ let catalog = new_memory_catalog().await;
+ let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+ // First append some files
+ let file1 = create_file("data/file1.parquet", 100);
+ let file2 = create_file("data/file2.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx
+ .fast_append()
+ .add_data_files(vec![file1.clone(), file2.clone()]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ assert_eq!(table.metadata().snapshots().count(), 1);
+
+ // Now replace file1 and file2 with a single compacted file
+ let compacted = create_file("data/compacted.parquet", 200);
+ let tx = Transaction::new(&table);
+ let action = tx
+ .replace_data_files()
+ .delete_files(vec![file1, file2])
+ .add_files(vec![compacted]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ assert_eq!(table.metadata().snapshots().count(), 2);
+ let snapshot = table.metadata().current_snapshot().unwrap();
+ assert_eq!(snapshot.summary().operation, Operation::Replace);
+
+ // Verify manifest has only the compacted file
+ let manifest_list = snapshot
+ .load_manifest_list(table.file_io(), table.metadata())
+ .await
+ .unwrap();
+ assert_eq!(manifest_list.entries().len(), 1);
+
+ let manifest = manifest_list.entries()[0]
+ .load_manifest(table.file_io())
+ .await
+ .unwrap();
+ assert_eq!(manifest.entries().len(), 1);
+ assert_eq!(manifest.entries()[0].file_path(), "data/compacted.parquet");
+ }
+
+ #[tokio::test]
+ async fn test_replace_preserves_unrelated_files() {
+ let catalog = new_memory_catalog().await;
+ let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+ // Append file1
+ let file1 = create_file("data/file1.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx.fast_append().add_data_files(vec![file1.clone()]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ // Append file2 in separate transaction
+ let file2 = create_file("data/file2.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx.fast_append().add_data_files(vec![file2.clone()]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ // Replace only file1
+ let file1_compacted = create_file("data/file1_compacted.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx
+ .replace_data_files()
+ .delete_files(vec![file1])
+ .add_files(vec![file1_compacted]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ // Verify both file2 manifest and new compacted manifest exist
+ let snapshot = table.metadata().current_snapshot().unwrap();
+ let manifest_list = snapshot
+ .load_manifest_list(table.file_io(), table.metadata())
+ .await
+ .unwrap();
+
+ // Should have 2 manifests: one for file2 (preserved), one for compacted
+ assert_eq!(manifest_list.entries().len(), 2);
+
+ // Collect all file paths
+ let mut all_files = Vec::new();
+ for entry in manifest_list.entries() {
+ let manifest = entry.load_manifest(table.file_io()).await.unwrap();
+ for e in manifest.entries() {
+ if e.is_alive() {
+ all_files.push(e.file_path().to_string());
+ }
+ }
+ }
+ all_files.sort();
+ assert_eq!(all_files, vec![
+ "data/file1_compacted.parquet",
+ "data/file2.parquet"
+ ]);
+ }
+
+ #[tokio::test]
+ async fn test_validate_from_snapshot_success() {
+ let catalog = new_memory_catalog().await;
+ let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+ let file1 = create_file("data/file1.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx.fast_append().add_data_files(vec![file1.clone()]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id();
+
+ // Replace with validation from correct snapshot
+ let compacted = create_file("data/compacted.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx
+ .replace_data_files()
+ .validate_from_snapshot(snapshot_id)
+ .delete_files(vec![file1])
+ .add_files(vec![compacted]);
+ let tx = action.apply(tx).unwrap();
+ let result = tx.commit(&catalog).await;
+ assert!(result.is_ok());
+ }
+
+ #[tokio::test]
+ async fn test_validate_from_snapshot_missing_file() {
+ let catalog = new_memory_catalog().await;
+ let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+ let file1 = create_file("data/file1.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx.fast_append().add_data_files(vec![file1.clone()]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id();
+
+ // Try to delete a file that doesn't exist
+ let nonexistent = create_file("data/nonexistent.parquet", 100);
+ let compacted = create_file("data/compacted.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx
+ .replace_data_files()
+ .validate_from_snapshot(snapshot_id)
+ .delete_files(vec![nonexistent])
+ .add_files(vec![compacted]);
+ let tx = action.apply(tx).unwrap();
+ let result = tx.commit(&catalog).await;
+ assert!(result.is_err());
+ }
+
+ #[tokio::test]
+ async fn test_data_sequence_number() {
+ let catalog = new_memory_catalog().await;
+ let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+ let file1 = create_file("data/file1.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx.fast_append().add_data_files(vec![file1.clone()]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ let original_seq = table
+ .metadata()
+ .current_snapshot()
+ .unwrap()
+ .sequence_number();
+
+ // Replace with custom sequence number
+ let compacted = create_file("data/compacted.parquet", 100);
+ let tx = Transaction::new(&table);
+ let action = tx
+ .replace_data_files()
+ .data_sequence_number(original_seq)
+ .delete_files(vec![file1])
+ .add_files(vec![compacted]);
+ let tx = action.apply(tx).unwrap();
+ let table = tx.commit(&catalog).await.unwrap();
+
+ // Verify the new manifest entry has the custom sequence number
+ let snapshot = table.metadata().current_snapshot().unwrap();
+ let manifest_list = snapshot
+ .load_manifest_list(table.file_io(), table.metadata())
+ .await
+ .unwrap();
+
+ for entry in manifest_list.entries() {
+ let manifest = entry.load_manifest(table.file_io()).await.unwrap();
+ for e in manifest.entries() {
+ if e.file_path() == "data/compacted.parquet" {
+ assert_eq!(e.sequence_number(), Some(original_seq));
+ }
+ }
+ }
+ }
+}
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index c8bf26a174..10578bbf59 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> {
// It starts from 0 and increments for each new manifest file.
// Note: This counter is limited to the range of (0..u64::MAX).
    manifest_counter: RangeFrom<u64>,
+ // Custom data sequence number for compaction operations.
+    data_sequence_number: Option<i64>,
}
impl<'a> SnapshotProducer<'a> {
@@ -136,9 +138,15 @@ impl<'a> SnapshotProducer<'a> {
snapshot_properties,
added_data_files,
manifest_counter: (0..),
+ data_sequence_number: None,
}
}
+ pub(crate) fn with_data_sequence_number(mut self, seq_num: i64) -> Self {
+ self.data_sequence_number = Some(seq_num);
+ self
+ }
+
pub(crate) fn validate_added_data_files(&self) -> Result<()> {
for data_file in &self.added_data_files {
if data_file.content_type() != crate::spec::DataContentType::Data {
@@ -300,18 +308,26 @@ impl<'a> SnapshotProducer<'a> {
let snapshot_id = self.snapshot_id;
let format_version = self.table.metadata().format_version();
- let manifest_entries = added_data_files.into_iter().map(|data_file| {
- let builder = ManifestEntry::builder()
- .status(crate::spec::ManifestStatus::Added)
- .data_file(data_file);
- if format_version == FormatVersion::V1 {
- builder.snapshot_id(snapshot_id).build()
- } else {
- // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when
- // commit failed.
- builder.build()
- }
- });
+ let data_sequence_number = self.data_sequence_number;
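+        // Format v1 predates sequence numbers, so any custom value is ignored
+        // for v1 tables; v2+ entries are stamped explicitly below.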
+ let manifest_entries: Vec<_> = added_data_files
+ .into_iter()
+ .map(|data_file| match (format_version, data_sequence_number) {
+ (FormatVersion::V1, _) => ManifestEntry::builder()
+ .status(crate::spec::ManifestStatus::Added)
+ .snapshot_id(snapshot_id)
+ .data_file(data_file)
+ .build(),
+ (_, Some(seq_num)) => ManifestEntry::builder()
+ .status(crate::spec::ManifestStatus::Added)
+ .sequence_number(seq_num)
+ .data_file(data_file)
+ .build(),
+ (_, None) => ManifestEntry::builder()
+ .status(crate::spec::ManifestStatus::Added)
+ .data_file(data_file)
+ .build(),
+ })
+ .collect();
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
for entry in manifest_entries {
writer.add_entry(entry)?;