diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs index c67ee37d3e..c4767f31cf 100644 --- a/crates/iceberg/src/spec/snapshot_summary.rs +++ b/crates/iceberg/src/spec/snapshot_summary.rs @@ -339,6 +339,7 @@ pub(crate) fn update_snapshot_summaries( if summary.operation != Operation::Append && summary.operation != Operation::Overwrite && summary.operation != Operation::Delete + && summary.operation != Operation::Replace { return Err(Error::new( ErrorKind::DataInvalid, diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 074c7fefe4..5c7658c382 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -54,6 +54,7 @@ mod action; pub use action::*; mod append; +mod replace_data_files; mod snapshot; mod sort_order; mod update_location; @@ -71,6 +72,7 @@ use crate::spec::TableProperties; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; +use crate::transaction::replace_data_files::ReplaceDataFilesAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; @@ -156,6 +158,11 @@ impl Transaction { UpdateStatisticsAction::new() } + /// Creates a replace data files action (for compaction). + pub fn replace_data_files(&self) -> ReplaceDataFilesAction { + ReplaceDataFilesAction::new() + } + /// Commit transaction. pub async fn commit(self, catalog: &dyn Catalog) -> Result { if self.actions.is_empty() { diff --git a/crates/iceberg/src/transaction/replace_data_files.rs b/crates/iceberg/src/transaction/replace_data_files.rs new file mode 100644 index 0000000000..9961cfd4a7 --- /dev/null +++ b/crates/iceberg/src/transaction/replace_data_files.rs @@ -0,0 +1,577 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation}; +use crate::table::Table; +use crate::transaction::snapshot::{ + DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, +}; +use crate::transaction::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind}; + +/// Action to replace data files in a table (for compaction/rewrite operations). +pub struct ReplaceDataFilesAction { + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + files_to_delete: Vec, + files_to_add: Vec, + validate_from_snapshot_id: Option, + data_sequence_number: Option, +} + +impl ReplaceDataFilesAction { + pub(crate) fn new() -> Self { + Self { + commit_uuid: None, + key_metadata: None, + snapshot_properties: HashMap::default(), + files_to_delete: vec![], + files_to_add: vec![], + validate_from_snapshot_id: None, + data_sequence_number: None, + } + } + + /// Add files to delete (old files being replaced). + pub fn delete_files(mut self, files: impl IntoIterator) -> Self { + self.files_to_delete.extend(files); + self + } + + /// Add files to add (new files replacing old ones). + pub fn add_files(mut self, files: impl IntoIterator) -> Self { + self.files_to_add.extend(files); + self + } + + /// Set commit UUID. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self + } + + /// Set key metadata for manifest files. + pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } + + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, props: HashMap) -> Self { + self.snapshot_properties = props; + self + } + + /// Validate that files to delete exist from this snapshot. + pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self { + self.validate_from_snapshot_id = Some(snapshot_id); + self + } + + /// Set data sequence number for new files (for handling concurrent equality deletes). + pub fn data_sequence_number(mut self, seq_num: i64) -> Self { + self.data_sequence_number = Some(seq_num); + self + } +} + +#[async_trait] +impl TransactionAction for ReplaceDataFilesAction { + async fn commit(self: Arc, table: &Table) -> Result { + if self.files_to_delete.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Replace operation requires files to delete", + )); + } + + if self.files_to_add.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Replace operation requires files to add", + )); + } + + // Validate files exist from specified snapshot + if let Some(snapshot_id) = self.validate_from_snapshot_id { + Self::validate_files_exist(table, snapshot_id, &self.files_to_delete).await?; + } + + let mut snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.files_to_add.clone(), + ); + + if let Some(seq_num) = self.data_sequence_number { + snapshot_producer = snapshot_producer.with_data_sequence_number(seq_num); + } + + snapshot_producer.validate_added_data_files()?; + + let replace_op = ReplaceOperation { + files_to_delete: self.files_to_delete.clone(), + }; + + snapshot_producer + .commit(replace_op, DefaultManifestProcess) + .await + } +} + +impl ReplaceDataFilesAction { + async fn validate_files_exist( + table: &Table, + snapshot_id: i64, + files_to_delete: &[DataFile], + ) -> Result<()> { + let snapshot = table + .metadata() + .snapshot_by_id(snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot {snapshot_id} not found"), + ) + })?; + + let files_to_find: std::collections::HashSet<&str> = files_to_delete + .iter() + .map(|f| f.file_path.as_str()) + .collect(); + + let manifest_list = snapshot + .load_manifest_list(table.file_io(), &table.metadata_ref()) + .await?; + + let mut found_files = std::collections::HashSet::new(); + for entry in manifest_list.entries() { + let manifest = entry.load_manifest(table.file_io()).await?; + for e in manifest.entries() { + if e.is_alive() && files_to_find.contains(e.file_path()) { + found_files.insert(e.file_path().to_string()); + } + } + } + + let missing: Vec<_> = files_to_delete + .iter() + .filter(|f| !found_files.contains(&f.file_path)) + .map(|f| f.file_path.as_str()) + .collect(); + + if !missing.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Files not found in snapshot {}: {}", + snapshot_id, + missing.join(", ") + ), + )); + } + + Ok(()) + } +} + +struct ReplaceOperation { + files_to_delete: Vec, +} + +impl SnapshotProduceOperation for ReplaceOperation { + fn operation(&self) -> Operation { + Operation::Replace + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let files_to_delete: std::collections::HashSet<&str> = self + .files_to_delete + .iter() + .map(|f| f.file_path.as_str()) + .collect(); + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.table.file_io(), + &snapshot_produce.table.metadata_ref(), + ) + .await?; + + // Include existing manifests that don't contain deleted files + let mut result = Vec::new(); + for entry in manifest_list.entries() { + if !entry.has_added_files() && !entry.has_existing_files() { + continue; + } + + let manifest = entry + .load_manifest(snapshot_produce.table.file_io()) + .await?; + let has_deleted_file = manifest + .entries() + .iter() + .any(|e| e.is_alive() && files_to_delete.contains(e.file_path())); + + if !has_deleted_file { + result.push(entry.clone()); + } + } + + Ok(result) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::TableUpdate; + use crate::spec::{ + DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct, + }; + use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::{Transaction, TransactionAction}; + + fn create_data_file(table: &crate::table::Table, path: &str, record_count: u64) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(record_count) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_replace_data_files_basic() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let old_file = create_data_file(&table, "data/old.parquet", 100); + let new_file = create_data_file(&table, "data/new.parquet", 100); + + let action = tx + .replace_data_files() + .delete_files(vec![old_file]) + .add_files(vec![new_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_replace_data_files_empty_deletes_fails() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let new_file = create_data_file(&table, "data/new.parquet", 100); + + let action = tx.replace_data_files().add_files(vec![new_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_replace_data_files_empty_adds_fails() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let old_file = create_data_file(&table, "data/old.parquet", 100); + + let action = tx.replace_data_files().delete_files(vec![old_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_replace_uses_replace_operation() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let old_file = create_data_file(&table, "data/old.parquet", 100); + let new_file = create_data_file(&table, "data/new.parquet", 100); + + let action = tx + .replace_data_files() + .delete_files(vec![old_file]) + .add_files(vec![new_file]); + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + panic!("Expected AddSnapshot"); + }; + + assert_eq!(new_snapshot.summary().operation, Operation::Replace); + } +} + +#[cfg(test)] +mod integration_tests { + use crate::memory::tests::new_memory_catalog; + use crate::spec::{ + DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct, + }; + use crate::transaction::tests::make_v3_minimal_table_in_catalog; + use crate::transaction::{ApplyTransactionAction, Transaction}; + + fn create_file(path: &str, record_count: u64) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(record_count) + .partition(Struct::from_iter([Some(Literal::long(0))])) + .partition_spec_id(0) + .build() + .unwrap() + } + + #[tokio::test] + async fn test_replace_after_append() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + // First append some files + let file1 = create_file("data/file1.parquet", 100); + let file2 = create_file("data/file2.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .fast_append() + .add_data_files(vec![file1.clone(), file2.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + assert_eq!(table.metadata().snapshots().count(), 1); + + // Now replace file1 and file2 with a single compacted file + let compacted = create_file("data/compacted.parquet", 200); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .delete_files(vec![file1, file2]) + .add_files(vec![compacted]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + assert_eq!(table.metadata().snapshots().count(), 2); + let snapshot = table.metadata().current_snapshot().unwrap(); + assert_eq!(snapshot.summary().operation, Operation::Replace); + + // Verify manifest has only the compacted file + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 1); + + let manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + assert_eq!(manifest.entries().len(), 1); + assert_eq!(manifest.entries()[0].file_path(), "data/compacted.parquet"); + } + + #[tokio::test] + async fn test_replace_preserves_unrelated_files() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + // Append file1 + let file1 = create_file("data/file1.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Append file2 in separate transaction + let file2 = create_file("data/file2.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file2.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Replace only file1 + let file1_compacted = create_file("data/file1_compacted.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .delete_files(vec![file1]) + .add_files(vec![file1_compacted]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Verify both file2 manifest and new compacted manifest exist + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + // Should have 2 manifests: one for file2 (preserved), one for compacted + assert_eq!(manifest_list.entries().len(), 2); + + // Collect all file paths + let mut all_files = Vec::new(); + for entry in manifest_list.entries() { + let manifest = entry.load_manifest(table.file_io()).await.unwrap(); + for e in manifest.entries() { + if e.is_alive() { + all_files.push(e.file_path().to_string()); + } + } + } + all_files.sort(); + assert_eq!(all_files, vec![ + "data/file1_compacted.parquet", + "data/file2.parquet" + ]); + } + + #[tokio::test] + async fn test_validate_from_snapshot_success() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + let file1 = create_file("data/file1.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id(); + + // Replace with validation from correct snapshot + let compacted = create_file("data/compacted.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .validate_from_snapshot(snapshot_id) + .delete_files(vec![file1]) + .add_files(vec![compacted]); + let tx = action.apply(tx).unwrap(); + let result = tx.commit(&catalog).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_validate_from_snapshot_missing_file() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + let file1 = create_file("data/file1.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id(); + + // Try to delete a file that doesn't exist + let nonexistent = create_file("data/nonexistent.parquet", 100); + let compacted = create_file("data/compacted.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .validate_from_snapshot(snapshot_id) + .delete_files(vec![nonexistent]) + .add_files(vec![compacted]); + let tx = action.apply(tx).unwrap(); + let result = tx.commit(&catalog).await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_data_sequence_number() { + let catalog = new_memory_catalog().await; + let table = make_v3_minimal_table_in_catalog(&catalog).await; + + let file1 = create_file("data/file1.parquet", 100); + let tx = Transaction::new(&table); + let action = tx.fast_append().add_data_files(vec![file1.clone()]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + let original_seq = table + .metadata() + .current_snapshot() + .unwrap() + .sequence_number(); + + // Replace with custom sequence number + let compacted = create_file("data/compacted.parquet", 100); + let tx = Transaction::new(&table); + let action = tx + .replace_data_files() + .data_sequence_number(original_seq) + .delete_files(vec![file1]) + .add_files(vec![compacted]); + let tx = action.apply(tx).unwrap(); + let table = tx.commit(&catalog).await.unwrap(); + + // Verify the new manifest entry has the custom sequence number + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + for entry in manifest_list.entries() { + let manifest = entry.load_manifest(table.file_io()).await.unwrap(); + for e in manifest.entries() { + if e.file_path() == "data/compacted.parquet" { + assert_eq!(e.sequence_number(), Some(original_seq)); + } + } + } + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..10578bbf59 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, + // Custom data sequence number for compaction operations. + data_sequence_number: Option, } impl<'a> SnapshotProducer<'a> { @@ -136,9 +138,15 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties, added_data_files, manifest_counter: (0..), + data_sequence_number: None, } } + pub(crate) fn with_data_sequence_number(mut self, seq_num: i64) -> Self { + self.data_sequence_number = Some(seq_num); + self + } + pub(crate) fn validate_added_data_files(&self) -> Result<()> { for data_file in &self.added_data_files { if data_file.content_type() != crate::spec::DataContentType::Data { @@ -300,18 +308,26 @@ impl<'a> SnapshotProducer<'a> { let snapshot_id = self.snapshot_id; let format_version = self.table.metadata().format_version(); - let manifest_entries = added_data_files.into_iter().map(|data_file| { - let builder = ManifestEntry::builder() - .status(crate::spec::ManifestStatus::Added) - .data_file(data_file); - if format_version == FormatVersion::V1 { - builder.snapshot_id(snapshot_id).build() - } else { - // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when - // commit failed. - builder.build() - } - }); + let data_sequence_number = self.data_sequence_number; + let manifest_entries: Vec<_> = added_data_files + .into_iter() + .map(|data_file| match (format_version, data_sequence_number) { + (FormatVersion::V1, _) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .snapshot_id(snapshot_id) + .data_file(data_file) + .build(), + (_, Some(seq_num)) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .sequence_number(seq_num) + .data_file(data_file) + .build(), + (_, None) => ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(data_file) + .build(), + }) + .collect(); let mut writer = self.new_manifest_writer(ManifestContentType::Data)?; for entry in manifest_entries { writer.add_entry(entry)?;