Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions core/src/main/java/io/questdb/client/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,22 @@ default Sender floatColumn(CharSequence name, float value) {
*/
void flush();

/**
* Force flushing internal buffers to the server, then wait until the server
* acknowledges everything published so far, or until {@code timeoutMillis}
* elapses.
* <p>
* This acknowledgement drain is currently supported only by the WebSocket
* transport. It is useful for call sites that need an explicit durability
* checkpoint without closing the sender. The close-time drain budget is
* configured separately with {@link LineSenderBuilder#closeFlushTimeoutMillis(long)}.
*
* @param timeoutMillis upper bound on the acknowledgement wait
*/
default void drain(long timeoutMillis) {
throw new LineSenderException("drain(timeoutMillis) is only supported for WebSocket transport");
}

/**
* Add a GEOHASH column value from pre-packed bits and an explicit bit precision.
* <p>
Expand Down Expand Up @@ -846,10 +862,10 @@ final class LineSenderBuilder {
private static final int DEFAULT_AUTO_FLUSH_INTERVAL_MILLIS = 1_000;
private static final int DEFAULT_AUTO_FLUSH_ROWS = 75_000;
private static final int DEFAULT_BUFFER_CAPACITY = 64 * 1024;
// Default close() drain timeout: block up to 5s waiting for the
// Default close() drain timeout: block up to 60s waiting for the
// server to ACK everything published into the engine before
// shutting down the I/O loop.
private static final long DEFAULT_CLOSE_FLUSH_TIMEOUT_MILLIS = 5_000L;
private static final long DEFAULT_CLOSE_FLUSH_TIMEOUT_MILLIS = QwpWebSocketSender.DEFAULT_CLOSE_FLUSH_TIMEOUT_MS;
private static final int DEFAULT_HTTP_PORT = 9000;
private static final int DEFAULT_HTTP_TIMEOUT = 30_000;
private static final int DEFAULT_MAXIMUM_BUFFER_CAPACITY = 100 * 1024 * 1024;
Expand Down Expand Up @@ -1548,7 +1564,7 @@ public Sender build() {
* close() drain timeout in milliseconds. The sender's {@code close()}
* method blocks up to this many millis waiting for the server to ACK
* every batch already published into the engine before shutting down
* the I/O loop. Default {@code 5000}.
* the I/O loop. Default {@code 60000}.
* <p>
* Set to {@code 0} or {@code -1} to opt out — close() will not wait
* at all (fast close). Pending data is then lost in memory mode and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public class QwpWebSocketSender implements Sender {
public static final int DEFAULT_AUTO_FLUSH_BYTES = 8 * 1024 * 1024;
public static final long DEFAULT_AUTO_FLUSH_INTERVAL_NANOS = 100_000_000L; // 100ms
public static final int DEFAULT_AUTO_FLUSH_ROWS = 1_000;
public static final long DEFAULT_CLOSE_FLUSH_TIMEOUT_MS = 60_000L;
public static final int DEFAULT_MAX_SCHEMAS_PER_CONNECTION = 65_535;
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static final int DEFAULT_MICROBATCH_BUFFER_SIZE = 1024 * 1024; // 1MB
Expand Down Expand Up @@ -165,7 +166,7 @@ public class QwpWebSocketSender implements Sender {
// close() drain timeout in millis. Default applied at construction.
// 0 or -1 means "fast close" (skip the drain); otherwise close blocks
// up to this many millis for ackedFsn to catch up to publishedFsn.
private long closeFlushTimeoutMillis = 5_000L;
private long closeFlushTimeoutMillis = DEFAULT_CLOSE_FLUSH_TIMEOUT_MS;
private volatile boolean closed;
private boolean connected;
private SenderConnectionDispatcher connectionDispatcher;
Expand Down Expand Up @@ -382,7 +383,7 @@ public static QwpWebSocketSender connect(
) {
return connect(host, port, tlsConfig, autoFlushRows, autoFlushBytes, autoFlushIntervalNanos,
authorizationHeader, maxSchemasPerConnection,
requestDurableAck, cursorEngine, 5_000L);
requestDurableAck, cursorEngine, DEFAULT_CLOSE_FLUSH_TIMEOUT_MS);
}

/**
Expand Down Expand Up @@ -802,6 +803,38 @@ public boolean awaitAckedFsn(long targetFsn, long timeoutMillis) {
return true;
}

/**
* Flushes all buffered rows into the cursor engine and blocks until the
* server has ACKed everything published so far, or until
* {@code timeoutMillis} elapses.
* <p>
* Unlike {@link #close()}, this call does not shut the sender down and does
* not use {@code close_flush_timeout_millis}; the caller supplies the wait
* budget for this specific drain point. {@code timeoutMillis <= 0} performs
* a non-blocking check and throws if the published FSN is not already ACKed.
*
* @param timeoutMillis upper bound on the ACK wait
* @throws LineSenderException if the sender is closed, a row is still in
* progress, a terminal WebSocket failure is
* observed, or the timeout elapses before all
* published batches are ACKed
*/
@Override
public void drain(long timeoutMillis) {
long target = flushAndGetSequence();
if (!awaitAckedFsn(target, timeoutMillis)) {
long acked = getAckedFsn();
LOG.warn("drain() timed out after {}ms [target={} acked={}], pending data is still in flight",
timeoutMillis, target, acked);
throw new LineSenderException("drain() timed out after ")
.put(timeoutMillis).put(" ms [publishedFsn=")
.put(target).put(", ackedFsn=").put(acked)
.put("] - server did not acknowledge ")
.put(target - acked)
.put(" pending batches");
}
}

/**
* Adds a BINARY column value to the current row. The bytes are written
* verbatim with no encoding or transformation. A {@code null} array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void tearDown() {
@Test
public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exception {
// Phase 1: server ACKs every frame. Sender writes a few rows,
// flushes, then close() blocks for the default 5s drain — by the
// flushes, then close() blocks for the default drain — by the
// time close returns, every frame has been ACK'd.
int port1 = TestPorts.findUnusedPort();
AckHandler ack1 = new AckHandler();
Expand All @@ -87,7 +87,7 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio
sender.flush();
}
// Wait until the server has ACK'd everything we sent. The
// close() drain timeout is 5s by default but we want a
// close() drain timeout is 60s by default but we want a
// tighter assert that the precondition really holds.
long deadline = System.currentTimeMillis() + 3_000L;
while (System.currentTimeMillis() < deadline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class CloseDrainTest {
@Test
public void testCloseBlocksUntilAckArrives() throws Exception {
// Server delays every ACK by 800ms. With the default
// close_flush_timeout_millis=5000, close() must wait for that ACK
// close_flush_timeout_millis=60000, close() must wait for that ACK
// before returning. Pre-fix close() returned within milliseconds.
int port = TestPorts.findUnusedPort();
long ackDelayMs = 800;
Expand Down Expand Up @@ -105,16 +105,73 @@ public void testCloseFastWhenTimeoutIsZero() throws Exception {
}
}

@Test
public void testDrainBlocksUntilAckArrives() throws Exception {
int port = TestPorts.findUnusedPort();
long ackDelayMs = 800;
DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs);
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

String cfg = "ws::addr=localhost:" + port
+ ";close_flush_timeout_millis=0;";
long elapsedMs;
try (Sender sender = Sender.fromConfig(cfg)) {
sender.table("foo").longColumn("v", 1L).atNow();
long t0 = System.nanoTime();
sender.drain(5_000);
elapsedMs = (System.nanoTime() - t0) / 1_000_000;
}
Assert.assertTrue(
"drain() took only " + elapsedMs + "ms — did not wait for ACK",
elapsedMs >= ackDelayMs / 2);
}
}

@Test
public void testDrainTimesOutWhenAcksNeverArrive() throws Exception {
int port = TestPorts.findUnusedPort();
long timeoutMs = 500;
SilentHandler handler = new SilentHandler();
try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) {
server.start();
Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS));

String cfg = "ws::addr=localhost:" + port
+ ";close_flush_timeout_millis=0;";
long elapsedMs;
Sender sender = Sender.fromConfig(cfg);
try {
sender.table("foo").longColumn("v", 1L).atNow();
long t0 = System.nanoTime();
try {
sender.drain(timeoutMs);
Assert.fail("drain() should have thrown a timeout error");
} catch (LineSenderException e) {
Assert.assertTrue("expected drain-timeout message, got: " + e.getMessage(),
e.getMessage().contains("drain() timed out"));
}
elapsedMs = (System.nanoTime() - t0) / 1_000_000;
} finally {
sender.close();
}
Assert.assertTrue("drain() returned too early: " + elapsedMs + "ms",
elapsedMs >= timeoutMs);
Assert.assertTrue("drain() exceeded the bounded timeout by too much: " + elapsedMs + "ms",
elapsedMs < timeoutMs * 4);
}
}

@Test
public void testCloseFastWhenTimeoutIsMinusOne() throws Exception {
// Documented contract: close_flush_timeout_millis=-1 opts out of the
// drain (fast close), same as 0. See LineSenderBuilder#closeFlushTimeoutMillis
// Javadoc — "Set to 0 or -1 to opt out — close() will not wait at all".
//
// Currently fails because -1 collides with the PARAMETER_NOT_SET_EXPLICITLY
// sentinel in LineSenderBuilder, so the build path silently substitutes
// DEFAULT_CLOSE_FLUSH_TIMEOUT_MILLIS (5000ms) and close() blocks for the
// full ACK delay instead of returning fast.
// Regression guard: -1 must not collide with LineSenderBuilder's
// "parameter not set" sentinel and silently substitute the default
// close flush timeout.
int port = TestPorts.findUnusedPort();
long ackDelayMs = 1500;
DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs);
Expand Down
Loading