Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 195 additions & 9 deletions cloudflare-code-review-infra/src/code-review-orchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
/**
* CodeReviewOrchestrator - Durable Object for managing code review lifecycle.
* Handles calling cloud agent, maintaining SSE subscription, and updating status in DB.
*
* Supports two execution modes based on the useCloudAgentNext flag:
* - Default (cloud-agent): SSE streaming via initiateSessionAsync
* - cloud-agent-next: prepareSession + initiateFromKilocodeSessionV2, callback-based completion
*/

import { DurableObject } from 'cloudflare:workers';
Expand Down Expand Up @@ -247,6 +250,7 @@ export class CodeReviewOrchestrator extends DurableObject<Env> {
userId: string;
};
skipBalanceCheck?: boolean;
useCloudAgentNext?: boolean;
}): Promise<{ status: CodeReviewStatus }> {
this.state = {
reviewId: params.reviewId,
Expand All @@ -256,12 +260,14 @@ export class CodeReviewOrchestrator extends DurableObject<Env> {
status: 'queued',
updatedAt: new Date().toISOString(),
skipBalanceCheck: params.skipBalanceCheck,
useCloudAgentNext: params.useCloudAgentNext,
};
await this.saveState();

console.log('[CodeReviewOrchestrator] Review created and queued', {
reviewId: params.reviewId,
owner: params.owner,
useCloudAgentNext: params.useCloudAgentNext,
});

// Note: Review execution is triggered via runReview() from the worker
Expand Down Expand Up @@ -344,12 +350,13 @@ export class CodeReviewOrchestrator extends DurableObject<Env> {

/**
* Interrupt the cloud agent session to stop it from running and posting comments.
* Calls the cloud agent's interruptSession tRPC mutation.
* Routes to the correct backend based on useCloudAgentNext flag.
*/
private async interruptCloudAgentSession(sessionId: string): Promise<void> {
// Build tRPC mutation endpoint
// tRPC mutations use POST with JSON body
const cloudAgentUrl = `${this.env.CLOUD_AGENT_URL}/trpc/interruptSession`;
const baseUrl = this.state.useCloudAgentNext
? this.env.CLOUD_AGENT_NEXT_URL
: this.env.CLOUD_AGENT_URL;
const cloudAgentUrl = `${baseUrl}/trpc/interruptSession`;

const response = await fetch(cloudAgentUrl, {
method: 'POST',
Expand All @@ -367,7 +374,7 @@ export class CodeReviewOrchestrator extends DurableObject<Env> {
}

/**
* RPC method: Get events for this review.
* RPC method: Get events for this review (used by SSE/cloud-agent flow only).
*/
async getEvents(): Promise<{ events: CodeReviewEvent[] }> {
await this.loadState();
Expand Down Expand Up @@ -397,15 +404,193 @@ export class CodeReviewOrchestrator extends DurableObject<Env> {
return;
}

await this.run();
// Branch based on feature flag
if (this.state.useCloudAgentNext) {
await this.runWithCloudAgentNext();
} else {
await this.runWithCloudAgent();
}
}

// ---------------------------------------------------------------------------
// cloud-agent-next flow (feature-flagged)
// Uses prepareSession + initiateFromKilocodeSessionV2 with callback-based completion.
// ---------------------------------------------------------------------------

/**
* Orchestration via cloud-agent-next.
* Calls prepareSession + initiateFromKilocodeSessionV2.
* Terminal status is delivered reliably via cloud-agent-next's callback queue.
*/
private async runWithCloudAgentNext(): Promise<void> {
const runStartTime = Date.now();

try {
await this.updateStatus('running');

console.log('[CodeReviewOrchestrator] Starting review via cloud-agent-next', {
reviewId: this.state.reviewId,
timestamp: new Date().toISOString(),
});

// Build common headers for prepareSession (internalApiProtectedProcedure)
const headers: Record<string, string> = {
Authorization: `Bearer ${this.state.authToken}`,
'Content-Type': 'application/json',
'x-internal-api-key': this.env.INTERNAL_API_SECRET,
};
if (this.state.skipBalanceCheck) {
headers['x-skip-balance-check'] = 'true';
}

// Step 1: Prepare session with callback target
const prepareInput = {
...this.state.sessionInput,
callbackTarget: {
url: `${this.env.API_URL}/api/internal/code-review-status/${this.state.reviewId}`,
headers: {
'X-Internal-Secret': this.env.INTERNAL_API_SECRET,
},
},
};

console.log('[CodeReviewOrchestrator] Calling prepareSession', {
reviewId: this.state.reviewId,
callbackUrl: prepareInput.callbackTarget.url,
skipBalanceCheck: this.state.skipBalanceCheck,
});

const prepareResponse = await fetch(`${this.env.CLOUD_AGENT_NEXT_URL}/trpc/prepareSession`, {
method: 'POST',
headers,
body: JSON.stringify(prepareInput),
});
Comment on lines +463 to +467
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets use src/lib/cloud-agent-next/cloud-agent-client.ts, it should have all methods


if (!prepareResponse.ok) {
const errorText = await prepareResponse.text();
throw new Error(`prepareSession failed (${prepareResponse.status}): ${errorText}`);
}

const prepareResult = (await prepareResponse.json()) as Record<string, unknown>;
const prepareData = (prepareResult?.result as Record<string, unknown>)?.data as
| Record<string, unknown>
| undefined;
if (
!prepareData ||
typeof prepareData.cloudAgentSessionId !== 'string' ||
typeof prepareData.kiloSessionId !== 'string'
) {
throw new Error(
`Unexpected prepareSession response shape: ${JSON.stringify(prepareResult).slice(0, 500)}`
);
}
const { cloudAgentSessionId, kiloSessionId } = prepareData as {
cloudAgentSessionId: string;
kiloSessionId: string;
};

console.log('[CodeReviewOrchestrator] Session prepared', {
reviewId: this.state.reviewId,
cloudAgentSessionId,
kiloSessionId,
});

// Store session IDs immediately (no stream parsing needed)
await this.updateStatus('running', {
sessionId: cloudAgentSessionId,
cliSessionId: kiloSessionId,
});

// Step 2: Initiate execution
// initiateFromKilocodeSessionV2 is a protectedProcedure (Bearer token only)
const initiateHeaders: Record<string, string> = {
Authorization: `Bearer ${this.state.authToken}`,
'Content-Type': 'application/json',
};
if (this.state.skipBalanceCheck) {
initiateHeaders['x-skip-balance-check'] = 'true';
}

console.log('[CodeReviewOrchestrator] Calling initiateFromKilocodeSessionV2', {
reviewId: this.state.reviewId,
cloudAgentSessionId,
});

const initiateResponse = await fetch(
`${this.env.CLOUD_AGENT_NEXT_URL}/trpc/initiateFromKilocodeSessionV2`,
{
method: 'POST',
headers: initiateHeaders,
body: JSON.stringify({ cloudAgentSessionId }),
}
);

if (!initiateResponse.ok) {
const errorText = await initiateResponse.text();
throw new Error(
`initiateFromKilocodeSessionV2 failed (${initiateResponse.status}): ${errorText}`
);
}

const initiateResult = (await initiateResponse.json()) as Record<string, unknown>;
const initiateData = (initiateResult?.result as Record<string, unknown>)?.data as
| Record<string, unknown>
| undefined;
if (!initiateData || typeof initiateData.executionId !== 'string') {
throw new Error(
`Unexpected initiateFromKilocodeSessionV2 response shape: ${JSON.stringify(initiateResult).slice(0, 500)}`
);
}

console.log('[CodeReviewOrchestrator] Execution started', {
reviewId: this.state.reviewId,
cloudAgentSessionId,
executionId: initiateData.executionId,
status: initiateData.status,
});

// Done — cloud-agent-next callback will deliver terminal status
console.log('[CodeReviewOrchestrator] Review dispatched to cloud-agent-next', {
reviewId: this.state.reviewId,
sessionId: cloudAgentSessionId,
note: 'Callback will update final status',
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';

await this.updateStatus('failed', { errorMessage });

console.error('[CodeReviewOrchestrator] Review failed (cloud-agent-next):', {
reviewId: this.state.reviewId,
error: errorMessage,
});
} finally {
const totalExecutionTimeMs = Date.now() - runStartTime;
const minutes = Math.floor(totalExecutionTimeMs / 60000);
const seconds = Math.floor((totalExecutionTimeMs % 60000) / 1000);

console.log('[CodeReviewOrchestrator] Run completed (cloud-agent-next)', {
reviewId: this.state.reviewId,
sessionId: this.state.sessionId,
status: this.state.status,
totalExecutionTimeMs,
totalExecutionTime: `${minutes}m ${seconds}s`,
timestamp: new Date().toISOString(),
});
}
}

// ---------------------------------------------------------------------------
// cloud-agent flow (default / legacy)
// Uses SSE streaming via initiateSessionAsync.
// ---------------------------------------------------------------------------

/**
* Main orchestration method.
* Orchestration via cloud-agent (SSE).
* Calls cloud agent async streaming endpoint with callback for reliable completion notification.
* The callback ensures status is updated even if this DO dies or the SSE connection drops.
*/
private async run(): Promise<void> {
private async runWithCloudAgent(): Promise<void> {
const runStartTime = Date.now();

try {
Expand Down Expand Up @@ -541,6 +726,7 @@ export class CodeReviewOrchestrator extends DurableObject<Env> {
/**
* Process Server-Sent Events stream from cloud agent.
* Parses SSE events and extracts sessionId from the first event.
* Used only in the cloud-agent (SSE) flow.
*/
private async processEventStream(response: Response): Promise<void> {
if (!response.body) {
Expand Down
11 changes: 8 additions & 3 deletions cloudflare-code-review-infra/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
*
* Architecture:
* - POST /review - Create and start a code review (returns 202 immediately)
* - GET /reviews/:reviewId/events - Get events for a review (SSE flow only)
* - POST /reviews/:reviewId/cancel - Cancel a running review
* - GET /health - Health check endpoint
*
* Features:
* - Durable Objects maintain long-lived SSE connections
* - Durable Objects support two execution modes (feature-flagged):
* - Default: cloud-agent SSE streaming (initiateSessionAsync)
* - cloud-agent-next: prepareSession + initiateFromKilocodeSessionV2 with callback
* - Concurrency control handled in Next.js (dispatch logic)
* - Fire-and-forget from Next.js dispatch
*/
Expand Down Expand Up @@ -98,12 +102,13 @@ app.post('/review', async (c: Context<HonoEnv>) => {
sessionInput: body.sessionInput,
owner: body.owner,
skipBalanceCheck: body.skipBalanceCheck,
useCloudAgentNext: body.useCloudAgentNext,
}),
'start'
);

// Fire-and-forget: trigger review execution via HTTP context (no 15-min wall time limit)
// This runs the SSE stream processing without blocking the response
// Routes to cloud-agent SSE or cloud-agent-next based on useCloudAgentNext flag
c.executionCtx.waitUntil(
withDORetry(
() => c.env.CODE_REVIEW_ORCHESTRATOR.get(id),
Expand Down Expand Up @@ -132,7 +137,7 @@ app.post('/review', async (c: Context<HonoEnv>) => {
return c.json(response, 202);
});

// Route: GET /reviews/:reviewId/events
// Route: GET /reviews/:reviewId/events (used by SSE/cloud-agent flow for event polling)
app.get('/reviews/:reviewId/events', async (c: Context<HonoEnv>) => {
const reviewId = c.req.param('reviewId');

Expand Down
10 changes: 8 additions & 2 deletions cloudflare-code-review-infra/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,22 @@ export interface CodeReview {
owner: Owner;
status: CodeReviewStatus;
sessionId?: string; // Cloud agent session ID (agent_xxx)
cliSessionId?: string; // CLI session UUID (from session_created event)
cliSessionId?: string; // CLI session UUID (from session_created event or prepareSession)
errorMessage?: string;
startedAt?: string;
completedAt?: string;
updatedAt: string;
events?: CodeReviewEvent[];
skipBalanceCheck?: boolean; // Skip balance validation in cloud agent (for OSS sponsorship)
/** When true, use cloud-agent-next instead of cloud-agent (feature flag) */
useCloudAgentNext?: boolean;
}

export interface CodeReviewStatusResponse {
reviewId: string;
status: CodeReviewStatus;
sessionId?: string; // Cloud agent session ID (agent_xxx)
cliSessionId?: string; // CLI session UUID (from session_created event)
cliSessionId?: string; // CLI session UUID
startedAt?: string;
completedAt?: string;
errorMessage?: string;
Expand All @@ -77,6 +79,8 @@ export interface CodeReviewRequest {
sessionInput: SessionInput;
owner: Owner;
skipBalanceCheck?: boolean;
/** When true, use cloud-agent-next instead of cloud-agent (feature flag) */
useCloudAgentNext?: boolean;
}

export interface CodeReviewResponse {
Expand All @@ -95,6 +99,8 @@ export interface Env {
API_URL: string;
INTERNAL_API_SECRET: string;
CLOUD_AGENT_URL: string;
/** cloud-agent-next URL (used when useCloudAgentNext feature flag is enabled) */
CLOUD_AGENT_NEXT_URL: string;
BACKEND_AUTH_TOKEN: string;

// Optional Sentry
Expand Down
Loading