Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 149 additions & 11 deletions src/lib/bridge/bridge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,15 @@ interface AdapterMeta {
lastError: string | null;
}

/** Pending messages waiting to be batched into the next Claude call. */
interface PendingBatch {
messages: InboundMessage[];
timer: ReturnType<typeof setTimeout> | null;
}

/** How long to wait for additional messages before sending the batch (ms). */
const BATCH_WINDOW_MS = 2000;

interface BridgeManagerState {
adapters: Map<string, BaseChannelAdapter>;
adapterMeta: Map<string, AdapterMeta>;
Expand All @@ -176,6 +185,8 @@ interface BridgeManagerState {
activeTasks: Map<string, AbortController>;
/** Per-session processing chains for concurrency control */
sessionLocks: Map<string, Promise<void>>;
/** Per-session pending message batches for aggregation */
pendingBatches: Map<string, PendingBatch>;
autoStartChecked: boolean;
}

Expand All @@ -190,13 +201,17 @@ function getState(): BridgeManagerState {
loopAborts: new Map(),
activeTasks: new Map(),
sessionLocks: new Map(),
pendingBatches: new Map(),
autoStartChecked: false,
};
}
// Backfill sessionLocks for states created before this field existed
if (!g[GLOBAL_KEY].sessionLocks) {
g[GLOBAL_KEY].sessionLocks = new Map();
}
if (!g[GLOBAL_KEY].pendingBatches) {
g[GLOBAL_KEY].pendingBatches = new Map();
}
return g[GLOBAL_KEY];
}

Expand All @@ -209,10 +224,21 @@ function processWithSessionLock(sessionId: string, fn: () => Promise<void>): Pro
const prev = state.sessionLocks.get(sessionId) || Promise.resolve();
const current = prev.then(fn, fn);
state.sessionLocks.set(sessionId, current);
// Cleanup when the chain completes
// Cleanup when the chain completes, and flush any pending batched messages
current.finally(() => {
if (state.sessionLocks.get(sessionId) === current) {
state.sessionLocks.delete(sessionId);
// Check if messages accumulated during processing — flush them
const batch = state.pendingBatches.get(sessionId);
if (batch && batch.messages.length > 0) {
// Find the adapter for this batch
for (const [, adapter] of state.adapters) {
if (adapter.isRunning()) {
flushBatch(adapter, sessionId);
break;
}
}
}
}
});
return current;
Expand Down Expand Up @@ -369,10 +395,129 @@ export function registerAdapter(adapter: BaseChannelAdapter): void {
state.adapters.set(adapter.channelType, adapter);
}

/**
* Flush a pending message batch: merge all queued messages into a single
* prompt and process as one Claude request.
*/
function flushBatch(adapter: BaseChannelAdapter, sessionId: string): void {
const state = getState();
const batch = state.pendingBatches.get(sessionId);
if (!batch || batch.messages.length === 0) {
state.pendingBatches.delete(sessionId);
return;
}

if (batch.timer) {
clearTimeout(batch.timer);
batch.timer = null;
}

const messages = batch.messages.splice(0);
state.pendingBatches.delete(sessionId);

// Merge multiple messages into one. If only one message, pass through unchanged.
if (messages.length === 1) {
processWithSessionLock(sessionId, () =>
handleMessage(adapter, messages[0]),
).catch(err => {
console.error(`[bridge-manager] Session ${sessionId.slice(0, 8)} error:`, err);
});
return;
}

// Multiple messages → merge into a single prompt.
// Use the last message as the base (for address, messageId, etc.)
// and concatenate all text with separators.
const lastMsg = messages[messages.length - 1];
const mergedText = messages
.map((m, i) => m.text.trim())
.filter(Boolean)
.join('\n\n---\n\n');

// Merge attachments from all messages
const allAttachments = messages
.flatMap(m => m.attachments || []);

const mergedMsg: InboundMessage = {
...lastMsg,
text: mergedText,
attachments: allAttachments.length > 0 ? allAttachments : undefined,
};

console.log(`[bridge-manager] Batched ${messages.length} messages for session ${sessionId.slice(0, 8)}`);

// Send a confirmation so the user knows all messages were received
deliver(adapter, {
address: lastMsg.address,
text: `📨 Received ${messages.length} messages, processing as batch...`,
parseMode: 'plain',
}).catch(() => { /* best effort */ });

processWithSessionLock(sessionId, () =>
handleMessage(adapter, mergedMsg),
).catch(err => {
console.error(`[bridge-manager] Session ${sessionId.slice(0, 8)} batch error:`, err);
});
}

/**
* Schedule a message for batching. If the session is currently processing
* (has an active session lock), messages are accumulated and flushed together
* after a short window (BATCH_WINDOW_MS) to avoid serial one-by-one processing.
*/
function scheduleOrProcess(adapter: BaseChannelAdapter, msg: InboundMessage): void {
const state = getState();
const binding = router.resolve(msg.address);
const sessionId = binding.codepilotSessionId;

// If session is currently busy (has an active lock chain), batch the message
const hasActiveLock = state.sessionLocks.has(sessionId);

if (hasActiveLock) {
// Session is busy — add to pending batch
let batch = state.pendingBatches.get(sessionId);
if (!batch) {
batch = { messages: [], timer: null };
state.pendingBatches.set(sessionId, batch);
}
batch.messages.push(msg);

// Send immediate acknowledgment so user knows message was received
deliver(adapter, {
address: msg.address,
text: `⏳ Message received (#${batch.messages.length} in queue). Will process after current task.`,
parseMode: 'plain',
}).catch(() => { /* best effort */ });

// Reset the batch window timer — wait for more messages
if (batch.timer) clearTimeout(batch.timer);
batch.timer = setTimeout(() => {
flushBatch(adapter, sessionId);
}, BATCH_WINDOW_MS);

return;
}

// Session is free — check if there are pending batched messages to merge with
const batch = state.pendingBatches.get(sessionId);
if (batch && batch.messages.length > 0) {
batch.messages.push(msg);
flushBatch(adapter, sessionId);
return;
}

// No batching needed — process directly
processWithSessionLock(sessionId, () =>
handleMessage(adapter, msg),
).catch(err => {
console.error(`[bridge-manager] Session ${sessionId.slice(0, 8)} error:`, err);
});
}

/**
* Run the event loop for a single adapter.
* Messages for different sessions are dispatched concurrently;
* messages for the same session are serialized via session locks.
* messages for the same session use batching to merge rapid-fire messages.
*/
function runAdapterLoop(adapter: BaseChannelAdapter): void {
const state = getState();
Expand All @@ -386,18 +531,11 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void {
if (!msg) continue; // Adapter stopped

// Callback queries and commands are lightweight — process inline.
// Regular messages use per-session locking for concurrency.
if (msg.callbackData || msg.text.trim().startsWith('/')) {
await handleMessage(adapter, msg);
} else {
const binding = router.resolve(msg.address);
// Fire-and-forget into session lock — loop continues to accept
// messages for other sessions immediately.
processWithSessionLock(binding.codepilotSessionId, () =>
handleMessage(adapter, msg),
).catch(err => {
console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err);
});
// Regular messages — use batching for same-session aggregation
scheduleOrProcess(adapter, msg);
}
} catch (err) {
if (abort.signal.aborted) break;
Expand Down