diff --git a/Project.toml b/Project.toml index 6790cdb..2fe379d 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "RustyIceberg" uuid = "390bdf5b-b624-43dc-a846-0ef7a3405804" -version = "0.8.2" +version = "0.8.3" [deps] Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index cd3c359..23558e2 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -109,7 +109,7 @@ dependencies = [ "miniz_oxide", "num-bigint", "quad-rand", - "rand 0.9.2", + "rand 0.9.4", "regex-lite", "serde", "serde_bytes", @@ -1362,7 +1362,7 @@ dependencies = [ "idna", "ipnet", "once_cell", - "rand 0.9.2", + "rand 0.9.4", "ring", "thiserror 2.0.18", "tinyvec", @@ -1405,7 +1405,7 @@ dependencies = [ "moka", "once_cell", "parking_lot", - "rand 0.9.2", + "rand 0.9.4", "resolv-conf", "smallvec", "thiserror 2.0.18", @@ -1566,7 +1566,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.9.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=418213731e91544f5eb31a3efa459e88f599030e#418213731e91544f5eb31a3efa459e88f599030e" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=27f68f7ea4ac4a3e51bf8c7f644648ccca235170#27f68f7ea4ac4a3e51bf8c7f644648ccca235170" dependencies = [ "aes-gcm", "anyhow", @@ -1600,7 +1600,7 @@ dependencies = [ "opendal", "ordered-float 4.6.0", "parquet", - "rand 0.8.5", + "rand 0.9.4", "reqsign", "reqwest", "roaring", @@ -1623,7 +1623,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.9.0" -source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=418213731e91544f5eb31a3efa459e88f599030e#418213731e91544f5eb31a3efa459e88f599030e" +source = "git+https://github.com/RelationalAI/iceberg-rust.git?rev=27f68f7ea4ac4a3e51bf8c7f644648ccca235170#27f68f7ea4ac4a3e51bf8c7f644648ccca235170" dependencies = [ "async-trait", "chrono", @@ -1641,7 +1641,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.8.2" +version = "0.8.3" dependencies = [ "anyhow", "arrow-array", @@ -1658,6 +1658,7 @@ dependencies = [ "object_store_ffi", "once_cell", "parquet", + "serde", "serde_json", "strum 0.26.3", "tempfile", @@ -2726,7 +2727,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand 0.9.2", + "rand 0.9.4", "ring", "rustc-hash", "rustls", @@ -2790,9 +2791,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.9.2" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" dependencies = [ "rand_chacha 0.9.0", "rand_core 0.9.5", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 85a4c99..c99705f 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.8.2" +version = "0.8.3" edition = "2021" [lib] @@ -12,8 +12,8 @@ default = ["julia"] julia = [] [dependencies] -iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "418213731e91544f5eb31a3efa459e88f599030e", features = ["storage-azdls"] } -iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "418213731e91544f5eb31a3efa459e88f599030e" } +iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "27f68f7ea4ac4a3e51bf8c7f644648ccca235170", features = ["storage-azdls"] } +iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "27f68f7ea4ac4a3e51bf8c7f644648ccca235170" } object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false } tokio = { version = "1.0", features = ["full"] } futures = "0.3" @@ -31,6 +31,7 @@ async-trait = "0.1" # Pin home to 0.5.9 to support rustc < 1.88 # (0.5.11+ requires rustc 1.88+) home = "=0.5.9" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0.142" strum = { version = "0.26", features = ["derive"] } diff --git a/iceberg_rust_ffi/src/data_file.rs b/iceberg_rust_ffi/src/data_file.rs new file mode 100644 index 0000000..24d6276 --- /dev/null +++ b/iceberg_rust_ffi/src/data_file.rs @@ -0,0 +1,178 @@ +/// DataFiles FFI — opaque handle, inspection helpers, and table listing. +use std::collections::HashMap; +use std::ffi::c_void; + +use iceberg::spec::{DataContentType, DataFile, Datum, ManifestContentType}; +use object_store_ffi::{ + export_runtime_op, with_cancellation, CResult, NotifyGuard, ResponseGuard, RT, +}; + +use crate::error_codes::{classified_error, IcebergErrorCode}; +use crate::response::IcebergBoxedResponse; + +/// Opaque handle for a collection of data files. +/// +/// Produced by writers and by `iceberg_table_list_data_files`. Consumed +/// (files moved out) by `iceberg_fast_append_action_add_data_files`, +/// `iceberg_overwrite_action_add_data_files`, and +/// `iceberg_overwrite_action_delete_data_files`. Free with +/// `iceberg_data_files_free` after use. +pub struct IcebergDataFiles { + pub data_files: Vec, +} + +unsafe impl Send for IcebergDataFiles {} +unsafe impl Sync for IcebergDataFiles {} + +/// Type alias for async operations that return a DataFiles handle. +pub type IcebergDataFilesResponse = IcebergBoxedResponse; + +/// Free a data files handle. +#[no_mangle] +pub extern "C" fn iceberg_data_files_free(data_files: *mut IcebergDataFiles) { + if !data_files.is_null() { + unsafe { + let _ = Box::from_raw(data_files); + } + } +} + +/// Return the number of data files in the handle (0 if null). +#[no_mangle] +pub extern "C" fn iceberg_data_files_len(data_files: *const IcebergDataFiles) -> usize { + if data_files.is_null() { + return 0; + } + unsafe { (*data_files).data_files.len() } +} + +/// Return a JSON array of objects describing every data file in the handle. +/// +/// Each object contains all metadata fields from the Iceberg DataFile spec: +/// content, file_path, file_format, record_count, file_size_in_bytes, +/// column_sizes, value_counts, null_value_counts, nan_value_counts, +/// lower_bounds, upper_bounds, split_offsets, sort_order_id, equality_ids, +/// first_row_id, referenced_data_file, content_offset, content_size_in_bytes. +/// +/// Returns null on error or if `data_files` is null. +/// The caller must free the returned string with `iceberg_destroy_cstring`. +#[no_mangle] +pub extern "C" fn iceberg_data_files_to_json( + data_files: *const IcebergDataFiles, +) -> *mut std::ffi::c_char { + #[derive(serde::Serialize)] + struct DataFileJson<'a> { + content: &'static str, + file_path: &'a str, + file_format: String, + record_count: u64, + file_size_in_bytes: u64, + column_sizes: &'a HashMap, + value_counts: &'a HashMap, + null_value_counts: &'a HashMap, + nan_value_counts: &'a HashMap, + lower_bounds: HashMap, + upper_bounds: HashMap, + split_offsets: Option<&'a [i64]>, + sort_order_id: Option, + equality_ids: Option>, + first_row_id: Option, + referenced_data_file: Option, + content_offset: Option, + content_size_in_bytes: Option, + } + + if data_files.is_null() { + return std::ptr::null_mut(); + } + let df_ref = unsafe { &*data_files }; + + let entries: Vec = df_ref + .data_files + .iter() + .map(|f| DataFileJson { + content: match f.content_type() { + DataContentType::Data => "data", + DataContentType::PositionDeletes => "position_deletes", + DataContentType::EqualityDeletes => "equality_deletes", + }, + file_path: f.file_path(), + file_format: f.file_format().to_string(), + record_count: f.record_count(), + file_size_in_bytes: f.file_size_in_bytes(), + column_sizes: f.column_sizes(), + value_counts: f.value_counts(), + null_value_counts: f.null_value_counts(), + nan_value_counts: f.nan_value_counts(), + lower_bounds: f.lower_bounds().iter().map(|(k, v)| (*k, v)).collect(), + upper_bounds: f.upper_bounds().iter().map(|(k, v)| (*k, v)).collect(), + split_offsets: f.split_offsets(), + sort_order_id: f.sort_order_id(), + equality_ids: f.equality_ids(), + first_row_id: f.first_row_id(), + referenced_data_file: f.referenced_data_file(), + content_offset: f.content_offset(), + content_size_in_bytes: f.content_size_in_bytes(), + }) + .collect(); + + match serde_json::to_string(&entries) { + Ok(json) => match std::ffi::CString::new(json) { + Ok(c_str) => c_str.into_raw(), + Err(_) => std::ptr::null_mut(), + }, + Err(_) => std::ptr::null_mut(), + } +} + +export_runtime_op!( + iceberg_table_list_data_files, + crate::IcebergDataFilesResponse, + || { + if table.is_null() { + return Err(classified_error( + IcebergErrorCode::STATE_RESOURCE_FREED, + "Resource has been freed", + "Null table pointer provided", + )); + } + let table_ref = unsafe { &*table }; + Ok(table_ref) + }, + table_ref, + async { + let table = &table_ref.table; + let Some(snapshot) = table.metadata().current_snapshot() else { + return Ok::(IcebergDataFiles { + data_files: vec![], + }); + }; + + let manifest_list = snapshot + .load_manifest_list(table.file_io(), &table.metadata_ref()) + .await + .map_err(|e| anyhow::anyhow!("Failed to load manifest list: {e}"))?; + + let mut data_files = Vec::new(); + for manifest_entry in manifest_list.entries() { + if manifest_entry.content != ManifestContentType::Data { + continue; + } + if !manifest_entry.has_added_files() && !manifest_entry.has_existing_files() { + continue; + } + let manifest = manifest_entry + .load_manifest(table.file_io()) + .await + .map_err(|e| anyhow::anyhow!("Failed to load manifest: {e}"))?; + for entry in manifest.entries() { + if entry.is_alive() { + data_files.push(entry.data_file().clone()); + } + } + } + + Ok::(IcebergDataFiles { data_files }) + }, + table: *mut crate::table::IcebergTable +); diff --git a/iceberg_rust_ffi/src/full_pipeline.rs b/iceberg_rust_ffi/src/full_pipeline.rs index 320af92..02ce1ab 100644 --- a/iceberg_rust_ffi/src/full_pipeline.rs +++ b/iceberg_rust_ffi/src/full_pipeline.rs @@ -35,7 +35,7 @@ fn read_one_full_scan_file( task: FileScanTask, ) -> iceberg::Result { let task_stream = Box::pin(futures::stream::once(async { Ok(task) })); - reader.read(task_stream) + reader.read(task_stream).map(|r| r.stream()) } /// Full-scan entry point — wraps `create_nested_pipeline` with a closure diff --git a/iceberg_rust_ffi/src/incremental.rs b/iceberg_rust_ffi/src/incremental.rs index f299b64..900208e 100644 --- a/iceberg_rust_ffi/src/incremental.rs +++ b/iceberg_rust_ffi/src/incremental.rs @@ -218,7 +218,8 @@ export_runtime_op!( let (scan_ref, serialization_concurrency) = result_tuple; // Get unzipped streams (separate append and delete streams) - let (inserts_stream, deletes_stream) = scan_ref.to_unzipped_arrow().await.map_err(|e| classify_iceberg(e))?; + let result = scan_ref.to_unzipped_arrow().await.map_err(|e| classify_iceberg(e))?; + let (inserts_stream, deletes_stream) = (result.appends, result.deletes); // Transform both streams with parallel serialization let inserts = IcebergArrowStream { diff --git a/iceberg_rust_ffi/src/incremental_pipeline.rs b/iceberg_rust_ffi/src/incremental_pipeline.rs index 6613328..283909b 100644 --- a/iceberg_rust_ffi/src/incremental_pipeline.rs +++ b/iceberg_rust_ffi/src/incremental_pipeline.rs @@ -17,7 +17,7 @@ //! - delete stream: flat Arrow stream of delete records use futures::{StreamExt, TryStreamExt}; -use iceberg::arrow::{ArrowReader, StreamsInto, UnzippedIncrementalBatchRecordStream}; +use iceberg::arrow::{ArrowReader, StreamsInto, UnzippedIncrementalScanResult}; use iceberg::io::FileIO; use iceberg::scan::incremental::{AppendedFileScanTask, DeleteScanTask}; use tokio::sync::Mutex as AsyncMutex; @@ -35,12 +35,11 @@ fn read_one_append_file( ) -> iceberg::Result { let append = futures::stream::once(async { Ok(task) }).boxed(); let delete = futures::stream::empty::>().boxed(); - let (arrow_stream, _): UnzippedIncrementalBatchRecordStream = - StreamsInto::::stream( - (append, delete), - reader, - )?; - Ok(arrow_stream) + let UnzippedIncrementalScanResult { appends, .. } = StreamsInto::< + ArrowReader, + UnzippedIncrementalScanResult, + >::stream((append, delete), reader)?; + Ok(appends) } /// Build the nested incremental pipeline. @@ -81,15 +80,17 @@ pub async fn create_incremental_nested_pipeline( // Delete stream: StreamsInto with empty append stream routes all delete // tasks through the iceberg reader machinery. - let (_, delete_arrow): UnzippedIncrementalBatchRecordStream = - StreamsInto::::stream( - ( - futures::stream::empty::>().boxed(), - delete_tasks, - ), - build_reader(file_io, batch_size), - ) - .map_err(|e| anyhow::anyhow!("Failed to create delete stream: {e}"))?; + let UnzippedIncrementalScanResult { + deletes: delete_arrow, + .. + } = StreamsInto::::stream( + ( + futures::stream::empty::>().boxed(), + delete_tasks, + ), + build_reader(file_io, batch_size), + ) + .map_err(|e| anyhow::anyhow!("Failed to create delete stream: {e}"))?; let delete_stream = IcebergArrowStream { stream: AsyncMutex::new(crate::transform_stream_with_parallel_serialization( diff --git a/iceberg_rust_ffi/src/lib.rs b/iceberg_rust_ffi/src/lib.rs index ef56b94..0af0106 100644 --- a/iceberg_rust_ffi/src/lib.rs +++ b/iceberg_rust_ffi/src/lib.rs @@ -34,6 +34,9 @@ mod table; // Transaction module mod transaction; +// DataFiles handle, inspection helpers, and table listing +mod data_file; + // Writer module mod writer; @@ -69,6 +72,7 @@ pub use error_codes::{classified_error, classify, ClassifiedError}; // Re-export types and functions from submodules pub use catalog::{IcebergBoolResponse, IcebergCatalog, IcebergCatalogResponse}; +pub use data_file::{IcebergDataFiles, IcebergDataFilesResponse}; pub use full::IcebergScan; pub use incremental::{IcebergIncrementalScan, IcebergUnzippedStreamsResponse}; pub use response::{ @@ -80,7 +84,7 @@ pub use table::{ IcebergFileScan, IcebergFileScanResponse, IcebergFileScanStream, IcebergFileScanStreamResponse, IcebergTable, IcebergTableResponse, }; -pub use transaction::{IcebergDataFiles, IcebergTransaction, IcebergTransactionResponse}; +pub use transaction::{IcebergOverwriteAction, IcebergTransaction, IcebergTransactionResponse}; pub use writer::{ IcebergDataFileWriter, IcebergDataFileWriterResponse, IcebergWriterCloseResponse, }; diff --git a/iceberg_rust_ffi/src/transaction.rs b/iceberg_rust_ffi/src/transaction.rs index 5e33cc5..4efa95e 100644 --- a/iceberg_rust_ffi/src/transaction.rs +++ b/iceberg_rust_ffi/src/transaction.rs @@ -7,6 +7,7 @@ use crate::catalog::IcebergCatalog; use crate::error_codes::{classified_error, classify_iceberg, IcebergErrorCode}; use crate::response::IcebergBoxedResponse; use crate::table::IcebergTable; +use crate::IcebergDataFiles; use iceberg::spec::DataFile; use iceberg::transaction::{ApplyTransactionAction, Transaction}; @@ -44,15 +45,6 @@ impl IcebergTransaction { } } -/// Opaque handle for data files produced by a writer -/// This holds the Vec that results from closing a writer -pub struct IcebergDataFiles { - pub data_files: Vec, -} - -unsafe impl Send for IcebergDataFiles {} -unsafe impl Sync for IcebergDataFiles {} - /// Opaque handle for accumulating data files for a FastAppendAction /// /// Since iceberg-rust's FastAppendAction is not publicly exported, we store @@ -83,6 +75,22 @@ impl IcebergFastAppendAction { } } +/// Opaque handle accumulating data files for an OverwriteAction. +#[derive(Default)] +pub struct IcebergOverwriteAction { + added_files: Vec, + deleted_files: Vec, +} + +unsafe impl Send for IcebergOverwriteAction {} +unsafe impl Sync for IcebergOverwriteAction {} + +impl IcebergOverwriteAction { + pub fn new() -> Self { + Self::default() + } +} + /// Type alias for transaction response pub type IcebergTransactionResponse = IcebergBoxedResponse; @@ -96,16 +104,6 @@ pub extern "C" fn iceberg_transaction_free(transaction: *mut IcebergTransaction) } } -/// Free data files handle -#[no_mangle] -pub extern "C" fn iceberg_data_files_free(data_files: *mut IcebergDataFiles) { - if !data_files.is_null() { - unsafe { - let _ = Box::from_raw(data_files); - } - } -} - /// Free a fast append action #[no_mangle] pub extern "C" fn iceberg_fast_append_action_free(action: *mut IcebergFastAppendAction) { @@ -310,3 +308,161 @@ export_runtime_op!( transaction: *mut IcebergTransaction, catalog: *mut IcebergCatalog ); + +/// Free an overwrite action. +#[no_mangle] +pub extern "C" fn iceberg_overwrite_action_free(action: *mut IcebergOverwriteAction) { + if !action.is_null() { + unsafe { + let _ = Box::from_raw(action); + } + } +} + +/// Create a new OverwriteAction. +/// +/// # Safety +/// The returned action must be freed with `iceberg_overwrite_action_free` when no longer needed. +#[no_mangle] +pub extern "C" fn iceberg_overwrite_action_new() -> *mut IcebergOverwriteAction { + Box::into_raw(Box::new(IcebergOverwriteAction::new())) +} + +/// Add data files to write in the overwrite snapshot. +/// +/// The data_files handle is consumed — its files are moved into the action. +/// Returns 0 on success, non-zero on error. +#[no_mangle] +pub extern "C" fn iceberg_overwrite_action_add_data_files( + action: *mut IcebergOverwriteAction, + data_files: *mut IcebergDataFiles, + error_message_out: *mut *mut std::ffi::c_char, +) -> i32 { + let set_error = |msg: &str, out: *mut *mut std::ffi::c_char| { + if !out.is_null() { + if let Ok(c_str) = std::ffi::CString::new(msg) { + unsafe { + *out = c_str.into_raw(); + } + } + } + }; + if action.is_null() { + set_error("Null action pointer provided", error_message_out); + return 1; + } + if data_files.is_null() { + set_error("Null data_files pointer provided", error_message_out); + return 1; + } + let action_ref = unsafe { &mut *action }; + let df_ref = unsafe { &mut *data_files }; + action_ref + .added_files + .extend(std::mem::take(&mut df_ref.data_files)); + 0 +} + +/// Mark data files for deletion in the overwrite snapshot. +/// +/// The data_files handle is consumed — its files are moved into the action. +/// Returns 0 on success, non-zero on error. +#[no_mangle] +pub extern "C" fn iceberg_overwrite_action_delete_data_files( + action: *mut IcebergOverwriteAction, + data_files: *mut IcebergDataFiles, + error_message_out: *mut *mut std::ffi::c_char, +) -> i32 { + let set_error = |msg: &str, out: *mut *mut std::ffi::c_char| { + if !out.is_null() { + if let Ok(c_str) = std::ffi::CString::new(msg) { + unsafe { + *out = c_str.into_raw(); + } + } + } + }; + if action.is_null() { + set_error("Null action pointer provided", error_message_out); + return 1; + } + if data_files.is_null() { + set_error("Null data_files pointer provided", error_message_out); + return 1; + } + let action_ref = unsafe { &mut *action }; + let df_ref = unsafe { &mut *data_files }; + action_ref + .deleted_files + .extend(std::mem::take(&mut df_ref.data_files)); + 0 +} + +/// Apply an OverwriteAction to a transaction. +/// +/// Consumes the action's file lists and applies them to the transaction via +/// `Transaction::overwrite()`. The action handle should be freed after this call. +/// Returns 0 on success, non-zero on error. +#[no_mangle] +pub extern "C" fn iceberg_overwrite_action_apply( + action: *mut IcebergOverwriteAction, + transaction: *mut IcebergTransaction, + error_message_out: *mut *mut std::ffi::c_char, +) -> i32 { + let set_error = |msg: &str, out: *mut *mut std::ffi::c_char| { + if !out.is_null() { + if let Ok(c_str) = std::ffi::CString::new(msg) { + unsafe { + *out = c_str.into_raw(); + } + } + } + }; + if action.is_null() { + set_error("Null action pointer provided", error_message_out); + return 1; + } + if transaction.is_null() { + set_error("Null transaction pointer provided", error_message_out); + return 1; + } + let action_ref = unsafe { &mut *action }; + let tx_ref = unsafe { &mut *transaction }; + + let added = std::mem::take(&mut action_ref.added_files); + let deleted = std::mem::take(&mut action_ref.deleted_files); + + let tx = match tx_ref.take() { + Some(t) => t, + None => { + set_error( + &format!( + "{}\t{}\tTransaction already consumed", + IcebergErrorCode::STATE_TRANSACTION_CONSUMED as u32, + "Transaction has already been committed or rolled back" + ), + error_message_out, + ); + return 1; + } + }; + + let overwrite_action = tx + .overwrite() + .add_data_files(added) + .delete_data_files(deleted); + + match overwrite_action.apply(tx) { + Ok(new_tx) => { + tx_ref.replace(new_tx); + 0 + } + Err(e) => { + set_error( + &format!("Failed to apply overwrite: {}", e), + error_message_out, + ); + 1 + } + } +} diff --git a/iceberg_rust_ffi/src/writer.rs b/iceberg_rust_ffi/src/writer.rs index 6c0db60..5003344 100644 --- a/iceberg_rust_ffi/src/writer.rs +++ b/iceberg_rust_ffi/src/writer.rs @@ -100,7 +100,6 @@ use crate::error_codes::{classified_error, classify, classify_iceberg, IcebergEr use crate::record_batch_builder::{RecordBatchBuilder, DEFAULT_COALESCE_ROWS}; use crate::response::IcebergBoxedResponse; use crate::table::IcebergTable; -use crate::transaction::IcebergDataFiles; use crate::util::parse_c_string; use crate::writer_columns::{ ColumnSlice, COLUMN_TYPE_BOOLEAN, COLUMN_TYPE_DECIMAL_INT128, COLUMN_TYPE_DECIMAL_INT32, @@ -109,6 +108,7 @@ use crate::writer_columns::{ COLUMN_TYPE_JULIA_TIMESTAMPTZ, COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS, COLUMN_TYPE_JULIA_TIMESTAMP_NS, COLUMN_TYPE_STRING, COLUMN_TYPE_UUID, }; +use crate::IcebergDataFiles; use object_store_ffi::{ export_runtime_op, with_cancellation, CResult, NotifyGuard, ResponseGuard, RT, }; diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index b68f935..7eb9c2a 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -45,8 +45,9 @@ export IcebergBoolean, IcebergInt, IcebergLong, IcebergFloat, IcebergDouble export IcebergDate, IcebergTime, IcebergTimestamp, IcebergTimestamptz export IcebergTimestampNs, IcebergTimestamptzNs export IcebergString, IcebergUuid, IcebergBinary, IcebergDecimal -export Transaction, DataFiles, free_transaction!, free_data_files!, commit, transaction +export Transaction, DataFiles, free_transaction!, free_data_files!, commit, transaction, data_file_info export FastAppendAction, free_fast_append_action!, add_data_files, apply, with_fast_append +export OverwriteAction, free_overwrite_action!, delete_data_files, with_overwrite, list_data_files export DataFileWriter, free_writer!, close_writer, set_encode_workers! export WriterConfig, CompressionCodec, UNCOMPRESSED, SNAPPY, GZIP, LZ4, ZSTD, LZ4_RAW export RowChunk, flush! diff --git a/src/data_file.jl b/src/data_file.jl index db1d3df..77002e9 100644 --- a/src/data_file.jl +++ b/src/data_file.jl @@ -31,3 +31,46 @@ function free_data_files!(df::DataFiles) df.ptr = C_NULL return nothing end + +""" + Base.length(df::DataFiles) -> Int + +Return the number of data files in the handle (0 if the handle is null/freed). +""" +function Base.length(df::DataFiles) + df.ptr == C_NULL && return 0 + return Int(@ccall rust_lib.iceberg_data_files_len(df.ptr::Ptr{Cvoid})::Csize_t) +end + +""" + data_file_info(df::DataFiles) -> Vector{Dict{String,Any}} + +Return a `Vector` of `Dict`s, one per data file, containing all Iceberg +`DataFile` metadata fields: + +- `content` — `"data"`, `"position_deletes"`, or `"equality_deletes"` +- `file_path` — full URI of the file +- `file_format` — `"parquet"`, `"avro"`, `"orc"`, etc. +- `record_count` — number of records in the file +- `file_size_in_bytes` — total file size +- `column_sizes`, `value_counts`, `null_value_counts`, `nan_value_counts` — per-column stats maps (keys are field-id strings) +- `lower_bounds`, `upper_bounds` — per-column min/max bounds +- `split_offsets` — row-group offsets (Parquet), or `nothing` +- `sort_order_id`, `equality_ids`, `first_row_id`, `referenced_data_file`, `content_offset`, `content_size_in_bytes` — optional fields + +Returns an empty vector if the handle is null/freed. +""" +function data_file_info(df::DataFiles) + df.ptr == C_NULL && return Dict{String,Any}[] + ptr = @ccall rust_lib.iceberg_data_files_to_json(df.ptr::Ptr{Cvoid})::Ptr{Cchar} + if ptr == C_NULL + throw(IcebergException( + INTERNAL, + "Internal error (please report this as a bug)", + "iceberg_data_files_to_json returned null", + )) + end + json_str = unsafe_string(ptr) + @ccall rust_lib.iceberg_destroy_cstring(ptr::Ptr{Cchar})::Cint + return JSON.parse(json_str) +end diff --git a/src/transaction.jl b/src/transaction.jl index 771d619..1996be8 100644 --- a/src/transaction.jl +++ b/src/transaction.jl @@ -350,3 +350,171 @@ function with_fast_append(f::Function, tx::Transaction) end return nothing end + +# --------------------------------------------------------------------------- +# OverwriteAction +# --------------------------------------------------------------------------- + +const DataFilesResponse = Response{Ptr{Cvoid}} + +""" + OverwriteAction + +Opaque handle that accumulates files to add and files to delete for an atomic +overwrite snapshot (`Operation::Overwrite`). + +Create with `OverwriteAction()`, populate with `add_data_files` and +`delete_data_files`, then commit with `apply`. Use `with_overwrite` for +automatic cleanup. +""" +mutable struct OverwriteAction + ptr::Ptr{Cvoid} +end + +function OverwriteAction() + ptr = @ccall rust_lib.iceberg_overwrite_action_new()::Ptr{Cvoid} + if ptr == C_NULL + throw(IcebergException(STATE_RESOURCE_FREED, "Resource has been freed", "iceberg_overwrite_action_new returned null")) + end + return OverwriteAction(ptr) +end + +function free_overwrite_action!(action::OverwriteAction) + if action.ptr == C_NULL + return nothing + end + @ccall rust_lib.iceberg_overwrite_action_free(action.ptr::Ptr{Cvoid})::Cvoid + action.ptr = C_NULL + return nothing +end + +""" + add_data_files(action::OverwriteAction, data_files::DataFiles) + +Add new data files to be written in the overwrite snapshot. +The `data_files` handle is consumed by this call. +""" +function add_data_files(action::OverwriteAction, data_files::DataFiles) + if action.ptr == C_NULL + throw(IcebergException(STATE_RESOURCE_FREED, "Resource has been freed", "OverwriteAction has been freed")) + end + if data_files.ptr == C_NULL + throw(IcebergException(STATE_RESOURCE_FREED, "Resource has been freed", "DataFiles has been freed or consumed")) + end + error_message_ptr = Ref{Ptr{Cchar}}(C_NULL) + try + result = @ccall rust_lib.iceberg_overwrite_action_add_data_files( + action.ptr::Ptr{Cvoid}, + data_files.ptr::Ptr{Cvoid}, + error_message_ptr::Ref{Ptr{Cchar}} + )::Cint + if result != 0 + parse_and_throw(error_message_ptr[], "overwrite add_data_files") + end + finally + free_data_files!(data_files) + end + return nothing +end + +""" + delete_data_files(action::OverwriteAction, data_files::DataFiles) + +Mark existing data files for deletion in the overwrite snapshot. +The `data_files` handle is consumed by this call. +""" +function delete_data_files(action::OverwriteAction, data_files::DataFiles) + if action.ptr == C_NULL + throw(IcebergException(STATE_RESOURCE_FREED, "Resource has been freed", "OverwriteAction has been freed")) + end + if data_files.ptr == C_NULL + throw(IcebergException(STATE_RESOURCE_FREED, "Resource has been freed", "DataFiles has been freed or consumed")) + end + error_message_ptr = Ref{Ptr{Cchar}}(C_NULL) + try + result = @ccall rust_lib.iceberg_overwrite_action_delete_data_files( + action.ptr::Ptr{Cvoid}, + data_files.ptr::Ptr{Cvoid}, + error_message_ptr::Ref{Ptr{Cchar}} + )::Cint + if result != 0 + parse_and_throw(error_message_ptr[], "overwrite delete_data_files") + end + finally + free_data_files!(data_files) + end + return nothing +end + +""" + apply(action::OverwriteAction, tx::Transaction) + +Apply the overwrite action to the transaction. +""" +function apply(action::OverwriteAction, tx::Transaction) + if action.ptr == C_NULL + throw(IcebergException(STATE_RESOURCE_FREED, "Resource has been freed", "OverwriteAction has been freed")) + end + if tx.ptr == C_NULL + throw(IcebergException(STATE_TRANSACTION_CONSUMED, "Transaction has already been committed or rolled back", "Transaction has been freed or consumed")) + end + error_message_ptr = Ref{Ptr{Cchar}}(C_NULL) + result = @ccall rust_lib.iceberg_overwrite_action_apply( + action.ptr::Ptr{Cvoid}, + tx.ptr::Ptr{Cvoid}, + error_message_ptr::Ref{Ptr{Cchar}} + )::Cint + if result != 0 + parse_and_throw(error_message_ptr[], "overwrite apply") + end + return nothing +end + +""" + with_overwrite(f::Function, tx::Transaction) + +Create an `OverwriteAction`, pass it to `f`, then apply it to `tx`. +Frees the action automatically even on error. + +# Example +```julia +updated_table = with_transaction(table, catalog) do tx + with_overwrite(tx) do action + add_data_files(action, new_files) + delete_data_files(action, old_files) + end +end +``` +""" +function with_overwrite(f::Function, tx::Transaction) + action = OverwriteAction() + try + f(action) + apply(action, tx) + finally + free_overwrite_action!(action) + end + return nothing +end + +""" + list_data_files(table::Table) -> DataFiles + +Return a `DataFiles` handle containing all live data files in the current +snapshot of `table`. Returns an empty handle if the table has no snapshot. + +The returned `DataFiles` must be freed with `free_data_files!` if not passed +to `delete_data_files`. +""" +function list_data_files(table::Table) + response = DataFilesResponse() + async_ccall(response, table) do handle + @ccall rust_lib.iceberg_table_list_data_files( + table::Table, + response::Ref{DataFilesResponse}, + handle::Ptr{Cvoid} + )::Cint + end + @throw_on_error(response, "list_data_files", IcebergException) + return DataFiles(response.value) +end diff --git a/test/overwrite_tests.jl b/test/overwrite_tests.jl new file mode 100644 index 0000000..98a82f4 --- /dev/null +++ b/test/overwrite_tests.jl @@ -0,0 +1,408 @@ +using RustyIceberg +using Test + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- + +function _ow_schema() + Schema([ + Field(Int32(1), "id", "long"; required=true), + Field(Int32(2), "value", "double"; required=false), + ]) +end + +function _write_and_append(table, catalog, data; prefix="data") + files = RustyIceberg.with_data_file_writer(table; prefix) do w + write(w, data) + end + RustyIceberg.with_transaction(table, catalog) do tx + with_fast_append(tx) do action + add_data_files(action, files) + end + end +end + +# --------------------------------------------------------------------------- +# OverwriteAction lifecycle +# --------------------------------------------------------------------------- + +@testset "OverwriteAction lifecycle" begin + action = RustyIceberg.OverwriteAction() + @test action.ptr != C_NULL + + free_overwrite_action!(action) + @test action.ptr == C_NULL + + # double-free must be a no-op + free_overwrite_action!(action) + @test action.ptr == C_NULL + println("✅ OverwriteAction lifecycle") +end + +# --------------------------------------------------------------------------- +# list_data_files +# --------------------------------------------------------------------------- + +@testset "list_data_files on empty table" begin + mktempdir() do warehouse + cat = catalog_create_memory(warehouse) + table = C_NULL + try + create_namespace(cat, ["ns"]) + table = create_table(cat, ["ns"], "t", _ow_schema()) + + df = list_data_files(table) + @test length(df) == 0 + @test isempty(data_file_info(df)) + free_data_files!(df) + @test df.ptr == C_NULL + finally + table != C_NULL && free_table(table) + free_catalog!(cat) + end + end + println("✅ list_data_files on empty table returns empty handle") +end + +@testset "list_data_files after append" begin + mktempdir() do warehouse + cat = catalog_create_memory(warehouse) + table = C_NULL + updated = C_NULL + try + create_namespace(cat, ["ns"]) + table = create_table(cat, ["ns"], "t", _ow_schema()) + updated = _write_and_append(table, cat, + (id=Int64[1,2,3], value=[1.1,2.2,3.3])) + + listed = list_data_files(updated) + @test length(listed) == 1 + + info = data_file_info(listed) + @test length(info) == 1 + f = info[1] + @test f["content"] == "data" + @test f["file_format"] == "parquet" + @test f["record_count"] == 3 + @test f["file_size_in_bytes"] > 0 + @test endswith(f["file_path"], ".parquet") + + free_data_files!(listed) + finally + table != C_NULL && free_table(table) + updated != C_NULL && free_table(updated) + free_catalog!(cat) + end + end + println("✅ list_data_files after append returns correct file metadata") +end + +# --------------------------------------------------------------------------- +# full overwrite — replace ALL files +# --------------------------------------------------------------------------- + +@testset "Overwrite replaces all existing files" begin + mktempdir() do warehouse + cat = catalog_create_memory(warehouse) + table = C_NULL; v1 = C_NULL; v2 = C_NULL; v3 = C_NULL + try + create_namespace(cat, ["ns"]) + table = create_table(cat, ["ns"], "t", _ow_schema()) + + # two appends so we have two data files to delete + v1 = _write_and_append(table, cat, + (id=Int64[1,2,3], value=[1.1,2.2,3.3]); prefix="a") + v2 = _write_and_append(v1, cat, + (id=Int64[4,5], value=[4.4,5.5]); prefix="b") + + before = read_table_data(v2) + @test length(before.id) == 5 + + snap_before = table_current_snapshot_id(v2) + @test !isnothing(snap_before) + + old_files = list_data_files(v2) + @test length(old_files) == 2 + @test sum(f["record_count"] for f in data_file_info(old_files)) == 5 + + new_files = RustyIceberg.with_data_file_writer(v2; prefix="new") do w + write(w, (id=Int64[10,20], value=[10.0,20.0])) + end + + v3 = RustyIceberg.with_transaction(v2, cat) do tx + with_overwrite(tx) do action + add_data_files(action, new_files) + delete_data_files(action, old_files) + end + end + + snap_after = table_current_snapshot_id(v3) + @test !isnothing(snap_after) + @test snap_after != snap_before + + after = read_table_data(v3) + @test !isnothing(after) + @test length(after.id) == 2 + @test sort(after.id) == [10, 20] + finally + table != C_NULL && free_table(table) + v1 != C_NULL && free_table(v1) + v2 != C_NULL && free_table(v2) + v3 != C_NULL && free_table(v3) + free_catalog!(cat) + end + end + println("✅ Overwrite replaces all existing files") +end + +# --------------------------------------------------------------------------- +# partial overwrite — delete only the first file, second file survives intact +# --------------------------------------------------------------------------- + +@testset "Overwrite only deletes explicitly listed files" begin + mktempdir() do warehouse + cat = catalog_create_memory(warehouse) + table = C_NULL; v1 = C_NULL; v2 = C_NULL; v3 = C_NULL + try + create_namespace(cat, ["ns"]) + table = create_table(cat, ["ns"], "t", _ow_schema()) + + # append two separate files + v1 = _write_and_append(table, cat, + (id=Int64[1,2,3], value=[1.0,2.0,3.0]); prefix="first") + v2 = _write_and_append(v1, cat, + (id=Int64[4,5], value=[4.0,5.0]); prefix="second") + + @test length(read_table_data(v2).id) == 5 + + # list_data_files on v1 returns only the first file + files_from_v1 = list_data_files(v1) + @test length(files_from_v1) == 1 + @test data_file_info(files_from_v1)[1]["record_count"] == 3 + + # replacement for the first file + new_file = RustyIceberg.with_data_file_writer(v2; prefix="new") do w + write(w, (id=Int64[10,11,12], value=[10.0,11.0,12.0])) + end + + # overwrite: delete only the first file, add replacement + # the second file (rows 4,5) is NOT in the delete list → it survives + v3 = RustyIceberg.with_transaction(v2, cat) do tx + with_overwrite(tx) do action + add_data_files(action, new_file) + delete_data_files(action, files_from_v1) + end + end + + after = read_table_data(v3) + @test !isnothing(after) + @test length(after.id) == 5 # 2 surviving + 3 new + @test sort(after.id) == [4, 5, 10, 11, 12] + finally + table != C_NULL && free_table(table) + v1 != C_NULL && free_table(v1) + v2 != C_NULL && free_table(v2) + v3 != C_NULL && free_table(v3) + free_catalog!(cat) + end + end + println("✅ Overwrite only deletes explicitly listed files; others survive") +end + +# --------------------------------------------------------------------------- +# overwrite with no deletes (add-only via Overwrite snapshot kind) +# --------------------------------------------------------------------------- + +@testset "Overwrite add-only produces a new snapshot" begin + mktempdir() do warehouse + cat = catalog_create_memory(warehouse) + table = C_NULL; updated = C_NULL + try + create_namespace(cat, ["ns"]) + table = create_table(cat, ["ns"], "t", _ow_schema()) + + new_files = RustyIceberg.with_data_file_writer(table) do w + write(w, (id=Int64[1], value=[1.0])) + end + + updated = RustyIceberg.with_transaction(table, cat) do tx + with_overwrite(tx) do action + add_data_files(action, new_files) + end + end + + @test !isnothing(table_current_snapshot_id(updated)) + data = read_table_data(updated) + @test length(data.id) == 1 + finally + table != C_NULL && free_table(table) + updated != C_NULL && free_table(updated) + free_catalog!(cat) + end + end + println("✅ Overwrite add-only produces a new snapshot") +end + +# --------------------------------------------------------------------------- +# two sequential overwrites +# --------------------------------------------------------------------------- + +@testset "Two sequential overwrites" begin + mktempdir() do warehouse + cat = catalog_create_memory(warehouse) + table = C_NULL; v1 = C_NULL; v2 = C_NULL; v3 = C_NULL + try + create_namespace(cat, ["ns"]) + table = create_table(cat, ["ns"], "t", _ow_schema()) + + v1 = _write_and_append(table, cat, + (id=Int64[1,2,3], value=[1.0,2.0,3.0])) + + # first overwrite + old1 = list_data_files(v1) + files2 = RustyIceberg.with_data_file_writer(v1; prefix="r1") do w + write(w, (id=Int64[10,11], value=[10.0,11.0])) + end + v2 = RustyIceberg.with_transaction(v1, cat) do tx + with_overwrite(tx) do action + add_data_files(action, files2) + delete_data_files(action, old1) + end + end + @test length(read_table_data(v2).id) == 2 + + # second overwrite + old2 = list_data_files(v2) + files3 = RustyIceberg.with_data_file_writer(v2; prefix="r2") do w + write(w, (id=Int64[99], value=[99.0])) + end + v3 = RustyIceberg.with_transaction(v2, cat) do tx + with_overwrite(tx) do action + add_data_files(action, files3) + delete_data_files(action, old2) + end + end + + data = read_table_data(v3) + @test length(data.id) == 1 + @test data.id[1] == 99 + finally + table != C_NULL && free_table(table) + v1 != C_NULL && free_table(v1) + v2 != C_NULL && free_table(v2) + v3 != C_NULL && free_table(v3) + free_catalog!(cat) + end + end + println("✅ Two sequential overwrites converge correctly") +end + +# --------------------------------------------------------------------------- +# fast append after full overwrite (table cleared then re-populated) +# --------------------------------------------------------------------------- + +@testset "Fast append after full overwrite" begin + mktempdir() do warehouse + cat = catalog_create_memory(warehouse) + table = C_NULL; v1 = C_NULL; v2 = C_NULL; v3 = C_NULL + try + create_namespace(cat, ["ns"]) + table = create_table(cat, ["ns"], "t", _ow_schema()) + + # seed the table with some rows + v1 = _write_and_append(table, cat, + (id=Int64[1,2,3], value=[1.0,2.0,3.0])) + @test length(read_table_data(v1).id) == 3 + + # overwrite with empty set — delete everything, add nothing + old_files = list_data_files(v1) + v2 = RustyIceberg.with_transaction(v1, cat) do tx + with_overwrite(tx) do action + delete_data_files(action, old_files) + end + end + snap2 = table_current_snapshot_id(v2) + @test !isnothing(snap2) + + # table should now be empty (read_table_data returns nothing when no batches) + @test isnothing(read_table_data(v2)) + + # fast append populates the table again + v3 = _write_and_append(v2, cat, + (id=Int64[10,20], value=[10.0,20.0]); prefix="post") + snap3 = table_current_snapshot_id(v3) + @test !isnothing(snap3) + @test snap3 != snap2 + + data3 = read_table_data(v3) + @test length(data3.id) == 2 + @test sort(data3.id) == [10, 20] + finally + table != C_NULL && free_table(table) + v1 != C_NULL && free_table(v1) + v2 != C_NULL && free_table(v2) + v3 != C_NULL && free_table(v3) + free_catalog!(cat) + end + end + println("✅ Fast append after full overwrite yields only the new rows") +end + +# --------------------------------------------------------------------------- +# error handling +# --------------------------------------------------------------------------- + +@testset "OverwriteAction error handling" begin + mktempdir() do warehouse + cat = catalog_create_memory(warehouse) + table = C_NULL; updated = C_NULL + try + create_namespace(cat, ["ns"]) + table = create_table(cat, ["ns"], "t", _ow_schema()) + + # apply on freed action must throw + action = RustyIceberg.OverwriteAction() + free_overwrite_action!(action) + tx = RustyIceberg.Transaction(table) + @test_throws RustyIceberg.IcebergException apply(action, tx) + free_transaction!(tx) + println("✅ apply on freed action throws") + + # add_data_files / delete_data_files with null DataFiles must throw + action2 = RustyIceberg.OverwriteAction() + null_df = RustyIceberg.DataFiles(C_NULL) + @test_throws RustyIceberg.IcebergException add_data_files(action2, null_df) + @test_throws RustyIceberg.IcebergException delete_data_files(action2, null_df) + free_overwrite_action!(action2) + println("✅ add/delete_data_files with null DataFiles throw") + + # apply on a transaction consumed by commit must throw + files1 = RustyIceberg.with_data_file_writer(table) do w + write(w, (id=Int64[1], value=[1.0])) + end + action3 = RustyIceberg.OverwriteAction() + tx2 = RustyIceberg.Transaction(table) + add_data_files(action3, files1) + apply(action3, tx2) + free_overwrite_action!(action3) + # now commit tx2 — this consumes the inner Transaction + updated = commit(tx2, cat) + + action4 = RustyIceberg.OverwriteAction() + files2 = RustyIceberg.with_data_file_writer(table; prefix="e2") do w + write(w, (id=Int64[2], value=[2.0])) + end + add_data_files(action4, files2) + @test_throws RustyIceberg.IcebergException apply(action4, tx2) + free_overwrite_action!(action4) + free_transaction!(tx2) + println("✅ apply on committed (consumed) transaction throws") + finally + table != C_NULL && free_table(table) + updated != C_NULL && free_table(updated) + free_catalog!(cat) + end + end + println("✅ OverwriteAction error handling tests passed") +end diff --git a/test/runtests.jl b/test/runtests.jl index 0472b42..9e5ca21 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -45,6 +45,9 @@ include("transaction_tests.jl") # Include writer tests include("writer_tests.jl") +# Include overwrite tests +include("overwrite_tests.jl") + end # End of testset println("\n🎉 All tests completed!")