Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 61 additions & 23 deletions src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,18 @@ import { AgentDispatchError, toRuntimeError } from "./errors.js";
import { createId, nowIso } from "./ids.js";
import { authorizeDispatchRequest } from "./policy.js";
import type { TaskStore } from "./store.js";
import type { DispatchRequest, RuntimeEvent, TaskHandle, TaskRecord, TaskResult } from "./types.js";
import type { CleanupResult, DispatchRequest, ProvisionResult, RuntimeEvent, RuntimeTarget, TaskHandle, TaskRecord, TaskResult } from "./types.js";

export interface RuntimeServiceOptions {
config: AgentDispatchConfig;
store: TaskStore;
adapters: BackendAdapter[];
}

function isTerminalStatus(status: TaskRecord["status"]): boolean {
return status === "succeeded" || status === "failed" || status === "cancelled";
}

export class RuntimeService {
private readonly config: AgentDispatchConfig;
private readonly store: TaskStore;
Expand Down Expand Up @@ -132,17 +136,24 @@ export class RuntimeService {

async cancelTask(taskId: string) {
const task = await this.getTaskStatus(taskId);
if (isTerminalStatus(task.status)) {
throw new AgentDispatchError({ code: "task.terminal", message: `Task ${taskId} is already ${task.status}.` });
}
const adapter = this.adapters.find((candidate) => candidate.name === task.backend);
if (!adapter) {
throw new AgentDispatchError({ code: "adapter.not_found", message: `Adapter ${task.backend} was not found.` });
}
await this.store.updateTask(taskId, { status: "cancelling", updatedAt: nowIso() });
await this.store.appendEvent(this.event(taskId, "task.cancelling", "Cancellation requested."));
const result = await adapter.cancel(taskId);
const latestTask = await this.getTaskStatus(taskId);
if (isTerminalStatus(latestTask.status)) {
return result;
}
const status = result.status === "cancelled" ? "cancelled" : "failed";
await this.store.updateTask(taskId, {
status,
providerRefs: { ...task.providerRefs, ...result.providerRefs },
providerRefs: { ...latestTask.providerRefs, ...result.providerRefs },
error: result.error,
updatedAt: nowIso()
});
Expand All @@ -151,11 +162,15 @@ export class RuntimeService {
}

private async runTask(adapter: BackendAdapter, request: DispatchRequest, task: TaskRecord): Promise<void> {
let target: RuntimeTarget | undefined;
let provisioned: ProvisionResult | undefined;
let cleanupAttempted = false;
try {
await this.store.updateTask(task.id, { status: "provisioning", updatedAt: nowIso() });
await this.store.appendEvent(this.event(task.id, "task.provisioning", "Resolving provider target."));
const resolved = await adapter.resolveTarget(request);
const provisioned = await adapter.provision({ dispatch: request, task, target: resolved.target });
target = resolved.target;
provisioned = await adapter.provision({ dispatch: request, task, target });
if (provisioned.runtime) {
await this.store.saveRuntime(provisioned.runtime);
}
Expand All @@ -164,19 +179,24 @@ export class RuntimeService {
}
await this.store.updateTask(task.id, {
status: "starting",
providerRefs: { ...task.providerRefs, ...provisioned.providerRefs, ...resolved.target.providerRefs },
providerRefs: { ...task.providerRefs, ...provisioned.providerRefs, ...target.providerRefs },
updatedAt: nowIso()
});
await this.store.appendEvent(this.event(task.id, "task.started", "Starting provider task."));

const started = await adapter.startTask({
dispatch: request,
task,
target: resolved.target,
target,
runtime: provisioned.runtime,
session: provisioned.session
});

const afterStartTask = await this.store.getTask(task.id);
if (afterStartTask?.status === "cancelled" || afterStartTask?.status === "failed" || afterStartTask?.status === "cancelling") {
return;
}

await this.store.updateTask(task.id, {
status: "running",
providerRefs: { ...(await this.latestProviderRefs(task.id)), ...started.providerRefs },
Expand All @@ -187,14 +207,20 @@ export class RuntimeService {
}

for await (const event of adapter.streamEvents(task.id)) {
if (event.taskId !== task.id) {
throw new AgentDispatchError({
code: "adapter.event_task_mismatch",
message: `Adapter ${adapter.name} emitted an event for ${event.taskId}, expected ${task.id}.`
});
}
await this.store.appendEvent(event);
if (event.type === "task.log" && event.message) {
await this.store.appendLog(task.id, `${event.message}\n`);
}
}

const finalTask = await this.store.getTask(task.id);
if (finalTask?.status === "cancelled" || finalTask?.status === "failed") {
if (finalTask?.status === "cancelled" || finalTask?.status === "failed" || finalTask?.status === "cancelling") {
return;
}
await this.store.updateTask(task.id, {
Expand All @@ -203,28 +229,40 @@ export class RuntimeService {
updatedAt: nowIso()
});
await this.store.appendEvent(this.event(task.id, "task.succeeded", "Task completed."));
const cleanup = await adapter.cleanup(resolved.target);
if (provisioned.runtime) {
await this.store.updateRuntime(provisioned.runtime.id, {
status: cleanup.status === "completed" ? "deleted" : cleanup.status === "failed" ? "failed" : provisioned.runtime.status,
cleanupStatus: cleanup.status === "completed" ? "completed" : cleanup.status === "failed" ? "failed" : provisioned.runtime.cleanupStatus,
providerRefs: { ...provisioned.runtime.providerRefs, ...cleanup.providerRefs },
updatedAt: nowIso()
});
}
if (cleanup.status !== "skipped") {
await this.store.appendEvent(this.event(
task.id,
cleanup.status === "failed" ? "task.failed" : "task.progress",
cleanup.status === "failed" ? cleanup.error?.message ?? "Runtime cleanup failed." : "Runtime cleanup completed.",
{ cleanup }
));
}
cleanupAttempted = true;
await this.cleanupProvisionedRuntime(adapter, task.id, target, provisioned);
} catch (error) {
const runtimeError = toRuntimeError(error);
await this.store.updateTask(task.id, { status: "failed", error: runtimeError, updatedAt: nowIso() });
await this.store.appendEvent(this.event(task.id, "task.failed", runtimeError.message, { error: runtimeError }));
if (target && provisioned && !cleanupAttempted) {
await this.cleanupProvisionedRuntime(adapter, task.id, target, provisioned).catch(async (cleanupError) => {
const normalized = toRuntimeError(cleanupError);
await this.store.appendEvent(this.event(task.id, "task.failed", normalized.message, { cleanupError: normalized }));
});
}
}
}

private async cleanupProvisionedRuntime(adapter: BackendAdapter, taskId: string, target: RuntimeTarget, provisioned: ProvisionResult): Promise<CleanupResult> {
const cleanup = await adapter.cleanup(target);
if (provisioned.runtime) {
await this.store.updateRuntime(provisioned.runtime.id, {
status: cleanup.status === "completed" ? "deleted" : cleanup.status === "failed" ? "failed" : provisioned.runtime.status,
cleanupStatus: cleanup.status === "completed" ? "completed" : cleanup.status === "failed" ? "failed" : provisioned.runtime.cleanupStatus,
providerRefs: { ...provisioned.runtime.providerRefs, ...cleanup.providerRefs },
updatedAt: nowIso()
});
}
if (cleanup.status !== "skipped") {
await this.store.appendEvent(this.event(
taskId,
cleanup.status === "failed" ? "task.failed" : "task.progress",
cleanup.status === "failed" ? cleanup.error?.message ?? "Runtime cleanup failed." : "Runtime cleanup completed.",
{ cleanup }
));
}
return cleanup;
}

private createTaskRecord(request: DispatchRequest, backend: string): TaskRecord {
Expand Down
131 changes: 130 additions & 1 deletion test/runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ function mockAdapter(events: RuntimeEvent[], cleanup: CleanupResult = { status:
}),
provision: async () => ({}),
startTask: async () => ({ result: { ok: true } }),
streamEvents: async function* () { yield* events; },
streamEvents: async function* (taskId: string) {
for (const event of events) {
yield { ...event, taskId: event.taskId === "unused" ? taskId : event.taskId };
}
},
cancel: async () => ({ status: "cancelled" }),
cleanup: async () => cleanup
};
Expand Down Expand Up @@ -218,8 +222,133 @@ describe("RuntimeService", () => {
cleanupStatus: "failed"
});
});

it("cleans up provisioned runtime resources when startup fails", async () => {
const store = new MemoryStore();
const timestamp = nowIso();
let cleanupCalls = 0;
const adapter: BackendAdapter = {
...mockAdapter([]),
capabilities: () => [{ provider: "aws", capability: "agent-runtime", taskTypes: ["agent.run"], targetModes: ["runtime"] }],
resolveTarget: async (dispatch) => ({
account: { name: dispatch.accountProfile, provider: "aws", credentialSource: "test" },
target: { provider: "aws", accountProfile: dispatch.accountProfile, capability: "agent-runtime", backend: "mock-agent-runtime", mode: "runtime" }
}),
provision: async ({ task, dispatch }) => ({
runtime: {
id: "runtime_start_failed",
taskId: task.id,
provider: dispatch.provider,
accountProfile: dispatch.accountProfile,
capability: dispatch.capability,
backend: "mock-agent-runtime",
status: "ready",
providerRefs: {},
cleanupStatus: "pending",
createdAt: timestamp,
updatedAt: timestamp
}
}),
startTask: async () => { throw new Error("start failed"); },
cleanup: async () => {
cleanupCalls += 1;
return { status: "completed" };
}
};
const service = runtimeService(store, adapter);

const handle = await service.dispatchTask(runtimeRequest());
await waitForStatus(service, handle.taskId, "failed");

expect(cleanupCalls).toBe(1);
expect(store.runtimes.get("runtime_start_failed")).toMatchObject({ status: "deleted", cleanupStatus: "completed" });
});

it("does not mutate terminal tasks when cancellation is requested", async () => {
const store = new MemoryStore();
const service = runtimeService(store, mockAdapter([]));
const handle = await service.dispatchTask(sessionRequest());
await waitForStatus(service, handle.taskId, "succeeded");

await expect(service.cancelTask(handle.taskId)).rejects.toMatchObject({ code: "task.terminal" });
await expect(service.getTaskStatus(handle.taskId)).resolves.toMatchObject({ status: "succeeded" });
});

it("does not mark a task succeeded while cancellation is in progress", async () => {
const store = new MemoryStore();
let releaseCancel!: () => void;
let releaseStart!: () => void;
const cancelStarted = new Promise<void>((resolve) => { releaseCancel = resolve; });
const startCanFinish = new Promise<void>((resolve) => { releaseStart = resolve; });
const adapter: BackendAdapter = {
...mockAdapter([]),
cancel: async () => {
await cancelStarted;
return { status: "cancelled" };
},
startTask: async () => {
await startCanFinish;
return { result: { ok: true } };
},
streamEvents: async function* (taskId: string) {
yield { taskId, type: "task.log", message: "done" };
}
};
const service = runtimeService(store, adapter);

const handle = await service.dispatchTask(sessionRequest());
await waitForStatus(service, handle.taskId, "starting");
const cancelPromise = service.cancelTask(handle.taskId);
await waitForStatus(service, handle.taskId, "cancelling");
releaseStart();
await new Promise((resolve) => setTimeout(resolve, 30));
await expect(service.getTaskStatus(handle.taskId)).resolves.toMatchObject({ status: "cancelling" });
releaseCancel();
await cancelPromise;
await expect(service.getTaskStatus(handle.taskId)).resolves.toMatchObject({ status: "cancelled" });
});

it("fails the task when adapters emit events for the wrong task", async () => {
const store = new MemoryStore();
const adapter: BackendAdapter = {
...mockAdapter([]),
streamEvents: async function* () {
yield { taskId: "other_task", type: "task.log", message: "wrong" };
}
};
const service = runtimeService(store, adapter);

const handle = await service.dispatchTask(sessionRequest());
const task = await waitForStatus(service, handle.taskId, "failed");

expect(task.error?.code).toBe("adapter.event_task_mismatch");
expect(await store.listEvents("other_task")).toHaveLength(0);
});
});

function sessionRequest(): DispatchRequest {
return {
provider: "aws",
accountProfile: "dev-aws",
capability: "agent-runtime",
taskType: "agent.run",
target: { mode: "session" },
input: { instruction: "run" }
};
}

function runtimeRequest(): DispatchRequest {
return { ...sessionRequest(), target: { mode: "runtime" } };
}

function runtimeService(store: TaskStore, adapter: BackendAdapter): RuntimeService {
return new RuntimeService({
config: { accounts: { "dev-aws": { provider: "aws", credentialSource: "aws-sdk-default" } }, backends: {} },
store,
adapters: [adapter]
});
}

async function waitForStatus(service: RuntimeService, taskId: string, status: string): Promise<any> {
for (let attempt = 0; attempt < 20; attempt += 1) {
const task = await service.getTaskStatus(taskId);
Expand Down
Loading