diff --git a/module/pipeline_step_json_response.go b/module/pipeline_step_json_response.go index 012372b7..59804c4d 100644 --- a/module/pipeline_step_json_response.go +++ b/module/pipeline_step_json_response.go @@ -118,11 +118,15 @@ func (s *JSONResponseStep) resolveResponseBody(pc *PipelineContext) any { return resolveBodyFrom(s.bodyFrom, pc) } if s.body != nil { - resolved, err := s.tmpl.ResolveMap(s.body, pc) - if err != nil { - return s.body // fallback to unresolved + result := make(map[string]any, len(s.body)) + for k, v := range s.body { + resolved, err := s.resolveBodyValue(v, pc) + if err != nil { + return s.body // fallback to unresolved + } + result[k] = resolved } - return resolved + return result } if s.bodyRaw != nil { return s.bodyRaw @@ -130,6 +134,55 @@ func (s *JSONResponseStep) resolveResponseBody(pc *PipelineContext) any { return nil } +// resolveBodyValue resolves a single body value, supporting: +// - `_from` references that inject raw step output values +// - nested maps and slices +// - template strings resolved via the TemplateEngine. +// +// `_from` is treated as a special directive only when it is the sole key in a map, +// e.g. `{"_from": "steps.fetch.rows"}`. This keeps the semantics simple and avoids +// ambiguity: the entire value is replaced with the referenced data. +// +// As a consequence, `_from` cannot be combined with other fields or template +// expressions in the same map node. Configuration authors can still mix raw +// injections and templated fields by using `_from` on a sibling field in the +// parent object instead. +func (s *JSONResponseStep) resolveBodyValue(v any, pc *PipelineContext) (any, error) { + switch val := v.(type) { + case map[string]any: + // Check for _from reference, used only when it is the single key: + // {"_from": "steps.fetch.rows"}. Combining `_from` with other keys in + // the same map is intentionally not supported. + if from, ok := val["_from"].(string); ok && len(val) == 1 { + return resolveBodyFrom(from, pc), nil + } + // Recurse into nested map + result := make(map[string]any, len(val)) + for k, item := range val { + resolved, err := s.resolveBodyValue(item, pc) + if err != nil { + return nil, fmt.Errorf("field %q: %w", k, err) + } + result[k] = resolved + } + return result, nil + case []any: + result := make([]any, len(val)) + for i, item := range val { + resolved, err := s.resolveBodyValue(item, pc) + if err != nil { + return nil, err + } + result[i] = resolved + } + return result, nil + case string: + return s.tmpl.Resolve(val, pc) + default: + return v, nil + } +} + // resolveBodyFrom resolves a dotted path like "steps.get-company.row" from the // pipeline context. It looks in StepOutputs first (for "steps.X.Y" paths), // then in Current. diff --git a/module/pipeline_step_json_response_test.go b/module/pipeline_step_json_response_test.go index 4cbaf25f..bf603489 100644 --- a/module/pipeline_step_json_response_test.go +++ b/module/pipeline_step_json_response_test.go @@ -241,6 +241,217 @@ func TestJSONResponseStep_BodyFromRows(t *testing.T) { } } +func TestJSONResponseStep_BodyFromRef(t *testing.T) { + factory := NewJSONResponseStepFactory() + step, err := factory("from-ref", map[string]any{ + "status": 200, + "body": map[string]any{ + "data": map[string]any{ + "_from": "steps.list-companies.rows", + }, + "total": map[string]any{ + "_from": "steps.list-companies.count", + }, + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + pc.MergeStepOutput("list-companies", map[string]any{ + "rows": []map[string]any{ + {"id": "c1", "name": "Acme"}, + {"id": "c2", "name": "Beta"}, + }, + "count": 2, + }) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var body map[string]any + if err := json.NewDecoder(recorder.Body).Decode(&body); err != nil { + t.Fatalf("decode error: %v", err) + } + + rows, ok := body["data"].([]any) + if !ok { + t.Fatalf("expected data to be []any, got %T", body["data"]) + } + if len(rows) != 2 { + t.Errorf("expected 2 rows, got %d", len(rows)) + } + + // total should be a number (JSON numbers decode as float64) + total, ok := body["total"].(float64) + if !ok { + t.Fatalf("expected total to be a number, got %T (%v)", body["total"], body["total"]) + } + if total != 2 { + t.Errorf("expected total=2, got %v", total) + } +} + +func TestJSONResponseStep_BodyFromRefComposite(t *testing.T) { + factory := NewJSONResponseStepFactory() + step, err := factory("composite-response", map[string]any{ + "status": 200, + "body": map[string]any{ + "data": map[string]any{ + "_from": "steps.fetch.rows", + }, + "meta": map[string]any{ + "total": map[string]any{ + "_from": "steps.fetch.count", + }, + "page": 1, + }, + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + pc.MergeStepOutput("fetch", map[string]any{ + "rows": []map[string]any{ + {"id": "r1", "name": "Row One"}, + }, + "count": 42, + }) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var body map[string]any + if err := json.NewDecoder(recorder.Body).Decode(&body); err != nil { + t.Fatalf("decode error: %v", err) + } + + rows, ok := body["data"].([]any) + if !ok { + t.Fatalf("expected data to be []any, got %T", body["data"]) + } + if len(rows) != 1 { + t.Errorf("expected 1 row, got %d", len(rows)) + } + + meta, ok := body["meta"].(map[string]any) + if !ok { + t.Fatalf("expected meta to be map, got %T", body["meta"]) + } + if meta["total"] != float64(42) { + t.Errorf("expected meta.total=42, got %v", meta["total"]) + } + if meta["page"] != float64(1) { + t.Errorf("expected meta.page=1, got %v", meta["page"]) + } +} + +func TestJSONResponseStep_BodyFromRefMissingPath(t *testing.T) { + factory := NewJSONResponseStepFactory() + step, err := factory("missing-ref", map[string]any{ + "status": 200, + "body": map[string]any{ + "data": map[string]any{ + "_from": "steps.nonexistent.rows", + }, + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var body map[string]any + if err := json.NewDecoder(recorder.Body).Decode(&body); err != nil { + t.Fatalf("decode error: %v", err) + } + // Missing paths should resolve to nil; the key may be absent or null. + _ = body["data"] +} + +func TestJSONResponseStep_BodyFromRefInArray(t *testing.T) { + factory := NewJSONResponseStepFactory() + step, err := factory("array-from-ref", map[string]any{ + "status": 200, + "body": map[string]any{ + "items": []any{ + map[string]any{"_from": "steps.first.data"}, + map[string]any{"_from": "steps.second.data"}, + }, + }, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + recorder := httptest.NewRecorder() + pc := NewPipelineContext(nil, map[string]any{ + "_http_response_writer": recorder, + }) + pc.MergeStepOutput("first", map[string]any{ + "data": map[string]any{"id": "a1", "label": "Alpha"}, + }) + pc.MergeStepOutput("second", map[string]any{ + "data": map[string]any{"id": "b2", "label": "Beta"}, + }) + + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + var body map[string]any + if err := json.NewDecoder(recorder.Body).Decode(&body); err != nil { + t.Fatalf("decode error: %v", err) + } + + items, ok := body["items"].([]any) + if !ok { + t.Fatalf("expected items to be []any, got %T", body["items"]) + } + if len(items) != 2 { + t.Fatalf("expected 2 items, got %d", len(items)) + } + + first, ok := items[0].(map[string]any) + if !ok { + t.Fatalf("expected items[0] to be map, got %T", items[0]) + } + if first["id"] != "a1" { + t.Errorf("expected items[0].id='a1', got %v", first["id"]) + } + + second, ok := items[1].(map[string]any) + if !ok { + t.Fatalf("expected items[1] to be map, got %T", items[1]) + } + if second["id"] != "b2" { + t.Errorf("expected items[1].id='b2', got %v", second["id"]) + } +} + func TestJSONResponseStep_DefaultStatus(t *testing.T) { factory := NewJSONResponseStepFactory() step, err := factory("default-status", map[string]any{