Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions module/pipeline_step_db_dynamic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package module

import (
"fmt"
"strings"
)

// validateSQLIdentifier checks that s is safe to interpolate directly into SQL as an
// identifier (e.g. a table name). Only ASCII letters (A-Z, a-z), ASCII digits (0-9),
// underscores (_) and hyphens (-) are permitted. This strict allowlist prevents SQL
// injection when dynamic values are embedded in queries via allow_dynamic_sql.
func validateSQLIdentifier(s string) error {
if s == "" {
return fmt.Errorf("dynamic SQL identifier must not be empty")
}
for _, c := range s {
if (c < 'a' || c > 'z') &&
(c < 'A' || c > 'Z') &&
(c < '0' || c > '9') &&
c != '_' && c != '-' {
return fmt.Errorf("dynamic SQL identifier %q contains unsafe character %q (only ASCII letters, digits, underscores and hyphens are allowed)", s, string(c))
}
}
return nil
}

// resolveDynamicSQL resolves every {{ }} template expression found in query against
// pc and validates that each resolved value is a safe SQL identifier. The validated
// values are substituted back into the query in left-to-right order and the final
// SQL string is returned.
//
// Each occurrence of a template expression is resolved independently, so
// non-deterministic functions like {{uuid}} or {{now}} produce a distinct value
// per occurrence.
//
// This is only called when allow_dynamic_sql is true (explicit opt-in). Callers
// are responsible for ensuring that the query has already passed template parsing.
func resolveDynamicSQL(tmpl *TemplateEngine, query string, pc *PipelineContext) (string, error) {
if !strings.Contains(query, "{{") {
return query, nil
}

// Process template expressions in left-to-right order. Each occurrence is
// resolved and validated independently to preserve correct semantics for
// non-deterministic template functions (e.g. {{uuid}}, {{now}}).
var result strings.Builder
rest := query
for {
openIdx := strings.Index(rest, "{{")
if openIdx < 0 {
result.WriteString(rest)
break
}
closeIdx := strings.Index(rest[openIdx:], "}}")
if closeIdx < 0 {
return "", fmt.Errorf("dynamic SQL: unclosed template action in query (missing closing '}}')")
}
closeIdx += openIdx

// Write the literal SQL text before this expression.
result.WriteString(rest[:openIdx])

expr := rest[openIdx : closeIdx+2]

resolved, err := tmpl.Resolve(expr, pc)
if err != nil {
return "", fmt.Errorf("dynamic SQL: failed to resolve %q: %w", expr, err)
}
if err := validateSQLIdentifier(resolved); err != nil {
return "", fmt.Errorf("dynamic SQL: %w", err)
}
result.WriteString(resolved)
rest = rest[closeIdx+2:]
}
return result.String(), nil
}
50 changes: 33 additions & 17 deletions module/pipeline_step_db_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (

// DBExecStep executes parameterized SQL INSERT/UPDATE/DELETE against a named database service.
type DBExecStep struct {
name string
database string
query string
params []string
ignoreError bool
app modular.Application
tmpl *TemplateEngine
name string
database string
query string
params []string
ignoreError bool
allowDynamicSQL bool
app modular.Application
tmpl *TemplateEngine
}

// NewDBExecStepFactory returns a StepFactory that creates DBExecStep instances.
Expand All @@ -32,8 +33,10 @@ func NewDBExecStepFactory() StepFactory {
return nil, fmt.Errorf("db_exec step %q: 'query' is required", name)
}

// Safety: reject template expressions in SQL to prevent injection
if strings.Contains(query, "{{") {
// Safety: reject template expressions in SQL to prevent injection,
// unless allow_dynamic_sql is explicitly enabled.
allowDynamicSQL, _ := config["allow_dynamic_sql"].(bool)
if !allowDynamicSQL && strings.Contains(query, "{{") {
return nil, fmt.Errorf("db_exec step %q: query must not contain template expressions (use params instead)", name)
}

Expand All @@ -51,20 +54,33 @@ func NewDBExecStepFactory() StepFactory {
ignoreError, _ := config["ignore_error"].(bool)

return &DBExecStep{
name: name,
database: database,
query: query,
params: params,
ignoreError: ignoreError,
app: app,
tmpl: NewTemplateEngine(),
name: name,
database: database,
query: query,
params: params,
ignoreError: ignoreError,
allowDynamicSQL: allowDynamicSQL,
app: app,
tmpl: NewTemplateEngine(),
}, nil
}
}

func (s *DBExecStep) Name() string { return s.name }

func (s *DBExecStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
// Resolve template expressions in the query early (before any DB access) when
// dynamic SQL is enabled. This validates resolved identifiers against an
// allowlist before any database interaction.
query := s.query
if s.allowDynamicSQL {
var err error
query, err = resolveDynamicSQL(s.tmpl, query, pc)
if err != nil {
return nil, fmt.Errorf("db_exec step %q: %w", s.name, err)
}
}

if s.app == nil {
return nil, fmt.Errorf("db_exec step %q: no application context", s.name)
}
Expand Down Expand Up @@ -102,7 +118,7 @@ func (s *DBExecStep) Execute(_ context.Context, pc *PipelineContext) (*StepResul

// Normalize SQL placeholders: users write $1,$2,$3 (PostgreSQL style),
// engine converts to ? for SQLite automatically.
query := normalizePlaceholders(s.query, driver)
query = normalizePlaceholders(query, driver)

// Execute statement
result, err := db.Exec(query, resolvedParams...)
Expand Down
63 changes: 63 additions & 0 deletions module/pipeline_step_db_exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package module
import (
"context"
"database/sql"
"strings"
"testing"

_ "modernc.org/sqlite"
Expand Down Expand Up @@ -195,6 +196,68 @@ func TestDBExecStep_RejectsTemplateInQuery(t *testing.T) {
}
}

func TestDBExecStep_DynamicTableName(t *testing.T) {
db, err := sql.Open("sqlite", ":memory:")
if err != nil {
t.Fatalf("open db: %v", err)
}
defer db.Close()

_, err = db.Exec(`CREATE TABLE items_alpha (id TEXT PRIMARY KEY, name TEXT NOT NULL)`)
if err != nil {
t.Fatalf("create table: %v", err)
}

app := mockAppWithDB("test-db", db)
factory := NewDBExecStepFactory()
step, err := factory("dynamic-insert", map[string]any{
"database": "test-db",
"query": `INSERT INTO items_{{.steps.auth.tenant}} (id, name) VALUES (?, ?)`,
"params": []any{"i1", "Widget"},
"allow_dynamic_sql": true,
}, app)
if err != nil {
t.Fatalf("factory error: %v", err)
}

pc := NewPipelineContext(nil, nil)
pc.MergeStepOutput("auth", map[string]any{"tenant": "alpha"})

result, err := step.Execute(context.Background(), pc)
if err != nil {
t.Fatalf("execute error: %v", err)
}

affected, _ := result.Output["affected_rows"].(int64)
if affected != 1 {
t.Errorf("expected affected_rows=1, got %v", affected)
}
}

func TestDBExecStep_DynamicSQL_RejectsInjection(t *testing.T) {
factory := NewDBExecStepFactory()
step, err := factory("injection-exec", map[string]any{
"database": "test-db",
"query": `DELETE FROM items_{{.steps.auth.tenant}} WHERE id = ?`,
"params": []any{"i1"},
"allow_dynamic_sql": true,
}, nil)
if err != nil {
t.Fatalf("factory error: %v", err)
}

pc := NewPipelineContext(nil, nil)
pc.MergeStepOutput("auth", map[string]any{"tenant": "alpha'; DROP TABLE items;--"})

_, err = step.Execute(context.Background(), pc)
if err == nil {
t.Fatal("expected error for unsafe SQL identifier")
}
if !strings.Contains(err.Error(), "unsafe character") {
t.Errorf("expected 'unsafe character' in error, got: %v", err)
}
}

func TestDBExecStep_MissingDatabase(t *testing.T) {
factory := NewDBExecStepFactory()
_, err := factory("no-db", map[string]any{
Expand Down
50 changes: 33 additions & 17 deletions module/pipeline_step_db_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ type DBDriverProvider interface {

// DBQueryStep executes a parameterized SQL SELECT against a named database service.
type DBQueryStep struct {
name string
database string
query string
params []string
mode string // "list" or "single"
app modular.Application
tmpl *TemplateEngine
name string
database string
query string
params []string
mode string // "list" or "single"
allowDynamicSQL bool
app modular.Application
tmpl *TemplateEngine
}

// NewDBQueryStepFactory returns a StepFactory that creates DBQueryStep instances.
Expand All @@ -47,8 +48,10 @@ func NewDBQueryStepFactory() StepFactory {
return nil, fmt.Errorf("db_query step %q: 'query' is required", name)
}

// Safety: reject template expressions in SQL to prevent injection
if strings.Contains(query, "{{") {
// Safety: reject template expressions in SQL to prevent injection,
// unless allow_dynamic_sql is explicitly enabled.
allowDynamicSQL, _ := config["allow_dynamic_sql"].(bool)
if !allowDynamicSQL && strings.Contains(query, "{{") {
return nil, fmt.Errorf("db_query step %q: query must not contain template expressions (use params instead)", name)
}

Expand All @@ -72,20 +75,33 @@ func NewDBQueryStepFactory() StepFactory {
}

return &DBQueryStep{
name: name,
database: database,
query: query,
params: params,
mode: mode,
app: app,
tmpl: NewTemplateEngine(),
name: name,
database: database,
query: query,
params: params,
mode: mode,
allowDynamicSQL: allowDynamicSQL,
app: app,
tmpl: NewTemplateEngine(),
}, nil
}
}

func (s *DBQueryStep) Name() string { return s.name }

func (s *DBQueryStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
// Resolve template expressions in the query early (before any DB access) when
// dynamic SQL is enabled. This validates resolved identifiers against an
// allowlist before any database interaction.
query := s.query
if s.allowDynamicSQL {
var err error
query, err = resolveDynamicSQL(s.tmpl, query, pc)
if err != nil {
return nil, fmt.Errorf("db_query step %q: %w", s.name, err)
}
}

// Resolve database service
if s.app == nil {
return nil, fmt.Errorf("db_query step %q: no application context", s.name)
Expand Down Expand Up @@ -124,7 +140,7 @@ func (s *DBQueryStep) Execute(_ context.Context, pc *PipelineContext) (*StepResu

// Normalize SQL placeholders: users write $1,$2,$3 (PostgreSQL style),
// engine converts to ? for SQLite automatically.
query := normalizePlaceholders(s.query, driver)
query = normalizePlaceholders(query, driver)

// Execute query
rows, err := db.Query(query, resolvedParams...)
Expand Down
Loading
Loading