Skip to content

Commit 6616068

Browse files
intel352claude
andcommitted
feat: dynamic config system with file watcher, database source, and per-module reconfiguration
Adds runtime config reloading without engine restart: - ConfigSource interface with file, database, and composite implementations - fsnotify-based file watcher with debounce and hash-based change detection - DatabaseSource with cached refresh and DatabasePoller for periodic polling - Reconfigurable interface for modules that support hot reconfiguration - ConfigReloader coordinator that diffs configs and routes to partial reconfigure or full reload - ReconfigureModules on StdEngine for surgical module updates - PGConfigStore with transactional upsert and version history - Server integration: --watch flag, PUT /api/v1/modules/{name}/config endpoint Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 8276b5a commit 6616068

23 files changed

Lines changed: 3049 additions & 3 deletions

cmd/server/main.go

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ var (
102102
// Deprecated: admin UI is now served by the external workflow-plugin-admin binary.
103103
// This flag is accepted for backwards compatibility but has no effect.
104104
_ = flag.String("admin-ui-dir", "", "Deprecated: admin UI is now served by the external workflow-plugin-admin binary")
105+
106+
watchConfig = flag.Bool("watch", false, "Watch config file for changes and auto-reload")
105107
)
106108

107109
// defaultEnginePlugins returns the standard set of engine plugins used by all engine instances.
@@ -342,12 +344,14 @@ type serverApp struct {
342344
stores storeComponents
343345
mgmt mgmtComponents
344346
services serviceComponents
347+
currentConfig *config.WorkflowConfig // last loaded config, used by dynamic config watcher
345348
}
346349

347350
// setup initializes all server components: engine, AI services, and HTTP mux.
348351
func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error) {
349352
app := &serverApp{
350-
logger: logger,
353+
logger: logger,
354+
currentConfig: cfg,
351355
}
352356

353357
engine, loader, registry, err := buildEngine(cfg, logger)
@@ -400,8 +404,9 @@ func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) (
400404
}
401405

402406
sApp := &serverApp{
403-
engine: engine,
404-
logger: logger,
407+
engine: engine,
408+
logger: logger,
409+
currentConfig: combined,
405410
}
406411

407412
pool := dynamic.NewInterpreterPool()
@@ -1220,6 +1225,74 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error {
12201225
}
12211226
}
12221227

1228+
// Config file watcher — started after the engine and all post-start hooks are up.
1229+
var reloader *config.ConfigReloader
1230+
if *watchConfig && *configFile != "" {
1231+
fileSource := config.NewFileSource(*configFile)
1232+
1233+
var reloaderErr error
1234+
reloader, reloaderErr = config.NewConfigReloader(
1235+
app.currentConfig, // the loaded WorkflowConfig
1236+
app.reloadEngine, // existing full reload function
1237+
app.engine, // implements ModuleReconfigurer
1238+
app.logger,
1239+
)
1240+
if reloaderErr != nil {
1241+
app.logger.Error("Failed to create config reloader", "error", reloaderErr)
1242+
} else {
1243+
configWatcher := config.NewConfigWatcher(fileSource, func(evt config.ConfigChangeEvent) {
1244+
app.logger.Info("Config file changed, reloading",
1245+
"source", evt.Source,
1246+
"old_hash", evt.OldHash[:8],
1247+
"new_hash", evt.NewHash[:8])
1248+
if err := reloader.HandleChange(evt); err != nil {
1249+
app.logger.Error("Config reload failed", "error", err)
1250+
}
1251+
}, config.WithWatchLogger(app.logger))
1252+
1253+
if err := configWatcher.Start(); err != nil {
1254+
app.logger.Error("Failed to start config watcher", "error", err)
1255+
} else {
1256+
defer func() { _ = configWatcher.Stop() }()
1257+
app.logger.Info("Config file watcher started", "file", *configFile)
1258+
}
1259+
}
1260+
}
1261+
1262+
// Database config poller — activated when a PG store is available.
1263+
if app.pgStore != nil {
1264+
dbSource := config.NewDatabaseSource(
1265+
app.pgStore.ConfigDocs(),
1266+
config.WithRefreshInterval(30*time.Second),
1267+
)
1268+
1269+
// Reuse or create a reloader for the DB poller.
1270+
if reloader == nil {
1271+
var reloaderErr error
1272+
reloader, reloaderErr = config.NewConfigReloader(app.currentConfig, app.reloadEngine, app.engine, app.logger)
1273+
if reloaderErr != nil {
1274+
app.logger.Error("Failed to create config reloader for DB poller", "error", reloaderErr)
1275+
}
1276+
}
1277+
1278+
if reloader != nil {
1279+
poller := config.NewDatabasePoller(dbSource, 30*time.Second, func(evt config.ConfigChangeEvent) {
1280+
app.logger.Info("Database config changed, reloading",
1281+
"source", evt.Source)
1282+
if err := reloader.HandleChange(evt); err != nil {
1283+
app.logger.Error("DB config reload failed", "error", err)
1284+
}
1285+
}, app.logger)
1286+
1287+
if err := poller.Start(ctx); err != nil {
1288+
app.logger.Error("DB config poller start failed", "error", err)
1289+
} else {
1290+
defer poller.Stop()
1291+
app.logger.Info("Database config poller started", "interval", "30s")
1292+
}
1293+
}
1294+
}
1295+
12231296
// Wait for context cancellation
12241297
<-ctx.Done()
12251298

@@ -1317,6 +1390,7 @@ func applyEnvOverrides() {
13171390
"load-workflows": "WORKFLOW_LOAD_WORKFLOWS",
13181391
"import-bundle": "WORKFLOW_IMPORT_BUNDLE",
13191392
"license-key": "WORKFLOW_LICENSE_KEY",
1393+
"watch": "WORKFLOW_WATCH",
13201394
}
13211395

