Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
a492646
chore: bump iceberg-rust to gb/overwrite-action, fix API breaks
gbrgr May 28, 2026
2ad549e
feat(ffi): add OverwriteAction FFI and iceberg_table_list_data_files
gbrgr May 28, 2026
8bc2f58
feat(julia): add OverwriteAction bindings and list_data_files
gbrgr May 28, 2026
9532bc0
test: add overwrite action tests using file-based catalog
gbrgr May 28, 2026
506224c
fix(test): qualify with_data_file_writer with RustyIceberg module prefix
gbrgr May 28, 2026
0a4a69e
fix(test): qualify with_transaction, fix consumed-tx test, add partia…
gbrgr May 28, 2026
d59c1dd
test: rewrite partial overwrite test to use snapshot-based file selec…
gbrgr May 28, 2026
935409e
fix(julia): use parse_and_throw in OverwriteAction FFI wrappers
gbrgr May 28, 2026
0275056
fix clippy, cargo fmt, bump FFI version to 0.8.2, add fast-append-aft…
gbrgr May 28, 2026
4aa32ae
fix memory leak: free DataFiles handle after OverwriteAction add/delete
gbrgr May 28, 2026
3ed58f1
bump iceberg-rust rev to 50b335f (allow delete-only overwrite)
gbrgr May 28, 2026
c1e6b54
Update Cargo.lock
gbrgr May 28, 2026
a3f1171
fix: expect nothing from read_table_data after delete-only overwrite
gbrgr May 28, 2026
f77f1da
add DataFiles inspection API: length and data_file_info
gbrgr May 28, 2026
e6fa9b7
refactor: move DataFiles FFI code into dedicated data_file.rs module
gbrgr May 28, 2026
7dd35c1
bump iceberg-rust rev to e173637c (merged PR #76)
gbrgr May 28, 2026
2bab9e3
bump iceberg-rust rev to 27f68f7e (overwrite correctness fixes PR #77)
gbrgr May 28, 2026
13f4bba
merge main, bump version to 0.8.3
gbrgr May 28, 2026
31902ce
Update src/data_file.jl
gbrgr May 29, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "RustyIceberg"
uuid = "390bdf5b-b624-43dc-a846-0ef7a3405804"
version = "0.8.2"
version = "0.8.3"

[deps]
Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45"
Expand Down
21 changes: 11 additions & 10 deletions iceberg_rust_ffi/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions iceberg_rust_ffi/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iceberg_rust_ffi"
version = "0.8.2"
version = "0.8.3"
edition = "2021"

[lib]
Expand All @@ -12,8 +12,8 @@ default = ["julia"]
julia = []

[dependencies]
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "418213731e91544f5eb31a3efa459e88f599030e", features = ["storage-azdls"] }
iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "418213731e91544f5eb31a3efa459e88f599030e" }
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "27f68f7ea4ac4a3e51bf8c7f644648ccca235170", features = ["storage-azdls"] }
iceberg-catalog-rest = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "27f68f7ea4ac4a3e51bf8c7f644648ccca235170" }
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
tokio = { version = "1.0", features = ["full"] }
futures = "0.3"
Expand All @@ -31,6 +31,7 @@ async-trait = "0.1"
# Pin home to 0.5.9 to support rustc < 1.88
# (0.5.11+ requires rustc 1.88+)
home = "=0.5.9"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.142"
strum = { version = "0.26", features = ["derive"] }

Expand Down
178 changes: 178 additions & 0 deletions iceberg_rust_ffi/src/data_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/// DataFiles FFI — opaque handle, inspection helpers, and table listing.
use std::collections::HashMap;
use std::ffi::c_void;

use iceberg::spec::{DataContentType, DataFile, Datum, ManifestContentType};
use object_store_ffi::{
export_runtime_op, with_cancellation, CResult, NotifyGuard, ResponseGuard, RT,
};

use crate::error_codes::{classified_error, IcebergErrorCode};
use crate::response::IcebergBoxedResponse;

/// Opaque handle for a collection of data files.
///
/// Produced by writers and by `iceberg_table_list_data_files`. Consumed
/// (files moved out) by `iceberg_fast_append_action_add_data_files`,
/// `iceberg_overwrite_action_add_data_files`, and
/// `iceberg_overwrite_action_delete_data_files`. Free with
/// `iceberg_data_files_free` after use.
pub struct IcebergDataFiles {
pub data_files: Vec<DataFile>,
}

unsafe impl Send for IcebergDataFiles {}
unsafe impl Sync for IcebergDataFiles {}

