Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 40 additions & 49 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,41 +603,39 @@ func (app *serverApp) initStores(logger *slog.Logger) error {
// Timeline, replay, backfill handlers
// -----------------------------------------------------------------------

// Try to discover timeline/replay/backfill from the service registry
// Discover timeline/replay/backfill mux services registered by ProvidesServices
// (registered by a timeline.service module). Fall back to direct creation.
timelineDiscovered := false
var (
discoveredTimelineMux http.Handler
discoveredReplayMux http.Handler
discoveredBackfillMux http.Handler
)
for svcName, svc := range engine.GetApp().SvcRegistry() {
if tsMod, ok := svc.(*module.TimelineServiceModule); ok {
app.services.timelineMux = tsMod.TimelineMux()
app.services.replayMux = tsMod.ReplayMux()
app.services.backfillMux = tsMod.BackfillMux()
timelineDiscovered = true
logger.Info("Discovered timeline service from service registry", "service", svcName)
break
}
}
// Also check for individual mux services registered by ProvidesServices
if !timelineDiscovered {
for svcName, svc := range engine.GetApp().SvcRegistry() {
if strings.HasSuffix(svcName, ".timeline") {
if h, ok := svc.(http.Handler); ok {
app.services.timelineMux = h
timelineDiscovered = true
logger.Info("Discovered timeline mux from service registry", "service", svcName)
}
if strings.HasSuffix(svcName, ".timeline") {
if h, ok := svc.(http.Handler); ok {
discoveredTimelineMux = h
logger.Info("Discovered timeline mux from service registry", "service", svcName)
}
if strings.HasSuffix(svcName, ".replay") {
if h, ok := svc.(http.Handler); ok {
app.services.replayMux = h
}
}
if strings.HasSuffix(svcName, ".replay") {
if h, ok := svc.(http.Handler); ok {
discoveredReplayMux = h
}
if strings.HasSuffix(svcName, ".backfill") {
if h, ok := svc.(http.Handler); ok {
app.services.backfillMux = h
}
}
if strings.HasSuffix(svcName, ".backfill") {
if h, ok := svc.(http.Handler); ok {
discoveredBackfillMux = h
}
}
}
if discoveredTimelineMux != nil && discoveredReplayMux != nil && discoveredBackfillMux != nil {
app.services.timelineMux = discoveredTimelineMux
app.services.replayMux = discoveredReplayMux
app.services.backfillMux = discoveredBackfillMux
timelineDiscovered = true
logger.Info("Discovered timeline, replay, and backfill muxes from service registry")
}
if !timelineDiscovered {
if eventStore != nil {
timelineHandler := evstore.NewTimelineHandler(eventStore, logger)
Expand Down Expand Up @@ -672,36 +670,29 @@ func (app *serverApp) initStores(logger *slog.Logger) error {
// DLQ handler
// -----------------------------------------------------------------------

// Try to discover DLQ from the service registry (registered by a
// Discover DLQ mux and store from the service registry (registered by a
// dlq.service module). Fall back to direct creation.
dlqDiscovered := false
var dlqStore evstore.DLQStore
var discoveredDLQMux http.Handler
for svcName, svc := range engine.GetApp().SvcRegistry() {
if dlqMod, ok := svc.(*module.DLQServiceModule); ok {
app.services.dlqMux = dlqMod.DLQMux()
dlqStore = dlqMod.Store()
dlqDiscovered = true
logger.Info("Discovered DLQ service from service registry", "service", svcName)
break
}
}
// Also check for the DLQ store registered by ProvidesServices
if !dlqDiscovered {
for svcName, svc := range engine.GetApp().SvcRegistry() {
if strings.HasSuffix(svcName, ".store") {
if ds, ok := svc.(*evstore.InMemoryDLQStore); ok {
dlqStore = ds
}
if strings.HasSuffix(svcName, ".store") {
if ds, ok := svc.(*evstore.InMemoryDLQStore); ok {
dlqStore = ds
}
if strings.HasSuffix(svcName, ".admin") {
if h, ok := svc.(http.Handler); ok && strings.Contains(svcName, "dlq") {
app.services.dlqMux = h
dlqDiscovered = true
logger.Info("Discovered DLQ mux from service registry", "service", svcName)
}
}
if strings.HasSuffix(svcName, ".admin") {
if h, ok := svc.(http.Handler); ok && strings.Contains(svcName, "dlq") {
discoveredDLQMux = h
logger.Info("Discovered DLQ mux from service registry", "service", svcName)
}
}
}
if discoveredDLQMux != nil && dlqStore != nil {
app.services.dlqMux = discoveredDLQMux
dlqDiscovered = true
logger.Info("Discovered DLQ service from service registry")
}
if !dlqDiscovered {
inMemDLQStore := evstore.NewInMemoryDLQStore()
dlqStore = inMemDLQStore
Expand Down
6 changes: 5 additions & 1 deletion module/dlq_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ import (

// DLQServiceConfig holds the configuration for the DLQ service module.
type DLQServiceConfig struct {
MaxRetries int `yaml:"max_retries" default:"3"`
// MaxRetries is reserved for future implementation of per-entry retry limits.
// It is stored and exposed via MaxRetries() but not yet applied to the DLQ store.
MaxRetries int `yaml:"max_retries" default:"3"`
// RetentionDays is reserved for future implementation of automatic DLQ entry purging.
// It is stored and exposed via RetentionDays() but not yet applied to the DLQ store.
RetentionDays int `yaml:"retention_days" default:"30"`
}

Expand Down
6 changes: 4 additions & 2 deletions module/eventstore_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (

// EventStoreServiceConfig holds the configuration for the event store service module.
type EventStoreServiceConfig struct {
DBPath string `yaml:"db_path" default:"data/events.db"`
RetentionDays int `yaml:"retention_days" default:"90"`
DBPath string `yaml:"db_path" default:"data/events.db"`
// RetentionDays is reserved for future implementation of automatic event pruning.
// It is stored and exposed via RetentionDays() but not yet applied to the store.
RetentionDays int `yaml:"retention_days" default:"90"`
}

// EventStoreServiceModule wraps an evstore.SQLiteEventStore as a modular.Module.
Expand Down
12 changes: 6 additions & 6 deletions module/eventstore_service_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package module

import (
"os"
"testing"

"github.com/CrisisTextLine/modular"
Expand Down Expand Up @@ -89,17 +90,16 @@ func TestEventStoreServiceModule_RetentionDays(t *testing.T) {
}

func TestEventStoreServiceModule_DefaultDBPath(t *testing.T) {
// Test that empty DBPath falls back to default
tmpDir := t.TempDir()
m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{
DBPath: tmpDir + "/events.db",
})
// Test that an empty DBPath falls back to the default "data/events.db" path.
m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{})
if err != nil {
t.Fatalf("NewEventStoreServiceModule() error = %v", err)
}
if m.Store() == nil {
t.Error("Store() returned nil with explicit path")
t.Error("Store() returned nil with default path")
}
// Clean up the created data directory (created as side-effect of default path).
_ = os.RemoveAll("data")
}

// Verify EventStoreServiceModule satisfies the modular.Module interface.
Expand Down
5 changes: 4 additions & 1 deletion module/timeline_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ type TimelineServiceModule struct {
}

// NewTimelineServiceModule creates a new timeline service module.
// It requires a non-nil EventStore to function.
// It requires a non-nil EventStore to function. Panics if eventStore is nil.
func NewTimelineServiceModule(name string, eventStore evstore.EventStore) *TimelineServiceModule {
if eventStore == nil {
panic("NewTimelineServiceModule: eventStore must not be nil")
}
logger := slog.Default()

timelineHandler := evstore.NewTimelineHandler(eventStore, logger)
Expand Down
4 changes: 4 additions & 0 deletions plugins/dlq/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory {
}
if v, ok := config["max_retries"].(int); ok {
cfg.MaxRetries = v
} else if v, ok := config["max_retries"].(float64); ok {
cfg.MaxRetries = int(v)
}
if v, ok := config["retention_days"].(int); ok {
cfg.RetentionDays = v
} else if v, ok := config["retention_days"].(float64); ok {
cfg.RetentionDays = int(v)
}
return module.NewDLQServiceModule(name, cfg)
},
Expand Down
2 changes: 2 additions & 0 deletions plugins/eventstore/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory {
}
if v, ok := config["retention_days"].(int); ok {
cfg.RetentionDays = v
} else if v, ok := config["retention_days"].(float64); ok {
cfg.RetentionDays = int(v)
}
mod, err := module.NewEventStoreServiceModule(name, cfg)
if err != nil {
Expand Down