diff --git a/src/main.zig b/src/main.zig index be9f3c7..8a5359f 100644 --- a/src/main.zig +++ b/src/main.zig @@ -159,6 +159,7 @@ fn run(alloc: std.mem.Allocator, active_repo: *?[]const u8) void { break; } orelse break; g_use_headers = g_use_headers or message_uses_headers; + @import("notify.zig").init(g_use_headers); defer alloc.free(line); const input = std.mem.trim(u8, line, " \t\r\n"); diff --git a/src/notify.zig b/src/notify.zig new file mode 100644 index 0000000..8245388 --- /dev/null +++ b/src/notify.zig @@ -0,0 +1,79 @@ +// notify.zig — MCP progress notifications (keepalive heartbeats) +// +// Sends JSON-RPC notifications/message to the client while a long-running +// tool is executing. Thread-safe — worker threads in run_swarm all share +// the same global writer mutex. +// +// The MCP client (Claude Code / external bridges) uses these as proof that +// the server is still alive. Without periodic heartbeats, external MCP +// bridges time out long-running tool calls with MCP error -32001. +// +// Usage: +// notify.init(use_headers); // once, at startup +// notify.send(alloc, "agent 'explorer' running…"); // any thread + +const std = @import("std"); + +/// Call once at server startup with the framing mode in use. +pub fn init(use_headers: bool) void { + g_use_headers = use_headers; + g_ready = true; +} + +/// Send a notifications/message to the MCP client. +/// Thread-safe. No-ops silently if init() was not called. +pub fn send(alloc: std.mem.Allocator, message: []const u8) void { + if (!g_ready) return; + + // Build JSON-escaped message string + var escaped: std.ArrayList(u8) = .empty; + defer escaped.deinit(alloc); + escaped.append(alloc, '"') catch return; + for (message) |c| { + switch (c) { + '"' => escaped.appendSlice(alloc, "\\\"") catch return, + '\\' => escaped.appendSlice(alloc, "\\\\") catch return, + '\n' => escaped.appendSlice(alloc, "\\n") catch return, + '\r' => escaped.appendSlice(alloc, "\\r") catch return, + '\t' => escaped.appendSlice(alloc, "\\t") catch return, + else => escaped.append(alloc, c) catch return, + } + } + escaped.append(alloc, '"') catch return; + + const payload = std.fmt.allocPrint( + alloc, + "{{\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\",\"params\":{{\"level\":\"info\",\"data\":{s}}}}}", + .{escaped.items}, + ) catch return; + defer alloc.free(payload); + + const stdout = std.fs.File.stdout(); + g_mutex.lock(); + defer g_mutex.unlock(); + + if (g_use_headers) { + const header = std.fmt.allocPrint( + alloc, "Content-Length: {d}\r\n\r\n", .{payload.len}, + ) catch return; + defer alloc.free(header); + stdout.writeAll(header) catch {}; + stdout.writeAll(payload) catch {}; + stdout.writeAll("\r\n") catch {}; + } else { + stdout.writeAll(payload) catch {}; + stdout.writeAll("\n") catch {}; + } +} + +// ── Globals ─────────────────────────────────────────────────────────────────── +var g_mutex: std.Thread.Mutex = .{}; +var g_use_headers: bool = false; +var g_ready: bool = false; + +// ── Tests ───────────────────────────────────────────────────────────────────── + +test "notify: init sets ready flag" { + init(false); + send(std.testing.allocator, "test message"); +} diff --git a/src/tools.zig b/src/tools.zig index 887eeb4..b359c62 100644 --- a/src/tools.zig +++ b/src/tools.zig @@ -2279,6 +2279,7 @@ fn runChainStep( step_out: *std.ArrayList(u8), ) void { const rt = @import("runtime.zig"); + const notify = @import("notify.zig"); const req: rt.AgentRequest = .{ .prompt = prompt, .role = role, @@ -2288,7 +2289,52 @@ fn runChainStep( }; const resolved = rt.resolve.resolveWithProbe(alloc, req); defer rt.prompts.freeAssembled(alloc, resolved.system_prompt); - rt.dispatch.dispatch(alloc, resolved, prompt, step_out); + + // Run dispatch on a worker thread so we can send heartbeat notifications + // on the main thread. Without heartbeats, external MCP bridges (which + // impose their own ~60s timeout) will kill the connection with -32001 + // even though the agent is still working. + const Ctx = struct { + a: std.mem.Allocator, + resolved: rt.ResolvedAgent, + p: []const u8, + out: *std.ArrayList(u8), + done: std.Thread.ResetEvent = .{}, + + fn work(ctx: *@This()) void { + rt.dispatch.dispatch(ctx.a, ctx.resolved, ctx.p, ctx.out); + ctx.done.set(); + } + }; + + const ctx = alloc.create(Ctx) catch { + step_out.appendSlice(alloc, "{\"error\":\"OOM allocating agent context\"}") catch {}; + return; + }; + defer alloc.destroy(ctx); + ctx.* = .{ .a = alloc, .resolved = resolved, .p = prompt, .out = step_out }; + + const thread = std.Thread.spawn(.{}, Ctx.work, .{ctx}) catch { + // Fallback: run synchronously (no heartbeat, but still works) + rt.dispatch.dispatch(alloc, resolved, prompt, step_out); + return; + }; + + // Send heartbeat every 15s while the agent is running. + // This keeps external MCP bridge connections alive. + const heartbeat_ns: u64 = 15 * std.time.ns_per_s; + var elapsed_s: u64 = 0; + while (true) { + if (ctx.done.timedWait(heartbeat_ns)) |_| { + break; // agent finished + } else |_| { + elapsed_s += 15; + var msg_buf: [128]u8 = undefined; + const msg = std.fmt.bufPrint(&msg_buf, "agent '{s}' running ({d}s elapsed)", .{ role, elapsed_s }) catch "agent running…"; + notify.send(alloc, msg); + } + } + thread.join(); } fn handleRunTask(