From 9fd2347de67099d7066ed1bfa9b8805a82408937 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 06:32:44 +0000 Subject: [PATCH 1/4] Initial plan From 2549b72efcfe9fbd4271082cdb712e3cad784c0b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 06:44:23 +0000 Subject: [PATCH 2/4] step.db_query_cached: add mode field for single/list result support Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/cache_redis.go | 2 +- module/http_server.go | 8 +- module/kafka_broker.go | 28 +- module/pipeline_step_db_query_cached.go | 41 ++- module/pipeline_step_db_query_cached_test.go | 282 +++++++++++++++++-- module/pipeline_step_db_query_test.go | 2 +- module/pipeline_step_sandbox_exec.go | 16 +- module/pipeline_step_token_revoke_test.go | 1 - schema/module_schema.go | 5 +- schema/schema.go | 2 +- schema/snippets_export.go | 18 +- 11 files changed, 340 insertions(+), 65 deletions(-) diff --git a/module/cache_redis.go b/module/cache_redis.go index 8d750d7e..31281490 100644 --- a/module/cache_redis.go +++ b/module/cache_redis.go @@ -31,7 +31,7 @@ type RedisClient interface { // RedisCacheConfig holds configuration for the cache.redis module. type RedisCacheConfig struct { Address string - Password string //nolint:gosec // G117: config struct field, not a hardcoded secret + Password string //nolint:gosec // G117: config struct field, not a hardcoded secret DB int Prefix string DefaultTTL time.Duration diff --git a/module/http_server.go b/module/http_server.go index f562becc..c58b045a 100644 --- a/module/http_server.go +++ b/module/http_server.go @@ -15,11 +15,11 @@ import ( // HTTPServerTLSConfig holds TLS configuration for the HTTP server. type HTTPServerTLSConfig struct { - Mode string `yaml:"mode" json:"mode"` // manual | autocert | disabled - Manual tlsutil.TLSConfig `yaml:"manual" json:"manual"` + Mode string `yaml:"mode" json:"mode"` // manual | autocert | disabled + Manual tlsutil.TLSConfig `yaml:"manual" json:"manual"` Autocert tlsutil.AutocertConfig `yaml:"autocert" json:"autocert"` - ClientCAFile string `yaml:"client_ca_file" json:"client_ca_file"` - ClientAuth string `yaml:"client_auth" json:"client_auth"` // require | request | none + ClientCAFile string `yaml:"client_ca_file" json:"client_ca_file"` + ClientAuth string `yaml:"client_auth" json:"client_auth"` // require | request | none } // StandardHTTPServer implements the HTTPServer interface and modular.Module interfaces diff --git a/module/kafka_broker.go b/module/kafka_broker.go index 4cdfd92e..f81ba345 100644 --- a/module/kafka_broker.go +++ b/module/kafka_broker.go @@ -7,8 +7,8 @@ import ( "sync" "github.com/CrisisTextLine/modular" - "github.com/IBM/sarama" "github.com/GoCodeAlone/workflow/pkg/tlsutil" + "github.com/IBM/sarama" ) // KafkaSASLConfig holds SASL authentication configuration for Kafka. @@ -26,19 +26,19 @@ type KafkaTLSConfig struct { // KafkaBroker implements the MessageBroker interface using Apache Kafka via Sarama. type KafkaBroker struct { - name string - brokers []string - groupID string - producer sarama.SyncProducer - consumerGroup sarama.ConsumerGroup - handlers map[string]MessageHandler - mu sync.RWMutex - kafkaProducer *kafkaProducerAdapter - kafkaConsumer *kafkaConsumerAdapter - cancelFunc context.CancelFunc - logger modular.Logger - healthy bool - healthMsg string + name string + brokers []string + groupID string + producer sarama.SyncProducer + consumerGroup sarama.ConsumerGroup + handlers map[string]MessageHandler + mu sync.RWMutex + kafkaProducer *kafkaProducerAdapter + kafkaConsumer *kafkaConsumerAdapter + cancelFunc context.CancelFunc + logger modular.Logger + healthy bool + healthMsg string encryptor *FieldEncryptor fieldProtector *ProtectedFieldManager tlsCfg KafkaTLSConfig diff --git a/module/pipeline_step_db_query_cached.go b/module/pipeline_step_db_query_cached.go index 2b40fe55..1896fde2 100644 --- a/module/pipeline_step_db_query_cached.go +++ b/module/pipeline_step_db_query_cached.go @@ -24,6 +24,7 @@ type DBQueryCachedStep struct { database string query string params []string + mode string // "single" or "list" cacheKey string cacheTTL time.Duration scanFields []string @@ -80,6 +81,14 @@ func NewDBQueryCachedStepFactory() StepFactory { } } + mode, _ := config["mode"].(string) + if mode == "" { + mode = "single" + } + if mode != "list" && mode != "single" { + return nil, fmt.Errorf("db_query_cached step %q: mode must be 'list' or 'single', got %q", name, mode) + } + var scanFields []string if sf, ok := config["scan_fields"]; ok { if list, ok := sf.([]any); ok { @@ -96,6 +105,7 @@ func NewDBQueryCachedStepFactory() StepFactory { database: database, query: query, params: params, + mode: mode, cacheKey: cacheKey, cacheTTL: cacheTTL, scanFields: scanFields, @@ -219,7 +229,7 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) ( fieldSet[f] = true } - output := make(map[string]any) + var results []map[string]any for rows.Next() { values := make([]any, len(columns)) valuePtrs := make([]any, len(columns)) @@ -231,25 +241,46 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) ( return nil, fmt.Errorf("db_query_cached step %q: scan failed: %w", s.name, err) } + row := make(map[string]any, len(columns)) for i, col := range columns { if len(fieldSet) > 0 && !fieldSet[col] { continue } val := values[i] if b, ok := val.([]byte); ok { - output[col] = string(b) + row[col] = string(b) } else { - output[col] = val + row[col] = val } } - // Only take the first row - break + results = append(results, row) + if s.mode == "single" { + // Only take the first row + break + } } if err := rows.Err(); err != nil { return nil, fmt.Errorf("db_query_cached step %q: row iteration error: %w", s.name, err) } + output := make(map[string]any) + if s.mode == "single" { + if len(results) > 0 { + output["row"] = results[0] + output["found"] = true + } else { + output["row"] = map[string]any{} + output["found"] = false + } + } else { + if results == nil { + results = []map[string]any{} + } + output["rows"] = results + output["count"] = len(results) + } + return output, nil } diff --git a/module/pipeline_step_db_query_cached_test.go b/module/pipeline_step_db_query_cached_test.go index ab21aebf..3aa465fd 100644 --- a/module/pipeline_step_db_query_cached_test.go +++ b/module/pipeline_step_db_query_cached_test.go @@ -31,8 +31,12 @@ func TestDBQueryCachedStep_CacheMiss(t *testing.T) { if result.Output["cache_hit"] != false { t.Errorf("expected cache_hit=false on first call, got %v", result.Output["cache_hit"]) } - if result.Output["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp', got %v", result.Output["name"]) + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output") + } + if row["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp', got %v", row["name"]) } } @@ -69,8 +73,12 @@ func TestDBQueryCachedStep_CacheHit(t *testing.T) { if result.Output["cache_hit"] != true { t.Errorf("expected cache_hit=true on second call, got %v", result.Output["cache_hit"]) } - if result.Output["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp' on cache hit, got %v", result.Output["name"]) + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output on cache hit") + } + if row["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' on cache hit, got %v", row["name"]) } } @@ -136,15 +144,19 @@ func TestDBQueryCachedStep_ScanFields(t *testing.T) { t.Fatalf("execute error: %v", err) } - if result.Output["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp', got %v", result.Output["name"]) + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output") + } + if row["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp', got %v", row["name"]) } // id and slug should not be in output since scan_fields only has "name" - if _, ok := result.Output["id"]; ok { - t.Errorf("expected 'id' to be excluded from output when not in scan_fields") + if _, ok := row["id"]; ok { + t.Errorf("expected 'id' to be excluded from row when not in scan_fields") } - if _, ok := result.Output["slug"]; ok { - t.Errorf("expected 'slug' to be excluded from output when not in scan_fields") + if _, ok := row["slug"]; ok { + t.Errorf("expected 'slug' to be excluded from row when not in scan_fields") } } @@ -173,8 +185,12 @@ func TestDBQueryCachedStep_TemplateParams(t *testing.T) { t.Fatalf("execute error: %v", err) } - if result.Output["name"] != "Beta Inc" { - t.Errorf("expected name='Beta Inc', got %v", result.Output["name"]) + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output") + } + if row["name"] != "Beta Inc" { + t.Errorf("expected name='Beta Inc', got %v", row["name"]) } } @@ -292,12 +308,20 @@ func TestDBQueryCachedStep_NoRows(t *testing.T) { t.Fatalf("execute error: %v", err) } - // No rows means empty output (no id/name keys), cache_hit=false + // No rows in single mode means row={}, found=false, cache_hit=false if result.Output["cache_hit"] != false { t.Errorf("expected cache_hit=false, got %v", result.Output["cache_hit"]) } - if _, ok := result.Output["id"]; ok { - t.Errorf("expected no 'id' field when no rows returned") + found, _ := result.Output["found"].(bool) + if found { + t.Errorf("expected found=false when no rows returned") + } + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output even when no rows found") + } + if len(row) != 0 { + t.Errorf("expected empty row map, got %v", row) } } @@ -325,8 +349,12 @@ func TestDBQueryCachedStep_PostgresPlaceholderNormalization(t *testing.T) { t.Fatalf("execute error (placeholder normalization may have failed): %v", err) } - if result.Output["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp' after $1→? normalization, got %v", result.Output["name"]) + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output after $1→? normalization") + } + if row["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' after $1→? normalization, got %v", row["name"]) } } @@ -373,8 +401,12 @@ func TestDBQueryCachedStep_CacheHitSkipsDB(t *testing.T) { if second.Output["cache_hit"] != true { t.Errorf("expected cache_hit=true on second call after DB closed") } - if second.Output["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp' from cache, got %v", second.Output["name"]) + secondRow, ok := second.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in cached output") + } + if secondRow["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' from cache, got %v", secondRow["name"]) } } @@ -405,3 +437,215 @@ func TestDBQueryCachedStep_NegativeTTLRejected(t *testing.T) { t.Fatal("expected error for negative cache_ttl") } } + +// TestDBQueryCachedStep_InvalidModeRejected verifies that an unknown mode is rejected. +func TestDBQueryCachedStep_InvalidModeRejected(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + _, err := factory("bad", map[string]any{ + "database": "db", + "query": "SELECT 1", + "cache_key": "k", + "mode": "bulk", + }, nil) + if err == nil { + t.Fatal("expected error for invalid mode") + } +} + +// TestDBQueryCachedStep_ListMode verifies that mode: list returns rows/count format. +func TestDBQueryCachedStep_ListMode(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("list-companies", map[string]any{ + "database": "test-db", + "query": "SELECT id, name, slug FROM companies WHERE parent_id IS NULL ORDER BY name", + "mode": "list", + "cache_key": "companies:list", + "cache_ttl": "5m", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false on first call, got %v", result.Output["cache_hit"]) + } + rows, ok := result.Output["rows"].([]map[string]any) + if !ok { + t.Fatal("expected rows in output for list mode") + } + count, ok := result.Output["count"].(int) + if !ok { + t.Fatal("expected count in output for list mode") + } + if count != 2 { + t.Errorf("expected count=2, got %d", count) + } + if len(rows) != 2 { + t.Errorf("expected 2 rows, got %d", len(rows)) + } + if rows[0]["name"] != "Acme Corp" { + t.Errorf("expected first row name='Acme Corp', got %v", rows[0]["name"]) + } +} + +// TestDBQueryCachedStep_ListModeCacheHit verifies that list mode results are cached and returned correctly. +func TestDBQueryCachedStep_ListModeCacheHit(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("list-companies", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE parent_id IS NULL ORDER BY name", + "mode": "list", + "cache_key": "companies:list", + "cache_ttl": "5m", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + + // First call — cache miss + first, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("first execute error: %v", err) + } + if first.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false on first call") + } + + // Second call — cache hit + second, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("second execute error: %v", err) + } + if second.Output["cache_hit"] != true { + t.Errorf("expected cache_hit=true on second call, got %v", second.Output["cache_hit"]) + } + rows, ok := second.Output["rows"].([]map[string]any) + if !ok { + t.Fatal("expected rows in cached output") + } + if len(rows) != 2 { + t.Errorf("expected 2 rows from cache, got %d", len(rows)) + } +} + +// TestDBQueryCachedStep_ListModeEmpty verifies that list mode returns an empty rows slice when no rows match. +func TestDBQueryCachedStep_ListModeEmpty(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("list-empty", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"nonexistent"}, + "mode": "list", + "cache_key": "companies:empty", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + rows, ok := result.Output["rows"].([]map[string]any) + if !ok { + t.Fatal("expected rows in output for list mode even when empty") + } + if len(rows) != 0 { + t.Errorf("expected 0 rows, got %d", len(rows)) + } + count, _ := result.Output["count"].(int) + if count != 0 { + t.Errorf("expected count=0, got %d", count) + } +} + +// TestDBQueryCachedStep_SingleModeFound verifies that mode: single returns row/found format when a row is found. +func TestDBQueryCachedStep_SingleModeFound(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("get-company", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"c1"}, + "mode": "single", + "cache_key": "company:c1", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + found, _ := result.Output["found"].(bool) + if !found { + t.Error("expected found=true") + } + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output") + } + if row["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp', got %v", row["name"]) + } +} + +// TestDBQueryCachedStep_SingleModeNotFound verifies that mode: single returns row={}/found=false when no row matches. +func TestDBQueryCachedStep_SingleModeNotFound(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("get-missing", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"nonexistent"}, + "mode": "single", + "cache_key": "company:nonexistent", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + found, _ := result.Output["found"].(bool) + if found { + t.Error("expected found=false") + } + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output even when not found") + } + if len(row) != 0 { + t.Errorf("expected empty row map, got %v", row) + } +} diff --git a/module/pipeline_step_db_query_test.go b/module/pipeline_step_db_query_test.go index acef121d..c3d19825 100644 --- a/module/pipeline_step_db_query_test.go +++ b/module/pipeline_step_db_query_test.go @@ -21,7 +21,7 @@ type testDBDriverProvider struct { driver string } -func (p *testDBDriverProvider) DB() *sql.DB { return p.db } +func (p *testDBDriverProvider) DB() *sql.DB { return p.db } func (p *testDBDriverProvider) DriverName() string { return p.driver } // mockAppWithDBDriver creates a MockApplication with a named database that reports its driver diff --git a/module/pipeline_step_sandbox_exec.go b/module/pipeline_step_sandbox_exec.go index d63a4a88..5011e01b 100644 --- a/module/pipeline_step_sandbox_exec.go +++ b/module/pipeline_step_sandbox_exec.go @@ -163,15 +163,15 @@ func (s *SandboxExecStep) buildSandboxConfig() sandbox.SandboxConfig { } case "standard": cfg = sandbox.SandboxConfig{ - Image: s.image, - MemoryLimit: 256 * 1024 * 1024, - CPULimit: 0.5, - NetworkMode: "bridge", - CapDrop: []string{"NET_ADMIN", "SYS_ADMIN", "SYS_PTRACE", "SETUID", "SETGID"}, - CapAdd: []string{"NET_BIND_SERVICE"}, + Image: s.image, + MemoryLimit: 256 * 1024 * 1024, + CPULimit: 0.5, + NetworkMode: "bridge", + CapDrop: []string{"NET_ADMIN", "SYS_ADMIN", "SYS_PTRACE", "SETUID", "SETGID"}, + CapAdd: []string{"NET_BIND_SERVICE"}, NoNewPrivileges: true, - PidsLimit: 64, - Timeout: 5 * time.Minute, + PidsLimit: 64, + Timeout: 5 * time.Minute, } default: // "strict" cfg = sandbox.DefaultSecureSandboxConfig(s.image) diff --git a/module/pipeline_step_token_revoke_test.go b/module/pipeline_step_token_revoke_test.go index b3b79a84..d83a8ea3 100644 --- a/module/pipeline_step_token_revoke_test.go +++ b/module/pipeline_step_token_revoke_test.go @@ -296,4 +296,3 @@ var _ TokenBlacklist = (*TokenBlacklistModule)(nil) // Compile-time check: mockBlacklist satisfies TokenBlacklist. var _ TokenBlacklist = (*mockBlacklist)(nil) - diff --git a/schema/module_schema.go b/schema/module_schema.go index 83c670e3..4322d9e4 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1011,14 +1011,15 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Category: "pipeline", Description: "Executes a parameterized SQL SELECT query and caches the result in-process with TTL. On subsequent calls the cached value is returned until the TTL expires.", Inputs: []ServiceIODef{{Name: "context", Type: "PipelineContext", Description: "Pipeline context for template parameter and cache key resolution"}}, - Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Query result fields as top-level keys plus cache_hit boolean"}}, + Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Query results as rows/count (list mode) or row/found (single mode), plus cache_hit boolean"}}, ConfigFields: []ConfigFieldDef{ {Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of the database service (must implement DBProvider)", Placeholder: "db", InheritFrom: "dependency.name"}, {Key: "query", Label: "SQL Query", Type: FieldTypeSQL, Required: true, Description: "Parameterized SQL SELECT query using $N placeholders (e.g. $1, $2); automatically converted to ? for SQLite drivers. No template expressions allowed.", Placeholder: "SELECT backend_url, settings FROM routing_config WHERE tenant_id = $1 LIMIT 1"}, {Key: "params", Label: "Parameters", Type: FieldTypeArray, ArrayItemType: "string", Description: "Template-resolved parameter values for query placeholders"}, + {Key: "mode", Label: "Mode", Type: FieldTypeSelect, Options: []string{"single", "list"}, DefaultValue: "single", Description: "Result mode: 'single' returns row/found, 'list' returns rows/count"}, {Key: "cache_key", Label: "Cache Key", Type: FieldTypeString, Required: true, Description: "Template-resolved key used to store/retrieve the cached result", Placeholder: "tenant_config:{{.steps.parse.headers.X-Tenant-Id}}"}, {Key: "cache_ttl", Label: "Cache TTL", Type: FieldTypeString, DefaultValue: "5m", Description: "Duration string for how long to cache the result (e.g. '5m', '30s', '1h')", Placeholder: "5m"}, - {Key: "scan_fields", Label: "Scan Fields", Type: FieldTypeArray, ArrayItemType: "string", Description: "Column names to include in the output map (omit to include all columns)"}, + {Key: "scan_fields", Label: "Scan Fields", Type: FieldTypeArray, ArrayItemType: "string", Description: "Column names to include in the output (omit to include all columns)"}, }, }) diff --git a/schema/schema.go b/schema/schema.go index 81d70764..05ea7e51 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -612,7 +612,7 @@ func GenerateWorkflowSchema() *Schema { Type: "object", Description: "Workflow handler configurations keyed by workflow type (e.g. http, messaging, statemachine, scheduler, integration)", }, - "triggers": triggerSchema, + "triggers": triggerSchema, "pipelines": buildPipelinesSchema(pipelineSchema), "imports": { Type: "array", diff --git a/schema/snippets_export.go b/schema/snippets_export.go index bfd499ec..99ac0b9d 100644 --- a/schema/snippets_export.go +++ b/schema/snippets_export.go @@ -31,19 +31,19 @@ type vscodeSnippet struct { // jetbrainsTemplateSet is the root XML element for JetBrains live templates. type jetbrainsTemplateSet struct { - XMLName xml.Name `xml:"templateSet"` - Group string `xml:"group,attr"` + XMLName xml.Name `xml:"templateSet"` + Group string `xml:"group,attr"` Templates []jetbrainsTemplate `xml:"template"` } type jetbrainsTemplate struct { - Name string `xml:"name,attr"` - Value string `xml:"value,attr"` - Description string `xml:"description,attr"` - ToReformat bool `xml:"toReformat,attr"` - ToShortenFQ bool `xml:"toShortenFQNames,attr"` - Variables []jetbrainsVariable `xml:"variable,omitempty"` - Contexts []jetbrainsContext `xml:"context"` + Name string `xml:"name,attr"` + Value string `xml:"value,attr"` + Description string `xml:"description,attr"` + ToReformat bool `xml:"toReformat,attr"` + ToShortenFQ bool `xml:"toShortenFQNames,attr"` + Variables []jetbrainsVariable `xml:"variable,omitempty"` + Contexts []jetbrainsContext `xml:"context"` } type jetbrainsVariable struct { From 31e339ac105ea3bff5a73c47c6b035d77d97450b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 07:03:53 +0000 Subject: [PATCH 3/4] resolve merge conflicts: integrate allowDynamicSQL from main with mode support Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/pipeline_step_db_dynamic.go | 76 ++++++++++++++++++++ module/pipeline_step_db_query_cached.go | 69 +++++++++++------- module/pipeline_step_db_query_cached_test.go | 70 ++++++++++++++++++ schema/module_schema.go | 3 +- 4 files changed, 191 insertions(+), 27 deletions(-) create mode 100644 module/pipeline_step_db_dynamic.go diff --git a/module/pipeline_step_db_dynamic.go b/module/pipeline_step_db_dynamic.go new file mode 100644 index 00000000..0175022c --- /dev/null +++ b/module/pipeline_step_db_dynamic.go @@ -0,0 +1,76 @@ +package module + +import ( + "fmt" + "strings" +) + +// validateSQLIdentifier checks that s is safe to interpolate directly into SQL as an +// identifier (e.g. a table name). Only ASCII letters (A-Z, a-z), ASCII digits (0-9), +// underscores (_) and hyphens (-) are permitted. This strict allowlist prevents SQL +// injection when dynamic values are embedded in queries via allow_dynamic_sql. +func validateSQLIdentifier(s string) error { + if s == "" { + return fmt.Errorf("dynamic SQL identifier must not be empty") + } + for _, c := range s { + if (c < 'a' || c > 'z') && + (c < 'A' || c > 'Z') && + (c < '0' || c > '9') && + c != '_' && c != '-' { + return fmt.Errorf("dynamic SQL identifier %q contains unsafe character %q (only ASCII letters, digits, underscores and hyphens are allowed)", s, string(c)) + } + } + return nil +} + +// resolveDynamicSQL resolves every {{ }} template expression found in query against +// pc and validates that each resolved value is a safe SQL identifier. The validated +// values are substituted back into the query in left-to-right order and the final +// SQL string is returned. +// +// Each occurrence of a template expression is resolved independently, so +// non-deterministic functions like {{uuid}} or {{now}} produce a distinct value +// per occurrence. +// +// This is only called when allow_dynamic_sql is true (explicit opt-in). Callers +// are responsible for ensuring that the query has already passed template parsing. +func resolveDynamicSQL(tmpl *TemplateEngine, query string, pc *PipelineContext) (string, error) { + if !strings.Contains(query, "{{") { + return query, nil + } + + // Process template expressions in left-to-right order. Each occurrence is + // resolved and validated independently to preserve correct semantics for + // non-deterministic template functions (e.g. {{uuid}}, {{now}}). + var result strings.Builder + rest := query + for { + openIdx := strings.Index(rest, "{{") + if openIdx < 0 { + result.WriteString(rest) + break + } + closeIdx := strings.Index(rest[openIdx:], "}}") + if closeIdx < 0 { + return "", fmt.Errorf("dynamic SQL: unclosed template action in query (missing closing '}}')") + } + closeIdx += openIdx + + // Write the literal SQL text before this expression. + result.WriteString(rest[:openIdx]) + + expr := rest[openIdx : closeIdx+2] + + resolved, err := tmpl.Resolve(expr, pc) + if err != nil { + return "", fmt.Errorf("dynamic SQL: failed to resolve %q: %w", expr, err) + } + if err := validateSQLIdentifier(resolved); err != nil { + return "", fmt.Errorf("dynamic SQL: %w", err) + } + result.WriteString(resolved) + rest = rest[closeIdx+2:] + } + return result.String(), nil +} diff --git a/module/pipeline_step_db_query_cached.go b/module/pipeline_step_db_query_cached.go index 1896fde2..cb73eaa5 100644 --- a/module/pipeline_step_db_query_cached.go +++ b/module/pipeline_step_db_query_cached.go @@ -20,16 +20,17 @@ type dbQueryCacheEntry struct { // in an in-process, TTL-aware cache keyed by a template-resolved cache key. // Concurrent pipeline executions are safe: access is protected by a read-write mutex. type DBQueryCachedStep struct { - name string - database string - query string - params []string - mode string // "single" or "list" - cacheKey string - cacheTTL time.Duration - scanFields []string - app modular.Application - tmpl *TemplateEngine + name string + database string + query string + params []string + mode string // "single" or "list" + cacheKey string + cacheTTL time.Duration + scanFields []string + allowDynamicSQL bool + app modular.Application + tmpl *TemplateEngine mu sync.RWMutex cache map[string]dbQueryCacheEntry @@ -48,8 +49,10 @@ func NewDBQueryCachedStepFactory() StepFactory { return nil, fmt.Errorf("db_query_cached step %q: 'query' is required", name) } - // Safety: reject template expressions in SQL to prevent injection - if strings.Contains(query, "{{") { + // Safety: reject template expressions in SQL to prevent injection, + // unless allow_dynamic_sql is explicitly enabled. + allowDynamicSQL, _ := config["allow_dynamic_sql"].(bool) + if !allowDynamicSQL && strings.Contains(query, "{{") { return nil, fmt.Errorf("db_query_cached step %q: query must not contain template expressions (use params instead)", name) } @@ -101,17 +104,18 @@ func NewDBQueryCachedStepFactory() StepFactory { } return &DBQueryCachedStep{ - name: name, - database: database, - query: query, - params: params, - mode: mode, - cacheKey: cacheKey, - cacheTTL: cacheTTL, - scanFields: scanFields, - app: app, - tmpl: NewTemplateEngine(), - cache: make(map[string]dbQueryCacheEntry), + name: name, + database: database, + query: query, + params: params, + mode: mode, + cacheKey: cacheKey, + cacheTTL: cacheTTL, + scanFields: scanFields, + allowDynamicSQL: allowDynamicSQL, + app: app, + tmpl: NewTemplateEngine(), + cache: make(map[string]dbQueryCacheEntry), }, nil } } @@ -122,6 +126,18 @@ func (s *DBQueryCachedStep) Name() string { return s.name } // Execute checks the in-memory cache first; on a miss (or expiry) it queries // the database, stores the result, and returns it. func (s *DBQueryCachedStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + // Resolve template expressions in the query early (before any DB access) when + // dynamic SQL is enabled. This validates resolved identifiers against an + // allowlist before any database interaction. + query := s.query + if s.allowDynamicSQL { + var err error + query, err = resolveDynamicSQL(s.tmpl, query, pc) + if err != nil { + return nil, fmt.Errorf("db_query_cached step %q: %w", s.name, err) + } + } + if s.app == nil { return nil, fmt.Errorf("db_query_cached step %q: no application context", s.name) } @@ -161,7 +177,7 @@ func (s *DBQueryCachedStep) Execute(ctx context.Context, pc *PipelineContext) (* s.mu.Unlock() // Query the database - result, err := s.runQuery(ctx, pc) + result, err := s.runQuery(ctx, pc, query) if err != nil { return nil, err } @@ -179,7 +195,8 @@ func (s *DBQueryCachedStep) Execute(ctx context.Context, pc *PipelineContext) (* } // runQuery executes the SQL query and returns the result as a map. -func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) (map[string]any, error) { +// query is the (already dynamic-SQL-resolved) query string to execute. +func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext, query string) (map[string]any, error) { svc, ok := s.app.SvcRegistry()[s.database] if !ok { return nil, fmt.Errorf("db_query_cached step %q: database service %q not found", s.name, s.database) @@ -210,7 +227,7 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) ( resolvedParams[i] = resolved } - query := normalizePlaceholders(s.query, driver) + query = normalizePlaceholders(query, driver) rows, err := db.QueryContext(ctx, query, resolvedParams...) if err != nil { diff --git a/module/pipeline_step_db_query_cached_test.go b/module/pipeline_step_db_query_cached_test.go index 3aa465fd..5b554024 100644 --- a/module/pipeline_step_db_query_cached_test.go +++ b/module/pipeline_step_db_query_cached_test.go @@ -2,6 +2,7 @@ package module import ( "context" + "strings" "testing" "time" ) @@ -649,3 +650,72 @@ func TestDBQueryCachedStep_SingleModeNotFound(t *testing.T) { t.Errorf("expected empty row map, got %v", row) } } + +func TestDBQueryCachedStep_DynamicTableName(t *testing.T) { + db := setupTestDB(t) + _, err := db.Exec(`CREATE TABLE companies_beta (id TEXT PRIMARY KEY, name TEXT NOT NULL)`) + if err != nil { + t.Fatalf("create table: %v", err) + } + _, err = db.Exec(`INSERT INTO companies_beta (id, name) VALUES ('b1', 'Beta LLC')`) + if err != nil { + t.Fatalf("insert: %v", err) + } + + app := mockAppWithDB("test-db", db) + factory := NewDBQueryCachedStepFactory() + step, err := factory("dynamic-cached", map[string]any{ + "database": "test-db", + "query": `SELECT id, name FROM companies_{{.steps.auth.tenant}} WHERE id = $1`, + "params": []any{"b1"}, + "cache_key": `tenant:{{.steps.auth.tenant}}:b1`, + "cache_ttl": "5m", + "allow_dynamic_sql": true, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("auth", map[string]any{"tenant": "beta"}) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false on first call, got %v", result.Output["cache_hit"]) + } + row, ok := result.Output["row"].(map[string]any) + if !ok { + t.Fatal("expected row in output") + } + if row["name"] != "Beta LLC" { + t.Errorf("expected name='Beta LLC', got %v", row["name"]) + } +} + +func TestDBQueryCachedStep_DynamicSQL_RejectsInjection(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + step, err := factory("injection-cached", map[string]any{ + "database": "test-db", + "query": `SELECT * FROM companies_{{.steps.auth.tenant}}`, + "cache_key": "k", + "allow_dynamic_sql": true, + }, nil) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("auth", map[string]any{"tenant": "alpha'; DROP TABLE companies;--"}) + + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error for unsafe SQL identifier") + } + if !strings.Contains(err.Error(), "unsafe character") { + t.Errorf("expected 'unsafe character' in error, got: %v", err) + } +} diff --git a/schema/module_schema.go b/schema/module_schema.go index 4322d9e4..6a60e106 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1014,12 +1014,13 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Query results as rows/count (list mode) or row/found (single mode), plus cache_hit boolean"}}, ConfigFields: []ConfigFieldDef{ {Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of the database service (must implement DBProvider)", Placeholder: "db", InheritFrom: "dependency.name"}, - {Key: "query", Label: "SQL Query", Type: FieldTypeSQL, Required: true, Description: "Parameterized SQL SELECT query using $N placeholders (e.g. $1, $2); automatically converted to ? for SQLite drivers. No template expressions allowed.", Placeholder: "SELECT backend_url, settings FROM routing_config WHERE tenant_id = $1 LIMIT 1"}, + {Key: "query", Label: "SQL Query", Type: FieldTypeSQL, Required: true, Description: "Parameterized SQL SELECT query using $N placeholders (e.g. $1, $2); automatically converted to ? for SQLite drivers. Template expressions are forbidden unless allow_dynamic_sql is true.", Placeholder: "SELECT backend_url, settings FROM routing_config WHERE tenant_id = $1 LIMIT 1"}, {Key: "params", Label: "Parameters", Type: FieldTypeArray, ArrayItemType: "string", Description: "Template-resolved parameter values for query placeholders"}, {Key: "mode", Label: "Mode", Type: FieldTypeSelect, Options: []string{"single", "list"}, DefaultValue: "single", Description: "Result mode: 'single' returns row/found, 'list' returns rows/count"}, {Key: "cache_key", Label: "Cache Key", Type: FieldTypeString, Required: true, Description: "Template-resolved key used to store/retrieve the cached result", Placeholder: "tenant_config:{{.steps.parse.headers.X-Tenant-Id}}"}, {Key: "cache_ttl", Label: "Cache TTL", Type: FieldTypeString, DefaultValue: "5m", Description: "Duration string for how long to cache the result (e.g. '5m', '30s', '1h')", Placeholder: "5m"}, {Key: "scan_fields", Label: "Scan Fields", Type: FieldTypeArray, ArrayItemType: "string", Description: "Column names to include in the output (omit to include all columns)"}, + {Key: "allow_dynamic_sql", Label: "Allow Dynamic SQL", Type: FieldTypeBool, DefaultValue: "false", Description: "When true, template expressions in 'query' are resolved at runtime. Each resolved value must contain only letters, digits, underscores and hyphens to prevent SQL injection."}, }, }) From 2efb1d17b5f613687e28f98ae095fbead8619c91 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 3 Mar 2026 08:26:44 +0000 Subject: [PATCH 4/4] address review feedback: deep copy, remove hyphens from SQL identifier allowlist, backward compat mode Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/pipeline_step_db_dynamic.go | 8 +- module/pipeline_step_db_exec_test.go | 12 +-- module/pipeline_step_db_query_cached.go | 57 +++++++++----- module/pipeline_step_db_query_cached_test.go | 78 ++++++-------------- module/pipeline_step_db_query_test.go | 26 +++---- schema/module_schema.go | 6 +- 6 files changed, 86 insertions(+), 101 deletions(-) diff --git a/module/pipeline_step_db_dynamic.go b/module/pipeline_step_db_dynamic.go index 0175022c..aa93495a 100644 --- a/module/pipeline_step_db_dynamic.go +++ b/module/pipeline_step_db_dynamic.go @@ -7,8 +7,8 @@ import ( // validateSQLIdentifier checks that s is safe to interpolate directly into SQL as an // identifier (e.g. a table name). Only ASCII letters (A-Z, a-z), ASCII digits (0-9), -// underscores (_) and hyphens (-) are permitted. This strict allowlist prevents SQL -// injection when dynamic values are embedded in queries via allow_dynamic_sql. +// and underscores (_) are permitted. This strict allowlist prevents SQL injection +// when dynamic values are embedded in queries via allow_dynamic_sql. func validateSQLIdentifier(s string) error { if s == "" { return fmt.Errorf("dynamic SQL identifier must not be empty") @@ -17,8 +17,8 @@ func validateSQLIdentifier(s string) error { if (c < 'a' || c > 'z') && (c < 'A' || c > 'Z') && (c < '0' || c > '9') && - c != '_' && c != '-' { - return fmt.Errorf("dynamic SQL identifier %q contains unsafe character %q (only ASCII letters, digits, underscores and hyphens are allowed)", s, string(c)) + c != '_' { + return fmt.Errorf("dynamic SQL identifier %q contains unsafe character %q (only ASCII letters, digits, and underscores are allowed)", s, string(c)) } } return nil diff --git a/module/pipeline_step_db_exec_test.go b/module/pipeline_step_db_exec_test.go index eb1ea246..77dc5483 100644 --- a/module/pipeline_step_db_exec_test.go +++ b/module/pipeline_step_db_exec_test.go @@ -211,9 +211,9 @@ func TestDBExecStep_DynamicTableName(t *testing.T) { app := mockAppWithDB("test-db", db) factory := NewDBExecStepFactory() step, err := factory("dynamic-insert", map[string]any{ - "database": "test-db", - "query": `INSERT INTO items_{{.steps.auth.tenant}} (id, name) VALUES (?, ?)`, - "params": []any{"i1", "Widget"}, + "database": "test-db", + "query": `INSERT INTO items_{{.steps.auth.tenant}} (id, name) VALUES (?, ?)`, + "params": []any{"i1", "Widget"}, "allow_dynamic_sql": true, }, app) if err != nil { @@ -237,9 +237,9 @@ func TestDBExecStep_DynamicTableName(t *testing.T) { func TestDBExecStep_DynamicSQL_RejectsInjection(t *testing.T) { factory := NewDBExecStepFactory() step, err := factory("injection-exec", map[string]any{ - "database": "test-db", - "query": `DELETE FROM items_{{.steps.auth.tenant}} WHERE id = ?`, - "params": []any{"i1"}, + "database": "test-db", + "query": `DELETE FROM items_{{.steps.auth.tenant}} WHERE id = ?`, + "params": []any{"i1"}, "allow_dynamic_sql": true, }, nil) if err != nil { diff --git a/module/pipeline_step_db_query_cached.go b/module/pipeline_step_db_query_cached.go index cb73eaa5..88385392 100644 --- a/module/pipeline_step_db_query_cached.go +++ b/module/pipeline_step_db_query_cached.go @@ -85,10 +85,11 @@ func NewDBQueryCachedStepFactory() StepFactory { } mode, _ := config["mode"].(string) - if mode == "" { - mode = "single" - } - if mode != "list" && mode != "single" { + mode = strings.TrimSpace(mode) + // Backwards compatibility: if mode is omitted or empty, leave it blank. + // Execution logic treats a blank mode as the legacy single-row flat + // output, while explicit "single" uses the new row/found envelope. + if mode != "" && mode != "list" && mode != "single" { return nil, fmt.Errorf("db_query_cached step %q: mode must be 'list' or 'single', got %q", name, mode) } @@ -155,7 +156,7 @@ func (s *DBQueryCachedStep) Execute(ctx context.Context, pc *PipelineContext) (* s.mu.RUnlock() if found && time.Now().Before(entry.expiresAt) { - output := copyMap(entry.value) + output := deepCopyMap(entry.value) output["cache_hit"] = true return &StepResult{Output: output}, nil } @@ -165,7 +166,7 @@ func (s *DBQueryCachedStep) Execute(ctx context.Context, pc *PipelineContext) (* entry, found = s.cache[key] if found && time.Now().Before(entry.expiresAt) { // Another goroutine populated the cache while we were waiting for the lock - output := copyMap(entry.value) + output := deepCopyMap(entry.value) s.mu.Unlock() output["cache_hit"] = true return &StepResult{Output: output}, nil @@ -185,7 +186,7 @@ func (s *DBQueryCachedStep) Execute(ctx context.Context, pc *PipelineContext) (* // Store in cache (write lock) s.mu.Lock() s.cache[key] = dbQueryCacheEntry{ - value: copyMap(result), + value: deepCopyMap(result), expiresAt: time.Now().Add(s.cacheTTL), } s.mu.Unlock() @@ -271,8 +272,8 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext, q } } results = append(results, row) - if s.mode == "single" { - // Only take the first row + if s.mode != "list" { + // Only take the first row for single and legacy modes break } } @@ -282,7 +283,14 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext, q } output := make(map[string]any) - if s.mode == "single" { + switch s.mode { + case "list": + if results == nil { + results = []map[string]any{} + } + output["rows"] = results + output["count"] = len(results) + case "single": if len(results) > 0 { output["row"] = results[0] output["found"] = true @@ -290,22 +298,35 @@ func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext, q output["row"] = map[string]any{} output["found"] = false } - } else { - if results == nil { - results = []map[string]any{} + default: // "" — legacy flat column map (backward compatible) + if len(results) > 0 { + for k, v := range results[0] { + output[k] = v + } } - output["rows"] = results - output["count"] = len(results) } return output, nil } -// copyMap creates a shallow copy of a map. -func copyMap(m map[string]any) map[string]any { +// deepCopyMap creates a deep copy of a map, recursively copying nested +// map[string]any values and []map[string]any slices to prevent callers from +// mutating cached data or triggering data races across goroutines. +func deepCopyMap(m map[string]any) map[string]any { cp := make(map[string]any, len(m)) for k, v := range m { - cp[k] = v + switch val := v.(type) { + case map[string]any: + cp[k] = deepCopyMap(val) + case []map[string]any: + sliceCopy := make([]map[string]any, len(val)) + for i, row := range val { + sliceCopy[i] = deepCopyMap(row) + } + cp[k] = sliceCopy + default: + cp[k] = v + } } return cp } diff --git a/module/pipeline_step_db_query_cached_test.go b/module/pipeline_step_db_query_cached_test.go index 5b554024..4dee217c 100644 --- a/module/pipeline_step_db_query_cached_test.go +++ b/module/pipeline_step_db_query_cached_test.go @@ -32,12 +32,8 @@ func TestDBQueryCachedStep_CacheMiss(t *testing.T) { if result.Output["cache_hit"] != false { t.Errorf("expected cache_hit=false on first call, got %v", result.Output["cache_hit"]) } - row, ok := result.Output["row"].(map[string]any) - if !ok { - t.Fatal("expected row in output") - } - if row["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp', got %v", row["name"]) + if result.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp', got %v", result.Output["name"]) } } @@ -74,12 +70,8 @@ func TestDBQueryCachedStep_CacheHit(t *testing.T) { if result.Output["cache_hit"] != true { t.Errorf("expected cache_hit=true on second call, got %v", result.Output["cache_hit"]) } - row, ok := result.Output["row"].(map[string]any) - if !ok { - t.Fatal("expected row in output on cache hit") - } - if row["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp' on cache hit, got %v", row["name"]) + if result.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' on cache hit, got %v", result.Output["name"]) } } @@ -145,19 +137,15 @@ func TestDBQueryCachedStep_ScanFields(t *testing.T) { t.Fatalf("execute error: %v", err) } - row, ok := result.Output["row"].(map[string]any) - if !ok { - t.Fatal("expected row in output") - } - if row["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp', got %v", row["name"]) + if result.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp', got %v", result.Output["name"]) } // id and slug should not be in output since scan_fields only has "name" - if _, ok := row["id"]; ok { - t.Errorf("expected 'id' to be excluded from row when not in scan_fields") + if _, ok := result.Output["id"]; ok { + t.Errorf("expected 'id' to be excluded from output when not in scan_fields") } - if _, ok := row["slug"]; ok { - t.Errorf("expected 'slug' to be excluded from row when not in scan_fields") + if _, ok := result.Output["slug"]; ok { + t.Errorf("expected 'slug' to be excluded from output when not in scan_fields") } } @@ -186,12 +174,8 @@ func TestDBQueryCachedStep_TemplateParams(t *testing.T) { t.Fatalf("execute error: %v", err) } - row, ok := result.Output["row"].(map[string]any) - if !ok { - t.Fatal("expected row in output") - } - if row["name"] != "Beta Inc" { - t.Errorf("expected name='Beta Inc', got %v", row["name"]) + if result.Output["name"] != "Beta Inc" { + t.Errorf("expected name='Beta Inc', got %v", result.Output["name"]) } } @@ -309,20 +293,12 @@ func TestDBQueryCachedStep_NoRows(t *testing.T) { t.Fatalf("execute error: %v", err) } - // No rows in single mode means row={}, found=false, cache_hit=false + // No rows in legacy mode means empty output (no column keys), cache_hit=false if result.Output["cache_hit"] != false { t.Errorf("expected cache_hit=false, got %v", result.Output["cache_hit"]) } - found, _ := result.Output["found"].(bool) - if found { - t.Errorf("expected found=false when no rows returned") - } - row, ok := result.Output["row"].(map[string]any) - if !ok { - t.Fatal("expected row in output even when no rows found") - } - if len(row) != 0 { - t.Errorf("expected empty row map, got %v", row) + if _, ok := result.Output["id"]; ok { + t.Errorf("expected no 'id' field when no rows returned") } } @@ -350,12 +326,8 @@ func TestDBQueryCachedStep_PostgresPlaceholderNormalization(t *testing.T) { t.Fatalf("execute error (placeholder normalization may have failed): %v", err) } - row, ok := result.Output["row"].(map[string]any) - if !ok { - t.Fatal("expected row in output after $1→? normalization") - } - if row["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp' after $1→? normalization, got %v", row["name"]) + if result.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' after $1→? normalization, got %v", result.Output["name"]) } } @@ -402,12 +374,8 @@ func TestDBQueryCachedStep_CacheHitSkipsDB(t *testing.T) { if second.Output["cache_hit"] != true { t.Errorf("expected cache_hit=true on second call after DB closed") } - secondRow, ok := second.Output["row"].(map[string]any) - if !ok { - t.Fatal("expected row in cached output") - } - if secondRow["name"] != "Acme Corp" { - t.Errorf("expected name='Acme Corp' from cache, got %v", secondRow["name"]) + if second.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' from cache, got %v", second.Output["name"]) } } @@ -687,12 +655,8 @@ func TestDBQueryCachedStep_DynamicTableName(t *testing.T) { if result.Output["cache_hit"] != false { t.Errorf("expected cache_hit=false on first call, got %v", result.Output["cache_hit"]) } - row, ok := result.Output["row"].(map[string]any) - if !ok { - t.Fatal("expected row in output") - } - if row["name"] != "Beta LLC" { - t.Errorf("expected name='Beta LLC', got %v", row["name"]) + if result.Output["name"] != "Beta LLC" { + t.Errorf("expected name='Beta LLC', got %v", result.Output["name"]) } } diff --git a/module/pipeline_step_db_query_test.go b/module/pipeline_step_db_query_test.go index 80fe3e4e..e704faf4 100644 --- a/module/pipeline_step_db_query_test.go +++ b/module/pipeline_step_db_query_test.go @@ -234,10 +234,10 @@ func TestDBQueryStep_DynamicTableName(t *testing.T) { app := mockAppWithDB("test-db", db) factory := NewDBQueryStepFactory() step, err := factory("dynamic-table", map[string]any{ - "database": "test-db", - "query": `SELECT id, name FROM companies_{{.steps.auth.tenant}} WHERE id = ?`, - "params": []any{"a1"}, - "mode": "single", + "database": "test-db", + "query": `SELECT id, name FROM companies_{{.steps.auth.tenant}} WHERE id = ?`, + "params": []any{"a1"}, + "mode": "single", "allow_dynamic_sql": true, }, app) if err != nil { @@ -265,9 +265,9 @@ func TestDBQueryStep_DynamicTableName(t *testing.T) { func TestDBQueryStep_DynamicSQL_RejectsInjection(t *testing.T) { factory := NewDBQueryStepFactory() step, err := factory("injection-attempt", map[string]any{ - "database": "test-db", - "query": `SELECT * FROM companies_{{.steps.auth.tenant}}`, - "mode": "list", + "database": "test-db", + "query": `SELECT * FROM companies_{{.steps.auth.tenant}}`, + "mode": "list", "allow_dynamic_sql": true, }, nil) // nil app is fine – we expect an error before the DB is touched if err != nil { @@ -289,9 +289,9 @@ func TestDBQueryStep_DynamicSQL_RejectsInjection(t *testing.T) { func TestDBQueryStep_DynamicSQL_RejectsEmpty(t *testing.T) { factory := NewDBQueryStepFactory() step, err := factory("empty-ident", map[string]any{ - "database": "test-db", - "query": `SELECT * FROM companies_{{.steps.auth.tenant}}`, - "mode": "list", + "database": "test-db", + "query": `SELECT * FROM companies_{{.steps.auth.tenant}}`, + "mode": "list", "allow_dynamic_sql": true, }, nil) if err != nil { @@ -319,9 +319,9 @@ func TestDBQueryStep_MissingDatabase(t *testing.T) { func TestDBQueryStep_DynamicSQL_UnclosedAction(t *testing.T) { factory := NewDBQueryStepFactory() step, err := factory("unclosed", map[string]any{ - "database": "test-db", - "query": `SELECT * FROM companies_{{.steps.auth.tenant`, - "mode": "list", + "database": "test-db", + "query": `SELECT * FROM companies_{{.steps.auth.tenant`, + "mode": "list", "allow_dynamic_sql": true, }, nil) if err != nil { diff --git a/schema/module_schema.go b/schema/module_schema.go index 35240ab9..91e3ab47 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1002,7 +1002,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { {Key: "query", Label: "SQL Query", Type: FieldTypeSQL, Required: true, Description: "Parameterized SQL SELECT query (use ? for placeholders). Template expressions are forbidden unless allow_dynamic_sql is true.", Placeholder: "SELECT id, name FROM companies WHERE id = ?"}, {Key: "params", Label: "Parameters", Type: FieldTypeArray, ArrayItemType: "string", Description: "Template-resolved parameter values for ? placeholders in query"}, {Key: "mode", Label: "Mode", Type: FieldTypeSelect, Options: []string{"list", "single"}, DefaultValue: "list", Description: "Result mode: 'list' returns rows/count, 'single' returns row/found"}, - {Key: "allow_dynamic_sql", Label: "Allow Dynamic SQL", Type: FieldTypeBool, DefaultValue: "false", Description: "When true, template expressions in 'query' are resolved at runtime. Each resolved value must contain only letters, digits, underscores and hyphens to prevent SQL injection."}, + {Key: "allow_dynamic_sql", Label: "Allow Dynamic SQL", Type: FieldTypeBool, DefaultValue: "false", Description: "When true, template expressions in 'query' are resolved at runtime. Each resolved value must contain only letters, digits, and underscores to prevent SQL injection."}, }, }) @@ -1021,7 +1021,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { {Key: "cache_key", Label: "Cache Key", Type: FieldTypeString, Required: true, Description: "Template-resolved key used to store/retrieve the cached result", Placeholder: "tenant_config:{{.steps.parse.headers.X-Tenant-Id}}"}, {Key: "cache_ttl", Label: "Cache TTL", Type: FieldTypeString, DefaultValue: "5m", Description: "Duration string for how long to cache the result (e.g. '5m', '30s', '1h')", Placeholder: "5m"}, {Key: "scan_fields", Label: "Scan Fields", Type: FieldTypeArray, ArrayItemType: "string", Description: "Column names to include in the output (omit to include all columns)"}, - {Key: "allow_dynamic_sql", Label: "Allow Dynamic SQL", Type: FieldTypeBool, DefaultValue: "false", Description: "When true, template expressions in 'query' are resolved at runtime. Each resolved value must contain only letters, digits, underscores and hyphens to prevent SQL injection."}, + {Key: "allow_dynamic_sql", Label: "Allow Dynamic SQL", Type: FieldTypeBool, DefaultValue: "false", Description: "When true, template expressions in 'query' are resolved at runtime. Each resolved value must contain only letters, digits, and underscores to prevent SQL injection."}, }, }) @@ -1036,7 +1036,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { {Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of the database service (must implement DBProvider)", Placeholder: "admin-db", InheritFrom: "dependency.name"}, {Key: "query", Label: "SQL Statement", Type: FieldTypeSQL, Required: true, Description: "Parameterized SQL INSERT/UPDATE/DELETE statement (use ? for placeholders). Template expressions are forbidden unless allow_dynamic_sql is true.", Placeholder: "INSERT INTO companies (id, name) VALUES (?, ?)"}, {Key: "params", Label: "Parameters", Type: FieldTypeArray, ArrayItemType: "string", Description: "Template-resolved parameter values for ? placeholders"}, - {Key: "allow_dynamic_sql", Label: "Allow Dynamic SQL", Type: FieldTypeBool, DefaultValue: "false", Description: "When true, template expressions in 'query' are resolved at runtime. Each resolved value must contain only letters, digits, underscores and hyphens to prevent SQL injection."}, + {Key: "allow_dynamic_sql", Label: "Allow Dynamic SQL", Type: FieldTypeBool, DefaultValue: "false", Description: "When true, template expressions in 'query' are resolved at runtime. Each resolved value must contain only letters, digits, and underscores to prevent SQL injection."}, }, })