From 9cb8fda7e874a76df037bc9b97159fcf2f43c248 Mon Sep 17 00:00:00 2001 From: Adam Bettigole Date: Thu, 25 Jun 2026 11:14:02 -0700 Subject: [PATCH] fix(integration): synchronize consumer lag assertion in crash-reject test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TestCrashAfterRejectDoesNotLoseMessages was flaky because it asserted consumer lag immediately after Ack(), but Ack() only marks the delivery state — watermark advancement (which updates offset_acked) is deferred to the next poll loop tick. The test raced against the poll loop: if the lag check ran before advanceWatermark, offset_acked was still stale and lag was non-zero. Fix by adding OnSignal to worker-2's queue and calling waitForSignal after acks, ensuring the poll loop has run advanceWatermark before we check lag. This matches the pattern already used by TestWatermarkAdvancesContiguously. Verified with 50 consecutive passes (0 failures). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../extension/messagequeue/mysql/queue_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/test/integration/extension/messagequeue/mysql/queue_test.go b/test/integration/extension/messagequeue/mysql/queue_test.go index 558a2cde..47ae5d32 100644 --- a/test/integration/extension/messagequeue/mysql/queue_test.go +++ b/test/integration/extension/messagequeue/mysql/queue_test.go @@ -2221,10 +2221,12 @@ func (s *SQLQueueIntegrationSuite) TestCrashAfterRejectDoesNotLoseMessages() { // Start worker-2 with same consumer group — it polls and finds msg-C // after lease + visibility expire in the DB + signalCh := make(chan queueMySQL.HookSignal, 100) q2, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + OnSignal: signalCh, }) require.NoError(t, err) defer q2.Close() @@ -2243,6 +2245,9 @@ func (s *SQLQueueIntegrationSuite) TestCrashAfterRejectDoesNotLoseMessages() { require.NoError(t, delivery.Ack(s.ctx)) t.Logf("Worker-2 recovered msg-C (attempt=%d)", delivery.Attempt()) + // Wait for the poll loop to advance the watermark after acking msg-C. + waitForSignal(t, signalCh, queueMySQL.SignalDeliveryCheck) + // Verify DLQ contains msg-B dlqTopic := topic + subConfig.DLQ.TopicSuffix dlqConfig := extqueue.DefaultSubscriptionConfig("worker-2", "crash-reject-cg") @@ -2254,7 +2259,9 @@ func (s *SQLQueueIntegrationSuite) TestCrashAfterRejectDoesNotLoseMessages() { assert.Equal(t, "msg-B", dlqDelivery.Message().ID, "msg-B should be in DLQ") require.NoError(t, dlqDelivery.Ack(s.ctx)) - // Verify consumer lag is 0 + // Verify consumer lag is 0. + // Wait for the poll loop so advanceWatermark has run after all acks. + waitForSignal(t, signalCh, queueMySQL.SignalDeliveryCheck) admin := queueAdmin.NewAdminStore(s.db) lags, err := admin.ConsumerLag(s.ctx, topic) require.NoError(t, err)