diff --git a/apps/server/src/provider/Layers/CodexAdapter.test.ts b/apps/server/src/provider/Layers/CodexAdapter.test.ts index 515a7c6fcbb..d1164ed50db 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.test.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.test.ts @@ -43,6 +43,7 @@ import { type CodexSessionRuntimeOptions, type CodexSessionRuntimeSendTurnInput, type CodexSessionRuntimeShape, + type CodexSessionRuntimeSlashCommandInput, type CodexThreadSnapshot, } from "./CodexSessionRuntime.ts"; import { makeCodexAdapter } from "./CodexAdapter.ts"; @@ -83,6 +84,14 @@ class FakeCodexRuntime implements CodexSessionRuntimeShape { }), ); + public readonly runSlashCommandImpl = vi.fn( + (_input: CodexSessionRuntimeSlashCommandInput): Promise => + Promise.resolve({ + threadId: this.options.threadId, + turnId: asTurnId("turn-1"), + }), + ); + public readonly interruptTurnImpl = vi.fn( (_turnId?: TurnId): Promise => Promise.resolve(undefined), ); @@ -131,6 +140,10 @@ class FakeCodexRuntime implements CodexSessionRuntimeShape { return Effect.promise(() => this.sendTurnImpl(input)); } + runSlashCommand(input: CodexSessionRuntimeSlashCommandInput) { + return Effect.promise(() => this.runSlashCommandImpl(input)); + } + interruptTurn(turnId?: TurnId) { return Effect.promise(() => this.interruptTurnImpl(turnId)); } diff --git a/apps/server/src/provider/Layers/CodexAdapter.ts b/apps/server/src/provider/Layers/CodexAdapter.ts index 270126e934b..4b4bccdfac5 100644 --- a/apps/server/src/provider/Layers/CodexAdapter.ts +++ b/apps/server/src/provider/Layers/CodexAdapter.ts @@ -58,6 +58,7 @@ import { makeCodexSessionRuntime, type CodexSessionRuntimeError, type CodexSessionRuntimeOptions, + type CodexSessionRuntimeSlashCommandInput, type CodexSessionRuntimeShape, } from "./CodexSessionRuntime.ts"; import { type EventNdjsonLogger, makeEventNdjsonLogger } from "./EventNdjsonLogger.ts"; @@ -92,6 +93,34 @@ interface CodexAdapterSessionContext { stopped: boolean; } +function parseStandaloneCodexSlashCommand( + input: ProviderSendTurnInput, +): CodexSessionRuntimeSlashCommandInput | null { + if ((input.attachments?.length ?? 0) > 0) { + return null; + } + + const text = input.input?.trim(); + if (!text) { + return null; + } + + const match = /^\/([a-z][a-z-]*)(?:\s+([\s\S]*))?$/i.exec(text); + if (!match) { + return null; + } + + const command = match[1]?.toLowerCase(); + const args = match[2]?.trim(); + if (command === "compact" && !args) { + return { command: "compact" }; + } + if (command === "review") { + return args ? { command: "review", instructions: args } : { command: "review" }; + } + return null; +} + function mapCodexRuntimeError( threadId: ThreadId, method: string, @@ -1531,16 +1560,29 @@ export const makeCodexAdapter = Effect.fn("makeCodexAdapter")(function* ( input.modelSelection?.instanceId === boundInstanceId ? getModelSelectionStringOptionValue(input.modelSelection, "reasoningEffort") : undefined; + const selectedModel = + input.modelSelection?.instanceId === boundInstanceId ? input.modelSelection.model : undefined; const serviceTier = input.modelSelection?.instanceId === boundInstanceId ? getCodexServiceTierOptionValue(input.modelSelection) : undefined; + const slashCommand = parseStandaloneCodexSlashCommand(input); + if (slashCommand) { + return yield* session.runtime + .runSlashCommand({ + ...slashCommand, + ...(selectedModel ? { model: selectedModel } : {}), + }) + .pipe( + Effect.mapError((cause) => + mapCodexRuntimeError(input.threadId, `/${slashCommand.command}`, cause), + ), + ); + } return yield* session.runtime .sendTurn({ ...(input.input !== undefined ? { input: input.input } : {}), - ...(input.modelSelection?.instanceId === boundInstanceId - ? { model: input.modelSelection.model } - : {}), + ...(selectedModel ? { model: selectedModel } : {}), ...(reasoningEffort ? { effort: reasoningEffort as EffectCodexSchema.V2TurnStartParams__ReasoningEffort, diff --git a/apps/server/src/provider/Layers/CodexProvider.ts b/apps/server/src/provider/Layers/CodexProvider.ts index 811c362f1e0..a665e99e1b8 100644 --- a/apps/server/src/provider/Layers/CodexProvider.ts +++ b/apps/server/src/provider/Layers/CodexProvider.ts @@ -20,6 +20,7 @@ import type { ModelCapabilities, ProviderOptionDescriptor, ServerProviderModel, + ServerProviderSlashCommand, ServerProviderSkill, } from "@t3tools/contracts"; import { ServerSettingsError } from "@t3tools/contracts"; @@ -42,6 +43,18 @@ const CODEX_PRESENTATION = { showInteractionModeToggle: true, } as const; +const CODEX_APP_SERVER_SLASH_COMMANDS = [ + { + name: "review", + description: "Review current changes and find issues", + input: { hint: "optional review instructions" }, + }, + { + name: "compact", + description: "Summarize conversation history to preserve context", + }, +] satisfies ReadonlyArray; + export interface CodexAppServerProviderSnapshot { readonly account: CodexSchema.V2GetAccountResponse; readonly version: string | undefined; @@ -402,6 +415,7 @@ const makePendingCodexProvider = ( enabled: false, checkedAt, models, + slashCommands: [], skills: [], probe: { installed: false, @@ -418,6 +432,7 @@ const makePendingCodexProvider = ( enabled: true, checkedAt, models, + slashCommands: [], skills: [], probe: { installed: false, @@ -487,6 +502,7 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu enabled: false, checkedAt, models: emptyModels, + slashCommands: [], skills: [], probe: { installed: false, @@ -518,6 +534,7 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu enabled: codexSettings.enabled, checkedAt, models: emptyModels, + slashCommands: [], skills: [], probe: { installed, @@ -537,6 +554,7 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu enabled: codexSettings.enabled, checkedAt, models: emptyModels, + slashCommands: [], skills: [], probe: { installed: true, @@ -556,6 +574,7 @@ export const checkCodexProviderStatus = Effect.fn("checkCodexProviderStatus")(fu enabled: codexSettings.enabled, checkedAt, models: snapshot.models, + slashCommands: CODEX_APP_SERVER_SLASH_COMMANDS, skills: snapshot.skills, probe: { installed: true, diff --git a/apps/server/src/provider/Layers/CodexSessionRuntime.ts b/apps/server/src/provider/Layers/CodexSessionRuntime.ts index 99ac498f0c3..93520214bb0 100644 --- a/apps/server/src/provider/Layers/CodexSessionRuntime.ts +++ b/apps/server/src/provider/Layers/CodexSessionRuntime.ts @@ -26,6 +26,7 @@ import * as Exit from "effect/Exit"; import * as Layer from "effect/Layer"; import * as Queue from "effect/Queue"; import * as Ref from "effect/Ref"; +import * as Semaphore from "effect/Semaphore"; import * as Schema from "effect/Schema"; import * as Scope from "effect/Scope"; import * as Stream from "effect/Stream"; @@ -121,6 +122,22 @@ export interface CodexSessionRuntimeSendTurnInput { readonly interactionMode?: ProviderInteractionMode; } +export type CodexSessionRuntimeSlashCommandInput = + | { + readonly command: "compact"; + readonly model?: string; + } + | { + readonly command: "review"; + readonly instructions?: string; + readonly model?: string; + }; + +interface TurnStartedWaiter { + readonly deferred: Deferred.Deferred; + readonly ignoreTurnIds: ReadonlySet; +} + export interface CodexThreadTurnSnapshot { readonly id: TurnId; readonly items: ReadonlyArray; @@ -137,6 +154,9 @@ export interface CodexSessionRuntimeShape { readonly sendTurn: ( input: CodexSessionRuntimeSendTurnInput, ) => Effect.Effect; + readonly runSlashCommand: ( + input: CodexSessionRuntimeSlashCommandInput, + ) => Effect.Effect; readonly interruptTurn: (turnId?: TurnId) => Effect.Effect; readonly readThread: Effect.Effect; readonly rollbackThread: ( @@ -709,6 +729,9 @@ export const makeCodexSessionRuntime = ( const pendingUserInputsRef = yield* Ref.make(new Map()); const collabReceiverTurnsRef = yield* Ref.make(new Map()); const closedRef = yield* Ref.make(false); + const knownTurnIdsRef = yield* Ref.make>(new Set()); + const turnStartedWaitersRef = yield* Ref.make>([]); + const turnStartSemaphore = yield* Semaphore.make(1); // `~` is not shell-expanded when env vars are set via // `child_process.spawn`; `expandHomePath` lets a configured @@ -886,6 +909,54 @@ export const makeCodexSessionRuntime = ( const currentSessionProviderThreadId = Effect.map(Ref.get(sessionRef), currentProviderThreadId); + const rememberKnownTurnIds = (turnIds: Iterable) => + Ref.update(knownTurnIdsRef, (knownTurnIds) => { + const next = new Set(knownTurnIds); + for (const turnId of turnIds) { + next.add(turnId); + } + return next; + }); + + const rememberKnownTurnId = (turnId: string) => rememberKnownTurnIds([turnId]); + + const makeProcessExitedError = (code?: number) => + new CodexErrors.CodexAppServerProcessExitedError({ + ...(code === undefined ? {} : { code }), + pid: child.pid, + }); + + const failTurnStartedWaiters = (error: CodexErrors.CodexAppServerError) => + Ref.getAndSet(turnStartedWaitersRef, []).pipe( + Effect.flatMap((waiters) => + Effect.forEach(waiters, (waiter) => Deferred.fail(waiter.deferred, error), { + discard: true, + }), + ), + ); + + const resolveTurnStartedWaiters = Effect.fn("CodexSessionRuntime.resolveTurnStartedWaiters")( + function* (rawTurnId: string, turnId: TurnId) { + const waitersToResolve = yield* Ref.modify(turnStartedWaitersRef, (waiters) => { + const pending: Array = []; + const matched: Array = []; + for (const waiter of waiters) { + if (waiter.ignoreTurnIds.has(rawTurnId)) { + pending.push(waiter); + } else { + matched.push(waiter); + } + } + return [matched, pending] as const; + }); + yield* Effect.forEach( + waitersToResolve, + (waiter) => Deferred.succeed(waiter.deferred, turnId), + { discard: true }, + ); + }, + ); + yield* client.handleServerNotification("thread/started", (payload) => currentSessionProviderThreadId.pipe( Effect.flatMap((providerThreadId) => { @@ -905,10 +976,15 @@ export const makeCodexSessionRuntime = ( if (providerThreadId && payload.threadId !== providerThreadId) { return Effect.void; } + const rawTurnId = payload.turn.id; + const turnId = TurnId.make(payload.turn.id); return updateSession(sessionRef, { status: "running", - activeTurnId: TurnId.make(payload.turn.id), - }); + activeTurnId: turnId, + }).pipe( + Effect.andThen(rememberKnownTurnId(rawTurnId)), + Effect.andThen(resolveTurnStartedWaiters(rawTurnId, turnId)), + ); }), ), ); @@ -1172,7 +1248,8 @@ export const makeCodexSessionRuntime = ( yield* child.exitCode.pipe( Effect.flatMap((exitCode) => - Ref.get(closedRef).pipe( + failTurnStartedWaiters(makeProcessExitedError(exitCode)).pipe( + Effect.andThen(Ref.get(closedRef)), Effect.flatMap((closed) => { if (closed) { return Effect.void; @@ -1215,6 +1292,7 @@ export const makeCodexSessionRuntime = ( }); const providerThreadId = opened.thread.id; + yield* rememberKnownTurnIds(opened.thread.turns.map((turn) => turn.id)); const session = { ...(yield* Ref.get(sessionRef)), status: "ready", @@ -1238,11 +1316,84 @@ export const makeCodexSessionRuntime = ( return providerThreadId; }); + const buildTurnStartResult = Effect.fn("CodexSessionRuntime.buildTurnStartResult")(function* ( + turnId: TurnId, + normalizedModel?: string, + ) { + yield* updateSession(sessionRef, { + status: "running", + activeTurnId: turnId, + ...(normalizedModel ? { model: normalizedModel } : {}), + }); + const resumedProviderThreadId = currentProviderThreadId(yield* Ref.get(sessionRef)); + return { + threadId: options.threadId, + turnId, + ...(resumedProviderThreadId ? { resumeCursor: { threadId: resumedProviderThreadId } } : {}), + } satisfies ProviderTurnStartResult; + }); + + const waitForSlashCommandTurnStart = Effect.fn( + "CodexSessionRuntime.waitForSlashCommandTurnStart", + )(function* (waiter: TurnStartedWaiter) { + const cleanup = Ref.update(turnStartedWaitersRef, (waiters) => + waiters.filter((candidate) => candidate !== waiter), + ); + return yield* Deferred.await(waiter.deferred).pipe(Effect.ensuring(cleanup)); + }); + + const runSlashCommand = Effect.fn("CodexSessionRuntime.runSlashCommand")(function* ( + input: CodexSessionRuntimeSlashCommandInput, + ) { + return yield* turnStartSemaphore.withPermit( + Effect.gen(function* () { + const providerThreadId = yield* readProviderThreadId; + const normalizedModel = normalizeCodexModelSlug( + input.model ?? (yield* Ref.get(sessionRef)).model, + ); + + if (input.command === "compact") { + const deferred = yield* Deferred.make(); + const ignoreTurnIds = yield* Ref.get(knownTurnIdsRef); + const waiter = { deferred, ignoreTurnIds } satisfies TurnStartedWaiter; + yield* Ref.update(turnStartedWaitersRef, (waiters) => [...waiters, waiter]); + const cleanup = Ref.update(turnStartedWaitersRef, (waiters) => + waiters.filter((candidate) => candidate !== waiter), + ); + yield* client + .request("thread/compact/start", { + threadId: providerThreadId, + }) + .pipe(Effect.catch((cause) => cleanup.pipe(Effect.andThen(Effect.fail(cause))))); + const turnId = yield* waitForSlashCommandTurnStart(waiter); + return yield* buildTurnStartResult(turnId, normalizedModel); + } + + const instructions = input.instructions?.trim(); + const response = yield* client.request("review/start", { + threadId: providerThreadId, + delivery: "inline", + target: instructions + ? { + type: "custom", + instructions, + } + : { + type: "uncommittedChanges", + }, + }); + yield* rememberKnownTurnId(response.turn.id); + return yield* buildTurnStartResult(TurnId.make(response.turn.id), normalizedModel); + }), + ); + }); + const close = Effect.gen(function* () { const alreadyClosed = yield* Ref.getAndSet(closedRef, true); if (alreadyClosed) { return; } + yield* failTurnStartedWaiters(makeProcessExitedError()); yield* settlePendingApprovals("cancel"); yield* settlePendingUserInputs({}); yield* updateSession(sessionRef, { @@ -1262,56 +1413,48 @@ export const makeCodexSessionRuntime = ( return { start, getSession: Ref.get(sessionRef), + runSlashCommand, sendTurn: (input) => - Effect.gen(function* () { - const providerThreadId = yield* readProviderThreadId; - if (hasConfiguredMcpServer(options.appServerArgs)) { - yield* client.request("config/mcpServer/reload", undefined).pipe( - Effect.catch((cause) => - Effect.logWarning("Failed to refresh Codex MCP tool catalog before turn.", { - cause, - }), - ), + turnStartSemaphore.withPermit( + Effect.gen(function* () { + const providerThreadId = yield* readProviderThreadId; + if (hasConfiguredMcpServer(options.appServerArgs)) { + yield* client.request("config/mcpServer/reload", undefined).pipe( + Effect.catch((cause) => + Effect.logWarning("Failed to refresh Codex MCP tool catalog before turn.", { + cause, + }), + ), + ); + } + const normalizedModel = normalizeCodexModelSlug( + input.model ?? (yield* Ref.get(sessionRef)).model, ); - } - const normalizedModel = normalizeCodexModelSlug( - input.model ?? (yield* Ref.get(sessionRef)).model, - ); - const params = yield* buildTurnStartParams({ - threadId: providerThreadId, - runtimeMode: options.runtimeMode, - ...(input.input ? { prompt: input.input } : {}), - ...(input.attachments ? { attachments: input.attachments } : {}), - ...(normalizedModel ? { model: normalizedModel } : {}), - ...(input.serviceTier ? { serviceTier: input.serviceTier } : {}), - ...(input.effort ? { effort: input.effort } : {}), - ...(input.interactionMode ? { interactionMode: input.interactionMode } : {}), - }); - const rawResponse = yield* client.raw.request("turn/start", params); - const response = yield* decodeV2TurnStartResponse(rawResponse).pipe( - Effect.mapError((error) => - CodexErrors.CodexAppServerProtocolParseError.fromSchemaError( - "decode-response-payload", - error, - { method: "turn/start" }, + const params = yield* buildTurnStartParams({ + threadId: providerThreadId, + runtimeMode: options.runtimeMode, + ...(input.input ? { prompt: input.input } : {}), + ...(input.attachments ? { attachments: input.attachments } : {}), + ...(normalizedModel ? { model: normalizedModel } : {}), + ...(input.serviceTier ? { serviceTier: input.serviceTier } : {}), + ...(input.effort ? { effort: input.effort } : {}), + ...(input.interactionMode ? { interactionMode: input.interactionMode } : {}), + }); + const rawResponse = yield* client.raw.request("turn/start", params); + const response = yield* decodeV2TurnStartResponse(rawResponse).pipe( + Effect.mapError((error) => + CodexErrors.CodexAppServerProtocolParseError.fromSchemaError( + "decode-response-payload", + error, + { method: "turn/start" }, + ), ), - ), - ); - const turnId = TurnId.make(response.turn.id); - yield* updateSession(sessionRef, { - status: "running", - activeTurnId: turnId, - ...(normalizedModel ? { model: normalizedModel } : {}), - }); - const resumedProviderThreadId = currentProviderThreadId(yield* Ref.get(sessionRef)); - return { - threadId: options.threadId, - turnId, - ...(resumedProviderThreadId - ? { resumeCursor: { threadId: resumedProviderThreadId } } - : {}), - } satisfies ProviderTurnStartResult; - }), + ); + yield* rememberKnownTurnId(response.turn.id); + const turnId = TurnId.make(response.turn.id); + return yield* buildTurnStartResult(turnId, normalizedModel); + }), + ), interruptTurn: (turnId) => Effect.gen(function* () { const providerThreadId = yield* readProviderThreadId; @@ -1331,6 +1474,7 @@ export const makeCodexSessionRuntime = ( threadId: providerThreadId, includeTurns: true, }); + yield* rememberKnownTurnIds(response.thread.turns.map((turn) => turn.id)); return parseThreadSnapshot(response); }), rollbackThread: (numTurns) => @@ -1340,6 +1484,7 @@ export const makeCodexSessionRuntime = ( threadId: providerThreadId, numTurns, }); + yield* rememberKnownTurnIds(response.thread.turns.map((turn) => turn.id)); yield* updateSession(sessionRef, { status: "ready", activeTurnId: undefined,