Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions cli/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -536,12 +536,32 @@
};
}

/**
 * Poll the orchestrator until the cluster is no longer `running`.
 *
 * The first check happens one interval after the call, not immediately.
 * A status lookup that throws also ends the wait and is reported through
 * the `statusError` flag instead of rejecting.
 *
 * @param {object} orchestrator - Object exposing a synchronous `getStatus(clusterId)`.
 * @param {string} clusterId - Identifier passed through to `getStatus`.
 * @param {number} [intervalMs=500] - Delay between status checks, in milliseconds.
 * @returns {Promise<{statusError: boolean}>} Resolves once polling stops; never rejects.
 */
function waitForTerminalState(orchestrator, clusterId, intervalMs = 500) {
  // One status probe. A throwing lookup counts as "done" with statusError set.
  const probe = () => {
    try {
      const { state } = orchestrator.getStatus(clusterId);
      return { done: state !== 'running', statusError: false };
    } catch {
      return { done: true, statusError: true };
    }
  };

  return new Promise((resolve) => {
    const timer = setInterval(() => {
      const { done, statusError } = probe();
      if (!done) {
        return;
      }
      clearInterval(timer);
      resolve({ statusError });
    }, intervalMs);
  });
}

function waitForClusterCompletion(orchestrator, clusterId, cleanup) {
return new Promise((resolve) => {
let checkInterval;
const stopChecking = () => {
if (checkInterval) {
clearInterval(checkInterval);
checkInterval = null;
}
};
const removeSigint = setupForegroundSigintHandler({
Expand Down Expand Up @@ -571,6 +591,11 @@
});
}

/**
 * Block a daemon-mode run until its cluster stops being `running`.
 *
 * `waitForTerminalState` also resolves when the status lookup itself throws,
 * so that case is reported separately instead of being logged as a normal
 * terminal state.
 *
 * @param {object} orchestrator - Orchestrator forwarded to `waitForTerminalState`.
 * @param {string} clusterId - Cluster to wait for.
 * @returns {Promise<void>} Resolves once the wait is over; does not reject on status errors.
 */
async function waitForDaemonCompletion(orchestrator, clusterId) {
  const outcome = await waitForTerminalState(orchestrator, clusterId);
  if (outcome && outcome.statusError) {
    // The cluster state is unknown here; do not claim it reached a terminal state.
    console.error(`[DAEMON] Cluster ${clusterId} status lookup failed; treating as terminal`);
    return;
  }
  console.log(`[DAEMON] Cluster ${clusterId} reached terminal state`);
}

async function streamClusterInForeground(cluster, orchestrator, clusterId) {
const sendersWithOutput = new Set();
const processedMessageIds = new Set();
Expand Down Expand Up @@ -2347,7 +2372,7 @@
Force provider flags: -G (GitHub), -L (GitLab), -J (Jira), -D (DevOps)
`
)
.action(async (inputArg, options) => {

Check warning on line 2375 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Async arrow function has a complexity of 21. Maximum allowed is 20

Check warning on line 2375 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Refactor this function to reduce its Cognitive Complexity from 32 to the 15 allowed
try {
// Normalize options (--ship → --pr → --worktree flags)
normalizeRunOptions(options);
Expand Down Expand Up @@ -2465,11 +2490,19 @@
throw new Error('Invalid run input: expected text, issue, or file');
}

if (!process.env.ZEROSHOT_DAEMON) {
if (process.env.ZEROSHOT_DAEMON) {
setupDaemonCleanup(orchestrator, clusterId);
await waitForDaemonCompletion(orchestrator, clusterId);
} else {
await streamClusterInForeground(cluster, orchestrator, clusterId);
}

setupDaemonCleanup(orchestrator, clusterId);
try {
await orchestrator.shutdown();
} catch (shutdownErr) {
console.error(`[Orchestrator] shutdown error: ${shutdownErr.message}`);
}
process.exit(0);
} catch (error) {
console.error('Error:', error.message);
process.exit(1);
Expand Down Expand Up @@ -2760,7 +2793,7 @@
.command('kill-all')
.description('Kill all running tasks and clusters')
.option('-y, --yes', 'Skip confirmation')
.action(async (options) => {

Check warning on line 2796 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Refactor this function to reduce its Cognitive Complexity from 26 to the 15 allowed
try {
// Get counts first
const orchestrator = await getOrchestrator();
Expand Down Expand Up @@ -2985,7 +3018,7 @@
.command('resume <id> [prompt]')
.description('Resume a failed task or cluster')
.option('-d, --detach', 'Resume in background (daemon mode)')
.action(async (id, prompt, options) => {

Check warning on line 3021 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Refactor this function to reduce its Cognitive Complexity from 19 to the 15 allowed
try {
// Try cluster first, then task (both use same ID format: "adjective-noun-number")
const OrchestratorModule = require('../src/orchestrator');
Expand Down Expand Up @@ -4155,7 +4188,7 @@
}
});

function outputAgent(agent, options) {

Check warning on line 4191 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Refactor this function to reduce its Cognitive Complexity from 19 to the 15 allowed
if (options.json) {
console.log(JSON.stringify(agent, null, 2));
return;
Expand Down Expand Up @@ -4323,7 +4356,7 @@
}

// Format tool result for display
function formatToolResult(content, isError, toolName, toolInput) {

Check warning on line 4359 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed
if (!content) return isError ? 'error' : 'done';

// For errors, show full message
Expand Down Expand Up @@ -4924,7 +4957,7 @@

// Accumulate text and print complete lines only
// Word wrap long lines, aligning continuation with message column
function accumulateText(prefix, sender, text) {

Check warning on line 4960 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Refactor this function to reduce its Cognitive Complexity from 17 to the 15 allowed
if (!text) return;
const buf = getLineBuffer(sender);

Expand Down Expand Up @@ -4987,7 +5020,7 @@
}

// Stream thinking text immediately with word wrapping
function accumulateThinking(prefix, sender, text) {

Check warning on line 5023 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Refactor this function to reduce its Cognitive Complexity from 25 to the 15 allowed
if (!text) return;
const buf = getLineBuffer(sender);

Expand Down Expand Up @@ -5046,7 +5079,7 @@
}

// Flush pending content - just add newline if we have pending text
function flushLineBuffer(prefix, sender) {

Check warning on line 5082 in cli/index.js

View workflow job for this annotation

GitHub Actions / check

Refactor this function to reduce its Cognitive Complexity from 18 to the 15 allowed
const buf = lineBuffers.get(sender);
if (!buf) return;

Expand Down
3 changes: 3 additions & 0 deletions src/message-bus.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ class MessageBus extends EventEmitter {
}
this.wsClients.clear();

// Drop subscribers so lingering handlers don't keep refs alive
this.removeAllListeners();

// Close ledger
this.ledger.close();
}
Expand Down
46 changes: 46 additions & 0 deletions src/orchestrator.js
Original file line number Diff line number Diff line change
Expand Up @@ -1965,6 +1965,52 @@ class Orchestrator {
this.closed = true;
}

/**
* Full orchestrator shutdown — release every resource that could keep
* the event loop alive after terminal cluster state. Safe to call after
* cluster.stop() / kill() have already run.
*/
async shutdown() {
// `await` a zero-tick promise so the method keeps its async contract
// (future resource closers may be truly async) without tripping
// require-await today.
await Promise.resolve();
this.closed = true;

for (const cluster of this.clusters.values()) {
try {
if (cluster._maxIterSafetyTimeout) {
clearTimeout(cluster._maxIterSafetyTimeout);
cluster._maxIterSafetyTimeout = null;
}

if (cluster.snapshotter) {
try {
cluster.snapshotter.stop();
} catch {
// Already stopped
}
}

if (cluster.messageBus && typeof cluster.messageBus.close === 'function') {
try {
cluster.messageBus.close();
} catch {
// Ledger may already be closed
}
} else if (cluster.ledger && typeof cluster.ledger.close === 'function') {
try {
cluster.ledger.close();
} catch {
// Already closed
}
}
} catch (error) {
this._log(`[Orchestrator] shutdown: error releasing ${cluster.id}: ${error.message}`);
}
}
}

/**
* Find the last workflow-triggering message in the ledger
* Workflow triggers indicate cluster state progression (not AGENT_OUTPUT noise)
Expand Down
139 changes: 139 additions & 0 deletions tests/integration/daemon-exit-on-complete.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Set before the project modules below are required.
// NOTE(review): presumably turns off GitHub verification for the test run —
// confirm against the code that reads this variable.
process.env.ZEROSHOT_SKIP_GH_VERIFY = '1';

/**
* Regression test for cluster-run processes that never exit after
* terminal cluster state. Covers the shared `waitForTerminalState`
* helper and `orchestrator.shutdown()` release path used by both
* foreground and daemon modes.
*/

const assert = require('node:assert');
const path = require('path');
const fs = require('fs');
const os = require('os');

const Orchestrator = require('../../src/orchestrator');
const MockTaskRunner = require('../helpers/mock-task-runner');

// Agent triggered by ISSUE_OPENED; its onComplete hook publishes TASK_COMPLETE.
const workerAgent = {
  id: 'worker',
  role: 'implementation',
  timeout: 0,
  triggers: [{ topic: 'ISSUE_OPENED', action: 'execute_task' }],
  prompt: 'Do the thing',
  hooks: {
    onComplete: {
      action: 'publish_message',
      config: { topic: 'TASK_COMPLETE', content: { text: 'Done' } },
    },
  },
};

// Agent that reacts to TASK_COMPLETE with the stop_cluster action.
const completionDetectorAgent = {
  id: 'completion-detector',
  role: 'orchestrator',
  timeout: 0,
  triggers: [{ topic: 'TASK_COMPLETE', action: 'stop_cluster' }],
};

// Minimal two-agent cluster config shared by every test in this file.
const simpleConfig = {
  agents: [workerAgent, completionDetectorAgent],
};

/**
 * Test-local poller: resolves (with no value) once the cluster's state is
 * no longer 'running', or as soon as the status lookup throws.
 *
 * @param {object} orchestrator - Object exposing a synchronous `getStatus(clusterId)`.
 * @param {string} clusterId - Cluster to watch.
 * @param {number} [intervalMs=50] - Delay between status checks, in milliseconds.
 * @returns {Promise<void>} Never rejects.
 */
function waitForTerminalState(orchestrator, clusterId, intervalMs = 50) {
  // A failed lookup is treated the same as "not running".
  const isStillRunning = () => {
    try {
      return orchestrator.getStatus(clusterId).state === 'running';
    } catch {
      return false;
    }
  };

  return new Promise((resolve) => {
    const timer = setInterval(() => {
      if (isStillRunning()) {
        return;
      }
      clearInterval(timer);
      resolve();
    }, intervalMs);
  });
}

describe('Daemon / foreground exit after terminal state', function () {
  // Must stay a regular function: Mocha exposes `timeout` on `this`.
  this.timeout(15000);

  let tempDir;
  let orchestrator;
  let mockRunner;

  // Start the shared two-agent config and hand back the new cluster's id.
  const startCluster = async () => {
    const started = await orchestrator.start(simpleConfig, { text: 'Test' });
    return started.id;
  };

  // Fresh storage directory, mock runner and orchestrator for every test.
  beforeEach(() => {
    tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'zeroshot-daemon-exit-'));

    mockRunner = new MockTaskRunner();
    mockRunner.when('worker').returns('{"done": true}');

    orchestrator = new Orchestrator({
      quiet: true,
      storageDir: tempDir,
      taskRunner: mockRunner,
    });
  });

  // Best-effort teardown: kill every cluster, shut down, remove the temp dir.
  afterEach(async () => {
    if (orchestrator) {
      for (const entry of orchestrator.listClusters()) {
        try {
          await orchestrator.kill(entry.id);
        } catch {
          // Ignore cleanup errors
        }
      }

      try {
        await orchestrator.shutdown();
      } catch {
        // Already closed
      }
    }

    const hasTempDir = tempDir && fs.existsSync(tempDir);
    if (hasTempDir) {
      fs.rmSync(tempDir, { recursive: true, force: true });
    }
  });

  it('waitForTerminalState resolves once cluster reaches stopped', async () => {
    const clusterId = await startCluster();

    const startedAt = Date.now();
    await waitForTerminalState(orchestrator, clusterId);
    const elapsed = Date.now() - startedAt;

    const { state } = orchestrator.getStatus(clusterId);
    assert.notStrictEqual(state, 'running', 'cluster should have left running');
    assert.ok(elapsed < 10000, `terminal state reached in ${elapsed}ms`);
  });

  it('orchestrator.shutdown() releases message bus / ledger after stop', async () => {
    const clusterId = await startCluster();
    const cluster = orchestrator.getCluster(clusterId);

    await waitForTerminalState(orchestrator, clusterId);

    // Stopping alone must leave the ledger open; only shutdown() closes it.
    assert.strictEqual(cluster.ledger._closed, false, 'ledger open after stop (resume path)');

    await orchestrator.shutdown();

    assert.strictEqual(orchestrator.closed, true, 'closed flag set');
    assert.strictEqual(cluster.ledger._closed, true, 'ledger closed by shutdown');

    const postShutdownMessage = {
      cluster_id: clusterId,
      topic: 'POST_SHUTDOWN',
      sender: 'test',
    };
    const post = cluster.messageBus.publish(postShutdownMessage);
    assert.strictEqual(post, null, 'publish after shutdown drops (ledger closed)');
  });

  it('shutdown is idempotent', async () => {
    const clusterId = await startCluster();
    await waitForTerminalState(orchestrator, clusterId);

    // A second call must not throw.
    await orchestrator.shutdown();
    await orchestrator.shutdown();
  });
});
Loading