From be79e84a3c242dae955071f0c39177fd04bfe995 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 21 Apr 2026 15:23:47 +0200 Subject: [PATCH 1/6] fix(ilp): report actionable QWP websocket frame-size errors Report oversized WebSocket frames with the received frame size, configured max size, and guidance to decrease batch size. Also propagate terminal WebSocket sender failures at Sender level so async ACK errors surface promptly. --- .../qwp/client/QwpWebSocketSender.java | 67 +++++++++++++------ .../qwp/client/WebSocketSendQueue.java | 61 ++++++++++++----- .../qwp/client/AsyncModeIntegrationTest.java | 12 ++-- .../client/QwpWebSocketSenderStateTest.java | 41 ++++++++++++ .../qwp/client/WebSocketSendQueueTest.java | 61 +++++++++++++++++ 5 files changed, 199 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index cc4f8d72..dff1c4b8 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -51,6 +51,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; /** * QWP v1 WebSocket client sender for streaming data to QuestDB. @@ -112,6 +113,7 @@ public class QwpWebSocketSender implements Sender { private final String host; // Flow control configuration private final int inFlightWindowSize; + private final AtomicReference connectionError = new AtomicReference<>(); private final int maxSchemasPerConnection; private final int port; private final CharSequenceObjHashMap tableBuffers; @@ -482,7 +484,7 @@ public void close() { // Flush any remaining data try { - if (inFlightWindowSize > 1) { + if (connectionError.get() == null && inFlightWindowSize > 1) { // Async mode (window > 1): flush accumulated rows in table buffers first flushPendingRows(); @@ -496,7 +498,7 @@ public void close() { } else if (inFlightWindow != null) { inFlightWindow.awaitEmpty(); } - } else { + } else if (connectionError.get() == null) { // Sync mode (window=1): flush pending rows synchronously if (pendingRowCount > 0 && client != null && client.isConnected()) { flushSync(); @@ -744,10 +746,21 @@ public void flush() { } // Wait for all pending batches to be sent to the server - sendQueue.flush(); + try { + sendQueue.flush(); + } catch (LineSenderException e) { + checkConnectionError(); + throw e; + } // Wait for all in-flight batches to be acknowledged by the server - sendQueue.awaitPendingAcks(); + try { + sendQueue.awaitPendingAcks(); + } catch (LineSenderException e) { + checkConnectionError(); + throw e; + } + checkConnectionError(); if (LOG.isDebugEnabled()) { LOG.debug("Flush complete [totalBatches={}, totalBytes={}, totalAcked={}]", sendQueue.getTotalBatchesSent(), sendQueue.getTotalBytesSent(), inFlightWindow.getTotalAcked()); @@ -1158,6 +1171,14 @@ private void checkNotClosed() { if (closed) { throw new LineSenderException("Sender is closed"); } + checkConnectionError(); + } + + private void checkConnectionError() { + LineSenderException error = connectionError.get(); + if (error != null) { + throw error; + } } private void checkTableSelected() { @@ -1200,9 +1221,7 @@ private void ensureActiveBufferReady() { } private void ensureConnected() { - if (closed) { - throw new LineSenderException("Sender is closed"); - } + checkNotClosed(); if (!connected) { // Create WebSocket client using factory (zero-GC native implementation) if (tlsConfig != null) { @@ -1232,7 +1251,8 @@ private void ensureConnected() { try { sendQueue = new WebSocketSendQueue(client, inFlightWindow, WebSocketSendQueue.DEFAULT_ENQUEUE_TIMEOUT_MS, - WebSocketSendQueue.DEFAULT_SHUTDOWN_TIMEOUT_MS); + WebSocketSendQueue.DEFAULT_SHUTDOWN_TIMEOUT_MS, + this::recordConnectionFailure); } catch (Throwable t) { inFlightWindow = null; client.close(); @@ -1248,6 +1268,7 @@ private void ensureConnected() { // Server starts fresh on each connection, so any sender-local schema // IDs retained from a prior connection must be discarded as well. resetSchemaStateForNewConnection(); + connectionError.set(null); connected = true; LOG.info("Connected to WebSocket [host={}, port={}, windowSize={}, qwpVersion={}]", @@ -1264,12 +1285,16 @@ private void ensureNoInProgressRow() { } } - private void failExpectedIfNeeded(long expectedSequence, LineSenderException error) { - if (inFlightWindow != null && inFlightWindow.getLastError() == null) { - inFlightWindow.fail(expectedSequence, error); + private void failConnectionIfNeeded(LineSenderException error) { + if (recordConnectionFailure(error) && inFlightWindow != null) { + inFlightWindow.failAll(error); } } + private boolean recordConnectionFailure(LineSenderException error) { + return connectionError.compareAndSet(null, error); + } + /** * Flushes pending rows by encoding and sending them. * All non-empty tables are encoded into a single QWP v1 message and sent as one WebSocket frame. @@ -1451,6 +1476,7 @@ private void flushSync() { // Track batch in InFlightWindow before sending long batchSequence = nextBatchSequence++; + checkConnectionError(); inFlightWindow.addInFlight(batchSequence); if (LOG.isDebugEnabled()) { @@ -1462,11 +1488,11 @@ private void flushSync() { try { client.sendBinary(buffer.getBufferPtr(), messageSize); } catch (LineSenderException e) { - failExpectedIfNeeded(batchSequence, e); + failConnectionIfNeeded(e); throw e; } catch (Throwable t) { LineSenderException error = new LineSenderException("Failed to send batch " + batchSequence, t); - failExpectedIfNeeded(batchSequence, error); + failConnectionIfNeeded(error); throw error; } @@ -1583,6 +1609,7 @@ private void sealAndSwapBuffer() { if (toSend.isSealed()) { toSend.rollbackSealForRetry(); } + checkConnectionError(); throw e; } } @@ -1701,24 +1728,22 @@ private void waitForAck(long expectedSequence) { LineSenderException error = new LineSenderException( "Server error for batch " + sequence + ": " + ackResponse.getStatusName() + " - " + errorMessage); - inFlightWindow.fail(sequence, error); - if (sequence == expectedSequence) { - throw error; - } + failConnectionIfNeeded(error); + throw error; } } } catch (LineSenderException e) { - failExpectedIfNeeded(expectedSequence, e); + failConnectionIfNeeded(e); throw e; } catch (Exception e) { LineSenderException wrapped = new LineSenderException("Error waiting for ACK: " + e.getMessage(), e); - failExpectedIfNeeded(expectedSequence, wrapped); + failConnectionIfNeeded(wrapped); throw wrapped; } } LineSenderException timeout = new LineSenderException("Timeout waiting for ACK for batch " + expectedSequence); - failExpectedIfNeeded(expectedSequence, timeout); + failConnectionIfNeeded(timeout); throw timeout; } @@ -1744,7 +1769,7 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) { @Override public void onClose(int code, String reason) { - throw new LineSenderException("WebSocket closed while waiting for ACK: " + reason); + throw new LineSenderException("WebSocket closed while waiting for ACK [code=" + code + ", reason=" + reason + ']'); } } } diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java index 05bf3315..181602b8 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketSendQueue.java @@ -73,6 +73,8 @@ public class WebSocketSendQueue implements QuietCloseable { private final WebSocketClient client; // Configuration private final long enqueueTimeoutMs; + @Nullable + private final ConnectionFailureListener connectionFailureListener; // Optional InFlightWindow for tracking sent batches awaiting ACK @Nullable private final InFlightWindow inFlightWindow; @@ -121,6 +123,21 @@ public class WebSocketSendQueue implements QuietCloseable { */ public WebSocketSendQueue(WebSocketClient client, @Nullable InFlightWindow inFlightWindow, long enqueueTimeoutMs, long shutdownTimeoutMs) { + this(client, inFlightWindow, enqueueTimeoutMs, shutdownTimeoutMs, null); + } + + /** + * Creates a new send queue with custom configuration. + * + * @param client the WebSocket client for I/O + * @param inFlightWindow the window to track sent batches awaiting ACK (may be null) + * @param enqueueTimeoutMs timeout for enqueue operations (ms) + * @param shutdownTimeoutMs timeout for graceful shutdown (ms) + * @param connectionFailureListener notified once when the queue detects a terminal connection failure + */ + public WebSocketSendQueue(WebSocketClient client, @Nullable InFlightWindow inFlightWindow, + long enqueueTimeoutMs, long shutdownTimeoutMs, + @Nullable ConnectionFailureListener connectionFailureListener) { if (client == null) { throw new IllegalArgumentException("client cannot be null"); } @@ -129,6 +146,7 @@ public WebSocketSendQueue(WebSocketClient client, @Nullable InFlightWindow inFli this.inFlightWindow = inFlightWindow; this.enqueueTimeoutMs = enqueueTimeoutMs; this.shutdownTimeoutMs = shutdownTimeoutMs; + this.connectionFailureListener = connectionFailureListener; this.running = true; this.shuttingDown = false; this.shutdownLatch = new CountDownLatch(1); @@ -226,20 +244,20 @@ public boolean enqueue(MicrobatchBuffer buffer) { throw new LineSenderException("Buffer must be sealed before enqueue, state=" + MicrobatchBuffer.stateName(buffer.getState())); } + checkError(); if (!running || shuttingDown) { + checkError(); throw new LineSenderException("Send queue is not running"); } - // Check for errors from I/O thread - checkError(); - final long deadline = System.currentTimeMillis() + enqueueTimeoutMs; synchronized (processingLock) { while (true) { + checkError(); if (!running || shuttingDown) { + checkError(); throw new LineSenderException("Send queue is not running"); } - checkError(); if (offerPending(buffer)) { processingLock.notifyAll(); @@ -368,12 +386,20 @@ private IoState computeState(boolean hasInFlight) { } } - private void failTransport(LineSenderException error) { + private void failConnection(LineSenderException error) { Throwable rootError = lastError; + boolean firstFailure = rootError == null; if (rootError == null) { lastError = error; rootError = error; } + if (firstFailure && connectionFailureListener != null) { + try { + connectionFailureListener.onConnectionFailure(error); + } catch (Throwable t) { + LOG.error("Error notifying connection failure listener", t); + } + } running = false; shuttingDown = true; if (inFlightWindow != null) { @@ -532,7 +558,7 @@ private void safeSendBatch(MicrobatchBuffer batch) { sendBatch(batch); } catch (Throwable t) { LOG.error("Error sending batch [id={}]{}", batch.getBatchId(), "", t); - failTransport(new LineSenderException("Error sending batch " + batch.getBatchId() + ": " + t.getMessage(), t)); + failConnection(new LineSenderException("Error sending batch " + batch.getBatchId() + ": " + t.getMessage(), t)); // Mark as recycled even on error to allow cleanup if (batch.isSealed()) { batch.markSending(); @@ -608,7 +634,7 @@ private boolean tryReceiveAcks() { } catch (Exception e) { if (running) { LOG.error("Error receiving response: {}", e.getMessage()); - failTransport(new LineSenderException("Error receiving response: " + e.getMessage(), e)); + failConnection(new LineSenderException("Error receiving response: " + e.getMessage(), e)); } } return received; @@ -626,6 +652,11 @@ private enum IoState { IDLE, ACTIVE, DRAINING } + @FunctionalInterface + public interface ConnectionFailureListener { + void onConnectionFailure(LineSenderException error); + } + /** * Handler for received WebSocket frames (ACKs from server). */ @@ -638,7 +669,7 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) { "Invalid ACK response payload [length=" + payloadLen + ']' ); LOG.error("Invalid ACK response payload [length={}]", payloadLen); - failTransport(error); + failConnection(error); return; } @@ -646,7 +677,7 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) { if (!response.readFrom(payloadPtr, payloadLen)) { LineSenderException error = new LineSenderException("Failed to parse ACK response"); LOG.error("Failed to parse response"); - failTransport(error); + failConnection(error); return; } @@ -670,20 +701,18 @@ public void onBinaryMessage(long payloadPtr, int payloadLen) { String errorMessage = response.getErrorMessage(); LOG.error("Error response [seq={}, status={}, error={}]", sequence, response.getStatusName(), errorMessage); - if (inFlightWindow != null) { - LineSenderException error = new LineSenderException( - "Server error for batch " + sequence + ": " + - response.getStatusName() + " - " + errorMessage); - inFlightWindow.fail(sequence, error); - } + LineSenderException error = new LineSenderException( + "Server error for batch " + sequence + ": " + + response.getStatusName() + " - " + errorMessage); totalErrors.incrementAndGet(); + failConnection(error); } } @Override public void onClose(int code, String reason) { LOG.info("WebSocket closed by server [code={}, reason={}]", code, reason); - failTransport(new LineSenderException("WebSocket closed by server [code=" + code + ", reason=" + reason + ']')); + failConnection(new LineSenderException("WebSocket closed by server [code=" + code + ", reason=" + reason + ']')); } } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/AsyncModeIntegrationTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/AsyncModeIntegrationTest.java index ad03199b..475595fa 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/AsyncModeIntegrationTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/AsyncModeIntegrationTest.java @@ -461,8 +461,8 @@ public void testHighThroughputWithManyBatches() throws Exception { /** * The server ACKs the first batch but returns a WRITE_ERROR for the - * second. {@link WebSocketSendQueue#flush()} completes (both batches - * were sent) but {@link InFlightWindow#awaitEmpty()} surfaces the error. + * second. The error is treated as a terminal connection failure and is + * surfaced by the next queue operation. */ @Test public void testServerErrorPropagatesOnFlush() throws Exception { @@ -471,6 +471,7 @@ public void testServerErrorPropagatesOnFlush() throws Exception { FakeWebSocketClient client = new FakeWebSocketClient(); AtomicLong highestSent = new AtomicLong(-1); AtomicLong highestDelivered = new AtomicLong(-1); + CountDownLatch errorDelivered = new CountDownLatch(1); client.setSendBehavior((ptr, len) -> highestSent.incrementAndGet()); client.setTryReceiveBehavior(handler -> { @@ -481,6 +482,7 @@ public void testServerErrorPropagatesOnFlush() throws Exception { highestDelivered.set(next); if (next == 1) { emitDiskFullError(handler, next); + errorDelivered.countDown(); } else { emitAck(handler, next); } @@ -506,12 +508,10 @@ public void testServerErrorPropagatesOnFlush() throws Exception { buf1.seal(); queue.enqueue(buf1); - // flush() waits for the queue to drain (both batches sent). - queue.flush(); + assertTrue("Expected server error ACK", errorDelivered.await(2, TimeUnit.SECONDS)); - // awaitEmpty() surfaces the server error for batch 1. try { - window.awaitEmpty(); + queue.flush(); fail("Expected server error to propagate"); } catch (LineSenderException e) { assertTrue("Error should mention server failure", diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java index 66282fe8..a843e4b2 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java @@ -24,6 +24,7 @@ package io.questdb.client.test.cutlass.qwp.client; +import io.questdb.client.cutlass.line.LineSenderException; import io.questdb.client.cutlass.qwp.client.InFlightWindow; import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer; @@ -49,6 +50,40 @@ */ public class QwpWebSocketSenderStateTest extends AbstractTest { + @Test + public void testConnectionFailureIsSenderLevelTerminalState() throws Exception { + assertMemoryLeak(() -> { + QwpWebSocketSender sender = QwpWebSocketSender.createForTesting( + "localhost", 0, 10_000, 0, 0L, 8 + ); + try { + LineSenderException failure = new LineSenderException( + "Server error for batch 7: WRITE_ERROR - disk full" + ); + Assert.assertTrue(invokeRecordConnectionFailure(sender, failure)); + + try { + sender.table("t"); + Assert.fail("Expected sender-level connection failure"); + } catch (LineSenderException e) { + Assert.assertSame(failure, e); + } + + LineSenderException secondFailure = new LineSenderException("second failure"); + Assert.assertFalse(invokeRecordConnectionFailure(sender, secondFailure)); + + try { + sender.flush(); + Assert.fail("Expected original sender-level connection failure"); + } catch (LineSenderException e) { + Assert.assertSame(failure, e); + } + } finally { + sender.close(); + } + }); + } + @Test public void testAutoFlushAccumulatesRowsAcrossAllTables() throws Exception { assertMemoryLeak(() -> { @@ -319,6 +354,12 @@ private static void invokeResetSchemaStateForNewConnection(Object target) throws method.invoke(target); } + private static boolean invokeRecordConnectionFailure(Object target, LineSenderException error) throws Exception { + Method method = target.getClass().getDeclaredMethod("recordConnectionFailure", LineSenderException.class); + method.setAccessible(true); + return (boolean) method.invoke(target, error); + } + private static void setField(Object target, String fieldName, Object value) throws Exception { Field f = target.getClass().getDeclaredField(fieldName); f.setAccessible(true); diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java index 0b769628..19b4753f 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/WebSocketSendQueueTest.java @@ -270,6 +270,55 @@ public void testFlushFailsWhenServerClosesConnection() throws Exception { }); } + @Test + public void testEnqueueAfterServerErrorAckSurfacesServerError() throws Exception { + assertMemoryLeak(() -> { + InFlightWindow window = new InFlightWindow(8, 5_000); + FakeWebSocketClient client = new FakeWebSocketClient(); + WebSocketSendQueue queue = null; + MicrobatchBuffer batch0 = sealedBuffer((byte) 42); + MicrobatchBuffer batch1 = sealedBuffer((byte) 43); + CountDownLatch errorDelivered = new CountDownLatch(1); + AtomicBoolean fired = new AtomicBoolean(false); + AtomicLong highestSent = new AtomicLong(-1); + AtomicReference connectionFailure = new AtomicReference<>(); + + try { + client.setSendBehavior((dataPtr, length) -> highestSent.incrementAndGet()); + client.setTryReceiveBehavior(handler -> { + long sent = highestSent.get(); + if (sent >= 0 && fired.compareAndSet(false, true)) { + emitError(handler, sent, WebSocketResponse.STATUS_WRITE_ERROR, "disk full"); + errorDelivered.countDown(); + return true; + } + return false; + }); + + queue = new WebSocketSendQueue(client, window, 1_000, 500, connectionFailure::set); + queue.enqueue(batch0); + assertTrue("Expected server error ACK callback", errorDelivered.await(2, TimeUnit.SECONDS)); + assertNotNull("Expected connection failure callback", connectionFailure.get()); + assertTrue(connectionFailure.get().getMessage(), connectionFailure.get().getMessage().contains("WRITE_ERROR")); + assertTrue(connectionFailure.get().getMessage(), connectionFailure.get().getMessage().contains("disk full")); + + try { + queue.enqueue(batch1); + fail("Expected enqueue failure after server error ACK"); + } catch (LineSenderException e) { + assertTrue(e.getMessage(), e.getMessage().contains("WRITE_ERROR")); + assertTrue(e.getMessage(), e.getMessage().contains("disk full")); + assertSame(connectionFailure.get(), e.getCause()); + } + } finally { + closeQuietly(queue); + batch0.close(); + batch1.close(); + client.close(); + } + }); + } + @Test public void testAwaitPendingAcksKeepsDrainNonBlocking() throws Exception { assertMemoryLeak(() -> { @@ -390,6 +439,18 @@ private static void emitAck(WebSocketFrameHandler handler, long sequence) { } } + private static void emitError(WebSocketFrameHandler handler, long sequence, byte status, String errorMessage) { + WebSocketResponse response = WebSocketResponse.error(sequence, status, errorMessage); + int size = response.serializedSize(); + long ptr = Unsafe.malloc(size, MemoryTag.NATIVE_DEFAULT); + try { + response.writeTo(ptr); + handler.onBinaryMessage(ptr, size); + } finally { + Unsafe.free(ptr, size, MemoryTag.NATIVE_DEFAULT); + } + } + private static MicrobatchBuffer sealedBuffer(byte value) { MicrobatchBuffer buffer = new MicrobatchBuffer(64); buffer.writeByte(value); From 90b0b70ab2c0b751c4dfe05d26de9f120abe07f8 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Tue, 21 Apr 2026 17:10:12 +0200 Subject: [PATCH 2/6] the close-frame and bad-request paths allocated a byte[] per response. Swap them for zero-GC Utf8s helpers. --- .../http/client/WebSocketSendBuffer.java | 64 ++++----- .../qwp/client/NativeBufferWriter.java | 56 +------- .../cutlass/qwp/client/WebSocketResponse.java | 12 +- .../java/io/questdb/client/std/str/Utf8s.java | 117 ++++++++++++++++ .../client/test/std/str/Utf8sTest.java | 125 ++++++++++++++++++ 5 files changed, 281 insertions(+), 93 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java index 95f9bc66..c317ba19 100644 --- a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java +++ b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java @@ -35,6 +35,7 @@ import io.questdb.client.std.SecureRnd; import io.questdb.client.std.Unsafe; import io.questdb.client.std.Vect; +import io.questdb.client.std.str.Utf8s; /** * Zero-GC WebSocket send buffer that implements {@link ArrayBufferAppender} for direct @@ -208,13 +209,13 @@ public int getPosition() { } @Override - public long getWriteAddress() { - return bufPtr + writePos; + public int getWritableBytes() { + return bufCapacity - writePos; } @Override - public int getWritableBytes() { - return bufCapacity - writePos; + public long getWriteAddress() { + return bufPtr + writePos; } /** @@ -329,32 +330,27 @@ public void putUtf8(String value) { if (value == null || value.isEmpty()) { return; } - for (int i = 0, n = value.length(); i < n; i++) { + + int charLen = value.length(); + ensureCapacity(charLen); + + // Single-pass ASCII path. Non-ASCII falls back to the shared UTF-8 utility. + long addr = bufPtr + writePos; + int i = 0; + for (; i < charLen; i++) { char c = value.charAt(i); - if (c < 0x80) { - putByte((byte) c); - } else if (c < 0x800) { - putByte((byte) (0xC0 | (c >> 6))); - putByte((byte) (0x80 | (c & 0x3F))); - } else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n) { - char c2 = value.charAt(++i); - if (Character.isLowSurrogate(c2)) { - int codePoint = 0x10000 + ((c - 0xD800) << 10) + (c2 - 0xDC00); - putByte((byte) (0xF0 | (codePoint >> 18))); - putByte((byte) (0x80 | ((codePoint >> 12) & 0x3F))); - putByte((byte) (0x80 | ((codePoint >> 6) & 0x3F))); - putByte((byte) (0x80 | (codePoint & 0x3F))); - } else { - putByte((byte) '?'); - i--; - } - } else if (Character.isSurrogate(c)) { - putByte((byte) '?'); - } else { - putByte((byte) (0xE0 | (c >> 12))); - putByte((byte) (0x80 | ((c >> 6) & 0x3F))); - putByte((byte) (0x80 | (c & 0x3F))); + if (c >= 0x80) { + break; } + Unsafe.getUnsafe().putByte(addr++, (byte) c); + } + + if (i == charLen) { + writePos += charLen; + } else { + int utf8Len = NativeBufferWriter.utf8Length(value); + ensureCapacity(utf8Len); + writePos += Utf8s.strCpyUtf8(value, bufPtr + writePos, utf8Len); } } @@ -397,10 +393,10 @@ public void skip(int bytes) { */ public FrameInfo writeCloseFrame(int code, String reason) { int payloadLen = 2; // status code - byte[] reasonBytes = null; + int reasonLen = 0; if (reason != null && !reason.isEmpty()) { - reasonBytes = reason.getBytes(java.nio.charset.StandardCharsets.UTF_8); - payloadLen += reasonBytes.length; + reasonLen = Utf8s.utf8Bytes(reason); + payloadLen += reasonLen; } if (payloadLen > 125) { @@ -422,10 +418,8 @@ public FrameInfo writeCloseFrame(int code, String reason) { writePos += 2; // Write reason if present - if (reasonBytes != null) { - for (byte reasonByte : reasonBytes) { - Unsafe.getUnsafe().putByte(bufPtr + writePos++, reasonByte); - } + if (reasonLen > 0) { + writePos += Utf8s.strCpyUtf8(reason, bufPtr + writePos, reasonLen); } // Mask the payload (including status code and reason) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java index 4cf66558..cc501594 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java @@ -27,6 +27,7 @@ import io.questdb.client.std.MemoryTag; import io.questdb.client.std.QuietCloseable; import io.questdb.client.std.Unsafe; +import io.questdb.client.std.str.Utf8s; /** * A simple native memory buffer writer for encoding QWP v1 messages. @@ -61,24 +62,7 @@ public NativeBufferWriter(int initialCapacity) { * @return the number of bytes needed to encode the string as UTF-8 */ public static int utf8Length(String s) { - if (s == null) return 0; - int len = 0; - for (int i = 0, n = s.length(); i < n; i++) { - char c = s.charAt(i); - if (c < 0x80) { - len++; - } else if (c < 0x800) { - len += 2; - } else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n && Character.isLowSurrogate(s.charAt(i + 1))) { - i++; - len += 4; - } else if (Character.isSurrogate(c)) { - len++; - } else { - len += 3; - } - } - return len; + return s == null ? 0 : Utf8s.utf8Bytes(s); } /** @@ -274,7 +258,7 @@ public void putString(String value) { int utf8Len = utf8Length(value); putVarint(utf8Len); ensureCapacity(utf8Len); - encodeUtf8(value); + encodeUtf8(value, utf8Len); } } @@ -308,7 +292,7 @@ public void putUtf8(String value) { // Non-ASCII — fall back to two-pass (re-encodes from start) int utf8Len = utf8Length(value); ensureCapacity(utf8Len); - encodeUtf8(value); + encodeUtf8(value, utf8Len); } } @@ -355,35 +339,7 @@ private static void writeVarintDirect(long addr, long value) { Unsafe.getUnsafe().putByte(addr, (byte) value); } - private void encodeUtf8(String value) { - long addr = bufferPtr + position; - for (int i = 0, n = value.length(); i < n; i++) { - char c = value.charAt(i); - if (c < 0x80) { - Unsafe.getUnsafe().putByte(addr++, (byte) c); - } else if (c < 0x800) { - Unsafe.getUnsafe().putByte(addr++, (byte) (0xC0 | (c >> 6))); - Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (c & 0x3F))); - } else if (c >= 0xD800 && c <= 0xDBFF && i + 1 < n) { - char c2 = value.charAt(++i); - if (Character.isLowSurrogate(c2)) { - int codePoint = 0x10000 + ((c - 0xD800) << 10) + (c2 - 0xDC00); - Unsafe.getUnsafe().putByte(addr++, (byte) (0xF0 | (codePoint >> 18))); - Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((codePoint >> 12) & 0x3F))); - Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((codePoint >> 6) & 0x3F))); - Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (codePoint & 0x3F))); - } else { - Unsafe.getUnsafe().putByte(addr++, (byte) '?'); - i--; - } - } else if (Character.isSurrogate(c)) { - Unsafe.getUnsafe().putByte(addr++, (byte) '?'); - } else { - Unsafe.getUnsafe().putByte(addr++, (byte) (0xE0 | (c >> 12))); - Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | ((c >> 6) & 0x3F))); - Unsafe.getUnsafe().putByte(addr++, (byte) (0x80 | (c & 0x3F))); - } - } - position = (int) (addr - bufferPtr); + private void encodeUtf8(String value, int utf8Len) { + position += Utf8s.strCpyUtf8(value, bufferPtr + position, utf8Len); } } diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java index e87ceab3..5edac599 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java @@ -25,6 +25,7 @@ package io.questdb.client.cutlass.qwp.client; import io.questdb.client.std.Unsafe; +import io.questdb.client.std.str.Utf8s; import java.nio.charset.StandardCharsets; @@ -219,8 +220,7 @@ public boolean readFrom(long ptr, int length) { public int serializedSize() { int size = MIN_RESPONSE_SIZE; if (errorMessage != null && !errorMessage.isEmpty()) { - byte[] msgBytes = errorMessage.getBytes(StandardCharsets.UTF_8); - int msgLen = Math.min(msgBytes.length, MAX_ERROR_MESSAGE_LENGTH); + int msgLen = Utf8s.utf8Bytes(errorMessage, MAX_ERROR_MESSAGE_LENGTH); size += 2 + msgLen; // 2 bytes for length prefix } return size; @@ -255,18 +255,14 @@ public int writeTo(long ptr) { // Error message (if any) if (status != STATUS_OK && errorMessage != null && !errorMessage.isEmpty()) { - byte[] msgBytes = errorMessage.getBytes(StandardCharsets.UTF_8); - int msgLen = Math.min(msgBytes.length, MAX_ERROR_MESSAGE_LENGTH); + int msgLen = Utf8s.utf8Bytes(errorMessage, MAX_ERROR_MESSAGE_LENGTH); // Length prefix (2 bytes, little-endian) Unsafe.getUnsafe().putShort(ptr + offset, (short) msgLen); offset += 2; // Message bytes - for (int i = 0; i < msgLen; i++) { - Unsafe.getUnsafe().putByte(ptr + offset + i, msgBytes[i]); - } - offset += msgLen; + offset += Utf8s.strCpyUtf8(errorMessage, ptr + offset, msgLen); } return offset; diff --git a/core/src/main/java/io/questdb/client/std/str/Utf8s.java b/core/src/main/java/io/questdb/client/std/str/Utf8s.java index 5efca1e0..480749af 100644 --- a/core/src/main/java/io/questdb/client/std/str/Utf8s.java +++ b/core/src/main/java/io/questdb/client/std/str/Utf8s.java @@ -173,6 +173,62 @@ public static void strCpyAscii(@NotNull CharSequence asciiSrc, int srcLo, int sr } } + /** + * Encodes a UTF-16 sequence as UTF-8 directly into memory. Writes at most + * {@code maxBytes} bytes without splitting multi-byte sequences. Invalid + * surrogates are replaced with '?'. + * + * @return the number of UTF-8 bytes written + */ + public static int strCpyUtf8(@NotNull CharSequence src, long destAddr, int maxBytes) { + int pos = 0; + for (int i = 0, n = src.length(); i < n; i++) { + char c = src.charAt(i); + if (c < 0x80) { + if (pos + 1 > maxBytes) { + break; + } + Unsafe.getUnsafe().putByte(destAddr + pos, (byte) c); + pos++; + } else if (c < 0x800) { + if (pos + 2 > maxBytes) { + break; + } + Unsafe.getUnsafe().putByte(destAddr + pos, (byte) (192 | c >> 6)); + Unsafe.getUnsafe().putByte(destAddr + pos + 1, (byte) (128 | c & 63)); + pos += 2; + } else if (Character.isSurrogate(c)) { + if (Character.isHighSurrogate(c) && i + 1 < n && Character.isLowSurrogate(src.charAt(i + 1))) { + if (pos + 4 > maxBytes) { + break; + } + int cp = Character.toCodePoint(c, src.charAt(i + 1)); + Unsafe.getUnsafe().putByte(destAddr + pos, (byte) (240 | cp >> 18)); + Unsafe.getUnsafe().putByte(destAddr + pos + 1, (byte) (128 | cp >> 12 & 63)); + Unsafe.getUnsafe().putByte(destAddr + pos + 2, (byte) (128 | cp >> 6 & 63)); + Unsafe.getUnsafe().putByte(destAddr + pos + 3, (byte) (128 | cp & 63)); + pos += 4; + i++; + } else { + if (pos + 1 > maxBytes) { + break; + } + Unsafe.getUnsafe().putByte(destAddr + pos, (byte) '?'); + pos++; + } + } else { + if (pos + 3 > maxBytes) { + break; + } + Unsafe.getUnsafe().putByte(destAddr + pos, (byte) (224 | c >> 12)); + Unsafe.getUnsafe().putByte(destAddr + pos + 1, (byte) (128 | c >> 6 & 63)); + Unsafe.getUnsafe().putByte(destAddr + pos + 2, (byte) (128 | c & 63)); + pos += 3; + } + } + return pos; + } + public static String stringFromUtf8Bytes(long lo, long hi) { if (hi == lo) { return ""; @@ -206,6 +262,67 @@ public static String stringFromUtf8Bytes(@NotNull Utf8Sequence seq) { return b.toString(); } + public static int utf8Bytes(@NotNull CharSequence sequence) { + int count = 0; + int len = sequence.length(); + + for (int i = 0; i < len; i++) { + char ch = sequence.charAt(i); + if (ch < 0x80) { + count++; + } else if (ch < 0x800) { + count += 2; + } else if (Character.isSurrogate(ch)) { + if (Character.isHighSurrogate(ch)) { + if (i + 1 < len && Character.isLowSurrogate(sequence.charAt(i + 1))) { + count += 4; + i++; + } else { + count++; + } + } else { + count++; + } + } else { + count += 3; + } + } + return count; + } + + public static int utf8Bytes(@NotNull CharSequence sequence, int maxBytes) { + int count = 0; + int len = sequence.length(); + + for (int i = 0; i < len; i++) { + char ch = sequence.charAt(i); + int charBytes; + if (ch < 0x80) { + charBytes = 1; + } else if (ch < 0x800) { + charBytes = 2; + } else if (Character.isSurrogate(ch)) { + if (Character.isHighSurrogate(ch) && i + 1 < len && Character.isLowSurrogate(sequence.charAt(i + 1))) { + charBytes = 4; + if (count + charBytes > maxBytes) { + break; + } + count += charBytes; + i++; + continue; + } + charBytes = 1; + } else { + charBytes = 3; + } + if (count + charBytes > maxBytes) { + break; + } + count += charBytes; + } + return count; + } + public static int utf8DecodeMultiByte(long lo, long hi, byte b, Utf16Sink sink) { if (b >> 5 == -2 && (b & 30) != 0) { return utf8Decode2Bytes(lo, hi, b, sink); diff --git a/core/src/test/java/io/questdb/client/test/std/str/Utf8sTest.java b/core/src/test/java/io/questdb/client/test/std/str/Utf8sTest.java index 19133266..a9c70eed 100644 --- a/core/src/test/java/io/questdb/client/test/std/str/Utf8sTest.java +++ b/core/src/test/java/io/questdb/client/test/std/str/Utf8sTest.java @@ -247,6 +247,99 @@ public void testQuotedTextParsing() { Assert.assertEquals(text, query.toString()); } + @Test + public void testStrCpyUtf8() { + final int bufSize = 64; + long mem = Unsafe.malloc(bufSize, MemoryTag.NATIVE_DEFAULT); + try { + fill(mem, bufSize, (byte) 0x7F); + + Assert.assertEquals(0, Utf8s.strCpyUtf8("", mem, bufSize)); + assertUntouched(mem, bufSize, (byte) 0x7F); + + Assert.assertEquals(0, Utf8s.strCpyUtf8("hello", mem, 0)); + assertUntouched(mem, bufSize, (byte) 0x7F); + + assertStrCpyUtf8(mem, bufSize, "hello", bufSize, 5, "hello"); + assertStrCpyUtf8(mem, bufSize, "éü", bufSize, 4, "éü"); + assertStrCpyUtf8(mem, bufSize, "世界", bufSize, 6, "世界"); + assertStrCpyUtf8(mem, bufSize, "\uD83D\uDE00", bufSize, 4, "\uD83D\uDE00"); + assertStrCpyUtf8(mem, bufSize, "Aé世\uD83D\uDE00", bufSize, 10, "Aé世\uD83D\uDE00"); + } finally { + Unsafe.free(mem, bufSize, MemoryTag.NATIVE_DEFAULT); + } + } + + @Test + public void testStrCpyUtf8DoesNotSplitCharactersAtLimit() { + final int bufSize = 16; + long mem = Unsafe.malloc(bufSize, MemoryTag.NATIVE_DEFAULT); + try { + assertStrCpyUtf8(mem, bufSize, "abcdef", 3, 3, "abc"); + assertStrCpyUtf8(mem, bufSize, "aéb", 2, 1, "a"); + assertStrCpyUtf8(mem, bufSize, "a世b", 3, 1, "a"); + assertStrCpyUtf8(mem, bufSize, "a\uD83D\uDE00b", 4, 1, "a"); + + assertStrCpyUtf8(mem, bufSize, "aé", 3, 3, "aé"); + assertStrCpyUtf8(mem, bufSize, "a世", 4, 4, "a世"); + assertStrCpyUtf8(mem, bufSize, "a\uD83D\uDE00", 5, 5, "a\uD83D\uDE00"); + } finally { + Unsafe.free(mem, bufSize, MemoryTag.NATIVE_DEFAULT); + } + } + + @Test + public void testStrCpyUtf8ReplacesInvalidSurrogates() { + final int bufSize = 16; + long mem = Unsafe.malloc(bufSize, MemoryTag.NATIVE_DEFAULT); + try { + assertStrCpyUtf8(mem, bufSize, "\uD83Da", bufSize, 2, "?a"); + assertStrCpyUtf8(mem, bufSize, "\uDE00b", bufSize, 2, "?b"); + assertStrCpyUtf8(mem, bufSize, "\uD83D", bufSize, 1, "?"); + assertStrCpyUtf8(mem, bufSize, "\uDE00", bufSize, 1, "?"); + assertStrCpyUtf8(mem, bufSize, "\uD83Da", 1, 1, "?"); + assertStrCpyUtf8(mem, bufSize, "a\uD83D", 1, 1, "a"); + assertStrCpyUtf8(mem, bufSize, "a\uDE00", 1, 1, "a"); + } finally { + Unsafe.free(mem, bufSize, MemoryTag.NATIVE_DEFAULT); + } + } + + @Test + public void testUtf8Bytes() { + Assert.assertEquals(0, Utf8s.utf8Bytes("")); + Assert.assertEquals(5, Utf8s.utf8Bytes("hello")); + Assert.assertEquals(4, Utf8s.utf8Bytes("éü")); + Assert.assertEquals(6, Utf8s.utf8Bytes("世界")); + Assert.assertEquals(4, Utf8s.utf8Bytes("\uD83D\uDE00")); + Assert.assertEquals(10, Utf8s.utf8Bytes("Aé世\uD83D\uDE00")); + Assert.assertEquals(2, Utf8s.utf8Bytes("\uD83Da")); + Assert.assertEquals(2, Utf8s.utf8Bytes("\uDE00b")); + Assert.assertEquals(1, Utf8s.utf8Bytes("\uD83D")); + Assert.assertEquals(1, Utf8s.utf8Bytes("\uDE00")); + } + + @Test + public void testUtf8BytesWithLimit() { + Assert.assertEquals(0, Utf8s.utf8Bytes("hello", 0)); + Assert.assertEquals(5, Utf8s.utf8Bytes("hello", 10)); + Assert.assertEquals(3, Utf8s.utf8Bytes("hello", 3)); + + Assert.assertEquals(1, Utf8s.utf8Bytes("aéb", 2)); + Assert.assertEquals(3, Utf8s.utf8Bytes("aé", 3)); + + Assert.assertEquals(1, Utf8s.utf8Bytes("a世b", 3)); + Assert.assertEquals(4, Utf8s.utf8Bytes("a世", 4)); + + Assert.assertEquals(1, Utf8s.utf8Bytes("a\uD83D\uDE00b", 4)); + Assert.assertEquals(5, Utf8s.utf8Bytes("a\uD83D\uDE00", 5)); + + Assert.assertEquals(1, Utf8s.utf8Bytes("\uD83Da", 1)); + Assert.assertEquals(1, Utf8s.utf8Bytes("\uDE00b", 1)); + Assert.assertEquals(1, Utf8s.utf8Bytes("a\uD83D", 1)); + Assert.assertEquals(1, Utf8s.utf8Bytes("a\uDE00", 1)); + } + @Test public void testUtf8Support() { StringBuilder expected = new StringBuilder(); @@ -269,6 +362,18 @@ public void testUtf8Support() { } } + private static void assertStrCpyUtf8(long mem, int bufSize, String value, int maxBytes, int expectedBytes, String expected) { + final byte sentinel = 0x5A; + fill(mem, bufSize, sentinel); + + int n = Utf8s.strCpyUtf8(value, mem, maxBytes); + Assert.assertEquals(expectedBytes, n); + Assert.assertEquals(expected, readUtf8(mem, n)); + for (int i = n; i < bufSize; i++) { + Assert.assertEquals("write past copied bytes at offset " + i, sentinel, Unsafe.getUnsafe().getByte(mem + i)); + } + } + private static long copyBytes(long buf, byte[] bytes) { for (int n = bytes.length, i = 0; i < n; i++) { Unsafe.getUnsafe().putByte(buf + i, bytes[i]); @@ -276,6 +381,26 @@ private static long copyBytes(long buf, byte[] bytes) { return buf + bytes.length; } + private static void assertUntouched(long mem, int bufSize, byte expected) { + for (int i = 0; i < bufSize; i++) { + Assert.assertEquals("unexpected write at offset " + i, expected, Unsafe.getUnsafe().getByte(mem + i)); + } + } + + private static void fill(long mem, int len, byte value) { + for (int i = 0; i < len; i++) { + Unsafe.getUnsafe().putByte(mem + i, value); + } + } + + private static String readUtf8(long mem, int len) { + byte[] bytes = new byte[len]; + for (int i = 0; i < len; i++) { + bytes[i] = Unsafe.getUnsafe().getByte(mem + i); + } + return new String(bytes, StandardCharsets.UTF_8); + } + private boolean copyToSinkWithTextUtil(StringSink query, String text) { byte[] bytes = text.getBytes(Files.UTF_8); long ptr = Unsafe.malloc(bytes.length, MemoryTag.NATIVE_DEFAULT); From be3092490df88e581d3ea41bf3fdc2b52452a7a4 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 22 Apr 2026 14:47:14 +0200 Subject: [PATCH 3/6] contract clarification --- .../main/java/io/questdb/client/Sender.java | 5 ++++ .../qwp/client/QwpWebSocketSender.java | 25 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/core/src/main/java/io/questdb/client/Sender.java b/core/src/main/java/io/questdb/client/Sender.java index ab77fba4..1175db7f 100644 --- a/core/src/main/java/io/questdb/client/Sender.java +++ b/core/src/main/java/io/questdb/client/Sender.java @@ -102,6 +102,11 @@ *
* Note: If the underlying error is permanent, retrying {@link #flush()} will fail again. * Use {@link #reset()} to discard the problematic data and continue with new data. + *
+ * Note: WebSocket transport uses a terminal sender-level failure model after a + * connection has been established. After a WebSocket send, ACK, or connection + * failure, {@link #reset()} does not recover the sender; close it and create a + * new one. * */ public interface Sender extends Closeable, ArraySender { diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 90e46e31..c5f004fc 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -85,6 +85,17 @@ * sender.flush(); * } * + *

+ * Failure handling: after this sender has established a WebSocket connection, + * any WebSocket send failure, receive failure, ACK timeout, server error ACK, + * invalid ACK, or server close is terminal for this sender instance. The first + * such failure is retained and subsequent public operations rethrow the same + * {@link LineSenderException}. {@link #reset()} only discards buffered row data; + * it does not recover a terminal WebSocket failure. To resume sending after a + * terminal WebSocket failure, close this sender and create a new instance. + *

+ * Initial connection failures are not retained as terminal sender state; a later + * operation may try to connect again. */ public class QwpWebSocketSender implements Sender { @@ -730,6 +741,20 @@ public QwpWebSocketSender floatColumn(CharSequence columnName, float value) { return this; } + /** + * Flushes buffered rows and waits until the server acknowledges all submitted + * WebSocket batches. + *

+ * If a WebSocket send, receive, ACK timeout, server error ACK, invalid ACK, + * or server close is observed after the connection has been established, the + * sender enters a terminal failed state. The first failure is retained and + * subsequent public operations rethrow the same {@link LineSenderException}. + * Create a new sender to resume sending. + * + * @throws LineSenderException if the sender is closed, a row is still in + * progress, connection setup fails, or a terminal + * WebSocket failure is observed + */ @Override public void flush() { checkNotClosed(); From ab680bedbc664b4fb1216800e9d569c2f1a269d4 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 22 Apr 2026 14:52:41 +0200 Subject: [PATCH 4/6] refresh QWP WebSocket terminal error stack traces --- .../cutlass/qwp/client/QwpWebSocketSender.java | 4 ++++ .../qwp/client/QwpWebSocketSenderStateTest.java | 12 ++++++++++++ 2 files changed, 16 insertions(+) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index c5f004fc..86885e6e 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -1202,6 +1202,10 @@ private void checkNotClosed() { private void checkConnectionError() { LineSenderException error = connectionError.get(); if (error != null) { + // Refresh the stack so subsequent public API calls point at the + // call that observed the terminal sender state, not the I/O thread + // that originally recorded the failure. + error.fillInStackTrace(); throw error; } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java index a843e4b2..23ed4774 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderStateTest.java @@ -67,6 +67,7 @@ public void testConnectionFailureIsSenderLevelTerminalState() throws Exception { Assert.fail("Expected sender-level connection failure"); } catch (LineSenderException e) { Assert.assertSame(failure, e); + assertStackContains(e, "table"); } LineSenderException secondFailure = new LineSenderException("second failure"); @@ -77,6 +78,7 @@ public void testConnectionFailureIsSenderLevelTerminalState() throws Exception { Assert.fail("Expected original sender-level connection failure"); } catch (LineSenderException e) { Assert.assertSame(failure, e); + assertStackContains(e, "flush"); } } finally { sender.close(); @@ -354,6 +356,16 @@ private static void invokeResetSchemaStateForNewConnection(Object target) throws method.invoke(target); } + private static void assertStackContains(Throwable throwable, String methodName) { + for (StackTraceElement element : throwable.getStackTrace()) { + if (QwpWebSocketSender.class.getName().equals(element.getClassName()) + && methodName.equals(element.getMethodName())) { + return; + } + } + Assert.fail("Expected stack trace to contain QwpWebSocketSender." + methodName); + } + private static boolean invokeRecordConnectionFailure(Object target, LineSenderException error) throws Exception { Method method = target.getClass().getDeclaredMethod("recordConnectionFailure", LineSenderException.class); method.setAccessible(true); From 6a6698da1e95242014cf2109f54c62327513f6e1 Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 22 Apr 2026 15:01:56 +0200 Subject: [PATCH 5/6] optimize UTF-8 fallback after ASCII prefix --- .../http/client/WebSocketSendBuffer.java | 8 ++--- .../qwp/client/NativeBufferWriter.java | 7 ++--- .../java/io/questdb/client/std/str/Utf8s.java | 16 ++++++---- .../http/client/WebSocketSendBufferTest.java | 18 +++++++++++- .../qwp/client/NativeBufferWriterTest.java | 16 ++++++++++ .../client/test/std/str/Utf8sTest.java | 29 +++++++++++++++++++ 6 files changed, 80 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java index c317ba19..cc25beda 100644 --- a/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java +++ b/core/src/main/java/io/questdb/client/cutlass/http/client/WebSocketSendBuffer.java @@ -334,7 +334,7 @@ public void putUtf8(String value) { int charLen = value.length(); ensureCapacity(charLen); - // Single-pass ASCII path. Non-ASCII falls back to the shared UTF-8 utility. + // Single-pass for ASCII. Mixed strings keep the ASCII prefix and resume UTF-8 encoding. long addr = bufPtr + writePos; int i = 0; for (; i < charLen; i++) { @@ -348,9 +348,9 @@ public void putUtf8(String value) { if (i == charLen) { writePos += charLen; } else { - int utf8Len = NativeBufferWriter.utf8Length(value); - ensureCapacity(utf8Len); - writePos += Utf8s.strCpyUtf8(value, bufPtr + writePos, utf8Len); + int utf8Len = Utf8s.utf8Bytes(value, i, charLen); + ensureCapacity(i + utf8Len); + writePos += i + Utf8s.strCpyUtf8(value, i, bufPtr + writePos + i, utf8Len); } } diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java index cc501594..274ce5eb 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/NativeBufferWriter.java @@ -289,10 +289,9 @@ public void putUtf8(String value) { // All ASCII — done in a single pass position += charLen; } else { - // Non-ASCII — fall back to two-pass (re-encodes from start) - int utf8Len = utf8Length(value); - ensureCapacity(utf8Len); - encodeUtf8(value, utf8Len); + int utf8Len = Utf8s.utf8Bytes(value, i, charLen); + ensureCapacity(i + utf8Len); + position += i + Utf8s.strCpyUtf8(value, i, bufferPtr + position + i, utf8Len); } } diff --git a/core/src/main/java/io/questdb/client/std/str/Utf8s.java b/core/src/main/java/io/questdb/client/std/str/Utf8s.java index 480749af..511a4e75 100644 --- a/core/src/main/java/io/questdb/client/std/str/Utf8s.java +++ b/core/src/main/java/io/questdb/client/std/str/Utf8s.java @@ -181,8 +181,12 @@ public static void strCpyAscii(@NotNull CharSequence asciiSrc, int srcLo, int sr * @return the number of UTF-8 bytes written */ public static int strCpyUtf8(@NotNull CharSequence src, long destAddr, int maxBytes) { + return strCpyUtf8(src, 0, destAddr, maxBytes); + } + + public static int strCpyUtf8(@NotNull CharSequence src, int srcLo, long destAddr, int maxBytes) { int pos = 0; - for (int i = 0, n = src.length(); i < n; i++) { + for (int i = srcLo, n = src.length(); i < n; i++) { char c = src.charAt(i); if (c < 0x80) { if (pos + 1 > maxBytes) { @@ -263,10 +267,12 @@ public static String stringFromUtf8Bytes(@NotNull Utf8Sequence seq) { } public static int utf8Bytes(@NotNull CharSequence sequence) { - int count = 0; - int len = sequence.length(); + return utf8Bytes(sequence, 0, sequence.length()); + } - for (int i = 0; i < len; i++) { + public static int utf8Bytes(@NotNull CharSequence sequence, int lo, int hi) { + int count = 0; + for (int i = lo; i < hi; i++) { char ch = sequence.charAt(i); if (ch < 0x80) { count++; @@ -274,7 +280,7 @@ public static int utf8Bytes(@NotNull CharSequence sequence) { count += 2; } else if (Character.isSurrogate(ch)) { if (Character.isHighSurrogate(ch)) { - if (i + 1 < len && Character.isLowSurrogate(sequence.charAt(i + 1))) { + if (i + 1 < hi && Character.isLowSurrogate(sequence.charAt(i + 1))) { count += 4; i++; } else { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/http/client/WebSocketSendBufferTest.java b/core/src/test/java/io/questdb/client/test/cutlass/http/client/WebSocketSendBufferTest.java index dde2ccf9..be63482d 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/http/client/WebSocketSendBufferTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/http/client/WebSocketSendBufferTest.java @@ -26,9 +26,10 @@ import io.questdb.client.cutlass.http.client.WebSocketSendBuffer; import io.questdb.client.std.Unsafe; -import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak; +import io.questdb.client.std.str.Utf8s; import org.junit.Test; +import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak; import static org.junit.Assert.assertEquals; public class WebSocketSendBufferTest { @@ -46,4 +47,19 @@ public void testPutUtf8InvalidSurrogatePair() throws Exception { } }); } + + @Test + public void testPutUtf8MixedAsciiAndNonAsciiAfterGrow() throws Exception { + assertMemoryLeak(() -> { + try (WebSocketSendBuffer buf = new WebSocketSendBuffer(8)) { + String value = "abcdefghijklmnop世界世界世界世界世界世界世界世界世界世界"; + + buf.putUtf8(value); + + int utf8Len = Utf8s.utf8Bytes(value); + assertEquals(utf8Len, buf.getWritePos()); + assertEquals(value, Utf8s.stringFromUtf8Bytes(buf.getBufferPtr(), buf.getBufferPtr() + utf8Len)); + } + }); + } } diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/NativeBufferWriterTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/NativeBufferWriterTest.java index bacdd69a..0fb85406 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/NativeBufferWriterTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/NativeBufferWriterTest.java @@ -26,6 +26,7 @@ import io.questdb.client.cutlass.qwp.client.NativeBufferWriter; import io.questdb.client.std.Unsafe; +import io.questdb.client.std.str.Utf8s; import org.junit.Assert; import org.junit.Test; @@ -522,6 +523,21 @@ public void testWriteUtf8MixedAsciiAndNonAscii() throws Exception { }); } + @Test + public void testWriteUtf8MixedAsciiAndNonAsciiAfterGrow() throws Exception { + assertMemoryLeak(() -> { + try (NativeBufferWriter writer = new NativeBufferWriter(8)) { + String value = "abcdefghijklmnop世界世界世界世界世界世界世界世界世界世界"; + + writer.putUtf8(value); + + int utf8Len = Utf8s.utf8Bytes(value); + Assert.assertEquals(utf8Len, writer.getPosition()); + Assert.assertEquals(value, Utf8s.stringFromUtf8Bytes(writer.getBufferPtr(), writer.getBufferPtr() + utf8Len)); + } + }); + } + @Test public void testWriteUtf8Ascii() throws Exception { assertMemoryLeak(() -> { diff --git a/core/src/test/java/io/questdb/client/test/std/str/Utf8sTest.java b/core/src/test/java/io/questdb/client/test/std/str/Utf8sTest.java index a9c70eed..4a7c09bf 100644 --- a/core/src/test/java/io/questdb/client/test/std/str/Utf8sTest.java +++ b/core/src/test/java/io/questdb/client/test/std/str/Utf8sTest.java @@ -270,6 +270,27 @@ public void testStrCpyUtf8() { } } + @Test + public void testStrCpyUtf8FromOffset() { + final int bufSize = 64; + long mem = Unsafe.malloc(bufSize, MemoryTag.NATIVE_DEFAULT); + try { + final byte sentinel = 0x5A; + String value = "ascii-prefixé世\uD83D\uDE00\uD800x"; + int lo = "ascii-prefix".length(); + int expectedBytes = Utf8s.utf8Bytes(value, lo, value.length()); + + fill(mem, bufSize, sentinel); + Assert.assertEquals(expectedBytes, Utf8s.strCpyUtf8(value, lo, mem, expectedBytes)); + Assert.assertEquals("é世\uD83D\uDE00?x", readUtf8(mem, expectedBytes)); + for (int i = expectedBytes; i < bufSize; i++) { + Assert.assertEquals("write past copied bytes at offset " + i, sentinel, Unsafe.getUnsafe().getByte(mem + i)); + } + } finally { + Unsafe.free(mem, bufSize, MemoryTag.NATIVE_DEFAULT); + } + } + @Test public void testStrCpyUtf8DoesNotSplitCharactersAtLimit() { final int bufSize = 16; @@ -319,6 +340,14 @@ public void testUtf8Bytes() { Assert.assertEquals(1, Utf8s.utf8Bytes("\uDE00")); } + @Test + public void testUtf8BytesRange() { + String value = "xxAé世\uD83D\uDE00yy"; + Assert.assertEquals(10, Utf8s.utf8Bytes(value, 2, 7)); + Assert.assertEquals(1, Utf8s.utf8Bytes(value, 5, 6)); + Assert.assertEquals(4, Utf8s.utf8Bytes(value, 5, 7)); + } + @Test public void testUtf8BytesWithLimit() { Assert.assertEquals(0, Utf8s.utf8Bytes("hello", 0)); From 699fe266676265fb2a6d4daa1afd0f50310e4cbe Mon Sep 17 00:00:00 2001 From: Jaromir Hamala Date: Wed, 22 Apr 2026 15:17:29 +0200 Subject: [PATCH 6/6] cache encoded error-message length and always emit the error length field for non-OK responses, including empty messages. --- .../cutlass/qwp/client/WebSocketResponse.java | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java index 5edac599..9ed6b9b9 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/WebSocketResponse.java @@ -67,6 +67,7 @@ public class WebSocketResponse { public static final byte STATUS_SECURITY_ERROR = 0x08; public static final byte STATUS_WRITE_ERROR = 0x09; private String errorMessage; + private int errorMessageUtf8Length; private long sequence; private byte status; @@ -74,6 +75,7 @@ public WebSocketResponse() { this.status = STATUS_OK; this.sequence = 0; this.errorMessage = null; + this.errorMessageUtf8Length = -1; } /** @@ -84,6 +86,7 @@ public static WebSocketResponse error(long sequence, byte status, String errorMe response.status = status; response.sequence = sequence; response.errorMessage = errorMessage; + response.errorMessageUtf8Length = -1; return response; } @@ -125,6 +128,7 @@ public static WebSocketResponse success(long sequence) { WebSocketResponse response = new WebSocketResponse(); response.status = STATUS_OK; response.sequence = sequence; + response.errorMessageUtf8Length = -1; return response; } @@ -204,11 +208,14 @@ public boolean readFrom(long ptr, int length) { msgBytes[i] = Unsafe.getUnsafe().getByte(ptr + offset + i); } errorMessage = new String(msgBytes, StandardCharsets.UTF_8); + errorMessageUtf8Length = -1; } else { errorMessage = null; + errorMessageUtf8Length = 0; } } else { errorMessage = null; + errorMessageUtf8Length = -1; } return true; @@ -218,12 +225,10 @@ public boolean readFrom(long ptr, int length) { * Calculates the serialized size of this response. */ public int serializedSize() { - int size = MIN_RESPONSE_SIZE; - if (errorMessage != null && !errorMessage.isEmpty()) { - int msgLen = Utf8s.utf8Bytes(errorMessage, MAX_ERROR_MESSAGE_LENGTH); - size += 2 + msgLen; // 2 bytes for length prefix + if (status == STATUS_OK) { + return MIN_RESPONSE_SIZE; } - return size; + return MIN_ERROR_RESPONSE_SIZE + getErrorMessageUtf8Length(); } @Override @@ -253,18 +258,30 @@ public int writeTo(long ptr) { Unsafe.getUnsafe().putLong(ptr + offset, sequence); offset += 8; - // Error message (if any) - if (status != STATUS_OK && errorMessage != null && !errorMessage.isEmpty()) { - int msgLen = Utf8s.utf8Bytes(errorMessage, MAX_ERROR_MESSAGE_LENGTH); - + // Error message length and bytes (if any) + if (status != STATUS_OK) { + int msgLen = getErrorMessageUtf8Length(); // Length prefix (2 bytes, little-endian) Unsafe.getUnsafe().putShort(ptr + offset, (short) msgLen); offset += 2; // Message bytes - offset += Utf8s.strCpyUtf8(errorMessage, ptr + offset, msgLen); + if (msgLen > 0) { + offset += Utf8s.strCpyUtf8(errorMessage, ptr + offset, msgLen); + } } return offset; } + + private int getErrorMessageUtf8Length() { + if (status == STATUS_OK || errorMessage == null || errorMessage.isEmpty()) { + errorMessageUtf8Length = 0; + return 0; + } + if (errorMessageUtf8Length < 0) { + errorMessageUtf8Length = Utf8s.utf8Bytes(errorMessage, MAX_ERROR_MESSAGE_LENGTH); + } + return errorMessageUtf8Length; + } }