Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 126 additions & 47 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"database/sql"
"encoding/json"
"flag"
"errors"
"fmt"
"log"
"log/slog"
"net"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -93,14 +95,10 @@ var (
adminUIDir = flag.String("admin-ui-dir", "", "Path to admin UI static assets directory (overrides ADMIN_UI_DIR env var). Leave empty to use the path in admin/config.yaml")
)

// buildEngine creates the workflow engine with all handlers registered and built from config.
func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, error) {
app := modular.NewStdApplication(nil, logger)
engine := workflow.NewStdEngine(app, logger)

// Load all engine plugins — each registers its module factories, step factories,
// trigger factories, and workflow handlers via engine.LoadPlugin.
plugins := []plugin.EnginePlugin{
// defaultEnginePlugins returns the standard set of engine plugins used by all engine instances.
// Centralising the list here avoids duplication between buildEngine and runMultiWorkflow.
func defaultEnginePlugins() []plugin.EnginePlugin {
return []plugin.EnginePlugin{
pluginlicense.New(),
pluginhttp.New(),
pluginobs.New(),
Expand All @@ -119,7 +117,16 @@ func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.Std
pluginai.New(),
pluginplatform.New(),
}
for _, p := range plugins {
}

// buildEngine creates the workflow engine with all handlers registered and built from config.
func buildEngine(cfg *config.WorkflowConfig, logger *slog.Logger) (*workflow.StdEngine, *dynamic.Loader, *dynamic.ComponentRegistry, error) {
app := modular.NewStdApplication(nil, logger)
engine := workflow.NewStdEngine(app, logger)

// Load all engine plugins — each registers its module factories, step factories,
// trigger factories, and workflow handlers via engine.LoadPlugin.
for _, p := range defaultEnginePlugins() {
if err := engine.LoadPlugin(p); err != nil {
log.Fatalf("Failed to load plugin %s: %v", p.Name(), err)
}
Expand Down Expand Up @@ -1316,8 +1323,11 @@ func runMultiWorkflow(logger *slog.Logger) error {
logger.Info("Database migrations applied")

// 3. Bootstrap admin user if credentials provided
var adminUserID uuid.UUID
if *adminEmail != "" && *adminPassword != "" {
if err := bootstrapAdmin(ctx, pg.Users(), *adminEmail, *adminPassword, logger); err != nil {
var err error
adminUserID, err = bootstrapAdmin(ctx, pg.Users(), *adminEmail, *adminPassword, logger)
if err != nil {
return fmt.Errorf("bootstrap admin: %w", err)
}
}
Expand All @@ -1326,26 +1336,7 @@ func runMultiWorkflow(logger *slog.Logger) error {
engineBuilder := func(cfg *config.WorkflowConfig, l *slog.Logger) (*workflow.StdEngine, modular.Application, error) {
app := modular.NewStdApplication(nil, l)
engine := workflow.NewStdEngine(app, l)
plugins := []plugin.EnginePlugin{
pluginlicense.New(),
pluginhttp.New(),
pluginobs.New(),
pluginmessaging.New(),
pluginsm.New(),
pluginauth.New(),
pluginstorage.New(),
pluginapi.New(),
pluginpipeline.New(),
plugincicd.New(),
pluginff.New(),
pluginsecrets.New(),
pluginmodcompat.New(),
pluginscheduler.New(),
pluginintegration.New(),
pluginai.New(),
pluginplatform.New(),
}
for _, p := range plugins {
for _, p := range defaultEnginePlugins() {
if loadErr := engine.LoadPlugin(p); loadErr != nil {
return nil, nil, fmt.Errorf("load plugin %s: %w", p.Name(), loadErr)
}
Expand All @@ -1365,7 +1356,9 @@ func runMultiWorkflow(logger *slog.Logger) error {

// 5. Seed initial workflow from -config if provided
if *configFile != "" {
if err := seedWorkflow(ctx, pg, *configFile, logger); err != nil {
if adminUserID == uuid.Nil {
logger.Warn("Skipping workflow seed: -admin-email and -admin-password are required for seeding")
} else if err := seedWorkflow(ctx, pg, *configFile, adminUserID, logger); err != nil {
logger.Warn("Failed to seed workflow from config", "file", *configFile, "error", err)
}
}
Expand All @@ -1374,7 +1367,7 @@ func runMultiWorkflow(logger *slog.Logger) error {
secret := envOrFlag("JWT_SECRET", jwtSecret)
if secret == "" {
secret = "dev-secret-change-me"
logger.Warn("No JWT secret configured — using insecure default")
logger.Error("No JWT secret configured — using insecure default; set JWT_SECRET env var or -jwt-secret flag")
}
stores := workflowapi.Stores{
Users: pg.Users(),
Expand All @@ -1397,7 +1390,7 @@ func runMultiWorkflow(logger *slog.Logger) error {
}
apiRouter := workflowapi.NewRouter(stores, apiCfg)

// 7. Also set up single-config admin infrastructure
// 7. Set up admin UI and management infrastructure for workflow management
singleCfg, err := loadConfig(logger)
if err != nil {
return fmt.Errorf("load config: %w", err)
Expand Down Expand Up @@ -1435,22 +1428,34 @@ func runMultiWorkflow(logger *slog.Logger) error {
}
}

// Start API server
// Start API server; propagate failures back so we can initiate shutdown.
srvErrCh := make(chan error, 1)
go func() {
logger.Info("Multi-workflow API listening", "addr", *addr)
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("API server error", "error", err)
srvErrCh <- err
}
}()

fmt.Printf("Multi-workflow API on http://localhost%s/api/v1/\n", *addr)
// Build display address: if the host part is empty or 0.0.0.0/::/[::], use "localhost".
displayAddr := *addr
if host, port, splitErr := net.SplitHostPort(*addr); splitErr == nil &&
(host == "" || host == "0.0.0.0" || host == "::" || host == "[::]") {
displayAddr = ":" + port
}
fmt.Printf("Multi-workflow API on http://localhost%s/api/v1/\n", displayAddr)
fmt.Println("Admin UI on http://localhost:8081")

// Wait for termination signal
// Wait for termination signal or server failure.
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
fmt.Println("Shutting down...")
select {
case <-sigCh:
fmt.Println("Shutting down...")
case <-srvErrCh:
logger.Error("API server failed; initiating shutdown")
}
cancel()

// Graceful shutdown
Expand All @@ -1471,16 +1476,20 @@ func runMultiWorkflow(logger *slog.Logger) error {
}

// bootstrapAdmin creates an admin user if one doesn't already exist.
func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, password string, logger *slog.Logger) error {
// It returns the admin user's UUID so callers can associate resources with them.
func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, password string, logger *slog.Logger) (uuid.UUID, error) {
existing, err := users.GetByEmail(ctx, email)
if err != nil && !errors.Is(err, evstore.ErrNotFound) {
return uuid.Nil, fmt.Errorf("check existing admin: %w", err)
}
if err == nil && existing != nil {
logger.Info("Admin user already exists", "email", email)
return nil
return existing.ID, nil
}

hash, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost)
if err != nil {
return fmt.Errorf("hash password: %w", err)
return uuid.Nil, fmt.Errorf("hash password: %w", err)
}
now := time.Now()
admin := &evstore.User{
Expand All @@ -1493,14 +1502,73 @@ func bootstrapAdmin(ctx context.Context, users evstore.UserStore, email, passwor
UpdatedAt: now,
}
if err := users.Create(ctx, admin); err != nil {
return fmt.Errorf("create admin user: %w", err)
return uuid.Nil, fmt.Errorf("create admin user: %w", err)
}
logger.Info("Bootstrapped admin user", "email", email)
return nil
return admin.ID, nil
}

// slugify converts a string into a URL-friendly slug: lowercase, ASCII alphanumeric
// characters and hyphens only, with consecutive hyphens collapsed and leading/trailing
// hyphens trimmed.
func slugify(s string) string {
var b strings.Builder
for _, r := range strings.ToLower(s) {
if (r >= 'a' && r <= 'z') || (r >= '0' && r <= '9') || r == '-' {
b.WriteRune(r)
} else {
b.WriteRune('-')
}
}
result := b.String()
for strings.Contains(result, "--") {
result = strings.ReplaceAll(result, "--", "-")
}
return strings.Trim(result, "-")
}

// ensureSystemProject finds or creates the "system" company and "default" project
// used to associate seed workflows with the required database entities.
func ensureSystemProject(ctx context.Context, pg *evstore.PGStore, ownerID uuid.UUID) (*evstore.Project, error) {
const companySlug = "system"
const projectSlug = "default"

company, err := pg.Companies().GetBySlug(ctx, companySlug)
if errors.Is(err, evstore.ErrNotFound) {
company = &evstore.Company{Name: "System", Slug: companySlug, OwnerID: ownerID}
if createErr := pg.Companies().Create(ctx, company); createErr != nil {
if !errors.Is(createErr, evstore.ErrDuplicate) {
return nil, fmt.Errorf("create system company: %w", createErr)
}
// Another process created it concurrently; fetch it.
if company, err = pg.Companies().GetBySlug(ctx, companySlug); err != nil {
return nil, fmt.Errorf("get system company: %w", err)
}
}
} else if err != nil {
return nil, fmt.Errorf("get system company: %w", err)
}

project, err := pg.Projects().GetBySlug(ctx, company.ID, projectSlug)
if errors.Is(err, evstore.ErrNotFound) {
project = &evstore.Project{CompanyID: company.ID, Name: "Default", Slug: projectSlug}
if createErr := pg.Projects().Create(ctx, project); createErr != nil {
if !errors.Is(createErr, evstore.ErrDuplicate) {
return nil, fmt.Errorf("create default project: %w", createErr)
}
if project, err = pg.Projects().GetBySlug(ctx, company.ID, projectSlug); err != nil {
return nil, fmt.Errorf("get default project: %w", err)
}
}
} else if err != nil {
return nil, fmt.Errorf("get default project: %w", err)
}

return project, nil
}

// seedWorkflow imports a YAML config as the initial workflow into the database.
func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, logger *slog.Logger) error {
func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, adminUserID uuid.UUID, logger *slog.Logger) error {
// Validate the config is loadable
if _, err := config.LoadFromFile(configPath); err != nil {
return fmt.Errorf("load config file: %w", err)
Expand All @@ -1513,26 +1581,37 @@ func seedWorkflow(ctx context.Context, pg *evstore.PGStore, configPath string, l

name := filepath.Base(configPath)
name = strings.TrimSuffix(name, filepath.Ext(name))
slug := strings.ToLower(strings.ReplaceAll(name, " ", "-"))
slug := slugify(name)

// Check if a workflow with this slug already exists in any project
existing, _ := pg.Workflows().List(ctx, evstore.WorkflowFilter{})
// Check if a workflow with this slug already exists in any project.
existing, err := pg.Workflows().List(ctx, evstore.WorkflowFilter{})
if err != nil {
return fmt.Errorf("list existing workflows: %w", err)
}
for _, wf := range existing {
if wf.Slug == slug {
logger.Info("Seed workflow already exists", "slug", slug)
return nil
}
}

project, err := ensureSystemProject(ctx, pg, adminUserID)
if err != nil {
return fmt.Errorf("ensure system project: %w", err)
}

now := time.Now()
record := &evstore.WorkflowRecord{
ID: uuid.New(),
ProjectID: project.ID,
Name: name,
Slug: slug,
Description: "Seeded from " + configPath,
ConfigYAML: string(yamlBytes),
Version: 1,
Status: evstore.WorkflowStatusDraft,
CreatedBy: adminUserID,
UpdatedBy: adminUserID,
CreatedAt: now,
UpdatedAt: now,
}
Expand Down