Skip to content

feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153

Open
kafka1991 wants to merge 117 commits into
mainfrom
jh_conn_pool_refactor
Open

feat: columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress#153
kafka1991 wants to merge 117 commits into
mainfrom
jh_conn_pool_refactor

Conversation

@kafka1991

@kafka1991 kafka1991 commented Jun 4, 2026

Copy link
Copy Markdown
Collaborator

Summary

This PR is the combined landing of #148 and #150 — two independently-developed columnar I/O tracks that both target QWP/WebSocket and share the same connection pool / SymbolGlobalDict, shipped together so downstream consumers (py-questdb-client, in-tree C/C++ tests) see one self-consistent surface.

#148 — Column-major sender (column_sender)

A DataFrame → Table ingest API. QuestDb connection pool + BorrowedSender + Chunk (per-column Vec<u8> that stacks wire bytes directly) + synchronous flush(AckLevel). Covers bool / signed integers / floats / UUID / Long256 / IPv4 / timestamps / VARCHAR / symbol_dict_{i8,i16,i32} bulk-intern. The connection-scoped SchemaRegistry (FULL / REFERENCE emit modes) and SymbolGlobalDict are shared with the row API, preserving the 1M-per-connection symbol cap on huge Pandas Categorical dicts. Full C ABI (include/questdb/ingress/column_sender.h) and a Criterion bench suite (column path ≈ memcpy ceiling; bulk-intern ~16× faster than per-row HashMap).

#150 — Apache Arrow + Polars integration

Both directions over QWP/WebSocket:

  • Ingress: Buffer::append_arrow / append_arrow_at_column consumes a whole RecordBatch in one call, column-major dense bulk path (one memcpy per column; QWP null bitmap built by byte-stride OR-with-NOT of the Arrow validity buffer when boundaries align, per-row fallback only when bit-offsets are unaligned).
  • Egress: Cursor::as_record_batch_reader() streaming RecordBatch iterator; Polars sub-feature provides the DataFrame bridge.
  • C ABI via the Arrow C Data Interface: line_sender_buffer_append_arrow* and line_reader_cursor_next_arrow_batch. Every producer-supplied ArrowArray/ArrowSchema is pre-validated before from_ffi (schema depth ≤ 64, row_count ≤ 16M, bounded per-node buffer/child counts, and rejection of NULL or under-sized buffer arrays), so a malformed struct returns an error instead of aborting the FFI crate's panic = "abort" profile. The manual per-column FFI path caps each variable-length payload at i32::MAX.

Why merged

The two tracks were developed on the same jh_conn_pool_refactor branch and converged on shared infrastructure (connection pool, SymbolGlobalDict, QWP/WS transport). Splitting them at review time would force one to ship behind a compatibility shim for the other; merging them together avoids that churn and gives C/C++ callers — and the upcoming Pandas / Polars wrapper in py-questdb-client — a column-major, zero-redundant-copy path into QuestDB in one cut.

Public surface

See the original PRs' "Public surface" / "What's in the box" sections:

Feature gating

  • questdb-rs: arrow + polars are opt-in features, excluded from almost-all-features; column_sender lives behind sync-sender-qwp-ws.
  • questdb-rs-ffi: arrow feature mirrors.
  • CMakeLists.txt: QUESTDB_ENABLE_ARROW=OFF by default; auto-flipped to ON when QUESTDB_TESTS_AND_EXAMPLES=ON so tests / examples exercise the Arrow path without explicit opt-in.

Test plan

  • Rust unit tests: 57 column_sender + 80+ Arrow
  • FFI unit tests: 8 new column_sender + Arrow path coverage
  • C/C++ tests: test_arrow_c.c / test_arrow_egress.cpp / test_arrow_ingress.cpp wired into CMake and exercised in CI
  • System tests against a live QuestDB: arrow_egress_fuzz / arrow_ingress_fuzz / arrow_round_trip_fuzz / arrow_alignment_fuzz
  • cargo bench --features sync-sender-qwp-ws --bench column_sender
  • End-to-end Pandas / Polars throughput (py-questdb-client, WS-7)

Closes #148, #150.

Summary by CodeRabbit

  • New Features

    • Opt-in Apache Arrow support for Arrow egress and ingest; new column-major sender for high-throughput DataFrame ingestion; Polars integration.
  • Examples

    • Added C, C++ and Rust examples demonstrating Arrow egress/ingest and column-sender/Polars workflows.
  • Documentation

    • Added column-sender ABI spec, implementation plan, and performance guidance.
  • Tests

    • Expanded Arrow/Polars unit, smoke and fuzz coverage; new integration tests.
  • Chores (CI)

    • CI updated to install pyarrow/polars, expanded test/fuzz jobs and longer timeouts.
  • Benchmarks

    • New Criterion benchmarks measuring column-sender performance.

bluestreak01 and others added 30 commits May 24, 2026 01:42
Plan and FFI ABI for the new column-major writer that will ingest
Pandas/Polars DataFrames over QWP/WebSocket. Locks the QuestDb pool
shape, BulkChunk encoder strategy, validity bitmap semantics, and
the C ABI the separate Python wrapper repo will consume.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Locks the column-sender API around synchronous flush:
sender.flush(&mut chunk, ack_level) blocks until the requested ACK
level (Ok = WAL commit, Durable = object-store via Enterprise
opt-in). Drops the FSN/submit/await split from the FFI; at most one
frame in flight per sender, parallelism via the pool.

