From f14b9fd6a2ce01a13207f97d5611ff5a32a7cf6a Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 23 Feb 2026 05:39:03 -0500 Subject: [PATCH 1/2] feat: wire multi-workflow mode into main server (#113) When -database-dsn is provided, the server starts in multi-workflow mode: - Connects to PostgreSQL and runs migrations - Creates WorkflowEngineManager for managing concurrent engines - Mounts the REST API at /api/v1/ on the same HTTP server - Bootstraps admin user when -admin-email and -admin-password are set - -config flag becomes a seed for the initial workflow - Without -database-dsn, single-config mode is preserved unchanged Closes #113 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- cmd/server/main.go | 290 ++++++++++++++++++++++++++++++--- module/command_handler_test.go | 1 - module/query_handler_test.go | 1 - plugins/api/plugin.go | 14 +- 4 files changed, 274 insertions(+), 32 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index aaf59157..f68116a0 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -14,6 +14,7 @@ import ( "path/filepath" "strings" "syscall" + "time" "github.com/CrisisTextLine/modular" "github.com/GoCodeAlone/workflow" @@ -21,6 +22,7 @@ import ( "github.com/GoCodeAlone/workflow/ai" copilotai "github.com/GoCodeAlone/workflow/ai/copilot" "github.com/GoCodeAlone/workflow/ai/llm" + workflowapi "github.com/GoCodeAlone/workflow/api" "github.com/GoCodeAlone/workflow/audit" "github.com/GoCodeAlone/workflow/billing" "github.com/GoCodeAlone/workflow/bundle" @@ -62,6 +64,7 @@ import ( "github.com/GoCodeAlone/workflow/schema" evstore "github.com/GoCodeAlone/workflow/store" "github.com/google/uuid" + "golang.org/x/crypto/bcrypt" _ "modernc.org/sqlite" ) @@ -290,6 +293,7 @@ type serviceComponents struct { type serverApp struct { engine *workflow.StdEngine engineManager *workflow.WorkflowEngineManager + pgStore *evstore.PGStore // multi-workflow mode PG connection logger *slog.Logger cleanupDirs []string // temp directories to clean up on shutdown cleanupFiles []string // temp files to clean up on shutdown @@ -1163,6 +1167,11 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error { } } + // Close PG store (multi-workflow mode) + if app.pgStore != nil { + app.pgStore.Close() + } + // Clean up temp files and directories for _, f := range app.cleanupFiles { os.Remove(f) //nolint:gosec // G703: cleaning up server-managed temp files @@ -1240,32 +1249,14 @@ func main() { })) if *databaseDSN != "" { - // Multi-workflow mode + // Multi-workflow mode: PG-backed engine manager with REST API. logger.Info("Starting in multi-workflow mode") - // TODO: Once the api package is implemented, this section will: - // 1. Connect to PostgreSQL using *databaseDSN - // 2. Run database migrations - // 3. Create store instances (UserStore, CompanyStore, ProjectStore, WorkflowStore, etc.) - // 4. Bootstrap admin user if *adminEmail and *adminPassword are set (first-run) - // 5. Create WorkflowEngineManager with stores - // 6. Create api.NewRouter() with stores, *jwtSecret, and engine manager - // 7. Mount API router at /api/v1/ alongside existing routes - - // For now, log the configuration and fall through to single-config mode - logger.Info("Multi-workflow mode configured", - "database_dsn_set", *databaseDSN != "", - "jwt_secret_set", *jwtSecret != "", - "admin_email_set", *adminEmail != "", - ) - - // Suppress unused variable warnings until api package is ready - _ = databaseDSN - _ = jwtSecret - _ = adminEmail - _ = adminPassword - - logger.Warn("Multi-workflow mode requires the api package (not yet available); falling back to single-config mode") + if err := runMultiWorkflow(logger); err != nil { + log.Fatalf("Multi-workflow error: %v", err) + } + fmt.Println("Shutdown complete") + return } // Existing single-config behavior @@ -1301,6 +1292,257 @@ func main() { fmt.Println("Shutdown complete") } +// runMultiWorkflow implements multi-workflow mode: connects to PostgreSQL, +// runs migrations, creates an engine manager, mounts the REST API, and +// optionally seeds an initial workflow from -config. +func runMultiWorkflow(logger *slog.Logger) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 1. Connect to PostgreSQL + pgCfg := evstore.PGConfig{URL: *databaseDSN} + pg, err := evstore.NewPGStore(ctx, pgCfg) + if err != nil { + return fmt.Errorf("connect to postgres: %w", err) + } + defer pg.Close() + logger.Info("Connected to PostgreSQL") + + // 2. Run database migrations + migrator := evstore.NewMigrator(pg.Pool()) + if err := migrator.Migrate(ctx); err != nil { + return fmt.Errorf("run migrations: %w", err) + } + logger.Info("Database migrations applied") + + // 3. Bootstrap admin user if credentials provided + if *adminEmail != "" && *adminPassword != "" { + if err := bootstrapAdmin(ctx, pg.Users(), *adminEmail, *adminPassword, logger); err != nil { + return fmt.Errorf("bootstrap admin: %w", err) + } + } + + // 4. Create WorkflowEngineManager + engineBuilder := func(cfg *config.WorkflowConfig, l *slog.Logger) (*workflow.StdEngine, modular.Application, error) { + app := modular.NewStdApplication(nil, l) + engine := workflow.NewStdEngine(app, l) + plugins := []plugin.EnginePlugin{ + pluginlicense.New(), + pluginhttp.New(), + pluginobs.New(), + pluginmessaging.New(), + pluginsm.New(), + pluginauth.New(), + pluginstorage.New(), + pluginapi.New(), + pluginpipeline.New(), + plugincicd.New(), + pluginff.New(), + pluginsecrets.New(), + pluginmodcompat.New(), + pluginscheduler.New(), + pluginintegration.New(), + pluginai.New(), + pluginplatform.New(), + } + for _, p := range plugins { + if loadErr := engine.LoadPlugin(p); loadErr != nil { + return nil, nil, fmt.Errorf("load plugin %s: %w", p.Name(), loadErr) + } + } + if err := engine.BuildFromConfig(cfg); err != nil { + return nil, nil, fmt.Errorf("build from config: %w", err) + } + return engine, app, nil + } + + mgr := workflow.NewWorkflowEngineManager( + pg.Workflows(), + pg.CrossWorkflowLinks(), + logger, + engineBuilder, + ) + + // 5. Seed initial workflow from -config if provided + if *configFile != "" { + if err := seedWorkflow(ctx, pg, *configFile, logger); err != nil { + logger.Warn("Failed to seed workflow from config", "file", *configFile, "error", err) + } + } + + // 6. Create API router + secret := envOrFlag("JWT_SECRET", jwtSecret) + if secret == "" { + secret = "dev-secret-change-me" + logger.Warn("No JWT secret configured — using insecure default") + } + stores := workflowapi.Stores{ + Users: pg.Users(), + Sessions: pg.Sessions(), + Companies: pg.Companies(), + Projects: pg.Projects(), + Workflows: pg.Workflows(), + Memberships: pg.Memberships(), + Links: pg.CrossWorkflowLinks(), + Executions: pg.Executions(), + Logs: pg.Logs(), + Audit: pg.Audit(), + IAM: pg.IAM(), + } + apiCfg := workflowapi.Config{ + JWTSecret: secret, + JWTIssuer: "workflow-server", + AccessTTL: 15 * time.Minute, + RefreshTTL: 7 * 24 * time.Hour, + } + apiRouter := workflowapi.NewRouter(stores, apiCfg) + + // 7. Also set up single-config admin infrastructure + singleCfg, err := loadConfig(logger) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + app, err := setup(logger, singleCfg) + if err != nil { + return fmt.Errorf("setup: %w", err) + } + app.engineManager = mgr + app.pgStore = pg + + // 8. Mount API router on the same HTTP mux + mux := http.NewServeMux() + mux.Handle("/api/v1/", apiRouter) + mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"mode":"multi-workflow","status":"ok"}`)) + })) + + srv := &http.Server{ + Addr: *addr, + Handler: mux, + ReadHeaderTimeout: 10 * time.Second, + } + + // Start admin engine (background — handles admin UI on :8081) + if app.engine != nil { + if err := app.engine.Start(ctx); err != nil { + return fmt.Errorf("start admin engine: %w", err) + } + } + for _, fn := range app.postStartFuncs { + if err := fn(); err != nil { + logger.Warn("Post-start hook failed", "error", err) + } + } + + // Start API server + go func() { + logger.Info("Multi-workflow API listening", "addr", *addr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("API server error", "error", err) + } + }() + + fmt.Printf("Multi-workflow API on http://localhost%s/api/v1/\n", *addr) + fmt.Println("Admin UI on http://localhost:8081") + + // Wait for termination signal + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + <-sigCh + fmt.Println("Shutting down...") + cancel() + + // Graceful shutdown + shutdownCtx := context.Background() + if err := mgr.StopAll(shutdownCtx); err != nil { + logger.Error("Engine manager shutdown error", "error", err) + } + if err := srv.Shutdown(shutdownCtx); err != nil { + logger.Error("API server shutdown error", "error", err) + } + if app.engine != nil { + if err := app.engine.Stop(shutdownCtx); err != nil { + logger.Error("Admin engine shutdown error", "error", err) + } + } + + return nil +} + +// bootstrapAdmin creates an admin user if one doesn't already exist. +func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, password string, logger *slog.Logger) error { + existing, err := users.GetByEmail(ctx, email) + if err == nil && existing != nil { + logger.Info("Admin user already exists", "email", email) + return nil + } + + hash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) + if err != nil { + return fmt.Errorf("hash password: %w", err) + } + now := time.Now() + admin := &evstore.User{ + ID: uuid.New(), + Email: email, + PasswordHash: string(hash), + DisplayName: "Admin", + Active: true, + CreatedAt: now, + UpdatedAt: now, + } + if err := users.Create(ctx, admin); err != nil { + return fmt.Errorf("create admin user: %w", err) + } + logger.Info("Bootstrapped admin user", "email", email) + return nil +} + +// seedWorkflow imports a YAML config as the initial workflow into the database. +func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, logger *slog.Logger) error { + // Validate the config is loadable + if _, err := config.LoadFromFile(configPath); err != nil { + return fmt.Errorf("load config file: %w", err) + } + + yamlBytes, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("read config file: %w", err) + } + + name := filepath.Base(configPath) + name = strings.TrimSuffix(name, filepath.Ext(name)) + slug := strings.ToLower(strings.ReplaceAll(name, " ", "-")) + + // Check if a workflow with this slug already exists in any project + existing, _ := pg.Workflows().List(ctx, evstore.WorkflowFilter{}) + for _, wf := range existing { + if wf.Slug == slug { + logger.Info("Seed workflow already exists", "slug", slug) + return nil + } + } + + now := time.Now() + record := &evstore.WorkflowRecord{ + ID: uuid.New(), + Name: name, + Slug: slug, + Description: "Seeded from " + configPath, + ConfigYAML: string(yamlBytes), + Version: 1, + Status: evstore.WorkflowStatusDraft, + CreatedAt: now, + UpdatedAt: now, + } + if err := pg.Workflows().Create(ctx, record); err != nil { + return fmt.Errorf("create seed workflow: %w", err) + } + logger.Info("Seeded workflow from config", "name", name, "id", record.ID) + return nil +} + func initAIService(logger *slog.Logger, registry *dynamic.ComponentRegistry, pool *dynamic.InterpreterPool) (*ai.Service, *ai.DeployService) { svc := ai.NewService() diff --git a/module/command_handler_test.go b/module/command_handler_test.go index 7819aad4..14605036 100644 --- a/module/command_handler_test.go +++ b/module/command_handler_test.go @@ -294,4 +294,3 @@ func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/module/query_handler_test.go b/module/query_handler_test.go index a8243538..132ebab5 100644 --- a/module/query_handler_test.go +++ b/module/query_handler_test.go @@ -299,4 +299,3 @@ func TestQueryHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/plugins/api/plugin.go b/plugins/api/plugin.go index c3e1fa49..9ef86b44 100644 --- a/plugins/api/plugin.go +++ b/plugins/api/plugin.go @@ -100,12 +100,14 @@ func New() *Plugin { return &Plugin{ // Default constructors wrap the concrete module constructors, adapting // their return types to modular.Module via implicit interface satisfaction. - newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, - newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, - newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, - newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, - newWorkflowRegistry: func(name, storageBackend string) modular.Module { return module.NewWorkflowRegistry(name, storageBackend) }, - newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, + newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, + newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, + newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, + newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, + newWorkflowRegistry: func(name, storageBackend string) modular.Module { + return module.NewWorkflowRegistry(name, storageBackend) + }, + newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, newProcessingStep: func(name string, cfg module.ProcessingStepConfig) modular.Module { return module.NewProcessingStep(name, cfg) }, From 1c7591336a3538c172a6810392ae0661b4671292 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 09:31:37 -0500 Subject: [PATCH 2/2] fix: address multi-workflow mode review feedback (#138) * Initial plan * fix: address all review comments for multi-workflow mode - Extract defaultEnginePlugins() to eliminate plugin list duplication - Fix bootstrapAdmin to check ErrNotFound explicitly; return admin UUID - Fix seedWorkflow: handle List error, improve slug via slugify(), populate required WorkflowRecord fields (ProjectID, CreatedBy, UpdatedBy) using new ensureSystemProject() helper - Change JWT secret logger.Warn -> logger.Error - Use srvErrCh to propagate API server failures to initiate graceful shutdown - Fix URL display using net.SplitHostPort for non-localhost bind addresses - Fix misleading comment about admin infrastructure setup Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- cmd/server/main.go | 173 +++++++++++++++++++++++++++++++++------------ 1 file changed, 126 insertions(+), 47 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index f68116a0..d91c8e14 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -5,9 +5,11 @@ import ( "database/sql" "encoding/json" "flag" + "errors" "fmt" "log" "log/slog" + "net" "net/http" "os" "os/signal" @@ -93,14 +95,10 @@ var ( adminUIDir = flag.String("admin-ui-dir", "", "Path to admin UI static assets directory (overrides ADMIN_UI_DIR env var). Leave empty to use the path in admin/config.yaml") ) -// buildEngine creates the workflow engine with all handlers registered and built from config. -func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, error) { - app := modular.NewStdApplication(nil, logger) - engine := workflow.NewStdEngine(app, logger) - - // Load all engine plugins — each registers its module factories, step factories, - // trigger factories, and workflow handlers via engine.LoadPlugin. - plugins := []plugin.EnginePlugin{ +// defaultEnginePlugins returns the standard set of engine plugins used by all engine instances. +// Centralising the list here avoids duplication between buildEngine and runMultiWorkflow. +func defaultEnginePlugins() []plugin.EnginePlugin { + return []plugin.EnginePlugin{ pluginlicense.New(), pluginhttp.New(), pluginobs.New(), @@ -119,7 +117,16 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std pluginai.New(), pluginplatform.New(), } - for _, p := range plugins { +} + +// buildEngine creates the workflow engine with all handlers registered and built from config. +func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, error) { + app := modular.NewStdApplication(nil, logger) + engine := workflow.NewStdEngine(app, logger) + + // Load all engine plugins — each registers its module factories, step factories, + // trigger factories, and workflow handlers via engine.LoadPlugin. + for _, p := range defaultEnginePlugins() { if err := engine.LoadPlugin(p); err != nil { log.Fatalf("Failed to load plugin %s: %v", p.Name(), err) } @@ -1316,8 +1323,11 @@ func runMultiWorkflow(logger *slog.Logger) error { logger.Info("Database migrations applied") // 3. Bootstrap admin user if credentials provided + var adminUserID uuid.UUID if *adminEmail != "" && *adminPassword != "" { - if err := bootstrapAdmin(ctx, pg.Users(), *adminEmail, *adminPassword, logger); err != nil { + var err error + adminUserID, err = bootstrapAdmin(ctx, pg.Users(), *adminEmail, *adminPassword, logger) + if err != nil { return fmt.Errorf("bootstrap admin: %w", err) } } @@ -1326,26 +1336,7 @@ func runMultiWorkflow(logger *slog.Logger) error { engineBuilder := func(cfg *config.WorkflowConfig, l *slog.Logger) (*workflow.StdEngine, modular.Application, error) { app := modular.NewStdApplication(nil, l) engine := workflow.NewStdEngine(app, l) - plugins := []plugin.EnginePlugin{ - pluginlicense.New(), - pluginhttp.New(), - pluginobs.New(), - pluginmessaging.New(), - pluginsm.New(), - pluginauth.New(), - pluginstorage.New(), - pluginapi.New(), - pluginpipeline.New(), - plugincicd.New(), - pluginff.New(), - pluginsecrets.New(), - pluginmodcompat.New(), - pluginscheduler.New(), - pluginintegration.New(), - pluginai.New(), - pluginplatform.New(), - } - for _, p := range plugins { + for _, p := range defaultEnginePlugins() { if loadErr := engine.LoadPlugin(p); loadErr != nil { return nil, nil, fmt.Errorf("load plugin %s: %w", p.Name(), loadErr) } @@ -1365,7 +1356,9 @@ func runMultiWorkflow(logger *slog.Logger) error { // 5. Seed initial workflow from -config if provided if *configFile != "" { - if err := seedWorkflow(ctx, pg, *configFile, logger); err != nil { + if adminUserID == uuid.Nil { + logger.Warn("Skipping workflow seed: -admin-email and -admin-password are required for seeding") + } else if err := seedWorkflow(ctx, pg, *configFile, adminUserID, logger); err != nil { logger.Warn("Failed to seed workflow from config", "file", *configFile, "error", err) } } @@ -1374,7 +1367,7 @@ func runMultiWorkflow(logger *slog.Logger) error { secret := envOrFlag("JWT_SECRET", jwtSecret) if secret == "" { secret = "dev-secret-change-me" - logger.Warn("No JWT secret configured — using insecure default") + logger.Error("No JWT secret configured — using insecure default; set JWT_SECRET env var or -jwt-secret flag") } stores := workflowapi.Stores{ Users: pg.Users(), @@ -1397,7 +1390,7 @@ func runMultiWorkflow(logger *slog.Logger) error { } apiRouter := workflowapi.NewRouter(stores, apiCfg) - // 7. Also set up single-config admin infrastructure + // 7. Set up admin UI and management infrastructure for workflow management singleCfg, err := loadConfig(logger) if err != nil { return fmt.Errorf("load config: %w", err) @@ -1435,22 +1428,34 @@ func runMultiWorkflow(logger *slog.Logger) error { } } - // Start API server + // Start API server; propagate failures back so we can initiate shutdown. + srvErrCh := make(chan error, 1) go func() { logger.Info("Multi-workflow API listening", "addr", *addr) if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Error("API server error", "error", err) + srvErrCh <- err } }() - fmt.Printf("Multi-workflow API on http://localhost%s/api/v1/\n", *addr) + // Build display address: if the host part is empty or 0.0.0.0/::/[::], use "localhost". + displayAddr := *addr + if host, port, splitErr := net.SplitHostPort(*addr); splitErr == nil && + (host == "" || host == "0.0.0.0" || host == "::" || host == "[::]") { + displayAddr = ":" + port + } + fmt.Printf("Multi-workflow API on http://localhost%s/api/v1/\n", displayAddr) fmt.Println("Admin UI on http://localhost:8081") - // Wait for termination signal + // Wait for termination signal or server failure. sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh - fmt.Println("Shutting down...") + select { + case <-sigCh: + fmt.Println("Shutting down...") + case <-srvErrCh: + logger.Error("API server failed; initiating shutdown") + } cancel() // Graceful shutdown @@ -1471,16 +1476,20 @@ func runMultiWorkflow(logger *slog.Logger) error { } // bootstrapAdmin creates an admin user if one doesn't already exist. -func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, password string, logger *slog.Logger) error { +// It returns the admin user's UUID so callers can associate resources with them. +func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, password string, logger *slog.Logger) (uuid.UUID, error) { existing, err := users.GetByEmail(ctx, email) + if err != nil && !errors.Is(err, evstore.ErrNotFound) { + return uuid.Nil, fmt.Errorf("check existing admin: %w", err) + } if err == nil && existing != nil { logger.Info("Admin user already exists", "email", email) - return nil + return existing.ID, nil } hash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) if err != nil { - return fmt.Errorf("hash password: %w", err) + return uuid.Nil, fmt.Errorf("hash password: %w", err) } now := time.Now() admin := &evstore.User{ @@ -1493,14 +1502,73 @@ func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, passwor UpdatedAt: now, } if err := users.Create(ctx, admin); err != nil { - return fmt.Errorf("create admin user: %w", err) + return uuid.Nil, fmt.Errorf("create admin user: %w", err) } logger.Info("Bootstrapped admin user", "email", email) - return nil + return admin.ID, nil +} + +// slugify converts a string into a URL-friendly slug: lowercase, ASCII alphanumeric +// characters and hyphens only, with consecutive hyphens collapsed and leading/trailing +// hyphens trimmed. +func slugify(s string) string { + var b strings.Builder + for _, r := range strings.ToLower(s) { + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' { + b.WriteRune(r) + } else { + b.WriteRune('-') + } + } + result := b.String() + for strings.Contains(result, "--") { + result = strings.ReplaceAll(result, "--", "-") + } + return strings.Trim(result, "-") +} + +// ensureSystemProject finds or creates the "system" company and "default" project +// used to associate seed workflows with the required database entities. +func ensureSystemProject(ctx context.Context, pg *evstore.PGStore, ownerID uuid.UUID) (*evstore.Project, error) { + const companySlug = "system" + const projectSlug = "default" + + company, err := pg.Companies().GetBySlug(ctx, companySlug) + if errors.Is(err, evstore.ErrNotFound) { + company = &evstore.Company{Name: "System", Slug: companySlug, OwnerID: ownerID} + if createErr := pg.Companies().Create(ctx, company); createErr != nil { + if !errors.Is(createErr, evstore.ErrDuplicate) { + return nil, fmt.Errorf("create system company: %w", createErr) + } + // Another process created it concurrently; fetch it. + if company, err = pg.Companies().GetBySlug(ctx, companySlug); err != nil { + return nil, fmt.Errorf("get system company: %w", err) + } + } + } else if err != nil { + return nil, fmt.Errorf("get system company: %w", err) + } + + project, err := pg.Projects().GetBySlug(ctx, company.ID, projectSlug) + if errors.Is(err, evstore.ErrNotFound) { + project = &evstore.Project{CompanyID: company.ID, Name: "Default", Slug: projectSlug} + if createErr := pg.Projects().Create(ctx, project); createErr != nil { + if !errors.Is(createErr, evstore.ErrDuplicate) { + return nil, fmt.Errorf("create default project: %w", createErr) + } + if project, err = pg.Projects().GetBySlug(ctx, company.ID, projectSlug); err != nil { + return nil, fmt.Errorf("get default project: %w", err) + } + } + } else if err != nil { + return nil, fmt.Errorf("get default project: %w", err) + } + + return project, nil } // seedWorkflow imports a YAML config as the initial workflow into the database. -func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, logger *slog.Logger) error { +func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, adminUserID uuid.UUID, logger *slog.Logger) error { // Validate the config is loadable if _, err := config.LoadFromFile(configPath); err != nil { return fmt.Errorf("load config file: %w", err) @@ -1513,10 +1581,13 @@ func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, l name := filepath.Base(configPath) name = strings.TrimSuffix(name, filepath.Ext(name)) - slug := strings.ToLower(strings.ReplaceAll(name, " ", "-")) + slug := slugify(name) - // Check if a workflow with this slug already exists in any project - existing, _ := pg.Workflows().List(ctx, evstore.WorkflowFilter{}) + // Check if a workflow with this slug already exists in any project. + existing, err := pg.Workflows().List(ctx, evstore.WorkflowFilter{}) + if err != nil { + return fmt.Errorf("list existing workflows: %w", err) + } for _, wf := range existing { if wf.Slug == slug { logger.Info("Seed workflow already exists", "slug", slug) @@ -1524,15 +1595,23 @@ func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, l } } + project, err := ensureSystemProject(ctx, pg, adminUserID) + if err != nil { + return fmt.Errorf("ensure system project: %w", err) + } + now := time.Now() record := &evstore.WorkflowRecord{ ID: uuid.New(), + ProjectID: project.ID, Name: name, Slug: slug, Description: "Seeded from " + configPath, ConfigYAML: string(yamlBytes), Version: 1, Status: evstore.WorkflowStatusDraft, + CreatedBy: adminUserID, + UpdatedBy: adminUserID, CreatedAt: now, UpdatedAt: now, }