diff --git a/.current-upstream-version b/.current-upstream-version new file mode 100644 index 000000000..fb7a04cff --- /dev/null +++ b/.current-upstream-version @@ -0,0 +1 @@ +v0.4.0 diff --git a/.github/workflows/sync-build.yml b/.github/workflows/sync-build.yml new file mode 100644 index 000000000..d55323908 --- /dev/null +++ b/.github/workflows/sync-build.yml @@ -0,0 +1,112 @@ +name: Sync upstream & build custom image + +on: + schedule: + - cron: '0 */6 * * *' # ogni 6 ore + workflow_dispatch: # trigger manuale + push: + branches: [custom] # rebuild ad ogni push su custom + +jobs: + sync-and-build: + runs-on: ubuntu-latest + steps: + - name: Checkout fork (custom branch) + uses: actions/checkout@v4 + with: + ref: custom + fetch-depth: 0 + token: ${{ secrets.PAT_TOKEN }} + + - name: Notify Telegram (start) + run: | + curl -s -X POST "https://api.telegram.org/bot${{ secrets.TELEGRAM_BOT_TOKEN }}/sendMessage" \ + -d chat_id="${{ secrets.TELEGRAM_CHAT_ID }}" \ + -d parse_mode="Markdown" \ + -d text="šŸš€ *OpenFang Build Started*%0A%0ATrigger: \`${{ github.event_name }}\`%0ABranch: \`custom\`%0ACommit: \`${GITHUB_SHA::7}\`%0A[View run](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }})" + + - name: Configure git + run: | + git config user.name "github-actions" + git config user.email "actions@github.com" + + - name: Sync main with upstream + if: github.event_name != 'push' + run: | + git remote add upstream https://github.com/RightNow-AI/openfang.git || true + git fetch upstream --tags + git fetch origin main + + # Fast-forward main to latest upstream tag + LATEST_TAG=$(git tag -l 'v*' --sort=-v:refname | head -1) + CURRENT=$(cat .current-upstream-version 2>/dev/null || echo "none") + echo "LATEST_TAG=$LATEST_TAG" >> "$GITHUB_ENV" + echo "CURRENT=$CURRENT" >> "$GITHUB_ENV" + + if [ "$LATEST_TAG" != "$CURRENT" ]; then + echo "NEW_RELEASE=true" >> "$GITHUB_ENV" + # Reset main to latest tag, cherry-pick our generic fixes + git checkout -B 
main "$LATEST_TAG" + # Cherry-pick any commits on main that are ours (not upstream) + for commit in $(git log origin/main --oneline --format="%H" "$LATEST_TAG"..origin/main 2>/dev/null); do + git cherry-pick "$commit" --no-commit && git commit -C "$commit" || git cherry-pick --abort || true + done + git push origin main --force + # Now rebase custom on updated main + git checkout custom + git rebase main || git rebase --abort + echo "$LATEST_TAG" > .current-upstream-version + git add .current-upstream-version + git commit -m "chore: sync to upstream $LATEST_TAG" || true + git push origin custom --force + else + echo "NEW_RELEASE=false" >> "$GITHUB_ENV" + fi + + - name: Set up Docker Buildx + if: github.event_name == 'push' || env.NEW_RELEASE == 'true' + uses: docker/setup-buildx-action@v3 + + - name: Login to Docker Hub + if: github.event_name == 'push' || env.NEW_RELEASE == 'true' + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Build and push + if: github.event_name == 'push' || env.NEW_RELEASE == 'true' + uses: docker/build-push-action@v6 + with: + context: . 
+ push: true + tags: | + fliva/openfang:latest + fliva/openfang:${{ env.LATEST_TAG || 'custom' }} + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Update Docker Hub description + if: github.event_name == 'push' || env.NEW_RELEASE == 'true' + uses: peter-evans/dockerhub-description@v4 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + repository: fliva/openfang + readme-filepath: ./DOCKER_README.md + + - name: Notify Telegram (success) + if: success() && (github.event_name == 'push' || env.NEW_RELEASE == 'true') + run: | + curl -s -X POST "https://api.telegram.org/bot${{ secrets.TELEGRAM_BOT_TOKEN }}/sendMessage" \ + -d chat_id="${{ secrets.TELEGRAM_CHAT_ID }}" \ + -d parse_mode="Markdown" \ + -d text="āœ… *OpenFang Build OK*%0A%0ATag: \`${{ env.LATEST_TAG || 'custom' }}\`%0ABranch: \`custom\`%0ACommit: \`${GITHUB_SHA::7}\`%0A[View run](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }})" + + - name: Notify Telegram (failure) + if: failure() + run: | + curl -s -X POST "https://api.telegram.org/bot${{ secrets.TELEGRAM_BOT_TOKEN }}/sendMessage" \ + -d chat_id="${{ secrets.TELEGRAM_CHAT_ID }}" \ + -d parse_mode="Markdown" \ + -d text="āŒ *OpenFang Build FAILED*%0A%0ABranch: \`custom\`%0ACommit: \`${GITHUB_SHA::7}\`%0A[View run](https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }})" diff --git a/DOCKER_README.md b/DOCKER_README.md new file mode 100644 index 000000000..d3e09a054 --- /dev/null +++ b/DOCKER_README.md @@ -0,0 +1,42 @@ +# OpenFang for Lazycat NAS + +Custom [OpenFang](https://github.com/RightNow-AI/openfang) Docker image optimized for deployment on Lazycat LCMD Microserver. 
+ +**Automatically rebuilt on every new upstream release via GitHub Actions.** + +## What's included + +- **OpenFang Agent OS** — Rust-based autonomous AI agent daemon +- **Claude Code CLI** — Anthropic's CLI for Claude, as LLM provider +- **Node.js 22** — JavaScript runtime +- **Python 3** — Python runtime +- **Go** — via Homebrew +- **Homebrew** — package manager for additional tools +- **uv** — fast Python package manager +- **gh** — GitHub CLI +- **gog** — [Google Workspace CLI](https://gogcli.sh/) (Gmail, Calendar, Drive, Sheets, etc.) +- **ffmpeg** — multimedia processing +- **jq** — JSON processor +- **git, curl, wget** — standard utilities + +## Non-root execution + +The image uses `gosu` to drop root privileges to the `openfang` user at runtime. This is required because Claude Code's `--dangerously-skip-permissions` flag refuses to run as root. + +The `openfang` user has passwordless `sudo` access, so it can still install system packages when needed. + +## Usage + +```bash +docker run -d \ + -p 4200:4200 \ + -v openfang-data:/data \ + -v openfang-home:/home/openfang \ + -e OPENFANG_HOME=/data \ + fliva/openfang:latest +``` + +## Source + +- **This fork**: [github.com/f-liva/openfang](https://github.com/f-liva/openfang) +- **Upstream**: [github.com/RightNow-AI/openfang](https://github.com/RightNow-AI/openfang) diff --git a/Dockerfile b/Dockerfile index 7b30b258d..8361b4c27 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,20 +15,37 @@ ENV CARGO_PROFILE_RELEASE_LTO=${LTO} \ CARGO_PROFILE_RELEASE_CODEGEN_UNITS=${CODEGEN_UNITS} RUN cargo build --release --bin openfang -FROM rust:1-slim-bookworm -RUN apt-get update && apt-get install -y --no-install-recommends \ - ca-certificates \ - python3 \ - python3-pip \ - python3-venv \ - nodejs \ - npm \ - && rm -rf /var/lib/apt/lists/* - +FROM debian:bookworm-slim +RUN apt-get update && apt-get install -y ca-certificates curl git ffmpeg python3 python3-pip chromium gosu sudo procps build-essential jq && rm -rf 
/var/lib/apt/lists/* +RUN ln -s /usr/bin/python3 /usr/bin/python +RUN curl -LsSf https://astral.sh/uv/install.sh | sh +RUN (type -p wget >/dev/null || (apt-get update && apt-get install -y wget)) && \ + mkdir -p -m 755 /etc/apt/keyrings && \ + out=$(mktemp) && wget -qO "$out" https://cli.github.com/packages/githubcli-archive-keyring.gpg && \ + cat "$out" | tee /etc/apt/keyrings/githubcli-archive-keyring.gpg > /dev/null && \ + chmod go+r /etc/apt/keyrings/githubcli-archive-keyring.gpg && \ + echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" | tee /etc/apt/sources.list.d/github-cli.list > /dev/null && \ + apt-get update && apt-get install -y gh && rm -rf /var/lib/apt/lists/* +RUN curl -fsSL https://deb.nodesource.com/setup_22.x | bash - && \ + apt-get install -y nodejs && \ + npm install -g @anthropic-ai/claude-code @qwen-code/qwen-code && \ + rm -rf /var/lib/apt/lists/* +RUN useradd -m -s /bin/bash openfang && echo "openfang ALL=(ALL) NOPASSWD:ALL" > /etc/sudoers.d/openfang +USER openfang +RUN NONINTERACTIVE=1 /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)" +RUN eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)" && brew install steipete/tap/gogcli +USER root +RUN echo 'eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"' >> /home/openfang/.bashrc && \ + echo 'eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"' >> /root/.bashrc && \ + echo 'export PATH="/data/npm-global/bin:$PATH"' >> /home/openfang/.bashrc && \ + echo 'export PATH="/data/npm-global/bin:$PATH"' >> /root/.bashrc COPY --from=builder /build/target/release/openfang /usr/local/bin/ COPY --from=builder /build/agents /opt/openfang/agents +RUN mkdir -p /data && chown openfang:openfang /data +COPY entrypoint.sh /usr/local/bin/entrypoint.sh +RUN chmod +x /usr/local/bin/entrypoint.sh EXPOSE 4200 VOLUME /data ENV OPENFANG_HOME=/data -ENTRYPOINT ["openfang"] 
+ENTRYPOINT ["entrypoint.sh"] CMD ["start"] diff --git a/crates/openfang-api/src/channel_bridge.rs b/crates/openfang-api/src/channel_bridge.rs index 938eea680..b41aa94f7 100644 --- a/crates/openfang-api/src/channel_bridge.rs +++ b/crates/openfang-api/src/channel_bridge.rs @@ -81,6 +81,30 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { Ok(result.response) } + async fn send_message_with_context( + &self, + agent_id: AgentId, + message: &str, + ctx: openfang_channels::bridge::ChannelContext, + ) -> Result { + let result = self + .kernel + .send_message_with_handle( + agent_id, + message, + None, + ctx.sender_id, + ctx.sender_name, + ctx.channel_type, + ) + .await + .map_err(|e| format!("{e}"))?; + if result.silent { + return Ok(String::new()); + } + Ok(result.response) + } + async fn send_message_with_blocks( &self, agent_id: AgentId, @@ -102,9 +126,47 @@ impl ChannelBridgeHandle for KernelBridgeAdapter { }; let result = self .kernel - .send_message_with_blocks(agent_id, &text, blocks) + .send_message_with_blocks(agent_id, &text, blocks, None) + .await + .map_err(|e| format!("{e}"))?; + Ok(result.response) + } + + async fn send_message_with_blocks_and_context( + &self, + agent_id: AgentId, + blocks: Vec, + ctx: openfang_channels::bridge::ChannelContext, + ) -> Result { + let text: String = blocks + .iter() + .filter_map(|b| match b { + openfang_types::message::ContentBlock::Text { text, .. 
} => Some(text.as_str()), + _ => None, + }) + .collect::>() + .join("\n"); + let text = if text.is_empty() { + "[Image]".to_string() + } else { + text + }; + let result = self + .kernel + .send_message_with_handle_and_blocks( + agent_id, + &text, + None, + Some(blocks), + ctx.sender_id, + ctx.sender_name, + ctx.channel_type, + ) .await .map_err(|e| format!("{e}"))?; + if result.silent { + return Ok(String::new()); + } Ok(result.response) } diff --git a/crates/openfang-api/src/openai_compat.rs b/crates/openfang-api/src/openai_compat.rs index 4fc3d302e..ca8131918 100644 --- a/crates/openfang-api/src/openai_compat.rs +++ b/crates/openfang-api/src/openai_compat.rs @@ -323,7 +323,7 @@ pub async fn chat_completions( let kernel_handle: Arc = state.kernel.clone() as Arc; match state .kernel - .send_message_with_handle(agent_id, &last_user_msg, Some(kernel_handle), None, None) + .send_message_with_handle(agent_id, &last_user_msg, Some(kernel_handle), None, None, None) .await { Ok(result) => { @@ -379,7 +379,7 @@ async fn stream_response( let (mut rx, _handle) = state .kernel - .send_message_streaming(agent_id, message, Some(kernel_handle), None, None) + .send_message_streaming(agent_id, message, Some(kernel_handle), None, None, None, None) .map_err(|e| format!("Streaming setup failed: {e}"))?; let (tx, stream_rx) = tokio::sync::mpsc::channel::>(64); diff --git a/crates/openfang-api/src/routes.rs b/crates/openfang-api/src/routes.rs index 456d867f0..a04640607 100644 --- a/crates/openfang-api/src/routes.rs +++ b/crates/openfang-api/src/routes.rs @@ -241,7 +241,8 @@ pub async fn list_agents(State(state): State>) -> impl IntoRespons /// /// Reads each file from the upload directory, base64-encodes it, and /// returns image content blocks ready to insert into a session message. -pub fn resolve_attachments( +/// File I/O is offloaded to a blocking thread to avoid stalling the async runtime. 
+pub async fn resolve_attachments( attachments: &[AttachmentRef], ) -> Vec { use base64::Engine; @@ -271,60 +272,26 @@ pub fn resolve_attachments( } let file_path = upload_dir.join(&att.file_id); - match std::fs::read(&file_path) { - Ok(data) => { + match tokio::task::spawn_blocking(move || std::fs::read(file_path)).await { + Ok(Ok(data)) => { let b64 = base64::engine::general_purpose::STANDARD.encode(&data); blocks.push(openfang_types::message::ContentBlock::Image { media_type: content_type, data: b64, }); } - Err(e) => { + Ok(Err(e)) => { tracing::warn!(file_id = %att.file_id, error = %e, "Failed to read upload for attachment"); } + Err(e) => { + tracing::warn!(file_id = %att.file_id, error = %e, "Blocking task panicked reading attachment"); + } } } blocks } -/// Pre-insert image attachments into an agent's session so the LLM can see them. -/// -/// This injects image content blocks into the session BEFORE the kernel -/// adds the text user message, so the LLM receives: [..., User(images), User(text)]. -pub fn inject_attachments_into_session( - kernel: &OpenFangKernel, - agent_id: AgentId, - image_blocks: Vec, -) { - use openfang_types::message::{Message, MessageContent, Role}; - - let entry = match kernel.registry.get(agent_id) { - Some(e) => e, - None => return, - }; - - let mut session = match kernel.memory.get_session(entry.session_id) { - Ok(Some(s)) => s, - _ => openfang_memory::session::Session { - id: entry.session_id, - agent_id, - messages: Vec::new(), - context_window_tokens: 0, - label: None, - }, - }; - - session.messages.push(Message { - role: Role::User, - content: MessageContent::Blocks(image_blocks), - }); - - if let Err(e) = kernel.memory.save_session(&session) { - tracing::warn!(error = %e, "Failed to save session with image attachments"); - } -} - /// POST /api/agents/:id/message — Send a message to an agent. 
pub async fn send_message( State(state): State>, @@ -358,23 +325,40 @@ pub async fn send_message( ); } - // Resolve file attachments into image content blocks - if !req.attachments.is_empty() { - let image_blocks = resolve_attachments(&req.attachments); - if !image_blocks.is_empty() { - inject_attachments_into_session(&state.kernel, agent_id, image_blocks); + // Resolve file attachments into image content blocks. + // Blocks are passed as transient content for the current turn only — + // they are NOT persisted in session history, avoiding token bloat and + // compaction storms from large base64 payloads. + let content_blocks = if !req.attachments.is_empty() { + let mut blocks = resolve_attachments(&req.attachments).await; + if !blocks.is_empty() { + // Prepend a text block with the user's message so the LLM sees both + blocks.insert( + 0, + openfang_types::message::ContentBlock::Text { + text: req.message.clone(), + provider_metadata: None, + }, + ); + Some(blocks) + } else { + None } - } + } else { + None + }; let kernel_handle: Arc = state.kernel.clone() as Arc; match state .kernel - .send_message_with_handle( + .send_message_with_handle_and_blocks( agent_id, &req.message, Some(kernel_handle), + content_blocks, req.sender_id, req.sender_name, + req.channel_type, ) .await { @@ -1412,6 +1396,8 @@ pub async fn send_message_stream( Some(kernel_handle), req.sender_id, req.sender_name, + None, // no content_blocks for SSE streaming endpoint + req.channel_type, ) { Ok(pair) => pair, Err(e) => { @@ -5720,6 +5706,19 @@ pub async fn patch_agent( } } + if let Some(ids) = body.get("owner_ids").and_then(|v| v.as_array()) { + let owner_ids: Vec = ids + .iter() + .filter_map(|v| v.as_str().map(|s| s.to_string())) + .collect(); + if let Err(e) = state.kernel.registry.update_owner_ids(agent_id, owner_ids) { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": format!("{e}")})), + ); + } + } + // Persist updated entry to SQLite if let Some(entry) = 
state.kernel.registry.get(agent_id) { let _ = state.kernel.memory.save_agent(&entry); @@ -8515,7 +8514,7 @@ pub async fn run_schedule( let kernel_handle: Arc = state.kernel.clone() as Arc; match state .kernel - .send_message_with_handle(target_agent, &run_message, Some(kernel_handle), None, None) + .send_message_with_handle(target_agent, &run_message, Some(kernel_handle), None, None, None) .await { Ok(result) => ( diff --git a/crates/openfang-api/src/types.rs b/crates/openfang-api/src/types.rs index e607c5b4b..89980ee45 100644 --- a/crates/openfang-api/src/types.rs +++ b/crates/openfang-api/src/types.rs @@ -48,6 +48,9 @@ pub struct MessageRequest { /// Sender display name. #[serde(default)] pub sender_name: Option, + /// Channel type (e.g. "whatsapp", "telegram", "discord", "web"). + #[serde(default)] + pub channel_type: Option, } /// Response from sending a message. diff --git a/crates/openfang-api/src/ws.rs b/crates/openfang-api/src/ws.rs index 8907b979d..7a9d69e26 100644 --- a/crates/openfang-api/src/ws.rs +++ b/crates/openfang-api/src/ws.rs @@ -437,22 +437,26 @@ async fn handle_text_message( return; } - // Resolve file attachments into image content blocks + // Resolve file attachments into transient content blocks. + // Blocks are passed to the streaming call — NOT persisted in session history. 
let mut has_images = false; + let mut ws_content_blocks: Option> = None; if let Some(attachments) = parsed["attachments"].as_array() { let refs: Vec = attachments .iter() .filter_map(|a| serde_json::from_value(a.clone()).ok()) .collect(); if !refs.is_empty() { - let image_blocks = crate::routes::resolve_attachments(&refs); + let image_blocks = crate::routes::resolve_attachments(&refs).await; if !image_blocks.is_empty() { has_images = true; - crate::routes::inject_attachments_into_session( - &state.kernel, - agent_id, - image_blocks, - ); + // Build combined blocks: text message + images + let mut blocks = vec![openfang_types::message::ContentBlock::Text { + text: content.clone(), + provider_metadata: None, + }]; + blocks.extend(image_blocks); + ws_content_blocks = Some(blocks); } } } @@ -508,6 +512,8 @@ async fn handle_text_message( Some(kernel_handle), None, None, + ws_content_blocks, + Some("web".to_string()), ) { Ok((mut rx, handle)) => { // Forward stream events to WebSocket with debouncing. diff --git a/crates/openfang-channels/src/bridge.rs b/crates/openfang-channels/src/bridge.rs index cf1b383ff..b614378c8 100644 --- a/crates/openfang-channels/src/bridge.rs +++ b/crates/openfang-channels/src/bridge.rs @@ -7,7 +7,7 @@ use crate::formatter; use crate::router::AgentRouter; use crate::types::{ default_phase_emoji, AgentPhase, ChannelAdapter, ChannelContent, ChannelMessage, ChannelUser, - LifecycleReaction, + LifecycleReaction, TypingEvent, }; use async_trait::async_trait; use dashmap::DashMap; @@ -15,11 +15,27 @@ use futures::StreamExt; use openfang_types::agent::AgentId; use openfang_types::config::{ChannelOverrides, DmPolicy, GroupPolicy, OutputFormat}; use openfang_types::message::ContentBlock; +use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::watch; +use tokio::sync::{mpsc, watch}; use tracing::{debug, error, info, warn}; +/// Channel context passed from channel bridges to the kernel alongside messages. 
+/// +/// Carries the originating channel type and sender identity so the kernel can +/// populate `PromptContext.channel_type` / `sender_id` / `sender_name` and the +/// LLM knows which channel it is responding to. +#[derive(Debug, Clone, Default)] +pub struct ChannelContext { + /// Channel type string (e.g. "whatsapp", "telegram", "discord"). + pub channel_type: Option, + /// Platform-specific sender ID (e.g. phone number, Telegram user ID). + pub sender_id: Option, + /// Human-readable sender display name. + pub sender_name: Option, +} + /// Kernel operations needed by channel adapters. /// /// Defined here to avoid circular deps (openfang-channels can't depend on openfang-kernel). @@ -27,7 +43,17 @@ use tracing::{debug, error, info, warn}; #[async_trait] pub trait ChannelBridgeHandle: Send + Sync { /// Send a message to an agent and get the text response. - async fn send_message(&self, agent_id: AgentId, message: &str) -> Result; + async fn send_message(&self, agent_id: AgentId, message: &str) -> Result { + self.send_message_with_context(agent_id, message, ChannelContext::default()).await + } + + /// Send a message with channel context (channel type, sender identity). + async fn send_message_with_context( + &self, + agent_id: AgentId, + message: &str, + ctx: ChannelContext, + ) -> Result; /// Send a message with structured content blocks (text + images) to an agent. /// @@ -36,6 +62,16 @@ pub trait ChannelBridgeHandle: Send + Sync { &self, agent_id: AgentId, blocks: Vec, + ) -> Result { + self.send_message_with_blocks_and_context(agent_id, blocks, ChannelContext::default()).await + } + + /// Send a message with content blocks and channel context. 
+ async fn send_message_with_blocks_and_context( + &self, + agent_id: AgentId, + blocks: Vec, + ctx: ChannelContext, ) -> Result { // Default: extract text from blocks and send as plain text let text: String = blocks @@ -46,7 +82,7 @@ pub trait ChannelBridgeHandle: Send + Sync { }) .collect::>() .join("\n"); - self.send_message(agent_id, &text).await + self.send_message_with_context(agent_id, &text, ctx).await } /// Find an agent by name, returning its ID. @@ -268,6 +304,338 @@ impl ChannelRateLimiter { } } +// --------------------------------------------------------------------------- +// Message debouncer — batches rapid messages from the same sender +// --------------------------------------------------------------------------- + +/// A buffered message pending dispatch. +struct PendingMessage { + message: ChannelMessage, + /// Content blocks (images) associated with this message, if any. + image_blocks: Option>, +} + +/// Per-sender buffer state. +struct SenderBuffer { + /// Accumulated messages (in arrival order). + messages: Vec, + /// When the first message in this batch arrived. + first_arrived: Instant, + /// Handle to the active debounce timer task (cancelled on new message). + timer_handle: Option>, +} + +/// Batches rapid-fire messages from the same sender before dispatching. +/// +/// When `debounce_ms` > 0, incoming messages are buffered per sender key +/// (`channel_type:platform_id`). A timer starts on the first message; each +/// subsequent message resets the timer. When the timer fires (no new messages +/// for `debounce_ms`) or the safety cap (`debounce_max_ms`) is reached, all +/// buffered messages are merged and dispatched as a single request. +struct MessageDebouncer { + debounce_ms: u64, + debounce_max_ms: u64, + /// Sender for "flush this key" signals. 
+ flush_tx: mpsc::UnboundedSender, +} + +impl MessageDebouncer { + fn new(debounce_ms: u64, debounce_max_ms: u64) -> (Self, mpsc::UnboundedReceiver) { + let (flush_tx, flush_rx) = mpsc::unbounded_channel(); + ( + Self { + debounce_ms, + debounce_max_ms, + flush_tx, + }, + flush_rx, + ) + } + + /// Returns true if debouncing is enabled. + #[allow(dead_code)] + fn is_enabled(&self) -> bool { + self.debounce_ms > 0 + } + + /// Push a message into the buffer and (re)start the debounce timer. + fn push( + &self, + key: &str, + pending: PendingMessage, + buffers: &mut HashMap, + ) { + let debounce_dur = Duration::from_millis(self.debounce_ms); + let max_dur = Duration::from_millis(self.debounce_max_ms); + + let buf = buffers.entry(key.to_string()).or_insert_with(|| SenderBuffer { + messages: Vec::new(), + first_arrived: Instant::now(), + timer_handle: None, + }); + + buf.messages.push(pending); + + // Cancel any existing timer + if let Some(handle) = buf.timer_handle.take() { + handle.abort(); + } + + // Check safety cap — if we've waited long enough, flush immediately + let elapsed = buf.first_arrived.elapsed(); + if elapsed >= max_dur { + debug!( + sender = key, + "Debounce safety cap reached ({:.1}s), flushing immediately", + elapsed.as_secs_f64() + ); + let _ = self.flush_tx.send(key.to_string()); + return; + } + + // Start a new timer + let remaining_cap = max_dur.saturating_sub(elapsed); + let delay = debounce_dur.min(remaining_cap); + let flush_tx = self.flush_tx.clone(); + let flush_key = key.to_string(); + buf.timer_handle = Some(tokio::spawn(async move { + tokio::time::sleep(delay).await; + let _ = flush_tx.send(flush_key); + })); + } + + /// Handle a typing indicator event. + /// + /// When `is_typing: true` and the sender has buffered messages, cancel the + /// debounce timer (don't flush yet — user is still composing). When + /// `is_typing: false`, restart the normal debounce timer. 
The safety cap + /// (`debounce_max_ms`) always applies regardless of typing state. + fn on_typing( + &self, + key: &str, + is_typing: bool, + buffers: &mut HashMap, + ) { + let Some(buf) = buffers.get_mut(key) else { + return; // No buffered messages for this sender — ignore + }; + + // Safety cap always applies + let max_dur = Duration::from_millis(self.debounce_max_ms); + let elapsed = buf.first_arrived.elapsed(); + if elapsed >= max_dur { + debug!( + sender = key, + "Debounce safety cap reached during typing ({:.1}s), flushing", + elapsed.as_secs_f64() + ); + let _ = self.flush_tx.send(key.to_string()); + return; + } + + if is_typing { + // User is typing — cancel any running timer, wait for more messages + if let Some(handle) = buf.timer_handle.take() { + handle.abort(); + debug!(sender = key, "Typing detected, debounce timer paused"); + } + } else { + // User stopped typing — restart the debounce timer + if let Some(handle) = buf.timer_handle.take() { + handle.abort(); + } + let remaining_cap = max_dur.saturating_sub(elapsed); + let delay = Duration::from_millis(self.debounce_ms).min(remaining_cap); + let flush_tx = self.flush_tx.clone(); + let flush_key = key.to_string(); + buf.timer_handle = Some(tokio::spawn(async move { + tokio::time::sleep(delay).await; + let _ = flush_tx.send(flush_key); + })); + debug!(sender = key, "Typing stopped, debounce timer restarted ({delay:?})"); + } + } + + /// Drain a sender's buffer and merge messages into a single dispatch payload. + /// + /// Returns (merged text parts, merged content blocks, first ChannelMessage as context). 
+ fn drain( + &self, + key: &str, + buffers: &mut HashMap, + ) -> Option<(ChannelMessage, Option>)> { + let buf = buffers.remove(key)?; + if buf.messages.is_empty() { + return None; + } + + let count = buf.messages.len(); + let mut messages = buf.messages; + + // Single message — no merging needed + if count == 1 { + let pm = messages.remove(0); + return Some((pm.message, pm.image_blocks)); + } + + debug!( + sender = key, + count, "Merging {count} debounced messages into single dispatch" + ); + + // Use the first message as the "representative" for routing/metadata + let first = messages.remove(0); + let mut merged_msg = first.message; + let mut all_blocks: Vec = Vec::new(); + + // Collect blocks from the first message + if let Some(blocks) = first.image_blocks { + all_blocks.extend(blocks); + } + + // Extract text from the first message's content + let mut text_parts: Vec = Vec::new(); + text_parts.push(content_to_text(&merged_msg.content)); + + // Merge remaining messages + for pm in messages { + text_parts.push(content_to_text(&pm.message.content)); + if let Some(blocks) = pm.image_blocks { + all_blocks.extend(blocks); + } + } + + // Replace content with merged text + let merged_text = text_parts.join("\n"); + merged_msg.content = ChannelContent::Text(merged_text); + + let blocks = if all_blocks.is_empty() { + None + } else { + Some(all_blocks) + }; + + Some((merged_msg, blocks)) + } +} + +/// Extract text representation from any ChannelContent variant. 
+fn content_to_text(content: &ChannelContent) -> String {
+    match content {
+        ChannelContent::Text(t) => t.clone(),
+        ChannelContent::Command { name, args } => {
+            if args.is_empty() {
+                format!("/{name}")
+            } else {
+                format!("/{name} {}", args.join(" "))
+            }
+        }
+        ChannelContent::Image { url, caption } => match caption {
+            Some(c) => format!("[Photo: {url}]\n{c}"),
+            None => format!("[Photo: {url}]"),
+        },
+        ChannelContent::File { url, filename } => format!("[File ({filename}): {url}]"),
+        ChannelContent::Voice { url, duration_seconds } => {
+            format!("[Voice message ({duration_seconds}s): {url}]")
+        }
+        ChannelContent::Location { lat, lon } => format!("[Location: {lat}, {lon}]"),
+        ChannelContent::FileData { filename, .. } => format!("[File: {filename}]"),
+    }
+}
+
+/// Drain a sender's debounce buffer and spawn a dispatch task for the merged message.
+///
+/// Called when the debounce timer fires (sender stopped typing) or the safety cap is
+/// reached. Drains all buffered messages for `key`, merges them, and dispatches the
+/// result as a single request — either via `dispatch_with_blocks` (if images were
+/// pre-downloaded) or via `dispatch_message` (text-only).
+#[allow(clippy::too_many_arguments)]
+fn flush_debounced(
+    debouncer: &MessageDebouncer,
+    key: &str,
+    buffers: &mut HashMap<String, SenderBuffer>,
+    handle: &Arc<dyn ChannelBridgeHandle>,
+    router: &Arc<AgentRouter>,
+    adapter: &Arc<dyn ChannelAdapter>,
+    rate_limiter: &ChannelRateLimiter,
+    semaphore: &Arc<tokio::sync::Semaphore>,
+) {
+    let Some((merged_msg, blocks)) = debouncer.drain(key, buffers) else {
+        return;
+    };
+
+    let handle = handle.clone();
+    let router = router.clone();
+    let adapter = adapter.clone();
+    let rate_limiter = rate_limiter.clone();
+    let sem = semaphore.clone();
+
+    tokio::spawn(async move {
+        let _permit = match sem.acquire().await {
+            Ok(p) => p,
+            Err(_) => return,
+        };
+
+        if let Some(mut blocks) = blocks {
+            // Has pre-downloaded image blocks — build combined block list with text.
+            // Prepend any text content so the LLM sees the user's messages alongside images.
+ let text = content_to_text(&merged_msg.content); + if !text.is_empty() { + blocks.insert( + 0, + ContentBlock::Text { + text, + provider_metadata: None, + }, + ); + } + + let ct_str = channel_type_str(&merged_msg.channel); + let overrides = handle.channel_overrides(ct_str).await; + let channel_default_format = default_output_format_for_channel(ct_str); + let output_format = overrides + .as_ref() + .and_then(|o| o.output_format) + .unwrap_or(channel_default_format); + let threading_enabled = overrides.as_ref().map(|o| o.threading).unwrap_or(false); + let lifecycle_reactions = overrides + .as_ref() + .map(|o| o.lifecycle_reactions) + .unwrap_or(true); + let thread_id = if threading_enabled { + merged_msg.thread_id.as_deref() + } else { + None + }; + + dispatch_with_blocks( + blocks, + &merged_msg, + &handle, + &router, + adapter.as_ref(), + &adapter, + ct_str, + thread_id, + output_format, + lifecycle_reactions, + ) + .await; + } else { + // Text-only — use standard dispatch (handles commands, routing, etc.) + dispatch_message( + &merged_msg, + &handle, + &router, + adapter.as_ref(), + &adapter, + &rate_limiter, + ) + .await; + } + }); +} + /// Owns all running channel adapters and dispatches messages to agents. pub struct BridgeManager { handle: Arc, @@ -304,6 +672,11 @@ impl BridgeManager { /// begin processing immediately. Per-agent serialization (to prevent session /// corruption) is handled by the kernel's `agent_msg_locks`. /// + /// When debounce is enabled (per-channel `debounce_ms > 0`), rapid messages + /// from the same sender are buffered and merged into a single dispatch. This + /// prevents multiple conflicting responses when a user sends a burst of + /// messages (common on Telegram/WhatsApp where images are separate messages). + /// /// A semaphore limits concurrent dispatch tasks to prevent unbounded memory /// growth under burst traffic. 
pub async fn start_adapter( @@ -321,47 +694,186 @@ impl BridgeManager { // 32 is generous — most setups have 1-5 concurrent users. let semaphore = Arc::new(tokio::sync::Semaphore::new(32)); + // Fetch debounce config for this channel type + let ct_str = channel_type_str(&adapter.channel_type()).to_string(); + let overrides = handle.channel_overrides(&ct_str).await; + let debounce_ms = overrides.as_ref().map(|o| o.debounce_ms).unwrap_or(5_000); + let debounce_max_ms = overrides + .as_ref() + .map(|o| o.debounce_max_ms) + .unwrap_or(30_000); + + if debounce_ms > 0 { + info!( + channel = %ct_str, + debounce_ms, debounce_max_ms, "Message debouncing enabled" + ); + } + let task = tokio::spawn(async move { let mut stream = std::pin::pin!(stream); - loop { - tokio::select! { - msg = stream.next() => { - match msg { - Some(message) => { - // Spawn each dispatch as a concurrent task so the stream - // loop is never blocked by slow LLM calls. The kernel's - // per-agent lock ensures session integrity. - let handle = handle.clone(); - let router = router.clone(); - let adapter = adapter_clone.clone(); - let rate_limiter = rate_limiter.clone(); - let sem = semaphore.clone(); - tokio::spawn(async move { - // Acquire semaphore permit (blocks if 32 tasks are in flight). - let _permit = match sem.acquire().await { - Ok(p) => p, - Err(_) => return, // semaphore closed — shutting down - }; - dispatch_message( - &message, - &handle, - &router, - adapter.as_ref(), - &adapter, - &rate_limiter, - ).await; - }); + + if debounce_ms == 0 { + // --- Fast path: no debouncing, immediate dispatch --- + loop { + tokio::select! 
{ + msg = stream.next() => { + match msg { + Some(message) => { + let handle = handle.clone(); + let router = router.clone(); + let adapter = adapter_clone.clone(); + let rate_limiter = rate_limiter.clone(); + let sem = semaphore.clone(); + tokio::spawn(async move { + let _permit = match sem.acquire().await { + Ok(p) => p, + Err(_) => return, + }; + dispatch_message( + &message, + &handle, + &router, + adapter.as_ref(), + &adapter, + &rate_limiter, + ).await; + }); + } + None => { + info!("Channel adapter {} stream ended", adapter_clone.name()); + break; + } } - None => { - info!("Channel adapter {} stream ended", adapter_clone.name()); + } + _ = shutdown.changed() => { + if *shutdown.borrow() { + info!("Shutting down channel adapter {}", adapter_clone.name()); break; } } } - _ = shutdown.changed() => { - if *shutdown.borrow() { - info!("Shutting down channel adapter {}", adapter_clone.name()); - break; + } + } else { + // --- Debounce path: buffer messages per sender, flush on timer --- + let (debouncer, mut flush_rx) = + MessageDebouncer::new(debounce_ms, debounce_max_ms); + let mut buffers: HashMap = HashMap::new(); + + // Subscribe to typing events if the adapter supports them + let mut typing_rx = adapter_clone.typing_events(); + let has_typing = typing_rx.is_some(); + if has_typing { + info!( + channel = %ct_str, + "Typing-aware debounce enabled" + ); + } + + loop { + // Use a helper macro-like approach: if typing_rx is None, + // the typing branch will never fire (recv on None -> pending forever). + tokio::select! { + msg = stream.next() => { + match msg { + Some(message) => { + let sender_key = format!( + "{}:{}", + channel_type_str(&message.channel), + message.sender.platform_id + ); + + // Try to download image blocks immediately (so the + // download happens in parallel with the debounce wait). 
+ let image_blocks = if let ChannelContent::Image { + ref url, + ref caption, + } = message.content + { + let blocks = + download_image_to_blocks(url, caption.as_deref()) + .await; + if blocks + .iter() + .any(|b| matches!(b, ContentBlock::Image { .. })) + { + Some(blocks) + } else { + None // download failed, will fall back to text + } + } else { + None + }; + + let pending = PendingMessage { + message, + image_blocks, + }; + + debug!( + sender = %sender_key, + "Buffering message (debounce {}ms)", + debounce_ms + ); + debouncer.push(&sender_key, pending, &mut buffers); + } + None => { + // Stream ended — flush all remaining buffers + let keys: Vec = + buffers.keys().cloned().collect(); + for key in keys { + flush_debounced( + &debouncer, + &key, + &mut buffers, + &handle, + &router, + &adapter_clone, + &rate_limiter, + &semaphore, + ); + } + info!( + "Channel adapter {} stream ended", + adapter_clone.name() + ); + break; + } + } + } + Some(event) = async { + match typing_rx.as_mut() { + Some(rx) => rx.recv().await, + None => std::future::pending::>().await, + } + } => { + let sender_key = format!( + "{}:{}", + channel_type_str(&event.channel), + event.sender.platform_id + ); + debouncer.on_typing(&sender_key, event.is_typing, &mut buffers); + } + Some(key) = flush_rx.recv() => { + flush_debounced( + &debouncer, + &key, + &mut buffers, + &handle, + &router, + &adapter_clone, + &rate_limiter, + &semaphore, + ); + } + _ = shutdown.changed() => { + if *shutdown.borrow() { + info!( + "Shutting down channel adapter {}", + adapter_clone.name() + ); + break; + } } } } @@ -890,8 +1402,13 @@ async fn dispatch_message( // (which expire typing after ~5s) keep showing it during long LLM calls. 
let typing_task = spawn_typing_loop(adapter_arc.clone(), message.sender.clone()); - // Send to agent and relay response - let result = handle.send_message(agent_id, &text).await; + // Send to agent and relay response (with channel context for routing) + let ch_ctx = ChannelContext { + channel_type: Some(ct_str.to_string()), + sender_id: Some(sender_user_id(message).to_string()), + sender_name: Some(message.sender.display_name.clone()), + }; + let result = handle.send_message_with_context(agent_id, &text, ch_ctx).await; // Stop the typing refresh now that we have a response typing_task.abort(); @@ -917,7 +1434,12 @@ async fn dispatch_message( // Try re-resolution before reporting error if let Some(new_id) = try_reresolution(&e, &channel_key, handle, router).await { let typing_task2 = spawn_typing_loop(adapter_arc.clone(), message.sender.clone()); - let retry = handle.send_message(new_id, &text).await; + let ch_ctx2 = ChannelContext { + channel_type: Some(ct_str.to_string()), + sender_id: Some(sender_user_id(message).to_string()), + sender_name: Some(message.sender.display_name.clone()), + }; + let retry = handle.send_message_with_context(new_id, &text, ch_ctx2).await; typing_task2.abort(); match retry { Ok(response) => { @@ -1287,8 +1809,13 @@ async fn dispatch_with_blocks( // Continuous typing indicator (see spawn_typing_loop doc) let typing_task = spawn_typing_loop(adapter_arc.clone(), message.sender.clone()); + let ch_ctx = ChannelContext { + channel_type: Some(ct_str.to_string()), + sender_id: Some(sender_user_id(message).to_string()), + sender_name: Some(message.sender.display_name.clone()), + }; let result = handle - .send_message_with_blocks(agent_id, blocks.clone()) + .send_message_with_blocks_and_context(agent_id, blocks.clone(), ch_ctx) .await; typing_task.abort(); @@ -1693,7 +2220,12 @@ mod tests { #[async_trait] impl ChannelBridgeHandle for MockHandle { - async fn send_message(&self, _agent_id: AgentId, message: &str) -> Result { + async fn 
send_message_with_context( + &self, + _agent_id: AgentId, + message: &str, + _ctx: ChannelContext, + ) -> Result { Ok(format!("Echo: {message}")) } async fn find_agent_by_name(&self, name: &str) -> Result, String> { @@ -1978,4 +2510,122 @@ mod tests { "image/jpeg" ); } + + /// Helper to create a minimal PendingMessage for debouncer tests. + fn test_pending(text: &str) -> PendingMessage { + PendingMessage { + message: ChannelMessage { + channel: ChannelType::Telegram, + platform_message_id: "1".to_string(), + sender: ChannelUser { + platform_id: "user1".to_string(), + display_name: "Test".to_string(), + openfang_user: None, + }, + content: ChannelContent::Text(text.to_string()), + target_agent: None, + timestamp: chrono::Utc::now(), + is_group: false, + thread_id: None, + metadata: HashMap::new(), + }, + image_blocks: None, + } + } + + #[tokio::test] + async fn test_on_typing_pauses_timer() { + let (debouncer, mut flush_rx) = MessageDebouncer::new(200, 30_000); + let mut buffers: HashMap = HashMap::new(); + let key = "telegram:user1"; + + // Push a message — starts a 200ms timer + debouncer.push(key, test_pending("hello"), &mut buffers); + assert!(buffers.contains_key(key)); + assert!(buffers[key].timer_handle.is_some()); + + // Typing event arrives — should cancel the timer + debouncer.on_typing(key, true, &mut buffers); + assert!(buffers[key].timer_handle.is_none()); + + // Wait longer than debounce_ms — should NOT flush because typing paused it + tokio::time::sleep(Duration::from_millis(300)).await; + assert!(flush_rx.try_recv().is_err()); // No flush signal + + // Typing stops — should restart the timer + debouncer.on_typing(key, false, &mut buffers); + assert!(buffers[key].timer_handle.is_some()); + + // Now the timer should fire after debounce_ms + tokio::time::sleep(Duration::from_millis(250)).await; + let flushed_key = flush_rx.try_recv().unwrap(); + assert_eq!(flushed_key, key); + } + + #[tokio::test] + async fn test_on_typing_safety_cap_fires() { + // 
Safety cap of 100ms — typing should NOT prevent flush past the cap + let (debouncer, mut flush_rx) = MessageDebouncer::new(50, 100); + let mut buffers: HashMap = HashMap::new(); + let key = "telegram:user1"; + + debouncer.push(key, test_pending("hello"), &mut buffers); + + // Start typing (pauses timer) + debouncer.on_typing(key, true, &mut buffers); + + // Wait until safety cap is exceeded + tokio::time::sleep(Duration::from_millis(120)).await; + + // Another typing event should trigger safety cap flush + debouncer.on_typing(key, true, &mut buffers); + + let flushed_key = flush_rx.try_recv().unwrap(); + assert_eq!(flushed_key, key); + } + + #[tokio::test] + async fn test_on_typing_no_buffered_messages_ignored() { + let (debouncer, mut flush_rx) = MessageDebouncer::new(200, 30_000); + let mut buffers: HashMap = HashMap::new(); + let key = "telegram:user1"; + + // Typing event with no buffered messages — should be a no-op + debouncer.on_typing(key, true, &mut buffers); + debouncer.on_typing(key, false, &mut buffers); + + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(flush_rx.try_recv().is_err()); + assert!(!buffers.contains_key(key)); + } + + #[tokio::test] + async fn test_on_typing_message_after_typing_stop_restarts_timer() { + let (debouncer, mut flush_rx) = MessageDebouncer::new(200, 30_000); + let mut buffers: HashMap = HashMap::new(); + let key = "telegram:user1"; + + // Push message, typing starts, typing stops + debouncer.push(key, test_pending("first"), &mut buffers); + debouncer.on_typing(key, true, &mut buffers); + debouncer.on_typing(key, false, &mut buffers); + + // Push another message after typing stopped — should reset the timer + tokio::time::sleep(Duration::from_millis(50)).await; + debouncer.push(key, test_pending("second"), &mut buffers); + + // Wait for debounce + tokio::time::sleep(Duration::from_millis(250)).await; + let flushed_key = flush_rx.try_recv().unwrap(); + assert_eq!(flushed_key, key); + + // Drain should contain both 
messages + let (merged, _) = debouncer.drain(key, &mut buffers).unwrap(); + if let ChannelContent::Text(text) = &merged.content { + assert!(text.contains("first")); + assert!(text.contains("second")); + } else { + panic!("Expected merged text content"); + } + } } diff --git a/crates/openfang-channels/src/telegram.rs b/crates/openfang-channels/src/telegram.rs index 670d038ee..450e28794 100644 --- a/crates/openfang-channels/src/telegram.rs +++ b/crates/openfang-channels/src/telegram.rs @@ -5,7 +5,7 @@ use crate::types::{ split_message, ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser, - LifecycleReaction, + LifecycleReaction, TypingEvent, }; use async_trait::async_trait; use futures::Stream; @@ -26,6 +26,9 @@ const LONG_POLL_TIMEOUT: u64 = 30; /// Default Telegram Bot API base URL. const DEFAULT_API_URL: &str = "https://api.telegram.org"; +/// Telegram typing indicators auto-expire after ~5 seconds on the client side. +/// We emit `is_typing: false` after 6 seconds if no new typing event is received. +const TYPING_EXPIRE_SECS: u64 = 6; /// Telegram Bot API adapter using long-polling. pub struct TelegramAdapter { @@ -41,6 +44,15 @@ pub struct TelegramAdapter { bot_username: Arc>>, shutdown_tx: Arc>, shutdown_rx: watch::Receiver, + /// Sender for typing events. The bridge calls `typing_events()` to get the + /// receiver end. Populated lazily — `None` until `typing_events()` is called. + /// + /// NOTE: The Telegram Bot API does not expose user typing status (`updateUserTyping` + /// is only available via MTProto/TDLib). This channel is used when the adapter detects + /// `sendChatAction` typing updates (available in some bot contexts), or can be wired + /// to a TDLib backend in the future. 
+ typing_tx: Arc>>>, + typing_rx: Arc>>>, } impl TelegramAdapter { @@ -60,6 +72,7 @@ impl TelegramAdapter { .unwrap_or_else(|| DEFAULT_API_URL.to_string()) .trim_end_matches('/') .to_string(); + let (typing_tx, typing_rx) = mpsc::channel(64); Self { token: Zeroizing::new(token), client: reqwest::Client::new(), @@ -69,6 +82,8 @@ impl TelegramAdapter { bot_username: Arc::new(tokio::sync::RwLock::new(None)), shutdown_tx: Arc::new(shutdown_tx), shutdown_rx, + typing_tx: Arc::new(tokio::sync::Mutex::new(Some(typing_tx))), + typing_rx: Arc::new(tokio::sync::Mutex::new(Some(typing_rx))), } } @@ -447,10 +462,15 @@ impl ChannelAdapter for TelegramAdapter { let api_base_url = self.api_base_url.clone(); let bot_username = self.bot_username.clone(); let mut shutdown = self.shutdown_rx.clone(); + let typing_tx = self.typing_tx.lock().await.take(); tokio::spawn(async move { let mut offset: Option = None; let mut backoff = INITIAL_BACKOFF; + // Track active typing expiration timers per chat_id so we can auto-emit + // is_typing: false after TYPING_EXPIRE_SECS (Telegram typing indicators + // auto-expire on the client side after ~5s). + let mut typing_timers: HashMap> = HashMap::new(); loop { // Check shutdown @@ -462,7 +482,7 @@ impl ChannelAdapter for TelegramAdapter { let url = format!("{}/bot{}/getUpdates", api_base_url, token.as_str()); let mut params = serde_json::json!({ "timeout": LONG_POLL_TIMEOUT, - "allowed_updates": ["message", "edited_message"], + "allowed_updates": ["message", "edited_message", "chat_member"], }); if let Some(off) = offset { params["offset"] = serde_json::json!(off); @@ -557,6 +577,61 @@ impl ChannelAdapter for TelegramAdapter { offset = Some(update_id + 1); } + // Detect user typing action from `chat_member` or action-based updates. + // NOTE: The Telegram Bot API does not expose `updateUserTyping` (that's + // MTProto/TDLib only). 
However, if the adapter is extended to use TDLib, + // or if Telegram adds this to the Bot API in the future, the infrastructure + // is ready. For now, we use a heuristic: when a message arrives, we + // cancel any active typing timer for that sender (they finished typing). + if let Some(action) = update.get("chat_action") + .or_else(|| update.get("sender_chat_action")) + { + if let Some(typing_tx) = &typing_tx { + let chat_id = action["chat"]["id"].as_i64().unwrap_or(0); + let user_id = action["from"]["id"].as_i64().unwrap_or(0); + let action_type = action["action"].as_str().unwrap_or(""); + let is_typing = action_type == "typing"; + + let sender_key = chat_id.to_string(); + let event = TypingEvent { + channel: ChannelType::Telegram, + sender: ChannelUser { + platform_id: sender_key.clone(), + display_name: user_id.to_string(), + openfang_user: None, + }, + is_typing, + }; + + let _ = typing_tx.try_send(event); + + if is_typing { + // Auto-expire after TYPING_EXPIRE_SECS + if let Some(old) = typing_timers.remove(&sender_key) { + old.abort(); + } + let expire_tx = typing_tx.clone(); + let expire_key = sender_key.clone(); + typing_timers.insert(sender_key, tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(TYPING_EXPIRE_SECS)).await; + let _ = expire_tx.try_send(TypingEvent { + channel: ChannelType::Telegram, + sender: ChannelUser { + platform_id: expire_key, + display_name: String::new(), + openfang_user: None, + }, + is_typing: false, + }); + })); + } else if let Some(old) = typing_timers.remove(&sender_key) { + old.abort(); + } + } + // chat_action updates have no message — continue to next update + continue; + } + // Parse the message let bot_uname = bot_username.read().await.clone(); let msg = match parse_telegram_update( @@ -573,6 +648,21 @@ impl ChannelAdapter for TelegramAdapter { None => continue, // filtered out or unparseable }; + // When a message arrives, cancel any typing expiry timer for this sender. 
+ // The message itself acts as an implicit "stopped typing" signal. + if let Some(typing_tx) = &typing_tx { + let sender_key = msg.sender.platform_id.clone(); + if let Some(timer) = typing_timers.remove(&sender_key) { + timer.abort(); + // Emit is_typing: false so the debouncer restarts its timer + let _ = typing_tx.try_send(TypingEvent { + channel: ChannelType::Telegram, + sender: msg.sender.clone(), + is_typing: false, + }); + } + } + debug!( "Telegram message from {}: {:?}", msg.sender.display_name, msg.content @@ -638,6 +728,11 @@ impl ChannelAdapter for TelegramAdapter { Ok(()) } + fn typing_events(&self) -> Option> { + // Take the receiver (can only be called once — subsequent calls return None). + self.typing_rx.try_lock().ok().and_then(|mut rx| rx.take()) + } + async fn stop(&self) -> Result<(), Box> { let _ = self.shutdown_tx.send(true); Ok(()) @@ -1859,4 +1954,49 @@ mod tests { } assert!(!msg.metadata.contains_key("reply_to_message_id")); } + + #[test] + fn test_typing_event_stream_returns_once() { + let rt = tokio::runtime::Runtime::new().unwrap(); + let adapter = rt.block_on(async { + TelegramAdapter::new( + "fake:token".to_string(), + vec![], + Duration::from_millis(100), + None, + ) + }); + + // First call should return Some + let rx = adapter.typing_events(); + assert!(rx.is_some()); + + // Second call should return None (receiver already taken) + let rx2 = adapter.typing_events(); + assert!(rx2.is_none()); + } + + #[tokio::test] + async fn test_typing_tx_sends_events() { + let adapter = TelegramAdapter::new( + "fake:token".to_string(), + vec![], + Duration::from_millis(100), + None, + ); + + // Take the receiver + let mut rx = adapter.typing_events().unwrap(); + + // Get the tx from the adapter's internal state + let tx = adapter.typing_tx.lock().await; + // tx was taken during new(), but we can test via the channel we got + drop(tx); + + // The typing_tx was moved into the adapter; to test the full flow we'd + // need to call start() which spawns 
the polling loop. Instead, verify + // the channel is valid by checking that the receiver is functional. + // Drop the receiver to confirm no panic. + rx.close(); + } } diff --git a/crates/openfang-channels/src/types.rs b/crates/openfang-channels/src/types.rs index 4abfed380..96f615bad 100644 --- a/crates/openfang-channels/src/types.rs +++ b/crates/openfang-channels/src/types.rs @@ -8,6 +8,7 @@ use std::pin::Pin; use async_trait::async_trait; use futures::Stream; +use tokio::sync::mpsc; /// The type of messaging channel. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -95,6 +96,22 @@ pub struct ChannelMessage { pub metadata: HashMap, } +/// A typing indicator event from a channel. +/// +/// Channels that support presence detection (e.g. WhatsApp via Baileys `presence.update`, +/// Matrix typing notifications, XMPP chat states) can emit these events to signal that +/// a user is composing a message. The debouncer uses these to pause its flush timer — +/// avoiding premature dispatch while the user is still typing. +#[derive(Debug, Clone)] +pub struct TypingEvent { + /// Which channel this came from. + pub channel: ChannelType, + /// Who is typing. + pub sender: ChannelUser, + /// `true` = user started typing, `false` = user stopped typing (or timed out). + pub is_typing: bool, +} + /// Agent lifecycle phase for UX indicators. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "snake_case")] @@ -251,6 +268,15 @@ pub trait ChannelAdapter: Send + Sync { Ok(()) } + /// Subscribe to typing indicator events (optional — default returns None). + /// + /// Adapters that support presence detection (e.g. Telegram `ChatAction::Typing`, + /// WhatsApp `presence.update`) should return a receiver that emits [`TypingEvent`]s. + /// The bridge uses these to pause the debounce timer while the user is composing. + fn typing_events(&self) -> Option> { + None + } + /// Stop the adapter and clean up resources. 
async fn stop(&self) -> Result<(), Box>; diff --git a/crates/openfang-channels/src/whatsapp.rs b/crates/openfang-channels/src/whatsapp.rs index 9f45fcd46..ec7ef8a72 100644 --- a/crates/openfang-channels/src/whatsapp.rs +++ b/crates/openfang-channels/src/whatsapp.rs @@ -315,6 +315,15 @@ impl ChannelAdapter for WhatsAppAdapter { Ok(()) } + // TODO(#728): Emit TypingEvent for WhatsApp presence detection. + // The Baileys-based WhatsApp Web gateway can expose `presence.update` events + // with `composing`/`paused` states. When the gateway adds a `/presence/subscribe` + // or WebSocket stream for presence events, implement `typing_events()` here to + // return an mpsc::Receiver that maps: + // - `composing` → TypingEvent { is_typing: true, ... } + // - `paused` → TypingEvent { is_typing: false, ... } + // The Cloud API does not support reading user presence/typing status. + async fn stop(&self) -> Result<(), Box> { let _ = self.shutdown_tx.send(true); Ok(()) diff --git a/crates/openfang-channels/tests/bridge_integration_test.rs b/crates/openfang-channels/tests/bridge_integration_test.rs index e4c647766..d4f2e55f8 100644 --- a/crates/openfang-channels/tests/bridge_integration_test.rs +++ b/crates/openfang-channels/tests/bridge_integration_test.rs @@ -9,7 +9,7 @@ use async_trait::async_trait; use futures::Stream; -use openfang_channels::bridge::{BridgeManager, ChannelBridgeHandle}; +use openfang_channels::bridge::{BridgeManager, ChannelBridgeHandle, ChannelContext}; use openfang_channels::router::AgentRouter; use openfang_channels::types::{ ChannelAdapter, ChannelContent, ChannelMessage, ChannelType, ChannelUser, @@ -122,7 +122,12 @@ impl MockHandle { #[async_trait] impl ChannelBridgeHandle for MockHandle { - async fn send_message(&self, agent_id: AgentId, message: &str) -> Result { + async fn send_message_with_context( + &self, + agent_id: AgentId, + message: &str, + _ctx: ChannelContext, + ) -> Result { self.received .lock() .unwrap() diff --git 
a/crates/openfang-cli/src/tui/event.rs b/crates/openfang-cli/src/tui/event.rs index ce5fe8414..5184cda6f 100644 --- a/crates/openfang-cli/src/tui/event.rs +++ b/crates/openfang-cli/src/tui/event.rs @@ -303,7 +303,7 @@ pub fn spawn_inprocess_stream( // send_message_streaming() finds the reactor. let _guard = rt.enter(); - match kernel.send_message_streaming(agent_id, &message, None, None, None) { + match kernel.send_message_streaming(agent_id, &message, None, None, None, None, None) { Ok((mut rx, handle)) => { rt.block_on(async { while let Some(ev) = rx.recv().await { diff --git a/crates/openfang-kernel/src/kernel.rs b/crates/openfang-kernel/src/kernel.rs index 5e582d048..a7f17d2a6 100644 --- a/crates/openfang-kernel/src/kernel.rs +++ b/crates/openfang-kernel/src/kernel.rs @@ -1085,7 +1085,9 @@ impl OpenFangKernel { || disk_manifest.model.model != entry.manifest.model.model || disk_manifest.capabilities.tools - != entry.manifest.capabilities.tools; + != entry.manifest.capabilities.tools + || disk_manifest.owner_ids + != entry.manifest.owner_ids; if changed { info!( agent = %name, @@ -1438,7 +1440,7 @@ impl OpenFangKernel { .get() .and_then(|w| w.upgrade()) .map(|arc| arc as Arc); - self.send_message_with_handle(agent_id, message, handle, None, None) + self.send_message_with_handle(agent_id, message, handle, None, None, None) .await } @@ -1451,6 +1453,7 @@ impl OpenFangKernel { agent_id: AgentId, message: &str, blocks: Vec, + channel_type: Option, ) -> KernelResult { let handle: Option> = self .self_handle @@ -1464,6 +1467,7 @@ impl OpenFangKernel { Some(blocks), None, None, + channel_type, ) .await } @@ -1476,6 +1480,7 @@ impl OpenFangKernel { kernel_handle: Option>, sender_id: Option, sender_name: Option, + channel_type: Option, ) -> KernelResult { self.send_message_with_handle_and_blocks( agent_id, @@ -1484,6 +1489,7 @@ impl OpenFangKernel { None, sender_id, sender_name, + channel_type, ) .await } @@ -1505,6 +1511,7 @@ impl OpenFangKernel { content_blocks: 
Option>, sender_id: Option, sender_name: Option, + channel_type: Option, ) -> KernelResult { // Acquire per-agent lock to serialize concurrent messages for the same agent. // This prevents session corruption when multiple messages arrive in quick @@ -1542,6 +1549,7 @@ impl OpenFangKernel { content_blocks, sender_id, sender_name, + channel_type, ) .await }; @@ -1599,6 +1607,8 @@ impl OpenFangKernel { kernel_handle: Option>, sender_id: Option, sender_name: Option, + content_blocks: Option>, + channel_type: Option, ) -> KernelResult<( tokio::sync::mpsc::Receiver, tokio::task::JoinHandle>, @@ -1621,8 +1631,14 @@ impl OpenFangKernel { let kernel_clone = Arc::clone(self); let message_owned = message.to_string(); let entry_clone = entry.clone(); + let wasm_lock = self + .agent_msg_locks + .entry(agent_id) + .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) + .clone(); let handle = tokio::spawn(async move { + let _guard = wasm_lock.lock().await; let result = if is_wasm { kernel_clone .execute_wasm_agent(&entry_clone, &message_owned, kernel_handle) @@ -1667,7 +1683,7 @@ impl OpenFangKernel { } // LLM agent: true streaming via agent loop - let mut session = self + let session = self .memory .get_session(entry.session_id) .map_err(KernelError::OpenFang)? 
@@ -1680,7 +1696,7 @@ impl OpenFangKernel { }); // Check if auto-compaction is needed: message-count OR token-count OR quota-headroom trigger - let needs_compact = { + let _needs_compact = { use openfang_runtime::compactor::{ estimate_token_count, needs_compaction as check_compact, needs_compaction_by_tokens, CompactionConfig, @@ -1802,7 +1818,7 @@ impl OpenFangKernel { .ok() .and_then(|(s, _)| s), user_name, - channel_type: None, + channel_type: channel_type.clone(), is_subagent: manifest .metadata .get("is_subagent") @@ -1842,6 +1858,7 @@ impl OpenFangKernel { ), sender_id, sender_name, + owner_ids: manifest.owner_ids.clone(), }; manifest.model.system_prompt = openfang_runtime::prompt_builder::build_system_prompt(&prompt_ctx); @@ -1868,7 +1885,53 @@ impl OpenFangKernel { }; let kernel_clone = Arc::clone(self); + // Acquire per-agent lock inside the spawned task to serialize concurrent + // messages for the same agent. Without this, a streaming call (WebSocket) + // and a non-streaming call (channel bridge) can run in parallel, corrupting + // the session and causing cross-channel response delivery. + let agent_lock = self + .agent_msg_locks + .entry(agent_id) + .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) + .clone(); + + let session_id = session.id; let handle = tokio::spawn(async move { + let _guard = agent_lock.lock().await; + + // Reload the session under the lock to pick up any messages that were + // processed while we were waiting (prevents stale-session races between + // concurrent WebSocket/streaming and channel-bridge/non-streaming calls + // that could cause session corruption and cross-channel response delivery). 
+ let mut session = memory + .get_session(session_id) + .unwrap_or_else(|_| Some(session.clone())) + .unwrap_or(session); + + // Re-evaluate compaction need with the fresh session + let needs_compact = { + use openfang_runtime::compactor::{ + estimate_token_count, needs_compaction as check_compact, + needs_compaction_by_tokens, CompactionConfig, + }; + let config = CompactionConfig::default(); + let by_messages = check_compact(&session, &config); + let estimated = estimate_token_count( + &session.messages, + Some(&manifest.model.system_prompt), + None, + ); + let by_tokens = needs_compaction_by_tokens(estimated, &config); + let by_quota = + if let Some(headroom) = kernel_clone.scheduler.token_headroom(agent_id) { + let threshold = (headroom as f64 * 0.8) as u64; + estimated as u64 > threshold && session.messages.len() > 4 + } else { + false + }; + by_messages || by_tokens || by_quota + }; + // Auto-compact if the session is large before running the loop if needs_compact { info!(agent_id = %agent_id, messages = session.messages.len(), "Auto-compacting session"); @@ -1876,7 +1939,7 @@ impl OpenFangKernel { Ok(msg) => { info!(agent_id = %agent_id, "{msg}"); // Reload the session after compaction - if let Ok(Some(reloaded)) = memory.get_session(session.id) { + if let Ok(Some(reloaded)) = memory.get_session(session_id) { session = reloaded; } } @@ -1954,7 +2017,7 @@ impl OpenFangKernel { Some(&kernel_clone.hooks), ctx_window, Some(&kernel_clone.process_manager), - None, // content_blocks (streaming path uses text only for now) + content_blocks, ) .await; @@ -2206,6 +2269,7 @@ impl OpenFangKernel { content_blocks: Option>, sender_id: Option, sender_name: Option, + channel_type: Option, ) -> KernelResult { // Check metering quota before starting self.metering @@ -2346,7 +2410,7 @@ impl OpenFangKernel { .ok() .and_then(|(s, _)| s), user_name, - channel_type: None, + channel_type: channel_type.clone(), is_subagent: manifest .metadata .get("is_subagent") @@ -2386,6 +2450,7 
@@ impl OpenFangKernel { ), sender_id, sender_name, + owner_ids: manifest.owner_ids.clone(), }; manifest.model.system_prompt = openfang_runtime::prompt_builder::build_system_prompt(&prompt_ctx); @@ -4050,6 +4115,7 @@ impl OpenFangKernel { Some(kh), None, None, + None, ), ) .await diff --git a/crates/openfang-kernel/src/registry.rs b/crates/openfang-kernel/src/registry.rs index b3c3a4962..01f866e71 100644 --- a/crates/openfang-kernel/src/registry.rs +++ b/crates/openfang-kernel/src/registry.rs @@ -310,6 +310,17 @@ impl AgentRegistry { Ok(()) } + /// Update an agent's owner identity IDs for sender verification. + pub fn update_owner_ids(&self, id: AgentId, owner_ids: Vec) -> OpenFangResult<()> { + let mut entry = self + .agents + .get_mut(&id) + .ok_or_else(|| OpenFangError::AgentNotFound(id.to_string()))?; + entry.manifest.owner_ids = owner_ids; + entry.last_active = chrono::Utc::now(); + Ok(()) + } + /// Mark an agent's onboarding as complete. pub fn mark_onboarding_complete(&self, id: AgentId) -> OpenFangResult<()> { let mut entry = self diff --git a/crates/openfang-kernel/src/wizard.rs b/crates/openfang-kernel/src/wizard.rs index ad6dafe84..c05d351f9 100644 --- a/crates/openfang-kernel/src/wizard.rs +++ b/crates/openfang-kernel/src/wizard.rs @@ -182,6 +182,7 @@ impl SetupWizard { exec_policy: None, tool_allowlist: vec![], tool_blocklist: vec![], + owner_ids: vec![], }; let skills_to_install: Vec = intent diff --git a/crates/openfang-kernel/tests/wasm_agent_integration_test.rs b/crates/openfang-kernel/tests/wasm_agent_integration_test.rs index 2d659c2e2..76aeb3aaf 100644 --- a/crates/openfang-kernel/tests/wasm_agent_integration_test.rs +++ b/crates/openfang-kernel/tests/wasm_agent_integration_test.rs @@ -303,7 +303,7 @@ async fn test_wasm_agent_streaming_fallback() { let agent_id = kernel.spawn_agent(manifest).unwrap(); let (mut rx, handle) = kernel - .send_message_streaming(agent_id, "Hi!", None, None, None) + .send_message_streaming(agent_id, "Hi!", None, None, 
None, None, None) .expect("Streaming should start"); // Collect all stream events diff --git a/crates/openfang-runtime/src/agent_loop.rs b/crates/openfang-runtime/src/agent_loop.rs index 2fd481d68..987b00791 100644 --- a/crates/openfang-runtime/src/agent_loop.rs +++ b/crates/openfang-runtime/src/agent_loop.rs @@ -310,6 +310,7 @@ pub async fn run_agent_loop( let ctx_window = context_window_tokens.unwrap_or(DEFAULT_CONTEXT_WINDOW); let context_budget = ContextBudget::new(ctx_window); let mut any_tools_executed = false; + let mut action_validator_retried = false; for iteration in 0..max_iterations { debug!(iteration, "Agent loop iteration"); @@ -394,6 +395,32 @@ pub async fn run_agent_loop( crate::reply_directives::parse_directives(&text); let text = cleaned_text; + // Action validator: detect when user requested an explicit action + // (send, execute+send, etc.) but the LLM responded with text only + // and no tool was called — re-prompt once to force tool execution. + if !action_validator_retried + && !any_tools_executed + && response.tool_calls.is_empty() + && requires_tool_action(user_message) + { + action_validator_retried = true; + warn!( + agent = %manifest.name, + iteration, + "Action validator: user requested explicit action but LLM \ + responded with text only — re-prompting to execute tools" + ); + messages.push(Message::assistant(text.clone())); + messages.push(Message::user( + "IMPORTANT: You described the action but did NOT execute it. \ + The user asked you to perform an action using a tool (e.g. \ + channel_send, shell_exec). You MUST call the appropriate tool \ + NOW. Do not describe what you would do — actually do it." 
+ .to_string(), + )); + continue; + } + // NO_REPLY: agent intentionally chose not to reply if text.trim() == "NO_REPLY" || parsed_directives.silent { debug!(agent = %manifest.name, "Agent chose NO_REPLY/silent — silent completion"); @@ -954,11 +981,29 @@ async fn call_with_retry( warn!( category = ?classified.category, retryable = classified.is_retryable, + attempt, raw = %raw_error, "LLM error classified: {}", classified.sanitized_message ); + // Retry retryable errors (e.g. timeouts, network issues) + if classified.is_retryable && attempt < MAX_RETRIES { + let delay = classified + .suggested_delay_ms + .filter(|&d| d > 0) + .unwrap_or(BASE_RETRY_DELAY_MS * 2u64.pow(attempt)); + warn!( + attempt, + delay_ms = delay, + category = ?classified.category, + "Retryable LLM error, retrying after delay" + ); + tokio::time::sleep(std::time::Duration::from_millis(delay)).await; + last_error = Some(classified.sanitized_message); + continue; + } + if let (Some(provider), Some(cooldown)) = (provider, cooldown) { cooldown.record_failure(provider, classified.is_billing); } @@ -1068,11 +1113,29 @@ async fn stream_with_retry( warn!( category = ?classified.category, retryable = classified.is_retryable, + attempt, raw = %raw_error, "LLM stream error classified: {}", classified.sanitized_message ); + // Retry retryable errors (e.g. 
timeouts, network issues) + if classified.is_retryable && attempt < MAX_RETRIES { + let delay = classified + .suggested_delay_ms + .filter(|&d| d > 0) + .unwrap_or(BASE_RETRY_DELAY_MS * 2u64.pow(attempt)); + warn!( + attempt, + delay_ms = delay, + category = ?classified.category, + "Retryable LLM stream error, retrying after delay" + ); + tokio::time::sleep(std::time::Duration::from_millis(delay)).await; + last_error = Some(classified.sanitized_message); + continue; + } + if let (Some(provider), Some(cooldown)) = (provider, cooldown) { cooldown.record_failure(provider, classified.is_billing); } @@ -1277,6 +1340,7 @@ pub async fn run_agent_loop_streaming( let ctx_window = context_window_tokens.unwrap_or(DEFAULT_CONTEXT_WINDOW); let context_budget = ContextBudget::new(ctx_window); let mut any_tools_executed = false; + let mut action_validator_retried = false; for iteration in 0..max_iterations { debug!(iteration, "Streaming agent loop iteration"); @@ -1381,6 +1445,32 @@ pub async fn run_agent_loop_streaming( crate::reply_directives::parse_directives(&text); let text = cleaned_text_s; + // Action validator (streaming): detect when user requested an explicit + // action but the LLM responded with text only — re-prompt once to + // force tool execution. + if !action_validator_retried + && !any_tools_executed + && response.tool_calls.is_empty() + && requires_tool_action(user_message) + { + action_validator_retried = true; + warn!( + agent = %manifest.name, + iteration, + "Action validator (streaming): user requested explicit action but LLM \ + responded with text only — re-prompting to execute tools" + ); + messages.push(Message::assistant(text.clone())); + messages.push(Message::user( + "IMPORTANT: You described the action but did NOT execute it. \ + The user asked you to perform an action using a tool (e.g. \ + channel_send, shell_exec). You MUST call the appropriate tool \ + NOW. Do not describe what you would do — actually do it." 
+ .to_string(), + )); + continue; + } + // NO_REPLY: agent intentionally chose not to reply if text.trim() == "NO_REPLY" || parsed_directives_s.silent { debug!(agent = %manifest.name, "Agent chose NO_REPLY/silent (streaming) — silent completion"); @@ -1873,6 +1963,83 @@ pub async fn run_agent_loop_streaming( /// 10. `<|plugin|>...<|endofblock|>` — Qwen/ChatGLM thinking-model format /// 11. `Action: tool\nAction Input: {"key":"value"}` — ReAct-style (LM Studio, GPT-OSS) /// 12. `tool_name\n{"key":"value"}` — bare name + JSON on next line (Llama 4 Scout) +/// +/// Detect whether a user message contains an explicit request to perform a side-effecting +/// action that requires a tool call (e.g. "send to Telegram", "execute the script and send"). +/// +/// This is intentionally conservative — it only matches patterns where the user clearly +/// instructs the agent to *do* something on an external channel/system. Simple questions +/// like "what would you send?" or "how do I send an email?" will NOT match. 
+fn requires_tool_action(user_message: &str) -> bool { + let msg = user_message.to_lowercase(); + + // Pattern 1: explicit tool name in the message (user literally says which tool to call) + let explicit_tools = [ + "channel_send", + "shell_exec", + "web_fetch", + "agent_send", + "cron_create", + "file_write", + "schedule_create", + "text_to_speech", + ]; + if explicit_tools.iter().any(|t| msg.contains(t)) { + return true; + } + + // Pattern 2: action verb + target channel/system (multi-language) + let action_verbs = [ + "invia", + "manda", + "send", + "scrivi su", + "pubblica su", + "posta su", + "post to", + "forward to", + "inoltra su", + "riferisci su", + "notifica su", + "notify on", + "deliver to", + "recapita su", + ]; + let targets = [ + "telegram", + "whatsapp", + "slack", + "discord", + "email", + "e-mail", + "al signore", + ]; + for verb in &action_verbs { + if msg.contains(verb) { + for target in &targets { + if msg.contains(target) { + return true; + } + } + } + } + + // Pattern 3: "esegui ... e invia" / "execute ... and send" compound actions + let execute_verbs = ["esegui", "execute", "run", "lancia"]; + let send_verbs = ["invia", "send", "manda"]; + for ev in &execute_verbs { + if msg.contains(ev) { + for sv in &send_verbs { + if msg.contains(sv) { + return true; + } + } + } + } + + false +} + /// 13. `{"name":"tool","arguments":{...}}` — Llama 3.1+ variant /// /// Validates tool names against available tools and returns synthetic `ToolCall` entries. 
diff --git a/crates/openfang-runtime/src/drivers/claude_code.rs b/crates/openfang-runtime/src/drivers/claude_code.rs index 8b93fca49..45291daa2 100644 --- a/crates/openfang-runtime/src/drivers/claude_code.rs +++ b/crates/openfang-runtime/src/drivers/claude_code.rs @@ -10,9 +10,11 @@ use crate::llm_driver::{CompletionRequest, CompletionResponse, LlmDriver, LlmError, StreamEvent}; use async_trait::async_trait; +use base64::Engine; use dashmap::DashMap; -use openfang_types::message::{ContentBlock, Role, StopReason, TokenUsage}; +use openfang_types::message::{ContentBlock, MessageContent, Role, StopReason, TokenUsage}; use serde::Deserialize; +use std::path::PathBuf; use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncReadExt}; use tracing::{debug, info, warn}; @@ -130,8 +132,16 @@ impl ClaudeCodeDriver { } /// Build a text prompt from the completion request messages. - fn build_prompt(request: &CompletionRequest) -> String { + /// + /// When messages contain image blocks, the images are decoded from base64, + /// written to a temporary directory, and referenced by file path in the + /// prompt text. The caller must pass the returned `image_dir` to + /// `--add-dir` so the Claude CLI can read them, and clean up the directory + /// after the CLI exits. 
+ fn build_prompt(request: &CompletionRequest) -> PreparedPrompt { let mut parts = Vec::new(); + let mut image_dir: Option = None; + let mut image_count = 0u32; if let Some(ref sys) = request.system { parts.push(format!("[System]\n{sys}")); @@ -143,13 +153,77 @@ impl ClaudeCodeDriver { Role::Assistant => "Assistant", Role::System => "System", }; - let text = msg.content.text_content(); - if !text.is_empty() { - parts.push(format!("[{role_label}]\n{text}")); + + match &msg.content { + MessageContent::Text(s) => { + if !s.is_empty() { + parts.push(format!("[{role_label}]\n{s}")); + } + } + MessageContent::Blocks(blocks) => { + let mut msg_parts = Vec::new(); + for block in blocks { + match block { + ContentBlock::Text { text, .. } => { + if !text.is_empty() { + msg_parts.push(text.clone()); + } + } + ContentBlock::Image { media_type, data } => { + // Create temp dir on first image + if image_dir.is_none() { + let dir = PathBuf::from(format!( + "/tmp/openfang-images-{}", + uuid::Uuid::new_v4() + )); + if let Err(e) = std::fs::create_dir_all(&dir) { + warn!(error = %e, "Failed to create image temp dir"); + continue; + } + image_dir = Some(dir); + } + + let ext = match media_type.as_str() { + "image/png" => "png", + "image/gif" => "gif", + "image/webp" => "webp", + _ => "jpg", + }; + image_count += 1; + let filename = format!("image-{image_count}.{ext}"); + let path = image_dir.as_ref().unwrap().join(&filename); + + match base64::engine::general_purpose::STANDARD.decode(data) { + Ok(decoded) => { + if let Err(e) = std::fs::write(&path, &decoded) { + warn!(error = %e, "Failed to write temp image"); + continue; + } + msg_parts.push(format!( + "@{}", + path.display() + )); + } + Err(e) => { + warn!(error = %e, "Failed to decode base64 image"); + } + } + } + _ => {} + } + } + let text = msg_parts.join("\n"); + if !text.is_empty() { + parts.push(format!("[{role_label}]\n{text}")); + } + } } } - parts.join("\n\n") + PreparedPrompt { + text: parts.join("\n\n"), + image_dir, + 
} } /// Map a model ID like "claude-code/opus" to CLI --model flag value. @@ -188,6 +262,25 @@ impl ClaudeCodeDriver { } } +/// Prompt text plus optional temp directory containing decoded images. +struct PreparedPrompt { + text: String, + /// Temporary directory holding image files. The caller should pass this + /// path via `--add-dir` and remove it after the CLI exits. + image_dir: Option, +} + +impl PreparedPrompt { + /// Clean up temporary image files, if any. + fn cleanup(&self) { + if let Some(ref dir) = self.image_dir { + if let Err(e) = std::fs::remove_dir_all(dir) { + debug!(error = %e, dir = %dir.display(), "Failed to clean up image temp dir"); + } + } + } +} + /// JSON output from `claude -p --output-format json`. /// /// The CLI may return the response text in different fields depending on @@ -231,12 +324,12 @@ struct ClaudeStreamEvent { #[async_trait] impl LlmDriver for ClaudeCodeDriver { async fn complete(&self, request: CompletionRequest) -> Result { - let prompt = Self::build_prompt(&request); + let prepared = Self::build_prompt(&request); let model_flag = Self::model_flag(&request.model); let mut cmd = tokio::process::Command::new(&self.cli_path); cmd.arg("-p") - .arg(&prompt) + .arg(&prepared.text) .arg("--output-format") .arg("json"); @@ -244,6 +337,11 @@ impl LlmDriver for ClaudeCodeDriver { cmd.arg("--dangerously-skip-permissions"); } + // Allow the CLI to read temp image files + if let Some(ref dir) = prepared.image_dir { + cmd.arg("--add-dir").arg(dir); + } + if let Some(ref model) = model_flag { cmd.arg("--model").arg(model); } @@ -257,6 +355,7 @@ impl LlmDriver for ClaudeCodeDriver { // Spawn child process instead of cmd.output() so we can track PID and timeout let mut child = cmd.spawn().map_err(|e| { + prepared.cleanup(); LlmError::Http(format!( "Claude Code CLI not found or failed to start ({}). 
\ Install: npm install -g @anthropic-ai/claude-code && claude auth", @@ -286,6 +385,7 @@ impl LlmDriver for ClaudeCodeDriver { Ok(Ok(status)) => status, Ok(Err(e)) => { warn!(error = %e, model = %pid_label, "Claude Code CLI subprocess failed"); + prepared.cleanup(); return Err(LlmError::Http(format!( "Claude Code CLI subprocess failed: {e}" ))); @@ -298,6 +398,7 @@ impl LlmDriver for ClaudeCodeDriver { "Claude Code CLI subprocess timed out, killing process" ); let _ = child.kill().await; + prepared.cleanup(); return Err(LlmError::Http(format!( "Claude Code CLI subprocess timed out after {}s — process killed", self.message_timeout_secs @@ -350,12 +451,16 @@ impl LlmDriver for ClaudeCodeDriver { format!("Claude Code CLI exited with code {code}: {detail}") }; + prepared.cleanup(); return Err(LlmError::Api { status: code as u16, message, }); } + // Clean up temp images now that the CLI has finished + prepared.cleanup(); + info!(model = %pid_label, "Claude Code CLI subprocess completed successfully"); let stdout = String::from_utf8_lossy(&stdout_bytes); @@ -403,12 +508,12 @@ impl LlmDriver for ClaudeCodeDriver { request: CompletionRequest, tx: tokio::sync::mpsc::Sender, ) -> Result { - let prompt = Self::build_prompt(&request); + let prepared = Self::build_prompt(&request); let model_flag = Self::model_flag(&request.model); let mut cmd = tokio::process::Command::new(&self.cli_path); cmd.arg("-p") - .arg(&prompt) + .arg(&prepared.text) .arg("--output-format") .arg("stream-json") .arg("--verbose"); @@ -417,6 +522,11 @@ impl LlmDriver for ClaudeCodeDriver { cmd.arg("--dangerously-skip-permissions"); } + // Allow the CLI to read temp image files + if let Some(ref dir) = prepared.image_dir { + cmd.arg("--add-dir").arg(dir); + } + if let Some(ref model) = model_flag { cmd.arg("--model").arg(model); } @@ -429,6 +539,7 @@ impl LlmDriver for ClaudeCodeDriver { debug!(cli = %self.cli_path, "Spawning Claude Code CLI (streaming)"); let mut child = cmd.spawn().map_err(|e| { + 
prepared.cleanup(); LlmError::Http(format!( "Claude Code CLI not found or failed to start ({}). \ Install: npm install -g @anthropic-ai/claude-code && claude auth", @@ -445,6 +556,7 @@ impl LlmDriver for ClaudeCodeDriver { let stdout = child.stdout.take().ok_or_else(|| { self.active_pids.remove(&pid_label); + prepared.cleanup(); LlmError::Http("No stdout from claude CLI".to_string()) })?; @@ -529,12 +641,16 @@ impl LlmDriver for ClaudeCodeDriver { "Claude Code CLI streaming subprocess timed out, killing process" ); let _ = child.kill().await; + prepared.cleanup(); return Err(LlmError::Http(format!( "Claude Code CLI streaming subprocess timed out after {}s — process killed", self.message_timeout_secs ))); } + // Clean up temp images now that the CLI has finished reading them + prepared.cleanup(); + // Wait for process to finish let status = child .wait() @@ -645,10 +761,10 @@ mod tests { }; let prompt = ClaudeCodeDriver::build_prompt(&request); - assert!(prompt.contains("[System]")); - assert!(prompt.contains("You are helpful.")); - assert!(prompt.contains("[User]")); - assert!(prompt.contains("Hello")); + assert!(prompt.text.contains("[System]")); + assert!(prompt.text.contains("You are helpful.")); + assert!(prompt.text.contains("[User]")); + assert!(prompt.text.contains("Hello")); } #[test] diff --git a/crates/openfang-runtime/src/prompt_builder.rs b/crates/openfang-runtime/src/prompt_builder.rs index fbe0bdbd3..e7b32081e 100644 --- a/crates/openfang-runtime/src/prompt_builder.rs +++ b/crates/openfang-runtime/src/prompt_builder.rs @@ -59,6 +59,11 @@ pub struct PromptContext { pub sender_id: Option, /// Sender display name. pub sender_name: Option, + /// Owner identity IDs for automated sender verification. + /// When non-empty, the prompt builder compares sender_id against this list + /// and injects a deterministic VERIFIED/STRANGER verdict, removing the need + /// for the LLM to perform the comparison. 
+ pub owner_ids: Vec, } /// Build the complete system prompt from a `PromptContext`. @@ -154,9 +159,11 @@ pub fn build_system_prompt(ctx: &PromptContext) -> String { // Section 9.1 — Sender Identity (skip for subagents) if !ctx.is_subagent { - if let Some(sender_line) = - build_sender_section(ctx.sender_name.as_deref(), ctx.sender_id.as_deref()) - { + if let Some(sender_line) = build_sender_section( + ctx.sender_name.as_deref(), + ctx.sender_id.as_deref(), + &ctx.owner_ids, + ) { sections.push(sender_line); } } @@ -432,13 +439,41 @@ fn build_channel_section(channel: &str) -> String { ) } -fn build_sender_section(sender_name: Option<&str>, sender_id: Option<&str>) -> Option { - match (sender_name, sender_id) { - (Some(name), Some(id)) => Some(format!("## Sender\nMessage from: {name} ({id})")), - (Some(name), None) => Some(format!("## Sender\nMessage from: {name}")), - (None, Some(id)) => Some(format!("## Sender\nMessage from: {id}")), - (None, None) => None, - } +fn build_sender_section( + sender_name: Option<&str>, + sender_id: Option<&str>, + owner_ids: &[String], +) -> Option { + let id_str = match (sender_name, sender_id) { + (Some(name), Some(id)) => format!("{name} ({id})"), + (Some(name), None) => name.to_string(), + (None, Some(id)) => id.to_string(), + (None, None) => return None, + }; + + // When owner_ids is configured, inject a deterministic verdict so the LLM + // does not need to compare phone numbers itself. + let verdict = if !owner_ids.is_empty() { + if let Some(sid) = sender_id { + let norm = |s: &str| s.replace([' ', '-', '(', ')'], ""); + let normalized_sid = norm(sid); + let is_owner = owner_ids.iter().any(|oid| norm(oid) == normalized_sid); + if is_owner { + "\nšŸ”“ VERIFIED OWNER — This is your owner/master. Respond normally with full access." + } else { + "\n🚫 STRANGER — This person is NOT your owner. \ + Apply PRIVACY-RULES.md strictly. Do NOT share personal information, \ + phone numbers, schedules, or finances. 
Do NOT use owner titles (Signore) \ + or familiar tone. Do NOT load MEMORY.md." + } + } else { + "\nāš ļø UNVERIFIED — No sender ID available. Treat as stranger. Apply privacy rules." + } + } else { + "" // No owner_ids configured — no automated verdict + }; + + Some(format!("## Sender\nMessage from: {id_str}{verdict}")) } fn build_peer_agents_section(self_name: &str, peers: &[(String, String, String)]) -> String { @@ -970,4 +1005,51 @@ mod tests { assert_eq!(capitalize(""), ""); assert_eq!(capitalize("MCP"), "MCP"); } + + #[test] + fn test_sender_section_no_owner_ids() { + let result = build_sender_section(Some("Alice"), Some("+123"), &[]); + assert_eq!(result, Some("## Sender\nMessage from: Alice (+123)".to_string())); + } + + #[test] + fn test_sender_section_owner_verified() { + let owner_ids = vec!["+393760105565".to_string()]; + let result = build_sender_section(Some("Federico"), Some("+393760105565"), &owner_ids); + let text = result.unwrap(); + assert!(text.contains("VERIFIED OWNER")); + assert!(!text.contains("STRANGER")); + } + + #[test] + fn test_sender_section_stranger() { + let owner_ids = vec!["+393760105565".to_string()]; + let result = build_sender_section(Some("Unknown"), Some("+391234567890"), &owner_ids); + let text = result.unwrap(); + assert!(text.contains("STRANGER")); + assert!(!text.contains("VERIFIED OWNER")); + } + + #[test] + fn test_sender_section_no_sender_id_with_owners() { + let owner_ids = vec!["+393760105565".to_string()]; + let result = build_sender_section(Some("Unknown"), None, &owner_ids); + let text = result.unwrap(); + assert!(text.contains("UNVERIFIED")); + } + + #[test] + fn test_sender_section_normalized_comparison() { + // Owner configured with spaces, sender without + let owner_ids = vec!["+39 376 010 5565".to_string()]; + let result = build_sender_section(Some("Fed"), Some("+393760105565"), &owner_ids); + let text = result.unwrap(); + assert!(text.contains("VERIFIED OWNER")); + } + + #[test] + fn 
test_sender_section_none_when_no_info() { + assert!(build_sender_section(None, None, &[]).is_none()); + assert!(build_sender_section(None, None, &["+123".to_string()]).is_none()); + } } diff --git a/crates/openfang-types/src/agent.rs b/crates/openfang-types/src/agent.rs index 420380715..ea0d55ede 100644 --- a/crates/openfang-types/src/agent.rs +++ b/crates/openfang-types/src/agent.rs @@ -491,6 +491,12 @@ pub struct AgentManifest { /// Tool blocklist — these tools are excluded (applied after allowlist). #[serde(default, deserialize_with = "crate::serde_compat::vec_lenient")] pub tool_blocklist: Vec, + /// Owner identity IDs (e.g. phone numbers) for sender verification. + /// When a message arrives with a sender_id, the runtime compares it against + /// this list and injects a verified/stranger verdict into the system prompt, + /// removing the need for the LLM to perform the comparison itself. + #[serde(default, deserialize_with = "crate::serde_compat::vec_lenient")] + pub owner_ids: Vec, } fn default_true() -> bool { @@ -525,6 +531,7 @@ impl Default for AgentManifest { exec_policy: None, tool_allowlist: Vec::new(), tool_blocklist: Vec::new(), + owner_ids: Vec::new(), } } } @@ -782,6 +789,7 @@ mod tests { exec_policy: None, tool_allowlist: Vec::new(), tool_blocklist: Vec::new(), + owner_ids: Vec::new(), }; let json = serde_json::to_string(&manifest).unwrap(); let deserialized: AgentManifest = serde_json::from_str(&json).unwrap(); diff --git a/crates/openfang-types/src/config.rs b/crates/openfang-types/src/config.rs index 71748eb02..dd5f5c5ef 100644 --- a/crates/openfang-types/src/config.rs +++ b/crates/openfang-types/src/config.rs @@ -93,6 +93,16 @@ pub struct ChannelOverrides { /// Defaults to true. Set to false to suppress automatic reactions (e.g. on Telegram). #[serde(default = "default_true")] pub lifecycle_reactions: bool, + /// Debounce window in milliseconds for batching rapid messages from the same sender. 
+ /// When > 0, messages arriving within this window are merged into one dispatch. + /// Default: 5000 (5 seconds — batches rapid-fire messages into a single request). + #[serde(default = "default_debounce_ms")] + pub debounce_ms: u64, + /// Maximum debounce wait in milliseconds (safety cap). + /// Even if the user keeps typing, dispatch after this many ms from the first message. + /// Default: 30000 (30 seconds). Only relevant when debounce_ms > 0. + #[serde(default = "default_debounce_max_ms")] + pub debounce_max_ms: u64, } impl Default for ChannelOverrides { @@ -108,6 +118,8 @@ usage_footer: None, typing_mode: None, lifecycle_reactions: true, + debounce_ms: default_debounce_ms(), + debounce_max_ms: default_debounce_max_ms(), } } } @@ -1255,6 +1267,14 @@ fn default_true() -> bool { true } +fn default_debounce_ms() -> u64 { + 5_000 +} + +fn default_debounce_max_ms() -> u64 { + 30_000 +} + fn default_thread_ttl() -> u64 { 24 } diff --git a/entrypoint.sh b/entrypoint.sh new file mode 100644 index 000000000..9b759d7f9 --- /dev/null +++ b/entrypoint.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# Drop root privileges and run openfang as the openfang user +chown -R openfang:openfang /data 2>/dev/null +chown -R openfang:openfang /home/openfang 2>/dev/null + +# Resurrect PM2 processes (whatsapp-gateway etc.)
before starting OpenFang +gosu openfang bash -c 'pm2 resurrect 2>/dev/null || true' + +exec gosu openfang openfang "$@" diff --git a/packages/whatsapp-gateway/ecosystem.config.cjs b/packages/whatsapp-gateway/ecosystem.config.cjs new file mode 100644 index 000000000..c44070a21 --- /dev/null +++ b/packages/whatsapp-gateway/ecosystem.config.cjs @@ -0,0 +1,23 @@ +module.exports = { + apps: [{ + name: 'whatsapp-gateway', + script: 'index.js', + cwd: '/data/whatsapp-gateway', + node_args: '--experimental-vm-modules', + watch: false, + autorestart: true, + max_restarts: 50, + min_uptime: '10s', + restart_delay: 5000, + max_memory_restart: '256M', + exp_backoff_restart_delay: 1000, + error_file: '/data/whatsapp-gateway/logs/pm2-error.log', + out_file: '/data/whatsapp-gateway/logs/pm2-out.log', + merge_logs: true, + time: true, + env: { + NODE_ENV: 'production', + OPENFANG_DEFAULT_AGENT: 'ambrogio', + }, + }], +}; diff --git a/packages/whatsapp-gateway/index.js b/packages/whatsapp-gateway/index.js index 973d6b3f0..3cfacbb48 100644 --- a/packages/whatsapp-gateway/index.js +++ b/packages/whatsapp-gateway/index.js @@ -5,7 +5,13 @@ import { randomUUID } from 'node:crypto'; import fs from 'node:fs'; import path from 'node:path'; import { fileURLToPath } from 'node:url'; -import makeWASocket, { useMultiFileAuthState, DisconnectReason, fetchLatestBaileysVersion } from '@whiskeysockets/baileys'; +import makeWASocket, { + useMultiFileAuthState, + DisconnectReason, + fetchLatestBaileysVersion, + Browsers, + downloadMediaMessage, +} from '@whiskeysockets/baileys'; import QRCode from 'qrcode'; import pino from 'pino'; @@ -13,31 +19,249 @@ const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); // --------------------------------------------------------------------------- -// Config from environment +// Config // --------------------------------------------------------------------------- const PORT = parseInt(process.env.WHATSAPP_GATEWAY_PORT || 
'3009', 10); const OPENFANG_URL = (process.env.OPENFANG_URL || 'http://127.0.0.1:4200').replace(/\/+$/, ''); -const DEFAULT_AGENT = process.env.OPENFANG_DEFAULT_AGENT || 'assistant'; +const DEFAULT_AGENT = process.env.OPENFANG_DEFAULT_AGENT || 'ambrogio'; +const AGENT_UUID_CACHE = new Map(); // --------------------------------------------------------------------------- // State // --------------------------------------------------------------------------- -let sock = null; // Baileys socket -let sessionId = ''; // current session identifier -let qrDataUrl = ''; // latest QR code as data:image/png;base64,... -let connStatus = 'disconnected'; // disconnected | qr_ready | connected +let sock = null; +let sessionId = ''; +let qrDataUrl = ''; +let connStatus = 'disconnected'; let qrExpired = false; let statusMessage = 'Not started'; -let reconnectAttempt = 0; // exponential backoff counter -const MAX_RECONNECT_DELAY = 60_000; // cap at 60s +let reconnectAttempt = 0; +let reconnectTimer = null; +let connectedSince = null; +let flushInterval = null; +let evProcessUnsub = null; +const MAX_RECONNECT_DELAY = 60_000; +const pendingReplies = new Map(); + +// --------------------------------------------------------------------------- +// Message deduplication — prevents processing the same message multiple times +// (e.g. 
after Signal session re-establishment / decryption retry) +// --------------------------------------------------------------------------- +const PROCESSED_IDS_PATH = path.join(__dirname, '.processed_ids.json'); +const DEDUP_MAX_SIZE = 500; +let processedIds = new Set(); +try { + const raw = fs.readFileSync(PROCESSED_IDS_PATH, 'utf8'); + const arr = JSON.parse(raw); + if (Array.isArray(arr)) processedIds = new Set(arr.slice(-DEDUP_MAX_SIZE)); +} catch (_) {} + +function markProcessed(msgId) { + processedIds.add(msgId); + // Trim to max size + if (processedIds.size > DEDUP_MAX_SIZE) { + const arr = [...processedIds]; + processedIds = new Set(arr.slice(-Math.floor(DEDUP_MAX_SIZE * 0.8))); + } + // Persist async (non-blocking) + fs.writeFile(PROCESSED_IDS_PATH, JSON.stringify([...processedIds]), () => {}); +} + +// --------------------------------------------------------------------------- +// Media download & serving — allows forwarding WhatsApp images to OpenFang +// --------------------------------------------------------------------------- +const MEDIA_DIR = path.join(__dirname, 'media_cache'); +const MEDIA_MAX_AGE_MS = 30 * 60_000; // 30 minutes +if (!fs.existsSync(MEDIA_DIR)) fs.mkdirSync(MEDIA_DIR, { recursive: true }); + +// Periodic cleanup of expired media files +setInterval(() => { + try { + const now = Date.now(); + for (const f of fs.readdirSync(MEDIA_DIR)) { + const fp = path.join(MEDIA_DIR, f); + const stat = fs.statSync(fp); + if (now - stat.mtimeMs > MEDIA_MAX_AGE_MS) { + fs.unlinkSync(fp); + } + } + } catch (_) {} +}, 5 * 60_000); + +const MEDIA_TYPE_MAP = { + imageMessage: { ext: 'jpg', label: 'Photo' }, + videoMessage: { ext: 'mp4', label: 'Video' }, + stickerMessage: { ext: 'webp', label: 'Sticker' }, + audioMessage: { ext: 'ogg', label: 'Audio' }, + documentMessage: { ext: null, label: 'Document' }, +}; + +/** + * Download media from a WhatsApp message, save to disk, return local URL. + * Returns { url, label, caption } or null on failure. 
+ */ +async function downloadMedia(msg) { + const m = msg.message; + if (!m) return null; + + for (const [key, info] of Object.entries(MEDIA_TYPE_MAP)) { + const media = m[key]; + if (!media) continue; + + try { + const buffer = await downloadMediaMessage(msg, 'buffer', {}); + const ext = info.ext || (media.fileName?.split('.').pop()) || 'bin'; + const filename = `${msg.key.id || randomUUID()}.${ext}`; + const filePath = path.join(MEDIA_DIR, filename); + fs.writeFileSync(filePath, buffer); + + const caption = media.caption || null; + const localUrl = `http://127.0.0.1:${PORT}/media/${filename}`; + + log('info', `Downloaded ${info.label} (${buffer.length} bytes) → ${filename}`); + return { url: localUrl, label: info.label, caption }; + } catch (err) { + log('error', `Media download failed (${key}): ${err.message}`); + return null; + } + } + return null; +} + +// Per-sender message debounce: accumulates rapid messages and sends as one batch. +// Media messages are buffered IMMEDIATELY (before download) so the debounce timer +// starts on arrival, not after the slow media download finishes.
+const DEBOUNCE_MS = 5_000; // 5 seconds of silence before flushing +const DEBOUNCE_MEDIA_MS = 15_000; // 15 seconds when batch contains media (image uploads are slow) +const senderBuffers = new Map(); // senderJid → { entries: [], timer, replyJid, isGroup, groupJid, wasMentioned, phone, pushName } + +function debounceMessage(senderJid, msgData) { + let buf = senderBuffers.get(senderJid); + if (!buf) { + buf = { entries: [], timer: null, replyJid: null, isGroup: false, groupJid: null, wasMentioned: false }; + senderBuffers.set(senderJid, buf); + } + + // Each entry is either a resolved string or a Promise (for pending media downloads) + buf.entries.push(msgData.textOrPromise); + buf.replyJid = msgData.replyJid; + buf.isGroup = msgData.isGroup; + buf.groupJid = msgData.groupJid; + buf.phone = msgData.phone; + buf.pushName = msgData.pushName; + if (msgData.wasMentioned) buf.wasMentioned = true; + + // Reset the timer on each new message + // Use longer debounce when batch contains media (WhatsApp uploads images one at a time with delays) + const hasMedia = buf.entries.some(e => e && typeof e.then === 'function'); + const effectiveDebounce = hasMedia ? DEBOUNCE_MEDIA_MS : DEBOUNCE_MS; + if (buf.timer) clearTimeout(buf.timer); + buf.timer = setTimeout(() => flushSenderBuffer(senderJid), effectiveDebounce); + + log('info', `Debounce: buffered msg from ${msgData.pushName} (${buf.entries.length} in batch, flushing in ${effectiveDebounce}ms${hasMedia ? 
' [media]' : ''})`); +} + +async function flushSenderBuffer(senderJid) { + const buf = senderBuffers.get(senderJid); + if (!buf || buf.entries.length === 0) return; + senderBuffers.delete(senderJid); + + // Resolve any pending media download promises + const resolved = await Promise.all(buf.entries); + // Filter out nulls (failed downloads with no text) + const texts = resolved.filter(Boolean); + if (texts.length === 0) return; + + const combinedText = texts.join('\n'); + const count = texts.length; + log('info', `Debounce flush: ${count} message(s) from ${buf.pushName} → single OpenFang call`); + + try { + await handleIncoming(combinedText, buf.phone, buf.pushName, buf.replyJid, buf.isGroup, buf.groupJid, buf.wasMentioned); + } catch (err) { + log('error', `Debounce flush failed for ${buf.pushName}: ${err.message}`); + } +} + +// --------------------------------------------------------------------------- +// Logging +// --------------------------------------------------------------------------- +const log = (level, msg) => { + const ts = new Date().toISOString(); + console[level === 'error' ? 'error' : 'log'](`[gateway] [${ts}] ${msg}`); +}; + +// --------------------------------------------------------------------------- +// Markdown → WhatsApp formatting +// --------------------------------------------------------------------------- +function markdownToWhatsApp(text) { + if (!text) return text; + // Bold: **text** or __text__ → *text* + text = text.replace(/\*\*(.+?)\*\*/g, '*$1*'); + text = text.replace(/__(.+?)__/g, '*$1*'); + // Italic: *text* (single) or _text_ → _text_ (WhatsApp italic) + // Be careful not to convert already-bold markers + text = text.replace(/(? 
{ + reconnectTimer = null; + try { + await startConnection(); + } catch (err) { + log('error', `Reconnect failed: ${err.message}`); + scheduleReconnect('reconnect-error'); + } + }, delay); +} // --------------------------------------------------------------------------- // Baileys connection // --------------------------------------------------------------------------- async function startConnection() { - const logger = pino({ level: 'warn' }); - const authDir = path.join(__dirname, 'auth_store'); + cleanupSocket(); + const logger = pino({ level: 'info' }); + const authDir = path.join(__dirname, 'auth_store'); const { state, saveCreds } = await useMultiFileAuthState(authDir); const { version } = await fetchLatestBaileysVersion(); @@ -47,152 +271,358 @@ async function startConnection() { connStatus = 'disconnected'; statusMessage = 'Connecting...'; + log('info', `Starting connection (Baileys v${version.join('.')})`); + sock = makeWASocket({ version, auth: state, logger, - printQRInTerminal: true, - browser: ['OpenFang', 'Desktop', '1.0.0'], + browser: Browsers.ubuntu('Chrome'), + keepAliveIntervalMs: 25_000, + connectTimeoutMs: 20_000, + retryRequestDelayMs: 250, + markOnlineOnConnect: true, + defaultQueryTimeoutMs: 60_000, + emitOwnEvents: false, + fireInitQueries: true, + syncFullHistory: false, + generateHighQualityLinkPreview: false, + getMessage: async () => undefined, }); - // Save credentials whenever they update - sock.ev.on('creds.update', saveCreds); - - // Connection state changes (QR code, connected, disconnected) - sock.ev.on('connection.update', async (update) => { - const { connection, lastDisconnect, qr } = update; - - if (qr) { - // New QR code generated — convert to data URL - try { - qrDataUrl = await QRCode.toDataURL(qr, { width: 256, margin: 2 }); - connStatus = 'qr_ready'; - qrExpired = false; - statusMessage = 'Scan this QR code with WhatsApp → Linked Devices'; - console.log('[gateway] QR code ready — waiting for scan'); - } catch (err) { - 
console.error('[gateway] QR generation failed:', err.message); - } + // ------------------------------------------------------------------ + // Use sock.ev.process() — the canonical Baileys v6 event API. + // This receives consolidated event batches AFTER the internal + // buffer is flushed, avoiding the "events stuck in buffer" problem. + // ------------------------------------------------------------------ + evProcessUnsub = sock.ev.process(async (events) => { + // Credentials update + if (events['creds.update']) { + await saveCreds(); + } + + // Connection state + if (events['connection.update']) { + await handleConnectionUpdate(events['connection.update']); } - if (connection === 'close') { - const statusCode = lastDisconnect?.error?.output?.statusCode; - const reason = lastDisconnect?.error?.output?.payload?.message || 'unknown'; - console.log(`[gateway] Connection closed: ${reason} (${statusCode})`); - - if (statusCode === DisconnectReason.loggedOut) { - // User logged out from phone — clear auth and stop (truly non-recoverable) - connStatus = 'disconnected'; - statusMessage = 'Logged out. 
Generate a new QR code to reconnect.'; - qrDataUrl = ''; - sock = null; - reconnectAttempt = 0; - // Remove auth store so next connect gets a fresh QR - const authPath = path.join(__dirname, 'auth_store'); - if (fs.existsSync(authPath)) { - fs.rmSync(authPath, { recursive: true, force: true }); + // Incoming messages + if (events['messages.upsert']) { + const { messages, type } = events['messages.upsert']; + log('info', `messages.upsert event: ${messages.length} message(s), type=${type}`); + + if (type !== 'notify') { + log('info', `Skipping non-notify batch (type=${type})`); + return; + } + + for (const msg of messages) { + if (msg.key.fromMe) { + log('info', `Skipping own message ${msg.key.id}`); + continue; + } + if (msg.key.remoteJid === 'status@broadcast') continue; + + // --- Deduplication: skip already-processed messages --- + const msgId = msg.key.id; + if (processedIds.has(msgId)) { + log('info', `Skipping duplicate message ${msgId}`); + continue; + } + + const remoteJid = msg.key.remoteJid || ''; + const isGroup = remoteJid.endsWith('@g.us'); + + // In groups, the actual sender is in msg.key.participant; + // in DMs, the sender is remoteJid itself. + const sender = isGroup + ? (msg.key.participant || remoteJid) + : remoteJid; + + let text = + msg.message?.conversation || + msg.message?.extendedTextMessage?.text || + msg.message?.imageMessage?.caption || + msg.message?.videoMessage?.caption || + ''; + + // Detect if this is a media message that needs async download + let mediaPromise = null; + const hasMediaKey = !text && Object.keys(MEDIA_TYPE_MAP).some(k => msg.message?.[k]); + + if (hasMediaKey) { + // Start download in background — do NOT await here. + // The promise resolves to a text string (or null on failure). + mediaPromise = downloadMedia(msg).then(media => { + if (!media) return null; + return media.caption + ? 
`[${media.label}: ${media.url}]\n${media.caption}` + : `[${media.label}: ${media.url}]`; + }).catch(err => { + log('error', `Async media download failed: ${err.message}`); + return null; + }); + } + + // vCard / contact message support + if (!text && !hasMediaKey && msg.message?.contactMessage) { + const vc = msg.message.contactMessage; + const vcardStr = vc.vcard || ''; + // Extract name from displayName or vCard FN field + const contactName = vc.displayName || (vcardStr.match(/FN:(.*)/)?.[1]?.trim()) || 'Sconosciuto'; + // Extract phone numbers from TEL fields + const phones = [...vcardStr.matchAll(/TEL[^:]*:([\d\s+\-().]+)/g)].map(m => m[1].trim()); + text = `[Contatto condiviso] ${contactName}` + (phones.length ? ` — ${phones.join(', ')}` : ''); + log('info', `vCard received from ${sender}: ${contactName}, phones: ${phones.join(', ')}`); + } + + // Multi-contact message (contactsArrayMessage) + if (!text && !hasMediaKey && msg.message?.contactsArrayMessage) { + const contacts = msg.message.contactsArrayMessage.contacts || []; + const entries = contacts.map(c => { + const vcardStr = c.vcard || ''; + const name = c.displayName || (vcardStr.match(/FN:(.*)/)?.[1]?.trim()) || '?'; + const phones = [...vcardStr.matchAll(/TEL[^:]*:([\d\s+\-().]+)/g)].map(m => m[1].trim()); + return `${name}${phones.length ? 
` (${phones.join(', ')})` : ''}`; + }); + text = `[Contatti condivisi] ${entries.join('; ')}`; + log('info', `Multi-vCard received from ${sender}: ${entries.length} contacts`); + } + + if (!text && !mediaPromise) { + log('info', `No text content in message from ${sender} (stub=${msg.messageStubType || 'none'})`); + continue; + } + + // Mark as processed BEFORE forwarding (prevents re-processing on decrypt retry) + markProcessed(msgId); + + const phone = '+' + sender.replace(/@.*$/, ''); + const pushName = msg.pushName || phone; + + // Detect @mention of the bot in groups + let wasMentioned = false; + if (isGroup && sock?.user?.id) { + const botJid = sock.user.id.replace(/:\d+@/, '@'); // normalize "123:45@s.whatsapp.net" → "123@s.whatsapp.net" + const mentionedJids = msg.message?.extendedTextMessage?.contextInfo?.mentionedJid || []; + wasMentioned = mentionedJids.includes(botJid) || mentionedJids.includes(sock.user.id); + // Also check raw text for @phone patterns + if (!wasMentioned) { + const botPhone = botJid.replace(/@.*$/, ''); + wasMentioned = (text || '').includes(`@${botPhone}`); + } + } + + const groupLabel = isGroup ? ` [group:${remoteJid}]` : ''; + log('info', `Incoming from ${pushName} (${phone})${groupLabel}: ${(text || '[media downloading]').substring(0, 120)}`); + + // Read receipt (blue ticks) + try { + if (sock) await sock.readMessages([msg.key]); + } catch (err) { + log('error', `Read receipt failed: ${err.message}`); } - } else { - // All other disconnect reasons are recoverable — reconnect with backoff - // Covers: restartRequired(515), timedOut(408), connectionClosed(428), - // connectionLost(408), connectionReplaced(440), badSession(500), etc. 
- reconnectAttempt++; - const delay = Math.min(1000 * Math.pow(2, reconnectAttempt - 1), MAX_RECONNECT_DELAY); - console.log(`[gateway] Reconnecting in ${delay}ms (attempt ${reconnectAttempt})...`); - statusMessage = `Reconnecting (attempt ${reconnectAttempt})...`; - connStatus = 'disconnected'; - setTimeout(() => startConnection(), delay); + + // In groups, reply to the group; in DMs, reply to the individual. + const replyJid = isGroup ? remoteJid : sender; + + // Debounce: accumulate rapid messages from same sender, flush after 5s of silence. + // For media messages, pass the download promise so debounce starts NOW (not after download). + const textOrPromise = mediaPromise || text; + debounceMessage(sender, { textOrPromise, phone, pushName, replyJid, isGroup, groupJid: remoteJid, wasMentioned }); } } + }); + + // ------------------------------------------------------------------ + // Safety net: periodic buffer flush every 3 seconds. + // In theory processNodeWithBuffer already flushes, but if a code + // path inside Baileys activates the buffer without flushing, this + // ensures events don't get stuck forever. 
+ // ------------------------------------------------------------------ + flushInterval = setInterval(() => { + try { + if (sock?.ev?.flush) sock.ev.flush(); + } catch (_) {} + }, 3_000); + + log('info', 'Event handlers registered via sock.ev.process()'); +} - if (connection === 'open') { - connStatus = 'connected'; +// --------------------------------------------------------------------------- +// Handle connection updates +// --------------------------------------------------------------------------- +async function handleConnectionUpdate(update) { + const { connection, lastDisconnect, qr } = update; + + if (qr) { + try { + qrDataUrl = await QRCode.toDataURL(qr, { width: 256, margin: 2 }); + connStatus = 'qr_ready'; qrExpired = false; - qrDataUrl = ''; - reconnectAttempt = 0; - statusMessage = 'Connected to WhatsApp'; - console.log('[gateway] Connected to WhatsApp!'); + statusMessage = 'Scan QR with WhatsApp > Linked Devices'; + log('info', 'QR code ready'); + } catch (err) { + log('error', `QR generation failed: ${err.message}`); } - }); + } - // Incoming messages → forward to OpenFang - sock.ev.on('messages.upsert', async ({ messages, type }) => { - if (type !== 'notify') return; - - for (const msg of messages) { - // Skip messages from self and status broadcasts - if (msg.key.fromMe) continue; - if (msg.key.remoteJid === 'status@broadcast') continue; - - const remoteJid = msg.key.remoteJid || ''; - const isGroup = remoteJid.endsWith('@g.us'); - - let text = msg.message?.conversation - || msg.message?.extendedTextMessage?.text - || msg.message?.imageMessage?.caption - || ''; - - // Detect media type if no text - if (!text) { - const m = msg.message; - if (m?.imageMessage) text = '[Image received]' + (m.imageMessage.caption ? ': ' + m.imageMessage.caption : ''); - else if (m?.audioMessage) text = '[Voice note received]'; - else if (m?.videoMessage) text = '[Video received]' + (m.videoMessage.caption ? 
': ' + m.videoMessage.caption : ''); - else if (m?.documentMessage) text = '[Document received: ' + (m.documentMessage.fileName || 'file') + ']'; - else if (m?.stickerMessage) text = '[Sticker received]'; - else continue; // Only skip truly empty messages + if (connection === 'open') { + connStatus = 'connected'; + qrExpired = false; + qrDataUrl = ''; + reconnectAttempt = 0; + connectedSince = Date.now(); + statusMessage = 'Connected to WhatsApp'; + log('info', 'Connected to WhatsApp!'); + + // TCP keepalive — prevents silent network deaths in containers + try { + const rawSocket = sock?.ws?.socket?._socket; + if (rawSocket && typeof rawSocket.setKeepAlive === 'function') { + rawSocket.setKeepAlive(true, 10_000); + log('info', 'TCP keepalive enabled'); } + } catch (_) {} - // For groups: real sender is in participant; for DMs: it's remoteJid - const senderJid = isGroup ? (msg.key.participant || '') : remoteJid; - const phone = '+' + senderJid.replace(/@.*$/, ''); - const pushName = msg.pushName || phone; - - const metadata = { - channel: 'whatsapp', - sender: phone, - sender_name: pushName, - }; - if (isGroup) { - metadata.group_jid = remoteJid; - metadata.is_group = true; - console.log(`[gateway] Group msg from ${pushName} (${phone}) in ${remoteJid}: ${text.substring(0, 80)}`); - } else { - console.log(`[gateway] Incoming from ${pushName} (${phone}): ${text.substring(0, 80)}`); - } + flushPendingReplies(); + } - // Forward to OpenFang agent - try { - const response = await forwardToOpenFang(text, phone, pushName, metadata); - if (response && sock) { - // Reply in the same context: group → group, DM → DM - const replyJid = isGroup ? remoteJid : senderJid.replace(/@.*$/, '') + '@s.whatsapp.net'; - await sock.sendMessage(replyJid, { text: response }); - console.log(`[gateway] Replied to ${pushName}${isGroup ? 
' in group ' + remoteJid : ''}`); - } - } catch (err) { - console.error(`[gateway] Forward/reply failed:`, err.message); + if (connection === 'close') { + const statusCode = lastDisconnect?.error?.output?.statusCode; + const reason = lastDisconnect?.error?.output?.payload?.message || 'unknown'; + const uptime = connectedSince ? Math.round((Date.now() - connectedSince) / 1000) : 0; + log('info', `Closed after ${uptime}s — ${reason} (code: ${statusCode})`); + + if (statusCode === DisconnectReason.loggedOut) { + connStatus = 'disconnected'; + statusMessage = 'Logged out. POST /login/start to reconnect.'; + qrDataUrl = ''; + cleanupSocket(); + reconnectAttempt = 0; + const authPath = path.join(__dirname, 'auth_store'); + if (fs.existsSync(authPath)) { + fs.rmSync(authPath, { recursive: true, force: true }); } + log('info', 'Auth cleared'); + } else if (statusCode === DisconnectReason.connectionReplaced) { + log('info', 'Connection replaced — backing off'); + reconnectAttempt = Math.max(reconnectAttempt, 3); + scheduleReconnect('connection-replaced'); + } else { + scheduleReconnect(reason); + } + } +} + +// --------------------------------------------------------------------------- +// Handle incoming → OpenFang → reply +// --------------------------------------------------------------------------- +async function handleIncoming(text, phone, pushName, replyJid, isGroup, groupJid, wasMentioned) { + let response; + try { + response = await forwardToOpenFang(text, phone, pushName, isGroup, groupJid, wasMentioned); + } catch (err) { + log('error', `OpenFang error for ${pushName}: ${err.message}`); + return; + } + if (!response) { + log('info', `No response from OpenFang for ${pushName}`); + return; + } + + // Convert markdown formatting to WhatsApp-native formatting + response = markdownToWhatsApp(response); + + if (sock && connStatus === 'connected') { + try { + await sock.sendMessage(replyJid, { text: response }); + const target = isGroup ? 
`group ${groupJid}` : pushName; + log('info', `Replied to ${target} (${response.length} chars)`); + return; + } catch (err) { + log('error', `Send failed for ${pushName}: ${err.message}`); + } + } + + log('info', `Buffering reply for ${pushName}`); + pendingReplies.set(replyJid, { text: response, timestamp: Date.now() }); +} + +// --------------------------------------------------------------------------- +// Flush pending replies +// --------------------------------------------------------------------------- +async function flushPendingReplies() { + if (pendingReplies.size === 0) return; + const maxAge = 5 * 60_000; + const now = Date.now(); + + for (const [jid, { text, timestamp }] of pendingReplies) { + pendingReplies.delete(jid); + if (now - timestamp > maxAge) { + log('info', `Discarding stale reply for ${jid}`); + continue; } + try { + await sock.sendMessage(jid, { text }); + log('info', `Flushed reply to ${jid}`); + } catch (err) { + log('error', `Flush failed for ${jid}: ${err.message}`); + } + } +} + +// --------------------------------------------------------------------------- +// Resolve agent name → UUID +// --------------------------------------------------------------------------- +async function resolveAgentUUID(nameOrUUID) { + if (/^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i.test(nameOrUUID)) { + return nameOrUUID; + } + if (AGENT_UUID_CACHE.has(nameOrUUID)) { + return AGENT_UUID_CACHE.get(nameOrUUID); + } + return new Promise((resolve, reject) => { + http.get(`${OPENFANG_URL}/api/agents`, (res) => { + let body = ''; + res.on('data', (chunk) => (body += chunk)); + res.on('end', () => { + try { + const agents = JSON.parse(body); + const agent = agents.find((a) => a.name === nameOrUUID); + if (agent) { + AGENT_UUID_CACHE.set(nameOrUUID, agent.id); + resolve(agent.id); + } else { + reject(new Error(`Agent "${nameOrUUID}" not found`)); + } + } catch (e) { + reject(new Error(`Parse agents failed: ${e.message}`)); + } + }); + 
}).on('error', reject); }); } // --------------------------------------------------------------------------- -// Forward incoming message to OpenFang API, return agent response +// Forward to OpenFang // --------------------------------------------------------------------------- -function forwardToOpenFang(text, phone, pushName, metadata) { +async function forwardToOpenFang(text, phone, pushName, isGroup, groupJid, wasMentioned) { + const agentId = await resolveAgentUUID(DEFAULT_AGENT); return new Promise((resolve, reject) => { - const payload = JSON.stringify({ + const body = { message: text, - metadata: metadata || { - channel: 'whatsapp', - sender: phone, - sender_name: pushName, - }, - }); - - const url = new URL(`${OPENFANG_URL}/api/agents/${encodeURIComponent(DEFAULT_AGENT)}/message`); - + sender_id: phone, + sender_name: pushName, + channel_type: 'whatsapp', + }; + if (isGroup) { + body.is_group = true; + body.group_id = groupJid; + body.was_mentioned = wasMentioned; + } + const payload = JSON.stringify(body); + const url = new URL(`${OPENFANG_URL}/api/agents/${agentId}/message`); const req = http.request( { hostname: url.hostname, @@ -203,7 +633,7 @@ function forwardToOpenFang(text, phone, pushName, metadata) { 'Content-Type': 'application/json', 'Content-Length': Buffer.byteLength(payload), }, - timeout: 120_000, // LLM calls can be slow + timeout: 300_000, }, (res) => { let body = ''; @@ -211,7 +641,6 @@ function forwardToOpenFang(text, phone, pushName, metadata) { res.on('end', () => { try { const data = JSON.parse(body); - // The /api/agents/{id}/message endpoint returns { response: "..." 
} resolve(data.response || data.message || data.text || ''); } catch { resolve(body.trim() || ''); @@ -219,28 +648,19 @@ function forwardToOpenFang(text, phone, pushName, metadata) { }); }, ); - req.on('error', reject); - req.on('timeout', () => { - req.destroy(); - reject(new Error('OpenFang API timeout')); - }); + req.on('timeout', () => { req.destroy(); reject(new Error('OpenFang timeout')); }); req.write(payload); req.end(); }); } // --------------------------------------------------------------------------- -// Send a message via Baileys (called by OpenFang for outgoing) +// Send outgoing message // --------------------------------------------------------------------------- async function sendMessage(to, text) { - if (!sock || connStatus !== 'connected') { - throw new Error('WhatsApp not connected'); - } - - // If already a full JID (group or user), use as-is; otherwise normalize phone → JID - const jid = to.includes('@') ? to : to.replace(/^\+/, '') + '@s.whatsapp.net'; - + if (!sock || connStatus !== 'connected') throw new Error('WhatsApp not connected'); + const jid = to.replace(/^\+/, '').replace(/@.*$/, '') + '@s.whatsapp.net'; await sock.sendMessage(jid, { text }); } @@ -252,11 +672,8 @@ function parseBody(req) { let body = ''; req.on('data', (chunk) => (body += chunk)); req.on('end', () => { - try { - resolve(body ? JSON.parse(body) : {}); - } catch (e) { - reject(new Error('Invalid JSON')); - } + try { resolve(body ? 
JSON.parse(body) : {}); } + catch (e) { reject(new Error('Invalid JSON')); } }); req.on('error', reject); }); @@ -273,7 +690,6 @@ function jsonResponse(res, status, data) { } const server = http.createServer(async (req, res) => { - // CORS preflight if (req.method === 'OPTIONS') { res.writeHead(204, { 'Access-Control-Allow-Origin': '*', @@ -287,101 +703,91 @@ const server = http.createServer(async (req, res) => { const pathname = url.pathname; try { - // POST /login/start — start Baileys connection, return QR if (req.method === 'POST' && pathname === '/login/start') { - // If already connected, just return success if (connStatus === 'connected') { return jsonResponse(res, 200, { - qr_data_url: '', - session_id: sessionId, - message: 'Already connected to WhatsApp', - connected: true, + qr_data_url: '', session_id: sessionId, + message: 'Already connected', connected: true, }); } - - // Start a new connection (resets any existing) await startConnection(); - - // Wait briefly for QR to generate (Baileys emits it quickly) let waited = 0; while (!qrDataUrl && connStatus !== 'connected' && waited < 15_000) { await new Promise((r) => setTimeout(r, 300)); waited += 300; } - return jsonResponse(res, 200, { - qr_data_url: qrDataUrl, - session_id: sessionId, - message: statusMessage, - connected: connStatus === 'connected', + qr_data_url: qrDataUrl, session_id: sessionId, + message: statusMessage, connected: connStatus === 'connected', }); } - // GET /login/status — poll for connection status if (req.method === 'GET' && pathname === '/login/status') { return jsonResponse(res, 200, { connected: connStatus === 'connected', message: statusMessage, expired: qrExpired, + uptime: connectedSince ? 
Math.round((Date.now() - connectedSince) / 1000) : 0, }); } - // POST /message/send — send outgoing message via Baileys if (req.method === 'POST' && pathname === '/message/send') { const body = await parseBody(req); - const { to, text } = body; + if (!body.to || !body.text) return jsonResponse(res, 400, { error: 'Missing "to" or "text"' }); + await sendMessage(body.to, body.text); + return jsonResponse(res, 200, { success: true, message: 'Sent' }); + } - if (!to || !text) { - return jsonResponse(res, 400, { error: 'Missing "to" or "text" field' }); + // Serve cached media files + if (req.method === 'GET' && pathname.startsWith('/media/')) { + const filename = path.basename(pathname); + const filePath = path.join(MEDIA_DIR, filename); + if (fs.existsSync(filePath)) { + const ext = path.extname(filename).slice(1); + const mimeMap = { jpg: 'image/jpeg', jpeg: 'image/jpeg', png: 'image/png', webp: 'image/webp', mp4: 'video/mp4', ogg: 'audio/ogg', pdf: 'application/pdf' }; + const contentType = mimeMap[ext] || 'application/octet-stream'; + const data = fs.readFileSync(filePath); + res.writeHead(200, { 'Content-Type': contentType, 'Content-Length': data.length }); + return res.end(data); } - - await sendMessage(to, text); - return jsonResponse(res, 200, { success: true, message: 'Sent' }); + return jsonResponse(res, 404, { error: 'Media not found' }); } - // GET /health — health check if (req.method === 'GET' && pathname === '/health') { return jsonResponse(res, 200, { status: 'ok', connected: connStatus === 'connected', session_id: sessionId || null, + uptime: connectedSince ? 
Math.round((Date.now() - connectedSince) / 1000) : 0, + pending_replies: pendingReplies.size, }); } - // 404 jsonResponse(res, 404, { error: 'Not found' }); } catch (err) { - console.error(`[gateway] ${req.method} ${pathname} error:`, err.message); + log('error', `${req.method} ${pathname}: ${err.message}`); jsonResponse(res, 500, { error: err.message }); } }); +// --------------------------------------------------------------------------- +// Start +// --------------------------------------------------------------------------- server.listen(PORT, '127.0.0.1', () => { - console.log(`[gateway] WhatsApp Web gateway listening on http://127.0.0.1:${PORT}`); - console.log(`[gateway] OpenFang URL: ${OPENFANG_URL}`); - console.log(`[gateway] Default agent: ${DEFAULT_AGENT}`); + log('info', `Listening on http://127.0.0.1:${PORT}`); + log('info', `OpenFang: ${OPENFANG_URL} | Agent: ${DEFAULT_AGENT}`); - // Auto-connect if credentials already exist from a previous session const credsPath = path.join(__dirname, 'auth_store', 'creds.json'); if (fs.existsSync(credsPath)) { - console.log('[gateway] Found existing credentials — auto-connecting...'); + log('info', 'Credentials found — auto-connecting...'); startConnection().catch((err) => { - console.error('[gateway] Auto-connect failed:', err.message); - statusMessage = 'Auto-connect failed. Use POST /login/start to retry.'; + log('error', `Auto-connect failed: ${err.message}`); + statusMessage = 'Auto-connect failed. POST /login/start to retry.'; }); } else { - console.log('[gateway] No credentials found. Waiting for POST /login/start to begin QR flow...'); + log('info', 'No credentials. 
POST /login/start for QR flow.'); } }); -// Graceful shutdown -process.on('SIGINT', () => { - console.log('\n[gateway] Shutting down...'); - if (sock) sock.end(); - server.close(() => process.exit(0)); -}); - -process.on('SIGTERM', () => { - if (sock) sock.end(); - server.close(() => process.exit(0)); -}); +process.on('SIGINT', () => { log('info', 'SIGINT'); cleanupSocket(); server.close(() => process.exit(0)); }); +process.on('SIGTERM', () => { log('info', 'SIGTERM'); cleanupSocket(); server.close(() => process.exit(0)); });