Skip to content

Commit eadc679

Browse files
intel352claude
andcommitted
feat: add DLQ, retry, and circuit breaker pipeline steps
- step.dlq_send: sends failed messages to a DLQ topic with error metadata - step.dlq_replay: replays DLQ messages back to the original topic (single message or batch via "messages" array in context) - step.retry_with_backoff: wraps a sub-step with exponential backoff retry logic (max_retries, initial_delay, max_delay, multiplier) - step.resilient_circuit_breaker: wraps a sub-step with circuit breaker protection (failure_threshold, reset_timeout) with optional fallback step All steps registered in plugins/pipelinesteps and cmd/wfctl/type_registry. Tests cover factory validation, happy paths, and failure modes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 3800b82 commit eadc679

6 files changed

Lines changed: 1251 additions & 1 deletion

File tree

cmd/wfctl/type_registry.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,26 @@ func KnownStepTypes() map[string]StepTypeInfo {
621621
Plugin: "pipelinesteps",
622622
ConfigKeys: []string{"cache", "key"},
623623
},
624+
"step.dlq_send": {
625+
Type: "step.dlq_send",
626+
Plugin: "pipelinesteps",
627+
ConfigKeys: []string{"topic", "original_topic", "error", "payload", "broker"},
628+
},
629+
"step.dlq_replay": {
630+
Type: "step.dlq_replay",
631+
Plugin: "pipelinesteps",
632+
ConfigKeys: []string{"dlq_topic", "target_topic", "max_messages", "broker"},
633+
},
634+
"step.retry_with_backoff": {
635+
Type: "step.retry_with_backoff",
636+
Plugin: "pipelinesteps",
637+
ConfigKeys: []string{"max_retries", "initial_delay", "max_delay", "multiplier", "step"},
638+
},
639+
"step.resilient_circuit_breaker": {
640+
Type: "step.resilient_circuit_breaker",
641+
Plugin: "pipelinesteps",
642+
ConfigKeys: []string{"failure_threshold", "reset_timeout", "step", "fallback"},
643+
},
624644

