diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java index 8e9513b1..bd536e6b 100644 --- a/core/src/main/java/io/questdb/client/Sender.java +++ b/core/src/main/java/io/questdb/client/Sender.java @@ -1022,6 +1022,15 @@ final class LineSenderBuilder { // runtime lands in a follow-up commit. For now we surface the // count via logging so users can confirm orphans are being seen. private boolean drainOrphans = false; + // Orphan-scan exclusion for the connection pool. The pool co-manages + // exactly - for i in [0, orphanDrainSlotCount) and + // recovers each of those on (re)creation, so pooled senders must never + // treat one another's live slots as drainable orphans. Anything else -- + // a different base, a bare un-suffixed id, OR a same-base index at or + // above the count (a slot left behind by a larger pool before maxSize + // shrank) -- is still drained, so unacked data is never stranded. + private String orphanDrainBase; + private int orphanDrainSlotCount; private long durableAckKeepaliveIntervalMillis = DURABLE_ACK_KEEPALIVE_NOT_SET; // Optional user-supplied async error handler. When null, the sender // uses DefaultSenderErrorHandler.INSTANCE (loud-not-silent log). @@ -1472,7 +1481,15 @@ public Sender build() { } else { if (!Files.exists(sfDir)) { int rc = Files.mkdir(sfDir, Files.DIR_MODE_DEFAULT); - if (rc != 0) { + // mkdir is non-zero on failure, but "already exists" + // is one such failure. Multiple SF senders sharing one + // sf_dir can be built concurrently (the pool calls + // build() outside its lock), so two threads can both + // pass the exists() check and race into mkdir; the + // loser gets EEXIST. Treat a benign creation race -- + // the dir now exists -- as success and only fail when + // the directory is genuinely absent afterwards. 
+ if (rc != 0 && !Files.exists(sfDir)) { throw new LineSenderException( "could not create sf_dir: " + sfDir + " rc=" + rc); } @@ -1548,7 +1565,7 @@ public Sender build() { if (drainOrphans && sfDir != null) { io.questdb.client.std.ObjList orphans = io.questdb.client.cutlass.qwp.client.sf.cursor.OrphanScanner - .scan(sfDir, senderId); + .scan(sfDir, senderId, orphanDrainBase, orphanDrainSlotCount); if (orphans.size() > 0) { org.slf4j.LoggerFactory.getLogger(LineSenderBuilder.class) .info("dispatching drainers for {} orphan slot(s) under {} " @@ -2471,6 +2488,71 @@ public LineSenderBuilder senderId(String id) { return this; } + /** + * The slot id ({@code sender_id}) currently configured on this + * builder, either parsed from the config string or left at its + * {@code "default"} default. Introspection hook for the connection + * pool, which derives a distinct per-slot id from this base so that + * multiple pooled senders sharing one {@code sf_dir} don't collide + * on the slot {@code flock}. + */ + public String getConfiguredSenderId() { + return senderId; + } + + /** + * The store-and-forward group root ({@code sf_dir}) currently + * configured on this builder, or {@code null} when SF is disabled. + * Introspection hook for the connection pool, which needs the group + * root to locate its own managed slot dirs {@code /-} + * when recovering unacked data a previous run left behind. + */ + public String getConfiguredSfDir() { + return sfDir; + } + + /** + * Excludes the connection pool's live slot set from + * {@link #drainOrphans(boolean)} scanning: a sibling slot under + * {@code sf_dir} named {@code -} with + * {@code 0 <= index < slotCount} is never treated as a drainable orphan. + *

+ * Internal introspection hook for the connection pool. The pool gives + * each pooled SF sender a distinct slot id {@code <base>-<index>} and + * recovers each slot's unacked data itself when it (re)creates that + * slot. Without this exclusion, one pooled sender's startup drainer + * could adopt a sibling pool slot's lock and dir, reintroducing the + * very "sf slot already in use" collision the per-slot ids were added + * to prevent. + * <p>
+ * Unlike a blanket {@code <base>-} prefix exclusion, the bound is the + * pool's {@code maxSize}: a same-base slot whose index is at or above + * {@code slotCount} (e.g. {@code <base>-3} left behind by a larger pool + * before {@code maxSize} shrank from 4 to 2) is NOT excluded and is + * drained like any foreign leftover, so its unacked data is recovered + * instead of being silently stranded. Foreign leftovers (a different + * base, or a bare un-suffixed id) are also still drained. + * <p>
+ * Pass a {@code null}/empty base or {@code slotCount <= 0} to disable + * the exclusion (the default). + */ + public LineSenderBuilder orphanDrainExcludeManagedSlots(String base, int slotCount) { + this.orphanDrainBase = base; + this.orphanDrainSlotCount = slotCount; + return this; + } + + /** + * True iff store-and-forward is enabled (an {@code sf_dir} was set). + * Introspection hook for the connection pool: SF senders own an + * exclusive on-disk slot, so each pooled sender needs its own slot + * id, whereas non-SF (memory-mode / HTTP / TCP) senders share no + * such resource and need no per-slot identity. + */ + public boolean isStoreAndForwardEnabled() { + return sfDir != null; + } + /** * Per-call deadline for {@code Sender.flush()} spinning on a full * cursor segment ring waiting for ACKs to drain space. Default diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index e34a1923..782f6a84 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -234,6 +234,12 @@ public class QwpWebSocketSender implements Sender { private Sender.InitialConnectMode initialConnectMode = Sender.InitialConnectMode.OFF; private boolean ownsCursorEngine; private long pendingBytes; + // Set true by close() once the SF slot flock has been released (the normal + // teardown path). Stays false if close() bailed early with the I/O thread + // still running -- then cursorEngine.close() never ran and the flock is + // still held, so the owning pool MUST keep the slot reserved rather than + // hand the still-locked dir to the next borrow ("sf slot already in use"). + private boolean slotLockReleased; private int pendingRowCount; private SenderProgressDispatcher progressDispatcher; // Async-delivery sink for ack-watermark advances. 
Default no-op; a @@ -1087,6 +1093,10 @@ public void close() { cursorEngine = null; ownsCursorEngine = false; } + // Past the ioThreadStopped guard => cursorEngine.close() ran and + // released the SF flock in its finally (or this sender owned no + // engine holding one). Signal the pool it may reuse the slot. + slotLockReleased = true; // Shutdown order: dispatcher last, after the I/O loop has stopped // producing into it. close() drains pending entries with a short @@ -1129,6 +1139,16 @@ public void close() { } } + /** + * True once {@link #close()} has released the store-and-forward slot + * flock. False means close() leaked the still-running I/O thread (and its + * resources), so the flock is still held; the owning pool must keep the + * slot index reserved instead of reusing the still-locked slot dir. + */ + public boolean isSlotLockReleased() { + return slotLockReleased; + } + @Override public Sender decimalColumn(CharSequence name, Decimal64 value) { checkNotClosed(); diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/OrphanScanner.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/OrphanScanner.java index ba29779d..250ac8ff 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/OrphanScanner.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/OrphanScanner.java @@ -75,10 +75,28 @@ private OrphanScanner() { * "no orphans" answer in that case. */ public static ObjList scan(String sfDir, String excludeSlotName) { + return scan(sfDir, excludeSlotName, null); + } + + /** + * As {@link #scan(String, String)}, but additionally skips any child + * whose name starts with {@code excludeNamePrefix} (when non-null and + * non-empty). + *

+ * This is how a connection pool keeps the scanner from treating its own + * sibling slots as orphans: every pooled SF sender lives at + * {@code /-}, and the pool itself recovers each + * slot's unacked data when it (re)creates that slot — so the pool's + * whole {@code -} namespace must be excluded from auto-drain. + * Genuine foreign leftovers (a different base, or a bare un-suffixed + * id) do not match the prefix and are still reported. + */ + public static ObjList scan(String sfDir, String excludeSlotName, String excludeNamePrefix) { ObjList orphans = new ObjList<>(); if (sfDir == null || !Files.exists(sfDir)) { return orphans; } + boolean hasPrefix = excludeNamePrefix != null && !excludeNamePrefix.isEmpty(); long find = Files.findFirst(sfDir); if (find < 0) { LOG.warn("orphan scan could not enumerate {} — treating as no orphans, " @@ -99,6 +117,76 @@ public static ObjList scan(String sfDir, String excludeSlotName) { if (excludeSlotName != null && excludeSlotName.equals(name)) { continue; } + if (hasPrefix && name.startsWith(excludeNamePrefix)) { + continue; + } + String slotPath = sfDir + "/" + name; + if (!isCandidateOrphan(slotPath)) { + continue; + } + orphans.add(slotPath); + } + } finally { + Files.findClose(find); + } + return orphans; + } + + /** + * As {@link #scan(String, String)}, but excludes only the exact + * set of slot dirs a connection pool can re-create and self-recover: + * {@code -} for {@code 0 <= i < managedSlotCount}. + *

+ * This is the precise replacement for the coarser prefix exclusion + * ({@link #scan(String, String, String)}). The prefix form fences off the + * whole {@code <base>-} namespace, which silently strands unacked + * data after a {@code maxSize} shrink across restarts: a slot like + * {@code <base>-3} left over from a larger pool is neither re-created (out + * of the new {@code [0,maxSize)} index range) nor drained (it matched the + * excluded prefix). By bounding the exclusion to {@code [0,managedSlotCount)}, + * any same-base slot with an index at or above {@code managedSlotCount} is + * treated like a foreign leftover and becomes a drainable orphan, so its + * data is recovered through the normal drain path. + * <p>
+ * Only canonical, pool-minted names are excluded: the suffix after + * {@code <base>-} must be a canonical non-negative decimal + * ({@code 0,1,2,...} with no leading zeros, sign, or non-digits). Anything + * else under the same base ({@code <base>-foo}, {@code <base>-007}) is not a + * name the pool creates and is reported as a candidate. + * <p>
+ * When {@code managedBase} is null/empty or {@code managedSlotCount <= 0} + * no exclusion is applied (every sibling with data is a candidate). + */ + public static ObjList scan(String sfDir, String excludeSlotName, String managedBase, int managedSlotCount) { + ObjList orphans = new ObjList<>(); + if (sfDir == null || !Files.exists(sfDir)) { + return orphans; + } + boolean hasManaged = managedBase != null && !managedBase.isEmpty() && managedSlotCount > 0; + String managedPrefix = hasManaged ? managedBase + "-" : null; + long find = Files.findFirst(sfDir); + if (find < 0) { + LOG.warn("orphan scan could not enumerate {} \u2014 treating as no orphans, " + + "but this may indicate a permission or transient error", sfDir); + return orphans; + } + if (find == 0) { + return orphans; + } + try { + int rc = 1; + while (rc > 0) { + String name = Files.utf8ToString(Files.findName(find)); + rc = Files.findNext(find); + if (name == null || ".".equals(name) || "..".equals(name)) { + continue; + } + if (excludeSlotName != null && excludeSlotName.equals(name)) { + continue; + } + if (hasManaged && isManagedSlot(name, managedPrefix, managedSlotCount)) { + continue; + } String slotPath = sfDir + "/" + name; if (!isCandidateOrphan(slotPath)) { continue; @@ -111,6 +199,50 @@ public static ObjList scan(String sfDir, String excludeSlotName) { return orphans; } + /** + * True iff {@code name} is a slot the pool actively co-manages, i.e. + * {@code } where {@code i} is a canonical non-negative + * decimal in {@code [0, managedSlotCount)}. Visible for testing. + */ + public static boolean isManagedSlot(String name, String managedPrefix, int managedSlotCount) { + if (name == null || managedPrefix == null || !name.startsWith(managedPrefix)) { + return false; + } + int idx = parseCanonicalIndex(name, managedPrefix.length()); + return idx >= 0 && idx < managedSlotCount; + } + + /** + * Parses the canonical non-negative decimal that makes up the rest of + * {@code name} from {@code from}. 
Returns {@code -1} for an empty suffix, + * a non-digit, a leading zero (e.g. {@code "007"}), or anything that would + * overflow {@code int}. Only the exact form the pool emits + * ({@code Integer.toString(index)}) is accepted, so foreign or malformed + * same-base names never get mistaken for a managed slot. + */ + private static int parseCanonicalIndex(String name, int from) { + int len = name.length(); + if (from >= len) { + return -1; + } + // Reject leading zeros unless the whole suffix is exactly "0". + if (name.charAt(from) == '0' && len - from > 1) { + return -1; + } + long acc = 0; + for (int i = from; i < len; i++) { + char c = name.charAt(i); + if (c < '0' || c > '9') { + return -1; + } + acc = acc * 10 + (c - '0'); + if (acc > Integer.MAX_VALUE) { + return -1; + } + } + return (int) acc; + } + /** * True iff {@code slotPath} looks like a slot dir with unacked data * and no failure sentinel. Visible for testing. diff --git a/core/src/main/java/io/questdb/client/impl/PoolHousekeeper.java b/core/src/main/java/io/questdb/client/impl/PoolHousekeeper.java index ecdd6fc9..60ea2a41 100644 --- a/core/src/main/java/io/questdb/client/impl/PoolHousekeeper.java +++ b/core/src/main/java/io/questdb/client/impl/PoolHousekeeper.java @@ -80,12 +80,15 @@ private void runLoop() { } try { senderPool.reapIdle(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { // Reaping must not propagate -- it's best-effort housekeeping. + // Catch Throwable (not just RuntimeException) so an Error from a + // delegate teardown can never kill this daemon thread and stop + // all future reaping for the life of the handle. 
} try { queryPool.reapIdle(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { } } } diff --git a/core/src/main/java/io/questdb/client/impl/PooledSender.java b/core/src/main/java/io/questdb/client/impl/PooledSender.java index 9e2dbbb6..61d89296 100644 --- a/core/src/main/java/io/questdb/client/impl/PooledSender.java +++ b/core/src/main/java/io/questdb/client/impl/PooledSender.java @@ -50,13 +50,21 @@ public final class PooledSender implements Sender { private final long createdAtMillis; private final Sender delegate; private final SenderPool pool; + // Index of the store-and-forward slot this wrapper owns within the pool, + // or -1 when SF is disabled. Stable for the wrapper's whole life; the + // pool returns it to the free set only when the wrapper is evicted from + // {@code all} (discardBroken / reapIdle). Used to derive a distinct + // {@code sender_id} per pooled sender so concurrent SF senders sharing + // one {@code sf_dir} never collide on the slot {@code flock}. + private final int slotIndex; private volatile long idleSinceMillis; private volatile boolean inUse; private volatile boolean invalidated; - PooledSender(Sender delegate, SenderPool pool) { + PooledSender(Sender delegate, SenderPool pool, int slotIndex) { this.delegate = delegate; this.pool = pool; + this.slotIndex = slotIndex; this.createdAtMillis = System.currentTimeMillis(); this.idleSinceMillis = this.createdAtMillis; } @@ -148,17 +156,15 @@ public void close() { if (!inUse) { return; } - boolean broken = false; + // Track normal completion rather than catching a specific throwable + // type. flush() can exit abnormally with an Error (AssertionError + // under -ea, OutOfMemoryError, ...) as well as a RuntimeException; + // keying the recycle decision off normal completion treats every + // abnormal exit as unrecyclable, which is the fail-safe default. 
+ boolean flushed = false; try { delegate.flush(); - } catch (RuntimeException e) { - // Sender does not clear its buffer on flush failure (see - // Sender Javadoc), and WebSocket transport latches the failure - // for good. Either way, the wrapper is unsafe to recycle: the - // next borrower would inherit the failed rows or a dead - // connection. - broken = true; - throw e; + flushed = true; } finally { inUse = false; // Clear the pin BEFORE returning the slot. If we cleared @@ -167,10 +173,17 @@ public void close() { // re-pin on this thread would return the (now in-use) // wrapper -- the same race this clear is meant to close. pool.clearPinIfCurrent(this); - if (broken) { - pool.discardBroken(this); - } else { + if (flushed) { pool.giveBack(this); + } else { + // flush() did not complete normally. Sender does not clear + // its buffer on flush failure (see Sender Javadoc), and + // WebSocket transport latches the failure for good. Either + // way the wrapper is unsafe to recycle: the next borrower + // would inherit the failed rows or a dead connection. The + // original throwable propagates naturally once this finally + // returns -- no explicit rethrow needed. + pool.discardBroken(this); } } } @@ -372,6 +385,10 @@ long createdAtMillis() { return createdAtMillis; } + int slotIndex() { + return slotIndex; + } + Sender delegate() { return delegate; } diff --git a/core/src/main/java/io/questdb/client/impl/QueryClientPool.java b/core/src/main/java/io/questdb/client/impl/QueryClientPool.java index 8191f162..5ee12a26 100644 --- a/core/src/main/java/io/questdb/client/impl/QueryClientPool.java +++ b/core/src/main/java/io/questdb/client/impl/QueryClientPool.java @@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; /** * Elastic pool of {@link QueryWorker}s. 
Each worker pairs one @@ -52,6 +53,12 @@ public final class QueryClientPool { private final ArrayList all; private final ArrayDeque available; private final String configurationString; + // Test seam. Production connects via QwpQueryClient.connect(); white-box + // tests in io.questdb.client.test.impl reach the package-private constructor + // by reflection to inject a hook that throws a non-RuntimeException + // Throwable (e.g. an -ea AssertionError) from the native connect path, + // exercising the Error-safe cleanup on the prewarm and acquire paths. + private final Consumer connectHook; private final long idleTimeoutMillis; private final ReentrantLock lock = new ReentrantLock(); private final long maxLifetimeMillis; @@ -69,10 +76,28 @@ public QueryClientPool( long acquireTimeoutMillis, long idleTimeoutMillis, long maxLifetimeMillis + ) { + this(configurationString, minSize, maxSize, acquireTimeoutMillis, + idleTimeoutMillis, maxLifetimeMillis, null); + } + + // Package-private constructor exposing the connectHook test seam: production + // passes null (-> the real QwpQueryClient.connect()). White-box tests in + // io.questdb.client.test.impl reach this by reflection to inject a hook that + // throws a non-RuntimeException Throwable from the native connect path. + QueryClientPool( + String configurationString, + int minSize, + int maxSize, + long acquireTimeoutMillis, + long idleTimeoutMillis, + long maxLifetimeMillis, + Consumer connectHook ) { if (minSize < 0 || maxSize < 1 || minSize > maxSize) { throw new IllegalArgumentException("invalid pool sizing: min=" + minSize + ", max=" + maxSize); } + this.connectHook = connectHook != null ? 
connectHook : QwpQueryClient::connect; this.configurationString = configurationString; this.minSize = minSize; this.maxSize = maxSize; @@ -91,11 +116,20 @@ public QueryClientPool( available.add(w); built++; } - } catch (RuntimeException e) { + } catch (Throwable e) { + // Catch Throwable, not just RuntimeException: createUnlocked()/start() + // run a heavy native build path that can throw an Error -- e.g. an + // -ea AssertionError or OutOfMemoryError -- mid-prewarm. If we only + // caught RuntimeException the Error would propagate without running + // the cleanup below, stranding every already-built worker's I/O + // thread and native allocations. for (int i = 0; i < built; i++) { try { all.get(i).shutdown(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort cleanup: an Error (e.g. -ea AssertionError) + // from one worker's shutdown must not strand the remaining + // pre-warmed workers nor mask the original failure below. } } throw e; @@ -123,7 +157,15 @@ public QueryWorker acquire() { try { created = createUnlocked(); created.start(); - } catch (RuntimeException e) { + } catch (Throwable e) { + // Catch Throwable, not just RuntimeException: + // createUnlocked()/start() run a heavy native build path + // that can throw an Error -- e.g. an -ea AssertionError + // or OutOfMemoryError. If we only caught RuntimeException + // the Error would propagate with inFlightCreations still + // incremented, permanently shrinking pool capacity until + // every acquire() times out. Restoring the reservation + // for any throwable is safe. lock.lock(); inFlightCreations--; workerReleased.signal(); @@ -136,7 +178,9 @@ public QueryWorker acquire() { if (closed) { try { created.shutdown(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort: an Error from teardown must not mask + // the closed-pool signal. 
} throw new QueryException((byte) 0, "QuestDB handle is closed"); } @@ -180,7 +224,13 @@ public void close() { // join the worker threads and close their clients. Done outside the lock // so a slow join doesn't keep the pool latched. for (int i = 0; i < snapshot.size(); i++) { - snapshot.get(i).shutdown(); + try { + snapshot.get(i).shutdown(); + } catch (Throwable ignored) { + // Best-effort: a single worker's shutdown failure (including an + // Error such as an -ea AssertionError) must not abort the loop + // and strand the remaining workers unclosed. + } } } @@ -218,7 +268,10 @@ void reapIdle() { for (int i = 0, n = toShutdown.size(); i < n; i++) { try { toShutdown.get(i).shutdown(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort: a single worker's shutdown failure (including + // an Error such as an -ea AssertionError) must not abort the + // reap loop and strand the remaining reaped workers. } } } @@ -239,19 +292,36 @@ void release(QueryWorker w) { } } + // Package-private white-box accessor for tests: reports the current + // in-flight creation count under the pool lock. A non-zero value after a + // failed acquire() means the slot reservation was never released -- the + // capacity-shrink bug this guards against. + int inFlightCreations() { + lock.lock(); + try { + return inFlightCreations; + } finally { + lock.unlock(); + } + } + private QueryWorker createUnlocked() { QwpQueryClient client = QwpQueryClient.fromConfig(configurationString); try { - client.connect(); - } catch (RuntimeException e) { - // connect() may throw after QwpQueryClient.fromConfig() has already + connectHook.accept(client); + } catch (Throwable e) { + // Catch Throwable, not just RuntimeException: connect() runs a heavy + // native path that can throw an Error (e.g. 
an -ea AssertionError or + // OutOfMemoryError) after QwpQueryClient.fromConfig() has already // allocated native scratch (the QwpBindValues NativeBufferWriter is // field-initialised). Close the half-built client so its allocations - // are released, otherwise every connect failure during pool growth - // leaks NATIVE_DEFAULT bytes. + // are released, otherwise an Error during pool growth leaks the + // NATIVE_DEFAULT bytes that only this cleanup would reclaim. try { client.close(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort: an Error from closing the half-built client must + // not mask the original connect failure being rethrown below. } throw e; } diff --git a/core/src/main/java/io/questdb/client/impl/QuestDBImpl.java b/core/src/main/java/io/questdb/client/impl/QuestDBImpl.java index cc974ac1..752124b1 100644 --- a/core/src/main/java/io/questdb/client/impl/QuestDBImpl.java +++ b/core/src/main/java/io/questdb/client/impl/QuestDBImpl.java @@ -29,6 +29,10 @@ import io.questdb.client.Query; import io.questdb.client.Sender; import io.questdb.client.cutlass.qwp.client.QwpColumnBatchHandler; +import io.questdb.client.cutlass.qwp.client.QwpQueryClient; + +import java.util.function.Consumer; +import java.util.function.IntFunction; /** * Implementation of {@link QuestDB}. Owns the elastic {@link SenderPool} @@ -55,6 +59,31 @@ public QuestDBImpl( long idleTimeoutMillis, long maxLifetimeMillis, long housekeeperIntervalMillis + ) { + this(ingestConfig, queryConfig, senderMin, senderMax, queryMin, queryMax, + acquireTimeoutMillis, idleTimeoutMillis, maxLifetimeMillis, + housekeeperIntervalMillis, null, null); + } + + // Package-private constructor exposing the senderFactory and connectHook test + // seams: production passes null for both (-> the real native build/connect + // paths). 
White-box tests in io.questdb.client.test.impl reach this by + // reflection (the main module is declared `open`) to make SenderPool prewarm + // an observable delegate while QueryClientPool construction throws an Error, + // exercising the cleanup catch below. + QuestDBImpl( + String ingestConfig, + String queryConfig, + int senderMin, + int senderMax, + int queryMin, + int queryMax, + long acquireTimeoutMillis, + long idleTimeoutMillis, + long maxLifetimeMillis, + long housekeeperIntervalMillis, + IntFunction senderFactory, + Consumer connectHook ) { SenderPool builtSenderPool = null; QueryClientPool builtQueryPool = null; @@ -62,13 +91,24 @@ public QuestDBImpl( try { builtSenderPool = new SenderPool( ingestConfig, senderMin, senderMax, acquireTimeoutMillis, - idleTimeoutMillis, maxLifetimeMillis); + idleTimeoutMillis, maxLifetimeMillis, senderFactory); builtQueryPool = new QueryClientPool( queryConfig, queryMin, queryMax, acquireTimeoutMillis, - idleTimeoutMillis, maxLifetimeMillis); + idleTimeoutMillis, maxLifetimeMillis, connectHook); builtHousekeeper = new PoolHousekeeper(builtSenderPool, builtQueryPool, housekeeperIntervalMillis); builtHousekeeper.start(); - } catch (RuntimeException e) { + } catch (Throwable e) { + // Catch Throwable, not just RuntimeException: this orchestrator is the + // direct caller of the SenderPool and QueryClientPool constructors, + // both of which run heavy native build/connect paths that can throw an + // Error under -ea (AssertionError, OutOfMemoryError). The pools widened + // their own prewarm catches to Throwable for exactly this reason; if we + // only caught RuntimeException here, an Error from QueryClientPool + // construction (or the housekeeper start) would propagate without + // closing the already-built SenderPool, stranding its prewarmed + // delegates' flocks, mmap'd rings, and I/O threads -- the precise leak + // class this teardown-hardening work exists to kill. 
The cleanup below + // is best-effort and rethrows the original failure. if (builtHousekeeper != null) { builtHousekeeper.stop(); } diff --git a/core/src/main/java/io/questdb/client/impl/SenderPool.java b/core/src/main/java/io/questdb/client/impl/SenderPool.java index 61b6ac69..755592d2 100644 --- a/core/src/main/java/io/questdb/client/impl/SenderPool.java +++ b/core/src/main/java/io/questdb/client/impl/SenderPool.java @@ -26,6 +26,11 @@ import io.questdb.client.Sender; import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import io.questdb.client.cutlass.qwp.client.sf.cursor.OrphanScanner; +import io.questdb.client.std.Files; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayDeque; import java.util.ArrayList; @@ -33,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.IntFunction; /** * Elastic pool of {@link Sender} instances, each wrapped in a @@ -49,24 +55,72 @@ * Connection creation happens outside the lock so a slow connect (TLS * handshake, DNS) does not block other borrowers or the housekeeper. The * pool tracks in-flight creations via {@code inFlightCreations} so the cap - * check ({@code allSize + inFlightCreations < maxSize}) stays correct under - * concurrent borrows. + * check ({@code allSize + inFlightCreations + closingSlots + leakedSlots < + * maxSize}) stays correct under concurrent borrows. + *

+ * Store-and-forward slots. When the configuration enables SF + * ({@code sf_dir} set), every sender owns an exclusive on-disk slot + * {@code <sf_dir>/<sender_id>} guarded by a {@code flock}. A pool reuses one + * immutable config string for every sender, so without intervention all + * senders would inherit the same {@code sender_id}, point at the same slot, + * and every sender after the first would die with "sf slot already in use". + * The pool therefore hands each slot a distinct id {@code <base>-<index>}, + * where {@code <base>} is the configured {@code sender_id} (default + * {@code "default"}) and {@code <index>} is a stable pool slot index in + * {@code [0, maxSize)}. Indices are reused deterministically (lowest free + * first), so across a restart the same slot dirs are re-adopted and any + * unacked data they hold is recovered on creation. A slot is only returned + * to the free set once its delegate has released the {@code flock}, tracked + * via {@code closingSlots} so a concurrent borrow can never reclaim a slot + * dir whose lock is still held. */ public final class SenderPool implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(SenderPool.class); private final long acquireTimeoutMillis; private final ArrayList all; private final ArrayDeque available; private final String configurationString; private final long idleTimeoutMillis; + // Test seam. Production builds delegates via defaultSender(); white-box + // tests in io.questdb.client.test.impl reach the package-private + // constructor by reflection to inject a factory that throws a non- + // RuntimeException Throwable (e.g. an -ea AssertionError) mid-prewarm, + // exercising the Error-safe delegate cleanup loop. + private final IntFunction senderFactory; private final ReentrantLock lock = new ReentrantLock(); private final long maxLifetimeMillis; private final int maxSize; private final int minSize; + // SF slot base id (configured sender_id, default "default") when SF is + // enabled; null otherwise. 
Each pooled sender's slot id is + // {@code slotBaseId + "-" + slotIndex}. + private final String slotBaseId; + // SF group root (sf_dir) when SF is enabled; null otherwise. Used to + // locate this pool's own managed slot dirs /- + // for startup recovery of unacked data left by a previous run. + private final String sfDir; + // Reservation bitmap for SF slot indices [0, maxSize). Guarded by lock. + // null when SF is disabled (no per-slot identity needed). + private final boolean[] slotInUse; private final Condition slotReleased; + // True iff the configuration enables store-and-forward (sf_dir set). + private final boolean storeAndForward; private final ThreadLocal threadAffine = new ThreadLocal<>(); + // Slots removed from `all` whose delegate is still releasing its flock. + // They keep reserving capacity (and their slotInUse mark) until the + // flock drops, so the cap check and the slot allocator stay consistent + // and no concurrent borrow can reclaim a slot dir that is still locked. + // Guarded by lock. Only ever ticks for SF slots. + private int closingSlots; private volatile boolean closed; private int inFlightCreations; + // Slots whose delegate close() returned with the SF flock still held + // (the I/O thread refused to stop). Permanently consumed: the index is + // never freed and never reused, so no borrow ever hands out a still- + // locked slot dir. Counted in the cap check so the lost capacity is + // accounted for. Guarded by lock; only ever ticks for SF slots. + private int leakedSlots; public SenderPool( String configurationString, @@ -75,10 +129,28 @@ public SenderPool( long acquireTimeoutMillis, long idleTimeoutMillis, long maxLifetimeMillis + ) { + this(configurationString, minSize, maxSize, acquireTimeoutMillis, + idleTimeoutMillis, maxLifetimeMillis, null); + } + + // Package-private constructor exposing the senderFactory test seam: + // production passes null (-> the real defaultSender()). 
White-box tests in + // io.questdb.client.test.impl reach this by reflection to inject a factory + // that throws a non-RuntimeException Throwable mid-prewarm. + SenderPool( + String configurationString, + int minSize, + int maxSize, + long acquireTimeoutMillis, + long idleTimeoutMillis, + long maxLifetimeMillis, + IntFunction senderFactory ) { if (minSize < 0 || maxSize < 1 || minSize > maxSize) { throw new IllegalArgumentException("invalid pool sizing: min=" + minSize + ", max=" + maxSize); } + this.senderFactory = senderFactory != null ? senderFactory : this::defaultSender; this.configurationString = configurationString; this.minSize = minSize; this.maxSize = maxSize; @@ -88,24 +160,148 @@ public SenderPool( this.all = new ArrayList<>(maxSize); this.available = new ArrayDeque<>(maxSize); this.slotReleased = lock.newCondition(); - // Pre-warm minSize connections. + // Probe the config once, up front: this validates it eagerly (so a + // bad config fails at construction even when minSize == 0) and tells + // us whether SF is on and, if so, the base slot id to derive + // per-sender ids from. + Sender.LineSenderBuilder probe = Sender.builder(configurationString); + this.storeAndForward = probe.isStoreAndForwardEnabled(); + this.slotBaseId = this.storeAndForward ? probe.getConfiguredSenderId() : null; + this.sfDir = this.storeAndForward ? probe.getConfiguredSfDir() : null; + this.slotInUse = this.storeAndForward ? new boolean[maxSize] : null; + // Pre-warm minSize connections. Pre-warm runs single-threaded in the + // constructor, so slots 0..minSize-1 are reserved directly. int built = 0; try { for (int i = 0; i < minSize; i++) { - PooledSender ps = createUnlocked(); + if (storeAndForward) { + slotInUse[i] = true; + } + PooledSender ps = createUnlocked(storeAndForward ? 
i : -1); all.add(ps); available.add(ps); built++; } - } catch (RuntimeException e) { + } catch (Throwable e) { + // Catch Throwable, not just RuntimeException: createUnlocked() runs a + // heavy native build path (mmap, flock, WebSocket connect) that can + // throw an Error -- e.g. an -ea AssertionError or OutOfMemoryError -- + // mid-prewarm. If we only caught RuntimeException the Error would + // propagate without running the cleanup below, leaking every + // already-built delegate's flock + mmap'd ring + I/O thread and + // resurrecting "sf slot already in use" on the next attempt. for (int i = 0; i < built; i++) { try { all.get(i).delegate().close(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort cleanup: a delegate close() can throw an + // Error (e.g. an -ea AssertionError) as well as a + // RuntimeException. Swallow either so we still close the + // remaining pre-warmed slots and rethrow the original + // construction failure below. } } throw e; } + // Prewarm succeeded; the pool is still single-threaded and unpublished. + // Recover any unacked data a previous run left in this pool's own + // managed slots that prewarm did not already re-adopt. + recoverStrandedManagedSlots(); + } + + /** + * Best-effort, one-shot recovery of unacked data left in this pool's own + * managed SF slots {@code [0, maxSize)} by a previous run. + *

+ * Every pooled SF sender's orphan drainer deliberately excludes the whole + * {@code [0, maxSize)} managed range (via + * {@code orphanDrainExcludeManagedSlots}) so a sibling never adopts a slot + * dir/lock the pool intends to (re)create -- that exclusion is what keeps + * the per-slot ids from resurfacing "sf slot already in use". The + * trade-off is that an in-range slot left holding unacked data is otherwise + * recovered ONLY when the pool happens to (re)create that index; under + * steady low load the pool may never grow to a high index, stranding that + * slot's data (durable on disk, but undelivered) until a later restart or + * a load spike. This method closes that gap by recovering such slots once, + * here at construction. + *

+ * It runs while the pool is single-threaded and unpublished, and reserves + * each slot index for the duration of its recovery, so no concurrent + * borrow can target the dir -- the cannibalization race the drainer + * exclusion prevents cannot occur here either. Prewarmed slots (already + * live, holding their flock) are skipped; they deliver their recovered + * data through normal use. + *

+ * Every step is best-effort: a slot with no unacked data is a cheap + * directory probe; an unreachable server, a slow drain, or a build/close + * Error is logged and never fails construction, since the data stays + * durable on disk for a later attempt. + */ + private void recoverStrandedManagedSlots() { + if (!storeAndForward || sfDir == null || !Files.exists(sfDir)) { + return; + } + for (int i = 0; i < maxSize; i++) { + // Reserve this index unless prewarm already holds it live. + boolean reserved; + lock.lock(); + try { + reserved = slotInUse[i]; + if (!reserved) { + slotInUse[i] = true; + } + } finally { + lock.unlock(); + } + if (reserved) { + continue; + } + String slotPath = sfDir + "/" + slotBaseId + "-" + i; + boolean stopScan = false; + try { + if (OrphanScanner.isCandidateOrphan(slotPath)) { + PooledSender recoverer = null; + try { + recoverer = createUnlocked(i); + } catch (Throwable buildErr) { + // A build/connect failure (e.g. server unreachable) + // will very likely repeat for every remaining slot, so + // stop here rather than pay a connect timeout per slot. + LOG.warn("startup SF recovery: could not open slot {} ({}); " + + "skipping remaining slots", slotPath, buildErr.toString()); + stopScan = true; + } + if (recoverer != null) { + try { + recoverer.drain(acquireTimeoutMillis); + } catch (Throwable drainErr) { + LOG.warn("startup SF recovery: drain failed for slot {} ({})", + slotPath, drainErr.toString()); + } finally { + try { + recoverer.delegate().close(); + } catch (Throwable ignored) { + // Best-effort close: a teardown Error must not + // abort recovery of the remaining slots. 
+ } + } + } + } + } catch (Throwable scanErr) { + LOG.warn("startup SF recovery: scan failed for slot {} ({})", + slotPath, scanErr.toString()); + } finally { + lock.lock(); + try { + slotInUse[i] = false; + } finally { + lock.unlock(); + } + } + if (stopScan) { + break; + } + } } public PooledSender borrow() { @@ -124,15 +320,31 @@ public PooledSender borrow() { s.markInUse(); return s; } - if (all.size() + inFlightCreations < maxSize) { + if (all.size() + inFlightCreations + closingSlots + leakedSlots < maxSize) { inFlightCreations++; + // Reserve a slot index under the lock so concurrent + // creations never target the same SF slot dir. -1 when + // SF is off (no per-slot identity needed). + int slotIndex = storeAndForward ? allocateSlotIndex() : -1; lock.unlock(); PooledSender created; try { - created = createUnlocked(); - } catch (RuntimeException e) { + created = createUnlocked(slotIndex); + } catch (Throwable e) { + // Catch Throwable, not just RuntimeException: + // createUnlocked() runs a heavy native build path + // (mmap, flock, WebSocket connect) that can throw an + // Error -- e.g. an -ea AssertionError or + // OutOfMemoryError. If we only caught RuntimeException + // the Error would propagate with inFlightCreations + // still incremented and the SF slot index still + // reserved (slotInUse[idx] stuck true), permanently + // lowering pool capacity. The cleanup below is + // idempotent, so undoing the reservation for any + // throwable is safe. lock.lock(); inFlightCreations--; + freeSlotIndex(slotIndex); slotReleased.signal(); lock.unlock(); throw e; @@ -143,9 +355,12 @@ public PooledSender borrow() { // Pool was closed mid-creation -- destroy the new connection // rather than leaking it. Other waiters have been signaled // by close() already. + freeSlotIndex(slotIndex); try { created.delegate().close(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort: an Error (e.g. 
-ea AssertionError) + // from teardown must not mask the closed-pool signal. } throw new LineSenderException("QuestDB handle is closed"); } @@ -205,7 +420,9 @@ public void close() { for (int i = 0; i < snapshot.length; i++) { try { snapshot[i].delegate().close(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort: an Error from one delegate's teardown must not + // abort the loop and strand the remaining delegates unclosed. } } } @@ -241,21 +458,66 @@ void clearPinIfCurrent(PooledSender s) { */ void discardBroken(PooledSender s) { s.markInvalidated(); + boolean reserved = false; lock.lock(); try { if (closed) { return; } - all.remove(s); - // Wake one waiter -- the cap check in borrow() uses all.size(), - // so a freed slot may now allow a creation attempt. + boolean removed = all.remove(s); + // For an SF slot, keep its index reserved (move the reservation + // from `all` to `closingSlots`) until the delegate below releases + // the flock. Capacity stays accounted for, so a concurrent borrow + // cannot reclaim this slot dir while its lock is still held. + if (removed && s.slotIndex() >= 0) { + closingSlots++; + reserved = true; + } + // Wake one waiter -- the cap check in borrow() may now admit a + // creation attempt (on a *different* slot). slotReleased.signal(); } finally { lock.unlock(); } + // Close the delegate outside the lock (releases the SF flock). Always + // attempt it so native resources are freed even on the defensive path + // where the wrapper had already left `all`. try { s.delegate().close(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort teardown: a delegate close() can throw an Error + // (e.g. an -ea AssertionError) as well as a RuntimeException. 
+ // Either way the slot accounting in the finally below MUST run, + // otherwise an SF slot stays reserved forever (slotInUse stuck + // true, closingSlots over-counted) and the pool leaks capacity + // until borrow() can only ever time out. + } finally { + if (reserved) { + lock.lock(); + try { + if (flockReleased(s)) { + // Flock is released now: return the reserved slot + // index to the free set. + freeSlotIndex(s.slotIndex()); + closingSlots--; + slotReleased.signal(); + } else { + // close() leaked the still-running I/O thread; the + // flock is still held. Retire the slot permanently: + // keep slotInUse[idx] set and move it from + // closingSlots to leakedSlots so the cap math + // accounts for the lost capacity and no borrow ever + // reuses the still-locked dir. + closingSlots--; + leakedSlots++; + LOG.warn("SF slot {} retired permanently: delegate close() returned with the flock still held " + + "(I/O thread refused to stop); pool capacity reduced by 1, now {} of {} usable [leakedSlots={}]", + s.slotIndex(), maxSize - leakedSlots, maxSize, leakedSlots); + } + } finally { + lock.unlock(); + } + } } } @@ -314,6 +576,11 @@ public void reapIdle() { if (idleExpired || overAge) { it.remove(); all.remove(s); + // Keep the SF slot reserved until its flock is released + // below (see discardBroken for the rationale). 
+ if (s.slotIndex() >= 0) { + closingSlots++; + } if (toClose == null) { toClose = new ArrayList<>(); } @@ -327,7 +594,40 @@ public void reapIdle() { for (int i = 0, n = toClose.size(); i < n; i++) { try { toClose.get(i).delegate().close(); - } catch (RuntimeException ignored) { + } catch (Throwable ignored) { + // Best-effort: a single delegate close() failure (including + // an Error such as an -ea AssertionError) must not abort the + // loop -- that would leave sibling flocks unreleased -- nor + // skip the slot-accounting release block below, which would + // strand every reaped index (slotInUse stuck true, + // closingSlots over-counted) and leak pool capacity. + } + } + // Return reserved SF slot indices to the free set -- but only for + // slots whose delegate confirmed the flock was released. A slot + // left locked (I/O thread refused to stop) is retired permanently. + if (storeAndForward) { + lock.lock(); + try { + for (int i = 0, n = toClose.size(); i < n; i++) { + PooledSender s = toClose.get(i); + if (s.slotIndex() >= 0) { + if (flockReleased(s)) { + freeSlotIndex(s.slotIndex()); + closingSlots--; + } else { + closingSlots--; + leakedSlots++; + LOG.warn("SF slot {} retired permanently during idle reaping: delegate close() returned " + + "with the flock still held (I/O thread refused to stop); pool capacity reduced by 1, " + + "now {} of {} usable [leakedSlots={}]", + s.slotIndex(), maxSize - leakedSlots, maxSize, leakedSlots); + } + } + } + slotReleased.signalAll(); + } finally { + lock.unlock(); } } } @@ -353,6 +653,23 @@ public int totalSize() { } } + /** + * Snapshot of the number of SF slots permanently retired because a + * delegate {@code close()} returned with the slot flock still held (the + * I/O thread refused to stop). Each leaked slot permanently lowers the + * pool's effective capacity ({@code maxSize - leakedSlotCount()}). A + * non-zero, growing value explains a pool that has started timing out + * every {@code borrow()}. 
For metrics and tests. + */ + public int leakedSlotCount() { + lock.lock(); + try { + return leakedSlots; + } finally { + lock.unlock(); + } + } + public void releaseCurrentThread() { PooledSender pinned = threadAffine.get(); if (pinned == null) { @@ -366,8 +683,78 @@ public void releaseCurrentThread() { pinned.close(); } - private PooledSender createUnlocked() { - Sender raw = Sender.fromConfig(configurationString); - return new PooledSender(raw, this); + private PooledSender createUnlocked(int slotIndex) { + return new PooledSender(senderFactory.apply(slotIndex), this, slotIndex); + } + + private Sender defaultSender(int slotIndex) { + final Sender raw; + if (storeAndForward) { + // Give this pooled sender its own slot dir /- + // so concurrent SF senders sharing one sf_dir never collide on + // the slot flock. senderId() is only legal on WebSocket transport, + // which is exactly when storeAndForward is true. + // + // Also fence off the pool's own live slot set [0, maxSize) from + // orphan draining: the pool co-manages every - slot it + // can re-create and recovers each slot's unacked data when it + // (re)creates it, so a sibling's startup drainer must never adopt + // another live pool slot's dir/lock (that would resurrect "sf slot + // already in use"). The bound is maxSize, NOT the whole "-" + // prefix: a same-base slot at an index >= maxSize (left behind when + // a previous run used a larger maxSize) is out of the pool's index + // range forever, so it is left drainable and its unacked data is + // recovered rather than silently stranded. This is a no-op unless + // the config also set drain_orphans=on; foreign leftovers under + // other names are still drained. 
+ raw = Sender.builder(configurationString) + .senderId(slotBaseId + "-" + slotIndex) + .orphanDrainExcludeManagedSlots(slotBaseId, maxSize) + .build(); + } else { + raw = Sender.fromConfig(configurationString); + } + return raw; + } + + /** + * Reserves and returns the lowest free SF slot index. The borrow() cap + * check ({@code all.size() + inFlightCreations + closingSlots + leakedSlots + * < maxSize}) guarantees a free index exists whenever a creation is + * admitted, so this never fails in practice; the guard throws defensively rather than + * silently colliding two senders on one slot dir. Caller must hold + * {@code lock}. + */ + private int allocateSlotIndex() { + for (int i = 0; i < slotInUse.length; i++) { + if (!slotInUse[i]) { + slotInUse[i] = true; + return i; + } + } + throw new IllegalStateException( + "no free SF slot index -- pool capacity invariant violated"); + } + + /** + * Returns an SF slot index to the free set. No-op for non-SF pools and + * for the {@code -1} sentinel. Caller must hold {@code lock}. + */ + private void freeSlotIndex(int idx) { + if (idx >= 0 && slotInUse != null) { + slotInUse[idx] = false; + } + } + + /** + * Whether the delegate's {@code close()} released the SF slot flock. A + * non-QWP delegate never holds an SF flock, so it is always treated as + * released. A {@link QwpWebSocketSender} reports it via + * {@link QwpWebSocketSender#isSlotLockReleased()} -- false means close() + * bailed early with the I/O thread still running and the flock still held. 
+ */ + private static boolean flockReleased(PooledSender s) { + Sender d = s.delegate(); + return !(d instanceof QwpWebSocketSender) || ((QwpWebSocketSender) d).isSlotLockReleased(); } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/OrphanScannerTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/OrphanScannerTest.java index bf9f084a..a6aabb90 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/OrphanScannerTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/OrphanScannerTest.java @@ -148,6 +148,180 @@ public void testMultipleOrphansReturned() throws Exception { }); } + @Test + public void testExcludeNamePrefixSkipsWholePoolNamespace() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // A connection pool co-manages the whole "-" namespace + // (default-0, default-1, ...). With drain_orphans=on, one pooled + // sender's startup scan must NOT list its siblings as orphans -- + // the prefix filter fences them off. A foreign leftover under a + // different name is still reported. + for (String name : new String[]{"default-0", "default-1", "default-2"}) { + String slot = sfDir + "/" + name; + assertEquals(0, Files.mkdir(slot, Files.DIR_MODE_DEFAULT)); + touchFile(slot + "/sf-0001.sfa"); + } + String foreign = sfDir + "/legacy"; + assertEquals(0, Files.mkdir(foreign, Files.DIR_MODE_DEFAULT)); + touchFile(foreign + "/sf-0001.sfa"); + + // Caller is default-0; exclude the whole default- namespace. + ObjList orphans = OrphanScanner.scan(sfDir, "default-0", "default-"); + assertEquals("only the foreign slot remains a candidate", 1, orphans.size()); + assertEquals(foreign, orphans.get(0)); + }); + } + + @Test + public void testExcludeNamePrefixDoesNotMatchBareBase() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // A bare un-suffixed id equal to the base (e.g. 
a standalone + // sender's "default" slot) is NOT part of the pool's "default-" + // namespace, so it is still a drainable foreign orphan. + String bare = sfDir + "/default"; + String sibling = sfDir + "/default-1"; + assertEquals(0, Files.mkdir(bare, Files.DIR_MODE_DEFAULT)); + assertEquals(0, Files.mkdir(sibling, Files.DIR_MODE_DEFAULT)); + touchFile(bare + "/sf-0001.sfa"); + touchFile(sibling + "/sf-0001.sfa"); + + ObjList orphans = OrphanScanner.scan(sfDir, "default-0", "default-"); + assertEquals(1, orphans.size()); + assertEquals(bare, orphans.get(0)); + }); + } + + @Test + public void testNullOrEmptyPrefixBehavesLikeTwoArgScan() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // Without a prefix the sibling IS listed -- this is exactly the + // pre-fix behavior the pool now suppresses by passing "default-". + String sibling = sfDir + "/default-1"; + assertEquals(0, Files.mkdir(sibling, Files.DIR_MODE_DEFAULT)); + touchFile(sibling + "/sf-0001.sfa"); + + assertEquals(1, OrphanScanner.scan(sfDir, "default-0", null).size()); + assertEquals(1, OrphanScanner.scan(sfDir, "default-0", "").size()); + assertEquals(1, OrphanScanner.scan(sfDir, "default-0").size()); + }); + } + + // ---------------------------------------------------------------------- + // Bounded (managed-slot) exclusion: scan(sfDir, exclude, base, count). + // This is the precise replacement for the prefix exclusion and the fix + // for the "shrinking maxSize strands unacked SF data" bug. + // ---------------------------------------------------------------------- + + @Test + public void testBoundedScanExcludesInRangeManagedSlots() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // Pool at maxSize=2 co-manages default-0 and default-1; a startup + // scan must not list a live sibling as an orphan. A foreign slot + // is still reported. 
+ for (String name : new String[]{"default-0", "default-1"}) { + String slot = sfDir + "/" + name; + assertEquals(0, Files.mkdir(slot, Files.DIR_MODE_DEFAULT)); + touchFile(slot + "/sf-0001.sfa"); + } + String foreign = sfDir + "/legacy"; + assertEquals(0, Files.mkdir(foreign, Files.DIR_MODE_DEFAULT)); + touchFile(foreign + "/sf-0001.sfa"); + + // caller is default-0; managed set is [0, 2) + ObjList orphans = OrphanScanner.scan(sfDir, "default-0", "default", 2); + assertEquals("only the foreign slot is a candidate", 1, orphans.size()); + assertEquals(foreign, orphans.get(0)); + }); + } + + @Test + public void testBoundedScanDrainsOutOfRangeSameBaseSlotsAfterShrink() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // The bug: a previous run used maxSize=4 (default-0..3 hold unacked + // data); this run restarts at maxSize=2. default-0/1 are re-created + // and self-recovered (excluded), but default-2/3 are OUT of the new + // [0,2) index range forever -- they must be drainable, not stranded. + for (String name : new String[]{"default-0", "default-1", "default-2", "default-3"}) { + String slot = sfDir + "/" + name; + assertEquals(0, Files.mkdir(slot, Files.DIR_MODE_DEFAULT)); + touchFile(slot + "/sf-0001.sfa"); + } + // caller is default-0; managed set is [0, 2) + ObjList orphans = OrphanScanner.scan(sfDir, "default-0", "default", 2); + assertEquals("default-2 and default-3 must be drainable orphans", 2, orphans.size()); + boolean has2 = false, has3 = false; + for (int i = 0; i < orphans.size(); i++) { + String p = orphans.get(i); + if (p.equals(sfDir + "/default-2")) has2 = true; + if (p.equals(sfDir + "/default-3")) has3 = true; + } + assertTrue("default-2 stranded", has2); + assertTrue("default-3 stranded", has3); + }); + } + + @Test + public void testBoundedScanDrainsNonCanonicalSameBaseNames() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // The pool only ever mints canonical Integer.toString suffixes + // ("0","1",...). 
A same-base dir with a leading-zero or non-numeric + // suffix is not a managed slot, so it is drained like any foreign + // leftover -- even if its numeric value would fall inside [0,count). + for (String name : new String[]{"default-00", "default-01", "default-foo", "default-"}) { + String slot = sfDir + "/" + name; + assertEquals(0, Files.mkdir(slot, Files.DIR_MODE_DEFAULT)); + touchFile(slot + "/sf-0001.sfa"); + } + // also a genuinely-managed slot that must stay excluded + String managed = sfDir + "/default-1"; + assertEquals(0, Files.mkdir(managed, Files.DIR_MODE_DEFAULT)); + touchFile(managed + "/sf-0001.sfa"); + + ObjList orphans = OrphanScanner.scan(sfDir, "default-0", "default", 4); + assertEquals("all non-canonical same-base names are drainable", 4, orphans.size()); + for (int i = 0; i < orphans.size(); i++) { + assertFalse("managed slot must not appear", + orphans.get(i).equals(managed)); + } + }); + } + + @Test + public void testBoundedScanDisabledWhenCountNonPositive() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // count <= 0 or null/empty base disables the exclusion: every + // sibling with data (except the explicit excludeSlotName) is a + // candidate. + String sibling = sfDir + "/default-1"; + assertEquals(0, Files.mkdir(sibling, Files.DIR_MODE_DEFAULT)); + touchFile(sibling + "/sf-0001.sfa"); + + assertEquals(1, OrphanScanner.scan(sfDir, "default-0", "default", 0).size()); + assertEquals(1, OrphanScanner.scan(sfDir, "default-0", null, 4).size()); + assertEquals(1, OrphanScanner.scan(sfDir, "default-0", "", 4).size()); + }); + } + + @Test + public void testIsManagedSlotPredicate() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // In-range canonical indices are managed. + assertTrue(OrphanScanner.isManagedSlot("default-0", "default-", 2)); + assertTrue(OrphanScanner.isManagedSlot("default-1", "default-", 2)); + // At-or-above the count is NOT managed (drainable). 
+ assertFalse(OrphanScanner.isManagedSlot("default-2", "default-", 2)); + assertFalse(OrphanScanner.isManagedSlot("default-10", "default-", 2)); + // Non-canonical / foreign suffixes are NOT managed. + assertFalse(OrphanScanner.isManagedSlot("default-00", "default-", 4)); + assertFalse(OrphanScanner.isManagedSlot("default-01", "default-", 4)); + assertFalse(OrphanScanner.isManagedSlot("default-foo", "default-", 4)); + assertFalse(OrphanScanner.isManagedSlot("default-", "default-", 4)); + assertFalse(OrphanScanner.isManagedSlot("default--1", "default-", 4)); + assertFalse(OrphanScanner.isManagedSlot("other-0", "default-", 4)); + assertFalse(OrphanScanner.isManagedSlot("default", "default-", 4)); + }); + } + @Test public void testIsCandidateOrphanDirect() throws Exception { TestUtils.assertMemoryLeak(() -> { diff --git a/core/src/test/java/io/questdb/client/test/impl/QueryClientPoolErrorSafetyTest.java b/core/src/test/java/io/questdb/client/test/impl/QueryClientPoolErrorSafetyTest.java new file mode 100644 index 00000000..160be8e7 --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/impl/QueryClientPoolErrorSafetyTest.java @@ -0,0 +1,167 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.impl; + +import io.questdb.client.QueryException; +import io.questdb.client.cutlass.qwp.client.QwpQueryClient; +import io.questdb.client.impl.QueryClientPool; +import io.questdb.client.std.MemoryTag; +import io.questdb.client.std.Unsafe; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +// Error-safety of the three QueryClientPool creation paths the teardown-hardening +// fix widened from catch (RuntimeException) to catch (Throwable). The native +// build/connect path runs under -ea and can throw an Error (AssertionError, +// OutOfMemoryError); the old catches let that Error skip cleanup. +// +// QwpQueryClient is a concrete class with no fake seam, so these tests inject an +// Error at the real connect step via the package-private connectHook constructor +// (reached by reflection -- the main module is declared `open`). fromConfig() +// still runs for real, committing the NATIVE_DEFAULT scratch the cleanup must +// reclaim, so the memory assertions are meaningful. +public class QueryClientPoolErrorSafetyTest { + + // ws config that fromConfig() parses without opening a socket; the injected + // connectHook replaces connect(), so the port is never dialled. + private static final String CFG = "ws::addr=127.0.0.1:9000;"; + + // Site: acquire() inner catch around client.connect(). createUnlocked() must + // close the half-built client when connect throws an Error, otherwise the + // field-initialised QwpBindValues scratch (NATIVE_DEFAULT) leaks. + // RED: catch (RuntimeException) -> Error skips client.close() -> leak. + // GREEN: catch (Throwable) -> client.close() runs -> no leak. 
+ @Test(timeout = 30_000) + public void acquireDoesNotLeakNativeScratchOnErrorFromConnect() throws Exception { + QueryClientPool pool = newPool(CFG, 0, 1, 250, alwaysThrow()); + try { + long baseline = Unsafe.getMemUsedByTag(MemoryTag.NATIVE_DEFAULT); + try { + pool.acquire(); + Assert.fail("expected acquire() to propagate the injected Error"); + } catch (Throwable expected) { + // wrapped or raw -- the leak check is the discriminator + } + long after = Unsafe.getMemUsedByTag(MemoryTag.NATIVE_DEFAULT); + Assert.assertEquals( + "acquire() leaked NATIVE_DEFAULT scratch on an Error from connect()", + baseline, after); + } finally { + pool.close(); + } + } + + // Site: acquire() outer catch around createUnlocked()/start(). An Error must + // still run inFlightCreations--, otherwise the reserved slot is leaked and + // (maxSize == 1) the pool is wedged forever. + // RED: catch (RuntimeException) -> inFlightCreations stuck at 1. + // GREEN: catch (Throwable) -> inFlightCreations restored to 0. + @Test(timeout = 30_000) + public void acquireRestoresInFlightCreationsOnErrorFromConnect() throws Exception { + QueryClientPool pool = newPool(CFG, 0, 1, 250, alwaysThrow()); + try { + try { + pool.acquire(); + Assert.fail("expected acquire() to propagate the injected Error"); + } catch (Throwable expected) { + // expected + } + + Assert.assertEquals( + "acquire() leaked an in-flight creation slot on an Error from connect()", + 0, inFlightCreations(pool)); + + // Corollary: capacity is usable again -- the next acquire() must + // reach the creation path (and fail there) rather than time out. 
+ try { + pool.acquire(); + Assert.fail("expected second acquire() to re-attempt creation"); + } catch (QueryException e) { + Assert.assertFalse( + "pool wedged: second acquire() timed out -> capacity permanently lost (" + + e.getMessage() + ")", + e.getMessage() != null && e.getMessage().contains("timed out")); + } catch (Throwable injectedAgain) { + // also fine: the Error surfaced again from the re-attempt + } + } finally { + pool.close(); + } + } + + // Site: constructor prewarm outer catch. An Error mid-prewarm must run the + // cleanup loop that shuts down already-built workers, otherwise the first + // worker's client (NATIVE_DEFAULT) and I/O thread leak. + // RED: catch (RuntimeException) -> first worker's client never closed. + // GREEN: catch (Throwable) -> cleanup loop closes it -> no leak. + @Test(timeout = 30_000) + public void preWarmDoesNotLeakNativeScratchOnErrorFromConnect() throws Exception { + long baseline = Unsafe.getMemUsedByTag(MemoryTag.NATIVE_DEFAULT); + // First connect() succeeds (no-op, leaves the client unconnected but + // built); the second throws an Error mid-prewarm. 
+ AtomicInteger calls = new AtomicInteger(); + Consumer hook = client -> { + if (calls.incrementAndGet() >= 2) { + throw new AssertionError("injected native connect failure"); + } + }; + try { + newPool(CFG, 2, 2, 250, hook); + Assert.fail("expected prewarm to propagate the injected Error"); + } catch (Throwable expected) { + // expected -- construction aborts + } + long after = Unsafe.getMemUsedByTag(MemoryTag.NATIVE_DEFAULT); + Assert.assertEquals( + "prewarm leaked NATIVE_DEFAULT scratch of an already-built worker on an Error", + baseline, after); + } + + private static Consumer alwaysThrow() { + return client -> { + throw new AssertionError("injected native connect failure"); + }; + } + + private static int inFlightCreations(QueryClientPool pool) throws Exception { + Method m = QueryClientPool.class.getDeclaredMethod("inFlightCreations"); + m.setAccessible(true); + return (int) m.invoke(pool); + } + + private static QueryClientPool newPool( + String cfg, int min, int max, long acquireMs, Consumer connectHook + ) throws Exception { + Constructor c = QueryClientPool.class.getDeclaredConstructor( + String.class, int.class, int.class, long.class, long.class, long.class, Consumer.class); + c.setAccessible(true); + return c.newInstance(cfg, min, max, acquireMs, Long.MAX_VALUE, Long.MAX_VALUE, connectHook); + } +} diff --git a/core/src/test/java/io/questdb/client/test/impl/QuestDBImplErrorSafetyTest.java b/core/src/test/java/io/questdb/client/test/impl/QuestDBImplErrorSafetyTest.java new file mode 100644 index 00000000..93b10301 --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/impl/QuestDBImplErrorSafetyTest.java @@ -0,0 +1,154 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 
2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + ******************************************************************************/ + +package io.questdb.client.test.impl; + +import io.questdb.client.Sender; +import io.questdb.client.cutlass.qwp.client.QwpQueryClient; +import io.questdb.client.impl.QuestDBImpl; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Proxy; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.IntFunction; + +// Error-safety of the QuestDBImpl constructor that ties SenderPool and +// QueryClientPool together. The constructor builds the SenderPool first, then +// the QueryClientPool; both run heavy native build/connect paths that, under -ea, +// can throw an Error (AssertionError, OutOfMemoryError). If the orchestrator's +// cleanup catch is narrower than the pools' own (catch (RuntimeException) vs +// catch (Throwable)), an Error from QueryClientPool construction propagates +// without closing the already-built SenderPool, stranding its prewarmed +// delegates (flock + mmap'd ring + I/O thread). +// +// Sender is an interface, faked with a Proxy whose close() flips a flag, injected +// via the SenderPool senderFactory seam. The connect Error is injected via the +// QueryClientPool connectHook seam. 
Both are passed through the package-private +// QuestDBImpl seam constructor (reached by reflection -- the main module is +// declared `open`); production callers pass null for both. +public class QuestDBImplErrorSafetyTest { + + // Non-SF http config: the SenderPool factory replaces the build, but the + // constructor's eager config probe must still parse it. + private static final String SENDER_CFG = "http::addr=127.0.0.1:1;protocol_version=2;auto_flush=off;"; + // ws config that fromConfig() parses without opening a socket; the injected + // connectHook replaces connect(), so the port is never dialled. + private static final String QUERY_CFG = "ws::addr=127.0.0.1:9000;"; + + // RED: catch (RuntimeException) -> the Error from QueryClientPool construction + // skips the cleanup block -> the prewarmed SenderPool is never closed -> + // its fake delegate's close() is never called. + // GREEN: catch (Throwable) -> cleanup closes the SenderPool -> the fake + // delegate's close() runs. + @Test(timeout = 30_000) + public void ctorClosesBuiltSenderPoolWhenQueryPoolConstructionThrowsError() throws Exception { + AtomicBoolean senderClosed = new AtomicBoolean(false); + // senderMin = 1 -> SenderPool prewarms one observable delegate. + IntFunction senderFactory = slotIndex -> fakeSender(senderClosed); + // queryMin = 1 -> QueryClientPool prewarm reaches connect(), which throws. 
+ Consumer connectHook = client -> { + throw new AssertionError("injected native connect failure"); + }; + + try { + newQuestDB(senderFactory, connectHook); + Assert.fail("expected QuestDBImpl construction to propagate the injected Error"); + } catch (Throwable expected) { + // expected -- construction aborts + } + + Assert.assertTrue( + "QuestDBImpl ctor leaked the already-built SenderPool on an Error from " + + "QueryClientPool construction: the prewarmed delegate's close() was never called", + senderClosed.get()); + } + + private static Sender fakeSender(AtomicBoolean closedFlag) { + return (Sender) Proxy.newProxyInstance( + Sender.class.getClassLoader(), + new Class[]{Sender.class}, + (proxy, method, args) -> { + switch (method.getName()) { + case "close": + closedFlag.set(true); + return null; + case "toString": + return "FakeSender"; + case "hashCode": + return System.identityHashCode(proxy); + case "equals": + return proxy == args[0]; + default: + Class rt = method.getReturnType(); + if (rt == boolean.class) return false; + if (rt == byte.class) return (byte) 0; + if (rt == short.class) return (short) 0; + if (rt == int.class) return 0; + if (rt == long.class) return 0L; + if (rt == float.class) return 0f; + if (rt == double.class) return 0d; + if (rt == char.class) return (char) 0; + if (rt == void.class) return null; + if (rt.isInstance(proxy)) return proxy; + return null; + } + }); + } + + private static QuestDBImpl newQuestDB( + IntFunction senderFactory, Consumer connectHook + ) throws Exception { + Constructor c = QuestDBImpl.class.getDeclaredConstructor( + String.class, String.class, int.class, int.class, int.class, int.class, + long.class, long.class, long.class, long.class, + IntFunction.class, Consumer.class); + c.setAccessible(true); + try { + return c.newInstance( + SENDER_CFG, QUERY_CFG, + /*senderMin*/ 1, /*senderMax*/ 1, + /*queryMin*/ 1, /*queryMax*/ 1, + /*acquireTimeoutMillis*/ 250L, + /*idleTimeoutMillis*/ Long.MAX_VALUE, + 
/*maxLifetimeMillis*/ Long.MAX_VALUE, + /*housekeeperIntervalMillis*/ Long.MAX_VALUE, + senderFactory, connectHook); + } catch (InvocationTargetException e) { + // Unwrap so the caller sees the real construction failure (Error or + // RuntimeException), matching a direct constructor invocation. + Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + if (cause instanceof Error) { + throw (Error) cause; + } + throw e; + } + } +} diff --git a/core/src/test/java/io/questdb/client/test/impl/SenderPoolErrorSafetyTest.java b/core/src/test/java/io/questdb/client/test/impl/SenderPoolErrorSafetyTest.java new file mode 100644 index 00000000..b7b56e7a --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/impl/SenderPoolErrorSafetyTest.java @@ -0,0 +1,255 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.client.test.impl; + +import io.questdb.client.Sender; +import io.questdb.client.impl.SenderPool; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Proxy; +import java.nio.file.Paths; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntFunction; + +// Error-safety of the SenderPool constructor prewarm catch that the +// teardown-hardening fix widened from catch (RuntimeException) to +// catch (Throwable). createUnlocked() runs a heavy native build path (mmap, +// flock, WebSocket connect) that can throw an Error under -ea; if the prewarm +// catch does not fire, the cleanup loop never runs and every already-built +// delegate leaks its flock + mmap'd ring + I/O thread -- resurrecting +// "sf slot already in use" on the next attempt. +// +// Sender is an interface, so the build path is faked with a Proxy whose close() +// flips a flag. The fake is injected via the package-private senderFactory +// constructor (reached by reflection -- the main module is declared `open`). +public class SenderPoolErrorSafetyTest { + + // Non-SF http config: fromConfig is never reached (the factory replaces the + // build), but the constructor's eager config probe must still parse it. + private static final String CFG = "http::addr=127.0.0.1:1;protocol_version=2;auto_flush=off;"; + + // RED: catch (RuntimeException) -> the Error from the 2nd build skips the + // cleanup loop -> the 1st delegate is never closed. + // GREEN: catch (Throwable) -> the cleanup loop closes the 1st delegate. 
+ @Test(timeout = 30_000) + public void preWarmClosesBuiltDelegatesWhenBuildThrowsError() throws Exception { + AtomicBoolean firstClosed = new AtomicBoolean(false); + AtomicInteger calls = new AtomicInteger(); + IntFunction factory = slotIndex -> { + if (calls.incrementAndGet() >= 2) { + throw new AssertionError("injected native build failure"); + } + return fakeSender(firstClosed); + }; + + try { + newPool(CFG, 2, 2, 250, factory); + Assert.fail("expected prewarm to propagate the injected Error"); + } catch (Throwable expected) { + // expected -- construction aborts + } + + Assert.assertTrue( + "prewarm leaked an already-built delegate: its close() was never called on an Error", + firstClosed.get()); + } + + // Companion to the catch (RuntimeException) -> track-normal-completion fix in + // PooledSender.close(). flush() can exit with an Error (AssertionError under + // -ea, OutOfMemoryError, ...) as well as a RuntimeException; the wrapper is + // unsafe to recycle either way because Sender does not clear its buffer on a + // failed flush and WebSocket transport latches the failure. + // + // RED (catch (RuntimeException)): the AssertionError from flush() is not + // caught, broken stays false, the finally runs giveBack() -> the broken + // wrapper is recycled -> the next borrow() hands back the SAME instance. + // GREEN (track normal completion): flush() throwing leaves flushed=false -> + // discardBroken() -> the next borrow() builds a fresh wrapper. 
+ @Test(timeout = 30_000) + public void flushErrorDiscardsBrokenSenderInsteadOfRecycling() throws Exception { + IntFunction factory = slotIndex -> flushThrowingSender(); + + try (SenderPool pool = newPool(CFG, 1, 1, 1_000, factory)) { + Sender first = pool.borrow(); + try { + first.close(); + Assert.fail("close() must propagate the Error thrown by flush()"); + } catch (AssertionError expected) { + // expected: the original throwable propagates naturally + } + + Sender second = pool.borrow(); + try { + Assert.assertNotSame( + "a sender whose flush() exited with an Error must be discarded, not recycled", + first, second); + } finally { + // second's flush() also throws on close(); swallow on teardown. + try { + second.close(); + } catch (AssertionError ignored) { + // expected + } + } + } + } + + // Like fakeSender(), but flush() throws an Error to drive the + // PooledSender.close() abnormal-exit branch. + private static Sender flushThrowingSender() { + return (Sender) Proxy.newProxyInstance( + Sender.class.getClassLoader(), + new Class[]{Sender.class}, + (proxy, method, args) -> { + switch (method.getName()) { + case "flush": + throw new AssertionError("injected flush failure"); + case "close": + return null; + case "toString": + return "FlushThrowingSender"; + case "hashCode": + return System.identityHashCode(proxy); + case "equals": + return proxy == args[0]; + default: + Class rt = method.getReturnType(); + if (rt == boolean.class) return false; + if (rt == byte.class) return (byte) 0; + if (rt == short.class) return (short) 0; + if (rt == int.class) return 0; + if (rt == long.class) return 0L; + if (rt == float.class) return 0f; + if (rt == double.class) return 0d; + if (rt == char.class) return (char) 0; + if (rt == void.class) return null; + // fluent ArraySender methods return Sender + if (rt.isInstance(proxy)) return proxy; + return null; + } + }); + } + + // Companion to the SF slot-index reservation in borrow(): when + // createUnlocked() throws on a 
borrow-triggered grow, the reserved slot + // index MUST be returned via freeSlotIndex(). Otherwise slotInUse[idx] is + // stuck true, pool capacity is permanently lowered, and the next borrow() + // either trips the "no free SF slot index" invariant in allocateSlotIndex() + // or eventually only times out -- the exact failure mode this PR fixes. + // + // The other SenderPool error-injection test fails in the constructor + // pre-warm loop with a non-SF config (slotIndex == -1), so neither the + // borrow-path freeSlotIndex nor the SF (slotIndex >= 0) case is otherwise + // covered. + // + // RED (freeSlotIndex(slotIndex) removed from the borrow catch): the 2nd + // borrow() throws IllegalStateException out of allocateSlotIndex(). + // GREEN (slot index returned): the 2nd borrow() reuses the slot and + // succeeds, proving capacity survived the failed grow. + @Test(timeout = 30_000) + public void borrowReleasesSfSlotIndexWhenCreationFails() throws Exception { + // Unique, non-existent sf_dir: minSize=0 means no pre-warm, so the dir + // is never created and the constructor's startup SF recovery is a no-op. + // The factory replaces createUnlocked(), so localhost:1 is never dialed. + String sfDir = Paths.get(System.getProperty("java.io.tmpdir"), + "qdb-sf-borrowfail-" + System.nanoTime()).toString(); + String sfCfg = "ws::addr=localhost:1;sf_dir=" + sfDir + ";"; + + AtomicInteger calls = new AtomicInteger(); + IntFunction factory = slotIndex -> { + // First borrow-triggered build fails (the slot index reserved for + // it must be released); later builds succeed. 
+ if (calls.getAndIncrement() == 0) { + throw new AssertionError("injected native build failure on first grow"); + } + return fakeSender(new AtomicBoolean()); + }; + + try (SenderPool pool = newPool(sfCfg, 0, 1, 2_000, factory)) { + try { + pool.borrow(); + Assert.fail("borrow() must propagate the Error from the failed build"); + } catch (AssertionError expected) { + // expected: the original throwable propagates out of borrow() + } + + // The single SF slot index must have been returned to the free set. + // If it leaked, this borrow() trips the capacity invariant (or, in + // the timeout-only variant, exhausts the acquire budget). + Sender second = pool.borrow(); + try { + Assert.assertNotNull( + "after a failed grow the SF slot index must be reusable", second); + } finally { + second.close(); + } + } + } + + private static Sender fakeSender(AtomicBoolean closedFlag) { + return (Sender) Proxy.newProxyInstance( + Sender.class.getClassLoader(), + new Class[]{Sender.class}, + (proxy, method, args) -> { + switch (method.getName()) { + case "close": + closedFlag.set(true); + return null; + case "toString": + return "FakeSender"; + case "hashCode": + return System.identityHashCode(proxy); + case "equals": + return proxy == args[0]; + default: + Class rt = method.getReturnType(); + if (rt == boolean.class) return false; + if (rt == byte.class) return (byte) 0; + if (rt == short.class) return (short) 0; + if (rt == int.class) return 0; + if (rt == long.class) return 0L; + if (rt == float.class) return 0f; + if (rt == double.class) return 0d; + if (rt == char.class) return (char) 0; + if (rt == void.class) return null; + // fluent ArraySender methods return Sender + if (rt.isInstance(proxy)) return proxy; + return null; + } + }); + } + + private static SenderPool newPool( + String cfg, int min, int max, long acquireMs, IntFunction senderFactory + ) throws Exception { + Constructor c = SenderPool.class.getDeclaredConstructor( + String.class, int.class, int.class, 
long.class, long.class, long.class, IntFunction.class); + c.setAccessible(true); + return c.newInstance(cfg, min, max, acquireMs, Long.MAX_VALUE, Long.MAX_VALUE, senderFactory); + } +} diff --git a/core/src/test/java/io/questdb/client/test/impl/SenderPoolSfTest.java b/core/src/test/java/io/questdb/client/test/impl/SenderPoolSfTest.java new file mode 100644 index 00000000..354ff929 --- /dev/null +++ b/core/src/test/java/io/questdb/client/test/impl/SenderPoolSfTest.java @@ -0,0 +1,1198 @@ +/*+***************************************************************************** + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (c) 2014-2019 Appsicle + * Copyright (c) 2019-2026 QuestDB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + ******************************************************************************/ + +package io.questdb.client.test.impl; + +import ch.qos.logback.classic.Level; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; +import io.questdb.client.Sender; +import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.cutlass.qwp.client.sf.cursor.OrphanScanner; +import io.questdb.client.impl.PooledSender; +import io.questdb.client.impl.SenderPool; +import io.questdb.client.std.Files; +import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; +import io.questdb.client.test.tools.TestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.file.Paths; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Exhaustive tests for {@link SenderPool} interaction with store-and-forward + * (SF) slots. + *

+ * The pool reuses one immutable config string for every sender it builds, so + * before the slot-id fix every SF sender inherited the same {@code sender_id}, + * pointed at the same {@code /} slot, and the second sender + * to start died with "sf slot already in use by another process". These tests + * pin down the fix: each pooled SF sender gets a distinct, stable slot id + * {@code -}; indices are reused deterministically; a slot is only + * returned to the free set once its delegate releases the {@code flock}; and + * the cross-writer guard that the slot lock exists for is still enforced + * between independent pools. + */ +public class SenderPoolSfTest { + + private String sfDir; + + @Before + public void setUp() { + sfDir = Paths.get(System.getProperty("java.io.tmpdir"), + "qdb-sf-pool-" + System.nanoTime()).toString(); + } + + @After + public void tearDown() { + rmDir(sfDir); + } + + // ---------------------------------------------------------------------- + // Core fix: the original claim repro -- two concurrent SF senders. + // ---------------------------------------------------------------------- + + @Test + public void testTwoConcurrentSfSendersGetDistinctSlots() throws Exception { + // The exact scenario from the bug report: a maxSize=2 SF pool must + // hand out two live senders. Pre-fix, the second borrow() blew up on + // the slot flock. 
+ TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(config, 1, 2, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + try { + Assert.assertNotSame("two borrows must be distinct wrappers", a, b); + Assert.assertTrue("slot default-0 must exist", Files.exists(slot("default-0"))); + Assert.assertTrue("slot default-1 must exist", Files.exists(slot("default-1"))); + Assert.assertEquals("exactly two slot dirs", 2, countSlotDirs()); + } finally { + b.close(); + a.close(); + } + } + } + }); + } + + @Test + public void testGrowToMaxAllSfSendersCoexist() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(config, 1, 4, 1_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + PooledSender c = pool.borrow(); + PooledSender d = pool.borrow(); + try { + Assert.assertEquals(4, pool.totalSize()); + for (int i = 0; i < 4; i++) { + Assert.assertTrue("slot default-" + i + " must exist", + Files.exists(slot("default-" + i))); + } + Assert.assertEquals(4, countSlotDirs()); + // At max -- the 5th borrow must time out, not collide. 
+ try { + pool.borrow(); + Assert.fail("5th borrow must time out at max=4"); + } catch (LineSenderException e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains("timed out")); + } + } finally { + d.close(); + c.close(); + b.close(); + a.close(); + } + } + } + }); + } + + @Test + public void testConfiguredSenderIdUsedAsSlotBase() throws Exception { + // A sender_id in the config string becomes the base prefix; the pool + // appends - per slot. This is the knob that lets two pools (or + // two processes) share one sf_dir without colliding. + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";sender_id=myapp;"; + try (SenderPool pool = new SenderPool(config, 2, 2, 1_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + Assert.assertTrue(Files.exists(slot("myapp-0"))); + Assert.assertTrue(Files.exists(slot("myapp-1"))); + Assert.assertFalse("no default-* slots when sender_id is set", + Files.exists(slot("default-0"))); + Assert.assertEquals(2, countSlotDirs()); + } + } + }); + } + + // ---------------------------------------------------------------------- + // Slot lifecycle: reuse, reap-and-reuse, deterministic index recycling. + // ---------------------------------------------------------------------- + + @Test + public void testReturnedSenderReusesSameSlot() throws Exception { + // Borrow, return, borrow again: the wrapper (and thus its slot) is + // recycled, NOT grown into a new slot dir. 
+ TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(config, 1, 2, 1_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender first = pool.borrow(); + first.close(); + PooledSender second = pool.borrow(); + try { + Assert.assertSame("returned slot must be recycled", first, second); + Assert.assertEquals("no new slot dir on recycle", 1, countSlotDirs()); + Assert.assertTrue(Files.exists(slot("default-0"))); + } finally { + second.close(); + } + } + } + }); + } + + @Test + public void testReapIdleFreesSlotAndIndexIsReused() throws Exception { + // Grow to max, return all, reap the over-min idle slots, then borrow + // again. The reaped slot indices must be returned to the free set and + // re-used -- no new index beyond the original high-water mark, and no + // "no free SF slot index" / "sf slot already in use" failure. + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(config, 1, 3, 1_000, 80, Long.MAX_VALUE)) { + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + PooledSender c = pool.borrow(); + Assert.assertEquals(3, pool.totalSize()); + a.close(); + b.close(); + c.close(); + + Thread.sleep(150); + pool.reapIdle(); + Assert.assertEquals("reap shrinks to min", 1, pool.totalSize()); + + // High-water mark of slot dirs created so far is 3. 
+ int dirsAfterReap = countSlotDirs(); + Assert.assertTrue("slot dirs persist on disk after reap (>=1)", dirsAfterReap >= 1); + + // Borrow back up to max. Must reuse the freed indices: no + // new slot dir beyond default-0..2 is ever created. + PooledSender x = pool.borrow(); + PooledSender y = pool.borrow(); + PooledSender z = pool.borrow(); + try { + Assert.assertEquals(3, pool.totalSize()); + Assert.assertEquals("indices recycled -- no 4th slot dir", + 3, countSlotDirs()); + Assert.assertFalse("default-3 must never be created", + Files.exists(slot("default-3"))); + } finally { + x.close(); + y.close(); + z.close(); + } + } + } + }); + } + + @Test + public void testRepeatedSaturationNeverExhaustsSlotIndices() throws Exception { + // Regression guard for the cap/slot-allocator invariant: hammer the + // pool through many full saturate/return/reap cycles. If freeing and + // the closingSlots accounting ever drifted, allocateSlotIndex() would + // throw "no free SF slot index" or a borrow would collide on a flock. + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(config, 1, 3, 2_000, 1, Long.MAX_VALUE)) { + for (int cycle = 0; cycle < 20; cycle++) { + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + PooledSender c = pool.borrow(); + Assert.assertEquals(3, pool.totalSize()); + a.close(); + b.close(); + c.close(); + pool.reapIdle(); + } + // Never grew past max -- indices stayed within [0,3). 
+ Assert.assertEquals(3, countSlotDirs()); + Assert.assertFalse(Files.exists(slot("default-3"))); + } + } + }); + } + + // ---------------------------------------------------------------------- + // End-to-end ingest through pooled SF senders. + // ---------------------------------------------------------------------- + + @Test + public void testEndToEndIngestThroughPooledSenders() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(config, 1, 3, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + try { + a.table("t1").longColumn("v", 1L).atNow(); + a.flush(); + b.table("t2").longColumn("v", 2L).atNow(); + b.flush(); + Assert.assertTrue("server must receive frames from both pooled senders", + awaitAtLeast(handler.frames, 2, 5_000)); + } finally { + b.close(); + a.close(); + } + } + } + }); + } + + // ---------------------------------------------------------------------- + // Cross-writer guard is preserved between independent pools / processes. + // ---------------------------------------------------------------------- + + @Test + public void testSecondPoolSameSfDirSameBaseFailsFast() throws Exception { + // The slot flock is the multi-writer footgun guard. Two pools sharing + // one sf_dir with the same base would both try slot -0; the + // second must fail fast rather than interleave FSNs on disk. The pool + // fix must NOT weaken this contract. 
+ TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool1 = new SenderPool(config, 1, 2, 1_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + Assert.assertTrue(Files.exists(slot("default-0"))); + try { + SenderPool pool2 = new SenderPool(config, 1, 2, 1_000, Long.MAX_VALUE, Long.MAX_VALUE); + pool2.close(); + Assert.fail("second pool on same sf_dir+base must fail on the slot lock"); + } catch (IllegalStateException e) { + Assert.assertTrue("message must name the slot-in-use contract, was: " + e.getMessage(), + e.getMessage().contains("sf slot already in use")); + } + } + } + }); + } + + @Test + public void testTwoPoolsDistinctBaseShareSfDir() throws Exception { + // Distinct sender_id base per pool -> distinct slot dirs -> both pools + // coexist on one sf_dir and both can ingest. 
+ TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String configA = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";sender_id=appA;"; + String configB = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";sender_id=appB;"; + try (SenderPool poolA = new SenderPool(configA, 1, 2, 5_000, Long.MAX_VALUE, Long.MAX_VALUE); + SenderPool poolB = new SenderPool(configB, 1, 2, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender a = poolA.borrow(); + PooledSender b = poolB.borrow(); + try { + Assert.assertTrue(Files.exists(slot("appA-0"))); + Assert.assertTrue(Files.exists(slot("appB-0"))); + a.table("t").longColumn("v", 1L).atNow(); + a.flush(); + b.table("t").longColumn("v", 2L).atNow(); + b.flush(); + Assert.assertTrue(awaitAtLeast(handler.frames, 2, 5_000)); + } finally { + b.close(); + a.close(); + } + } + } + }); + } + + @Test + public void testCloseReleasesAllSlots() throws Exception { + // After the pool closes, every slot flock must be released so the + // dirs can be re-acquired -- by a fresh pool or a standalone sender. + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + SenderPool pool = new SenderPool(config, 2, 2, 1_000, Long.MAX_VALUE, Long.MAX_VALUE); + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + // Leave them borrowed: close() must still release their flocks. + pool.close(); + + // A fresh pool over the same dirs must re-acquire slot 0 and 1. 
+ try (SenderPool reopened = new SenderPool(config, 2, 2, 1_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + Assert.assertEquals(2, reopened.totalSize()); + Assert.assertTrue(Files.exists(slot("default-0"))); + Assert.assertTrue(Files.exists(slot("default-1"))); + } + } + }); + } + + @Test + public void testSlotLeakedWhenDelegateCloseDoesNotReleaseFlock() throws Exception { + // Latent-fragility guard (M2): the pool returns a slot index to the + // free set ONLY after the delegate's close() has released the SF + // flock. If close() returns with the flock still held (it bailed out + // early with the I/O thread still running), the index must stay + // reserved forever -- otherwise the pool would hand the still-locked + // dir to the next borrow and resurrect "sf slot already in use". + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(config, 1, 2, 500, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender a = pool.borrow(); + Assert.assertTrue(Files.exists(slot("default-0"))); + + // Tear the delegate down for real first (so the test leaks + // no native resources), then forge the exact symptom: + // close() returned WITHOUT setting slotLockReleased. + // close() is idempotent, so the discardBroken re-close + // below is a no-op and leaves the forged flag in place. + Sender delegate = getDelegate(a); + delegate.close(); + setBooleanField(delegate, "slotLockReleased", false); + + // Route the wrapper through the pool's broken-eviction path. + invokeDiscardBroken(pool, a); + + // The leaked index must NOT be returned to the free set, + // and capacity must be accounted as permanently consumed. 
+ Assert.assertEquals("one slot must be retired as leaked", + 1, getIntField(pool, "leakedSlots")); + boolean[] slotInUse = (boolean[]) getField(pool, "slotInUse"); + Assert.assertTrue("leaked slot index 0 must stay reserved", slotInUse[0]); + + // The next borrow must take a fresh index -- never reuse the + // still-locked default-0 dir. + PooledSender b = pool.borrow(); + try { + Assert.assertTrue("new borrow must use a fresh slot dir", + Files.exists(slot("default-1"))); + Assert.assertEquals(2, countSlotDirs()); + + // Capacity is permanently reduced by the leaked slot: + // max=2, one leaked + one live => the next borrow times + // out rather than colliding on the locked dir. + try { + pool.borrow(); + Assert.fail("capacity must be reduced by the leaked slot"); + } catch (LineSenderException e) { + Assert.assertTrue(e.getMessage(), e.getMessage().contains("timed out")); + } + } finally { + b.close(); + } + } + } + }); + } + + @Test + public void testLeakedSlotIsObservable() throws Exception { + // M1 (observability): when a delegate's close() returns with the SF + // flock still held, the pool retires the slot forever and silently + // shrinks capacity. SenderPool has no logger today, so a pool that + // bleeds capacity this way degrades to "every borrow() times out" + // with nothing in the logs to explain why. Pin the contract: the + // leakedSlots++ path MUST emit a WARN (or louder) that names the + // retired slot. This test is RED until that log line is added. + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + // Capture everything SenderPool logs (logback is the SLF4J + // binding on the test classpath). 
+ Logger poolLogger = (Logger) LoggerFactory.getLogger(SenderPool.class); + ListAppender appender = new ListAppender<>(); + appender.start(); + Level savedLevel = poolLogger.getLevel(); + poolLogger.setLevel(Level.ALL); + poolLogger.addAppender(appender); + try { + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(config, 1, 2, 500, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender a = pool.borrow(); + + // Forge the exact leak symptom: close() returned with + // the flock still held (I/O thread refused to stop). + // Tear the delegate down for real first so the test + // leaks no native resources; close() is idempotent so + // discardBroken's re-close leaves the forged flag set. + Sender delegate = getDelegate(a); + delegate.close(); + setBooleanField(delegate, "slotLockReleased", false); + invokeDiscardBroken(pool, a); + + // Sanity: the slot really was retired as leaked. + Assert.assertEquals("precondition: one slot must leak", + 1, getIntField(pool, "leakedSlots")); + // The leak must be observable via public API (metric). + Assert.assertEquals("leaked slot must be observable via leakedSlotCount()", + 1, pool.leakedSlotCount()); + + // Contract under test: the leak must be observable. + boolean warned = appender.list.stream().anyMatch(e -> + e.getLevel().isGreaterOrEqual(Level.WARN) + && e.getFormattedMessage().toLowerCase().contains("slot")); + Assert.assertTrue( + "leakedSlots++ must emit a WARN naming the retired slot, " + + "otherwise capacity loss is invisible; captured events=" + + appender.list, + warned); + } + } finally { + poolLogger.detachAppender(appender); + poolLogger.setLevel(savedLevel); + appender.stop(); + } + } + }); + } + + // ---------------------------------------------------------------------- + // Recovery: stable slot ids let a re-created pool re-adopt unacked data. 
+ // ---------------------------------------------------------------------- + + @Test + public void testRecoveryReplayThroughPooledSlot() throws Exception { + // Phase 1: write rows to a slot against a silent server (no acks), so + // the data persists unacked on disk under default-0. Close. + // Phase 2: a new pool against an ack-ing server re-adopts default-0 + // (stable index) and replays the unacked frames. Stable, deterministic + // slot ids are exactly what make this recovery possible. + TestUtils.assertMemoryLeak(() -> { + // Phase 1 -- silent server. + try (TestWebSocketServer silent = new TestWebSocketServer(new SilentHandler())) { + int silentPort = silent.getPort(); + silent.start(); + Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS)); + String cfg1 = "ws::addr=localhost:" + silentPort + ";sf_dir=" + sfDir + + ";close_flush_timeout_millis=500;"; + try (SenderPool pool = new SenderPool(cfg1, 1, 1, 1_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender s = pool.borrow(); + for (int i = 0; i < 3; i++) { + s.table("recover").longColumn("v", i).atNow(); + s.flush(); + } + s.close(); + } + } + // Data must be on disk, unacked, under default-0. + Assert.assertTrue("unacked data must persist on disk", hasSegmentFile(slot("default-0"))); + + // Phase 2 -- ack-ing server, brand-new pool, same sf_dir. + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer ack = new TestWebSocketServer(handler)) { + int ackPort = ack.getPort(); + ack.start(); + Assert.assertTrue(ack.awaitStart(5, TimeUnit.SECONDS)); + String cfg2 = "ws::addr=localhost:" + ackPort + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(cfg2, 1, 1, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender s = pool.borrow(); + try { + // Drain replays the recovered, unacked frames. 
+ s.drain(5_000); + Assert.assertTrue("recovered frames must be replayed to the new server", + awaitAtLeast(handler.frames, 1, 5_000)); + } finally { + s.close(); + } + } + } + }); + } + + // ---------------------------------------------------------------------- + // Concurrency stress: borrow/return churn must never collide on a slot. + // ---------------------------------------------------------------------- + + @Test + public void testConcurrentBorrowReturnStress() throws Exception { + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + final int threads = 6; + final int iterations = 25; + try (SenderPool pool = new SenderPool(config, 1, 4, 10_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(threads); + final AtomicReference failure = new AtomicReference<>(); + for (int t = 0; t < threads; t++) { + final int id = t; + Thread worker = new Thread(() -> { + try { + start.await(); + for (int i = 0; i < iterations; i++) { + PooledSender s = pool.borrow(); + try { + s.table("stress").longColumn("thread", id) + .longColumn("i", i).atNow(); + s.flush(); + } finally { + s.close(); + } + } + } catch (Throwable e) { + failure.compareAndSet(null, e); + } finally { + done.countDown(); + } + }); + worker.start(); + } + start.countDown(); + Assert.assertTrue("workers must finish", done.await(60, TimeUnit.SECONDS)); + if (failure.get() != null) { + throw new AssertionError("concurrent borrow/return failed", failure.get()); + } + // Invariants after the storm. 
+ Assert.assertTrue("totalSize within max", pool.totalSize() <= 4); + Assert.assertTrue("available <= total", + pool.availableSize() <= pool.totalSize()); + Assert.assertTrue("no slot dir beyond max created", countSlotDirs() <= 4); + } + } + }); + } + + @Test + public void testConcurrentFirstBorrowsWithMinZeroRaceOnSfDir() throws Exception { + // C2 regression: senderPoolMin(0) means no single-threaded pre-warm, + // so the shared parent sf_dir is NOT created at construction (the + // constructor probe only parses the config). The first concurrent + // borrows then race into build() -> Files.mkdir(sfDir) outside the + // pool lock. Pre-fix, the mkdir loser got a non-zero rc (EEXIST) and + // its borrow() threw "could not create sf_dir" on a perfectly healthy + // pool. Post-fix, a benign creation race is treated as success. + TestUtils.assertMemoryLeak(() -> { + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(handler)) { + int port = server.getPort(); + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String config = "ws::addr=localhost:" + port + ";sf_dir=" + sfDir + ";"; + // minSize=0 -> no pre-warm -> sf_dir absent until first borrow. + try (SenderPool pool = new SenderPool(config, 0, 4, 10_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + Assert.assertFalse("sf_dir must not exist before the first borrow", + Files.exists(sfDir)); + + final int threads = 4; + final CyclicBarrier barrier = new CyclicBarrier(threads); + final CountDownLatch done = new CountDownLatch(threads); + final AtomicReference failure = new AtomicReference<>(); + final PooledSender[] borrowed = new PooledSender[threads]; + for (int t = 0; t < threads; t++) { + final int id = t; + Thread worker = new Thread(() -> { + try { + // Align all first borrows so they race into the + // shared parent mkdir simultaneously. 
+ barrier.await(); + borrowed[id] = pool.borrow(); + } catch (Throwable e) { + failure.compareAndSet(null, e); + } finally { + done.countDown(); + } + }); + worker.start(); + } + Assert.assertTrue("workers must finish", done.await(30, TimeUnit.SECONDS)); + try { + if (failure.get() != null) { + throw new AssertionError( + "concurrent first borrows must not race on sf_dir", failure.get()); + } + Assert.assertEquals(threads, pool.totalSize()); + Assert.assertEquals("one slot dir per borrow", threads, countSlotDirs()); + } finally { + for (PooledSender s : borrowed) { + if (s != null) { + s.close(); + } + } + } + } + } + }); + } + + // ---------------------------------------------------------------------- + // drain_orphans=on + pool: the pool must NOT treat its own sibling slots + // as drainable orphans, but MUST still drain genuine foreign leftovers. + // ---------------------------------------------------------------------- + + @Test + public void testDrainOrphansPoolDoesNotCannibalizeSiblingSlots() throws Exception { + // Regression guard. The pool gives each SF sender a sibling slot + // <base>-<index>. With drain_orphans=on, every pooled build runs an + // orphan scan -- and before the namespace-exclusion fix that scan + // listed the pool's OWN siblings (default-1 holds unacked .sfa) as + // orphans and dispatched a background drainer at them. That drainer + // could win a sibling's flock and re-surface the exact + // "sf slot already in use" collision the per-slot ids were added to + // prevent. After the fix the pool fences off its whole "<base>-" + // namespace, so building a drain_orphans pool over pre-existing + // sibling data is clean and the data is recovered by the pool itself. + TestUtils.assertMemoryLeak(() -> { + // Phase 1: seed unacked data into default-0 AND default-1 via a + // plain (no drain_orphans) pool against a silent server. 
+ try (TestWebSocketServer silent = new TestWebSocketServer(new SilentHandler())) { + int silentPort = silent.getPort(); + silent.start(); + Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS)); + String seedCfg = "ws::addr=localhost:" + silentPort + ";sf_dir=" + sfDir + + ";close_flush_timeout_millis=500;"; + try (SenderPool seed = new SenderPool(seedCfg, 2, 2, 1_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender a = seed.borrow(); + PooledSender b = seed.borrow(); + a.table("recover").longColumn("v", 1L).atNow(); + a.flush(); + b.table("recover").longColumn("v", 2L).atNow(); + b.flush(); + b.close(); + a.close(); + } + } + Assert.assertTrue("default-0 must hold unacked data", hasSegmentFile(slot("default-0"))); + Assert.assertTrue("default-1 must hold unacked data", hasSegmentFile(slot("default-1"))); + + // Phase 2: a drain_orphans=on pool over the same sf_dir. Pre-fix + // this construction could throw "sf slot already in use"; post-fix + // it is deterministically clean -- no drainer targets a sibling. + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer ack = new TestWebSocketServer(handler)) { + int ackPort = ack.getPort(); + ack.start(); + Assert.assertTrue(ack.awaitStart(5, TimeUnit.SECONDS)); + String cfg = "ws::addr=localhost:" + ackPort + ";sf_dir=" + sfDir + + ";drain_orphans=on;"; + try (SenderPool pool = new SenderPool(cfg, 2, 2, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + Assert.assertEquals(2, pool.totalSize()); + Assert.assertEquals("no extra slot dirs spawned by a rogue drainer", + 2, countSlotDirs()); + // A drainer must NOT have given up on a sibling slot. 
+ Assert.assertFalse("sibling must not be flagged .failed", + Files.exists(slot("default-0") + "/" + OrphanScanner.FAILED_SENTINEL_NAME)); + Assert.assertFalse("sibling must not be flagged .failed", + Files.exists(slot("default-1") + "/" + OrphanScanner.FAILED_SENTINEL_NAME)); + + // The pool owns and recovers both slots: borrowing + draining + // replays the recovered frames through the legitimate senders. + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + try { + a.drain(5_000); + b.drain(5_000); + Assert.assertTrue("both slots' recovered data must replay", + awaitAtLeast(handler.frames, 2, 5_000)); + } finally { + b.close(); + a.close(); + } + } + } + }); + } + + @Test + public void testDrainOrphansPoolStillDrainsForeignOrphan() throws Exception { + // The fix excludes only the pool's OWN "<base>-" namespace -- a + // genuine foreign leftover (a different sender_id base) must still be + // adopted and drained, otherwise we would have silently disabled the + // drain_orphans feature for pooled deployments. + TestUtils.assertMemoryLeak(() -> { + // Phase 1: a standalone sender with a DIFFERENT base leaves unacked + // data under <sf_dir>/legacy. + try (TestWebSocketServer silent = new TestWebSocketServer(new SilentHandler())) { + int silentPort = silent.getPort(); + silent.start(); + Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS)); + // close_flush_timeout_millis=0 => close() does not drain, so + // the flushed-but-unacked frames stay on disk (silent server + // never acks) and close() never throws a drain timeout. 
+ String ghostCfg = "ws::addr=localhost:" + silentPort + ";sf_dir=" + sfDir + + ";sender_id=legacy;close_flush_timeout_millis=0;"; + try (Sender ghost = Sender.fromConfig(ghostCfg)) { + for (int i = 0; i < 3; i++) { + ghost.table("foreign").longColumn("v", i).atNow(); + ghost.flush(); + } + } catch (Exception ignored) { + // best-effort: we only need the unacked .sfa on disk + } + } + Assert.assertTrue("foreign leftover must hold unacked data", + hasSegmentFile(slot("legacy"))); + + // Phase 2: a drain_orphans=on pool with the default base. Its + // pooled senders are default-*, so "legacy" is NOT in the excluded + // namespace -- the background drainer must adopt it, replay its + // frames to the ack server, and clear the slot. + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer ack = new TestWebSocketServer(handler)) { + int ackPort = ack.getPort(); + ack.start(); + Assert.assertTrue(ack.awaitStart(5, TimeUnit.SECONDS)); + String cfg = "ws::addr=localhost:" + ackPort + ";sf_dir=" + sfDir + + ";drain_orphans=on;"; + try (SenderPool pool = new SenderPool(cfg, 1, 2, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender s = pool.borrow(); + try { + Assert.assertTrue("foreign orphan frames must be replayed by a drainer", + awaitAtLeast(handler.frames, 1, 10_000)); + Assert.assertTrue("foreign orphan slot must be drained (no unacked .sfa left)", + awaitNoSegmentFile(slot("legacy"), 10_000)); + } finally { + s.close(); + } + } + } + }); + } + + @Test + public void testShrinkingMaxSizeDrainsStrandedOutOfRangeSlots() throws Exception { + // The bug: a deployment that previously ran at maxSize=4 leaves unacked + // data in default-0..3. Restarting at maxSize=2 means default-2 and + // default-3 are out of the new [0,2) index range forever -- the pool + // never re-creates them. 
Before the fix the pool also fenced off the + // WHOLE "default-" prefix from draining, so default-2/3 were neither + // re-created nor drained: their store-and-forward data was silently + // stranded even with drain_orphans=on. After the fix the exclusion is + // bounded to [0,maxSize), so the out-of-range slots are adopted by a + // background drainer and recovered. + TestUtils.assertMemoryLeak(() -> { + // Phase 1: seed unacked data into default-0..3 via a maxSize=4 pool + // against a silent (never-acks) server. close_flush_timeout_millis=0 + // so close() leaves the flushed-but-unacked .sfa on disk. + try (TestWebSocketServer silent = new TestWebSocketServer(new SilentHandler())) { + int silentPort = silent.getPort(); + silent.start(); + Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS)); + String seedCfg = "ws::addr=localhost:" + silentPort + ";sf_dir=" + sfDir + + ";close_flush_timeout_millis=0;"; + try (SenderPool seed = new SenderPool(seedCfg, 4, 4, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender[] s = new PooledSender[4]; + for (int i = 0; i < 4; i++) { + s[i] = seed.borrow(); + } + for (int i = 0; i < 4; i++) { + s[i].table("recover").longColumn("v", i).atNow(); + s[i].flush(); + } + for (int i = 3; i >= 0; i--) { + s[i].close(); + } + } + } + for (int i = 0; i < 4; i++) { + Assert.assertTrue("default-" + i + " must hold unacked data", + hasSegmentFile(slot("default-" + i))); + } + + // Phase 2: restart at maxSize=2 with drain_orphans=on against an ack + // server. The pool re-creates + self-recovers default-0/1; the + // out-of-range default-2/3 must be drained, not stranded. 
+ CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer ack = new TestWebSocketServer(handler)) { + int ackPort = ack.getPort(); + ack.start(); + Assert.assertTrue(ack.awaitStart(5, TimeUnit.SECONDS)); + String cfg = "ws::addr=localhost:" + ackPort + ";sf_dir=" + sfDir + + ";drain_orphans=on;"; + try (SenderPool pool = new SenderPool(cfg, 1, 2, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + try { + // The regression: the out-of-range slots must be adopted + // by a background drainer and emptied. + Assert.assertTrue("default-2 unacked data must be recovered, not stranded", + awaitNoSegmentFile(slot("default-2"), 15_000)); + Assert.assertTrue("default-3 unacked data must be recovered, not stranded", + awaitNoSegmentFile(slot("default-3"), 15_000)); + Assert.assertFalse("out-of-range slot must not be abandoned as .failed", + Files.exists(slot("default-2") + "/" + OrphanScanner.FAILED_SENTINEL_NAME)); + Assert.assertFalse("out-of-range slot must not be abandoned as .failed", + Files.exists(slot("default-3") + "/" + OrphanScanner.FAILED_SENTINEL_NAME)); + } finally { + b.close(); + a.close(); + } + } + } + }); + } + + @Test + public void testInRangeIdleSlotIsRecoveredAtStartupUnderSteadyLowLoad() throws Exception { + // The drain exclusion is bounded to [0, maxSize) so a sibling's drainer + // never adopts a slot dir the pool intends to (re)create -- that is what + // prevents "sf slot already in use" (see + // testDrainOrphansPoolDoesNotCannibalizeSiblingSlots). 
The trade-off was + // that an in-range slot left holding unacked data by a previous run was + // recovered ONLY when the pool happened to (re)create that index: the + // pool pre-warms [0, minSize) and builds [minSize, maxSize) lazily on + // demand, so under steady low load a high in-range index was never + // rebuilt -- neither drained (excluded) nor recovered -- and its data + // was stranded on disk until a restart or load spike. + // + // The fix has the pool recover its own stranded managed slots once, at + // construction, under its own slot reservation (so the cannibalization + // race the exclusion guards against still cannot happen). This test + // seeds a busy run, then restarts under steady low load and asserts the + // idle in-range slots are recovered anyway. + TestUtils.assertMemoryLeak(() -> { + // Phase 1: a busy run at maxSize=4 seeds unacked data into + // default-0..3 (silent server never acks; close_flush_timeout=0 so + // close() leaves the flushed-but-unacked .sfa on disk). + try (TestWebSocketServer silent = new TestWebSocketServer(new SilentHandler())) { + int silentPort = silent.getPort(); + silent.start(); + Assert.assertTrue(silent.awaitStart(5, TimeUnit.SECONDS)); + String seedCfg = "ws::addr=localhost:" + silentPort + ";sf_dir=" + sfDir + + ";close_flush_timeout_millis=0;"; + try (SenderPool seed = new SenderPool(seedCfg, 4, 4, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + PooledSender[] s = new PooledSender[4]; + for (int i = 0; i < 4; i++) { + s[i] = seed.borrow(); + } + for (int i = 0; i < 4; i++) { + s[i].table("recover").longColumn("v", i).atNow(); + s[i].flush(); + } + for (int i = 3; i >= 0; i--) { + s[i].close(); + } + } + } + for (int i = 0; i < 4; i++) { + Assert.assertTrue("default-" + i + " must hold unacked data", + hasSegmentFile(slot("default-" + i))); + } + + // Phase 2: restart at the SAME maxSize=4 (so default-0..3 stay in + // range) with steady low load. 
minSize=0 means prewarm builds + // nothing and the lowest-free allocator would never reach the high + // indices under a single in-flight borrow -- yet startup recovery + // must still empty every in-range slot. + CountingAckHandler handler = new CountingAckHandler(); + try (TestWebSocketServer ack = new TestWebSocketServer(handler)) { + int ackPort = ack.getPort(); + ack.start(); + Assert.assertTrue(ack.awaitStart(5, TimeUnit.SECONDS)); + String cfg = "ws::addr=localhost:" + ackPort + ";sf_dir=" + sfDir + ";"; + try (SenderPool pool = new SenderPool(cfg, 0, 4, 5_000, Long.MAX_VALUE, Long.MAX_VALUE)) { + // All four in-range slots must be recovered by the startup + // pass, even though steady low load never grows the pool to + // their indices. + for (int i = 0; i < 4; i++) { + Assert.assertTrue("in-range idle default-" + i + + " must be recovered at startup, not stranded", + awaitNoSegmentFile(slot("default-" + i), 15_000)); + Assert.assertFalse("recovered slot must not be flagged .failed", + Files.exists(slot("default-" + i) + "/" + OrphanScanner.FAILED_SENTINEL_NAME)); + } + // The recovered frames must have actually reached the server. + Assert.assertTrue("recovered frames must be replayed to the server", + awaitAtLeast(handler.frames, 4, 15_000)); + + // Sanity: the pool is still usable for normal borrows. + PooledSender a = pool.borrow(); + a.close(); + } + } + }); + } + + // ---------------------------------------------------------------------- + // Helpers. 
+ // ---------------------------------------------------------------------- + + private String slot(String name) { + return sfDir + "/" + name; + } + + private int countSlotDirs() { + if (!Files.exists(sfDir)) { + return 0; + } + int count = 0; + long find = Files.findFirst(sfDir); + if (find <= 0) { + return 0; + } + try { + int rc = 1; + while (rc > 0) { + String name = Files.utf8ToString(Files.findName(find)); + rc = Files.findNext(find); + if (name == null || ".".equals(name) || "..".equals(name)) { + continue; + } + // Slot dirs are the only children the pool creates under sfDir. + if (Files.exists(sfDir + "/" + name + "/.lock")) { + count++; + } + } + } finally { + Files.findClose(find); + } + return count; + } + + private static boolean hasSegmentFile(String slotPath) { + if (!Files.exists(slotPath)) { + return false; + } + long find = Files.findFirst(slotPath); + if (find <= 0) { + return false; + } + try { + int rc = 1; + while (rc > 0) { + String name = Files.utf8ToString(Files.findName(find)); + rc = Files.findNext(find); + if (name != null && name.endsWith(".sfa")) { + return true; + } + } + } finally { + Files.findClose(find); + } + return false; + } + + private static boolean awaitAtLeast(AtomicInteger counter, int target, long timeoutMillis) + throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMillis; + while (System.currentTimeMillis() < deadline) { + if (counter.get() >= target) { + return true; + } + Thread.sleep(10); + } + return counter.get() >= target; + } + + private static boolean awaitNoSegmentFile(String slotPath, long timeoutMillis) + throws InterruptedException { + long deadline = System.currentTimeMillis() + timeoutMillis; + while (System.currentTimeMillis() < deadline) { + if (!hasSegmentFile(slotPath)) { + return true; + } + Thread.sleep(10); + } + return !hasSegmentFile(slotPath); + } + + private static void rmDir(String dir) { + if (dir == null || !Files.exists(dir)) { + return; + } + long find = 
Files.findFirst(dir); + if (find > 0) { + try { + int rc = 1; + while (rc > 0) { + String name = Files.utf8ToString(Files.findName(find)); + if (name != null && !".".equals(name) && !"..".equals(name)) { + String child = dir + "/" + name; + if (!Files.remove(child)) { + rmDir(child); + } + } + rc = Files.findNext(find); + } + } finally { + Files.findClose(find); + } + } + Files.remove(dir); + } + + private static Sender getDelegate(PooledSender ps) throws Exception { + Field f = PooledSender.class.getDeclaredField("delegate"); + f.setAccessible(true); + return (Sender) f.get(ps); + } + + private static void setBooleanField(Object target, String name, boolean value) throws Exception { + Field f = target.getClass().getDeclaredField(name); + f.setAccessible(true); + f.setBoolean(target, value); + } + + private static int getIntField(Object target, String name) throws Exception { + Field f = target.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.getInt(target); + } + + private static Object getField(Object target, String name) throws Exception { + Field f = target.getClass().getDeclaredField(name); + f.setAccessible(true); + return f.get(target); + } + + private static void invokeDiscardBroken(SenderPool pool, PooledSender ps) throws Exception { + Method m = SenderPool.class.getDeclaredMethod("discardBroken", PooledSender.class); + m.setAccessible(true); + m.invoke(pool, ps); + } + + /** + * Acks every binary frame with a per-connection running sequence (each + * pooled sender is its own WebSocket connection, so each needs its own + * FSN counter), and counts total frames received across all connections. 
+ */ + private static final class CountingAckHandler implements TestWebSocketServer.WebSocketServerHandler { + final AtomicInteger frames = new AtomicInteger(); + private final Map seqByClient = + new ConcurrentHashMap<>(); + + @Override + public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { + frames.incrementAndGet(); + AtomicLong seq = seqByClient.computeIfAbsent(client, c -> new AtomicLong(0)); + try { + client.sendBinary(buildAck(seq.getAndIncrement())); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + static byte[] buildAck(long seq) { + byte[] buf = new byte[1 + 8 + 2]; + ByteBuffer bb = ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN); + bb.put((byte) 0x00); // STATUS_OK + bb.putLong(seq); + bb.putShort((short) 0); + return buf; + } + } + + private static final class SilentHandler implements TestWebSocketServer.WebSocketServerHandler { + @Override + public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { + // No ack -- frames stay unacked on disk for the recovery test. 
+ } + } +} diff --git a/core/src/test/java/io/questdb/client/test/impl/SenderPoolTest.java b/core/src/test/java/io/questdb/client/test/impl/SenderPoolTest.java index 7c58943a..8c35d21a 100644 --- a/core/src/test/java/io/questdb/client/test/impl/SenderPoolTest.java +++ b/core/src/test/java/io/questdb/client/test/impl/SenderPoolTest.java @@ -26,12 +26,16 @@ import io.questdb.client.Sender; import io.questdb.client.cutlass.line.LineSenderException; +import io.questdb.client.impl.PooledSender; import io.questdb.client.impl.SenderPool; import org.junit.Assert; import org.junit.Test; +import java.lang.reflect.Field; +import java.lang.reflect.Proxy; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; /** @@ -487,4 +491,96 @@ public void testThreadAffinityIsPerThread() throws InterruptedException { pool.releaseCurrentThread(); } } + + // ---------------------------------------------------------------------- + // Teardown robustness: a delegate close() can throw an Error (e.g. an + // -ea AssertionError), not just a RuntimeException. The pool's best-effort + // teardown paths used to catch only RuntimeException, so such an Error + // escaped: reapIdle() aborted mid-loop (stranding siblings) and the cap + // accounting was skipped, permanently understating capacity. These tests + // pin the fix: an Error during delegate close() must not abort the loop, + // must not propagate, and must leave the pool fully usable. + // ---------------------------------------------------------------------- + + @Test + public void testReapIdleSurvivesDelegateCloseError() throws Exception { + // idleTimeout=1ms so every idle slot is reap-eligible after a short sleep. 
+ try (SenderPool pool = new SenderPool(DEAD_HTTP_CONFIG, 0, 3, 1_000, 1, Long.MAX_VALUE)) { + AtomicInteger closeAttempts = new AtomicInteger(); + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + PooledSender c = pool.borrow(); + installFailingCloseDelegate(a, closeAttempts); + installFailingCloseDelegate(b, closeAttempts); + installFailingCloseDelegate(c, closeAttempts); + pool.giveBack(a); + pool.giveBack(b); + pool.giveBack(c); + Assert.assertEquals(3, pool.totalSize()); + + Thread.sleep(10); + // Must NOT throw even though every delegate.close() raises an Error. + pool.reapIdle(); + + // The loop attempted to close ALL three delegates -- it did not abort + // after the first Error. + Assert.assertEquals("reap loop must not abort on a delegate close() Error", + 3, closeAttempts.get()); + // Every reaped slot left `all`; capacity was not stranded. + Assert.assertEquals(0, pool.totalSize()); + Assert.assertEquals(0, pool.availableSize()); + + // Pool still grows back to full capacity -- no permanent leak. + PooledSender d = pool.borrow(); + PooledSender e = pool.borrow(); + PooledSender f = pool.borrow(); + Assert.assertEquals(3, pool.totalSize()); + pool.giveBack(d); + pool.giveBack(e); + pool.giveBack(f); + } + } + + @Test + public void testCloseSurvivesDelegateCloseError() throws Exception { + SenderPool pool = new SenderPool(DEAD_HTTP_CONFIG, 2, 2, 1_000, Long.MAX_VALUE, Long.MAX_VALUE); + AtomicInteger closeAttempts = new AtomicInteger(); + PooledSender a = pool.borrow(); + PooledSender b = pool.borrow(); + installFailingCloseDelegate(a, closeAttempts); + installFailingCloseDelegate(b, closeAttempts); + pool.giveBack(a); + pool.giveBack(b); + + // close() must not propagate an Error and must attempt every delegate. + pool.close(); + Assert.assertEquals("close() must attempt to close every delegate despite an Error", + 2, closeAttempts.get()); + // Idempotent second close stays clean. 
+ pool.close(); + } + + /** + * Reflectively replaces the wrapper's delegate with a {@link Proxy} that + * throws an {@link AssertionError} (an {@link Error}, not a + * {@link RuntimeException}) on {@code close()} and forwards every other + * call to the real delegate. Models an -ea assertion firing during native + * teardown. + */ + private static void installFailingCloseDelegate(PooledSender ps, AtomicInteger closeAttempts) throws Exception { + Field f = PooledSender.class.getDeclaredField("delegate"); + f.setAccessible(true); + Sender real = (Sender) f.get(ps); + Sender failing = (Sender) Proxy.newProxyInstance( + Sender.class.getClassLoader(), + new Class[]{Sender.class}, + (proxy, method, args) -> { + if ("close".equals(method.getName()) && (args == null || args.length == 0)) { + closeAttempts.incrementAndGet(); + throw new AssertionError("injected delegate close() Error"); + } + return method.invoke(real, args); + }); + f.set(ps, failing); + } } diff --git a/core/src/test/java/module-info.java b/core/src/test/java/module-info.java index e9997b3d..e42b73ff 100644 --- a/core/src/test/java/module-info.java +++ b/core/src/test/java/module-info.java @@ -34,6 +34,7 @@ requires org.postgresql.jdbc; requires jmh.core; requires ch.qos.logback.classic; + requires ch.qos.logback.core; exports io.questdb.client.test; exports io.questdb.client.test.cairo;