diff --git a/test/integration/extension/messagequeue/mysql/queue_test.go b/test/integration/extension/messagequeue/mysql/queue_test.go index 558a2cde..47ae5d32 100644 --- a/test/integration/extension/messagequeue/mysql/queue_test.go +++ b/test/integration/extension/messagequeue/mysql/queue_test.go @@ -2221,10 +2221,12 @@ func (s *SQLQueueIntegrationSuite) TestCrashAfterRejectDoesNotLoseMessages() { // Start worker-2 with same consumer group — it polls and finds msg-C // after lease + visibility expire in the DB + signalCh := make(chan queueMySQL.HookSignal, 100) q2, err := queueMySQL.NewQueue(queueMySQL.Params{ DB: s.db, Logger: zaptest.NewLogger(t), MetricsScope: tally.NoopScope, + OnSignal: signalCh, }) require.NoError(t, err) defer q2.Close() @@ -2243,6 +2245,9 @@ func (s *SQLQueueIntegrationSuite) TestCrashAfterRejectDoesNotLoseMessages() { require.NoError(t, delivery.Ack(s.ctx)) t.Logf("Worker-2 recovered msg-C (attempt=%d)", delivery.Attempt()) + // Wait for the poll loop to advance the watermark after acking msg-C. + waitForSignal(t, signalCh, queueMySQL.SignalDeliveryCheck) + // Verify DLQ contains msg-B dlqTopic := topic + subConfig.DLQ.TopicSuffix dlqConfig := extqueue.DefaultSubscriptionConfig("worker-2", "crash-reject-cg") @@ -2254,7 +2259,9 @@ func (s *SQLQueueIntegrationSuite) TestCrashAfterRejectDoesNotLoseMessages() { assert.Equal(t, "msg-B", dlqDelivery.Message().ID, "msg-B should be in DLQ") require.NoError(t, dlqDelivery.Ack(s.ctx)) - // Verify consumer lag is 0 + // Verify consumer lag is 0. + // Wait for the poll loop so advanceWatermark has run after all acks. + waitForSignal(t, signalCh, queueMySQL.SignalDeliveryCheck) admin := queueAdmin.NewAdminStore(s.db) lags, err := admin.ConsumerLag(s.ctx, topic) require.NoError(t, err)