Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/docs/docs/workflows.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@ The `run()` method returns a handle immediately. The actual workflow execution
happens in a worker process. This lets your application continue without waiting
for the workflow to complete.

### Scheduling a Workflow Run

You can schedule a workflow run for a specific time by passing `availableAt`:

```ts
const runAt = new Date("2026-02-05T15:00:00.000Z");
const handle = await sendWelcomeEmail.run(
{ userId: "user_123" },
{ availableAt: runAt },
);
Comment thread
jamescmartinez marked this conversation as resolved.
```

You can also pass a duration string using the same [duration
format](/docs/sleeping#duration-formats) as `step.sleep`:

```ts
const handle = await sendWelcomeEmail.run(
{ userId: "user_123" },
{ availableAt: "5m" },
);
```

The run stays `pending` until `availableAt` is reached, then workers can claim
it. If `availableAt` is in the past, the run is immediately available.

Comment thread
jamescmartinez marked this conversation as resolved.
## Waiting for Results

If you need to wait for a workflow to complete, use `.result()` on the handle:
Expand Down
5 changes: 5 additions & 0 deletions packages/openworkflow/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# openworkflow

## Unreleased

- Added support for scheduling workflow runs with a `Date` or duration string
See https://openworkflow.dev/docs/workflows#scheduling-a-workflow-run

## 0.6.3

- Export the full Backend interface for third-party backends
Expand Down
1 change: 1 addition & 0 deletions packages/openworkflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ For more details, check out our [docs](https://openworkflow.dev/docs).
- ✅ **Step memoization** - Never repeat completed work
- ✅ **Automatic retries** - Built-in exponential backoff
- ✅ **Long pauses** - Sleep for seconds or months
- ✅ **Scheduled runs** - Start workflows at a specific time
- ✅ **Parallel execution** - Run steps concurrently
- ✅ **No extra servers** - Uses your existing database
- ✅ **Dashboard included** - Monitor and debug workflows
Expand Down
54 changes: 54 additions & 0 deletions packages/openworkflow/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,60 @@ describe("OpenWorkflow", () => {
expect(handle.workflowRun.deadlineAt?.getTime()).toBe(deadline.getTime());
});

test("creates workflow run with availableAt", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });

const workflow = client.defineWorkflow(
{ name: "available-at-test" },
noopFn,
);
const availableAt = new Date(Date.now() + 60_000); // in 1 minute
const handle = await workflow.run({ value: 1 }, { availableAt });

expect(handle.workflowRun.availableAt).not.toBeNull();
expect(handle.workflowRun.availableAt?.getTime()).toBe(
availableAt.getTime(),
);
});

test("creates workflow run with availableAt duration", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });

const workflow = client.defineWorkflow(
{ name: "available-at-duration-test" },
noopFn,
);

const start = Date.now();
const handle = await workflow.run({ value: 1 }, { availableAt: "2s" });

expect(handle.workflowRun.availableAt).not.toBeNull();
if (!handle.workflowRun.availableAt) {
throw new Error("availableAt should be set");
}

const delayMs = handle.workflowRun.availableAt.getTime() - start;
expect(delayMs).toBeGreaterThanOrEqual(1900);
expect(delayMs).toBeLessThanOrEqual(10_000);
});

test("throws for invalid availableAt duration", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });

const workflow = client.defineWorkflow(
{ name: "available-at-invalid-test" },
noopFn,
);

await expect(
// @ts-expect-error - invalid duration format
workflow.run({ value: 1 }, { availableAt: "not-a-duration" }),
).rejects.toThrow('Invalid duration format: "not-a-duration"');
});

