diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java index 8f01c6e2..e459b066 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java @@ -3072,6 +3072,18 @@ private void sealAndSwapBuffer() { cursorEngine.appendBlocking(toSend.getBufferPtr(), toSend.getBufferPos()); toSend.markRecycled(); } catch (Throwable t) { + // appendBlocking failed synchronously on the user thread — the + // payload never reached the engine, so no I/O thread will + // recycle toSend. Recycle it here so a later flush can swap + // back to it; flushPendingRows aborts its post-enqueue state + // updates after this throw, so the source rows and the + // sent-schema watermark stay intact and the next batch re-emits + // the same rows along with the full schema + symbol-dict delta. + if (toSend.isSending()) { + toSend.markRecycled(); + } else if (toSend.isSealed()) { + toSend.rollbackSealForRetry(); + } // Surface any I/O thread error first — appendBlocking itself only // throws on PAYLOAD_TOO_LARGE / backpressure deadline, but the // I/O loop can have failed independently. diff --git a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java index b817fa84..ce2ff990 100644 --- a/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java +++ b/core/src/main/java/io/questdb/client/cutlass/qwp/client/sf/cursor/SegmentManager.java @@ -247,6 +247,17 @@ public void register(SegmentRing ring, String dir, AckWatermark watermark) { } } ring.setManagerWakeup(this::wakeWorker); + // Nudge the worker so it picks up the new ring on its very next + // iteration. Without this, register-after-start has a race window: + // start() schedules the worker thread, and if that thread reaches + // workerLoop and takes `lock` before this method does, it observes + // an empty `rings` snapshot, services nothing, then parkNanos + // (potentially seconds). A new ring whose first append does not + // cross the high-water mark fires no producer-side wakeup either, + // leaving the ring without a spare for the full poll interval. + // wakeWorker is cheap (a single LockSupport.unpark) and a no-op + // when the worker has not been started yet. + wakeWorker(); } public synchronized void start() { diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java index ecc8eed5..2a602cee 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/QwpWebSocketSenderTest.java @@ -27,17 +27,22 @@ import io.questdb.client.cutlass.line.LineSenderException; import io.questdb.client.cutlass.line.array.DoubleArray; import io.questdb.client.cutlass.line.array.LongArray; +import io.questdb.client.cutlass.qwp.client.MicrobatchBuffer; import io.questdb.client.cutlass.qwp.client.QwpWebSocketSender; +import io.questdb.client.cutlass.qwp.client.sf.cursor.CursorSendEngine; import io.questdb.client.cutlass.qwp.protocol.QwpTableBuffer; import io.questdb.client.std.Decimal128; import io.questdb.client.std.Decimal256; import io.questdb.client.std.Decimal64; import io.questdb.client.std.bytes.DirectByteSlice; +import io.questdb.client.test.cutlass.qwp.websocket.TestWebSocketServer; import org.junit.Assert; import org.junit.Test; +import java.lang.reflect.Field; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.concurrent.TimeUnit; import static io.questdb.client.test.tools.TestUtils.assertMemoryLeak; @@ -326,6 +331,54 @@ public void testDoubleColumnAfterCloseThrows() throws Exception { }); } + @Test + public void testFlushAppendFailureDoesNotLeaveMicrobatchBufferInUse() throws Exception { + assertMemoryLeak(() -> { + int port = TestPorts.findUnusedPort(); + try (TestWebSocketServer server = new TestWebSocketServer(port, new TestWebSocketServer.WebSocketServerHandler() { + })) { + server.start(); + Assert.assertTrue(server.awaitStart(5, TimeUnit.SECONDS)); + + // Memory-only engine with a 33-byte budget and a 1 ns append + // deadline guarantees every appendBlocking() call trips the + // backpressure deadline and throws. + CursorSendEngine engine = new CursorSendEngine(null, 33, 33, 1L); + QwpWebSocketSender sender = QwpWebSocketSender.connect( + "localhost", port, null, Integer.MAX_VALUE, 0, 0L, null, + QwpWebSocketSender.DEFAULT_MAX_SCHEMAS_PER_CONNECTION, false, engine, 0L); + try { + sender.table("t").longColumn("v", 1L).atNow(); + + try { + sender.flushAndGetSequence(); + Assert.fail("Expected LineSenderException"); + } catch (LineSenderException e) { + Assert.assertTrue(e.getMessage().contains("cursor SF append failed")); + } + + MicrobatchBuffer buffer0 = getMicrobatchBuffer(sender, "buffer0"); + MicrobatchBuffer buffer1 = getMicrobatchBuffer(sender, "buffer1"); + Assert.assertFalse( + "failed append must not leave any buffer in use [buffer0=" + + MicrobatchBuffer.stateName(buffer0.getState()) + + ", buffer1=" + MicrobatchBuffer.stateName(buffer1.getState()) + "]", + buffer0.isInUse() || buffer1.isInUse()); + } finally { + // close() drains pending rows, which appendBlocking still + // rejects because the engine is permanently wedged in this + // test. The bug under test is about microbatch buffer + // state, not about close() being lenient toward residual + // unflushed rows — swallow the predictable rethrow here. + try { + sender.close(); + } catch (LineSenderException ignored) { + } + } + } + }); + } + @Test public void testGeoHashColumnLongAfterCloseThrows() throws Exception { assertMemoryLeak(() -> { @@ -705,6 +758,12 @@ private static void assertClosed(Runnable r) { } } + private static MicrobatchBuffer getMicrobatchBuffer(QwpWebSocketSender sender, String fieldName) throws Exception { + Field field = QwpWebSocketSender.class.getDeclaredField(fieldName); + field.setAccessible(true); + return (MicrobatchBuffer) field.get(sender); + } + /** * Creates a sender without connecting. * For unit tests that don't need actual connectivity. diff --git a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java index 1faca61f..3ee4301b 100644 --- a/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java +++ b/core/src/test/java/io/questdb/client/test/cutlass/qwp/client/sf/cursor/SegmentManagerTest.java @@ -218,18 +218,20 @@ public void testFirstSpareLandsBeforeFirstPoll() throws Exception { TestUtils.assertMemoryLeak(() -> { // pollNanos is intentionally long enough that the 5s park can be // ruled out as the mechanism by which the first spare arrives. - // The worker thread enters workerLoop on start(), takes the lock, - // sees the just-registered ring with needsHotSpare()==true, and - // provisions the spare BEFORE parking. The spare must therefore - // land within seconds of register(), not minutes -- the 5s park is - // never reached on the first iteration. + // register() unparks the worker after publishing the new ring, + // so the worker re-iterates and provisions the spare even when + // its first loop snapshot ran before register() acquired `lock`. + // The spare must therefore land within seconds of register(), + // not minutes -- the 5s park is never reached. // // The append below is incidental to the contract under test; it // does NOT cross the SegmentRing high-water mark for this 4-frame // segment (HEADER_SIZE 24 + FRAME_HEADER_SIZE 8 + 16 = 48 vs // signalAtBytes = (120 >> 2) * 3 = 90), so no producer-side wakeup // fires. The rotation/high-water wakeup paths are covered by - // testRotationWakeupTriggersImmediateSparePrep. + // testRotationWakeupTriggersImmediateSparePrep, and the + // deterministic register-after-park case is covered by + // testRegisterAfterWorkerParkedWakesWorker. long pollNanos = 5_000_000_000L; // 5 seconds long segSize = MmapSegment.HEADER_SIZE + 4 * (MmapSegment.FRAME_HEADER_SIZE + 16); @@ -256,6 +258,40 @@ public void testFirstSpareLandsBeforeFirstPoll() throws Exception { }); } + @Test + public void testRegisterAfterWorkerParkedWakesWorker() throws Exception { + TestUtils.assertMemoryLeak(() -> { + // Deterministic version of testFirstSpareLandsBeforeFirstPoll: + // sleep between start() and register() long enough for the worker + // to definitely complete its first (empty) iteration and enter + // parkNanos. Without register()'s wakeWorker() the spare would + // not land for the full 5s poll interval; with it the spare lands + // promptly because register() unparks the worker out of its park. + // No append at all, so no producer-side wakeup can mask a missing + // register-side wakeup. + long pollNanos = 5_000_000_000L; // 5 seconds + long segSize = MmapSegment.HEADER_SIZE + + 4 * (MmapSegment.FRAME_HEADER_SIZE + 16); + MmapSegment seg0 = MmapSegment.create(tmpDir + "/0000000000000000.sfa", 0, segSize); + try (SegmentRing ring = new SegmentRing(seg0, segSize); + SegmentManager mgr = new SegmentManager(segSize, pollNanos)) { + mgr.start(); + // Give the worker plenty of time to enter workerLoop, snapshot + // an empty rings list, and reach parkNanos. 250ms is far more + // than the OS scheduling + thread startup cost on any sane + // CI runner, and still well below the 5s poll interval. + Thread.sleep(250); + long t0 = System.nanoTime(); + mgr.register(ring, tmpDir); + assertTrue("register must wake a worker that has already parked", + waitFor(() -> !ring.needsHotSpare(), 2000)); + long elapsedMs = (System.nanoTime() - t0) / 1_000_000L; + assertTrue("spare arrived in " + elapsedMs + "ms -- should be <<5000ms", + elapsedMs < 4000); + } + }); + } + @Test public void testRotationWakeupTriggersImmediateSparePrep() throws Exception { TestUtils.assertMemoryLeak(() -> {