From aa66e080349523f8ae98cc6a0a4c486590cf7fa5 Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 18 Mar 2026 13:26:12 -0700 Subject: [PATCH 1/4] update adapter --- .../src/provider/Layers/ClaudeAdapter.test.ts | 196 ++++++++++++++++++ .../src/provider/Layers/ClaudeAdapter.ts | 149 ++++++++++++- 2 files changed, 343 insertions(+), 2 deletions(-) diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts index 556ea304d..3f1c85197 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts @@ -890,6 +890,65 @@ describe("ClaudeAdapterLive", () => { ); }); + it.effect("treats user-aborted Claude results as interrupted without a runtime error", () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const adapter = yield* ClaudeAdapter; + + const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 6).pipe( + Stream.runCollect, + Effect.forkChild, + ); + + const session = yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + const turn = yield* adapter.sendTurn({ + threadId: session.threadId, + input: "hello", + attachments: [], + }); + + harness.query.emit({ + type: "result", + subtype: "error_during_execution", + is_error: false, + errors: ["Error: Request was aborted."], + stop_reason: "tool_use", + session_id: "sdk-session-abort", + uuid: "result-abort", + } as unknown as SDKMessage); + + const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber)); + assert.deepEqual( + runtimeEvents.map((event) => event.type), + [ + "session.started", + "session.configured", + "session.state.changed", + "turn.started", + "thread.started", + "turn.completed", + ], + ); + + const turnCompleted = runtimeEvents[runtimeEvents.length - 1]; + assert.equal(turnCompleted?.type, "turn.completed"); + if (turnCompleted?.type === "turn.completed") { + assert.equal(String(turnCompleted.turnId), String(turn.turnId)); + assert.equal(turnCompleted.payload.state, "interrupted"); + assert.equal(turnCompleted.payload.errorMessage, "Error: Request was aborted."); + assert.equal(turnCompleted.payload.stopReason, "tool_use"); + } + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }); + it.effect("forwards Claude task progress summaries for subagent updates", () => { const harness = makeHarness(); return Effect.gen(function* () { @@ -1841,6 +1900,143 @@ describe("ClaudeAdapterLive", () => { ); }); + it.effect("captures ExitPlanMode as a proposed plan and denies auto-exit", () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const adapter = yield* ClaudeAdapter; + + const session = yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + yield* Stream.take(adapter.streamEvents, 3).pipe(Stream.runDrain); + + yield* adapter.sendTurn({ + threadId: session.threadId, + input: "plan this", + interactionMode: "plan", + attachments: [], + }); + yield* Stream.take(adapter.streamEvents, 1).pipe(Stream.runDrain); + + const createInput = harness.getLastCreateQueryInput(); + const canUseTool = createInput?.options.canUseTool; + assert.equal(typeof canUseTool, "function"); + if (!canUseTool) { + return; + } + + const permissionPromise = canUseTool( + "ExitPlanMode", + { + plan: "# Ship it\n\n- one\n- two", + allowedPrompts: [{ tool: "Bash", prompt: "run tests" }], + }, + { + signal: new AbortController().signal, + toolUseID: "tool-exit-1", + }, + ); + + const proposedEvent = yield* Stream.runHead(adapter.streamEvents); + assert.equal(proposedEvent._tag, "Some"); + if (proposedEvent._tag !== "Some") { + return; + } + assert.equal(proposedEvent.value.type, "turn.proposed.completed"); + if (proposedEvent.value.type !== "turn.proposed.completed") { + return; + } + assert.equal(proposedEvent.value.payload.planMarkdown, "# Ship it\n\n- one\n- two"); + assert.deepEqual(proposedEvent.value.providerRefs, { + providerItemId: ProviderItemId.makeUnsafe("tool-exit-1"), + }); + + const permissionResult = yield* Effect.promise(() => permissionPromise); + assert.equal((permissionResult as PermissionResult).behavior, "deny"); + const deniedResult = permissionResult as PermissionResult & { + message?: string; + }; + assert.equal(deniedResult.message?.includes("captured your proposed plan"), true); + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }); + + it.effect("extracts proposed plans from assistant ExitPlanMode snapshots", () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const adapter = yield* ClaudeAdapter; + + const session = yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + yield* Stream.take(adapter.streamEvents, 3).pipe(Stream.runDrain); + + yield* adapter.sendTurn({ + threadId: session.threadId, + input: "plan this", + interactionMode: "plan", + attachments: [], + }); + yield* Stream.take(adapter.streamEvents, 1).pipe(Stream.runDrain); + + const proposedEventFiber = yield* Stream.filter( + adapter.streamEvents, + (event) => event.type === "turn.proposed.completed", + ).pipe(Stream.runHead, Effect.forkChild); + + harness.query.emit({ + type: "assistant", + session_id: "sdk-session-exit-plan", + uuid: "assistant-exit-plan", + parent_tool_use_id: null, + message: { + model: "claude-opus-4-6", + id: "msg-exit-plan", + type: "message", + role: "assistant", + content: [ + { + type: "tool_use", + id: "tool-exit-2", + name: "ExitPlanMode", + input: { + plan: "# Final plan\n\n- capture it", + }, + }, + ], + stop_reason: null, + stop_sequence: null, + usage: {}, + }, + } as unknown as SDKMessage); + + const proposedEvent = yield* Fiber.join(proposedEventFiber); + assert.equal(proposedEvent._tag, "Some"); + if (proposedEvent._tag !== "Some") { + return; + } + assert.equal(proposedEvent.value.type, "turn.proposed.completed"); + if (proposedEvent.value.type !== "turn.proposed.completed") { + return; + } + assert.equal(proposedEvent.value.payload.planMarkdown, "# Final plan\n\n- capture it"); + assert.deepEqual(proposedEvent.value.providerRefs, { + providerItemId: ProviderItemId.makeUnsafe("tool-exit-2"), + }); + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }); + it.effect("handles AskUserQuestion via user-input.requested/resolved lifecycle", () => { const harness = makeHarness(); return Effect.gen(function* () { diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index 673f08620..d69d34e1a 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -88,6 +88,7 @@ interface ClaudeTurnState { readonly items: Array; readonly assistantTextBlocks: Map; readonly assistantTextBlockOrder: Array; + readonly capturedProposedPlanKeys: Set; nextSyntheticAssistantBlockIndex: number; } @@ -174,6 +175,27 @@ function toMessage(cause: unknown, fallback: string): string { return fallback; } +function resultErrorsText(result: SDKResultMessage): string { + return "errors" in result && Array.isArray(result.errors) + ? result.errors.join(" ").toLowerCase() + : ""; +} + +function isInterruptedResult(result: SDKResultMessage): boolean { + const errors = resultErrorsText(result); + if (errors.includes("interrupt")) { + return true; + } + + return ( + result.subtype === "error_during_execution" && + result.is_error === false && + (errors.includes("request was aborted") || + errors.includes("interrupted by user") || + errors.includes("aborted")) + ); +} + function asRuntimeItemId(value: string): RuntimeItemId { return RuntimeItemId.makeUnsafe(value); } @@ -384,8 +406,8 @@ function turnStatusFromResult(result: SDKResultMessage): ProviderRuntimeTurnStat return "completed"; } - const errors = result.errors.join(" ").toLowerCase(); - if (errors.includes("interrupt")) { + const errors = resultErrorsText(result); + if (isInterruptedResult(result)) { return "interrupted"; } if (errors.includes("cancel")) { @@ -474,6 +496,28 @@ function extractTextContent(value: unknown): string { return extractTextContent(record.content); } +function extractExitPlanModePlan(value: unknown): string | undefined { + if (!value || typeof value !== "object") { + return undefined; + } + + const record = value as { + plan?: unknown; + }; + return typeof record.plan === "string" && record.plan.trim().length > 0 + ? record.plan.trim() + : undefined; +} + +function exitPlanCaptureKey(input: { + readonly toolUseId?: string | undefined; + readonly planMarkdown: string; +}): string { + return input.toolUseId && input.toolUseId.length > 0 + ? `tool:${input.toolUseId}` + : `plan:${input.planMarkdown}`; +} + function tryParseJsonRecord(value: string): Record | undefined { try { const parsed = JSON.parse(value); @@ -1053,6 +1097,54 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { }); }); + const emitProposedPlanCompleted = ( + context: ClaudeSessionContext, + input: { + readonly planMarkdown: string; + readonly toolUseId?: string | undefined; + readonly rawSource: "claude.sdk.message" | "claude.sdk.permission"; + readonly rawMethod: string; + readonly rawPayload: unknown; + }, + ): Effect.Effect => + Effect.gen(function* () { + const turnState = context.turnState; + const planMarkdown = input.planMarkdown.trim(); + if (!turnState || planMarkdown.length === 0) { + return; + } + + const captureKey = exitPlanCaptureKey({ + toolUseId: input.toolUseId, + planMarkdown, + }); + if (turnState.capturedProposedPlanKeys.has(captureKey)) { + return; + } + turnState.capturedProposedPlanKeys.add(captureKey); + + const stamp = yield* makeEventStamp(); + yield* offerRuntimeEvent({ + type: "turn.proposed.completed", + eventId: stamp.eventId, + provider: PROVIDER, + createdAt: stamp.createdAt, + threadId: context.session.threadId, + turnId: turnState.turnId, + payload: { + planMarkdown, + }, + providerRefs: nativeProviderRefs(context, { + providerItemId: input.toolUseId, + }), + raw: { + source: input.rawSource, + method: input.rawMethod, + payload: input.rawPayload, + }, + }); + }); + const completeTurn = ( context: ClaudeSessionContext, status: ProviderRuntimeTurnStatus, @@ -1506,6 +1598,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { items: [], assistantTextBlocks: new Map(), assistantTextBlockOrder: [], + capturedProposedPlanKeys: new Set(), nextSyntheticAssistantBlockIndex: -1, }; context.session = { @@ -1535,6 +1628,35 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { }); } + const content = message.message?.content; + if (Array.isArray(content)) { + for (const block of content) { + if (!block || typeof block !== "object") { + continue; + } + const toolUse = block as { + type?: unknown; + id?: unknown; + name?: unknown; + input?: unknown; + }; + if (toolUse.type !== "tool_use" || toolUse.name !== "ExitPlanMode") { + continue; + } + const planMarkdown = extractExitPlanModePlan(toolUse.input); + if (!planMarkdown) { + continue; + } + yield* emitProposedPlanCompleted(context, { + planMarkdown, + toolUseId: typeof toolUse.id === "string" ? toolUse.id : undefined, + rawSource: "claude.sdk.message", + rawMethod: "claude/assistant", + rawPayload: message, + }); + } + } + if (context.turnState) { context.turnState.items.push(message.message); yield* backfillAssistantTextBlocksFromSnapshot(context, message); @@ -2090,6 +2212,28 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { return yield* handleAskUserQuestion(context, toolInput, callbackOptions); } + if (toolName === "ExitPlanMode") { + const planMarkdown = extractExitPlanModePlan(toolInput); + if (planMarkdown) { + yield* emitProposedPlanCompleted(context, { + planMarkdown, + toolUseId: callbackOptions.toolUseID, + rawSource: "claude.sdk.permission", + rawMethod: "canUseTool/ExitPlanMode", + rawPayload: { + toolName, + input: toolInput, + }, + }); + } + + return { + behavior: "deny", + message: + "The client captured your proposed plan. Stop here and wait for the user's feedback or implementation request in a later turn.", + } satisfies PermissionResult; + } + const runtimeMode = input.runtimeMode ?? "full-access"; if (runtimeMode === "full-access") { return { @@ -2405,6 +2549,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { items: [], assistantTextBlocks: new Map(), assistantTextBlockOrder: [], + capturedProposedPlanKeys: new Set(), nextSyntheticAssistantBlockIndex: -1, }; From 654ca0da91eaca22d4f74d6af5670f00a8cf0fdd Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 18 Mar 2026 14:17:23 -0700 Subject: [PATCH 2/4] Backfill missing app settings defaults during decode - add decoding defaults in `AppSettingsSchema` so older persisted settings load safely - export shared `Schema.Literals` types for `EnvMode` and `TimestampFormat` - add a regression test covering pre-new-key settings hydration --- apps/web/src/appSettings.test.ts | 26 ++++++++++ apps/web/src/appSettings.ts | 52 ++++++++++--------- .../web/src/components/BranchToolbar.logic.ts | 4 +- 3 files changed, 56 insertions(+), 26 deletions(-) diff --git a/apps/web/src/appSettings.test.ts b/apps/web/src/appSettings.test.ts index 6ef0e58a2..605b281df 100644 --- a/apps/web/src/appSettings.test.ts +++ b/apps/web/src/appSettings.test.ts @@ -1,6 +1,8 @@ +import { Schema } from "effect"; import { describe, expect, it } from "vitest"; import { + AppSettingsSchema, DEFAULT_TIMESTAMP_FORMAT, getAppModelOptions, normalizeCustomModelSlugs, @@ -87,3 +89,27 @@ describe("provider-specific custom models", () => { expect(claudeOptions.some((option) => option.slug === "claude/custom-opus")).toBe(true); }); }); + +describe("AppSettingsSchema", () => { + it("fills decoding defaults for persisted settings that predate newer keys", () => { + const decode = Schema.decodeSync(Schema.fromJsonString(AppSettingsSchema)); + + expect( + decode( + JSON.stringify({ + codexBinaryPath: "/usr/local/bin/codex", + confirmThreadDelete: false, + }), + ), + ).toMatchObject({ + codexBinaryPath: "/usr/local/bin/codex", + codexHomePath: "", + defaultThreadEnvMode: "local", + confirmThreadDelete: false, + enableAssistantStreaming: false, + timestampFormat: DEFAULT_TIMESTAMP_FORMAT, + customCodexModels: [], + customClaudeModels: [], + }); + }); +}); diff --git a/apps/web/src/appSettings.ts b/apps/web/src/appSettings.ts index 27c88a694..e4f4d8b1c 100644 --- a/apps/web/src/appSettings.ts +++ b/apps/web/src/appSettings.ts @@ -3,41 +3,43 @@ import { Option, Schema } from "effect"; import { TrimmedNonEmptyString, type ProviderKind } from "@t3tools/contracts"; import { getDefaultModel, getModelOptions, normalizeModelSlug } from "@t3tools/shared/model"; import { useLocalStorage } from "./hooks/useLocalStorage"; +import { EnvMode } from "./components/BranchToolbar.logic"; const APP_SETTINGS_STORAGE_KEY = "t3code:app-settings:v1"; const MAX_CUSTOM_MODEL_COUNT = 32; export const MAX_CUSTOM_MODEL_LENGTH = 256; -export const TIMESTAMP_FORMAT_OPTIONS = ["locale", "12-hour", "24-hour"] as const; -export type TimestampFormat = (typeof TIMESTAMP_FORMAT_OPTIONS)[number]; + +export const TimestampFormat = Schema.Literals(["locale", "12-hour", "24-hour"]); +export type TimestampFormat = typeof TimestampFormat.Type; export const DEFAULT_TIMESTAMP_FORMAT: TimestampFormat = "locale"; + const BUILT_IN_MODEL_SLUGS_BY_PROVIDER: Record> = { codex: new Set(getModelOptions("codex").map((option) => option.slug)), claudeAgent: new Set(getModelOptions("claudeAgent").map((option) => option.slug)), }; -const AppSettingsSchema = Schema.Struct({ - codexBinaryPath: Schema.String.check(Schema.isMaxLength(4096)).pipe( - Schema.withConstructorDefault(() => Option.some("")), - ), - codexHomePath: Schema.String.check(Schema.isMaxLength(4096)).pipe( - Schema.withConstructorDefault(() => Option.some("")), - ), - defaultThreadEnvMode: Schema.Literals(["local", "worktree"]).pipe( - Schema.withConstructorDefault(() => Option.some("local")), - ), - confirmThreadDelete: Schema.Boolean.pipe(Schema.withConstructorDefault(() => Option.some(true))), - enableAssistantStreaming: Schema.Boolean.pipe( - Schema.withConstructorDefault(() => Option.some(false)), - ), - timestampFormat: Schema.Literals(["locale", "12-hour", "24-hour"]).pipe( - Schema.withConstructorDefault(() => Option.some(DEFAULT_TIMESTAMP_FORMAT)), - ), - customCodexModels: Schema.Array(Schema.String).pipe( - Schema.withConstructorDefault(() => Option.some([])), - ), - customClaudeModels: Schema.Array(Schema.String).pipe( - Schema.withConstructorDefault(() => Option.some([])), - ), +const withDefaults = + < + S extends Schema.Top & Schema.WithoutConstructorDefault, + D extends S["~type.make.in"] & S["Encoded"], + >( + fallback: () => D, + ) => + (schema: S) => + schema.pipe( + Schema.withConstructorDefault(() => Option.some(fallback())), + Schema.withDecodingDefault(() => fallback()), + ); + +export const AppSettingsSchema = Schema.Struct({ + codexBinaryPath: Schema.String.check(Schema.isMaxLength(4096)).pipe(withDefaults(() => "")), + codexHomePath: Schema.String.check(Schema.isMaxLength(4096)).pipe(withDefaults(() => "")), + defaultThreadEnvMode: EnvMode.pipe(withDefaults(() => "local" as const satisfies EnvMode)), + confirmThreadDelete: Schema.Boolean.pipe(withDefaults(() => true)), + enableAssistantStreaming: Schema.Boolean.pipe(withDefaults(() => false)), + timestampFormat: TimestampFormat.pipe(withDefaults(() => DEFAULT_TIMESTAMP_FORMAT)), + customCodexModels: Schema.Array(Schema.String).pipe(withDefaults(() => [])), + customClaudeModels: Schema.Array(Schema.String).pipe(withDefaults(() => [])), textGenerationModel: Schema.optional(TrimmedNonEmptyString), }); export type AppSettings = typeof AppSettingsSchema.Type; diff --git a/apps/web/src/components/BranchToolbar.logic.ts b/apps/web/src/components/BranchToolbar.logic.ts index 888c52cfd..2215569c8 100644 --- a/apps/web/src/components/BranchToolbar.logic.ts +++ b/apps/web/src/components/BranchToolbar.logic.ts @@ -1,6 +1,8 @@ import type { GitBranch } from "@t3tools/contracts"; +import { Schema } from "effect"; -export type EnvMode = "local" | "worktree"; +export const EnvMode = Schema.Literals(["local", "worktree"]); +export type EnvMode = typeof EnvMode.Type; export function resolveEffectiveEnvMode(input: { activeWorktreePath: string | null; From ef843d400ebcf5b3cd8ccdd5953cf0ce397d56fe Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 18 Mar 2026 15:47:34 -0700 Subject: [PATCH 3/4] maybe fix codex subturns --- apps/server/src/codexAppServerManager.test.ts | 187 ++++++++++++++++++ apps/server/src/codexAppServerManager.ts | 104 +++++++++- .../src/provider/Layers/CodexAdapter.test.ts | 39 ++++ .../src/provider/Layers/CodexAdapter.ts | 20 +- 4 files changed, 334 insertions(+), 16 deletions(-) diff --git a/apps/server/src/codexAppServerManager.test.ts b/apps/server/src/codexAppServerManager.test.ts index cea8df0a0..80323c744 100644 --- a/apps/server/src/codexAppServerManager.test.ts +++ b/apps/server/src/codexAppServerManager.test.ts @@ -37,6 +37,7 @@ function createSendTurnHarness() { planType: null, sparkEnabled: true, }, + collabReceiverTurns: new Map(), }; const requireSession = vi @@ -75,6 +76,7 @@ function createThreadControlHarness() { createdAt: "2026-02-10T00:00:00.000Z", updatedAt: "2026-02-10T00:00:00.000Z", }, + collabReceiverTurns: new Map(), }; const requireSession = vi @@ -117,6 +119,7 @@ function createPendingUserInputHarness() { }, ], ]), + collabReceiverTurns: new Map(), }; const requireSession = vi @@ -135,6 +138,43 @@ function createPendingUserInputHarness() { return { manager, context, requireSession, writeMessage, emitEvent }; } +function createCollabNotificationHarness() { + const manager = new CodexAppServerManager(); + const context = { + session: { + provider: "codex", + status: "running", + threadId: asThreadId("thread_1"), + runtimeMode: "full-access", + model: "gpt-5.3-codex", + activeTurnId: "turn_parent", + resumeCursor: { threadId: "provider_parent" }, + createdAt: "2026-02-10T00:00:00.000Z", + updatedAt: "2026-02-10T00:00:00.000Z", + }, + account: { + type: "unknown", + planType: null, + sparkEnabled: true, + }, + pending: new Map(), + pendingApprovals: new Map(), + pendingUserInputs: new Map(), + collabReceiverTurns: new Map(), + nextRequestId: 1, + stopping: false, + }; + + const emitEvent = vi + .spyOn(manager as unknown as { emitEvent: (...args: unknown[]) => void }, "emitEvent") + .mockImplementation(() => {}); + const updateSession = vi + .spyOn(manager as unknown as { updateSession: (...args: unknown[]) => void }, "updateSession") + .mockImplementation(() => {}); + + return { manager, context, emitEvent, updateSession }; +} + describe("classifyCodexStderrLine", () => { it("ignores empty lines", () => { expect(classifyCodexStderrLine(" ")).toBeNull(); @@ -721,6 +761,7 @@ describe("respondToUserInput", () => { }, pendingApprovals: new Map(), pendingUserInputs: new Map(), + collabReceiverTurns: new Map(), }; type ApprovalRequestContext = { session: typeof context.session; @@ -748,6 +789,152 @@ describe("respondToUserInput", () => { }); }); +describe("collab child conversation routing", () => { + it("rewrites child notification turn ids onto the parent turn", () => { + const { manager, context, emitEvent } = createCollabNotificationHarness(); + + ( + manager as unknown as { + handleServerNotification: (context: unknown, notification: Record) => void; + } + ).handleServerNotification(context, { + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + id: "call_collab_1", + receiverThreadIds: ["child_provider_1"], + }, + threadId: "provider_parent", + turnId: "turn_parent", + }, + }); + + ( + manager as unknown as { + handleServerNotification: (context: unknown, notification: Record) => void; + } + ).handleServerNotification(context, { + method: "item/agentMessage/delta", + params: { + threadId: "child_provider_1", + turnId: "turn_child_1", + itemId: "msg_child_1", + delta: "working", + }, + }); + + expect(emitEvent).toHaveBeenLastCalledWith( + expect.objectContaining({ + method: "item/agentMessage/delta", + turnId: "turn_parent", + itemId: "msg_child_1", + }), + ); + }); + + it("suppresses child lifecycle notifications so they cannot replace the parent turn", () => { + const { manager, context, emitEvent, updateSession } = createCollabNotificationHarness(); + + ( + manager as unknown as { + handleServerNotification: (context: unknown, notification: Record) => void; + } + ).handleServerNotification(context, { + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + id: "call_collab_1", + receiverThreadIds: ["child_provider_1"], + }, + threadId: "provider_parent", + turnId: "turn_parent", + }, + }); + emitEvent.mockClear(); + updateSession.mockClear(); + + ( + manager as unknown as { + handleServerNotification: (context: unknown, notification: Record) => void; + } + ).handleServerNotification(context, { + method: "turn/started", + params: { + threadId: "child_provider_1", + turn: { id: "turn_child_1" }, + }, + }); + + ( + manager as unknown as { + handleServerNotification: (context: unknown, notification: Record) => void; + } + ).handleServerNotification(context, { + method: "turn/completed", + params: { + threadId: "child_provider_1", + turn: { id: "turn_child_1", status: "completed" }, + }, + }); + + expect(emitEvent).not.toHaveBeenCalled(); + expect(updateSession).not.toHaveBeenCalled(); + }); + + it("rewrites child approval requests onto the parent turn", () => { + const { manager, context, emitEvent } = createCollabNotificationHarness(); + + ( + manager as unknown as { + handleServerNotification: (context: unknown, notification: Record) => void; + } + ).handleServerNotification(context, { + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + id: "call_collab_1", + receiverThreadIds: ["child_provider_1"], + }, + threadId: "provider_parent", + turnId: "turn_parent", + }, + }); + emitEvent.mockClear(); + + ( + manager as unknown as { + handleServerRequest: (context: unknown, request: Record) => void; + } + ).handleServerRequest(context, { + id: 42, + method: "item/commandExecution/requestApproval", + params: { + threadId: "child_provider_1", + turnId: "turn_child_1", + itemId: "call_child_1", + command: "bun install", + }, + }); + + expect(Array.from(context.pendingApprovals.values())[0]).toEqual( + expect.objectContaining({ + turnId: "turn_parent", + itemId: "call_child_1", + }), + ); + expect(emitEvent).toHaveBeenCalledWith( + expect.objectContaining({ + method: "item/commandExecution/requestApproval", + turnId: "turn_parent", + itemId: "call_child_1", + }), + ); + }); +}); + describe.skipIf(!process.env.CODEX_BINARY_PATH)("startSession live Codex resume", () => { it("keeps prior thread history when resuming with a changed runtime mode", async () => { const workspaceDir = mkdtempSync(path.join(os.tmpdir(), "codex-live-resume-")); diff --git a/apps/server/src/codexAppServerManager.ts b/apps/server/src/codexAppServerManager.ts index a8a8ce460..0ac37db3e 100644 --- a/apps/server/src/codexAppServerManager.ts +++ b/apps/server/src/codexAppServerManager.ts @@ -70,6 +70,7 @@ interface CodexSessionContext { pending: Map; pendingApprovals: Map; pendingUserInputs: Map; + collabReceiverTurns: Map; nextRequestId: number; stopping: boolean; } @@ -571,6 +572,7 @@ export class CodexAppServerManager extends EventEmitter { const context = this.requireSession(input.threadId); + context.collabReceiverTurns.clear(); const turnInput: Array< { type: "text"; text: string; text_elements: [] } | { type: "image"; url: string } @@ -1121,7 +1124,16 @@ export class CodexAppServerManager extends EventEmitter (typeof value === "string" ? value : null)) + .filter((value): value is string => value !== null) ?? []; + for (const receiverThreadId of receiverThreadIds) { + context.collabReceiverTurns.set(receiverThreadId, parentTurnId); + } + } + + private shouldSuppressChildConversationNotification(method: string): boolean { + return ( + method === "thread/started" || + method === "thread/status/changed" || + method === "thread/archived" || + method === "thread/unarchived" || + method === "thread/closed" || + method === "thread/compacted" || + method === "thread/name/updated" || + method === "thread/tokenUsage/updated" || + method === "turn/started" || + method === "turn/completed" || + method === "turn/aborted" || + method === "turn/plan/updated" || + method === "item/plan/delta" + ); + } + private readObject(value: unknown, key?: string): Record | undefined { const target = key === undefined diff --git a/apps/server/src/provider/Layers/CodexAdapter.test.ts b/apps/server/src/provider/Layers/CodexAdapter.test.ts index a402fb4c7..31d394c3e 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.test.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.test.ts @@ -800,6 +800,45 @@ lifecycleLayer("CodexAdapterLive lifecycle", (it) => { } }), ); + + it.effect("prefers manager-assigned turn ids for Codex task events", () => + Effect.gen(function* () { + const adapter = yield* CodexAdapter; + const firstEventFiber = yield* Stream.runHead(adapter.streamEvents).pipe(Effect.forkChild); + + lifecycleManager.emit("event", { + id: asEventId("evt-codex-task-started-parent-turn"), + kind: "notification", + provider: "codex", + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-parent"), + createdAt: new Date().toISOString(), + method: "codex/event/task_started", + payload: { + id: "turn-child", + msg: { + type: "task_started", + turn_id: "turn-child", + collaboration_mode_kind: "default", + }, + conversationId: "child-provider-thread", + }, + } satisfies ProviderEvent); + + const firstEvent = yield* Fiber.join(firstEventFiber); + assert.equal(firstEvent._tag, "Some"); + if (firstEvent._tag !== "Some") { + return; + } + assert.equal(firstEvent.value.type, "task.started"); + if (firstEvent.value.type !== "task.started") { + return; + } + assert.equal(firstEvent.value.turnId, "turn-parent"); + assert.equal(firstEvent.value.providerRefs?.providerTurnId, "turn-parent"); + assert.equal(firstEvent.value.payload.taskId, "turn-child"); + }), + ); }); afterAll(() => { diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 1e4b80ae9..f87284d12 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -109,6 +109,14 @@ function asNumber(value: unknown): number | undefined { return typeof value === "number" && Number.isFinite(value) ? value : undefined; } +function toTurnId(value: string | undefined): TurnId | undefined { + return value?.trim() ? TurnId.makeUnsafe(value) : undefined; +} + +function toProviderItemId(value: string | undefined): ProviderItemId | undefined { + return value?.trim() ? ProviderItemId.makeUnsafe(value) : undefined; +} + function toTurnStatus(value: unknown): "completed" | "failed" | "cancelled" | "interrupted" { switch (value) { case "completed": @@ -415,27 +423,27 @@ function codexEventBase( ): Omit { const payload = asObject(event.payload); const msg = codexEventMessage(payload); - const turnId = asString(msg?.turn_id) ?? asString(msg?.turnId); - const itemId = asString(msg?.item_id) ?? asString(msg?.itemId); + const turnId = event.turnId ?? toTurnId(asString(msg?.turn_id) ?? asString(msg?.turnId)); + const itemId = event.itemId ?? toProviderItemId(asString(msg?.item_id) ?? asString(msg?.itemId)); const requestId = asString(msg?.request_id) ?? asString(msg?.requestId); const base = runtimeEventBase(event, canonicalThreadId); const providerRefs = base.providerRefs ? { ...base.providerRefs, ...(turnId ? { providerTurnId: turnId } : {}), - ...(itemId ? { providerItemId: ProviderItemId.makeUnsafe(itemId) } : {}), + ...(itemId ? { providerItemId: itemId } : {}), ...(requestId ? { providerRequestId: requestId } : {}), } : { ...(turnId ? { providerTurnId: turnId } : {}), - ...(itemId ? { providerItemId: ProviderItemId.makeUnsafe(itemId) } : {}), + ...(itemId ? { providerItemId: itemId } : {}), ...(requestId ? { providerRequestId: requestId } : {}), }; return { ...base, - ...(turnId ? { turnId: TurnId.makeUnsafe(turnId) } : {}), - ...(itemId ? { itemId: asRuntimeItemId(ProviderItemId.makeUnsafe(itemId)) } : {}), + ...(turnId ? { turnId } : {}), + ...(itemId ? { itemId: asRuntimeItemId(itemId) } : {}), ...(requestId ? { requestId: asRuntimeRequestId(requestId) } : {}), ...(Object.keys(providerRefs).length > 0 ? { providerRefs } : {}), }; From e6d856b93e636f2d458f5d1c7cbf15b476e6ad3d Mon Sep 17 00:00:00 2001 From: Julius Marminge Date: Wed, 18 Mar 2026 15:54:55 -0700 Subject: [PATCH 4/4] Add subagent work units across providers Co-authored-by: codex --- .../Layers/CheckpointDiffQuery.test.ts | 1 + apps/server/src/codexAppServerManager.test.ts | 50 ++++ apps/server/src/codexAppServerManager.ts | 31 ++- .../orchestration/Layers/CheckpointReactor.ts | 3 + .../Layers/ProjectionPipeline.test.ts | 133 +++++++++++ .../Layers/ProjectionPipeline.ts | 93 ++++++++ .../Layers/ProjectionSnapshotQuery.test.ts | 65 ++++- .../Layers/ProjectionSnapshotQuery.ts | 67 ++++++ .../Layers/ProviderCommandReactor.test.ts | 1 + .../Layers/ProviderCommandReactor.ts | 1 + .../Layers/ProviderRuntimeIngestion.test.ts | 36 +++ .../Layers/ProviderRuntimeIngestion.ts | 223 ++++++++++++++++++ apps/server/src/orchestration/Schemas.ts | 2 + .../orchestration/commandInvariants.test.ts | 2 + apps/server/src/orchestration/decider.ts | 21 ++ .../src/orchestration/projector.test.ts | 85 +++++++ apps/server/src/orchestration/projector.ts | 47 ++++ .../Layers/ProjectionThreadActivities.ts | 5 + .../Layers/ProjectionThreadWorkUnits.ts | 163 +++++++++++++ apps/server/src/persistence/Migrations.ts | 2 + .../016_ProjectionThreadWorkUnits.ts | 88 +++++++ .../Services/ProjectionThreadActivities.ts | 2 + .../Services/ProjectionThreadWorkUnits.ts | 59 +++++ .../src/provider/Layers/ClaudeAdapter.test.ts | 78 ++++++ .../src/provider/Layers/ClaudeAdapter.ts | 25 ++ .../src/provider/Layers/CodexAdapter.test.ts | 3 + apps/web/src/components/ChatView.browser.tsx | 2 + .../components/KeybindingsToast.browser.tsx | 1 + apps/web/src/session-logic.test.ts | 1 + apps/web/src/store.test.ts | 1 + packages/contracts/src/baseSchemas.ts | 2 + packages/contracts/src/orchestration.test.ts | 44 ++++ packages/contracts/src/orchestration.ts | 62 +++++ 33 files changed, 1394 insertions(+), 5 deletions(-) create mode 100644 apps/server/src/persistence/Layers/ProjectionThreadWorkUnits.ts create mode 100644 apps/server/src/persistence/Migrations/016_ProjectionThreadWorkUnits.ts create mode 100644 apps/server/src/persistence/Services/ProjectionThreadWorkUnits.ts diff --git a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts index 2f79ea9d5..f6616f970 100644 --- a/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts +++ b/apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts @@ -62,6 +62,7 @@ function makeSnapshot(input: { messages: [], activities: [], proposedPlans: [], + workUnits: [], checkpoints: [ { turnId: TurnId.makeUnsafe("turn-1"), diff --git a/apps/server/src/codexAppServerManager.test.ts b/apps/server/src/codexAppServerManager.test.ts index 80323c744..e5ab907a6 100644 --- a/apps/server/src/codexAppServerManager.test.ts +++ b/apps/server/src/codexAppServerManager.test.ts @@ -38,6 +38,7 @@ function createSendTurnHarness() { sparkEnabled: true, }, collabReceiverTurns: new Map(), + collabReceiverItems: new Map(), }; const requireSession = vi @@ -77,6 +78,7 @@ function createThreadControlHarness() { updatedAt: "2026-02-10T00:00:00.000Z", }, collabReceiverTurns: new Map(), + collabReceiverItems: new Map(), }; const requireSession = vi @@ -120,6 +122,7 @@ function createPendingUserInputHarness() { ], ]), collabReceiverTurns: new Map(), + collabReceiverItems: new Map(), }; const requireSession = vi @@ -161,6 +164,7 @@ function createCollabNotificationHarness() { pendingApprovals: new Map(), pendingUserInputs: new Map(), collabReceiverTurns: new Map(), + collabReceiverItems: new Map(), nextRequestId: 1, stopping: false, }; @@ -762,6 +766,7 @@ describe("respondToUserInput", () => { pendingApprovals: new Map(), pendingUserInputs: new Map(), collabReceiverTurns: new Map(), + collabReceiverItems: new Map(), }; type ApprovalRequestContext = { session: typeof context.session; @@ -933,6 +938,51 @@ describe("collab child conversation routing", () => { }), ); }); + + it("rewrites child task notifications onto the parent collab item when no child item id exists", () => { + const { manager, context, emitEvent } = createCollabNotificationHarness(); + + ( + manager as unknown as { + handleServerNotification: (context: unknown, notification: Record) => void; + } + ).handleServerNotification(context, { + method: "item/completed", + params: { + item: { + type: "collabAgentToolCall", + id: "call_collab_1", + receiverThreadIds: ["child_provider_1"], + }, + threadId: "provider_parent", + turnId: "turn_parent", + }, + }); + emitEvent.mockClear(); + + ( + manager as unknown as { + handleServerNotification: (context: unknown, notification: Record) => void; + } + ).handleServerNotification(context, { + method: "codex/event/task_started", + params: { + threadId: "child_provider_1", + msg: { + type: "task_started", + turn_id: "turn_child_1", + }, + }, + }); + + expect(emitEvent).toHaveBeenCalledWith( + expect.objectContaining({ + method: "codex/event/task_started", + turnId: "turn_parent", + itemId: "call_collab_1", + }), + ); + }); }); describe.skipIf(!process.env.CODEX_BINARY_PATH)("startSession live Codex resume", () => { diff --git a/apps/server/src/codexAppServerManager.ts b/apps/server/src/codexAppServerManager.ts index 0ac37db3e..dcc6e7faa 100644 --- a/apps/server/src/codexAppServerManager.ts +++ b/apps/server/src/codexAppServerManager.ts @@ -71,6 +71,7 @@ interface CodexSessionContext { pendingApprovals: Map; pendingUserInputs: Map; collabReceiverTurns: Map; + collabReceiverItems: Map; nextRequestId: number; stopping: boolean; } @@ -573,6 +574,7 @@ export class CodexAppServerManager extends EventEmitter { const context = this.requireSession(input.threadId); context.collabReceiverTurns.clear(); + context.collabReceiverItems.clear(); const turnInput: Array< { type: "text"; text: string; text_elements: [] } | { type: "image"; url: string } @@ -1127,7 +1130,9 @@ export class CodexAppServerManager extends EventEmitter value !== null) ?? []; for (const receiverThreadId of receiverThreadIds) { context.collabReceiverTurns.set(receiverThreadId, parentTurnId); + if (parentItemId) { + context.collabReceiverItems.set(receiverThreadId, parentItemId); + } } } diff --git a/apps/server/src/orchestration/Layers/CheckpointReactor.ts b/apps/server/src/orchestration/Layers/CheckpointReactor.ts index ab38c1033..004c493bc 100644 --- a/apps/server/src/orchestration/Layers/CheckpointReactor.ts +++ b/apps/server/src/orchestration/Layers/CheckpointReactor.ts @@ -89,6 +89,7 @@ const make = Effect.gen(function* () { detail: input.detail, }, turnId: null, + workUnitId: null, createdAt: input.createdAt, }, createdAt: input.createdAt, @@ -113,6 +114,7 @@ const make = Effect.gen(function* () { detail: input.detail, }, turnId: input.turnId, + workUnitId: null, createdAt: input.createdAt, }, createdAt: input.createdAt, @@ -313,6 +315,7 @@ const make = Effect.gen(function* () { status: input.status, }, turnId: input.turnId, + workUnitId: null, createdAt: input.createdAt, }, createdAt: input.createdAt, diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts index 5fbe3016f..c35c7b2fb 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.test.ts @@ -7,6 +7,7 @@ import { ProjectId, ThreadId, TurnId, + WorkUnitId, } from "@t3tools/contracts"; import * as NodeServices from "@effect/platform-node/NodeServices"; import { assert, it } from "@effect/vitest"; @@ -175,6 +176,138 @@ projectionLayer("OrchestrationProjectionPipeline", (it) => { }), ); + it.effect("projects work units and activity work-unit links", () => + Effect.gen(function* () { + const projectionPipeline = yield* OrchestrationProjectionPipeline; + const eventStore = yield* OrchestrationEventStore; + const sql = yield* SqlClient.SqlClient; + const now = new Date().toISOString(); + + yield* eventStore.append({ + type: "thread.created", + eventId: EventId.makeUnsafe("evt-work-unit-thread"), + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe("thread-work-unit"), + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-work-unit-thread"), + causationEventId: null, + correlationId: CommandId.makeUnsafe("cmd-work-unit-thread"), + metadata: {}, + payload: { + threadId: ThreadId.makeUnsafe("thread-work-unit"), + projectId: ProjectId.makeUnsafe("project-1"), + title: "Thread work unit", + model: "gpt-5-codex", + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + }, + }); + + yield* eventStore.append({ + type: "thread.work-unit-upserted", + eventId: EventId.makeUnsafe("evt-work-unit-upsert"), + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe("thread-work-unit"), + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-work-unit-upsert"), + causationEventId: null, + correlationId: CommandId.makeUnsafe("cmd-work-unit-upsert"), + metadata: {}, + payload: { + threadId: ThreadId.makeUnsafe("thread-work-unit"), + workUnit: { + id: WorkUnitId.makeUnsafe("wu:thread-work-unit:turn:turn-1:root"), + turnId: TurnId.makeUnsafe("turn-1"), + parentWorkUnitId: null, + kind: "primary_agent", + state: "running", + title: "Primary agent", + detail: null, + spawnedByActivityId: null, + providerRefs: { + providerTurnId: "provider-turn-1", + }, + startedAt: now, + updatedAt: now, + completedAt: null, + }, + }, + }); + + yield* eventStore.append({ + type: "thread.activity-appended", + eventId: EventId.makeUnsafe("evt-work-unit-activity"), + aggregateKind: "thread", + aggregateId: ThreadId.makeUnsafe("thread-work-unit"), + occurredAt: now, + commandId: CommandId.makeUnsafe("cmd-work-unit-activity"), + causationEventId: null, + correlationId: CommandId.makeUnsafe("cmd-work-unit-activity"), + metadata: {}, + payload: { + threadId: ThreadId.makeUnsafe("thread-work-unit"), + activity: { + id: EventId.makeUnsafe("activity-work-unit"), + tone: "info", + kind: "task.progress", + summary: "Reasoning update", + payload: { + detail: "Looking around", + }, + turnId: TurnId.makeUnsafe("turn-1"), + workUnitId: WorkUnitId.makeUnsafe("wu:thread-work-unit:turn:turn-1:root"), + createdAt: now, + }, + }, + }); + + yield* projectionPipeline.bootstrap; + + const workUnitRows = yield* sql<{ + readonly workUnitId: string; + readonly turnId: string; + readonly state: string; + readonly providerRefsJson: string | null; + }>` + SELECT + work_unit_id AS "workUnitId", + turn_id AS "turnId", + state, + provider_refs_json AS "providerRefsJson" + FROM projection_thread_work_units + WHERE thread_id = 'thread-work-unit' + `; + assert.deepEqual(workUnitRows, [ + { + workUnitId: "wu:thread-work-unit:turn:turn-1:root", + turnId: "turn-1", + state: "running", + providerRefsJson: '{"providerTurnId":"provider-turn-1"}', + }, + ]); + + const activityRows = yield* sql<{ + readonly activityId: string; + readonly workUnitId: string | null; + }>` + SELECT + activity_id AS "activityId", + work_unit_id AS "workUnitId" + FROM projection_thread_activities + WHERE thread_id = 'thread-work-unit' + `; + assert.deepEqual(activityRows, [ + { + activityId: "activity-work-unit", + workUnitId: "wu:thread-work-unit:turn:turn-1:root", + }, + ]); + }), + ); + it.effect("stores message attachment references without mutating payloads", () => Effect.sync(() => fs.mkdtempSync(path.join(os.tmpdir(), "t3-projection-attachments-"))).pipe( Effect.flatMap((stateDir) => diff --git a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts index d46764cc8..5c9d73a8e 100644 --- a/apps/server/src/orchestration/Layers/ProjectionPipeline.ts +++ b/apps/server/src/orchestration/Layers/ProjectionPipeline.ts @@ -22,6 +22,10 @@ import { type ProjectionThreadProposedPlan, ProjectionThreadProposedPlanRepository, } from "../../persistence/Services/ProjectionThreadProposedPlans.ts"; +import { + type ProjectionThreadWorkUnit, + ProjectionThreadWorkUnitRepository, +} from "../../persistence/Services/ProjectionThreadWorkUnits.ts"; import { ProjectionThreadSessionRepository } from "../../persistence/Services/ProjectionThreadSessions.ts"; import { type ProjectionTurn, @@ -34,6 +38,7 @@ import { ProjectionStateRepositoryLive } from "../../persistence/Layers/Projecti import { ProjectionThreadActivityRepositoryLive } from "../../persistence/Layers/ProjectionThreadActivities.ts"; import { ProjectionThreadMessageRepositoryLive } from "../../persistence/Layers/ProjectionThreadMessages.ts"; import { ProjectionThreadProposedPlanRepositoryLive } from "../../persistence/Layers/ProjectionThreadProposedPlans.ts"; +import { ProjectionThreadWorkUnitRepositoryLive } from "../../persistence/Layers/ProjectionThreadWorkUnits.ts"; import { ProjectionThreadSessionRepositoryLive } from "../../persistence/Layers/ProjectionThreadSessions.ts"; import { ProjectionTurnRepositoryLive } from "../../persistence/Layers/ProjectionTurns.ts"; import { ProjectionThreadRepositoryLive } from "../../persistence/Layers/ProjectionThreads.ts"; @@ -55,6 +60,7 @@ export const ORCHESTRATION_PROJECTOR_NAMES = { threadMessages: "projection.thread-messages", threadProposedPlans: "projection.thread-proposed-plans", threadActivities: "projection.thread-activities", + threadWorkUnits: "projection.thread-work-units", threadSessions: "projection.thread-sessions", threadTurns: "projection.thread-turns", checkpoints: "projection.checkpoints", @@ -214,6 +220,28 @@ function retainProjectionProposedPlansAfterRevert( ); } +function retainProjectionWorkUnitsAfterRevert( + workUnits: ReadonlyArray, + turns: ReadonlyArray, + turnCount: number, +): ReadonlyArray { + const retainedTurnIds = new Set( + turns + .filter( + (turn) => + turn.turnId !== null && + turn.checkpointTurnCount !== null && + turn.checkpointTurnCount <= turnCount, + ) + .flatMap((turn) => (turn.turnId === null ? [] : [turn.turnId])), + ); + const retained = workUnits.filter((workUnit) => retainedTurnIds.has(workUnit.turnId)); + const retainedIds = new Set(retained.map((workUnit) => workUnit.workUnitId)); + return retained.filter( + (workUnit) => workUnit.parentWorkUnitId === null || retainedIds.has(workUnit.parentWorkUnitId), + ); +} + function collectThreadAttachmentRelativePaths( threadId: string, messages: ReadonlyArray, @@ -346,6 +374,7 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { const projectionThreadMessageRepository = yield* ProjectionThreadMessageRepository; const projectionThreadProposedPlanRepository = yield* ProjectionThreadProposedPlanRepository; const projectionThreadActivityRepository = yield* ProjectionThreadActivityRepository; + const projectionThreadWorkUnitRepository = yield* ProjectionThreadWorkUnitRepository; const projectionThreadSessionRepository = yield* ProjectionThreadSessionRepository; const projectionTurnRepository = yield* ProjectionTurnRepository; const projectionPendingApprovalRepository = yield* ProjectionPendingApprovalRepository; @@ -705,6 +734,7 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { activityId: event.payload.activity.id, threadId: event.payload.threadId, turnId: event.payload.activity.turnId, + workUnitId: event.payload.activity.workUnitId, tone: event.payload.activity.tone, kind: event.payload.activity.kind, summary: event.payload.activity.summary, @@ -748,6 +778,64 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { } }); + const applyThreadWorkUnitsProjection: ProjectorDefinition["apply"] = ( + event, + _attachmentSideEffects, + ) => + Effect.gen(function* () { + switch (event.type) { + case "thread.work-unit-upserted": + yield* projectionThreadWorkUnitRepository.upsert({ + workUnitId: event.payload.workUnit.id, + threadId: event.payload.threadId, + turnId: event.payload.workUnit.turnId, + parentWorkUnitId: event.payload.workUnit.parentWorkUnitId, + kind: event.payload.workUnit.kind, + state: event.payload.workUnit.state, + title: event.payload.workUnit.title, + detail: event.payload.workUnit.detail, + spawnedByActivityId: event.payload.workUnit.spawnedByActivityId, + ...(event.payload.workUnit.providerRefs !== undefined + ? { providerRefs: event.payload.workUnit.providerRefs } + : {}), + startedAt: event.payload.workUnit.startedAt, + updatedAt: event.payload.workUnit.updatedAt, + completedAt: event.payload.workUnit.completedAt, + }); + return; + + case "thread.reverted": { + const existingRows = yield* projectionThreadWorkUnitRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + if (existingRows.length === 0) { + return; + } + const existingTurns = yield* projectionTurnRepository.listByThreadId({ + threadId: event.payload.threadId, + }); + const keptRows = retainProjectionWorkUnitsAfterRevert( + existingRows, + existingTurns, + event.payload.turnCount, + ); + if (keptRows.length === existingRows.length) { + return; + } + yield* projectionThreadWorkUnitRepository.deleteByThreadId({ + threadId: event.payload.threadId, + }); + yield* Effect.forEach(keptRows, projectionThreadWorkUnitRepository.upsert, { + concurrency: 1, + }).pipe(Effect.asVoid); + return; + } + + default: + return; + } + }); + const applyThreadSessionsProjection: ProjectorDefinition["apply"] = ( event, _attachmentSideEffects, @@ -1134,6 +1222,10 @@ const makeOrchestrationProjectionPipeline = Effect.gen(function* () { name: ORCHESTRATION_PROJECTOR_NAMES.threadActivities, apply: applyThreadActivitiesProjection, }, + { + name: ORCHESTRATION_PROJECTOR_NAMES.threadWorkUnits, + apply: applyThreadWorkUnitsProjection, + }, { name: ORCHESTRATION_PROJECTOR_NAMES.threadSessions, apply: applyThreadSessionsProjection, @@ -1251,6 +1343,7 @@ export const OrchestrationProjectionPipelineLive = Layer.effect( Layer.provideMerge(ProjectionThreadMessageRepositoryLive), Layer.provideMerge(ProjectionThreadProposedPlanRepositoryLive), Layer.provideMerge(ProjectionThreadActivityRepositoryLive), + Layer.provideMerge(ProjectionThreadWorkUnitRepositoryLive), Layer.provideMerge(ProjectionThreadSessionRepositoryLive), Layer.provideMerge(ProjectionTurnRepositoryLive), Layer.provideMerge(ProjectionPendingApprovalRepositoryLive), diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts index b5b73fd6e..638c6d1c0 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.test.ts @@ -1,4 +1,12 @@ -import { CheckpointRef, EventId, MessageId, ProjectId, ThreadId, TurnId } from "@t3tools/contracts"; +import { + CheckpointRef, + EventId, + MessageId, + ProjectId, + ThreadId, + TurnId, + WorkUnitId, +} from "@t3tools/contracts"; import { assert, it } from "@effect/vitest"; import { Effect, Layer } from "effect"; import * as SqlClient from "effect/unstable/sql/SqlClient"; @@ -13,6 +21,7 @@ const asTurnId = (value: string): TurnId => TurnId.makeUnsafe(value); const asMessageId = (value: string): MessageId => MessageId.makeUnsafe(value); const asEventId = (value: string): EventId => EventId.makeUnsafe(value); const asCheckpointRef = (value: string): CheckpointRef => CheckpointRef.makeUnsafe(value); +const asWorkUnitId = (value: string): WorkUnitId => WorkUnitId.makeUnsafe(value); const projectionSnapshotLayer = it.layer( OrchestrationProjectionSnapshotQueryLive.pipe(Layer.provideMerge(SqlitePersistenceMemory)), @@ -130,6 +139,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { activity_id, thread_id, turn_id, + work_unit_id, tone, kind, summary, @@ -140,6 +150,7 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { 'activity-1', 'thread-1', 'turn-1', + 'wu:thread-1:turn:turn-1:root', 'info', 'runtime.note', 'provider started', @@ -148,6 +159,39 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { ) `; + yield* sql` + INSERT INTO projection_thread_work_units ( + work_unit_id, + thread_id, + turn_id, + parent_work_unit_id, + kind, + state, + title, + detail, + spawned_by_activity_id, + provider_refs_json, + started_at, + updated_at, + completed_at + ) + VALUES ( + 'wu:thread-1:turn:turn-1:root', + 'thread-1', + 'turn-1', + NULL, + 'primary_agent', + 'completed', + 'Primary agent', + NULL, + NULL, + '{"providerTurnId":"provider-turn-1"}', + '2026-02-24T00:00:08.000Z', + '2026-02-24T00:00:08.000Z', + '2026-02-24T00:00:08.000Z' + ) + `; + yield* sql` INSERT INTO projection_thread_sessions ( thread_id, @@ -304,9 +348,28 @@ projectionSnapshotLayer("ProjectionSnapshotQuery", (it) => { summary: "provider started", payload: { stage: "start" }, turnId: asTurnId("turn-1"), + workUnitId: asWorkUnitId("wu:thread-1:turn:turn-1:root"), createdAt: "2026-02-24T00:00:06.000Z", }, ], + workUnits: [ + { + id: asWorkUnitId("wu:thread-1:turn:turn-1:root"), + turnId: asTurnId("turn-1"), + parentWorkUnitId: null, + kind: "primary_agent", + state: "completed", + title: "Primary agent", + detail: null, + spawnedByActivityId: null, + providerRefs: { + providerTurnId: "provider-turn-1", + }, + startedAt: "2026-02-24T00:00:08.000Z", + updatedAt: "2026-02-24T00:00:08.000Z", + completedAt: "2026-02-24T00:00:08.000Z", + }, + ], checkpoints: [ { turnId: asTurnId("turn-1"), diff --git a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts index 849d2fa3b..a55cda290 100644 --- a/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts +++ b/apps/server/src/orchestration/Layers/ProjectionSnapshotQuery.ts @@ -17,6 +17,7 @@ import { type OrchestrationSession, type OrchestrationThread, type OrchestrationThreadActivity, + type OrchestrationWorkUnit, } from "@t3tools/contracts"; import { Effect, Layer, Schema, Struct } from "effect"; import * as SqlClient from "effect/unstable/sql/SqlClient"; @@ -34,6 +35,7 @@ import { ProjectionState } from "../../persistence/Services/ProjectionState.ts"; import { ProjectionThreadActivity } from "../../persistence/Services/ProjectionThreadActivities.ts"; import { ProjectionThreadMessage } from "../../persistence/Services/ProjectionThreadMessages.ts"; import { ProjectionThreadProposedPlan } from "../../persistence/Services/ProjectionThreadProposedPlans.ts"; +import { ProjectionThreadWorkUnit } from "../../persistence/Services/ProjectionThreadWorkUnits.ts"; import { ProjectionThreadSession } from "../../persistence/Services/ProjectionThreadSessions.ts"; import { ProjectionThread } from "../../persistence/Services/ProjectionThreads.ts"; import { ORCHESTRATION_PROJECTOR_NAMES } from "./ProjectionPipeline.ts"; @@ -62,6 +64,13 @@ const ProjectionThreadActivityDbRowSchema = ProjectionThreadActivity.mapFields( sequence: Schema.NullOr(NonNegativeInt), }), ); +const ProjectionThreadWorkUnitDbRowSchema = ProjectionThreadWorkUnit.mapFields( + Struct.assign({ + providerRefs: Schema.NullOr( + Schema.fromJsonString(ProjectionThreadWorkUnit.fields.providerRefs), + ), + }), +); const ProjectionThreadSessionDbRowSchema = ProjectionThreadSession; const ProjectionCheckpointDbRowSchema = ProjectionCheckpoint.mapFields( Struct.assign({ @@ -87,6 +96,7 @@ const REQUIRED_SNAPSHOT_PROJECTORS = [ ORCHESTRATION_PROJECTOR_NAMES.threadMessages, ORCHESTRATION_PROJECTOR_NAMES.threadProposedPlans, ORCHESTRATION_PROJECTOR_NAMES.threadActivities, + ORCHESTRATION_PROJECTOR_NAMES.threadWorkUnits, ORCHESTRATION_PROJECTOR_NAMES.threadSessions, ORCHESTRATION_PROJECTOR_NAMES.checkpoints, ] as const; @@ -222,6 +232,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { activity_id AS "activityId", thread_id AS "threadId", turn_id AS "turnId", + work_unit_id AS "workUnitId", tone, kind, summary, @@ -238,6 +249,30 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { `, }); + const listThreadWorkUnitRows = SqlSchema.findAll({ + Request: Schema.Void, + Result: ProjectionThreadWorkUnitDbRowSchema, + execute: () => + sql` + SELECT + work_unit_id AS "workUnitId", + thread_id AS "threadId", + turn_id AS "turnId", + parent_work_unit_id AS "parentWorkUnitId", + kind, + state, + title, + detail, + spawned_by_activity_id AS "spawnedByActivityId", + provider_refs_json AS "providerRefs", + started_at AS "startedAt", + updated_at AS "updatedAt", + completed_at AS "completedAt" + FROM projection_thread_work_units + ORDER BY thread_id ASC, started_at ASC, work_unit_id ASC + `, + }); + const listThreadSessionRows = SqlSchema.findAll({ Request: Schema.Void, Result: ProjectionThreadSessionDbRowSchema, @@ -322,6 +357,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { messageRows, proposedPlanRows, activityRows, + workUnitRows, sessionRows, checkpointRows, latestTurnRows, @@ -367,6 +403,14 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { ), ), ), + listThreadWorkUnitRows(undefined).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionSnapshotQuery.getSnapshot:listThreadWorkUnits:query", + "ProjectionSnapshotQuery.getSnapshot:listThreadWorkUnits:decodeRows", + ), + ), + ), listThreadSessionRows(undefined).pipe( Effect.mapError( toPersistenceSqlOrDecodeError( @@ -404,6 +448,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { const messagesByThread = new Map>(); const proposedPlansByThread = new Map>(); const activitiesByThread = new Map>(); + const workUnitsByThread = new Map>(); const checkpointsByThread = new Map>(); const sessionsByThread = new Map(); const latestTurnByThread = new Map(); @@ -461,12 +506,33 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { summary: row.summary, payload: row.payload, turnId: row.turnId, + workUnitId: row.workUnitId, ...(row.sequence !== null ? { sequence: row.sequence } : {}), createdAt: row.createdAt, }); activitiesByThread.set(row.threadId, threadActivities); } + for (const row of workUnitRows) { + updatedAt = maxIso(updatedAt, row.updatedAt); + const threadWorkUnits = workUnitsByThread.get(row.threadId) ?? []; + threadWorkUnits.push({ + id: row.workUnitId, + turnId: row.turnId, + parentWorkUnitId: row.parentWorkUnitId, + kind: row.kind, + state: row.state, + title: row.title, + detail: row.detail, + spawnedByActivityId: row.spawnedByActivityId, + ...(row.providerRefs !== null ? { providerRefs: row.providerRefs } : {}), + startedAt: row.startedAt, + updatedAt: row.updatedAt, + completedAt: row.completedAt, + }); + workUnitsByThread.set(row.threadId, threadWorkUnits); + } + for (const row of checkpointRows) { updatedAt = maxIso(updatedAt, row.completedAt); const threadCheckpoints = checkpointsByThread.get(row.threadId) ?? []; @@ -558,6 +624,7 @@ const makeProjectionSnapshotQuery = Effect.gen(function* () { messages: messagesByThread.get(row.threadId) ?? [], proposedPlans: proposedPlansByThread.get(row.threadId) ?? [], activities: activitiesByThread.get(row.threadId) ?? [], + workUnits: workUnitsByThread.get(row.threadId) ?? [], checkpoints: checkpointsByThread.get(row.threadId) ?? [], session: sessionsByThread.get(row.threadId) ?? null, })); diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts index 65fd44130..7bd7926a5 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts @@ -1097,6 +1097,7 @@ describe("ProviderCommandReactor", () => { requestKind: "command", }, turnId: null, + workUnitId: null, createdAt: now, }, createdAt: now, diff --git a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts index d190b97c6..f245727d5 100644 --- a/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts +++ b/apps/server/src/orchestration/Layers/ProviderCommandReactor.ts @@ -173,6 +173,7 @@ const make = Effect.gen(function* () { ...(input.requestId ? { requestId: input.requestId } : {}), }, turnId: input.turnId, + workUnitId: null, createdAt: input.createdAt, }, createdAt: input.createdAt, diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index c1ba48108..080a430c1 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -125,6 +125,7 @@ type ProviderRuntimeTestThread = ProviderRuntimeTestReadModel["threads"][number] type ProviderRuntimeTestMessage = ProviderRuntimeTestThread["messages"][number]; type ProviderRuntimeTestProposedPlan = ProviderRuntimeTestThread["proposedPlans"][number]; type ProviderRuntimeTestActivity = ProviderRuntimeTestThread["activities"][number]; +type ProviderRuntimeTestWorkUnit = ProviderRuntimeTestThread["workUnits"][number]; type ProviderRuntimeTestCheckpoint = ProviderRuntimeTestThread["checkpoints"][number]; describe("ProviderRuntimeIngestion", () => { @@ -1909,6 +1910,7 @@ describe("ProviderRuntimeIngestion", () => { createdAt: now, threadId: asThreadId("thread-1"), turnId: asTurnId("turn-task-1"), + itemId: "call-collab-1", payload: { taskId: "turn-task-1", taskType: "plan", @@ -1922,6 +1924,7 @@ describe("ProviderRuntimeIngestion", () => { createdAt: now, threadId: asThreadId("thread-1"), turnId: asTurnId("turn-task-1"), + itemId: "call-collab-1", payload: { taskId: "turn-task-1", description: "Comparing the desktop rollout chunks to the app-server stream.", @@ -1936,6 +1939,7 @@ describe("ProviderRuntimeIngestion", () => { createdAt: now, threadId: asThreadId("thread-1"), turnId: asTurnId("turn-task-1"), + itemId: "call-collab-1", payload: { taskId: "turn-task-1", status: "completed", @@ -1994,6 +1998,38 @@ describe("ProviderRuntimeIngestion", () => { ); expect(completed?.kind).toBe("task.completed"); expect(completedPayload?.detail).toBe("\n# Plan title\n"); + expect(started?.workUnitId).toBe("wu:thread-1:turn:turn-task-1:task:turn-task-1"); + expect(progress?.workUnitId).toBe("wu:thread-1:turn:turn-task-1:task:turn-task-1"); + expect(completed?.workUnitId).toBe("wu:thread-1:turn:turn-task-1:task:turn-task-1"); + expect( + thread.workUnits.find( + (entry: ProviderRuntimeTestWorkUnit) => entry.id === "wu:thread-1:turn:turn-task-1:root", + ), + ).toMatchObject({ + id: "wu:thread-1:turn:turn-task-1:root", + turnId: "turn-task-1", + parentWorkUnitId: null, + kind: "primary_agent", + state: "running", + title: "Primary agent", + }); + expect( + thread.workUnits.find( + (entry: ProviderRuntimeTestWorkUnit) => + entry.id === "wu:thread-1:turn:turn-task-1:task:turn-task-1", + ), + ).toMatchObject({ + id: "wu:thread-1:turn:turn-task-1:task:turn-task-1", + turnId: "turn-task-1", + parentWorkUnitId: "wu:thread-1:turn:turn-task-1:root", + kind: "delegated_agent", + state: "completed", + title: "\n# Plan title\n", + providerRefs: { + runtimeTaskId: "turn-task-1", + runtimeItemId: "call-collab-1", + }, + }); expect( thread.proposedPlans.find( (entry: ProviderRuntimeTestProposedPlan) => entry.id === "plan:thread-1:turn:turn-task-1", diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 3df47941a..4d9e52517 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -7,8 +7,11 @@ import { type OrchestrationProposedPlanId, CheckpointRef, isToolLifecycleItemType, + RuntimeTaskId, ThreadId, TurnId, + WorkUnitId, + type OrchestrationThread, type OrchestrationThreadActivity, type ProviderRuntimeEvent, } from "@t3tools/contracts"; @@ -29,6 +32,10 @@ import { const providerTurnKey = (threadId: ThreadId, turnId: TurnId) => `${threadId}:${turnId}`; const providerCommandId = (event: ProviderRuntimeEvent, tag: string): CommandId => CommandId.makeUnsafe(`provider:${event.eventId}:${tag}:${crypto.randomUUID()}`); +const rootWorkUnitIdForTurn = (threadId: ThreadId, turnId: TurnId): WorkUnitId => + WorkUnitId.makeUnsafe(`wu:${threadId}:turn:${turnId}:root`); +const taskWorkUnitIdForTurn = (threadId: ThreadId, turnId: TurnId, taskId: string): WorkUnitId => + WorkUnitId.makeUnsafe(`wu:${threadId}:turn:${turnId}:task:${taskId}`); const DEFAULT_ASSISTANT_DELIVERY_MODE: AssistantDeliveryMode = "buffered"; const TURN_MESSAGE_IDS_BY_TURN_CACHE_CAPACITY = 10_000; @@ -177,6 +184,57 @@ function requestKindFromCanonicalRequestType( } } +function workUnitIdForRuntimeEvent(event: ProviderRuntimeEvent): WorkUnitId | null { + const turnId = toTurnId(event.turnId); + if (!turnId) { + return null; + } + if ( + event.type === "task.started" || + event.type === "task.progress" || + event.type === "task.completed" + ) { + return taskWorkUnitIdForTurn(event.threadId, turnId, String(event.payload.taskId)); + } + return rootWorkUnitIdForTurn(event.threadId, turnId); +} + +function runtimeTurnStateToWorkUnitState( + state: "completed" | "failed" | "interrupted" | "cancelled", +): "completed" | "failed" | "stopped" | "cancelled" { + switch (state) { + case "failed": + return "failed"; + case "interrupted": + return "stopped"; + case "cancelled": + return "cancelled"; + case "completed": + return "completed"; + } +} + +function runtimeTaskStateToWorkUnitState( + state: "completed" | "failed" | "stopped", +): "completed" | "failed" | "stopped" { + switch (state) { + case "failed": + return "failed"; + case "stopped": + return "stopped"; + case "completed": + return "completed"; + } +} + +function providerTurnWorkUnitRefs( + event: ProviderRuntimeEvent, +): NonNullable | undefined { + return event.providerRefs?.providerTurnId !== undefined + ? { providerTurnId: event.providerRefs.providerTurnId } + : undefined; +} + function runtimeEventToActivities( event: ProviderRuntimeEvent, ): ReadonlyArray { @@ -186,6 +244,7 @@ function runtimeEventToActivities( ? { sequence: eventWithSequence.sessionSequence } : {}; })(); + const maybeWorkUnit = { workUnitId: workUnitIdForRuntimeEvent(event) }; switch (event.type) { case "request.opened": { if (event.payload.requestType === "tool_user_input") { @@ -213,6 +272,7 @@ function runtimeEventToActivities( ...(event.payload.detail ? { detail: truncateDetail(event.payload.detail) } : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -237,6 +297,7 @@ function runtimeEventToActivities( ...(event.payload.decision ? { decision: event.payload.decision } : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -258,6 +319,7 @@ function runtimeEventToActivities( message: truncateDetail(message), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -276,6 +338,7 @@ function runtimeEventToActivities( ...(event.payload.detail !== undefined ? { detail: event.payload.detail } : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -296,6 +359,7 @@ function runtimeEventToActivities( : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -314,6 +378,7 @@ function runtimeEventToActivities( questions: event.payload.questions, }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -332,6 +397,7 @@ function runtimeEventToActivities( answers: event.payload.answers, }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -358,6 +424,7 @@ function runtimeEventToActivities( : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -379,6 +446,7 @@ function runtimeEventToActivities( ...(event.payload.usage !== undefined ? { usage: event.payload.usage } : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -404,6 +472,7 @@ function runtimeEventToActivities( ...(event.payload.usage !== undefined ? { usage: event.payload.usage } : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -427,6 +496,7 @@ function runtimeEventToActivities( ...(event.payload.data !== undefined ? { data: event.payload.data } : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -448,6 +518,7 @@ function runtimeEventToActivities( ...(event.payload.detail ? { detail: truncateDetail(event.payload.detail) } : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -469,6 +540,7 @@ function runtimeEventToActivities( ...(event.payload.detail ? { detail: truncateDetail(event.payload.detail) } : {}), }, turnId: toTurnId(event.turnId) ?? null, + ...maybeWorkUnit, ...maybeSequence, }, ]; @@ -856,6 +928,74 @@ const make = Effect.gen(function* () { }); }); + const upsertWorkUnit = Effect.fnUntraced(function* (input: { + thread: OrchestrationThread; + event: ProviderRuntimeEvent; + workUnitId: WorkUnitId; + turnId: TurnId; + kind: "primary_agent" | "delegated_agent"; + state: "queued" | "running" | "completed" | "failed" | "stopped" | "cancelled"; + title: string; + updatedAt: string; + parentWorkUnitId?: WorkUnitId | null; + detail?: string | null; + startedAt?: string; + completedAt?: string | null; + providerRefs?: NonNullable; + }) { + const existing = input.thread.workUnits.find((entry) => sameId(entry.id, input.workUnitId)); + const providerRefs = + input.providerRefs !== undefined || existing?.providerRefs !== undefined + ? { + ...existing?.providerRefs, + ...input.providerRefs, + } + : undefined; + + yield* orchestrationEngine.dispatch({ + type: "thread.work-unit.upsert", + commandId: providerCommandId(input.event, "thread-work-unit-upsert"), + threadId: input.thread.id, + workUnit: { + id: input.workUnitId, + turnId: input.turnId, + parentWorkUnitId: input.parentWorkUnitId ?? existing?.parentWorkUnitId ?? null, + kind: input.kind, + state: input.state, + title: input.title.trim().length > 0 ? input.title : (existing?.title ?? "Work unit"), + detail: input.detail !== undefined ? input.detail : (existing?.detail ?? null), + spawnedByActivityId: existing?.spawnedByActivityId ?? null, + ...(providerRefs !== undefined ? { providerRefs } : {}), + startedAt: existing?.startedAt ?? input.startedAt ?? input.updatedAt, + updatedAt: input.updatedAt, + completedAt: + input.completedAt !== undefined ? input.completedAt : (existing?.completedAt ?? null), + }, + createdAt: input.updatedAt, + }); + }); + + const ensureRootWorkUnit = Effect.fnUntraced(function* ( + thread: OrchestrationThread, + event: ProviderRuntimeEvent, + turnId: TurnId, + updatedAt: string, + ) { + const providerRefs = providerTurnWorkUnitRefs(event); + yield* upsertWorkUnit({ + thread, + event, + workUnitId: rootWorkUnitIdForTurn(thread.id, turnId), + turnId, + kind: "primary_agent", + state: "running", + title: "Primary agent", + updatedAt, + startedAt: updatedAt, + ...(providerRefs ? { providerRefs } : {}), + }); + }); + const processRuntimeEvent = (event: ProviderRuntimeEvent) => Effect.gen(function* () { const readModel = yield* orchestrationEngine.getReadModel(); @@ -980,6 +1120,89 @@ const make = Effect.gen(function* () { } } + if (eventTurnId !== undefined) { + yield* ensureRootWorkUnit(thread, event, eventTurnId, now); + } + + if (event.type === "turn.started" && eventTurnId !== undefined) { + const providerRefs = providerTurnWorkUnitRefs(event); + yield* upsertWorkUnit({ + thread, + event, + workUnitId: rootWorkUnitIdForTurn(thread.id, eventTurnId), + turnId: eventTurnId, + kind: "primary_agent", + state: "running", + title: "Primary agent", + updatedAt: now, + startedAt: now, + ...(providerRefs ? { providerRefs } : {}), + }); + } + + if (event.type === "turn.completed" && eventTurnId !== undefined) { + const providerRefs = providerTurnWorkUnitRefs(event); + yield* upsertWorkUnit({ + thread, + event, + workUnitId: rootWorkUnitIdForTurn(thread.id, eventTurnId), + turnId: eventTurnId, + kind: "primary_agent", + state: runtimeTurnStateToWorkUnitState(runtimeTurnState(event)), + title: "Primary agent", + updatedAt: now, + completedAt: now, + ...(providerRefs ? { providerRefs } : {}), + }); + } + + if ( + (event.type === "task.started" || + event.type === "task.progress" || + event.type === "task.completed") && + eventTurnId !== undefined + ) { + const taskId = String(event.payload.taskId); + const workUnitId = taskWorkUnitIdForTurn(thread.id, eventTurnId, taskId); + const taskState = + event.type === "task.completed" + ? runtimeTaskStateToWorkUnitState(event.payload.status) + : "running"; + const taskTitle = + event.type === "task.started" + ? (event.payload.description ?? event.payload.taskType ?? "Delegated task") + : event.type === "task.completed" + ? (event.payload.summary ?? "Delegated task") + : (event.payload.summary ?? event.payload.description); + const taskDetail = + event.type === "task.started" + ? (event.payload.description ?? null) + : event.type === "task.completed" + ? (event.payload.summary ?? null) + : (event.payload.summary ?? event.payload.description); + yield* upsertWorkUnit({ + thread, + event, + workUnitId, + turnId: eventTurnId, + parentWorkUnitId: rootWorkUnitIdForTurn(thread.id, eventTurnId), + kind: "delegated_agent", + state: taskState, + title: taskTitle, + detail: taskDetail, + updatedAt: now, + ...(event.type === "task.started" ? { startedAt: now } : {}), + ...(event.type === "task.completed" ? { completedAt: now } : {}), + providerRefs: { + runtimeTaskId: RuntimeTaskId.makeUnsafe(taskId), + ...(event.itemId ? { runtimeItemId: event.itemId } : {}), + ...(event.providerRefs?.providerTurnId !== undefined + ? { providerTurnId: event.providerRefs.providerTurnId } + : {}), + }, + }); + } + const assistantDelta = event.type === "content.delta" && event.payload.streamKind === "assistant_text" ? event.payload.delta diff --git a/apps/server/src/orchestration/Schemas.ts b/apps/server/src/orchestration/Schemas.ts index c96385cad..5abb07c6d 100644 --- a/apps/server/src/orchestration/Schemas.ts +++ b/apps/server/src/orchestration/Schemas.ts @@ -13,6 +13,7 @@ import { ThreadTurnDiffCompletedPayload as ContractsThreadTurnDiffCompletedPayloadSchema, ThreadRevertedPayload as ContractsThreadRevertedPayloadSchema, ThreadActivityAppendedPayload as ContractsThreadActivityAppendedPayloadSchema, + ThreadWorkUnitUpsertedPayload as ContractsThreadWorkUnitUpsertedPayloadSchema, ThreadTurnStartRequestedPayload as ContractsThreadTurnStartRequestedPayloadSchema, ThreadTurnInterruptRequestedPayload as ContractsThreadTurnInterruptRequestedPayloadSchema, ThreadApprovalResponseRequestedPayload as ContractsThreadApprovalResponseRequestedPayloadSchema, @@ -37,6 +38,7 @@ export const ThreadSessionSetPayload = ContractsThreadSessionSetPayloadSchema; export const ThreadTurnDiffCompletedPayload = ContractsThreadTurnDiffCompletedPayloadSchema; export const ThreadRevertedPayload = ContractsThreadRevertedPayloadSchema; export const ThreadActivityAppendedPayload = ContractsThreadActivityAppendedPayloadSchema; +export const ThreadWorkUnitUpsertedPayload = ContractsThreadWorkUnitUpsertedPayloadSchema; export const ThreadTurnStartRequestedPayload = ContractsThreadTurnStartRequestedPayloadSchema; export const ThreadTurnInterruptRequestedPayload = diff --git a/apps/server/src/orchestration/commandInvariants.test.ts b/apps/server/src/orchestration/commandInvariants.test.ts index f95e4db75..c6b969036 100644 --- a/apps/server/src/orchestration/commandInvariants.test.ts +++ b/apps/server/src/orchestration/commandInvariants.test.ts @@ -62,6 +62,7 @@ const readModel: OrchestrationReadModel = { session: null, activities: [], proposedPlans: [], + workUnits: [], checkpoints: [], deletedAt: null, }, @@ -81,6 +82,7 @@ const readModel: OrchestrationReadModel = { session: null, activities: [], proposedPlans: [], + workUnits: [], checkpoints: [], deletedAt: null, }, diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 6ea4c5175..16ecfeb3a 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -631,6 +631,27 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" }; } + case "thread.work-unit.upsert": { + yield* requireThread({ + readModel, + command, + threadId: command.threadId, + }); + return { + ...withEventBase({ + aggregateKind: "thread", + aggregateId: command.threadId, + occurredAt: command.createdAt, + commandId: command.commandId, + }), + type: "thread.work-unit-upserted", + payload: { + threadId: command.threadId, + workUnit: command.workUnit, + }, + }; + } + default: { command satisfies never; const fallback = command as never as { type: string }; diff --git a/apps/server/src/orchestration/projector.test.ts b/apps/server/src/orchestration/projector.test.ts index 71f5b6bd4..485cf467a 100644 --- a/apps/server/src/orchestration/projector.test.ts +++ b/apps/server/src/orchestration/projector.test.ts @@ -85,6 +85,7 @@ describe("orchestration projector", () => { messages: [], proposedPlans: [], activities: [], + workUnits: [], checkpoints: [], session: null, }, @@ -122,6 +123,88 @@ describe("orchestration projector", () => { ).rejects.toBeDefined(); }); + it("applies thread.work-unit-upserted events", async () => { + const now = new Date().toISOString(); + const model = createEmptyReadModel(now); + + const afterCreate = await Effect.runPromise( + projectEvent( + model, + makeEvent({ + sequence: 1, + type: "thread.created", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: now, + commandId: "cmd-thread-create", + payload: { + threadId: "thread-1", + projectId: "project-1", + title: "demo", + model: "gpt-5-codex", + runtimeMode: "full-access", + branch: null, + worktreePath: null, + createdAt: now, + updatedAt: now, + }, + }), + ), + ); + + const next = await Effect.runPromise( + projectEvent( + afterCreate, + makeEvent({ + sequence: 2, + type: "thread.work-unit-upserted", + aggregateKind: "thread", + aggregateId: "thread-1", + occurredAt: now, + commandId: "cmd-work-unit-upsert", + payload: { + threadId: "thread-1", + workUnit: { + id: "wu:thread-1:turn:turn-1:root", + turnId: "turn-1", + parentWorkUnitId: null, + kind: "primary_agent", + state: "running", + title: "Primary agent", + detail: null, + spawnedByActivityId: null, + providerRefs: { + providerTurnId: "provider-turn-1", + }, + startedAt: now, + updatedAt: now, + completedAt: null, + }, + }, + }), + ), + ); + + expect(next.threads[0]?.workUnits).toEqual([ + { + id: "wu:thread-1:turn:turn-1:root", + turnId: "turn-1", + parentWorkUnitId: null, + kind: "primary_agent", + state: "running", + title: "Primary agent", + detail: null, + spawnedByActivityId: null, + providerRefs: { + providerTurnId: "provider-turn-1", + }, + startedAt: now, + updatedAt: now, + completedAt: null, + }, + ]); + }); + it("keeps projector forward-compatible for unhandled event types", async () => { const now = new Date().toISOString(); const model = createEmptyReadModel(now); @@ -453,6 +536,7 @@ describe("orchestration projector", () => { summary: "Edit file started", payload: { toolKind: "command" }, turnId: "turn-1", + workUnitId: null, createdAt: "2026-02-23T10:00:02.750Z", }, }, @@ -527,6 +611,7 @@ describe("orchestration projector", () => { summary: "Edit file complete", payload: { toolKind: "command" }, turnId: "turn-2", + workUnitId: null, createdAt: "2026-02-23T10:00:04.750Z", }, }, diff --git a/apps/server/src/orchestration/projector.ts b/apps/server/src/orchestration/projector.ts index 015f82a67..2da7b62bd 100644 --- a/apps/server/src/orchestration/projector.ts +++ b/apps/server/src/orchestration/projector.ts @@ -23,6 +23,7 @@ import { ThreadRevertedPayload, ThreadSessionSetPayload, ThreadTurnDiffCompletedPayload, + ThreadWorkUnitUpsertedPayload, } from "./Schemas.ts"; type ThreadPatch = Partial>; @@ -136,6 +137,17 @@ function retainThreadProposedPlansAfterRevert( ); } +function retainThreadWorkUnitsAfterRevert( + workUnits: ReadonlyArray, + retainedTurnIds: ReadonlySet, +): ReadonlyArray { + const retained = workUnits.filter((workUnit) => retainedTurnIds.has(workUnit.turnId)); + const retainedIds = new Set(retained.map((workUnit) => workUnit.id)); + return retained.filter( + (workUnit) => workUnit.parentWorkUnitId === null || retainedIds.has(workUnit.parentWorkUnitId), + ); +} + function compareThreadActivities( left: OrchestrationThread["activities"][number], right: OrchestrationThread["activities"][number], @@ -262,7 +274,9 @@ export function projectEvent( updatedAt: payload.updatedAt, deletedAt: null, messages: [], + proposedPlans: [], activities: [], + workUnits: [], checkpoints: [], session: null, }, @@ -563,6 +577,7 @@ export function projectEvent( retainedTurnIds, ).slice(-200); const activities = retainThreadActivitiesAfterRevert(thread.activities, retainedTurnIds); + const workUnits = retainThreadWorkUnitsAfterRevert(thread.workUnits, retainedTurnIds); const latestCheckpoint = checkpoints.at(-1) ?? null; const latestTurn = @@ -584,6 +599,7 @@ export function projectEvent( messages, proposedPlans, activities, + workUnits, latestTurn, updatedAt: event.occurredAt, }), @@ -621,6 +637,37 @@ export function projectEvent( }), ); + case "thread.work-unit-upserted": + return decodeForEvent( + ThreadWorkUnitUpsertedPayload, + event.payload, + event.type, + "payload", + ).pipe( + Effect.map((payload) => { + const thread = nextBase.threads.find((entry) => entry.id === payload.threadId); + if (!thread) { + return nextBase; + } + + const workUnits = [ + ...thread.workUnits.filter((entry) => entry.id !== payload.workUnit.id), + payload.workUnit, + ].toSorted( + (left, right) => + left.startedAt.localeCompare(right.startedAt) || left.id.localeCompare(right.id), + ); + + return { + ...nextBase, + threads: updateThread(nextBase.threads, payload.threadId, { + workUnits, + updatedAt: event.occurredAt, + }), + }; + }), + ); + default: return Effect.succeed(nextBase); } diff --git a/apps/server/src/persistence/Layers/ProjectionThreadActivities.ts b/apps/server/src/persistence/Layers/ProjectionThreadActivities.ts index 8e88cfa78..2c16806e4 100644 --- a/apps/server/src/persistence/Layers/ProjectionThreadActivities.ts +++ b/apps/server/src/persistence/Layers/ProjectionThreadActivities.ts @@ -38,6 +38,7 @@ const makeProjectionThreadActivityRepository = Effect.gen(function* () { activity_id, thread_id, turn_id, + work_unit_id, tone, kind, summary, @@ -49,6 +50,7 @@ const makeProjectionThreadActivityRepository = Effect.gen(function* () { ${row.activityId}, ${row.threadId}, ${row.turnId}, + ${row.workUnitId}, ${row.tone}, ${row.kind}, ${row.summary}, @@ -60,6 +62,7 @@ const makeProjectionThreadActivityRepository = Effect.gen(function* () { DO UPDATE SET thread_id = excluded.thread_id, turn_id = excluded.turn_id, + work_unit_id = excluded.work_unit_id, tone = excluded.tone, kind = excluded.kind, summary = excluded.summary, @@ -78,6 +81,7 @@ const makeProjectionThreadActivityRepository = Effect.gen(function* () { activity_id AS "activityId", thread_id AS "threadId", turn_id AS "turnId", + work_unit_id AS "workUnitId", tone, kind, summary, @@ -126,6 +130,7 @@ const makeProjectionThreadActivityRepository = Effect.gen(function* () { activityId: row.activityId, threadId: row.threadId, turnId: row.turnId, + workUnitId: row.workUnitId, tone: row.tone, kind: row.kind, summary: row.summary, diff --git a/apps/server/src/persistence/Layers/ProjectionThreadWorkUnits.ts b/apps/server/src/persistence/Layers/ProjectionThreadWorkUnits.ts new file mode 100644 index 000000000..0492b2185 --- /dev/null +++ b/apps/server/src/persistence/Layers/ProjectionThreadWorkUnits.ts @@ -0,0 +1,163 @@ +import { Effect, Layer, Schema, Struct } from "effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; +import * as SqlSchema from "effect/unstable/sql/SqlSchema"; + +import { toPersistenceDecodeError, toPersistenceSqlError } from "../Errors.ts"; +import { + DeleteProjectionThreadWorkUnitsInput, + ListProjectionThreadWorkUnitsInput, + ProjectionThreadWorkUnit, + ProjectionThreadWorkUnitRepository, + type ProjectionThreadWorkUnitRepositoryShape, +} from "../Services/ProjectionThreadWorkUnits.ts"; + +const ProjectionThreadWorkUnitDbRowSchema = ProjectionThreadWorkUnit.mapFields( + Struct.assign({ + providerRefs: Schema.NullOr( + Schema.fromJsonString(ProjectionThreadWorkUnit.fields.providerRefs), + ), + }), +); + +function toPersistenceSqlOrDecodeError(sqlOperation: string, decodeOperation: string) { + return (cause: unknown) => + Schema.isSchemaError(cause) + ? toPersistenceDecodeError(decodeOperation)(cause) + : toPersistenceSqlError(sqlOperation)(cause); +} + +const makeProjectionThreadWorkUnitRepository = Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + const upsertProjectionThreadWorkUnitRow = SqlSchema.void({ + Request: ProjectionThreadWorkUnit, + execute: (row) => sql` + INSERT INTO projection_thread_work_units ( + work_unit_id, + thread_id, + turn_id, + parent_work_unit_id, + kind, + state, + title, + detail, + spawned_by_activity_id, + provider_refs_json, + started_at, + updated_at, + completed_at + ) + VALUES ( + ${row.workUnitId}, + ${row.threadId}, + ${row.turnId}, + ${row.parentWorkUnitId}, + ${row.kind}, + ${row.state}, + ${row.title}, + ${row.detail}, + ${row.spawnedByActivityId}, + ${row.providerRefs ? JSON.stringify(row.providerRefs) : null}, + ${row.startedAt}, + ${row.updatedAt}, + ${row.completedAt} + ) + ON CONFLICT (work_unit_id) + DO UPDATE SET + thread_id = excluded.thread_id, + turn_id = excluded.turn_id, + parent_work_unit_id = excluded.parent_work_unit_id, + kind = excluded.kind, + state = excluded.state, + title = excluded.title, + detail = excluded.detail, + spawned_by_activity_id = excluded.spawned_by_activity_id, + provider_refs_json = excluded.provider_refs_json, + started_at = excluded.started_at, + updated_at = excluded.updated_at, + completed_at = excluded.completed_at + `, + }); + + const listProjectionThreadWorkUnitRows = SqlSchema.findAll({ + Request: ListProjectionThreadWorkUnitsInput, + Result: ProjectionThreadWorkUnitDbRowSchema, + execute: ({ threadId }) => sql` + SELECT + work_unit_id AS "workUnitId", + thread_id AS "threadId", + turn_id AS "turnId", + parent_work_unit_id AS "parentWorkUnitId", + kind, + state, + title, + detail, + spawned_by_activity_id AS "spawnedByActivityId", + provider_refs_json AS "providerRefs", + started_at AS "startedAt", + updated_at AS "updatedAt", + completed_at AS "completedAt" + FROM projection_thread_work_units + WHERE thread_id = ${threadId} + ORDER BY started_at ASC, work_unit_id ASC + `, + }); + + const deleteProjectionThreadWorkUnitRows = SqlSchema.void({ + Request: DeleteProjectionThreadWorkUnitsInput, + execute: ({ threadId }) => sql` + DELETE FROM projection_thread_work_units + WHERE thread_id = ${threadId} + `, + }); + + const upsert: ProjectionThreadWorkUnitRepositoryShape["upsert"] = (row) => + upsertProjectionThreadWorkUnitRow(row).pipe( + Effect.mapError(toPersistenceSqlError("ProjectionThreadWorkUnitRepository.upsert:query")), + ); + + const listByThreadId: ProjectionThreadWorkUnitRepositoryShape["listByThreadId"] = (input) => + listProjectionThreadWorkUnitRows(input).pipe( + Effect.mapError( + toPersistenceSqlOrDecodeError( + "ProjectionThreadWorkUnitRepository.listByThreadId:query", + "ProjectionThreadWorkUnitRepository.listByThreadId:decodeRows", + ), + ), + Effect.map((rows) => + rows.map((row) => ({ + workUnitId: row.workUnitId, + threadId: row.threadId, + turnId: row.turnId, + parentWorkUnitId: row.parentWorkUnitId, + kind: row.kind, + state: row.state, + title: row.title, + detail: row.detail, + spawnedByActivityId: row.spawnedByActivityId, + ...(row.providerRefs !== null ? { providerRefs: row.providerRefs } : {}), + startedAt: row.startedAt, + updatedAt: row.updatedAt, + completedAt: row.completedAt, + })), + ), + ); + + const deleteByThreadId: ProjectionThreadWorkUnitRepositoryShape["deleteByThreadId"] = (input) => + deleteProjectionThreadWorkUnitRows(input).pipe( + Effect.mapError( + toPersistenceSqlError("ProjectionThreadWorkUnitRepository.deleteByThreadId:query"), + ), + ); + + return { + upsert, + listByThreadId, + deleteByThreadId, + } satisfies ProjectionThreadWorkUnitRepositoryShape; +}); + +export const ProjectionThreadWorkUnitRepositoryLive = Layer.effect( + ProjectionThreadWorkUnitRepository, + makeProjectionThreadWorkUnitRepository, +); diff --git a/apps/server/src/persistence/Migrations.ts b/apps/server/src/persistence/Migrations.ts index ea1821014..2ccdd9ea5 100644 --- a/apps/server/src/persistence/Migrations.ts +++ b/apps/server/src/persistence/Migrations.ts @@ -27,6 +27,7 @@ import Migration0012 from "./Migrations/012_ProjectionThreadsInteractionMode.ts" import Migration0013 from "./Migrations/013_ProjectionThreadProposedPlans.ts"; import Migration0014 from "./Migrations/014_ProjectionThreadProposedPlanImplementation.ts"; import Migration0015 from "./Migrations/015_ProjectionTurnsSourceProposedPlan.ts"; +import Migration0016 from "./Migrations/016_ProjectionThreadWorkUnits.ts"; import { Effect } from "effect"; /** @@ -55,6 +56,7 @@ const loader = Migrator.fromRecord({ "13_ProjectionThreadProposedPlans": Migration0013, "14_ProjectionThreadProposedPlanImplementation": Migration0014, "15_ProjectionTurnsSourceProposedPlan": Migration0015, + "16_ProjectionThreadWorkUnits": Migration0016, }); /** diff --git a/apps/server/src/persistence/Migrations/016_ProjectionThreadWorkUnits.ts b/apps/server/src/persistence/Migrations/016_ProjectionThreadWorkUnits.ts new file mode 100644 index 000000000..1c08263c7 --- /dev/null +++ b/apps/server/src/persistence/Migrations/016_ProjectionThreadWorkUnits.ts @@ -0,0 +1,88 @@ +import * as Effect from "effect/Effect"; +import * as SqlClient from "effect/unstable/sql/SqlClient"; + +export default Effect.gen(function* () { + const sql = yield* SqlClient.SqlClient; + + yield* sql` + ALTER TABLE projection_thread_activities + ADD COLUMN work_unit_id TEXT + `; + + yield* sql` + CREATE TABLE IF NOT EXISTS projection_thread_work_units ( + work_unit_id TEXT PRIMARY KEY, + thread_id TEXT NOT NULL, + turn_id TEXT NOT NULL, + parent_work_unit_id TEXT, + kind TEXT NOT NULL, + state TEXT NOT NULL, + title TEXT NOT NULL, + detail TEXT, + spawned_by_activity_id TEXT, + provider_refs_json TEXT, + started_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + completed_at TEXT + ) + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_thread_work_units_thread_turn + ON projection_thread_work_units(thread_id, turn_id, started_at) + `; + + yield* sql` + CREATE INDEX IF NOT EXISTS idx_projection_thread_work_units_thread_parent + ON projection_thread_work_units(thread_id, parent_work_unit_id, started_at) + `; + + yield* sql` + INSERT INTO projection_thread_work_units ( + work_unit_id, + thread_id, + turn_id, + parent_work_unit_id, + kind, + state, + title, + detail, + spawned_by_activity_id, + provider_refs_json, + started_at, + updated_at, + completed_at + ) + SELECT + 'wu:' || thread_id || ':turn:' || turn_id || ':root', + thread_id, + turn_id, + NULL, + 'primary_agent', + CASE state + WHEN 'completed' THEN 'completed' + WHEN 'error' THEN 'failed' + WHEN 'interrupted' THEN 'stopped' + ELSE 'running' + END, + 'Primary agent', + NULL, + NULL, + NULL, + COALESCE(started_at, requested_at), + COALESCE(completed_at, started_at, requested_at), + CASE + WHEN state IN ('completed', 'error', 'interrupted') THEN COALESCE(completed_at, started_at, requested_at) + ELSE NULL + END + FROM projection_turns + WHERE turn_id IS NOT NULL + `; + + yield* sql` + UPDATE projection_thread_activities + SET work_unit_id = 'wu:' || thread_id || ':turn:' || turn_id || ':root' + WHERE turn_id IS NOT NULL + AND work_unit_id IS NULL + `; +}); diff --git a/apps/server/src/persistence/Services/ProjectionThreadActivities.ts b/apps/server/src/persistence/Services/ProjectionThreadActivities.ts index 586ae3eb4..e45f90dba 100644 --- a/apps/server/src/persistence/Services/ProjectionThreadActivities.ts +++ b/apps/server/src/persistence/Services/ProjectionThreadActivities.ts @@ -13,6 +13,7 @@ import { OrchestrationThreadActivityTone, ThreadId, TurnId, + WorkUnitId, } from "@t3tools/contracts"; import { Schema, ServiceMap } from "effect"; import type { Effect } from "effect"; @@ -23,6 +24,7 @@ export const ProjectionThreadActivity = Schema.Struct({ activityId: EventId, threadId: ThreadId, turnId: Schema.NullOr(TurnId), + workUnitId: Schema.NullOr(WorkUnitId), tone: OrchestrationThreadActivityTone, kind: Schema.String, summary: Schema.String, diff --git a/apps/server/src/persistence/Services/ProjectionThreadWorkUnits.ts b/apps/server/src/persistence/Services/ProjectionThreadWorkUnits.ts new file mode 100644 index 000000000..d6ff2668b --- /dev/null +++ b/apps/server/src/persistence/Services/ProjectionThreadWorkUnits.ts @@ -0,0 +1,59 @@ +import { + EventId, + IsoDateTime, + OrchestrationWorkUnitKind, + OrchestrationWorkUnitProviderRefs, + OrchestrationWorkUnitState, + ThreadId, + TurnId, + TrimmedNonEmptyString, + WorkUnitId, +} from "@t3tools/contracts"; +import { Schema, ServiceMap } from "effect"; +import type { Effect } from "effect"; + +import type { ProjectionRepositoryError } from "../Errors.ts"; + +export const ProjectionThreadWorkUnit = Schema.Struct({ + workUnitId: WorkUnitId, + threadId: ThreadId, + turnId: TurnId, + parentWorkUnitId: Schema.NullOr(WorkUnitId), + kind: OrchestrationWorkUnitKind, + state: OrchestrationWorkUnitState, + title: TrimmedNonEmptyString, + detail: Schema.NullOr(TrimmedNonEmptyString), + spawnedByActivityId: Schema.NullOr(EventId), + providerRefs: Schema.optional(OrchestrationWorkUnitProviderRefs), + startedAt: IsoDateTime, + updatedAt: IsoDateTime, + completedAt: Schema.NullOr(IsoDateTime), +}); +export type ProjectionThreadWorkUnit = typeof ProjectionThreadWorkUnit.Type; + +export const ListProjectionThreadWorkUnitsInput = Schema.Struct({ + threadId: ThreadId, +}); +export type ListProjectionThreadWorkUnitsInput = typeof ListProjectionThreadWorkUnitsInput.Type; + +export const DeleteProjectionThreadWorkUnitsInput = Schema.Struct({ + threadId: ThreadId, +}); +export type DeleteProjectionThreadWorkUnitsInput = typeof DeleteProjectionThreadWorkUnitsInput.Type; + +export interface ProjectionThreadWorkUnitRepositoryShape { + readonly upsert: ( + workUnit: ProjectionThreadWorkUnit, + ) => Effect.Effect; + readonly listByThreadId: ( + input: ListProjectionThreadWorkUnitsInput, + ) => Effect.Effect, ProjectionRepositoryError>; + readonly deleteByThreadId: ( + input: DeleteProjectionThreadWorkUnitsInput, + ) => Effect.Effect; +} + +export class ProjectionThreadWorkUnitRepository extends ServiceMap.Service< + ProjectionThreadWorkUnitRepository, + ProjectionThreadWorkUnitRepositoryShape +>()("t3/persistence/Services/ProjectionThreadWorkUnits/ProjectionThreadWorkUnitRepository") {} diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts index 3f1c85197..9cf276008 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.test.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.test.ts @@ -996,6 +996,84 @@ describe("ClaudeAdapterLive", () => { ); }); + it.effect("attaches Claude subagent task progress to the collab tool item", () => { + const harness = makeHarness(); + return Effect.gen(function* () { + const adapter = yield* ClaudeAdapter; + + const runtimeEventsFiber = yield* Stream.take(adapter.streamEvents, 8).pipe( + Stream.runCollect, + Effect.forkChild, + ); + + const session = yield* adapter.startSession({ + threadId: THREAD_ID, + provider: "claudeAgent", + runtimeMode: "full-access", + }); + + yield* adapter.sendTurn({ + threadId: session.threadId, + input: "delegate this", + attachments: [], + }); + + harness.query.emit({ + type: "stream_event", + session_id: "sdk-session-task-item-link", + uuid: "stream-task-item-link", + parent_tool_use_id: null, + event: { + type: "content_block_start", + index: 0, + content_block: { + type: "tool_use", + id: "tool-task-link-1", + name: "Task", + input: { + description: "Review the database layer", + }, + }, + }, + } as unknown as SDKMessage); + + harness.query.emit({ + type: "tool_progress", + session_id: "sdk-session-task-item-link", + uuid: "tool-progress-task-link", + parent_tool_use_id: null, + tool_use_id: "tool-task-link-1", + tool_name: "Task", + task_id: "task-subagent-link-1", + elapsed_time_seconds: 1, + } as unknown as SDKMessage); + + harness.query.emit({ + type: "system", + subtype: "task_progress", + task_id: "task-subagent-link-1", + description: "Running background teammate", + summary: "Code reviewer checked the migration edge cases.", + session_id: "sdk-session-task-item-link", + uuid: "task-progress-task-link", + } as unknown as SDKMessage); + + const runtimeEvents = Array.from(yield* Fiber.join(runtimeEventsFiber)); + const progressEvent = runtimeEvents.find( + (event) => + event.type === "task.progress" && event.payload.taskId === "task-subagent-link-1", + ); + assert.equal(progressEvent?.type, "task.progress"); + if (progressEvent?.type === "task.progress") { + assert.equal(progressEvent.itemId, "tool-task-link-1"); + assert.equal(progressEvent.providerRefs?.providerItemId, "tool-task-link-1"); + } + }).pipe( + Effect.provideService(Random.Random, makeDeterministicRandomService()), + Effect.provide(harness.layer), + ); + }); + it.effect( "emits completion only after turn result when assistant frames arrive before deltas", () => { diff --git a/apps/server/src/provider/Layers/ClaudeAdapter.ts b/apps/server/src/provider/Layers/ClaudeAdapter.ts index d69d34e1a..db90939c5 100644 --- a/apps/server/src/provider/Layers/ClaudeAdapter.ts +++ b/apps/server/src/provider/Layers/ClaudeAdapter.ts @@ -89,6 +89,7 @@ interface ClaudeTurnState { readonly assistantTextBlocks: Map; readonly assistantTextBlockOrder: Array; readonly capturedProposedPlanKeys: Set; + readonly taskItemIds: Map; nextSyntheticAssistantBlockIndex: number; } @@ -434,6 +435,20 @@ function nativeProviderRefs( return {}; } +function taskEventParentItemFields( + context: ClaudeSessionContext, + taskId: string, +): Pick | undefined { + const providerItemId = context.turnState?.taskItemIds.get(taskId); + if (!providerItemId) { + return undefined; + } + return { + itemId: asRuntimeItemId(providerItemId), + providerRefs: nativeProviderRefs(context, { providerItemId }), + }; +} + function extractAssistantTextBlocks(message: SDKMessage): Array { if (message.type !== "assistant") { return []; @@ -1599,6 +1614,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { assistantTextBlocks: new Map(), assistantTextBlockOrder: [], capturedProposedPlanKeys: new Set(), + taskItemIds: new Map(), nextSyntheticAssistantBlockIndex: -1, }; context.session = { @@ -1781,6 +1797,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { case "task_started": yield* offerRuntimeEvent({ ...base, + ...taskEventParentItemFields(context, message.task_id), type: "task.started", payload: { taskId: RuntimeTaskId.makeUnsafe(message.task_id), @@ -1792,6 +1809,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { case "task_progress": yield* offerRuntimeEvent({ ...base, + ...taskEventParentItemFields(context, message.task_id), type: "task.progress", payload: { taskId: RuntimeTaskId.makeUnsafe(message.task_id), @@ -1805,6 +1823,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { case "task_notification": yield* offerRuntimeEvent({ ...base, + ...taskEventParentItemFields(context, message.task_id), type: "task.completed", payload: { taskId: RuntimeTaskId.makeUnsafe(message.task_id), @@ -1868,8 +1887,13 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { }; if (message.type === "tool_progress") { + if (message.task_id && context.turnState) { + context.turnState.taskItemIds.set(message.task_id, message.tool_use_id); + } yield* offerRuntimeEvent({ ...base, + itemId: asRuntimeItemId(message.tool_use_id), + providerRefs: nativeProviderRefs(context, { providerItemId: message.tool_use_id }), type: "tool.progress", payload: { toolUseId: message.tool_use_id, @@ -2550,6 +2574,7 @@ function makeClaudeAdapter(options?: ClaudeAdapterLiveOptions) { assistantTextBlocks: new Map(), assistantTextBlockOrder: [], capturedProposedPlanKeys: new Set(), + taskItemIds: new Map(), nextSyntheticAssistantBlockIndex: -1, }; diff --git a/apps/server/src/provider/Layers/CodexAdapter.test.ts b/apps/server/src/provider/Layers/CodexAdapter.test.ts index 31d394c3e..8f7586437 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.test.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.test.ts @@ -812,6 +812,7 @@ lifecycleLayer("CodexAdapterLive lifecycle", (it) => { provider: "codex", threadId: asThreadId("thread-1"), turnId: asTurnId("turn-parent"), + itemId: asItemId("call-collab-1"), createdAt: new Date().toISOString(), method: "codex/event/task_started", payload: { @@ -835,7 +836,9 @@ lifecycleLayer("CodexAdapterLive lifecycle", (it) => { return; } assert.equal(firstEvent.value.turnId, "turn-parent"); + assert.equal(firstEvent.value.itemId, "call-collab-1"); assert.equal(firstEvent.value.providerRefs?.providerTurnId, "turn-parent"); + assert.equal(firstEvent.value.providerRefs?.providerItemId, "call-collab-1"); assert.equal(firstEvent.value.payload.taskId, "turn-child"); }), ); diff --git a/apps/web/src/components/ChatView.browser.tsx b/apps/web/src/components/ChatView.browser.tsx index 6cbef09bd..fa47407cb 100644 --- a/apps/web/src/components/ChatView.browser.tsx +++ b/apps/web/src/components/ChatView.browser.tsx @@ -244,6 +244,7 @@ function createSnapshotForTargetUser(options: { messages, activities: [], proposedPlans: [], + workUnits: [], checkpoints: [], session: { threadId: THREAD_ID, @@ -298,6 +299,7 @@ function addThreadToSnapshot( messages: [], activities: [], proposedPlans: [], + workUnits: [], checkpoints: [], session: { threadId, diff --git a/apps/web/src/components/KeybindingsToast.browser.tsx b/apps/web/src/components/KeybindingsToast.browser.tsx index ba4c8f432..8c1f0cbf3 100644 --- a/apps/web/src/components/KeybindingsToast.browser.tsx +++ b/apps/web/src/components/KeybindingsToast.browser.tsx @@ -98,6 +98,7 @@ function createMinimalSnapshot(): OrchestrationReadModel { ], activities: [], proposedPlans: [], + workUnits: [], checkpoints: [], session: { threadId: THREAD_ID, diff --git a/apps/web/src/session-logic.test.ts b/apps/web/src/session-logic.test.ts index 5348bbb96..cb5c04af4 100644 --- a/apps/web/src/session-logic.test.ts +++ b/apps/web/src/session-logic.test.ts @@ -41,6 +41,7 @@ function makeActivity(overrides: { tone: overrides.tone ?? "tool", payload, turnId: overrides.turnId ? TurnId.makeUnsafe(overrides.turnId) : null, + workUnitId: null, ...(overrides.sequence !== undefined ? { sequence: overrides.sequence } : {}), }; } diff --git a/apps/web/src/store.test.ts b/apps/web/src/store.test.ts index f1919ec72..3bf14669a 100644 --- a/apps/web/src/store.test.ts +++ b/apps/web/src/store.test.ts @@ -67,6 +67,7 @@ function makeReadModelThread(overrides: Partial @@ -315,3 +319,43 @@ it.effect("preserves proposed plan implementation metadata when present", () => assert.strictEqual(parsed.implementationThreadId, "thread-2"); }), ); + +it.effect("defaults activity work unit ids for historical rows", () => + Effect.gen(function* () { + const parsed = yield* decodeOrchestrationThreadActivity({ + id: "activity-1", + tone: "info", + kind: "runtime.note", + summary: "Runtime note", + payload: {}, + turnId: null, + createdAt: "2026-01-01T00:00:00.000Z", + }); + assert.strictEqual(parsed.workUnitId, null); + }), +); + +it.effect("defaults thread work units for historical rows", () => + Effect.gen(function* () { + const parsed = yield* decodeOrchestrationThread({ + id: "thread-1", + projectId: "project-1", + title: "Thread", + model: "gpt-5-codex", + runtimeMode: "full-access", + interactionMode: "default", + branch: null, + worktreePath: null, + latestTurn: null, + createdAt: "2026-01-01T00:00:00.000Z", + updatedAt: "2026-01-01T00:00:00.000Z", + deletedAt: null, + messages: [], + proposedPlans: [], + activities: [], + checkpoints: [], + session: null, + }); + assert.deepStrictEqual(parsed.workUnits, []); + }), +); diff --git a/packages/contracts/src/orchestration.ts b/packages/contracts/src/orchestration.ts index 3208adc8b..04f393981 100644 --- a/packages/contracts/src/orchestration.ts +++ b/packages/contracts/src/orchestration.ts @@ -13,6 +13,9 @@ import { ThreadId, TrimmedNonEmptyString, TurnId, + RuntimeItemId, + RuntimeTaskId, + WorkUnitId, } from "./baseSchemas"; export const ORCHESTRATION_WS_METHODS = { @@ -245,11 +248,49 @@ export const OrchestrationThreadActivity = Schema.Struct({ summary: TrimmedNonEmptyString, payload: Schema.Unknown, turnId: Schema.NullOr(TurnId), + workUnitId: Schema.NullOr(WorkUnitId).pipe(Schema.withDecodingDefault(() => null)), sequence: Schema.optional(NonNegativeInt), createdAt: IsoDateTime, }); export type OrchestrationThreadActivity = typeof OrchestrationThreadActivity.Type; +export const OrchestrationWorkUnitKind = Schema.Literals(["primary_agent", "delegated_agent"]); +export type OrchestrationWorkUnitKind = typeof OrchestrationWorkUnitKind.Type; + +export const OrchestrationWorkUnitState = Schema.Literals([ + "queued", + "running", + "completed", + "failed", + "stopped", + "cancelled", +]); +export type OrchestrationWorkUnitState = typeof OrchestrationWorkUnitState.Type; + +export const OrchestrationWorkUnitProviderRefs = Schema.Struct({ + runtimeTaskId: Schema.optional(RuntimeTaskId), + runtimeItemId: Schema.optional(RuntimeItemId), + providerThreadId: Schema.optional(TrimmedNonEmptyString), + providerTurnId: Schema.optional(TrimmedNonEmptyString), +}); +export type OrchestrationWorkUnitProviderRefs = typeof OrchestrationWorkUnitProviderRefs.Type; + +export const OrchestrationWorkUnit = Schema.Struct({ + id: WorkUnitId, + turnId: TurnId, + parentWorkUnitId: Schema.NullOr(WorkUnitId), + kind: OrchestrationWorkUnitKind, + state: OrchestrationWorkUnitState, + title: TrimmedNonEmptyString, + detail: Schema.NullOr(TrimmedNonEmptyString), + spawnedByActivityId: Schema.NullOr(EventId), + providerRefs: Schema.optional(OrchestrationWorkUnitProviderRefs), + startedAt: IsoDateTime, + updatedAt: IsoDateTime, + completedAt: Schema.NullOr(IsoDateTime), +}); +export type OrchestrationWorkUnit = typeof OrchestrationWorkUnit.Type; + const OrchestrationLatestTurnState = Schema.Literals([ "running", "interrupted", @@ -287,6 +328,7 @@ export const OrchestrationThread = Schema.Struct({ messages: Schema.Array(OrchestrationMessage), proposedPlans: Schema.Array(OrchestrationProposedPlan).pipe(Schema.withDecodingDefault(() => [])), activities: Schema.Array(OrchestrationThreadActivity), + workUnits: Schema.Array(OrchestrationWorkUnit).pipe(Schema.withDecodingDefault(() => [])), checkpoints: Schema.Array(OrchestrationCheckpointSummary), session: Schema.NullOr(OrchestrationSession), }); @@ -553,6 +595,14 @@ const ThreadActivityAppendCommand = Schema.Struct({ createdAt: IsoDateTime, }); +const ThreadWorkUnitUpsertCommand = Schema.Struct({ + type: Schema.Literal("thread.work-unit.upsert"), + commandId: CommandId, + threadId: ThreadId, + workUnit: OrchestrationWorkUnit, + createdAt: IsoDateTime, +}); + const ThreadRevertCompleteCommand = Schema.Struct({ type: Schema.Literal("thread.revert.complete"), commandId: CommandId, @@ -568,6 +618,7 @@ const InternalOrchestrationCommand = Schema.Union([ ThreadProposedPlanUpsertCommand, ThreadTurnDiffCompleteCommand, ThreadActivityAppendCommand, + ThreadWorkUnitUpsertCommand, ThreadRevertCompleteCommand, ]); export type InternalOrchestrationCommand = typeof InternalOrchestrationCommand.Type; @@ -599,6 +650,7 @@ export const OrchestrationEventType = Schema.Literals([ "thread.proposed-plan-upserted", "thread.turn-diff-completed", "thread.activity-appended", + "thread.work-unit-upserted", ]); export type OrchestrationEventType = typeof OrchestrationEventType.Type; @@ -763,6 +815,11 @@ export const ThreadActivityAppendedPayload = Schema.Struct({ activity: OrchestrationThreadActivity, }); +export const ThreadWorkUnitUpsertedPayload = Schema.Struct({ + threadId: ThreadId, + workUnit: OrchestrationWorkUnit, +}); + export const OrchestrationEventMetadata = Schema.Struct({ providerTurnId: Schema.optional(TrimmedNonEmptyString), providerItemId: Schema.optional(ProviderItemId), @@ -885,6 +942,11 @@ export const OrchestrationEvent = Schema.Union([ type: Schema.Literal("thread.activity-appended"), payload: ThreadActivityAppendedPayload, }), + Schema.Struct({ + ...EventBaseFields, + type: Schema.Literal("thread.work-unit-upserted"), + payload: ThreadWorkUnitUpsertedPayload, + }), ]); export type OrchestrationEvent = typeof OrchestrationEvent.Type;