Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 100 additions & 2 deletions module/database_partitioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"regexp"
"strings"
"sync"
"time"

"github.com/GoCodeAlone/modular"
)
Expand Down Expand Up @@ -120,6 +121,17 @@ type PartitionedDatabaseConfig struct {
// Defaults to PartitionKey if empty.
SourceColumn string `json:"sourceColumn" yaml:"sourceColumn"`

// ── Lifecycle sync settings ───────────────────────────────────────────────
// AutoSync controls whether SyncPartitionsFromSource is called automatically
// during Start(). Defaults to true when any sourceTable is configured.
// Set to false to disable automatic sync on startup.
AutoSync *bool `json:"autoSync" yaml:"autoSync"`
// SyncInterval is a duration string (e.g. "60s", "5m") for periodic
// re-sync of partitions from the source table. When set, a background
// goroutine calls SyncPartitionsFromSource at this interval after Start().
// Requires at least one sourceTable to be configured. Example: "60s".
SyncInterval string `json:"syncInterval" yaml:"syncInterval"`

// ── Multi-partition mode ─────────────────────────────────────────────────
// Partitions lists independent partition key configurations. When non-empty,
// the single-partition fields above are ignored.
Expand All @@ -135,6 +147,11 @@ type PartitionedDatabase struct {
partitions []PartitionConfig // normalized; always len >= 1 after construction
base *WorkflowDatabase
mu sync.RWMutex
logger modular.Logger

// periodic sync state
syncStop chan struct{}
syncWg sync.WaitGroup
}

// normalizePartitionConfig applies defaults to a PartitionConfig and returns the result.
Expand Down Expand Up @@ -190,6 +207,7 @@ func (p *PartitionedDatabase) Name() string { return p.name }

// Init registers this module as a service.
func (p *PartitionedDatabase) Init(app modular.Application) error {
p.logger = app.Logger()
return app.RegisterService(p.name, p)
}

Expand All @@ -209,13 +227,93 @@ func (p *PartitionedDatabase) RequiresServices() []modular.ServiceDependency {
return nil
}

// Start opens the database connection during application startup.
// Start opens the database connection during application startup. When autoSync
// is enabled (the default when any sourceTable is configured), it calls
// SyncPartitionsFromSource to create partitions for all existing tenant values.
// When syncInterval is configured, a background goroutine periodically re-syncs
// partitions at that interval.
func (p *PartitionedDatabase) Start(ctx context.Context) error {
return p.base.Start(ctx)
if err := p.base.Start(ctx); err != nil {
return err
}

// Determine whether any partition config has a sourceTable.
hasSourceTable := false
for _, cfg := range p.partitions {
if cfg.SourceTable != "" {
hasSourceTable = true
break
}
}

// Auto-sync on startup: default true when sourceTable is configured.
autoSync := hasSourceTable
if p.config.AutoSync != nil {
autoSync = *p.config.AutoSync
}

if autoSync && hasSourceTable {
if err := p.SyncPartitionsFromSource(ctx); err != nil {
// DB was opened; close it to avoid leaking the connection on startup failure.
_ = p.base.Stop(ctx)
return fmt.Errorf("partitioned database %q: auto-sync on startup failed: %w", p.name, err)
}
}

// Start periodic sync goroutine if syncInterval is configured.
if p.config.SyncInterval != "" && hasSourceTable {
interval, err := time.ParseDuration(p.config.SyncInterval)
if err != nil {
// DB was opened; close it to avoid leaking the connection on startup failure.
_ = p.base.Stop(ctx)
return fmt.Errorf("partitioned database %q: invalid syncInterval %q: %w", p.name, p.config.SyncInterval, err)
}
if interval > 0 {
if p.base.DB() == nil {
// No database connection is available; starting the goroutine would
// produce repeated error logs with no useful work.
_ = p.base.Stop(ctx)
return fmt.Errorf("partitioned database %q: syncInterval requires an open database connection (is DSN configured?)", p.name)
}
p.syncStop = make(chan struct{})
p.syncWg.Add(1)
go p.runPeriodicSync(ctx, interval)
}
}

return nil
}

// runPeriodicSync runs SyncPartitionsFromSource on a ticker until stopSync is
// closed or the parent context is cancelled.
func (p *PartitionedDatabase) runPeriodicSync(ctx context.Context, interval time.Duration) {
defer p.syncWg.Done()
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-p.syncStop:
return
case <-ctx.Done():
return
case <-ticker.C:
if err := p.SyncPartitionsFromSource(ctx); err != nil {
if p.logger != nil {
p.logger.Error("partitioned database periodic sync failed",
"module", p.name, "error", err)
}
}
}
}
}

