Skip to content

feat(rust): pooled Db facade with custom high-performance connection pooling#162

Open
bluestreak01 wants to merge 1 commit into
mainfrom
feat/db-connection-pool
Open

feat(rust): pooled Db facade with custom high-performance connection pooling#162
bluestreak01 wants to merge 1 commit into
mainfrom
feat/db-connection-pool

Conversation

@bluestreak01

@bluestreak01 bluestreak01 commented Jun 18, 2026

Copy link
Copy Markdown
Member

Summary

Adds a single, cheaply-cloneable Db handle that pools both ingest Senders and egress Readers, so application code never opens or closes a connection itself. This mirrors the Java client's QuestDB facade (SenderPool / QueryClientPool / PooledSender / PoolHousekeeper), adapted to idiomatic Rust (RAII return instead of close()-to-return; synchronous query execution since the egress reader is sync/pull-based).

Pooling is custom — no external pool crate (no r2d2) — and the whole feature is gated behind a new pool Cargo feature.

use questdb::db::Db;
use questdb::ingress::TimestampNanos;

// One handle for the whole deployment; clone it across threads.
let db = Db::connect("http::addr=localhost:9000;", "ws::addr=localhost:9000;")?;

// Ingest — borrow a sender, build rows, flush. Returns to the pool on drop.
let mut sender = db.borrow_sender()?;
let mut buf = sender.new_buffer();
buf.table("trades")?.symbol("sym", "ETH-USD")?.column_f64("price", 2615.54)?.at(TimestampNanos::now())?;
sender.flush(&mut buf)?;

// Query — run a SELECT, consume batches; reader returns to the pool clean.
let summary = db.execute_query("select * from trades limit 100", |batch| {
    println!("{} rows", batch.row_count());
    true // return false to stop early (auto-cancels)
})?;

What's added

src/pool.rs — generic pool engine (Pool<M: Manage>)
Transport-agnostic core shared by both sides, unit-testable without a server.

  • Elastic: pre-warms min, grows on demand to max, reap_idle shrinks back toward min.
  • Blocking acquire with timeout via Mutex + Condvar.
  • New connections opened off-lock with an in_flight counter (a slow TLS/DNS connect never stalls other borrowers).
  • RAII return: Pooled<M> derefs to the connection and returns it on Drop; broken (mark_broken) or rejected (Manage::recycle) connections are discarded.
  • No per-borrow heap allocation; free list is a VecDeque pre-sized to max.

src/db.rs — the Db facade

  • Db is Clone (Arc-backed); construct once, share everywhere — pooling is hidden.
  • DbBuilder with defaults matching the Java client: min 1 / max 4 per pool, 5s acquire, 60s idle, 30min max-lifetime, 5s housekeeper.
  • RAII PooledSender / PooledReader guards (deref to Sender / Reader).
  • execute_query(sql, on_batch) drives a cursor to terminal and cancels cleanly on early stop, guaranteeing the reader returns reusable; errors discard the reader.
  • Unified Db::from_conf with http<->ws / https<->wss schema translation.
  • A background housekeeper thread reaps idle/over-age connections; stopped when the last Db clone drops.

Capacity-correctness fix (found by the stress tests)

Under churn with discards, a naive pool can briefly hold max + 1 real connections, because a discarded slot is freed (accounting-wise) before the connection is actually closed off-lock. Fixed with a closing counter that keeps the slot reserved during the off-lock close — the same mechanism as the Java client's closingSlots. The cap is now honoured against idle + leased + in_flight + closing.

Tests

Pool-core (deterministic, server-free, always in CI) — 18 tests, including 7 concurrency stress tests against an instrumented mock that tracks peak-live connections:

  • capacity cap never exceeded under heavy contention,
  • self-healing under ~25% random breakage,
  • reap-during-load (no deadlock),
  • close-under-load (no panic, no leak),
  • growth liveness (all max held at once via a barrier),
  • tight-cap serialisation (max=1, many contenders, no spurious timeouts).

Live-server (tests/db_pool_live.rs, gated behind live-server-tests, which now pulls in pool) — many threads share one cloned Db:

  • Ingress (sender pool): concurrent ingest, pool growth under a barrier, tight-pool serialisation, unified from_conf — verified via HTTP /exec. Run locally against a real QuestDB.
  • Egress (reader pool): concurrent queries, mixed ingest+query, early-stop reader reuse.