/// Type alias for async operations that return a DataFiles handle.
pub type IcebergDataFilesResponse = IcebergBoxedResponse<IcebergDataFiles>;

/// Free a data files handle.
#[no_mangle]
pub extern "C" fn iceberg_data_files_free(data_files: *mut IcebergDataFiles) {
if !data_files.is_null() {
unsafe {
let _ = Box::from_raw(data_files);
}
}
}

/// Return the number of data files in the handle (0 if null).
#[no_mangle]
pub extern "C" fn iceberg_data_files_len(data_files: *const IcebergDataFiles) -> usize {
if data_files.is_null() {
return 0;
}
unsafe { (*data_files).data_files.len() }
}

/// Return a JSON array of objects describing every data file in the handle.
///
/// Each object contains all metadata fields from the Iceberg DataFile spec:
/// content, file_path, file_format, record_count, file_size_in_bytes,
/// column_sizes, value_counts, null_value_counts, nan_value_counts,
/// lower_bounds, upper_bounds, split_offsets, sort_order_id, equality_ids,
/// first_row_id, referenced_data_file, content_offset, content_size_in_bytes.
///
/// Returns null on error or if `data_files` is null.
/// The caller must free the returned string with `iceberg_destroy_cstring`.
#[no_mangle]
pub extern "C" fn iceberg_data_files_to_json(
data_files: *const IcebergDataFiles,
) -> *mut std::ffi::c_char {
#[derive(serde::Serialize)]
struct DataFileJson<'a> {
content: &'static str,
file_path: &'a str,
file_format: String,
record_count: u64,
file_size_in_bytes: u64,
column_sizes: &'a HashMap<i32, u64>,
value_counts: &'a HashMap<i32, u64>,
null_value_counts: &'a HashMap<i32, u64>,
nan_value_counts: &'a HashMap<i32, u64>,
lower_bounds: HashMap<i32, &'a Datum>,
upper_bounds: HashMap<i32, &'a Datum>,
split_offsets: Option<&'a [i64]>,
sort_order_id: Option<i32>,
equality_ids: Option<Vec<i32>>,
first_row_id: Option<i64>,
referenced_data_file: Option<String>,
content_offset: Option<i64>,
content_size_in_bytes: Option<i64>,
}

if data_files.is_null() {
return std::ptr::null_mut();
}
let df_ref = unsafe { &*data_files };

let entries: Vec<DataFileJson> = df_ref
.data_files
.iter()
.map(|f| DataFileJson {
content: match f.content_type() {
DataContentType::Data => "data",
DataContentType::PositionDeletes => "position_deletes",
DataContentType::EqualityDeletes => "equality_deletes",
},
file_path: f.file_path(),
file_format: f.file_format().to_string(),
record_count: f.record_count(),
file_size_in_bytes: f.file_size_in_bytes(),
column_sizes: f.column_sizes(),
value_counts: f.value_counts(),
null_value_counts: f.null_value_counts(),
nan_value_counts: f.nan_value_counts(),
lower_bounds: f.lower_bounds().iter().map(|(k, v)| (*k, v)).collect(),
upper_bounds: f.upper_bounds().iter().map(|(k, v)| (*k, v)).collect(),
split_offsets: f.split_offsets(),
sort_order_id: f.sort_order_id(),
equality_ids: f.equality_ids(),
first_row_id: f.first_row_id(),
referenced_data_file: f.referenced_data_file(),
content_offset: f.content_offset(),
content_size_in_bytes: f.content_size_in_bytes(),
})
.collect();

match serde_json::to_string(&entries) {
Ok(json) => match std::ffi::CString::new(json) {
Ok(c_str) => c_str.into_raw(),
Err(_) => std::ptr::null_mut(),
},
Err(_) => std::ptr::null_mut(),
}
}

