diff --git a/src/lib/bridge/adapters/feishu-adapter.ts b/src/lib/bridge/adapters/feishu-adapter.ts index 9f9fe6e7..bc4e6db1 100644 --- a/src/lib/bridge/adapters/feishu-adapter.ts +++ b/src/lib/bridge/adapters/feishu-adapter.ts @@ -25,7 +25,7 @@ import type { } from '../types'; import type { FileAttachment } from '@/types'; import { BaseChannelAdapter, registerAdapterFactory } from '../channel-adapter'; -import { insertAuditLog } from '../../db'; +import { insertAuditLog, getChannelOffset, setChannelOffset } from '../../db'; import { getSetting } from '../../db'; import { htmlToFeishuMarkdown, @@ -35,12 +35,17 @@ import { buildPostContent, } from '../markdown/feishu'; -/** Max number of message_ids to keep for dedup. */ -const DEDUP_MAX = 1000; +/** Max number of message_ids to keep for dedup (in-memory LRU). */ +const DEDUP_MAX = 5000; /** Max file download size (20 MB). */ const MAX_FILE_SIZE = 20 * 1024 * 1024; +/** Max retries for resource downloads. */ +const DOWNLOAD_MAX_RETRIES = 2; +/** Base delay between download retries (ms). */ +const DOWNLOAD_RETRY_DELAY_MS = 1000; + /** Feishu emoji type for typing indicator (same as Openclaw). */ const TYPING_EMOJI = 'Typing'; @@ -832,6 +837,7 @@ export class FeishuAdapter extends BaseChannelAdapter { /** * Download a message resource (image/file/audio/video) via SDK. + * Retries transient failures with exponential backoff. * Returns null on failure (caller decides fallback behavior). */ private async downloadResource( @@ -841,92 +847,102 @@ export class FeishuAdapter extends BaseChannelAdapter { ): Promise { if (!this.restClient) return null; - try { - console.log(`[feishu-adapter] Downloading resource: type=${resourceType}, key=${fileKey}, msgId=${messageId}`); - - const res = await this.restClient.im.messageResource.get({ - path: { - message_id: messageId, - file_key: fileKey, - }, - params: { - type: resourceType === 'image' ? 'image' : 'file', - }, - }); + for (let attempt = 0; attempt <= DOWNLOAD_MAX_RETRIES; attempt++) { + try { + if (attempt > 0) { + console.log(`[feishu-adapter] Download retry ${attempt}/${DOWNLOAD_MAX_RETRIES}: key=${fileKey}`); + await new Promise(r => setTimeout(r, DOWNLOAD_RETRY_DELAY_MS * Math.pow(2, attempt - 1))); + } else { + console.log(`[feishu-adapter] Downloading resource: type=${resourceType}, key=${fileKey}, msgId=${messageId}`); + } - if (!res) { - console.warn('[feishu-adapter] messageResource.get returned null/undefined'); - return null; - } + const res = await this.restClient.im.messageResource.get({ + path: { + message_id: messageId, + file_key: fileKey, + }, + params: { + type: resourceType === 'image' ? 'image' : 'file', + }, + }); + + if (!res) { + console.warn('[feishu-adapter] messageResource.get returned null/undefined'); + continue; + } - // SDK returns { writeFile, getReadableStream, headers } - // Try stream approach first, fall back to writeFile + read if stream fails - let buffer: Buffer; + // SDK returns { writeFile, getReadableStream, headers } + // Try stream approach first, fall back to writeFile + read if stream fails + let buffer: Buffer; - try { - const readable = res.getReadableStream(); - const chunks: Buffer[] = []; - let totalSize = 0; - - for await (const chunk of readable) { - const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); - totalSize += buf.length; - if (totalSize > MAX_FILE_SIZE) { - console.warn(`[feishu-adapter] Resource too large (>${MAX_FILE_SIZE} bytes), key: ${fileKey}`); - return null; - } - chunks.push(buf); - } - buffer = Buffer.concat(chunks); - } catch (streamErr) { - // Stream approach failed — fall back to writeFile + read - console.warn('[feishu-adapter] Stream read failed, falling back to writeFile:', streamErr instanceof Error ? streamErr.message : streamErr); - - const fs = await import('fs'); - const os = await import('os'); - const path = await import('path'); - const tmpPath = path.join(os.tmpdir(), `feishu-dl-${crypto.randomUUID()}`); try { - await res.writeFile(tmpPath); - buffer = fs.readFileSync(tmpPath); - if (buffer.length > MAX_FILE_SIZE) { - console.warn(`[feishu-adapter] Resource too large (>${MAX_FILE_SIZE} bytes), key: ${fileKey}`); - return null; + const readable = res.getReadableStream(); + const chunks: Buffer[] = []; + let totalSize = 0; + + for await (const chunk of readable) { + const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk); + totalSize += buf.length; + if (totalSize > MAX_FILE_SIZE) { + console.warn(`[feishu-adapter] Resource too large (>${MAX_FILE_SIZE} bytes), key: ${fileKey}`); + return null; // Size limit — don't retry + } + chunks.push(buf); + } + buffer = Buffer.concat(chunks); + } catch (streamErr) { + // Stream approach failed — fall back to writeFile + read + console.warn('[feishu-adapter] Stream read failed, falling back to writeFile:', streamErr instanceof Error ? streamErr.message : streamErr); + + const fs = await import('fs'); + const os = await import('os'); + const path = await import('path'); + const tmpPath = path.join(os.tmpdir(), `feishu-dl-${crypto.randomUUID()}`); + try { + await res.writeFile(tmpPath); + buffer = fs.readFileSync(tmpPath); + if (buffer.length > MAX_FILE_SIZE) { + console.warn(`[feishu-adapter] Resource too large (>${MAX_FILE_SIZE} bytes), key: ${fileKey}`); + return null; // Size limit — don't retry + } + } finally { + try { fs.unlinkSync(tmpPath); } catch { /* ignore cleanup errors */ } } - } finally { - try { fs.unlinkSync(tmpPath); } catch { /* ignore cleanup errors */ } } - } - if (!buffer || buffer.length === 0) { - console.warn('[feishu-adapter] Downloaded resource is empty, key:', fileKey); - return null; - } + if (!buffer || buffer.length === 0) { + console.warn('[feishu-adapter] Downloaded resource is empty, key:', fileKey); + continue; // Empty result — retry + } - const base64 = buffer.toString('base64'); - const id = crypto.randomUUID(); - const mimeType = MIME_BY_TYPE[resourceType] || 'application/octet-stream'; - const ext = resourceType === 'image' ? 'png' - : resourceType === 'audio' ? 'ogg' - : resourceType === 'video' ? 'mp4' - : 'bin'; - - console.log(`[feishu-adapter] Resource downloaded: ${buffer.length} bytes, key=${fileKey}`); - - return { - id, - name: `${fileKey}.${ext}`, - type: mimeType, - size: buffer.length, - data: base64, - }; - } catch (err) { - console.error( - `[feishu-adapter] Resource download failed (type=${resourceType}, key=${fileKey}):`, - err instanceof Error ? err.stack || err.message : err, - ); - return null; + const base64 = buffer.toString('base64'); + const id = crypto.randomUUID(); + const mimeType = MIME_BY_TYPE[resourceType] || 'application/octet-stream'; + const ext = resourceType === 'image' ? 'png' + : resourceType === 'audio' ? 'ogg' + : resourceType === 'video' ? 'mp4' + : 'bin'; + + console.log(`[feishu-adapter] Resource downloaded: ${buffer.length} bytes, key=${fileKey}`); + + return { + id, + name: `${fileKey}.${ext}`, + type: mimeType, + size: buffer.length, + data: base64, + }; + } catch (err) { + console.error( + `[feishu-adapter] Resource download failed (attempt ${attempt + 1}/${DOWNLOAD_MAX_RETRIES + 1}, type=${resourceType}, key=${fileKey}):`, + err instanceof Error ? err.stack || err.message : err, + ); + // Continue to next retry attempt + } } + + console.error(`[feishu-adapter] Resource download exhausted retries: key=${fileKey}`); + return null; } // ── Utilities ─────────────────────────────────────────────── @@ -934,6 +950,9 @@ export class FeishuAdapter extends BaseChannelAdapter { private addToDedup(messageId: string): void { this.seenMessageIds.set(messageId, true); + // Persist latest processed message ID (survives restart) + try { setChannelOffset('feishu', messageId); } catch { /* best effort */ } + // LRU eviction: remove oldest entries when exceeding limit if (this.seenMessageIds.size > DEDUP_MAX) { const excess = this.seenMessageIds.size - DEDUP_MAX; diff --git a/src/lib/bridge/bridge-manager.ts b/src/lib/bridge/bridge-manager.ts index 912b27b5..1aed9448 100644 --- a/src/lib/bridge/bridge-manager.ts +++ b/src/lib/bridge/bridge-manager.ts @@ -391,13 +391,33 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void { await handleMessage(adapter, msg); } else { const binding = router.resolve(msg.address); - // Fire-and-forget into session lock — loop continues to accept - // messages for other sessions immediately. - processWithSessionLock(binding.codepilotSessionId, () => - handleMessage(adapter, msg), - ).catch(err => { - console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err); - }); + const parallelEnabled = getSetting('bridge_parallel_tasks') === 'true'; + const sessionBusy = state.activeTasks.has(binding.codepilotSessionId); + + if (parallelEnabled && sessionBusy) { + // ── Parallel mode: spawn a worker session ──────────────── + // The main session is busy, so create an ephemeral worker + // session that inherits config from the main binding. This + // allows multiple messages to be processed concurrently with + // independent Claude streams. + const workerBinding = router.createWorkerBinding(binding); + console.log( + `[bridge-manager] Parallel dispatch: worker ${workerBinding.codepilotSessionId.slice(0, 8)} ` + + `spawned from busy session ${binding.codepilotSessionId.slice(0, 8)}`, + ); + handleMessage(adapter, msg, workerBinding).catch(err => { + console.error(`[bridge-manager] Worker ${workerBinding.codepilotSessionId.slice(0, 8)} error:`, err); + }); + } else { + // ── Standard mode: serialize within session ────────────── + // Fire-and-forget into session lock — loop continues to accept + // messages for other sessions immediately. + processWithSessionLock(binding.codepilotSessionId, () => + handleMessage(adapter, msg), + ).catch(err => { + console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err); + }); + } } } catch (err) { if (abort.signal.aborted) break; @@ -424,10 +444,12 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void { /** * Handle a single inbound message. + * @param overrideBinding Optional pre-resolved binding (used for parallel worker sessions). */ async function handleMessage( adapter: BaseChannelAdapter, msg: InboundMessage, + overrideBinding?: import('./types').ChannelBinding, ): Promise { // Update lastMessageAt for this adapter const adapterState = getState(); @@ -488,7 +510,8 @@ async function handleMessage( if (!text && !hasAttachments) { ack(); return; } // Regular message — route to conversation engine - const binding = router.resolve(msg.address); + // Use override binding when provided (e.g., parallel worker session) + const binding = overrideBinding || router.resolve(msg.address); // Notify adapter that message processing is starting (e.g., typing indicator) adapter.onMessageStart?.(msg.address.chatId); diff --git a/src/lib/bridge/channel-router.ts b/src/lib/bridge/channel-router.ts index b1ec5aa4..9250fe46 100644 --- a/src/lib/bridge/channel-router.ts +++ b/src/lib/bridge/channel-router.ts @@ -102,6 +102,47 @@ export function updateBinding( updateChannelBinding(id, updates); } +/** + * Create an ephemeral worker session that inherits configuration from a + * parent binding. Used for parallel task processing — each concurrent + * message gets its own independent Claude session so they can run + * simultaneously without contending for the same session lock. + * + * The returned ChannelBinding has an empty `id`, indicating it is NOT + * persisted to the channel_bindings table. This prevents worker sessions + * from polluting the binding lookup and allows them to be garbage-collected + * naturally. + */ +export function createWorkerBinding(parentBinding: ChannelBinding): ChannelBinding { + const session = createSession( + `Worker: ${parentBinding.chatId.slice(0, 16)}`, + parentBinding.model, + undefined, + parentBinding.workingDirectory, + parentBinding.mode, + ); + + // Copy provider from parent session if present + const parentSession = getSession(parentBinding.codepilotSessionId); + if (parentSession?.provider_id) { + updateSessionProviderId(session.id, parentSession.provider_id); + } + + return { + id: '', // empty = ephemeral, not persisted + channelType: parentBinding.channelType, + chatId: parentBinding.chatId, + codepilotSessionId: session.id, + sdkSessionId: '', + workingDirectory: parentBinding.workingDirectory, + model: parentBinding.model, + mode: parentBinding.mode, + active: true, + createdAt: new Date().toISOString(), + updatedAt: new Date().toISOString(), + }; +} + /** * List all bindings, optionally filtered by channel type. */ diff --git a/src/lib/telegram-bot.ts b/src/lib/telegram-bot.ts index c50e36f0..c49a4254 100644 --- a/src/lib/telegram-bot.ts +++ b/src/lib/telegram-bot.ts @@ -57,6 +57,8 @@ export interface TelegramNotifyOptions { const TELEGRAM_API = 'https://api.telegram.org'; const MAX_MESSAGE_LENGTH = 4000; // Telegram limit is 4096, leave buffer +const NOTIFY_MAX_RETRIES = 2; +const NOTIFY_BASE_DELAY_MS = 1000; // ── Bridge Mode Guard ───────────────────────────────────────── @@ -115,9 +117,38 @@ function ensurePollingStarted(): void { // ── Core API ─────────────────────────────────────────────────── +/** + * Send a single API call with retry and exponential backoff. + */ +async function callWithRetry( + botToken: string, + method: string, + params: Record, +): Promise<{ ok: boolean; error?: string }> { + let lastError: string | undefined; + for (let attempt = 0; attempt <= NOTIFY_MAX_RETRIES; attempt++) { + const result = await callTelegramApi(botToken, method, params); + if (result.ok) return result; + + lastError = result.error; + // Don't retry client errors (4xx except 429) + const status = (result as { httpStatus?: number }).httpStatus; + if (status && status >= 400 && status < 500 && status !== 429) { + return result; + } + + if (attempt < NOTIFY_MAX_RETRIES) { + const delay = NOTIFY_BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 500; + await new Promise(r => setTimeout(r, delay)); + } + } + return { ok: false, error: lastError || 'Max retries exceeded' }; +} + /** * Send a message to the configured Telegram chat. * Automatically splits messages that exceed the Telegram limit. + * Retries transient failures with exponential backoff. */ async function sendMessage(text: string, parseMode: 'HTML' | 'Markdown' = 'HTML'): Promise<{ ok: boolean; error?: string }> { const config = getTelegramConfig(); @@ -130,7 +161,7 @@ async function sendMessage(text: string, parseMode: 'HTML' | 'Markdown' = 'HTML' const chunks = splitMessage(text, MAX_MESSAGE_LENGTH); for (const chunk of chunks) { - const result = await callTelegramApi(config.botToken, 'sendMessage', { + const result = await callWithRetry(config.botToken, 'sendMessage', { chat_id: config.chatId, text: chunk, parse_mode: parseMode,