From a2625d928ed22829b9d62891ffb7d2190f4fcf9a Mon Sep 17 00:00:00 2001 From: 92819 <928198962@qq.com> Date: Sat, 18 Apr 2026 16:41:49 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=80=BB=E7=BA=BF=E5=92=8C=E6=B3=A8=E5=86=8C=E4=B8=AD?= =?UTF-8?q?=E5=BF=83=EF=BC=8C=E8=BF=9B=E8=A1=8C=E8=A7=A3=E8=80=A6=E6=93=8D?= =?UTF-8?q?=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- agent.py | 513 +++++++++++++++++++++++-------------- background_manager.py | 19 ++ frontend/package-lock.json | 9 + message_center.py | 130 ++++++++++ platforms/web.py | 89 +++++-- tool_registry.py | 34 +++ 6 files changed, 583 insertions(+), 211 deletions(-) create mode 100644 message_center.py create mode 100644 tool_registry.py diff --git a/agent.py b/agent.py index 31efa61..502d65f 100644 --- a/agent.py +++ b/agent.py @@ -11,6 +11,8 @@ from workspace_manager import WorkspaceManager from task_manager import task_manager from background_manager import background_manager +from message_center import MessageCenter +from tool_registry import ToolExecutionResult, ToolRegistry logger = logging.getLogger(__name__) @@ -402,6 +404,42 @@ ] +def _get_block_type(block) -> str: + if isinstance(block, dict): + return block.get("type", "") + return getattr(block, "type", "") + + +def _get_block_text(block) -> str: + if isinstance(block, dict): + return block.get("text", "") + return getattr(block, "text", "") + + +def _extract_tool_use(block) -> tuple[str, str, dict]: + if isinstance(block, dict): + return ( + str(block.get("id", "")), + str(block.get("name", "")), + block.get("input", {}) or {}, + ) + return ( + str(getattr(block, "id", "")), + str(getattr(block, "name", "")), + getattr(block, "input", {}) or {}, + ) + + +def _extract_text_parts(blocks: list) -> list[str]: + text_parts: list[str] = [] + for block in blocks: + if _get_block_type(block) == "text": + text = _get_block_text(block) + if text: + text_parts.append(text) + return text_parts + + # ============== Context Compression Functions ============== @@ -519,11 +557,7 @@ async def auto_compact(messages: list, focus: str = None) -> list: max_tokens=2000, messages=[{"role": "user", "content": summary_prompt}], ) - # Extract text from response, handling ThinkingBlock etc. - text_parts = [] - for block in response.content: - if hasattr(block, "text") and block.type == "text": - text_parts.append(block.text) + text_parts = _extract_text_parts(response.content) summary = "\n".join(text_parts) if text_parts else "No summary generated." except Exception as e: summary = f"Error generating summary: {str(e)}" @@ -669,16 +703,220 @@ def _auto_create_root_task( return task +def _normalize_info_payload(msg) -> dict: + if isinstance(msg, dict): + return msg + return {"message": str(msg)} + + +def _create_tool_registry( + *, + pm: PlaywrightManager, + page, + effective_session_id: str, + auto_root_task: dict | None, + emit_info, + emit_action_required, + emit_image, + emit_file, +) -> ToolRegistry: + registry = ToolRegistry() + + async def _snapshot(args: dict) -> ToolExecutionResult: + snapshot_text = await pm.get_aria_snapshot(effective_session_id) + return ToolExecutionResult( + output=snapshot_text if snapshot_text else "Could not capture aria snapshot." + ) + + async def _navigate(args: dict) -> ToolExecutionResult: + if not page: + raise RuntimeError("No active page") + await page.goto(args["url"]) + await asyncio.sleep(2) + return ToolExecutionResult(output="Successfully navigated.") + + async def _click(args: dict) -> ToolExecutionResult: + if not page: + raise RuntimeError("No active page") + ref = args.get("ref") + selector = args.get("selector") + if ref: + locator = await pm.locate_by_ref(ref, effective_session_id) + await locator.click(timeout=5000) + elif selector: + await page.click(selector, timeout=5000) + else: + raise ValueError("click requires either 'ref' or 'selector'") + await asyncio.sleep(1) + return ToolExecutionResult(output="Successfully clicked.") + + async def _type_text(args: dict) -> ToolExecutionResult: + if not page: + raise RuntimeError("No active page") + ref = args.get("ref") + selector = args.get("selector") + if ref: + locator = await pm.locate_by_ref(ref, effective_session_id) + await locator.fill(args["text"]) + elif selector: + await page.fill(selector, args["text"]) + else: + raise ValueError("type_text requires either 'ref' or 'selector'") + return ToolExecutionResult(output="Successfully typed text.") + + async def _scroll(args: dict) -> ToolExecutionResult: + if not page: + raise RuntimeError("No active page") + direction = args.get("direction", "down") + if direction == "down": + await page.mouse.wheel(0, 1000) + else: + await page.mouse.wheel(0, -1000) + await asyncio.sleep(1) + return ToolExecutionResult(output="Scrolled.") + + async def _request_human_assistance(args: dict) -> ToolExecutionResult: + reason = args.get("reason", "Login required.") + await pm.block_for_human(emit_action_required, reason, effective_session_id) + return ToolExecutionResult( + output=( + "Human has processed the request. Page might have updated. " + "You may resume your task." + ) + ) + + async def _extract_info(args: dict) -> ToolExecutionResult: + return ToolExecutionResult(output=f"Extracted: {args['info_summary']}") + + async def _send_screenshot(args: dict) -> ToolExecutionResult: + description = args.get("description", "Current page screenshot") + screenshot_b64 = await pm.get_page_screenshot_base64(effective_session_id) + if screenshot_b64: + await emit_image(description, screenshot_b64) + return ToolExecutionResult(output=f"Screenshot sent to user: {description}") + return ToolExecutionResult(output="Failed to capture screenshot.") + + async def _finish_task(args: dict) -> ToolExecutionResult: + report = args.get("report", "Task completed.") + if auto_root_task: + task_manager.update(auto_root_task["id"], status="completed") + await emit_info( + { + "message": f"✅ **Task Completed**:\n\n{report}", + "message_key": "common.task_completed", + "params": {"report": report}, + } + ) + return ToolExecutionResult(output="Finished.", finished=True) + + async def _read_file(args: dict) -> ToolExecutionResult: + return ToolExecutionResult( + output=await workspace.read_file(args["path"], args.get("limit")) + ) + + async def _write_file(args: dict) -> ToolExecutionResult: + return ToolExecutionResult( + output=await workspace.write_file(args["path"], args["content"]) + ) + + async def _edit_file(args: dict) -> ToolExecutionResult: + return ToolExecutionResult( + output=await workspace.edit_file(args["path"], args["old_text"], args["new_text"]) + ) + + async def _send_file(args: dict) -> ToolExecutionResult: + file_path = args["path"] + description = args.get("description", f"File: {file_path}") + full_path = WORKDIR / file_path + if not full_path.exists(): + return ToolExecutionResult(output=f"Error: File not found: {file_path}") + if not str(full_path.resolve()).startswith(str(WORKDIR.resolve())): + return ToolExecutionResult(output=f"Error: Path escapes workspace: {file_path}") + await emit_file(file_path, description) + return ToolExecutionResult(output=f"File sent: {file_path}") + + async def _run_bash(args: dict) -> ToolExecutionResult: + return ToolExecutionResult( + output=await workspace.run_bash(args["command"], args.get("timeout", 120)) + ) + + async def _task_create(args: dict) -> ToolExecutionResult: + return ToolExecutionResult( + output=task_manager.create(args["subject"], args.get("description", "")) + ) + + async def _task_get(args: dict) -> ToolExecutionResult: + return ToolExecutionResult(output=task_manager.get(args["task_id"])) + + async def _task_update(args: dict) -> ToolExecutionResult: + return ToolExecutionResult( + output=task_manager.update( + args["task_id"], + args.get("status"), + args.get("addBlockedBy"), + args.get("addBlocks"), + ) + ) + + async def _task_list(args: dict) -> ToolExecutionResult: + return ToolExecutionResult(output=task_manager.list_all()) + + async def _background_run(args: dict) -> ToolExecutionResult: + return ToolExecutionResult( + output=await background_manager.run( + args["command"], args.get("timeout"), effective_session_id + ) + ) + + async def _check_background(args: dict) -> ToolExecutionResult: + return ToolExecutionResult( + output=await background_manager.check(args.get("task_id")) + ) + + async def _compact(args: dict) -> ToolExecutionResult: + focus = args.get("focus") + return ToolExecutionResult( + output=f"Manual compression requested{': ' + focus if focus else ''}.", + manual_compact=True, + compact_focus=focus, + ) + + registry.register("snapshot", _snapshot) + registry.register("navigate", _navigate) + registry.register("click", _click) + registry.register("type_text", _type_text) + registry.register("scroll", _scroll) + registry.register("request_human_assistance", _request_human_assistance) + registry.register("extract_info", _extract_info) + registry.register("send_screenshot", _send_screenshot) + registry.register("finish_task", _finish_task) + registry.register("read_file", _read_file) + registry.register("write_file", _write_file) + registry.register("edit_file", _edit_file) + registry.register("send_file", _send_file) + registry.register("run_bash", _run_bash) + registry.register("task_create", _task_create) + registry.register("task_get", _task_get) + registry.register("task_update", _task_update) + registry.register("task_list", _task_list) + registry.register("background_run", _background_run) + registry.register("check_background", _check_background) + registry.register("compact", _compact) + + return registry + + # ============== Main Agent Loop ============== async def run_agent_loop( pm: PlaywrightManager, user_instruction: str, - ws_send_msg, - ws_request_action, - ws_send_image, - ws_send_file, + ws_send_msg=None, + ws_request_action=None, + ws_send_image=None, + ws_send_file=None, + message_center: MessageCenter | None = None, images: list = [], history_messages: list = [], uploaded_files: list = [], @@ -689,8 +927,43 @@ async def run_agent_loop( cancel_event: asyncio.Event = None, ): effective_session_id = web_session_id or session_id + + async def emit_info(msg) -> None: + payload = _normalize_info_payload(msg) + if message_center: + await message_center.publish("info", payload) + return + if ws_send_msg: + await ws_send_msg(payload) + + async def emit_action_required(reason: str, image: str | None = None) -> None: + payload = {"reason": reason} + if image: + payload["image"] = image + if message_center: + await message_center.publish("action_required", payload) + return + if ws_request_action: + await ws_request_action(reason, image) + + async def emit_image(description: str, image_b64: str) -> None: + payload = {"description": description, "image": image_b64} + if message_center: + await message_center.publish("image", payload) + return + if ws_send_image: + await ws_send_image(description, image_b64) + + async def emit_file(file_path: str, description: str) -> None: + payload = {"path": file_path, "description": description} + if message_center: + await message_center.publish("send_file", payload) + return + if ws_send_file: + await ws_send_file(file_path, description) + if not effective_session_id: - await ws_send_msg( + await emit_info( {"message": "Error: No session ID provided", "message_key": "common.error"} ) return @@ -698,7 +971,7 @@ async def run_agent_loop( try: page = await pm.get_or_create_page(effective_session_id) except Exception as e: - await ws_send_msg( + await emit_info( { "message": f"Error creating browser tab: {e}", "message_key": "common.error", @@ -708,7 +981,7 @@ async def run_agent_loop( auto_root_task = _auto_create_root_task(user_instruction, images, uploaded_files) - await ws_send_msg( + await emit_info( { "message": f"Agent starting task: {user_instruction}", "message_key": "common.agent_starting", @@ -785,7 +1058,7 @@ async def run_agent_loop( if cancel_event and cancel_event.is_set(): if auto_root_task: task_manager.update(auto_root_task["id"], status="pending") - await ws_send_msg( + await emit_info( { "message": "Task cancelled by user.", "message_key": "common.task_cancelled", @@ -794,7 +1067,7 @@ async def run_agent_loop( break if pm.check_and_clear_pause_request(effective_session_id): - await ws_send_msg( + await emit_info( { "message": "Agent paused for manual takeover. Waiting for you to finish…", "message_key": "common.agent_paused_for_takeover", @@ -850,7 +1123,7 @@ async def run_agent_loop( # === NEW: Auto-compact check (Layer 2) === if estimate_tokens(messages) > TOKEN_THRESHOLD: - await ws_send_msg( + await emit_info( { "message": "Context threshold reached, compressing...", "message_key": "common.context_compressing", @@ -894,7 +1167,7 @@ async def run_agent_loop( messages.append({"role": "user", "content": user_content}) # 2. Think - await ws_send_msg( + await emit_info( {"message": "Agent is thinking...", "message_key": "common.agent_thinking"} ) @@ -906,6 +1179,7 @@ async def run_agent_loop( messages=messages, tools=TOOLS, ) + assistant_blocks = response.content except Exception as e: err_text = str(e) if "image_url" in err_text or "validation errors for ValidatorIterator" in err_text: @@ -914,7 +1188,7 @@ async def run_agent_loop( if not (isinstance(block, dict) and block.get("type") == "image") ] messages[-1] = {"role": "user", "content": user_content} - await ws_send_msg( + await emit_info( { "message": "当前模型网关不接受图片输入,已自动切换为纯文本模式继续执行。", "message_key": "common.image_input_disabled", @@ -928,35 +1202,46 @@ async def run_agent_loop( messages=messages, tools=TOOLS, ) + assistant_blocks = response.content except Exception as retry_error: - await ws_send_msg(f"Error calling LLM: {str(retry_error)}") + await emit_info(f"Error calling LLM: {str(retry_error)}") break else: - await ws_send_msg(f"Error calling LLM: {err_text}") + await emit_info(f"Error calling LLM: {err_text}") break - messages.append({"role": "assistant", "content": response.content}) + messages.append({"role": "assistant", "content": assistant_blocks}) # 3. Act - tool_uses = [block for block in response.content if block.type == "tool_use"] + tool_uses = [block for block in assistant_blocks if _get_block_type(block) == "tool_use"] user_content = [] if not tool_uses: - text_blocks = [b.text for b in response.content if b.type == "text"] + text_blocks = _extract_text_parts(assistant_blocks) if text_blocks: msg = "\n".join(text_blocks) - await ws_send_msg(msg) + await emit_info(msg) break continue manual_compact = False + manual_compact_focus = None + tool_registry = _create_tool_registry( + pm=pm, + page=page, + effective_session_id=effective_session_id, + auto_root_task=auto_root_task, + emit_info=emit_info, + emit_action_required=emit_action_required, + emit_image=emit_image, + emit_file=emit_file, + ) for tool in tool_uses: - tool_name = tool.name - args = tool.input + tool_id, tool_name, args = _extract_tool_use(tool) result_str = "" - await ws_send_msg( + await emit_info( { "message": f"Executing action: `{tool_name}` with args: {json.dumps(args, ensure_ascii=False)}", "message_key": "common.executing_action", @@ -968,188 +1253,30 @@ async def run_agent_loop( ) try: - if tool_name == "snapshot": - snapshot_text = await pm.get_aria_snapshot(effective_session_id) - result_str = ( - snapshot_text - if snapshot_text - else "Could not capture aria snapshot." - ) - - elif tool_name == "navigate": - await page.goto(args["url"]) - await asyncio.sleep(2) - result_str = "Successfully navigated." - - elif tool_name == "click": - ref = args.get("ref") - selector = args.get("selector") - if ref: - locator = await pm.locate_by_ref(ref, effective_session_id) - await locator.click(timeout=5000) - elif selector: - await page.click(selector, timeout=5000) - else: - raise ValueError("click requires either 'ref' or 'selector'") - await asyncio.sleep(1) - result_str = "Successfully clicked." - - elif tool_name == "type_text": - ref = args.get("ref") - selector = args.get("selector") - if ref: - locator = await pm.locate_by_ref(ref, effective_session_id) - await locator.fill(args["text"]) - elif selector: - await page.fill(selector, args["text"]) - else: - raise ValueError( - "type_text requires either 'ref' or 'selector'" - ) - result_str = "Successfully typed text." - - elif tool_name == "scroll": - direction = args.get("direction", "down") - if direction == "down": - await page.mouse.wheel(0, 1000) - else: - await page.mouse.wheel(0, -1000) - await asyncio.sleep(1) - result_str = "Scrolled." - - elif tool_name == "request_human_assistance": - reason = args.get("reason", "Login required.") - await pm.block_for_human( - ws_request_action, reason, effective_session_id - ) - result_str = "Human has processed the request. Page might have updated. You may resume your task." - - elif tool_name == "extract_info": - result_str = f"Extracted: {args['info_summary']}" - - elif tool_name == "send_screenshot": - description = args.get("description", "Current page screenshot") - screenshot_b64 = await pm.get_page_screenshot_base64( - effective_session_id - ) - if screenshot_b64: - await ws_send_image(description, screenshot_b64) - result_str = f"Screenshot sent to user: {description}" - else: - result_str = "Failed to capture screenshot." - - elif tool_name == "finish_task": - report = args.get("report", "Task completed.") - if auto_root_task: - task_manager.update(auto_root_task["id"], status="completed") - await ws_send_msg( - { - "message": f"✅ **Task Completed**:\n\n{report}", - "message_key": "common.task_completed", - "params": {"report": report}, - } - ) - result_str = "Finished." + execution = await tool_registry.execute(tool_name, args) + result_str = execution.output + if execution.finished: is_finished = True - - # File operation tools - elif tool_name == "read_file": - result_str = await workspace.read_file( - args["path"], args.get("limit") - ) - - elif tool_name == "write_file": - result_str = await workspace.write_file( - args["path"], args["content"] - ) - - elif tool_name == "edit_file": - result_str = await workspace.edit_file( - args["path"], args["old_text"], args["new_text"] - ) - - elif tool_name == "send_file": - file_path = args["path"] - description = args.get("description", f"File: {file_path}") - # Resolve path relative to workspace - full_path = WORKDIR / file_path - if not full_path.exists(): - result_str = f"Error: File not found: {file_path}" - elif not str(full_path.resolve()).startswith( - str(WORKDIR.resolve()) - ): - result_str = f"Error: Path escapes workspace: {file_path}" - else: - await ws_send_file(file_path, description) - result_str = f"File sent: {file_path}" - - elif tool_name == "run_bash": - result_str = await workspace.run_bash( - args["command"], args.get("timeout", 120) - ) - - # Task management tools - elif tool_name == "task_create": - result_str = task_manager.create( - args["subject"], args.get("description", "") - ) - - elif tool_name == "task_get": - result_str = task_manager.get(args["task_id"]) - - elif tool_name == "task_update": - result_str = task_manager.update( - args["task_id"], - args.get("status"), - args.get("addBlockedBy"), - args.get("addBlocks"), - ) - - elif tool_name == "task_list": - result_str = task_manager.list_all() - - # Background task tools - elif tool_name == "background_run": - result_str = await background_manager.run( - args["command"], - args.get("timeout"), - effective_session_id, - ) - - elif tool_name == "check_background": - result_str = await background_manager.check(args.get("task_id")) - - # Context compression - elif tool_name == "compact": + if execution.manual_compact: manual_compact = True - focus = args.get("focus") - result_str = ( - f"Manual compression requested{': ' + focus if focus else ''}." - ) - - else: - result_str = f"Unknown tool: {tool_name}" + manual_compact_focus = execution.compact_focus except Exception as e: result_str = f"Error executing {tool_name}: {str(e)}" user_content.append( - {"type": "tool_result", "tool_use_id": tool.id, "content": result_str} + {"type": "tool_result", "tool_use_id": tool_id, "content": result_str} ) # === NEW: Handle manual compact (Layer 3) === if manual_compact: - await ws_send_msg( + await emit_info( { "message": "Manual compression triggered...", "message_key": "common.manual_compressing", } ) - focus = None - for tool in tool_uses: - if tool.name == "compact" and tool.input.get("focus"): - focus = tool.input["focus"] - messages[:] = await auto_compact(messages, focus) + messages[:] = await auto_compact(messages, manual_compact_focus) # Reset user_content after compression user_content = [] @@ -1159,7 +1286,7 @@ async def run_agent_loop( if not is_finished: if auto_root_task: task_manager.update(auto_root_task["id"], status="pending") - await ws_send_msg( + await emit_info( { "message": "⚠️ Task reached maximum steps without calling finish_task.", "message_key": "common.max_steps_error", diff --git a/background_manager.py b/background_manager.py index b2bbfa3..7a2a2e5 100644 --- a/background_manager.py +++ b/background_manager.py @@ -22,6 +22,7 @@ from typing import Optional, Dict, List, Any from pathlib import Path import time +from message_center import MessageCenter # Default timeout for background tasks BG_TASK_TIMEOUT = int(os.getenv("BG_TASK_TIMEOUT", "300")) # 5 minutes default @@ -193,6 +194,24 @@ async def _execute( "session_id": session_id, }) + if session_id: + center = MessageCenter(session_id) + await center.publish( + "background_task_update", + { + "task_id": task_id, + "status": status, + "command": command[:80], + "result": (output or "(no output)")[:500], + "elapsed": time.time() + - self.tasks[task_id].get("start_time", time.time()), + "message": ( + f"[bg:{task_id}] {status} ({time.time() - self.tasks[task_id].get('start_time', time.time()):.1f}s): " + f"{command[:50]}" + ), + }, + ) + async def check(self, task_id: Optional[str] = None) -> str: """ Check status of background tasks. diff --git a/frontend/package-lock.json b/frontend/package-lock.json index 31c80b6..a03ad37 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -1037,6 +1037,7 @@ "integrity": "sha512-GYDxsZi3ChgmckRT9HPU0WEhKLP08ev/Yfcq2AstjrDASOYCSXeyjDsHg4v5t4jOj7cyDX3vmprafKlWIG9MXQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "undici-types": "~7.16.0" } @@ -1363,6 +1364,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "baseline-browser-mapping": "^2.9.0", "caniuse-lite": "^1.0.30001759", @@ -1744,6 +1746,7 @@ "resolved": "https://registry.npmmirror.com/jiti/-/jiti-1.21.7.tgz", "integrity": "sha512-/imKNG4EbWNrVjoNC/1H5/9GFy+tqjGBHCaSsN+P2RnPqjsLmv6UD3Ej+Kj8nBWaRAwyk7kK5ZUc+OEatnTR3A==", "license": "MIT", + "peer": true, "bin": { "jiti": "bin/jiti.js" } @@ -1923,6 +1926,7 @@ "resolved": "https://registry.npmmirror.com/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", + "peer": true, "engines": { "node": ">=12" }, @@ -1967,6 +1971,7 @@ } ], "license": "MIT", + "peer": true, "dependencies": { "nanoid": "^3.3.11", "picocolors": "^1.1.1", @@ -2303,6 +2308,7 @@ "resolved": "https://registry.npmmirror.com/tailwindcss/-/tailwindcss-3.4.19.tgz", "integrity": "sha512-3ofp+LL8E+pK/JuPLPggVAIaEuhvIz4qNcf3nA1Xn2o/7fb7s/TYpHhwGDv1ZU3PkBluUVaF8PyCHcm48cKLWQ==", "license": "MIT", + "peer": true, "dependencies": { "@alloc/quick-lru": "^5.2.0", "arg": "^5.0.2", @@ -2396,6 +2402,7 @@ "integrity": "sha512-jl1vZzPDinLr9eUt3J/t7V6FgNEw9QjvBPdysz9KfQDD41fQrC2Y4vKQdiaUpFT4bXlb1RHhLpp8wtm6M5TgSw==", "devOptional": true, "license": "Apache-2.0", + "peer": true, "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" @@ -2453,6 +2460,7 @@ "integrity": "sha512-w+N7Hifpc3gRjZ63vYBXA56dvvRlNWRczTdmCBBa+CotUzAPf5b7YMdMR/8CQoeYE5LX3W4wj6RYTgonm1b9DA==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "esbuild": "^0.27.0", "fdir": "^6.5.0", @@ -2534,6 +2542,7 @@ "resolved": "https://registry.npmmirror.com/vue/-/vue-3.5.30.tgz", "integrity": "sha512-hTHLc6VNZyzzEH/l7PFGjpcTvUgiaPK5mdLkbjrTeWSRcEfxFrv56g/XckIYlE9ckuobsdwqd5mk2g1sBkMewg==", "license": "MIT", + "peer": true, "dependencies": { "@vue/compiler-dom": "3.5.30", "@vue/compiler-sfc": "3.5.30", diff --git a/message_center.py b/message_center.py new file mode 100644 index 0000000..f50ba40 --- /dev/null +++ b/message_center.py @@ -0,0 +1,130 @@ +""" +message_center.py - Lightweight message bus and message center for NeoFish. + +Provides: +1. In-process async publish/subscribe bus scoped by session_id +2. MessageCenter helper to publish typed events from core logic +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Awaitable, Callable + + +@dataclass +class BusEvent: + session_id: str + event_type: str + payload: dict[str, Any] + timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) + + +EventHandler = Callable[[BusEvent], Awaitable[None]] + + +class InMemoryMessageBus: + """In-process pub/sub bus with per-session producer/consumer workers.""" + + def __init__(self) -> None: + self._subscribers: dict[str, list[EventHandler]] = {} + self._queues: dict[str, asyncio.Queue] = {} + self._workers: dict[str, asyncio.Task] = {} + self._lock = asyncio.Lock() + self._stop_sentinel = object() + + async def _ensure_worker_locked(self, session_id: str) -> None: + if session_id in self._workers and not self._workers[session_id].done(): + return + if session_id not in self._queues: + self._queues[session_id] = asyncio.Queue() + self._workers[session_id] = asyncio.create_task( + self._consume_loop(session_id), name=f"msg-bus-{session_id}" + ) + + async def _consume_loop(self, session_id: str) -> None: + queue = self._queues.get(session_id) + if queue is None: + return + + while True: + item = await queue.get() + if item is self._stop_sentinel: + break + + event: BusEvent = item + async with self._lock: + handlers = list(self._subscribers.get(session_id, [])) + + if not handlers: + continue + + for handler in handlers: + try: + await handler(event) + except Exception: + # Keep bus resilient: one bad subscriber must not block others. + continue + + async def subscribe(self, session_id: str, handler: EventHandler) -> None: + async with self._lock: + handlers = self._subscribers.setdefault(session_id, []) + if handler not in handlers: + handlers.append(handler) + await self._ensure_worker_locked(session_id) + + async def unsubscribe(self, session_id: str, handler: EventHandler) -> None: + worker: asyncio.Task | None = None + queue: asyncio.Queue | None = None + async with self._lock: + handlers = self._subscribers.get(session_id, []) + if handler in handlers: + handlers.remove(handler) + if not handlers and session_id in self._subscribers: + del self._subscribers[session_id] + worker = self._workers.pop(session_id, None) + queue = self._queues.get(session_id) + + if queue is not None: + await queue.put(self._stop_sentinel) + if worker is not None: + try: + await worker + except Exception: + pass + async with self._lock: + self._queues.pop(session_id, None) + + async def publish(self, event: BusEvent) -> None: + queue: asyncio.Queue | None = None + async with self._lock: + handlers = self._subscribers.get(event.session_id, []) + if handlers: + await self._ensure_worker_locked(event.session_id) + queue = self._queues.get(event.session_id) + + if not handlers or queue is None: + return + await queue.put(event) + + +message_bus = InMemoryMessageBus() + + +class MessageCenter: + """Session-scoped message publisher facade.""" + + def __init__(self, session_id: str, bus: InMemoryMessageBus | None = None) -> None: + self.session_id = session_id + self.bus = bus or message_bus + + async def publish(self, event_type: str, payload: dict[str, Any]) -> None: + event = BusEvent( + session_id=self.session_id, + event_type=event_type, + payload=payload, + ) + await self.bus.publish(event) + diff --git a/platforms/web.py b/platforms/web.py index a8fda6f..76b4c08 100644 --- a/platforms/web.py +++ b/platforms/web.py @@ -12,6 +12,7 @@ from platforms.base import PlatformAdapter from agent_task_manager import task_manager from background_manager import background_manager +from message_center import BusEvent, MessageCenter logger = logging.getLogger(__name__) @@ -93,10 +94,13 @@ def __init__( self._workspace_dir = uploads_dir.parent self._pm = playwright_manager self._run_agent = run_agent + self._message_center = MessageCenter(session_id) + self._bus_handler = self._handle_bus_event # ── PlatformAdapter interface ───────────────────────────────────────────── async def start(self) -> None: + await self._message_center.bus.subscribe(self._session_id, self._bus_handler) await self._ws.send_text( json.dumps( { @@ -123,9 +127,74 @@ async def start(self) -> None: await self._ws.send_text(json.dumps(msg_data["message"])) async def stop(self) -> None: + await self._message_center.bus.unsubscribe(self._session_id, self._bus_handler) if not task_manager.has_running_task(self._session_id): self._pm.deactivate_tab(self._session_id) + async def _handle_bus_event(self, event: BusEvent) -> None: + payload = event.payload or {} + event_type = event.event_type + + if event_type == "info": + packet = {"type": "info", **payload} + await self._send_packet(packet) + self._append_message( + "assistant", + payload.get("message", ""), + message_key=payload.get("message_key", ""), + params=payload.get("params"), + ) + return + + if event_type == "action_required": + packet = { + "type": "action_required", + "reason": payload.get("reason", ""), + } + if payload.get("image"): + packet["image"] = payload["image"] + await self._send_packet(packet) + self._append_message( + "assistant", + f"[Action Required] {payload.get('reason', '')}", + image_data=payload.get("image", "") or "", + ) + return + + if event_type == "image": + await self._send_packet( + { + "type": "image", + "description": payload.get("description", ""), + "image": payload.get("image", ""), + } + ) + self._append_message( + "assistant", + f"[Image] {payload.get('description', '')}", + image_data=payload.get("image", "") or "", + ) + return + + if event_type == "send_file": + await self.send_file( + self._session_id, + payload.get("path", ""), + payload.get("description", ""), + ) + return + + if event_type == "background_task_update": + text = payload.get("message") + if not text: + text = ( + f"[bg:{payload.get('task_id', '')}] {payload.get('status', '')}: " + f"{payload.get('command', '')}" + ) + packet = {"type": "info", "message": text} + await self._send_packet(packet) + self._append_message("assistant", text) + def _is_ws_connected(self) -> bool: client_state = getattr(self._ws, "client_state", None) return bool(client_state and client_state.name == "CONNECTED") @@ -587,23 +656,6 @@ async def _handle_user_input(self, payload: dict) -> None: history = self._build_history() - async def _ws_send_msg(msg) -> None: - if isinstance(msg, dict): - human_text = msg.get("message", "") - packet = {"type": "info", **msg} - self._append_message( - "assistant", - human_text, - message_key=msg.get("message_key", ""), - params=msg.get("params"), - ) - else: - human_text = str(msg) - packet = {"type": "info", "message": human_text} - self._append_message("assistant", human_text) - - await self._send_packet(packet) - _web_running.add(self._session_id) async def _run_with_queue(cancel_event: asyncio.Event = None): @@ -611,12 +663,13 @@ async def _run_with_queue(cancel_event: asyncio.Event = None): await self._run_agent( self._pm, user_msg, - _ws_send_msg, + None, lambda reason, img: self.request_action( self._session_id, reason, img ), self._send_image, lambda path, desc: self.send_file(self._session_id, path, desc), + message_center=self._message_center, images=user_images, history_messages=history, uploaded_files=saved_paths, diff --git a/tool_registry.py b/tool_registry.py new file mode 100644 index 0000000..788305d --- /dev/null +++ b/tool_registry.py @@ -0,0 +1,34 @@ +""" +tool_registry.py - Runtime tool handler registry for NeoFish agent. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Awaitable, Callable + + +@dataclass +class ToolExecutionResult: + output: str + finished: bool = False + manual_compact: bool = False + compact_focus: str | None = None + + +ToolHandler = Callable[[dict], Awaitable[ToolExecutionResult]] + + +class ToolRegistry: + def __init__(self) -> None: + self._handlers: dict[str, ToolHandler] = {} + + def register(self, name: str, handler: ToolHandler) -> None: + self._handlers[name] = handler + + async def execute(self, name: str, args: dict) -> ToolExecutionResult: + handler = self._handlers.get(name) + if handler is None: + return ToolExecutionResult(output=f"Unknown tool: {name}") + return await handler(args) + From 5b5d361b38d4fb924f807fcb3e66b596a9f2ebf5 Mon Sep 17 00:00:00 2001 From: 92819 <928198962@qq.com> Date: Sat, 18 Apr 2026 19:25:35 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E5=89=8D=E5=90=8E=E7=AB=AF=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E4=BA=86RAG=E5=8A=9F=E8=83=BD,=E5=AF=B9=E5=88=87?= =?UTF-8?q?=E7=89=87=E7=AD=96=E7=95=A5=E5=81=9A=E4=BA=86=E5=8F=96=E8=88=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- frontend/src/App.vue | 20 +- frontend/src/components/GalleryPanel.vue | 17 + frontend/src/components/KnowledgePanel.vue | 163 ++++++++ .../src/components/KnowledgeWorkspace.vue | 197 ++++++++++ frontend/src/components/Sidebar.vue | 45 ++- frontend/src/composables/useKnowledge.ts | 362 ++++++++++++++++++ frontend/src/locales/en.json | 40 +- frontend/src/locales/zh.json | 40 +- 8 files changed, 872 insertions(+), 12 deletions(-) create mode 100644 frontend/src/components/GalleryPanel.vue create mode 100644 frontend/src/components/KnowledgePanel.vue create mode 100644 frontend/src/components/KnowledgeWorkspace.vue create mode 100644 frontend/src/composables/useKnowledge.ts diff --git a/frontend/src/App.vue b/frontend/src/App.vue index 2cde961..cf5ecb1 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -7,17 +7,21 @@ import MainInput from './components/MainInput.vue' import BrowserView from './components/BrowserView.vue' import ThinkingChain from './components/ThinkingChain.vue' import TaskSidebar from './components/TaskSidebar.vue' +import KnowledgeWorkspace from './components/KnowledgeWorkspace.vue' import { useChatHistory } from './composables/useChatHistory' import { useTasks } from './composables/useTasks' import { useDebugMode } from './composables/useDebugMode' import { useThemeMode } from './composables/useThemeMode' +import { useKnowledge } from './composables/useKnowledge' import { extractSessionPreview } from './utils/sessionPreview' const { t } = useI18n() const { sessions, activeChatId, loadSessions, createNewChat, refreshSession } = useChatHistory() const { tasks, loadTasks, isLoading: tasksLoading } = useTasks() +const { loadFolderFiles } = useKnowledge() const { debugMode } = useDebugMode() useThemeMode() +const mainView = ref<'chat' | 'knowledge'>('chat') // ─── WebSocket ───────────────────────────────────────────────────────────── const ws = ref(null) @@ -311,6 +315,7 @@ function pushMessage(data: any) { const { loadMessages } = useChatHistory() async function switchToSession(id: string) { + mainView.value = 'chat' activeChatId.value = id messages.value = [] hasStarted.value = false @@ -365,6 +370,7 @@ async function switchToSession(id: string) { // ─── New chat ────────────────────────────────────────────────────────────── async function handleNewChat() { + mainView.value = 'chat' // createNewChat already called in Sidebar → we just switch to the new active session messages.value = [] hasStarted.value = false @@ -447,6 +453,11 @@ function onBrowserNavigate(payload: { url: string }) { ws.value?.send(JSON.stringify({ type: 'takeover_navigate', url: payload.url })) } +async function openKnowledgeWorkspace(folderId: string) { + mainView.value = 'knowledge' + await loadFolderFiles(folderId) +} + // ─── Lifecycle ───────────────────────────────────────────────────────────── onMounted(async () => { await loadTasks() @@ -478,6 +489,7 @@ onUnmounted(() => {
@@ -502,7 +514,7 @@ onUnmounted(() => { -