From aba7dd087813a13443e1eac4026f7467143314e3 Mon Sep 17 00:00:00 2001 From: Thomas Jiang Date: Sat, 14 Feb 2026 16:34:42 +0800 Subject: [PATCH] Fix custom schema usage in postgres reschedule query --- .../openworkflow/postgres/backend.test.ts | 72 +++++++++++++++++++ packages/openworkflow/postgres/backend.ts | 4 +- 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/packages/openworkflow/postgres/backend.test.ts b/packages/openworkflow/postgres/backend.test.ts index ffb2483c..53ba08e0 100644 --- a/packages/openworkflow/postgres/backend.test.ts +++ b/packages/openworkflow/postgres/backend.test.ts @@ -93,4 +93,76 @@ describe("BackendPostgres schema option", () => { await pg.end(); } }); + + test("reschedules workflow runs in the configured schema", async () => { + const schema = `test_schema_${randomUUID().replaceAll("-", "_")}`; + const namespaceId = randomUUID(); + const workerId = randomUUID(); + const backend = await BackendPostgres.connect(DEFAULT_POSTGRES_URL, { + namespaceId, + schema, + }); + + try { + const workflowRun = await backend.createWorkflowRun({ + workflowName: "schema-reschedule-test", + version: null, + idempotencyKey: null, + input: null, + config: {}, + context: null, + availableAt: null, + deadlineAt: null, + }); + + const claimed = await backend.claimWorkflowRun({ + workerId, + leaseDurationMs: 60_000, + }); + + expect(claimed?.id).toBe(workflowRun.id); + + const availableAt = new Date(Date.now() + 60_000); + const rescheduled = + await backend.rescheduleWorkflowRunAfterFailedStepAttempt({ + workflowRunId: workflowRun.id, + workerId, + availableAt, + error: { message: "step failed" }, + }); + + expect(rescheduled.id).toBe(workflowRun.id); + expect(rescheduled.status).toBe("pending"); + expect(rescheduled.workerId).toBeNull(); + + const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL); + try { + const workflowRunsTable = pg`${pg(schema)}.${pg("workflow_runs")}`; + + const [record] = await pg< + { + id: string; + status: string; + }[] + >` + SELECT "id", "status" + FROM ${workflowRunsTable} + WHERE "namespace_id" = ${namespaceId} + AND "id" = ${workflowRun.id} + LIMIT 1 + `; + + expect(record?.id).toBe(workflowRun.id); + expect(record?.status).toBe("pending"); + } finally { + await pg.end(); + } + } finally { + await backend.stop(); + + const pg = newPostgresMaxOne(DEFAULT_POSTGRES_URL); + await dropSchema(pg, schema); + await pg.end(); + } + }); }); diff --git a/packages/openworkflow/postgres/backend.ts b/packages/openworkflow/postgres/backend.ts index ae84d410..359b5796 100644 --- a/packages/openworkflow/postgres/backend.ts +++ b/packages/openworkflow/postgres/backend.ts @@ -448,8 +448,10 @@ export class BackendPostgres implements Backend { async rescheduleWorkflowRunAfterFailedStepAttempt( params: RescheduleWorkflowRunAfterFailedStepAttemptParams, ): Promise { + const workflowRunsTable = this.workflowRunsTable(); + const [updated] = await this.pg` - UPDATE "openworkflow"."workflow_runs" + UPDATE ${workflowRunsTable} SET "status" = 'pending', "available_at" = ${params.availableAt},