// Stop closes the database connection during application shutdown.
func (p *PartitionedDatabase) Stop(ctx context.Context) error {
if p.syncStop != nil {
close(p.syncStop)
p.syncWg.Wait()
p.syncStop = nil
}
return p.base.Stop(ctx)
}

Expand Down
233 changes: 233 additions & 0 deletions module/database_partitioned_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"strings"
"testing"
"time"
)

func TestPartitionedDatabase_PartitionKey(t *testing.T) {
Expand Down Expand Up @@ -838,3 +839,235 @@ func TestDBSyncPartitionsStep_NotPartitionManager(t *testing.T) {
t.Fatal("expected error when service does not implement PartitionManager")
}
}

// ─── Auto-sync and periodic sync tests ───────────────────────────────────────

// boolPtr is a test helper that returns a pointer to a bool value.
func boolPtr(v bool) *bool { return &v }

func TestPartitionedDatabase_Start_NoSourceTable_NoSync(t *testing.T) {
// When no sourceTable is configured, Start should succeed without attempting sync.
cfg := PartitionedDatabaseConfig{
Driver: "pgx",
PartitionKey: "tenant_id",
Tables: []string{"forms"},
// No DSN: base.Start is a no-op; no sourceTable: no sync attempted.
}
pd := NewPartitionedDatabase("db", cfg)

app := NewMockApplication()
if err := pd.Init(app); err != nil {
t.Fatalf("Init error: %v", err)
}

if err := pd.Start(context.Background()); err != nil {
t.Fatalf("unexpected Start error: %v", err)
}
_ = pd.Stop(context.Background())
}

func TestPartitionedDatabase_Start_AutoSyncDisabled_NoSync(t *testing.T) {
// When autoSync is explicitly false, Start should not call SyncPartitionsFromSource
// even when sourceTable is configured.
cfg := PartitionedDatabaseConfig{
Driver: "pgx",
PartitionKey: "tenant_id",
Tables: []string{"forms"},
SourceTable: "tenants",
AutoSync: boolPtr(false),
// No DSN: base.Start is a no-op; sourceTable set but autoSync=false.
}
pd := NewPartitionedDatabase("db", cfg)

app := NewMockApplication()
if err := pd.Init(app); err != nil {
t.Fatalf("Init error: %v", err)
}

if err := pd.Start(context.Background()); err != nil {
t.Fatalf("unexpected Start error: %v", err)
}
_ = pd.Stop(context.Background())
}

func TestPartitionedDatabase_Start_AutoSyncEnabled_NilDB(t *testing.T) {
// When autoSync defaults to true and sourceTable is configured, Start must
// attempt SyncPartitionsFromSource. With no DB connection the sync returns
// "database connection is nil", which Start wraps and returns.
cfg := PartitionedDatabaseConfig{
Driver: "pgx",
PartitionKey: "tenant_id",
Tables: []string{"forms"},
SourceTable: "tenants",
// No DSN: base.Start is a no-op so DB stays nil.
// AutoSync not set: defaults to true when sourceTable is present.
}
pd := NewPartitionedDatabase("db", cfg)

app := NewMockApplication()
if err := pd.Init(app); err != nil {
t.Fatalf("Init error: %v", err)
}

err := pd.Start(context.Background())
if err == nil {
t.Fatal("expected Start to return an error when DB connection is nil")
}
if !strings.Contains(err.Error(), "auto-sync on startup failed") {
t.Errorf("expected auto-sync error message, got: %v", err)
}
}

func TestPartitionedDatabase_Start_InvalidSyncInterval(t *testing.T) {
// An invalid syncInterval string must cause Start to return a parse error.
cfg := PartitionedDatabaseConfig{
Driver: "pgx",
PartitionKey: "tenant_id",
Tables: []string{"forms"},
SourceTable: "tenants",
AutoSync: boolPtr(false), // skip startup sync so we reach interval parsing
SyncInterval: "not-a-duration",
}
pd := NewPartitionedDatabase("db", cfg)

app := NewMockApplication()
if err := pd.Init(app); err != nil {
t.Fatalf("Init error: %v", err)
}

err := pd.Start(context.Background())
if err == nil {
t.Fatal("expected Start to return an error for invalid syncInterval")
}
if !strings.Contains(err.Error(), "invalid syncInterval") {
t.Errorf("expected syncInterval parse error, got: %v", err)
}
}

