
Commit 6afc9bf

d-cs and claude authored
fix(run-engine): retry getSnapshotsSince on the replica then primary when the read replica lags (#3889)
## Summary

When `RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED` is on, `RunEngine.getSnapshotsSince` reads from the read replica. During write spikes the replica can briefly lag, so the snapshot id a runner just learned from the writer isn't visible there yet: the lookup threw, the worker route returned a 500, and the runner waited for its next poll, turning sub-second snapshot notifications into poll-interval latency exactly when things are busiest.

This PR makes the flag safe to enable: a replica miss of the since snapshot gets one jittered retry on the replica (most lag windows are shorter than the ~50–200ms wait, so the writer is never touched), then falls back to the primary, observed via a new `run_engine.snapshots_since.replica_miss` counter with an `outcome` attribute (`replica_retry` vs `primary`). Only genuine misses (absent on the primary too) remain errors.

## Design

- `getExecutionSnapshotsSince` now throws a typed `ExecutionSnapshotNotFoundError` so the engine can distinguish the expected lag miss from real failures. The message string is unchanged and the error never leaves the engine.
- The recovery path only engages when the flag is on, a distinct replica client is configured, and no transaction client was passed. With the flag off, the path is behaviorally identical to before.
- Retry delay bounds are configurable (`RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS`/`MAX_MS`, default 50/200; `MAX_MS=0` skips the replica retry and goes straight to the primary).
- The warn log fires only when the primary serves the read (the writer spill is the operationally interesting event); replica-retry recoveries are counted but quiet. A permanently-missing snapshot id stays an error-level failure with a `failedDuring` field, so lag metrics aren't polluted by bogus ids.
- Stale-tail lag (replica has the since snapshot but not newer rows) deliberately still returns the replica's view; the next poll catches up.
- The since-snapshot anchor lookup is now scoped to the polled run (`where: { id, runId }`), so a snapshot id from a different run raises not-found instead of silently anchoring a too-wide window of the run's snapshots.

## Test plan

All vitest + testcontainers, no mocks. A new `schemaOnlyPrisma` fixture (migrated-but-empty clone database) simulates a replica that hasn't caught up, and a real in-memory OTel meter pins the counter semantics per outcome.

- [x] Replica catches up during the jittered retry window → served by the replica, `outcome=replica_retry` = 1, primary never consulted
- [x] Replica permanently missing the since snapshot → served by the primary, `outcome=primary` = 1
- [x] Snapshot missing on both replica and primary → null, counter = 0
- [x] Replica has the since snapshot but lags by one → the replica's view is served, no fallback (verified discriminating power: the test fails if reads secretly hit the primary)
- [x] Flag off with a replica configured → primary serves the read
- [x] Transaction client provided → bypasses the replica entirely
- [x] Since snapshot belonging to a different run → null
- [x] Existing getSnapshotsSince + waitpoints suites green; run-engine, testcontainers, and webapp typechecks pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
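
The retry delay bounds described in the commit message can be illustrated in isolation. The following is a minimal sketch, not the engine code itself: `RetryDelay`, `normalizeRetryDelay`, and `pickRetryDelayMs` are hypothetical names, and the injectable `random` parameter is an assumption added only to make the jitter deterministic in tests.

```typescript
// Hypothetical shape for the configured bounds (names are illustrative).
interface RetryDelay {
  minMs: number;
  maxMs: number;
}

// Clamp minMs to be non-negative and no larger than maxMs when a retry is enabled.
// maxMs is left untouched so that maxMs <= 0 keeps meaning "skip the replica retry".
function normalizeRetryDelay(input: RetryDelay): RetryDelay {
  const minMs = Math.max(0, input.minMs);
  return {
    minMs: input.maxMs > 0 ? Math.min(minMs, input.maxMs) : minMs,
    maxMs: input.maxMs,
  };
}

// Returns null when the replica retry is disabled, otherwise a jittered delay
// that starts at minMs and stays below maxMs (or equals minMs when they match).
function pickRetryDelayMs(
  delay: RetryDelay,
  random: () => number = Math.random
): number | null {
  if (delay.maxMs <= 0) return null;
  return delay.minMs + random() * Math.max(0, delay.maxMs - delay.minMs);
}
```

With the default bounds of 50 and 200, the jittered wait lands somewhere in that window; a `maxMs` of 0 skips the wait and the replica retry altogether.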
1 parent 081b6ba commit 6afc9bf

12 files changed

Lines changed: 1005 additions & 26 deletions


Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: improvement
+---
+
+Run snapshot polling no longer errors or pays extra latency when the database read replica hasn't yet replicated the snapshot the runner is polling from (`RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED`): the read is briefly retried on the replica and served from the primary if it still hasn't caught up. Polling also now rejects a since-snapshot id that doesn't belong to the run being polled.

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
@@ -950,6 +950,8 @@ const EnvironmentSchema = z
       .default("info"),
     RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM: z.string().default("0"),
     RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED: z.string().default("0"),
+    RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS: z.coerce.number().int().default(50),
+    RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS: z.coerce.number().int().default(200),
     RUN_ENGINE_DEBOUNCE_USE_REPLICA_FOR_FAST_PATH_READ: z.string().default("0"),
 
     /** How long should the presence ttl last */
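
To show how the three variables relate, here is a hand-rolled sketch that approximates the schema entries above without using zod. `coerceInt` and `readRetryEnv` are hypothetical helpers; the real schema's coercion and error reporting may differ in edge cases.

```typescript
// Approximation only: unset values fall back to the default, anything else is
// passed through Number() and must be an integer.
function coerceInt(raw: string | undefined, fallback: number, name: string): number {
  if (raw === undefined) return fallback;
  const value = Number(raw);
  if (!Number.isInteger(value)) {
    throw new Error(`${name} must be an integer, got "${raw}"`);
  }
  return value;
}

// Hypothetical reader for the flag and the retry window.
function readRetryEnv(env: Record<string, string | undefined>) {
  return {
    enabled: (env.RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED ?? "0") === "1",
    minMs: coerceInt(
      env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS,
      50,
      "RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS"
    ),
    maxMs: coerceInt(
      env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS,
      200,
      "RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS"
    ),
  };
}
```

Setting only `RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED=1` would leave the retry window at its 50/200 defaults.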

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 4 additions & 0 deletions
@@ -22,6 +22,10 @@ function createRunEngine() {
         env.RUN_ENGINE_TREAT_PRODUCTION_EXECUTION_STALLS_AS_OOM === "1",
       readReplicaSnapshotsSinceEnabled:
         env.RUN_ENGINE_READ_REPLICA_SNAPSHOTS_SINCE_ENABLED === "1",
+      readReplicaSnapshotsSinceRetryDelay: {
+        minMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MIN_MS,
+        maxMs: env.RUN_ENGINE_SNAPSHOTS_SINCE_REPLICA_RETRY_MAX_MS,
+      },
       worker: {
         disabled: env.RUN_ENGINE_WORKER_ENABLED === "0",
         workers: env.RUN_ENGINE_WORKER_COUNT,

internal-packages/run-engine/package.json

Lines changed: 1 addition & 0 deletions
@@ -35,6 +35,7 @@
   },
   "devDependencies": {
     "@internal/testcontainers": "workspace:*",
+    "@opentelemetry/sdk-metrics": "2.7.1",
     "@types/seedrandom": "^3.0.8",
     "rimraf": "6.0.1"
   },

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 7 additions & 0 deletions
@@ -104,3 +104,10 @@ export class RunOneTimeUseTokenError extends Error {
     this.name = "RunOneTimeUseTokenError";
   }
 }
+
+export class ExecutionSnapshotNotFoundError extends Error {
+  constructor(public readonly snapshotId: string) {
+    super(`No execution snapshot found for id ${snapshotId}`);
+    this.name = "ExecutionSnapshotNotFoundError";
+  }
+}
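
The point of the typed error is that callers can branch on `instanceof` rather than matching the message string. A minimal sketch of that distinction follows; the class is restated with an explicit field so the snippet is self-contained, and `isExpectedLagMiss` is a hypothetical helper, not part of the commit.

```typescript
// Restated locally for a self-contained example; same message format as the diff.
class ExecutionSnapshotNotFoundError extends Error {
  readonly snapshotId: string;

  constructor(snapshotId: string) {
    super(`No execution snapshot found for id ${snapshotId}`);
    this.name = "ExecutionSnapshotNotFoundError";
    this.snapshotId = snapshotId;
  }
}

// Hypothetical helper: only a typed not-found is treated as a possible lag miss.
// Any other failure (connection error, timeout, ...) is not eligible for fallback.
function isExpectedLagMiss(error: unknown): error is ExecutionSnapshotNotFoundError {
  return error instanceof ExecutionSnapshotNotFoundError;
}
```

A plain `Error` carrying the same message text would not pass this check, which is what keeps unrelated replica failures from shifting read load to the writer.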

internal-packages/run-engine/src/engine/index.ts

Lines changed: 86 additions & 6 deletions
@@ -1,5 +1,5 @@
 import { createRedisClient, Redis } from "@internal/redis";
-import { getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
+import { type Counter, getMeter, Meter, startSpan, trace, Tracer } from "@internal/tracing";
 import { Logger } from "@trigger.dev/core/logger";
 import {
   CheckpointInput,
@@ -33,6 +33,7 @@ import {
 import { Worker } from "@trigger.dev/redis-worker";
 import { assertNever } from "assert-never";
 import { EventEmitter } from "node:events";
+import { setTimeout } from "node:timers/promises";
 import { BatchQueue } from "../batch-queue/index.js";
 import type {
   BatchItem,
@@ -46,7 +47,12 @@ import { RunQueue } from "../run-queue/index.js";
 import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
 import { AuthenticatedEnvironment, MinimalAuthenticatedEnvironment } from "../shared/index.js";
 import { BillingCache } from "./billingCache.js";
-import { NotImplementedError, RunDuplicateIdempotencyKeyError, RunOneTimeUseTokenError } from "./errors.js";
+import {
+  ExecutionSnapshotNotFoundError,
+  NotImplementedError,
+  RunDuplicateIdempotencyKeyError,
+  RunOneTimeUseTokenError,
+} from "./errors.js";
 import { EventBus, EventBusEvents } from "./eventBus.js";
 import { RunLocker } from "./locking.js";
 import { getFinalRunStatuses } from "./statuses.js";
@@ -88,6 +94,8 @@ export class RunEngine {
   private logger: Logger;
   private tracer: Tracer;
   private meter: Meter;
+  private snapshotsSinceReplicaMissCounter: Counter;
+  private snapshotsSinceReplicaRetryDelay: { minMs: number; maxMs: number };
   private heartbeatTimeouts: HeartbeatTimeouts;
   private repairSnapshotTimeoutMs: number;
   private batchQueue: BatchQueue;
@@ -272,6 +280,22 @@ export class RunEngine {
     this.tracer = options.tracer;
     this.meter = options.meter ?? getMeter("run-engine");
 
+    this.snapshotsSinceReplicaMissCounter = this.meter.createCounter(
+      "run_engine.snapshots_since.replica_miss",
+      {
+        description:
+          "getSnapshotsSince reads where the since snapshot was not yet on the read replica, recovered via a replica retry or served from the primary",
+      }
+    );
+
+    // Normalize the bounds, but keep maxMs <= 0 meaning "skip the replica retry".
+    const retryDelay = options.readReplicaSnapshotsSinceRetryDelay ?? { minMs: 50, maxMs: 200 };
+    const retryMinMs = Math.max(0, retryDelay.minMs);
+    this.snapshotsSinceReplicaRetryDelay = {
+      minMs: retryDelay.maxMs > 0 ? Math.min(retryMinMs, retryDelay.maxMs) : retryMinMs,
+      maxMs: retryDelay.maxMs,
+    };
+
     const defaultHeartbeatTimeouts: HeartbeatTimeouts = {
       PENDING_EXECUTING: 60_000,
       PENDING_CANCEL: 60_000,
@@ -1918,13 +1942,69 @@ export class RunEngine {
     snapshotId: string;
     tx?: PrismaClientOrTransaction;
   }): Promise<RunExecutionData[] | null> {
-    const prisma =
-      tx ?? (this.options.readReplicaSnapshotsSinceEnabled ? this.readOnlyPrisma : this.prisma);
+    const useReplica =
+      !tx &&
+      this.options.readReplicaSnapshotsSinceEnabled === true &&
+      this.readOnlyPrisma !== this.prisma;
+    const prisma = tx ?? (useReplica ? this.readOnlyPrisma : this.prisma);
+
+    const query = async (client: PrismaClientOrTransaction) => {
+      const snapshots = await getExecutionSnapshotsSince(client, runId, snapshotId);
+      return snapshots.map(executionDataFromSnapshot);
+    };
 
     try {
-      const snapshots = await getExecutionSnapshotsSince(prisma, runId, snapshotId);
-      return snapshots.map(executionDataFromSnapshot);
+      return await query(prisma);
     } catch (e) {
+      if (useReplica && e instanceof ExecutionSnapshotNotFoundError) {
+        // Replica lag: the runner learned this snapshot id from the writer before the
+        // replica caught up. Give the replica one jittered retry; if it's still missing,
+        // serve from the writer. Only not-found errors get this treatment - any other
+        // replica failure stays an error rather than shifting read load to the writer.
+        // A miss on the writer too is a real error, not lag.
+        const { minMs, maxMs } = this.snapshotsSinceReplicaRetryDelay;
+        if (maxMs > 0) {
+          await setTimeout(minMs + Math.random() * Math.max(0, maxMs - minMs));
+          try {
+            const result = await query(this.readOnlyPrisma);
+            this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "replica_retry" });
+            return result;
+          } catch (replicaRetryError) {
+            if (!(replicaRetryError instanceof ExecutionSnapshotNotFoundError)) {
+              this.logger.error("Failed to getSnapshotsSince", {
+                message:
+                  replicaRetryError instanceof Error
+                    ? replicaRetryError.message
+                    : replicaRetryError,
+                runId,
+                snapshotId,
+                failedDuring: "replica_retry",
+              });
+              return null;
+            }
+            // still not on the replica - fall through to the primary
+          }
+        }
+
+        try {
+          const result = await query(this.prisma);
+          this.snapshotsSinceReplicaMissCounter.add(1, { outcome: "primary" });
+          this.logger.warn("getSnapshotsSince: snapshot not yet on replica, served from primary", {
+            runId,
+            snapshotId,
+          });
+          return result;
+        } catch (retryError) {
+          this.logger.error("Failed to getSnapshotsSince", {
+            message: retryError instanceof Error ? retryError.message : retryError,
+            runId,
+            snapshotId,
+            failedDuring: "primary_fallback",
+          });
+          return null;
+        }
+      }
+
       this.logger.error("Failed to getSnapshotsSince", {
         message: e instanceof Error ? e.message : e,
         runId,
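
The ordering of the recovery path (replica, one replica retry, then primary) and the counter semantics from the test plan can be traced with a small simulation. This is a simplified sketch under stated assumptions: the lookups are synchronous in-memory stand-ins for the Prisma clients, the jittered wait is elided, non-not-found errors are simply rethrown, and `SnapshotNotFound`, `makeLookup`, and `readWithFallback` are hypothetical names.

```typescript
class SnapshotNotFound extends Error {}

// A lookup returns the snapshot ids after `sinceId`, or throws SnapshotNotFound.
type Lookup = (sinceId: string) => string[];

// In-memory stand-in for a database client. `missesBeforeVisible` simulates lag:
// the first N calls miss even though the row exists.
function makeLookup(data: Map<string, string[]>, missesBeforeVisible = 0): Lookup {
  let calls = 0;
  return (sinceId) => {
    calls += 1;
    const rows = data.get(sinceId);
    if (rows === undefined || calls <= missesBeforeVisible) {
      throw new SnapshotNotFound(sinceId);
    }
    return rows;
  };
}

// Runs a lookup and converts a not-found into null; other errors propagate.
function tryLookup(lookup: Lookup, sinceId: string): string[] | null {
  try {
    return lookup(sinceId);
  } catch (e) {
    if (e instanceof SnapshotNotFound) return null;
    throw e;
  }
}

interface MissCounts {
  replica_retry: number;
  primary: number;
}

function readWithFallback(
  replica: Lookup,
  primary: Lookup,
  sinceId: string,
  retryEnabled: boolean,
  counts: MissCounts
): string[] | null {
  const first = tryLookup(replica, sinceId);
  if (first !== null) return first; // no miss, nothing counted

  if (retryEnabled) {
    // The engine waits for a jittered delay here; omitted in this sketch.
    const second = tryLookup(replica, sinceId);
    if (second !== null) {
      counts.replica_retry += 1;
      return second;
    }
  }

  const third = tryLookup(primary, sinceId);
  if (third !== null) {
    counts.primary += 1;
    return third;
  }
  return null; // missing everywhere: a genuine miss, not counted as lag
}
```

In this model a replica that becomes visible on its second call is counted once as `replica_retry` and the primary lookup is never invoked, matching the first item of the test plan.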

internal-packages/run-engine/src/engine/systems/executionSnapshotSystem.ts

Lines changed: 3 additions & 2 deletions
@@ -10,6 +10,7 @@ import {
   TaskRunStatus,
   Waitpoint,
 } from "@trigger.dev/database";
+import { ExecutionSnapshotNotFoundError } from "../errors.js";
 import { HeartbeatTimeouts } from "../types.js";
 import { SystemResources } from "./systems.js";
 
@@ -273,12 +274,12 @@ export async function getExecutionSnapshotsSince(
 ): Promise<EnhancedExecutionSnapshot[]> {
   // Step 1: Find the createdAt of the sinceSnapshotId
   const sinceSnapshot = await prisma.taskRunExecutionSnapshot.findFirst({
-    where: { id: sinceSnapshotId },
+    where: { id: sinceSnapshotId, runId },
     select: { createdAt: true },
   });
 
   if (!sinceSnapshot) {
-    throw new ExecutionSnapshotNotFoundError(sinceSnapshotId);
+    throw new ExecutionSnapshotNotFoundError(sinceSnapshotId);
   }
 
   // Step 2: Fetch snapshots WITHOUT waitpoints to avoid N×M data explosion
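
The effect of scoping the anchor lookup to the polled run can be shown with an in-memory model. This is a sketch only: `SnapshotRow`, `AnchorNotFoundError`, and `snapshotsSince` are hypothetical, and the "newer than the anchor's `createdAt`, same run" filter is an assumption about the rest of the query, which this hunk does not show.

```typescript
interface SnapshotRow {
  id: string;
  runId: string;
  createdAt: number; // epoch millis, for simplicity
}

class AnchorNotFoundError extends Error {}

function snapshotsSince(rows: SnapshotRow[], runId: string, sinceId: string): SnapshotRow[] {
  // Scoped anchor lookup: the id must exist AND belong to the polled run.
  const anchor = rows.find((r) => r.id === sinceId && r.runId === runId);
  if (!anchor) {
    throw new AnchorNotFoundError(`No execution snapshot found for id ${sinceId}`);
  }
  // Assumed window: snapshots of the same run created after the anchor.
  return rows
    .filter((r) => r.runId === runId && r.createdAt > anchor.createdAt)
    .sort((a, b) => a.createdAt - b.createdAt);
}
```

Without the `runId` condition on the anchor, an older snapshot id from another run would be accepted and its `createdAt` would define the window, which could return far more of the polled run's snapshots than intended.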