Refuses sf_dir and other sf_* keys at QuestDb::connect with
ConfigError — store-and-forward is single-writer-per-slot and
interacts awkwardly with pool auto-grow; row-major Sender remains
the SF path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Lands the Rust core, C ABI, and benchmarks for a column-major sender
targeting Pandas/Polars → QuestDB throughput over QWP/WebSocket. See
`doc/COLUMN_SENDER_PLAN.md` for the design and `doc/COLUMN_SENDER_FFI_ABI.md`
for the C ABI spec; both shipped in earlier commits on this branch.

# What's in the box

* **WS-0 — `QuestDb` pool** (`ingress/column_sender/db.rs`,
  `ingress/column_sender/conf.rs`).
  Thread-safe pool with eager-open, fail-fast at `pool_max`,
  `BorrowedSender<'a>` that returns on `Drop`, and a background reaper
  (`pool_reap=auto`, tick = max(5 s, idle_timeout / 12)) that closes
  excess-over-`pool_size` connections. New conf keys: `pool_size`,
  `pool_max`, `pool_idle_timeout_ms`, `pool_reap`. `sf_*` / `sender_id`
  / `qwp_ws_progress=manual` refused at `connect`-time.

* **WS-1 — synchronous `flush` plumbing** (`ingress/column_sender/sender.rs`,
  `ingress/column_sender/encoder.rs`).
  `ColumnSender::flush(chunk, AckLevel)` encodes the chunk, publishes via
  the existing QWP/WS replay queue (`Sender::qwp_ws_publish_raw` —
  pub(crate) escape hatch in the row-API sender), and blocks until the
  ACK watermark crosses the published FSN. Polls in 50 ms slices so a
  `must_close` mid-wait surfaces promptly. `AckLevel::Durable` requires
  `request_durable_ack=on` at connect or returns `InvalidApiCall`.

* **WS-2 — `Chunk` + numeric / fixed-width columns**
  (`ingress/column_sender/chunk.rs`, `validity.rs`, `wire.rs`).
  Per-column wire-shape `Vec<u8>` storage so encode is a header +
  `extend_from_slice` per column. Two code paths per type per the plan
  §2.2:
  - Bool, i8, i16, i32, i64, f32, f64: `null_flag = 0` always; nullable
    rows sentinel-encoded (0 / i32::MIN / i64::MIN / NaN), matching the
    row-API convention.
  - Sparse-null types (uuid, long256, ipv4, ts_nanos, ts_micros,
    date_millis): no-null = `extend_from_slice`; nullable = QWP-shape
    bitmap + dense gather.
  - Designated timestamp (micros or nanos) — exactly one per chunk.
  Connection-scoped `SchemaRegistry`: first emit → FULL; repeat → REFERENCE.

* **WS-3 — VARCHAR** (`Chunk::column_varchar`). Arrow Utf8 in
  (`offsets: &[i32]` length `row_count + 1`, `bytes: &[u8]`); wire out
  is dense `non_null_count + 1` LE-u32 offsets + concatenated bytes.
  No-null path memcpys offsets when `offsets[0] == 0`; nullable path
  walks validity and skips slicing for null rows. Offset validation
  (negative / non-monotonic / past `bytes_len`) caught client-side.

* **WS-4 — symbol bulk-intern**
  (`Chunk::symbol_dict_{i8,i16,i32}`, `encoder::resolve_symbols`).
  Three append-time passes: referenced-bitset + range check; compact
  referenced dict bytes; translate codes to internal indices and build
  the QWP-shape bitmap. Connection-scoped `SymbolGlobalDict` shared
  with the row API's type (`buffer/qwp.rs:next_id/intern/entry`
  promoted to `pub(crate)`). At flush time, only entries the chunk
  actually references reach the wire — protects the 1M-per-connection
  cap on huge Pandas `Categorical` dicts. Roll-back on encode error
  keeps client + server dict views coherent.

* **WS-5 — C ABI** (`questdb-rs-ffi/src/column_sender.rs`,
  `include/questdb/ingress/column_sender.h`).
  Full implementation of `doc/COLUMN_SENDER_FFI_ABI.md`:
  - Opaque handles `questdb_db`, `column_sender`, `column_sender_chunk`.
  - `column_sender_validity` repr-C struct; `column_sender_ack_level`
    repr-C enum.
  - `questdb_db_connect/close/borrow_sender/return_sender/reap_idle`.
  - Every chunk column-append, the VARCHAR + symbol_dict family, the
    two designated-timestamp variants, and `column_sender_flush`.
  - Errors reuse `line_sender_error*`.
  Rust side gains `OwnedSender` — Arc-backed borrow handle the FFI hands
  out as `column_sender*` so the C caller can free `questdb_db*` before
  all borrows return without dangling.

  Hand-runnable smoke test at `cpp_test/smoke_column_sender.c`
  (compiles with `-Wall -Wextra -Werror`; not wired into CMake yet —
  matches the `smoke_line_reader` pattern).

* **WS-6 — bench** (`questdb-rs/benches/column_sender.rs`,
  `doc/COLUMN_SENDER_PERF.md`).
  Three families: per-column append vs raw memcpy baseline; symbol
  bulk-intern vs naïve per-row HashMap; encode_chunk end-to-end (no
  network). First-baseline numbers (Apple Silicon laptop, 100k rows):
    - `column_f64/column_sender_no_null` ≈ 55 GiB/s — matches memcpy.
    - `column_i64/column_sender_no_null` ≈ 54 GiB/s — matches memcpy.
    - `column_varchar/column_sender_no_null` within ~5 % of memcpy.
    - Symbol bulk-intern ~16× faster than naïve per-row HashMap.
    - `encode_chunk/populate_plus_encode` ≈ 139 M rows/s end-to-end.

