Skip to content

Commit 00f9451

Browse files
Copilotintel352
andauthored
database.partitioned: automatic partition sync on engine start with lifecycle hooks (#298)
* Initial plan * database.partitioned: add autoSync and syncInterval lifecycle hooks Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> * database.partitioned: fix resource leaks, nil-DB goroutine guard, flaky test Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> Co-authored-by: Jonathan Langevin <codingsloth@pm.me>
1 parent abf649a commit 00f9451

3 files changed

Lines changed: 342 additions & 3 deletions

File tree

module/database_partitioned.go

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"regexp"
88
"strings"
99
"sync"
10+
"time"
1011

1112
"github.com/GoCodeAlone/modular"
1213
)
@@ -120,6 +121,17 @@ type PartitionedDatabaseConfig struct {
120121
// Defaults to PartitionKey if empty.
121122
SourceColumn string `json:"sourceColumn" yaml:"sourceColumn"`
122123

124+
// ── Lifecycle sync settings ───────────────────────────────────────────────
125+
// AutoSync controls whether SyncPartitionsFromSource is called automatically
126+
// during Start(). Defaults to true when any sourceTable is configured.
127+
// Set to false to disable automatic sync on startup.
128+
AutoSync *bool `json:"autoSync" yaml:"autoSync"`
129+
// SyncInterval is a duration string (e.g. "60s", "5m") for periodic
130+
// re-sync of partitions from the source table. When set, a background
131+
// goroutine calls SyncPartitionsFromSource at this interval after Start().
132+
// Requires at least one sourceTable to be configured. Example: "60s".
133+
SyncInterval string `json:"syncInterval" yaml:"syncInterval"`
134+
123135
// ── Multi-partition mode ─────────────────────────────────────────────────
124136
// Partitions lists independent partition key configurations. When non-empty,
125137
// the single-partition fields above are ignored.
@@ -135,6 +147,11 @@ type PartitionedDatabase struct {
135147
partitions []PartitionConfig // normalized; always len >= 1 after construction
136148
base *WorkflowDatabase
137149
mu sync.RWMutex
150+
logger modular.Logger
151+
152+
// periodic sync state
153+
syncStop chan struct{}
154+
syncWg sync.WaitGroup
138155
}
139156

140157
// normalizePartitionConfig applies defaults to a PartitionConfig and returns the result.
@@ -190,6 +207,7 @@ func (p *PartitionedDatabase) Name() string { return p.name }
190207

191208
// Init registers this module as a service.
192209
func (p *PartitionedDatabase) Init(app modular.Application) error {
210+
p.logger = app.Logger()
193211
return app.RegisterService(p.name, p)
194212
}
195213

@@ -209,13 +227,93 @@ func (p *PartitionedDatabase) RequiresServices() []modular.ServiceDependency {
209227
return nil
210228
}
211229

212-
// Start opens the database connection during application startup.
230+
// Start opens the database connection during application startup. When autoSync
231+
// is enabled (the default when any sourceTable is configured), it calls
232+
// SyncPartitionsFromSource to create partitions for all existing tenant values.
233+
// When syncInterval is configured, a background goroutine periodically re-syncs
234+
// partitions at that interval.
213235
func (p *PartitionedDatabase) Start(ctx context.Context) error {
214-
return p.base.Start(ctx)
236+
if err := p.base.Start(ctx); err != nil {
237+
return err
238+
}
239+
240+
// Determine whether any partition config has a sourceTable.
241+
hasSourceTable := false
242+
for _, cfg := range p.partitions {
243+
if cfg.SourceTable != "" {
244+
hasSourceTable = true
245+
break
246+
}
247+
}
248+
249+
// Auto-sync on startup: default true when sourceTable is configured.
250+
autoSync := hasSourceTable
251+
if p.config.AutoSync != nil {
252+
autoSync = *p.config.AutoSync
253+
}
254+
255+
if autoSync && hasSourceTable {
256+
if err := p.SyncPartitionsFromSource(ctx); err != nil {
257+
// DB was opened; close it to avoid leaking the connection on startup failure.
258+
_ = p.base.Stop(ctx)
259+
return fmt.Errorf("partitioned database %q: auto-sync on startup failed: %w", p.name, err)
260+
}
261+
}
262+
263+
// Start periodic sync goroutine if syncInterval is configured.
264+
if p.config.SyncInterval != "" && hasSourceTable {
265+
interval, err := time.ParseDuration(p.config.SyncInterval)
266+
if err != nil {
267+
// DB was opened; close it to avoid leaking the connection on startup failure.
268+
_ = p.base.Stop(ctx)
269+
return fmt.Errorf("partitioned database %q: invalid syncInterval %q: %w", p.name, p.config.SyncInterval, err)
270+
}
271+
if interval > 0 {
272+
if p.base.DB() == nil {
273+
// No database connection is available; starting the goroutine would
274+
// produce repeated error logs with no useful work.
275+
_ = p.base.Stop(ctx)
276+
return fmt.Errorf("partitioned database %q: syncInterval requires an open database connection (is DSN configured?)", p.name)
277+
}
278+
p.syncStop = make(chan struct{})
279+
p.syncWg.Add(1)
280+
go p.runPeriodicSync(ctx, interval)
281+
}
282+
}
283+
284+
return nil
285+
}
286+
287+
// runPeriodicSync runs SyncPartitionsFromSource on a ticker until stopSync is
288+
// closed or the parent context is cancelled.
289+
func (p *PartitionedDatabase) runPeriodicSync(ctx context.Context, interval time.Duration) {
290+
defer p.syncWg.Done()
291+
ticker := time.NewTicker(interval)
292+
defer ticker.Stop()
293+
for {
294+
select {
295+
case <-p.syncStop:
296+
return
297+
case <-ctx.Done():
298+
return
299+
case <-ticker.C:
300+
if err := p.SyncPartitionsFromSource(ctx); err != nil {
301+
if p.logger != nil {
302+
p.logger.Error("partitioned database periodic sync failed",
303+
"module", p.name, "error", err)
304+
}
305+
}
306+
}
307+
}
215308
}
216309

217310
// Stop closes the database connection during application shutdown.
218311
func (p *PartitionedDatabase) Stop(ctx context.Context) error {
312+
if p.syncStop != nil {
313+
close(p.syncStop)
314+
p.syncWg.Wait()
315+
p.syncStop = nil
316+
}
219317
return p.base.Stop(ctx)
220318
}
221319

module/database_partitioned_test.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"strings"
88
"testing"
9+
"time"
910
)
1011

