diff --git a/src/runtime.ts b/src/runtime.ts index 2ae5617..27d9b80 100644 --- a/src/runtime.ts +++ b/src/runtime.ts @@ -11,7 +11,7 @@ import { AgentDispatchError, toRuntimeError } from "./errors.js"; import { createId, nowIso } from "./ids.js"; import { authorizeDispatchRequest } from "./policy.js"; import type { TaskStore } from "./store.js"; -import type { DispatchRequest, RuntimeEvent, TaskHandle, TaskRecord, TaskResult } from "./types.js"; +import type { CleanupResult, DispatchRequest, ProvisionResult, RuntimeEvent, RuntimeTarget, TaskHandle, TaskRecord, TaskResult } from "./types.js"; export interface RuntimeServiceOptions { config: AgentDispatchConfig; @@ -19,6 +19,10 @@ export interface RuntimeServiceOptions { adapters: BackendAdapter[]; } +function isTerminalStatus(status: TaskRecord["status"]): boolean { + return status === "succeeded" || status === "failed" || status === "cancelled"; +} + export class RuntimeService { private readonly config: AgentDispatchConfig; private readonly store: TaskStore; @@ -132,6 +136,9 @@ export class RuntimeService { async cancelTask(taskId: string) { const task = await this.getTaskStatus(taskId); + if (isTerminalStatus(task.status)) { + throw new AgentDispatchError({ code: "task.terminal", message: `Task ${taskId} is already ${task.status}.` }); + } const adapter = this.adapters.find((candidate) => candidate.name === task.backend); if (!adapter) { throw new AgentDispatchError({ code: "adapter.not_found", message: `Adapter ${task.backend} was not found.` }); @@ -139,10 +146,14 @@ export class RuntimeService { await this.store.updateTask(taskId, { status: "cancelling", updatedAt: nowIso() }); await this.store.appendEvent(this.event(taskId, "task.cancelling", "Cancellation requested.")); const result = await adapter.cancel(taskId); + const latestTask = await this.getTaskStatus(taskId); + if (isTerminalStatus(latestTask.status)) { + return result; + } const status = result.status === "cancelled" ? "cancelled" : "failed"; await this.store.updateTask(taskId, { status, - providerRefs: { ...task.providerRefs, ...result.providerRefs }, + providerRefs: { ...latestTask.providerRefs, ...result.providerRefs }, error: result.error, updatedAt: nowIso() }); @@ -151,11 +162,15 @@ export class RuntimeService { } private async runTask(adapter: BackendAdapter, request: DispatchRequest, task: TaskRecord): Promise { + let target: RuntimeTarget | undefined; + let provisioned: ProvisionResult | undefined; + let cleanupAttempted = false; try { await this.store.updateTask(task.id, { status: "provisioning", updatedAt: nowIso() }); await this.store.appendEvent(this.event(task.id, "task.provisioning", "Resolving provider target.")); const resolved = await adapter.resolveTarget(request); - const provisioned = await adapter.provision({ dispatch: request, task, target: resolved.target }); + target = resolved.target; + provisioned = await adapter.provision({ dispatch: request, task, target }); if (provisioned.runtime) { await this.store.saveRuntime(provisioned.runtime); } @@ -164,7 +179,7 @@ export class RuntimeService { } await this.store.updateTask(task.id, { status: "starting", - providerRefs: { ...task.providerRefs, ...provisioned.providerRefs, ...resolved.target.providerRefs }, + providerRefs: { ...task.providerRefs, ...provisioned.providerRefs, ...target.providerRefs }, updatedAt: nowIso() }); await this.store.appendEvent(this.event(task.id, "task.started", "Starting provider task.")); @@ -172,11 +187,16 @@ export class RuntimeService { const started = await adapter.startTask({ dispatch: request, task, - target: resolved.target, + target, runtime: provisioned.runtime, session: provisioned.session }); + const afterStartTask = await this.store.getTask(task.id); + if (afterStartTask?.status === "cancelled" || afterStartTask?.status === "failed" || afterStartTask?.status === "cancelling") { + return; + } + await this.store.updateTask(task.id, { status: "running", providerRefs: { ...(await this.latestProviderRefs(task.id)), ...started.providerRefs }, @@ -187,6 +207,12 @@ export class RuntimeService { } for await (const event of adapter.streamEvents(task.id)) { + if (event.taskId !== task.id) { + throw new AgentDispatchError({ + code: "adapter.event_task_mismatch", + message: `Adapter ${adapter.name} emitted an event for ${event.taskId}, expected ${task.id}.` + }); + } await this.store.appendEvent(event); if (event.type === "task.log" && event.message) { await this.store.appendLog(task.id, `${event.message}\n`); @@ -194,7 +220,7 @@ export class RuntimeService { } const finalTask = await this.store.getTask(task.id); - if (finalTask?.status === "cancelled" || finalTask?.status === "failed") { + if (finalTask?.status === "cancelled" || finalTask?.status === "failed" || finalTask?.status === "cancelling") { return; } await this.store.updateTask(task.id, { @@ -203,28 +229,40 @@ export class RuntimeService { updatedAt: nowIso() }); await this.store.appendEvent(this.event(task.id, "task.succeeded", "Task completed.")); - const cleanup = await adapter.cleanup(resolved.target); - if (provisioned.runtime) { - await this.store.updateRuntime(provisioned.runtime.id, { - status: cleanup.status === "completed" ? "deleted" : cleanup.status === "failed" ? "failed" : provisioned.runtime.status, - cleanupStatus: cleanup.status === "completed" ? "completed" : cleanup.status === "failed" ? "failed" : provisioned.runtime.cleanupStatus, - providerRefs: { ...provisioned.runtime.providerRefs, ...cleanup.providerRefs }, - updatedAt: nowIso() - }); - } - if (cleanup.status !== "skipped") { - await this.store.appendEvent(this.event( - task.id, - cleanup.status === "failed" ? "task.failed" : "task.progress", - cleanup.status === "failed" ? cleanup.error?.message ?? "Runtime cleanup failed." : "Runtime cleanup completed.", - { cleanup } - )); - } + cleanupAttempted = true; + await this.cleanupProvisionedRuntime(adapter, task.id, target, provisioned); } catch (error) { const runtimeError = toRuntimeError(error); await this.store.updateTask(task.id, { status: "failed", error: runtimeError, updatedAt: nowIso() }); await this.store.appendEvent(this.event(task.id, "task.failed", runtimeError.message, { error: runtimeError })); + if (target && provisioned && !cleanupAttempted) { + await this.cleanupProvisionedRuntime(adapter, task.id, target, provisioned).catch(async (cleanupError) => { + const normalized = toRuntimeError(cleanupError); + await this.store.appendEvent(this.event(task.id, "task.failed", normalized.message, { cleanupError: normalized })); + }); + } + } + } + + private async cleanupProvisionedRuntime(adapter: BackendAdapter, taskId: string, target: RuntimeTarget, provisioned: ProvisionResult): Promise { + const cleanup = await adapter.cleanup(target); + if (provisioned.runtime) { + await this.store.updateRuntime(provisioned.runtime.id, { + status: cleanup.status === "completed" ? "deleted" : cleanup.status === "failed" ? "failed" : provisioned.runtime.status, + cleanupStatus: cleanup.status === "completed" ? "completed" : cleanup.status === "failed" ? "failed" : provisioned.runtime.cleanupStatus, + providerRefs: { ...provisioned.runtime.providerRefs, ...cleanup.providerRefs }, + updatedAt: nowIso() + }); + } + if (cleanup.status !== "skipped") { + await this.store.appendEvent(this.event( + taskId, + cleanup.status === "failed" ? "task.failed" : "task.progress", + cleanup.status === "failed" ? cleanup.error?.message ?? "Runtime cleanup failed." : "Runtime cleanup completed.", + { cleanup } + )); } + return cleanup; } private createTaskRecord(request: DispatchRequest, backend: string): TaskRecord { diff --git a/test/runtime.test.ts b/test/runtime.test.ts index 8feeb9c..084bbf9 100644 --- a/test/runtime.test.ts +++ b/test/runtime.test.ts @@ -55,7 +55,11 @@ function mockAdapter(events: RuntimeEvent[], cleanup: CleanupResult = { status: }), provision: async () => ({}), startTask: async () => ({ result: { ok: true } }), - streamEvents: async function* () { yield* events; }, + streamEvents: async function* (taskId: string) { + for (const event of events) { + yield { ...event, taskId: event.taskId === "unused" ? taskId : event.taskId }; + } + }, cancel: async () => ({ status: "cancelled" }), cleanup: async () => cleanup }; @@ -218,8 +222,133 @@ describe("RuntimeService", () => { cleanupStatus: "failed" }); }); + + it("cleans up provisioned runtime resources when startup fails", async () => { + const store = new MemoryStore(); + const timestamp = nowIso(); + let cleanupCalls = 0; + const adapter: BackendAdapter = { + ...mockAdapter([]), + capabilities: () => [{ provider: "aws", capability: "agent-runtime", taskTypes: ["agent.run"], targetModes: ["runtime"] }], + resolveTarget: async (dispatch) => ({ + account: { name: dispatch.accountProfile, provider: "aws", credentialSource: "test" }, + target: { provider: "aws", accountProfile: dispatch.accountProfile, capability: "agent-runtime", backend: "mock-agent-runtime", mode: "runtime" } + }), + provision: async ({ task, dispatch }) => ({ + runtime: { + id: "runtime_start_failed", + taskId: task.id, + provider: dispatch.provider, + accountProfile: dispatch.accountProfile, + capability: dispatch.capability, + backend: "mock-agent-runtime", + status: "ready", + providerRefs: {}, + cleanupStatus: "pending", + createdAt: timestamp, + updatedAt: timestamp + } + }), + startTask: async () => { throw new Error("start failed"); }, + cleanup: async () => { + cleanupCalls += 1; + return { status: "completed" }; + } + }; + const service = runtimeService(store, adapter); + + const handle = await service.dispatchTask(runtimeRequest()); + await waitForStatus(service, handle.taskId, "failed"); + + expect(cleanupCalls).toBe(1); + expect(store.runtimes.get("runtime_start_failed")).toMatchObject({ status: "deleted", cleanupStatus: "completed" }); + }); + + it("does not mutate terminal tasks when cancellation is requested", async () => { + const store = new MemoryStore(); + const service = runtimeService(store, mockAdapter([])); + const handle = await service.dispatchTask(sessionRequest()); + await waitForStatus(service, handle.taskId, "succeeded"); + + await expect(service.cancelTask(handle.taskId)).rejects.toMatchObject({ code: "task.terminal" }); + await expect(service.getTaskStatus(handle.taskId)).resolves.toMatchObject({ status: "succeeded" }); + }); + + it("does not mark a task succeeded while cancellation is in progress", async () => { + const store = new MemoryStore(); + let releaseCancel!: () => void; + let releaseStart!: () => void; + const cancelStarted = new Promise((resolve) => { releaseCancel = resolve; }); + const startCanFinish = new Promise((resolve) => { releaseStart = resolve; }); + const adapter: BackendAdapter = { + ...mockAdapter([]), + cancel: async () => { + await cancelStarted; + return { status: "cancelled" }; + }, + startTask: async () => { + await startCanFinish; + return { result: { ok: true } }; + }, + streamEvents: async function* (taskId: string) { + yield { taskId, type: "task.log", message: "done" }; + } + }; + const service = runtimeService(store, adapter); + + const handle = await service.dispatchTask(sessionRequest()); + await waitForStatus(service, handle.taskId, "starting"); + const cancelPromise = service.cancelTask(handle.taskId); + await waitForStatus(service, handle.taskId, "cancelling"); + releaseStart(); + await new Promise((resolve) => setTimeout(resolve, 30)); + await expect(service.getTaskStatus(handle.taskId)).resolves.toMatchObject({ status: "cancelling" }); + releaseCancel(); + await cancelPromise; + await expect(service.getTaskStatus(handle.taskId)).resolves.toMatchObject({ status: "cancelled" }); + }); + + it("fails the task when adapters emit events for the wrong task", async () => { + const store = new MemoryStore(); + const adapter: BackendAdapter = { + ...mockAdapter([]), + streamEvents: async function* () { + yield { taskId: "other_task", type: "task.log", message: "wrong" }; + } + }; + const service = runtimeService(store, adapter); + + const handle = await service.dispatchTask(sessionRequest()); + const task = await waitForStatus(service, handle.taskId, "failed"); + + expect(task.error?.code).toBe("adapter.event_task_mismatch"); + expect(await store.listEvents("other_task")).toHaveLength(0); + }); }); +function sessionRequest(): DispatchRequest { + return { + provider: "aws", + accountProfile: "dev-aws", + capability: "agent-runtime", + taskType: "agent.run", + target: { mode: "session" }, + input: { instruction: "run" } + }; +} + +function runtimeRequest(): DispatchRequest { + return { ...sessionRequest(), target: { mode: "runtime" } }; +} + +function runtimeService(store: TaskStore, adapter: BackendAdapter): RuntimeService { + return new RuntimeService({ + config: { accounts: { "dev-aws": { provider: "aws", credentialSource: "aws-sdk-default" } }, backends: {} }, + store, + adapters: [adapter] + }); +} + async function waitForStatus(service: RuntimeService, taskId: string, status: string): Promise { for (let attempt = 0; attempt < 20; attempt += 1) { const task = await service.getTaskStatus(taskId);