Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ shared by two drivers so there is ONE rules layer, not a Claude copy and a non-C
`synthesizeAlways:true` runs a SINGLE arbiter synthesis pass instead of the loop (free-text
`synthesis`, for open questions) - one unified tool, one return envelope (split `verdict`/`synthesis`,
loop-only fields nullable). For non-Claude hosts that want the loop without driving it.
Per round, the arbiter's adjudication and revision calls run CONCURRENTLY when a peer
dissents (which guarantees the round cannot converge, so the revision is always used);
on an all-approve round only adjudication runs. Same external-call count as a serial loop
in every outcome, one serial arbiter leg saved per dissent round.
- **`consensus-step`** (MCP tool) - the host model (Claude) is the arbiter and drives ONE action per
call (`init / record_blind / dispatch_peers / submit_adjudication / submit_revision`); `LoopState`
is held server-side in the ephemeral `loop-store` by `sessionId`. The live `/consensus` slash command
Expand Down
54 changes: 44 additions & 10 deletions core/orchestrate.js
Original file line number Diff line number Diff line change
Expand Up @@ -335,13 +335,47 @@ async function runToConvergence(providers, req, opts = {}) {
);
state = loop.addOpinions(state, lastResults);

// Peer dissent => this round CANNOT converge (checkConvergence requires every
// responding peer to APPROVE), so the revision is GUARANTEED needed: overlap it
// with adjudication for free. All-approve => the round may converge, so do NOT
// speculate a revision we might discard. Net: same arbiter-call COUNT as the
// serial loop in every outcome, one serial leg saved per dissent (non-final) round.
//
// CONTRACT this parallelization relies on (revert to serial if either breaks):
// 1. buildRevisionPrompt depends only on state.currentPlan + the peer results,
// NOT on the arbiter's adjudicated verdict.
// 2. loop.submitAdjudication does not mutate state.currentPlan, so the revision
// prompt is identical whether built before or after adjudication.
const peerDissent = lastResults.some((r) => !r.isError && r.verdict !== "APPROVE");
// Isolate a whole arbiter branch (ask + parse) so neither a synchronous throw, a
// rejected ask, nor a parse fault can reject Promise.all - matching the per-call
// try/catch scope of the serial version and the blind-pass wrapper above. The
// adjudication/revision passes are intentionally NOT oriented (unlike the blind
// pass): the arbiter reasons over inlined peer text, not the cold repo.
const askIsolated = (/** @type {string} */ prompt) =>
Promise.resolve().then(() => arbiter.ask({ ...req, prompt })).then((r) => r, () => null);
/** @param {(DelegationResult|null)} res @returns {"APPROVE"|"REQUEST_CHANGES"|"REJECT"} */
const verdictFrom = (res) => {
const t = okText(res);
if (!t) return "REQUEST_CHANGES"; // null/empty -> hold (the serial default)
try { return parseReview(t).verdict || "REQUEST_CHANGES"; } catch { return "REQUEST_CHANGES"; }
};

