diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml deleted file mode 100644 index 3e06c0c9..00000000 --- a/.github/workflows/release.yml +++ /dev/null @@ -1,136 +0,0 @@ -name: Create Release Bundle - -on: - push: - tags: - - 'v*.*.*' # Trigger on version tags like v1.0.0 - workflow_dispatch: # Allow manual trigger - -permissions: - contents: write - -jobs: - build-and-release: - runs-on: ubuntu-latest - - steps: - - name: Checkout code - uses: actions/checkout@v4 - with: - fetch-depth: 0 - fetch-tags: true - - - name: Setup Node.js - uses: actions/setup-node@v4 - with: - node-version: '18' - cache: 'npm' - - - name: Get version from tag - id: get_version - run: | - if [ "${{ github.ref_type }}" = "tag" ]; then - VERSION=${GITHUB_REF#refs/tags/} - else - VERSION=$(git describe --tags --always) - fi - echo "version=$VERSION" >> $GITHUB_OUTPUT - echo "Version: $VERSION" - - - name: Install dependencies - run: | - PUPPETEER_SKIP_DOWNLOAD=true npm ci --include=dev - - - name: Build TypeScript - run: npm run build - - - name: Prune development dependencies - run: npm prune --omit=dev - - - name: Create bundle tarball - run: | - VERSION="${{ steps.get_version.outputs.version }}" - BUNDLE_NAME="tinyclaw-bundle.tar.gz" - TEMP_DIR=$(mktemp -d) - BUNDLE_DIR="$TEMP_DIR/tinyclaw" - - mkdir -p "$BUNDLE_DIR" - - # Copy necessary files - cp -r bin/ "$BUNDLE_DIR/" - cp -r src/ "$BUNDLE_DIR/" - cp -r dist/ "$BUNDLE_DIR/" - cp -r node_modules/ "$BUNDLE_DIR/" - cp -r scripts/ "$BUNDLE_DIR/" - cp -r lib/ "$BUNDLE_DIR/" - cp -r docs/ "$BUNDLE_DIR/" - cp -r .agents/ "$BUNDLE_DIR/" - - cp tinyclaw.sh "$BUNDLE_DIR/" - - cp package.json "$BUNDLE_DIR/" - cp package-lock.json "$BUNDLE_DIR/" - cp tsconfig.json "$BUNDLE_DIR/" - cp tsconfig.visualizer.json "$BUNDLE_DIR/" - cp README.md "$BUNDLE_DIR/" - cp AGENTS.md "$BUNDLE_DIR/" - cp SOUL.md "$BUNDLE_DIR/" - cp heartbeat.md "$BUNDLE_DIR/" - cp .gitignore "$BUNDLE_DIR/" - - # Make scripts executable - chmod +x "$BUNDLE_DIR/bin/tinyclaw" - chmod +x "$BUNDLE_DIR/tinyclaw.sh" - chmod +x "$BUNDLE_DIR/scripts/install.sh" - chmod +x "$BUNDLE_DIR/scripts/uninstall.sh" - chmod +x "$BUNDLE_DIR/scripts/bundle.sh" - chmod +x "$BUNDLE_DIR/scripts/remote-install.sh" - chmod +x "$BUNDLE_DIR/lib/setup-wizard.sh" - chmod +x "$BUNDLE_DIR/lib/heartbeat-cron.sh" - chmod +x "$BUNDLE_DIR/lib/update.sh" - - # Create tarball - cd "$TEMP_DIR" - tar -czf "$GITHUB_WORKSPACE/$BUNDLE_NAME" tinyclaw/ - - # Get bundle info - BUNDLE_SIZE=$(du -h "$GITHUB_WORKSPACE/$BUNDLE_NAME" | cut -f1) - echo "Bundle created: $BUNDLE_NAME ($BUNDLE_SIZE)" - echo "bundle_name=$BUNDLE_NAME" >> $GITHUB_OUTPUT - echo "bundle_size=$BUNDLE_SIZE" >> $GITHUB_OUTPUT - - - name: Extract release notes from tag - id: release_notes - run: | - VERSION="${{ steps.get_version.outputs.version }}" - # Use for-each-ref to reliably get the annotated tag message (not the commit message) - TAG_MESSAGE=$(git for-each-ref --format='%(contents)' "refs/tags/$VERSION") - if [ -n "$TAG_MESSAGE" ]; then - echo "$TAG_MESSAGE" > release_notes.md - else - cat > release_notes.md < working...` +3. As the agent works, tool calls and results stream into the thread in real-time +4. When the agent finishes, a completion message is posted in the thread +5. The final response is delivered as a normal reply to the original message + +## Enabling + +Add `"stream_logs": true` to any agent in `.tinyclaw/settings.json`: + +```json +{ + "agents": { + "my-agent": { + "name": "MyAgent", + "provider": "anthropic", + "model": "opus", + "working_directory": "/path/to/workspace", + "stream_logs": true + } + } +} +``` + +To disable, set `"stream_logs": false` or remove the field. + +## Requirements + +- **Provider**: Only works with `anthropic` (Claude) agents. Other providers fall back to normal invocation silently. +- **Channel**: Only works in Discord guild (server) channels. DMs are skipped gracefully — the agent still responds normally, just without the thread. +- **System**: Requires the `script` command (from `util-linux`, available on all standard Linux systems). + +## What Shows in the Thread + +| Agent Activity | Thread Message | +|---|---| +| Tool call (Bash) | `` Tool: `Bash` `echo hello` `` | +| Tool call (Read) | `` Tool: `Read` `/path/to/file` `` | +| Tool call (Glob) | `` Tool: `Glob` `**/*.ts` `` | +| Tool result | `Result: ` | +| Agent starts | `Agent **MyAgent** is processing...` | +| Agent finishes | `Agent **MyAgent** finished.` | + +Events that are **not** shown: system init, rate limits, final text response, thinking blocks. + +## Architecture + +``` +Discord User + │ + ▼ +Discord Client ──POST──▶ Queue Processor + │ │ + │◀──────── SSE ───────────┤ (stream_start, stream_log, stream_end) + │ │ + ▼ ▼ +Discord Thread Claude CLI (--output-format stream-json --verbose) +``` + +- **Queue Processor** invokes Claude CLI with `--output-format stream-json --verbose` via a PTY (`script -qec`) to get real-time line-by-line JSON output +- Each JSON line is parsed by `formatStreamEvent()` into a human-readable string +- Formatted events are broadcast as SSE events (`stream_start`, `stream_log`, `stream_end`) +- **Discord Client** connects to the SSE endpoint on startup, creates threads on `stream_start`, buffers and flushes log lines on `stream_log`, and cleans up on `stream_end` + +### Batching + +Log messages are batched to avoid Discord rate limits: +- Flushed every **2 seconds** or when **15 lines** accumulate (whichever comes first) +- Each message is capped at Discord's **2000 character** limit + +## Files Changed + +| File | Change | +|---|---| +| `src/lib/types.ts` | Added `stream_logs?: boolean` to `AgentConfig`, new `StreamLogEvent` interface | +| `src/lib/invoke.ts` | Added `runCommandStreaming()` (PTY-based line streaming) and `invokeAgentStreaming()` | +| `src/queue-processor.ts` | Added `formatStreamEvent()` helper and streaming branch in `processMessage()` | +| `src/channels/discord-client.ts` | Added SSE client (`connectSSE`), thread management, batched log delivery | + +## Troubleshooting + +**Thread not created**: Check that the message is from a guild channel (not a DM) and the agent has `"stream_logs": true`. + +**No tool calls in thread**: Check queue logs for `[stream-debug]` entries. If `total lines: 0`, the PTY wrapper may not be working — verify `script` command is available. + +**Thread shows only start/end**: The agent may have responded without using any tools (e.g., a simple text reply). diff --git a/src/channels/discord-client.ts b/src/channels/discord-client.ts index 7a420fb0..5c6ed9fc 100644 --- a/src/channels/discord-client.ts +++ b/src/channels/discord-client.ts @@ -5,7 +5,7 @@ * Does NOT call Claude directly - that's handled by queue-processor */ -import { Client, Events, GatewayIntentBits, Partials, Message, DMChannel, AttachmentBuilder } from 'discord.js'; +import { Client, Events, GatewayIntentBits, Partials, Message, DMChannel, TextChannel, ThreadChannel, AttachmentBuilder, REST, Routes, SlashCommandBuilder, ChatInputCommandInteraction, ThreadAutoArchiveDuration } from 'discord.js'; import 'dotenv/config'; import fs from 'fs'; import path from 'path'; @@ -43,7 +43,7 @@ if (!DISCORD_BOT_TOKEN || DISCORD_BOT_TOKEN === 'your_token_here') { interface PendingMessage { message: Message; - channel: DMChannel; + channel: DMChannel | TextChannel; timestamp: number; } @@ -96,6 +96,203 @@ function downloadFile(url: string, destPath: string): Promise { const pendingMessages = new Map(); let processingOutgoingQueue = false; +// ─── Stream log thread management ──────────────────────────────────────────── + +interface ActiveLogThread { + thread: ThreadChannel; + buffer: string[]; + flushTimer: ReturnType | null; + lastSendTime: number; +} + +const activeLogThreads = new Map(); + +const STREAM_FLUSH_INTERVAL_MS = 2000; +const STREAM_FLUSH_LINE_THRESHOLD = 15; +const DISCORD_MAX_LENGTH = 2000; + +async function flushLogBuffer(messageId: string): Promise { + const entry = activeLogThreads.get(messageId); + if (!entry || entry.buffer.length === 0) return; + + let text = entry.buffer.join('\n'); + if (text.length > DISCORD_MAX_LENGTH) { + text = text.substring(0, DISCORD_MAX_LENGTH - 4) + '...'; + } + entry.buffer = []; + entry.lastSendTime = Date.now(); + + try { + await entry.thread.send(text); + } catch (err) { + log('ERROR', `Failed to send log to thread: ${(err as Error).message}`); + } +} + +function scheduleFlush(messageId: string): void { + const entry = activeLogThreads.get(messageId); + if (!entry) return; + + // Force flush if buffer is large + if (entry.buffer.length >= STREAM_FLUSH_LINE_THRESHOLD) { + if (entry.flushTimer) clearTimeout(entry.flushTimer); + entry.flushTimer = null; + flushLogBuffer(messageId); + return; + } + + // Otherwise schedule a timed flush if not already pending + if (!entry.flushTimer) { + entry.flushTimer = setTimeout(() => { + entry.flushTimer = null; + flushLogBuffer(messageId); + }, STREAM_FLUSH_INTERVAL_MS); + } +} + +async function handleStreamStart(data: any): Promise { + const { messageId, agentName } = data; + const pending = pendingMessages.get(messageId); + if (!pending) return; + + // Threads only work in guild (server) channels, not DMs + if (!pending.message.guild) return; + + try { + const thread = await pending.message.startThread({ + name: `${agentName || 'Agent'} working...`, + autoArchiveDuration: ThreadAutoArchiveDuration.OneHour, + }); + activeLogThreads.set(messageId, { + thread, + buffer: [], + flushTimer: null, + lastSendTime: Date.now(), + }); + await thread.send(`Agent **${agentName}** is processing...`); + log('INFO', `Created log thread for ${messageId} (${agentName})`); + } catch (err) { + log('ERROR', `Failed to create log thread: ${(err as Error).message}`); + } +} + +function handleStreamLog(data: any): void { + const { messageId, content } = data; + if (!content) return; + const entry = activeLogThreads.get(messageId); + if (!entry) return; + + entry.buffer.push(content); + scheduleFlush(messageId); +} + +async function handleStreamEnd(data: any): Promise { + const { messageId, agentName } = data; + const entry = activeLogThreads.get(messageId); + if (!entry) return; + + // Flush remaining buffer + if (entry.flushTimer) { + clearTimeout(entry.flushTimer); + entry.flushTimer = null; + } + await flushLogBuffer(messageId); + + try { + await entry.thread.send(`Agent **${agentName || 'Agent'}** finished.`); + } catch { + // ignore + } + + activeLogThreads.delete(messageId); +} + +/** + * Connect to the queue processor's SSE endpoint to receive real-time stream events. + * Reconnects automatically on disconnect. + */ +function connectSSE(): void { + let reconnectDelay = 2000; + + function connect(): void { + const req = http.get(`${API_BASE}/api/events/stream`, (res) => { + if (res.statusCode !== 200) { + res.resume(); + scheduleReconnect(); + return; + } + + reconnectDelay = 2000; // reset on successful connect + log('INFO', 'SSE connected to queue processor'); + res.setEncoding('utf8'); + + let eventType = ''; + let dataLines: string[] = []; + + res.on('data', (chunk: string) => { + const lines = chunk.split('\n'); + for (const line of lines) { + if (line.startsWith('event: ')) { + eventType = line.slice(7).trim(); + } else if (line.startsWith('data: ')) { + dataLines.push(line.slice(6)); + } else if (line === '') { + // End of SSE message + if (eventType && dataLines.length > 0) { + try { + const payload = JSON.parse(dataLines.join('\n')); + if (eventType === 'stream_start') { + log('INFO', `SSE stream_start: messageId=${payload.messageId} agent=${payload.agentName}`); + handleStreamStart(payload).catch(err => log('ERROR', `handleStreamStart error: ${(err as Error).message}`)); + } else if (eventType === 'stream_log') { + handleStreamLog(payload); + } else if (eventType === 'stream_end') { + log('INFO', `SSE stream_end: messageId=${payload.messageId}`); + handleStreamEnd(payload).catch(err => log('ERROR', `handleStreamEnd error: ${(err as Error).message}`)); + } + } catch (err) { + log('ERROR', `SSE parse error: ${(err as Error).message}`); + } + } + eventType = ''; + dataLines = []; + } + } + }); + + res.on('end', () => { + scheduleReconnect(); + }); + + res.on('error', () => { + scheduleReconnect(); + }); + }); + + req.on('error', () => { + scheduleReconnect(); + }); + } + + function scheduleReconnect(): void { + const delay = Math.min(reconnectDelay, 5000); + reconnectDelay = Math.min(reconnectDelay * 1.5, 5000); + setTimeout(connect, delay); + } + + connect(); +} + +// Clean up stale log threads every 5 minutes +setInterval(() => { + for (const [messageId, entry] of activeLogThreads.entries()) { + if (!pendingMessages.has(messageId) && Date.now() - entry.lastSendTime > 5 * 60 * 1000) { + if (entry.flushTimer) clearTimeout(entry.flushTimer); + activeLogThreads.delete(messageId); + } + } +}, 5 * 60 * 1000); + // Logger function log(level: string, message: string): void { const timestamp = new Date().toISOString(); @@ -104,49 +301,85 @@ function log(level: string, message: string): void { fs.appendFileSync(LOG_FILE, logMessage); } -// Load teams from settings for /team command -function getTeamListText(): string { +// Cached settings reader — single parse shared by all consumers +let _cachedSettings: any = null; +let _settingsMtime = 0; + +function getCachedSettings(): any { try { - const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); - const settings = JSON.parse(settingsData); - const teams = settings.teams; - if (!teams || Object.keys(teams).length === 0) { - return 'No teams configured.\n\nCreate a team with `tinyclaw team add`.'; - } - let text = '**Available Teams:**\n'; - for (const [id, team] of Object.entries(teams) as [string, any][]) { - text += `\n**@${id}** - ${team.name}`; - text += `\n Agents: ${team.agents.join(', ')}`; - text += `\n Leader: @${team.leader_agent}`; + const mtime = fs.statSync(SETTINGS_FILE).mtimeMs; + if (!_cachedSettings || mtime !== _settingsMtime) { + _cachedSettings = JSON.parse(fs.readFileSync(SETTINGS_FILE, 'utf8')); + _settingsMtime = mtime; } - text += '\n\nUsage: Start your message with `@team_id` to route to a team.'; - return text; + return _cachedSettings; } catch { - return 'Could not load team configuration.'; + return null; + } +} + +function getTeamListText(): string { + const settings = getCachedSettings(); + if (!settings) return 'Could not load team configuration.'; + const teams = settings.teams; + if (!teams || Object.keys(teams).length === 0) { + return 'No teams configured.\n\nCreate a team with `tinyclaw team add`.'; + } + let text = '**Available Teams:**\n'; + for (const [id, team] of Object.entries(teams) as [string, any][]) { + text += `\n**@${id}** - ${team.name}`; + text += `\n Agents: ${team.agents.join(', ')}`; + text += `\n Leader: @${team.leader_agent}`; } + text += '\n\nUsage: Start your message with `@team_id` to route to a team.'; + return text; } -// Load agents from settings for /agent command function getAgentListText(): string { - try { - const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); - const settings = JSON.parse(settingsData); - const agents = settings.agents; - if (!agents || Object.keys(agents).length === 0) { - return 'No agents configured. Using default single-agent mode.\n\nConfigure agents in `.tinyclaw/settings.json` or run `tinyclaw agent add`.'; - } - let text = '**Available Agents:**\n'; - for (const [id, agent] of Object.entries(agents) as [string, any][]) { - text += `\n**@${id}** - ${agent.name}`; - text += `\n Provider: ${agent.provider}/${agent.model}`; - text += `\n Directory: ${agent.working_directory}`; - if (agent.system_prompt) text += `\n Has custom system prompt`; - if (agent.prompt_file) text += `\n Prompt file: ${agent.prompt_file}`; + const settings = getCachedSettings(); + if (!settings) return 'Could not load agent configuration.'; + const agents = settings.agents; + if (!agents || Object.keys(agents).length === 0) { + return 'No agents configured. Using default single-agent mode.\n\nConfigure agents in `.tinyclaw/settings.json` or run `tinyclaw agent add`.'; + } + let text = '**Available Agents:**\n'; + for (const [id, agent] of Object.entries(agents) as [string, any][]) { + text += `\n**@${id}** - ${agent.name}`; + text += `\n Provider: ${agent.provider}/${agent.model}`; + text += `\n Directory: ${agent.working_directory}`; + if (agent.system_prompt) text += `\n Has custom system prompt`; + if (agent.prompt_file) text += `\n Prompt file: ${agent.prompt_file}`; + } + text += '\n\nUsage: Start your message with `@agent_id` to route to a specific agent.'; + return text; +} + +// Shared reset logic +function resetAgents(agentArgs: string[]): string[] { + const settings = getCachedSettings(); + if (!settings) return ['Could not load settings.']; + const agents = settings.agents || {}; + const workspacePath = settings?.workspace?.path || path.join(require('os').homedir(), 'tinyclaw-workspace'); + const results: string[] = []; + for (const agentId of agentArgs) { + if (!agents[agentId]) { + results.push(`Agent '${agentId}' not found.`); + continue; } - text += '\n\nUsage: Start your message with `@agent_id` to route to a specific agent.'; - return text; - } catch { - return 'Could not load agent configuration.'; + const flagDir = path.join(workspacePath, agentId); + if (!fs.existsSync(flagDir)) fs.mkdirSync(flagDir, { recursive: true }); + fs.writeFileSync(path.join(flagDir, 'reset_flag'), 'reset'); + results.push(`Reset @${agentId} (${agents[agentId].name}).`); + } + return results; +} + +// Reply with message splitting for slash commands +async function interactionReplySplit(interaction: ChatInputCommandInteraction, text: string): Promise { + const chunks = splitMessage(text); + await interaction.reply(chunks[0]!); + for (let i = 1; i < chunks.length; i++) { + await interaction.followUp(chunks[i]!); } } @@ -194,11 +427,47 @@ function pairingMessage(code: string): string { ].join('\n'); } +// Guild channel configuration +type GuildChannelConfig = Record; +let guildChannels: GuildChannelConfig = {}; + +function loadGuildChannels(): void { + const settings = getCachedSettings(); + guildChannels = settings?.channels?.discord?.guild_channels || {}; +} + +// Load on startup +loadGuildChannels(); + +// Reload every 30 seconds to pick up config changes +setInterval(loadGuildChannels, 30_000); + +// Slash command definitions +const slashCommands = [ + new SlashCommandBuilder() + .setName('agent') + .setDescription('List all configured agents'), + new SlashCommandBuilder() + .setName('team') + .setDescription('List all configured teams'), + new SlashCommandBuilder() + .setName('reset') + .setDescription('Reset one or more agents') + .addStringOption(option => + option + .setName('agent_ids') + .setDescription('Space-separated agent IDs to reset (e.g. "coder writer")') + .setRequired(true) + .setAutocomplete(true) + ), +]; + // Initialize Discord client const client = new Client({ intents: [ GatewayIntentBits.Guilds, GatewayIntentBits.DirectMessages, + GatewayIntentBits.GuildMessages, GatewayIntentBits.MessageContent, ], partials: [ @@ -207,10 +476,104 @@ const client = new Client({ ], }); +// Register slash commands for a single guild +const commandData = slashCommands.map(cmd => cmd.toJSON()); + +async function registerGuildCommands(appId: string, guildId: string, guildName: string): Promise { + const rest = new REST({ version: '10' }).setToken(DISCORD_BOT_TOKEN!); + try { + await rest.put(Routes.applicationGuildCommands(appId, guildId), { body: commandData }); + log('INFO', `Registered ${commandData.length} slash commands for guild "${guildName}" (${guildId})`); + } catch (err) { + log('ERROR', `Failed to register slash commands for guild "${guildName}" (${guildId}): ${(err as Error).message}`); + } +} + // Client ready -client.on(Events.ClientReady, (readyClient) => { +client.on(Events.ClientReady, async (readyClient) => { log('INFO', `Discord bot connected as ${readyClient.user.tag}`); log('INFO', 'Listening for DMs...'); + + for (const [guildId, guild] of readyClient.guilds.cache) { + await registerGuildCommands(readyClient.user.id, guildId, guild.name); + } +}); + +// Register slash commands when bot joins a new guild +client.on(Events.GuildCreate, async (guild) => { + if (!client.user) return; + log('INFO', `Joined new guild "${guild.name}" (${guild.id})`); + await registerGuildCommands(client.user.id, guild.id, guild.name); +}); + +// Slash command interactions +client.on(Events.InteractionCreate, async (interaction) => { + try { + // Handle autocomplete for /reset + if (interaction.isAutocomplete()) { + if (interaction.commandName !== 'reset') return; + + const focusedValue = interaction.options.getFocused(); + try { + const settings = getCachedSettings(); + const agentIds = Object.keys(settings?.agents || {}); + + // Support space-separated multi-agent input: autocomplete the last token + const tokens = focusedValue.split(/\s+/); + const prefix = tokens.length > 1 ? tokens.slice(0, -1).join(' ') + ' ' : ''; + const lastToken = (tokens[tokens.length - 1] || '').toLowerCase(); + const alreadySelected = new Set(tokens.slice(0, -1).map((t: string) => t.toLowerCase())); + + const choices = agentIds + .filter(id => !alreadySelected.has(id) && id.toLowerCase().startsWith(lastToken)) + .slice(0, 25) + .map(id => ({ name: prefix + id, value: prefix + id })); + + await interaction.respond(choices); + } catch { + await interaction.respond([]); + } + return; + } + + // Handle slash commands + if (!interaction.isChatInputCommand()) return; + + const { commandName } = interaction; + + if (commandName === 'agent') { + log('INFO', 'Slash command /agent received'); + await interactionReplySplit(interaction, getAgentListText()); + return; + } + + if (commandName === 'team') { + log('INFO', 'Slash command /team received'); + await interactionReplySplit(interaction, getTeamListText()); + return; + } + + if (commandName === 'reset') { + log('INFO', 'Slash command /reset received'); + const agentIdsRaw = interaction.options.getString('agent_ids', true); + const agentArgs = agentIdsRaw.split(/\s+/).map(a => a.replace(/^@/, '').toLowerCase()).filter(Boolean); + + if (agentArgs.length === 0) { + await interaction.reply({ content: 'Please specify at least one agent ID to reset.', ephemeral: true }); + return; + } + + await interactionReplySplit(interaction, resetAgents(agentArgs).join('\n')); + return; + } + } catch (error) { + log('ERROR', `Interaction handling error: ${(error as Error).message}`); + try { + if (interaction.isRepliable() && !interaction.replied && !interaction.deferred) { + await interaction.reply({ content: 'An error occurred processing this command.', ephemeral: true }); + } + } catch { /* ignore reply failure */ } + } }); // Message received - Write to queue @@ -221,8 +584,12 @@ client.on(Events.MessageCreate, async (message: Message) => { return; } - // Skip non-DM messages (guild = server channel) - if (message.guild) { + // Determine if this is a guild (server) message and whether to process it + const isGuild = !!message.guild; + const botMentioned = isGuild && client.user ? message.mentions.has(client.user) : false; + const isDesignatedChannel = isGuild && Object.prototype.hasOwnProperty.call(guildChannels, message.channel.id); + + if (isGuild && !botMentioned && !isDesignatedChannel) { return; } @@ -259,7 +626,15 @@ client.on(Events.MessageCreate, async (message: Message) => { let messageText = message.content || ''; - log('INFO', `Message from ${sender}: ${messageText.substring(0, 50)}${downloadedFiles.length > 0 ? ` [+${downloadedFiles.length} file(s)]` : ''}...`); + // Strip bot @mention and role mentions from guild messages + if (isGuild) { + if (client.user) { + messageText = messageText.replace(new RegExp(`<@!?${client.user.id}>`, 'g'), ''); + } + messageText = messageText.replace(/<@&\d+>/g, '').trim(); + } + + log('INFO', `Message from ${sender}${isGuild ? ` in #${(message.channel as TextChannel).name}` : ''}: ${messageText.substring(0, 50)}${downloadedFiles.length > 0 ? ` [+${downloadedFiles.length} file(s)]` : ''}...`); const pairing = ensureSenderPaired(PAIRING_FILE, 'discord', message.author.id, sender); if (!pairing.approved && pairing.code) { @@ -272,56 +647,10 @@ client.on(Events.MessageCreate, async (message: Message) => { return; } - // Check for agent list command - if (message.content.trim().match(/^[!/]agent$/i)) { - log('INFO', 'Agent list command received'); - const agentList = getAgentListText(); - await message.reply(agentList); - return; - } - - // Check for team list command - if (message.content.trim().match(/^[!/]team$/i)) { - log('INFO', 'Team list command received'); - const teamList = getTeamListText(); - await message.reply(teamList); - return; - } - - // Check for reset command: /reset @agent_id [@agent_id2 ...] - const resetMatch = messageText.trim().match(/^[!/]reset\s+(.+)$/i); - if (messageText.trim().match(/^[!/]reset$/i)) { - await message.reply('Usage: `/reset @agent_id [@agent_id2 ...]`\nSpecify which agent(s) to reset.'); - return; - } - if (resetMatch) { - log('INFO', 'Per-agent reset command received'); - const agentArgs = resetMatch[1].split(/\s+/).map(a => a.replace(/^@/, '').toLowerCase()); - try { - const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); - const settings = JSON.parse(settingsData); - const agents = settings.agents || {}; - const workspacePath = settings?.workspace?.path || path.join(require('os').homedir(), 'tinyclaw-workspace'); - const resetResults: string[] = []; - for (const agentId of agentArgs) { - if (!agents[agentId]) { - resetResults.push(`Agent '${agentId}' not found.`); - continue; - } - const flagDir = path.join(workspacePath, agentId); - if (!fs.existsSync(flagDir)) fs.mkdirSync(flagDir, { recursive: true }); - fs.writeFileSync(path.join(flagDir, 'reset_flag'), 'reset'); - resetResults.push(`Reset @${agentId} (${agents[agentId].name}).`); - } - await message.reply(resetResults.join('\n')); - } catch { - await message.reply('Could not process reset command. Check settings.'); - } - return; - } - // Show typing indicator - await (message.channel as DMChannel).sendTyping(); + if ('sendTyping' in message.channel) { + await message.channel.sendTyping(); + } // Build message text with file references let fullMessage = messageText; @@ -330,6 +659,27 @@ client.on(Events.MessageCreate, async (message: Message) => { fullMessage = fullMessage ? `${fullMessage}\n\n${fileRefs}` : fileRefs; } + // Encode senderId: for guild messages use userId:channelId, for DMs just userId + const senderId = isGuild + ? `${message.author.id}:${message.channel.id}` + : message.author.id; + + // Determine default agent for designated channels (if no explicit @agent prefix) + let agent: string | undefined; + if (isDesignatedChannel) { + const channelConfig = guildChannels[message.channel.id]; + if (channelConfig?.default_agent && !fullMessage.match(/^@\S+/)) { + agent = channelConfig.default_agent; + } + } + + // Store pending message BEFORE enqueuing so SSE stream_start can find it + pendingMessages.set(messageId, { + message: message, + channel: message.channel as DMChannel | TextChannel, + timestamp: Date.now(), + }); + // Write to queue via API await fetch(`${API_BASE}/api/message`, { method: 'POST', @@ -337,26 +687,20 @@ client.on(Events.MessageCreate, async (message: Message) => { body: JSON.stringify({ channel: 'discord', sender, - senderId: message.author.id, + senderId, message: fullMessage, messageId, + agent, files: downloadedFiles.length > 0 ? downloadedFiles : undefined, }), }); - log('INFO', `Queued message ${messageId}`); + log('INFO', `Queued message ${messageId}${agent ? ` (default agent: ${agent})` : ''}`); - // Store pending message for response - pendingMessages.set(messageId, { - message: message, - channel: message.channel as DMChannel, - timestamp: Date.now(), - }); - - // Clean up old pending messages (older than 10 minutes) - const tenMinutesAgo = Date.now() - (10 * 60 * 1000); + // Clean up old pending messages (older than 60 minutes) + const ttlAgo = Date.now() - (60 * 60 * 1000); for (const [id, data] of pendingMessages.entries()) { - if (data.timestamp < tenMinutesAgo) { + if (data.timestamp < ttlAgo) { pendingMessages.delete(id); } } @@ -381,26 +725,37 @@ async function checkOutgoingQueue(): Promise { for (const resp of responses) { try { - const responseText = resp.message; - const messageId = resp.messageId; - const sender = resp.sender; - const senderId = resp.senderId; + const responseText: string = resp.message; + const messageId: string = resp.messageId; + const sender: string = resp.sender; + const senderId: string | undefined = resp.senderId; + const agentId: string | undefined = resp.agent; const files: string[] = resp.files || []; // Find pending message, or fall back to senderId for proactive messages const pending = pendingMessages.get(messageId); - let dmChannel = pending?.channel ?? null; + let targetChannel: DMChannel | TextChannel | null = pending?.channel ?? null; - if (!dmChannel && senderId) { + if (!targetChannel && senderId) { try { - const user = await client.users.fetch(senderId); - dmChannel = await user.createDM(); + if (senderId.includes(':')) { + // Guild message: senderId is userId:channelId + const channelId = senderId.split(':')[1]; + const ch = await client.channels.fetch(channelId); + if (ch && ch.isTextBased() && !ch.isDMBased()) { + targetChannel = ch as TextChannel; + } + } else { + // DM: senderId is just userId + const user = await client.users.fetch(senderId); + targetChannel = await user.createDM(); + } } catch (err) { - log('ERROR', `Could not open DM for senderId ${senderId}: ${(err as Error).message}`); + log('ERROR', `Could not resolve channel for senderId ${senderId}: ${(err as Error).message}`); } } - if (dmChannel) { + if (targetChannel) { // Send any attached files if (files.length > 0) { const attachments: AttachmentBuilder[] = []; @@ -413,24 +768,34 @@ async function checkOutgoingQueue(): Promise { } } if (attachments.length > 0) { - await dmChannel.send({ files: attachments }); + await targetChannel.send({ files: attachments }); log('INFO', `Sent ${attachments.length} file(s) to Discord`); } } + // Append agent signature to response (unless disabled) + let signedText = responseText; + if (agentId) { + const settings = getCachedSettings(); + if (settings?.channels?.discord?.sign_responses !== false) { + const agentName = settings?.agents?.[agentId]?.name; + if (agentName) signedText = `${responseText}\n\n— ${agentName}`; + } + } + // Split message if needed (Discord 2000 char limit) - if (responseText) { - const chunks = splitMessage(responseText); + if (signedText) { + const chunks = splitMessage(signedText); if (chunks.length > 0) { if (pending) { await pending.message.reply(chunks[0]!); } else { - await dmChannel.send(chunks[0]!); + await targetChannel.send(chunks[0]!); } } for (let i = 1; i < chunks.length; i++) { - await dmChannel.send(chunks[i]!); + await targetChannel.send(chunks[i]!); } } @@ -481,4 +846,5 @@ process.on('SIGTERM', () => { // Start client log('INFO', 'Starting Discord client...'); +connectSSE(); client.login(DISCORD_BOT_TOKEN); diff --git a/src/lib/conversation.ts b/src/lib/conversation.ts index e9f34bc4..4d011159 100644 --- a/src/lib/conversation.ts +++ b/src/lib/conversation.ts @@ -178,6 +178,7 @@ export function completeConversation(conv: Conversation): void { enqueueResponse({ channel: conv.channel, sender: conv.sender, + senderId: conv.senderId, message: responseMessage, originalMessage: conv.originalMessage, messageId: conv.messageId, diff --git a/src/lib/db.ts b/src/lib/db.ts index b1690d6a..15f727b2 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -29,6 +29,7 @@ export interface DbMessage { created_at: number; updated_at: number; claimed_by: string | null; + handoff_depth: number; } export interface DbResponse { @@ -56,6 +57,7 @@ export interface EnqueueMessageData { files?: string[]; conversationId?: string; fromAgent?: string; + handoffDepth?: number; } export interface EnqueueResponseData { @@ -127,6 +129,11 @@ export function initQueueDb(): void { CREATE INDEX IF NOT EXISTS idx_responses_channel_status ON responses(channel, status); `); + // Add handoff_depth column (idempotent) + try { + db.exec('ALTER TABLE messages ADD COLUMN handoff_depth INTEGER NOT NULL DEFAULT 0'); + } catch { /* column already exists */ } + // Drop legacy indexes/tables db.exec('DROP INDEX IF EXISTS idx_messages_status'); db.exec('DROP INDEX IF EXISTS idx_messages_agent'); @@ -144,8 +151,8 @@ export function enqueueMessage(data: EnqueueMessageData): number { const d = getDb(); const now = Date.now(); const result = d.prepare(` - INSERT INTO messages (message_id, channel, sender, sender_id, message, agent, files, conversation_id, from_agent, status, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?) + INSERT INTO messages (message_id, channel, sender, sender_id, message, agent, files, conversation_id, from_agent, handoff_depth, status, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?) `).run( data.messageId, data.channel, @@ -156,6 +163,7 @@ export function enqueueMessage(data: EnqueueMessageData): number { data.files ? JSON.stringify(data.files) : null, data.conversationId ?? null, data.fromAgent ?? null, + data.handoffDepth ?? 0, now, now, ); diff --git a/src/lib/invoke.ts b/src/lib/invoke.ts index 020b1bca..b249ffb9 100644 --- a/src/lib/invoke.ts +++ b/src/lib/invoke.ts @@ -1,4 +1,5 @@ import { spawn } from 'child_process'; +import { createInterface } from 'readline'; import fs from 'fs'; import path from 'path'; import { AgentConfig, TeamConfig } from './types'; @@ -8,9 +9,12 @@ import { ensureAgentDirectory, updateAgentTeammates } from './agent'; export async function runCommand(command: string, args: string[], cwd?: string): Promise { return new Promise((resolve, reject) => { + const env = { ...process.env }; + delete env.CLAUDECODE; const child = spawn(command, args, { cwd: cwd || SCRIPT_DIR, stdio: ['ignore', 'pipe', 'pipe'], + env, }); let stdout = ''; @@ -175,3 +179,159 @@ export async function invokeAgent( return await runCommand('claude', claudeArgs, workingDir); } } + +/** + * Run a command and stream stdout line-by-line as data arrives. + * Uses `script -qec` to allocate a PTY so the child process flushes + * output line-by-line instead of buffering until exit. + * Calls onLine(line) for each complete line. Returns full output on close. + */ +export async function runCommandStreaming( + command: string, + args: string[], + cwd: string | undefined, + onLine: (line: string) => void +): Promise { + return new Promise((resolve, reject) => { + const env = { ...process.env }; + delete env.CLAUDECODE; + + // Build the full command string for script -qec + const escapedArgs = args.map(a => `'${a.replace(/'/g, "'\\''")}'`).join(' '); + const fullCmd = `${command} ${escapedArgs}`; + + const child = spawn('script', ['-qec', fullCmd, '/dev/null'], { + cwd: cwd || SCRIPT_DIR, + stdio: ['ignore', 'pipe', 'pipe'], + env, + }); + + let allLines = ''; + + child.stdout.setEncoding('utf8'); + child.stderr.setEncoding('utf8'); + + // With PTY via script, all output comes through stdout + const rl = createInterface({ input: child.stdout }); + rl.on('line', (line) => { + // Filter out script control sequences and non-JSON lines + const trimmed = line.replace(/\r$/, '').replace(/[\x00-\x09\x0b-\x1f]|\x1b\[[^a-zA-Z]*[a-zA-Z]|\x1b\][^\x07]*\x07/g, '').trim(); + if (!trimmed || !trimmed.startsWith('{')) return; + allLines += trimmed + '\n'; + onLine(trimmed); + }); + + let stderr = ''; + child.stderr.on('data', (chunk: string) => { + stderr += chunk; + }); + + child.on('error', (error) => { + rl.close(); + reject(error); + }); + + child.on('close', (code) => { + rl.close(); + // stream-json may exit non-zero; check if we got a result line + const hasResult = allLines.includes('"type":"result"'); + if (code === 0 || hasResult) { + resolve(allLines); + return; + } + const errorMessage = stderr.trim() || allLines.trim() || `Command exited with code ${code}`; + reject(new Error(errorMessage)); + }); + }); +} + +/** + * Invoke a Claude/Anthropic agent with streaming JSON output. + * Each stdout line is passed to onStreamLine for real-time processing. + * Falls back to invokeAgent for non-Anthropic providers. + */ +export async function invokeAgentStreaming( + agent: AgentConfig, + agentId: string, + message: string, + workspacePath: string, + shouldReset: boolean, + agents: Record, + teams: Record, + onStreamLine: (line: string) => void +): Promise { + const provider = agent.provider || 'anthropic'; + + // Only Anthropic/Claude supports stream-json; fall back for others + if (provider !== 'anthropic') { + return invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams); + } + + // Ensure agent directory exists with config files + const agentDir = path.join(workspacePath, agentId); + const isNewAgent = !fs.existsSync(agentDir); + ensureAgentDirectory(agentDir); + if (isNewAgent) { + log('INFO', `Initialized agent directory with config files: ${agentDir}`); + } + + // Update AGENTS.md with current teammate info + updateAgentTeammates(agentDir, agentId, agents, teams); + + // Resolve working directory + const workingDir = agent.working_directory + ? (path.isAbsolute(agent.working_directory) + ? agent.working_directory + : path.join(workspacePath, agent.working_directory)) + : agentDir; + + log('INFO', `Using Claude provider with streaming (agent: ${agentId})`); + + const continueConversation = !shouldReset; + if (shouldReset) { + log('INFO', `🔄 Resetting conversation for agent: ${agentId}`); + } + + const modelId = resolveClaudeModel(agent.model); + const claudeArgs = ['--dangerously-skip-permissions', '--output-format', 'stream-json', '--verbose']; + if (modelId) { + claudeArgs.push('--model', modelId); + } + if (continueConversation) { + claudeArgs.push('-c'); + } + claudeArgs.push('-p', message); + + const fullOutput = await runCommandStreaming('claude', claudeArgs, workingDir, onStreamLine); + + // Extract final response from the result line + const lines = fullOutput.trim().split('\n'); + for (let i = lines.length - 1; i >= 0; i--) { + try { + const json = JSON.parse(lines[i]); + if (json.type === 'result' && json.subtype === 'success' && json.result) { + return json.result; + } + } catch { + // not JSON, skip + } + } + + // Fallback: look for the last assistant text content + for (let i = lines.length - 1; i >= 0; i--) { + try { + const json = JSON.parse(lines[i]); + if (json.type === 'assistant' && Array.isArray(json.content)) { + for (const block of json.content) { + if (block.type === 'text' && block.text) { + return block.text; + } + } + } + } catch { + // not JSON, skip + } + } + + return fullOutput; +} diff --git a/src/lib/routing.ts b/src/lib/routing.ts index b57a1666..6adf9b95 100644 --- a/src/lib/routing.ts +++ b/src/lib/routing.ts @@ -136,3 +136,40 @@ export function parseAgentRouting( } return { agentId: 'default', message: rawMessage }; } + +/** + * Detect @agent_id prefix in an agent's response text (agent-to-agent handoff). + * Returns the target agent ID and remaining message, or null if no valid handoff. + */ +export function parseResponseHandoff( + response: string, + currentAgentId: string, + agents: Record +): { targetAgentId: string; message: string } | null { + const trimmed = response.trim(); + const match = trimmed.match(/^@(\S+)\s+([\s\S]*)$/); + if (!match) return null; + + const candidateId = match[1].toLowerCase(); + const message = match[2].trim(); + + if (!message) return null; + + // Reject self-handoff + if (candidateId === currentAgentId) return null; + + // Match by agent ID first + if (agents[candidateId]) { + return { targetAgentId: candidateId, message: trimmed }; + } + + // Match by agent name (case-insensitive) — rewrite @name to @id + for (const [id, config] of Object.entries(agents)) { + if (config.name.toLowerCase() === candidateId && id !== currentAgentId) { + const rewritten = `@${id} ${message}`; + return { targetAgentId: id, message: rewritten }; + } + } + + return null; +} diff --git a/src/lib/types.ts b/src/lib/types.ts index 46b3d566..1c52e48f 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -5,6 +5,14 @@ export interface AgentConfig { working_directory: string; system_prompt?: string; prompt_file?: string; + stream_logs?: boolean; +} + +export interface StreamLogEvent { + messageId: string; + agentId: string; + agentName: string; + content: string | null; } export interface TeamConfig { @@ -38,7 +46,11 @@ export interface Settings { }; channels?: { enabled?: string[]; - discord?: { bot_token?: string }; + discord?: { + bot_token?: string; + guild_channels?: Record; + sign_responses?: boolean; + }; telegram?: { bot_token?: string }; whatsapp?: {}; }; @@ -79,6 +91,7 @@ export interface Conversation { id: string; channel: string; sender: string; + senderId?: string; originalMessage: string; messageId: string; pending: number; diff --git a/src/queue-processor.ts b/src/queue-processor.ts index 7fef61e1..b5c6ce3d 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -22,12 +22,12 @@ import { getSettings, getAgents, getTeams } from './lib/config'; import { log, emitEvent } from './lib/logging'; -import { parseAgentRouting, findTeamForAgent, getAgentResetFlag, extractTeammateMentions } from './lib/routing'; -import { invokeAgent } from './lib/invoke'; +import { parseAgentRouting, findTeamForAgent, getAgentResetFlag, extractTeammateMentions, parseResponseHandoff } from './lib/routing'; +import { invokeAgent, invokeAgentStreaming } from './lib/invoke'; import { startApiServer } from './server'; import { initQueueDb, claimNextMessage, completeMessage as dbCompleteMessage, - failMessage, enqueueResponse, getPendingAgents, recoverStaleMessages, + failMessage, enqueueMessage, enqueueResponse, getPendingAgents, recoverStaleMessages, pruneAckedResponses, pruneCompletedMessages, closeQueueDb, queueEvents, DbMessage, } from './lib/db'; import { handleLongResponse, collectFiles } from './lib/response'; @@ -36,6 +36,68 @@ import { withConversationLock, incrementPending, decrementPending, } from './lib/conversation'; +const MAX_HANDOFF_DEPTH = 5; + +/** + * Parse a stream-json line from Claude CLI and return a human-readable string, + * or null to skip the event. + */ +function formatStreamEvent(line: string): string | null { + let json: any; + try { + json = JSON.parse(line); + } catch { + return null; + } + + // stream-json wraps content inside json.message.content + const content = json.message?.content ?? json.content; + + // Tool use events from assistant + if (json.type === 'assistant' && Array.isArray(content)) { + const parts: string[] = []; + for (const block of content) { + if (block.type === 'tool_use') { + const name = block.name || 'unknown'; + const input = block.input || {}; + // Build a short description from the input + let detail = ''; + if (input.command) { + detail = ` \`${String(input.command).substring(0, 120)}\``; + } else if (input.description) { + detail = ` (${String(input.description).substring(0, 120)})`; + } else if (input.pattern) { + detail = ` \`${input.pattern}\``; + } else if (input.file_path) { + detail = ` \`${input.file_path}\``; + } + parts.push(`Tool: \`${name}\`${detail}`); + } + } + return parts.length > 0 ? parts.join('\n') : null; + } + + // Tool result events from user + if (json.type === 'user' && Array.isArray(content)) { + for (const block of content) { + if (block.type === 'tool_result') { + // tool_result content can be a string or nested in the top-level tool_use_result + const resultText = typeof block.content === 'string' + ? block.content + : (Array.isArray(block.content) + ? block.content.map((c: any) => c.text || '').join('') + : (json.tool_use_result?.stdout || '')); + if (!resultText) return null; + const preview = resultText.length > 200 ? resultText.substring(0, 200) + '...' : resultText; + // Escape backticks to avoid breaking Discord formatting + return `Result: ${preview.replace(/`/g, "'")}`; + } + } + } + + return null; +} + // Ensure directories exist [FILES_DIR, path.dirname(LOG_FILE), CHATS_DIR].forEach(dir => { if (!fs.existsSync(dir)) { @@ -158,10 +220,27 @@ async function processMessage(dbMsg: DbMessage): Promise { // Invoke agent emitEvent('chain_step_start', { agentId, agentName: agent.name, fromAgent: messageData.fromAgent || null }); let response: string; + const provider = agent.provider || 'anthropic'; + const useStreaming = agent.stream_logs === true && provider === 'anthropic'; + try { - response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams); + if (useStreaming) { + // Streaming path: emit real-time log events + emitEvent('stream_start', { messageId, agentId, agentName: agent.name }); + response = await invokeAgentStreaming(agent, agentId, message, workspacePath, shouldReset, agents, teams, (line) => { + const formatted = formatStreamEvent(line); + if (formatted) { + emitEvent('stream_log', { messageId, agentId, agentName: agent.name, content: formatted }); + } + }); + emitEvent('stream_end', { messageId, agentId, agentName: agent.name }); + } else { + response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams); + } } catch (error) { - const provider = agent.provider || 'anthropic'; + if (useStreaming) { + emitEvent('stream_end', { messageId, agentId, agentName: agent.name }); + } const providerLabel = provider === 'openai' ? 'Codex' : provider === 'opencode' ? 'OpenCode' : 'Claude'; log('ERROR', `${providerLabel} error (agent: ${agentId}): ${(error as Error).message}`); response = "Sorry, I encountered an error processing your request. Please check the queue logs."; @@ -184,6 +263,29 @@ async function processMessage(dbMsg: DbMessage): Promise { // Handle long responses — send as file attachment const { message: responseMessage, files: allFiles } = handleLongResponse(finalResponse, outboundFiles); + // Check for agent-to-agent handoff (@agent_id prefix in response) + const handoffDepth = dbMsg.handoff_depth ?? 0; + const handoff = parseResponseHandoff(response, agentId, agents); + + if (handoff && handoffDepth < MAX_HANDOFF_DEPTH) { + const handoffMessageId = `handoff_${agentId}_${handoff.targetAgentId}_${Date.now()}_${Math.random().toString(36).slice(2, 6)}`; + enqueueMessage({ + channel, + sender, + senderId: dbMsg.sender_id ?? undefined, + message: handoff.message, + messageId: handoffMessageId, + agent: handoff.targetAgentId, + fromAgent: agentId, + handoffDepth: handoffDepth + 1, + }); + log('INFO', `Agent handoff: @${agentId} → @${handoff.targetAgentId} (depth ${handoffDepth + 1})`); + emitEvent('agent_handoff', { fromAgent: agentId, toAgent: handoff.targetAgentId, depth: handoffDepth + 1 }); + } else if (handoff && handoffDepth >= MAX_HANDOFF_DEPTH) { + log('WARN', `Agent handoff depth limit reached (${MAX_HANDOFF_DEPTH}): @${agentId} → @${handoff.targetAgentId} — skipping re-enqueue`); + } + + // Always deliver the response to the user enqueueResponse({ channel, sender, @@ -215,6 +317,7 @@ async function processMessage(dbMsg: DbMessage): Promise { id: convId, channel, sender, + senderId: dbMsg.sender_id ?? undefined, originalMessage: rawMessage, messageId, pending: 1, // this initial message @@ -310,11 +413,14 @@ async function processQueue(): Promise { // Update the chain agentProcessingChains.set(agentId, newChain); - // Clean up completed chains to avoid memory leaks + // Clean up completed chains and re-check for pending messages newChain.finally(() => { if (agentProcessingChains.get(agentId) === newChain) { agentProcessingChains.delete(agentId); } + // Re-trigger queue processing in case more messages arrived + // while this agent was busy + processQueue(); }); } } catch (error) { @@ -364,6 +470,12 @@ emitEvent('processor_start', { agents: Object.keys(getAgents(getSettings())), te // Event-driven: all messages come through the API server (same process) queueEvents.on('message:enqueued', () => processQueue()); +// Process any pending messages left over from before restart +processQueue(); + +// Safety-net: periodically check for stranded pending messages +setInterval(() => processQueue(), 30 * 1000); // every 30s + // Periodic maintenance setInterval(() => { const count = recoverStaleMessages();