Skip to content

chore: migrate workflows to db#818

Merged
adityachoudhari26 merged 5 commits intomainfrom
migrate-workflows-db
Feb 26, 2026
Merged

chore: migrate workflows to db#818
adityachoudhari26 merged 5 commits intomainfrom
migrate-workflows-db

Conversation

@adityachoudhari26
Copy link
Member

@adityachoudhari26 adityachoudhari26 commented Feb 26, 2026

Summary by CodeRabbit

  • New Features
    • Added a comprehensive workflow system: create/read/update/delete workflows, job templates, runs, and jobs.
    • Workflow job templates now include a workflowId field (required) to link templates to their parent workflow.
    • Workflows, templates, runs, and jobs are persisted with DB-backed storage, migration, and workspace-scoped access.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 26, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 3cf05de and 93149a0.

📒 Files selected for processing (3)
  • packages/db/drizzle/meta/0153_snapshot.json
  • packages/db/drizzle/meta/_journal.json
  • packages/db/src/schema/workflow.ts

📝 Walkthrough

Walkthrough

Adds a workflow subsystem: OpenAPI/schema change (WorkflowJobTemplate.workflowId), DB schema and migrations, sqlc-generated queries and Go DAL, DB and in-memory repository implementations, store wiring and migration, repo-backed store usage, and small manager/test adjustments.

Changes

Cohort / File(s) Summary
API Schema & Generated Models
apps/workspace-engine/oapi/openapi.json, apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet, apps/workspace-engine/pkg/oapi/oapi.gen.go
Added workflowId to WorkflowJobTemplate properties and moved it into the required set (removed ref from required array).
DB Schema & Migrations
apps/workspace-engine/pkg/db/queries/schema.sql, packages/db/drizzle/0153_neat_quasimodo.sql, packages/db/drizzle/meta/_journal.json, packages/db/src/schema/workflow.ts, packages/db/src/schema/index.ts
Added four tables: workflow, workflow_job_template, workflow_run, workflow_job with FKs and cascade deletes; added drizzle/schema exports and migration journal entry.
DB Models & sqlc Mappings
apps/workspace-engine/pkg/db/models.go, apps/workspace-engine/pkg/db/sqlc.yaml
Added Go DB model types for workflows and sqlc type mappings for JSON/pg types.
SQL Queries & Generated DAL
apps/workspace-engine/pkg/db/queries/workflows.sql, apps/workspace-engine/pkg/db/workflows.sql.go
Added CRUD/List/Upsert/Delete SQL queries for workflow entities and generated Go query methods and Upsert param structs.
Repository Interfaces & In-Memory Adapters
apps/workspace-engine/pkg/workspace/store/repository/interfaces.go, .../memory/repo.go
Introduced Workflow* repository interfaces; made in-memory stores private and added public accessor methods that return repo adapters.
DB Repository Implementation & Mappers
apps/workspace-engine/pkg/workspace/store/repository/db/repo.go, .../db/workflows/mapper.go, .../db/workflows/repo.go
Wired DB repos into DBRepo; implemented mappers between db models and oapi types; added DB-backed repository implementations with UUID parsing, JSON (un)marshalling, and error wrapping.
Store Layer Wiring & Migration
apps/workspace-engine/pkg/workspace/store/store.go, .../workflows.go, .../workflow_job_templates.go, .../workflow_runs.go, .../workflow_job.go, .../repo.go
Refactored store components to use repository interfaces, added SetRepo setters, DB-backed store options (WithDBWorkflows/...Jobs), and migration code to persist legacy in-memory workflow entities.
Workflow Manager & Tests
apps/workspace-engine/pkg/workspace/workflowmanager/manager.go, manager_test.go, apps/workspace-engine/test/e2e/engine_workflow_test.go, .../creators/workflow.go, .../dbtest.go
Inserted an early Upsert of workflowRun (duplicate later), updated tests to use UUIDs and set workflowId on created templates, commented out one per-job removal assertion, and included DB workflow options in test workspace setup.
Store Repo Wiring
apps/workspace-engine/pkg/workspace/store/repository/db/repo.go, apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go
DBRepo now initializes and exposes workflow repositories (Workflows, WorkflowJobTemplates, WorkflowRuns, WorkflowJobs).

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant API as Workspace API
    participant Store
    participant Repo as WorkflowRepo
    participant DB

    Client->>API: Create WorkflowRun
    API->>Store: Upsert WorkflowRun
    Store->>Repo: Set(WorkflowRun)
    Repo->>DB: Upsert workflow_run row
    DB-->>Repo: Persisted row
    Repo-->>Store: Persisted WorkflowRun
    Store-->>API: Created WorkflowRun response
    API-->>Client: 201 Created
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • jsbroks

Poem

🐇
I hopped through rows and JSON streams,
Tied jobs to workflows in my dreams,
I mapped and marshaled, upserted bright,
Repos and stores now hum at night,
A tiny rabbit, schema beams!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title 'chore: migrate workflows to db' accurately and concisely summarizes the main objective of the changeset, which is moving workflow-related functionality from in-memory storage to database-backed persistence.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch migrate-workflows-db

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@adityachoudhari26 adityachoudhari26 marked this pull request as ready for review February 26, 2026 22:40
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
apps/workspace-engine/pkg/workspace/store/workflow_job.go (1)

50-58: ⚠️ Potential issue | 🟠 Major

Delete dependent jobs before deleting the workflow job.

repo.Remove(id) runs before removing w.store.Jobs children. This can fail under FK constraints and can lose expected child-delete propagation.