func TestPartitionedDatabase_SyncInterval_NoSourceTable_NoGoroutine(t *testing.T) {
// When syncInterval is set but no sourceTable is configured, no background
// goroutine is started (hasSourceTable=false gates the goroutine launch).
cfg := PartitionedDatabaseConfig{
Driver: "pgx",
PartitionKey: "tenant_id",
Tables: []string{"forms"},
SyncInterval: "100ms",
// No sourceTable: no goroutine should be started.
}
pd := NewPartitionedDatabase("db", cfg)

app := NewMockApplication()
if err := pd.Init(app); err != nil {
t.Fatalf("Init error: %v", err)
}

if err := pd.Start(context.Background()); err != nil {
t.Fatalf("unexpected Start error: %v", err)
}

if pd.syncStop != nil {
t.Error("expected syncStop channel to be nil when no sourceTable is configured")
}

if err := pd.Stop(context.Background()); err != nil {
t.Fatalf("unexpected Stop error: %v", err)
}
}

func TestPartitionedDatabase_PeriodicSync_GoroutineLifecycle(t *testing.T) {
// When sourceTable is configured, autoSync is false, and syncInterval is set,
// a background goroutine must be launched. Stop must cleanly terminate it.
// Use sqlite so the DB connection is real (nil-DB guard requires an open connection).
cfg := PartitionedDatabaseConfig{
Driver: "sqlite",
DSN: ":memory:",
PartitionKey: "tenant_id",
Tables: []string{"forms"},
SourceTable: "tenants",
AutoSync: boolPtr(false), // skip startup sync
SyncInterval: "100ms",
}
pd := NewPartitionedDatabase("db", cfg)

app := NewMockApplication()
if err := pd.Init(app); err != nil {
t.Fatalf("Init error: %v", err)
}

if err := pd.Start(context.Background()); err != nil {
t.Fatalf("unexpected Start error: %v", err)
}

if pd.syncStop == nil {
t.Fatal("expected syncStop channel to be set after Start with syncInterval")
}

// Ensure Stop cleanly terminates the background goroutine without panic or deadlock.
done := make(chan error, 1)
go func() { done <- pd.Stop(context.Background()) }()

select {
case err := <-done:
if err != nil {
t.Errorf("unexpected Stop error: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("Stop did not return within 2 seconds")
}
}

func TestPartitionedDatabase_AutoSync_DefaultTrueWhenSourceTableSet(t *testing.T) {
// Confirm that AutoSync==nil is treated as "true" when sourceTable is
// configured: Start must attempt sync (and fail with nil DB error).
cfg := PartitionedDatabaseConfig{
Driver: "pgx",
SourceTable: "tenants",
// AutoSync is nil: should behave as true when sourceTable is present.
}
if cfg.AutoSync != nil {
t.Fatal("AutoSync must be nil for this test to be meaningful")
}

pd := NewPartitionedDatabase("db", cfg)
app := NewMockApplication()
if err := pd.Init(app); err != nil {
t.Fatalf("Init error: %v", err)
}

err := pd.Start(context.Background())
if err == nil {
t.Fatal("expected Start to fail when autoSync defaults to true and DB is nil")
}
if !strings.Contains(err.Error(), "auto-sync on startup failed") {
t.Errorf("expected auto-sync startup error, got: %v", err)
}
}

func TestPartitionedDatabase_SyncInterval_NilDB_ReturnsError(t *testing.T) {
// When syncInterval is configured and sourceTable is set, but no DSN is
// provided (DB is nil), Start must return a clear error instead of starting
// a goroutine that would repeatedly fail and produce log noise.
cfg := PartitionedDatabaseConfig{
Driver: "pgx",
PartitionKey: "tenant_id",
Tables: []string{"forms"},
SourceTable: "tenants",
AutoSync: boolPtr(false), // skip startup sync to isolate interval check
SyncInterval: "100ms",
// No DSN: base.Start is a no-op → DB remains nil.
}
pd := NewPartitionedDatabase("db", cfg)

app := NewMockApplication()
if err := pd.Init(app); err != nil {
t.Fatalf("Init error: %v", err)
}

err := pd.Start(context.Background())
if err == nil {
t.Fatal("expected Start to return an error when syncInterval is set but DB is nil")
}
if !strings.Contains(err.Error(), "syncInterval requires an open database connection") {
t.Errorf("expected nil-DB syncInterval error, got: %v", err)
}
}
Loading
Loading