# Verification

- 57 column-sender tests (Rust core); 8 FFI tests; full 834-test lib
  suite passes.
- `cargo fmt` + `cargo clippy --tests --benches` clean on both crates.
- `cargo doc` introduces no new warnings.
- `cc -std=c11 -Wall -Wextra -Werror -I include` compiles the C header
  and the smoke program.

# What's not in here

- WS-7 (Python wrapper) lives in `py-questdb-client`. With the C ABI
  in `include/questdb/ingress/column_sender.h` and the FFI symbols in
  `libquestdb_client`, that repo can now start consuming.
- A live Pandas→QuestDB end-to-end bench and 1-hour soak — both
  belong in the Python repo / nightly CI rather than the in-tree
  Criterion suite.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rewrite the column-major sender to eliminate intermediate buffers and
pipeline writes for maximum single-connection throughput.

Architecture changes:
- ColumnSender now owns a dedicated ColumnConn (conn.rs) that drives
  socket I/O directly — no replay queue, no background thread, no
  row-API publisher involvement.
- Chunk<'a> holds borrowed descriptors (raw pointers + lengths) into
  the caller's buffers; no per-column Vec<u8> staging. The encoder
  writes wire bytes straight from caller memory into the connection's
  reusable write_buf at flush time.
- flush() pipelines: encode + WS-mask + write_all, then drain acks
  non-blocking. Blocks only when in-flight hits the 128-frame protocol
  cap. New sync(AckLevel) blocks until all acks settle.
- Server cumulative OKs handled correctly (sequence=N acks all frames
  up to N).

API changes:
- flush(&mut chunk, AckLevel) → flush(&mut chunk) (fire-and-forget)
- New sync(AckLevel) drains all in-flight acks
- FFI: column_sender_flush drops ack_level arg; new column_sender_sync
- FFI lifetime contract: caller buffers must outlive flush (no copy)

Performance (5M-row L1 quotes, 9 columns, localhost):
- Encode path: 6 GB/s (2.3% of wall time)
- End-to-end: 350 MB/s pipelined (was 264 MB/s stop-and-wait)
- Per-chunk p50: 0.72 ms (was 2.64 ms)
- Criterion populate+encode: 575 µs (was 718 µs, 20% faster)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The default macOS TCP send buffer (~128 KB) is smaller than a typical
QWP chunk (1.5 MB at 25k rows). write_all blocks mid-frame while the
kernel drains the small buffer. A 4 MiB send buffer lets the kernel
accept a full chunk in one shot, reducing write_all stalls when the
pipeline has multiple frames in flight.

Also sets SO_RCVBUF to 4 MiB to absorb ack bursts from the server
without backpressuring the server's send path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
flush() now sets FLAG_DEFER_COMMIT (0x01) on every QWP frame. The
server appends rows to WAL writers without committing. sync() sends a
commit-triggering empty frame (without the flag) that commits all
accumulated rows in one WAL transaction, then drains acks.

This eliminates per-chunk WAL fsync overhead: 200 chunks × 25k rows
now produce 1 WAL commit instead of 200. The p95 per-chunk latency
drops from ~23 ms to ~7 ms. Old servers that don't recognize the flag
ignore it (reserved bit position) and commit per-message — graceful
degradation per the spec.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The server's ClientSymbolCache only caches symbols with
symbolKey < initialSymbolCount. On a fresh table, initialSymbolCount
stays at 0 until a WAL segment rolls and the watermark updates. By
sending the first frame without FLAG_DEFER_COMMIT, the server commits
it immediately, which allows the next segment to pick up the new
symbol count and enable caching for all subsequent deferred frames.

This is a client-side workaround for a server-side cache limitation.
The proper fix is for the server to cache locally-assigned symbol IDs
within the same segment (see WalColumnarRowAppender.putSymbolColumn).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Align the C ABI, docs, and smoke test with column_sender_flush(sender, chunk, err) plus column_sender_sync(sender, ack_level, err). Reserve an in-flight slot for the sync commit, validate durable ACK opt-in before publishing, and add pool/sync coverage.
Rename the borrowed handle returned from the connection pool from
`column_sender` to `qwpws_conn` so it can host peer writer modes
(per-type today, generic Arrow / NumPy in Steps 2-3, future egress
readers). No behaviour change — the underlying Rust types
(ColumnSender / OwnedSender) keep their names since they're
doc-hidden; only the public C ABI changes.

FFI surface changes:
- struct column_sender             -> qwpws_conn
- questdb_db_borrow_sender         -> questdb_db_borrow_conn
- questdb_db_return_sender         -> questdb_db_return_conn
- column_sender_must_close         -> qwpws_conn_must_close
- column_sender_flush(sender, ...) -> column_sender_flush(conn, ...)
- column_sender_sync(sender, ...)  -> column_sender_sync(conn, ...)

column_sender_chunk and the column_sender_chunk_column_* / _symbol_dict_*
appenders keep their names — the chunk IS the column-sender writer's
accumulator, and flush/sync are operations on it; only the
borrowed-handle parameter type changes.

See plan-conn-pool-and-writers.md in py-questdb-client (Step 1) and
the Slack thread from 2026-05-27 with Victor for the rationale: pool
QWP/WS connections, not writers, so egress readers and Arrow / NumPy
appenders can share the same pool as the existing column_sender chunk
path.

