Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/locked-version-trigger-stale-replica.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: fix
---

Fix locked-version triggers such as triggerAndWait occasionally failing with "task not found on locked version" for a task that is actually registered, by confirming against the primary database when the read replica returns no row.
45 changes: 37 additions & 8 deletions apps/webapp/app/runEngine/concerns/queues.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,14 +268,27 @@ export class DefaultQueueManager implements QueueManager {
const cached = await this.taskMetaCache.getByWorker(workerId, slug);
if (cached) return cached;

const row = await this.replicaPrisma.backgroundWorkerTask.findFirst({
where: { workerId, runtimeEnvironmentId: environmentId, slug },
select: {
ttl: true,
triggerSource: true,
queue: { select: { id: true, name: true } },
},
});
// Cache miss. Read the row from the replica first; if the replica comes
// back empty, re-check the writer before concluding the task is missing.
// The locked worker itself was just resolved on the writer (see
// triggerTask.server.ts), so a replica that returns no row here is stale,
// not authoritative. Trusting a stale-replica negative throws a
// non-retryable "not found on locked version" for a task that is in fact
// registered. The writer read only runs on this rare miss-then-empty path,
// never on the hot path.
let row = await this.findLockedTaskRow(this.replicaPrisma, workerId, environmentId, slug);

if (!row && this.replicaPrisma !== this.prisma) {
row = await this.findLockedTaskRow(this.prisma, workerId, environmentId, slug);

if (row) {
logger.warn("Locked task metadata missing on replica but found on writer", {
workerId,
environmentId,
slug,
});
}
}

if (!row) return null;

Expand All @@ -294,6 +307,22 @@ export class DefaultQueueManager implements QueueManager {
return entry;
}

private findLockedTaskRow(
client: PrismaClientOrTransaction,
workerId: string,
environmentId: string,
slug: string
) {
return client.backgroundWorkerTask.findFirst({
where: { workerId, runtimeEnvironmentId: environmentId, slug },
select: {
ttl: true,
triggerSource: true,
queue: { select: { id: true, name: true } },
},
});
}

/**
* Resolve task metadata for a non-locked trigger. Reads from the
* `task-meta:env:{envId}` Redis hash; falls back to
Expand Down
128 changes: 127 additions & 1 deletion apps/webapp/test/engine/triggerTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ import { TaskRun } from "@trigger.dev/database";
import { Redis } from "ioredis";
import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server";
import { DefaultQueueManager } from "~/runEngine/concerns/queues.server";
import { RedisTaskMetadataCache } from "~/services/taskMetadataCache.server";
import {
NoopTaskMetadataCache,
RedisTaskMetadataCache,
} from "~/services/taskMetadataCache.server";
import {
EntitlementValidationParams,
MaxAttemptsValidationParams,
Expand Down Expand Up @@ -949,6 +952,129 @@ describe("RunEngineTriggerTaskService", () => {
}
);

containerTest(
"should fall back to the writer when a stale replica returns no row for a locked task",
async ({ prisma, redisOptions }) => {
const engine = new RunEngine({
prisma,
worker: {
redis: redisOptions,
workers: 1,
tasksPerWorker: 10,
pollIntervalMs: 100,
},
queue: {
redis: redisOptions,
},
runLock: {
redis: redisOptions,
},
machines: {
defaultMachine: "small-1x",
machines: {
"small-1x": {
name: "small-1x" as const,
cpu: 0.5,
memory: 0.5,
centsPerMs: 0.0001,
},
},
baseCostInCents: 0.0005,
},
tracer: trace.getTracer("test", "0.0.0"),
});

const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
const taskIdentifier = "test-task";

const worker = await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);

// A read replica that has not yet caught up to the BackgroundWorkerTask
// row: it is the real database for every query except the locked-task
// lookup, which comes back empty (the TRI-10868 false-negative window).
const staleReplica = new Proxy(prisma, {
get(target, prop, receiver) {
if (prop === "backgroundWorkerTask") {
const delegate = Reflect.get(target, prop, receiver);
return new Proxy(delegate, {
get(taskTarget, taskProp, taskReceiver) {
if (taskProp === "findFirst") {
return async () => null;
}
const value = Reflect.get(taskTarget, taskProp, taskReceiver);
return typeof value === "function" ? value.bind(taskTarget) : value;
},
});
}
const value = Reflect.get(target, prop, receiver);
return typeof value === "function" ? value.bind(target) : value;
},
}) as typeof prisma;

// Noop cache so every resolve misses the cache and exercises the
// replica -> writer fallback. The writer is the real `prisma`.
const queuesManager = new DefaultQueueManager(
prisma,
engine,
staleReplica,
new NoopTaskMetadataCache()
);

const triggerTaskService = new RunEngineTriggerTaskService({
engine,
prisma,
payloadProcessor: new MockPayloadProcessor(),
queueConcern: queuesManager,
idempotencyKeyConcern: new IdempotencyKeyConcern(
prisma,
engine,
new MockTraceEventConcern()
),
validator: new MockTriggerTaskValidator(),
traceEventConcern: new MockTraceEventConcern(),
tracer: trace.getTracer("test", "0.0.0"),
metadataMaximumSize: 1024 * 1024 * 1,
});

// The task IS registered on the locked worker, but the replica returns
// nothing. Before the fix this threw "not found on locked version"; now
// the writer fallback resolves the registered row.
const result = await triggerTaskService.call({
taskId: taskIdentifier,
environment: authenticatedEnvironment,
body: {
payload: { test: "test" },
options: {
lockToVersion: worker.worker.version,
},
},
});

expect(result).toBeDefined();
expect(result?.run.status).toBe("PENDING");
expect(result?.run.queue).toBe(`task/${taskIdentifier}`);

// A genuinely unregistered task must still throw, even with the writer
// fallback — the writer has no row either, so the 422 is correct.
await expect(
triggerTaskService.call({
taskId: "not-a-registered-task",
environment: authenticatedEnvironment,
body: {
payload: { test: "test" },
options: {
lockToVersion: worker.worker.version,
},
},
})
).rejects.toThrow(
`Task 'not-a-registered-task' not found on locked version '${worker.worker.version}'`
);

await engine.quit();
}
);

containerTest(
"should preserve runFriendlyId across retries when RunDuplicateIdempotencyKeyError is thrown",
async ({ prisma, redisOptions }) => {
Expand Down