diff --git a/src/providers/codex.test.ts b/src/providers/codex.test.ts index 1ada66da..a879ebdd 100644 --- a/src/providers/codex.test.ts +++ b/src/providers/codex.test.ts @@ -235,6 +235,112 @@ Deno.test("codex provider responses forwards abort signal to command runner", as assertEquals(seenSignal, controller.signal); }); +Deno.test("codex provider streams assistant text deltas from agent_message events", async () => { + const streamEvents: Array<{ type?: string; delta?: string; text?: string }> = + []; + const streamedText: Array = []; + const provider = createCodexProvider({ + runCommand: ({ onStdoutLine }) => { + const lines = [ + JSON.stringify({ + type: "item.delta", + item: { id: "msg_1", type: "agent_message", text: "hello " }, + }), + JSON.stringify({ + type: "item.delta", + item: { id: "msg_1", type: "agent_message", text: "world" }, + }), + JSON.stringify({ + type: "item.completed", + item: { id: "msg_1", type: "agent_message", text: "hello world" }, + }), + ]; + lines.forEach((line) => onStdoutLine?.(line)); + return Promise.resolve({ + success: true, + code: 0, + stdout: enc.encode(lines.join("\n")), + stderr: new Uint8Array(), + }); + }, + }); + + const result = await provider.responses?.({ + request: { + model: "codex-cli/default", + stream: true, + input: [{ + type: "message", + role: "user", + content: [{ type: "input_text", text: "hi" }], + }], + }, + onStreamEvent: (event) => { + streamEvents.push( + event as { type?: string; delta?: string; text?: string }, + ); + }, + }); + + assertEquals(result?.output[0]?.type, "message"); + const firstOutput = result?.output[0]; + assertEquals( + firstOutput && firstOutput.type === "message" + ? firstOutput.content[0]?.text + : undefined, + "hello world", + ); + assertEquals( + streamEvents.filter((event) => event.type === "response.output_text.delta") + .map((event) => event.delta), + ["hello ", "world"], + ); + assertEquals( + streamEvents.filter((event) => event.type === "response.output_text.done") + .map((event) => event.text), + ["hello world"], + ); + + await provider.chat({ + model: "codex-cli/default", + stream: true, + messages: [{ role: "user", content: "hi" }], + onStreamText: (text) => streamedText.push(text), + }); + + assertEquals(streamedText, ["hello ", "world"]); +}); + +Deno.test("codex provider streams completed-only assistant text once", async () => { + const streamedText: Array = []; + const provider = createCodexProvider({ + runCommand: ({ onStdoutLine }) => { + const lines = [ + JSON.stringify({ + type: "item.completed", + item: { id: "msg_1", type: "agent_message", text: "hello world" }, + }), + ]; + lines.forEach((line) => onStdoutLine?.(line)); + return Promise.resolve({ + success: true, + code: 0, + stdout: enc.encode(lines.join("\n")), + stderr: new Uint8Array(), + }); + }, + }); + + await provider.chat({ + model: "codex-cli/default", + stream: true, + messages: [{ role: "user", content: "hi" }], + onStreamText: (text) => streamedText.push(text), + }); + + assertEquals(streamedText, ["hello world"]); +}); + Deno.test("codex provider emits tool traces for mcp tool events", async () => { const traces: Array = []; const provider = createCodexProvider({ @@ -387,6 +493,83 @@ Deno.test("codex provider emits tool traces for command execution events", async ); }); +Deno.test("codex provider emits in-progress tool results for command execution deltas", async () => { + const traces: Array = []; + const provider = createCodexProvider({ + runCommand: ({ onStdoutLine }) => { + const lines = [ + JSON.stringify({ + type: "item.started", + item: { + id: "item_progress", + type: "command_execution", + command: "/bin/bash -lc ls", + aggregated_output: "", + exit_code: null, + status: "in_progress", + }, + }), + JSON.stringify({ + type: "item.delta", + item: { + id: "item_progress", + type: "command_execution", + command: "/bin/bash -lc ls", + aggregated_output: "apps\n", + exit_code: null, + status: "in_progress", + }, + }), + JSON.stringify({ + type: "item.completed", + item: { + id: "item_progress", + type: "command_execution", + command: "/bin/bash -lc ls", + aggregated_output: "apps\npackages\n", + exit_code: 0, + status: "completed", + }, + }), + JSON.stringify({ + type: "item.completed", + item: { type: "agent_message", text: "done" }, + }), + ]; + lines.forEach((line) => onStdoutLine?.(line)); + return Promise.resolve({ + success: true, + code: 0, + stdout: enc.encode(lines.join("\n")), + stderr: new Uint8Array(), + }); + }, + }); + + await provider.chat({ + model: "codex-cli/default", + messages: [{ role: "user", content: "hello" }], + onTraceEvent: (event) => traces.push(event), + }); + + const toolResults = traces.filter((event) => + event.type === "tool.result" + ) as Array>; + assertEquals(toolResults.length, 2); + assertEquals(toolResults[0]?.result, { + command: "/bin/bash -lc ls", + status: "in_progress", + output: "apps\n", + exit_code: null, + }); + assertEquals(toolResults[1]?.result, { + command: "/bin/bash -lc ls", + status: "completed", + output: "apps\npackages\n", + exit_code: 0, + }); +}); + Deno.test("codex provider emits tool traces for file change events", async () => { const traces: Array = []; const provider = createCodexProvider({ diff --git a/src/providers/codex.ts b/src/providers/codex.ts index 9a2cd3e8..5608c22a 100644 --- a/src/providers/codex.ts +++ b/src/providers/codex.ts @@ -276,12 +276,42 @@ function asRecord(value: unknown): Record { return {}; } +function codexToolResultForItem( + itemType: string, + record: Record, +): JSONValue { + if (itemType === "mcp_tool_call") { + return { + server: record.server ?? "", + status: record.status ?? "", + result: record.result ?? null, + error: record.error ?? null, + }; + } + if (itemType === "command_execution") { + return { + command: record.command ?? "", + status: record.status ?? "", + output: record.aggregated_output ?? "", + exit_code: record.exit_code ?? null, + }; + } + if (itemType === "file_change") { + return { + status: record.status ?? "", + changes: record.changes ?? [], + }; + } + return record ?? null; +} + function emitCodexToolEvents(input: { event: Record; emit: (event: Record) => void; toolNames: Map; emittedCalls: Set; - emittedResults: Set; + emittedTerminalResults: Set; + lastResultFingerprintByCallId: Map; toolOutputIndexByCallId: Map; nextOutputIndexRef: { value: number }; }): void { @@ -354,44 +384,28 @@ function emitCodexToolEvents(input: { }); } - if (input.emittedResults.has(callId)) return; if (!resolvedName) return; const isTerminal = payloadType === "item.completed" || payloadType === "item.done"; - if (!isTerminal) return; - input.emittedResults.add(callId); - const result: JSONValue = (() => { - if (itemType === "mcp_tool_call") { - return { - server: record.server ?? "", - status: record.status ?? "", - result: record.result ?? null, - error: record.error ?? null, - }; - } - if (itemType === "command_execution") { - return { - command: record.command ?? "", - status: record.status ?? "", - output: record.aggregated_output ?? "", - exit_code: record.exit_code ?? null, - }; - } - if (itemType === "file_change") { - return { - status: record.status ?? "", - changes: record.changes ?? [], - }; - } - return record ?? null; - })(); - input.emit({ - type: "tool.result", - actionCallId: callId, - name: resolvedName, - result, - toolKind: "mcp_bridge", - }); + const result = codexToolResultForItem(itemType, record); + const resultFingerprint = stringifyJsonValue(result); + const priorResultFingerprint = input.lastResultFingerprintByCallId.get( + callId, + ); + const shouldEmitProgressResult = payloadType !== "item.started" && + resultFingerprint !== priorResultFingerprint; + if (shouldEmitProgressResult) { + input.lastResultFingerprintByCallId.set(callId, resultFingerprint); + input.emit({ + type: "tool.result", + actionCallId: callId, + name: resolvedName, + result, + toolKind: "mcp_bridge", + }); + } + if (!isTerminal || input.emittedTerminalResults.has(callId)) return; + input.emittedTerminalResults.add(callId); input.emit({ type: "response.output_item.done", output_index: outputIndex, @@ -416,6 +430,63 @@ function extractTextParts(value: JSONValue | undefined): Array { return parts; } +function extractCodexItemText(record: Record): string { + return typeof record.text === "string" + ? record.text + : extractTextParts(record.content).join(""); +} + +type CodexAssistantStreamState = { + streamedText: string; + sawAssistantTextStream: boolean; +}; + +function emitCodexAssistantTextEvents(input: { + event: Record; + emit: (event: Record) => void; + emitText?: (text: string) => void; + assistantState: CodexAssistantStreamState; +}): void { + const payloadType = typeof input.event.type === "string" + ? input.event.type + : ""; + if (!payloadType.startsWith("item.")) return; + const item = input.event.item; + if (!item || typeof item !== "object" || Array.isArray(item)) return; + const record = item as Record; + if (record.type !== "agent_message") return; + + const outputIndex = 0; + const text = extractCodexItemText(record); + if (!text) return; + + if (payloadType === "item.delta") { + input.assistantState.sawAssistantTextStream = true; + input.assistantState.streamedText += text; + input.emit({ + type: "response.output_text.delta", + output_index: outputIndex, + delta: text, + }); + input.emitText?.(text); + return; + } + + if (payloadType === "item.completed" || payloadType === "item.done") { + const hadPriorAssistantDelta = input.assistantState.sawAssistantTextStream; + input.assistantState.sawAssistantTextStream = true; + input.assistantState.streamedText = text; + input.emit({ + type: "response.output_text.done", + output_index: outputIndex, + text, + }); + if (!hadPriorAssistantDelta) { + input.emitText?.(text); + } + } +} + function emitCodexReasoningEvents(input: { event: Record; emit: (event: Record) => void; @@ -649,8 +720,18 @@ function parseCodexStdout(stdout: string): { const item = parsed.item as Record | undefined; if (!item || typeof item !== "object") continue; if (item.type !== "agent_message") continue; - if (typeof item.text !== "string") continue; - const content = item.text.trim(); + const content = typeof item.text === "string" + ? item.text.trim() + : Array.isArray(item.content) + ? item.content + .filter((entry) => entry && typeof entry === "object") + .map((entry) => { + const record = entry as Record; + return typeof record.text === "string" ? record.text : ""; + }) + .join("") + .trim() + : ""; if (content) assistantText = content; continue; } @@ -771,13 +852,22 @@ function defaultCommandRunner(input: { function buildCodexStreamHandler(input: { emitRaw: (event: Record) => void; emitTool: (event: Record) => void; + emitText?: (text: string) => void; + assistantState: CodexAssistantStreamState; }): (event: Record) => void { const toolNames = new Map(); const emittedCalls = new Set(); - const emittedResults = new Set(); + const emittedTerminalResults = new Set(); + const lastResultFingerprintByCallId = new Map(); const toolOutputIndexByCallId = new Map(); const nextOutputIndexRef = { value: 0 }; return (event) => { + emitCodexAssistantTextEvents({ + event, + emit: input.emitTool, + emitText: input.emitText, + assistantState: input.assistantState, + }); emitCodexReasoningEvents({ event, emit: input.emitTool, @@ -787,7 +877,8 @@ function buildCodexStreamHandler(input: { emit: input.emitTool, toolNames, emittedCalls, - emittedResults, + emittedTerminalResults, + lastResultFingerprintByCallId, toolOutputIndexByCallId, nextOutputIndexRef, }); @@ -803,7 +894,12 @@ export function createCodexProvider(opts?: { if (input.signal?.aborted) { throw new DOMException("Run canceled", "AbortError"); } - const streamHandler = (input.onStreamEvent || input.onTraceEvent) + const assistantState: CodexAssistantStreamState = { + streamedText: "", + sawAssistantTextStream: false, + }; + const streamHandler = (input.onStreamEvent || input.onTraceEvent || + (input.stream && input.onStreamText)) ? buildCodexStreamHandler({ emitRaw: (event) => input.onStreamEvent?.(event), emitTool: (event) => { @@ -813,6 +909,10 @@ export function createCodexProvider(opts?: { event as unknown as import("@bolt-foundry/gambit-core").ProviderTraceEvent, ); }, + emitText: input.stream + ? (text) => input.onStreamText?.(text) + : undefined, + assistantState, }) : undefined; const priorThreadIdRaw = input.state?.meta?.[CODEX_THREAD_META_KEY]; @@ -883,7 +983,10 @@ export function createCodexProvider(opts?: { } const parsed = parseCodexStdout(stdout); const threadId = parsed.threadId ?? priorThreadId; - if (input.stream && input.onStreamText && parsed.assistantText) { + if ( + input.stream && input.onStreamText && parsed.assistantText && + !assistantState.sawAssistantTextStream + ) { input.onStreamText(parsed.assistantText); } const updatedState = buildUpdatedState({ @@ -910,19 +1013,29 @@ export function createCodexProvider(opts?: { onStreamEvent?: (event: ResponseEvent) => void; }): Promise { const streamHandler = input.onStreamEvent - ? buildCodexStreamHandler({ - emitRaw: (event) => { - input.onStreamEvent?.({ - type: "codex.event", - payload: event, - // this predates the lint rule - } as unknown as ResponseEvent); - }, - emitTool: (event) => { - // this predates the lint rule - input.onStreamEvent?.(event as unknown as ResponseEvent); - }, - }) + ? (() => { + const assistantState: CodexAssistantStreamState = { + streamedText: "", + sawAssistantTextStream: false, + }; + return { + assistantState, + handle: buildCodexStreamHandler({ + emitRaw: (event) => { + input.onStreamEvent?.({ + type: "codex.event", + payload: event, + // this predates the lint rule + } as unknown as ResponseEvent); + }, + emitTool: (event) => { + // this predates the lint rule + input.onStreamEvent?.(event as unknown as ResponseEvent); + }, + assistantState, + }), + }; + })() : undefined; const result = await runChat({ model: input.request.model, @@ -935,7 +1048,7 @@ export function createCodexProvider(opts?: { state: input.state, deckPath: input.deckPath, signal: input.signal, - onStreamEvent: streamHandler, + onStreamEvent: streamHandler?.handle, }); const output = responseItemsFromAssistantMessage(result.message); @@ -957,7 +1070,9 @@ export function createCodexProvider(opts?: { }, }); if ( - typeof result.message.content === "string" && result.message.content + typeof result.message.content === "string" && + result.message.content && + !streamHandler?.assistantState.sawAssistantTextStream ) { input.onStreamEvent?.({ type: "response.output_text.delta",