From c63753e9350cfefb69e34365fc0151ad19339829 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 2 Mar 2026 20:58:01 +0000 Subject: [PATCH 1/4] Initial plan From da6381616e95eca68aedbfb481bd05bbd169088b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 2 Mar 2026 21:10:15 +0000 Subject: [PATCH 2/4] feat: add step.db_query_cached pipeline step with in-process TTL caching Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- cmd/wfctl/type_registry.go | 5 + module/pipeline_step_db_query_cached.go | 244 +++++++++++++++ module/pipeline_step_db_query_cached_test.go | 302 +++++++++++++++++++ plugins/pipelinesteps/plugin.go | 4 +- plugins/pipelinesteps/plugin_test.go | 1 + schema/module_schema.go | 17 ++ schema/schema.go | 1 + 7 files changed, 573 insertions(+), 1 deletion(-) create mode 100644 module/pipeline_step_db_query_cached.go create mode 100644 module/pipeline_step_db_query_cached_test.go diff --git a/cmd/wfctl/type_registry.go b/cmd/wfctl/type_registry.go index 31ae0e99..b8df25df 100644 --- a/cmd/wfctl/type_registry.go +++ b/cmd/wfctl/type_registry.go @@ -586,6 +586,11 @@ func KnownStepTypes() map[string]StepTypeInfo { Plugin: "pipelinesteps", ConfigKeys: []string{"database", "query", "params"}, }, + "step.db_query_cached": { + Type: "step.db_query_cached", + Plugin: "pipelinesteps", + ConfigKeys: []string{"database", "query", "params", "cache_key", "cache_ttl", "scan_fields"}, + }, "step.json_response": { Type: "step.json_response", Plugin: "pipelinesteps", diff --git a/module/pipeline_step_db_query_cached.go b/module/pipeline_step_db_query_cached.go new file mode 100644 index 00000000..a7e5b3d8 --- /dev/null +++ b/module/pipeline_step_db_query_cached.go @@ -0,0 +1,244 @@ +package module + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/CrisisTextLine/modular" +) + +// dbQueryCacheEntry holds a cached query result with its expiry time. +type dbQueryCacheEntry struct { + value map[string]any + expiresAt time.Time +} + +// DBQueryCachedStep executes a parameterized SQL SELECT and caches the result +// in an in-process, TTL-aware cache keyed by a template-resolved cache key. +// Concurrent pipeline executions are safe: access is protected by a read-write mutex. +type DBQueryCachedStep struct { + name string + database string + query string + params []string + cacheKey string + cacheTTL time.Duration + scanFields []string + app modular.Application + tmpl *TemplateEngine + + mu sync.RWMutex + cache map[string]dbQueryCacheEntry +} + +// NewDBQueryCachedStepFactory returns a StepFactory that creates DBQueryCachedStep instances. +func NewDBQueryCachedStepFactory() StepFactory { + return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) { + database, _ := config["database"].(string) + if database == "" { + return nil, fmt.Errorf("db_query_cached step %q: 'database' is required", name) + } + + query, _ := config["query"].(string) + if query == "" { + return nil, fmt.Errorf("db_query_cached step %q: 'query' is required", name) + } + + // Safety: reject template expressions in SQL to prevent injection + if strings.Contains(query, "{{") { + return nil, fmt.Errorf("db_query_cached step %q: query must not contain template expressions (use params instead)", name) + } + + cacheKey, _ := config["cache_key"].(string) + if cacheKey == "" { + return nil, fmt.Errorf("db_query_cached step %q: 'cache_key' is required", name) + } + + cacheTTL := 5 * time.Minute + if ttlStr, ok := config["cache_ttl"].(string); ok && ttlStr != "" { + parsed, err := time.ParseDuration(ttlStr) + if err != nil { + return nil, fmt.Errorf("db_query_cached step %q: invalid 'cache_ttl' %q: %w", name, ttlStr, err) + } + cacheTTL = parsed + } + + var params []string + if p, ok := config["params"]; ok { + if list, ok := p.([]any); ok { + for _, item := range list { + if s, ok := item.(string); ok { + params = append(params, s) + } + } + } + } + + var scanFields []string + if sf, ok := config["scan_fields"]; ok { + if list, ok := sf.([]any); ok { + for _, item := range list { + if s, ok := item.(string); ok { + scanFields = append(scanFields, s) + } + } + } + } + + return &DBQueryCachedStep{ + name: name, + database: database, + query: query, + params: params, + cacheKey: cacheKey, + cacheTTL: cacheTTL, + scanFields: scanFields, + app: app, + tmpl: NewTemplateEngine(), + cache: make(map[string]dbQueryCacheEntry), + }, nil + } +} + +// Name returns the step name. +func (s *DBQueryCachedStep) Name() string { return s.name } + +// Execute checks the in-memory cache first; on a miss (or expiry) it queries +// the database, stores the result, and returns it. +func (s *DBQueryCachedStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) { + if s.app == nil { + return nil, fmt.Errorf("db_query_cached step %q: no application context", s.name) + } + + // Resolve the cache key template + resolvedKey, err := s.tmpl.Resolve(s.cacheKey, pc) + if err != nil { + return nil, fmt.Errorf("db_query_cached step %q: failed to resolve cache_key template: %w", s.name, err) + } + key := fmt.Sprintf("%v", resolvedKey) + + // Check cache (read lock) + s.mu.RLock() + entry, found := s.cache[key] + s.mu.RUnlock() + + if found && time.Now().Before(entry.expiresAt) { + output := copyMap(entry.value) + output["cache_hit"] = true + return &StepResult{Output: output}, nil + } + + // Cache miss or expired — query the database + result, err := s.runQuery(ctx, pc) + if err != nil { + return nil, err + } + + // Store in cache (write lock) + s.mu.Lock() + s.cache[key] = dbQueryCacheEntry{ + value: copyMap(result), + expiresAt: time.Now().Add(s.cacheTTL), + } + s.mu.Unlock() + + result["cache_hit"] = false + return &StepResult{Output: result}, nil +} + +// runQuery executes the SQL query and returns the result as a map. +func (s *DBQueryCachedStep) runQuery(ctx context.Context, pc *PipelineContext) (map[string]any, error) { + svc, ok := s.app.SvcRegistry()[s.database] + if !ok { + return nil, fmt.Errorf("db_query_cached step %q: database service %q not found", s.name, s.database) + } + + provider, ok := svc.(DBProvider) + if !ok { + return nil, fmt.Errorf("db_query_cached step %q: service %q does not implement DBProvider", s.name, s.database) + } + + db := provider.DB() + if db == nil { + return nil, fmt.Errorf("db_query_cached step %q: database connection is nil", s.name) + } + + var driver string + if dp, ok := svc.(DBDriverProvider); ok { + driver = dp.DriverName() + } + + // Resolve template params + resolvedParams := make([]any, len(s.params)) + for i, p := range s.params { + resolved, err := s.tmpl.Resolve(p, pc) + if err != nil { + return nil, fmt.Errorf("db_query_cached step %q: failed to resolve param %d: %w", s.name, i, err) + } + resolvedParams[i] = resolved + } + + query := normalizePlaceholders(s.query, driver) + + rows, err := db.QueryContext(ctx, query, resolvedParams...) + if err != nil { + return nil, fmt.Errorf("db_query_cached step %q: query failed: %w", s.name, err) + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, fmt.Errorf("db_query_cached step %q: failed to get columns: %w", s.name, err) + } + + // If scan_fields are specified, only keep those columns + fieldSet := make(map[string]bool, len(s.scanFields)) + for _, f := range s.scanFields { + fieldSet[f] = true + } + + output := make(map[string]any) + for rows.Next() { + values := make([]any, len(columns)) + valuePtrs := make([]any, len(columns)) + for i := range values { + valuePtrs[i] = &values[i] + } + + if err := rows.Scan(valuePtrs...); err != nil { + return nil, fmt.Errorf("db_query_cached step %q: scan failed: %w", s.name, err) + } + + for i, col := range columns { + if len(fieldSet) > 0 && !fieldSet[col] { + continue + } + val := values[i] + if b, ok := val.([]byte); ok { + output[col] = string(b) + } else { + output[col] = val + } + } + // Only take the first row + break + } + + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("db_query_cached step %q: row iteration error: %w", s.name, err) + } + + return output, nil +} + +// copyMap creates a shallow copy of a map. +func copyMap(m map[string]any) map[string]any { + cp := make(map[string]any, len(m)) + for k, v := range m { + cp[k] = v + } + return cp +} diff --git a/module/pipeline_step_db_query_cached_test.go b/module/pipeline_step_db_query_cached_test.go new file mode 100644 index 00000000..21d2c2b5 --- /dev/null +++ b/module/pipeline_step_db_query_cached_test.go @@ -0,0 +1,302 @@ +package module + +import ( + "context" + "testing" + "time" +) + +func TestDBQueryCachedStep_CacheMiss(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-config", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"c1"}, + "cache_key": "company:c1", + "cache_ttl": "5m", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false on first call, got %v", result.Output["cache_hit"]) + } + if result.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp', got %v", result.Output["name"]) + } +} + +func TestDBQueryCachedStep_CacheHit(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-config", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"c1"}, + "cache_key": "company:c1", + "cache_ttl": "5m", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + + // First call — cache miss + _, err = step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("first execute error: %v", err) + } + + // Second call — cache hit + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("second execute error: %v", err) + } + + if result.Output["cache_hit"] != true { + t.Errorf("expected cache_hit=true on second call, got %v", result.Output["cache_hit"]) + } + if result.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' on cache hit, got %v", result.Output["name"]) + } +} + +func TestDBQueryCachedStep_TTLExpiry(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-config", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"c1"}, + "cache_key": "company:c1", + "cache_ttl": "1ms", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + + // First call — cache miss + first, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("first execute error: %v", err) + } + if first.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false on first call") + } + + // Wait for TTL to expire + time.Sleep(5 * time.Millisecond) + + // Second call — should be cache miss again due to expiry + second, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("second execute error: %v", err) + } + if second.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false after TTL expiry, got %v", second.Output["cache_hit"]) + } +} + +func TestDBQueryCachedStep_ScanFields(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-config", map[string]any{ + "database": "test-db", + "query": "SELECT id, name, slug FROM companies WHERE id = ?", + "params": []any{"c1"}, + "cache_key": "company:c1", + "scan_fields": []any{"name"}, + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp', got %v", result.Output["name"]) + } + // id and slug should not be in output since scan_fields only has "name" + if _, ok := result.Output["id"]; ok { + t.Errorf("expected 'id' to be excluded from output when not in scan_fields") + } + if _, ok := result.Output["slug"]; ok { + t.Errorf("expected 'slug' to be excluded from output when not in scan_fields") + } +} + +func TestDBQueryCachedStep_TemplateParams(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-config", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"{{index .steps \"parse\" \"path_params\" \"id\"}}"}, + "cache_key": "company:{{index .steps \"parse\" \"path_params\" \"id\"}}", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("parse", map[string]any{ + "path_params": map[string]any{"id": "c2"}, + }) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + if result.Output["name"] != "Beta Inc" { + t.Errorf("expected name='Beta Inc', got %v", result.Output["name"]) + } +} + +func TestDBQueryCachedStep_DefaultTTL(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + // No cache_ttl specified — should default to 5m + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-config", map[string]any{ + "database": "test-db", + "query": "SELECT id FROM companies WHERE id = ?", + "params": []any{"c1"}, + "cache_key": "company:c1", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false on first call") + } + + // Immediately call again — should be a hit + result2, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("second execute error: %v", err) + } + if result2.Output["cache_hit"] != true { + t.Errorf("expected cache_hit=true on second call with default TTL") + } +} + +func TestDBQueryCachedStep_MissingDatabase(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + _, err := factory("bad", map[string]any{ + "query": "SELECT 1", + "cache_key": "k", + }, nil) + if err == nil { + t.Fatal("expected error for missing database") + } +} + +func TestDBQueryCachedStep_MissingQuery(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + _, err := factory("bad", map[string]any{ + "database": "db", + "cache_key": "k", + }, nil) + if err == nil { + t.Fatal("expected error for missing query") + } +} + +func TestDBQueryCachedStep_MissingCacheKey(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + _, err := factory("bad", map[string]any{ + "database": "db", + "query": "SELECT 1", + }, nil) + if err == nil { + t.Fatal("expected error for missing cache_key") + } +} + +func TestDBQueryCachedStep_InvalidTTL(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + _, err := factory("bad", map[string]any{ + "database": "db", + "query": "SELECT 1", + "cache_key": "k", + "cache_ttl": "not-a-duration", + }, nil) + if err == nil { + t.Fatal("expected error for invalid cache_ttl") + } +} + +func TestDBQueryCachedStep_RejectsTemplateInQuery(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + _, err := factory("bad", map[string]any{ + "database": "db", + "query": "SELECT * FROM t WHERE id = '{{.id}}'", + "cache_key": "k", + }, nil) + if err == nil { + t.Fatal("expected error for template in query") + } +} + +func TestDBQueryCachedStep_NoRows(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-missing", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"nonexistent"}, + "cache_key": "company:nonexistent", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + + // No rows means empty output (no id/name keys), cache_hit=false + if result.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false, got %v", result.Output["cache_hit"]) + } + if _, ok := result.Output["id"]; ok { + t.Errorf("expected no 'id' field when no rows returned") + } +} diff --git a/plugins/pipelinesteps/plugin.go b/plugins/pipelinesteps/plugin.go index 5b9815ef..0a980695 100644 --- a/plugins/pipelinesteps/plugin.go +++ b/plugins/pipelinesteps/plugin.go @@ -1,6 +1,6 @@ // Package pipelinesteps provides a plugin that registers generic pipeline step // types: validate, transform, conditional, set, log, delegate, jq, publish, -// http_call, request_parse, db_query, db_exec, json_response, raw_response, +// http_call, request_parse, db_query, db_exec, db_query_cached, json_response, raw_response, // validate_path_param, validate_pagination, validate_request_body, // foreach, webhook_verify, base64_decode, ui_scaffold, ui_scaffold_analyze, // dlq_send, dlq_replay, retry_with_backoff, circuit_breaker (wrapping), @@ -65,6 +65,7 @@ func New() *Plugin { "step.request_parse", "step.db_query", "step.db_exec", + "step.db_query_cached", "step.json_response", "step.raw_response", "step.workflow_call", @@ -124,6 +125,7 @@ func (p *Plugin) StepFactories() map[string]plugin.StepFactory { "step.request_parse": wrapStepFactory(module.NewRequestParseStepFactory()), "step.db_query": wrapStepFactory(module.NewDBQueryStepFactory()), "step.db_exec": wrapStepFactory(module.NewDBExecStepFactory()), + "step.db_query_cached": wrapStepFactory(module.NewDBQueryCachedStepFactory()), "step.json_response": wrapStepFactory(module.NewJSONResponseStepFactory()), "step.raw_response": wrapStepFactory(module.NewRawResponseStepFactory()), "step.validate_path_param": wrapStepFactory(module.NewValidatePathParamStepFactory()), diff --git a/plugins/pipelinesteps/plugin_test.go b/plugins/pipelinesteps/plugin_test.go index aa890748..b34bd635 100644 --- a/plugins/pipelinesteps/plugin_test.go +++ b/plugins/pipelinesteps/plugin_test.go @@ -44,6 +44,7 @@ func TestStepFactories(t *testing.T) { "step.request_parse", "step.db_query", "step.db_exec", + "step.db_query_cached", "step.json_response", "step.raw_response", "step.validate_path_param", diff --git a/schema/module_schema.go b/schema/module_schema.go index 5fc9b715..9ab8fdb2 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -990,6 +990,23 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { }, }) + r.Register(&ModuleSchema{ + Type: "step.db_query_cached", + Label: "Database Query (Cached)", + Category: "pipeline", + Description: "Executes a parameterized SQL SELECT query and caches the result in-process with TTL. On subsequent calls the cached value is returned until the TTL expires.", + Inputs: []ServiceIODef{{Name: "context", Type: "PipelineContext", Description: "Pipeline context for template parameter and cache key resolution"}}, + Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Query result fields as top-level keys plus cache_hit boolean"}}, + ConfigFields: []ConfigFieldDef{ + {Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of the database service (must implement DBProvider)", Placeholder: "db", InheritFrom: "dependency.name"}, + {Key: "query", Label: "SQL Query", Type: FieldTypeSQL, Required: true, Description: "Parameterized SQL SELECT query (use $N or ? placeholders, no template expressions allowed)", Placeholder: "SELECT backend_url, settings FROM routing_config WHERE tenant_id = $1 LIMIT 1"}, + {Key: "params", Label: "Parameters", Type: FieldTypeArray, ArrayItemType: "string", Description: "Template-resolved parameter values for query placeholders"}, + {Key: "cache_key", Label: "Cache Key", Type: FieldTypeString, Required: true, Description: "Template-resolved key used to store/retrieve the cached result", Placeholder: "tenant_config:{{.steps.parse.headers.X-Tenant-Id}}"}, + {Key: "cache_ttl", Label: "Cache TTL", Type: FieldTypeString, DefaultValue: "5m", Description: "Duration string for how long to cache the result (e.g. '5m', '30s', '1h')", Placeholder: "5m"}, + {Key: "scan_fields", Label: "Scan Fields", Type: FieldTypeArray, ArrayItemType: "string", Description: "Column names to include in the output map (omit to include all columns)"}, + }, + }) + r.Register(&ModuleSchema{ Type: "step.db_exec", Label: "Database Execute", diff --git a/schema/schema.go b/schema/schema.go index 80a25830..1962a40e 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -211,6 +211,7 @@ var coreModuleTypes = []string{ "step.constraint_check", "step.db_exec", "step.db_query", + "step.db_query_cached", "step.delegate", "step.deploy", "step.dlq_replay", From 6142f19fd0bfb18116d5d3b860abc43943c8cca4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 2 Mar 2026 21:14:32 +0000 Subject: [PATCH 3/4] =?UTF-8?q?feat:=20Add=20step.db=5Fquery=5Fcached=20?= =?UTF-8?q?=E2=80=94=20single-step=20DB=20query=20with=20in-process=20TTL?= =?UTF-8?q?=20caching?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- example/go.mod | 12 +++++------- example/go.sum | 28 ++++++++++++---------------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/example/go.mod b/example/go.mod index ca153a59..5665b04a 100644 --- a/example/go.mod +++ b/example/go.mod @@ -22,7 +22,7 @@ require ( github.com/BurntSushi/toml v1.6.0 // indirect github.com/CrisisTextLine/modular/modules/auth v0.4.0 // indirect github.com/CrisisTextLine/modular/modules/cache v0.4.0 // indirect - github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.0.0 // indirect + github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.1.0 // indirect github.com/CrisisTextLine/modular/modules/jsonschema v1.4.0 // indirect github.com/CrisisTextLine/modular/modules/reverseproxy/v2 v2.2.0 // indirect github.com/CrisisTextLine/modular/modules/scheduler v0.4.0 // indirect @@ -31,7 +31,7 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.55.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0 // indirect - github.com/IBM/sarama v1.46.3 // indirect + github.com/IBM/sarama v1.47.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/aws/aws-sdk-go-v2 v1.41.2 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect @@ -77,7 +77,6 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/eapache/go-resiliency v1.7.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/envoyproxy/go-control-plane/envoy v1.36.0 // indirect @@ -95,7 +94,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v5 v5.3.1 // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/golang/snappy v0.0.4 // indirect github.com/golobby/cast v1.3.3 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.7.0 // indirect @@ -133,7 +131,7 @@ require ( github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 // indirect - github.com/klauspost/compress v1.18.3 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect @@ -149,7 +147,7 @@ require ( github.com/ncruces/go-strftime v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect - github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/pierrec/lz4/v4 v4.1.25 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/prometheus/client_golang v1.19.1 // indirect @@ -184,7 +182,7 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/crypto v0.48.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect - golang.org/x/net v0.50.0 // indirect + golang.org/x/net v0.51.0 // indirect golang.org/x/oauth2 v0.35.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect diff --git a/example/go.sum b/example/go.sum index e7af08ac..4b7d528c 100644 --- a/example/go.sum +++ b/example/go.sum @@ -32,8 +32,8 @@ github.com/CrisisTextLine/modular/modules/cache v0.4.0 h1:vlPXAsucSM1M0RsPly9cWy github.com/CrisisTextLine/modular/modules/cache v0.4.0/go.mod h1:4irZOGXxUlgJqAnWlpMyPC3C1tM/f5145/wMThYnAsY= github.com/CrisisTextLine/modular/modules/eventbus v1.7.0 h1:SSeu7rjuECDgFa+iNyndn94YPQxffHxJgfR7U4psz6E= github.com/CrisisTextLine/modular/modules/eventbus v1.7.0/go.mod h1:I1tGf3DmadwyMP2NE2m6XHYl9ebXB9wBc/KZLywTR4c= -github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.0.0 h1:bDNWBparvVzXnhLxjFPJ9MDg7N4NUnNOjfn56G/CwGU= -github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.0.0/go.mod h1:5DmacIYrhhiN18i2OHyAVBiNKbN2jHuEv2UJoRToMg0= +github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.1.0 h1:jCG/5cuCITnGH4ztOrU5vY00+ykP9j+RL0zXy/CX1ak= +github.com/CrisisTextLine/modular/modules/eventbus/v2 v2.1.0/go.mod h1:5DmacIYrhhiN18i2OHyAVBiNKbN2jHuEv2UJoRToMg0= github.com/CrisisTextLine/modular/modules/jsonschema v1.4.0 h1:NIhTrDgjhGwMi2D0ukGsd3n/M1W807u6Rhlqm89Sj8Q= github.com/CrisisTextLine/modular/modules/jsonschema v1.4.0/go.mod h1:TeM3mt/+1X5VmlWF4nZpgp4qCGPmAahQs5jAzuWLbOo= github.com/CrisisTextLine/modular/modules/reverseproxy/v2 v2.2.0 h1:SUJEPA61IbjdUwKdSembQTbX9rKz5v4vmyr/cbvb4tY= @@ -52,8 +52,8 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0.55.0/go.mod h1:vB2GH9GAYYJTO3mEn8oYwzEdhlayZIdQz6zdzgUIRvA= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0 h1:0s6TxfCu2KHkkZPnBfsQ2y5qia0jl3MMrmBhu3nCOYk= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.55.0/go.mod h1:Mf6O40IAyB9zR/1J8nGDDPirZQQPbYJni8Yisy7NTMc= -github.com/IBM/sarama v1.46.3 h1:njRsX6jNlnR+ClJ8XmkO+CM4unbrNr/2vB5KK6UA+IE= -github.com/IBM/sarama v1.46.3/go.mod h1:GTUYiF9DMOZVe3FwyGT+dtSPceGFIgA+sPc5u6CBwko= +github.com/IBM/sarama v1.47.0 h1:GcQFEd12+KzfPYeLgN69Fh7vLCtYRhVIx0rO4TZO318= +github.com/IBM/sarama v1.47.0/go.mod h1:7gLLIU97nznOmA6TX++Qds+DRxH89P2XICY2KAQUzAY= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= @@ -91,8 +91,8 @@ github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0 h1:hggRKpv26DpYMOik3wWo1Ty5MkAN github.com/aws/aws-sdk-go-v2/service/ecs v1.72.0/go.mod h1:pMlGFDpHoLTJOIZHGdJOAWmi+xeIlQXuFTuQxs1epYE= github.com/aws/aws-sdk-go-v2/service/eks v1.80.0 h1:moQGV8cPbVTN7r2Xte1Mybku35QDePSJEd3onYVmBtY= github.com/aws/aws-sdk-go-v2/service/eks v1.80.0/go.mod h1:Qg678m+87sCuJhcsZojenz8mblYG+Tq86V4m3hjVz0s= -github.com/aws/aws-sdk-go-v2/service/iam v1.53.2 h1:62G6btFUwAa5uR5iPlnlNVAM0zJSLbWgDfKOfUC7oW4= -github.com/aws/aws-sdk-go-v2/service/iam v1.53.2/go.mod h1:av9clChrbZbJ5E21msSsiT2oghl2BJHfQGhCkXmhyu8= +github.com/aws/aws-sdk-go-v2/service/iam v1.53.3 h1:boKZv8dNdHznhAA68hb/dqFz5pxoWmRAOJr9LtscVCI= +github.com/aws/aws-sdk-go-v2/service/iam v1.53.3/go.mod h1:E0QHh3aEwxYb7xshjvxYDELiOda7KBYJ77e/TvGhpcM= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.5 h1:CeY9LUdur+Dxoeldqoun6y4WtJ3RQtzk0JMP2gfUay0= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.5/go.mod h1:AZLZf2fMaahW5s/wMRciu1sYbdsikT/UHwbUjOdEVTc= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 h1:Z5EiPIzXKewUQK0QTMkutjiaPVeVYXX7KIqhXu/0fXs= @@ -168,8 +168,6 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= -github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= -github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/emicklei/go-restful/v3 v3.9.0 h1:XwGDlfxEnQZzuopoqxwSEllNcCOM9DhhFyhFIIGKwxE= @@ -221,8 +219,6 @@ github.com/golang-jwt/jwt/v5 v5.3.1/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArs github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= -github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= -github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golobby/cast v1.3.3 h1:s2Lawb9RMz7YyYf8IrfMQY4IFmA1R/lgfmj97Vc6fig= github.com/golobby/cast v1.3.3/go.mod h1:0oDO5IT84HTXcbLDf1YXuk0xtg/cRDrxhbpWKxwtJCY= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= @@ -323,8 +319,8 @@ github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 h1:9Nu54bhS/H/ github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12/go.mod h1:TBzl5BIHNXfS9+C35ZyJaklL7mLDbgUkcgXzSLa8Tk0= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= -github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -383,8 +379,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= -github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= +github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -514,8 +510,8 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= -golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= +golang.org/x/net v0.51.0 h1:94R/GTO7mt3/4wIKpcR5gkGmRLOuE/2hNGeWq/GBIFo= +golang.org/x/net v0.51.0/go.mod h1:aamm+2QF5ogm02fjy5Bb7CQ0WMt1/WVM7FtyaTLlA9Y= golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ= golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= From d2cfbe8b6cfbeafdb8c1c7cb880952505cd239dc Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 2 Mar 2026 21:55:22 +0000 Subject: [PATCH 4/4] fix: address review comments - stampede protection, TTL validation, schema clarity, test coverage Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/pipeline_step_db_query_cached.go | 21 +++- module/pipeline_step_db_query_cached_test.go | 105 +++++++++++++++++++ schema/module_schema.go | 2 +- 3 files changed, 126 insertions(+), 2 deletions(-) diff --git a/module/pipeline_step_db_query_cached.go b/module/pipeline_step_db_query_cached.go index a7e5b3d8..2b40fe55 100644 --- a/module/pipeline_step_db_query_cached.go +++ b/module/pipeline_step_db_query_cached.go @@ -63,6 +63,9 @@ func NewDBQueryCachedStepFactory() StepFactory { if err != nil { return nil, fmt.Errorf("db_query_cached step %q: invalid 'cache_ttl' %q: %w", name, ttlStr, err) } + if parsed <= 0 { + return nil, fmt.Errorf("db_query_cached step %q: 'cache_ttl' must be > 0, got %q", name, ttlStr) + } cacheTTL = parsed } @@ -131,7 +134,23 @@ func (s *DBQueryCachedStep) Execute(ctx context.Context, pc *PipelineContext) (* return &StepResult{Output: output}, nil } - // Cache miss or expired — query the database + // Cache miss or expired — acquire write lock and double-check to prevent stampede + s.mu.Lock() + entry, found = s.cache[key] + if found && time.Now().Before(entry.expiresAt) { + // Another goroutine populated the cache while we were waiting for the lock + output := copyMap(entry.value) + s.mu.Unlock() + output["cache_hit"] = true + return &StepResult{Output: output}, nil + } + // Evict expired entry (if any) to prevent unbounded memory growth + if found { + delete(s.cache, key) + } + s.mu.Unlock() + + // Query the database result, err := s.runQuery(ctx, pc) if err != nil { return nil, err diff --git a/module/pipeline_step_db_query_cached_test.go b/module/pipeline_step_db_query_cached_test.go index 21d2c2b5..ab21aebf 100644 --- a/module/pipeline_step_db_query_cached_test.go +++ b/module/pipeline_step_db_query_cached_test.go @@ -300,3 +300,108 @@ func TestDBQueryCachedStep_NoRows(t *testing.T) { t.Errorf("expected no 'id' field when no rows returned") } } + +// TestDBQueryCachedStep_PostgresPlaceholderNormalization verifies that $1-style +// placeholders are converted to ? for SQLite (driver "sqlite" triggers normalization). +func TestDBQueryCachedStep_PostgresPlaceholderNormalization(t *testing.T) { + db := setupTestDB(t) + // Register with SQLite driver name so normalizePlaceholders converts $1 → ? + app := mockAppWithDBDriver("test-db", db, "sqlite") + + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-by-dollar", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = $1", + "params": []any{"c1"}, + "cache_key": "company:c1", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error (placeholder normalization may have failed): %v", err) + } + + if result.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' after $1→? normalization, got %v", result.Output["name"]) + } +} + +// TestDBQueryCachedStep_CacheHitSkipsDB verifies that a cache hit does not re-query +// the database. After the first call succeeds, we close the DB; a second call should +// still succeed from the cache without hitting the closed connection. +func TestDBQueryCachedStep_CacheHitSkipsDB(t *testing.T) { + db := setupTestDB(t) + app := mockAppWithDB("test-db", db) + + factory := NewDBQueryCachedStepFactory() + step, err := factory("lookup-config", map[string]any{ + "database": "test-db", + "query": "SELECT id, name FROM companies WHERE id = ?", + "params": []any{"c1"}, + "cache_key": "company:c1", + "cache_ttl": "5m", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + + // First call — populates the cache + first, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("first execute error: %v", err) + } + if first.Output["cache_hit"] != false { + t.Errorf("expected cache_hit=false on first call") + } + + // Close the DB — any DB access from here will fail + if err := db.Close(); err != nil { + t.Fatalf("failed to close db: %v", err) + } + + // Second call — must be served from cache, not from the (now-closed) DB + second, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("second execute should have hit cache, got error: %v", err) + } + if second.Output["cache_hit"] != true { + t.Errorf("expected cache_hit=true on second call after DB closed") + } + if second.Output["name"] != "Acme Corp" { + t.Errorf("expected name='Acme Corp' from cache, got %v", second.Output["name"]) + } +} + +// TestDBQueryCachedStep_ZeroTTLRejected verifies that cache_ttl=0 is rejected. +func TestDBQueryCachedStep_ZeroTTLRejected(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + _, err := factory("bad", map[string]any{ + "database": "db", + "query": "SELECT 1", + "cache_key": "k", + "cache_ttl": "0s", + }, nil) + if err == nil { + t.Fatal("expected error for zero cache_ttl") + } +} + +// TestDBQueryCachedStep_NegativeTTLRejected verifies that a negative cache_ttl is rejected. +func TestDBQueryCachedStep_NegativeTTLRejected(t *testing.T) { + factory := NewDBQueryCachedStepFactory() + _, err := factory("bad", map[string]any{ + "database": "db", + "query": "SELECT 1", + "cache_key": "k", + "cache_ttl": "-1s", + }, nil) + if err == nil { + t.Fatal("expected error for negative cache_ttl") + } +} diff --git a/schema/module_schema.go b/schema/module_schema.go index 9ab8fdb2..e14a2ae7 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -999,7 +999,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Query result fields as top-level keys plus cache_hit boolean"}}, ConfigFields: []ConfigFieldDef{ {Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of the database service (must implement DBProvider)", Placeholder: "db", InheritFrom: "dependency.name"}, - {Key: "query", Label: "SQL Query", Type: FieldTypeSQL, Required: true, Description: "Parameterized SQL SELECT query (use $N or ? placeholders, no template expressions allowed)", Placeholder: "SELECT backend_url, settings FROM routing_config WHERE tenant_id = $1 LIMIT 1"}, + {Key: "query", Label: "SQL Query", Type: FieldTypeSQL, Required: true, Description: "Parameterized SQL SELECT query using $N placeholders (e.g. $1, $2); automatically converted to ? for SQLite drivers. No template expressions allowed.", Placeholder: "SELECT backend_url, settings FROM routing_config WHERE tenant_id = $1 LIMIT 1"}, {Key: "params", Label: "Parameters", Type: FieldTypeArray, ArrayItemType: "string", Description: "Template-resolved parameter values for query placeholders"}, {Key: "cache_key", Label: "Cache Key", Type: FieldTypeString, Required: true, Description: "Template-resolved key used to store/retrieve the cached result", Placeholder: "tenant_config:{{.steps.parse.headers.X-Tenant-Id}}"}, {Key: "cache_ttl", Label: "Cache TTL", Type: FieldTypeString, DefaultValue: "5m", Description: "Duration string for how long to cache the result (e.g. '5m', '30s', '1h')", Placeholder: "5m"},