Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 46 additions & 2 deletions v1/brokers/azure/storage_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (

const (
maxDelay = time.Minute * 7 // Max supported Visibility Timeout
// decodeFailureDeleteThreshold is the DequeueCount at which an undecodable
// message is deleted. Set above the default DLQ threshold (10) so that
// queues with a DLQ still route there first.
decodeFailureDeleteThreshold int64 = 15
)

type queueClient interface {
Expand All @@ -38,6 +42,9 @@ type Broker struct {
cfg config.AzureConfig
queueName string
newQueueClient func(string) queueClient
dlqClient queueClient // nil ⇒ DLQ disabled
maxReceives int64
dlqTTL time.Duration
}

// New creates new Broker instance
Expand All @@ -48,6 +55,17 @@ func New(cnf *config.Config) iface.Broker {
b.newQueueClient = func(name string) queueClient {
return cnf.Azure.Client.NewQueueClient(name)
}
if cnf.Azure.DLQ != nil {
b.dlqClient = cnf.Azure.DLQ
b.maxReceives = cnf.Azure.MaxReceives
if b.maxReceives <= 0 {
b.maxReceives = 10
}
b.dlqTTL = cnf.Azure.DLQTTL
if b.dlqTTL <= 0 {
b.dlqTTL = 30 * 24 * time.Hour
}
}

return b
}
Expand Down Expand Up @@ -261,13 +279,28 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces

msg := delivery.Messages[0]

if b.dlqClient != nil && msg.DequeueCount != nil && *msg.DequeueCount > b.maxReceives {
if err := b.dlqOne(msg); err != nil {
log.ERROR.Printf("error enqueueing message %s to DLQ: %s", *msg.MessageID, err)
return nil // leave on source; visibility timeout will retry
}
if err := b.deleteOne(msg); err != nil {
log.ERROR.Printf("error deleting message %s from source after DLQ enqueue: %s", *msg.MessageID, err)
return nil // duplicate in DLQ on next attempt; do not exit consume loop
}
log.INFO.Printf("moved message %s to DLQ after %d receives", *msg.MessageID, *msg.DequeueCount)
return nil
}

sig := new(tasks.Signature)
decoder := json.NewDecoder(strings.NewReader(*msg.MessageText))
decoder.UseNumber()
if err := decoder.Decode(sig); err != nil {
log.ERROR.Printf("unmarshal error. the delivery is %v", delivery)
if delErr := b.deleteOne(msg); delErr != nil {
log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, delErr)
if msg.DequeueCount != nil && *msg.DequeueCount >= decodeFailureDeleteThreshold {
if delErr := b.deleteOne(msg); delErr != nil {
log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, delErr)
}
}
// Never return an error — doing so would kill the consumer loop.
return nil
Expand Down Expand Up @@ -333,6 +366,17 @@ func (b *Broker) consumeOne(delivery azqueue.DequeueMessagesResponse, taskProces
return err
}

// dlqOne enqueues a message to the configured DLQ.
func (b *Broker) dlqOne(msg *azqueue.DequeuedMessage) error {
ttlSeconds := int32(b.dlqTTL.Seconds())
_, err := b.dlqClient.EnqueueMessage(
context.Background(),
*msg.MessageText,
&azqueue.EnqueueMessageOptions{TimeToLive: &ttlSeconds},
)
return err
}

// deleteOne is a method delete a delivery from AWS SQS
func (b *Broker) deleteOne(message *azqueue.DequeuedMessage) error {
_, err := b.newQueueClient(b.queueName).DeleteMessage(context.Background(), *message.MessageID, *message.PopReceipt, nil)
Expand Down
18 changes: 18 additions & 0 deletions v1/brokers/azure/storage_queue_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package azure
import (
"context"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/RichardKnop/machinery/v1/brokers/iface"
Expand Down Expand Up @@ -85,3 +86,20 @@ func (b *Broker) GetStopReceivingChanForTest() chan int {
func (b *Broker) ConsumeOneForTest(delivery azqueue.DequeueMessagesResponse, taskProcessor iface.TaskProcessor) error {
return b.consumeOne(delivery, taskProcessor)
}

// SetDLQClientForTest configures the broker's DLQ client and applies the same
// defaulting logic as New() (0 maxReceives → 10, 0 dlqTTL → 30 days).
// Pass c=nil to disable DLQ.
func (b *Broker) SetDLQClientForTest(c queueClient, maxReceives int64, dlqTTL time.Duration) {
b.dlqClient = c
if c != nil {
b.maxReceives = maxReceives
if b.maxReceives <= 0 {
b.maxReceives = 10
}
b.dlqTTL = dlqTTL
if b.dlqTTL <= 0 {
b.dlqTTL = 30 * 24 * time.Hour
}
}
}
45 changes: 43 additions & 2 deletions v1/brokers/azure/storage_queue_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue"
"github.com/RichardKnop/machinery/v1/brokers/errs"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -38,6 +39,12 @@ func makeDelivery(msgText, msgID, popReceipt string) azqueue.DequeueMessagesResp
}
}

func makeDeliveryWithCount(msgText, msgID, popReceipt string, dequeueCount int64) azqueue.DequeueMessagesResponse {
d := makeDelivery(msgText, msgID, popReceipt)
d.Messages[0].DequeueCount = &dequeueCount
return d
}

func TestBadRequestErrRegex(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -87,7 +94,7 @@ func TestConsumeOne_ValidMessage_Success(t *testing.T) {
assert.Equal(t, "msg-id", deletedID)
}

func TestConsumeOne_InvalidJSON(t *testing.T) {
func TestConsumeOne_InvalidJSON_BelowDeleteThreshold(t *testing.T) {
deleted := false
client := &MockClient{
DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) {
Expand All @@ -99,7 +106,26 @@ func TestConsumeOne_InvalidJSON(t *testing.T) {
broker := NewTestBroker()
broker.SetMockClientForTest(client)

err := broker.consumeOne(makeDelivery("not valid json", "msg-id", "pop-receipt"), nil)
// DequeueCount below decodeFailureDeleteThreshold — leave on queue so a
// configured DLQ can still catch it before we give up and delete.
err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", decodeFailureDeleteThreshold-1), nil)
assert.NoError(t, err)
assert.False(t, deleted)
}

func TestConsumeOne_InvalidJSON_AtDeleteThreshold(t *testing.T) {
deleted := false
client := &MockClient{
DeleteFunc: func(_ context.Context, _, _ string, _ *azqueue.DeleteMessageOptions) (azqueue.DeleteMessageResponse, error) {
deleted = true
return azqueue.DeleteMessageResponse{}, nil
},
}

broker := NewTestBroker()
broker.SetMockClientForTest(client)

err := broker.consumeOne(makeDeliveryWithCount("not valid json", "msg-id", "pop-receipt", decodeFailureDeleteThreshold), nil)
assert.NoError(t, err)
assert.True(t, deleted)
}
Expand Down Expand Up @@ -353,3 +379,18 @@ func TestDeleteOne_Error(t *testing.T) {
err := broker.deleteOne(&azqueue.DequeuedMessage{MessageID: new("msg-id"), PopReceipt: new("pop-receipt")})
assert.ErrorContains(t, err, "delete error")
}

func TestNew_DLQ_Defaults(t *testing.T) {
t.Parallel()

cnf := &config.Config{
DefaultQueue: "test_queue",
Azure: &config.AzureConfig{
Client: &azqueue.ServiceClient{},
DLQ: &azqueue.QueueClient{}, // non-nil enables DLQ branch in New()
},
}
b := New(cnf).(*Broker)
assert.Equal(t, int64(10), b.maxReceives, "maxReceives should default to 10 when zero")
assert.Equal(t, 30*24*time.Hour, b.dlqTTL, "dlqTTL should default to 30 days when zero")
}
Loading
Loading