Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 137 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ var (
// Deprecated: admin UI is now served by the external workflow-plugin-admin binary.
// This flag is accepted for backwards compatibility but has no effect.
_ = flag.String("admin-ui-dir", "", "Deprecated: admin UI is now served by the external workflow-plugin-admin binary")

watchConfig = flag.Bool("watch", false, "Watch config file for changes and auto-reload")
)

// defaultEnginePlugins returns the standard set of engine plugins used by all engine instances.
Expand Down Expand Up @@ -342,12 +344,20 @@ type serverApp struct {
stores storeComponents
mgmt mgmtComponents
services serviceComponents
currentConfig *config.WorkflowConfig // last loaded config, used by dynamic config watcher
}

// ReconfigureModules delegates to the current engine, ensuring the reloader
// always targets the active engine even after a full reload replaces it.
func (app *serverApp) ReconfigureModules(ctx context.Context, changes []config.ModuleConfigChange) ([]string, error) {
return app.engine.ReconfigureModules(ctx, changes)
}

// setup initializes all server components: engine, AI services, and HTTP mux.
func setup(logger *slog.Logger, cfg *config.WorkflowConfig) (*serverApp, error) {
app := &serverApp{
logger: logger,
logger: logger,
currentConfig: cfg,
}

engine, loader, registry, err := buildEngine(cfg, logger)
Expand Down Expand Up @@ -400,8 +410,9 @@ func setupFromAppConfig(logger *slog.Logger, appCfg *config.ApplicationConfig) (
}

sApp := &serverApp{
engine: engine,
logger: logger,
engine: engine,
logger: logger,
currentConfig: combined,
}

pool := dynamic.NewInterpreterPool()
Expand Down Expand Up @@ -1220,6 +1231,74 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error {
}
}

// Config file watcher — started after the engine and all post-start hooks are up.
var reloader *config.ConfigReloader
if *watchConfig && *configFile != "" {
fileSource := config.NewFileSource(*configFile)

var reloaderErr error
reloader, reloaderErr = config.NewConfigReloader(
app.currentConfig, // the loaded WorkflowConfig
app.reloadEngine, // existing full reload function
app, // stable adapter — delegates to current app.engine
app.logger,
)
if reloaderErr != nil {
app.logger.Error("Failed to create config reloader", "error", reloaderErr)
} else {
configWatcher := config.NewConfigWatcher(fileSource, func(evt config.ConfigChangeEvent) {
app.logger.Info("Config file changed, reloading",
"source", evt.Source,
"old_hash", evt.OldHash[:8],
"new_hash", evt.NewHash[:8])
if err := reloader.HandleChange(evt); err != nil {
app.logger.Error("Config reload failed", "error", err)
}
}, config.WithWatchLogger(app.logger))

if err := configWatcher.Start(); err != nil {
app.logger.Error("Failed to start config watcher", "error", err)
} else {
defer func() { _ = configWatcher.Stop() }()
app.logger.Info("Config file watcher started", "file", *configFile)
}
}
}

// Database config poller — activated when a PG store is available.
if app.pgStore != nil {
dbSource := config.NewDatabaseSource(
app.pgStore.ConfigDocs(),
config.WithRefreshInterval(30*time.Second),
)

// Reuse or create a reloader for the DB poller.
if reloader == nil {
var reloaderErr error
reloader, reloaderErr = config.NewConfigReloader(app.currentConfig, app.reloadEngine, app, app.logger)
if reloaderErr != nil {
app.logger.Error("Failed to create config reloader for DB poller", "error", reloaderErr)
}
}

if reloader != nil {
poller := config.NewDatabasePoller(dbSource, 30*time.Second, func(evt config.ConfigChangeEvent) {
app.logger.Info("Database config changed, reloading",
"source", evt.Source)
if err := reloader.HandleChange(evt); err != nil {
app.logger.Error("DB config reload failed", "error", err)
}
}, app.logger)

if err := poller.Start(ctx); err != nil {
app.logger.Error("DB config poller start failed", "error", err)
} else {
defer poller.Stop()
app.logger.Info("Database config poller started", "interval", "30s")
}
}
}

// Wait for context cancellation
<-ctx.Done()

Expand Down Expand Up @@ -1317,6 +1396,7 @@ func applyEnvOverrides() {
"load-workflows": "WORKFLOW_LOAD_WORKFLOWS",
"import-bundle": "WORKFLOW_IMPORT_BUNDLE",
"license-key": "WORKFLOW_LICENSE_KEY",
"watch": "WORKFLOW_WATCH",
}

