From e6a1549c57f8ff0a2e340e14cfabadf45f50311e Mon Sep 17 00:00:00 2001 From: bluestreak Date: Mon, 25 May 2026 03:06:21 +0100 Subject: [PATCH 1/6] wip --- .../main/java/io/questdb/client/Sender.java | 32 ++++ .../http/client/WebSocketSendBuffer.java | 5 + .../qwp/client/NativeBufferWriter.java | 5 + .../cutlass/qwp/client/QwpBufferWriter.java | 8 + .../qwp/client/QwpWebSocketEncoder.java | 8 + .../qwp/client/QwpWebSocketSender.java | 172 ++++++++++++++---- .../client/SegmentedNativeBufferWriter.java | 8 + .../cutlass/qwp/protocol/QwpConstants.java | 5 + 8 files changed, 203 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java index 896113b0..b1c0677f 100644 --- a/core/src/main/java/io/questdb/client/Sender.java +++ b/core/src/main/java/io/questdb/client/Sender.java @@ -1074,6 +1074,7 @@ public int getTimeout() { private long reconnectMaxDurationMillis = PARAMETER_NOT_SET_EXPLICITLY; private boolean requestDurableAck; private int retryTimeoutMillis = PARAMETER_NOT_SET_EXPLICITLY; + private boolean transactional; private String senderId = DEFAULT_SENDER_ID; // Per-append deadline for SF appendBlocking spin-then-throw. Used to // be a hardcoded 30s constant; expose so tight-SLA users can lower @@ -1531,6 +1532,7 @@ public Sender build() { // closing the engine alone would leak the I/O thread, // dispatcher daemon, drainer pool, microbatch buffers and // WebSocketClient inside the abandoned `connected`. + connected.setTransactional(transactional); try { // Once the foreground sender is up, dispatch drainers // for any sibling orphan slots. Scan AFTER we acquire @@ -2402,6 +2404,24 @@ public LineSenderBuilder requestDurableAck(boolean enabled) { return this; } + /** + * Enables transactional mode. Auto-flush sends data to the server + * with {@code FLAG_DEFER_COMMIT}; only an explicit {@code flush()} + * triggers the server-side WAL commit. This allows accumulating + * datasets larger than the server's recv buffer while committing + * atomically per table. + * + * @param enabled true to enable transactional mode + * @return this instance for method chaining + */ + public LineSenderBuilder transactional(boolean enabled) { + if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) { + throw new LineSenderException("transactional is only supported for WebSocket transport"); + } + this.transactional = enabled; + return this; + } + /** * Configures the maximum time the Sender will spend retrying upon receiving a recoverable error from the server. *
@@ -3122,6 +3142,18 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) { } else { throw new LineSenderException("invalid request_durable_ack [value=").put(sink).put(", allowed-values=[on, off]]"); } + } else if (Chars.equals("transaction", sink)) { + if (protocol != PROTOCOL_WEBSOCKET) { + throw new LineSenderException("transaction is only supported for WebSocket transport"); + } + pos = getValue(configurationString, pos, sink, "transaction"); + if (Chars.equalsIgnoreCase("on", sink)) { + transactional(true); + } else if (Chars.equalsIgnoreCase("off", sink)) { + transactional(false); + } else { + throw new LineSenderException("invalid transaction [value=").put(sink).put(", allowed-values=[on, off]]"); + } } else if (Chars.equals("max_schemas_per_connection", sink)) { if (protocol != PROTOCOL_WEBSOCKET) { throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport"); diff --git a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java index b9598c78..37fe03a6 100644 --- a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java +++ b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java @@ -225,6 +225,11 @@ public int getWritePos() { return writePos; } + @Override + public void patchByte(int offset, byte value) { + Unsafe.getUnsafe().putByte(bufPtr + offset, value); + } + /** * Patches an int value at the specified offset. */ diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java index add22fb0..aad95f3c 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java @@ -138,6 +138,11 @@ public int getWritableBytes() { return capacity - position; } + @Override + public void patchByte(int offset, byte value) { + Unsafe.getUnsafe().putByte(bufferPtr + offset, value); + } + /** * Patches an int value at the specified offset. * Used for updating length fields after writing content. diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java index d3baf938..7c2c7d2c 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java @@ -86,6 +86,14 @@ public interface QwpBufferWriter extends ArrayBufferAppender { */ int getWritableBytes(); + /** + * Patches a byte value at the specified offset in the buffer. + * + * @param offset the byte offset from buffer start + * @param value the byte value to write + */ + void patchByte(int offset, byte value); + /** * Patches an int value at the specified offset in the buffer. *

diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java index 552d961f..f4b13733 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java @@ -126,6 +126,14 @@ public boolean isGorillaEnabled() { return (flags & FLAG_GORILLA) != 0; } + public void setDeferCommit(boolean defer) { + if (defer) { + flags |= FLAG_DEFER_COMMIT; + } else { + flags &= ~FLAG_DEFER_COMMIT; + } + } + public void setGorillaEnabled(boolean enabled) { if (enabled) { flags |= FLAG_GORILLA; diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index e459b066..ed96b3e8 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -219,6 +219,7 @@ public class QwpWebSocketSender implements Sender { private int errorInboxCapacity = SenderErrorDispatcher.DEFAULT_CAPACITY; private long firstPendingRowTimeNanos; private boolean gorillaEnabled = true; + private boolean hasDeferredMessages; // Stickys true once any successful connect has happened. Drives the // CONNECTED-vs-RECONNECTED-vs-FAILED_OVER classification at the success // point in buildAndConnect. @@ -260,6 +261,10 @@ public class QwpWebSocketSender implements Sender { // beginRound(true) call. roundSeq=1 is the first round; CONNECTED in the // first round indicates the initial connect. private long roundSeq; + // When true, auto-flush sends messages with FLAG_DEFER_COMMIT and only + // explicit flush() triggers the server-side commit. Enables accumulating + // arbitrarily large datasets that exceed the server's recv buffer. + private boolean transactional; // Server-advertised hard cap on QWP ingest payload bytes, captured from // X-QWP-Max-Batch-Size on each successful handshake. 0 when the server // did not advertise the header (older builds); the sender then falls back @@ -974,7 +979,10 @@ public void close() { // rows -> mmap'd / malloc'd ring). After this, the // cursor engine's publishedFsn reflects the final // target the I/O loop must drive ackedFsn up to. - flushPendingRows(); + flushPendingRows(false); + if (hasDeferredMessages) { + sendCommitMessage(); + } if (activeBuffer != null && activeBuffer.hasData()) { sealAndSwapBuffer(); } @@ -1370,7 +1378,10 @@ public long flushAndGetSequence() { // sealAndSwapBuffer, so by the time we reach here every encoded // batch is durable on its mmap'd segment. No processingCount to // drain, no awaitPendingAcks. Just surface any I/O thread error. - flushPendingRows(); + flushPendingRows(false); + if (hasDeferredMessages) { + sendCommitMessage(); + } if (activeBuffer != null && activeBuffer.hasData()) { sealAndSwapBuffer(); } @@ -2039,6 +2050,10 @@ public void setGorillaEnabled(boolean enabled) { this.encoder.setGorillaEnabled(enabled); } + public void setTransactional(boolean transactional) { + this.transactional = transactional; + } + /** * Register an async observer for ack-watermark advances. May be called * either before or after {@code connect()} — post-connect changes @@ -2858,9 +2873,18 @@ private void ensureNoInProgressRow() { /** * Flushes pending rows by encoding and sending them. - * All non-empty tables are encoded into a single QWP v1 message and sent as one WebSocket frame. + * When all tables fit in a single message, the encoder produces one + * WebSocket frame. When the encoded size exceeds {@code serverMaxBatchSize}, + * the method splits tables across multiple messages using + * {@code FLAG_DEFER_COMMIT}: all but the last message carry the flag so + * the server appends rows without committing, and the final message + * triggers the commit. + * + * @param deferCommit when true, the message carries FLAG_DEFER_COMMIT + * so the server appends rows but does not commit. + * Used by auto-flush in transactional mode. */ - private void flushPendingRows() { + private void flushPendingRows(boolean deferCommit) { if (pendingRowCount <= 0) { return; } @@ -2880,7 +2904,7 @@ private void flushPendingRows() { } if (LOG.isDebugEnabled()) { - LOG.debug("Flushing pending rows [count={}, tables={}]", pendingRowCount, tableCount); + LOG.debug("Flushing pending rows [count={}, tables={}, defer={}]", pendingRowCount, tableCount, deferCommit); } ensureActiveBufferReady(); @@ -2894,6 +2918,7 @@ private void flushPendingRows() { // self-sufficient frames there's no encode-vs-reconnect race // to defend against: the bytes are valid against any server. int batchMaxSchemaId = maxSentSchemaId; + encoder.setDeferCommit(deferCommit); encoder.beginMessage(tableCount, globalSymbolDictionary, /*confirmedMaxId=*/ -1, currentBatchMaxSymbolId); for (int i = 0, n = keys.size(); i < n; i++) { @@ -2925,39 +2950,9 @@ private void flushPendingRows() { int messageSize = encoder.finishMessage(); QwpBufferWriter buffer = encoder.getBuffer(); - // Defensive flush-time cap check: the per-row guard in sendRow() - // catches individual oversize rows, but schema and dict-delta - // bytes the encoder adds at message-build time can push a batch - // of legitimately-sized rows above the wire cap. Without this - // check the frame would be sent and the server would close the - // connection with ws-close[1009 Message Too Big], surfacing the - // failure asynchronously on a later op rather than from this - // flush() call. Reset all pending state so the sender stays - // usable -- the caller's pending rows are dropped; they must - // re-batch with fewer rows per flush. close()'s drain path - // also relies on this being a clean reset to avoid re-throwing. if (serverMaxBatchSize > 0 && messageSize > serverMaxBatchSize) { - int droppedRows = pendingRowCount; - for (int i = 0, n = keys.size(); i < n; i++) { - CharSequence tableName = keys.getQuick(i); - if (tableName == null) { - continue; - } - QwpTableBuffer tableBuffer = tableBuffers.get(tableName); - if (tableBuffer == null || tableBuffer.getRowCount() == 0) { - continue; - } - tableBuffer.reset(); - } - currentBatchMaxSymbolId = -1; - pendingBytes = 0; - currentTableBufferSnapshotBytes = 0; - pendingRowCount = 0; - firstPendingRowTimeNanos = 0; - throw new LineSenderException("batch too large for server batch cap") - .put(" [messageSize=").put(messageSize) - .put(", serverMaxBatchSize=").put(serverMaxBatchSize) - .put(", droppedRows=").put(droppedRows).put(']'); + flushPendingRowsSplit(keys, batchMaxSchemaId, deferCommit); + return; } activeBuffer.ensureCapacity(messageSize); @@ -2965,11 +2960,89 @@ private void flushPendingRows() { activeBuffer.incrementRowCount(); sealAndSwapBuffer(); + hasDeferredMessages = deferCommit; + // Update sent state only after successful enqueue. // If sealAndSwapBuffer() threw, these remain unchanged so the // next batch's delta dictionary will correctly re-include the // symbols and schema that the server never received. maxSentSchemaId = batchMaxSchemaId; + resetTableBuffersAfterFlush(keys); + } + + /** + * Splitting path: the full batch exceeds serverMaxBatchSize, so + * flushPendingRows() delegates here. Each non-empty table gets its + * own message. All messages except the last carry FLAG_DEFER_COMMIT + * so the server appends rows without committing until the final + * message arrives. + * + * @param deferCommit when true, ALL messages (including the last) + * carry FLAG_DEFER_COMMIT. When false, only the + * last message omits the flag. + */ + private void flushPendingRowsSplit(ObjList keys, int batchMaxSchemaId, boolean deferCommit) { + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting flush across multiple messages [serverMaxBatchSize={}, defer={}]", serverMaxBatchSize, deferCommit); + } + + // Collect non-empty table indices so we know which is last. + int nonEmptyCount = 0; + for (int i = 0, n = keys.size(); i < n; i++) { + CharSequence tableName = keys.getQuick(i); + if (tableName == null) { + continue; + } + QwpTableBuffer tableBuffer = tableBuffers.get(tableName); + if (tableBuffer != null && tableBuffer.getRowCount() > 0) { + nonEmptyCount++; + } + } + + int sent = 0; + for (int i = 0, n = keys.size(); i < n; i++) { + CharSequence tableName = keys.getQuick(i); + if (tableName == null) { + continue; + } + QwpTableBuffer tableBuffer = tableBuffers.get(tableName); + if (tableBuffer == null || tableBuffer.getRowCount() == 0) { + continue; + } + + sent++; + boolean isLast = (sent == nonEmptyCount); + boolean deferThis = deferCommit || !isLast; + + encoder.setDeferCommit(deferThis); + encoder.beginMessage(1, globalSymbolDictionary, + /*confirmedMaxId=*/ -1, currentBatchMaxSymbolId); + encoder.addTable(tableBuffer, /*useSchemaRef=*/ false); + int messageSize = encoder.finishMessage(); + QwpBufferWriter buffer = encoder.getBuffer(); + + if (messageSize > serverMaxBatchSize) { + resetTableBuffersAfterFlush(keys); + throw new LineSenderException("single table batch too large for server batch cap") + .put(" [table=").put(tableName) + .put(", messageSize=").put(messageSize) + .put(", serverMaxBatchSize=").put(serverMaxBatchSize).put(']'); + } + + ensureActiveBufferReady(); + activeBuffer.ensureCapacity(messageSize); + activeBuffer.write(buffer.getBufferPtr(), messageSize); + activeBuffer.incrementRowCount(); + sealAndSwapBuffer(); + } + + encoder.setDeferCommit(false); + hasDeferredMessages = deferCommit; + maxSentSchemaId = batchMaxSchemaId; + resetTableBuffersAfterFlush(keys); + } + + private void resetTableBuffersAfterFlush(ObjList keys) { for (int i = 0, n = keys.size(); i < n; i++) { CharSequence tableName = keys.getQuick(i); if (tableName == null) { @@ -2982,14 +3055,33 @@ private void flushPendingRows() { tableBuffer.reset(); } currentBatchMaxSymbolId = -1; - - // Reset pending count pendingBytes = 0; currentTableBufferSnapshotBytes = 0; pendingRowCount = 0; firstPendingRowTimeNanos = 0; } + /** + * Sends an empty QWP message without FLAG_DEFER_COMMIT to trigger + * the server-side commit of all previously deferred rows. + */ + private void sendCommitMessage() { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending commit message for deferred batch"); + } + encoder.setDeferCommit(false); + encoder.beginMessage(0, globalSymbolDictionary, + /*confirmedMaxId=*/ -1, currentBatchMaxSymbolId); + int messageSize = encoder.finishMessage(); + QwpBufferWriter buffer = encoder.getBuffer(); + ensureActiveBufferReady(); + activeBuffer.ensureCapacity(messageSize); + activeBuffer.write(buffer.getBufferPtr(), messageSize); + activeBuffer.incrementRowCount(); + sealAndSwapBuffer(); + hasDeferredMessages = false; + } + private void resetSchemaStateForNewConnection() { maxSentSchemaId = -1; nextSchemaId = 0; @@ -3134,7 +3226,7 @@ private void sendRow() { currentTableBufferSnapshotBytes = bufferedNow; if (shouldAutoFlush()) { - flushPendingRows(); + flushPendingRows(transactional); } } diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/SegmentedNativeBufferWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/SegmentedNativeBufferWriter.java index 458908f8..6ef10a4e 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/SegmentedNativeBufferWriter.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/SegmentedNativeBufferWriter.java @@ -93,6 +93,14 @@ public int getWritableBytes() { return currentChunk.getWritableBytes(); } + @Override + public void patchByte(int offset, byte value) { + if (offset < flushedBytes || offset + 1 > flushedBytes + currentChunk.getPosition()) { + throw new UnsupportedOperationException("cannot patch flushed segment data"); + } + currentChunk.patchByte((int) (offset - flushedBytes), value); + } + @Override public void patchInt(int offset, int value) { if (offset < flushedBytes || offset + Integer.BYTES > flushedBytes + currentChunk.getPosition()) { diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java b/core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java index c23b8045..44e3305a 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java @@ -36,6 +36,11 @@ public final class QwpConstants { * Client identifier sent in the X-QWP-Client-Id upgrade header. */ public static final String CLIENT_ID = "java/1.0.2"; + /** + * Flag bit: defer WAL commit. The server appends rows to WAL writers + * but skips the commit until a subsequent message without this flag. + */ + public static final byte FLAG_DEFER_COMMIT = 0x01; /** * Flag bit: Delta symbol dictionary encoding enabled. * When set, symbol columns use global IDs and send only new dictionary entries. From 1704ad60eebf0c2f42ddb7c0d9c874fa011dc042 Mon Sep 17 00:00:00 2001 From: bluestreak Date: Mon, 25 May 2026 03:07:59 +0100 Subject: [PATCH 2/6] wip --- .../cutlass/qwp/client/QwpColumnWriter.java | 2 +- .../qwp/client/CleanShutdownNoReplayTest.java | 2 +- .../cutlass/qwp/client/CloseDrainTest.java | 2 +- .../cutlass/qwp/client/QwpUdpSenderTest.java | 2 +- .../qwp/client/QwpWebSocketSenderTest.java | 6 +++--- .../qwp/client/RecoveryReplayTest.java | 2 +- .../sf/BackgroundDrainerEndToEndTest.java | 2 +- .../qwp/client/sf/SfFromConfigTest.java | 2 +- design/qwp-cursor-error-api.md | 21 ++++++++++--------- 9 files changed, 21 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java index 732062ed..37833b99 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java @@ -254,7 +254,7 @@ private void writeSymbolColumn(QwpTableBuffer.ColumnBuffer col, int count, int d long dataAddr = col.getDataAddress(); buffer.putVarint(dictionarySize); for (int i = 0; i < dictionarySize; i++) { - buffer.putString((String) col.getSymbolValue(i)); + buffer.putString(col.getSymbolValue(i)); } for (int i = 0; i < count; i++) { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java index df989d66..253d4ec8 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java @@ -83,7 +83,7 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio + ";sf_dir=" + sfDir + ";"; try (Sender sender = Sender.fromConfig(cfg1)) { for (int i = 0; i < 5; i++) { - sender.table("foo").longColumn("v", (long) i).atNow(); + sender.table("foo").longColumn("v", i).atNow(); sender.flush(); } // Wait until the server has ACK'd everything we sent. The diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java index d836b9c0..91960c21 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java @@ -321,7 +321,7 @@ public void testAsyncCloseDrainSucceedsWhenServerWasUpAllAlong() throws Exceptio + ";close_flush_timeout_millis=3000;"; long t0 = System.nanoTime(); try (Sender sender = Sender.fromConfig(cfg)) { - sender.table("foo").longColumn("v", (long) i).atNow(); + sender.table("foo").longColumn("v", i).atNow(); sender.flush(); } long elapsedMs = (System.nanoTime() - t0) / 1_000_000; diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java index 90362762..98bbbdeb 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java @@ -1920,7 +1920,7 @@ public void testNullArgsAfterCloseAllThrowSenderClosed() throws Exception { assertClosedUdp(() -> sender.doubleArray("x", (double[][]) null)); assertClosedUdp(() -> sender.doubleArray("x", (double[][][]) null)); assertClosedUdp(() -> sender.doubleArray("x", (DoubleArray) null)); - assertClosedUdp(() -> sender.geoHashColumn("x", (CharSequence) null)); + assertClosedUdp(() -> sender.geoHashColumn("x", null)); assertClosedUdp(() -> sender.longArray("x", (long[]) null)); assertClosedUdp(() -> sender.longArray("x", (long[][]) null)); assertClosedUdp(() -> sender.longArray("x", (long[][][]) null)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java index 2a602cee..3cedfdd1 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java @@ -428,7 +428,7 @@ public void testIpv4ColumnStringNullReturnsThis() throws Exception { assertMemoryLeak(() -> { try (QwpWebSocketSender sender = createUnconnectedSender()) { sender.table("t"); - Assert.assertSame(sender, sender.ipv4Column("addr", (CharSequence) null)); + Assert.assertSame(sender, sender.ipv4Column("addr", null)); } }); } @@ -533,8 +533,8 @@ public void testNullArgsAfterCloseAllThrowSenderClosed() throws Exception { assertClosed(() -> sender.doubleArray("x", (double[][]) null)); assertClosed(() -> sender.doubleArray("x", (double[][][]) null)); assertClosed(() -> sender.doubleArray("x", (DoubleArray) null)); - assertClosed(() -> sender.geoHashColumn("x", (CharSequence) null)); - assertClosed(() -> sender.ipv4Column("x", (CharSequence) null)); + assertClosed(() -> sender.geoHashColumn("x", null)); + assertClosed(() -> sender.ipv4Column("x", null)); assertClosed(() -> sender.longArray("x", (long[]) null)); assertClosed(() -> sender.longArray("x", (long[][]) null)); assertClosed(() -> sender.longArray("x", (long[][][]) null)); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java index 56c8b4ca..0afacffe 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java @@ -89,7 +89,7 @@ public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exceptio + ";close_flush_timeout_millis=0;"; try (Sender s1 = Sender.fromConfig(cfg1)) { for (int i = 0; i < 50; i++) { - s1.table("foo").stringColumn("p", pad).longColumn("v", (long) i).atNow(); + s1.table("foo").stringColumn("p", pad).longColumn("v", i).atNow(); s1.flush(); } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java index aa3b30b6..f8ac17d5 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java @@ -85,7 +85,7 @@ public void testDrainerEmptiesOrphanSlotAgainstAckServer() throws Exception { + ";close_flush_timeout_millis=0;"; try (Sender g = Sender.fromConfig(cfg1)) { for (int i = 0; i < 30; i++) { - g.table("foo").longColumn("v", (long) i).atNow(); + g.table("foo").longColumn("v", i).atNow(); g.flush(); } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java index 0236f828..bce1e191 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java @@ -106,7 +106,7 @@ public void testSfMaxBytesParsing() throws Exception { try (Sender sender = Sender.fromConfig(config)) { // Write enough data that segments rotate at ~128 KiB boundary. for (int i = 0; i < 50; i++) { - sender.table("foo").longColumn("v", (long) i).atNow(); + sender.table("foo").longColumn("v", i).atNow(); } sender.flush(); } diff --git a/design/qwp-cursor-error-api.md b/design/qwp-cursor-error-api.md index b8371d4c..eae99bc0 100644 --- a/design/qwp-cursor-error-api.md +++ b/design/qwp-cursor-error-api.md @@ -82,16 +82,17 @@ User overrides via builder (`errorPolicy(Category, Policy)` or full `errorPolicy ## `SenderError` (public, immutable) ```java -public final class SenderError { - public final Category category; - public final Policy appliedPolicy; // what the loop actually did - public final int serverStatusByte; // raw byte (0x03/0x05/...); -1 for PROTOCOL_VIOLATION - public final String serverMessage; // ≤1024 UTF-8 from frame, or WS close reason - public final long messageSequence; // server's per-frame seq (mirrors what server logs); -1 for PROTOCOL_VIOLATION - public final long fromFsn; // client-side FSN span — load-bearing for correlation - public final long toFsn; // inclusive - public final String tableName; // best-effort; null if multi-table batch - public final long detectedAtNanos; // System.nanoTime() at I/O thread receipt +/** + * @param appliedPolicy what the loop actually did + * @param serverStatusByte raw byte (0x03/0x05/...); -1 for PROTOCOL_VIOLATION + * @param serverMessage ≤1024 UTF-8 from frame, or WS close reason + * @param messageSequence server's per-frame seq (mirrors what server logs); -1 for PROTOCOL_VIOLATION + * @param fromFsn client-side FSN span — load-bearing for correlation + * @param toFsn inclusive + * @param tableName best-effort; null if multi-table batch + * @param detectedAtNanos System.nanoTime() at I/O thread receipt */ +public record SenderError(Category category, Policy appliedPolicy, int serverStatusByte, String serverMessage, + long messageSequence, long fromFsn, long toFsn, String tableName, long detectedAtNanos) { // accessors only; no mutation } ``` From c7ccde7db6a8b164fe08ea82990008b5bda1960a Mon Sep 17 00:00:00 2001 From: bluestreak Date: Mon, 25 May 2026 03:08:40 +0100 Subject: [PATCH 3/6] wip2 --- .../client/cutlass/qwp/client/QwpWebSocketSender.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index ed96b3e8..4f20933f 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -1965,12 +1965,6 @@ public void reset() { * publishing or reconnect. See {@link SenderConnectionListener} for the * full delivery contract. */ - /** - * Forces the {@code connected} flag without going through the real - * connect handshake. Lets unit tests exercise post-connect code paths - * (auto-flush bookkeeping, batch-size guards, ack tracking) on a - * sender that never opened a socket. Never call from production code. - */ @TestOnly public void setConnectedForTest(boolean connected) { this.connected = connected; From c933267944f11e8a33abd4a1f50f6d39fe0f0a54 Mon Sep 17 00:00:00 2001 From: bluestreak Date: Mon, 25 May 2026 03:11:48 +0100 Subject: [PATCH 4/6] wip2 --- .../qwp/client/CleanShutdownNoReplayTest.java | 2 +- .../cutlass/qwp/client/CloseDrainTest.java | 6 +-- .../qwp/client/QwpWebSocketSenderTest.java | 21 +++----- .../qwp/client/RecoveryReplayTest.java | 54 ++++++------------- .../sf/BackgroundDrainerEndToEndTest.java | 20 +------ 5 files changed, 29 insertions(+), 74 deletions(-) diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java index 253d4ec8..4a32b1f2 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java @@ -113,7 +113,7 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio String cfg2 = "ws::addr=localhost:" + port2 + ";sf_dir=" + sfDir + ";"; - try (Sender sender = Sender.fromConfig(cfg2)) { + try (Sender ignored = Sender.fromConfig(cfg2)) { // No new appends — purely observe whether recovery replays // anything. Give the I/O loop ample room to push any // replayed bytes onto the wire. diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java index 91960c21..d4a77713 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java @@ -154,8 +154,7 @@ public void testCloseDrainTimesOutWhenAcksNeverArrive() throws Exception { String cfg = "ws::addr=localhost:" + port + ";close_flush_timeout_millis=" + timeoutMs + ";"; long elapsedMs; - Sender sender = Sender.fromConfig(cfg); - try { + try (Sender sender = Sender.fromConfig(cfg)) { sender.table("foo").longColumn("v", 1L).atNow(); sender.flush(); long t0 = System.nanoTime(); @@ -167,9 +166,8 @@ public void testCloseDrainTimesOutWhenAcksNeverArrive() throws Exception { e.getMessage().contains("drain timed out")); } elapsedMs = (System.nanoTime() - t0) / 1_000_000; - } finally { - sender.close(); // idempotent — closed flag is set on first call } + // idempotent — closed flag is set on first call Assert.assertTrue("close() returned too early: " + elapsedMs + "ms", elapsedMs >= timeoutMs); Assert.assertTrue("close() exceeded the bounded timeout by too much: " + elapsedMs + "ms", diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java index 3cedfdd1..2d61ad9d 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java @@ -344,10 +344,9 @@ public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exc // deadline guarantees every appendBlocking() call trips the // backpressure deadline and throws. CursorSendEngine engine = new CursorSendEngine(null, 33, 33, 1L); - QwpWebSocketSender sender = QwpWebSocketSender.connect( + try (QwpWebSocketSender sender = QwpWebSocketSender.connect( "localhost", port, null, Integer.MAX_VALUE, 0, 0L, null, - QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, false, engine, 0L); - try { + QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, false, engine, 0L)) { sender.table("t").longColumn("v", 1L).atNow(); try { @@ -364,17 +363,13 @@ public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exc + MicrobatchBuffer.stateName(buffer0.getState()) + ", buffer1=" + MicrobatchBuffer.stateName(buffer1.getState()) + "]", buffer0.isInUse() || buffer1.isInUse()); - } finally { - // close() drains pending rows, which appendBlocking still - // rejects because the engine is permanently wedged in this - // test. The bug under test is about microbatch buffer - // state, not about close() being lenient toward residual - // unflushed rows — swallow the predictable rethrow here. - try { - sender.close(); - } catch (LineSenderException ignored) { - } + } catch (LineSenderException ignored) { } + // close() drains pending rows, which appendBlocking still + // rejects because the engine is permanently wedged in this + // test. The bug under test is about microbatch buffer + // state, not about close() being lenient toward residual + // unflushed rows — swallow the predictable rethrow here. } }); } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java index 0afacffe..d94926b8 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java @@ -25,6 +25,7 @@ package io.questdb.client.test.cutlass.qwp.client; import io.questdb.client.Sender; +import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment; import io.questdb.client.std.Files; import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; import org.junit.After; @@ -118,7 +119,7 @@ public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exceptio String cfg2 = "ws::addr=localhost:" + port2 + ";sf_dir=" + sfDir + ";"; - try (Sender s2 = Sender.fromConfig(cfg2)) { + try (Sender ignored = Sender.fromConfig(cfg2)) { // No new appends — purely replay. long deadline = System.currentTimeMillis() + 5_000; while (System.currentTimeMillis() < deadline @@ -137,24 +138,6 @@ public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exceptio } } - private static int countSegmentFiles(String dir) { - if (!Files.exists(dir)) return 0; - long find = Files.findFirst(dir); - if (find <= 0) return 0; - int n = 0; - try { - int rc = 1; - while (rc > 0) { - String name = Files.utf8ToString(Files.findName(find)); - if (name != null && name.endsWith(".sfa")) n++; - rc = Files.findNext(find); - } - } finally { - Files.findClose(find); - } - return n; - } - /** * Counts only segment files that actually carry frames — opens each * .sfa via the cursor's MmapSegment recovery path and excludes the @@ -173,13 +156,8 @@ private static int countPopulatedSegmentFiles(String dir) { String name = Files.utf8ToString(Files.findName(find)); if (name != null && name.endsWith(".sfa")) { try { - io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment seg = - io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment - .openExisting(dir + "/" + name); - try { + try (MmapSegment seg = MmapSegment.openExisting(dir + "/" + name)) { if (seg.frameCount() > 0) n++; - } finally { - seg.close(); } } catch (Throwable ignored) { // best-effort @@ -194,9 +172,7 @@ private static int countPopulatedSegmentFiles(String dir) { } private static String repeat(String c, int n) { - StringBuilder sb = new StringBuilder(n); - for (int i = 0; i < n; i++) sb.append(c); - return sb.toString(); + return String.valueOf(c).repeat(Math.max(0, n)); } private static void rmDirRec(String dir) { @@ -220,15 +196,9 @@ private static void rmDirRec(String dir) { Files.remove(dir); } - /** Receives binary frames but never acks. Sender drops them on close. */ - private static class SilentHandler implements TestWebSocketServer.WebSocketServerHandler { - @Override - public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { - // intentionally empty - } - } - - /** Acks every binary frame and tracks distinct payloads. */ + /** + * Acks every binary frame and tracks distinct payloads. + */ private static class AckHandler implements TestWebSocketServer.WebSocketServerHandler { // Distinct *payload bytes* — each row carries a unique long value // so every frame's bytes differ. Counts unique frames received, @@ -256,4 +226,14 @@ static byte[] buildAck(long seq) { return buf; } } + + /** + * Receives binary frames but never acks. Sender drops them on close. + */ + private static class SilentHandler implements TestWebSocketServer.WebSocketServerHandler { + @Override + public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) { + // intentionally empty + } + } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java index f8ac17d5..9cd50443 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java @@ -106,7 +106,7 @@ public void testDrainerEmptiesOrphanSlotAgainstAckServer() throws Exception { + ";sender_id=primary" + ";drain_orphans=true" + ";max_background_drainers=2;"; - try (Sender foreground = Sender.fromConfig(cfg2)) { + try (Sender ignored = Sender.fromConfig(cfg2)) { // Drainer runs in the background. Wait for the ghost slot // to drain through. 30 distinct rows expected at the ack // server (drainer's contribution; the foreground sender @@ -197,24 +197,6 @@ public void testDrainerLeavesFailedSentinelOnTerminalError() throws Exception { }); } - private static int countSegmentFiles(String dir) { - if (!Files.exists(dir)) return 0; - long find = Files.findFirst(dir); - if (find <= 0) return 0; - int n = 0; - try { - int rc = 1; - while (rc > 0) { - String name = Files.utf8ToString(Files.findName(find)); - if (name != null && name.endsWith(".sfa")) n++; - rc = Files.findNext(find); - } - } finally { - Files.findClose(find); - } - return n; - } - private static void rmDirRec(String dir) { if (!Files.exists(dir)) return; long find = Files.findFirst(dir); From 081d3ae6753df380190df079e10f90331a6866dc Mon Sep 17 00:00:00 2001 From: Vlad Ilyushchenko Date: Tue, 26 May 2026 19:22:22 +0100 Subject: [PATCH 5/6] re-add public setDeferCommit API flushAndGetSequence() now reads the deferCommit field so that explicit flush() calls respect the flag set by the caller. The auto-flush path continues to derive the flag from the transactional field. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../client/cutlass/qwp/client/QwpWebSocketSender.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 4f20933f..3f98fe43 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -193,6 +193,7 @@ public class QwpWebSocketSender implements Sender { // that walks the ring and sends frames. private CursorSendEngine cursorEngine; private CursorWebSocketSendLoop cursorSendLoop; + private boolean deferCommit; // Orphan-slot drainer pool. Non-null only when the builder requested // drain_orphans=true AND we have a slot path to scan against. Closed // alongside the cursor send loop in close(). @@ -1378,8 +1379,8 @@ public long flushAndGetSequence() { // sealAndSwapBuffer, so by the time we reach here every encoded // batch is durable on its mmap'd segment. No processingCount to // drain, no awaitPendingAcks. Just surface any I/O thread error. - flushPendingRows(false); - if (hasDeferredMessages) { + flushPendingRows(deferCommit); + if (!deferCommit && hasDeferredMessages) { sendCommitMessage(); } if (activeBuffer != null && activeBuffer.hasData()) { @@ -1970,6 +1971,10 @@ public void setConnectedForTest(boolean connected) { this.connected = connected; } + public void setDeferCommit(boolean enabled) { + this.deferCommit = enabled; + } + public void setConnectionListener(SenderConnectionListener listener) { SenderConnectionListener effective = listener != null ? listener : DefaultSenderConnectionListener.INSTANCE; this.connectionListener = effective; From f5589ed3a8a6a639154f80e328106d3b446051bc Mon Sep 17 00:00:00 2001 From: Vlad Ilyushchenko Date: Tue, 26 May 2026 20:20:13 +0100 Subject: [PATCH 6/6] respect deferCommit flag on close close() previously always sent a commit message for deferred batches, which committed data the caller intended to abandon. Align close() with flush() by checking the deferCommit flag before sending the commit message. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../questdb/client/cutlass/qwp/client/QwpWebSocketSender.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 3f98fe43..7d43d72f 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -980,8 +980,8 @@ public void close() { // rows -> mmap'd / malloc'd ring). After this, the // cursor engine's publishedFsn reflects the final // target the I/O loop must drive ackedFsn up to. - flushPendingRows(false); - if (hasDeferredMessages) { + flushPendingRows(deferCommit); + if (!deferCommit && hasDeferredMessages) { sendCommitMessage(); } if (activeBuffer != null && activeBuffer.hasData()) {