From 4ec7c2cb6c544a443497cd90e05c1f1a332dc6ee Mon Sep 17 00:00:00 2001 From: baudbot-agent Date: Fri, 27 Feb 2026 05:58:45 +0000 Subject: [PATCH 1/5] feat: add unanswered Slack mention detection to heartbeat Adds periodic checking for Slack app_mention events that didn't receive a reply, helping catch messages lost during agent restarts. Changes: - Add checkUnansweredMentions() function that scans bridge log for recent app_mention events (last hour, older than 5 min) - Add hasRepliedToThread() helper that checks bridge logs and control-agent session logs for evidence of replies - Integrate check into main heartbeat flow (runs every 10 min by default) - Add HEARTBEAT_CHECK_UNANSWERED_MENTIONS env var (enabled by default) - Add UNANSWERED_MENTION_THRESHOLD_MS constant (5 min grace period) - Update heartbeat config output to show new settings This addresses the issue where messages forwarded to dead sockets during restarts are lost because the bridge acks them to Slack before confirming agent receipt. The heartbeat will now alert the control-agent about unanswered mentions so it can follow up. Fixes dropped messages during agent restart windows. --- pi/extensions/heartbeat.ts | 137 +++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index c529160..78d3bac 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -9,11 +9,13 @@ * 2. Slack bridge — HTTP POST to localhost:7890/send returns 400 * 3. Stale worktrees — ~/workspace/worktrees/ has dirs with no matching in-progress todo * 4. Stuck todos — in-progress for >2 hours with no matching dev-agent session + * 5. Unanswered Slack mentions — app_mention events in bridge log with no reply within 5 min * * Configuration (env vars): * HEARTBEAT_INTERVAL_MS — interval between heartbeats (default: 600000 = 10 min) * HEARTBEAT_ENABLED — set to "0" or "false" to disable (default: enabled) * HEARTBEAT_EXPECTED_SESSIONS — comma-separated session aliases to check (default: "sentry-agent") + * HEARTBEAT_CHECK_UNANSWERED_MENTIONS — set to "1" or "true" to enable (default: enabled) * * When all checks pass, zero LLM tokens are consumed. When something fails, * a targeted prompt is injected describing only the failures so the control-agent @@ -37,6 +39,9 @@ const SOCKET_DIR = join(homedir(), ".pi", "session-control"); const WORKTREES_DIR = join(homedir(), "workspace", "worktrees"); const TODOS_DIR = join(homedir(), ".pi", "todos"); const BRIDGE_URL = "http://127.0.0.1:7890/send"; +const BRIDGE_LOG = join(homedir(), ".pi", "agent", "logs", "slack-bridge.log"); +const SESSION_DIR = join(homedir(), ".pi", "agent", "sessions"); +const UNANSWERED_MENTION_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes type HeartbeatState = { enabled: boolean; @@ -72,6 +77,12 @@ function getExpectedSessions(): string[] { return ["sentry-agent"]; } +function isUnansweredMentionsCheckEnabled(): boolean { + const val = process.env.HEARTBEAT_CHECK_UNANSWERED_MENTIONS?.trim().toLowerCase(); + // Default to enabled unless explicitly disabled + return val !== "0" && val !== "false" && val !== "no"; +} + // ── Health Check Functions ────────────────────────────────────────────────── function checkSessions(): CheckResult[] { @@ -300,6 +311,119 @@ function checkStuckTodos(): CheckResult[] { return results; } +function checkUnansweredMentions(): CheckResult[] { + const results: CheckResult[] = []; + const now = Date.now(); + + if (!existsSync(BRIDGE_LOG)) return results; + + try { + // Read the last 500 lines of the bridge log to find recent app_mention events + const { execSync } = require("node:child_process"); + const logTail = execSync(`tail -500 "${BRIDGE_LOG}"`, { encoding: "utf-8" }); + + // Parse log lines looking for app_mention events + const mentionPattern = /\[([^\]]+)\].*app_mention.*ts: (\d+\.\d+)/g; + const mentions: Array<{ timestamp: string; ts: string }> = []; + + let match; + while ((match = mentionPattern.exec(logTail)) !== null) { + mentions.push({ + timestamp: match[1], + ts: match[2], + }); + } + + // Filter to recent mentions (within last hour) + const oneHourAgo = now - 60 * 60 * 1000; + const recentMentions = mentions.filter(m => { + try { + const mentionTime = new Date(m.timestamp).getTime(); + return mentionTime > oneHourAgo; + } catch { + return false; + } + }); + + // For each recent mention, check if we replied to it + for (const mention of recentMentions) { + const mentionTime = new Date(mention.timestamp).getTime(); + const age = now - mentionTime; + + // Skip very recent mentions (< 5 min) - agent might still be processing + if (age < UNANSWERED_MENTION_THRESHOLD_MS) continue; + + // Check if we sent a reply to this thread_ts + const replied = hasRepliedToThread(mention.ts); + + if (!replied) { + const minutesAgo = Math.round(age / (60 * 1000)); + results.push({ + name: `unanswered:${mention.ts}`, + ok: false, + detail: `Slack mention at ts ${mention.ts} (${minutesAgo} min ago) has no reply — may have been lost during restart`, + }); + } + } + } catch (err: unknown) { + // Log read failure or exec error - non-fatal, but log it + const msg = err instanceof Error ? err.message : String(err); + // Don't report this as a failure unless we have a specific problem to report + } + + return results; +} + +function hasRepliedToThread(threadTs: string): boolean { + // Check session logs and bridge logs for evidence of a reply to this thread_ts + + // 1. Check bridge log for /send or /reply with this thread_ts + if (existsSync(BRIDGE_LOG)) { + try { + const { execSync } = require("node:child_process"); + // Look for POST to /send or /reply with this thread_ts in the last 1000 lines + const result = execSync( + `tail -1000 "${BRIDGE_LOG}" | grep -E "(POST /send|POST /reply)" | grep -c "${threadTs}" || echo 0`, + { encoding: "utf-8" } + ); + const count = parseInt(result.trim(), 10); + if (count > 0) return true; + } catch { + // grep failed or command error + } + } + + // 2. Check control-agent session logs for mentions of this thread_ts in recent sessions + if (existsSync(SESSION_DIR)) { + try { + const sessionFiles = readdirSync(SESSION_DIR); + // Get the most recent control-agent session file + const controlAgentFiles = sessionFiles + .filter(f => f.includes("control-agent") && f.endsWith(".jsonl")) + .sort() + .reverse() + .slice(0, 3); // Check last 3 sessions + + for (const file of controlAgentFiles) { + try { + const content = readFileSync(join(SESSION_DIR, file), "utf-8"); + // Look for the thread_ts being mentioned in message content or tool calls + if (content.includes(threadTs)) { + // Found reference to this thread - likely replied + return true; + } + } catch { + // File read error - skip + } + } + } catch { + // Dir read error + } + } + + return false; +} + // ── Helper Functions ──────────────────────────────────────────────────────── function hasMatchingTodo(devAgentName: string): boolean { @@ -412,12 +536,16 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { const bridgeResult = await checkBridge(); const worktreeResults = checkWorktrees(); const stuckTodoResults = checkStuckTodos(); + const unansweredMentionResults = isUnansweredMentionsCheckEnabled() + ? checkUnansweredMentions() + : []; const allResults: CheckResult[] = [ ...sessionResults, bridgeResult, ...worktreeResults, ...stuckTodoResults, + ...unansweredMentionResults, ]; const failures = allResults.filter((r) => !r.ok); @@ -550,12 +678,16 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { const bridgeResult = await checkBridge(); const worktreeResults = checkWorktrees(); const stuckTodoResults = checkStuckTodos(); + const unansweredMentionResults = isUnansweredMentionsCheckEnabled() + ? checkUnansweredMentions() + : []; const allResults: CheckResult[] = [ ...sessionResults, bridgeResult, ...worktreeResults, ...stuckTodoResults, + ...unansweredMentionResults, ]; const failures = allResults.filter((r) => !r.ok); @@ -592,6 +724,7 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { case "config": { const expected = getExpectedSessions(); + const checkUnanswered = isUnansweredMentionsCheckEnabled(); return { content: [ { @@ -604,11 +737,15 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { ` Backoff multiplier: ${BACKOFF_MULTIPLIER}x per error`, ` Max backoff: ${MAX_BACKOFF_MS / 1000}s`, ` Expected sessions: ${expected.join(", ")} (env: HEARTBEAT_EXPECTED_SESSIONS)`, + ` Check unanswered mentions: ${checkUnanswered ? "enabled" : "disabled"} (env: HEARTBEAT_CHECK_UNANSWERED_MENTIONS)`, + ` Unanswered mention threshold: ${UNANSWERED_MENTION_THRESHOLD_MS / (60 * 1000)} min`, ` Stuck todo threshold: ${STUCK_TODO_THRESHOLD_MS / (60 * 60 * 1000)}h`, ` Bridge URL: ${BRIDGE_URL}`, + ` Bridge log: ${BRIDGE_LOG}`, ` Socket dir: ${SOCKET_DIR}`, ` Worktrees dir: ${WORKTREES_DIR}`, ` Todos dir: ${TODOS_DIR}`, + ` Session dir: ${SESSION_DIR}`, ].join("\n"), }, ], From ed2c48d76b33094d0d2055226e69168453b8cde1 Mon Sep 17 00:00:00 2001 From: baudbot-agent Date: Fri, 27 Feb 2026 06:00:31 +0000 Subject: [PATCH 2/5] fix: improve reply detection to avoid false positives The initial implementation checked for any occurrence of thread_ts in session logs, which would match both inbound mentions AND outbound replies. This led to false positives. Improvements: 1. Add support for slack-reply-log.jsonl (most reliable source) 2. Check for thread_ts in outbound context specifically (JSON keys in curl commands or send_to_session calls) 3. Use proper session subdirectory path 4. Add multiple JSON formatting variations to catch escaped quotes 5. Add documentation for future reaction-based checking This makes the check more precise - only counting actual replies, not just seeing the mention. --- pi/extensions/heartbeat.ts | 60 +++++++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 24 deletions(-) diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index 78d3bac..7a7fc83 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -375,41 +375,45 @@ function checkUnansweredMentions(): CheckResult[] { } function hasRepliedToThread(threadTs: string): boolean { - // Check session logs and bridge logs for evidence of a reply to this thread_ts - - // 1. Check bridge log for /send or /reply with this thread_ts - if (existsSync(BRIDGE_LOG)) { + // Check multiple sources for evidence of a reply to this thread_ts. + + // 1. Check the reply tracking log (most reliable — written by the agent). + // File: ~/.pi/agent/slack-reply-log.jsonl + // Each line: {"thread_ts":"...","replied_at":"..."} + const replyLogPath = join(homedir(), ".pi", "agent", "slack-reply-log.jsonl"); + if (existsSync(replyLogPath)) { try { - const { execSync } = require("node:child_process"); - // Look for POST to /send or /reply with this thread_ts in the last 1000 lines - const result = execSync( - `tail -1000 "${BRIDGE_LOG}" | grep -E "(POST /send|POST /reply)" | grep -c "${threadTs}" || echo 0`, - { encoding: "utf-8" } - ); - const count = parseInt(result.trim(), 10); - if (count > 0) return true; + const content = readFileSync(replyLogPath, "utf-8"); + if (content.includes(threadTs)) return true; } catch { - // grep failed or command error + // File read error — fall through to other checks } } - // 2. Check control-agent session logs for mentions of this thread_ts in recent sessions - if (existsSync(SESSION_DIR)) { + // 2. Check control-agent session logs for the thread_ts. + // Session files are in ~/.pi/agent/sessions/--home-baudbot_agent--/ + // and named _.jsonl. + // If the agent processed a thread_ts (via curl /send with thread_ts), + // the JSONL will contain it in the tool call arguments. + const controlAgentSessionDir = join(SESSION_DIR, "--home-baudbot_agent--"); + if (existsSync(controlAgentSessionDir)) { try { - const sessionFiles = readdirSync(SESSION_DIR); - // Get the most recent control-agent session file - const controlAgentFiles = sessionFiles - .filter(f => f.includes("control-agent") && f.endsWith(".jsonl")) + const sessionFiles = readdirSync(controlAgentSessionDir) + .filter(f => f.endsWith(".jsonl")) .sort() .reverse() .slice(0, 3); // Check last 3 sessions - for (const file of controlAgentFiles) { + for (const file of sessionFiles) { try { - const content = readFileSync(join(SESSION_DIR, file), "utf-8"); - // Look for the thread_ts being mentioned in message content or tool calls - if (content.includes(threadTs)) { - // Found reference to this thread - likely replied + const content = readFileSync(join(controlAgentSessionDir, file), "utf-8"); + // Look for evidence of a reply: the thread_ts appearing in a curl /send command + // or in a send_to_session message. The thread_ts in an outbound context + // (not just the inbound mention) indicates we replied. + if (content.includes(`"thread_ts":"${threadTs}"`) || + content.includes(`"thread_ts": "${threadTs}"`) || + content.includes(`\\"thread_ts\\":\\"${threadTs}\\"`) || + content.includes(`\\"thread_ts\\":\\"${threadTs}\\"`)) { return true; } } catch { @@ -421,6 +425,14 @@ function hasRepliedToThread(threadTs: string): boolean { } } + // 3. Check the bridge log for the ✅ check reaction resolving this thread. + // The bridge logs failures like "✅ check reaction failed" but successful + // ack reactions are silent. Still worth checking for the thread_ts in + // any outbound context (e.g., reaction calls). + // Also check for the 👀 eyes reaction — if we reacted with eyes AND + // the thread_ts appears in an outbound /send context, we likely replied. + // (This is a weak signal but better than nothing.) + return false; } From 6ae75e495e331e13d17beaaba0869f4ffd2ca93f Mon Sep 17 00:00:00 2001 From: Ben Vinegar Date: Sat, 28 Feb 2026 16:00:51 -0500 Subject: [PATCH 3/5] heartbeat: tighten unanswered-mention reply detection --- pi/extensions/heartbeat.test.mjs | 99 ++++++++++++++++++++++++++++++++ pi/extensions/heartbeat.ts | 80 +++++++++++++++++--------- 2 files changed, 152 insertions(+), 27 deletions(-) diff --git a/pi/extensions/heartbeat.test.mjs b/pi/extensions/heartbeat.test.mjs index 11c63df..b173b44 100644 --- a/pi/extensions/heartbeat.test.mjs +++ b/pi/extensions/heartbeat.test.mjs @@ -63,6 +63,55 @@ function parseTodo(content) { } } +function hasReplyLogEntry(replyLogContent, threadTs) { + const lines = replyLogContent.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const entry = JSON.parse(trimmed); + if (entry?.thread_ts === threadTs) return true; + } catch { + // Ignore malformed JSONL lines. + } + } + return false; +} + +function hasOutboundSendCommand(sessionJsonlContent, threadTs) { + const escapedThreadTs = threadTs.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapedThreadTs}["']`); + + for (const line of sessionJsonlContent.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) continue; + + let parsed; + try { + parsed = JSON.parse(trimmed); + } catch { + continue; + } + + if (parsed?.type !== "message") continue; + if (parsed?.message?.role !== "assistant") continue; + const items = parsed?.message?.content; + if (!Array.isArray(items)) continue; + + for (const item of items) { + if (item?.type !== "toolCall") continue; + if (item?.name !== "bash") continue; + const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; + if (!command.includes("curl")) continue; + if (!command.includes("/send")) continue; + if (!threadTsPattern.test(command)) continue; + return true; + } + } + + return false; +} + // ── Test helpers ──────────────────────────────────────────────────────────── // ── Tests ─────────────────────────────────────────────────────────────────── @@ -312,6 +361,56 @@ Not part of JSON.`; }); }); +describe("heartbeat v2: unanswered mention reply detection", () => { + it("matches exact thread_ts entries in reply log jsonl", () => { + const log = [ + '{"thread_ts":"1234.5678","replied_at":"2026-02-27T00:00:00Z"}', + '{"thread_ts":"2345.6789","replied_at":"2026-02-27T00:05:00Z"}', + ].join("\n"); + + assert.equal(hasReplyLogEntry(log, "1234.5678"), true); + assert.equal(hasReplyLogEntry(log, "9999.0000"), false); + }); + + it("ignores malformed reply-log lines", () => { + const log = ['{"thread_ts":"1234.5678"}', 'not-json', '{"thread_ts":"2345.6789"}'].join("\n"); + assert.equal(hasReplyLogEntry(log, "2345.6789"), true); + }); + + it("detects outbound curl /send with matching thread_ts", () => { + const session = JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: [ + { + type: "toolCall", + name: "bash", + arguments: { + command: + "curl -s -X POST http://127.0.0.1:7890/send -H 'Content-Type: application/json' -d '{\"channel\":\"C123\",\"text\":\"hi\",\"thread_ts\":\"1234.5678\"}'", + }, + }, + ], + }, + }); + + assert.equal(hasOutboundSendCommand(session, "1234.5678"), true); + }); + + it("does not treat inbound text containing thread_ts as a reply", () => { + const inboundOnly = JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "inbound event metadata: thread_ts=1234.5678" }], + }, + }); + + assert.equal(hasOutboundSendCommand(inboundOnly, "1234.5678"), false); + }); +}); + describe("heartbeat v2: hasMatchingInProgressTodo logic", () => { // Replicate the matching logic from the extension function matchesWorktree(content, worktreeName) { diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index 7a7fc83..5b7309d 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -15,7 +15,7 @@ * HEARTBEAT_INTERVAL_MS — interval between heartbeats (default: 600000 = 10 min) * HEARTBEAT_ENABLED — set to "0" or "false" to disable (default: enabled) * HEARTBEAT_EXPECTED_SESSIONS — comma-separated session aliases to check (default: "sentry-agent") - * HEARTBEAT_CHECK_UNANSWERED_MENTIONS — set to "1" or "true" to enable (default: enabled) + * HEARTBEAT_CHECK_UNANSWERED_MENTIONS — enabled by default, set to "0", "false", or "no" to disable * * When all checks pass, zero LLM tokens are consumed. When something fails, * a targeted prompt is injected describing only the failures so the control-agent @@ -326,7 +326,7 @@ function checkUnansweredMentions(): CheckResult[] { const mentionPattern = /\[([^\]]+)\].*app_mention.*ts: (\d+\.\d+)/g; const mentions: Array<{ timestamp: string; ts: string }> = []; - let match; + let match: RegExpExecArray | null; while ((match = mentionPattern.exec(logTail)) !== null) { mentions.push({ timestamp: match[1], @@ -365,10 +365,9 @@ function checkUnansweredMentions(): CheckResult[] { }); } } - } catch (err: unknown) { - // Log read failure or exec error - non-fatal, but log it - const msg = err instanceof Error ? err.message : String(err); - // Don't report this as a failure unless we have a specific problem to report + } catch { + // Log read failure or exec error - non-fatal. + // Don't report this as a failure unless we have a specific problem to report. } return results; @@ -384,22 +383,35 @@ function hasRepliedToThread(threadTs: string): boolean { if (existsSync(replyLogPath)) { try { const content = readFileSync(replyLogPath, "utf-8"); - if (content.includes(threadTs)) return true; + const lines = content.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const entry = JSON.parse(trimmed); + if (entry?.thread_ts === threadTs) { + return true; + } + } catch { + // Ignore malformed JSONL lines and keep scanning. + } + } } catch { // File read error — fall through to other checks } } - // 2. Check control-agent session logs for the thread_ts. + // 2. Check recent control-agent session logs for explicit outbound /send calls. // Session files are in ~/.pi/agent/sessions/--home-baudbot_agent--/ // and named _.jsonl. - // If the agent processed a thread_ts (via curl /send with thread_ts), - // the JSONL will contain it in the tool call arguments. const controlAgentSessionDir = join(SESSION_DIR, "--home-baudbot_agent--"); if (existsSync(controlAgentSessionDir)) { + const escapeRegExp = (value: string): string => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapeRegExp(threadTs)}["']`); + try { const sessionFiles = readdirSync(controlAgentSessionDir) - .filter(f => f.endsWith(".jsonl")) + .filter((f) => f.endsWith(".jsonl")) .sort() .reverse() .slice(0, 3); // Check last 3 sessions @@ -407,14 +419,36 @@ function hasRepliedToThread(threadTs: string): boolean { for (const file of sessionFiles) { try { const content = readFileSync(join(controlAgentSessionDir, file), "utf-8"); - // Look for evidence of a reply: the thread_ts appearing in a curl /send command - // or in a send_to_session message. The thread_ts in an outbound context - // (not just the inbound mention) indicates we replied. - if (content.includes(`"thread_ts":"${threadTs}"`) || - content.includes(`"thread_ts": "${threadTs}"`) || - content.includes(`\\"thread_ts\\":\\"${threadTs}\\"`) || - content.includes(`\\"thread_ts\\":\\"${threadTs}\\"`)) { - return true; + const lines = content.split("\n"); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + + let parsed: any; + try { + parsed = JSON.parse(trimmed); + } catch { + continue; + } + + if (parsed?.type !== "message") continue; + if (parsed?.message?.role !== "assistant") continue; + + const items = parsed?.message?.content; + if (!Array.isArray(items)) continue; + + for (const item of items) { + if (item?.type !== "toolCall") continue; + if (item?.name !== "bash") continue; + + const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; + if (!command.includes("curl")) continue; + if (!command.includes("/send")) continue; + if (!threadTsPattern.test(command)) continue; + + return true; + } } } catch { // File read error - skip @@ -425,14 +459,6 @@ function hasRepliedToThread(threadTs: string): boolean { } } - // 3. Check the bridge log for the ✅ check reaction resolving this thread. - // The bridge logs failures like "✅ check reaction failed" but successful - // ack reactions are silent. Still worth checking for the thread_ts in - // any outbound context (e.g., reaction calls). - // Also check for the 👀 eyes reaction — if we reacted with eyes AND - // the thread_ts appears in an outbound /send context, we likely replied. - // (This is a weak signal but better than nothing.) - return false; } From ef3bd908388fe3091c3ef276e15d8154fde29753 Mon Sep 17 00:00:00 2001 From: Ben Vinegar Date: Sat, 28 Feb 2026 22:09:56 -0500 Subject: [PATCH 4/5] heartbeat: support unanswered mention detection in socket mode --- pi/extensions/heartbeat.test.mjs | 43 ++++++++++++++++++++++++ pi/extensions/heartbeat.ts | 57 ++++++++++++++++---------------- slack-bridge/bridge.mjs | 2 ++ 3 files changed, 73 insertions(+), 29 deletions(-) diff --git a/pi/extensions/heartbeat.test.mjs b/pi/extensions/heartbeat.test.mjs index b173b44..62334be 100644 --- a/pi/extensions/heartbeat.test.mjs +++ b/pi/extensions/heartbeat.test.mjs @@ -112,6 +112,22 @@ function hasOutboundSendCommand(sessionJsonlContent, threadTs) { return false; } +function slackTsToMs(ts) { + const parsed = Number.parseFloat(ts); + if (!Number.isFinite(parsed) || parsed <= 0) return null; + return Math.floor(parsed * 1000); +} + +function extractMentionThreadTs(logTail) { + const mentionPattern = /app_mention[^\n]*\bts:\s*(\d+\.\d+)/g; + const mentionTs = new Set(); + let match; + while ((match = mentionPattern.exec(logTail)) !== null) { + mentionTs.add(match[1]); + } + return [...mentionTs]; +} + // ── Test helpers ──────────────────────────────────────────────────────────── // ── Tests ─────────────────────────────────────────────────────────────────── @@ -361,6 +377,33 @@ Not part of JSON.`; }); }); +describe("heartbeat v2: unanswered mention log parsing", () => { + it("extracts app_mention ts from broker-bridge log format", () => { + const log = "[2026-02-28T21:10:00.000Z] 👤 message from <@U123> in C123 (type: app_mention, ts: 1772313000.123456)"; + assert.deepEqual(extractMentionThreadTs(log), ["1772313000.123456"]); + }); + + it("extracts app_mention ts from socket-mode bridge log format", () => { + const log = "📣 app_mention from <@U123> in C123 ts: 1772313001.654321"; + assert.deepEqual(extractMentionThreadTs(log), ["1772313001.654321"]); + }); + + it("ignores non-app_mention log lines", () => { + const log = [ + "💬 from <@U123>: hello", + "[2026-02-28T21:10:00.000Z] 👤 message from <@U123> in C123 (type: message, ts: 1772313000.123456)", + "🧵 Registered thread-1 → channel=C123 thread_ts=1772313000.123456", + ].join("\n"); + assert.deepEqual(extractMentionThreadTs(log), []); + }); + + it("converts slack ts to milliseconds", () => { + assert.equal(slackTsToMs("1772313000.123456"), 1772313000123); + assert.equal(slackTsToMs("0"), null); + assert.equal(slackTsToMs("not-a-ts"), null); + }); +}); + describe("heartbeat v2: unanswered mention reply detection", () => { it("matches exact thread_ts entries in reply log jsonl", () => { const log = [ diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index 5b7309d..210a356 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -318,50 +318,43 @@ function checkUnansweredMentions(): CheckResult[] { if (!existsSync(BRIDGE_LOG)) return results; try { - // Read the last 500 lines of the bridge log to find recent app_mention events + // Read the last 500 lines of the bridge log to find recent app_mention events. + // Support both bridge implementations: + // - broker-bridge.mjs: "... (type: app_mention, ts: 1234.5678)" + // - bridge.mjs: "app_mention ... ts: 1234.5678" const { execSync } = require("node:child_process"); const logTail = execSync(`tail -500 "${BRIDGE_LOG}"`, { encoding: "utf-8" }); - - // Parse log lines looking for app_mention events - const mentionPattern = /\[([^\]]+)\].*app_mention.*ts: (\d+\.\d+)/g; - const mentions: Array<{ timestamp: string; ts: string }> = []; - + + // Capture the Slack event ts value from app_mention log lines. + const mentionPattern = /app_mention[^\n]*\bts:\s*(\d+\.\d+)/g; + const mentionThreadTsSet = new Set(); + let match: RegExpExecArray | null; while ((match = mentionPattern.exec(logTail)) !== null) { - mentions.push({ - timestamp: match[1], - ts: match[2], - }); + mentionThreadTsSet.add(match[1]); } - // Filter to recent mentions (within last hour) const oneHourAgo = now - 60 * 60 * 1000; - const recentMentions = mentions.filter(m => { - try { - const mentionTime = new Date(m.timestamp).getTime(); - return mentionTime > oneHourAgo; - } catch { - return false; - } - }); - // For each recent mention, check if we replied to it - for (const mention of recentMentions) { - const mentionTime = new Date(mention.timestamp).getTime(); + // For each recent mention, check if we replied to it. + for (const threadTs of mentionThreadTsSet) { + const mentionTime = slackTsToMs(threadTs); + if (mentionTime == null || mentionTime <= oneHourAgo) continue; + const age = now - mentionTime; - - // Skip very recent mentions (< 5 min) - agent might still be processing + + // Skip very recent mentions (< 5 min) - agent might still be processing. if (age < UNANSWERED_MENTION_THRESHOLD_MS) continue; - // Check if we sent a reply to this thread_ts - const replied = hasRepliedToThread(mention.ts); - + // Check if we sent a reply to this thread_ts. + const replied = hasRepliedToThread(threadTs); + if (!replied) { const minutesAgo = Math.round(age / (60 * 1000)); results.push({ - name: `unanswered:${mention.ts}`, + name: `unanswered:${threadTs}`, ok: false, - detail: `Slack mention at ts ${mention.ts} (${minutesAgo} min ago) has no reply — may have been lost during restart`, + detail: `Slack mention at ts ${threadTs} (${minutesAgo} min ago) has no reply — may have been lost during restart`, }); } } @@ -373,6 +366,12 @@ function checkUnansweredMentions(): CheckResult[] { return results; } +function slackTsToMs(ts: string): number | null { + const parsed = Number.parseFloat(ts); + if (!Number.isFinite(parsed) || parsed <= 0) return null; + return Math.floor(parsed * 1000); +} + function hasRepliedToThread(threadTs: string): boolean { // Check multiple sources for evidence of a reply to this thread_ts. diff --git a/slack-bridge/bridge.mjs b/slack-bridge/bridge.mjs index cb436e6..9e113fe 100644 --- a/slack-bridge/bridge.mjs +++ b/slack-bridge/bridge.mjs @@ -346,6 +346,8 @@ async function handleMessage(userMessage, event, say) { // Handle @mentions app.event("app_mention", async ({ event, say }) => { + console.log(`📣 app_mention from <@${event.user || "unknown"}> in ${event.channel || "n/a"} ts: ${event.ts}`); + const userMessage = cleanMessage(event.text); if (!userMessage) { await say({ text: "👋 I'm here! Send me a message.", thread_ts: event.ts }); From 453d1a7fffd50103f7112cdacde8fb1bf106a2b3 Mon Sep 17 00:00:00 2001 From: Ben Vinegar Date: Sat, 28 Feb 2026 22:23:02 -0500 Subject: [PATCH 5/5] heartbeat: track app mentions by thread root ts --- pi/extensions/heartbeat.test.mjs | 35 ++++++++++++++++++++++++++------ pi/extensions/heartbeat.ts | 30 +++++++++++++++++++-------- slack-bridge/bridge.mjs | 13 ++++++++---- slack-bridge/broker-bridge.mjs | 11 ++++++---- 4 files changed, 67 insertions(+), 22 deletions(-) diff --git a/pi/extensions/heartbeat.test.mjs b/pi/extensions/heartbeat.test.mjs index 62334be..1aea812 100644 --- a/pi/extensions/heartbeat.test.mjs +++ b/pi/extensions/heartbeat.test.mjs @@ -119,13 +119,24 @@ function slackTsToMs(ts) { } function extractMentionThreadTs(logTail) { - const mentionPattern = /app_mention[^\n]*\bts:\s*(\d+\.\d+)/g; - const mentionTs = new Set(); - let match; - while ((match = mentionPattern.exec(logTail)) !== null) { - mentionTs.add(match[1]); + const mentionThreadTs = new Set(); + + for (const line of logTail.split("\n")) { + if (!line.includes("app_mention")) continue; + + const threadMatch = line.match(/\bthread_ts:\s*(\d+\.\d+)/); + if (threadMatch?.[1]) { + mentionThreadTs.add(threadMatch[1]); + continue; + } + + const tsMatch = line.match(/\bts:\s*(\d+\.\d+)/); + if (tsMatch?.[1]) { + mentionThreadTs.add(tsMatch[1]); + } } - return [...mentionTs]; + + return [...mentionThreadTs]; } // ── Test helpers ──────────────────────────────────────────────────────────── @@ -379,6 +390,12 @@ Not part of JSON.`; describe("heartbeat v2: unanswered mention log parsing", () => { it("extracts app_mention ts from broker-bridge log format", () => { + const log = + "[2026-02-28T21:10:00.000Z] 👤 message from <@U123> in C123 (type: app_mention, thread_ts: 1772313000.000001, ts: 1772313000.123456)"; + assert.deepEqual(extractMentionThreadTs(log), ["1772313000.000001"]); + }); + + it("falls back to message ts when thread_ts is absent", () => { const log = "[2026-02-28T21:10:00.000Z] 👤 message from <@U123> in C123 (type: app_mention, ts: 1772313000.123456)"; assert.deepEqual(extractMentionThreadTs(log), ["1772313000.123456"]); }); @@ -388,6 +405,12 @@ describe("heartbeat v2: unanswered mention log parsing", () => { assert.deepEqual(extractMentionThreadTs(log), ["1772313001.654321"]); }); + it("prefers thread_ts over message ts when both are present", () => { + const log = + "📣 app_mention from <@U123> in C123 thread_ts: 1772313000.000001 ts: 1772313001.654321"; + assert.deepEqual(extractMentionThreadTs(log), ["1772313000.000001"]); + }); + it("ignores non-app_mention log lines", () => { const log = [ "💬 from <@U123>: hello", diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index 210a356..f41cfbc 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -325,14 +325,7 @@ function checkUnansweredMentions(): CheckResult[] { const { execSync } = require("node:child_process"); const logTail = execSync(`tail -500 "${BRIDGE_LOG}"`, { encoding: "utf-8" }); - // Capture the Slack event ts value from app_mention log lines. - const mentionPattern = /app_mention[^\n]*\bts:\s*(\d+\.\d+)/g; - const mentionThreadTsSet = new Set(); - - let match: RegExpExecArray | null; - while ((match = mentionPattern.exec(logTail)) !== null) { - mentionThreadTsSet.add(match[1]); - } + const mentionThreadTsSet = new Set(extractMentionThreadTs(logTail)); const oneHourAgo = now - 60 * 60 * 1000; @@ -366,6 +359,27 @@ function checkUnansweredMentions(): CheckResult[] { return results; } +function extractMentionThreadTs(logTail: string): string[] { + const mentionThreadTsSet = new Set(); + + for (const line of logTail.split("\n")) { + if (!line.includes("app_mention")) continue; + + const threadMatch = line.match(/\bthread_ts:\s*(\d+\.\d+)/); + if (threadMatch?.[1]) { + mentionThreadTsSet.add(threadMatch[1]); + continue; + } + + const tsMatch = line.match(/\bts:\s*(\d+\.\d+)/); + if (tsMatch?.[1]) { + mentionThreadTsSet.add(tsMatch[1]); + } + } + + return [...mentionThreadTsSet]; +} + function slackTsToMs(ts: string): number | null { const parsed = Number.parseFloat(ts); if (!Number.isFinite(parsed) || parsed <= 0) return null; diff --git a/slack-bridge/bridge.mjs b/slack-bridge/bridge.mjs index 9e113fe..665f80e 100644 --- a/slack-bridge/bridge.mjs +++ b/slack-bridge/bridge.mjs @@ -304,8 +304,10 @@ async function handleMessage(userMessage, event, say) { console.warn(`👀 eyes reaction failed: ${err.message}`); }); + const threadTs = event.thread_ts || event.ts; + // Track this message so we can add ✅ when the agent replies. - const threadKey = `${event.channel}:${event.thread_ts || event.ts}`; + const threadKey = `${event.channel}:${threadTs}`; pendingAckReactions.set(threadKey, { channel: event.channel, messageTs: event.ts, @@ -328,11 +330,11 @@ async function handleMessage(userMessage, event, say) { source: "Slack", user: event.user, channel: event.channel, - threadTs: event.ts, + threadTs, }); // Enrich with friendly thread ID so the agent can use /reply endpoint - const threadId = getThreadId(event.channel, event.thread_ts || event.ts); + const threadId = getThreadId(event.channel, threadTs); const contextMessage = `${wrappedMessage}\n[Bridge-Thread-ID: ${threadId}]`; // Fire-and-forget: deliver to agent, which will reply to Slack itself via /send API. @@ -346,7 +348,10 @@ async function handleMessage(userMessage, event, say) { // Handle @mentions app.event("app_mention", async ({ event, say }) => { - console.log(`📣 app_mention from <@${event.user || "unknown"}> in ${event.channel || "n/a"} ts: ${event.ts}`); + const threadTs = event.thread_ts || event.ts; + console.log( + `📣 app_mention from <@${event.user || "unknown"}> in ${event.channel || "n/a"} thread_ts: ${threadTs} ts: ${event.ts}` + ); const userMessage = cleanMessage(event.text); if (!userMessage) { diff --git a/slack-bridge/broker-bridge.mjs b/slack-bridge/broker-bridge.mjs index 0306e0a..33ea2b2 100755 --- a/slack-bridge/broker-bridge.mjs +++ b/slack-bridge/broker-bridge.mjs @@ -716,7 +716,10 @@ function sanitizeOutboundMessage(text, contextLabel) { } async function handleUserMessage(userMessage, event) { - logInfo(`👤 message from <@${event.user}> in ${event.channel} (type: ${event.type}, ts: ${event.ts})`); + const threadTs = event.thread_ts || event.ts; + logInfo( + `👤 message from <@${event.user}> in ${event.channel} (type: ${event.type}, thread_ts: ${threadTs}, ts: ${event.ts})` + ); if (!isAllowed(event.user, ALLOWED_USERS)) { logWarn(`🚫 user <@${event.user}> not in allowed list — rejecting`); @@ -742,7 +745,7 @@ async function handleUserMessage(userMessage, event) { }); // Track this message so we can add ✅ when the agent replies. - const threadKey = `${ackChannel}:${event.thread_ts || ackMessageTs}`; + const threadKey = `${ackChannel}:${threadTs}`; pendingAckReactions.set(threadKey, { channel: ackChannel, messageTs: ackMessageTs, @@ -763,10 +766,10 @@ async function handleUserMessage(userMessage, event) { source: "Slack (broker)", user: event.user, channel: event.channel, - threadTs: event.ts, + threadTs, }); - const threadId = getThreadId(event.channel, event.thread_ts || event.ts); + const threadId = getThreadId(event.channel, threadTs); const contextMessage = `${wrappedMessage}\n[Bridge-Thread-ID: ${threadId}]`; // Fire-and-forget: deliver to agent, which will reply to Slack itself via /send API.