Note: the egress live tests are compile- and clippy-verified but were not run locally — the QWP/WS egress endpoint isn't answering in my local server build (the existing egress_live_server tests fail identically here), so it's an environment limit, not a code issue. They follow the standard harness conventions and will run in CI. The egress reader pool mechanics are fully covered by the deterministic concurrency tests.

examples/db_pool.rs — end-to-end demo (ingest + query + multi-thread sharing).

Verification

  • cargo test --features almost-all-features --lib1594 passed, 0 failed.
  • cargo clippy clean on lib, tests, and examples.
  • cargo fmt --check clean.
  • Live ingress concurrency tests pass repeatedly against a real QuestDB.

Notes / follow-ups (intentionally out of scope)

  • Thread-affine sender pinning (sender() / releaseSender() via thread_local!).
  • Store-and-forward per-slot sender_id / flock management for pooled QWP/WS senders sharing one sf_dir (the Java SenderPool slot-index machinery).

Summary by CodeRabbit

Release Notes

New Features

  • Added connection pooling support for ingest and query operations with configurable pool sizing, acquire/idle timeouts, and connection lifetime recycling
  • Introduced Db facade for simplified pooled database access with automatic connection lifecycle management
  • Added example demonstrating pooled ingestion, streaming queries, and multi-threaded operations

…n pooling

Adds a single, cheaply-cloneable `Db` handle that pools both ingest
`Sender`s and egress `Reader`s, so application code never opens or
closes a connection itself — mirroring the Java client's `QuestDB`
facade. Pooling is custom (no external pool crate) and gated behind a
new `pool` feature.

New modules:
- `pool`: a generic, allocation-light, blocking pool (`Pool<M: Manage>`)
  shared by both sides. Elastic (pre-warmed `min`, grows to `max`),
  blocking acquire with timeout via `Mutex` + `Condvar`, connect
  off-lock with an `in_flight` counter, RAII return via `Pooled` (Drop
  returns to the pool; broken/rejected connections are discarded), and
  `reap_idle` for idle/max-lifetime recycling.
- `db`: the `Db` facade + `DbBuilder` (defaults match the Java client:
  min 1 / max 4, 5s acquire, 60s idle, 30min lifetime, 5s housekeeper),
  RAII `PooledSender` / `PooledReader` guards, `execute_query` (drives a
  cursor to terminal, cancels cleanly on early stop so the reader returns
  reusable), and unified `from_conf` schema translation (http<->ws).

Capacity correctness: a discarded connection keeps its slot reserved via
a `closing` counter while it is closed off-lock, so the number of real
connections never exceeds `max` under churn (mirrors the Java client's
`closingSlots`). This was found by the concurrency stress tests below.

Tests:
- 18 pool-core unit tests including 7 concurrency stress tests (capacity
  cap under contention, self-healing under random breakage, reap-during-
  load, close-under-load with no leak, growth liveness, tight-cap
  serialisation). Deterministic, server-free, always run in CI.
- `tests/db_pool_live.rs`: live-server concurrency tests for both pools.
  Ingress tests verify via HTTP `/exec`; egress tests drive the reader
  pool. Gated behind `live-server-tests` (now pulls in `pool`).
- `examples/db_pool.rs`: end-to-end demo.
@coderabbitai

coderabbitai Bot commented Jun 18, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

Introduces a pool crate feature containing a generic blocking connection pool (pool.rs) and a Db facade (db.rs) that pairs two independent pools—one for Sender ingest connections and one for Reader query connections—with a background housekeeper thread. A DbBuilder provides configuration and unified-config parsing. A runnable example and live-server integration tests are also added.

Changes

QuestDB Connection Pool Feature

