diff --git a/docs/codex-mid-turn-native-resume.md b/docs/codex-mid-turn-native-resume.md new file mode 100644 index 0000000000..bec4506cf7 --- /dev/null +++ b/docs/codex-mid-turn-native-resume.md @@ -0,0 +1,141 @@ +# Codex Mid-Turn Interaction: Native Resume Approach + +## Problem + +Codex currently uses the **interrupt-and-continue fallback** for mid-turn interaction: + +1. SIGINT kills the process +2. New process spawned with a hand-crafted continuation prompt containing partial output +3. Agent loses all conversation context except what Maestro manually captures and stuffs into the prompt + +This is fragile, lossy, and architecturally different from Claude Code's native stdin-based mid-turn input. + +## Research Findings (Codex CLI Reference) + +- `codex exec` is explicitly **non-interactive** — designed for "scripted or CI-style runs that should finish without human interaction" +- stdin with `-` only accepts the **initial prompt**, not continuous streaming input +- No documented stdin streaming, IPC, socket, or signal-based mid-turn message injection +- `--json` emits JSONL events **out** but there's no way to send events **back in** +- **Conclusion: True mid-turn stdin injection (like Claude Code's `stream-json`) is impossible with Codex** + +## Proposed Approach: Interrupt + Native Resume + +Codex supports `codex exec resume "follow-up prompt"`. Instead of reconstructing context manually, leverage Codex's own session persistence. 
+ +### Current Flow (Fallback) + +```text +User interjects while Codex is working + → SIGINT sent to process + → Process exits + → Maestro collects partial stdout captured so far + → Maestro builds continuation prompt: + buildContinuationPrompt(partialOutput, userMessage) + wraps partial output in tags + → Spawn fresh: codex exec -- "giant continuation prompt" + → Agent sees ONLY what Maestro stuffed in the prompt + → Context loss: tool calls in progress, reasoning state, earlier turns +``` + +### Proposed Flow (Native Resume) + +```text +User interjects while Codex is working + → Grab thread_id (already stored as sessionId on the tab/process) + → SIGINT sent to process + → Process exits (Codex saves state to ~/.codex/sessions/ JSONL files) + → Spawn: codex exec resume -- "user's interjection message" + → Codex loads FULL conversation history from its own session files + → Agent has complete context of everything that happened +``` + +### Comparison + +| Aspect | Current (Fallback) | Proposed (Native Resume) | +| -------------------------- | ------------------------------------------ | ---------------------------------------- | +| Context preservation | Partial — only captured stdout | Full — Codex's own session files | +| Continuation prompt | Hand-crafted with `` tags | Just the user's interjection | +| Tool call history | Lost | Preserved | +| Reasoning state | Lost | Preserved (in session JSONL) | +| Earlier conversation turns | Lost | Preserved | +| Complexity | High — prompt reconstruction logic | Low — use existing resume infrastructure | +| Reliability | Fragile — depends on stdout capture timing | Robust — Codex manages its own state | + +## Infrastructure Already in Place + +| Component | File | Status | +| --------------------------- | ----------------------------------------- | ----------------------------------------------- | +| `thread_id` extraction | `src/main/parsers/codex-output-parser.ts` | Done — parsed from `thread.started` JSONL event | +| 
`resumeArgs` definition | `src/main/agents/definitions.ts` | Done — `(sessionId) => ['resume', sessionId]` | +| `supportsResume` capability | `src/main/agents/capabilities.ts` | Done — `true` | +| Resume arg building | `src/main/utils/agent-args.ts` | Done — inserts `resume ` into CLI args | +| Session ID storage | Tab/process state in agentStore | Done — stored when parser emits `init` event | + +## Implementation Plan + +### Primary Change: `src/renderer/hooks/input/useInputProcessing.ts` + +In the interrupt-and-continue fallback path (~line 443-538), replace: + +```typescript +// BEFORE: Build continuation prompt with partial output +const continuationPrompt = buildContinuationPrompt(partialOutput, userMessage); +queueExecution({ prompt: continuationPrompt, sessionId: undefined }); +``` + +With: + +```typescript +// AFTER: Resume with native session continuation +const threadId = getCurrentSessionId(); // already captured from thread.started +queueExecution({ prompt: userMessage, sessionId: threadId }); +``` + +### Capability Gating + +Gate this behavior on agents that support native resume: + +```typescript +if (capabilities.supportsResume && sessionId) { + // Use native resume — full context preserved by agent + queueExecution({ prompt: userMessage, sessionId }); +} else { + // Fall back to continuation prompt reconstruction + const continuationPrompt = buildContinuationPrompt(partialOutput, userMessage); + queueExecution({ prompt: continuationPrompt }); +} +``` + +### Secondary Changes + +1. **`src/main/process-manager/spawners/ChildProcessSpawner.ts`** — Ensure resume args are passed through when `sessionId` is provided on a queued execution +2. 
**`src/main/utils/agent-args.ts`** — Verify the resume + follow-up prompt combination produces correct CLI: `codex exec resume -- "message"` + +## Key Risk: Session State on SIGINT + +**Does Codex save session state when interrupted with SIGINT (not just on clean exit)?** + +Codex uses incrementally-written `.jsonl` rollout files at `~/.codex/sessions/YYYY/MM/DD/rollout--.jsonl`. Since JSONL files are append-only and typically flushed per-line, partial sessions should be persisted even on interrupt. + +**Mitigation:** If SIGINT doesn't reliably save state, two fallback strategies: + +1. **Graceful wait** — Let the current turn complete, then resume (queue-and-wait instead of interrupt) +2. **Hybrid** — Try native resume first; if it fails (session not found), fall back to continuation prompt + +## Agents This Applies To + +| Agent | Supports Resume | Session Persistence | Candidate? | +| ------------- | --------------- | ------------------------- | ------------------ | +| Codex | Yes | JSONL rollout files | Yes | +| Claude Code | N/A | Has native mid-turn stdin | No (already works) | +| OpenCode | Yes | Local session files | Yes | +| Factory Droid | Yes | `~/.factory/sessions/` | Yes | + +## References + +- Codex CLI reference: https://developers.openai.com/codex/cli/reference.md +- Agent definitions: `src/main/agents/definitions.ts:143-190` +- Agent capabilities: `src/main/agents/capabilities.ts:206-232` +- Codex output parser: `src/main/parsers/codex-output-parser.ts` +- Codex session storage: `src/main/storage/codex-session-storage.ts` +- Interrupt fallback path: `src/renderer/hooks/input/useInputProcessing.ts:443-538` diff --git a/src/__tests__/main/agents/capabilities.test.ts b/src/__tests__/main/agents/capabilities.test.ts index b8b888c446..76e0fbb0dc 100644 --- a/src/__tests__/main/agents/capabilities.test.ts +++ b/src/__tests__/main/agents/capabilities.test.ts @@ -272,6 +272,7 @@ describe('agent-capabilities', () => { 'supportsBatchMode', 
'supportsStreaming', 'supportsStreamJsonInput', + 'supportsMidTurnInput', 'supportsResultMessages', 'supportsModelSelection', 'requiresPromptToStart', diff --git a/src/__tests__/main/ipc/handlers/process.test.ts b/src/__tests__/main/ipc/handlers/process.test.ts index 692b88f0bc..a734f0c8fb 100644 --- a/src/__tests__/main/ipc/handlers/process.test.ts +++ b/src/__tests__/main/ipc/handlers/process.test.ts @@ -285,9 +285,11 @@ describe('process IPC handlers', () => { const expectedChannels = [ 'process:spawn', 'process:write', + 'process:writeInterjection', 'process:interrupt', 'process:kill', 'process:resize', + 'process:hasResultEmitted', 'process:getActiveProcesses', 'process:spawnTerminalTab', 'process:runCommand', diff --git a/src/__tests__/renderer/hooks/useAgentCapabilities.test.ts b/src/__tests__/renderer/hooks/useAgentCapabilities.test.ts index b4f370620d..2da87f91dd 100644 --- a/src/__tests__/renderer/hooks/useAgentCapabilities.test.ts +++ b/src/__tests__/renderer/hooks/useAgentCapabilities.test.ts @@ -27,6 +27,7 @@ const baseCapabilities = { supportsResultMessages: true, supportsModelSelection: false, supportsStreamJsonInput: true, + supportsMidTurnInput: false, supportsThinkingDisplay: false, // Added in Show Thinking feature supportsContextMerge: false, supportsContextExport: false, diff --git a/src/__tests__/renderer/hooks/useAgentListeners.test.ts b/src/__tests__/renderer/hooks/useAgentListeners.test.ts index b0f8358948..d201098197 100644 --- a/src/__tests__/renderer/hooks/useAgentListeners.test.ts +++ b/src/__tests__/renderer/hooks/useAgentListeners.test.ts @@ -93,6 +93,7 @@ let onCommandExitHandler: ListenerCallback | undefined; let onUsageHandler: ListenerCallback | undefined; let onAgentErrorHandler: ListenerCallback | undefined; let onThinkingChunkHandler: ListenerCallback | undefined; +let onInterjectionAckHandler: ListenerCallback | undefined; let onSshRemoteHandler: ListenerCallback | undefined; let onToolExecutionHandler: ListenerCallback | 
undefined; @@ -105,6 +106,7 @@ const mockUnsubscribeCommandExit = vi.fn(); const mockUnsubscribeUsage = vi.fn(); const mockUnsubscribeAgentError = vi.fn(); const mockUnsubscribeThinkingChunk = vi.fn(); +const mockUnsubscribeInterjectionAck = vi.fn(); const mockUnsubscribeSshRemote = vi.fn(); const mockUnsubscribeToolExecution = vi.fn(); @@ -145,6 +147,10 @@ const mockProcess = { onThinkingChunkHandler = handler; return mockUnsubscribeThinkingChunk; }), + onInterjectionAck: vi.fn((handler: ListenerCallback) => { + onInterjectionAckHandler = handler; + return mockUnsubscribeInterjectionAck; + }), onSshRemote: vi.fn((handler: ListenerCallback) => { onSshRemoteHandler = handler; return mockUnsubscribeSshRemote; @@ -207,6 +213,7 @@ beforeEach(() => { onUsageHandler = undefined; onAgentErrorHandler = undefined; onThinkingChunkHandler = undefined; + onInterjectionAckHandler = undefined; onSshRemoteHandler = undefined; onToolExecutionHandler = undefined; @@ -275,7 +282,7 @@ describe('getErrorTitleForType', () => { describe('useAgentListeners', () => { describe('listener registration', () => { - it('registers all 11 IPC listeners on mount', () => { + it('registers all 12 IPC listeners on mount', () => { const deps = createMockDeps(); renderHook(() => useAgentListeners(deps)); @@ -288,11 +295,12 @@ describe('useAgentListeners', () => { expect(mockProcess.onUsage).toHaveBeenCalledTimes(1); expect(mockProcess.onAgentError).toHaveBeenCalledTimes(1); expect(mockProcess.onThinkingChunk).toHaveBeenCalledTimes(1); + expect(mockProcess.onInterjectionAck).toHaveBeenCalledTimes(1); expect(mockProcess.onSshRemote).toHaveBeenCalledTimes(1); expect(mockProcess.onToolExecution).toHaveBeenCalledTimes(1); }); - it('unsubscribes all 11 listeners on unmount', () => { + it('unsubscribes all 12 listeners on unmount', () => { const deps = createMockDeps(); const { unmount } = renderHook(() => useAgentListeners(deps)); @@ -307,6 +315,7 @@ describe('useAgentListeners', () => { 
expect(mockUnsubscribeUsage).toHaveBeenCalledTimes(1); expect(mockUnsubscribeAgentError).toHaveBeenCalledTimes(1); expect(mockUnsubscribeThinkingChunk).toHaveBeenCalledTimes(1); + expect(mockUnsubscribeInterjectionAck).toHaveBeenCalledTimes(1); expect(mockUnsubscribeSshRemote).toHaveBeenCalledTimes(1); expect(mockUnsubscribeToolExecution).toHaveBeenCalledTimes(1); }); @@ -324,6 +333,83 @@ describe('useAgentListeners', () => { }); }); + describe('onInterjectionAck', () => { + it('moves queued interjection from executionQueue to tab logs as delivered', () => { + const deps = createMockDeps(); + const session = createMockSession({ + id: 'sess-1', + aiTabs: [createMockTab({ id: 'tab-1', logs: [] })], + activeTabId: 'tab-1', + executionQueue: [ + { + id: 'interjection-1', + timestamp: 1700000000000, + tabId: 'tab-1', + type: 'message' as const, + text: 'follow up question', + images: undefined, + }, + ], + }); + useSessionStore.setState({ + sessions: [session], + activeSessionId: 'sess-1', + }); + + renderHook(() => useAgentListeners(deps)); + + onInterjectionAckHandler?.('sess-1-ai-tab-1', 'interjection-1'); + + const updated = useSessionStore.getState().sessions.find((s) => s.id === 'sess-1'); + // Should be removed from queue + expect(updated?.executionQueue.length).toBe(0); + // Should now be in tab logs as delivered + const deliveredLog = updated?.aiTabs[0].logs[0]; + expect(deliveredLog?.id).toBe('interjection-1'); + expect(deliveredLog?.text).toBe('follow up question'); + expect(deliveredLog?.interjection).toBe(true); + expect(deliveredLog?.delivered).toBe(true); + expect(deliveredLog?.deliveryFailed).toBe(false); + }); + + it('falls back to marking existing log entry as delivered (interrupt-resume path)', () => { + const deps = createMockDeps(); + const session = createMockSession({ + id: 'sess-1', + aiTabs: [ + createMockTab({ + id: 'tab-1', + logs: [ + { + id: 'interjection-1', + timestamp: Date.now(), + source: 'user', + text: 'follow up', + interjection: 
true, + delivered: false, + deliveryFailed: true, + }, + ], + }), + ], + activeTabId: 'tab-1', + }); + useSessionStore.setState({ + sessions: [session], + activeSessionId: 'sess-1', + }); + + renderHook(() => useAgentListeners(deps)); + + onInterjectionAckHandler?.('sess-1-ai-tab-1', 'interjection-1'); + + const updated = useSessionStore.getState().sessions.find((s) => s.id === 'sess-1'); + const updatedLog = updated?.aiTabs[0].logs[0]; + expect(updatedLog?.delivered).toBe(true); + expect(updatedLog?.deliveryFailed).toBe(false); + }); + }); + // ======================================================================== // onData handler // ======================================================================== diff --git a/src/__tests__/renderer/hooks/useInputProcessing.test.ts b/src/__tests__/renderer/hooks/useInputProcessing.test.ts index 670e96f9d1..1cab44108d 100644 --- a/src/__tests__/renderer/hooks/useInputProcessing.test.ts +++ b/src/__tests__/renderer/hooks/useInputProcessing.test.ts @@ -17,6 +17,7 @@ vi.mock('../../../renderer/hooks/agent/useAgentCapabilities', async () => { }); import { useInputProcessing } from '../../../renderer/hooks/input/useInputProcessing'; +import { hasCapabilityCached } from '../../../renderer/hooks/agent/useAgentCapabilities'; import type { Session, AITab, @@ -92,6 +93,20 @@ const defaultBatchState: BatchRunState = { worktreeActive: false, }; +const reduceSessionUpdates = ( + session: Session, + updates: Array<[Session[] | ((prev: Session[]) => Session[])]> +): Session[] => + updates.reduce( + (current, [update]) => { + if (typeof update === 'function') { + return update(current); + } + return update; + }, + [session] as Session[] + ); + describe('useInputProcessing', () => { const mockSetSessions = vi.fn(); const mockSetInputValue = vi.fn(); @@ -111,6 +126,12 @@ describe('useInputProcessing', () => { beforeEach(() => { vi.clearAllMocks(); mockGetBatchState.mockReturnValue(defaultBatchState); + 
vi.mocked(hasCapabilityCached).mockImplementation((agentId: string, capability: string) => { + if (capability === 'supportsBatchMode') { + return ['claude-code', 'codex', 'opencode', 'factory-droid'].includes(agentId); + } + return false; + }); // Mock window.maestro.process.spawn window.maestro = { @@ -119,7 +140,10 @@ describe('useInputProcessing', () => { ...window.maestro?.process, spawn: vi.fn().mockResolvedValue(undefined), write: vi.fn().mockResolvedValue(undefined), + writeInterjection: vi.fn().mockResolvedValue(true), + interrupt: vi.fn().mockResolvedValue(true), runCommand: vi.fn().mockResolvedValue(undefined), + hasResultEmitted: vi.fn().mockResolvedValue(false), }, agents: { ...window.maestro?.agents, @@ -815,6 +839,258 @@ describe('useInputProcessing', () => { }); }); + describe('mid-turn interjections', () => { + const mockCapabilities = (supportsMidTurnInput: boolean) => { + vi.mocked(hasCapabilityCached).mockImplementation((agentId: string, capability: string) => { + if (capability === 'supportsBatchMode') { + return ['claude-code', 'codex', 'opencode', 'factory-droid'].includes(agentId); + } + if (capability === 'supportsMidTurnInput') { + return supportsMidTurnInput; + } + return false; + }); + }; + + it('moves interjection from queue to logs on successful write', async () => { + mockCapabilities(true); + + const busyTab = createMockTab({ state: 'busy' }); + const busySession = createMockSession({ + state: 'busy', + aiTabs: [busyTab], + activeTabId: busyTab.id, + }); + const deps = createDeps({ + activeSession: busySession, + sessionsRef: { current: [busySession] }, + inputValue: 'please also include tests', + }); + + const { result } = renderHook(() => useInputProcessing(deps)); + + await act(async () => { + await result.current.processInput(); + }); + await act(async () => { + await Promise.resolve(); + }); + + expect(window.maestro.process.writeInterjection).toHaveBeenCalledWith( + 'session-1-ai-tab-1', + 'please also include tests', + 
expect.any(String), + undefined + ); + + const updatedSessions = reduceSessionUpdates( + busySession, + mockSetSessions.mock.calls as Array<[Session[] | ((prev: Session[]) => Session[])]> + ); + // Queue should be empty (moved to logs on successful write) + expect(updatedSessions[0].executionQueue.length).toBe(0); + // Should now be in logs as delivered + const deliveredLog = updatedSessions[0].aiTabs[0].logs[0]; + expect(deliveredLog.interjection).toBe(true); + expect(deliveredLog.delivered).toBe(true); + expect(deliveredLog.deliveryFailed).toBe(false); + expect(deliveredLog.text).toBe('please also include tests'); + }); + + it('moves interjection from queue to logs as failed when writeInterjection returns false', async () => { + mockCapabilities(true); + + const busyTab = createMockTab({ state: 'busy' }); + const busySession = createMockSession({ + state: 'busy', + aiTabs: [busyTab], + activeTabId: busyTab.id, + }); + const deps = createDeps({ + activeSession: busySession, + sessionsRef: { current: [busySession] }, + inputValue: 'please also include tests', + }); + (window.maestro.process.writeInterjection as ReturnType).mockResolvedValue( + false + ); + + const { result } = renderHook(() => useInputProcessing(deps)); + + await act(async () => { + await result.current.processInput(); + }); + await act(async () => { + await Promise.resolve(); + }); + + const updatedSessions = reduceSessionUpdates( + busySession, + mockSetSessions.mock.calls as Array<[Session[] | ((prev: Session[]) => Session[])]> + ); + // Should be removed from queue + expect(updatedSessions[0].executionQueue.length).toBe(0); + // Should now be in logs as failed + const updatedLog = updatedSessions[0].aiTabs[0].logs[0]; + expect(updatedLog.interjection).toBe(true); + expect(updatedLog.delivered).toBe(false); + expect(updatedLog.deliveryFailed).toBe(true); + }); + + it('moves interjection from queue to logs as failed when writeInterjection rejects', async () => { + mockCapabilities(true); + + 
const busyTab = createMockTab({ state: 'busy' }); + const busySession = createMockSession({ + state: 'busy', + aiTabs: [busyTab], + activeTabId: busyTab.id, + }); + const deps = createDeps({ + activeSession: busySession, + sessionsRef: { current: [busySession] }, + inputValue: 'retry with the new constraints', + }); + (window.maestro.process.writeInterjection as ReturnType).mockRejectedValue( + new Error('stdin closed') + ); + + const { result } = renderHook(() => useInputProcessing(deps)); + + await act(async () => { + await result.current.processInput(); + }); + await act(async () => { + await Promise.resolve(); + }); + + const updatedSessions = reduceSessionUpdates( + busySession, + mockSetSessions.mock.calls as Array<[Session[] | ((prev: Session[]) => Session[])]> + ); + // Should be removed from queue + expect(updatedSessions[0].executionQueue.length).toBe(0); + // Should now be in logs as failed + const updatedLog = updatedSessions[0].aiTabs[0].logs[0]; + expect(updatedLog.delivered).toBe(false); + expect(updatedLog.deliveryFailed).toBe(true); + }); + }); + + describe('native resume fallback (interrupt + resume)', () => { + const mockCapabilitiesForResume = (supportsResume: boolean) => { + vi.mocked(hasCapabilityCached).mockImplementation((agentId: string, capability: string) => { + if (capability === 'supportsBatchMode') { + return ['claude-code', 'codex', 'opencode', 'factory-droid'].includes(agentId); + } + if (capability === 'supportsMidTurnInput') { + return false; // No native stdin mid-turn + } + if (capability === 'supportsResume') { + return supportsResume; + } + return false; + }); + }; + + it('queues user message directly (no continuation prompt) when agent supports resume', async () => { + mockCapabilitiesForResume(true); + + const busyTab = createMockTab({ + state: 'busy', + agentSessionId: 'thread-abc-123', // Has a session ID from prior turn + }); + const busySession = createMockSession({ + state: 'busy', + toolType: 'codex', + aiTabs: 
[busyTab], + activeTabId: busyTab.id, + }); + const deps = createDeps({ + activeSession: busySession, + sessionsRef: { current: [busySession] }, + inputValue: 'also fix the tests', + }); + + const { result } = renderHook(() => useInputProcessing(deps)); + + await act(async () => { + await result.current.processInput(); + }); + + // Should interrupt the running process + expect(window.maestro.process.interrupt).toHaveBeenCalledWith('session-1-ai-tab-1'); + + // Should queue a resume prompt (not a continuation prompt with partial_output) + const updatedSessions = reduceSessionUpdates( + busySession, + mockSetSessions.mock.calls as Array<[Session[] | ((prev: Session[]) => Session[])]> + ); + const queuedItem = updatedSessions[0].executionQueue[0]; + expect(queuedItem).toBeDefined(); + // Resume prompt wraps user message with resume context + expect(queuedItem.text).toContain('also fix the tests'); + expect(queuedItem.text).toContain('session resume'); + // Should NOT contain partial_output tags (that's the old fallback) + expect(queuedItem.text).not.toContain(''); + // displayText should show raw user message in queue UI + expect(queuedItem.displayText).toBe('also fix the tests'); + // Should carry the interjection log ID for delivery tracking + expect(queuedItem.interjectionLogId).toBeDefined(); + + // The interjection log entry should start as queued (delivered: false) + const interjectionLog = updatedSessions[0].aiTabs[0].logs.find( + (log: any) => log.interjection === true + ); + expect(interjectionLog).toBeDefined(); + expect(interjectionLog?.delivered).toBe(false); + expect(interjectionLog?.deliveryFailed).toBe(false); + // The queued item should reference this log entry + expect(queuedItem.interjectionLogId).toBe(interjectionLog?.id); + }); + + it('falls back to continuation prompt when agent has no session ID', async () => { + mockCapabilitiesForResume(true); + + const busyTab = createMockTab({ + state: 'busy', + agentSessionId: null, // No session ID yet 
(first turn) + }); + const busySession = createMockSession({ + state: 'busy', + toolType: 'codex', + aiTabs: [busyTab], + activeTabId: busyTab.id, + }); + const deps = createDeps({ + activeSession: busySession, + sessionsRef: { current: [busySession] }, + inputValue: 'also fix the tests', + }); + + const { result } = renderHook(() => useInputProcessing(deps)); + + await act(async () => { + await result.current.processInput(); + }); + + // Should still interrupt + expect(window.maestro.process.interrupt).toHaveBeenCalled(); + + // Without a session ID, should fall back to continuation prompt + const updatedSessions = reduceSessionUpdates( + busySession, + mockSetSessions.mock.calls as Array<[Session[] | ((prev: Session[]) => Session[])]> + ); + const queuedItem = updatedSessions[0].executionQueue[0]; + expect(queuedItem).toBeDefined(); + // Continuation prompt wraps user message (buildContinuationPrompt behavior) + expect(queuedItem.text).toContain('also fix the tests'); + // displayText should show raw user message, not the continuation prompt + expect(queuedItem.displayText).toBe('also fix the tests'); + }); + }); + describe('Auto Run blocking', () => { it('queues write commands when Auto Run is active AND session is busy', async () => { const runningBatchState: BatchRunState = { diff --git a/src/main/agents/capabilities.ts b/src/main/agents/capabilities.ts index b6403c4a7b..4e21d6a734 100644 --- a/src/main/agents/capabilities.ts +++ b/src/main/agents/capabilities.ts @@ -61,6 +61,9 @@ export interface AgentCapabilities { /** Agent supports --input-format stream-json for image input via stdin */ supportsStreamJsonInput: boolean; + /** Agent can receive additional user messages via stdin while processing a turn */ + supportsMidTurnInput: boolean; + /** Agent emits streaming thinking/reasoning content that can be displayed */ supportsThinkingDisplay: boolean; @@ -112,6 +115,7 @@ export const DEFAULT_CAPABILITIES: AgentCapabilities = { supportsResultMessages: false, 
supportsModelSelection: false, supportsStreamJsonInput: false, + supportsMidTurnInput: false, supportsThinkingDisplay: false, supportsContextMerge: false, supportsContextExport: false, @@ -155,6 +159,7 @@ export const AGENT_CAPABILITIES: Record = { supportsResultMessages: true, // "result" event type supportsModelSelection: false, // Model is configured via Anthropic account supportsStreamJsonInput: true, // --input-format stream-json for images via stdin + supportsMidTurnInput: true, // NDJSON stream-json stdin accepts multiple messages mid-turn supportsThinkingDisplay: true, // Emits streaming assistant messages supportsContextMerge: true, // Can receive merged context via prompts supportsContextExport: true, // Session storage supports context export @@ -186,6 +191,7 @@ export const AGENT_CAPABILITIES: Record = { supportsResultMessages: false, supportsModelSelection: false, supportsStreamJsonInput: false, + supportsMidTurnInput: false, supportsThinkingDisplay: false, // Terminal is not an AI agent supportsContextMerge: false, // Terminal is not an AI agent supportsContextExport: false, // Terminal has no AI context @@ -220,6 +226,7 @@ export const AGENT_CAPABILITIES: Record = { supportsResultMessages: false, // All messages are agent_message type (no distinct result) - Verified supportsModelSelection: true, // -m, --model flag - Documented supportsStreamJsonInput: false, // Uses -i, --image flag instead + supportsMidTurnInput: false, supportsThinkingDisplay: true, // Emits reasoning tokens (o3/o4-mini) supportsContextMerge: true, // Can receive merged context via prompts supportsContextExport: true, // Session storage supports context export @@ -254,6 +261,7 @@ export const AGENT_CAPABILITIES: Record = { supportsResultMessages: false, supportsModelSelection: false, // Not yet investigated supportsStreamJsonInput: false, + supportsMidTurnInput: false, supportsThinkingDisplay: false, // Not yet investigated supportsContextMerge: false, // Not yet investigated - 
PLACEHOLDER supportsContextExport: false, // Not yet investigated - PLACEHOLDER @@ -287,6 +295,7 @@ export const AGENT_CAPABILITIES: Record = { supportsResultMessages: false, supportsModelSelection: false, // Not yet investigated supportsStreamJsonInput: false, + supportsMidTurnInput: false, supportsThinkingDisplay: false, // Not yet investigated supportsContextMerge: false, // Not yet investigated - PLACEHOLDER supportsContextExport: false, // Not yet investigated - PLACEHOLDER @@ -321,6 +330,7 @@ export const AGENT_CAPABILITIES: Record = { supportsResultMessages: true, // step_finish with part.reason:"stop" - Verified supportsModelSelection: true, // --model provider/model (e.g., 'ollama/qwen3:8b') - Verified supportsStreamJsonInput: false, // Uses positional arguments for prompt + supportsMidTurnInput: false, supportsThinkingDisplay: true, // Emits streaming text chunks supportsContextMerge: true, // Can receive merged context via prompts supportsContextExport: true, // Session storage supports context export @@ -354,6 +364,7 @@ export const AGENT_CAPABILITIES: Record = { supportsResultMessages: true, // Can detect end of conversation supportsModelSelection: true, // -m, --model flag - Verified supportsStreamJsonInput: true, // --input-format stream-json - Verified + supportsMidTurnInput: true, // Uses same stream-json input format — pending direct verification supportsThinkingDisplay: true, // Emits thinking content in messages - Verified supportsContextMerge: true, // Can receive merged context via prompts supportsContextExport: true, // Session files are exportable @@ -388,6 +399,7 @@ export const AGENT_CAPABILITIES: Record = { supportsResultMessages: false, supportsModelSelection: false, supportsStreamJsonInput: false, + supportsMidTurnInput: false, supportsThinkingDisplay: false, supportsContextMerge: false, supportsContextExport: false, diff --git a/src/main/ipc/handlers/process.ts b/src/main/ipc/handlers/process.ts index 632e901420..2e00822352 100644 --- 
a/src/main/ipc/handlers/process.ts +++ b/src/main/ipc/handlers/process.ts @@ -4,7 +4,7 @@ import * as os from 'os'; import * as fs from 'fs'; import * as path from 'path'; import { ProcessManager } from '../../process-manager'; -import { AgentDetector } from '../../agents'; +import { AgentDetector, hasCapability } from '../../agents'; import { logger } from '../../utils/logger'; import { isWindows } from '../../../shared/platformDetection'; import { addBreadcrumb, captureException } from '../../utils/sentry'; @@ -643,6 +643,8 @@ export function registerProcessHandlers(deps: ProcessHandlerDependencies): void sshRemoteHost: sshRemoteUsed?.host, // SSH stdin script - the entire command is sent via stdin to /bin/bash on remote sshStdinScript, + // Keep stdin open for agents that support mid-turn input + keepStdinOpen: hasCapability(config.toolType, 'supportsMidTurnInput'), }); logger.info(`Process spawned successfully`, LOG_CONTEXT, { @@ -705,6 +707,54 @@ export function registerProcessHandlers(deps: ProcessHandlerDependencies): void }) ); + // Write a mid-turn interjection to a running agent process + ipcMain.handle( + 'process:writeInterjection', + withIpcErrorLogging( + handlerOpts('writeInterjection'), + async (sessionId: string, text: string, interjectionId?: string, images?: string[]) => { + const processManager = requireProcessManager(getProcessManager); + + // Validate non-empty text to avoid sending empty stream-json messages + if (!text || !text.trim()) { + logger.warn(`Ignoring empty interjection for process: ${sessionId}`, LOG_CONTEXT, { + sessionId, + interjectionId, + }); + return false; + } + + logger.info(`Writing interjection to process: ${sessionId}`, LOG_CONTEXT, { + sessionId, + textLength: text.length, + interjectionId, + imageCount: images?.length || 0, + }); + const streamJsonMessage = buildStreamJsonMessage(text, images || []); + logger.debug('Built interjection stream-json message', LOG_CONTEXT, { + sessionId, + messageLength: 
streamJsonMessage.length, + }); + const success = processManager.write(sessionId, streamJsonMessage + '\n'); + + // Track the pending interjection so StdoutHandler can acknowledge + // it when the CLI emits a result for the current turn + if (success && interjectionId) { + const managedProcess = processManager.get(sessionId); + if (managedProcess) { + // Immutable append — internal process state queue + managedProcess.pendingInterjectionIds = [ + ...(managedProcess.pendingInterjectionIds || []), + interjectionId, + ]; + } + } + + return success; + } + ) + ); + // Send SIGINT to a process ipcMain.handle( 'process:interrupt', @@ -894,6 +944,17 @@ export function registerProcessHandlers(deps: ProcessHandlerDependencies): void ) ); + // Check if a process has emitted its result (for race condition guard) + ipcMain.handle( + 'process:hasResultEmitted', + withIpcErrorLogging(handlerOpts('hasResultEmitted'), async (sessionId: string) => { + const processManager = requireProcessManager(getProcessManager); + const managedProcess = processManager.get(sessionId); + // Missing process means it already exited and was cleaned up - treat as finished + return !managedProcess || managedProcess.resultEmitted === true; + }) + ); + // Run a single command and capture only stdout/stderr (no PTY echo/prompts) // Supports SSH remote execution when sessionSshRemoteConfig is provided // TODO: Remove this handler once all callers migrate to process:spawnTerminalTab for persistent PTY sessions diff --git a/src/main/preload/process.ts b/src/main/preload/process.ts index 6fb18ec876..711c0c45fd 100644 --- a/src/main/preload/process.ts +++ b/src/main/preload/process.ts @@ -180,6 +180,18 @@ export function createProcessApi() { write: (sessionId: string, data: string): Promise => ipcRenderer.invoke('process:write', sessionId, data), + /** + * Write a mid-turn interjection to a running agent process. + * Formats the message as stream-json in the main process and writes to stdin. 
+ */ + writeInterjection: ( + sessionId: string, + text: string, + interjectionId?: string, + images?: string[] + ): Promise => + ipcRenderer.invoke('process:writeInterjection', sessionId, text, interjectionId, images), + /** * Send interrupt signal (Ctrl+C) to a process */ @@ -204,6 +216,12 @@ export function createProcessApi() { runCommand: (config: RunCommandConfig): Promise<{ exitCode: number }> => ipcRenderer.invoke('process:runCommand', config), + /** + * Check if a running process has already emitted its result + */ + hasResultEmitted: (sessionId: string): Promise => + ipcRenderer.invoke('process:hasResultEmitted', sessionId), + /** * Get all active processes from ProcessManager */ @@ -264,6 +282,18 @@ export function createProcessApi() { return () => ipcRenderer.removeListener('process:thinking-chunk', handler); }, + /** + * Subscribe to interjection acknowledgment (CLI consumed a mid-turn message) + */ + onInterjectionAck: ( + callback: (sessionId: string, interjectionId: string) => void + ): (() => void) => { + const handler = (_: unknown, sessionId: string, interjectionId: string) => + callback(sessionId, interjectionId); + ipcRenderer.on('process:interjection-ack', handler); + return () => ipcRenderer.removeListener('process:interjection-ack', handler); + }, + /** * Subscribe to tool execution events */ diff --git a/src/main/process-listeners/forwarding-listeners.ts b/src/main/process-listeners/forwarding-listeners.ts index 90121b8bb3..aa39e793ea 100644 --- a/src/main/process-listeners/forwarding-listeners.ts +++ b/src/main/process-listeners/forwarding-listeners.ts @@ -33,6 +33,11 @@ export function setupForwardingListeners( safeSend('process:tool-execution', sessionId, toolEvent); }); + // Handle interjection acknowledgment (CLI has consumed a mid-turn message) + processManager.on('interjection-ack', (sessionId: string, interjectionId: string) => { + safeSend('process:interjection-ack', sessionId, interjectionId); + }); + // Handle stderr separately 
from runCommand (for clean command execution) processManager.on('stderr', (sessionId: string, data: string) => { safeSend('process:stderr', sessionId, data); diff --git a/src/main/process-manager/ProcessManager.ts b/src/main/process-manager/ProcessManager.ts index 53a144b71f..7c6e40f535 100644 --- a/src/main/process-manager/ProcessManager.ts +++ b/src/main/process-manager/ProcessManager.ts @@ -102,8 +102,14 @@ export class ProcessManager extends EventEmitter { process.ptyProcess.write(data); return true; } else if (process.childProcess?.stdin) { - process.childProcess.stdin.write(data); - return true; + if (process.childProcess.stdin.writable) { + process.childProcess.stdin.write(data); + return true; + } + logger.warn('[ProcessManager] stdin not writable for session', 'ProcessManager', { + sessionId, + }); + return false; } return false; } catch (error) { diff --git a/src/main/process-manager/handlers/StdoutHandler.ts b/src/main/process-manager/handlers/StdoutHandler.ts index 3b90331245..a83c6cd9c1 100644 --- a/src/main/process-manager/handlers/StdoutHandler.ts +++ b/src/main/process-manager/handlers/StdoutHandler.ts @@ -145,10 +145,22 @@ export class StdoutHandler { managedProcess: ManagedProcess, output: string ): void { + logger.debug('[ProcessManager] handleStreamJsonData chunk', 'ProcessManager', { + sessionId, + toolType: managedProcess.toolType, + chunkLength: output.length, + existingBufferLength: managedProcess.jsonBuffer?.length || 0, + }); managedProcess.jsonBuffer = (managedProcess.jsonBuffer || '') + output; const lines = managedProcess.jsonBuffer.split('\n'); managedProcess.jsonBuffer = lines.pop() || ''; + logger.debug('[ProcessManager] handleStreamJsonData split', 'ProcessManager', { + sessionId, + toolType: managedProcess.toolType, + completedLines: lines.length, + remainingBufferLength: managedProcess.jsonBuffer.length, + }); for (const line of lines) { if (!line.trim()) continue; @@ -161,6 +173,11 @@ export class StdoutHandler { private 
processLine(sessionId: string, managedProcess: ManagedProcess, line: string): void { const { outputParser, toolType } = managedProcess; + logger.debug('[ProcessManager] processLine received', 'ProcessManager', { + sessionId, + toolType, + lineLength: line.length, + }); // ── Single JSON parse for the entire line ── // Previously JSON.parse was called up to 3× per line (detectErrorFromLine, @@ -171,6 +188,15 @@ export class StdoutHandler { } catch { // Not valid JSON — handled in the else branch below } + logger.debug('[ProcessManager] processLine parse result', 'ProcessManager', { + sessionId, + toolType, + parsed: parsed !== null, + parsedType: + parsed && typeof parsed === 'object' && 'type' in (parsed as Record) + ? String((parsed as Record).type) + : null, + }); // ── Error detection from parser ── if (outputParser && !managedProcess.errorEmitted) { @@ -231,6 +257,11 @@ export class StdoutHandler { this.handleLegacyMessage(sessionId, managedProcess, parsed); } } else { + logger.warn('[ProcessManager] Non-JSON line in stream-json mode', 'ProcessManager', { + sessionId, + toolType, + linePreview: line.substring(0, 400), + }); this.bufferManager.emitDataBuffered(sessionId, line); } } @@ -245,6 +276,7 @@ export class StdoutHandler { logger.debug('[ProcessManager] Parsed event from output parser', 'ProcessManager', { sessionId, + toolType: managedProcess.toolType, eventType: event?.type, hasText: !!event?.text, textPreview: event?.text?.substring(0, 100), @@ -397,8 +429,55 @@ export class StdoutHandler { outputParser.isResultMessage(event) && !managedProcess.resultEmitted ) { + // If interjections are queued, this result ends the PRE-interjection turn. + // Acknowledge the next interjection and reset for a new turn instead of + // closing stdin and marking the process as done. 
+ if ( + managedProcess.pendingInterjectionIds && + managedProcess.pendingInterjectionIds.length > 0 + ) { + const [ackId, ...remainingIds] = managedProcess.pendingInterjectionIds; + managedProcess.pendingInterjectionIds = remainingIds; + if (!ackId) return; + logger.info( + '[ProcessManager] Result received with pending interjection — acknowledging and resetting for next turn', + 'ProcessManager', + { + sessionId, + interjectionId: ackId, + remainingPending: remainingIds.length, + } + ); + + // Emit the result text for the completed turn + const resultText = event.text || managedProcess.streamedText || ''; + if (resultText) { + this.bufferManager.emitDataBuffered(sessionId, resultText); + } + + // Notify renderer that the CLI has taken in this interjection + this.emitter.emit('interjection-ack', sessionId, ackId); + + // Reset per-turn state so the interjection response is tracked as a fresh turn + managedProcess.streamedText = ''; + // resultEmitted stays false — the interjection response will produce its own result + + // Don't close stdin here; it was already written and may still be open + // for further interjections. It will be closed by the final result. 
+ return; + } + + logger.info('[ProcessManager] Final result event detected', 'ProcessManager', { + sessionId, + toolType: managedProcess.toolType, + eventType: event.type, + hasEventText: !!event.text, + streamedTextLength: managedProcess.streamedText?.length || 0, + keepStdinOpen: !!managedProcess.keepStdinOpen, + }); managedProcess.resultEmitted = true; const resultText = event.text || managedProcess.streamedText || ''; + this.closeKeptOpenStdinAfterResult(sessionId, managedProcess); // Log synopsis result processing (for debugging empty synopsis issue) if (sessionId.includes('-synopsis-')) { @@ -447,6 +526,7 @@ export class StdoutHandler { if (msgRecord.type === 'result' && msgRecord.result && !managedProcess.resultEmitted) { managedProcess.resultEmitted = true; + this.closeKeptOpenStdinAfterResult(sessionId, managedProcess); logger.debug('[ProcessManager] Emitting result data', 'ProcessManager', { sessionId, resultLength: (msgRecord.result as string).length, @@ -512,4 +592,34 @@ export class StdoutHandler { reasoningTokens: usage.reasoningTokens, }; } + + private closeKeptOpenStdinAfterResult(sessionId: string, managedProcess: ManagedProcess): void { + if (!managedProcess.keepStdinOpen || !managedProcess.childProcess?.stdin) { + logger.debug('[ProcessManager] Skipping closeKeptOpenStdinAfterResult', 'ProcessManager', { + sessionId, + toolType: managedProcess.toolType, + keepStdinOpen: !!managedProcess.keepStdinOpen, + hasChildStdin: !!managedProcess.childProcess?.stdin, + }); + return; + } + + const stdin = managedProcess.childProcess.stdin; + if (!stdin.writable || stdin.destroyed) { + logger.warn('[ProcessManager] Cannot close kept-open stdin after result', 'ProcessManager', { + sessionId, + toolType: managedProcess.toolType, + writable: stdin.writable, + destroyed: stdin.destroyed, + }); + return; + } + + logger.info('[ProcessManager] Closing kept-open stdin after final result', 'ProcessManager', { + sessionId, + toolType: managedProcess.toolType, + }); + 
stdin.end(); + managedProcess.keepStdinOpen = false; + } } diff --git a/src/main/process-manager/spawners/ChildProcessSpawner.ts b/src/main/process-manager/spawners/ChildProcessSpawner.ts index b66f9c7531..e205e19638 100644 --- a/src/main/process-manager/spawners/ChildProcessSpawner.ts +++ b/src/main/process-manager/spawners/ChildProcessSpawner.ts @@ -17,6 +17,7 @@ import { saveImageToTempFile, buildImagePromptPrefix } from '../utils/imageUtils import { buildStreamJsonMessage } from '../utils/streamJsonBuilder'; import { escapeArgsForShell, isPowerShellShell } from '../utils/shellEscape'; import { isWindows } from '../../../shared/platformDetection'; +import { captureException } from '../../utils/sentry'; /** * Handles spawning of child processes (non-PTY). @@ -88,6 +89,11 @@ export class ChildProcessSpawner { (arg, i) => arg === 'stream-json' && i > 0 && args[i - 1] === '--input-format' ); const promptViaStdin = sendPromptViaStdin || sendPromptViaStdinRaw || argsHaveInputStreamJson; + const shouldUseStreamJsonInput = + !!prompt && + capabilities.supportsStreamJsonInput && + !sendPromptViaStdinRaw && + (hasImages || sendPromptViaStdin || argsHaveInputStreamJson); // Build final args based on batch mode and images // Track whether the prompt was added to CLI args (used later to decide stdin behavior) @@ -97,16 +103,15 @@ export class ChildProcessSpawner { let effectivePrompt = prompt; let promptAddedToArgs = false; - if (hasImages && prompt && capabilities.supportsStreamJsonInput) { - // For agents that support stream-json input (like Claude Code) - // Always add --input-format stream-json when sending images via stdin. - // This flag is required for Claude Code to parse the JSON+base64 message - // correctly; without it, the raw JSON is treated as plain text prompt. - const needsInputFormat = !args.includes('--input-format') - ? 
['--input-format', 'stream-json'] - : []; + if (shouldUseStreamJsonInput) { + // For agents that support stream-json stdin input (like Claude Code), + // always add --input-format stream-json whenever the prompt is being + // delivered via stdin. Without this flag, the agent treats the JSON + // payload as plain text and typically waits for EOF before processing, + // which breaks mid-turn input because stdin remains open. + const needsInputFormat = !argsHaveInputStreamJson ? ['--input-format', 'stream-json'] : []; finalArgs = [...args, ...needsInputFormat]; - // Prompt will be sent via stdin as stream-json with embedded images (not in CLI args) + // Prompt will be sent via stdin as stream-json (with embedded images when present) } else if (hasImages && prompt && imageArgs) { // For agents that use file-based image args (like Codex, OpenCode) finalArgs = [...args]; @@ -385,6 +390,7 @@ export class ChildProcessSpawner { projectPath: config.projectPath, sshRemoteId: config.sshRemoteId, sshRemoteHost: config.sshRemoteHost, + keepStdinOpen: config.keepStdinOpen, }; this.processes.set(sessionId, managedProcess); @@ -429,6 +435,11 @@ export class ChildProcessSpawner { }); childProcess.stdout.on('data', (data: Buffer | string) => { const output = data.toString(); + logger.debug('[ProcessManager] stdout chunk received', 'ProcessManager', { + sessionId, + toolType, + length: output.length, + }); this.stdoutHandler.handleData(sessionId, output); }); } else { @@ -451,6 +462,11 @@ export class ChildProcessSpawner { }); childProcess.stderr.on('data', (data: Buffer | string) => { const stderrData = data.toString(); + logger.debug('[ProcessManager] stderr chunk received', 'ProcessManager', { + sessionId, + toolType, + length: stderrData.length, + }); this.stderrHandler.handleData(sessionId, stderrData); }); } @@ -461,11 +477,32 @@ export class ChildProcessSpawner { // emitted near the end of stdout (e.g., tab-naming, batch operations). 
// The 'close' event guarantees all stdio streams are closed first. childProcess.on('close', (code) => { + logger.info('[ProcessManager] child process close', 'ProcessManager', { + sessionId, + toolType, + code, + stdinDestroyed: childProcess.stdin?.destroyed ?? null, + stdinWritable: childProcess.stdin?.writable ?? null, + }); + // Clean up stdin if it was kept open for mid-turn input + if (childProcess.stdin && !childProcess.stdin.destroyed) { + childProcess.stdin.end(); + } this.exitHandler.handleExit(sessionId, code || 0); }); // Handle errors childProcess.on('error', (error) => { + logger.error('[ProcessManager] child process error event', 'ProcessManager', { + sessionId, + toolType, + error: String(error), + }); + captureException(error, { + operation: 'child-process-error', + sessionId, + toolType, + }); this.exitHandler.handleError(sessionId, error); }); @@ -477,7 +514,17 @@ export class ChildProcessSpawner { scriptLength: config.sshStdinScript.length, }); childProcess.stdin?.write(config.sshStdinScript); - childProcess.stdin?.end(); + if (!config.keepStdinOpen) { + childProcess.stdin?.end(); + } else { + logger.debug( + '[ProcessManager] Keeping stdin open for mid-turn input (SSH)', + 'ProcessManager', + { + sessionId, + } + ); + } } else if (config.sendPromptViaStdinRaw && effectivePrompt) { // Raw stdin mode: send prompt as literal text (non-stream-json agents on Windows) // Note: When sending via stdin, PowerShell treats the input as literal text, @@ -497,12 +544,20 @@ export class ChildProcessSpawner { const streamJsonMessage = buildStreamJsonMessage(effectivePrompt, images || []); logger.debug('[ProcessManager] Sending stream-json message via stdin', 'ProcessManager', { sessionId, + toolType, messageLength: streamJsonMessage.length, imageCount: (images || []).length, hasImages: !!(images && images.length > 0), + keepStdinOpen: !!config.keepStdinOpen, }); childProcess.stdin?.write(streamJsonMessage + '\n'); - childProcess.stdin?.end(); + if 
(!config.keepStdinOpen) { + childProcess.stdin?.end(); + } else { + logger.debug('[ProcessManager] Keeping stdin open for mid-turn input', 'ProcessManager', { + sessionId, + }); + } } else if (isBatchMode) { // Regular batch mode: close stdin immediately logger.debug('[ProcessManager] Closing stdin for batch mode', 'ProcessManager', { @@ -516,6 +571,12 @@ export class ChildProcessSpawner { logger.error('[ProcessManager] Failed to spawn process', 'ProcessManager', { error: String(error), }); + captureException(error instanceof Error ? error : new Error(String(error)), { + operation: 'child-process-spawn', + sessionId, + toolType, + command, + }); return { pid: -1, success: false }; } } diff --git a/src/main/process-manager/types.ts b/src/main/process-manager/types.ts index 8f0f5f87bd..ac5f56ea96 100644 --- a/src/main/process-manager/types.ts +++ b/src/main/process-manager/types.ts @@ -40,6 +40,8 @@ export interface ProcessConfig { cols?: number; /** PTY terminal height in rows (default 24) */ rows?: number; + /** If true, don't close stdin after initial prompt - enables mid-turn writes */ + keepStdinOpen?: boolean; } /** @@ -76,6 +78,9 @@ export interface ManagedProcess { projectPath?: string; sshRemoteId?: string; sshRemoteHost?: string; + keepStdinOpen?: boolean; + /** Queue of interjection IDs awaiting CLI acknowledgment (result event) */ + pendingInterjectionIds?: string[]; dataBuffer?: string; dataBufferTimeout?: NodeJS.Timeout; } @@ -119,6 +124,7 @@ export interface ProcessManagerEvents { 'session-id': (sessionId: string, agentSessionId: string) => void; 'agent-error': (sessionId: string, error: AgentError) => void; 'thinking-chunk': (sessionId: string, text: string) => void; + 'interjection-ack': (sessionId: string, interjectionId: string) => void; 'tool-execution': (sessionId: string, tool: ToolExecution) => void; 'slash-commands': (sessionId: string, commands: unknown[]) => void; 'query-complete': (sessionId: string, data: QueryCompleteData) => void; diff 
--git a/src/renderer/components/InputArea.tsx b/src/renderer/components/InputArea.tsx index 041d87325a..ffbd479615 100644 --- a/src/renderer/components/InputArea.tsx +++ b/src/renderer/components/InputArea.tsx @@ -278,7 +278,7 @@ export const InputArea = React.memo(function InputArea(props: InputAreaProps) { }, [isResumingSession, hasCapability]); // PERF: Memoize mode-related derived state - const { isReadOnlyMode, showQueueingBorder } = useMemo(() => { + const { isReadOnlyMode, showQueueingBorder, showBusyBorder } = useMemo(() => { // Check if we're in read-only mode (manual toggle only - Claude will be in plan mode) // NOTE: Auto Run no longer forces read-only mode. Instead: // - Yellow border shows during Auto Run to indicate queuing will happen for write messages @@ -289,11 +289,15 @@ export const InputArea = React.memo(function InputArea(props: InputAreaProps) { // Check if Auto Run is active - used for yellow border indication (queuing will happen for write messages) const autoRunActive = isAutoModeActive && session.inputMode === 'ai'; // Show yellow border when: read-only mode is on OR Auto Run is active (both indicate special input handling) + // Show subtle busy border when the active tab is busy in AI mode (interjection hint) + const busyInAI = + activeTab?.state === 'busy' && session.inputMode === 'ai' && !readOnly && !autoRunActive; return { isReadOnlyMode: readOnly, showQueueingBorder: readOnly || autoRunActive, + showBusyBorder: busyInAI, }; - }, [tabReadOnlyMode, isAutoModeActive, session.inputMode]); + }, [tabReadOnlyMode, isAutoModeActive, session.inputMode, activeTab?.state]); // Filter slash commands based on input and current mode const isTerminalMode = session.inputMode === 'terminal'; @@ -844,10 +848,16 @@ export const InputArea = React.memo(function InputArea(props: InputAreaProps) {
@@ -867,7 +877,11 @@ export const InputArea = React.memo(function InputArea(props: InputAreaProps) { placeholder={ isTerminalMode ? 'Run shell command...' - : `Talking to ${session.name} powered by ${getProviderDisplayName(session.toolType)}` + : activeTab?.state === 'busy' + ? hasCapability('supportsMidTurnInput') + ? 'Send a message to the active agent...' + : 'Interrupt agent with a follow-up...' + : `Talking to ${session.name} powered by ${getProviderDisplayName(session.toolType)}` } value={inputValue} onFocus={onInputFocus} diff --git a/src/renderer/components/QueuedItemsList.tsx b/src/renderer/components/QueuedItemsList.tsx index a1f83d52f3..ad89e894df 100644 --- a/src/renderer/components/QueuedItemsList.tsx +++ b/src/renderer/components/QueuedItemsList.tsx @@ -133,7 +133,8 @@ export const QueuedItemsList = memo( {/* Queued items */} {filteredQueue.map((item, index) => { - const displayText = item.type === 'command' ? (item.command ?? '') : (item.text ?? ''); + const displayText = + item.type === 'command' ? (item.command ?? '') : (item.displayText ?? item.text ?? ''); const isLongMessage = displayText.length > 200; const isQueuedExpanded = expandedQueuedMessages.has(item.id); const isDragging = dragIndex === index; diff --git a/src/renderer/components/TerminalOutput.tsx b/src/renderer/components/TerminalOutput.tsx index 327143c945..b86d11729a 100644 --- a/src/renderer/components/TerminalOutput.tsx +++ b/src/renderer/components/TerminalOutput.tsx @@ -402,6 +402,21 @@ const LogItemComponent = memo( : htmlContent; const isUserMessage = log.source === 'user'; + const isInterjection = isUserMessage && log.interjection === true; + const interjectionStatus = isInterjection + ? log.deliveryFailed + ? 'failed' + : log.delivered + ? 'interjection' + : 'queued' + : null; + const interjectionStatusColor = isInterjection + ? log.deliveryFailed + ? theme.colors.error + : log.delivered + ? 
theme.colors.warning + : theme.colors.textDim + : undefined; const isReversed = isUserMessage ? userMessageAlignment === 'left' : userMessageAlignment === 'right'; @@ -442,20 +457,24 @@ const LogItemComponent = memo( className={`flex-1 min-w-0 p-4 pb-10 rounded-xl border ${isReversed ? 'rounded-tr-none' : 'rounded-tl-none'} relative overflow-hidden`} style={{ backgroundColor: isUserMessage - ? isAIMode - ? `color-mix(in srgb, ${theme.colors.accent} 20%, ${theme.colors.bgSidebar})` - : `color-mix(in srgb, ${theme.colors.accent} 15%, ${theme.colors.bgActivity})` + ? isInterjection + ? `color-mix(in srgb, ${theme.colors.warning} 15%, ${theme.colors.bgSidebar})` + : isAIMode + ? `color-mix(in srgb, ${theme.colors.accent} 20%, ${theme.colors.bgSidebar})` + : `color-mix(in srgb, ${theme.colors.accent} 15%, ${theme.colors.bgActivity})` : log.source === 'stderr' || log.source === 'error' ? `color-mix(in srgb, ${theme.colors.error} 8%, ${theme.colors.bgActivity})` : isAIMode ? theme.colors.bgActivity : 'transparent', borderColor: - isUserMessage && isAIMode - ? theme.colors.accent + '40' - : log.source === 'stderr' || log.source === 'error' - ? theme.colors.error - : theme.colors.border, + isUserMessage && isInterjection + ? theme.colors.warning + '40' + : isUserMessage && isAIMode + ? theme.colors.accent + '40' + : log.source === 'stderr' || log.source === 'error' + ? theme.colors.error + : theme.colors.border, }} > {/* Local filter icon for system output only */} @@ -497,6 +516,17 @@ const LogItemComponent = memo( ))}
)} + {isInterjection && ( + + {interjectionStatus} + + )} {log.source === 'stderr' && (
Promise<{ pid: number; success: boolean }>; write: (sessionId: string, data: string) => Promise; + writeInterjection: ( + sessionId: string, + text: string, + interjectionId?: string, + images?: string[] + ) => Promise; interrupt: (sessionId: string) => Promise; kill: (sessionId: string) => Promise; resize: (sessionId: string, cols: number, rows: number) => Promise; @@ -289,6 +295,7 @@ interface MaestroAPI { workingDirOverride?: string; }; }) => Promise<{ exitCode: number }>; + hasResultEmitted: (sessionId: string) => Promise; getActiveProcesses: () => Promise< Array<{ sessionId: string; @@ -312,6 +319,9 @@ interface MaestroAPI { onSessionId: (callback: (sessionId: string, agentSessionId: string) => void) => () => void; onSlashCommands: (callback: (sessionId: string, slashCommands: string[]) => void) => () => void; onThinkingChunk: (callback: (sessionId: string, content: string) => void) => () => void; + onInterjectionAck: ( + callback: (sessionId: string, interjectionId: string) => void + ) => () => void; onToolExecution: ( callback: ( sessionId: string, diff --git a/src/renderer/hooks/agent/useAgentCapabilities.ts b/src/renderer/hooks/agent/useAgentCapabilities.ts index 4beb6d23a3..58125f2246 100644 --- a/src/renderer/hooks/agent/useAgentCapabilities.ts +++ b/src/renderer/hooks/agent/useAgentCapabilities.ts @@ -60,6 +60,9 @@ export interface AgentCapabilities { /** Agent supports --input-format stream-json for image input via stdin */ supportsStreamJsonInput: boolean; + /** Agent can receive additional user messages via stdin while processing a turn */ + supportsMidTurnInput: boolean; + /** Agent emits streaming thinking/reasoning content that can be displayed */ supportsThinkingDisplay: boolean; @@ -106,6 +109,7 @@ export const DEFAULT_CAPABILITIES: AgentCapabilities = { supportsResultMessages: false, supportsModelSelection: false, supportsStreamJsonInput: false, + supportsMidTurnInput: false, supportsThinkingDisplay: false, supportsContextMerge: false, 
supportsContextExport: false, diff --git a/src/renderer/hooks/agent/useAgentListeners.ts b/src/renderer/hooks/agent/useAgentListeners.ts index 1bd66c73fa..c74abea428 100644 --- a/src/renderer/hooks/agent/useAgentListeners.ts +++ b/src/renderer/hooks/agent/useAgentListeners.ts @@ -383,13 +383,19 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { if (isFromAi) { const currentSession = getSessions().find((s) => s.id === actualSessionId); if (currentSession) { + // Find the first queue item that is NOT a pending interjection. + // Pending interjections were already written to stdin and are + // waiting for interjection-ack — they must not be spawned again. + const nextSpawnableItem = currentSession.executionQueue.find( + (q) => !q.pendingInterjection + ); if ( - currentSession.executionQueue.length > 0 && + nextSpawnableItem && !(currentSession.state === 'error' && currentSession.agentError) ) { queuedItemToProcess = { sessionId: actualSessionId, - item: currentSession.executionQueue[0], + item: nextSpawnableItem, }; } @@ -541,17 +547,50 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { }; } - if (s.executionQueue.length > 0) { - const [nextItem, ...remainingQueue] = s.executionQueue; + // Separate pending interjections (already sent to stdin) from + // spawnable items. Orphaned interjections (process exited before + // ack) are moved to logs as failed delivery. 
+ const orphanedInterjections = s.executionQueue.filter((q) => q.pendingInterjection); + const spawnableQueue = s.executionQueue.filter((q) => !q.pendingInterjection); + + // Move orphaned interjections to logs as failed + let tabsAfterOrphans = s.aiTabs; + if (orphanedInterjections.length > 0) { + tabsAfterOrphans = tabsAfterOrphans.map((tab) => { + const orphansForTab = orphanedInterjections.filter((q) => q.tabId === tab.id); + if (orphansForTab.length === 0) return tab; + return { + ...tab, + logs: [ + ...tab.logs, + ...orphansForTab.map((q) => ({ + id: q.id, + timestamp: q.timestamp, + source: 'user' as const, + text: q.displayText || q.text || '', + images: q.images, + interjection: true, + delivered: false, + deliveryFailed: true, + })), + ], + }; + }); + } + + if (spawnableQueue.length > 0) { + const [nextItem, ...remainingQueue] = spawnableQueue; const targetTab = - s.aiTabs.find((tab) => tab.id === nextItem.tabId) || getActiveTab(s); + tabsAfterOrphans.find((tab) => tab.id === nextItem.tabId) || + getActiveTab({ ...s, aiTabs: tabsAfterOrphans }); if (!targetTab) { return { ...s, state: 'busy' as SessionState, busySource: 'ai', + aiTabs: tabsAfterOrphans, executionQueue: remainingQueue, thinkingStartTime: Date.now(), currentCycleTokens: 0, @@ -559,7 +598,7 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { }; } - let updatedAiTabs = s.aiTabs.map((tab) => { + let updatedAiTabs = tabsAfterOrphans.map((tab) => { if (tab.id === targetTab.id) { return { ...tab, @@ -576,12 +615,12 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { return tab; }); - if (nextItem.type === 'message' && nextItem.text) { + if (nextItem.type === 'message' && nextItem.text && !nextItem.interjectionLogId) { const logEntry: LogEntry = { id: generateId(), timestamp: Date.now(), source: 'user', - text: nextItem.text, + text: nextItem.displayText || nextItem.text, images: nextItem.images, }; updatedAiTabs = updatedAiTabs.map((tab) => @@ -607,8 
+646,8 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { } const updatedAiTabs = - s.aiTabs?.length > 0 - ? s.aiTabs.map((tab) => { + tabsAfterOrphans?.length > 0 + ? tabsAfterOrphans.map((tab) => { if (tabIdFromSession) { return tab.id === tabIdFromSession ? { @@ -627,7 +666,7 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { : tab; } }) - : s.aiTabs; + : tabsAfterOrphans; const anyTabStillBusy = updatedAiTabs.some((tab) => tab.state === 'busy'); const newState = @@ -659,6 +698,8 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { thinkingStartTime: anyTabStillBusy ? s.thinkingStartTime : undefined, pendingAICommandForSynopsis: undefined, aiTabs: updatedAiTabs, + // Clear orphaned pending interjections from queue + executionQueue: spawnableQueue, }; } @@ -1552,6 +1593,77 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { } ); + // ================================================================ + // onInterjectionAck — CLI consumed a mid-turn interjection + // ================================================================ + const unsubscribeInterjectionAck = window.maestro.process.onInterjectionAck?.( + (sessionId: string, interjectionId: string) => { + const aiTabMatch = sessionId.match(/^(.+)-ai-(.+)$/); + if (!aiTabMatch) return; + + const actualSessionId = aiTabMatch[1]; + const tabId = aiTabMatch[2]; + + setSessions((prev) => + prev.map((s) => { + if (s.id !== actualSessionId) return s; + + // Find the queued interjection to move to chat history + const queuedItem = s.executionQueue.find((q) => q.id === interjectionId); + + if (queuedItem) { + // Move from queue to tab.logs as a delivered user message. + // This is the primary path for native stdin interjections + // (Claude Code, Factory Droid). + // Use the queued item's tabId as the authoritative target + // (parsed tabId from sessionId should match, but this is safer). 
+ const targetTabId = queuedItem.tabId || tabId; + return { + ...s, + executionQueue: s.executionQueue.filter((q) => q.id !== interjectionId), + aiTabs: s.aiTabs.map((tab) => { + if (tab.id !== targetTabId) return tab; + return { + ...tab, + logs: [ + ...tab.logs, + { + id: interjectionId, + timestamp: queuedItem.timestamp, + source: 'user' as const, + text: queuedItem.displayText || queuedItem.text || '', + images: queuedItem.images, + interjection: true, + delivered: true, + deliveryFailed: false, + }, + ], + }; + }), + }; + } + + // Fallback: entry already in logs (interrupt-and-resume path), + // just mark it as delivered + return { + ...s, + aiTabs: s.aiTabs.map((tab) => { + if (tab.id !== tabId) return tab; + return { + ...tab, + logs: tab.logs.map((log) => + log.id === interjectionId + ? { ...log, delivered: true, deliveryFailed: false } + : log + ), + }; + }), + }; + }) + ); + } + ); + // ================================================================ // Cleanup — unsubscribe all listeners on unmount // ================================================================ @@ -1567,6 +1679,7 @@ export function useAgentListeners(deps: UseAgentListenersDeps): void { unsubscribeThinkingChunk?.(); unsubscribeSshRemote?.(); unsubscribeToolExecution?.(); + unsubscribeInterjectionAck?.(); // Cancel any pending thinking chunk RAF and clear buffer if (thinkingChunkRafIdRef.current !== null) { cancelAnimationFrame(thinkingChunkRafIdRef.current); diff --git a/src/renderer/hooks/input/useInputProcessing.ts b/src/renderer/hooks/input/useInputProcessing.ts index 6a8667c3f2..f69eccf98a 100644 --- a/src/renderer/hooks/input/useInputProcessing.ts +++ b/src/renderer/hooks/input/useInputProcessing.ts @@ -15,12 +15,47 @@ import { filterYoloArgs } from '../../utils/agentArgs'; import { hasCapabilityCached } from '../agent/useAgentCapabilities'; import { gitService } from '../../services/git'; import { imageOnlyDefaultPrompt, maestroSystemPrompt } from '../../../prompts'; +import { 
buildContinuationPrompt, buildResumePrompt } from '../../utils/continuationPrompt'; /** * Default prompt used when user sends only an image without text. */ export const DEFAULT_IMAGE_ONLY_PROMPT = imageOnlyDefaultPrompt; +/** + * Max characters of partial output to include in a continuation prompt. + * Prevents enormous prompts when an agent has streamed a lot of output. + * 50k chars is roughly 12-15k tokens, leaving plenty of room in context. + */ +const MAX_CONTINUATION_OUTPUT_CHARS = 50_000; + +/** + * Extract the AI/stdout output from the current turn only. + * Finds the last non-interjection user message and captures everything after it. + * Used by the interrupt-and-continue fallback to build a continuation prompt. + * Capped at MAX_CONTINUATION_OUTPUT_CHARS to prevent oversized prompts. + */ +function captureCurrentTurnOutput(logs: LogEntry[]): string { + let lastUserMsgIndex = -1; + for (let i = logs.length - 1; i >= 0; i--) { + if (logs[i].source === 'user' && !logs[i].interjection) { + lastUserMsgIndex = i; + break; + } + } + const output = logs + .slice(lastUserMsgIndex + 1) + .filter((log) => log.source === 'ai' || log.source === 'stdout') + .map((log) => log.text) + .join(''); + + if (output.length > MAX_CONTINUATION_OUTPUT_CHARS) { + // Keep the tail (most recent output is most relevant for continuation) + return output.slice(-MAX_CONTINUATION_OUTPUT_CHARS); + } + return output; +} + /** * Dependencies for the useInputProcessing hook. 
*/ @@ -382,6 +417,263 @@ export function useInputProcessing(deps: UseInputProcessingDeps): UseInputProces return; } + // Mid-turn interjection: if agent is busy and supports mid-turn input, + // send directly to the running process via stdin (bypass queue) + if (currentMode === 'ai' && activeSession.state === 'busy') { + const activeTab = getActiveTab(activeSession); + if (!activeTab) return; + const processSessionId = `${activeSession.id}-ai-${activeTab.id}`; + const agentSupportsMidTurn = hasCapabilityCached( + activeSession.toolType, + 'supportsMidTurnInput' + ); + + // Race condition guard: if agent already finished, queue as next message instead + const resultEmitted = await window.maestro.process.hasResultEmitted(processSessionId); + if (resultEmitted) { + // Agent already done — no return here, intentionally falls through + // to the normal queue logic below this outer if block + console.log('[processInput] Agent already finished, queueing instead of interjecting'); + } else if (agentSupportsMidTurn && activeTab?.state === 'busy') { + // Capture staged images before clearing + const imagesToSend = stagedImages.length > 0 ? [...stagedImages] : undefined; + + // For image-only interjections (images but no text), use the default prompt + const interjectionText = + !effectiveInputValue.trim() && imagesToSend + ? DEFAULT_IMAGE_ONLY_PROMPT + : effectiveInputValue; + + // Add interjection to executionQueue only. It shows in the queue UI + // while pending. On successful write, it moves to tab.logs as + // delivered. On failure, it moves to tab.logs as failed. 
+ const interjectionEntryId = generateId(); + const queuedInterjection: QueuedItem = { + id: interjectionEntryId, + timestamp: Date.now(), + tabId: activeTab.id, + type: 'message', + text: interjectionText, + displayText: interjectionText, + images: [...stagedImages], + tabName: activeTab.name || 'New', + interjectionLogId: interjectionEntryId, + pendingInterjection: true, // Already sent to stdin — don't spawn on exit + }; + + // Flush any pending batched updates so queue appears in order + if (flushBatchedUpdates) flushBatchedUpdates(); + + setSessions((prev) => + prev.map((s) => { + if (s.id !== activeSessionId) return s; + return { + ...s, + executionQueue: [...s.executionQueue, queuedInterjection], + }; + }) + ); + + // Move interjection from queue to chat history as failed delivery + const markInterjectionFailed = () => { + setSessions((prev) => + prev.map((s) => { + if (s.id !== activeSessionId) return s; + return { + ...s, + executionQueue: s.executionQueue.filter((q) => q.id !== interjectionEntryId), + aiTabs: s.aiTabs.map((tab) => { + if (tab.id !== activeTab.id) return tab; + return { + ...tab, + logs: [ + ...tab.logs, + { + id: interjectionEntryId, + timestamp: Date.now(), + source: 'user' as const, + text: interjectionText, + images: [...stagedImages], + interjection: true, + delivered: false, + deliveryFailed: true, + }, + ], + }; + }), + }; + }) + ); + }; + + // Send interjection via IPC (formats stream-json in main process). + // On successful write, move from queue to logs immediately. + // We don't wait for interjection-ack here because the renderer + // optimistically marks delivery on successful write. The main + // process StdoutHandler tracks the two-result cycle (pre-interjection + // result, then interjection response result) and emits an ack when + // the first result arrives. The ack promotes the queue entry to logs + // in useAgentListeners, but the renderer doesn't gate on it. 
+ window.maestro.process + .writeInterjection( + processSessionId, + interjectionText, + interjectionEntryId, + imagesToSend + ) + .then((success) => { + if (success) { + // Write confirmed — move from queue to chat history + setSessions((prev) => + prev.map((s) => { + if (s.id !== activeSessionId) return s; + return { + ...s, + executionQueue: s.executionQueue.filter((q) => q.id !== interjectionEntryId), + aiTabs: s.aiTabs.map((tab) => { + if (tab.id !== activeTab.id) return tab; + return { + ...tab, + logs: [ + ...tab.logs, + { + id: interjectionEntryId, + timestamp: Date.now(), + source: 'user' as const, + text: interjectionText, + images: [...stagedImages], + interjection: true, + delivered: true, + deliveryFailed: false, + }, + ], + }; + }), + }; + }) + ); + } else { + console.warn('[processInput] Interjection write failed'); + markInterjectionFailed(); + } + }) + .catch((error) => { + console.error('[processInput] Interjection failed:', error); + markInterjectionFailed(); + }); + + // Clear input + setInputValue(''); + setStagedImages([]); + syncAiInputToSession(''); + if (inputRef.current) inputRef.current.style.height = 'auto'; + return; + } else if (activeTab?.state === 'busy') { + // Interrupt-and-continue: agent doesn't support mid-turn stdin, + // so interrupt the current turn and respawn with combined context. + // + // Two sub-paths: + // A) Native resume — agent supports resume and has a session ID. + // Just interrupt and re-spawn with the user's message + session ID. + // The agent loads full conversation history from its own session files. + // B) Continuation prompt fallback — agent has no resume support. + // Capture partial stdout, build hand-crafted continuation prompt. 
+ + const agentSupportsResume = hasCapabilityCached(activeSession.toolType, 'supportsResume'); + const agentSessionId = activeTab.agentSessionId; + const useNativeResume = agentSupportsResume && !!agentSessionId; + + // Build the prompt: + // - Native resume: agent loads full history from session files, + // but needs instruction to continue the interrupted task + // - Fallback: no session files, must include partial output inline + // Always capture partial output so we have a fallback if resume fails + const partialOutput = captureCurrentTurnOutput(activeTab.logs); + const continuationFallback = buildContinuationPrompt(partialOutput, effectiveInputValue); + const promptForQueue = useNativeResume + ? buildResumePrompt(effectiveInputValue) + : continuationFallback; + + // Add the user's interjection log entry + // delivered: false so UI shows "queued" until the agent actually spawns + const interjectionEntryId = generateId(); + const interjectionEntry: LogEntry = { + id: interjectionEntryId, + timestamp: Date.now(), + source: 'user', + text: effectiveInputValue, + images: [...stagedImages], + interjection: true, + delivered: false, + deliveryFailed: false, + }; + + if (flushBatchedUpdates) flushBatchedUpdates(); + + setSessions((prev) => + prev.map((s) => { + if (s.id !== activeSessionId) return s; + return { + ...s, + aiTabs: s.aiTabs.map((tab) => { + if (tab.id !== activeTab.id) return tab; + return { + ...tab, + logs: [...tab.logs, interjectionEntry], + }; + }), + }; + }) + ); + + // Interrupt the running process + window.maestro.process.interrupt(processSessionId).catch((error) => { + console.error('[processInput] Interrupt failed:', error); + }); + + // Queue the continuation at the FRONT of the queue. + // It must run before any older queued work for this session so the + // interrupted turn resumes immediately after exit. 
+ const continuationItem: QueuedItem = { + id: generateId(), + timestamp: Date.now(), + tabId: activeTab.id, + type: 'message', + text: promptForQueue, + // Show user's raw message in queue UI, not the continuation prompt wrapper + displayText: effectiveInputValue, + images: [...stagedImages], + tabName: activeTab.name || 'New', + readOnlyMode: activeTab.readOnlyMode === true, + interjectionLogId: interjectionEntryId, + // If resume spawn fails, retry with continuation prompt (includes partial output) + fallbackText: useNativeResume ? continuationFallback : undefined, + }; + + setSessions((prev) => + prev.map((s) => { + if (s.id !== activeSessionId) return s; + // Replace any existing continuation for this tab to avoid + // stacking stale fallbacks on rapid double-interrupt + const filtered = s.executionQueue.filter( + (q) => !(q.tabId === activeTab.id && q.interjectionLogId) + ); + return { + ...s, + executionQueue: [continuationItem, ...filtered], + }; + }) + ); + + // Clear input + setInputValue(''); + setStagedImages([]); + syncAiInputToSession(''); + if (inputRef.current) inputRef.current.style.height = 'auto'; + return; + } + } + // Queue messages when AI is busy (only in AI mode) // For read-only mode tabs: only queue if THIS TAB is busy (allows parallel execution) // For write mode tabs: queue if ANY tab in session is busy (prevents conflicts) diff --git a/src/renderer/stores/agentStore.ts b/src/renderer/stores/agentStore.ts index b651724264..b07757a3b6 100644 --- a/src/renderer/stores/agentStore.ts +++ b/src/renderer/stores/agentStore.ts @@ -275,14 +275,18 @@ export const useAgentStore = create()((set, get) => ({ } const targetSessionId = `${sessionId}-ai-${targetTab.id}`; + // Use resolved tab ID for delivery tracking (item.tabId may be missing if fallback was used) + const deliveryTabId = item.tabId || targetTab.id; try { // Get agent configuration for this session's tool type const agent = await window.maestro.agents.get(session.toolType); if (!agent) 
throw new Error(`Agent not found for toolType: ${session.toolType}`); - // Get the TARGET TAB's agentSessionId for session continuity - const tabAgentSessionId = targetTab.agentSessionId; + // Get the TARGET TAB's agentSessionId for session continuity. + // skipResume: when a resume attempt failed and we're retrying with + // a continuation prompt, don't pass the session ID (avoids re-triggering resume args). + const tabAgentSessionId = item.skipResume ? undefined : targetTab.agentSessionId; const isReadOnly = item.readOnlyMode || targetTab.readOnlyMode; // Filter out YOLO/skip-permissions flags when read-only mode is active @@ -297,6 +301,13 @@ export const useAgentStore = create()((set, get) => ({ const hasText = item.text && item.text.trim(); const isImageOnlyMessage = item.type === 'message' && hasImages && !hasText; + // Compute stdin transport flags for Windows (applies to both messages and commands) + const { sendPromptViaStdin, sendPromptViaStdinRaw } = getStdinFlags({ + isSshSession: !!session.sshRemoteId || !!session.sessionSshRemoteConfig?.enabled, + supportsStreamJsonInput: agent.capabilities?.supportsStreamJsonInput ?? false, + hasImages: !!hasImages, + }); + if (item.type === 'message' && (hasText || isImageOnlyMessage)) { // Process a message - spawn agent with the message text const effectivePrompt = isImageOnlyMessage ? DEFAULT_IMAGE_ONLY_PROMPT : item.text!; @@ -363,6 +374,30 @@ export const useAgentStore = create()((set, get) => ({ sendPromptViaStdin, sendPromptViaStdinRaw, }); + + // Mark the associated interjection log entry as delivered + // (interrupt-and-resume path: entry shows "queued" until agent spawns) + if (item.interjectionLogId) { + useSessionStore.getState().setSessions((prev) => + prev.map((s) => { + if (s.id !== sessionId) return s; + return { + ...s, + aiTabs: s.aiTabs.map((tab) => { + if (tab.id !== deliveryTabId) return tab; + return { + ...tab, + logs: tab.logs.map((log) => + log.id === item.interjectionLogId + ? 
{ ...log, delivered: true, deliveryFailed: false } + : log + ), + }; + }), + }; + }) + ); + } } else if (item.type === 'command' && item.command) { // Process a slash command - find matching command // Check user-defined commands first, then agent-discovered commands with prompts @@ -466,6 +501,30 @@ export const useAgentStore = create()((set, get) => ({ sendPromptViaStdin: cmdSendViaStdin, sendPromptViaStdinRaw: cmdSendViaStdinRaw, }); + + // Mark the associated interjection log entry as delivered + // (mirrors the message branch above) + if (item.interjectionLogId) { + useSessionStore.getState().setSessions((prev) => + prev.map((s) => { + if (s.id !== sessionId) return s; + return { + ...s, + aiTabs: s.aiTabs.map((tab) => { + if (tab.id !== deliveryTabId) return tab; + return { + ...tab, + logs: tab.logs.map((log) => + log.id === item.interjectionLogId + ? { ...log, delivered: true, deliveryFailed: false } + : log + ), + }; + }), + }; + }) + ); + } } else { // Unknown command - add error log and reset to idle useSessionStore.getState().addLogToTab(sessionId, { @@ -496,7 +555,48 @@ export const useAgentStore = create()((set, get) => ({ } } } catch (error: any) { + // If this was a resume attempt and we have a fallback prompt, retry without + // session resume (falls back to continuation prompt with partial output) + if (item.fallbackText) { + console.warn( + '[processQueuedItem] Resume spawn failed, retrying with continuation fallback:', + error.message + ); + const fallbackItem: QueuedItem = { + ...item, + text: item.fallbackText, + fallbackText: undefined, // Don't retry again + skipResume: true, // Don't re-trigger resume args + }; + await get().processQueuedItem(sessionId, fallbackItem, deps); + return; + } + console.error('[processQueuedItem] Failed to process queued item:', error); + + // Mark associated interjection as failed if spawn errored + if (item.interjectionLogId) { + useSessionStore.getState().setSessions((prev) => + prev.map((s) => { + if (s.id 
!== sessionId) return s; + return { + ...s, + aiTabs: s.aiTabs.map((tab) => { + if (tab.id !== deliveryTabId) return tab; + return { + ...tab, + logs: tab.logs.map((log) => + log.id === item.interjectionLogId + ? { ...log, delivered: false, deliveryFailed: true } + : log + ), + }; + }), + }; + }) + ); + } + const errorLogEntry: LogEntry = { id: generateId(), timestamp: Date.now(), diff --git a/src/renderer/stores/sessionStore.ts b/src/renderer/stores/sessionStore.ts index dff230920d..528ca18a5b 100644 --- a/src/renderer/stores/sessionStore.ts +++ b/src/renderer/stores/sessionStore.ts @@ -281,6 +281,9 @@ export const useSessionStore = create()((set) => ({ text: logEntry.text, ...(logEntry.images && { images: logEntry.images }), ...(logEntry.delivered !== undefined && { delivered: logEntry.delivered }), + ...(logEntry.deliveryFailed !== undefined && { + deliveryFailed: logEntry.deliveryFailed, + }), ...('aiCommand' in logEntry && logEntry.aiCommand && { aiCommand: logEntry.aiCommand }), }; diff --git a/src/renderer/types/index.ts b/src/renderer/types/index.ts index 0555ff6371..1b048a3d42 100644 --- a/src/renderer/types/index.ts +++ b/src/renderer/types/index.ts @@ -190,8 +190,12 @@ export interface LogEntry { }; // For user messages - tracks if message was successfully delivered to the agent delivered?: boolean; + // For mid-turn interjections - tracks write failures before CLI acknowledgment + deliveryFailed?: boolean; // For user messages - tracks if message was sent in read-only mode readOnly?: boolean; + // For user messages — tracks if message was sent as a mid-turn interjection + interjection?: boolean; // For error entries - stores the full AgentError for "View Details" functionality agentError?: AgentError; // For tool execution entries - stores tool state and details @@ -224,6 +228,19 @@ export interface QueuedItem { tabName?: string; // Tab name at time of queuing (for display) // Read-only mode tracking (for parallel execution bypass) readOnlyMode?: 
boolean; // True if queued from a read-only tab + // Delivery tracking for interrupt-and-resume interjections + interjectionLogId?: string; // Log entry ID to mark as delivered when this item spawns + // Display override for queue UI (e.g., show user's raw message instead of continuation prompt) + displayText?: string; + // True for native stdin interjections already written to the process. + // These are NOT waiting to be spawned — they're waiting for interjection-ack. + // The onExit handler must skip these (they're not normal queue items). + pendingInterjection?: boolean; + // Fallback prompt if resume spawn fails (contains partial output context). + // Used by processQueuedItem to retry without session resume. + fallbackText?: string; + // When true, processQueuedItem skips session resume (used on fallback retry). + skipResume?: boolean; } export interface WorkLogItem { @@ -761,6 +778,7 @@ export interface AgentCapabilities { supportsResultMessages: boolean; supportsModelSelection?: boolean; supportsStreamJsonInput?: boolean; + supportsMidTurnInput?: boolean; supportsThinkingDisplay?: boolean; supportsContextMerge?: boolean; supportsContextExport?: boolean; diff --git a/src/renderer/utils/continuationPrompt.ts b/src/renderer/utils/continuationPrompt.ts new file mode 100644 index 0000000000..0e6e329a94 --- /dev/null +++ b/src/renderer/utils/continuationPrompt.ts @@ -0,0 +1,45 @@ +/** + * Build a resume prompt for agents that support native session resume. + * The agent already has full conversation history from its session files, + * so we don't need to include partial output. We just need to tell it + * to continue the interrupted task while incorporating the user's message. + */ +export function buildResumePrompt(userInterjection: string): string { + return [ + '[System context: You were interrupted during your previous response. 
Your full conversation history has been restored via session resume.]',
+    '',
+    "[The user's interjection:]",
+    userInterjection,
+    '',
+    "[Continue from where you left off, incorporating the user's guidance. Complete any tasks that were in progress when interrupted.]",
+  ].join('\n');
+}
+
+/**
+ * Build a continuation prompt that combines partial agent output with a user's
+ * mid-turn interjection. Used when an agent doesn't support native stdin
+ * injection and must be interrupted and restarted.
+ */
+export function buildContinuationPrompt(partialOutput: string, userInterjection: string): string {
+  const trimmedOutput = partialOutput.trim();
+  const parts: string[] = [];
+
+  parts.push('[System context: The user interjected during your previous response.');
+  if (trimmedOutput) {
+    parts.push('Here is what you had produced so far:]');
+    parts.push('');
+    parts.push('<partial_output>');
+    parts.push(trimmedOutput);
+    parts.push('</partial_output>');
+  } else {
+    parts.push('You had not yet produced any output.]');
+  }
+
+  parts.push('');
+  parts.push("[The user's interjection:]");
+  parts.push(userInterjection);
+  parts.push('');
+  parts.push("[Continue from where you left off, incorporating the user's guidance.]");
+
+  return parts.join('\n');
+}