Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ fn run(alloc: std.mem.Allocator, active_repo: *?[]const u8) void {
break;
} orelse break;
g_use_headers = g_use_headers or message_uses_headers;
@import("notify.zig").init(g_use_headers);
defer alloc.free(line);

const input = std.mem.trim(u8, line, " \t\r\n");
Expand Down
79 changes: 79 additions & 0 deletions src/notify.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// notify.zig — MCP log-message notifications used as keepalive heartbeats
//
// Sends JSON-RPC notifications/message to the client while a long-running
// tool is executing. Thread-safe — worker threads in run_swarm all share
// the same global writer mutex.
//
// The MCP client (Claude Code / external bridges) uses these as proof that
// the server is still alive. Without periodic heartbeats, external MCP
// bridges time out long-running tool calls with MCP error -32001.
//
// Usage:
// notify.init(use_headers); // once, at startup
// notify.send(alloc, "agent 'explorer' running…"); // any thread

const std = @import("std");

/// Record the framing mode in use and enable send().
///
/// `use_headers`: when true, send() prefixes each payload with a
/// `Content-Length` header; when false, payloads are newline-delimited.
///
/// The mode is stored before the ready flag is raised. Both are plain
/// (non-atomic) stores and no lock is taken. Calling this again simply
/// overwrites the stored mode and leaves the ready flag set.
///
/// NOTE(review): the main.zig hunk in this change appears to call init()
/// for every input line rather than once at startup — confirm that is
/// intended.
pub fn init(use_headers: bool) void {
g_use_headers = use_headers;
g_ready = true;
}

/// Send a `notifications/message` JSON-RPC notification to the MCP client.
///
/// `message` becomes the JSON string value of `params.data`; `params.level`
/// is always "info". Double quotes, backslashes and every control byte below
/// 0x20 are escaped, so control characters in `message` cannot produce
/// malformed JSON. All other bytes are copied through unchanged.
///
/// Best-effort: returns silently if init() has not been called, if `alloc`
/// cannot supply memory, or if a write to stdout fails. After a failed write
/// the remainder of the frame is not attempted.
///
/// Writes to stdout are serialized through `g_mutex`.
pub fn send(alloc: std.mem.Allocator, message: []const u8) void {
    if (!g_ready) return;

    const hex_digits = "0123456789abcdef";

    // Build the complete JSON body in a single buffer, escaping as we go.
    var body: std.ArrayList(u8) = .empty;
    defer body.deinit(alloc);
    body.appendSlice(
        alloc,
        "{\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\",\"params\":{\"level\":\"info\",\"data\":\"",
    ) catch return;
    for (message) |c| {
        switch (c) {
            '"' => body.appendSlice(alloc, "\\\"") catch return,
            '\\' => body.appendSlice(alloc, "\\\\") catch return,
            '\n' => body.appendSlice(alloc, "\\n") catch return,
            '\r' => body.appendSlice(alloc, "\\r") catch return,
            '\t' => body.appendSlice(alloc, "\\t") catch return,
            // Remaining control bytes are not allowed raw inside a JSON
            // string; emit them as \u00XX.
            0x00...0x08, 0x0b, 0x0c, 0x0e...0x1f => {
                const escape = [_]u8{ '\\', 'u', '0', '0', hex_digits[c >> 4], hex_digits[c & 0x0f] };
                body.appendSlice(alloc, &escape) catch return;
            },
            else => body.append(alloc, c) catch return,
        }
    }
    body.appendSlice(alloc, "\"}}") catch return;

    // The header is formatted into a stack buffer so that no allocation
    // (and therefore no allocation failure) happens while the lock is held.
    // "Content-Length: " + at most 20 digits + "\r\n\r\n" fits in 64 bytes.
    var header_buf: [64]u8 = undefined;

    const stdout = std.fs.File.stdout();
    g_mutex.lock();
    defer g_mutex.unlock();

    const use_headers = g_use_headers;
    var header: []const u8 = "";
    if (use_headers) {
        header = std.fmt.bufPrint(
            &header_buf,
            "Content-Length: {d}\r\n\r\n",
            .{body.items.len},
        ) catch return;
    }

    // NOTE(review): in header mode the trailing CRLF is written after the
    // body and is not counted in Content-Length. Kept as-is to preserve the
    // existing wire format — confirm the client tolerates it.
    const terminator: []const u8 = if (use_headers) "\r\n" else "\n";

    // Stop at the first failed write rather than emitting a torn frame.
    const parts = [_][]const u8{ header, body.items, terminator };
    for (parts) |part| {
        stdout.writeAll(part) catch return;
    }
}

