chore: migrate workflows to db #818
Conversation
Caution: Review failed. The pull request is closed.
ℹ️ Recent review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
📒 Files selected for processing (3)
📝 Walkthrough

Adds a workflow subsystem: an OpenAPI/schema change (WorkflowJobTemplate.workflowId), DB schema and migrations, sqlc-generated queries and a Go DAL, DB and in-memory repository implementations, store wiring and migration, repo-backed store usage, and small manager/test adjustments.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant API as Workspace API
    participant Store
    participant Repo as WorkflowRepo
    participant DB
    Client->>API: Create WorkflowRun
    API->>Store: Upsert WorkflowRun
    Store->>Repo: Set(WorkflowRun)
    Repo->>DB: Upsert workflow_run row
    DB-->>Repo: Persisted row
    Repo-->>Store: Persisted WorkflowRun
    Store-->>API: Created WorkflowRun response
    API-->>Client: 201 Created
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
92e0949 to fab1ae3 (Compare)
c717e9f to 474c596 (Compare)
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
apps/workspace-engine/pkg/workspace/store/workflow_job.go (1)
50-58: ⚠️ Potential issue | 🟠 Major: Delete dependent jobs before deleting the workflow job.

`repo.Remove(id)` runs before removing the `w.store.Jobs` children. This can fail under FK constraints and can lose expected child-delete propagation.

Suggested fix:

```diff
 func (w *WorkflowJobs) Remove(ctx context.Context, id string) {
 	workflowJob, ok := w.repo.Get(id)
 	if !ok || workflowJob == nil {
 		return
 	}
+	jobs := w.store.Jobs.GetByWorkflowJobId(id)
+	for _, job := range jobs {
+		w.store.Jobs.Remove(ctx, job.Id)
+	}
+
 	if err := w.repo.Remove(id); err != nil {
 		log.Error("Failed to remove workflow job", "error", err)
 		return
 	}
-
-	jobs := w.store.Jobs.GetByWorkflowJobId(id)
-	for _, job := range jobs {
-		w.store.Jobs.Remove(ctx, job.Id)
-	}
 	w.store.changeset.RecordDelete(workflowJob)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/workspace-engine/pkg/workspace/store/workflow_job.go` around lines 50-58: The repository call w.repo.Remove(id) is happening before deleting dependent child jobs (w.store.Jobs.GetByWorkflowJobId / w.store.Jobs.Remove), which can violate FK constraints; change the order to first fetch and remove all child jobs via w.store.Jobs.GetByWorkflowJobId(id) and loop calling w.store.Jobs.Remove(ctx, job.Id), checking and returning errors as they occur, and only after all children are deleted call w.repo.Remove(id); if possible perform both steps inside the same transaction or ensure error handling/rollback so partial deletes do not leave inconsistent state.

apps/workspace-engine/pkg/workspace/store/workflows.go (1)
49-57: ⚠️ Potential issue | 🔴 Critical: Delete child workflow runs before deleting the workflow.

Current order removes the parent first, then children. This can fail with FK constraints and can skip downstream run/job cleanup behavior.

Suggested fix:

```diff
 func (w *Workflows) Remove(ctx context.Context, id string) {
 	workflow, ok := w.repo.Get(id)
 	if !ok || workflow == nil {
 		return
 	}
+	workflowRuns := w.store.WorkflowRuns.GetByWorkflowId(id)
+	for _, workflowRun := range workflowRuns {
+		w.store.WorkflowRuns.Remove(ctx, workflowRun.Id)
+	}
+
 	if err := w.repo.Remove(id); err != nil {
 		log.Error("Failed to remove workflow", "error", err)
 		return
 	}
-
-	workflowRuns := w.store.WorkflowRuns.GetByWorkflowId(id)
-	for _, workflowRun := range workflowRuns {
-		w.store.WorkflowRuns.Remove(ctx, workflowRun.Id)
-	}
 	w.store.changeset.RecordDelete(workflow)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/workspace-engine/pkg/workspace/store/workflows.go` around lines 49-57: The delete order is reversed: currently w.repo.Remove(id) runs before deleting child workflow runs, which can trigger FK constraint errors and skip downstream cleanup; change the logic in the function that calls w.repo.Remove to first fetch child runs via w.store.WorkflowRuns.GetByWorkflowId(id) and loop calling w.store.WorkflowRuns.Remove(ctx, workflowRun.Id) for each child, ensure removal errors are handled/logged, and only after all children are removed call w.repo.Remove(id) to delete the parent workflow.

apps/workspace-engine/pkg/workspace/store/workflow_runs.go (1)
61-69: ⚠️ Potential issue | 🔴 Critical: Remove workflow jobs before removing the workflow run.

The run is deleted first and child workflow jobs are deleted after. This is fragile with FK constraints and can bypass intended child cleanup semantics.

Suggested fix:

```diff
 func (w *WorkflowRuns) Remove(ctx context.Context, id string) {
 	workflowRun, ok := w.repo.Get(id)
 	if !ok || workflowRun == nil {
 		return
 	}
+	workflowJobs := w.store.WorkflowJobs.GetByWorkflowRunId(id)
+	for _, workflowJob := range workflowJobs {
+		w.store.WorkflowJobs.Remove(ctx, workflowJob.Id)
+	}
+
 	if err := w.repo.Remove(id); err != nil {
 		log.Error("Failed to remove workflow run", "error", err)
 		return
 	}
-
-	workflowJobs := w.store.WorkflowJobs.GetByWorkflowRunId(id)
-	for _, workflowJob := range workflowJobs {
-		w.store.WorkflowJobs.Remove(ctx, workflowJob.Id)
-	}
 	w.store.changeset.RecordDelete(workflowRun)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/workspace-engine/pkg/workspace/store/workflow_runs.go` around lines 61 - 69, The workflow run is being removed before its child workflow jobs, which can break FK constraints; change the deletion order so you first fetch and delete children using w.store.WorkflowJobs.GetByWorkflowRunId(id) and call w.store.WorkflowJobs.Remove(ctx, workflowJob.Id) for each job, handling and returning on any Remove error, and only after all child removals succeed call w.repo.Remove(id); ensure you use the existing variables and methods (id, ctx, w.store.WorkflowJobs.GetByWorkflowRunId, w.store.WorkflowJobs.Remove, w.repo.Remove) and preserve logging for failures.
🧹 Nitpick comments (4)
apps/workspace-engine/pkg/workspace/workflowmanager/manager.go (1)
99-125: Remove duplicate workflow run upsert.

Line 124 repeats the same `workflowRun` upsert already done at Line 99, with no intermediate mutation.

♻️ Suggested cleanup:

```diff
 	m.store.WorkflowRuns.Upsert(ctx, workflowRun)
@@
-	m.store.WorkflowRuns.Upsert(ctx, workflowRun)
-
 	for _, wfJob := range workflowJobs {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go` around lines 99-125: There is a duplicate call to m.store.WorkflowRuns.Upsert for the same workflowRun with no intervening changes; remove the redundant Upsert invocation (the second one after building workflowJobs) so only the initial m.store.WorkflowRuns.Upsert(ctx, workflowRun) remains; locate the duplicate by searching for m.store.WorkflowRuns.Upsert and the workflowRun variable in manager.go and delete the extra call.

apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go (1)
47-55: Populate `WorkflowId` in the `WorkflowJobTemplate` fixture.

This fixture leaves `WorkflowId` unset (""), so the test doesn't exercise the new workflow-template linkage path.

Proposed test fixture update:

```diff
 Jobs: []oapi.WorkflowJobTemplate{
 	{
-		Id:   workflowJobTemplateID,
-		Name: "test-job",
-		Ref:  jobAgent1ID,
+		Id:         workflowJobTemplateID,
+		Name:       "test-job",
+		Ref:        jobAgent1ID,
+		WorkflowId: workflowID,
 		Config: map[string]any{
 			"delaySeconds": 10,
 		},
 	},
 },
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go` around lines 47-55: The test fixture for Jobs uses an oapi.WorkflowJobTemplate with Id workflowJobTemplateID but leaves WorkflowId empty; update the Jobs entry in manager_test.go to set WorkflowId to the parent workflow's ID (the same ID used for the workflow fixture) so the test exercises the workflow-template linkage path. Locate the Jobs slice where WorkflowJobTemplate is constructed (the element with Id = workflowJobTemplateID and Ref = jobAgent1ID) and assign its WorkflowId field to the appropriate workflow ID constant used in the test.

apps/workspace-engine/pkg/workspace/store/workflow_runs.go (1)
36-39: Avoid silent failure on `GetByWorkflowId` repository errors.

Returning an empty map without logging makes DB/read failures look like "no runs found," which hides operational issues.

Suggested fix:

```diff
 runs, err := w.repo.GetByWorkflowID(workflowId)
 if err != nil {
+	log.Error("Failed to get workflow runs by workflow ID", "workflow_id", workflowId, "error", err)
 	return make(map[string]*oapi.WorkflowRun)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/workspace-engine/pkg/workspace/store/workflow_runs.go` around lines 36-39: The current handling of w.repo.GetByWorkflowID(workflowId) silently returns an empty map on error; change the code in the method that calls w.repo.GetByWorkflowID (in workflow_runs.go) to surface or log the repository error instead of swallowing it: either return the error up the call stack (propagate the error from the surrounding function) or call the appropriate logger (e.g., processLogger or w.logger) to record the error before returning a nil/empty result; locate the call to w.repo.GetByWorkflowID(workflowId) and replace the current silent-return branch so it includes error propagation or a clear error log message that contains err and workflowId for debugging.

apps/workspace-engine/pkg/workspace/store/workflow_job.go (1)
64-67: Log repository errors in `GetByWorkflowRunId`.

Returning `nil` on error without logging masks data-access failures and complicates debugging.

Suggested fix:

```diff
 wfJobs, err := w.repo.GetByWorkflowRunID(workflowRunId)
 if err != nil {
+	log.Error("Failed to get workflow jobs by workflow run ID", "workflow_run_id", workflowRunId, "error", err)
 	return nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/workspace-engine/pkg/workspace/store/workflow_job.go` around lines 64 - 67, When w.repo.GetByWorkflowRunID(workflowRunId) returns an error, log that error before returning to avoid swallowing data-access failures; modify the err != nil branch in the function that calls w.repo.GetByWorkflowRunID to call the component logger (e.g., w.logger.Errorf("GetByWorkflowRunID(%s) error: %v", workflowRunId, err) or w.log.Error/Infof depending on the logger in this type) and then return the existing nil result (or wrap/return the error if the function signature allows) so repository errors are visible in logs; keep references to w.repo and GetByWorkflowRunID and the workflowRunId parameter when forming the log message.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/workspace-engine/pkg/db/models.go`:
- Around line 318-326: The generated DB model WorkflowJobTemplate is out of sync
with the SQL (it defines WorkspaceID but the SQL and table use workflow_id),
which breaks UpsertWorkflowJobTemplate and GetWorkflowJobTemplateByID scanning
and the mapper that needs workflowId; regenerate sqlc outputs so the struct and
generated query code match the schema (workflow_job_template) and queries:
re-run sqlc code generation to update WorkflowJobTemplate to include WorkflowID
uuid.UUID (and remove/rename WorkspaceID if present), then rebuild and verify
UpsertWorkflowJobTemplate and GetWorkflowJobTemplateByID now reference
workflow_id and that the mapper can populate workflowId in API responses.
In `@apps/workspace-engine/pkg/db/queries/schema.sql`:
- Around line 252-282: The foreign key columns lack indexes which will hurt
list/query and cascade performance; add explicit B-tree indexes on
workflow.workspace_id, workflow_job_template.workflow_id,
workflow_run.workflow_id, and workflow_job.workflow_run_id (i.e., create indexes
for each FK column) so lookups and cascading deletes are efficient and the SQLC
schema reflects those indexes.
In `@apps/workspace-engine/pkg/db/queries/workflows.sql`:
- Around line 8-10: The upsert currently allows changing workspace ownership by
including workspace_id in the DO UPDATE SET list; update the UpsertWorkflow SQL
(the INSERT INTO workflow ... ON CONFLICT (id) DO UPDATE ...) to stop updating
workspace_id on conflict so existing rows keep their original workspace. Edit
the query used by UpsertWorkflow to remove workspace_id from the SET clause (or
otherwise ensure the DO UPDATE does not assign EXCLUDED.workspace_id), leaving
only name, inputs, jobs (and optionally add a WHERE clause to guard workspace_id
equality if you want extra safety).
In `@apps/workspace-engine/pkg/db/workflows.sql.go`:
- Around line 85-87: The query for GetWorkflowJobTemplateByID and related
read/upsert SQL (e.g., the getWorkflowJobTemplateByID constant and the other
statements around lines 359-373) omits the workflow_id column so the
WorkflowJobTemplate.WorkflowId cannot be persisted/hydrated; update the SELECT,
INSERT/UPSERT and any param lists that touch the workflow_job_template table to
include workflow_id (and the corresponding placeholder $N) so the DB round-trip
preserves the WorkflowId field, or alternatively remove WorkflowId from the API
model if the relation is intentionally unsupported. Ensure the unique
identifiers referenced are workflow_job_template table,
getWorkflowJobTemplateByID constant, and the WorkflowJobTemplate/WorkflowId
fields when making the change.
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`:
- Around line 13-180: Several exported mapper functions lack package-level
documentation; add clear doc comments for each exported function
(WorkflowToOapi, ToWorkflowUpsertParams, WorkflowJobTemplateToOapi,
ToWorkflowJobTemplateUpsertParams, WorkflowRunToOapi, ToWorkflowRunUpsertParams,
WorkflowJobToOapi, ToWorkflowJobUpsertParams) describing what they convert, the
expected input types and any important behavior (e.g., JSON
unmarshalling/marshalling, handling of nil/valid fields, error returns). Place a
brief single-sentence comment immediately above each function declaration
following Go doc conventions (starting with the function name) and note special
cases like when Inputs/Jobs are nil, IfCondition/Matrix handling, or when errors
are returned from uuid.Parse/json.Marshal.
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go`:
- Around line 13-315: Add Go doc comments for each exported repository type and
their constructors: document WorkflowRepo and NewWorkflowRepo,
WorkflowJobTemplateRepo and NewWorkflowJobTemplateRepo, WorkflowRunRepo and
NewWorkflowRunRepo, and WorkflowJobRepo and NewWorkflowJobRepo. For each comment
place a short sentence above the type/constructor explaining its purpose and
usage in the package API (e.g., "WorkflowRepo provides workspace-scoped
persistence for workflows." and "NewWorkflowRepo creates a new WorkflowRepo for
the given workspace."). Ensure comments follow Go convention starting with the
symbol name and give any non-obvious context about workspaceID or ctx where
relevant.
In `@apps/workspace-engine/pkg/workspace/store/repository/memory/repo.go`:
- Around line 595-673: Add package-level doc comments for the exported accessor
methods Workflows, WorkflowJobTemplates, WorkflowRuns, and WorkflowJobs: for
each of the methods (InMemory.Workflows, InMemory.WorkflowJobTemplates,
InMemory.WorkflowRuns, InMemory.WorkflowJobs) add a brief comment above the
method that documents what repository is returned, the intended usage/context
and any non-obvious behavior (e.g., that these return in-memory adapters backed
by cmap.ConcurrentMap and are safe for concurrent use), following the project's
comment style for exported symbols.
In `@apps/workspace-engine/test/e2e/engine_workflow_test.go`:
- Around line 522-525: Restore the deleted verification loop so the cascade test
asserts job rows are removed: re-enable the commented loop that iterates over
allJobIDs and for each jobID calls engine.Workspace().Jobs().Get(jobID) and uses
assert.False(t, ok, "job %s should be removed", jobID) (or equivalent assertion)
to fail the test if any job still exists; ensure the loop references the
existing allJobIDs variable and the engine.Workspace().Jobs().Get method exactly
as in the diff.
In `@packages/db/drizzle/0153_neat_quasimodo.sql`:
- Around line 33-36: The new foreign-key columns need indexes to avoid
performance regressions; add CREATE INDEX statements for workflow.workspace_id,
workflow_run.workflow_id, workflow_job.workflow_run_id, and
workflow_job_template.workflow_id (e.g., CREATE INDEX CONCURRENTLY IF NOT EXISTS
idx_workflow_workspace_id ON public.workflow(workspace_id),
idx_workflow_run_workflow_id ON public.workflow_run(workflow_id),
idx_workflow_job_workflow_run_id ON public.workflow_job(workflow_run_id),
idx_workflow_job_template_workflow_id ON
public.workflow_job_template(workflow_id)) in the same migration so each FK
column is indexed and names are unique and descriptive.
In `@packages/db/src/schema/workflow.ts`:
- Around line 1-49: Add explicit indexes for the foreign key columns so Drizzle
generates migrations that preserve relational access patterns: add indexes for
workflow.workspaceId, workflowJobTemplate.workflowId, workflowRun.workflowId,
and workflowJob.workflowRunId by declaring appropriate index definitions for
those columns in the respective pgTable definitions (workflow,
workflow_job_template, workflow_run, workflow_job); ensure the index names are
unique and descriptive and follow existing naming conventions so migrations
include CREATE INDEX for these FK columns.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (30)
apps/workspace-engine/oapi/openapi.json
apps/workspace-engine/oapi/spec/schemas/workflows.jsonnet
apps/workspace-engine/pkg/db/models.go
apps/workspace-engine/pkg/db/queries/schema.sql
apps/workspace-engine/pkg/db/queries/workflows.sql
apps/workspace-engine/pkg/db/sqlc.yaml
apps/workspace-engine/pkg/db/workflows.sql.go
apps/workspace-engine/pkg/oapi/oapi.gen.go
apps/workspace-engine/pkg/workspace/store/repository/db/repo.go
apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go
apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go
apps/workspace-engine/pkg/workspace/store/repository/interfaces.go
apps/workspace-engine/pkg/workspace/store/repository/memory/repo.go
apps/workspace-engine/pkg/workspace/store/repository/repo.go
apps/workspace-engine/pkg/workspace/store/store.go
apps/workspace-engine/pkg/workspace/store/workflow_job.go
apps/workspace-engine/pkg/workspace/store/workflow_job_templates.go
apps/workspace-engine/pkg/workspace/store/workflow_runs.go
apps/workspace-engine/pkg/workspace/store/workflows.go
apps/workspace-engine/pkg/workspace/workflowmanager/manager.go
apps/workspace-engine/pkg/workspace/workflowmanager/manager_test.go
apps/workspace-engine/svc/workspaceconsumer/consumer.go
apps/workspace-engine/test/e2e/engine_workflow_test.go
apps/workspace-engine/test/integration/creators/workflow.go
apps/workspace-engine/test/integration/dbtest.go
packages/db/drizzle/0153_neat_quasimodo.sql
packages/db/drizzle/meta/0153_snapshot.json
packages/db/drizzle/meta/_journal.json
packages/db/src/schema/index.ts
packages/db/src/schema/workflow.ts
```sql
CREATE TABLE workflow (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  name TEXT NOT NULL,
  inputs JSONB NOT NULL DEFAULT '[]',
  jobs JSONB NOT NULL DEFAULT '[]',
  workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE
);

CREATE TABLE workflow_job_template (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workflow_id UUID NOT NULL REFERENCES workflow(id) ON DELETE CASCADE,
  name TEXT NOT NULL,
  ref TEXT NOT NULL,
  config JSONB NOT NULL DEFAULT '{}',
  if_condition TEXT,
  matrix JSONB
);

CREATE TABLE workflow_run (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workflow_id UUID NOT NULL REFERENCES workflow(id) ON DELETE CASCADE,
  inputs JSONB NOT NULL DEFAULT '{}'
);

CREATE TABLE workflow_job (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workflow_run_id UUID NOT NULL REFERENCES workflow_run(id) ON DELETE CASCADE,
  ref TEXT NOT NULL,
  config JSONB NOT NULL DEFAULT '{}',
  index INTEGER NOT NULL DEFAULT 0
);
```
Add indexes on workflow foreign keys in SQLC schema.
These FK columns should be indexed to keep list/query paths and cascading cleanup performant.
📌 Suggested SQL additions:

```diff
 CREATE TABLE workflow_job (
   id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
   workflow_run_id UUID NOT NULL REFERENCES workflow_run(id) ON DELETE CASCADE,
   ref TEXT NOT NULL,
   config JSONB NOT NULL DEFAULT '{}',
   index INTEGER NOT NULL DEFAULT 0
 );
+
+CREATE INDEX workflow_workspace_id_idx ON workflow(workspace_id);
+CREATE INDEX workflow_job_template_workflow_id_idx ON workflow_job_template(workflow_id);
+CREATE INDEX workflow_run_workflow_id_idx ON workflow_run(workflow_id);
+CREATE INDEX workflow_job_workflow_run_id_idx ON workflow_job(workflow_run_id);
```
premature optimization

@adityachoudhari26, understood! You can always add these indexes later if query performance becomes a concern.
✏️ Learnings added
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 818
File: apps/workspace-engine/pkg/db/queries/schema.sql:252-282
Timestamp: 2026-02-26T23:01:19.319Z
Learning: User adityachoudhari26 prefers to defer adding database indexes on foreign keys until there's evidence they're needed, considering such additions premature optimization at the initial schema design stage.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 668
File: apps/webservice/src/app/api/v1/deployments/[deploymentId]/variables/variable-diff-check.ts:104-134
Timestamp: 2025-09-26T01:53:05.472Z
Learning: The deployment variable value tables in the ctrlplane/db schema do not have timestamp fields like createdAt or updatedAt. The deploymentVariableValueReference table has fields: id, variableValueId, reference, path, defaultValue. The deploymentVariableValue table has fields: id, variableId, resourceSelector, priority.
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 668
File: apps/webservice/src/app/api/v1/deployments/[deploymentId]/variables/route.ts:185-194
Timestamp: 2025-09-26T01:51:59.096Z
Learning: The deploymentVariable schema in ctrlplane/db does not contain timestamp fields like createdAt or updatedAt - it only has id, key, description, deploymentId, defaultValueId, and config fields.
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 637
File: packages/events/src/kafka/client.ts:10-16
Timestamp: 2025-08-01T04:41:41.345Z
Learning: User adityachoudhari26 prefers not to add null safety checks for required environment variables when they are guaranteed to be present in their deployment configuration, similar to their preference for simplicity over defensive programming in test code.
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 601
File: e2e/tests/api/policies/retry-policy.spec.ts:23-24
Timestamp: 2025-06-24T23:52:50.732Z
Learning: The user adityachoudhari26 prefers not to add null safety checks or defensive programming in test code, particularly in e2e tests, as they prioritize simplicity and focus on the main functionality being tested rather than comprehensive error handling within the test itself.
```go
func WorkflowToOapi(row db.Workflow) *oapi.Workflow {
	var inputs []oapi.WorkflowInput
	if row.Inputs != nil {
		_ = json.Unmarshal(row.Inputs, &inputs)
	}

	var jobs []oapi.WorkflowJobTemplate
	if row.Jobs != nil {
		_ = json.Unmarshal(row.Jobs, &jobs)
	}

	return &oapi.Workflow{
		Id:     row.ID.String(),
		Name:   row.Name,
		Inputs: inputs,
		Jobs:   jobs,
	}
}

func ToWorkflowUpsertParams(workspaceID string, e *oapi.Workflow) (db.UpsertWorkflowParams, error) {
	id, err := uuid.Parse(e.Id)
	if err != nil {
		return db.UpsertWorkflowParams{}, fmt.Errorf("parse id: %w", err)
	}

	wid, err := uuid.Parse(workspaceID)
	if err != nil {
		return db.UpsertWorkflowParams{}, fmt.Errorf("parse workspace_id: %w", err)
	}

	inputsBytes, err := json.Marshal(e.Inputs)
	if err != nil {
		return db.UpsertWorkflowParams{}, fmt.Errorf("marshal inputs: %w", err)
	}

	jobsBytes, err := json.Marshal(e.Jobs)
	if err != nil {
		return db.UpsertWorkflowParams{}, fmt.Errorf("marshal jobs: %w", err)
	}

	return db.UpsertWorkflowParams{
		ID:          id,
		Name:        e.Name,
		Inputs:      inputsBytes,
		Jobs:        jobsBytes,
		WorkspaceID: wid,
	}, nil
}

func WorkflowJobTemplateToOapi(row db.WorkflowJobTemplate) *oapi.WorkflowJobTemplate {
	var ifCond *string
	if row.IfCondition.Valid {
		ifCond = &row.IfCondition.String
	}

	var matrix *oapi.WorkflowJobMatrix
	if row.Matrix != nil {
		m := &oapi.WorkflowJobMatrix{}
		if err := json.Unmarshal(row.Matrix, m); err == nil {
			matrix = m
		}
	}

	return &oapi.WorkflowJobTemplate{
		Id:     row.ID.String(),
		Name:   row.Name,
		Ref:    row.Ref,
		Config: row.Config,
		If:     ifCond,
		Matrix: matrix,
	}
}

func ToWorkflowJobTemplateUpsertParams(workspaceID string, e *oapi.WorkflowJobTemplate) (db.UpsertWorkflowJobTemplateParams, error) {
	id, err := uuid.Parse(e.Id)
	if err != nil {
		return db.UpsertWorkflowJobTemplateParams{}, fmt.Errorf("parse id: %w", err)
	}

	wid, err := uuid.Parse(workspaceID)
	if err != nil {
		return db.UpsertWorkflowJobTemplateParams{}, fmt.Errorf("parse workspace_id: %w", err)
	}

	var ifCondition pgtype.Text
	if e.If != nil {
		ifCondition = pgtype.Text{String: *e.If, Valid: true}
	}

	var matrixBytes []byte
	if e.Matrix != nil {
		matrixBytes, err = json.Marshal(e.Matrix)
		if err != nil {
			return db.UpsertWorkflowJobTemplateParams{}, fmt.Errorf("marshal matrix: %w", err)
		}
	}

	return db.UpsertWorkflowJobTemplateParams{
		ID:          id,
		Name:        e.Name,
		Ref:         e.Ref,
		Config:      e.Config,
		IfCondition: ifCondition,
		Matrix:      matrixBytes,
		WorkspaceID: wid,
	}, nil
}

func WorkflowRunToOapi(row db.WorkflowRun) *oapi.WorkflowRun {
	return &oapi.WorkflowRun{
		Id:         row.ID.String(),
		WorkflowId: row.WorkflowID.String(),
		Inputs:     row.Inputs,
	}
}

func ToWorkflowRunUpsertParams(e *oapi.WorkflowRun) (db.UpsertWorkflowRunParams, error) {
	id, err := uuid.Parse(e.Id)
	if err != nil {
		return db.UpsertWorkflowRunParams{}, fmt.Errorf("parse id: %w", err)
	}

	wfid, err := uuid.Parse(e.WorkflowId)
	if err != nil {
		return db.UpsertWorkflowRunParams{}, fmt.Errorf("parse workflow_id: %w", err)
	}

	inputs := e.Inputs
	if inputs == nil {
		inputs = make(map[string]any)
	}

	return db.UpsertWorkflowRunParams{
		ID:         id,
		WorkflowID: wfid,
		Inputs:     inputs,
	}, nil
}

func WorkflowJobToOapi(row db.WorkflowJob) *oapi.WorkflowJob {
	return &oapi.WorkflowJob{
		Id:            row.ID.String(),
		WorkflowRunId: row.WorkflowRunID.String(),
		Ref:           row.Ref,
		Config:        row.Config,
		Index:         int(row.Index),
	}
}

func ToWorkflowJobUpsertParams(e *oapi.WorkflowJob) (db.UpsertWorkflowJobParams, error) {
	id, err := uuid.Parse(e.Id)
	if err != nil {
		return db.UpsertWorkflowJobParams{}, fmt.Errorf("parse id: %w", err)
	}

	wrid, err := uuid.Parse(e.WorkflowRunId)
	if err != nil {
		return db.UpsertWorkflowJobParams{}, fmt.Errorf("parse workflow_run_id: %w", err)
	}

	return db.UpsertWorkflowJobParams{
		ID:            id,
		WorkflowRunID: wrid,
		Ref:           e.Ref,
		Config:        e.Config,
		Index:         int32(e.Index),
	}, nil
}
```
Document exported mapper functions.
This file introduces several exported conversion helpers without doc comments, which makes the package's API intent harder to discover.
As per coding guidelines, "Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`
around lines 13 - 180, Several exported mapper functions lack package-level
documentation; add clear doc comments for each exported function
(WorkflowToOapi, ToWorkflowUpsertParams, WorkflowJobTemplateToOapi,
ToWorkflowJobTemplateUpsertParams, WorkflowRunToOapi, ToWorkflowRunUpsertParams,
WorkflowJobToOapi, ToWorkflowJobUpsertParams) describing what they convert, the
expected input types and any important behavior (e.g., JSON
unmarshalling/marshalling, handling of nil/valid fields, error returns). Place a
brief single-sentence comment immediately above each function declaration
following Go doc conventions (starting with the function name) and note special
cases like when Inputs/Jobs are nil, IfCondition/Matrix handling, or when errors
are returned from uuid.Parse/json.Marshal.
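As a sketch of the Go doc-comment convention the prompt describes — a comment immediately above the declaration, starting with the symbol name — using hypothetical stand-in types rather than the real `db`/`oapi` packages:

```go
package main

import "fmt"

// workflowRow is a hypothetical stand-in for a sqlc-generated workflow row.
type workflowRow struct {
	ID   string
	Name string
}

// apiWorkflow is a hypothetical stand-in for the OpenAPI workflow type.
type apiWorkflow struct {
	Id   string
	Name string
}

// WorkflowToOapi converts a database workflow row into its API
// representation. Per Go convention, this doc comment begins with the
// function name and states what the function does in one sentence.
func WorkflowToOapi(row workflowRow) *apiWorkflow {
	return &apiWorkflow{Id: row.ID, Name: row.Name}
}

func main() {
	w := WorkflowToOapi(workflowRow{ID: "abc", Name: "deploy"})
	fmt.Println(w.Id, w.Name) // abc deploy
}
```

Comments in this shape surface in `go doc` and editor tooling, which is the payoff the review is asking for.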
```go
type WorkflowRepo struct {
	ctx         context.Context
	workspaceID string
}

func NewWorkflowRepo(ctx context.Context, workspaceID string) *WorkflowRepo {
	return &WorkflowRepo{ctx: ctx, workspaceID: workspaceID}
}

func (r *WorkflowRepo) Get(id string) (*oapi.Workflow, bool) {
	uid, err := uuid.Parse(id)
	if err != nil {
		log.Warn("Failed to parse workflow id", "id", id, "error", err)
		return nil, false
	}

	row, err := db.GetQueries(r.ctx).GetWorkflowByID(r.ctx, uid)
	if err != nil {
		return nil, false
	}

	return WorkflowToOapi(row), true
}

func (r *WorkflowRepo) Set(entity *oapi.Workflow) error {
	params, err := ToWorkflowUpsertParams(r.workspaceID, entity)
	if err != nil {
		return fmt.Errorf("convert to upsert params: %w", err)
	}

	_, err = db.GetQueries(r.ctx).UpsertWorkflow(r.ctx, params)
	if err != nil {
		return fmt.Errorf("upsert workflow: %w", err)
	}
	return nil
}

func (r *WorkflowRepo) Remove(id string) error {
	uid, err := uuid.Parse(id)
	if err != nil {
		return fmt.Errorf("parse id: %w", err)
	}

	return db.GetQueries(r.ctx).DeleteWorkflow(r.ctx, uid)
}

func (r *WorkflowRepo) Items() map[string]*oapi.Workflow {
	uid, err := uuid.Parse(r.workspaceID)
	if err != nil {
		log.Warn("Failed to parse workspace id for Items()", "id", r.workspaceID, "error", err)
		return make(map[string]*oapi.Workflow)
	}

	rows, err := db.GetQueries(r.ctx).ListWorkflowsByWorkspaceID(r.ctx, uid)
	if err != nil {
		log.Warn("Failed to list workflows by workspace", "workspaceId", r.workspaceID, "error", err)
		return make(map[string]*oapi.Workflow)
	}

	result := make(map[string]*oapi.Workflow, len(rows))
	for _, row := range rows {
		w := WorkflowToOapi(row)
		result[w.Id] = w
	}
	return result
}

type WorkflowJobTemplateRepo struct {
	ctx         context.Context
	workspaceID string
}

func NewWorkflowJobTemplateRepo(ctx context.Context, workspaceID string) *WorkflowJobTemplateRepo {
	return &WorkflowJobTemplateRepo{ctx: ctx, workspaceID: workspaceID}
}

func (r *WorkflowJobTemplateRepo) Get(id string) (*oapi.WorkflowJobTemplate, bool) {
	uid, err := uuid.Parse(id)
	if err != nil {
		log.Warn("Failed to parse workflow job template id", "id", id, "error", err)
		return nil, false
	}

	row, err := db.GetQueries(r.ctx).GetWorkflowJobTemplateByID(r.ctx, uid)
	if err != nil {
		return nil, false
	}

	return WorkflowJobTemplateToOapi(row), true
}

func (r *WorkflowJobTemplateRepo) Set(entity *oapi.WorkflowJobTemplate) error {
	params, err := ToWorkflowJobTemplateUpsertParams(r.workspaceID, entity)
	if err != nil {
		return fmt.Errorf("convert to upsert params: %w", err)
	}

	_, err = db.GetQueries(r.ctx).UpsertWorkflowJobTemplate(r.ctx, params)
	if err != nil {
		return fmt.Errorf("upsert workflow job template: %w", err)
	}
	return nil
}

func (r *WorkflowJobTemplateRepo) Remove(id string) error {
	uid, err := uuid.Parse(id)
	if err != nil {
		return fmt.Errorf("parse id: %w", err)
	}

	return db.GetQueries(r.ctx).DeleteWorkflowJobTemplate(r.ctx, uid)
}

func (r *WorkflowJobTemplateRepo) Items() map[string]*oapi.WorkflowJobTemplate {
	uid, err := uuid.Parse(r.workspaceID)
	if err != nil {
		log.Warn("Failed to parse workspace id for Items()", "id", r.workspaceID, "error", err)
		return make(map[string]*oapi.WorkflowJobTemplate)
	}

	rows, err := db.GetQueries(r.ctx).ListWorkflowJobTemplatesByWorkspaceID(r.ctx, uid)
	if err != nil {
		log.Warn("Failed to list workflow job templates by workspace", "workspaceId", r.workspaceID, "error", err)
		return make(map[string]*oapi.WorkflowJobTemplate)
	}

	result := make(map[string]*oapi.WorkflowJobTemplate, len(rows))
	for _, row := range rows {
		jt := WorkflowJobTemplateToOapi(row)
		result[jt.Id] = jt
	}
	return result
}

type WorkflowRunRepo struct {
	ctx         context.Context
	workspaceID string
}

func NewWorkflowRunRepo(ctx context.Context, workspaceID string) *WorkflowRunRepo {
	return &WorkflowRunRepo{ctx: ctx, workspaceID: workspaceID}
}

func (r *WorkflowRunRepo) Get(id string) (*oapi.WorkflowRun, bool) {
	uid, err := uuid.Parse(id)
	if err != nil {
		log.Warn("Failed to parse workflow run id", "id", id, "error", err)
		return nil, false
	}

	row, err := db.GetQueries(r.ctx).GetWorkflowRunByID(r.ctx, uid)
	if err != nil {
		return nil, false
	}

	return WorkflowRunToOapi(row), true
}

func (r *WorkflowRunRepo) Set(entity *oapi.WorkflowRun) error {
	params, err := ToWorkflowRunUpsertParams(entity)
	if err != nil {
		return fmt.Errorf("convert to upsert params: %w", err)
	}

	_, err = db.GetQueries(r.ctx).UpsertWorkflowRun(r.ctx, params)
	if err != nil {
		return fmt.Errorf("upsert workflow run: %w", err)
	}
	return nil
}

func (r *WorkflowRunRepo) Remove(id string) error {
	uid, err := uuid.Parse(id)
	if err != nil {
		return fmt.Errorf("parse id: %w", err)
	}

	return db.GetQueries(r.ctx).DeleteWorkflowRun(r.ctx, uid)
}

func (r *WorkflowRunRepo) Items() map[string]*oapi.WorkflowRun {
	uid, err := uuid.Parse(r.workspaceID)
	if err != nil {
		log.Warn("Failed to parse workspace id for Items()", "id", r.workspaceID, "error", err)
		return make(map[string]*oapi.WorkflowRun)
	}

	rows, err := db.GetQueries(r.ctx).ListWorkflowRunsByWorkspaceID(r.ctx, uid)
	if err != nil {
		log.Warn("Failed to list workflow runs by workspace", "workspaceId", r.workspaceID, "error", err)
		return make(map[string]*oapi.WorkflowRun)
	}

	result := make(map[string]*oapi.WorkflowRun, len(rows))
	for _, row := range rows {
		wr := WorkflowRunToOapi(row)
		result[wr.Id] = wr
	}
	return result
}

func (r *WorkflowRunRepo) GetByWorkflowID(workflowID string) ([]*oapi.WorkflowRun, error) {
	uid, err := uuid.Parse(workflowID)
	if err != nil {
		return nil, fmt.Errorf("parse workflow_id: %w", err)
	}

	rows, err := db.GetQueries(r.ctx).ListWorkflowRunsByWorkflowID(r.ctx, uid)
	if err != nil {
		return nil, fmt.Errorf("list workflow runs: %w", err)
	}

	result := make([]*oapi.WorkflowRun, len(rows))
	for i, row := range rows {
		result[i] = WorkflowRunToOapi(row)
	}
	return result, nil
}

type WorkflowJobRepo struct {
	ctx         context.Context
	workspaceID string
}

func NewWorkflowJobRepo(ctx context.Context, workspaceID string) *WorkflowJobRepo {
	return &WorkflowJobRepo{ctx: ctx, workspaceID: workspaceID}
}

func (r *WorkflowJobRepo) Get(id string) (*oapi.WorkflowJob, bool) {
	uid, err := uuid.Parse(id)
	if err != nil {
		log.Warn("Failed to parse workflow job id", "id", id, "error", err)
		return nil, false
	}

	row, err := db.GetQueries(r.ctx).GetWorkflowJobByID(r.ctx, uid)
	if err != nil {
		return nil, false
	}

	return WorkflowJobToOapi(row), true
}

func (r *WorkflowJobRepo) Set(entity *oapi.WorkflowJob) error {
	params, err := ToWorkflowJobUpsertParams(entity)
	if err != nil {
		return fmt.Errorf("convert to upsert params: %w", err)
	}

	_, err = db.GetQueries(r.ctx).UpsertWorkflowJob(r.ctx, params)
	if err != nil {
		return fmt.Errorf("upsert workflow job: %w", err)
	}
	return nil
}

func (r *WorkflowJobRepo) Remove(id string) error {
	uid, err := uuid.Parse(id)
	if err != nil {
		return fmt.Errorf("parse id: %w", err)
	}

	return db.GetQueries(r.ctx).DeleteWorkflowJob(r.ctx, uid)
}

func (r *WorkflowJobRepo) Items() map[string]*oapi.WorkflowJob {
	uid, err := uuid.Parse(r.workspaceID)
	if err != nil {
		log.Warn("Failed to parse workspace id for Items()", "id", r.workspaceID, "error", err)
		return make(map[string]*oapi.WorkflowJob)
	}

	rows, err := db.GetQueries(r.ctx).ListWorkflowJobsByWorkspaceID(r.ctx, uid)
	if err != nil {
		log.Warn("Failed to list workflow jobs by workspace", "workspaceId", r.workspaceID, "error", err)
		return make(map[string]*oapi.WorkflowJob)
	}

	result := make(map[string]*oapi.WorkflowJob, len(rows))
	for _, row := range rows {
		wj := WorkflowJobToOapi(row)
		result[wj.Id] = wj
	}
	return result
}

func (r *WorkflowJobRepo) GetByWorkflowRunID(workflowRunID string) ([]*oapi.WorkflowJob, error) {
	uid, err := uuid.Parse(workflowRunID)
	if err != nil {
		return nil, fmt.Errorf("parse workflow_run_id: %w", err)
	}

	rows, err := db.GetQueries(r.ctx).ListWorkflowJobsByWorkflowRunID(r.ctx, uid)
	if err != nil {
		return nil, fmt.Errorf("list workflow jobs: %w", err)
	}

	result := make([]*oapi.WorkflowJob, len(rows))
	for i, row := range rows {
		result[i] = WorkflowJobToOapi(row)
	}
	return result, nil
}
```
Add doc comments for exported repository types and constructors.
The exported repo types and constructors in this file should be documented to satisfy package API documentation requirements.
As per coding guidelines, "Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go`
around lines 13 - 315, Add Go doc comments for each exported repository type and
their constructors: document WorkflowRepo and NewWorkflowRepo,
WorkflowJobTemplateRepo and NewWorkflowJobTemplateRepo, WorkflowRunRepo and
NewWorkflowRunRepo, and WorkflowJobRepo and NewWorkflowJobRepo. For each comment
place a short sentence above the type/constructor explaining its purpose and
usage in the package API (e.g., "WorkflowRepo provides workspace-scoped
persistence for workflows." and "NewWorkflowRepo creates a new WorkflowRepo for
the given workspace."). Ensure comments follow Go convention starting with the
symbol name and give any non-obvious context about workspaceID or ctx where
relevant.
```go
func (s *InMemory) Workflows() repository.WorkflowRepo {
	return &cmapRepoAdapter[*oapi.Workflow]{store: &s.workflows}
}

func (s *InMemory) WorkflowJobTemplates() repository.WorkflowJobTemplateRepo {
	return &cmapRepoAdapter[*oapi.WorkflowJobTemplate]{store: &s.workflowJobTemplates}
}

type workflowRunRepoAdapter struct {
	store *cmap.ConcurrentMap[string, *oapi.WorkflowRun]
}

func (a *workflowRunRepoAdapter) Get(id string) (*oapi.WorkflowRun, bool) {
	return a.store.Get(id)
}

func (a *workflowRunRepoAdapter) Set(entity *oapi.WorkflowRun) error {
	a.store.Set(entity.Id, entity)
	return nil
}

func (a *workflowRunRepoAdapter) Remove(id string) error {
	a.store.Remove(id)
	return nil
}

func (a *workflowRunRepoAdapter) Items() map[string]*oapi.WorkflowRun {
	return a.store.Items()
}

func (a *workflowRunRepoAdapter) GetByWorkflowID(workflowID string) ([]*oapi.WorkflowRun, error) {
	var result []*oapi.WorkflowRun
	for item := range a.store.IterBuffered() {
		if item.Val.WorkflowId == workflowID {
			result = append(result, item.Val)
		}
	}
	return result, nil
}

func (s *InMemory) WorkflowRuns() repository.WorkflowRunRepo {
	return &workflowRunRepoAdapter{store: &s.workflowRuns}
}

type workflowJobRepoAdapter struct {
	store *cmap.ConcurrentMap[string, *oapi.WorkflowJob]
}

func (a *workflowJobRepoAdapter) Get(id string) (*oapi.WorkflowJob, bool) {
	return a.store.Get(id)
}

func (a *workflowJobRepoAdapter) Set(entity *oapi.WorkflowJob) error {
	a.store.Set(entity.Id, entity)
	return nil
}

func (a *workflowJobRepoAdapter) Remove(id string) error {
	a.store.Remove(id)
	return nil
}

func (a *workflowJobRepoAdapter) Items() map[string]*oapi.WorkflowJob {
	return a.store.Items()
}

func (a *workflowJobRepoAdapter) GetByWorkflowRunID(workflowRunID string) ([]*oapi.WorkflowJob, error) {
	var result []*oapi.WorkflowJob
	for item := range a.store.IterBuffered() {
		if item.Val.WorkflowRunId == workflowRunID {
			result = append(result, item.Val)
		}
	}
	return result, nil
}

func (s *InMemory) WorkflowJobs() repository.WorkflowJobRepo {
	return &workflowJobRepoAdapter{store: &s.workflowJobs}
}
```
Add doc comments for the newly exported workflow repository accessors.
Workflows, WorkflowJobTemplates, WorkflowRuns, and WorkflowJobs are exported and should be documented for package-level API clarity.
As per coding guidelines, "Write comments that explain why, document complex logic/algorithms, provide non-obvious context, include TODO/FIXME, and document exported functions/types/methods".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/workspace-engine/pkg/workspace/store/repository/memory/repo.go` around
lines 595 - 673, Add package-level doc comments for the exported accessor
methods Workflows, WorkflowJobTemplates, WorkflowRuns, and WorkflowJobs: for
each of the methods (InMemory.Workflows, InMemory.WorkflowJobTemplates,
InMemory.WorkflowRuns, InMemory.WorkflowJobs) add a brief comment above the
method that documents what repository is returned, the intended usage/context
and any non-obvious behavior (e.g., that these return in-memory adapters backed
by cmap.ConcurrentMap and are safe for concurrent use), following the project's
comment style for exported symbols.
```go
// for _, jobID := range allJobIDs {
// 	_, ok := engine.Workspace().Jobs().Get(jobID)
// 	assert.False(t, ok, "job %s should be removed", jobID)
// }
```
Restore job-deletion verification in cascade test.
Line 522 comments out the only assertion that verifies job rows are removed; this weakens the cascade contract the test is supposed to enforce.
💡 Proposed fix

```diff
-	// for _, jobID := range allJobIDs {
-	// 	_, ok := engine.Workspace().Jobs().Get(jobID)
-	// 	assert.False(t, ok, "job %s should be removed", jobID)
-	// }
+	for _, jobID := range allJobIDs {
+		_, ok := engine.Workspace().Jobs().Get(jobID)
+		assert.False(t, ok, "job %s should be removed", jobID)
+	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/workspace-engine/test/e2e/engine_workflow_test.go` around lines 522 -
525, Restore the deleted verification loop so the cascade test asserts job rows
are removed: re-enable the commented loop that iterates over allJobIDs and for
each jobID calls engine.Workspace().Jobs().Get(jobID) and uses assert.False(t,
ok, "job %s should be removed", jobID) (or equivalent assertion) to fail the
test if any job still exists; ensure the loop references the existing allJobIDs
variable and the engine.Workspace().Jobs().Get method exactly as in the diff.
```sql
ALTER TABLE "workflow" ADD CONSTRAINT "workflow_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_job" ADD CONSTRAINT "workflow_job_workflow_run_id_workflow_run_id_fk" FOREIGN KEY ("workflow_run_id") REFERENCES "public"."workflow_run"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_job_template" ADD CONSTRAINT "workflow_job_template_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_run" ADD CONSTRAINT "workflow_run_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;
```
Add indexes for new foreign keys.
The new FK columns are unindexed. This will hurt query performance and can significantly slow cascading deletes on parent rows.
⚡ Suggested migration patch
```diff
 ALTER TABLE "workflow" ADD CONSTRAINT "workflow_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
 ALTER TABLE "workflow_job" ADD CONSTRAINT "workflow_job_workflow_run_id_workflow_run_id_fk" FOREIGN KEY ("workflow_run_id") REFERENCES "public"."workflow_run"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
 ALTER TABLE "workflow_job_template" ADD CONSTRAINT "workflow_job_template_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
 ALTER TABLE "workflow_run" ADD CONSTRAINT "workflow_run_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;
+--> statement-breakpoint
+CREATE INDEX "workflow_workspace_id_idx" ON "workflow" ("workspace_id");
+--> statement-breakpoint
+CREATE INDEX "workflow_job_workflow_run_id_idx" ON "workflow_job" ("workflow_run_id");
+--> statement-breakpoint
+CREATE INDEX "workflow_job_template_workflow_id_idx" ON "workflow_job_template" ("workflow_id");
+--> statement-breakpoint
+CREATE INDEX "workflow_run_workflow_id_idx" ON "workflow_run" ("workflow_id");
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/db/drizzle/0153_neat_quasimodo.sql` around lines 33 - 36, The new
foreign-key columns need indexes to avoid performance regressions; add CREATE
INDEX statements for workflow.workspace_id, workflow_run.workflow_id,
workflow_job.workflow_run_id, and workflow_job_template.workflow_id (e.g.,
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_workflow_workspace_id ON
public.workflow(workspace_id), idx_workflow_run_workflow_id ON
public.workflow_run(workflow_id), idx_workflow_job_workflow_run_id ON
public.workflow_job(workflow_run_id), idx_workflow_job_template_workflow_id ON
public.workflow_job_template(workflow_id)) in the same migration so each FK
column is indexed and names are unique and descriptive.
premature optimization
@adityachoudhari26, understood. I respect your preference to add indexes once the workload patterns are better understood. Designing with foreign keys up front while deferring indexes until the database grows and the workload is understood is a valid strategy for avoiding premature optimization.
packages/db/src/schema/workflow.ts
Outdated
```typescript
import {
  integer,
  jsonb,
  pgTable,
  text,
  uuid,
} from "drizzle-orm/pg-core";

import { workspace } from "./workspace.js";

export const workflow = pgTable("workflow", {
  id: uuid("id").primaryKey().defaultRandom(),
  name: text("name").notNull(),
  inputs: jsonb("inputs").notNull().default("[]"),
  jobs: jsonb("jobs").notNull().default("[]"),
  workspaceId: uuid("workspace_id")
    .notNull()
    .references(() => workspace.id, { onDelete: "cascade" }),
});

export const workflowJobTemplate = pgTable("workflow_job_template", {
  id: uuid("id").primaryKey().defaultRandom(),
  workflowId: uuid("workflow_id")
    .notNull()
    .references(() => workflow.id, { onDelete: "cascade" }),
  name: text("name").notNull(),
  ref: text("ref").notNull(),
  config: jsonb("config").notNull().default("{}"),
  ifCondition: text("if_condition"),
  matrix: jsonb("matrix"),
});

export const workflowRun = pgTable("workflow_run", {
  id: uuid("id").primaryKey().defaultRandom(),
  workflowId: uuid("workflow_id")
    .notNull()
    .references(() => workflow.id, { onDelete: "cascade" }),
  inputs: jsonb("inputs").notNull().default("{}"),
});

export const workflowJob = pgTable("workflow_job", {
  id: uuid("id").primaryKey().defaultRandom(),
  workflowRunId: uuid("workflow_run_id")
    .notNull()
    .references(() => workflowRun.id, { onDelete: "cascade" }),
  ref: text("ref").notNull(),
  config: jsonb("config").notNull().default("{}"),
  index: integer("index").notNull().default(0),
});
```
Define FK indexes in Drizzle schema to match relational access patterns.
The schema defines four FK columns but no indexes. Add them here so generated/maintained migrations preserve performance characteristics.
🧩 Suggested Drizzle update

```diff
 import {
+  index,
   integer,
   jsonb,
   pgTable,
   text,
   uuid,
 } from "drizzle-orm/pg-core";
@@
 export const workflow = pgTable("workflow", {
@@
   workspaceId: uuid("workspace_id")
     .notNull()
     .references(() => workspace.id, { onDelete: "cascade" }),
-});
+}, (table) => ({
+  workflowWorkspaceIdIdx: index("workflow_workspace_id_idx").on(table.workspaceId),
+}));
@@
 export const workflowJobTemplate = pgTable("workflow_job_template", {
@@
   matrix: jsonb("matrix"),
-});
+}, (table) => ({
+  workflowJobTemplateWorkflowIdIdx: index("workflow_job_template_workflow_id_idx").on(table.workflowId),
+}));
@@
 export const workflowRun = pgTable("workflow_run", {
@@
   inputs: jsonb("inputs").notNull().default("{}"),
-});
+}, (table) => ({
+  workflowRunWorkflowIdIdx: index("workflow_run_workflow_id_idx").on(table.workflowId),
+}));
@@
 export const workflowJob = pgTable("workflow_job", {
@@
   index: integer("index").notNull().default(0),
-});
+}, (table) => ({
+  workflowJobWorkflowRunIdIdx: index("workflow_job_workflow_run_id_idx").on(table.workflowRunId),
+}));
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@packages/db/src/schema/workflow.ts` around lines 1 - 49, Add explicit indexes
for the foreign key columns so Drizzle generates migrations that preserve
relational access patterns: add indexes for workflow.workspaceId,
workflowJobTemplate.workflowId, workflowRun.workflowId, and
workflowJob.workflowRunId by declaring appropriate index definitions for those
columns in the respective pgTable definitions (workflow, workflow_job_template,
workflow_run, workflow_job); ensure the index names are unique and descriptive
and follow existing naming conventions so migrations include CREATE INDEX for
these FK columns.
premature optimization
@adityachoudhari26, understood! I'll defer the index additions until there's evidence they're needed in practice.
🧠 Learnings used
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 579
File: packages/db/src/schema/rules/concurrency.ts:0-0
Timestamp: 2025-06-01T19:10:11.535Z
Learning: In the ctrlplane codebase, when defining database schemas with Drizzle ORM, it's an intentional pattern to spread base fields (like `basePolicyRuleFields`) and then redefine specific fields to add additional constraints (like unique constraints or foreign key references). The TypeScript field overwriting behavior is deliberately used to override base field definitions with more specific requirements.
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 601
File: packages/job-dispatch/src/job-update.ts:264-270
Timestamp: 2025-06-24T23:56:54.799Z
Learning: In this codebase, the `Tx` type is just an alias for the database client type (`Omit<typeof db, "$client">`) and does not necessarily indicate an active transaction context. Functions like `createReleaseJob` need to be called within a transaction, which is why they are wrapped with `db.transaction()` even when the parameter is typed as `Tx`. Drizzle supports nested transactions via breakpoints, so additional transaction wrappers are safe even if already within a transaction.
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 818
File: apps/workspace-engine/pkg/db/queries/schema.sql:252-282
Timestamp: 2026-02-26T23:01:19.319Z
Learning: User adityachoudhari26 prefers to defer adding database indexes on foreign keys until there's evidence they're needed, considering such additions premature optimization at the initial schema design stage.
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 637
File: packages/events/src/kafka/client.ts:10-16
Timestamp: 2025-08-01T04:41:41.345Z
Learning: User adityachoudhari26 prefers not to add null safety checks for required environment variables when they are guaranteed to be present in their deployment configuration, similar to their preference for simplicity over defensive programming in test code.
Learnt from: adityachoudhari26
Repo: ctrlplanedev/ctrlplane PR: 601
File: e2e/tests/api/policies/retry-policy.spec.ts:23-24
Timestamp: 2025-06-24T23:52:50.732Z
Learning: The user adityachoudhari26 prefers not to add null safety checks or defensive programming in test code, particularly in e2e tests, as they prioritize simplicity and focus on the main functionality being tested rather than comprehensive error handling within the test itself.
Actionable comments posted: 2
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go (1)
179-179: Potential integer overflow on 64-bit systems. Converting `e.Index` (`int`, 64-bit on most systems) to `int32` could overflow silently if the value exceeds `math.MaxInt32`. While job indices realistically won't reach this limit, the conversion is technically unsafe.

🛡️ Optional bounds check

```diff
+import "math"
+
 func ToWorkflowJobUpsertParams(e *oapi.WorkflowJob) (db.UpsertWorkflowJobParams, error) {
 	// ... existing code ...
+	if e.Index < math.MinInt32 || e.Index > math.MaxInt32 {
+		return db.UpsertWorkflowJobParams{}, fmt.Errorf("index %d out of int32 range", e.Index)
+	}
 	return db.UpsertWorkflowJobParams{
 		// ...
 		Index: int32(e.Index),
 	}, nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go` at line 179, the assignment `Index: int32(e.Index)` can overflow when `e.Index` exceeds int32 bounds; validate `e.Index` before casting (e.g., check against `math.MaxInt32` and `math.MinInt32`) and either clamp to the nearest int32 value or return an error with explicit handling; update the mapping function that sets the Index field to perform this bounds check and handle out-of-range values safely instead of a blind cast.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`:
- Around line 68-74: The JSON unmarshal for row.Matrix currently ignores errors;
change the block that reads row.Matrix into an oapi.WorkflowJobMatrix
(variables: row.Matrix, m, matrix, type oapi.WorkflowJobMatrix) to check the
error returned by json.Unmarshal and handle it consistently with WorkflowToOapi
— e.g., log the unmarshalling error (including contextual identifiers like the
workflow/row ID) with the repository logger or return the error to the caller
instead of silently dropping it, so malformed JSON is surfaced during mapping.
- Around line 13-30: The function WorkflowToOapi currently swallows
json.Unmarshal errors for row.Inputs and row.Jobs causing silent data loss;
update WorkflowToOapi to surface failures by either (A) adding error
propagation: change signature to WorkflowToOapi(row db.Workflow)
(*oapi.Workflow, error) and return an error if unmarshaling row.Inputs or
row.Jobs fails, or (B) if you must keep the current signature, at minimum log
the unmarshal error (including the field name row.Inputs / row.Jobs and row.ID)
via the package logger before returning the partially-built *oapi.Workflow so
corruption is visible; adjust callers accordingly if you choose option A (handle
the returned error from WorkflowToOapi).
---
Nitpick comments:
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`:
- Line 179: The assignment "Index: int32(e.Index)" can overflow when e.Index
exceeds int32 bounds; change the mapper in mapper.go to validate e.Index before
casting (e.g., check against math.MaxInt32 and math.MinInt32) and either clamp
to the nearest int32 value or return an error/unwrap with explicit handling;
update the mapping function that sets the Index field (the code that currently
does "Index: int32(e.Index)") to perform this bounds check and handle
out-of-range values safely instead of a blind cast.
ℹ️ Review info
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- apps/workspace-engine/pkg/db/models.go
- apps/workspace-engine/pkg/db/queries/workflows.sql
- apps/workspace-engine/pkg/db/workflows.sql.go
- apps/workspace-engine/pkg/workspace/store/repository/db/repo.go
- apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go
- apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go
🚧 Files skipped from review as they are similar to previous changes (2)
- apps/workspace-engine/pkg/db/queries/workflows.sql
- apps/workspace-engine/pkg/workspace/store/repository/db/workflows/repo.go
```go
func WorkflowToOapi(row db.Workflow) *oapi.Workflow {
	var inputs []oapi.WorkflowInput
	if row.Inputs != nil {
		_ = json.Unmarshal(row.Inputs, &inputs)
	}

	var jobs []oapi.WorkflowJobTemplate
	if row.Jobs != nil {
		_ = json.Unmarshal(row.Jobs, &jobs)
	}

	return &oapi.Workflow{
		Id:     row.ID.String(),
		Name:   row.Name,
		Inputs: inputs,
		Jobs:   jobs,
	}
}
```
Silent JSON unmarshal errors may hide data corruption.
The discarded errors on lines 16 and 21 mean callers receive empty slices when Inputs or Jobs contain malformed JSON. If this data originates from the DB (which it does), corruption would go unnoticed.
Consider logging or returning an error if the data is expected to be valid, or at minimum document this as intentional behavior.
🛡️ Suggested improvement to surface errors
```diff
 func WorkflowToOapi(row db.Workflow) *oapi.Workflow {
 	var inputs []oapi.WorkflowInput
 	if row.Inputs != nil {
-		_ = json.Unmarshal(row.Inputs, &inputs)
+		if err := json.Unmarshal(row.Inputs, &inputs); err != nil {
+			// Log or handle: inputs will default to nil/empty
+		}
 	}

 	var jobs []oapi.WorkflowJobTemplate
 	if row.Jobs != nil {
-		_ = json.Unmarshal(row.Jobs, &jobs)
+		if err := json.Unmarshal(row.Jobs, &jobs); err != nil {
+			// Log or handle: jobs will default to nil/empty
+		}
 	}
```

Alternatively, change the signature to return `(*oapi.Workflow, error)` if strict validation is preferred.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`
around lines 13 - 30, The function WorkflowToOapi currently swallows
json.Unmarshal errors for row.Inputs and row.Jobs causing silent data loss;
update WorkflowToOapi to surface failures by either (A) adding error
propagation: change signature to WorkflowToOapi(row db.Workflow)
(*oapi.Workflow, error) and return an error if unmarshaling row.Inputs or
row.Jobs fails, or (B) if you must keep the current signature, at minimum log
the unmarshal error (including the field name row.Inputs / row.Jobs and row.ID)
via the package logger before returning the partially-built *oapi.Workflow so
corruption is visible; adjust callers accordingly if you choose option A (handle
the returned error from WorkflowToOapi).
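A runnable sketch of the stricter variant (option A) looks like the following. The `oapi`/`db` types are stubbed as minimal placeholder structs here, since only the error-propagation pattern is at issue:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Minimal stand-ins for the generated types; the real oapi.Workflow has
// more fields, but the unmarshal pattern is identical.
type WorkflowInput struct {
	Name string `json:"name"`
}

type Workflow struct {
	Inputs []WorkflowInput
}

// workflowFromRow propagates unmarshal failures instead of discarding them,
// so malformed rows surface at the mapping boundary rather than as silently
// empty slices.
func workflowFromRow(rawInputs []byte) (*Workflow, error) {
	var inputs []WorkflowInput
	if rawInputs != nil {
		if err := json.Unmarshal(rawInputs, &inputs); err != nil {
			return nil, fmt.Errorf("unmarshal inputs: %w", err)
		}
	}
	return &Workflow{Inputs: inputs}, nil
}

func main() {
	w, err := workflowFromRow([]byte(`[{"name":"env"}]`))
	fmt.Println(w.Inputs[0].Name, err) // valid JSON maps cleanly

	_, err = workflowFromRow([]byte(`{not json`))
	fmt.Println(err != nil) // corruption is now visible to the caller
}
```

The same shape applies to the `Jobs` and `Matrix` fields discussed in this review: each unmarshal either succeeds or returns a wrapped error identifying the field.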
```go
var matrix *oapi.WorkflowJobMatrix
if row.Matrix != nil {
	m := &oapi.WorkflowJobMatrix{}
	if err := json.Unmarshal(row.Matrix, m); err == nil {
		matrix = m
	}
}
```
Same silent unmarshal pattern for Matrix.
Similar to the WorkflowToOapi function, the JSON unmarshal error for Matrix is silently ignored. If the stored JSON is malformed, matrix remains nil without any indication of failure.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@apps/workspace-engine/pkg/workspace/store/repository/db/workflows/mapper.go`
around lines 68 - 74, The JSON unmarshal for row.Matrix currently ignores
errors; change the block that reads row.Matrix into an oapi.WorkflowJobMatrix
(variables: row.Matrix, m, matrix, type oapi.WorkflowJobMatrix) to check the
error returned by json.Unmarshal and handle it consistently with WorkflowToOapi
— e.g., log the unmarshalling error (including contextual identifiers like the
workflow/row ID) with the repository logger or return the error to the caller
instead of silently dropping it, so malformed JSON is surfaced during mapping.
Summary by CodeRabbit