diff --git a/services/cloud-agent-next/src/execution/orchestrator.test.ts b/services/cloud-agent-next/src/execution/orchestrator.test.ts new file mode 100644 index 0000000000..536daa8889 --- /dev/null +++ b/services/cloud-agent-next/src/execution/orchestrator.test.ts @@ -0,0 +1,169 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import type { Env, SandboxInstance } from '../types.js'; +import type { ExecutionPlan } from './types.js'; + +const { ensureWrapperMock, promptMock, resumeMock, initiateMock, getOrCreateSessionMock } = + vi.hoisted(() => ({ + ensureWrapperMock: vi.fn(), + promptMock: vi.fn(), + resumeMock: vi.fn(), + initiateMock: vi.fn(), + getOrCreateSessionMock: vi.fn(), + })); + +vi.mock('../session-service.js', () => ({ + SessionService: class SessionService { + resume = resumeMock; + initiateWithRetry = initiateMock; + getOrCreateSession = getOrCreateSessionMock; + }, +})); + +vi.mock('../kilo/wrapper-client.js', () => ({ + WrapperClient: { + ensureWrapper: ensureWrapperMock, + }, +})); + +vi.mock('./image-prompt-parts.js', () => ({ + buildImagePromptParts: vi.fn(), + downloadImagePromptParts: vi.fn().mockResolvedValue([]), +})); + +import { ExecutionOrchestrator } from './orchestrator.js'; + +const preparedSession = { + session: { exec: vi.fn() }, + context: { + workspacePath: '/workspace/test', + upstreamBranch: 'main', + }, +}; + +const basePlan = { + executionId: 'exc_test', + sessionId: 'agent_test', + userId: 'user_test', + prompt: 'Review this change', + mode: 'code', + workspace: { + shouldPrepare: false, + sandboxId: 'sandbox_test', + resumeContext: { + kiloSessionId: 'kilo_existing', + workspacePath: '/workspace/test', + kilocodeToken: 'kilo_token', + branchName: 'session/agent_test', + }, + }, + wrapper: { + kiloSessionId: 'kilo_existing', + model: { providerID: 'kilocode', modelID: 'test-model' }, + }, +} satisfies ExecutionPlan; + +function createOrchestrator() { + const sandbox = {} as SandboxInstance; + const recordKiloServerActivity = vi.fn().mockResolvedValue(undefined); + + const orchestrator = new ExecutionOrchestrator({ + getSandbox: vi.fn().mockResolvedValue(sandbox), + getSessionStub: vi.fn().mockReturnValue({ recordKiloServerActivity }), + getIngestUrl: vi.fn().mockReturnValue('wss://ingest.example.com/ingest'), + env: {} as Env, + }); + + return orchestrator; +} + +describe('ExecutionOrchestrator tool overrides', () => { + beforeEach(() => { + vi.clearAllMocks(); + promptMock.mockResolvedValue({ messageId: 'msg_test' }); + ensureWrapperMock.mockResolvedValue({ + client: { prompt: promptMock }, + sessionId: 'kilo_existing', + }); + resumeMock.mockResolvedValue(preparedSession); + initiateMock.mockResolvedValue(preparedSession); + getOrCreateSessionMock.mockResolvedValue(preparedSession.session); + }); + + it('disables interactive tools for code-review resume executions', async () => { + const orchestrator = createOrchestrator(); + const plan = { + ...basePlan, + workspace: { + ...basePlan.workspace, + resumeContext: { + ...basePlan.workspace.resumeContext, + createdOnPlatform: 'code-review', + }, + }, + } satisfies ExecutionPlan; + + await orchestrator.execute(plan); + + expect(promptMock).toHaveBeenCalledWith( + expect.objectContaining({ + tools: { + question: false, + plan_enter: false, + plan_exit: false, + }, + }) + ); + }); + + it('does not send tool overrides for non-code-review executions', async () => { + const orchestrator = createOrchestrator(); + + await orchestrator.execute(basePlan); + + expect(promptMock).toHaveBeenCalledWith( + expect.objectContaining({ + tools: undefined, + }) + ); + }); + + it('uses existing metadata to detect code-review fast-path executions', async () => { + const orchestrator = createOrchestrator(); + const plan = { + ...basePlan, + workspace: { + shouldPrepare: true, + sandboxId: 'sandbox_test', + initContext: { + kilocodeToken: 'kilo_token', + isPreparedSession: true, + }, + existingMetadata: { + workspacePath: '/workspace/test', + kiloSessionId: 'kilo_existing', + sandboxId: 'sandbox_test', + sessionHome: '/home/agent_test', + branchName: 'session/agent_test', + createdOnPlatform: 'code-review', + }, + }, + } satisfies ExecutionPlan; + + await orchestrator.execute(plan); + + expect(promptMock).toHaveBeenCalledWith( + expect.objectContaining({ + tools: { + question: false, + plan_enter: false, + plan_exit: false, + }, + }) + ); + expect(getOrCreateSessionMock).toHaveBeenCalledWith( + expect.objectContaining({ + createdOnPlatform: 'code-review', + }) + ); + }); +}); diff --git a/services/cloud-agent-next/src/execution/orchestrator.ts b/services/cloud-agent-next/src/execution/orchestrator.ts index cf25dc93c6..8e3069ddb2 100644 --- a/services/cloud-agent-next/src/execution/orchestrator.ts +++ b/services/cloud-agent-next/src/execution/orchestrator.ts @@ -31,6 +31,12 @@ import { withSandboxInternalServerErrorRecovery } from '../sandbox-recovery.js'; /** Maximum time allowed for workspace preparation (resume, init, fast path). */ const PREPARE_WORKSPACE_TIMEOUT_MS = 10 * 60 * 1000; +const CODE_REVIEW_DISABLED_TOOLS = { + question: false, + plan_enter: false, + plan_exit: false, +} satisfies Record; + function withWorkspacePreparationTimeout(operation: Promise, step: string): Promise { return withTimeout( operation, @@ -229,6 +235,7 @@ export class ExecutionOrchestrator { autoCommit: wrapper.autoCommit, condenseOnComplete: wrapper.condenseOnComplete, execution, + tools: this.getToolOverrides(plan), }; if (fileParts.length > 0) { @@ -341,7 +348,7 @@ export class ExecutionOrchestrator { originalToken: initContext.kilocodeToken, kilocodeModel: initContext.kilocodeModel ?? 'default', originalOrgId: orgId, - createdOnPlatform: initContext.createdOnPlatform, + createdOnPlatform: this.getCreatedOnPlatform(plan), appendSystemPrompt: existingMetadata.appendSystemPrompt, profile: mergeFastPathProfile(initContext.profile, existingMetadata.profile), }), @@ -487,6 +494,12 @@ export class ExecutionOrchestrator { ); } + private getToolOverrides(plan: ExecutionPlan): Record | undefined { + return this.getCreatedOnPlatform(plan) === 'code-review' + ? CODE_REVIEW_DISABLED_TOOLS + : undefined; + } + /** * Get the kilocode token from the plan. */ diff --git a/services/cloud-agent-next/test/unit/wrapper/reconnection.test.ts b/services/cloud-agent-next/test/unit/wrapper/reconnection.test.ts index 2a24101c06..a55279a141 100644 --- a/services/cloud-agent-next/test/unit/wrapper/reconnection.test.ts +++ b/services/cloud-agent-next/test/unit/wrapper/reconnection.test.ts @@ -127,12 +127,14 @@ const createCallbacks = (): ConnectionCallbacks & { onReconnected: ReturnType; onDisconnect: ReturnType; onTerminalError: ReturnType; + onSseEvent: ReturnType; } => ({ onMessageComplete: vi.fn(), onTerminalError: vi.fn(), onCommand: vi.fn(), onDisconnect: vi.fn(), onCompletionSignal: vi.fn(), + onSseEvent: vi.fn(), onReconnecting: vi.fn(), onReconnected: vi.fn(), }); @@ -782,6 +784,7 @@ describe('ingest WS reconnection', () => { const manager = createManagerWithClient(kiloClient); const ws = await openConnection(manager); + callbacks.onSseEvent.mockClear(); await vi.advanceTimersByTimeAsync(0); const questionEvents = parseSentMessages(ws).filter( @@ -791,6 +794,7 @@ describe('ingest WS reconnection', () => { expect(rejectQuestion).toHaveBeenCalledWith('q_123'); expect(callbacks.onDisconnect).not.toHaveBeenCalled(); expect(callbacks.onMessageComplete).not.toHaveBeenCalled(); + expect(callbacks.onSseEvent).toHaveBeenCalledTimes(1); }); it('rejects real-time code-review permissions without disconnecting', async () => { @@ -815,6 +819,7 @@ describe('ingest WS reconnection', () => { const manager = createManagerWithClient(kiloClient); const ws = await openConnection(manager); + callbacks.onSseEvent.mockClear(); await vi.advanceTimersByTimeAsync(0); const permissionEvents = parseSentMessages(ws).filter( @@ -824,6 +829,7 @@ describe('ingest WS reconnection', () => { expect(answerPermission).toHaveBeenCalledWith('p_456', 'reject'); expect(callbacks.onDisconnect).not.toHaveBeenCalled(); expect(callbacks.onMessageComplete).not.toHaveBeenCalled(); + expect(callbacks.onSseEvent).toHaveBeenCalledTimes(1); }); it.each(['question', 'permission'])( @@ -848,6 +854,7 @@ describe('ingest WS reconnection', () => { const manager = createManagerWithClient(kiloClient); const ws = await openConnection(manager); + callbacks.onSseEvent.mockClear(); await vi.advanceTimersByTimeAsync(0); const statusEvents = parseSentMessages(ws).filter(event => { @@ -864,9 +871,36 @@ describe('ingest WS reconnection', () => { expect(statusEvents).toHaveLength(0); expect(callbacks.onDisconnect).not.toHaveBeenCalled(); expect(callbacks.onMessageComplete).not.toHaveBeenCalled(); + expect(callbacks.onSseEvent).toHaveBeenCalledTimes(1); } ); + it('forwards real-time interactive questions for non-code-review jobs', async () => { + const kiloClient = createMockKiloClient({ + sdkClient: { + event: { + subscribe: vi.fn().mockResolvedValue({ + stream: createEventStream([ + { type: 'question.asked', properties: { id: 'q_123', sessionID: 'kilo_sess_456' } }, + ]), + }), + }, + } as unknown as WrapperKiloClient['sdkClient'], + }); + + const manager = createManagerWithClient(kiloClient); + const ws = await openConnection(manager); + callbacks.onSseEvent.mockClear(); + await vi.advanceTimersByTimeAsync(0); + + const questionEvents = parseSentMessages(ws).filter( + event => event.streamEventType === 'kilocode' && event.data.event === 'question.asked' + ); + expect(questionEvents).toHaveLength(1); + expect(kiloClient.rejectQuestion).not.toHaveBeenCalled(); + expect(callbacks.onSseEvent).toHaveBeenCalledTimes(1); + }); + it('forwards payment-style events and reports terminal errors', async () => { const kiloClient = createMockKiloClient({ sdkClient: { diff --git a/services/cloud-agent-next/test/unit/wrapper/snapshot.test.ts b/services/cloud-agent-next/test/unit/wrapper/snapshot.test.ts index 342415469a..1032e978d0 100644 --- a/services/cloud-agent-next/test/unit/wrapper/snapshot.test.ts +++ b/services/cloud-agent-next/test/unit/wrapper/snapshot.test.ts @@ -334,7 +334,7 @@ describe('sendKiloSnapshot → sendKiloState', () => { }); }); - it('replays pending questions for code-review snapshots without rejecting them', async () => { + it('suppresses pending questions for code-review snapshots without rejecting them', async () => { const pendingQuestion = { id: 'q_123', sessionID: 'kilo_sess_456', @@ -359,16 +359,12 @@ describe('sendKiloSnapshot → sendKiloState', () => { m => m.streamEventType === 'kilocode' && m.data.event === 'question.asked' ); - expect(questionEvents).toHaveLength(1); - expect(questionEvents[0].data).toMatchObject({ - event: 'question.asked', - properties: pendingQuestion, - }); + expect(questionEvents).toHaveLength(0); expect(rejectQuestion).not.toHaveBeenCalled(); expect(callbacks.onTerminalError).not.toHaveBeenCalled(); }); - it('replays pending permissions for code-review snapshots without rejecting them', async () => { + it('suppresses pending permissions for code-review snapshots without rejecting them', async () => { const pendingPermission = { id: 'p_456', sessionID: 'kilo_sess_456', @@ -394,16 +390,12 @@ describe('sendKiloSnapshot → sendKiloState', () => { m => m.streamEventType === 'kilocode' && m.data.event === 'permission.asked' ); - expect(permissionEvents).toHaveLength(1); - expect(permissionEvents[0].data).toMatchObject({ - event: 'permission.asked', - properties: pendingPermission, - }); + expect(permissionEvents).toHaveLength(0); expect(answerPermission).not.toHaveBeenCalled(); expect(callbacks.onTerminalError).not.toHaveBeenCalled(); }); - it('replays code-review question status snapshots as session.status events', async () => { + it('suppresses code-review question status snapshots', async () => { const kiloClient = createMockKiloClient({ getSessionStatuses: vi.fn().mockResolvedValue({ kilo_sess_456: { type: 'question' }, @@ -419,16 +411,11 @@ describe('sendKiloSnapshot → sendKiloState', () => { m => m.streamEventType === 'kilocode' && m.data.event === 'session.status' ); - expect(statusEvents).toHaveLength(1); - expect(statusEvents[0].data).toMatchObject({ - event: 'session.status', - sessionID: 'kilo_sess_456', - status: { type: 'question' }, - }); + expect(statusEvents).toHaveLength(0); expect(callbacks.onTerminalError).not.toHaveBeenCalled(); }); - it('replays code-review permission status snapshots as session.status events', async () => { + it('suppresses code-review permission status snapshots', async () => { const kiloClient = createMockKiloClient({ getSessionStatuses: vi.fn().mockResolvedValue({ kilo_sess_456: { type: 'permission' }, @@ -444,12 +431,7 @@ describe('sendKiloSnapshot → sendKiloState', () => { m => m.streamEventType === 'kilocode' && m.data.event === 'session.status' ); - expect(statusEvents).toHaveLength(1); - expect(statusEvents[0].data).toMatchObject({ - event: 'session.status', - sessionID: 'kilo_sess_456', - status: { type: 'permission' }, - }); + expect(statusEvents).toHaveLength(0); expect(callbacks.onTerminalError).not.toHaveBeenCalled(); }); diff --git a/services/cloud-agent-next/wrapper/src/connection.ts b/services/cloud-agent-next/wrapper/src/connection.ts index 2942724977..7c53ac2b79 100644 --- a/services/cloud-agent-next/wrapper/src/connection.ts +++ b/services/cloud-agent-next/wrapper/src/connection.ts @@ -28,6 +28,10 @@ function statusTypeFromProperties(properties: Record): string | return isRecord(status) && typeof status.type === 'string' ? status.type : undefined; } +function isInteractiveStatusType(statusType: string | undefined): boolean { + return statusType === 'question' || statusType === 'permission'; +} + function rejectCodeReviewQuestion( questionId: string | undefined, kiloClient: WrapperKiloClient @@ -226,23 +230,27 @@ export function createConnectionManager( const pendingQuestion = questions.find(q => q.sessionID === kiloSessionId); const pendingPermission = permissions.find(p => p.sessionID === kiloSessionId); + const codeReviewJob = isCodeReviewJob(state); + const skipStatusForCodeReview = codeReviewJob && isInteractiveStatusType(sessionStatus.type); // Send session status as a regular kilocode event - const statusProperties = { sessionID: kiloSessionId, status: sessionStatus }; - sendToIngest({ - streamEventType: 'kilocode', - data: { - ...statusProperties, - event: 'session.status', - type: 'session.status', - properties: statusProperties, - }, - timestamp: new Date().toISOString(), - }); + if (!skipStatusForCodeReview) { + const statusProperties = { sessionID: kiloSessionId, status: sessionStatus }; + sendToIngest({ + streamEventType: 'kilocode', + data: { + ...statusProperties, + event: 'session.status', + type: 'session.status', + properties: statusProperties, + }, + timestamp: new Date().toISOString(), + }); + } // Replay pending questions/permissions as regular events // (same format as real-time delivery — matches CLI behavior) - if (pendingQuestion) { + if (pendingQuestion && !codeReviewJob) { sendToIngest({ streamEventType: 'kilocode', data: { @@ -253,7 +261,7 @@ export function createConnectionManager( timestamp: new Date().toISOString(), }); } - if (pendingPermission) { + if (pendingPermission && !codeReviewJob) { sendToIngest({ streamEventType: 'kilocode', data: { @@ -266,7 +274,7 @@ export function createConnectionManager( } logToFile( - `kilo state sent: status=${sessionStatus.type}, question=${pendingQuestion?.id ?? 'none'}, permission=${pendingPermission?.id ?? 'none'}` + `kilo state sent: status=${sessionStatus.type}${skipStatusForCodeReview ? ' (suppressed)' : ''}, question=${pendingQuestion?.id ?? 'none'}${codeReviewJob && pendingQuestion ? ' (suppressed)' : ''}, permission=${pendingPermission?.id ?? 'none'}${codeReviewJob && pendingPermission ? ' (suppressed)' : ''}` ); } catch (err) { logToFile( @@ -516,15 +524,7 @@ export function createConnectionManager( if ( eventType === 'session.status' && - statusTypeFromProperties(properties) === 'question' - ) { - callbacks.onSseEvent?.(); - continue; - } - - if ( - eventType === 'session.status' && - statusTypeFromProperties(properties) === 'permission' + isInteractiveStatusType(statusTypeFromProperties(properties)) ) { callbacks.onSseEvent?.(); continue;