feat: dynamic config system with hot reload #186
Pull request overview
This PR introduces a pluggable dynamic configuration system (file + DB + composite sources) with hot-reload capabilities, including per-module in-place reconfiguration and server/runtime integration for watching/polling and an API endpoint to reconfigure modules.
Changes:
- Added `ConfigSource` implementations (file/database/composite) plus change detection (hashing, watcher, poller).
- Added config diffing + `ConfigReloader` to decide between partial module reconfigure vs full engine reload.
- Added `interfaces.Reconfigurable` and engine/server wiring for hot reload (`--watch`, DB poller, and module reconfigure HTTP endpoint).
Reviewed changes
Copilot reviewed 23 out of 23 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| store/pg_config.go | PostgreSQL-backed config document store with hash + history. |
| store/pg_config_test.go | Integration test for PGConfigStore CRUD/version/hash behavior. |
| store/pg.go | Wires PGConfigStore into PGStore via ConfigDocs(). |
| store/migrations/010_config_documents.sql | Adds tables/indexes for config documents + history. |
| interfaces/reconfigurable.go | Defines module Reconfigurable interface for runtime config updates. |
| engine.go | Adds StdEngine.ReconfigureModules to apply per-module updates. |
| engine_reconfig_test.go | Unit tests for ReconfigureModules behavior and failure modes. |
| config/source.go | Introduces ConfigSource, change event, and config hashing helper. |
| config/source_file.go | File-backed config loader + file content hashing. |
| config/source_file_test.go | Tests for file source load/hash/name/path behavior. |
| config/watcher.go | fsnotify-based file watcher with debounce + hash-based change detection. |
| config/watcher_test.go | Tests for watcher debounce/change/stop behavior. |
| config/source_db.go | Database-backed config loader with caching + hash fast-path. |
| config/source_db_test.go | Tests for DB source load/cache/hash/name and ConfigSource conformance. |
| config/source_db_poller.go | Poller for periodically detecting DB config changes. |
| config/source_db_poller_test.go | Tests for DB poller change detection and stop/cancel behavior. |
| config/source_composite.go | Composite source overlays configs across multiple backends. |
| config/source_composite_test.go | Tests for composite overlay semantics + hash/name. |
| config/diff.go | Module/non-module diff helpers to drive reload decisions. |
| config/diff_test.go | Tests for module diffing and non-module change detection. |
| config/reloader.go | Coordinator that chooses partial reconfigure vs full reload. |
| config/reloader_test.go | Tests for partial reconfigure vs full reload fallback logic. |
| cmd/server/main.go | Adds --watch hot reload wiring, DB poller wiring, and module reconfigure endpoint. |
| // Hash returns the SHA256 hex digest of the stored config bytes. It first | ||
| // tries the fast path of fetching the pre-computed hash from the database, | ||
| // and falls back to loading the full document if that fails. | ||
| func (s *DatabaseSource) Hash(ctx context.Context) (string, error) { | ||
| hash, err := s.store.GetConfigDocumentHash(ctx, s.key) | ||
| if err == nil { | ||
| return hash, nil | ||
| } | ||
| // Fallback: load and hash. | ||
| if _, loadErr := s.Load(ctx); loadErr != nil { | ||
| return "", loadErr | ||
| } | ||
| s.mu.RLock() | ||
| defer s.mu.RUnlock() | ||
| return s.cachedHash, nil | ||
| } |
DatabaseSource.Hash() falls back to s.Load() when GetConfigDocumentHash fails, but Load may return a still-valid cached config (and cachedHash), meaning Hash() can stay stale and miss changes until refreshInterval expires. For change detection, consider bypassing the cache in this fallback (e.g., call refresh(ctx) directly) or otherwise ensure the returned hash reflects the latest DB state.
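A minimal sketch of the cache-bypassing fallback described in this comment, assuming a trimmed-down store interface. `GetConfigDocumentHash` and `refresh` are named in the PR; `GetConfigDocument`, `configStore`, and `failingHashStore` are hypothetical stand-ins, and the real `refresh` may do more than recompute the hash.

```go
package main

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"sync"
)

// configStore is a hypothetical, trimmed-down store interface.
// GetConfigDocumentHash appears in the PR; GetConfigDocument is assumed.
type configStore interface {
	GetConfigDocumentHash(ctx context.Context, key string) (string, error)
	GetConfigDocument(ctx context.Context, key string) ([]byte, error)
}

type DatabaseSource struct {
	store      configStore
	key        string
	mu         sync.RWMutex
	cachedHash string
}

// refresh always reads the document from the store and recomputes the
// cached hash, regardless of how fresh the cache is.
func (s *DatabaseSource) refresh(ctx context.Context) error {
	data, err := s.store.GetConfigDocument(ctx, s.key)
	if err != nil {
		return err
	}
	sum := sha256.Sum256(data)
	s.mu.Lock()
	s.cachedHash = hex.EncodeToString(sum[:])
	s.mu.Unlock()
	return nil
}

// Hash tries the pre-computed hash first. On failure it calls refresh
// directly instead of Load, so a still-valid cache cannot mask a change.
func (s *DatabaseSource) Hash(ctx context.Context) (string, error) {
	if hash, err := s.store.GetConfigDocumentHash(ctx, s.key); err == nil {
		return hash, nil
	}
	if err := s.refresh(ctx); err != nil {
		return "", err
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.cachedHash, nil
}

// failingHashStore simulates a store whose fast-path hash lookup fails.
type failingHashStore struct{ doc []byte }

func (f *failingHashStore) GetConfigDocumentHash(context.Context, string) (string, error) {
	return "", errors.New("hash lookup unavailable")
}

func (f *failingHashStore) GetConfigDocument(context.Context, string) ([]byte, error) {
	return f.doc, nil
}

// demoHashes hashes, changes the stored document, and hashes again.
func demoHashes() (before, after string) {
	st := &failingHashStore{doc: []byte("port: 8080")}
	src := &DatabaseSource{store: st, key: "app"}
	before, _ = src.Hash(context.Background())
	st.doc = []byte("port: 9090")
	after, _ = src.Hash(context.Background())
	return before, after
}

func main() {
	before, after := demoHashes()
	fmt.Println("hash changed:", before != after)
}
```

With the fallback going through `refresh`, the second hash reflects the updated document even though no refresh interval has elapsed.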
config/diff.go
Outdated
| // HasNonModuleChanges returns true if workflows, triggers, pipelines, or | ||
| // platform config changed between old and new (requiring full reload). | ||
| func HasNonModuleChanges(old, new *WorkflowConfig) bool { | ||
| return hashAny(old.Workflows) != hashAny(new.Workflows) || | ||
| hashAny(old.Triggers) != hashAny(new.Triggers) || | ||
| hashAny(old.Pipelines) != hashAny(new.Pipelines) || | ||
| hashAny(old.Platform) != hashAny(new.Platform) |
HasNonModuleChanges() does not consider cfg.Requires, but engine.BuildFromConfig validates cfg.Requires and it can affect plugin/capability loading. A Requires change likely needs a full reload; consider including old.Requires vs new.Requires in this comparison to avoid incorrectly treating it as a module-only change.
| // HasNonModuleChanges returns true if workflows, triggers, pipelines, or | |
| // platform config changed between old and new (requiring full reload). | |
| func HasNonModuleChanges(old, new *WorkflowConfig) bool { | |
| return hashAny(old.Workflows) != hashAny(new.Workflows) || | |
| hashAny(old.Triggers) != hashAny(new.Triggers) || | |
| hashAny(old.Pipelines) != hashAny(new.Pipelines) || | |
| hashAny(old.Platform) != hashAny(new.Platform) | |
| // HasNonModuleChanges returns true if workflows, triggers, pipelines, | |
| // platform config, or requirements changed between old and new | |
| // (requiring full reload). | |
| func HasNonModuleChanges(old, new *WorkflowConfig) bool { | |
| return hashAny(old.Workflows) != hashAny(new.Workflows) || | |
| hashAny(old.Triggers) != hashAny(new.Triggers) || | |
| hashAny(old.Pipelines) != hashAny(new.Pipelines) || | |
| hashAny(old.Platform) != hashAny(new.Platform) || | |
| hashAny(old.Requires) != hashAny(new.Requires) |
| // Stop signals the polling goroutine to exit and waits for it to finish. | ||
| func (p *DatabasePoller) Stop() { | ||
| close(p.done) | ||
| p.wg.Wait() | ||
| } |
Stop() closes p.done unconditionally; calling Stop more than once will panic. This is easy to hit if callers call Stop after ctx cancellation or defer Stop in multiple places. Consider making Stop idempotent (sync.Once/atomic) to avoid double-close panics.
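One way to make `Stop` idempotent, sketched with `sync.Once`. The `stopOnce` field, the `NewDatabasePoller` constructor signature, and the `Start` loop are assumptions for illustration; only `done`, `wg`, and `Stop` come from the hunk above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// DatabasePoller is reduced to the fields needed to show shutdown.
type DatabasePoller struct {
	done     chan struct{}
	wg       sync.WaitGroup
	stopOnce sync.Once // assumed new field guarding close(done)
}

// NewDatabasePoller is a hypothetical constructor for this sketch.
func NewDatabasePoller() *DatabasePoller {
	return &DatabasePoller{done: make(chan struct{})}
}

// Start launches a placeholder polling loop.
func (p *DatabasePoller) Start(interval time.Duration) {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-p.done:
				return
			case <-ticker.C:
				// The real poller would check the DB hash here.
			}
		}
	}()
}

// Stop closes done at most once, so repeated calls cannot panic.
func (p *DatabasePoller) Stop() {
	p.stopOnce.Do(func() { close(p.done) })
	p.wg.Wait()
}

// demoDoubleStop reports whether calling Stop twice panicked.
func demoDoubleStop() (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	p := NewDatabasePoller()
	p.Start(10 * time.Millisecond)
	p.Stop()
	p.Stop()
	return false
}

func main() {
	fmt.Println("second Stop panicked:", demoDoubleStop())
}
```

The same pattern would apply to `ConfigWatcher.Stop`, with the `fsWatcher.Close()` call kept outside or inside the `Once` depending on whether its error should be returned on every call.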
cmd/server/main.go
Outdated
| http.Error(w, `{"error":"module name required"}`, http.StatusBadRequest) | ||
| return | ||
| } | ||
| var newConfig map[string]any | ||
| if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil { | ||
| http.Error(w, fmt.Sprintf(`{"error":"invalid JSON: %v"}`, err), http.StatusBadRequest) | ||
| return | ||
| } | ||
| mod := app.engine.GetApp().GetModule(moduleName) | ||
| if mod == nil { | ||
| http.Error(w, fmt.Sprintf(`{"error":"module %q not found"}`, moduleName), http.StatusNotFound) | ||
| return | ||
| } | ||
| reconf, ok := mod.(interfaces.Reconfigurable) | ||
| if !ok { | ||
| http.Error(w, fmt.Sprintf(`{"error":"module %q does not support runtime reconfiguration"}`, moduleName), http.StatusNotImplemented) | ||
| return | ||
| } | ||
| if err := reconf.Reconfigure(r.Context(), newConfig); err != nil { | ||
| http.Error(w, fmt.Sprintf(`{"error":"reconfiguration failed: %v"}`, err), http.StatusInternalServerError) | ||
| return | ||
| } | ||
|
| resp, _ := json.Marshal(map[string]string{"status": "ok", "module": moduleName}) | ||
| w.Write(resp) //nolint:errcheck |
This endpoint is registered directly on the top-level mux, so it bypasses the API router’s auth middleware (most /api/v1/* routes are wrapped with mw.RequireAuth). That makes runtime module reconfiguration unauthenticated unless there’s an outer reverse-proxy enforcing auth. Consider routing this through the existing API middleware/permission checks, or explicitly wrapping with the same mw.RequireAuth/role checks. Also, error responses are built via http.Error/fmt.Sprintf into JSON literals without setting Content-Type and without escaping (can yield invalid JSON); prefer json.NewEncoder and add a request body size limit (http.MaxBytesReader) to avoid unbounded decode.
| http.Error(w, `{"error":"module name required"}`, http.StatusBadRequest) | |
| return | |
| } | |
| var newConfig map[string]any | |
| if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil { | |
| http.Error(w, fmt.Sprintf(`{"error":"invalid JSON: %v"}`, err), http.StatusBadRequest) | |
| return | |
| } | |
| mod := app.engine.GetApp().GetModule(moduleName) | |
| if mod == nil { | |
| http.Error(w, fmt.Sprintf(`{"error":"module %q not found"}`, moduleName), http.StatusNotFound) | |
| return | |
| } | |
| reconf, ok := mod.(interfaces.Reconfigurable) | |
| if !ok { | |
| http.Error(w, fmt.Sprintf(`{"error":"module %q does not support runtime reconfiguration"}`, moduleName), http.StatusNotImplemented) | |
| return | |
| } | |
| if err := reconf.Reconfigure(r.Context(), newConfig); err != nil { | |
| http.Error(w, fmt.Sprintf(`{"error":"reconfiguration failed: %v"}`, err), http.StatusInternalServerError) | |
| return | |
| } | |
| w.Header().Set("Content-Type", "application/json") | |
| resp, _ := json.Marshal(map[string]string{"status": "ok", "module": moduleName}) | |
| w.Write(resp) //nolint:errcheck | |
| w.Header().Set("Content-Type", "application/json") | |
| w.WriteHeader(http.StatusBadRequest) | |
| _ = json.NewEncoder(w).Encode(map[string]string{ | |
| "error": "module name required", | |
| }) | |
| return | |
| } | |
| // Limit the size of the incoming configuration to prevent unbounded reads. | |
| const maxConfigBytes = 1 << 20 // 1 MiB | |
| limitedBody := http.MaxBytesReader(w, r.Body, maxConfigBytes) | |
| defer limitedBody.Close() | |
| var newConfig map[string]any | |
| if err := json.NewDecoder(limitedBody).Decode(&newConfig); err != nil { | |
| w.Header().Set("Content-Type", "application/json") | |
| w.WriteHeader(http.StatusBadRequest) | |
| _ = json.NewEncoder(w).Encode(map[string]string{ | |
| "error": fmt.Sprintf("invalid JSON: %v", err), | |
| }) | |
| return | |
| } | |
| mod := app.engine.GetApp().GetModule(moduleName) | |
| if mod == nil { | |
| w.Header().Set("Content-Type", "application/json") | |
| w.WriteHeader(http.StatusNotFound) | |
| _ = json.NewEncoder(w).Encode(map[string]string{ | |
| "error": fmt.Sprintf("module %q not found", moduleName), | |
| }) | |
| return | |
| } | |
| reconf, ok := mod.(interfaces.Reconfigurable) | |
| if !ok { | |
| w.Header().Set("Content-Type", "application/json") | |
| w.WriteHeader(http.StatusNotImplemented) | |
| _ = json.NewEncoder(w).Encode(map[string]string{ | |
| "error": fmt.Sprintf("module %q does not support runtime reconfiguration", moduleName), | |
| }) | |
| return | |
| } | |
| if err := reconf.Reconfigure(r.Context(), newConfig); err != nil { | |
| w.Header().Set("Content-Type", "application/json") | |
| w.WriteHeader(http.StatusInternalServerError) | |
| _ = json.NewEncoder(w).Encode(map[string]string{ | |
| "error": fmt.Sprintf("reconfiguration failed: %v", err), | |
| }) | |
| return | |
| } | |
| w.Header().Set("Content-Type", "application/json") | |
| w.WriteHeader(http.StatusOK) | |
| _ = json.NewEncoder(w).Encode(map[string]string{ | |
| "status": "ok", | |
| "module": moduleName, | |
| }) |
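The JSON-encoding pattern in this handler repeats the same three calls on every branch. A small helper could fold them together; the sketch below assumes a hypothetical `writeJSONError` name and leaves the auth wrapping out, since the signature of `mw.RequireAuth` is not shown in this thread.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// writeJSONError is a hypothetical helper: it sets the content type,
// writes the status code, and lets encoding/json escape the message.
func writeJSONError(w http.ResponseWriter, status int, msg string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	_ = json.NewEncoder(w).Encode(map[string]string{"error": msg})
}

// demoError runs the helper against an in-memory recorder and returns
// the status code, Content-Type header, and response body.
func demoError(status int, msg string) (int, string, string) {
	rec := httptest.NewRecorder()
	writeJSONError(rec, status, msg)
	return rec.Code, rec.Header().Get("Content-Type"), rec.Body.String()
}

func main() {
	// A message containing quotes would break a hand-built JSON literal.
	code, ctype, body := demoError(http.StatusNotFound, `module "cache" not found`)
	fmt.Println(code, ctype)
	fmt.Print(body)
}
```

Each error branch in the handler would then shrink to a single `writeJSONError(w, status, msg)` call followed by `return`.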
| // Only module config changes — try partial reconfiguration. | ||
| if len(diff.Modified) > 0 && r.reconfigurer != nil { | ||
| failed, err := r.reconfigurer.ReconfigureModules(context.Background(), diff.Modified) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if len(failed) > 0 { | ||
| r.logger.Info("some modules cannot be reconfigured in-place, performing full reload", | ||
| "modules", failed) | ||
| if err := r.fullReloadFn(evt.Config); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| r.current = evt.Config | ||
| r.currentHash = evt.NewHash | ||
| return nil | ||
| } | ||
| r.logger.Debug("config change detected but no effective differences") | ||
| return nil |
HandleChange() currently does nothing when there are module config changes but r.reconfigurer is nil (it falls through to the "no effective differences" path). That contradicts the comment that module changes should fall back to fullReloadFn, and will cause real config edits to be ignored. Consider: if len(diff.Modified)>0 and reconfigurer==nil, call fullReloadFn (or explicitly return an error) and then update r.current/currentHash.
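The decision this comment asks for can be sketched as a standalone function. The `ModuleReconfigurer` signature here is simplified (the real `diff.Modified` type is not shown in this thread), and `applyModuleChanges` and its string return values are hypothetical names used only for illustration.

```go
package main

import (
	"context"
	"fmt"
)

// ModuleReconfigurer is a simplified stand-in for the PR's interface;
// the real argument type of ReconfigureModules may differ.
type ModuleReconfigurer interface {
	ReconfigureModules(ctx context.Context, modified []string) (failed []string, err error)
}

// applyModuleChanges returns which path was taken: "none", "partial",
// or "full". A nil reconfigurer with pending module changes falls back
// to a full reload instead of being treated as "no differences".
func applyModuleChanges(ctx context.Context, modified []string, rc ModuleReconfigurer, fullReload func() error) (string, error) {
	if len(modified) == 0 {
		return "none", nil
	}
	if rc == nil {
		return "full", fullReload()
	}
	failed, err := rc.ReconfigureModules(ctx, modified)
	if err != nil {
		return "", err
	}
	if len(failed) > 0 {
		// Some modules cannot be updated in place.
		return "full", fullReload()
	}
	return "partial", nil
}

// demoNilReconfigurer exercises the nil-reconfigurer branch and counts
// how many times the full reload function ran.
func demoNilReconfigurer() (mode string, reloads int, err error) {
	reload := func() error { reloads++; return nil }
	mode, err = applyModuleChanges(context.Background(), []string{"cache"}, nil, reload)
	return mode, reloads, err
}

func main() {
	mode, reloads, err := demoNilReconfigurer()
	fmt.Println(mode, reloads, err)
}
```

In `HandleChange`, `r.current` and `r.currentHash` would be updated after either the "partial" or the "full" path succeeds.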
| if !isYAMLFile(event.Name) { | ||
| continue | ||
| } | ||
| if event.Op&(fsnotify.Write|fsnotify.Create) != 0 { | ||
| w.mu.Lock() | ||
| w.pending[event.Name] = time.Now() | ||
| w.mu.Unlock() | ||
| } |
The watcher claims "atomic-save compatibility" by watching the directory, but it only enqueues events for fsnotify.Write|Create. Many editors perform atomic saves via rename/replace, which commonly emits Rename (and sometimes Remove/Chmod) events; those changes may never trigger a reload. Consider including fsnotify.Rename (and potentially Remove/Chmod) in the handled ops, and/or enqueue a check for the target config path on any YAML event in the directory rather than only event.Name.
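A rough sketch of the broader event filter suggested here. To keep it free of external dependencies, `Op` and `Event` below are local stand-ins shaped like the fsnotify types, not the library's own; `pendingPath` is a hypothetical helper, and the choice to include `Remove` but not `Chmod` is an assumption.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// Op and Event are local stand-ins shaped like fsnotify's types so the
// sketch compiles without the dependency.
type Op uint32

const (
	Create Op = 1 << iota
	Write
	Remove
	Rename
	Chmod
)

type Event struct {
	Name string
	Op   Op
}

// isYAMLFile reports whether name has a .yaml or .yml extension.
func isYAMLFile(name string) bool {
	ext := strings.ToLower(filepath.Ext(name))
	return ext == ".yaml" || ext == ".yml"
}

// pendingPath decides whether an event should schedule a debounced
// check. It returns the watched target path rather than ev.Name, so a
// rename-based save still triggers a hash comparison of the real file.
func pendingPath(ev Event, target string) (string, bool) {
	if !isYAMLFile(ev.Name) {
		return "", false
	}
	if ev.Op&(Write|Create|Rename|Remove) == 0 {
		return "", false
	}
	return target, true
}

func main() {
	target := "/etc/app/config.yaml"
	for _, ev := range []Event{
		{Name: target, Op: Rename},
		{Name: target, Op: Chmod},
		{Name: "/etc/app/notes.txt", Op: Write},
	} {
		path, ok := pendingPath(ev, target)
		fmt.Println(ev.Name, ok, path)
	}
}
```

Because the watcher already compares content hashes after the debounce, enqueueing on extra event types should only cost a redundant hash check, not a spurious reload.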
| // Stop terminates the watcher and waits for the background goroutine to exit. | ||
| func (w *ConfigWatcher) Stop() error { | ||
| close(w.done) | ||
| w.wg.Wait() | ||
| if w.fsWatcher != nil { | ||
| return w.fsWatcher.Close() | ||
| } | ||
| return nil |
Stop() closes w.done unconditionally. If Stop() is called more than once (or called before/after a failed Start() in different code paths), this will panic due to closing a closed channel. Consider making Stop idempotent (e.g., sync.Once/atomic flag) and/or nil out w.done after close under a mutex.
| reconfigurer: reconfigurer, | ||
| }, nil | ||
| } | ||
ConfigReloader stores a concrete ModuleReconfigurer instance, but in the server integration the engine instance is swapped on full reload. After the first full reload, subsequent partial reconfigurations will target the old engine. To avoid stale pointers, consider making reconfigurer a function/callback that resolves the current engine on each call, or provide a way to update r.reconfigurer after fullReloadFn succeeds.
| // SetReconfigurer updates the ModuleReconfigurer used for partial (per-module) | |
| // reloads. This should be called by the integration after a successful full | |
| // engine reload if the underlying engine (and its reconfigurer) has changed. | |
| func (r *ConfigReloader) SetReconfigurer(reconfigurer ModuleReconfigurer) { | |
| r.mu.Lock() | |
| defer r.mu.Unlock() | |
| r.reconfigurer = reconfigurer | |
| } |
| reloader, reloaderErr = config.NewConfigReloader( | ||
| app.currentConfig, // the loaded WorkflowConfig | ||
| app.reloadEngine, // existing full reload function | ||
| app.engine, // implements ModuleReconfigurer | ||
| app.logger, |
NewConfigReloader is being constructed with app.engine as the ModuleReconfigurer, but reloadEngine replaces app.engine with a new *StdEngine. The reloader will keep calling ReconfigureModules on the old engine instance after the first full reload. Consider passing a stable adapter (e.g., have *serverApp implement config.ModuleReconfigurer by delegating to app.engine) so the reloader always targets the current engine.
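A minimal sketch of the stable-adapter idea, assuming a simplified `ModuleReconfigurer` signature. The `engine` type, its `calls` counter, the `mu` field, and the body of `reloadEngine` are hypothetical; only the names `serverApp`, `reloadEngine`, and `ReconfigureModules` come from this thread.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ModuleReconfigurer is a simplified stand-in for the PR's interface.
type ModuleReconfigurer interface {
	ReconfigureModules(ctx context.Context, modified []string) ([]string, error)
}

// engine is a hypothetical stand-in for *StdEngine that counts calls.
type engine struct{ calls int }

func (e *engine) ReconfigureModules(_ context.Context, _ []string) ([]string, error) {
	e.calls++
	return nil, nil
}

// serverApp owns the current engine and implements ModuleReconfigurer
// by delegating to whichever engine is current at call time.
type serverApp struct {
	mu     sync.RWMutex
	engine *engine
}

func (a *serverApp) ReconfigureModules(ctx context.Context, modified []string) ([]string, error) {
	a.mu.RLock()
	eng := a.engine
	a.mu.RUnlock()
	return eng.ReconfigureModules(ctx, modified)
}

// reloadEngine swaps in a fresh engine, as a full reload would.
func (a *serverApp) reloadEngine() {
	a.mu.Lock()
	a.engine = &engine{}
	a.mu.Unlock()
}

// demoAdapter reconfigures, reloads, reconfigures again, and returns
// how many calls reached the old and the new engine.
func demoAdapter() (oldCalls, newCalls int) {
	app := &serverApp{engine: &engine{}}
	var rc ModuleReconfigurer = app // what the reloader would hold
	first := app.engine
	ctx := context.Background()
	_, _ = rc.ReconfigureModules(ctx, []string{"cache"})
	app.reloadEngine()
	_, _ = rc.ReconfigureModules(ctx, []string{"cache"})
	return first.calls, app.engine.calls
}

func main() {
	oldCalls, newCalls := demoAdapter()
	fmt.Println(oldCalls, newCalls)
}
```

Passing `app` rather than `app.engine` to `NewConfigReloader` means the reloader never holds an engine pointer of its own, so nothing needs updating after a full reload.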
…er-module reconfiguration
Adds runtime config reloading without engine restart:
- ConfigSource interface with file, database, and composite implementations
- fsnotify-based file watcher with debounce and hash-based change detection
- DatabaseSource with cached refresh and DatabasePoller for periodic polling
- Reconfigurable interface for modules that support hot reconfiguration
- ConfigReloader coordinator that diffs configs and routes to partial reconfigure or full reload
- ReconfigureModules on StdEngine for surgical module updates
- PGConfigStore with transactional upsert and version history
- Server integration: --watch flag, PUT /api/v1/modules/{name}/config endpoint
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- HasNonModuleChanges now also compares Requires (plugin/capability changes)
- DatabaseSource.Hash() fallback bypasses cache to ensure accurate change detection
- DatabasePoller.Stop() and ConfigWatcher.Stop() are idempotent via sync.Once
- ConfigWatcher handles fsnotify.Rename events for atomic-save editor compatibility
- ConfigReloader falls back to full reload when reconfigurer is nil but modules changed
- Add SetReconfigurer() for updating the reconfigurer after a full engine reload
- serverApp implements ModuleReconfigurer as stable adapter (survives engine reloads)
- Module reconfigure endpoint uses proper JSON encoding, 1MiB body limit, and RequireAuth
- Add tests for nil reconfigurer fallback and SetReconfigurer
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
6616068 to f73613d
Summary
- `Reconfigurable` interface: modules can implement hot reconfigure without engine restart
- `--watch` flag enables file watching, `PUT /api/v1/modules/{name}/config` endpoint for runtime module reconfiguration
Test plan
🤖 Generated with Claude Code