diff --git a/apps/server/src/claudeModelOptions.test.ts b/apps/server/src/claudeModelOptions.test.ts index 620a7ef0b3f..dd47346fce2 100644 --- a/apps/server/src/claudeModelOptions.test.ts +++ b/apps/server/src/claudeModelOptions.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it } from "vite-plus/test"; import { ProviderInstanceId, type ModelSelection } from "@t3tools/contracts"; diff --git a/apps/server/src/orchestration-v2/Adapters/AcpAdapterV2.ts b/apps/server/src/orchestration-v2/Adapters/AcpAdapterV2.ts index c3e5dd1e849..c4f7c94ecad 100644 --- a/apps/server/src/orchestration-v2/Adapters/AcpAdapterV2.ts +++ b/apps/server/src/orchestration-v2/Adapters/AcpAdapterV2.ts @@ -87,7 +87,7 @@ export const ACP_PROTOCOL = "acp.ndjson-jsonrpc" as const; export interface AcpAdapterV2RuntimeInput { readonly cwd: string; readonly mcpServers: ReadonlyArray; - readonly interruptPromptOnCancel: false; + readonly interruptPromptOnCancel?: boolean; readonly clientCapabilities: EffectAcpSchema.InitializeRequest["clientCapabilities"]; readonly clientInfo: AcpSessionRuntimeOptions["clientInfo"]; readonly requestLogger?: NonNullable; @@ -112,6 +112,56 @@ export interface AcpAdapterV2ExtensionContext { ) => Effect.Effect; } +export interface AcpRootTurnIdleSnapshot { + readonly finalized: boolean; + readonly interrupted: boolean; + readonly assistantStreamOpen: boolean; + readonly reasoningStreamOpen: boolean; + readonly hasRunningTool: boolean; + readonly hasPendingRuntimeRequest: boolean; + readonly hasToolHistory: boolean; + readonly hasRunningSubagent: boolean; + readonly hasOutput: boolean; +} + +/** Wait for follow-on tool calls after a brief assistant preamble. */ +export const acpRootTurnSettleDebounceMs = 2_000; + +/** Fail silent post-prompt hangs when no root session traffic arrives. */ +export const acpRootTurnPromptLivenessMs = 30_000; + +/** Let trailing root session chunks land before terminalizing a settled turn. */ +export const acpRootTurnCompletionDrainMs = 100; + +/** True when root-session streaming is quiescent and the turn can settle locally. */ +export function acpRootTurnIsIdle(snapshot: AcpRootTurnIdleSnapshot): boolean { + if (snapshot.finalized || snapshot.interrupted) return false; + if (snapshot.assistantStreamOpen || snapshot.reasoningStreamOpen) return false; + if (snapshot.hasRunningTool || snapshot.hasPendingRuntimeRequest) return false; + // Grok tool phases often pause root streaming while work continues. Wait for + // the prompt RPC (or root prompt_complete race) instead of idle-canceling. + if (snapshot.hasToolHistory || snapshot.hasRunningSubagent) return false; + return snapshot.hasOutput; +} + +/** True when idle settle and the prompt liveness watchdog should be (re-)scheduled. */ +export function acpRootTurnShouldRearmRecoveryTimers(context: { + readonly finalized: boolean; + readonly interrupted: boolean; +}): boolean { + return !context.finalized && !context.interrupted; +} + +/** + * Watchdog tick defers without failing when approval/user-input is still pending. + * Callers must re-arm the watchdog when that pending request clears. + */ +export function acpRootTurnLivenessWatchdogDefersForPending( + hasPendingRuntimeRequest: boolean, +): boolean { + return hasPendingRuntimeRequest; +} + export interface AcpAdapterV2Flavor { readonly driver: ProviderDriverKind; readonly capabilities: OrchestrationV2ProviderCapabilities; @@ -130,6 +180,22 @@ export interface AcpAdapterV2Flavor { toolCall: AcpToolCallState, ) => AcpAdapterV2SubagentUpdate | undefined; readonly assertComplete?: Effect.Effect; + /** + * Grok can finish streaming on the root session while `session/prompt` stays + * hung. xAI `prompt_complete` is subagent-scoped, so settle the root turn + * once root assistant output is idle. Reasoning-only or tool-only phases may + * precede further root work and must not terminalize the turn. Settlement is + * debounced and rescheduled on each ingested root session update so follow-on + * tool calls and assistant replies still attach to the active turn. + */ + readonly settleRootTurnWhenIdle?: boolean; + /** Interrupt the local prompt fiber before `session/cancel` (Grok wedged prompts). */ + readonly interruptPromptOnCancel?: boolean; + /** + * When set, fail the turn if no root session activity arrives within this window + * after `session/prompt` starts and no output has been ingested yet. + */ + readonly rootTurnPromptLivenessMs?: number; } export interface AcpAdapterV2SubagentUpdate { @@ -565,6 +631,43 @@ interface ActiveAcpTurn { } | null; interrupted: boolean; finalized: boolean; + settleScheduleGeneration: number; + promptLivenessGeneration: number; + hasRootActivity: boolean; +} + +export function acpRootTurnHasIngestedOutput(context: { + readonly assistant: ActiveTextStream; + readonly reasoning: ActiveTextStream; + readonly tools: ReadonlyMap; + readonly plan: unknown; +}): boolean { + return ( + context.assistant.nextSegment > 0 || + context.reasoning.nextSegment > 0 || + context.tools.size > 0 || + context.plan !== null + ); +} + +/** True when a root session/update carries ingestible turn output, not keepalive noise. */ +export function acpRootSessionUpdateIngestsOutput( + notification: EffectAcpSchema.SessionNotification, +): boolean { + const update = notification.update; + switch (update.sessionUpdate) { + case "agent_message_chunk": + case "agent_thought_chunk": + return update.content.type === "text" && update.content.text.length > 0; + case "tool_call": + case "tool_call_update": + case "plan": + return parseSessionUpdateEvent(notification).events.some( + (event) => event._tag === "ToolCallUpdated" || event._tag === "PlanUpdated", + ); + default: + return false; + } } interface ActiveAcpSubagent { @@ -578,6 +681,17 @@ interface ActiveAcpSubagent { nextChildOrdinal: number; } +function acpTurnHasPendingRuntimeRequest( + providerTurnId: OrchestrationV2ProviderTurn["id"], + pending: ReadonlyMap, +): boolean { + return [...pending.values()].some( + (request) => + request.runtimeRequest.providerTurnId === providerTurnId && + request.runtimeRequest.status === "pending", + ); +} + type PendingRuntimeRequest = { readonly requestId: RuntimeRequestId; readonly runtimeRequest: OrchestrationV2RuntimeRequest; @@ -632,13 +746,16 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV const emitProviderEvent = (event: ProviderAdapterV2Event) => Queue.offer(events, event).pipe(Effect.asVoid); + let scheduleSettleRootTurnWhenIdle = (_context: ActiveAcpTurn) => Effect.void; + let schedulePromptLivenessWatchdog = (_context: ActiveAcpTurn) => Effect.void; + let rearmRootTurnRecoveryTimers = (_context: ActiveAcpTurn) => Effect.void; const nativeLogging = options.nativeLogging?.(input.threadId); const runtime = yield* flavor.makeRuntime({ cwd: input.runtimePolicy.cwd ?? process.cwd(), mcpServers: acpMcpServers(input.threadId), - interruptPromptOnCancel: false, + interruptPromptOnCancel: flavor.interruptPromptOnCancel ?? false, clientCapabilities: { fs: { readTextFile: false, writeTextFile: false }, terminal: false, @@ -813,6 +930,9 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV if (stream.current === null) return; yield* emitTextSegment(context, kind, true); stream.current = null; + if (kind === "assistant") { + yield* scheduleSettleRootTurnWhenIdle(context); + } }); const closeTextStreams = Effect.fnUntraced(function* (context: ActiveAcpTurn) { @@ -1465,12 +1585,12 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV if (update.content.type === "text") { yield* appendText(context, "assistant", update.content.text); } - return; + break; case "agent_thought_chunk": if (update.content.type === "text") { yield* appendText(context, "reasoning", update.content.text); } - return; + break; default: { const parsed = parseSessionUpdateEvent(notification); for (const event of parsed.events) { @@ -1482,6 +1602,10 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV } } } + if (acpRootSessionUpdateIngestsOutput(notification)) { + context.hasRootActivity = true; + yield* scheduleSettleRootTurnWhenIdle(context); + } }); yield* runtime.handleSessionUpdate((notification) => @@ -1609,10 +1733,13 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV }); const resolved = yield* Deferred.await(decision).pipe( Effect.ensuring( - Ref.update(pendingRuntimeRequests, (current) => { - const updated = new Map(current); - updated.delete(String(requestId)); - return updated; + Effect.gen(function* () { + yield* Ref.update(pendingRuntimeRequests, (current) => { + const updated = new Map(current); + updated.delete(String(requestId)); + return updated; + }); + yield* rearmRootTurnRecoveryTimers(context); }), ), ); @@ -1764,10 +1891,13 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV }); return yield* Deferred.await(answers).pipe( Effect.ensuring( - Ref.update(pendingRuntimeRequests, (current) => { - const updated = new Map(current); - updated.delete(String(requestId)); - return updated; + Effect.gen(function* () { + yield* Ref.update(pendingRuntimeRequests, (current) => { + const updated = new Map(current); + updated.delete(String(requestId)); + return updated; + }); + yield* rearmRootTurnRecoveryTimers(context); }), ), ); @@ -1994,13 +2124,25 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV completedAt, }); + const drainTrailingRootTurnChunks = Effect.fnUntraced(function* () { + if (!flavor.settleRootTurnWhenIdle) return; + // Projected via handleSessionUpdate, not getEvents(). Cooperative yield + // only — replay uses TestClock; Effect.sleep here would stall settlement. + yield* Effect.yieldNow; + yield* Effect.yieldNow; + }); + const finalizeTurn = Effect.fnUntraced(function* ( context: ActiveAcpTurn, status: "completed" | "interrupted" | "failed" | "cancelled", failure?: OrchestrationV2ProviderFailure, + options?: { readonly drainTrailingChunks?: boolean }, ) { if (context.finalized) return; context.finalized = true; + if (options?.drainTrailingChunks === true) { + yield* drainTrailingRootTurnChunks(); + } yield* closeTextStreams(context); const now = yield* DateTime.now; const turn = providerTurnPayload(context, status, now); @@ -2059,6 +2201,95 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV yield* Deferred.succeed(context.completed, undefined).pipe(Effect.ignore); }); + const trySettleRootTurnWhenIdle = Effect.fnUntraced(function* (context: ActiveAcpTurn) { + const pending = yield* Ref.get(pendingRuntimeRequests); + const hasPendingRuntimeRequest = acpTurnHasPendingRuntimeRequest( + context.providerTurnId, + pending, + ); + const hasRunningTool = [...context.tools.values()].some((tool) => { + const status = toolStatus(tool.status); + return status === "pending" || status === "running"; + }); + // Debounce already proved root-session quiescence; open segment handles + // without an explicit close should not block settlement. + const hasRunningSubagent = [...context.subagents.values()].some( + (subagent) => subagent.task.status === "running", + ); + if ( + !acpRootTurnIsIdle({ + finalized: context.finalized, + interrupted: context.interrupted, + assistantStreamOpen: false, + reasoningStreamOpen: false, + hasRunningTool, + hasPendingRuntimeRequest, + hasToolHistory: context.tools.size > 0, + hasRunningSubagent, + hasOutput: context.assistant.nextSegment > 0, + }) + ) { + return; + } + yield* finalizeTurn(context, "completed", undefined, { drainTrailingChunks: true }); + yield* runtime.cancel.pipe(Effect.ignore); + }); + + schedulePromptLivenessWatchdog = (context) => + Effect.gen(function* () { + const livenessMs = flavor.rootTurnPromptLivenessMs; + if (livenessMs === undefined || livenessMs <= 0) return; + context.promptLivenessGeneration += 1; + const generation = context.promptLivenessGeneration; + yield* Effect.gen(function* () { + yield* Effect.sleep(`${livenessMs} millis`); + if (context.finalized || context.interrupted) return; + if (context.promptLivenessGeneration !== generation) return; + const active = yield* Ref.get(activeTurn); + if (active !== context) return; + const pending = yield* Ref.get(pendingRuntimeRequests); + if ( + acpRootTurnLivenessWatchdogDefersForPending( + acpTurnHasPendingRuntimeRequest(context.providerTurnId, pending), + ) + ) { + return; + } + if (context.hasRootActivity || acpRootTurnHasIngestedOutput(context)) return; + yield* runtime.cancel.pipe(Effect.ignore); + yield* finalizeTurn( + context, + "failed", + makeProviderFailure({ + class: "provider_error", + message: "Grok prompt produced no root session activity", + }), + ); + }).pipe(Effect.forkIn(sessionScope), Effect.asVoid); + }); + + scheduleSettleRootTurnWhenIdle = (context) => + Effect.gen(function* () { + if (!flavor.settleRootTurnWhenIdle) return; + context.settleScheduleGeneration += 1; + const generation = context.settleScheduleGeneration; + yield* Effect.gen(function* () { + yield* Effect.sleep(`${acpRootTurnSettleDebounceMs} millis`); + if (context.finalized || context.interrupted) return; + if (context.settleScheduleGeneration !== generation) return; + const active = yield* Ref.get(activeTurn); + if (active !== context) return; + yield* trySettleRootTurnWhenIdle(context); + }).pipe(Effect.forkIn(sessionScope), Effect.asVoid); + }); + + rearmRootTurnRecoveryTimers = (context) => + Effect.gen(function* () { + if (!acpRootTurnShouldRearmRecoveryTimers(context)) return; + yield* scheduleSettleRootTurnWhenIdle(context); + yield* schedulePromptLivenessWatchdog(context); + }); + const resolvePromptParts = Effect.fnUntraced(function* ( turnInput: ProviderAdapterV2TurnInput, ) { @@ -2166,6 +2397,9 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV plan: null, interrupted: false, finalized: false, + settleScheduleGeneration: 0, + promptLivenessGeneration: 0, + hasRootActivity: false, }; yield* Ref.set(activeTurn, context); const runningTurn = providerTurnPayload(context, "running", null); @@ -2204,35 +2438,39 @@ export function makeAcpAdapterV2(options: AcpAdapterV2Options): ProviderAdapterV createdAt: startedAt, updatedAt: startedAt, }); + yield* schedulePromptLivenessWatchdog(context); yield* runtime.prompt({ prompt }).pipe( Effect.flatMap((result) => { + if (context.finalized) return Effect.void; const status = result.stopReason === "cancelled" ? context.interrupted ? "interrupted" : "cancelled" : "completed"; - return finalizeTurn(context, status); + return finalizeTurn(context, status, undefined, { drainTrailingChunks: true }); }), Effect.catchCause((cause) => - finalizeTurn( - context, - context.interrupted ? "interrupted" : "failed", - makeProviderFailure({ - cause: Cause.squash(cause), - class: "provider_error", - }), - ).pipe( - Effect.andThen( - Effect.logWarning("orchestration-v2.acp-prompt-failed", { - driver, - providerSessionId: input.providerSessionId, - providerThreadId: turnInput.providerThread.id, - providerTurnId, - cause, - }), - ), - ), + context.finalized + ? Effect.void + : finalizeTurn( + context, + context.interrupted ? "interrupted" : "failed", + makeProviderFailure({ + cause: Cause.squash(cause), + class: "provider_error", + }), + ).pipe( + Effect.andThen( + Effect.logWarning("orchestration-v2.acp-prompt-failed", { + driver, + providerSessionId: input.providerSessionId, + providerThreadId: turnInput.providerThread.id, + providerTurnId, + cause, + }), + ), + ), ), Effect.forkIn(sessionScope), ); diff --git a/apps/server/src/orchestration-v2/Adapters/GrokAdapterV2.test.ts b/apps/server/src/orchestration-v2/Adapters/GrokAdapterV2.test.ts index 1a65a39aab8..9ddd4da80cd 100644 --- a/apps/server/src/orchestration-v2/Adapters/GrokAdapterV2.test.ts +++ b/apps/server/src/orchestration-v2/Adapters/GrokAdapterV2.test.ts @@ -2,7 +2,18 @@ import { assert, describe, it } from "@effect/vitest"; import type * as EffectAcpSchema from "effect-acp/schema"; import { ProviderAdapterV2RuntimePolicy } from "../ProviderAdapter.ts"; -import { AcpProviderCapabilitiesV2, acpPermissionDisposition } from "./AcpAdapterV2.ts"; +import { + AcpProviderCapabilitiesV2, + acpPermissionDisposition, + acpRootSessionUpdateIngestsOutput, + acpRootTurnCompletionDrainMs, + acpRootTurnHasIngestedOutput, + acpRootTurnIsIdle, + acpRootTurnLivenessWatchdogDefersForPending, + acpRootTurnPromptLivenessMs, + acpRootTurnSettleDebounceMs, + acpRootTurnShouldRearmRecoveryTimers, +} from "./AcpAdapterV2.ts"; import { GrokProviderCapabilitiesV2 } from "./GrokAdapterV2.ts"; function permissionRequest( @@ -37,6 +48,161 @@ function runtimePolicy(input: { }); } +describe("acpRootTurnSettleDebounceMs", () => { + it("allows tool calls to land after a brief assistant preamble", () => { + assert.equal(acpRootTurnSettleDebounceMs, 2_000); + }); +}); + +describe("acpRootTurnPromptLivenessMs", () => { + it("fails silent post-prompt hangs before the user waits hours", () => { + assert.equal(acpRootTurnPromptLivenessMs, 30_000); + }); +}); + +describe("acpRootTurnCompletionDrainMs", () => { + it("gives trailing root chunks a short landing window", () => { + assert.equal(acpRootTurnCompletionDrainMs, 100); + }); +}); + +describe("acpRootSessionUpdateIngestsOutput", () => { + const sessionId = "session-1"; + + it("ignores empty assistant chunks used as Grok keepalives", () => { + assert.isFalse( + acpRootSessionUpdateIngestsOutput({ + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "" }, + }, + }), + ); + }); + + it("accepts non-empty assistant and reasoning chunks", () => { + assert.isTrue( + acpRootSessionUpdateIngestsOutput({ + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "hello" }, + }, + }), + ); + assert.isTrue( + acpRootSessionUpdateIngestsOutput({ + sessionId, + update: { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text: "thinking" }, + }, + }), + ); + }); + + it("accepts tool and plan updates", () => { + assert.isTrue( + acpRootSessionUpdateIngestsOutput({ + sessionId, + update: { + sessionUpdate: "tool_call", + toolCallId: "tool-1", + title: "Read", + kind: "read", + status: "pending", + }, + }), + ); + assert.isTrue( + acpRootSessionUpdateIngestsOutput({ + sessionId, + update: { + sessionUpdate: "plan", + entries: [{ content: "Step 1", status: "pending", priority: "medium" }], + }, + }), + ); + }); +}); + +describe("acpRootTurnHasIngestedOutput", () => { + const empty = { + assistant: { current: null, nextSegment: 0 }, + reasoning: { current: null, nextSegment: 0 }, + tools: new Map(), + plan: null, + } as const; + + it("is false before any root turn items land", () => { + assert.isFalse(acpRootTurnHasIngestedOutput(empty)); + }); + + it("is true once assistant segments have streamed", () => { + assert.isTrue( + acpRootTurnHasIngestedOutput({ + ...empty, + assistant: { current: null, nextSegment: 1 }, + }), + ); + }); +}); + +describe("acpRootTurn recovery timer re-arm", () => { + it("re-arms settle and watchdog after pending clears on active turns", () => { + assert.isTrue(acpRootTurnShouldRearmRecoveryTimers({ finalized: false, interrupted: false })); + }); + + it("skips re-arm when the turn is already terminal", () => { + assert.isFalse(acpRootTurnShouldRearmRecoveryTimers({ finalized: true, interrupted: false })); + assert.isFalse(acpRootTurnShouldRearmRecoveryTimers({ finalized: false, interrupted: true })); + }); + + it("defers the liveness watchdog tick while approval or user-input is pending", () => { + assert.isTrue(acpRootTurnLivenessWatchdogDefersForPending(true)); + assert.isFalse(acpRootTurnLivenessWatchdogDefersForPending(false)); + }); +}); + +describe("acpRootTurnIsIdle", () => { + const idle = { + finalized: false, + interrupted: false, + assistantStreamOpen: false, + reasoningStreamOpen: false, + hasRunningTool: false, + hasPendingRuntimeRequest: false, + hasToolHistory: false, + hasRunningSubagent: false, + hasOutput: true, + } as const; + + it("is false while assistant text is still streaming", () => { + assert.isFalse(acpRootTurnIsIdle({ ...idle, assistantStreamOpen: true })); + }); + + it("is false while a tool is running", () => { + assert.isFalse(acpRootTurnIsIdle({ ...idle, hasRunningTool: true })); + }); + + it("is false after tool history until the prompt RPC completes", () => { + assert.isFalse(acpRootTurnIsIdle({ ...idle, hasToolHistory: true })); + }); + + it("is false while a native subagent task is still running", () => { + assert.isFalse(acpRootTurnIsIdle({ ...idle, hasRunningSubagent: true })); + }); + + it("is false when only reasoning or tools have streamed", () => { + assert.isFalse(acpRootTurnIsIdle({ ...idle, hasOutput: false })); + }); + + it("is true when root assistant output is quiescent", () => { + assert.isTrue(acpRootTurnIsIdle(idle)); + }); +}); + describe("GrokAdapterV2 capabilities", () => { it("keeps optional protocol features conservative until a flavor or handshake confirms them", () => { assert.isFalse(AcpProviderCapabilitiesV2.sessions.supportsModelSwitchInSession); diff --git a/apps/server/src/orchestration-v2/Adapters/GrokAdapterV2.ts b/apps/server/src/orchestration-v2/Adapters/GrokAdapterV2.ts index f74db916d83..ab995da9d41 100644 --- a/apps/server/src/orchestration-v2/Adapters/GrokAdapterV2.ts +++ b/apps/server/src/orchestration-v2/Adapters/GrokAdapterV2.ts @@ -40,6 +40,7 @@ import { } from "../ProviderAdapterDriver.ts"; import { AcpProviderCapabilitiesV2, + acpRootTurnPromptLivenessMs, makeAcpAdapterV2, type AcpAdapterV2Flavor, type AcpAdapterV2RuntimeInput, @@ -132,12 +133,16 @@ export function makeGrokAdapterV2(options: GrokAdapterV2Options) { const flavor: AcpAdapterV2Flavor = { driver: GROK_PROVIDER, capabilities: GrokProviderCapabilitiesV2, + settleRootTurnWhenIdle: true, + interruptPromptOnCancel: false, + rootTurnPromptLivenessMs: acpRootTurnPromptLivenessMs, resolveModelId: (selection) => resolveGrokAcpBaseModelId(selection.model), makeRuntime: options.makeRuntime ?? ((input) => makeGrokAcpRuntime({ ...input, + interruptPromptOnCancel: input.interruptPromptOnCancel ?? false, grokSettings: options.settings, environment: options.environment, childProcessSpawner: options.childProcessSpawner, diff --git a/apps/server/src/orchestration-v2/ProviderSelectionTransition.test.ts b/apps/server/src/orchestration-v2/ProviderSelectionTransition.test.ts index ee8114b6938..f2da22ad7fd 100644 --- a/apps/server/src/orchestration-v2/ProviderSelectionTransition.test.ts +++ b/apps/server/src/orchestration-v2/ProviderSelectionTransition.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it } from "vite-plus/test"; import { type ModelSelection, diff --git a/apps/server/src/provider/acp/XAiAcpExtension.test.ts b/apps/server/src/provider/acp/XAiAcpExtension.test.ts index c3318c175a0..c6ebbb0ab06 100644 --- a/apps/server/src/provider/acp/XAiAcpExtension.test.ts +++ b/apps/server/src/provider/acp/XAiAcpExtension.test.ts @@ -1,10 +1,7 @@ -// @effect-diagnostics nodeBuiltinImport:off -import * as NodePath from "node:path"; -import * as NodeURL from "node:url"; - -import * as NodeServices from "@effect/platform-node/NodeServices"; import { it } from "@effect/vitest"; +import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; +import * as Fiber from "effect/Fiber"; import * as Schema from "effect/Schema"; import { describe, expect } from "vite-plus/test"; @@ -18,24 +15,6 @@ import { } from "./XAiAcpExtension.ts"; import * as AcpSessionRuntime from "./AcpSessionRuntime.ts"; -const __dirname = NodePath.dirname(NodeURL.fileURLToPath(import.meta.url)); -const mockAgentPath = NodePath.join(__dirname, "../../../scripts/acp-mock-agent.ts"); - -const makePromptCompletionRuntime = (env: NodeJS.ProcessEnv) => - Effect.gen(function* () { - const runtime = yield* AcpSessionRuntime.make({ - spawn: { - command: process.execPath, - args: [mockAgentPath], - env, - }, - cwd: process.cwd(), - clientInfo: { name: "t3-test", version: "0.0.0" }, - authMethodId: "test", - }); - return yield* makeXAiPromptCompletionRuntime(runtime); - }); - const decodeXAiAskUserQuestionRequest = Schema.decodeUnknownSync(XAiAskUserQuestionRequest); describe("XAiAcpExtension", () => { @@ -335,58 +314,119 @@ describe("XAiAcpExtension", () => { }); }); - it.effect("resolves a hung standard prompt from xAI prompt completion", () => + it.effect("settles a hung prompt from a root-session prompt_complete notification", () => Effect.gen(function* () { - const runtime = yield* makePromptCompletionRuntime({ - T3_ACP_EMIT_XAI_PROMPT_COMPLETE_THEN_HANG: "1", - }); - yield* runtime.start(); - - const promptResult = yield* runtime.prompt({ - prompt: [{ type: "text", text: "hi" }], - }); - const promptId = promptResult._meta?.promptId; + let promptCompleteHandler: + | ((notification: { + readonly sessionId: string; + readonly promptId?: string; + readonly stopReason?: string; + }) => Effect.Effect) + | null = null; + const hungPrompt = yield* Deferred.make(); + const baseRuntime = { + start: () => + Effect.succeed({ + sessionId: "root-session", + initializeResult: {}, + sessionSetupResult: {}, + modelConfigId: undefined, + }), + prompt: () => Deferred.await(hungPrompt), + cancel: Effect.void, + handleExtNotification: ( + _method: string, + _schema: unknown, + handler: (notification: { + readonly sessionId: string; + readonly promptId?: string; + readonly stopReason?: string; + }) => Effect.Effect, + ) => { + promptCompleteHandler = handler; + return Effect.void; + }, + handleExtRequest: () => Effect.void, + } as unknown as AcpSessionRuntime.AcpSessionRuntime["Service"]; - expect(typeof promptId).toBe("string"); - expect(promptResult).toMatchObject({ + const runtime = yield* makeXAiPromptCompletionRuntime(baseRuntime); + const promptFiber = yield* runtime + .prompt({ prompt: [{ type: "text", text: "hi" }] }) + .pipe(Effect.forkChild); + yield* Effect.yieldNow; + expect(promptCompleteHandler).not.toBeNull(); + yield* promptCompleteHandler!({ + sessionId: "root-session", stopReason: "end_turn", - _meta: { - sessionId: "mock-session-1", - promptId, - requestId: promptId, - }, }); - }).pipe(Effect.scoped, Effect.provide(NodeServices.layer)), + const response = yield* Fiber.join(promptFiber); + expect(response.stopReason).toBe("end_turn"); + }), ); - it.effect("ignores stale xAI completion from an already settled prompt", () => + it.effect("ignores prompt_complete notifications for foreign session ids", () => Effect.gen(function* () { - const runtime = yield* makePromptCompletionRuntime({ - T3_ACP_EMIT_STALE_XAI_PROMPT_COMPLETE_BEFORE_SECOND_HANG: "1", - }); - yield* runtime.start(); + let promptCompleteHandler: + | ((notification: { readonly sessionId: string }) => Effect.Effect) + | null = null; + const hungPrompt = yield* Deferred.make(); + const baseRuntime = { + start: () => + Effect.succeed({ + sessionId: "root-session", + initializeResult: {}, + sessionSetupResult: {}, + modelConfigId: undefined, + }), + prompt: () => Deferred.await(hungPrompt), + cancel: Effect.void, + handleExtNotification: ( + _method: string, + _schema: unknown, + handler: (notification: { readonly sessionId: string }) => Effect.Effect, + ) => { + promptCompleteHandler = handler; + return Effect.void; + }, + handleExtRequest: () => Effect.void, + } as unknown as AcpSessionRuntime.AcpSessionRuntime["Service"]; - const firstPromptResult = yield* runtime.prompt({ - prompt: [{ type: "text", text: "first" }], - }); - expect(firstPromptResult).toMatchObject({ - stopReason: "end_turn", - _meta: { promptId: "mock-stale-xai-prompt-1" }, + const runtime = yield* makeXAiPromptCompletionRuntime(baseRuntime); + const promptFiber = yield* runtime + .prompt({ prompt: [{ type: "text", text: "hi" }] }) + .pipe(Effect.forkChild); + yield* Effect.yieldNow; + yield* promptCompleteHandler!({ + sessionId: "child-session", }); + yield* Effect.yieldNow; + expect(promptFiber.pollUnsafe()).toBeUndefined(); + yield* Fiber.interrupt(promptFiber); + }), + ); - const secondPromptResult = yield* runtime.prompt({ - prompt: [{ type: "text", text: "second" }], - }); - const secondPromptId = secondPromptResult._meta?.promptId; - expect(typeof secondPromptId).toBe("string"); - expect(secondPromptId).not.toBe("mock-stale-xai-prompt-1"); - expect(secondPromptResult).toMatchObject({ - stopReason: "end_turn", - _meta: { - promptId: secondPromptId, - requestId: secondPromptId, + it.effect("injects promptId and requestId into prompt _meta", () => + Effect.gen(function* () { + let capturedMeta: Record | null | undefined; + const baseRuntime = { + start: () => Effect.succeed({ sessionId: "session-1" }), + prompt: (payload: { readonly _meta?: Record | null }) => { + capturedMeta = payload._meta ?? null; + return Effect.succeed({ stopReason: "end_turn" as const }); }, + cancel: Effect.void, + handleExtNotification: () => Effect.void, + handleExtRequest: () => Effect.void, + } as unknown as AcpSessionRuntime.AcpSessionRuntime["Service"]; + + const runtime = yield* makeXAiPromptCompletionRuntime(baseRuntime); + yield* runtime.prompt({ prompt: [{ type: "text", text: "hi" }] }); + + expect(typeof capturedMeta?.promptId).toBe("string"); + expect(capturedMeta).toMatchObject({ + promptId: capturedMeta?.promptId, + requestId: capturedMeta?.promptId, }); - }).pipe(Effect.scoped, Effect.provide(NodeServices.layer)), + }), ); }); diff --git a/apps/server/src/provider/acp/XAiAcpExtension.ts b/apps/server/src/provider/acp/XAiAcpExtension.ts index 77bc9770114..25b0824292c 100644 --- a/apps/server/src/provider/acp/XAiAcpExtension.ts +++ b/apps/server/src/provider/acp/XAiAcpExtension.ts @@ -1,6 +1,8 @@ import type { ProviderUserInputAnswers, UserInputQuestion } from "@t3tools/contracts"; +import * as Cause from "effect/Cause"; import * as Deferred from "effect/Deferred"; import * as Effect from "effect/Effect"; +import * as Fiber from "effect/Fiber"; import * as Ref from "effect/Ref"; import * as Schema from "effect/Schema"; import type * as EffectAcpSchema from "effect-acp/schema"; @@ -8,6 +10,9 @@ import type * as EffectAcpSchema from "effect-acp/schema"; import type * as AcpSessionRuntime from "./AcpSessionRuntime.ts"; import type { AcpToolCallState } from "./AcpRuntimeModel.ts"; +const xAiStopReasonMissingMetaKey = "xAiStopReasonMissing"; +const completedXAiPromptIdLimit = 128; + const XAiPromptCompleteNotification = Schema.Struct({ sessionId: Schema.String, promptId: Schema.optional(Schema.String), @@ -23,9 +28,6 @@ interface PendingXAiPromptCompletion { readonly deferred: Deferred.Deferred; } -const completedXAiPromptIdLimit = 128; -const xAiStopReasonMissingMetaKey = "xAiStopReasonMissing"; - export interface XAiAcpSubagentUpdate { readonly nativeTaskId: string; readonly prompt: string; @@ -283,82 +285,50 @@ export function makeXAiAskUserQuestionCancelledResponse(): XAiAskUserQuestionCan return { outcome: "cancelled" }; } -/** - * Adds Grok's private prompt-completion fallback around a standards-only ACP runtime. - * The underlying runtime remains unaware of xAI methods and metadata. - */ -export const makeXAiPromptCompletionRuntime = Effect.fn("makeXAiPromptCompletionRuntime")( - function* (runtime: AcpSessionRuntime.AcpSessionRuntime["Service"]) { - const activeSessionIdRef = yield* Ref.make(undefined); - const pendingRef = yield* Ref.make>([]); - const completedPromptIdsRef = yield* Ref.make>([]); - let nextPromptFallbackId = 0; - const allocatePromptFallbackId = Effect.sync(() => { - nextPromptFallbackId += 1; - return `t3-xai-prompt-${nextPromptFallbackId}`; - }); - - yield* runtime.handleExtNotification( - "_x.ai/session/prompt_complete", - XAiPromptCompleteNotification, - (notification) => - resolveXAiPromptCompletionFallback({ - pendingRef, - completedPromptIdsRef, - notification, - }), - ); - - return { - ...runtime, - start: () => - runtime - .start() - .pipe(Effect.tap((started) => Ref.set(activeSessionIdRef, started.sessionId))), - prompt: (payload) => - Effect.gen(function* () { - const sessionId = yield* Ref.get(activeSessionIdRef); - if (sessionId === undefined) { - return yield* runtime.prompt(payload); - } +function promptIdFromResponse(response: EffectAcpSchema.PromptResponse): string | undefined { + const meta = response._meta; + if (meta === null || typeof meta !== "object") { + return undefined; + } + const promptId = meta.promptId ?? meta.requestId; + return typeof promptId === "string" && promptId.length > 0 ? promptId : undefined; +} - const promptId = yield* allocatePromptFallbackId; - const fallback = yield* registerXAiPromptCompletionFallback( - pendingRef, - sessionId, - promptId, - ); - const requestPayload = { - ...payload, - _meta: { - ...payload._meta, - promptId: fallback.promptId, - requestId: fallback.promptId, - }, - } satisfies Omit; +function normalizeXAiStopReason(value: string | undefined): EffectAcpSchema.StopReason { + switch (value) { + case "cancelled": + case "end_turn": + case "max_tokens": + case "max_turn_requests": + case "refusal": + return value; + default: + return "end_turn"; + } +} - return yield* Effect.raceFirst( - runtime.prompt(requestPayload), - Deferred.await(fallback.deferred), - ).pipe( - Effect.tap((response) => - rememberCompletedXAiPromptId(completedPromptIdsRef, response, fallback.promptId), - ), - Effect.ensuring(unregisterXAiPromptCompletionFallback(pendingRef, fallback.deferred)), - ); - }), - cancel: Ref.get(activeSessionIdRef).pipe( - Effect.flatMap((sessionId) => - sessionId === undefined - ? runtime.cancel - : abortPendingPromptCompletions(pendingRef, sessionId).pipe( - Effect.andThen(runtime.cancel), - ), - ), - ), - } satisfies AcpSessionRuntime.AcpSessionRuntime["Service"]; - }, -); +function promptResponseFromXAi( + notification: XAiPromptCompleteNotification, +): EffectAcpSchema.PromptResponse { + const stopReason = normalizeXAiStopReason(notification.stopReason); + const meta: Record = { + sessionId: notification.sessionId, + }; + if (notification.stopReason === undefined) { + meta[xAiStopReasonMissingMetaKey] = true; + } + if (notification.promptId !== undefined) { + meta.promptId = notification.promptId; + meta.requestId = notification.promptId; + } + if (notification.agentResult !== undefined) { + meta.agentResult = notification.agentResult; + } + return { + stopReason, + _meta: meta, + }; +} const registerXAiPromptCompletionFallback = ( pendingRef: Ref.Ref>, @@ -436,7 +406,15 @@ const resolveXAiPromptCompletionFallback = ({ entry.sessionId === notification.sessionId && entry.promptId === notification.promptId, ) - : pending.findIndex((entry) => entry.sessionId === notification.sessionId); + : (() => { + const sessionPendingIndexes = pending.flatMap((entry, entryIndex) => + entry.sessionId === notification.sessionId ? [entryIndex] : [], + ); + if (sessionPendingIndexes.length === 0) { + return -1; + } + return sessionPendingIndexes[0] ?? -1; + })(); if (index < 0) { return [Effect.void, pending] as const; } @@ -458,6 +436,9 @@ const rememberCompletedXAiPromptId = ( fallbackPromptId: string, ) => { const promptId = promptIdFromResponse(response) ?? fallbackPromptId; + if (promptId.length === 0) { + return Effect.void; + } return Ref.update(completedPromptIdsRef, (completedPromptIds) => { if (completedPromptIds.includes(promptId)) { return completedPromptIds; @@ -466,14 +447,93 @@ const rememberCompletedXAiPromptId = ( }); }; -function promptIdFromResponse(response: EffectAcpSchema.PromptResponse): string | undefined { - const meta = response._meta; - if (meta === null || typeof meta !== "object") { - return undefined; - } - const promptId = meta.promptId ?? meta.requestId; - return typeof promptId === "string" && promptId.length > 0 ? promptId : undefined; -} +/** + * Grok-specific ACP runtime wrapper. Races `session/prompt` against root-session + * `_x.ai/session/prompt_complete` notifications (sessionId-matched). Subagent + * completions on foreign session ids are ignored by the pending-entry gate. + */ +export const makeXAiPromptCompletionRuntime = Effect.fn("makeXAiPromptCompletionRuntime")( + function* (runtime: AcpSessionRuntime.AcpSessionRuntime["Service"]) { + let nextPromptFallbackId = 0; + const allocatePromptFallbackId = Effect.sync(() => { + nextPromptFallbackId += 1; + return `t3-xai-prompt-${nextPromptFallbackId}`; + }); + const pendingXAiPromptCompletionsRef = yield* Ref.make< + ReadonlyArray + >([]); + const completedXAiPromptIdsRef = yield* Ref.make>([]); + + yield* runtime.handleExtNotification( + "_x.ai/session/prompt_complete", + XAiPromptCompleteNotification, + (notification) => + resolveXAiPromptCompletionFallback({ + pendingRef: pendingXAiPromptCompletionsRef, + completedPromptIdsRef: completedXAiPromptIdsRef, + notification, + }).pipe(Effect.catch(() => Effect.void)), + ); + + return { + ...runtime, + prompt: (payload) => + Effect.gen(function* () { + const started = yield* runtime.start(); + const promptId = yield* allocatePromptFallbackId; + const fallback = yield* registerXAiPromptCompletionFallback( + pendingXAiPromptCompletionsRef, + started.sessionId, + promptId, + ); + const cancelledResponse = promptResponseFromXAi({ + sessionId: started.sessionId, + promptId: fallback.promptId, + stopReason: "cancelled", + agentResult: null, + }); + const promptRpcFiber = yield* runtime + .prompt({ + ...payload, + _meta: { + ...payload._meta, + promptId: fallback.promptId, + requestId: fallback.promptId, + }, + }) + .pipe(Effect.forkChild); + return yield* Effect.raceFirst( + Fiber.join(promptRpcFiber).pipe( + Effect.catchCause((cause) => + Cause.hasInterruptsOnly(cause) + ? Effect.succeed(cancelledResponse) + : Effect.failCause(cause), + ), + ), + Deferred.await(fallback.deferred), + ).pipe( + Effect.tap((response) => + rememberCompletedXAiPromptId(completedXAiPromptIdsRef, response, fallback.promptId), + ), + Effect.ensuring( + Effect.gen(function* () { + yield* Fiber.interrupt(promptRpcFiber).pipe(Effect.ignore); + yield* unregisterXAiPromptCompletionFallback( + pendingXAiPromptCompletionsRef, + fallback.deferred, + ); + }), + ), + ); + }), + cancel: Effect.gen(function* () { + const started = yield* runtime.start(); + yield* abortPendingPromptCompletions(pendingXAiPromptCompletionsRef, started.sessionId); + yield* runtime.cancel; + }), + } satisfies AcpSessionRuntime.AcpSessionRuntime["Service"]; + }, +); export function promptResponseHasMissingXAiStopReason( response: EffectAcpSchema.PromptResponse, @@ -481,39 +541,3 @@ export function promptResponseHasMissingXAiStopReason( const meta = response._meta; return meta !== null && typeof meta === "object" && meta[xAiStopReasonMissingMetaKey] === true; } - -function promptResponseFromXAi( - notification: XAiPromptCompleteNotification, -): EffectAcpSchema.PromptResponse { - const stopReason = normalizeXAiStopReason(notification.stopReason); - const meta: Record = { - sessionId: notification.sessionId, - }; - if (notification.stopReason === undefined) { - meta[xAiStopReasonMissingMetaKey] = true; - } - if (notification.promptId !== undefined) { - meta.promptId = notification.promptId; - meta.requestId = notification.promptId; - } - if (notification.agentResult !== undefined) { - meta.agentResult = notification.agentResult; - } - return { - stopReason, - _meta: meta, - }; -} - -function normalizeXAiStopReason(value: string | undefined): EffectAcpSchema.StopReason { - switch (value) { - case "cancelled": - case "end_turn": - case "max_tokens": - case "max_turn_requests": - case "refusal": - return value; - default: - return "end_turn"; - } -} diff --git a/apps/web/src/components/ChatView.logic.test.ts b/apps/web/src/components/ChatView.logic.test.ts index 4ae4464cfd4..dc24b5b264a 100644 --- a/apps/web/src/components/ChatView.logic.test.ts +++ b/apps/web/src/components/ChatView.logic.test.ts @@ -1,4 +1,14 @@ -import { EnvironmentId, ProjectId, ProviderInstanceId, ThreadId, RunId } from "@t3tools/contracts"; +import { + EnvironmentId, + MessageId, + ProjectId, + ProviderInstanceId, + ThreadId, + RunId, + TurnItemId, + type OrchestrationV2ProjectedTurnItem, +} from "@t3tools/contracts"; +import * as DateTime from "effect/DateTime"; import { describe, expect, it } from "vite-plus/test"; import type { Thread } from "../types"; @@ -8,6 +18,7 @@ import { MAX_HIDDEN_MOUNTED_TERMINAL_THREADS, buildExpiredTerminalContextToastCopy, createLocalDispatchSnapshot, + deriveCommittedServerUserMessageIds, deriveComposerSendState, getStartedThreadModelChangeBlockReason, hasServerAcknowledgedLocalDispatch, @@ -426,3 +437,103 @@ describe("hasServerAcknowledgedLocalDispatch", () => { expect(hasServerAcknowledgedLocalDispatch({ ...common, threadError: "failed" })).toBe(true); }); }); + +describe("deriveCommittedServerUserMessageIds", () => { + it("tracks only committed user turn items, not assistant rows or projection-only messages", () => { + const turnStartId = MessageId.make("message-turn-start"); + const steerId = MessageId.make("message-steer"); + const assistantId = MessageId.make("message-assistant"); + const committedAt = DateTime.makeUnsafe("2026-06-26T17:50:15.180Z"); + const runId = RunId.make("run:thread:thread-1:ordinal:1"); + const visibleTurnItems: ReadonlyArray = [ + { + position: 0, + visibility: "local", + sourceThreadId: threadId, + sourceItemId: TurnItemId.make("turn-item:message-turn-start"), + item: { + id: TurnItemId.make("turn-item:message-turn-start"), + threadId, + runId, + nodeId: null, + providerThreadId: null, + providerTurnId: null, + nativeItemRef: null, + parentItemId: null, + ordinal: 1, + status: "completed", + title: null, + startedAt: committedAt, + completedAt: committedAt, + updatedAt: committedAt, + createdBy: "user", + creationSource: "web", + type: "user_message", + messageId: turnStartId, + inputIntent: "turn_start", + text: "start", + attachments: [], + }, + }, + { + position: 1, + visibility: "local", + sourceThreadId: threadId, + sourceItemId: TurnItemId.make("turn-item:message-assistant"), + item: { + id: TurnItemId.make("turn-item:message-assistant"), + threadId, + runId, + nodeId: null, + providerThreadId: null, + providerTurnId: null, + nativeItemRef: null, + parentItemId: null, + ordinal: 2, + status: "completed", + title: null, + startedAt: committedAt, + completedAt: committedAt, + updatedAt: committedAt, + type: "assistant_message", + messageId: assistantId, + text: "working", + streaming: false, + }, + }, + { + position: 2, + visibility: "local", + sourceThreadId: threadId, + sourceItemId: TurnItemId.make("turn-item:message-steer"), + item: { + id: TurnItemId.make("turn-item:message-steer"), + threadId, + runId, + nodeId: null, + providerThreadId: null, + providerTurnId: null, + nativeItemRef: null, + parentItemId: null, + ordinal: 3, + status: "completed", + title: null, + startedAt: committedAt, + completedAt: committedAt, + updatedAt: committedAt, + createdBy: "user", + creationSource: "web", + type: "user_message", + messageId: steerId, + inputIntent: "steer", + text: "continue", + attachments: [], + }, + }, + ]; + + expect(deriveCommittedServerUserMessageIds(visibleTurnItems)).toEqual( + new Set([turnStartId, steerId]), + ); + }); +}); diff --git a/apps/web/src/components/ChatView.logic.ts b/apps/web/src/components/ChatView.logic.ts index 737d909236c..e30ca43be11 100644 --- a/apps/web/src/components/ChatView.logic.ts +++ b/apps/web/src/components/ChatView.logic.ts @@ -3,6 +3,7 @@ import { isProviderDriverKind, ProjectId, type ModelSelection, + type OrchestrationV2ProjectedTurnItem, type ProviderDriverKind, type ServerProvider, type ScopedThreadRef, @@ -380,6 +381,22 @@ export function createLocalDispatchSnapshot( }; } +/** + * The timeline renders committed user rows from `visibleTurnItems`, but + * `message.updated` can land in `projection.messages` one event earlier than + * the matching `turn-item.updated`. Basing optimistic eviction on visible user + * turn items avoids dropping steer rows in that gap. + */ +export function deriveCommittedServerUserMessageIds( + visibleTurnItems: ReadonlyArray, +): ReadonlySet { + return new Set( + visibleTurnItems.flatMap((row) => + row.item.type === "user_message" ? [row.item.messageId] : [], + ), + ); +} + export function hasServerAcknowledgedLocalDispatch(input: { localDispatch: LocalDispatchSnapshot | null; phase: SessionPhase; diff --git a/apps/web/src/components/ChatView.tsx b/apps/web/src/components/ChatView.tsx index 4733b813fed..a0be31fa32c 100644 --- a/apps/web/src/components/ChatView.tsx +++ b/apps/web/src/components/ChatView.tsx @@ -233,6 +233,7 @@ import { buildLocalDraftThread, collectUserMessageBlobPreviewUrls, createLocalDispatchSnapshot, + deriveCommittedServerUserMessageIds, deriveComposerSendState, hasServerAcknowledgedLocalDispatch, getStartedThreadModelChangeBlockReason, @@ -1063,8 +1064,8 @@ function ChatViewContent(props: ChatViewProps) { routeKind === "server" ? routeThreadRef : null, ); const committedServerMessageIds = useMemo( - () => new Set(serverProjection?.messages.map((message) => message.id) ?? []), - [serverProjection], + () => deriveCommittedServerUserMessageIds(serverVisibleTurnItems), + [serverVisibleTurnItems], ); const markThreadVisited = useUiStateStore((store) => store.markThreadVisited); const activeThreadLastVisitedAt = useUiStateStore((store) =>