From 3e1bd503ed4c40e2e72ff1325cd6b7e98e8d5154 Mon Sep 17 00:00:00 2001 From: bluestreak Date: Sat, 23 May 2026 22:34:45 +0100 Subject: [PATCH 1/2] Fix microbatch buffer stuck after cursor append failure When CursorSendEngine.appendBlocking() throws inside sealAndSwapBuffer(), the catch block now puts the sealed buffer back into a state isInUse() reports as false: markRecycled() when it is SENDING, rollbackSealForRetry() when it is still SEALED. Without this, the buffer stayed in SENDING forever. No I/O thread ever recycles a buffer the engine never accepted, so the next flush would wait the 30 s recycle timeout and throw "Timeout waiting for buffer to be recycled". The encoded payload is dropped, but flushPendingRows bails out of its post-enqueue state updates after sealAndSwapBuffer throws, so the source rows and the sent-schema watermark stay intact and the next batch re-emits the same rows along with the full schema and symbol-dict delta. Adds testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse, the reproducer from questdb/questdb#7143. A memory-only CursorSendEngine configured to fail every append lets the test confirm both microbatch buffers leave the SENDING and SEALED states after a flush failure. Refs questdb/questdb#7143 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../qwp/client/QwpWebSocketSender.java | 12 ++++ .../qwp/client/QwpWebSocketSenderTest.java | 59 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 8f01c6e2..e459b066 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -3072,6 +3072,18 @@ private void sealAndSwapBuffer() { cursorEngine.appendBlocking(toSend.getBufferPtr(), toSend.getBufferPos()); toSend.markRecycled(); } catch (Throwable t) { + // appendBlocking failed synchronously on the user thread — the + // payload never reached the engine, so no I/O thread will + // recycle toSend. Recycle it here so a later flush can swap + // back to it; flushPendingRows aborts its post-enqueue state + // updates after this throw, so the source rows and the + // sent-schema watermark stay intact and the next batch re-emits + // the same rows along with the full schema + symbol-dict delta. + if (toSend.isSending()) { + toSend.markRecycled(); + } else if (toSend.isSealed()) { + toSend.rollbackSealForRetry(); + } // Surface any I/O thread error first — appendBlocking itself only // throws on PAYLOAD_TOO_LARGE / backpressure deadline, but the // I/O loop can have failed independently. diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java index ecc8eed5..2a602cee 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java @@ -27,17 +27,22 @@ import io.questdb.client.cutlass.line.LineSenderException; import io.questdb.client.cutlass.line.array.DoubleArray; import io.questdb.client.cutlass.line.array.LongArray; +import io.questdb.client.cutlass.qwp.client.MicrobatchBuffer; import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine; import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer; import io.questdb.client.std.Decimal128; import io.questdb.client.std.Decimal256; import io.questdb.client.std.Decimal64; import io.questdb.client.std.bytes.DirectByteSlice; +import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; import org.junit.Assert; import org.junit.Test; +import java.lang.reflect.Field; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.concurrent.TimeUnit; import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak; @@ -326,6 +331,54 @@ public void testDoubleColumnAfterCloseThrows() throws Exception { }); } + @Test + public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exception { + assertMemoryLeak(() -> { + int port = TestPorts.findUnusedPort(); + try (TestWebSocketServer server = new TestWebSocketServer(port, new TestWebSocketServer.WebSocketServerHandler() { + })) { + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + // Memory-only engine with a 33-byte budget and a 1 ns append + // deadline guarantees every appendBlocking() call trips the + // backpressure deadline and throws. + CursorSendEngine engine = new CursorSendEngine(null, 33, 33, 1L); + QwpWebSocketSender sender = QwpWebSocketSender.connect( + "localhost", port, null, Integer.MAX_VALUE, 0, 0L, null, + QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, false, engine, 0L); + try { + sender.table("t").longColumn("v", 1L).atNow(); + + try { + sender.flushAndGetSequence(); + Assert.fail("Expected LineSenderException"); + } catch (LineSenderException e) { + Assert.assertTrue(e.getMessage().contains("cursor SF append failed")); + } + + MicrobatchBuffer buffer0 = getMicrobatchBuffer(sender, "buffer0"); + MicrobatchBuffer buffer1 = getMicrobatchBuffer(sender, "buffer1"); + Assert.assertFalse( + "failed append must not leave any buffer in use [buffer0=" + + MicrobatchBuffer.stateName(buffer0.getState()) + + ", buffer1=" + MicrobatchBuffer.stateName(buffer1.getState()) + "]", + buffer0.isInUse() || buffer1.isInUse()); + } finally { + // close() drains pending rows, which appendBlocking still + // rejects because the engine is permanently wedged in this + // test. The bug under test is about microbatch buffer + // state, not about close() being lenient toward residual + // unflushed rows — swallow the predictable rethrow here. + try { + sender.close(); + } catch (LineSenderException ignored) { + } + } + } + }); + } + @Test public void testGeoHashColumnLongAfterCloseThrows() throws Exception { assertMemoryLeak(() -> { @@ -705,6 +758,12 @@ private static void assertClosed(Runnable r) { } } + private static MicrobatchBuffer getMicrobatchBuffer(QwpWebSocketSender sender, String fieldName) throws Exception { + Field field = QwpWebSocketSender.class.getDeclaredField(fieldName); + field.setAccessible(true); + return (MicrobatchBuffer) field.get(sender); + } + /** * Creates a sender without connecting. * For unit tests that don't need actual connectivity. From ae09d47ed214c946f560a159b722caed350885e8 Mon Sep 17 00:00:00 2001 From: bluestreak Date: Sun, 24 May 2026 00:29:53 +0100 Subject: [PATCH 2/2] Wake segment-manager worker on register SegmentManager.register() now unparks the worker thread after publishing the new ring. Without this, register-after-start has a race: start() schedules the worker, and if that thread reaches workerLoop and takes `lock` before register() does, it snapshots an empty `rings`, services nothing, and parks for the full poll interval. A ring whose first append does not cross the high-water mark fires no producer-side wakeup either, so the spare never lands until the poll expires. testFirstSpareLandsBeforeFirstPoll fails on CI under JaCoCo on the mac-other runner whenever the worker wins the lock first; the prior fix (commit 19c5c65) only widened the budget to 2s, but the poll interval is 5s so no budget below 5s can rescue that ordering. The LockSupport.unpark is cheap, no-ops when the worker has not been started, and grants a permit that the next parkNanos consumes immediately, so it covers both interleavings. Adds testRegisterAfterWorkerParkedWakesWorker as a deterministic regression test: sleeps 250ms between start() and register() so the worker is guaranteed to have parked, then asserts the spare lands within 2s. Without the wakeWorker() call in register() this test fails reliably; with it, all 9 SegmentManagerTest cases pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../qwp/client/sf/cursor/SegmentManager.java | 11 +++++ .../client/sf/cursor/SegmentManagerTest.java | 48 ++++++++++++++++--- 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java index b817fa84..ce2ff990 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java @@ -247,6 +247,17 @@ public void register(SegmentRing ring, String dir, AckWatermark watermark) { } } ring.setManagerWakeup(this::wakeWorker); + // Nudge the worker so it picks up the new ring on its very next + // iteration. Without this, register-after-start has a race window: + // start() schedules the worker thread, and if that thread reaches + // workerLoop and takes `lock` before this method does, it observes + // an empty `rings` snapshot, services nothing, then parkNanos + // (potentially seconds). A new ring whose first append does not + // cross the high-water mark fires no producer-side wakeup either, + // leaving the ring without a spare for the full poll interval. + // wakeWorker is cheap (a single LockSupport.unpark) and a no-op + // when the worker has not been started yet. + wakeWorker(); } public synchronized void start() { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java index 1faca61f..3ee4301b 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java @@ -218,18 +218,20 @@ public void testFirstSpareLandsBeforeFirstPoll() throws Exception { TestUtils.assertMemoryLeak(() -> { // pollNanos is intentionally long enough that the 5s park can be // ruled out as the mechanism by which the first spare arrives. - // The worker thread enters workerLoop on start(), takes the lock, - // sees the just-registered ring with needsHotSpare()==true, and - // provisions the spare BEFORE parking. The spare must therefore - // land within seconds of register(), not minutes -- the 5s park is - // never reached on the first iteration. + // register() unparks the worker after publishing the new ring, + // so the worker re-iterates and provisions the spare even when + // its first loop snapshot ran before register() acquired `lock`. + // The spare must therefore land within seconds of register(), + // not minutes -- the 5s park is never reached. // // The append below is incidental to the contract under test; it // does NOT cross the SegmentRing high-water mark for this 4-frame // segment (HEADER_SIZE 24 + FRAME_HEADER_SIZE 8 + 16 = 48 vs // signalAtBytes = (120 >> 2) * 3 = 90), so no producer-side wakeup // fires. The rotation/high-water wakeup paths are covered by - // testRotationWakeupTriggersImmediateSparePrep. + // testRotationWakeupTriggersImmediateSparePrep, and the + // deterministic register-after-park case is covered by + // testRegisterAfterWorkerParkedWakesWorker. long pollNanos = 5_000_000_000L; // 5 seconds long segSize = MmapSegment.HEADER_SIZE + 4 * (MmapSegment.FRAME_HEADER_SIZE + 16); @@ -256,6 +258,40 @@ public void testFirstSpareLandsBeforeFirstPoll() throws Exception { }); } + @Test + public void testRegisterAfterWorkerParkedWakesWorker() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // Deterministic version of testFirstSpareLandsBeforeFirstPoll: + // sleep between start() and register() long enough for the worker + // to definitely complete its first (empty) iteration and enter + // parkNanos. Without register()'s wakeWorker() the spare would + // not land for the full 5s poll interval; with it the spare lands + // promptly because register() unparks the worker out of its park. + // No append at all, so no producer-side wakeup can mask a missing + // register-side wakeup. + long pollNanos = 5_000_000_000L; // 5 seconds + long segSize = MmapSegment.HEADER_SIZE + + 4 * (MmapSegment.FRAME_HEADER_SIZE + 16); + MmapSegment seg0 = MmapSegment.create(tmpDir + "/0000000000000000.sfa", 0, segSize); + try (SegmentRing ring = new SegmentRing(seg0, segSize); + SegmentManager mgr = new SegmentManager(segSize, pollNanos)) { + mgr.start(); + // Give the worker plenty of time to enter workerLoop, snapshot + // an empty rings list, and reach parkNanos. 250ms is far more + // than the OS scheduling + thread startup cost on any sane + // CI runner, and still well below the 5s poll interval. + Thread.sleep(250); + long t0 = System.nanoTime(); + mgr.register(ring, tmpDir); + assertTrue("register must wake a worker that has already parked", + waitFor(() -> !ring.needsHotSpare(), 2000)); + long elapsedMs = (System.nanoTime() - t0) / 1_000_000L; + assertTrue("spare arrived in " + elapsedMs + "ms -- should be <<5000ms", + elapsedMs < 4000); + } + }); + } + @Test public void testRotationWakeupTriggersImmediateSparePrep() throws Exception { TestUtils.assertMemoryLeak(() -> {