Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ flowchart TD
| `step.db_sync_partitions` | Ensures future partitions exist for a partitioned table |
| `step.json_response` | Writes HTTP JSON response with custom status code and headers |
| `step.raw_response` | Writes a raw HTTP response with arbitrary content type |
| `step.json_parse` | Parses a JSON string (or `[]byte`) in the pipeline context into a structured object |
| `step.static_file` | Serves a pre-loaded file from disk as an HTTP response |
| `step.workflow_call` | Invokes another workflow pipeline by name |
| `step.validate_path_param` | Validates a URL path parameter against a set of rules |
Expand Down
5 changes: 3 additions & 2 deletions module/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,10 @@ func (w *WorkflowDatabase) Query(ctx context.Context, sqlStr string, args ...any
row := make(map[string]any)
for i, col := range columns {
val := values[i]
// Convert byte slices to strings for readability
// Convert byte slices: try JSON parse first (handles PostgreSQL
// json/jsonb columns), fall back to string for non-JSON byte data.
if b, ok := val.([]byte); ok {
row[col] = string(b)
row[col] = parseJSONBytesOrString(b)
} else {
row[col] = val
}
Expand Down
41 changes: 41 additions & 0 deletions module/database_scan_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package module

import (
"bytes"
"encoding/json"
)

// parseJSONBytesOrString converts a raw []byte column value into the most
// useful Go representation. Valid JSON (as delivered for PostgreSQL json/jsonb
// columns by the pgx driver) decodes to map[string]any, []any, string,
// float64, bool, or nil; anything else — including empty input and binary
// data such as PostgreSQL bytea — falls back to a plain string conversion,
// preserving the pre-existing behaviour.
//
// A cheap first-byte screen rejects obvious non-JSON before invoking the full
// decoder, so large binary blobs never pay the unmarshal cost.
func parseJSONBytesOrString(b []byte) any {
	// JSON permits leading whitespace; inspect the first significant byte.
	body := bytes.TrimLeft(b, " \t\r\n")
	if len(body) == 0 {
		// Empty or all-whitespace input: keep the string fallback.
		return string(b)
	}
	switch c := body[0]; {
	case c == '{', c == '[', c == '"',
		c == 't', c == 'f', c == 'n', // true / false / null
		c == '-', c >= '0' && c <= '9': // numbers
		// Plausible JSON start — attempt a real decode.
		var parsed any
		if json.Unmarshal(b, &parsed) == nil {
			return parsed
		}
	}
	// Not JSON (or failed to decode): preserve the string-fallback contract.
	return string(b)
}
6 changes: 4 additions & 2 deletions module/pipeline_step_db_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,11 @@ func (s *DBQueryStep) Execute(_ context.Context, pc *PipelineContext) (*StepResu
row := make(map[string]any, len(columns))
for i, col := range columns {
val := values[i]
// Convert []byte to string for readability
// Convert []byte: try JSON parse first (handles PostgreSQL json/jsonb
// column types returned by the pgx driver as raw JSON bytes), then
// fall back to string conversion for non-JSON byte data (e.g. bytea).
if b, ok := val.([]byte); ok {
row[col] = string(b)
row[col] = parseJSONBytesOrString(b)
} else {
row[col] = val
}
Expand Down
4 changes: 2 additions & 2 deletions module/pipeline_step_db_query_cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) (
}
val := values[i]
if b, ok := val.([]byte); ok {
row[col] = string(b)
row[col] = parseJSONBytesOrString(b)
} else {
row[col] = val
}
Expand Down Expand Up @@ -290,7 +290,7 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) (
}
val := values[i]
if b, ok := val.([]byte); ok {
output[col] = string(b)
output[col] = parseJSONBytesOrString(b)
} else {
output[col] = val
}
Expand Down
82 changes: 82 additions & 0 deletions module/pipeline_step_db_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,85 @@ func TestDBQueryStep_EmptyResult(t *testing.T) {
t.Errorf("expected count=0, got %d", count)
}
}

// TestParseJSONBytesOrString exercises the helper used by the db_query scanner
// to transparently parse PostgreSQL json/jsonb column bytes.
//
// Fixes over the previous version: the array case now compares element
// contents (not just length), and the equality check is an actual recursive
// deep-compare rather than the previously-misdescribed shallow comparison.
func TestParseJSONBytesOrString(t *testing.T) {
	tests := []struct {
		name  string
		input []byte
		want  any
	}{
		{
			name:  "json object",
			input: []byte(`{"id":1,"type":"follow-ups"}`),
			want:  map[string]any{"id": float64(1), "type": "follow-ups"},
		},
		{
			name:  "json array",
			input: []byte(`[{"id":1},{"id":2}]`),
			want:  []any{map[string]any{"id": float64(1)}, map[string]any{"id": float64(2)}},
		},
		{
			name:  "json string",
			input: []byte(`"hello"`),
			want:  "hello",
		},
		{
			name:  "json number",
			input: []byte(`42`),
			want:  float64(42),
		},
		{
			name:  "json bool",
			input: []byte(`true`),
			want:  true,
		},
		{
			name:  "json null",
			input: []byte(`null`),
			want:  nil,
		},
		{
			name:  "binary / non-json bytes",
			input: []byte{0x89, 0x50, 0x4e, 0x47}, // PNG magic bytes
			want:  string([]byte{0x89, 0x50, 0x4e, 0x47}),
		},
		{
			name:  "empty bytes",
			input: []byte{},
			want:  "",
		},
	}

	// deepEqual recursively compares the value shapes json.Unmarshal into any
	// can produce: map[string]any, []any, string, float64, bool, nil.
	var deepEqual func(a, b any) bool
	deepEqual = func(a, b any) bool {
		switch av := a.(type) {
		case map[string]any:
			bv, ok := b.(map[string]any)
			if !ok || len(av) != len(bv) {
				return false
			}
			for k, v := range av {
				bvv, present := bv[k]
				if !present || !deepEqual(v, bvv) {
					return false
				}
			}
			return true
		case []any:
			bv, ok := b.([]any)
			if !ok || len(av) != len(bv) {
				return false
			}
			for i := range av {
				if !deepEqual(av[i], bv[i]) {
					return false
				}
			}
			return true
		default:
			// Scalars (string, float64, bool) and nil compare directly.
			return a == b
		}
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			got := parseJSONBytesOrString(tc.input)
			if !deepEqual(tc.want, got) {
				t.Errorf("expected %v (%T), got %v (%T)", tc.want, tc.want, got, got)
			}
		})
	}
}
82 changes: 82 additions & 0 deletions module/pipeline_step_json_parse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package module

