feat(rust): pooled Db facade with custom high-performance connection pooling #162
bluestreak01 wants to merge 1 commit into
Adds a single, cheaply-cloneable `Db` handle that pools both ingest `Sender`s and egress `Reader`s, so application code never opens or closes a connection itself, mirroring the Java client's `QuestDB` facade. Pooling is custom (no external pool crate) and gated behind a new `pool` feature.

New modules:
- `pool`: a generic, allocation-light, blocking pool (`Pool<M: Manage>`) shared by both sides. Elastic (pre-warmed `min`, grows to `max`), blocking acquire with timeout via `Mutex` + `Condvar`, connect off-lock with an `in_flight` counter, RAII return via `Pooled` (Drop returns to the pool; broken/rejected connections are discarded), and `reap_idle` for idle/max-lifetime recycling.
- `db`: the `Db` facade + `DbBuilder` (defaults match the Java client: min 1 / max 4, 5s acquire, 60s idle, 30min lifetime, 5s housekeeper), RAII `PooledSender` / `PooledReader` guards, `execute_query` (drives a cursor to terminal, cancels cleanly on early stop so the reader returns reusable), and unified `from_conf` schema translation (http<->ws).

Capacity correctness: a discarded connection keeps its slot reserved via a `closing` counter while it is closed off-lock, so the number of real connections never exceeds `max` under churn (mirrors the Java client's `closingSlots`). This was found by the concurrency stress tests below.

Tests:
- 18 pool-core unit tests including 7 concurrency stress tests (capacity cap under contention, self-healing under random breakage, reap-during-load, close-under-load with no leak, growth liveness, tight-cap serialisation). Deterministic, server-free, always run in CI.
- `tests/db_pool_live.rs`: live-server concurrency tests for both pools. Ingress tests verify via HTTP `/exec`; egress tests drive the reader pool. Gated behind `live-server-tests` (now pulls in `pool`).
- `examples/db_pool.rs`: end-to-end demo.
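The blocking-acquire and RAII-return behaviour described for the `pool` module can be illustrated with a much smaller sketch. Everything here (`MiniPool`, `Guard`, the factory closure) is a hypothetical stand-in written for this illustration, not the PR's `Pool<M: Manage>`; it omits `min` pre-warming, the `in_flight` and `closing` counters, broken-connection discard, and reaping.

```rust
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};

// Hypothetical names throughout; an illustration, not the crate's API.
struct State<T> {
    idle: VecDeque<T>,
    leased: usize,
}

struct Inner<T> {
    state: Mutex<State<T>>,
    cond: Condvar,
    max: usize,
    make: Box<dyn Fn() -> T + Send + Sync>,
}

pub struct MiniPool<T>(Arc<Inner<T>>);

impl<T> Clone for MiniPool<T> {
    fn clone(&self) -> Self {
        MiniPool(Arc::clone(&self.0))
    }
}

/// RAII guard: derefs to the connection and returns it to the pool on drop.
pub struct Guard<T> {
    conn: Option<T>,
    pool: Arc<Inner<T>>,
}

impl<T> MiniPool<T> {
    pub fn new(max: usize, make: impl Fn() -> T + Send + Sync + 'static) -> Self {
        MiniPool(Arc::new(Inner {
            state: Mutex::new(State { idle: VecDeque::with_capacity(max), leased: 0 }),
            cond: Condvar::new(),
            max,
            make: Box::new(make),
        }))
    }

    /// Returns `None` if nothing becomes available before `timeout` elapses.
    pub fn acquire(&self, timeout: Duration) -> Option<Guard<T>> {
        let deadline = Instant::now() + timeout;
        let mut st = self.0.state.lock().unwrap();
        loop {
            if let Some(conn) = st.idle.pop_front() {
                st.leased += 1;
                return Some(Guard { conn: Some(conn), pool: Arc::clone(&self.0) });
            }
            // Idle is empty here, so `leased` is the total number of slots in use.
            if st.leased < self.0.max {
                // Reserve the slot under the lock, then connect off-lock.
                st.leased += 1;
                drop(st);
                let conn = (self.0.make)();
                return Some(Guard { conn: Some(conn), pool: Arc::clone(&self.0) });
            }
            // At capacity: wait for a return, but never past the deadline.
            let remaining = deadline.checked_duration_since(Instant::now())?;
            let (guard, _timed_out) = self.0.cond.wait_timeout(st, remaining).unwrap();
            st = guard;
        }
    }
}

impl<T> Deref for Guard<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.conn.as_ref().expect("connection present until drop")
    }
}

impl<T> DerefMut for Guard<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.conn.as_mut().expect("connection present until drop")
    }
}

impl<T> Drop for Guard<T> {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            let mut st = self.pool.state.lock().unwrap();
            st.leased -= 1;
            st.idle.push_back(conn);
            drop(st);
            // Wake one waiter blocked in `acquire`.
            self.pool.cond.notify_one();
        }
    }
}
```

With `max = 1`, a second `acquire` while the first guard is alive times out and returns `None`; once the guard is dropped, the same value is handed out again.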
Sequence Diagram(s)
sequenceDiagram
participant App
participant Db
rect rgba(70, 130, 180, 0.5)
Note over App,Db: Ingest path
App->>Db: borrow_sender()
Db->>Pool_Sender: borrow()
Pool_Sender-->>Db: PooledSender
Db-->>App: PooledSender
App->>PooledSender: buffer rows, flush()
PooledSender->>Pool_Sender: Drop → return to idle
end
rect rgba(60, 179, 113, 0.5)
Note over App,Db: Query path
App->>Db: execute_query(sql, on_batch)
Db->>Pool_Reader: borrow()
Pool_Reader-->>Db: PooledReader
Db->>PooledReader: open_cursor(sql)
loop per batch
PooledReader-->>Db: DataSet
Db->>App: on_batch(dataset) → bool
end
alt stopped early
Db->>PooledReader: cursor.cancel()
end
Db-->>App: Ok(QuerySummary)
PooledReader->>Pool_Reader: Drop → return to idle
end
rect rgba(184, 134, 11, 0.5)
Note over Db: Background
Housekeeper->>Pool_Sender: reap_idle()
Housekeeper->>Pool_Reader: reap_idle()
end
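The query path in the diagram (deliver batches to a callback, cancel the cursor if the callback stops early, then return a summary) can be sketched as below. The `BatchCursor` trait, `Summary`, `drive`, and `VecCursor` are hypothetical names for this illustration and are not the crate's `Reader` or cursor API; the sketch also assumes that `on_batch` returning `true` means "keep going", which the diagram does not state.

```rust
/// Hypothetical stand-in for a pull-based query cursor.
pub trait BatchCursor {
    type Batch;
    /// `Ok(None)` signals that the result set is exhausted.
    fn next_batch(&mut self) -> Result<Option<Self::Batch>, String>;
    /// Tell the server to stop so the connection stays reusable.
    fn cancel(&mut self);
}

#[derive(Debug, PartialEq, Eq)]
pub struct Summary {
    pub batches: usize,
    pub cancelled: bool,
}

/// Drives the cursor to its end, or cancels it when `on_batch` returns `false`
/// (assumed meaning: "stop").
pub fn drive<C, F>(cursor: &mut C, mut on_batch: F) -> Result<Summary, String>
where
    C: BatchCursor,
    F: FnMut(&C::Batch) -> bool,
{
    let mut batches = 0;
    while let Some(batch) = cursor.next_batch()? {
        batches += 1;
        if !on_batch(&batch) {
            cursor.cancel();
            return Ok(Summary { batches, cancelled: true });
        }
    }
    Ok(Summary { batches, cancelled: false })
}

/// In-memory cursor used only to exercise `drive`.
pub struct VecCursor {
    pub batches: std::vec::IntoIter<Vec<i64>>,
    pub cancelled: bool,
}

impl VecCursor {
    pub fn new(batches: Vec<Vec<i64>>) -> Self {
        VecCursor { batches: batches.into_iter(), cancelled: false }
    }
}

impl BatchCursor for VecCursor {
    type Batch = Vec<i64>;

    fn next_batch(&mut self) -> Result<Option<Vec<i64>>, String> {
        Ok(self.batches.next())
    }

    fn cancel(&mut self) {
        self.cancelled = true;
    }
}
```

An error from `next_batch` propagates out through `?` without a cancel, which loosely corresponds to the description's "errors discard the reader": the caller would drop the connection rather than return it.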
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
Pre-merge checks: ✅ 5 passed
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@questdb-rs/src/pool.rs`:
- Around line 343-371: The reap_idle function removes connections from the idle
queue and drops them outside the lock without updating the closing counter,
which can cause the max capacity to be temporarily exceeded during concurrent
operations. Before releasing the lock on self.inner.state, increment the closing
counter by the length of the to_close vector to account for the connections
being removed. After the drop(to_close) call completes, decrement the closing
counter by the same amount. This ensures that concurrent borrow operations see
an accurate connection count and cannot exceed the max capacity while
connections are being dropped outside the lock.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 00f0c72c-aa02-488e-a1ef-4cc79ea52d1e
📒 Files selected for processing (6)
- questdb-rs/Cargo.toml
- questdb-rs/examples/db_pool.rs
- questdb-rs/src/db.rs
- questdb-rs/src/lib.rs
- questdb-rs/src/pool.rs
- questdb-rs/tests/db_pool_live.rs
    let mut to_close: Vec<M::Conn> = Vec::new();
    {
        let mut st = self.inner.state.lock().unwrap();
        if st.closed {
            return;
        }
        // Most we may remove without dropping below `min`.
        let mut removable = st.live().saturating_sub(self.inner.cfg.min);
        if removable == 0 {
            return;
        }
        let mut kept: VecDeque<Slot<M::Conn>> = VecDeque::with_capacity(st.idle.len());
        while let Some(slot) = st.idle.pop_front() {
            let idle_expired = idle_timeout
                .is_some_and(|t| now.saturating_duration_since(slot.idle_since) >= t);
            let over_age = max_lifetime
                .is_some_and(|t| now.saturating_duration_since(slot.created_at) >= t);
            if removable > 0 && (idle_expired || over_age) {
                removable -= 1;
                to_close.push(slot.conn);
            } else {
                kept.push_back(slot);
            }
        }
        st.idle = kept;
    }
    // Close outside the lock.
    drop(to_close);
}
reap_idle should use the closing counter to maintain the capacity guarantee.
The closing counter exists to ensure real connections never exceed max during off-lock closure (as documented at lines 163-168). However, reap_idle removes connections from the idle accounting and drops them outside the lock without using this counter.
Race scenario:
- Pool `max=2`, both connections idle and expired.
- `reap_idle` moves both to `to_close`, sets `idle=0`, releases the lock.
- Before `drop(to_close)` completes, two concurrent borrows see `live()=0 < max`.
- Both create new connections → 4 real connections exist briefly, exceeding `max=2`.
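The scenario above is easier to follow as plain slot accounting. The sketch below is a hypothetical model written for this illustration (`Slots`, `reserved`, `try_reserve_connect`, `begin_close`, and `finish_close` are not names from the PR); it assumes the cap is checked against `idle + leased + in_flight + closing`, as the PR description states, and it models only the counters, not the locking.

```rust
/// Hypothetical model of the pool's slot counters; it holds no real connections.
#[derive(Debug, Default, Clone, Copy)]
pub struct Slots {
    pub idle: usize,
    pub leased: usize,
    pub in_flight: usize,
    pub closing: usize,
}

impl Slots {
    /// Every slot that still corresponds to a real or soon-to-be-real connection.
    pub fn reserved(&self) -> usize {
        self.idle + self.leased + self.in_flight + self.closing
    }

    /// What a borrower does under the lock before connecting off-lock.
    pub fn try_reserve_connect(&mut self, max: usize) -> bool {
        if self.reserved() < max {
            self.in_flight += 1;
            true
        } else {
            false
        }
    }

    /// Reap step 1, under the lock: the slots leave `idle` but stay reserved.
    pub fn begin_close(&mut self, n: usize) {
        assert!(n <= self.idle);
        self.idle -= n;
        self.closing += n;
    }

    /// Reap step 2, after the connections were actually dropped off-lock.
    pub fn finish_close(&mut self, n: usize) {
        assert!(n <= self.closing);
        self.closing -= n;
    }
}
```

With `max = 2` and two idle connections, `begin_close(2)` leaves `reserved()` at 2, so a concurrent `try_reserve_connect(2)` is refused until `finish_close(2)` runs. Without the `closing` term, the same borrower would see 0 and be allowed to connect while the old sockets are still open.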
Proposed fix to use closing counter
Note: `drop(to_close)` consumes the `Vec`, so its length cannot be read afterwards; the count is captured in `reap_count` before the drop.

      let mut to_close: Vec<M::Conn> = Vec::new();
      {
          let mut st = self.inner.state.lock().unwrap();
          if st.closed {
              return;
          }
          // Most we may remove without dropping below `min`.
          let mut removable = st.live().saturating_sub(self.inner.cfg.min);
          if removable == 0 {
              return;
          }
          let mut kept: VecDeque<Slot<M::Conn>> = VecDeque::with_capacity(st.idle.len());
          while let Some(slot) = st.idle.pop_front() {
              let idle_expired = idle_timeout
                  .is_some_and(|t| now.saturating_duration_since(slot.idle_since) >= t);
              let over_age = max_lifetime
                  .is_some_and(|t| now.saturating_duration_since(slot.created_at) >= t);
              if removable > 0 && (idle_expired || over_age) {
                  removable -= 1;
                  to_close.push(slot.conn);
              } else {
                  kept.push_back(slot);
              }
          }
          st.idle = kept;
    +     // Keep the reaped slots reserved while they are closed off-lock.
    +     st.closing += to_close.len();
      }
    + let reap_count = to_close.len();
      // Close outside the lock.
      drop(to_close);
    + if reap_count > 0 {
    +     let mut st = self.inner.state.lock().unwrap();
    +     st.closing -= reap_count;
    +     drop(st);
    +     self.inner.cond.notify_all();
    + }
Summary
Adds a single, cheaply-cloneable `Db` handle that pools both ingest `Sender`s and egress `Reader`s, so application code never opens or closes a connection itself. This mirrors the Java client's `QuestDB` facade (`SenderPool` / `QueryClientPool` / `PooledSender` / `PoolHousekeeper`), adapted to idiomatic Rust (RAII return instead of `close()`-to-return; synchronous query execution since the egress reader is sync/pull-based).

Pooling is custom, with no external pool crate (no `r2d2`), and the whole feature is gated behind a new `pool` Cargo feature.

What's added

`src/pool.rs`: generic pool engine (`Pool<M: Manage>`)

Transport-agnostic core shared by both sides, unit-testable without a server.
- Elastic: pre-warmed to `min`, grows on demand to `max`, `reap_idle` shrinks back toward `min`.
- Blocking acquire with timeout via `Mutex` + `Condvar`.
- Connect off-lock with an `in_flight` counter (a slow TLS/DNS connect never stalls other borrowers).
- RAII return: `Pooled<M>` derefs to the connection and returns it on `Drop`; broken (`mark_broken`) or rejected (`Manage::recycle`) connections are discarded.
- Allocation-light: `VecDeque` pre-sized to `max`.

`src/db.rs`: the `Db` facade
- `Db` is `Clone` (`Arc`-backed); construct once, share everywhere; pooling is hidden.
- `DbBuilder` with defaults matching the Java client: min 1 / max 4 per pool, 5s acquire, 60s idle, 30min max-lifetime, 5s housekeeper.
- `PooledSender` / `PooledReader` guards (deref to `Sender` / `Reader`).
- `execute_query(sql, on_batch)` drives a cursor to terminal and cancels cleanly on early stop, guaranteeing the reader returns reusable; errors discard the reader.
- `Db::from_conf` with `http`<->`ws` / `https`<->`wss` schema translation.
- … `Db` clone drops.

Capacity-correctness fix (found by the stress tests)
Under churn with discards, a naive pool can briefly hold `max + 1` real connections, because a discarded slot is freed (accounting-wise) before the connection is actually closed off-lock. Fixed with a `closing` counter that keeps the slot reserved during the off-lock close, the same mechanism as the Java client's `closingSlots`. The cap is now honoured against `idle + leased + in_flight + closing`.

Tests
Pool-core (deterministic, server-free, always in CI): 18 tests, including 7 concurrency stress tests against an instrumented mock that tracks peak-live connections:
- … `max` held at once via a barrier),

Live-server (`tests/db_pool_live.rs`, gated behind `live-server-tests`, which now pulls in `pool`): many threads share one cloned `Db`:
- … `from_conf`; verified via HTTP `/exec`. Run locally against a real QuestDB.

`examples/db_pool.rs`: end-to-end demo (ingest + query + multi-thread sharing).

Verification
- `cargo test --features almost-all-features --lib` → 1594 passed, 0 failed.
- `cargo clippy` clean on lib, tests, and examples.
- `cargo fmt --check` clean.

Notes / follow-ups (intentionally out of scope)
- … `sender()` / `releaseSender()` via `thread_local!`).
- `sender_id` / flock management for pooled QWP/WS senders sharing one `sf_dir` (the Java `SenderPool` slot-index machinery).

Summary by CodeRabbit
Release Notes
New Features
- `Db` facade for simplified pooled database access with automatic connection lifecycle management