From 76da849c13bf47349bc048018af290686864b5c0 Mon Sep 17 00:00:00 2001 From: 123 Date: Thu, 12 Mar 2026 20:34:36 -0700 Subject: [PATCH] feat(bridge): batch rapid-fire messages for same session MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user sends multiple messages quickly from an IM channel (Feishu, Telegram, Discord), they are now aggregated into a single Claude prompt instead of being queued for serial one-by-one processing. Problem: - Messages for the same session were serialized via processWithSessionLock - Each Claude call takes 30s-5min, so message 2 waited for message 1 - No acknowledgment was sent, making users think messages were lost Solution: - Add pendingBatches Map to BridgeManagerState for per-session aggregation - scheduleOrProcess() checks if session has active lock; if busy, buffers the message and sends an immediate "⏳ received" acknowledgment - 2-second batch window (BATCH_WINDOW_MS) allows collecting multiple rapid-fire messages before flushing - flushBatch() merges queued messages into a single prompt with --- separators, concatenates attachments, and processes as one request - processWithSessionLock.finally() auto-flushes any messages that accumulated during processing Behavior: - Single message, session free → processed immediately (no change) - Multiple messages while session busy → batched + ack sent per message - Batch flushed after 2s window or when current processing completes Co-Authored-By: Claude Opus 4.6 --- src/lib/bridge/bridge-manager.ts | 160 ++++++++++++++++++++++++++++--- 1 file changed, 149 insertions(+), 11 deletions(-) diff --git a/src/lib/bridge/bridge-manager.ts b/src/lib/bridge/bridge-manager.ts index 912b27b5..2bd33629 100644 --- a/src/lib/bridge/bridge-manager.ts +++ b/src/lib/bridge/bridge-manager.ts @@ -167,6 +167,15 @@ interface AdapterMeta { lastError: string | null; } +/** Pending messages waiting to be batched into the next Claude call. */ +interface PendingBatch { + messages: InboundMessage[]; + timer: ReturnType | null; +} + +/** How long to wait for additional messages before sending the batch (ms). */ +const BATCH_WINDOW_MS = 2000; + interface BridgeManagerState { adapters: Map; adapterMeta: Map; @@ -176,6 +185,8 @@ interface BridgeManagerState { activeTasks: Map; /** Per-session processing chains for concurrency control */ sessionLocks: Map>; + /** Per-session pending message batches for aggregation */ + pendingBatches: Map; autoStartChecked: boolean; } @@ -190,6 +201,7 @@ function getState(): BridgeManagerState { loopAborts: new Map(), activeTasks: new Map(), sessionLocks: new Map(), + pendingBatches: new Map(), autoStartChecked: false, }; } @@ -197,6 +209,9 @@ function getState(): BridgeManagerState { if (!g[GLOBAL_KEY].sessionLocks) { g[GLOBAL_KEY].sessionLocks = new Map(); } + if (!g[GLOBAL_KEY].pendingBatches) { + g[GLOBAL_KEY].pendingBatches = new Map(); + } return g[GLOBAL_KEY]; } @@ -209,10 +224,21 @@ function processWithSessionLock(sessionId: string, fn: () => Promise): Pro const prev = state.sessionLocks.get(sessionId) || Promise.resolve(); const current = prev.then(fn, fn); state.sessionLocks.set(sessionId, current); - // Cleanup when the chain completes + // Cleanup when the chain completes, and flush any pending batched messages current.finally(() => { if (state.sessionLocks.get(sessionId) === current) { state.sessionLocks.delete(sessionId); + // Check if messages accumulated during processing — flush them + const batch = state.pendingBatches.get(sessionId); + if (batch && batch.messages.length > 0) { + // Find the adapter for this batch + for (const [, adapter] of state.adapters) { + if (adapter.isRunning()) { + flushBatch(adapter, sessionId); + break; + } + } + } } }); return current; @@ -369,10 +395,129 @@ export function registerAdapter(adapter: BaseChannelAdapter): void { state.adapters.set(adapter.channelType, adapter); } +/** + * Flush a pending message batch: merge all queued messages into a single + * prompt and process as one Claude request. + */ +function flushBatch(adapter: BaseChannelAdapter, sessionId: string): void { + const state = getState(); + const batch = state.pendingBatches.get(sessionId); + if (!batch || batch.messages.length === 0) { + state.pendingBatches.delete(sessionId); + return; + } + + if (batch.timer) { + clearTimeout(batch.timer); + batch.timer = null; + } + + const messages = batch.messages.splice(0); + state.pendingBatches.delete(sessionId); + + // Merge multiple messages into one. If only one message, pass through unchanged. + if (messages.length === 1) { + processWithSessionLock(sessionId, () => + handleMessage(adapter, messages[0]), + ).catch(err => { + console.error(`[bridge-manager] Session ${sessionId.slice(0, 8)} error:`, err); + }); + return; + } + + // Multiple messages → merge into a single prompt. + // Use the last message as the base (for address, messageId, etc.) + // and concatenate all text with separators. + const lastMsg = messages[messages.length - 1]; + const mergedText = messages + .map((m, i) => m.text.trim()) + .filter(Boolean) + .join('\n\n---\n\n'); + + // Merge attachments from all messages + const allAttachments = messages + .flatMap(m => m.attachments || []); + + const mergedMsg: InboundMessage = { + ...lastMsg, + text: mergedText, + attachments: allAttachments.length > 0 ? allAttachments : undefined, + }; + + console.log(`[bridge-manager] Batched ${messages.length} messages for session ${sessionId.slice(0, 8)}`); + + // Send a confirmation so the user knows all messages were received + deliver(adapter, { + address: lastMsg.address, + text: `📨 Received ${messages.length} messages, processing as batch...`, + parseMode: 'plain', + }).catch(() => { /* best effort */ }); + + processWithSessionLock(sessionId, () => + handleMessage(adapter, mergedMsg), + ).catch(err => { + console.error(`[bridge-manager] Session ${sessionId.slice(0, 8)} batch error:`, err); + }); +} + +/** + * Schedule a message for batching. If the session is currently processing + * (has an active session lock), messages are accumulated and flushed together + * after a short window (BATCH_WINDOW_MS) to avoid serial one-by-one processing. + */ +function scheduleOrProcess(adapter: BaseChannelAdapter, msg: InboundMessage): void { + const state = getState(); + const binding = router.resolve(msg.address); + const sessionId = binding.codepilotSessionId; + + // If session is currently busy (has an active lock chain), batch the message + const hasActiveLock = state.sessionLocks.has(sessionId); + + if (hasActiveLock) { + // Session is busy — add to pending batch + let batch = state.pendingBatches.get(sessionId); + if (!batch) { + batch = { messages: [], timer: null }; + state.pendingBatches.set(sessionId, batch); + } + batch.messages.push(msg); + + // Send immediate acknowledgment so user knows message was received + deliver(adapter, { + address: msg.address, + text: `⏳ Message received (#${batch.messages.length} in queue). Will process after current task.`, + parseMode: 'plain', + }).catch(() => { /* best effort */ }); + + // Reset the batch window timer — wait for more messages + if (batch.timer) clearTimeout(batch.timer); + batch.timer = setTimeout(() => { + flushBatch(adapter, sessionId); + }, BATCH_WINDOW_MS); + + return; + } + + // Session is free — check if there are pending batched messages to merge with + const batch = state.pendingBatches.get(sessionId); + if (batch && batch.messages.length > 0) { + batch.messages.push(msg); + flushBatch(adapter, sessionId); + return; + } + + // No batching needed — process directly + processWithSessionLock(sessionId, () => + handleMessage(adapter, msg), + ).catch(err => { + console.error(`[bridge-manager] Session ${sessionId.slice(0, 8)} error:`, err); + }); +} + /** * Run the event loop for a single adapter. * Messages for different sessions are dispatched concurrently; - * messages for the same session are serialized via session locks. + * messages for the same session use batching to merge rapid-fire messages. */ function runAdapterLoop(adapter: BaseChannelAdapter): void { const state = getState(); @@ -386,18 +531,11 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void { if (!msg) continue; // Adapter stopped // Callback queries and commands are lightweight — process inline. - // Regular messages use per-session locking for concurrency. if (msg.callbackData || msg.text.trim().startsWith('/')) { await handleMessage(adapter, msg); } else { - const binding = router.resolve(msg.address); - // Fire-and-forget into session lock — loop continues to accept - // messages for other sessions immediately. - processWithSessionLock(binding.codepilotSessionId, () => - handleMessage(adapter, msg), - ).catch(err => { - console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err); - }); + // Regular messages — use batching for same-session aggregation + scheduleOrProcess(adapter, msg); } } catch (err) { if (abort.signal.aborted) break;