From f4137b564002806f7465ad322d58362cfdcd2477 Mon Sep 17 00:00:00 2001 From: Sam Jandris Date: Sun, 10 May 2026 20:56:10 -0400 Subject: [PATCH] Fix OpenCode event session routing --- .../provider/Layers/OpenCodeAdapter.test.ts | 120 +++++++++++++++++- .../src/provider/Layers/OpenCodeAdapter.ts | 25 ++-- 2 files changed, 130 insertions(+), 15 deletions(-) diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts index 66dc5edc67..1943d3880c 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.test.ts @@ -63,6 +63,9 @@ const runtimeMock = { closeError: null as Error | null, messages: [] as MessageEntry[], subscribedEvents: [] as unknown[], + subscribedEventDirectory: process.cwd(), + globalEventCalls: 0, + keepSubscriptionOpen: false, }, reset() { this.state.startCalls.length = 0; @@ -76,9 +79,21 @@ const runtimeMock = { this.state.closeError = null; this.state.messages = []; this.state.subscribedEvents = []; + this.state.subscribedEventDirectory = process.cwd(); + this.state.globalEventCalls = 0; + this.state.keepSubscriptionOpen = false; }, }; +function toGlobalEvent(event: unknown, directory: string): unknown { + return event && typeof event === "object" && "payload" in event + ? event + : { + directory, + payload: event, + }; +} + const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = { startOpenCodeServerProcess: ({ binaryPath }) => Effect.gen(function* () { @@ -158,14 +173,33 @@ const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = { : runtimeMock.state.messages; }, }, + global: { + event: async (options?: { signal?: AbortSignal }) => { + runtimeMock.state.globalEventCalls += 1; + return { + stream: (async function* () { + let index = 0; + while (!options?.signal?.aborted) { + if (index < runtimeMock.state.subscribedEvents.length) { + yield toGlobalEvent( + runtimeMock.state.subscribedEvents[index++], + runtimeMock.state.subscribedEventDirectory, + ); + continue; + } + if (!runtimeMock.state.keepSubscriptionOpen) { + break; + } + await Effect.runPromise(Effect.sleep("5 millis")); + } + })(), + }; + }, + }, event: { - subscribe: async () => ({ - stream: (async function* () { - for (const event of runtimeMock.state.subscribedEvents) { - yield event; - } - })(), - }), + subscribe: async () => { + throw new Error("OpenCodeAdapter should use global.event for runtime events"); + }, }, }) as unknown as ReturnType, loadOpenCodeInventory: () => @@ -654,6 +688,78 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => { }), ); + it.effect("streams current OpenCode global event payloads for the session directory", () => + Effect.gen(function* () { + const adapter = yield* OpenCodeAdapter; + const threadId = asThreadId("thread-opencode-current-event-shape"); + runtimeMock.state.keepSubscriptionOpen = true; + runtimeMock.state.subscribedEventDirectory = "/repo/current-event-shape"; + const eventsFiber = yield* adapter.streamEvents.pipe( + Stream.filter((event) => event.threadId === threadId), + Stream.filter((event) => event.type === "content.delta"), + Stream.take(1), + Stream.runCollect, + Effect.forkChild, + ); + + yield* adapter.startSession({ + provider: ProviderDriverKind.make("opencode"), + threadId, + cwd: "/repo/current-event-shape", + runtimeMode: "full-access", + }); + const turn = yield* adapter.sendTurn({ + threadId, + input: "Fix it", + modelSelection: { + instanceId: ProviderInstanceId.make("opencode"), + model: "openai/gpt-5", + }, + }); + + runtimeMock.state.subscribedEvents.push( + { + type: "message.updated", + properties: { + sessionID: "http://127.0.0.1:9999/session", + info: { + id: "msg-assistant-current-shape", + role: "assistant", + }, + }, + }, + { + type: "message.part.updated", + properties: { + sessionID: "http://127.0.0.1:9999/session", + part: { + id: "prt-assistant-current-shape", + sessionID: "http://127.0.0.1:9999/session", + messageID: "msg-assistant-current-shape", + type: "text", + text: "Hello", + time: { + start: 1_778_000_000_000, + }, + }, + }, + }, + ); + + const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second"))); + assert.deepEqual( + events.map((event) => event.type), + ["content.delta"], + ); + assert.equal(runtimeMock.state.globalEventCalls, 1); + const delta = events.find((event) => event.type === "content.delta"); + assert.equal(delta?.turnId, turn.turnId); + if (delta?.type === "content.delta") { + assert.equal(delta.payload.delta, "Hello"); + } + }), + ); + it.effect("writes provider-native observability records using the session thread id", () => Effect.gen(function* () { const nativeEvents: Array<{ diff --git a/apps/server/src/provider/Layers/OpenCodeAdapter.ts b/apps/server/src/provider/Layers/OpenCodeAdapter.ts index 512e9ed6bf..0da2f5b93f 100644 --- a/apps/server/src/provider/Layers/OpenCodeAdapter.ts +++ b/apps/server/src/provider/Layers/OpenCodeAdapter.ts @@ -58,10 +58,12 @@ interface OpenCodeTurnSnapshot { } type OpenCodeSubscribedEvent = - Awaited> extends { + Awaited> extends { readonly stream: AsyncIterable; } - ? TEvent + ? TEvent extends { readonly payload: infer TPayload } + ? TPayload + : never : never; interface OpenCodeSessionContext { @@ -89,7 +91,7 @@ interface OpenCodeSessionContext { /** * Sole lifecycle handle for the session. Closing this scope: * - aborts the `AbortController` registered as a finalizer - * (cancels the in-flight `event.subscribe` fetch), + * (cancels the in-flight `global.event` fetch), * - interrupts the event-pump and server-exit fibers forked * via `Effect.forkIn(sessionScope)`, * - tears down the OpenCode server process for scope-owned servers. @@ -954,7 +956,7 @@ export function makeOpenCodeAdapter( const startEventPump = Effect.fn("startEventPump")(function* (context: OpenCodeSessionContext) { // One AbortController per session scope. The finalizer fires when // the scope closes (explicit stop, unexpected exit, or layer - // shutdown) and cancels the in-flight `event.subscribe` fetch so + // shutdown) and cancels the in-flight `global.event` fetch so // the async iterable unwinds cleanly. const eventsAbortController = new AbortController(); yield* Scope.addFinalizer( @@ -965,8 +967,8 @@ export function makeOpenCodeAdapter( // Fibers forked into `context.sessionScope` are interrupted // automatically when the scope closes — no bookkeeping required. yield* Effect.flatMap( - runOpenCodeSdk("event.subscribe", () => - context.client.event.subscribe(undefined, { + runOpenCodeSdk("global.event", () => + context.client.global.event({ signal: eventsAbortController.signal, }), ), @@ -975,11 +977,18 @@ export function makeOpenCodeAdapter( subscription.stream, (cause) => new OpenCodeRuntimeError({ - operation: "event.subscribe", + operation: "global.event", detail: openCodeRuntimeErrorDetail(cause), cause, }), - ).pipe(Stream.runForEach((event) => handleSubscribedEvent(context, event))), + ).pipe( + Stream.runForEach((event) => { + if (!("directory" in event) || event.directory !== context.directory) { + return Effect.void; + } + return handleSubscribedEvent(context, event.payload); + }), + ), ).pipe( Effect.exit, Effect.flatMap((exit) =>