Layer / File(s) Summary
Generic pool: trait, config, error, internal state
questdb-rs/src/pool.rs
Defines Manage trait, PoolConfig, PoolError<E>, and internal Slot/State/Inner accounting structs backed by Mutex/Condvar.
Generic pool: Pool and Pooled RAII guard
questdb-rs/src/pool.rs
Implements Pool<M> with pre-warm, blocking borrow with off-lock connection creation and in_flight tracking, reap_idle, close, and the Pooled<M> drop guard that recycles or discards connections using the closing counter.
Generic pool: unit and concurrency test suite
questdb-rs/src/pool.rs
Comprehensive test harness with mock Manage, atomic Stats, assert_quiesced, and tests for pre-warm, reuse/growth, broken/discard, timeouts, connect-failure recovery, reap_idle, and concurrent stress/shutdown scenarios.
Db facade: error types, managers, guards, housekeeper
questdb-rs/src/db.rs
Defines DbError, default pool constants, SenderManager/ReaderManager implementing Manage, PooledSender/PooledReader with mark_broken, QuerySummary, and Housekeeper thread driving periodic reap_idle.
Db facade: Db API and execute_query loop
questdb-rs/src/db.rs
Implements DbInner/Db with Arc-based clone, close, borrow_sender/borrow_reader, and execute_query which streams result batches via a callback, accumulates QuerySummary, and cancels the cursor on early stop.
DbBuilder: config, validation, unified-config parsing, unit tests
questdb-rs/src/db.rs
Implements DbBuilder with sizing/timeout setters, validate_sizing, pool construction with mapped errors, derive_both_sides unified-config translation enforcing scheme and addr=, and unit tests.
Crate wiring, feature definition, and db_pool example
questdb-rs/src/lib.rs, questdb-rs/Cargo.toml, questdb-rs/examples/db_pool.rs
Exports pub mod pool and pub mod db behind the pool feature flag, defines the pool Cargo feature aggregating transport dependencies, extends live-server-tests and almost-all-features, and adds a runnable example demonstrating ingest, streaming query, and concurrent thread usage.
Live-server integration tests
questdb-rs/tests/db_pool_live.rs
Process-scoped QuestDbServer, HTTP validation helpers, and concurrency tests covering multi-thread ingest with pool growth, tight single-slot serialization, concurrent egress queries, mixed workloads, and early-stop query reusability.

Sequence Diagram(s)

sequenceDiagram
  participant App
  participant Db
  rect rgba(70, 130, 180, 0.5)
    Note over App,Db: Ingest path
    App->>Db: borrow_sender()
    Db->>Pool_Sender: borrow()
    Pool_Sender-->>Db: PooledSender
    Db-->>App: PooledSender
    App->>PooledSender: buffer rows, flush()
    PooledSender->>Pool_Sender: Drop → return to idle
  end
  rect rgba(60, 179, 113, 0.5)
    Note over App,Db: Query path
    App->>Db: execute_query(sql, on_batch)
    Db->>Pool_Reader: borrow()
    Pool_Reader-->>Db: PooledReader
    Db->>PooledReader: open_cursor(sql)
    loop per batch
      PooledReader-->>Db: DataSet
      Db->>App: on_batch(dataset) → bool
    end
    alt stopped early
      Db->>PooledReader: cursor.cancel()
    end
    Db-->>App: Ok(QuerySummary)
    PooledReader->>Pool_Reader: Drop → return to idle
  end
  rect rgba(184, 134, 11, 0.5)
    Note over Db: Background
    Housekeeper->>Pool_Sender: reap_idle()
    Housekeeper->>Pool_Reader: reap_idle()
  end
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Poem

