Skip to content

Commit 2dd4c08

Browse files
authored
Merge branch 'main' into copilot/support-urlencoded-body-parsing
2 parents 7009ffb + a5bf3f5 commit 2dd4c08

6 files changed

Lines changed: 792 additions & 22 deletions

File tree

engine.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,6 +583,13 @@ func (e *StdEngine) TriggerWorkflow(ctx context.Context, workflowType string, ac
583583
e.logger.Debug(fmt.Sprintf(" Result %s: %v", k, v))
584584
}
585585

586+
// If the caller stored a PipelineResultHolder in the context, populate it
587+
// so HTTP trigger handlers can read response_status/body/headers without
588+
// requiring the WorkflowEngine interface to return a result map.
589+
if holder, ok := ctx.Value(module.PipelineResultContextKey).(*module.PipelineResultHolder); ok && holder != nil {
590+
holder.Set(results)
591+
}
592+
586593
if e.eventEmitter != nil {
587594
e.eventEmitter.EmitWorkflowCompleted(ctx, workflowType, action, time.Since(startTime), results)
588595
}

engine_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,6 +1268,62 @@ func TestCanHandleTrigger_EventBus(t *testing.T) {
12681268
}
12691269
}
12701270

1271+
// TestEngine_TriggerWorkflow_PopulatesPipelineResultHolder verifies that
1272+
// TriggerWorkflow populates a *module.PipelineResultHolder stored in the context
1273+
// with the workflow handler's result map after successful execution.
1274+
func TestEngine_TriggerWorkflow_PopulatesPipelineResultHolder(t *testing.T) {
1275+
app := newMockApplication()
1276+
engine := NewStdEngine(app, app.Logger())
1277+
loadAllPlugins(t, engine)
1278+
1279+
handler := &errorMockWorkflowHandler{
1280+
mockWorkflowHandler: mockWorkflowHandler{
1281+
name: "holder-handler",
1282+
handlesFor: []string{"holder-wf"},
1283+
},
1284+
// returns a result map with response fields
1285+
}
1286+
engine.RegisterWorkflowHandler(handler)
1287+
1288+
holder := &module.PipelineResultHolder{}
1289+
ctx := context.WithValue(context.Background(), module.PipelineResultContextKey, holder)
1290+
1291+
err := engine.TriggerWorkflow(ctx, "holder-wf", "run", map[string]any{})
1292+
if err != nil {
1293+
t.Fatalf("TriggerWorkflow failed: %v", err)
1294+
}
1295+
1296+
got := holder.Get()
1297+
if got == nil {
1298+
t.Fatal("expected PipelineResultHolder to be populated, got nil")
1299+
}
1300+
if got["status"] != "ok" {
1301+
t.Errorf("expected result[status]='ok', got %v", got["status"])
1302+
}
1303+
}
1304+
1305+
// TestEngine_TriggerWorkflow_HolderAbsent verifies that TriggerWorkflow does not
1306+
// panic and succeeds normally when no PipelineResultHolder is in the context.
1307+
func TestEngine_TriggerWorkflow_HolderAbsent(t *testing.T) {
1308+
app := newMockApplication()
1309+
engine := NewStdEngine(app, app.Logger())
1310+
loadAllPlugins(t, engine)
1311+
1312+
handler := &errorMockWorkflowHandler{
1313+
mockWorkflowHandler: mockWorkflowHandler{
1314+
name: "no-holder-handler",
1315+
handlesFor: []string{"no-holder-wf"},
1316+
},
1317+
}
1318+
engine.RegisterWorkflowHandler(handler)
1319+
1320+
// No holder in context — should succeed without panicking.
1321+
err := engine.TriggerWorkflow(context.Background(), "no-holder-wf", "run", map[string]any{})
1322+
if err != nil {
1323+
t.Fatalf("TriggerWorkflow failed: %v", err)
1324+
}
1325+
}
1326+
12711327
// ============================================================================
12721328
// Tests for requires.plugins validation (Phase 4 - Engine Decomposition)
12731329
// ============================================================================

module/http_trigger.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"log"
1010
"maps"
1111
"net/http"
12+
"strconv"
13+
"strings"
1214

1315
"github.com/CrisisTextLine/modular"
1416
)
@@ -31,6 +33,129 @@ type httpReqContextKey struct{}
3133
// headers, path parameters, and the request body.
3234
var HTTPRequestContextKey = httpReqContextKey{}
3335