// Track which flags were explicitly set on the command line.
Expand Down Expand Up @@ -1513,6 +1593,60 @@ func runMultiWorkflow(logger *slog.Logger) error {
// 8. Mount API router on the same HTTP mux
mux := http.NewServeMux()
mux.Handle("/api/v1/", apiRouter)

// Module reconfiguration endpoint — allows runtime hot-reload of individual
// modules that implement interfaces.Reconfigurable without a full engine restart.
// Wrapped with the same RequireAuth middleware used by the API router.
reconfigPerms := apihandler.NewPermissionService(stores.Memberships, stores.Workflows, stores.Projects)
reconfigMw := apihandler.NewMiddleware([]byte(secret), stores.Users, reconfigPerms)
mux.Handle("PUT /api/v1/modules/{name}/config", reconfigMw.RequireAuth(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
moduleName := r.PathValue("name")
if moduleName == "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(map[string]string{"error": "module name required"})
return
}

const maxConfigBytes = 1 << 20 // 1 MiB
limitedBody := http.MaxBytesReader(w, r.Body, maxConfigBytes)
defer limitedBody.Close() //nolint:errcheck

var newConfig map[string]any
if err := json.NewDecoder(limitedBody).Decode(&newConfig); err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadRequest)
_ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("invalid JSON: %v", err)})
return
}

mod := app.engine.GetApp().GetModule(moduleName)
if mod == nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNotFound)
_ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("module %q not found", moduleName)})
return
}

reconf, ok := mod.(interfaces.Reconfigurable)
if !ok {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNotImplemented)
_ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("module %q does not support runtime reconfiguration", moduleName)})
return
}

if err := reconf.Reconfigure(r.Context(), newConfig); err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{"error": fmt.Sprintf("reconfiguration failed: %v", err)})
return
}

w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(map[string]string{"status": "ok", "module": moduleName})
})))

mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"mode":"multi-workflow","status":"ok"}`))
Expand Down
95 changes: 95 additions & 0 deletions config/diff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package config

import (
"crypto/sha256"
"encoding/hex"
"fmt"

"gopkg.in/yaml.v3"
)

// ModuleConfigDiff represents what changed between two configs.
type ModuleConfigDiff struct {
Added []ModuleConfig // modules in new but not old
Removed []ModuleConfig // modules in old but not new
Modified []ModuleConfigChange // modules present in both with different config
Unchanged []string // module names with no config change
}

// ModuleConfigChange represents a change to a single module's config.
type ModuleConfigChange struct {
Name string
OldConfig map[string]any
NewConfig map[string]any
}

// DiffModuleConfigs compares two configs and identifies module-level changes.
func DiffModuleConfigs(old, new *WorkflowConfig) *ModuleConfigDiff {
diff := &ModuleConfigDiff{}

oldMap := make(map[string]ModuleConfig)
for _, m := range old.Modules {
oldMap[m.Name] = m
}
newMap := make(map[string]ModuleConfig)
for _, m := range new.Modules {
newMap[m.Name] = m
}

for name, newMod := range newMap {
oldMod, exists := oldMap[name]
if !exists {
diff.Added = append(diff.Added, newMod)
continue
}
if hashModuleConfig(oldMod) != hashModuleConfig(newMod) {
diff.Modified = append(diff.Modified, ModuleConfigChange{
Name: name,
OldConfig: oldMod.Config,
NewConfig: newMod.Config,
})
} else {
diff.Unchanged = append(diff.Unchanged, name)
}
}

for name, oldMod := range oldMap {
if _, exists := newMap[name]; !exists {
diff.Removed = append(diff.Removed, oldMod)
}
}

return diff
}

// HasNonModuleChanges returns true if workflows, triggers, pipelines,
// platform config, or requirements changed between old and new
// (requiring full reload).
func HasNonModuleChanges(old, new *WorkflowConfig) bool {
return hashAny(old.Workflows) != hashAny(new.Workflows) ||
hashAny(old.Triggers) != hashAny(new.Triggers) ||
hashAny(old.Pipelines) != hashAny(new.Pipelines) ||
hashAny(old.Platform) != hashAny(new.Platform) ||
hashAny(old.Requires) != hashAny(new.Requires)
}

func hashModuleConfig(m ModuleConfig) string {
data, err := yaml.Marshal(m)
if err != nil {
return fmt.Sprintf("error:%v", err)
}
sum := sha256.Sum256(data)
return hex.EncodeToString(sum[:])
}

func hashAny(v any) string {
if v == nil {
return "nil"
}
data, err := yaml.Marshal(v)
if err != nil {
return fmt.Sprintf("error:%v", err)
}
sum := sha256.Sum256(data)
return hex.EncodeToString(sum[:])
}
Loading
Loading