fix(qwp): give pooled store-and-forward senders distinct slot ids #53
Open
bluestreak01 wants to merge 13 commits into
A SenderPool builds every sender from one immutable config string, so every store-and-forward (SF) sender inherited the same sender_id (default "default"), pointed at the same slot dir <sf_dir>/default, and the second borrow() died on the slot flock with "sf slot already in use by another process". An SF pool could therefore hold at most one live sender.
Fix: when SF is enabled, the pool now hands each sender a distinct, stable slot id <base>-<index>, where <base> is the configured sender_id (default "default") and <index> is a pool slot index in [0, maxSize). Indices are recycled lowest-free-first, so a slot dir is reused deterministically and across a restart the same dirs are re-adopted and their unacked data is recovered on creation.
A slot is only returned to the free set once its delegate has released the flock, tracked via a new closingSlots counter so the cap check (all + inFlight + closingSlots < maxSize) and the slot allocator stay consistent and no concurrent borrow can reclaim a slot dir whose lock is still held.
Non-SF (HTTP/TCP/memory-mode) paths are unchanged: slotIndex is -1 and no slot bookkeeping runs. The cross-writer guard the slot lock exists for is preserved: two pools sharing one sf_dir with the same base still fail fast.
Changes:
- Sender.LineSenderBuilder: add isStoreAndForwardEnabled() and getConfiguredSenderId() introspection accessors (additive, no behavior change). The pool also probes the config once at construction so a bad config fails eagerly even when minSize == 0.
- PooledSender: carry a stable slotIndex (-1 when SF is off).
- SenderPool: per-slot sender_id derivation, slot-index allocator, and flock-safe slot lifecycle via closingSlots.
Tests: new SenderPoolSfTest (12 tests) covering the two-concurrent-sender repro, grow-to-max coexistence, configured-base slot naming, slot reuse, reap-and-reuse, repeated-saturation invariant, end-to-end ingest, the cross-pool flock guard, distinct-base sharing, recovery replay through a re-adopted slot, and a concurrent borrow/return stress test. Existing non-SF SenderPoolTest (17) and the WS lifecycle suites remain green.
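The slot-id scheme above can be illustrated with a minimal sketch. It assumes a plain boolean array as the free set; the class name SlotIndexAllocator and its method names are hypothetical and are not taken from the PR's SenderPool.

```java
/** Hypothetical sketch of a lowest-free-first slot-index allocator. */
final class SlotIndexAllocator {
    private final String base;         // configured sender_id, e.g. "default"
    private final boolean[] slotInUse; // one flag per index in [0, maxSize)

    SlotIndexAllocator(String base, int maxSize) {
        this.base = base;
        this.slotInUse = new boolean[maxSize];
    }

    /** Reserves and returns the lowest free index, or -1 if every index is taken. */
    synchronized int allocate() {
        for (int i = 0; i < slotInUse.length; i++) {
            if (!slotInUse[i]) {
                slotInUse[i] = true;
                return i;
            }
        }
        return -1;
    }

    /** Returns an index to the free set so the same slot dir is reused next time. */
    synchronized void free(int index) {
        slotInUse[index] = false;
    }

    /** Derives the per-sender slot id <base>-<index>. */
    String slotId(int index) {
        return base + "-" + index;
    }
}
```

Because the scan always starts at index 0, a freed low index is handed out again before any higher one, which is what keeps the set of slot dirs stable across restarts.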
…m orphan drain
Pool teardown Error-safety (M1):
Best-effort teardown paths caught only RuntimeException, but a delegate
close()/worker shutdown() can throw an Error (e.g. an -ea AssertionError,
rethrown as-is by QwpWebSocketSender.close via rethrowTerminal). Such an
Error escaped and caused a permanent capacity leak:
- SenderPool.discardBroken: skipped freeSlotIndex + closingSlots--, so
an SF slot stayed reserved forever (slotInUse stuck true).
- SenderPool.reapIdle: aborted the close loop (sibling flocks leaked) and
skipped the slot-release block (all reaped indices stranded).
- PoolHousekeeper: the Error killed the daemon thread, stopping all
future reaping.
The understated capacity (cap check uses closingSlots) eventually forced
borrow() to only ever time out.
Fix: catch Throwable at every delegate close()/worker shutdown() site and
move the slot accounting into a finally so it always runs. Widen the
housekeeper's reapIdle guards to Throwable so an Error can never kill the
thread. Apply the same discipline to QueryClientPool (its close() loop had
no guard at all). Add regression tests that inject an AssertionError into
delegate teardown via a Proxy delegate swap and assert the reap/close loops
survive it, close every delegate, and leave the pool fully usable.
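A minimal sketch of the teardown discipline described above: catch Throwable around each delegate close and keep the accounting in a finally block. The class and field names are hypothetical, and the sketch ignores the later flock-release gate.

```java
import java.util.List;

/** Hypothetical sketch: best-effort teardown that survives an Error from close(). */
final class TeardownSketch {
    int closingSlots;  // slots whose delegate is currently mid-close
    int releasedSlots; // stand-in for the slot-release bookkeeping

    /** Closes every delegate; a throwing close() cannot skip the accounting or the siblings. */
    void closeAll(List<AutoCloseable> delegates) {
        for (AutoCloseable delegate : delegates) {
            closingSlots++;
            try {
                delegate.close();
            } catch (Throwable t) {
                // Best-effort: swallow so the loop continues with the remaining delegates.
            } finally {
                // Runs whether close() returned, threw an Exception, or threw an Error.
                closingSlots--;
                releasedSlots++;
            }
        }
    }
}
```

With catch (RuntimeException) instead, an AssertionError from the second delegate would leave the loop, strand the third delegate, and leave closingSlots permanently inflated.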
Orphan-drain namespace exclusion:
Add LineSenderBuilder.orphanDrainExcludePrefix / OrphanScanner.scan overload
so a pooled SF sender's startup drainer never adopts a sibling pool slot's
dir/lock (which would resurrect "sf slot already in use"). The pool sets the
exclude prefix to its own "<base>-" namespace; foreign leftovers are still
drained.
The pool's invariant was "delegate.close() returned => SF flock released", so discardBroken()/close() called freeSlotIndex() in a finally regardless. But QwpWebSocketSender.close() has an early-return path (ioThreadStopped becomes false when cursorSendLoop.close() throws) that returns normally before reaching cursorEngine.close() -- the only flock release. The pool would then free the index and hand a still-locked dir to the next borrow, resurrecting "sf slot already in use" permanently.
Make the flock-release a queryable postcondition and retire (not reuse) a slot whose delegate did not release it:
- QwpWebSocketSender: add slotLockReleased flag (set true only after the cursorEngine.close() block) + isSlotLockReleased() accessor.
- SenderPool: add leakedSlots counter (folded into the borrow cap check); freeSlotIndex only when flockReleased() confirms the lock dropped, otherwise move the slot from closingSlots to leakedSlots and keep the index reserved permanently so no borrow ever reuses a locked dir.
Releasing the flock early would be worse (a concurrent borrower could corrupt the slot dir the zombie I/O thread is still writing), so the slot is leaked rather than reused.
Add SenderPoolSfTest.testSlotLeakedWhenDelegateCloseDoesNotReleaseFlock: forges the not-released symptom, routes the wrapper through discardBroken, and asserts the index stays reserved, a fresh dir is used, and capacity is permanently reduced. Verified it fails when the gate is neutered.
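The retire-instead-of-reuse rule can be sketched as follows. This is a simplified model under stated assumptions: the class SlotLedger and its methods are hypothetical, and the exact form of the cap check (adding leakedSlots to the earlier all + inFlight + closingSlots sum) is an assumption based on the description above.

```java
/** Hypothetical sketch of slot accounting that retires a slot whose flock was not released. */
final class SlotLedger {
    private final boolean[] slotInUse;
    private int closingSlots; // delegates mid-close, flock possibly still held
    private int leakedSlots;  // close() returned but the flock is still held

    SlotLedger(int maxSize) {
        this.slotInUse = new boolean[maxSize];
    }

    /** Reserves the lowest free index, or returns -1 when none is free. */
    synchronized int reserveLowestFree() {
        for (int i = 0; i < slotInUse.length; i++) {
            if (!slotInUse[i]) {
                slotInUse[i] = true;
                return i;
            }
        }
        return -1;
    }

    synchronized void beginClose() {
        closingSlots++;
    }

    /** Frees the index only when the delegate confirms the lock was dropped. */
    synchronized void finishClose(int index, boolean lockReleased) {
        closingSlots--;
        if (lockReleased) {
            slotInUse[index] = false;
        } else {
            leakedSlots++; // index stays reserved, so the locked dir is never handed out
        }
    }

    /** Assumed cap check: leaked slots count against capacity like live ones. */
    synchronized boolean hasCapacity(int all, int inFlight) {
        return all + inFlight + closingSlots + leakedSlots < slotInUse.length;
    }

    synchronized int leakedSlotCount() {
        return leakedSlots;
    }
}
```

In this model a leaked index is skipped by every later reservation, and the effective maximum shrinks by one for each leak.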
…erver API
main #47 (a02732c) reworked TestWebSocketServer to bind an OS-assigned ephemeral port at construction and dropped the (int port, handler) ctor in favour of (handler) + getPort(). This PR's new SenderPoolSfTest predated that change and still called new TestWebSocketServer(port, handler), so the PR-merge-into-main build failed testCompile with 18x 'int cannot be converted to WebSocketServerHandler'.
Merge main and switch all 18 call sites to the new API: drop TestPorts.findUnusedPort(), construct with the handler only, and read the bound port via server.getPort() inside the try block. Removes the now-unused TestPorts import. All 15 SenderPoolSfTest tests pass.
When a pooled store-and-forward sender's delegate close() returns with the slot flock still held (the I/O thread refused to stop), SenderPool retires the slot index permanently. Until now this happened silently: SenderPool had no logger, so a pool slowly bleeding capacity degraded to "every borrow() times out" with nothing in the logs to explain why.
- SenderPool: add an SLF4J logger and emit a WARN at both leakedSlots++ sites (discardBroken + reapIdle) naming the retired slot index and the reduced effective capacity.
- SenderPool: add public leakedSlotCount() accessor for metrics/tests.
- Sender.build(): treat a benign concurrent mkdir(sf_dir) race (EEXIST, dir now exists) as success instead of failing the loser.
- Tests: testLeakedSlotIsObservable (captures the WARN via a logback appender + asserts leakedSlotCount()) and testConcurrentFirstBorrowsWithMinZeroRaceOnSfDir (min=0 first-borrow race on sf_dir).
module-info: test reads ch.qos.logback.core.
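The benign mkdir race can be illustrated with a short sketch. It uses java.io.File rather than whatever native call Sender.build() actually makes, so the class SfDirs and the method ensureDir are hypothetical stand-ins.

```java
import java.io.File;

/** Hypothetical sketch: tolerate losing a concurrent directory-creation race. */
final class SfDirs {
    /** Succeeds if the directory exists afterwards, regardless of who created it. */
    static void ensureDir(File dir) {
        if (dir.mkdirs()) {
            return; // this caller created the directory
        }
        if (dir.isDirectory()) {
            return; // another caller created it first: a benign race, not an error
        }
        throw new IllegalStateException("could not create sf_dir: " + dir);
    }
}
```

The second check is what turns the loser of the race into a success: mkdirs() returns false when the directory already exists, so the result alone cannot distinguish a race from a real failure.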
…meException
The teardown-hardening work widened best-effort cleanup catches to Throwable but left the four outer catches that wrap the heavy native build/connect path (createUnlocked/build/connect/start) at catch (RuntimeException). The project runs with -ea, so those paths can throw an Error (AssertionError, OOM) that skips cleanup:
- SenderPool ctor prewarm: an Error stranded already-built SF delegates (flock + mmap'd ring + I/O thread), resurrecting 'sf slot already in use'.
- QueryClientPool ctor prewarm: an Error stranded pre-warmed worker threads.
- QueryClientPool.acquire(): an Error skipped inFlightCreations--, permanently shrinking capacity until every acquire() timed out.
- QueryClientPool.createUnlocked(): an Error skipped client.close(), leaking the field-initialised NATIVE_DEFAULT scratch.
Widen all four to catch (Throwable e), matching SenderPool.borrow(). The inner cleanup loops were already Throwable-safe and the bodies already rethrow.
Add red/green regression tests for each site. QwpQueryClient is concrete, so a package-private connectHook seam injects an Error at the real connect step (fromConfig still allocates, so NATIVE_DEFAULT leak assertions are meaningful); Sender is an interface, faked via Proxy behind a package-private senderFactory seam. Both seams are reached by reflection through package-private constructors (main module is open); production callers pass null for real behaviour. Each test was confirmed to go red when its catch is reverted to RuntimeException.
…axSize doesn't strand SF data
A pooled SF sender excluded its whole "<base>-" namespace from orphan draining, while the pool only ever issues slot indices in [0, maxSize). If a deployment restarted with a smaller maxSize (e.g. 4->2) while default-2/default-3 still held unacked .sfa data, those slots were neither re-created (out of index range) nor drained (matched the excluded prefix) -- so even with drain_orphans=on the data was silently never recovered.
Replace the coarse prefix exclusion with an exact managed-slot exclusion bounded by maxSize:
- OrphanScanner.scan(sfDir, exclude, managedBase, managedSlotCount) excludes only <base>-<i> for canonical i in [0, managedSlotCount). Same-base indices >= count, and non-canonical suffixes (default-007, default-foo), are treated like foreign leftovers and drained through the existing flock-safe background drainer.
- Sender builder hook orphanDrainExcludePrefix(String) -> orphanDrainExcludeManagedSlots(String base, int slotCount).
- SenderPool passes (slotBaseId, maxSize).
In-range live slots are still excluded, preserving the anti-collision guarantee the per-slot ids were added for. A base change keeps working (old dirs become foreign and drain).
Tests (verified red without the fix):
- OrphanScannerTest: bounded in-range exclusion, out-of-range drain after shrink, non-canonical names drained, disabled-when-count<=0, isManagedSlot predicate matrix.
- SenderPoolSfTest#testShrinkingMaxSizeDrainsStrandedOutOfRangeSlots: seeds unacked data into default-0..3 via a maxSize=4 pool, restarts at maxSize=2 with drain_orphans=on, asserts default-2/3 are recovered (no .sfa left, no .failed sentinel).
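The managed-slot rule above can be sketched as a predicate. The commit names an isManagedSlot predicate, but the signature and body below are assumptions written for illustration, not the OrphanScanner code.

```java
/** Hypothetical sketch of the bounded managed-slot exclusion predicate. */
final class ManagedSlots {
    /**
     * True only for <base>-<i> where i is a canonical decimal index in [0, slotCount).
     * Anything else (other base, out-of-range index, leading zeros, non-digits) is foreign.
     */
    static boolean isManagedSlot(String dirName, String base, int slotCount) {
        if (slotCount <= 0 || base == null || dirName == null) {
            return false; // exclusion disabled
        }
        String prefix = base + "-";
        if (!dirName.startsWith(prefix)) {
            return false;
        }
        String suffix = dirName.substring(prefix.length());
        if (suffix.isEmpty() || suffix.length() > 18) {
            return false; // empty, or too long to be a plausible index
        }
        for (int i = 0; i < suffix.length(); i++) {
            char c = suffix.charAt(i);
            if (c < '0' || c > '9') {
                return false; // e.g. default-foo
            }
        }
        if (suffix.length() > 1 && suffix.charAt(0) == '0') {
            return false; // non-canonical, e.g. default-007
        }
        return Long.parseLong(suffix) < slotCount;
    }
}
```

Under this predicate, after a shrink from maxSize=4 to maxSize=2 the names default-2 and default-3 are no longer managed, so an orphan scan that excludes only managed names would pick them up.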
PooledSender.close() wrapped delegate.flush() in catch (RuntimeException), so an Error (AssertionError under -ea, OutOfMemoryError, ...) left broken unset and the failed sender was returned to the pool via giveBack() as healthy. Since Sender does not clear its buffer on flush failure, the next borrower would inherit the unsent rows or a dead connection. Invert the flag to track normal completion instead: flush() must complete to set flushed=true, otherwise discardBroken() runs. This treats any abnormal exit (RuntimeException or Error) as unrecyclable, needs no explicit rethrow (the original throwable propagates naturally), and matches the catch-Throwable posture already used elsewhere in the pool.
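A minimal sketch of the inverted flag, assuming simplified Delegate and Pool interfaces that are hypothetical stand-ins for Sender and SenderPool:

```java
/** Hypothetical sketch: recycle only when flush() completed normally. */
final class PooledSenderSketch implements AutoCloseable {
    interface Delegate {
        void flush();
    }

    interface Pool {
        void giveBack(PooledSenderSketch sender);

        void discardBroken(PooledSenderSketch sender);
    }

    private final Delegate delegate;
    private final Pool pool;

    PooledSenderSketch(Delegate delegate, Pool pool) {
        this.delegate = delegate;
        this.pool = pool;
    }

    @Override
    public void close() {
        boolean flushed = false;
        try {
            delegate.flush();
            flushed = true; // reached only if flush() returned normally
        } finally {
            if (flushed) {
                pool.giveBack(this);
            } else {
                // Any abnormal exit (RuntimeException or Error) lands here;
                // the original throwable keeps propagating after the finally.
                pool.discardBroken(this);
            }
        }
    }
}
```

Tracking success rather than failure means no catch clause has to enumerate the throwable types that count as broken.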
The QuestDBImpl constructor orchestrates SenderPool and QueryClientPool construction: it builds the SenderPool first, then the QueryClientPool, then starts the housekeeper, with a cleanup catch that closes whatever was already built on failure. That catch was catch (RuntimeException). Both pool constructors run heavy native build/connect paths (mmap, flock, WebSocket connect) that can throw an Error under -ea (AssertionError) or via OOM in production. The teardown-hardening work already widened SenderPool, QueryClientPool, and their prewarm catches to catch (Throwable) for exactly this reason, but the orchestrator that ties them together was missed: an Error from new QueryClientPool(...) skipped the narrow catch, so the already-built SenderPool was never closed -- stranding its prewarmed delegates' flocks, mmap'd rings, and I/O threads. This is the precise leak class the hardening work exists to kill, reachable under -ea (how QuestDB tests run) and via OOM in production. Widen the cleanup catch to catch (Throwable e), matching the pools it calls. Add a red/green regression test. A package-private QuestDBImpl seam constructor (mirroring the SenderPool/QueryClientPool seams) threads the senderFactory and connectHook through to the pools; production callers pass null for both. The test fakes a Sender via Proxy (close() flips a flag, senderMin=1) and injects an AssertionError from the QueryClientPool connect hook (queryMin=1); the fake delegate's close() flag is the discriminator. Confirmed red against catch (RuntimeException) and green against catch (Throwable).
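The constructor rollback can be sketched as below. FacadeSketch and its Supplier-based factories are hypothetical; they only model "build the first pool, then the second, and close the first if the second fails with any Throwable".

```java
import java.util.function.Supplier;

/** Hypothetical sketch: constructor cleanup that also runs when an Error is thrown. */
final class FacadeSketch implements AutoCloseable {
    private final AutoCloseable senderPool;
    private final AutoCloseable queryPool;

    FacadeSketch(Supplier<AutoCloseable> senderPoolFactory,
                 Supplier<AutoCloseable> queryPoolFactory) {
        AutoCloseable sp = null;
        AutoCloseable qp;
        try {
            sp = senderPoolFactory.get();
            qp = queryPoolFactory.get();
        } catch (Throwable e) {
            // Close whatever was already built, then rethrow the original throwable.
            closeQuietly(sp);
            throw e;
        }
        this.senderPool = sp;
        this.queryPool = qp;
    }

    @Override
    public void close() {
        closeQuietly(queryPool);
        closeQuietly(senderPool);
    }

    private static void closeQuietly(AutoCloseable c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (Throwable ignored) {
            // best-effort cleanup
        }
    }
}
```

The rethrow compiles without a throws clause because the try body can only throw unchecked throwables, so the compiler narrows the type of e accordingly.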
Every pooled SF sender's orphan drainer excludes the pool's whole [0, maxSize) managed slot range (orphanDrainExcludeManagedSlots) so a sibling never adopts a slot dir/lock the pool intends to (re)create -- that exclusion is what keeps the per-slot ids from resurfacing "sf slot already in use". The side effect: an in-range slot left holding unacked data by a previous run was recovered ONLY when the pool happened to (re)create that index. Because the pool pre-warms [0, minSize) and builds [minSize, maxSize) lazily at the lowest free index on demand, a high in-range slot under steady low load was never rebuilt -- neither drained (excluded) nor recovered -- so its store-and-forward data sat stranded on disk (durable, but undelivered) until a restart or a load spike. This was strictly weaker than the drain_orphans=on contract a standalone sender gives, and the Javadoc quietly assumed the slot would be recreated.
Close the gap without touching the exclusion (widening it would re-open the cannibalization race). The pool now recovers its own stranded managed slots once, at construction, while it is still single-threaded and unpublished: for each in-range slot dir that holds unacked data and is not already live (prewarmed), it reserves the index, builds a sender on it (which re-adopts and recovers the slot), drains the recovered frames, then closes it and frees the index. Reserving the index for the duration means no concurrent borrow can target the dir, so the cannibalization race the exclusion guards against cannot occur here either.
Every step is best-effort: a clean slot is a cheap directory probe; an unreachable server, slow drain, or build/close Error is logged and never fails construction (the data stays durable on disk for a later attempt), and a build/connect failure short-circuits the scan to avoid paying a connect timeout per slot.
Add getConfiguredSfDir() to LineSenderBuilder so the pool can locate its group root, mirroring the existing getConfiguredSenderId() hook. Add a red/green regression test: seed default-0..3 in a busy maxSize=4 run, then restart at the SAME maxSize=4 with minSize=0 and steady low load (a single borrow). Before the fix all four in-range slots stayed stranded (test waits 15s and fails); after the fix startup recovery empties every slot and replays the frames to the server. Confirmed red with the recovery call disabled and green with it enabled; full SenderPool/SF/orphan suites pass unchanged.
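The startup recovery pass can be modelled roughly as follows. This is a simplified sketch: the class, the IntPredicate probe, and the IntConsumer standing in for "build, drain, close" are hypothetical, and unlike the description above it stops the scan on any failure, not only on build/connect failures.

```java
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;

/** Hypothetical sketch of a one-shot, best-effort recovery scan over managed slots. */
final class StartupRecoverySketch {
    /**
     * Visits each index that is not live and holds unacked data, reserving it for the
     * duration of the recovery step. Returns the number of slots recovered.
     */
    static int recoverStranded(boolean[] slotInUse,
                               IntPredicate holdsUnackedData,
                               IntConsumer buildDrainClose) {
        int recovered = 0;
        for (int i = 0; i < slotInUse.length; i++) {
            if (slotInUse[i] || !holdsUnackedData.test(i)) {
                continue; // already live (prewarmed), or a clean slot
            }
            slotInUse[i] = true; // reserve so nothing else can target this dir
            try {
                buildDrainClose.accept(i);
                recovered++;
            } catch (Throwable t) {
                // Best-effort: the data stays on disk for a later attempt.
                // Stop scanning to avoid repeating the same failure per slot.
                break;
            } finally {
                slotInUse[i] = false; // release the reservation
            }
        }
        return recovered;
    }
}
```

A failure never escapes the method, which mirrors the requirement that recovery must not fail pool construction.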
Commit 7118264 made PooledSender.close() route any abnormal flush() exit (RuntimeException OR Error) to discardBroken instead of recycling, but shipped without a test that exercises the Error branch -- the existing flush-failure tests drive a LineSenderException (a RuntimeException), which the pre-fix catch (RuntimeException) already handled, so they pass against both pre- and post-fix code. Add a white-box test via the package-private senderFactory seam: inject a delegate whose flush() throws an AssertionError, then assert the wrapper is discarded (next borrow() returns a fresh instance) rather than recycled. Verified RED against 7118264^ (fails on assertNotSame -- the broken wrapper is handed back to the next borrower) and GREEN on the current tree.
SenderPool.borrow() reserves an SF slot index (allocateSlotIndex) before building the delegate and must return it (freeSlotIndex) if createUnlocked() throws, or slotInUse[idx] stays stuck true: capacity is permanently lowered and a later borrow() trips "no free SF slot index" / eventually only times out -- the failure mode this PR fixes. The pre-existing error-injection test fails in the constructor pre-warm loop with a non-SF config (slotIndex == -1), so the borrow path and the SF (slotIndex >= 0) case were uncovered. Inject (via the senderFactory seam) an SF-config pool whose factory throws on the first borrow-triggered build and succeeds afterwards; assert the throwable propagates AND a subsequent borrow() reuses the slot (capacity intact). Verified RED by removing freeSlotIndex(slotIndex) from the borrow catch (2nd borrow() throws IllegalStateException "no free SF slot index"), GREEN on the current tree.
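The reserve-then-release-on-failure path can be sketched like this. BorrowSketch and its IntFunction factory are hypothetical; the factory stands in for createUnlocked(), and returning a sender to the pool is deliberately left out.

```java
import java.util.function.IntFunction;

/** Hypothetical sketch: free the reserved slot index when the build throws. */
final class BorrowSketch<T> {
    private final boolean[] slotInUse;
    private final IntFunction<T> factory; // stand-in for createUnlocked()

    BorrowSketch(int maxSize, IntFunction<T> factory) {
        this.slotInUse = new boolean[maxSize];
        this.factory = factory;
    }

    T borrow() {
        int slotIndex = allocateSlotIndex();
        try {
            return factory.apply(slotIndex);
        } catch (Throwable t) {
            // Without this, slotInUse[slotIndex] would stay true forever.
            freeSlotIndex(slotIndex);
            throw t;
        }
    }

    private synchronized int allocateSlotIndex() {
        for (int i = 0; i < slotInUse.length; i++) {
            if (!slotInUse[i]) {
                slotInUse[i] = true;
                return i;
            }
        }
        throw new IllegalStateException("no free SF slot index");
    }

    private synchronized void freeSlotIndex(int index) {
        slotInUse[index] = false;
    }
}
```

With a pool of size one, a failed first build followed by a successful second borrow shows the index being reused; dropping the freeSlotIndex call makes the second borrow fail with "no free SF slot index".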
[PR Coverage check] 😍 pass: 165 / 205 (80.49%)
Problem
SenderPool builds every sender from one immutable config string. When store-and-forward (SF) is enabled (sf_dir set), every sender therefore inherited the same sender_id (default "default"), pointed at the same slot dir <sf_dir>/default, and took an exclusive flock there for its lifetime. The second borrow() died on the lock with "sf slot already in use by another process".
Net effect: an SF pool could hold at most one live sender, so pooling + SFA was effectively broken. The code even documented the footgun (Sender.java: "multi-sender setups must set this explicitly or the second sender will fail") but the pool had no way to satisfy it with a single config string.
Fix
When SF is enabled, the pool now hands each sender a distinct, stable slot id <base>-<index>:
- <base> = the configured sender_id (default "default"), so two pools/processes can share one sf_dir by using distinct sender_ids.
- <index> = a pool slot index in [0, maxSize), recycled lowest-free-first.
Because indices are deterministic and reused, a slot dir is re-adopted across restarts and any unacked data it holds is recovered on creation (recovery semantics rely on stable ids; random UUIDs would orphan the data).
Non-SF paths (HTTP / TCP / memory-mode) are unchanged: slotIndex is -1 and no slot bookkeeping runs. The cross-writer guard the slot lock exists for is preserved: two pools sharing one sf_dir with the same base still fail fast.
Flock-safe slot lifecycle
A slot index is only returned to the free set after its delegate releases the flock. The pool tracks the in-between states with two counters, both folded into the capacity check:
- closingSlots: a slot whose delegate is mid-close() and still releasing its flock. Transient: once the flock drops, the index goes back to the free set. This keeps a concurrent borrow() from reclaiming a dir whose lock is still held.
- leakedSlots: a slot whose delegate close() returned with the flock still held (the I/O thread refused to stop, observed via the new QwpWebSocketSender.isSlotLockReleased()). The index is retired permanently (never freed, never reused), so no borrow ever hands out a still-locked dir. The lost capacity is accounted for in the cap math, so the pool degrades gracefully (effective max shrinks) instead of resurrecting "sf slot already in use".
A permanent retirement is no longer silent: SenderPool now has a logger and emits a WARN at both retire sites (discardBroken + reapIdle) naming the slot index and the reduced effective capacity, and exposes the count via a public leakedSlotCount() accessor for metrics. Previously the pool had no logger at all, so a pool bleeding capacity degraded to "every borrow() times out" with nothing in the logs to explain why.
Orphan-drain coexistence
The pool co-manages every <base>-<index> slot and recovers each slot's unacked data when it (re)creates it. To stop a sibling sender's startup drainer from adopting another pool slot's dir/lock (which would resurface "sf slot already in use"), the pool fences off its own <base>- namespace from orphan draining:
- LineSenderBuilder.orphanDrainExcludePrefix(String) builder option,
- OrphanScanner.scan(sfDir, excludeSlotName, excludeNamePrefix) overload,
- build() passes the configured exclude-prefix into the orphan scan.
This is a no-op unless drain_orphans=on; foreign leftovers under other names are still drained.
Teardown hardening (Error-safe)
createUnlocked() runs a heavy native build path (mmap, flock, WebSocket connect) and delegate close() can throw an Error (e.g. an -ea AssertionError, OutOfMemoryError), not just a RuntimeException. Best-effort teardown and the reaper daemon are therefore widened from catch (RuntimeException) to catch (Throwable) so a stray Error can't strand sibling flocks, over-count slot bookkeeping, or kill the housekeeper thread mid-flight. Hardened across 3 files: SenderPool, QueryClientPool, and PoolHousekeeper.
The pool builds senders outside its lock, so two first-borrows sharing one sf_dir can race into mkdir(sf_dir); the loser previously failed with a spurious "could not create sf_dir". build() now treats a benign creation race (the dir exists afterwards) as success.
Changes
- Sender.LineSenderBuilder: additive introspection accessors isStoreAndForwardEnabled() and getConfiguredSenderId(), plus the orphanDrainExcludePrefix(...) option; build() wires the exclude-prefix into the orphan scan. The pool probes the config once at construction, so a bad config now fails eagerly even when minSize == 0.
- QwpWebSocketSender: exposes isSlotLockReleased() so the pool can tell whether close() actually dropped the flock.
- OrphanScanner: new 3-arg scan(...) overload with a name-prefix exclusion.
- PooledSender: carries a stable slotIndex.
- SenderPool: per-slot sender_id derivation, slot-index allocator, flock-safe slot lifecycle via closingSlots / leakedSlots, orphan-drain namespace fencing, Throwable-safe teardown, and a logger that WARNs on permanent slot retirement (plus a leakedSlotCount() accessor).
- Sender.build(): tolerate a benign concurrent mkdir(sf_dir) race.
- QueryClientPool / PoolHousekeeper: Throwable-safe teardown / reap loop.
Tests
New SenderPoolSfTest (17 tests):
- … sender_id used as slot base
- … default-N drift)
- … closingSlots invariant guard)
- … sf_dir+base fails fast
- … sf_dir
- close() releases all slots (re-acquirable)
- … close() leaves the flock held (leakedSlots capacity-loss guard)
- drain_orphans pool does not cannibalize sibling pool slots
- drain_orphans pool still drains genuinely foreign orphans
- … WARN (captured via a logback appender) and bumps leakedSlotCount()
- … min=0 race on sf_dir (benign mkdir race)
Plus new OrphanScannerTest (11 tests) covering the prefix-exclusion overload, and SenderPoolTest gains additional non-SF coverage.
Verification
- SenderPoolSfTest: 17/17 ✅
- OrphanScannerTest: 11/11 ✅
- SenderPoolTest (incl. existing non-SF): 19/19 ✅ (no regression)
- sf.** / sf.cursor.** suites ✅