Skip to content

Commit ebd6e2b

Browse files
committed
feat(worker): Add retryTask method and integrate with task loop
1 parent 76791b9 commit ebd6e2b

2 files changed

Lines changed: 101 additions & 8 deletions

File tree

tests/worker_test.go

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,4 +194,76 @@ func TestWorkerProcesses(t *testing.T) {
194194

195195
assert.Contains(t, logOutput.String(), "Task channel closed, worker exiting...")
196196
})
197+
198+
t.Run("should handle publish failure", func(t *testing.T) {
199+
mockBroker := NewMockBroker(1)
200+
wg := &sync.WaitGroup{}
201+
cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: nil, WG: wg}
202+
worker := taskqueue.DefaultWorkerFactory(cfg)
203+
worker.Register(failTaskName, failTaskHandler)
204+
205+
task := taskqueue.Task{
206+
Name: failTaskName,
207+
Args: taskqueue.TaskArgs{},
208+
MaxRetry: 3,
209+
}
210+
err := mockBroker.Publish(task)
211+
assert.Nil(t, err)
212+
213+
ctx, cancel := context.WithCancel(context.Background())
214+
defer cancel()
215+
216+
var logOutput bytes.Buffer
217+
log.SetOutput(&logOutput)
218+
defer log.SetOutput(nil)
219+
220+
wg.Add(1)
221+
go worker.Start(ctx)
222+
mockBroker.badPublish = true
223+
time.Sleep(100 * time.Millisecond)
224+
225+
cancel()
226+
wg.Wait()
227+
228+
expected := fmt.Sprintf("Worker %d: failed to re-queue task %s", cfg.ID, task.Name)
229+
assert.Contains(t, logOutput.String(), expected)
230+
})
231+
232+
t.Run("should handle publish failure with backoff", func(t *testing.T) {
233+
backoff := &taskqueue.BackoffPolicy{
234+
BaseDelay: 1 * time.Second, // small delay for fast test
235+
MaxDelay: 20 * time.Second,
236+
}
237+
mockBroker := NewMockBroker(1)
238+
wg := &sync.WaitGroup{}
239+
cfg := taskqueue.WorkerConfig{ID: 1, Broker: mockBroker, Backoff: backoff, WG: wg}
240+
worker := taskqueue.DefaultWorkerFactory(cfg)
241+
worker.Register(failTaskName, failTaskHandler)
242+
243+
task := taskqueue.Task{
244+
Name: failTaskName,
245+
Args: taskqueue.TaskArgs{},
246+
MaxRetry: 3,
247+
}
248+
err := mockBroker.Publish(task)
249+
assert.Nil(t, err)
250+
251+
ctx, cancel := context.WithCancel(context.Background())
252+
defer cancel()
253+
254+
var logOutput bytes.Buffer
255+
log.SetOutput(&logOutput)
256+
defer log.SetOutput(nil)
257+
258+
wg.Add(1)
259+
go worker.Start(ctx)
260+
mockBroker.badPublish = true
261+
time.Sleep(100 * time.Millisecond)
262+
263+
cancel()
264+
wg.Wait()
265+
266+
expected := fmt.Sprintf("Worker %d: failed to re-queue task %s", cfg.ID, task.Name)
267+
assert.Contains(t, logOutput.String(), expected)
268+
})
197269
}

worker.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package taskqueue
22

33
import (
44
"context"
5+
"fmt"
56
"log"
67
"sync"
78
"time"
@@ -190,16 +191,36 @@ func (w *DefaultWorker) Start(ctx context.Context) {
190191
log.Printf("Worker %d: task %s exceeded retries\n", w.id, task.Name)
191192
continue
192193
}
194+
w.retryTask(ctx, task)
195+
}
196+
}
197+
}
193198

194-
task.Retry++
195-
if w.backoff != nil {
196-
backoffDelay := w.backoff.Calculate(task.Retry)
197-
log.Printf("Task %s failed, backing off for %v (retry %d of %d)", task.Name, backoffDelay, task.Retry, task.MaxRetry)
198-
time.Sleep(backoffDelay)
199-
}
199+
func (w *DefaultWorker) retryTask(ctx context.Context, task Task) error {
200+
failedToRepublish := fmt.Sprintf("Worker %d: failed to re-queue task %s", w.id, task.Name)
201+
202+
task.Retry++
203+
if w.backoff != nil {
204+
backoffDelay := w.backoff.Calculate(task.Retry)
205+
log.Printf("Task %s failed, backing off for %v (retry %d of %d)", task.Name, backoffDelay, task.Retry, task.MaxRetry)
206+
207+
select {
208+
case <-time.After(backoffDelay):
200209

201-
log.Printf("Task %s failed, retrying now (retry %d of %d)", task.Name, task.Retry, task.MaxRetry)
202-
w.broker.Publish(task)
210+
case <-ctx.Done():
211+
log.Printf("Worker %d: context cancelled during backoff, re-queuing task %s", w.id, task.Name)
212+
err := w.broker.Publish(task)
213+
if err != nil {
214+
log.Println(failedToRepublish)
215+
}
216+
return err
203217
}
204218
}
219+
220+
log.Printf("Task %s failed, retrying now (retry %d of %d)", task.Name, task.Retry, task.MaxRetry)
221+
err := w.broker.Publish(task)
222+
if err != nil {
223+
log.Println(failedToRepublish)
224+
}
225+
return err
205226
}

0 commit comments

Comments
 (0)