writer: new RowChunk column-write API; rebuild the encode pool #102
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Arrow.jl returns Arrow.Date/Arrow.Timestamp wrappers, not Dates.Date/DateTime. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…rge Julia finalize arms Removes ~130 lines of duplicated identity/scatter dispatch in append_numeric by introducing two declarative macros. Merges JULIA_DATE/TIMESTAMP/TIMESTAMPTZ finalize arms with their non-Julia counterparts since the Arrow output type is identical. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This reverts commit dfdb437.
This reverts commit 8db1a0b.
Replace the global MPMC channel + per-writer mutex with per-writer FIFO queues and a work-stealing worker pool. Workers scan the active-writer set, CAS a `busy` flag to claim a writer, then drain its queue. This removes the head-of-line blocking observed when multiple workers pulled tasks for the same writer and serialized on its mutex.
Wakeup discipline: a single shared `Notify` plus a cascade. The worker that wins a claim notifies a peer before draining, so concurrent producer notifications that collapse to one stored permit still wake enough workers. After releasing `busy`, the worker re-checks `queue_len` and notifies again if a producer pushed during the release window.
Add a `#[cfg(test)]` encode hook plus two unit tests:
- fairness: 4 writers × 8 batches drain in round-robin, preserving per-writer FIFO.
- stranded-task stress: 1.6k submits across 8 writers from 4 producer threads all reach pending=0.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
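A minimal sketch of the claim/drain/release cycle described in this commit, using only the standard library. The `WriterState` below is a hypothetical stand-in (it queues `u32` values instead of `RecordBatch`, and the method names `try_claim`, `drain`, and `release` are assumptions for illustration); the real pool also involves a shared `Notify`, which is omitted here.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the per-writer state; `u32` replaces `RecordBatch`.
pub struct WriterState {
    queue: Mutex<VecDeque<u32>>,
    queue_len: AtomicUsize,
    busy: AtomicBool,
}

impl WriterState {
    pub fn new() -> Self {
        WriterState {
            queue: Mutex::new(VecDeque::new()),
            queue_len: AtomicUsize::new(0),
            busy: AtomicBool::new(false),
        }
    }

    /// Producer side: enqueue in FIFO order. The counter is updated while the
    /// lock is held so it never disagrees with the queue once the lock is released.
    pub fn push(&self, batch: u32) {
        let mut q = self.queue.lock().unwrap();
        q.push_back(batch);
        self.queue_len.fetch_add(1, Ordering::SeqCst);
    }

    /// Worker side: skip empty writers, then CAS `busy: false -> true`.
    /// Returns true only for the single worker that wins the claim.
    pub fn try_claim(&self) -> bool {
        self.queue_len.load(Ordering::SeqCst) > 0
            && self
                .busy
                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
    }

    /// Drain everything currently queued, in order. Call only while holding the claim.
    pub fn drain(&self) -> Vec<u32> {
        let mut drained = Vec::new();
        loop {
            let next = {
                let mut q = self.queue.lock().unwrap();
                let item = q.pop_front();
                if item.is_some() {
                    self.queue_len.fetch_sub(1, Ordering::SeqCst);
                }
                item
            };
            match next {
                Some(batch) => drained.push(batch), // the real worker would encode here
                None => break,
            }
        }
        drained
    }

    /// Release the claim, then re-check the queue. A `true` result means a
    /// producer pushed during the release window and a wakeup is still needed.
    pub fn release(&self) -> bool {
        self.busy.store(false, Ordering::SeqCst);
        self.queue_len.load(Ordering::SeqCst) > 0
    }
}
```

Because only the claim holder drains, per-writer FIFO order is preserved without a long-held writer mutex; a worker that loses the CAS simply moves on to the next writer.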
The existing LZ4 codec maps to Compression::LZ4, the deprecated Hadoop-framed variant whose per-page framing overhead is the usual reason LZ4 underperforms Snappy in parquet benchmarks. Add LZ4_RAW = 5 as a separate FFI enum value mapping to Compression::LZ4_RAW (modern raw blocks, parquet spec codec ID 7). LZ4 = 3 keeps its current behavior for backward compatibility. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
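The two numbering schemes in this commit are easy to confuse, so here is a small sketch that keeps them apart. `Codec`, `codec_from_ffi`, and `parquet_spec_id` are hypothetical names standing in for the real FFI enum and `Compression` type; only the two LZ4 values named in the commit are modelled, and every other FFI value is deliberately left unmapped.

```rust
/// Hypothetical stand-in for the two LZ4 flavours discussed in the commit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Codec {
    /// Deprecated Hadoop-framed LZ4.
    Lz4Hadoop,
    /// Modern raw-block LZ4.
    Lz4Raw,
}

impl Codec {
    /// Codec ID in the Parquet format's `CompressionCodec` enum.
    pub fn parquet_spec_id(self) -> i32 {
        match self {
            Codec::Lz4Hadoop => 5,
            Codec::Lz4Raw => 7,
        }
    }
}

/// Map the FFI enum value to a codec. Only the values named in the commit
/// are handled; everything else is out of scope for this sketch.
pub fn codec_from_ffi(code: i32) -> Option<Codec> {
    match code {
        3 => Some(Codec::Lz4Hadoop), // LZ4 = 3, unchanged for backward compatibility
        5 => Some(Codec::Lz4Raw),    // LZ4_RAW = 5, new in this commit
        _ => None,
    }
}
```

Note that the FFI value 5 (`LZ4_RAW`) happens to equal the Parquet spec ID of the legacy `LZ4` codec, so the FFI value and the spec ID should not be used interchangeably.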
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Annotate classified_error(), classify(), and classify_iceberg() with #[track_caller] so each call site embeds "[src/file.rs:line]" into the detail field at compile time (zero runtime overhead). Inlined classify_iceberg() so it no longer delegates to classify() — otherwise #[track_caller] would capture a line inside error_codes.rs instead of the actual FFI site. Converted all map_err(classify_iceberg) and map_err(classify) callers to closure form (|e| classify_iceberg(e)) because bare function-pointer dispatch loses the caller attribute; closure bodies preserve the source location. IcebergException.detail in Julia now looks like: "Null scan pointer provided [src/full.rs:169]" Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
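The closure-form pattern can be sketched in isolation as follows. `Classified`, `classify`, and the two `demo_*` helpers are simplified, hypothetical versions written for this example; the real `classify()` and `classify_iceberg()` take error values and produce error codes that are not shown here.

```rust
use std::panic::Location;

/// Hypothetical, simplified error type carrying only the detail string.
#[derive(Debug)]
pub struct Classified {
    pub detail: String,
}

/// With `#[track_caller]`, `Location::caller()` reports the call site of
/// `classify`, not a line inside this function.
#[track_caller]
pub fn classify(msg: &str) -> Classified {
    let loc = Location::caller();
    Classified {
        detail: format!("{} [{}:{}]", msg, loc.file(), loc.line()),
    }
}

/// Direct call: the embedded line is the line of the call expression.
/// Returns the error together with that line number for comparison.
pub fn demo_direct() -> (Classified, u32) {
    (classify("Null scan pointer provided"), line!())
}

/// Closure form inside `map_err`: the call to `classify` sits in the closure
/// body, so the embedded line is still the line written at the FFI site.
pub fn demo_closure() -> (Classified, u32) {
    let r: Result<(), &str> = Err("boom");
    (r.map_err(|e| classify(e)).unwrap_err(), line!())
}
```

Passing the bare function (`map_err(classify)`) would compile as well, but the call would then happen somewhere other than the written site, which is why the commit converts those callers to closures.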
Replaces the OS encode threads each calling handle.block_on(w.write(batch)) with N async tasks spawned on the existing runtime. The runtime thread is freed during the I/O await inside w.write() and can pick up another writer's encode task in the meantime — hiding S3 PUT latency across writers without changing per-writer FIFO ordering. Adds WriterState::poisoned so iceberg_writer_free can signal in-flight encodes (which now release the writer mutex during .await) to drop rather than restore the writer. Replaces the old catch_unwind bookkeeping with a PendingGuard Drop impl that decrements pending exactly once on normal return or panic. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
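The `PendingGuard` idea can be illustrated with a small standalone sketch. The struct layout and the `encode_one` helper below are assumptions made for the example (the real guard lives inside the async encode task); the point is only that a `Drop` impl runs on both normal return and unwinding, so the counter is decremented exactly once either way.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical guard: decrements the shared `pending` counter when dropped.
pub struct PendingGuard<'a> {
    pending: &'a AtomicUsize,
}

impl<'a> PendingGuard<'a> {
    pub fn new(pending: &'a AtomicUsize) -> Self {
        PendingGuard { pending }
    }
}

impl Drop for PendingGuard<'_> {
    fn drop(&mut self) {
        // Runs once per guard, whether the scope exits normally or by panic.
        self.pending.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Stand-in for one encode step. The submitter is assumed to have already
/// incremented `pending` when it queued the batch.
pub fn encode_one(pending: &AtomicUsize, should_panic: bool) {
    let _guard = PendingGuard::new(pending);
    if should_panic {
        panic!("simulated encode failure");
    }
    // Normal return: `_guard` is dropped here.
}
```

Compared with wrapping the body in `catch_unwind` and decrementing by hand on each path, the guard leaves no code path on which the decrement can be forgotten or repeated.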
Since the conversion to async worker tasks, workers parked on S3 PUTs no longer consume runtime threads. Defaulting to one task per core left roughly half the cores idle while tasks awaited I/O. Oversubscribing by 2x lets the runtime keep cores busy with encode CPU from one task while others are parked on I/O — measured ~7% throughput improvement on an EBS-capped local benchmark; expected to be larger when I/O latency is higher (real S3). Also refresh the now-stale "OS threads" / "N = available_parallelism" phrasing in the module/struct/FFI doc comments to reflect the async-task implementation. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
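For reference, the 2x default could be computed along these lines. `default_worker_count` is a hypothetical helper name, and the fallback to a single core when the query fails is an assumption of this sketch rather than something stated in the commit.

```rust
use std::thread;

/// Hypothetical helper: twice the number of available cores, so encode CPU
/// from one task can fill a core while another task is parked on I/O.
pub fn default_worker_count() -> usize {
    // `available_parallelism` can fail on some platforms; assume 1 core then.
    let cores = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    cores * 2
}
```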
Rename the Rust module to record_batch_builder and embed the builder inside IcebergDataFileWriter (UnsafeCell<Option<RecordBatchBuilder>>, lazily initialized). Column types are derived from the table's Arrow schema at writer construction; callers no longer pass col_types. Auto-flush when the coalesce window fills; explicit flush via the new iceberg_writer_flush. close also flushes any partial-window remainder. The Julia surface collapses to one user-facing type (RowChunk) and two methods (append!, flush!). ColumnBatch, SliceBatch, ColumnBatchBuilder, ColumnDescriptor, append_slice!, free_builder!, and both write_columns overloads are removed. SliceRef is renamed to ColumnSlice (internal). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
push!(chunk, ...) now just records a reference to the column data plus
optional validity/sel — no pointer-taking, no allocation of ptr/len arrays.
All FFI prep (pointer-taking, string ptr/len gather) happens in append!
using scratch buffers owned by DataFileWriter. Scratch is grown lazily on
first use and reused across every subsequent append!, so a streaming
pipeline pays zero per-call allocation for the FFI argument arrays.
For string columns this kills the per-chunk Vector{Ptr{UInt8}} and
Vector{Int64} allocation that previously scaled with chunk size — the
benchmark on 100M rows × 4 string columns allocated 9 GiB more than the
pre-refactor builder; this commit restores parity.
push!(chunk, ::AbstractVector{<:AbstractString}) replaces the narrow
::Vector{String} signature, so callers with Vector{VariableSizeString},
Vector{SubString{String}}, etc. don't have to materialize through String.
ColumnSlice moves up next to DataFileWriter so the writer can carry a
Vector{ColumnSlice} scratch field. FFI surface is unchanged.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
… time
Numeric push! builds ColumnSlice inline like the pre-Option-D code — fully
type-stable, no allocation beyond chunk.slices/preserve growth.
String push! also builds ColumnSlice inline, but materializes ptr/len into
a per-column-position pool owned by the RowChunk. Base.empty!(chunk) resets
the working vectors without freeing the pool, so streaming pipelines that
follow the `empty!; push!; append!` idiom pay zero per-iteration allocation
for string-gather buffers (down from 9 GiB on the 100M×4-string benchmark).
push!(chunk, ::AbstractVector{<:AbstractString}) accepts any AbstractString
subtype directly (VariableSizeString, SubString{String}, …), so callers
don't have to materialize through Vector{String}.
String columns now also support `sel=` for scattered access — same shape as
the numeric overload. The original API documented that strings couldn't
support sel; that rationale (caller had to provide pre-built ptr/len in the
old low-level overload) no longer applies once the gather lives in push!.
Replaces the Option D commit (8ded709): deferring numeric prep into append!
introduced an Any-typed wrapper struct that broke type stability and cost
~10-20% on numeric workloads. This commit puts numerics back on the typed
path while keeping the string buffer-reuse win.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Validity bitmaps passed to `push!` are now source-aligned by contract
(`length(validity) >= length(data)`, bit `i` describes `data[i]`),
matching the natural shape of `NullableVector` / `Vector{Union{T,Missing}}` /
Arrow nullable arrays. With `sel == nothing`, source and output positions
coincide and the steady-state hot path is unchanged.
Numeric path: Rust gathers validity through `sel` alongside the value gather
in `append_to_state` — added a new branch before the existing memcpy and
bit-by-bit paths.
String path: Rust never sees `sel_ptr` for string columns (the value gather
is pre-applied on the Julia side because Rust can't walk an
`AbstractString` vector across the FFI). The validity gather now folds
into that same Julia-side loop, materializing an output-aligned bitmap
into a new per-column `RowChunk.str_validity` pool retained across
`empty!` so streaming reuse stays zero-allocation.
The consumer-visible contract is identical for numeric and string columns:
pass a source-aligned `validity` either way; the library does the gather.
Tests:
- "multi-chunk coalescing" translated to source-aligned validity.
- New "source-aligned validity with non-identity sel" testset covers
both numeric and string columns with a 5-element source and sel=[3,1,4].
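The numeric-path gather can be sketched as below. This is an illustration under stated assumptions, not the code in `append_to_state`: `gather_validity` and `get_bit` are hypothetical names, bitmaps are assumed to be LSB-first as in Arrow, and `sel` is taken as 0-based here, so the test's `sel=[3,1,4]` becomes `[2, 0, 3]`.

```rust
/// Read bit `i` of an LSB-first bitmap (Arrow bit order, assumed here).
fn get_bit(bitmap: &[u8], i: usize) -> bool {
    (bitmap[i / 8] >> (i % 8)) & 1 == 1
}

/// Gather a source-aligned validity bitmap through `sel` into an
/// output-aligned bitmap: output bit `k` equals source bit `sel[k]`.
/// `sel` holds 0-based source indices in this sketch.
pub fn gather_validity(src: &[u8], sel: &[usize]) -> Vec<u8> {
    let mut out = vec![0u8; (sel.len() + 7) / 8];
    for (out_i, &src_i) in sel.iter().enumerate() {
        if get_bit(src, src_i) {
            out[out_i / 8] |= 1 << (out_i % 8);
        }
    }
    out
}
```

With a 5-element source whose rows 0, 2, and 4 are valid (bitmap `0b0001_0101`) and `sel = [2, 0, 3]`, the output rows are valid, valid, null, giving the output bitmap `0b0000_0011`. An identity `sel` reproduces the source bits, which is why the `sel == nothing` path can skip the gather entirely.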
Relaxes the `sel` parameter of both `push!` overloads from
`Union{Nothing, Vector{Int64}}` to `Union{Nothing, AbstractVector{Int64}}`
so callers can pass a `view(sel_buf, 1:n)` when their sel buffer carries
stale capacity beyond the live region.
`pointer(sel)` and `length(sel)` already work correctly for contiguous
`SubArray{Int64,1,Vector{Int64},Tuple{UnitRange{Int}},true}` (verified:
`pointer(view(v, 3:7)) == pointer(v) + 2*sizeof(Int64)`), so no body
changes are needed.
cargo fmt and address all clippy lints (div_ceil, is_multiple_of, vec![] over Vec::new+push, enumerate over index loop, size_of_val). Annotate get_or_init_builder with allow(mut_from_ref) since it relies on UnsafeCell interior mutability per the documented single-thread FFI contract. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
desc = ColumnDescriptor(pointer(data), ...)
write_columns(writer, [desc], (data, validity)) # Arrays preserved during call
chunk = RowChunk()
push!(chunk, ids) # non-nullable numeric, sequential
Wondering here: can users always know the schema order this one expects? I guess they have to take it from the stored Arrow schema? I am wondering whether retrieving the table schema multiple times always gives the same schema order, or whether that could potentially be a source of bugs.
I (and Claude) assumed the schema order is fixed. I have nothing to back that up, but it would be surprising if it weren't, no?
Just wondering, because schema information is usually returned as some JSON/dict-like object, and I am not sure whether the Iceberg spec specifies a column order?
I btw do not think it is a big issue in any case, because callers can just rely on the schema they see in the writer
…or pattern Use DATA_SCHEMA_MISMATCH (matching `_write_ipc_bytes`) for append!/flush! non-zero FFI returns — these are deferred-error placeholders; the real classified error surfaces via close_writer. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
// during S3 PUTs. Number of in-flight encodes is no longer bounded by OS
// thread count, only by core count for actual CPU work.
for _ in 0..n {
    handle.spawn(encode_worker_loop(pool_ref));
Could you ask Claude whether here it would make sense to use spawn_blocking instead, as encoding is CPU-bound?
Non-blocking spawn was the point. The encoder tasks don't just encode; they also write to disk, and they block on that. One of the things this PR does is create more tasks than threads, so that an encoder task can take over while another encoder task is blocked on fsync.
Summary
Two independent changes ship together because they share the same FFI seam: a redesigned column-write API on the Julia side, and a rebuilt encode pool on the Rust side.
Column-write API: `RowChunk` + `append!` + `flush!`

Main exposes two parallel column-write paths: the low-level `ColumnBatch`/`ColumnDescriptor` surface and the high-level "gathered" surface (`GatheredBatch`, `GatheredColumn`, `SliceRef`, `GatheredColumnDescriptor`, `add_slice!`, `add_string_slice!`), both eventually calling one of three `write_columns` overloads. This PR replaces all of that with a new design built around a single user-facing Julia type, `RowChunk`, and two methods, `push!` and `append!`, plus an explicit `flush!`.

The biggest shape change is who picks batch boundaries. On main, each `write_columns(writer, batch)` call is one Arrow `RecordBatch`: the caller's slice cadence and the encoder's row-group cadence are forced to coincide. In this PR the writer owns an embedded `RecordBatchBuilder`, callers hand in arbitrary upstream slices via `append!(writer, chunk)`, and the writer auto-flushes a `RecordBatch` to the encode pool when the accumulated row count reaches its coalesce window. `flush!(writer)` is the explicit escape hatch for logical boundaries (end of transaction, time tick) where a row-group break is desired sooner than the natural window.

Column types no longer have to be supplied per call. The builder derives them from the table's Arrow schema at writer construction, so callers drop the `COLUMN_TYPE_*` enum values they previously passed through `add_slice!`, `add_string_slice!`, and the `ColumnBatch` push overloads.

The `push!` signatures are also broader than `add_slice!`/`add_string_slice!`:
- `sel` for scattered access. On main, `add_string_slice!` rejected selection vectors and required the caller to pre-build `str_ptrs`/`str_lens` arrays with any selection already applied; the new `push!` does the gather internally.
- `AbstractVector{<:AbstractString}` (Arrow's `VariableSizeString`, `SubString{String}`, etc.) without materializing through `Vector{String}`.
- `sel` accepts any contiguous `AbstractVector{Int64}`, not just `Vector{Int64}`, so callers can pass `view(sel_buf, 1:n)` when their sel buffer carries stale capacity beyond the live region.

Streaming pipelines reuse the same `RowChunk` across iterations via `empty!`. That clears the working vectors but retains the chunk's internal string-gather pool, so once the pool is sized to the steady-state slice width, subsequent `empty!`/`push!`/`append!` cycles allocate nothing on the string path. On main the equivalent loop allocated a fresh `str_ptrs`/`str_lens` pair per `add_string_slice!` call.

Source-aligned validity semantics
Validity bitmaps on main were output-aligned: when the caller used `sel`, they were responsible for gathering the source validity through `sel` themselves and producing a bitmap whose length matched the output row count.

This PR flips the contract: `length(validity) >= length(data)`, and bit `i` describes whether `data[i]` is valid. This matches the natural shape of `NullableVector`, `Vector{Union{T,Missing}}`, and Arrow nullable arrays. When `sel` is also supplied, the library gathers validity alongside the value gather: numerics gather in Rust as part of the existing value gather; strings gather in Julia (where the value gather already lives, since Rust can't walk an `AbstractString` vector across the FFI) and materialize the output-aligned bitmap into a per-column scratch buffer that's reused across `empty!`.

Encode pool: per-writer FIFO + work-stealing async tasks
On main the encode pool is N OS threads (N = `available_parallelism`) pulling from a single MPMC channel, with per-writer ordering enforced by an `Arc<Mutex<ConcreteDataFileWriter>>` inside `WriterState`. That design has head-of-line blocking baked in: when two tasks for the same writer land adjacently in the global queue, the second worker takes the task and then blocks on the writer's mutex even when tasks for other idle writers are sitting behind it in the queue.

This PR moves to per-writer FIFO queues drained by a work-stealing pool. Each `WriterState` now owns a `Mutex<VecDeque<RecordBatch>>` plus an atomic `queue_len` and a `busy` flag. Workers scan the active-writer snapshot, skip writers with empty queues, CAS `busy: false → true` to claim one, then drain it. Wakeups use a single shared `Notify` with two safeguards against collapsed-notification races: the worker that wins a claim cascades a wake to a peer before draining, and after releasing `busy` it re-checks `queue_len` and notifies again if a producer pushed during the release window.

The workers themselves moved from OS threads to async tokio tasks. The old loop ran `handle.block_on(w.write(batch))`, holding the runtime thread for the duration of the S3 PUT. With async tasks the runtime thread is free during the I/O await and can drive another writer's encode in the meantime, hiding S3 latency across writers without disturbing per-writer FIFO ordering. The default worker count moved from `available_parallelism` to `2 * available_parallelism` to keep cores busy with encode CPU from one task while others await I/O.

In-flight encodes can now observe writer shutdown: `iceberg_writer_free` flips a `poisoned` flag and the task drops the writer rather than restoring it. The old `catch_unwind` + manual `pending` bookkeeping is replaced by a `PendingGuard` `Drop` impl that decrements exactly once on normal return or panic.

Other changes
`LZ4_RAW = 5` compression codec added as a separate enum value mapping to `Compression::LZ4_RAW` (modern raw blocks, parquet spec codec ID 7). For backward compatibility, the existing `LZ4 = 3` keeps mapping to `Compression::LZ4`, the legacy Hadoop-framed variant whose per-page framing overhead is the usual reason LZ4 underperforms Snappy in parquet benchmarks.

🤖 Generated with Claude Code