From 14fb1c9ae24ab1b5ed068791f5ff6783ac45055e Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Mon, 23 Feb 2026 05:55:07 -0500 Subject: [PATCH 1/2] feat: add execution/audit tables and decompose admin routes (#87) Add tests for Phase C database tables (workflow_executions, execution_steps, execution_logs, audit_log) and validate the admin config parses correctly. Tests cover: - Execution lifecycle (insert, complete, count by status) - Execution step lifecycle (insert, complete, ordering) - Execution logs (insert, count by level) - Audit log (insert, query) - Phase C table existence verification - Execution failure handling - Admin config parsing and module validation Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- admin/admin_test.go | 57 +++++++ module/api_v1_test.go | 272 +++++++++++++++++++++++++++++++++ module/command_handler_test.go | 1 - module/query_handler_test.go | 1 - plugins/api/plugin.go | 14 +- 5 files changed, 337 insertions(+), 8 deletions(-) create mode 100644 admin/admin_test.go diff --git a/admin/admin_test.go b/admin/admin_test.go new file mode 100644 index 00000000..4fac365e --- /dev/null +++ b/admin/admin_test.go @@ -0,0 +1,57 @@ +package admin + +import ( + "testing" +) + +func TestLoadConfig_Parses(t *testing.T) { + cfg, err := LoadConfig() + if err != nil { + t.Fatalf("LoadConfig: %v", err) + } + if cfg == nil { + t.Fatal("expected non-nil config") + } + if len(cfg.Modules) == 0 { + t.Error("expected at least one module in admin config") + } +} + +func TestLoadConfigRaw_NonEmpty(t *testing.T) { + raw, err := LoadConfigRaw() + if err != nil { + t.Fatalf("LoadConfigRaw: %v", err) + } + if len(raw) == 0 { + t.Error("expected non-empty raw config data") + } +} + +func TestLoadConfig_HasExpectedModules(t *testing.T) { + cfg, err := LoadConfig() + if err != nil { + t.Fatalf("LoadConfig: %v", err) + } + + moduleNames := make(map[string]bool) + for _, m := range cfg.Modules { + moduleNames[m.Name] = true + } + + required := []string{"admin-server", "admin-router", "admin-db", "admin-auth"} + for _, name := range required { + if !moduleNames[name] { + t.Errorf("expected module %q in admin config", name) + } + } +} + +func TestLoadConfig_HasWorkflows(t *testing.T) { + cfg, err := LoadConfig() + if err != nil { + t.Fatalf("LoadConfig: %v", err) + } + if len(cfg.Workflows) == 0 { + t.Error("expected at least one workflow in admin config") + } +} diff --git a/module/api_v1_test.go b/module/api_v1_test.go index edb97357..1e401aec 100644 --- a/module/api_v1_test.go +++ b/module/api_v1_test.go @@ -718,3 +718,275 @@ func TestV1Handler_Projects(t *testing.T) { t.Errorf("got %d projects, want 1", len(projects)) } } + +// --- Phase C: Execution, Step, Log, and Audit table tests --- + +func TestV1Store_ExecutionLifecycle(t *testing.T) { + store := setupTestStore(t) + + company, _ := store.CreateCompany("Co", "", "u1") + org, _ := store.CreateOrganization(company.ID, "Org", "", "u1") + proj, _ := store.CreateProject(org.ID, "Proj", "", "") + wf, _ := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + + now := time.Now().UTC() + execID := "exec-001" + + // Insert execution + err := store.InsertExecution(execID, wf.ID, "manual", "pending", "u1", now) + if err != nil { + t.Fatalf("InsertExecution: %v", err) + } + + // Verify via direct query + var status string + err = store.DB().QueryRow("SELECT status FROM workflow_executions WHERE id = ?", execID).Scan(&status) + if err != nil { + t.Fatalf("query execution: %v", err) + } + if status != "pending" { + t.Errorf("got status %q, want %q", status, "pending") + } + + // Complete execution + completedAt := now.Add(5 * time.Second) + err = store.CompleteExecution(execID, "completed", completedAt, 5000, "") + if err != nil { + t.Fatalf("CompleteExecution: %v", err) + } + + err = store.DB().QueryRow("SELECT status FROM workflow_executions WHERE id = ?", execID).Scan(&status) + if err != nil { + t.Fatalf("query completed execution: %v", err) + } + if status != "completed" { + t.Errorf("got status %q, want %q", status, "completed") + } + + // Count executions + counts, err := store.CountExecutionsByWorkflow(wf.ID) + if err != nil { + t.Fatalf("CountExecutionsByWorkflow: %v", err) + } + if counts["completed"] != 1 { + t.Errorf("got completed count %d, want 1", counts["completed"]) + } +} + +func TestV1Store_ExecutionStepLifecycle(t *testing.T) { + store := setupTestStore(t) + + company, _ := store.CreateCompany("Co", "", "u1") + org, _ := store.CreateOrganization(company.ID, "Org", "", "u1") + proj, _ := store.CreateProject(org.ID, "Proj", "", "") + wf, _ := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + + now := time.Now().UTC() + execID := "exec-step-001" + _ = store.InsertExecution(execID, wf.ID, "manual", "running", "u1", now) + + stepID := "step-001" + err := store.InsertExecutionStep(stepID, execID, "parse-request", "step.request_parse", "running", 0, now) + if err != nil { + t.Fatalf("InsertExecutionStep: %v", err) + } + + // Verify step exists + var stepName string + err = store.DB().QueryRow("SELECT step_name FROM execution_steps WHERE id = ?", stepID).Scan(&stepName) + if err != nil { + t.Fatalf("query step: %v", err) + } + if stepName != "parse-request" { + t.Errorf("got step_name %q, want %q", stepName, "parse-request") + } + + // Complete step + err = store.CompleteExecutionStep(stepID, "completed", now.Add(time.Second), 1000, "") + if err != nil { + t.Fatalf("CompleteExecutionStep: %v", err) + } + + var stepStatus string + err = store.DB().QueryRow("SELECT status FROM execution_steps WHERE id = ?", stepID).Scan(&stepStatus) + if err != nil { + t.Fatalf("query completed step: %v", err) + } + if stepStatus != "completed" { + t.Errorf("got status %q, want %q", stepStatus, "completed") + } + + // Insert a second step with higher sequence + step2ID := "step-002" + err = store.InsertExecutionStep(step2ID, execID, "db-query", "step.db_query", "running", 1, now) + if err != nil { + t.Fatalf("InsertExecutionStep(2): %v", err) + } + + // Verify ordering + rows, err := store.DB().Query("SELECT id FROM execution_steps WHERE execution_id = ? ORDER BY sequence_num ASC", execID) + if err != nil { + t.Fatalf("query steps: %v", err) + } + defer rows.Close() + var ids []string + for rows.Next() { + var id string + rows.Scan(&id) + ids = append(ids, id) + } + if len(ids) != 2 || ids[0] != stepID || ids[1] != step2ID { + t.Errorf("got step order %v, want [%s, %s]", ids, stepID, step2ID) + } +} + +func TestV1Store_ExecutionLogs(t *testing.T) { + store := setupTestStore(t) + + company, _ := store.CreateCompany("Co", "", "u1") + org, _ := store.CreateOrganization(company.ID, "Org", "", "u1") + proj, _ := store.CreateProject(org.ID, "Proj", "", "") + wf, _ := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + + now := time.Now().UTC() + + // Insert logs at various levels + err := store.InsertLog(wf.ID, "exec-1", "info", "Workflow started", "engine", "{}", now) + if err != nil { + t.Fatalf("InsertLog(info): %v", err) + } + err = store.InsertLog(wf.ID, "exec-1", "error", "Step failed", "handler", `{"step":"validate"}`, now.Add(time.Second)) + if err != nil { + t.Fatalf("InsertLog(error): %v", err) + } + err = store.InsertLog(wf.ID, "exec-1", "event", "user.created", "eventbus", "{}", now.Add(2*time.Second)) + if err != nil { + t.Fatalf("InsertLog(event): %v", err) + } + + // Count logs by level + counts, err := store.CountLogsByWorkflow(wf.ID) + if err != nil { + t.Fatalf("CountLogsByWorkflow: %v", err) + } + if counts["info"] != 1 { + t.Errorf("got info count %d, want 1", counts["info"]) + } + if counts["error"] != 1 { + t.Errorf("got error count %d, want 1", counts["error"]) + } + if counts["event"] != 1 { + t.Errorf("got event count %d, want 1", counts["event"]) + } + + // Verify log content + var msg string + err = store.DB().QueryRow("SELECT message FROM execution_logs WHERE level = 'error' AND workflow_id = ?", wf.ID).Scan(&msg) + if err != nil { + t.Fatalf("query error log: %v", err) + } + if msg != "Step failed" { + t.Errorf("got message %q, want %q", msg, "Step failed") + } +} + +func TestV1Store_AuditLog(t *testing.T) { + store := setupTestStore(t) + + now := time.Now().UTC().Format(time.RFC3339) + + // Insert audit entries directly (no convenience method exists) + _, err := store.DB().Exec( + "INSERT INTO audit_log (user_id, action, resource_type, resource_id, details, ip_address, user_agent, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + "user-1", "create", "workflow", "wf-123", `{"name":"test"}`, "127.0.0.1", "test-agent", now, + ) + if err != nil { + t.Fatalf("insert audit_log: %v", err) + } + + _, err = store.DB().Exec( + "INSERT INTO audit_log (user_id, action, resource_type, resource_id, details, ip_address, user_agent, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + "user-1", "deploy", "workflow", "wf-123", "{}", "127.0.0.1", "test-agent", now, + ) + if err != nil { + t.Fatalf("insert audit_log(2): %v", err) + } + + // Query audit log + var count int + err = store.DB().QueryRow("SELECT COUNT(*) FROM audit_log WHERE resource_id = ?", "wf-123").Scan(&count) + if err != nil { + t.Fatalf("count audit_log: %v", err) + } + if count != 2 { + t.Errorf("got %d audit entries, want 2", count) + } + + // Verify ordering (newest first) + var action string + err = store.DB().QueryRow("SELECT action FROM audit_log ORDER BY id DESC LIMIT 1").Scan(&action) + if err != nil { + t.Fatalf("query latest audit: %v", err) + } + if action != "deploy" { + t.Errorf("got latest action %q, want %q", action, "deploy") + } +} + +func TestV1Store_PhaseCTablesExist(t *testing.T) { + store := setupTestStore(t) + db := store.DB() + + tables := []string{ + "workflow_executions", + "execution_steps", + "execution_logs", + "audit_log", + "iam_provider_configs", + "iam_role_mappings", + } + + for _, table := range tables { + var name string + err := db.QueryRow("SELECT name FROM sqlite_master WHERE type='table' AND name=?", table).Scan(&name) + if err != nil { + t.Errorf("table %q not found: %v", table, err) + } + } +} + +func TestV1Store_ExecutionFailure(t *testing.T) { + store := setupTestStore(t) + + company, _ := store.CreateCompany("Co", "", "u1") + org, _ := store.CreateOrganization(company.ID, "Org", "", "u1") + proj, _ := store.CreateProject(org.ID, "Proj", "", "") + wf, _ := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + + now := time.Now().UTC() + execID := "exec-fail-001" + _ = store.InsertExecution(execID, wf.ID, "manual", "running", "u1", now) + + // Fail the execution with an error message + err := store.CompleteExecution(execID, "failed", now.Add(3*time.Second), 3000, "timeout exceeded") + if err != nil { + t.Fatalf("CompleteExecution(failed): %v", err) + } + + var status, errMsg string + err = store.DB().QueryRow("SELECT status, error_message FROM workflow_executions WHERE id = ?", execID).Scan(&status, &errMsg) + if err != nil { + t.Fatalf("query failed execution: %v", err) + } + if status != "failed" { + t.Errorf("got status %q, want %q", status, "failed") + } + if errMsg != "timeout exceeded" { + t.Errorf("got error_message %q, want %q", errMsg, "timeout exceeded") + } + + counts, _ := store.CountExecutionsByWorkflow(wf.ID) + if counts["failed"] != 1 { + t.Errorf("got failed count %d, want 1", counts["failed"]) + } +} diff --git a/module/command_handler_test.go b/module/command_handler_test.go index 7819aad4..14605036 100644 --- a/module/command_handler_test.go +++ b/module/command_handler_test.go @@ -294,4 +294,3 @@ func TestCommandHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/module/query_handler_test.go b/module/query_handler_test.go index a8243538..132ebab5 100644 --- a/module/query_handler_test.go +++ b/module/query_handler_test.go @@ -299,4 +299,3 @@ func TestQueryHandler_RoutePipeline_TypedNil(t *testing.T) { t.Errorf("expected 404 for typed-nil pipeline, got %d", rr.Code) } } - diff --git a/plugins/api/plugin.go b/plugins/api/plugin.go index c3e1fa49..9ef86b44 100644 --- a/plugins/api/plugin.go +++ b/plugins/api/plugin.go @@ -100,12 +100,14 @@ func New() *Plugin { return &Plugin{ // Default constructors wrap the concrete module constructors, adapting // their return types to modular.Module via implicit interface satisfaction. - newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, - newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, - newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, - newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, - newWorkflowRegistry: func(name, storageBackend string) modular.Module { return module.NewWorkflowRegistry(name, storageBackend) }, - newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, + newQueryHandler: func(name string) modular.Module { return module.NewQueryHandler(name) }, + newCommandHandler: func(name string) modular.Module { return module.NewCommandHandler(name) }, + newRESTAPIHandler: func(name, resourceName string) modular.Module { return module.NewRESTAPIHandler(name, resourceName) }, + newAPIGateway: func(name string) modular.Module { return module.NewAPIGateway(name) }, + newWorkflowRegistry: func(name, storageBackend string) modular.Module { + return module.NewWorkflowRegistry(name, storageBackend) + }, + newDataTransformer: func(name string) modular.Module { return module.NewDataTransformer(name) }, newProcessingStep: func(name string, cfg module.ProcessingStepConfig) modular.Module { return module.NewProcessingStep(name, cfg) }, From baa0bb8798fe3bb34eb225227eddf992fb138691 Mon Sep 17 00:00:00 2001 From: Copilot <198982749+Copilot@users.noreply.github.com> Date: Mon, 23 Feb 2026 09:31:57 -0500 Subject: [PATCH 2/2] fix: check all errors in Phase C store tests (#140) * Initial plan * fix: check all errors in Phase C store tests per review feedback Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --------- Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: intel352 <77607+intel352@users.noreply.github.com> --- module/api_v1_test.go | 108 ++++++++++++++++++++++++++++++++---------- 1 file changed, 84 insertions(+), 24 deletions(-) diff --git a/module/api_v1_test.go b/module/api_v1_test.go index 1e401aec..7bf27f8e 100644 --- a/module/api_v1_test.go +++ b/module/api_v1_test.go @@ -724,16 +724,28 @@ func TestV1Handler_Projects(t *testing.T) { func TestV1Store_ExecutionLifecycle(t *testing.T) { store := setupTestStore(t) - company, _ := store.CreateCompany("Co", "", "u1") - org, _ := store.CreateOrganization(company.ID, "Org", "", "u1") - proj, _ := store.CreateProject(org.ID, "Proj", "", "") - wf, _ := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + company, err := store.CreateCompany("Co", "", "u1") + if err != nil { + t.Fatalf("CreateCompany: %v", err) + } + org, err := store.CreateOrganization(company.ID, "Org", "", "u1") + if err != nil { + t.Fatalf("CreateOrganization: %v", err) + } + proj, err := store.CreateProject(org.ID, "Proj", "", "") + if err != nil { + t.Fatalf("CreateProject: %v", err) + } + wf, err := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + if err != nil { + t.Fatalf("CreateWorkflow: %v", err) + } now := time.Now().UTC() execID := "exec-001" // Insert execution - err := store.InsertExecution(execID, wf.ID, "manual", "pending", "u1", now) + err = store.InsertExecution(execID, wf.ID, "manual", "pending", "u1", now) if err != nil { t.Fatalf("InsertExecution: %v", err) } @@ -776,17 +788,31 @@ func TestV1Store_ExecutionLifecycle(t *testing.T) { func TestV1Store_ExecutionStepLifecycle(t *testing.T) { store := setupTestStore(t) - company, _ := store.CreateCompany("Co", "", "u1") - org, _ := store.CreateOrganization(company.ID, "Org", "", "u1") - proj, _ := store.CreateProject(org.ID, "Proj", "", "") - wf, _ := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + company, err := store.CreateCompany("Co", "", "u1") + if err != nil { + t.Fatalf("CreateCompany: %v", err) + } + org, err := store.CreateOrganization(company.ID, "Org", "", "u1") + if err != nil { + t.Fatalf("CreateOrganization: %v", err) + } + proj, err := store.CreateProject(org.ID, "Proj", "", "") + if err != nil { + t.Fatalf("CreateProject: %v", err) + } + wf, err := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + if err != nil { + t.Fatalf("CreateWorkflow: %v", err) + } now := time.Now().UTC() execID := "exec-step-001" - _ = store.InsertExecution(execID, wf.ID, "manual", "running", "u1", now) + if err = store.InsertExecution(execID, wf.ID, "manual", "running", "u1", now); err != nil { + t.Fatalf("InsertExecution: %v", err) + } stepID := "step-001" - err := store.InsertExecutionStep(stepID, execID, "parse-request", "step.request_parse", "running", 0, now) + err = store.InsertExecutionStep(stepID, execID, "parse-request", "step.request_parse", "running", 0, now) if err != nil { t.Fatalf("InsertExecutionStep: %v", err) } @@ -832,9 +858,14 @@ func TestV1Store_ExecutionStepLifecycle(t *testing.T) { var ids []string for rows.Next() { var id string - rows.Scan(&id) + if err := rows.Scan(&id); err != nil { + t.Fatalf("scan step id: %v", err) + } ids = append(ids, id) } + if err := rows.Err(); err != nil { + t.Fatalf("iterate steps: %v", err) + } if len(ids) != 2 || ids[0] != stepID || ids[1] != step2ID { t.Errorf("got step order %v, want [%s, %s]", ids, stepID, step2ID) } @@ -843,15 +874,27 @@ func TestV1Store_ExecutionStepLifecycle(t *testing.T) { func TestV1Store_ExecutionLogs(t *testing.T) { store := setupTestStore(t) - company, _ := store.CreateCompany("Co", "", "u1") - org, _ := store.CreateOrganization(company.ID, "Org", "", "u1") - proj, _ := store.CreateProject(org.ID, "Proj", "", "") - wf, _ := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + company, err := store.CreateCompany("Co", "", "u1") + if err != nil { + t.Fatalf("CreateCompany: %v", err) + } + org, err := store.CreateOrganization(company.ID, "Org", "", "u1") + if err != nil { + t.Fatalf("CreateOrganization: %v", err) + } + proj, err := store.CreateProject(org.ID, "Proj", "", "") + if err != nil { + t.Fatalf("CreateProject: %v", err) + } + wf, err := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + if err != nil { + t.Fatalf("CreateWorkflow: %v", err) + } now := time.Now().UTC() // Insert logs at various levels - err := store.InsertLog(wf.ID, "exec-1", "info", "Workflow started", "engine", "{}", now) + err = store.InsertLog(wf.ID, "exec-1", "info", "Workflow started", "engine", "{}", now) if err != nil { t.Fatalf("InsertLog(info): %v", err) } @@ -958,17 +1001,31 @@ func TestV1Store_PhaseCTablesExist(t *testing.T) { func TestV1Store_ExecutionFailure(t *testing.T) { store := setupTestStore(t) - company, _ := store.CreateCompany("Co", "", "u1") - org, _ := store.CreateOrganization(company.ID, "Org", "", "u1") - proj, _ := store.CreateProject(org.ID, "Proj", "", "") - wf, _ := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + company, err := store.CreateCompany("Co", "", "u1") + if err != nil { + t.Fatalf("CreateCompany: %v", err) + } + org, err := store.CreateOrganization(company.ID, "Org", "", "u1") + if err != nil { + t.Fatalf("CreateOrganization: %v", err) + } + proj, err := store.CreateProject(org.ID, "Proj", "", "") + if err != nil { + t.Fatalf("CreateProject: %v", err) + } + wf, err := store.CreateWorkflow(proj.ID, "WF", "", "", "modules: []", "u1") + if err != nil { + t.Fatalf("CreateWorkflow: %v", err) + } now := time.Now().UTC() execID := "exec-fail-001" - _ = store.InsertExecution(execID, wf.ID, "manual", "running", "u1", now) + if err = store.InsertExecution(execID, wf.ID, "manual", "running", "u1", now); err != nil { + t.Fatalf("InsertExecution: %v", err) + } // Fail the execution with an error message - err := store.CompleteExecution(execID, "failed", now.Add(3*time.Second), 3000, "timeout exceeded") + err = store.CompleteExecution(execID, "failed", now.Add(3*time.Second), 3000, "timeout exceeded") if err != nil { t.Fatalf("CompleteExecution(failed): %v", err) } @@ -985,7 +1042,10 @@ func TestV1Store_ExecutionFailure(t *testing.T) { t.Errorf("got error_message %q, want %q", errMsg, "timeout exceeded") } - counts, _ := store.CountExecutionsByWorkflow(wf.ID) + counts, err := store.CountExecutionsByWorkflow(wf.ID) + if err != nil { + t.Fatalf("CountExecutionsByWorkflow: %v", err) + } if counts["failed"] != 1 { t.Errorf("got failed count %d, want 1", counts["failed"]) }