From 1aa65fc704769102fa1f6bb57af7a5acd39f0c63 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Thu, 21 May 2026 14:18:26 +0200 Subject: [PATCH] chore(qwp): drain method for QWP Sender allow to drain with per call-site timeout without closing the the Sender --- .../main/java/io/questdb/client/Sender.java | 22 +++++- .../qwp/client/QwpWebSocketSender.java | 37 +++++++++- .../qwp/client/CleanShutdownNoReplayTest.java | 4 +- .../cutlass/qwp/client/CloseDrainTest.java | 67 +++++++++++++++++-- 4 files changed, 118 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java index 990afb11..85912ab5 100644 --- a/core/src/main/java/io/questdb/client/Sender.java +++ b/core/src/main/java/io/questdb/client/Sender.java @@ -474,6 +474,22 @@ default Sender floatColumn(CharSequence name, float value) { */ void flush(); + /** + * Force flushing internal buffers to the server, then wait until the server + * acknowledges everything published so far, or until {@code timeoutMillis} + * elapses. + *

+ * This acknowledgement drain is currently supported only by the WebSocket + * transport. It is useful for call sites that need an explicit durability + * checkpoint without closing the sender. The close-time drain budget is + * configured separately with {@link LineSenderBuilder#closeFlushTimeoutMillis(long)}. + * + * @param timeoutMillis upper bound on the acknowledgement wait + */ + default void drain(long timeoutMillis) { + throw new LineSenderException("drain(timeoutMillis) is only supported for WebSocket transport"); + } + /** * Add a GEOHASH column value from pre-packed bits and an explicit bit precision. *

@@ -846,10 +862,10 @@ final class LineSenderBuilder { private static final int DEFAULT_AUTO_FLUSH_INTERVAL_MILLIS = 1_000; private static final int DEFAULT_AUTO_FLUSH_ROWS = 75_000; private static final int DEFAULT_BUFFER_CAPACITY = 64 * 1024; - // Default close() drain timeout: block up to 5s waiting for the + // Default close() drain timeout: block up to 60s waiting for the // server to ACK everything published into the engine before // shutting down the I/O loop. - private static final long DEFAULT_CLOSE_FLUSH_TIMEOUT_MILLIS = 5_000L; + private static final long DEFAULT_CLOSE_FLUSH_TIMEOUT_MILLIS = QwpWebSocketSender.DEFAULT_CLOSE_FLUSH_TIMEOUT_MS; private static final int DEFAULT_HTTP_PORT = 9000; private static final int DEFAULT_HTTP_TIMEOUT = 30_000; private static final int DEFAULT_MAXIMUM_BUFFER_CAPACITY = 100 * 1024 * 1024; @@ -1548,7 +1564,7 @@ public Sender build() { * close() drain timeout in milliseconds. The sender's {@code close()} * method blocks up to this many millis waiting for the server to ACK * every batch already published into the engine before shutting down - * the I/O loop. Default {@code 5000}. + * the I/O loop. Default {@code 60000}. *

* Set to {@code 0} or {@code -1} to opt out — close() will not wait * at all (fast close). Pending data is then lost in memory mode and diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index fbf66d47..962109a5 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -126,6 +126,7 @@ public class QwpWebSocketSender implements Sender { public static final int DEFAULT_AUTO_FLUSH_BYTES = 8 * 1024 * 1024; public static final long DEFAULT_AUTO_FLUSH_INTERVAL_NANOS = 100_000_000L; // 100ms public static final int DEFAULT_AUTO_FLUSH_ROWS = 1_000; + public static final long DEFAULT_CLOSE_FLUSH_TIMEOUT_MS = 60_000L; public static final int DEFAULT_MAX_SCHEMAS_PER_CONNECTION = 65_535; private static final int DEFAULT_BUFFER_SIZE = 8192; private static final int DEFAULT_MICROBATCH_BUFFER_SIZE = 1024 * 1024; // 1MB @@ -165,7 +166,7 @@ public class QwpWebSocketSender implements Sender { // close() drain timeout in millis. Default applied at construction. // 0 or -1 means "fast close" (skip the drain); otherwise close blocks // up to this many millis for ackedFsn to catch up to publishedFsn. - private long closeFlushTimeoutMillis = 5_000L; + private long closeFlushTimeoutMillis = DEFAULT_CLOSE_FLUSH_TIMEOUT_MS; private volatile boolean closed; private boolean connected; private SenderConnectionDispatcher connectionDispatcher; @@ -382,7 +383,7 @@ public static QwpWebSocketSender connect( ) { return connect(host, port, tlsConfig, autoFlushRows, autoFlushBytes, autoFlushIntervalNanos, authorizationHeader, maxSchemasPerConnection, - requestDurableAck, cursorEngine, 5_000L); + requestDurableAck, cursorEngine, DEFAULT_CLOSE_FLUSH_TIMEOUT_MS); } /** @@ -802,6 +803,38 @@ public boolean awaitAckedFsn(long targetFsn, long timeoutMillis) { return true; } + /** + * Flushes all buffered rows into the cursor engine and blocks until the + * server has ACKed everything published so far, or until + * {@code timeoutMillis} elapses. + *

+ * Unlike {@link #close()}, this call does not shut the sender down and does + * not use {@code close_flush_timeout_millis}; the caller supplies the wait + * budget for this specific drain point. {@code timeoutMillis <= 0} performs + * a non-blocking check and throws if the published FSN is not already ACKed. + * + * @param timeoutMillis upper bound on the ACK wait + * @throws LineSenderException if the sender is closed, a row is still in + * progress, a terminal WebSocket failure is + * observed, or the timeout elapses before all + * published batches are ACKed + */ + @Override + public void drain(long timeoutMillis) { + long target = flushAndGetSequence(); + if (!awaitAckedFsn(target, timeoutMillis)) { + long acked = getAckedFsn(); + LOG.warn("drain() timed out after {}ms [target={} acked={}], pending data is still in flight", + timeoutMillis, target, acked); + throw new LineSenderException("drain() timed out after ") + .put(timeoutMillis).put(" ms [publishedFsn=") + .put(target).put(", ackedFsn=").put(acked) + .put("] - server did not acknowledge ") + .put(target - acked) + .put(" pending batches"); + } + } + /** * Adds a BINARY column value to the current row. The bytes are written * verbatim with no encoding or transformation. A {@code null} array diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java index df989d66..4ccc5dff 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java @@ -71,7 +71,7 @@ public void tearDown() { @Test public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exception { // Phase 1: server ACKs every frame. Sender writes a few rows, - // flushes, then close() blocks for the default 5s drain — by the + // flushes, then close() blocks for the default drain — by the // time close returns, every frame has been ACK'd. int port1 = TestPorts.findUnusedPort(); AckHandler ack1 = new AckHandler(); @@ -87,7 +87,7 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio sender.flush(); } // Wait until the server has ACK'd everything we sent. The - // close() drain timeout is 5s by default but we want a + // close() drain timeout is 60s by default but we want a // tighter assert that the precondition really holds. long deadline = System.currentTimeMillis() + 3_000L; while (System.currentTimeMillis() < deadline diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java index 13168f60..d8e20617 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java @@ -52,7 +52,7 @@ public class CloseDrainTest { @Test public void testCloseBlocksUntilAckArrives() throws Exception { // Server delays every ACK by 800ms. With the default - // close_flush_timeout_millis=5000, close() must wait for that ACK + // close_flush_timeout_millis=60000, close() must wait for that ACK // before returning. Pre-fix close() returned within milliseconds. int port = TestPorts.findUnusedPort(); long ackDelayMs = 800; @@ -105,16 +105,73 @@ public void testCloseFastWhenTimeoutIsZero() throws Exception { } } + @Test + public void testDrainBlocksUntilAckArrives() throws Exception { + int port = TestPorts.findUnusedPort(); + long ackDelayMs = 800; + DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs); + try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String cfg = "ws::addr=localhost:" + port + + ";close_flush_timeout_millis=0;"; + long elapsedMs; + try (Sender sender = Sender.fromConfig(cfg)) { + sender.table("foo").longColumn("v", 1L).atNow(); + long t0 = System.nanoTime(); + sender.drain(5_000); + elapsedMs = (System.nanoTime() - t0) / 1_000_000; + } + Assert.assertTrue( + "drain() took only " + elapsedMs + "ms — did not wait for ACK", + elapsedMs >= ackDelayMs / 2); + } + } + + @Test + public void testDrainTimesOutWhenAcksNeverArrive() throws Exception { + int port = TestPorts.findUnusedPort(); + long timeoutMs = 500; + SilentHandler handler = new SilentHandler(); + try (TestWebSocketServer server = new TestWebSocketServer(port, handler)) { + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + String cfg = "ws::addr=localhost:" + port + + ";close_flush_timeout_millis=0;"; + long elapsedMs; + Sender sender = Sender.fromConfig(cfg); + try { + sender.table("foo").longColumn("v", 1L).atNow(); + long t0 = System.nanoTime(); + try { + sender.drain(timeoutMs); + Assert.fail("drain() should have thrown a timeout error"); + } catch (LineSenderException e) { + Assert.assertTrue("expected drain-timeout message, got: " + e.getMessage(), + e.getMessage().contains("drain() timed out")); + } + elapsedMs = (System.nanoTime() - t0) / 1_000_000; + } finally { + sender.close(); + } + Assert.assertTrue("drain() returned too early: " + elapsedMs + "ms", + elapsedMs >= timeoutMs); + Assert.assertTrue("drain() exceeded the bounded timeout by too much: " + elapsedMs + "ms", + elapsedMs < timeoutMs * 4); + } + } + @Test public void testCloseFastWhenTimeoutIsMinusOne() throws Exception { // Documented contract: close_flush_timeout_millis=-1 opts out of the // drain (fast close), same as 0. See LineSenderBuilder#closeFlushTimeoutMillis // Javadoc — "Set to 0 or -1 to opt out — close() will not wait at all". // - // Currently fails because -1 collides with the PARAMETER_NOT_SET_EXPLICITLY - // sentinel in LineSenderBuilder, so the build path silently substitutes - // DEFAULT_CLOSE_FLUSH_TIMEOUT_MILLIS (5000ms) and close() blocks for the - // full ACK delay instead of returning fast. + // Regression guard: -1 must not collide with LineSenderBuilder's + // "parameter not set" sentinel and silently substitute the default + // close flush timeout. int port = TestPorts.findUnusedPort(); long ackDelayMs = 1500; DelayingAckHandler handler = new DelayingAckHandler(ackDelayMs);