// ── Globals ───────────────────────────────────────────────────────────────────
/// Set by init(); send() is a no-op while this is false.
var g_ready: bool = false;
/// Framing mode recorded by init(): true selects Content-Length headers,
/// false selects newline-delimited output.
var g_use_headers: bool = false;
/// Held by send() for the duration of its stdout writes.
var g_mutex = std.Thread.Mutex{};

// ── Tests ─────────────────────────────────────────────────────────────────────

test "notify: init sets ready flag" {
    init(false);

    // Check the state the test name refers to, plus the stored framing mode.
    try std.testing.expect(g_ready);
    try std.testing.expect(!g_use_headers);

    // Smoke test: with std.testing.allocator this also fails on any leak in
    // send(). Note that it writes one newline-delimited frame to stdout.
    send(std.testing.allocator, "test message");
}
48 changes: 47 additions & 1 deletion src/tools.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2279,6 +2279,7 @@ fn runChainStep(
step_out: *std.ArrayList(u8),
) void {
const rt = @import("runtime.zig");
const notify = @import("notify.zig");
const req: rt.AgentRequest = .{
.prompt = prompt,
.role = role,
Expand All @@ -2288,7 +2289,52 @@ fn runChainStep(
};
const resolved = rt.resolve.resolveWithProbe(alloc, req);
defer rt.prompts.freeAssembled(alloc, resolved.system_prompt);
rt.dispatch.dispatch(alloc, resolved, prompt, step_out);

// Run dispatch on a worker thread so we can send heartbeat notifications
// on the main thread. Without heartbeats, external MCP bridges (which
// impose their own ~60s timeout) will kill the connection with -32001
// even though the agent is still working.
const Ctx = struct {
a: std.mem.Allocator,
resolved: rt.ResolvedAgent,
p: []const u8,
out: *std.ArrayList(u8),
done: std.Thread.ResetEvent = .{},

fn work(ctx: *@This()) void {
rt.dispatch.dispatch(ctx.a, ctx.resolved, ctx.p, ctx.out);
ctx.done.set();
}
};

const ctx = alloc.create(Ctx) catch {
step_out.appendSlice(alloc, "{\"error\":\"OOM allocating agent context\"}") catch {};
return;
};
defer alloc.destroy(ctx);
ctx.* = .{ .a = alloc, .resolved = resolved, .p = prompt, .out = step_out };

const thread = std.Thread.spawn(.{}, Ctx.work, .{ctx}) catch {
// Fallback: run synchronously (no heartbeat, but still works)
rt.dispatch.dispatch(alloc, resolved, prompt, step_out);
return;
};

// Send heartbeat every 15s while the agent is running.
// This keeps external MCP bridge connections alive.
const heartbeat_ns: u64 = 15 * std.time.ns_per_s;
var elapsed_s: u64 = 0;
while (true) {
if (ctx.done.timedWait(heartbeat_ns)) |_| {
break; // agent finished
} else |_| {
elapsed_s += 15;
var msg_buf: [128]u8 = undefined;
const msg = std.fmt.bufPrint(&msg_buf, "agent '{s}' running ({d}s elapsed)", .{ role, elapsed_s }) catch "agent running…";
notify.send(alloc, msg);
}
}
thread.join();
}

fn handleRunTask(
Expand Down