diff --git a/cmd/server/main.go b/cmd/server/main.go index aaf59157..d91c8e14 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -5,15 +5,18 @@ import ( "database/sql" "encoding/json" "flag" + "errors" "fmt" "log" "log/slog" + "net" "net/http" "os" "os/signal" "path/filepath" "strings" "syscall" + "time" "github.com/CrisisTextLine/modular" "github.com/GoCodeAlone/workflow" @@ -21,6 +24,7 @@ import ( "github.com/GoCodeAlone/workflow/ai" copilotai "github.com/GoCodeAlone/workflow/ai/copilot" "github.com/GoCodeAlone/workflow/ai/llm" + workflowapi "github.com/GoCodeAlone/workflow/api" "github.com/GoCodeAlone/workflow/audit" "github.com/GoCodeAlone/workflow/billing" "github.com/GoCodeAlone/workflow/bundle" @@ -62,6 +66,7 @@ import ( "github.com/GoCodeAlone/workflow/schema" evstore "github.com/GoCodeAlone/workflow/store" "github.com/google/uuid" + "golang.org/x/crypto/bcrypt" _ "modernc.org/sqlite" ) @@ -90,14 +95,10 @@ var ( adminUIDir = flag.String("admin-ui-dir", "", "Path to admin UI static assets directory (overrides ADMIN_UI_DIR env var). Leave empty to use the path in admin/config.yaml") ) -// buildEngine creates the workflow engine with all handlers registered and built from config. -func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, error) { - app := modular.NewStdApplication(nil, logger) - engine := workflow.NewStdEngine(app, logger) - - // Load all engine plugins — each registers its module factories, step factories, - // trigger factories, and workflow handlers via engine.LoadPlugin. - plugins := []plugin.EnginePlugin{ +// defaultEnginePlugins returns the standard set of engine plugins used by all engine instances. +// Centralising the list here avoids duplication between buildEngine and runMultiWorkflow. +func defaultEnginePlugins() []plugin.EnginePlugin { + return []plugin.EnginePlugin{ pluginlicense.New(), pluginhttp.New(), pluginobs.New(), @@ -116,7 +117,16 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std pluginai.New(), pluginplatform.New(), } - for _, p := range plugins { +} + +// buildEngine creates the workflow engine with all handlers registered and built from config. +func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, error) { + app := modular.NewStdApplication(nil, logger) + engine := workflow.NewStdEngine(app, logger) + + // Load all engine plugins — each registers its module factories, step factories, + // trigger factories, and workflow handlers via engine.LoadPlugin. + for _, p := range defaultEnginePlugins() { if err := engine.LoadPlugin(p); err != nil { log.Fatalf("Failed to load plugin %s: %v", p.Name(), err) } @@ -290,6 +300,7 @@ type serviceComponents struct { type serverApp struct { engine *workflow.StdEngine engineManager *workflow.WorkflowEngineManager + pgStore *evstore.PGStore // multi-workflow mode PG connection logger *slog.Logger cleanupDirs []string // temp directories to clean up on shutdown cleanupFiles []string // temp files to clean up on shutdown @@ -1163,6 +1174,11 @@ func run(ctx context.Context, app *serverApp, listenAddr string) error { } } + // Close PG store (multi-workflow mode) + if app.pgStore != nil { + app.pgStore.Close() + } + // Clean up temp files and directories for _, f := range app.cleanupFiles { os.Remove(f) //nolint:gosec // G703: cleaning up server-managed temp files @@ -1240,32 +1256,14 @@ func main() { })) if *databaseDSN != "" { - // Multi-workflow mode + // Multi-workflow mode: PG-backed engine manager with REST API. logger.Info("Starting in multi-workflow mode") - // TODO: Once the api package is implemented, this section will: - // 1. Connect to PostgreSQL using *databaseDSN - // 2. Run database migrations - // 3. Create store instances (UserStore, CompanyStore, ProjectStore, WorkflowStore, etc.) - // 4. Bootstrap admin user if *adminEmail and *adminPassword are set (first-run) - // 5. Create WorkflowEngineManager with stores - // 6. Create api.NewRouter() with stores, *jwtSecret, and engine manager - // 7. Mount API router at /api/v1/ alongside existing routes - - // For now, log the configuration and fall through to single-config mode - logger.Info("Multi-workflow mode configured", - "database_dsn_set", *databaseDSN != "", - "jwt_secret_set", *jwtSecret != "", - "admin_email_set", *adminEmail != "", - ) - - // Suppress unused variable warnings until api package is ready - _ = databaseDSN - _ = jwtSecret - _ = adminEmail - _ = adminPassword - - logger.Warn("Multi-workflow mode requires the api package (not yet available); falling back to single-config mode") + if err := runMultiWorkflow(logger); err != nil { + log.Fatalf("Multi-workflow error: %v", err) + } + fmt.Println("Shutdown complete") + return } // Existing single-config behavior @@ -1301,6 +1299,329 @@ func main() { fmt.Println("Shutdown complete") } +// runMultiWorkflow implements multi-workflow mode: connects to PostgreSQL, +// runs migrations, creates an engine manager, mounts the REST API, and +// optionally seeds an initial workflow from -config. +func runMultiWorkflow(logger *slog.Logger) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 1. Connect to PostgreSQL + pgCfg := evstore.PGConfig{URL: *databaseDSN} + pg, err := evstore.NewPGStore(ctx, pgCfg) + if err != nil { + return fmt.Errorf("connect to postgres: %w", err) + } + defer pg.Close() + logger.Info("Connected to PostgreSQL") + + // 2. Run database migrations + migrator := evstore.NewMigrator(pg.Pool()) + if err := migrator.Migrate(ctx); err != nil { + return fmt.Errorf("run migrations: %w", err) + } + logger.Info("Database migrations applied") + + // 3. Bootstrap admin user if credentials provided + var adminUserID uuid.UUID + if *adminEmail != "" && *adminPassword != "" { + var err error + adminUserID, err = bootstrapAdmin(ctx, pg.Users(), *adminEmail, *adminPassword, logger) + if err != nil { + return fmt.Errorf("bootstrap admin: %w", err) + } + } + + // 4. Create WorkflowEngineManager + engineBuilder := func(cfg *config.WorkflowConfig, l *slog.Logger) (*workflow.StdEngine, modular.Application, error) { + app := modular.NewStdApplication(nil, l) + engine := workflow.NewStdEngine(app, l) + for _, p := range defaultEnginePlugins() { + if loadErr := engine.LoadPlugin(p); loadErr != nil { + return nil, nil, fmt.Errorf("load plugin %s: %w", p.Name(), loadErr) + } + } + if err := engine.BuildFromConfig(cfg); err != nil { + return nil, nil, fmt.Errorf("build from config: %w", err) + } + return engine, app, nil + } + + mgr := workflow.NewWorkflowEngineManager( + pg.Workflows(), + pg.CrossWorkflowLinks(), + logger, + engineBuilder, + ) + + // 5. Seed initial workflow from -config if provided + if *configFile != "" { + if adminUserID == uuid.Nil { + logger.Warn("Skipping workflow seed: -admin-email and -admin-password are required for seeding") + } else if err := seedWorkflow(ctx, pg, *configFile, adminUserID, logger); err != nil { + logger.Warn("Failed to seed workflow from config", "file", *configFile, "error", err) + } + } + + // 6. Create API router + secret := envOrFlag("JWT_SECRET", jwtSecret) + if secret == "" { + secret = "dev-secret-change-me" + logger.Error("No JWT secret configured — using insecure default; set JWT_SECRET env var or -jwt-secret flag") + } + stores := workflowapi.Stores{ + Users: pg.Users(), + Sessions: pg.Sessions(), + Companies: pg.Companies(), + Projects: pg.Projects(), + Workflows: pg.Workflows(), + Memberships: pg.Memberships(), + Links: pg.CrossWorkflowLinks(), + Executions: pg.Executions(), + Logs: pg.Logs(), + Audit: pg.Audit(), + IAM: pg.IAM(), + } + apiCfg := workflowapi.Config{ + JWTSecret: secret, + JWTIssuer: "workflow-server", + AccessTTL: 15 * time.Minute, + RefreshTTL: 7 * 24 * time.Hour, + } + apiRouter := workflowapi.NewRouter(stores, apiCfg) + + // 7. Set up admin UI and management infrastructure for workflow management + singleCfg, err := loadConfig(logger) + if err != nil { + return fmt.Errorf("load config: %w", err) + } + app, err := setup(logger, singleCfg) + if err != nil { + return fmt.Errorf("setup: %w", err) + } + app.engineManager = mgr + app.pgStore = pg + + // 8. Mount API router on the same HTTP mux + mux := http.NewServeMux() + mux.Handle("/api/v1/", apiRouter) + mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write([]byte(`{"mode":"multi-workflow","status":"ok"}`)) + })) + + srv := &http.Server{ + Addr: *addr, + Handler: mux, + ReadHeaderTimeout: 10 * time.Second, + } + + // Start admin engine (background — handles admin UI on :8081) + if app.engine != nil { + if err := app.engine.Start(ctx); err != nil { + return fmt.Errorf("start admin engine: %w", err) + } + } + for _, fn := range app.postStartFuncs { + if err := fn(); err != nil { + logger.Warn("Post-start hook failed", "error", err) + } + } + + // Start API server; propagate failures back so we can initiate shutdown. + srvErrCh := make(chan error, 1) + go func() { + logger.Info("Multi-workflow API listening", "addr", *addr) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("API server error", "error", err) + srvErrCh <- err + } + }() + + // Build display address: if the host part is empty or 0.0.0.0/::/[::], use "localhost". + displayAddr := *addr + if host, port, splitErr := net.SplitHostPort(*addr); splitErr == nil && + (host == "" || host == "0.0.0.0" || host == "::" || host == "[::]") { + displayAddr = ":" + port + } + fmt.Printf("Multi-workflow API on http://localhost%s/api/v1/\n", displayAddr) + fmt.Println("Admin UI on http://localhost:8081") + + // Wait for termination signal or server failure. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + select { + case <-sigCh: + fmt.Println("Shutting down...") + case <-srvErrCh: + logger.Error("API server failed; initiating shutdown") + } + cancel() + + // Graceful shutdown + shutdownCtx := context.Background() + if err := mgr.StopAll(shutdownCtx); err != nil { + logger.Error("Engine manager shutdown error", "error", err) + } + if err := srv.Shutdown(shutdownCtx); err != nil { + logger.Error("API server shutdown error", "error", err) + } + if app.engine != nil { + if err := app.engine.Stop(shutdownCtx); err != nil { + logger.Error("Admin engine shutdown error", "error", err) + } + } + + return nil +} + +// bootstrapAdmin creates an admin user if one doesn't already exist. +// It returns the admin user's UUID so callers can associate resources with them. +func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, password string, logger *slog.Logger) (uuid.UUID, error) { + existing, err := users.GetByEmail(ctx, email) + if err != nil && !errors.Is(err, evstore.ErrNotFound) { + return uuid.Nil, fmt.Errorf("check existing admin: %w", err) + } + if err == nil && existing != nil { + logger.Info("Admin user already exists", "email", email) + return existing.ID, nil + } + + hash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) + if err != nil { + return uuid.Nil, fmt.Errorf("hash password: %w", err) + } + now := time.Now() + admin := &evstore.User{ + ID: uuid.New(), + Email: email, + PasswordHash: string(hash), + DisplayName: "Admin", + Active: true, + CreatedAt: now, + UpdatedAt: now, + } + if err := users.Create(ctx, admin); err != nil { + return uuid.Nil, fmt.Errorf("create admin user: %w", err) + } + logger.Info("Bootstrapped admin user", "email", email) + return admin.ID, nil +} + +// slugify converts a string into a URL-friendly slug: lowercase, ASCII alphanumeric +// characters and hyphens only, with consecutive hyphens collapsed and leading/trailing +// hyphens trimmed. +func slugify(s string) string { + var b strings.Builder + for _, r := range strings.ToLower(s) { + if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' { + b.WriteRune(r) + } else { + b.WriteRune('-') + } + } + result := b.String() + for strings.Contains(result, "--") { + result = strings.ReplaceAll(result, "--", "-") + } + return strings.Trim(result, "-") +} + +// ensureSystemProject finds or creates the "system" company and "default" project +// used to associate seed workflows with the required database entities. +func ensureSystemProject(ctx context.Context, pg *evstore.PGStore, ownerID uuid.UUID) (*evstore.Project, error) { + const companySlug = "system" + const projectSlug = "default" + + company, err := pg.Companies().GetBySlug(ctx, companySlug) + if errors.Is(err, evstore.ErrNotFound) { + company = &evstore.Company{Name: "System", Slug: companySlug, OwnerID: ownerID} + if createErr := pg.Companies().Create(ctx, company); createErr != nil { + if !errors.Is(createErr, evstore.ErrDuplicate) { + return nil, fmt.Errorf("create system company: %w", createErr) + } + // Another process created it concurrently; fetch it. + if company, err = pg.Companies().GetBySlug(ctx, companySlug); err != nil { + return nil, fmt.Errorf("get system company: %w", err) + } + } + } else if err != nil { + return nil, fmt.Errorf("get system company: %w", err) + } + + project, err := pg.Projects().GetBySlug(ctx, company.ID, projectSlug) + if errors.Is(err, evstore.ErrNotFound) { + project = &evstore.Project{CompanyID: company.ID, Name: "Default", Slug: projectSlug} + if createErr := pg.Projects().Create(ctx, project); createErr != nil { + if !errors.Is(createErr, evstore.ErrDuplicate) { + return nil, fmt.Errorf("create default project: %w", createErr) + } + if project, err = pg.Projects().GetBySlug(ctx, company.ID, projectSlug); err != nil { + return nil, fmt.Errorf("get default project: %w", err) + } + } + } else if err != nil { + return nil, fmt.Errorf("get default project: %w", err) + } + + return project, nil +} + +// seedWorkflow imports a YAML config as the initial workflow into the database. +func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, adminUserID uuid.UUID, logger *slog.Logger) error { + // Validate the config is loadable + if _, err := config.LoadFromFile(configPath); err != nil { + return fmt.Errorf("load config file: %w", err) + } + + yamlBytes, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("read config file: %w", err) + } + + name := filepath.Base(configPath) + name = strings.TrimSuffix(name, filepath.Ext(name)) + slug := slugify(name) + + // Check if a workflow with this slug already exists in any project. + existing, err := pg.Workflows().List(ctx, evstore.WorkflowFilter{}) + if err != nil { + return fmt.Errorf("list existing workflows: %w", err) + } + for _, wf := range existing { + if wf.Slug == slug { + logger.Info("Seed workflow already exists", "slug", slug) + return nil + } + } + + project, err := ensureSystemProject(ctx, pg, adminUserID) + if err != nil { + return fmt.Errorf("ensure system project: %w", err) + } + + now := time.Now() + record := &evstore.WorkflowRecord{ + ID: uuid.New(), + ProjectID: project.ID, + Name: name, + Slug: slug, + Description: "Seeded from " + configPath, + ConfigYAML: string(yamlBytes), + Version: 1, + Status: evstore.WorkflowStatusDraft, + CreatedBy: adminUserID, + UpdatedBy: adminUserID, + CreatedAt: now, + UpdatedAt: now, + } + if err := pg.Workflows().Create(ctx, record); err != nil { + return fmt.Errorf("create seed workflow: %w", err) + } + logger.Info("Seeded workflow from config", "name", name, "id", record.ID) + return nil +} + func initAIService(logger *slog.Logger, registry *dynamic.ComponentRegistry, pool *dynamic.InterpreterPool) (*ai.Service, *ai.DeployService) { svc := ai.NewService() diff --git a/module/command_handler_test.go b/module/command_handler_test.go index 7819aad4..14605036 100644 --- a/module/command_handler_test.go +++ b/module/command_handler_test.go @@ -294,4 +294,3 @@ func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/module/query_handler_test.go b/module/query_handler_test.go index a8243538..132ebab5 100644 --- a/module/query_handler_test.go +++ b/module/query_handler_test.go @@ -299,4 +299,3 @@ func TestQueryHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/plugins/api/plugin.go b/plugins/api/plugin.go index c3e1fa49..9ef86b44 100644 --- a/plugins/api/plugin.go +++ b/plugins/api/plugin.go @@ -100,12 +100,14 @@ func New() *Plugin { return &Plugin{ // Default constructors wrap the concrete module constructors, adapting // their return types to modular.Module via implicit interface satisfaction. - newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, - newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, - newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, - newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, - newWorkflowRegistry: func(name, storageBackend string) modular.Module { return module.NewWorkflowRegistry(name, storageBackend) }, - newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, + newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, + newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, + newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, + newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, + newWorkflowRegistry: func(name, storageBackend string) modular.Module { + return module.NewWorkflowRegistry(name, storageBackend) + }, + newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, newProcessingStep: func(name string, cfg module.ProcessingStepConfig) modular.Module { return module.NewProcessingStep(name, cfg) },