36+
// pipelineResultKey is the unexported type for the pipeline result context key.
37+
type pipelineResultKey struct{}
38+
39+
// PipelineResultContextKey is the context key used to capture pipeline execution
40+
// results from TriggerWorkflow. HTTP trigger handlers store a *PipelineResultHolder
41+
// in the context before calling TriggerWorkflow; the engine populates it with the
42+
// pipeline's result.Current map after execution. This lets the trigger apply
43+
// response_status/response_body/response_headers from the pipeline output when no
44+
// step wrote directly to the HTTP response writer.
45+
var PipelineResultContextKey = pipelineResultKey{}
46+
47+
// PipelineResultHolder is a mutable container used to pass pipeline execution
48+
// results back through the context from the engine to the HTTP trigger handler.
49+
type PipelineResultHolder struct {
50+
result map[string]any
51+
}
52+
53+
// Set stores the pipeline result in the holder.
54+
func (h *PipelineResultHolder) Set(result map[string]any) {
55+
h.result = result
56+
}
57+
58+
// Get returns the stored pipeline result, or nil if not set.
59+
func (h *PipelineResultHolder) Get() map[string]any {
60+
return h.result
61+
}
62+
63+
// coercePipelineStatus coerces common numeric/string types into an HTTP status
64+
// code. Pipeline steps may emit response_status as int, int64, float64 (common
65+
// after generic JSON decoding), json.Number, or a numeric string.
66+
func coercePipelineStatus(v any) (int, bool) {
67+
switch s := v.(type) {
68+
case int:
69+
return s, true
70+
case int64:
71+
status := int(s)
72+
if int64(status) != s {
73+
return 0, false
74+
}
75+
return status, true
76+
case float64:
77+
status := int(s)
78+
if float64(status) != s {
79+
return 0, false
80+
}
81+
return status, true
82+
case json.Number:
83+
i64, err := s.Int64()
84+
if err != nil {
85+
return 0, false
86+
}
87+
status := int(i64)
88+
if int64(status) != i64 {
89+
return 0, false
90+
}
91+
return status, true
92+
case string:
93+
n, err := strconv.Atoi(strings.TrimSpace(s))
94+
if err != nil {
95+
return 0, false
96+
}
97+
return n, true
98+
default:
99+
return 0, false
100+
}
101+
}
102+
103+
// applyPipelineHeaders writes response headers from common map/header shapes
104+
// that pipeline steps may emit for response_headers.
105+
func applyPipelineHeaders(w http.ResponseWriter, rawHeaders any) {
106+
switch headers := rawHeaders.(type) {
107+
case map[string]any:
108+
for k, v := range headers {
109+
switch hv := v.(type) {
110+
case string:
111+
w.Header().Set(k, hv)
112+
case []string:
113+
for _, sv := range hv {
114+
w.Header().Add(k, sv)
115+
}
116+
case []any:
117+
for _, sv := range hv {
118+
w.Header().Add(k, fmt.Sprint(sv))
119+
}
120+
default:
121+
w.Header().Set(k, fmt.Sprint(hv))
122+
}
123+
}
124+
case map[string]string:
125+
for k, v := range headers {
126+
w.Header().Set(k, v)
127+
}
128+
case http.Header:
129+
for k, vals := range headers {
130+
for _, v := range vals {
131+
w.Header().Add(k, v)
132+
}
133+
}
134+
}
135+
}
136+
137+
// writePipelineContextResponse checks the result map for response_status and,
138+
// if present, applies response_headers and writes the response. Returns true if
139+
// the response was written from the pipeline context fields.
140+
func writePipelineContextResponse(w http.ResponseWriter, result map[string]any) bool {
141+
rawStatus, ok := result["response_status"]
142+
if !ok {
143+
return false
144+
}
145+
status, ok := coercePipelineStatus(rawStatus)
146+
if !ok {
147+
return false
148+
}
149+
if rawHeaders, ok := result["response_headers"]; ok {
150+
applyPipelineHeaders(w, rawHeaders)
151+
}
152+
w.WriteHeader(status)
153+
if body, ok := result["response_body"].(string); ok {
154+
_, _ = w.Write([]byte(body)) //nolint:gosec // G705: body is pipeline step output explicitly set as response body
155+
}
156+
return true
157+
}
158+
34159
// trackedResponseWriter wraps http.ResponseWriter and tracks whether a response
35160
// body has been written, so the HTTP trigger can fall back to the generic
36161
// "workflow triggered" response only when the pipeline didn't write one.
@@ -267,6 +392,11 @@ func (t *HTTPTrigger) createHandler(route HTTPTriggerRoute) HTTPHandler {
267392
// to headers (e.g. Authorization), method, URL, and body.
268393
ctx = context.WithValue(ctx, HTTPRequestContextKey, r)
269394

395+
// Inject a result holder so the engine can pass the pipeline's result.Current
396+
// back to this handler without changing the WorkflowEngine interface.
397+
resultHolder := &PipelineResultHolder{}
398+
ctx = context.WithValue(ctx, PipelineResultContextKey, resultHolder)
399+
270400
// Extract data from the request to pass to the workflow.
271401
// Include method, path, and parsed body so pipelines have full
272402
// access to request context (consistent with CommandHandler).
@@ -316,6 +446,14 @@ func (t *HTTPTrigger) createHandler(route HTTPTriggerRoute) HTTPHandler {
316446
return
317447
}
318448

449+
// If the pipeline set response_status in its output (without writing
450+
// directly to the response writer), use those values to build the response.
451+
if result := resultHolder.Get(); result != nil {
452+
if writePipelineContextResponse(w, result) {
453+
return
454+
}
455+
}
456+
319457
// Fallback: return a generic accepted response when the pipeline doesn't
320458
// write its own HTTP response.
321459
w.Header().Set("Content-Type", "application/json")

0 commit comments

Comments
 (0)