Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 113 additions & 7 deletions apps/server/src/provider/Layers/OpenCodeAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const runtimeMock = {
closeError: null as Error | null,
messages: [] as MessageEntry[],
subscribedEvents: [] as unknown[],
subscribedEventDirectory: process.cwd(),
globalEventCalls: 0,
keepSubscriptionOpen: false,
},
reset() {
this.state.startCalls.length = 0;
Expand All @@ -76,9 +79,21 @@ const runtimeMock = {
this.state.closeError = null;
this.state.messages = [];
this.state.subscribedEvents = [];
this.state.subscribedEventDirectory = process.cwd();
this.state.globalEventCalls = 0;
this.state.keepSubscriptionOpen = false;
},
};

function toGlobalEvent(event: unknown, directory: string): unknown {
return event && typeof event === "object" && "payload" in event
? event
: {
directory,
payload: event,
};
}

const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = {
startOpenCodeServerProcess: ({ binaryPath }) =>
Effect.gen(function* () {
Expand Down Expand Up @@ -158,14 +173,33 @@ const OpenCodeRuntimeTestDouble: OpenCodeRuntimeShape = {
: runtimeMock.state.messages;
},
},
global: {
event: async (options?: { signal?: AbortSignal }) => {
runtimeMock.state.globalEventCalls += 1;
return {
stream: (async function* () {
let index = 0;
while (!options?.signal?.aborted) {
if (index < runtimeMock.state.subscribedEvents.length) {
yield toGlobalEvent(
runtimeMock.state.subscribedEvents[index++],
runtimeMock.state.subscribedEventDirectory,
);
continue;
}
if (!runtimeMock.state.keepSubscriptionOpen) {
break;
}
await Effect.runPromise(Effect.sleep("5 millis"));
}
})(),
};
},
},
event: {
subscribe: async () => ({
stream: (async function* () {
for (const event of runtimeMock.state.subscribedEvents) {
yield event;
}
})(),
}),
subscribe: async () => {
throw new Error("OpenCodeAdapter should use global.event for runtime events");
},
},
}) as unknown as ReturnType<OpenCodeRuntimeShape["createOpenCodeSdkClient"]>,
loadOpenCodeInventory: () =>
Expand Down Expand Up @@ -654,6 +688,78 @@ it.layer(OpenCodeAdapterTestLayer)("OpenCodeAdapterLive", (it) => {
}),
);

it.effect("streams current OpenCode global event payloads for the session directory", () =>
Effect.gen(function* () {
const adapter = yield* OpenCodeAdapter;
const threadId = asThreadId("thread-opencode-current-event-shape");
runtimeMock.state.keepSubscriptionOpen = true;
runtimeMock.state.subscribedEventDirectory = "/repo/current-event-shape";
const eventsFiber = yield* adapter.streamEvents.pipe(
Stream.filter((event) => event.threadId === threadId),
Stream.filter((event) => event.type === "content.delta"),
Stream.take(1),
Stream.runCollect,
Effect.forkChild,
);

yield* adapter.startSession({
provider: ProviderDriverKind.make("opencode"),
threadId,
cwd: "/repo/current-event-shape",
runtimeMode: "full-access",
});
const turn = yield* adapter.sendTurn({
threadId,
input: "Fix it",
modelSelection: {
instanceId: ProviderInstanceId.make("opencode"),
model: "openai/gpt-5",
},
});

runtimeMock.state.subscribedEvents.push(
{
type: "message.updated",
properties: {
sessionID: "http://127.0.0.1:9999/session",
info: {
id: "msg-assistant-current-shape",
role: "assistant",
},
},
},
{
type: "message.part.updated",
properties: {
sessionID: "http://127.0.0.1:9999/session",
part: {
id: "prt-assistant-current-shape",
sessionID: "http://127.0.0.1:9999/session",
messageID: "msg-assistant-current-shape",
type: "text",
text: "Hello",
time: {
start: 1_778_000_000_000,
},
},
},
},
);

const events = Array.from(yield* Fiber.join(eventsFiber).pipe(Effect.timeout("1 second")));
assert.deepEqual(
events.map((event) => event.type),
["content.delta"],
);
assert.equal(runtimeMock.state.globalEventCalls, 1);
const delta = events.find((event) => event.type === "content.delta");
assert.equal(delta?.turnId, turn.turnId);
if (delta?.type === "content.delta") {
assert.equal(delta.payload.delta, "Hello");
}
}),
);

it.effect("writes provider-native observability records using the session thread id", () =>
Effect.gen(function* () {
const nativeEvents: Array<{
Expand Down
25 changes: 17 additions & 8 deletions apps/server/src/provider/Layers/OpenCodeAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ interface OpenCodeTurnSnapshot {
}

type OpenCodeSubscribedEvent =
Awaited<ReturnType<OpencodeClient["event"]["subscribe"]>> extends {
Awaited<ReturnType<OpencodeClient["global"]["event"]>> extends {
readonly stream: AsyncIterable<infer TEvent>;
}
? TEvent
? TEvent extends { readonly payload: infer TPayload }
? TPayload
: never
: never;

interface OpenCodeSessionContext {
Expand Down Expand Up @@ -89,7 +91,7 @@ interface OpenCodeSessionContext {
/**
* Sole lifecycle handle for the session. Closing this scope:
* - aborts the `AbortController` registered as a finalizer
* (cancels the in-flight `event.subscribe` fetch),
* (cancels the in-flight `global.event` fetch),
* - interrupts the event-pump and server-exit fibers forked
* via `Effect.forkIn(sessionScope)`,
* - tears down the OpenCode server process for scope-owned servers.
Expand Down Expand Up @@ -954,7 +956,7 @@ export function makeOpenCodeAdapter(
const startEventPump = Effect.fn("startEventPump")(function* (context: OpenCodeSessionContext) {
// One AbortController per session scope. The finalizer fires when
// the scope closes (explicit stop, unexpected exit, or layer
// shutdown) and cancels the in-flight `event.subscribe` fetch so
// shutdown) and cancels the in-flight `global.event` fetch so
// the async iterable unwinds cleanly.
const eventsAbortController = new AbortController();
yield* Scope.addFinalizer(
Expand All @@ -965,8 +967,8 @@ export function makeOpenCodeAdapter(
// Fibers forked into `context.sessionScope` are interrupted
// automatically when the scope closes — no bookkeeping required.
yield* Effect.flatMap(
runOpenCodeSdk("event.subscribe", () =>
context.client.event.subscribe(undefined, {
runOpenCodeSdk("global.event", () =>
context.client.global.event({
signal: eventsAbortController.signal,
}),
),
Expand All @@ -975,11 +977,18 @@ export function makeOpenCodeAdapter(
subscription.stream,
(cause) =>
new OpenCodeRuntimeError({
operation: "event.subscribe",
operation: "global.event",
detail: openCodeRuntimeErrorDetail(cause),
cause,
}),
).pipe(Stream.runForEach((event) => handleSubscribedEvent(context, event))),
).pipe(
Stream.runForEach((event) => {
if (!("directory" in event) || event.directory !== context.directory) {
return Effect.void;
}
return handleSubscribedEvent(context, event.payload);
}),
),
).pipe(
Effect.exit,
Effect.flatMap((exit) =>
Expand Down
Loading