Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions src/lib/bridge/bridge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,34 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void {
await handleMessage(adapter, msg);
} else {
const binding = router.resolve(msg.address);
// Fire-and-forget into session lock — loop continues to accept
// messages for other sessions immediately.
processWithSessionLock(binding.codepilotSessionId, () =>
handleMessage(adapter, msg),
).catch(err => {
console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err);
});
const parallelEnabled = getSetting('bridge_parallel_tasks') === 'true';
const sessionBusy = state.activeTasks.has(binding.codepilotSessionId)
|| state.sessionLocks.has(binding.codepilotSessionId);

if (parallelEnabled && sessionBusy) {
// ── Parallel mode: spawn a worker session ────────────────
// The main session is busy, so create an ephemeral worker
// session that inherits config from the main binding. This
// allows multiple messages to be processed concurrently with
// independent Claude streams.
const workerBinding = router.createWorkerBinding(binding);
console.log(
`[bridge-manager] Parallel dispatch: worker ${workerBinding.codepilotSessionId.slice(0, 8)} ` +
`spawned from busy session ${binding.codepilotSessionId.slice(0, 8)}`,
);
handleMessage(adapter, msg, workerBinding).catch(err => {
console.error(`[bridge-manager] Worker ${workerBinding.codepilotSessionId.slice(0, 8)} error:`, err);
});
} else {
// ── Standard mode: serialize within session ──────────────
// Fire-and-forget into session lock — loop continues to accept
// messages for other sessions immediately.
processWithSessionLock(binding.codepilotSessionId, () =>
handleMessage(adapter, msg),
).catch(err => {
console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err);
});
}
}
} catch (err) {
if (abort.signal.aborted) break;
Expand All @@ -424,10 +445,12 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void {

/**
* Handle a single inbound message.
* @param overrideBinding Optional pre-resolved binding (used for parallel worker sessions).
*/
async function handleMessage(
adapter: BaseChannelAdapter,
msg: InboundMessage,
overrideBinding?: import('./types').ChannelBinding,
): Promise<void> {
// Update lastMessageAt for this adapter
const adapterState = getState();
Expand Down Expand Up @@ -488,7 +511,8 @@ async function handleMessage(
if (!text && !hasAttachments) { ack(); return; }

// Regular message — route to conversation engine
const binding = router.resolve(msg.address);
// Use override binding when provided (e.g., parallel worker session)
const binding = overrideBinding || router.resolve(msg.address);

// Notify adapter that message processing is starting (e.g., typing indicator)
adapter.onMessageStart?.(msg.address.chatId);
Expand Down
38 changes: 38 additions & 0 deletions src/lib/bridge/channel-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,44 @@ export function updateBinding(
updateChannelBinding(id, updates);
}

/**
* Create an ephemeral worker binding for parallel message processing.
*
* Creates a new session that inherits config from the parent binding
* but does NOT touch channel_bindings — the parent's binding stays intact.
* The worker session is independent and will be garbage collected when idle.
*/
export function createWorkerBinding(parentBinding: ChannelBinding): ChannelBinding {
const session = createSession(
`Worker: ${parentBinding.chatId.slice(0, 16)}`,
parentBinding.model,
undefined,
parentBinding.workingDirectory,
parentBinding.mode,
);

// Inherit provider from parent session
const parentSession = getSession(parentBinding.codepilotSessionId);
if (parentSession?.provider_id) {
updateSessionProviderId(session.id, parentSession.provider_id);
}

// Return a synthetic binding — NOT persisted to channel_bindings table
return {
id: '', // empty = ephemeral, not persisted
channelType: parentBinding.channelType,
chatId: parentBinding.chatId,
codepilotSessionId: session.id,
sdkSessionId: '',
workingDirectory: parentBinding.workingDirectory,
model: parentBinding.model,
mode: parentBinding.mode,
active: 1,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
}

/**
* List all bindings, optionally filtered by channel type.
*/
Expand Down