feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153
feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153kafka1991 wants to merge 117 commits into
Conversation
Plan and FFI ABI for the new column-major writer that will ingest Pandas/Polars DataFrames over QWP/WebSocket. Locks the QuestDb pool shape, BulkChunk encoder strategy, validity bitmap semantics, and the C ABI the separate Python wrapper repo will consume. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Locks the column-sender API around synchronous flush: sender.flush(&mut chunk, ack_level) blocks until the requested ACK level (Ok = WAL commit, Durable = object-store via Enterprise opt-in). Drops the FSN/submit/await split from the FFI; at most one frame in flight per sender, parallelism via the pool. Refuses sf_dir and other sf_* keys at QuestDb::connect with ConfigError — store-and-forward is single-writer-per-slot and interacts awkwardly with pool auto-grow; row-major Sender remains the SF path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lands the Rust core, C ABI, and benchmarks for a column-major sender
targeting Pandas/Polars → QuestDB throughput over QWP/WebSocket. See
`doc/COLUMN_SENDER_PLAN.md` for the design and `doc/COLUMN_SENDER_FFI_ABI.md`
for the C ABI spec; both shipped in earlier commits on this branch.
# What's in the box
* **WS-0 — `QuestDb` pool** (`ingress/column_sender/db.rs`,
`ingress/column_sender/conf.rs`).
Thread-safe pool with eager-open, fail-fast at `pool_max`,
`BorrowedSender<'a>` that returns on `Drop`, and a background reaper
(`pool_reap=auto`, tick = max(5 s, idle_timeout / 12)) that closes
excess-over-`pool_size` connections. New conf keys: `pool_size`,
`pool_max`, `pool_idle_timeout_ms`, `pool_reap`. `sf_*` / `sender_id`
/ `qwp_ws_progress=manual` refused at `connect`-time.
* **WS-1 — synchronous `flush` plumbing** (`ingress/column_sender/sender.rs`,
`ingress/column_sender/encoder.rs`).
`ColumnSender::flush(chunk, AckLevel)` encodes the chunk, publishes via
the existing QWP/WS replay queue (`Sender::qwp_ws_publish_raw` —
pub(crate) escape hatch in the row-API sender), and blocks until the
ACK watermark crosses the published FSN. Polls in 50 ms slices so a
`must_close` mid-wait surfaces promptly. `AckLevel::Durable` requires
`request_durable_ack=on` at connect or returns `InvalidApiCall`.
* **WS-2 — `Chunk` + numeric / fixed-width columns**
(`ingress/column_sender/chunk.rs`, `validity.rs`, `wire.rs`).
Per-column wire-shape `Vec<u8>` storage so encode is a header +
`extend_from_slice` per column. Two code paths per type per the plan
§2.2:
- Bool, i8, i16, i32, i64, f32, f64: `null_flag = 0` always; nullable
rows sentinel-encoded (0 / i32::MIN / i64::MIN / NaN), matching the
row-API convention.
- Sparse-null types (uuid, long256, ipv4, ts_nanos, ts_micros,
date_millis): no-null = `extend_from_slice`; nullable = QWP-shape
bitmap + dense gather.
- Designated timestamp (micros or nanos) — exactly one per chunk.
Connection-scoped `SchemaRegistry`: first emit → FULL; repeat → REFERENCE.
* **WS-3 — VARCHAR** (`Chunk::column_varchar`). Arrow Utf8 in
(`offsets: &[i32]` length `row_count + 1`, `bytes: &[u8]`); wire out
is dense `non_null_count + 1` LE-u32 offsets + concatenated bytes.
No-null path memcpys offsets when `offsets[0] == 0`; nullable path
walks validity and skips slicing for null rows. Offset validation
(negative / non-monotonic / past `bytes_len`) caught client-side.
* **WS-4 — symbol bulk-intern**
(`Chunk::symbol_dict_{i8,i16,i32}`, `encoder::resolve_symbols`).
Three append-time passes: referenced-bitset + range check; compact
referenced dict bytes; translate codes to internal indices and build
the QWP-shape bitmap. Connection-scoped `SymbolGlobalDict` shared
with the row API's type (`buffer/qwp.rs:next_id/intern/entry`
promoted to `pub(crate)`). At flush time, only entries the chunk
actually references reach the wire — protects the 1M-per-connection
cap on huge Pandas `Categorical` dicts. Roll-back on encode error
keeps client + server dict views coherent.
* **WS-5 — C ABI** (`questdb-rs-ffi/src/column_sender.rs`,
`include/questdb/ingress/column_sender.h`).
Full implementation of `doc/COLUMN_SENDER_FFI_ABI.md`:
- Opaque handles `questdb_db`, `column_sender`, `column_sender_chunk`.
- `column_sender_validity` repr-C struct; `column_sender_ack_level`
repr-C enum.
- `questdb_db_connect/close/borrow_sender/return_sender/reap_idle`.
- Every chunk column-append, the VARCHAR + symbol_dict family, the
two designated-timestamp variants, and `column_sender_flush`.
- Errors reuse `line_sender_error*`.
Rust side gains `OwnedSender` — Arc-backed borrow handle the FFI hands
out as `column_sender*` so the C caller can free `questdb_db*` before
all borrows return without dangling.
Hand-runnable smoke test at `cpp_test/smoke_column_sender.c`
(compiles with `-Wall -Wextra -Werror`; not wired into CMake yet —
matches the `smoke_line_reader` pattern).
* **WS-6 — bench** (`questdb-rs/benches/column_sender.rs`,
`doc/COLUMN_SENDER_PERF.md`).
Three families: per-column append vs raw memcpy baseline; symbol
bulk-intern vs naïve per-row HashMap; encode_chunk end-to-end (no
network). First-baseline numbers (Apple Silicon laptop, 100k rows):
- `column_f64/column_sender_no_null` ≈ 55 GiB/s — matches memcpy.
- `column_i64/column_sender_no_null` ≈ 54 GiB/s — matches memcpy.
- `column_varchar/column_sender_no_null` within ~5 % of memcpy.
- Symbol bulk-intern ~16× faster than naïve per-row HashMap.
- `encode_chunk/populate_plus_encode` ≈ 139 M rows/s end-to-end.
# Verification
- 57 column-sender tests (Rust core); 8 FFI tests; full 834-test lib
suite passes.
- `cargo fmt` + `cargo clippy --tests --benches` clean on both crates.
- `cargo doc` introduces no new warnings.
- `cc -std=c11 -Wall -Wextra -Werror -I include` compiles the C header
and the smoke program.
# What's not in here
- WS-7 (Python wrapper) lives in `py-questdb-client`. With the C ABI
in `include/questdb/ingress/column_sender.h` and the FFI symbols in
`libquestdb_client`, that repo can now start consuming.
- A live Pandas→QuestDB end-to-end bench and 1-hour soak — both
belong in the Python repo / nightly CI rather than the in-tree
Criterion suite.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rewrite the column-major sender to eliminate intermediate buffers and pipeline writes for maximum single-connection throughput. Architecture changes: - ColumnSender now owns a dedicated ColumnConn (conn.rs) that drives socket I/O directly — no replay queue, no background thread, no row-API publisher involvement. - Chunk<'a> holds borrowed descriptors (raw pointers + lengths) into the caller's buffers; no per-column Vec<u8> staging. The encoder writes wire bytes straight from caller memory into the connection's reusable write_buf at flush time. - flush() pipelines: encode + WS-mask + write_all, then drain acks non-blocking. Blocks only when in-flight hits the 128-frame protocol cap. New sync(AckLevel) blocks until all acks settle. - Server cumulative OKs handled correctly (sequence=N acks all frames up to N). API changes: - flush(&mut chunk, AckLevel) → flush(&mut chunk) (fire-and-forget) - New sync(AckLevel) drains all in-flight acks - FFI: column_sender_flush drops ack_level arg; new column_sender_sync - FFI lifetime contract: caller buffers must outlive flush (no copy) Performance (5M-row L1 quotes, 9 columns, localhost): - Encode path: 6 GB/s (2.3% of wall time) - End-to-end: 350 MB/s pipelined (was 264 MB/s stop-and-wait) - Per-chunk p50: 0.72 ms (was 2.64 ms) - Criterion populate+encode: 575 µs (was 718 µs, 20% faster) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The default macOS TCP send buffer (~128 KB) is smaller than a typical QWP chunk (1.5 MB at 25k rows). write_all blocks mid-frame while the kernel drains the small buffer. A 4 MiB send buffer lets the kernel accept a full chunk in one shot, reducing write_all stalls when the pipeline has multiple frames in flight. Also sets SO_RCVBUF to 4 MiB to absorb ack bursts from the server without backpressuring the server's send path. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
flush() now sets FLAG_DEFER_COMMIT (0x01) on every QWP frame. The server appends rows to WAL writers without committing. sync() sends a commit-triggering empty frame (without the flag) that commits all accumulated rows in one WAL transaction, then drains acks. This eliminates per-chunk WAL fsync overhead: 200 chunks × 25k rows now produce 1 WAL commit instead of 200. The p95 per-chunk latency drops from ~23 ms to ~7 ms. Old servers that don't recognize the flag ignore it (reserved bit position) and commit per-message — graceful degradation per the spec. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The server's ClientSymbolCache only caches symbols with symbolKey < initialSymbolCount. On a fresh table, initialSymbolCount stays at 0 until a WAL segment rolls and the watermark updates. By sending the first frame without FLAG_DEFER_COMMIT, the server commits it immediately, which allows the next segment to pick up the new symbol count and enable caching for all subsequent deferred frames. This is a client-side workaround for a server-side cache limitation. The proper fix is for the server to cache locally-assigned symbol IDs within the same segment (see WalColumnarRowAppender.putSymbolColumn). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Align the C ABI, docs, and smoke test with column_sender_flush(sender, chunk, err) plus column_sender_sync(sender, ack_level, err). Reserve an in-flight slot for the sync commit, validate durable ACK opt-in before publishing, and add pool/sync coverage.
Rename the borrowed handle returned from the connection pool from `column_sender` to `qwpws_conn` so it can host peer writer modes (per-type today, generic Arrow / NumPy in Steps 2-3, future egress readers). No behaviour change — the underlying Rust types (ColumnSender / OwnedSender) keep their names since they're doc-hidden; only the public C ABI changes. FFI surface changes: - struct column_sender -> qwpws_conn - questdb_db_borrow_sender -> questdb_db_borrow_conn - questdb_db_return_sender -> questdb_db_return_conn - column_sender_must_close -> qwpws_conn_must_close - column_sender_flush(sender, ...) -> column_sender_flush(conn, ...) - column_sender_sync(sender, ...) -> column_sender_sync(conn, ...) column_sender_chunk and the column_sender_chunk_column_* / _symbol_dict_* appenders keep their names — the chunk IS the column-sender writer's accumulator, and flush/sync are operations on it; only the borrowed-handle parameter type changes. See plan-conn-pool-and-writers.md in py-questdb-client (Step 1) and the Slack thread from 2026-05-27 with Victor for the rationale: pool QWP/WS connections, not writers, so egress readers and Arrow / NumPy appenders can share the same pool as the existing column_sender chunk path. Open Q1 from the plan is answered (chunk.rs:208, encoder.rs:82-95, encoder.rs:460-466): `column_sender_chunk_column_*` already direct-writes to the wire buffer — for native-LE contiguous data it is one `extend_from_slice` per column. So Step 3's NumPy appender is no longer about "saving an extra memcpy"; it's about avoiding Python-side widening for narrower dtypes / strided / non-native-endian. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
New entry point that consumes an Apache Arrow C Data Interface
ArrowArray + ArrowSchema pair and dispatches to the existing per-type
chunk methods based on the schema's format string. Caller passes the
borrowed pointers it gets from PyArrow's `_export_to_c` (or any other
Arrow C Data producer); the FFI never constructs or releases the
arrays.
Supported schema formats in this patch:
- c, s, i, l int8 / int16 / int32 / int64
- f, g float32 / float64
- b bool (LSB-first bitmap)
- u UTF-8 string (int32 offsets)
- tsn:..., tsu:... timestamp nanos / micros (timezone suffix ignored)
- dictionary schemas with c/s/i indices and a UTF-8 value type —
routed to symbol_dict_i8 / _i16 / _i32
Other formats — including LargeUtf8 (U), decimal, struct, list, and
non-UTF-8 dictionary values — currently return
line_sender_error_invalid_api_call. LargeUtf8 lands in Step 2b.
Constraints:
- ArrowArray.offset must be 0; sliced arrays are rejected.
- The chunk's row-count lock applies to the new appender the same
way as the per-type calls.
The Arrow types are mirrored as #[repr(C)] structs in the Rust FFI
shim so we read them without taking a dependency on the arrow / arrow-
array crate. No new Rust dependencies.
See plan-conn-pool-and-writers.md (Step 2). The Cython-side wiring
(routing pandas Arrow-backed columns through this entry point) lands
in a separate patch.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add ColumnKind::VarcharLarge (i64 offsets) + Chunk::column_varchar_large + encode_varchar_large. The new encoder reads i64 offsets and writes u32 LE to the wire frame in one pass — no caller- or Rust-side intermediate Vec<i32> for the narrowing. Validation rejects negative offsets, decreasing offsets, offsets exceeding the bytes buffer, AND any last offset exceeding u32::MAX (the QWP wire offset table is uint32 LE). The overflow check at chunk-build time surfaces a meaningful error rather than a per-row overflow at encode time. The Arrow appender's `U` format match now routes here. This unblocks the Python side: pandas large_string columns can be sent without the Python-side cast to UTF-8 (which previously allocated a fresh Arrow array via pyarrow.cast). estimate_frame_size grew a VarcharLarge case identical to Varchar. questdb-rs 836 lib-tests pass. clippy clean on both crates. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extend column_sender_chunk_append_arrow_column with row_offset and
row_count parameters so chunked-emission callers can slice an
ArrowArray without consolidating it first. Required for the Python
Client.dataframe path, which loops over row chunks and currently
slices buffers manually for the per-type appenders.
Per-format slicing:
- fixed-width primitives + timestamps: data pointer is shifted by
row_offset elements (`ptr.add(row_offset)`).
- bool bitmap: shifted by row_offset / 8 bytes; row_offset % 8 == 0
required (matches the validity bitmap byte-alignment).
- utf8 / large_utf8: offsets pointer shifted by row_offset
elements (Arrow offsets are monotonic, so the slice's offsets
are still well-formed). bytes_len is read from the original
array's last offset; the encoder rebases on the wire.
- dictionary symbols: codes pointer shifted; the dictionary is
shared across chunks unchanged.
Validity bitmap requires row_offset % 8 == 0; with row_offset=0 and
row_count=array.length we get exactly the previous behaviour.
Caller bounds-check: row_offset + row_count must not exceed
array.length.
The C header docs the new parameters; clippy & fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Match the Arrow C Data Interface spec more precisely: `children` is `*const *mut ArrowArray` (`struct ArrowArray**` in the spec) and `dictionary` is `*mut ArrowArray`. We never mutate, so this is layout-equivalent to the previous `*const`/`*const`, but the declarations now line up with the spec for readers cross-checking. - Rename `array_len` -> `array_total_len` in the appender so the meaning is unambiguous next to the per-call `row_count` parameter. - Cross-reference doc comments: the per-type varchar / symbol_dict C-ABI entries now mention `column_sender_chunk_append_arrow_column` as the recommended path for callers holding an Arrow array, and flag the per-type entries as the lower-level building block. No behaviour change. fmt + clippy clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two correctness findings from the multi-agent review: 1. **encode_varchar_large rejected valid late slices.** validate_varchar_offsets_i64 checked the absolute `last` offset against u32::MAX, but the encoder narrows `(off - first)` per row. A slice taken from the tail of a multi-GiB LargeUtf8 array (e.g. base=3 GiB, last=4 GiB) was rejected even though every wire offset would be ≤ 1 GiB. Now we validate the *span* `last - first` against u32::MAX, with a clearer error message. 2. **Null-pointer deref on malformed Arrow arrays.** arrow_buffer<T> returned the raw buffer pointer without checking it for null. Callers then unconditionally `slice::from_raw_parts(...)` or `*offsets_ptr.add(...)`. A producer presenting length > 0 with a null data buffer (spec-violating but plausible from buggy clients) would UB before any validation ran. Added an `allow_null: bool` parameter. The bytes buffer of an empty varchar/symbol-dict array can legitimately be NULL (we already guard that downstream), so those three call sites pass `true`. All other call sites — offsets, primitives, codes, bool bitmap — pass `false` and surface a clean `InvalidApiCall` error instead. Reviewers: convergent finding from concurrency-code-reviewer (Rust) and general-purpose (cross-layer) agents. clippy + fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add Chunk::column_numpy + NumpyDtype enum to questdb-rs, plus the C FFI wrapper column_sender_chunk_append_numpy_column. Behaviour, per Step 3 design decisions: - i8/i16/i32 -> i64 sign-extend (wire = LONG). - u8/u16/u32 -> i64 zero-extend (wire = LONG). - i64 -> pass-through (wire = LONG). - u64 -> i64 bit-reinterpret. Values > i64::MAX wrap to negative on the wire, matching the row path's C-cast behaviour. - f32 -> f64 widen (wire = DOUBLE). - f64 -> pass-through (wire = DOUBLE). - bool (NumPy byte-per-row) -> Arrow LSB-first packed bitmap (wire = BOOLEAN). Strided arrays and non-native-endian arrays are not supported in v1; the caller (Python client) consolidates upstream. Widening lives in Rust at append time, materialising into a chunk- owned scratch arena (`Chunk::scratch: Vec<NumpyScratch>`). The ColumnDescriptor's `*const T` points into the scratch; the encoder hot path is unchanged. Scratch is cleared on Chunk::clear / drop. The scratch enum uses typed variants (Box<[i64]>, Box<[f64]>, Box<[u8]>) so the storage alignment matches the encoder's read alignment. questdb-rs 836 lib-tests pass. clippy + fmt clean on both crates. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Round-3 dirty-sender fix (option c from plan-conn-pool-and-writers.md): expose a new FFI that callers use in error-recovery paths to force-close a conn instead of recycling it. The problem: a mid-call flush failure left a conn with in-flight uncommitted frames in the pool. The next borrower's first flush is QWP's "immediate commit", which would commit the stale frames alongside their own. The fix exposes a single new entry point: void questdb_db_drop_conn(questdb_db* db, qwpws_conn* conn); semantically equivalent to "mark must_close, then return" but in one atomic step. The conn enters the terminal state and the pool drops it on return rather than recycling it. Implementation: - ColumnConn gains `mark_must_close(&mut self)` (pub(crate)). - ColumnSender gains `mark_must_close(&mut self)` (pub) that forwards to ColumnConn. - The FFI wraps these: questdb_db_drop_conn marks then drops. The existing `qwpws_conn_must_close()` getter is unchanged; this adds the corresponding setter at each layer. clippy + fmt clean. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three small hardening tweaks:
1. **Tighten format dispatch.** The Arrow C Data Interface only uses
a `:`-prefixed parameter on timestamp / date / time formats;
everything else is a single character. Previously
`column_sender_chunk_append_arrow_column` did
`format.split(':').next()` and dispatched on the prefix, which
would spuriously match e.g. a malformed `"u:foo"` to the varchar
arm. Exact-match the non-ts arms and use `starts_with("tsn:")` /
`starts_with("tsu:")` for the ts arms.
2. **Accept `null_count == -1` with NULL bitmap as "no nulls".**
pyarrow / polars emit this shape when the column has no nulls
(the spec's "unknown" interpretation). We treat it as no-nulls;
the encoder reads the data buffer densely. Only `null_count > 0`
with a NULL bitmap is malformed.
3. **Guard `dict_array.length < 0`.** The main array's negative
length is already rejected in
`column_sender_chunk_append_arrow_column`; mirror the same check
inside `arrow_dictionary_utf8` for symmetry.
clippy + fmt clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…refactor # Conflicts: # questdb-rs/src/ingress.rs
Extends the column-sender pool to also serve egress readers from one
shared `questdb_db` configured by a single conf-string. Lazy-init for
readers, eager for writers, same `pool_size` / `pool_max` /
`pool_idle_timeout_ms` / `pool_reap` budget.
- questdb-rs/db.rs: parallel reader free-list, `borrow_reader_owned`,
`ReaderPoolHandle`, `OwnedReader::mark_must_close`, integrated into
the reaper. All reader-side state and methods feature-gated under
`_egress` so the default build (no egress) stays lean.
- questdb-rs/egress/config: reader conf-string parser accepts the
`qwpws::` / `qwpwss::` schemes and ignores `pool_*` keys, so a
single conf-string drives both the sender and reader pools without
translation.
- questdb-rs-ffi/egress: `line_reader` becomes a named struct with a
`ReaderOwnership` enum (Standalone vs Pooled{handle, must_close});
pool borrow/return + `line_reader_mark_must_close` exposed in C.
- column_sender.rs: `questdb_db(pub(crate) QuestDb)` so the egress
FFI can reach the inner pool to wire reader borrows.
- Headers: reader-pool entry points live in `egress/line_reader.h`
next to the type they wrap; `ingress/column_sender.h` points
there.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Keep borrowed sender handles populated when same-handle replacement connect fails, so later safe calls return typed errors instead of panicking. Preserve the logical pool slot during replacement so pool_max=1 can reborrow a live endpoint. Retry Polars dataframe replacement-connect SocketErrors inside the reconnect budget, with focused regressions for delayed endpoint recovery and failed reborrow no-panic behavior. Test: cargo test --features almost-all-features,arrow,polars column_sender_pool:: -- --nocapture
QWP/WebSocket connect-walk role rejections (421 + X-QuestDB-Role, or a target= filter no reachable endpoint satisfies) were coded SocketError, indistinguishable from "all endpoints unreachable". Add a dedicated ErrorCode::RoleMismatch (FFI line_sender_error_role_mismatch = 18, appended ABI-stable) and emit it from both the qwp/ws handshake-reject classifier and the connect-walk wrapper, so callers can tell "no primary elected yet" from "everything is down" -- matching the reader FFI's existing line_reader_error_role_mismatch. Walk and retry behaviour are unchanged: classification keys off the attached role-reject struct, not the error code. Updates the C header, the From<ErrorCode> mapping, and both ABI/coverage tripwire tests. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Resolve ci/run_tests_pipeline.yaml conflict in favour of main's nightly-docker job (TestVsQuestDBNightly): drop the from-source server build and its JDK/Rust/numpy setup steps. The numpy/pyarrow/polars client test deps are now installed by templates/compile.yaml on the hosted ubuntu-latest pool, so nothing is lost. Our Arrow-fuzz additions to the separate from-source fuzz jobs are preserved. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Accept `impl TryInto<TableName>` on `flush_arrow_batch`, `flush_arrow_batch_at_column`, and `flush_polars_dataframe`, so `&str` table names work without `TableName::new(...)?` ceremony (backward compatible; existing `TableName` callers are unaffected). Add `PolarsIngestOptions` and switch `flush_polars_dataframe` to take it, closing the polars capability gaps: - per-column wire-type `overrides` (previously documented for "Polars frames built without pyarrow" yet unreachable through this entry), - an optional designated-timestamp column (parity with the arrow sink), - `max_rows(usize)` instead of a clunky `Option<NonZeroUsize>`. Document that the polars entry owns the commit/failover replay boundary, unlike `flush`/`flush_arrow_batch` which leave rows uncommitted until `sync`. Add a regression test exercising overrides on the polars path.
Elevate API ergonomics from a thin "code quality" nit to a first-class, blocking review dimension in both the pi and claude review-pr skills: - new review-mindset principle treating ergonomics/cross-surface consistency as correctness for a client library; - Agent 8 rewritten into a dedicated "Public API ergonomics & cross-surface consistency" agent with codebase-specific checks; - new checklist section covering same-concept-same-shape, capability parity across sinks/transports, naming/verb parity, cross-language parity, easy-path=safe-path, and silent semantic divergence; - Step 3b verification rule grading ergonomic findings by user impact (footguns are Critical/Moderate, pure style is Minor); - the ergonomics agent now runs at level 1 and in the level-0 pass.
Add a row-major (`crate::ingress::Sender`) companion pool to `QuestDb`, alongside the existing column-major senders. `borrow_row_sender()` hands out a `BorrowedRowSender` that derefs to `Sender` (classic ILP `Buffer` + `flush`) and returns to the pool on `Drop`, dropping instead of recycling when the connection latched `must_close` or the caller called `mark_must_close()`. Mirrors the existing reader pool: lazy-init, independent free list and `pool_max` cap, rebuilt from the stored connect string via `SenderBuilder::from_conf`, with idle reaping (no warm floor). `borrow_sender` remains column-major and unchanged. The three pools (column, reader, row) cap independently, so combined live connections can reach `3 * pool_max`. Add an exhaustive test suite (9 tests): lazy init, recycle/reuse, auto-grow, fail-fast at cap, flush round-trip, explicit must-close drop, column/row independence, concurrent borrow/return, manual + auto idle reaping, and the build-failure in-use-slot leak guard.
Review — columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egressAdversarial level-3 review. Scope: 90 files, +40,097 / −972 — the combined landing of #148 (column_sender) + #150 (Arrow/Polars).
Method: build-profile facts captured; deep passes over the ~15 risk-bearing Rust/FFI files; docs/CI/examples/python-fuzz/bench treated as lower priority. Every finding below was verified against source (including arrow-rs 58.3.0). Premise (load-bearing): Headline: unusually well-hardened. The highest-risk surface — the Arrow C Data Interface pre- Critical (request changes)C1 —
ModerateM1 — M2 — numpy append has a data pointer + row_count but no buffer byte-length → a wrapper bug = OOB read (host crash/UB). (verified) M3 — Negative M4 — Pool M5 — Egress: per-column degenerate-list node budget (16M) is reset per column, not per batch → OOM/abort amplification. (confirmed, reachability nuance) M6 — Egress: mid-query decode errors are now failover-eligible → deterministic-corruption reconnect/replay loop + per-round M7 — Within the column API, table/column names use two different shapes + validation timings. (verified) M8 — Designated-timestamp requirement diverges between publish paths. (verified) The chunk path hard-errors without a designated timestamp ( M9 — M10 — No test coverage for sliced / non-zero-offset Arrow arrays — the classic encoder trap. (verified by grep) No Minor
Downgraded (initially suspected, verified safe — high-signal)
Summary
|
Summary
This PR is the combined landing of #148 and #150 — two independently-developed columnar I/O tracks that both target QWP/WebSocket and share the same connection pool /
SymbolGlobalDict, shipped together so downstream consumers (py-questdb-client, in-tree C/C++ tests) see one self-consistent surface.#148 — Column-major sender (
column_sender)A DataFrame → Table ingest API.
QuestDbconnection pool +BorrowedSender+Chunk(per-columnVec<u8>that stacks wire bytes directly) + synchronousflush(AckLevel). Covers bool / signed integers / floats / UUID / Long256 / IPv4 / timestamps / VARCHAR /symbol_dict_{i8,i16,i32}bulk-intern. The connection-scopedSchemaRegistry(FULL / REFERENCE emit modes) andSymbolGlobalDictare shared with the row API, preserving the 1M-per-connection symbol cap on huge PandasCategoricaldicts. Full C ABI (include/questdb/ingress/column_sender.h) and a Criterion bench suite (column path ≈ memcpy ceiling; bulk-intern ~16× faster than per-row HashMap).#150 — Apache Arrow + Polars integration
Both directions over QWP/WebSocket:
Buffer::append_arrow/append_arrow_at_columnconsumes a wholeRecordBatchin one call, column-major dense bulk path (one memcpy per column; QWP null bitmap built by byte-stride OR-with-NOT of the Arrow validity buffer when boundaries align, per-row fallback only when bit-offsets are unaligned).Cursor::as_record_batch_reader()streamingRecordBatchiterator; Polars sub-feature provides the DataFrame bridge.line_sender_buffer_append_arrow*andline_reader_cursor_next_arrow_batch. Every producer-suppliedArrowArray/ArrowSchemais pre-validated beforefrom_ffi(schema depth ≤ 64,row_count≤ 16M, bounded per-node buffer/child counts, and rejection of NULL or under-sized buffer arrays), so a malformed struct returns an error instead of aborting the FFI crate'spanic = "abort"profile. The manual per-column FFI path caps each variable-length payload ati32::MAX.Why merged
The two tracks were developed on the same
jh_conn_pool_refactorbranch and converged on shared infrastructure (connection pool,SymbolGlobalDict, QWP/WS transport). Splitting them at review time would force one to ship behind a compatibility shim for the other; merging them together avoids that churn and gives C/C++ callers — and the upcoming Pandas / Polars wrapper inpy-questdb-client— a column-major, zero-redundant-copy path into QuestDB in one cut.Public surface
See the original PRs' "Public surface" / "What's in the box" sections:
Feature gating
questdb-rs:arrow+polarsare opt-in features, excluded fromalmost-all-features; column_sender lives behindsync-sender-qwp-ws.questdb-rs-ffi:arrowfeature mirrors.CMakeLists.txt:QUESTDB_ENABLE_ARROW=OFFby default; auto-flipped toONwhenQUESTDB_TESTS_AND_EXAMPLES=ONso tests / examples exercise the Arrow path without explicit opt-in.Test plan
test_arrow_c.c/test_arrow_egress.cpp/test_arrow_ingress.cppwired into CMake and exercised in CIarrow_egress_fuzz/arrow_ingress_fuzz/arrow_round_trip_fuzz/arrow_alignment_fuzzcargo bench --features sync-sender-qwp-ws --bench column_senderpy-questdb-client, WS-7)Closes #148, #150.
Summary by CodeRabbit
New Features
Examples
Documentation
Tests
Chores (CI)
Benchmarks