diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index a233cf7e..eef415a3 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -127,6 +127,7 @@ flowchart TD | `step.db_sync_partitions` | Ensures future partitions exist for a partitioned table | | `step.json_response` | Writes HTTP JSON response with custom status code and headers | | `step.raw_response` | Writes a raw HTTP response with arbitrary content type | +| `step.json_parse` | Parses a JSON string (or `[]byte`) in the pipeline context into a structured object | | `step.static_file` | Serves a pre-loaded file from disk as an HTTP response | | `step.workflow_call` | Invokes another workflow pipeline by name | | `step.validate_path_param` | Validates a URL path parameter against a set of rules | diff --git a/module/database.go b/module/database.go index 3ecdde67..a9ad31b1 100644 --- a/module/database.go +++ b/module/database.go @@ -236,9 +236,10 @@ func (w *WorkflowDatabase) Query(ctx context.Context, sqlStr string, args ...any row := make(map[string]any) for i, col := range columns { val := values[i] - // Convert byte slices to strings for readability + // Convert byte slices: try JSON parse first (handles PostgreSQL + // json/jsonb columns), fall back to string for non-JSON byte data. if b, ok := val.([]byte); ok { - row[col] = string(b) + row[col] = parseJSONBytesOrString(b) } else { row[col] = val } diff --git a/module/database_scan_helpers.go b/module/database_scan_helpers.go new file mode 100644 index 00000000..dda45992 --- /dev/null +++ b/module/database_scan_helpers.go @@ -0,0 +1,41 @@ +package module + +import ( + "bytes" + "encoding/json" +) + +// parseJSONBytesOrString attempts to unmarshal b as JSON. If successful the +// parsed Go value is returned (map[string]any, []any, string, float64, bool, +// or nil). This transparently handles PostgreSQL json/jsonb columns, which the +// pgx driver delivers as raw JSON bytes rather than pre-typed Go values. +// +// A cheap leading-byte pre-check is applied first so that binary blobs (e.g. +// PostgreSQL bytea) skip the full JSON parser entirely and fall back to +// string conversion without incurring unnecessary CPU overhead. +// +// If b is not valid JSON (e.g. PostgreSQL bytea binary data), string(b) is +// returned so that the existing string-fallback behaviour is preserved. +func parseJSONBytesOrString(b []byte) any { + if len(b) == 0 { + return string(b) + } + // Quick check: JSON must start with one of these characters (after optional + // whitespace). Anything else is definitely not JSON and we avoid calling the + // full decoder on large binary blobs. + trimmed := bytes.TrimLeft(b, " \t\r\n") + if len(trimmed) == 0 { + return string(b) + } + first := trimmed[0] + if first != '{' && first != '[' && first != '"' && + first != 't' && first != 'f' && first != 'n' && + first != '-' && (first < '0' || first > '9') { + return string(b) + } + var v any + if err := json.Unmarshal(b, &v); err == nil { + return v + } + return string(b) +} diff --git a/module/pipeline_step_db_query.go b/module/pipeline_step_db_query.go index 9de097c1..cd94ff64 100644 --- a/module/pipeline_step_db_query.go +++ b/module/pipeline_step_db_query.go @@ -193,9 +193,11 @@ func (s *DBQueryStep) Execute(_ context.Context, pc *PipelineContext) (*StepResu row := make(map[string]any, len(columns)) for i, col := range columns { val := values[i] - // Convert []byte to string for readability + // Convert []byte: try JSON parse first (handles PostgreSQL json/jsonb + // column types returned by the pgx driver as raw JSON bytes), then + // fall back to string conversion for non-JSON byte data (e.g. bytea). if b, ok := val.([]byte); ok { - row[col] = string(b) + row[col] = parseJSONBytesOrString(b) } else { row[col] = val } diff --git a/module/pipeline_step_db_query_cached.go b/module/pipeline_step_db_query_cached.go index beaf36dd..fe1fad91 100644 --- a/module/pipeline_step_db_query_cached.go +++ b/module/pipeline_step_db_query_cached.go @@ -249,7 +249,7 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) ( } val := values[i] if b, ok := val.([]byte); ok { - row[col] = string(b) + row[col] = parseJSONBytesOrString(b) } else { row[col] = val } @@ -290,7 +290,7 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) ( } val := values[i] if b, ok := val.([]byte); ok { - output[col] = string(b) + output[col] = parseJSONBytesOrString(b) } else { output[col] = val } diff --git a/module/pipeline_step_db_query_test.go b/module/pipeline_step_db_query_test.go index e704faf4..21b43f34 100644 --- a/module/pipeline_step_db_query_test.go +++ b/module/pipeline_step_db_query_test.go @@ -370,3 +370,85 @@ func TestDBQueryStep_EmptyResult(t *testing.T) { t.Errorf("expected count=0, got %d", count) } } + +// TestParseJSONBytesOrString exercises the helper used by the db_query scanner +// to transparently parse PostgreSQL json/jsonb column bytes. +func TestParseJSONBytesOrString(t *testing.T) { + tests := []struct { + name string + input []byte + want any + }{ + { + name: "json object", + input: []byte(`{"id":1,"type":"follow-ups"}`), + want: map[string]any{"id": float64(1), "type": "follow-ups"}, + }, + { + name: "json array", + input: []byte(`[{"id":1},{"id":2}]`), + want: []any{map[string]any{"id": float64(1)}, map[string]any{"id": float64(2)}}, + }, + { + name: "json string", + input: []byte(`"hello"`), + want: "hello", + }, + { + name: "json number", + input: []byte(`42`), + want: float64(42), + }, + { + name: "json bool", + input: []byte(`true`), + want: true, + }, + { + name: "json null", + input: []byte(`null`), + want: nil, + }, + { + name: "binary / non-json bytes", + input: []byte{0x89, 0x50, 0x4e, 0x47}, // PNG magic bytes + want: string([]byte{0x89, 0x50, 0x4e, 0x47}), + }, + { + name: "empty bytes", + input: []byte{}, + want: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + got := parseJSONBytesOrString(tc.input) + // Use JSON round-trip for equality check to handle map/slice cases. + switch expected := tc.want.(type) { + case map[string]any: + m, ok := got.(map[string]any) + if !ok { + t.Fatalf("expected map[string]any, got %T", got) + } + for k, v := range expected { + if m[k] != v { + t.Errorf("key %q: expected %v, got %v", k, v, m[k]) + } + } + case []any: + sl, ok := got.([]any) + if !ok { + t.Fatalf("expected []any, got %T", got) + } + if len(sl) != len(expected) { + t.Fatalf("expected len %d, got %d", len(expected), len(sl)) + } + default: + if got != tc.want { + t.Errorf("expected %v (%T), got %v (%T)", tc.want, tc.want, got, got) + } + } + }) + } +} diff --git a/module/pipeline_step_json_parse.go b/module/pipeline_step_json_parse.go new file mode 100644 index 00000000..a06f7a5a --- /dev/null +++ b/module/pipeline_step_json_parse.go @@ -0,0 +1,82 @@ +package module + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/GoCodeAlone/modular" +) + +// JSONParseStep parses a JSON string value from the pipeline context into a +// structured Go value (map, slice, etc.) and stores the result as step output. +// +// This is useful when a pipeline step (e.g. step.db_query against a legacy +// driver, or step.http_call) returns a JSON column/field as a raw string rather +// than as a pre-parsed Go type. It is the explicit counterpart to the automatic +// json/jsonb detection that step.db_query performs for the pgx driver. +// +// Configuration: +// +// source: "steps.fetch.row.json_column" # dot-path to the JSON string value (required) +// target: "parsed_data" # output key name (optional, defaults to "value") +type JSONParseStep struct { + name string + source string + target string +} + +// NewJSONParseStepFactory returns a StepFactory that creates JSONParseStep instances. +func NewJSONParseStepFactory() StepFactory { + return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) { + source, _ := config["source"].(string) + if source == "" { + return nil, fmt.Errorf("json_parse step %q: 'source' is required", name) + } + + target, _ := config["target"].(string) + if target == "" { + target = "value" + } + + return &JSONParseStep{ + name: name, + source: source, + target: target, + }, nil + } +} + +// Name returns the step name. +func (s *JSONParseStep) Name() string { return s.name } + +// Execute resolves the source path, parses the value as JSON if it is a string, +// and stores the result under the configured target key. +func (s *JSONParseStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) { + raw := resolveBodyFrom(s.source, pc) + if raw == nil { + return nil, fmt.Errorf("json_parse step %q: source %q not found or resolved to nil", s.name, s.source) + } + + var parsed any + switch v := raw.(type) { + case string: + if err := json.Unmarshal([]byte(v), &parsed); err != nil { + return nil, fmt.Errorf("json_parse step %q: failed to parse JSON from %q: %w", s.name, s.source, err) + } + case []byte: + if err := json.Unmarshal(v, &parsed); err != nil { + return nil, fmt.Errorf("json_parse step %q: failed to parse JSON bytes from %q: %w", s.name, s.source, err) + } + default: + // Value is already a structured type (map, slice, number, bool, nil). + // Pass it through unchanged so that pipelines are idempotent when the + // upstream step already returns a parsed value (e.g. after the db_query + // fix lands, json_parse is a no-op for json/jsonb columns). + parsed = raw + } + + return &StepResult{Output: map[string]any{ + s.target: parsed, + }}, nil +} diff --git a/module/pipeline_step_json_parse_test.go b/module/pipeline_step_json_parse_test.go new file mode 100644 index 00000000..418c01a4 --- /dev/null +++ b/module/pipeline_step_json_parse_test.go @@ -0,0 +1,216 @@ +package module + +import ( + "context" + "testing" +) + +func TestJSONParseStep_StringJSON(t *testing.T) { + factory := NewJSONParseStepFactory() + step, err := factory("parse-json", map[string]any{ + "source": "steps.fetch.row.data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("fetch", map[string]any{ + "row": map[string]any{ + "data": `[{"id":1,"type":"follow-ups"}]`, + }, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + parsed, ok := result.Output["value"].([]any) + if !ok { + t.Fatalf("expected []any, got %T: %v", result.Output["value"], result.Output["value"]) + } + if len(parsed) != 1 { + t.Fatalf("expected 1 element, got %d", len(parsed)) + } + obj, ok := parsed[0].(map[string]any) + if !ok { + t.Fatalf("expected map[string]any element, got %T", parsed[0]) + } + // JSON numbers unmarshal to float64 by default. + if obj["id"] != float64(1) { + t.Errorf("expected id=1, got %v", obj["id"]) + } + if obj["type"] != "follow-ups" { + t.Errorf("expected type='follow-ups', got %v", obj["type"]) + } +} + +func TestJSONParseStep_StringJSONObject(t *testing.T) { + factory := NewJSONParseStepFactory() + step, err := factory("parse-obj", map[string]any{ + "source": "steps.fetch.row.meta", + "target": "parsed_meta", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("fetch", map[string]any{ + "row": map[string]any{ + "meta": `{"total":42,"page":1}`, + }, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + parsed, ok := result.Output["parsed_meta"].(map[string]any) + if !ok { + t.Fatalf("expected map[string]any, got %T", result.Output["parsed_meta"]) + } + if parsed["total"] != float64(42) { + t.Errorf("expected total=42, got %v", parsed["total"]) + } + if parsed["page"] != float64(1) { + t.Errorf("expected page=1, got %v", parsed["page"]) + } +} + +func TestJSONParseStep_ByteSliceJSON(t *testing.T) { + factory := NewJSONParseStepFactory() + step, err := factory("parse-bytes", map[string]any{ + "source": "steps.fetch.row.jsonb_col", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("fetch", map[string]any{ + "row": map[string]any{ + "jsonb_col": []byte(`{"key":"value"}`), + }, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + parsed, ok := result.Output["value"].(map[string]any) + if !ok { + t.Fatalf("expected map[string]any, got %T", result.Output["value"]) + } + if parsed["key"] != "value" { + t.Errorf("expected key='value', got %v", parsed["key"]) + } +} + +func TestJSONParseStep_AlreadyParsed(t *testing.T) { + // When the upstream step already returns a structured value (map/slice), + // json_parse should pass it through unchanged. + factory := NewJSONParseStepFactory() + step, err := factory("no-op-parse", map[string]any{ + "source": "steps.fetch.row.data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + original := map[string]any{"id": 1, "name": "test"} + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("fetch", map[string]any{ + "row": map[string]any{ + "data": original, + }, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + parsed, ok := result.Output["value"].(map[string]any) + if !ok { + t.Fatalf("expected map[string]any, got %T", result.Output["value"]) + } + if parsed["name"] != "test" { + t.Errorf("expected name='test', got %v", parsed["name"]) + } +} + +func TestJSONParseStep_InvalidJSON(t *testing.T) { + factory := NewJSONParseStepFactory() + step, err := factory("bad-json", map[string]any{ + "source": "steps.fetch.row.data", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("fetch", map[string]any{ + "row": map[string]any{ + "data": "not valid json {{{", + }, + }) + + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for invalid JSON") + } +} + +func TestJSONParseStep_MissingSource(t *testing.T) { + factory := NewJSONParseStepFactory() + _, err := factory("no-source", map[string]any{}, nil) + if err == nil { + t.Fatal("expected error for missing source") + } +} + +func TestJSONParseStep_UnresolvablePath(t *testing.T) { + // A typo in source should fail fast rather than silently producing nil. + factory := NewJSONParseStepFactory() + step, err := factory("bad-path", map[string]any{ + "source": "steps.nonexistent.field", + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error when source path resolves to nil") + } +} + +func TestJSONParseStep_DefaultTargetKey(t *testing.T) { + factory := NewJSONParseStepFactory() + step, err := factory("default-target", map[string]any{ + "source": "steps.fetch.row.data", + // no "target" config — should default to "value" + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("fetch", map[string]any{ + "row": map[string]any{"data": `{"ok":true}`}, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if _, hasValue := result.Output["value"]; !hasValue { + t.Errorf("expected 'value' key in output, got keys: %v", result.Output) + } +} diff --git a/module/platform_do_database.go b/module/platform_do_database.go index 8b550f33..ea44c7a5 100644 --- a/module/platform_do_database.go +++ b/module/platform_do_database.go @@ -13,12 +13,12 @@ import ( type DODatabaseState struct { ID string `json:"id"` Name string `json:"name"` - Engine string `json:"engine"` // pg, mysql, redis, mongodb, kafka + Engine string `json:"engine"` // pg, mysql, redis, mongodb, kafka Version string `json:"version"` - Size string `json:"size"` // e.g. db-s-1vcpu-1gb + Size string `json:"size"` // e.g. db-s-1vcpu-1gb Region string `json:"region"` NumNodes int `json:"numNodes"` - Status string `json:"status"` // pending, online, resizing, migrating, error + Status string `json:"status"` // pending, online, resizing, migrating, error Host string `json:"host"` Port int `json:"port"` DatabaseName string `json:"databaseName"` diff --git a/module/platform_do_database_test.go b/module/platform_do_database_test.go index 52e36f98..b1a28dcd 100644 --- a/module/platform_do_database_test.go +++ b/module/platform_do_database_test.go @@ -6,13 +6,13 @@ func TestPlatformDODatabase_MockBackend(t *testing.T) { m := &PlatformDODatabase{ name: "test-db", config: map[string]any{ - "provider": "mock", - "engine": "pg", - "version": "16", - "size": "db-s-1vcpu-1gb", - "region": "nyc1", + "provider": "mock", + "engine": "pg", + "version": "16", + "size": "db-s-1vcpu-1gb", + "region": "nyc1", "num_nodes": 1, - "name": "test-db", + "name": "test-db", }, state: &DODatabaseState{ Name: "test-db", diff --git a/module/scan_provider_test.go b/module/scan_provider_test.go index d6b83439..187e6d86 100644 --- a/module/scan_provider_test.go +++ b/module/scan_provider_test.go @@ -58,7 +58,7 @@ func (a *scanMockApp) GetService(name string, target any) error { return nil } -func (a *scanMockApp) RegisterService(name string, svc any) error { a.services[name] = svc; return nil } +func (a *scanMockApp) RegisterService(name string, svc any) error { a.services[name] = svc; return nil } func (a *scanMockApp) RegisterConfigSection(string, modular.ConfigProvider) {} func (a *scanMockApp) GetConfigSection(string) (modular.ConfigProvider, error) { return nil, nil @@ -67,7 +67,7 @@ func (a *scanMockApp) ConfigSections() map[string]modular.ConfigProvider { retur func (a *scanMockApp) Logger() modular.Logger { return nil } func (a *scanMockApp) SetLogger(modular.Logger) {} func (a *scanMockApp) ConfigProvider() modular.ConfigProvider { return nil } -func (a *scanMockApp) SvcRegistry() modular.ServiceRegistry { return a.services } +func (a *scanMockApp) SvcRegistry() modular.ServiceRegistry { return a.services } func (a *scanMockApp) RegisterModule(modular.Module) {} func (a *scanMockApp) Init() error { return nil } func (a *scanMockApp) Start() error { return nil } @@ -83,9 +83,9 @@ func (a *scanMockApp) GetServiceEntry(string) (*modular.ServiceRegistryEntry, bo func (a *scanMockApp) GetServicesByInterface(_ reflect.Type) []*modular.ServiceRegistryEntry { return nil } -func (a *scanMockApp) GetModule(string) modular.Module { return nil } -func (a *scanMockApp) GetAllModules() map[string]modular.Module { return nil } -func (a *scanMockApp) StartTime() time.Time { return time.Time{} } +func (a *scanMockApp) GetModule(string) modular.Module { return nil } +func (a *scanMockApp) GetAllModules() map[string]modular.Module { return nil } +func (a *scanMockApp) StartTime() time.Time { return time.Time{} } func (a *scanMockApp) OnConfigLoaded(func(modular.Application) error) {} func newScanApp(provider SecurityScannerProvider) *scanMockApp { diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index 8c0b38f0..2d80b8b0 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -1,9 +1,9 @@ // Package pipelinesteps provides a plugin that registers generic pipeline step // types: validate, transform, conditional, set, log, delegate, jq, publish, // http_call, http_proxy, request_parse, db_query, db_exec, db_query_cached, json_response, -// raw_response, static_file, validate_path_param, validate_pagination, validate_request_body, -// foreach, webhook_verify, base64_decode, ui_scaffold, ui_scaffold_analyze, -// dlq_send, dlq_replay, retry_with_backoff, circuit_breaker (wrapping), +// raw_response, json_parse, static_file, validate_path_param, validate_pagination, +// validate_request_body, foreach, webhook_verify, base64_decode, ui_scaffold, +// ui_scaffold_analyze, dlq_send, dlq_replay, retry_with_backoff, circuit_breaker (wrapping), // s3_upload, auth_validate, authz_check, token_revoke, sandbox_exec. // It also provides the PipelineWorkflowHandler for composable pipelines. package pipelinesteps @@ -70,6 +70,7 @@ func New() *Plugin { "step.db_sync_partitions", "step.json_response", "step.raw_response", + "step.json_parse", "step.static_file", "step.workflow_call", "step.validate_path_param", @@ -141,6 +142,7 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.db_sync_partitions": wrapStepFactory(module.NewDBSyncPartitionsStepFactory()), "step.json_response": wrapStepFactory(module.NewJSONResponseStepFactory()), "step.raw_response": wrapStepFactory(module.NewRawResponseStepFactory()), + "step.json_parse": wrapStepFactory(module.NewJSONParseStepFactory()), "step.static_file": wrapStepFactory(module.NewStaticFileStepFactory()), "step.validate_path_param": wrapStepFactory(module.NewValidatePathParamStepFactory()), "step.validate_pagination": wrapStepFactory(module.NewValidatePaginationStepFactory()), diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index 052462e2..4f26339c 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -49,6 +49,7 @@ func TestStepFactories(t *testing.T) { "step.db_sync_partitions", "step.json_response", "step.raw_response", + "step.json_parse", "step.static_file", "step.validate_path_param", "step.validate_pagination",