test("creates workflow run with version", async () => {
const backend = await createBackend();
const client = new OpenWorkflow({ backend });
Expand Down
30 changes: 29 additions & 1 deletion packages/openworkflow/client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import type { Backend } from "./backend.js";
import type { DurationString } from "./core/duration.js";
import type { StandardSchemaV1 } from "./core/schema.js";
import { calculateDateFromDuration } from "./core/step.js";
import type {
SchemaInput,
SchemaOutput,
Expand Down Expand Up @@ -106,7 +108,7 @@ export class OpenWorkflow {
config: {},
context: null,
input: parsedInput ?? null,
availableAt: null,
availableAt: resolveAvailableAt(options?.availableAt),
deadlineAt: options?.deadlineAt ?? null,
});

Expand Down Expand Up @@ -194,13 +196,39 @@ export class RunnableWorkflow<Input, Output, RunInput = Input> {
* `workflowDef.run()`.
*/
export interface WorkflowRunOptions {
/**
* Schedule the workflow run for a future time. When set, the run will stay
* pending until the timestamp is reached. Accepts an absolute Date or a
* duration string (e.g. "5m", "2 hours").
Comment thread
jamescmartinez marked this conversation as resolved.
*/
availableAt?: Date | DurationString;
/**
* Set a deadline for the workflow run. If the workflow exceeds this deadline,
* it will be marked as failed.
*/
deadlineAt?: Date;
}

/**
* Resolve availableAt to an absolute Date or null.
* @param availableAt - Absolute Date or duration string
* @returns Absolute Date or null
* @throws {Error} When a duration string is invalid
*/
function resolveAvailableAt(
availableAt: Date | DurationString | undefined,
): Date | null {
if (!availableAt) return null;
if (availableAt instanceof Date) return availableAt;

const result = calculateDateFromDuration(availableAt);
if (!result.ok) {
throw result.error;
}

return result.value;
}
Comment thread
jamescmartinez marked this conversation as resolved.

/**
* Options for WorkflowHandle.
*/
Expand Down
18 changes: 9 additions & 9 deletions packages/openworkflow/core/step.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
getCachedStepAttempt,
addToStepAttemptCache,
normalizeStepOutput,
calculateSleepResumeAt,
calculateDateFromDuration,
createSleepContext,
} from "./step.js";
import type { StepAttempt, StepAttemptCache } from "./step.js";
Expand Down Expand Up @@ -224,38 +224,38 @@ describe("normalizeStepOutput", () => {
});
});

describe("calculateSleepResumeAt", () => {
describe("calculateDateFromDuration", () => {
test("calculates resume time from duration string", () => {
const now = 1_000_000;
const result = calculateSleepResumeAt("5s", now);
const result = calculateDateFromDuration("5s", now);

expect(result).toEqual(ok(new Date(now + 5000)));
});

test("calculates resume time with milliseconds", () => {
const now = 1_000_000;
const result = calculateSleepResumeAt("500ms", now);
const result = calculateDateFromDuration("500ms", now);

expect(result).toEqual(ok(new Date(now + 500)));
});

test("calculates resume time with minutes", () => {
const now = 1_000_000;
const result = calculateSleepResumeAt("2m", now);
const result = calculateDateFromDuration("2m", now);

expect(result).toEqual(ok(new Date(now + 2 * 60 * 1000)));
});

test("calculates resume time with hours", () => {
const now = 1_000_000;
const result = calculateSleepResumeAt("1h", now);
const result = calculateDateFromDuration("1h", now);

expect(result).toEqual(ok(new Date(now + 60 * 60 * 1000)));
});

test("uses Date.now() when now is not provided", () => {
const before = Date.now();
const result = calculateSleepResumeAt("1s");
const result = calculateDateFromDuration("1s");
const after = Date.now();

expect(result.ok).toBe(true);
Expand All @@ -268,7 +268,7 @@ describe("calculateSleepResumeAt", () => {

test("returns error for invalid duration", () => {
// @ts-expect-error testing invalid input
const result = calculateSleepResumeAt("invalid");
const result = calculateDateFromDuration("invalid");

expect(result.ok).toBe(false);
if (!result.ok) {
Expand All @@ -278,7 +278,7 @@ describe("calculateSleepResumeAt", () => {

test("returns error for empty duration", () => {
// @ts-expect-error testing invalid input
const result = calculateSleepResumeAt("");
const result = calculateDateFromDuration("");

expect(result.ok).toBe(false);
});
Expand Down
6 changes: 3 additions & 3 deletions packages/openworkflow/core/step.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ export function normalizeStepOutput(result: unknown): JsonValue {
}

/**
* Calculate the resume time for a sleep step.
* @param duration - The duration string to sleep for
* Calculate a future time from a duration string.
* @param duration - The duration string to add
* @param now - The current timestamp (defaults to Date.now())
* @returns A Result containing the resume Date or an Error
Comment thread
jamescmartinez marked this conversation as resolved.
*/
export function calculateSleepResumeAt(
export function calculateDateFromDuration(
duration: DurationString,
now: number = Date.now(),
): Result<Date> {
Expand Down
4 changes: 2 additions & 2 deletions packages/openworkflow/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
getCachedStepAttempt,
addToStepAttemptCache,
normalizeStepOutput,
calculateSleepResumeAt,
calculateDateFromDuration,
createSleepContext,
} from "./core/step.js";
import type { WorkflowRun } from "./core/workflow.js";
Expand Down Expand Up @@ -160,7 +160,7 @@ class StepExecutor implements StepApi {
if (existingAttempt) return;

// create new step attempt for the sleep
const result = calculateSleepResumeAt(duration);
const result = calculateDateFromDuration(duration);
if (!result.ok) {
throw result.error;
}
Expand Down