🐇 Hoppity-hop through the connection pool!
A Mutex and Condvar keep borrowers cool.
Senders for ingest, Readers for query—
The housekeeper reaps so things don't get scary.
With RAII guards and arcs shared wide,
The rabbit pools connections with burrow-deep pride! 🎉

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: introduction of a pooled Db facade with connection pooling for the Rust QuestDB client, which is the primary focus of all file changes.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/db-connection-pool

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@questdb-rs/src/pool.rs`:
- Around line 343-371: The reap_idle function removes connections from the idle
queue and drops them outside the lock without updating the closing counter,
which can cause the max capacity to be temporarily exceeded during concurrent
operations. Before releasing the lock on self.inner.state, increment the closing
counter by the length of the to_close vector to account for the connections
being removed. After the drop(to_close) call completes, decrement the closing
counter by the same amount. This ensures that concurrent borrow operations see
an accurate connection count and cannot exceed the max capacity while
connections are being dropped outside the lock.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 00f0c72c-aa02-488e-a1ef-4cc79ea52d1e

📥 Commits

Reviewing files that changed from the base of the PR and between db63120 and f60168b.

📒 Files selected for processing (6)
  • questdb-rs/Cargo.toml
  • questdb-rs/examples/db_pool.rs
  • questdb-rs/src/db.rs
  • questdb-rs/src/lib.rs
  • questdb-rs/src/pool.rs
  • questdb-rs/tests/db_pool_live.rs

Comment thread questdb-rs/src/pool.rs
Comment on lines +343 to +371
let mut to_close: Vec<M::Conn> = Vec::new();
{
let mut st = self.inner.state.lock().unwrap();
if st.closed {
return;
}
// Most we may remove without dropping below `min`.
let mut removable = st.live().saturating_sub(self.inner.cfg.min);
if removable == 0 {
return;
}
let mut kept: VecDeque<Slot<M::Conn>> = VecDeque::with_capacity(st.idle.len());
while let Some(slot) = st.idle.pop_front() {
let idle_expired = idle_timeout
.is_some_and(|t| now.saturating_duration_since(slot.idle_since) >= t);
let over_age = max_lifetime
.is_some_and(|t| now.saturating_duration_since(slot.created_at) >= t);
if removable > 0 && (idle_expired || over_age) {
removable -= 1;
to_close.push(slot.conn);
} else {
kept.push_back(slot);
}
}
st.idle = kept;
}
// Close outside the lock.
drop(to_close);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚖️ Poor tradeoff

reap_idle should use the closing counter to maintain the capacity guarantee.

The closing counter exists to ensure real connections never exceed max during off-lock closure (as documented at lines 163-168). However, reap_idle removes connections from the idle accounting and drops them outside the lock without using this counter.

Race scenario:

  1. Pool max=2, both connections idle and expired
  2. reap_idle moves both to to_close, sets idle=0, releases lock
  3. Before drop(to_close) completes, two concurrent borrows see live()=0 < max
  4. Both create new connections → 4 real connections exist briefly, exceeding max=2
Proposed fix to use closing counter
     let mut to_close: Vec<M::Conn> = Vec::new();
     {
         let mut st = self.inner.state.lock().unwrap();
         if st.closed {
             return;
         }
         // Most we may remove without dropping below `min`.
         let mut removable = st.live().saturating_sub(self.inner.cfg.min);
         if removable == 0 {
             return;
         }
         let mut kept: VecDeque<Slot<M::Conn>> = VecDeque::with_capacity(st.idle.len());
         while let Some(slot) = st.idle.pop_front() {
             let idle_expired = idle_timeout
                 .is_some_and(|t| now.saturating_duration_since(slot.idle_since) >= t);
             let over_age = max_lifetime
                 .is_some_and(|t| now.saturating_duration_since(slot.created_at) >= t);
             if removable > 0 && (idle_expired || over_age) {
                 removable -= 1;
                 to_close.push(slot.conn);
             } else {
                 kept.push_back(slot);
             }
         }
         st.idle = kept;
+        st.closing += to_close.len();
     }
     // Close outside the lock.
     drop(to_close);
+    if to_close.capacity() > 0 {
+        // to_close was non-empty; decrement closing.
+        let mut st = self.inner.state.lock().unwrap();
+        st.closing -= to_close.capacity(); // capacity preserved after drop
+    }

Note: After drop(to_close), the Vec's length is 0 but capacity is preserved. You may prefer tracking the count in a separate variable:

+    let reap_count = to_close.len();
     // Close outside the lock.
     drop(to_close);
+    if reap_count > 0 {
+        let mut st = self.inner.state.lock().unwrap();
+        st.closing -= reap_count;
+        drop(st);
+        self.inner.cond.notify_all();
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@questdb-rs/src/pool.rs` around lines 343 - 371, The reap_idle function
removes connections from the idle queue and drops them outside the lock without
updating the closing counter, which can cause the max capacity to be temporarily
exceeded during concurrent operations. Before releasing the lock on
self.inner.state, increment the closing counter by the length of the to_close
vector to account for the connections being removed. After the drop(to_close)
call completes, decrement the closing counter by the same amount. This ensures
that concurrent borrow operations see an accurate connection count and cannot
exceed the max capacity while connections are being dropped outside the lock.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant