Skip to content

Commit f73613d

Browse files
intel352claude
andcommitted
fix: address review comments on dynamic config system
- HasNonModuleChanges now also compares Requires (plugin/capability changes) - DatabaseSource.Hash() fallback bypasses cache to ensure accurate change detection - DatabasePoller.Stop() and ConfigWatcher.Stop() are idempotent via sync.Once - ConfigWatcher handles fsnotify.Rename events for atomic-save editor compatibility - ConfigReloader falls back to full reload when reconfigurer is nil but modules changed - Add SetReconfigurer() for updating the reconfigurer after a full engine reload - serverApp implements ModuleReconfigurer as stable adapter (survives engine reloads) - Module reconfigure endpoint uses proper JSON encoding, 1MiB body limit, and RequireAuth - Add tests for nil reconfigurer fallback and SetReconfigurer Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent dfe5a3c commit f73613d

7 files changed

Lines changed: 159 additions & 28 deletions

File tree

cmd/server/main.go

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,12 @@ type serverApp struct {
347347
currentConfig *config.WorkflowConfig // last loaded config, used by dynamic config watcher
348348
}
349349

350+
// ReconfigureModules delegates to the current engine, ensuring the reloader
351+
// always targets the active engine even after a full reload replaces it.
352+
func (app *serverApp) ReconfigureModules(ctx context.Context, changes []config.ModuleConfigChange) ([]string, error) {
353+
return app.engine.ReconfigureModules(ctx, changes)
354+
}
355+
350356
// setup initializes all server components: engine, AI services, and HTTP mux.
351357
func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error) {
352358
app := &serverApp{
@@ -1232,9 +1238,9 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error {
12321238

12331239
var reloaderErr error
12341240
reloader, reloaderErr = config.NewConfigReloader(
1235-
app.currentConfig, // the loaded WorkflowConfig
1236-
app.reloadEngine, // existing full reload function
1237-
app.engine, // implements ModuleReconfigurer
1241+
app.currentConfig, // the loaded WorkflowConfig
1242+
app.reloadEngine, // existing full reload function
1243+
app, // stable adapter — delegates to current app.engine
12381244
app.logger,
12391245
)
12401246
if reloaderErr != nil {
@@ -1269,7 +1275,7 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error {
12691275
// Reuse or create a reloader for the DB poller.
12701276
if reloader == nil {
12711277
var reloaderErr error
1272-
reloader, reloaderErr = config.NewConfigReloader(app.currentConfig, app.reloadEngine, app.engine, app.logger)
1278+
reloader, reloaderErr = config.NewConfigReloader(app.currentConfig, app.reloadEngine, app, app.logger)
12731279
if reloaderErr != nil {
12741280
app.logger.Error("Failed to create config reloader for DB poller", "error", reloaderErr)
12751281
}
@@ -1590,40 +1596,56 @@ func runMultiWorkflow(logger *slog.Logger) error {
15901596

15911597
// Module reconfiguration endpoint — allows runtime hot-reload of individual
15921598
// modules that implement interfaces.Reconfigurable without a full engine restart.
1593-
mux.HandleFunc("PUT /api/v1/modules/{name}/config", func(w http.ResponseWriter, r *http.Request) {
1599+
// Wrapped with the same RequireAuth middleware used by the API router.
1600+
reconfigPerms := apihandler.NewPermissionService(stores.Memberships, stores.Workflows, stores.Projects)
1601+
reconfigMw := apihandler.NewMiddleware([]byte(secret), stores.Users, reconfigPerms)
1602+
mux.Handle("PUT /api/v1/modules/{name}/config", reconfigMw.RequireAuth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
15941603
moduleName := r.PathValue("name")
15951604
if moduleName == "" {
1596-
http.Error(w, `{"error":"module name required"}`, http.StatusBadRequest)
1605+
w.Header().Set("Content-Type", "application/json")
1606+
w.WriteHeader(http.StatusBadRequest)
1607+
_ = json.NewEncoder(w).Encode(map[string]string{"error": "module name required"})
15971608
return
15981609
}
15991610

1611+
const maxConfigBytes = 1 << 20 // 1 MiB
1612+
limitedBody := http.MaxBytesReader(w, r.Body, maxConfigBytes)
1613+
defer limitedBody.Close() //nolint:errcheck
1614+
16001615
var newConfig map[string]any
1601-
if err := json.NewDecoder(r.Body).Decode(&newConfig); err != nil {
1602-
http.Error(w, fmt.Sprintf(`{"error":"invalid JSON: %v"}`, err), http.StatusBadRequest)
1616+
if err := json.NewDecoder(limitedBody).Decode(&newConfig); err != nil {
1617+
w.Header().Set("Content-Type", "application/json")
1618+
w.WriteHeader(http.StatusBadRequest)
1619+
_ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("invalid JSON: %v", err)})
16031620
return
16041621
}
16051622

16061623
mod := app.engine.GetApp().GetModule(moduleName)
16071624
if mod == nil {
1608-
http.Error(w, fmt.Sprintf(`{"error":"module %q not found"}`, moduleName), http.StatusNotFound)
1625+
w.Header().Set("Content-Type", "application/json")
1626+
w.WriteHeader(http.StatusNotFound)
1627+
_ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("module %q not found", moduleName)})
16091628
return
16101629
}
16111630

16121631
reconf, ok := mod.(interfaces.Reconfigurable)
16131632
if !ok {
1614-
http.Error(w, fmt.Sprintf(`{"error":"module %q does not support runtime reconfiguration"}`, moduleName), http.StatusNotImplemented)
1633+
w.Header().Set("Content-Type", "application/json")
1634+
w.WriteHeader(http.StatusNotImplemented)
1635+
_ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("module %q does not support runtime reconfiguration", moduleName)})
16151636
return
16161637
}
16171638

16181639
if err := reconf.Reconfigure(r.Context(), newConfig); err != nil {
1619-
http.Error(w, fmt.Sprintf(`{"error":"reconfiguration failed: %v"}`, err), http.StatusInternalServerError)
1640+
w.Header().Set("Content-Type", "application/json")
1641+
w.WriteHeader(http.StatusInternalServerError)
1642+
_ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("reconfiguration failed: %v", err)})
16201643
return
16211644
}
16221645

16231646
w.Header().Set("Content-Type", "application/json")
1624-
resp, _ := json.Marshal(map[string]string{"status": "ok", "module": moduleName})
1625-
w.Write(resp) //nolint:errcheck
1626-
})
1647+
_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok", "module": moduleName})
1648+
})))
16271649

16281650
mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
16291651
w.Header().Set("Content-Type", "application/json")

config/diff.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,15 @@ func DiffModuleConfigs(old, new *WorkflowConfig) *ModuleConfigDiff {
6262
return diff
6363
}
6464

65-
// HasNonModuleChanges returns true if workflows, triggers, pipelines, or
66-
// platform config changed between old and new (requiring full reload).
65+
// HasNonModuleChanges returns true if workflows, triggers, pipelines,
66+
// platform config, or requirements changed between old and new
67+
// (requiring full reload).
6768
func HasNonModuleChanges(old, new *WorkflowConfig) bool {
6869
return hashAny(old.Workflows) != hashAny(new.Workflows) ||
6970
hashAny(old.Triggers) != hashAny(new.Triggers) ||
7071
hashAny(old.Pipelines) != hashAny(new.Pipelines) ||
71-
hashAny(old.Platform) != hashAny(new.Platform)
72+
hashAny(old.Platform) != hashAny(new.Platform) ||
73+
hashAny(old.Requires) != hashAny(new.Requires)
7274
}
7375

7476
func hashModuleConfig(m ModuleConfig) string {

config/reloader.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ func NewConfigReloader(
5252
}, nil
5353
}
5454

55+
// SetReconfigurer updates the ModuleReconfigurer used for partial (per-module)
56+
// reloads. This should be called after a successful full engine reload if the
57+
// underlying engine (and its reconfigurer) has changed.
58+
func (r *ConfigReloader) SetReconfigurer(reconfigurer ModuleReconfigurer) {
59+
r.mu.Lock()
60+
defer r.mu.Unlock()
61+
r.reconfigurer = reconfigurer
62+
}
63+
5564
// HandleChange processes a config change event. It diffs the old and new configs,
5665
// attempts per-module reconfiguration for module-only changes, and falls back
5766
// to a full reload when necessary.
@@ -75,7 +84,19 @@ func (r *ConfigReloader) HandleChange(evt ConfigChangeEvent) error {
7584
}
7685

7786
// Only module config changes — try partial reconfiguration.
78-
if len(diff.Modified) > 0 && r.reconfigurer != nil {
87+
if len(diff.Modified) > 0 {
88+
if r.reconfigurer == nil {
89+
// No reconfigurer available — fall back to full reload.
90+
r.logger.Info("module changes detected but no reconfigurer, performing full reload",
91+
"modified", len(diff.Modified))
92+
if err := r.fullReloadFn(evt.Config); err != nil {
93+
return err
94+
}
95+
r.current = evt.Config
96+
r.currentHash = evt.NewHash
97+
return nil
98+
}
99+
79100
failed, err := r.reconfigurer.ReconfigureModules(context.Background(), diff.Modified)
80101
if err != nil {
81102
return err

config/reloader_test.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,80 @@ func TestConfigReloader_FallbackToFull(t *testing.T) {
237237
}
238238
}
239239

240+
func TestConfigReloader_NilReconfigurer_FallsBackToFullReload(t *testing.T) {
241+
initial := makeWorkflowConfig(
242+
[]ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 60}}},
243+
nil,
244+
)
245+
246+
var fullReloadCalled int
247+
fullFn := func(cfg *WorkflowConfig) error {
248+
fullReloadCalled++
249+
return nil
250+
}
251+
252+
// nil reconfigurer — module changes should still trigger full reload.
253+
r := newTestReloader(t, initial, fullFn, nil)
254+
255+
newCfg := makeWorkflowConfig(
256+
[]ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 120}}},
257+
nil,
258+
)
259+
evt, err := makeChangeEvent(newCfg)
260+
if err != nil {
261+
t.Fatalf("makeChangeEvent: %v", err)
262+
}
263+
264+
if err := r.HandleChange(evt); err != nil {
265+
t.Fatalf("HandleChange: %v", err)
266+
}
267+
268+
if fullReloadCalled != 1 {
269+
t.Errorf("expected 1 full reload when reconfigurer is nil, got %d", fullReloadCalled)
270+
}
271+
}
272+
273+
func TestConfigReloader_SetReconfigurer(t *testing.T) {
274+
initial := makeWorkflowConfig(
275+
[]ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 60}}},
276+
nil,
277+
)
278+
279+
var fullReloadCalled int
280+
fullFn := func(cfg *WorkflowConfig) error {
281+
fullReloadCalled++
282+
return nil
283+
}
284+
285+
// Start with nil reconfigurer.
286+
r := newTestReloader(t, initial, fullFn, nil)
287+
288+
// Set a reconfigurer dynamically.
289+
rec := &mockReconfigurer{}
290+
r.SetReconfigurer(rec)
291+
292+
newCfg := makeWorkflowConfig(
293+
[]ModuleConfig{{Name: "alpha", Type: "cache", Config: map[string]any{"ttl": 120}}},
294+
nil,
295+
)
296+
evt, err := makeChangeEvent(newCfg)
297+
if err != nil {
298+
t.Fatalf("makeChangeEvent: %v", err)
299+
}
300+
301+
if err := r.HandleChange(evt); err != nil {
302+
t.Fatalf("HandleChange: %v", err)
303+
}
304+
305+
// Should use partial reconfigure, not full reload.
306+
if fullReloadCalled != 0 {
307+
t.Errorf("expected 0 full reloads after SetReconfigurer, got %d", fullReloadCalled)
308+
}
309+
if len(rec.called) != 1 {
310+
t.Errorf("expected ReconfigureModules called once, got %d", len(rec.called))
311+
}
312+
}
313+
240314
func TestConfigReloader_NoEffectiveChanges(t *testing.T) {
241315
initial := makeWorkflowConfig(
242316
[]ModuleConfig{{Name: "alpha", Type: "http.server", Config: map[string]any{"port": 8080}}},

config/source_db.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,16 @@ func (s *DatabaseSource) refresh(ctx context.Context) (*WorkflowConfig, error) {
9898

9999
// Hash returns the SHA256 hex digest of the stored config bytes. It first
100100
// tries the fast path of fetching the pre-computed hash from the database,
101-
// and falls back to loading the full document if that fails.
101+
// and falls back to loading the full document if that fails. The fallback
102+
// always fetches fresh data to ensure change detection is accurate.
102103
func (s *DatabaseSource) Hash(ctx context.Context) (string, error) {
103104
hash, err := s.store.GetConfigDocumentHash(ctx, s.key)
104105
if err == nil {
105106
return hash, nil
106107
}
107-
// Fallback: load and hash.
108-
if _, loadErr := s.Load(ctx); loadErr != nil {
109-
return "", loadErr
108+
// Fallback: force a fresh load (bypass cache) to get an accurate hash.
109+
if _, refreshErr := s.refresh(ctx); refreshErr != nil {
110+
return "", refreshErr
110111
}
111112
s.mu.RLock()
112113
defer s.mu.RUnlock()

config/source_db_poller.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ type DatabasePoller struct {
1616
logger *slog.Logger
1717
lastHash string
1818

19-
done chan struct{}
20-
wg sync.WaitGroup
19+
done chan struct{}
20+
stopOnce sync.Once
21+
wg sync.WaitGroup
2122
}
2223

2324
// NewDatabasePoller creates a DatabasePoller that calls onChange whenever the
@@ -46,8 +47,9 @@ func (p *DatabasePoller) Start(ctx context.Context) error {
4647
}
4748

4849
// Stop signals the polling goroutine to exit and waits for it to finish.
50+
// It is safe to call Stop multiple times.
4951
func (p *DatabasePoller) Stop() {
50-
close(p.done)
52+
p.stopOnce.Do(func() { close(p.done) })
5153
p.wg.Wait()
5254
}
5355

config/watcher.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type ConfigWatcher struct {
3535

3636
fsWatcher *fsnotify.Watcher
3737
done chan struct{}
38+
stopOnce sync.Once
3839
wg sync.WaitGroup
3940
lastHash string
4041

@@ -87,8 +88,9 @@ func (w *ConfigWatcher) Start() error {
8788
}
8889

8990
// Stop terminates the watcher and waits for the background goroutine to exit.
91+
// It is safe to call Stop multiple times.
9092
func (w *ConfigWatcher) Stop() error {
91-
close(w.done)
93+
w.stopOnce.Do(func() { close(w.done) })
9294
w.wg.Wait()
9395
if w.fsWatcher != nil {
9496
return w.fsWatcher.Close()
@@ -114,9 +116,16 @@ func (w *ConfigWatcher) loop() {
114116
if !isYAMLFile(event.Name) {
115117
continue
116118
}
117-
if event.Op&(fsnotify.Write|fsnotify.Create) != 0 {
119+
if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename) != 0 {
120+
// Also handle Rename for atomic-save editors that rename-over
121+
// the config file. On a Rename we enqueue the target config path
122+
// rather than the renamed-away path so processChange still matches.
123+
name := event.Name
124+
if event.Op&fsnotify.Rename != 0 {
125+
name = w.source.Path()
126+
}
118127
w.mu.Lock()
119-
w.pending[event.Name] = time.Now()
128+
w.pending[name] = time.Now()
120129
w.mu.Unlock()
121130
}
122131

0 commit comments

Comments
 (0)