Durable execution for the Beyond platform. A small layer over the existing queue: workflow runs are FIFO message groups, steps are journal rows, timers are delayed messages, signals are conditional sends. Two new tables, one new pgrx function, three SDK verbs.
Status: design proposal. Not yet built.
Goals:
- Multi-step background work that survives crashes, restarts, and deploys.
- Idiomatic TypeScript SDK: `workflow({ name, run })` + `step.run("name", fn)`.
- Composes with the existing queue. No new transport, no new wire protocol, no new wakeup mechanism. Workflows are what the queue grows into when its messages start carrying journals.
- Forks with the rest of the platform. All state lives on the user's GlideFS volume, so `glide fork` carries it for free.
- The minimum effective surface. Roughly Cloudflare Workflows in scope.
Non-goals:
- Deterministic-replay sandboxing (Temporal).
- Fan-out, concurrency limits, throttle, workflow-level timeouts. All composable later if demand shows up. Not v1.
- Pause/resume, manual replay, run lists with rich filtering. Not v1.
- HTTP-push worker model. v1 is long-poll only.
- Cross-cluster live migration of in-flight runs.
Closest in scope to Cloudflare Workflows; closest in ergonomics to Inngest. Postgres-native like DBOS, but the storage layer is the queue we already have rather than a parallel runtime schema.
The wedge is what's unique to Beyond: workflow state lives on the same volume as the database, the queue carrying its continuations, and the boxes its steps run on. One CoW operation forks the whole world. None of the runtimes above can do this: Temporal Cloud, Inngest, and DBOS all sit outside the substrate.
Workflows are not a parallel system. They reuse queue primitives directly.
| Workflow concept | Queue primitive |
|---|---|
| Per-run ordering, single-flight | FIFO `message_group_id = run_id` |
| Resume after a step | Self-send to the workflow's queue |
| `step.sleep("name", duration)` | Self-send with `delay_seconds = duration` |
| `step.waitForEvent` timeout | Self-send with `delay_seconds = timeout` and a wait-marker payload |
| `step.waitForEvent` resolution | `signal()` deletes the wait message and writes the journal in one tx |
| At-least-once retry on crash | `vt` expiry + `read_ct` increment |
| Trigger from external event | Topic subscription (`POST /v1/topics/...`) |
| Trigger from anywhere | `send` to the workflow queue |
| Wake-up after send | Existing `XactCallback` + waiter registry |
Every workflow run is a FIFO message group on a queue named
__wf_{workflow_name}. Every step boundary is a queue message. Every
timer is delay_seconds. The queue does the work.
A workflow body is an async function. Inside it, each durable boundary is a named string call:
```ts
async run(ctx, input) {
  const user = await ctx.step.run("create-user", () => db.users.create(input));
  await ctx.step.sleep("welcome-delay", "5m");
  await ctx.step.run("send-welcome", () => email.send(user.id));
}
```

The runtime executes the body from the top on every resume. Each `step.*` call:
- Looks up its name in the run's journal (the `workflow_step` table).
- If a row exists, returns the cached output without invoking the callback.
- Otherwise, runs the callback; atomically writes the result, sends the next continuation, and acks the in-flight message; then exits the handler.
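The replay contract fits in a few lines when modeled in memory. The sketch below is a hypothetical model, not the SDK's replay engine: a `Map` stands in for the run's `workflow_step` rows, and `StepCompleted` is an illustrative sentinel the handler throws to exit after journaling one new step. In the real runtime that exit is where `workflow_complete_step` would commit the row, send the continuation, and ack the message.

```ts
// Hypothetical in-memory model of the replay contract, not the SDK's engine.
// The Map stands in for the run's workflow_step rows: step name -> cached output.
type Journal = Map<string, unknown>;

// Control-flow sentinel thrown after one new step is journaled, so the
// handler exits. The real runtime would call workflow_complete_step here.
class StepCompleted {
  constructor(readonly stepName: string) {}
}

interface Ctx {
  step: { run<T>(name: string, fn: () => T | Promise<T>): Promise<T> };
}

function makeCtx(journal: Journal): Ctx {
  return {
    step: {
      async run<T>(name: string, fn: () => T | Promise<T>): Promise<T> {
        // Replay: a journaled step returns its cached output; fn is not called.
        if (journal.has(name)) return journal.get(name) as T;
        // First execution: run the callback, journal its result, exit the handler.
        journal.set(name, await fn());
        throw new StepCompleted(name);
      },
    },
  };
}

// One handler invocation: replay from the top until the body either journals
// one new step ("continue") or returns ("done"). Other errors propagate.
async function invoke(
  body: (ctx: Ctx) => Promise<unknown>,
  journal: Journal,
): Promise<"continue" | "done"> {
  try {
    await body(makeCtx(journal));
    return "done";
  } catch (e) {
    if (e instanceof StepCompleted) return "continue";
    throw e;
  }
}

// Usage: the random value is produced once, then replayed from the journal.
const calls = { create: 0, send: 0 };
async function onboard(ctx: Ctx): Promise<void> {
  const id = await ctx.step.run("create-user", () => {
    calls.create++;
    return Math.random(); // nondeterminism is safe inside step.run
  });
  await ctx.step.run("send-welcome", () => {
    calls.send++;
    return id;
  });
}
```

Each invocation re-executes everything before the newest step, but every callback runs exactly once, which is why side effects and nondeterminism belong inside `step.run`.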
User-visible consequence: code outside step.run re-runs on every step
boundary. Side effects belong inside step.run. Same contract as
Cloudflare Workflows, Inngest, Restate.
- Crash inside a `step.run` callback: `vt` expires, the message is redelivered, and the callback re-runs. The callback must be idempotent.
- Crash after the callback returned, before the journal write: same; `vt` expires and the callback re-runs. The journal write and the continuation send happen in one Postgres transaction; they commit together or not at all.
- Crash after journal commit: the next continuation has been enqueued. Recovery is automatic on next worker pickup.
| Verb | Behavior |
|---|---|
| `step.run(name, fn, opts?)` | Run `fn`, journal its output. Cached on replay. `opts.retry` configures backoff. |
| `step.sleep(name, duration)` | Send a continuation with `delay_seconds = duration`. Journal `{slept_until}` so timing is replay-stable. |
| `step.waitForEvent(name, opts)` | Send a continuation with `delay_seconds = timeout` and a wait-marker payload. Suspends until `signal()` arrives or the timeout fires. |
Three verbs. Together they cover the workflow shapes Cloudflare and Vercel ship.
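`step.sleep` needs two values from a duration string such as `"5m"`: the `delay_seconds` for the continuation and a `slept_until` timestamp to journal. A sketch, assuming the SDK accepts an integer plus one unit suffix (`s`, `m`, `h`, `d`); the grammar and the names `parseDuration` and `planSleep` are hypothetical, not the SDK's API.

```ts
// Hypothetical duration grammar: integer + one of s/m/h/d, e.g. "30s", "5m", "7d".
const UNIT_SECONDS: Record<string, number> = { s: 1, m: 60, h: 3600, d: 86400 };

function parseDuration(input: string): number {
  const m = /^(\d+)([smhd])$/.exec(input.trim());
  if (!m) throw new Error(`invalid duration: ${input}`);
  return Number(m[1]) * UNIT_SECONDS[m[2]];
}

interface SleepPlan {
  delaySeconds: number; // delay_seconds on the continuation message
  sleptUntil: string; // journaled as {slept_until}; replays read it back
}

// Runs only on the first execution of a step.sleep. On replay the runtime
// returns the journaled slept_until instead of recomputing it from "now".
function planSleep(duration: string, now: Date): SleepPlan {
  const delaySeconds = parseDuration(duration);
  const sleptUntil = new Date(now.getTime() + delaySeconds * 1000).toISOString();
  return { delaySeconds, sleptUntil };
}
```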
```ts
await ctx.step.run("call-stripe", () => stripe.charges.create(...), {
  retry: {
    attempts: 5,
    backoff: { kind: "exp", base: "1s", max: "60s", jitter: 0.2 },
  },
});
```

On exception, the runtime calls `change_visibility` on the in-flight
message with the next backoff delay. The message reappears at the
deadline; the handler replays the body (every earlier step returns from
the journal), reaches the same step, and retries the callback. `read_ct`
is the attempt counter.
When `read_ct >= attempts`: the error is recorded in `workflow_run.error`,
the run terminates with status `failed`, and the in-flight message is deleted.
Backoff kinds: fixed, linear, exp. Default if retry is omitted: 3
attempts, exponential 1s→60s, 20% jitter.
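One way to turn a retry policy into a `change_visibility` delay, as a sketch. It assumes `read_ct` is the 1-based attempt number and that `base` and `max` are already converted to seconds; the formulas for `fixed`, `linear`, and `exp`, the symmetric jitter, and the helper names are plausible readings of the option names, not a spec.

```ts
type BackoffKind = "fixed" | "linear" | "exp";

interface RetryPolicy {
  attempts: number;
  kind: BackoffKind;
  baseSeconds: number;
  maxSeconds: number;
  jitter: number; // fraction: 0.2 means +/-20%
}

// Default from the text: 3 attempts, exponential 1s to 60s, 20% jitter.
const DEFAULT_RETRY: RetryPolicy = { attempts: 3, kind: "exp", baseSeconds: 1, maxSeconds: 60, jitter: 0.2 };

// Delay before retrying, given the 1-based attempt that just failed.
function backoffSeconds(p: RetryPolicy, attempt: number, random: () => number = Math.random): number {
  const raw =
    p.kind === "fixed" ? p.baseSeconds
    : p.kind === "linear" ? p.baseSeconds * attempt
    : p.baseSeconds * 2 ** (attempt - 1);
  // Symmetric jitter: scale by a factor in [1 - jitter, 1 + jitter), then cap.
  const factor = 1 - p.jitter + 2 * p.jitter * random();
  // Whole seconds, assuming change_visibility takes an integer delay.
  return Math.round(Math.min(raw * factor, p.maxSeconds));
}

type RetryDecision = { action: "retry"; delaySeconds: number } | { action: "fail" };

// read_ct is the attempt counter: once it reaches `attempts`, the run fails.
function onStepError(p: RetryPolicy, readCt: number, random?: () => number): RetryDecision {
  if (readCt >= p.attempts) return { action: "fail" };
  return { action: "retry", delaySeconds: backoffSeconds(p, readCt, random) };
}
```

Injecting `random` keeps the schedule testable; production code would use the default `Math.random`.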
Branching on prior step outputs is fine. Branching on Math.random(),
Date.now(), or fresh process.env reads outside step.run is not.
Documented, not enforced. Inngest and CF take the same stance and it has
not been a problem.
Two new tables in the queue schema. Both live in the user's Postgres,
on the user's volume — so they fork.
```sql
CREATE TYPE queue.workflow_status AS ENUM (
  'running', 'completed', 'failed', 'cancelled'
);

CREATE TABLE queue.workflow_run (
  run_id           BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY (CACHE 100),
  workflow_name    TEXT NOT NULL,
  workflow_version INT NOT NULL,
  status           queue.workflow_status NOT NULL,
  input            JSONB,
  output           JSONB,
  error            JSONB,
  idempotency_key  TEXT,
  current_msg_id   BIGINT,        -- in-flight queue msg
  retention_days   INT NOT NULL,  -- snapshotted at start
  started_at       TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
  completed_at     TIMESTAMP WITH TIME ZONE,
  UNIQUE (workflow_name, idempotency_key)
);

-- Sweeper queries terminal runs by completed_at; partial index keeps it
-- bounded to terminal rows.
CREATE INDEX workflow_run_retention_idx
  ON queue.workflow_run (completed_at)
  WHERE status IN ('completed', 'failed', 'cancelled');

-- Listing / observability — used by GET /runs filters.
CREATE INDEX workflow_run_listing_idx
  ON queue.workflow_run (workflow_name, status, started_at DESC);
```

One row per run. `current_msg_id` lets `signal()` find and delete the
in-flight wait message atomically. The source of truth for what to do
next is the queue message keyed by `message_group_id = run_id` in
`queue.q___wf_{workflow_name}`.
```sql
CREATE TABLE queue.workflow_step (
  run_id       BIGINT NOT NULL REFERENCES queue.workflow_run(run_id) ON DELETE CASCADE,
  step_name    TEXT NOT NULL,
  output       JSONB,
  started_at   TIMESTAMP WITH TIME ZONE NOT NULL,
  completed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(),
  PRIMARY KEY (run_id, step_name)
);
```

The journal. A step is either absent (not yet run) or present with its
cached output. Mid-attempt state is transient and lives in `read_ct` on
the queue message. Terminal failures end the run; the cause is recorded
in `workflow_run.error`, not on the step row, so the step table holds
only successful results. `started_at` and `completed_at` enable per-step
timing in observability tools.
That's it. No waiter table — step.waitForEvent is just a delayed queue
message that signal() deletes on resolve. No concurrency, throttle,
parallel-item, or schedule tables in v1.
Terminal runs (completed, failed, cancelled) are deleted by a
sweeper when completed_at + retention_days < now(). Journal rows
CASCADE.
Retention is a property of the workflow. The user owns the
definition; they choose. Each workflow_run snapshots retention_days
at start time so an in-flight run keeps its original retention even if
the definition changes mid-flight (same model as workflow_version).
```ts
export const onboardUser = workflow({
  name: "onboard-user",
  retention: "30d", // ← user's choice; default 30 days
  run: async (ctx, input) => { ... },
});

// Override for a specific run:
await client.start("onboard-user", { email: "..." }, { retentionDays: 90 });
```

Server-enforced ceiling. The operator sets a hard cap via
WORKFLOWS_MAX_RETENTION_DAYS (default 365). The server clamps both the
definition's retention and per-start overrides to min(requested, cap).
Bounds storage growth from a misconfigured user.
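Resolving the value snapshotted into `workflow_run.retention_days` might look like the sketch below. `resolveRetentionDays` is hypothetical, and it assumes the definition's `retention` is always a `<n>d` string while the per-start override is a number of days, as in the example above.

```ts
const DEFAULT_RETENTION_DAYS = 30;

// Parse a retention string such as "30d" into whole days (days only, by assumption).
function parseRetention(value: string): number {
  const m = /^(\d+)d$/.exec(value);
  if (!m || Number(m[1]) < 1) throw new Error(`invalid retention: ${value}`);
  return Number(m[1]);
}

// The per-start override wins over the definition; the operator cap
// (WORKFLOWS_MAX_RETENTION_DAYS, default 365) clamps whichever applies.
function resolveRetentionDays(
  definition: string | undefined,
  overrideDays: number | undefined,
  capDays = 365,
): number {
  const requested =
    overrideDays ?? (definition ? parseRetention(definition) : DEFAULT_RETENTION_DAYS);
  return Math.min(requested, capDays);
}
```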
In-flight runs are never swept. Status running is excluded from
the sweep regardless of age — a workflow that runs longer than its own
retention is fine.
Sweeper. Runs every WORKFLOWS_SWEEPER_INTERVAL_MS (default 5min)
in the API process alongside the existing HTTP delivery worker. Same
FOR UPDATE SKIP LOCKED pattern as ops/delivery.rs:
```sql
DELETE FROM queue.workflow_run
WHERE run_id IN (
  SELECT run_id FROM queue.workflow_run
  WHERE status IN ('completed', 'failed', 'cancelled')
    AND completed_at + make_interval(days => retention_days) < now()
  ORDER BY completed_at
  LIMIT 1000
  FOR UPDATE SKIP LOCKED
);
-- workflow_step CASCADEs.
```

Per-workflow retention via a `workflow_config` table is not v1. If
operators want overrides without changing user code, they get them
through the env var ceiling.
We don't ship a UI. The platform handles dashboards (glide CLI,
future Beyond dashboard) — building a React app inside this repo would
violate the substrate-composes-from-the-bottom principle. What we ship
is the data surface that any UI, tracer, or metrics backend can
render against.
Three layers:
The REST endpoints (list + filter, per-run journal with timings,
workflow index) give a dashboard everything it needs. Cursors are
`started_at`-based, matching `workflow_run_listing_idx`, so pagination
also covers running runs, whose `completed_at` is still null. Filter examples:
```
GET /v1/workflows/onboard-user/runs?status=failed&since=2026-05-01
GET /v1/workflows/onboard-user/runs/{id}/steps
GET /v1/workflows  →  [{name, running, completed, failed, cancelled}, ...]
```
JSONB filtering on input falls out of Postgres for free
(?input.userId=42 translates to input @> '{"userId": 42}') — useful
for "find the run for this customer."
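The `?input.<path>=<value>` convention can be turned into a containment document in a few lines. This sketch assumes two parsing rules the text does not pin down: dotted paths become nested objects, and each value is parsed as JSON when possible (so `42` is a number) and otherwise kept as a string. The result would be bound as a JSONB parameter to `input @> $1`; `inputFilter` is a hypothetical name.

```ts
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Parse a query-string value: JSON if it parses ("42", "true"), else a plain string.
function parseValue(raw: string): Json {
  try {
    return JSON.parse(raw) as Json;
  } catch {
    return raw;
  }
}

// Build the containment document for `input @> $1` from `input.*` query params.
function inputFilter(params: Record<string, string>): { [key: string]: Json } {
  const doc: { [key: string]: Json } = {};
  for (const [key, raw] of Object.entries(params)) {
    if (!key.startsWith("input.")) continue; // status, since, cursor, ... handled elsewhere
    const path = key.slice("input.".length).split(".");
    let node = doc;
    for (const segment of path.slice(0, -1)) {
      const next = node[segment];
      if (typeof next !== "object" || next === null || Array.isArray(next)) {
        node[segment] = {}; // create (or replace a scalar with) a nested object
      }
      node = node[segment] as { [key: string]: Json };
    }
    node[path[path.length - 1]] = parseValue(raw);
  }
  return doc;
}
```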
OTLP spans, naming convention:
| Span | Attributes |
|---|---|
| `workflow.run` | `workflow.name`, `workflow.version`, `run_id`, `status` |
| `workflow.step` | `workflow.name`, `run_id`, `step.name`, `step.kind` (run/sleep/wait), `step.attempt` |
| `workflow.signal` | `workflow.name`, `run_id`, `event` |
The queue already has OTLP_ENABLED + OTLP_ENDPOINT plumbing.
Workflows hook into the same exporter — no new config. A user pointing
Honeycomb / Tempo / Jaeger at the OTLP endpoint gets a flame graph per
run for free, with each step as a span and waits showing as gaps.
Exposed at the existing /metrics endpoint:
```
workflow_runs_started_total{workflow}
workflow_runs_completed_total{workflow, status}   # status: completed|failed|cancelled
workflow_runs_in_flight{workflow}                 # gauge
workflow_run_duration_seconds{workflow}           # histogram
workflow_step_duration_seconds{workflow, step}    # histogram
workflow_step_retries_total{workflow, step}
workflow_signals_total{workflow, event}
workflow_retention_swept_total                    # sweeper counter
```
Operators get Grafana dashboards and alerts (e.g.
rate(workflow_runs_completed_total{status="failed"}[5m]) > N)
without touching our code.
Out of scope:
- Replay UI: the data is in the journal; the replay endpoint is cut from v1 and lands when demand justifies it.
- Live log streaming per run: runs execute in the user's worker processes, and logs go to whatever they've configured. We don't aggregate.
- Visual workflow editor: workflows are code, not diagrams. A diagram view of a static workflow is a nice-to-have but not a primitive.
The runtime adds one new pgrx function. Everything else reuses what the queue already exposes.
The atomic transition: write the journal entry, send the continuation, ack the in-flight message. One transaction.
```sql
queue.workflow_complete_step(
  run_id      BIGINT,
  step_name   TEXT,
  output      JSONB,   -- null if step terminally failed
  error       JSONB,   -- non-null on terminal failure
  next_msg    JSONB,   -- continuation payload; null if run terminates here
  next_delay  INT,     -- delay_seconds for sleep/wait; 0 otherwise
  ack_msg_id  BIGINT   -- the receipt being completed
) RETURNS void
```

Inside:
- On success (`error IS NULL`): `INSERT INTO queue.workflow_step (run_id, step_name, output, ...)` with `ON CONFLICT DO NOTHING`. If a row already existed, the worker is a duplicate replay: discard `next_msg` and just delete `ack_msg_id`. A terminal failure writes no journal row; its error goes to `workflow_run`.
- If `next_msg IS NOT NULL`: `INSERT INTO queue.q___wf_{name}` with `vt = now() + next_delay`, `message_group_id = run_id`. Capture the new `msg_id`; `UPDATE workflow_run SET current_msg_id = $new_id`.
- If `next_msg IS NULL` (terminal): `UPDATE workflow_run SET status = ..., output = ..., error = ..., completed_at = now(), current_msg_id = NULL`.
- `DELETE FROM queue.q___wf_{name} WHERE msg_id = ack_msg_id`.
- `register_notify_after_commit('__wf_' || workflow_name)`.
Because workflow_step, workflow_run, and the queue tables live in the
same Postgres, this is one local transaction. No two-phase commit, no
compensating actions.
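The branch structure of `workflow_complete_step` is easier to check as a small model. This TypeScript sketch runs against hypothetical in-memory stand-ins for the three tables and assumes the whole body executes as one transaction. It also assumes, as the schema implies, that only successful steps get a journal row and a terminal error lands on `workflow_run`; `Db`, `completeStep`, and the field names are illustrative, not the extension's code.

```ts
// Hypothetical stand-ins for workflow_run, workflow_step, and q___wf_{name}.
type Status = "running" | "completed" | "failed" | "cancelled";
interface Run { status: Status; currentMsgId: number | null; output?: unknown; error?: unknown }
interface QueueMsg { groupId: number; body: unknown; visibleAtMs: number }

interface Db {
  runs: Map<number, Run>;
  steps: Map<string, unknown>; // key: `${runId}/${stepName}`
  queue: Map<number, QueueMsg>;
  nextMsgId: number;
  notifications: number; // stands in for register_notify_after_commit
}

interface CompleteStepArgs {
  runId: number;
  stepName: string;
  output: unknown; // null on terminal failure
  error: unknown; // non-null on terminal failure
  nextMsg: unknown; // null when the run terminates here
  nextDelaySeconds: number;
  ackMsgId: number;
}

// One call to workflow_complete_step; assume the body is a single transaction.
function completeStep(db: Db, a: CompleteStepArgs, nowMs: number): void {
  const run = db.runs.get(a.runId);
  if (!run) throw new Error(`unknown run ${a.runId}`);
  if (run.status !== "running") return; // cancelled or finished: no-op

  let duplicate = false;
  if (a.error == null) {
    // INSERT ... ON CONFLICT DO NOTHING: an existing row means a duplicate replay.
    const key = `${a.runId}/${a.stepName}`;
    duplicate = db.steps.has(key);
    if (!duplicate) db.steps.set(key, a.output);
  }

  if (!duplicate && a.nextMsg != null) {
    // Enqueue the continuation in the run's FIFO group and remember it.
    const id = db.nextMsgId++;
    db.queue.set(id, { groupId: a.runId, body: a.nextMsg, visibleAtMs: nowMs + a.nextDelaySeconds * 1000 });
    run.currentMsgId = id;
  } else if (!duplicate) {
    // Terminal: record the outcome on the run itself.
    run.status = a.error == null ? "completed" : "failed";
    run.output = a.output;
    run.error = a.error;
    run.currentMsgId = null;
  }

  db.queue.delete(a.ackMsgId); // ack the receipt being completed
  db.notifications++;
}
```

A late duplicate (for example, a second worker that picked up the same message after `vt` expired) only acks; it never enqueues a second continuation.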
The rest is existing queue surface plus one thin SQL wrapper:
- `queue.send_fifo`: starting a run is an INSERT into `workflow_run` plus a send of the first message to FIFO group `run_id`. An SDK helper, not a new pgrx fn.
- `queue.workflow_signal`: a thin SQL wrapper that deletes the message referenced by `workflow_run.current_msg_id` from the queue table and calls `workflow_complete_step` with the signal payload as the journaled output. PL/pgSQL is fine; it's not a hot path.
- `queue.receive_fifo`: workers poll the workflow's queue. The FIFO eligibility predicate already enforces single-flight per run.
- `queue.change_visibility`: used for retry backoff.
- `queue.delete`: cancel a run.
The queue's existing waiter registry wakes workflow handlers exactly the way it wakes ordinary consumers. We don't add a new wakeup path.
```
POST   /v1/workflows/{name}/runs                                Start a run. Body: {input, version?, idempotencyKey?, retentionDays?}
GET    /v1/workflows/{name}/runs                                List + filter. Query: status, since, until, cursor, limit
GET    /v1/workflows/{name}/runs/{run_id}                       Run status + output
DELETE /v1/workflows/{name}/runs/{run_id}                       Cancel run
POST   /v1/workflows/{name}/runs/{run_id}/signals/{event}       Send signal. Body: payload
GET    /v1/workflows/{name}/runs/{run_id}/steps                 Journal with timings
GET    /v1/workflows                                            List distinct workflow names + run counts (sidebar/index)
```

Seven endpoints. No `POST /v1/workflows`: workflow definitions live in
user code. The server learns about a workflow the first time a run is
started for it; the queue (__wf_{name}) is created lazily.
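A thin client over these routes, sketched with an injectable `fetch`-like function so it runs without a server. The paths and request bodies come from the table above; the response shape, the `Bearer` scheme, and the names `restClient` and `FetchLike` are assumptions (the server only checks that `Authorization` is present).

```ts
// Hypothetical thin REST client; response shapes are assumptions.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body?: string },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

interface StartOptions { version?: number; idempotencyKey?: string; retentionDays?: number }

function restClient(baseUrl: string, token: string, fetchImpl: FetchLike) {
  async function call(method: string, path: string, body?: unknown): Promise<unknown> {
    const res = await fetchImpl(`${baseUrl}${path}`, {
      method,
      // Presence of Authorization is required; its contents are not verified.
      headers: { authorization: `Bearer ${token}`, "content-type": "application/json" },
      body: body === undefined ? undefined : JSON.stringify(body),
    });
    if (!res.ok) throw new Error(`${method} ${path} failed with ${res.status}`);
    return res.json();
  }
  const runs = (name: string) => `/v1/workflows/${encodeURIComponent(name)}/runs`;
  return {
    start: (name: string, input: unknown, opts: StartOptions = {}) =>
      call("POST", runs(name), { input, ...opts }),
    signal: (name: string, runId: number, event: string, payload: unknown) =>
      call("POST", `${runs(name)}/${runId}/signals/${encodeURIComponent(event)}`, payload),
    cancel: (name: string, runId: number) => call("DELETE", `${runs(name)}/${runId}`),
  };
}
```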
Triggering from outside HTTP:
- From a topic subscription: subscribe a workflow to a topic via the existing `POST /v1/topics/{pattern}/subscriptions` with `{type: "workflow", name}`. This is a new target type for the existing fan-out mechanism: one extra branch in `ops/topic.rs`.
```ts
import { createWorkflowClient } from "@beyond.dev/workflows";
import { workflow } from "@beyond.dev/workflows/runtime";

// `db` and `email` stand in for the application's own modules.

// 1. Define a workflow.
export const onboardUser = workflow({
  name: "onboard-user",
  version: 1,
  retention: "30d",
  run: async (ctx, input: { email: string }) => {
    const user = await ctx.step.run(
      "create-user",
      () => db.users.create(input),
    );
    await ctx.step.sleep("welcome-delay", "5m");
    await ctx.step.run("send-welcome", () => email.send(user.id), {
      retry: { attempts: 5, backoff: { kind: "exp", base: "2s" } },
    });
    const verified = await ctx.step.waitForEvent("verify", {
      match: { userId: user.id },
      timeout: "7d",
    });
    if (!verified) {
      await ctx.step.run("nudge", () => email.nudge(user.id));
    }
    return user.id;
  },
});

// 2. Run a worker that polls the workflow's queue.
export const worker = onboardUser.serve({ url: process.env.QUEUE_URL });
// Inside: long-polls receive_fifo on __wf_onboard-user, replays journal,
// runs missing steps, calls workflow_complete_step.

// 3. Trigger from anywhere.
const client = createWorkflowClient({ url: process.env.QUEUE_URL });
const { runId } = await client.start("onboard-user", { email: "a@b.c" });

// Idempotent: the same key returns the existing runId.
const { runId: same } = await client.start(
  "onboard-user",
  { email: "a@b.c" },
  { idempotencyKey: "user-42-onboard" },
);

await client.signal(runId, "verify", { userId: 42 });
const { status, output } = await client.runs.get("onboard-user", runId);
```

`step.waitForEvent` accepts a `match` object: a JSONB superset query.
This falls out of the queue's existing conditional-receive primitive
(message @> $matcher) and is strictly more expressive than a string
event-type filter:
```ts
// CF / Inngest: match by event name only
ctx.step.waitForEvent("approved", { type: "manager.approved" });

// Ours: match by any subset of the payload
ctx.step.waitForEvent("approved", {
  match: { kind: "manager.approved", managerId: 42 },
});
```

The `signal()` payload is matched against `match` via `payload @> match`.
The first waiter whose `match` is contained in the payload wins.
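For readers less familiar with `@>`: a payload contains a matcher when every key in the matcher is present in the payload with a containing value, recursively, and every element of a matcher array is contained in some element of the payload array. Below is a TypeScript approximation of Postgres JSONB containment, handy for unit-testing matchers client-side. It deliberately omits one Postgres special case (a top-level array containing a bare scalar), and `contains` is a hypothetical helper, not part of the SDK.

```ts
type Json = null | boolean | number | string | Json[] | { [key: string]: Json };

// Approximates Postgres `left @> right` for jsonb values.
function contains(left: Json, right: Json): boolean {
  if (Array.isArray(right)) {
    // Every element on the right must be contained in some element on the left.
    if (!Array.isArray(left)) return false;
    const candidates: Json[] = left;
    return right.every((r) => candidates.some((l) => contains(l, r)));
  }
  if (right !== null && typeof right === "object") {
    // Objects: each key on the right must exist on the left with a containing value.
    if (left === null || typeof left !== "object" || Array.isArray(left)) return false;
    const obj: { [key: string]: Json } = left;
    return Object.entries(right).every(
      ([k, v]) => Object.prototype.hasOwnProperty.call(obj, k) && contains(obj[k], v),
    );
  }
  // Scalars (and null) must be equal.
  return left === right;
}
```

Picking the winning waiter is then a linear search, e.g. `waiters.find((w) => contains(payload, w.match))`.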
`workflow.serve()` returns a long-running worker that:
- Calls `receive_fifo` on `__wf_{name}` with a 30s wait.
- On message: loads the run's journal, replays the body, runs the next missing step, and calls `workflow_complete_step`.
- On exception inside a `step.run`: calls `change_visibility` with the backoff delay and exits the handler.
- Loops.
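The poll loop can be sketched with the queue calls injected as functions, so it runs without a server. `QueueOps`, `Message`, `serveLoop`, and the handler signature are hypothetical stand-ins for `receive_fifo`, `change_visibility`, and the replay engine; the real SDK's names and signatures may differ, and the exhausted-retries path is left out.

```ts
interface Message { msgId: number; runId: number; readCt: number; body: unknown }

interface QueueOps {
  receiveFifo(queue: string, waitSeconds: number): Promise<Message | null>;
  changeVisibility(queue: string, msgId: number, delaySeconds: number): Promise<void>;
}

// Replays the journal and runs the next missing step. Resolves once
// workflow_complete_step has committed; rejects if a step.run callback throws.
type Handler = (msg: Message) => Promise<void>;

async function serveLoop(
  name: string,
  ops: QueueOps,
  handle: Handler,
  retryDelaySeconds: (readCt: number) => number,
  shouldStop: () => boolean,
): Promise<void> {
  const queue = `__wf_${name}`;
  while (!shouldStop()) {
    const msg = await ops.receiveFifo(queue, 30); // long-poll with a 30s wait
    if (!msg) continue; // poll timed out with nothing eligible
    try {
      await handle(msg);
    } catch {
      // A step callback threw: hide the message until the backoff deadline.
      // (Exhausted retries would instead fail the run; omitted here.)
      await ops.changeVisibility(queue, msg.msgId, retryDelaySeconds(msg.readCt));
    }
  }
}
```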
On Beyond, this is a long-running box. On serverless platforms a sidecar worker (separate persistent process) runs the poll loop — same pattern as self-hosted Inngest workers. There is no HTTP-push delivery model; continuations live in the queue and pull-based workers consume them.
The runtime passes ctx.signal: AbortSignal into every step. User code
that wants to be interruptible threads it into its async work:
```ts
await ctx.step.run("slow-call", () => fetch(url, { signal: ctx.signal }));
```

When `client.runs.cancel()` is called, the in-flight message is deleted
and the run is flagged `cancelled`. Any worker currently inside a step
callback sees `ctx.signal.aborted === true` and may abort cleanly. Code
that ignores `ctx.signal` finishes its current work; `workflow_complete_step`
then becomes a no-op because the run is no longer `running`. This is the
soft-cancel contract common to other workflow runtimes.
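How `ctx.signal` might reach a step callback, as a sketch: the runtime owns one `AbortController` per handler invocation and aborts it when it learns the run was cancelled. The text does not say how the worker learns about cancellation, so `cancel()` here is just a direct call; `Invocation`, `runStep`, and `sleepMs` (a cancellable stand-in for `fetch(url, { signal })`) are hypothetical names.

```ts
// One AbortController per handler invocation (hypothetical runtime internals).
class Invocation {
  private readonly controller = new AbortController();
  readonly signal: AbortSignal = this.controller.signal;

  // Called when the runtime observes that the run was cancelled.
  cancel(): void {
    this.controller.abort();
  }

  // Runs a step callback with the invocation's signal. A callback that ignores
  // the signal simply finishes; the later workflow_complete_step is a no-op.
  runStep<T>(fn: (signal: AbortSignal) => Promise<T>): Promise<T> {
    return fn(this.signal);
  }
}

// A cancellable wait that rejects as soon as the signal aborts.
function sleepMs(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise((resolve, reject) => {
    if (signal.aborted) return reject(new Error("aborted"));
    const timer = setTimeout(resolve, ms);
    signal.addEventListener(
      "abort",
      () => {
        clearTimeout(timer);
        reject(new Error("aborted"));
      },
      { once: true },
    );
  });
}
```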
All workflow state — the run rows, the journal, the in-flight queue
messages with their vt deadlines — lives in the user's Postgres on
their GlideFS volume. glide fork does a CoW of the volume; every byte
comes across at the same logical timestamp.
A run that was waiting on a 5-minute step.sleep in production is still
waiting in the fork, with the same deadline. When that wall clock is
reached, a worker on the fork picks up the continuation and runs the
next step against the fork's data. Production proceeds independently.
Same for step.waitForEvent — the wait message is on the volume, signals
sent to the fork resolve the fork's run, signals to production resolve
production's. Branches diverge.
Side-effect isolation (sandbox API keys, separate network egress, fork not running workers for certain workflows) is the substrate's job. The workflow runtime has no fork-aware code path.
workflow_run.workflow_version is set at start time. New runs use the
latest registered version. Old runs keep running against the version they
started with. The SDK refuses to handle a continuation for a version it
doesn't recognize:
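```ts
class WorkflowVersionMismatch extends Error {
  constructor(readonly expected: number, readonly got: number) {
    super(`workflow version mismatch: expected ${expected}, got ${got}`);
    this.name = "WorkflowVersionMismatch";
  }
}
```

Two binaries run side-by-side during a deploy until in-flight runs of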
version N drain. On Beyond's box-per-app model, the orchestrator runs both
versions and routes by workflow_version.
Same network model as the queue. Workflows don't add a security layer.
- Internal service. The operator's proxy is the perimeter. `Authorization` header presence is required; its contents are not verified.
- Workflow names: `[a-z0-9_]`, 1–43 chars. The queue name `__wf_{name}` must itself be a valid queue name (at most 48 chars), and the `__wf_` prefix uses 5 of them.
- Step names: `[a-zA-Z0-9._-]{1,128}`, enforced at the SDK level. The server rejects names over 256 bytes to bound journal size.
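The naming rules translate directly into validators. A sketch: the 48-character queue-name limit, the `__wf_` prefix, the step-name pattern, and the 256-byte bound come from the text, while the function names are hypothetical. The workflow-name bound is derived from the queue limit and prefix rather than hard-coded.

```ts
const MAX_QUEUE_NAME = 48;
const WORKFLOW_PREFIX = "__wf_";
const MAX_WORKFLOW_NAME = MAX_QUEUE_NAME - WORKFLOW_PREFIX.length; // 43

const WORKFLOW_NAME = new RegExp(`^[a-z0-9_]{1,${MAX_WORKFLOW_NAME}}$`);
const STEP_NAME = /^[a-zA-Z0-9._-]{1,128}$/; // SDK-side rule
const MAX_STEP_NAME_BYTES = 256; // server-side bound on journal keys

// Maps a workflow name to its backing queue, rejecting invalid names.
function queueNameFor(workflow: string): string {
  if (!WORKFLOW_NAME.test(workflow)) throw new Error(`invalid workflow name: ${workflow}`);
  return WORKFLOW_PREFIX + workflow;
}

function sdkAcceptsStepName(name: string): boolean {
  return STEP_NAME.test(name);
}

// The server checks encoded size, not characters.
function serverAcceptsStepName(name: string): boolean {
  return new TextEncoder().encode(name).length <= MAX_STEP_NAME_BYTES;
}
```

Under this pattern a hyphenated name such as `onboard-user` is rejected, since `-` is not in `[a-z0-9_]`.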
| Failure | What happens | Recovery |
|---|---|---|
| Worker crashes mid-step | The continuation message's `vt` expires; another worker picks it up, replays the journal, and reruns the failed step. | Automatic. The step callback must be idempotent. |
| Worker crashes after journal commit | No partial state: the journal write and the continuation send were one Postgres tx, so the next continuation is already enqueued. | Automatic on next worker pickup. |
| `step.run` exhausts retries | Error recorded in `workflow_run.error`; `workflow_run.status = 'failed'`; in-flight message deleted. | Operator inspects via `GET /runs/{id}` and `GET /runs/{id}/steps`; starts a new run. |
| Workflow throws an unhandled error outside `step.run` | Treated as a permanent body failure: run marked `failed` with the error. | Same as above. |
| `step.waitForEvent` times out | The wait message's `vt` reaches now; a worker receives it, sees the wait marker, journals `{timeout: true}`, and continues. | The workflow body branches on the result. |
| Signal arrives before the wait is registered | Dropped. The SDK does not buffer early signals in v1. | Caller code orders the signal after observing the run. |
| Two workers grab the same continuation | Not concurrently: `receive_fifo` enforces single-flight per group while the message is in flight. A late duplicate after `vt` expiry is absorbed by `ON CONFLICT DO NOTHING` in `workflow_complete_step`. | n/a |
| Database fails over mid-step | The sqlx connection drops; `vt` expires; redelivery on a fresh connection. | Automatic. |
| Idempotent start collision | The existing `run_id` is returned with its current status. No duplicate row, no second continuation. | n/a |
| Workflow definition removed but runs in flight | A worker sees a continuation for an unknown workflow and logs an error; `vt` expires repeatedly until `read_ct` exceeds a threshold and the worker fails the run. | Restore the definition or `DELETE` the run. |
- Start a run: 1 INSERT (`workflow_run`) + 1 INSERT (queue) + 1 NOTIFY.
- Resume a run: 1 `receive_fifo` + body re-execution + 1 `workflow_complete_step` (1 INSERT, 1 INSERT, 1 UPDATE, 1 DELETE, 1 NOTIFY).
- Body re-execution cost: linear in journal size. The SDK fetches the whole journal once at the start of each handler invocation. For runs 100 steps, the journal lookup is by the `(run_id, step_name)` PK and is in-memory after the first fetch.
- Sleep accuracy: bounded by `receive_fifo` latency. Existing waiter registry latency is sub-millisecond.
- Fork cost: O(1). Whatever GlideFS gives us.
Workflow code joins the existing beyond-queue crate workspace. No new
binary; the queue server gains workflow routes, the extension gains one
new pgrx function.
```
queue/
  src/
    ops/
      workflow.rs          # start, signal, complete_step, get, cancel
    routes/
      workflows.rs         # /v1/workflows/...
  beyond-queue-extension/
    src/
      workflow.rs          # pgrx: workflow_complete_step
    sql/
      schema.sql           # + workflow_status, workflow_run, workflow_step, workflow_signal()
  sdk/ts/
    workflows/
      package.json         # @beyond.dev/workflows
      src/
        client.ts          # createWorkflowClient: start/signal/runs.get/cancel
        runtime.ts         # workflow(), step.run/sleep/waitForEvent, replay engine
        worker.ts          # serve(): long-poll loop
```
The SDK ships two entry points within one package:
```ts
// trigger only: light
import { createWorkflowClient } from "@beyond.dev/workflows";

// define + serve: pulls in the replay engine
import { workflow } from "@beyond.dev/workflows/runtime";
```

Topic subscriptions already exist (`POST /v1/topics/{pattern}/subscriptions`).
Workflows become a third subscription target type alongside queue and
http/https:
```ts
await events.subscriptions.create("user.signup", {
  type: "workflow",
  name: "onboard-user",
});
```

Server-side, the existing `queue.send_topic` fan-out gains a new branch:
when a subscription has target_type = 'workflow', instead of queue.send
it calls the workflow start path. The publish from the user's perspective
is unchanged.
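The new branch is a one-case extension of the existing dispatch. A TypeScript sketch of its shape only: the real fan-out lives in `ops/topic.rs`, and `Subscription`, `FanOutDeps`, and `fanOut` are hypothetical. It also assumes the published payload becomes the run's `input`, which the text implies but does not state.

```ts
// Hypothetical shapes; the real fan-out lives in ops/topic.rs.
type Subscription =
  | { targetType: "queue"; queue: string }
  | { targetType: "http" | "https"; url: string }
  | { targetType: "workflow"; name: string };

interface FanOutDeps {
  send(queue: string, body: unknown): Promise<void>;
  scheduleHttp(url: string, body: unknown): Promise<void>;
  startWorkflow(name: string, input: unknown): Promise<void>;
}

// One publish, three behaviors: the "workflow" case is the only new branch.
async function fanOut(subs: Subscription[], body: unknown, deps: FanOutDeps): Promise<void> {
  for (const sub of subs) {
    switch (sub.targetType) {
      case "queue":
        await deps.send(sub.queue, body);
        break;
      case "http":
      case "https":
        await deps.scheduleHttp(sub.url, body);
        break;
      case "workflow":
        // Assumption: the published payload becomes the run's input.
        await deps.startWorkflow(sub.name, body);
        break;
    }
  }
}
```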
This is the small composition that earns the platform line. Publish a domain event — it can hit a queue (decoupled async work), an HTTP endpoint (webhook), or kick off a durable, multi-step, forkable workflow. Same publish, three behaviors, one storage layer.
We add one pgrx function, two tables, one sweeper task, one SDK
package. Three SDK verbs (run, sleep, waitForEvent), four client
methods (start, signal, runs.get, runs.cancel), two operator env
vars (retention ceiling, sweeper interval).
Everything else is the queue. Workflows are not a separate service we built next to the queue — they are what the queue grows into when its messages start carrying durable journals.