diff --git a/cmd/server/main.go b/cmd/server/main.go index ee27695e..bd571b5c 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -603,41 +603,39 @@ func (app *serverApp) initStores(logger *slog.Logger) error { // Timeline, replay, backfill handlers // ----------------------------------------------------------------------- - // Try to discover timeline/replay/backfill from the service registry + // Discover timeline/replay/backfill mux services registered by ProvidesServices // (registered by a timeline.service module). Fall back to direct creation. timelineDiscovered := false + var ( + discoveredTimelineMux http.Handler + discoveredReplayMux http.Handler + discoveredBackfillMux http.Handler + ) for svcName, svc := range engine.GetApp().SvcRegistry() { - if tsMod, ok := svc.(*module.TimelineServiceModule); ok { - app.services.timelineMux = tsMod.TimelineMux() - app.services.replayMux = tsMod.ReplayMux() - app.services.backfillMux = tsMod.BackfillMux() - timelineDiscovered = true - logger.Info("Discovered timeline service from service registry", "service", svcName) - break - } - } - // Also check for individual mux services registered by ProvidesServices - if !timelineDiscovered { - for svcName, svc := range engine.GetApp().SvcRegistry() { - if strings.HasSuffix(svcName, ".timeline") { - if h, ok := svc.(http.Handler); ok { - app.services.timelineMux = h - timelineDiscovered = true - logger.Info("Discovered timeline mux from service registry", "service", svcName) - } + if strings.HasSuffix(svcName, ".timeline") { + if h, ok := svc.(http.Handler); ok { + discoveredTimelineMux = h + logger.Info("Discovered timeline mux from service registry", "service", svcName) } - if strings.HasSuffix(svcName, ".replay") { - if h, ok := svc.(http.Handler); ok { - app.services.replayMux = h - } + } + if strings.HasSuffix(svcName, ".replay") { + if h, ok := svc.(http.Handler); ok { + discoveredReplayMux = h } - if strings.HasSuffix(svcName, ".backfill") { - if h, ok := svc.(http.Handler); ok { - app.services.backfillMux = h - } + } + if strings.HasSuffix(svcName, ".backfill") { + if h, ok := svc.(http.Handler); ok { + discoveredBackfillMux = h } } } + if discoveredTimelineMux != nil && discoveredReplayMux != nil && discoveredBackfillMux != nil { + app.services.timelineMux = discoveredTimelineMux + app.services.replayMux = discoveredReplayMux + app.services.backfillMux = discoveredBackfillMux + timelineDiscovered = true + logger.Info("Discovered timeline, replay, and backfill muxes from service registry") + } if !timelineDiscovered { if eventStore != nil { timelineHandler := evstore.NewTimelineHandler(eventStore, logger) @@ -672,36 +670,29 @@ func (app *serverApp) initStores(logger *slog.Logger) error { // DLQ handler // ----------------------------------------------------------------------- - // Try to discover DLQ from the service registry (registered by a + // Discover DLQ mux and store from the service registry (registered by a // dlq.service module). Fall back to direct creation. dlqDiscovered := false var dlqStore evstore.DLQStore + var discoveredDLQMux http.Handler for svcName, svc := range engine.GetApp().SvcRegistry() { - if dlqMod, ok := svc.(*module.DLQServiceModule); ok { - app.services.dlqMux = dlqMod.DLQMux() - dlqStore = dlqMod.Store() - dlqDiscovered = true - logger.Info("Discovered DLQ service from service registry", "service", svcName) - break - } - } - // Also check for the DLQ store registered by ProvidesServices - if !dlqDiscovered { - for svcName, svc := range engine.GetApp().SvcRegistry() { - if strings.HasSuffix(svcName, ".store") { - if ds, ok := svc.(*evstore.InMemoryDLQStore); ok { - dlqStore = ds - } + if strings.HasSuffix(svcName, ".store") { + if ds, ok := svc.(*evstore.InMemoryDLQStore); ok { + dlqStore = ds } - if strings.HasSuffix(svcName, ".admin") { - if h, ok := svc.(http.Handler); ok && strings.Contains(svcName, "dlq") { - app.services.dlqMux = h - dlqDiscovered = true - logger.Info("Discovered DLQ mux from service registry", "service", svcName) - } + } + if strings.HasSuffix(svcName, ".admin") { + if h, ok := svc.(http.Handler); ok && strings.Contains(svcName, "dlq") { + discoveredDLQMux = h + logger.Info("Discovered DLQ mux from service registry", "service", svcName) } } } + if discoveredDLQMux != nil && dlqStore != nil { + app.services.dlqMux = discoveredDLQMux + dlqDiscovered = true + logger.Info("Discovered DLQ service from service registry") + } if !dlqDiscovered { inMemDLQStore := evstore.NewInMemoryDLQStore() dlqStore = inMemDLQStore diff --git a/module/dlq_service.go b/module/dlq_service.go index b437fac5..8717727b 100644 --- a/module/dlq_service.go +++ b/module/dlq_service.go @@ -10,7 +10,11 @@ import ( // DLQServiceConfig holds the configuration for the DLQ service module. type DLQServiceConfig struct { - MaxRetries int `yaml:"max_retries" default:"3"` + // MaxRetries is reserved for future implementation of per-entry retry limits. + // It is stored and exposed via MaxRetries() but not yet applied to the DLQ store. + MaxRetries int `yaml:"max_retries" default:"3"` + // RetentionDays is reserved for future implementation of automatic DLQ entry purging. + // It is stored and exposed via RetentionDays() but not yet applied to the DLQ store. RetentionDays int `yaml:"retention_days" default:"30"` } diff --git a/module/eventstore_service.go b/module/eventstore_service.go index 3d5e6c55..9f33a28b 100644 --- a/module/eventstore_service.go +++ b/module/eventstore_service.go @@ -12,8 +12,10 @@ import ( // EventStoreServiceConfig holds the configuration for the event store service module. type EventStoreServiceConfig struct { - DBPath string `yaml:"db_path" default:"data/events.db"` - RetentionDays int `yaml:"retention_days" default:"90"` + DBPath string `yaml:"db_path" default:"data/events.db"` + // RetentionDays is reserved for future implementation of automatic event pruning. + // It is stored and exposed via RetentionDays() but not yet applied to the store. + RetentionDays int `yaml:"retention_days" default:"90"` } // EventStoreServiceModule wraps an evstore.SQLiteEventStore as a modular.Module. diff --git a/module/eventstore_service_test.go b/module/eventstore_service_test.go index c2bb1504..714b1590 100644 --- a/module/eventstore_service_test.go +++ b/module/eventstore_service_test.go @@ -1,6 +1,7 @@ package module import ( + "os" "testing" "github.com/CrisisTextLine/modular" @@ -89,17 +90,16 @@ func TestEventStoreServiceModule_RetentionDays(t *testing.T) { } func TestEventStoreServiceModule_DefaultDBPath(t *testing.T) { - // Test that empty DBPath falls back to default - tmpDir := t.TempDir() - m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{ - DBPath: tmpDir + "/events.db", - }) + // Test that an empty DBPath falls back to the default "data/events.db" path. + m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{}) if err != nil { t.Fatalf("NewEventStoreServiceModule() error = %v", err) } if m.Store() == nil { - t.Error("Store() returned nil with explicit path") + t.Error("Store() returned nil with default path") } + // Clean up the created data directory (created as side-effect of default path). + _ = os.RemoveAll("data") } // Verify EventStoreServiceModule satisfies the modular.Module interface. diff --git a/module/timeline_service.go b/module/timeline_service.go index 5044c0ad..7656650a 100644 --- a/module/timeline_service.go +++ b/module/timeline_service.go @@ -23,8 +23,11 @@ type TimelineServiceModule struct { } // NewTimelineServiceModule creates a new timeline service module. -// It requires a non-nil EventStore to function. +// It requires a non-nil EventStore to function. Panics if eventStore is nil. func NewTimelineServiceModule(name string, eventStore evstore.EventStore) *TimelineServiceModule { + if eventStore == nil { + panic("NewTimelineServiceModule: eventStore must not be nil") + } logger := slog.Default() timelineHandler := evstore.NewTimelineHandler(eventStore, logger) diff --git a/plugins/dlq/plugin.go b/plugins/dlq/plugin.go index 55272a7d..b1f37525 100644 --- a/plugins/dlq/plugin.go +++ b/plugins/dlq/plugin.go @@ -44,9 +44,13 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { } if v, ok := config["max_retries"].(int); ok { cfg.MaxRetries = v + } else if v, ok := config["max_retries"].(float64); ok { + cfg.MaxRetries = int(v) } if v, ok := config["retention_days"].(int); ok { cfg.RetentionDays = v + } else if v, ok := config["retention_days"].(float64); ok { + cfg.RetentionDays = int(v) } return module.NewDLQServiceModule(name, cfg) }, diff --git a/plugins/eventstore/plugin.go b/plugins/eventstore/plugin.go index 0640f11c..a69700bf 100644 --- a/plugins/eventstore/plugin.go +++ b/plugins/eventstore/plugin.go @@ -47,6 +47,8 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { } if v, ok := config["retention_days"].(int); ok { cfg.RetentionDays = v + } else if v, ok := config["retention_days"].(float64); ok { + cfg.RetentionDays = int(v) } mod, err := module.NewEventStoreServiceModule(name, cfg) if err != nil {