Skip to content

Commit 006b368

Browse files
Copilotintel352
andauthored
Fix double-encoding of PostgreSQL json/jsonb columns; add step.json_parse (#297)
* Initial plan * Fix json/jsonb double-encoding and add step.json_parse Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * Address reviewer feedback: shared helper, pre-check, nil-check on source Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> Co-authored-by: Jonathan Langevin <codingsloth@pm.me>
1 parent 858204f commit 006b368

13 files changed

Lines changed: 451 additions & 23 deletions

DOCUMENTATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ flowchart TD
127127
| `step.db_sync_partitions` | Ensures future partitions exist for a partitioned table |
128128
| `step.json_response` | Writes HTTP JSON response with custom status code and headers |
129129
| `step.raw_response` | Writes a raw HTTP response with arbitrary content type |
130+
| `step.json_parse` | Parses a JSON string (or `[]byte`) in the pipeline context into a structured object |
130131
| `step.static_file` | Serves a pre-loaded file from disk as an HTTP response |
131132
| `step.workflow_call` | Invokes another workflow pipeline by name |
132133
| `step.validate_path_param` | Validates a URL path parameter against a set of rules |

module/database.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,10 @@ func (w *WorkflowDatabase) Query(ctx context.Context, sqlStr string, args ...any
236236
row := make(map[string]any)
237237
for i, col := range columns {
238238
val := values[i]
239-
// Convert byte slices to strings for readability
239+
// Convert byte slices: try JSON parse first (handles PostgreSQL
240+
// json/jsonb columns), fall back to string for non-JSON byte data.
240241
if b, ok := val.([]byte); ok {
241-
row[col] = string(b)
242+
row[col] = parseJSONBytesOrString(b)
242243
} else {
243244
row[col] = val
244245
}

module/database_scan_helpers.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package module
2+
3+
import (
4+
"bytes"
5+
"encoding/json"
6+
)
7+
8+
// parseJSONBytesOrString attempts to unmarshal b as JSON. If successful the
9+
// parsed Go value is returned (map[string]any, []any, string, float64, bool,
10+
// or nil). This transparently handles PostgreSQL json/jsonb columns, which the
11+
// pgx driver delivers as raw JSON bytes rather than pre-typed Go values.
12+
//
13+
// A cheap leading-byte pre-check is applied first so that binary blobs (e.g.
14+
// PostgreSQL bytea) skip the full JSON parser entirely and fall back to
15+
// string conversion without incurring unnecessary CPU overhead.
16+
//
17+
// If b is not valid JSON (e.g. PostgreSQL bytea binary data), string(b) is
18+
// returned so that the existing string-fallback behaviour is preserved.
19+
func parseJSONBytesOrString(b []byte) any {
20+
if len(b) == 0 {
21+
return string(b)
22+
}
23+
// Quick check: JSON must start with one of these characters (after optional
24+
// whitespace). Anything else is definitely not JSON and we avoid calling the
25+
// full decoder on large binary blobs.
26+
trimmed := bytes.TrimLeft(b, " \t\r\n")
27+
if len(trimmed) == 0 {
28+
return string(b)
29+
}
30+
first := trimmed[0]
31+
if first != '{' && first != '[' && first != '"' &&
32+
first != 't' && first != 'f' && first != 'n' &&
33+
first != '-' && (first < '0' || first > '9') {
34+
return string(b)
35+
}
36+
var v any
37+
if err := json.Unmarshal(b, &v); err == nil {
38+
return v
39+
}
40+
return string(b)
41+
}

module/pipeline_step_db_query.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,11 @@ func (s *DBQueryStep) Execute(_ context.Context, pc *PipelineContext) (*StepResu
193193
row := make(map[string]any, len(columns))
194194
for i, col := range columns {
195195
val := values[i]
196-
// Convert []byte to string for readability
196+
// Convert []byte: try JSON parse first (handles PostgreSQL json/jsonb
197+
// column types returned by the pgx driver as raw JSON bytes), then
198+
// fall back to string conversion for non-JSON byte data (e.g. bytea).
197199
if b, ok := val.([]byte); ok {
198-
row[col] = string(b)
200+
row[col] = parseJSONBytesOrString(b)
199201
} else {
200202
row[col] = val
201203
}

module/pipeline_step_db_query_cached.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) (
249249
}
250250
val := values[i]
251251
if b, ok := val.([]byte); ok {
252-
row[col] = string(b)
252+
row[col] = parseJSONBytesOrString(b)
253253
} else {
254254
row[col] = val
255255
}
@@ -290,7 +290,7 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) (
290290
}
291291
val := values[i]
292292
if b, ok := val.([]byte); ok {
293-
output[col] = string(b)
293+
output[col] = parseJSONBytesOrString(b)
294294
} else {
295295
output[col] = val
296296
}

module/pipeline_step_db_query_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,3 +370,85 @@ func TestDBQueryStep_EmptyResult(t *testing.T) {
370370
t.Errorf("expected count=0, got %d", count)
371371
}
372372
}
373+
374+
// TestParseJSONBytesOrString exercises the helper used by the db_query scanner
375+
// to transparently parse PostgreSQL json/jsonb column bytes.
376+
func TestParseJSONBytesOrString(t *testing.T) {
377+
tests := []struct {
378+
name string
379+
input []byte
380+
want any
381+
}{
382+
{
383+
name: "json object",
384+
input: []byte(`{"id":1,"type":"follow-ups"}`),
385+
want: map[string]any{"id": float64(1), "type": "follow-ups"},
386+
},
387+
{
388+
name: "json array",
389+
input: []byte(`[{"id":1},{"id":2}]`),
390+
want: []any{map[string]any{"id": float64(1)}, map[string]any{"id": float64(2)}},
391+
},
392+
{
393+
name: "json string",
394+
input: []byte(`"hello"`),
395+
want: "hello",
396+
},
397+
{
398+
name: "json number",
399+
input: []byte(`42`),
400+
want: float64(42),
401+
},
402+
{
403+
name: "json bool",
404+
input: []byte(`true`),
405+
want: true,
406+
},
407+
{
408+
name: "json null",
409+
input: []byte(`null`),
410+
want: nil,
411+
},
412+
{
413+
name: "binary / non-json bytes",
414+
input: []byte{0x89, 0x50, 0x4e, 0x47}, // PNG magic bytes
415+
want: string([]byte{0x89, 0x50, 0x4e, 0x47}),
416+
},
417+
{
418+
name: "empty bytes",
419+
input: []byte{},
420+
want: "",
421+
},
422+
}
423+
424+
for _, tc := range tests {
425+
t.Run(tc.name, func(t *testing.T) {
426+
got := parseJSONBytesOrString(tc.input)
427+
// Use JSON round-trip for equality check to handle map/slice cases.
428+
switch expected := tc.want.(type) {
429+
case map[string]any:
430+
m, ok := got.(map[string]any)
431+
if !ok {
432+
t.Fatalf("expected map[string]any, got %T", got)
433+
}
434+
for k, v := range expected {
435+
if m[k] != v {
436+
t.Errorf("key %q: expected %v, got %v", k, v, m[k])
437+
}
438+
}
439+
case []any:
440+
sl, ok := got.([]any)
441+
if !ok {
442+
t.Fatalf("expected []any, got %T", got)
443+
}
444+
if len(sl) != len(expected) {
445+
t.Fatalf("expected len %d, got %d", len(expected), len(sl))
446+
}
447+
default:
448+
if got != tc.want {
449+
t.Errorf("expected %v (%T), got %v (%T)", tc.want, tc.want, got, got)
450+
}
451+
}
452+
})
453+
}
454+
}

module/pipeline_step_json_parse.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package module
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
"github.com/GoCodeAlone/modular"
9+
)
10+
11+
// JSONParseStep parses a JSON string value from the pipeline context into a
12+
// structured Go value (map, slice, etc.) and stores the result as step output.
13+
//
14+
// This is useful when a pipeline step (e.g. step.db_query against a legacy
15+
// driver, or step.http_call) returns a JSON column/field as a raw string rather
16+
// than as a pre-parsed Go type. It is the explicit counterpart to the automatic
17+
// json/jsonb detection that step.db_query performs for the pgx driver.
18+
//
19+
// Configuration:
20+
//
21+
// source: "steps.fetch.row.json_column" # dot-path to the JSON string value (required)
22+
// target: "parsed_data" # output key name (optional, defaults to "value")
23+
type JSONParseStep struct {
24+
name string
25+
source string
26+
target string
27+
}
28+
29+
// NewJSONParseStepFactory returns a StepFactory that creates JSONParseStep instances.
30+
func NewJSONParseStepFactory() StepFactory {
31+
return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) {
32+
source, _ := config["source"].(string)
33+
if source == "" {
34+
return nil, fmt.Errorf("json_parse step %q: 'source' is required", name)
35+
}
36+
37+
target, _ := config["target"].(string)
38+
if target == "" {
39+
target = "value"
40+
}
41+
42+
return &JSONParseStep{
43+
name: name,
44+
source: source,
45+
target: target,
46+
}, nil
47+
}
48+
}
49+
50+
// Name returns the step name.
51+
func (s *JSONParseStep) Name() string { return s.name }
52+
53+
// Execute resolves the source path, parses the value as JSON if it is a string,
54+
// and stores the result under the configured target key.
55+
func (s *JSONParseStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
56+
raw := resolveBodyFrom(s.source, pc)
57+
if raw == nil {
58+
return nil, fmt.Errorf("json_parse step %q: source %q not found or resolved to nil", s.name, s.source)
59+
}
60+
61+
var parsed any
62+
switch v := raw.(type) {
63+
case string:
64+
if err := json.Unmarshal([]byte(v), &parsed); err != nil {
65+
return nil, fmt.Errorf("json_parse step %q: failed to parse JSON from %q: %w", s.name, s.source, err)
66+
}
67+
case []byte:
68+
if err := json.Unmarshal(v, &parsed); err != nil {
69+
return nil, fmt.Errorf("json_parse step %q: failed to parse JSON bytes from %q: %w", s.name, s.source, err)
70+
}
71+
default:
72+
// Value is already a structured type (map, slice, number, bool, nil).
73+
// Pass it through unchanged so that pipelines are idempotent when the
74+
// upstream step already returns a parsed value (e.g. after the db_query
75+
// fix lands, json_parse is a no-op for json/jsonb columns).
76+
parsed = raw
77+
}
78+
79+
return &StepResult{Output: map[string]any{
80+
s.target: parsed,
81+
}}, nil
82+
}

0 commit comments

Comments
 (0)