Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ode",
"version": "0.1.18",
"version": "0.1.19",
"description": "Coding anywhere with your coding agents connected",
"module": "packages/core/index.ts",
"type": "module",
Expand Down
1 change: 0 additions & 1 deletion packages/agents/claude/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ async function runClaudeCommand(
cwd,
env,
entry,
timeoutMs: 5 * 60 * 1000,
onRecord,
onSpawn: (pid) => {
log.info("Claude CLI spawned", { pid });
Expand Down
3 changes: 1 addition & 2 deletions packages/agents/codex/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export function buildCodexCommandArgs(params: {
if (params.planMode) {
args.push("--sandbox", "read-only");
} else {
args.push("--full-auto");
args.push("--yolo");
}
if (params.model) {
args.push("--model", params.model);
Expand Down Expand Up @@ -203,7 +203,6 @@ export async function sendMessage(
cwd: workingPath,
env: envOverrides,
entry,
timeoutMs: 10 * 60 * 1000,
onRecord: (event) => {
if (event.type === "thread.started" && typeof event.thread_id === "string") {
latestSessionId = event.thread_id;
Expand Down
1 change: 0 additions & 1 deletion packages/agents/gemini/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ export async function sendMessage(
cwd: workingPath,
env: envOverrides,
entry,
timeoutMs: 10 * 60 * 1000,
onRecord: (record) => {
publishGeminiRecordAsSessionEvents(record, sessionId);
},
Expand Down
1 change: 0 additions & 1 deletion packages/agents/goose/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ export async function sendMessage(
cwd: workingPath,
env: envOverrides,
entry,
timeoutMs: 20 * 60 * 1000,
onRecord: (record) => {
const recordSessionId = getRecordSessionId(record, sessionId);
latestSessionId = recordSessionId;
Expand Down
1 change: 0 additions & 1 deletion packages/agents/kilo/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ export async function sendMessage(
cwd: workingPath,
env: envOverrides,
entry,
timeoutMs: 10 * 60 * 1000,
onRecord: (record) => {
publishKiloRecordAsSessionEvents(record, sessionId);
const recordSessionId = getRecordSessionId(record, sessionId);
Expand Down
1 change: 0 additions & 1 deletion packages/agents/kimi/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ export async function sendMessage(
cwd: workingPath,
env: envOverrides,
entry,
timeoutMs: 10 * 60 * 1000,
onRecord: (record) => {
publishKimiEvent(sessionId, record);
},
Expand Down
8 changes: 0 additions & 8 deletions packages/agents/kiro/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,19 +295,11 @@ async function runKiroCommand(

child.stderr?.on("data", (chunk) => stderrChunks.push(Buffer.from(chunk)));

const timeout = setTimeout(() => {
child.kill("SIGTERM");
reject(new Error("Kiro CLI timed out"));
}, 10 * 60 * 1000);

child.on("error", (err) => {
clearTimeout(timeout);
reject(err);
});

child.on("close", (code) => {
clearTimeout(timeout);

const stdout = Buffer.concat(stdoutChunks).toString("utf-8").trim();
const stderr = Buffer.concat(stderrChunks).toString("utf-8").trim();

Expand Down
1 change: 0 additions & 1 deletion packages/agents/qwen/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ export async function sendMessage(
cwd: workingPath,
env: envOverrides,
entry,
timeoutMs: 10 * 60 * 1000,
onRecord: (record) => {
const recordSessionId = getRecordSessionId(record, sessionId);
latestSessionId = recordSessionId;
Expand Down
20 changes: 13 additions & 7 deletions packages/agents/runtime/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type RunCliJsonCommandParams<TRecord> = {
cwd: string;
env: SessionEnvironment;
entry: ActiveRequestEntry;
timeoutMs: number;
timeoutMs?: number;
onRecord?: (record: TRecord) => void;
onSpawn?: (pid: number | undefined) => void;
onExit?: (code: number | null, signal: NodeJS.Signals | null) => void;
Expand Down Expand Up @@ -78,13 +78,19 @@ export async function runCliJsonCommand<TRecord>(params: RunCliJsonCommandParams

child.stderr?.on("data", (chunk) => stderrChunks.push(Buffer.from(chunk)));

const timeout = setTimeout(() => {
child.kill("SIGTERM");
reject(new Error(`${providerName} CLI timed out`));
}, timeoutMs);
const effectiveTimeoutMs =
typeof timeoutMs === "number" && Number.isFinite(timeoutMs) && timeoutMs > 0
? timeoutMs
: null;
const timeout = effectiveTimeoutMs === null
? null
: setTimeout(() => {
child.kill("SIGTERM");
reject(new Error(`${providerName} CLI timed out`));
}, effectiveTimeoutMs);

child.on("error", (err) => {
clearTimeout(timeout);
if (timeout) clearTimeout(timeout);
reject(err);
});

Expand All @@ -97,7 +103,7 @@ export async function runCliJsonCommand<TRecord>(params: RunCliJsonCommandParams
});

child.on("close", (code) => {
clearTimeout(timeout);
if (timeout) clearTimeout(timeout);
if (stdoutBuffer.trim().length > 0) {
flushLine(stdoutBuffer);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/agents/test/cli-command.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ describe("agent cli command formatting", () => {

expect(command).toContain("codex exec --json --skip-git-repo-check");
expect(command).toContain("--json");
expect(command).toContain("--full-auto");
expect(command).toContain("--yolo");
expect(command).toContain("--model gpt-5-codex");
expect(command).toContain("session-3");
expect(command).toContain("'hello from codex'");
Expand All @@ -134,7 +134,7 @@ describe("agent cli command formatting", () => {
expect(command).toContain("codex exec --json --skip-git-repo-check");
expect(command).toContain("--json");
expect(command).toContain("--sandbox read-only");
expect(command).not.toContain("--full-auto");
expect(command).not.toContain("--yolo");
expect(command).toContain("session-3");
expect(command).toContain("'plan this change'");
});
Expand Down
22 changes: 22 additions & 0 deletions packages/core/kernel/request-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,28 @@ export async function runOpenRequest(
statusTs = updatedStatusTs;
request.statusMessageTs = updatedStatusTs;
}

const updateError = deps.im.takeUpdateError?.(context.channelId, statusTs);
if (updateError) {
const compactError = updateError.replace(/\s+/g, " ").trim().slice(0, 180);
const fallbackNotice = compactError.length > 0
? `Status update failed: ${compactError}`
: "Status update failed due to an unknown error.";
await deps.im.sendMessage(
context.channelId,
context.replyThreadId,
`${fallbackNotice}\nSwitching to a new status message below.`
);
const replacementStatusTs = await deps.im.sendMessage(
context.channelId,
context.replyThreadId,
statusText
);
if (typeof replacementStatusTs === "string" && replacementStatusTs.length > 0) {
statusTs = replacementStatusTs;
request.statusMessageTs = replacementStatusTs;
}
}
}
updateActiveRequest(context.channelId, context.threadId, {
statusMessageTs: request.statusMessageTs,
Expand Down
2 changes: 1 addition & 1 deletion packages/core/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function categorizeRuntimeError(
): { message: string; suggestion: string } {
const errorStr = err instanceof Error ? err.message : String(err);

if (errorStr.includes("timeout") || errorStr.includes("ETIMEDOUT")) {
if (errorStr.includes("timeout") || errorStr.includes("timed out") || errorStr.includes("ETIMEDOUT")) {
return {
message: "Request timed out",
suggestion: "The operation took too long. Try a simpler request or break it into smaller steps.",
Expand Down
28 changes: 23 additions & 5 deletions packages/core/runtime/message-updates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export function createRateLimitedImAdapter(
): IMAdapter {
const rateLimitedMessages = new Set<string>();
const rateLimitErrors = new Map<string, string>();
const updateErrors = new Map<string, string>();

function key(channelId: string, messageTs: string): string {
return `${channelId}:${messageTs}`;
Expand All @@ -24,22 +25,25 @@ export function createRateLimitedImAdapter(
async ({ channelId, messageId }, text) => {
try {
const maybeUpdatedTs = await im.updateMessage(channelId, messageId, text);
updateErrors.delete(key(channelId, messageId));
return typeof maybeUpdatedTs === "string" ? maybeUpdatedTs : undefined;
} catch (error) {
const errorText = String(error);
const updateErrorKey = key(channelId, messageId);
updateErrors.set(updateErrorKey, errorText);
if (isRateLimitError(error)) {
const rateLimitKey = key(channelId, messageId);
rateLimitedMessages.add(rateLimitKey);
rateLimitErrors.set(rateLimitKey, String(error));
rateLimitedMessages.add(updateErrorKey);
rateLimitErrors.set(updateErrorKey, errorText);
log.warn("IM message update hit rate limit (429)", {
channelId,
messageTs: messageId,
error: String(error),
error: errorText,
});
}
log.debug("IM message update failed", {
channelId,
messageTs: messageId,
error: String(error),
error: errorText,
});
return undefined;
}
Expand All @@ -61,6 +65,20 @@ export function createRateLimitedImAdapter(
}
return rateLimitErrors.get(key(channelId, messageTs));
},
takeUpdateError: (channelId: string, messageTs: string): string | undefined => {
const updateErrorKey = key(channelId, messageTs);
if (typeof im.takeUpdateError === "function") {
const upstream = im.takeUpdateError(channelId, messageTs);
if (upstream) {
updateErrors.delete(updateErrorKey);
return upstream;
}
}
const captured = updateErrors.get(updateErrorKey);
if (!captured) return undefined;
updateErrors.delete(updateErrorKey);
return captured;
},
updateMessage: async (
channelId: string,
messageTs: string,
Expand Down
5 changes: 5 additions & 0 deletions packages/core/test/runtime-helpers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,9 @@ describe("runtime helpers", () => {
expect(result.message).toContain("http://127.0.0.1:4096");
expect(result.suggestion).toContain("OpenCode server");
});

it("categorizes timed out phrasing as timeout", () => {
const result = categorizeRuntimeError(new Error("Codex CLI timed out"));
expect(result.message).toBe("Request timed out");
});
});
71 changes: 63 additions & 8 deletions packages/core/test/runtime-resilience-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,30 @@ async function withFastMessageUpdates<T>(run: () => Promise<T>): Promise<T> {
}

function createFakeIm(logs: {
sends: Array<{ channelId: string; threadId: string; text: string }>;
sends: Array<{ channelId: string; threadId: string; text: string; messageTs: string }>;
updates: Array<{ channelId: string; messageTs: string; text: string }>;
}, options?: {
failUpdateWith429?: boolean;
failUpdateWithErrorOnce?: string;
}): IMAdapter {
let nextTs = 0;
let failedOnce = false;
return {
sendMessage: async (channelId, threadId, text) => {
logs.sends.push({ channelId, threadId, text });
nextTs += 1;
return `ts-${nextTs}`;
const messageTs = `ts-${nextTs}`;
logs.sends.push({ channelId, threadId, text, messageTs });
return messageTs;
},
updateMessage: async (channelId, messageTs, text) => {
logs.updates.push({ channelId, messageTs, text });
if (options?.failUpdateWith429) {
throw new Error("429 rate limited");
}
if (!failedOnce && options?.failUpdateWithErrorOnce) {
failedOnce = true;
throw new Error(options.failUpdateWithErrorOnce);
}
},
deleteMessage: async () => {},
fetchThreadHistory: async () => null,
Expand Down Expand Up @@ -173,7 +180,7 @@ describe("core runtime resilience e2e", () => {
it("falls back to sending final message when status updates are rate-limited", async () => {
await withFastMessageUpdates(async () => {
const logs = { sends: [], updates: [] } as {
sends: Array<{ channelId: string; threadId: string; text: string }>;
sends: Array<{ channelId: string; threadId: string; text: string; messageTs: string }>;
updates: Array<{ channelId: string; messageTs: string; text: string }>;
};
const im = createFakeIm(logs, { failUpdateWith429: true });
Expand All @@ -200,18 +207,66 @@ describe("core runtime resilience e2e", () => {
messageId: context.messageId,
text: "trigger rate limit flow",
}));
await waitFor(() => logs.sends.some((entry) => entry.text === "final from agent"), 8000);
await waitFor(
() =>
logs.sends.some((entry) => entry.text === "final from agent")
|| logs.updates.some((entry) => entry.text === "final from agent"),
8000
);

expect(logs.updates.length).toBeGreaterThan(0);
expect(logs.sends.some((entry) => entry.text === "final from agent")).toBe(true);
expect(
logs.sends.some((entry) => entry.text === "final from agent")
|| logs.updates.some((entry) => entry.text === "final from agent")
).toBe(true);

deleteSession(context.channelId, context.threadId);
});
}, 15000);

it("reports status update errors and continues on a replacement status message", async () => {
await withFastMessageUpdates(async () => {
const logs = { sends: [], updates: [] } as {
sends: Array<{ channelId: string; threadId: string; text: string; messageTs: string }>;
updates: Array<{ channelId: string; messageTs: string; text: string }>;
};
const im = createFakeIm(logs, { failUpdateWithErrorOnce: "socket hang up" });
const { agent } = createFakeAgent({
delayMs: 1200,
responseText: "recovered output",
});
const runtime = createCoreRuntime({ platform: "slack", im, agent });
const channelId = uniqueId("CE2E-RECOVER-STATUS");
const threadId = uniqueId("TE2E-RECOVER-STATUS");

await runtime.handleInboundEvent(toInboundEvent({
channelId,
threadId,
userId: "UE2E-recover-status",
messageId: uniqueId("ME2E-recover-status"),
text: "trigger status replacement flow",
}));

await waitFor(
() => logs.sends.some((entry) => entry.text.startsWith("Status update failed:")),
5000
);

const fallbackNoticeIndex = logs.sends.findIndex((entry) => entry.text.startsWith("Status update failed:"));
const fallbackNotice = fallbackNoticeIndex >= 0 ? logs.sends[fallbackNoticeIndex] : undefined;
expect(fallbackNotice).toBeDefined();

const replacementStatus = fallbackNoticeIndex >= 0 ? logs.sends[fallbackNoticeIndex + 1] : undefined;
expect(replacementStatus).toBeDefined();
expect(logs.updates.some((entry) => entry.messageTs === replacementStatus!.messageTs)).toBe(true);

deleteSession(channelId, threadId);
});
}, 15000);

it("handles stop race in event-stream mode and still completes gracefully", async () => {
const logs = { sends: [], updates: [] } as {
sends: Array<{ channelId: string; threadId: string; text: string }>;
sends: Array<{ channelId: string; threadId: string; text: string; messageTs: string }>;
updates: Array<{ channelId: string; messageTs: string; text: string }>;
};
const im = createFakeIm(logs);
Expand Down Expand Up @@ -250,7 +305,7 @@ describe("core runtime resilience e2e", () => {
it("recovers pending in-flight requests after restart", async () => {
await withFastMessageUpdates(async () => {
const logs = { sends: [], updates: [] } as {
sends: Array<{ channelId: string; threadId: string; text: string }>;
sends: Array<{ channelId: string; threadId: string; text: string; messageTs: string }>;
updates: Array<{ channelId: string; messageTs: string; text: string }>;
};
const im = createFakeIm(logs);
Expand Down
1 change: 1 addition & 0 deletions packages/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export interface IMAdapter {
): Promise<string | undefined | void>;
wasRateLimited?(channelId: string, messageTs: string): boolean;
getRateLimitError?(channelId: string, messageTs: string): string | undefined;
takeUpdateError?(channelId: string, messageTs: string): string | undefined;
deleteMessage(channelId: string, messageTs: string): Promise<void>;
fetchThreadHistory(channelId: string, threadId: string, messageId: string): Promise<string | null>;
buildAgentContext(params: AgentContextBuilderParams): Promise<OpenCodeMessageContext>;
Expand Down