Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ func (e *StdEngine) TriggerWorkflow(ctx context.Context, workflowType string, ac
e.logger.Debug(fmt.Sprintf(" Result %s: %v", k, v))
}

// If the caller stored a PipelineResultHolder in the context, populate it
// so HTTP trigger handlers can read response_status/body/headers without
// requiring the WorkflowEngine interface to return a result map.
if holder, ok := ctx.Value(module.PipelineResultContextKey).(*module.PipelineResultHolder); ok && holder != nil {
holder.Set(results)
}
Comment on lines +586 to +591
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds a new behavior path where TriggerWorkflow populates a PipelineResultHolder from the context. There are existing unit tests for StdEngine.TriggerWorkflow in engine_test.go, but none exercising this holder population; adding a test that asserts the holder is set (and remains nil when absent) would help prevent regressions.

Copilot uses AI. Check for mistakes.

if e.eventEmitter != nil {
e.eventEmitter.EmitWorkflowCompleted(ctx, workflowType, action, time.Since(startTime), results)
}
Expand Down
56 changes: 56 additions & 0 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,62 @@ func TestCanHandleTrigger_EventBus(t *testing.T) {
}
}

// TestEngine_TriggerWorkflow_PopulatesPipelineResultHolder verifies that
// TriggerWorkflow populates a *module.PipelineResultHolder stored in the context
// with the workflow handler's result map after successful execution.
func TestEngine_TriggerWorkflow_PopulatesPipelineResultHolder(t *testing.T) {
app := newMockApplication()
engine := NewStdEngine(app, app.Logger())
loadAllPlugins(t, engine)

handler := &errorMockWorkflowHandler{
mockWorkflowHandler: mockWorkflowHandler{
name: "holder-handler",
handlesFor: []string{"holder-wf"},
},
// returns a result map with response fields
}
engine.RegisterWorkflowHandler(handler)

holder := &module.PipelineResultHolder{}
ctx := context.WithValue(context.Background(), module.PipelineResultContextKey, holder)

err := engine.TriggerWorkflow(ctx, "holder-wf", "run", map[string]any{})
if err != nil {
t.Fatalf("TriggerWorkflow failed: %v", err)
}

got := holder.Get()
if got == nil {
t.Fatal("expected PipelineResultHolder to be populated, got nil")
}
if got["status"] != "ok" {
t.Errorf("expected result[status]='ok', got %v", got["status"])
}
}

// TestEngine_TriggerWorkflow_HolderAbsent verifies that TriggerWorkflow does not
// panic and succeeds normally when no PipelineResultHolder is in the context.
func TestEngine_TriggerWorkflow_HolderAbsent(t *testing.T) {
app := newMockApplication()
engine := NewStdEngine(app, app.Logger())
loadAllPlugins(t, engine)

handler := &errorMockWorkflowHandler{
mockWorkflowHandler: mockWorkflowHandler{
name: "no-holder-handler",
handlesFor: []string{"no-holder-wf"},
},
}
engine.RegisterWorkflowHandler(handler)

// No holder in context — should succeed without panicking.
err := engine.TriggerWorkflow(context.Background(), "no-holder-wf", "run", map[string]any{})
if err != nil {
t.Fatalf("TriggerWorkflow failed: %v", err)
}
}

// ============================================================================
// Tests for requires.plugins validation (Phase 4 - Engine Decomposition)
// ============================================================================
Expand Down
138 changes: 138 additions & 0 deletions module/http_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"log"
"maps"
"net/http"
"strconv"
"strings"

"github.com/CrisisTextLine/modular"
)
Expand All @@ -31,6 +33,129 @@ type httpReqContextKey struct{}
// headers, path parameters, and the request body.
var HTTPRequestContextKey = httpReqContextKey{}

// pipelineResultKey is the unexported type for the pipeline result context key.
type pipelineResultKey struct{}

// PipelineResultContextKey is the context key used to capture pipeline execution
// results from TriggerWorkflow. HTTP trigger handlers store a *PipelineResultHolder
// in the context before calling TriggerWorkflow; the engine populates it with the
// pipeline's result.Current map after execution. This lets the trigger apply
// response_status/response_body/response_headers from the pipeline output when no
// step wrote directly to the HTTP response writer.
var PipelineResultContextKey = pipelineResultKey{}

