From c5c2bb67ca7adbff478fc852e291ae99e4e502b3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 11 Mar 2026 22:52:38 +0000 Subject: [PATCH 1/3] Initial plan From 19870c5791cf36101a2565c44c7a75df8368d434 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 11 Mar 2026 23:08:28 +0000 Subject: [PATCH 2/3] database.partitioned: add autoSync and syncInterval lifecycle hooks Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/database_partitioned.go | 92 ++++++++++++- module/database_partitioned_test.go | 205 ++++++++++++++++++++++++++++ plugins/storage/plugin.go | 10 +- 3 files changed, 304 insertions(+), 3 deletions(-) diff --git a/module/database_partitioned.go b/module/database_partitioned.go index d2f1687a..e1922daa 100644 --- a/module/database_partitioned.go +++ b/module/database_partitioned.go @@ -7,6 +7,7 @@ import ( "regexp" "strings" "sync" + "time" "github.com/GoCodeAlone/modular" ) @@ -120,6 +121,17 @@ type PartitionedDatabaseConfig struct { // Defaults to PartitionKey if empty. SourceColumn string `json:"sourceColumn" yaml:"sourceColumn"` + // ── Lifecycle sync settings ─────────────────────────────────────────────── + // AutoSync controls whether SyncPartitionsFromSource is called automatically + // during Start(). Defaults to true when any sourceTable is configured. + // Set to false to disable automatic sync on startup. + AutoSync *bool `json:"autoSync" yaml:"autoSync"` + // SyncInterval is a duration string (e.g. "60s", "5m") for periodic + // re-sync of partitions from the source table. When set, a background + // goroutine calls SyncPartitionsFromSource at this interval after Start(). + // Requires at least one sourceTable to be configured. Example: "60s". + SyncInterval string `json:"syncInterval" yaml:"syncInterval"` + // ── Multi-partition mode ───────────────────────────────────────────────── // Partitions lists independent partition key configurations. When non-empty, // the single-partition fields above are ignored. @@ -135,6 +147,11 @@ type PartitionedDatabase struct { partitions []PartitionConfig // normalized; always len >= 1 after construction base *WorkflowDatabase mu sync.RWMutex + logger modular.Logger + + // periodic sync state + syncStop chan struct{} + syncWg sync.WaitGroup } // normalizePartitionConfig applies defaults to a PartitionConfig and returns the result. @@ -190,6 +207,7 @@ func (p *PartitionedDatabase) Name() string { return p.name } // Init registers this module as a service. func (p *PartitionedDatabase) Init(app modular.Application) error { + p.logger = app.Logger() return app.RegisterService(p.name, p) } @@ -209,13 +227,83 @@ func (p *PartitionedDatabase) RequiresServices() []modular.ServiceDependency { return nil } -// Start opens the database connection during application startup. +// Start opens the database connection during application startup. When autoSync +// is enabled (the default when any sourceTable is configured), it calls +// SyncPartitionsFromSource to create partitions for all existing tenant values. +// When syncInterval is configured, a background goroutine periodically re-syncs +// partitions at that interval. func (p *PartitionedDatabase) Start(ctx context.Context) error { - return p.base.Start(ctx) + if err := p.base.Start(ctx); err != nil { + return err + } + + // Determine whether any partition config has a sourceTable. + hasSourceTable := false + for _, cfg := range p.partitions { + if cfg.SourceTable != "" { + hasSourceTable = true + break + } + } + + // Auto-sync on startup: default true when sourceTable is configured. + autoSync := hasSourceTable + if p.config.AutoSync != nil { + autoSync = *p.config.AutoSync + } + + if autoSync && hasSourceTable { + if err := p.SyncPartitionsFromSource(ctx); err != nil { + return fmt.Errorf("partitioned database %q: auto-sync on startup failed: %w", p.name, err) + } + } + + // Start periodic sync goroutine if syncInterval is configured. + if p.config.SyncInterval != "" && hasSourceTable { + interval, err := time.ParseDuration(p.config.SyncInterval) + if err != nil { + return fmt.Errorf("partitioned database %q: invalid syncInterval %q: %w", p.name, p.config.SyncInterval, err) + } + if interval > 0 { + p.syncStop = make(chan struct{}) + p.syncWg.Add(1) + go p.runPeriodicSync(ctx, interval) + } + } + + return nil +} + +// runPeriodicSync runs SyncPartitionsFromSource on a ticker until stopSync is +// closed or the parent context is cancelled. +func (p *PartitionedDatabase) runPeriodicSync(ctx context.Context, interval time.Duration) { + defer p.syncWg.Done() + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-p.syncStop: + return + case <-ctx.Done(): + return + case <-ticker.C: + if err := p.SyncPartitionsFromSource(ctx); err != nil { + if p.logger != nil { + p.logger.Error("partitioned database periodic sync failed", + "module", p.name, "error", err) + } + } + } + } } // Stop closes the database connection during application shutdown. func (p *PartitionedDatabase) Stop(ctx context.Context) error { + if p.syncStop != nil { + close(p.syncStop) + p.syncWg.Wait() + p.syncStop = nil + } return p.base.Stop(ctx) } diff --git a/module/database_partitioned_test.go b/module/database_partitioned_test.go index db884d8e..cc968825 100644 --- a/module/database_partitioned_test.go +++ b/module/database_partitioned_test.go @@ -6,6 +6,7 @@ import ( "fmt" "strings" "testing" + "time" ) func TestPartitionedDatabase_PartitionKey(t *testing.T) { @@ -838,3 +839,207 @@ func TestDBSyncPartitionsStep_NotPartitionManager(t *testing.T) { t.Fatal("expected error when service does not implement PartitionManager") } } + +// ─── Auto-sync and periodic sync tests ─────────────────────────────────────── + +// boolPtr is a test helper that returns a pointer to a bool value. +func boolPtr(v bool) *bool { return &v } + +func TestPartitionedDatabase_Start_NoSourceTable_NoSync(t *testing.T) { + // When no sourceTable is configured, Start should succeed without attempting sync. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "tenant_id", + Tables: []string{"forms"}, + // No DSN: base.Start is a no-op; no sourceTable: no sync attempted. + } + pd := NewPartitionedDatabase("db", cfg) + + app := NewMockApplication() + if err := pd.Init(app); err != nil { + t.Fatalf("Init error: %v", err) + } + + if err := pd.Start(context.Background()); err != nil { + t.Fatalf("unexpected Start error: %v", err) + } + _ = pd.Stop(context.Background()) +} + +func TestPartitionedDatabase_Start_AutoSyncDisabled_NoSync(t *testing.T) { + // When autoSync is explicitly false, Start should not call SyncPartitionsFromSource + // even when sourceTable is configured. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "tenant_id", + Tables: []string{"forms"}, + SourceTable: "tenants", + AutoSync: boolPtr(false), + // No DSN: base.Start is a no-op; sourceTable set but autoSync=false. + } + pd := NewPartitionedDatabase("db", cfg) + + app := NewMockApplication() + if err := pd.Init(app); err != nil { + t.Fatalf("Init error: %v", err) + } + + if err := pd.Start(context.Background()); err != nil { + t.Fatalf("unexpected Start error: %v", err) + } + _ = pd.Stop(context.Background()) +} + +func TestPartitionedDatabase_Start_AutoSyncEnabled_NilDB(t *testing.T) { + // When autoSync defaults to true and sourceTable is configured, Start must + // attempt SyncPartitionsFromSource. With no DB connection the sync returns + // "database connection is nil", which Start wraps and returns. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "tenant_id", + Tables: []string{"forms"}, + SourceTable: "tenants", + // No DSN: base.Start is a no-op so DB stays nil. + // AutoSync not set: defaults to true when sourceTable is present. + } + pd := NewPartitionedDatabase("db", cfg) + + app := NewMockApplication() + if err := pd.Init(app); err != nil { + t.Fatalf("Init error: %v", err) + } + + err := pd.Start(context.Background()) + if err == nil { + t.Fatal("expected Start to return an error when DB connection is nil") + } + if !strings.Contains(err.Error(), "auto-sync on startup failed") { + t.Errorf("expected auto-sync error message, got: %v", err) + } +} + +func TestPartitionedDatabase_Start_InvalidSyncInterval(t *testing.T) { + // An invalid syncInterval string must cause Start to return a parse error. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "tenant_id", + Tables: []string{"forms"}, + SourceTable: "tenants", + AutoSync: boolPtr(false), // skip startup sync so we reach interval parsing + SyncInterval: "not-a-duration", + } + pd := NewPartitionedDatabase("db", cfg) + + app := NewMockApplication() + if err := pd.Init(app); err != nil { + t.Fatalf("Init error: %v", err) + } + + err := pd.Start(context.Background()) + if err == nil { + t.Fatal("expected Start to return an error for invalid syncInterval") + } + if !strings.Contains(err.Error(), "invalid syncInterval") { + t.Errorf("expected syncInterval parse error, got: %v", err) + } +} + +func TestPartitionedDatabase_SyncInterval_NoSourceTable_NoGoroutine(t *testing.T) { + // When syncInterval is set but no sourceTable is configured, no background + // goroutine is started (hasSourceTable=false gates the goroutine launch). + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "tenant_id", + Tables: []string{"forms"}, + SyncInterval: "100ms", + // No sourceTable: no goroutine should be started. + } + pd := NewPartitionedDatabase("db", cfg) + + app := NewMockApplication() + if err := pd.Init(app); err != nil { + t.Fatalf("Init error: %v", err) + } + + if err := pd.Start(context.Background()); err != nil { + t.Fatalf("unexpected Start error: %v", err) + } + + if pd.syncStop != nil { + t.Error("expected syncStop channel to be nil when no sourceTable is configured") + } + + if err := pd.Stop(context.Background()); err != nil { + t.Fatalf("unexpected Stop error: %v", err) + } +} + +func TestPartitionedDatabase_PeriodicSync_GoroutineLifecycle(t *testing.T) { + // When sourceTable is configured, autoSync is false, and syncInterval is set, + // a background goroutine must be launched. Stop must cleanly terminate it. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "tenant_id", + Tables: []string{"forms"}, + SourceTable: "tenants", + AutoSync: boolPtr(false), // skip startup sync + SyncInterval: "100ms", + } + pd := NewPartitionedDatabase("db", cfg) + + app := NewMockApplication() + if err := pd.Init(app); err != nil { + t.Fatalf("Init error: %v", err) + } + + if err := pd.Start(context.Background()); err != nil { + t.Fatalf("unexpected Start error: %v", err) + } + + if pd.syncStop == nil { + t.Fatal("expected syncStop channel to be set after Start with syncInterval") + } + + // Allow at least one tick; the goroutine will log nil-DB error but must + // not panic or deadlock. + time.Sleep(150 * time.Millisecond) + + done := make(chan error, 1) + go func() { done <- pd.Stop(context.Background()) }() + + select { + case err := <-done: + if err != nil { + t.Errorf("unexpected Stop error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Stop did not return within 2 seconds") + } +} + +func TestPartitionedDatabase_AutoSync_DefaultTrueWhenSourceTableSet(t *testing.T) { + // Confirm that AutoSync==nil is treated as "true" when sourceTable is + // configured: Start must attempt sync (and fail with nil DB error). + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + SourceTable: "tenants", + // AutoSync is nil: should behave as true when sourceTable is present. + } + if cfg.AutoSync != nil { + t.Fatal("AutoSync must be nil for this test to be meaningful") + } + + pd := NewPartitionedDatabase("db", cfg) + app := NewMockApplication() + if err := pd.Init(app); err != nil { + t.Fatalf("Init error: %v", err) + } + + err := pd.Start(context.Background()) + if err == nil { + t.Fatal("expected Start to fail when autoSync defaults to true and DB is nil") + } + if !strings.Contains(err.Error(), "auto-sync on startup failed") { + t.Errorf("expected auto-sync startup error, got: %v", err) + } +} diff --git a/plugins/storage/plugin.go b/plugins/storage/plugin.go index 5a40f448..350920ce 100644 --- a/plugins/storage/plugin.go +++ b/plugins/storage/plugin.go @@ -186,6 +186,12 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { if sc, ok := cfg["sourceColumn"].(string); ok { partCfg.SourceColumn = sc } + if autoSync, ok := cfg["autoSync"].(bool); ok { + partCfg.AutoSync = &autoSync + } + if syncInterval, ok := cfg["syncInterval"].(string); ok { + partCfg.SyncInterval = syncInterval + } if partitions, ok := cfg["partitions"].([]any); ok { for _, item := range partitions { pMap, ok := item.(map[string]any) @@ -399,10 +405,12 @@ func (p *Plugin) ModuleSchemas() []*schema.ModuleSchema { {Key: "partitionNameFormat", Label: "Partition Name Format", Type: schema.FieldTypeString, DefaultValue: "{table}_{tenant}", Description: "Template for partition table names. Supports {table} and {tenant} placeholders.", Placeholder: "{table}_{tenant}"}, {Key: "sourceTable", Label: "Source Table", Type: schema.FieldTypeString, Description: "Table containing all tenant IDs for auto-partition sync (e.g. tenants)", Placeholder: "tenants"}, {Key: "sourceColumn", Label: "Source Column", Type: schema.FieldTypeString, Description: "Column in source table to query for tenant values. Defaults to partitionKey.", Placeholder: "id"}, + {Key: "autoSync", Label: "Auto Sync", Type: schema.FieldTypeBool, DefaultValue: true, Description: "Automatically sync partitions from sourceTable on startup. Defaults to true when sourceTable is set."}, + {Key: "syncInterval", Label: "Sync Interval", Type: schema.FieldTypeDuration, Description: "Interval for periodic partition re-sync from sourceTable (e.g. 60s, 5m). Leave empty to disable.", Placeholder: "60s"}, {Key: "maxOpenConns", Label: "Max Open Connections", Type: schema.FieldTypeNumber, DefaultValue: 25, Description: "Maximum number of open database connections"}, {Key: "maxIdleConns", Label: "Max Idle Connections", Type: schema.FieldTypeNumber, DefaultValue: 5, Description: "Maximum number of idle connections in the pool"}, }, - DefaultConfig: map[string]any{"maxOpenConns": 25, "maxIdleConns": 5, "partitionType": "list", "partitionNameFormat": "{table}_{tenant}"}, + DefaultConfig: map[string]any{"maxOpenConns": 25, "maxIdleConns": 5, "partitionType": "list", "partitionNameFormat": "{table}_{tenant}", "autoSync": true}, }, { Type: "persistence.store", From df4dd18ef1d2fde932211f0bd8b042e208952c4d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 12 Mar 2026 00:17:19 +0000 Subject: [PATCH 3/3] database.partitioned: fix resource leaks, nil-DB goroutine guard, flaky test Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/database_partitioned.go | 10 ++++++++ module/database_partitioned_test.go | 38 +++++++++++++++++++++++++---- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/module/database_partitioned.go b/module/database_partitioned.go index e1922daa..1729e237 100644 --- a/module/database_partitioned.go +++ b/module/database_partitioned.go @@ -254,6 +254,8 @@ func (p *PartitionedDatabase) Start(ctx context.Context) error { if autoSync && hasSourceTable { if err := p.SyncPartitionsFromSource(ctx); err != nil { + // DB was opened; close it to avoid leaking the connection on startup failure. + _ = p.base.Stop(ctx) return fmt.Errorf("partitioned database %q: auto-sync on startup failed: %w", p.name, err) } } @@ -262,9 +264,17 @@ func (p *PartitionedDatabase) Start(ctx context.Context) error { if p.config.SyncInterval != "" && hasSourceTable { interval, err := time.ParseDuration(p.config.SyncInterval) if err != nil { + // DB was opened; close it to avoid leaking the connection on startup failure. + _ = p.base.Stop(ctx) return fmt.Errorf("partitioned database %q: invalid syncInterval %q: %w", p.name, p.config.SyncInterval, err) } if interval > 0 { + if p.base.DB() == nil { + // No database connection is available; starting the goroutine would + // produce repeated error logs with no useful work. + _ = p.base.Stop(ctx) + return fmt.Errorf("partitioned database %q: syncInterval requires an open database connection (is DSN configured?)", p.name) + } p.syncStop = make(chan struct{}) p.syncWg.Add(1) go p.runPeriodicSync(ctx, interval) diff --git a/module/database_partitioned_test.go b/module/database_partitioned_test.go index cc968825..87a6945a 100644 --- a/module/database_partitioned_test.go +++ b/module/database_partitioned_test.go @@ -977,8 +977,10 @@ func TestPartitionedDatabase_SyncInterval_NoSourceTable_NoGoroutine(t *testing.T func TestPartitionedDatabase_PeriodicSync_GoroutineLifecycle(t *testing.T) { // When sourceTable is configured, autoSync is false, and syncInterval is set, // a background goroutine must be launched. Stop must cleanly terminate it. + // Use sqlite so the DB connection is real (nil-DB guard requires an open connection). cfg := PartitionedDatabaseConfig{ - Driver: "pgx", + Driver: "sqlite", + DSN: ":memory:", PartitionKey: "tenant_id", Tables: []string{"forms"}, SourceTable: "tenants", @@ -1000,10 +1002,7 @@ func TestPartitionedDatabase_PeriodicSync_GoroutineLifecycle(t *testing.T) { t.Fatal("expected syncStop channel to be set after Start with syncInterval") } - // Allow at least one tick; the goroutine will log nil-DB error but must - // not panic or deadlock. - time.Sleep(150 * time.Millisecond) - + // Ensure Stop cleanly terminates the background goroutine without panic or deadlock. done := make(chan error, 1) go func() { done <- pd.Stop(context.Background()) }() @@ -1043,3 +1042,32 @@ func TestPartitionedDatabase_AutoSync_DefaultTrueWhenSourceTableSet(t *testing.T t.Errorf("expected auto-sync startup error, got: %v", err) } } + +func TestPartitionedDatabase_SyncInterval_NilDB_ReturnsError(t *testing.T) { + // When syncInterval is configured and sourceTable is set, but no DSN is + // provided (DB is nil), Start must return a clear error instead of starting + // a goroutine that would repeatedly fail and produce log noise. + cfg := PartitionedDatabaseConfig{ + Driver: "pgx", + PartitionKey: "tenant_id", + Tables: []string{"forms"}, + SourceTable: "tenants", + AutoSync: boolPtr(false), // skip startup sync to isolate interval check + SyncInterval: "100ms", + // No DSN: base.Start is a no-op → DB remains nil. + } + pd := NewPartitionedDatabase("db", cfg) + + app := NewMockApplication() + if err := pd.Init(app); err != nil { + t.Fatalf("Init error: %v", err) + } + + err := pd.Start(context.Background()) + if err == nil { + t.Fatal("expected Start to return an error when syncInterval is set but DB is nil") + } + if !strings.Contains(err.Error(), "syncInterval requires an open database connection") { + t.Errorf("expected nil-DB syncInterval error, got: %v", err) + } +}