diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java
index 896113b0..b1c0677f 100644
--- a/core/src/main/java/io/questdb/client/Sender.java
+++ b/core/src/main/java/io/questdb/client/Sender.java
@@ -1074,6 +1074,7 @@ public int getTimeout() {
private long reconnectMaxDurationMillis = PARAMETER_NOT_SET_EXPLICITLY;
private boolean requestDurableAck;
private int retryTimeoutMillis = PARAMETER_NOT_SET_EXPLICITLY;
+ private boolean transactional;
private String senderId = DEFAULT_SENDER_ID;
// Per-append deadline for SF appendBlocking spin-then-throw. Used to
// be a hardcoded 30s constant; expose so tight-SLA users can lower
@@ -1531,6 +1532,7 @@ public Sender build() {
// closing the engine alone would leak the I/O thread,
// dispatcher daemon, drainer pool, microbatch buffers and
// WebSocketClient inside the abandoned `connected`.
+ connected.setTransactional(transactional);
try {
// Once the foreground sender is up, dispatch drainers
// for any sibling orphan slots. Scan AFTER we acquire
@@ -2402,6 +2404,24 @@ public LineSenderBuilder requestDurableAck(boolean enabled) {
return this;
}
+ /**
+ * Enables transactional mode. Auto-flush sends data to the server
+ * with {@code FLAG_DEFER_COMMIT}; only an explicit {@code flush()}
+ * triggers the server-side WAL commit. This allows accumulating
+ * datasets larger than the server's recv buffer while committing
+ * atomically per table.
+ *
+ * @param enabled true to enable transactional mode
+ * @return this instance for method chaining
+ */
+ public LineSenderBuilder transactional(boolean enabled) {
+ if (protocol != PARAMETER_NOT_SET_EXPLICITLY && protocol != PROTOCOL_WEBSOCKET) {
+ throw new LineSenderException("transactional is only supported for WebSocket transport");
+ }
+ this.transactional = enabled;
+ return this;
+ }
+
/**
* Configures the maximum time the Sender will spend retrying upon receiving a recoverable error from the server.
*
@@ -3122,6 +3142,18 @@ private LineSenderBuilder fromConfig(CharSequence configurationString) {
} else {
throw new LineSenderException("invalid request_durable_ack [value=").put(sink).put(", allowed-values=[on, off]]");
}
+ } else if (Chars.equals("transaction", sink)) {
+ if (protocol != PROTOCOL_WEBSOCKET) {
+ throw new LineSenderException("transaction is only supported for WebSocket transport");
+ }
+ pos = getValue(configurationString, pos, sink, "transaction");
+ if (Chars.equalsIgnoreCase("on", sink)) {
+ transactional(true);
+ } else if (Chars.equalsIgnoreCase("off", sink)) {
+ transactional(false);
+ } else {
+ throw new LineSenderException("invalid transaction [value=").put(sink).put(", allowed-values=[on, off]]");
+ }
} else if (Chars.equals("max_schemas_per_connection", sink)) {
if (protocol != PROTOCOL_WEBSOCKET) {
throw new LineSenderException("max_schemas_per_connection is only supported for WebSocket transport");
diff --git a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java
index b9598c78..37fe03a6 100644
--- a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java
+++ b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java
@@ -225,6 +225,11 @@ public int getWritePos() {
return writePos;
}
+ @Override
+ public void patchByte(int offset, byte value) {
+ Unsafe.getUnsafe().putByte(bufPtr + offset, value);
+ }
+
/**
* Patches an int value at the specified offset.
*/
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java
index add22fb0..aad95f3c 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java
@@ -138,6 +138,11 @@ public int getWritableBytes() {
return capacity - position;
}
+ @Override
+ public void patchByte(int offset, byte value) {
+ Unsafe.getUnsafe().putByte(bufferPtr + offset, value);
+ }
+
/**
* Patches an int value at the specified offset.
* Used for updating length fields after writing content.
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java
index d3baf938..7c2c7d2c 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpBufferWriter.java
@@ -86,6 +86,14 @@ public interface QwpBufferWriter extends ArrayBufferAppender {
*/
int getWritableBytes();
+ /**
+ * Patches a byte value at the specified offset in the buffer.
+ *
+ * @param offset the byte offset from buffer start
+ * @param value the byte value to write
+ */
+ void patchByte(int offset, byte value);
+
/**
* Patches an int value at the specified offset in the buffer.
*
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java
index 732062ed..37833b99 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpColumnWriter.java
@@ -254,7 +254,7 @@ private void writeSymbolColumn(QwpTableBuffer.ColumnBuffer col, int count, int d
long dataAddr = col.getDataAddress();
buffer.putVarint(dictionarySize);
for (int i = 0; i < dictionarySize; i++) {
- buffer.putString((String) col.getSymbolValue(i));
+ buffer.putString(col.getSymbolValue(i));
}
for (int i = 0; i < count; i++) {
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java
index 552d961f..f4b13733 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketEncoder.java
@@ -126,6 +126,14 @@ public boolean isGorillaEnabled() {
return (flags & FLAG_GORILLA) != 0;
}
+ public void setDeferCommit(boolean defer) {
+ if (defer) {
+ flags |= FLAG_DEFER_COMMIT;
+ } else {
+ flags &= ~FLAG_DEFER_COMMIT;
+ }
+ }
+
public void setGorillaEnabled(boolean enabled) {
if (enabled) {
flags |= FLAG_GORILLA;
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
index e459b066..7d43d72f 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java
@@ -193,6 +193,7 @@ public class QwpWebSocketSender implements Sender {
// that walks the ring and sends frames.
private CursorSendEngine cursorEngine;
private CursorWebSocketSendLoop cursorSendLoop;
+ private boolean deferCommit;
// Orphan-slot drainer pool. Non-null only when the builder requested
// drain_orphans=true AND we have a slot path to scan against. Closed
// alongside the cursor send loop in close().
@@ -219,6 +220,7 @@ public class QwpWebSocketSender implements Sender {
private int errorInboxCapacity = SenderErrorDispatcher.DEFAULT_CAPACITY;
private long firstPendingRowTimeNanos;
private boolean gorillaEnabled = true;
+ private boolean hasDeferredMessages;
// Stickys true once any successful connect has happened. Drives the
// CONNECTED-vs-RECONNECTED-vs-FAILED_OVER classification at the success
// point in buildAndConnect.
@@ -260,6 +262,10 @@ public class QwpWebSocketSender implements Sender {
// beginRound(true) call. roundSeq=1 is the first round; CONNECTED in the
// first round indicates the initial connect.
private long roundSeq;
+ // When true, auto-flush sends messages with FLAG_DEFER_COMMIT and only
+ // explicit flush() triggers the server-side commit. Enables accumulating
+ // arbitrarily large datasets that exceed the server's recv buffer.
+ private boolean transactional;
// Server-advertised hard cap on QWP ingest payload bytes, captured from
// X-QWP-Max-Batch-Size on each successful handshake. 0 when the server
// did not advertise the header (older builds); the sender then falls back
@@ -974,7 +980,10 @@ public void close() {
// rows -> mmap'd / malloc'd ring). After this, the
// cursor engine's publishedFsn reflects the final
// target the I/O loop must drive ackedFsn up to.
- flushPendingRows();
+ flushPendingRows(deferCommit);
+ if (!deferCommit && hasDeferredMessages) {
+ sendCommitMessage();
+ }
if (activeBuffer != null && activeBuffer.hasData()) {
sealAndSwapBuffer();
}
@@ -1370,7 +1379,10 @@ public long flushAndGetSequence() {
// sealAndSwapBuffer, so by the time we reach here every encoded
// batch is durable on its mmap'd segment. No processingCount to
// drain, no awaitPendingAcks. Just surface any I/O thread error.
- flushPendingRows();
+ flushPendingRows(deferCommit);
+ if (!deferCommit && hasDeferredMessages) {
+ sendCommitMessage();
+ }
if (activeBuffer != null && activeBuffer.hasData()) {
sealAndSwapBuffer();
}
@@ -1954,17 +1966,15 @@ public void reset() {
* publishing or reconnect. See {@link SenderConnectionListener} for the
* full delivery contract.
*/
- /**
- * Forces the {@code connected} flag without going through the real
- * connect handshake. Lets unit tests exercise post-connect code paths
- * (auto-flush bookkeeping, batch-size guards, ack tracking) on a
- * sender that never opened a socket. Never call from production code.
- */
@TestOnly
public void setConnectedForTest(boolean connected) {
this.connected = connected;
}
+ public void setDeferCommit(boolean enabled) {
+ this.deferCommit = enabled;
+ }
+
public void setConnectionListener(SenderConnectionListener listener) {
SenderConnectionListener effective = listener != null ? listener : DefaultSenderConnectionListener.INSTANCE;
this.connectionListener = effective;
@@ -2039,6 +2049,10 @@ public void setGorillaEnabled(boolean enabled) {
this.encoder.setGorillaEnabled(enabled);
}
+ public void setTransactional(boolean transactional) {
+ this.transactional = transactional;
+ }
+
/**
* Register an async observer for ack-watermark advances. May be called
* either before or after {@code connect()} — post-connect changes
@@ -2858,9 +2872,18 @@ private void ensureNoInProgressRow() {
/**
* Flushes pending rows by encoding and sending them.
- * All non-empty tables are encoded into a single QWP v1 message and sent as one WebSocket frame.
+ * When all tables fit in a single message, the encoder produces one
+ * WebSocket frame. When the encoded size exceeds {@code serverMaxBatchSize},
+ * the method splits tables across multiple messages using
+ * {@code FLAG_DEFER_COMMIT}: all but the last message carry the flag so
+ * the server appends rows without committing, and the final message
+ * triggers the commit.
+ *
+ * @param deferCommit when true, the message carries FLAG_DEFER_COMMIT
+ * so the server appends rows but does not commit.
+ * Used by auto-flush in transactional mode.
*/
- private void flushPendingRows() {
+ private void flushPendingRows(boolean deferCommit) {
if (pendingRowCount <= 0) {
return;
}
@@ -2880,7 +2903,7 @@ private void flushPendingRows() {
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Flushing pending rows [count={}, tables={}]", pendingRowCount, tableCount);
+ LOG.debug("Flushing pending rows [count={}, tables={}, defer={}]", pendingRowCount, tableCount, deferCommit);
}
ensureActiveBufferReady();
@@ -2894,6 +2917,7 @@ private void flushPendingRows() {
// self-sufficient frames there's no encode-vs-reconnect race
// to defend against: the bytes are valid against any server.
int batchMaxSchemaId = maxSentSchemaId;
+ encoder.setDeferCommit(deferCommit);
encoder.beginMessage(tableCount, globalSymbolDictionary,
/*confirmedMaxId=*/ -1, currentBatchMaxSymbolId);
for (int i = 0, n = keys.size(); i < n; i++) {
@@ -2925,39 +2949,9 @@ private void flushPendingRows() {
int messageSize = encoder.finishMessage();
QwpBufferWriter buffer = encoder.getBuffer();
- // Defensive flush-time cap check: the per-row guard in sendRow()
- // catches individual oversize rows, but schema and dict-delta
- // bytes the encoder adds at message-build time can push a batch
- // of legitimately-sized rows above the wire cap. Without this
- // check the frame would be sent and the server would close the
- // connection with ws-close[1009 Message Too Big], surfacing the
- // failure asynchronously on a later op rather than from this
- // flush() call. Reset all pending state so the sender stays
- // usable -- the caller's pending rows are dropped; they must
- // re-batch with fewer rows per flush. close()'s drain path
- // also relies on this being a clean reset to avoid re-throwing.
if (serverMaxBatchSize > 0 && messageSize > serverMaxBatchSize) {
- int droppedRows = pendingRowCount;
- for (int i = 0, n = keys.size(); i < n; i++) {
- CharSequence tableName = keys.getQuick(i);
- if (tableName == null) {
- continue;
- }
- QwpTableBuffer tableBuffer = tableBuffers.get(tableName);
- if (tableBuffer == null || tableBuffer.getRowCount() == 0) {
- continue;
- }
- tableBuffer.reset();
- }
- currentBatchMaxSymbolId = -1;
- pendingBytes = 0;
- currentTableBufferSnapshotBytes = 0;
- pendingRowCount = 0;
- firstPendingRowTimeNanos = 0;
- throw new LineSenderException("batch too large for server batch cap")
- .put(" [messageSize=").put(messageSize)
- .put(", serverMaxBatchSize=").put(serverMaxBatchSize)
- .put(", droppedRows=").put(droppedRows).put(']');
+ flushPendingRowsSplit(keys, batchMaxSchemaId, deferCommit);
+ return;
}
activeBuffer.ensureCapacity(messageSize);
@@ -2965,11 +2959,89 @@ private void flushPendingRows() {
activeBuffer.incrementRowCount();
sealAndSwapBuffer();
+ hasDeferredMessages = deferCommit;
+
// Update sent state only after successful enqueue.
// If sealAndSwapBuffer() threw, these remain unchanged so the
// next batch's delta dictionary will correctly re-include the
// symbols and schema that the server never received.
maxSentSchemaId = batchMaxSchemaId;
+ resetTableBuffersAfterFlush(keys);
+ }
+
+ /**
+ * Splitting path: the full batch exceeds serverMaxBatchSize, so
+ * flushPendingRows() delegates here. Each non-empty table gets its
+ * own message. All messages except the last carry FLAG_DEFER_COMMIT
+ * so the server appends rows without committing until the final
+ * message arrives.
+ *
+ * @param deferCommit when true, ALL messages (including the last)
+ * carry FLAG_DEFER_COMMIT. When false, only the
+ * last message omits the flag.
+ */
+ private void flushPendingRowsSplit(ObjList keys, int batchMaxSchemaId, boolean deferCommit) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Splitting flush across multiple messages [serverMaxBatchSize={}, defer={}]", serverMaxBatchSize, deferCommit);
+ }
+
+ // Collect non-empty table indices so we know which is last.
+ int nonEmptyCount = 0;
+ for (int i = 0, n = keys.size(); i < n; i++) {
+ CharSequence tableName = keys.getQuick(i);
+ if (tableName == null) {
+ continue;
+ }
+ QwpTableBuffer tableBuffer = tableBuffers.get(tableName);
+ if (tableBuffer != null && tableBuffer.getRowCount() > 0) {
+ nonEmptyCount++;
+ }
+ }
+
+ int sent = 0;
+ for (int i = 0, n = keys.size(); i < n; i++) {
+ CharSequence tableName = keys.getQuick(i);
+ if (tableName == null) {
+ continue;
+ }
+ QwpTableBuffer tableBuffer = tableBuffers.get(tableName);
+ if (tableBuffer == null || tableBuffer.getRowCount() == 0) {
+ continue;
+ }
+
+ sent++;
+ boolean isLast = (sent == nonEmptyCount);
+ boolean deferThis = deferCommit || !isLast;
+
+ encoder.setDeferCommit(deferThis);
+ encoder.beginMessage(1, globalSymbolDictionary,
+ /*confirmedMaxId=*/ -1, currentBatchMaxSymbolId);
+ encoder.addTable(tableBuffer, /*useSchemaRef=*/ false);
+ int messageSize = encoder.finishMessage();
+ QwpBufferWriter buffer = encoder.getBuffer();
+
+ if (messageSize > serverMaxBatchSize) {
+ resetTableBuffersAfterFlush(keys);
+ throw new LineSenderException("single table batch too large for server batch cap")
+ .put(" [table=").put(tableName)
+ .put(", messageSize=").put(messageSize)
+ .put(", serverMaxBatchSize=").put(serverMaxBatchSize).put(']');
+ }
+
+ ensureActiveBufferReady();
+ activeBuffer.ensureCapacity(messageSize);
+ activeBuffer.write(buffer.getBufferPtr(), messageSize);
+ activeBuffer.incrementRowCount();
+ sealAndSwapBuffer();
+ }
+
+ encoder.setDeferCommit(false);
+ hasDeferredMessages = deferCommit;
+ maxSentSchemaId = batchMaxSchemaId;
+ resetTableBuffersAfterFlush(keys);
+ }
+
+ private void resetTableBuffersAfterFlush(ObjList keys) {
for (int i = 0, n = keys.size(); i < n; i++) {
CharSequence tableName = keys.getQuick(i);
if (tableName == null) {
@@ -2982,14 +3054,33 @@ private void flushPendingRows() {
tableBuffer.reset();
}
currentBatchMaxSymbolId = -1;
-
- // Reset pending count
pendingBytes = 0;
currentTableBufferSnapshotBytes = 0;
pendingRowCount = 0;
firstPendingRowTimeNanos = 0;
}
+ /**
+ * Sends an empty QWP message without FLAG_DEFER_COMMIT to trigger
+ * the server-side commit of all previously deferred rows.
+ */
+ private void sendCommitMessage() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending commit message for deferred batch");
+ }
+ encoder.setDeferCommit(false);
+ encoder.beginMessage(0, globalSymbolDictionary,
+ /*confirmedMaxId=*/ -1, currentBatchMaxSymbolId);
+ int messageSize = encoder.finishMessage();
+ QwpBufferWriter buffer = encoder.getBuffer();
+ ensureActiveBufferReady();
+ activeBuffer.ensureCapacity(messageSize);
+ activeBuffer.write(buffer.getBufferPtr(), messageSize);
+ activeBuffer.incrementRowCount();
+ sealAndSwapBuffer();
+ hasDeferredMessages = false;
+ }
+
private void resetSchemaStateForNewConnection() {
maxSentSchemaId = -1;
nextSchemaId = 0;
@@ -3134,7 +3225,7 @@ private void sendRow() {
currentTableBufferSnapshotBytes = bufferedNow;
if (shouldAutoFlush()) {
- flushPendingRows();
+ flushPendingRows(transactional);
}
}
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/SegmentedNativeBufferWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/SegmentedNativeBufferWriter.java
index 458908f8..6ef10a4e 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/SegmentedNativeBufferWriter.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/SegmentedNativeBufferWriter.java
@@ -93,6 +93,14 @@ public int getWritableBytes() {
return currentChunk.getWritableBytes();
}
+ @Override
+ public void patchByte(int offset, byte value) {
+ if (offset < flushedBytes || offset + 1 > flushedBytes + currentChunk.getPosition()) {
+ throw new UnsupportedOperationException("cannot patch flushed segment data");
+ }
+ currentChunk.patchByte((int) (offset - flushedBytes), value);
+ }
+
@Override
public void patchInt(int offset, int value) {
if (offset < flushedBytes || offset + Integer.BYTES > flushedBytes + currentChunk.getPosition()) {
diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java b/core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java
index c23b8045..44e3305a 100644
--- a/core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java
+++ b/core/src/main/java/io/questdb/client/cutlass/qwp/protocol/QwpConstants.java
@@ -36,6 +36,11 @@ public final class QwpConstants {
* Client identifier sent in the X-QWP-Client-Id upgrade header.
*/
public static final String CLIENT_ID = "java/1.0.2";
+ /**
+ * Flag bit: defer WAL commit. The server appends rows to WAL writers
+ * but skips the commit until a subsequent message without this flag.
+ */
+ public static final byte FLAG_DEFER_COMMIT = 0x01;
/**
* Flag bit: Delta symbol dictionary encoding enabled.
* When set, symbol columns use global IDs and send only new dictionary entries.
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java
index df989d66..4a32b1f2 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CleanShutdownNoReplayTest.java
@@ -83,7 +83,7 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio
+ ";sf_dir=" + sfDir + ";";
try (Sender sender = Sender.fromConfig(cfg1)) {
for (int i = 0; i < 5; i++) {
- sender.table("foo").longColumn("v", (long) i).atNow();
+ sender.table("foo").longColumn("v", i).atNow();
sender.flush();
}
// Wait until the server has ACK'd everything we sent. The
@@ -113,7 +113,7 @@ public void testFullyAckedActiveDoesNotReplayAfterCleanRestart() throws Exceptio
String cfg2 = "ws::addr=localhost:" + port2
+ ";sf_dir=" + sfDir + ";";
- try (Sender sender = Sender.fromConfig(cfg2)) {
+ try (Sender ignored = Sender.fromConfig(cfg2)) {
// No new appends — purely observe whether recovery replays
// anything. Give the I/O loop ample room to push any
// replayed bytes onto the wire.
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java
index d836b9c0..d4a77713 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/CloseDrainTest.java
@@ -154,8 +154,7 @@ public void testCloseDrainTimesOutWhenAcksNeverArrive() throws Exception {
String cfg = "ws::addr=localhost:" + port
+ ";close_flush_timeout_millis=" + timeoutMs + ";";
long elapsedMs;
- Sender sender = Sender.fromConfig(cfg);
- try {
+ try (Sender sender = Sender.fromConfig(cfg)) {
sender.table("foo").longColumn("v", 1L).atNow();
sender.flush();
long t0 = System.nanoTime();
@@ -167,9 +166,8 @@ public void testCloseDrainTimesOutWhenAcksNeverArrive() throws Exception {
e.getMessage().contains("drain timed out"));
}
elapsedMs = (System.nanoTime() - t0) / 1_000_000;
- } finally {
- sender.close(); // idempotent — closed flag is set on first call
}
+ // idempotent — closed flag is set on first call
Assert.assertTrue("close() returned too early: " + elapsedMs + "ms",
elapsedMs >= timeoutMs);
Assert.assertTrue("close() exceeded the bounded timeout by too much: " + elapsedMs + "ms",
@@ -321,7 +319,7 @@ public void testAsyncCloseDrainSucceedsWhenServerWasUpAllAlong() throws Exceptio
+ ";close_flush_timeout_millis=3000;";
long t0 = System.nanoTime();
try (Sender sender = Sender.fromConfig(cfg)) {
- sender.table("foo").longColumn("v", (long) i).atNow();
+ sender.table("foo").longColumn("v", i).atNow();
sender.flush();
}
long elapsedMs = (System.nanoTime() - t0) / 1_000_000;
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java
index 90362762..98bbbdeb 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpUdpSenderTest.java
@@ -1920,7 +1920,7 @@ public void testNullArgsAfterCloseAllThrowSenderClosed() throws Exception {
assertClosedUdp(() -> sender.doubleArray("x", (double[][]) null));
assertClosedUdp(() -> sender.doubleArray("x", (double[][][]) null));
assertClosedUdp(() -> sender.doubleArray("x", (DoubleArray) null));
- assertClosedUdp(() -> sender.geoHashColumn("x", (CharSequence) null));
+ assertClosedUdp(() -> sender.geoHashColumn("x", null));
assertClosedUdp(() -> sender.longArray("x", (long[]) null));
assertClosedUdp(() -> sender.longArray("x", (long[][]) null));
assertClosedUdp(() -> sender.longArray("x", (long[][][]) null));
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java
index 2a602cee..2d61ad9d 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java
@@ -344,10 +344,9 @@ public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exc
// deadline guarantees every appendBlocking() call trips the
// backpressure deadline and throws.
CursorSendEngine engine = new CursorSendEngine(null, 33, 33, 1L);
- QwpWebSocketSender sender = QwpWebSocketSender.connect(
+ try (QwpWebSocketSender sender = QwpWebSocketSender.connect(
"localhost", port, null, Integer.MAX_VALUE, 0, 0L, null,
- QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, false, engine, 0L);
- try {
+ QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, false, engine, 0L)) {
sender.table("t").longColumn("v", 1L).atNow();
try {
@@ -364,17 +363,13 @@ public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exc
+ MicrobatchBuffer.stateName(buffer0.getState())
+ ", buffer1=" + MicrobatchBuffer.stateName(buffer1.getState()) + "]",
buffer0.isInUse() || buffer1.isInUse());
- } finally {
- // close() drains pending rows, which appendBlocking still
- // rejects because the engine is permanently wedged in this
- // test. The bug under test is about microbatch buffer
- // state, not about close() being lenient toward residual
- // unflushed rows — swallow the predictable rethrow here.
- try {
- sender.close();
- } catch (LineSenderException ignored) {
- }
+ } catch (LineSenderException ignored) {
}
+ // close() drains pending rows, which appendBlocking still
+ // rejects because the engine is permanently wedged in this
+ // test. The bug under test is about microbatch buffer
+ // state, not about close() being lenient toward residual
+ // unflushed rows — swallow the predictable rethrow here.
}
});
}
@@ -428,7 +423,7 @@ public void testIpv4ColumnStringNullReturnsThis() throws Exception {
assertMemoryLeak(() -> {
try (QwpWebSocketSender sender = createUnconnectedSender()) {
sender.table("t");
- Assert.assertSame(sender, sender.ipv4Column("addr", (CharSequence) null));
+ Assert.assertSame(sender, sender.ipv4Column("addr", null));
}
});
}
@@ -533,8 +528,8 @@ public void testNullArgsAfterCloseAllThrowSenderClosed() throws Exception {
assertClosed(() -> sender.doubleArray("x", (double[][]) null));
assertClosed(() -> sender.doubleArray("x", (double[][][]) null));
assertClosed(() -> sender.doubleArray("x", (DoubleArray) null));
- assertClosed(() -> sender.geoHashColumn("x", (CharSequence) null));
- assertClosed(() -> sender.ipv4Column("x", (CharSequence) null));
+ assertClosed(() -> sender.geoHashColumn("x", null));
+ assertClosed(() -> sender.ipv4Column("x", null));
assertClosed(() -> sender.longArray("x", (long[]) null));
assertClosed(() -> sender.longArray("x", (long[][]) null));
assertClosed(() -> sender.longArray("x", (long[][][]) null));
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java
index 56c8b4ca..d94926b8 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/RecoveryReplayTest.java
@@ -25,6 +25,7 @@
package io.questdb.client.test.cutlass.qwp.client;
import io.questdb.client.Sender;
+import io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment;
import io.questdb.client.std.Files;
import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer;
import org.junit.After;
@@ -89,7 +90,7 @@ public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exceptio
+ ";close_flush_timeout_millis=0;";
try (Sender s1 = Sender.fromConfig(cfg1)) {
for (int i = 0; i < 50; i++) {
- s1.table("foo").stringColumn("p", pad).longColumn("v", (long) i).atNow();
+ s1.table("foo").stringColumn("p", pad).longColumn("v", i).atNow();
s1.flush();
}
}
@@ -118,7 +119,7 @@ public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exceptio
String cfg2 = "ws::addr=localhost:" + port2
+ ";sf_dir=" + sfDir + ";";
- try (Sender s2 = Sender.fromConfig(cfg2)) {
+ try (Sender ignored = Sender.fromConfig(cfg2)) {
// No new appends — purely replay.
long deadline = System.currentTimeMillis() + 5_000;
while (System.currentTimeMillis() < deadline
@@ -137,24 +138,6 @@ public void testRestartReplaysSealedSegmentsAgainstFreshServer() throws Exceptio
}
}
- private static int countSegmentFiles(String dir) {
- if (!Files.exists(dir)) return 0;
- long find = Files.findFirst(dir);
- if (find <= 0) return 0;
- int n = 0;
- try {
- int rc = 1;
- while (rc > 0) {
- String name = Files.utf8ToString(Files.findName(find));
- if (name != null && name.endsWith(".sfa")) n++;
- rc = Files.findNext(find);
- }
- } finally {
- Files.findClose(find);
- }
- return n;
- }
-
/**
* Counts only segment files that actually carry frames — opens each
* .sfa via the cursor's MmapSegment recovery path and excludes the
@@ -173,13 +156,8 @@ private static int countPopulatedSegmentFiles(String dir) {
String name = Files.utf8ToString(Files.findName(find));
if (name != null && name.endsWith(".sfa")) {
try {
- io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment seg =
- io.questdb.client.cutlass.qwp.client.sf.cursor.MmapSegment
- .openExisting(dir + "/" + name);
- try {
+ try (MmapSegment seg = MmapSegment.openExisting(dir + "/" + name)) {
if (seg.frameCount() > 0) n++;
- } finally {
- seg.close();
}
} catch (Throwable ignored) {
// best-effort
@@ -194,9 +172,7 @@ private static int countPopulatedSegmentFiles(String dir) {
}
private static String repeat(String c, int n) {
- StringBuilder sb = new StringBuilder(n);
- for (int i = 0; i < n; i++) sb.append(c);
- return sb.toString();
+ return String.valueOf(c).repeat(Math.max(0, n));
}
private static void rmDirRec(String dir) {
@@ -220,15 +196,9 @@ private static void rmDirRec(String dir) {
Files.remove(dir);
}
- /** Receives binary frames but never acks. Sender drops them on close. */
- private static class SilentHandler implements TestWebSocketServer.WebSocketServerHandler {
- @Override
- public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
- // intentionally empty
- }
- }
-
- /** Acks every binary frame and tracks distinct payloads. */
+ /**
+ * Acks every binary frame and tracks distinct payloads.
+ */
private static class AckHandler implements TestWebSocketServer.WebSocketServerHandler {
// Distinct *payload bytes* — each row carries a unique long value
// so every frame's bytes differ. Counts unique frames received,
@@ -256,4 +226,14 @@ static byte[] buildAck(long seq) {
return buf;
}
}
+
+ /**
+ * Receives binary frames but never acks. Sender drops them on close.
+ */
+ private static class SilentHandler implements TestWebSocketServer.WebSocketServerHandler {
+ @Override
+ public void onBinaryMessage(TestWebSocketServer.ClientHandler client, byte[] data) {
+ // intentionally empty
+ }
+ }
}
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java
index aa3b30b6..9cd50443 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/BackgroundDrainerEndToEndTest.java
@@ -85,7 +85,7 @@ public void testDrainerEmptiesOrphanSlotAgainstAckServer() throws Exception {
+ ";close_flush_timeout_millis=0;";
try (Sender g = Sender.fromConfig(cfg1)) {
for (int i = 0; i < 30; i++) {
- g.table("foo").longColumn("v", (long) i).atNow();
+ g.table("foo").longColumn("v", i).atNow();
g.flush();
}
}
@@ -106,7 +106,7 @@ public void testDrainerEmptiesOrphanSlotAgainstAckServer() throws Exception {
+ ";sender_id=primary"
+ ";drain_orphans=true"
+ ";max_background_drainers=2;";
- try (Sender foreground = Sender.fromConfig(cfg2)) {
+ try (Sender ignored = Sender.fromConfig(cfg2)) {
// Drainer runs in the background. Wait for the ghost slot
// to drain through. 30 distinct rows expected at the ack
// server (drainer's contribution; the foreground sender
@@ -197,24 +197,6 @@ public void testDrainerLeavesFailedSentinelOnTerminalError() throws Exception {
});
}
- private static int countSegmentFiles(String dir) {
- if (!Files.exists(dir)) return 0;
- long find = Files.findFirst(dir);
- if (find <= 0) return 0;
- int n = 0;
- try {
- int rc = 1;
- while (rc > 0) {
- String name = Files.utf8ToString(Files.findName(find));
- if (name != null && name.endsWith(".sfa")) n++;
- rc = Files.findNext(find);
- }
- } finally {
- Files.findClose(find);
- }
- return n;
- }
-
private static void rmDirRec(String dir) {
if (!Files.exists(dir)) return;
long find = Files.findFirst(dir);
diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java
index 0236f828..bce1e191 100644
--- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java
+++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/SfFromConfigTest.java
@@ -106,7 +106,7 @@ public void testSfMaxBytesParsing() throws Exception {
try (Sender sender = Sender.fromConfig(config)) {
// Write enough data that segments rotate at ~128 KiB boundary.
for (int i = 0; i < 50; i++) {
- sender.table("foo").longColumn("v", (long) i).atNow();
+ sender.table("foo").longColumn("v", i).atNow();
}
sender.flush();
}
diff --git a/design/qwp-cursor-error-api.md b/design/qwp-cursor-error-api.md
index b8371d4c..eae99bc0 100644
--- a/design/qwp-cursor-error-api.md
+++ b/design/qwp-cursor-error-api.md
@@ -82,16 +82,17 @@ User overrides via builder (`errorPolicy(Category, Policy)` or full `errorPolicy
## `SenderError` (public, immutable)
```java
-public final class SenderError {
- public final Category category;
- public final Policy appliedPolicy; // what the loop actually did
- public final int serverStatusByte; // raw byte (0x03/0x05/...); -1 for PROTOCOL_VIOLATION
- public final String serverMessage; // ≤1024 UTF-8 from frame, or WS close reason
- public final long messageSequence; // server's per-frame seq (mirrors what server logs); -1 for PROTOCOL_VIOLATION
- public final long fromFsn; // client-side FSN span — load-bearing for correlation
- public final long toFsn; // inclusive
- public final String tableName; // best-effort; null if multi-table batch
- public final long detectedAtNanos; // System.nanoTime() at I/O thread receipt
+/**
+ * @param appliedPolicy what the loop actually did
+ * @param serverStatusByte raw byte (0x03/0x05/...); -1 for PROTOCOL_VIOLATION
+ * @param serverMessage ≤1024 UTF-8 from frame, or WS close reason
+ * @param messageSequence server's per-frame seq (mirrors what server logs); -1 for PROTOCOL_VIOLATION
+ * @param fromFsn client-side FSN span — load-bearing for correlation
+ * @param toFsn inclusive
+ * @param tableName best-effort; null if multi-table batch
+ * @param detectedAtNanos System.nanoTime() at I/O thread receipt */
+public record SenderError(Category category, Policy appliedPolicy, int serverStatusByte, String serverMessage,
+ long messageSequence, long fromFsn, long toFsn, String tableName, long detectedAtNanos) {
// accessors only; no mutation
}
```