Skip to content

Commit 902a285

Browse files
Add listWorkflowRuns op to Backend
1 parent 2a4235c commit 902a285

3 files changed

Lines changed: 142 additions & 1 deletion

File tree

packages/backend-postgres/backend.test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,84 @@ describe("BackendPostgres", () => {
8787
});
8888
});
8989

90+
describe("listWorkflowRuns()", () => {
91+
test("lists workflow runs ordered by creation time", async () => {
92+
const backend = await BackendPostgres.connect(DEFAULT_DATABASE_URL, {
93+
namespaceId: randomUUID(),
94+
});
95+
const first = await createPendingWorkflowRun(backend);
96+
await sleep(10); // ensure timestamp difference
97+
const second = await createPendingWorkflowRun(backend);
98+
99+
const listed = await backend.listWorkflowRuns({});
100+
expect(listed.data.map((run) => run.id)).toEqual([first.id, second.id]);
101+
await backend.stop();
102+
});
103+
104+
test("paginates workflow runs", async () => {
105+
const backend = await BackendPostgres.connect(DEFAULT_DATABASE_URL, {
106+
namespaceId: randomUUID(),
107+
});
108+
const runs: WorkflowRun[] = [];
109+
for (let i = 0; i < 5; i++) {
110+
runs.push(await createPendingWorkflowRun(backend));
111+
await sleep(10);
112+
}
113+
114+
// p1
115+
const page1 = await backend.listWorkflowRuns({ limit: 2 });
116+
expect(page1.data).toHaveLength(2);
117+
expect(page1.data[0]?.id).toBe(runs[0]?.id);
118+
expect(page1.data[1]?.id).toBe(runs[1]?.id);
119+
expect(page1.pagination.next).not.toBeNull();
120+
expect(page1.pagination.prev).toBeNull();
121+
122+
// p2
123+
const page2 = await backend.listWorkflowRuns({
124+
limit: 2,
125+
after: page1.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
126+
});
127+
expect(page2.data).toHaveLength(2);
128+
expect(page2.data[0]?.id).toBe(runs[2]?.id);
129+
expect(page2.data[1]?.id).toBe(runs[3]?.id);
130+
expect(page2.pagination.next).not.toBeNull();
131+
expect(page2.pagination.prev).not.toBeNull();
132+
133+
// p3
134+
const page3 = await backend.listWorkflowRuns({
135+
limit: 2,
136+
after: page2.pagination.next!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
137+
});
138+
expect(page3.data).toHaveLength(1);
139+
expect(page3.data[0]?.id).toBe(runs[4]?.id);
140+
expect(page3.pagination.next).toBeNull();
141+
expect(page3.pagination.prev).not.toBeNull();
142+
143+
// p2 again
144+
const page2Back = await backend.listWorkflowRuns({
145+
limit: 2,
146+
before: page3.pagination.prev!, // eslint-disable-line @typescript-eslint/no-non-null-assertion
147+
});
148+
expect(page2Back.data).toHaveLength(2);
149+
expect(page2Back.data[0]?.id).toBe(runs[2]?.id);
150+
expect(page2Back.data[1]?.id).toBe(runs[3]?.id);
151+
expect(page2Back.pagination.next).toEqual(page2.pagination.next);
152+
expect(page2Back.pagination.prev).toEqual(page2.pagination.prev);
153+
await backend.stop();
154+
});
155+
156+
test("handles empty results", async () => {
157+
const backend = await BackendPostgres.connect(DEFAULT_DATABASE_URL, {
158+
namespaceId: randomUUID(),
159+
});
160+
const listed = await backend.listWorkflowRuns({});
161+
expect(listed.data).toHaveLength(0);
162+
expect(listed.pagination.next).toBeNull();
163+
expect(listed.pagination.prev).toBeNull();
164+
await backend.stop();
165+
});
166+
});
167+
90168
describe("claimWorkflowRun()", () => {
91169
// because claims involve timing and leases, we create and teardown a new
92170
// namespaced backend instance for each test

packages/backend-postgres/backend.ts

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
GetWorkflowRunParams,
1717
HeartbeatWorkflowRunParams,
1818
ListStepAttemptsParams,
19+
ListWorkflowRunsParams,
1920
PaginatedResponse,
2021
MarkStepAttemptFailedParams,
2122
MarkStepAttemptSucceededParams,
@@ -28,6 +29,8 @@ import {
2829
JsonValue,
2930
} from "openworkflow";
3031

32+
export const DEFAULT_PAGINATION_PAGE_SIZE = 100;
33+
3134
interface BackendPostgresOptions {
3235
namespaceId?: string;
3336
runMigrations?: boolean;
@@ -132,6 +135,61 @@ export class BackendPostgres implements Backend {
132135
return workflowRun ?? null;
133136
}
134137

138+
async listWorkflowRuns(
139+
params: ListWorkflowRunsParams,
140+
): Promise<PaginatedResponse<WorkflowRun>> {
141+
const limit = params.limit ?? DEFAULT_PAGINATION_PAGE_SIZE;
142+
const { after, before } = params;
143+
144+
let cursor: Cursor | null = null;
145+
if (after) {
146+
cursor = decodeCursor(after);
147+
} else if (before) {
148+
cursor = decodeCursor(before);
149+
}
150+
151+
const whereClause = this.buildListWorkflowRunsWhere(params, cursor);
152+
const order = before
153+
? this.pg`ORDER BY "created_at" DESC, "id" DESC`
154+
: this.pg`ORDER BY "created_at" ASC, "id" ASC`;
155+
156+
const rows = await this.pg<WorkflowRun[]>`
157+
SELECT *
158+
FROM "openworkflow"."workflow_runs"
159+
WHERE ${whereClause}
160+
${order}
161+
LIMIT ${limit + 1}
162+
`;
163+
164+
return this.processPaginationResults(rows, limit, !!after, !!before);
165+
}
166+
167+
private buildListWorkflowRunsWhere(
168+
params: ListWorkflowRunsParams,
169+
cursor: Cursor | null,
170+
) {
171+
const { after } = params;
172+
const conditions = [this.pg`"namespace_id" = ${this.namespaceId}`];
173+
174+
if (cursor) {
175+
const op = after ? this.pg`>` : this.pg`<`;
176+
conditions.push(
177+
this.pg`("created_at", "id") ${op} (${cursor.createdAt}, ${cursor.id})`,
178+
);
179+
}
180+
181+
let whereClause = conditions[0];
182+
if (!whereClause) throw new Error("No conditions");
183+
184+
for (let i = 1; i < conditions.length; i++) {
185+
const condition = conditions[i];
186+
if (condition) {
187+
whereClause = this.pg`${whereClause} AND ${condition}`;
188+
}
189+
}
190+
return whereClause;
191+
}
192+
135193
async claimWorkflowRun(
136194
params: ClaimWorkflowRunParams,
137195
): Promise<WorkflowRun | null> {
@@ -416,7 +474,7 @@ export class BackendPostgres implements Backend {
416474
async listStepAttempts(
417475
params: ListStepAttemptsParams,
418476
): Promise<PaginatedResponse<StepAttempt>> {
419-
const limit = params.limit ?? 100;
477+
const limit = params.limit ?? DEFAULT_PAGINATION_PAGE_SIZE;
420478
const { after, before } = params;
421479

422480
let cursor: Cursor | null = null;

packages/openworkflow/backend.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ export interface Backend {
77
// Workflow Runs
88
createWorkflowRun(params: CreateWorkflowRunParams): Promise<WorkflowRun>;
99
getWorkflowRun(params: GetWorkflowRunParams): Promise<WorkflowRun | null>;
10+
listWorkflowRuns(
11+
params: ListWorkflowRunsParams,
12+
): Promise<PaginatedResponse<WorkflowRun>>;
1013
claimWorkflowRun(params: ClaimWorkflowRunParams): Promise<WorkflowRun | null>;
1114
heartbeatWorkflowRun(
1215
params: HeartbeatWorkflowRunParams,
@@ -49,6 +52,8 @@ export interface GetWorkflowRunParams {
4952
workflowRunId: string;
5053
}
5154

55+
export type ListWorkflowRunsParams = PaginationOptions;
56+
5257
export interface ClaimWorkflowRunParams {
5358
workerId: string;
5459
leaseDurationMs: number;

0 commit comments

Comments
 (0)