1112
func TestPartitionedDatabase_PartitionKey(t *testing.T) {
@@ -838,3 +839,235 @@ func TestDBSyncPartitionsStep_NotPartitionManager(t *testing.T) {
838839
t.Fatal("expected error when service does not implement PartitionManager")
839840
}
840841
}
842+
843+
// ─── Auto-sync and periodic sync tests ───────────────────────────────────────
844+
845+
// boolPtr is a test helper that returns a pointer to a bool value.
846+
func boolPtr(v bool) *bool { return &v }
847+
848+
func TestPartitionedDatabase_Start_NoSourceTable_NoSync(t *testing.T) {
849+
// When no sourceTable is configured, Start should succeed without attempting sync.
850+
cfg := PartitionedDatabaseConfig{
851+
Driver: "pgx",
852+
PartitionKey: "tenant_id",
853+
Tables: []string{"forms"},
854+
// No DSN: base.Start is a no-op; no sourceTable: no sync attempted.
855+
}
856+
pd := NewPartitionedDatabase("db", cfg)
857+
858+
app := NewMockApplication()
859+
if err := pd.Init(app); err != nil {
860+
t.Fatalf("Init error: %v", err)
861+
}
862+
863+
if err := pd.Start(context.Background()); err != nil {
864+
t.Fatalf("unexpected Start error: %v", err)
865+
}
866+
_ = pd.Stop(context.Background())
867+
}
868+
869+
func TestPartitionedDatabase_Start_AutoSyncDisabled_NoSync(t *testing.T) {
870+
// When autoSync is explicitly false, Start should not call SyncPartitionsFromSource
871+
// even when sourceTable is configured.
872+
cfg := PartitionedDatabaseConfig{
873+
Driver: "pgx",
874+
PartitionKey: "tenant_id",
875+
Tables: []string{"forms"},
876+
SourceTable: "tenants",
877+
AutoSync: boolPtr(false),
878+
// No DSN: base.Start is a no-op; sourceTable set but autoSync=false.
879+
}
880+
pd := NewPartitionedDatabase("db", cfg)
881+
882+
app := NewMockApplication()
883+
if err := pd.Init(app); err != nil {
884+
t.Fatalf("Init error: %v", err)
885+
}
886+
887+
if err := pd.Start(context.Background()); err != nil {
888+
t.Fatalf("unexpected Start error: %v", err)
889+
}
890+
_ = pd.Stop(context.Background())
891+
}
892+
893+
func TestPartitionedDatabase_Start_AutoSyncEnabled_NilDB(t *testing.T) {
894+
// When autoSync defaults to true and sourceTable is configured, Start must
895+
// attempt SyncPartitionsFromSource. With no DB connection the sync returns
896+
// "database connection is nil", which Start wraps and returns.
897+
cfg := PartitionedDatabaseConfig{
898+
Driver: "pgx",
899+
PartitionKey: "tenant_id",
900+
Tables: []string{"forms"},
901+
SourceTable: "tenants",
902+
// No DSN: base.Start is a no-op so DB stays nil.
903+
// AutoSync not set: defaults to true when sourceTable is present.
904+
}
905+
pd := NewPartitionedDatabase("db", cfg)
906+
907+
app := NewMockApplication()
908+
if err := pd.Init(app); err != nil {
909+
t.Fatalf("Init error: %v", err)
910+
}
911+
912+
err := pd.Start(context.Background())
913+
if err == nil {
914+
t.Fatal("expected Start to return an error when DB connection is nil")
915+
}
916+
if !strings.Contains(err.Error(), "auto-sync on startup failed") {
917+
t.Errorf("expected auto-sync error message, got: %v", err)
918+
}
919+
}
920+
921+
func TestPartitionedDatabase_Start_InvalidSyncInterval(t *testing.T) {
922+
// An invalid syncInterval string must cause Start to return a parse error.
923+
cfg := PartitionedDatabaseConfig{
924+
Driver: "pgx",
925+
PartitionKey: "tenant_id",
926+
Tables: []string{"forms"},
927+
SourceTable: "tenants",
928+
AutoSync: boolPtr(false), // skip startup sync so we reach interval parsing
929+
SyncInterval: "not-a-duration",
930+
}
931+
pd := NewPartitionedDatabase("db", cfg)
932+
933+
app := NewMockApplication()
934+
if err := pd.Init(app); err != nil {
935+
t.Fatalf("Init error: %v", err)
936+
}
937+
938+
err := pd.Start(context.Background())
939+
if err == nil {
940+
t.Fatal("expected Start to return an error for invalid syncInterval")
941+
}
942+
if !strings.Contains(err.Error(), "invalid syncInterval") {
943+
t.Errorf("expected syncInterval parse error, got: %v", err)
944+
}
945+
}
946+
947+
func TestPartitionedDatabase_SyncInterval_NoSourceTable_NoGoroutine(t *testing.T) {
948+
// When syncInterval is set but no sourceTable is configured, no background
949+
// goroutine is started (hasSourceTable=false gates the goroutine launch).
950+
cfg := PartitionedDatabaseConfig{
951+
Driver: "pgx",
952+
PartitionKey: "tenant_id",
953+
Tables: []string{"forms"},
954+
SyncInterval: "100ms",
955+
// No sourceTable: no goroutine should be started.
956+
}
957+
pd := NewPartitionedDatabase("db", cfg)
958+
959+
app := NewMockApplication()
960+
if err := pd.Init(app); err != nil {
961+
t.Fatalf("Init error: %v", err)
962+
}
963+
964+
if err := pd.Start(context.Background()); err != nil {
965+
t.Fatalf("unexpected Start error: %v", err)
966+
}
967+
968+
if pd.syncStop != nil {
969+
t.Error("expected syncStop channel to be nil when no sourceTable is configured")
970+
}
971+
972+
if err := pd.Stop(context.Background()); err != nil {
973+
t.Fatalf("unexpected Stop error: %v", err)
974+
}
975+
}
976+
977+
func TestPartitionedDatabase_PeriodicSync_GoroutineLifecycle(t *testing.T) {
978+
// When sourceTable is configured, autoSync is false, and syncInterval is set,
979+
// a background goroutine must be launched. Stop must cleanly terminate it.
980+
// Use sqlite so the DB connection is real (nil-DB guard requires an open connection).
981+
cfg := PartitionedDatabaseConfig{
982+
Driver: "sqlite",
983+
DSN: ":memory:",
984+
PartitionKey: "tenant_id",
985+
Tables: []string{"forms"},
986+
SourceTable: "tenants",
987+
AutoSync: boolPtr(false), // skip startup sync
988+
SyncInterval: "100ms",
989+
}
990+
pd := NewPartitionedDatabase("db", cfg)
991+
992+
app := NewMockApplication()
993+
if err := pd.Init(app); err != nil {
994+
t.Fatalf("Init error: %v", err)
995+
}
996+
997+
if err := pd.Start(context.Background()); err != nil {
998+
t.Fatalf("unexpected Start error: %v", err)
999+
}
1000+
1001+
if pd.syncStop == nil {
1002+
t.Fatal("expected syncStop channel to be set after Start with syncInterval")
1003+
}
1004+
1005+
// Ensure Stop cleanly terminates the background goroutine without panic or deadlock.
1006+
done := make(chan error, 1)
1007+
go func() { done <- pd.Stop(context.Background()) }()
1008+
1009+
select {
1010+
case err := <-done:
1011+
if err != nil {
1012+
t.Errorf("unexpected Stop error: %v", err)
1013+
}
1014+
case <-time.After(2 * time.Second):
1015+
t.Fatal("Stop did not return within 2 seconds")
1016+
}
1017+
}
1018+
1019+
func TestPartitionedDatabase_AutoSync_DefaultTrueWhenSourceTableSet(t *testing.T) {
1020+
// Confirm that AutoSync==nil is treated as "true" when sourceTable is
1021+
// configured: Start must attempt sync (and fail with nil DB error).
1022+
cfg := PartitionedDatabaseConfig{
1023+
Driver: "pgx",
1024+
SourceTable: "tenants",
1025+
// AutoSync is nil: should behave as true when sourceTable is present.
1026+
}
1027+
if cfg.AutoSync != nil {
1028+
t.Fatal("AutoSync must be nil for this test to be meaningful")
1029+
}
1030+
1031+
pd := NewPartitionedDatabase("db", cfg)
1032+
app := NewMockApplication()
1033+
if err := pd.Init(app); err != nil {
1034+
t.Fatalf("Init error: %v", err)
1035+
}
1036+
1037+
err := pd.Start(context.Background())
1038+
if err == nil {
1039+
t.Fatal("expected Start to fail when autoSync defaults to true and DB is nil")
1040+
}
1041+
if !strings.Contains(err.Error(), "auto-sync on startup failed") {
1042+
t.Errorf("expected auto-sync startup error, got: %v", err)
1043+
}
1044+
}
1045+
1046+
func TestPartitionedDatabase_SyncInterval_NilDB_ReturnsError(t *testing.T) {
1047+
// When syncInterval is configured and sourceTable is set, but no DSN is
1048+
// provided (DB is nil), Start must return a clear error instead of starting
1049+
// a goroutine that would repeatedly fail and produce log noise.
1050+
cfg := PartitionedDatabaseConfig{
1051+
Driver: "pgx",
1052+
PartitionKey: "tenant_id",
1053+
Tables: []string{"forms"},
1054+
SourceTable: "tenants",
1055+
AutoSync: boolPtr(false), // skip startup sync to isolate interval check
1056+
SyncInterval: "100ms",
1057+
// No DSN: base.Start is a no-op → DB remains nil.
1058+
}
1059+
pd := NewPartitionedDatabase("db", cfg)
1060+
1061+
app := NewMockApplication()
1062+
if err := pd.Init(app); err != nil {
1063+
t.Fatalf("Init error: %v", err)
1064+
}
1065+
1066+
err := pd.Start(context.Background())
1067+
if err == nil {
1068+
t.Fatal("expected Start to return an error when syncInterval is set but DB is nil")
1069+
}
1070+
if !strings.Contains(err.Error(), "syncInterval requires an open database connection") {
1071+
t.Errorf("expected nil-DB syncInterval error, got: %v", err)
1072+
}
1073+
}

0 commit comments

Comments
 (0)