Open Q1 from the plan is answered (chunk.rs:208, encoder.rs:82-95,
encoder.rs:460-466): `column_sender_chunk_column_*` already
direct-writes to the wire buffer — for native-LE contiguous data it
is one `extend_from_slice` per column. So Step 3's NumPy appender is
no longer about "saving an extra memcpy"; it's about avoiding
Python-side widening for narrower dtypes / strided / non-native-endian.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
New entry point that consumes an Apache Arrow C Data Interface
ArrowArray + ArrowSchema pair and dispatches to the existing per-type
chunk methods based on the schema's format string. Caller passes the
borrowed pointers it gets from PyArrow's `_export_to_c` (or any other
Arrow C Data producer); the FFI never constructs or releases the
arrays.

Supported schema formats in this patch:
  - c, s, i, l       int8 / int16 / int32 / int64
  - f, g             float32 / float64
  - b                bool (LSB-first bitmap)
  - u                UTF-8 string (int32 offsets)
  - tsn:..., tsu:... timestamp nanos / micros (timezone suffix ignored)
  - dictionary schemas with c/s/i indices and a UTF-8 value type —
    routed to symbol_dict_i8 / _i16 / _i32

Other formats — including LargeUtf8 (U), decimal, struct, list, and
non-UTF-8 dictionary values — currently return
line_sender_error_invalid_api_call. LargeUtf8 lands in Step 2b.

Constraints:
  - ArrowArray.offset must be 0; sliced arrays are rejected.
  - The chunk's row-count lock applies to the new appender the same
    way as the per-type calls.

The Arrow types are mirrored as #[repr(C)] structs in the Rust FFI
shim so we read them without taking a dependency on the arrow / arrow-
array crate. No new Rust dependencies.

See plan-conn-pool-and-writers.md (Step 2). The Cython-side wiring
(routing pandas Arrow-backed columns through this entry point) lands
in a separate patch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add ColumnKind::VarcharLarge (i64 offsets) + Chunk::column_varchar_large
+ encode_varchar_large. The new encoder reads i64 offsets and writes
u32 LE to the wire frame in one pass — no caller- or Rust-side
intermediate Vec<i32> for the narrowing.

Validation rejects negative offsets, decreasing offsets, offsets
exceeding the bytes buffer, AND any last offset exceeding u32::MAX
(the QWP wire offset table is uint32 LE). The overflow check at
chunk-build time surfaces a meaningful error rather than a per-row
overflow at encode time.

The Arrow appender's `U` format match now routes here. This unblocks
the Python side: pandas large_string columns can be sent without the
Python-side cast to UTF-8 (which previously allocated a fresh Arrow
array via pyarrow.cast).

estimate_frame_size grew a VarcharLarge case identical to Varchar.

questdb-rs 836 lib-tests pass. clippy clean on both crates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Extend column_sender_chunk_append_arrow_column with row_offset and
row_count parameters so chunked-emission callers can slice an
ArrowArray without consolidating it first. Required for the Python
Client.dataframe path, which loops over row chunks and currently
slices buffers manually for the per-type appenders.

Per-format slicing:
  - fixed-width primitives + timestamps: data pointer is shifted by
    row_offset elements (`ptr.add(row_offset)`).
  - bool bitmap: shifted by row_offset / 8 bytes; row_offset % 8 == 0
    required (matches the validity bitmap byte-alignment).
  - utf8 / large_utf8: offsets pointer shifted by row_offset
    elements (Arrow offsets are monotonic, so the slice's offsets
    are still well-formed). bytes_len is read from the original
    array's last offset; the encoder rebases on the wire.
  - dictionary symbols: codes pointer shifted; the dictionary is
    shared across chunks unchanged.

Validity bitmap requires row_offset % 8 == 0; with row_offset=0 and
row_count=array.length we get exactly the previous behaviour.

Caller bounds-check: row_offset + row_count must not exceed
array.length.

The C header docs the new parameters; clippy & fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
- Match the Arrow C Data Interface spec more precisely: `children`
  is `*const *mut ArrowArray` (`struct ArrowArray**` in the spec)
  and `dictionary` is `*mut ArrowArray`. We never mutate, so this
  is layout-equivalent to the previous `*const`/`*const`, but the
  declarations now line up with the spec for readers cross-checking.
- Rename `array_len` -> `array_total_len` in the appender so the
  meaning is unambiguous next to the per-call `row_count` parameter.
- Cross-reference doc comments: the per-type varchar / symbol_dict
  C-ABI entries now mention `column_sender_chunk_append_arrow_column`
  as the recommended path for callers holding an Arrow array, and
  flag the per-type entries as the lower-level building block.

No behaviour change. fmt + clippy clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two correctness findings from the multi-agent review:

1. **encode_varchar_large rejected valid late slices.**
   validate_varchar_offsets_i64 checked the absolute `last` offset
   against u32::MAX, but the encoder narrows `(off - first)` per row.
   A slice taken from the tail of a multi-GiB LargeUtf8 array (e.g.
   base=3 GiB, last=4 GiB) was rejected even though every wire offset
   would be ≤ 1 GiB. Now we validate the *span* `last - first` against
   u32::MAX, with a clearer error message.