// PipelineResultHolder is a mutable container used to pass pipeline execution
// results back through the context from the engine to the HTTP trigger handler.
type PipelineResultHolder struct {
result map[string]any
}

// Set stores the pipeline result in the holder.
func (h *PipelineResultHolder) Set(result map[string]any) {
h.result = result
}

// Get returns the stored pipeline result, or nil if not set.
func (h *PipelineResultHolder) Get() map[string]any {
return h.result
}

// coercePipelineStatus coerces common numeric/string types into an HTTP status
// code. Pipeline steps may emit response_status as int, int64, float64 (common
// after generic JSON decoding), json.Number, or a numeric string.
func coercePipelineStatus(v any) (int, bool) {
switch s := v.(type) {
case int:
return s, true
case int64:
status := int(s)
if int64(status) != s {
return 0, false
}
return status, true
case float64:
status := int(s)
if float64(status) != s {
return 0, false
}
return status, true
case json.Number:
i64, err := s.Int64()
if err != nil {
return 0, false
}
status := int(i64)
if int64(status) != i64 {
return 0, false
}
return status, true
case string:
n, err := strconv.Atoi(strings.TrimSpace(s))
if err != nil {
return 0, false
}
return n, true
default:
return 0, false
}
}

// applyPipelineHeaders writes response headers from common map/header shapes
// that pipeline steps may emit for response_headers.
func applyPipelineHeaders(w http.ResponseWriter, rawHeaders any) {
switch headers := rawHeaders.(type) {
case map[string]any:
for k, v := range headers {
switch hv := v.(type) {
case string:
w.Header().Set(k, hv)
case []string:
for _, sv := range hv {
w.Header().Add(k, sv)
}
case []any:
for _, sv := range hv {
w.Header().Add(k, fmt.Sprint(sv))
}
default:
w.Header().Set(k, fmt.Sprint(hv))
}
}
case map[string]string:
for k, v := range headers {
w.Header().Set(k, v)
}
case http.Header:
for k, vals := range headers {
for _, v := range vals {
w.Header().Add(k, v)
}
}
}
}

// writePipelineContextResponse checks the result map for response_status and,
// if present, applies response_headers and writes the response. Returns true if
// the response was written from the pipeline context fields.
func writePipelineContextResponse(w http.ResponseWriter, result map[string]any) bool {
rawStatus, ok := result["response_status"]
if !ok {
return false
}
status, ok := coercePipelineStatus(rawStatus)
if !ok {
return false
}
if rawHeaders, ok := result["response_headers"]; ok {
applyPipelineHeaders(w, rawHeaders)
}
w.WriteHeader(status)
if body, ok := result["response_body"].(string); ok {
_, _ = w.Write([]byte(body)) //nolint:gosec // G705: body is pipeline step output explicitly set as response body
}
return true
}

// trackedResponseWriter wraps http.ResponseWriter and tracks whether a response
// body has been written, so the HTTP trigger can fall back to the generic
// "workflow triggered" response only when the pipeline didn't write one.
Expand Down Expand Up @@ -267,6 +392,11 @@ func (t *HTTPTrigger) createHandler(route HTTPTriggerRoute) HTTPHandler {
// to headers (e.g. Authorization), method, URL, and body.
ctx = context.WithValue(ctx, HTTPRequestContextKey, r)

// Inject a result holder so the engine can pass the pipeline's result.Current
// back to this handler without changing the WorkflowEngine interface.
resultHolder := &PipelineResultHolder{}
ctx = context.WithValue(ctx, PipelineResultContextKey, resultHolder)

// Extract data from the request to pass to the workflow.
// Include method, path, and parsed body so pipelines have full
// access to request context (consistent with CommandHandler).
Expand Down Expand Up @@ -316,6 +446,14 @@ func (t *HTTPTrigger) createHandler(route HTTPTriggerRoute) HTTPHandler {
return
}

// If the pipeline set response_status in its output (without writing
// directly to the response writer), use those values to build the response.
if result := resultHolder.Get(); result != nil {
if writePipelineContextResponse(w, result) {
return
}
}

// Fallback: return a generic accepted response when the pipeline doesn't
// write its own HTTP response.
w.Header().Set("Content-Type", "application/json")
Expand Down
Loading
Loading