Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 84 additions & 2 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,15 @@ final class LineSenderBuilder {
// runtime lands in a follow-up commit. For now we surface the
// count via logging so users can confirm orphans are being seen.
private boolean drainOrphans = false;
// Orphan-scan exclusion for the connection pool. The pool co-manages
// exactly <orphanDrainBase>-<i> for i in [0, orphanDrainSlotCount) and
// recovers each of those on (re)creation, so pooled senders must never
// treat one another's live slots as drainable orphans. Anything else --
// a different base, a bare un-suffixed id, OR a same-base index at or
// above the count (a slot left behind by a larger pool before maxSize
// shrank) -- is still drained, so unacked data is never stranded.
private String orphanDrainBase;
private int orphanDrainSlotCount;
private long durableAckKeepaliveIntervalMillis = DURABLE_ACK_KEEPALIVE_NOT_SET;
// Optional user-supplied async error handler. When null, the sender
// uses DefaultSenderErrorHandler.INSTANCE (loud-not-silent log).
Expand Down Expand Up @@ -1472,7 +1481,15 @@ public Sender build() {
} else {
if (!Files.exists(sfDir)) {
int rc = Files.mkdir(sfDir, Files.DIR_MODE_DEFAULT);
if (rc != 0) {
// mkdir is non-zero on failure, but "already exists"
// is one such failure. Multiple SF senders sharing one
// sf_dir can be built concurrently (the pool calls
// build() outside its lock), so two threads can both
// pass the exists() check and race into mkdir; the
// loser gets EEXIST. Treat a benign creation race --
// the dir now exists -- as success and only fail when
// the directory is genuinely absent afterwards.
if (rc != 0 && !Files.exists(sfDir)) {
throw new LineSenderException(
"could not create sf_dir: " + sfDir + " rc=" + rc);
}
Expand Down Expand Up @@ -1548,7 +1565,7 @@ public Sender build() {
if (drainOrphans && sfDir != null) {
io.questdb.client.std.ObjList<String> orphans =
io.questdb.client.cutlass.qwp.client.sf.cursor.OrphanScanner
.scan(sfDir, senderId);
.scan(sfDir, senderId, orphanDrainBase, orphanDrainSlotCount);
if (orphans.size() > 0) {
org.slf4j.LoggerFactory.getLogger(LineSenderBuilder.class)
.info("dispatching drainers for {} orphan slot(s) under {} "
Expand Down Expand Up @@ -2471,6 +2488,71 @@ public LineSenderBuilder senderId(String id) {
return this;
}

/**
* The slot id ({@code sender_id}) currently configured on this
* builder, either parsed from the config string or left at its
* {@code "default"} default. Introspection hook for the connection
* pool, which derives a distinct per-slot id from this base so that
* multiple pooled senders sharing one {@code sf_dir} don't collide
* on the slot {@code flock}.
*/
public String getConfiguredSenderId() {
return senderId;
}

/**
* The store-and-forward group root ({@code sf_dir}) currently
* configured on this builder, or {@code null} when SF is disabled.
* Introspection hook for the connection pool, which needs the group
* root to locate its own managed slot dirs {@code <sf_dir>/<base>-<i>}
* when recovering unacked data a previous run left behind.
*/
public String getConfiguredSfDir() {
return sfDir;
}

/**
* Excludes the connection pool's <em>live</em> slot set from
* {@link #drainOrphans(boolean)} scanning: a sibling slot under
* {@code sf_dir} named {@code <base>-<index>} with
* {@code 0 <= index < slotCount} is never treated as a drainable orphan.
* <p>
* Internal introspection hook for the connection pool. The pool gives
* each pooled SF sender a distinct slot id {@code <base>-<index>} and
* recovers each slot's unacked data itself when it (re)creates that
* slot. Without this exclusion, one pooled sender's startup drainer
* could adopt a sibling pool slot's lock and dir, reintroducing the
* very "sf slot already in use" collision the per-slot ids were added
* to prevent.
* <p>
* Unlike a blanket {@code <base>-} prefix exclusion, the bound is the
* pool's {@code maxSize}: a same-base slot whose index is at or above
* {@code slotCount} (e.g. {@code <base>-3} left behind by a larger pool
* before {@code maxSize} shrank from 4 to 2) is NOT excluded and is
* drained like any foreign leftover, so its unacked data is recovered
* instead of being silently stranded. Foreign leftovers (a different
* base, or a bare un-suffixed id) are also still drained.
* <p>
* Pass a {@code null}/empty base or {@code slotCount <= 0} to disable
* the exclusion (the default).
*/
public LineSenderBuilder orphanDrainExcludeManagedSlots(String base, int slotCount) {
this.orphanDrainBase = base;
this.orphanDrainSlotCount = slotCount;
return this;
}

/**
* True iff store-and-forward is enabled (an {@code sf_dir} was set).
* Introspection hook for the connection pool: SF senders own an
* exclusive on-disk slot, so each pooled sender needs its own slot
* id, whereas non-SF (memory-mode / HTTP / TCP) senders share no
* such resource and need no per-slot identity.
*/
public boolean isStoreAndForwardEnabled() {
return sfDir != null;
}

/**
* Per-call deadline for {@code Sender.flush()} spinning on a full
* cursor segment ring waiting for ACKs to drain space. Default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ public class QwpWebSocketSender implements Sender {
private Sender.InitialConnectMode initialConnectMode = Sender.InitialConnectMode.OFF;
private boolean ownsCursorEngine;
private long pendingBytes;
// Set true by close() once the SF slot flock has been released (the normal
// teardown path). Stays false if close() bailed early with the I/O thread
// still running -- then cursorEngine.close() never ran and the flock is
// still held, so the owning pool MUST keep the slot reserved rather than
// hand the still-locked dir to the next borrow ("sf slot already in use").
private boolean slotLockReleased;
private int pendingRowCount;
private SenderProgressDispatcher progressDispatcher;
// Async-delivery sink for ack-watermark advances. Default no-op; a
Expand Down Expand Up @@ -1087,6 +1093,10 @@ public void close() {
cursorEngine = null;
ownsCursorEngine = false;
}
// Past the ioThreadStopped guard => cursorEngine.close() ran and
// released the SF flock in its finally (or this sender owned no
// engine holding one). Signal the pool it may reuse the slot.
slotLockReleased = true;

// Shutdown order: dispatcher last, after the I/O loop has stopped
// producing into it. close() drains pending entries with a short
Expand Down Expand Up @@ -1129,6 +1139,16 @@ public void close() {
}
}

/**
* True once {@link #close()} has released the store-and-forward slot
* flock. False means close() leaked the still-running I/O thread (and its
* resources), so the flock is still held; the owning pool must keep the
* slot index reserved instead of reusing the still-locked slot dir.
*/
public boolean isSlotLockReleased() {
return slotLockReleased;
}

@Override
public Sender decimalColumn(CharSequence name, Decimal64 value) {
checkNotClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,28 @@ private OrphanScanner() {
* "no orphans" answer in that case.
*/
public static ObjList<String> scan(String sfDir, String excludeSlotName) {
return scan(sfDir, excludeSlotName, null);
}

/**
* As {@link #scan(String, String)}, but additionally skips any child
* whose name starts with {@code excludeNamePrefix} (when non-null and
* non-empty).
* <p>
* This is how a connection pool keeps the scanner from treating its own
* sibling slots as orphans: every pooled SF sender lives at
* {@code <sf_dir>/<base>-<index>}, and the pool itself recovers each
* slot's unacked data when it (re)creates that slot — so the pool's
* whole {@code <base>-} namespace must be excluded from auto-drain.
* Genuine foreign leftovers (a different base, or a bare un-suffixed
* id) do not match the prefix and are still reported.
*/
public static ObjList<String> scan(String sfDir, String excludeSlotName, String excludeNamePrefix) {
ObjList<String> orphans = new ObjList<>();
if (sfDir == null || !Files.exists(sfDir)) {
return orphans;
}
boolean hasPrefix = excludeNamePrefix != null && !excludeNamePrefix.isEmpty();
long find = Files.findFirst(sfDir);
if (find < 0) {
LOG.warn("orphan scan could not enumerate {} — treating as no orphans, "
Expand All @@ -99,6 +117,76 @@ public static ObjList<String> scan(String sfDir, String excludeSlotName) {
if (excludeSlotName != null && excludeSlotName.equals(name)) {
continue;
}
if (hasPrefix && name.startsWith(excludeNamePrefix)) {
continue;
}
String slotPath = sfDir + "/" + name;
if (!isCandidateOrphan(slotPath)) {
continue;
}
orphans.add(slotPath);
}
} finally {
Files.findClose(find);
}
return orphans;
}

/**
* As {@link #scan(String, String)}, but excludes only the <em>exact</em>
* set of slot dirs a connection pool can re-create and self-recover:
* {@code <managedBase>-<i>} for {@code 0 <= i < managedSlotCount}.
* <p>
* This is the precise replacement for the coarser prefix exclusion
* ({@link #scan(String, String, String)}). The prefix form fences off the
* <em>whole</em> {@code <base>-} namespace, which silently strands unacked
* data after a {@code maxSize} shrink across restarts: a slot like
* {@code <base>-3} left over from a larger pool is neither re-created (out
* of the new {@code [0,maxSize)} index range) nor drained (it matched the
* excluded prefix). By bounding the exclusion to {@code [0,managedSlotCount)},
* any same-base slot with an index at or above {@code managedSlotCount} is
* treated like a foreign leftover and becomes a drainable orphan, so its
* data is recovered through the normal drain path.
* <p>
* Only canonical, pool-minted names are excluded: the suffix after
* {@code <managedBase>-} must be a canonical non-negative decimal
* ({@code 0,1,2,...} with no leading zeros, sign, or non-digits). Anything
* else under the same base ({@code <base>-foo}, {@code <base>-007}) is not a
* name the pool creates and is reported as a candidate.
* <p>
* When {@code managedBase} is null/empty or {@code managedSlotCount <= 0}
* no exclusion is applied (every sibling with data is a candidate).
*/
public static ObjList<String> scan(String sfDir, String excludeSlotName, String managedBase, int managedSlotCount) {
ObjList<String> orphans = new ObjList<>();
if (sfDir == null || !Files.exists(sfDir)) {
return orphans;
}
boolean hasManaged = managedBase != null && !managedBase.isEmpty() && managedSlotCount > 0;
String managedPrefix = hasManaged ? managedBase + "-" : null;
long find = Files.findFirst(sfDir);
if (find < 0) {
LOG.warn("orphan scan could not enumerate {} \u2014 treating as no orphans, "
+ "but this may indicate a permission or transient error", sfDir);
return orphans;
}
if (find == 0) {
return orphans;
}
try {
int rc = 1;
while (rc > 0) {
String name = Files.utf8ToString(Files.findName(find));
rc = Files.findNext(find);
if (name == null || ".".equals(name) || "..".equals(name)) {
continue;
}
if (excludeSlotName != null && excludeSlotName.equals(name)) {
continue;
}
if (hasManaged && isManagedSlot(name, managedPrefix, managedSlotCount)) {
continue;
}
String slotPath = sfDir + "/" + name;
if (!isCandidateOrphan(slotPath)) {
continue;
Expand All @@ -111,6 +199,50 @@ public static ObjList<String> scan(String sfDir, String excludeSlotName) {
return orphans;
}

/**
* True iff {@code name} is a slot the pool actively co-manages, i.e.
* {@code <managedPrefix><i>} where {@code i} is a canonical non-negative
* decimal in {@code [0, managedSlotCount)}. Visible for testing.
*/
public static boolean isManagedSlot(String name, String managedPrefix, int managedSlotCount) {
if (name == null || managedPrefix == null || !name.startsWith(managedPrefix)) {
return false;
}
int idx = parseCanonicalIndex(name, managedPrefix.length());
return idx >= 0 && idx < managedSlotCount;
}

/**
* Parses the canonical non-negative decimal that makes up the rest of
* {@code name} from {@code from}. Returns {@code -1} for an empty suffix,
* a non-digit, a leading zero (e.g. {@code "007"}), or anything that would
* overflow {@code int}. Only the exact form the pool emits
* ({@code Integer.toString(index)}) is accepted, so foreign or malformed
* same-base names never get mistaken for a managed slot.
*/
private static int parseCanonicalIndex(String name, int from) {
int len = name.length();
if (from >= len) {
return -1;
}
// Reject leading zeros unless the whole suffix is exactly "0".
if (name.charAt(from) == '0' && len - from > 1) {
return -1;
}
long acc = 0;
for (int i = from; i < len; i++) {
char c = name.charAt(i);
if (c < '0' || c > '9') {
return -1;
}
acc = acc * 10 + (c - '0');
if (acc > Integer.MAX_VALUE) {
return -1;
}
}
return (int) acc;
}

/**
* True iff {@code slotPath} looks like a slot dir with unacked data
* and no failure sentinel. Visible for testing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,15 @@ private void runLoop() {
}
try {
senderPool.reapIdle();
} catch (RuntimeException ignored) {
} catch (Throwable ignored) {
// Reaping must not propagate -- it's best-effort housekeeping.
// Catch Throwable (not just RuntimeException) so an Error from a
// delegate teardown can never kill this daemon thread and stop
// all future reaping for the life of the handle.
}
try {
queryPool.reapIdle();
} catch (RuntimeException ignored) {
} catch (Throwable ignored) {
}
}
}
Expand Down
Loading
Loading