Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 226 additions & 54 deletions module/database_partitioned.go

Large diffs are not rendered by default.

465 changes: 465 additions & 0 deletions module/database_partitioned_test.go

Large diffs are not rendered by default.

38 changes: 26 additions & 12 deletions module/pipeline_step_db_create_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
// DBCreatePartitionStep creates a PostgreSQL LIST partition for a given tenant value
// on all tables managed by a database.partitioned module.
type DBCreatePartitionStep struct {
name string
database string
tenantKey string // dot-path in PipelineContext to resolve the tenant value
app modular.Application
tmpl *TemplateEngine
name string
database string
tenantKey string // dot-path in PipelineContext to resolve the tenant value
partitionKey string // optional: target a specific partition config by key
app modular.Application
tmpl *TemplateEngine
}

// NewDBCreatePartitionStepFactory returns a StepFactory for DBCreatePartitionStep.
Expand All @@ -30,12 +31,15 @@ func NewDBCreatePartitionStepFactory() StepFactory {
return nil, fmt.Errorf("db_create_partition step %q: 'tenantKey' is required", name)
}

partitionKey, _ := config["partitionKey"].(string)

return &DBCreatePartitionStep{
name: name,
database: database,
tenantKey: tenantKey,
app: app,
tmpl: NewTemplateEngine(),
name: name,
database: database,
tenantKey: tenantKey,
partitionKey: partitionKey,
app: app,
tmpl: NewTemplateEngine(),
}, nil
}
}
Expand Down Expand Up @@ -63,8 +67,18 @@ func (s *DBCreatePartitionStep) Execute(ctx context.Context, pc *PipelineContext
}
tenantStr := fmt.Sprintf("%v", tenantVal)

if err := mgr.EnsurePartition(ctx, tenantStr); err != nil {
return nil, fmt.Errorf("db_create_partition step %q: %w", s.name, err)
if s.partitionKey != "" {
multiMgr, ok := svc.(MultiPartitionManager)
if !ok {
return nil, fmt.Errorf("db_create_partition step %q: service %q does not implement MultiPartitionManager (required when partitionKey is set)", s.name, s.database)
}
if err := multiMgr.EnsurePartitionForKey(ctx, s.partitionKey, tenantStr); err != nil {
return nil, fmt.Errorf("db_create_partition step %q: %w", s.name, err)
}
} else {
if err := mgr.EnsurePartition(ctx, tenantStr); err != nil {
return nil, fmt.Errorf("db_create_partition step %q: %w", s.name, err)
}
}

return &StepResult{Output: map[string]any{
Expand Down
30 changes: 22 additions & 8 deletions module/pipeline_step_db_sync_partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
// for all tables managed by a database.partitioned module. This enables automatic
// partition creation when new tenants are onboarded.
type DBSyncPartitionsStep struct {
name string
database string
app modular.Application
name string
database string
partitionKey string // optional: target a specific partition config by key
app modular.Application
}

// NewDBSyncPartitionsStepFactory returns a StepFactory for DBSyncPartitionsStep.
Expand All @@ -24,10 +25,13 @@ func NewDBSyncPartitionsStepFactory() StepFactory {
return nil, fmt.Errorf("db_sync_partitions step %q: 'database' is required", name)
}

partitionKey, _ := config["partitionKey"].(string)

return &DBSyncPartitionsStep{
name: name,
database: database,
app: app,
name: name,
database: database,
partitionKey: partitionKey,
app: app,
}, nil
}
}
Expand All @@ -49,8 +53,18 @@ func (s *DBSyncPartitionsStep) Execute(ctx context.Context, _ *PipelineContext)
return nil, fmt.Errorf("db_sync_partitions step %q: service %q does not implement PartitionManager (use database.partitioned)", s.name, s.database)
}

