Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@ node_modules/

# Built admin UI assets (generated by `make build-ui`, embedded via go:embed)
module/ui_dist/*
!module/ui_dist/.gitkeep*.db-shm
!module/ui_dist/.gitkeep
*.db-shm
*.db-wal
6 changes: 6 additions & 0 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type Config struct {

// OAuth providers keyed by provider name (e.g. "google", "okta").
OAuthProviders map[string]*OAuthProviderConfig

// Orchestrator is an optional engine manager that is called when workflows
// are deployed or stopped via the API. When nil, only the database status
// is updated (no live engine is started or stopped).
Orchestrator WorkflowOrchestrator
}

// Stores groups all store interfaces needed by the API.
Expand Down Expand Up @@ -109,6 +114,7 @@ func NewRouterWithIAM(stores Stores, cfg Config, iamResolver *iam.IAMResolver) h

// --- Workflows ---
wfH := NewWorkflowHandler(stores.Workflows, stores.Projects, stores.Memberships, permissions)
wfH.orchestrator = cfg.Orchestrator
mux.Handle("POST /api/v1/projects/{pid}/workflows", mw.RequireAuth(http.HandlerFunc(wfH.Create)))
mux.Handle("GET /api/v1/workflows", mw.RequireAuth(http.HandlerFunc(wfH.ListAll)))
mux.Handle("GET /api/v1/projects/{pid}/workflows", mw.RequireAuth(http.HandlerFunc(wfH.ListInProject)))
Expand Down
64 changes: 50 additions & 14 deletions api/workflow_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"encoding/json"
"errors"
"net/http"
Expand All @@ -11,12 +12,21 @@ import (
"github.com/google/uuid"
)

// WorkflowOrchestrator is implemented by the multi-workflow engine manager to
// allow the API layer to start and stop live workflow engines. It is optional;
// when nil the Deploy and Stop handlers only update the database status.
type WorkflowOrchestrator interface {
DeployWorkflow(ctx context.Context, workflowID uuid.UUID) error
StopWorkflow(ctx context.Context, workflowID uuid.UUID) error
}

// WorkflowHandler handles workflow CRUD and lifecycle endpoints.
type WorkflowHandler struct {
workflows store.WorkflowStore
projects store.ProjectStore
memberships store.MembershipStore
permissions *PermissionService
workflows store.WorkflowStore
projects store.ProjectStore
memberships store.MembershipStore
permissions *PermissionService
orchestrator WorkflowOrchestrator // optional; nil when no engine manager is wired
}

// NewWorkflowHandler creates a new WorkflowHandler.
Expand Down Expand Up @@ -259,11 +269,24 @@ func (h *WorkflowHandler) Deploy(w http.ResponseWriter, r *http.Request) {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
wf.Status = store.WorkflowStatusActive
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
if h.orchestrator != nil {
if oErr := h.orchestrator.DeployWorkflow(r.Context(), id); oErr != nil {
WriteError(w, http.StatusInternalServerError, oErr.Error())
return
}
// Re-fetch to get updated status written by the orchestrator.
wf, err = h.workflows.Get(r.Context(), id)
if err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
} else {
wf.Status = store.WorkflowStatusActive
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
}
WriteJSON(w, http.StatusOK, wf)
}
Expand All @@ -284,11 +307,24 @@ func (h *WorkflowHandler) Stop(w http.ResponseWriter, r *http.Request) {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
wf.Status = store.WorkflowStatusStopped
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
if h.orchestrator != nil {
if oErr := h.orchestrator.StopWorkflow(r.Context(), id); oErr != nil {
WriteError(w, http.StatusInternalServerError, oErr.Error())
return
}
// Re-fetch to get updated status written by the orchestrator.
wf, err = h.workflows.Get(r.Context(), id)
if err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
} else {
wf.Status = store.WorkflowStatusStopped
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
}
WriteJSON(w, http.StatusOK, wf)
}
Expand Down
139 changes: 111 additions & 28 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
Expand All @@ -14,13 +15,15 @@ import (
"path/filepath"
"strings"
"syscall"
"time"

"github.com/CrisisTextLine/modular"
"github.com/GoCodeAlone/workflow"
"github.com/GoCodeAlone/workflow/admin"
"github.com/GoCodeAlone/workflow/ai"
copilotai "github.com/GoCodeAlone/workflow/ai/copilot"
"github.com/GoCodeAlone/workflow/ai/llm"
apihandler "github.com/GoCodeAlone/workflow/api"
"github.com/GoCodeAlone/workflow/audit"
"github.com/GoCodeAlone/workflow/billing"
"github.com/GoCodeAlone/workflow/bundle"
Expand Down Expand Up @@ -62,6 +65,7 @@ import (
"github.com/GoCodeAlone/workflow/schema"
evstore "github.com/GoCodeAlone/workflow/store"
"github.com/google/uuid"
"golang.org/x/crypto/bcrypt"
_ "modernc.org/sqlite"
)

Expand All @@ -74,10 +78,11 @@ var (
anthropicModel = flag.String("anthropic-model", "", "Anthropic model name")

// Multi-workflow mode flags
databaseDSN = flag.String("database-dsn", "", "PostgreSQL connection string for multi-workflow mode")
jwtSecret = flag.String("jwt-secret", "", "JWT signing secret for API authentication")
adminEmail = flag.String("admin-email", "", "Initial admin user email (first-run bootstrap)")
adminPassword = flag.String("admin-password", "", "Initial admin user password (first-run bootstrap)")
databaseDSN = flag.String("database-dsn", "", "PostgreSQL connection string for multi-workflow mode")
jwtSecret = flag.String("jwt-secret", "", "JWT signing secret for API authentication")
adminEmail = flag.String("admin-email", "", "Initial admin user email (first-run bootstrap)")
adminPassword = flag.String("admin-password", "", "Initial admin user password (first-run bootstrap)")
multiWorkflowAddr = flag.String("multi-workflow-addr", ":8090", "HTTP listen address for multi-workflow REST API")

// License flags
licenseKey = flag.String("license-key", "", "License key for the workflow engine (or set WORKFLOW_LICENSE_KEY env var)")
Expand Down Expand Up @@ -1184,43 +1189,121 @@ func main() {
}))

if *databaseDSN != "" {
// Multi-workflow mode
logger.Info("Starting in multi-workflow mode")

// TODO: Once the api package is implemented, this section will:
// 1. Connect to PostgreSQL using *databaseDSN
// 2. Run database migrations
// 3. Create store instances (UserStore, CompanyStore, ProjectStore, WorkflowStore, etc.)
// 4. Bootstrap admin user if *adminEmail and *adminPassword are set (first-run)
// 5. Create WorkflowEngineManager with stores
// 6. Create api.NewRouter() with stores, *jwtSecret, and engine manager
// 7. Mount API router at /api/v1/ alongside existing routes

// For now, log the configuration and fall through to single-config mode
logger.Info("Multi-workflow mode configured",
"database_dsn_set", *databaseDSN != "",
// Multi-workflow mode: connect to PostgreSQL, run migrations, start the
// REST API router on a dedicated port alongside the single-config engine.
logger.Info("Starting in multi-workflow mode",
"database_dsn_set", true,
"jwt_secret_set", *jwtSecret != "",
"admin_email_set", *adminEmail != "",
"api_addr", *multiWorkflowAddr,
)

// Suppress unused variable warnings until api package is ready
_ = databaseDSN
_ = jwtSecret
_ = adminEmail
_ = adminPassword
// Validate JWT secret meets minimum security requirements.
if len(*jwtSecret) < 32 {
log.Fatalf("multi-workflow mode: --jwt-secret must be at least 32 bytes (got %d)", len(*jwtSecret))
}

pgStore, pgErr := evstore.NewPGStore(context.Background(), evstore.PGConfig{URL: *databaseDSN})
if pgErr != nil {
log.Fatalf("multi-workflow mode: failed to connect to PostgreSQL: %v", pgErr)
}
migrator := evstore.NewMigrator(pgStore.Pool())
if mErr := migrator.Migrate(context.Background()); mErr != nil {
log.Fatalf("multi-workflow mode: database migration failed: %v", mErr)
}
logger.Info("multi-workflow mode: database migrations applied")

// Bootstrap admin user on first run.
if *adminEmail != "" && *adminPassword != "" {
_, lookupErr := pgStore.Users().GetByEmail(context.Background(), *adminEmail)
switch {
case errors.Is(lookupErr, evstore.ErrNotFound):
hash, hashErr := bcrypt.GenerateFromPassword([]byte(*adminPassword), bcrypt.DefaultCost)
if hashErr != nil {
log.Fatalf("multi-workflow mode: failed to hash admin password: %v", hashErr)
}
now := time.Now()
adminUser := &evstore.User{
ID: uuid.New(),
Email: *adminEmail,
PasswordHash: string(hash),
DisplayName: "Admin",
Active: true,
CreatedAt: now,
UpdatedAt: now,
}
if createErr := pgStore.Users().Create(context.Background(), adminUser); createErr != nil {
logger.Warn("multi-workflow mode: failed to create admin user (may already exist)", "error", createErr)
} else {
logger.Info("multi-workflow mode: created bootstrap admin user", "email", *adminEmail)
}
case lookupErr != nil:
log.Fatalf("multi-workflow mode: failed to look up admin user: %v", lookupErr)
default:
logger.Info("multi-workflow mode: admin user already exists, skipping bootstrap", "email", *adminEmail)
}
}

logger.Warn("Multi-workflow mode requires the api package (not yet available); falling back to single-config mode")
engineBuilder := func(cfg *config.WorkflowConfig, lg *slog.Logger) (*workflow.StdEngine, modular.Application, error) {
eng, _, _, _, buildErr := buildEngine(cfg, lg)
if buildErr != nil {
return nil, nil, buildErr
}
app := eng.GetApp()
return eng, app, nil
}
engineMgr := workflow.NewWorkflowEngineManager(pgStore.Workflows(), pgStore.CrossWorkflowLinks(), logger, engineBuilder)

apiRouter := apihandler.NewRouter(apihandler.Stores{
Users: pgStore.Users(),
Sessions: pgStore.Sessions(),
Companies: pgStore.Companies(),
Projects: pgStore.Projects(),
Workflows: pgStore.Workflows(),
Memberships: pgStore.Memberships(),
Links: pgStore.CrossWorkflowLinks(),
Executions: pgStore.Executions(),
Logs: pgStore.Logs(),
Audit: pgStore.Audit(),
IAM: pgStore.IAM(),
}, apihandler.Config{
JWTSecret: *jwtSecret,
Orchestrator: engineMgr,
})

apiServer := &http.Server{ //nolint:gosec // ReadHeaderTimeout set below
Addr: *multiWorkflowAddr,
Handler: apiRouter,
ReadHeaderTimeout: 10 * time.Second,
}
go func() {
logger.Info("multi-workflow API listening", "addr", *multiWorkflowAddr)
if sErr := apiServer.ListenAndServe(); sErr != nil && sErr != http.ErrServerClosed {
logger.Error("multi-workflow API server error", "error", sErr)
}
}()
defer func() {
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if sErr := apiServer.Shutdown(shutdownCtx); sErr != nil {
logger.Warn("multi-workflow API server shutdown error", "error", sErr)
}
if sErr := engineMgr.StopAll(shutdownCtx); sErr != nil {
logger.Warn("multi-workflow engine manager shutdown error", "error", sErr)
}
pgStore.Close()
}()
}

// Existing single-config behavior
// Single-config mode always runs alongside multi-workflow mode (if enabled).
cfg, err := loadConfig(logger)
if err != nil {
log.Fatalf("Configuration error: %v", err)
log.Fatalf("Configuration error: %v", err) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
}

app, err := setup(logger, cfg)
if err != nil {
log.Fatalf("Setup error: %v", err)
log.Fatalf("Setup error: %v", err) //nolint:gocritic // exitAfterDefer: intentional, cleanup is best-effort
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down
Loading
Loading