From cbdd2a796feec198924190b1455ae02b4de82961 Mon Sep 17 00:00:00 2001 From: Geoff Date: Fri, 27 Feb 2026 11:05:36 +1100 Subject: [PATCH 1/5] fix: Discord thread replies, Telegram thread support, and queue reliability Discord: - Respond to all messages without requiring @mention - Create threads for responses in server channels - Typing indicator targets the thread Telegram: - Respond in groups/supergroups (not just private chats and forums) - File attachments sent to the correct forum topic thread - Typing indicator targets the correct thread - Commands (agent, team, reset, pairing) work in threads Queue reliability: - Fix claude not found in PATH by adding ~/.local/bin to spawn env - Add 5-minute timeout to agent commands to prevent hung processes - Process pending messages on startup (not just on new events) - Add 30-second fallback poll so queue never gets stuck Co-Authored-By: Claude Opus 4.6 --- src/channels/discord-client.ts | 56 +++++++++++++++++++++++---------- src/channels/telegram-client.ts | 40 +++++++++++++++++------ src/lib/invoke.ts | 33 ++++++++++++++++++- src/queue-processor.ts | 11 ++++++- 4 files changed, 111 insertions(+), 29 deletions(-) diff --git a/src/channels/discord-client.ts b/src/channels/discord-client.ts index 7a420fb0..3254e67b 100644 --- a/src/channels/discord-client.ts +++ b/src/channels/discord-client.ts @@ -5,7 +5,7 @@ * Does NOT call Claude directly - that's handled by queue-processor */ -import { Client, Events, GatewayIntentBits, Partials, Message, DMChannel, AttachmentBuilder } from 'discord.js'; +import { Client, Events, GatewayIntentBits, Partials, Message, DMChannel, TextChannel, ThreadChannel, AttachmentBuilder } from 'discord.js'; import 'dotenv/config'; import fs from 'fs'; import path from 'path'; @@ -43,7 +43,8 @@ if (!DISCORD_BOT_TOKEN || DISCORD_BOT_TOKEN === 'your_token_here') { interface PendingMessage { message: Message; - channel: DMChannel; + channel: DMChannel | TextChannel; + thread?: ThreadChannel; timestamp: number; } @@ -198,6 +199,7 @@ function pairingMessage(code: string): string { const client = new Client({ intents: [ GatewayIntentBits.Guilds, + GatewayIntentBits.GuildMessages, GatewayIntentBits.DirectMessages, GatewayIntentBits.MessageContent, ], @@ -221,9 +223,9 @@ client.on(Events.MessageCreate, async (message: Message) => { return; } - // Skip non-DM messages (guild = server channel) + // In server channels, strip any bot mention from the text but respond to all messages if (message.guild) { - return; + message.content = message.content.replace(/<@!?\d+>/g, '').trim(); } const hasAttachments = message.attachments.size > 0; @@ -321,7 +323,29 @@ client.on(Events.MessageCreate, async (message: Message) => { } // Show typing indicator - await (message.channel as DMChannel).sendTyping(); + await (message.channel as DMChannel | TextChannel).sendTyping(); + + // For guild messages, create a thread (or use existing thread) for the response + let thread: ThreadChannel | undefined; + if (message.guild) { + if (message.channel.isThread()) { + // Already in a thread, reply there + thread = message.channel as ThreadChannel; + } else { + // Create a new thread from the message + try { + const threadName = messageText.substring(0, 90) || 'Tiny'; + thread = await (message as Message).startThread({ + name: threadName, + autoArchiveDuration: 60, + }); + log('INFO', `Created thread "${threadName}" for message ${messageId}`); + } catch (threadErr) { + log('ERROR', `Failed to create thread: ${(threadErr as Error).message}`); + // Fall back to inline reply if thread creation fails + } + } + } // Build message text with file references let fullMessage = messageText; @@ -349,7 +373,8 @@ client.on(Events.MessageCreate, async (message: Message) => { // Store pending message for response pendingMessages.set(messageId, { message: message, - channel: message.channel as DMChannel, + channel: message.channel as DMChannel | TextChannel, + thread, timestamp: Date.now(), }); @@ -401,6 +426,9 @@ async function checkOutgoingQueue(): Promise { } if (dmChannel) { + // Use thread channel if available (guild messages), otherwise DM channel + const replyChannel = pending?.thread ?? dmChannel; + // Send any attached files if (files.length > 0) { const attachments: AttachmentBuilder[] = []; @@ -413,7 +441,7 @@ async function checkOutgoingQueue(): Promise { } } if (attachments.length > 0) { - await dmChannel.send({ files: attachments }); + await replyChannel.send({ files: attachments }); log('INFO', `Sent ${attachments.length} file(s) to Discord`); } } @@ -422,15 +450,8 @@ async function checkOutgoingQueue(): Promise { if (responseText) { const chunks = splitMessage(responseText); - if (chunks.length > 0) { - if (pending) { - await pending.message.reply(chunks[0]!); - } else { - await dmChannel.send(chunks[0]!); - } - } - for (let i = 1; i < chunks.length; i++) { - await dmChannel.send(chunks[i]!); + for (const chunk of chunks) { + await replyChannel.send(chunk); } } @@ -460,7 +481,8 @@ setInterval(checkOutgoingQueue, 1000); // Refresh typing indicator every 8 seconds (Discord typing expires after ~10s) setInterval(() => { for (const [, data] of pendingMessages.entries()) { - data.channel.sendTyping().catch(() => { + const typingChannel = data.thread ?? data.channel; + typingChannel.sendTyping().catch(() => { // Ignore typing errors silently }); } diff --git a/src/channels/telegram-client.ts b/src/channels/telegram-client.ts index 4d2a5fea..541a91d1 100644 --- a/src/channels/telegram-client.ts +++ b/src/channels/telegram-client.ts @@ -46,6 +46,7 @@ if (!TELEGRAM_BOT_TOKEN || TELEGRAM_BOT_TOKEN === 'your_token_here') { interface PendingMessage { chatId: number; messageId: number; + messageThreadId?: number; timestamp: number; } @@ -280,8 +281,8 @@ bot.getMe().then(async (me: TelegramBot.User) => { // Message received - Write to queue bot.on('message', async (msg: TelegramBot.Message) => { try { - // Skip group/channel messages - only handle private chats - if (msg.chat.type !== 'private') { + // Skip channel posts - handle private chats, groups, supergroups, and forums + if (msg.chat.type === 'channel') { return; } @@ -355,12 +356,17 @@ bot.on('message', async (msg: TelegramBot.Message) => { log('INFO', `Message from ${sender}: ${messageText.substring(0, 50)}${downloadedFiles.length > 0 ? ` [+${downloadedFiles.length} file(s)]` : ''}...`); + const threadOpts = (msg as any).message_thread_id + ? { message_thread_id: (msg as any).message_thread_id } as any + : {}; + const pairing = ensureSenderPaired(PAIRING_FILE, 'telegram', senderId, sender); if (!pairing.approved && pairing.code) { if (pairing.isNewPending) { log('INFO', `Blocked unpaired Telegram sender ${sender} (${senderId}) with code ${pairing.code}`); await bot.sendMessage(msg.chat.id, pairingMessage(pairing.code), { reply_to_message_id: msg.message_id, + ...threadOpts, }); } else { log('INFO', `Blocked pending Telegram sender ${sender} (${senderId}) without re-sending pairing message`); @@ -374,6 +380,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { const agentList = getAgentListText(); await bot.sendMessage(msg.chat.id, agentList, { reply_to_message_id: msg.message_id, + ...threadOpts, }); return; } @@ -384,6 +391,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { const teamList = getTeamListText(); await bot.sendMessage(msg.chat.id, teamList, { reply_to_message_id: msg.message_id, + ...threadOpts, }); return; } @@ -393,6 +401,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { if (messageText.trim().match(/^[!/]reset$/i)) { await bot.sendMessage(msg.chat.id, 'Usage: /reset @agent_id [@agent_id2 ...]\nSpecify which agent(s) to reset.', { reply_to_message_id: msg.message_id, + ...threadOpts, }); return; } @@ -417,10 +426,12 @@ bot.on('message', async (msg: TelegramBot.Message) => { } await bot.sendMessage(msg.chat.id, resetResults.join('\n'), { reply_to_message_id: msg.message_id, + ...threadOpts, }); } catch { await bot.sendMessage(msg.chat.id, 'Could not process reset command. Check settings.', { reply_to_message_id: msg.message_id, + ...threadOpts, }); } return; @@ -456,6 +467,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { pendingMessages.set(queueMessageId, { chatId: msg.chat.id, messageId: msg.message_id, + messageThreadId: (msg as any).message_thread_id, timestamp: Date.now(), }); @@ -498,6 +510,11 @@ async function checkOutgoingQueue(): Promise { const targetChatId = pending?.chatId ?? (senderId ? Number(senderId) : null); if (targetChatId && !Number.isNaN(targetChatId)) { + // Thread options for file and message sending + const threadOpts = pending?.messageThreadId + ? { message_thread_id: pending.messageThreadId } as any + : {}; + // Send any attached files first if (files.length > 0) { for (const file of files) { @@ -505,13 +522,13 @@ async function checkOutgoingQueue(): Promise { if (!fs.existsSync(file)) continue; const ext = path.extname(file).toLowerCase(); if (['.jpg', '.jpeg', '.png', '.gif', '.webp'].includes(ext)) { - await bot.sendPhoto(targetChatId, file); + await bot.sendPhoto(targetChatId, file, threadOpts); } else if (['.mp3', '.ogg', '.wav', '.m4a'].includes(ext)) { - await bot.sendAudio(targetChatId, file); + await bot.sendAudio(targetChatId, file, threadOpts); } else if (['.mp4', '.avi', '.mov', '.webm'].includes(ext)) { - await bot.sendVideo(targetChatId, file); + await bot.sendVideo(targetChatId, file, threadOpts); } else { - await bot.sendDocument(targetChatId, file); + await bot.sendDocument(targetChatId, file, threadOpts); } log('INFO', `Sent file to Telegram: ${path.basename(file)}`); } catch (fileErr) { @@ -526,12 +543,12 @@ async function checkOutgoingQueue(): Promise { if (chunks.length > 0) { await sendTelegramMessage(targetChatId, chunks[0]!, pending - ? { reply_to_message_id: pending.messageId } - : {}, + ? { reply_to_message_id: pending.messageId, ...threadOpts } + : threadOpts, ); } for (let i = 1; i < chunks.length; i++) { - await sendTelegramMessage(targetChatId, chunks[i]!); + await sendTelegramMessage(targetChatId, chunks[i]!, threadOpts); } } @@ -561,7 +578,10 @@ setInterval(checkOutgoingQueue, 1000); // Refresh typing indicator every 4 seconds for pending messages setInterval(() => { for (const [, data] of pendingMessages.entries()) { - bot.sendChatAction(data.chatId, 'typing').catch(() => { + const opts = data.messageThreadId + ? { message_thread_id: data.messageThreadId } as any + : {}; + bot.sendChatAction(data.chatId, 'typing', opts).catch(() => { // Ignore typing errors silently }); } diff --git a/src/lib/invoke.ts b/src/lib/invoke.ts index 020b1bca..058fb6b3 100644 --- a/src/lib/invoke.ts +++ b/src/lib/invoke.ts @@ -1,20 +1,43 @@ import { spawn } from 'child_process'; import fs from 'fs'; import path from 'path'; +import os from 'os'; import { AgentConfig, TeamConfig } from './types'; import { SCRIPT_DIR, resolveClaudeModel, resolveCodexModel, resolveOpenCodeModel } from './config'; import { log } from './logging'; import { ensureAgentDirectory, updateAgentTeammates } from './agent'; -export async function runCommand(command: string, args: string[], cwd?: string): Promise { +const DEFAULT_COMMAND_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes + +// Ensure ~/.local/bin and ~/bin are in PATH for spawned processes +const HOME = os.homedir(); +const extraPaths = [ + path.join(HOME, '.local', 'bin'), + path.join(HOME, 'bin'), + '/usr/local/bin', +].filter(p => fs.existsSync(p)); +const spawnPath = [...extraPaths, process.env.PATH].join(':'); + +export async function runCommand(command: string, args: string[], cwd?: string, timeoutMs?: number): Promise { return new Promise((resolve, reject) => { const child = spawn(command, args, { cwd: cwd || SCRIPT_DIR, stdio: ['ignore', 'pipe', 'pipe'], + env: { ...process.env, PATH: spawnPath }, }); let stdout = ''; let stderr = ''; + let killed = false; + + const timeout = setTimeout(() => { + killed = true; + child.kill('SIGTERM'); + // Force kill if SIGTERM doesn't work after 5s + setTimeout(() => { + if (!child.killed) child.kill('SIGKILL'); + }, 5000); + }, timeoutMs ?? DEFAULT_COMMAND_TIMEOUT_MS); child.stdout.setEncoding('utf8'); child.stderr.setEncoding('utf8'); @@ -28,10 +51,18 @@ export async function runCommand(command: string, args: string[], cwd?: string): }); child.on('error', (error) => { + clearTimeout(timeout); reject(error); }); child.on('close', (code) => { + clearTimeout(timeout); + + if (killed) { + reject(new Error(`Command timed out after ${(timeoutMs ?? DEFAULT_COMMAND_TIMEOUT_MS) / 1000}s and was killed`)); + return; + } + if (code === 0) { resolve(stdout); return; diff --git a/src/queue-processor.ts b/src/queue-processor.ts index 7fef61e1..ccd76125 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -364,10 +364,19 @@ emitEvent('processor_start', { agents: Object.keys(getAgents(getSettings())), te // Event-driven: all messages come through the API server (same process) queueEvents.on('message:enqueued', () => processQueue()); +// Process any pending messages left from previous session +setTimeout(() => processQueue(), 1000); + +// Periodic fallback poll — catches messages missed by events or recovered from stale state +setInterval(() => processQueue(), 30 * 1000); // every 30s + // Periodic maintenance setInterval(() => { const count = recoverStaleMessages(); - if (count > 0) log('INFO', `Recovered ${count} stale message(s)`); + if (count > 0) { + log('INFO', `Recovered ${count} stale message(s)`); + processQueue(); + } }, 5 * 60 * 1000); // every 5 min setInterval(() => { From e2ce3407cfc2f50f00e48504f719005c27053a72 Mon Sep 17 00:00:00 2001 From: Geoff Date: Fri, 27 Feb 2026 12:23:00 +1100 Subject: [PATCH 2/5] feat: channel routing and @mention routing for Discord and Telegram MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add ChannelRouting and ChannelConfig types to Settings - Add resolveChannelAgent() for channel name → agent mapping - Add parseMentionRouting() for @agent extraction from messages - Discord: channel routing from settings, @mention priority routing, default_agent fallback for guild messages - Telegram: @mention routing in groups/supergroups, topic/group name routing via channel_routing config, default_agent fallback - Existing channel_routing in settings.json now functional Co-Authored-By: Claude Opus 4.6 --- src/channels/discord-client.ts | 41 +++++++++++++++++- src/channels/telegram-client.ts | 53 ++++++++++++++++++++++++ src/lib/routing.ts | 73 +++++++++++++++++++++++++++++++++ src/lib/types.ts | 15 ++++++- 4 files changed, 179 insertions(+), 3 deletions(-) diff --git a/src/channels/discord-client.ts b/src/channels/discord-client.ts index 3254e67b..d03f7bd9 100644 --- a/src/channels/discord-client.ts +++ b/src/channels/discord-client.ts @@ -12,6 +12,7 @@ import path from 'path'; import https from 'https'; import http from 'http'; import { ensureSenderPaired } from '../lib/pairing'; +import { resolveChannelAgent, parseMentionRouting } from '../lib/routing'; const API_PORT = parseInt(process.env.TINYCLAW_API_PORT || '3777', 10); const API_BASE = `http://localhost:${API_PORT}`; @@ -223,9 +224,46 @@ client.on(Events.MessageCreate, async (message: Message) => { return; } - // In server channels, strip any bot mention from the text but respond to all messages + // In server channels, strip bot mentions and determine routing + let routedAgent: string | undefined; if (message.guild) { + // Check if bot was mentioned (before stripping mentions) + const botMentioned = client.user && message.mentions.has(client.user); + + // Strip bot mentions from content message.content = message.content.replace(/<@!?\d+>/g, '').trim(); + + // Load settings for routing decisions + const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); + const settings = JSON.parse(settingsData); + const agents = settings.agents || {}; + const discordConfig = settings.channels?.discord || {}; + + // Priority 1: @mention routing - first word after bot mention is agent name + if (botMentioned && message.content) { + const mentionRoute = parseMentionRouting(message.content, agents); + if (mentionRoute) { + routedAgent = mentionRoute.agentId; + message.content = mentionRoute.cleanMessage; + log('INFO', `@mention routed to agent: \${routedAgent}`); + } + } + + // Priority 2: Channel-based routing + if (!routedAgent) { + const channelName = 'name' in message.channel ? (message.channel as TextChannel).name : ''; + const channelAgent = resolveChannelAgent(channelName, discordConfig.channel_routing, agents); + if (channelAgent) { + routedAgent = channelAgent; + log('INFO', `Channel "\${channelName}" routed to agent: \${routedAgent}`); + } + } + + // Priority 3: Default agent from discord config + if (!routedAgent && discordConfig.default_agent && agents[discordConfig.default_agent]) { + routedAgent = discordConfig.default_agent; + log('INFO', `Using default discord agent: \${routedAgent}`); + } } const hasAttachments = message.attachments.size > 0; @@ -364,6 +402,7 @@ client.on(Events.MessageCreate, async (message: Message) => { senderId: message.author.id, message: fullMessage, messageId, + agent: routedAgent, files: downloadedFiles.length > 0 ? downloadedFiles : undefined, }), }); diff --git a/src/channels/telegram-client.ts b/src/channels/telegram-client.ts index 541a91d1..36cfb500 100644 --- a/src/channels/telegram-client.ts +++ b/src/channels/telegram-client.ts @@ -14,6 +14,7 @@ import path from 'path'; import https from 'https'; import http from 'http'; import { ensureSenderPaired } from '../lib/pairing'; +import { resolveChannelAgent, parseMentionRouting } from '../lib/routing'; const API_PORT = parseInt(process.env.TINYCLAW_API_PORT || '3777', 10); const API_BASE = `http://localhost:${API_PORT}`; @@ -262,7 +263,10 @@ function pairingMessage(code: string): string { const bot = new TelegramBot(TELEGRAM_BOT_TOKEN, { polling: true }); // Bot ready +let botUsername = ''; + bot.getMe().then(async (me: TelegramBot.User) => { + botUsername = (me.username || '').toLowerCase(); log('INFO', `Telegram bot connected as @${me.username}`); // Register bot commands so they appear in Telegram's "/" menu @@ -440,6 +444,54 @@ bot.on('message', async (msg: TelegramBot.Message) => { // Show typing indicator await bot.sendChatAction(msg.chat.id, 'typing'); + // Determine agent routing for group/supergroup messages + let routedAgent: string | undefined; + if (msg.chat.type === 'group' || msg.chat.type === 'supergroup') { + const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); + const settings = JSON.parse(settingsData); + const agents = settings.agents || {}; + const telegramConfig = settings.channels?.telegram || {}; + + // Check if bot was mentioned via @username in message text + const botMentionPattern = botUsername ? new RegExp(`@${botUsername}\\b`, 'i') : null; + const botWasMentioned = botMentionPattern && botMentionPattern.test(messageText); + + if (botWasMentioned && botUsername) { + // Strip bot mention and check for agent routing + const strippedText = messageText.replace(new RegExp(`@${botUsername}\\s*`, 'gi'), '').trim(); + const mentionRoute = parseMentionRouting(strippedText, agents); + if (mentionRoute) { + routedAgent = mentionRoute.agentId; + messageText = mentionRoute.cleanMessage; + log('INFO', `Telegram @mention routed to agent: ${routedAgent}`); + } else { + messageText = strippedText; + } + } + + // Channel/topic routing (lower priority than @mention) + if (!routedAgent) { + // Use topic name for forum topics, or chat title for regular groups + const topicName = (msg as any).forum_topic_created?.name + || (msg as any).reply_to_message?.forum_topic_created?.name + || ''; + const routingName = topicName || msg.chat.title || ''; + if (routingName) { + const channelAgent = resolveChannelAgent(routingName, telegramConfig.channel_routing, agents); + if (channelAgent) { + routedAgent = channelAgent; + log('INFO', `Telegram topic/group "${routingName}" routed to agent: ${routedAgent}`); + } + } + } + + // Default agent fallback + if (!routedAgent && telegramConfig.default_agent && agents[telegramConfig.default_agent]) { + routedAgent = telegramConfig.default_agent; + log('INFO', `Using default telegram agent: ${routedAgent}`); + } + } + // Build message text with file references let fullMessage = messageText; if (downloadedFiles.length > 0) { @@ -457,6 +509,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { senderId, message: fullMessage, messageId: queueMessageId, + agent: routedAgent, files: downloadedFiles.length > 0 ? downloadedFiles : undefined, }), }); diff --git a/src/lib/routing.ts b/src/lib/routing.ts index b57a1666..02792c5a 100644 --- a/src/lib/routing.ts +++ b/src/lib/routing.ts @@ -136,3 +136,76 @@ export function parseAgentRouting( } return { agentId: 'default', message: rawMessage }; } + +/** + * Resolve an agent from channel routing configuration. + * Matches channel name (case-insensitive) against channel_routing keys. + * Supports exact match and partial matching (key contains channel name or vice versa). + * Returns agent ID if found and agent exists, null otherwise. + */ +export function resolveChannelAgent( + channelName: string, + channelRouting: Record | undefined, + agents: Record +): string | null { + if (!channelRouting || !channelName) return null; + + const normalizedChannel = channelName.toLowerCase(); + + // Exact match first + for (const [key, agentId] of Object.entries(channelRouting)) { + if (key.toLowerCase() === normalizedChannel) { + if (agents[agentId]) return agentId; + // Try matching by agent name + for (const [id, config] of Object.entries(agents)) { + if (config.name.toLowerCase() === agentId.toLowerCase()) return id; + } + } + } + + // Partial match: channel name contains key or key contains channel name + for (const [key, agentId] of Object.entries(channelRouting)) { + const normalizedKey = key.toLowerCase(); + if (normalizedChannel.includes(normalizedKey) || normalizedKey.includes(normalizedChannel)) { + if (agents[agentId]) return agentId; + for (const [id, config] of Object.entries(agents)) { + if (config.name.toLowerCase() === agentId.toLowerCase()) return id; + } + } + } + + return null; +} + +/** + * Parse @mention routing from a message in a channel context. + * Extracts the agent ID from the first word after bot mentions are stripped. + * Returns { agentId, cleanMessage } or null if no valid agent mention found. + */ +export function parseMentionRouting( + messageText: string, + agents: Record +): { agentId: string; cleanMessage: string } | null { + if (!messageText.trim()) return null; + + // Check if the first word is @agentId or just agentId + const match = messageText.match(/^@?(\S+)\s*([\s\S]*)$/); + if (!match) return null; + + const candidateId = match[1].toLowerCase(); + const rest = match[2].trim(); + + // Check agent IDs + if (agents[candidateId]) { + return { agentId: candidateId, cleanMessage: rest || messageText }; + } + + // Check agent names (case-insensitive) + for (const [id, config] of Object.entries(agents)) { + if (config.name.toLowerCase() === candidateId) { + return { agentId: id, cleanMessage: rest || messageText }; + } + } + + return null; +} diff --git a/src/lib/types.ts b/src/lib/types.ts index 46b3d566..e0569a0e 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -31,6 +31,17 @@ export interface ChainStep { response: string; } +export interface ChannelRouting { + [channelNameOrId: string]: string; // channel name/id → agent id +} + +export interface ChannelConfig { + bot_token?: string; + channel_routing?: ChannelRouting; + default_agent?: string; // fallback agent for unmapped channels + approved_users?: number[]; +} + export interface Settings { workspace?: { path?: string; @@ -38,8 +49,8 @@ export interface Settings { }; channels?: { enabled?: string[]; - discord?: { bot_token?: string }; - telegram?: { bot_token?: string }; + discord?: ChannelConfig; + telegram?: ChannelConfig; whatsapp?: {}; }; models?: { From 9677a9db3410c477391ab96ec51b87a93a08d8c9 Mon Sep 17 00:00:00 2001 From: Geoff Date: Fri, 27 Feb 2026 22:49:49 +1100 Subject: [PATCH 3/5] feat: per-agent timeout and progress streaming for long-running tasks - Add `timeout` field to AgentConfig (seconds, default 300) - Add RunCommandOptions with onOutput callback for stdout streaming - invokeAgent accepts InvokeOptions to forward onOutput to runCommand - Queue processor sends periodic progress updates to chat every 60s for agents with timeout > 5min, so users know the agent is still working - Progress messages include elapsed time and last output snippet Co-Authored-By: Claude Opus 4.6 --- src/lib/invoke.ts | 36 +++++++++++++++++++++++++------- src/lib/types.ts | 1 + src/queue-processor.ts | 47 +++++++++++++++++++++++++++++++++++++++--- 3 files changed, 73 insertions(+), 11 deletions(-) diff --git a/src/lib/invoke.ts b/src/lib/invoke.ts index 058fb6b3..115ca7e0 100644 --- a/src/lib/invoke.ts +++ b/src/lib/invoke.ts @@ -9,6 +9,12 @@ import { ensureAgentDirectory, updateAgentTeammates } from './agent'; const DEFAULT_COMMAND_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes +export interface RunCommandOptions { + cwd?: string; + timeoutMs?: number; + onOutput?: (chunk: string) => void; // called on each stdout chunk +} + // Ensure ~/.local/bin and ~/bin are in PATH for spawned processes const HOME = os.homedir(); const extraPaths = [ @@ -18,10 +24,16 @@ const extraPaths = [ ].filter(p => fs.existsSync(p)); const spawnPath = [...extraPaths, process.env.PATH].join(':'); -export async function runCommand(command: string, args: string[], cwd?: string, timeoutMs?: number): Promise { +export async function runCommand(command: string, args: string[], cwd?: string, timeoutMs?: number): Promise; +export async function runCommand(command: string, args: string[], opts?: RunCommandOptions): Promise; +export async function runCommand(command: string, args: string[], cwdOrOpts?: string | RunCommandOptions, timeoutMs?: number): Promise { + const opts: RunCommandOptions = typeof cwdOrOpts === 'string' + ? { cwd: cwdOrOpts, timeoutMs } + : (cwdOrOpts || {}); + return new Promise((resolve, reject) => { const child = spawn(command, args, { - cwd: cwd || SCRIPT_DIR, + cwd: opts.cwd || SCRIPT_DIR, stdio: ['ignore', 'pipe', 'pipe'], env: { ...process.env, PATH: spawnPath }, }); @@ -30,6 +42,8 @@ export async function runCommand(command: string, args: string[], cwd?: string, let stderr = ''; let killed = false; + const effectiveTimeout = opts.timeoutMs ?? DEFAULT_COMMAND_TIMEOUT_MS; + const timeout = setTimeout(() => { killed = true; child.kill('SIGTERM'); @@ -37,13 +51,14 @@ export async function runCommand(command: string, args: string[], cwd?: string, setTimeout(() => { if (!child.killed) child.kill('SIGKILL'); }, 5000); - }, timeoutMs ?? DEFAULT_COMMAND_TIMEOUT_MS); + }, effectiveTimeout); child.stdout.setEncoding('utf8'); child.stderr.setEncoding('utf8'); child.stdout.on('data', (chunk: string) => { stdout += chunk; + if (opts.onOutput) opts.onOutput(chunk); }); child.stderr.on('data', (chunk: string) => { @@ -59,7 +74,7 @@ export async function runCommand(command: string, args: string[], cwd?: string, clearTimeout(timeout); if (killed) { - reject(new Error(`Command timed out after ${(timeoutMs ?? DEFAULT_COMMAND_TIMEOUT_MS) / 1000}s and was killed`)); + reject(new Error(`Command timed out after ${effectiveTimeout / 1000}s and was killed`)); return; } @@ -78,6 +93,10 @@ export async function runCommand(command: string, args: string[], cwd?: string, * Invoke a single agent with a message. Contains all Claude/Codex invocation logic. * Returns the raw response text. */ +export interface InvokeOptions { + onOutput?: (chunk: string) => void; +} + export async function invokeAgent( agent: AgentConfig, agentId: string, @@ -85,7 +104,8 @@ export async function invokeAgent( workspacePath: string, shouldReset: boolean, agents: Record = {}, - teams: Record = {} + teams: Record = {}, + options: InvokeOptions = {} ): Promise { // Ensure agent directory exists with config files const agentDir = path.join(workspacePath, agentId); @@ -126,7 +146,7 @@ export async function invokeAgent( } codexArgs.push('--skip-git-repo-check', '--dangerously-bypass-approvals-and-sandbox', '--json', message); - const codexOutput = await runCommand('codex', codexArgs, workingDir); + const codexOutput = await runCommand('codex', codexArgs, { cwd: workingDir, timeoutMs: (agent.timeout || 300) * 1000, onOutput: options.onOutput }); // Parse JSONL output and extract final agent_message let response = ''; @@ -166,7 +186,7 @@ export async function invokeAgent( } opencodeArgs.push(message); - const opencodeOutput = await runCommand('opencode', opencodeArgs, workingDir); + const opencodeOutput = await runCommand('opencode', opencodeArgs, { cwd: workingDir, timeoutMs: (agent.timeout || 300) * 1000, onOutput: options.onOutput }); // Parse JSONL output and collect all text parts let response = ''; @@ -203,6 +223,6 @@ export async function invokeAgent( } claudeArgs.push('-p', message); - return await runCommand('claude', claudeArgs, workingDir); + return await runCommand('claude', claudeArgs, { cwd: workingDir, timeoutMs: (agent.timeout || 300) * 1000, onOutput: options.onOutput }); } } diff --git a/src/lib/types.ts b/src/lib/types.ts index e0569a0e..73b032ba 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -5,6 +5,7 @@ export interface AgentConfig { working_directory: string; system_prompt?: string; prompt_file?: string; + timeout?: number; // max execution time in seconds (default: 300) } export interface TeamConfig { diff --git a/src/queue-processor.ts b/src/queue-processor.ts index ccd76125..a415d4f3 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -23,7 +23,7 @@ import { } from './lib/config'; import { log, emitEvent } from './lib/logging'; import { parseAgentRouting, findTeamForAgent, getAgentResetFlag, extractTeammateMentions } from './lib/routing'; -import { invokeAgent } from './lib/invoke'; +import { invokeAgent, InvokeOptions } from './lib/invoke'; import { startApiServer } from './server'; import { initQueueDb, claimNextMessage, completeMessage as dbCompleteMessage, @@ -155,11 +155,52 @@ async function processMessage(dbMsg: DbMessage): Promise { } } - // Invoke agent + // Invoke agent with progress streaming for long-running tasks emitEvent('chain_step_start', { agentId, agentName: agent.name, fromAgent: messageData.fromAgent || null }); let response: string; try { - response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams); + const agentTimeout = agent.timeout || 300; + const PROGRESS_INTERVAL_MS = 60 * 1000; // send update every 60s + let lastProgressTime = Date.now(); + let lastOutputSnippet = ''; + let progressCount = 0; + let progressTimer: ReturnType | null = null; + + // Only stream progress for agents with timeout > 5 min + if (agentTimeout > 300 && !isInternal) { + progressTimer = setInterval(() => { + progressCount++; + const elapsed = Math.round((Date.now() - lastProgressTime + (progressCount * PROGRESS_INTERVAL_MS)) / 1000 / 60); + const snippet = lastOutputSnippet ? `\n> ${lastOutputSnippet.substring(0, 200)}` : ''; + enqueueResponse({ + channel, + sender, + senderId: dbMsg.sender_id ?? undefined, + message: `⏳ @${agentId} is still working... (${elapsed} min)${snippet}`, + originalMessage: rawMessage, + messageId: `${messageId}_progress_${progressCount}`, + agent: agentId, + }); + log('INFO', `Progress update #${progressCount} for ${messageId} (agent: ${agentId})`); + }, PROGRESS_INTERVAL_MS); + } + + const invokeOpts: InvokeOptions = { + onOutput: (chunk: string) => { + // Capture last meaningful line for progress snippets + const trimmed = chunk.trim(); + if (trimmed.length > 0) { + const lines = trimmed.split('\n').filter(l => l.trim()); + if (lines.length > 0) { + lastOutputSnippet = lines[lines.length - 1]; + } + } + }, + }; + + response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams, invokeOpts); + + if (progressTimer) clearInterval(progressTimer); } catch (error) { const provider = agent.provider || 'anthropic'; const providerLabel = provider === 'openai' ? 'Codex' : provider === 'opencode' ? 'OpenCode' : 'Claude'; From 1c8068e6a27634b4775d06711f1c03e1f80e4ba8 Mon Sep 17 00:00:00 2001 From: Geoff Date: Fri, 27 Feb 2026 23:35:00 +1100 Subject: [PATCH 4/5] fix: escaped template literals in Discord channel routing logs Co-Authored-By: Claude Opus 4.6 --- src/channels/discord-client.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/channels/discord-client.ts b/src/channels/discord-client.ts index d03f7bd9..f76be743 100644 --- a/src/channels/discord-client.ts +++ b/src/channels/discord-client.ts @@ -245,7 +245,7 @@ client.on(Events.MessageCreate, async (message: Message) => { if (mentionRoute) { routedAgent = mentionRoute.agentId; message.content = mentionRoute.cleanMessage; - log('INFO', `@mention routed to agent: \${routedAgent}`); + log('INFO', `@mention routed to agent: ${routedAgent}`); } } @@ -255,14 +255,14 @@ client.on(Events.MessageCreate, async (message: Message) => { const channelAgent = resolveChannelAgent(channelName, discordConfig.channel_routing, agents); if (channelAgent) { routedAgent = channelAgent; - log('INFO', `Channel "\${channelName}" routed to agent: \${routedAgent}`); + log('INFO', `Channel "${channelName}" routed to agent: ${routedAgent}`); } } // Priority 3: Default agent from discord config if (!routedAgent && discordConfig.default_agent && agents[discordConfig.default_agent]) { routedAgent = discordConfig.default_agent; - log('INFO', `Using default discord agent: \${routedAgent}`); + log('INFO', `Using default discord agent: ${routedAgent}`); } } From 76973ce7a130a42c95a2c50137114afcf9e0f9f5 Mon Sep 17 00:00:00 2001 From: Geoff Date: Fri, 27 Feb 2026 23:47:38 +1100 Subject: [PATCH 5/5] fix: resolve parent channel name for thread routing in Discord Threads have their own name (the thread title), not the parent channel name. Now resolves message.channel.parent.name when in a thread so channel_routing maps correctly. Co-Authored-By: Claude Opus 4.6 --- src/channels/discord-client.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/channels/discord-client.ts b/src/channels/discord-client.ts index f76be743..fdeecadb 100644 --- a/src/channels/discord-client.ts +++ b/src/channels/discord-client.ts @@ -250,8 +250,14 @@ client.on(Events.MessageCreate, async (message: Message) => { } // Priority 2: Channel-based routing + // For threads, resolve the parent channel name for routing if (!routedAgent) { - const channelName = 'name' in message.channel ? (message.channel as TextChannel).name : ''; + let channelName = ''; + if (message.channel.isThread() && message.channel.parent) { + channelName = message.channel.parent.name; + } else if ('name' in message.channel) { + channelName = (message.channel as TextChannel).name; + } const channelAgent = resolveChannelAgent(channelName, discordConfig.channel_routing, agents); if (channelAgent) { routedAgent = channelAgent;