if err := mgr.SyncPartitionsFromSource(ctx); err != nil {
return nil, fmt.Errorf("db_sync_partitions step %q: %w", s.name, err)
if s.partitionKey != "" {
multiMgr, ok := svc.(MultiPartitionManager)
if !ok {
return nil, fmt.Errorf("db_sync_partitions step %q: service %q does not implement MultiPartitionManager (required when partitionKey is set)", s.name, s.database)
}
if err := multiMgr.SyncPartitionsForKey(ctx, s.partitionKey); err != nil {
return nil, fmt.Errorf("db_sync_partitions step %q: %w", s.name, err)
}
} else {
if err := mgr.SyncPartitionsFromSource(ctx); err != nil {
return nil, fmt.Errorf("db_sync_partitions step %q: %w", s.name, err)
}
}

return &StepResult{Output: map[string]any{
Expand Down
32 changes: 32 additions & 0 deletions plugins/storage/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,38 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory {
if sc, ok := cfg["sourceColumn"].(string); ok {
partCfg.SourceColumn = sc
}
if partitions, ok := cfg["partitions"].([]any); ok {
for _, item := range partitions {
pMap, ok := item.(map[string]any)
if !ok {
continue
}
pc := module.PartitionConfig{}
if pk, ok := pMap["partitionKey"].(string); ok {
pc.PartitionKey = pk
}
if tables, ok := pMap["tables"].([]any); ok {
for _, t := range tables {
if s, ok := t.(string); ok {
pc.Tables = append(pc.Tables, s)
}
}
}
if pt, ok := pMap["partitionType"].(string); ok {
pc.PartitionType = pt
}
if pnf, ok := pMap["partitionNameFormat"].(string); ok {
pc.PartitionNameFormat = pnf
}
if st, ok := pMap["sourceTable"].(string); ok {
pc.SourceTable = st
}
if scol, ok := pMap["sourceColumn"].(string); ok {
pc.SourceColumn = scol
}
partCfg.Partitions = append(partCfg.Partitions, pc)
}
}
return module.NewPartitionedDatabase(name, partCfg)
},
"persistence.store": func(name string, cfg map[string]any) modular.Module {
Expand Down
17 changes: 10 additions & 7 deletions schema/module_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,18 +516,19 @@ func (r *ModuleSchemaRegistry) registerBuiltins() {
Type: "database.partitioned",
Label: "Partitioned Database",
Category: "database",
Description: "PostgreSQL partitioned database for multi-tenant data isolation. Supports LIST and RANGE partitions with configurable naming format and optional source-table-driven auto-partition creation.",
Description: "PostgreSQL partitioned database for multi-tenant data isolation. Supports LIST and RANGE partitions with configurable naming format and optional source-table-driven auto-partition creation. Use partitionKey/tables for a single partition config, or partitions[] for multiple independent partition key configurations.",
Inputs: []ServiceIODef{{Name: "query", Type: "SQL", Description: "SQL query to execute"}},
Outputs: []ServiceIODef{{Name: "database", Type: "sql.DB", Description: "SQL database connection pool"}},
ConfigFields: []ConfigFieldDef{
{Key: "driver", Label: "Driver", Type: FieldTypeSelect, Options: []string{"pgx", "pgx/v5", "postgres"}, Required: true, Description: "PostgreSQL database driver"},
{Key: "dsn", Label: "DSN", Type: FieldTypeString, Required: true, Description: "Data source name / connection string", Placeholder: "postgres://user:pass@localhost/db?sslmode=disable", Sensitive: true}, //nolint:gosec // G101: placeholder DSN example in schema documentation
{Key: "partitionKey", Label: "Partition Key", Type: FieldTypeString, Required: true, Description: "Column name used for partitioning (e.g. tenant_id)", Placeholder: "tenant_id"},
{Key: "tables", Label: "Tables", Type: FieldTypeArray, ArrayItemType: "string", Required: true, Description: "Tables to manage partitions for", Placeholder: "forms"},
{Key: "partitionType", Label: "Partition Type", Type: FieldTypeSelect, Options: []string{"list", "range"}, DefaultValue: "list", Description: "PostgreSQL partition type: list (FOR VALUES IN) or range (FOR VALUES FROM/TO)"},
{Key: "partitionNameFormat", Label: "Partition Name Format", Type: FieldTypeString, DefaultValue: "{table}_{tenant}", Description: "Template for partition table names. Supports {table} and {tenant} placeholders.", Placeholder: "{table}_{tenant}"},
{Key: "sourceTable", Label: "Source Table", Type: FieldTypeString, Description: "Table containing all tenant IDs for auto-partition sync (e.g. tenants)", Placeholder: "tenants"},
{Key: "sourceColumn", Label: "Source Column", Type: FieldTypeString, Description: "Column in source table to query for tenant values. Defaults to partitionKey.", Placeholder: "id"},
{Key: "partitionKey", Label: "Partition Key", Type: FieldTypeString, Description: "Column name used for partitioning in single-partition mode (e.g. tenant_id). Ignored when 'partitions' is set.", Placeholder: "tenant_id"},
{Key: "tables", Label: "Tables", Type: FieldTypeArray, ArrayItemType: "string", Description: "Tables to manage partitions for in single-partition mode. Ignored when 'partitions' is set.", Placeholder: "forms"},
{Key: "partitionType", Label: "Partition Type", Type: FieldTypeSelect, Options: []string{"list", "range"}, DefaultValue: "list", Description: "PostgreSQL partition type for single-partition mode: list (FOR VALUES IN) or range (FOR VALUES FROM/TO). Ignored when 'partitions' is set."},
{Key: "partitionNameFormat", Label: "Partition Name Format", Type: FieldTypeString, DefaultValue: "{table}_{tenant}", Description: "Template for partition table names in single-partition mode. Supports {table} and {tenant} placeholders. Ignored when 'partitions' is set.", Placeholder: "{table}_{tenant}"},
{Key: "sourceTable", Label: "Source Table", Type: FieldTypeString, Description: "Table containing all tenant IDs for auto-partition sync in single-partition mode. Ignored when 'partitions' is set.", Placeholder: "tenants"},
{Key: "sourceColumn", Label: "Source Column", Type: FieldTypeString, Description: "Column in source table to query for tenant values in single-partition mode. Defaults to partitionKey.", Placeholder: "id"},
{Key: "partitions", Label: "Partitions", Type: FieldTypeArray, ArrayItemType: "object", Description: "List of independent partition key configurations. When set, overrides the single-partition fields. Each entry supports: partitionKey, tables, partitionType, partitionNameFormat, sourceTable, sourceColumn."},
Comment on lines +525 to +531
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By making partitionKey and tables non-required here, schema.ValidateConfig will no longer reject a database.partitioned module config that sets neither the legacy single-partition fields nor a non-empty partitions array. That invalid config will pass validation but later fail at runtime (e.g., EnsurePartition will error on an empty/invalid identifier). Consider adding explicit type-specific validation for database.partitioned in schema/validate.go (require either partitions with required fields per entry, or partitionKey+tables in single-partition mode).

Copilot uses AI. Check for mistakes.
{Key: "maxOpenConns", Label: "Max Open Connections", Type: FieldTypeNumber, DefaultValue: 25, Description: "Maximum number of open database connections"},
{Key: "maxIdleConns", Label: "Max Idle Connections", Type: FieldTypeNumber, DefaultValue: 5, Description: "Maximum number of idle connections in the pool"},
},
Expand Down Expand Up @@ -1074,6 +1075,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() {
ConfigFields: []ConfigFieldDef{
{Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of a database.partitioned service", Placeholder: "db", InheritFrom: "dependency.name"},
{Key: "tenantKey", Label: "Tenant Key", Type: FieldTypeString, Required: true, Description: "Dot-path in pipeline context to resolve the tenant value (e.g. the new tenant's ID)", Placeholder: "steps.body.tenant_id"},
{Key: "partitionKey", Label: "Partition Key", Type: FieldTypeString, Description: "Target a specific partition config by its partitionKey. Required when the database has multiple partition configs. Omit to use the primary (first) partition config.", Placeholder: "tenant_id"},
},
})

Expand All @@ -1086,6 +1088,7 @@ func (r *ModuleSchemaRegistry) registerBuiltins() {
Outputs: []ServiceIODef{{Name: "result", Type: "StepResult", Description: "Sync result with synced boolean"}},
ConfigFields: []ConfigFieldDef{
{Key: "database", Label: "Database", Type: FieldTypeString, Required: true, Description: "Name of a database.partitioned service with sourceTable configured", Placeholder: "db", InheritFrom: "dependency.name"},
{Key: "partitionKey", Label: "Partition Key", Type: FieldTypeString, Description: "Target a specific partition config by its partitionKey for syncing. Omit to sync all configured partition groups.", Placeholder: "tenant_id"},
},
})

Expand Down
199 changes: 199 additions & 0 deletions schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,3 +952,202 @@ func makeDir(path string) error {
func writeFile(path string, data []byte) error {
return os.WriteFile(path, data, 0644)
}

// ---------------------------------------------------------------------------
// database.partitioned validation tests
// ---------------------------------------------------------------------------

func TestValidateConfig_PartitionedDB_SingleMode_Valid(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitionKey": "tenant_id",
"tables": []any{"forms"},
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err != nil {
t.Fatalf("expected valid config, got: %v", err)
}
}

func TestValidateConfig_PartitionedDB_SingleMode_MissingPartitionKey(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"tables": []any{"forms"},
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err == nil {
t.Fatal("expected error for missing partitionKey in single-partition mode")
}
assertContains(t, err.Error(), "partitionKey")
}

func TestValidateConfig_PartitionedDB_SingleMode_MissingTables(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitionKey": "tenant_id",
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err == nil {
t.Fatal("expected error for missing tables in single-partition mode")
}
assertContains(t, err.Error(), "tables")
}

func TestValidateConfig_PartitionedDB_SingleMode_EmptyTables(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitionKey": "tenant_id",
"tables": []any{},
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err == nil {
t.Fatal("expected error for empty tables list in single-partition mode")
}
assertContains(t, err.Error(), "tables")
}

func TestValidateConfig_PartitionedDB_MultiMode_Valid(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitions": []any{
map[string]any{"partitionKey": "tenant_id", "tables": []any{"forms"}},
map[string]any{"partitionKey": "api_version", "tables": []any{"contracts"}, "partitionType": "range"},
},
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err != nil {
t.Fatalf("expected valid multi-partition config, got: %v", err)
}
}

func TestValidateConfig_PartitionedDB_MultiMode_EntryMissingPartitionKey(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitions": []any{
map[string]any{"tables": []any{"forms"}}, // missing partitionKey
},
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err == nil {
t.Fatal("expected error for missing partitionKey in partition entry")
}
assertContains(t, err.Error(), "partitionKey")
}

func TestValidateConfig_PartitionedDB_MultiMode_EntryMissingTables(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitions": []any{
map[string]any{"partitionKey": "tenant_id"}, // missing tables
},
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err == nil {
t.Fatal("expected error for missing tables in partition entry")
}
assertContains(t, err.Error(), "tables")
}

func TestValidateConfig_PartitionedDB_MultiMode_EntryEmptyTables(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitions": []any{
map[string]any{"partitionKey": "tenant_id", "tables": []any{}},
},
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err == nil {
t.Fatal("expected error for empty tables list in partition entry")
}
assertContains(t, err.Error(), "tables")
}

func TestValidateConfig_PartitionedDB_MultiMode_EntryNotObject(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitions": []any{"not-an-object"},
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err == nil {
t.Fatal("expected error for non-object partition entry")
}
assertContains(t, err.Error(), "must be an object")
}

func TestValidateConfig_PartitionedDB_EmptyPartitionsArray_FallsBackToSingleMode(t *testing.T) {
// An empty partitions array should fall through to single-partition validation.
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: map[string]any{
"driver": "pgx",
"dsn": "postgres://localhost/test",
"partitions": []any{}, // empty → single-partition mode applies
}},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
if err == nil {
t.Fatal("expected error for missing partitionKey/tables when partitions is empty")
}
assertContains(t, err.Error(), "partitionKey")
}

func TestValidateConfig_PartitionedDB_NilConfig(t *testing.T) {
cfg := &config.WorkflowConfig{
Modules: []config.ModuleConfig{
{Name: "db", Type: "database.partitioned", Config: nil},
},
}
err := ValidateConfig(cfg, WithAllowNoEntryPoints())
// nil config → driver+dsn required errors from schema, no panic from type-specific check
if err == nil {
t.Fatal("expected errors for nil config")
}
// Should report driver/dsn as required
assertContains(t, err.Error(), "driver")
}
Loading
Loading