writer: new RowChunk column-write API; rebuild the encode pool #102

Merged
rgankema merged 29 commits into main from rg/batch-builder
May 28, 2026

Conversation

@rgankema rgankema commented May 20, 2026

Summary

Two independent changes ship together because they share the same FFI seam: a redesigned column-write API on the Julia side, and a rebuilt encode pool on the Rust side.

Column-write API: RowChunk + append! + flush!

Main exposes two parallel column-write paths — the low-level ColumnBatch / ColumnDescriptor surface and the high-level "gathered" surface (GatheredBatch, GatheredColumn, SliceRef, GatheredColumnDescriptor, add_slice!, add_string_slice!) — both eventually calling one of three write_columns overloads. This PR replaces all of that with a new design built around a single user-facing Julia type, RowChunk, and two methods, push! and append!, plus an explicit flush!.

The biggest shape change is who picks batch boundaries. On main, each write_columns(writer, batch) call is one Arrow RecordBatch: the caller's slice cadence and the encoder's row-group cadence are forced to coincide. In this PR the writer owns an embedded RecordBatchBuilder, callers hand in arbitrary upstream slices via append!(writer, chunk), and the writer auto-flushes a RecordBatch to the encode pool when the accumulated row count reaches its coalesce window. flush!(writer) is the explicit escape hatch for logical boundaries (end of transaction, time tick) where a row-group break is desired sooner than the natural window.
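
For readers who want the coalescing behaviour in concrete terms, here is a minimal std-only Rust sketch. It is not the code in this PR: `CoalescingWriter`, `Batch`, and the `emitted` vector (standing in for "submitted to the encode pool") are hypothetical names, and the sketch assumes the flush happens after the append that reaches or crosses the window, without splitting the caller's slice.

```rust
/// Hypothetical stand-in for an Arrow RecordBatch: just a run of row values.
type Batch = Vec<i64>;

/// Accumulates caller slices and emits a batch once the window is reached.
struct CoalescingWriter {
    window: usize,       // target rows per emitted batch (the "coalesce window")
    pending: Vec<i64>,   // rows accumulated since the last flush
    emitted: Vec<Batch>, // stands in for batches handed to the encode pool
}

impl CoalescingWriter {
    fn new(window: usize) -> Self {
        assert!(window > 0, "coalesce window must be non-zero");
        Self { window, pending: Vec::new(), emitted: Vec::new() }
    }

    /// Append an arbitrary upstream slice; auto-flush when the accumulated
    /// row count reaches the window (assumption: no splitting of the slice).
    fn append(&mut self, rows: &[i64]) {
        self.pending.extend_from_slice(rows);
        if self.pending.len() >= self.window {
            self.flush();
        }
    }

    /// Explicit flush: emit whatever has accumulated, even a partial window.
    /// A flush with nothing pending is a no-op.
    fn flush(&mut self) {
        if !self.pending.is_empty() {
            let batch = std::mem::take(&mut self.pending);
            self.emitted.push(batch);
        }
    }
}
```

In this sketch the caller's slice sizes no longer determine batch boundaries: two small appends can land in one batch, and an explicit `flush` forces a boundary early.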

Column types no longer have to be supplied per call. The builder derives them from the table's Arrow schema at writer construction, so callers drop the COLUMN_TYPE_* enum values they previously passed through add_slice!, add_string_slice!, and the ColumnBatch push overloads.

The push! signatures are also broader than add_slice! / add_string_slice!:

  • Strings now accept sel for scattered access. On main, add_string_slice! rejected selection vectors and required the caller to pre-build str_ptrs / str_lens arrays with any selection already applied; the new push! does the gather internally.
  • Strings now accept any AbstractVector{<:AbstractString} — Arrow's VariableSizeString, SubString{String}, etc. — without materializing through Vector{String}.
  • sel accepts any contiguous AbstractVector{Int64}, not just Vector{Int64}, so callers can pass view(sel_buf, 1:n) when their sel buffer carries stale capacity beyond the live region.

Streaming pipelines reuse the same RowChunk across iterations via empty!. That clears the working vectors but retains the chunk's internal string-gather pool, so once the pool is sized to the steady-state slice width, subsequent empty!/push!/append! cycles allocate nothing on the string path. On main the equivalent loop allocated a fresh str_ptrs / str_lens pair per add_string_slice! call.

Source-aligned validity semantics

Validity bitmaps on main were output-aligned: when the caller used sel, they were responsible for gathering the source validity through sel themselves and producing a bitmap whose length matched the output row count.

This PR flips the contract: length(validity) >= length(data), and bit i describes whether data[i] is valid. This matches the natural shape of NullableVector, Vector{Union{T,Missing}}, and Arrow nullable arrays. When sel is also supplied, the library gathers validity alongside the value gather — numerics gather in Rust as part of the existing value gather; strings gather in Julia (where the value gather already lives, since Rust can't walk an AbstractString vector across the FFI) and materialize the output-aligned bitmap into a per-column scratch buffer that's reused across empty!.
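
A small sketch of what "gathering validity through sel" means, under stated assumptions: the bitmap is packed LSB-first (the Arrow convention), and `sel` is 1-based as on the Julia side. `get_bit` and `gather_validity` are illustrative names, not functions from this PR.

```rust
/// Read bit `i` (0-based) from an LSB-first packed bitmap.
fn get_bit(bits: &[u8], i: usize) -> bool {
    (bits[i / 8] >> (i % 8)) & 1 == 1
}

/// Gather a source-aligned validity bitmap through a 1-based selection
/// vector, producing an output-aligned, LSB-first packed bitmap.
/// Output bit `k` is set iff source element `sel[k]` is valid.
fn gather_validity(src: &[u8], sel: &[i64]) -> Vec<u8> {
    let mut out = vec![0u8; (sel.len() + 7) / 8];
    for (out_idx, &s) in sel.iter().enumerate() {
        assert!(s >= 1, "sel is assumed to be 1-based");
        let src_idx = (s - 1) as usize; // 1-based (Julia) -> 0-based
        if get_bit(src, src_idx) {
            out[out_idx / 8] |= 1 << (out_idx % 8);
        }
    }
    out
}
```

With a 5-element source whose elements 1, 2, and 4 are valid (bitmap `0b0000_1011`) and `sel = [3, 1, 4]`, the output rows are invalid, valid, valid, so the gathered bitmap is `0b110`. The caller only ever supplies the source-aligned form.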

Encode pool: per-writer FIFO + work-stealing async tasks

On main the encode pool is N OS threads (N = available_parallelism) pulling from a single MPMC channel, with per-writer ordering enforced by an Arc<Mutex<ConcreteDataFileWriter>> inside WriterState. That design has head-of-line blocking baked in: when two tasks for the same writer land adjacently in the global queue, the second worker takes the task and then blocks on the writer's mutex even when tasks for other idle writers are sitting behind it in the queue.

This PR moves to per-writer FIFO queues drained by a work-stealing pool. Each WriterState now owns a Mutex<VecDeque<RecordBatch>> plus atomic queue_len and busy flag. Workers scan the active-writer snapshot, skip writers with empty queues, CAS busy: false → true to claim one, then drain it. Wakeups use a single shared Notify with two safeguards against collapsed-notification races: the worker that wins a claim cascades a wake to a peer before draining, and after releasing busy it re-checks queue_len and notifies again if a producer pushed during the release window.
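
The claim/drain/re-check cycle can be sketched with std primitives only. This is an illustration, not the PR's implementation: `u32` stands in for `RecordBatch`, the `Notify` wakeups are reduced to a boolean return value, and the method names are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;

struct WriterState {
    queue: Mutex<VecDeque<u32>>, // u32 stands in for RecordBatch
    queue_len: AtomicUsize,      // lets workers skip empty writers without locking
    busy: AtomicBool,            // true while exactly one worker drains this writer
}

impl WriterState {
    fn new() -> Self {
        Self {
            queue: Mutex::new(VecDeque::new()),
            queue_len: AtomicUsize::new(0),
            busy: AtomicBool::new(false),
        }
    }

    /// Producer side: enqueue in FIFO order (a real pool would also notify).
    fn push(&self, batch: u32) {
        let mut q = self.queue.lock().unwrap();
        q.push_back(batch);
        self.queue_len.fetch_add(1, Ordering::Release);
    }

    /// Worker side: skip empty writers, then CAS busy: false -> true.
    fn try_claim(&self) -> bool {
        if self.queue_len.load(Ordering::Acquire) == 0 {
            return false;
        }
        self.busy
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Drain in FIFO order, release the claim, then re-check the queue.
    /// Returns true if a producer pushed during the release window, in
    /// which case the caller should wake a peer.
    fn drain_and_release(&self, mut encode: impl FnMut(u32)) -> bool {
        loop {
            // The lock guard is dropped at the end of this statement, so the
            // queue is not locked while `encode` runs.
            let next = self.queue.lock().unwrap().pop_front();
            match next {
                Some(batch) => {
                    self.queue_len.fetch_sub(1, Ordering::AcqRel);
                    encode(batch);
                }
                None => break,
            }
        }
        self.busy.store(false, Ordering::Release);
        self.queue_len.load(Ordering::Acquire) > 0
    }
}
```

Because only the worker that wins the CAS ever pops from a given queue, per-writer FIFO order holds without a mutex around the writer itself, and a worker that loses the CAS simply moves on to another writer instead of blocking.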

The workers themselves moved from OS threads to async tokio tasks. The old loop ran handle.block_on(w.write(batch)), holding the runtime thread for the duration of the S3 PUT. With async tasks the runtime thread is free during the I/O await and can drive another writer's encode in the meantime, hiding S3 latency across writers without disturbing per-writer FIFO ordering. The default worker count moved from available_parallelism to 2 * available_parallelism to keep cores busy with encode CPU from one task while others await I/O.

In-flight encodes can now observe writer shutdown: iceberg_writer_free flips a poisoned flag and the task drops the writer rather than restoring it. The old catch_unwind + manual pending bookkeeping is replaced by a PendingGuard Drop impl that decrements exactly once on normal return or panic.
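
The Drop-guard pattern behind PendingGuard looks roughly like the following. The field layout and the `run_task` helper are assumptions made for illustration; the sketch also assumes the producer increments `pending` at submit time, so the guard only has to decrement.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Decrements the pending counter exactly once when dropped, whether the
/// task returned normally or unwound from a panic.
struct PendingGuard<'a> {
    pending: &'a AtomicUsize,
}

impl<'a> PendingGuard<'a> {
    fn new(pending: &'a AtomicUsize) -> Self {
        Self { pending }
    }
}

impl Drop for PendingGuard<'_> {
    fn drop(&mut self) {
        self.pending.fetch_sub(1, Ordering::AcqRel);
    }
}

/// Hypothetical task wrapper: the guard lives for the duration of the task,
/// so no catch_unwind or manual bookkeeping is needed on any exit path.
fn run_task(pending: &AtomicUsize, task: impl FnOnce()) {
    let _guard = PendingGuard::new(pending);
    task();
}
```

Tying the decrement to Drop means early returns, `?` propagation, and panics all take the same path, which is what makes the "exactly once" property easy to audit.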

Other changes

LZ4_RAW = 5 compression codec added as a separate enum value mapping to Compression::LZ4_RAW (modern raw blocks, parquet spec codec ID 7). The existing LZ4 = 3 keeps mapping to Compression::LZ4 — the legacy Hadoop-framed variant whose per-page framing overhead is the usual reason LZ4 underperforms Snappy in parquet benchmarks — for backward compatibility.

🤖 Generated with Claude Code

gbrgr and others added 16 commits May 11, 2026 10:33
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Arrow.jl returns Arrow.Date/Arrow.Timestamp wrappers, not Dates.Date/DateTime.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…rge Julia finalize arms

Removes ~130 lines of duplicated identity/scatter dispatch in append_numeric by
introducing two declarative macros. Merges JULIA_DATE/TIMESTAMP/TIMESTAMPTZ finalize
arms with their non-Julia counterparts since the Arrow output type is identical.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This reverts commit dfdb437.
Replace the global MPMC channel + per-writer mutex with per-writer FIFO
queues and a work-stealing worker pool. Workers scan the active-writer
set, CAS a `busy` flag to claim a writer, then drain its queue. This
removes the head-of-line blocking observed when multiple workers pulled
tasks for the same writer and serialized on its mutex.

Wakeup discipline: a single shared `Notify` plus a cascade — the worker
that wins a claim notifies a peer before draining, so concurrent
producer notifications that collapse to one stored permit still wake
enough workers. After releasing `busy`, the worker re-checks `queue_len`
and notifies again if a producer pushed during the release window.

Add a `#[cfg(test)]` encode hook plus two unit tests:
- fairness: 4 writers × 8 batches drain in round-robin, preserving
  per-writer FIFO.
- stranded-task stress: 1.6k submits across 8 writers from 4 producer
  threads all reach pending=0.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
The existing LZ4 codec maps to Compression::LZ4, the deprecated
Hadoop-framed variant whose per-page framing overhead is the usual
reason LZ4 underperforms Snappy in parquet benchmarks. Add LZ4_RAW = 5
as a separate FFI enum value mapping to Compression::LZ4_RAW (modern
raw blocks, parquet spec codec ID 7). LZ4 = 3 keeps its current
behavior for backward compatibility.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
robertbuessow added a commit that referenced this pull request May 21, 2026
Annotate classified_error(), classify(), and classify_iceberg() with
#[track_caller] so each call site embeds "[src/file.rs:line]" into the
detail field at compile time (zero runtime overhead).  Inlined
classify_iceberg() so it no longer delegates to classify() — otherwise
#[track_caller] would capture a line inside error_codes.rs instead of
the actual FFI site.  Converted all map_err(classify_iceberg) and
map_err(classify) callers to closure form (|e| classify_iceberg(e))
because bare function-pointer dispatch loses the caller attribute;
closure bodies preserve the source location.

IcebergException.detail in Julia now looks like:
  "Null scan pointer provided [src/full.rs:169]"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
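
A minimal illustration of the closure form described in the commit above, using only the standard library. The `Classified` type and this `classify` signature are simplified stand-ins, not the real error_codes.rs definitions; the sketch only demonstrates that a `#[track_caller]` function called inside a closure body reports the closure's source line.

```rust
use std::panic::Location;

#[derive(Debug)]
struct Classified {
    detail: String,
}

/// Embeds the caller's file and line into the detail string.
#[track_caller]
fn classify(msg: &str) -> Classified {
    let loc = Location::caller();
    Classified {
        detail: format!("{} [{}:{}]", msg, loc.file(), loc.line()),
    }
}

/// Closure form: `classify` is called directly inside the closure body,
/// so Location::caller() resolves to the line of that call.
/// Returns the mapped result plus the line number of the map_err call.
fn closure_form(r: Result<(), &'static str>) -> (Result<(), Classified>, u32) {
    let line = line!() + 1;
    let out = r.map_err(|e| classify(e));
    (out, line)
}
```

The bare `map_err(classify)` form is deliberately left out: per the commit message it does not report the call site, and the sketch makes no claim about which location it reports instead.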
rgankema and others added 10 commits May 22, 2026 08:57
Replaces the OS encode threads each calling handle.block_on(w.write(batch))
with N async tasks spawned on the existing runtime. The runtime thread is
freed during the I/O await inside w.write() and can pick up another
writer's encode task in the meantime — hiding S3 PUT latency across
writers without changing per-writer FIFO ordering.

Adds WriterState::poisoned so iceberg_writer_free can signal in-flight
encodes (which now release the writer mutex during .await) to drop
rather than restore the writer. Replaces the old catch_unwind
bookkeeping with a PendingGuard Drop impl that decrements pending
exactly once on normal return or panic.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Since the conversion to async worker tasks, workers parked on S3 PUTs no
longer consume runtime threads. Defaulting to one task per core left
roughly half the cores idle while tasks awaited I/O. Oversubscribing by
2x lets the runtime keep cores busy with encode CPU from one task while
others are parked on I/O — measured ~7% throughput improvement on an
EBS-capped local benchmark; expected to be larger when I/O latency is
higher (real S3).

Also refresh the now-stale "OS threads" / "N = available_parallelism"
phrasing in the module/struct/FFI doc comments to reflect the async-task
implementation.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Rename the Rust module to record_batch_builder and embed the builder
inside IcebergDataFileWriter (UnsafeCell<Option<RecordBatchBuilder>>,
lazily initialized). Column types are derived from the table's Arrow
schema at writer construction; callers no longer pass col_types.

Auto-flush when the coalesce window fills; explicit flush via the new
iceberg_writer_flush. close also flushes any partial-window remainder.

The Julia surface collapses to one user-facing type (RowChunk) and two
methods (append!, flush!). ColumnBatch, SliceBatch, ColumnBatchBuilder,
ColumnDescriptor, append_slice!, free_builder!, and both write_columns
overloads are removed. SliceRef is renamed to ColumnSlice (internal).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
push!(chunk, ...) now just records a reference to the column data plus
optional validity/sel — no pointer-taking, no allocation of ptr/len arrays.
All FFI prep (pointer-taking, string ptr/len gather) happens in append!
using scratch buffers owned by DataFileWriter. Scratch is grown lazily on
first use and reused across every subsequent append!, so a streaming
pipeline pays zero per-call allocation for the FFI argument arrays.

For string columns this kills the per-chunk Vector{Ptr{UInt8}} and
Vector{Int64} allocation that previously scaled with chunk size — the
benchmark on 100M rows × 4 string columns allocated 9 GiB more than the
pre-refactor builder; this commit restores parity.

push!(chunk, ::AbstractVector{<:AbstractString}) replaces the narrow
::Vector{String} signature, so callers with Vector{VariableSizeString},
Vector{SubString{String}}, etc. don't have to materialize through String.

ColumnSlice moves up next to DataFileWriter so the writer can carry a
Vector{ColumnSlice} scratch field. FFI surface is unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
… time

Numeric push! builds ColumnSlice inline like the pre-Option-D code — fully
type-stable, no allocation beyond chunk.slices/preserve growth.

String push! also builds ColumnSlice inline, but materializes ptr/len into
a per-column-position pool owned by the RowChunk. Base.empty!(chunk) resets
the working vectors without freeing the pool, so streaming pipelines that
follow the `empty!; push!; append!` idiom pay zero per-iteration allocation
for string-gather buffers (down from 9 GiB on the 100M×4-string benchmark).

push!(chunk, ::AbstractVector{<:AbstractString}) accepts any AbstractString
subtype directly (VariableSizeString, SubString{String}, …), so callers
don't have to materialize through Vector{String}.

String columns now also support `sel=` for scattered access — same shape as
the numeric overload. The original API documented that strings couldn't
support sel; that rationale (caller had to provide pre-built ptr/len in the
old low-level overload) no longer applies once the gather lives in push!.

Replaces the Option D commit (8ded709): deferring numeric prep into append!
introduced an Any-typed wrapper struct that broke type stability and cost
~10-20% on numeric workloads. This commit puts numerics back on the typed
path while keeping the string buffer-reuse win.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Validity bitmaps passed to `push!` are now source-aligned by contract
(`length(validity) >= length(data)`, bit `i` describes `data[i]`),
matching the natural shape of `NullableVector` / `Vector{Union{T,Missing}}` /
Arrow nullable arrays. With `sel == nothing`, source and output positions
coincide and the steady-state hot path is unchanged.

Numeric path: Rust gathers validity through `sel` alongside the value gather
in `append_to_state` — added a new branch before the existing memcpy and
bit-by-bit paths.

String path: Rust never sees `sel_ptr` for string columns (the value gather
is pre-applied on the Julia side because Rust can't walk an
`AbstractString` vector across the FFI). The validity gather now folds
into that same Julia-side loop, materializing an output-aligned bitmap
into a new per-column `RowChunk.str_validity` pool retained across
`empty!` so streaming reuse stays zero-allocation.

The consumer-visible contract is identical for numeric and string columns:
pass a source-aligned `validity` either way; the library does the gather.

Tests:
- "multi-chunk coalescing" translated to source-aligned validity.
- New "source-aligned validity with non-identity sel" testset covers
  both numeric and string columns with a 5-element source and sel=[3,1,4].
Relaxes the `sel` parameter of both `push!` overloads from
`Union{Nothing, Vector{Int64}}` to `Union{Nothing, AbstractVector{Int64}}`
so callers can pass a `view(sel_buf, 1:n)` when their sel buffer carries
stale capacity beyond the live region.

`pointer(sel)` and `length(sel)` already work correctly for contiguous
`SubArray{Int64,1,Vector{Int64},Tuple{UnitRange{Int}},true}` (verified:
`pointer(view(v, 3:7)) == pointer(v) + 2*sizeof(Int64)`), so no body
changes are needed.
@rgankema rgankema changed the title writer: per-writer queues + work-stealing encode pool writer: streaming RowChunk API and work-stealing async encode pool May 28, 2026
@rgankema rgankema changed the title writer: streaming RowChunk API and work-stealing async encode pool writer: new RowChunk column-write API; rebuild the encode pool May 28, 2026
cargo fmt and address all clippy lints (div_ceil, is_multiple_of,
vec![] over Vec::new+push, enumerate over index loop, size_of_val).
Annotate get_or_init_builder with allow(mut_from_ref) since it relies
on UnsafeCell interior mutability per the documented single-thread
FFI contract.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
@rgankema rgankema requested review from gbrgr, hall-alex and robertbuessow and removed request for gbrgr May 28, 2026 07:31
@rgankema rgankema marked this pull request as ready for review May 28, 2026 07:32
Comment thread src/writer.jl
desc = ColumnDescriptor(pointer(data), ...)
write_columns(writer, [desc], (data, validity)) # Arrays preserved during call
chunk = RowChunk()
push!(chunk, ids) # non-nullable numeric, sequential

Contributor

Wondering here: can users always know the schema order this one expects? I guess they have to take it from the stored Arrow schema? I am wondering whether retrieving the table schema multiple times always gives the same column order, or whether that could be a source of bugs.

Contributor Author

I (and Claude) assumed the schema order is fixed. I have nothing to back that up, but it would be surprising if it weren't, no?

Contributor

Just wondering, because schema information is usually returned as some JSON/dict-like object, and I am not sure whether the Iceberg spec specifies a column order.

Contributor

By the way, I do not think it is a big issue in any case, because callers can just rely on the schema they see in the writer.

Comment thread src/writer.jl Outdated
rgankema and others added 2 commits May 28, 2026 12:38
…or pattern

Use DATA_SCHEMA_MISMATCH (matching `_write_ipc_bytes`) for append!/flush!
non-zero FFI returns — these are deferred-error placeholders; the real
classified error surfaces via close_writer.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
// during S3 PUTs. Number of in-flight encodes is no longer bounded by OS
// thread count, only by core count for actual CPU work.
for _ in 0..n {
handle.spawn(encode_worker_loop(pool_ref));

Contributor

Could you ask Claude whether it would make sense to use spawn_blocking here instead, as encoding is CPU-bound?

Contributor Author

Non-blocking spawn was the point. The encoder tasks don't just encode; they also write to disk, and they block on that. One of the things this PR does is create more tasks than threads, so that one encoder task can take over while another is blocked on fsync.

@gbrgr gbrgr left a comment

LGTM

@rgankema rgankema merged commit 6155c04 into main May 28, 2026
6 checks passed
@rgankema rgankema deleted the rg/batch-builder branch May 28, 2026 13:56