diff --git a/services/code-review-infra/src/code-review-orchestrator.ts b/services/code-review-infra/src/code-review-orchestrator.ts index 80d626fd44..29bf60ee51 100644 --- a/services/code-review-infra/src/code-review-orchestrator.ts +++ b/services/code-review-infra/src/code-review-orchestrator.ts @@ -26,7 +26,8 @@ import type { } from './types'; import { InternalStatusResponseSchema } from './types'; -type UpdateStatusResult = 'updated' | 'db-terminal'; +type UpdateStatusResult = 'updated' | 'db-terminal' | 'db-not-found'; +type TerminalCodeReviewStatus = Extract; function canContinueCloudAgentNextSession(health: CloudAgentSessionHealthOutput): boolean { return ( @@ -100,10 +101,6 @@ function parseJsonBody(body: string): unknown { } } -function isTerminalStatus(status: CodeReviewStatus): boolean { - return status === 'completed' || status === 'failed' || status === 'cancelled'; -} - function isCloudAgentNextSandboxInternalServerError(error: unknown): boolean { if (error instanceof CloudAgentNextBillingError) { return false; @@ -163,6 +160,10 @@ export class CodeReviewOrchestrator extends DurableObject { /** Fallback alarm for queued reviews accepted by the Worker but not run via waitUntil. */ private static readonly RUN_REVIEW_FALLBACK_DELAY_MS = 30_000; + private static readonly QUEUED_REVIEW_TIMEOUT_MS = 10 * 60 * 1000; + private static readonly RUNNING_REVIEW_TIMEOUT_MS = 90 * 60 * 1000; + private static readonly ALARM_RETRY_DELAY_MS = 5 * 60 * 1000; + /** Batch size for event persistence (save every N events to reduce CPU usage) */ private static readonly EVENT_BATCH_SIZE = 10; @@ -187,6 +188,86 @@ export class CodeReviewOrchestrator extends DurableObject { return this.cloudAgentNextClient; } + private static isTerminalStatus(status: CodeReviewStatus): status is TerminalCodeReviewStatus { + return status === 'completed' || status === 'failed' || status === 'cancelled'; + } + + private static timestampMs(value: string | undefined): number | undefined { + if (!value) { + return undefined; + } + + const timestamp = Date.parse(value); + return Number.isFinite(timestamp) ? timestamp : undefined; + } + + private async deleteStorage(): Promise { + await this.ctx.storage.deleteAlarm(); + await this.ctx.storage.deleteAll(); + Reflect.deleteProperty(this, 'state'); + } + + private runningWatchdogDeadlineMs(now = Date.now()): number { + const baseTimestamp = + CodeReviewOrchestrator.timestampMs(this.state.startedAt) ?? + CodeReviewOrchestrator.timestampMs(this.state.updatedAt) ?? + now; + + return baseTimestamp + CodeReviewOrchestrator.RUNNING_REVIEW_TIMEOUT_MS; + } + + private async scheduleRunningWatchdog( + options: { force?: boolean; now?: number } = {} + ): Promise { + const now = options.now ?? Date.now(); + const deadline = this.runningWatchdogDeadlineMs(now); + const alarmAt = Math.max(deadline, now + 1_000); + + if (options.force) { + await this.ctx.storage.setAlarm(alarmAt); + return deadline; + } + + const currentAlarm = await this.ctx.storage.getAlarm(); + if (currentAlarm === null || currentAlarm <= now) { + await this.ctx.storage.setAlarm(alarmAt); + } + + return deadline; + } + + private async scheduleTerminalCleanup(now = Date.now()): Promise { + const cleanupAt = now + CodeReviewOrchestrator.CLEANUP_DELAY_MS; + await this.ctx.storage.setAlarm(cleanupAt); + return cleanupAt; + } + + private async scheduleAlarmRetry(): Promise { + const now = Date.now(); + const retryAt = now + CodeReviewOrchestrator.ALARM_RETRY_DELAY_MS; + const currentAlarm = await this.ctx.storage.getAlarm(); + + if (currentAlarm !== null && currentAlarm > now && currentAlarm <= retryAt) { + return; + } + + await this.ctx.storage.setAlarm(retryAt); + } + + private terminalStatusOptions(): { + sessionId?: string; + cliSessionId?: string; + errorMessage?: string; + terminalReason?: CloudAgentTerminalReason; + } { + return { + sessionId: this.state.sessionId, + cliSessionId: this.state.cliSessionId, + errorMessage: this.state.errorMessage, + terminalReason: this.state.terminalReason, + }; + } + private async tryRetryFreshSessionAfterSandboxError( source: string, error: unknown @@ -194,7 +275,7 @@ export class CodeReviewOrchestrator extends DurableObject { if ( this.state.sandboxRetryAttempted === true || this.cancelled || - isTerminalStatus(this.state.status) + CodeReviewOrchestrator.isTerminalStatus(this.state.status) ) { return false; } @@ -250,34 +331,40 @@ export class CodeReviewOrchestrator extends DurableObject { */ async alarm(): Promise { try { - await this.loadState(); + const stateLoaded = await this.loadState(); + + if (!stateLoaded) { + console.log('[CodeReviewOrchestrator] Alarm fired but no state found, deleting storage'); + await this.deleteStorage(); + return; + } - // Guard against missing state (already cleaned up or never initialized) - if (!this.state) { - console.log('[CodeReviewOrchestrator] Alarm fired but no state found, skipping'); + if (this.state.status === 'queued') { + await this.handleQueuedAlarm(); return; } - if ( - this.state.status === 'completed' || - this.state.status === 'failed' || - this.state.status === 'cancelled' - ) { - // Cleanup: Delete all DO storage after 7 days - console.log('[CodeReviewOrchestrator] Cleaning up completed review', { + if (this.state.status === 'running') { + await this.handleRunningAlarm(); + return; + } + + if (CodeReviewOrchestrator.isTerminalStatus(this.state.status)) { + console.log('[CodeReviewOrchestrator] Cleaning up terminal review', { reviewId: this.state.reviewId, status: this.state.status, }); - await this.ctx.storage.deleteAll(); - } else if (this.state.status === 'queued') { - console.log('[CodeReviewOrchestrator] Fallback alarm starting queued review', { - reviewId: this.state.reviewId, - }); - await this.runReview(); - } else if (this.state.status === 'running') { - console.log('[CodeReviewOrchestrator] Fallback alarm no-op for running review', { - reviewId: this.state.reviewId, - }); + const dbUpdateResult = await this.updateDBStatus( + this.state.status, + this.terminalStatusOptions() + ); + if (dbUpdateResult === 'db-not-found') { + console.warn('[CodeReviewOrchestrator] Deleting terminal review storage after DB 404', { + reviewId: this.state.reviewId, + status: this.state.status, + }); + } + await this.deleteStorage(); } else { // Unexpected state - log for debugging console.warn('[CodeReviewOrchestrator] Alarm fired for non-terminal state', { @@ -292,13 +379,100 @@ export class CodeReviewOrchestrator extends DurableObject { errorType: (error as Error)?.constructor?.name, errorMessage: error instanceof Error ? error.message : String(error), }); + + try { + const storedState = await this.ctx.storage.get('state'); + if (storedState) { + this.state = storedState; + await this.scheduleAlarmRetry(); + console.warn('[CodeReviewOrchestrator] Scheduled alarm retry after handler failure', { + reviewId: storedState.reviewId, + status: storedState.status, + retryInMs: CodeReviewOrchestrator.ALARM_RETRY_DELAY_MS, + }); + } + } catch (retryError) { + console.error('[CodeReviewOrchestrator] Failed to schedule alarm retry:', { + reviewId: this.state?.reviewId, + status: this.state?.status, + errorType: (retryError as Error)?.constructor?.name, + errorMessage: retryError instanceof Error ? retryError.message : String(retryError), + }); + } + } + } + + private async handleQueuedAlarm(): Promise { + const now = Date.now(); + const updatedAt = CodeReviewOrchestrator.timestampMs(this.state.updatedAt); + if ( + updatedAt === undefined || + now - updatedAt > CodeReviewOrchestrator.QUEUED_REVIEW_TIMEOUT_MS + ) { + console.warn('[CodeReviewOrchestrator] Queued review timed out', { + reviewId: this.state.reviewId, + updatedAt: this.state.updatedAt, + }); + await this.updateStatus('failed', { + errorMessage: 'Review timed out waiting to start', + terminalReason: 'timeout', + }); + return; + } + + console.log('[CodeReviewOrchestrator] Fallback alarm starting queued review', { + reviewId: this.state.reviewId, + }); + await this.runReview(); + + if (this.state.status === 'queued') { + console.warn('[CodeReviewOrchestrator] Queued review still queued after fallback, retrying', { + reviewId: this.state.reviewId, + }); + await this.scheduleAlarmRetry(); + } + } + + private async handleRunningAlarm(): Promise { + const now = Date.now(); + const watchdogDeadline = this.runningWatchdogDeadlineMs(now); + const dbSyncResult = await this.updateStatus('running'); + + if ( + dbSyncResult === 'db-terminal' || + dbSyncResult === 'db-not-found' || + CodeReviewOrchestrator.isTerminalStatus(this.state.status) + ) { + if (dbSyncResult === 'db-not-found') { + await this.deleteStorage(); + } + return; } + + if (now >= watchdogDeadline) { + console.warn('[CodeReviewOrchestrator] Running review watchdog timed out', { + reviewId: this.state.reviewId, + startedAt: this.state.startedAt, + updatedAt: this.state.updatedAt, + }); + await this.updateStatus('failed', { + errorMessage: 'Review timed out waiting for cloud agent callback', + terminalReason: 'timeout', + }); + return; + } + + await this.ctx.storage.setAlarm(watchdogDeadline); + console.log('[CodeReviewOrchestrator] Re-armed running review watchdog', { + reviewId: this.state.reviewId, + watchdogAt: new Date(watchdogDeadline).toISOString(), + }); } /** * Load state from durable storage. */ - private async loadState(): Promise { + private async loadState(): Promise { const storedState = await this.ctx.storage.get('state'); if (storedState) { @@ -315,7 +489,11 @@ export class CodeReviewOrchestrator extends DurableObject { if (storedState.totalTokensIn != null) this.totalTokensIn = storedState.totalTokensIn; if (storedState.totalTokensOut != null) this.totalTokensOut = storedState.totalTokensOut; if (storedState.totalCost != null) this.totalCost = storedState.totalCost; + return true; } + + Reflect.deleteProperty(this, 'state'); + return false; } /** @@ -339,6 +517,7 @@ export class CodeReviewOrchestrator extends DurableObject { ): Promise { // Check if there are any actual changes to process const statusChanged = this.state.status !== status; + const previousStartedAt = this.state.startedAt; const sessionIdChanged = options !== undefined && 'sessionId' in options && options.sessionId !== this.state.sessionId; const cliSessionIdChanged = @@ -366,6 +545,8 @@ export class CodeReviewOrchestrator extends DurableObject { return 'updated'; } + await this.scheduleRunningWatchdog(); + try { return await this.updateDBStatus(status, options); } catch (error) { @@ -374,23 +555,26 @@ export class CodeReviewOrchestrator extends DurableObject { } } + const now = Date.now(); + const nowIso = new Date(now).toISOString(); + // Update status if it changed if (statusChanged) { this.state.status = status; // Update timestamps based on status if (status === 'running' && !this.state.startedAt) { - this.state.startedAt = new Date().toISOString(); + this.state.startedAt = nowIso; } - if (status === 'completed' || status === 'failed' || status === 'cancelled') { - this.state.completedAt = new Date().toISOString(); + if (CodeReviewOrchestrator.isTerminalStatus(status)) { + this.state.completedAt = nowIso; // Clear events immediately - no longer needed after completion this.state.events = []; // Schedule cleanup alarm for 7 days from now - await this.ctx.storage.setAlarm(Date.now() + CodeReviewOrchestrator.CLEANUP_DELAY_MS); + await this.scheduleTerminalCleanup(now); console.log('[CodeReviewOrchestrator] Scheduled cleanup alarm', { reviewId: this.state.reviewId, @@ -424,23 +608,28 @@ export class CodeReviewOrchestrator extends DurableObject { this.state.terminalReason = options.terminalReason; } - this.state.updatedAt = new Date().toISOString(); + if (status === 'running') { + await this.scheduleRunningWatchdog({ + force: statusChanged || previousStartedAt !== this.state.startedAt, + now, + }); + } + + this.state.updatedAt = nowIso; await this.saveState(); // Update Next.js DB via internal API try { const dbUpdateResult = await this.updateDBStatus(status, options); - if (dbUpdateResult === 'db-terminal') { - return 'db-terminal'; + if (dbUpdateResult === 'db-terminal' || dbUpdateResult === 'db-not-found') { + return dbUpdateResult; } } catch (error) { console.error('[CodeReviewOrchestrator] Failed to update DB status:', error); // For terminal states (completed/failed/cancelled), DB update MUST succeed // Otherwise frontend will poll forever thinking review is still running and also blocking the slot in the queue - const isTerminalState = - status === 'completed' || status === 'failed' || status === 'cancelled'; - if (isTerminalState) { + if (CodeReviewOrchestrator.isTerminalStatus(status)) { const errorMessage = error instanceof Error ? error.message : String(error); throw new Error(`Critical: Failed to update DB status to '${status}': ${errorMessage}`); } @@ -451,7 +640,7 @@ export class CodeReviewOrchestrator extends DurableObject { } private async setLocalTerminalStateFromDB( - status: Extract, + status: TerminalCodeReviewStatus, terminalReason?: CloudAgentTerminalReason | null ): Promise { this.state.status = status; @@ -461,7 +650,7 @@ export class CodeReviewOrchestrator extends DurableObject { this.state.completedAt = this.state.completedAt ?? new Date().toISOString(); this.state.events = []; this.state.updatedAt = new Date().toISOString(); - await this.ctx.storage.setAlarm(Date.now() + CodeReviewOrchestrator.CLEANUP_DELAY_MS); + await this.scheduleTerminalCleanup(); await this.saveState(); console.log('[CodeReviewOrchestrator] Local state synced to terminal DB status', { reviewId: this.state.reviewId, @@ -504,6 +693,14 @@ export class CodeReviewOrchestrator extends DurableObject { if (!response.ok) { const errorText = await response.text(); + if (response.status === 404) { + console.warn('[CodeReviewOrchestrator] DB review not found during status update', { + reviewId: this.state.reviewId, + status, + error: errorText, + }); + return 'db-not-found'; + } throw new Error(`Failed to update DB status: ${response.status} ${errorText}`); } @@ -933,7 +1130,7 @@ export class CodeReviewOrchestrator extends DurableObject { return; } - if (this.cancelled || isTerminalStatus(this.state.status)) { + if (this.cancelled || CodeReviewOrchestrator.isTerminalStatus(this.state.status)) { return; } @@ -1119,7 +1316,7 @@ export class CodeReviewOrchestrator extends DurableObject { return; } - if (this.cancelled || isTerminalStatus(this.state.status)) { + if (this.cancelled || CodeReviewOrchestrator.isTerminalStatus(this.state.status)) { return; } diff --git a/services/code-review-infra/test/integration/code-review-orchestrator.test.ts b/services/code-review-infra/test/integration/code-review-orchestrator.test.ts index f1f94c2b9b..6f59ac5c74 100644 --- a/services/code-review-infra/test/integration/code-review-orchestrator.test.ts +++ b/services/code-review-infra/test/integration/code-review-orchestrator.test.ts @@ -1,9 +1,14 @@ /* eslint-disable @typescript-eslint/no-base-to-string, @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-return */ import { env, runDurableObjectAlarm, runInDurableObject, SELF } from 'cloudflare:test'; -import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { describe, it, expect, vi, beforeEach, afterEach, afterAll } from 'vitest'; import type { CodeReviewOrchestrator } from '../../src/code-review-orchestrator'; import type { CodeReview, SessionInput } from '../../src/types'; +const QUEUED_REVIEW_TIMEOUT_MS = 10 * 60 * 1000; +const RUNNING_REVIEW_TIMEOUT_MS = 90 * 60 * 1000; +const ALARM_RETRY_DELAY_MS = 5 * 60 * 1000; +const CLEANUP_DELAY_MS = 7 * 24 * 60 * 60 * 1000; + function getReviewStub(name = `review-${crypto.randomUUID()}`) { const id = env.CODE_REVIEW_ORCHESTRATOR.idFromName(name); return env.CODE_REVIEW_ORCHESTRATOR.get(id); @@ -89,6 +94,12 @@ async function storedReview(stub: DurableObjectStub) { ); } +async function storedAlarm(stub: DurableObjectStub) { + return runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => + state.storage.getAlarm() + ); +} + describe('CodeReviewOrchestrator recovery', () => { const originalFetch = globalThis.fetch; @@ -100,6 +111,10 @@ describe('CodeReviewOrchestrator recovery', () => { globalThis.fetch = originalFetch; }); + afterAll(() => { + globalThis.fetch = originalFetch; + }); + it('start arms a fallback alarm for a queued review', async () => { const stub = getReviewStub(); @@ -149,6 +164,7 @@ describe('CodeReviewOrchestrator recovery', () => { it('queued review alarm retries runReview and transitions to running', async () => { const stub = getReviewStub(); + const startedBefore = Date.now(); const fetchMock = vi.fn(async (request: RequestInfo | URL) => { const url = String(request); if (url.includes('/api/internal/code-review-status/')) { @@ -183,6 +199,11 @@ describe('CodeReviewOrchestrator recovery', () => { }); expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(true); expect(hasFetchCall(fetchMock, '/trpc/initiateFromKilocodeSessionV2')).toBe(true); + + const alarm = await storedAlarm(stub); + expect(alarm).toEqual(expect.any(Number)); + expect(alarm).toBeGreaterThanOrEqual(startedBefore + RUNNING_REVIEW_TIMEOUT_MS); + expect(alarm).toBeLessThanOrEqual(Date.now() + RUNNING_REVIEW_TIMEOUT_MS); }); it('retries prepareSession once after a sandbox 500 and initiates the retry session', async () => { @@ -832,10 +853,196 @@ describe('CodeReviewOrchestrator recovery', () => { const status = await stub.status(); expect(status.status).toBe('cancelled'); expect(fetchMock).toHaveBeenCalledTimes(1); + const alarm = await storedAlarm(stub); + expect(alarm).toEqual(expect.any(Number)); + expect(alarm).toBeGreaterThan(Date.now()); + }); + + it('recent running alarm syncs DB status and re-arms watchdog without failing', async () => { + const stub = getReviewStub(); + const startedAt = new Date(Date.now() - 5 * 60 * 1000).toISOString(); + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + return new Response('cloud-agent should not be called', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + status: 'running', + startedAt, + updatedAt: startedAt, + sessionId: 'agent-running-session', + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + await expect(stub.status()).resolves.toMatchObject({ + status: 'running', + sessionId: 'agent-running-session', + }); + expect(fetchCalls(fetchMock, '/api/internal/code-review-status/')).toHaveLength(1); + expect(lastStatusUpdateBody(fetchMock)).toMatchObject({ status: 'running' }); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(false); + await expect(storedAlarm(stub)).resolves.toBe( + Date.parse(startedAt) + RUNNING_REVIEW_TIMEOUT_MS + ); + }); + + it('running alarm syncs local terminal state when DB is already terminal', async () => { + const stub = getReviewStub(); + const startedAt = new Date(Date.now() - 5 * 60 * 1000).toISOString(); + const cleanupStart = Date.now(); + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ + success: true, + message: 'Review already in terminal state', + currentStatus: 'completed', + terminalReason: 'timeout', + }); + } + return new Response('cloud-agent should not be called', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + status: 'running', + startedAt, + updatedAt: startedAt, + events: [{ timestamp: startedAt, eventType: 'test', message: 'stored' }], + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + const stored = await storedReview(stub); + expect(stored).toMatchObject({ + status: 'completed', + terminalReason: 'timeout', + events: [], + }); + expect(stored?.completedAt).toEqual(expect.any(String)); + expect(fetchMock).toHaveBeenCalledTimes(1); + const alarm = await storedAlarm(stub); + expect(alarm).toEqual(expect.any(Number)); + expect(alarm).toBeGreaterThanOrEqual(cleanupStart + CLEANUP_DELAY_MS); + expect(alarm).toBeLessThanOrEqual(Date.now() + CLEANUP_DELAY_MS); + }); + + it('stale running alarm marks review failed and schedules cleanup', async () => { + const stub = getReviewStub(); + const startedAt = new Date(Date.now() - RUNNING_REVIEW_TIMEOUT_MS - 1_000).toISOString(); + const cleanupStart = Date.now(); + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + return new Response('cloud-agent should not be called', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + status: 'running', + startedAt, + updatedAt: startedAt, + sessionId: 'agent-stale-session', + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + await expect(stub.status()).resolves.toMatchObject({ + status: 'failed', + errorMessage: 'Review timed out waiting for cloud agent callback', + terminalReason: 'timeout', + }); + expect(fetchCalls(fetchMock, '/api/internal/code-review-status/')).toHaveLength(2); + expect(lastStatusUpdateBody(fetchMock)).toMatchObject({ + status: 'failed', + terminalReason: 'timeout', + errorMessage: 'Review timed out waiting for cloud agent callback', + }); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(false); + const alarm = await storedAlarm(stub); + expect(alarm).toEqual(expect.any(Number)); + expect(alarm).toBeGreaterThanOrEqual(cleanupStart + CLEANUP_DELAY_MS); + expect(alarm).toBeLessThanOrEqual(Date.now() + CLEANUP_DELAY_MS); }); - it('terminal cleanup alarm still deletes storage', async () => { + it('stale queued alarm marks review failed without calling cloud-agent', async () => { const stub = getReviewStub(); + const queuedAt = new Date(Date.now() - QUEUED_REVIEW_TIMEOUT_MS - 1_000).toISOString(); + const cleanupStart = Date.now(); + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + return new Response('cloud-agent should not be called', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put('state', codeReview({ updatedAt: queuedAt })); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + await expect(stub.status()).resolves.toMatchObject({ + status: 'failed', + errorMessage: 'Review timed out waiting to start', + terminalReason: 'timeout', + }); + expect(fetchCalls(fetchMock, '/api/internal/code-review-status/')).toHaveLength(1); + expect(lastStatusUpdateBody(fetchMock)).toMatchObject({ + status: 'failed', + terminalReason: 'timeout', + errorMessage: 'Review timed out waiting to start', + }); + expect(hasFetchCall(fetchMock, '/trpc/prepareSession')).toBe(false); + expect(hasFetchCall(fetchMock, '/trpc/initiateFromKilocodeSessionV2')).toBe(false); + const alarm = await storedAlarm(stub); + expect(alarm).toEqual(expect.any(Number)); + expect(alarm).toBeGreaterThanOrEqual(cleanupStart + CLEANUP_DELAY_MS); + expect(alarm).toBeLessThanOrEqual(Date.now() + CLEANUP_DELAY_MS); + }); + + it('terminal cleanup deletes storage after DB terminal sync succeeds', async () => { + const stub = getReviewStub(); + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return Response.json({ success: true }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { await state.storage.put( @@ -843,6 +1050,9 @@ describe('CodeReviewOrchestrator recovery', () => { codeReview({ status: 'completed', completedAt: new Date().toISOString(), + sessionId: 'agent-terminal-session', + cliSessionId: 'ses_terminal', + terminalReason: 'timeout', events: [{ timestamp: new Date().toISOString(), eventType: 'test', message: 'stored' }], }) ); @@ -857,5 +1067,88 @@ describe('CodeReviewOrchestrator recovery', () => { async (_instance: CodeReviewOrchestrator, state) => state.storage.get('state') ); expect(stored).toBeUndefined(); + await expect(storedAlarm(stub)).resolves.toBeNull(); + expect(fetchCalls(fetchMock, '/api/internal/code-review-status/')).toHaveLength(1); + expect(lastStatusUpdateBody(fetchMock)).toMatchObject({ + status: 'completed', + sessionId: 'agent-terminal-session', + cliSessionId: 'ses_terminal', + terminalReason: 'timeout', + }); + }); + + it('terminal cleanup retries instead of deleting when DB terminal sync fails transiently', async () => { + const stub = getReviewStub(); + const retryStart = Date.now(); + const fetchMock = vi.fn(async (request: RequestInfo | URL) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + return new Response('temporary status failure', { status: 500 }); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put( + 'state', + codeReview({ + status: 'failed', + completedAt: new Date().toISOString(), + errorMessage: 'temporary local failure', + terminalReason: 'upstream_error', + }) + ); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + await expect(storedReview(stub)).resolves.toMatchObject({ + status: 'failed', + errorMessage: 'temporary local failure', + terminalReason: 'upstream_error', + }); + const alarm = await storedAlarm(stub); + expect(alarm).toEqual(expect.any(Number)); + expect(alarm).toBeGreaterThanOrEqual(retryStart + ALARM_RETRY_DELAY_MS); + expect(alarm).toBeLessThanOrEqual(Date.now() + ALARM_RETRY_DELAY_MS); + }); + + it('alarm handler failure leaves a future retry alarm', async () => { + const stub = getReviewStub(); + const retryStart = Date.now(); + const fetchMock = vi.fn(async (request: RequestInfo | URL, init?: RequestInit) => { + const url = String(request); + if (url.includes('/api/internal/code-review-status/')) { + const body = typeof init?.body === 'string' ? JSON.parse(init.body) : undefined; + if (body?.status === 'failed') { + return new Response('temporary status failure', { status: 500 }); + } + return Response.json({ success: true }); + } + if (url.includes('/trpc/prepareSession')) { + return trpcError(400, 'Branch not found: main', 'BAD_REQUEST'); + } + return new Response('unexpected fetch', { status: 500 }); + }); + globalThis.fetch = fetchMock; + + await runInDurableObject(stub, async (_instance: CodeReviewOrchestrator, state) => { + await state.storage.put('state', codeReview()); + await state.storage.setAlarm(Date.now() + 30_000); + }); + + const ran = await runDurableObjectAlarm(stub); + + expect(ran).toBe(true); + await expect(storedReview(stub)).resolves.toMatchObject({ status: 'failed' }); + expect(fetchCalls(fetchMock, '/trpc/prepareSession')).toHaveLength(1); + expect(lastStatusUpdateBody(fetchMock)).toMatchObject({ status: 'failed' }); + const alarm = await storedAlarm(stub); + expect(alarm).toEqual(expect.any(Number)); + expect(alarm).toBeGreaterThanOrEqual(retryStart + ALARM_RETRY_DELAY_MS); + expect(alarm).toBeLessThanOrEqual(Date.now() + ALARM_RETRY_DELAY_MS); }); });