Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 57 additions & 4 deletions module/pipeline_step_json_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,71 @@ func (s *JSONResponseStep) resolveResponseBody(pc *PipelineContext) any {
return resolveBodyFrom(s.bodyFrom, pc)
}
if s.body != nil {
resolved, err := s.tmpl.ResolveMap(s.body, pc)
if err != nil {
return s.body // fallback to unresolved
result := make(map[string]any, len(s.body))
for k, v := range s.body {
resolved, err := s.resolveBodyValue(v, pc)
if err != nil {
return s.body // fallback to unresolved
}
result[k] = resolved
}
return resolved
return result
}
if s.bodyRaw != nil {
return s.bodyRaw
}
return nil
}

// resolveBodyValue resolves a single body value, supporting:
// - `_from` references that inject raw step output values
// - nested maps and slices
// - template strings resolved via the TemplateEngine.
//
// `_from` is treated as a special directive only when it is the sole key in a map,
// e.g. `{"_from": "steps.fetch.rows"}`. This keeps the semantics simple and avoids
// ambiguity: the entire value is replaced with the referenced data.
//
// As a consequence, `_from` cannot be combined with other fields or template
// expressions in the same map node. Configuration authors can still mix raw
// injections and templated fields by using `_from` on a sibling field in the
// parent object instead.
func (s *JSONResponseStep) resolveBodyValue(v any, pc *PipelineContext) (any, error) {
switch val := v.(type) {
case map[string]any:
// Check for _from reference, used only when it is the single key:
// {"_from": "steps.fetch.rows"}. Combining `_from` with other keys in
// the same map is intentionally not supported.
if from, ok := val["_from"].(string); ok && len(val) == 1 {
return resolveBodyFrom(from, pc), nil
}
// Recurse into nested map
result := make(map[string]any, len(val))
for k, item := range val {
resolved, err := s.resolveBodyValue(item, pc)
if err != nil {
return nil, fmt.Errorf("field %q: %w", k, err)
}
result[k] = resolved
}
return result, nil
case []any:
result := make([]any, len(val))
for i, item := range val {
resolved, err := s.resolveBodyValue(item, pc)
if err != nil {
return nil, err
}
result[i] = resolved
}
return result, nil
case string:
return s.tmpl.Resolve(val, pc)
default:
return v, nil
}
}

// resolveBodyFrom resolves a dotted path like "steps.get-company.row" from the
// pipeline context. It looks in StepOutputs first (for "steps.X.Y" paths),
// then in Current.
Expand Down
211 changes: 211 additions & 0 deletions module/pipeline_step_json_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,217 @@ func TestJSONResponseStep_BodyFromRows(t *testing.T) {
}
}

