diff --git a/apps/server/src/orchestration/decider.ts b/apps/server/src/orchestration/decider.ts index 1004c945dbf..5129d200d23 100644 --- a/apps/server/src/orchestration/decider.ts +++ b/apps/server/src/orchestration/decider.ts @@ -441,11 +441,15 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" } case "thread.turn.interrupt": { - yield* requireThread({ + const thread = yield* requireThread({ readModel, command, threadId: command.threadId, }); + const resolvedTurnId = + command.turnId ?? + thread.session?.activeTurnId ?? + (thread.latestTurn?.state === "running" ? thread.latestTurn.turnId : undefined); return { ...withEventBase({ aggregateKind: "thread", @@ -456,7 +460,7 @@ export const decideOrchestrationCommand = Effect.fn("decideOrchestrationCommand" type: "thread.turn-interrupt-requested", payload: { threadId: command.threadId, - ...(command.turnId !== undefined ? { turnId: command.turnId } : {}), + ...(resolvedTurnId !== undefined ? { turnId: resolvedTurnId } : {}), createdAt: command.createdAt, }, }; diff --git a/apps/server/src/orchestration/http.ts b/apps/server/src/orchestration/http.ts index f24f808c6f8..048f3801253 100644 --- a/apps/server/src/orchestration/http.ts +++ b/apps/server/src/orchestration/http.ts @@ -7,6 +7,7 @@ import { import * as Effect from "effect/Effect"; import { HttpRouter, HttpServerRequest, HttpServerResponse } from "effect/unstable/http"; +import { respondToAuthError } from "../auth/http.ts"; import { ServerAuth } from "../auth/Services/ServerAuth.ts"; import { normalizeDispatchCommand } from "./Normalizer.ts"; import { OrchestrationEngineService } from "./Services/OrchestrationEngine.ts"; @@ -58,6 +59,7 @@ export const orchestrationSnapshotRouteLayer = HttpRouter.add( status: 200, }); }).pipe( + Effect.catchTag("AuthError", respondToAuthError), Effect.catchTag("OrchestrationDispatchCommandError", respondToOrchestrationHttpError), Effect.catchTag("OrchestrationGetSnapshotError", respondToOrchestrationHttpError), ), @@ -89,5 +91,8 @@ export const orchestrationDispatchRouteLayer = HttpRouter.add( ), ); return HttpServerResponse.jsonUnsafe(result, { status: 200 }); - }).pipe(Effect.catchTag("OrchestrationDispatchCommandError", respondToOrchestrationHttpError)), + }).pipe( + Effect.catchTag("AuthError", respondToAuthError), + Effect.catchTag("OrchestrationDispatchCommandError", respondToOrchestrationHttpError), + ), ); diff --git a/apps/server/src/provider/Drivers/GeminiDriver.ts b/apps/server/src/provider/Drivers/GeminiDriver.ts new file mode 100644 index 00000000000..019d91eee87 --- /dev/null +++ b/apps/server/src/provider/Drivers/GeminiDriver.ts @@ -0,0 +1,126 @@ +import { GeminiSettings, ProviderDriverKind, type ServerProvider } from "@t3tools/contracts"; +import * as Duration from "effect/Duration"; +import * as Effect from "effect/Effect"; +import * as FileSystem from "effect/FileSystem"; +import * as Schema from "effect/Schema"; +import * as Stream from "effect/Stream"; +import { ChildProcessSpawner } from "effect/unstable/process"; + +import { ServerConfig } from "../../config.ts"; +import { makeGeminiTextGeneration } from "../../textGeneration/GeminiTextGeneration.ts"; +import { ProviderDriverError } from "../Errors.ts"; +import { makeGeminiAdapter } from "../Layers/GeminiAdapter.ts"; +import { checkGeminiProviderStatus, makePendingGeminiProvider } from "../Layers/GeminiProvider.ts"; +import { ProviderEventLoggers } from "../Layers/ProviderEventLoggers.ts"; +import { makeManagedServerProvider } from "../makeManagedServerProvider.ts"; +import { + defaultProviderContinuationIdentity, + type ProviderDriver, + type ProviderInstance, +} from "../ProviderDriver.ts"; +import { + makePackageManagedProviderMaintenanceResolver, + resolveProviderMaintenanceCapabilitiesEffect, +} from "../providerMaintenance.ts"; +import { mergeProviderInstanceEnvironment } from "../ProviderInstanceEnvironment.ts"; +import type { ServerProviderDraft } from "../providerSnapshot.ts"; + +const DRIVER_KIND = ProviderDriverKind.make("gemini"); +const SNAPSHOT_REFRESH_INTERVAL = Duration.hours(1); +const UPDATE = makePackageManagedProviderMaintenanceResolver({ + provider: DRIVER_KIND, + npmPackageName: "@google/gemini-cli", + homebrewFormula: null, + nativeUpdate: null, +}); +const decodeDefaultGeminiSettings = Schema.decodeSync(GeminiSettings); + +export type GeminiDriverEnv = + | ChildProcessSpawner.ChildProcessSpawner + | FileSystem.FileSystem + | ProviderEventLoggers + | ServerConfig; + +const withInstanceIdentity = + (input: { + readonly instanceId: ProviderInstance["instanceId"]; + readonly displayName: string | undefined; + readonly accentColor: string | undefined; + }) => + (snapshot: ServerProviderDraft): ServerProvider => ({ + ...snapshot, + instanceId: input.instanceId, + driver: DRIVER_KIND, + ...(input.displayName ? { displayName: input.displayName } : {}), + ...(input.accentColor ? { accentColor: input.accentColor } : {}), + continuation: { groupKey: `${DRIVER_KIND}:instance:${input.instanceId}` }, + }); + +export const GeminiDriver: ProviderDriver = { + driverKind: DRIVER_KIND, + metadata: { + displayName: "Gemini", + supportsMultipleInstances: true, + }, + configSchema: GeminiSettings, + defaultConfig: (): GeminiSettings => decodeDefaultGeminiSettings({}), + create: ({ instanceId, displayName, accentColor, environment, enabled, config }) => + Effect.gen(function* () { + const spawner = yield* ChildProcessSpawner.ChildProcessSpawner; + const serverConfig = yield* ServerConfig; + const eventLoggers = yield* ProviderEventLoggers; + const processEnv = mergeProviderInstanceEnvironment(environment); + const effectiveConfig = { ...config, enabled } satisfies GeminiSettings; + const stampIdentity = withInstanceIdentity({ instanceId, displayName, accentColor }); + const maintenanceCapabilities = yield* resolveProviderMaintenanceCapabilitiesEffect(UPDATE, { + binaryPath: effectiveConfig.binaryPath, + env: processEnv, + }); + + const adapter = yield* makeGeminiAdapter(effectiveConfig, { + environment: processEnv, + ...(eventLoggers.native ? { nativeEventLogger: eventLoggers.native } : {}), + }); + const textGeneration = yield* makeGeminiTextGeneration(effectiveConfig, processEnv); + const checkProvider = checkGeminiProviderStatus(effectiveConfig, processEnv).pipe( + Effect.map(stampIdentity), + Effect.provideService(ChildProcessSpawner.ChildProcessSpawner, spawner), + Effect.provideService(ServerConfig, serverConfig), + ); + const snapshot = yield* makeManagedServerProvider({ + maintenanceCapabilities, + getSettings: Effect.succeed(effectiveConfig), + streamSettings: Stream.never, + haveSettingsChanged: () => false, + initialSnapshot: (settings) => + makePendingGeminiProvider(settings).pipe(Effect.map(stampIdentity)), + checkProvider, + refreshInterval: SNAPSHOT_REFRESH_INTERVAL, + }).pipe( + Effect.mapError( + (cause) => + new ProviderDriverError({ + driver: DRIVER_KIND, + instanceId, + detail: `Failed to build Gemini snapshot: ${cause.message ?? String(cause)}`, + cause, + }), + ), + ); + + return { + instanceId, + driverKind: DRIVER_KIND, + continuationIdentity: defaultProviderContinuationIdentity({ + driverKind: DRIVER_KIND, + instanceId, + }), + displayName, + accentColor, + enabled, + snapshot, + adapter, + textGeneration, + } satisfies ProviderInstance; + }), +}; diff --git a/apps/server/src/provider/Layers/CursorAdapter.ts b/apps/server/src/provider/Layers/CursorAdapter.ts index efef5f0a83b..3b715f57f02 100644 --- a/apps/server/src/provider/Layers/CursorAdapter.ts +++ b/apps/server/src/provider/Layers/CursorAdapter.ts @@ -801,12 +801,16 @@ export function makeCursorAdapter( provider: PROVIDER, threadId: ctx.threadId, turnId: ctx.activeTurnId, + streamKind: event.streamKind, ...(event.itemId ? { itemId: event.itemId } : {}), text: event.text, rawPayload: event.rawPayload, }), ); return; + case "UsageUpdated": + case "ThreadMetadataUpdated": + return; } }), ), diff --git a/apps/server/src/provider/Layers/GeminiAdapter.test.ts b/apps/server/src/provider/Layers/GeminiAdapter.test.ts new file mode 100644 index 00000000000..de4482046ec --- /dev/null +++ b/apps/server/src/provider/Layers/GeminiAdapter.test.ts @@ -0,0 +1,652 @@ +import assert from "node:assert/strict"; +const { chmodSync, mkdtempSync, rmSync, writeFileSync } = + require("node:fs") as typeof import("node:fs"); +const os = require("node:os") as typeof import("node:os"); +const path = require("node:path") as typeof import("node:path"); + +import * as NodeServices from "@effect/platform-node/NodeServices"; +import { + ProviderDriverKind, + ProviderRuntimeEvent, + ThreadId, + type ServerProvider, +} from "@t3tools/contracts"; +import { buildGeminiThinkingModelConfigAliases } from "@t3tools/shared/model"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Ref from "effect/Ref"; +import * as Stream from "effect/Stream"; +import { afterAll, describe, expect, it } from "vitest"; + +import { ServerConfig } from "../../config.ts"; +import { ServerSettingsService } from "../../serverSettings.ts"; +import { GeminiAdapter } from "../Services/GeminiAdapter.ts"; +import { ProviderRegistry } from "../Services/ProviderRegistry.ts"; +import { + accumulateGeminiPromptUsage, + buildGeminiPromptUsageSnapshot, + makeGeminiAdapterLive, + normalizeGeminiPromptUsage, + resolveRequestedGeminiModeId, +} from "./GeminiAdapter.ts"; +import { makeManualOnlyProviderMaintenanceCapabilities } from "../providerMaintenance.ts"; + +const GEMINI_PROVIDER = ProviderDriverKind.make("gemini"); + +const tempDirs: Array = []; + +afterAll(() => { + for (const dir of tempDirs.splice(0)) { + rmSync(dir, { force: true, recursive: true }); + } +}); + +function makeTempDir(prefix: string): string { + const dir = mkdtempSync(path.join(os.tmpdir(), prefix)); + tempDirs.push(dir); + return dir; +} + +function writeFakeGeminiBinary(): { + readonly baseDir: string; + readonly binaryPath: string; + readonly cwd: string; +} { + const baseDir = makeTempDir("gemini-adapter-test-"); + const binaryPath = path.join(baseDir, "fake-gemini-acp.js"); + + writeFileSync( + binaryPath, + `#!/usr/bin/env node +const readline = require("node:readline"); + +let sessionCounter = 0; +let currentModeId = "yolo"; +let currentSessionId = "session-" + process.pid + "-0"; +let pendingPrompt = null; + +const reply = (id, result) => { + process.stdout.write(JSON.stringify({ jsonrpc: "2.0", id, result }) + "\\n"); +}; + +const notify = (method, params) => { + process.stdout.write(JSON.stringify({ jsonrpc: "2.0", method, params }) + "\\n"); +}; + +const rl = readline.createInterface({ input: process.stdin }); +rl.on("line", (line) => { + const trimmed = line.trim(); + if (!trimmed) { + return; + } + + const message = JSON.parse(trimmed); + + switch (message.method) { + case "initialize": + reply(message.id, { protocolVersion: 1 }); + return; + case "session/new": + sessionCounter += 1; + currentSessionId = "session-" + process.pid + "-" + sessionCounter; + reply(message.id, { + sessionId: currentSessionId, + }); + return; + case "session/set_mode": + if (message.params && typeof message.params.modeId === "string") { + currentModeId = message.params.modeId; + } + reply(message.id, {}); + return; + case "session/set_model": + reply(message.id, {}); + return; + case "session/prompt": { + const sessionId = + message.params && typeof message.params.sessionId === "string" + ? message.params.sessionId + : currentSessionId; + const promptBlocks = Array.isArray(message.params?.prompt) ? message.params.prompt : []; + const promptText = promptBlocks + .filter((block) => block && block.type === "text" && typeof block.text === "string") + .map((block) => block.text) + .join("\\n"); + if (promptText.includes("wait for interrupt")) { + pendingPrompt = { + id: message.id, + sessionId, + }; + notify("session/update", { + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "still working..." }, + }, + }); + return; + } + if (currentModeId === "plan") { + notify("session/update", { + sessionId, + update: { + sessionUpdate: "plan", + entries: [ + { + content: "Inspect the existing implementation", + priority: "high", + status: "completed", + }, + { + content: "Ship the requested change", + priority: "high", + status: "in_progress", + }, + ], + }, + }); + notify("session/update", { + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { + type: "text", + text: "# Gemini plan\\n\\n- inspect the existing implementation\\n- ship the requested change", + }, + }, + }); + } else { + notify("session/update", { + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "hello from fake gemini" }, + }, + }); + } + reply(message.id, { stopReason: "end_turn" }); + return; + } + case "session/cancel": + return; + default: + reply(message.id, {}); + } +}); + +process.on("SIGTERM", () => { + pendingPrompt = null; + setTimeout(() => process.exit(1), 75); +}); +`, + "utf8", + ); + chmodSync(binaryPath, 0o755); + + return { + baseDir, + binaryPath, + cwd: baseDir, + }; +} + +function makeProviderRegistryLayer(providers: ReadonlyArray = []) { + return Layer.succeed(ProviderRegistry, { + getProviders: Effect.succeed(providers), + refresh: () => Effect.succeed(providers), + refreshInstance: () => Effect.succeed(providers), + getProviderMaintenanceCapabilitiesForInstance: (_instanceId, provider) => + Effect.succeed( + makeManualOnlyProviderMaintenanceCapabilities({ + provider, + packageName: null, + }), + ), + setProviderMaintenanceActionState: () => Effect.succeed(providers), + streamChanges: Stream.empty, + }); +} + +function makeHarness() { + const fakeBinary = writeFakeGeminiBinary(); + + return makeGeminiAdapterLive().pipe( + Layer.provideMerge(ServerConfig.layerTest(fakeBinary.cwd, fakeBinary.baseDir)), + Layer.provideMerge( + ServerSettingsService.layerTest({ + providers: { + gemini: { + binaryPath: fakeBinary.binaryPath, + }, + }, + }), + ), + Layer.provideMerge(makeProviderRegistryLayer()), + Layer.provideMerge(NodeServices.layer), + ); +} + +describe("buildGeminiThinkingModelConfigAliases", () => { + it("builds Gemini 3 and Gemini 2.5 aliases from model families", () => { + expect( + buildGeminiThinkingModelConfigAliases(["auto-gemini-3", "gemini-2.5-flash", "custom-model"]), + ).toMatchObject({ + "t3code-gemini-auto-gemini-3-thinking-level-high": { + extends: "chat-base-3", + modelConfig: { + model: "auto-gemini-3", + generateContentConfig: { + thinkingConfig: { + thinkingLevel: "HIGH", + }, + }, + }, + }, + "t3code-gemini-auto-gemini-3-thinking-level-low": { + extends: "chat-base-3", + modelConfig: { + model: "auto-gemini-3", + generateContentConfig: { + thinkingConfig: { + thinkingLevel: "LOW", + }, + }, + }, + }, + "t3code-gemini-gemini-2-5-flash-thinking-budget-dynamic": { + extends: "chat-base-2.5", + modelConfig: { + model: "gemini-2.5-flash", + generateContentConfig: { + thinkingConfig: { + thinkingBudget: -1, + }, + }, + }, + }, + "t3code-gemini-gemini-2-5-flash-thinking-budget-512": { + extends: "chat-base-2.5", + modelConfig: { + model: "gemini-2.5-flash", + generateContentConfig: { + thinkingConfig: { + thinkingBudget: 512, + }, + }, + }, + }, + }); + }); +}); + +describe("GeminiAdapterLive", () => { + it("accumulates Gemini prompt usage into cumulative and per-turn snapshots", () => { + const firstTurn = normalizeGeminiPromptUsage({ + totalTokens: 120, + inputTokens: 80, + outputTokens: 40, + }); + assert.ok(firstTurn); + + const firstCumulative = accumulateGeminiPromptUsage(undefined, firstTurn); + expect(buildGeminiPromptUsageSnapshot(undefined, firstCumulative, firstTurn)).toEqual({ + usedTokens: 120, + totalProcessedTokens: 120, + inputTokens: 80, + outputTokens: 40, + lastUsedTokens: 120, + lastInputTokens: 80, + lastOutputTokens: 40, + }); + + const secondTurn = normalizeGeminiPromptUsage({ + totalTokens: 30, + inputTokens: 20, + outputTokens: 10, + }); + assert.ok(secondTurn); + + const secondCumulative = accumulateGeminiPromptUsage(firstCumulative, secondTurn); + expect(buildGeminiPromptUsageSnapshot(undefined, secondCumulative, secondTurn)).toEqual({ + usedTokens: 150, + totalProcessedTokens: 150, + inputTokens: 100, + outputTokens: 50, + lastUsedTokens: 30, + lastInputTokens: 20, + lastOutputTokens: 10, + }); + + expect( + buildGeminiPromptUsageSnapshot( + { + usedTokens: 2_048, + totalProcessedTokens: 120, + maxTokens: 1_000_000, + }, + secondCumulative, + secondTurn, + ), + ).toEqual({ + usedTokens: 2_048, + totalProcessedTokens: 150, + maxTokens: 1_000_000, + inputTokens: 100, + outputTokens: 50, + lastUsedTokens: 30, + lastInputTokens: 20, + lastOutputTokens: 10, + }); + }); + + it("does not emit stale exit events when startSession replaces an existing session", async () => { + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const adapter = yield* GeminiAdapter; + const eventsRef = yield* Ref.make>([]); + + yield* Stream.runForEach(adapter.streamEvents, (event) => + Ref.update(eventsRef, (events) => [...events, event]), + ).pipe(Effect.forkScoped); + + const threadId = ThreadId.make("thread-gemini-stale-exit"); + + yield* adapter.startSession({ + provider: GEMINI_PROVIDER, + threadId, + runtimeMode: "full-access", + }); + yield* adapter.startSession({ + provider: GEMINI_PROVIDER, + threadId, + runtimeMode: "full-access", + }); + + yield* Effect.sleep("250 millis"); + + const events = yield* Ref.get(eventsRef); + assert.equal( + events.some((event) => event.type === "runtime.error"), + false, + "replaced sessions should not emit stale runtime.error events", + ); + assert.equal( + events.some((event) => event.type === "session.exited"), + false, + "replaced sessions should not emit stale session.exited events", + ); + }).pipe(Effect.provide(makeHarness())), + ), + ); + }); + + it("keeps Gemini resume cursors small and free of snapshot payloads", async () => { + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const adapter = yield* GeminiAdapter; + const threadId = ThreadId.make("thread-gemini-small-resume-cursor"); + + const session = yield* adapter.startSession({ + provider: GEMINI_PROVIDER, + threadId, + runtimeMode: "full-access", + }); + + const startedTurn = yield* adapter.sendTurn({ + threadId, + input: "hello", + attachments: [], + }); + + expect(session.resumeCursor).toEqual({ + schemaVersion: 1, + sessionId: expect.any(String), + }); + expect(startedTurn.resumeCursor).toEqual({ + schemaVersion: 1, + sessionId: expect.any(String), + }); + expect(startedTurn.resumeCursor).not.toHaveProperty("snapshots"); + expect(startedTurn.resumeCursor).not.toHaveProperty("filePath"); + expect(startedTurn.resumeCursor).not.toHaveProperty("items"); + }).pipe(Effect.provide(makeHarness())), + ), + ); + }); + + it("captures Gemini plan turns as proposed plans for follow-up actions", async () => { + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const adapter = yield* GeminiAdapter; + const eventsRef = yield* Ref.make>([]); + + yield* Stream.runForEach(adapter.streamEvents, (event) => + Ref.update(eventsRef, (events) => [...events, event]), + ).pipe(Effect.forkScoped); + + const threadId = ThreadId.make("thread-gemini-plan"); + yield* adapter.startSession({ + provider: GEMINI_PROVIDER, + threadId, + runtimeMode: "full-access", + }); + + yield* adapter.sendTurn({ + threadId, + input: "plan this change", + interactionMode: "plan", + attachments: [], + }); + + for (let remainingAttempts = 50; remainingAttempts > 0; remainingAttempts -= 1) { + const events = yield* Ref.get(eventsRef); + if ( + events.some((event) => event.type === "turn.proposed.completed") && + events.some((event) => event.type === "turn.completed") + ) { + break; + } + yield* Effect.sleep("10 millis"); + } + + const events = yield* Ref.get(eventsRef); + const proposedEvent = events.find((event) => event.type === "turn.proposed.completed"); + expect(proposedEvent).toBeDefined(); + if (proposedEvent?.type !== "turn.proposed.completed") { + return; + } + + expect(proposedEvent.payload.planMarkdown).toBe( + "# Gemini plan\n\n- inspect the existing implementation\n- ship the requested change", + ); + }).pipe(Effect.provide(makeHarness())), + ), + ); + }); + + it("falls back to interrupting the turn locally when Gemini ignores session/cancel", async () => { + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const adapter = yield* GeminiAdapter; + const eventsRef = yield* Ref.make>([]); + + yield* Stream.runForEach(adapter.streamEvents, (event) => + Ref.update(eventsRef, (events) => [...events, event]), + ).pipe(Effect.forkScoped); + + const threadId = ThreadId.make("thread-gemini-unsupported-cancel"); + yield* adapter.startSession({ + provider: GEMINI_PROVIDER, + threadId, + runtimeMode: "full-access", + }); + + const started = yield* adapter.sendTurn({ + threadId, + input: "wait for interrupt", + attachments: [], + }); + + for (let remainingAttempts = 50; remainingAttempts > 0; remainingAttempts -= 1) { + const events = yield* Ref.get(eventsRef); + if ( + events.some( + (event) => + event.type === "content.delta" && + event.turnId === started.turnId && + event.payload.delta === "still working...", + ) + ) { + break; + } + yield* Effect.sleep("10 millis"); + } + + yield* adapter.interruptTurn(threadId, started.turnId); + + for (let remainingAttempts = 100; remainingAttempts > 0; remainingAttempts -= 1) { + const events = yield* Ref.get(eventsRef); + if ( + events.some( + (event) => + event.type === "turn.completed" && + event.turnId === started.turnId && + event.payload.state === "interrupted", + ) && + events.some((event) => event.type === "session.exited") + ) { + break; + } + yield* Effect.sleep("10 millis"); + } + + const events = yield* Ref.get(eventsRef); + const turnCompletedEvents = events.filter( + (event): event is Extract => + event.type === "turn.completed" && event.turnId === started.turnId, + ); + expect(turnCompletedEvents).toHaveLength(1); + expect( + turnCompletedEvents.some( + (event) => + event.payload.state === "interrupted" && event.payload.stopReason === "cancelled", + ), + ).toBe(true); + expect(events.some((event) => event.type === "session.exited")).toBe(true); + expect(events.some((event) => event.type === "runtime.error")).toBe(false); + }).pipe(Effect.provide(makeHarness())), + ), + ); + }); + + it("does not emit ready after stopSession exits an active Gemini turn", async () => { + await Effect.runPromise( + Effect.scoped( + Effect.gen(function* () { + const adapter = yield* GeminiAdapter; + const eventsRef = yield* Ref.make>([]); + + yield* Stream.runForEach(adapter.streamEvents, (event) => + Ref.update(eventsRef, (events) => [...events, event]), + ).pipe(Effect.forkScoped); + + const threadId = ThreadId.make("thread-gemini-stop-active-turn"); + yield* adapter.startSession({ + provider: GEMINI_PROVIDER, + threadId, + runtimeMode: "full-access", + }); + + const started = yield* adapter.sendTurn({ + threadId, + input: "wait for interrupt", + attachments: [], + }); + + for (let remainingAttempts = 50; remainingAttempts > 0; remainingAttempts -= 1) { + const events = yield* Ref.get(eventsRef); + if ( + events.some( + (event) => + event.type === "content.delta" && + event.turnId === started.turnId && + event.payload.delta === "still working...", + ) + ) { + break; + } + yield* Effect.sleep("10 millis"); + } + + yield* adapter.stopSession(threadId); + yield* Effect.sleep("250 millis"); + + const events = yield* Ref.get(eventsRef); + const exitIndex = events.findIndex((event) => event.type === "session.exited"); + expect(exitIndex).toBeGreaterThanOrEqual(0); + const turnCompletedEvents = events.filter( + (event): event is Extract => + event.type === "turn.completed" && event.turnId === started.turnId, + ); + expect(turnCompletedEvents).toHaveLength(1); + expect(turnCompletedEvents[0]?.payload).toMatchObject({ + state: "interrupted", + stopReason: "cancelled", + }); + expect( + events.findIndex( + (event) => event.type === "turn.completed" && event.turnId === started.turnId, + ), + ).toBeLessThan(exitIndex); + expect( + events + .slice(exitIndex + 1) + .some( + (event) => + event.type === "session.state.changed" && event.payload.state === "ready", + ), + ).toBe(false); + expect(events.some((event) => event.type === "runtime.error")).toBe(false); + }).pipe(Effect.provide(makeHarness())), + ), + ); + }); + + it("restores the runtime-backed Gemini mode after leaving plan mode", () => { + expect( + resolveRequestedGeminiModeId({ + interactionMode: "default", + runtimeModeId: "default", + currentModeId: "plan", + }), + ).toBe("default"); + expect( + resolveRequestedGeminiModeId({ + interactionMode: "default", + runtimeModeId: "autoEdit", + currentModeId: "plan", + }), + ).toBe("autoEdit"); + expect( + resolveRequestedGeminiModeId({ + interactionMode: "default", + runtimeModeId: "yolo", + currentModeId: "plan", + }), + ).toBe("yolo"); + }); + + it("leaves the current Gemini mode unchanged when interaction mode is omitted", () => { + expect( + resolveRequestedGeminiModeId({ + interactionMode: undefined, + runtimeModeId: "yolo", + currentModeId: "plan", + }), + ).toBe("plan"); + }); +}); diff --git a/apps/server/src/provider/Layers/GeminiAdapter.ts b/apps/server/src/provider/Layers/GeminiAdapter.ts new file mode 100644 index 00000000000..49f8a7acf76 --- /dev/null +++ b/apps/server/src/provider/Layers/GeminiAdapter.ts @@ -0,0 +1,1918 @@ +/** + * GeminiAdapterLive - Scoped live implementation for the Gemini provider adapter. + * + * Wraps Gemini CLI ACP sessions behind the generic provider adapter contract + * and emits canonical provider runtime events. + * + * @module GeminiAdapterLive + */ +const path = require("node:path") as typeof import("node:path"); + +import { + ApprovalRequestId, + type CanonicalItemType, + EventId, + type ProviderApprovalDecision, + type ProviderRuntimeEvent, + type ProviderSendTurnInput, + type ProviderSession, + RuntimeItemId, + RuntimeRequestId, + ThreadId, + type ThreadTokenUsageSnapshot, + TurnId, + type GeminiSettings, + ProviderDriverKind, +} from "@t3tools/contracts"; +import { resolveGeminiApiModelId } from "@t3tools/shared/model"; +import * as DateTime from "effect/DateTime"; +import * as Deferred from "effect/Deferred"; +import * as Effect from "effect/Effect"; +import * as Exit from "effect/Exit"; +import * as Fiber from "effect/Fiber"; +import * as FileSystem from "effect/FileSystem"; +import * as Layer from "effect/Layer"; +import * as Option from "effect/Option"; +import * as Queue from "effect/Queue"; +import * as Scope from "effect/Scope"; +import * as Semaphore from "effect/Semaphore"; +import * as Stream from "effect/Stream"; +import * as SynchronizedRef from "effect/SynchronizedRef"; +import { ChildProcessSpawner } from "effect/unstable/process"; +import type * as EffectAcpSchema from "effect-acp/schema"; + +import { resolveAttachmentPath } from "../../attachmentStore.ts"; +import { ServerConfig } from "../../config.ts"; +import { ServerSettingsService } from "../../serverSettings.ts"; +import { + ProviderAdapterProcessError, + ProviderAdapterRequestError, + ProviderAdapterSessionClosedError, + ProviderAdapterSessionNotFoundError, + ProviderAdapterValidationError, + type ProviderAdapterError, +} from "../Errors.ts"; +import { mapAcpToAdapterError } from "../acp/AcpAdapterSupport.ts"; +import { + makeAcpAssistantItemEvent, + makeAcpContentDeltaEvent, + makeAcpPlanUpdatedEvent, + makeAcpRequestOpenedEvent, + makeAcpRequestResolvedEvent, + makeAcpToolCallEvent, +} from "../acp/AcpCoreRuntimeEvents.ts"; +import { makeAcpNativeLoggers } from "../acp/AcpNativeLogging.ts"; +import { + type AcpPlanUpdate, + type AcpParsedSessionEvent, + type AcpPermissionRequest, + parsePermissionRequest, +} from "../acp/AcpRuntimeModel.ts"; +import { type AcpSessionRuntimeShape } from "../acp/AcpSessionRuntime.ts"; +import { makeGeminiAcpRuntime } from "../acp/GeminiAcpSupport.ts"; +import { + buildGeminiResumeCursor, + cleanupGeminiSystemSettings, + cloneGeminiSessionFile, + cloneGeminiStoredTurn, + cloneGeminiTurnItems, + findGeminiSessionFileById, + type GeminiStoredTurn, + readGeminiResumeSessionId, + readGeminiLaunchEnv, + readLegacyGeminiResumeTurns, + writeGeminiModelAliasSettings, +} from "../geminiCliFiles.ts"; +import { resolveGeminiBinaryPath } from "../geminiBinaryPath.ts"; +import { GeminiAdapter, type GeminiAdapterShape } from "../Services/GeminiAdapter.ts"; +import { asNumber, asRecord, trimToUndefined } from "../jsonValue.ts"; +import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts"; + +const PROVIDER = ProviderDriverKind.make("gemini"); + +function currentIsoTimestamp(): string { + return DateTime.formatIso(Effect.runSync(DateTime.now)); +} + +interface GeminiPendingApproval { + readonly decision: Deferred.Deferred; +} + +interface GeminiRecordedItem { + id: string; + itemType: CanonicalItemType; + title?: string; + detail?: string; + status?: "inProgress" | "completed" | "failed"; + text?: string; + data?: unknown; +} + +interface GeminiTurnState { + readonly turnId: TurnId; + readonly isPlanTurn: boolean; + reasoningItemId: RuntimeItemId | undefined; + readonly items: GeminiRecordedItem[]; + reasoningTextStarted: boolean; + latestPlanUpdate: AcpPlanUpdate | undefined; + proposedPlanCaptured: boolean; +} + +interface GeminiSessionContext { + readonly threadId: ThreadId; + session: ProviderSession; + readonly scope: Scope.Closeable; + readonly acp: AcpSessionRuntimeShape; + notificationFiber: Fiber.Fiber | undefined; + readonly pendingApprovals: Map; + readonly turns: GeminiStoredTurn[]; + readonly runtimeModeId: string; + readonly sessionId: string; + currentModeId: string | undefined; + currentModelId: string | undefined; + turnState: GeminiTurnState | undefined; + sessionFilePath: string | undefined; + systemSettingsPath: string | undefined; + stopped: boolean; + interruptedTurnIds: Set; + lastKnownTokenUsage: ThreadTokenUsageSnapshot | undefined; + cumulativePromptUsage: GeminiPromptUsageSnapshot | undefined; +} + +export interface GeminiAdapterLiveOptions { + readonly nativeEventLogPath?: string; + readonly nativeEventLogger?: EventNdjsonLogger; + readonly environment?: NodeJS.ProcessEnv; +} + +export interface GeminiPromptUsageSnapshot { + readonly usedTokens: number; + readonly inputTokens?: number; + readonly cachedInputTokens?: number; + readonly outputTokens?: number; + readonly reasoningOutputTokens?: number; +} + +function toMessage(cause: unknown, fallback: string): string { + if (cause instanceof Error && cause.message.trim().length > 0) { + return cause.message; + } + return fallback; +} + +function toSpawnEnv( + env: NodeJS.ProcessEnv | Readonly> | undefined, +): Readonly> | undefined { + if (!env) { + return undefined; + } + const entries = Object.entries(env).filter( + (entry): entry is [string, string] => typeof entry[1] === "string", + ); + return entries.length > 0 ? Object.fromEntries(entries) : undefined; +} + +function buildResumeCursor(context: GeminiSessionContext) { + return buildGeminiResumeCursor(context.sessionId); +} + +function runtimeModeToGeminiModeId(runtimeMode: ProviderSession["runtimeMode"]): string { + switch (runtimeMode) { + case "approval-required": + return "default"; + case "auto-accept-edits": + return "autoEdit"; + case "full-access": + default: + return "yolo"; + } +} + +function getGeminiCliApprovalModeFlag(runtimeModeId: string): string { + if (runtimeModeId === "autoEdit") { + return "auto_edit"; + } + return runtimeModeId; +} + +export function resolveRequestedGeminiModeId(input: { + readonly interactionMode: ProviderSendTurnInput["interactionMode"]; + readonly runtimeModeId: string; + readonly currentModeId: string | undefined; +}): string | undefined { + if (input.interactionMode === "plan") { + return "plan"; + } + + if (input.interactionMode === "default") { + return input.runtimeModeId; + } + + return input.currentModeId; +} + +function itemTypeFromAcpToolKind(kind: string | undefined): CanonicalItemType { + switch (kind) { + case "execute": + return "command_execution"; + case "edit": + case "delete": + case "move": + return "file_change"; + case "search": + case "fetch": + return "web_search"; + default: + return "dynamic_tool_call"; + } +} + +function runtimeStatusFromAcpToolStatus( + status: "pending" | "inProgress" | "completed" | "failed" | undefined, +): "inProgress" | "completed" | "failed" | undefined { + switch (status) { + case "pending": + case "inProgress": + return "inProgress"; + case "completed": + return "completed"; + case "failed": + return "failed"; + default: + return undefined; + } +} + +function isAskUserPermissionRequest(request: AcpPermissionRequest): boolean { + return request.toolCall?.title?.trim().toLowerCase() === "ask user"; +} + +function permissionOutcomeFromGeminiOptions( + decision: ProviderApprovalDecision, + options: ReadonlyArray, +): { outcome: { outcome: "cancelled" } | { outcome: "selected"; optionId: string } } { + if (decision === "cancel") { + return { outcome: { outcome: "cancelled" } }; + } + + const pick = (...kinds: ReadonlyArray) => + kinds + .map((kind) => options.find((option) => option.kind === kind)) + .find((option) => option !== undefined); + + const selected = + decision === "acceptForSession" + ? pick("allow_always", "allow_once") + : decision === "accept" + ? pick("allow_once", "allow_always") + : pick("reject_once", "reject_always"); + + const optionId = + typeof selected?.optionId === "string" && selected.optionId.trim().length > 0 + ? selected.optionId.trim() + : undefined; + return optionId + ? { + outcome: { + outcome: "selected", + optionId, + }, + } + : { outcome: { outcome: "cancelled" } }; +} + +function sumTokenUsageValue( + left: number | undefined, + right: number | undefined, +): number | undefined { + if (left === undefined && right === undefined) { + return undefined; + } + return (left ?? 0) + (right ?? 0); +} + +/** @internal - Exported for testing */ +export function normalizeGeminiPromptUsage(value: unknown): GeminiPromptUsageSnapshot | undefined { + const usage = asRecord(value); + const usedTokens = asNumber(usage?.totalTokens); + if (usedTokens === undefined || usedTokens <= 0) { + return undefined; + } + + const inputTokens = asNumber(usage?.inputTokens); + const outputTokens = asNumber(usage?.outputTokens); + const thoughtTokens = asNumber(usage?.thoughtTokens); + const cachedReadTokens = asNumber(usage?.cachedReadTokens); + const cachedWriteTokens = asNumber(usage?.cachedWriteTokens); + const cachedInputTokens = + (cachedReadTokens ?? 0) + (cachedWriteTokens ?? 0) > 0 + ? (cachedReadTokens ?? 0) + (cachedWriteTokens ?? 0) + : undefined; + + return { + usedTokens, + ...(inputTokens !== undefined ? { inputTokens } : {}), + ...(cachedInputTokens !== undefined ? { cachedInputTokens } : {}), + ...(outputTokens !== undefined ? { outputTokens } : {}), + ...(thoughtTokens !== undefined ? { reasoningOutputTokens: thoughtTokens } : {}), + }; +} + +/** @internal - Exported for testing */ +export function accumulateGeminiPromptUsage( + cumulativeUsage: GeminiPromptUsageSnapshot | undefined, + turnUsage: GeminiPromptUsageSnapshot, +): GeminiPromptUsageSnapshot { + const inputTokens = sumTokenUsageValue(cumulativeUsage?.inputTokens, turnUsage.inputTokens); + const cachedInputTokens = sumTokenUsageValue( + cumulativeUsage?.cachedInputTokens, + turnUsage.cachedInputTokens, + ); + const outputTokens = sumTokenUsageValue(cumulativeUsage?.outputTokens, turnUsage.outputTokens); + const reasoningOutputTokens = sumTokenUsageValue( + cumulativeUsage?.reasoningOutputTokens, + turnUsage.reasoningOutputTokens, + ); + + return { + usedTokens: (cumulativeUsage?.usedTokens ?? 0) + turnUsage.usedTokens, + ...(inputTokens !== undefined ? { inputTokens } : {}), + ...(cachedInputTokens !== undefined ? { cachedInputTokens } : {}), + ...(outputTokens !== undefined ? { outputTokens } : {}), + ...(reasoningOutputTokens !== undefined ? { reasoningOutputTokens } : {}), + }; +} + +/** @internal - Exported for testing */ +export function buildGeminiPromptUsageSnapshot( + lastKnownUsage: ThreadTokenUsageSnapshot | undefined, + cumulativeUsage: GeminiPromptUsageSnapshot, + turnUsage: GeminiPromptUsageSnapshot, +): ThreadTokenUsageSnapshot { + const keepContextWindowUsage = + typeof lastKnownUsage?.maxTokens === "number" && + Number.isFinite(lastKnownUsage.maxTokens) && + lastKnownUsage.maxTokens > 0; + const usedTokens = keepContextWindowUsage + ? lastKnownUsage.usedTokens + : cumulativeUsage.usedTokens; + + return { + usedTokens, + totalProcessedTokens: cumulativeUsage.usedTokens, + ...(keepContextWindowUsage ? { maxTokens: lastKnownUsage.maxTokens } : {}), + ...(cumulativeUsage.inputTokens !== undefined + ? { inputTokens: cumulativeUsage.inputTokens } + : {}), + ...(cumulativeUsage.cachedInputTokens !== undefined + ? { cachedInputTokens: cumulativeUsage.cachedInputTokens } + : {}), + ...(cumulativeUsage.outputTokens !== undefined + ? { outputTokens: cumulativeUsage.outputTokens } + : {}), + ...(cumulativeUsage.reasoningOutputTokens !== undefined + ? { reasoningOutputTokens: cumulativeUsage.reasoningOutputTokens } + : {}), + lastUsedTokens: turnUsage.usedTokens, + ...(turnUsage.inputTokens !== undefined ? { lastInputTokens: turnUsage.inputTokens } : {}), + ...(turnUsage.cachedInputTokens !== undefined + ? { lastCachedInputTokens: turnUsage.cachedInputTokens } + : {}), + ...(turnUsage.outputTokens !== undefined ? { lastOutputTokens: turnUsage.outputTokens } : {}), + ...(turnUsage.reasoningOutputTokens !== undefined + ? { lastReasoningOutputTokens: turnUsage.reasoningOutputTokens } + : {}), + }; +} + +function updateGeminiSession( + context: GeminiSessionContext, + patch: Partial, +): ProviderSession { + context.session = { + ...context.session, + ...patch, + updatedAt: currentIsoTimestamp(), + }; + return context.session; +} + +function upsertGeminiTurnItem( + turnState: GeminiTurnState, + itemId: string, + itemType: CanonicalItemType, + patch: Partial, +): GeminiRecordedItem { + let item = turnState.items.find((candidate) => candidate.id === itemId); + if (!item) { + item = { id: itemId, itemType }; + turnState.items.push(item); + } + item.itemType = itemType; + Object.assign(item, patch); + return item; +} + +function assistantMarkdownFromGeminiTurn(turnState: GeminiTurnState): string | undefined { + return trimToUndefined( + turnState.items + .filter( + (item): item is GeminiRecordedItem & { text: string } => + item.itemType === "assistant_message" && typeof item.text === "string", + ) + .map((item) => item.text) + .join(""), + ); +} + +function planMarkdownFromUpdate(planUpdate: AcpPlanUpdate | undefined): string | undefined { + if (!planUpdate) { + return undefined; + } + + const explanation = trimToUndefined(planUpdate.explanation ?? undefined); + const steps = planUpdate.plan + .map((entry) => trimToUndefined(entry.step)) + .filter((entry): entry is string => entry !== undefined); + + if (!explanation && steps.length === 0) { + return undefined; + } + + const lines = ["# Plan"]; + if (explanation) { + lines.push("", explanation); + } + if (steps.length > 0) { + lines.push( + "", + ...planUpdate.plan.map((entry, index) => { + const step = trimToUndefined(entry.step) ?? `Step ${index + 1}`; + switch (entry.status) { + case "completed": + return `- [x] ${step}`; + case "inProgress": + return `- [ ] ${step} (in progress)`; + default: + return `- [ ] ${step}`; + } + }), + ); + } + + return trimToUndefined(lines.join("\n")); +} + +function proposedPlanMarkdownFromGeminiTurn(turnState: GeminiTurnState): string | undefined { + return ( + assistantMarkdownFromGeminiTurn(turnState) ?? planMarkdownFromUpdate(turnState.latestPlanUpdate) + ); +} + +function settlePendingApprovalsAsCancelled( + pendingApprovals: ReadonlyMap, +): Effect.Effect { + const pendingEntries = Array.from(pendingApprovals.values()); + return Effect.forEach( + pendingEntries, + (pending) => Deferred.succeed(pending.decision, "cancel").pipe(Effect.ignore), + { + discard: true, + }, + ); +} + +export function makeGeminiAdapter( + geminiSettings: GeminiSettings, + options?: GeminiAdapterLiveOptions, +) { + return Effect.gen(function* () { + const childProcessSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; + const fileSystem = yield* FileSystem.FileSystem; + const serverConfig = yield* ServerConfig; + const runtimeContext = yield* Effect.context(); + const launchEnvironment = options?.environment ?? process.env; + const runFork = Effect.runForkWith(runtimeContext); + const runtimeEventQueue = yield* Queue.unbounded(); + const nativeEventLogger = + options?.nativeEventLogger ?? + (options?.nativeEventLogPath !== undefined + ? yield* makeEventNdjsonLogger(options.nativeEventLogPath, { + stream: "native", + }) + : undefined); + const managedNativeEventLogger = + options?.nativeEventLogger === undefined ? nativeEventLogger : undefined; + + const sessions = new Map(); + const threadLocksRef = yield* SynchronizedRef.make(new Map()); + + const makeEventStamp = () => ({ + eventId: EventId.make(crypto.randomUUID()), + createdAt: currentIsoTimestamp(), + }); + + const makeEventBase = (context: GeminiSessionContext) => ({ + ...makeEventStamp(), + provider: PROVIDER, + threadId: context.threadId, + }); + + const offerRuntimeEvent = (event: ProviderRuntimeEvent) => + Queue.offer(runtimeEventQueue, event); + + const getThreadSemaphore = (threadId: string) => + SynchronizedRef.modifyEffect(threadLocksRef, (current) => { + const existing: Option.Option = Option.fromNullishOr( + current.get(threadId), + ); + return Option.match(existing, { + onNone: () => + Semaphore.make(1).pipe( + Effect.map((semaphore) => { + const next = new Map(current); + next.set(threadId, semaphore); + return [semaphore, next] as const; + }), + ), + onSome: (semaphore) => Effect.succeed([semaphore, current] as const), + }); + }); + + const withThreadLock = (threadId: string, effect: Effect.Effect) => + Effect.flatMap(getThreadSemaphore(threadId), (semaphore) => semaphore.withPermit(effect)); + + const getGeminiSettings = Effect.fn("getGeminiSettings")(function* (threadId: ThreadId) { + if (!geminiSettings.enabled) { + return yield* new ProviderAdapterProcessError({ + provider: PROVIDER, + threadId, + detail: "Gemini is disabled in T3 Code settings.", + }); + } + return geminiSettings; + }); + + const prepareGeminiLaunchConfig = Effect.fn("prepareGeminiLaunchConfig")(function* (input: { + readonly threadId: ThreadId; + readonly selectedModel?: string; + }) { + const candidateModels = [ + ...geminiSettings.customModels, + ...(input.selectedModel ? [input.selectedModel] : []), + ]; + + return yield* Effect.tryPromise({ + try: async () => { + const modelAliasSettings = await writeGeminiModelAliasSettings({ + scopeId: input.threadId, + modelIds: candidateModels, + }); + const env = await readGeminiLaunchEnv(modelAliasSettings.env); + return { + ...modelAliasSettings, + ...(env ? { env } : {}), + }; + }, + catch: (cause) => + new ProviderAdapterProcessError({ + provider: PROVIDER, + threadId: input.threadId, + detail: `Failed to prepare Gemini launch environment: ${toMessage(cause, "prepare failed")}`, + cause, + }), + }); + }); + + const snapshotThread = (context: GeminiSessionContext) => ({ + threadId: context.threadId, + turns: context.turns.map((turn) => ({ + id: turn.id, + items: cloneGeminiTurnItems(turn.items), + })), + }); + + const requireSession = ( + threadId: ThreadId, + ): Effect.Effect => { + const context = sessions.get(threadId); + if (!context) { + return Effect.fail( + new ProviderAdapterSessionNotFoundError({ provider: PROVIDER, threadId }), + ); + } + if (context.stopped) { + return Effect.fail(new ProviderAdapterSessionClosedError({ provider: PROVIDER, threadId })); + } + return Effect.succeed(context); + }; + + const emitSessionState = ( + context: GeminiSessionContext, + state: "starting" | "ready" | "running" | "stopped" | "error", + reason?: string, + detail?: unknown, + ) => + offerRuntimeEvent({ + ...makeEventBase(context), + type: "session.state.changed", + payload: { + state, + ...(reason ? { reason } : {}), + ...(detail !== undefined ? { detail } : {}), + }, + ...(detail !== undefined + ? { + raw: { + source: "acp.jsonrpc" as const, + method: "session/state", + payload: detail, + }, + } + : {}), + }); + + const emitRuntimeWarning = ( + context: GeminiSessionContext, + message: string, + raw?: { + readonly method: string; + readonly payload: unknown; + }, + ) => + offerRuntimeEvent({ + ...makeEventBase(context), + ...(context.turnState ? { turnId: context.turnState.turnId } : {}), + type: "runtime.warning", + payload: { message, ...(raw ? { detail: raw.payload } : {}) }, + ...(raw + ? { + raw: { + source: "acp.jsonrpc" as const, + method: raw.method, + payload: raw.payload, + }, + } + : {}), + }); + + const emitRuntimeError = ( + context: GeminiSessionContext, + message: string, + detail?: unknown, + turnId?: TurnId, + ) => + offerRuntimeEvent({ + ...makeEventBase(context), + ...(turnId ? { turnId } : {}), + type: "runtime.error", + payload: { + message, + class: "provider_error", + ...(detail !== undefined ? { detail } : {}), + }, + ...(detail !== undefined + ? { + raw: { + source: "acp.jsonrpc" as const, + method: "runtime/error", + payload: detail, + }, + } + : {}), + }); + + const emitUsage = ( + context: GeminiSessionContext, + usage: ThreadTokenUsageSnapshot, + turnId?: TurnId, + rawPayload?: unknown, + ) => { + context.lastKnownTokenUsage = { + ...context.lastKnownTokenUsage, + ...usage, + usedTokens: usage.usedTokens, + }; + return offerRuntimeEvent({ + ...makeEventBase(context), + ...(turnId ? { turnId } : {}), + type: "thread.token-usage.updated", + payload: { usage: context.lastKnownTokenUsage }, + ...(rawPayload !== undefined + ? { + raw: { + source: "acp.jsonrpc" as const, + method: "session/update", + payload: rawPayload, + }, + } + : {}), + }); + }; + + const emitReasoningItemStarted = (context: GeminiSessionContext) => + Effect.gen(function* () { + const turnState = context.turnState; + if (!turnState || turnState.reasoningTextStarted) { + return; + } + const itemId = RuntimeItemId.make(`gemini-reasoning-${crypto.randomUUID()}`); + turnState.reasoningItemId = itemId; + turnState.reasoningTextStarted = true; + upsertGeminiTurnItem(turnState, itemId, "reasoning", { + status: "inProgress", + title: "Reasoning", + }); + yield* offerRuntimeEvent({ + ...makeEventBase(context), + turnId: turnState.turnId, + itemId, + type: "item.started", + payload: { + itemType: "reasoning", + status: "inProgress", + title: "Reasoning", + }, + }); + }); + + const emitContentDelta = ( + context: GeminiSessionContext, + event: Extract, + ) => + Effect.gen(function* () { + const turnState = context.turnState; + if (!turnState || event.text.length === 0) { + return; + } + + let activeTurnState = turnState; + let itemId = event.itemId; + if (event.streamKind === "reasoning_text") { + yield* emitReasoningItemStarted(context); + const nextTurnState = context.turnState; + if (!nextTurnState || nextTurnState.turnId !== turnState.turnId) { + return; + } + activeTurnState = nextTurnState; + itemId = activeTurnState.reasoningItemId; + } + if (!itemId) { + return; + } + + const itemType = + event.streamKind === "assistant_text" ? "assistant_message" : ("reasoning" as const); + const existing = upsertGeminiTurnItem(activeTurnState, itemId, itemType, {}); + existing.text = `${existing.text ?? ""}${event.text}`; + + yield* offerRuntimeEvent( + makeAcpContentDeltaEvent({ + stamp: makeEventStamp(), + provider: PROVIDER, + threadId: context.threadId, + turnId: activeTurnState.turnId, + streamKind: event.streamKind, + itemId, + text: event.text, + rawPayload: event.rawPayload, + }), + ); + }); + + const handleAcpEvent = (context: GeminiSessionContext, event: AcpParsedSessionEvent) => + Effect.gen(function* () { + switch (event._tag) { + case "ModeChanged": + context.currentModeId = event.modeId; + return; + case "AssistantItemStarted": { + if (!context.turnState) { + return; + } + upsertGeminiTurnItem(context.turnState, event.itemId, "assistant_message", { + status: "inProgress", + title: "Assistant message", + }); + yield* offerRuntimeEvent( + makeAcpAssistantItemEvent({ + stamp: makeEventStamp(), + provider: PROVIDER, + threadId: context.threadId, + turnId: context.turnState.turnId, + itemId: event.itemId, + lifecycle: "item.started", + }), + ); + return; + } + case "AssistantItemCompleted": { + if (!context.turnState) { + return; + } + upsertGeminiTurnItem(context.turnState, event.itemId, "assistant_message", { + status: "completed", + }); + yield* offerRuntimeEvent( + makeAcpAssistantItemEvent({ + stamp: makeEventStamp(), + provider: PROVIDER, + threadId: context.threadId, + turnId: context.turnState.turnId, + itemId: event.itemId, + lifecycle: "item.completed", + }), + ); + return; + } + case "PlanUpdated": + if (context.turnState) { + context.turnState.latestPlanUpdate = event.payload; + } + yield* offerRuntimeEvent( + makeAcpPlanUpdatedEvent({ + stamp: makeEventStamp(), + provider: PROVIDER, + threadId: context.threadId, + turnId: context.turnState?.turnId, + payload: event.payload, + source: "acp.jsonrpc", + method: "session/update", + rawPayload: event.rawPayload, + }), + ); + return; + case "ToolCallUpdated": { + if (!context.turnState) { + return; + } + const runtimeStatus = runtimeStatusFromAcpToolStatus(event.toolCall.status); + const itemPatch: Partial = { + data: event.toolCall.data, + }; + if (event.toolCall.title) { + itemPatch.title = event.toolCall.title; + } + if (event.toolCall.detail) { + itemPatch.detail = event.toolCall.detail; + } + if (runtimeStatus) { + itemPatch.status = runtimeStatus; + } + upsertGeminiTurnItem( + context.turnState, + event.toolCall.toolCallId, + itemTypeFromAcpToolKind(event.toolCall.kind), + itemPatch, + ); + yield* offerRuntimeEvent( + makeAcpToolCallEvent({ + stamp: makeEventStamp(), + provider: PROVIDER, + threadId: context.threadId, + turnId: context.turnState.turnId, + toolCall: event.toolCall, + rawPayload: event.rawPayload, + }), + ); + return; + } + case "ContentDelta": + yield* emitContentDelta(context, event); + return; + case "UsageUpdated": + yield* emitUsage( + context, + { + usedTokens: event.usage.usedTokens, + lastUsedTokens: event.usage.usedTokens, + ...(event.usage.maxTokens !== undefined + ? { maxTokens: event.usage.maxTokens } + : {}), + compactsAutomatically: true, + }, + context.turnState?.turnId, + event.rawPayload, + ); + return; + case "ThreadMetadataUpdated": + yield* offerRuntimeEvent({ + ...makeEventBase(context), + ...(context.turnState ? { turnId: context.turnState.turnId } : {}), + type: "thread.metadata.updated", + payload: { + name: event.name, + ...(event.metadata ? { metadata: event.metadata } : {}), + }, + raw: { + source: "acp.jsonrpc", + method: "session/update", + payload: event.rawPayload, + }, + }); + return; + } + }); + + const resolveSessionFilePath = Effect.fn("resolveSessionFilePath")(function* ( + context: GeminiSessionContext, + options?: { readonly retries?: number }, + ) { + const retries = options?.retries ?? 0; + for (let attempt = 0; attempt <= retries; attempt += 1) { + const resolvedPath = yield* Effect.tryPromise({ + try: () => findGeminiSessionFileById(context.sessionId, context.sessionFilePath), + catch: (cause) => + new ProviderAdapterProcessError({ + provider: PROVIDER, + threadId: context.threadId, + detail: `Failed to locate Gemini session file: ${toMessage(cause, "lookup failed")}`, + cause, + }), + }); + if (resolvedPath) { + context.sessionFilePath = resolvedPath; + return resolvedPath; + } + if (attempt < retries) { + yield* Effect.sleep(100); + } + } + return undefined; + }); + + const persistTurnSnapshot = Effect.fn("persistTurnSnapshot")(function* ( + context: GeminiSessionContext, + turnId: TurnId, + items: ReadonlyArray, + ) { + const storedTurnBase: GeminiStoredTurn = { + id: turnId, + items: cloneGeminiTurnItems(items), + }; + const liveSessionFilePath = yield* resolveSessionFilePath(context, { retries: 5 }); + if (!liveSessionFilePath) { + return storedTurnBase; + } + + const snapshotSessionId = crypto.randomUUID(); + const snapshotFilePath = yield* Effect.tryPromise({ + try: () => cloneGeminiSessionFile(liveSessionFilePath, snapshotSessionId), + catch: (cause) => + new ProviderAdapterProcessError({ + provider: PROVIDER, + threadId: context.threadId, + detail: `Failed to snapshot Gemini session history: ${toMessage(cause, "snapshot failed")}`, + cause, + }), + }); + + return { + ...storedTurnBase, + snapshotSessionId, + snapshotFilePath, + } satisfies GeminiStoredTurn; + }); + + const finishTurn = ( + context: GeminiSessionContext, + result: { + readonly state: "completed" | "failed" | "cancelled" | "interrupted"; + readonly stopReason?: string | null; + readonly usage?: unknown; + readonly errorMessage?: string; + }, + options?: { + readonly persistSnapshot?: boolean; + readonly emitReadyState?: boolean; + }, + ) => + Effect.gen(function* () { + const turnState = context.turnState; + if (!turnState) { + return; + } + context.turnState = undefined; + + for (const item of turnState.items) { + if (item.itemType !== "assistant_message" || item.status !== "inProgress") { + continue; + } + item.status = result.state === "failed" ? "failed" : "completed"; + yield* offerRuntimeEvent({ + ...makeEventBase(context), + turnId: turnState.turnId, + itemId: RuntimeItemId.make(item.id), + type: "item.completed", + payload: { + itemType: "assistant_message", + status: item.status, + ...(item.title ? { title: item.title } : {}), + }, + }); + } + + if (turnState.reasoningItemId && turnState.reasoningTextStarted) { + upsertGeminiTurnItem(turnState, turnState.reasoningItemId, "reasoning", { + status: result.state === "failed" ? "failed" : "completed", + }); + yield* offerRuntimeEvent({ + ...makeEventBase(context), + turnId: turnState.turnId, + itemId: turnState.reasoningItemId, + type: "item.completed", + payload: { + itemType: "reasoning", + status: result.state === "failed" ? "failed" : "completed", + title: "Reasoning", + }, + }); + } + + if ( + !turnState.proposedPlanCaptured && + turnState.isPlanTurn && + result.state === "completed" + ) { + const planMarkdown = proposedPlanMarkdownFromGeminiTurn(turnState); + if (planMarkdown) { + turnState.proposedPlanCaptured = true; + yield* offerRuntimeEvent({ + ...makeEventBase(context), + turnId: turnState.turnId, + type: "turn.proposed.completed", + payload: { + planMarkdown, + }, + }); + } + } + + const normalizedUsage = normalizeGeminiPromptUsage(result.usage); + if (normalizedUsage) { + context.cumulativePromptUsage = accumulateGeminiPromptUsage( + context.cumulativePromptUsage, + normalizedUsage, + ); + yield* emitUsage( + context, + buildGeminiPromptUsageSnapshot( + context.lastKnownTokenUsage, + context.cumulativePromptUsage, + normalizedUsage, + ), + turnState.turnId, + result.usage, + ); + } + + yield* offerRuntimeEvent({ + ...makeEventBase(context), + turnId: turnState.turnId, + type: "turn.completed", + payload: { + state: result.state, + ...(result.stopReason !== undefined ? { stopReason: result.stopReason } : {}), + ...(result.usage !== undefined ? { usage: result.usage } : {}), + ...(result.errorMessage ? { errorMessage: result.errorMessage } : {}), + }, + }); + + if (options?.persistSnapshot !== false) { + const storedTurn = yield* persistTurnSnapshot( + context, + turnState.turnId, + turnState.items, + ).pipe( + Effect.catch((error) => + emitRuntimeWarning(context, error.message, { + method: "session/snapshot", + payload: { + message: error.message, + }, + }).pipe( + Effect.as({ + id: turnState.turnId, + items: cloneGeminiTurnItems(turnState.items), + } satisfies GeminiStoredTurn), + ), + ), + ); + + context.turns.push(storedTurn); + } + + updateGeminiSession(context, { + ...(options?.emitReadyState === false ? {} : { status: "ready" as const }), + activeTurnId: undefined, + resumeCursor: buildResumeCursor(context), + lastError: result.state === "failed" ? result.errorMessage : undefined, + }); + + if (options?.emitReadyState === false) { + return; + } + + yield* emitSessionState(context, "ready"); + }); + + const stopSessionInternal = ( + context: GeminiSessionContext, + options?: { + readonly emitExitEvent?: boolean; + }, + ) => + Effect.gen(function* () { + if (context.stopped) { + return; + } + context.stopped = true; + if (context.turnState) { + const interruptedTurnId = context.turnState.turnId; + context.interruptedTurnIds.add(interruptedTurnId); + yield* finishTurn( + context, + { + state: "interrupted", + stopReason: "cancelled", + }, + { + persistSnapshot: false, + emitReadyState: false, + }, + ); + } + yield* settlePendingApprovalsAsCancelled(context.pendingApprovals); + if (context.notificationFiber) { + yield* Fiber.interrupt(context.notificationFiber); + } + yield* Effect.ignore(Scope.close(context.scope, Exit.void)); + cleanupGeminiSystemSettings(context.systemSettingsPath); + context.systemSettingsPath = undefined; + updateGeminiSession(context, { + status: "closed", + activeTurnId: undefined, + }); + if (sessions.get(context.threadId) === context) { + sessions.delete(context.threadId); + } + if (options?.emitExitEvent === false) { + return; + } + yield* offerRuntimeEvent({ + ...makeEventBase(context), + type: "session.exited", + payload: { + exitKind: "graceful", + }, + }); + }); + + const setGeminiMode = (context: GeminiSessionContext, modeId: string) => + Effect.gen(function* () { + if (!modeId || context.currentModeId === modeId) { + return; + } + yield* context.acp + .request("session/set_mode", { + sessionId: context.sessionId, + modeId, + }) + .pipe( + Effect.mapError((cause) => + mapAcpToAdapterError(PROVIDER, context.threadId, "session/set_mode", cause), + ), + ); + context.currentModeId = modeId; + }); + + const setGeminiModel = ( + context: GeminiSessionContext, + input: { + readonly model: string; + readonly acpModelId: string; + }, + ) => + Effect.gen(function* () { + if (!input.acpModelId || context.currentModelId === input.acpModelId) { + if (context.session.model !== input.model) { + updateGeminiSession(context, { model: input.model }); + } + return; + } + yield* context.acp + .request("session/set_model", { + sessionId: context.sessionId, + modelId: input.acpModelId, + }) + .pipe( + Effect.mapError((cause) => + mapAcpToAdapterError(PROVIDER, context.threadId, "session/set_model", cause), + ), + ); + context.currentModelId = input.acpModelId; + updateGeminiSession(context, { model: input.model }); + }); + + const buildPromptBlocks = Effect.fn("buildPromptBlocks")(function* ( + input: ProviderSendTurnInput, + ) { + const blocks: Array = []; + + if (trimToUndefined(input.input)) { + blocks.push({ + type: "text", + text: trimToUndefined(input.input) as string, + }); + } + + for (const attachment of input.attachments ?? []) { + if (attachment.type !== "image") { + continue; + } + const attachmentPath = resolveAttachmentPath({ + attachmentsDir: serverConfig.attachmentsDir, + attachment, + }); + if (!attachmentPath) { + return yield* new ProviderAdapterRequestError({ + provider: PROVIDER, + method: "session/prompt", + detail: `Invalid attachment id '${attachment.id}'.`, + }); + } + const bytes = yield* fileSystem.readFile(attachmentPath).pipe( + Effect.mapError( + (cause) => + new ProviderAdapterRequestError({ + provider: PROVIDER, + method: "session/prompt", + detail: cause.message, + cause, + }), + ), + ); + blocks.push({ + type: "image", + data: Buffer.from(bytes).toString("base64"), + mimeType: attachment.mimeType, + }); + } + + return blocks; + }); + + const runPromptTurn = ( + context: GeminiSessionContext, + turnId: TurnId, + prompt: ReadonlyArray, + ) => + Effect.gen(function* () { + const promptResult = yield* Effect.result( + context.acp + .prompt({ prompt }) + .pipe( + Effect.mapError((cause) => + mapAcpToAdapterError(PROVIDER, context.threadId, "session/prompt", cause), + ), + ), + ); + if (promptResult._tag === "Failure") { + if (context.interruptedTurnIds.delete(turnId)) { + return; + } + const error = promptResult.failure; + const message = toMessage(error, "Gemini turn failed."); + yield* emitRuntimeError(context, message, error, turnId); + yield* finishTurn(context, { + state: "failed", + errorMessage: message, + }); + return; + } + + const response = promptResult.success; + const responseRecord = asRecord(response); + const stopReason = + typeof responseRecord?.stopReason === "string" ? responseRecord.stopReason : null; + if (context.interruptedTurnIds.delete(turnId)) { + return; + } + // Let queued ACP session updates land on the notification fiber before + // finalizing the turn so derived plan/message state is complete. + yield* Effect.sleep("10 millis"); + yield* finishTurn(context, { + state: stopReason === "cancelled" ? "cancelled" : "completed", + stopReason, + usage: responseRecord?.usage, + }); + }); + + const createGeminiSessionContext = (input: { + readonly threadId: ThreadId; + readonly runtimeMode: ProviderSession["runtimeMode"]; + readonly runtimeModeId: string; + readonly cwd: string; + readonly binaryPath: string; + readonly env?: NodeJS.ProcessEnv | Readonly>; + readonly turns?: ReadonlyArray; + readonly resumeSessionId?: string; + readonly allowResumeFallback?: boolean; + readonly selectedModel?: string; + readonly selectedApiModelId?: string; + readonly sessionFilePath?: string; + readonly systemSettingsPath?: string; + }) => + Effect.gen(function* () { + const pendingApprovals = new Map(); + const sessionScope = yield* Scope.make("sequential"); + let sessionScopeTransferred = false; + yield* Effect.addFinalizer(() => + sessionScopeTransferred + ? Effect.void + : Scope.close(sessionScope, Exit.void).pipe( + Effect.tap(() => + Effect.sync(() => cleanupGeminiSystemSettings(input.systemSettingsPath)), + ), + Effect.ignore, + ), + ); + + let context!: GeminiSessionContext; + const spawnEnv = toSpawnEnv(input.env); + const acp = yield* makeGeminiAcpRuntime({ + childProcessSpawner, + binaryPath: input.binaryPath, + cwd: input.cwd, + approvalMode: getGeminiCliApprovalModeFlag(input.runtimeModeId), + ...(spawnEnv ? { env: spawnEnv } : {}), + ...(input.resumeSessionId ? { resumeSessionId: input.resumeSessionId } : {}), + ...(input.allowResumeFallback !== undefined + ? { allowResumeFallback: input.allowResumeFallback } + : {}), + clientInfo: { name: "t3-code", version: "0.0.0" }, + clientCapabilities: { + fs: { readTextFile: false, writeTextFile: false }, + terminal: false, + auth: { terminal: false }, + }, + ...makeAcpNativeLoggers({ + nativeEventLogger, + provider: PROVIDER, + threadId: input.threadId, + }), + }).pipe( + Effect.provideService(Scope.Scope, sessionScope), + Effect.mapError( + (cause) => + new ProviderAdapterProcessError({ + provider: PROVIDER, + threadId: input.threadId, + detail: cause.message, + cause, + }), + ), + ); + + yield* acp.handleRequestPermission((params) => + Effect.gen(function* () { + const permissionRequest = parsePermissionRequest(params); + const approvalRequestId = ApprovalRequestId.make( + `gemini-approval-${crypto.randomUUID()}`, + ); + const runtimeRequestId = RuntimeRequestId.make(approvalRequestId); + const decision = yield* Deferred.make(); + pendingApprovals.set(approvalRequestId, { decision }); + const detail = isAskUserPermissionRequest(permissionRequest) + ? "Gemini CLI requested user input, but Gemini ACP did not include the question payload. Accepting this request will continue with an empty answer set." + : (permissionRequest.detail ?? String(params).slice(0, 2000)); + + yield* offerRuntimeEvent( + makeAcpRequestOpenedEvent({ + stamp: makeEventStamp(), + provider: PROVIDER, + threadId: input.threadId, + turnId: context?.turnState?.turnId, + requestId: runtimeRequestId, + permissionRequest, + detail, + args: { + ...(permissionRequest.toolCall ? { toolCall: permissionRequest.toolCall } : {}), + options: params.options, + }, + source: "acp.jsonrpc", + method: "session/request_permission", + rawPayload: params, + }), + ); + + const resolved = yield* Deferred.await(decision); + pendingApprovals.delete(approvalRequestId); + + yield* offerRuntimeEvent( + makeAcpRequestResolvedEvent({ + stamp: makeEventStamp(), + provider: PROVIDER, + threadId: input.threadId, + turnId: context?.turnState?.turnId, + requestId: runtimeRequestId, + permissionRequest, + decision: resolved, + }), + ); + + return permissionOutcomeFromGeminiOptions(resolved, params.options); + }), + ); + + const started = yield* acp.start().pipe( + Effect.mapError( + (cause) => + new ProviderAdapterProcessError({ + provider: PROVIDER, + threadId: input.threadId, + detail: cause.message, + cause, + }), + ), + ); + + const now = DateTime.formatIso(yield* DateTime.now); + const sessionSetupRecord = asRecord(started.sessionSetupResult); + context = { + threadId: input.threadId, + session: { + provider: PROVIDER, + status: "connecting", + runtimeMode: input.runtimeMode, + cwd: input.cwd, + threadId: input.threadId, + createdAt: now, + updatedAt: now, + }, + scope: sessionScope, + acp, + notificationFiber: undefined, + pendingApprovals, + turns: (input.turns ?? []).map(cloneGeminiStoredTurn), + runtimeModeId: input.runtimeModeId, + sessionId: started.sessionId, + currentModeId: trimToUndefined(asRecord(sessionSetupRecord?.modes)?.currentModeId), + currentModelId: trimToUndefined(asRecord(sessionSetupRecord?.models)?.currentModelId), + turnState: undefined, + sessionFilePath: input.sessionFilePath, + systemSettingsPath: input.systemSettingsPath, + stopped: false, + interruptedTurnIds: new Set(), + lastKnownTokenUsage: undefined, + cumulativePromptUsage: undefined, + }; + + context.notificationFiber = yield* Stream.runDrain( + Stream.mapEffect(acp.getEvents(), (event) => handleAcpEvent(context, event)), + ).pipe(Effect.forkChild); + + yield* setGeminiMode(context, input.runtimeModeId); + if (input.selectedModel) { + yield* setGeminiModel(context, { + model: input.selectedModel, + acpModelId: input.selectedApiModelId ?? input.selectedModel, + }); + } + + updateGeminiSession(context, { + status: "ready", + ...(input.selectedModel + ? { model: input.selectedModel } + : context.currentModelId + ? { model: context.currentModelId } + : {}), + resumeCursor: buildResumeCursor(context), + }); + + sessionScopeTransferred = true; + return context; + }); + + const startSession: GeminiAdapterShape["startSession"] = (input) => + withThreadLock( + input.threadId, + Effect.gen(function* () { + if (input.provider && input.provider !== PROVIDER) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "startSession", + issue: `Expected provider '${PROVIDER}' but received '${input.provider}'.`, + }); + } + + const existing = sessions.get(input.threadId); + if (existing && !existing.stopped) { + yield* stopSessionInternal(existing, { emitExitEvent: false }); + } + + const geminiSettings = yield* getGeminiSettings(input.threadId); + const binaryPath = resolveGeminiBinaryPath(geminiSettings.binaryPath); + const cwd = path.resolve(input.cwd ?? process.cwd()); + const runtimeModeId = runtimeModeToGeminiModeId(input.runtimeMode); + const selectedGeminiModel = input.modelSelection ? input.modelSelection.model : undefined; + const selectedApiModelId = input.modelSelection + ? resolveGeminiApiModelId(input.modelSelection.model, input.modelSelection.options) + : undefined; + const requestedResumeSessionId = readGeminiResumeSessionId(input.resumeCursor); + const resumeTurns = readLegacyGeminiResumeTurns(input.resumeCursor); + const launchConfig = yield* prepareGeminiLaunchConfig({ + threadId: input.threadId, + ...(selectedGeminiModel ? { selectedModel: selectedGeminiModel } : {}), + }); + + const context = yield* createGeminiSessionContext({ + threadId: input.threadId, + runtimeMode: input.runtimeMode, + runtimeModeId, + cwd, + binaryPath, + env: { ...launchEnvironment, ...(launchConfig.env ?? {}) }, + turns: resumeTurns, + ...(requestedResumeSessionId ? { resumeSessionId: requestedResumeSessionId } : {}), + allowResumeFallback: true, + ...(selectedGeminiModel ? { selectedModel: selectedGeminiModel } : {}), + ...(selectedApiModelId ? { selectedApiModelId } : {}), + ...(launchConfig.systemSettingsPath + ? { systemSettingsPath: launchConfig.systemSettingsPath } + : {}), + }); + + sessions.set(input.threadId, context); + + yield* offerRuntimeEvent({ + ...makeEventBase(context), + type: "session.started", + payload: input.resumeCursor !== undefined ? { resume: input.resumeCursor } : {}, + }); + yield* offerRuntimeEvent({ + ...makeEventBase(context), + type: "session.configured", + payload: { + config: { + cwd, + modeId: context.currentModeId ?? runtimeModeId, + ...(context.session.model ? { model: context.session.model } : {}), + }, + }, + }); + yield* emitSessionState(context, "ready"); + yield* offerRuntimeEvent({ + ...makeEventBase(context), + type: "thread.started", + payload: { + providerThreadId: context.sessionId, + }, + }); + + return context.session; + }).pipe(Effect.scoped), + ); + + const sendTurn: GeminiAdapterShape["sendTurn"] = (input) => + withThreadLock( + input.threadId, + Effect.gen(function* () { + const context = yield* requireSession(input.threadId); + if (context.turnState) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "sendTurn", + issue: "A Gemini turn is already in progress for this thread.", + }); + } + + if (input.modelSelection) { + yield* setGeminiModel(context, { + model: input.modelSelection.model, + acpModelId: resolveGeminiApiModelId( + input.modelSelection.model, + input.modelSelection.options, + ), + }); + } + + const requestedModeId = resolveRequestedGeminiModeId({ + interactionMode: input.interactionMode, + runtimeModeId: context.runtimeModeId, + currentModeId: context.currentModeId, + }); + if (requestedModeId) { + yield* setGeminiMode(context, requestedModeId); + } + + const prompt = yield* buildPromptBlocks(input); + if (prompt.length === 0) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "sendTurn", + issue: "Either input text or at least one attachment is required.", + }); + } + + const turnId = TurnId.make(crypto.randomUUID()); + context.turnState = { + turnId, + isPlanTurn: context.currentModeId === "plan", + reasoningItemId: undefined, + items: [], + reasoningTextStarted: false, + latestPlanUpdate: undefined, + proposedPlanCaptured: false, + }; + updateGeminiSession(context, { + status: "running", + activeTurnId: turnId, + lastError: undefined, + }); + + yield* emitSessionState(context, "running"); + yield* offerRuntimeEvent({ + ...makeEventBase(context), + turnId, + type: "turn.started", + payload: context.session.model ? { model: context.session.model } : {}, + }); + + runFork(runPromptTurn(context, turnId, prompt)); + + return { + threadId: input.threadId, + turnId, + resumeCursor: buildResumeCursor(context), + }; + }), + ); + + const interruptTurn: GeminiAdapterShape["interruptTurn"] = (threadId, turnId) => + withThreadLock( + threadId, + Effect.gen(function* () { + const context = yield* requireSession(threadId); + if (turnId && context.turnState && context.turnState.turnId !== turnId) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "interruptTurn", + issue: `Turn '${turnId}' is not active for thread '${threadId}'.`, + }); + } + if (!context.turnState) { + return; + } + yield* settlePendingApprovalsAsCancelled(context.pendingApprovals); + const interruptedTurnId = context.turnState.turnId; + // ACP cancellation is a fire-and-forget notification. Gemini CLI can + // ignore it and continue running, so if the turn is still active + // after the notification is sent we finalize it locally and tear down + // the session to make the stop button deterministic. + yield* Effect.ignore(context.acp.cancel); + if (context.turnState?.turnId !== interruptedTurnId) { + return; + } + context.interruptedTurnIds.add(interruptedTurnId); + yield* finishTurn( + context, + { + state: "interrupted", + stopReason: "cancelled", + }, + { + persistSnapshot: false, + emitReadyState: false, + }, + ); + yield* stopSessionInternal(context); + }), + ); + + const respondToRequest: GeminiAdapterShape["respondToRequest"] = ( + threadId, + requestId, + decision, + ) => + withThreadLock( + threadId, + Effect.gen(function* () { + const context = yield* requireSession(threadId); + const pending = context.pendingApprovals.get(requestId); + if (!pending) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "respondToRequest", + issue: `Unknown Gemini approval request '${requestId}'.`, + }); + } + yield* Deferred.succeed(pending.decision, decision); + }), + ); + + const respondToUserInput: GeminiAdapterShape["respondToUserInput"] = ( + _threadId, + _requestId, + _answers, + ) => + Effect.fail( + new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "respondToUserInput", + issue: + "Gemini ACP does not expose structured user-input answers. Gemini Ask User requests can only be approved or declined.", + }), + ); + + const stopSession: GeminiAdapterShape["stopSession"] = (threadId) => + withThreadLock( + threadId, + Effect.gen(function* () { + const context = yield* requireSession(threadId); + yield* stopSessionInternal(context); + }), + ); + + const listSessions: GeminiAdapterShape["listSessions"] = () => + Effect.sync(() => + Array.from(sessions.values()) + .filter((context) => !context.stopped) + .map((context) => Object.assign({}, context.session)), + ); + + const hasSession: GeminiAdapterShape["hasSession"] = (threadId) => + Effect.sync(() => { + const context = sessions.get(threadId); + return context !== undefined && !context.stopped; + }); + + const readThread: GeminiAdapterShape["readThread"] = (threadId) => + Effect.gen(function* () { + const context = yield* requireSession(threadId); + return snapshotThread(context); + }); + + const rollbackThread: GeminiAdapterShape["rollbackThread"] = (threadId, numTurns) => + withThreadLock( + threadId, + Effect.gen(function* () { + const context = yield* requireSession(threadId); + if (!Number.isInteger(numTurns) || numTurns < 1) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "rollbackThread", + issue: "numTurns must be an integer >= 1.", + }); + } + if (context.turnState) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "rollbackThread", + issue: "Cannot roll back a Gemini thread while a turn is in progress.", + }); + } + + const nextLength = Math.max(0, context.turns.length - numTurns); + const nextTurns = context.turns.slice(0, nextLength).map(cloneGeminiStoredTurn); + const cwd = context.session.cwd ?? process.cwd(); + const geminiSettings = yield* getGeminiSettings(threadId); + + let resumeSessionId: string | undefined; + let sessionFilePath: string | undefined; + if (nextLength > 0) { + const targetTurn = nextTurns[nextLength - 1]; + if (!targetTurn?.snapshotSessionId) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "rollbackThread", + issue: "Gemini session snapshot is unavailable for the requested rollback target.", + }); + } + const targetSnapshotSessionId = targetTurn.snapshotSessionId; + + const sourceSnapshotPath = yield* Effect.tryPromise({ + try: () => + findGeminiSessionFileById(targetSnapshotSessionId, targetTurn.snapshotFilePath), + catch: (cause) => + new ProviderAdapterProcessError({ + provider: PROVIDER, + threadId, + detail: `Failed to locate Gemini rollback snapshot: ${toMessage(cause, "lookup failed")}`, + cause, + }), + }); + if (!sourceSnapshotPath) { + return yield* new ProviderAdapterValidationError({ + provider: PROVIDER, + operation: "rollbackThread", + issue: "Gemini rollback snapshot file could not be found.", + }); + } + + resumeSessionId = crypto.randomUUID(); + sessionFilePath = yield* Effect.tryPromise({ + try: () => cloneGeminiSessionFile(sourceSnapshotPath, resumeSessionId as string), + catch: (cause) => + new ProviderAdapterProcessError({ + provider: PROVIDER, + threadId, + detail: `Failed to restore Gemini rollback snapshot: ${toMessage(cause, "restore failed")}`, + cause, + }), + }); + } + + const launchConfig = yield* prepareGeminiLaunchConfig({ + threadId, + ...(context.session.model ? { selectedModel: context.session.model } : {}), + }); + const binaryPath = resolveGeminiBinaryPath(geminiSettings.binaryPath); + + let nextContextRegistered = false; + const nextContext = yield* createGeminiSessionContext({ + threadId, + runtimeMode: context.session.runtimeMode, + runtimeModeId: context.runtimeModeId, + cwd, + binaryPath, + env: { ...launchEnvironment, ...(launchConfig.env ?? {}) }, + turns: nextTurns, + ...(resumeSessionId ? { resumeSessionId } : {}), + allowResumeFallback: false, + ...(context.session.model ? { selectedModel: context.session.model } : {}), + ...(context.currentModelId ? { selectedApiModelId: context.currentModelId } : {}), + ...(sessionFilePath ? { sessionFilePath } : {}), + ...(launchConfig.systemSettingsPath + ? { systemSettingsPath: launchConfig.systemSettingsPath } + : {}), + }).pipe( + Effect.tap((createdContext) => + Effect.addFinalizer(() => + nextContextRegistered + ? Effect.void + : stopSessionInternal(createdContext, { emitExitEvent: false }).pipe( + Effect.ignore, + ), + ), + ), + Effect.uninterruptible, + ); + + yield* stopSessionInternal(context, { emitExitEvent: false }); + yield* Effect.sync(() => { + sessions.set(threadId, nextContext); + nextContextRegistered = true; + }); + + return snapshotThread(nextContext); + }).pipe(Effect.scoped), + ); + + const stopAll: GeminiAdapterShape["stopAll"] = () => + Effect.forEach(sessions.values(), (context) => stopSessionInternal(context), { + discard: true, + }).pipe(Effect.asVoid); + + yield* Effect.addFinalizer(() => + Effect.forEach(sessions.values(), (context) => stopSessionInternal(context), { + discard: true, + }).pipe( + Effect.tap(() => managedNativeEventLogger?.close() ?? Effect.void), + Effect.tap(() => Queue.shutdown(runtimeEventQueue)), + ), + ); + + return { + provider: PROVIDER, + capabilities: { + sessionModelSwitch: "in-session", + }, + startSession, + sendTurn, + interruptTurn, + respondToRequest, + respondToUserInput, + stopSession, + listSessions, + hasSession, + readThread, + rollbackThread, + stopAll, + get streamEvents() { + return Stream.fromQueue(runtimeEventQueue); + }, + } satisfies GeminiAdapterShape; + }); +} + +export function makeGeminiAdapterLive(options?: GeminiAdapterLiveOptions) { + return Layer.effect( + GeminiAdapter, + Effect.gen(function* () { + const serverSettings = yield* ServerSettingsService; + const settings = yield* serverSettings.getSettings; + return yield* makeGeminiAdapter(settings.providers.gemini, options); + }), + ); +} diff --git a/apps/server/src/provider/Layers/GeminiProvider.test.ts b/apps/server/src/provider/Layers/GeminiProvider.test.ts new file mode 100644 index 00000000000..ee941bab6b2 --- /dev/null +++ b/apps/server/src/provider/Layers/GeminiProvider.test.ts @@ -0,0 +1,294 @@ +import { describe, it, assert } from "@effect/vitest"; +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Sink from "effect/Sink"; +import * as Stream from "effect/Stream"; +import { ChildProcessSpawner } from "effect/unstable/process"; + +import { checkGeminiProviderStatus } from "./GeminiProvider.ts"; +import { + DEFAULT_GEMINI_MODEL_CAPABILITIES, + GEMINI_2_5_MODEL_CAPABILITIES, + GEMINI_3_MODEL_CAPABILITIES, + geminiCapabilitiesForModel, + parseGeminiAcpProbeError, + parseGeminiDiscoveredModels, +} from "../geminiAcpProbe.ts"; +import { ServerConfig, type ServerConfigShape } from "../../config.ts"; + +const encoder = new TextEncoder(); + +function mockHandle(result: { stdout: string; stderr: string; code: number }) { + return ChildProcessSpawner.makeHandle({ + pid: ChildProcessSpawner.ProcessId(1), + exitCode: Effect.succeed(ChildProcessSpawner.ExitCode(result.code)), + isRunning: Effect.succeed(false), + kill: () => Effect.void, + unref: Effect.succeed(Effect.void), + stdin: Sink.drain, + stdout: Stream.make(encoder.encode(result.stdout)), + stderr: Stream.make(encoder.encode(result.stderr)), + all: Stream.empty, + getInputFd: () => Sink.drain, + getOutputFd: () => Stream.empty, + }); +} + +function mockSpawnerLayer( + handler: (input: { command: string; args: ReadonlyArray }) => { + stdout: string; + stderr: string; + code: number; + }, +) { + return Layer.succeed( + ChildProcessSpawner.ChildProcessSpawner, + ChildProcessSpawner.make((command) => { + const cmd = command as unknown as { command: string; args: ReadonlyArray }; + return Effect.succeed(mockHandle(handler({ command: cmd.command, args: cmd.args }))); + }), + ); +} + +function makeServerConfigLayer(cwd: string) { + return Layer.succeed(ServerConfig, { + logLevel: "Error", + traceMinLevel: "Info", + traceTimingEnabled: true, + traceBatchWindowMs: 200, + traceMaxBytes: 10 * 1024 * 1024, + traceMaxFiles: 10, + otlpTracesUrl: undefined, + otlpMetricsUrl: undefined, + otlpExportIntervalMs: 10_000, + otlpServiceName: "t3-server", + mode: "web", + port: 0, + host: undefined, + cwd, + baseDir: "/tmp/t3code-gemini-provider-test", + staticDir: undefined, + devUrl: undefined, + noBrowser: false, + startupPresentation: "browser", + desktopBootstrapToken: undefined, + autoBootstrapProjectFromCwd: false, + logWebSocketEvents: false, + stateDir: "/tmp/t3code-gemini-provider-test/state", + dbPath: "/tmp/t3code-gemini-provider-test/state/state.sqlite", + keybindingsConfigPath: "/tmp/t3code-gemini-provider-test/state/keybindings.json", + settingsPath: "/tmp/t3code-gemini-provider-test/state/settings.json", + providerStatusCacheDir: "/tmp/t3code-gemini-provider-test/caches", + worktreesDir: "/tmp/t3code-gemini-provider-test/worktrees", + attachmentsDir: "/tmp/t3code-gemini-provider-test/state/attachments", + logsDir: "/tmp/t3code-gemini-provider-test/state/logs", + serverLogPath: "/tmp/t3code-gemini-provider-test/state/logs/server.log", + serverTracePath: "/tmp/t3code-gemini-provider-test/state/logs/server.trace.ndjson", + providerLogsDir: "/tmp/t3code-gemini-provider-test/state/logs/provider", + providerEventLogPath: "/tmp/t3code-gemini-provider-test/state/logs/provider/events.log", + terminalLogsDir: "/tmp/t3code-gemini-provider-test/state/logs/terminals", + anonymousIdPath: "/tmp/t3code-gemini-provider-test/state/anonymous-id", + environmentIdPath: "/tmp/t3code-gemini-provider-test/state/environment-id", + serverRuntimeStatePath: "/tmp/t3code-gemini-provider-test/state/server-runtime.json", + secretsDir: "/tmp/t3code-gemini-provider-test/state/secrets", + tailscaleServeEnabled: false, + tailscaleServePort: 0, + } satisfies ServerConfigShape); +} + +describe("parseGeminiDiscoveredModels", () => { + it("deduplicates discovered Gemini models and ignores malformed entries", () => { + const models = parseGeminiDiscoveredModels({ + models: { + availableModels: [ + { modelId: "auto-gemini-3", name: "Auto (Gemini 3)" }, + { modelId: "gemini-2.5-pro" }, + { modelId: "auto-gemini-3", name: "Ignored duplicate" }, + { name: "Missing model id" }, + { modelId: " " }, + ], + }, + }); + + assert.deepStrictEqual(models, [ + { + slug: "auto-gemini-3", + name: "Auto (Gemini 3)", + isCustom: false, + capabilities: GEMINI_3_MODEL_CAPABILITIES, + }, + { + slug: "gemini-2.5-pro", + name: "Gemini 2.5 Pro", + isCustom: false, + capabilities: GEMINI_2_5_MODEL_CAPABILITIES, + }, + ]); + }); +}); + +describe("geminiCapabilitiesForModel", () => { + it("classifies Gemini 3 and Gemini 2.5 families for thinking controls", () => { + assert.deepStrictEqual( + geminiCapabilitiesForModel("auto-gemini-3"), + GEMINI_3_MODEL_CAPABILITIES, + ); + assert.deepStrictEqual( + geminiCapabilitiesForModel("gemini-2.5-flash-lite"), + GEMINI_2_5_MODEL_CAPABILITIES, + ); + assert.deepStrictEqual( + geminiCapabilitiesForModel("custom-model"), + DEFAULT_GEMINI_MODEL_CAPABILITIES, + ); + }); +}); + +describe("parseGeminiAcpProbeError", () => { + it("maps Gemini ACP auth errors to an unauthenticated provider state", () => { + const parsed = parseGeminiAcpProbeError({ + code: -32_000, + message: "Authentication required: Gemini API key is missing or not configured.", + }); + + assert.strictEqual(parsed.status, "error"); + assert.strictEqual(parsed.auth.status, "unauthenticated"); + assert.strictEqual( + parsed.message, + "Gemini is not authenticated. Authentication required: Gemini API key is missing or not configured.", + ); + }); +}); + +describe("checkGeminiProviderStatus", () => { + it.effect("falls back to the system gemini binary when the configured path is blank", () => { + const commands: string[] = []; + const probedBinaryPaths: string[] = []; + const probedCwds: string[] = []; + const projectCwd = "/tmp/t3code-gemini-project"; + + return Effect.gen(function* () { + const status = yield* checkGeminiProviderStatus( + { enabled: true, binaryPath: "", customModels: [] }, + undefined, + (input) => { + probedBinaryPaths.push(input.binaryPath); + probedCwds.push(input.cwd); + return Effect.succeed({ + status: "ready" as const, + auth: { status: "authenticated" as const }, + message: "Gemini CLI is installed and authenticated.", + models: [], + }); + }, + ); + + assert.strictEqual(status.status, "ready"); + assert.deepStrictEqual(commands, ["gemini"]); + assert.deepStrictEqual(probedBinaryPaths, ["gemini"]); + assert.deepStrictEqual(probedCwds, [projectCwd]); + }).pipe( + Effect.provide( + Layer.mergeAll( + makeServerConfigLayer(projectCwd), + mockSpawnerLayer(({ command, args }) => { + commands.push(command); + const joined = args.join(" "); + if (joined === "--version") { + return { stdout: "gemini 0.37.1\n", stderr: "", code: 0 }; + } + throw new Error(`Unexpected args: ${joined}`); + }), + ), + ), + ); + }); + + it.effect("publishes Gemini models discovered from ACP and merges custom models", () => + Effect.gen(function* () { + const status = yield* checkGeminiProviderStatus( + { + enabled: true, + binaryPath: "gemini", + customModels: ["gemini-custom-preview", "auto-gemini-next"], + }, + undefined, + () => + Effect.succeed({ + status: "ready" as const, + auth: { status: "authenticated" as const }, + message: "Gemini CLI is installed and authenticated.", + models: [ + { + slug: "auto-gemini-next", + name: "Auto (Gemini Next)", + isCustom: false, + capabilities: DEFAULT_GEMINI_MODEL_CAPABILITIES, + }, + { + slug: "gemini-4-pro", + name: "Gemini 4 Pro", + isCustom: false, + capabilities: DEFAULT_GEMINI_MODEL_CAPABILITIES, + }, + ], + }), + ); + + assert.strictEqual(status.status, "ready"); + assert.strictEqual(status.auth.status, "authenticated"); + assert.deepStrictEqual( + status.models.map((model) => model.slug), + ["auto-gemini-next", "gemini-4-pro", "gemini-custom-preview"], + ); + }).pipe( + Effect.provide( + Layer.mergeAll( + makeServerConfigLayer("/tmp/t3code-gemini-models"), + mockSpawnerLayer(({ args }) => { + const joined = args.join(" "); + if (joined === "--version") { + return { stdout: "gemini 0.37.1\n", stderr: "", code: 0 }; + } + throw new Error(`Unexpected args: ${joined}`); + }), + ), + ), + ), + ); + + it.effect( + "does not fall back to a hardcoded Gemini model list when ACP discovery is unavailable", + () => + Effect.gen(function* () { + const status = yield* checkGeminiProviderStatus( + { enabled: true, binaryPath: "gemini", customModels: [] }, + undefined, + () => + Effect.succeed({ + status: "warning" as const, + auth: { status: "unknown" as const }, + message: + "Gemini CLI is installed, but T3 Code could not verify authentication or discover models. Timed out while starting Gemini ACP session.", + models: [], + }), + ); + + assert.strictEqual(status.status, "warning"); + assert.strictEqual(status.models.length, 0); + }).pipe( + Effect.provide( + Layer.mergeAll( + makeServerConfigLayer("/tmp/t3code-gemini-unavailable"), + mockSpawnerLayer(({ args }) => { + const joined = args.join(" "); + if (joined === "--version") { + return { stdout: "gemini 0.37.1\n", stderr: "", code: 0 }; + } + throw new Error(`Unexpected args: ${joined}`); + }), + ), + ), + ), + ); +}); diff --git a/apps/server/src/provider/Layers/GeminiProvider.ts b/apps/server/src/provider/Layers/GeminiProvider.ts new file mode 100644 index 00000000000..b60359e09ea --- /dev/null +++ b/apps/server/src/provider/Layers/GeminiProvider.ts @@ -0,0 +1,215 @@ +import { ProviderDriverKind, type GeminiSettings } from "@t3tools/contracts"; +import { formatGeminiModelDisplayName } from "@t3tools/shared/gemini"; +import * as DateTime from "effect/DateTime"; +import * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; +import * as Result from "effect/Result"; +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; + +import { + buildServerProvider, + DEFAULT_TIMEOUT_MS, + detailFromResult, + isCommandMissingCause, + parseGenericCliVersion, + providerModelsFromSettings, + spawnAndCollect, +} from "../providerSnapshot.ts"; +import { + DEFAULT_GEMINI_MODEL_CAPABILITIES, + probeGeminiCapabilities, + type GeminiCapabilityProbeResult, +} from "../geminiAcpProbe.ts"; +import { resolveGeminiBinaryPath } from "../geminiBinaryPath.ts"; +import { ServerConfig } from "../../config.ts"; +import type { ServerProviderDraft } from "../providerSnapshot.ts"; + +const PROVIDER = ProviderDriverKind.make("gemini"); +const GEMINI_PRESENTATION = { + displayName: "Gemini", + showInteractionModeToggle: true, +} as const; + +const runGeminiCommand = Effect.fn("runGeminiCommand")(function* ( + geminiSettings: GeminiSettings, + args: ReadonlyArray, + environment?: NodeJS.ProcessEnv, +) { + const binaryPath = resolveGeminiBinaryPath(geminiSettings.binaryPath); + const command = ChildProcess.make(binaryPath, [...args], { + ...(environment ? { env: environment } : {}), + shell: process.platform === "win32", + }); + return yield* spawnAndCollect(binaryPath, command); +}); + +export const checkGeminiProviderStatus = Effect.fn("checkGeminiProviderStatus")(function* ( + geminiSettings: GeminiSettings, + environment?: NodeJS.ProcessEnv, + resolveCapabilities?: (input: { + readonly binaryPath: string; + readonly cwd: string; + }) => Effect.Effect, +): Effect.fn.Return< + ServerProviderDraft, + never, + ChildProcessSpawner.ChildProcessSpawner | ServerConfig +> { + const serverConfig = yield* ServerConfig; + const checkedAt = DateTime.formatIso(yield* DateTime.now); + const fallbackModels = providerModelsFromSettings( + [], + PROVIDER, + geminiSettings.customModels, + DEFAULT_GEMINI_MODEL_CAPABILITIES, + { formatCustomModelName: formatGeminiModelDisplayName }, + ); + + if (!geminiSettings.enabled) { + return buildServerProvider({ + presentation: GEMINI_PRESENTATION, + enabled: false, + checkedAt, + models: fallbackModels, + probe: { + installed: false, + version: null, + status: "warning", + auth: { status: "unknown" }, + message: "Gemini is disabled in T3 Code settings.", + }, + }); + } + + const binaryPath = resolveGeminiBinaryPath(geminiSettings.binaryPath); + const versionProbe = yield* runGeminiCommand(geminiSettings, ["--version"], environment).pipe( + Effect.timeoutOption(DEFAULT_TIMEOUT_MS), + Effect.result, + ); + + if (Result.isFailure(versionProbe)) { + const error = versionProbe.failure; + return buildServerProvider({ + presentation: GEMINI_PRESENTATION, + enabled: geminiSettings.enabled, + checkedAt, + models: fallbackModels, + probe: { + installed: !isCommandMissingCause(error), + version: null, + status: "error", + auth: { status: "unknown" }, + message: isCommandMissingCause(error) + ? "Gemini CLI (`gemini`) is not installed or not on PATH." + : `Failed to execute Gemini CLI health check: ${error instanceof Error ? error.message : String(error)}.`, + }, + }); + } + + if (Option.isNone(versionProbe.success)) { + return buildServerProvider({ + presentation: GEMINI_PRESENTATION, + enabled: geminiSettings.enabled, + checkedAt, + models: fallbackModels, + probe: { + installed: true, + version: null, + status: "error", + auth: { status: "unknown" }, + message: "Gemini CLI is installed but failed to run. Timed out while running command.", + }, + }); + } + + const version = versionProbe.success.value; + const parsedVersion = parseGenericCliVersion(`${version.stdout}\n${version.stderr}`); + if (version.code !== 0) { + const detail = detailFromResult(version); + return buildServerProvider({ + presentation: GEMINI_PRESENTATION, + enabled: geminiSettings.enabled, + checkedAt, + models: fallbackModels, + probe: { + installed: true, + version: parsedVersion, + status: "error", + auth: { status: "unknown" }, + message: detail + ? `Gemini CLI is installed but failed to run. ${detail}` + : "Gemini CLI is installed but failed to run.", + }, + }); + } + + const capabilityProbe = yield* (resolveCapabilities ?? probeGeminiCapabilities)({ + binaryPath, + cwd: serverConfig.cwd, + }); + const models = providerModelsFromSettings( + capabilityProbe.models, + PROVIDER, + geminiSettings.customModels, + DEFAULT_GEMINI_MODEL_CAPABILITIES, + { formatCustomModelName: formatGeminiModelDisplayName }, + ); + + return buildServerProvider({ + presentation: GEMINI_PRESENTATION, + enabled: geminiSettings.enabled, + checkedAt, + models, + probe: { + installed: true, + version: parsedVersion, + status: capabilityProbe.status, + auth: capabilityProbe.auth, + ...(capabilityProbe.message ? { message: capabilityProbe.message } : {}), + }, + }); +}); + +export const makePendingGeminiProvider = ( + geminiSettings: GeminiSettings, +): Effect.Effect => + Effect.gen(function* () { + const checkedAt = DateTime.formatIso(yield* DateTime.now); + const models = providerModelsFromSettings( + [], + PROVIDER, + geminiSettings.customModels, + DEFAULT_GEMINI_MODEL_CAPABILITIES, + { formatCustomModelName: formatGeminiModelDisplayName }, + ); + + if (!geminiSettings.enabled) { + return buildServerProvider({ + presentation: GEMINI_PRESENTATION, + enabled: false, + checkedAt, + models, + probe: { + installed: false, + version: null, + status: "warning", + auth: { status: "unknown" }, + message: "Gemini is disabled in T3 Code settings.", + }, + }); + } + + return buildServerProvider({ + presentation: GEMINI_PRESENTATION, + enabled: true, + checkedAt, + models, + probe: { + installed: false, + version: null, + status: "warning", + auth: { status: "unknown" }, + message: "Gemini provider status has not been checked in this session yet.", + }, + }); + }); diff --git a/apps/server/src/provider/Layers/ProviderRegistry.test.ts b/apps/server/src/provider/Layers/ProviderRegistry.test.ts index fb6eb3b443d..d8a1c33544b 100644 --- a/apps/server/src/provider/Layers/ProviderRegistry.test.ts +++ b/apps/server/src/provider/Layers/ProviderRegistry.test.ts @@ -1300,6 +1300,7 @@ it.layer(Layer.mergeAll(NodeServices.layer, ServerSettingsService.layerTest(), T "claudeAgent", "codex", "cursor", + "gemini", "opencode", ]); assert.strictEqual(cursorProvider?.enabled, false); diff --git a/apps/server/src/provider/Layers/ProviderService.test.ts b/apps/server/src/provider/Layers/ProviderService.test.ts index fc0450b8b69..76e443df7e1 100644 --- a/apps/server/src/provider/Layers/ProviderService.test.ts +++ b/apps/server/src/provider/Layers/ProviderService.test.ts @@ -910,6 +910,48 @@ routing.layer("ProviderServiceLive routing", (it) => { }), ); + it.effect("persists the updated resume cursor after rollback on an active session", () => + Effect.gen(function* () { + const provider = yield* ProviderService; + + const session = yield* provider.startSession(asThreadId("thread-rollback-persist"), { + provider: ProviderDriverKind.make("codex"), + providerInstanceId: codexInstanceId, + threadId: asThreadId("thread-rollback-persist"), + cwd: "/tmp/project", + runtimeMode: "full-access", + }); + + const rolledBackResumeCursor = { + sessionId: "gemini-session-after-rollback", + snapshots: [ + { + turnId: "turn-after-rollback", + sessionId: "snapshot-session-after-rollback", + items: [], + }, + ], + }; + routing.codex.updateSession(session.threadId, (existing) => ({ + ...existing, + resumeCursor: rolledBackResumeCursor, + updatedAt: "2026-01-01T00:00:01.000Z", + })); + + yield* provider.rollbackConversation({ + threadId: session.threadId, + numTurns: 1, + }); + + const directory = yield* ProviderSessionDirectory; + const binding = yield* directory.getBinding(session.threadId); + assert.equal(Option.isSome(binding), true); + if (Option.isSome(binding)) { + assert.deepEqual(binding.value.resumeCursor, rolledBackResumeCursor); + } + }), + ); + it.effect("preserves the persisted binding when stopping a session", () => Effect.gen(function* () { const provider = yield* ProviderService; diff --git a/apps/server/src/provider/Layers/ProviderService.ts b/apps/server/src/provider/Layers/ProviderService.ts index 2bce1f483b7..694b77c1848 100644 --- a/apps/server/src/provider/Layers/ProviderService.ts +++ b/apps/server/src/provider/Layers/ProviderService.ts @@ -961,6 +961,18 @@ const makeProviderService = Effect.fn("makeProviderService")(function* ( "provider.rollback_turns": input.numTurns, }); yield* routed.adapter.rollbackThread(routed.threadId, input.numTurns); + const activeSessions = yield* routed.adapter.listSessions(); + const updatedSession = activeSessions.find((session) => session.threadId === routed.threadId); + if (updatedSession) { + yield* upsertSessionBinding( + { ...updatedSession, providerInstanceId: routed.instanceId }, + input.threadId, + { + lastRuntimeEvent: "provider.rollbackConversation", + lastRuntimeEventAt: yield* nowIso, + }, + ); + } yield* analytics.record("provider.conversation.rolled_back", { provider: routed.adapter.provider, turns: input.numTurns, diff --git a/apps/server/src/provider/Services/GeminiAdapter.ts b/apps/server/src/provider/Services/GeminiAdapter.ts new file mode 100644 index 00000000000..92d194c3d56 --- /dev/null +++ b/apps/server/src/provider/Services/GeminiAdapter.ts @@ -0,0 +1,28 @@ +/** + * GeminiAdapter - Gemini CLI ACP implementation of the generic provider adapter contract. + * + * This service owns Gemini ACP runtime/session semantics and emits canonical + * provider runtime events. It does not perform cross-provider routing, shared + * event fan-out, or checkpoint orchestration. + * + * @module GeminiAdapter + */ +import * as Context from "effect/Context"; + +import type { ProviderDriverKind } from "@t3tools/contracts"; +import type { ProviderAdapterError } from "../Errors.ts"; +import type { ProviderAdapterShape } from "./ProviderAdapter.ts"; + +/** + * GeminiAdapterShape - Service API for the Gemini provider adapter. + */ +export interface GeminiAdapterShape extends ProviderAdapterShape { + readonly provider: ProviderDriverKind; +} + +/** + * GeminiAdapter - Service tag for Gemini provider adapter operations. + */ +export class GeminiAdapter extends Context.Service()( + "t3/provider/Services/GeminiAdapter", +) {} diff --git a/apps/server/src/provider/Services/GeminiProvider.ts b/apps/server/src/provider/Services/GeminiProvider.ts new file mode 100644 index 00000000000..d14bad82176 --- /dev/null +++ b/apps/server/src/provider/Services/GeminiProvider.ts @@ -0,0 +1,9 @@ +import * as Context from "effect/Context"; + +import type { ServerProviderShape } from "./ServerProvider.ts"; + +export interface GeminiProviderShape extends ServerProviderShape {} + +export class GeminiProvider extends Context.Service()( + "t3/provider/Services/GeminiProvider", +) {} diff --git a/apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts b/apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts index 713d0668928..0e791d392b7 100644 --- a/apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts +++ b/apps/server/src/provider/acp/AcpCoreRuntimeEvents.test.ts @@ -122,6 +122,7 @@ describe("AcpCoreRuntimeEvents", () => { provider: ProviderDriverKind.make("cursor"), threadId: "thread-1" as never, turnId, + streamKind: "assistant_text", itemId: "assistant:session-1:segment:0", text: "hello", rawPayload: { sessionId: "session-1" }, diff --git a/apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts b/apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts index c93e61dc37b..e58c9fe5153 100644 --- a/apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts +++ b/apps/server/src/provider/acp/AcpCoreRuntimeEvents.ts @@ -218,6 +218,7 @@ export function makeAcpContentDeltaEvent(input: { readonly provider: ProviderDriverKind; readonly threadId: ThreadId; readonly turnId: TurnId | undefined; + readonly streamKind: "assistant_text" | "reasoning_text"; readonly itemId?: string; readonly text: string; readonly rawPayload: unknown; @@ -230,7 +231,7 @@ export function makeAcpContentDeltaEvent(input: { turnId: input.turnId, ...(input.itemId ? { itemId: RuntimeItemId.make(input.itemId) } : {}), payload: { - streamKind: "assistant_text", + streamKind: input.streamKind, delta: input.text, }, raw: { diff --git a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts index 1b8f7be5d7d..73399bfe6df 100644 --- a/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts +++ b/apps/server/src/provider/acp/AcpJsonRpcConnection.test.ts @@ -112,6 +112,37 @@ describe("AcpSessionRuntime", () => { ), ); + it.effect( + "can start an ACP session without sending authenticate when auth is not required", + () => { + const requestEvents: Array = []; + return Effect.gen(function* () { + const runtime = yield* AcpSessionRuntime; + const started = yield* runtime.start(); + + expect(started.sessionId).toBe("mock-session-1"); + expect(requestEvents.some((event) => event.method === "authenticate")).toBe(false); + }).pipe( + Effect.provide( + AcpSessionRuntime.layer({ + spawn: { + command: bunExe, + args: [mockAgentPath], + }, + cwd: process.cwd(), + clientInfo: { name: "t3-test", version: "0.0.0" }, + requestLogger: (event) => + Effect.sync(() => { + requestEvents.push(event); + }), + }), + ), + Effect.scoped, + Effect.provide(NodeServices.layer), + ); + }, + ); + it.effect("segments assistant text around ACP tool calls", () => Effect.gen(function* () { const runtime = yield* AcpSessionRuntime; diff --git a/apps/server/src/provider/acp/AcpRuntimeModel.test.ts b/apps/server/src/provider/acp/AcpRuntimeModel.test.ts index ae12d3112aa..fe3745b68ee 100644 --- a/apps/server/src/provider/acp/AcpRuntimeModel.test.ts +++ b/apps/server/src/provider/acp/AcpRuntimeModel.test.ts @@ -230,6 +230,7 @@ describe("AcpRuntimeModel", () => { expect(contentResult.events).toEqual([ { _tag: "ContentDelta", + streamKind: "assistant_text", text: "hello from acp", rawPayload: { sessionId: "session-1", @@ -243,6 +244,91 @@ describe("AcpRuntimeModel", () => { }, }, ]); + + const thoughtResult = parseSessionUpdateEvent({ + sessionId: "session-1", + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: "thinking", + }, + }, + } satisfies EffectAcpSchema.SessionNotification); + + expect(thoughtResult.events).toEqual([ + { + _tag: "ContentDelta", + streamKind: "reasoning_text", + text: "thinking", + rawPayload: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_thought_chunk", + content: { + type: "text", + text: "thinking", + }, + }, + }, + }, + ]); + }); + + it("projects Gemini-specific ACP usage and metadata updates", () => { + const usageResult = parseSessionUpdateEvent({ + sessionId: "session-1", + update: { + sessionUpdate: "usage_update", + used: 123, + size: 456, + }, + } satisfies EffectAcpSchema.SessionNotification); + + expect(usageResult.events).toEqual([ + { + _tag: "UsageUpdated", + usage: { + usedTokens: 123, + maxTokens: 456, + }, + rawPayload: { + sessionId: "session-1", + update: { + sessionUpdate: "usage_update", + used: 123, + size: 456, + }, + }, + }, + ]); + + const metadataResult = parseSessionUpdateEvent({ + sessionId: "session-1", + update: { + sessionUpdate: "session_info_update", + title: " Gemini thread ", + updatedAt: "2026-04-18T15:00:00.000Z", + }, + } satisfies EffectAcpSchema.SessionNotification); + + expect(metadataResult.events).toEqual([ + { + _tag: "ThreadMetadataUpdated", + name: "Gemini thread", + metadata: { + updatedAt: "2026-04-18T15:00:00.000Z", + }, + rawPayload: { + sessionId: "session-1", + update: { + sessionUpdate: "session_info_update", + title: " Gemini thread ", + updatedAt: "2026-04-18T15:00:00.000Z", + }, + }, + }, + ]); }); it("keeps permission request parsing compatible with loose extension payloads", () => { diff --git a/apps/server/src/provider/acp/AcpRuntimeModel.ts b/apps/server/src/provider/acp/AcpRuntimeModel.ts index ffd214a5bf1..57c00b63a32 100644 --- a/apps/server/src/provider/acp/AcpRuntimeModel.ts +++ b/apps/server/src/provider/acp/AcpRuntimeModel.ts @@ -66,9 +66,26 @@ export type AcpParsedSessionEvent = } | { readonly _tag: "ContentDelta"; + readonly streamKind: "assistant_text" | "reasoning_text"; readonly itemId?: string; readonly text: string; readonly rawPayload: unknown; + } + | { + readonly _tag: "UsageUpdated"; + readonly usage: { + readonly usedTokens: number; + readonly maxTokens?: number; + }; + readonly rawPayload: unknown; + } + | { + readonly _tag: "ThreadMetadataUpdated"; + readonly name: string; + readonly metadata?: { + readonly updatedAt: string; + }; + readonly rawPayload: unknown; }; type AcpSessionSetupResponse = @@ -468,12 +485,50 @@ export function parseSessionUpdateEvent(params: EffectAcpSchema.SessionNotificat if (upd.content.type === "text" && upd.content.text.length > 0) { events.push({ _tag: "ContentDelta", + streamKind: "assistant_text", + text: upd.content.text, + rawPayload: params, + }); + } + break; + } + case "agent_thought_chunk": { + if (upd.content.type === "text" && upd.content.text.length > 0) { + events.push({ + _tag: "ContentDelta", + streamKind: "reasoning_text", text: upd.content.text, rawPayload: params, }); } break; } + case "usage_update": { + if (upd.used > 0) { + events.push({ + _tag: "UsageUpdated", + usage: { + usedTokens: upd.used, + ...(upd.size > 0 ? { maxTokens: upd.size } : {}), + }, + rawPayload: params, + }); + } + break; + } + case "session_info_update": { + const name = upd.title?.trim() ?? ""; + if (name.length > 0) { + const updatedAt = upd.updatedAt?.trim(); + events.push({ + _tag: "ThreadMetadataUpdated", + name, + ...(updatedAt ? { metadata: { updatedAt } } : {}), + rawPayload: params, + }); + } + break; + } default: break; } diff --git a/apps/server/src/provider/acp/AcpSessionRuntime.ts b/apps/server/src/provider/acp/AcpSessionRuntime.ts index 8652b2cfeaf..6e681c7a424 100644 --- a/apps/server/src/provider/acp/AcpSessionRuntime.ts +++ b/apps/server/src/provider/acp/AcpSessionRuntime.ts @@ -41,12 +41,13 @@ export interface AcpSessionRuntimeOptions { readonly spawn: AcpSpawnInput; readonly cwd: string; readonly resumeSessionId?: string; + readonly allowResumeFallback?: boolean; readonly clientCapabilities?: EffectAcpSchema.InitializeRequest["clientCapabilities"]; readonly clientInfo: { readonly name: string; readonly version: string; }; - readonly authMethodId: string; + readonly authMethodId?: string; readonly requestLogger?: (event: AcpSessionRequestLogEvent) => Effect.Effect; readonly protocolLogging?: { readonly logIncoming?: boolean; @@ -378,15 +379,17 @@ const makeAcpSessionRuntime = ( acp.agent.initialize(initializePayload), ); - const authenticatePayload = { - methodId: options.authMethodId, - } satisfies EffectAcpSchema.AuthenticateRequest; + if (options.authMethodId?.trim()) { + const authenticatePayload = { + methodId: options.authMethodId, + } satisfies EffectAcpSchema.AuthenticateRequest; - yield* runLoggedRequest( - "authenticate", - authenticatePayload, - acp.agent.authenticate(authenticatePayload), - ); + yield* runLoggedRequest( + "authenticate", + authenticatePayload, + acp.agent.authenticate(authenticatePayload), + ); + } let sessionId: string; let sessionSetupResult: @@ -407,6 +410,8 @@ const makeAcpSessionRuntime = ( if (Exit.isSuccess(resumed)) { sessionId = options.resumeSessionId; sessionSetupResult = resumed.value; + } else if (options.allowResumeFallback === false) { + return yield* Effect.failCause(resumed.cause); } else { const createPayload = { cwd: options.cwd, @@ -624,6 +629,10 @@ const handleSessionUpdate = ({ continue; } if (event._tag === "ContentDelta") { + if (event.streamKind === "reasoning_text") { + yield* Queue.offer(queue, event); + continue; + } if (event.text.trim().length === 0) { const assistantSegmentState = yield* Ref.get(assistantSegmentRef); if (!assistantSegmentState.activeItemId) { diff --git a/apps/server/src/provider/acp/GeminiAcpSupport.ts b/apps/server/src/provider/acp/GeminiAcpSupport.ts new file mode 100644 index 00000000000..cb1ee6476e8 --- /dev/null +++ b/apps/server/src/provider/acp/GeminiAcpSupport.ts @@ -0,0 +1,53 @@ +import * as Effect from "effect/Effect"; +import * as Layer from "effect/Layer"; +import * as Scope from "effect/Scope"; +import { ChildProcessSpawner } from "effect/unstable/process"; +import type * as EffectAcpErrors from "effect-acp/errors"; + +import { + AcpSessionRuntime, + type AcpSessionRuntimeOptions, + type AcpSessionRuntimeShape, + type AcpSpawnInput, +} from "./AcpSessionRuntime.ts"; + +export interface GeminiAcpRuntimeInput extends Omit< + AcpSessionRuntimeOptions, + "authMethodId" | "spawn" +> { + readonly childProcessSpawner: ChildProcessSpawner.ChildProcessSpawner["Service"]; + readonly binaryPath: string; + readonly env?: Readonly>; + readonly approvalMode?: string; +} + +export function buildGeminiAcpSpawnInput(input: { + readonly binaryPath: string; + readonly cwd: string; + readonly env?: Readonly>; + readonly approvalMode?: string; +}): AcpSpawnInput { + return { + command: input.binaryPath, + args: ["--acp", ...(input.approvalMode ? [`--approval-mode=${input.approvalMode}`] : [])], + cwd: input.cwd, + ...(input.env ? { env: input.env } : {}), + }; +} + +export const makeGeminiAcpRuntime = ( + input: GeminiAcpRuntimeInput, +): Effect.Effect => + Effect.gen(function* () { + const acpContext = yield* Layer.build( + AcpSessionRuntime.layer({ + ...input, + spawn: buildGeminiAcpSpawnInput(input), + }).pipe( + Layer.provide( + Layer.succeed(ChildProcessSpawner.ChildProcessSpawner, input.childProcessSpawner), + ), + ), + ); + return yield* Effect.service(AcpSessionRuntime).pipe(Effect.provide(acpContext)); + }); diff --git a/apps/server/src/provider/builtInDrivers.ts b/apps/server/src/provider/builtInDrivers.ts index 5af56dc6b0e..edb310f0c5a 100644 --- a/apps/server/src/provider/builtInDrivers.ts +++ b/apps/server/src/provider/builtInDrivers.ts @@ -23,6 +23,7 @@ import { ClaudeDriver, type ClaudeDriverEnv } from "./Drivers/ClaudeDriver.ts"; import { CodexDriver, type CodexDriverEnv } from "./Drivers/CodexDriver.ts"; import { CursorDriver, type CursorDriverEnv } from "./Drivers/CursorDriver.ts"; +import { GeminiDriver, type GeminiDriverEnv } from "./Drivers/GeminiDriver.ts"; import { OpenCodeDriver, type OpenCodeDriverEnv } from "./Drivers/OpenCodeDriver.ts"; import type { AnyProviderDriver } from "./ProviderDriver.ts"; @@ -35,6 +36,7 @@ export type BuiltInDriversEnv = | ClaudeDriverEnv | CodexDriverEnv | CursorDriverEnv + | GeminiDriverEnv | OpenCodeDriverEnv; /** @@ -46,5 +48,6 @@ export const BUILT_IN_DRIVERS: ReadonlyArray; + readonly status: Exclude; + readonly auth: Pick; + readonly message?: string; +}; + +function formatGeminiDiscoveryWarning(detail: string): string { + return `Gemini CLI is installed, but T3 Code could not verify authentication or discover models. ${detail}`; +} + +function formatGeminiAuthMessage(detail: string): string { + return `Gemini is not authenticated. ${detail}`; +} + +export function parseGeminiAcpProbeError( + error: unknown, +): Omit { + const record = asRecord(error); + const code = asNumber(record?.code); + const message = trimToUndefined(record?.message) ?? "Gemini ACP request failed."; + const lowerMessage = message.toLowerCase(); + const unauthenticated = + code === GEMINI_ACP_AUTH_REQUIRED_CODE || + lowerMessage.includes("authentication required") || + lowerMessage.includes("api key is missing") || + lowerMessage.includes("auth method") || + lowerMessage.includes("not configured"); + + if (unauthenticated) { + return { + status: "error", + auth: { status: "unauthenticated" }, + message: formatGeminiAuthMessage(message), + }; + } + + return { + status: "warning", + auth: { status: "unknown" }, + message: formatGeminiDiscoveryWarning(message), + }; +} + +export function parseGeminiDiscoveredModels( + response: unknown, + fallbackCapabilities: ModelCapabilities = DEFAULT_GEMINI_MODEL_CAPABILITIES, +): ReadonlyArray { + const availableModels = asRecord(asRecord(response)?.models)?.availableModels; + if (!Array.isArray(availableModels)) { + return []; + } + + const discoveredModels: ServerProviderModel[] = []; + const seen = new Set(); + + for (const candidate of availableModels) { + const record = asRecord(candidate); + const slug = trimToUndefined(record?.modelId); + if (!slug || seen.has(slug)) { + continue; + } + + seen.add(slug); + const explicitName = trimToUndefined(record?.name); + discoveredModels.push({ + slug, + name: + explicitName && explicitName.toLowerCase() !== slug.toLowerCase() + ? explicitName + : formatGeminiModelDisplayName(slug), + isCustom: false, + capabilities: geminiCapabilitiesForModel(slug, fallbackCapabilities), + }); + } + + return discoveredModels; +} + +export const probeGeminiCapabilities = (input: { + readonly binaryPath: string; + readonly cwd: string; + readonly capabilities?: ModelCapabilities; +}): Effect.Effect => + Effect.scoped( + Effect.gen(function* () { + const childProcessSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; + const env = yield* Effect.tryPromise({ + try: () => readGeminiLaunchEnv(), + catch: () => undefined, + }); + const runtime = yield* makeGeminiAcpRuntime({ + childProcessSpawner, + binaryPath: input.binaryPath, + cwd: input.cwd, + ...(env ? { env } : {}), + clientInfo: { name: "t3-code-provider-probe", version: "0.0.0" }, + clientCapabilities: { + fs: { readTextFile: false, writeTextFile: false }, + terminal: false, + auth: { terminal: false }, + }, + }); + + const started = yield* runtime + .start() + .pipe(Effect.timeoutOption(GEMINI_ACP_PROBE_TIMEOUT_MS)); + if (Option.isNone(started)) { + return { + status: "warning" as const, + auth: { status: "unknown" as const }, + models: [], + message: formatGeminiDiscoveryWarning("Timed out while starting Gemini ACP session."), + } satisfies GeminiCapabilityProbeResult; + } + + const models = parseGeminiDiscoveredModels( + started.value.sessionSetupResult, + input.capabilities ?? DEFAULT_GEMINI_MODEL_CAPABILITIES, + ); + if (models.length === 0) { + return { + status: "warning" as const, + auth: { status: "authenticated" as const }, + models: [], + message: formatGeminiDiscoveryWarning( + "Gemini ACP session started, but it did not report any available models.", + ), + } satisfies GeminiCapabilityProbeResult; + } + + return { + status: "ready" as const, + auth: { status: "authenticated" as const }, + models, + message: "Gemini CLI is installed and authenticated.", + } satisfies GeminiCapabilityProbeResult; + }), + ).pipe( + Effect.catchCause((cause) => + Effect.succeed({ + ...parseGeminiAcpProbeError(Cause.squash(cause)), + models: [], + }), + ), + ); diff --git a/apps/server/src/provider/geminiBinaryPath.ts b/apps/server/src/provider/geminiBinaryPath.ts new file mode 100644 index 00000000000..98e141d0b16 --- /dev/null +++ b/apps/server/src/provider/geminiBinaryPath.ts @@ -0,0 +1,4 @@ +export function resolveGeminiBinaryPath(binaryPath: string | undefined): string { + const trimmed = binaryPath?.trim(); + return trimmed && trimmed.length > 0 ? trimmed : "gemini"; +} diff --git a/apps/server/src/provider/geminiCliFiles.test.ts b/apps/server/src/provider/geminiCliFiles.test.ts new file mode 100644 index 00000000000..156ce121af5 --- /dev/null +++ b/apps/server/src/provider/geminiCliFiles.test.ts @@ -0,0 +1,33 @@ +import assert from "node:assert/strict"; +import { describe, it } from "@effect/vitest"; + +import { parseGeminiEnvFile } from "./geminiCliFiles.ts"; + +describe("parseGeminiEnvFile", () => { + it("parses Gemini CLI dotenv files", () => { + assert.deepStrictEqual( + parseGeminiEnvFile(String.raw` +# comment +GOOGLE_CLOUD_PROJECT=t3-code-enterprise +export GOOGLE_CLOUD_PROJECT_ID="project-id" +GEMINI_API_KEY='api-key' +INVALID-KEY=ignored +EMPTY= +ESCAPED_NEWLINE="hello\nworld" +ESCAPED_QUOTE="hello \"world\"" +ESCAPED_BACKSLASH="path\\to\\file" +LITERAL_BACKSLASH_N="hello\\nworld" +`), + { + GOOGLE_CLOUD_PROJECT: "t3-code-enterprise", + GOOGLE_CLOUD_PROJECT_ID: "project-id", + GEMINI_API_KEY: "api-key", + EMPTY: "", + ESCAPED_NEWLINE: "hello\nworld", + ESCAPED_QUOTE: `hello "world"`, + ESCAPED_BACKSLASH: String.raw`path\to\file`, + LITERAL_BACKSLASH_N: String.raw`hello\nworld`, + }, + ); + }); +}); diff --git a/apps/server/src/provider/geminiCliFiles.ts b/apps/server/src/provider/geminiCliFiles.ts new file mode 100644 index 00000000000..e98203a10f3 --- /dev/null +++ b/apps/server/src/provider/geminiCliFiles.ts @@ -0,0 +1,332 @@ +const fs = require("node:fs").promises as typeof import("node:fs/promises"); +const os = require("node:os") as typeof import("node:os"); +const path = require("node:path") as typeof import("node:path"); + +import { TurnId as TurnIdSchema, type TurnId } from "@t3tools/contracts"; +import { buildGeminiThinkingModelConfigAliases } from "@t3tools/shared/model"; +import * as DateTime from "effect/DateTime"; +import * as Effect from "effect/Effect"; + +import { asArray, asNumber, asRecord, trimToUndefined } from "./jsonValue.ts"; + +const GEMINI_RESUME_CURSOR_VERSION = 1; +const GEMINI_TMP_DIR = path.join(os.homedir(), ".gemini", "tmp"); +const GEMINI_USER_ENV_PATH = path.join(os.homedir(), ".gemini", ".env"); +const GEMINI_CHAT_DIR_NAME = "chats"; +const GEMINI_SESSION_FILE_PREFIX = "session-"; +const T3CODE_GEMINI_SETTINGS_DIR = path.join(os.tmpdir(), "t3code", "gemini"); + +export interface GeminiStoredTurn { + readonly id: TurnId; + readonly items: Array; + readonly snapshotSessionId?: string; + readonly snapshotFilePath?: string; +} + +function cloneUnknownArray(items: ReadonlyArray): Array { + return items.map((item) => { + const record = asRecord(item); + return record ? Object.assign({}, record) : item; + }); +} + +export function cloneGeminiStoredTurn(turn: GeminiStoredTurn): GeminiStoredTurn { + return { + id: turn.id, + items: cloneUnknownArray(turn.items), + ...(turn.snapshotSessionId ? { snapshotSessionId: turn.snapshotSessionId } : {}), + ...(turn.snapshotFilePath ? { snapshotFilePath: turn.snapshotFilePath } : {}), + }; +} + +export function cloneGeminiTurnItems(items: ReadonlyArray): Array { + return cloneUnknownArray(items); +} + +export function readGeminiResumeSessionId(resumeCursor: unknown): string | undefined { + const record = asRecord(resumeCursor); + if (!record) { + return undefined; + } + + const schemaVersion = asNumber(record.schemaVersion); + if (schemaVersion !== undefined && schemaVersion !== GEMINI_RESUME_CURSOR_VERSION) { + return undefined; + } + + return trimToUndefined(record.sessionId); +} + +export function buildGeminiResumeCursor(sessionId: string) { + return { + schemaVersion: GEMINI_RESUME_CURSOR_VERSION, + sessionId, + }; +} + +export function readLegacyGeminiResumeTurns(resumeCursor: unknown): Array { + const record = asRecord(resumeCursor); + const schemaVersion = asNumber(record?.schemaVersion); + if (schemaVersion !== undefined) { + return []; + } + + return ( + asArray(record?.snapshots)?.reduce>((acc, entry) => { + const snapshot = asRecord(entry); + const turnId = trimToUndefined(snapshot?.turnId); + const sessionId = trimToUndefined(snapshot?.sessionId); + const items = asArray(snapshot?.items); + if (!turnId || !sessionId || !items) { + return acc; + } + const filePath = trimToUndefined(snapshot?.filePath); + acc.push({ + id: TurnIdSchema.make(turnId), + items: cloneUnknownArray(items), + snapshotSessionId: sessionId, + ...(filePath ? { snapshotFilePath: filePath } : {}), + }); + return acc; + }, []) ?? [] + ); +} + +function isStoredGeminiSession(value: unknown): value is Record & { + sessionId: string; + messages: Array; + startTime: string; + lastUpdated: string; +} { + const record = asRecord(value); + return Boolean( + trimToUndefined(record?.sessionId) && + asArray(record?.messages) && + trimToUndefined(record?.startTime) && + trimToUndefined(record?.lastUpdated), + ); +} + +async function currentIsoTimestamp(): Promise { + return DateTime.formatIso(await Effect.runPromise(DateTime.now)); +} + +async function makeGeminiSessionFileName(sessionId: string): Promise { + const timestamp = (await currentIsoTimestamp()).replaceAll(":", "-").replaceAll(".", "-"); + return `${GEMINI_SESSION_FILE_PREFIX}${timestamp}-${sessionId.slice(0, 8)}.json`; +} + +async function readStoredGeminiSession(filePath: string) { + const content = JSON.parse(await fs.readFile(filePath, "utf8")) as unknown; + if (!isStoredGeminiSession(content)) { + throw new Error(`Invalid Gemini session file: ${filePath}`); + } + return content; +} + +export async function findGeminiSessionFileById( + sessionId: string, + hintedPath?: string, +): Promise { + const prefix = sessionId.slice(0, 8); + const candidatePaths = new Set(); + if (hintedPath) { + candidatePaths.add(hintedPath); + } + + let projectDirs: Array = []; + try { + projectDirs = (await fs.readdir(GEMINI_TMP_DIR, { withFileTypes: true })) + .filter((entry) => entry.isDirectory()) + .map((entry) => path.join(GEMINI_TMP_DIR, entry.name, GEMINI_CHAT_DIR_NAME)); + } catch { + return undefined; + } + + for (const chatsDir of projectDirs) { + try { + const files = await fs.readdir(chatsDir, { withFileTypes: true }); + for (const entry of files) { + if ( + entry.isFile() && + entry.name.startsWith(GEMINI_SESSION_FILE_PREFIX) && + entry.name.endsWith(".json") && + entry.name.includes(prefix) + ) { + candidatePaths.add(path.join(chatsDir, entry.name)); + } + } + } catch { + // Ignore project temp dirs without chats. + } + } + + for (const candidatePath of candidatePaths) { + try { + const storedSession = await readStoredGeminiSession(candidatePath); + if (storedSession.sessionId === sessionId) { + return candidatePath; + } + } catch { + // Ignore unreadable or unrelated files. + } + } + + return undefined; +} + +export async function cloneGeminiSessionFile( + sourcePath: string, + sessionId: string, +): Promise { + const storedSession = await readStoredGeminiSession(sourcePath); + const nextSession = { + ...storedSession, + sessionId, + lastUpdated: await currentIsoTimestamp(), + }; + const destinationPath = path.join( + path.dirname(sourcePath), + await makeGeminiSessionFileName(sessionId), + ); + await fs.writeFile(destinationPath, `${JSON.stringify(nextSession, null, 2)}\n`, "utf8"); + return destinationPath; +} + +export async function writeGeminiModelAliasSettings(input: { + readonly scopeId: string; + readonly modelIds: ReadonlyArray; +}): Promise<{ + readonly systemSettingsPath?: string; + readonly env?: Readonly>; +}> { + const modelIds = input.modelIds.filter( + (model, index, collection) => model.trim() && collection.indexOf(model) === index, + ); + const aliases = buildGeminiThinkingModelConfigAliases(modelIds); + if (Object.keys(aliases).length === 0) { + return {}; + } + + const systemSettingsPath = path.join( + T3CODE_GEMINI_SETTINGS_DIR, + `${input.scopeId}-${crypto.randomUUID()}.json`, + ); + await fs.mkdir(T3CODE_GEMINI_SETTINGS_DIR, { recursive: true }); + await fs.writeFile( + systemSettingsPath, + JSON.stringify( + { + modelConfigs: { + aliases, + }, + }, + null, + 2, + ), + "utf8", + ); + + return { + systemSettingsPath, + env: { + GEMINI_CLI_SYSTEM_SETTINGS_PATH: systemSettingsPath, + }, + }; +} + +function unquoteEnvValue(value: string): string { + const trimmed = value.trim(); + if (trimmed.length < 2) { + return trimmed; + } + + const quote = trimmed[0]; + if ((quote !== `"` && quote !== `'`) || trimmed.at(-1) !== quote) { + return trimmed; + } + + const inner = trimmed.slice(1, -1); + if (quote !== `"`) { + return inner; + } + + let result = ""; + for (let index = 0; index < inner.length; index += 1) { + const char = inner[index]; + if (char !== "\\" || index === inner.length - 1) { + result += char; + continue; + } + + const next = inner[index + 1]; + index += 1; + if (next === "n") { + result += "\n"; + } else if (next === `"` || next === "\\") { + result += next; + } else { + result += `${char}${next}`; + } + } + return result; +} + +export function parseGeminiEnvFile(content: string): Readonly> { + const env: Record = {}; + + for (const rawLine of content.split(/\r?\n/)) { + const line = rawLine.trim(); + if (!line || line.startsWith("#")) { + continue; + } + + const normalizedLine = line.startsWith("export ") ? line.slice("export ".length).trim() : line; + const separatorIndex = normalizedLine.indexOf("="); + if (separatorIndex <= 0) { + continue; + } + + const key = normalizedLine.slice(0, separatorIndex).trim(); + if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(key)) { + continue; + } + + const value = normalizedLine.slice(separatorIndex + 1); + env[key] = unquoteEnvValue(value); + } + + return env; +} + +export async function readGeminiUserEnv(): Promise>> { + try { + return parseGeminiEnvFile(await fs.readFile(GEMINI_USER_ENV_PATH, "utf8")); + } catch { + return {}; + } +} + +export async function readGeminiLaunchEnv( + overrides?: Readonly>, +): Promise> | undefined> { + const userEnv = await readGeminiUserEnv(); + const merged = { + ...userEnv, + ...process.env, + ...overrides, + }; + const entries = Object.entries(merged).filter( + (entry): entry is [string, string] => typeof entry[1] === "string", + ); + return entries.length > 0 ? Object.fromEntries(entries) : undefined; +} + +export function cleanupGeminiSystemSettings(systemSettingsPath: string | undefined): void { + if (!systemSettingsPath) { + return; + } + void fs.unlink(systemSettingsPath).catch(() => { + // Ignore already deleted temporary settings files. + }); +} diff --git a/apps/server/src/provider/jsonValue.ts b/apps/server/src/provider/jsonValue.ts new file mode 100644 index 00000000000..a6fc50a7820 --- /dev/null +++ b/apps/server/src/provider/jsonValue.ts @@ -0,0 +1,18 @@ +export function asRecord(value: unknown): Record | undefined { + return value && typeof value === "object" && !Array.isArray(value) + ? (value as Record) + : undefined; +} + +export function asArray(value: unknown): ReadonlyArray | undefined { + return Array.isArray(value) ? value : undefined; +} + +export function asNumber(value: unknown): number | undefined { + return typeof value === "number" && Number.isFinite(value) ? value : undefined; +} + +export function trimToUndefined(value: unknown): string | undefined { + const candidate = typeof value === "string" ? value.trim() : undefined; + return candidate && candidate.length > 0 ? candidate : undefined; +} diff --git a/apps/server/src/provider/providerSnapshot.ts b/apps/server/src/provider/providerSnapshot.ts index c40903e1b45..fd18ff543a3 100644 --- a/apps/server/src/provider/providerSnapshot.ts +++ b/apps/server/src/provider/providerSnapshot.ts @@ -126,6 +126,9 @@ export function providerModelsFromSettings( provider: ProviderDriverKind, customModels: ReadonlyArray, customModelCapabilities: ModelCapabilities, + options?: { + readonly formatCustomModelName?: (slug: string) => string; + }, ): ReadonlyArray { const resolvedBuiltInModels = [...builtInModels]; const seen = new Set(resolvedBuiltInModels.map((model) => model.slug)); @@ -139,7 +142,7 @@ export function providerModelsFromSettings( seen.add(normalized); customEntries.push({ slug: normalized, - name: normalized, + name: options?.formatCustomModelName?.(normalized) ?? normalized, isCustom: true, capabilities: customModelCapabilities, }); diff --git a/apps/server/src/textGeneration/GeminiTextGeneration.ts b/apps/server/src/textGeneration/GeminiTextGeneration.ts new file mode 100644 index 00000000000..c2117ecd145 --- /dev/null +++ b/apps/server/src/textGeneration/GeminiTextGeneration.ts @@ -0,0 +1,311 @@ +import * as Effect from "effect/Effect"; +import * as Option from "effect/Option"; +import * as Schema from "effect/Schema"; +import * as Stream from "effect/Stream"; +import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"; + +import { type GeminiSettings, type ModelSelection, TextGenerationError } from "@t3tools/contracts"; +import { sanitizeBranchFragment, sanitizeFeatureBranchName } from "@t3tools/shared/git"; +import { resolveGeminiApiModelId } from "@t3tools/shared/model"; + +import { + cleanupGeminiSystemSettings, + readGeminiLaunchEnv, + writeGeminiModelAliasSettings, +} from "../provider/geminiCliFiles.ts"; +import { resolveGeminiBinaryPath } from "../provider/geminiBinaryPath.ts"; +import { type TextGenerationShape } from "./TextGeneration.ts"; +import { + buildBranchNamePrompt, + buildCommitMessagePrompt, + buildPrContentPrompt, + buildThreadTitlePrompt, +} from "./TextGenerationPrompts.ts"; +import { + extractJsonValueFromText, + normalizeCliError, + sanitizeCommitSubject, + sanitizePrTitle, + sanitizeThreadTitle, +} from "./TextGenerationUtils.ts"; + +const GEMINI_TIMEOUT_MS = 180_000; + +const GeminiOutputEnvelope = Schema.Struct({ + response: Schema.String, + session_id: Schema.optional(Schema.String), + stats: Schema.optional(Schema.Unknown), +}); +const decodeGeminiOutputEnvelope = Schema.decodeEffect(Schema.fromJsonString(GeminiOutputEnvelope)); + +export const makeGeminiTextGeneration = Effect.fn("makeGeminiTextGeneration")(function* ( + geminiSettings: GeminiSettings, + environment: NodeJS.ProcessEnv = process.env, +) { + const commandSpawner = yield* ChildProcessSpawner.ChildProcessSpawner; + + const readStreamAsString = ( + operation: string, + stream: Stream.Stream, + ): Effect.Effect => + stream.pipe( + Stream.decodeText(), + Stream.runFold( + () => "", + (acc, chunk) => acc + chunk, + ), + Effect.mapError((cause) => + normalizeCliError("gemini", operation, cause, "Failed to collect process output"), + ), + ); + + const runGeminiJson = Effect.fn("runGeminiJson")(function* ({ + operation, + cwd, + prompt, + outputSchemaJson, + modelSelection, + }: { + operation: + | "generateCommitMessage" + | "generatePrContent" + | "generateBranchName" + | "generateThreadTitle"; + cwd: string; + prompt: string; + outputSchemaJson: S; + modelSelection: ModelSelection; + }): Effect.fn.Return { + const runGeminiCommand = Effect.fn("runGeminiJson.runGeminiCommand")(function* () { + const binaryPath = resolveGeminiBinaryPath(geminiSettings.binaryPath); + const launchConfig = yield* Effect.tryPromise({ + try: async () => { + const modelAliasSettings = await writeGeminiModelAliasSettings({ + scopeId: `git-${operation}`, + modelIds: [modelSelection.model], + }); + const env = await readGeminiLaunchEnv(modelAliasSettings.env); + return { + ...modelAliasSettings, + ...(env ? { env } : {}), + }; + }, + catch: (cause) => + new TextGenerationError({ + operation, + detail: "Failed to prepare Gemini CLI launch environment.", + cause, + }), + }); + yield* Effect.addFinalizer(() => + Effect.sync(() => cleanupGeminiSystemSettings(launchConfig.systemSettingsPath)), + ); + const command = ChildProcess.make( + binaryPath, + [ + "--prompt", + "", + "--model", + resolveGeminiApiModelId(modelSelection.model, modelSelection.options), + "--output-format", + "json", + "--approval-mode", + "plan", + ], + { + cwd, + env: launchConfig.env ? { ...environment, ...launchConfig.env } : environment, + shell: process.platform === "win32", + stdin: { + stream: Stream.encodeText(Stream.make(prompt)), + }, + }, + ); + + const child = yield* commandSpawner + .spawn(command) + .pipe( + Effect.mapError((cause) => + normalizeCliError("gemini", operation, cause, "Failed to spawn Gemini CLI process"), + ), + ); + + const [stdout, stderr, exitCode] = yield* Effect.all( + [ + readStreamAsString(operation, child.stdout), + readStreamAsString(operation, child.stderr), + child.exitCode.pipe( + Effect.mapError((cause) => + normalizeCliError("gemini", operation, cause, "Failed to read Gemini CLI exit code"), + ), + ), + ], + { concurrency: "unbounded" }, + ); + + if (exitCode !== 0) { + const stderrDetail = stderr.trim(); + const stdoutDetail = stdout.trim(); + const detail = stderrDetail.length > 0 ? stderrDetail : stdoutDetail; + return yield* new TextGenerationError({ + operation, + detail: + detail.length > 0 + ? `Gemini CLI command failed: ${detail}` + : `Gemini CLI command failed with code ${exitCode}.`, + }); + } + + return stdout; + }); + + const rawStdout = yield* runGeminiCommand().pipe( + Effect.scoped, + Effect.timeoutOption(GEMINI_TIMEOUT_MS), + Effect.flatMap( + Option.match({ + onNone: () => + Effect.fail( + new TextGenerationError({ operation, detail: "Gemini CLI request timed out." }), + ), + onSome: (value) => Effect.succeed(value), + }), + ), + ); + + const envelope = yield* decodeGeminiOutputEnvelope(rawStdout).pipe( + Effect.catchTag("SchemaError", (cause) => + Effect.fail( + new TextGenerationError({ + operation, + detail: "Gemini CLI returned unexpected output format.", + cause, + }), + ), + ), + ); + + const decodedStructuredOutput = yield* Effect.try({ + try: () => extractJsonValueFromText(envelope.response), + catch: (cause) => + new TextGenerationError({ + operation, + detail: "Gemini CLI response did not contain valid structured JSON.", + cause, + }), + }); + + return yield* Schema.decodeEffect(outputSchemaJson)(decodedStructuredOutput).pipe( + Effect.catchTag("SchemaError", (cause) => + Effect.fail( + new TextGenerationError({ + operation, + detail: "Gemini returned invalid structured output.", + cause, + }), + ), + ), + ); + }); + + const generateCommitMessage: TextGenerationShape["generateCommitMessage"] = Effect.fn( + "GeminiTextGeneration.generateCommitMessage", + )(function* (input) { + const { prompt, outputSchema } = buildCommitMessagePrompt({ + branch: input.branch, + stagedSummary: input.stagedSummary, + stagedPatch: input.stagedPatch, + includeBranch: input.includeBranch === true, + }); + + const generated = yield* runGeminiJson({ + operation: "generateCommitMessage", + cwd: input.cwd, + prompt, + outputSchemaJson: outputSchema, + modelSelection: input.modelSelection, + }); + + return { + subject: sanitizeCommitSubject(generated.subject), + body: generated.body.trim(), + ...("branch" in generated && typeof generated.branch === "string" + ? { branch: sanitizeFeatureBranchName(generated.branch) } + : {}), + }; + }); + + const generatePrContent: TextGenerationShape["generatePrContent"] = Effect.fn( + "GeminiTextGeneration.generatePrContent", + )(function* (input) { + const { prompt, outputSchema } = buildPrContentPrompt({ + baseBranch: input.baseBranch, + headBranch: input.headBranch, + commitSummary: input.commitSummary, + diffSummary: input.diffSummary, + diffPatch: input.diffPatch, + }); + + const generated = yield* runGeminiJson({ + operation: "generatePrContent", + cwd: input.cwd, + prompt, + outputSchemaJson: outputSchema, + modelSelection: input.modelSelection, + }); + + return { + title: sanitizePrTitle(generated.title), + body: generated.body.trim(), + }; + }); + + const generateBranchName: TextGenerationShape["generateBranchName"] = Effect.fn( + "GeminiTextGeneration.generateBranchName", + )(function* (input) { + const { prompt, outputSchema } = buildBranchNamePrompt({ + message: input.message, + attachments: input.attachments, + }); + + const generated = yield* runGeminiJson({ + operation: "generateBranchName", + cwd: input.cwd, + prompt, + outputSchemaJson: outputSchema, + modelSelection: input.modelSelection, + }); + + return { + branch: sanitizeBranchFragment(generated.branch), + }; + }); + + const generateThreadTitle: TextGenerationShape["generateThreadTitle"] = Effect.fn( + "GeminiTextGeneration.generateThreadTitle", + )(function* (input) { + const { prompt, outputSchema } = buildThreadTitlePrompt({ + message: input.message, + attachments: input.attachments, + }); + + const generated = yield* runGeminiJson({ + operation: "generateThreadTitle", + cwd: input.cwd, + prompt, + outputSchemaJson: outputSchema, + modelSelection: input.modelSelection, + }); + + return { + title: sanitizeThreadTitle(generated.title), + }; + }); + + return { + generateCommitMessage, + generatePrContent, + generateBranchName, + generateThreadTitle, + } satisfies TextGenerationShape; +}); diff --git a/apps/server/src/textGeneration/TextGenerationUtils.test.ts b/apps/server/src/textGeneration/TextGenerationUtils.test.ts new file mode 100644 index 00000000000..084a8ff4eba --- /dev/null +++ b/apps/server/src/textGeneration/TextGenerationUtils.test.ts @@ -0,0 +1,37 @@ +import { describe, expect, it } from "vitest"; + +import { extractJsonObject, extractJsonValueFromText } from "./TextGenerationUtils.ts"; + +describe("extractJsonObject", () => { + it("extracts the first balanced object from surrounding text", () => { + expect(extractJsonObject('prefix {"title":"hello","items":[1,2]} suffix')).toBe( + '{"title":"hello","items":[1,2]}', + ); + }); + + it("preserves the legacy partial tail fallback for unterminated objects", () => { + expect(extractJsonObject('prefix {"title":"hello"')).toBe('{"title":"hello"'); + }); + + it("returns the trimmed input when no object exists", () => { + expect(extractJsonObject(" no json here ")).toBe("no json here"); + }); +}); + +describe("extractJsonValueFromText", () => { + it("parses a fenced JSON array from model output", () => { + expect(extractJsonValueFromText('```json\n[1, {"title":"hello"}]\n```')).toEqual([ + 1, + { title: "hello" }, + ]); + }); + + it("parses the first balanced JSON value from surrounding prose", () => { + expect( + extractJsonValueFromText('Result: {"title":"hello","items":[1,2]} trailing text'), + ).toEqual({ + title: "hello", + items: [1, 2], + }); + }); +}); diff --git a/apps/server/src/textGeneration/TextGenerationUtils.ts b/apps/server/src/textGeneration/TextGenerationUtils.ts index a786f81b2c8..5552e0020a9 100644 --- a/apps/server/src/textGeneration/TextGenerationUtils.ts +++ b/apps/server/src/textGeneration/TextGenerationUtils.ts @@ -19,6 +19,103 @@ export function limitSection(value: string, maxChars: number): string { return `${truncated}\n\n[truncated]`; } +type JsonExtractionMode = "object" | "value"; + +function scanBalancedJsonSubstring( + value: string, + start: number, + mode: JsonExtractionMode, +): string | null { + const stack: string[] = []; + let inString = false; + let escaped = false; + + for (let index = start; index < value.length; index += 1) { + const char = value[index]; + if (inString) { + if (escaped) { + escaped = false; + continue; + } + if (char === "\\") { + escaped = true; + continue; + } + if (char === '"') { + inString = false; + } + continue; + } + + if (char === '"') { + inString = true; + continue; + } + + if (char === "{" || (mode === "value" && char === "[")) { + stack.push(char); + continue; + } + + if (char === "}" || (mode === "value" && char === "]")) { + const expected = char === "}" ? "{" : "["; + if (stack.pop() !== expected) { + return null; + } + if (stack.length === 0) { + return value.slice(start, index + 1); + } + } + } + + return null; +} + +function findJsonValueStart(value: string): number { + const firstBrace = value.indexOf("{"); + const firstBracket = value.indexOf("["); + return firstBrace === -1 + ? firstBracket + : firstBracket === -1 + ? firstBrace + : Math.min(firstBrace, firstBracket); +} + +export function extractJsonObject(raw: string): string { + const trimmed = raw.trim(); + if (trimmed.length === 0) { + return trimmed; + } + + const start = trimmed.indexOf("{"); + if (start < 0) { + return trimmed; + } + + return scanBalancedJsonSubstring(trimmed, start, "object") ?? trimmed.slice(start); +} + +function stripMarkdownCodeFence(value: string): string { + const trimmed = value.trim(); + const fenced = trimmed.match(/^```(?:json)?\s*([\s\S]*?)\s*```$/i); + return fenced?.[1]?.trim() ?? trimmed; +} + +export function extractJsonValueFromText(value: string): unknown { + const normalized = stripMarkdownCodeFence(value); + try { + return JSON.parse(normalized); + } catch { + const trimmed = normalized.trim(); + const start = findJsonValueStart(trimmed); + const jsonSubstring = start === -1 ? null : scanBalancedJsonSubstring(trimmed, start, "value"); + if (jsonSubstring === null) { + throw new Error("No JSON object or array found in model response."); + } + return JSON.parse(jsonSubstring); + } +} + /** Normalise a raw commit subject to imperative-mood, ≤72 chars, no trailing period. */ export function sanitizeCommitSubject(raw: string): string { const singleLine = raw.trim().split(/\r?\n/g)[0]?.trim() ?? ""; diff --git a/apps/web/src/components/KeybindingsToast.browser.tsx b/apps/web/src/components/KeybindingsToast.browser.tsx index 611eaf572d0..4395de537bb 100644 --- a/apps/web/src/components/KeybindingsToast.browser.tsx +++ b/apps/web/src/components/KeybindingsToast.browser.tsx @@ -123,6 +123,7 @@ function createBaseServerConfig(): ServerConfig { launchArgs: "", }, cursor: { enabled: true, binaryPath: "", apiEndpoint: "", customModels: [] }, + gemini: { enabled: true, binaryPath: "", customModels: [] }, opencode: { enabled: true, binaryPath: "", diff --git a/apps/web/src/components/Sidebar.logic.test.ts b/apps/web/src/components/Sidebar.logic.test.ts index 926c117c1c0..164259e05eb 100644 --- a/apps/web/src/components/Sidebar.logic.test.ts +++ b/apps/web/src/components/Sidebar.logic.test.ts @@ -480,6 +480,7 @@ describe("resolveThreadStatusPill", () => { interactionMode: "plan" as const, latestTurn: null, lastVisitedAt: undefined, + updatedAt: "2026-03-09T10:05:00.000Z", session: { provider: ProviderDriverKind.make("codex"), status: "running" as const, @@ -554,21 +555,22 @@ describe("resolveThreadStatusPill", () => { }); it("shows completed when there is an unseen completion and no active blocker", () => { - expect( - resolveThreadStatusPill({ - thread: { - ...baseThread, - interactionMode: "default", - latestTurn: makeLatestTurn(), - lastVisitedAt: "2026-03-09T10:04:00.000Z", - session: { - ...baseThread.session, - status: "ready", - orchestrationStatus: "ready", - }, + const status = resolveThreadStatusPill({ + thread: { + ...baseThread, + interactionMode: "default", + latestTurn: makeLatestTurn(), + lastVisitedAt: "2026-03-09T10:04:00.000Z", + session: { + ...baseThread.session, + status: "ready", + orchestrationStatus: "ready", }, - }), - ).toMatchObject({ label: "Completed", pulse: false }); + }, + }); + + expect(status).toMatchObject({ label: "Completed", pulse: false }); + expect(status).not.toHaveProperty("workingProvider"); }); }); diff --git a/apps/web/src/components/Sidebar.logic.ts b/apps/web/src/components/Sidebar.logic.ts index 39b759ac313..cf453eaea57 100644 --- a/apps/web/src/components/Sidebar.logic.ts +++ b/apps/web/src/components/Sidebar.logic.ts @@ -36,6 +36,7 @@ export interface ThreadStatusPill { colorClass: string; dotClass: string; pulse: boolean; + workingProvider?: string; } const THREAD_STATUS_PRIORITY: Record = { @@ -330,6 +331,7 @@ export function resolveThreadStatusPill(input: { thread: ThreadStatusInput; }): ThreadStatusPill | null { const { thread } = input; + const workingProvider = thread.session?.provider; if (thread.hasPendingApprovals) { return { @@ -355,6 +357,7 @@ export function resolveThreadStatusPill(input: { colorClass: "text-sky-600 dark:text-sky-300/80", dotClass: "bg-sky-500 dark:bg-sky-300/80", pulse: true, + ...(workingProvider ? { workingProvider } : {}), }; } @@ -364,6 +367,7 @@ export function resolveThreadStatusPill(input: { colorClass: "text-sky-600 dark:text-sky-300/80", dotClass: "bg-sky-500 dark:bg-sky-300/80", pulse: true, + ...(workingProvider ? { workingProvider } : {}), }; } diff --git a/apps/web/src/components/ThreadStatusIndicators.tsx b/apps/web/src/components/ThreadStatusIndicators.tsx index 5ed0c0e4c40..c009609435f 100644 --- a/apps/web/src/components/ThreadStatusIndicators.tsx +++ b/apps/web/src/components/ThreadStatusIndicators.tsx @@ -8,11 +8,14 @@ import { useSavedEnvironmentRuntimeStore, } from "../environments/runtime"; import { useGitStatus } from "../lib/gitStatusState"; +import { cn } from "../lib/utils"; import { type AppState, selectProjectByRef, useStore } from "../store"; import { selectThreadTerminalState, useTerminalStateStore } from "../terminalStateStore"; import { useUiStateStore } from "../uiStateStore"; import { resolveChangeRequestPresentation } from "../sourceControlPresentation"; import { resolveThreadStatusPill, type ThreadStatusPill } from "./Sidebar.logic"; +import { ClaudeAI, CursorIcon, Gemini, OpenAI, OpenCodeIcon } from "./Icons"; +import { normalizeProviderBrandKey, providerIconClassName } from "./providerBrandClassNames"; import type { SidebarThreadSummary } from "../types"; import { Tooltip, TooltipPopup, TooltipTrigger } from "./ui/tooltip"; @@ -100,17 +103,60 @@ export function ThreadStatusLabel({ status: ThreadStatusPill; compact?: boolean; }) { + const normalizedProvider = normalizeProviderBrandKey(status.workingProvider); + const iconClassName = cn( + "size-3", + providerIconClassName(status.workingProvider, "text-foreground"), + status.pulse && "animate-pulse", + ); + const statusIcon = + normalizedProvider === "claudeAgent" ? ( +