Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions admin/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,29 @@ modules:
sse_enabled: true
db_path: "data/featureflags.db"

# --- Event Store ---
# Provides the execution event store; auto-discovered by timeline and replay modules.
- name: admin-event-store
type: eventstore.service
config:
db_path: "data/events.db"
retention_days: 90

# --- Timeline / Replay ---
# Provides timeline, replay, and backfill/mock/diff HTTP handlers.
- name: admin-timeline
type: timeline.service
config:
event_store: "admin-event-store"

# --- Dead Letter Queue ---
# Provides DLQ management HTTP endpoints.
- name: admin-dlq
type: dlq.service
config:
max_retries: 3
retention_days: 30

workflows:
http-admin:
router: admin-router
Expand Down
163 changes: 121 additions & 42 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
pluginapi "github.com/GoCodeAlone/workflow/plugins/api"
pluginauth "github.com/GoCodeAlone/workflow/plugins/auth"
plugincicd "github.com/GoCodeAlone/workflow/plugins/cicd"
plugindlq "github.com/GoCodeAlone/workflow/plugins/dlq"
pluginevstore "github.com/GoCodeAlone/workflow/plugins/eventstore"
pluginff "github.com/GoCodeAlone/workflow/plugins/featureflags"
pluginhttp "github.com/GoCodeAlone/workflow/plugins/http"
pluginintegration "github.com/GoCodeAlone/workflow/plugins/integration"
Expand All @@ -54,6 +56,7 @@ import (
pluginsecrets "github.com/GoCodeAlone/workflow/plugins/secrets"
pluginsm "github.com/GoCodeAlone/workflow/plugins/statemachine"
pluginstorage "github.com/GoCodeAlone/workflow/plugins/storage"
plugintimeline "github.com/GoCodeAlone/workflow/plugins/timeline"
"github.com/GoCodeAlone/workflow/provider"
_ "github.com/GoCodeAlone/workflow/provider/aws"
_ "github.com/GoCodeAlone/workflow/provider/azure"
Expand Down Expand Up @@ -109,6 +112,9 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std
pluginpipeline.New(),
plugincicd.New(),
pluginff.New(),
pluginevstore.New(),
plugintimeline.New(),
plugindlq.New(),
pluginsecrets.New(),
pluginmodcompat.New(),
pluginscheduler.New(),
Expand Down Expand Up @@ -550,14 +556,29 @@ func (app *serverApp) initStores(logger *slog.Logger) error {
// Event store, idempotency store
// -----------------------------------------------------------------------

// Create SQLite event store for execution events
eventsDBPath := filepath.Join(*dataDir, "events.db")
eventStore, err := evstore.NewSQLiteEventStore(eventsDBPath)
if err != nil {
logger.Warn("Failed to create event store — timeline/replay/diff features disabled", "error", err)
} else {
// Try to discover the event store from the service registry (registered
// by an eventstore.service module declared in config). Fall back to
// creating one directly if no module was configured.
var eventStore *evstore.SQLiteEventStore
for _, svc := range engine.GetApp().SvcRegistry() {
if es, ok := svc.(*evstore.SQLiteEventStore); ok {
eventStore = es
logger.Info("Discovered event store from service registry")
break
}
}
if eventStore == nil {
eventsDBPath := filepath.Join(*dataDir, "events.db")
var esErr error
eventStore, esErr = evstore.NewSQLiteEventStore(eventsDBPath)
if esErr != nil {
logger.Warn("Failed to create event store — timeline/replay/diff features disabled", "error", esErr)
} else {
logger.Info("Opened event store (fallback)", "path", eventsDBPath)
}
}
if eventStore != nil {
app.stores.eventStore = eventStore
logger.Info("Opened event store", "path", eventsDBPath)
}

// Create SQLite idempotency store (separate DB connection, same data dir)
Expand All @@ -582,47 +603,105 @@ func (app *serverApp) initStores(logger *slog.Logger) error {
// Timeline, replay, backfill handlers
// -----------------------------------------------------------------------

if eventStore != nil {
// Timeline handler (execution list, timeline, events)
timelineHandler := evstore.NewTimelineHandler(eventStore, logger)
timelineMux := http.NewServeMux()
timelineHandler.RegisterRoutes(timelineMux)
app.services.timelineMux = timelineMux

// Replay handler
replayHandler := evstore.NewReplayHandler(eventStore, logger)
replayMux := http.NewServeMux()
replayHandler.RegisterRoutes(replayMux)
app.services.replayMux = replayMux

// Backfill / Mock / Diff handler
backfillStore := evstore.NewInMemoryBackfillStore()
mockStore := evstore.NewInMemoryStepMockStore()
diffCalc := evstore.NewDiffCalculator(eventStore)
bmdHandler := evstore.NewBackfillMockDiffHandler(backfillStore, mockStore, diffCalc, logger)
bmdMux := http.NewServeMux()
bmdHandler.RegisterRoutes(bmdMux)
app.services.backfillMux = bmdMux

logger.Info("Created timeline, replay, and backfill/mock/diff handlers")
} else {
// Create stub handlers so delegate routes return 503 instead of 500
stubMsg := "event store unavailable — timeline/replay/backfill features disabled"
app.services.timelineMux = featureDisabledHandler(stubMsg)
app.services.replayMux = featureDisabledHandler(stubMsg)
app.services.backfillMux = featureDisabledHandler(stubMsg)
logger.Info("Created stub handlers for timeline/replay/backfill (event store unavailable)")
// Discover timeline/replay/backfill mux services registered by ProvidesServices
// (registered by a timeline.service module). Fall back to direct creation.
timelineDiscovered := false
var (
discoveredTimelineMux http.Handler
discoveredReplayMux http.Handler
discoveredBackfillMux http.Handler
)
for svcName, svc := range engine.GetApp().SvcRegistry() {
if strings.HasSuffix(svcName, ".timeline") {
if h, ok := svc.(http.Handler); ok {
discoveredTimelineMux = h
logger.Info("Discovered timeline mux from service registry", "service", svcName)
}
}
if strings.HasSuffix(svcName, ".replay") {
if h, ok := svc.(http.Handler); ok {
discoveredReplayMux = h
}
}
if strings.HasSuffix(svcName, ".backfill") {
if h, ok := svc.(http.Handler); ok {
discoveredBackfillMux = h
}
}
}
if discoveredTimelineMux != nil && discoveredReplayMux != nil && discoveredBackfillMux != nil {
app.services.timelineMux = discoveredTimelineMux
app.services.replayMux = discoveredReplayMux
app.services.backfillMux = discoveredBackfillMux
timelineDiscovered = true
logger.Info("Discovered timeline, replay, and backfill muxes from service registry")
}
if !timelineDiscovered {
if eventStore != nil {
timelineHandler := evstore.NewTimelineHandler(eventStore, logger)
timelineMux := http.NewServeMux()
timelineHandler.RegisterRoutes(timelineMux)
app.services.timelineMux = timelineMux

replayHandler := evstore.NewReplayHandler(eventStore, logger)
replayMux := http.NewServeMux()
replayHandler.RegisterRoutes(replayMux)
app.services.replayMux = replayMux

backfillStore := evstore.NewInMemoryBackfillStore()
mockStore := evstore.NewInMemoryStepMockStore()
diffCalc := evstore.NewDiffCalculator(eventStore)
bmdHandler := evstore.NewBackfillMockDiffHandler(backfillStore, mockStore, diffCalc, logger)
bmdMux := http.NewServeMux()
bmdHandler.RegisterRoutes(bmdMux)
app.services.backfillMux = bmdMux

logger.Info("Created timeline, replay, and backfill/mock/diff handlers (fallback)")
} else {
stubMsg := "event store unavailable — timeline/replay/backfill features disabled"
app.services.timelineMux = featureDisabledHandler(stubMsg)
app.services.replayMux = featureDisabledHandler(stubMsg)
app.services.backfillMux = featureDisabledHandler(stubMsg)
logger.Info("Created stub handlers for timeline/replay/backfill (event store unavailable)")
}
}

// -----------------------------------------------------------------------
// DLQ handler
// -----------------------------------------------------------------------

dlqStore := evstore.NewInMemoryDLQStore()
dlqHandler := evstore.NewDLQHandler(dlqStore, logger)
dlqMux := http.NewServeMux()
dlqHandler.RegisterRoutes(dlqMux)
app.services.dlqMux = dlqMux
// Discover DLQ mux and store from the service registry (registered by a
// dlq.service module). Fall back to direct creation.
dlqDiscovered := false
var dlqStore evstore.DLQStore
var discoveredDLQMux http.Handler
for svcName, svc := range engine.GetApp().SvcRegistry() {
if strings.HasSuffix(svcName, ".store") {
if ds, ok := svc.(*evstore.InMemoryDLQStore); ok {
dlqStore = ds
}
}
if strings.HasSuffix(svcName, ".admin") {
if h, ok := svc.(http.Handler); ok && strings.Contains(svcName, "dlq") {
discoveredDLQMux = h
logger.Info("Discovered DLQ mux from service registry", "service", svcName)
}
}
}
if discoveredDLQMux != nil && dlqStore != nil {
app.services.dlqMux = discoveredDLQMux
dlqDiscovered = true
logger.Info("Discovered DLQ service from service registry")
}
if !dlqDiscovered {
inMemDLQStore := evstore.NewInMemoryDLQStore()
dlqStore = inMemDLQStore
dlqHandler := evstore.NewDLQHandler(inMemDLQStore, logger)
dlqMux := http.NewServeMux()
dlqHandler.RegisterRoutes(dlqMux)
app.services.dlqMux = dlqMux
logger.Info("Created DLQ handler (fallback)")
}

// -----------------------------------------------------------------------
// Billing handler
Expand Down
1 change: 0 additions & 1 deletion module/command_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,4 +294,3 @@ func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) {
t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code)
}
}

95 changes: 95 additions & 0 deletions module/dlq_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package module

import (
"log/slog"
"net/http"

"github.com/CrisisTextLine/modular"
evstore "github.com/GoCodeAlone/workflow/store"
)

// DLQServiceConfig holds the configuration for the DLQ service module.
type DLQServiceConfig struct {
// MaxRetries is reserved for future implementation of per-entry retry limits.
// It is stored and exposed via MaxRetries() but not yet applied to the DLQ store.
MaxRetries int `yaml:"max_retries" default:"3"`
// RetentionDays is reserved for future implementation of automatic DLQ entry purging.
// It is stored and exposed via RetentionDays() but not yet applied to the DLQ store.
RetentionDays int `yaml:"retention_days" default:"30"`
}

// DLQServiceModule wraps an evstore.DLQHandler as a modular.Module.
// It initializes the in-memory DLQ store and handler, making them available
// in the modular service registry.
type DLQServiceModule struct {
name string
config DLQServiceConfig
store *evstore.InMemoryDLQStore
handler *evstore.DLQHandler
mux *http.ServeMux
}

// NewDLQServiceModule creates a new DLQ service module with the given name and config.
func NewDLQServiceModule(name string, cfg DLQServiceConfig) *DLQServiceModule {
logger := slog.Default()

dlqStore := evstore.NewInMemoryDLQStore()
dlqHandler := evstore.NewDLQHandler(dlqStore, logger)
dlqMux := http.NewServeMux()
dlqHandler.RegisterRoutes(dlqMux)

logger.Info("Created DLQ handler", "module", name)

return &DLQServiceModule{
name: name,
config: cfg,
store: dlqStore,
handler: dlqHandler,
mux: dlqMux,
}
}

// Name implements modular.Module.
func (m *DLQServiceModule) Name() string { return m.name }

// Init implements modular.Module.
func (m *DLQServiceModule) Init(_ modular.Application) error { return nil }

// ProvidesServices implements modular.Module. The DLQ handler mux is registered
// under the module name and also under {name}.admin for admin route delegation.
func (m *DLQServiceModule) ProvidesServices() []modular.ServiceProvider {
return []modular.ServiceProvider{
{
Name: m.name,
Description: "DLQ service: " + m.name,
Instance: m.mux,
},
{
Name: m.name + ".admin",
Description: "DLQ admin handler: " + m.name,
Instance: http.Handler(m.mux),
},
{
Name: m.name + ".store",
Description: "DLQ store: " + m.name,
Instance: m.store,
},
}
}

// RequiresServices implements modular.Module.
func (m *DLQServiceModule) RequiresServices() []modular.ServiceDependency {
return nil
}

// DLQMux returns the HTTP mux for DLQ endpoints.
func (m *DLQServiceModule) DLQMux() http.Handler { return m.mux }

// Store returns the underlying DLQ store.
func (m *DLQServiceModule) Store() *evstore.InMemoryDLQStore { return m.store }

// MaxRetries returns the configured max retry count.
func (m *DLQServiceModule) MaxRetries() int { return m.config.MaxRetries }

// RetentionDays returns the configured retention period.
func (m *DLQServiceModule) RetentionDays() int { return m.config.RetentionDays }
Loading
Loading