import (
"context"
"encoding/json"
"fmt"

"github.com/GoCodeAlone/modular"
)

// JSONParseStep parses a JSON string value from the pipeline context into a
// structured Go value (map, slice, etc.) and stores the result as step output.
//
// This is useful when a pipeline step (e.g. step.db_query against a legacy
// driver, or step.http_call) returns a JSON column/field as a raw string rather
// than as a pre-parsed Go type. It is the explicit counterpart to the automatic
// json/jsonb detection that step.db_query performs for the pgx driver.
//
// Configuration:
//
//	source: "steps.fetch.row.json_column" # dot-path to the JSON string value (required)
//	target: "parsed_data"                 # output key name (optional, defaults to "value")
type JSONParseStep struct {
	name   string // step name as configured in the pipeline
	source string // dot-path to the JSON value in the pipeline context (required)
	target string // output key the parsed value is stored under (defaults to "value")
}

// NewJSONParseStepFactory returns a StepFactory that creates JSONParseStep instances.
//
// The factory validates that the required 'source' config key is a non-empty
// string and defaults the optional 'target' key to "value".
func NewJSONParseStepFactory() StepFactory {
	return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) {
		src, _ := config["source"].(string)
		if src == "" {
			return nil, fmt.Errorf("json_parse step %q: 'source' is required", name)
		}

		// Optional output key; fall back to "value" when absent or empty.
		tgt := "value"
		if v, ok := config["target"].(string); ok && v != "" {
			tgt = v
		}

		return &JSONParseStep{
			name:   name,
			source: src,
			target: tgt,
		}, nil
	}
}

// Name returns the step name.
func (s *JSONParseStep) Name() string {
	return s.name
}

// Execute resolves the source path, parses the value as JSON if it is a string,
// and stores the result under the configured target key.
func (s *JSONParseStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
raw := resolveBodyFrom(s.source, pc)
Copy link

Copilot AI Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

json_parse resolves the source via resolveBodyFrom, which returns nil for missing/unresolvable paths. That means a typo in source will silently produce {target: nil} and the step will appear to succeed. Consider using a strict resolver (e.g. build a data map like JQStep.resolveInput and call resolveDottedPath) or otherwise detecting “path not found” and returning an error so misconfigurations fail fast.

Suggested change
raw := resolveBodyFrom(s.source, pc)
raw := resolveBodyFrom(s.source, pc)
if raw == nil {
return nil, fmt.Errorf("json_parse step %q: source %q not found or resolved to nil", s.name, s.source)
}

Copilot uses AI. Check for mistakes.
if raw == nil {
return nil, fmt.Errorf("json_parse step %q: source %q not found or resolved to nil", s.name, s.source)
}

var parsed any
switch v := raw.(type) {
case string:
if err := json.Unmarshal([]byte(v), &parsed); err != nil {
return nil, fmt.Errorf("json_parse step %q: failed to parse JSON from %q: %w", s.name, s.source, err)
}
case []byte:
if err := json.Unmarshal(v, &parsed); err != nil {
return nil, fmt.Errorf("json_parse step %q: failed to parse JSON bytes from %q: %w", s.name, s.source, err)
}
default:
// Value is already a structured type (map, slice, number, bool, nil).
// Pass it through unchanged so that pipelines are idempotent when the
// upstream step already returns a parsed value (e.g. after the db_query
// fix lands, json_parse is a no-op for json/jsonb columns).
parsed = raw
}

return &StepResult{Output: map[string]any{
s.target: parsed,
}}, nil
}
Loading
Loading