export_runtime_op!(
iceberg_table_list_data_files,
crate::IcebergDataFilesResponse,
|| {
if table.is_null() {
return Err(classified_error(
IcebergErrorCode::STATE_RESOURCE_FREED,
"Resource has been freed",
"Null table pointer provided",
));
}
let table_ref = unsafe { &*table };
Ok(table_ref)
},
table_ref,
async {
let table = &table_ref.table;
let Some(snapshot) = table.metadata().current_snapshot() else {
return Ok::<IcebergDataFiles, anyhow::Error>(IcebergDataFiles {
data_files: vec![],
});
};

let manifest_list = snapshot
.load_manifest_list(table.file_io(), &table.metadata_ref())
.await
.map_err(|e| anyhow::anyhow!("Failed to load manifest list: {e}"))?;

let mut data_files = Vec::new();
for manifest_entry in manifest_list.entries() {
if manifest_entry.content != ManifestContentType::Data {
continue;
}
if !manifest_entry.has_added_files() && !manifest_entry.has_existing_files() {
continue;
}
let manifest = manifest_entry
.load_manifest(table.file_io())
.await
.map_err(|e| anyhow::anyhow!("Failed to load manifest: {e}"))?;
for entry in manifest.entries() {
if entry.is_alive() {
data_files.push(entry.data_file().clone());
}
}
}

Ok::<IcebergDataFiles, anyhow::Error>(IcebergDataFiles { data_files })
},
table: *mut crate::table::IcebergTable
);
2 changes: 1 addition & 1 deletion iceberg_rust_ffi/src/full_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn read_one_full_scan_file(
task: FileScanTask,
) -> iceberg::Result<iceberg::scan::ArrowRecordBatchStream> {
let task_stream = Box::pin(futures::stream::once(async { Ok(task) }));
reader.read(task_stream)
reader.read(task_stream).map(|r| r.stream())
}

/// Full-scan entry point — wraps `create_nested_pipeline` with a closure
Expand Down
3 changes: 2 additions & 1 deletion iceberg_rust_ffi/src/incremental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ export_runtime_op!(
let (scan_ref, serialization_concurrency) = result_tuple;

// Get unzipped streams (separate append and delete streams)
let (inserts_stream, deletes_stream) = scan_ref.to_unzipped_arrow().await.map_err(|e| classify_iceberg(e))?;
let result = scan_ref.to_unzipped_arrow().await.map_err(|e| classify_iceberg(e))?;
let (inserts_stream, deletes_stream) = (result.appends, result.deletes);

// Transform both streams with parallel serialization
let inserts = IcebergArrowStream {
Expand Down
33 changes: 17 additions & 16 deletions iceberg_rust_ffi/src/incremental_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! - delete stream: flat Arrow stream of delete records

use futures::{StreamExt, TryStreamExt};
use iceberg::arrow::{ArrowReader, StreamsInto, UnzippedIncrementalBatchRecordStream};
use iceberg::arrow::{ArrowReader, StreamsInto, UnzippedIncrementalScanResult};
use iceberg::io::FileIO;
use iceberg::scan::incremental::{AppendedFileScanTask, DeleteScanTask};
use tokio::sync::Mutex as AsyncMutex;
Expand All @@ -35,12 +35,11 @@ fn read_one_append_file(
) -> iceberg::Result<iceberg::scan::ArrowRecordBatchStream> {
let append = futures::stream::once(async { Ok(task) }).boxed();
let delete = futures::stream::empty::<iceberg::Result<DeleteScanTask>>().boxed();
let (arrow_stream, _): UnzippedIncrementalBatchRecordStream =
StreamsInto::<ArrowReader, UnzippedIncrementalBatchRecordStream>::stream(
(append, delete),
reader,
)?;
Ok(arrow_stream)
let UnzippedIncrementalScanResult { appends, .. } = StreamsInto::<
ArrowReader,
UnzippedIncrementalScanResult,
>::stream((append, delete), reader)?;
Ok(appends)
}

/// Build the nested incremental pipeline.
Expand Down Expand Up @@ -81,15 +80,17 @@ pub async fn create_incremental_nested_pipeline(

// Delete stream: StreamsInto with empty append stream routes all delete
// tasks through the iceberg reader machinery.
let (_, delete_arrow): UnzippedIncrementalBatchRecordStream =
StreamsInto::<ArrowReader, UnzippedIncrementalBatchRecordStream>::stream(
(
futures::stream::empty::<iceberg::Result<AppendedFileScanTask>>().boxed(),
delete_tasks,
),
build_reader(file_io, batch_size),
)
.map_err(|e| anyhow::anyhow!("Failed to create delete stream: {e}"))?;
let UnzippedIncrementalScanResult {
deletes: delete_arrow,
..
} = StreamsInto::<ArrowReader, UnzippedIncrementalScanResult>::stream(
(
futures::stream::empty::<iceberg::Result<AppendedFileScanTask>>().boxed(),
delete_tasks,
),
build_reader(file_io, batch_size),
)
.map_err(|e| anyhow::anyhow!("Failed to create delete stream: {e}"))?;

let delete_stream = IcebergArrowStream {
stream: AsyncMutex::new(crate::transform_stream_with_parallel_serialization(
Expand Down
Loading
Loading