diff --git a/v1/brokers/azure/storage_queue.go b/v1/brokers/azure/storage_queue.go index 9c790c1c..5bdf80b5 100644 --- a/v1/brokers/azure/storage_queue.go +++ b/v1/brokers/azure/storage_queue.go @@ -20,6 +20,10 @@ import ( const ( maxDelay = time.Minute * 7 // Max supported Visibility Timeout + // decodeFailureDeleteThreshold is the DequeueCount at which an undecodable + // message is deleted. Set above the default DLQ threshold (10) so that + // queues with a DLQ still route there first. + decodeFailureDeleteThreshold int64 = 15 ) type queueClient interface { @@ -38,6 +42,9 @@ type Broker struct { cfg config.AzureConfig queueName string newQueueClient func(string) queueClient + dlqClient queueClient // nil ⇒ DLQ disabled + maxReceives int64 + dlqTTL time.Duration } // New creates new Broker instance @@ -48,6 +55,17 @@ func New(cnf *config.Config) iface.Broker { b.newQueueClient = func(name string) queueClient { return cnf.Azure.Client.NewQueueClient(name) } + if cnf.Azure.DLQ != nil { + b.dlqClient = cnf.Azure.DLQ + b.maxReceives = cnf.Azure.MaxReceives + if b.maxReceives <= 0 { + b.maxReceives = 10 + } + b.dlqTTL = cnf.Azure.DLQTTL + if b.dlqTTL <= 0 { + b.dlqTTL = 30 * 24 * time.Hour + } + } return b } @@ -261,13 +279,28 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces msg := delivery.Messages[0] + if b.dlqClient != nil && msg.DequeueCount != nil && *msg.DequeueCount > b.maxReceives { + if err := b.dlqOne(msg); err != nil { + log.ERROR.Printf("error enqueueing message %s to DLQ: %s", *msg.MessageID, err) + return nil // leave on source; visibility timeout will retry + } + if err := b.deleteOne(msg); err != nil { + log.ERROR.Printf("error deleting message %s from source after DLQ enqueue: %s", *msg.MessageID, err) + return nil // duplicate in DLQ on next attempt; do not exit consume loop + } + log.INFO.Printf("moved message %s to DLQ after %d receives", *msg.MessageID, *msg.DequeueCount) + return nil + } + sig := new(tasks.Signature) decoder := json.NewDecoder(strings.NewReader(*msg.MessageText)) decoder.UseNumber() if err := decoder.Decode(sig); err != nil { log.ERROR.Printf("unmarshal error. the delivery is %v", delivery) - if delErr := b.deleteOne(msg); delErr != nil { - log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, delErr) + if msg.DequeueCount != nil && *msg.DequeueCount >= decodeFailureDeleteThreshold { + if delErr := b.deleteOne(msg); delErr != nil { + log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, delErr) + } } // Never return an error — doing so would kill the consumer loop. return nil @@ -333,6 +366,17 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces return err } +// dlqOne enqueues a message to the configured DLQ. +func (b *Broker) dlqOne(msg *azqueue.DequeuedMessage) error { + ttlSeconds := int32(b.dlqTTL.Seconds()) + _, err := b.dlqClient.EnqueueMessage( + context.Background(), + *msg.MessageText, + &azqueue.EnqueueMessageOptions{TimeToLive: &ttlSeconds}, + ) + return err +} + // deleteOne is a method delete a delivery from AWS SQS func (b *Broker) deleteOne(message *azqueue.DequeuedMessage) error { _, err := b.newQueueClient(b.queueName).DeleteMessage(context.Background(), *message.MessageID, *message.PopReceipt, nil) diff --git a/v1/brokers/azure/storage_queue_export_test.go b/v1/brokers/azure/storage_queue_export_test.go index be1c4f60..5add6c07 100644 --- a/v1/brokers/azure/storage_queue_export_test.go +++ b/v1/brokers/azure/storage_queue_export_test.go @@ -3,6 +3,7 @@ package azure import ( "context" "sync" + "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" "github.com/RichardKnop/machinery/v1/brokers/iface" @@ -85,3 +86,20 @@ func (b *Broker) GetStopReceivingChanForTest() chan int { func (b *Broker) ConsumeOneForTest(delivery azqueue.DequeueMessagesResponse, taskProcessor iface.TaskProcessor) error { return b.consumeOne(delivery, taskProcessor) } + +// SetDLQClientForTest configures the broker's DLQ client and applies the same +// defaulting logic as New() (0 maxReceives → 10, 0 dlqTTL → 30 days). +// Pass c=nil to disable DLQ. +func (b *Broker) SetDLQClientForTest(c queueClient, maxReceives int64, dlqTTL time.Duration) { + b.dlqClient = c + if c != nil { + b.maxReceives = maxReceives + if b.maxReceives <= 0 { + b.maxReceives = 10 + } + b.dlqTTL = dlqTTL + if b.dlqTTL <= 0 { + b.dlqTTL = 30 * 24 * time.Hour + } + } +} diff --git a/v1/brokers/azure/storage_queue_internal_test.go b/v1/brokers/azure/storage_queue_internal_test.go index e16a3693..ee3c303a 100644 --- a/v1/brokers/azure/storage_queue_internal_test.go +++ b/v1/brokers/azure/storage_queue_internal_test.go @@ -9,6 +9,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue" "github.com/RichardKnop/machinery/v1/brokers/errs" + "github.com/RichardKnop/machinery/v1/config" "github.com/RichardKnop/machinery/v1/tasks" "github.com/stretchr/testify/assert" ) @@ -38,6 +39,12 @@ func makeDelivery(msgText, msgID, popReceipt string) azqueue.DequeueMessagesResp } } +func makeDeliveryWithCount(msgText, msgID, popReceipt string, dequeueCount int64) azqueue.DequeueMessagesResponse { + d := makeDelivery(msgText, msgID, popReceipt) + d.Messages[0].DequeueCount = &dequeueCount + return d +} + func TestBadRequestErrRegex(t *testing.T) { t.Parallel() @@ -87,7 +94,7 @@ func TestConsumeOne_ValidMessage_Success(t *testing.T) { assert.Equal(t, "msg-id", deletedID) } -func TestConsumeOne_InvalidJSON(t *testing.T) { +func TestConsumeOne_InvalidJSON_BelowDeleteThreshold(t *testing.T) { deleted := false client := &MockClient{ DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { @@ -99,7 +106,26 @@ func TestConsumeOne_InvalidJSON(t *testing.T) { broker := NewTestBroker() broker.SetMockClientForTest(client) - err := broker.consumeOne(makeDelivery("not valid json", "msg-id", "pop-receipt"), nil) + // DequeueCount below decodeFailureDeleteThreshold — leave on queue so a + // configured DLQ can still catch it before we give up and delete. + err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", decodeFailureDeleteThreshold-1), nil) + assert.NoError(t, err) + assert.False(t, deleted) +} + +func TestConsumeOne_InvalidJSON_AtDeleteThreshold(t *testing.T) { + deleted := false + client := &MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + deleted = true + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := NewTestBroker() + broker.SetMockClientForTest(client) + + err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", decodeFailureDeleteThreshold), nil) assert.NoError(t, err) assert.True(t, deleted) } @@ -353,3 +379,18 @@ func TestDeleteOne_Error(t *testing.T) { err := broker.deleteOne(&azqueue.DequeuedMessage{MessageID: new("msg-id"), PopReceipt: new("pop-receipt")}) assert.ErrorContains(t, err, "delete error") } + +func TestNew_DLQ_Defaults(t *testing.T) { + t.Parallel() + + cnf := &config.Config{ + DefaultQueue: "test_queue", + Azure: &config.AzureConfig{ + Client: &azqueue.ServiceClient{}, + DLQ: &azqueue.QueueClient{}, // non-nil enables DLQ branch in New() + }, + } + b := New(cnf).(*Broker) + assert.Equal(t, int64(10), b.maxReceives, "maxReceives should default to 10 when zero") + assert.Equal(t, 30*24*time.Hour, b.dlqTTL, "dlqTTL should default to 30 days when zero") +} diff --git a/v1/brokers/azure/storage_queue_test.go b/v1/brokers/azure/storage_queue_test.go index 8d3e95cd..b958490b 100644 --- a/v1/brokers/azure/storage_queue_test.go +++ b/v1/brokers/azure/storage_queue_test.go @@ -2,6 +2,7 @@ package azure_test import ( "context" + "errors" "sync/atomic" "testing" "time" @@ -12,10 +13,24 @@ import ( "github.com/RichardKnop/machinery/v1/brokers/iface" "github.com/RichardKnop/machinery/v1/common" "github.com/RichardKnop/machinery/v1/config" + "github.com/RichardKnop/machinery/v1/tasks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) +// validDLQTaskBody is a minimal valid task JSON used in DLQ tests that need a +// decodable message without exercising task processing logic. +const validDLQTaskBody = `{"UUID":"dlq-test-uuid","Name":"unknown-dlq-task"}` + +type countingProcessor struct{ count atomic.Int32 } + +func (p *countingProcessor) Process(_ *tasks.Signature, _ tasks.ExtendForSignatureFunc) error { + p.count.Add(1) + return nil +} +func (p *countingProcessor) CustomQueue() string { return "" } +func (p *countingProcessor) PreConsumeHandler() bool { return true } + func TestNew_ImplementsInterfaces(t *testing.T) { t.Parallel() @@ -141,3 +156,217 @@ func TestStartConsuming_V1_ProcessesTask(t *testing.T) { func TestStartConsuming_V2_ProcessesTask(t *testing.T) { testStartConsumingProcessesTask(t, newTestV2Pool(1)) } + +// dlqTestDelivery builds a single-message DequeueMessagesResponse for DLQ tests. +func dlqTestDelivery(dequeueCount int64, body string) azqueue.DequeueMessagesResponse { + msgID := "test-msg-id" + popReceipt := "test-pop-receipt" + return azqueue.DequeueMessagesResponse{ + Messages: []*azqueue.DequeuedMessage{{ + MessageID: &msgID, + PopReceipt: &popReceipt, + MessageText: &body, + DequeueCount: &dequeueCount, + }}, + } +} + +func TestConsumeOne_DLQ_ThresholdTransition(t *testing.T) { + t.Parallel() + + var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 + + dlqMock := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := azure.NewTestBroker() + broker.SetRegisteredTaskNames([]string{"unknown-dlq-task"}) + broker.SetMockClientForTest(sourceMock) + broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + + processor := &countingProcessor{} + + // DequeueCount=10: at threshold, must NOT redrive — processes normally. + broker.ConsumeOneForTest(dlqTestDelivery(10, validDLQTaskBody), processor) + assert.Equal(t, int32(0), dlqEnqueueCalls.Load(), "DequeueCount=10 must not trigger DLQ") + assert.Equal(t, int32(1), processor.count.Load(), "task should be processed normally at threshold") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after normal processing") + + // DequeueCount=11: over threshold, must redrive. + err := broker.ConsumeOneForTest(dlqTestDelivery(11, validDLQTaskBody), processor) + require.NoError(t, err) + assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DequeueCount=11 must trigger DLQ") + assert.Equal(t, int32(1), processor.count.Load(), "message must not be processed when redriven to DLQ") + assert.Equal(t, int32(2), sourceDeleteCalls.Load(), "source deleted from DLQ path") +} + +func TestConsumeOne_DLQ_AboveThreshold_Redrives(t *testing.T) { + t.Parallel() + + var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 + var capturedContent string + var capturedTTL int32 + + dlqMock := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, content string, opts *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + capturedContent = content + if opts != nil && opts.TimeToLive != nil { + capturedTTL = *opts.TimeToLive + } + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(sourceMock) + broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + + processor := &countingProcessor{} + body := validDLQTaskBody + err := broker.ConsumeOneForTest(dlqTestDelivery(11, body), processor) + + require.NoError(t, err) + assert.Equal(t, int32(1), dlqEnqueueCalls.Load()) + assert.Equal(t, body, capturedContent, "DLQ should receive the original message body") + assert.Equal(t, int32(time.Hour.Seconds()), capturedTTL, "DLQ enqueue should use the configured TTL") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed when redriven to DLQ") + assert.Equal(t, int32(1), sourceDeleteCalls.Load()) +} + +func TestConsumeOne_DLQ_EnqueueFails_RetrySucceeds(t *testing.T) { + // Not parallel: broker state is mutated between the two consumeOne calls. + var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 + + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + failingDLQ := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, errors.New("DLQ unavailable") + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(sourceMock) + broker.SetDLQClientForTest(failingDLQ, 10, time.Hour) + + processor := &countingProcessor{} + delivery := dlqTestDelivery(11, validDLQTaskBody) + + // First attempt: DLQ enqueue fails — consumer must survive. + err := broker.ConsumeOneForTest(delivery, processor) + require.NoError(t, err, "consumer must survive DLQ enqueue failure") + assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DLQ enqueue was attempted") + assert.Equal(t, int32(0), sourceDeleteCalls.Load(), "source must not be deleted when DLQ fails") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed while DLQ redrive is in progress") + + // Simulate visibility-timeout redelivery: swap in a working DLQ client. + workingDLQ := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + broker.SetDLQClientForTest(workingDLQ, 10, time.Hour) + + err = broker.ConsumeOneForTest(delivery, processor) + require.NoError(t, err) + assert.Equal(t, int32(2), dlqEnqueueCalls.Load(), "DLQ enqueue retried on second attempt") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after successful DLQ enqueue") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed when redriven to DLQ") +} + +func TestConsumeOne_DLQ_DeleteFails_RetryEnqueuesDuplicate(t *testing.T) { + // Not parallel: broker state is mutated between the two consumeOne calls. + var dlqEnqueueCalls, sourceDeleteCalls atomic.Int32 + + dlqMock := &azure.MockClient{ + EnqueueFunc: func(_ context.Context, _ string, _ *azqueue.EnqueueMessageOptions) (azqueue.EnqueueMessagesResponse, error) { + dlqEnqueueCalls.Add(1) + return azqueue.EnqueueMessagesResponse{}, nil + }, + } + failingSourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, errors.New("source delete failed") + }, + } + + broker := azure.NewTestBroker() + broker.SetMockClientForTest(failingSourceMock) + broker.SetDLQClientForTest(dlqMock, 10, time.Hour) + + processor := &countingProcessor{} + delivery := dlqTestDelivery(11, validDLQTaskBody) + + // First attempt: DLQ enqueue OK, source delete fails — consumer must survive. + err := broker.ConsumeOneForTest(delivery, processor) + require.NoError(t, err, "consumer must survive source delete failure") + assert.Equal(t, int32(1), dlqEnqueueCalls.Load(), "DLQ enqueue succeeded") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source delete was attempted") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed while DLQ redrive is in progress") + + // Simulate visibility-timeout redelivery: swap in a working source mock. + workingSourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + broker.SetMockClientForTest(workingSourceMock) + + err = broker.ConsumeOneForTest(delivery, processor) + require.NoError(t, err) + // DLQ receives a duplicate — this is the documented at-least-once semantic. + assert.Equal(t, int32(2), dlqEnqueueCalls.Load(), "DLQ receives a duplicate on retry (at-least-once)") + assert.Equal(t, int32(2), sourceDeleteCalls.Load(), "source successfully deleted on retry") + assert.Equal(t, int32(0), processor.count.Load(), "message must not be processed when redriven to DLQ") +} + +func TestConsumeOne_DLQ_Disabled_IgnoresDequeueCount(t *testing.T) { + t.Parallel() + + var sourceDeleteCalls atomic.Int32 + + sourceMock := &azure.MockClient{ + DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) { + sourceDeleteCalls.Add(1) + return azqueue.DeleteMessageResponse{}, nil + }, + } + + broker := azure.NewTestBroker() // no DLQ configured + broker.SetRegisteredTaskNames([]string{"unknown-dlq-task"}) + broker.SetMockClientForTest(sourceMock) + + processor := &countingProcessor{} + // Very high DequeueCount, but DLQ is not configured — normal processing path is taken. + broker.ConsumeOneForTest(dlqTestDelivery(100, validDLQTaskBody), processor) + + assert.Equal(t, int32(1), processor.count.Load(), "task was processed normally") + assert.Equal(t, int32(1), sourceDeleteCalls.Load(), "source deleted after normal processing") +} + + diff --git a/v1/config/config.go b/v1/config/config.go index 4c33477a..7be616fe 100644 --- a/v1/config/config.go +++ b/v1/config/config.go @@ -109,6 +109,11 @@ type AzureConfig struct { TTL time.Duration // Visibility timeout, same concept as SQS, how long to exclusively hold the message. VisibilityTimeout time.Duration + + // Optional DLQ. Set DLQ to enable; MaxReceives and DLQTTL have defaults when zero. + DLQ *azqueue.QueueClient // pre-bound DLQ queue client; nil ⇒ DLQ disabled + MaxReceives int64 // max DequeueCount before redriving; 0 with DLQ set ⇒ 10 + DLQTTL time.Duration // TTL for DLQ messages; 0 ⇒ 30 days } // RedisConfig ...