diff --git a/admin/config.yaml b/admin/config.yaml index 8da1acd6..63c42b0a 100644 --- a/admin/config.yaml +++ b/admin/config.yaml @@ -136,6 +136,29 @@ modules: sse_enabled: true db_path: "data/featureflags.db" + # --- Event Store --- + # Provides the execution event store; auto-discovered by timeline and replay modules. + - name: admin-event-store + type: eventstore.service + config: + db_path: "data/events.db" + retention_days: 90 + + # --- Timeline / Replay --- + # Provides timeline, replay, and backfill/mock/diff HTTP handlers. + - name: admin-timeline + type: timeline.service + config: + event_store: "admin-event-store" + + # --- Dead Letter Queue --- + # Provides DLQ management HTTP endpoints. + - name: admin-dlq + type: dlq.service + config: + max_retries: 3 + retention_days: 30 + workflows: http-admin: router: admin-router diff --git a/cmd/server/main.go b/cmd/server/main.go index aaf59157..bd571b5c 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -41,6 +41,8 @@ import ( pluginapi "github.com/GoCodeAlone/workflow/plugins/api" pluginauth "github.com/GoCodeAlone/workflow/plugins/auth" plugincicd "github.com/GoCodeAlone/workflow/plugins/cicd" + plugindlq "github.com/GoCodeAlone/workflow/plugins/dlq" + pluginevstore "github.com/GoCodeAlone/workflow/plugins/eventstore" pluginff "github.com/GoCodeAlone/workflow/plugins/featureflags" pluginhttp "github.com/GoCodeAlone/workflow/plugins/http" pluginintegration "github.com/GoCodeAlone/workflow/plugins/integration" @@ -54,6 +56,7 @@ import ( pluginsecrets "github.com/GoCodeAlone/workflow/plugins/secrets" pluginsm "github.com/GoCodeAlone/workflow/plugins/statemachine" pluginstorage "github.com/GoCodeAlone/workflow/plugins/storage" + plugintimeline "github.com/GoCodeAlone/workflow/plugins/timeline" "github.com/GoCodeAlone/workflow/provider" _ "github.com/GoCodeAlone/workflow/provider/aws" _ "github.com/GoCodeAlone/workflow/provider/azure" @@ -109,6 +112,9 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std pluginpipeline.New(), plugincicd.New(), pluginff.New(), + pluginevstore.New(), + plugintimeline.New(), + plugindlq.New(), pluginsecrets.New(), pluginmodcompat.New(), pluginscheduler.New(), @@ -550,14 +556,29 @@ func (app *serverApp) initStores(logger *slog.Logger) error { // Event store, idempotency store // ----------------------------------------------------------------------- - // Create SQLite event store for execution events - eventsDBPath := filepath.Join(*dataDir, "events.db") - eventStore, err := evstore.NewSQLiteEventStore(eventsDBPath) - if err != nil { - logger.Warn("Failed to create event store — timeline/replay/diff features disabled", "error", err) - } else { + // Try to discover the event store from the service registry (registered + // by an eventstore.service module declared in config). Fall back to + // creating one directly if no module was configured. + var eventStore *evstore.SQLiteEventStore + for _, svc := range engine.GetApp().SvcRegistry() { + if es, ok := svc.(*evstore.SQLiteEventStore); ok { + eventStore = es + logger.Info("Discovered event store from service registry") + break + } + } + if eventStore == nil { + eventsDBPath := filepath.Join(*dataDir, "events.db") + var esErr error + eventStore, esErr = evstore.NewSQLiteEventStore(eventsDBPath) + if esErr != nil { + logger.Warn("Failed to create event store — timeline/replay/diff features disabled", "error", esErr) + } else { + logger.Info("Opened event store (fallback)", "path", eventsDBPath) + } + } + if eventStore != nil { app.stores.eventStore = eventStore - logger.Info("Opened event store", "path", eventsDBPath) } // Create SQLite idempotency store (separate DB connection, same data dir) @@ -582,47 +603,105 @@ func (app *serverApp) initStores(logger *slog.Logger) error { // Timeline, replay, backfill handlers // ----------------------------------------------------------------------- - if eventStore != nil { - // Timeline handler (execution list, timeline, events) - timelineHandler := evstore.NewTimelineHandler(eventStore, logger) - timelineMux := http.NewServeMux() - timelineHandler.RegisterRoutes(timelineMux) - app.services.timelineMux = timelineMux - - // Replay handler - replayHandler := evstore.NewReplayHandler(eventStore, logger) - replayMux := http.NewServeMux() - replayHandler.RegisterRoutes(replayMux) - app.services.replayMux = replayMux - - // Backfill / Mock / Diff handler - backfillStore := evstore.NewInMemoryBackfillStore() - mockStore := evstore.NewInMemoryStepMockStore() - diffCalc := evstore.NewDiffCalculator(eventStore) - bmdHandler := evstore.NewBackfillMockDiffHandler(backfillStore, mockStore, diffCalc, logger) - bmdMux := http.NewServeMux() - bmdHandler.RegisterRoutes(bmdMux) - app.services.backfillMux = bmdMux - - logger.Info("Created timeline, replay, and backfill/mock/diff handlers") - } else { - // Create stub handlers so delegate routes return 503 instead of 500 - stubMsg := "event store unavailable — timeline/replay/backfill features disabled" - app.services.timelineMux = featureDisabledHandler(stubMsg) - app.services.replayMux = featureDisabledHandler(stubMsg) - app.services.backfillMux = featureDisabledHandler(stubMsg) - logger.Info("Created stub handlers for timeline/replay/backfill (event store unavailable)") + // Discover timeline/replay/backfill mux services registered by ProvidesServices + // (registered by a timeline.service module). Fall back to direct creation. + timelineDiscovered := false + var ( + discoveredTimelineMux http.Handler + discoveredReplayMux http.Handler + discoveredBackfillMux http.Handler + ) + for svcName, svc := range engine.GetApp().SvcRegistry() { + if strings.HasSuffix(svcName, ".timeline") { + if h, ok := svc.(http.Handler); ok { + discoveredTimelineMux = h + logger.Info("Discovered timeline mux from service registry", "service", svcName) + } + } + if strings.HasSuffix(svcName, ".replay") { + if h, ok := svc.(http.Handler); ok { + discoveredReplayMux = h + } + } + if strings.HasSuffix(svcName, ".backfill") { + if h, ok := svc.(http.Handler); ok { + discoveredBackfillMux = h + } + } + } + if discoveredTimelineMux != nil && discoveredReplayMux != nil && discoveredBackfillMux != nil { + app.services.timelineMux = discoveredTimelineMux + app.services.replayMux = discoveredReplayMux + app.services.backfillMux = discoveredBackfillMux + timelineDiscovered = true + logger.Info("Discovered timeline, replay, and backfill muxes from service registry") + } + if !timelineDiscovered { + if eventStore != nil { + timelineHandler := evstore.NewTimelineHandler(eventStore, logger) + timelineMux := http.NewServeMux() + timelineHandler.RegisterRoutes(timelineMux) + app.services.timelineMux = timelineMux + + replayHandler := evstore.NewReplayHandler(eventStore, logger) + replayMux := http.NewServeMux() + replayHandler.RegisterRoutes(replayMux) + app.services.replayMux = replayMux + + backfillStore := evstore.NewInMemoryBackfillStore() + mockStore := evstore.NewInMemoryStepMockStore() + diffCalc := evstore.NewDiffCalculator(eventStore) + bmdHandler := evstore.NewBackfillMockDiffHandler(backfillStore, mockStore, diffCalc, logger) + bmdMux := http.NewServeMux() + bmdHandler.RegisterRoutes(bmdMux) + app.services.backfillMux = bmdMux + + logger.Info("Created timeline, replay, and backfill/mock/diff handlers (fallback)") + } else { + stubMsg := "event store unavailable — timeline/replay/backfill features disabled" + app.services.timelineMux = featureDisabledHandler(stubMsg) + app.services.replayMux = featureDisabledHandler(stubMsg) + app.services.backfillMux = featureDisabledHandler(stubMsg) + logger.Info("Created stub handlers for timeline/replay/backfill (event store unavailable)") + } } // ----------------------------------------------------------------------- // DLQ handler // ----------------------------------------------------------------------- - dlqStore := evstore.NewInMemoryDLQStore() - dlqHandler := evstore.NewDLQHandler(dlqStore, logger) - dlqMux := http.NewServeMux() - dlqHandler.RegisterRoutes(dlqMux) - app.services.dlqMux = dlqMux + // Discover DLQ mux and store from the service registry (registered by a + // dlq.service module). Fall back to direct creation. + dlqDiscovered := false + var dlqStore evstore.DLQStore + var discoveredDLQMux http.Handler + for svcName, svc := range engine.GetApp().SvcRegistry() { + if strings.HasSuffix(svcName, ".store") { + if ds, ok := svc.(*evstore.InMemoryDLQStore); ok { + dlqStore = ds + } + } + if strings.HasSuffix(svcName, ".admin") { + if h, ok := svc.(http.Handler); ok && strings.Contains(svcName, "dlq") { + discoveredDLQMux = h + logger.Info("Discovered DLQ mux from service registry", "service", svcName) + } + } + } + if discoveredDLQMux != nil && dlqStore != nil { + app.services.dlqMux = discoveredDLQMux + dlqDiscovered = true + logger.Info("Discovered DLQ service from service registry") + } + if !dlqDiscovered { + inMemDLQStore := evstore.NewInMemoryDLQStore() + dlqStore = inMemDLQStore + dlqHandler := evstore.NewDLQHandler(inMemDLQStore, logger) + dlqMux := http.NewServeMux() + dlqHandler.RegisterRoutes(dlqMux) + app.services.dlqMux = dlqMux + logger.Info("Created DLQ handler (fallback)") + } // ----------------------------------------------------------------------- // Billing handler diff --git a/module/command_handler_test.go b/module/command_handler_test.go index 7819aad4..14605036 100644 --- a/module/command_handler_test.go +++ b/module/command_handler_test.go @@ -294,4 +294,3 @@ func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/module/dlq_service.go b/module/dlq_service.go new file mode 100644 index 00000000..8717727b --- /dev/null +++ b/module/dlq_service.go @@ -0,0 +1,95 @@ +package module + +import ( + "log/slog" + "net/http" + + "github.com/CrisisTextLine/modular" + evstore "github.com/GoCodeAlone/workflow/store" +) + +// DLQServiceConfig holds the configuration for the DLQ service module. +type DLQServiceConfig struct { + // MaxRetries is reserved for future implementation of per-entry retry limits. + // It is stored and exposed via MaxRetries() but not yet applied to the DLQ store. + MaxRetries int `yaml:"max_retries" default:"3"` + // RetentionDays is reserved for future implementation of automatic DLQ entry purging. + // It is stored and exposed via RetentionDays() but not yet applied to the DLQ store. + RetentionDays int `yaml:"retention_days" default:"30"` +} + +// DLQServiceModule wraps an evstore.DLQHandler as a modular.Module. +// It initializes the in-memory DLQ store and handler, making them available +// in the modular service registry. +type DLQServiceModule struct { + name string + config DLQServiceConfig + store *evstore.InMemoryDLQStore + handler *evstore.DLQHandler + mux *http.ServeMux +} + +// NewDLQServiceModule creates a new DLQ service module with the given name and config. +func NewDLQServiceModule(name string, cfg DLQServiceConfig) *DLQServiceModule { + logger := slog.Default() + + dlqStore := evstore.NewInMemoryDLQStore() + dlqHandler := evstore.NewDLQHandler(dlqStore, logger) + dlqMux := http.NewServeMux() + dlqHandler.RegisterRoutes(dlqMux) + + logger.Info("Created DLQ handler", "module", name) + + return &DLQServiceModule{ + name: name, + config: cfg, + store: dlqStore, + handler: dlqHandler, + mux: dlqMux, + } +} + +// Name implements modular.Module. +func (m *DLQServiceModule) Name() string { return m.name } + +// Init implements modular.Module. +func (m *DLQServiceModule) Init(_ modular.Application) error { return nil } + +// ProvidesServices implements modular.Module. The DLQ handler mux is registered +// under the module name and also under {name}.admin for admin route delegation. +func (m *DLQServiceModule) ProvidesServices() []modular.ServiceProvider { + return []modular.ServiceProvider{ + { + Name: m.name, + Description: "DLQ service: " + m.name, + Instance: m.mux, + }, + { + Name: m.name + ".admin", + Description: "DLQ admin handler: " + m.name, + Instance: http.Handler(m.mux), + }, + { + Name: m.name + ".store", + Description: "DLQ store: " + m.name, + Instance: m.store, + }, + } +} + +// RequiresServices implements modular.Module. +func (m *DLQServiceModule) RequiresServices() []modular.ServiceDependency { + return nil +} + +// DLQMux returns the HTTP mux for DLQ endpoints. +func (m *DLQServiceModule) DLQMux() http.Handler { return m.mux } + +// Store returns the underlying DLQ store. +func (m *DLQServiceModule) Store() *evstore.InMemoryDLQStore { return m.store } + +// MaxRetries returns the configured max retry count. +func (m *DLQServiceModule) MaxRetries() int { return m.config.MaxRetries } + +// RetentionDays returns the configured retention period. +func (m *DLQServiceModule) RetentionDays() int { return m.config.RetentionDays } diff --git a/module/dlq_service_test.go b/module/dlq_service_test.go new file mode 100644 index 00000000..11a1befd --- /dev/null +++ b/module/dlq_service_test.go @@ -0,0 +1,82 @@ +package module + +import ( + "testing" + + "github.com/CrisisTextLine/modular" +) + +func TestDLQServiceModule_Name(t *testing.T) { + m := NewDLQServiceModule("test-dlq", DLQServiceConfig{MaxRetries: 5, RetentionDays: 14}) + if m.Name() != "test-dlq" { + t.Errorf("Name() = %q, want %q", m.Name(), "test-dlq") + } +} + +func TestDLQServiceModule_Init(t *testing.T) { + m := NewDLQServiceModule("test-dlq", DLQServiceConfig{}) + if err := m.Init(nil); err != nil { + t.Errorf("Init() error = %v", err) + } +} + +func TestDLQServiceModule_ProvidesServices(t *testing.T) { + m := NewDLQServiceModule("test-dlq", DLQServiceConfig{}) + + providers := m.ProvidesServices() + if len(providers) != 3 { + t.Fatalf("ProvidesServices() returned %d providers, want 3", len(providers)) + } + + expected := map[string]bool{ + "test-dlq": false, + "test-dlq.admin": false, + "test-dlq.store": false, + } + for _, p := range providers { + if _, ok := expected[p.Name]; !ok { + t.Errorf("unexpected service name %q", p.Name) + } + expected[p.Name] = true + } + for name, found := range expected { + if !found { + t.Errorf("expected service %q not found", name) + } + } +} + +func TestDLQServiceModule_RequiresServices(t *testing.T) { + m := NewDLQServiceModule("test-dlq", DLQServiceConfig{}) + deps := m.RequiresServices() + if len(deps) != 0 { + t.Errorf("RequiresServices() returned %d deps, want 0", len(deps)) + } +} + +func TestDLQServiceModule_DLQMux(t *testing.T) { + m := NewDLQServiceModule("test-dlq", DLQServiceConfig{}) + if m.DLQMux() == nil { + t.Error("DLQMux() returned nil") + } +} + +func TestDLQServiceModule_Store(t *testing.T) { + m := NewDLQServiceModule("test-dlq", DLQServiceConfig{}) + if m.Store() == nil { + t.Error("Store() returned nil") + } +} + +func TestDLQServiceModule_Config(t *testing.T) { + m := NewDLQServiceModule("test-dlq", DLQServiceConfig{MaxRetries: 5, RetentionDays: 14}) + if m.MaxRetries() != 5 { + t.Errorf("MaxRetries() = %d, want 5", m.MaxRetries()) + } + if m.RetentionDays() != 14 { + t.Errorf("RetentionDays() = %d, want 14", m.RetentionDays()) + } +} + +// Verify DLQServiceModule satisfies the modular.Module interface. +var _ modular.Module = (*DLQServiceModule)(nil) diff --git a/module/eventstore_service.go b/module/eventstore_service.go new file mode 100644 index 00000000..9f33a28b --- /dev/null +++ b/module/eventstore_service.go @@ -0,0 +1,93 @@ +package module + +import ( + "fmt" + "log/slog" + "os" + "path/filepath" + + "github.com/CrisisTextLine/modular" + evstore "github.com/GoCodeAlone/workflow/store" +) + +// EventStoreServiceConfig holds the configuration for the event store service module. +type EventStoreServiceConfig struct { + DBPath string `yaml:"db_path" default:"data/events.db"` + // RetentionDays is reserved for future implementation of automatic event pruning. + // It is stored and exposed via RetentionDays() but not yet applied to the store. + RetentionDays int `yaml:"retention_days" default:"90"` +} + +// EventStoreServiceModule wraps an evstore.SQLiteEventStore as a modular.Module. +// It initializes the store and makes it available in the modular service registry. +type EventStoreServiceModule struct { + name string + config EventStoreServiceConfig + store *evstore.SQLiteEventStore +} + +// NewEventStoreServiceModule creates a new event store service module with the given name and config. +func NewEventStoreServiceModule(name string, cfg EventStoreServiceConfig) (*EventStoreServiceModule, error) { + dbPath := cfg.DBPath + if dbPath == "" { + dbPath = "data/events.db" + } + + // Ensure parent directory exists for the SQLite file. + if dir := filepath.Dir(dbPath); dir != "" && dir != "." { + if mkErr := os.MkdirAll(dir, 0o750); mkErr != nil { + return nil, fmt.Errorf("eventstore.service %q: failed to create db directory %q: %w", name, dir, mkErr) + } + } + + store, err := evstore.NewSQLiteEventStore(dbPath) + if err != nil { + return nil, fmt.Errorf("eventstore.service %q: failed to open store: %w", name, err) + } + + slog.Default().Info("Opened event store", "module", name, "path", dbPath) + + return &EventStoreServiceModule{ + name: name, + config: cfg, + store: store, + }, nil +} + +// Name implements modular.Module. +func (m *EventStoreServiceModule) Name() string { return m.name } + +// Init implements modular.Module. +func (m *EventStoreServiceModule) Init(_ modular.Application) error { return nil } + +// ProvidesServices implements modular.Module. The event store is registered under +// the module name so other modules (timeline, replay, DLQ) can look it up. +func (m *EventStoreServiceModule) ProvidesServices() []modular.ServiceProvider { + return []modular.ServiceProvider{ + { + Name: m.name, + Description: "Event store service: " + m.name, + Instance: m.store, + }, + { + Name: m.name + ".admin", + Description: "Event store admin (closable): " + m.name, + Instance: m.store, + }, + } +} + +// RequiresServices implements modular.Module. +func (m *EventStoreServiceModule) RequiresServices() []modular.ServiceDependency { + return nil +} + +// Store returns the underlying SQLiteEventStore for direct use. +func (m *EventStoreServiceModule) Store() *evstore.SQLiteEventStore { + return m.store +} + +// RetentionDays returns the configured retention period. +func (m *EventStoreServiceModule) RetentionDays() int { + return m.config.RetentionDays +} diff --git a/module/eventstore_service_test.go b/module/eventstore_service_test.go new file mode 100644 index 00000000..714b1590 --- /dev/null +++ b/module/eventstore_service_test.go @@ -0,0 +1,106 @@ +package module + +import ( + "os" + "testing" + + "github.com/CrisisTextLine/modular" +) + +func TestEventStoreServiceModule_Name(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{ + DBPath: dbPath, + RetentionDays: 30, + }) + if err != nil { + t.Fatalf("NewEventStoreServiceModule() error = %v", err) + } + if m.Name() != "test-es" { + t.Errorf("Name() = %q, want %q", m.Name(), "test-es") + } +} + +func TestEventStoreServiceModule_Init(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{DBPath: dbPath}) + if err != nil { + t.Fatalf("NewEventStoreServiceModule() error = %v", err) + } + if err := m.Init(nil); err != nil { + t.Errorf("Init() error = %v", err) + } +} + +func TestEventStoreServiceModule_ProvidesServices(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{DBPath: dbPath}) + if err != nil { + t.Fatalf("NewEventStoreServiceModule() error = %v", err) + } + + providers := m.ProvidesServices() + if len(providers) != 2 { + t.Fatalf("ProvidesServices() returned %d providers, want 2", len(providers)) + } + + if providers[0].Name != "test-es" { + t.Errorf("providers[0].Name = %q, want %q", providers[0].Name, "test-es") + } + if providers[1].Name != "test-es.admin" { + t.Errorf("providers[1].Name = %q, want %q", providers[1].Name, "test-es.admin") + } +} + +func TestEventStoreServiceModule_RequiresServices(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{DBPath: dbPath}) + if err != nil { + t.Fatalf("NewEventStoreServiceModule() error = %v", err) + } + deps := m.RequiresServices() + if len(deps) != 0 { + t.Errorf("RequiresServices() returned %d deps, want 0", len(deps)) + } +} + +func TestEventStoreServiceModule_Store(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{DBPath: dbPath}) + if err != nil { + t.Fatalf("NewEventStoreServiceModule() error = %v", err) + } + if m.Store() == nil { + t.Error("Store() returned nil") + } +} + +func TestEventStoreServiceModule_RetentionDays(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{ + DBPath: dbPath, + RetentionDays: 60, + }) + if err != nil { + t.Fatalf("NewEventStoreServiceModule() error = %v", err) + } + if m.RetentionDays() != 60 { + t.Errorf("RetentionDays() = %d, want 60", m.RetentionDays()) + } +} + +func TestEventStoreServiceModule_DefaultDBPath(t *testing.T) { + // Test that an empty DBPath falls back to the default "data/events.db" path. + m, err := NewEventStoreServiceModule("test-es", EventStoreServiceConfig{}) + if err != nil { + t.Fatalf("NewEventStoreServiceModule() error = %v", err) + } + if m.Store() == nil { + t.Error("Store() returned nil with default path") + } + // Clean up the created data directory (created as side-effect of default path). + _ = os.RemoveAll("data") +} + +// Verify EventStoreServiceModule satisfies the modular.Module interface. +var _ modular.Module = (*EventStoreServiceModule)(nil) diff --git a/module/query_handler_test.go b/module/query_handler_test.go index a8243538..132ebab5 100644 --- a/module/query_handler_test.go +++ b/module/query_handler_test.go @@ -299,4 +299,3 @@ func TestQueryHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/module/timeline_service.go b/module/timeline_service.go new file mode 100644 index 00000000..7656650a --- /dev/null +++ b/module/timeline_service.go @@ -0,0 +1,107 @@ +package module + +import ( + "log/slog" + "net/http" + + "github.com/CrisisTextLine/modular" + evstore "github.com/GoCodeAlone/workflow/store" +) + +// TimelineServiceModule wraps evstore.TimelineHandler and evstore.ReplayHandler +// as a modular.Module. It provides HTTP muxes for timeline and replay features +// via the service registry. +type TimelineServiceModule struct { + name string + eventStore evstore.EventStore + timelineHandler *evstore.TimelineHandler + replayHandler *evstore.ReplayHandler + backfillHandler *evstore.BackfillMockDiffHandler + timelineMux *http.ServeMux + replayMux *http.ServeMux + backfillMux *http.ServeMux +} + +// NewTimelineServiceModule creates a new timeline service module. +// It requires a non-nil EventStore to function. Panics if eventStore is nil. +func NewTimelineServiceModule(name string, eventStore evstore.EventStore) *TimelineServiceModule { + if eventStore == nil { + panic("NewTimelineServiceModule: eventStore must not be nil") + } + logger := slog.Default() + + timelineHandler := evstore.NewTimelineHandler(eventStore, logger) + timelineMux := http.NewServeMux() + timelineHandler.RegisterRoutes(timelineMux) + + replayHandler := evstore.NewReplayHandler(eventStore, logger) + replayMux := http.NewServeMux() + replayHandler.RegisterRoutes(replayMux) + + backfillStore := evstore.NewInMemoryBackfillStore() + mockStore := evstore.NewInMemoryStepMockStore() + diffCalc := evstore.NewDiffCalculator(eventStore) + backfillHandler := evstore.NewBackfillMockDiffHandler(backfillStore, mockStore, diffCalc, logger) + backfillMux := http.NewServeMux() + backfillHandler.RegisterRoutes(backfillMux) + + logger.Info("Created timeline, replay, and backfill/mock/diff handlers", "module", name) + + return &TimelineServiceModule{ + name: name, + eventStore: eventStore, + timelineHandler: timelineHandler, + replayHandler: replayHandler, + backfillHandler: backfillHandler, + timelineMux: timelineMux, + replayMux: replayMux, + backfillMux: backfillMux, + } +} + +// Name implements modular.Module. +func (m *TimelineServiceModule) Name() string { return m.name } + +// Init implements modular.Module. +func (m *TimelineServiceModule) Init(_ modular.Application) error { return nil } + +// ProvidesServices implements modular.Module. Registers the timeline, replay, +// and backfill muxes as services so the server can delegate routes to them. +func (m *TimelineServiceModule) ProvidesServices() []modular.ServiceProvider { + return []modular.ServiceProvider{ + { + Name: m.name, + Description: "Timeline service: " + m.name, + Instance: m.timelineMux, + }, + { + Name: m.name + ".timeline", + Description: "Timeline handler mux: " + m.name, + Instance: http.Handler(m.timelineMux), + }, + { + Name: m.name + ".replay", + Description: "Replay handler mux: " + m.name, + Instance: http.Handler(m.replayMux), + }, + { + Name: m.name + ".backfill", + Description: "Backfill/mock/diff handler mux: " + m.name, + Instance: http.Handler(m.backfillMux), + }, + } +} + +// RequiresServices implements modular.Module. +func (m *TimelineServiceModule) RequiresServices() []modular.ServiceDependency { + return nil +} + +// TimelineMux returns the HTTP mux for timeline endpoints. +func (m *TimelineServiceModule) TimelineMux() http.Handler { return m.timelineMux } + +// ReplayMux returns the HTTP mux for replay endpoints. +func (m *TimelineServiceModule) ReplayMux() http.Handler { return m.replayMux } + +// BackfillMux returns the HTTP mux for backfill/mock/diff endpoints. +func (m *TimelineServiceModule) BackfillMux() http.Handler { return m.backfillMux } diff --git a/module/timeline_service_test.go b/module/timeline_service_test.go new file mode 100644 index 00000000..0a78024d --- /dev/null +++ b/module/timeline_service_test.go @@ -0,0 +1,99 @@ +package module + +import ( + "testing" + + "github.com/CrisisTextLine/modular" + evstore "github.com/GoCodeAlone/workflow/store" +) + +func TestTimelineServiceModule_Name(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + es, err := evstore.NewSQLiteEventStore(dbPath) + if err != nil { + t.Fatalf("NewSQLiteEventStore() error = %v", err) + } + m := NewTimelineServiceModule("test-timeline", es) + if m.Name() != "test-timeline" { + t.Errorf("Name() = %q, want %q", m.Name(), "test-timeline") + } +} + +func TestTimelineServiceModule_Init(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + es, err := evstore.NewSQLiteEventStore(dbPath) + if err != nil { + t.Fatalf("NewSQLiteEventStore() error = %v", err) + } + m := NewTimelineServiceModule("test-timeline", es) + if err := m.Init(nil); err != nil { + t.Errorf("Init() error = %v", err) + } +} + +func TestTimelineServiceModule_ProvidesServices(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + es, err := evstore.NewSQLiteEventStore(dbPath) + if err != nil { + t.Fatalf("NewSQLiteEventStore() error = %v", err) + } + m := NewTimelineServiceModule("test-timeline", es) + + providers := m.ProvidesServices() + if len(providers) != 4 { + t.Fatalf("ProvidesServices() returned %d providers, want 4", len(providers)) + } + + expected := map[string]bool{ + "test-timeline": false, + "test-timeline.timeline": false, + "test-timeline.replay": false, + "test-timeline.backfill": false, + } + for _, p := range providers { + if _, ok := expected[p.Name]; !ok { + t.Errorf("unexpected service name %q", p.Name) + } + expected[p.Name] = true + } + for name, found := range expected { + if !found { + t.Errorf("expected service %q not found", name) + } + } +} + +func TestTimelineServiceModule_RequiresServices(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + es, err := evstore.NewSQLiteEventStore(dbPath) + if err != nil { + t.Fatalf("NewSQLiteEventStore() error = %v", err) + } + m := NewTimelineServiceModule("test-timeline", es) + deps := m.RequiresServices() + if len(deps) != 0 { + t.Errorf("RequiresServices() returned %d deps, want 0", len(deps)) + } +} + +func TestTimelineServiceModule_Muxes(t *testing.T) { + dbPath := t.TempDir() + "/test-events.db" + es, err := evstore.NewSQLiteEventStore(dbPath) + if err != nil { + t.Fatalf("NewSQLiteEventStore() error = %v", err) + } + m := NewTimelineServiceModule("test-timeline", es) + + if m.TimelineMux() == nil { + t.Error("TimelineMux() returned nil") + } + if m.ReplayMux() == nil { + t.Error("ReplayMux() returned nil") + } + if m.BackfillMux() == nil { + t.Error("BackfillMux() returned nil") + } +} + +// Verify TimelineServiceModule satisfies the modular.Module interface. +var _ modular.Module = (*TimelineServiceModule)(nil) diff --git a/plugins/dlq/plugin.go b/plugins/dlq/plugin.go new file mode 100644 index 00000000..b1f37525 --- /dev/null +++ b/plugins/dlq/plugin.go @@ -0,0 +1,58 @@ +// Package dlq provides a plugin that registers the dlq.service module type +// for config-driven dead letter queue initialization. +package dlq + +import ( + "github.com/CrisisTextLine/modular" + "github.com/GoCodeAlone/workflow/module" + "github.com/GoCodeAlone/workflow/plugin" +) + +// Plugin registers the dlq.service module type. +type Plugin struct { + plugin.BaseEnginePlugin +} + +// New creates a new DLQ plugin. +func New() *Plugin { + return &Plugin{ + BaseEnginePlugin: plugin.BaseEnginePlugin{ + BaseNativePlugin: plugin.BaseNativePlugin{ + PluginName: "dlq", + PluginVersion: "1.0.0", + PluginDescription: "Dead letter queue service module for failed message management", + }, + Manifest: plugin.PluginManifest{ + Name: "dlq", + Version: "1.0.0", + Author: "GoCodeAlone", + Description: "Dead letter queue service module for failed message management", + Tier: plugin.TierCore, + ModuleTypes: []string{"dlq.service"}, + }, + }, + } +} + +// ModuleFactories returns the module factories for the DLQ service. +func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { + return map[string]plugin.ModuleFactory{ + "dlq.service": func(name string, config map[string]any) modular.Module { + cfg := module.DLQServiceConfig{ + MaxRetries: 3, + RetentionDays: 30, + } + if v, ok := config["max_retries"].(int); ok { + cfg.MaxRetries = v + } else if v, ok := config["max_retries"].(float64); ok { + cfg.MaxRetries = int(v) + } + if v, ok := config["retention_days"].(int); ok { + cfg.RetentionDays = v + } else if v, ok := config["retention_days"].(float64); ok { + cfg.RetentionDays = int(v) + } + return module.NewDLQServiceModule(name, cfg) + }, + } +} diff --git a/plugins/dlq/plugin_test.go b/plugins/dlq/plugin_test.go new file mode 100644 index 00000000..734ee7c0 --- /dev/null +++ b/plugins/dlq/plugin_test.go @@ -0,0 +1,41 @@ +package dlq + +import ( + "testing" +) + +func TestPlugin_New(t *testing.T) { + p := New() + if p.Name() != "dlq" { + t.Errorf("Name() = %q, want %q", p.Name(), "dlq") + } + if p.Version() != "1.0.0" { + t.Errorf("Version() = %q, want %q", p.Version(), "1.0.0") + } +} + +func TestPlugin_ModuleFactories(t *testing.T) { + p := New() + factories := p.ModuleFactories() + + if _, ok := factories["dlq.service"]; !ok { + t.Error("ModuleFactories() missing dlq.service") + } +} + +func TestPlugin_ModuleFactory_Creates(t *testing.T) { + p := New() + factories := p.ModuleFactories() + factory := factories["dlq.service"] + + mod := factory("test-dlq", map[string]any{ + "max_retries": 5, + "retention_days": 14, + }) + if mod == nil { + t.Fatal("factory returned nil") + } + if mod.Name() != "test-dlq" { + t.Errorf("Name() = %q, want %q", mod.Name(), "test-dlq") + } +} diff --git a/plugins/eventstore/plugin.go b/plugins/eventstore/plugin.go new file mode 100644 index 00000000..a69700bf --- /dev/null +++ b/plugins/eventstore/plugin.go @@ -0,0 +1,60 @@ +// Package eventstore provides a plugin that registers the eventstore.service +// module type for config-driven event store initialization. +package eventstore + +import ( + "github.com/CrisisTextLine/modular" + "github.com/GoCodeAlone/workflow/module" + "github.com/GoCodeAlone/workflow/plugin" +) + +// Plugin registers the eventstore.service module type. +type Plugin struct { + plugin.BaseEnginePlugin +} + +// New creates a new eventstore plugin. +func New() *Plugin { + return &Plugin{ + BaseEnginePlugin: plugin.BaseEnginePlugin{ + BaseNativePlugin: plugin.BaseNativePlugin{ + PluginName: "eventstore", + PluginVersion: "1.0.0", + PluginDescription: "Event store service module for execution event persistence", + }, + Manifest: plugin.PluginManifest{ + Name: "eventstore", + Version: "1.0.0", + Author: "GoCodeAlone", + Description: "Event store service module for execution event persistence", + Tier: plugin.TierCore, + ModuleTypes: []string{"eventstore.service"}, + }, + }, + } +} + +// ModuleFactories returns the module factories for the event store service. +func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { + return map[string]plugin.ModuleFactory{ + "eventstore.service": func(name string, config map[string]any) modular.Module { + cfg := module.EventStoreServiceConfig{ + DBPath: "data/events.db", + RetentionDays: 90, + } + if v, ok := config["db_path"].(string); ok { + cfg.DBPath = v + } + if v, ok := config["retention_days"].(int); ok { + cfg.RetentionDays = v + } else if v, ok := config["retention_days"].(float64); ok { + cfg.RetentionDays = int(v) + } + mod, err := module.NewEventStoreServiceModule(name, cfg) + if err != nil { + return nil + } + return mod + }, + } +} diff --git a/plugins/eventstore/plugin_test.go b/plugins/eventstore/plugin_test.go new file mode 100644 index 00000000..d19b307b --- /dev/null +++ b/plugins/eventstore/plugin_test.go @@ -0,0 +1,42 @@ +package eventstore + +import ( + "testing" +) + +func TestPlugin_New(t *testing.T) { + p := New() + if p.Name() != "eventstore" { + t.Errorf("Name() = %q, want %q", p.Name(), "eventstore") + } + if p.Version() != "1.0.0" { + t.Errorf("Version() = %q, want %q", p.Version(), "1.0.0") + } +} + +func TestPlugin_ModuleFactories(t *testing.T) { + p := New() + factories := p.ModuleFactories() + + if _, ok := factories["eventstore.service"]; !ok { + t.Error("ModuleFactories() missing eventstore.service") + } +} + +func TestPlugin_ModuleFactory_Creates(t *testing.T) { + p := New() + factories := p.ModuleFactories() + factory := factories["eventstore.service"] + + dbPath := t.TempDir() + "/test.db" + mod := factory("test-es", map[string]any{ + "db_path": dbPath, + "retention_days": 60, + }) + if mod == nil { + t.Fatal("factory returned nil") + } + if mod.Name() != "test-es" { + t.Errorf("Name() = %q, want %q", mod.Name(), "test-es") + } +} diff --git a/plugins/timeline/plugin.go b/plugins/timeline/plugin.go new file mode 100644 index 00000000..472d82ac --- /dev/null +++ b/plugins/timeline/plugin.go @@ -0,0 +1,100 @@ +// Package timeline provides a plugin that registers the timeline.service +// module type for config-driven timeline/replay handler initialization. +package timeline + +import ( + "github.com/CrisisTextLine/modular" + "github.com/GoCodeAlone/workflow/module" + "github.com/GoCodeAlone/workflow/plugin" + evstore "github.com/GoCodeAlone/workflow/store" +) + +// Plugin registers the timeline.service module type. +type Plugin struct { + plugin.BaseEnginePlugin +} + +// New creates a new timeline plugin. +func New() *Plugin { + return &Plugin{ + BaseEnginePlugin: plugin.BaseEnginePlugin{ + BaseNativePlugin: plugin.BaseNativePlugin{ + PluginName: "timeline", + PluginVersion: "1.0.0", + PluginDescription: "Timeline and replay service module for execution visualization", + }, + Manifest: plugin.PluginManifest{ + Name: "timeline", + Version: "1.0.0", + Author: "GoCodeAlone", + Description: "Timeline and replay service module for execution visualization", + Tier: plugin.TierCore, + ModuleTypes: []string{"timeline.service"}, + }, + }, + } +} + +// ModuleFactories returns the module factories for the timeline service. +func (p *Plugin) ModuleFactories() map[string]plugin.ModuleFactory { + return map[string]plugin.ModuleFactory{ + "timeline.service": func(name string, config map[string]any) modular.Module { + // The timeline module needs an EventStore. It discovers the event + // store from the config's "event_store" key, which should reference + // a service name registered by an eventstore.service module. + // At factory time we don't have the Application yet, so we use a + // deferred-init approach: create a stub that resolves at Init(). + return &deferredTimelineModule{ + name: name, + eventStoreName: stringFromConfig(config, "event_store", "admin-event-store"), + } + }, + } +} + +// deferredTimelineModule resolves the event store dependency at Init() time. +type deferredTimelineModule struct { + name string + eventStoreName string + inner *module.TimelineServiceModule +} + +func (m *deferredTimelineModule) Name() string { return m.name } + +func (m *deferredTimelineModule) Init(app modular.Application) error { + // Look up the event store from the service registry + var store *evstore.SQLiteEventStore + if err := app.GetService(m.eventStoreName, &store); err != nil || store == nil { + // Fallback: try to find any EventStore in the registry + for _, svc := range app.SvcRegistry() { + if es, ok := svc.(*evstore.SQLiteEventStore); ok { + store = es + break + } + } + } + if store == nil { + // No event store available — the module will provide stub handlers + return nil + } + m.inner = module.NewTimelineServiceModule(m.name, store) + return nil +} + +func (m *deferredTimelineModule) ProvidesServices() []modular.ServiceProvider { + if m.inner != nil { + return m.inner.ProvidesServices() + } + return nil +} + +func (m *deferredTimelineModule) RequiresServices() []modular.ServiceDependency { + return nil +} + +func stringFromConfig(config map[string]any, key, defaultVal string) string { + if v, ok := config[key].(string); ok && v != "" { + return v + } + return defaultVal +} diff --git a/plugins/timeline/plugin_test.go b/plugins/timeline/plugin_test.go new file mode 100644 index 00000000..4049cf05 --- /dev/null +++ b/plugins/timeline/plugin_test.go @@ -0,0 +1,40 @@ +package timeline + +import ( + "testing" +) + +func TestPlugin_New(t *testing.T) { + p := New() + if p.Name() != "timeline" { + t.Errorf("Name() = %q, want %q", p.Name(), "timeline") + } + if p.Version() != "1.0.0" { + t.Errorf("Version() = %q, want %q", p.Version(), "1.0.0") + } +} + +func TestPlugin_ModuleFactories(t *testing.T) { + p := New() + factories := p.ModuleFactories() + + if _, ok := factories["timeline.service"]; !ok { + t.Error("ModuleFactories() missing timeline.service") + } +} + +func TestPlugin_ModuleFactory_Creates(t *testing.T) { + p := New() + factories := p.ModuleFactories() + factory := factories["timeline.service"] + + mod := factory("test-timeline", map[string]any{ + "event_store": "some-event-store", + }) + if mod == nil { + t.Fatal("factory returned nil") + } + if mod.Name() != "test-timeline" { + t.Errorf("Name() = %q, want %q", mod.Name(), "test-timeline") + } +} diff --git a/schema/module_schema.go b/schema/module_schema.go index a69cdd53..2a74b709 100644 --- a/schema/module_schema.go +++ b/schema/module_schema.go @@ -1349,6 +1349,58 @@ func (r *ModuleSchemaRegistry) registerBuiltins() { MaxIncoming: intPtr(0), }) + // ---- Event Store ---- + + r.Register(&ModuleSchema{ + Type: "eventstore.service", + Label: "Event Store Service", + Category: "infrastructure", + Description: "SQLite-backed event store for execution event persistence, timeline, and replay features", + Outputs: []ServiceIODef{{Name: "EventStore", Type: "store.SQLiteEventStore", Description: "Execution event store"}}, + ConfigFields: []ConfigFieldDef{ + {Key: "db_path", Label: "Database Path", Type: FieldTypeString, DefaultValue: "data/events.db", Description: "Path to the SQLite database file for event storage", Placeholder: "data/events.db"}, + {Key: "retention_days", Label: "Retention Days", Type: FieldTypeNumber, DefaultValue: 90, Description: "Number of days to retain execution events"}, + }, + DefaultConfig: map[string]any{"db_path": "data/events.db", "retention_days": 90}, + MaxIncoming: intPtr(0), + }) + + // ---- Timeline / Replay ---- + + r.Register(&ModuleSchema{ + Type: "timeline.service", + Label: "Timeline & Replay Service", + Category: "infrastructure", + Description: "Provides execution timeline visualization and request replay HTTP endpoints", + Inputs: []ServiceIODef{{Name: "EventStore", Type: "store.EventStore", Description: "Event store dependency for timeline and replay data"}}, + Outputs: []ServiceIODef{ + {Name: "TimelineMux", Type: "http.Handler", Description: "HTTP handler for timeline endpoints"}, + {Name: "ReplayMux", Type: "http.Handler", Description: "HTTP handler for replay endpoints"}, + {Name: "BackfillMux", Type: "http.Handler", Description: "HTTP handler for backfill/mock/diff endpoints"}, + }, + ConfigFields: []ConfigFieldDef{ + {Key: "event_store", Label: "Event Store Service", Type: FieldTypeString, DefaultValue: "admin-event-store", Description: "Name of the event store service module to use", Placeholder: "admin-event-store"}, + }, + DefaultConfig: map[string]any{"event_store": "admin-event-store"}, + MaxIncoming: intPtr(1), + }) + + // ---- DLQ (Dead Letter Queue) ---- + + r.Register(&ModuleSchema{ + Type: "dlq.service", + Label: "Dead Letter Queue Service", + Category: "infrastructure", + Description: "In-memory dead letter queue for failed message management with retry, discard, and purge", + Outputs: []ServiceIODef{{Name: "DLQHandler", Type: "http.Handler", Description: "HTTP handler for DLQ management endpoints"}}, + ConfigFields: []ConfigFieldDef{ + {Key: "max_retries", Label: "Max Retries", Type: FieldTypeNumber, DefaultValue: 3, Description: "Maximum number of retry attempts for failed messages"}, + {Key: "retention_days", Label: "Retention Days", Type: FieldTypeNumber, DefaultValue: 30, Description: "Number of days to retain resolved/discarded DLQ entries"}, + }, + DefaultConfig: map[string]any{"max_retries": 3, "retention_days": 30}, + MaxIncoming: intPtr(0), + }) + r.Register(&ModuleSchema{ Type: "step.feature_flag", Label: "Feature Flag Check", diff --git a/schema/schema.go b/schema/schema.go index 5d2e60a7..fc94794c 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -60,7 +60,9 @@ var coreModuleTypes = []string{ "cache.modular", "data.transformer", "database.workflow", + "dlq.service", "dynamic.component", + "eventstore.service", "featureflag.service", "health.checker", "http.handler", @@ -142,6 +144,7 @@ var coreModuleTypes = []string{ "storage.local", "storage.s3", "storage.sqlite", + "timeline.service", "webhook.sender", "workflow.registry", }