2. **Null-pointer deref on malformed Arrow arrays.**
   arrow_buffer<T> returned the raw buffer pointer without checking it
   for null. Callers then unconditionally `slice::from_raw_parts(...)`
   or `*offsets_ptr.add(...)`. A producer presenting length > 0 with a
   null data buffer (spec-violating but plausible from buggy clients)
   would UB before any validation ran.

   Added an `allow_null: bool` parameter. The bytes buffer of an empty
   varchar/symbol-dict array can legitimately be NULL (we already
   guard that downstream), so those three call sites pass `true`. All
   other call sites — offsets, primitives, codes, bool bitmap — pass
   `false` and surface a clean `InvalidApiCall` error instead.

Reviewers: convergent finding from concurrency-code-reviewer (Rust)
and general-purpose (cross-layer) agents.

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Add Chunk::column_numpy + NumpyDtype enum to questdb-rs, plus the C
FFI wrapper column_sender_chunk_append_numpy_column.

Behaviour, per Step 3 design decisions:
- i8/i16/i32 -> i64 sign-extend (wire = LONG).
- u8/u16/u32 -> i64 zero-extend (wire = LONG).
- i64 -> pass-through (wire = LONG).
- u64 -> i64 bit-reinterpret. Values > i64::MAX wrap to negative on
  the wire, matching the row path's C-cast behaviour.
- f32 -> f64 widen (wire = DOUBLE).
- f64 -> pass-through (wire = DOUBLE).
- bool (NumPy byte-per-row) -> Arrow LSB-first packed bitmap
  (wire = BOOLEAN).

Strided arrays and non-native-endian arrays are not supported in v1;
the caller (Python client) consolidates upstream.

Widening lives in Rust at append time, materialising into a chunk-
owned scratch arena (`Chunk::scratch: Vec<NumpyScratch>`). The
ColumnDescriptor's `*const T` points into the scratch; the encoder
hot path is unchanged. Scratch is cleared on Chunk::clear / drop.

The scratch enum uses typed variants (Box<[i64]>, Box<[f64]>,
Box<[u8]>) so the storage alignment matches the encoder's read
alignment.

questdb-rs 836 lib-tests pass. clippy + fmt clean on both crates.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Round-3 dirty-sender fix (option c from
plan-conn-pool-and-writers.md): expose a new FFI that callers use in
error-recovery paths to force-close a conn instead of recycling it.

The problem: a mid-call flush failure left a conn with in-flight
uncommitted frames in the pool. The next borrower's first flush is
QWP's "immediate commit", which would commit the stale frames
alongside their own.

The fix exposes a single new entry point:

  void questdb_db_drop_conn(questdb_db* db, qwpws_conn* conn);

semantically equivalent to "mark must_close, then return" but in one
atomic step. The conn enters the terminal state and the pool drops
it on return rather than recycling it.

Implementation:
- ColumnConn gains `mark_must_close(&mut self)` (pub(crate)).
- ColumnSender gains `mark_must_close(&mut self)` (pub) that
  forwards to ColumnConn.
- The FFI wraps these: questdb_db_drop_conn marks then drops.

The existing `qwpws_conn_must_close()` getter is unchanged; this
adds the corresponding setter at each layer.

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three small hardening tweaks:

1. **Tighten format dispatch.** The Arrow C Data Interface only uses
   a `:`-prefixed parameter on timestamp / date / time formats;
   everything else is a single character. Previously
   `column_sender_chunk_append_arrow_column` did
   `format.split(':').next()` and dispatched on the prefix, which
   would spuriously match e.g. a malformed `"u:foo"` to the varchar
   arm. Exact-match the non-ts arms and use `starts_with("tsn:")` /
   `starts_with("tsu:")` for the ts arms.

