From ff32e0281b061a6b7920f7891cd9f0f78afdf382 Mon Sep 17 00:00:00 2001 From: dimkk Date: Sun, 1 Mar 2026 20:27:01 +0300 Subject: [PATCH 1/7] feat: add codex provider support and forwarding workflow --- README.md | 113 +++-- src/ccbot/bot.py | 433 ++++++++++++++++---- src/ccbot/codex_mapper.py | 281 +++++++++++++ src/ccbot/config.py | 83 +++- src/ccbot/handlers/message_queue.py | 161 ++++++-- src/ccbot/handlers/status_polling.py | 15 +- src/ccbot/main.py | 50 ++- src/ccbot/port_forward.py | 231 +++++++++++ src/ccbot/session.py | 84 +++- src/ccbot/session_monitor.py | 160 ++++++-- src/ccbot/tmux_manager.py | 6 +- src/ccbot/transcript_parser.py | 351 +++++++++++++++- tests/ccbot/handlers/test_message_queue.py | 29 ++ tests/ccbot/test_codex_mapper.py | 151 +++++++ tests/ccbot/test_config.py | 41 ++ tests/ccbot/test_main.py | 19 + tests/ccbot/test_port_forward.py | 49 +++ tests/ccbot/test_session.py | 68 ++- tests/ccbot/test_transcript_parser_codex.py | 170 ++++++++ 19 files changed, 2296 insertions(+), 199 deletions(-) create mode 100644 src/ccbot/codex_mapper.py create mode 100644 src/ccbot/port_forward.py create mode 100644 tests/ccbot/handlers/test_message_queue.py create mode 100644 tests/ccbot/test_codex_mapper.py create mode 100644 tests/ccbot/test_main.py create mode 100644 tests/ccbot/test_port_forward.py create mode 100644 tests/ccbot/test_transcript_parser_codex.py diff --git a/README.md b/README.md index e7ee01dc..d81e212c 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [中文文档](README_CN.md) [Русская документация](README_RU.md) -Control Claude Code sessions remotely via Telegram — monitor, interact, and manage AI coding sessions running in tmux. +Control AI coding sessions remotely via Telegram — monitor, interact, and manage agent sessions running in tmux. https://github.com/user-attachments/assets/15ffb38e-5eb9-4720-93b9-412e4961dc93 @@ -23,23 +23,26 @@ In fact, CCBot itself was built this way — iterating on itself through Claude ## Features -- **Topic-based sessions** — Each Telegram topic maps 1:1 to a tmux window and Claude session +- **Topic-based sessions** — Each Telegram topic maps 1:1 to a tmux window and agent session - **Real-time notifications** — Get Telegram messages for assistant responses, thinking content, tool use/result, and local command output - **Interactive UI** — Navigate AskUserQuestion, ExitPlanMode, and Permission Prompts via inline keyboard - **Voice messages** — Voice messages are transcribed via OpenAI and forwarded as text -- **Send messages** — Forward text to Claude Code via tmux keystrokes -- **Slash command forwarding** — Send any `/command` directly to Claude Code (e.g. `/clear`, `/compact`, `/cost`) -- **Create new sessions** — Start Claude Code sessions from Telegram via directory browser +- **Send messages** — Forward text to the active CLI via tmux keystrokes +- **Slash command forwarding** — Optional forwarding for unknown `/command` to the active CLI +- **Create new sessions** — Start agent sessions from Telegram via directory browser - **Resume sessions** — Pick up where you left off by resuming an existing Claude session in a directory - **Kill sessions** — Close a topic to auto-kill the associated tmux window - **Message history** — Browse conversation history with pagination (newest first) -- **Hook-based session tracking** — Auto-associates tmux windows with Claude sessions via `SessionStart` hook +- **Provider support** — Claude Code (`claude`) and Codex CLI (`codex`) +- **Session tracking** — Claude via `SessionStart` hook, Codex via rollout mapper - **Persistent state** — Thread bindings and read offsets survive restarts ## Prerequisites - **tmux** — must be installed and available in PATH -- **Claude Code** — the CLI tool (`claude`) must be installed +- **Agent CLI** — install one or both: + - Claude Code (`claude`) + - Codex CLI (`codex`) ## Installation @@ -92,7 +95,12 @@ ALLOWED_USERS=your_telegram_user_id | ----------------------- | ---------- | ------------------------------------------------ | | `CCBOT_DIR` | `~/.ccbot` | Config/state directory (`.env` loaded from here) | | `TMUX_SESSION_NAME` | `ccbot` | Tmux session name | -| `CLAUDE_COMMAND` | `claude` | Command to run in new windows | +| `CCBOT_PROVIDER` | `claude` | Session provider: `claude` or `codex` | +| `CCBOT_AGENT_COMMAND` | provider default (`claude`/`codex`) | Command to run in new windows | +| `CLAUDE_COMMAND` | `claude` | Backward-compatible alias for `CCBOT_AGENT_COMMAND` | +| `CCBOT_CODEX_SESSIONS_PATH` | `~/.codex/sessions` | Codex rollout root path | +| `CCBOT_FORWARD_PORTS` | _(none)_ | Comma-separated local ports to expose publicly on startup (e.g. `3000,5173`) | +| `CCBOT_FORWARD_SLASH` | `true` | Forward unknown `/command` to CLI | | `MONITOR_POLL_INTERVAL` | `2.0` | Polling interval in seconds | | `CCBOT_SHOW_HIDDEN_DIRS` | `false` | Show hidden (dot) directories in directory browser | | `OPENAI_API_KEY` | _(none)_ | OpenAI API key for voice message transcription | @@ -104,10 +112,12 @@ There is no runtime formatter switch to MarkdownV2. > If running on a VPS where there's no interactive terminal to approve permissions, consider: > > ``` -> CLAUDE_COMMAND=IS_SANDBOX=1 claude --dangerously-skip-permissions +> CCBOT_AGENT_COMMAND=IS_SANDBOX=1 claude --dangerously-skip-permissions > ``` -## Hook Setup (Recommended) +## Session Tracking Setup + +### Claude Provider (Recommended) Auto-install via CLI: @@ -131,6 +141,10 @@ Or manually add to `~/.claude/settings.json`: This writes window-session mappings to `$CCBOT_DIR/session_map.json` (`~/.ccbot/` by default), so the bot automatically tracks which Claude session is running in each tmux window — even after `/clear` or session restarts. +### Codex Provider + +No hook installation is required. CCBot maps tmux windows to Codex rollout files from `~/.codex/sessions` (or `CCBOT_CODEX_SESSIONS_PATH`). + ## Usage ```bash @@ -141,6 +155,40 @@ ccbot uv run ccbot ``` +### Provider Switching + +Switch provider at startup: + +```bash +# Claude Code +CCBOT_PROVIDER=claude CCBOT_AGENT_COMMAND=claude uv run ccbot + +# Codex CLI +CCBOT_PROVIDER=codex CCBOT_AGENT_COMMAND=codex uv run ccbot +``` + +You can also place these values in `~/.ccbot/.env`. + +### Port Forwarding + +Expose local dev ports with CLI flags: + +```bash +uv run ccbot --forward 3000 +``` + +Multiple ports: + +```bash +uv run ccbot --forward 3000 --forward 5173 +``` + +Behavior: + +- On startup, CCBot creates tunnel(s), sends real public URLs to all allowed users, and pins that message. +- On shutdown, CCBot stops tunnel(s) and unpins the message. +- Tunnel provider priority: `ngrok` -> `cloudflared` -> `localhost.run (ssh)`. + ### Commands **Bot commands:** @@ -150,9 +198,9 @@ uv run ccbot | `/start` | Show welcome message | | `/history` | Message history for this topic | | `/screenshot` | Capture terminal screenshot | -| `/esc` | Send Escape to interrupt Claude | +| `/esc` | Send Escape to interrupt active session | -**Claude Code commands (forwarded via tmux):** +**Claude provider commands (forwarded via tmux):** | Command | Description | | ---------- | ---------------------------- | @@ -162,7 +210,7 @@ uv run ccbot | `/help` | Show Claude Code help | | `/memory` | Edit CLAUDE.md | -Any unrecognized `/command` is also forwarded to Claude Code as-is (e.g. `/review`, `/doctor`, `/init`). +For `CCBOT_PROVIDER=claude`, unknown `/command` is forwarded as-is (e.g. `/review`, `/doctor`, `/init`). ### Topic Workflow @@ -174,11 +222,11 @@ Any unrecognized `/command` is also forwarded to Claude Code as-is (e.g. `/revie 2. Send any message in the topic 3. A directory browser appears — select the project directory 4. If the directory has existing Claude sessions, a session picker appears — choose one to resume or start fresh -5. A tmux window is created, `claude` starts (with `--resume` if resuming), and your pending message is forwarded +5. A tmux window is created, agent command starts (with `--resume` when selecting a Claude session), and your pending message is forwarded **Sending messages:** -Once a topic is bound to a session, just send text or voice messages in that topic — text gets forwarded to Claude Code via tmux keystrokes, and voice messages are automatically transcribed and forwarded as text. +Once a topic is bound to a session, just send text or voice messages in that topic — text gets forwarded to the active CLI via tmux keystrokes, and voice messages are transcribed then forwarded as text. **Killing a session:** @@ -206,18 +254,14 @@ I'll look into the login bug... The monitor polls session JSONL files every 2 seconds and sends notifications for: -- **Assistant responses** — Claude's text replies +- **Assistant responses** — model text replies - **Thinking content** — Shown as expandable blockquotes - **Tool use/result** — Summarized with stats (e.g. "Read 42 lines", "Found 5 matches") - **Local command output** — stdout from commands like `git status`, prefixed with `❯ command_name` Notifications are delivered to the topic bound to the session's window. -Formatting note: -- Telegram messages are rendered with parse mode `HTML` using `chatgpt-md-converter` -- Long messages are split with HTML tag awareness to preserve code blocks and formatting - -## Running Claude Code in tmux +## Running in tmux ### Option 1: Create via Telegram (Recommended) @@ -230,20 +274,41 @@ Formatting note: ```bash tmux attach -t ccbot tmux new-window -n myproject -c ~/Code/myproject -# Then start Claude Code in the new window +# Then start your CLI in the new window claude +# or +codex +``` + +The window must be in the `ccbot` tmux session (configurable via `TMUX_SESSION_NAME`). Claude provider uses hooks for session mapping; Codex provider uses rollout mapping. + +## Resume Existing Session + +To attach CCBot to an existing Codex session, run: + +```bash +CCBOT_PROVIDER=codex \ +CCBOT_AGENT_COMMAND='codex resume ' \ +uv run ccbot ``` -The window must be in the `ccbot` tmux session (configurable via `TMUX_SESSION_NAME`). The hook will automatically register it in `session_map.json` when Claude starts. +Example: + +```bash +CCBOT_PROVIDER=codex \ +CCBOT_AGENT_COMMAND='codex resume 019c9eef-c5f7-7dc2-9e92-de59a1c3cd28' \ +uv run ccbot +``` ## Data Storage | Path | Description | | ------------------------------- | ----------------------------------------------------------------------- | | `$CCBOT_DIR/state.json` | Thread bindings, window states, display names, and per-user read offsets | -| `$CCBOT_DIR/session_map.json` | Hook-generated `{tmux_session:window_id: {session_id, cwd, window_name}}` mappings | +| `$CCBOT_DIR/session_map.json` | Provider mappings `{tmux_session:window_id: {session_id, cwd, ...}}` | | `$CCBOT_DIR/monitor_state.json` | Monitor byte offsets per session (prevents duplicate notifications) | | `~/.claude/projects/` | Claude Code session data (read-only) | +| `~/.codex/sessions/` | Codex rollout session logs (read-only) | ## File Structure diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index bf6e8009..422be268 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -1,17 +1,17 @@ """Telegram bot handlers — the main UI layer of CCBot. Registers all command/callback/message handlers and manages the bot lifecycle. -Each Telegram topic maps 1:1 to a tmux window (Claude session). +Each Telegram topic maps 1:1 to a tmux window (agent session). Core responsibilities: - Command handlers: /start, /history, /screenshot, /esc, /kill, /unbind, - plus forwarding unknown /commands to Claude Code via tmux. + plus forwarding unknown /commands to the active agent via tmux. - Callback query handler: directory browser, history pagination, interactive UI navigation, screenshot refresh. - Topic-based routing: each named topic binds to one tmux window. Unbound topics trigger the directory browser to create a new session. - Photo handling: photos sent by user are downloaded and forwarded - to Claude Code as file paths (photo_handler). + to the active agent as file paths (photo_handler). - Voice handling: voice messages are transcribed via OpenAI API and forwarded as text (voice_handler). - Automatic cleanup: closing a topic kills the associated window @@ -58,6 +58,7 @@ ) from .config import config +from .codex_mapper import codex_session_mapper from .handlers.callback_data import ( CB_ASK_DOWN, CB_ASK_ENTER, @@ -129,6 +130,7 @@ from .markdown_v2 import convert_markdown from .handlers.response_builder import build_response_parts from .handlers.status_polling import status_poll_loop +from .port_forward import PortForwardManager, PortTunnel from .screenshot import text_to_image from .session import session_manager from .session_monitor import NewMessage, SessionMonitor @@ -145,9 +147,12 @@ # Status polling task _status_poll_task: asyncio.Task | None = None +_port_forward_manager: PortForwardManager | None = None +_port_forward_task: asyncio.Task[None] | None = None +_forward_pin_message_ids: dict[int, int] = {} -# Claude Code commands shown in bot menu (forwarded via tmux) -CC_COMMANDS: dict[str, str] = { +# Claude commands shown in bot menu (forwarded via tmux) +CLAUDE_COMMANDS: dict[str, str] = { "clear": "↗ Clear conversation history", "compact": "↗ Compact conversation context", "cost": "↗ Show token/cost usage", @@ -157,6 +162,12 @@ } +def _provider_menu_commands() -> dict[str, str]: + if config.provider == "claude": + return CLAUDE_COMMANDS + return {} + + def is_user_allowed(user_id: int | None) -> bool: return user_id is not None and config.is_user_allowed(user_id) @@ -174,6 +185,50 @@ def _get_thread_id(update: Update) -> int | None: return tid +async def _ensure_private_window_binding(user_id: int) -> tuple[str | None, str]: + """Ensure private chat has a stable bound window (thread_id=0).""" + private_tid = 0 + bound = session_manager.get_window_for_thread(user_id, private_tid) + if bound: + w = await tmux_manager.find_window_by_id(bound) + if w: + return bound, "" + session_manager.unbind_thread(user_id, private_tid) + + # Reuse a stable named window if it already exists. + existing = await tmux_manager.find_window_by_name(config.tmux_session_name) + if existing: + session_manager.bind_thread( + user_id, + private_tid, + existing.window_id, + window_name=existing.window_name, + ) + return existing.window_id, "" + + # Otherwise create one in current cwd. + success, message, created_wname, created_wid = await tmux_manager.create_window( + str(Path.cwd()), + window_name=config.tmux_session_name, + ) + if not success: + return None, message + + if config.provider == "codex": + await codex_session_mapper.sync_session_map() + await session_manager.wait_for_session_map_entry( + created_wid, + timeout=10.0 if config.provider == "codex" else 5.0, + ) + session_manager.bind_thread( + user_id, + private_tid, + created_wid, + window_name=created_wname, + ) + return created_wid, "" + + # --- Command handlers --- @@ -189,7 +244,7 @@ async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N if update.message: await safe_reply( update.message, - "🤖 *Claude Code Monitor*\n\n" + f"🤖 *{config.agent_name} Monitor*\n\n" "Each topic is a session. Create a new topic to start.", ) @@ -248,7 +303,7 @@ async def screenshot_command( async def unbind_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Unbind this topic from its Claude session without killing the window.""" + """Unbind this topic from its session without killing the window.""" user = update.effective_user if not user or not is_user_allowed(user.id): return @@ -256,29 +311,34 @@ async def unbind_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> return thread_id = _get_thread_id(update) - if thread_id is None: - await safe_reply(update.message, "❌ This command only works in a topic.") - return + bind_tid = thread_id + if bind_tid is None: + chat = update.effective_chat + if chat and chat.type == "private": + bind_tid = 0 + else: + await safe_reply(update.message, "❌ This command only works in a topic.") + return - wid = session_manager.get_window_for_thread(user.id, thread_id) + wid = session_manager.get_window_for_thread(user.id, bind_tid) if not wid: await safe_reply(update.message, "❌ No session bound to this topic.") return display = session_manager.get_display_name(wid) - session_manager.unbind_thread(user.id, thread_id) - await clear_topic_state(user.id, thread_id, context.bot, context.user_data) + session_manager.unbind_thread(user.id, bind_tid) + await clear_topic_state(user.id, bind_tid, context.bot, context.user_data) await safe_reply( update.message, f"✅ Topic unbound from window '{display}'.\n" - "The Claude session is still running in tmux.\n" + f"The {config.agent_name} session is still running in tmux.\n" "Send a message to bind to a new session.", ) async def esc_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Send Escape key to interrupt Claude.""" + """Send Escape key to interrupt the active agent.""" user = update.effective_user if not user or not is_user_allowed(user.id): return @@ -303,7 +363,14 @@ async def esc_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Non async def usage_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Fetch Claude Code usage stats from TUI and send to Telegram.""" + """Fetch usage stats from TUI and send to Telegram.""" + if not config.supports_usage_command: + if update.message: + await safe_reply( + update.message, + f"❌ /usage is not supported for provider '{config.provider}'.", + ) + return user = update.effective_user if not user or not is_user_allowed(user.id): return @@ -321,7 +388,7 @@ async def usage_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N await safe_reply(update.message, f"Window '{wid}' no longer exists.") return - # Send /usage command to Claude Code TUI + # Send /usage command to TUI await tmux_manager.send_keys(w.window_id, "/usage") # Wait for the modal to render await asyncio.sleep(2.0) @@ -486,7 +553,7 @@ async def topic_edited_handler( async def forward_command_handler( update: Update, context: ContextTypes.DEFAULT_TYPE ) -> None: - """Forward any non-bot command as a slash command to the active Claude Code session.""" + """Forward non-bot slash commands to the active session.""" user = update.effective_user if not user or not is_user_allowed(user.id): return @@ -505,7 +572,18 @@ async def forward_command_handler( cmd_text = update.message.text or "" # The full text is already a slash command like "/clear" or "/compact foo" cc_slash = cmd_text.split("@")[0] # strip bot mention + if not config.forward_slash_commands: + await safe_reply( + update.message, + f"❌ Slash command forwarding is disabled for provider '{config.provider}'.", + ) + return wid = session_manager.resolve_window_for_thread(user.id, thread_id) + if wid is None and chat and chat.type == "private": + wid, err = await _ensure_private_window_binding(user.id) + if wid is None: + await safe_reply(update.message, f"❌ {err or 'Failed to create session'}") + return if not wid: await safe_reply(update.message, "❌ No session bound to this topic.") return @@ -526,7 +604,7 @@ async def forward_command_handler( await safe_reply(update.message, f"⚡ [{display}] Sent: {cc_slash}") # If /clear command was sent, clear the session association # so we can detect the new session after first message - if cc_slash.strip().lower() == "/clear": + if config.provider == "claude" and cc_slash.strip().lower() == "/clear": logger.info("Clearing session for window %s after /clear", display) session_manager.clear_window_session(wid) @@ -551,7 +629,8 @@ async def unsupported_content_handler( logger.debug("Unsupported content from user %d", user.id) await safe_reply( update.message, - "⚠ Only text, photo, and voice messages are supported. Stickers, video, and other media cannot be forwarded to Claude Code.", + f"⚠ Only text, photo, and voice messages are supported. " + f"Other media cannot be forwarded to {config.agent_name}.", ) @@ -561,7 +640,7 @@ async def unsupported_content_handler( async def photo_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: - """Handle photos sent by the user: download and forward path to Claude Code.""" + """Handle photos sent by the user: download and forward file path.""" user = update.effective_user if not user or not is_user_allowed(user.id): if update.message: @@ -573,29 +652,37 @@ async def photo_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N chat = update.message.chat thread_id = _get_thread_id(update) + bind_tid = thread_id or 0 if chat.type in ("group", "supergroup") and thread_id is not None: session_manager.set_group_chat_id(user.id, thread_id, chat.id) - # Must be in a named topic - if thread_id is None: + is_private_chat = chat.type == "private" + + if thread_id is None and not is_private_chat: await safe_reply( update.message, "❌ Please use a named topic. Create a new topic to start a session.", ) return - wid = session_manager.get_window_for_thread(user.id, thread_id) - if wid is None: - await safe_reply( - update.message, - "❌ No session bound to this topic. Send a text message first to create one.", - ) - return + if is_private_chat: + wid, err = await _ensure_private_window_binding(user.id) + if wid is None: + await safe_reply(update.message, f"❌ {err or 'Failed to create session'}") + return + else: + wid = session_manager.resolve_window_for_thread(user.id, thread_id) + if wid is None: + await safe_reply( + update.message, + "❌ No session bound to this topic. Send a text message first to create one.", + ) + return w = await tmux_manager.find_window_by_id(wid) if not w: display = session_manager.get_display_name(wid) - session_manager.unbind_thread(user.id, thread_id) + session_manager.unbind_thread(user.id, bind_tid) await safe_reply( update.message, f"❌ Window '{display}' no longer exists. Binding removed.\n" @@ -612,7 +699,7 @@ async def photo_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N file_path = _IMAGES_DIR / filename await tg_file.download_to_drive(file_path) - # Build the message to send to Claude Code + # Build the message to send to the agent caption = update.message.caption or "" if caption: text_to_send = f"{caption}\n\n(image attached: {file_path})" @@ -628,7 +715,83 @@ async def photo_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N return # Confirm to user - await safe_reply(update.message, "📷 Image sent to Claude Code.") + await safe_reply(update.message, f"📷 Image sent to {config.agent_name}.") + + +async def voice_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + """Handle voice messages: transcribe via OpenAI and forward text to Claude Code.""" + user = update.effective_user + if not user or not is_user_allowed(user.id): + if update.message: + await safe_reply(update.message, "You are not authorized to use this bot.") + return + + if not update.message or not update.message.voice: + return + + if not config.openai_api_key: + await safe_reply( + update.message, + "⚠ Voice transcription requires an OpenAI API key.\n" + "Set `OPENAI_API_KEY` in your `.env` file and restart the bot.", + ) + return + + chat = update.message.chat + thread_id = _get_thread_id(update) + if chat.type in ("group", "supergroup") and thread_id is not None: + session_manager.set_group_chat_id(user.id, thread_id, chat.id) + + if thread_id is None: + await safe_reply( + update.message, + "❌ Please use a named topic. Create a new topic to start a session.", + ) + return + + wid = session_manager.get_window_for_thread(user.id, thread_id) + if wid is None: + await safe_reply( + update.message, + "❌ No session bound to this topic. Send a text message first to create one.", + ) + return + + w = await tmux_manager.find_window_by_id(wid) + if not w: + display = session_manager.get_display_name(wid) + session_manager.unbind_thread(user.id, thread_id) + await safe_reply( + update.message, + f"❌ Window '{display}' no longer exists. Binding removed.\n" + "Send a message to start a new session.", + ) + return + + # Download voice as in-memory bytes + voice_file = await update.message.voice.get_file() + ogg_data = bytes(await voice_file.download_as_bytearray()) + + # Transcribe + try: + text = await transcribe_voice(ogg_data) + except ValueError as e: + await safe_reply(update.message, f"⚠ {e}") + return + except Exception as e: + logger.error("Voice transcription failed: %s", e) + await safe_reply(update.message, f"⚠ Transcription failed: {e}") + return + + await update.message.chat.send_action(ChatAction.TYPING) + clear_status_msg_info(user.id, thread_id) + + success, message = await session_manager.send_to_window(wid, text) + if not success: + await safe_reply(update.message, f"❌ {message}") + return + + await safe_reply(update.message, f'🎤 "{text}"') async def voice_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: @@ -708,10 +871,10 @@ async def voice_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N # Active bash capture tasks: (user_id, thread_id) → asyncio.Task -_bash_capture_tasks: dict[tuple[int, int], asyncio.Task[None]] = {} +_bash_capture_tasks: dict[tuple[int, int | None], asyncio.Task[None]] = {} -def _cancel_bash_capture(user_id: int, thread_id: int) -> None: +def _cancel_bash_capture(user_id: int, thread_id: int | None) -> None: """Cancel any running bash capture for this topic.""" key = (user_id, thread_id) task = _bash_capture_tasks.pop(key, None) @@ -722,7 +885,7 @@ def _cancel_bash_capture(user_id: int, thread_id: int) -> None: async def _capture_bash_output( bot: Bot, user_id: int, - thread_id: int, + thread_id: int | None, window_id: str, command: str, ) -> None: @@ -763,11 +926,14 @@ async def _capture_bash_output( if msg_id is None: # First capture — send a new message + send_kwargs: dict[str, int] = {} + if thread_id is not None: + send_kwargs["message_thread_id"] = thread_id sent = await send_with_fallback( bot, chat_id, output, - message_thread_id=thread_id, + **send_kwargs, # type: ignore[arg-type] ) if sent: msg_id = sent.message_id @@ -810,11 +976,12 @@ async def text_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No return thread_id = _get_thread_id(update) + chat = update.effective_chat + is_private_chat = bool(chat and chat.type == "private") # Capture group chat_id for supergroup forum topic routing. # Required: Telegram Bot API needs group chat_id (not user_id) to send # messages with message_thread_id. Do NOT remove — see session.py docs. - chat = update.effective_chat if chat and chat.type in ("group", "supergroup"): session_manager.set_group_chat_id(user.id, thread_id, chat.id) @@ -869,15 +1036,21 @@ async def text_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No context.user_data.pop("_pending_thread_text", None) context.user_data.pop("_selected_path", None) - # Must be in a named topic - if thread_id is None: + # Must be in a named topic unless private-chat mode is enabled. + if thread_id is None and not is_private_chat: await safe_reply( update.message, "❌ Please use a named topic. Create a new topic to start a session.", ) return - wid = session_manager.get_window_for_thread(user.id, thread_id) + if is_private_chat: + wid, err = await _ensure_private_window_binding(user.id) + if wid is None: + await safe_reply(update.message, f"❌ {err or 'Failed to create session'}") + return + else: + wid = session_manager.resolve_window_for_thread(user.id, thread_id) if wid is None: # Unbound topic — check for unbound windows first all_windows = await tmux_manager.list_windows() @@ -939,7 +1112,7 @@ async def text_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No user.id, thread_id, ) - session_manager.unbind_thread(user.id, thread_id) + session_manager.unbind_thread(user.id, thread_id or 0) await safe_reply( update.message, f"❌ Window '{display}' no longer exists. Binding removed.\n" @@ -1723,9 +1896,14 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: Routes via thread_bindings to deliver to the correct topic. """ status = "complete" if msg.is_complete else "streaming" + trace_id = msg.trace_id or f"{msg.session_id}:0-0" + logger.info( - f"handle_new_message [{status}]: session={msg.session_id}, " - f"text_len={len(msg.text)}" + "handle_new_message [%s]: trace=%s session=%s text_len=%d", + status, + trace_id, + msg.session_id, + len(msg.text), ) # Find users whose thread-bound window matches this session @@ -1735,29 +1913,36 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: logger.info(f"No active users for session {msg.session_id}") return + async def _mark_window_read_offset(user_id: int, window_id: str) -> None: + state = session_manager.get_window_state(window_id) + file_path = state.file_path + if not file_path: + # Fallback for stale state + session = await session_manager.resolve_session_for_window(window_id) + if not session or not session.file_path: + return + file_path = session.file_path + try: + file_size = Path(file_path).stat().st_size + session_manager.update_user_window_offset(user_id, window_id, file_size) + except OSError: + pass + for user_id, wid, thread_id in active_users: + queue = get_message_queue(user_id) + # Handle interactive tools specially - capture terminal and send UI if msg.tool_name in INTERACTIVE_TOOL_NAMES and msg.content_type == "tool_use": # Mark interactive mode BEFORE sleeping so polling skips this window set_interactive_mode(user_id, wid, thread_id) # Flush pending messages (e.g. plan content) before sending interactive UI - queue = get_message_queue(user_id) if queue: await queue.join() - # Wait briefly for Claude Code to render the question UI + # Wait briefly for the agent UI to render await asyncio.sleep(0.3) handled = await handle_interactive_ui(bot, user_id, wid, thread_id) if handled: - # Update user's read offset - session = await session_manager.resolve_session_for_window(wid) - if session and session.file_path: - try: - file_size = Path(session.file_path).stat().st_size - session_manager.update_user_window_offset( - user_id, wid, file_size - ) - except OSError: - pass + await _mark_window_read_offset(user_id, wid) continue # Don't send the normal tool_use message else: # UI not rendered — clear the early-set mode @@ -1788,24 +1973,80 @@ async def handle_new_message(msg: NewMessage, bot: Bot) -> None: text=msg.text, thread_id=thread_id, image_data=msg.image_data, + session_id=msg.session_id, + trace_id=trace_id, + source_line_start=msg.source_line_start, + source_line_end=msg.source_line_end, + detected_at_monotonic=msg.detected_at_monotonic, + ) + + await _mark_window_read_offset(user_id, wid) + + +async def _announce_forward_links(bot: Bot, tunnels: list[PortTunnel]) -> None: + """Send and pin forward links for allowed users.""" + lines = [ + "🌐 Dev port forwarding is enabled.", + "", + ] + for tunnel in tunnels: + lines.append( + f"- `localhost:{tunnel.port}` → {tunnel.public_url} ({tunnel.provider})" + ) + text = "\n".join(lines) + + for user_id in sorted(config.allowed_users): + sent = await send_with_fallback(bot, user_id, text) + if sent is None: + logger.warning( + "Failed to announce forward links to user %d", user_id + ) + continue + try: + await bot.pin_chat_message( + chat_id=user_id, + message_id=sent.message_id, + disable_notification=True, + ) + _forward_pin_message_ids[user_id] = sent.message_id + logger.info("Pinned forward links message for user %d", user_id) + except Exception as e: + logger.warning( + "Failed to pin forward links message for user %d: %s", user_id, e ) - # Update user's read offset to current file position - # This marks these messages as "read" for this user - session = await session_manager.resolve_session_for_window(wid) - if session and session.file_path: - try: - file_size = Path(session.file_path).stat().st_size - session_manager.update_user_window_offset(user_id, wid, file_size) - except OSError: - pass + +async def _announce_forward_error(bot: Bot, error_text: str) -> None: + text = ( + "⚠ Failed to start dev port forwarding.\n\n" + f"{error_text}" + ) + for user_id in sorted(config.allowed_users): + await send_with_fallback(bot, user_id, text) + + +async def _run_port_forwarding(bot: Bot) -> None: + """Start forwarding in background and announce links/errors.""" + global _port_forward_manager + manager = PortForwardManager(config.forward_ports) + try: + tunnels = await manager.start() + _port_forward_manager = manager + await _announce_forward_links(bot, tunnels) + except asyncio.CancelledError: + await manager.stop() + raise + except Exception as e: + logger.error("Forwarding startup failed: %s", e) + await _announce_forward_error(bot, str(e)) + await manager.stop() # --- App lifecycle --- async def post_init(application: Application) -> None: - global session_monitor, _status_poll_task + global session_monitor, _status_poll_task, _port_forward_task await application.bot.delete_my_commands() @@ -1813,17 +2054,22 @@ async def post_init(application: Application) -> None: BotCommand("start", "Show welcome message"), BotCommand("history", "Message history for this topic"), BotCommand("screenshot", "Terminal screenshot with control keys"), - BotCommand("esc", "Send Escape to interrupt Claude"), + BotCommand("esc", "Send Escape to interrupt active session"), BotCommand("kill", "Kill session and delete topic"), BotCommand("unbind", "Unbind topic from session (keeps window running)"), - BotCommand("usage", "Show Claude Code usage remaining"), ] - # Add Claude Code slash commands - for cmd_name, desc in CC_COMMANDS.items(): + if config.supports_usage_command: + bot_commands.append(BotCommand("usage", "Show usage remaining")) + # Add provider slash commands + for cmd_name, desc in _provider_menu_commands().items(): bot_commands.append(BotCommand(cmd_name, desc)) await application.bot.set_my_commands(bot_commands) + if config.forward_ports: + _port_forward_task = asyncio.create_task(_run_port_forwarding(application.bot)) + logger.info("Port forwarding task started for ports: %s", config.forward_ports) + # Re-resolve stale window IDs from persisted state against live tmux windows await session_manager.resolve_stale_ids() @@ -1870,11 +2116,48 @@ async def post_shutdown(application: Application) -> None: await shutdown_workers() if session_monitor: - session_monitor.stop() + await session_monitor.stop() logger.info("Session monitor stopped") await close_transcribe_client() +async def post_stop(application: Application) -> None: + """Run before shutdown while Telegram HTTP client is still active.""" + global _port_forward_task, _port_forward_manager, _forward_pin_message_ids + + if _port_forward_task: + if not _port_forward_task.done(): + _port_forward_task.cancel() + try: + await _port_forward_task + except asyncio.CancelledError: + pass + _port_forward_task = None + + if _port_forward_manager: + await _port_forward_manager.stop() + _port_forward_manager = None + logger.info("Port forwarding stopped") + + for chat_id, message_id in list(_forward_pin_message_ids.items()): + try: + await application.bot.unpin_chat_message( + chat_id=chat_id, + message_id=message_id, + ) + logger.info( + "Unpinned forward links message for user %d (message_id=%d)", + chat_id, + message_id, + ) + except Exception as e: + logger.warning( + "Failed to unpin forward links message for user %d (message_id=%d): %s", + chat_id, + message_id, + e, + ) + _forward_pin_message_ids = {} def create_bot() -> Application: application = ( @@ -1882,6 +2165,7 @@ def create_bot() -> Application: .token(config.telegram_bot_token) .rate_limiter(AIORateLimiter(max_retries=5)) .post_init(post_init) + .post_stop(post_stop) .post_shutdown(post_shutdown) .build() ) @@ -1891,7 +2175,8 @@ def create_bot() -> Application: application.add_handler(CommandHandler("screenshot", screenshot_command)) application.add_handler(CommandHandler("esc", esc_command)) application.add_handler(CommandHandler("unbind", unbind_command)) - application.add_handler(CommandHandler("usage", usage_command)) + if config.supports_usage_command: + application.add_handler(CommandHandler("usage", usage_command)) application.add_handler(CallbackQueryHandler(callback_handler)) # Topic closed event — auto-kill associated window application.add_handler( @@ -1907,12 +2192,12 @@ def create_bot() -> Application: topic_edited_handler, ) ) - # Forward any other /command to Claude Code + # Forward any other /command to the active session application.add_handler(MessageHandler(filters.COMMAND, forward_command_handler)) application.add_handler( MessageHandler(filters.TEXT & ~filters.COMMAND, text_handler) ) - # Photos: download and forward file path to Claude Code + # Photos: download and forward file path to the active session application.add_handler(MessageHandler(filters.PHOTO, photo_handler)) # Voice: transcribe via OpenAI and forward text to Claude Code application.add_handler(MessageHandler(filters.VOICE, voice_handler)) diff --git a/src/ccbot/codex_mapper.py b/src/ccbot/codex_mapper.py new file mode 100644 index 00000000..b78d8e9f --- /dev/null +++ b/src/ccbot/codex_mapper.py @@ -0,0 +1,281 @@ +"""Codex session mapper: tmux windows -> Codex rollout sessions. + +Codex does not expose Claude-style SessionStart hooks, so this module +discovers sessions from ~/.codex/sessions/**/rollout-*.jsonl and writes +window mappings into session_map.json in the existing ccbot format. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +from .config import config +from .tmux_manager import tmux_manager +from .utils import atomic_write_json + +logger = logging.getLogger(__name__) + + +@dataclass +class CodexSessionMeta: + """Metadata extracted from a Codex rollout file.""" + + session_id: str + cwd: str + file_path: Path + started_ts: float + file_mtime: float + + +def _norm_path(path: str) -> str: + try: + return str(Path(path).resolve()) + except (OSError, ValueError): + return path + + +def _parse_iso_ts(ts: str) -> float: + if not ts: + return 0.0 + # Codex timestamps are ISO8601 with trailing Z. + if ts.endswith("Z"): + ts = ts[:-1] + "+00:00" + try: + return datetime.fromisoformat(ts).timestamp() + except ValueError: + return 0.0 + + +class CodexSessionMapper: + """Maps live tmux windows to Codex session IDs.""" + + def __init__( + self, + sessions_root: Path | None = None, + session_map_file: Path | None = None, + ) -> None: + self.sessions_root = ( + sessions_root if sessions_root is not None else config.codex_sessions_path + ) + self.session_map_file = ( + session_map_file + if session_map_file is not None + else config.session_map_file + ) + # file -> (mtime, size, parsed_meta_or_none) + self._meta_cache: dict[str, tuple[float, int, CodexSessionMeta | None]] = {} + + def _read_rollout_meta( + self, file_path: Path, file_mtime: float + ) -> CodexSessionMeta | None: + try: + with open(file_path, "r", encoding="utf-8") as f: + first = f.readline().strip() + except OSError: + return None + + if not first: + return None + try: + data = json.loads(first) + except json.JSONDecodeError: + return None + if data.get("type") != "session_meta": + return None + payload = data.get("payload", {}) + if not isinstance(payload, dict): + return None + + session_id = payload.get("id", "") + cwd = payload.get("cwd", "") + started_at = payload.get("timestamp", "") + if not session_id or not cwd: + return None + + return CodexSessionMeta( + session_id=session_id, + cwd=_norm_path(cwd), + file_path=file_path, + started_ts=_parse_iso_ts(started_at), + file_mtime=file_mtime, + ) + + def _scan_sessions(self) -> list[CodexSessionMeta]: + if not self.sessions_root.exists(): + return [] + + metas: list[CodexSessionMeta] = [] + seen_files: set[str] = set() + for file_path in self.sessions_root.rglob("rollout-*.jsonl"): + key = str(file_path) + seen_files.add(key) + try: + st = file_path.stat() + except OSError: + continue + cached = self._meta_cache.get(key) + if cached and cached[0] == st.st_mtime and cached[1] == st.st_size: + meta = cached[2] + else: + meta = self._read_rollout_meta(file_path, st.st_mtime) + self._meta_cache[key] = (st.st_mtime, st.st_size, meta) + if meta: + # Refresh mtime from current stat for cache-hit case + meta.file_mtime = st.st_mtime + metas.append(meta) + + # Drop deleted files from cache + deleted = [k for k in self._meta_cache if k not in seen_files] + for key in deleted: + del self._meta_cache[key] + + return metas + + async def sync_session_map(self) -> bool: + """Sync codex window->session mappings into session_map.json.""" + windows = await tmux_manager.list_windows() + metas = await asyncio.to_thread(self._scan_sessions) + + if self.session_map_file.exists(): + try: + session_map = json.loads(self.session_map_file.read_text()) + except (json.JSONDecodeError, OSError): + session_map = {} + else: + session_map = {} + + if not isinstance(session_map, dict): + session_map = {} + + prefix = f"{config.tmux_session_name}:" + by_cwd: dict[str, list[CodexSessionMeta]] = {} + for meta in metas: + by_cwd.setdefault(meta.cwd, []).append(meta) + for cwd in by_cwd: + by_cwd[cwd].sort( + key=lambda m: (m.file_mtime, m.started_ts), + reverse=True, + ) + all_metas = sorted( + metas, + key=lambda m: (m.file_mtime, m.started_ts), + reverse=True, + ) + preferred_sid = (config.codex_resume_session_id or "").strip() + preferred_meta: CodexSessionMeta | None = None + if preferred_sid: + for meta in all_metas: + if meta.session_id == preferred_sid: + preferred_meta = meta + break + + live_wids = {w.window_id for w in windows} + assigned_session_ids: set[str] = set() + next_entries: dict[str, dict[str, str]] = {} + + for w in windows: + key = f"{prefix}{w.window_id}" + existing = session_map.get(key, {}) + if not isinstance(existing, dict): + existing = {} + existing_provider = existing.get("provider", "claude") + + # Map windows that appear active, or were mapped as codex before. + # Codex often appears as "node" in tmux pane_current_command. + pane_cmd = (w.pane_current_command or "").lower() + if existing_provider != "codex" and pane_cmd in ( + "", + "bash", + "sh", + "zsh", + "fish", + ): + continue + + norm_cwd = _norm_path(w.cwd) + candidates = by_cwd.get(norm_cwd, []) + + chosen: CodexSessionMeta | None = None + if ( + preferred_meta is not None + and w.window_name == config.tmux_session_name + and preferred_meta.session_id not in assigned_session_ids + ): + chosen = preferred_meta + + existing_sid = existing.get("session_id", "") + if chosen is None and existing_sid: + for meta in all_metas: + if meta.session_id == existing_sid: + chosen = meta + break + + if chosen is None: + for meta in candidates: + if meta.session_id in assigned_session_ids: + continue + chosen = meta + break + + # Fallback: choose the newest unassigned rollout across all projects. + # This handles "codex resume " where pane cwd can differ from session cwd. + if chosen is None: + for meta in all_metas: + if meta.session_id in assigned_session_ids: + continue + chosen = meta + break + + # Keep previous mapping if file still exists and no better candidate. + if chosen is None and existing_provider == "codex" and existing_sid: + existing_fp = Path(existing.get("file_path", "")) + if existing_fp.exists(): + next_entries[key] = existing + assigned_session_ids.add(existing_sid) + continue + + if chosen is None: + continue + + assigned_session_ids.add(chosen.session_id) + next_entries[key] = { + "session_id": chosen.session_id, + "cwd": norm_cwd, + "window_name": w.window_name, + "provider": "codex", + "file_path": str(chosen.file_path), + } + + changed = False + # Remove stale codex entries for this tmux session. + stale_keys: list[str] = [] + for key, info in session_map.items(): + if not key.startswith(prefix): + continue + if key[len(prefix) :] not in live_wids: + stale_keys.append(key) + continue + if isinstance(info, dict) and info.get("provider") == "codex": + if key not in next_entries: + stale_keys.append(key) + for key in stale_keys: + del session_map[key] + changed = True + + for key, info in next_entries.items(): + if session_map.get(key) != info: + session_map[key] = info + changed = True + + if changed: + atomic_write_json(self.session_map_file, session_map) + logger.debug("Codex session_map updated (%d entries)", len(next_entries)) + return changed + + +codex_session_mapper = CodexSessionMapper() diff --git a/src/ccbot/config.py b/src/ccbot/config.py index ca3d6744..6d9dca00 100644 --- a/src/ccbot/config.py +++ b/src/ccbot/config.py @@ -10,6 +10,7 @@ import logging import os +import re from pathlib import Path from dotenv import load_dotenv @@ -22,6 +23,17 @@ SENSITIVE_ENV_VARS = {"TELEGRAM_BOT_TOKEN", "ALLOWED_USERS", "OPENAI_API_KEY"} +def _extract_codex_resume_session_id(command: str) -> str: + """Extract session id from a command containing `resume `.""" + if not command: + return "" + + match = re.search(r"(?:^|\s)resume\s+([0-9a-fA-F-]{8,})\b", command) + if match: + return match.group(1) + return "" + + class Config: """Application configuration loaded from environment variables.""" @@ -61,8 +73,37 @@ def __init__(self) -> None: self.tmux_session_name = os.getenv("TMUX_SESSION_NAME", "ccbot") self.tmux_main_window_name = "__main__" - # Claude command to run in new windows - self.claude_command = os.getenv("CLAUDE_COMMAND", "claude") + provider = os.getenv("CCBOT_PROVIDER", "claude").strip().lower() + if provider not in ("claude", "codex"): + raise ValueError("CCBOT_PROVIDER must be one of: claude, codex") + self.provider = provider + + # Agent command to run in new windows. + # Backward compatible: + # - CLAUDE_COMMAND still works for provider=claude + # - CCBOT_AGENT_COMMAND overrides provider defaults + default_cmd = "codex" if self.provider == "codex" else "claude" + self.agent_command = os.getenv("CCBOT_AGENT_COMMAND") or os.getenv( + "CLAUDE_COMMAND", default_cmd + ) + self.codex_resume_session_id = ( + _extract_codex_resume_session_id(self.agent_command) + if self.provider == "codex" + else "" + ) + # Keep old attribute name for compatibility in the existing code path. + self.claude_command = self.agent_command + + # Provider capabilities and UI labels + self.agent_name = "Codex CLI" if self.provider == "codex" else "Claude Code" + self.supports_usage_command = self.provider == "claude" + self.supports_claude_interactive_ui = self.provider == "claude" + # Forward unknown slash commands for both providers by default: + # e.g. /status, /permissions, /clear, /compact. + slash_default = "true" + self.forward_slash_commands = ( + os.getenv("CCBOT_FORWARD_SLASH", slash_default).lower() == "true" + ) # All state files live under config_dir self.state_file = self.config_dir / "state.json" @@ -82,6 +123,19 @@ def __init__(self) -> None: else: self.claude_projects_path = Path.home() / ".claude" / "projects" + # Codex rollout logs root + self.codex_sessions_path = Path( + os.getenv( + "CCBOT_CODEX_SESSIONS_PATH", str(Path.home() / ".codex" / "sessions") + ) + ) + self.provider_data_root = ( + self.codex_sessions_path + if self.provider == "codex" + else self.claude_projects_path + ) + self.provider_supports_hook = self.provider == "claude" + self.monitor_poll_interval = float(os.getenv("MONITOR_POLL_INTERVAL", "2.0")) # Display user messages in history and real-time notifications @@ -99,6 +153,26 @@ def __init__(self) -> None: "OPENAI_BASE_URL", "https://api.openai.com/v1" ) + # Optional local port forwarding announcement on startup + # Format: "3000" or "3000,5173" + self.forward_ports: list[int] = [] + forward_ports_raw = os.getenv("CCBOT_FORWARD_PORTS", "").strip() + if forward_ports_raw: + for token in forward_ports_raw.split(","): + part = token.strip() + if not part: + continue + try: + port = int(part) + except ValueError as e: + raise ValueError( + f"CCBOT_FORWARD_PORTS contains non-numeric port: {part}" + ) from e + if port < 1 or port > 65535: + raise ValueError( + f"CCBOT_FORWARD_PORTS contains invalid port: {port}" + ) + self.forward_ports.append(port) # Scrub sensitive vars from os.environ so child processes never inherit them. # Values are already captured in Config attributes above. for var in SENSITIVE_ENV_VARS: @@ -106,12 +180,13 @@ def __init__(self) -> None: logger.debug( "Config initialized: dir=%s, token=%s..., allowed_users=%d, " - "tmux_session=%s, claude_projects_path=%s", + "provider=%s, tmux_session=%s, data_root=%s", self.config_dir, self.telegram_bot_token[:8], len(self.allowed_users), + self.provider, self.tmux_session_name, - self.claude_projects_path, + self.provider_data_root, ) def is_user_allowed(self, user_id: int) -> bool: diff --git a/src/ccbot/handlers/message_queue.py b/src/ccbot/handlers/message_queue.py index bdd28038..74233472 100644 --- a/src/ccbot/handlers/message_queue.py +++ b/src/ccbot/handlers/message_queue.py @@ -64,6 +64,12 @@ class MessageTask: content_type: str = "text" thread_id: int | None = None # Telegram topic thread_id for targeted send image_data: list[tuple[str, bytes]] | None = None # From tool_result images + session_id: str = "" + trace_id: str = "" + source_line_start: int = 0 + source_line_end: int = 0 + trace_detected_at: float = 0.0 + created_at: float = field(default_factory=time.monotonic) # Per-user message queues and worker tasks @@ -81,10 +87,16 @@ class MessageTask: # Flood control: user_id -> monotonic time when ban expires _flood_until: dict[int, float] = {} -# Max seconds to wait for flood control before dropping tasks +# Max seconds to wait for flood control before pausing delivery FLOOD_CONTROL_MAX_WAIT = 10 +def _trace_age_seconds(now: float, detected_at: float) -> float: + if detected_at <= 0: + return 0.0 + return max(0.0, now - detected_at) + + def get_message_queue(user_id: int) -> asyncio.Queue[MessageTask] | None: """Get the message queue for a user (if exists).""" return _message_queues.get(user_id) @@ -192,6 +204,12 @@ async def _merge_content_tasks( tool_use_id=first.tool_use_id, content_type=first.content_type, thread_id=first.thread_id, + session_id=first.session_id, + trace_id=first.trace_id, + source_line_start=first.source_line_start, + source_line_end=first.source_line_end, + trace_detected_at=first.trace_detected_at, + created_at=first.created_at, ), merge_count, ) @@ -207,24 +225,35 @@ async def _message_queue_worker(bot: Bot, user_id: int) -> None: try: task = await queue.get() try: - # Flood control: drop status, wait for content - flood_end = _flood_until.get(user_id, 0) - if flood_end > 0: + # Flood control: drop status, wait for content in short chunks. + drop_current_task = False + while True: + flood_end = _flood_until.get(user_id, 0) + if flood_end <= 0: + break + remaining = flood_end - time.monotonic() - if remaining > 0: - if task.task_type != "content": - # Status is ephemeral — safe to drop - continue - # Content is actual Claude output — wait then send - logger.debug( - "Flood controlled: waiting %.0fs for content (user %d)", - remaining, - user_id, - ) - await asyncio.sleep(remaining) - # Ban expired - _flood_until.pop(user_id, None) - logger.info("Flood control lifted for user %d", user_id) + if remaining <= 0: + _flood_until.pop(user_id, None) + logger.info("Flood control lifted for user %d", user_id) + break + + if task.task_type != "content": + # Status is ephemeral — safe to drop + drop_current_task = True + break + + # Sleep in short chunks so queue can react promptly. + sleep_for = min(remaining, FLOOD_CONTROL_MAX_WAIT) + logger.debug( + "Flood controlled: waiting %.0fs for content (user %d)", + sleep_for, + user_id, + ) + await asyncio.sleep(sleep_for) + + if drop_current_task: + continue if task.task_type == "content": # Try to merge consecutive content tasks @@ -247,6 +276,16 @@ async def _message_queue_worker(bot: Bot, user_id: int) -> None: if isinstance(e.retry_after, int) else int(e.retry_after.total_seconds()) ) + # Status tasks are ephemeral and should never force long + # per-user flood pauses that delay real content delivery. + if task.task_type != "content": + logger.warning( + "RetryAfter=%ds for %s task (user %d), dropping task", + retry_secs, + task.task_type, + user_id, + ) + continue if retry_secs > FLOOD_CONTROL_MAX_WAIT: _flood_until[user_id] = time.monotonic() + retry_secs logger.warning( @@ -275,7 +314,7 @@ async def _message_queue_worker(bot: Bot, user_id: int) -> None: def _send_kwargs(thread_id: int | None) -> dict[str, int]: """Build message_thread_id kwargs for bot.send_message().""" - if thread_id is not None: + if thread_id is not None and thread_id > 0: return {"message_thread_id": thread_id} return {} @@ -302,6 +341,7 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No wid = task.window_id or "" tid = task.thread_id or 0 chat_id = session_manager.resolve_chat_id(user_id, task.thread_id) + trace_id = task.trace_id or f"{task.session_id}:0-0" # 1. Handle tool_result editing (merged parts are edited together) if task.content_type == "tool_result" and task.tool_use_id: @@ -322,6 +362,18 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No ) await _send_task_images(bot, chat_id, task) await _check_and_send_status(bot, user_id, wid, task.thread_id) + now = time.monotonic() + logger.info( + "LineTrace out: trace=%s user=%d action=edit content_type=%s message_id=%d line=%d-%d trace_age=%.3fs queue_wait=%.3fs", + trace_id, + user_id, + task.content_type, + edit_msg_id, + task.source_line_start, + task.source_line_end, + _trace_age_seconds(now, task.trace_detected_at), + max(0.0, now - task.created_at), + ) return except RetryAfter: raise @@ -337,6 +389,18 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No ) await _send_task_images(bot, chat_id, task) await _check_and_send_status(bot, user_id, wid, task.thread_id) + now = time.monotonic() + logger.info( + "LineTrace out: trace=%s user=%d action=edit_plain content_type=%s message_id=%d line=%d-%d trace_age=%.3fs queue_wait=%.3fs", + trace_id, + user_id, + task.content_type, + edit_msg_id, + task.source_line_start, + task.source_line_end, + _trace_age_seconds(now, task.trace_detected_at), + max(0.0, now - task.created_at), + ) return except RetryAfter: raise @@ -347,6 +411,7 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No # 2. Send content messages, converting status message to first content part first_part = True last_msg_id: int | None = None + sent_msg_ids: list[int] = [] for part in task.parts: sent = None @@ -362,6 +427,7 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No ) if converted_msg_id is not None: last_msg_id = converted_msg_id + sent_msg_ids.append(converted_msg_id) continue sent = await send_with_fallback( @@ -373,6 +439,7 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No if sent: last_msg_id = sent.message_id + sent_msg_ids.append(sent.message_id) # 3. Record tool_use message ID for later editing if last_msg_id and task.tool_use_id and task.content_type == "tool_use": @@ -384,6 +451,35 @@ async def _process_content_task(bot: Bot, user_id: int, task: MessageTask) -> No # 5. After content, check and send status await _check_and_send_status(bot, user_id, wid, task.thread_id) + if sent_msg_ids: + ids = ",".join(str(i) for i in sent_msg_ids) + now = time.monotonic() + logger.info( + "LineTrace out: trace=%s user=%d action=send content_type=%s tg_message_ids=%s parts=%d line=%d-%d trace_age=%.3fs queue_wait=%.3fs", + trace_id, + user_id, + task.content_type, + ids, + len(task.parts), + task.source_line_start, + task.source_line_end, + _trace_age_seconds(now, task.trace_detected_at), + max(0.0, now - task.created_at), + ) + else: + now = time.monotonic() + logger.warning( + "LineTrace out: trace=%s user=%d action=send_none content_type=%s parts=%d line=%d-%d trace_age=%.3fs queue_wait=%.3fs", + trace_id, + user_id, + task.content_type, + len(task.parts), + task.source_line_start, + task.source_line_end, + _trace_age_seconds(now, task.trace_detected_at), + max(0.0, now - task.created_at), + ) + async def _convert_status_to_content( bot: Bot, @@ -601,15 +697,15 @@ async def enqueue_content_message( text: str | None = None, thread_id: int | None = None, image_data: list[tuple[str, bytes]] | None = None, + session_id: str = "", + trace_id: str = "", + source_line_start: int = 0, + source_line_end: int = 0, + detected_at_monotonic: float = 0.0, ) -> None: """Enqueue a content message task.""" - logger.debug( - "Enqueue content: user=%d, window_id=%s, content_type=%s", - user_id, - window_id, - content_type, - ) queue = get_or_create_queue(bot, user_id) + queue_before = queue.qsize() task = MessageTask( task_type="content", @@ -620,8 +716,23 @@ async def enqueue_content_message( content_type=content_type, thread_id=thread_id, image_data=image_data, + session_id=session_id, + trace_id=trace_id, + source_line_start=source_line_start, + source_line_end=source_line_end, + trace_detected_at=detected_at_monotonic, ) queue.put_nowait(task) + now = time.monotonic() + logger.info( + "LineTrace enqueue: trace=%s user=%d content_type=%s queue=%d->%d trace_age=%.3fs", + trace_id or "-", + user_id, + content_type, + queue_before, + queue.qsize(), + _trace_age_seconds(now, detected_at_monotonic), + ) async def enqueue_status_update( diff --git a/src/ccbot/handlers/status_polling.py b/src/ccbot/handlers/status_polling.py index c4de1c6e..e6c01ee6 100644 --- a/src/ccbot/handlers/status_polling.py +++ b/src/ccbot/handlers/status_polling.py @@ -23,6 +23,7 @@ from telegram import Bot from telegram.error import BadRequest +from ..config import config from ..session import session_manager from ..terminal_parser import is_interactive_ui, parse_status_line from ..tmux_manager import tmux_manager @@ -76,7 +77,9 @@ async def update_status_message( interactive_window = get_interactive_window(user_id, thread_id) should_check_new_ui = True - if interactive_window == window_id: + supports_interactive_ui = config.supports_claude_interactive_ui + + if supports_interactive_ui and interactive_window == window_id: # User is in interactive mode for THIS window if is_interactive_ui(pane_text): # Interactive UI still showing — skip status update (user is interacting) @@ -85,14 +88,13 @@ async def update_status_message( # Don't re-check for new UI this cycle (the old one just disappeared). await clear_interactive_msg(user_id, bot, thread_id) should_check_new_ui = False - elif interactive_window is not None: + elif supports_interactive_ui and interactive_window is not None: # User is in interactive mode for a DIFFERENT window (window switched) # Clear stale interactive mode await clear_interactive_msg(user_id, bot, thread_id) # Check for permission prompt (interactive UI not triggered via JSONL) - # ALWAYS check UI, regardless of skip_status - if should_check_new_ui and is_interactive_ui(pane_text): + if supports_interactive_ui and should_check_new_ui and is_interactive_ui(pane_text): logger.debug( "Interactive UI detected in polling (user=%d, window=%s, thread=%s)", user_id, @@ -132,6 +134,9 @@ async def status_poll_loop(bot: Bot) -> None: for user_id, thread_id, wid in list( session_manager.iter_thread_bindings() ): + # Private chat binding (synthetic thread_id=0): no topic probe. + if thread_id <= 0: + continue try: await bot.unpin_all_forum_topic_messages( chat_id=session_manager.resolve_chat_id(user_id, thread_id), @@ -190,7 +195,7 @@ async def status_poll_loop(bot: Bot) -> None: bot, user_id, wid, - thread_id=thread_id, + thread_id=thread_id or None, skip_status=skip_status, ) except Exception as e: diff --git a/src/ccbot/main.py b/src/ccbot/main.py index dabd3fd7..a899b502 100644 --- a/src/ccbot/main.py +++ b/src/ccbot/main.py @@ -2,14 +2,48 @@ Handles two execution modes: 1. `ccbot hook` — delegates to hook.hook_main() for Claude Code hook processing. - 2. Default — configures logging, initializes tmux session, and starts the + 2. `ccbot codex-map` — updates session_map.json for Codex rollout sessions. + 3. Default — configures logging, initializes tmux session, and starts the Telegram bot polling loop via bot.create_bot(). """ +import asyncio +import argparse import logging +import os import sys +def _parse_forward_ports(args: list[str]) -> list[int]: + parser = argparse.ArgumentParser( + prog="ccbot", + description="Telegram monitor for AI CLI sessions", + ) + parser.add_argument( + "--forward", + action="append", + default=[], + metavar="PORT[,PORT...]", + help="Forward local port(s) to public URL and announce in Telegram", + ) + parsed = parser.parse_args(args) + + ports: list[int] = [] + for group in parsed.forward: + for token in group.split(","): + part = token.strip() + if not part: + continue + try: + port = int(part) + except ValueError as e: + raise SystemExit(f"Invalid --forward value: {part}") from e + if port < 1 or port > 65535: + raise SystemExit(f"Invalid --forward port: {port}") + ports.append(port) + return ports + + def main() -> None: """Main entry point.""" if len(sys.argv) > 1 and sys.argv[1] == "hook": @@ -18,6 +52,17 @@ def main() -> None: hook_main() return + if len(sys.argv) > 1 and sys.argv[1] == "codex-map": + from .codex_mapper import codex_session_mapper + + changed = asyncio.run(codex_session_mapper.sync_session_map()) + print("updated" if changed else "no changes") + return + + forward_ports = _parse_forward_ports(sys.argv[1:]) + if forward_ports: + os.environ["CCBOT_FORWARD_PORTS"] = ",".join(str(p) for p in forward_ports) + logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.WARNING, @@ -48,7 +93,8 @@ def main() -> None: from .tmux_manager import tmux_manager logger.info("Allowed users: %s", config.allowed_users) - logger.info("Claude projects path: %s", config.claude_projects_path) + logger.info("Provider: %s", config.provider) + logger.info("Provider data root: %s", config.provider_data_root) # Ensure tmux session exists session = tmux_manager.get_or_create_session() diff --git a/src/ccbot/port_forward.py b/src/ccbot/port_forward.py new file mode 100644 index 00000000..17f3a06c --- /dev/null +++ b/src/ccbot/port_forward.py @@ -0,0 +1,231 @@ +"""Port forwarding helpers for exposing local dev services. + +Starts public tunnels for local ports using available CLI tools +(`ngrok` preferred, `cloudflared` fallback, `ssh`/localhost.run as last resort). +""" + +import asyncio +import logging +import re +import shutil +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + +_NGROK_URL_RE = re.compile( + r"https://[A-Za-z0-9.-]+\.(?:ngrok-free\.app|ngrok\.app|ngrok\.io)" +) +_CF_URL_RE = re.compile(r"https://[A-Za-z0-9-]+\.trycloudflare\.com") +_LOCALHOST_RUN_URL_RE = re.compile( + r"https://(?!admin(?:\.|$))[A-Za-z0-9-]{8,}\.(?:localhost\.run|lhr\.life)" +) + + +@dataclass +class PortTunnel: + port: int + public_url: str + provider: str + process: asyncio.subprocess.Process + + +class PortForwardManager: + """Manage public tunnels for local ports.""" + + def __init__(self, ports: list[int]) -> None: + self.ports = ports + self.tunnels: list[PortTunnel] = [] + + async def start(self) -> list[PortTunnel]: + for port in self.ports: + tunnel = await self._start_port(port) + self.tunnels.append(tunnel) + logger.info( + "Forward tunnel ready: provider=%s port=%d url=%s", + tunnel.provider, + tunnel.port, + tunnel.public_url, + ) + return list(self.tunnels) + + async def stop(self) -> None: + for tunnel in self.tunnels: + proc = tunnel.process + if proc.returncode is not None: + continue + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=5.0) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + except ProcessLookupError: + pass + self.tunnels.clear() + + async def _start_port(self, port: int) -> PortTunnel: + providers: list[str] = [] + if shutil.which("ngrok"): + providers.append("ngrok") + if shutil.which("cloudflared"): + providers.append("cloudflared") + if shutil.which("ssh"): + providers.append("localhost.run") + if not providers: + raise RuntimeError( + "No tunnel provider found. Install ngrok/cloudflared or ensure ssh is available." + ) + + errors: list[str] = [] + for provider in providers: + try: + if provider == "ngrok": + return await self._start_ngrok(port) + if provider == "cloudflared": + return await self._start_cloudflared(port) + return await self._start_localhost_run(port) + except RuntimeError as e: + errors.append(f"{provider}: {e}") + logger.warning("Tunnel provider failed for port %d: %s", port, e) + continue + + raise RuntimeError( + f"Failed to create tunnel for port {port}. " + "; ".join(errors) + ) + + async def _start_ngrok(self, port: int) -> PortTunnel: + cmd = [ + "ngrok", + "http", + f"http://127.0.0.1:{port}", + "--log=stdout", + "--log-format=json", + ] + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + url = await self._wait_for_url( + proc=proc, + provider="ngrok", + port=port, + regex=_NGROK_URL_RE, + timeout_seconds=30.0, + ) + return PortTunnel(port=port, public_url=url, provider="ngrok", process=proc) + + async def _start_cloudflared(self, port: int) -> PortTunnel: + cmd = [ + "cloudflared", + "tunnel", + "--url", + f"http://127.0.0.1:{port}", + "--no-autoupdate", + ] + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + url = await self._wait_for_url( + proc=proc, + provider="cloudflared", + port=port, + regex=_CF_URL_RE, + timeout_seconds=40.0, + ) + return PortTunnel( + port=port, public_url=url, provider="cloudflared", process=proc + ) + + async def _start_localhost_run(self, port: int) -> PortTunnel: + cmd = [ + "ssh", + "-o", + "StrictHostKeyChecking=no", + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "ExitOnForwardFailure=yes", + "-o", + "ServerAliveInterval=30", + "-o", + "ServerAliveCountMax=3", + "-R", + f"80:127.0.0.1:{port}", + "nokey@localhost.run", + ] + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + url = await self._wait_for_url( + proc=proc, + provider="localhost.run", + port=port, + regex=_LOCALHOST_RUN_URL_RE, + timeout_seconds=45.0, + ) + return PortTunnel( + port=port, + public_url=url, + provider="localhost.run", + process=proc, + ) + + async def _wait_for_url( + self, + *, + proc: asyncio.subprocess.Process, + provider: str, + port: int, + regex: re.Pattern[str], + timeout_seconds: float, + ) -> str: + if proc.stdout is None: + raise RuntimeError(f"{provider} stdout is unavailable") + + deadline = asyncio.get_event_loop().time() + timeout_seconds + tail: list[str] = [] + + while True: + if proc.returncode is not None: + break + remaining = deadline - asyncio.get_event_loop().time() + if remaining <= 0: + break + try: + line = await asyncio.wait_for(proc.stdout.readline(), timeout=remaining) + except asyncio.TimeoutError: + break + if not line: + continue + text = line.decode("utf-8", errors="replace").strip() + if text: + tail.append(text) + if len(tail) > 20: + tail = tail[-20:] + m = regex.search(text) + if m: + return m.group(0) + + # Cleanup failed process + if proc.returncode is None: + proc.terminate() + try: + await asyncio.wait_for(proc.wait(), timeout=3.0) + except asyncio.TimeoutError: + proc.kill() + await proc.wait() + + reason = ( + f"process exited with code {proc.returncode}" + if proc.returncode is not None + else "startup timeout" + ) + sample = " | ".join(tail[-5:]) if tail else "no output" + raise RuntimeError( + f"{provider} failed for port {port}: {reason}; output: {sample}" + ) diff --git a/src/ccbot/session.py b/src/ccbot/session.py index 173293b1..ebbedb8b 100644 --- a/src/ccbot/session.py +++ b/src/ccbot/session.py @@ -53,6 +53,7 @@ class WindowState: session_id: str = "" cwd: str = "" window_name: str = "" + file_path: str = "" def to_dict(self) -> dict[str, Any]: d: dict[str, Any] = { @@ -61,6 +62,8 @@ def to_dict(self) -> dict[str, Any]: } if self.window_name: d["window_name"] = self.window_name + if self.file_path: + d["file_path"] = self.file_path return d @classmethod @@ -69,6 +72,7 @@ def from_dict(cls, data: dict[str, Any]) -> "WindowState": session_id=data.get("session_id", ""), cwd=data.get("cwd", ""), window_name=data.get("window_name", ""), + file_path=data.get("file_path", ""), ) @@ -481,6 +485,10 @@ async def wait_for_session_map_entry( content = await f.read() session_map = json.loads(content) info = session_map.get(key, {}) + provider = info.get("provider", "claude") + if provider != config.provider: + await asyncio.sleep(interval) + continue if info.get("session_id"): # Found — load into window_states immediately logger.debug( @@ -521,6 +529,8 @@ async def load_session_map(self) -> None: # Only process entries for our tmux session if not key.startswith(prefix): continue + if info.get("provider", "claude") != config.provider: + continue window_id = key[len(prefix) :] if not self._is_window_id(window_id): continue @@ -528,10 +538,15 @@ async def load_session_map(self) -> None: new_sid = info.get("session_id", "") new_cwd = info.get("cwd", "") new_wname = info.get("window_name", "") + new_fp = info.get("file_path", "") if not new_sid: continue state = self.get_window_state(window_id) - if state.session_id != new_sid or state.cwd != new_cwd: + if ( + state.session_id != new_sid + or state.cwd != new_cwd + or state.file_path != new_fp + ): logger.info( "Session map: window_id %s updated sid=%s, cwd=%s", window_id, @@ -540,6 +555,7 @@ async def load_session_map(self) -> None: ) state.session_id = new_sid state.cwd = new_cwd + state.file_path = new_fp changed = True # Update display name if new_wname: @@ -575,33 +591,40 @@ def clear_window_session(self, window_id: str) -> None: @staticmethod def _encode_cwd(cwd: str) -> str: - """Encode a cwd path to match Claude Code's project directory naming. - - Replaces all non-alphanumeric characters (except dash) with dashes. - E.g. /home/user_name/Code/project -> -home-user-name-Code-project - """ + """Encode cwd to Claude projects dir naming convention.""" return re.sub(r"[^a-zA-Z0-9-]", "-", cwd) - def _build_session_file_path(self, session_id: str, cwd: str) -> Path | None: + def _build_session_file_path( + self, session_id: str, cwd: str, file_path: str = "" + ) -> Path | None: """Build the direct file path for a session from session_id and cwd.""" + if file_path: + return Path(file_path) if not session_id or not cwd: return None + if config.provider == "codex": + return None encoded_cwd = self._encode_cwd(cwd) return config.claude_projects_path / encoded_cwd / f"{session_id}.jsonl" async def _get_session_direct( - self, session_id: str, cwd: str + self, session_id: str, cwd: str, file_path: str = "" ) -> ClaudeSession | None: """Get a ClaudeSession directly from session_id and cwd (no scanning).""" - file_path = self._build_session_file_path(session_id, cwd) + resolved_path = self._build_session_file_path(session_id, cwd, file_path) # Fallback: glob search if direct path doesn't exist - if not file_path or not file_path.exists(): - pattern = f"*/{session_id}.jsonl" - matches = list(config.claude_projects_path.glob(pattern)) + if not resolved_path or not resolved_path.exists(): + if config.provider == "codex": + matches = list( + config.codex_sessions_path.rglob(f"rollout-*{session_id}.jsonl") + ) + else: + pattern = f"*/{session_id}.jsonl" + matches = list(config.claude_projects_path.glob(pattern)) if matches: - file_path = matches[0] - logger.debug("Found session via glob: %s", file_path) + resolved_path = matches[0] + logger.debug("Found session via glob: %s", resolved_path) else: return None @@ -610,7 +633,7 @@ async def _get_session_direct( last_user_msg = "" message_count = 0 try: - async with aiofiles.open(file_path, "r", encoding="utf-8") as f: + async with aiofiles.open(resolved_path, "r", encoding="utf-8") as f: async for line in f: line = line.strip() if not line: @@ -640,7 +663,7 @@ async def _get_session_direct( session_id=session_id, summary=summary, message_count=message_count, - file_path=str(file_path), + file_path=str(resolved_path), ) # --- Directory session listing --- @@ -692,7 +715,11 @@ async def resolve_session_for_window(self, window_id: str) -> ClaudeSession | No if not state.session_id or not state.cwd: return None - session = await self._get_session_direct(state.session_id, state.cwd) + session = await self._get_session_direct( + state.session_id, + state.cwd, + state.file_path, + ) if session: return session @@ -705,6 +732,7 @@ async def resolve_session_for_window(self, window_id: str) -> ClaudeSession | No ) state.session_id = "" state.cwd = "" + state.file_path = "" self._save_state() return None @@ -716,6 +744,9 @@ def update_user_window_offset( """Update the user's last read offset for a window.""" if user_id not in self.user_window_offsets: self.user_window_offsets[user_id] = {} + prev = self.user_window_offsets[user_id].get(window_id) + if prev == offset: + return self.user_window_offsets[user_id][window_id] = offset self._save_state() @@ -778,10 +809,11 @@ def resolve_window_for_thread( ) -> str | None: """Resolve the tmux window_id for a user's thread. - Returns None if thread_id is None or the thread is not bound. + For private chats (thread_id=None), falls back to the synthetic + binding key 0. """ if thread_id is None: - return None + return self.get_window_for_thread(user_id, 0) return self.get_window_for_thread(user_id, thread_id) def iter_thread_bindings(self) -> Iterator[tuple[int, int, str]]: @@ -797,16 +829,24 @@ def iter_thread_bindings(self) -> Iterator[tuple[int, int, str]]: async def find_users_for_session( self, session_id: str, - ) -> list[tuple[int, str, int]]: + ) -> list[tuple[int, str, int | None]]: """Find all users whose thread-bound window maps to the given session_id. Returns list of (user_id, window_id, thread_id) tuples. + Private bindings use thread_id=None. """ - result: list[tuple[int, str, int]] = [] + result: list[tuple[int, str, int | None]] = [] for user_id, thread_id, window_id in self.iter_thread_bindings(): + state = self.window_states.get(window_id) + # Fast path: session_id is already synced from session_map.json. + if state and state.session_id == session_id: + result.append((user_id, window_id, thread_id or None)) + continue + + # Fallback path for stale/incomplete state. resolved = await self.resolve_session_for_window(window_id) if resolved and resolved.session_id == session_id: - result.append((user_id, window_id, thread_id)) + result.append((user_id, window_id, thread_id or None)) return result # --- Tmux helpers --- diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 0a1b3186..3e0807f1 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -14,6 +14,7 @@ import asyncio import json import logging +import time from dataclasses import dataclass from pathlib import Path from typing import Any, Callable, Awaitable @@ -21,6 +22,7 @@ import aiofiles from .config import config +from .codex_mapper import codex_session_mapper from .monitor_state import MonitorState, TrackedSession from .tmux_manager import tmux_manager from .transcript_parser import TranscriptParser @@ -31,7 +33,7 @@ @dataclass class SessionInfo: - """Information about a Claude Code session.""" + """Information about an agent session transcript file.""" session_id: str file_path: Path @@ -49,10 +51,14 @@ class NewMessage: role: str = "assistant" # "user" or "assistant" tool_name: str | None = None # For tool_use messages, the tool name image_data: list[tuple[str, bytes]] | None = None # From tool_result images + source_line_start: int = 0 # JSONL byte offset start in transcript + source_line_end: int = 0 # JSONL byte offset end in transcript + trace_id: str = "" # Stable line trace id: :- + detected_at_monotonic: float = 0.0 # Local monotonic time when line was parsed class SessionMonitor: - """Monitors Claude Code sessions for new assistant messages. + """Monitors sessions for new assistant messages. Uses simple async polling with aiofiles for non-blocking I/O. Emits both intermediate and complete assistant messages. @@ -65,7 +71,7 @@ def __init__( state_file: Path | None = None, ): self.projects_path = ( - projects_path if projects_path is not None else config.claude_projects_path + projects_path if projects_path is not None else config.provider_data_root ) self.poll_interval = ( poll_interval if poll_interval is not None else config.monitor_poll_interval @@ -103,6 +109,9 @@ async def _get_active_cwds(self) -> set[str]: async def scan_projects(self) -> list[SessionInfo]: """Scan projects that have active tmux windows.""" + if config.provider == "codex": + return await self._scan_codex_from_session_map() + active_cwds = await self._get_active_cwds() if not active_cwds: return [] @@ -193,6 +202,28 @@ async def scan_projects(self) -> list[SessionInfo]: return sessions + async def _scan_codex_from_session_map(self) -> list[SessionInfo]: + """Read codex transcript paths from session_map.json entries.""" + entries = await self._load_current_session_map_entries() + sessions: list[SessionInfo] = [] + seen: set[str] = set() + for info in entries.values(): + session_id = info.get("session_id", "") + if not session_id or session_id in seen: + continue + file_path_raw = info.get("file_path", "") + file_path = Path(file_path_raw) if file_path_raw else None + if file_path is None or not file_path.exists(): + matches = list( + config.codex_sessions_path.rglob(f"rollout-*{session_id}.jsonl") + ) + file_path = matches[0] if matches else None + if file_path is None or not file_path.exists(): + continue + seen.add(session_id) + sessions.append(SessionInfo(session_id=session_id, file_path=file_path)) + return sessions + async def _read_new_lines( self, session: TrackedSession, file_path: Path ) -> list[dict]: @@ -244,11 +275,18 @@ async def _read_new_lines( # successfully. A non-empty line that fails JSON parsing is # likely a partial write; stop and retry next cycle. safe_offset = session.last_byte_offset - async for line in f: + while True: + line_start = await f.tell() + line = await f.readline() + if not line: + break + line_end = await f.tell() data = TranscriptParser.parse_line(line) if data: + data["__ccbot_line_start"] = line_start + data["__ccbot_line_end"] = line_end new_entries.append(data) - safe_offset = await f.tell() + safe_offset = line_end elif line.strip(): # Partial JSONL line — don't advance offset past it logger.warning( @@ -258,7 +296,7 @@ async def _read_new_lines( break else: # Empty line — safe to skip - safe_offset = await f.tell() + safe_offset = line_end session.last_byte_offset = safe_offset @@ -334,35 +372,52 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa f"session {session_info.session_id}" ) - # Parse new entries using the shared logic, carrying over pending tools + # Parse line-by-line to preserve exact source line trace. carry = self._pending_tools.get(session_info.session_id, {}) - parsed_entries, remaining = TranscriptParser.parse_entries( - new_entries, - pending_tools=carry, - ) - if remaining: - self._pending_tools[session_info.session_id] = remaining - else: - self._pending_tools.pop(session_info.session_id, None) + for raw in new_entries: + line_start = int(raw.get("__ccbot_line_start", 0)) + line_end = int(raw.get("__ccbot_line_end", 0)) + trace_id = f"{session_info.session_id}:{line_start}-{line_end}" + + parsed_entries, carry = TranscriptParser.parse_entries( + [raw], + pending_tools=carry, + ) - for entry in parsed_entries: - if not entry.text and not entry.image_data: - continue - # Skip user messages unless show_user_messages is enabled - if entry.role == "user" and not config.show_user_messages: - continue - new_messages.append( - NewMessage( - session_id=session_info.session_id, - text=entry.text, - is_complete=True, - content_type=entry.content_type, - tool_use_id=entry.tool_use_id, - role=entry.role, - tool_name=entry.tool_name, - image_data=entry.image_data, + for entry in parsed_entries: + if not entry.text and not entry.image_data: + continue + # Skip user messages unless show_user_messages is enabled + if entry.role == "user" and not config.show_user_messages: + continue + logger.info( + "LineTrace in: trace=%s type=%s role=%s text_len=%d", + trace_id, + entry.content_type, + entry.role, + len(entry.text), ) - ) + new_messages.append( + NewMessage( + session_id=session_info.session_id, + text=entry.text, + is_complete=True, + content_type=entry.content_type, + tool_use_id=entry.tool_use_id, + role=entry.role, + tool_name=entry.tool_name, + image_data=entry.image_data, + source_line_start=line_start, + source_line_end=line_end, + trace_id=trace_id, + detected_at_monotonic=time.monotonic(), + ) + ) + + if carry: + self._pending_tools[session_info.session_id] = carry + else: + self._pending_tools.pop(session_info.session_id, None) self.state.update_session(tracked) @@ -372,8 +427,8 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa self.state.save_if_dirty() return new_messages - async def _load_current_session_map(self) -> dict[str, str]: - """Load current session_map and return window_key -> session_id mapping. + async def _load_current_session_map_entries(self) -> dict[str, dict[str, str]]: + """Load current session_map and return window_key -> mapping info. Keys in session_map are formatted as "tmux_session:window_id" (e.g. "ccbot:@12"). Old-format keys ("ccbot:window_name") are also @@ -381,7 +436,7 @@ async def _load_current_session_map(self) -> dict[str, str]: to be monitored until the hook re-fires with new format. Only entries matching our tmux_session_name are processed. """ - window_to_session: dict[str, str] = {} + window_to_info: dict[str, dict[str, str]] = {} if config.session_map_file.exists(): try: async with aiofiles.open(config.session_map_file, "r") as f: @@ -389,15 +444,37 @@ async def _load_current_session_map(self) -> dict[str, str]: session_map = json.loads(content) prefix = f"{config.tmux_session_name}:" for key, info in session_map.items(): + if not isinstance(info, dict): + continue # Only process entries for our tmux session if not key.startswith(prefix): continue + entry_provider = info.get("provider", "claude") + if entry_provider != config.provider: + continue window_key = key[len(prefix) :] session_id = info.get("session_id", "") - if session_id: - window_to_session[window_key] = session_id + if not session_id: + continue + window_to_info[window_key] = { + "session_id": session_id, + "cwd": info.get("cwd", ""), + "window_name": info.get("window_name", ""), + "provider": entry_provider, + "file_path": info.get("file_path", ""), + } except (json.JSONDecodeError, OSError): pass + return window_to_info + + async def _load_current_session_map(self) -> dict[str, str]: + """Load current session_map and return window_key -> session_id mapping.""" + entries = await self._load_current_session_map_entries() + window_to_session: dict[str, str] = {} + for window_key, info in entries.items(): + session_id = info.get("session_id", "") + if session_id: + window_to_session[window_key] = session_id return window_to_session async def _cleanup_all_stale_sessions(self) -> None: @@ -483,6 +560,9 @@ async def _monitor_loop(self) -> None: while self._running: try: + if config.provider == "codex": + await codex_session_mapper.sync_session_map() + # Load hook-based session map updates await session_manager.load_session_map() @@ -517,10 +597,14 @@ def start(self) -> None: self._running = True self._task = asyncio.create_task(self._monitor_loop()) - def stop(self) -> None: + async def stop(self) -> None: self._running = False if self._task: self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass self._task = None self.state.save() logger.info("Session monitor stopped and state saved") diff --git a/src/ccbot/tmux_manager.py b/src/ccbot/tmux_manager.py index f05b4f3a..423381a2 100644 --- a/src/ccbot/tmux_manager.py +++ b/src/ccbot/tmux_manager.py @@ -376,7 +376,7 @@ async def create_window( Args: work_dir: Working directory for the new window window_name: Optional window name (defaults to directory name) - start_claude: Whether to start claude command + start_claude: Whether to start configured agent command resume_session_id: If set, append --resume to claude command Returns: @@ -414,11 +414,11 @@ def _create_and_start() -> tuple[bool, str, str, str]: # Prevent Claude Code from overriding window name window.set_window_option("allow-rename", "off") - # Start Claude Code if requested + # Start agent command if requested if start_claude: pane = window.active_pane if pane: - cmd = config.claude_command + cmd = config.agent_command if resume_session_id: cmd = f"{cmd} --resume {resume_session_id}" pane.send_keys(cmd, enter=True) diff --git a/src/ccbot/transcript_parser.py b/src/ccbot/transcript_parser.py index fa0bbf69..35f7e9a6 100644 --- a/src/ccbot/transcript_parser.py +++ b/src/ccbot/transcript_parser.py @@ -104,10 +104,33 @@ def get_message_type(data: dict) -> str | None: """ return data.get("type") + @staticmethod + def is_codex_entry(data: dict) -> bool: + """Check if this line is from Codex rollout JSONL format.""" + return data.get("type") in ( + "session_meta", + "turn_context", + "event_msg", + "response_item", + ) + @staticmethod def is_user_message(data: dict) -> bool: """Check if this is a user message.""" - return data.get("type") == "user" + msg_type = data.get("type") + if msg_type == "user": + return True + if msg_type == "event_msg": + payload = data.get("payload", {}) + return isinstance(payload, dict) and payload.get("type") == "user_message" + if msg_type == "response_item": + payload = data.get("payload", {}) + return ( + isinstance(payload, dict) + and payload.get("type") == "message" + and payload.get("role") == "user" + ) + return False @staticmethod def extract_text_only(content_list: list[Any]) -> str: @@ -139,12 +162,40 @@ def extract_text_only(content_list: list[Any]) -> str: return "\n".join(texts) + @staticmethod + def _extract_codex_text_from_content(content_list: list[Any] | Any) -> str: + """Extract text payload from Codex response_item.message content.""" + if isinstance(content_list, str): + return content_list + if not isinstance(content_list, list): + return "" + + texts: list[str] = [] + for item in content_list: + if isinstance(item, str): + texts.append(item) + continue + if not isinstance(item, dict): + continue + item_type = item.get("type", "") + if item_type in ("output_text", "input_text", "text"): + text = item.get("text", "") + if text: + texts.append(text) + return "\n".join(texts) + _RE_ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m") _RE_COMMAND_NAME = re.compile(r"(.*?)") _RE_LOCAL_STDOUT = re.compile( r"(.*?)", re.DOTALL ) + _RE_CODEX_USER_SHELL = re.compile( + r"\s*\s*(.*?)\s*\s*" + r"\s*(.*?)\s*\s*", + re.DOTALL, + ) + _RE_CODEX_OUTPUT = re.compile(r"Output:\s*\n(.*)", re.DOTALL) _RE_SYSTEM_TAGS = re.compile( r"<(bash-input|bash-stdout|bash-stderr|local-command-caveat|system-reminder)" ) @@ -284,6 +335,58 @@ def parse_message(cls, data: dict) -> ParsedMessage | None: """ msg_type = cls.get_message_type(data) + # Codex rollout event line + if msg_type == "event_msg": + payload = data.get("payload") + if not isinstance(payload, dict): + return None + event_type = payload.get("type", "") + if event_type == "user_message": + return ParsedMessage( + message_type="user", + text=str(payload.get("message", "")), + ) + if event_type == "agent_message": + return ParsedMessage( + message_type="assistant", + text=str(payload.get("message", "")), + ) + if event_type == "agent_reasoning": + return ParsedMessage( + message_type="thinking", + text=str(payload.get("text", "")), + ) + return None + + # Codex rollout response item + if msg_type == "response_item": + payload = data.get("payload") + if not isinstance(payload, dict): + return None + payload_type = payload.get("type", "") + if payload_type == "message": + role = payload.get("role", "") + if role not in ("assistant", "user"): + return None + content = payload.get("content", []) + text = cls._extract_codex_text_from_content(content) + return ParsedMessage(message_type=role, text=text) + if payload_type == "reasoning": + summary = payload.get("summary", []) + text_parts: list[str] = [] + if isinstance(summary, list): + for item in summary: + if isinstance(item, dict): + t = item.get("text", "") + if t: + text_parts.append(t) + return ParsedMessage( + message_type="thinking", + text="\n".join(text_parts).strip(), + ) + return None + + # Claude JSONL line if msg_type not in ("user", "assistant"): return None @@ -408,6 +511,249 @@ def _format_tool_result_text(cls, text: str, tool_name: str | None = None) -> st # Default: expandable quote without stats return cls._format_expandable_quote(text) + @classmethod + def _parse_codex_user_shell_command(cls, text: str) -> str | None: + """Parse Codex `` payload into local-command text.""" + match = cls._RE_CODEX_USER_SHELL.search(text) + if not match: + return None + + cmd = match.group(1).strip() + result_block = match.group(2).strip() + output_match = cls._RE_CODEX_OUTPUT.search(result_block) + output = output_match.group(1).strip() if output_match else "" + + if not output: + output = "(no output)" + + if cmd: + if "\n" in output: + return f"❯ `{cmd}`\n```\n{output}\n```" + return f"❯ `{cmd}`\n`{output}`" + + if "\n" in output: + return f"```\n{output}\n```" + return f"`{output}`" + + @classmethod + def _parse_codex_entries( + cls, + entries: list[dict], + pending_tools: dict[str, PendingToolInfo] | None = None, + ) -> tuple[list[ParsedEntry], dict[str, PendingToolInfo]]: + """Parse Codex rollout JSONL entries into ParsedEntry records.""" + result: list[ParsedEntry] = [] + + def _append_entry(entry: ParsedEntry) -> None: + """Append entry with light dedupe for duplicate Codex events.""" + entry.text = entry.text.strip() + if not entry.text: + return + if result: + prev = result[-1] + if ( + prev.role == entry.role + and prev.content_type == entry.content_type + and prev.text == entry.text + and prev.tool_use_id == entry.tool_use_id + ): + return + result.append(entry) + + _carry_over = pending_tools is not None + if pending_tools is None: + pending_tools = {} + else: + pending_tools = dict(pending_tools) + + for data in entries: + entry_timestamp = cls.get_timestamp(data) + line_type = data.get("type", "") + + if line_type == "response_item": + payload = data.get("payload", {}) + if not isinstance(payload, dict): + continue + payload_type = payload.get("type", "") + + if payload_type == "message": + role = payload.get("role", "") + if role not in ("assistant", "user"): + continue + if role == "user": + # Codex shell command responses are encoded as user messages + # inside a wrapper. + user_text = cls._extract_codex_text_from_content( + payload.get("content", []) + ).strip() + local_cmd_text = cls._parse_codex_user_shell_command(user_text) + if local_cmd_text: + _append_entry( + ParsedEntry( + role="assistant", + text=local_cmd_text, + content_type="local_command", + timestamp=entry_timestamp, + ) + ) + continue + text = cls._extract_codex_text_from_content( + payload.get("content", []) + ).strip() + if text: + _append_entry( + ParsedEntry( + role="assistant", + text=text, + content_type="text", + timestamp=entry_timestamp, + ) + ) + + elif payload_type == "reasoning": + summary = payload.get("summary", []) + text_parts: list[str] = [] + if isinstance(summary, list): + for item in summary: + if not isinstance(item, dict): + continue + if item.get("type") != "summary_text": + continue + text = item.get("text", "") + if text: + text_parts.append(text) + thinking_text = "\n".join(text_parts).strip() + if thinking_text: + _append_entry( + ParsedEntry( + role="assistant", + text=cls._format_expandable_quote(thinking_text), + content_type="thinking", + timestamp=entry_timestamp, + ) + ) + + elif payload_type == "function_call": + call_id = payload.get("call_id", "") + name = payload.get("name", "tool") + args_raw = payload.get("arguments", "") + args_data: dict[str, Any] | Any = {} + if isinstance(args_raw, str) and args_raw: + try: + parsed_args = json.loads(args_raw) + args_data = parsed_args + except json.JSONDecodeError: + args_data = {"arguments": args_raw} + elif isinstance(args_raw, dict): + args_data = args_raw + + summary = cls.format_tool_use_summary(name, args_data) + if call_id: + pending_tools[call_id] = PendingToolInfo( + summary=summary, + tool_name=name, + input_data=args_data + if name in ("Edit", "NotebookEdit") + else None, + ) + _append_entry( + ParsedEntry( + role="assistant", + text=summary, + content_type="tool_use", + tool_use_id=call_id or None, + timestamp=entry_timestamp, + tool_name=name, + ) + ) + + elif payload_type == "function_call_output": + call_id = payload.get("call_id", "") + output = str(payload.get("output", "")).strip() + tool_info = pending_tools.pop(call_id, None) + + tool_summary = tool_info.summary if tool_info else "**Result**" + tool_name = tool_info.tool_name if tool_info else None + entry_text = tool_summary + if output: + entry_text += "\n" + cls._format_tool_result_text( + output, tool_name + ) + _append_entry( + ParsedEntry( + role="assistant", + text=entry_text, + content_type="tool_result", + tool_use_id=call_id or None, + timestamp=entry_timestamp, + ) + ) + + elif line_type == "event_msg": + payload = data.get("payload", {}) + if not isinstance(payload, dict): + continue + event_type = payload.get("type", "") + if event_type == "user_message": + text = str(payload.get("message", "")).strip() + if text: + _append_entry( + ParsedEntry( + role="user", + text=text, + content_type="text", + timestamp=entry_timestamp, + ) + ) + elif event_type == "agent_message": + text = str(payload.get("message", "")).strip() + if text: + _append_entry( + ParsedEntry( + role="assistant", + text=text, + content_type="text", + timestamp=entry_timestamp, + ) + ) + elif event_type == "agent_reasoning": + text = str(payload.get("text", "")).strip() + if text: + _append_entry( + ParsedEntry( + role="assistant", + text=cls._format_expandable_quote(text), + content_type="thinking", + timestamp=entry_timestamp, + ) + ) + elif event_type == "task_complete": + # Fallback final text for turns where message payload is not emitted. + text = str(payload.get("last_agent_message", "")).strip() + if text: + _append_entry( + ParsedEntry( + role="assistant", + text=text, + content_type="text", + timestamp=entry_timestamp, + ) + ) + + remaining_pending = dict(pending_tools) + if not _carry_over: + for tool_id, tool_info in pending_tools.items(): + _append_entry( + ParsedEntry( + role="assistant", + text=tool_info.summary, + content_type="tool_use", + tool_use_id=tool_id, + ) + ) + + return result, remaining_pending + @classmethod def parse_entries( cls, @@ -429,6 +775,9 @@ def parse_entries( Returns: Tuple of (parsed entries, remaining pending_tools state) """ + if any(cls.is_codex_entry(data) for data in entries): + return cls._parse_codex_entries(entries, pending_tools) + result: list[ParsedEntry] = [] last_cmd_name: str | None = None # Pending tool_use blocks keyed by id diff --git a/tests/ccbot/handlers/test_message_queue.py b/tests/ccbot/handlers/test_message_queue.py new file mode 100644 index 00000000..e9225764 --- /dev/null +++ b/tests/ccbot/handlers/test_message_queue.py @@ -0,0 +1,29 @@ +"""Tests for message queue helpers.""" + +import pytest + +from ccbot.handlers import message_queue +from ccbot.handlers.message_queue import MessageTask + + +@pytest.fixture(autouse=True) +def clean_message_queue_state() -> None: + message_queue._message_queues.clear() + message_queue._queue_locks.clear() + message_queue._queue_workers.clear() + message_queue._flood_until.clear() + message_queue._status_msg_info.clear() + message_queue._tool_msg_ids.clear() + yield + message_queue._message_queues.clear() + message_queue._queue_locks.clear() + message_queue._queue_workers.clear() + message_queue._flood_until.clear() + message_queue._status_msg_info.clear() + message_queue._tool_msg_ids.clear() + + +def test_message_task_defaults() -> None: + task = MessageTask(task_type="content") + assert task.parts == [] + assert task.content_type == "text" diff --git a/tests/ccbot/test_codex_mapper.py b/tests/ccbot/test_codex_mapper.py new file mode 100644 index 00000000..6b94cb04 --- /dev/null +++ b/tests/ccbot/test_codex_mapper.py @@ -0,0 +1,151 @@ +"""Tests for CodexSessionMapper.""" + +import json +import os +from unittest.mock import AsyncMock, patch + +import pytest + +from ccbot.codex_mapper import CodexSessionMapper +from ccbot.tmux_manager import TmuxWindow + + +def _write_rollout(path, session_id: str, cwd: str, ts: str) -> None: + payload = { + "type": "session_meta", + "payload": { + "id": session_id, + "cwd": cwd, + "timestamp": ts, + }, + } + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps(payload) + "\n", encoding="utf-8") + + +class TestCodexSessionMapper: + @pytest.mark.asyncio + async def test_maps_window_to_matching_cwd(self, tmp_path): + sessions_root = tmp_path / "sessions" + map_file = tmp_path / "session_map.json" + cwd = str((tmp_path / "proj").resolve()) + _write_rollout( + sessions_root / "2026/02/28/rollout-2026-02-28T00-00-00-sid-1.jsonl", + "sid-1", + cwd, + "2026-02-28T00:00:00Z", + ) + + mapper = CodexSessionMapper( + sessions_root=sessions_root, session_map_file=map_file + ) + windows = [ + TmuxWindow( + window_id="@1", + window_name="proj", + cwd=cwd, + pane_current_command="codex", + ) + ] + + with patch("ccbot.codex_mapper.tmux_manager") as mock_tmux: + mock_tmux.list_windows = AsyncMock(return_value=windows) + changed = await mapper.sync_session_map() + + assert changed is True + data = json.loads(map_file.read_text()) + key = "ccbot:@1" + assert key in data + assert data[key]["session_id"] == "sid-1" + assert data[key]["provider"] == "codex" + assert data[key]["cwd"] == cwd + + @pytest.mark.asyncio + async def test_removes_stale_codex_entry(self, tmp_path): + sessions_root = tmp_path / "sessions" + map_file = tmp_path / "session_map.json" + map_file.write_text( + json.dumps( + { + "ccbot:@99": { + "session_id": "sid-old", + "cwd": "/tmp/old", + "window_name": "old", + "provider": "codex", + "file_path": "/tmp/old.jsonl", + } + } + ), + encoding="utf-8", + ) + mapper = CodexSessionMapper( + sessions_root=sessions_root, session_map_file=map_file + ) + + with patch("ccbot.codex_mapper.tmux_manager") as mock_tmux: + mock_tmux.list_windows = AsyncMock(return_value=[]) + changed = await mapper.sync_session_map() + + assert changed is True + data = json.loads(map_file.read_text()) + assert "ccbot:@99" not in data + + @pytest.mark.asyncio + async def test_prefers_resume_session_id_for_main_window( + self, tmp_path, monkeypatch + ): + sessions_root = tmp_path / "sessions" + map_file = tmp_path / "session_map.json" + ccbot_cwd = str((tmp_path / "ccbot").resolve()) + resume_sid = "019c9eef-c5f7-7dc2-9e92-de59a1c3cd28" + other_sid = "019ca375-fbf1-7e20-aff6-38003fd36889" + + resume_path = ( + sessions_root + / "2026/02/27" + / f"rollout-2026-02-27T14-50-39-{resume_sid}.jsonl" + ) + other_path = ( + sessions_root + / "2026/02/28" + / f"rollout-2026-02-28T11-55-44-{other_sid}.jsonl" + ) + _write_rollout( + resume_path, + resume_sid, + str((tmp_path / "opticlaw").resolve()), + "2026-02-27T14:50:39Z", + ) + _write_rollout( + other_path, + other_sid, + ccbot_cwd, + "2026-02-28T11:55:44Z", + ) + os.utime(resume_path, (1700000000, 1700000000)) + os.utime(other_path, (1800000000, 1800000000)) + + mapper = CodexSessionMapper( + sessions_root=sessions_root, session_map_file=map_file + ) + windows = [ + TmuxWindow( + window_id="@3", + window_name="ccbot", + cwd=ccbot_cwd, + pane_current_command="node", + ) + ] + + monkeypatch.setattr( + "ccbot.codex_mapper.config.codex_resume_session_id", resume_sid + ) + monkeypatch.setattr("ccbot.codex_mapper.config.tmux_session_name", "ccbot") + + with patch("ccbot.codex_mapper.tmux_manager") as mock_tmux: + mock_tmux.list_windows = AsyncMock(return_value=windows) + changed = await mapper.sync_session_map() + + assert changed is True + data = json.loads(map_file.read_text()) + assert data["ccbot:@3"]["session_id"] == resume_sid diff --git a/tests/ccbot/test_config.py b/tests/ccbot/test_config.py index 95cf35f9..307cc59e 100644 --- a/tests/ccbot/test_config.py +++ b/tests/ccbot/test_config.py @@ -41,6 +41,37 @@ def test_is_user_allowed_false(self): cfg = Config() assert cfg.is_user_allowed(99999) is False + def test_default_provider_is_claude(self): + cfg = Config() + assert cfg.provider == "claude" + assert cfg.agent_command == "claude" + assert cfg.supports_usage_command is True + assert cfg.forward_slash_commands is True + + def test_codex_provider_defaults(self, monkeypatch): + monkeypatch.setenv("CCBOT_PROVIDER", "codex") + cfg = Config() + assert cfg.provider == "codex" + assert cfg.agent_command == "codex" + assert cfg.codex_resume_session_id == "" + assert cfg.supports_usage_command is False + assert cfg.forward_slash_commands is True + assert cfg.provider_data_root == cfg.codex_sessions_path + + def test_codex_resume_session_id_from_agent_command(self, monkeypatch): + monkeypatch.setenv("CCBOT_PROVIDER", "codex") + monkeypatch.setenv( + "CCBOT_AGENT_COMMAND", + "codex resume 019c9eef-c5f7-7dc2-9e92-de59a1c3cd28", + ) + cfg = Config() + assert cfg.codex_resume_session_id == "019c9eef-c5f7-7dc2-9e92-de59a1c3cd28" + + def test_forward_ports(self, monkeypatch): + monkeypatch.setenv("CCBOT_FORWARD_PORTS", "3000, 5173") + cfg = Config() + assert cfg.forward_ports == [3000, 5173] + @pytest.mark.usefixtures("_base_env") class TestConfigMissingEnv: @@ -59,6 +90,16 @@ def test_non_numeric_allowed_users(self, monkeypatch): with pytest.raises(ValueError, match="non-numeric"): Config() + def test_invalid_provider(self, monkeypatch): + monkeypatch.setenv("CCBOT_PROVIDER", "bad-provider") + with pytest.raises(ValueError, match="CCBOT_PROVIDER"): + Config() + + def test_invalid_forward_ports(self, monkeypatch): + monkeypatch.setenv("CCBOT_FORWARD_PORTS", "abc") + with pytest.raises(ValueError, match="CCBOT_FORWARD_PORTS"): + Config() + @pytest.mark.usefixtures("_base_env") class TestConfigClaudeProjectsPath: diff --git a/tests/ccbot/test_main.py b/tests/ccbot/test_main.py new file mode 100644 index 00000000..c7d0e06d --- /dev/null +++ b/tests/ccbot/test_main.py @@ -0,0 +1,19 @@ +"""Tests for CLI argument parsing in main.py.""" + +import pytest + +from ccbot.main import _parse_forward_ports + + +def test_parse_forward_ports_empty() -> None: + assert _parse_forward_ports([]) == [] + + +def test_parse_forward_ports_multiple() -> None: + ports = _parse_forward_ports(["--forward", "3000,5173", "--forward", "8080"]) + assert ports == [3000, 5173, 8080] + + +def test_parse_forward_ports_invalid() -> None: + with pytest.raises(SystemExit): + _parse_forward_ports(["--forward", "abc"]) diff --git a/tests/ccbot/test_port_forward.py b/tests/ccbot/test_port_forward.py new file mode 100644 index 00000000..ca4d4650 --- /dev/null +++ b/tests/ccbot/test_port_forward.py @@ -0,0 +1,49 @@ +"""Tests for port forwarding URL parsing and ssh command generation.""" + +from types import SimpleNamespace +from unittest.mock import AsyncMock, patch + +import pytest + +from ccbot.port_forward import PortForwardManager +from ccbot.port_forward import _LOCALHOST_RUN_URL_RE + + +def test_localhost_run_regex_ignores_admin_url() -> None: + assert _LOCALHOST_RUN_URL_RE.search("https://admin.localhost.run/") is None + + +def test_localhost_run_regex_matches_random_subdomain() -> None: + m = _LOCALHOST_RUN_URL_RE.search("tunnel: https://a1b2c3d4.localhost.run") + assert m is not None + assert m.group(0) == "https://a1b2c3d4.localhost.run" + + +@pytest.mark.asyncio +async def test_start_localhost_run_uses_non_n_ssh_command() -> None: + manager = PortForwardManager([3000]) + fake_proc = SimpleNamespace(returncode=None) + + with ( + patch( + "ccbot.port_forward.asyncio.create_subprocess_exec", + new_callable=AsyncMock, + return_value=fake_proc, + ) as mock_create, + patch.object( + manager, + "_wait_for_url", + new_callable=AsyncMock, + return_value="https://2cd35b9d853526.lhr.life", + ), + ): + tunnel = await manager._start_localhost_run(3000) + + assert tunnel.public_url == "https://2cd35b9d853526.lhr.life" + + cmd = mock_create.await_args.args + assert cmd[0] == "ssh" + assert "-N" not in cmd + assert "-R" in cmd + assert "80:127.0.0.1:3000" in cmd + assert cmd[-1] == "nokey@localhost.run" diff --git a/tests/ccbot/test_session.py b/tests/ccbot/test_session.py index 022fb55a..a4be6789 100644 --- a/tests/ccbot/test_session.py +++ b/tests/ccbot/test_session.py @@ -109,9 +109,15 @@ def test_clear_window_session(self, mgr: SessionManager) -> None: class TestResolveWindowForThread: - def test_none_thread_id_returns_none(self, mgr: SessionManager) -> None: + def test_none_thread_id_without_private_binding_returns_none( + self, mgr: SessionManager + ) -> None: assert mgr.resolve_window_for_thread(100, None) is None + def test_none_thread_id_uses_private_binding(self, mgr: SessionManager) -> None: + mgr.bind_thread(100, 0, "@9") + assert mgr.resolve_window_for_thread(100, None) == "@9" + def test_unbound_thread_returns_none(self, mgr: SessionManager) -> None: assert mgr.resolve_window_for_thread(100, 42) is None @@ -144,6 +150,66 @@ def test_bind_thread_without_name_no_display(self, mgr: SessionManager) -> None: assert mgr.get_display_name("@1") == "@1" +class TestOffsets: + def test_update_user_window_offset_skips_unchanged_value( + self, mgr: SessionManager, monkeypatch + ) -> None: + save_calls = 0 + + def fake_save_state() -> None: + nonlocal save_calls + save_calls += 1 + + monkeypatch.setattr(mgr, "_save_state", fake_save_state) + + mgr.update_user_window_offset(100, "@1", 123) + mgr.update_user_window_offset(100, "@1", 123) + mgr.update_user_window_offset(100, "@1", 124) + + assert save_calls == 2 + + +class TestFindUsersForSession: + @pytest.mark.asyncio + async def test_fast_path_uses_window_state_without_resolve( + self, mgr: SessionManager, monkeypatch + ) -> None: + mgr.bind_thread(100, 1, "@1") + mgr.get_window_state("@1").session_id = "sid-1" + + resolve_calls = 0 + + async def fake_resolve(_window_id: str): + nonlocal resolve_calls + resolve_calls += 1 + return None + + monkeypatch.setattr(mgr, "resolve_session_for_window", fake_resolve) + + result = await mgr.find_users_for_session("sid-1") + + assert result == [(100, "@1", 1)] + assert resolve_calls == 0 + + @pytest.mark.asyncio + async def test_fallback_resolve_when_window_state_missing( + self, mgr: SessionManager, monkeypatch + ) -> None: + mgr.bind_thread(100, 1, "@1") + + class _Resolved: + session_id = "sid-1" + + async def fake_resolve(_window_id: str): + return _Resolved() + + monkeypatch.setattr(mgr, "resolve_session_for_window", fake_resolve) + + result = await mgr.find_users_for_session("sid-1") + + assert result == [(100, "@1", 1)] + + class TestIsWindowId: def test_valid_ids(self, mgr: SessionManager) -> None: assert mgr._is_window_id("@0") is True diff --git a/tests/ccbot/test_transcript_parser_codex.py b/tests/ccbot/test_transcript_parser_codex.py new file mode 100644 index 00000000..dbd2eb6b --- /dev/null +++ b/tests/ccbot/test_transcript_parser_codex.py @@ -0,0 +1,170 @@ +"""Tests for Codex rollout parsing in TranscriptParser.""" + +from ccbot.transcript_parser import TranscriptParser + +EXPQUOTE_START = TranscriptParser.EXPANDABLE_QUOTE_START +EXPQUOTE_END = TranscriptParser.EXPANDABLE_QUOTE_END + + +class TestCodexParseEntries: + def test_skips_system_entries(self): + entries = [ + {"type": "session_meta", "payload": {"id": "s1", "cwd": "/tmp"}}, + {"type": "turn_context", "payload": {"cwd": "/tmp"}}, + ] + result, pending = TranscriptParser.parse_entries(entries) + assert result == [] + assert pending == {} + + def test_parses_assistant_message_and_reasoning(self): + entries = [ + { + "type": "response_item", + "timestamp": "2026-01-01T00:00:00Z", + "payload": { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "hello"}], + }, + }, + { + "type": "response_item", + "timestamp": "2026-01-01T00:00:01Z", + "payload": { + "type": "reasoning", + "summary": [{"type": "summary_text", "text": "thinking"}], + }, + }, + ] + result, _ = TranscriptParser.parse_entries(entries) + assert len(result) == 2 + assert result[0].role == "assistant" + assert result[0].content_type == "text" + assert result[0].text == "hello" + assert result[1].content_type == "thinking" + assert EXPQUOTE_START in result[1].text and EXPQUOTE_END in result[1].text + + def test_parses_tool_call_and_output_pair(self): + entries = [ + { + "type": "response_item", + "payload": { + "type": "function_call", + "name": "exec_command", + "arguments": '{"cmd":"git status"}', + "call_id": "call-1", + }, + }, + { + "type": "response_item", + "payload": { + "type": "function_call_output", + "call_id": "call-1", + "output": "On branch main", + }, + }, + ] + result, pending = TranscriptParser.parse_entries(entries) + assert pending == {} + tool_use = [e for e in result if e.content_type == "tool_use"] + tool_result = [e for e in result if e.content_type == "tool_result"] + assert len(tool_use) == 1 + assert len(tool_result) == 1 + assert "exec_command" in tool_use[0].text + assert "On branch main" in tool_result[0].text + + def test_parses_event_user_message(self): + entries = [ + { + "type": "event_msg", + "payload": {"type": "user_message", "message": "привет"}, + } + ] + result, _ = TranscriptParser.parse_entries(entries) + assert len(result) == 1 + assert result[0].role == "user" + assert result[0].text == "привет" + + def test_parses_event_agent_message(self): + entries = [ + { + "type": "event_msg", + "timestamp": "2026-02-28T20:41:59.885Z", + "payload": {"type": "agent_message", "message": "done"}, + } + ] + result, _ = TranscriptParser.parse_entries(entries) + assert len(result) == 1 + assert result[0].role == "assistant" + assert result[0].content_type == "text" + assert result[0].text == "done" + + def test_parses_task_complete_last_agent_message(self): + entries = [ + { + "type": "event_msg", + "timestamp": "2026-02-28T20:42:00.000Z", + "payload": { + "type": "task_complete", + "last_agent_message": "final summary", + }, + } + ] + result, _ = TranscriptParser.parse_entries(entries) + assert len(result) == 1 + assert result[0].role == "assistant" + assert result[0].content_type == "text" + assert result[0].text == "final summary" + + def test_dedupes_reasoning_from_event_and_response_item(self): + entries = [ + { + "type": "event_msg", + "timestamp": "2026-02-28T20:41:36.020Z", + "payload": {"type": "agent_reasoning", "text": "same thinking"}, + }, + { + "type": "response_item", + "timestamp": "2026-02-28T20:41:36.020Z", + "payload": { + "type": "reasoning", + "summary": [{"type": "summary_text", "text": "same thinking"}], + }, + }, + ] + result, _ = TranscriptParser.parse_entries(entries) + assert len(result) == 1 + assert result[0].content_type == "thinking" + + def test_parses_codex_user_shell_command_as_local_command(self): + text = ( + "\n" + "\n" + "pwd\n" + "\n" + "\n" + "Exit code: 0\n" + "Duration: 0.01s\n" + "Output:\n" + "/home/dimkk/new-proj/opticlaw\n" + "\n" + "\n" + "" + ) + entries = [ + { + "type": "response_item", + "timestamp": "2026-02-28T09:42:26.596Z", + "payload": { + "type": "message", + "role": "user", + "content": [{"type": "input_text", "text": text}], + }, + } + ] + result, _ = TranscriptParser.parse_entries(entries) + assert len(result) == 1 + assert result[0].role == "assistant" + assert result[0].content_type == "local_command" + assert "❯ `pwd`" in result[0].text + assert "/home/dimkk/new-proj/opticlaw" in result[0].text From 6f7444f1caa5d5fe51074c68932cecc40d1fa944 Mon Sep 17 00:00:00 2001 From: dimkk Date: Sun, 1 Mar 2026 23:19:09 +0300 Subject: [PATCH 2/7] fix: clamp oversized monitor payloads and skip huge startup backlog --- src/ccbot/session_monitor.py | 63 +++++++++++++++++++- tests/ccbot/test_session_monitor.py | 89 ++++++++++++++++++++++++++++- 2 files changed, 149 insertions(+), 3 deletions(-) diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 3e0807f1..4dcdcdfc 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -30,6 +30,12 @@ logger = logging.getLogger(__name__) +# Safety limits for oversized transcript payloads. +# These values intentionally avoid additional env knobs. +_MAX_JSONL_LINE_CHARS = 256_000 +_MAX_EMITTED_TEXT_CHARS = 32_000 +_MAX_INITIAL_BACKLOG_BYTES = 1_000_000 + @dataclass class SessionInfo: @@ -90,6 +96,18 @@ def __init__( self._last_session_map: dict[str, str] = {} # window_key -> session_id # In-memory mtime cache for quick file change detection (not persisted) self._file_mtimes: dict[str, float] = {} # session_id -> last_seen_mtime + # Sessions seen by this monitor process. Used to apply startup catch-up + # protection only once per session after restart. + self._seen_sessions: set[str] = set() + + @staticmethod + def _truncate_emitted_text(text: str) -> str: + """Clamp oversized message text before enqueueing Telegram work.""" + if len(text) <= _MAX_EMITTED_TEXT_CHARS: + return text + omitted = len(text) - _MAX_EMITTED_TEXT_CHARS + suffix = f"\n\n… (truncated by CCBot, omitted {omitted} chars)" + return text[:_MAX_EMITTED_TEXT_CHARS] + suffix def set_message_callback( self, callback: Callable[[NewMessage], Awaitable[None]] @@ -281,6 +299,17 @@ async def _read_new_lines( if not line: break line_end = await f.tell() + if len(line) > _MAX_JSONL_LINE_CHARS: + logger.warning( + "Oversized JSONL line skipped for session %s: line=%d-%d chars=%d limit=%d", + session.session_id, + line_start, + line_end, + len(line), + _MAX_JSONL_LINE_CHARS, + ) + safe_offset = line_end + continue data = TranscriptParser.parse_line(line) if data: data["__ccbot_line_start"] = line_start @@ -352,6 +381,25 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa except OSError: continue + # Restart protection: if we are far behind on first observation + # of an existing tracked session, fast-forward offset to avoid + # blocking startup with a huge catch-up burst. + if session_info.session_id not in self._seen_sessions: + backlog_bytes = max(0, current_size - tracked.last_byte_offset) + if backlog_bytes > _MAX_INITIAL_BACKLOG_BYTES: + tracked.last_byte_offset = current_size + self.state.update_session(tracked) + self._file_mtimes[session_info.session_id] = current_mtime + self._seen_sessions.add(session_info.session_id) + logger.warning( + "Fast-forwarded session %s backlog on startup: skipped_bytes=%d threshold=%d", + session_info.session_id, + backlog_bytes, + _MAX_INITIAL_BACKLOG_BYTES, + ) + continue + self._seen_sessions.add(session_info.session_id) + last_mtime = self._file_mtimes.get(session_info.session_id, 0.0) if ( current_mtime <= last_mtime @@ -390,17 +438,28 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa # Skip user messages unless show_user_messages is enabled if entry.role == "user" and not config.show_user_messages: continue + text = self._truncate_emitted_text(entry.text) + if text != entry.text: + logger.warning( + "LineTrace clamp: trace=%s type=%s role=%s original_len=%d clamped_len=%d limit=%d", + trace_id, + entry.content_type, + entry.role, + len(entry.text), + len(text), + _MAX_EMITTED_TEXT_CHARS, + ) logger.info( "LineTrace in: trace=%s type=%s role=%s text_len=%d", trace_id, entry.content_type, entry.role, - len(entry.text), + len(text), ) new_messages.append( NewMessage( session_id=session_info.session_id, - text=entry.text, + text=text, is_complete=True, content_type=entry.content_type, tool_use_id=entry.tool_use_id, diff --git a/tests/ccbot/test_session_monitor.py b/tests/ccbot/test_session_monitor.py index b1274e63..d7bc8ae3 100644 --- a/tests/ccbot/test_session_monitor.py +++ b/tests/ccbot/test_session_monitor.py @@ -1,11 +1,18 @@ """Unit tests for SessionMonitor JSONL reading and offset handling.""" import json +from unittest.mock import AsyncMock import pytest from ccbot.monitor_state import TrackedSession -from ccbot.session_monitor import SessionMonitor +from ccbot.session_monitor import ( + _MAX_EMITTED_TEXT_CHARS, + _MAX_INITIAL_BACKLOG_BYTES, + SessionInfo, + SessionMonitor, +) +from ccbot.transcript_parser import ParsedEntry class TestReadNewLinesOffsetRecovery: @@ -93,3 +100,83 @@ async def test_truncation_detection(self, monitor, tmp_path, make_jsonl_entry): # Should reset offset to 0 and read the line assert session.last_byte_offset == jsonl_file.stat().st_size assert len(result) == 1 + + +class TestSessionMonitorOversizedProtection: + @pytest.fixture + def monitor(self, tmp_path): + return SessionMonitor( + projects_path=tmp_path / "projects", + state_file=tmp_path / "monitor_state.json", + ) + + @pytest.mark.asyncio + async def test_check_for_updates_truncates_oversized_entry_text( + self, monitor, tmp_path, monkeypatch + ): + session_id = "s1" + jsonl_file = tmp_path / "session.jsonl" + jsonl_file.write_text("{}\n", encoding="utf-8") + + tracked = TrackedSession( + session_id=session_id, + file_path=str(jsonl_file), + last_byte_offset=0, + ) + monitor.state.update_session(tracked) + monitor._file_mtimes[session_id] = -1.0 + + async def fake_scan_projects(): + return [SessionInfo(session_id=session_id, file_path=jsonl_file)] + + async def fake_read_new_lines(_tracked, _file): + return [{"__ccbot_line_start": 10, "__ccbot_line_end": 20}] + + huge_text = "X" * (_MAX_EMITTED_TEXT_CHARS + 777) + + def fake_parse_entries(_entries, pending_tools=None): + return ( + [ParsedEntry(role="assistant", text=huge_text, content_type="text")], + pending_tools or {}, + ) + + monkeypatch.setattr(monitor, "scan_projects", fake_scan_projects) + monkeypatch.setattr(monitor, "_read_new_lines", fake_read_new_lines) + monkeypatch.setattr( + "ccbot.session_monitor.TranscriptParser.parse_entries", + fake_parse_entries, + ) + + messages = await monitor.check_for_updates({session_id}) + assert len(messages) == 1 + assert len(messages[0].text) < len(huge_text) + assert "truncated by CCBot" in messages[0].text + + @pytest.mark.asyncio + async def test_check_for_updates_fast_forwards_large_startup_backlog( + self, monitor, tmp_path, monkeypatch + ): + session_id = "s2" + jsonl_file = tmp_path / "session.jsonl" + payload = "A" * (_MAX_INITIAL_BACKLOG_BYTES + 100) + jsonl_file.write_text(payload, encoding="utf-8") + + tracked = TrackedSession( + session_id=session_id, + file_path=str(jsonl_file), + last_byte_offset=0, + ) + monitor.state.update_session(tracked) + monitor._file_mtimes[session_id] = -1.0 + + async def fake_scan_projects(): + return [SessionInfo(session_id=session_id, file_path=jsonl_file)] + + mock_read_new_lines = AsyncMock(return_value=[]) + monkeypatch.setattr(monitor, "scan_projects", fake_scan_projects) + monkeypatch.setattr(monitor, "_read_new_lines", mock_read_new_lines) + + messages = await monitor.check_for_updates({session_id}) + assert messages == [] + assert tracked.last_byte_offset == jsonl_file.stat().st_size + mock_read_new_lines.assert_not_called() From c8e757b2bc6bbde6ca2db9dd0be10bd234d39bce Mon Sep 17 00:00:00 2001 From: dimkk Date: Sun, 1 Mar 2026 23:27:30 +0300 Subject: [PATCH 3/7] fix: prevent startup stalls on heavy session backlog --- src/ccbot/bot.py | 19 +++++++++++++++++-- src/ccbot/session_monitor.py | 14 +++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index 422be268..07a10124 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -142,6 +142,9 @@ logger = logging.getLogger(__name__) +# Keep startup responsive even when tmux/state files are slow. +_POST_INIT_RESOLVE_TIMEOUT_SECONDS = 8.0 + # Session monitor instance session_monitor: SessionMonitor | None = None @@ -2070,8 +2073,20 @@ async def post_init(application: Application) -> None: _port_forward_task = asyncio.create_task(_run_port_forwarding(application.bot)) logger.info("Port forwarding task started for ports: %s", config.forward_ports) - # Re-resolve stale window IDs from persisted state against live tmux windows - await session_manager.resolve_stale_ids() + # Re-resolve stale window IDs from persisted state against live tmux windows. + # Guard with timeout so bot startup never hangs on slow tmux/state I/O. + try: + await asyncio.wait_for( + session_manager.resolve_stale_ids(), + timeout=_POST_INIT_RESOLVE_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + logger.warning( + "Startup timeout: resolve_stale_ids exceeded %.1fs; continuing", + _POST_INIT_RESOLVE_TIMEOUT_SECONDS, + ) + except Exception as e: + logger.warning("Startup warning: resolve_stale_ids failed: %s", e) # Pre-fill global rate limiter bucket on restart. # AsyncLimiter starts at _level=0 (full burst capacity), but Telegram's diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 4dcdcdfc..4b12fc51 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -34,7 +34,8 @@ # These values intentionally avoid additional env knobs. _MAX_JSONL_LINE_CHARS = 256_000 _MAX_EMITTED_TEXT_CHARS = 32_000 -_MAX_INITIAL_BACKLOG_BYTES = 1_000_000 +_MAX_INITIAL_BACKLOG_BYTES = 256_000 +_MAX_READ_BYTES_PER_CYCLE = 256_000 @dataclass @@ -293,12 +294,14 @@ async def _read_new_lines( # successfully. A non-empty line that fails JSON parsing is # likely a partial write; stop and retry next cycle. safe_offset = session.last_byte_offset + processed_bytes = 0 while True: line_start = await f.tell() line = await f.readline() if not line: break line_end = await f.tell() + processed_bytes += line_end - line_start if len(line) > _MAX_JSONL_LINE_CHARS: logger.warning( "Oversized JSONL line skipped for session %s: line=%d-%d chars=%d limit=%d", @@ -327,6 +330,15 @@ async def _read_new_lines( # Empty line — safe to skip safe_offset = line_end + if processed_bytes >= _MAX_READ_BYTES_PER_CYCLE: + logger.warning( + "Read cycle capped for session %s: processed_bytes=%d limit=%d", + session.session_id, + processed_bytes, + _MAX_READ_BYTES_PER_CYCLE, + ) + break + session.last_byte_offset = safe_offset except OSError as e: From 1295710edf591033116bfabcac0154b9b3eed373 Mon Sep 17 00:00:00 2001 From: dimkk Date: Sun, 1 Mar 2026 23:35:25 +0300 Subject: [PATCH 4/7] fix: reset monitor state after 10s silent startup --- src/ccbot/session_monitor.py | 35 +++++++++++++++++++++++++++++ tests/ccbot/test_session_monitor.py | 23 +++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 4b12fc51..3f9d6856 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -36,6 +36,7 @@ _MAX_EMITTED_TEXT_CHARS = 32_000 _MAX_INITIAL_BACKLOG_BYTES = 256_000 _MAX_READ_BYTES_PER_CYCLE = 256_000 +_NO_MESSAGES_RECOVERY_SECONDS = 10.0 @dataclass @@ -100,6 +101,8 @@ def __init__( # Sessions seen by this monitor process. Used to apply startup catch-up # protection only once per session after restart. self._seen_sessions: set[str] = set() + self._last_message_monotonic: float = time.monotonic() + self._silence_recovery_done = False @staticmethod def _truncate_emitted_text(text: str) -> str: @@ -110,6 +113,22 @@ def _truncate_emitted_text(text: str) -> str: suffix = f"\n\n… (truncated by CCBot, omitted {omitted} chars)" return text[:_MAX_EMITTED_TEXT_CHARS] + suffix + def _reset_monitor_state(self, reason: str) -> None: + """Hard-reset monitor state file and in-memory offsets.""" + tracked_before = len(self.state.tracked_sessions) + self.state.tracked_sessions.clear() + self._file_mtimes.clear() + self._pending_tools.clear() + self._seen_sessions.clear() + self.state.save() + self._silence_recovery_done = True + self._last_message_monotonic = time.monotonic() + logger.warning( + "Monitor state reset: reason=%s removed_sessions=%d", + reason, + tracked_before, + ) + def set_message_callback( self, callback: Callable[[NewMessage], Awaitable[None]] ) -> None: @@ -644,6 +663,20 @@ async def _monitor_loop(self) -> None: # Check for new messages (all I/O is async) new_messages = await self.check_for_updates(active_session_ids) + if new_messages: + self._last_message_monotonic = time.monotonic() + elif ( + active_session_ids + and not self._silence_recovery_done + and ( + time.monotonic() - self._last_message_monotonic + >= _NO_MESSAGES_RECOVERY_SECONDS + ) + ): + self._reset_monitor_state( + f"no_messages_for_{int(_NO_MESSAGES_RECOVERY_SECONDS)}s" + ) + for msg in new_messages: status = "complete" if msg.is_complete else "streaming" preview = msg.text[:80] + ("..." if len(msg.text) > 80 else "") @@ -666,6 +699,8 @@ def start(self) -> None: logger.warning("Monitor already running") return self._running = True + self._last_message_monotonic = time.monotonic() + self._silence_recovery_done = False self._task = asyncio.create_task(self._monitor_loop()) async def stop(self) -> None: diff --git a/tests/ccbot/test_session_monitor.py b/tests/ccbot/test_session_monitor.py index d7bc8ae3..2a49b612 100644 --- a/tests/ccbot/test_session_monitor.py +++ b/tests/ccbot/test_session_monitor.py @@ -180,3 +180,26 @@ async def fake_scan_projects(): assert messages == [] assert tracked.last_byte_offset == jsonl_file.stat().st_size mock_read_new_lines.assert_not_called() + + def test_reset_monitor_state_clears_offsets_and_writes_file(self, monitor, tmp_path): + state_path = tmp_path / "monitor_state.json" + monitor.state.state_file = state_path + monitor.state.tracked_sessions["a"] = TrackedSession( + session_id="a", + file_path="/tmp/a.jsonl", + last_byte_offset=123, + ) + monitor._file_mtimes["a"] = 1.0 + monitor._pending_tools["a"] = {"tool": "x"} + monitor._seen_sessions.add("a") + + monitor._reset_monitor_state("test") + + assert monitor.state.tracked_sessions == {} + assert monitor._file_mtimes == {} + assert monitor._pending_tools == {} + assert monitor._seen_sessions == set() + assert monitor._silence_recovery_done is True + assert state_path.exists() + payload = json.loads(state_path.read_text(encoding="utf-8")) + assert payload.get("tracked_sessions") == {} From 5fa260cb1babf274bba549b0961763a8fe8f325b Mon Sep 17 00:00:00 2001 From: dimkk Date: Mon, 2 Mar 2026 05:17:43 +0300 Subject: [PATCH 5/7] fix: harden monitor loop with timeouts and safer dedupe --- src/ccbot/session_monitor.py | 116 +++++++++++++++++++++++++++- tests/ccbot/test_session_monitor.py | 49 ++++++++++++ 2 files changed, 161 insertions(+), 4 deletions(-) diff --git a/src/ccbot/session_monitor.py b/src/ccbot/session_monitor.py index 3f9d6856..6228733e 100644 --- a/src/ccbot/session_monitor.py +++ b/src/ccbot/session_monitor.py @@ -12,6 +12,7 @@ """ import asyncio +import hashlib import json import logging import time @@ -37,6 +38,12 @@ _MAX_INITIAL_BACKLOG_BYTES = 256_000 _MAX_READ_BYTES_PER_CYCLE = 256_000 _NO_MESSAGES_RECOVERY_SECONDS = 10.0 +_MAX_DUPLICATE_OFFSET_GAP_BYTES = 8_192 +_CODEX_MAPPER_SYNC_INTERVAL_SECONDS = 15.0 +_CODEX_MAPPER_SYNC_TIMEOUT_SECONDS = 5.0 +_LOAD_SESSION_MAP_TIMEOUT_SECONDS = 5.0 +_DETECT_CLEANUP_TIMEOUT_SECONDS = 5.0 +_CHECK_UPDATES_TIMEOUT_SECONDS = 8.0 @dataclass @@ -101,8 +108,13 @@ def __init__( # Sessions seen by this monitor process. Used to apply startup catch-up # protection only once per session after restart. self._seen_sessions: set[str] = set() + # Last emitted message fingerprint per session for duplicate suppression. + self._last_emitted_fingerprint: dict[ + str, tuple[tuple[str, str, str, int, str], int] + ] = {} self._last_message_monotonic: float = time.monotonic() self._silence_recovery_done = False + self._last_codex_mapper_sync: float = 0.0 @staticmethod def _truncate_emitted_text(text: str) -> str: @@ -120,6 +132,7 @@ def _reset_monitor_state(self, reason: str) -> None: self._file_mtimes.clear() self._pending_tools.clear() self._seen_sessions.clear() + self._last_emitted_fingerprint.clear() self.state.save() self._silence_recovery_done = True self._last_message_monotonic = time.monotonic() @@ -129,6 +142,30 @@ def _reset_monitor_state(self, reason: str) -> None: tracked_before, ) + def _should_skip_duplicate_emit( + self, + *, + session_id: str, + role: str, + content_type: str, + tool_use_id: str | None, + text: str, + line_end: int, + ) -> bool: + """Drop repeated Codex entries that mirror the same content nearby.""" + text_hash = hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest() + fingerprint = (role, content_type, tool_use_id or "", len(text), text_hash) + prev = self._last_emitted_fingerprint.get(session_id) + self._last_emitted_fingerprint[session_id] = (fingerprint, line_end) + if prev is None: + return False + prev_fp, prev_line_end = prev + if fingerprint != prev_fp: + return False + if line_end < prev_line_end: + return False + return (line_end - prev_line_end) <= _MAX_DUPLICATE_OFFSET_GAP_BYTES + def set_message_callback( self, callback: Callable[[NewMessage], Awaitable[None]] ) -> None: @@ -462,6 +499,10 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa [raw], pending_tools=carry, ) + if parsed_entries: + # Activity detected in transcript stream, even if some + # parsed entries are dropped as duplicates. + self._last_message_monotonic = time.monotonic() for entry in parsed_entries: if not entry.text and not entry.image_data: @@ -470,6 +511,22 @@ async def check_for_updates(self, active_session_ids: set[str]) -> list[NewMessa if entry.role == "user" and not config.show_user_messages: continue text = self._truncate_emitted_text(entry.text) + if self._should_skip_duplicate_emit( + session_id=session_info.session_id, + role=entry.role, + content_type=entry.content_type, + tool_use_id=entry.tool_use_id, + text=text, + line_end=line_end, + ): + logger.info( + "LineTrace drop: trace=%s reason=duplicate_emit type=%s role=%s text_len=%d", + trace_id, + entry.content_type, + entry.role, + len(text), + ) + continue if text != entry.text: logger.warning( "LineTrace clamp: trace=%s type=%s role=%s original_len=%d clamped_len=%d limit=%d", @@ -651,17 +708,67 @@ async def _monitor_loop(self) -> None: while self._running: try: if config.provider == "codex": - await codex_session_mapper.sync_session_map() + now = time.monotonic() + if ( + now - self._last_codex_mapper_sync + >= _CODEX_MAPPER_SYNC_INTERVAL_SECONDS + ): + try: + await asyncio.wait_for( + codex_session_mapper.sync_session_map(), + timeout=_CODEX_MAPPER_SYNC_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + logger.warning( + "Codex mapper sync timeout after %.1fs; continuing", + _CODEX_MAPPER_SYNC_TIMEOUT_SECONDS, + ) + except Exception as e: + logger.warning("Codex mapper sync failed: %s", e) + self._last_codex_mapper_sync = now # Load hook-based session map updates - await session_manager.load_session_map() + try: + await asyncio.wait_for( + session_manager.load_session_map(), + timeout=_LOAD_SESSION_MAP_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + logger.warning( + "load_session_map timeout after %.1fs; skipping cycle", + _LOAD_SESSION_MAP_TIMEOUT_SECONDS, + ) + await asyncio.sleep(self.poll_interval) + continue # Detect session_map changes and cleanup replaced/removed sessions - current_map = await self._detect_and_cleanup_changes() + try: + current_map = await asyncio.wait_for( + self._detect_and_cleanup_changes(), + timeout=_DETECT_CLEANUP_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + logger.warning( + "detect_and_cleanup timeout after %.1fs; skipping cycle", + _DETECT_CLEANUP_TIMEOUT_SECONDS, + ) + await asyncio.sleep(self.poll_interval) + continue active_session_ids = set(current_map.values()) # Check for new messages (all I/O is async) - new_messages = await self.check_for_updates(active_session_ids) + try: + new_messages = await asyncio.wait_for( + self.check_for_updates(active_session_ids), + timeout=_CHECK_UPDATES_TIMEOUT_SECONDS, + ) + except asyncio.TimeoutError: + logger.warning( + "check_for_updates timeout after %.1fs; continuing", + _CHECK_UPDATES_TIMEOUT_SECONDS, + ) + await asyncio.sleep(self.poll_interval) + continue if new_messages: self._last_message_monotonic = time.monotonic() @@ -701,6 +808,7 @@ def start(self) -> None: self._running = True self._last_message_monotonic = time.monotonic() self._silence_recovery_done = False + self._last_codex_mapper_sync = 0.0 self._task = asyncio.create_task(self._monitor_loop()) async def stop(self) -> None: diff --git a/tests/ccbot/test_session_monitor.py b/tests/ccbot/test_session_monitor.py index 2a49b612..0e4b5243 100644 --- a/tests/ccbot/test_session_monitor.py +++ b/tests/ccbot/test_session_monitor.py @@ -8,6 +8,7 @@ from ccbot.monitor_state import TrackedSession from ccbot.session_monitor import ( _MAX_EMITTED_TEXT_CHARS, + _MAX_DUPLICATE_OFFSET_GAP_BYTES, _MAX_INITIAL_BACKLOG_BYTES, SessionInfo, SessionMonitor, @@ -181,6 +182,54 @@ async def fake_scan_projects(): assert tracked.last_byte_offset == jsonl_file.stat().st_size mock_read_new_lines.assert_not_called() + def test_duplicate_emit_guard_skips_nearby_identical_entries(self, monitor): + assert ( + monitor._should_skip_duplicate_emit( + session_id="dup", + role="assistant", + content_type="text", + tool_use_id=None, + text="same message", + line_end=100, + ) + is False + ) + assert ( + monitor._should_skip_duplicate_emit( + session_id="dup", + role="assistant", + content_type="text", + tool_use_id=None, + text="same message", + line_end=150, + ) + is True + ) + + def test_duplicate_emit_guard_allows_far_apart_repeats(self, monitor): + assert ( + monitor._should_skip_duplicate_emit( + session_id="dup2", + role="assistant", + content_type="text", + tool_use_id=None, + text="repeat later", + line_end=10, + ) + is False + ) + assert ( + monitor._should_skip_duplicate_emit( + session_id="dup2", + role="assistant", + content_type="text", + tool_use_id=None, + text="repeat later", + line_end=10 + _MAX_DUPLICATE_OFFSET_GAP_BYTES + 1, + ) + is False + ) + def test_reset_monitor_state_clears_offsets_and_writes_file(self, monitor, tmp_path): state_path = tmp_path / "monitor_state.json" monitor.state.state_file = state_path From 176f79d8349ce2e74e98a5c19554aaff4e648066 Mon Sep 17 00:00:00 2001 From: dimkk Date: Mon, 2 Mar 2026 11:56:54 +0300 Subject: [PATCH 6/7] fix: allow codex mapper to switch to newer session for window --- src/ccbot/codex_mapper.py | 30 ++++++++++++----- tests/ccbot/test_codex_mapper.py | 56 ++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 9 deletions(-) diff --git a/src/ccbot/codex_mapper.py b/src/ccbot/codex_mapper.py index b78d8e9f..fd903ec2 100644 --- a/src/ccbot/codex_mapper.py +++ b/src/ccbot/codex_mapper.py @@ -201,20 +201,23 @@ async def sync_session_map(self) -> bool: candidates = by_cwd.get(norm_cwd, []) chosen: CodexSessionMeta | None = None + existing_sid = existing.get("session_id", "") + has_existing_codex = existing_provider == "codex" and bool(existing_sid) + existing_meta: CodexSessionMeta | None = None + if existing_sid: + for meta in all_metas: + if meta.session_id == existing_sid: + existing_meta = meta + break + if ( preferred_meta is not None and w.window_name == config.tmux_session_name + and not has_existing_codex and preferred_meta.session_id not in assigned_session_ids ): chosen = preferred_meta - existing_sid = existing.get("session_id", "") - if chosen is None and existing_sid: - for meta in all_metas: - if meta.session_id == existing_sid: - chosen = meta - break - if chosen is None: for meta in candidates: if meta.session_id in assigned_session_ids: @@ -222,6 +225,14 @@ async def sync_session_map(self) -> bool: chosen = meta break + # If we couldn't resolve by cwd, keep previous mapping if still valid. + if ( + chosen is None + and existing_meta is not None + and existing_meta.session_id not in assigned_session_ids + ): + chosen = existing_meta + # Fallback: choose the newest unassigned rollout across all projects. # This handles "codex resume " where pane cwd can differ from session cwd. if chosen is None: @@ -231,8 +242,9 @@ async def sync_session_map(self) -> bool: chosen = meta break - # Keep previous mapping if file still exists and no better candidate. - if chosen is None and existing_provider == "codex" and existing_sid: + # Keep previous raw mapping only when we failed to parse its meta + # but file path still exists. + if chosen is None and has_existing_codex: existing_fp = Path(existing.get("file_path", "")) if existing_fp.exists(): next_entries[key] = existing diff --git a/tests/ccbot/test_codex_mapper.py b/tests/ccbot/test_codex_mapper.py index 6b94cb04..9e9900d7 100644 --- a/tests/ccbot/test_codex_mapper.py +++ b/tests/ccbot/test_codex_mapper.py @@ -149,3 +149,59 @@ async def test_prefers_resume_session_id_for_main_window( assert changed is True data = json.loads(map_file.read_text()) assert data["ccbot:@3"]["session_id"] == resume_sid + + @pytest.mark.asyncio + async def test_switches_existing_mapping_to_newer_same_cwd_session( + self, tmp_path, monkeypatch + ): + sessions_root = tmp_path / "sessions" + map_file = tmp_path / "session_map.json" + proj_cwd = str((tmp_path / "proj").resolve()) + sid_old = "sid-old" + sid_new = "sid-new" + old_path = sessions_root / "2026/03/01" / f"rollout-2026-03-01T10-00-00-{sid_old}.jsonl" + new_path = sessions_root / "2026/03/01" / f"rollout-2026-03-01T11-00-00-{sid_new}.jsonl" + _write_rollout(old_path, sid_old, proj_cwd, "2026-03-01T10:00:00Z") + _write_rollout(new_path, sid_new, proj_cwd, "2026-03-01T11:00:00Z") + os.utime(old_path, (1700000000, 1700000000)) + os.utime(new_path, (1800000000, 1800000000)) + + map_file.write_text( + json.dumps( + { + "ccbot:@1": { + "session_id": sid_old, + "cwd": proj_cwd, + "window_name": "proj", + "provider": "codex", + "file_path": str(old_path), + } + } + ), + encoding="utf-8", + ) + + mapper = CodexSessionMapper( + sessions_root=sessions_root, session_map_file=map_file + ) + windows = [ + TmuxWindow( + window_id="@1", + window_name="proj", + cwd=proj_cwd, + pane_current_command="node", + ) + ] + + monkeypatch.setattr( + "ccbot.codex_mapper.config.codex_resume_session_id", "some-other-sid" + ) + monkeypatch.setattr("ccbot.codex_mapper.config.tmux_session_name", "ccbot") + + with patch("ccbot.codex_mapper.tmux_manager") as mock_tmux: + mock_tmux.list_windows = AsyncMock(return_value=windows) + changed = await mapper.sync_session_map() + + assert changed is True + data = json.loads(map_file.read_text()) + assert data["ccbot:@1"]["session_id"] == sid_new From de47f8603ea46ed1a832c583a4aacf4f6edcde74 Mon Sep 17 00:00:00 2001 From: dimkk Date: Tue, 3 Mar 2026 21:44:02 +0300 Subject: [PATCH 7/7] Fix Codex interactive prompt handling and session resume mapping --- src/ccbot/bot.py | 64 +++++++++- src/ccbot/codex_mapper.py | 3 +- src/ccbot/config.py | 3 +- src/ccbot/handlers/callback_data.py | 1 + src/ccbot/handlers/interactive_ui.py | 125 +++++++++++++++++++- src/ccbot/handlers/status_polling.py | 33 +++++- src/ccbot/session_monitor.py | 29 +++++ src/ccbot/terminal_parser.py | 14 ++- tests/ccbot/conftest.py | 15 +++ tests/ccbot/handlers/test_interactive_ui.py | 104 +++++++++++++++- tests/ccbot/handlers/test_status_polling.py | 43 ++++++- tests/ccbot/test_bot_interactive.py | 72 +++++++++++ tests/ccbot/test_codex_mapper.py | 65 ++++++++++ tests/ccbot/test_config.py | 1 + tests/ccbot/test_session_monitor.py | 31 +++++ tests/ccbot/test_terminal_parser.py | 14 +++ 16 files changed, 603 insertions(+), 14 deletions(-) create mode 100644 tests/ccbot/test_bot_interactive.py diff --git a/src/ccbot/bot.py b/src/ccbot/bot.py index 07a10124..476010c4 100644 --- a/src/ccbot/bot.py +++ b/src/ccbot/bot.py @@ -66,6 +66,7 @@ CB_ASK_LEFT, CB_ASK_REFRESH, CB_ASK_RIGHT, + CB_ASK_SELECT, CB_ASK_SPACE, CB_ASK_TAB, CB_ASK_UP, @@ -108,6 +109,7 @@ INTERACTIVE_TOOL_NAMES, clear_interactive_mode, clear_interactive_msg, + get_interactive_choice_state, get_interactive_msg_id, get_interactive_window, handle_interactive_ui, @@ -1750,6 +1752,54 @@ async def callback_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) - await handle_interactive_ui(context.bot, user.id, window_id, thread_id) await query.answer() + # Interactive UI: Numeric quick-select (1..N) + elif data.startswith(CB_ASK_SELECT): + payload = data[len(CB_ASK_SELECT) :] + parts = payload.split(":", 1) + if len(parts) != 2: + await query.answer("Invalid selection", show_alert=True) + return + index_raw, window_id = parts + try: + target_index = int(index_raw) + except ValueError: + await query.answer("Invalid selection", show_alert=True) + return + + thread_id = _get_thread_id(update) + choice_state = get_interactive_choice_state(user.id, thread_id) + if choice_state is None: + # State can be stale after restart — refresh once from terminal. + await handle_interactive_ui(context.bot, user.id, window_id, thread_id) + choice_state = get_interactive_choice_state(user.id, thread_id) + if choice_state is None: + await query.answer("No selectable options", show_alert=True) + return + + selected_index, total_options = choice_state + if target_index < 1 or target_index > total_options: + await query.answer("Option out of range", show_alert=True) + return + + w = await tmux_manager.find_window_by_id(window_id) + if not w: + await query.answer("Window no longer exists", show_alert=True) + return + + delta = target_index - selected_index + if delta != 0: + nav_key = "Down" if delta > 0 else "Up" + for _ in range(abs(delta)): + await tmux_manager.send_keys( + w.window_id, nav_key, enter=False, literal=False + ) + await asyncio.sleep(0.05) + + await tmux_manager.send_keys(w.window_id, "Enter", enter=False, literal=False) + await asyncio.sleep(0.35) + await handle_interactive_ui(context.bot, user.id, window_id, thread_id) + await query.answer(f"Selected {target_index}") + # Interactive UI: Down arrow elif data.startswith(CB_ASK_DOWN): window_id = data[len(CB_ASK_DOWN) :] @@ -1951,9 +2001,19 @@ async def _mark_window_read_offset(user_id: int, window_id: str) -> None: # UI not rendered — clear the early-set mode clear_interactive_mode(user_id, thread_id) - # Any non-interactive message means the interaction is complete — delete the UI message + # Non-interactive events can still arrive while an approval UI is visible. + # Only clear the interactive message when the UI is actually gone. if get_interactive_msg_id(user_id, thread_id): - await clear_interactive_msg(user_id, bot, thread_id) + should_clear_interactive = False + w = await tmux_manager.find_window_by_id(wid) + if not w: + should_clear_interactive = True + else: + pane_text = await tmux_manager.capture_pane(w.window_id) + if pane_text: + should_clear_interactive = not is_interactive_ui(pane_text) + if should_clear_interactive: + await clear_interactive_msg(user_id, bot, thread_id) parts = build_response_parts( msg.text, diff --git a/src/ccbot/codex_mapper.py b/src/ccbot/codex_mapper.py index fd903ec2..dce8047e 100644 --- a/src/ccbot/codex_mapper.py +++ b/src/ccbot/codex_mapper.py @@ -213,9 +213,10 @@ async def sync_session_map(self) -> bool: if ( preferred_meta is not None and w.window_name == config.tmux_session_name - and not has_existing_codex and preferred_meta.session_id not in assigned_session_ids ): + # Explicit resume target should win over stale/existing mapping + # for the main ccbot window. chosen = preferred_meta if chosen is None: diff --git a/src/ccbot/config.py b/src/ccbot/config.py index 6d9dca00..9e0550db 100644 --- a/src/ccbot/config.py +++ b/src/ccbot/config.py @@ -97,7 +97,8 @@ def __init__(self) -> None: # Provider capabilities and UI labels self.agent_name = "Codex CLI" if self.provider == "codex" else "Claude Code" self.supports_usage_command = self.provider == "claude" - self.supports_claude_interactive_ui = self.provider == "claude" + # Interactive terminal prompts (approval/select) are used by both providers. + self.supports_claude_interactive_ui = self.provider in ("claude", "codex") # Forward unknown slash commands for both providers by default: # e.g. /status, /permissions, /clear, /compact. slash_default = "true" diff --git a/src/ccbot/handlers/callback_data.py b/src/ccbot/handlers/callback_data.py index e4846aff..86d4429f 100644 --- a/src/ccbot/handlers/callback_data.py +++ b/src/ccbot/handlers/callback_data.py @@ -36,6 +36,7 @@ CB_ASK_DOWN = "aq:down:" # aq:down: CB_ASK_LEFT = "aq:left:" # aq:left: CB_ASK_RIGHT = "aq:right:" # aq:right: +CB_ASK_SELECT = "aq:num:" # aq:num:: CB_ASK_ESC = "aq:esc:" # aq:esc: CB_ASK_ENTER = "aq:enter:" # aq:enter: CB_ASK_SPACE = "aq:spc:" # aq:spc: diff --git a/src/ccbot/handlers/interactive_ui.py b/src/ccbot/handlers/interactive_ui.py index 174e3a9e..908ad550 100644 --- a/src/ccbot/handlers/interactive_ui.py +++ b/src/ccbot/handlers/interactive_ui.py @@ -8,6 +8,7 @@ Provides: - Keyboard navigation (up/down/left/right/enter/esc) + - Direct numeric selection for option lists (1..N) - Terminal capture and display - Interactive mode tracking per user and thread @@ -15,8 +16,11 @@ """ import logging +import re +import time from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup +from telegram.error import BadRequest from ..session import session_manager from ..terminal_parser import extract_interactive_content, is_interactive_ui @@ -28,6 +32,7 @@ CB_ASK_LEFT, CB_ASK_REFRESH, CB_ASK_RIGHT, + CB_ASK_SELECT, CB_ASK_SPACE, CB_ASK_TAB, CB_ASK_UP, @@ -45,6 +50,17 @@ # Track interactive mode: (user_id, thread_id_or_0) -> window_id _interactive_mode: dict[tuple[int, int], str] = {} +# Track parsed option state: (user_id, thread_id_or_0) -> (selected_index, total_options) +_interactive_choices: dict[tuple[int, int], tuple[int, int]] = {} +# Track latest rendered interactive payload to suppress duplicate re-sends. +# Value: (text, monotonic_timestamp) +_interactive_last_render: dict[tuple[int, int], tuple[str, float]] = {} + +_INTERACTIVE_RESEND_COOLDOWN_SECONDS = 30.0 + +_OPTION_NUMBER_RE = re.compile(r"^(?P\d{1,2})\.\s+(?P