Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ function makeSnapshot(input: {
messages: [],
activities: [],
proposedPlans: [],
workUnits: [],
checkpoints: [
{
turnId: TurnId.makeUnsafe("turn-1"),
Expand Down
237 changes: 237 additions & 0 deletions apps/server/src/codexAppServerManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ function createSendTurnHarness() {
planType: null,
sparkEnabled: true,
},
collabReceiverTurns: new Map(),
collabReceiverItems: new Map(),
};

const requireSession = vi
Expand Down Expand Up @@ -75,6 +77,8 @@ function createThreadControlHarness() {
createdAt: "2026-02-10T00:00:00.000Z",
updatedAt: "2026-02-10T00:00:00.000Z",
},
collabReceiverTurns: new Map(),
collabReceiverItems: new Map(),
};

const requireSession = vi
Expand Down Expand Up @@ -117,6 +121,8 @@ function createPendingUserInputHarness() {
},
],
]),
collabReceiverTurns: new Map(),
collabReceiverItems: new Map(),
};

const requireSession = vi
Expand All @@ -135,6 +141,44 @@ function createPendingUserInputHarness() {
return { manager, context, requireSession, writeMessage, emitEvent };
}

function createCollabNotificationHarness() {
const manager = new CodexAppServerManager();
const context = {
session: {
provider: "codex",
status: "running",
threadId: asThreadId("thread_1"),
runtimeMode: "full-access",
model: "gpt-5.3-codex",
activeTurnId: "turn_parent",
resumeCursor: { threadId: "provider_parent" },
createdAt: "2026-02-10T00:00:00.000Z",
updatedAt: "2026-02-10T00:00:00.000Z",
},
account: {
type: "unknown",
planType: null,
sparkEnabled: true,
},
pending: new Map(),
pendingApprovals: new Map(),
pendingUserInputs: new Map(),
collabReceiverTurns: new Map<string, string>(),
collabReceiverItems: new Map<string, string>(),
nextRequestId: 1,
stopping: false,
};

const emitEvent = vi
.spyOn(manager as unknown as { emitEvent: (...args: unknown[]) => void }, "emitEvent")
.mockImplementation(() => {});
const updateSession = vi
.spyOn(manager as unknown as { updateSession: (...args: unknown[]) => void }, "updateSession")
.mockImplementation(() => {});

return { manager, context, emitEvent, updateSession };
}

describe("classifyCodexStderrLine", () => {
it("ignores empty lines", () => {
expect(classifyCodexStderrLine(" ")).toBeNull();
Expand Down Expand Up @@ -721,6 +765,8 @@ describe("respondToUserInput", () => {
},
pendingApprovals: new Map(),
pendingUserInputs: new Map(),
collabReceiverTurns: new Map(),
collabReceiverItems: new Map(),
};
type ApprovalRequestContext = {
session: typeof context.session;
Expand Down Expand Up @@ -748,6 +794,197 @@ describe("respondToUserInput", () => {
});
});

describe("collab child conversation routing", () => {
it("rewrites child notification turn ids onto the parent turn", () => {
const { manager, context, emitEvent } = createCollabNotificationHarness();

(
manager as unknown as {
handleServerNotification: (context: unknown, notification: Record<string, unknown>) => void;
}
).handleServerNotification(context, {
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
id: "call_collab_1",
receiverThreadIds: ["child_provider_1"],
},
threadId: "provider_parent",
turnId: "turn_parent",
},
});

(
manager as unknown as {
handleServerNotification: (context: unknown, notification: Record<string, unknown>) => void;
}
).handleServerNotification(context, {
method: "item/agentMessage/delta",
params: {
threadId: "child_provider_1",
turnId: "turn_child_1",
itemId: "msg_child_1",
delta: "working",
},
});

expect(emitEvent).toHaveBeenLastCalledWith(
expect.objectContaining({
method: "item/agentMessage/delta",
turnId: "turn_parent",
itemId: "msg_child_1",
}),
);
});

it("suppresses child lifecycle notifications so they cannot replace the parent turn", () => {
const { manager, context, emitEvent, updateSession } = createCollabNotificationHarness();

(
manager as unknown as {
handleServerNotification: (context: unknown, notification: Record<string, unknown>) => void;
}
).handleServerNotification(context, {
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
id: "call_collab_1",
receiverThreadIds: ["child_provider_1"],
},
threadId: "provider_parent",
turnId: "turn_parent",
},
});
emitEvent.mockClear();
updateSession.mockClear();

(
manager as unknown as {
handleServerNotification: (context: unknown, notification: Record<string, unknown>) => void;
}
).handleServerNotification(context, {
method: "turn/started",
params: {
threadId: "child_provider_1",
turn: { id: "turn_child_1" },
},
});

(
manager as unknown as {
handleServerNotification: (context: unknown, notification: Record<string, unknown>) => void;
}
).handleServerNotification(context, {
method: "turn/completed",
params: {
threadId: "child_provider_1",
turn: { id: "turn_child_1", status: "completed" },
},
});

expect(emitEvent).not.toHaveBeenCalled();
expect(updateSession).not.toHaveBeenCalled();
});

it("rewrites child approval requests onto the parent turn", () => {
const { manager, context, emitEvent } = createCollabNotificationHarness();

(
manager as unknown as {
handleServerNotification: (context: unknown, notification: Record<string, unknown>) => void;
}
).handleServerNotification(context, {
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
id: "call_collab_1",
receiverThreadIds: ["child_provider_1"],
},
threadId: "provider_parent",
turnId: "turn_parent",
},
});
emitEvent.mockClear();

(
manager as unknown as {
handleServerRequest: (context: unknown, request: Record<string, unknown>) => void;
}
).handleServerRequest(context, {
id: 42,
method: "item/commandExecution/requestApproval",
params: {
threadId: "child_provider_1",
turnId: "turn_child_1",
itemId: "call_child_1",
command: "bun install",
},
});

expect(Array.from(context.pendingApprovals.values())[0]).toEqual(
expect.objectContaining({
turnId: "turn_parent",
itemId: "call_child_1",
}),
);
expect(emitEvent).toHaveBeenCalledWith(
expect.objectContaining({
method: "item/commandExecution/requestApproval",
turnId: "turn_parent",
itemId: "call_child_1",
}),
);
});

it("rewrites child task notifications onto the parent collab item when no child item id exists", () => {
const { manager, context, emitEvent } = createCollabNotificationHarness();

(
manager as unknown as {
handleServerNotification: (context: unknown, notification: Record<string, unknown>) => void;
}
).handleServerNotification(context, {
method: "item/completed",
params: {
item: {
type: "collabAgentToolCall",
id: "call_collab_1",
receiverThreadIds: ["child_provider_1"],
},
threadId: "provider_parent",
turnId: "turn_parent",
},
});
emitEvent.mockClear();

(
manager as unknown as {
handleServerNotification: (context: unknown, notification: Record<string, unknown>) => void;
}
).handleServerNotification(context, {
method: "codex/event/task_started",
params: {
threadId: "child_provider_1",
msg: {
type: "task_started",
turn_id: "turn_child_1",
},
},
});

expect(emitEvent).toHaveBeenCalledWith(
expect.objectContaining({
method: "codex/event/task_started",
turnId: "turn_parent",
itemId: "call_collab_1",
}),
);
});
});

describe.skipIf(!process.env.CODEX_BINARY_PATH)("startSession live Codex resume", () => {
it("keeps prior thread history when resuming with a changed runtime mode", async () => {
const workspaceDir = mkdtempSync(path.join(os.tmpdir(), "codex-live-resume-"));
Expand Down
Loading