Skip to content

Commit c53529e

Browse files
Copilotkarenychen
andauthored
[sender] Add SendAsBatch() method with multiple batch support and deprecate SendMessageBatch() (#279)
* Initial plan * Implement SendAsBatch method with multiple batch support and deprecate SendMessageBatch Co-authored-by: karenychen <56372250+karenychen@users.noreply.github.com> * Add comprehensive documentation and usage examples for SendAsBatch method Co-authored-by: karenychen <56372250+karenychen@users.noreply.github.com> * Refactor SendAsBatch to follow Azure Service Bus patterns and eliminate code duplication Co-authored-by: karenychen <56372250+karenychen@users.noreply.github.com> * Remove example usage docs and reorder timeout configuration in SendAsBatch method Co-authored-by: karenychen <56372250+karenychen@users.noreply.github.com> * ut * clean up * fix * fix * Change SendAsBatch to return error for empty message arrays instead of sending empty batches Co-authored-by: karenychen <56372250+karenychen@users.noreply.github.com> * fix ut --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: karenychen <56372250+karenychen@users.noreply.github.com> Co-authored-by: Karen Chen <karenchen@microsoft.com>
1 parent f895dc1 commit c53529e

2 files changed

Lines changed: 251 additions & 21 deletions

File tree

v2/sender.go

Lines changed: 82 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package shuttle
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"reflect"
78
"sync"
@@ -20,6 +21,14 @@ const (
2021
// MessageBody is a type to represent that an input message body can be of any type
2122
type MessageBody any
2223

24+
// SendAsBatchOptions contains options for the SendAsBatch method
25+
type SendAsBatchOptions struct {
26+
// AllowMultipleBatch when true, allows splitting large message arrays into multiple batches.
27+
// When false, behaves like the original SendMessageBatch method.
28+
// Default: false
29+
AllowMultipleBatch bool
30+
}
31+
2332
// AzServiceBusSender is satisfied by *azservicebus.Sender
2433
type AzServiceBusSender interface {
2534
SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
@@ -138,31 +147,87 @@ func (d *Sender) ToServiceBusMessage(
138147
return msg, nil
139148
}
140149

141-
// SendMessageBatch sends the array of azservicebus messages as a batch.
142-
func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error {
150+
// SendAsBatch sends the array of azservicebus messages as batches.
151+
// When options.AllowMultipleBatch is true, large message arrays are split into multiple batches.
152+
// When options.AllowMultipleBatch is false, behaves like SendMessageBatch (fails if messages don't fit in single batch).
153+
func (d *Sender) SendAsBatch(ctx context.Context, messages []*azservicebus.Message, options *SendAsBatchOptions) error {
143154
// Check if there is a context error before doing anything since
144155
// we rely on context failures to detect if the sender is dead.
145156
if ctx.Err() != nil {
146-
return fmt.Errorf("failed to send message: %w", ctx.Err())
157+
return fmt.Errorf("failed to send message batch: %w", ctx.Err())
158+
}
159+
160+
if options == nil {
161+
options = &SendAsBatchOptions{AllowMultipleBatch: false}
162+
}
163+
164+
// Apply timeout for the entire operation
165+
if d.options.SendTimeout > 0 {
166+
var cancel func()
167+
ctx, cancel = context.WithTimeout(ctx, d.options.SendTimeout)
168+
defer cancel()
147169
}
148170

149-
batch, err := d.newMessageBatch(ctx, &azservicebus.MessageBatchOptions{})
171+
if len(messages) == 0 {
172+
return fmt.Errorf("cannot send empty message array")
173+
}
174+
175+
// Create a message batch. It will automatically be sized for the Service Bus
176+
// namespace's maximum message size.
177+
currentMessageBatch, err := d.newMessageBatch(ctx, nil)
150178
if err != nil {
151179
return err
152180
}
153-
for _, msg := range messages {
154-
if err := batch.AddMessage(msg, nil); err != nil {
181+
182+
for i := 0; i < len(messages); i++ {
183+
// Add a message to our message batch. This can be called multiple times.
184+
err = currentMessageBatch.AddMessage(messages[i], nil)
185+
186+
if err != nil && errors.Is(err, azservicebus.ErrMessageTooLarge) {
187+
if currentMessageBatch.NumMessages() == 0 {
188+
// This means the message itself is too large to be sent, even on its own.
189+
// This will require intervention from the user.
190+
return fmt.Errorf("single message is too large to be sent in a batch: %w", err)
191+
}
192+
193+
// Message batch is full. Send it and create a new one.
194+
if !options.AllowMultipleBatch {
195+
// For single batch mode, return error if messages don't fit
196+
return fmt.Errorf("messages do not fit in a single batch: %w", err)
197+
}
198+
199+
// Send what we have since the batch is full
200+
if err := d.sendBatch(ctx, currentMessageBatch); err != nil {
201+
return err
202+
}
203+
204+
// Create a new batch and retry adding this message to our batch.
205+
newBatch, err := d.newMessageBatch(ctx, nil)
206+
if err != nil {
207+
return err
208+
}
209+
210+
currentMessageBatch = newBatch
211+
212+
// rewind the counter and attempt to add the message again (this batch
213+
// was full so it didn't go out with the previous sendBatch call).
214+
i--
215+
} else if err != nil {
155216
return err
156217
}
157218
}
158-
if d.options.SendTimeout > 0 {
159-
var cancel func()
160-
ctx, cancel = context.WithTimeout(ctx, d.options.SendTimeout)
161-
defer cancel()
219+
220+
// check if any messages are remaining to be sent.
221+
if currentMessageBatch.NumMessages() > 0 {
222+
return d.sendBatch(ctx, currentMessageBatch)
162223
}
163224

164-
errChan := make(chan error)
225+
return nil
226+
}
165227

228+
// sendBatch sends a single message batch with proper error handling and metrics
229+
func (d *Sender) sendBatch(ctx context.Context, batch *azservicebus.MessageBatch) error {
230+
errChan := make(chan error)
166231
go func() {
167232
if err := d.sendMessageBatch(ctx, batch, nil); err != nil {
168233
errChan <- fmt.Errorf("failed to send message batch: %w", err)
@@ -185,6 +250,12 @@ func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.
185250
}
186251
}
187252

253+
// SendMessageBatch sends the array of azservicebus messages as a batch.
254+
// Deprecated: Use SendAsBatch instead. This method will be removed in a future version.
255+
func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error {
256+
return d.SendAsBatch(ctx, messages, &SendAsBatchOptions{AllowMultipleBatch: false})
257+
}
258+
188259
func (d *Sender) sendMessage(ctx context.Context, msg *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
189260
d.mu.RLock()
190261
defer d.mu.RUnlock()

v2/sender_test.go

Lines changed: 169 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,8 @@ func TestSender_WithDefaultSendTimeout(t *testing.T) {
159159
})
160160
err := sender.SendMessage(context.Background(), "test")
161161
g.Expect(err).ToNot(HaveOccurred())
162-
err = sender.SendMessageBatch(context.Background(), nil)
163-
g.Expect(err).ToNot(HaveOccurred())
162+
err = sender.SendMessageBatch(context.Background(), []*azservicebus.Message{})
163+
g.Expect(err).To(HaveOccurred())
164164
}
165165

166166
func TestSender_WithSendTimeout(t *testing.T) {
@@ -186,14 +186,19 @@ func TestSender_WithSendTimeout(t *testing.T) {
186186
})
187187
err := sender.SendMessage(context.Background(), "test")
188188
g.Expect(err).ToNot(HaveOccurred())
189-
err = sender.SendMessageBatch(context.Background(), nil)
190-
g.Expect(err).ToNot(HaveOccurred())
189+
err = sender.SendMessageBatch(context.Background(), []*azservicebus.Message{})
190+
g.Expect(err).To(HaveOccurred())
191+
err = sender.SendAsBatch(context.Background(), []*azservicebus.Message{}, nil)
192+
g.Expect(err).To(HaveOccurred())
193+
err = sender.SendAsBatch(context.Background(), []*azservicebus.Message{}, &SendAsBatchOptions{AllowMultipleBatch: true})
194+
g.Expect(err).To(HaveOccurred())
191195
}
192196

193197
func TestSender_WithContextCanceled(t *testing.T) {
194198
g := NewWithT(t)
195199
sendTimeout := 1 * time.Second
196200
azSender := &fakeAzSender{
201+
NewMessageBatchReturnValue: &azservicebus.MessageBatch{},
197202
DoSendMessage: func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
198203
time.Sleep(2 * time.Second)
199204
return nil
@@ -210,8 +215,8 @@ func TestSender_WithContextCanceled(t *testing.T) {
210215

211216
err := sender.SendMessage(context.Background(), "test")
212217
g.Expect(err).To(MatchError(context.DeadlineExceeded))
213-
err = sender.SendMessageBatch(context.Background(), nil)
214-
g.Expect(err).To(MatchError(context.DeadlineExceeded))
218+
err = sender.SendMessageBatch(context.Background(), []*azservicebus.Message{})
219+
g.Expect(err).To(HaveOccurred()) // error for empty messages instead of timeout
215220
}
216221

217222
func TestSender_SendWithCanceledContext(t *testing.T) {
@@ -233,14 +238,15 @@ func TestSender_SendWithCanceledContext(t *testing.T) {
233238

234239
err := sender.SendMessage(ctx, "test")
235240
g.Expect(err).To(MatchError(context.Canceled))
236-
err = sender.SendMessageBatch(ctx, nil)
241+
err = sender.SendMessageBatch(ctx, []*azservicebus.Message{})
237242
g.Expect(err).To(MatchError(context.Canceled))
238243
}
239244

240245
func TestSender_DisabledSendTimeout(t *testing.T) {
241246
g := NewWithT(t)
242247
sendTimeout := -1 * time.Second
243248
azSender := &fakeAzSender{
249+
NewMessageBatchReturnValue: &azservicebus.MessageBatch{},
244250
DoSendMessage: func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error {
245251
_, ok := ctx.Deadline()
246252
g.Expect(ok).To(BeFalse())
@@ -258,8 +264,8 @@ func TestSender_DisabledSendTimeout(t *testing.T) {
258264
})
259265
err := sender.SendMessage(context.Background(), "test")
260266
g.Expect(err).ToNot(HaveOccurred())
261-
err = sender.SendMessageBatch(context.Background(), nil)
262-
g.Expect(err).ToNot(HaveOccurred())
267+
err = sender.SendMessageBatch(context.Background(), []*azservicebus.Message{})
268+
g.Expect(err).To(HaveOccurred())
263269
}
264270

265271
func TestSender_SendMessage(t *testing.T) {
@@ -286,8 +292,8 @@ func TestSender_SendMessageBatch(t *testing.T) {
286292
msg, err := sender.ToServiceBusMessage(context.Background(), "test")
287293
g.Expect(err).ToNot(HaveOccurred())
288294
err = sender.SendMessageBatch(context.Background(), []*azservicebus.Message{msg})
295+
// no way to create a MessageBatch struct with a non-0 max bytes in test, so the best we can do is expect an error.
289296
g.Expect(err).To(HaveOccurred())
290-
// No way to create a MessageBatch struct with a non-0 max bytes in test, so the best we can do is expect an error.
291297
}
292298

293299
func TestSender_AzSender(t *testing.T) {
@@ -341,6 +347,150 @@ func TestSender_ConcurrentSendAndSetAzSender(t *testing.T) {
341347
g.Expect(azSender2.SendMessageCalled).To(BeTrue())
342348
}
343349

350+
func TestSender_SendAsBatch_EmptyMessages(t *testing.T) {
351+
g := NewWithT(t)
352+
azSender := &fakeAzSender{}
353+
sender := NewSender(azSender, nil)
354+
355+
options := &SendAsBatchOptions{AllowMultipleBatch: true}
356+
err := sender.SendAsBatch(context.Background(), []*azservicebus.Message{}, options)
357+
g.Expect(err).To(HaveOccurred())
358+
g.Expect(err.Error()).To(ContainSubstring("cannot send empty message array"))
359+
// Should not call send since error returned early
360+
g.Expect(azSender.SendMessageBatchCalled).To(BeFalse())
361+
// No batches should be created
362+
g.Expect(azSender.BatchesCreated).To(Equal(0))
363+
}
364+
365+
func TestSender_SendAsBatch_EmptyMessages_SingleBatch(t *testing.T) {
366+
g := NewWithT(t)
367+
azSender := &fakeAzSender{
368+
NewMessageBatchReturnValue: &azservicebus.MessageBatch{},
369+
}
370+
sender := NewSender(azSender, nil)
371+
372+
options := &SendAsBatchOptions{AllowMultipleBatch: false}
373+
err := sender.SendAsBatch(context.Background(), []*azservicebus.Message{}, options)
374+
// Should fail because empty message array is not allowed
375+
g.Expect(err).To(HaveOccurred())
376+
g.Expect(err.Error()).To(ContainSubstring("cannot send empty message array"))
377+
// Should not attempt to send
378+
g.Expect(azSender.SendMessageBatchCalled).To(BeFalse())
379+
// No batch should be created
380+
g.Expect(azSender.BatchesCreated).To(Equal(0))
381+
}
382+
383+
func TestSender_SendAsBatch_ContextCanceled(t *testing.T) {
384+
g := NewWithT(t)
385+
azSender := &fakeAzSender{}
386+
sender := NewSender(azSender, nil)
387+
388+
ctx, cancel := context.WithCancel(context.Background())
389+
cancel()
390+
391+
msg, err := sender.ToServiceBusMessage(context.Background(), "test")
392+
g.Expect(err).ToNot(HaveOccurred())
393+
394+
options := &SendAsBatchOptions{AllowMultipleBatch: true}
395+
err = sender.SendAsBatch(ctx, []*azservicebus.Message{msg}, options)
396+
g.Expect(err).To(MatchError(context.Canceled))
397+
}
398+
399+
func TestSender_SendAsBatch_NewMessageBatchError(t *testing.T) {
400+
g := NewWithT(t)
401+
expectedErr := fmt.Errorf("batch creation failed")
402+
azSender := &fakeAzSender{
403+
NewMessageBatchErr: expectedErr,
404+
}
405+
sender := NewSender(azSender, nil)
406+
407+
msg, err := sender.ToServiceBusMessage(context.Background(), "test")
408+
g.Expect(err).ToNot(HaveOccurred())
409+
410+
options := &SendAsBatchOptions{AllowMultipleBatch: true}
411+
err = sender.SendAsBatch(context.Background(), []*azservicebus.Message{msg}, options)
412+
g.Expect(err).To(Equal(expectedErr))
413+
g.Expect(azSender.BatchesCreated).To(Equal(1))
414+
g.Expect(azSender.SendMessageBatchCalled).To(BeFalse()) // Should not try to send if batch creation fails
415+
}
416+
417+
func TestSender_SendAsBatch_SingleBatch_Success(t *testing.T) {
418+
g := NewWithT(t)
419+
420+
azSender := &fakeAzSender{
421+
NewMessageBatchReturnValue: &azservicebus.MessageBatch{},
422+
DoSendMessageBatch: func(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error {
423+
return nil
424+
},
425+
}
426+
427+
sender := NewSender(azSender, &SenderOptions{
428+
Marshaller: &DefaultJSONMarshaller{},
429+
})
430+
431+
// Create a message (the real batch will fail to add it due to zero size, but we can test the logic)
432+
msg, err := sender.ToServiceBusMessage(context.Background(), "test")
433+
g.Expect(err).ToNot(HaveOccurred())
434+
435+
options := &SendAsBatchOptions{AllowMultipleBatch: true}
436+
err = sender.SendAsBatch(context.Background(), []*azservicebus.Message{msg}, options)
437+
438+
// no way to create a MessageBatch struct with a non-0 max bytes in test, so the best we can do is expect an error.
439+
g.Expect(err).To(HaveOccurred()) // Real MessageBatch fails in tests due to zero max size
440+
g.Expect(azSender.BatchesCreated).To(Equal(1))
441+
g.Expect(azSender.BatchesSent).To(Equal(0)) // No batches sent due to AddMessage failure
442+
}
443+
444+
func TestSender_SendAsBatch_MessageTooLarge_SingleMessage(t *testing.T) {
445+
g := NewWithT(t)
446+
447+
azSender := &fakeAzSender{
448+
NewMessageBatchReturnValue: &azservicebus.MessageBatch{}, // Real MessageBatch with 0 max size
449+
}
450+
451+
sender := NewSender(azSender, &SenderOptions{
452+
Marshaller: &DefaultJSONMarshaller{},
453+
})
454+
455+
// Create any message - it will be too large for the real MessageBatch with 0 max size
456+
msg, err := sender.ToServiceBusMessage(context.Background(), "test")
457+
g.Expect(err).ToNot(HaveOccurred())
458+
459+
options := &SendAsBatchOptions{AllowMultipleBatch: true}
460+
err = sender.SendAsBatch(context.Background(), []*azservicebus.Message{msg}, options)
461+
462+
// Should fail because any message is too large for a real MessageBatch in tests
463+
g.Expect(err).To(HaveOccurred())
464+
g.Expect(err.Error()).To(ContainSubstring("single message is too large"))
465+
g.Expect(azSender.BatchesCreated).To(Equal(1))
466+
}
467+
468+
func TestSender_SendAsBatch_SingleBatch_TooManyMessages_AllowMultipleFalse(t *testing.T) {
469+
g := NewWithT(t)
470+
azSender := &fakeAzSender{
471+
NewMessageBatchReturnValue: &azservicebus.MessageBatch{},
472+
}
473+
sender := NewSender(azSender, &SenderOptions{
474+
Marshaller: &DefaultJSONMarshaller{},
475+
})
476+
477+
// Create multiple messages
478+
messages := make([]*azservicebus.Message, 3)
479+
for i := range messages {
480+
msg, err := sender.ToServiceBusMessage(context.Background(), fmt.Sprintf("test%d", i))
481+
g.Expect(err).ToNot(HaveOccurred())
482+
messages[i] = msg
483+
}
484+
485+
options := &SendAsBatchOptions{AllowMultipleBatch: false}
486+
err := sender.SendAsBatch(context.Background(), messages, options)
487+
488+
// Should fail because messages don't fit in single batch and multiple batches not allowed
489+
// The real MessageBatch has max size 0 in tests, so AddMessage will fail immediately
490+
g.Expect(err).To(HaveOccurred())
491+
g.Expect(azSender.BatchesCreated).To(Equal(1))
492+
}
493+
344494
type fakeAzSender struct {
345495
mu sync.RWMutex
346496
DoSendMessage func(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
@@ -355,6 +505,9 @@ type fakeAzSender struct {
355505
NewMessageBatchErr error
356506
SendMessageBatchReceivedValue *azservicebus.MessageBatch
357507
CloseErr error
508+
509+
BatchesCreated int // Track how many batches were created
510+
BatchesSent int // Track how many batches were sent
358511
}
359512

360513
func (f *fakeAzSender) SendMessage(
@@ -382,6 +535,8 @@ func (f *fakeAzSender) SendMessageBatch(
382535
defer f.mu.Unlock()
383536
f.SendMessageBatchCalled = true
384537
f.SendMessageBatchReceivedValue = batch
538+
f.BatchesSent++
539+
385540
if f.DoSendMessageBatch != nil {
386541
if err := f.DoSendMessageBatch(ctx, batch, options); err != nil {
387542
return err
@@ -393,6 +548,10 @@ func (f *fakeAzSender) SendMessageBatch(
393548
func (f *fakeAzSender) NewMessageBatch(
394549
ctx context.Context,
395550
options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error) {
551+
f.mu.Lock()
552+
defer f.mu.Unlock()
553+
f.BatchesCreated++
554+
396555
return f.NewMessageBatchReturnValue, f.NewMessageBatchErr
397556
}
398557

0 commit comments

Comments
 (0)