From 338df64e365130148b202037649a912782c13130 Mon Sep 17 00:00:00 2001 From: Vamil Gandhi <13998000+vamgan@users.noreply.github.com> Date: Mon, 11 May 2026 01:49:23 -0400 Subject: [PATCH] Validate adapter event task ids --- src/runtime.ts | 18 +++++++++++++++--- test/runtime.test.ts | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/src/runtime.ts b/src/runtime.ts index 2ae5617..d84a9db 100644 --- a/src/runtime.ts +++ b/src/runtime.ts @@ -187,9 +187,11 @@ export class RuntimeService { } for await (const event of adapter.streamEvents(task.id)) { - await this.store.appendEvent(event); - if (event.type === "task.log" && event.message) { - await this.store.appendLog(task.id, `${event.message}\n`); + const validatedEvent = this.validateAdapterEvent(task.id, event); + if (!validatedEvent) continue; + await this.store.appendEvent(validatedEvent); + if (validatedEvent.type === "task.log" && validatedEvent.message) { + await this.store.appendLog(task.id, `${validatedEvent.message}\n`); } } @@ -288,6 +290,16 @@ export class RuntimeService { : undefined; } + private validateAdapterEvent(taskId: string, event: RuntimeEvent): RuntimeEvent | undefined { + if (event.taskId === taskId) return event; + void this.store.appendEvent(this.event(taskId, "task.log", "Ignored adapter event with mismatched taskId.", { + expectedTaskId: taskId, + receivedTaskId: event.taskId, + eventType: event.type + })); + return undefined; + } + private event(taskId: string, type: RuntimeEvent["type"], message?: string, payload?: Record): RuntimeEvent { return { taskId, type, message, payload, timestamp: nowIso() }; } diff --git a/test/runtime.test.ts b/test/runtime.test.ts index 8feeb9c..a9a6d8b 100644 --- a/test/runtime.test.ts +++ b/test/runtime.test.ts @@ -55,7 +55,11 @@ function mockAdapter(events: RuntimeEvent[], cleanup: CleanupResult = { status: }), provision: async () => ({}), startTask: async () => ({ result: { ok: true } }), - streamEvents: async function* () { yield* events; }, + streamEvents: async function* (taskId: string) { + for (const event of events) { + yield { ...event, taskId: event.taskId === "unused" ? taskId : event.taskId }; + } + }, cancel: async () => ({ status: "cancelled" }), cleanup: async () => cleanup }; @@ -111,6 +115,39 @@ describe("RuntimeService", () => { })).rejects.toMatchObject({ code: "policy.denied" }); }); + it("ignores adapter events for a different task id", async () => { + const store = new MemoryStore(); + const request: DispatchRequest = { + provider: "aws", + accountProfile: "dev-aws", + capability: "agent-runtime", + taskType: "agent.run", + target: { mode: "session" }, + input: { instruction: "run" } + }; + const service = new RuntimeService({ + config: { + accounts: { "dev-aws": { provider: "aws", credentialSource: "aws-sdk-default" } }, + backends: {} + }, + store, + adapters: [mockAdapter([ + { taskId: "wrong_task", type: "task.log", message: "wrong task log", timestamp: nowIso() }, + { taskId: "unused", type: "task.log", message: "correct task log", timestamp: nowIso() } + ])] + }); + + const handle = await service.dispatchTask(request); + await waitForStatus(service, handle.taskId, "succeeded"); + + expect(store.events.get("wrong_task")).toBeUndefined(); + expect(store.logs.get(handle.taskId)).not.toContain("wrong task log"); + expect(store.logs.get(handle.taskId)).toContain("correct task log"); + expect(store.events.get(handle.taskId)).toEqual(expect.arrayContaining([ + expect.objectContaining({ type: "task.log", message: "Ignored adapter event with mismatched taskId." }) + ])); + }); + it("persists runtime cleanup status after successful runtime tasks", async () => { const store = new MemoryStore(); const request: DispatchRequest = {