/** @type {"APPROVE"|"REQUEST_CHANGES"|"REJECT"} */
let verdict = "REQUEST_CHANGES";
try {
const adj = await arbiter.ask({ ...req, prompt: buildAdjudicationPrompt(state, lastResults) });
const t = okText(adj);
if (t) { const p = parseReview(t); if (p.verdict) verdict = p.verdict; }
} catch { /* arbiter adjudication failed -> hold at REQUEST_CHANGES */ }
let revised = state.currentPlan;
if (peerDissent) {
// Guaranteed non-final: adjudication || revision, both used (no waste).
const [adjRes, revRes] = await Promise.all([
askIsolated(buildAdjudicationPrompt(state, lastResults)),
askIsolated(buildRevisionPrompt(state, lastResults)),
]);
verdict = verdictFrom(adjRes);
revised = okText(revRes) || state.currentPlan;
} else {
// May converge: adjudication only - do not burn a revision call we might discard.
verdict = verdictFrom(await askIsolated(buildAdjudicationPrompt(state, lastResults)));
}
state = loop.submitAdjudication(state, { verdict, decisions: [] });
try {
logger.logEvent({
Expand All @@ -353,11 +387,11 @@ async function runToConvergence(providers, req, opts = {}) {
} catch { /* logging must never break the loop */ }
if (state.status === "converged") break;

let revised = state.currentPlan;
try {
const rev = await arbiter.ask({ ...req, prompt: buildRevisionPrompt(state, lastResults) });
revised = okText(rev) || state.currentPlan;
} catch { /* keep the current plan */ }
if (!peerDissent) {
// Rare: all peers APPROVED but the arbiter blocked -> revise now (serial; there
// was nothing to overlap, since convergence was still possible at fan-out time).
revised = okText(await askIsolated(buildRevisionPrompt(state, lastResults))) || state.currentPlan;
}
state = loop.submitRevision(state, revised, "arbiter revision");
}
} catch (e) {
Expand Down
55 changes: 55 additions & 0 deletions test/core-run-to-convergence.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,58 @@ test("RC8: omitted maxRounds still terminates (defaults to 5) -> unresolved on p
assert.equal(out.confidence, "none");
assert.equal(out.rounds.length, 5); // defaulted to 5 and terminated
});

test("RC9: dissent in round 1 -> the round-1 revision propagates into round 2's plan-under-review", async () => {
/** @type {string[]} */
const seen = [];
const peers = [stub("gpt", (p) => { seen.push(p); return p.includes("REVISED") ? "**Verdict**: APPROVE" : "**Verdict**: REQUEST_CHANGES\n- [scope] needs detail"; })];
const out = await runToConvergence(peers, REQ, { arbiter: smartArbiter(), maxRounds: 5 });
assert.equal(out.converged, true); // round 2, after the revision lands
// The dissent round's revision ("REVISED plan...") must appear in a later peer prompt.
assert.ok(seen.some((p) => p.includes("Round 2 of 5") && p.includes("REVISED")), "round-2 plan-under-review should contain the round-1 revision");
});

test("RC10: round-1 all-APPROVE dispatches NO revision call (dissent gate avoids waste)", async () => {
/** @type {string[]} */
const arbPrompts = [];
const arbiter = stub("arb", (p) => { arbPrompts.push(p); if (p.includes("ADJUDICATE")) return "**Verdict**: APPROVE"; if (p.includes("REVISE")) return "REVISED"; return "**Verdict**: APPROVE"; });
const peers = [stub("gpt", () => "**Verdict**: APPROVE"), stub("gemini", () => "**Verdict**: APPROVE")];
const out = await runToConvergence(peers, REQ, { arbiter });
assert.equal(out.converged, true);
assert.equal(out.rounds.length, 0); // converged round 1, no revision recorded
assert.ok(!arbPrompts.some((p) => p.includes("REVISE")), "no REVISE prompt should be dispatched on an all-approve converging round");
});

test("RC11: a revision leg that throws SYNCHRONOUSLY on a dissent round is isolated (no reject)", async () => {
// Dissent => parallel branch fires adjudication ∥ revision; the arbiter throws
// synchronously on the REVISE prompt, before returning a promise.
const arbiter = {
name: "arb",
capabilities: { canImplement: false, fileUpload: false, multiTurn: false },
health: async () => ({ ok: true }),
/** @param {{prompt:string}} req @returns {any} */
ask: (req) => {
if (req.prompt.includes("REVISE")) throw new Error("sync revise throw");
return Promise.resolve({ provider: "arb", model: "s", isError: false, text: "**Verdict**: REQUEST_CHANGES", ms: 1 });
},
};
const peers = [stub("gpt", () => "**Verdict**: REQUEST_CHANGES\n- [ops] x")];
/** @type {any} */
let out;
await assert.doesNotReject(async () => { out = await runToConvergence(peers, REQ, { arbiter, maxRounds: 2 }); });
assert.equal(out.converged, false); // revision threw -> plan never changes -> peer keeps dissenting
assert.equal(out.rounds.length, 2); // both rounds still ran despite the throwing revision leg
});

test("RC12: all peers APPROVE but arbiter blocks -> serial revision runs (the !peerDissent post-break path)", async () => {
/** @type {string[]} */
const arbPrompts = [];
// No peer dissent, but the arbiter REQUEST_CHANGES on adjudication, so the round
// cannot converge and the SERIAL revision branch must fire.
const arbiter = stub("arb", (p) => { arbPrompts.push(p); if (p.includes("ADJUDICATE")) return "**Verdict**: REQUEST_CHANGES"; if (p.includes("REVISE")) return "REVISED"; return "**Verdict**: APPROVE"; });
const peers = [stub("gpt", () => "**Verdict**: APPROVE")];
const out = await runToConvergence(peers, REQ, { arbiter, maxRounds: 2 });
assert.equal(out.converged, false); // arbiter never approves
assert.equal(out.rounds.length, 2);
assert.ok(arbPrompts.some((p) => p.includes("REVISE")), "serial revision should run when all peers approve but the arbiter blocks");
});
Loading