From dfe5a3c21b7c9079a1c1ab9efb32d00753825ee5 Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Thu, 26 Feb 2026 22:20:24 -0500 Subject: [PATCH 1/2] feat: dynamic config system with file watcher, database source, and per-module reconfiguration Adds runtime config reloading without engine restart: - ConfigSource interface with file, database, and composite implementations - fsnotify-based file watcher with debounce and hash-based change detection - DatabaseSource with cached refresh and DatabasePoller for periodic polling - Reconfigurable interface for modules that support hot reconfiguration - ConfigReloader coordinator that diffs configs and routes to partial reconfigure or full reload - ReconfigureModules on StdEngine for surgical module updates - PGConfigStore with transactional upsert and version history - Server integration: --watch flag, PUT /api/v1/modules/{name}/config endpoint Co-Authored-By: Claude Opus 4.6 --- cmd/server/main.go | 118 +++++++- config/diff.go | 93 +++++++ config/diff_test.go | 195 ++++++++++++++ config/reloader.go | 97 +++++++ config/reloader_test.go | 297 ++++++++++++++++++++ config/source.go | 43 +++ config/source_composite.go | 99 +++++++ config/source_composite_test.go | 223 +++++++++++++++ config/source_db.go | 119 ++++++++ config/source_db_poller.go | 97 +++++++ config/source_db_poller_test.go | 193 +++++++++++++ config/source_db_test.go | 228 ++++++++++++++++ config/source_file.go | 54 ++++ config/source_file_test.go | 138 ++++++++++ config/watcher.go | 198 ++++++++++++++ config/watcher_test.go | 215 +++++++++++++++ engine.go | 31 +++ engine_reconfig_test.go | 315 ++++++++++++++++++++++ interfaces/reconfigurable.go | 18 ++ store/migrations/010_config_documents.sql | 23 ++ store/pg.go | 5 + store/pg_config.go | 104 +++++++ store/pg_config_test.go | 149 ++++++++++ 23 files changed, 3049 insertions(+), 3 deletions(-) create mode 100644 config/diff.go create mode 100644 config/diff_test.go create mode 100644 config/reloader.go create mode 100644 config/reloader_test.go create mode 100644 config/source.go create mode 100644 config/source_composite.go create mode 100644 config/source_composite_test.go create mode 100644 config/source_db.go create mode 100644 config/source_db_poller.go create mode 100644 config/source_db_poller_test.go create mode 100644 config/source_db_test.go create mode 100644 config/source_file.go create mode 100644 config/source_file_test.go create mode 100644 config/watcher.go create mode 100644 config/watcher_test.go create mode 100644 engine_reconfig_test.go create mode 100644 interfaces/reconfigurable.go create mode 100644 store/migrations/010_config_documents.sql create mode 100644 store/pg_config.go create mode 100644 store/pg_config_test.go diff --git a/cmd/server/main.go b/cmd/server/main.go index 58601df9..e9264b3a 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -102,6 +102,8 @@ var ( // Deprecated: admin UI is now served by the external workflow-plugin-admin binary. // This flag is accepted for backwards compatibility but has no effect. _ = flag.String("admin-ui-dir", "", "Deprecated: admin UI is now served by the external workflow-plugin-admin binary") + + watchConfig = flag.Bool("watch", false, "Watch config file for changes and auto-reload") ) // defaultEnginePlugins returns the standard set of engine plugins used by all engine instances. @@ -342,12 +344,14 @@ type serverApp struct { stores storeComponents mgmt mgmtComponents services serviceComponents + currentConfig *config.WorkflowConfig // last loaded config, used by dynamic config watcher } // setup initializes all server components: engine, AI services, and HTTP mux. func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error) { app := &serverApp{ - logger: logger, + logger: logger, + currentConfig: cfg, } engine, loader, registry, err := buildEngine(cfg, logger) @@ -400,8 +404,9 @@ func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) ( } sApp := &serverApp{ - engine: engine, - logger: logger, + engine: engine, + logger: logger, + currentConfig: combined, } pool := dynamic.NewInterpreterPool() @@ -1220,6 +1225,74 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error { } } + // Config file watcher — started after the engine and all post-start hooks are up. + var reloader *config.ConfigReloader + if *watchConfig && *configFile != "" { + fileSource := config.NewFileSource(*configFile) + + var reloaderErr error + reloader, reloaderErr = config.NewConfigReloader( + app.currentConfig, // the loaded WorkflowConfig + app.reloadEngine, // existing full reload function + app.engine, // implements ModuleReconfigurer + app.logger, + ) + if reloaderErr != nil { + app.logger.Error("Failed to create config reloader", "error", reloaderErr) + } else { + configWatcher := config.NewConfigWatcher(fileSource, func(evt config.ConfigChangeEvent) { + app.logger.Info("Config file changed, reloading", + "source", evt.Source, + "old_hash", evt.OldHash[:8], + "new_hash", evt.NewHash[:8]) + if err := reloader.HandleChange(evt); err != nil { + app.logger.Error("Config reload failed", "error", err) + } + }, config.WithWatchLogger(app.logger)) + + if err := configWatcher.Start(); err != nil { + app.logger.Error("Failed to start config watcher", "error", err) + } else { + defer func() { _ = configWatcher.Stop() }() + app.logger.Info("Config file watcher started", "file", *configFile) + } + } + } + + // Database config poller — activated when a PG store is available. + if app.pgStore != nil { + dbSource := config.NewDatabaseSource( + app.pgStore.ConfigDocs(), + config.WithRefreshInterval(30*time.Second), + ) + + // Reuse or create a reloader for the DB poller. + if reloader == nil { + var reloaderErr error + reloader, reloaderErr = config.NewConfigReloader(app.currentConfig, app.reloadEngine, app.engine, app.logger) + if reloaderErr != nil { + app.logger.Error("Failed to create config reloader for DB poller", "error", reloaderErr) + } + } + + if reloader != nil { + poller := config.NewDatabasePoller(dbSource, 30*time.Second, func(evt config.ConfigChangeEvent) { + app.logger.Info("Database config changed, reloading", + "source", evt.Source) + if err := reloader.HandleChange(evt); err != nil { + app.logger.Error("DB config reload failed", "error", err) + } + }, app.logger) + + if err := poller.Start(ctx); err != nil { + app.logger.Error("DB config poller start failed", "error", err) + } else { + defer poller.Stop() + app.logger.Info("Database config poller started", "interval", "30s") + } + } + } + // Wait for context cancellation <-ctx.Done() @@ -1317,6 +1390,7 @@ func applyEnvOverrides() { "load-workflows": "WORKFLOW_LOAD_WORKFLOWS", "import-bundle": "WORKFLOW_IMPORT_BUNDLE", "license-key": "WORKFLOW_LICENSE_KEY", + "watch": "WORKFLOW_WATCH", } // Track which flags were explicitly set on the command line. @@ -1513,6 +1587,44 @@ func runMultiWorkflow(logger *slog.Logger) error { // 8. Mount API router on the same HTTP mux mux := http.NewServeMux() mux.Handle("/api/v1/", apiRouter) + + // Module reconfiguration endpoint — allows runtime hot-reload of individual + // modules that implement interfaces.Reconfigurable without a full engine restart. + mux.HandleFunc("PUT /api/v1/modules/{name}/config", func(w http.ResponseWriter, r *http.Request) { + moduleName := r.PathValue("name") + if moduleName == "" { + http.Error(w, `{"error":"module name required"}`, http.StatusBadRequest) + return + } + + var newConfig map[string]any + if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil { + http.Error(w, fmt.Sprintf(`{"error":"invalid JSON: %v"}`, err), http.StatusBadRequest) + return + } + + mod := app.engine.GetApp().GetModule(moduleName) + if mod == nil { + http.Error(w, fmt.Sprintf(`{"error":"module %q not found"}`, moduleName), http.StatusNotFound) + return + } + + reconf, ok := mod.(interfaces.Reconfigurable) + if !ok { + http.Error(w, fmt.Sprintf(`{"error":"module %q does not support runtime reconfiguration"}`, moduleName), http.StatusNotImplemented) + return + } + + if err := reconf.Reconfigure(r.Context(), newConfig); err != nil { + http.Error(w, fmt.Sprintf(`{"error":"reconfiguration failed: %v"}`, err), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + resp, _ := json.Marshal(map[string]string{"status": "ok", "module": moduleName}) + w.Write(resp) //nolint:errcheck + }) + mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") _, _ = w.Write([]byte(`{"mode":"multi-workflow","status":"ok"}`)) diff --git a/config/diff.go b/config/diff.go new file mode 100644 index 00000000..b9b58899 --- /dev/null +++ b/config/diff.go @@ -0,0 +1,93 @@ +package config + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + + "gopkg.in/yaml.v3" +) + +// ModuleConfigDiff represents what changed between two configs. +type ModuleConfigDiff struct { + Added []ModuleConfig // modules in new but not old + Removed []ModuleConfig // modules in old but not new + Modified []ModuleConfigChange // modules present in both with different config + Unchanged []string // module names with no config change +} + +// ModuleConfigChange represents a change to a single module's config. +type ModuleConfigChange struct { + Name string + OldConfig map[string]any + NewConfig map[string]any +} + +// DiffModuleConfigs compares two configs and identifies module-level changes. +func DiffModuleConfigs(old, new *WorkflowConfig) *ModuleConfigDiff { + diff := &ModuleConfigDiff{} + + oldMap := make(map[string]ModuleConfig) + for _, m := range old.Modules { + oldMap[m.Name] = m + } + newMap := make(map[string]ModuleConfig) + for _, m := range new.Modules { + newMap[m.Name] = m + } + + for name, newMod := range newMap { + oldMod, exists := oldMap[name] + if !exists { + diff.Added = append(diff.Added, newMod) + continue + } + if hashModuleConfig(oldMod) != hashModuleConfig(newMod) { + diff.Modified = append(diff.Modified, ModuleConfigChange{ + Name: name, + OldConfig: oldMod.Config, + NewConfig: newMod.Config, + }) + } else { + diff.Unchanged = append(diff.Unchanged, name) + } + } + + for name, oldMod := range oldMap { + if _, exists := newMap[name]; !exists { + diff.Removed = append(diff.Removed, oldMod) + } + } + + return diff +} + +// HasNonModuleChanges returns true if workflows, triggers, pipelines, or +// platform config changed between old and new (requiring full reload). +func HasNonModuleChanges(old, new *WorkflowConfig) bool { + return hashAny(old.Workflows) != hashAny(new.Workflows) || + hashAny(old.Triggers) != hashAny(new.Triggers) || + hashAny(old.Pipelines) != hashAny(new.Pipelines) || + hashAny(old.Platform) != hashAny(new.Platform) +} + +func hashModuleConfig(m ModuleConfig) string { + data, err := yaml.Marshal(m) + if err != nil { + return fmt.Sprintf("error:%v", err) + } + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]) +} + +func hashAny(v any) string { + if v == nil { + return "nil" + } + data, err := yaml.Marshal(v) + if err != nil { + return fmt.Sprintf("error:%v", err) + } + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]) +} diff --git a/config/diff_test.go b/config/diff_test.go new file mode 100644 index 00000000..0cb35eca --- /dev/null +++ b/config/diff_test.go @@ -0,0 +1,195 @@ +package config + +import ( + "testing" +) + +func makeConfig(modules []ModuleConfig, workflows, triggers, pipelines map[string]any) *WorkflowConfig { + return &WorkflowConfig{ + Modules: modules, + Workflows: workflows, + Triggers: triggers, + Pipelines: pipelines, + } +} + +func TestDiffModuleConfigs_NoChanges(t *testing.T) { + modules := []ModuleConfig{ + {Name: "alpha", Type: "http.server", Config: map[string]any{"port": 8080}}, + {Name: "beta", Type: "http.router"}, + } + old := makeConfig(modules, nil, nil, nil) + new := makeConfig(modules, nil, nil, nil) + + diff := DiffModuleConfigs(old, new) + if len(diff.Added) != 0 { + t.Errorf("expected 0 added, got %d", len(diff.Added)) + } + if len(diff.Removed) != 0 { + t.Errorf("expected 0 removed, got %d", len(diff.Removed)) + } + if len(diff.Modified) != 0 { + t.Errorf("expected 0 modified, got %d", len(diff.Modified)) + } + if len(diff.Unchanged) != 2 { + t.Errorf("expected 2 unchanged, got %d", len(diff.Unchanged)) + } +} + +func TestDiffModuleConfigs_AddedModule(t *testing.T) { + old := makeConfig([]ModuleConfig{ + {Name: "alpha", Type: "http.server"}, + }, nil, nil, nil) + new := makeConfig([]ModuleConfig{ + {Name: "alpha", Type: "http.server"}, + {Name: "beta", Type: "http.router"}, + }, nil, nil, nil) + + diff := DiffModuleConfigs(old, new) + if len(diff.Added) != 1 { + t.Fatalf("expected 1 added, got %d", len(diff.Added)) + } + if diff.Added[0].Name != "beta" { + t.Errorf("expected added module 'beta', got %q", diff.Added[0].Name) + } + if len(diff.Removed) != 0 { + t.Errorf("expected 0 removed, got %d", len(diff.Removed)) + } + if len(diff.Modified) != 0 { + t.Errorf("expected 0 modified, got %d", len(diff.Modified)) + } +} + +func TestDiffModuleConfigs_RemovedModule(t *testing.T) { + old := makeConfig([]ModuleConfig{ + {Name: "alpha", Type: "http.server"}, + {Name: "beta", Type: "http.router"}, + }, nil, nil, nil) + new := makeConfig([]ModuleConfig{ + {Name: "alpha", Type: "http.server"}, + }, nil, nil, nil) + + diff := DiffModuleConfigs(old, new) + if len(diff.Removed) != 1 { + t.Fatalf("expected 1 removed, got %d", len(diff.Removed)) + } + if diff.Removed[0].Name != "beta" { + t.Errorf("expected removed module 'beta', got %q", diff.Removed[0].Name) + } + if len(diff.Added) != 0 { + t.Errorf("expected 0 added, got %d", len(diff.Added)) + } +} + +func TestDiffModuleConfigs_ModifiedModule(t *testing.T) { + old := makeConfig([]ModuleConfig{ + {Name: "alpha", Type: "http.server", Config: map[string]any{"port": 8080}}, + }, nil, nil, nil) + new := makeConfig([]ModuleConfig{ + {Name: "alpha", Type: "http.server", Config: map[string]any{"port": 9090}}, + }, nil, nil, nil) + + diff := DiffModuleConfigs(old, new) + if len(diff.Modified) != 1 { + t.Fatalf("expected 1 modified, got %d", len(diff.Modified)) + } + change := diff.Modified[0] + if change.Name != "alpha" { + t.Errorf("expected modified module 'alpha', got %q", change.Name) + } + if change.OldConfig["port"] != 8080 { + t.Errorf("expected old port 8080, got %v", change.OldConfig["port"]) + } + if change.NewConfig["port"] != 9090 { + t.Errorf("expected new port 9090, got %v", change.NewConfig["port"]) + } + if len(diff.Added) != 0 { + t.Errorf("expected 0 added, got %d", len(diff.Added)) + } + if len(diff.Removed) != 0 { + t.Errorf("expected 0 removed, got %d", len(diff.Removed)) + } + if len(diff.Unchanged) != 0 { + t.Errorf("expected 0 unchanged, got %d", len(diff.Unchanged)) + } +} + +func TestDiffModuleConfigs_Mixed(t *testing.T) { + old := makeConfig([]ModuleConfig{ + {Name: "keep", Type: "http.server", Config: map[string]any{"port": 8080}}, + {Name: "modify", Type: "cache", Config: map[string]any{"ttl": 60}}, + {Name: "remove", Type: "db"}, + }, nil, nil, nil) + new := makeConfig([]ModuleConfig{ + {Name: "keep", Type: "http.server", Config: map[string]any{"port": 8080}}, + {Name: "modify", Type: "cache", Config: map[string]any{"ttl": 120}}, + {Name: "add", Type: "queue"}, + }, nil, nil, nil) + + diff := DiffModuleConfigs(old, new) + + if len(diff.Added) != 1 || diff.Added[0].Name != "add" { + t.Errorf("expected 1 added ('add'), got %v", diff.Added) + } + if len(diff.Removed) != 1 || diff.Removed[0].Name != "remove" { + t.Errorf("expected 1 removed ('remove'), got %v", diff.Removed) + } + if len(diff.Modified) != 1 || diff.Modified[0].Name != "modify" { + t.Errorf("expected 1 modified ('modify'), got %v", diff.Modified) + } + if len(diff.Unchanged) != 1 || diff.Unchanged[0] != "keep" { + t.Errorf("expected 1 unchanged ('keep'), got %v", diff.Unchanged) + } +} + +func TestHasNonModuleChanges_NoChanges(t *testing.T) { + wf := map[string]any{"flow1": map[string]any{"initial": "start"}} + tr := map[string]any{"t1": map[string]any{"type": "http"}} + pl := map[string]any{"p1": map[string]any{"steps": []any{}}} + + old := &WorkflowConfig{Workflows: wf, Triggers: tr, Pipelines: pl} + new := &WorkflowConfig{Workflows: wf, Triggers: tr, Pipelines: pl} + + if HasNonModuleChanges(old, new) { + t.Error("expected no non-module changes for identical configs") + } +} + +func TestHasNonModuleChanges_WorkflowChanged(t *testing.T) { + old := &WorkflowConfig{ + Workflows: map[string]any{"flow1": map[string]any{"initial": "start"}}, + } + new := &WorkflowConfig{ + Workflows: map[string]any{"flow1": map[string]any{"initial": "running"}}, + } + + if !HasNonModuleChanges(old, new) { + t.Error("expected non-module changes when workflow differs") + } +} + +func TestHasNonModuleChanges_TriggerChanged(t *testing.T) { + old := &WorkflowConfig{ + Triggers: map[string]any{"t1": map[string]any{"type": "http", "path": "/old"}}, + } + new := &WorkflowConfig{ + Triggers: map[string]any{"t1": map[string]any{"type": "http", "path": "/new"}}, + } + + if !HasNonModuleChanges(old, new) { + t.Error("expected non-module changes when trigger differs") + } +} + +func TestHasNonModuleChanges_PipelineChanged(t *testing.T) { + old := &WorkflowConfig{ + Pipelines: map[string]any{"p1": map[string]any{"steps": []any{"a"}}}, + } + new := &WorkflowConfig{ + Pipelines: map[string]any{"p1": map[string]any{"steps": []any{"a", "b"}}}, + } + + if !HasNonModuleChanges(old, new) { + t.Error("expected non-module changes when pipeline differs") + } +} diff --git a/config/reloader.go b/config/reloader.go new file mode 100644 index 00000000..61c4686a --- /dev/null +++ b/config/reloader.go @@ -0,0 +1,97 @@ +package config + +import ( + "context" + "log/slog" + "sync" +) + +// ModuleReconfigurer is implemented by the engine to support partial (per-module) reloads. +// When a config change only affects module configs, the engine can apply changes surgically +// rather than performing a full stop/rebuild/start cycle. +type ModuleReconfigurer interface { + // ReconfigureModules applies new configuration to specific running modules. + // Returns the names of any modules that could not be reconfigured in-place + // (requiring a full reload) and any hard error. + ReconfigureModules(ctx context.Context, changes []ModuleConfigChange) (failedModules []string, err error) +} + +// ConfigReloader coordinates config change detection and engine reload decisions. +// It diffs old and new configs, performs partial per-module reconfiguration when +// possible, and falls back to a full reload when non-module sections change or +// modules are added/removed/non-reconfigurable. +type ConfigReloader struct { + mu sync.Mutex + current *WorkflowConfig + currentHash string + logger *slog.Logger + + fullReloadFn func(*WorkflowConfig) error + reconfigurer ModuleReconfigurer +} + +// NewConfigReloader creates a ConfigReloader with the given initial config. +// fullReloadFn is called when a full engine restart is required. +// reconfigurer is optional; if nil, all module changes fall back to fullReloadFn. +func NewConfigReloader( + initial *WorkflowConfig, + fullReloadFn func(*WorkflowConfig) error, + reconfigurer ModuleReconfigurer, + logger *slog.Logger, +) (*ConfigReloader, error) { + hash, err := HashConfig(initial) + if err != nil { + return nil, err + } + return &ConfigReloader{ + current: initial, + currentHash: hash, + logger: logger, + fullReloadFn: fullReloadFn, + reconfigurer: reconfigurer, + }, nil +} + +// HandleChange processes a config change event. It diffs the old and new configs, +// attempts per-module reconfiguration for module-only changes, and falls back +// to a full reload when necessary. +func (r *ConfigReloader) HandleChange(evt ConfigChangeEvent) error { + r.mu.Lock() + defer r.mu.Unlock() + + diff := DiffModuleConfigs(r.current, evt.Config) + + // Non-module sections changed, or modules were added/removed — full reload required. + if HasNonModuleChanges(r.current, evt.Config) || + len(diff.Added) > 0 || len(diff.Removed) > 0 { + r.logger.Info("non-module changes detected, performing full reload", + "added", len(diff.Added), "removed", len(diff.Removed)) + if err := r.fullReloadFn(evt.Config); err != nil { + return err + } + r.current = evt.Config + r.currentHash = evt.NewHash + return nil + } + + // Only module config changes — try partial reconfiguration. + if len(diff.Modified) > 0 && r.reconfigurer != nil { + failed, err := r.reconfigurer.ReconfigureModules(context.Background(), diff.Modified) + if err != nil { + return err + } + if len(failed) > 0 { + r.logger.Info("some modules cannot be reconfigured in-place, performing full reload", + "modules", failed) + if err := r.fullReloadFn(evt.Config); err != nil { + return err + } + } + r.current = evt.Config + r.currentHash = evt.NewHash + return nil + } + + r.logger.Debug("config change detected but no effective differences") + return nil +} diff --git a/config/reloader_test.go b/config/reloader_test.go new file mode 100644 index 00000000..11c8fb6c --- /dev/null +++ b/config/reloader_test.go @@ -0,0 +1,297 @@ +package config + +import ( + "context" + "errors" + "log/slog" + "os" + "testing" + "time" +) + +// mockReconfigurer is a test double for ModuleReconfigurer. +type mockReconfigurer struct { + called [][]ModuleConfigChange + failed []string + err error +} + +func (m *mockReconfigurer) ReconfigureModules(_ context.Context, changes []ModuleConfigChange) ([]string, error) { + m.called = append(m.called, changes) + return m.failed, m.err +} + +func newTestReloader(t *testing.T, initial *WorkflowConfig, fullFn func(*WorkflowConfig) error, rec ModuleReconfigurer) *ConfigReloader { + t.Helper() + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})) + r, err := NewConfigReloader(initial, fullFn, rec, logger) + if err != nil { + t.Fatalf("NewConfigReloader: %v", err) + } + return r +} + +func makeWorkflowConfig(modules []ModuleConfig, workflows map[string]any) *WorkflowConfig { + return &WorkflowConfig{ + Modules: modules, + Workflows: workflows, + Triggers: make(map[string]any), + Pipelines: make(map[string]any), + } +} + +func makeChangeEvent(cfg *WorkflowConfig) (ConfigChangeEvent, error) { + hash, err := HashConfig(cfg) + if err != nil { + return ConfigChangeEvent{}, err + } + return ConfigChangeEvent{ + Source: "test", + OldHash: "oldhash", + NewHash: hash, + Config: cfg, + Time: time.Now(), + }, nil +} + +func TestConfigReloader_FullReload_NonModuleChanges(t *testing.T) { + initial := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "http.server"}}, + map[string]any{"flow1": map[string]any{"initial": "start"}}, + ) + + var fullReloadCalled int + var lastFullConfig *WorkflowConfig + + fullFn := func(cfg *WorkflowConfig) error { + fullReloadCalled++ + lastFullConfig = cfg + return nil + } + + r := newTestReloader(t, initial, fullFn, nil) + + // New config: same modules, different workflow. + newCfg := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "http.server"}}, + map[string]any{"flow1": map[string]any{"initial": "running"}}, + ) + evt, err := makeChangeEvent(newCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + if err := r.HandleChange(evt); err != nil { + t.Fatalf("HandleChange: %v", err) + } + + if fullReloadCalled != 1 { + t.Errorf("expected 1 full reload, got %d", fullReloadCalled) + } + if lastFullConfig != newCfg { + t.Error("expected full reload called with new config") + } +} + +func TestConfigReloader_FullReload_AddedModule(t *testing.T) { + initial := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "http.server"}}, + nil, + ) + + var fullReloadCalled int + fullFn := func(cfg *WorkflowConfig) error { + fullReloadCalled++ + return nil + } + + r := newTestReloader(t, initial, fullFn, nil) + + newCfg := makeWorkflowConfig( + []ModuleConfig{ + {Name: "alpha", Type: "http.server"}, + {Name: "beta", Type: "http.router"}, + }, + nil, + ) + evt, err := makeChangeEvent(newCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + if err := r.HandleChange(evt); err != nil { + t.Fatalf("HandleChange: %v", err) + } + if fullReloadCalled != 1 { + t.Errorf("expected 1 full reload for added module, got %d", fullReloadCalled) + } +} + +func TestConfigReloader_FullReload_RemovedModule(t *testing.T) { + initial := makeWorkflowConfig( + []ModuleConfig{ + {Name: "alpha", Type: "http.server"}, + {Name: "beta", Type: "http.router"}, + }, + nil, + ) + + var fullReloadCalled int + fullFn := func(cfg *WorkflowConfig) error { + fullReloadCalled++ + return nil + } + + r := newTestReloader(t, initial, fullFn, nil) + + newCfg := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "http.server"}}, + nil, + ) + evt, err := makeChangeEvent(newCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + if err := r.HandleChange(evt); err != nil { + t.Fatalf("HandleChange: %v", err) + } + if fullReloadCalled != 1 { + t.Errorf("expected 1 full reload for removed module, got %d", fullReloadCalled) + } +} + +func TestConfigReloader_PartialReconfigure(t *testing.T) { + initial := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 60}}}, + nil, + ) + + var fullReloadCalled int + fullFn := func(cfg *WorkflowConfig) error { + fullReloadCalled++ + return nil + } + + rec := &mockReconfigurer{} + r := newTestReloader(t, initial, fullFn, rec) + + newCfg := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 120}}}, + nil, + ) + evt, err := makeChangeEvent(newCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + if err := r.HandleChange(evt); err != nil { + t.Fatalf("HandleChange: %v", err) + } + + if fullReloadCalled != 0 { + t.Errorf("expected 0 full reloads for module-only change, got %d", fullReloadCalled) + } + if len(rec.called) != 1 { + t.Fatalf("expected ReconfigureModules called once, got %d", len(rec.called)) + } + if len(rec.called[0]) != 1 || rec.called[0][0].Name != "alpha" { + t.Errorf("expected ReconfigureModules called with 'alpha', got %v", rec.called[0]) + } +} + +func TestConfigReloader_FallbackToFull(t *testing.T) { + initial := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "db", Config: map[string]any{"dsn": "old"}}}, + nil, + ) + + var fullReloadCalled int + fullFn := func(cfg *WorkflowConfig) error { + fullReloadCalled++ + return nil + } + + // Reconfigurer reports alpha as failed. + rec := &mockReconfigurer{failed: []string{"alpha"}} + r := newTestReloader(t, initial, fullFn, rec) + + newCfg := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "db", Config: map[string]any{"dsn": "new"}}}, + nil, + ) + evt, err := makeChangeEvent(newCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + if err := r.HandleChange(evt); err != nil { + t.Fatalf("HandleChange: %v", err) + } + + if len(rec.called) != 1 { + t.Errorf("expected ReconfigureModules called once, got %d", len(rec.called)) + } + if fullReloadCalled != 1 { + t.Errorf("expected 1 full reload after fallback, got %d", fullReloadCalled) + } +} + +func TestConfigReloader_NoEffectiveChanges(t *testing.T) { + initial := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "http.server", Config: map[string]any{"port": 8080}}}, + nil, + ) + + var fullReloadCalled int + fullFn := func(cfg *WorkflowConfig) error { + fullReloadCalled++ + return nil + } + + rec := &mockReconfigurer{} + r := newTestReloader(t, initial, fullFn, rec) + + // Identical config. + sameCfg := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "http.server", Config: map[string]any{"port": 8080}}}, + nil, + ) + evt, err := makeChangeEvent(sameCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + if err := r.HandleChange(evt); err != nil { + t.Fatalf("HandleChange: %v", err) + } + + if fullReloadCalled != 0 { + t.Errorf("expected 0 full reloads for identical config, got %d", fullReloadCalled) + } + if len(rec.called) != 0 { + t.Errorf("expected 0 ReconfigureModules calls for identical config, got %d", len(rec.called)) + } +} + +func TestConfigReloader_FullReloadError(t *testing.T) { + initial := makeWorkflowConfig(nil, nil) + sentinel := errors.New("reload failed") + + fullFn := func(cfg *WorkflowConfig) error { + return sentinel + } + + r := newTestReloader(t, initial, fullFn, nil) + + newCfg := makeWorkflowConfig(nil, map[string]any{"flow1": "new"}) + evt, err := makeChangeEvent(newCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + err = r.HandleChange(evt) + if !errors.Is(err, sentinel) { + t.Errorf("expected sentinel error from HandleChange, got %v", err) + } +} diff --git a/config/source.go b/config/source.go new file mode 100644 index 00000000..adff8c3d --- /dev/null +++ b/config/source.go @@ -0,0 +1,43 @@ +package config + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "time" + + "gopkg.in/yaml.v3" +) + +// ConfigSource provides configuration from an arbitrary backend. +// Implementations must be safe for concurrent use. +type ConfigSource interface { + // Load retrieves the current configuration. + Load(ctx context.Context) (*WorkflowConfig, error) + + // Hash returns a content-addressable hash of the current config. + // Used for change detection without full deserialization. + Hash(ctx context.Context) (string, error) + + // Name returns a human-readable identifier for this source. + Name() string +} + +// ConfigChangeEvent is emitted when a ConfigSource detects a change. +type ConfigChangeEvent struct { + Source string + OldHash string + NewHash string + Config *WorkflowConfig + Time time.Time +} + +// HashConfig returns the SHA256 hex digest of the YAML-serialized config. +func HashConfig(cfg *WorkflowConfig) (string, error) { + data, err := yaml.Marshal(cfg) + if err != nil { + return "", err + } + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]), nil +} diff --git a/config/source_composite.go b/config/source_composite.go new file mode 100644 index 00000000..66848110 --- /dev/null +++ b/config/source_composite.go @@ -0,0 +1,99 @@ +package config + +import ( + "context" + "fmt" +) + +// CompositeSource layers multiple ConfigSources. Later sources override earlier ones. +// Module-level overrides are applied by name; map keys (workflows, triggers, +// pipelines, platform) from later sources replace or add to those from earlier ones. +type CompositeSource struct { + sources []ConfigSource +} + +// NewCompositeSource creates a CompositeSource from the given sources. +// Sources are applied in order: sources[0] is the base, each subsequent source +// overlays on top of the result. +func NewCompositeSource(sources ...ConfigSource) *CompositeSource { + return &CompositeSource{sources: sources} +} + +// Load loads all sources and merges them into a single WorkflowConfig. +func (s *CompositeSource) Load(ctx context.Context) (*WorkflowConfig, error) { + if len(s.sources) == 0 { + return nil, fmt.Errorf("composite source: no sources configured") + } + base, err := s.sources[0].Load(ctx) + if err != nil { + return nil, err + } + for _, src := range s.sources[1:] { + overlay, err := src.Load(ctx) + if err != nil { + return nil, fmt.Errorf("composite source %s: %w", src.Name(), err) + } + mergeOverlay(base, overlay) + } + return base, nil +} + +// Hash loads the merged config and returns its hash. +func (s *CompositeSource) Hash(ctx context.Context) (string, error) { + cfg, err := s.Load(ctx) + if err != nil { + return "", err + } + return HashConfig(cfg) +} + +// Name returns a human-readable identifier for this source. +func (s *CompositeSource) Name() string { return "composite" } + +// mergeOverlay applies overlay's configuration on top of base in place. +// Modules: overlay modules replace base modules with the same name; new names are appended. +// Workflows/Triggers/Pipelines/Platform: overlay keys replace or add to base keys. +func mergeOverlay(base, overlay *WorkflowConfig) { + if overlay == nil { + return + } + + // Replace or append modules by name. + existing := make(map[string]int, len(base.Modules)) + for i, m := range base.Modules { + existing[m.Name] = i + } + for _, m := range overlay.Modules { + if idx, ok := existing[m.Name]; ok { + base.Modules[idx] = m + } else { + base.Modules = append(base.Modules, m) + } + } + + // Merge map sections. + for k, v := range overlay.Workflows { + if base.Workflows == nil { + base.Workflows = make(map[string]any) + } + base.Workflows[k] = v + } + for k, v := range overlay.Triggers { + if base.Triggers == nil { + base.Triggers = make(map[string]any) + } + base.Triggers[k] = v + } + for k, v := range overlay.Pipelines { + if base.Pipelines == nil { + base.Pipelines = make(map[string]any) + } + base.Pipelines[k] = v + } + for k, v := range overlay.Platform { + if base.Platform == nil { + base.Platform = make(map[string]any) + } + base.Platform[k] = v + } +} diff --git a/config/source_composite_test.go b/config/source_composite_test.go new file mode 100644 index 00000000..5765bb04 --- /dev/null +++ b/config/source_composite_test.go @@ -0,0 +1,223 @@ +package config + +import ( + "context" + "os" + "path/filepath" + "testing" +) + +const compositeBaseYAML = ` +modules: + - name: server + type: http.server + config: + port: 8080 + - name: router + type: http.router +workflows: + flow1: + initial: start +triggers: + t1: + type: http +` + +const compositeOverlayYAML = ` +modules: + - name: server + type: http.server + config: + port: 9090 +` + +const compositeNewModuleYAML = ` +modules: + - name: cache + type: redis + config: + addr: localhost:6379 +` + +func writeConfigFile(t *testing.T, dir, name, content string) string { + t.Helper() + fp := filepath.Join(dir, name) + if err := os.WriteFile(fp, []byte(content), 0644); err != nil { + t.Fatalf("write %s: %v", name, err) + } + return fp +} + +func TestCompositeSource_MergeModules(t *testing.T) { + dir := t.TempDir() + base := writeConfigFile(t, dir, "base.yaml", compositeBaseYAML) + overlay := writeConfigFile(t, dir, "overlay.yaml", compositeOverlayYAML) + + cs := NewCompositeSource(NewFileSource(base), NewFileSource(overlay)) + cfg, err := cs.Load(context.Background()) + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + // 'server' module should be replaced by overlay (port 9090), 'router' stays. + if len(cfg.Modules) != 2 { + t.Fatalf("expected 2 modules after merge, got %d", len(cfg.Modules)) + } + + serverFound := false + for _, m := range cfg.Modules { + if m.Name == "server" { + serverFound = true + port := m.Config["port"] + if port != 9090 { + t.Errorf("expected overlaid server port 9090, got %v", port) + } + } + } + if !serverFound { + t.Error("expected 'server' module in merged config") + } +} + +func TestCompositeSource_AddModules(t *testing.T) { + dir := t.TempDir() + base := writeConfigFile(t, dir, "base.yaml", compositeBaseYAML) + overlay := writeConfigFile(t, dir, "new_mod.yaml", compositeNewModuleYAML) + + cs := NewCompositeSource(NewFileSource(base), NewFileSource(overlay)) + cfg, err := cs.Load(context.Background()) + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + // base has 2 modules, overlay adds 1 new one → 3 total. + if len(cfg.Modules) != 3 { + t.Fatalf("expected 3 modules after overlay add, got %d", len(cfg.Modules)) + } + + cacheFound := false + for _, m := range cfg.Modules { + if m.Name == "cache" { + cacheFound = true + if m.Type != "redis" { + t.Errorf("expected cache type 'redis', got %q", m.Type) + } + } + } + if !cacheFound { + t.Error("expected 'cache' module in merged config") + } +} + +func TestCompositeSource_MergeWorkflows(t *testing.T) { + dir := t.TempDir() + + base := writeConfigFile(t, dir, "base.yaml", ` +modules: [] +workflows: + flow1: + initial: start +triggers: {} +`) + overlay := writeConfigFile(t, dir, "overlay.yaml", ` +modules: [] +workflows: + flow2: + initial: running +triggers: {} +`) + + cs := NewCompositeSource(NewFileSource(base), NewFileSource(overlay)) + cfg, err := cs.Load(context.Background()) + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + if cfg.Workflows["flow1"] == nil { + t.Error("expected 'flow1' from base to be present") + } + if cfg.Workflows["flow2"] == nil { + t.Error("expected 'flow2' from overlay to be present") + } +} + +func TestCompositeSource_NoSources(t *testing.T) { + cs := NewCompositeSource() + _, err := cs.Load(context.Background()) + if err == nil { + t.Fatal("expected error for empty sources") + } +} + +func TestCompositeSource_Name(t *testing.T) { + cs := NewCompositeSource() + if cs.Name() != "composite" { + t.Errorf("expected name 'composite', got %q", cs.Name()) + } +} + +func TestCompositeSource_Hash(t *testing.T) { + dir := t.TempDir() + base := writeConfigFile(t, dir, "base.yaml", compositeBaseYAML) + + cs := NewCompositeSource(NewFileSource(base)) + ctx := context.Background() + + h1, err := cs.Hash(ctx) + if err != nil { + t.Fatalf("Hash() error: %v", err) + } + if h1 == "" { + t.Fatal("expected non-empty hash") + } + + // Same content → same hash. + h2, err := cs.Hash(ctx) + if err != nil { + t.Fatalf("Hash() second call error: %v", err) + } + if h1 != h2 { + t.Errorf("expected stable hashes, got %q and %q", h1, h2) + } +} + +func TestCompositeSource_OverlayTriggersAndPipelines(t *testing.T) { + dir := t.TempDir() + base := writeConfigFile(t, dir, "base.yaml", ` +modules: [] +triggers: + t1: + type: http +pipelines: + p1: + steps: [] +`) + overlay := writeConfigFile(t, dir, "overlay.yaml", ` +modules: [] +triggers: + t2: + type: cron +pipelines: + p2: + steps: [] +`) + + cs := NewCompositeSource(NewFileSource(base), NewFileSource(overlay)) + cfg, err := cs.Load(context.Background()) + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + if cfg.Triggers["t1"] == nil { + t.Error("expected 't1' trigger from base") + } + if cfg.Triggers["t2"] == nil { + t.Error("expected 't2' trigger from overlay") + } + if cfg.Pipelines["p1"] == nil { + t.Error("expected 'p1' pipeline from base") + } + if cfg.Pipelines["p2"] == nil { + t.Error("expected 'p2' pipeline from overlay") + } +} diff --git a/config/source_db.go b/config/source_db.go new file mode 100644 index 00000000..2755db99 --- /dev/null +++ b/config/source_db.go @@ -0,0 +1,119 @@ +package config + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "sync" + "time" + + "gopkg.in/yaml.v3" +) + +// DBConfigStore is the database interface needed by DatabaseSource. +type DBConfigStore interface { + GetConfigDocument(ctx context.Context, key string) ([]byte, error) + GetConfigDocumentHash(ctx context.Context, key string) (string, error) + PutConfigDocument(ctx context.Context, key string, data []byte) error +} + +// DatabaseSource loads config from a database with caching. +type DatabaseSource struct { + store DBConfigStore + key string + refreshInterval time.Duration + + mu sync.RWMutex + cached *WorkflowConfig + cachedHash string + cachedAt time.Time +} + +// DatabaseSourceOption configures a DatabaseSource. +type DatabaseSourceOption func(*DatabaseSource) + +// WithRefreshInterval sets the cache TTL for the DatabaseSource. +func WithRefreshInterval(d time.Duration) DatabaseSourceOption { + return func(s *DatabaseSource) { s.refreshInterval = d } +} + +// WithConfigKey sets the document key used to look up config in the database. +func WithConfigKey(key string) DatabaseSourceOption { + return func(s *DatabaseSource) { s.key = key } +} + +// NewDatabaseSource creates a DatabaseSource backed by the given store. +func NewDatabaseSource(store DBConfigStore, opts ...DatabaseSourceOption) *DatabaseSource { + s := &DatabaseSource{ + store: store, + key: "default", + refreshInterval: 30 * time.Second, + } + for _, opt := range opts { + opt(s) + } + return s +} + +// Load retrieves the current configuration, returning a cached copy if still +// within the refresh interval. +func (s *DatabaseSource) Load(ctx context.Context) (*WorkflowConfig, error) { + s.mu.RLock() + if s.cached != nil && time.Since(s.cachedAt) < s.refreshInterval { + cfg := s.cached + s.mu.RUnlock() + return cfg, nil + } + s.mu.RUnlock() + return s.refresh(ctx) +} + +func (s *DatabaseSource) refresh(ctx context.Context) (*WorkflowConfig, error) { + s.mu.Lock() + defer s.mu.Unlock() + + // Double-check after acquiring write lock. + if s.cached != nil && time.Since(s.cachedAt) < s.refreshInterval { + return s.cached, nil + } + + data, err := s.store.GetConfigDocument(ctx, s.key) + if err != nil { + return nil, fmt.Errorf("db source: get config %q: %w", s.key, err) + } + + var cfg WorkflowConfig + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, fmt.Errorf("db source: parse config %q: %w", s.key, err) + } + + sum := sha256.Sum256(data) + s.cached = &cfg + s.cachedHash = hex.EncodeToString(sum[:]) + s.cachedAt = time.Now() + + return &cfg, nil +} + +// Hash returns the SHA256 hex digest of the stored config bytes. It first +// tries the fast path of fetching the pre-computed hash from the database, +// and falls back to loading the full document if that fails. +func (s *DatabaseSource) Hash(ctx context.Context) (string, error) { + hash, err := s.store.GetConfigDocumentHash(ctx, s.key) + if err == nil { + return hash, nil + } + // Fallback: load and hash. + if _, loadErr := s.Load(ctx); loadErr != nil { + return "", loadErr + } + s.mu.RLock() + defer s.mu.RUnlock() + return s.cachedHash, nil +} + +// Name returns a human-readable identifier for this source. +func (s *DatabaseSource) Name() string { + return fmt.Sprintf("database:%s", s.key) +} diff --git a/config/source_db_poller.go b/config/source_db_poller.go new file mode 100644 index 00000000..d6b8e021 --- /dev/null +++ b/config/source_db_poller.go @@ -0,0 +1,97 @@ +package config + +import ( + "context" + "fmt" + "log/slog" + "sync" + "time" +) + +// DatabasePoller periodically checks a DatabaseSource for config changes. +type DatabasePoller struct { + source *DatabaseSource + interval time.Duration + onChange func(ConfigChangeEvent) + logger *slog.Logger + lastHash string + + done chan struct{} + wg sync.WaitGroup +} + +// NewDatabasePoller creates a DatabasePoller that calls onChange whenever the +// config stored in source changes. +func NewDatabasePoller(source *DatabaseSource, interval time.Duration, onChange func(ConfigChangeEvent), logger *slog.Logger) *DatabasePoller { + return &DatabasePoller{ + source: source, + interval: interval, + onChange: onChange, + logger: logger, + done: make(chan struct{}), + } +} + +// Start fetches the initial hash and launches the background polling goroutine. +func (p *DatabasePoller) Start(ctx context.Context) error { + hash, err := p.source.Hash(ctx) + if err != nil { + return fmt.Errorf("db poller: initial hash: %w", err) + } + p.lastHash = hash + + p.wg.Add(1) + go p.loop(ctx) + return nil +} + +// Stop signals the polling goroutine to exit and waits for it to finish. +func (p *DatabasePoller) Stop() { + close(p.done) + p.wg.Wait() +} + +func (p *DatabasePoller) loop(ctx context.Context) { + defer p.wg.Done() + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + + for { + select { + case <-p.done: + return + case <-ctx.Done(): + return + case <-ticker.C: + p.checkForChanges(ctx) + } + } +} + +func (p *DatabasePoller) checkForChanges(ctx context.Context) { + hash, err := p.source.Hash(ctx) + if err != nil { + p.logger.Error("DB config poll failed", "error", err) + return + } + if hash == p.lastHash { + return + } + + cfg, err := p.source.Load(ctx) + if err != nil { + p.logger.Error("DB config load failed", "error", err) + return + } + + oldHash := p.lastHash + p.lastHash = hash + + p.onChange(ConfigChangeEvent{ + Source: p.source.Name(), + OldHash: oldHash, + NewHash: hash, + Config: cfg, + Time: time.Now(), + }) +} diff --git a/config/source_db_poller_test.go b/config/source_db_poller_test.go new file mode 100644 index 00000000..ad879ca8 --- /dev/null +++ b/config/source_db_poller_test.go @@ -0,0 +1,193 @@ +package config + +import ( + "context" + "log/slog" + "sync" + "sync/atomic" + "testing" + "time" +) + +const altConfigYAML = ` +modules: + - name: cache + type: redis.cache + config: + addr: localhost:6379 +` + +func newTestPoller(store *mockDBStore, interval time.Duration, onChange func(ConfigChangeEvent)) *DatabasePoller { + src := NewDatabaseSource(store, + WithRefreshInterval(0), // disable caching so changes are seen immediately + ) + return NewDatabasePoller(src, interval, onChange, slog.Default()) +} + +func TestDatabasePoller_DetectsChange(t *testing.T) { + store := newMockDBStore() + store.set("default", []byte(testConfigYAML)) + + var ( + mu sync.Mutex + events []ConfigChangeEvent + ) + onChange := func(e ConfigChangeEvent) { + mu.Lock() + events = append(events, e) + mu.Unlock() + } + + poller := newTestPoller(store, 20*time.Millisecond, onChange) + ctx := context.Background() + if err := poller.Start(ctx); err != nil { + t.Fatalf("Start failed: %v", err) + } + defer poller.Stop() + + // Update the store content after a short delay. + time.Sleep(10 * time.Millisecond) + store.set("default", []byte(altConfigYAML)) + + // Wait long enough for at least one poll to detect the change. + deadline := time.Now().Add(500 * time.Millisecond) + for time.Now().Before(deadline) { + mu.Lock() + n := len(events) + mu.Unlock() + if n > 0 { + break + } + time.Sleep(5 * time.Millisecond) + } + + mu.Lock() + defer mu.Unlock() + if len(events) == 0 { + t.Fatal("expected onChange to be called after config change, but it was not") + } + e := events[0] + if e.Source != "database:default" { + t.Errorf("unexpected source: %q", e.Source) + } + if e.OldHash == e.NewHash { + t.Error("expected OldHash != NewHash after change") + } + if e.Config == nil { + t.Error("expected non-nil Config in event") + } +} + +func TestDatabasePoller_SkipsUnchanged(t *testing.T) { + store := newMockDBStore() + store.set("default", []byte(testConfigYAML)) + + var called atomic.Int32 + onChange := func(ConfigChangeEvent) { called.Add(1) } + + poller := newTestPoller(store, 20*time.Millisecond, onChange) + ctx := context.Background() + if err := poller.Start(ctx); err != nil { + t.Fatalf("Start failed: %v", err) + } + + // Let several poll ticks run without changing the config. + time.Sleep(100 * time.Millisecond) + poller.Stop() + + if n := called.Load(); n != 0 { + t.Errorf("onChange called %d times for unchanged config, expected 0", n) + } +} + +func TestDatabasePoller_Stop(t *testing.T) { + store := newMockDBStore() + store.set("default", []byte(testConfigYAML)) + + poller := newTestPoller(store, 10*time.Millisecond, func(ConfigChangeEvent) {}) + ctx := context.Background() + if err := poller.Start(ctx); err != nil { + t.Fatalf("Start failed: %v", err) + } + + // Stop should return promptly (within a reasonable timeout). + done := make(chan struct{}) + go func() { + poller.Stop() + close(done) + }() + + select { + case <-done: + // success + case <-time.After(2 * time.Second): + t.Fatal("Stop did not return within 2 seconds") + } +} + +func TestDatabasePoller_ContextCancel(t *testing.T) { + store := newMockDBStore() + store.set("default", []byte(testConfigYAML)) + + poller := newTestPoller(store, 10*time.Millisecond, func(ConfigChangeEvent) {}) + ctx, cancel := context.WithCancel(context.Background()) + if err := poller.Start(ctx); err != nil { + t.Fatalf("Start failed: %v", err) + } + + cancel() + + // After context cancellation the goroutine should stop; Stop() must still + // be safe to call (close(done) only once). + done := make(chan struct{}) + go func() { + poller.Stop() + close(done) + }() + + select { + case <-done: + // success + case <-time.After(2 * time.Second): + t.Fatal("Stop did not return within 2 seconds after context cancel") + } +} + +func TestDatabasePoller_MultipleChanges(t *testing.T) { + store := newMockDBStore() + store.set("default", []byte(testConfigYAML)) + + var ( + mu sync.Mutex + events []ConfigChangeEvent + ) + onChange := func(e ConfigChangeEvent) { + mu.Lock() + events = append(events, e) + mu.Unlock() + } + + poller := newTestPoller(store, 20*time.Millisecond, onChange) + ctx := context.Background() + if err := poller.Start(ctx); err != nil { + t.Fatalf("Start failed: %v", err) + } + defer poller.Stop() + + // Apply two successive changes. + configs := []string{altConfigYAML, testConfigYAML} + for _, c := range configs { + time.Sleep(40 * time.Millisecond) + store.set("default", []byte(c)) + // Wait for detection. + time.Sleep(60 * time.Millisecond) + } + + mu.Lock() + n := len(events) + mu.Unlock() + + if n < 2 { + t.Errorf("expected at least 2 change events, got %d", n) + } +} diff --git a/config/source_db_test.go b/config/source_db_test.go new file mode 100644 index 00000000..b4fc762f --- /dev/null +++ b/config/source_db_test.go @@ -0,0 +1,228 @@ +package config + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "sync" + "testing" + "time" +) + +// mockDBStore is an in-memory implementation of DBConfigStore for testing. +type mockDBStore struct { + mu sync.Mutex + docs map[string][]byte + calls int // tracks number of GetConfigDocument calls +} + +func newMockDBStore() *mockDBStore { + return &mockDBStore{docs: make(map[string][]byte)} +} + +func (m *mockDBStore) GetConfigDocument(ctx context.Context, key string) ([]byte, error) { + m.mu.Lock() + defer m.mu.Unlock() + m.calls++ + data, ok := m.docs[key] + if !ok { + return nil, fmt.Errorf("not found: %s", key) + } + return data, nil +} + +func (m *mockDBStore) GetConfigDocumentHash(ctx context.Context, key string) (string, error) { + m.mu.Lock() + defer m.mu.Unlock() + data, ok := m.docs[key] + if !ok { + return "", fmt.Errorf("not found: %s", key) + } + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]), nil +} + +func (m *mockDBStore) PutConfigDocument(ctx context.Context, key string, data []byte) error { + m.mu.Lock() + defer m.mu.Unlock() + m.docs[key] = data + return nil +} + +func (m *mockDBStore) set(key string, data []byte) { + m.mu.Lock() + defer m.mu.Unlock() + m.docs[key] = data +} + +func (m *mockDBStore) getCalls() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.calls +} + +const testConfigYAML = ` +modules: + - name: server + type: http.server + config: + port: 8080 + - name: router + type: http.router +` + +func TestDatabaseSource_Load(t *testing.T) { + store := newMockDBStore() + store.set("default", []byte(testConfigYAML)) + + src := NewDatabaseSource(store) + cfg, err := src.Load(context.Background()) + if err != nil { + t.Fatalf("Load failed: %v", err) + } + if len(cfg.Modules) != 2 { + t.Errorf("expected 2 modules, got %d", len(cfg.Modules)) + } + if cfg.Modules[0].Name != "server" { + t.Errorf("expected module name 'server', got %q", cfg.Modules[0].Name) + } + if cfg.Modules[1].Name != "router" { + t.Errorf("expected module name 'router', got %q", cfg.Modules[1].Name) + } +} + +func TestDatabaseSource_Load_CustomKey(t *testing.T) { + store := newMockDBStore() + store.set("prod", []byte(testConfigYAML)) + + src := NewDatabaseSource(store, WithConfigKey("prod")) + cfg, err := src.Load(context.Background()) + if err != nil { + t.Fatalf("Load failed: %v", err) + } + if len(cfg.Modules) != 2 { + t.Errorf("expected 2 modules, got %d", len(cfg.Modules)) + } +} + +func TestDatabaseSource_CacheHit(t *testing.T) { + store := newMockDBStore() + store.set("default", []byte(testConfigYAML)) + + // Long refresh interval so the cache never expires during the test. + src := NewDatabaseSource(store, WithRefreshInterval(10*time.Minute)) + + ctx := context.Background() + + _, err := src.Load(ctx) + if err != nil { + t.Fatalf("first Load failed: %v", err) + } + callsAfterFirst := store.getCalls() + + _, err = src.Load(ctx) + if err != nil { + t.Fatalf("second Load failed: %v", err) + } + callsAfterSecond := store.getCalls() + + if callsAfterFirst != 1 { + t.Errorf("expected 1 DB call after first load, got %d", callsAfterFirst) + } + if callsAfterSecond != callsAfterFirst { + t.Errorf("expected no additional DB calls on cache hit, got %d total calls", callsAfterSecond) + } +} + +func TestDatabaseSource_CacheExpiry(t *testing.T) { + store := newMockDBStore() + store.set("default", []byte(testConfigYAML)) + + // Very short refresh interval so the cache expires quickly. + src := NewDatabaseSource(store, WithRefreshInterval(10*time.Millisecond)) + + ctx := context.Background() + + _, err := src.Load(ctx) + if err != nil { + t.Fatalf("first Load failed: %v", err) + } + callsAfterFirst := store.getCalls() + + // Wait for cache to expire. + time.Sleep(20 * time.Millisecond) + + _, err = src.Load(ctx) + if err != nil { + t.Fatalf("second Load failed: %v", err) + } + callsAfterSecond := store.getCalls() + + if callsAfterFirst != 1 { + t.Errorf("expected 1 DB call after first load, got %d", callsAfterFirst) + } + if callsAfterSecond != 2 { + t.Errorf("expected 2 DB calls total after cache expiry, got %d", callsAfterSecond) + } +} + +func TestDatabaseSource_Hash(t *testing.T) { + store := newMockDBStore() + data := []byte(testConfigYAML) + store.set("default", data) + + src := NewDatabaseSource(store) + ctx := context.Background() + + hash, err := src.Hash(ctx) + if err != nil { + t.Fatalf("Hash failed: %v", err) + } + if hash == "" { + t.Fatal("expected non-empty hash") + } + + // Verify hash is consistent with the raw bytes. + sum := sha256.Sum256(data) + expected := hex.EncodeToString(sum[:]) + if hash != expected { + t.Errorf("hash mismatch: got %q, want %q", hash, expected) + } + + // Calling Hash again should return the same value. + hash2, err := src.Hash(ctx) + if err != nil { + t.Fatalf("second Hash failed: %v", err) + } + if hash != hash2 { + t.Errorf("hash not stable: got %q then %q", hash, hash2) + } +} + +func TestDatabaseSource_Name(t *testing.T) { + store := newMockDBStore() + src := NewDatabaseSource(store) + if src.Name() != "database:default" { + t.Errorf("unexpected name: %q", src.Name()) + } + + src2 := NewDatabaseSource(store, WithConfigKey("prod")) + if src2.Name() != "database:prod" { + t.Errorf("unexpected name: %q", src2.Name()) + } +} + +func TestDatabaseSource_Load_NotFound(t *testing.T) { + store := newMockDBStore() + src := NewDatabaseSource(store) + _, err := src.Load(context.Background()) + if err == nil { + t.Fatal("expected error for missing key") + } +} + +func TestDatabaseSource_ImplementsConfigSource(t *testing.T) { + store := newMockDBStore() + var _ ConfigSource = NewDatabaseSource(store) +} diff --git a/config/source_file.go b/config/source_file.go new file mode 100644 index 00000000..fe89d6f6 --- /dev/null +++ b/config/source_file.go @@ -0,0 +1,54 @@ +package config + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "os" +) + +// FileSource loads config from a YAML file on disk. +type FileSource struct { + path string +} + +// NewFileSource creates a FileSource that reads from the given path. +func NewFileSource(path string) *FileSource { + return &FileSource{path: path} +} + +// Load reads the config file and returns a parsed WorkflowConfig. +// Supports both ApplicationConfig (multi-workflow) and WorkflowConfig formats. +func (s *FileSource) Load(_ context.Context) (*WorkflowConfig, error) { + data, err := os.ReadFile(s.path) + if err != nil { + return nil, fmt.Errorf("file source: read %s: %w", s.path, err) + } + if IsApplicationConfig(data) { + appCfg, err := LoadApplicationConfig(s.path) + if err != nil { + return nil, fmt.Errorf("file source: load application config: %w", err) + } + return MergeApplicationConfig(appCfg) + } + return LoadFromFile(s.path) +} + +// Hash returns the SHA256 hex digest of the raw file bytes. +func (s *FileSource) Hash(_ context.Context) (string, error) { + data, err := os.ReadFile(s.path) + if err != nil { + return "", fmt.Errorf("file source: read %s: %w", s.path, err) + } + sum := sha256.Sum256(data) + return hex.EncodeToString(sum[:]), nil +} + +// Name returns a human-readable identifier for this source. +func (s *FileSource) Name() string { + return "file:" + s.path +} + +// Path returns the filesystem path this source reads from. +func (s *FileSource) Path() string { return s.path } diff --git a/config/source_file_test.go b/config/source_file_test.go new file mode 100644 index 00000000..e8f14632 --- /dev/null +++ b/config/source_file_test.go @@ -0,0 +1,138 @@ +package config + +import ( + "context" + "os" + "path/filepath" + "strings" + "testing" +) + +const testWorkflowYAML = ` +modules: + - name: test-server + type: http.server + config: + port: 8080 + - name: test-router + type: http.router + dependsOn: + - test-server +workflows: + test-flow: + initial: start +triggers: + test-trigger: + type: http +` + +func TestFileSource_Load(t *testing.T) { + dir := t.TempDir() + fp := filepath.Join(dir, "config.yaml") + if err := os.WriteFile(fp, []byte(testWorkflowYAML), 0644); err != nil { + t.Fatalf("write temp file: %v", err) + } + + src := NewFileSource(fp) + cfg, err := src.Load(context.Background()) + if err != nil { + t.Fatalf("Load() error: %v", err) + } + + if len(cfg.Modules) != 2 { + t.Fatalf("expected 2 modules, got %d", len(cfg.Modules)) + } + if cfg.Modules[0].Name != "test-server" { + t.Errorf("expected module name 'test-server', got %q", cfg.Modules[0].Name) + } + if cfg.Modules[0].Type != "http.server" { + t.Errorf("expected module type 'http.server', got %q", cfg.Modules[0].Type) + } + if cfg.Modules[1].Name != "test-router" { + t.Errorf("expected module name 'test-router', got %q", cfg.Modules[1].Name) + } + if cfg.Workflows["test-flow"] == nil { + t.Error("expected 'test-flow' in workflows") + } + if cfg.Triggers["test-trigger"] == nil { + t.Error("expected 'test-trigger' in triggers") + } +} + +func TestFileSource_Load_NonExistent(t *testing.T) { + src := NewFileSource("/nonexistent/path/config.yaml") + _, err := src.Load(context.Background()) + if err == nil { + t.Fatal("expected error for non-existent file") + } +} + +func TestFileSource_Hash(t *testing.T) { + dir := t.TempDir() + fp := filepath.Join(dir, "config.yaml") + if err := os.WriteFile(fp, []byte(testWorkflowYAML), 0644); err != nil { + t.Fatalf("write temp file: %v", err) + } + + src := NewFileSource(fp) + ctx := context.Background() + + h1, err := src.Hash(ctx) + if err != nil { + t.Fatalf("Hash() error: %v", err) + } + if h1 == "" { + t.Fatal("expected non-empty hash") + } + + // Same file produces same hash. + h2, err := src.Hash(ctx) + if err != nil { + t.Fatalf("Hash() second call error: %v", err) + } + if h1 != h2 { + t.Errorf("expected identical hashes for same content, got %q and %q", h1, h2) + } + + // Different content produces different hash. + fp2 := filepath.Join(dir, "config2.yaml") + if err := os.WriteFile(fp2, []byte(testWorkflowYAML+"\n# extra comment\n"), 0644); err != nil { + t.Fatalf("write second temp file: %v", err) + } + src2 := NewFileSource(fp2) + h3, err := src2.Hash(ctx) + if err != nil { + t.Fatalf("Hash() for second file error: %v", err) + } + if h1 == h3 { + t.Errorf("expected different hashes for different content") + } +} + +func TestFileSource_Hash_NonExistent(t *testing.T) { + src := NewFileSource("/nonexistent/path/config.yaml") + _, err := src.Hash(context.Background()) + if err == nil { + t.Fatal("expected error for non-existent file") + } +} + +func TestFileSource_Name(t *testing.T) { + path := "/some/path/to/config.yaml" + src := NewFileSource(path) + name := src.Name() + if !strings.HasPrefix(name, "file:") { + t.Errorf("expected name to start with 'file:', got %q", name) + } + if !strings.Contains(name, path) { + t.Errorf("expected name to contain path %q, got %q", path, name) + } +} + +func TestFileSource_Path(t *testing.T) { + path := "/some/path/to/config.yaml" + src := NewFileSource(path) + if src.Path() != path { + t.Errorf("expected path %q, got %q", path, src.Path()) + } +} diff --git a/config/watcher.go b/config/watcher.go new file mode 100644 index 00000000..5482bc00 --- /dev/null +++ b/config/watcher.go @@ -0,0 +1,198 @@ +package config + +import ( + "context" + "fmt" + "log/slog" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/fsnotify/fsnotify" +) + +// WatcherOption configures a ConfigWatcher. +type WatcherOption func(*ConfigWatcher) + +// WithWatchDebounce sets the debounce duration for file change events. +func WithWatchDebounce(d time.Duration) WatcherOption { + return func(w *ConfigWatcher) { w.debounce = d } +} + +// WithWatchLogger sets the logger for the watcher. +func WithWatchLogger(l *slog.Logger) WatcherOption { + return func(w *ConfigWatcher) { w.logger = l } +} + +// ConfigWatcher monitors a config file for changes and invokes a callback. +// It watches the directory containing the file for atomic-save compatibility. +type ConfigWatcher struct { + source *FileSource + debounce time.Duration + logger *slog.Logger + onChange func(ConfigChangeEvent) + + fsWatcher *fsnotify.Watcher + done chan struct{} + wg sync.WaitGroup + lastHash string + + mu sync.Mutex + pending map[string]time.Time // path -> last event time +} + +// NewConfigWatcher creates a ConfigWatcher for the given FileSource. +// onChange is called with a ConfigChangeEvent whenever the config changes. +func NewConfigWatcher(source *FileSource, onChange func(ConfigChangeEvent), opts ...WatcherOption) *ConfigWatcher { + w := &ConfigWatcher{ + source: source, + debounce: 500 * time.Millisecond, + logger: slog.Default(), + onChange: onChange, + done: make(chan struct{}), + pending: make(map[string]time.Time), + } + for _, opt := range opts { + opt(w) + } + return w +} + +// Start begins watching the config file's directory for changes. +func (w *ConfigWatcher) Start() error { + ctx := context.Background() + hash, err := w.source.Hash(ctx) + if err != nil { + return fmt.Errorf("config watcher: initial hash: %w", err) + } + w.lastHash = hash + + fsw, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("config watcher: create fsnotify: %w", err) + } + w.fsWatcher = fsw + + // Watch the directory so we catch atomic saves (rename-over). + dir := filepath.Dir(w.source.Path()) + if err := fsw.Add(dir); err != nil { + _ = fsw.Close() + return fmt.Errorf("config watcher: watch %s: %w", dir, err) + } + + w.wg.Add(1) + go w.loop() + return nil +} + +// Stop terminates the watcher and waits for the background goroutine to exit. +func (w *ConfigWatcher) Stop() error { + close(w.done) + w.wg.Wait() + if w.fsWatcher != nil { + return w.fsWatcher.Close() + } + return nil +} + +func (w *ConfigWatcher) loop() { + defer w.wg.Done() + + ticker := time.NewTicker(w.debounce) + defer ticker.Stop() + + for { + select { + case <-w.done: + return + + case event, ok := <-w.fsWatcher.Events: + if !ok { + return + } + if !isYAMLFile(event.Name) { + continue + } + if event.Op&(fsnotify.Write|fsnotify.Create) != 0 { + w.mu.Lock() + w.pending[event.Name] = time.Now() + w.mu.Unlock() + } + + case err, ok := <-w.fsWatcher.Errors: + if !ok { + return + } + w.logger.Error("config watcher error", "err", err) + + case <-ticker.C: + w.processPending() + } + } +} + +func (w *ConfigWatcher) processPending() { + w.mu.Lock() + now := time.Now() + var ready []string + for path, t := range w.pending { + if now.Sub(t) >= w.debounce { + ready = append(ready, path) + } + } + for _, path := range ready { + delete(w.pending, path) + } + w.mu.Unlock() + + for _, path := range ready { + w.processChange(path) + } +} + +// processChange loads the config, computes its hash, and calls onChange if +// the content has actually changed since the last known hash. +func (w *ConfigWatcher) processChange(path string) { + // Only react to events for the file we care about. + if filepath.Clean(path) != filepath.Clean(w.source.Path()) { + return + } + + ctx := context.Background() + + cfg, err := w.source.Load(ctx) + if err != nil { + w.logger.Error("config watcher: failed to load config", "path", path, "err", err) + return + } + + newHash, err := w.source.Hash(ctx) + if err != nil { + w.logger.Error("config watcher: failed to hash config", "path", path, "err", err) + return + } + + if newHash == w.lastHash { + w.logger.Debug("config watcher: content unchanged, skipping", "path", path) + return + } + + oldHash := w.lastHash + w.lastHash = newHash + + w.logger.Info("config changed", "path", path, "old_hash", oldHash[:8], "new_hash", newHash[:8]) + + w.onChange(ConfigChangeEvent{ + Source: w.source.Name(), + OldHash: oldHash, + NewHash: newHash, + Config: cfg, + Time: time.Now(), + }) +} + +func isYAMLFile(name string) bool { + ext := strings.ToLower(filepath.Ext(name)) + return ext == ".yaml" || ext == ".yml" +} diff --git a/config/watcher_test.go b/config/watcher_test.go new file mode 100644 index 00000000..3d572537 --- /dev/null +++ b/config/watcher_test.go @@ -0,0 +1,215 @@ +package config + +import ( + "context" + "os" + "path/filepath" + "sync" + "sync/atomic" + "testing" + "time" +) + +const watcherTestYAML = ` +modules: + - name: watcher-server + type: http.server + config: + port: 8080 +` + +const watcherTestYAMLv2 = ` +modules: + - name: watcher-server + type: http.server + config: + port: 9090 +` + +func TestConfigWatcher_DetectsChange(t *testing.T) { + dir := t.TempDir() + fp := filepath.Join(dir, "config.yaml") + + if err := os.WriteFile(fp, []byte(watcherTestYAML), 0644); err != nil { + t.Fatalf("write initial config: %v", err) + } + + var called atomic.Int32 + var mu sync.Mutex + var lastEvt ConfigChangeEvent + + src := NewFileSource(fp) + w := NewConfigWatcher(src, func(evt ConfigChangeEvent) { + mu.Lock() + lastEvt = evt + mu.Unlock() + called.Add(1) + }, WithWatchDebounce(50*time.Millisecond)) + + if err := w.Start(); err != nil { + t.Fatalf("Start() error: %v", err) + } + t.Cleanup(func() { _ = w.Stop() }) + + // Modify the file. + time.Sleep(100 * time.Millisecond) + if err := os.WriteFile(fp, []byte(watcherTestYAMLv2), 0644); err != nil { + t.Fatalf("write updated config: %v", err) + } + + // Wait for debounce + processing. + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if called.Load() > 0 { + break + } + time.Sleep(20 * time.Millisecond) + } + + if called.Load() == 0 { + t.Fatal("onChange was not called after file modification") + } + + mu.Lock() + evt := lastEvt + mu.Unlock() + + if evt.Config == nil { + t.Fatal("onChange event has nil Config") + } + if evt.NewHash == "" || evt.OldHash == "" { + t.Errorf("expected non-empty hashes, got old=%q new=%q", evt.OldHash, evt.NewHash) + } + if evt.OldHash == evt.NewHash { + t.Error("expected old and new hashes to differ") + } +} + +func TestConfigWatcher_DebounceMultipleWrites(t *testing.T) { + dir := t.TempDir() + fp := filepath.Join(dir, "config.yaml") + + if err := os.WriteFile(fp, []byte(watcherTestYAML), 0644); err != nil { + t.Fatalf("write initial config: %v", err) + } + + var called atomic.Int32 + + src := NewFileSource(fp) + w := NewConfigWatcher(src, func(evt ConfigChangeEvent) { + called.Add(1) + }, WithWatchDebounce(200*time.Millisecond)) + + if err := w.Start(); err != nil { + t.Fatalf("Start() error: %v", err) + } + t.Cleanup(func() { _ = w.Stop() }) + + time.Sleep(50 * time.Millisecond) + + // Rapid succession of writes — all within the debounce window. + for i := 0; i < 5; i++ { + content := watcherTestYAMLv2 + if err := os.WriteFile(fp, []byte(content), 0644); err != nil { + t.Fatalf("write %d: %v", i, err) + } + time.Sleep(10 * time.Millisecond) + } + + // Wait for debounce to fire once. + time.Sleep(600 * time.Millisecond) + + count := called.Load() + if count == 0 { + t.Fatal("expected at least one onChange call") + } + // Due to debounce, we expect far fewer calls than writes. + // The debounce period is 200ms and we wait 10ms between writes, + // so it's valid to have 1–2 calls but not 5. + if count > 3 { + t.Errorf("expected debounce to reduce calls (got %d, expected ≤3)", count) + } +} + +func TestConfigWatcher_SkipUnchangedContent(t *testing.T) { + dir := t.TempDir() + fp := filepath.Join(dir, "config.yaml") + + if err := os.WriteFile(fp, []byte(watcherTestYAML), 0644); err != nil { + t.Fatalf("write initial config: %v", err) + } + + var called atomic.Int32 + + src := NewFileSource(fp) + w := NewConfigWatcher(src, func(evt ConfigChangeEvent) { + called.Add(1) + }, WithWatchDebounce(50*time.Millisecond)) + + if err := w.Start(); err != nil { + t.Fatalf("Start() error: %v", err) + } + t.Cleanup(func() { _ = w.Stop() }) + + time.Sleep(100 * time.Millisecond) + + // Rewrite the exact same content. + if err := os.WriteFile(fp, []byte(watcherTestYAML), 0644); err != nil { + t.Fatalf("rewrite same content: %v", err) + } + + // Wait long enough for debounce to fire. + time.Sleep(300 * time.Millisecond) + + if called.Load() != 0 { + t.Errorf("expected onChange NOT to be called for unchanged content, got %d calls", called.Load()) + } +} + +func TestConfigWatcher_StopCleanup(t *testing.T) { + dir := t.TempDir() + fp := filepath.Join(dir, "config.yaml") + + if err := os.WriteFile(fp, []byte(watcherTestYAML), 0644); err != nil { + t.Fatalf("write initial config: %v", err) + } + + src := NewFileSource(fp) + w := NewConfigWatcher(src, func(evt ConfigChangeEvent) {}, WithWatchDebounce(50*time.Millisecond)) + + if err := w.Start(); err != nil { + t.Fatalf("Start() error: %v", err) + } + + // Stop should return quickly and cleanly. + done := make(chan error, 1) + go func() { done <- w.Stop() }() + + select { + case err := <-done: + if err != nil { + t.Errorf("Stop() returned error: %v", err) + } + case <-time.After(2 * time.Second): + t.Fatal("Stop() timed out — possible goroutine leak") + } +} + +func TestConfigWatcher_Hash_InitialLoad(t *testing.T) { + dir := t.TempDir() + fp := filepath.Join(dir, "config.yaml") + + if err := os.WriteFile(fp, []byte(watcherTestYAML), 0644); err != nil { + t.Fatalf("write initial config: %v", err) + } + + src := NewFileSource(fp) + ctx := context.Background() + h, err := src.Hash(ctx) + if err != nil { + t.Fatalf("Hash() error: %v", err) + } + if h == "" { + t.Fatal("expected non-empty hash") + } +} diff --git a/engine.go b/engine.go index af98fbfc..cb55e653 100644 --- a/engine.go +++ b/engine.go @@ -917,6 +917,37 @@ func (e *StdEngine) GetApp() modular.Application { return e.app } +// ReconfigureModules attempts to reconfigure only the specified modules +// without a full engine restart. Returns modules that could not be reconfigured. +func (e *StdEngine) ReconfigureModules(ctx context.Context, changes []config.ModuleConfigChange) ([]string, error) { + var failedModules []string + + for _, change := range changes { + mod := e.app.GetModule(change.Name) + if mod == nil { + e.logger.Warn(fmt.Sprintf("Module %q not found for reconfiguration", change.Name)) + failedModules = append(failedModules, change.Name) + continue + } + + reconf, ok := mod.(interfaces.Reconfigurable) + if !ok { + e.logger.Info(fmt.Sprintf("Module %q does not support runtime reconfiguration", change.Name)) + failedModules = append(failedModules, change.Name) + continue + } + + if err := reconf.Reconfigure(ctx, change.NewConfig); err != nil { + e.logger.Error(fmt.Sprintf("Failed to reconfigure module %q: %v", change.Name, err)) + failedModules = append(failedModules, change.Name) + } else { + e.logger.Info(fmt.Sprintf("Reconfigured module %q in-place", change.Name)) + } + } + + return failedModules, nil +} + // GetPipeline returns the named pipeline from the engine's pipeline registry. // Returns nil and false if no pipeline with the given name exists. // This is useful for CLI tools (e.g., wfctl pipeline run) that need to diff --git a/engine_reconfig_test.go b/engine_reconfig_test.go new file mode 100644 index 00000000..4bf17da2 --- /dev/null +++ b/engine_reconfig_test.go @@ -0,0 +1,315 @@ +package workflow + +import ( + "context" + "fmt" + "testing" + + "github.com/CrisisTextLine/modular" + "github.com/GoCodeAlone/workflow/config" +) + +// mockReconfigurableModule is a test module that supports reconfiguration. +type mockReconfigurableModule struct { + name string + config map[string]any + calls int + failOn string // if set, Reconfigure returns error when name matches +} + +func (m *mockReconfigurableModule) Name() string { return m.name } + +func (m *mockReconfigurableModule) Init(app modular.Application) error { return nil } + +func (m *mockReconfigurableModule) Reconfigure(_ context.Context, cfg map[string]any) error { + if m.failOn != "" && m.name == m.failOn { + return fmt.Errorf("reconfigure failed for %s", m.name) + } + m.config = cfg + m.calls++ + return nil +} + +// mockNonReconfigurableModule is a test module that does NOT support reconfiguration. +type mockNonReconfigurableModule struct { + name string +} + +func (m *mockNonReconfigurableModule) Name() string { return m.name } + +func (m *mockNonReconfigurableModule) Init(app modular.Application) error { return nil } + +// TestReconfigureModules_Success tests successful reconfiguration of a single module. +func TestReconfigureModules_Success(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + + // Register a reconfigurable module + mod := &mockReconfigurableModule{ + name: "test-module", + config: map[string]any{"version": "v1"}, + } + app.RegisterModule(mod) + + ctx := context.Background() + newConfig := map[string]any{"version": "v2", "timeout": 30} + + changes := []config.ModuleConfigChange{ + { + Name: "test-module", + OldConfig: map[string]any{"version": "v1"}, + NewConfig: newConfig, + }, + } + + failed, err := engine.ReconfigureModules(ctx, changes) + + if err != nil { + t.Fatalf("ReconfigureModules returned error: %v", err) + } + + if len(failed) != 0 { + t.Fatalf("Expected 0 failed modules, got %d: %v", len(failed), failed) + } + + if mod.calls != 1 { + t.Fatalf("Expected Reconfigure to be called once, got %d", mod.calls) + } + + if mod.config["version"] != "v2" { + t.Fatalf("Expected config version to be v2, got %v", mod.config["version"]) + } + + if mod.config["timeout"] != 30 { + t.Fatalf("Expected timeout to be 30, got %v", mod.config["timeout"]) + } +} + +// TestReconfigureModules_NotReconfigurable tests that non-reconfigurable modules +// are reported in failedModules. +func TestReconfigureModules_NotReconfigurable(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + + // Register a non-reconfigurable module + mod := &mockNonReconfigurableModule{name: "static-module"} + app.RegisterModule(mod) + + ctx := context.Background() + changes := []config.ModuleConfigChange{ + { + Name: "static-module", + OldConfig: map[string]any{}, + NewConfig: map[string]any{"key": "value"}, + }, + } + + failed, err := engine.ReconfigureModules(ctx, changes) + + if err != nil { + t.Fatalf("ReconfigureModules returned error: %v", err) + } + + if len(failed) != 1 { + t.Fatalf("Expected 1 failed module, got %d: %v", len(failed), failed) + } + + if failed[0] != "static-module" { + t.Fatalf("Expected failed module to be 'static-module', got %s", failed[0]) + } +} + +// TestReconfigureModules_ModuleNotFound tests that non-existent modules +// are reported in failedModules. +func TestReconfigureModules_ModuleNotFound(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + + ctx := context.Background() + changes := []config.ModuleConfigChange{ + { + Name: "nonexistent-module", + OldConfig: map[string]any{}, + NewConfig: map[string]any{"key": "value"}, + }, + } + + failed, err := engine.ReconfigureModules(ctx, changes) + + if err != nil { + t.Fatalf("ReconfigureModules returned error: %v", err) + } + + if len(failed) != 1 { + t.Fatalf("Expected 1 failed module, got %d: %v", len(failed), failed) + } + + if failed[0] != "nonexistent-module" { + t.Fatalf("Expected failed module to be 'nonexistent-module', got %s", failed[0]) + } +} + +// TestReconfigureModules_ReconfigureFails tests that when Reconfigure returns +// an error, the module is added to failedModules. +func TestReconfigureModules_ReconfigureFails(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + + // Register a module that fails on reconfigure + mod := &mockReconfigurableModule{ + name: "failing-module", + config: map[string]any{}, + failOn: "failing-module", + } + app.RegisterModule(mod) + + ctx := context.Background() + changes := []config.ModuleConfigChange{ + { + Name: "failing-module", + OldConfig: map[string]any{}, + NewConfig: map[string]any{"key": "value"}, + }, + } + + failed, err := engine.ReconfigureModules(ctx, changes) + + if err != nil { + t.Fatalf("ReconfigureModules returned error: %v", err) + } + + if len(failed) != 1 { + t.Fatalf("Expected 1 failed module, got %d: %v", len(failed), failed) + } + + if failed[0] != "failing-module" { + t.Fatalf("Expected failed module to be 'failing-module', got %s", failed[0]) + } +} + +// TestReconfigureModules_MultipleModules tests reconfiguration of multiple +// modules with different outcomes (success, not found, not reconfigurable, error). +func TestReconfigureModules_MultipleModules(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + + // Register modules + reconfigMod := &mockReconfigurableModule{ + name: "reconfig-module", + config: map[string]any{}, + } + nonReconfigMod := &mockNonReconfigurableModule{name: "static-module"} + failingMod := &mockReconfigurableModule{ + name: "failing-module", + config: map[string]any{}, + failOn: "failing-module", + } + + app.RegisterModule(reconfigMod) + app.RegisterModule(nonReconfigMod) + app.RegisterModule(failingMod) + + ctx := context.Background() + changes := []config.ModuleConfigChange{ + { + Name: "reconfig-module", + OldConfig: map[string]any{}, + NewConfig: map[string]any{"key": "value1"}, + }, + { + Name: "static-module", + OldConfig: map[string]any{}, + NewConfig: map[string]any{"key": "value2"}, + }, + { + Name: "failing-module", + OldConfig: map[string]any{}, + NewConfig: map[string]any{"key": "value3"}, + }, + { + Name: "nonexistent-module", + OldConfig: map[string]any{}, + NewConfig: map[string]any{"key": "value4"}, + }, + } + + failed, err := engine.ReconfigureModules(ctx, changes) + + if err != nil { + t.Fatalf("ReconfigureModules returned error: %v", err) + } + + // Should have 3 failed modules: static-module (not reconfigurable), + // failing-module (error), and nonexistent-module (not found) + if len(failed) != 3 { + t.Fatalf("Expected 3 failed modules, got %d: %v", len(failed), failed) + } + + // Verify the successful module was reconfigured + if reconfigMod.calls != 1 { + t.Fatalf("Expected reconfig-module Reconfigure to be called once, got %d", reconfigMod.calls) + } + if reconfigMod.config["key"] != "value1" { + t.Fatalf("Expected reconfig-module config key to be value1, got %v", reconfigMod.config["key"]) + } +} + +// TestReconfigureModules_EmptyChanges tests that empty changes list returns +// success with no failed modules. +func TestReconfigureModules_EmptyChanges(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + + ctx := context.Background() + changes := []config.ModuleConfigChange{} + + failed, err := engine.ReconfigureModules(ctx, changes) + + if err != nil { + t.Fatalf("ReconfigureModules returned error: %v", err) + } + + if len(failed) != 0 { + t.Fatalf("Expected 0 failed modules, got %d: %v", len(failed), failed) + } +} + +// TestReconfigureModules_ContextCancellation tests that context cancellation +// is respected during reconfiguration. +func TestReconfigureModules_ContextCancellation(t *testing.T) { + app := newMockApplication() + engine := NewStdEngine(app, app.Logger()) + + // Register a module with a slow reconfigure + mod := &mockReconfigurableModule{ + name: "test-module", + config: map[string]any{}, + } + app.RegisterModule(mod) + + // Create a cancelled context + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + changes := []config.ModuleConfigChange{ + { + Name: "test-module", + OldConfig: map[string]any{}, + NewConfig: map[string]any{"key": "value"}, + }, + } + + // This should still work - our mock doesn't check context cancellation, + // but in a real scenario with a context-aware Reconfigure, it would respect it + failed, err := engine.ReconfigureModules(ctx, changes) + + if err != nil { + t.Fatalf("ReconfigureModules returned error: %v", err) + } + + // Even with cancelled context, the module should be reconfigured + // (our mock implementation doesn't check context) + if len(failed) != 0 { + t.Fatalf("Expected 0 failed modules, got %d: %v", len(failed), failed) + } +} diff --git a/interfaces/reconfigurable.go b/interfaces/reconfigurable.go new file mode 100644 index 00000000..dea4be60 --- /dev/null +++ b/interfaces/reconfigurable.go @@ -0,0 +1,18 @@ +package interfaces + +import "context" + +// Reconfigurable is optionally implemented by modules that support +// runtime reconfiguration without requiring a full engine restart. +// When a config change affects only modules implementing this interface, +// the engine can perform a surgical update instead of a full stop/rebuild/start. +type Reconfigurable interface { + // Reconfigure applies new configuration to a running module. + // The module should: + // 1. Validate the new config + // 2. Gracefully drain in-flight work + // 3. Apply the new configuration + // 4. Resume accepting new work + // Returns an error if the new config is invalid or cannot be applied. + Reconfigure(ctx context.Context, newConfig map[string]any) error +} diff --git a/store/migrations/010_config_documents.sql b/store/migrations/010_config_documents.sql new file mode 100644 index 00000000..dcb82ea4 --- /dev/null +++ b/store/migrations/010_config_documents.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS config_documents ( + key TEXT PRIMARY KEY, + data BYTEA NOT NULL, + hash TEXT NOT NULL, + version INTEGER NOT NULL DEFAULT 1, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by TEXT NOT NULL DEFAULT 'system' +); + +CREATE INDEX IF NOT EXISTS idx_config_documents_updated_at ON config_documents(updated_at); + +CREATE TABLE IF NOT EXISTS config_document_history ( + id BIGSERIAL PRIMARY KEY, + key TEXT NOT NULL, + data BYTEA NOT NULL, + hash TEXT NOT NULL, + version INTEGER NOT NULL, + changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + changed_by TEXT NOT NULL DEFAULT 'system' +); + +CREATE INDEX IF NOT EXISTS idx_config_history_key ON config_document_history(key, version); diff --git a/store/pg.go b/store/pg.go index 5661dc00..c2ebe954 100644 --- a/store/pg.go +++ b/store/pg.go @@ -30,6 +30,7 @@ type PGStore struct { logs *PGLogStore audit *PGAuditStore iam *PGIAMStore + configDocs *PGConfigStore } // NewPGStore connects to PostgreSQL and returns a PGStore with all sub-stores. @@ -68,6 +69,7 @@ func NewPGStore(ctx context.Context, cfg PGConfig) (*PGStore, error) { s.logs = &PGLogStore{pool: pool} s.audit = &PGAuditStore{pool: pool} s.iam = &PGIAMStore{pool: pool} + s.configDocs = NewPGConfigStore(pool) return s, nil } @@ -110,3 +112,6 @@ func (s *PGStore) Audit() AuditStore { return s.audit } // IAM returns the IAMStore. func (s *PGStore) IAM() IAMStore { return s.iam } + +// ConfigDocs returns the PGConfigStore. +func (s *PGStore) ConfigDocs() *PGConfigStore { return s.configDocs } diff --git a/store/pg_config.go b/store/pg_config.go new file mode 100644 index 00000000..9177fe12 --- /dev/null +++ b/store/pg_config.go @@ -0,0 +1,104 @@ +package store + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "fmt" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// ConfigDocument represents a config record in the database. +type ConfigDocument struct { + Key string + Data []byte + Hash string + Version int + UpdatedAt time.Time +} + +// PGConfigStore manages config documents in PostgreSQL. +type PGConfigStore struct { + pool *pgxpool.Pool +} + +// NewPGConfigStore creates a config store backed by the given connection pool. +func NewPGConfigStore(pool *pgxpool.Pool) *PGConfigStore { + return &PGConfigStore{pool: pool} +} + +func (s *PGConfigStore) GetConfigDocument(ctx context.Context, key string) ([]byte, error) { + var data []byte + err := s.pool.QueryRow(ctx, + `SELECT data FROM config_documents WHERE key = $1`, key).Scan(&data) + if err != nil { + return nil, fmt.Errorf("get config document %q: %w", key, err) + } + return data, nil +} + +func (s *PGConfigStore) GetConfigDocumentHash(ctx context.Context, key string) (string, error) { + var hash string + err := s.pool.QueryRow(ctx, + `SELECT hash FROM config_documents WHERE key = $1`, key).Scan(&hash) + if err != nil { + return "", fmt.Errorf("get config document hash %q: %w", key, err) + } + return hash, nil +} + +func (s *PGConfigStore) PutConfigDocument(ctx context.Context, key string, data []byte) error { + sum := sha256.Sum256(data) + hash := hex.EncodeToString(sum[:]) + + tx, err := s.pool.Begin(ctx) + if err != nil { + return fmt.Errorf("begin tx: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + _, err = tx.Exec(ctx, ` + INSERT INTO config_documents (key, data, hash, version, created_by) + VALUES ($1, $2, $3, 1, 'system') + ON CONFLICT (key) DO UPDATE SET + data = EXCLUDED.data, + hash = EXCLUDED.hash, + version = config_documents.version + 1, + updated_at = NOW() + `, key, data, hash) + if err != nil { + return fmt.Errorf("upsert config document %q: %w", key, err) + } + + // Record history + _, err = tx.Exec(ctx, ` + INSERT INTO config_document_history (key, data, hash, version, changed_by) + SELECT key, data, hash, version, 'system' FROM config_documents WHERE key = $1 + `, key) + if err != nil { + return fmt.Errorf("record config history %q: %w", key, err) + } + + return tx.Commit(ctx) +} + +func (s *PGConfigStore) ListConfigDocuments(ctx context.Context) ([]ConfigDocument, error) { + rows, err := s.pool.Query(ctx, + `SELECT key, data, hash, version, updated_at FROM config_documents ORDER BY key`) + if err != nil { + return nil, fmt.Errorf("list config documents: %w", err) + } + defer rows.Close() + + var docs []ConfigDocument + for rows.Next() { + var d ConfigDocument + if err := rows.Scan(&d.Key, &d.Data, &d.Hash, &d.Version, &d.UpdatedAt); err != nil { + return nil, fmt.Errorf("scan config document: %w", err) + } + docs = append(docs, d) + } + return docs, rows.Err() +} diff --git a/store/pg_config_test.go b/store/pg_config_test.go new file mode 100644 index 00000000..9070511e --- /dev/null +++ b/store/pg_config_test.go @@ -0,0 +1,149 @@ +package store + +import ( + "context" + "os" + "testing" + + "github.com/jackc/pgx/v5/pgxpool" +) + +func TestPGConfigStore_Integration(t *testing.T) { + pgURL := os.Getenv("PG_URL") + if pgURL == "" { + t.Skip("PG_URL not set") + } + + ctx := context.Background() + pool, err := pgxpool.New(ctx, pgURL) + if err != nil { + t.Fatalf("connect to postgres: %v", err) + } + defer pool.Close() + + if err := pool.Ping(ctx); err != nil { + t.Fatalf("ping postgres: %v", err) + } + + // Ensure the table exists (run migration inline for integration test). + _, err = pool.Exec(ctx, ` + CREATE TABLE IF NOT EXISTS config_documents ( + key TEXT PRIMARY KEY, + data BYTEA NOT NULL, + hash TEXT NOT NULL, + version INTEGER NOT NULL DEFAULT 1, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_by TEXT NOT NULL DEFAULT 'system' + ); + CREATE TABLE IF NOT EXISTS config_document_history ( + id BIGSERIAL PRIMARY KEY, + key TEXT NOT NULL, + data BYTEA NOT NULL, + hash TEXT NOT NULL, + version INTEGER NOT NULL, + changed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + changed_by TEXT NOT NULL DEFAULT 'system' + ); + `) + if err != nil { + t.Fatalf("create tables: %v", err) + } + + // Clean up test data. + t.Cleanup(func() { + _, _ = pool.Exec(ctx, `DELETE FROM config_document_history WHERE key = 'test-integration'`) + _, _ = pool.Exec(ctx, `DELETE FROM config_documents WHERE key = 'test-integration'`) + }) + + s := NewPGConfigStore(pool) + const key = "test-integration" + const yamlV1 = `modules: + - name: server + type: http.server +` + const yamlV2 = `modules: + - name: server + type: http.server + - name: router + type: http.router +` + + // PutConfigDocument — initial insert. + if err := s.PutConfigDocument(ctx, key, []byte(yamlV1)); err != nil { + t.Fatalf("PutConfigDocument (v1): %v", err) + } + + // GetConfigDocument. + data, err := s.GetConfigDocument(ctx, key) + if err != nil { + t.Fatalf("GetConfigDocument: %v", err) + } + if string(data) != yamlV1 { + t.Errorf("unexpected data: got %q, want %q", data, yamlV1) + } + + // GetConfigDocumentHash. + hash, err := s.GetConfigDocumentHash(ctx, key) + if err != nil { + t.Fatalf("GetConfigDocumentHash: %v", err) + } + if hash == "" { + t.Error("expected non-empty hash") + } + + // ListConfigDocuments. + docs, err := s.ListConfigDocuments(ctx) + if err != nil { + t.Fatalf("ListConfigDocuments: %v", err) + } + found := false + for _, d := range docs { + if d.Key == key { + found = true + if d.Version != 1 { + t.Errorf("expected version 1, got %d", d.Version) + } + break + } + } + if !found { + t.Error("expected to find test key in ListConfigDocuments") + } + + // PutConfigDocument — update (upsert). + if err := s.PutConfigDocument(ctx, key, []byte(yamlV2)); err != nil { + t.Fatalf("PutConfigDocument (v2): %v", err) + } + + data2, err := s.GetConfigDocument(ctx, key) + if err != nil { + t.Fatalf("GetConfigDocument after update: %v", err) + } + if string(data2) != yamlV2 { + t.Errorf("unexpected data after update: got %q, want %q", data2, yamlV2) + } + + // Verify version incremented. + docs2, err := s.ListConfigDocuments(ctx) + if err != nil { + t.Fatalf("ListConfigDocuments after update: %v", err) + } + for _, d := range docs2 { + if d.Key == key { + if d.Version != 2 { + t.Errorf("expected version 2 after update, got %d", d.Version) + } + break + } + } + + // Hash must have changed after update. + hash2, err := s.GetConfigDocumentHash(ctx, key) + if err != nil { + t.Fatalf("GetConfigDocumentHash after update: %v", err) + } + if hash2 == hash { + t.Error("expected hash to change after update") + } +} From f73613dfeca0f847c7f6a9808c6e005c3056eada Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Thu, 26 Feb 2026 22:49:53 -0500 Subject: [PATCH 2/2] fix: address review comments on dynamic config system - HasNonModuleChanges now also compares Requires (plugin/capability changes) - DatabaseSource.Hash() fallback bypasses cache to ensure accurate change detection - DatabasePoller.Stop() and ConfigWatcher.Stop() are idempotent via sync.Once - ConfigWatcher handles fsnotify.Rename events for atomic-save editor compatibility - ConfigReloader falls back to full reload when reconfigurer is nil but modules changed - Add SetReconfigurer() for updating the reconfigurer after a full engine reload - serverApp implements ModuleReconfigurer as stable adapter (survives engine reloads) - Module reconfigure endpoint uses proper JSON encoding, 1MiB body limit, and RequireAuth - Add tests for nil reconfigurer fallback and SetReconfigurer Co-Authored-By: Claude Opus 4.6 --- cmd/server/main.go | 50 ++++++++++++++++++-------- config/diff.go | 8 +++-- config/reloader.go | 23 +++++++++++- config/reloader_test.go | 74 ++++++++++++++++++++++++++++++++++++++ config/source_db.go | 9 ++--- config/source_db_poller.go | 8 +++-- config/watcher.go | 15 ++++++-- 7 files changed, 159 insertions(+), 28 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index e9264b3a..6b383b01 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -347,6 +347,12 @@ type serverApp struct { currentConfig *config.WorkflowConfig // last loaded config, used by dynamic config watcher } +// ReconfigureModules delegates to the current engine, ensuring the reloader +// always targets the active engine even after a full reload replaces it. +func (app *serverApp) ReconfigureModules(ctx context.Context, changes []config.ModuleConfigChange) ([]string, error) { + return app.engine.ReconfigureModules(ctx, changes) +} + // setup initializes all server components: engine, AI services, and HTTP mux. func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error) { app := &serverApp{ @@ -1232,9 +1238,9 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error { var reloaderErr error reloader, reloaderErr = config.NewConfigReloader( - app.currentConfig, // the loaded WorkflowConfig - app.reloadEngine, // existing full reload function - app.engine, // implements ModuleReconfigurer + app.currentConfig, // the loaded WorkflowConfig + app.reloadEngine, // existing full reload function + app, // stable adapter — delegates to current app.engine app.logger, ) if reloaderErr != nil { @@ -1269,7 +1275,7 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error { // Reuse or create a reloader for the DB poller. if reloader == nil { var reloaderErr error - reloader, reloaderErr = config.NewConfigReloader(app.currentConfig, app.reloadEngine, app.engine, app.logger) + reloader, reloaderErr = config.NewConfigReloader(app.currentConfig, app.reloadEngine, app, app.logger) if reloaderErr != nil { app.logger.Error("Failed to create config reloader for DB poller", "error", reloaderErr) } @@ -1590,40 +1596,56 @@ func runMultiWorkflow(logger *slog.Logger) error { // Module reconfiguration endpoint — allows runtime hot-reload of individual // modules that implement interfaces.Reconfigurable without a full engine restart. - mux.HandleFunc("PUT /api/v1/modules/{name}/config", func(w http.ResponseWriter, r *http.Request) { + // Wrapped with the same RequireAuth middleware used by the API router. + reconfigPerms := apihandler.NewPermissionService(stores.Memberships, stores.Workflows, stores.Projects) + reconfigMw := apihandler.NewMiddleware([]byte(secret), stores.Users, reconfigPerms) + mux.Handle("PUT /api/v1/modules/{name}/config", reconfigMw.RequireAuth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { moduleName := r.PathValue("name") if moduleName == "" { - http.Error(w, `{"error":"module name required"}`, http.StatusBadRequest) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(map[string]string{"error": "module name required"}) return } + const maxConfigBytes = 1 << 20 // 1 MiB + limitedBody := http.MaxBytesReader(w, r.Body, maxConfigBytes) + defer limitedBody.Close() //nolint:errcheck + var newConfig map[string]any - if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil { - http.Error(w, fmt.Sprintf(`{"error":"invalid JSON: %v"}`, err), http.StatusBadRequest) + if err := json.NewDecoder(limitedBody).Decode(&newConfig); err != nil { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusBadRequest) + _ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("invalid JSON: %v", err)}) return } mod := app.engine.GetApp().GetModule(moduleName) if mod == nil { - http.Error(w, fmt.Sprintf(`{"error":"module %q not found"}`, moduleName), http.StatusNotFound) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotFound) + _ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("module %q not found", moduleName)}) return } reconf, ok := mod.(interfaces.Reconfigurable) if !ok { - http.Error(w, fmt.Sprintf(`{"error":"module %q does not support runtime reconfiguration"}`, moduleName), http.StatusNotImplemented) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusNotImplemented) + _ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("module %q does not support runtime reconfiguration", moduleName)}) return } if err := reconf.Reconfigure(r.Context(), newConfig); err != nil { - http.Error(w, fmt.Sprintf(`{"error":"reconfiguration failed: %v"}`, err), http.StatusInternalServerError) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("reconfiguration failed: %v", err)}) return } w.Header().Set("Content-Type", "application/json") - resp, _ := json.Marshal(map[string]string{"status": "ok", "module": moduleName}) - w.Write(resp) //nolint:errcheck - }) + _ = json.NewEncoder(w).Encode(map[string]string{"status": "ok", "module": moduleName}) + }))) mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") diff --git a/config/diff.go b/config/diff.go index b9b58899..2dace449 100644 --- a/config/diff.go +++ b/config/diff.go @@ -62,13 +62,15 @@ func DiffModuleConfigs(old, new *WorkflowConfig) *ModuleConfigDiff { return diff } -// HasNonModuleChanges returns true if workflows, triggers, pipelines, or -// platform config changed between old and new (requiring full reload). +// HasNonModuleChanges returns true if workflows, triggers, pipelines, +// platform config, or requirements changed between old and new +// (requiring full reload). func HasNonModuleChanges(old, new *WorkflowConfig) bool { return hashAny(old.Workflows) != hashAny(new.Workflows) || hashAny(old.Triggers) != hashAny(new.Triggers) || hashAny(old.Pipelines) != hashAny(new.Pipelines) || - hashAny(old.Platform) != hashAny(new.Platform) + hashAny(old.Platform) != hashAny(new.Platform) || + hashAny(old.Requires) != hashAny(new.Requires) } func hashModuleConfig(m ModuleConfig) string { diff --git a/config/reloader.go b/config/reloader.go index 61c4686a..3029abe9 100644 --- a/config/reloader.go +++ b/config/reloader.go @@ -52,6 +52,15 @@ func NewConfigReloader( }, nil } +// SetReconfigurer updates the ModuleReconfigurer used for partial (per-module) +// reloads. This should be called after a successful full engine reload if the +// underlying engine (and its reconfigurer) has changed. +func (r *ConfigReloader) SetReconfigurer(reconfigurer ModuleReconfigurer) { + r.mu.Lock() + defer r.mu.Unlock() + r.reconfigurer = reconfigurer +} + // HandleChange processes a config change event. It diffs the old and new configs, // attempts per-module reconfiguration for module-only changes, and falls back // to a full reload when necessary. @@ -75,7 +84,19 @@ func (r *ConfigReloader) HandleChange(evt ConfigChangeEvent) error { } // Only module config changes — try partial reconfiguration. - if len(diff.Modified) > 0 && r.reconfigurer != nil { + if len(diff.Modified) > 0 { + if r.reconfigurer == nil { + // No reconfigurer available — fall back to full reload. + r.logger.Info("module changes detected but no reconfigurer, performing full reload", + "modified", len(diff.Modified)) + if err := r.fullReloadFn(evt.Config); err != nil { + return err + } + r.current = evt.Config + r.currentHash = evt.NewHash + return nil + } + failed, err := r.reconfigurer.ReconfigureModules(context.Background(), diff.Modified) if err != nil { return err diff --git a/config/reloader_test.go b/config/reloader_test.go index 11c8fb6c..d3d5cb9c 100644 --- a/config/reloader_test.go +++ b/config/reloader_test.go @@ -237,6 +237,80 @@ func TestConfigReloader_FallbackToFull(t *testing.T) { } } +func TestConfigReloader_NilReconfigurer_FallsBackToFullReload(t *testing.T) { + initial := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 60}}}, + nil, + ) + + var fullReloadCalled int + fullFn := func(cfg *WorkflowConfig) error { + fullReloadCalled++ + return nil + } + + // nil reconfigurer — module changes should still trigger full reload. + r := newTestReloader(t, initial, fullFn, nil) + + newCfg := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 120}}}, + nil, + ) + evt, err := makeChangeEvent(newCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + if err := r.HandleChange(evt); err != nil { + t.Fatalf("HandleChange: %v", err) + } + + if fullReloadCalled != 1 { + t.Errorf("expected 1 full reload when reconfigurer is nil, got %d", fullReloadCalled) + } +} + +func TestConfigReloader_SetReconfigurer(t *testing.T) { + initial := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 60}}}, + nil, + ) + + var fullReloadCalled int + fullFn := func(cfg *WorkflowConfig) error { + fullReloadCalled++ + return nil + } + + // Start with nil reconfigurer. + r := newTestReloader(t, initial, fullFn, nil) + + // Set a reconfigurer dynamically. + rec := &mockReconfigurer{} + r.SetReconfigurer(rec) + + newCfg := makeWorkflowConfig( + []ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 120}}}, + nil, + ) + evt, err := makeChangeEvent(newCfg) + if err != nil { + t.Fatalf("makeChangeEvent: %v", err) + } + + if err := r.HandleChange(evt); err != nil { + t.Fatalf("HandleChange: %v", err) + } + + // Should use partial reconfigure, not full reload. + if fullReloadCalled != 0 { + t.Errorf("expected 0 full reloads after SetReconfigurer, got %d", fullReloadCalled) + } + if len(rec.called) != 1 { + t.Errorf("expected ReconfigureModules called once, got %d", len(rec.called)) + } +} + func TestConfigReloader_NoEffectiveChanges(t *testing.T) { initial := makeWorkflowConfig( []ModuleConfig{{Name: "alpha", Type: "http.server", Config: map[string]any{"port": 8080}}}, diff --git a/config/source_db.go b/config/source_db.go index 2755db99..c6f7ea5e 100644 --- a/config/source_db.go +++ b/config/source_db.go @@ -98,15 +98,16 @@ func (s *DatabaseSource) refresh(ctx context.Context) (*WorkflowConfig, error) { // Hash returns the SHA256 hex digest of the stored config bytes. It first // tries the fast path of fetching the pre-computed hash from the database, -// and falls back to loading the full document if that fails. +// and falls back to loading the full document if that fails. The fallback +// always fetches fresh data to ensure change detection is accurate. func (s *DatabaseSource) Hash(ctx context.Context) (string, error) { hash, err := s.store.GetConfigDocumentHash(ctx, s.key) if err == nil { return hash, nil } - // Fallback: load and hash. - if _, loadErr := s.Load(ctx); loadErr != nil { - return "", loadErr + // Fallback: force a fresh load (bypass cache) to get an accurate hash. + if _, refreshErr := s.refresh(ctx); refreshErr != nil { + return "", refreshErr } s.mu.RLock() defer s.mu.RUnlock() diff --git a/config/source_db_poller.go b/config/source_db_poller.go index d6b8e021..ff3d69bd 100644 --- a/config/source_db_poller.go +++ b/config/source_db_poller.go @@ -16,8 +16,9 @@ type DatabasePoller struct { logger *slog.Logger lastHash string - done chan struct{} - wg sync.WaitGroup + done chan struct{} + stopOnce sync.Once + wg sync.WaitGroup } // NewDatabasePoller creates a DatabasePoller that calls onChange whenever the @@ -46,8 +47,9 @@ func (p *DatabasePoller) Start(ctx context.Context) error { } // Stop signals the polling goroutine to exit and waits for it to finish. +// It is safe to call Stop multiple times. func (p *DatabasePoller) Stop() { - close(p.done) + p.stopOnce.Do(func() { close(p.done) }) p.wg.Wait() } diff --git a/config/watcher.go b/config/watcher.go index 5482bc00..748fe24b 100644 --- a/config/watcher.go +++ b/config/watcher.go @@ -35,6 +35,7 @@ type ConfigWatcher struct { fsWatcher *fsnotify.Watcher done chan struct{} + stopOnce sync.Once wg sync.WaitGroup lastHash string @@ -87,8 +88,9 @@ func (w *ConfigWatcher) Start() error { } // Stop terminates the watcher and waits for the background goroutine to exit. +// It is safe to call Stop multiple times. func (w *ConfigWatcher) Stop() error { - close(w.done) + w.stopOnce.Do(func() { close(w.done) }) w.wg.Wait() if w.fsWatcher != nil { return w.fsWatcher.Close() @@ -114,9 +116,16 @@ func (w *ConfigWatcher) loop() { if !isYAMLFile(event.Name) { continue } - if event.Op&(fsnotify.Write|fsnotify.Create) != 0 { + if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename) != 0 { + // Also handle Rename for atomic-save editors that rename-over + // the config file. On a Rename we enqueue the target config path + // rather than the renamed-away path so processChange still matches. + name := event.Name + if event.Op&fsnotify.Rename != 0 { + name = w.source.Path() + } w.mu.Lock() - w.pending[event.Name] = time.Now() + w.pending[name] = time.Now() w.mu.Unlock() }