From 50471a3449460a0ec2691ac6ce074ed600e952e7 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Wed, 6 May 2026 17:12:51 -0700 Subject: [PATCH 01/11] feat: add optional DLQ support to Azure Storage Queue broker When AzureConfig.DLQ is set, messages whose DequeueCount exceeds MaxReceives (default 10) are enqueued to the DLQ and deleted from the source queue before processing is attempted, mirroring SQS redrive semantics. Infra failures (DLQ enqueue or source delete) are logged and return nil so transient errors do not exit the consume loop. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue.go | 38 +++++++++++++++++++++++++++++++ v1/config/config.go | 5 ++++ 2 files changed, 43 insertions(+) diff --git a/v1/brokers/azure/storage_queue.go b/v1/brokers/azure/storage_queue.go index 9c790c1c..04ac2da4 100644 --- a/v1/brokers/azure/storage_queue.go +++ b/v1/brokers/azure/storage_queue.go @@ -38,6 +38,9 @@ type Broker struct { cfg config.AzureConfig queueName string newQueueClient func(string) queueClient + dlqClient queueClient // nil ⇒ DLQ disabled + maxReceives int32 + dlqTTL time.Duration } // New creates new Broker instance @@ -48,6 +51,17 @@ func New(cnf *config.Config) iface.Broker { b.newQueueClient = func(name string) queueClient { return cnf.Azure.Client.NewQueueClient(name) } + if cnf.Azure.DLQ != nil { + b.dlqClient = cnf.Azure.DLQ + b.maxReceives = cnf.Azure.MaxReceives + if b.maxReceives == 0 { + b.maxReceives = 10 + } + b.dlqTTL = cnf.Azure.DLQTTL + if b.dlqTTL == 0 { + b.dlqTTL = 30 * 24 * time.Hour + } + } return b } @@ -261,6 +275,19 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces msg := delivery.Messages[0] + if b.dlqClient != nil && msg.DequeueCount != nil && *msg.DequeueCount > int64(b.maxReceives) { + if err := b.dlqOne(msg); err != nil { + log.ERROR.Printf("error enqueueing message %s to DLQ: %s", *msg.MessageID, err) + return nil // leave on source; visibility timeout will retry + } + if err := b.deleteOne(msg); err != nil { + log.ERROR.Printf("error deleting message %s from source after DLQ enqueue: %s", *msg.MessageID, err) + return nil // duplicate in DLQ on next attempt; do not exit consume loop + } + log.INFO.Printf("moved message %s to DLQ after %d receives", *msg.MessageID, *msg.DequeueCount) + return nil + } + sig := new(tasks.Signature) decoder := json.NewDecoder(strings.NewReader(*msg.MessageText)) decoder.UseNumber() @@ -333,6 +360,17 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces return err } +// dlqOne enqueues a message to the configured DLQ. +func (b *Broker) dlqOne(msg *azqueue.DequeuedMessage) error { + ttlSeconds := int32(b.dlqTTL.Seconds()) + _, err := b.dlqClient.EnqueueMessage( + context.Background(), + *msg.MessageText, + &azqueue.EnqueueMessageOptions{TimeToLive: &ttlSeconds}, + ) + return err +} + // deleteOne is a method delete a delivery from AWS SQS func (b *Broker) deleteOne(message *azqueue.DequeuedMessage) error { _, err := b.newQueueClient(b.queueName).DeleteMessage(context.Background(), *message.MessageID, *message.PopReceipt, nil) diff --git a/v1/config/config.go b/v1/config/config.go index 4c33477a..fdde5847 100644 --- a/v1/config/config.go +++ b/v1/config/config.go @@ -109,6 +109,11 @@ type AzureConfig struct { TTL time.Duration // Visibility timeout, same concept as SQS, how long to exclusively hold the message. VisibilityTimeout time.Duration + + // Optional DLQ. Set DLQ to enable; MaxReceives and DLQTTL have defaults when zero. + DLQ *azqueue.QueueClient // pre-bound DLQ queue client; nil ⇒ DLQ disabled + MaxReceives int32 // max DequeueCount before redriving; 0 with DLQ set ⇒ 10 + DLQTTL time.Duration // TTL for DLQ messages; 0 ⇒ 30 days } // RedisConfig ... From 172214101b1543bd143780a62048326e840441bb Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Wed, 6 May 2026 17:19:13 -0700 Subject: [PATCH 02/11] test: DLQ support for Azure Storage Queue broker Covers the seven cases from the plan: below/above threshold, enqueue failure with retry, source-delete failure with retry (verifying the at-least-once duplicate), DLQ disabled, and the MaxReceives/DLQTTL defaults. Retry cases swap the failing mock mid-test to confirm the message actually lands in the DLQ on the subsequent attempt. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue_export_test.go | 18 ++ v1/brokers/azure/storage_queue_test.go | 249 ++++++++++++++++++ 2 files changed, 267 insertions(+) diff --git a/v1/brokers/azure/storage_queue_export_test.go b/v1/brokers/azure/storage_queue_export_test.go index be1c4f60..11c55337 100644 --- a/v1/brokers/azure/storage_queue_export_test.go +++ b/v1/brokers/azure/storage_queue_export_test.go @@ -3,6 +3,7 @@ package azure import ( "context" "sync" + "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" "github.com/RichardKnop/machinery/v1/brokers/iface" @@ -85,3 +86,20 @@ func (b *Broker) GetStopReceivingChanForTest() chan int { func (b *Broker) ConsumeOneForTest(delivery azqueue.DequeueMessagesResponse, taskProcessor iface.TaskProcessor) error { return b.consumeOne(delivery, taskProcessor) } + +// SetDLQClientForTest configures the broker's DLQ client and applies the same +// defaulting logic as New() (0 maxReceives → 10, 0 dlqTTL → 30 days). +// Pass c=nil to disable DLQ. +func (b *Broker) SetDLQClientForTest(c queueClient, maxReceives int32, dlqTTL time.Duration) { + b.dlqClient = c + if c != nil { + b.maxReceives = maxReceives + if b.maxReceives == 0 { + b.maxReceives = 10 + } + b.dlqTTL = dlqTTL + if b.dlqTTL == 0 { + b.dlqTTL = 30 * 24 * time.Hour + } + } +} diff --git a/v1/brokers/azure/storage_queue_test.go b/v1/brokers/azure/storage_queue_test.go index 8d3e95cd..836448c8 100644 --- a/v1/brokers/azure/storage_queue_test.go +++ b/v1/brokers/azure/storage_queue_test.go @@ -2,6 +2,7 @@ package azure_test import ( "context" + "errors" "sync/atomic" "testing" "time" @@ -141,3 +142,251 @@ func TestStartConsuming_V1_ProcessesTask(t *testing.T) { func TestStartConsuming_V2_ProcessesTask(t *testing.T) { testStartConsumingProcessesTask(t, newTestV2Pool(1)) } + +// dlqTestDelivery builds a single-message DequeueMessagesResponse for DLQ tests. +// Invalid JSON is fine for tests that exercise the pre-decode DLQ check. +func dlqTestDelivery(dequeueCount int64, body string) azqueue.DequeueMessagesResponse { + msgID := "test-msg-id" + popReceipt := "test-pop-receipt" + return azqueue.DequeueMessagesResponse{ + Messages: []*azqueue.DequeuedMessage{{ + MessageID: &msgID, + PopReceipt: &popReceipt, + MessageText: &body, + DequeueCount: &dequeueCount, + }}, + } +} + +func TestConsumeOne_DLQ_BelowThreshold_ProcessesNormally(t *testing.T) { + t.Parallel() + + var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 + + dlqMock := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(sourceMock) + broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + + // DequeueCount == MaxReceives: not over threshold, DLQ must not trigger. + // Invalid JSON causes the normal decode-failure path (source delete, error returned). + broker.ConsumeOneForTest(dlqTestDelivery(10, "not-valid-json"), nil) + + assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DLQ should not be invoked at threshold") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source should be deleted via normal path") +} + +func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { + t.Parallel() + + var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 + var capturedContent string + + dlqMock := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, content string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + capturedContent = content + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(sourceMock) + broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + + const body = "original-message-body" + err := broker.ConsumeOneForTest(dlqTestDelivery(11, body), nil) + + require.NoError(t, err) + assert.Equal(t, int32(1), dlqEnqueueCalls.Load()) + assert.Equal(t, body, capturedContent, "DLQ should receive the original message body") + assert.Equal(t, int32(1), sourceDeleteCalls.Load()) +} + +func TestConsumeOne_DLQ_EnqueueFails_RetrySucceeds(t *testing.T) { + // Not parallel: broker state is mutated between the two consumeOne calls. + var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 + + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + failingDLQ := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, errors.New("DLQ unavailable") + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(sourceMock) + broker.SetDLQClientForTest(failingDLQ, 10, time.Hour) + + delivery := dlqTestDelivery(11, "some-body") + + // First attempt: DLQ enqueue fails — consumer must survive. + err := broker.ConsumeOneForTest(delivery, nil) + require.NoError(t, err, "consumer must survive DLQ enqueue failure") + assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DLQ enqueue was attempted") + assert.Equal(t, int32(0), sourceDeleteCalls.Load(), "source must not be deleted when DLQ fails") + + // Simulate visibility-timeout redelivery: swap in a working DLQ client. + workingDLQ := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + broker.SetDLQClientForTest(workingDLQ, 10, time.Hour) + + err = broker.ConsumeOneForTest(delivery, nil) + require.NoError(t, err) + assert.Equal(t, int32(2), dlqEnqueueCalls.Load(), "DLQ enqueue retried on second attempt") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after successful DLQ enqueue") +} + +func TestConsumeOne_DLQ_DeleteFails_RetryEnqueuesDuplicate(t *testing.T) { + // Not parallel: broker state is mutated between the two consumeOne calls. + var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 + + dlqMock := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + failingSourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, errors.New("source delete failed") + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(failingSourceMock) + broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + + delivery := dlqTestDelivery(11, "some-body") + + // First attempt: DLQ enqueue OK, source delete fails — consumer must survive. + err := broker.ConsumeOneForTest(delivery, nil) + require.NoError(t, err, "consumer must survive source delete failure") + assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DLQ enqueue succeeded") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source delete was attempted") + + // Simulate visibility-timeout redelivery: swap in a working source mock. + workingSourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + broker.SetMockClientForTest(workingSourceMock) + + err = broker.ConsumeOneForTest(delivery, nil) + require.NoError(t, err) + // DLQ receives a duplicate — this is the documented at-least-once semantic. + assert.Equal(t, int32(2), dlqEnqueueCalls.Load(), "DLQ receives a duplicate on retry (at-least-once)") + assert.Equal(t, int32(2), sourceDeleteCalls.Load(), "source successfully deleted on retry") +} + +func TestConsumeOne_DLQ_Disabled_IgnoresDequeueCount(t *testing.T) { + t.Parallel() + + var sourceDeleteCalls atomic.Int32 + + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := azure.NewTestBroker() // no DLQ configured + broker.SetMockClientForTest(sourceMock) + + // Very high DequeueCount, but DLQ is not configured — normal path taken. + broker.ConsumeOneForTest(dlqTestDelivery(100, "not-valid-json"), nil) + + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted via normal decode-failure path") +} + +func TestConsumeOne_DLQ_DefaultMaxReceives(t *testing.T) { + t.Parallel() + + var dlqEnqueueCalls atomic.Int32 + + dlqMock := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(sourceMock) + broker.SetDLQClientForTest(dlqMock, 0, time.Hour) // maxReceives=0 → default 10 + + // DequeueCount=10: at threshold, must NOT redrive. + broker.ConsumeOneForTest(dlqTestDelivery(10, "not-valid-json"), nil) + assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DequeueCount=10 must not trigger DLQ (default MaxReceives=10)") + + // DequeueCount=11: over threshold, must redrive. + err := broker.ConsumeOneForTest(dlqTestDelivery(11, "not-valid-json"), nil) + require.NoError(t, err) + assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DequeueCount=11 must trigger DLQ (default MaxReceives=10)") +} + +func TestConsumeOne_DLQ_DefaultTTL(t *testing.T) { + t.Parallel() + + var capturedTTL int32 + + dlqMock := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, opts *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + if opts != nil && opts.TimeToLive != nil { + capturedTTL = *opts.TimeToLive + } + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(sourceMock) + broker.SetDLQClientForTest(dlqMock, 10, 0) // dlqTTL=0 → default 30 days + + err := broker.ConsumeOneForTest(dlqTestDelivery(11, "body"), nil) + require.NoError(t, err) + + assert.Equal(t, int32((30*24*time.Hour).Seconds()), capturedTTL, "DLQ TTL must default to 30 days") +} From 382d04e850ea5710f21d3195936aa8073884c205 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Wed, 6 May 2026 17:22:54 -0700 Subject: [PATCH 03/11] refactor: use int64 for MaxReceives to match azqueue.DequeueCount type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avoids the int32→int64 cast in the DequeueCount comparison. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue.go | 4 ++-- v1/brokers/azure/storage_queue_export_test.go | 2 +- v1/config/config.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/v1/brokers/azure/storage_queue.go b/v1/brokers/azure/storage_queue.go index 04ac2da4..c290ca74 100644 --- a/v1/brokers/azure/storage_queue.go +++ b/v1/brokers/azure/storage_queue.go @@ -39,7 +39,7 @@ type Broker struct { queueName string newQueueClient func(string) queueClient dlqClient queueClient // nil ⇒ DLQ disabled - maxReceives int32 + maxReceives int64 dlqTTL time.Duration } @@ -275,7 +275,7 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces msg := delivery.Messages[0] - if b.dlqClient != nil && msg.DequeueCount != nil && *msg.DequeueCount > int64(b.maxReceives) { + if b.dlqClient != nil && msg.DequeueCount != nil && *msg.DequeueCount > b.maxReceives { if err := b.dlqOne(msg); err != nil { log.ERROR.Printf("error enqueueing message %s to DLQ: %s", *msg.MessageID, err) return nil // leave on source; visibility timeout will retry diff --git a/v1/brokers/azure/storage_queue_export_test.go b/v1/brokers/azure/storage_queue_export_test.go index 11c55337..3d9478ca 100644 --- a/v1/brokers/azure/storage_queue_export_test.go +++ b/v1/brokers/azure/storage_queue_export_test.go @@ -90,7 +90,7 @@ func (b *Broker) ConsumeOneForTest(delivery azqueue.DequeueMessagesResponse, tas // SetDLQClientForTest configures the broker's DLQ client and applies the same // defaulting logic as New() (0 maxReceives → 10, 0 dlqTTL → 30 days). // Pass c=nil to disable DLQ. -func (b *Broker) SetDLQClientForTest(c queueClient, maxReceives int32, dlqTTL time.Duration) { +func (b *Broker) SetDLQClientForTest(c queueClient, maxReceives int64, dlqTTL time.Duration) { b.dlqClient = c if c != nil { b.maxReceives = maxReceives diff --git a/v1/config/config.go b/v1/config/config.go index fdde5847..7be616fe 100644 --- a/v1/config/config.go +++ b/v1/config/config.go @@ -112,7 +112,7 @@ type AzureConfig struct { // Optional DLQ. Set DLQ to enable; MaxReceives and DLQTTL have defaults when zero. DLQ *azqueue.QueueClient // pre-bound DLQ queue client; nil ⇒ DLQ disabled - MaxReceives int32 // max DequeueCount before redriving; 0 with DLQ set ⇒ 10 + MaxReceives int64 // max DequeueCount before redriving; 0 with DLQ set ⇒ 10 DLQTTL time.Duration // TTL for DLQ messages; 0 ⇒ 30 days } From b483bc055dc58be133525fdabbbe66bde00280b0 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Thu, 7 May 2026 17:29:12 -0700 Subject: [PATCH 04/11] feat: match SQS deserialization-failure handling in ASQ broker On JSON decode failure, only delete from source once DequeueCount reaches maxReceiveCountBeforeDelete (15), mirroring the SQS broker. Below that threshold the message is left on the queue so a configured DLQ (threshold 10) can catch it first. Above the threshold it is deleted as a last resort. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue.go | 10 +++++-- .../azure/storage_queue_internal_test.go | 29 +++++++++++++++++-- v1/brokers/azure/storage_queue_test.go | 5 ++-- 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/v1/brokers/azure/storage_queue.go b/v1/brokers/azure/storage_queue.go index c290ca74..a1c1a9ab 100644 --- a/v1/brokers/azure/storage_queue.go +++ b/v1/brokers/azure/storage_queue.go @@ -20,6 +20,10 @@ import ( const ( maxDelay = time.Minute * 7 // Max supported Visibility Timeout + // maxReceiveCountBeforeDelete is the DequeueCount at which an undecodable + // message is deleted. Set above the default DLQ threshold (10) so that + // queues with a DLQ still route there first. + maxReceiveCountBeforeDelete int64 = 15 ) type queueClient interface { @@ -293,8 +297,10 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces decoder.UseNumber() if err := decoder.Decode(sig); err != nil { log.ERROR.Printf("unmarshal error. the delivery is %v", delivery) - if delErr := b.deleteOne(msg); delErr != nil { - log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, delErr) + if msg.DequeueCount != nil && *msg.DequeueCount >= maxReceiveCountBeforeDelete { + if delErr := b.deleteOne(msg); delErr != nil { + log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, delErr) + } } // Never return an error — doing so would kill the consumer loop. return nil diff --git a/v1/brokers/azure/storage_queue_internal_test.go b/v1/brokers/azure/storage_queue_internal_test.go index e16a3693..5fa63ec7 100644 --- a/v1/brokers/azure/storage_queue_internal_test.go +++ b/v1/brokers/azure/storage_queue_internal_test.go @@ -38,6 +38,12 @@ func makeDelivery(msgText, msgID, popReceipt string) azqueue.DequeueMessagesResp } } +func makeDeliveryWithCount(msgText, msgID, popReceipt string, dequeueCount int64) azqueue.DequeueMessagesResponse { + d := makeDelivery(msgText, msgID, popReceipt) + d.Messages[0].DequeueCount = &dequeueCount + return d +} + func TestBadRequestErrRegex(t *testing.T) { t.Parallel() @@ -87,7 +93,26 @@ func TestConsumeOne_ValidMessage_Success(t *testing.T) { assert.Equal(t, "msg-id", deletedID) } -func TestConsumeOne_InvalidJSON(t *testing.T) { +func TestConsumeOne_InvalidJSON_BelowDeleteThreshold(t *testing.T) { + deleted := false + client := &MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + deleted = true + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := NewTestBroker() + broker.SetMockClientForTest(client) + + // DequeueCount below maxReceiveCountBeforeDelete — leave on queue so a + // configured DLQ can still catch it before we give up and delete. + err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", maxReceiveCountBeforeDelete-1), nil) + assert.NoError(t, err) + assert.False(t, deleted) +} + +func TestConsumeOne_InvalidJSON_AtDeleteThreshold(t *testing.T) { deleted := false client := &MockClient{ DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { @@ -99,7 +124,7 @@ func TestConsumeOne_InvalidJSON(t *testing.T) { broker := NewTestBroker() broker.SetMockClientForTest(client) - err := broker.consumeOne(makeDelivery("not valid json", "msg-id", "pop-receipt"), nil) + err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", maxReceiveCountBeforeDelete), nil) assert.NoError(t, err) assert.True(t, deleted) } diff --git a/v1/brokers/azure/storage_queue_test.go b/v1/brokers/azure/storage_queue_test.go index 836448c8..9e6c9cdd 100644 --- a/v1/brokers/azure/storage_queue_test.go +++ b/v1/brokers/azure/storage_queue_test.go @@ -181,11 +181,12 @@ func TestConsumeOne_DLQ_BelowThreshold_ProcessesNormally(t *testing.T) { broker.SetDLQClientForTest(dlqMock, 10, time.Hour) // DequeueCount == MaxReceives: not over threshold, DLQ must not trigger. - // Invalid JSON causes the normal decode-failure path (source delete, error returned). + // Invalid JSON + count below delete threshold (15) — source must also be left + // alone so the DLQ can catch it on the next pop (count 11 > maxReceives 10). broker.ConsumeOneForTest(dlqTestDelivery(10, "not-valid-json"), nil) assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DLQ should not be invoked at threshold") - assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source should be deleted via normal path") + assert.Equal(t, int32(0), sourceDeleteCalls.Load(), "source should not be deleted; DLQ will catch it next pop") } func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { From bfaee14b6bd3e1433c09f96194a7014fcd16bea1 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Fri, 8 May 2026 10:29:13 -0700 Subject: [PATCH 05/11] fix: default DLQ maxReceives/TTL on <= 0 instead of == 0 Negative values (e.g. from misconfigured callers) now fall through to defaults rather than being used as-is. time.Duration is int64 so it can be negative; int64 MaxReceives could also be set negatively by mistake. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue.go | 4 ++-- v1/brokers/azure/storage_queue_export_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/v1/brokers/azure/storage_queue.go b/v1/brokers/azure/storage_queue.go index a1c1a9ab..dbc8d3b5 100644 --- a/v1/brokers/azure/storage_queue.go +++ b/v1/brokers/azure/storage_queue.go @@ -58,11 +58,11 @@ func New(cnf *config.Config) iface.Broker { if cnf.Azure.DLQ != nil { b.dlqClient = cnf.Azure.DLQ b.maxReceives = cnf.Azure.MaxReceives - if b.maxReceives == 0 { + if b.maxReceives <= 0 { b.maxReceives = 10 } b.dlqTTL = cnf.Azure.DLQTTL - if b.dlqTTL == 0 { + if b.dlqTTL <= 0 { b.dlqTTL = 30 * 24 * time.Hour } } diff --git a/v1/brokers/azure/storage_queue_export_test.go b/v1/brokers/azure/storage_queue_export_test.go index 3d9478ca..5add6c07 100644 --- a/v1/brokers/azure/storage_queue_export_test.go +++ b/v1/brokers/azure/storage_queue_export_test.go @@ -94,11 +94,11 @@ func (b *Broker) SetDLQClientForTest(c queueClient, maxReceives int64, dlqTTL ti b.dlqClient = c if c != nil { b.maxReceives = maxReceives - if b.maxReceives == 0 { + if b.maxReceives <= 0 { b.maxReceives = 10 } b.dlqTTL = dlqTTL - if b.dlqTTL == 0 { + if b.dlqTTL <= 0 { b.dlqTTL = 30 * 24 * time.Hour } } From 7a1ae19234a8ee2095b578c1f058dd292ee915d4 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Fri, 8 May 2026 10:33:12 -0700 Subject: [PATCH 06/11] test: cover New() DLQ defaults and TTL passthrough directly TestNew_DLQ_Defaults (internal) exercises the zero-value defaulting in New() itself rather than via the test helper. The redundant DefaultTTL test is removed; a TTL assertion is added to the existing above-threshold happy-path test instead. Co-Authored-By: Claude Sonnet 4.6 --- .../azure/storage_queue_internal_test.go | 16 +++++++++ v1/brokers/azure/storage_queue_test.go | 35 ++++--------------- 2 files changed, 22 insertions(+), 29 deletions(-) diff --git a/v1/brokers/azure/storage_queue_internal_test.go b/v1/brokers/azure/storage_queue_internal_test.go index 5fa63ec7..207d21cc 100644 --- a/v1/brokers/azure/storage_queue_internal_test.go +++ b/v1/brokers/azure/storage_queue_internal_test.go @@ -9,6 +9,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" "github.com/RichardKnop/machinery/v1/brokers/errs" + "github.com/RichardKnop/machinery/v1/config" "github.com/RichardKnop/machinery/v1/tasks" "github.com/stretchr/testify/assert" ) @@ -378,3 +379,18 @@ func TestDeleteOne_Error(t *testing.T) { err := broker.deleteOne(&azqueue.DequeuedMessage{MessageID: new("msg-id"), PopReceipt: new("pop-receipt")}) assert.ErrorContains(t, err, "delete error") } + +func TestNew_DLQ_Defaults(t *testing.T) { + t.Parallel() + + cnf := &config.Config{ + DefaultQueue: "test_queue", + Azure: &config.AzureConfig{ + Client: &azqueue.ServiceClient{}, + DLQ: &azqueue.QueueClient{}, // non-nil enables DLQ branch in New() + }, + } + b := New(cnf).(*Broker) + assert.Equal(t, int64(10), b.maxReceives, "maxReceives should default to 10 when zero") + assert.Equal(t, 30*24*time.Hour, b.dlqTTL, "dlqTTL should default to 30 days when zero") +} diff --git a/v1/brokers/azure/storage_queue_test.go b/v1/brokers/azure/storage_queue_test.go index 9e6c9cdd..f511278d 100644 --- a/v1/brokers/azure/storage_queue_test.go +++ b/v1/brokers/azure/storage_queue_test.go @@ -194,11 +194,15 @@ func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 var capturedContent string + var capturedTTL int32 dlqMock := &azure.MockClient{ - EnqueueFunc: func(_ context.Context, content string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + EnqueueFunc: func(_ context.Context, content string, opts *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { dlqEnqueueCalls.Add(1) capturedContent = content + if opts != nil && opts.TimeToLive != nil { + capturedTTL = *opts.TimeToLive + } return azqueue.EnqueueMessagesResponse{}, nil }, } @@ -219,6 +223,7 @@ func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { require.NoError(t, err) assert.Equal(t, int32(1), dlqEnqueueCalls.Load()) assert.Equal(t, body, capturedContent, "DLQ should receive the original message body") + assert.Equal(t, int32(time.Hour.Seconds()), capturedTTL, "DLQ enqueue should use the configured TTL") assert.Equal(t, int32(1), sourceDeleteCalls.Load()) } @@ -363,31 +368,3 @@ func TestConsumeOne_DLQ_DefaultMaxReceives(t *testing.T) { assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DequeueCount=11 must trigger DLQ (default MaxReceives=10)") } -func TestConsumeOne_DLQ_DefaultTTL(t *testing.T) { - t.Parallel() - - var capturedTTL int32 - - dlqMock := &azure.MockClient{ - EnqueueFunc: func(_ context.Context, _ string, opts *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { - if opts != nil && opts.TimeToLive != nil { - capturedTTL = *opts.TimeToLive - } - return azqueue.EnqueueMessagesResponse{}, nil - }, - } - sourceMock := &azure.MockClient{ - DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { - return azqueue.DeleteMessageResponse{}, nil - }, - } - - broker := azure.NewTestBroker() - broker.SetMockClientForTest(sourceMock) - broker.SetDLQClientForTest(dlqMock, 10, 0) // dlqTTL=0 → default 30 days - - err := broker.ConsumeOneForTest(dlqTestDelivery(11, "body"), nil) - require.NoError(t, err) - - assert.Equal(t, int32((30*24*time.Hour).Seconds()), capturedTTL, "DLQ TTL must default to 30 days") -} From da7739d54304411886dcda18a35034cb9701a1cf Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Fri, 8 May 2026 10:43:04 -0700 Subject: [PATCH 07/11] test: decouple DLQ tests from decode-failure behavior DLQ tests now use a valid JSON body (validDLQTaskBody const) instead of invalid JSON. Above-threshold tests pass nil processor since the DLQ path returns before decode. The disabled-DLQ test uses a countingProcessor and a registered task name to assert normal processing ran, replacing the IgnoreWhenTaskNotRegistered workaround. Decode-failure coverage remains in the dedicated InvalidJSON_* tests in the internal file. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue_test.go | 42 ++++++++++++++++++-------- 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/v1/brokers/azure/storage_queue_test.go b/v1/brokers/azure/storage_queue_test.go index f511278d..dc81dd71 100644 --- a/v1/brokers/azure/storage_queue_test.go +++ b/v1/brokers/azure/storage_queue_test.go @@ -13,10 +13,24 @@ import ( "github.com/RichardKnop/machinery/v1/brokers/iface" "github.com/RichardKnop/machinery/v1/common" "github.com/RichardKnop/machinery/v1/config" + "github.com/RichardKnop/machinery/v1/tasks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// validDLQTaskBody is a minimal valid task JSON used in DLQ tests that need a +// decodable message without exercising task processing logic. +const validDLQTaskBody = `{"UUID":"dlq-test-uuid","Name":"unknown-dlq-task"}` + +type countingProcessor struct{ count atomic.Int32 } + +func (p *countingProcessor) Process(_ *tasks.Signature, _ tasks.ExtendForSignatureFunc) error { + p.count.Add(1) + return nil +} +func (p *countingProcessor) CustomQueue() string { return "" } +func (p *countingProcessor) PreConsumeHandler() bool { return true } + func TestNew_ImplementsInterfaces(t *testing.T) { t.Parallel() @@ -180,13 +194,12 @@ func TestConsumeOne_DLQ_BelowThreshold_ProcessesNormally(t *testing.T) { broker.SetMockClientForTest(sourceMock) broker.SetDLQClientForTest(dlqMock, 10, time.Hour) - // DequeueCount == MaxReceives: not over threshold, DLQ must not trigger. - // Invalid JSON + count below delete threshold (15) — source must also be left - // alone so the DLQ can catch it on the next pop (count 11 > maxReceives 10). - broker.ConsumeOneForTest(dlqTestDelivery(10, "not-valid-json"), nil) + // DequeueCount == MaxReceives (10): not over threshold, DLQ must not trigger. + // The message is left on the queue; the next pop (count 11 > maxReceives 10) will redrive. + broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), nil) assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DLQ should not be invoked at threshold") - assert.Equal(t, int32(0), sourceDeleteCalls.Load(), "source should not be deleted; DLQ will catch it next pop") + assert.Equal(t, int32(0), sourceDeleteCalls.Load(), "unregistered task without handler is left on queue") } func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { @@ -217,7 +230,7 @@ func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { broker.SetMockClientForTest(sourceMock) broker.SetDLQClientForTest(dlqMock, 10, time.Hour) - const body = "original-message-body" + body := validDLQTaskBody err := broker.ConsumeOneForTest(dlqTestDelivery(11, body), nil) require.NoError(t, err) @@ -248,7 +261,7 @@ func TestConsumeOne_DLQ_EnqueueFails_RetrySucceeds(t *testing.T) { broker.SetMockClientForTest(sourceMock) broker.SetDLQClientForTest(failingDLQ, 10, time.Hour) - delivery := dlqTestDelivery(11, "some-body") + delivery := dlqTestDelivery(11, validDLQTaskBody) // First attempt: DLQ enqueue fails — consumer must survive. err := broker.ConsumeOneForTest(delivery, nil) @@ -292,7 +305,7 @@ func TestConsumeOne_DLQ_DeleteFails_RetryEnqueuesDuplicate(t *testing.T) { broker.SetMockClientForTest(failingSourceMock) broker.SetDLQClientForTest(dlqMock, 10, time.Hour) - delivery := dlqTestDelivery(11, "some-body") + delivery := dlqTestDelivery(11, validDLQTaskBody) // First attempt: DLQ enqueue OK, source delete fails — consumer must survive. err := broker.ConsumeOneForTest(delivery, nil) @@ -329,12 +342,15 @@ func TestConsumeOne_DLQ_Disabled_IgnoresDequeueCount(t *testing.T) { } broker := azure.NewTestBroker() // no DLQ configured + broker.SetRegisteredTaskNames([]string{"unknown-dlq-task"}) broker.SetMockClientForTest(sourceMock) - // Very high DequeueCount, but DLQ is not configured — normal path taken. - broker.ConsumeOneForTest(dlqTestDelivery(100, "not-valid-json"), nil) + processor := &countingProcessor{} + // Very high DequeueCount, but DLQ is not configured — normal processing path is taken. + broker.ConsumeOneForTest(dlqTestDelivery(100, validDLQTaskBody), processor) - assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted via normal decode-failure path") + assert.Equal(t, int32(1), processor.count.Load(), "task was processed normally") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after normal processing") } func TestConsumeOne_DLQ_DefaultMaxReceives(t *testing.T) { @@ -359,11 +375,11 @@ func TestConsumeOne_DLQ_DefaultMaxReceives(t *testing.T) { broker.SetDLQClientForTest(dlqMock, 0, time.Hour) // maxReceives=0 → default 10 // DequeueCount=10: at threshold, must NOT redrive. - broker.ConsumeOneForTest(dlqTestDelivery(10, "not-valid-json"), nil) + broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), nil) assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DequeueCount=10 must not trigger DLQ (default MaxReceives=10)") // DequeueCount=11: over threshold, must redrive. - err := broker.ConsumeOneForTest(dlqTestDelivery(11, "not-valid-json"), nil) + err := broker.ConsumeOneForTest(dlqTestDelivery(11, validDLQTaskBody), nil) require.NoError(t, err) assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DequeueCount=11 must trigger DLQ (default MaxReceives=10)") } From e9f4105e80ba61b5a8c2b4999f3c720f8632a2b9 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Fri, 8 May 2026 10:46:18 -0700 Subject: [PATCH 08/11] refactor: rename maxReceiveCountBeforeDelete to decodeFailureDeleteThreshold The old name implied the threshold applied generally; the new name makes clear it only governs deletion of messages that failed JSON decoding. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue.go | 6 +++--- v1/brokers/azure/storage_queue_internal_test.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/v1/brokers/azure/storage_queue.go b/v1/brokers/azure/storage_queue.go index dbc8d3b5..5bdf80b5 100644 --- a/v1/brokers/azure/storage_queue.go +++ b/v1/brokers/azure/storage_queue.go @@ -20,10 +20,10 @@ import ( const ( maxDelay = time.Minute * 7 // Max supported Visibility Timeout - // maxReceiveCountBeforeDelete is the DequeueCount at which an undecodable + // decodeFailureDeleteThreshold is the DequeueCount at which an undecodable // message is deleted. Set above the default DLQ threshold (10) so that // queues with a DLQ still route there first. - maxReceiveCountBeforeDelete int64 = 15 + decodeFailureDeleteThreshold int64 = 15 ) type queueClient interface { @@ -297,7 +297,7 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces decoder.UseNumber() if err := decoder.Decode(sig); err != nil { log.ERROR.Printf("unmarshal error. the delivery is %v", delivery) - if msg.DequeueCount != nil && *msg.DequeueCount >= maxReceiveCountBeforeDelete { + if msg.DequeueCount != nil && *msg.DequeueCount >= decodeFailureDeleteThreshold { if delErr := b.deleteOne(msg); delErr != nil { log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, delErr) } diff --git a/v1/brokers/azure/storage_queue_internal_test.go b/v1/brokers/azure/storage_queue_internal_test.go index 207d21cc..ee3c303a 100644 --- a/v1/brokers/azure/storage_queue_internal_test.go +++ b/v1/brokers/azure/storage_queue_internal_test.go @@ -106,9 +106,9 @@ func TestConsumeOne_InvalidJSON_BelowDeleteThreshold(t *testing.T) { broker := NewTestBroker() broker.SetMockClientForTest(client) - // DequeueCount below maxReceiveCountBeforeDelete — leave on queue so a + // DequeueCount below decodeFailureDeleteThreshold — leave on queue so a // configured DLQ can still catch it before we give up and delete. - err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", maxReceiveCountBeforeDelete-1), nil) + err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", decodeFailureDeleteThreshold-1), nil) assert.NoError(t, err) assert.False(t, deleted) } @@ -125,7 +125,7 @@ func TestConsumeOne_InvalidJSON_AtDeleteThreshold(t *testing.T) { broker := NewTestBroker() broker.SetMockClientForTest(client) - err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", maxReceiveCountBeforeDelete), nil) + err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", decodeFailureDeleteThreshold), nil) assert.NoError(t, err) assert.True(t, deleted) } From b099ae74a54ccc2c0c1027fc77384c92acbf74c2 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Fri, 8 May 2026 10:49:56 -0700 Subject: [PATCH 09/11] test: remove stale comment from dlqTestDelivery Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/v1/brokers/azure/storage_queue_test.go b/v1/brokers/azure/storage_queue_test.go index dc81dd71..40640510 100644 --- a/v1/brokers/azure/storage_queue_test.go +++ b/v1/brokers/azure/storage_queue_test.go @@ -158,7 +158,6 @@ func TestStartConsuming_V2_ProcessesTask(t *testing.T) { } // dlqTestDelivery builds a single-message DequeueMessagesResponse for DLQ tests. -// Invalid JSON is fine for tests that exercise the pre-decode DLQ check. func dlqTestDelivery(dequeueCount int64, body string) azqueue.DequeueMessagesResponse { msgID := "test-msg-id" popReceipt := "test-pop-receipt" From a79a6991430e6e567d6d6638cac29eaa666c7620 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Fri, 8 May 2026 10:53:58 -0700 Subject: [PATCH 10/11] test: assert task processor invocation in all DLQ tests Each DLQ test now wires a countingProcessor and asserts whether the task was processed. Above-threshold cases assert count=0 (DLQ path returns before decode). BelowThreshold and DefaultMaxReceives register the task and assert count=1 to confirm normal processing ran. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue_test.go | 36 +++++++++++++++++++------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/v1/brokers/azure/storage_queue_test.go b/v1/brokers/azure/storage_queue_test.go index 40640510..81db429e 100644 --- a/v1/brokers/azure/storage_queue_test.go +++ b/v1/brokers/azure/storage_queue_test.go @@ -190,15 +190,18 @@ func TestConsumeOne_DLQ_BelowThreshold_ProcessesNormally(t *testing.T) { } broker := azure.NewTestBroker() + broker.SetRegisteredTaskNames([]string{"unknown-dlq-task"}) broker.SetMockClientForTest(sourceMock) broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + processor := &countingProcessor{} // DequeueCount == MaxReceives (10): not over threshold, DLQ must not trigger. - // The message is left on the queue; the next pop (count 11 > maxReceives 10) will redrive. - broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), nil) + // The message processes normally; the next pop (count 11 > maxReceives 10) will redrive. + broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), processor) assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DLQ should not be invoked at threshold") - assert.Equal(t, int32(0), sourceDeleteCalls.Load(), "unregistered task without handler is left on queue") + assert.Equal(t, int32(1), processor.count.Load(), "task should be processed normally at threshold") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after normal processing") } func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { @@ -229,13 +232,15 @@ func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { broker.SetMockClientForTest(sourceMock) broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + processor := &countingProcessor{} body := validDLQTaskBody - err := broker.ConsumeOneForTest(dlqTestDelivery(11, body), nil) + err := broker.ConsumeOneForTest(dlqTestDelivery(11, body), processor) require.NoError(t, err) assert.Equal(t, int32(1), dlqEnqueueCalls.Load()) assert.Equal(t, body, capturedContent, "DLQ should receive the original message body") assert.Equal(t, int32(time.Hour.Seconds()), capturedTTL, "DLQ enqueue should use the configured TTL") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed when redriven to DLQ") assert.Equal(t, int32(1), sourceDeleteCalls.Load()) } @@ -260,13 +265,15 @@ func TestConsumeOne_DLQ_EnqueueFails_RetrySucceeds(t *testing.T) { broker.SetMockClientForTest(sourceMock) broker.SetDLQClientForTest(failingDLQ, 10, time.Hour) + processor := &countingProcessor{} delivery := dlqTestDelivery(11, validDLQTaskBody) // First attempt: DLQ enqueue fails — consumer must survive. - err := broker.ConsumeOneForTest(delivery, nil) + err := broker.ConsumeOneForTest(delivery, processor) require.NoError(t, err, "consumer must survive DLQ enqueue failure") assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DLQ enqueue was attempted") assert.Equal(t, int32(0), sourceDeleteCalls.Load(), "source must not be deleted when DLQ fails") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed while DLQ redrive is in progress") // Simulate visibility-timeout redelivery: swap in a working DLQ client. workingDLQ := &azure.MockClient{ @@ -277,10 +284,11 @@ func TestConsumeOne_DLQ_EnqueueFails_RetrySucceeds(t *testing.T) { } broker.SetDLQClientForTest(workingDLQ, 10, time.Hour) - err = broker.ConsumeOneForTest(delivery, nil) + err = broker.ConsumeOneForTest(delivery, processor) require.NoError(t, err) assert.Equal(t, int32(2), dlqEnqueueCalls.Load(), "DLQ enqueue retried on second attempt") assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after successful DLQ enqueue") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed when redriven to DLQ") } func TestConsumeOne_DLQ_DeleteFails_RetryEnqueuesDuplicate(t *testing.T) { @@ -304,13 +312,15 @@ func TestConsumeOne_DLQ_DeleteFails_RetryEnqueuesDuplicate(t *testing.T) { broker.SetMockClientForTest(failingSourceMock) broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + processor := &countingProcessor{} delivery := dlqTestDelivery(11, validDLQTaskBody) // First attempt: DLQ enqueue OK, source delete fails — consumer must survive. - err := broker.ConsumeOneForTest(delivery, nil) + err := broker.ConsumeOneForTest(delivery, processor) require.NoError(t, err, "consumer must survive source delete failure") assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DLQ enqueue succeeded") assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source delete was attempted") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed while DLQ redrive is in progress") // Simulate visibility-timeout redelivery: swap in a working source mock. workingSourceMock := &azure.MockClient{ @@ -321,11 +331,12 @@ func TestConsumeOne_DLQ_DeleteFails_RetryEnqueuesDuplicate(t *testing.T) { } broker.SetMockClientForTest(workingSourceMock) - err = broker.ConsumeOneForTest(delivery, nil) + err = broker.ConsumeOneForTest(delivery, processor) require.NoError(t, err) // DLQ receives a duplicate — this is the documented at-least-once semantic. assert.Equal(t, int32(2), dlqEnqueueCalls.Load(), "DLQ receives a duplicate on retry (at-least-once)") assert.Equal(t, int32(2), sourceDeleteCalls.Load(), "source successfully deleted on retry") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed when redriven to DLQ") } func TestConsumeOne_DLQ_Disabled_IgnoresDequeueCount(t *testing.T) { @@ -370,16 +381,21 @@ func TestConsumeOne_DLQ_DefaultMaxReceives(t *testing.T) { } broker := azure.NewTestBroker() + broker.SetRegisteredTaskNames([]string{"unknown-dlq-task"}) broker.SetMockClientForTest(sourceMock) broker.SetDLQClientForTest(dlqMock, 0, time.Hour) // maxReceives=0 → default 10 + processor := &countingProcessor{} + // DequeueCount=10: at threshold, must NOT redrive. - broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), nil) + broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), processor) assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DequeueCount=10 must not trigger DLQ (default MaxReceives=10)") + assert.Equal(t, int32(1), processor.count.Load(), "message should be processed normally at threshold") // DequeueCount=11: over threshold, must redrive. - err := broker.ConsumeOneForTest(dlqTestDelivery(11, validDLQTaskBody), nil) + err := broker.ConsumeOneForTest(dlqTestDelivery(11, validDLQTaskBody), processor) require.NoError(t, err) assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DequeueCount=11 must trigger DLQ (default MaxReceives=10)") + assert.Equal(t, int32(1), processor.count.Load(), "message must not be processed when redriven to DLQ") } From 9029da63bba470e3620ed49b2f68746621b16d28 Mon Sep 17 00:00:00 2001 From: Mark Tokutomi Date: Fri, 8 May 2026 11:03:39 -0700 Subject: [PATCH 11/11] test: combine threshold and default-maxReceives cases into ThresholdTransition Single test exercises count=10 (processes normally) then count=11 (redrives to DLQ) using an explicit maxReceives=10; the zero-default is already covered by TestNew_DLQ_Defaults in the internal file. Co-Authored-By: Claude Sonnet 4.6 --- v1/brokers/azure/storage_queue_test.go | 51 ++++++-------------------- 1 file changed, 11 insertions(+), 40 deletions(-) diff --git a/v1/brokers/azure/storage_queue_test.go b/v1/brokers/azure/storage_queue_test.go index 81db429e..b958490b 100644 --- a/v1/brokers/azure/storage_queue_test.go +++ b/v1/brokers/azure/storage_queue_test.go @@ -171,7 +171,7 @@ func dlqTestDelivery(dequeueCount int64, body string) azqueue.DequeueMessagesRes } } -func TestConsumeOne_DLQ_BelowThreshold_ProcessesNormally(t *testing.T) { +func TestConsumeOne_DLQ_ThresholdTransition(t *testing.T) { t.Parallel() var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 @@ -195,13 +195,19 @@ func TestConsumeOne_DLQ_BelowThreshold_ProcessesNormally(t *testing.T) { broker.SetDLQClientForTest(dlqMock, 10, time.Hour) processor := &countingProcessor{} - // DequeueCount == MaxReceives (10): not over threshold, DLQ must not trigger. - // The message processes normally; the next pop (count 11 > maxReceives 10) will redrive. - broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), processor) - assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DLQ should not be invoked at threshold") + // DequeueCount=10: at threshold, must NOT redrive — processes normally. + broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), processor) + assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DequeueCount=10 must not trigger DLQ") assert.Equal(t, int32(1), processor.count.Load(), "task should be processed normally at threshold") assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after normal processing") + + // DequeueCount=11: over threshold, must redrive. + err := broker.ConsumeOneForTest(dlqTestDelivery(11, validDLQTaskBody), processor) + require.NoError(t, err) + assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DequeueCount=11 must trigger DLQ") + assert.Equal(t, int32(1), processor.count.Load(), "message must not be processed when redriven to DLQ") + assert.Equal(t, int32(2), sourceDeleteCalls.Load(), "source deleted from DLQ path") } func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { @@ -363,39 +369,4 @@ func TestConsumeOne_DLQ_Disabled_IgnoresDequeueCount(t *testing.T) { assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after normal processing") } -func TestConsumeOne_DLQ_DefaultMaxReceives(t *testing.T) { - t.Parallel() - - var dlqEnqueueCalls atomic.Int32 - - dlqMock := &azure.MockClient{ - EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { - dlqEnqueueCalls.Add(1) - return azqueue.EnqueueMessagesResponse{}, nil - }, - } - sourceMock := &azure.MockClient{ - DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { - return azqueue.DeleteMessageResponse{}, nil - }, - } - - broker := azure.NewTestBroker() - broker.SetRegisteredTaskNames([]string{"unknown-dlq-task"}) - broker.SetMockClientForTest(sourceMock) - broker.SetDLQClientForTest(dlqMock, 0, time.Hour) // maxReceives=0 → default 10 - - processor := &countingProcessor{} - - // DequeueCount=10: at threshold, must NOT redrive. - broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), processor) - assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DequeueCount=10 must not trigger DLQ (default MaxReceives=10)") - assert.Equal(t, int32(1), processor.count.Load(), "message should be processed normally at threshold") - - // DequeueCount=11: over threshold, must redrive. - err := broker.ConsumeOneForTest(dlqTestDelivery(11, validDLQTaskBody), processor) - require.NoError(t, err) - assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DequeueCount=11 must trigger DLQ (default MaxReceives=10)") - assert.Equal(t, int32(1), processor.count.Load(), "message must not be processed when redriven to DLQ") -}