2. **Accept `null_count == -1` with NULL bitmap as "no nulls".**
   pyarrow / polars emit this shape when the column has no nulls
   (the spec's "unknown" interpretation). We treat it as no-nulls;
   the encoder reads the data buffer densely. Only `null_count > 0`
   with a NULL bitmap is malformed.

3. **Guard `dict_array.length < 0`.** The main array's negative
   length is already rejected in
   `column_sender_chunk_append_arrow_column`; mirror the same check
   inside `arrow_dictionary_utf8` for symmetry.

clippy + fmt clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…refactor

# Conflicts:
#	questdb-rs/src/ingress.rs
Extends the column-sender pool to also serve egress readers from one
shared `questdb_db` configured by a single conf-string. Lazy-init for
readers, eager for writers, same `pool_size` / `pool_max` /
`pool_idle_timeout_ms` / `pool_reap` budget.

- questdb-rs/db.rs: parallel reader free-list, `borrow_reader_owned`,
  `ReaderPoolHandle`, `OwnedReader::mark_must_close`, integrated into
  the reaper. All reader-side state and methods feature-gated under
  `_egress` so the default build (no egress) stays lean.
- questdb-rs/egress/config: reader conf-string parser accepts the
  `qwpws::` / `qwpwss::` schemes and ignores `pool_*` keys, so a
  single conf-string drives both the sender and reader pools without
  translation.
- questdb-rs-ffi/egress: `line_reader` becomes a named struct with a
  `ReaderOwnership` enum (Standalone vs Pooled{handle, must_close});
  pool borrow/return + `line_reader_mark_must_close` exposed in C.
- column_sender.rs: `questdb_db(pub(crate) QuestDb)` so the egress
  FFI can reach the inner pool to wire reader borrows.
- Headers: reader-pool entry points live in `egress/line_reader.h`
  next to the type they wrap; `ingress/column_sender.h` points
  there.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
kafka1991 and others added 21 commits June 17, 2026 21:43
Keep borrowed sender handles populated when same-handle replacement connect fails, so later safe calls return typed errors instead of panicking. Preserve the logical pool slot during replacement so pool_max=1 can reborrow a live endpoint.

Retry Polars dataframe replacement-connect SocketErrors inside the reconnect budget, with focused regressions for delayed endpoint recovery and failed reborrow no-panic behavior.

Test: cargo test --features almost-all-features,arrow,polars column_sender_pool:: -- --nocapture
QWP/WebSocket connect-walk role rejections (421 + X-QuestDB-Role, or a
target= filter no reachable endpoint satisfies) were coded SocketError,
indistinguishable from "all endpoints unreachable". Add a dedicated
ErrorCode::RoleMismatch (FFI line_sender_error_role_mismatch = 18,
appended ABI-stable) and emit it from both the qwp/ws handshake-reject
classifier and the connect-walk wrapper, so callers can tell "no primary
elected yet" from "everything is down" -- matching the reader FFI's
existing line_reader_error_role_mismatch.

Walk and retry behaviour are unchanged: classification keys off the
attached role-reject struct, not the error code. Updates the C header,
the From<ErrorCode> mapping, and both ABI/coverage tripwire tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Resolve ci/run_tests_pipeline.yaml conflict in favour of main's
nightly-docker job (TestVsQuestDBNightly): drop the from-source server
build and its JDK/Rust/numpy setup steps. The numpy/pyarrow/polars
client test deps are now installed by templates/compile.yaml on the
hosted ubuntu-latest pool, so nothing is lost. Our Arrow-fuzz additions
to the separate from-source fuzz jobs are preserved.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Accept `impl TryInto<TableName>` on `flush_arrow_batch`,
`flush_arrow_batch_at_column`, and `flush_polars_dataframe`, so `&str`
table names work without `TableName::new(...)?` ceremony (backward
compatible; existing `TableName` callers are unaffected).

Add `PolarsIngestOptions` and switch `flush_polars_dataframe` to take it,
closing the polars capability gaps:
- per-column wire-type `overrides` (previously documented for "Polars
  frames built without pyarrow" yet unreachable through this entry),
- an optional designated-timestamp column (parity with the arrow sink),
- `max_rows(usize)` instead of a clunky `Option<NonZeroUsize>`.

Document that the polars entry owns the commit/failover replay boundary,
unlike `flush`/`flush_arrow_batch` which leave rows uncommitted until
`sync`. Add a regression test exercising overrides on the polars path.
Elevate API ergonomics from a thin "code quality" nit to a first-class,
blocking review dimension in both the pi and claude review-pr skills:

- new review-mindset principle treating ergonomics/cross-surface
  consistency as correctness for a client library;
- Agent 8 rewritten into a dedicated "Public API ergonomics &
  cross-surface consistency" agent with codebase-specific checks;
- new checklist section covering same-concept-same-shape, capability
  parity across sinks/transports, naming/verb parity, cross-language
  parity, easy-path=safe-path, and silent semantic divergence;
- Step 3b verification rule grading ergonomic findings by user impact
  (footguns are Critical/Moderate, pure style is Minor);
- the ergonomics agent now runs at level 1 and in the level-0 pass.
Add a row-major (`crate::ingress::Sender`) companion pool to `QuestDb`,
alongside the existing column-major senders. `borrow_row_sender()` hands
out a `BorrowedRowSender` that derefs to `Sender` (classic ILP
`Buffer` + `flush`) and returns to the pool on `Drop`, dropping instead
of recycling when the connection latched `must_close` or the caller
called `mark_must_close()`.

Mirrors the existing reader pool: lazy-init, independent free list and
`pool_max` cap, rebuilt from the stored connect string via
`SenderBuilder::from_conf`, with idle reaping (no warm floor). `borrow_sender`
remains column-major and unchanged. The three pools (column, reader, row)
cap independently, so combined live connections can reach `3 * pool_max`.

Add an exhaustive test suite (9 tests): lazy init, recycle/reuse,
auto-grow, fail-fast at cap, flush round-trip, explicit must-close drop,
column/row independence, concurrent borrow/return, manual + auto idle
reaping, and the build-failure in-use-slot leak guard.
@bluestreak01

Copy link
Copy Markdown
Member

Review — columnar DataFrame ingest (Arrow / Polars / Pandas) + Arrow egress

Adversarial level-3 review. Scope: 90 files, +40,097 / −972 — the combined landing of #148 (column_sender) + #150 (Arrow/Polars).

Disclosure: this review was produced with significant automated assistance, and the reviewer authored part of this PR (the borrow_row_sender pool, PolarsIngestOptions, impl TryInto<TableName>). Findings on those areas are effectively self-review — weight an independent pass on the pool/polars/ergonomics accordingly.

Method: build-profile facts captured; deep passes over the ~15 risk-bearing Rust/FFI files; docs/CI/examples/python-fuzz/bench treated as lower priority. Every finding below was verified against source (including arrow-rs 58.3.0).

Premise (load-bearing): questdb-rs-ffi is panic = "abort" in both release and dev (questdb-rs-ffi/Cargo.toml:72-76) → catch_unwind/panic_guard are no-ops; every reachable panic from an FFI entrypoint is a host-process abort.

Headline: unusually well-hardened. The highest-risk surface — the Arrow C Data Interface pre-from_ffi validation under panic=abort — was verified to close every eager-deref/assert/underflow site arrow-rs actually has. No confirmed memory-unsafety or data-corruption bug is reachable on the release profile through the documented, validated FFI path. The findings are an ergonomics data-loss footgun, a debug-only abort, FFI hardening gaps, and consistency/resource issues.


Critical (request changes)

C1 — flush() defers commit; dropping the sender (esp. the C++ RAII destructor) silently discards uncommitted rows. Easy path ≠ safe path. (verified)

  • ColumnSender::flush sends FLAG_DEFER_COMMIT and does not wait for an ack (sender.rs:164-170). Drop only poisons the conn — db.rs:829-832 / 868-871: if sender.conn.in_flight() > 0 { mark_must_close() } return_to_pool(...) — it never syncs. So flush + drop-without-sync() silently drops every uncommitted frame.
  • first_frame_sent travels with the pooled slot, so a recycled warm connection defers even the first flush. Identical user code loses everything on a recycled conn but commits on a cold one — nondeterministic from pool state.
  • C/C++ is the dangerous surface: column_sender.hpp ~borrowed_conn() noexcept { release(); } drops silently; the C header's flush/sync docs never state "un-synced rows are not committed." Row-API Sender::flush (HTTP) commits on return — the same verb has the opposite durability contract.
  • Fix: best-effort sync on Drop (or a loud debug_assert/log when in_flight()>0), and prominently document "MUST call sync/column_sender_sync or lose uncommitted rows" on flush, the C column_sender_flush, and the borrowed_conn dtor. (Caveat: documented on Rust flush and arguably intended pipelining design — flagging for an explicit decision, not asserting an accidental bug.)

Moderate

M1 — column_sender_arrow_import_new takes symbol_mode as a by-value #[repr(u32)] enum → out-of-range C value is instant UB. (verified) The file's own comment (column_sender.rs:273) says dtype/ack_level are taken as u32 specifically to make out-of-range a recoverable InvalidApiCall instead of UB — but column_sender_arrow_import_new (:1348) takes symbol_mode: column_sender_symbol_mode by value; a C caller passing 5 materializes an invalid discriminant (UB) before the match at :1352. Fix: make it u32 + map with InvalidApiCall on out-of-range, mirroring ack_level_from_u32.

M2 — numpy append has a data pointer + row_count but no buffer byte-length → a wrapper bug = OOB read (host crash/UB). (verified) column_sender_chunk_append_numpy_column(data: *const u8, row_count, dtype, ...) (column_sender.rs:2016) reads row_count * elem_size(dtype) bytes trusting caller dtype/count; nothing carries nbytes. An int8 array tagged f64/geohash_i64 reads 8× the buffer. Unlike the Arrow path (validated via from_ffi pre-checks), the numpy path has no length to even sanity-check. Fix: thread a data_len/nbytes param and assert row_count.checked_mul(elem_size) <= data_len before any read; reject non-C-contiguous / negative-stride in the wrapper.

M3 — Negative FixedSizeList size bypasses the Arrow validator → debug-build abort. (verified) validate_arrow_array_depth guards negative FixedSizeBinary(width) (lib.rs:4058-4064) but not FixedSizeList(_, size). A schema +w:-1 with n_buffers=2 passes the floor/cap checks, then arrow-rs bit_width computes child_bits * (-1 as usize). Under dev panic=abort + overflow-checks → abort; release wraps and validate_full then rejects gracefully (so: debug-only abort, release-safe). No +w:-N test exists. Fix: also reject negative FixedSizeList size (mirror the FixedSizeBinary guard).

M4 — Pool health Mutex held across the blocking connect handshake. (verified) connect_conn_pool (db.rs:1163) locks inner.health, then connect_connconnector.connect_round(&mut health) does full TCP+TLS+WS-upgrade I/O under the lock. Serializes all concurrent connects pool-wide and head-of-line-blocks transport-dead returns (which also lock_health) behind a slow/black-holed connect — degrading the failover path the pool exists for. No deadlock. Fix: choose the endpoint under the lock, release it during network I/O, re-acquire to record the result.

M5 — Egress: per-column degenerate-list node budget (16M) is reset per column, not per batch → OOM/abort amplification. (confirmed, reachability nuance) MAX_DEGENERATE_LIST_NODES=16M with node_budget recomputed each array_column_to_arrow call (convert.rs:644); a frame may carry MAX_COLUMNS_PER_TABLE=2048 array columns, each a [16M,0] row → multi-GB retained → OOM (=abort under FFI). Exact per-column wire cost not fully traced. Fix (regardless): make the node budget batch-global, or cap array-column count / cumulative expanded nodes per batch.

M6 — Egress: mid-query decode errors are now failover-eligible → deterministic-corruption reconnect/replay loop + per-round eprintln!. (confirmed, nuance) reader.rs routes decode ProtocolError through failover_after_stream_failure; warn_on_protocol_error_failover logs each round (reader.rs:541). A server deterministically emitting a corrupt frame now reconnect→replay→re-corrupt until the per-Execute budget drains, with unbounded stderr, where the old path failed fast. Fix: keep deterministic decode codes terminal (or give decode-driven replays a small dedicated cap); rate-limit the log.

M7 — Within the column API, table/column names use two different shapes + validation timings. (verified) column_sender_chunk_new takes raw const char* table_name / Chunk::new(impl Into<String>) validated lazily at flush (chunk.rs:423); column_sender_flush_arrow_batch takes a validated line_sender_table_name eagerly (column_sender.h:1081). A pre-validated line_sender_table_name can't feed the chunk path, and the same bad name fails at different points. Fix: accept the validated newtypes on the chunk/column entrypoints too, or document the split loudly.

M8 — Designated-timestamp requirement diverges between publish paths. (verified) The chunk path hard-errors without a designated timestamp (encoder.rs:108); flush_arrow_batch silently server-stamps unless _at_column is used — silently ignoring a timestamp column the caller intended as designated (clock-skew / wrong partition), no error. Fix: require an explicit server-stamp opt-in on flush_arrow_batch, or document the divergence at both sites.

M9 — durable_watermarks is never pruned (conn.rs:116, populated :476). Only pending_durable_targets is pruned on sync. With request_durable_ack=on, a server emitting many distinct table names grows the map for the connection's pooled life → slow OOM. Fix: prune/cap durable_watermarks.

M10 — No test coverage for sliced / non-zero-offset Arrow arrays — the classic encoder trap. (verified by grep) No .slice( anywhere in arrow_batch.rs tests; every test builds offset-0 arrays. Sliced arrays reach this encoder via the polars path (after df.slice/filter) and FFI. The slice handling reads correct in all paths traced, but a mission-critical column-major unsafe path with zero offset-slice tests is a real gap. Fix: add encode tests over array.slice(1, n) per ColumnKind, including an offset%8 != 0 bitmap case.


Minor

  • Encoder never calls validate_full itself (arrow_batch.rs:3205 only checks null_count ≤ len); safety is one missing call away for any future native caller of flush_arrow_batch passing an unvalidated/build_unchecked array.
  • numpy NumpyDtype::validate() doesn't range-check decimal scale (numpy_wire.rs:245) despite the module doc claiming the caller validates; FFI path is covered, direct Rust callers aren't.
  • numpy bulk paths use panicking out.reserve(..) while only the ndarray path uses try_reserve; mitigated upstream but inconsistent under panic=abort.
  • drifted_batch not cleared on the raw↔arrow cursor API switch (egress.rs:2566) → stale parked batch replayed out of order on a supported mixed-use path.
  • symbol_array assumes codes.len()==row_count unguarded (convert.rs:507) and reader.rs:1658 .expect("HaveBatch implies last_batch") → abort-on-invariant-violation under FFI; return ProtocolError instead.
  • Pre-existing (not this PR): line_sender_utf8_assert/table_name_assert/column_name_assert panic! on caller input; line_sender_utf8_init does from_raw_parts(buf,len) with no NULL guard.
  • Redundant double dict-rollback (encoder.rs:146/177 + sender.rs:303/354) — idempotent today, fragile if rollback ever becomes non-idempotent.
  • Strict trailing-byte rejection (conn.rs:931) latches the conn on any extra bytes → breaks forward-compat with a server that appends response fields.

Downgraded (initially suspected, verified safe — high-signal)

  • Arrow FFI is not an abort vector on release for structural malformation. The pre-from_ffi walk (lib.rs:3950-4136) closes every arrow-rs eager-deref/assert/underflow site, uses cycle-safe iterative depth/try_reserve walks, and rejects negative/over-MAX_CHUNK_ROWS length/offset. Verified against arrow-rs 58.3.0. (Residual truth: buffer byte-size/alignment lies are inherently untrustable across the C Data Interface — worth documenting.)
  • Null-bitmap construction is bit-exact and in-bounds for all offset/length/slice combinations (arrow_batch.rs:639, both aligned and unaligned byte-stride branches) — the PR's headline risk, verified.
  • numpy uses read_unaligned everywhere for multibyte reads; unaligned numpy buffers do not cause alignment UB.
  • conn.rs parser cannot be panicked by server bytes — every try_into().unwrap() is preceded by a length check; frames capped at 256 MiB; try_reserve before resize. Flagged panic!/expect are test-only.
  • In-flight/ack-slot accounting has no off-by-one; no cross-borrower commit — recycled conns are latched must_close when in_flight()>0; status 0x00 = committed.
  • Ownership/free across the Arrow CDI is correct — consume-on-success, double-import rejected (release.is_none()), exported batches own refcounted buffers (no use-after-free when the cursor drops), FailoverRetry re-exports a valid release.
  • ABI evolvability is sound — new error codes appended (no mid-enum insertion); ErrorCode/NumpyDtype/ArrowColumnOverride/AckLevel are #[non_exhaustive]; C++ noexcept forwarders call only non-throwing C fns. C↔Rust signature parity verified across ~40 column-sender entrypoints + the 43-value numpy-dtype enum.

Summary

  • Verdict: request changes — driven by C1 (silent data-loss footgun on the C/C++ RAII path) and FFI hardening gaps M1/M2/M3. The core unsafe machinery (Arrow validation, null-bitmap, wire encoders, pool accounting) is verified sound on the release profile; this is a high-quality PR.
  • Findings: 1 Critical, 10 Moderate, ~8 Minor; ~8 suspected issues verified-safe and downgraded.
  • Most actionable: (1) decide C1 (Drop-sync or loud warning + docs); (2) fix M1 (symbol_modeu32) and M3 (negative FixedSizeList) — small, clearly correct; (3) thread a length into the numpy ABI (M2); (4) add sliced-array encoder tests (M10).
  • Not deeply covered (time-boxed): the Python system-test fuzzers, C/C++ doctests, CMake/CI wiring, benches, and doc/*.md specs — recommend a focused pass on system_test/arrow_*_fuzz.py and cpp_test/test_arrow_* before merge for CI-parity assurance.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants