fix(qwp): give pooled store-and-forward senders distinct slot ids#53

Open
bluestreak01 wants to merge 13 commits into main from fix/pool-sf-slot-collision

@bluestreak01 bluestreak01 commented Jun 17, 2026

Member

Problem

SenderPool builds every sender from one immutable config string. When store-and-forward (SF) is enabled (sf_dir set), every sender therefore inherited the same sender_id (default "default"), pointed at the same slot dir <sf_dir>/default, and took an exclusive flock there for its lifetime. The second borrow() died on the lock:

java.lang.IllegalStateException: sf slot already in use by another process
  [slot=/.../qdb-sf-.../default, holder=pid=...]
  at SlotLock.acquire(SlotLock.java:99)
  at CursorSendEngine.<init>(...)
  at Sender.fromConfig(...)
  at SenderPool.createUnlocked(...)
  at SenderPool.borrow(...)

Net effect: an SF pool could hold at most one live sender, so pooling with SF was effectively broken. The code even documented the footgun (Sender.java: "multi-sender setups must set this explicitly or the second sender will fail"), but the pool had no way to satisfy it with a single config string.

Fix

When SF is enabled, the pool now hands each sender a distinct, stable slot id <base>-<index>:

  • <base> = the configured sender_id (default "default"), so two pools/processes can share one sf_dir by using distinct sender_ids.
  • <index> = a pool slot index in [0, maxSize), recycled lowest-free-first.

Because indices are deterministic and reused, a slot dir is re-adopted across restarts and any unacked data it holds is recovered on creation (recovery semantics rely on stable ids — random UUIDs would orphan the data).
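
The lowest-free-first recycling described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class name SlotIndexAllocatorSketch and its methods are hypothetical, and only the `<base>-<index>` naming and the [0, maxSize) range come from the description.

```java
// Hypothetical sketch of a lowest-free-first slot allocator.
// Class and method names are illustrative; they are not taken from the PR.
final class SlotIndexAllocatorSketch {
    private final String base;         // configured sender_id, e.g. "default"
    private final boolean[] slotInUse; // one flag per index in [0, maxSize)

    SlotIndexAllocatorSketch(String base, int maxSize) {
        this.base = base;
        this.slotInUse = new boolean[maxSize];
    }

    // Reserves and returns the lowest free index, or -1 when all are taken.
    synchronized int allocate() {
        for (int i = 0; i < slotInUse.length; i++) {
            if (!slotInUse[i]) {
                slotInUse[i] = true;
                return i;
            }
        }
        return -1;
    }

    // Returns an index to the free set so the next allocate() can reuse it.
    synchronized void free(int index) {
        slotInUse[index] = false;
    }

    // Derives the stable slot id "<base>-<index>".
    String slotId(int index) {
        return base + "-" + index;
    }
}
```

Because the scan always starts at 0, a freed index is handed out again before any higher one, which is what keeps the set of slot dirs stable across restarts.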

Non-SF paths (HTTP / TCP / memory-mode) are unchanged: slotIndex is -1 and no slot bookkeeping runs. The cross-writer guard the slot lock exists for is preserved: two pools sharing one sf_dir with the same base still fail fast.

Flock-safe slot lifecycle

A slot index is only returned to the free set after its delegate releases the flock. The pool tracks the in-between states with two counters, both folded into the capacity check:

all + inFlight + closingSlots + leakedSlots < maxSize

  • closingSlots — a slot whose delegate is mid-close() and still releasing its flock. Transient: once the flock drops, the index goes back to the free set. This keeps a concurrent borrow() from reclaiming a dir whose lock is still held.
  • leakedSlots — a slot whose delegate close() returned with the flock still held (the I/O thread refused to stop, observed via the new QwpWebSocketSender.isSlotLockReleased()). The index is retired permanently — never freed, never reused — so no borrow ever hands out a still-locked dir. The lost capacity is accounted for in the cap math, so the pool degrades gracefully (effective max shrinks) instead of resurrecting "sf slot already in use".
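
As a rough model of the capacity check above, the counters can be folded into a single predicate. The class below is a hypothetical sketch: the field names follow the description, but the meaning of `all` (assumed here to be the senders the pool currently owns) and the effectiveMax() helper are assumptions.

```java
// Hypothetical model of the pool counters; not the PR's actual class.
final class PoolCapacitySketch {
    final int maxSize;
    int all;          // senders the pool currently owns (assumed meaning)
    int inFlight;     // builds in progress outside the pool lock
    int closingSlots; // delegates mid-close(), flock not yet confirmed released
    int leakedSlots;  // indices retired because close() left the flock held

    PoolCapacitySketch(int maxSize) {
        this.maxSize = maxSize;
    }

    // Mirrors: all + inFlight + closingSlots + leakedSlots < maxSize
    boolean canCreate() {
        return all + inFlight + closingSlots + leakedSlots < maxSize;
    }

    // Leaked slots never come back, so the usable ceiling shrinks with them.
    int effectiveMax() {
        return maxSize - leakedSlots;
    }
}
```

With maxSize = 4, two owned senders and one closing slot still leave room for one more build; a single leaked slot on top of that closes the gap.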

A permanent retirement is no longer silent: SenderPool now has a logger and emits a WARN at both retire sites (discardBroken + reapIdle) naming the slot index and the reduced effective capacity, and exposes the count via a public leakedSlotCount() accessor for metrics. Previously the pool had no logger at all, so a pool bleeding capacity degraded to "every borrow() times out" with nothing in the logs to explain why.

Orphan-drain coexistence

The pool co-manages every <base>-<index> slot and recovers each slot's unacked data when it (re)creates it. To stop a sibling sender's startup drainer from adopting another pool slot's dir/lock (which would resurface "sf slot already in use"), the pool fences off its own <base>- namespace from orphan draining:

  • new LineSenderBuilder.orphanDrainExcludePrefix(String) builder option,
  • new 3-arg OrphanScanner.scan(sfDir, excludeSlotName, excludeNamePrefix) overload,
  • build() passes the configured exclude-prefix into the orphan scan.

This is a no-op unless drain_orphans=on; foreign leftovers under other names are still drained.

Teardown hardening (Error-safe)

createUnlocked() runs a heavy native build path (mmap, flock, WebSocket connect) and delegate close() can throw an Error (e.g. an -ea AssertionError, OutOfMemoryError), not just a RuntimeException. Best-effort teardown and the reaper daemon are therefore widened from catch (RuntimeException) to catch (Throwable) so a stray Error can't strand sibling flocks, over-count slot bookkeeping, or kill the housekeeper thread mid-flight. Hardened across 3 files: SenderPool, QueryClientPool, and PoolHousekeeper.

The pool builds senders outside its lock, so two first-borrows sharing one sf_dir can race into mkdir(sf_dir); the loser previously failed with a spurious could not create sf_dir. build() now treats a benign creation race (the dir exists afterwards) as success.
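
A benign-race-tolerant directory creation could look roughly like this. It is a sketch under assumptions: the helper name ensureDir and the IllegalStateException are stand-ins, and the PR's build() may use different calls and a different exception type.

```java
import java.io.File;

// Hypothetical sketch of tolerating a concurrent mkdir race on sf_dir.
final class SfDirCreationSketch {
    // mkdirs() returns false both on a real failure and when another thread
    // created the directory first, so re-check isDirectory() before failing.
    static void ensureDir(File dir) {
        if (!dir.mkdirs() && !dir.isDirectory()) {
            throw new IllegalStateException("could not create sf_dir: " + dir);
        }
    }
}
```

Calling ensureDir twice on the same path succeeds both times, which is the behaviour the losing thread of the race needs.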

Changes

  • Sender.LineSenderBuilder — additive introspection accessors isStoreAndForwardEnabled() and getConfiguredSenderId(), plus the orphanDrainExcludePrefix(...) option; build() wires the exclude-prefix into the orphan scan. The pool probes the config once at construction, so a bad config now fails eagerly even when minSize == 0.
  • QwpWebSocketSender — exposes isSlotLockReleased() so the pool can tell whether close() actually dropped the flock.
  • OrphanScanner — new 3-arg scan(...) overload with a name-prefix exclusion.
  • PooledSender — carries a stable slotIndex.
  • SenderPool — per-slot sender_id derivation, slot-index allocator, flock-safe slot lifecycle via closingSlots/leakedSlots, orphan-drain namespace fencing, Throwable-safe teardown, and a logger that WARNs on permanent slot retirement (plus a leakedSlotCount() accessor).
  • Sender.build() — tolerate a benign concurrent mkdir(sf_dir) race.
  • QueryClientPool / PoolHousekeeper — Throwable-safe teardown / reap loop.

Tests

New SenderPoolSfTest (17 tests):

  • Two concurrent SF senders (the original repro) — now passes
  • Grow-to-max coexistence; extra borrow times out (no collision)
  • Configured sender_id used as slot base
  • Returned sender reuses same slot
  • Reaped slot index recycled (no default-N drift)
  • Repeated saturation never exhausts indices (cap / closingSlots invariant guard)
  • End-to-end ingest through pooled senders
  • Second pool same sf_dir+base fails fast
  • Distinct base shares sf_dir
  • close() releases all slots (re-acquirable)
  • Leaked slot retired permanently when delegate close() leaves the flock held (leakedSlots capacity-loss guard)
  • Recovery replay through a re-adopted slot
  • Concurrent borrow/return stress (6 threads × 25 iterations)
  • drain_orphans pool does not cannibalize sibling pool slots
  • drain_orphans pool still drains genuinely foreign orphans
  • Leaked slot is observable — retirement emits a WARN (captured via a logback appender) and bumps leakedSlotCount()
  • Concurrent first-borrows with min=0 race on sf_dir (benign mkdir race)

Plus new OrphanScannerTest (11 tests) covering the prefix-exclusion overload, and SenderPoolTest gains additional non-SF coverage.

Verification

  • SenderPoolSfTest: 17/17 ✅
  • OrphanScannerTest: 11/11 ✅
  • SenderPoolTest (incl. existing non-SF): 19/19 ✅ (no regression)
  • All sf.** / sf.cursor.** suites ✅
  • 79 WS lifecycle/integration tests (reconnect, failover, recovery, close, connect): 79/79 ✅

A SenderPool builds every sender from one immutable config string, so
every store-and-forward (SF) sender inherited the same sender_id (default
"default"), pointed at the same slot dir <sf_dir>/default, and the second
borrow() died on the slot flock with "sf slot already in use by another
process". An SF pool could therefore hold at most one live sender.

Fix: when SF is enabled, the pool now hands each sender a distinct, stable
slot id <base>-<index>, where <base> is the configured sender_id (default
"default") and <index> is a pool slot index in [0, maxSize). Indices are
recycled lowest-free-first, so a slot dir is reused deterministically and
across a restart the same dirs are re-adopted and their unacked data is
recovered on creation.

A slot is only returned to the free set once its delegate has released the
flock, tracked via a new closingSlots counter so the cap check
(all + inFlight + closingSlots < maxSize) and the slot allocator stay
consistent and no concurrent borrow can reclaim a slot dir whose lock is
still held. Non-SF (HTTP/TCP/memory-mode) paths are unchanged: slotIndex
is -1 and no slot bookkeeping runs.

The cross-writer guard the slot lock exists for is preserved: two pools
sharing one sf_dir with the same base still fail fast.

Changes:
- Sender.LineSenderBuilder: add isStoreAndForwardEnabled() and
  getConfiguredSenderId() introspection accessors (additive, no behavior
  change). The pool also probes the config once at construction so a bad
  config fails eagerly even when minSize == 0.
- PooledSender: carry a stable slotIndex (-1 when SF is off).
- SenderPool: per-slot sender_id derivation, slot-index allocator, and
  flock-safe slot lifecycle via closingSlots.

Tests: new SenderPoolSfTest (12 tests) covering the two-concurrent-sender
repro, grow-to-max coexistence, configured-base slot naming, slot reuse,
reap-and-reuse, repeated-saturation invariant, end-to-end ingest, the
cross-pool flock guard, distinct-base sharing, recovery replay through a
re-adopted slot, and a concurrent borrow/return stress test. Existing
non-SF SenderPoolTest (17) and the WS lifecycle suites remain green.
@bluestreak01 bluestreak01 added the bug Something isn't working label Jun 17, 2026
@bluestreak01 bluestreak01 changed the title fix(pool): give pooled store-and-forward senders distinct slot ids fix(qwp): give pooled store-and-forward senders distinct slot ids Jun 18, 2026
…m orphan drain

Pool teardown Error-safety (M1):
Best-effort teardown paths caught only RuntimeException, but a delegate
close()/worker shutdown() can throw an Error (e.g. an -ea AssertionError,
rethrown as-is by QwpWebSocketSender.close via rethrowTerminal). Such an
Error escaped and caused a permanent capacity leak:
  - SenderPool.discardBroken: skipped freeSlotIndex + closingSlots--, so
    an SF slot stayed reserved forever (slotInUse stuck true).
  - SenderPool.reapIdle: aborted the close loop (sibling flocks leaked) and
    skipped the slot-release block (all reaped indices stranded).
  - PoolHousekeeper: the Error killed the daemon thread, stopping all
    future reaping.
The understated capacity (cap check uses closingSlots) eventually forced
borrow() to only ever time out.

Fix: catch Throwable at every delegate close()/worker shutdown() site and
move the slot accounting into a finally so it always runs. Widen the
housekeeper's reapIdle guards to Throwable so an Error can never kill the
thread. Apply the same discipline to QueryClientPool (its close() loop had
no guard at all). Add regression tests that inject an AssertionError into
delegate teardown via a Proxy delegate swap and assert the reap/close loops
survive it, close every delegate, and leave the pool fully usable.
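
The shape of that fix, catching Throwable per delegate and keeping the accounting in a finally, might be sketched as below. The helper closeAll and the `accounting` callback are hypothetical; the real pool code is not shown in this PR text.

```java
import java.util.List;

// Hypothetical sketch of Throwable-safe best-effort teardown.
final class TeardownSketch {
    // Closes every delegate even if one throws an Error, and runs the
    // per-delegate accounting step regardless of how close() exited.
    // Returns the number of close() calls that threw.
    static int closeAll(List<? extends AutoCloseable> delegates, Runnable accounting) {
        int failures = 0;
        for (AutoCloseable delegate : delegates) {
            try {
                delegate.close();
            } catch (Throwable t) { // Error included, e.g. an -ea AssertionError
                failures++;
            } finally {
                accounting.run();   // never skipped, so counters stay consistent
            }
        }
        return failures;
    }
}
```

With catch (RuntimeException) instead, the first AssertionError would escape the loop and the remaining delegates would never be closed.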

Orphan-drain namespace exclusion:
Add LineSenderBuilder.orphanDrainExcludePrefix / OrphanScanner.scan overload
so a pooled SF sender's startup drainer never adopts a sibling pool slot's
dir/lock (which would resurrect "sf slot already in use"). The pool sets the
exclude prefix to its own "<base>-" namespace; foreign leftovers are still
drained.

The pool's invariant was "delegate.close() returned => SF flock released",
so discardBroken()/close() called freeSlotIndex() in a finally regardless.
But QwpWebSocketSender.close() has an early-return path (ioThreadStopped
becomes false when cursorSendLoop.close() throws) that returns normally
before reaching cursorEngine.close() -- the only flock release. The pool
would then free the index and hand a still-locked dir to the next borrow,
resurrecting "sf slot already in use" permanently.

Make the flock-release a queryable postcondition and retire (not reuse) a
slot whose delegate did not release it:

- QwpWebSocketSender: add slotLockReleased flag (set true only after the
  cursorEngine.close() block) + isSlotLockReleased() accessor.
- SenderPool: add leakedSlots counter (folded into the borrow cap check);
  freeSlotIndex only when flockReleased() confirms the lock dropped,
  otherwise move the slot from closingSlots to leakedSlots and keep the
  index reserved permanently so no borrow ever reuses a locked dir.

Releasing the flock early would be worse (a concurrent borrower could
corrupt the slot dir the zombie I/O thread is still writing), so the slot
is leaked rather than reused.
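
The free-versus-retire decision can be modelled roughly as follows. This is a hypothetical sketch: the class and method names are illustrative, and only the counter names and the slotLockReleased postcondition come from the commit message.

```java
// Hypothetical model of gating slot reuse on the flock-release postcondition.
final class SlotReleaseGateSketch {
    private final boolean[] slotInUse;
    private int closingSlots;
    private int leakedSlots;

    SlotReleaseGateSketch(int maxSize) {
        this.slotInUse = new boolean[maxSize];
    }

    void reserve(int index) {
        slotInUse[index] = true;
    }

    // The delegate is about to be closed; the index stays reserved meanwhile.
    void beginClose() {
        closingSlots++;
    }

    // Frees the index only when the delegate confirms the flock was dropped;
    // otherwise the slot moves from closing to leaked and stays reserved.
    void finishClose(int index, boolean slotLockReleased) {
        closingSlots--;
        if (slotLockReleased) {
            slotInUse[index] = false;
        } else {
            leakedSlots++;
        }
    }

    boolean isReserved(int index) {
        return slotInUse[index];
    }

    int closingSlotCount() {
        return closingSlots;
    }

    int leakedSlotCount() {
        return leakedSlots;
    }
}
```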

Add SenderPoolSfTest.testSlotLeakedWhenDelegateCloseDoesNotReleaseFlock:
forges the not-released symptom, routes the wrapper through discardBroken,
and asserts the index stays reserved, a fresh dir is used, and capacity is
permanently reduced. Verified it fails when the gate is neutered.
…erver API

main #47 (a02732c) reworked TestWebSocketServer to bind an OS-assigned
ephemeral port at construction and dropped the (int port, handler) ctor in
favour of (handler) + getPort(). This PR's new SenderPoolSfTest predated
that change and still called new TestWebSocketServer(port, handler), so the
PR-merge-into-main build failed testCompile with 18x 'int cannot be
converted to WebSocketServerHandler'.

Merge main and switch all 18 call sites to the new API: drop
TestPorts.findUnusedPort(), construct with the handler only, and read the
bound port via server.getPort() inside the try block. Removes the now-unused
TestPorts import. All 15 SenderPoolSfTest tests pass.
@bluestreak01 bluestreak01 enabled auto-merge (squash) June 18, 2026 12:09
When a pooled store-and-forward sender's delegate close() returns with
the slot flock still held (the I/O thread refused to stop), SenderPool
retires the slot index permanently. Until now this happened silently:
SenderPool had no logger, so a pool slowly bleeding capacity degraded to
"every borrow() times out" with nothing in the logs to explain why.

- SenderPool: add an SLF4J logger and emit a WARN at both leakedSlots++
  sites (discardBroken + reapIdle) naming the retired slot index and the
  reduced effective capacity.
- SenderPool: add public leakedSlotCount() accessor for metrics/tests.
- Sender.build(): treat a benign concurrent mkdir(sf_dir) race (EEXIST,
  dir now exists) as success instead of failing the loser.
- Tests: testLeakedSlotIsObservable (captures the WARN via a logback
  appender + asserts leakedSlotCount()) and
  testConcurrentFirstBorrowsWithMinZeroRaceOnSfDir (min=0 first-borrow
  race on sf_dir). module-info: test reads ch.qos.logback.core.
…meException

The teardown-hardening work widened best-effort cleanup catches to Throwable
but left the four outer catches that wrap the heavy native build/connect path
(createUnlocked/build/connect/start) at catch (RuntimeException). The project
runs with -ea, so those paths can throw an Error (AssertionError, OOM) that
skips cleanup:

- SenderPool ctor prewarm: an Error stranded already-built SF delegates
  (flock + mmap'd ring + I/O thread), resurrecting 'sf slot already in use'.
- QueryClientPool ctor prewarm: an Error stranded pre-warmed worker threads.
- QueryClientPool.acquire(): an Error skipped inFlightCreations--, permanently
  shrinking capacity until every acquire() timed out.
- QueryClientPool.createUnlocked(): an Error skipped client.close(), leaking
  the field-initialised NATIVE_DEFAULT scratch.

Widen all four to catch (Throwable e), matching SenderPool.borrow(). The inner
cleanup loops were already Throwable-safe and the bodies already rethrow.

Add red/green regression tests for each site. QwpQueryClient is concrete, so a
package-private connectHook seam injects an Error at the real connect step
(fromConfig still allocates, so NATIVE_DEFAULT leak assertions are meaningful);
Sender is an interface, faked via Proxy behind a package-private senderFactory
seam. Both seams are reached by reflection through package-private constructors
(main module is open); production callers pass null for real behaviour. Each
test was confirmed to go red when its catch is reverted to RuntimeException.
…axSize doesn't strand SF data

A pooled SF sender excluded its whole "<base>-" namespace from orphan
draining, while the pool only ever issues slot indices in [0, maxSize).
If a deployment restarted with a smaller maxSize (e.g. 4->2) while
default-2/default-3 still held unacked .sfa data, those slots were
neither re-created (out of index range) nor drained (matched the
excluded prefix) -- so even with drain_orphans=on the data was silently
never recovered.

Replace the coarse prefix exclusion with an exact managed-slot
exclusion bounded by maxSize:

- OrphanScanner.scan(sfDir, exclude, managedBase, managedSlotCount)
  excludes only <base>-<i> for canonical i in [0, managedSlotCount).
  Same-base indices >= count, and non-canonical suffixes
  (default-007, default-foo), are treated like foreign leftovers and
  drained through the existing flock-safe background drainer.
- Sender builder hook orphanDrainExcludePrefix(String) ->
  orphanDrainExcludeManagedSlots(String base, int slotCount).
- SenderPool passes (slotBaseId, maxSize).

In-range live slots are still excluded, preserving the anti-collision
guarantee the per-slot ids were added for. A base change keeps working
(old dirs become foreign and drain).
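
A predicate with the exclusion rules listed above might look like the sketch below. The class name and the exact parsing strategy are assumptions; the PR only states the behaviour (canonical in-range indices are managed, everything else is drained, and a count <= 0 disables the exclusion).

```java
// Hypothetical sketch of an exact managed-slot predicate.
final class ManagedSlotSketch {
    // True only for "<base>-<i>" where i is a canonical decimal index
    // in [0, slotCount). Names such as "default-007" or "default-foo"
    // are not managed and would therefore be eligible for draining.
    static boolean isManagedSlot(String name, String base, int slotCount) {
        if (slotCount <= 0) {
            return false; // exclusion disabled
        }
        String prefix = base + "-";
        if (!name.startsWith(prefix)) {
            return false;
        }
        String suffix = name.substring(prefix.length());
        // At most 9 digits so Integer.parseInt cannot overflow.
        if (suffix.isEmpty() || suffix.length() > 9) {
            return false;
        }
        for (int i = 0; i < suffix.length(); i++) {
            char c = suffix.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        // Reject leading zeros: "007" is not the canonical form of 7.
        if (suffix.length() > 1 && suffix.charAt(0) == '0') {
            return false;
        }
        return Integer.parseInt(suffix) < slotCount;
    }
}
```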

Tests (verified red without the fix):
- OrphanScannerTest: bounded in-range exclusion, out-of-range drain
  after shrink, non-canonical names drained, disabled-when-count<=0,
  isManagedSlot predicate matrix.
- SenderPoolSfTest#testShrinkingMaxSizeDrainsStrandedOutOfRangeSlots:
  seeds unacked data into default-0..3 via a maxSize=4 pool, restarts
  at maxSize=2 with drain_orphans=on, asserts default-2/3 are recovered
  (no .sfa left, no .failed sentinel).

PooledSender.close() wrapped delegate.flush() in catch (RuntimeException),
so an Error (AssertionError under -ea, OutOfMemoryError, ...) left broken
unset and the failed sender was returned to the pool via giveBack() as
healthy. Since Sender does not clear its buffer on flush failure, the next
borrower would inherit the unsent rows or a dead connection.

Invert the flag to track normal completion instead: flush() must complete
to set flushed=true, otherwise discardBroken() runs. This treats any
abnormal exit (RuntimeException or Error) as unrecyclable, needs no
explicit rethrow (the original throwable propagates naturally), and matches
the catch-Throwable posture already used elsewhere in the pool.
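
The inverted flag can be sketched as below. This is an illustration only: the Delegate interface and the recycled/discarded fields are hypothetical stand-ins for giveBack() and discardBroken().

```java
// Hypothetical sketch of tracking normal completion instead of failure.
final class PooledCloseSketch {
    interface Delegate {
        void flush();
    }

    boolean recycled;  // stand-in for giveBack()
    boolean discarded; // stand-in for discardBroken()

    void close(Delegate delegate) {
        boolean flushed = false;
        try {
            delegate.flush();
            flushed = true; // reached only when flush() returned normally
        } finally {
            if (flushed) {
                recycled = true;
            } else {
                discarded = true; // any Throwable, Error included, lands here
            }
        }
        // No catch block: the original throwable propagates unchanged.
    }
}
```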

The QuestDBImpl constructor orchestrates SenderPool and QueryClientPool
construction: it builds the SenderPool first, then the QueryClientPool,
then starts the housekeeper, with a cleanup catch that closes whatever was
already built on failure. That catch was catch (RuntimeException).

Both pool constructors run heavy native build/connect paths (mmap, flock,
WebSocket connect) that can throw an Error under -ea (AssertionError) or via
OOM in production. The teardown-hardening work already widened SenderPool,
QueryClientPool, and their prewarm catches to catch (Throwable) for exactly
this reason, but the orchestrator that ties them together was missed: an
Error from new QueryClientPool(...) skipped the narrow catch, so the
already-built SenderPool was never closed -- stranding its prewarmed
delegates' flocks, mmap'd rings, and I/O threads. This is the precise leak
class the hardening work exists to kill, reachable under -ea (how QuestDB
tests run) and via OOM in production.

Widen the cleanup catch to catch (Throwable e), matching the pools it calls.

Add a red/green regression test. A package-private QuestDBImpl seam
constructor (mirroring the SenderPool/QueryClientPool seams) threads the
senderFactory and connectHook through to the pools; production callers pass
null for both. The test fakes a Sender via Proxy (close() flips a flag,
senderMin=1) and injects an AssertionError from the QueryClientPool connect
hook (queryMin=1); the fake delegate's close() flag is the discriminator.
Confirmed red against catch (RuntimeException) and green against
catch (Throwable).

Every pooled SF sender's orphan drainer excludes the pool's whole
[0, maxSize) managed slot range (orphanDrainExcludeManagedSlots) so a
sibling never adopts a slot dir/lock the pool intends to (re)create --
that exclusion is what keeps the per-slot ids from resurfacing
"sf slot already in use". The side effect: an in-range slot left holding
unacked data by a previous run was recovered ONLY when the pool happened
to (re)create that index. Because the pool pre-warms [0, minSize) and
builds [minSize, maxSize) lazily at the lowest free index on demand, a
high in-range slot under steady low load was never rebuilt -- neither
drained (excluded) nor recovered -- so its store-and-forward data sat
stranded on disk (durable, but undelivered) until a restart or a load
spike. This was strictly weaker than the drain_orphans=on contract a
standalone sender gives, and the Javadoc quietly assumed the slot would
be recreated.

Close the gap without touching the exclusion (widening it would re-open
the cannibalization race). The pool now recovers its own stranded managed
slots once, at construction, while it is still single-threaded and
unpublished: for each in-range slot dir that holds unacked data and is not
already live (prewarmed), it reserves the index, builds a sender on it
(which re-adopts and recovers the slot), drains the recovered frames, then
closes it and frees the index. Reserving the index for the duration means
no concurrent borrow can target the dir, so the cannibalization race the
exclusion guards against cannot occur here either. Every step is
best-effort: a clean slot is a cheap directory probe; an unreachable
server, slow drain, or build/close Error is logged and never fails
construction (the data stays durable on disk for a later attempt), and a
build/connect failure short-circuits the scan to avoid paying a connect
timeout per slot.
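
The construction-time recovery pass might be shaped roughly like this. It is a simplified sketch: SlotRecoverer and the holdsUnackedData predicate are hypothetical, the build, drain and close steps are collapsed into one callback, and every failure short-circuits the scan here, whereas the commit message only says build/connect failures do.

```java
import java.util.function.IntPredicate;

// Hypothetical sketch of a best-effort startup recovery scan.
final class StartupRecoverySketch {
    interface SlotRecoverer {
        // Stand-in for: build a sender on the slot, drain it, close it.
        void recover(int index) throws Exception;
    }

    // Returns how many stranded slots were recovered. Never throws.
    static int recoverStranded(boolean[] slotInUse,
                               IntPredicate holdsUnackedData,
                               SlotRecoverer recoverer) {
        int recovered = 0;
        for (int i = 0; i < slotInUse.length; i++) {
            if (slotInUse[i] || !holdsUnackedData.test(i)) {
                continue; // live (prewarmed) or clean slot: nothing to do
            }
            slotInUse[i] = true; // reserve so nothing else targets the dir
            try {
                recoverer.recover(i);
                recovered++;
            } catch (Throwable t) {
                // Data stays on disk for a later attempt; stop scanning to
                // avoid paying the same failure once per slot.
                break;
            } finally {
                slotInUse[i] = false; // release the reservation
            }
        }
        return recovered;
    }
}
```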

Add getConfiguredSfDir() to LineSenderBuilder so the pool can locate its
group root, mirroring the existing getConfiguredSenderId() hook.

Add a red/green regression test: seed default-0..3 in a busy maxSize=4
run, then restart at the SAME maxSize=4 with minSize=0 and steady low load
(a single borrow). Before the fix all four in-range slots stayed stranded
(test waits 15s and fails); after the fix startup recovery empties every
slot and replays the frames to the server. Confirmed red with the recovery
call disabled and green with it enabled; full SenderPool/SF/orphan suites
pass unchanged.

Commit 7118264 made PooledSender.close() route any abnormal flush() exit
(RuntimeException OR Error) to discardBroken instead of recycling, but
shipped without a test that exercises the Error branch -- the existing
flush-failure tests drive a LineSenderException (a RuntimeException), which
the pre-fix catch (RuntimeException) already handled, so they pass against
both pre- and post-fix code.

Add a white-box test via the package-private senderFactory seam: inject a
delegate whose flush() throws an AssertionError, then assert the wrapper is
discarded (next borrow() returns a fresh instance) rather than recycled.

Verified RED against 7118264^ (fails on assertNotSame -- the broken wrapper
is handed back to the next borrower) and GREEN on the current tree.

SenderPool.borrow() reserves an SF slot index (allocateSlotIndex) before
building the delegate and must return it (freeSlotIndex) if createUnlocked()
throws, or slotInUse[idx] stays stuck true: capacity is permanently lowered
and a later borrow() trips "no free SF slot index" / eventually only times
out -- the failure mode this PR fixes. The pre-existing error-injection test
fails in the constructor pre-warm loop with a non-SF config (slotIndex == -1),
so the borrow path and the SF (slotIndex >= 0) case were uncovered.

Inject (via the senderFactory seam) an SF-config pool whose factory throws on
the first borrow-triggered build and succeeds afterwards; assert the throwable
propagates AND a subsequent borrow() reuses the slot (capacity intact).

Verified RED by removing freeSlotIndex(slotIndex) from the borrow catch
(2nd borrow() throws IllegalStateException "no free SF slot index"), GREEN on
the current tree.
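
The reserve-then-roll-back pattern under test can be sketched as follows. The class and its buildOnFreeSlot method are hypothetical; only the "no free SF slot index" message and the requirement to free the index on failure come from the commit message.

```java
import java.util.function.IntFunction;

// Hypothetical sketch of returning a reserved slot index when the build fails.
final class BorrowRollbackSketch {
    private final boolean[] slotInUse;

    BorrowRollbackSketch(int maxSize) {
        this.slotInUse = new boolean[maxSize];
    }

    String buildOnFreeSlot(IntFunction<String> factory) {
        int index = -1;
        for (int i = 0; i < slotInUse.length; i++) {
            if (!slotInUse[i]) {
                slotInUse[i] = true; // reserve before the slow build
                index = i;
                break;
            }
        }
        if (index < 0) {
            throw new IllegalStateException("no free SF slot index");
        }
        try {
            return factory.apply(index);
        } catch (Throwable t) {
            slotInUse[index] = false; // give the index back, Error or not
            throw t;                  // precise rethrow: unchecked types only
        }
    }
}
```

Without the rollback in the catch, a single failed build on a maxSize = 1 pool would make every later call throw "no free SF slot index".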
@mtopolnik

Contributor

[PR Coverage check]

😍 pass : 165 / 205 (80.49%)

file detail

| path | covered line | new line | coverage |
|---|---|---|---|
| 🔵 io/questdb/client/impl/PoolHousekeeper.java | 0 | 2 | 00.00% |
| 🔵 io/questdb/client/impl/QuestDBImpl.java | 1 | 3 | 33.33% |
| 🔵 io/questdb/client/impl/QueryClientPool.java | 10 | 17 | 58.82% |
| 🔵 io/questdb/client/impl/SenderPool.java | 92 | 116 | 79.31% |
| 🔵 io/questdb/client/cutlass/qwp/client/sf/cursor/OrphanScanner.java | 45 | 50 | 90.00% |
| 🔵 io/questdb/client/impl/PooledSender.java | 7 | 7 | 100.00% |
| 🔵 io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java | 2 | 2 | 100.00% |
| 🔵 io/questdb/client/Sender.java | 8 | 8 | 100.00% |
