diff --git a/.gitignore b/.gitignore index 97dd92c..13641cb 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ iceberg_rust_ffi/integration_test **/*.dylib **/.claude **/.DS_Store -LocalPreferences.toml \ No newline at end of file +LocalPreferences.toml +.vscode/settings.json diff --git a/Project.toml b/Project.toml index d1ab521..6790cdb 100644 --- a/Project.toml +++ b/Project.toml @@ -1,6 +1,6 @@ name = "RustyIceberg" uuid = "390bdf5b-b624-43dc-a846-0ef7a3405804" -version = "0.8.1" +version = "0.8.2" [deps] Arrow = "69666777-d1a9-59fb-9406-91d4454c9d45" diff --git a/iceberg_rust_ffi/Cargo.lock b/iceberg_rust_ffi/Cargo.lock index 952e2f1..cd3c359 100644 --- a/iceberg_rust_ffi/Cargo.lock +++ b/iceberg_rust_ffi/Cargo.lock @@ -1641,7 +1641,7 @@ dependencies = [ [[package]] name = "iceberg_rust_ffi" -version = "0.8.1" +version = "0.8.2" dependencies = [ "anyhow", "arrow-array", diff --git a/iceberg_rust_ffi/Cargo.toml b/iceberg_rust_ffi/Cargo.toml index 9f203fe..85a4c99 100644 --- a/iceberg_rust_ffi/Cargo.toml +++ b/iceberg_rust_ffi/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "iceberg_rust_ffi" -version = "0.8.1" +version = "0.8.2" edition = "2021" [lib] diff --git a/iceberg_rust_ffi/src/lib.rs b/iceberg_rust_ffi/src/lib.rs index be6eae7..ef56b94 100644 --- a/iceberg_rust_ffi/src/lib.rs +++ b/iceberg_rust_ffi/src/lib.rs @@ -37,9 +37,13 @@ mod transaction; // Writer module mod writer; -// Column-based writer module (zero-copy from Julia) +// Shared FFI structs/constants for the column-based write path mod writer_columns; +// Incremental RecordBatch builder: per-slice copy into owned buffers, finalize to RecordBatch. +// Embedded inside `IcebergDataFileWriter`; not directly exposed across the FFI. 
+mod record_batch_builder; + // Profiling stats for the file-parallel pipeline mod pipeline_stats; @@ -80,7 +84,7 @@ pub use transaction::{IcebergDataFiles, IcebergTransaction, IcebergTransactionRe pub use writer::{ IcebergDataFileWriter, IcebergDataFileWriterResponse, IcebergWriterCloseResponse, }; -pub use writer_columns::ColumnDescriptor; +pub use writer_columns::ColumnSlice; // We use `jl_adopt_thread` to ensure Rust can call into Julia when notifying // the Base.Event that is waiting for the Rust result. diff --git a/iceberg_rust_ffi/src/record_batch_builder.rs b/iceberg_rust_ffi/src/record_batch_builder.rs new file mode 100644 index 0000000..91854ed --- /dev/null +++ b/iceberg_rust_ffi/src/record_batch_builder.rs @@ -0,0 +1,666 @@ +/// Incremental Arrow `RecordBatch` builder for zero-copy-from-Julia writes. +/// +/// Lives as an internal field of `IcebergDataFileWriter`. Julia drives it through +/// `iceberg_writer_append` (per upstream slice) and `iceberg_writer_flush` (explicit +/// boundary). Auto-flush at `coalesce_rows` happens inside the writer's append entry +/// point; the builder itself is mechanical. +/// +/// Each `append_slice` call copies one slice's data per column directly into per-column +/// `MutableBuffer`s that already match Arrow's physical layout. At finalize time those +/// buffers become Arrow `Buffer`s via a zero-copy `.into()` move, get wrapped in typed +/// arrays, and assemble into a `RecordBatch` — no further copy. Fresh same-capacity +/// buffers swap in for the next window, so steady-state reallocation is zero. +/// +/// Null bits are populated lazily: all-valid slices skip the bitmap entirely. The first +/// null slice triggers a one-time backfill of all prior rows as valid, then subsequent +/// slices proceed normally. If no null slice ever arrives, no `NullBuffer` is emitted. 
+use std::sync::Arc; + +use arrow_array::{ + types::*, ArrayRef, BooleanArray, FixedSizeBinaryArray, PrimitiveArray, StringArray, +}; +use arrow_buffer::{BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer}; +use arrow_schema::SchemaRef as ArrowSchemaRef; + +use crate::writer_columns::{ + ColumnSlice, COLUMN_TYPE_BOOLEAN, COLUMN_TYPE_DATE, COLUMN_TYPE_DECIMAL_INT128, + COLUMN_TYPE_DECIMAL_INT32, COLUMN_TYPE_DECIMAL_INT64, COLUMN_TYPE_FLOAT32, COLUMN_TYPE_FLOAT64, + COLUMN_TYPE_INT32, COLUMN_TYPE_INT64, COLUMN_TYPE_JULIA_DATE, COLUMN_TYPE_JULIA_TIMESTAMP, + COLUMN_TYPE_JULIA_TIMESTAMPTZ, COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS, + COLUMN_TYPE_JULIA_TIMESTAMP_NS, COLUMN_TYPE_STRING, COLUMN_TYPE_TIMESTAMP, + COLUMN_TYPE_TIMESTAMPTZ, COLUMN_TYPE_UUID, +}; + +/// Days from Julia Date epoch (0001-01-01, Rata Die day 1) to Unix epoch (1970-01-01). +/// Julia Date stores days using 1-based Rata Die: Dates.value(Date(1970,1,1)) == 719163. +const JULIA_DATE_OFFSET: i64 = 719_163; +/// Milliseconds from Julia DateTime epoch (0001-01-01) to Unix epoch (1970-01-01). +const JULIA_TIMESTAMP_OFFSET_MS: i64 = 719_163 * 86_400_000; + +/// Default coalesce-window size for the embedded builder. +pub(crate) const DEFAULT_COALESCE_ROWS: usize = 1_048_576; + +/// Bytes per row for numeric column types (0 for Bool/Str which are not Numeric). 
+fn column_bytes_per_row(column_type: i32) -> usize { + match column_type { + COLUMN_TYPE_INT32 + | COLUMN_TYPE_DATE + | COLUMN_TYPE_FLOAT32 + | COLUMN_TYPE_DECIMAL_INT32 + | COLUMN_TYPE_JULIA_DATE => 4, + COLUMN_TYPE_INT64 + | COLUMN_TYPE_TIMESTAMP + | COLUMN_TYPE_TIMESTAMPTZ + | COLUMN_TYPE_FLOAT64 + | COLUMN_TYPE_DECIMAL_INT64 + | COLUMN_TYPE_JULIA_TIMESTAMP + | COLUMN_TYPE_JULIA_TIMESTAMPTZ + | COLUMN_TYPE_JULIA_TIMESTAMP_NS + | COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS => 8, + COLUMN_TYPE_DECIMAL_INT128 | COLUMN_TYPE_UUID => 16, + _ => 0, + } +} + +// --------------------------------------------------------------------------- +// Per-column value buffer +// +// All numeric and UUID variants use `MutableBuffer` (byte-oriented, Arrow-native). +// On append we write typed bytes directly into it; on finalize we call `.into()` to +// get an Arrow `Buffer` — ownership transfer, no copy. The buffer is then swapped out +// for a fresh pre-allocated one so the next window never reallocates. + +enum ColumnValues { + Numeric(MutableBuffer), // I32/I64/F32/F64/I128/UUID — bytes per row varies by type + Bool(Vec), // BOOLEAN — 1 byte per row; bit-packed at finalize + Str { + bytes: Vec, + offsets: Vec, // Arrow offset buffer; offsets[0] = 0 always + }, +} + +impl ColumnValues { + fn new(column_type: i32, coalesce_rows: usize) -> Self { + match column_type { + COLUMN_TYPE_BOOLEAN => ColumnValues::Bool(Vec::with_capacity(coalesce_rows)), + COLUMN_TYPE_STRING => ColumnValues::Str { + bytes: Vec::new(), + // Start empty; finalize_and_reset right-sizes to the actual slice length + // after the first flush, so we never hold a 4MB coalesce_rows-sized Vec. 
+ offsets: vec![0i32], + }, + _ => { + let bpr = column_bytes_per_row(column_type).max(8); // fallback 8 for unknown + ColumnValues::Numeric(MutableBuffer::with_capacity(coalesce_rows * bpr)) + } + } + } +} + +// --------------------------------------------------------------------------- +// Per-column builder state + +struct ColumnBuilderState { + column_type: i32, + bytes_per_row: usize, // for Numeric variant: bytes per element + is_nullable: bool, + values: ColumnValues, + /// Lazily-populated validity bitmap. Empty until the first null slice. + null_bits: Vec, + rows: usize, + has_nulls: bool, +} + +impl ColumnBuilderState { + fn new(column_type: i32, is_nullable: bool, coalesce_rows: usize) -> Self { + Self { + column_type, + bytes_per_row: column_bytes_per_row(column_type), + is_nullable, + values: ColumnValues::new(column_type, coalesce_rows), + null_bits: Vec::new(), + rows: 0, + has_nulls: false, + } + } +} + +// --------------------------------------------------------------------------- +// Builder type + +pub(crate) struct RecordBatchBuilder { + columns: Vec, + arrow_schema: ArrowSchemaRef, + coalesce_rows: usize, +} + +impl RecordBatchBuilder { + pub(crate) fn new( + arrow_schema: ArrowSchemaRef, + col_types: &[i32], + coalesce_rows: usize, + ) -> Result { + if col_types.len() != arrow_schema.fields().len() { + return Err(anyhow::anyhow!( + "col_types length {} != schema field count {}", + col_types.len(), + arrow_schema.fields().len() + )); + } + let columns = col_types + .iter() + .zip(arrow_schema.fields().iter()) + .map(|(&ct, field)| ColumnBuilderState::new(ct, field.is_nullable(), coalesce_rows)) + .collect(); + Ok(Self { + columns, + arrow_schema, + coalesce_rows, + }) + } + + /// Rows accumulated in the current window (across all columns; they stay in sync). 
+ pub(crate) fn rows(&self) -> usize { + self.columns.first().map(|c| c.rows).unwrap_or(0) + } + + /// True when the current window has reached or passed `coalesce_rows` — the writer + /// should finalize and reset before continuing. + pub(crate) fn should_flush(&self) -> bool { + self.rows() >= self.coalesce_rows + } + + /// Append one slice per column. Rust copies all data synchronously; source memory + /// may be released the moment this call returns. + /// + /// # Safety + /// All pointers inside the `ColumnSlice`s must be valid for `len` elements for the + /// duration of this call. + pub(crate) unsafe fn append_slice( + &mut self, + slices: &[ColumnSlice], + ) -> Result<(), anyhow::Error> { + if slices.len() != self.columns.len() { + return Err(anyhow::anyhow!( + "slice count {} != column count {}", + slices.len(), + self.columns.len() + )); + } + for (state, slice) in self.columns.iter_mut().zip(slices.iter()) { + unsafe { append_to_state(state, slice) }?; + } + Ok(()) + } + + /// Finalize the accumulated columns into a `RecordBatch` and reset all column + /// buffers in-place for the next window. The buffers are swapped with fresh + /// pre-allocated `MutableBuffer`s of the same capacity, so the next window never + /// reallocates. 
+ pub(crate) fn take_record_batch(&mut self) -> Result { + let mut arrays: Vec = Vec::with_capacity(self.columns.len()); + for (i, state) in self.columns.iter_mut().enumerate() { + let field = self.arrow_schema.field(i); + arrays.push(finalize_and_reset(state, field, self.coalesce_rows)?); + } + arrow_array::RecordBatch::try_new(self.arrow_schema.clone(), arrays) + .map_err(|e| anyhow::anyhow!("RecordBatch: {}", e)) + } +} + +// --------------------------------------------------------------------------- +// Append logic + +unsafe fn append_to_state( + state: &mut ColumnBuilderState, + slice: &ColumnSlice, +) -> Result<(), anyhow::Error> { + let len = slice.len; + + // ---- null bits (lazy) ------------------------------------------------- + // Skip entirely for all-valid slices when no nulls have been seen yet. + // On the first null slice, backfill all prior rows as valid, then copy bits. + // For all-valid slices after nulls have been seen, extend the bitmap with 1s. + if state.is_nullable { + if !slice.validity_ptr.is_null() { + let out_start = state.rows; + let needed = (out_start + len).div_ceil(8); + if !state.has_nulls { + // First null slice: backfill all prior rows as valid. + state.null_bits.resize(needed, 0u8); + set_bits_range(&mut state.null_bits, 0, out_start); + state.has_nulls = true; + } else if state.null_bits.len() < needed { + state.null_bits.resize(needed, 0u8); + } + // Copy validity bits. Three paths: + // (1) `sel_ptr != null` — source-aligned bitmap + non-identity sel: gather + // bit `sel[i] - 1` into output position `out_start + i`. + // (2) `out_start % 8 == 0` — byte-aligned destination (the steady state + // under flush-per-slice): one `copy_nonoverlapping` for the whole slice. + // (3) otherwise — bit-by-bit copy. + // The gather path uses raw pointer arithmetic on `validity_ptr` because the + // source bitmap covers `length(data)` bits (≥ `max(sel)`), which is typically + // larger than `len`. 
ORing into 0-initialized `null_bits` is safe even when + // `out_start + len` shares a byte with prior rows — we never write a 0 over a + // previously-set 1 because `set_bits_range(0, out_start)` already populated + // earlier rows in a separate phase. + if !slice.sel_ptr.is_null() { + let sel = unsafe { std::slice::from_raw_parts(slice.sel_ptr, len) }; + for (i, &s) in sel.iter().enumerate() { + let src_idx = (s - 1) as usize; + let b = unsafe { (*slice.validity_ptr.add(src_idx / 8) >> (src_idx % 8)) & 1 }; + let pos = out_start + i; + state.null_bits[pos / 8] |= b << (pos % 8); + } + } else if out_start.is_multiple_of(8) { + let dst = out_start / 8; + let n_bytes = len.div_ceil(8); + unsafe { + std::ptr::copy_nonoverlapping( + slice.validity_ptr, + state.null_bits.as_mut_ptr().add(dst), + n_bytes, + ); + } + // Mask off garbage bits beyond `len` in the last byte so they don't + // corrupt a subsequent coalesced slice that shares that byte. + if !len.is_multiple_of(8) { + let tail = state.null_bits.last_mut().unwrap(); + *tail &= (1u8 << (len % 8)) - 1; + } + } else { + for i in 0..len { + let b = unsafe { (*slice.validity_ptr.add(i / 8) >> (i % 8)) & 1 }; + let pos = out_start + i; + state.null_bits[pos / 8] |= b << (pos % 8); + } + } + } else if state.has_nulls { + // All-valid slice but nulls seen earlier — extend bitmap with 1s. + let out_start = state.rows; + let needed = (out_start + len).div_ceil(8); + if state.null_bits.len() < needed { + state.null_bits.resize(needed, 0u8); + } + set_bits_range(&mut state.null_bits, out_start, out_start + len); + } + // else: all-valid slice, no nulls yet — nothing to do. 
+ } + + // ---- values ----------------------------------------------------------- + match &mut state.values { + ColumnValues::Numeric(buf) => { + append_numeric(buf, slice, state.column_type, len)?; + } + ColumnValues::Bool(v) => { + if slice.sel_ptr.is_null() { + let src = unsafe { std::slice::from_raw_parts(slice.data_ptr as *const u8, len) }; + v.extend_from_slice(src); + } else { + // See `append_primitive!` for why scatter uses a raw pointer. + let src = slice.data_ptr as *const u8; + let sel = unsafe { std::slice::from_raw_parts(slice.sel_ptr, len) }; + for &idx in sel { + v.push(unsafe { *src.add((idx - 1) as usize) }); + } + } + } + ColumnValues::Str { bytes, offsets } => { + // Pointer-of-pointers protocol: data_ptr is *const *const u8 (array of pointers + // into Julia source strings), lengths_ptr is *const i64 (byte lengths per row). + // Null and empty rows have length 0; the nb>0 guard is sufficient since Julia + // always sets len=0 for null/empty rows (no null-pointer check needed). + if slice.lengths_ptr.is_null() { + return Err(anyhow::anyhow!("String column: lengths_ptr is null")); + } + let ptrs = + unsafe { std::slice::from_raw_parts(slice.data_ptr as *const *const u8, len) }; + let lens = unsafe { std::slice::from_raw_parts(slice.lengths_ptr, len) }; + // Pre-reserve so bytes/offsets never reallocate mid-loop. + let total: usize = lens.iter().map(|&l| l as usize).sum(); + bytes.reserve(total); + offsets.reserve(len); + // Track running offset locally instead of reading bytes.len() each iteration. + let mut cur_off = bytes.len(); + for i in 0..len { + // Prefetch the string data that will be read PREFETCH_DIST iterations ahead. 
+ if i + PREFETCH_DIST < len { + unsafe { prefetch_read(ptrs[i + PREFETCH_DIST]) }; + } + let nb = lens[i] as usize; + if nb > 0 { + bytes.extend_from_slice(unsafe { std::slice::from_raw_parts(ptrs[i], nb) }); + cur_off += nb; + } + offsets.push(cur_off as i32); + } + } + } + + state.rows += len; + Ok(()) +} + +/// Issue a read prefetch for the cache line at `ptr`. +/// Compiles to a real prefetch on x86_64 and aarch64; no-op elsewhere. +#[inline(always)] +unsafe fn prefetch_read(ptr: *const u8) { + #[cfg(target_arch = "x86_64")] + std::arch::x86_64::_mm_prefetch(ptr as *const i8, std::arch::x86_64::_MM_HINT_T1); + #[cfg(target_arch = "aarch64")] + core::arch::asm!("prfm pldl2keep, [{ptr}]", ptr = in(reg) ptr, options(nostack, readonly)); + #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))] + let _ = ptr; +} + +// Bulk-copy or 1-based-scattered-gather a slice of primitive T into buf (no transform). +// Identity selection → single memcpy; scattered selection → element-wise gather. +// Prefetch distance for scatter-gather loops: enough to cover ~200-cycle cache miss +// latency at typical throughput of a few cycles per element. +const PREFETCH_DIST: usize = 16; + +macro_rules! append_primitive { + ($buf:expr, $slice:expr, $len:expr, $T:ty) => {{ + if $slice.sel_ptr.is_null() { + let src = unsafe { std::slice::from_raw_parts($slice.data_ptr as *const $T, $len) }; + $buf.extend_from_slice(unsafe { as_bytes(src) }); + } else { + // Scatter: index into the source array via raw pointer arithmetic. The + // source array may be longer than `$len` (which is the output stripe + // length), so a `&[T]` of length `$len` would false-positive on any + // sel index ≥ $len. 
+ let src = $slice.data_ptr as *const $T; + let sel = unsafe { std::slice::from_raw_parts($slice.sel_ptr, $len) }; + for (i, &idx) in sel.iter().enumerate() { + if i + PREFETCH_DIST < $len { + unsafe { + prefetch_read(src.add((sel[i + PREFETCH_DIST] - 1) as usize) as *const u8) + }; + } + let v = unsafe { *src.add((idx - 1) as usize) }; + $buf.extend_from_slice(&v.to_ne_bytes()); + } + } + }}; +} + +// Element-wise transform from source type S with optional 1-based scattered gather. +// `$f` maps S → a value whose `.to_ne_bytes()` is written to buf. +macro_rules! append_transform { + ($buf:expr, $slice:expr, $len:expr, $S:ty, $f:expr) => {{ + if $slice.sel_ptr.is_null() { + let src = unsafe { std::slice::from_raw_parts($slice.data_ptr as *const $S, $len) }; + for &v in src { + $buf.extend_from_slice(&($f)(v).to_ne_bytes()); + } + } else { + // See `append_primitive!` for why scatter uses a raw pointer. + let src = $slice.data_ptr as *const $S; + let sel = unsafe { std::slice::from_raw_parts($slice.sel_ptr, $len) }; + for &idx in sel { + let v = unsafe { *src.add((idx - 1) as usize) }; + $buf.extend_from_slice(&($f)(v).to_ne_bytes()); + } + } + }}; +} + +/// Append numeric slice data directly into a `MutableBuffer`. +/// Identity (sequential) slices use a bulk byte copy; scattered slices loop element-wise. 
+unsafe fn append_numeric( + buf: &mut MutableBuffer, + slice: &ColumnSlice, + column_type: i32, + len: usize, +) -> Result<(), anyhow::Error> { + match column_type { + COLUMN_TYPE_INT32 | COLUMN_TYPE_DATE => append_primitive!(buf, slice, len, i32), + COLUMN_TYPE_INT64 | COLUMN_TYPE_TIMESTAMP | COLUMN_TYPE_TIMESTAMPTZ => { + append_primitive!(buf, slice, len, i64) + } + COLUMN_TYPE_FLOAT32 => append_primitive!(buf, slice, len, f32), + COLUMN_TYPE_FLOAT64 => append_primitive!(buf, slice, len, f64), + COLUMN_TYPE_DECIMAL_INT32 => { + append_transform!(buf, slice, len, i32, |x: i32| x as i128) + } + COLUMN_TYPE_DECIMAL_INT64 => { + append_transform!(buf, slice, len, i64, |x: i64| x as i128) + } + COLUMN_TYPE_DECIMAL_INT128 | COLUMN_TYPE_UUID => { + // 16-byte elements — no primitive type; copy as raw bytes. + if slice.sel_ptr.is_null() { + let src = + unsafe { std::slice::from_raw_parts(slice.data_ptr as *const u8, len * 16) }; + buf.extend_from_slice(src); + } else { + // See `append_primitive!` for why scatter uses a raw pointer. + let src = slice.data_ptr as *const u8; + let sel = unsafe { std::slice::from_raw_parts(slice.sel_ptr, len) }; + for (i, &idx) in sel.iter().enumerate() { + if i + PREFETCH_DIST < len { + unsafe { + prefetch_read(src.add((sel[i + PREFETCH_DIST] - 1) as usize * 16)) + }; + } + let off = (idx - 1) as usize * 16; + let chunk = unsafe { std::slice::from_raw_parts(src.add(off), 16) }; + buf.extend_from_slice(chunk); + } + } + } + // Julia date/timestamp types carry a Julia-epoch offset that Rust removes here. + // Source: i64[] days since 0001-01-01 → i32 days since 1970-01-01 (Date32). + COLUMN_TYPE_JULIA_DATE => { + append_transform!(buf, slice, len, i64, |v: i64| (v - JULIA_DATE_OFFSET) + as i32) + } + // Source: i64[] ms since 0001-01-01 → i64 μs since 1970-01-01. 
+ COLUMN_TYPE_JULIA_TIMESTAMP | COLUMN_TYPE_JULIA_TIMESTAMPTZ => { + append_transform!(buf, slice, len, i64, |v: i64| (v + - JULIA_TIMESTAMP_OFFSET_MS) + * 1_000) + } + // Source: i64[] ms since 0001-01-01 → i64 ns since 1970-01-01. + COLUMN_TYPE_JULIA_TIMESTAMP_NS | COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS => { + append_transform!(buf, slice, len, i64, |v: i64| (v + - JULIA_TIMESTAMP_OFFSET_MS) + * 1_000_000) + } + _ => return Err(anyhow::anyhow!("unsupported column type {}", column_type)), + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Finalize + +/// Build an Arrow array from `state`'s accumulated data and reset all buffers +/// in-place for the next coalesce window. +fn finalize_and_reset( + state: &mut ColumnBuilderState, + schema_field: &arrow_schema::Field, + coalesce_rows: usize, +) -> Result { + let rows = state.rows; + state.rows = 0; + state.has_nulls = false; + + // Build NullBuffer from lazily-accumulated bits (None if no nulls seen). + // Preserve the Vec's capacity for the next window to avoid repeated allocation. + let null_buf: Option = if state.is_nullable && !state.null_bits.is_empty() { + let cap = state.null_bits.capacity(); + let bits = std::mem::replace(&mut state.null_bits, Vec::with_capacity(cap)); + Some(NullBuffer::new(BooleanBuffer::new( + Buffer::from_vec(bits), + 0, + rows, + ))) + } else { + state.null_bits.clear(); + None + }; + + let array: ArrayRef = match &mut state.values { + ColumnValues::Numeric(buf) => { + // Swap in a fresh pre-allocated buffer; take the old one as Arrow Buffer. + let old = std::mem::replace( + buf, + MutableBuffer::with_capacity(coalesce_rows * state.bytes_per_row), + ); + let arrow_buf: Buffer = old.into(); + build_numeric_array(state.column_type, arrow_buf, rows, null_buf, schema_field)? 
+ } + ColumnValues::Bool(v) => { + let cap = v.capacity(); + let taken = std::mem::replace(v, Vec::with_capacity(cap)); + let mut bits = vec![0u8; rows.div_ceil(8)]; + for (i, &b) in taken.iter().enumerate().take(rows) { + if b != 0 { + bits[i / 8] |= 1u8 << (i % 8); + } + } + Arc::new(BooleanArray::new( + BooleanBuffer::new(Buffer::from_vec(bits), 0, rows), + null_buf, + )) + } + ColumnValues::Str { bytes, offsets } => { + // Capacity hints from the previous window so the reset never over-allocates. + // With has_strings flush-per-slice, offsets.len() == slice_len+1 (~4097), not + // coalesce_rows+1 (1M). Using len() here shrinks the reset from 4MB to ~16KB. + let bytes_cap = bytes.capacity(); + let offsets_hint = offsets.len(); // rows+1 from the window just taken + let taken_bytes = std::mem::replace(bytes, Vec::with_capacity(bytes_cap)); + let taken_offsets = std::mem::replace(offsets, { + let mut v = Vec::with_capacity(offsets_hint.max(1)); + v.push(0i32); + v + }); + let arr = unsafe { + StringArray::new_unchecked( + OffsetBuffer::new(ScalarBuffer::from(taken_offsets)), + Buffer::from_vec(taken_bytes), + null_buf, + ) + }; + Arc::new(arr) + } + }; + + Ok(array) +} + +fn build_numeric_array( + column_type: i32, + buf: Buffer, + rows: usize, + null_buf: Option, + schema_field: &arrow_schema::Field, +) -> Result { + Ok(match column_type { + COLUMN_TYPE_INT32 => Arc::new(PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + )), + // JULIA_DATE stores the epoch-adjusted i32 value; Arrow type is the same as DATE. + COLUMN_TYPE_DATE | COLUMN_TYPE_JULIA_DATE => Arc::new(PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + )), + COLUMN_TYPE_INT64 => Arc::new(PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + )), + // JULIA_TIMESTAMP stores epoch-adjusted μs; Arrow type is the same as TIMESTAMP. 
+ COLUMN_TYPE_TIMESTAMP | COLUMN_TYPE_JULIA_TIMESTAMP => { + Arc::new(PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + )) + } + // JULIA_TIMESTAMPTZ stores epoch-adjusted μs; Arrow type is the same as TIMESTAMPTZ. + COLUMN_TYPE_TIMESTAMPTZ | COLUMN_TYPE_JULIA_TIMESTAMPTZ => Arc::new( + PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + ) + .with_timezone("UTC"), + ), + COLUMN_TYPE_FLOAT32 => Arc::new(PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + )), + COLUMN_TYPE_FLOAT64 => Arc::new(PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + )), + COLUMN_TYPE_DECIMAL_INT32 | COLUMN_TYPE_DECIMAL_INT64 | COLUMN_TYPE_DECIMAL_INT128 => { + let (precision, scale) = match schema_field.data_type() { + arrow_schema::DataType::Decimal128(p, s) => (*p, *s), + dt => { + return Err(anyhow::anyhow!( + "Expected Decimal128 for field {}, got {:?}", + schema_field.name(), + dt + )) + } + }; + Arc::new( + PrimitiveArray::::new(ScalarBuffer::new(buf, 0, rows), null_buf) + .with_precision_and_scale(precision, scale) + .map_err(|e| anyhow::anyhow!("Decimal precision/scale: {}", e))?, + ) + } + COLUMN_TYPE_UUID => { + // UUID: FixedSizeBinary(16) + Arc::new( + FixedSizeBinaryArray::try_new(16, buf, null_buf) + .map_err(|e| anyhow::anyhow!("UUID FixedSizeBinary: {}", e))?, + ) + } + COLUMN_TYPE_JULIA_TIMESTAMP_NS => Arc::new(PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + )), + COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS => Arc::new( + PrimitiveArray::::new( + ScalarBuffer::new(buf, 0, rows), + null_buf, + ) + .with_timezone("UTC"), + ), + ct => { + return Err(anyhow::anyhow!( + "unsupported column type {} in finalize", + ct + )) + } + }) +} + +// --------------------------------------------------------------------------- +// Helpers + +/// Set bits [start, end) to 1 in `bits` (Arrow bit layout: bit i at byte i/8, shift i%8). 
+fn set_bits_range(bits: &mut [u8], start: usize, end: usize) { + if start >= end { + return; + } + let (fb, lb) = (start / 8, (end - 1) / 8); + let (fi, li) = (start % 8, (end - 1) % 8); + if fb == lb { + // All bits in the same byte: set bits [fi, li]. + bits[fb] |= ((1u16 << (li + 1)) - 1) as u8 & (0xFF_u8 << fi); + } else { + bits[fb] |= 0xFF_u8 << fi; // partial first byte: bits [fi, 7] + bits[(fb + 1)..lb].fill(0xFF); // full middle bytes + bits[lb] |= ((1u16 << (li + 1)) - 1) as u8; // partial last byte: bits [0, li] + } +} + +/// Reinterpret a typed slice as bytes. +#[inline(always)] +unsafe fn as_bytes(s: &[T]) -> &[u8] { + unsafe { std::slice::from_raw_parts(s.as_ptr() as *const u8, std::mem::size_of_val(s)) } +} diff --git a/iceberg_rust_ffi/src/writer.rs b/iceberg_rust_ffi/src/writer.rs index 27db184..6c0db60 100644 --- a/iceberg_rust_ffi/src/writer.rs +++ b/iceberg_rust_ffi/src/writer.rs @@ -1,22 +1,24 @@ /// Writer support for iceberg_rust_ffi /// -/// Encoding is handled by a global pool of N=available_parallelism OS threads shared -/// across all writers. Per-writer ordering is guaranteed by the per-writer -/// `Arc>` inside WriterState: only one pool thread encodes -/// a given writer at a time, and the FIFO global queue ensures batches are submitted -/// in order. -use std::any::Any; -use std::cell::RefCell; -use std::ffi::{c_char, c_void, CString}; +/// Encoding is handled by a global pool of N async worker tasks (default +/// 2 * available_parallelism, configurable via `iceberg_set_encode_workers`) running +/// on the tokio runtime, shared across all writers. Each writer owns its own FIFO queue +/// of pending batches; workers scan the set of active writers and claim one (via the +/// per-writer `busy` flag) before draining its queue. This avoids the head-of-line +/// blocking that the old single-MPMC design suffered when many workers happened to pull +/// tasks for the same writer. 
Workers `.await` the I/O inside `w.write()`, so a runtime +/// thread parked on an S3 PUT is free to drive another writer's encode in the meantime. +use std::cell::UnsafeCell; +use std::collections::VecDeque; +use std::ffi::{c_char, c_void}; use std::io::Cursor; -use std::panic::{catch_unwind, AssertUnwindSafe}; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, OnceLock}; use std::thread; use arrow_array::RecordBatch; use arrow_ipc::reader::StreamReader; -use arrow_schema::SchemaRef as ArrowSchemaRef; +use arrow_schema::{DataType, SchemaRef as ArrowSchemaRef, TimeUnit}; use iceberg::arrow::schema_to_arrow_schema; use iceberg::spec::DataFileFormat; use iceberg::writer::base_writer::data_file_writer::{DataFileWriter, DataFileWriterBuilder}; @@ -35,6 +37,7 @@ const COMPRESSION_SNAPPY: i32 = 1; const COMPRESSION_GZIP: i32 = 2; const COMPRESSION_LZ4: i32 = 3; const COMPRESSION_ZSTD: i32 = 4; +const COMPRESSION_LZ4_RAW: i32 = 5; fn compression_from_code(code: i32) -> Compression { match code { @@ -43,6 +46,7 @@ fn compression_from_code(code: i32) -> Compression { COMPRESSION_GZIP => Compression::GZIP(Default::default()), COMPRESSION_LZ4 => Compression::LZ4, COMPRESSION_ZSTD => Compression::ZSTD(Default::default()), + COMPRESSION_LZ4_RAW => Compression::LZ4_RAW, _ => Compression::SNAPPY, } } @@ -92,13 +96,18 @@ impl ParquetWriterPropertiesFFI { } } -use crate::error_codes::{classified_error, classify, IcebergErrorCode}; +use crate::error_codes::{classified_error, classify, classify_iceberg, IcebergErrorCode}; +use crate::record_batch_builder::{RecordBatchBuilder, DEFAULT_COALESCE_ROWS}; use crate::response::IcebergBoxedResponse; use crate::table::IcebergTable; use crate::transaction::IcebergDataFiles; use crate::util::parse_c_string; use crate::writer_columns::{ - build_arrow_array_gathered, ColumnDescriptor, GatheredColumnDescriptor, SliceRef, + ColumnSlice, COLUMN_TYPE_BOOLEAN, 
COLUMN_TYPE_DECIMAL_INT128, COLUMN_TYPE_DECIMAL_INT32, + COLUMN_TYPE_DECIMAL_INT64, COLUMN_TYPE_FLOAT32, COLUMN_TYPE_FLOAT64, COLUMN_TYPE_INT32, + COLUMN_TYPE_INT64, COLUMN_TYPE_JULIA_DATE, COLUMN_TYPE_JULIA_TIMESTAMP, + COLUMN_TYPE_JULIA_TIMESTAMPTZ, COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS, + COLUMN_TYPE_JULIA_TIMESTAMP_NS, COLUMN_TYPE_STRING, COLUMN_TYPE_UUID, }; use object_store_ffi::{ export_runtime_op, with_cancellation, CResult, NotifyGuard, ResponseGuard, RT, @@ -108,135 +117,332 @@ use object_store_ffi::{ type ConcreteDataFileWriter = DataFileWriter; -/// Encode task submitted to the global worker pool. -struct EncodeTask { - batch: RecordBatch, - state: Arc, -} - -// Safety: RecordBatch is Send; WriterState fields are Send. -unsafe impl Send for EncodeTask {} - /// Shared mutable state for one IcebergDataFileWriter. /// Owned by the IcebergDataFileWriter and shared with pool workers via Arc. +/// +/// # Invariants +/// +/// 1. **Per-writer FIFO ordering.** Batches for a given writer are encoded in submission +/// order. Pushes go to the back of `pending_queue`; pops come from the front; and +/// `busy` ensures at most one worker drains the queue at a time — so the encode order +/// is identical to the push order. +/// 2. **Single-claim.** A worker may only encode for this writer while it has won the +/// `busy.compare_exchange(false → true)`. The claimed worker drains the queue to +/// empty (or until the writer reports an error) before releasing `busy`. +/// 3. **Stranded-task mitigation.** After releasing `busy`, the worker re-checks +/// `queue_len`; if non-zero (a producer pushed a batch between the worker's last +/// pop and the release), it notifies the global pool again. This prevents the +/// classic missed-notification race where a notification is consumed by a worker +/// that arrived between the producer's push and the queue's becoming non-empty. pub(crate) struct WriterState { - /// The underlying Parquet writer. 
Protected by a Mutex so pool workers can access it - /// concurrently (though at most one worker encodes a given writer at a time due to the - /// bounded per-writer channel). Set to None when the writer is closed or freed. + /// The underlying Parquet writer. Protected by a Mutex so pool workers can access it. + /// Only the worker that holds the `busy` claim ever locks this — so there's no real + /// contention here; the Mutex is preserved purely to coordinate with + /// `iceberg_writer_free`, which may take the writer out from under in-flight work. + /// Set to None when the writer is closed or freed. writer: Mutex>, - /// Number of encode tasks submitted to the pool but not yet completed. + /// FIFO queue of batches awaiting encode for this writer. + pending_queue: Mutex>, + /// Snapshot of `pending_queue.len()` exposed as an atomic so workers can skip writers + /// with no work without taking the queue lock. Kept in sync with the queue under the + /// queue lock by `submit_batch` (increments before notifying) and by workers (decrement + /// after popping). + queue_len: AtomicUsize, + /// Set to true by the worker currently encoding for this writer. Other workers skip + /// this writer while `busy` is true, even if `queue_len > 0`. + busy: AtomicBool, + /// True once this writer has been registered in `GlobalWorkerPool::active_writers`. + /// First submitter wins the CAS and performs the registration. + registered: AtomicBool, + /// Number of encode tasks submitted but not yet completed. Includes queued + in-flight. pending: AtomicUsize, /// Notified when `pending` drops to zero, so iceberg_writer_close can wait efficiently. done_notify: tokio::sync::Notify, /// First encode error encountered by a pool worker, if any. error: Mutex>, + /// Set by iceberg_writer_free to tell in-flight async encodes to drop the writer + /// instead of putting it back. 
Needed because async encode takes the writer out of + /// the Option for the duration of `w.write(batch).await`, releasing the Mutex so the + /// runtime thread can drive other tasks while parked on I/O. + poisoned: AtomicBool, } // Safety: ConcreteDataFileWriter is Send (verified by its use in spawn_blocking previously). unsafe impl Send for WriterState {} unsafe impl Sync for WriterState {} -/// Global pool of N=available_parallelism encode worker threads shared across all writers. -struct GlobalWorkerPool { - task_tx: tokio::sync::mpsc::Sender, +impl WriterState { + fn new(writer: ConcreteDataFileWriter) -> Self { + WriterState { + writer: Mutex::new(Some(writer)), + pending_queue: Mutex::new(VecDeque::new()), + queue_len: AtomicUsize::new(0), + busy: AtomicBool::new(false), + registered: AtomicBool::new(false), + pending: AtomicUsize::new(0), + done_notify: tokio::sync::Notify::new(), + error: Mutex::new(None), + poisoned: AtomicBool::new(false), + } + } } -static GLOBAL_ENCODE_POOL: OnceLock = OnceLock::new(); +/// Global pool of N encode worker tasks shared across all writers (N defaults to +/// 2 * available_parallelism so async workers parked on I/O don't starve cores of +/// encode CPU work; tune via `iceberg_set_encode_workers`). +/// +/// Replaces the previous single-MPMC channel design. Each writer owns its own queue; +/// workers scan the active-writer list looking for a writer that (a) has queued work and +/// (b) is not currently claimed by another worker. The first such writer is claimed +/// (`busy = true`), drained, then released. +/// +/// # Wakeup discipline +/// +/// `wake` is a single shared `Notify` for the whole pool. Both producers (`submit_batch`) +/// and workers (after releasing a writer that still has work) call `wake.notify_one()`. 
+/// To avoid stranded tasks when multiple producers fire concurrently and only one permit +/// can be stored, a worker that successfully claims a writer cascades the wakeup by +/// calling `wake.notify_one()` before draining — so if more writers have work, another +/// worker is roused to look. +pub(crate) struct GlobalWorkerPool { + /// Currently-registered writers. Workers iterate this on each pass looking for work. + /// Locked only briefly to snapshot the list (Arc clones); never held during encode. + active_writers: Mutex>>, + /// Wakeup channel for idle workers. Producers and finishing workers notify; idle + /// workers wait. See struct doc for the cascade discipline that prevents lost wakeups. + wake: tokio::sync::Notify, + /// Rotating start offset for the per-pass scan, so workers don't all collide on + /// writer 0 when several writers have work. + scan_offset: AtomicUsize, +} + +pub(crate) static GLOBAL_ENCODE_POOL: OnceLock = OnceLock::new(); + +impl GlobalWorkerPool { + /// Add a writer to the active set. Idempotent via the `registered` CAS on WriterState. + fn register(&self, state: &Arc) { + if state + .registered + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + let mut guard = self + .active_writers + .lock() + .unwrap_or_else(|e| e.into_inner()); + guard.push(state.clone()); + } + } + + /// Remove a writer from the active set. Called on free; idempotent. + fn unregister(&self, state: &WriterState) { + if !state.registered.swap(false, Ordering::AcqRel) { + return; + } + let mut guard = self + .active_writers + .lock() + .unwrap_or_else(|e| e.into_inner()); + let target = state as *const WriterState; + guard.retain(|s| Arc::as_ptr(s) != target); + } -// Thread-local storage for the most recent synchronous gather error. -// Set by iceberg_writer_write_gathered_columns on failure; consumed by iceberg_take_gather_error. -thread_local! 
{ - static LAST_GATHER_ERROR: RefCell> = const { RefCell::new(None) }; + /// Snapshot the active-writers list. Returns Arc clones so subsequent encoding does + /// not hold the list lock. + fn snapshot(&self) -> Vec> { + let guard = self + .active_writers + .lock() + .unwrap_or_else(|e| e.into_inner()); + guard.clone() + } } -fn store_gather_error(e: &anyhow::Error) { - let msg = format!("{:#}", e); - LAST_GATHER_ERROR.with(|cell| { - *cell.borrow_mut() = CString::new(msg).ok(); - }); +/// Try to claim a writer with pending work. Returns the claimed writer (busy=true) or +/// None if no writer has work available right now. +/// +/// Scans the active-writers snapshot starting at a rotating offset so workers don't all +/// race for writer 0. +fn try_claim_writer(pool: &GlobalWorkerPool) -> Option> { + let writers = pool.snapshot(); + if writers.is_empty() { + return None; + } + let n = writers.len(); + let start = pool.scan_offset.fetch_add(1, Ordering::Relaxed) % n; + for i in 0..n { + let w = &writers[(start + i) % n]; + if w.queue_len.load(Ordering::Acquire) == 0 { + continue; + } + if w.busy + .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + // Re-check after winning the claim: another worker may have drained the + // queue between our queue_len load and our CAS. If so, release and skip. + if w.queue_len.load(Ordering::Acquire) == 0 { + w.busy.store(false, Ordering::Release); + continue; + } + return Some(w.clone()); + } + } + None } -/// Returns a heap-allocated C string with the most recent gather error on this thread, -/// or NULL if none. Must be called on the same thread as the failed write call, immediately -/// after it returns. The caller must free the returned string with `iceberg_destroy_cstring`. 
-#[no_mangle] -pub extern "C" fn iceberg_take_gather_error() -> *mut c_char { - LAST_GATHER_ERROR.with(|cell| { - cell.borrow_mut() - .take() - .map(|s| s.into_raw()) - .unwrap_or(std::ptr::null_mut()) - }) +/// Drop guard that decrements `pending` exactly once. Used by the async encode path to +/// guarantee the counter still falls to zero (so `iceberg_writer_close` can return) even +/// if `w.write(...).await` panics and unwinds the task. +struct PendingGuard(Arc); + +impl Drop for PendingGuard { + fn drop(&mut self) { + let prev = self.0.pending.fetch_sub(1, Ordering::AcqRel); + if prev == 1 { + self.0.done_notify.notify_one(); + } + } } -/// Formats a Rust panic payload into an anyhow error, preserving the message where possible. -fn format_panic_error(panic: Box) -> anyhow::Error { - let msg = if let Some(s) = panic.downcast_ref::<&str>() { - format!("encode worker panicked: {}", s) - } else if let Some(s) = panic.downcast_ref::() { - format!("encode worker panicked: {}", s) - } else { - "encode worker panicked (no string payload)".to_string() +/// Encode a single batch for the given (already-claimed) writer. +/// +/// **Async**: takes the underlying writer out of `state.writer` under the std Mutex +/// (briefly), awaits `w.write(batch)` without holding any sync lock, then puts the writer +/// back — unless `iceberg_writer_free` has set `poisoned`, in which case the writer is +/// dropped. The take-put pattern is what lets the runtime thread go drive other tasks +/// while this one is parked in an S3 PUT inside `w.write()`. +/// +/// Stores any encode error in `state.error` (first-writer-wins). `pending` is decremented +/// exactly once via the `PendingGuard` Drop impl, so close() never hangs even on panic. +async fn encode_one_batch(state: Arc, batch: RecordBatch) { + let _pending = PendingGuard(state.clone()); + + // Test hook: bypass the real Parquet write so we can exercise the dispatch logic in + // isolation. 
Enabled only when a test installs a positive delay via `test_hooks`. + #[cfg(test)] + { + let delay_ms = test_hooks::DELAY_MS.load(Ordering::Relaxed); + if delay_ms > 0 { + test_hooks::run_hook(&state, &batch); + tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await; + return; + } + } + + // Take the writer out under the std Mutex. If the writer was already poisoned by + // a prior free(), we want to drop any writer that's still in the slot — but the + // Some/None of the slot itself tells us that. + let writer_opt = { + let mut guard = state.writer.lock().unwrap_or_else(|e| e.into_inner()); + guard.take() + }; + + let (mut writer_opt, result) = match writer_opt { + Some(mut w) => { + let r = w.write(batch).await.map_err(classify_iceberg); + (Some(w), r) + } + None => (None, Err(anyhow::anyhow!("writer already closed"))), }; - anyhow::anyhow!(msg) + + // Put the writer back unless free() ran during our .await — in which case + // `poisoned` is set and we drop the writer to honor the poison semantic. + if let Some(w) = writer_opt.take() { + if state.poisoned.load(Ordering::Acquire) { + drop(w); + } else { + *state.writer.lock().unwrap_or_else(|e| e.into_inner()) = Some(w); + } + } + + if let Err(e) = result { + let mut slot = state.error.lock().unwrap_or_else(|e| e.into_inner()); + if slot.is_none() { + *slot = Some(e); + } + } } -/// Body of each encode worker thread: receives tasks from the shared channel and encodes them. -fn encode_worker_loop( - task_rx: Arc>>, - handle: tokio::runtime::Handle, -) { +/// Drain the claimed writer's queue while we hold `busy`. Pops one batch at a time and +/// encodes it. The `busy` flag ensures FIFO per-writer ordering: while we hold it, no +/// other worker can interleave a pop on this writer's queue. +async fn drain_claimed_writer(state: Arc) { loop { - // Acquire the shared receiver lock, then wait for a task. 
- // The lock is released as soon as recv() returns, so workers are not serialized - // during encoding — only during task pickup. - let task = { - let mut rx = handle.block_on(task_rx.lock()); - match handle.block_on(rx.recv()) { - Some(t) => t, - None => break, // sender dropped → pool shutting down + let batch = { + let mut q = state + .pending_queue + .lock() + .unwrap_or_else(|e| e.into_inner()); + match q.pop_front() { + Some(b) => { + state.queue_len.fetch_sub(1, Ordering::AcqRel); + b + } + None => break, } }; + encode_one_batch(state.clone(), batch).await; + } +} - // Clone state before moving task into the closure so we can always decrement - // pending even if the closure panics. - let state = task.state.clone(); - let handle_enc = handle.clone(); - let encode_result = catch_unwind(AssertUnwindSafe(move || { - let mut guard = task.state.writer.lock().unwrap_or_else(|e| e.into_inner()); - match guard.as_mut() { - Some(w) => handle_enc - .block_on(w.write(task.batch)) - .map_err(|e| crate::error_codes::classify_iceberg(e)), - None => Err(anyhow::anyhow!("writer already closed")), - } - })); +/// Worker thread body: scan for a writer with work, claim it, drain its queue, release, +/// re-check (stranded-task mitigation), and either continue or wait for a wake-up. +/// +/// The wake-up protocol uses a single shared `Notify` with a cascade discipline. See the +/// docs on `GlobalWorkerPool` for the full picture; the key races are: +/// +/// - **Producer notification lost.** Tokio's `Notify` stores at most one permit, so if +/// two producers fire `notify_one()` while all workers are sleeping, only one worker +/// wakes. To prevent the second producer's work from being stranded, the woken worker +/// calls `wake.notify_one()` *before* it starts draining — cascading the wakeup so +/// another worker checks the remaining writers. 
+/// - **Push between last-pop and busy-release.** A producer pushes a batch after the +/// drain loop sees an empty queue but before this worker clears `busy`. The producer's +/// `notify_one()` may have been consumed by some other worker that ran an empty scan +/// and went back to sleep. Mitigation: after clearing `busy`, this worker re-reads +/// `queue_len`; if non-zero, it notifies again so someone re-claims the writer. +async fn encode_worker_loop(pool: &'static GlobalWorkerPool) { + loop { + // Pre-register interest in the next wake-up. `enable()` guarantees that any + // `notify_one()` issued from this point on will wake the future even if it + // hasn't been polled yet — so the check-then-wait below is race-free. + let notified = pool.wake.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); - let err = match encode_result { - Ok(Ok(())) => None, - Ok(Err(e)) => Some(e), - Err(panic) => Some(format_panic_error(panic)), - }; - if let Some(e) = err { - let mut slot = state.error.lock().unwrap_or_else(|e| e.into_inner()); - if slot.is_none() { - *slot = Some(e); + if let Some(state) = try_claim_writer(pool) { + // Cascade: another writer may also have work. Wake a peer to look in + // parallel before we commit to draining this one. + pool.wake.notify_one(); + + drain_claimed_writer(state.clone()).await; + + // Release the claim. After this point another worker is free to claim + // the writer. + state.busy.store(false, Ordering::Release); + + // Stranded-task mitigation: a producer may have pushed between our last + // pop and our release. If so, ensure a worker is woken to handle it. + if state.queue_len.load(Ordering::Acquire) > 0 { + pool.wake.notify_one(); } + continue; } - // Always decrement pending; notify close() if this was the last task. - let prev = state.pending.fetch_sub(1, Ordering::AcqRel); - if prev == 1 { - state.done_notify.notify_one(); - } + // Nothing to claim — go to sleep until notified. 
+ notified.await; } } -/// Desired encode worker count. 0 means "use available_parallelism". +/// Desired encode worker count. 0 means "use 2 * available_parallelism", which +/// oversubscribes the core count on purpose: encode worker tasks are async, so workers +/// parked on S3 PUTs don't cost CPU and we want enough total tasks for the runtime to +/// keep cores busy with CPU encode while others wait on I/O. /// Must be set before the first iceberg_writer_new call. static ENCODE_WORKERS: AtomicUsize = AtomicUsize::new(0); -/// Set the number of encode worker threads in the global pool. +/// Set the number of encode worker tasks in the global pool. /// Must be called before any writer is created. Returns 0 on success, 1 if the pool is /// already initialized (call ignored). #[no_mangle] @@ -253,46 +459,71 @@ pub extern "C" fn iceberg_set_encode_workers(n: i32) -> i32 { /// Initialize the global encode pool on first call. /// Must be called from within a Tokio runtime (iceberg_writer_new satisfies this). fn get_or_init_encode_pool() -> &'static GlobalWorkerPool { - GLOBAL_ENCODE_POOL.get_or_init(|| { + static INIT: std::sync::Once = std::sync::Once::new(); + INIT.call_once(|| { let configured = ENCODE_WORKERS.load(Ordering::Relaxed); let n = if configured > 0 { configured } else { // available_parallelism() only fails on unusual platforms (embedded, some sandboxes). // On Linux/macOS/Windows it always succeeds, so the unwrap never fires in practice. - thread::available_parallelism().unwrap().get() + // 2x: oversubscribe so async workers parked on I/O leave room for other workers + // to do encode CPU on the freed runtime threads. + thread::available_parallelism().unwrap().get() * 2 }; let handle = tokio::runtime::Handle::current(); - // Buffer 2× workers — drain tasks are rarely blocked on submit. 
- let (task_tx, task_rx) = tokio::sync::mpsc::channel::(n * 2); - let task_rx = Arc::new(tokio::sync::Mutex::new(task_rx)); - for i in 0..n { - let task_rx = task_rx.clone(); - let handle = handle.clone(); - thread::Builder::new() - .name(format!("iceberg-encode-{}", i)) - .spawn(move || encode_worker_loop(task_rx, handle)) - .expect("failed to spawn iceberg encode worker"); - } + // Install the pool first so workers can reference it as `&'static`. + GLOBAL_ENCODE_POOL + .set(GlobalWorkerPool { + active_writers: Mutex::new(Vec::new()), + wake: tokio::sync::Notify::new(), + scan_offset: AtomicUsize::new(0), + }) + .ok() + .expect("encode pool initialized twice"); + let pool_ref: &'static GlobalWorkerPool = + GLOBAL_ENCODE_POOL.get().expect("pool was just installed"); - GlobalWorkerPool { task_tx } - }) + // Spawn N async worker tasks on the tokio runtime. Each task runs + // `encode_worker_loop`, which awaits at I/O boundaries inside `w.write()` — + // freeing the runtime thread to drive other tasks (other writers' encodes) + // during S3 PUTs. Number of in-flight encodes is no longer bounded by OS + // thread count, only by core count for actual CPU work. + for _ in 0..n { + handle.spawn(encode_worker_loop(pool_ref)); + } + }); + GLOBAL_ENCODE_POOL + .get() + .expect("encode pool not installed by INIT") } /// Opaque writer handle for FFI. /// -/// Writing is pipelined: Julia gathers a RecordBatch and submits it directly to the -/// global encode pool, then returns immediately. Pool workers (N = available_parallelism) +/// Writing is pipelined: Julia hands one upstream slice at a time to `iceberg_writer_append`, +/// which copies it into the embedded `RecordBatchBuilder`'s per-column buffers. When the +/// builder hits the coalesce window, the writer finalizes a `RecordBatch` and submits it to +/// the global encode pool. Pool workers (async tasks; default N = 2 * available_parallelism) /// encode Parquet concurrently across all active writers. 
pub struct IcebergDataFileWriter { - /// Arrow schema for this table, used by write_columns to create RecordBatches. + /// Arrow schema for this table; used to set up the embedded builder and to assemble + /// RecordBatches from IPC writes. pub(crate) arrow_schema: ArrowSchemaRef, + /// Per-column type codes derived from `arrow_schema` at construction time. Drives the + /// builder's copy/conversion logic. + pub(crate) col_types: Vec, + /// Lazily-constructed RecordBatch builder. `UnsafeCell` because the FFI dereferences + /// the writer as `&IcebergDataFileWriter` (one writer is only ever accessed from one + /// Julia thread, so interior mutability without locking is sound). + pub(crate) builder: UnsafeCell>, /// Shared state: owns the ConcreteDataFileWriter, tracks pending count and errors. pub(crate) writer_state: Arc, } unsafe impl Send for IcebergDataFileWriter {} +// Safety: callers must ensure each writer is accessed from one Julia thread at a time — +// the FFI contract. `builder` is the only `!Sync` field; everything else is Sync via Arc. unsafe impl Sync for IcebergDataFileWriter {} /// Type alias for writer response @@ -313,142 +544,134 @@ fn store_writer_error(writer_ref: &IcebergDataFileWriter, e: anyhow::Error) { } } -/// Build a `RecordBatch` from a slice of `GatheredColumnDescriptor`s. +/// Map an Arrow `DataType` to the corresponding `COLUMN_TYPE_*` code the builder uses. /// -/// # Safety -/// All pointers inside each `GatheredColumnDescriptor` must be valid for the duration of -/// this call (callers hold `GC.@preserve` or equivalent). -unsafe fn build_record_batch( - arrow_schema: ArrowSchemaRef, - col_descs: I, -) -> Result -where - I: IntoIterator, - I::IntoIter: ExactSizeIterator, -{ - let iter = col_descs.into_iter(); - let mut arrays = Vec::with_capacity(iter.len()); - for (i, desc) in iter.enumerate() { - arrays.push(unsafe { build_arrow_array_gathered(&desc, arrow_schema.field(i))? 
}); - } - RecordBatch::try_new(arrow_schema, arrays) - .map_err(|_| anyhow::anyhow!("failed to construct RecordBatch")) +/// Date and Timestamp variants default to the Julia-epoch codes because that's the natural +/// shape of incoming Julia data (`Dates.value(d)` of a Julia `Date` returns days since +/// 0001-01-01). Users sending pre-converted Unix-epoch integers are an edge case the new +/// schema-driven API doesn't address — they'd need to pre-convert and write integers +/// against a plain integer column. +fn arrow_type_to_column_type(dt: &DataType) -> Result { + Ok(match dt { + DataType::Int32 => COLUMN_TYPE_INT32, + DataType::Int64 => COLUMN_TYPE_INT64, + DataType::Float32 => COLUMN_TYPE_FLOAT32, + DataType::Float64 => COLUMN_TYPE_FLOAT64, + DataType::Utf8 => COLUMN_TYPE_STRING, + DataType::Boolean => COLUMN_TYPE_BOOLEAN, + DataType::Date32 => COLUMN_TYPE_JULIA_DATE, + DataType::Timestamp(TimeUnit::Microsecond, None) => COLUMN_TYPE_JULIA_TIMESTAMP, + DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => COLUMN_TYPE_JULIA_TIMESTAMPTZ, + DataType::Timestamp(TimeUnit::Nanosecond, None) => COLUMN_TYPE_JULIA_TIMESTAMP_NS, + DataType::Timestamp(TimeUnit::Nanosecond, Some(_)) => COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS, + DataType::FixedSizeBinary(16) => COLUMN_TYPE_UUID, + DataType::Decimal128(p, _) => { + if *p <= 9 { + COLUMN_TYPE_DECIMAL_INT32 + } else if *p <= 18 { + COLUMN_TYPE_DECIMAL_INT64 + } else { + COLUMN_TYPE_DECIMAL_INT128 + } + } + other => { + return Err(anyhow::anyhow!( + "Unsupported Arrow type for column writer: {:?}", + other + )) + } + }) } -/// Submit a `RecordBatch` to the global encode pool. +/// Get the embedded builder, constructing it on first access. /// -/// Increments the writer's pending count before sending and rolls it back on channel failure. -fn submit_batch( +/// # Safety +/// Caller must ensure no other thread is accessing the writer at the same time. The FFI +/// contract is one Julia thread per writer. 
+#[allow(clippy::mut_from_ref)] +unsafe fn get_or_init_builder( writer_ref: &IcebergDataFileWriter, - pool: &GlobalWorkerPool, - batch: RecordBatch, -) -> Result<(), anyhow::Error> { - writer_ref - .writer_state - .pending - .fetch_add(1, Ordering::AcqRel); - let task = EncodeTask { - batch, - state: writer_ref.writer_state.clone(), - }; - match pool.task_tx.blocking_send(task) { - Ok(()) => Ok(()), - Err(_) => { - let prev = writer_ref - .writer_state - .pending - .fetch_sub(1, Ordering::AcqRel); - if prev == 1 { - writer_ref.writer_state.done_notify.notify_one(); - } - Err(anyhow::anyhow!("encode pool channel closed unexpectedly")) - } +) -> Result<&mut RecordBatchBuilder, anyhow::Error> { + let slot = unsafe { &mut *writer_ref.builder.get() }; + if slot.is_none() { + *slot = Some(RecordBatchBuilder::new( + writer_ref.arrow_schema.clone(), + &writer_ref.col_types, + DEFAULT_COALESCE_ROWS, + )?); } + Ok(slot.as_mut().unwrap()) } -/// Validates column count, builds a `RecordBatch` from pre-built gathered descriptors, -/// and submits it to the encode pool. -unsafe fn write_gathered_inner( +/// Finalize the current window of the builder (if non-empty) and submit the resulting +/// `RecordBatch` to the encode pool. Resets the builder in-place for the next window. 
+fn flush_builder( writer_ref: &IcebergDataFileWriter, pool: &GlobalWorkerPool, - arrow_schema: ArrowSchemaRef, - num_columns: usize, - col_descs: I, -) -> Result<(), anyhow::Error> -where - I: IntoIterator, - I::IntoIter: ExactSizeIterator, -{ - if num_columns != arrow_schema.fields().len() { - return Err(anyhow::anyhow!( - "Column count mismatch: got {} but schema has {}", - num_columns, - arrow_schema.fields().len() - )); - } - let batch = unsafe { build_record_batch(arrow_schema, col_descs) }?; +) -> Result<(), anyhow::Error> { + let slot = unsafe { &mut *writer_ref.builder.get() }; + let Some(builder) = slot.as_mut() else { + return Ok(()); + }; + if builder.rows() == 0 { + return Ok(()); + } + let batch = builder.take_record_batch()?; submit_batch(writer_ref, pool, batch) } -/// Validates column count, builds a `RecordBatch` from flat `ColumnDescriptor`s (each -/// treated as a single sequential slice), and submits it to the encode pool. +/// Submit a `RecordBatch` to the writer's queue. Lazily registers the writer with the +/// global pool on first submit, then pushes onto the per-writer FIFO queue and notifies +/// the pool that there is work available somewhere. /// -/// Each `SliceRef` is constructed on the stack and used within the same loop iteration, -/// so no heap allocation is needed for the descriptor conversion. -unsafe fn write_columns_inner( +/// `pending` (queued + in-flight) is incremented under the queue lock so that +/// `iceberg_writer_close` sees a consistent count. 
+pub(crate) fn submit_batch( writer_ref: &IcebergDataFileWriter, pool: &GlobalWorkerPool, - arrow_schema: ArrowSchemaRef, - col_descs: &[ColumnDescriptor], + batch: RecordBatch, ) -> Result<(), anyhow::Error> { - if col_descs.len() != arrow_schema.fields().len() { - return Err(anyhow::anyhow!( - "Column count mismatch: got {} but schema has {}", - col_descs.len(), - arrow_schema.fields().len() - )); - } - let mut arrays = Vec::with_capacity(col_descs.len()); - for (i, d) in col_descs.iter().enumerate() { - // SliceRef lives on the stack for exactly this iteration; the raw pointer - // is consumed by build_arrow_array_gathered before the next iteration begins. - let slice = SliceRef { - data_ptr: d.data_ptr, - lengths_ptr: d.lengths_ptr, - validity_ptr: d.validity_ptr, - sel_ptr: std::ptr::null(), - len: d.num_rows, - }; - let desc = GatheredColumnDescriptor { - slices: &slice as *const SliceRef, - num_slices: 1, - total_rows: d.num_rows, - column_type: d.column_type, - is_nullable: d.is_nullable, - }; - arrays.push(unsafe { build_arrow_array_gathered(&desc, arrow_schema.field(i))? }); + // Idempotent — only the first submit pays the lock to push into active_writers. + pool.register(&writer_ref.writer_state); + + { + let mut q = writer_ref + .writer_state + .pending_queue + .lock() + .unwrap_or_else(|e| e.into_inner()); + q.push_back(batch); + // Increment counters under the lock so queue and counters stay consistent. + writer_ref + .writer_state + .queue_len + .fetch_add(1, Ordering::AcqRel); + writer_ref + .writer_state + .pending + .fetch_add(1, Ordering::AcqRel); } - let batch = RecordBatch::try_new(arrow_schema, arrays) - .map_err(|_| anyhow::anyhow!("failed to construct RecordBatch"))?; - submit_batch(writer_ref, pool, batch) + + pool.wake.notify_one(); + Ok(()) } -/// Gather column data from Julia memory into Arrow arrays in the calling thread, then -/// submit the RecordBatch to the global encode pool asynchronously. 
+/// Append one `RowChunk` (one `ColumnSlice` per output column) to the embedded builder. /// -/// Julia keeps source arrays alive via `GC.@preserve` for the duration of this call. -/// After this function returns, all Julia pointers have been consumed and Julia may safely -/// release the source data. Encode is still asynchronous in the global pool; call -/// `iceberg_writer_close` to wait for all pending encodes. +/// Rust copies all slice data synchronously into per-column buffers; source memory may be +/// released the moment this call returns. If the post-append row count reaches +/// `coalesce_rows`, the builder is finalized into a `RecordBatch` and submitted to the +/// encode pool (auto-flush). The window may end up slightly over `coalesce_rows` — we +/// never split a slice mid-append, preserving the byte-aligned fast paths. /// -/// Returns 0 on success, -1 on error (error stored in writer state, propagated on close). +/// Returns 0 on success, -1 on error (error stored in writer state, surfaced on close). 
#[no_mangle] -pub extern "C" fn iceberg_writer_write_gathered_columns( +pub extern "C" fn iceberg_writer_append( writer: *mut IcebergDataFileWriter, - columns: *const GatheredColumnDescriptor, + slices: *const ColumnSlice, num_columns: usize, ) -> i32 { - if writer.is_null() || columns.is_null() || num_columns == 0 { + if writer.is_null() || slices.is_null() || num_columns == 0 { return -1; } let writer_ref = unsafe { &*writer }; @@ -459,37 +682,36 @@ pub extern "C" fn iceberg_writer_write_gathered_columns( return -1; } }; - let arrow_schema = writer_ref.arrow_schema.clone(); - let col_descs = unsafe { std::slice::from_raw_parts(columns, num_columns) }; - if let Err(e) = unsafe { - write_gathered_inner( - writer_ref, - pool, - arrow_schema, - num_columns, - col_descs.iter().copied(), - ) - } { - store_gather_error(&e); + let slices_slice = unsafe { std::slice::from_raw_parts(slices, num_columns) }; + + let result = (|| -> Result<(), anyhow::Error> { + let builder = unsafe { get_or_init_builder(writer_ref) }?; + unsafe { builder.append_slice(slices_slice) }?; + if builder.should_flush() { + // Take the batch and submit. Do this after the borrow ends. + let batch = builder.take_record_batch()?; + submit_batch(writer_ref, pool, batch)?; + } + Ok(()) + })(); + + if let Err(e) = result { store_writer_error(writer_ref, e); return -1; } 0 } -/// Synchronous write of flat column data: copies each column from Julia memory into -/// Rust-owned Arrow arrays in the calling thread, then submits to the global encode -/// pool asynchronously. +/// Force the builder to flush its current (partial) window to the encode pool. /// -/// Each `ColumnDescriptor` is treated as a single sequential slice (no scatter/gather). -/// Returns 0 on success, -1 on error (error stored in writer state, propagated on close). +/// Use this on logical boundaries (end of transaction, time tick) when you want a Parquet +/// row group break that doesn't naturally fall at `coalesce_rows`. 
No-op if the builder +/// is empty or hasn't been initialized. +/// +/// Returns 0 on success, -1 on error (error stored in writer state, surfaced on close). #[no_mangle] -pub extern "C" fn iceberg_writer_write_columns( - writer: *mut IcebergDataFileWriter, - columns: *const ColumnDescriptor, - num_columns: usize, -) -> i32 { - if writer.is_null() || columns.is_null() || num_columns == 0 { +pub extern "C" fn iceberg_writer_flush(writer: *mut IcebergDataFileWriter) -> i32 { + if writer.is_null() { return -1; } let writer_ref = unsafe { &*writer }; @@ -500,21 +722,28 @@ pub extern "C" fn iceberg_writer_write_columns( return -1; } }; - let arrow_schema = writer_ref.arrow_schema.clone(); - let col_descs = unsafe { std::slice::from_raw_parts(columns, num_columns) }; - if let Err(e) = unsafe { write_columns_inner(writer_ref, pool, arrow_schema, col_descs) } { + if let Err(e) = flush_builder(writer_ref, pool) { store_writer_error(writer_ref, e); return -1; } 0 } -/// Free a writer. Poisons the writer state so any in-flight pool tasks fail gracefully. +/// Free a writer. Poisons the writer state so any in-flight pool tasks fail gracefully, +/// and unregisters the writer from the global pool's active-writers list so workers stop +/// scanning it. #[no_mangle] pub extern "C" fn iceberg_writer_free(writer: *mut IcebergDataFileWriter) { if !writer.is_null() { unsafe { let boxed = Box::from_raw(writer); + if let Some(pool) = GLOBAL_ENCODE_POOL.get() { + pool.unregister(&boxed.writer_state); + } + // Set the poison flag BEFORE taking the writer out, so any encode task that + // currently holds the writer outside the Mutex (across its `.await`) will see + // `poisoned == true` when it goes to put the writer back, and drop it instead. + boxed.writer_state.poisoned.store(true, Ordering::Release); // Poison the ConcreteDataFileWriter so any in-flight pool tasks return an error // rather than writing to a partially-freed writer. 
let _ = boxed.writer_state.writer.lock().unwrap().take(); @@ -524,7 +753,8 @@ pub extern "C" fn iceberg_writer_free(writer: *mut IcebergDataFileWriter) { // Create a new DataFileWriter from a table with configuration options. // -// The global encode pool (N = available_parallelism threads) is initialized on the first call. +// The global encode pool (N async worker tasks, default 2 * available_parallelism) is +// initialized on the first call. export_runtime_op!( iceberg_writer_new, IcebergDataFileWriterResponse, @@ -587,24 +817,29 @@ export_runtime_op!( .await .map_err(|e| anyhow::anyhow!("Failed to build data file writer: {}", e))?; - // Convert Iceberg schema to Arrow schema for use in write_columns + // Convert Iceberg schema to Arrow schema for use by both the IPC and append paths. let arrow_schema = Arc::new( schema_to_arrow_schema(table.metadata().current_schema().as_ref()) .map_err(|e| anyhow::anyhow!("Failed to convert schema to Arrow: {}", e))? ); + // Derive the per-column type codes from the Arrow schema; this is what the + // embedded builder uses to drive copy/conversion decisions. + let col_types: Vec = arrow_schema + .fields() + .iter() + .map(|f| arrow_type_to_column_type(f.data_type())) + .collect::>()?; + // Initialize global pool (no-op if already running). get_or_init_encode_pool(); - let writer_state = Arc::new(WriterState { - writer: Mutex::new(Some(concrete_writer)), - pending: AtomicUsize::new(0), - done_notify: tokio::sync::Notify::new(), - error: Mutex::new(None), - }); + let writer_state = Arc::new(WriterState::new(concrete_writer)); Ok::(IcebergDataFileWriter { arrow_schema, + col_types, + builder: UnsafeCell::new(None), writer_state, }) }, @@ -638,6 +873,12 @@ pub extern "C" fn iceberg_writer_write( } }; + // Flush any pending builder window first so IPC batches don't reorder around append!. 
+ if let Err(e) = flush_builder(writer_ref, pool) { + store_writer_error(writer_ref, e); + return -1; + } + let ipc_bytes = unsafe { std::slice::from_raw_parts(arrow_ipc_data, arrow_ipc_len).to_vec() }; let cursor = Cursor::new(ipc_bytes); @@ -682,6 +923,13 @@ export_runtime_op!( }, writer_ref, async { + // Flush any partial-window remainder in the embedded builder before we wait. + if let Some(pool) = GLOBAL_ENCODE_POOL.get() { + if let Err(e) = flush_builder(writer_ref, pool) { + store_writer_error(writer_ref, e); + } + } + // Wait for all pending pool encodes to complete. // Uses a timeout to guard against a dead worker thread (e.g. panic outside // catch_unwind) that would otherwise leave pending > 0 forever. @@ -729,3 +977,277 @@ export_runtime_op!( }, writer: *mut IcebergDataFileWriter ); + +// ───────────────────────────────────────────────────────────────────────────── +// Test hooks + dispatch tests +// ───────────────────────────────────────────────────────────────────────────── + +#[cfg(test)] +pub(crate) mod test_hooks { + use std::sync::atomic::AtomicU64; + use std::sync::Mutex; + + use arrow_array::{Array, Int64Array, RecordBatch}; + + use super::WriterState; + use std::sync::Arc; + + /// When non-zero, `encode_one_batch` skips the real Parquet write, sleeps this many + /// milliseconds, and records the completion. Used by dispatch-logic tests. + pub(crate) static DELAY_MS: AtomicU64 = AtomicU64::new(0); + + /// Recorded `(writer_id, batch_id)` for each completed encode while `DELAY_MS > 0`. + /// `writer_id` is the `Arc` pointer cast to usize. `batch_id` is read + /// from the batch's first column (assumed to be an Int64Array of length 1). 
+ pub(crate) static COMPLETIONS: Mutex> = Mutex::new(Vec::new()); + + pub(crate) fn run_hook(state: &Arc, batch: &RecordBatch) { + let id = batch + .column(0) + .as_any() + .downcast_ref::() + .map(|a| a.value(0)) + .unwrap_or(-1); + let writer_id = Arc::as_ptr(state) as usize; + COMPLETIONS.lock().unwrap().push((writer_id, id)); + } + + pub(crate) fn reset() { + DELAY_MS.store(0, std::sync::atomic::Ordering::Relaxed); + COMPLETIONS.lock().unwrap().clear(); + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::atomic::Ordering; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + use arrow_array::{Int64Array, RecordBatch}; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + + use super::*; + + /// Serializes all dispatch tests so they don't trample the shared global pool + /// state (`DELAY_MS`, `COMPLETIONS`, `active_writers`). + static TEST_SERIAL: Mutex<()> = Mutex::new(()); + + /// A long-lived multi-threaded runtime that the global encode pool can pin its + /// `Handle` to across tests. `#[tokio::test]` builds a fresh runtime per test and + /// drops it at end, which would invalidate the workers' handles. + fn pinned_runtime() -> &'static tokio::runtime::Runtime { + static RT: std::sync::OnceLock = std::sync::OnceLock::new(); + RT.get_or_init(|| { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_all() + .build() + .unwrap() + }) + } + + fn batch_with_id(id: i64) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "id", + DataType::Int64, + false, + )])); + RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![id]))]).unwrap() + } + + /// Constructs a WriterState with no underlying Parquet writer. Safe because the + /// test hook bypasses the real `w.write(batch)` path. 
+ fn mock_writer_state() -> Arc { + Arc::new(WriterState { + writer: Mutex::new(None), + pending_queue: Mutex::new(std::collections::VecDeque::new()), + queue_len: std::sync::atomic::AtomicUsize::new(0), + busy: std::sync::atomic::AtomicBool::new(false), + registered: std::sync::atomic::AtomicBool::new(false), + pending: std::sync::atomic::AtomicUsize::new(0), + done_notify: tokio::sync::Notify::new(), + error: Mutex::new(None), + poisoned: std::sync::atomic::AtomicBool::new(false), + }) + } + + /// Push a batch onto a WriterState's queue, registering with the pool and waking it. + /// Mirrors `submit_batch` but takes a bare WriterState (so tests can use mock states + /// without going through IcebergDataFileWriter). + fn push(pool: &GlobalWorkerPool, state: &Arc, batch: RecordBatch) { + pool.register(state); + { + let mut q = state.pending_queue.lock().unwrap(); + q.push_back(batch); + state.queue_len.fetch_add(1, Ordering::AcqRel); + state.pending.fetch_add(1, Ordering::AcqRel); + } + pool.wake.notify_one(); + } + + fn wait_for_pending_zero(state: &WriterState, timeout: Duration) -> bool { + let start = std::time::Instant::now(); + while state.pending.load(Ordering::Acquire) > 0 { + if start.elapsed() > timeout { + return false; + } + std::thread::sleep(Duration::from_millis(5)); + } + true + } + + /// Initializes the global encode pool from inside the pinned runtime. Safe to call + /// many times — only the first call does any work. + fn ensure_pool() -> &'static GlobalWorkerPool { + let _g = pinned_runtime().enter(); + get_or_init_encode_pool() + } + + /// Detach any mock writers we registered with the pool so a subsequent test starts + /// from a clean active-writer list. Doesn't shut down the workers (they're shared). 
+ fn cleanup_writers(pool: &GlobalWorkerPool, states: &[Arc]) { + for s in states { + pool.unregister(s); + } + } + + /// Fairness: with 4 writers each holding 8 queued batches and N>=4 workers, the new + /// dispatch should drain all 4 writers in parallel rather than serializing one + /// writer at a time. + /// + /// We assert two properties: + /// 1. Per-writer FIFO: each writer's batches complete in submission order. + /// 2. Parallelism: within any group of 4 consecutive completions, all 4 writers + /// appear — i.e., a round-robin pattern emerges naturally because each writer + /// is being drained by its own worker, all sleeping for the same delay. + #[test] + fn fairness_drains_writers_in_parallel() { + let _serial = TEST_SERIAL.lock().unwrap(); + let pool = ensure_pool(); + + test_hooks::reset(); + test_hooks::DELAY_MS.store(20, Ordering::Relaxed); + + let writers: Vec> = (0..4).map(|_| mock_writer_state()).collect(); + let writer_ids: HashMap = writers + .iter() + .enumerate() + .map(|(i, s)| (Arc::as_ptr(s) as usize, i)) + .collect(); + + // Submit interleaved: round 0 of every writer, then round 1, etc. + for round in 0..8i64 { + for (i, w) in writers.iter().enumerate() { + let batch_id = (i as i64) * 100 + round; + push(pool, w, batch_with_id(batch_id)); + } + } + + for w in &writers { + assert!( + wait_for_pending_zero(w, Duration::from_secs(10)), + "writer did not drain in time" + ); + } + + let completions = test_hooks::COMPLETIONS.lock().unwrap().clone(); + // 4 writers × 8 batches = 32 completions. + assert_eq!(completions.len(), 32); + + // (1) FIFO per writer: filter completions by writer and check batch IDs ascend. 
+ for (i, w) in writers.iter().enumerate() { + let id = Arc::as_ptr(w) as usize; + let ids: Vec = completions + .iter() + .filter(|(wid, _)| *wid == id) + .map(|(_, bid)| *bid) + .collect(); + assert_eq!(ids.len(), 8, "writer {} missing batches", i); + for j in 0..8 { + assert_eq!( + ids[j], + (i as i64) * 100 + j as i64, + "writer {} batch {} out of order: {:?}", + i, + j, + ids + ); + } + } + + // (2) Parallelism: each group of 4 consecutive completions should contain 4 + // distinct writers. With <4 workers in the pool this would fail; on any modern + // dev machine `available_parallelism() >= 4`. + for chunk in completions.chunks(4) { + let distinct: std::collections::HashSet = + chunk.iter().map(|(wid, _)| writer_ids[wid]).collect(); + assert_eq!( + distinct.len(), + 4, + "expected 4 distinct writers per round, got {:?}", + chunk + ); + } + + cleanup_writers(pool, &writers); + test_hooks::reset(); + } + + /// Stranded-task race: hammer the pool with many submits across many writers and + /// verify that every submitted batch is eventually drained — i.e., `pending` always + /// converges to zero, no batch sits forever in a per-writer queue because of a + /// missed wake-up. + #[test] + fn no_stranded_tasks_under_load() { + let _serial = TEST_SERIAL.lock().unwrap(); + let pool = ensure_pool(); + + test_hooks::reset(); + // Tiny delay (1ms) so a) the test runs fast, b) producers and drains + // genuinely race rather than one always preceding the other. + test_hooks::DELAY_MS.store(1, Ordering::Relaxed); + + const WRITERS: usize = 8; + const BATCHES_PER_WRITER: usize = 200; + let writers: Vec> = (0..WRITERS).map(|_| mock_writer_state()).collect(); + + // Drive submissions from several threads to maximize interleaving. 
+ let mut handles = Vec::new(); + for tid in 0..4 { + let writers = writers.clone(); + let pool: &'static GlobalWorkerPool = pool; + handles.push(std::thread::spawn(move || { + for batch_idx in 0..(BATCHES_PER_WRITER / 4) { + for (wi, w) in writers.iter().enumerate() { + let id = (tid as i64) * 1_000_000 + (wi as i64) * 10_000 + batch_idx as i64; + push(pool, w, batch_with_id(id)); + } + } + })); + } + for h in handles { + h.join().unwrap(); + } + + // Wait for every writer's pending to drop to zero. If any single writer's queue + // is stranded, this would time out. + for (i, w) in writers.iter().enumerate() { + assert!( + wait_for_pending_zero(w, Duration::from_secs(30)), + "writer {} did not drain; pending={} queue_len={}", + i, + w.pending.load(Ordering::Acquire), + w.queue_len.load(Ordering::Acquire), + ); + assert_eq!(w.queue_len.load(Ordering::Acquire), 0); + } + + let total = test_hooks::COMPLETIONS.lock().unwrap().len(); + assert_eq!(total, WRITERS * BATCHES_PER_WRITER); + + cleanup_writers(pool, &writers); + test_hooks::reset(); + } +} diff --git a/iceberg_rust_ffi/src/writer_columns.rs b/iceberg_rust_ffi/src/writer_columns.rs index 4cb83eb..ac9e1a6 100644 --- a/iceberg_rust_ffi/src/writer_columns.rs +++ b/iceberg_rust_ffi/src/writer_columns.rs @@ -1,19 +1,9 @@ -/// Column-based writer support for iceberg_rust_ffi +/// Shared FFI types and column-type constants for the column-based write path. /// -/// This module provides FFI bindings for writing raw column data directly to Parquet, -/// avoiding the overhead of Arrow IPC serialization. Julia passes raw column pointers -/// and metadata, and Rust builds Arrow arrays directly from them. +/// The only consumer of `ColumnSlice` outside this module is `record_batch_builder.rs`, +/// which owns all Arrow array construction. The struct stays here because it is part of +/// the FFI ABI surface that Julia constructs. 
use std::ffi::c_void; -use std::sync::Arc; - -use arrow_array::{ - types::{ - Date32Type, Decimal128Type, Float32Type, Float64Type, Int32Type, Int64Type, - TimestampMicrosecondType, - }, - ArrayRef, BooleanArray, PrimitiveArray, StringArray, -}; -use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; /// Column type codes (must match Julia's ColumnType enum) pub const COLUMN_TYPE_INT32: i32 = 0; @@ -32,47 +22,33 @@ pub const COLUMN_TYPE_DECIMAL_INT32: i32 = 10; pub const COLUMN_TYPE_DECIMAL_INT64: i32 = 11; /// Decimal backed by Int128 (precision > 18): data is i128[] scaled integers pub const COLUMN_TYPE_DECIMAL_INT128: i32 = 12; - -/// Descriptor for a single column passed from Julia -#[repr(C)] -#[derive(Clone, Copy)] -pub struct ColumnDescriptor { - /// Pointer to the raw data (interpretation depends on column_type) - /// For strings: pointer to array of string pointers (Ptr{UInt8}[]) - pub data_ptr: *const c_void, - /// For string columns: pointer to lengths array (Int64[]) - /// For other types: unused (C_NULL) - pub lengths_ptr: *const i64, - /// Pointer to validity bitmap (only if is_nullable is true) - /// Points to bit-packed data from Julia's BitVector.chunks (UInt64 array) - /// Bit i is 1 if row i is valid, 0 if null - pub validity_ptr: *const u8, - /// Number of rows - pub num_rows: usize, - /// Column type (see COLUMN_TYPE_* constants) - pub column_type: i32, - /// Whether this column is nullable - pub is_nullable: bool, -} - -unsafe impl Send for ColumnDescriptor {} -unsafe impl Sync for ColumnDescriptor {} - -// ============================================================================= -// Scattered-gather writer: pass raw source pointers + selection indices to Rust, -// which gathers the data directly into Arrow arrays — eliminating the Julia-side -// staging copy for non-converting numeric column types. 
-// ============================================================================= - -/// A reference to one slice of source column data. -/// `sel_ptr = null` → sequential (identity) access: read data[0..len]. -/// `sel_ptr != null` → scattered access: read data[sel[i]-1] for i in 0..len (1-based Julia indices). -/// `validity_ptr = null` → all rows valid (non-nullable or known all-valid slice). -/// `lengths_ptr != null` → string column: data_ptr is Ptr{UInt8}[], lengths_ptr is Int64[] of byte lengths per string. -/// Fields are all 8 bytes — no padding, total 40 bytes. +/// Julia-epoch date: source data is i64[] of days since 0001-01-01; Rust subtracts 719163 and writes i32 Date32. +pub const COLUMN_TYPE_JULIA_DATE: i32 = 13; +/// Julia-epoch timestamp: source data is i64[] of ms since 0001-01-01; Rust converts to μs since Unix epoch. +pub const COLUMN_TYPE_JULIA_TIMESTAMP: i32 = 14; +/// Julia-epoch timestamp with UTC timezone: same conversion as JULIA_TIMESTAMP, UTC-tagged. +pub const COLUMN_TYPE_JULIA_TIMESTAMPTZ: i32 = 15; +/// Julia-epoch nanosecond timestamp: source data is i64[] of ms since 0001-01-01; Rust converts to ns since Unix epoch. +pub const COLUMN_TYPE_JULIA_TIMESTAMP_NS: i32 = 16; +/// Julia-epoch nanosecond timestamp with UTC timezone. +pub const COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS: i32 = 17; + +/// One column's contribution to a single `RowChunk` — a reference to source data the +/// builder will copy on `append`. All fields are 8 bytes; total struct size is 40 bytes +/// with no padding. +/// +/// - `sel_ptr = null` → sequential (identity) access: read `data[0..len]`. `data_ptr` +/// must be valid for `len` elements. +/// - `sel_ptr != null` → scattered access: read `data[sel[i] - 1]` for `i in 0..len` +/// (1-based Julia indices). `data_ptr` must be valid for `max(sel)` elements; the +/// source array is typically longer than `len`, so the gather path uses raw pointer +/// arithmetic rather than a `&[T]` of length `len`. 
+/// - `validity_ptr = null` → all rows in this slice are valid. +/// - `lengths_ptr != null` → string column: `data_ptr` is `*const *const u8`, +/// `lengths_ptr` is `*const i64` of byte lengths per string. #[repr(C)] #[derive(Clone, Copy)] -pub struct SliceRef { +pub struct ColumnSlice { pub data_ptr: *const c_void, pub lengths_ptr: *const i64, pub validity_ptr: *const u8, @@ -80,296 +56,5 @@ pub struct SliceRef { pub len: usize, } -unsafe impl Send for SliceRef {} -unsafe impl Sync for SliceRef {} - -/// Gathered column descriptor: gather `num_slices` SliceRefs into one Arrow column. -/// `total_rows` must equal the sum of all `slice.len` values. -/// Fields ordered largest-to-smallest; 3 bytes trailing padding → 32 bytes total. -#[repr(C)] -#[derive(Clone, Copy)] -pub struct GatheredColumnDescriptor { - pub slices: *const SliceRef, - pub num_slices: usize, - pub total_rows: usize, - pub column_type: i32, - pub is_nullable: bool, -} - -unsafe impl Send for GatheredColumnDescriptor {} -unsafe impl Sync for GatheredColumnDescriptor {} - -/// Merges the per-slice validity bitmaps from all slices into a single output bitmap. -/// -/// Each slice contributes `slice.len` output rows. Slices with a null `validity_ptr` are -/// all-valid. Slices with a bitmap may be misaligned relative to the output (each slice -/// starts at a different `out` offset), so bits are copied one at a time with a shift. -/// The selection vector (`sel_ptr`) governs which *source data* elements to read; the -/// validity bitmap is always indexed by output row position, so sequential and scattered -/// slices are treated identically here. -/// -/// Returns `None` if every slice is all-valid (no null buffer needed). 
-unsafe fn build_null_buffer_gathered(slices: &[SliceRef], total_rows: usize) -> Option { - if !slices.iter().any(|s| !s.validity_ptr.is_null()) { - return None; - } - let mut bits = vec![0u8; total_rows.div_ceil(8)]; - let mut out = 0usize; - for slice in slices { - if slice.validity_ptr.is_null() { - // All rows in this slice are valid — set one bit per output row. - for i in 0..slice.len { - bits[(out + i) / 8] |= 1u8 << ((out + i) % 8); - } - } else { - // Copy validity bits from the slice's bitmap into the output bitmap, - // re-aligning from source bit position i to output bit position (out + i). - for i in 0..slice.len { - let b = (*slice.validity_ptr.add(i / 8) >> (i % 8)) & 1; - bits[(out + i) / 8] |= b << ((out + i) % 8); - } - } - out += slice.len; - } - Some(NullBuffer::new(BooleanBuffer::new( - Buffer::from(bits), - 0, - total_rows, - ))) -} - -/// Gather all slices for a column into an Arrow array. -pub(crate) unsafe fn build_arrow_array_gathered( - desc: &GatheredColumnDescriptor, - schema_field: &arrow_schema::Field, -) -> Result { - let slices = std::slice::from_raw_parts(desc.slices, desc.num_slices); - let total = desc.total_rows; - let null_buf = if desc.is_nullable { - build_null_buffer_gathered(slices, total) - } else { - None - }; - - // Macro gathers a primitive numeric type from all slices. - // sel_ptr=null → sequential copy; sel_ptr!=null → indirect gather (1-based indices). - macro_rules! 
gather_primitive { - ($T:ty, $ArrowType:ty) => {{ - let mut values = Vec::<$T>::with_capacity(total); - for slice in slices { - let src = slice.data_ptr as *const $T; - if slice.sel_ptr.is_null() { - values.extend_from_slice(std::slice::from_raw_parts(src, slice.len)); - } else { - for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { - values.push(*src.add((idx - 1) as usize)); - } - } - } - Arc::new(PrimitiveArray::<$ArrowType>::new( - ScalarBuffer::from(values), - null_buf, - )) as ArrayRef - }}; - } - - let array: ArrayRef = match desc.column_type { - COLUMN_TYPE_INT32 => gather_primitive!(i32, Int32Type), - COLUMN_TYPE_INT64 => gather_primitive!(i64, Int64Type), - COLUMN_TYPE_FLOAT32 => gather_primitive!(f32, Float32Type), - COLUMN_TYPE_FLOAT64 => gather_primitive!(f64, Float64Type), - COLUMN_TYPE_DATE => gather_primitive!(i32, Date32Type), - COLUMN_TYPE_TIMESTAMP => { - let mut values = Vec::::with_capacity(total); - for slice in slices { - let src = slice.data_ptr as *const i64; - if slice.sel_ptr.is_null() { - values.extend_from_slice(std::slice::from_raw_parts(src, slice.len)); - } else { - for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { - values.push(*src.add((idx - 1) as usize)); - } - } - } - Arc::new(PrimitiveArray::::new( - ScalarBuffer::from(values), - null_buf, - )) - } - COLUMN_TYPE_TIMESTAMPTZ => { - let mut values = Vec::::with_capacity(total); - for slice in slices { - let src = slice.data_ptr as *const i64; - if slice.sel_ptr.is_null() { - values.extend_from_slice(std::slice::from_raw_parts(src, slice.len)); - } else { - for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { - values.push(*src.add((idx - 1) as usize)); - } - } - } - Arc::new( - PrimitiveArray::::new( - ScalarBuffer::from(values), - null_buf, - ) - .with_timezone("UTC"), - ) - } - COLUMN_TYPE_BOOLEAN => { - let mut bits = vec![0u8; total.div_ceil(8)]; - let mut out = 0usize; - for slice in slices { - let src = slice.data_ptr as *const 
u8; - if slice.sel_ptr.is_null() { - let data = std::slice::from_raw_parts(src, slice.len); - for (i, &v) in data.iter().enumerate() { - if v != 0 { - bits[(out + i) / 8] |= 1 << ((out + i) % 8); - } - } - } else { - for (i, &idx) in std::slice::from_raw_parts(slice.sel_ptr, slice.len) - .iter() - .enumerate() - { - if *src.add((idx - 1) as usize) != 0 { - bits[(out + i) / 8] |= 1 << ((out + i) % 8); - } - } - } - out += slice.len; - } - Arc::new(BooleanArray::new( - BooleanBuffer::new(Buffer::from(bits), 0, total), - null_buf, - )) - } - COLUMN_TYPE_STRING => { - // String columns do not support selection vectors. Julia strings are - // heap-allocated with non-contiguous addresses, so the caller must build - // str_ptrs/str_lens arrays up-front — any row selection is already applied - // before add_string_slice! is called. sel_ptr is therefore always null here. - // data_ptr = *const *const u8, lengths_ptr = *const i64. - // - // Build the Arrow StringArray directly: one pass copies string bytes into a - // contiguous values buffer and tracks cumulative offsets. This avoids the - // intermediate Vec> and skips UTF-8 validation — Julia strings - // are guaranteed valid UTF-8. 
- let null_buf = if desc.is_nullable { - build_null_buffer_gathered(slices, total) - } else { - None - }; - let mut offsets = Vec::::with_capacity(total + 1); - offsets.push(0i32); - let mut values = Vec::::new(); - for slice in slices { - if slice.lengths_ptr.is_null() { - return Err(anyhow::anyhow!("String column requires lengths_ptr")); - } - let ptrs = - std::slice::from_raw_parts(slice.data_ptr as *const *const u8, slice.len); - let lens = std::slice::from_raw_parts(slice.lengths_ptr, slice.len); - for i in 0..slice.len { - let is_null = !slice.validity_ptr.is_null() - && ((*slice.validity_ptr.add(i / 8) >> (i % 8)) & 1) == 0; - if !is_null { - values.extend_from_slice(std::slice::from_raw_parts( - ptrs[i], - lens[i] as usize, - )); - } - offsets.push(values.len() as i32); - } - } - // SAFETY: offsets are monotonically non-decreasing by construction; values - // bytes come from Julia String objects (valid UTF-8) kept alive in col.preserve. - Arc::new(unsafe { - StringArray::new_unchecked( - OffsetBuffer::new(ScalarBuffer::from(offsets)), - Buffer::from_vec(values), - null_buf, - ) - }) - } - COLUMN_TYPE_UUID => { - let mut data: Vec = Vec::with_capacity(total * 16); - for slice in slices { - let src = slice.data_ptr as *const u8; - if slice.sel_ptr.is_null() { - data.extend_from_slice(std::slice::from_raw_parts(src, slice.len * 16)); - } else { - for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { - data.extend_from_slice(std::slice::from_raw_parts( - src.add((idx - 1) as usize * 16), - 16, - )); - } - } - } - let chunks: Vec<&[u8]> = data.chunks(16).collect(); - Arc::new( - arrow_array::FixedSizeBinaryArray::try_from_iter(chunks.into_iter()) - .map_err(|e| anyhow::anyhow!("Failed to build UUID array: {}", e))?, - ) - } - COLUMN_TYPE_DECIMAL_INT32 | COLUMN_TYPE_DECIMAL_INT64 | COLUMN_TYPE_DECIMAL_INT128 => { - let (precision, scale) = match schema_field.data_type() { - arrow_schema::DataType::Decimal128(p, s) => (*p, *s), - dt => return 
Err(anyhow::anyhow!("Expected Decimal128, got {:?}", dt)), - }; - let mut values = Vec::::with_capacity(total); - for slice in slices { - match desc.column_type { - COLUMN_TYPE_DECIMAL_INT32 => { - let src = slice.data_ptr as *const i32; - if slice.sel_ptr.is_null() { - values.extend( - std::slice::from_raw_parts(src, slice.len) - .iter() - .map(|&v| v as i128), - ); - } else { - for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { - values.push(*src.add((idx - 1) as usize) as i128); - } - } - } - COLUMN_TYPE_DECIMAL_INT64 => { - let src = slice.data_ptr as *const i64; - if slice.sel_ptr.is_null() { - values.extend( - std::slice::from_raw_parts(src, slice.len) - .iter() - .map(|&v| v as i128), - ); - } else { - for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { - values.push(*src.add((idx - 1) as usize) as i128); - } - } - } - _ => { - // DECIMAL_INT128: i128 layout matches Julia Int128 - let src = slice.data_ptr as *const i128; - if slice.sel_ptr.is_null() { - values.extend_from_slice(std::slice::from_raw_parts(src, slice.len)); - } else { - for &idx in std::slice::from_raw_parts(slice.sel_ptr, slice.len) { - values.push(*src.add((idx - 1) as usize)); - } - } - } - } - } - Arc::new( - PrimitiveArray::::new(ScalarBuffer::from(values), null_buf) - .with_precision_and_scale(precision, scale) - .map_err(|e| anyhow::anyhow!("Decimal precision/scale: {}", e))?, - ) - } - _ => return Err(anyhow::anyhow!("Unknown column type: {}", desc.column_type)), - }; - Ok(array) -} +unsafe impl Send for ColumnSlice {} +unsafe impl Sync for ColumnSlice {} diff --git a/src/RustyIceberg.jl b/src/RustyIceberg.jl index c168678..b68f935 100644 --- a/src/RustyIceberg.jl +++ b/src/RustyIceberg.jl @@ -47,14 +47,14 @@ export IcebergTimestampNs, IcebergTimestamptzNs export IcebergString, IcebergUuid, IcebergBinary, IcebergDecimal export Transaction, DataFiles, free_transaction!, free_data_files!, commit, transaction export FastAppendAction, 
free_fast_append_action!, add_data_files, apply, with_fast_append -export DataFileWriter, free_writer!, close_writer, write_columns, set_encode_workers! -export WriterConfig, CompressionCodec, UNCOMPRESSED, SNAPPY, GZIP, LZ4, ZSTD -export ColumnDescriptor, ColumnBatch, ColumnType +export DataFileWriter, free_writer!, close_writer, set_encode_workers! +export WriterConfig, CompressionCodec, UNCOMPRESSED, SNAPPY, GZIP, LZ4, ZSTD, LZ4_RAW +export RowChunk, flush! +export ColumnType export COLUMN_TYPE_INT32, COLUMN_TYPE_INT64, COLUMN_TYPE_FLOAT32, COLUMN_TYPE_FLOAT64 export COLUMN_TYPE_STRING, COLUMN_TYPE_DATE, COLUMN_TYPE_TIMESTAMP, COLUMN_TYPE_TIMESTAMPTZ, COLUMN_TYPE_BOOLEAN, COLUMN_TYPE_UUID export COLUMN_TYPE_DECIMAL_INT32, COLUMN_TYPE_DECIMAL_INT64, COLUMN_TYPE_DECIMAL_INT128 export julia_type_to_column_type -export GatheredColumn, GatheredBatch, add_slice!, add_string_slice! # Always use the JLL library - override via Preferences if needed for local development # To use a local build, set the preference: diff --git a/src/writer.jl b/src/writer.jl index 3e74f01..9b63326 100644 --- a/src/writer.jl +++ b/src/writer.jl @@ -18,8 +18,11 @@ Compression codec for Parquet files. - `UNCOMPRESSED`: No compression - `SNAPPY`: Snappy compression (fast, moderate compression) - `GZIP`: Gzip compression (slower, better compression) -- `LZ4`: LZ4 compression (very fast, lower compression) +- `LZ4`: LZ4 compression, legacy Hadoop-framed variant (deprecated in the parquet spec; kept for + backward compatibility — prefer `LZ4_RAW`) - `ZSTD`: Zstandard compression (good balance of speed and compression) +- `LZ4_RAW`: LZ4 compression, raw blocks with no framing overhead (modern parquet variant; faster + than `LZ4`) """ @enum CompressionCodec begin UNCOMPRESSED = 0 @@ -27,6 +30,7 @@ Compression codec for Parquet files. 
GZIP = 2 LZ4 = 3 ZSTD = 4 + LZ4_RAW = 5 end """ @@ -506,49 +510,15 @@ end # ========================================================================================== # Column-based writing (zero-copy from Julia) +# +# A user produces one `RowChunk` per upstream slice (a horizontal stripe of rows across all +# output columns), then calls `append!(writer, chunk)`. The writer copies the data eagerly +# into Rust-owned per-column buffers; the source Julia arrays may be released the moment +# `append!` returns. When the accumulated row count reaches the coalesce window, the writer +# finalizes a `RecordBatch` and ships it to the async encode pool automatically. Callers +# that need flush control on logical boundaries can call `flush!(writer)`. # ========================================================================================== -""" - SliceRef - -FFI reference to a single slice of source column data for the scattered-gather writer. - -- `data_ptr`: pointer to source data array (T[]) or string pointers (Ptr{UInt8}[]) -- `lengths_ptr`: for string columns, pointer to lengths array; null for other types -- `validity_ptr`: pointer to validity bitmap (BitVector.chunks); null if all rows valid -- `sel_ptr`: pointer to selection index array (1-based Julia indices); null for sequential access -- `len`: number of rows in this slice - -All fields are 8 bytes — total struct size is 40 bytes with no padding. -""" -struct SliceRef - data_ptr::Ptr{Cvoid} - lengths_ptr::Ptr{Int64} - validity_ptr::Ptr{UInt8} - sel_ptr::Ptr{Int64} - len::Csize_t -end - -""" - GatheredColumnDescriptor - -FFI descriptor for a column to be gathered from multiple SliceRefs. -Pass an array of these to `write_columns`. 
- -- `slices_ptr`: pointer to array of SliceRef structs -- `num_slices`: number of SliceRef entries -- `total_rows`: sum of all slice lengths -- `column_type`: ColumnType enum value -- `is_nullable`: whether the column may contain null values -""" -struct GatheredColumnDescriptor - slices_ptr::Ptr{SliceRef} - num_slices::Csize_t - total_rows::Csize_t - column_type::Int32 - is_nullable::Bool -end - """ ColumnType @@ -568,34 +538,13 @@ Enum for column data types, matching the Rust FFI constants. COLUMN_TYPE_DECIMAL_INT32 = 10 # Decimal backed by Int32 (precision ≤ 9) COLUMN_TYPE_DECIMAL_INT64 = 11 # Decimal backed by Int64 (precision ≤ 18) COLUMN_TYPE_DECIMAL_INT128 = 12 # Decimal backed by Int128 (precision > 18) -end - -""" - ColumnDescriptor - -FFI structure describing a single column for direct column writing. -This struct must match the Rust `ColumnDescriptor` layout exactly. - -# Fields -- `data_ptr::Ptr{Cvoid}`: Pointer to the raw column data. For strings, this is a - pointer to an array of string pointers (Ptr{UInt8}[]). -- `lengths_ptr::Ptr{Int64}`: For string columns, pointer to lengths array (Int64[]). - For other types, this is C_NULL. -- `validity_ptr::Ptr{UInt8}`: Pointer to validity bitmap (BitVector.chunks, bit-packed) -- `num_rows::Csize_t`: Number of rows in the column -- `column_type::Int32`: Type of the column (see `ColumnType` enum) -- `is_nullable::Bool`: Whether this column can contain null values - -Note: Fields are ordered to avoid padding (8-byte fields first, then 4-byte, then 1-byte). 
-""" -struct ColumnDescriptor - data_ptr::Ptr{Cvoid} # 8 bytes, offset 0 - lengths_ptr::Ptr{Int64} # 8 bytes, offset 8 - validity_ptr::Ptr{UInt8} # 8 bytes, offset 16 - num_rows::Csize_t # 8 bytes, offset 24 - column_type::Int32 # 4 bytes, offset 32 - is_nullable::Bool # 1 byte, offset 36 - # (3 bytes trailing padding added by compiler, total 40 bytes) + # Julia-epoch variants: source data uses Julia's internal epoch (0001-01-01); + # Rust applies the offset to produce Iceberg's Unix-epoch representation. + COLUMN_TYPE_JULIA_DATE = 13 # i64[] days since year 1 → i32 days since 1970-01-01 + COLUMN_TYPE_JULIA_TIMESTAMP = 14 # i64[] ms since year 1 → i64 μs since 1970-01-01 + COLUMN_TYPE_JULIA_TIMESTAMPTZ = 15 # same + UTC timezone + COLUMN_TYPE_JULIA_TIMESTAMP_NS = 16 # i64[] ms since year 1 → i64 ns since 1970-01-01 + COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS = 17 # same + UTC timezone end """ @@ -633,9 +582,11 @@ iceberg_column_type(::IcebergLong) = COLUMN_TYPE_INT64 iceberg_column_type(::IcebergFloat) = COLUMN_TYPE_FLOAT32 iceberg_column_type(::IcebergDouble) = COLUMN_TYPE_FLOAT64 iceberg_column_type(::IcebergString) = COLUMN_TYPE_STRING -iceberg_column_type(::IcebergDate) = COLUMN_TYPE_DATE -iceberg_column_type(::IcebergTimestamp) = COLUMN_TYPE_TIMESTAMP -iceberg_column_type(::IcebergTimestamptz) = COLUMN_TYPE_TIMESTAMPTZ +iceberg_column_type(::IcebergDate) = COLUMN_TYPE_JULIA_DATE +iceberg_column_type(::IcebergTimestamp) = COLUMN_TYPE_JULIA_TIMESTAMP +iceberg_column_type(::IcebergTimestamptz) = COLUMN_TYPE_JULIA_TIMESTAMPTZ +iceberg_column_type(::IcebergTimestampNs) = COLUMN_TYPE_JULIA_TIMESTAMP_NS +iceberg_column_type(::IcebergTimestamptzNs) = COLUMN_TYPE_JULIA_TIMESTAMPTZ_NS iceberg_column_type(::IcebergBoolean) = COLUMN_TYPE_BOOLEAN iceberg_column_type(::IcebergUuid) = COLUMN_TYPE_UUID function iceberg_column_type(d::IcebergDecimal) @@ -649,551 +600,316 @@ function iceberg_column_type(d::IcebergDecimal) end """ - ColumnBatch - -A builder for collecting column 
descriptors and their underlying arrays. -Automatically tracks arrays that need to be preserved during FFI calls. - -# Example -```julia -batch = ColumnBatch() -push!(batch, ids) # non-nullable column -push!(batch, values; validity=validity_vec) # nullable column -write_columns(writer, batch) -``` -""" -mutable struct ColumnBatch - descriptors::Vector{ColumnDescriptor} - arrays_to_preserve::Vector{Any} - - ColumnBatch() = new(ColumnDescriptor[], Any[]) -end - -""" - push!(batch::ColumnBatch, data::Vector{String}; validity=nothing, length=nothing, column_type=nothing) - -Add a string column to the batch. Strings are passed as an array of pointers with lengths. -Note: While this avoids copying on the Julia side, Arrow still copies the string data -into its internal buffer on the Rust side. - -# Arguments -- `data`: The string column data array -- `validity`: Optional validity mask (BitVector where false=null, true=valid) -- `length`: Optional number of rows to use from the array. If not specified, - uses the full array length. -- `column_type`: Optional explicit column type (defaults to COLUMN_TYPE_STRING) -""" -function Base.push!( - batch::ColumnBatch, - data::Vector{String}; - validity::Union{Nothing, BitVector}=nothing, - length::Union{Nothing, Int}=nothing, - column_type::Union{Nothing, ColumnType}=nothing -) - num_rows = length === nothing ? Base.length(data) : length - is_nullable = validity !== nothing - col_type = column_type === nothing ? 
COLUMN_TYPE_STRING : column_type - - # Build arrays of string pointers and lengths (no copy on Julia side) - # Each String in Julia is a pointer to contiguous UTF-8 bytes - # For null values, we use null pointer and zero length - Rust will check validity mask - str_ptrs = Vector{Ptr{UInt8}}(undef, num_rows) - str_lens = Vector{Int64}(undef, num_rows) - for i in 1:num_rows - if is_nullable && !validity[i] - # Null value - use null pointer and zero length - str_ptrs[i] = Ptr{UInt8}(C_NULL) - str_lens[i] = 0 - else - str_ptrs[i] = pointer(data[i]) - str_lens[i] = sizeof(data[i]) - end - end - - # Preserve all arrays (original strings + metadata arrays) - push!(batch.arrays_to_preserve, data) - push!(batch.arrays_to_preserve, str_ptrs) - push!(batch.arrays_to_preserve, str_lens) - - validity_ptr = if is_nullable - push!(batch.arrays_to_preserve, validity) - Ptr{UInt8}(pointer(validity.chunks)) - else - Ptr{UInt8}(C_NULL) - end + ColumnSlice - # For strings: data_ptr = pointer to string pointers, offsets_ptr = pointer to lengths - desc = ColumnDescriptor( - Ptr{Cvoid}(pointer(str_ptrs)), - pointer(str_lens), # Reuse offsets_ptr for lengths array - validity_ptr, - Csize_t(num_rows), - Int32(col_type), - is_nullable - ) - push!(batch.descriptors, desc) - return batch -end +FFI struct describing one column's contribution to a single `RowChunk`. Internal — +users build `RowChunk`s via `push!` instead of constructing `ColumnSlice` directly. +All fields are 8 bytes; total struct size is 40 bytes with no padding (matches Rust's +`ColumnSlice` layout). """ - push!(batch::ColumnBatch, data::Vector{String}, str_ptrs::Vector{Ptr{UInt8}}, str_lens::Vector{Int64}; validity=nothing, length=nothing, column_type=nothing) - -Add a string column to the batch using pre-allocated pointer/length buffers. -The caller is responsible for filling `str_ptrs` and `str_lens` before calling this. -Avoids allocating new pointer/length arrays on every write. 
-""" -function Base.push!( - batch::ColumnBatch, - data::Vector{String}, - str_ptrs::Vector{Ptr{UInt8}}, - str_lens::Vector{Int64}; - validity::Union{Nothing, BitVector}=nothing, - length::Union{Nothing, Int}=nothing, - column_type::Union{Nothing, ColumnType}=nothing, -) - num_rows = length === nothing ? Base.length(str_ptrs) : length - is_nullable = validity !== nothing - col_type = column_type === nothing ? COLUMN_TYPE_STRING : column_type - - push!(batch.arrays_to_preserve, data, str_ptrs, str_lens) - - validity_ptr = if is_nullable - push!(batch.arrays_to_preserve, validity) - Ptr{UInt8}(pointer(validity.chunks)) - else - Ptr{UInt8}(C_NULL) - end - - desc = ColumnDescriptor( - Ptr{Cvoid}(pointer(str_ptrs)), - pointer(str_lens), - validity_ptr, - Csize_t(num_rows), - Int32(col_type), - is_nullable - ) - push!(batch.descriptors, desc) - return batch -end - -""" - push!(batch::ColumnBatch, data::Vector{T}; validity=nothing, length=nothing, column_type=nothing) where T - -Add a column to the batch. The column type is inferred from the element type unless -explicitly specified. - -# Arguments -- `data`: The column data array -- `validity`: Optional validity mask (BitVector where false=null, true=valid) -- `length`: Optional number of rows to use from the array. If not specified, - uses the full array length. This allows writing only a prefix of the array. -- `column_type`: Optional explicit column type (ColumnType enum). If not specified, - inferred from the element type T. Use this when the physical storage type differs - from the logical type (e.g., Int32 data that represents Date32). -""" -function Base.push!( - batch::ColumnBatch, - data::Vector{T}; - validity::Union{Nothing, BitVector}=nothing, - length::Union{Nothing, Int}=nothing, - column_type::Union{Nothing, ColumnType}=nothing -) where T - push!(batch.arrays_to_preserve, data) - - col_type = column_type === nothing ? julia_type_to_column_type(T) : column_type - num_rows = length === nothing ? 
Base.length(data) : length - is_nullable = validity !== nothing - - validity_ptr = if is_nullable - # BitVector stores bits in UInt64 chunks - pass pointer to chunks directly - push!(batch.arrays_to_preserve, validity) - Ptr{UInt8}(pointer(validity.chunks)) - else - Ptr{UInt8}(C_NULL) - end - - desc = ColumnDescriptor( - Ptr{Cvoid}(pointer(data)), - Ptr{Int64}(C_NULL), # lengths_ptr not used for non-string types - validity_ptr, - Csize_t(num_rows), - Int32(col_type), - is_nullable - ) - push!(batch.descriptors, desc) - return batch +struct ColumnSlice + data_ptr::Ptr{Cvoid} + lengths_ptr::Ptr{Int64} + validity_ptr::Ptr{UInt8} + sel_ptr::Ptr{Int64} + len::Csize_t end """ - write_columns(writer::DataFileWriter, columns::Vector{ColumnDescriptor}, arrays_to_preserve) - -Write raw column data directly to the Parquet writer, bypassing Arrow IPC serialization. + RowChunk -This is a low-level function that passes raw column pointers to Rust, which builds -Arrow arrays directly from them. This avoids one serialization step compared to -the standard `write` function. +A horizontal stripe of rows across all output columns — what a streaming producer hands +to the writer at each step. Build one by pushing column data in schema order, then call +`append!(writer, chunk)`. -# Arguments -- `writer::DataFileWriter`: The writer to write to -- `columns::Vector{ColumnDescriptor}`: Array of column descriptors -- `arrays_to_preserve`: A tuple/collection of arrays whose memory is referenced by the - ColumnDescriptors. These will be GC-preserved during the FFI call. - -# Safety -The ColumnDescriptors contain raw pointers that must point to valid data. -Pass all source arrays in `arrays_to_preserve` to ensure they are not garbage -collected during the FFI call. - -# Throws -- `IcebergException` if the write fails - -# Example ```julia -data = Int64[1, 2, 3] -validity = UInt8[1, 1, 1] -desc = ColumnDescriptor(pointer(data), ...) 
-write_columns(writer, [desc], (data, validity)) # Arrays preserved during call +chunk = RowChunk() +push!(chunk, ids) # non-nullable numeric, sequential +push!(chunk, values; validity=src_valid_bv) # nullable numeric, sequential +push!(chunk, source_scores; + validity=src_valid_bv, sel=sel_indices) # nullable scattered: validity is + # aligned to `source_scores`; the + # library gathers through `sel`. +push!(chunk, tags; validity=src_valid_bv) # nullable strings +append!(writer, chunk) ``` -""" -function write_columns(writer::DataFileWriter, columns::Vector{ColumnDescriptor}, arrays_to_preserve) - writer.ptr == C_NULL && throw(IcebergException( - STATE_RESOURCE_FREED, - "Resource has been freed", - "Writer has been freed", - )) - isempty(columns) && throw(IcebergException( - DATA_SCHEMA_MISMATCH, - "Column not found in table schema", - "No columns provided", - )) - - ret = GC.@preserve columns arrays_to_preserve begin - @ccall rust_lib.iceberg_writer_write_columns( - writer.ptr::Ptr{Cvoid}, - pointer(columns)::Ptr{ColumnDescriptor}, - length(columns)::Csize_t, - )::Int32 - end - - ret == 0 || throw(IcebergException( - DATA_SCHEMA_MISMATCH, - "Column not found in table schema", - "write_columns failed (see writer close for details)", - )) - return nothing -end -""" - write_columns(writer::DataFileWriter, batch::ColumnBatch) - -Write columns from a ColumnBatch to the Parquet writer. +For streaming pipelines that push many chunks, reuse the same `RowChunk` and call +`empty!(chunk)` at the top of each iteration. `empty!` clears the working vectors but +retains the chunk's internal pool of `str_ptrs` / `str_lens` buffers, so string columns +amortize their per-chunk gather-buffer allocation to zero. -This is the recommended way to use write_columns - the ColumnBatch automatically -tracks all arrays that need to be preserved during the FFI call. 
- -# Arguments -- `writer::DataFileWriter`: The writer to write to -- `batch::ColumnBatch`: The column batch to write - -# Example ```julia -batch = ColumnBatch() -push!(batch, ids) -push!(batch, values; validity=validity_vec) -write_columns(writer, batch) - -# To write only first 100 rows, use the length parameter on push!: -batch = ColumnBatch() -push!(batch, ids; length=100) -push!(batch, values; validity=validity_vec, length=100) -write_columns(writer, batch) -``` -""" -function write_columns(writer::DataFileWriter, batch::ColumnBatch) - write_columns(writer, batch.descriptors, batch.arrays_to_preserve) +chunk = RowChunk() +for slice in upstream + empty!(chunk) + push!(chunk, slice.ids) + push!(chunk, slice.tags; validity=slice.v) + append!(writer, chunk) end +``` -# ========================================================================================== -# High-level gathered-column API -# ========================================================================================== +Column order must be stable across iterations (it has to match the writer's schema +anyway). The internal string pool is keyed by column position. +""" +mutable struct RowChunk + slices::Vector{ColumnSlice} # FFI-ready, one entry per push! + preserve::Vector{Any} # GC roots for source data / validity / sel + # Per-column-position string scratch. Entry `i` holds the `str_ptrs` / `str_lens` / + # `str_validity` buffers for the i-th pushed column when it was a string column. + # `str_validity` is the output-aligned validity bitmap Rust receives when both `sel` + # and `validity` are present on a string push — see that `push!` overload for why. + # Non-string positions have unused (length-0) entries. Retained across `empty!` so + # streaming reuse is free. 
+ str_ptrs::Vector{Vector{Ptr{UInt8}}} + str_lens::Vector{Vector{Int64}} + str_validity::Vector{BitVector} +end + +RowChunk() = RowChunk( + ColumnSlice[], Any[], Vector{Ptr{UInt8}}[], Vector{Int64}[], BitVector[], +) """ - GatheredColumn - -Accumulates one or more source slices for a single column. Rust gathers the data -directly from source buffers when the batch is written, avoiding a Julia-side staging -copy for numeric columns. + empty!(chunk::RowChunk) -Typical usage: +Reset the chunk's working vectors so the next pass can refill them. The internal string +ptr/len pool is *not* freed — it stays sized at the previous high-water mark, so a +streaming loop that calls `empty!` then `push!` repeatedly pays zero per-iteration +allocation for the string-gather buffers. -```julia -col = GatheredColumn(COLUMN_TYPE_INT64) -add_slice!(col, src_array) # sequential: all rows -add_slice!(col, src_array2; sel=sel_indices) # scattered: rows at sel_indices -add_slice!(col, src_array3; validity=valid_bv) # nullable slice - -str_col = GatheredColumn(COLUMN_TYPE_STRING; nullable=true) -add_string_slice!(str_col, ["a", "", "c"]; validity=BitVector([true, false, true])) -``` - -For string columns use `add_string_slice!` instead of `add_slice!`. Selection vectors -are not supported for strings: Julia strings are non-contiguous, so the caller must -build `str_ptrs`/`str_lens` arrays up-front — any row selection is applied on the Julia -side before calling `add_string_slice!`. +Drop the chunk and create a new one if you want to actually reclaim the pool memory. 
""" -mutable struct GatheredColumn - slices::Vector{SliceRef} - total_rows::Int - column_type::ColumnType - is_nullable::Bool - preserve::Vector{Any} # source arrays kept alive until write +function Base.empty!(chunk::RowChunk) + empty!(chunk.slices) + empty!(chunk.preserve) + return chunk end -GatheredColumn(column_type::ColumnType; nullable::Bool=false) = - GatheredColumn(SliceRef[], 0, column_type, nullable, Any[]) - """ - add_slice!(col::GatheredColumn, data::AbstractVector{T}; - sel=nothing, validity=nothing) + push!(chunk::RowChunk, data::AbstractVector{T}; + validity=nothing, sel=nothing) -Append a slice of `data` to `col`. +Add a non-string column slice to the chunk. Builds the FFI-ready `ColumnSlice` inline — +just `pointer(data)` and pointer arithmetic, no allocation beyond the chunk's own +`slices` / `preserve` growth. -- `sel`: optional `Vector{Int64}` of 1-based row indices into `data` to select. - If omitted, all rows of `data` are used sequentially. -- `validity`: optional `BitVector` (length = number of selected rows, `true` = valid). - Providing this marks the column as nullable. +- `validity`: optional `BitVector` *aligned to `data`* — `length(validity) >= length(data)`, + bit `i` describes whether `data[i]` is valid (`true` = valid, `false` = null). When + `sel` is also provided, Rust gathers validity through `sel` alongside the value gather: + bit `sel[i] - 1` of the source bitmap becomes bit `i` of the output bitmap. When `sel` + is omitted, source and output positions coincide. +- `sel`: optional contiguous `AbstractVector{Int64}` of 1-based indices into `data` for + scattered access. Output row count is `length(sel)`. If omitted, all rows of `data` + are used sequentially. A `view(sel_buf, 1:n)` is accepted — useful when the caller + has a sel buffer with stale capacity beyond the live region. 
""" -function add_slice!( - col::GatheredColumn, +function Base.push!( + chunk::RowChunk, data::AbstractVector{T}; - sel::Union{Nothing, Vector{Int64}} = nothing, validity::Union{Nothing, BitVector} = nothing, + sel::Union{Nothing, AbstractVector{Int64}} = nothing, ) where T len = sel === nothing ? length(data) : length(sel) sel_ptr = if sel !== nothing - push!(col.preserve, sel) + push!(chunk.preserve, sel) pointer(sel) else Ptr{Int64}(C_NULL) end validity_ptr = if validity !== nothing - col.is_nullable = true - push!(col.preserve, validity) + push!(chunk.preserve, validity) Ptr{UInt8}(pointer(validity.chunks)) else Ptr{UInt8}(C_NULL) end - push!(col.preserve, data) - push!(col.slices, SliceRef( + push!(chunk.preserve, data) + push!(chunk.slices, ColumnSlice( Ptr{Cvoid}(pointer(data)), - Ptr{Int64}(C_NULL), # lengths_ptr unused for non-string types + Ptr{Int64}(C_NULL), validity_ptr, sel_ptr, Csize_t(len), )) - col.total_rows += len - return col + return chunk end """ - add_string_slice!(col::GatheredColumn, strings::Vector{String}; validity=nothing) + push!( + chunk::RowChunk, strings::AbstractVector{<:AbstractString}; + validity=nothing, sel=nothing + ) -Append a string slice to `col` from a plain `Vector{String}`. +Add a string column slice to the chunk. Accepts any `AbstractString` element type +(`String`, `SubString{String}`, Arrow's `VariableSizeString`, …) — `pointer(s)` and +`ncodeunits(s)` are used directly, no materialization through `Vector{String}`. -- `validity`: optional `BitVector` (`true` = valid, `false` = null). Marking a row null - does not require a placeholder in `strings`, but the vector must still be the same length. +The ptr/len gather buffers live in the chunk's per-position pool and are reused across +`empty!`/refill cycles, so a streaming loop pays no allocation for them after the first +chunk. 
-```julia -col = GatheredColumn(COLUMN_TYPE_STRING; nullable=true) -add_string_slice!(col, ["hello", "", "world"]; validity=BitVector([true, false, true])) -``` +- `validity`: optional `BitVector` *aligned to `strings`* — `length(validity) >= length(strings)`, + bit `i` describes whether `strings[i]` is valid (`true` = valid, `false` = null). +- `sel`: optional contiguous `AbstractVector{Int64}` of 1-based indices into `strings` + for scattered access. Output row count is `length(sel)`. If omitted, all rows of + `strings` are used sequentially. -For performance-critical paths where pointer/length arrays are pre-allocated, use the -lower-level `add_string_slice!(col, str_ptrs, str_lens; validity)` overload directly. +Unlike the numeric `push!`, the value gather is performed here on the Julia side +(`pointer(strings[sel[i]])` per row) because Rust can't walk a Julia `AbstractString` +vector across the FFI. When both `sel` and `validity` are supplied, the validity gather +is folded into that same loop and an output-aligned bitmap is materialized in the +chunk's per-position pool. The consumer-visible contract is therefore identical to the +numeric case — pass a source-aligned `validity` either way. """ -function add_string_slice!( - col::GatheredColumn, - strings::Vector{String}; +function Base.push!( + chunk::RowChunk, + strings::AbstractVector{<:AbstractString}; validity::Union{Nothing, BitVector} = nothing, + sel::Union{Nothing, AbstractVector{Int64}} = nothing, ) - n = length(strings) - is_nullable = validity !== nothing - str_ptrs = Vector{Ptr{UInt8}}(undef, n) - str_lens = Vector{Int64}(undef, n) - for i in 1:n - if is_nullable && !validity[i] - str_ptrs[i] = Ptr{UInt8}(C_NULL) - str_lens[i] = 0 - else - str_ptrs[i] = pointer(strings[i]) - str_lens[i] = ncodeunits(strings[i]) - end + pos = length(chunk.slices) + 1 + # Grow the per-position pool to cover this column's slot. Lazy and retained across + # `empty!` calls, so subsequent iterations are no-ops here. 
+ while length(chunk.str_ptrs) < pos + push!(chunk.str_ptrs, Ptr{UInt8}[]) + push!(chunk.str_lens, Int64[]) + push!(chunk.str_validity, BitVector()) end - push!(col.preserve, strings) # keep String objects alive so pointers remain valid - return add_string_slice!(col, str_ptrs, str_lens; validity) -end + str_ptrs = chunk.str_ptrs[pos] + str_lens = chunk.str_lens[pos] -""" - add_string_slice!(col::GatheredColumn, str_ptrs, str_lens; validity=nothing) + n = sel === nothing ? length(strings) : length(sel) + resize!(str_ptrs, n) # amortized O(1) once the pool is sized + resize!(str_lens, n) -Low-level overload: append a string slice from pre-built pointer/length arrays. -`str_ptrs` is a `Vector{Ptr{UInt8}}` of pointers to UTF-8 string data and `str_lens` -is a `Vector{Int64}` of corresponding byte lengths. The caller is responsible for keeping -the pointed-to string bytes alive until `write_columns` returns. -""" -function add_string_slice!( - col::GatheredColumn, - str_ptrs::Vector{Ptr{UInt8}}, - str_lens::Vector{Int64}; - validity::Union{Nothing, BitVector} = nothing, -) - len = length(str_ptrs) + is_nullable = validity !== nothing + # Rust never sees `sel_ptr` for string columns (the value gather is pre-applied + # below), so when `sel` *and* `validity` are both supplied we have to rewrite the + # source-aligned validity bitmap to an output-aligned one here. With `sel === nothing`, + # source and output positions coincide and we pass `validity` through unchanged. + needs_validity_gather = is_nullable && sel !== nothing + str_validity = needs_validity_gather ? 
chunk.str_validity[pos] : nothing + if needs_validity_gather + resize!(str_validity, n) + end - validity_ptr = if validity !== nothing - col.is_nullable = true - push!(col.preserve, validity) + if sel === nothing + @inbounds for i in 1:n + if is_nullable && !validity[i] + str_ptrs[i] = Ptr{UInt8}(C_NULL) + str_lens[i] = 0 + else + s = strings[i] + str_ptrs[i] = pointer(s) + str_lens[i] = ncodeunits(s) + end + end + elseif needs_validity_gather + @inbounds for i in 1:n + src = sel[i] + if !validity[src] + str_ptrs[i] = Ptr{UInt8}(C_NULL) + str_lens[i] = 0 + str_validity[i] = false + else + s = strings[src] + str_ptrs[i] = pointer(s) + str_lens[i] = ncodeunits(s) + str_validity[i] = true + end + end + push!(chunk.preserve, sel) + else + @inbounds for i in 1:n + s = strings[sel[i]] + str_ptrs[i] = pointer(s) + str_lens[i] = ncodeunits(s) + end + push!(chunk.preserve, sel) + end + + push!(chunk.preserve, strings) + validity_ptr = if needs_validity_gather + push!(chunk.preserve, str_validity) + Ptr{UInt8}(pointer(str_validity.chunks)) + elseif validity !== nothing + push!(chunk.preserve, validity) Ptr{UInt8}(pointer(validity.chunks)) else Ptr{UInt8}(C_NULL) end - push!(col.preserve, str_ptrs, str_lens) - push!(col.slices, SliceRef( + push!(chunk.slices, ColumnSlice( Ptr{Cvoid}(pointer(str_ptrs)), pointer(str_lens), validity_ptr, Ptr{Int64}(C_NULL), - Csize_t(len), + Csize_t(n), )) - col.total_rows += len - return col -end - -""" - GatheredBatch - -Collects a `GatheredColumn` per output column, then writes all of them in one call. 
- -```julia -batch = GatheredBatch() -push!(batch, col_int64) -push!(batch, col_float64) -write_columns(writer, batch) -``` - -You can also push a single-slice column inline without building a `GatheredColumn` -explicitly: - -```julia -batch = GatheredBatch() -push!(batch, src_ints, COLUMN_TYPE_INT64) -push!(batch, src_floats, COLUMN_TYPE_FLOAT64; sel=indices, validity=valid_bv) -write_columns(writer, batch) -``` -""" -mutable struct GatheredBatch - columns::Vector{GatheredColumn} + return chunk end -GatheredBatch() = GatheredBatch(GatheredColumn[]) - -""" - push!(batch::GatheredBatch, col::GatheredColumn) - -Append an already-built `GatheredColumn` to the batch. """ -Base.push!(batch::GatheredBatch, col::GatheredColumn) = (push!(batch.columns, col); batch) + append!(writer::DataFileWriter, chunk::RowChunk) -""" - push!(batch::GatheredBatch, data::AbstractVector, column_type::ColumnType; - sel=nothing, validity=nothing, nullable=false) +Hand one `RowChunk` to the writer. The chunk's `slices` are already FFI-ready, so this +just pins memory, fires the FFI, and returns. Rust copies all slice data synchronously +into per-column buffers; the source arrays may be released the moment this call returns. -Convenience: create a single-slice `GatheredColumn` from `data` and append it. +When the accumulated window reaches the coalesce size the writer auto-flushes a +`RecordBatch` to the encode pool. Chunks are appended in the order `append!` is called — no reordering. 
""" -function Base.push!( - batch::GatheredBatch, - data::AbstractVector, - column_type::ColumnType; - sel::Union{Nothing, Vector{Int64}} = nothing, - validity::Union{Nothing, BitVector} = nothing, - nullable::Bool = validity !== nothing, -) - col = GatheredColumn(column_type; nullable) - add_slice!(col, data; sel, validity) - push!(batch.columns, col) - return batch +function Base.append!(writer::DataFileWriter, chunk::RowChunk) + writer.ptr == C_NULL && throw(IcebergException( + STATE_RESOURCE_FREED, + "Resource has been freed", + "Writer has been freed", + )) + isempty(chunk.slices) && throw(IcebergException( + DATA_SCHEMA_MISMATCH, + "Column not found in table schema", + "RowChunk has no columns", + )) + ret = GC.@preserve chunk begin + @ccall rust_lib.iceberg_writer_append( + writer.ptr::Ptr{Cvoid}, + pointer(chunk.slices)::Ptr{ColumnSlice}, + length(chunk.slices)::Csize_t, + )::Int32 + end + ret == 0 || throw(IcebergException( + DATA_SCHEMA_MISMATCH, + "Column not found in table schema", + "append! failed (see close_writer for details)", + )) + return writer end """ - write_columns(writer::DataFileWriter, batch::GatheredBatch[, extra_preserve]) - -Gather column data from Julia memory synchronously, then encode asynchronously. + flush!(writer::DataFileWriter) -Gathers all column data from Julia memory in the calling thread using a plain blocking -`ccall`. Encode runs asynchronously in the global worker pool. +Force the writer to flush its current partial window to the encode pool. Useful on logical +boundaries (end of transaction, time tick) where a Parquet row-group break is desired +without waiting for the natural coalesce-window boundary. No-op if the buffer is empty. -`extra_preserve` (optional) is an additional collection of objects whose memory must -stay alive during the gather (e.g. source string arrays for zero-copy string columns). 
- -The source data pointed to by the `GatheredBatch` slices and `extra_preserve` must be -valid for the duration of this call. After the call returns, all Julia pointers have -been consumed and the source data may be safely released. +`close_writer` flushes any remainder automatically, so explicit `flush!` is only needed +when the caller wants control over flush timing. """ -function write_columns( - writer::DataFileWriter, - batch::GatheredBatch, - extra_preserve = nothing, -) - isempty(batch.columns) && throw(IcebergException( - DATA_SCHEMA_MISMATCH, - "Column not found in table schema", - "GatheredBatch has no columns", - )) +function flush!(writer::DataFileWriter) writer.ptr == C_NULL && throw(IcebergException( STATE_RESOURCE_FREED, "Resource has been freed", "Writer has been freed", )) - - all_slice_arrays = Vector{Vector{SliceRef}}(undef, length(batch.columns)) - descriptors = Vector{GatheredColumnDescriptor}(undef, length(batch.columns)) - preserve = Any[] - - for (i, col) in enumerate(batch.columns) - slices = col.slices - all_slice_arrays[i] = slices - append!(preserve, col.preserve) - push!(preserve, slices) - descriptors[i] = GatheredColumnDescriptor( - pointer(slices), - Csize_t(length(slices)), - Csize_t(col.total_rows), - Int32(col.column_type), - col.is_nullable, - ) - end - extra_preserve !== nothing && append!(preserve, extra_preserve) - - ret = GC.@preserve preserve all_slice_arrays descriptors begin - @ccall rust_lib.iceberg_writer_write_gathered_columns( - writer.ptr::Ptr{Cvoid}, - pointer(descriptors)::Ptr{GatheredColumnDescriptor}, - length(descriptors)::Csize_t, - )::Int32 - end - if ret != 0 - err_ptr = @ccall rust_lib.iceberg_take_gather_error()::Ptr{Cchar} - msg = if err_ptr != C_NULL - s = unsafe_string(err_ptr) - @ccall rust_lib.iceberg_destroy_cstring(err_ptr::Ptr{Cchar})::Cint - s - else - "gather failed (see writer close for details)" - end - throw(IcebergException(DATA_SCHEMA_MISMATCH, "Column not found in table schema", 
"write_columns (gathered): $msg")) - end - return nothing + ret = @ccall rust_lib.iceberg_writer_flush(writer.ptr::Ptr{Cvoid})::Int32 + ret == 0 || throw(IcebergException( + DATA_SCHEMA_MISMATCH, + "Column not found in table schema", + "flush! failed (see close_writer for details)", + )) + return writer end diff --git a/test/writer_tests.jl b/test/writer_tests.jl index 1c15dd7..060be31 100644 --- a/test/writer_tests.jl +++ b/test/writer_tests.jl @@ -678,8 +678,8 @@ end println("\n✅ Writer with vended credentials tests completed!") end -@testset "Writer write_columns API" begin - println("Testing write_columns (raw column) API...") +@testset "Writer append! / RowChunk API" begin + println("Testing append!(writer, RowChunk) API...") catalog_uri = get_catalog_uri() props = get_catalog_properties() @@ -720,10 +720,9 @@ end @test table != C_NULL println("✅ Test table created: $table_name") - # Test: Write raw column data using write_columns - println("\nTest: Writing data via write_columns...") + # Test: Write raw column data via the streaming RowChunk path + println("\nTest: Writing data via append!(writer, RowChunk)...") - # Prepare raw column data col_ids = Int64[1, 2, 3, 4, 5] col_counts = Int32[10, 20, 30, 40, 50] col_values = Float64[1.1, 2.2, 3.3, 4.4, 5.5] @@ -739,15 +738,14 @@ end @test writer.ptr != C_NULL println("✅ Writer created successfully") - # Build column batch using the helper - batch = RustyIceberg.ColumnBatch() - push!(batch, col_ids) - push!(batch, col_counts; validity=validity_counts) - push!(batch, col_values; validity=validity_values) - push!(batch, col_flags; validity=validity_flags) + chunk = RustyIceberg.RowChunk() + push!(chunk, col_ids) + push!(chunk, col_counts; validity=validity_counts) + push!(chunk, col_values; validity=validity_values) + push!(chunk, col_flags; validity=validity_flags) - RustyIceberg.write_columns(writer, batch) - println("✅ Data written via write_columns") + append!(writer, chunk) + println("✅ Data appended; 
close_writer will flush the remainder") end @test data_files !== nothing @test data_files.ptr != C_NULL @@ -784,7 +782,7 @@ end @test sorted_counts == Int32[10, 20, 30, 40, 50] @test sorted_values == Float64[1.1, 2.2, 3.3, 4.4, 5.5] @test sorted_flags == Bool[true, false, true, false, true] - println("✅ Verified write_columns data content matches exactly") + println("✅ Verified append!/RowChunk data content matches exactly") # Clean up updated table RustyIceberg.free_table(updated_table) @@ -812,11 +810,11 @@ end end end - println("\n✅ write_columns API tests completed!") + println("\n✅ append!/RowChunk API tests completed!") end -@testset "Writer write_columns with nulls" begin - println("Testing write_columns with null values...") +@testset "Writer RowChunk with nulls" begin + println("Testing RowChunk with null values...") catalog_uri = get_catalog_uri() props = get_catalog_properties() @@ -861,12 +859,11 @@ end validity_values = BitVector([true, false, true, false, true]) # positions 2 and 4 are null data_files = RustyIceberg.with_data_file_writer(table) do writer - batch = RustyIceberg.ColumnBatch() - push!(batch, col_ids) - push!(batch, col_values; validity=validity_values) - - RustyIceberg.write_columns(writer, batch) - println("✅ Data with nulls written via write_columns") + chunk = RustyIceberg.RowChunk() + push!(chunk, col_ids) + push!(chunk, col_values; validity=validity_values) + append!(writer, chunk) + println("✅ Data with nulls appended") end @test data_files !== nothing println("✅ Writer closed successfully") @@ -923,11 +920,11 @@ end end end - println("\n✅ write_columns with nulls tests completed!") + println("\n✅ RowChunk with nulls tests completed!") end -@testset "Writer write_columns decimal types" begin - println("Testing write_columns with decimal types (Int32/Int64/bytes backing)...") +@testset "Writer RowChunk decimal types" begin + println("Testing RowChunk with decimal types (Int32/Int64/Int128 backing)...") catalog_uri = 
get_catalog_uri() props = get_catalog_properties() @@ -977,13 +974,16 @@ end col_balances = Int128[12345678901234567890, -999999999999, 1] data_files = RustyIceberg.with_data_file_writer(table) do writer - batch = RustyIceberg.ColumnBatch() - push!(batch, col_ids) - push!(batch, col_prices; column_type=RustyIceberg.COLUMN_TYPE_DECIMAL_INT32) - push!(batch, col_volumes; column_type=RustyIceberg.COLUMN_TYPE_DECIMAL_INT64) - push!(batch, col_balances; column_type=RustyIceberg.COLUMN_TYPE_DECIMAL_INT128) - RustyIceberg.write_columns(writer, batch) - println("✅ Decimal data written via write_columns") + # The writer infers DECIMAL_INT32 / INT64 / INT128 column types from the + # schema's Decimal128(precision, scale). Callers just push raw Int32 / Int64 + # / Int128 scaled-integer columns matching that precision. + chunk = RustyIceberg.RowChunk() + push!(chunk, col_ids) + push!(chunk, col_prices) + push!(chunk, col_volumes) + push!(chunk, col_balances) + append!(writer, chunk) + println("✅ Decimal data appended") end @test data_files !== nothing && data_files.ptr != C_NULL println("✅ Writer closed, got DataFiles handle") @@ -1049,11 +1049,11 @@ end end end - println("\n✅ write_columns decimal types tests completed!") + println("\n✅ RowChunk decimal types tests completed!") end -@testset "Writer write_columns decimal nullable" begin - println("Testing write_columns with nullable decimal column...") +@testset "Writer RowChunk decimal nullable" begin + println("Testing RowChunk with nullable decimal column...") catalog_uri = get_catalog_uri() props = get_catalog_properties() @@ -1089,11 +1089,11 @@ end validity = BitVector([true, false, true, false, true]) data_files = RustyIceberg.with_data_file_writer(table) do writer - batch = RustyIceberg.ColumnBatch() - push!(batch, col_ids) - push!(batch, col_prices; validity=validity, column_type=RustyIceberg.COLUMN_TYPE_DECIMAL_INT64) - RustyIceberg.write_columns(writer, batch) - println("✅ Nullable decimal data written") + chunk = 
RustyIceberg.RowChunk() + push!(chunk, col_ids) + push!(chunk, col_prices; validity=validity) + append!(writer, chunk) + println("✅ Nullable decimal data appended") end @test data_files !== nothing @@ -1146,11 +1146,11 @@ end end end - println("\n✅ write_columns decimal nullable tests completed!") + println("\n✅ RowChunk decimal nullable tests completed!") end -@testset "Writer write_columns (GatheredBatch) API" begin - println("Testing write_columns with GatheredBatch (gathered-column) API...") +@testset "Writer streaming — multi-chunk coalescing" begin + println("Testing streaming append! with multiple RowChunks per writer...") catalog_uri = get_catalog_uri() props = get_catalog_properties() @@ -1165,7 +1165,7 @@ end catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props) @test catalog !== nothing - test_namespace = ["test_gathered_$(round(Int, time() * 1000))"] + test_namespace = ["test_builder_multi_$(round(Int, time() * 1000))"] RustyIceberg.create_namespace(catalog, test_namespace) # Schema: id (non-nullable long), score (nullable double), tag (nullable string) @@ -1175,60 +1175,49 @@ end Field(Int32(3), "tag", IcebergString(); required=false), ]) - table_name = "gathered_test_$(round(Int, time() * 1000))" + table_name = "builder_multi_$(round(Int, time() * 1000))" table = RustyIceberg.create_table(catalog, test_namespace, table_name, schema) @test table != C_NULL println("✅ Table created") - # --- Data layout (4 rows) --- - # id: [1, 2, 3, 4] — non-nullable, single sequential slice - # score: [1.1, null, 3.3, null] — nullable; two sequential slices, each with validity - # tag: ["alpha", null, "gamma", null] — nullable string via add_string_slice! + # Write 4 rows across 3 separate append! calls. 
+ # id: [1, 2, 3, 4] — non-nullable; 3 chunks of lengths 1, 2, 1 + # score: [1.1, 3.3, null, null] — nullable; 3 chunks with validity + # tag: ["alpha", null, "gamma", null] — nullable string; 3 chunks + # + # Chunks are deliberately mis-aligned in terms of source array sizes to exercise + # the multi-chunk accumulation path. Chunk 2 for score uses a scattered sel. data_files = RustyIceberg.with_data_file_writer(table) do writer - batch = RustyIceberg.GatheredBatch() - - # id: single sequential slice, no nulls - id_data = Int64[1, 2, 3, 4] - id_col = RustyIceberg.GatheredColumn(RustyIceberg.COLUMN_TYPE_INT64) - RustyIceberg.add_slice!(id_col, id_data) - push!(batch, id_col) - println("✅ id column built (sequential, non-nullable)") - - # score: two sequential slices, each with validity masks - # Slice 1 — src = [1.1, 9.9], validity = [true, false] → rows 0 (1.1) and 1 (null) - # Slice 2 — src = [3.3, 8.8] via selection [1] + identity [8.8] (just sequential here) - # Use scattered access for slice 2: src=[99.9, 3.3, 88.8], sel=[2] → picks 3.3 - score_src1 = Float64[1.1, 9.9] - score_valid1 = BitVector([true, false]) - - score_src2 = Float64[99.9, 3.3, 88.8] - score_sel2 = Int64[2] # picks index 2 → 3.3 (1-based) - score_valid2 = BitVector([true]) - - score_src3 = Float64[7.7, 8.8] - score_valid3 = BitVector([false]) # null for row 3 - - score_col = RustyIceberg.GatheredColumn(RustyIceberg.COLUMN_TYPE_FLOAT64; nullable=true) - RustyIceberg.add_slice!(score_col, score_src1; validity=score_valid1) - RustyIceberg.add_slice!(score_col, score_src2; sel=score_sel2, validity=score_valid2) - RustyIceberg.add_slice!(score_col, score_src3; sel=Int64[1], validity=score_valid3) - push!(batch, score_col) - println("✅ score column built (scattered + nullable)") - - # tag: string column via the high-level add_string_slice!
overload - # Row 0: "alpha", row 1: null, row 2: "gamma", row 3: null - tag_col = RustyIceberg.GatheredColumn(RustyIceberg.COLUMN_TYPE_STRING; nullable=true) - RustyIceberg.add_string_slice!( - tag_col, - ["alpha", "", "gamma", ""]; - validity=BitVector([true, false, true, false]) - ) - push!(batch, tag_col) - println("✅ tag column built (string via add_string_slice!)") - - RustyIceberg.write_columns(writer, batch) - println("✅ Batch written via write_columns (GatheredBatch)") + # --- Chunk 1: row 0 --- + c1 = RustyIceberg.RowChunk() + push!(c1, Int64[1]) + push!(c1, Float64[1.1]; validity=BitVector([true])) + push!(c1, ["alpha"]) + append!(writer, c1) + println("✅ Chunk 1 appended") + + # --- Chunk 2: rows 1-2 (score uses a scattered selection) --- + # score_src: [99.9, 3.3, 88.8], sel=[2,1] → values [3.3, 99.9]. + # Source-aligned validity describes the source row, so the bits map as: + # source row 0 = 99.9 → false (output row 1 will be null) + # source row 1 = 3.3 → true (output row 0 is valid) + # source row 2 = 88.8 → unused (not selected); set to true for cleanliness. 
+ c2 = RustyIceberg.RowChunk() + push!(c2, Int64[2, 3]) + push!(c2, Float64[99.9, 3.3, 88.8]; + sel=Int64[2, 1], validity=BitVector([false, true, true])) + push!(c2, ["", "gamma"]; validity=BitVector([false, true])) + append!(writer, c2) + println("✅ Chunk 2 appended (scattered score, nullable strings)") + + # --- Chunk 3: row 3 --- + c3 = RustyIceberg.RowChunk() + push!(c3, Int64[4]) + push!(c3, Float64[0.0]; validity=BitVector([false])) + push!(c3, [""]; validity=BitVector([false])) + append!(writer, c3) + println("✅ Chunk 3 appended; close_writer will flush remainder") end @test data_files !== nothing && data_files.ptr != C_NULL println("✅ Writer closed") @@ -1247,22 +1236,20 @@ end println("✅ Read $(length(tbl.id)) rows") perm = sortperm(tbl.id) - sorted_ids = tbl.id[perm] + sorted_ids = tbl.id[perm] sorted_scores = tbl.score[perm] - sorted_tags = tbl.tag[perm] + sorted_tags = tbl.tag[perm] - # Verify id column (non-nullable, sequential) @test sorted_ids == Int64[1, 2, 3, 4] println("✅ id values correct") - # Verify score column (nullable, scattered slices) + # sel=[2,1] on [99.9, 3.3, 88.8] → row0=src[2]=3.3 (valid), row1=src[1]=99.9 (null) @test !ismissing(sorted_scores[1]) && sorted_scores[1] ≈ 1.1 - @test ismissing(sorted_scores[2]) - @test !ismissing(sorted_scores[3]) && sorted_scores[3] ≈ 3.3 + @test !ismissing(sorted_scores[2]) && sorted_scores[2] ≈ 3.3 + @test ismissing(sorted_scores[3]) @test ismissing(sorted_scores[4]) - println("✅ score values correct (including nulls)") + println("✅ score values correct (including nulls and scattered access)") - # Verify tag column (nullable string) @test !ismissing(sorted_tags[1]) && sorted_tags[1] == "alpha" @test ismissing(sorted_tags[2]) @test !ismissing(sorted_tags[3]) && sorted_tags[3] == "gamma" @@ -1289,7 +1276,296 @@ end end end - println("\n✅ write_columns (GatheredBatch) API tests completed!") + println("\n✅ Streaming multi-chunk tests completed!") +end + +@testset "Writer streaming — source-aligned 
validity with non-identity sel" begin + # Regression test for the source-aligned-validity gather. The source arrays are + # length 5, sel=[3, 1, 4] picks 3 output rows, and the validity bitmaps are + # source-aligned (length 5). Under the old output-aligned semantics the library + # would have memcpy'd validity bits [0..3) into the output without gathering, and + # the null pattern would have been wrong. Under source-aligned semantics the + # numeric path gathers through `sel` in Rust and the string path gathers through + # `sel` on the Julia side of `push!` — the visible contract is the same. + println("Testing source-aligned validity + non-identity sel...") + + catalog_uri = get_catalog_uri() + props = get_catalog_properties() + + catalog = nothing + table = C_NULL + data_files = nothing + test_namespace = nothing + table_name = nothing + + try + catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props) + @test catalog !== nothing + + test_namespace = ["test_builder_srcval_$(round(Int, time() * 1000))"] + RustyIceberg.create_namespace(catalog, test_namespace) + + schema = Schema([ + Field(Int32(1), "id", IcebergLong(); required=true), + Field(Int32(2), "v", IcebergDouble(); required=false), + Field(Int32(3), "tag", IcebergString(); required=false), + ]) + + table_name = "builder_srcval_$(round(Int, time() * 1000))" + table = RustyIceberg.create_table(catalog, test_namespace, table_name, schema) + @test table != C_NULL + + # Source columns (length 5) and sel = [3, 1, 4]: + # output row 0 ← source row 3 → v=30.0 (valid), tag="gamma" (valid) + # output row 1 ← source row 1 → v=10.0 (valid), tag="alpha" (valid) + # output row 2 ← source row 4 → v=40.0 (null!), tag="" (null) + v_src = Float64[10.0, 20.0, 30.0, 40.0, 50.0] + v_validity = BitVector([true, false, true, false, true]) # source-aligned + tag_src = String["alpha", "beta", "gamma", "", "epsilon"] + tag_valid = BitVector([true, true, true, false, true]) # source-aligned + sel = Int64[3, 1, 4] + 
+ data_files = RustyIceberg.with_data_file_writer(table) do writer + c = RustyIceberg.RowChunk() + push!(c, Int64[1, 2, 3]) + push!(c, v_src; sel=sel, validity=v_validity) + push!(c, tag_src; sel=sel, validity=tag_valid) + append!(writer, c) + end + @test data_files !== nothing && data_files.ptr != C_NULL + + updated_table = RustyIceberg.with_transaction(table, catalog) do tx + RustyIceberg.with_fast_append(tx) do action + RustyIceberg.add_data_files(action, data_files) + end + end + @test updated_table != C_NULL + + tbl = read_table_data(updated_table) + @test tbl !== nothing + @test length(tbl.id) == 3 + + perm = sortperm(tbl.id) + ids = tbl.id[perm] + vs = tbl.v[perm] + tags = tbl.tag[perm] + + # id was written [1, 2, 3] in chunk order, paired with output rows 0..2. + # After sortperm(tbl.id) the position is id ascending, so: + # ids[1]=1 ↔ output row 0 ↔ source row 3 → v=30.0, tag="gamma" + # ids[2]=2 ↔ output row 1 ↔ source row 1 → v=10.0, tag="alpha" + # ids[3]=3 ↔ output row 2 ↔ source row 4 → null, tag=null + @test ids == Int64[1, 2, 3] + @test !ismissing(vs[1]) && vs[1] ≈ 30.0 + @test !ismissing(vs[2]) && vs[2] ≈ 10.0 + @test ismissing(vs[3]) + + @test !ismissing(tags[1]) && tags[1] == "gamma" + @test !ismissing(tags[2]) && tags[2] == "alpha" + @test ismissing(tags[3]) + + RustyIceberg.free_table(updated_table) + finally + if data_files !== nothing && data_files.ptr != C_NULL + RustyIceberg.free_data_files!(data_files) + end + if table != C_NULL + RustyIceberg.free_table(table) + end + if table_name !== nothing && test_namespace !== nothing && catalog !== nothing + RustyIceberg.drop_table(catalog, test_namespace, table_name) + end + if test_namespace !== nothing && catalog !== nothing + RustyIceberg.drop_namespace(catalog, test_namespace) + end + if catalog !== nothing + RustyIceberg.free_catalog!(catalog) + end + end + + println("✅ Source-aligned validity + sel test completed") +end + +@testset "Writer streaming — explicit flush! 
between windows" begin + println("Testing explicit flush!(writer) between windows...") + + catalog_uri = get_catalog_uri() + props = get_catalog_properties() + + catalog = nothing + table = C_NULL + data_files = nothing + test_namespace = nothing + table_name = nothing + + try + catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props) + @test catalog !== nothing + + test_namespace = ["test_builder_reuse_$(round(Int, time() * 1000))"] + RustyIceberg.create_namespace(catalog, test_namespace) + + schema = Schema([ + Field(Int32(1), "id", IcebergLong(); required=true), + Field(Int32(2), "value", IcebergDouble(); required=false), + ]) + + table_name = "builder_reuse_$(round(Int, time() * 1000))" + table = RustyIceberg.create_table(catalog, test_namespace, table_name, schema) + @test table != C_NULL + println("✅ Table created") + + data_files = RustyIceberg.with_data_file_writer(table) do writer + # Window 1: rows [1, 2] + c1 = RustyIceberg.RowChunk() + push!(c1, Int64[1, 2]) + push!(c1, Float64[10.0, 20.0]) + append!(writer, c1) + RustyIceberg.flush!(writer) # force a flush boundary + println("✅ Window 1 flushed") + + # Window 2: rows [3, 4, 5] — fresh window after flush! 
+ c2 = RustyIceberg.RowChunk() + push!(c2, Int64[3, 4, 5]) + push!(c2, Float64[30.0, 40.0, 50.0]) + append!(writer, c2) + # close_writer will flush window 2's remainder + println("✅ Window 2 appended (close flushes remainder)") + end + @test data_files !== nothing && data_files.ptr != C_NULL + println("✅ Writer closed") + + updated_table = RustyIceberg.with_transaction(table, catalog) do tx + RustyIceberg.with_fast_append(tx) do action + RustyIceberg.add_data_files(action, data_files) + end + end + @test updated_table != C_NULL + println("✅ Transaction committed") + + tbl = read_table_data(updated_table) + @test tbl !== nothing + @test length(tbl.id) == 5 + println("✅ Read $(length(tbl.id)) rows") + + perm = sortperm(tbl.id) + @test tbl.id[perm] == Int64[1, 2, 3, 4, 5] + @test tbl.value[perm] == Float64[10.0, 20.0, 30.0, 40.0, 50.0] + println("✅ Data from both windows correct") + + RustyIceberg.free_table(updated_table) + + finally + if data_files !== nothing && data_files.ptr != C_NULL + RustyIceberg.free_data_files!(data_files) + end + if table != C_NULL + RustyIceberg.free_table(table) + end + if table_name !== nothing && test_namespace !== nothing && catalog !== nothing + RustyIceberg.drop_table(catalog, test_namespace, table_name) + end + if test_namespace !== nothing && catalog !== nothing + RustyIceberg.drop_namespace(catalog, test_namespace) + end + if catalog !== nothing + RustyIceberg.free_catalog!(catalog) + end + end + + println("\n✅ Streaming explicit-flush tests completed!") +end + +@testset "Writer streaming — date and timestamp epoch conversion" begin + println("Testing date/timestamp epoch conversion through the streaming path...") + + catalog_uri = get_catalog_uri() + props = get_catalog_properties() + + catalog = nothing + table = C_NULL + data_files = nothing + test_namespace = nothing + table_name = nothing + + try + catalog = RustyIceberg.catalog_create_rest(catalog_uri; properties=props) + @test catalog !== nothing + + test_namespace = 
["test_builder_dates_$(round(Int, time() * 1000))"] + RustyIceberg.create_namespace(catalog, test_namespace) + + schema = Schema([ + Field(Int32(1), "id", IcebergLong(); required=true), + Field(Int32(2), "event_date", IcebergDate(); required=true), + Field(Int32(3), "event_ts", IcebergTimestamp(); required=true), + ]) + + table_name = "builder_dates_$(round(Int, time() * 1000))" + table = RustyIceberg.create_table(catalog, test_namespace, table_name, schema) + @test table != C_NULL + println("✅ Table created") + + # 2024-01-01 in Julia Date internal representation (Rata Die days, 1-based) + # Dates.value(Date(2024,1,1)) = 738886 + # Expected Iceberg Date32 (days since 1970-01-01) = 738886 - 719163 = 19723 + # Expected Iceberg timestamp (μs since 1970-01-01) = 19723 * 86400 * 1_000_000 = 1_704_067_200_000_000 + julia_date_val = Dates.value(Dates.Date(2024, 1, 1)) # 738886 + julia_ts_val = Dates.value(Dates.DateTime(2024, 1, 1, 0, 0, 0)) # ms since year 1 + + # Writer infers JULIA_DATE / JULIA_TIMESTAMP from the schema (IcebergDate / + # IcebergTimestamp). User just pushes raw Int64 values (Dates.value of the Julia + # Date/DateTime) and Rust handles the epoch conversion at copy time. + data_files = RustyIceberg.with_data_file_writer(table) do writer + chunk = RustyIceberg.RowChunk() + push!(chunk, Int64[1]) + push!(chunk, Int64[julia_date_val]) + push!(chunk, Int64[julia_ts_val]) + append!(writer, chunk) + println("✅ Date/timestamp chunk appended") + end + @test data_files !== nothing && data_files.ptr != C_NULL + + updated_table = RustyIceberg.with_transaction(table, catalog) do tx + RustyIceberg.with_fast_append(tx) do action + RustyIceberg.add_data_files(action, data_files) + end + end + @test updated_table != C_NULL + + tbl = read_table_data(updated_table) + @test tbl !== nothing + @test length(tbl.id) == 1 + + # Arrow.jl returns Arrow.Date / Arrow.Timestamp wrappers with a raw integer in .x. 
+ # Check the raw values directly to verify the Julia→Unix epoch offset is correct. + @test tbl.event_date[1].x == Int32(19723) # 2024-01-01 = day 19723 since 1970-01-01 + println("✅ event_date.x = $(tbl.event_date[1].x) (expected 19723)") + + @test tbl.event_ts[1].x == Int64(1_704_067_200_000_000) # 2024-01-01T00:00:00 in μs + println("✅ event_ts.x = $(tbl.event_ts[1].x) (expected 1_704_067_200_000_000)") + + RustyIceberg.free_table(updated_table) + + finally + if data_files !== nothing && data_files.ptr != C_NULL + RustyIceberg.free_data_files!(data_files) + end + if table != C_NULL + RustyIceberg.free_table(table) + end + if table_name !== nothing && test_namespace !== nothing && catalog !== nothing + RustyIceberg.drop_table(catalog, test_namespace, table_name) + end + if test_namespace !== nothing && catalog !== nothing + RustyIceberg.drop_namespace(catalog, test_namespace) + end + if catalog !== nothing + RustyIceberg.free_catalog!(catalog) + end + end + + println("\n✅ Streaming date/timestamp tests completed!") end @testset "Writer WriterConfig parquet properties" begin