13221396
// Track which flags were explicitly set on the command line.
@@ -1513,6 +1587,44 @@ func runMultiWorkflow(logger *slog.Logger) error {
15131587
// 8. Mount API router on the same HTTP mux
15141588
mux := http.NewServeMux()
15151589
mux.Handle("/api/v1/", apiRouter)
1590+
1591+
// Module reconfiguration endpoint — allows runtime hot-reload of individual
1592+
// modules that implement interfaces.Reconfigurable without a full engine restart.
1593+
mux.HandleFunc("PUT /api/v1/modules/{name}/config", func(w http.ResponseWriter, r *http.Request) {
1594+
moduleName := r.PathValue("name")
1595+
if moduleName == "" {
1596+
http.Error(w, `{"error":"module name required"}`, http.StatusBadRequest)
1597+
return
1598+
}
1599+
1600+
var newConfig map[string]any
1601+
if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil {
1602+
http.Error(w, fmt.Sprintf(`{"error":"invalid JSON: %v"}`, err), http.StatusBadRequest)
1603+
return
1604+
}
1605+
1606+
mod := app.engine.GetApp().GetModule(moduleName)
1607+
if mod == nil {
1608+
http.Error(w, fmt.Sprintf(`{"error":"module %q not found"}`, moduleName), http.StatusNotFound)
1609+
return
1610+
}
1611+
1612+
reconf, ok := mod.(interfaces.Reconfigurable)
1613+
if !ok {
1614+
http.Error(w, fmt.Sprintf(`{"error":"module %q does not support runtime reconfiguration"}`, moduleName), http.StatusNotImplemented)
1615+
return
1616+
}
1617+
1618+
if err := reconf.Reconfigure(r.Context(), newConfig); err != nil {
1619+
http.Error(w, fmt.Sprintf(`{"error":"reconfiguration failed: %v"}`, err), http.StatusInternalServerError)
1620+
return
1621+
}
1622+
1623+
w.Header().Set("Content-Type", "application/json")
1624+
resp, _ := json.Marshal(map[string]string{"status": "ok", "module": moduleName})
1625+
w.Write(resp) //nolint:errcheck
1626+
})
1627+
15161628
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
15171629
w.Header().Set("Content-Type", "application/json")
15181630
_, _ = w.Write([]byte(`{"mode":"multi-workflow","status":"ok"}`))

config/diff.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package config
2+
3+
import (
4+
"crypto/sha256"
5+
"encoding/hex"
6+
"fmt"
7+
8+
"gopkg.in/yaml.v3"
9+
)
10+
11+
// ModuleConfigDiff represents what changed between two configs.
12+
type ModuleConfigDiff struct {
13+
Added []ModuleConfig // modules in new but not old
14+
Removed []ModuleConfig // modules in old but not new
15+
Modified []ModuleConfigChange // modules present in both with different config
16+
Unchanged []string // module names with no config change
17+
}
18+
19+
// ModuleConfigChange represents a change to a single module's config.
20+
type ModuleConfigChange struct {
21+
Name string
22+
OldConfig map[string]any
23+
NewConfig map[string]any
24+
}
25+
26+
// DiffModuleConfigs compares two configs and identifies module-level changes.
27+
func DiffModuleConfigs(old, new *WorkflowConfig) *ModuleConfigDiff {
28+
diff := &ModuleConfigDiff{}
29+
30+
oldMap := make(map[string]ModuleConfig)
31+
for _, m := range old.Modules {
32+
oldMap[m.Name] = m
33+
}
34+
newMap := make(map[string]ModuleConfig)
35+
for _, m := range new.Modules {
36+
newMap[m.Name] = m
37+
}
38+
39+
for name, newMod := range newMap {
40+
oldMod, exists := oldMap[name]
41+
if !exists {
42+
diff.Added = append(diff.Added, newMod)
43+
continue
44+
}
45+
if hashModuleConfig(oldMod) != hashModuleConfig(newMod) {
46+
diff.Modified = append(diff.Modified, ModuleConfigChange{
47+
Name: name,
48+
OldConfig: oldMod.Config,
49+
NewConfig: newMod.Config,
50+
})
51+
} else {
52+
diff.Unchanged = append(diff.Unchanged, name)
53+
}
54+
}
55+
56+
for name, oldMod := range oldMap {
57+
if _, exists := newMap[name]; !exists {
58+
diff.Removed = append(diff.Removed, oldMod)
59+
}
60+
}
61+
62+
return diff
63+
}
64+
65+
// HasNonModuleChanges returns true if workflows, triggers, pipelines, or
66+
// platform config changed between old and new (requiring full reload).
67+
func HasNonModuleChanges(old, new *WorkflowConfig) bool {
68+
return hashAny(old.Workflows) != hashAny(new.Workflows) ||
69+
hashAny(old.Triggers) != hashAny(new.Triggers) ||
70+
hashAny(old.Pipelines) != hashAny(new.Pipelines) ||
71+
hashAny(old.Platform) != hashAny(new.Platform)
72+
}
73+
74+
func hashModuleConfig(m ModuleConfig) string {
75+
data, err := yaml.Marshal(m)
76+
if err != nil {
77+
return fmt.Sprintf("error:%v", err)
78+
}
79+
sum := sha256.Sum256(data)
80+
return hex.EncodeToString(sum[:])
81+
}
82+
83+
func hashAny(v any) string {
84+
if v == nil {
85+
return "nil"
86+
}
87+
data, err := yaml.Marshal(v)
88+
if err != nil {
89+
return fmt.Sprintf("error:%v", err)
90+
}
91+
sum := sha256.Sum256(data)
92+
return hex.EncodeToString(sum[:])
93+
}

0 commit comments

Comments
 (0)