From 80ab159c4e5d255ff90618970a7b378d7c8e5141 Mon Sep 17 00:00:00 2001 From: 123 Date: Fri, 13 Mar 2026 21:53:17 +0800 Subject: [PATCH] feat(bridge): parallel message processing for IM channels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When `bridge_parallel_tasks` is enabled, messages arriving while the session is busy are dispatched to ephemeral worker sessions instead of being serialized in a per-session Promise chain. This eliminates the queue delay users experience when sending multiple messages rapidly on Feishu/Telegram/Discord. Worker sessions inherit provider, model, working directory, and mode from the parent binding but do NOT overwrite the channel_bindings row, so the primary chat→session mapping stays intact. Co-Authored-By: Claude Opus 4.6 --- src/lib/bridge/bridge-manager.ts | 40 +++++++++++++++++++++++++------- src/lib/bridge/channel-router.ts | 38 ++++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 8 deletions(-) diff --git a/src/lib/bridge/bridge-manager.ts b/src/lib/bridge/bridge-manager.ts index 912b27b5..f2413579 100644 --- a/src/lib/bridge/bridge-manager.ts +++ b/src/lib/bridge/bridge-manager.ts @@ -391,13 +391,34 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void { await handleMessage(adapter, msg); } else { const binding = router.resolve(msg.address); - // Fire-and-forget into session lock — loop continues to accept - // messages for other sessions immediately. - processWithSessionLock(binding.codepilotSessionId, () => - handleMessage(adapter, msg), - ).catch(err => { - console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err); - }); + const parallelEnabled = getSetting('bridge_parallel_tasks') === 'true'; + const sessionBusy = state.activeTasks.has(binding.codepilotSessionId) + || state.sessionLocks.has(binding.codepilotSessionId); + + if (parallelEnabled && sessionBusy) { + // ── Parallel mode: spawn a worker session ──────────────── + // The main session is busy, so create an ephemeral worker + // session that inherits config from the main binding. This + // allows multiple messages to be processed concurrently with + // independent Claude streams. + const workerBinding = router.createWorkerBinding(binding); + console.log( + `[bridge-manager] Parallel dispatch: worker ${workerBinding.codepilotSessionId.slice(0, 8)} ` + + `spawned from busy session ${binding.codepilotSessionId.slice(0, 8)}`, + ); + handleMessage(adapter, msg, workerBinding).catch(err => { + console.error(`[bridge-manager] Worker ${workerBinding.codepilotSessionId.slice(0, 8)} error:`, err); + }); + } else { + // ── Standard mode: serialize within session ────────────── + // Fire-and-forget into session lock — loop continues to accept + // messages for other sessions immediately. + processWithSessionLock(binding.codepilotSessionId, () => + handleMessage(adapter, msg), + ).catch(err => { + console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err); + }); + } } } catch (err) { if (abort.signal.aborted) break; @@ -424,10 +445,12 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void { /** * Handle a single inbound message. + * @param overrideBinding Optional pre-resolved binding (used for parallel worker sessions). */ async function handleMessage( adapter: BaseChannelAdapter, msg: InboundMessage, + overrideBinding?: import('./types').ChannelBinding, ): Promise { // Update lastMessageAt for this adapter const adapterState = getState(); @@ -488,7 +511,8 @@ async function handleMessage( if (!text && !hasAttachments) { ack(); return; } // Regular message — route to conversation engine - const binding = router.resolve(msg.address); + // Use override binding when provided (e.g., parallel worker session) + const binding = overrideBinding || router.resolve(msg.address); // Notify adapter that message processing is starting (e.g., typing indicator) adapter.onMessageStart?.(msg.address.chatId); diff --git a/src/lib/bridge/channel-router.ts b/src/lib/bridge/channel-router.ts index b1ec5aa4..15fcdee0 100644 --- a/src/lib/bridge/channel-router.ts +++ b/src/lib/bridge/channel-router.ts @@ -102,6 +102,44 @@ export function updateBinding( updateChannelBinding(id, updates); } +/** + * Create an ephemeral worker binding for parallel message processing. + * + * Creates a new session that inherits config from the parent binding + * but does NOT touch channel_bindings — the parent's binding stays intact. + * The worker session is independent and will be garbage collected when idle. + */ +export function createWorkerBinding(parentBinding: ChannelBinding): ChannelBinding { + const session = createSession( + `Worker: ${parentBinding.chatId.slice(0, 16)}`, + parentBinding.model, + undefined, + parentBinding.workingDirectory, + parentBinding.mode, + ); + + // Inherit provider from parent session + const parentSession = getSession(parentBinding.codepilotSessionId); + if (parentSession?.provider_id) { + updateSessionProviderId(session.id, parentSession.provider_id); + } + + // Return a synthetic binding — NOT persisted to channel_bindings table + return { + id: '', // empty = ephemeral, not persisted + channelType: parentBinding.channelType, + chatId: parentBinding.chatId, + codepilotSessionId: session.id, + sdkSessionId: '', + workingDirectory: parentBinding.workingDirectory, + model: parentBinding.model, + mode: parentBinding.mode, + active: 1, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }; +} + /** * List all bindings, optionally filtered by channel type. */