Suggested fix
 func (w *WorkflowJobs) Remove(ctx context.Context, id string) {
 	workflowJob, ok := w.repo.Get(id)
 	if !ok || workflowJob == nil {
 		return
 	}
 
+	jobs := w.store.Jobs.GetByWorkflowJobId(id)
+	for _, job := range jobs {
+		w.store.Jobs.Remove(ctx, job.Id)
+	}
+
 	if err := w.repo.Remove(id); err != nil {
 		log.Error("Failed to remove workflow job", "error", err)
 		return
 	}
-
-	jobs := w.store.Jobs.GetByWorkflowJobId(id)
-	for _, job := range jobs {
-		w.store.Jobs.Remove(ctx, job.Id)
-	}
 
 	w.store.changeset.RecordDelete(workflowJob)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/workflow_job.go` around lines 50 -
58, The repository call w.repo.Remove(id) is happening before deleting dependent
child jobs (w.store.Jobs.GetByWorkflowJobId / w.store.Jobs.Remove), which can
violate FK constraints; change the order to first fetch and remove all child
jobs via w.store.Jobs.GetByWorkflowJobId(id) and loop calling
w.store.Jobs.Remove(ctx, job.Id), checking and returning errors as they occur,
and only after all children are deleted call w.repo.Remove(id); if possible
perform both steps inside the same transaction or ensure error handling/rollback
so partial deletes do not leave inconsistent state.
apps/workspace-engine/pkg/workspace/store/workflows.go (1)

49-57: ⚠️ Potential issue | 🔴 Critical

Delete child workflow runs before deleting the workflow.

Current order removes the parent first, then children. This can fail with FK constraints and can skip downstream run/job cleanup behavior.

Suggested fix
 func (w *Workflows) Remove(ctx context.Context, id string) {
 	workflow, ok := w.repo.Get(id)
 	if !ok || workflow == nil {
 		return
 	}
 
+	workflowRuns := w.store.WorkflowRuns.GetByWorkflowId(id)
+	for _, workflowRun := range workflowRuns {
+		w.store.WorkflowRuns.Remove(ctx, workflowRun.Id)
+	}
+
 	if err := w.repo.Remove(id); err != nil {
 		log.Error("Failed to remove workflow", "error", err)
 		return
 	}
-
-	workflowRuns := w.store.WorkflowRuns.GetByWorkflowId(id)
-	for _, workflowRun := range workflowRuns {
-		w.store.WorkflowRuns.Remove(ctx, workflowRun.Id)
-	}
 
 	w.store.changeset.RecordDelete(workflow)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/workflows.go` around lines 49 - 57,
The delete order is reversed: currently w.repo.Remove(id) runs before deleting
child workflow runs, which can trigger FK constraint errors and skip downstream
cleanup; change the logic in the function that calls w.repo.Remove to first
fetch child runs via w.store.WorkflowRuns.GetByWorkflowId(id) and loop calling
w.store.WorkflowRuns.Remove(ctx, workflowRun.Id) for each child, ensure removal
errors are handled/logged, and only after all children are removed call
w.repo.Remove(id) to delete the parent workflow.
apps/workspace-engine/pkg/workspace/store/workflow_runs.go (1)

61-69: ⚠️ Potential issue | 🔴 Critical

Remove workflow jobs before removing the workflow run.

The run is deleted first and child workflow jobs are deleted after. This is fragile with FK constraints and can bypass intended child cleanup semantics.

Suggested fix
 func (w *WorkflowRuns) Remove(ctx context.Context, id string) {
 	workflowRun, ok := w.repo.Get(id)
 	if !ok || workflowRun == nil {
 		return
 	}
 
+	workflowJobs := w.store.WorkflowJobs.GetByWorkflowRunId(id)
+	for _, workflowJob := range workflowJobs {
+		w.store.WorkflowJobs.Remove(ctx, workflowJob.Id)
+	}
+
 	if err := w.repo.Remove(id); err != nil {
 		log.Error("Failed to remove workflow run", "error", err)
 		return
 	}
-
-	workflowJobs := w.store.WorkflowJobs.GetByWorkflowRunId(id)
-	for _, workflowJob := range workflowJobs {
-		w.store.WorkflowJobs.Remove(ctx, workflowJob.Id)
-	}
 
 	w.store.changeset.RecordDelete(workflowRun)
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/workflow_runs.go` around lines 61 -
69, The workflow run is being removed before its child workflow jobs, which can
break FK constraints; change the deletion order so you first fetch and delete
children using w.store.WorkflowJobs.GetByWorkflowRunId(id) and call
w.store.WorkflowJobs.Remove(ctx, workflowJob.Id) for each job, handling and
returning on any Remove error, and only after all child removals succeed call
w.repo.Remove(id); ensure you use the existing variables and methods (id, ctx,
w.store.WorkflowJobs.GetByWorkflowRunId, w.store.WorkflowJobs.Remove,
w.repo.Remove) and preserve logging for failures.
🧹 Nitpick comments (4)
apps/workspace-engine/pkg/workspace/workflowmanager/manager.go (1)

99-125: Remove duplicate workflow run upsert.

Line 124 repeats the same workflowRun upsert already done at Line 99, with no intermediate mutation.

♻️ Suggested cleanup
 	m.store.WorkflowRuns.Upsert(ctx, workflowRun)
@@
-	m.store.WorkflowRuns.Upsert(ctx, workflowRun)
-
 	for _, wfJob := range workflowJobs {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go` around lines
99 - 125, There is a duplicate call to m.store.WorkflowRuns.Upsert for the same
workflowRun with no intervening changes; remove the redundant Upsert invocation
(the second one after building workflowJobs) so only the initial
m.store.WorkflowRuns.Upsert(ctx, workflowRun) remains; locate the duplicate by
searching for m.store.WorkflowRuns.Upsert and the workflowRun variable in
manager.go and delete the extra call.
apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go (1)

47-55: Populate WorkflowId in the WorkflowJobTemplate fixture.

This fixture leaves WorkflowId unset (""), so the test doesn’t exercise the new workflow-template linkage path.

Proposed test fixture update
 	Jobs: []oapi.WorkflowJobTemplate{
 		{
-			Id:   workflowJobTemplateID,
-			Name: "test-job",
-			Ref:  jobAgent1ID,
+			Id:         workflowJobTemplateID,
+			Name:       "test-job",
+			Ref:        jobAgent1ID,
+			WorkflowId: workflowID,
 			Config: map[string]any{
 				"delaySeconds": 10,
 			},
 		},
 	},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go` around
lines 47 - 55, The test fixture for Jobs uses an oapi.WorkflowJobTemplate with
Id workflowJobTemplateID but leaves WorkflowId empty; update the Jobs entry in
manager_test.go to set WorkflowId to the parent workflow's ID (the same ID used
for the workflow fixture) so the test exercises the workflow-template linkage
path—locate the Jobs slice where WorkflowJobTemplate is constructed (the element
with Id = workflowJobTemplateID and Ref = jobAgent1ID) and assign its WorkflowId
field to the appropriate workflow ID constant used in the test.
apps/workspace-engine/pkg/workspace/store/workflow_runs.go (1)

36-39: Avoid silent failure on GetByWorkflowId repository errors.

Returning an empty map without logging makes DB/read failures look like “no runs found,” which hides operational issues.

Suggested fix
 	runs, err := w.repo.GetByWorkflowID(workflowId)
 	if err != nil {
-		return make(map[string]*oapi.WorkflowRun)
+		log.Error("Failed to get workflow runs by workflow ID", "workflow_id", workflowId, "error", err)
+		return make(map[string]*oapi.WorkflowRun)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/workflow_runs.go` around lines 36 -
39, The current handling of w.repo.GetByWorkflowID(workflowId) silently returns
an empty map on error; change the code in the method that calls
w.repo.GetByWorkflowID (in workflow_runs.go) to surface or log the repository
error instead of swallowing it: either return the error up the call stack
(propagate the error from the surrounding function) or call the appropriate
logger (e.g., processLogger or w.logger) to record the error before returning a
nil/empty result; locate the call to w.repo.GetByWorkflowID(workflowId) and
replace the current silent-return branch so it includes error propagation or a
clear error log message that contains err and workflowId for debugging.
apps/workspace-engine/pkg/workspace/store/workflow_job.go (1)

64-67: Log repository errors in GetByWorkflowRunId.

Returning nil on error without logging masks data-access failures and complicates debugging.

Suggested fix
 	wfJobs, err := w.repo.GetByWorkflowRunID(workflowRunId)
 	if err != nil {
+		log.Error("Failed to get workflow jobs by workflow run ID", "workflow_run_id", workflowRunId, "error", err)
 		return nil
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/workflow_job.go` around lines 64 -
67, When w.repo.GetByWorkflowRunID(workflowRunId) returns an error, log that
error before returning to avoid swallowing data-access failures; modify the err
!= nil branch in the function that calls w.repo.GetByWorkflowRunID to call the
component logger (e.g., w.logger.Errorf("GetByWorkflowRunID(%s) error: %v",
workflowRunId, err) or w.log.Error/Infof depending on the logger in this type)
and then return the existing nil result (or wrap/return the error if the
function signature allows) so repository errors are visible in logs; keep
references to w.repo and GetByWorkflowRunID and the workflowRunId parameter when
forming the log message.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/workspace-engine/pkg/db/models.go`:
- Around line 318-326: The generated DB model WorkflowJobTemplate is out of sync
with the SQL (it defines WorkspaceID but the SQL and table use workflow_id),
which breaks UpsertWorkflowJobTemplate and GetWorkflowJobTemplateByID scanning
and the mapper that needs workflowId; regenerate sqlc outputs so the struct and
generated query code match the schema (workflow_job_template) and queries:
re-run sqlc code generation to update WorkflowJobTemplate to include WorkflowID
uuid.UUID (and remove/rename WorkspaceID if present), then rebuild and verify
UpsertWorkflowJobTemplate and GetWorkflowJobTemplateByID now reference
workflow_id and that the mapper can populate workflowId in API responses.

In `@apps/workspace-engine/pkg/db/queries/schema.sql`:
- Around line 252-282: The foreign key columns lack indexes which will hurt
list/query and cascade performance; add explicit B-tree indexes on
workflow.workspace_id, workflow_job_template.workflow_id,
workflow_run.workflow_id, and workflow_job.workflow_run_id (i.e., create indexes
for each FK column) so lookups and cascading deletes are efficient and the SQLC
schema reflects those indexes.

In `@apps/workspace-engine/pkg/db/queries/workflows.sql`:
- Around line 8-10: The upsert currently allows changing workspace ownership by
including workspace_id in the DO UPDATE SET list; update the UpsertWorkflow SQL
(the INSERT INTO workflow ... ON CONFLICT (id) DO UPDATE ...) to stop updating
workspace_id on conflict so existing rows keep their original workspace. Edit
the query used by UpsertWorkflow to remove workspace_id from the SET clause (or
otherwise ensure the DO UPDATE does not assign EXCLUDED.workspace_id), leaving
only name, inputs, jobs (and optionally add a WHERE clause to guard workspace_id
equality if you want extra safety).

In `@apps/workspace-engine/pkg/db/workflows.sql.go`:
- Around line 85-87: The query for GetWorkflowJobTemplateByID and related
read/upsert SQL (e.g., the getWorkflowJobTemplateByID constant and the other
statements around lines 359-373) omits the workflow_id column so the
WorkflowJobTemplate.WorkflowId cannot be persisted/hydrated; update the SELECT,
INSERT/UPSERT and any param lists that touch the workflow_job_template table to
include workflow_id (and the corresponding placeholder $N) so the DB round-trip
preserves the WorkflowId field, or alternatively remove WorkflowId from the API
model if the relation is intentionally unsupported. Ensure the unique
identifiers referenced are workflow_job_template table,
getWorkflowJobTemplateByID constant, and the WorkflowJobTemplate/WorkflowId
fields when making the change.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`:
- Around line 13-180: Several exported mapper functions lack package-level
documentation; add clear doc comments for each exported function
(WorkflowToOapi, ToWorkflowUpsertParams, WorkflowJobTemplateToOapi,
ToWorkflowJobTemplateUpsertParams, WorkflowRunToOapi, ToWorkflowRunUpsertParams,
WorkflowJobToOapi, ToWorkflowJobUpsertParams) describing what they convert, the
expected input types and any important behavior (e.g., JSON
unmarshalling/marshalling, handling of nil/valid fields, error returns). Place a
brief single-sentence comment immediately above each function declaration
following Go doc conventions (starting with the function name) and note special
cases like when Inputs/Jobs are nil, IfCondition/Matrix handling, or when errors
are returned from uuid.Parse/json.Marshal.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go`:
- Around line 13-315: Add Go doc comments for each exported repository type and
their constructors: document WorkflowRepo and NewWorkflowRepo,
WorkflowJobTemplateRepo and NewWorkflowJobTemplateRepo, WorkflowRunRepo and
NewWorkflowRunRepo, and WorkflowJobRepo and NewWorkflowJobRepo. For each comment
place a short sentence above the type/constructor explaining its purpose and
usage in the package API (e.g., "WorkflowRepo provides workspace-scoped
persistence for workflows." and "NewWorkflowRepo creates a new WorkflowRepo for
the given workspace."). Ensure comments follow Go convention starting with the
symbol name and give any non-obvious context about workspaceID or ctx where
relevant.

In `@apps/workspace-engine/pkg/workspace/store/repository/memory/repo.go`:
- Around line 595-673: Add package-level doc comments for the exported accessor
methods Workflows, WorkflowJobTemplates, WorkflowRuns, and WorkflowJobs: for
each of the methods (InMemory.Workflows, InMemory.WorkflowJobTemplates,
InMemory.WorkflowRuns, InMemory.WorkflowJobs) add a brief comment above the
method that documents what repository is returned, the intended usage/context
and any non-obvious behavior (e.g., that these return in-memory adapters backed
by cmap.ConcurrentMap and are safe for concurrent use), following the project's
comment style for exported symbols.

In `@apps/workspace-engine/test/e2e/engine_workflow_test.go`:
- Around line 522-525: Restore the deleted verification loop so the cascade test
asserts job rows are removed: re-enable the commented loop that iterates over
allJobIDs and for each jobID calls engine.Workspace().Jobs().Get(jobID) and uses
assert.False(t, ok, "job %s should be removed", jobID) (or equivalent assertion)
to fail the test if any job still exists; ensure the loop references the
existing allJobIDs variable and the engine.Workspace().Jobs().Get method exactly
as in the diff.

In `@packages/db/drizzle/0153_neat_quasimodo.sql`:
- Around line 33-36: The new foreign-key columns need indexes to avoid
performance regressions; add CREATE INDEX statements for workflow.workspace_id,
workflow_run.workflow_id, workflow_job.workflow_run_id, and
workflow_job_template.workflow_id (e.g., CREATE INDEX CONCURRENTLY IF NOT EXISTS
idx_workflow_workspace_id ON public.workflow(workspace_id),
idx_workflow_run_workflow_id ON public.workflow_run(workflow_id),
idx_workflow_job_workflow_run_id ON public.workflow_job(workflow_run_id),
idx_workflow_job_template_workflow_id ON
public.workflow_job_template(workflow_id)) in the same migration so each FK
column is indexed and names are unique and descriptive.

In `@packages/db/src/schema/workflow.ts`:
- Around line 1-49: Add explicit indexes for the foreign key columns so Drizzle
generates migrations that preserve relational access patterns: add indexes for
workflow.workspaceId, workflowJobTemplate.workflowId, workflowRun.workflowId,
and workflowJob.workflowRunId by declaring appropriate index definitions for
those columns in the respective pgTable definitions (workflow,
workflow_job_template, workflow_run, workflow_job); ensure the index names are
unique and descriptive and follow existing naming conventions so migrations
include CREATE INDEX for these FK columns.

---

Outside diff comments:
In `@apps/workspace-engine/pkg/workspace/store/workflow_job.go`:
- Around line 50-58: The repository call w.repo.Remove(id) is happening before
deleting dependent child jobs (w.store.Jobs.GetByWorkflowJobId /
w.store.Jobs.Remove), which can violate FK constraints; change the order to
first fetch and remove all child jobs via w.store.Jobs.GetByWorkflowJobId(id)
and loop calling w.store.Jobs.Remove(ctx, job.Id), checking and returning errors
as they occur, and only after all children are deleted call w.repo.Remove(id);
if possible perform both steps inside the same transaction or ensure error
handling/rollback so partial deletes do not leave inconsistent state.

In `@apps/workspace-engine/pkg/workspace/store/workflow_runs.go`:
- Around line 61-69: The workflow run is being removed before its child workflow
jobs, which can break FK constraints; change the deletion order so you first
fetch and delete children using w.store.WorkflowJobs.GetByWorkflowRunId(id) and
call w.store.WorkflowJobs.Remove(ctx, workflowJob.Id) for each job, handling and
returning on any Remove error, and only after all child removals succeed call
w.repo.Remove(id); ensure you use the existing variables and methods (id, ctx,
w.store.WorkflowJobs.GetByWorkflowRunId, w.store.WorkflowJobs.Remove,
w.repo.Remove) and preserve logging for failures.

In `@apps/workspace-engine/pkg/workspace/store/workflows.go`:
- Around line 49-57: The delete order is reversed: currently w.repo.Remove(id)
runs before deleting child workflow runs, which can trigger FK constraint errors
and skip downstream cleanup; change the logic in the function that calls
w.repo.Remove to first fetch child runs via
w.store.WorkflowRuns.GetByWorkflowId(id) and loop calling
w.store.WorkflowRuns.Remove(ctx, workflowRun.Id) for each child, ensure removal
errors are handled/logged, and only after all children are removed call
w.repo.Remove(id) to delete the parent workflow.

---

Nitpick comments:
In `@apps/workspace-engine/pkg/workspace/store/workflow_job.go`:
- Around line 64-67: When w.repo.GetByWorkflowRunID(workflowRunId) returns an
error, log that error before returning to avoid swallowing data-access failures;
modify the err != nil branch in the function that calls
w.repo.GetByWorkflowRunID to call the component logger (e.g.,
w.logger.Errorf("GetByWorkflowRunID(%s) error: %v", workflowRunId, err) or
w.log.Error/Infof depending on the logger in this type) and then return the
existing nil result (or wrap/return the error if the function signature allows)
so repository errors are visible in logs; keep references to w.repo and
GetByWorkflowRunID and the workflowRunId parameter when forming the log message.

In `@apps/workspace-engine/pkg/workspace/store/workflow_runs.go`:
- Around line 36-39: The current handling of w.repo.GetByWorkflowID(workflowId)
silently returns an empty map on error; change the code in the method that calls
w.repo.GetByWorkflowID (in workflow_runs.go) to surface or log the repository
error instead of swallowing it: either return the error up the call stack
(propagate the error from the surrounding function) or call the appropriate
logger (e.g., processLogger or w.logger) to record the error before returning a
nil/empty result; locate the call to w.repo.GetByWorkflowID(workflowId) and
replace the current silent-return branch so it includes error propagation or a
clear error log message that contains err and workflowId for debugging.

In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go`:
- Around line 47-55: The test fixture for Jobs uses an oapi.WorkflowJobTemplate
with Id workflowJobTemplateID but leaves WorkflowId empty; update the Jobs entry
in manager_test.go to set WorkflowId to the parent workflow's ID (the same ID
used for the workflow fixture) so the test exercises the workflow-template
linkage path—locate the Jobs slice where WorkflowJobTemplate is constructed (the
element with Id = workflowJobTemplateID and Ref = jobAgent1ID) and assign its
WorkflowId field to the appropriate workflow ID constant used in the test.

In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go`:
- Around line 99-125: There is a duplicate call to m.store.WorkflowRuns.Upsert
for the same workflowRun with no intervening changes; remove the redundant
Upsert invocation (the second one after building workflowJobs) so only the
initial m.store.WorkflowRuns.Upsert(ctx, workflowRun) remains; locate the
duplicate by searching for m.store.WorkflowRuns.Upsert and the workflowRun
variable in manager.go and delete the extra call.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a3d4163 and 474c596.

📒 Files selected for processing (30)
  • apps/workspace-engine/oapi/openapi.json
  • apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet
  • apps/workspace-engine/pkg/db/models.go
  • apps/workspace-engine/pkg/db/queries/schema.sql
  • apps/workspace-engine/pkg/db/queries/workflows.sql
  • apps/workspace-engine/pkg/db/sqlc.yaml
  • apps/workspace-engine/pkg/db/workflows.sql.go
  • apps/workspace-engine/pkg/oapi/oapi.gen.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/repo.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go
  • apps/workspace-engine/pkg/workspace/store/repository/interfaces.go
  • apps/workspace-engine/pkg/workspace/store/repository/memory/repo.go
  • apps/workspace-engine/pkg/workspace/store/repository/repo.go
  • apps/workspace-engine/pkg/workspace/store/store.go
  • apps/workspace-engine/pkg/workspace/store/workflow_job.go
  • apps/workspace-engine/pkg/workspace/store/workflow_job_templates.go
  • apps/workspace-engine/pkg/workspace/store/workflow_runs.go
  • apps/workspace-engine/pkg/workspace/store/workflows.go
  • apps/workspace-engine/pkg/workspace/workflowmanager/manager.go
  • apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go
  • apps/workspace-engine/svc/workspaceconsumer/consumer.go
  • apps/workspace-engine/test/e2e/engine_workflow_test.go
  • apps/workspace-engine/test/integration/creators/workflow.go
  • apps/workspace-engine/test/integration/dbtest.go
  • packages/db/drizzle/0153_neat_quasimodo.sql
  • packages/db/drizzle/meta/0153_snapshot.json
  • packages/db/drizzle/meta/_journal.json
  • packages/db/src/schema/index.ts
  • packages/db/src/schema/workflow.ts

Comment on lines +252 to +282
CREATE TABLE workflow (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
inputs JSONB NOT NULL DEFAULT '[]',
jobs JSONB NOT NULL DEFAULT '[]',
workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE
);

CREATE TABLE workflow_job_template (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflow(id) ON DELETE CASCADE,
name TEXT NOT NULL,
ref TEXT NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
if_condition TEXT,
matrix JSONB
);

CREATE TABLE workflow_run (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflow(id) ON DELETE CASCADE,
inputs JSONB NOT NULL DEFAULT '{}'
);

CREATE TABLE workflow_job (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_run_id UUID NOT NULL REFERENCES workflow_run(id) ON DELETE CASCADE,
ref TEXT NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
index INTEGER NOT NULL DEFAULT 0
);
Copy link
Contributor

@coderabbitai coderabbitai bot Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add indexes on workflow foreign keys in SQLC schema.

These FK columns should be indexed to keep list/query paths and cascading cleanup performant.

📌 Suggested SQL additions
 CREATE TABLE workflow_job (
     id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
     workflow_run_id UUID NOT NULL REFERENCES workflow_run(id) ON DELETE CASCADE,
     ref TEXT NOT NULL,
     config JSONB NOT NULL DEFAULT '{}',
     index INTEGER NOT NULL DEFAULT 0
 );
+
+CREATE INDEX workflow_workspace_id_idx ON workflow(workspace_id);
+CREATE INDEX workflow_job_template_workflow_id_idx ON workflow_job_template(workflow_id);
+CREATE INDEX workflow_run_workflow_id_idx ON workflow_run(workflow_id);
+CREATE INDEX workflow_job_workflow_run_id_idx ON workflow_job(workflow_run_id);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
CREATE TABLE workflow (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
inputs JSONB NOT NULL DEFAULT '[]',
jobs JSONB NOT NULL DEFAULT '[]',
workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE
);
CREATE TABLE workflow_job_template (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflow(id) ON DELETE CASCADE,
name TEXT NOT NULL,
ref TEXT NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
if_condition TEXT,
matrix JSONB
);
CREATE TABLE workflow_run (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflow(id) ON DELETE CASCADE,
inputs JSONB NOT NULL DEFAULT '{}'
);
CREATE TABLE workflow_job (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_run_id UUID NOT NULL REFERENCES workflow_run(id) ON DELETE CASCADE,
ref TEXT NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
index INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE workflow (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
inputs JSONB NOT NULL DEFAULT '[]',
jobs JSONB NOT NULL DEFAULT '[]',
workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE
);
CREATE TABLE workflow_job_template (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflow(id) ON DELETE CASCADE,
name TEXT NOT NULL,
ref TEXT NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
if_condition TEXT,
matrix JSONB
);
CREATE TABLE workflow_run (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_id UUID NOT NULL REFERENCES workflow(id) ON DELETE CASCADE,
inputs JSONB NOT NULL DEFAULT '{}'
);
CREATE TABLE workflow_job (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workflow_run_id UUID NOT NULL REFERENCES workflow_run(id) ON DELETE CASCADE,
ref TEXT NOT NULL,
config JSONB NOT NULL DEFAULT '{}',
index INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX workflow_workspace_id_idx ON workflow(workspace_id);
CREATE INDEX workflow_job_template_workflow_id_idx ON workflow_job_template(workflow_id);
CREATE INDEX workflow_run_workflow_id_idx ON workflow_run(workflow_id);
CREATE INDEX workflow_job_workflow_run_id_idx ON workflow_job(workflow_run_id);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/db/queries/schema.sql` around lines 252 - 282, The
foreign key columns lack indexes which will hurt list/query and cascade
performance; add explicit B-tree indexes on workflow.workspace_id,
workflow_job_template.workflow_id, workflow_run.workflow_id, and
workflow_job.workflow_run_id (i.e., create indexes for each FK column) so
lookups and cascading deletes are efficient and the SQLC schema reflects those
indexes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

premature optimiziation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adityachoudhari26, understood! You can always add these indexes later if query performance becomes a concern.


✏️ Learnings added
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 818
File: apps/workspace-engine/pkg/db/queries/schema.sql:252-282
Timestamp: 2026-02-26T23:01:19.319Z
Learning: User adityachoudhari26 prefers to defer adding database indexes on foreign keys until there's evidence they're needed, considering such additions premature optimization at the initial schema design stage.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


🧠 Learnings used
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 668
File: apps/webservice/src/app/api/v1/deployments/[deploymentId]/variables/variable-diff-check.ts:104-134
Timestamp: 2025-09-26T01:53:05.472Z
Learning: The deployment variable value tables in the ctrlplane/db schema do not have timestamp fields like createdAt or updatedAt. The deploymentVariableValueReference table has fields: id, variableValueId, reference, path, defaultValue. The deploymentVariableValue table has fields: id, variableId, resourceSelector, priority.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 668
File: apps/webservice/src/app/api/v1/deployments/[deploymentId]/variables/route.ts:185-194
Timestamp: 2025-09-26T01:51:59.096Z
Learning: The deploymentVariable schema in ctrlplane/db does not contain timestamp fields like createdAt or updatedAt - it only has id, key, description, deploymentId, defaultValueId, and config fields.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 637
File: packages/events/src/kafka/client.ts:10-16
Timestamp: 2025-08-01T04:41:41.345Z
Learning: User adityachoudhari26 prefers not to add null safety checks for required environment variables when they are guaranteed to be present in their deployment configuration, similar to their preference for simplicity over defensive programming in test code.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 601
File: e2e/tests/api/policies/retry-policy.spec.ts:23-24
Timestamp: 2025-06-24T23:52:50.732Z
Learning: The user adityachoudhari26 prefers not to add null safety checks or defensive programming in test code, particularly in e2e tests, as they prioritize simplicity and focus on the main functionality being tested rather than comprehensive error handling within the test itself.

Comment on lines 13 to 180
func WorkflowToOapi(row db.Workflow) *oapi.Workflow {
var inputs []oapi.WorkflowInput
if row.Inputs != nil {
_ = json.Unmarshal(row.Inputs, &inputs)
}

var jobs []oapi.WorkflowJobTemplate
if row.Jobs != nil {
_ = json.Unmarshal(row.Jobs, &jobs)
}

return &oapi.Workflow{
Id: row.ID.String(),
Name: row.Name,
Inputs: inputs,
Jobs: jobs,
}
}

func ToWorkflowUpsertParams(workspaceID string, e *oapi.Workflow) (db.UpsertWorkflowParams, error) {
id, err := uuid.Parse(e.Id)
if err != nil {
return db.UpsertWorkflowParams{}, fmt.Errorf("parse id: %w", err)
}

wid, err := uuid.Parse(workspaceID)
if err != nil {
return db.UpsertWorkflowParams{}, fmt.Errorf("parse workspace_id: %w", err)
}

inputsBytes, err := json.Marshal(e.Inputs)
if err != nil {
return db.UpsertWorkflowParams{}, fmt.Errorf("marshal inputs: %w", err)
}

jobsBytes, err := json.Marshal(e.Jobs)
if err != nil {
return db.UpsertWorkflowParams{}, fmt.Errorf("marshal jobs: %w", err)
}

return db.UpsertWorkflowParams{
ID: id,
Name: e.Name,
Inputs: inputsBytes,
Jobs: jobsBytes,
WorkspaceID: wid,
}, nil
}

func WorkflowJobTemplateToOapi(row db.WorkflowJobTemplate) *oapi.WorkflowJobTemplate {
var ifCond *string
if row.IfCondition.Valid {
ifCond = &row.IfCondition.String
}

var matrix *oapi.WorkflowJobMatrix
if row.Matrix != nil {
m := &oapi.WorkflowJobMatrix{}
if err := json.Unmarshal(row.Matrix, m); err == nil {
matrix = m
}
}

return &oapi.WorkflowJobTemplate{
Id: row.ID.String(),
Name: row.Name,
Ref: row.Ref,
Config: row.Config,
If: ifCond,
Matrix: matrix,
}
}

func ToWorkflowJobTemplateUpsertParams(workspaceID string, e *oapi.WorkflowJobTemplate) (db.UpsertWorkflowJobTemplateParams, error) {
id, err := uuid.Parse(e.Id)
if err != nil {
return db.UpsertWorkflowJobTemplateParams{}, fmt.Errorf("parse id: %w", err)
}

wid, err := uuid.Parse(workspaceID)
if err != nil {
return db.UpsertWorkflowJobTemplateParams{}, fmt.Errorf("parse workspace_id: %w", err)
}

var ifCondition pgtype.Text
if e.If != nil {
ifCondition = pgtype.Text{String: *e.If, Valid: true}
}

var matrixBytes []byte
if e.Matrix != nil {
matrixBytes, err = json.Marshal(e.Matrix)
if err != nil {
return db.UpsertWorkflowJobTemplateParams{}, fmt.Errorf("marshal matrix: %w", err)
}
}

return db.UpsertWorkflowJobTemplateParams{
ID: id,
Name: e.Name,
Ref: e.Ref,
Config: e.Config,
IfCondition: ifCondition,
Matrix: matrixBytes,
WorkspaceID: wid,
}, nil
}

func WorkflowRunToOapi(row db.WorkflowRun) *oapi.WorkflowRun {
return &oapi.WorkflowRun{
Id: row.ID.String(),
WorkflowId: row.WorkflowID.String(),
Inputs: row.Inputs,
}
}

func ToWorkflowRunUpsertParams(e *oapi.WorkflowRun) (db.UpsertWorkflowRunParams, error) {
id, err := uuid.Parse(e.Id)
if err != nil {
return db.UpsertWorkflowRunParams{}, fmt.Errorf("parse id: %w", err)
}

wfid, err := uuid.Parse(e.WorkflowId)
if err != nil {
return db.UpsertWorkflowRunParams{}, fmt.Errorf("parse workflow_id: %w", err)
}

inputs := e.Inputs
if inputs == nil {
inputs = make(map[string]any)
}

return db.UpsertWorkflowRunParams{
ID: id,
WorkflowID: wfid,
Inputs: inputs,
}, nil
}

func WorkflowJobToOapi(row db.WorkflowJob) *oapi.WorkflowJob {
return &oapi.WorkflowJob{
Id: row.ID.String(),
WorkflowRunId: row.WorkflowRunID.String(),
Ref: row.Ref,
Config: row.Config,
Index: int(row.Index),
}
}

func ToWorkflowJobUpsertParams(e *oapi.WorkflowJob) (db.UpsertWorkflowJobParams, error) {
id, err := uuid.Parse(e.Id)
if err != nil {
return db.UpsertWorkflowJobParams{}, fmt.Errorf("parse id: %w", err)
}

wrid, err := uuid.Parse(e.WorkflowRunId)
if err != nil {
return db.UpsertWorkflowJobParams{}, fmt.Errorf("parse workflow_run_id: %w", err)
}

return db.UpsertWorkflowJobParams{
ID: id,
WorkflowRunID: wrid,
Ref: e.Ref,
Config: e.Config,
Index: int32(e.Index),
}, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Document exported mapper functions.

This file introduces several exported conversion helpers without doc comments, which makes package API intent harder to discover.

As per coding guidelines, "Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`
around lines 13 - 180, Several exported mapper functions lack package-level
documentation; add clear doc comments for each exported function
(WorkflowToOapi, ToWorkflowUpsertParams, WorkflowJobTemplateToOapi,
ToWorkflowJobTemplateUpsertParams, WorkflowRunToOapi, ToWorkflowRunUpsertParams,
WorkflowJobToOapi, ToWorkflowJobUpsertParams) describing what they convert, the
expected input types and any important behavior (e.g., JSON
unmarshalling/marshalling, handling of nil/valid fields, error returns). Place a
brief single-sentence comment immediately above each function declaration
following Go doc conventions (starting with the function name) and note special
cases like when Inputs/Jobs are nil, IfCondition/Matrix handling, or when errors
are returned from uuid.Parse/json.Marshal.

Comment on lines 13 to 315
type WorkflowRepo struct {
ctx context.Context
workspaceID string
}

func NewWorkflowRepo(ctx context.Context, workspaceID string) *WorkflowRepo {
return &WorkflowRepo{ctx: ctx, workspaceID: workspaceID}
}

func (r *WorkflowRepo) Get(id string) (*oapi.Workflow, bool) {
uid, err := uuid.Parse(id)
if err != nil {
log.Warn("Failed to parse workflow id", "id", id, "error", err)
return nil, false
}

row, err := db.GetQueries(r.ctx).GetWorkflowByID(r.ctx, uid)
if err != nil {
return nil, false
}

return WorkflowToOapi(row), true
}

func (r *WorkflowRepo) Set(entity *oapi.Workflow) error {
params, err := ToWorkflowUpsertParams(r.workspaceID, entity)
if err != nil {
return fmt.Errorf("convert to upsert params: %w", err)
}

_, err = db.GetQueries(r.ctx).UpsertWorkflow(r.ctx, params)
if err != nil {
return fmt.Errorf("upsert workflow: %w", err)
}
return nil
}

func (r *WorkflowRepo) Remove(id string) error {
uid, err := uuid.Parse(id)
if err != nil {
return fmt.Errorf("parse id: %w", err)
}

return db.GetQueries(r.ctx).DeleteWorkflow(r.ctx, uid)
}

func (r *WorkflowRepo) Items() map[string]*oapi.Workflow {
uid, err := uuid.Parse(r.workspaceID)
if err != nil {
log.Warn("Failed to parse workspace id for Items()", "id", r.workspaceID, "error", err)
return make(map[string]*oapi.Workflow)
}

rows, err := db.GetQueries(r.ctx).ListWorkflowsByWorkspaceID(r.ctx, uid)
if err != nil {
log.Warn("Failed to list workflows by workspace", "workspaceId", r.workspaceID, "error", err)
return make(map[string]*oapi.Workflow)
}

result := make(map[string]*oapi.Workflow, len(rows))
for _, row := range rows {
w := WorkflowToOapi(row)
result[w.Id] = w
}
return result
}

type WorkflowJobTemplateRepo struct {
ctx context.Context
workspaceID string
}

func NewWorkflowJobTemplateRepo(ctx context.Context, workspaceID string) *WorkflowJobTemplateRepo {
return &WorkflowJobTemplateRepo{ctx: ctx, workspaceID: workspaceID}
}

func (r *WorkflowJobTemplateRepo) Get(id string) (*oapi.WorkflowJobTemplate, bool) {
uid, err := uuid.Parse(id)
if err != nil {
log.Warn("Failed to parse workflow job template id", "id", id, "error", err)
return nil, false
}

row, err := db.GetQueries(r.ctx).GetWorkflowJobTemplateByID(r.ctx, uid)
if err != nil {
return nil, false
}

return WorkflowJobTemplateToOapi(row), true
}

func (r *WorkflowJobTemplateRepo) Set(entity *oapi.WorkflowJobTemplate) error {
params, err := ToWorkflowJobTemplateUpsertParams(r.workspaceID, entity)
if err != nil {
return fmt.Errorf("convert to upsert params: %w", err)
}

_, err = db.GetQueries(r.ctx).UpsertWorkflowJobTemplate(r.ctx, params)
if err != nil {
return fmt.Errorf("upsert workflow job template: %w", err)
}
return nil
}

func (r *WorkflowJobTemplateRepo) Remove(id string) error {
uid, err := uuid.Parse(id)
if err != nil {
return fmt.Errorf("parse id: %w", err)
}

return db.GetQueries(r.ctx).DeleteWorkflowJobTemplate(r.ctx, uid)
}

func (r *WorkflowJobTemplateRepo) Items() map[string]*oapi.WorkflowJobTemplate {
uid, err := uuid.Parse(r.workspaceID)
if err != nil {
log.Warn("Failed to parse workspace id for Items()", "id", r.workspaceID, "error", err)
return make(map[string]*oapi.WorkflowJobTemplate)
}

rows, err := db.GetQueries(r.ctx).ListWorkflowJobTemplatesByWorkspaceID(r.ctx, uid)
if err != nil {
log.Warn("Failed to list workflow job templates by workspace", "workspaceId", r.workspaceID, "error", err)
return make(map[string]*oapi.WorkflowJobTemplate)
}

result := make(map[string]*oapi.WorkflowJobTemplate, len(rows))
for _, row := range rows {
jt := WorkflowJobTemplateToOapi(row)
result[jt.Id] = jt
}
return result
}

type WorkflowRunRepo struct {
ctx context.Context
workspaceID string
}

func NewWorkflowRunRepo(ctx context.Context, workspaceID string) *WorkflowRunRepo {
return &WorkflowRunRepo{ctx: ctx, workspaceID: workspaceID}
}

func (r *WorkflowRunRepo) Get(id string) (*oapi.WorkflowRun, bool) {
uid, err := uuid.Parse(id)
if err != nil {
log.Warn("Failed to parse workflow run id", "id", id, "error", err)
return nil, false
}

row, err := db.GetQueries(r.ctx).GetWorkflowRunByID(r.ctx, uid)
if err != nil {
return nil, false
}

return WorkflowRunToOapi(row), true
}

func (r *WorkflowRunRepo) Set(entity *oapi.WorkflowRun) error {
params, err := ToWorkflowRunUpsertParams(entity)
if err != nil {
return fmt.Errorf("convert to upsert params: %w", err)
}

_, err = db.GetQueries(r.ctx).UpsertWorkflowRun(r.ctx, params)
if err != nil {
return fmt.Errorf("upsert workflow run: %w", err)
}
return nil
}

func (r *WorkflowRunRepo) Remove(id string) error {
uid, err := uuid.Parse(id)
if err != nil {
return fmt.Errorf("parse id: %w", err)
}

return db.GetQueries(r.ctx).DeleteWorkflowRun(r.ctx, uid)
}

func (r *WorkflowRunRepo) Items() map[string]*oapi.WorkflowRun {
uid, err := uuid.Parse(r.workspaceID)
if err != nil {
log.Warn("Failed to parse workspace id for Items()", "id", r.workspaceID, "error", err)
return make(map[string]*oapi.WorkflowRun)
}

rows, err := db.GetQueries(r.ctx).ListWorkflowRunsByWorkspaceID(r.ctx, uid)
if err != nil {
log.Warn("Failed to list workflow runs by workspace", "workspaceId", r.workspaceID, "error", err)
return make(map[string]*oapi.WorkflowRun)
}

result := make(map[string]*oapi.WorkflowRun, len(rows))
for _, row := range rows {
wr := WorkflowRunToOapi(row)
result[wr.Id] = wr
}
return result
}

func (r *WorkflowRunRepo) GetByWorkflowID(workflowID string) ([]*oapi.WorkflowRun, error) {
uid, err := uuid.Parse(workflowID)
if err != nil {
return nil, fmt.Errorf("parse workflow_id: %w", err)
}

rows, err := db.GetQueries(r.ctx).ListWorkflowRunsByWorkflowID(r.ctx, uid)
if err != nil {
return nil, fmt.Errorf("list workflow runs: %w", err)
}

result := make([]*oapi.WorkflowRun, len(rows))
for i, row := range rows {
result[i] = WorkflowRunToOapi(row)
}
return result, nil
}

type WorkflowJobRepo struct {
ctx context.Context
workspaceID string
}

func NewWorkflowJobRepo(ctx context.Context, workspaceID string) *WorkflowJobRepo {
return &WorkflowJobRepo{ctx: ctx, workspaceID: workspaceID}
}

func (r *WorkflowJobRepo) Get(id string) (*oapi.WorkflowJob, bool) {
uid, err := uuid.Parse(id)
if err != nil {
log.Warn("Failed to parse workflow job id", "id", id, "error", err)
return nil, false
}

row, err := db.GetQueries(r.ctx).GetWorkflowJobByID(r.ctx, uid)
if err != nil {
return nil, false
}

return WorkflowJobToOapi(row), true
}

func (r *WorkflowJobRepo) Set(entity *oapi.WorkflowJob) error {
params, err := ToWorkflowJobUpsertParams(entity)
if err != nil {
return fmt.Errorf("convert to upsert params: %w", err)
}

_, err = db.GetQueries(r.ctx).UpsertWorkflowJob(r.ctx, params)
if err != nil {
return fmt.Errorf("upsert workflow job: %w", err)
}
return nil
}

func (r *WorkflowJobRepo) Remove(id string) error {
uid, err := uuid.Parse(id)
if err != nil {
return fmt.Errorf("parse id: %w", err)
}

return db.GetQueries(r.ctx).DeleteWorkflowJob(r.ctx, uid)
}

func (r *WorkflowJobRepo) Items() map[string]*oapi.WorkflowJob {
uid, err := uuid.Parse(r.workspaceID)
if err != nil {
log.Warn("Failed to parse workspace id for Items()", "id", r.workspaceID, "error", err)
return make(map[string]*oapi.WorkflowJob)
}

rows, err := db.GetQueries(r.ctx).ListWorkflowJobsByWorkspaceID(r.ctx, uid)
if err != nil {
log.Warn("Failed to list workflow jobs by workspace", "workspaceId", r.workspaceID, "error", err)
return make(map[string]*oapi.WorkflowJob)
}

result := make(map[string]*oapi.WorkflowJob, len(rows))
for _, row := range rows {
wj := WorkflowJobToOapi(row)
result[wj.Id] = wj
}
return result
}

func (r *WorkflowJobRepo) GetByWorkflowRunID(workflowRunID string) ([]*oapi.WorkflowJob, error) {
uid, err := uuid.Parse(workflowRunID)
if err != nil {
return nil, fmt.Errorf("parse workflow_run_id: %w", err)
}

rows, err := db.GetQueries(r.ctx).ListWorkflowJobsByWorkflowRunID(r.ctx, uid)
if err != nil {
return nil, fmt.Errorf("list workflow jobs: %w", err)
}

result := make([]*oapi.WorkflowJob, len(rows))
for i, row := range rows {
result[i] = WorkflowJobToOapi(row)
}
return result, nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add doc comments for exported repository types and constructors.

The exported repo types and constructors in this file should be documented to satisfy package API documentation requirements.

As per coding guidelines, "Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go`
around lines 13 - 315, Add Go doc comments for each exported repository type and
their constructors: document WorkflowRepo and NewWorkflowRepo,
WorkflowJobTemplateRepo and NewWorkflowJobTemplateRepo, WorkflowRunRepo and
NewWorkflowRunRepo, and WorkflowJobRepo and NewWorkflowJobRepo. For each comment
place a short sentence above the type/constructor explaining its purpose and
usage in the package API (e.g., "WorkflowRepo provides workspace-scoped
persistence for workflows." and "NewWorkflowRepo creates a new WorkflowRepo for
the given workspace."). Ensure comments follow Go convention starting with the
symbol name and give any non-obvious context about workspaceID or ctx where
relevant.

Comment on lines +595 to +673
func (s *InMemory) Workflows() repository.WorkflowRepo {
return &cmapRepoAdapter[*oapi.Workflow]{store: &s.workflows}
}

func (s *InMemory) WorkflowJobTemplates() repository.WorkflowJobTemplateRepo {
return &cmapRepoAdapter[*oapi.WorkflowJobTemplate]{store: &s.workflowJobTemplates}
}

type workflowRunRepoAdapter struct {
store *cmap.ConcurrentMap[string, *oapi.WorkflowRun]
}

func (a *workflowRunRepoAdapter) Get(id string) (*oapi.WorkflowRun, bool) {
return a.store.Get(id)
}

func (a *workflowRunRepoAdapter) Set(entity *oapi.WorkflowRun) error {
a.store.Set(entity.Id, entity)
return nil
}

func (a *workflowRunRepoAdapter) Remove(id string) error {
a.store.Remove(id)
return nil
}

func (a *workflowRunRepoAdapter) Items() map[string]*oapi.WorkflowRun {
return a.store.Items()
}

func (a *workflowRunRepoAdapter) GetByWorkflowID(workflowID string) ([]*oapi.WorkflowRun, error) {
var result []*oapi.WorkflowRun
for item := range a.store.IterBuffered() {
if item.Val.WorkflowId == workflowID {
result = append(result, item.Val)
}
}
return result, nil
}

func (s *InMemory) WorkflowRuns() repository.WorkflowRunRepo {
return &workflowRunRepoAdapter{store: &s.workflowRuns}
}

type workflowJobRepoAdapter struct {
store *cmap.ConcurrentMap[string, *oapi.WorkflowJob]
}

func (a *workflowJobRepoAdapter) Get(id string) (*oapi.WorkflowJob, bool) {
return a.store.Get(id)
}

func (a *workflowJobRepoAdapter) Set(entity *oapi.WorkflowJob) error {
a.store.Set(entity.Id, entity)
return nil
}

func (a *workflowJobRepoAdapter) Remove(id string) error {
a.store.Remove(id)
return nil
}

func (a *workflowJobRepoAdapter) Items() map[string]*oapi.WorkflowJob {
return a.store.Items()
}

func (a *workflowJobRepoAdapter) GetByWorkflowRunID(workflowRunID string) ([]*oapi.WorkflowJob, error) {
var result []*oapi.WorkflowJob
for item := range a.store.IterBuffered() {
if item.Val.WorkflowRunId == workflowRunID {
result = append(result, item.Val)
}
}
return result, nil
}

func (s *InMemory) WorkflowJobs() repository.WorkflowJobRepo {
return &workflowJobRepoAdapter{store: &s.workflowJobs}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add doc comments for the newly exported workflow repository accessors.

Workflows, WorkflowJobTemplates, WorkflowRuns, and WorkflowJobs are exported and should be documented for package-level API clarity.

As per coding guidelines, "Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/memory/repo.go` around
lines 595 - 673, Add package-level doc comments for the exported accessor
methods Workflows, WorkflowJobTemplates, WorkflowRuns, and WorkflowJobs: for
each of the methods (InMemory.Workflows, InMemory.WorkflowJobTemplates,
InMemory.WorkflowRuns, InMemory.WorkflowJobs) add a brief comment above the
method that documents what repository is returned, the intended usage/context
and any non-obvious behavior (e.g., that these return in-memory adapters backed
by cmap.ConcurrentMap and are safe for concurrent use), following the project's
comment style for exported symbols.

Comment on lines +522 to +525
// for _, jobID := range allJobIDs {
// _, ok := engine.Workspace().Jobs().Get(jobID)
// assert.False(t, ok, "job %s should be removed", jobID)
// }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Restore job-deletion verification in cascade test.

Line 522 comments out the only assertion that verifies job rows are removed; this weakens the cascade contract the test is supposed to enforce.

💡 Proposed fix
-	// for _, jobID := range allJobIDs {
-	// 	_, ok := engine.Workspace().Jobs().Get(jobID)
-	// 	assert.False(t, ok, "job %s should be removed", jobID)
-	// }
+	for _, jobID := range allJobIDs {
+		_, ok := engine.Workspace().Jobs().Get(jobID)
+		assert.False(t, ok, "job %s should be removed", jobID)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// for _, jobID := range allJobIDs {
// _, ok := engine.Workspace().Jobs().Get(jobID)
// assert.False(t, ok, "job %s should be removed", jobID)
// }
for _, jobID := range allJobIDs {
_, ok := engine.Workspace().Jobs().Get(jobID)
assert.False(t, ok, "job %s should be removed", jobID)
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/test/e2e/engine_workflow_test.go` around lines 522 -
525, Restore the deleted verification loop so the cascade test asserts job rows
are removed: re-enable the commented loop that iterates over allJobIDs and for
each jobID calls engine.Workspace().Jobs().Get(jobID) and uses assert.False(t,
ok, "job %s should be removed", jobID) (or equivalent assertion) to fail the
test if any job still exists; ensure the loop references the existing allJobIDs
variable and the engine.Workspace().Jobs().Get method exactly as in the diff.

Comment on lines +33 to +36
ALTER TABLE "workflow" ADD CONSTRAINT "workflow_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_job" ADD CONSTRAINT "workflow_job_workflow_run_id_workflow_run_id_fk" FOREIGN KEY ("workflow_run_id") REFERENCES "public"."workflow_run"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_job_template" ADD CONSTRAINT "workflow_job_template_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_run" ADD CONSTRAINT "workflow_run_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action; No newline at end of file
Copy link
Contributor

@coderabbitai coderabbitai bot Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add indexes for new foreign keys.

The new FK columns are unindexed. This will hurt query performance and can significantly slow cascading deletes on parent rows.

⚡ Suggested migration patch
 ALTER TABLE "workflow" ADD CONSTRAINT "workflow_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
 ALTER TABLE "workflow_job" ADD CONSTRAINT "workflow_job_workflow_run_id_workflow_run_id_fk" FOREIGN KEY ("workflow_run_id") REFERENCES "public"."workflow_run"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
 ALTER TABLE "workflow_job_template" ADD CONSTRAINT "workflow_job_template_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
 ALTER TABLE "workflow_run" ADD CONSTRAINT "workflow_run_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;
+--> statement-breakpoint
+CREATE INDEX "workflow_workspace_id_idx" ON "workflow" ("workspace_id");
+--> statement-breakpoint
+CREATE INDEX "workflow_job_workflow_run_id_idx" ON "workflow_job" ("workflow_run_id");
+--> statement-breakpoint
+CREATE INDEX "workflow_job_template_workflow_id_idx" ON "workflow_job_template" ("workflow_id");
+--> statement-breakpoint
+CREATE INDEX "workflow_run_workflow_id_idx" ON "workflow_run" ("workflow_id");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ALTER TABLE "workflow" ADD CONSTRAINT "workflow_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_job" ADD CONSTRAINT "workflow_job_workflow_run_id_workflow_run_id_fk" FOREIGN KEY ("workflow_run_id") REFERENCES "public"."workflow_run"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_job_template" ADD CONSTRAINT "workflow_job_template_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_run" ADD CONSTRAINT "workflow_run_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;
ALTER TABLE "workflow" ADD CONSTRAINT "workflow_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_job" ADD CONSTRAINT "workflow_job_workflow_run_id_workflow_run_id_fk" FOREIGN KEY ("workflow_run_id") REFERENCES "public"."workflow_run"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_job_template" ADD CONSTRAINT "workflow_job_template_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_run" ADD CONSTRAINT "workflow_run_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "workflow_workspace_id_idx" ON "workflow" ("workspace_id");--> statement-breakpoint
CREATE INDEX "workflow_job_workflow_run_id_idx" ON "workflow_job" ("workflow_run_id");--> statement-breakpoint
CREATE INDEX "workflow_job_template_workflow_id_idx" ON "workflow_job_template" ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_run_workflow_id_idx" ON "workflow_run" ("workflow_id");
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/db/drizzle/0153_neat_quasimodo.sql` around lines 33 - 36, The new
foreign-key columns need indexes to avoid performance regressions; add CREATE
INDEX statements for workflow.workspace_id, workflow_run.workflow_id,
workflow_job.workflow_run_id, and workflow_job_template.workflow_id (e.g.,
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_workspace_id ON
public.workflow(workspace_id), idx_workflow_run_workflow_id ON
public.workflow_run(workflow_id), idx_workflow_job_workflow_run_id ON
public.workflow_job(workflow_run_id), idx_workflow_job_template_workflow_id ON
public.workflow_job_template(workflow_id)) in the same migration so each FK
column is indexed and names are unique and descriptive.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

premature optimization

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adityachoudhari26, understood. I respect your preference to add indexes once the workload patterns are better understood. The load-dependent approach of initially designing with foreign keys but deferring indexes until the database grows and workload is understood is a valid strategy for avoiding premature optimization.


🧠 Learnings used
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 818
File: apps/workspace-engine/pkg/db/queries/schema.sql:252-282
Timestamp: 2026-02-26T23:01:19.319Z
Learning: User adityachoudhari26 prefers to defer adding database indexes on foreign keys until there's evidence they're needed, considering such additions premature optimization at the initial schema design stage.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 637
File: packages/events/src/kafka/client.ts:10-16
Timestamp: 2025-08-01T04:41:41.345Z
Learning: User adityachoudhari26 prefers not to add null safety checks for required environment variables when they are guaranteed to be present in their deployment configuration, similar to their preference for simplicity over defensive programming in test code.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 601
File: e2e/tests/api/policies/retry-policy.spec.ts:23-24
Timestamp: 2025-06-24T23:52:50.732Z
Learning: The user adityachoudhari26 prefers not to add null safety checks or defensive programming in test code, particularly in e2e tests, as they prioritize simplicity and focus on the main functionality being tested rather than comprehensive error handling within the test itself.

Comment on lines 1 to 49
import {
integer,
jsonb,
pgTable,
text,
uuid,
} from "drizzle-orm/pg-core";

import { workspace } from "./workspace.js";

export const workflow = pgTable("workflow", {
id: uuid("id").primaryKey().defaultRandom(),
name: text("name").notNull(),
inputs: jsonb("inputs").notNull().default("[]"),
jobs: jsonb("jobs").notNull().default("[]"),
workspaceId: uuid("workspace_id")
.notNull()
.references(() => workspace.id, { onDelete: "cascade" }),
});

export const workflowJobTemplate = pgTable("workflow_job_template", {
id: uuid("id").primaryKey().defaultRandom(),
workflowId: uuid("workflow_id")
.notNull()
.references(() => workflow.id, { onDelete: "cascade" }),
name: text("name").notNull(),
ref: text("ref").notNull(),
config: jsonb("config").notNull().default("{}"),
ifCondition: text("if_condition"),
matrix: jsonb("matrix"),
});

export const workflowRun = pgTable("workflow_run", {
id: uuid("id").primaryKey().defaultRandom(),
workflowId: uuid("workflow_id")
.notNull()
.references(() => workflow.id, { onDelete: "cascade" }),
inputs: jsonb("inputs").notNull().default("{}"),
});

export const workflowJob = pgTable("workflow_job", {
id: uuid("id").primaryKey().defaultRandom(),
workflowRunId: uuid("workflow_run_id")
.notNull()
.references(() => workflowRun.id, { onDelete: "cascade" }),
ref: text("ref").notNull(),
config: jsonb("config").notNull().default("{}"),
index: integer("index").notNull().default(0),
});
Copy link
Contributor

@coderabbitai coderabbitai bot Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Define FK indexes in Drizzle schema to match relational access patterns.

The schema defines four FK columns but no indexes. Add them here so generated/maintained migrations preserve performance characteristics.

🧩 Suggested Drizzle update
 import {
+  index,
   integer,
   jsonb,
   pgTable,
   text,
   uuid,
 } from "drizzle-orm/pg-core";
@@
 export const workflow = pgTable("workflow", {
@@
   workspaceId: uuid("workspace_id")
     .notNull()
     .references(() => workspace.id, { onDelete: "cascade" }),
-});
+}, (table) => ({
+  workflowWorkspaceIdIdx: index("workflow_workspace_id_idx").on(table.workspaceId),
+}));
@@
 export const workflowJobTemplate = pgTable("workflow_job_template", {
@@
   matrix: jsonb("matrix"),
-});
+}, (table) => ({
+  workflowJobTemplateWorkflowIdIdx: index("workflow_job_template_workflow_id_idx").on(table.workflowId),
+}));
@@
 export const workflowRun = pgTable("workflow_run", {
@@
   inputs: jsonb("inputs").notNull().default("{}"),
-});
+}, (table) => ({
+  workflowRunWorkflowIdIdx: index("workflow_run_workflow_id_idx").on(table.workflowId),
+}));
@@
 export const workflowJob = pgTable("workflow_job", {
@@
   index: integer("index").notNull().default(0),
-});
+}, (table) => ({
+  workflowJobWorkflowRunIdIdx: index("workflow_job_workflow_run_id_idx").on(table.workflowRunId),
+}));
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import {
integer,
jsonb,
pgTable,
text,
uuid,
} from "drizzle-orm/pg-core";
import { workspace } from "./workspace.js";
export const workflow = pgTable("workflow", {
id: uuid("id").primaryKey().defaultRandom(),
name: text("name").notNull(),
inputs: jsonb("inputs").notNull().default("[]"),
jobs: jsonb("jobs").notNull().default("[]"),
workspaceId: uuid("workspace_id")
.notNull()
.references(() => workspace.id, { onDelete: "cascade" }),
});
export const workflowJobTemplate = pgTable("workflow_job_template", {
id: uuid("id").primaryKey().defaultRandom(),
workflowId: uuid("workflow_id")
.notNull()
.references(() => workflow.id, { onDelete: "cascade" }),
name: text("name").notNull(),
ref: text("ref").notNull(),
config: jsonb("config").notNull().default("{}"),
ifCondition: text("if_condition"),
matrix: jsonb("matrix"),
});
export const workflowRun = pgTable("workflow_run", {
id: uuid("id").primaryKey().defaultRandom(),
workflowId: uuid("workflow_id")
.notNull()
.references(() => workflow.id, { onDelete: "cascade" }),
inputs: jsonb("inputs").notNull().default("{}"),
});
export const workflowJob = pgTable("workflow_job", {
id: uuid("id").primaryKey().defaultRandom(),
workflowRunId: uuid("workflow_run_id")
.notNull()
.references(() => workflowRun.id, { onDelete: "cascade" }),
ref: text("ref").notNull(),
config: jsonb("config").notNull().default("{}"),
index: integer("index").notNull().default(0),
});
import {
index,
integer,
jsonb,
pgTable,
text,
uuid,
} from "drizzle-orm/pg-core";
import { workspace } from "./workspace.js";
export const workflow = pgTable("workflow", {
id: uuid("id").primaryKey().defaultRandom(),
name: text("name").notNull(),
inputs: jsonb("inputs").notNull().default("[]"),
jobs: jsonb("jobs").notNull().default("[]"),
workspaceId: uuid("workspace_id")
.notNull()
.references(() => workspace.id, { onDelete: "cascade" }),
}, (table) => ({
workflowWorkspaceIdIdx: index("workflow_workspace_id_idx").on(table.workspaceId),
}));
export const workflowJobTemplate = pgTable("workflow_job_template", {
id: uuid("id").primaryKey().defaultRandom(),
workflowId: uuid("workflow_id")
.notNull()
.references(() => workflow.id, { onDelete: "cascade" }),
name: text("name").notNull(),
ref: text("ref").notNull(),
config: jsonb("config").notNull().default("{}"),
ifCondition: text("if_condition"),
matrix: jsonb("matrix"),
}, (table) => ({
workflowJobTemplateWorkflowIdIdx: index("workflow_job_template_workflow_id_idx").on(table.workflowId),
}));
export const workflowRun = pgTable("workflow_run", {
id: uuid("id").primaryKey().defaultRandom(),
workflowId: uuid("workflow_id")
.notNull()
.references(() => workflow.id, { onDelete: "cascade" }),
inputs: jsonb("inputs").notNull().default("{}"),
}, (table) => ({
workflowRunWorkflowIdIdx: index("workflow_run_workflow_id_idx").on(table.workflowId),
}));
export const workflowJob = pgTable("workflow_job", {
id: uuid("id").primaryKey().defaultRandom(),
workflowRunId: uuid("workflow_run_id")
.notNull()
.references(() => workflowRun.id, { onDelete: "cascade" }),
ref: text("ref").notNull(),
config: jsonb("config").notNull().default("{}"),
index: integer("index").notNull().default(0),
}, (table) => ({
workflowJobWorkflowRunIdIdx: index("workflow_job_workflow_run_id_idx").on(table.workflowRunId),
}));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/db/src/schema/workflow.ts` around lines 1 - 49, Add explicit indexes
for the foreign key columns so Drizzle generates migrations that preserve
relational access patterns: add indexes for workflow.workspaceId,
workflowJobTemplate.workflowId, workflowRun.workflowId, and
workflowJob.workflowRunId by declaring appropriate index definitions for those
columns in the respective pgTable definitions (workflow, workflow_job_template,
workflow_run, workflow_job); ensure the index names are unique and descriptive
and follow existing naming conventions so migrations include CREATE INDEX for
these FK columns.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

premature optimization

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adityachoudhari26, understood! I'll defer the index additions until there's evidence they're needed in practice.


🧠 Learnings used
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 579
File: packages/db/src/schema/rules/concurrency.ts:0-0
Timestamp: 2025-06-01T19:10:11.535Z
Learning: In the ctrlplane codebase, when defining database schemas with Drizzle ORM, it's an intentional pattern to spread base fields (like `basePolicyRuleFields`) and then redefine specific fields to add additional constraints (like unique constraints or foreign key references). The TypeScript field overwriting behavior is deliberately used to override base field definitions with more specific requirements.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 601
File: packages/job-dispatch/src/job-update.ts:264-270
Timestamp: 2025-06-24T23:56:54.799Z
Learning: In this codebase, the `Tx` type is just an alias for the database client type (`Omit<typeof db, "$client">`) and does not necessarily indicate an active transaction context. Functions like `createReleaseJob` need to be called within a transaction, which is why they are wrapped with `db.transaction()` even when the parameter is typed as `Tx`. Drizzle supports nested transactions via breakpoints, so additional transaction wrappers are safe even if already within a transaction.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 818
File: apps/workspace-engine/pkg/db/queries/schema.sql:252-282
Timestamp: 2026-02-26T23:01:19.319Z
Learning: User adityachoudhari26 prefers to defer adding database indexes on foreign keys until there's evidence they're needed, considering such additions premature optimization at the initial schema design stage.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 637
File: packages/events/src/kafka/client.ts:10-16
Timestamp: 2025-08-01T04:41:41.345Z
Learning: User adityachoudhari26 prefers not to add null safety checks for required environment variables when they are guaranteed to be present in their deployment configuration, similar to their preference for simplicity over defensive programming in test code.

Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 601
File: e2e/tests/api/policies/retry-policy.spec.ts:23-24
Timestamp: 2025-06-24T23:52:50.732Z
Learning: The user adityachoudhari26 prefers not to add null safety checks or defensive programming in test code, particularly in e2e tests, as they prioritize simplicity and focus on the main functionality being tested rather than comprehensive error handling within the test itself.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go (1)

179-179: Potential integer overflow on 64-bit systems.

Converting e.Index (int, 64-bit on most systems) to int32 could overflow silently if the value exceeds math.MaxInt32. While job indices realistically won't reach this limit, the conversion is technically unsafe.

🛡️ Optional bounds check
+import "math"
+
 func ToWorkflowJobUpsertParams(e *oapi.WorkflowJob) (db.UpsertWorkflowJobParams, error) {
 	// ... existing code ...
 
+	if e.Index < math.MinInt32 || e.Index > math.MaxInt32 {
+		return db.UpsertWorkflowJobParams{}, fmt.Errorf("index %d out of int32 range", e.Index)
+	}
+
 	return db.UpsertWorkflowJobParams{
 		// ...
 		Index:         int32(e.Index),
 	}, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`
at line 179, The assignment "Index: int32(e.Index)" can overflow when e.Index
exceeds int32 bounds; change the mapper in mapper.go to validate e.Index before
casting (e.g., check against math.MaxInt32 and math.MinInt32) and either clamp
to the nearest int32 value or return an error/unwrap with explicit handling;
update the mapping function that sets the Index field (the code that currently
does "Index: int32(e.Index)") to perform this bounds check and handle
out-of-range values safely instead of a blind cast.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`:
- Around line 68-74: The JSON unmarshal for row.Matrix currently ignores errors;
change the block that reads row.Matrix into an oapi.WorkflowJobMatrix
(variables: row.Matrix, m, matrix, type oapi.WorkflowJobMatrix) to check the
error returned by json.Unmarshal and handle it consistently with WorkflowToOapi
— e.g., log the unmarshalling error (including contextual identifiers like the
workflow/row ID) with the repository logger or return the error to the caller
instead of silently dropping it, so malformed JSON is surfaced during mapping.
- Around line 13-30: The function WorkflowToOapi currently swallows
json.Unmarshal errors for row.Inputs and row.Jobs causing silent data loss;
update WorkflowToOapi to surface failures by either (A) adding error
propagation: change signature to WorkflowToOapi(row db.Workflow)
(*oapi.Workflow, error) and return an error if unmarshaling row.Inputs or
row.Jobs fails, or (B) if you must keep the current signature, at minimum log
the unmarshal error (including the field name row.Inputs / row.Jobs and row.ID)
via the package logger before returning the partially-built *oapi.Workflow so
corruption is visible; adjust callers accordingly if you choose option A (handle
the returned error from WorkflowToOapi).

---

Nitpick comments:
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`:
- Line 179: The assignment "Index: int32(e.Index)" can overflow when e.Index
exceeds int32 bounds; change the mapper in mapper.go to validate e.Index before
casting (e.g., check against math.MaxInt32 and math.MinInt32) and either clamp
to the nearest int32 value or return an error/unwrap with explicit handling;
update the mapping function that sets the Index field (the code that currently
does "Index: int32(e.Index)") to perform this bounds check and handle
out-of-range values safely instead of a blind cast.

ℹ️ Review info

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Disabled knowledge base sources:

  • Linear integration is disabled

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 474c596 and 3cf05de.

📒 Files selected for processing (6)
  • apps/workspace-engine/pkg/db/models.go
  • apps/workspace-engine/pkg/db/queries/workflows.sql
  • apps/workspace-engine/pkg/db/workflows.sql.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/repo.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go
  • apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • apps/workspace-engine/pkg/db/queries/workflows.sql
  • apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go

Comment on lines +13 to +30
func WorkflowToOapi(row db.Workflow) *oapi.Workflow {
var inputs []oapi.WorkflowInput
if row.Inputs != nil {
_ = json.Unmarshal(row.Inputs, &inputs)
}

var jobs []oapi.WorkflowJobTemplate
if row.Jobs != nil {
_ = json.Unmarshal(row.Jobs, &jobs)
}

return &oapi.Workflow{
Id: row.ID.String(),
Name: row.Name,
Inputs: inputs,
Jobs: jobs,
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Silent JSON unmarshal errors may hide data corruption.

The discarded errors on lines 16 and 21 mean callers receive empty slices when Inputs or Jobs contain malformed JSON. If this data originates from the DB (which it does), corruption would go unnoticed.

Consider logging or returning an error if the data is expected to be valid, or at minimum document this as intentional behavior.

🛡️ Suggested improvement to surface errors
 func WorkflowToOapi(row db.Workflow) *oapi.Workflow {
 	var inputs []oapi.WorkflowInput
 	if row.Inputs != nil {
-		_ = json.Unmarshal(row.Inputs, &inputs)
+		if err := json.Unmarshal(row.Inputs, &inputs); err != nil {
+			// Log or handle: inputs will default to nil/empty
+		}
 	}

 	var jobs []oapi.WorkflowJobTemplate
 	if row.Jobs != nil {
-		_ = json.Unmarshal(row.Jobs, &jobs)
+		if err := json.Unmarshal(row.Jobs, &jobs); err != nil {
+			// Log or handle: jobs will default to nil/empty
+		}
 	}

Alternatively, change the signature to return (*oapi.Workflow, error) if strict validation is preferred.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`
around lines 13 - 30, The function WorkflowToOapi currently swallows
json.Unmarshal errors for row.Inputs and row.Jobs causing silent data loss;
update WorkflowToOapi to surface failures by either (A) adding error
propagation: change signature to WorkflowToOapi(row db.Workflow)
(*oapi.Workflow, error) and return an error if unmarshaling row.Inputs or
row.Jobs fails, or (B) if you must keep the current signature, at minimum log
the unmarshal error (including the field name row.Inputs / row.Jobs and row.ID)
via the package logger before returning the partially-built *oapi.Workflow so
corruption is visible; adjust callers accordingly if you choose option A (handle
the returned error from WorkflowToOapi).

Comment on lines +68 to +74
var matrix *oapi.WorkflowJobMatrix
if row.Matrix != nil {
m := &oapi.WorkflowJobMatrix{}
if err := json.Unmarshal(row.Matrix, m); err == nil {
matrix = m
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Same silent unmarshal pattern for Matrix.

Similar to the WorkflowToOapi function, the JSON unmarshal error for Matrix is silently ignored. If the stored JSON is malformed, matrix remains nil without any indication of failure.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`
around lines 68 - 74, The JSON unmarshal for row.Matrix currently ignores
errors; change the block that reads row.Matrix into an oapi.WorkflowJobMatrix
(variables: row.Matrix, m, matrix, type oapi.WorkflowJobMatrix) to check the
error returned by json.Unmarshal and handle it consistently with WorkflowToOapi
— e.g., log the unmarshalling error (including contextual identifiers like the
workflow/row ID) with the repository logger or return the error to the caller
instead of silently dropping it, so malformed JSON is surfaced during mapping.

@adityachoudhari26 adityachoudhari26 merged commit 422287f into main Feb 26, 2026
9 checks passed
@adityachoudhari26 adityachoudhari26 deleted the migrate-workflows-db branch February 26, 2026 23:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant