Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 99 additions & 80 deletions src/lib/bridge/adapters/feishu-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import type {
} from '../types';
import type { FileAttachment } from '@/types';
import { BaseChannelAdapter, registerAdapterFactory } from '../channel-adapter';
import { insertAuditLog } from '../../db';
import { insertAuditLog, getChannelOffset, setChannelOffset } from '../../db';
import { getSetting } from '../../db';
import {
htmlToFeishuMarkdown,
Expand All @@ -35,12 +35,17 @@ import {
buildPostContent,
} from '../markdown/feishu';

/** Max number of message_ids to keep for dedup. */
const DEDUP_MAX = 1000;
/** Max number of message_ids to keep for dedup (in-memory LRU). */
const DEDUP_MAX = 5000;

/** Max file download size (20 MB). */
const MAX_FILE_SIZE = 20 * 1024 * 1024;

/** Max retries for resource downloads. */
const DOWNLOAD_MAX_RETRIES = 2;
/** Base delay between download retries (ms). */
const DOWNLOAD_RETRY_DELAY_MS = 1000;

/** Feishu emoji type for typing indicator (same as Openclaw). */
const TYPING_EMOJI = 'Typing';

Expand Down Expand Up @@ -832,6 +837,7 @@ export class FeishuAdapter extends BaseChannelAdapter {

/**
* Download a message resource (image/file/audio/video) via SDK.
* Retries transient failures with exponential backoff.
* Returns null on failure (caller decides fallback behavior).
*/
private async downloadResource(
Expand All @@ -841,99 +847,112 @@ export class FeishuAdapter extends BaseChannelAdapter {
): Promise<FileAttachment | null> {
if (!this.restClient) return null;

try {
console.log(`[feishu-adapter] Downloading resource: type=${resourceType}, key=${fileKey}, msgId=${messageId}`);

const res = await this.restClient.im.messageResource.get({
path: {
message_id: messageId,
file_key: fileKey,
},
params: {
type: resourceType === 'image' ? 'image' : 'file',
},
});
for (let attempt = 0; attempt <= DOWNLOAD_MAX_RETRIES; attempt++) {
try {
if (attempt > 0) {
console.log(`[feishu-adapter] Download retry ${attempt}/${DOWNLOAD_MAX_RETRIES}: key=${fileKey}`);
await new Promise(r => setTimeout(r, DOWNLOAD_RETRY_DELAY_MS * Math.pow(2, attempt - 1)));
} else {
console.log(`[feishu-adapter] Downloading resource: type=${resourceType}, key=${fileKey}, msgId=${messageId}`);
}

if (!res) {
console.warn('[feishu-adapter] messageResource.get returned null/undefined');
return null;
}
const res = await this.restClient.im.messageResource.get({
path: {
message_id: messageId,
file_key: fileKey,
},
params: {
type: resourceType === 'image' ? 'image' : 'file',
},
});

if (!res) {
console.warn('[feishu-adapter] messageResource.get returned null/undefined');
continue;
}

// SDK returns { writeFile, getReadableStream, headers }
// Try stream approach first, fall back to writeFile + read if stream fails
let buffer: Buffer;
// SDK returns { writeFile, getReadableStream, headers }
// Try stream approach first, fall back to writeFile + read if stream fails
let buffer: Buffer;

try {
const readable = res.getReadableStream();
const chunks: Buffer[] = [];
let totalSize = 0;

for await (const chunk of readable) {
const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
totalSize += buf.length;
if (totalSize > MAX_FILE_SIZE) {
console.warn(`[feishu-adapter] Resource too large (>${MAX_FILE_SIZE} bytes), key: ${fileKey}`);
return null;
}
chunks.push(buf);
}
buffer = Buffer.concat(chunks);
} catch (streamErr) {
// Stream approach failed — fall back to writeFile + read
console.warn('[feishu-adapter] Stream read failed, falling back to writeFile:', streamErr instanceof Error ? streamErr.message : streamErr);

const fs = await import('fs');
const os = await import('os');
const path = await import('path');
const tmpPath = path.join(os.tmpdir(), `feishu-dl-${crypto.randomUUID()}`);
try {
await res.writeFile(tmpPath);
buffer = fs.readFileSync(tmpPath);
if (buffer.length > MAX_FILE_SIZE) {
console.warn(`[feishu-adapter] Resource too large (>${MAX_FILE_SIZE} bytes), key: ${fileKey}`);
return null;
const readable = res.getReadableStream();
const chunks: Buffer[] = [];
let totalSize = 0;

for await (const chunk of readable) {
const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
totalSize += buf.length;
if (totalSize > MAX_FILE_SIZE) {
console.warn(`[feishu-adapter] Resource too large (>${MAX_FILE_SIZE} bytes), key: ${fileKey}`);
return null; // Size limit — don't retry
}
chunks.push(buf);
}
buffer = Buffer.concat(chunks);
} catch (streamErr) {
// Stream approach failed — fall back to writeFile + read
console.warn('[feishu-adapter] Stream read failed, falling back to writeFile:', streamErr instanceof Error ? streamErr.message : streamErr);

const fs = await import('fs');
const os = await import('os');
const path = await import('path');
const tmpPath = path.join(os.tmpdir(), `feishu-dl-${crypto.randomUUID()}`);
try {
await res.writeFile(tmpPath);
buffer = fs.readFileSync(tmpPath);
if (buffer.length > MAX_FILE_SIZE) {
console.warn(`[feishu-adapter] Resource too large (>${MAX_FILE_SIZE} bytes), key: ${fileKey}`);
return null; // Size limit — don't retry
}
} finally {
try { fs.unlinkSync(tmpPath); } catch { /* ignore cleanup errors */ }
}
} finally {
try { fs.unlinkSync(tmpPath); } catch { /* ignore cleanup errors */ }
}
}

if (!buffer || buffer.length === 0) {
console.warn('[feishu-adapter] Downloaded resource is empty, key:', fileKey);
return null;
}
if (!buffer || buffer.length === 0) {
console.warn('[feishu-adapter] Downloaded resource is empty, key:', fileKey);
continue; // Empty result — retry
}

const base64 = buffer.toString('base64');
const id = crypto.randomUUID();
const mimeType = MIME_BY_TYPE[resourceType] || 'application/octet-stream';
const ext = resourceType === 'image' ? 'png'
: resourceType === 'audio' ? 'ogg'
: resourceType === 'video' ? 'mp4'
: 'bin';

console.log(`[feishu-adapter] Resource downloaded: ${buffer.length} bytes, key=${fileKey}`);

return {
id,
name: `${fileKey}.${ext}`,
type: mimeType,
size: buffer.length,
data: base64,
};
} catch (err) {
console.error(
`[feishu-adapter] Resource download failed (type=${resourceType}, key=${fileKey}):`,
err instanceof Error ? err.stack || err.message : err,
);
return null;
const base64 = buffer.toString('base64');
const id = crypto.randomUUID();
const mimeType = MIME_BY_TYPE[resourceType] || 'application/octet-stream';
const ext = resourceType === 'image' ? 'png'
: resourceType === 'audio' ? 'ogg'
: resourceType === 'video' ? 'mp4'
: 'bin';

console.log(`[feishu-adapter] Resource downloaded: ${buffer.length} bytes, key=${fileKey}`);

return {
id,
name: `${fileKey}.${ext}`,
type: mimeType,
size: buffer.length,
data: base64,
};
} catch (err) {
console.error(
`[feishu-adapter] Resource download failed (attempt ${attempt + 1}/${DOWNLOAD_MAX_RETRIES + 1}, type=${resourceType}, key=${fileKey}):`,
err instanceof Error ? err.stack || err.message : err,
);
// Continue to next retry attempt
}
}

console.error(`[feishu-adapter] Resource download exhausted retries: key=${fileKey}`);
return null;
}

// ── Utilities ───────────────────────────────────────────────

private addToDedup(messageId: string): void {
this.seenMessageIds.set(messageId, true);

// Persist latest processed message ID (survives restart)
try { setChannelOffset('feishu', messageId); } catch { /* best effort */ }

// LRU eviction: remove oldest entries when exceeding limit
if (this.seenMessageIds.size > DEDUP_MAX) {
const excess = this.seenMessageIds.size - DEDUP_MAX;
Expand Down
39 changes: 31 additions & 8 deletions src/lib/bridge/bridge-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,33 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void {
await handleMessage(adapter, msg);
} else {
const binding = router.resolve(msg.address);
// Fire-and-forget into session lock — loop continues to accept
// messages for other sessions immediately.
processWithSessionLock(binding.codepilotSessionId, () =>
handleMessage(adapter, msg),
).catch(err => {
console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err);
});
const parallelEnabled = getSetting('bridge_parallel_tasks') === 'true';
const sessionBusy = state.activeTasks.has(binding.codepilotSessionId);

if (parallelEnabled && sessionBusy) {
// ── Parallel mode: spawn a worker session ────────────────
// The main session is busy, so create an ephemeral worker
// session that inherits config from the main binding. This
// allows multiple messages to be processed concurrently with
// independent Claude streams.
const workerBinding = router.createWorkerBinding(binding);
console.log(
`[bridge-manager] Parallel dispatch: worker ${workerBinding.codepilotSessionId.slice(0, 8)} ` +
`spawned from busy session ${binding.codepilotSessionId.slice(0, 8)}`,
);
handleMessage(adapter, msg, workerBinding).catch(err => {
console.error(`[bridge-manager] Worker ${workerBinding.codepilotSessionId.slice(0, 8)} error:`, err);
});
} else {
// ── Standard mode: serialize within session ──────────────
// Fire-and-forget into session lock — loop continues to accept
// messages for other sessions immediately.
processWithSessionLock(binding.codepilotSessionId, () =>
handleMessage(adapter, msg),
).catch(err => {
console.error(`[bridge-manager] Session ${binding.codepilotSessionId.slice(0, 8)} error:`, err);
});
}
}
} catch (err) {
if (abort.signal.aborted) break;
Expand All @@ -424,10 +444,12 @@ function runAdapterLoop(adapter: BaseChannelAdapter): void {

/**
* Handle a single inbound message.
* @param overrideBinding Optional pre-resolved binding (used for parallel worker sessions).
*/
async function handleMessage(
adapter: BaseChannelAdapter,
msg: InboundMessage,
overrideBinding?: import('./types').ChannelBinding,
): Promise<void> {
// Update lastMessageAt for this adapter
const adapterState = getState();
Expand Down Expand Up @@ -488,7 +510,8 @@ async function handleMessage(
if (!text && !hasAttachments) { ack(); return; }

// Regular message — route to conversation engine
const binding = router.resolve(msg.address);
// Use override binding when provided (e.g., parallel worker session)
const binding = overrideBinding || router.resolve(msg.address);

// Notify adapter that message processing is starting (e.g., typing indicator)
adapter.onMessageStart?.(msg.address.chatId);
Expand Down
41 changes: 41 additions & 0 deletions src/lib/bridge/channel-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,47 @@ export function updateBinding(
updateChannelBinding(id, updates);
}

/**
* Create an ephemeral worker session that inherits configuration from a
* parent binding. Used for parallel task processing — each concurrent
* message gets its own independent Claude session so they can run
* simultaneously without contending for the same session lock.
*
* The returned ChannelBinding has an empty `id`, indicating it is NOT
* persisted to the channel_bindings table. This prevents worker sessions
* from polluting the binding lookup and allows them to be garbage-collected
* naturally.
*/
export function createWorkerBinding(parentBinding: ChannelBinding): ChannelBinding {
const session = createSession(
`Worker: ${parentBinding.chatId.slice(0, 16)}`,
parentBinding.model,
undefined,
parentBinding.workingDirectory,
parentBinding.mode,
);

// Copy provider from parent session if present
const parentSession = getSession(parentBinding.codepilotSessionId);
if (parentSession?.provider_id) {
updateSessionProviderId(session.id, parentSession.provider_id);
}

return {
id: '', // empty = ephemeral, not persisted
channelType: parentBinding.channelType,
chatId: parentBinding.chatId,
codepilotSessionId: session.id,
sdkSessionId: '',
workingDirectory: parentBinding.workingDirectory,
model: parentBinding.model,
mode: parentBinding.mode,
active: true,
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
};
}

/**
* List all bindings, optionally filtered by channel type.
*/
Expand Down
33 changes: 32 additions & 1 deletion src/lib/telegram-bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export interface TelegramNotifyOptions {

const TELEGRAM_API = 'https://api.telegram.org';
const MAX_MESSAGE_LENGTH = 4000; // Telegram limit is 4096, leave buffer
const NOTIFY_MAX_RETRIES = 2;
const NOTIFY_BASE_DELAY_MS = 1000;

// ── Bridge Mode Guard ─────────────────────────────────────────

Expand Down Expand Up @@ -115,9 +117,38 @@ function ensurePollingStarted(): void {

// ── Core API ───────────────────────────────────────────────────

/**
* Send a single API call with retry and exponential backoff.
*/
async function callWithRetry(
botToken: string,
method: string,
params: Record<string, unknown>,
): Promise<{ ok: boolean; error?: string }> {
let lastError: string | undefined;
for (let attempt = 0; attempt <= NOTIFY_MAX_RETRIES; attempt++) {
const result = await callTelegramApi(botToken, method, params);
if (result.ok) return result;

lastError = result.error;
// Don't retry client errors (4xx except 429)
const status = (result as { httpStatus?: number }).httpStatus;
if (status && status >= 400 && status < 500 && status !== 429) {
return result;
}

if (attempt < NOTIFY_MAX_RETRIES) {
const delay = NOTIFY_BASE_DELAY_MS * Math.pow(2, attempt) + Math.random() * 500;
await new Promise(r => setTimeout(r, delay));
}
}
return { ok: false, error: lastError || 'Max retries exceeded' };
}

/**
* Send a message to the configured Telegram chat.
* Automatically splits messages that exceed the Telegram limit.
* Retries transient failures with exponential backoff.
*/
async function sendMessage(text: string, parseMode: 'HTML' | 'Markdown' = 'HTML'): Promise<{ ok: boolean; error?: string }> {
const config = getTelegramConfig();
Expand All @@ -130,7 +161,7 @@ async function sendMessage(text: string, parseMode: 'HTML' | 'Markdown' = 'HTML'

const chunks = splitMessage(text, MAX_MESSAGE_LENGTH);
for (const chunk of chunks) {
const result = await callTelegramApi(config.botToken, 'sendMessage', {
const result = await callWithRetry(config.botToken, 'sendMessage', {
chat_id: config.chatId,
text: chunk,
parse_mode: parseMode,
Expand Down