Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,6 @@ node_modules/

# Built admin UI assets (generated by `make build-ui`, embedded via go:embed)
module/ui_dist/*
!module/ui_dist/.gitkeep*.db-shm
!module/ui_dist/.gitkeep
*.db-shm
*.db-wal
6 changes: 6 additions & 0 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ type Config struct {

// OAuth providers keyed by provider name (e.g. "google", "okta").
OAuthProviders map[string]*OAuthProviderConfig

// Orchestrator is an optional engine manager that is called when workflows
// are deployed or stopped via the API. When nil, only the database status
// is updated (no live engine is started or stopped).
Orchestrator WorkflowOrchestrator
}

// Stores groups all store interfaces needed by the API.
Expand Down Expand Up @@ -109,6 +114,7 @@ func NewRouterWithIAM(stores Stores, cfg Config, iamResolver *iam.IAMResolver) h

// --- Workflows ---
wfH := NewWorkflowHandler(stores.Workflows, stores.Projects, stores.Memberships, permissions)
wfH.orchestrator = cfg.Orchestrator
mux.Handle("POST /api/v1/projects/{pid}/workflows", mw.RequireAuth(http.HandlerFunc(wfH.Create)))
mux.Handle("GET /api/v1/workflows", mw.RequireAuth(http.HandlerFunc(wfH.ListAll)))
mux.Handle("GET /api/v1/projects/{pid}/workflows", mw.RequireAuth(http.HandlerFunc(wfH.ListInProject)))
Expand Down
64 changes: 50 additions & 14 deletions api/workflow_handler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"encoding/json"
"errors"
"net/http"
Expand All @@ -11,12 +12,21 @@ import (
"github.com/google/uuid"
)

// WorkflowOrchestrator is implemented by the multi-workflow engine manager to
// allow the API layer to start and stop live workflow engines. It is optional;
// when nil the Deploy and Stop handlers only update the database status.
type WorkflowOrchestrator interface {
DeployWorkflow(ctx context.Context, workflowID uuid.UUID) error
StopWorkflow(ctx context.Context, workflowID uuid.UUID) error
}

// WorkflowHandler handles workflow CRUD and lifecycle endpoints.
type WorkflowHandler struct {
workflows store.WorkflowStore
projects store.ProjectStore
memberships store.MembershipStore
permissions *PermissionService
workflows store.WorkflowStore
projects store.ProjectStore
memberships store.MembershipStore
permissions *PermissionService
orchestrator WorkflowOrchestrator // optional; nil when no engine manager is wired
}

// NewWorkflowHandler creates a new WorkflowHandler.
Expand Down Expand Up @@ -259,11 +269,24 @@ func (h *WorkflowHandler) Deploy(w http.ResponseWriter, r *http.Request) {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
wf.Status = store.WorkflowStatusActive
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
if h.orchestrator != nil {
if oErr := h.orchestrator.DeployWorkflow(r.Context(), id); oErr != nil {
WriteError(w, http.StatusInternalServerError, oErr.Error())
return
}
// Re-fetch to get updated status written by the orchestrator.
wf, err = h.workflows.Get(r.Context(), id)
if err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
} else {
wf.Status = store.WorkflowStatusActive
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
}
WriteJSON(w, http.StatusOK, wf)
}
Expand All @@ -284,11 +307,24 @@ func (h *WorkflowHandler) Stop(w http.ResponseWriter, r *http.Request) {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
wf.Status = store.WorkflowStatusStopped
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
if h.orchestrator != nil {
if oErr := h.orchestrator.StopWorkflow(r.Context(), id); oErr != nil {
WriteError(w, http.StatusInternalServerError, oErr.Error())
return
}
// Re-fetch to get updated status written by the orchestrator.
wf, err = h.workflows.Get(r.Context(), id)
if err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
} else {
wf.Status = store.WorkflowStatusStopped
wf.UpdatedAt = time.Now()
if err := h.workflows.Update(r.Context(), wf); err != nil {
WriteError(w, http.StatusInternalServerError, "internal error")
return
}
}
WriteJSON(w, http.StatusOK, wf)
}
Expand Down
23 changes: 18 additions & 5 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -1196,6 +1197,12 @@ func main() {
"admin_email_set", *adminEmail != "",
"api_addr", *multiWorkflowAddr,
)

// Validate JWT secret meets minimum security requirements.
if len(*jwtSecret) < 32 {
log.Fatalf("multi-workflow mode: --jwt-secret must be at least 32 bytes (got %d)", len(*jwtSecret))
}

pgStore, pgErr := evstore.NewPGStore(context.Background(), evstore.PGConfig{URL: *databaseDSN})
if pgErr != nil {
log.Fatalf("multi-workflow mode: failed to connect to PostgreSQL: %v", pgErr)
Expand All @@ -1209,7 +1216,8 @@ func main() {
// Bootstrap admin user on first run.
if *adminEmail != "" && *adminPassword != "" {
_, lookupErr := pgStore.Users().GetByEmail(context.Background(), *adminEmail)
if lookupErr != nil {
switch {
case errors.Is(lookupErr, evstore.ErrNotFound):
hash, hashErr := bcrypt.GenerateFromPassword([]byte(*adminPassword), bcrypt.DefaultCost)
if hashErr != nil {
log.Fatalf("multi-workflow mode: failed to hash admin password: %v", hashErr)
Expand All @@ -1229,7 +1237,9 @@ func main() {
} else {
logger.Info("multi-workflow mode: created bootstrap admin user", "email", *adminEmail)
}
} else {
case lookupErr != nil:
log.Fatalf("multi-workflow mode: failed to look up admin user: %v", lookupErr)
default:
logger.Info("multi-workflow mode: admin user already exists, skipping bootstrap", "email", *adminEmail)
}
}
Expand All @@ -1256,7 +1266,10 @@ func main() {
Logs: pgStore.Logs(),
Audit: pgStore.Audit(),
IAM: pgStore.IAM(),
}, apihandler.Config{JWTSecret: *jwtSecret})
}, apihandler.Config{
JWTSecret: *jwtSecret,
Orchestrator: engineMgr,
})

apiServer := &http.Server{ //nolint:gosec // ReadHeaderTimeout set below
Addr: *multiWorkflowAddr,
Expand All @@ -1270,7 +1283,8 @@ func main() {
}
}()
defer func() {
shutdownCtx := context.Background()
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
defer shutdownCancel()
if sErr := apiServer.Shutdown(shutdownCtx); sErr != nil {
logger.Warn("multi-workflow API server shutdown error", "error", sErr)
}
Expand All @@ -1279,7 +1293,6 @@ func main() {
}
pgStore.Close()
}()
_ = engineMgr // used via closure above
}

// Single-config mode always runs alongside multi-workflow mode (if enabled).
Expand Down