625645
// http plugin steps
626646
"step.rate_limit": {

module/pipeline_step_dlq.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package module
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"time"
8+
9+
"github.com/CrisisTextLine/modular"
10+
"github.com/CrisisTextLine/modular/modules/eventbus/v2"
11+
)
12+
13+
// DLQSendStep sends a failed message to a dead letter queue topic.
14+
type DLQSendStep struct {
15+
name string
16+
topic string
17+
originalTopic string
18+
errTemplate string
19+
payload map[string]any
20+
broker string
21+
app modular.Application
22+
tmpl *TemplateEngine
23+
}
24+
25+
// NewDLQSendStepFactory returns a StepFactory that creates DLQSendStep instances.
26+
func NewDLQSendStepFactory() StepFactory {
27+
return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) {
28+
topic, _ := config["topic"].(string)
29+
if topic == "" {
30+
return nil, fmt.Errorf("dlq_send step %q: 'topic' is required", name)
31+
}
32+
33+
step := &DLQSendStep{
34+
name: name,
35+
topic: topic,
36+
app: app,
37+
tmpl: NewTemplateEngine(),
38+
}
39+
40+
step.originalTopic, _ = config["original_topic"].(string)
41+
step.errTemplate, _ = config["error"].(string)
42+
step.broker, _ = config["broker"].(string)
43+
44+
if payload, ok := config["payload"].(map[string]any); ok {
45+
step.payload = payload
46+
}
47+
48+
return step, nil
49+
}
50+
}
51+
52+
// Name returns the step name.
53+
func (s *DLQSendStep) Name() string { return s.name }
54+
55+
// Execute sends the current message to the DLQ topic with error metadata.
56+
func (s *DLQSendStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) {
57+
resolvedTopic, err := s.tmpl.Resolve(s.topic, pc)
58+
if err != nil {
59+
return nil, fmt.Errorf("dlq_send step %q: failed to resolve topic: %w", s.name, err)
60+
}
61+
62+
var resolvedPayload map[string]any
63+
if s.payload != nil {
64+
resolvedPayload, err = s.tmpl.ResolveMap(s.payload, pc)
65+
if err != nil {
66+
return nil, fmt.Errorf("dlq_send step %q: failed to resolve payload: %w", s.name, err)
67+
}
68+
} else {
69+
resolvedPayload = pc.Current
70+
}
71+
72+
errMsg := ""
73+
if s.errTemplate != "" {
74+
errMsg, _ = s.tmpl.Resolve(s.errTemplate, pc)
75+
}
76+
77+
envelope := map[string]any{
78+
"payload": resolvedPayload,
79+
"sent_at": time.Now().UTC().Format(time.RFC3339),
80+
}
81+
if s.originalTopic != "" {
82+
envelope["original_topic"] = s.originalTopic
83+
}
84+
if errMsg != "" {
85+
envelope["error"] = errMsg
86+
}
87+
88+
if s.broker != "" {
89+
return s.sendViaBroker(resolvedTopic, envelope)
90+
}
91+
return s.sendViaEventBus(ctx, resolvedTopic, envelope)
92+
}
93+
94+
func (s *DLQSendStep) sendViaBroker(topic string, envelope map[string]any) (*StepResult, error) {
95+
var broker MessageBroker
96+
if err := s.app.GetService(s.broker, &broker); err != nil {
97+
return nil, fmt.Errorf("dlq_send step %q: broker service %q not found: %w", s.name, s.broker, err)
98+
}
99+
100+
data, err := json.Marshal(envelope)
101+
if err != nil {
102+
return nil, fmt.Errorf("dlq_send step %q: failed to marshal envelope: %w", s.name, err)
103+
}
104+
105+
if err := broker.Producer().SendMessage(topic, data); err != nil {
106+
return nil, fmt.Errorf("dlq_send step %q: failed to send to DLQ via broker: %w", s.name, err)
107+
}
108+
109+
return &StepResult{Output: map[string]any{"dlq_sent": true, "topic": topic}}, nil
110+
}
111+
112+
func (s *DLQSendStep) sendViaEventBus(ctx context.Context, topic string, envelope map[string]any) (*StepResult, error) {
113+
var eb *eventbus.EventBusModule
114+
if err := s.app.GetService("eventbus.provider", &eb); err != nil || eb == nil {
115+
return nil, fmt.Errorf("dlq_send step %q: no broker configured and eventbus not available", s.name)
116+
}
117+
118+
if err := eb.Publish(ctx, topic, envelope); err != nil {
119+
return nil, fmt.Errorf("dlq_send step %q: failed to publish to eventbus: %w", s.name, err)
120+
}
121+
122+
return &StepResult{Output: map[string]any{"dlq_sent": true, "topic": topic}}, nil
123+
}
124+
125+
// DLQReplayStep replays messages from a DLQ topic back to the original topic.
126+
// When used in a pipeline triggered by a DLQ consumer, pc.Current holds the DLQ
127+
// message (or a "messages" array for batch replay).
128+
type DLQReplayStep struct {
129+
name string
130+
dlqTopic string
131+
targetTopic string
132+
maxMessages int
133+
broker string
134+
app modular.Application
135+
tmpl *TemplateEngine
136+
}
137+
138+
// NewDLQReplayStepFactory returns a StepFactory that creates DLQReplayStep instances.
139+
func NewDLQReplayStepFactory() StepFactory {
140+
return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) {
141+
dlqTopic, _ := config["dlq_topic"].(string)
142+
if dlqTopic == "" {
143+
return nil, fmt.Errorf("dlq_replay step %q: 'dlq_topic' is required", name)
144+
}
145+
146+
targetTopic, _ := config["target_topic"].(string)
147+
if targetTopic == "" {
148+
return nil, fmt.Errorf("dlq_replay step %q: 'target_topic' is required", name)
149+
}
150+
151+
maxMessages := 100
152+
if v, ok := config["max_messages"]; ok {
153+
switch val := v.(type) {
154+
case int:
155+
maxMessages = val
156+
case float64:
157+
maxMessages = int(val)
158+
}
159+
}
160+
if maxMessages <= 0 {
161+
maxMessages = 100
162+
}
163+
164+
broker, _ := config["broker"].(string)
165+
166+
return &DLQReplayStep{
167+
name: name,
168+
dlqTopic: dlqTopic,
169+
targetTopic: targetTopic,
170+
maxMessages: maxMessages,
171+
broker: broker,
172+
app: app,
173+
tmpl: NewTemplateEngine(),
174+
}, nil
175+
}
176+
}
177+
178+
// Name returns the step name.
179+
func (s *DLQReplayStep) Name() string { return s.name }
180+
181+
// Execute replays messages from the DLQ to the target topic.
182+
func (s *DLQReplayStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) {
183+
resolvedTarget, err := s.tmpl.Resolve(s.targetTopic, pc)
184+
if err != nil {
185+
return nil, fmt.Errorf("dlq_replay step %q: failed to resolve target_topic: %w", s.name, err)
186+
}
187+
188+
messages := s.collectMessages(pc)
189+
if len(messages) > s.maxMessages {
190+
messages = messages[:s.maxMessages]
191+
}
192+
193+
if len(messages) == 0 {
194+
return &StepResult{Output: map[string]any{"replayed": 0, "target_topic": resolvedTarget}}, nil
195+
}
196+
197+
for i, msg := range messages {
198+
if err := s.publishMessage(ctx, resolvedTarget, msg); err != nil {
199+
return nil, fmt.Errorf("dlq_replay step %q: failed to replay message %d: %w", s.name, i, err)
200+
}
201+
}
202+
203+
return &StepResult{Output: map[string]any{
204+
"replayed": len(messages),
205+
"target_topic": resolvedTarget,
206+
"dlq_topic": s.dlqTopic,
207+
}}, nil
208+
}
209+
210+
// collectMessages gathers messages from the pipeline context.
211+
// Handles both batch ("messages" array) and single-message contexts.
212+
func (s *DLQReplayStep) collectMessages(pc *PipelineContext) []map[string]any {
213+
if msgs, ok := pc.Current["messages"]; ok {
214+
if msgSlice, ok := msgs.([]any); ok {
215+
result := make([]map[string]any, 0, len(msgSlice))
216+
for _, m := range msgSlice {
217+
if mMap, ok := m.(map[string]any); ok {
218+
if payload, ok := mMap["payload"].(map[string]any); ok {
219+
result = append(result, payload)
220+
} else {
221+
result = append(result, mMap)
222+
}
223+
}
224+
}
225+
return result
226+
}
227+
}
228+
229+
// Single DLQ envelope — extract original payload if present
230+
if payload, ok := pc.Current["payload"].(map[string]any); ok {
231+
return []map[string]any{payload}
232+
}
233+
234+
return []map[string]any{pc.Current}
235+
}
236+
237+
func (s *DLQReplayStep) publishMessage(ctx context.Context, topic string, payload map[string]any) error {
238+
if s.broker != "" {
239+
var broker MessageBroker
240+
if err := s.app.GetService(s.broker, &broker); err != nil {
241+
return fmt.Errorf("broker service %q not found: %w", s.broker, err)
242+
}
243+
data, err := json.Marshal(payload)
244+
if err != nil {
245+
return fmt.Errorf("failed to marshal payload: %w", err)
246+
}
247+
return broker.Producer().SendMessage(topic, data)
248+
}
249+
250+
var eb *eventbus.EventBusModule
251+
if err := s.app.GetService("eventbus.provider", &eb); err != nil || eb == nil {
252+
return fmt.Errorf("no broker configured and eventbus not available")
253+
}
254+
return eb.Publish(ctx, topic, payload)
255+
}

0 commit comments

Comments
 (0)