@@ -23,7 +23,10 @@ import { TaskRun } from "@trigger.dev/database";
2323import { Redis } from "ioredis" ;
2424import { IdempotencyKeyConcern } from "~/runEngine/concerns/idempotencyKeys.server" ;
2525import { DefaultQueueManager } from "~/runEngine/concerns/queues.server" ;
26- import { RedisTaskMetadataCache } from "~/services/taskMetadataCache.server" ;
26+ import {
27+ NoopTaskMetadataCache ,
28+ RedisTaskMetadataCache ,
29+ } from "~/services/taskMetadataCache.server" ;
2730import {
2831 EntitlementValidationParams ,
2932 MaxAttemptsValidationParams ,
@@ -949,6 +952,129 @@ describe("RunEngineTriggerTaskService", () => {
949952 }
950953 ) ;
951954
955+ containerTest (
956+ "should fall back to the writer when a stale replica returns no row for a locked task" ,
957+ async ( { prisma, redisOptions } ) => {
958+ const engine = new RunEngine ( {
959+ prisma,
960+ worker : {
961+ redis : redisOptions ,
962+ workers : 1 ,
963+ tasksPerWorker : 10 ,
964+ pollIntervalMs : 100 ,
965+ } ,
966+ queue : {
967+ redis : redisOptions ,
968+ } ,
969+ runLock : {
970+ redis : redisOptions ,
971+ } ,
972+ machines : {
973+ defaultMachine : "small-1x" ,
974+ machines : {
975+ "small-1x" : {
976+ name : "small-1x" as const ,
977+ cpu : 0.5 ,
978+ memory : 0.5 ,
979+ centsPerMs : 0.0001 ,
980+ } ,
981+ } ,
982+ baseCostInCents : 0.0005 ,
983+ } ,
984+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
985+ } ) ;
986+
987+ const authenticatedEnvironment = await setupAuthenticatedEnvironment ( prisma , "PRODUCTION" ) ;
988+ const taskIdentifier = "test-task" ;
989+
990+ const worker = await setupBackgroundWorker ( engine , authenticatedEnvironment , taskIdentifier ) ;
991+
992+ // A read replica that has not yet caught up to the BackgroundWorkerTask
993+ // row: it is the real database for every query except the locked-task
994+ // lookup, which comes back empty (the TRI-10868 false-negative window).
995+ const staleReplica = new Proxy ( prisma , {
996+ get ( target , prop , receiver ) {
997+ if ( prop === "backgroundWorkerTask" ) {
998+ const delegate = Reflect . get ( target , prop , receiver ) ;
999+ return new Proxy ( delegate , {
1000+ get ( taskTarget , taskProp , taskReceiver ) {
1001+ if ( taskProp === "findFirst" ) {
1002+ return async ( ) => null ;
1003+ }
1004+ const value = Reflect . get ( taskTarget , taskProp , taskReceiver ) ;
1005+ return typeof value === "function" ? value . bind ( taskTarget ) : value ;
1006+ } ,
1007+ } ) ;
1008+ }
1009+ const value = Reflect . get ( target , prop , receiver ) ;
1010+ return typeof value === "function" ? value . bind ( target ) : value ;
1011+ } ,
1012+ } ) as typeof prisma ;
1013+
1014+ // Noop cache so every resolve misses the cache and exercises the
1015+ // replica -> writer fallback. The writer is the real `prisma`.
1016+ const queuesManager = new DefaultQueueManager (
1017+ prisma ,
1018+ engine ,
1019+ staleReplica ,
1020+ new NoopTaskMetadataCache ( )
1021+ ) ;
1022+
1023+ const triggerTaskService = new RunEngineTriggerTaskService ( {
1024+ engine,
1025+ prisma,
1026+ payloadProcessor : new MockPayloadProcessor ( ) ,
1027+ queueConcern : queuesManager ,
1028+ idempotencyKeyConcern : new IdempotencyKeyConcern (
1029+ prisma ,
1030+ engine ,
1031+ new MockTraceEventConcern ( )
1032+ ) ,
1033+ validator : new MockTriggerTaskValidator ( ) ,
1034+ traceEventConcern : new MockTraceEventConcern ( ) ,
1035+ tracer : trace . getTracer ( "test" , "0.0.0" ) ,
1036+ metadataMaximumSize : 1024 * 1024 * 1 ,
1037+ } ) ;
1038+
1039+ // The task IS registered on the locked worker, but the replica returns
1040+ // nothing. Before the fix this threw "not found on locked version"; now
1041+ // the writer fallback resolves the registered row.
1042+ const result = await triggerTaskService . call ( {
1043+ taskId : taskIdentifier ,
1044+ environment : authenticatedEnvironment ,
1045+ body : {
1046+ payload : { test : "test" } ,
1047+ options : {
1048+ lockToVersion : worker . worker . version ,
1049+ } ,
1050+ } ,
1051+ } ) ;
1052+
1053+ expect ( result ) . toBeDefined ( ) ;
1054+ expect ( result ?. run . status ) . toBe ( "PENDING" ) ;
1055+ expect ( result ?. run . queue ) . toBe ( `task/${ taskIdentifier } ` ) ;
1056+
1057+ // A genuinely unregistered task must still throw, even with the writer
1058+ // fallback — the writer has no row either, so the 422 is correct.
1059+ await expect (
1060+ triggerTaskService . call ( {
1061+ taskId : "not-a-registered-task" ,
1062+ environment : authenticatedEnvironment ,
1063+ body : {
1064+ payload : { test : "test" } ,
1065+ options : {
1066+ lockToVersion : worker . worker . version ,
1067+ } ,
1068+ } ,
1069+ } )
1070+ ) . rejects . toThrow (
1071+ `Task 'not-a-registered-task' not found on locked version '${ worker . worker . version } '`
1072+ ) ;
1073+
1074+ await engine . quit ( ) ;
1075+ }
1076+ ) ;
1077+
9521078 containerTest (
9531079 "should preserve runFriendlyId across retries when RunDuplicateIdempotencyKeyError is thrown" ,
9541080 async ( { prisma, redisOptions } ) => {
0 commit comments