func TestJSONResponseStep_BodyFromRef(t *testing.T) {
factory := NewJSONResponseStepFactory()
step, err := factory("from-ref", map[string]any{
"status": 200,
"body": map[string]any{
"data": map[string]any{
"_from": "steps.list-companies.rows",
},
"total": map[string]any{
"_from": "steps.list-companies.count",
},
},
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

recorder := httptest.NewRecorder()
pc := NewPipelineContext(nil, map[string]any{
"_http_response_writer": recorder,
})
pc.MergeStepOutput("list-companies", map[string]any{
"rows": []map[string]any{
{"id": "c1", "name": "Acme"},
{"id": "c2", "name": "Beta"},
},
"count": 2,
})

_, err = step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

var body map[string]any
if err := json.NewDecoder(recorder.Body).Decode(&body); err != nil {
t.Fatalf("decode error: %v", err)
}

rows, ok := body["data"].([]any)
if !ok {
t.Fatalf("expected data to be []any, got %T", body["data"])
}
if len(rows) != 2 {
t.Errorf("expected 2 rows, got %d", len(rows))
}

// total should be a number (JSON numbers decode as float64)
total, ok := body["total"].(float64)
if !ok {
t.Fatalf("expected total to be a number, got %T (%v)", body["total"], body["total"])
}
if total != 2 {
t.Errorf("expected total=2, got %v", total)
}
}

func TestJSONResponseStep_BodyFromRefComposite(t *testing.T) {
factory := NewJSONResponseStepFactory()
step, err := factory("composite-response", map[string]any{
"status": 200,
"body": map[string]any{
"data": map[string]any{
"_from": "steps.fetch.rows",
},
"meta": map[string]any{
"total": map[string]any{
"_from": "steps.fetch.count",
},
"page": 1,
},
},
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

recorder := httptest.NewRecorder()
pc := NewPipelineContext(nil, map[string]any{
"_http_response_writer": recorder,
})
pc.MergeStepOutput("fetch", map[string]any{
"rows": []map[string]any{
{"id": "r1", "name": "Row One"},
},
"count": 42,
})

_, err = step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

var body map[string]any
if err := json.NewDecoder(recorder.Body).Decode(&body); err != nil {
t.Fatalf("decode error: %v", err)
}

rows, ok := body["data"].([]any)
if !ok {
t.Fatalf("expected data to be []any, got %T", body["data"])
}
if len(rows) != 1 {
t.Errorf("expected 1 row, got %d", len(rows))
}

meta, ok := body["meta"].(map[string]any)
if !ok {
t.Fatalf("expected meta to be map, got %T", body["meta"])
}
if meta["total"] != float64(42) {
t.Errorf("expected meta.total=42, got %v", meta["total"])
}
if meta["page"] != float64(1) {
t.Errorf("expected meta.page=1, got %v", meta["page"])
}
}

func TestJSONResponseStep_BodyFromRefMissingPath(t *testing.T) {
factory := NewJSONResponseStepFactory()
step, err := factory("missing-ref", map[string]any{
"status": 200,
"body": map[string]any{
"data": map[string]any{
"_from": "steps.nonexistent.rows",
},
},
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

recorder := httptest.NewRecorder()
pc := NewPipelineContext(nil, map[string]any{
"_http_response_writer": recorder,
})

_, err = step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

var body map[string]any
if err := json.NewDecoder(recorder.Body).Decode(&body); err != nil {
t.Fatalf("decode error: %v", err)
}
// Missing paths should resolve to nil; the key may be absent or null.
_ = body["data"]
}

func TestJSONResponseStep_BodyFromRefInArray(t *testing.T) {
factory := NewJSONResponseStepFactory()
step, err := factory("array-from-ref", map[string]any{
"status": 200,
"body": map[string]any{
"items": []any{
map[string]any{"_from": "steps.first.data"},
map[string]any{"_from": "steps.second.data"},
},
},
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

recorder := httptest.NewRecorder()
pc := NewPipelineContext(nil, map[string]any{
"_http_response_writer": recorder,
})
pc.MergeStepOutput("first", map[string]any{
"data": map[string]any{"id": "a1", "label": "Alpha"},
})
pc.MergeStepOutput("second", map[string]any{
"data": map[string]any{"id": "b2", "label": "Beta"},
})

_, err = step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

var body map[string]any
if err := json.NewDecoder(recorder.Body).Decode(&body); err != nil {
t.Fatalf("decode error: %v", err)
}

items, ok := body["items"].([]any)
if !ok {
t.Fatalf("expected items to be []any, got %T", body["items"])
}
if len(items) != 2 {
t.Fatalf("expected 2 items, got %d", len(items))
}

first, ok := items[0].(map[string]any)
if !ok {
t.Fatalf("expected items[0] to be map, got %T", items[0])
}
if first["id"] != "a1" {
t.Errorf("expected items[0].id='a1', got %v", first["id"])
}

second, ok := items[1].(map[string]any)
if !ok {
t.Fatalf("expected items[1] to be map, got %T", items[1])
}
if second["id"] != "b2" {
t.Errorf("expected items[1].id='b2', got %v", second["id"])
}
}

func TestJSONResponseStep_DefaultStatus(t *testing.T) {
factory := NewJSONResponseStepFactory()
step, err := factory("default-status", map[string]any{
Expand Down
Loading