From 87fa5229ef95aebc078a8f704efc66c33f6ddb60 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Fri, 12 Jun 2026 16:56:47 +0100 Subject: [PATCH] fix(webapp): stop locked-version triggers failing on stale replica reads A locked-version trigger such as triggerAndWait resolved the task's metadata from the read replica and, on a miss, threw a non-retryable "task not found on locked version" even though the task was registered. A read replica can return an empty result for a row that already exists on the primary, so this surfaced as intermittent, self-recovering trigger failures. The locked worker is already resolved on the primary in the same request, so the resolver now re-checks the primary when the replica returns no row, and only reports the task missing when the primary genuinely lacks it. This runs on the cache-miss path only and leaves the hot path unchanged. --- .../locked-version-trigger-stale-replica.md | 6 + .../app/runEngine/concerns/queues.server.ts | 45 ++++-- apps/webapp/test/engine/triggerTask.test.ts | 128 +++++++++++++++++- 3 files changed, 170 insertions(+), 9 deletions(-) create mode 100644 .server-changes/locked-version-trigger-stale-replica.md diff --git a/.server-changes/locked-version-trigger-stale-replica.md b/.server-changes/locked-version-trigger-stale-replica.md new file mode 100644 index 00000000000..921006f264a --- /dev/null +++ b/.server-changes/locked-version-trigger-stale-replica.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Fix locked-version triggers such as triggerAndWait occasionally failing with "task not found on locked version" for a task that is actually registered, by confirming against the primary database when the read replica returns no row. diff --git a/apps/webapp/app/runEngine/concerns/queues.server.ts b/apps/webapp/app/runEngine/concerns/queues.server.ts index dce74d7d1a9..1c3f9a06aa7 100644 --- a/apps/webapp/app/runEngine/concerns/queues.server.ts +++ b/apps/webapp/app/runEngine/concerns/queues.server.ts @@ -268,14 +268,27 @@ export class DefaultQueueManager implements QueueManager { const cached = await this.taskMetaCache.getByWorker(workerId, slug); if (cached) return cached; - const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({ - where: { workerId, runtimeEnvironmentId: environmentId, slug }, - select: { - ttl: true, - triggerSource: true, - queue: { select: { id: true, name: true } }, - }, - }); + // Cache miss. Read the row from the replica first; if the replica comes + // back empty, re-check the writer before concluding the task is missing. + // The locked worker itself was just resolved on the writer (see + // triggerTask.server.ts), so a replica that returns no row here is stale, + // not authoritative. Trusting a stale-replica negative throws a + // non-retryable "not found on locked version" for a task that is in fact + // registered. The writer read only runs on this rare miss-then-empty path, + // never on the hot path. + let row = await this.findLockedTaskRow(this.replicaPrisma, workerId, environmentId, slug); + + if (!row && this.replicaPrisma !== this.prisma) { + row = await this.findLockedTaskRow(this.prisma, workerId, environmentId, slug); + + if (row) { + logger.warn("Locked task metadata missing on replica but found on writer", { + workerId, + environmentId, + slug, + }); + } + } if (!row) return null; @@ -294,6 +307,22 @@ export class DefaultQueueManager implements QueueManager { return entry; } + private findLockedTaskRow( + client: PrismaClientOrTransaction, + workerId: string, + environmentId: string, + slug: string + ) { + return client.backgroundWorkerTask.findFirst({ + where: { workerId, runtimeEnvironmentId: environmentId, slug }, + select: { + ttl: true, + triggerSource: true, + queue: { select: { id: true, name: true } }, + }, + }); + } + /** * Resolve task metadata for a non-locked trigger. Reads from the * `task-meta:env:{envId}` Redis hash; falls back to diff --git a/apps/webapp/test/engine/triggerTask.test.ts b/apps/webapp/test/engine/triggerTask.test.ts index c40416837dd..c524d0a3b9c 100644 --- a/apps/webapp/test/engine/triggerTask.test.ts +++ b/apps/webapp/test/engine/triggerTask.test.ts @@ -23,7 +23,10 @@ import { TaskRun } from "@trigger.dev/database"; import { Redis } from "ioredis"; import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server"; import { DefaultQueueManager } from "~/runEngine/concerns/queues.server"; -import { RedisTaskMetadataCache } from "~/services/taskMetadataCache.server"; +import { + NoopTaskMetadataCache, + RedisTaskMetadataCache, +} from "~/services/taskMetadataCache.server"; import { EntitlementValidationParams, MaxAttemptsValidationParams, @@ -949,6 +952,129 @@ describe("RunEngineTriggerTaskService", () => { } ); + containerTest( + "should fall back to the writer when a stale replica returns no row for a locked task", + async ({ prisma, redisOptions }) => { + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0005, + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + const taskIdentifier = "test-task"; + + const worker = await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + // A read replica that has not yet caught up to the BackgroundWorkerTask + // row: it is the real database for every query except the locked-task + // lookup, which comes back empty (the TRI-10868 false-negative window). + const staleReplica = new Proxy(prisma, { + get(target, prop, receiver) { + if (prop === "backgroundWorkerTask") { + const delegate = Reflect.get(target, prop, receiver); + return new Proxy(delegate, { + get(taskTarget, taskProp, taskReceiver) { + if (taskProp === "findFirst") { + return async () => null; + } + const value = Reflect.get(taskTarget, taskProp, taskReceiver); + return typeof value === "function" ? value.bind(taskTarget) : value; + }, + }); + } + const value = Reflect.get(target, prop, receiver); + return typeof value === "function" ? value.bind(target) : value; + }, + }) as typeof prisma; + + // Noop cache so every resolve misses the cache and exercises the + // replica -> writer fallback. The writer is the real `prisma`. + const queuesManager = new DefaultQueueManager( + prisma, + engine, + staleReplica, + new NoopTaskMetadataCache() + ); + + const triggerTaskService = new RunEngineTriggerTaskService({ + engine, + prisma, + payloadProcessor: new MockPayloadProcessor(), + queueConcern: queuesManager, + idempotencyKeyConcern: new IdempotencyKeyConcern( + prisma, + engine, + new MockTraceEventConcern() + ), + validator: new MockTriggerTaskValidator(), + traceEventConcern: new MockTraceEventConcern(), + tracer: trace.getTracer("test", "0.0.0"), + metadataMaximumSize: 1024 * 1024 * 1, + }); + + // The task IS registered on the locked worker, but the replica returns + // nothing. Before the fix this threw "not found on locked version"; now + // the writer fallback resolves the registered row. + const result = await triggerTaskService.call({ + taskId: taskIdentifier, + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + lockToVersion: worker.worker.version, + }, + }, + }); + + expect(result).toBeDefined(); + expect(result?.run.status).toBe("PENDING"); + expect(result?.run.queue).toBe(`task/${taskIdentifier}`); + + // A genuinely unregistered task must still throw, even with the writer + // fallback — the writer has no row either, so the 422 is correct. + await expect( + triggerTaskService.call({ + taskId: "not-a-registered-task", + environment: authenticatedEnvironment, + body: { + payload: { test: "test" }, + options: { + lockToVersion: worker.worker.version, + }, + }, + }) + ).rejects.toThrow( + `Task 'not-a-registered-task' not found on locked version '${worker.worker.version}'` + ); + + await engine.quit(); + } + ); + containerTest( "should preserve runFriendlyId across retries when RunDuplicateIdempotencyKeyError is thrown", async ({ prisma, redisOptions }) => {