diff --git a/module/database_partitioned.go b/module/database_partitioned.go index fe02eec1..e5dc5add 100644 --- a/module/database_partitioned.go +++ b/module/database_partitioned.go @@ -45,12 +45,62 @@ type PartitionManager interface { SyncPartitionsFromSource(ctx context.Context) error } +// MultiPartitionManager extends PartitionManager for databases that can have +// more than one partition key configuration (e.g. tenant-partitioned tables +// AND api-version-partitioned tables in the same database). It is implemented +// by PartitionedDatabase; the additional methods are primarily meaningful when +// multiple partition configs are configured. +type MultiPartitionManager interface { + PartitionManager + // PartitionConfigs returns all configured partition groups. + PartitionConfigs() []PartitionConfig + // EnsurePartitionForKey creates partitions for the specified partition key + // and value on all tables that belong to that partition config. Returns an + // error if no config with that partitionKey is registered. + EnsurePartitionForKey(ctx context.Context, partitionKey, value string) error + // SyncPartitionsForKey syncs partitions for the specified partition key's + // configured source table. No-ops if no sourceTable is configured for that + // key. Returns an error if no config with that partitionKey is registered. + SyncPartitionsForKey(ctx context.Context, partitionKey string) error +} + +// PartitionConfig holds per-partition-key configuration within a +// database.partitioned module. Multiple PartitionConfig entries allow a single +// module to manage tables that are partitioned by different columns or with +// different partition types. +type PartitionConfig struct { + // PartitionKey is the column name used for partitioning (e.g. tenant_id). + PartitionKey string `json:"partitionKey" yaml:"partitionKey"` + // Tables lists the tables that are partitioned by this key. + Tables []string `json:"tables" yaml:"tables"` + // PartitionType is "list" (default) or "range". + PartitionType string `json:"partitionType" yaml:"partitionType"` + // PartitionNameFormat is a template for generating partition table names. + // Supports {table} and {tenant} placeholders. Default: "{table}_{tenant}". + PartitionNameFormat string `json:"partitionNameFormat" yaml:"partitionNameFormat"` + // SourceTable is the table queried by SyncPartitionsFromSource for this key. + SourceTable string `json:"sourceTable" yaml:"sourceTable"` + // SourceColumn overrides the column queried in SourceTable. Defaults to PartitionKey. + SourceColumn string `json:"sourceColumn" yaml:"sourceColumn"` +} + // PartitionedDatabaseConfig holds configuration for the database.partitioned module. +// +// Single-partition mode (backward-compatible): set PartitionKey, Tables, and +// optionally PartitionType, PartitionNameFormat, SourceTable, SourceColumn at +// the top level. +// +// Multi-partition mode: set Partitions to a list of PartitionConfig entries. +// Each entry is an independent partition group with its own key, tables, type, +// naming format and optional source. The top-level single-partition fields are +// ignored when Partitions is non-empty. type PartitionedDatabaseConfig struct { - Driver string `json:"driver" yaml:"driver"` - DSN string `json:"dsn" yaml:"dsn"` - MaxOpenConns int `json:"maxOpenConns" yaml:"maxOpenConns"` - MaxIdleConns int `json:"maxIdleConns" yaml:"maxIdleConns"` + Driver string `json:"driver" yaml:"driver"` + DSN string `json:"dsn" yaml:"dsn"` + MaxOpenConns int `json:"maxOpenConns" yaml:"maxOpenConns"` + MaxIdleConns int `json:"maxIdleConns" yaml:"maxIdleConns"` + + // ── Single-partition fields (used when Partitions is empty) ────────────── PartitionKey string `json:"partitionKey" yaml:"partitionKey"` Tables []string `json:"tables" yaml:"tables"` // PartitionType is "list" (default) or "range". @@ -69,19 +119,40 @@ type PartitionedDatabaseConfig struct { // SourceColumn overrides the column queried in sourceTable. // Defaults to PartitionKey if empty. SourceColumn string `json:"sourceColumn" yaml:"sourceColumn"` + + // ── Multi-partition mode ───────────────────────────────────────────────── + // Partitions lists independent partition key configurations. When non-empty, + // the single-partition fields above are ignored. + Partitions []PartitionConfig `json:"partitions" yaml:"partitions"` } // PartitionedDatabase wraps WorkflowDatabase and adds PostgreSQL partition // management. It satisfies DBProvider, DBDriverProvider, PartitionKeyProvider, -// and PartitionManager. +// PartitionManager, and MultiPartitionManager. type PartitionedDatabase struct { - name string - config PartitionedDatabaseConfig - base *WorkflowDatabase - mu sync.RWMutex + name string + config PartitionedDatabaseConfig + partitions []PartitionConfig // normalized; always len >= 1 after construction + base *WorkflowDatabase + mu sync.RWMutex +} + +// normalizePartitionConfig applies defaults to a PartitionConfig and returns the result. +func normalizePartitionConfig(p PartitionConfig) PartitionConfig { + if p.PartitionType == "" { + p.PartitionType = PartitionTypeList + } + if p.PartitionNameFormat == "" { + p.PartitionNameFormat = "{table}_{tenant}" + } + return p } // NewPartitionedDatabase creates a new PartitionedDatabase module. +// +// When cfg.Partitions is non-empty the entries are used as-is (with defaults +// applied). Otherwise a single PartitionConfig is built from the top-level +// PartitionKey / Tables / … fields for backward compatibility. func NewPartitionedDatabase(name string, cfg PartitionedDatabaseConfig) *PartitionedDatabase { dbConfig := DatabaseConfig{ Driver: cfg.Driver, @@ -89,16 +160,28 @@ func NewPartitionedDatabase(name string, cfg PartitionedDatabaseConfig) *Partiti MaxOpenConns: cfg.MaxOpenConns, MaxIdleConns: cfg.MaxIdleConns, } - if cfg.PartitionType == "" { - cfg.PartitionType = PartitionTypeList - } - if cfg.PartitionNameFormat == "" { - cfg.PartitionNameFormat = "{table}_{tenant}" + + var partitions []PartitionConfig + if len(cfg.Partitions) > 0 { + for _, p := range cfg.Partitions { + partitions = append(partitions, normalizePartitionConfig(p)) + } + } else { + partitions = []PartitionConfig{normalizePartitionConfig(PartitionConfig{ + PartitionKey: cfg.PartitionKey, + Tables: cfg.Tables, + PartitionType: cfg.PartitionType, + PartitionNameFormat: cfg.PartitionNameFormat, + SourceTable: cfg.SourceTable, + SourceColumn: cfg.SourceColumn, + })} } + return &PartitionedDatabase{ - name: name, - config: cfg, - base: NewWorkflowDatabase(name+"._base", dbConfig), + name: name, + config: cfg, + partitions: partitions, + base: NewWorkflowDatabase(name+"._base", dbConfig), } } @@ -147,40 +230,67 @@ func (p *PartitionedDatabase) DriverName() string { } // PartitionKey returns the column name used for partitioning (satisfies PartitionKeyProvider). +// When multiple partition configs are defined, it returns the first config's key. func (p *PartitionedDatabase) PartitionKey() string { - return p.config.PartitionKey + if len(p.partitions) > 0 { + return p.partitions[0].PartitionKey + } + return "" } -// PartitionType returns the partition type ("list" or "range"). +// PartitionType returns the partition type of the primary partition config ("list" or "range"). func (p *PartitionedDatabase) PartitionType() string { - return p.config.PartitionType + if len(p.partitions) > 0 { + return p.partitions[0].PartitionType + } + return PartitionTypeList } -// PartitionNameFormat returns the configured partition name format template. +// PartitionNameFormat returns the partition name format of the primary partition config. func (p *PartitionedDatabase) PartitionNameFormat() string { - return p.config.PartitionNameFormat + if len(p.partitions) > 0 { + return p.partitions[0].PartitionNameFormat + } + return "{table}_{tenant}" } // PartitionTableName resolves the partition table name for a given parent -// table and tenant value using the configured partitionNameFormat. +// table and tenant value using the primary partition config's partitionNameFormat. func (p *PartitionedDatabase) PartitionTableName(parentTable, tenantValue string) string { - suffix := sanitizePartitionSuffix(tenantValue) - name := p.config.PartitionNameFormat - name = strings.ReplaceAll(name, "{table}", parentTable) - name = strings.ReplaceAll(name, "{tenant}", suffix) - return name + if len(p.partitions) == 0 { + return parentTable + } + return applyPartitionNameFormat(p.partitions[0].PartitionNameFormat, parentTable, tenantValue) } -// Tables returns the list of tables managed by this partitioned database. +// Tables returns the list of tables managed by the primary partition config. func (p *PartitionedDatabase) Tables() []string { - result := make([]string, len(p.config.Tables)) - copy(result, p.config.Tables) + if len(p.partitions) == 0 { + return nil + } + result := make([]string, len(p.partitions[0].Tables)) + copy(result, p.partitions[0].Tables) + return result +} + +// PartitionConfigs returns all configured partition groups (satisfies MultiPartitionManager). +// It returns a deep copy so callers cannot mutate the internal state. +func (p *PartitionedDatabase) PartitionConfigs() []PartitionConfig { + result := make([]PartitionConfig, len(p.partitions)) + for i, cfg := range p.partitions { + result[i] = cfg + if cfg.Tables != nil { + tablesCopy := make([]string, len(cfg.Tables)) + copy(tablesCopy, cfg.Tables) + result[i].Tables = tablesCopy + } + } return result } -// EnsurePartition creates a partition for the given tenant value on all -// configured tables. The operation is idempotent — IF NOT EXISTS prevents errors -// when the partition already exists. +// EnsurePartition creates a partition for the given value on all tables managed +// by the primary partition config. The operation is idempotent — IF NOT EXISTS +// prevents errors when the partition already exists. // // For LIST partitions: CREATE TABLE IF NOT EXISTS PARTITION OF FOR VALUES IN ('') // For RANGE partitions: CREATE TABLE IF NOT EXISTS PARTITION OF
FOR VALUES FROM ('') TO ('\x00') @@ -188,6 +298,27 @@ func (p *PartitionedDatabase) Tables() []string { // Only PostgreSQL (pgx, pgx/v5, postgres) is supported. The method validates // the tenant value and table/column names to prevent SQL injection. func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue string) error { + if len(p.partitions) == 0 { + return fmt.Errorf("partitioned database %q: no partition config defined", p.name) + } + return p.ensurePartitionForConfig(ctx, p.partitions[0], tenantValue) +} + +// EnsurePartitionForKey creates partitions for the specified partition key and +// value on all tables that belong to that partition config (satisfies +// MultiPartitionManager). Returns an error if no config with that partitionKey +// is registered. +func (p *PartitionedDatabase) EnsurePartitionForKey(ctx context.Context, partitionKey, value string) error { + cfg, ok := p.partitionConfigByKey(partitionKey) + if !ok { + return fmt.Errorf("partitioned database %q: no partition config found for key %q", p.name, partitionKey) + } + return p.ensurePartitionForConfig(ctx, cfg, value) +} + +// ensurePartitionForConfig is the shared implementation for EnsurePartition and +// EnsurePartitionForKey. It validates inputs and executes the DDL for each table. +func (p *PartitionedDatabase) ensurePartitionForConfig(ctx context.Context, cfg PartitionConfig, tenantValue string) error { if !validPartitionValue.MatchString(tenantValue) { return fmt.Errorf("partitioned database %q: invalid tenant value %q (must match [a-zA-Z0-9_.\\-]+)", p.name, tenantValue) } @@ -196,7 +327,7 @@ func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue s return fmt.Errorf("partitioned database %q: driver %q does not support partitioning (use pgx, pgx/v5, or postgres)", p.name, p.config.Driver) } - if err := validateIdentifier(p.config.PartitionKey); err != nil { + if err := validateIdentifier(cfg.PartitionKey); err != nil { return fmt.Errorf("partitioned database %q: invalid partition_key: %w", p.name, err) } @@ -208,12 +339,12 @@ func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue s p.mu.Lock() defer p.mu.Unlock() - for _, table := range p.config.Tables { + for _, table := range cfg.Tables { if err := validateIdentifier(table); err != nil { return fmt.Errorf("partitioned database %q: invalid table name: %w", p.name, err) } - partitionName := p.PartitionTableName(table, tenantValue) + partitionName := applyPartitionNameFormat(cfg.PartitionNameFormat, table, tenantValue) // Validate the computed partition name is a safe identifier. if err := validateIdentifier(partitionName); err != nil { @@ -225,7 +356,7 @@ func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue s // it cannot contain single-quote characters. safeValue := strings.ReplaceAll(tenantValue, "'", "") - switch p.config.PartitionType { + switch cfg.PartitionType { case PartitionTypeList: ddl = fmt.Sprintf( "CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES IN ('%s')", @@ -242,7 +373,7 @@ func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue s ) default: return fmt.Errorf("partitioned database %q: unsupported partition type %q (use %q or %q)", - p.name, p.config.PartitionType, PartitionTypeList, PartitionTypeRange) + p.name, cfg.PartitionType, PartitionTypeList, PartitionTypeRange) } if _, err := db.ExecContext(ctx, ddl); err != nil { @@ -255,23 +386,45 @@ func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue s } // SyncPartitionsFromSource queries the configured sourceTable for all distinct -// tenant values and ensures that partitions exist for each one. -// This enables automatic partition creation when new tenants are added to a -// source table (e.g., a "tenants" table). +// tenant values and ensures that partitions exist for each one. When multiple +// partition configs are defined, all configs with a sourceTable are synced. // -// No-ops if sourceTable is not configured. +// No-ops if no sourceTable is configured in any partition config. func (p *PartitionedDatabase) SyncPartitionsFromSource(ctx context.Context) error { - if p.config.SourceTable == "" { + for _, cfg := range p.partitions { + if err := p.syncPartitionConfigFromSource(ctx, cfg); err != nil { + return err + } + } + return nil +} + +// SyncPartitionsForKey syncs partitions for the specified partition key's +// configured source table (satisfies MultiPartitionManager). No-ops if no +// sourceTable is configured for that key. Returns an error if no config with +// that partitionKey is registered. +func (p *PartitionedDatabase) SyncPartitionsForKey(ctx context.Context, partitionKey string) error { + cfg, ok := p.partitionConfigByKey(partitionKey) + if !ok { + return fmt.Errorf("partitioned database %q: no partition config found for key %q", p.name, partitionKey) + } + return p.syncPartitionConfigFromSource(ctx, cfg) +} + +// syncPartitionConfigFromSource is the shared implementation for +// SyncPartitionsFromSource and SyncPartitionsForKey. +func (p *PartitionedDatabase) syncPartitionConfigFromSource(ctx context.Context, cfg PartitionConfig) error { + if cfg.SourceTable == "" { return nil } - if err := validateIdentifier(p.config.SourceTable); err != nil { + if err := validateIdentifier(cfg.SourceTable); err != nil { return fmt.Errorf("partitioned database %q: invalid source table: %w", p.name, err) } - srcCol := p.config.SourceColumn + srcCol := cfg.SourceColumn if srcCol == "" { - srcCol = p.config.PartitionKey + srcCol = cfg.PartitionKey } if err := validateIdentifier(srcCol); err != nil { return fmt.Errorf("partitioned database %q: invalid source column: %w", p.name, err) @@ -284,29 +437,29 @@ func (p *PartitionedDatabase) SyncPartitionsFromSource(ctx context.Context) erro // All identifiers (srcCol, SourceTable) have been validated by validateIdentifier above. query := fmt.Sprintf("SELECT DISTINCT %s FROM %s WHERE %s IS NOT NULL", //nolint:gosec // G201: identifiers validated above - srcCol, p.config.SourceTable, srcCol) + srcCol, cfg.SourceTable, srcCol) rows, err := db.QueryContext(ctx, query) if err != nil { return fmt.Errorf("partitioned database %q: failed to query source table %q: %w", - p.name, p.config.SourceTable, err) + p.name, cfg.SourceTable, err) } defer rows.Close() - var tenants []string + var values []string for rows.Next() { var val string if err := rows.Scan(&val); err != nil { - return fmt.Errorf("partitioned database %q: failed to scan tenant value: %w", p.name, err) + return fmt.Errorf("partitioned database %q: failed to scan partition value: %w", p.name, err) } - tenants = append(tenants, val) + values = append(values, val) } if err := rows.Err(); err != nil { return fmt.Errorf("partitioned database %q: row iteration error: %w", p.name, err) } - for _, tenant := range tenants { - if err := p.EnsurePartition(ctx, tenant); err != nil { + for _, val := range values { + if err := p.ensurePartitionForConfig(ctx, cfg, val); err != nil { return err } } @@ -314,6 +467,16 @@ func (p *PartitionedDatabase) SyncPartitionsFromSource(ctx context.Context) erro return nil } +// partitionConfigByKey returns the PartitionConfig for the given partition key, if any. +func (p *PartitionedDatabase) partitionConfigByKey(partitionKey string) (PartitionConfig, bool) { + for _, cfg := range p.partitions { + if cfg.PartitionKey == partitionKey { + return cfg, true + } + } + return PartitionConfig{}, false +} + // isSupportedPartitionDriver returns true for PostgreSQL-compatible drivers. func isSupportedPartitionDriver(driver string) bool { switch driver { @@ -329,3 +492,12 @@ func sanitizePartitionSuffix(tenantValue string) string { r := strings.NewReplacer("-", "_", ".", "_") return r.Replace(tenantValue) } + +// applyPartitionNameFormat applies a partition name format template to a table +// name and tenant value. Supports {table} and {tenant} placeholders. +func applyPartitionNameFormat(format, parentTable, tenantValue string) string { + suffix := sanitizePartitionSuffix(tenantValue) + name := strings.ReplaceAll(format, "{table}", parentTable) + name = strings.ReplaceAll(name, "{tenant}", suffix) + return name +} diff --git a/module/database_partitioned_test.go b/module/database_partitioned_test.go index 5298d13d..b754a508 100644 --- a/module/database_partitioned_test.go +++ b/module/database_partitioned_test.go @@ -3,6 +3,7 @@ package module import ( "context" "database/sql" + "fmt" "strings" "testing" ) @@ -193,6 +194,470 @@ func TestIsSupportedPartitionDriver(t *testing.T) { } } +// testMultiPartitionManager extends testPartitionManager with MultiPartitionManager support. +type testMultiPartitionManager struct { + testPartitionManager + configs []PartitionConfig + ensureForKeyCalledWith []struct{ key, value string } + syncForKeyCalledWith []string + ensureForKeyErr error + syncForKeyErr error +} + +func (m *testMultiPartitionManager) PartitionConfigs() []PartitionConfig { return m.configs } + +func (m *testMultiPartitionManager) EnsurePartitionForKey(_ context.Context, partitionKey, value string) error { + m.ensureForKeyCalledWith = append(m.ensureForKeyCalledWith, struct{ key, value string }{partitionKey, value}) + return m.ensureForKeyErr +} + +func (m *testMultiPartitionManager) SyncPartitionsForKey(_ context.Context, partitionKey string) error { + m.syncForKeyCalledWith = append(m.syncForKeyCalledWith, partitionKey) + return m.syncForKeyErr +} + +// ─── Multi-partition config tests ──────────────────────────────────────────── + +func TestPartitionedDatabase_MultiPartition_NormalizesDefaults(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + {PartitionKey: "api_version", Tables: []string{"contracts"}, PartitionType: PartitionTypeRange}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + cfgs := pd.PartitionConfigs() + if len(cfgs) != 2 { + t.Fatalf("expected 2 partition configs, got %d", len(cfgs)) + } + + // First config gets default type and format + if cfgs[0].PartitionType != PartitionTypeList { + t.Errorf("expected first partition type %q, got %q", PartitionTypeList, cfgs[0].PartitionType) + } + if cfgs[0].PartitionNameFormat != "{table}_{tenant}" { + t.Errorf("expected first partition format %q, got %q", "{table}_{tenant}", cfgs[0].PartitionNameFormat) + } + + // Second config keeps explicit type + if cfgs[1].PartitionType != PartitionTypeRange { + t.Errorf("expected second partition type %q, got %q", PartitionTypeRange, cfgs[1].PartitionType) + } +} + +func TestPartitionedDatabase_MultiPartition_PrimaryKeyIsFirst(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + {PartitionKey: "api_version", Tables: []string{"contracts"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + if pd.PartitionKey() != "tenant_id" { + t.Errorf("expected PartitionKey() = %q, got %q", "tenant_id", pd.PartitionKey()) + } +} + +func TestPartitionedDatabase_MultiPartition_TablesReturnsPrimary(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms", "submissions"}}, + {PartitionKey: "api_version", Tables: []string{"contracts"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + tables := pd.Tables() + if len(tables) != 2 || tables[0] != "forms" || tables[1] != "submissions" { + t.Errorf("unexpected Tables() result: %v", tables) + } +} + +func TestPartitionedDatabase_MultiPartition_SinglePartitionFieldsIgnored(t *testing.T) { + // When Partitions is set, top-level single-partition fields must be ignored. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "should_be_ignored", + Tables: []string{"ignored_table"}, + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + if pd.PartitionKey() != "tenant_id" { + t.Errorf("expected PartitionKey() = %q, got %q", "tenant_id", pd.PartitionKey()) + } + if len(pd.Tables()) != 1 || pd.Tables()[0] != "forms" { + t.Errorf("unexpected Tables(): %v", pd.Tables()) + } +} + +func TestPartitionedDatabase_EnsurePartitionForKey_InvalidKey(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + err := pd.EnsurePartitionForKey(context.Background(), "unknown_key", "val") + if err == nil { + t.Fatal("expected error for unknown partition key") + } + if !strings.Contains(err.Error(), "no partition config found for key") { + t.Errorf("unexpected error: %v", err) + } +} + +func TestPartitionedDatabase_SyncPartitionsForKey_InvalidKey(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + err := pd.SyncPartitionsForKey(context.Background(), "unknown_key") + if err == nil { + t.Fatal("expected error for unknown partition key") + } + if !strings.Contains(err.Error(), "no partition config found for key") { + t.Errorf("unexpected error: %v", err) + } +} + +func TestPartitionedDatabase_SyncPartitionsForKey_NoSourceTable(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + // No sourceTable on the config → no-op + err := pd.SyncPartitionsForKey(context.Background(), "tenant_id") + if err != nil { + t.Fatalf("expected no-op, got: %v", err) + } +} + +func TestPartitionedDatabase_SyncPartitionsForKey_InvalidSourceTable(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}, SourceTable: "invalid table!"}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + err := pd.SyncPartitionsForKey(context.Background(), "tenant_id") + if err == nil { + t.Fatal("expected error for invalid source table") + } +} + +func TestPartitionedDatabase_PartitionConfigs_ReturnsCopy(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + {PartitionKey: "api_version", Tables: []string{"contracts"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + cfgs1 := pd.PartitionConfigs() + cfgs1[0].PartitionKey = "mutated" // mutate the returned struct field + cfgs2 := pd.PartitionConfigs() + if cfgs2[0].PartitionKey == "mutated" { + t.Error("PartitionConfigs returned a reference instead of a copy (PartitionKey)") + } +} + +func TestPartitionedDatabase_PartitionConfigs_DeepCopiesTables(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms", "submissions"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + cfgs1 := pd.PartitionConfigs() + cfgs1[0].Tables[0] = "mutated_table" // mutate element of the returned Tables slice + cfgs2 := pd.PartitionConfigs() + if cfgs2[0].Tables[0] == "mutated_table" { + t.Error("PartitionConfigs returned a shallow copy: Tables slice element was mutated in internal state") + } +} + +func TestPartitionedDatabase_BackwardCompat_SinglePartition(t *testing.T) { + // Old-style config without Partitions field must behave exactly as before. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "tenant_id", + Tables: []string{"forms", "submissions"}, + PartitionType: PartitionTypeList, + PartitionNameFormat: "{table}_{tenant}", + } + pd := NewPartitionedDatabase("db", cfg) + + cfgs := pd.PartitionConfigs() + if len(cfgs) != 1 { + t.Fatalf("expected 1 partition config for backward compat, got %d", len(cfgs)) + } + if pd.PartitionKey() != "tenant_id" { + t.Errorf("expected PartitionKey = 'tenant_id', got %q", pd.PartitionKey()) + } + if pd.PartitionTableName("forms", "org-alpha") != "forms_org_alpha" { + t.Errorf("unexpected PartitionTableName: %q", pd.PartitionTableName("forms", "org-alpha")) + } +} + +func TestPartitionedDatabase_MultiPartition_EnsurePartitionForKey_InvalidDriver(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "sqlite3", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + err := pd.EnsurePartitionForKey(context.Background(), "tenant_id", "org-alpha") + if err == nil { + t.Fatal("expected error for non-postgres driver") + } +} + +func TestPartitionedDatabase_MultiPartition_EnsurePartitionForKey_InvalidValue(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + err := pd.EnsurePartitionForKey(context.Background(), "tenant_id", "org'; DROP TABLE forms;--") + if err == nil { + t.Fatal("expected error for invalid partition value") + } +} + +func TestPartitionedDatabase_MultiPartition_EnsurePartitionForKey_UnsupportedType(t *testing.T) { + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}, PartitionType: "hash"}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + // Unsupported type should error — partition type check comes before nil-db check + err := pd.EnsurePartitionForKey(context.Background(), "tenant_id", "org-alpha") + if err == nil { + t.Fatal("expected error for unsupported partition type") + } +} + +func TestPartitionedDatabase_MultiPartition_SyncPartitionsFromSource_AllConfigs(t *testing.T) { + // SyncPartitionsFromSource with multiple configs that have no sourceTable is a no-op for each. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + Partitions: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + {PartitionKey: "api_version", Tables: []string{"contracts"}}, + }, + } + pd := NewPartitionedDatabase("db", cfg) + + err := pd.SyncPartitionsFromSource(context.Background()) + if err != nil { + t.Fatalf("expected no-op with no source tables, got: %v", err) + } +} + +// ─── Step tests for partitionKey field ─────────────────────────────────────── + +func TestDBCreatePartitionStep_WithPartitionKey(t *testing.T) { + mgr := &testMultiPartitionManager{ + testPartitionManager: testPartitionManager{partitionKey: "tenant_id"}, + configs: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + {PartitionKey: "api_version", Tables: []string{"contracts"}}, + }, + } + app := NewMockApplication() + app.Services["multi-db"] = mgr + + factory := NewDBCreatePartitionStepFactory() + step, err := factory("create-part", map[string]any{ + "database": "multi-db", + "tenantKey": "steps.body.tenant_id", + "partitionKey": "api_version", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("body", map[string]any{"tenant_id": "v2"}) + + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["tenant"] != "v2" { + t.Errorf("expected tenant='v2', got %v", result.Output["tenant"]) + } + if len(mgr.ensureForKeyCalledWith) != 1 || + mgr.ensureForKeyCalledWith[0].key != "api_version" || + mgr.ensureForKeyCalledWith[0].value != "v2" { + t.Errorf("unexpected EnsurePartitionForKey calls: %v", mgr.ensureForKeyCalledWith) + } +} + +func TestDBCreatePartitionStep_WithPartitionKey_NotMultiManager(t *testing.T) { + // Service is a PartitionManager but not MultiPartitionManager; using partitionKey should fail. + mgr := &testPartitionManager{partitionKey: "tenant_id"} + app := NewMockApplication() + app.Services["part-db"] = mgr + + factory := NewDBCreatePartitionStepFactory() + step, err := factory("create-part", map[string]any{ + "database": "part-db", + "tenantKey": "steps.body.tenant_id", + "partitionKey": "api_version", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("body", map[string]any{"tenant_id": "v2"}) + + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error when service does not implement MultiPartitionManager") + } +} + +func TestDBSyncPartitionsStep_WithPartitionKey(t *testing.T) { + mgr := &testMultiPartitionManager{ + testPartitionManager: testPartitionManager{partitionKey: "tenant_id"}, + configs: []PartitionConfig{ + {PartitionKey: "tenant_id", Tables: []string{"forms"}}, + {PartitionKey: "api_version", Tables: []string{"contracts"}}, + }, + } + app := NewMockApplication() + app.Services["multi-db"] = mgr + + factory := NewDBSyncPartitionsStepFactory() + step, err := factory("sync-parts", map[string]any{ + "database": "multi-db", + "partitionKey": "api_version", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + result, err := step.Execute(context.Background(), pc) + if err != nil { + t.Fatalf("execute error: %v", err) + } + if result.Output["synced"] != true { + t.Errorf("expected synced=true, got %v", result.Output["synced"]) + } + if len(mgr.syncForKeyCalledWith) != 1 || mgr.syncForKeyCalledWith[0] != "api_version" { + t.Errorf("unexpected SyncPartitionsForKey calls: %v", mgr.syncForKeyCalledWith) + } +} + +func TestDBSyncPartitionsStep_WithPartitionKey_NotMultiManager(t *testing.T) { + mgr := &testPartitionManager{partitionKey: "tenant_id"} + app := NewMockApplication() + app.Services["part-db"] = mgr + + factory := NewDBSyncPartitionsStepFactory() + step, err := factory("sync-parts", map[string]any{ + "database": "part-db", + "partitionKey": "api_version", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error when service does not implement MultiPartitionManager") + } +} + +func TestDBCreatePartitionStep_WithPartitionKey_Error(t *testing.T) { + mgr := &testMultiPartitionManager{ + testPartitionManager: testPartitionManager{partitionKey: "tenant_id"}, + configs: []PartitionConfig{{PartitionKey: "api_version", Tables: []string{"contracts"}}}, + ensureForKeyErr: fmt.Errorf("injected error"), + } + app := NewMockApplication() + app.Services["multi-db"] = mgr + + factory := NewDBCreatePartitionStepFactory() + step, err := factory("create-part", map[string]any{ + "database": "multi-db", + "tenantKey": "steps.body.val", + "partitionKey": "api_version", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + pc.MergeStepOutput("body", map[string]any{"val": "v1"}) + + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error propagated from EnsurePartitionForKey") + } +} + +func TestDBSyncPartitionsStep_WithPartitionKey_Error(t *testing.T) { + mgr := &testMultiPartitionManager{ + testPartitionManager: testPartitionManager{partitionKey: "tenant_id"}, + configs: []PartitionConfig{{PartitionKey: "api_version", Tables: []string{"contracts"}}}, + syncForKeyErr: fmt.Errorf("injected sync error"), + } + app := NewMockApplication() + app.Services["multi-db"] = mgr + + factory := NewDBSyncPartitionsStepFactory() + step, err := factory("sync-parts", map[string]any{ + "database": "multi-db", + "partitionKey": "api_version", + }, app) + if err != nil { + t.Fatalf("factory error: %v", err) + } + + pc := NewPipelineContext(nil, nil) + _, err = step.Execute(context.Background(), pc) + if err == nil { + t.Fatal("expected error propagated from SyncPartitionsForKey") + } +} + // testPartitionManager is a mock PartitionManager for testing step.db_create_partition. type testPartitionManager struct { partitionKey string diff --git a/module/pipeline_step_db_create_partition.go b/module/pipeline_step_db_create_partition.go index c76f193a..d11fb281 100644 --- a/module/pipeline_step_db_create_partition.go +++ b/module/pipeline_step_db_create_partition.go @@ -10,11 +10,12 @@ import ( // DBCreatePartitionStep creates a PostgreSQL LIST partition for a given tenant value // on all tables managed by a database.partitioned module. type DBCreatePartitionStep struct { - name string - database string - tenantKey string // dot-path in PipelineContext to resolve the tenant value - app modular.Application - tmpl *TemplateEngine + name string + database string + tenantKey string // dot-path in PipelineContext to resolve the tenant value + partitionKey string // optional: target a specific partition config by key + app modular.Application + tmpl *TemplateEngine } // NewDBCreatePartitionStepFactory returns a StepFactory for DBCreatePartitionStep. @@ -30,12 +31,15 @@ func NewDBCreatePartitionStepFactory() StepFactory { return nil, fmt.Errorf("db_create_partition step %q: 'tenantKey' is required", name) } + partitionKey, _ := config["partitionKey"].(string) + return &DBCreatePartitionStep{ - name: name, - database: database, - tenantKey: tenantKey, - app: app, - tmpl: NewTemplateEngine(), + name: name, + database: database, + tenantKey: tenantKey, + partitionKey: partitionKey, + app: app, + tmpl: NewTemplateEngine(), }, nil } } @@ -63,8 +67,18 @@ func (s *DBCreatePartitionStep) Execute(ctx context.Context, pc *PipelineContext } tenantStr := fmt.Sprintf("%v", tenantVal) - if err := mgr.EnsurePartition(ctx, tenantStr); err != nil { - return nil, fmt.Errorf("db_create_partition step %q: %w", s.name, err) + if s.partitionKey != "" { + multiMgr, ok := svc.(MultiPartitionManager) + if !ok { + return nil, fmt.Errorf("db_create_partition step %q: service %q does not implement MultiPartitionManager (required when partitionKey is set)", s.name, s.database) + } + if err := multiMgr.EnsurePartitionForKey(ctx, s.partitionKey, tenantStr); err != nil { + return nil, fmt.Errorf("db_create_partition step %q: %w", s.name, err) + } + } else { + if err := mgr.EnsurePartition(ctx, tenantStr); err != nil { + return nil, fmt.Errorf("db_create_partition step %q: %w", s.name, err) + } } return &StepResult{Output: map[string]any{ diff --git a/module/pipeline_step_db_sync_partitions.go b/module/pipeline_step_db_sync_partitions.go index 6d7bc414..9f9afdba 100644 --- a/module/pipeline_step_db_sync_partitions.go +++ b/module/pipeline_step_db_sync_partitions.go @@ -11,9 +11,10 @@ import ( // for all tables managed by a database.partitioned module. This enables automatic // partition creation when new tenants are onboarded. type DBSyncPartitionsStep struct { - name string - database string - app modular.Application + name string + database string + partitionKey string // optional: target a specific partition config by key + app modular.Application } // NewDBSyncPartitionsStepFactory returns a StepFactory for DBSyncPartitionsStep. @@ -24,10 +25,13 @@ func NewDBSyncPartitionsStepFactory() StepFactory { return nil, fmt.Errorf("db_sync_partitions step %q: 'database' is required", name) } + partitionKey, _ := config["partitionKey"].(string) + return &DBSyncPartitionsStep{ - name: name, - database: database, - app: app, + name: name, + database: database, + partitionKey: partitionKey, + app: app, }, nil } } @@ -49,8 +53,18 @@ func (s *DBSyncPartitionsStep) Execute(ctx context.Context, _ *PipelineContext) return nil, fmt.Errorf("db_sync_partitions step %q: service %q does not implement PartitionManager (use database.partitioned)", s.name, s.database) } - if err := mgr.SyncPartitionsFromSource(ctx); err != nil { - return nil, fmt.Errorf("db_sync_partitions step %q: %w", s.name, err) + if s.partitionKey != "" { + multiMgr, ok := svc.(MultiPartitionManager) + if !ok { + return nil, fmt.Errorf("db_sync_partitions step %q: service %q does not implement MultiPartitionManager (required when partitionKey is set)", s.name, s.database) + } + if err := multiMgr.SyncPartitionsForKey(ctx, s.partitionKey); err != nil { + return nil, fmt.Errorf("db_sync_partitions step %q: %w", s.name, err) + } + } else { + if err := mgr.SyncPartitionsFromSource(ctx); err != nil { + return nil, fmt.Errorf("db_sync_partitions step %q: %w", s.name, err) + } } return &StepResult{Output: map[string]any{ diff --git a/plugins/storage/plugin.go b/plugins/storage/plugin.go index d4e1d399..4ca4ebac 100644 --- a/plugins/storage/plugin.go +++ b/plugins/storage/plugin.go @@ -186,6 +186,38 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { if sc, ok := cfg["sourceColumn"].(string); ok { partCfg.SourceColumn = sc } + if partitions, ok := cfg["partitions"].([]any); ok { + for _, item := range partitions { + pMap, ok := item.(map[string]any) + if !ok { + continue + } + pc := module.PartitionConfig{} + if pk, ok := pMap["partitionKey"].(string); ok { + pc.PartitionKey = pk + } + if tables, ok := pMap["tables"].([]any); ok { + for _, t := range tables { + if s, ok := t.(string); ok { + pc.Tables = append(pc.Tables, s) + } + } + } + if pt, ok := pMap["partitionType"].(string); ok { + pc.PartitionType = pt + } + if pnf, ok := pMap["partitionNameFormat"].(string); ok { + pc.PartitionNameFormat = pnf + } + if st, ok := pMap["sourceTable"].(string); ok { + pc.SourceTable = st + } + if scol, ok := pMap["sourceColumn"].(string); ok { + pc.SourceColumn = scol + } + partCfg.Partitions = append(partCfg.Partitions, pc) + } + } return module.NewPartitionedDatabase(name, partCfg) }, "persistence.store": func(name string, cfg map[string]any) modular.Module { diff --git a/schema/module_schema.go b/schema/module_schema.go index 8846e578..2ddfe42f 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -516,18 +516,19 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Type: "database.partitioned", Label: "Partitioned Database", Category: "database", - Description: "PostgreSQL partitioned database for multi-tenant data isolation. Supports LIST and RANGE partitions with configurable naming format and optional source-table-driven auto-partition creation.", + Description: "PostgreSQL partitioned database for multi-tenant data isolation. Supports LIST and RANGE partitions with configurable naming format and optional source-table-driven auto-partition creation. Use partitionKey/tables for a single partition config, or partitions[] for multiple independent partition key configurations.", Inputs: []ServiceIODef{{Name: "query", Type: "SQL", Description: "SQL query to execute"}}, Outputs: []ServiceIODef{{Name: "database", Type: "sql.DB", Description: "SQL database connection pool"}}, ConfigFields: []ConfigFieldDef{ {Key: "driver", Label: "Driver", Type: FieldTypeSelect, Options: []string{"pgx", "pgx/v5", "postgres"}, Required: true, Description: "PostgreSQL database driver"}, {Key: "dsn", Label: "DSN", Type: FieldTypeString, Required: true, Description: "Data source name / connection string", Placeholder: "postgres://user:pass@localhost/db?sslmode=disable", Sensitive: true}, //nolint:gosec // G101: placeholder DSN example in schema documentation - {Key: "partitionKey", Label: "Partition Key", Type: FieldTypeString, Required: true, Description: "Column name used for partitioning (e.g. tenant_id)", Placeholder: "tenant_id"}, - {Key: "tables", Label: "Tables", Type: FieldTypeArray, ArrayItemType: "string", Required: true, Description: "Tables to manage partitions for", Placeholder: "forms"}, - {Key: "partitionType", Label: "Partition Type", Type: FieldTypeSelect, Options: []string{"list", "range"}, DefaultValue: "list", Description: "PostgreSQL partition type: list (FOR VALUES IN) or range (FOR VALUES FROM/TO)"}, - {Key: "partitionNameFormat", Label: "Partition Name Format", Type: FieldTypeString, DefaultValue: "{table}_{tenant}", Description: "Template for partition table names. Supports {table} and {tenant} placeholders.", Placeholder: "{table}_{tenant}"}, - {Key: "sourceTable", Label: "Source Table", Type: FieldTypeString, Description: "Table containing all tenant IDs for auto-partition sync (e.g. tenants)", Placeholder: "tenants"}, - {Key: "sourceColumn", Label: "Source Column", Type: FieldTypeString, Description: "Column in source table to query for tenant values. Defaults to partitionKey.", Placeholder: "id"}, + {Key: "partitionKey", Label: "Partition Key", Type: FieldTypeString, Description: "Column name used for partitioning in single-partition mode (e.g. tenant_id). Ignored when 'partitions' is set.", Placeholder: "tenant_id"}, + {Key: "tables", Label: "Tables", Type: FieldTypeArray, ArrayItemType: "string", Description: "Tables to manage partitions for in single-partition mode. Ignored when 'partitions' is set.", Placeholder: "forms"}, + {Key: "partitionType", Label: "Partition Type", Type: FieldTypeSelect, Options: []string{"list", "range"}, DefaultValue: "list", Description: "PostgreSQL partition type for single-partition mode: list (FOR VALUES IN) or range (FOR VALUES FROM/TO). Ignored when 'partitions' is set."}, + {Key: "partitionNameFormat", Label: "Partition Name Format", Type: FieldTypeString, DefaultValue: "{table}_{tenant}", Description: "Template for partition table names in single-partition mode. Supports {table} and {tenant} placeholders. Ignored when 'partitions' is set.", Placeholder: "{table}_{tenant}"}, + {Key: "sourceTable", Label: "Source Table", Type: FieldTypeString, Description: "Table containing all tenant IDs for auto-partition sync in single-partition mode. Ignored when 'partitions' is set.", Placeholder: "tenants"}, + {Key: "sourceColumn", Label: "Source Column", Type: FieldTypeString, Description: "Column in source table to query for tenant values in single-partition mode. Defaults to partitionKey.", Placeholder: "id"}, + {Key: "partitions", Label: "Partitions", Type: FieldTypeArray, ArrayItemType: "object", Description: "List of independent partition key configurations. When set, overrides the single-partition fields. Each entry supports: partitionKey, tables, partitionType, partitionNameFormat, sourceTable, sourceColumn."}, {Key: "maxOpenConns", Label: "Max Open Connections", Type: FieldTypeNumber, DefaultValue: 25, Description: "Maximum number of open database connections"}, {Key: "maxIdleConns", Label: "Max Idle Connections", Type: FieldTypeNumber, DefaultValue: 5, Description: "Maximum number of idle connections in the pool"}, }, @@ -1074,6 +1075,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { ConfigFields: []ConfigFieldDef{ {Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of a database.partitioned service", Placeholder: "db", InheritFrom: "dependency.name"}, {Key: "tenantKey", Label: "Tenant Key", Type: FieldTypeString, Required: true, Description: "Dot-path in pipeline context to resolve the tenant value (e.g. the new tenant's ID)", Placeholder: "steps.body.tenant_id"}, + {Key: "partitionKey", Label: "Partition Key", Type: FieldTypeString, Description: "Target a specific partition config by its partitionKey. Required when the database has multiple partition configs. Omit to use the primary (first) partition config.", Placeholder: "tenant_id"}, }, }) @@ -1086,6 +1088,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Sync result with synced boolean"}}, ConfigFields: []ConfigFieldDef{ {Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of a database.partitioned service with sourceTable configured", Placeholder: "db", InheritFrom: "dependency.name"}, + {Key: "partitionKey", Label: "Partition Key", Type: FieldTypeString, Description: "Target a specific partition config by its partitionKey for syncing. Omit to sync all configured partition groups.", Placeholder: "tenant_id"}, }, }) diff --git a/schema/schema_test.go b/schema/schema_test.go index 06384866..305dda9f 100644 --- a/schema/schema_test.go +++ b/schema/schema_test.go @@ -952,3 +952,202 @@ func makeDir(path string) error { func writeFile(path string, data []byte) error { return os.WriteFile(path, data, 0644) } + +// --------------------------------------------------------------------------- +// database.partitioned validation tests +// --------------------------------------------------------------------------- + +func TestValidateConfig_PartitionedDB_SingleMode_Valid(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitionKey": "tenant_id", + "tables": []any{"forms"}, + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err != nil { + t.Fatalf("expected valid config, got: %v", err) + } +} + +func TestValidateConfig_PartitionedDB_SingleMode_MissingPartitionKey(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "tables": []any{"forms"}, + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err == nil { + t.Fatal("expected error for missing partitionKey in single-partition mode") + } + assertContains(t, err.Error(), "partitionKey") +} + +func TestValidateConfig_PartitionedDB_SingleMode_MissingTables(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitionKey": "tenant_id", + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err == nil { + t.Fatal("expected error for missing tables in single-partition mode") + } + assertContains(t, err.Error(), "tables") +} + +func TestValidateConfig_PartitionedDB_SingleMode_EmptyTables(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitionKey": "tenant_id", + "tables": []any{}, + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err == nil { + t.Fatal("expected error for empty tables list in single-partition mode") + } + assertContains(t, err.Error(), "tables") +} + +func TestValidateConfig_PartitionedDB_MultiMode_Valid(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitions": []any{ + map[string]any{"partitionKey": "tenant_id", "tables": []any{"forms"}}, + map[string]any{"partitionKey": "api_version", "tables": []any{"contracts"}, "partitionType": "range"}, + }, + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err != nil { + t.Fatalf("expected valid multi-partition config, got: %v", err) + } +} + +func TestValidateConfig_PartitionedDB_MultiMode_EntryMissingPartitionKey(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitions": []any{ + map[string]any{"tables": []any{"forms"}}, // missing partitionKey + }, + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err == nil { + t.Fatal("expected error for missing partitionKey in partition entry") + } + assertContains(t, err.Error(), "partitionKey") +} + +func TestValidateConfig_PartitionedDB_MultiMode_EntryMissingTables(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitions": []any{ + map[string]any{"partitionKey": "tenant_id"}, // missing tables + }, + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err == nil { + t.Fatal("expected error for missing tables in partition entry") + } + assertContains(t, err.Error(), "tables") +} + +func TestValidateConfig_PartitionedDB_MultiMode_EntryEmptyTables(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitions": []any{ + map[string]any{"partitionKey": "tenant_id", "tables": []any{}}, + }, + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err == nil { + t.Fatal("expected error for empty tables list in partition entry") + } + assertContains(t, err.Error(), "tables") +} + +func TestValidateConfig_PartitionedDB_MultiMode_EntryNotObject(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitions": []any{"not-an-object"}, + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err == nil { + t.Fatal("expected error for non-object partition entry") + } + assertContains(t, err.Error(), "must be an object") +} + +func TestValidateConfig_PartitionedDB_EmptyPartitionsArray_FallsBackToSingleMode(t *testing.T) { + // An empty partitions array should fall through to single-partition validation. + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: map[string]any{ + "driver": "pgx", + "dsn": "postgres://localhost/test", + "partitions": []any{}, // empty → single-partition mode applies + }}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + if err == nil { + t.Fatal("expected error for missing partitionKey/tables when partitions is empty") + } + assertContains(t, err.Error(), "partitionKey") +} + +func TestValidateConfig_PartitionedDB_NilConfig(t *testing.T) { + cfg := &config.WorkflowConfig{ + Modules: []config.ModuleConfig{ + {Name: "db", Type: "database.partitioned", Config: nil}, + }, + } + err := ValidateConfig(cfg, WithAllowNoEntryPoints()) + // nil config → driver+dsn required errors from schema, no panic from type-specific check + if err == nil { + t.Fatal("expected errors for nil config") + } + // Should report driver/dsn as required + assertContains(t, err.Error(), "driver") +} diff --git a/schema/validate.go b/schema/validate.go index 21c23eb8..88246d4a 100644 --- a/schema/validate.go +++ b/schema/validate.go @@ -318,6 +318,67 @@ func validateModuleConfig(mod config.ModuleConfig, prefix string, errs *Validati // Additional type-specific structural checks beyond simple required fields switch mod.Type { + case "database.partitioned": + if mod.Config == nil { + break + } + hasPartitionsArr := false + if partitions, ok := mod.Config["partitions"]; ok { + if arr, ok := partitions.([]any); ok && len(arr) > 0 { + hasPartitionsArr = true + // Each entry in partitions must have partitionKey and tables. + for j, item := range arr { + entryPath := fmt.Sprintf("%s.config.partitions[%d]", prefix, j) + entry, ok := item.(map[string]any) + if !ok { + *errs = append(*errs, &ValidationError{ + Path: entryPath, + Message: "each partition entry must be an object", + }) + continue + } + pk, _ := entry["partitionKey"].(string) + if pk == "" { + *errs = append(*errs, &ValidationError{ + Path: entryPath + ".partitionKey", + Message: "required field \"partitionKey\" is missing or empty", + }) + } + if tables, ok := entry["tables"]; !ok { + *errs = append(*errs, &ValidationError{ + Path: entryPath + ".tables", + Message: "required field \"tables\" is missing", + }) + } else if arr, ok := tables.([]any); !ok || len(arr) == 0 { + *errs = append(*errs, &ValidationError{ + Path: entryPath + ".tables", + Message: "\"tables\" must be a non-empty list", + }) + } + } + } + } + if !hasPartitionsArr { + // Single-partition mode: partitionKey and tables are required. + pk, _ := mod.Config["partitionKey"].(string) + if pk == "" { + *errs = append(*errs, &ValidationError{ + Path: prefix + ".config.partitionKey", + Message: "required config field \"partitionKey\" is missing or empty (set partitionKey for single-partition mode or provide a non-empty \"partitions\" list for multi-partition mode)", + }) + } + if tables, ok := mod.Config["tables"]; !ok { + *errs = append(*errs, &ValidationError{ + Path: prefix + ".config.tables", + Message: "required config field \"tables\" is missing (set tables for single-partition mode or provide a non-empty \"partitions\" list for multi-partition mode)", + }) + } else if arr, ok := tables.([]any); !ok || len(arr) == 0 { + *errs = append(*errs, &ValidationError{ + Path: prefix + ".config.tables", + Message: "\"tables\" must be a non-empty list", + }) + } + } case "messaging.kafka": if mod.Config != nil { if brokers, ok := mod.Config["brokers"]; ok {