From dad09519efd03b3207a2e10076642825f2c899c8 Mon Sep 17 00:00:00 2001 From: mark-dingwall Date: Thu, 16 Apr 2026 03:49:13 +1000 Subject: [PATCH] fix: exit cluster run process after terminal state (foreground + daemon) Both `zeroshot run` and `zeroshot run -d` kept their process alive indefinitely after CLUSTER_COMPLETE. Foreground hung at the shell prompt; daemon mode accumulated orphaned processes because there was no completion-wait path at all. - orchestrator: add `async shutdown()` that clears safety timeouts, stops snapshotters, and closes each cluster's messageBus/ledger. Legacy `close()` flag-only path kept for backwards-compat callers. - message-bus: `close()` now removes EventEmitter listeners before the ledger closes so lingering subscribers can't pin the instance. - cli/index.js: extract `waitForTerminalState` polling helper used by both modes. Add `waitForDaemonCompletion`; daemon path now waits, shuts down, and exits. Foreground path shuts down after stream ends. - tests: new integration/daemon-exit-on-complete.test.js covers terminal-state wait, shutdown releasing ledger, and idempotency. Co-Authored-By: Claude Opus 4.6 (1M context) --- cli/index.js | 37 ++++- src/message-bus.js | 3 + src/orchestrator.js | 46 ++++++ .../daemon-exit-on-complete.test.js | 139 ++++++++++++++++++ 4 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 tests/integration/daemon-exit-on-complete.test.js diff --git a/cli/index.js b/cli/index.js index be344ca1..375f0f23 100755 --- a/cli/index.js +++ b/cli/index.js @@ -536,12 +536,32 @@ function setupForegroundSigintHandler({ orchestrator, clusterId, cleanup, stopCh }; } +function waitForTerminalState(orchestrator, clusterId, intervalMs = 500) { + return new Promise((resolve) => { + const intervalId = setInterval(() => { + let terminal = false; + let statusError = false; + try { + const status = orchestrator.getStatus(clusterId); + terminal = status.state !== 'running'; + } catch { + statusError = true; + } + if (terminal || statusError) { + clearInterval(intervalId); + resolve({ statusError }); + } + }, intervalMs); + }); +} + function waitForClusterCompletion(orchestrator, clusterId, cleanup) { return new Promise((resolve) => { let checkInterval; const stopChecking = () => { if (checkInterval) { clearInterval(checkInterval); + checkInterval = null; } }; const removeSigint = setupForegroundSigintHandler({ @@ -571,6 +591,11 @@ function waitForClusterCompletion(orchestrator, clusterId, cleanup) { }); } +async function waitForDaemonCompletion(orchestrator, clusterId) { + await waitForTerminalState(orchestrator, clusterId); + console.log(`[DAEMON] Cluster ${clusterId} reached terminal state`); +} + async function streamClusterInForeground(cluster, orchestrator, clusterId) { const sendersWithOutput = new Set(); const processedMessageIds = new Set(); @@ -2465,11 +2490,19 @@ Force provider flags: -G (GitHub), -L (GitLab), -J (Jira), -D (DevOps) throw new Error('Invalid run input: expected text, issue, or file'); } - if (!process.env.ZEROSHOT_DAEMON) { + if (process.env.ZEROSHOT_DAEMON) { + setupDaemonCleanup(orchestrator, clusterId); + await waitForDaemonCompletion(orchestrator, clusterId); + } else { await streamClusterInForeground(cluster, orchestrator, clusterId); } - setupDaemonCleanup(orchestrator, clusterId); + try { + await orchestrator.shutdown(); + } catch (shutdownErr) { + console.error(`[Orchestrator] shutdown error: ${shutdownErr.message}`); + } + process.exit(0); } catch (error) { console.error('Error:', error.message); process.exit(1); diff --git a/src/message-bus.js b/src/message-bus.js index 3f0dc104..5571d1f1 100644 --- a/src/message-bus.js +++ b/src/message-bus.js @@ -241,6 +241,9 @@ class MessageBus extends EventEmitter { } this.wsClients.clear(); + // Drop subscribers so lingering handlers don't keep refs alive + this.removeAllListeners(); + // Close ledger this.ledger.close(); } diff --git a/src/orchestrator.js b/src/orchestrator.js index 06e13780..36c893e6 100644 --- a/src/orchestrator.js +++ b/src/orchestrator.js @@ -1965,6 +1965,52 @@ class Orchestrator { this.closed = true; } + /** + * Full orchestrator shutdown — release every resource that could keep + * the event loop alive after terminal cluster state. Safe to call after + * cluster.stop() / kill() have already run. + */ + async shutdown() { + // `await` a zero-tick promise so the method keeps its async contract + // (future resource closers may be truly async) without tripping + // require-await today. + await Promise.resolve(); + this.closed = true; + + for (const cluster of this.clusters.values()) { + try { + if (cluster._maxIterSafetyTimeout) { + clearTimeout(cluster._maxIterSafetyTimeout); + cluster._maxIterSafetyTimeout = null; + } + + if (cluster.snapshotter) { + try { + cluster.snapshotter.stop(); + } catch { + // Already stopped + } + } + + if (cluster.messageBus && typeof cluster.messageBus.close === 'function') { + try { + cluster.messageBus.close(); + } catch { + // Ledger may already be closed + } + } else if (cluster.ledger && typeof cluster.ledger.close === 'function') { + try { + cluster.ledger.close(); + } catch { + // Already closed + } + } + } catch (error) { + this._log(`[Orchestrator] shutdown: error releasing ${cluster.id}: ${error.message}`); + } + } + } + /** * Find the last workflow-triggering message in the ledger * Workflow triggers indicate cluster state progression (not AGENT_OUTPUT noise) diff --git a/tests/integration/daemon-exit-on-complete.test.js b/tests/integration/daemon-exit-on-complete.test.js new file mode 100644 index 00000000..7b790199 --- /dev/null +++ b/tests/integration/daemon-exit-on-complete.test.js @@ -0,0 +1,139 @@ +process.env.ZEROSHOT_SKIP_GH_VERIFY = '1'; + +/** + * Regression test for cluster-run processes that never exit after + * terminal cluster state. Covers the shared `waitForTerminalState` + * helper and `orchestrator.shutdown()` release path used by both + * foreground and daemon modes. + */ + +const assert = require('node:assert'); +const path = require('path'); +const fs = require('fs'); +const os = require('os'); + +const Orchestrator = require('../../src/orchestrator'); +const MockTaskRunner = require('../helpers/mock-task-runner'); + +const simpleConfig = { + agents: [ + { + id: 'worker', + role: 'implementation', + timeout: 0, + triggers: [{ topic: 'ISSUE_OPENED', action: 'execute_task' }], + prompt: 'Do the thing', + hooks: { + onComplete: { + action: 'publish_message', + config: { topic: 'TASK_COMPLETE', content: { text: 'Done' } }, + }, + }, + }, + { + id: 'completion-detector', + role: 'orchestrator', + timeout: 0, + triggers: [{ topic: 'TASK_COMPLETE', action: 'stop_cluster' }], + }, + ], +}; + +function waitForTerminalState(orchestrator, clusterId, intervalMs = 50) { + return new Promise((resolve) => { + const intervalId = setInterval(() => { + try { + const status = orchestrator.getStatus(clusterId); + if (status.state !== 'running') { + clearInterval(intervalId); + resolve(); + } + } catch { + clearInterval(intervalId); + resolve(); + } + }, intervalMs); + }); +} + +describe('Daemon / foreground exit after terminal state', function () { + this.timeout(15000); + + let tempDir; + let orchestrator; + let mockRunner; + + beforeEach(() => { + tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'zeroshot-daemon-exit-')); + mockRunner = new MockTaskRunner(); + mockRunner.when('worker').returns('{"done": true}'); + orchestrator = new Orchestrator({ + quiet: true, + storageDir: tempDir, + taskRunner: mockRunner, + }); + }); + + afterEach(async () => { + if (orchestrator) { + const clusters = orchestrator.listClusters(); + for (const cluster of clusters) { + try { + await orchestrator.kill(cluster.id); + } catch { + // Ignore cleanup errors + } + } + try { + await orchestrator.shutdown(); + } catch { + // Already closed + } + } + if (tempDir && fs.existsSync(tempDir)) { + fs.rmSync(tempDir, { recursive: true, force: true }); + } + }); + + it('waitForTerminalState resolves once cluster reaches stopped', async () => { + const result = await orchestrator.start(simpleConfig, { text: 'Test' }); + const clusterId = result.id; + + const start = Date.now(); + await waitForTerminalState(orchestrator, clusterId); + const elapsed = Date.now() - start; + + const status = orchestrator.getStatus(clusterId); + assert.notStrictEqual(status.state, 'running', 'cluster should have left running'); + assert.ok(elapsed < 10000, `terminal state reached in ${elapsed}ms`); + }); + + it('orchestrator.shutdown() releases message bus / ledger after stop', async () => { + const result = await orchestrator.start(simpleConfig, { text: 'Test' }); + const clusterId = result.id; + const cluster = orchestrator.getCluster(clusterId); + + await waitForTerminalState(orchestrator, clusterId); + + assert.strictEqual(cluster.ledger._closed, false, 'ledger open after stop (resume path)'); + + await orchestrator.shutdown(); + + assert.strictEqual(orchestrator.closed, true, 'closed flag set'); + assert.strictEqual(cluster.ledger._closed, true, 'ledger closed by shutdown'); + const post = cluster.messageBus.publish({ + cluster_id: clusterId, + topic: 'POST_SHUTDOWN', + sender: 'test', + }); + assert.strictEqual(post, null, 'publish after shutdown drops (ledger closed)'); + }); + + it('shutdown is idempotent', async () => { + const result = await orchestrator.start(simpleConfig, { text: 'Test' }); + await waitForTerminalState(orchestrator, result.id); + + await orchestrator.shutdown(); + await orchestrator.shutdown(); + }); +});