Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions packages/dmoss-agent/src/core/tools/execute-tool-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ export async function executeOneToolCall(
let attemptErrFlag = false;
let attemptText = '';
let attemptTimeout = false;
// Reset per-attempt structured output so a retry that rejects can't surface
// a previous attempt's blocks alongside this attempt's error text.
structuredBlocks = undefined;
const timeoutAbortCtrl = new AbortController();

try {
Expand Down Expand Up @@ -378,16 +381,18 @@ export async function executeOneToolCall(
logger.debug(
`[execute-tool-call] retry #${retriesUsed}/${MAX_RETRY_ATTEMPTS} for ${call.name}(${call.id}) after ${delayMs}ms: ${rawMsg.slice(0, 120)}`,
);
// Abortable backoff — abort immediately cancels the wait
// Abortable backoff — abort immediately cancels the wait. The abort
// listener must be removed on the normal-completion path too, otherwise
// it leaks on the long-lived run signal across every retry.
let backoffTimer: ReturnType<typeof setTimeout> | undefined;
await Promise.race([
new Promise<void>((resolve) => { backoffTimer = setTimeout(resolve, delayMs); }),
abortable(
new Promise<never>(() => {}),
deps.abortSignal,
).catch(() => {}),
]);
let onBackoffAbort: (() => void) | undefined;
await new Promise<void>((resolve) => {
backoffTimer = setTimeout(resolve, delayMs);
onBackoffAbort = () => resolve();
deps.abortSignal.addEventListener('abort', onBackoffAbort, { once: true });
});
if (backoffTimer) clearTimeout(backoffTimer);
if (onBackoffAbort) deps.abortSignal.removeEventListener('abort', onBackoffAbort);
// Re-check abort after backoff; if aborted, break with cancelled path
if (deps.abortSignal.aborted) {
aborted = { by: 'user' };
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { getEventListeners } from 'node:events';
import { executeOneToolCall } from '../dist/core/tools/execute-tool-call.js';

describe('execute-tool retry backoff listener hygiene', () => {
it('does not leak abort listeners on the run signal across retries', async () => {
const runAbort = new AbortController();
let attempt = 0;
const tool = {
name: 'always_transient_probe',
description: 'always fails transiently to exercise both retry backoffs',
metadata: { transientRetry: true },
inputSchema: { type: 'object', properties: {} },
async execute() {
attempt++;
throw new Error('connection reset by peer');
},
};

const outcome = await executeOneToolCall(
{ id: 'call-leak', name: 'always_transient_probe', input: {} },
{
toolsForRun: [tool],
toolCtx: { workspaceDir: process.cwd(), sessionKey: 's' },
sessionKey: 's',
abortSignal: runAbort.signal,
toolTimeoutMs: 2_000,
enableHeartbeat: false,
heartbeatIntervalMs: 1_000,
skipHeartbeatToolNames: new Set(),
push: () => {},
},
);

assert.equal(outcome.kind, 'completed');
assert.equal(outcome.isError, true);
// Two backoff waits (500ms + 1500ms) ran and completed normally; neither
// should leave an abort listener registered on the long-lived run signal.
assert.ok(attempt >= 3, 'all retry attempts must have run');
const listeners = getEventListeners(runAbort.signal, 'abort').length;
assert.equal(
listeners,
0,
`run signal must have no leaked abort listeners, found ${listeners}`,
);
});
});
53 changes: 53 additions & 0 deletions packages/dmoss-agent/test/execute-tool-retry-structured.spec.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { describe, it } from 'node:test';
import assert from 'node:assert/strict';
import { executeOneToolCall } from '../dist/core/tools/execute-tool-call.js';

describe('execute-tool transient retry + structured content', () => {
it('does not surface stale structuredContent when a retry attempt rejects', async () => {
let attempt = 0;
const tool = {
name: 'flaky_structured_probe',
description: 'structured tool that errors transiently then rejects',
metadata: { transientRetry: true, sideEffectClass: 'readonly' },
inputSchema: { type: 'object', properties: {} },
async executeStructured() {
attempt++;
if (attempt === 1) {
// First attempt: structured isError with a transient message -> eligible for retry
return {
content: [{ type: 'text', text: 'connection reset by peer' }],
isError: true,
};
}
// Retry attempt: reject (no structured content produced)
throw new Error('connection reset by peer');
},
};

const outcome = await executeOneToolCall(
{ id: 'call-flaky', name: 'flaky_structured_probe', input: {} },
{
toolsForRun: [tool],
toolCtx: { workspaceDir: process.cwd(), sessionKey: 's' },
sessionKey: 's',
abortSignal: new AbortController().signal,
toolTimeoutMs: 2_000,
enableHeartbeat: false,
heartbeatIntervalMs: 1_000,
skipHeartbeatToolNames: new Set(),
push: () => {},
},
);

assert.equal(outcome.kind, 'completed');
assert.equal(outcome.isError, true);
// The retry rejected and produced no structured content, so the result must
// not carry the first attempt's blocks alongside the retry's error text.
assert.equal(
outcome.structuredContent,
undefined,
'stale structuredContent from a prior attempt must not be returned',
);
assert.ok(attempt >= 2, 'test must exercise the retry path');
});
});
Loading