From 1b25020bfabe830940f0305afd08212b9b5accb1 Mon Sep 17 00:00:00 2001 From: shaerware Date: Tue, 31 Mar 2026 07:08:34 +0000 Subject: [PATCH] =?UTF-8?q?feat(chat):=20ChatService=20facade=20=E2=80=94?= =?UTF-8?q?=20unified=20chat=20CRUD=20+=20LLM=20generation=20(#645)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement ChatServiceImpl in modules/chat/facade.py that wraps existing ChatService (CRUD) + ChatShareService with send_message() and stream_message() methods, implementing the ChatService Protocol from protocols.py. Refactored router to delegate LLM generation to the facade instead of inline code. Removed ~500 lines of duplicated agentic RAG loop, prompt assembly, and RAG injection from four router endpoints (send, stream, edit, regenerate). Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 3 +- app/dependencies.py | 3 + modules/chat/facade.py | 670 ++++++++++++++++++++++++++++++++++++++++ modules/chat/router.py | 625 ++++--------------------------------- modules/chat/schemas.py | 15 +- orchestrator.py | 8 + 6 files changed, 748 insertions(+), 576 deletions(-) create mode 100644 modules/chat/facade.py diff --git a/CLAUDE.md b/CLAUDE.md index f4f8b02..3980ef2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -146,7 +146,7 @@ Always run lint locally before pushing. Protected branches require PR workflow ### Modular Infrastructure (`modules/`) -Foundation layer for modular decomposition (issue #489). Phase 4 complete: all 28 routers migrated (Phase 3), all inline endpoints extracted (Phase 4.1–4.5), all background tasks via `TaskRegistry` (Phase 4.6), all startup helpers + service init extracted to domain `startup.py` modules, global service variables removed (Phase 4.7a/b). Phase 5.1–5.4 complete (EventBus infrastructure, first events, DatasetSynced, Widget→CRM events). Phase 6 complete (Protocol interfaces for AuthService, KnowledgeService, LLMService, ChatService in `modules/*/protocols.py`). 
Phase 7.1 complete (AuthService facade in `modules/core/auth_service.py`). Phase 7.2 complete (KnowledgeService facade in `modules/knowledge/facade.py`). Phase 7.3 complete (LLMService facade in `modules/llm/facade.py`). Phase 7.4 pending. +Foundation layer for modular decomposition (issue #489). Phase 4 complete: all 28 routers migrated (Phase 3), all inline endpoints extracted (Phase 4.1–4.5), all background tasks via `TaskRegistry` (Phase 4.6), all startup helpers + service init extracted to domain `startup.py` modules, global service variables removed (Phase 4.7a/b). Phase 5.1–5.4 complete (EventBus infrastructure, first events, DatasetSynced, Widget→CRM events). Phase 6 complete (Protocol interfaces for AuthService, KnowledgeService, LLMService, ChatService in `modules/*/protocols.py`). Phase 7.1 complete (AuthService facade in `modules/core/auth_service.py`). Phase 7.2 complete (KnowledgeService facade in `modules/knowledge/facade.py`). Phase 7.3 complete (LLMService facade in `modules/llm/facade.py`). Phase 7.4 complete (ChatService facade in `modules/chat/facade.py`). - **`EventBus`** (`modules/core/events.py`): In-process async pub/sub. Handlers run concurrently via `asyncio.gather`; exceptions are logged, never propagated to publisher. `BaseEvent` dataclass with auto-timestamp. Singleton in `ServiceContainer.event_bus`. Domain events: `InternetStatusChanged`, `UserRoleChanged`, `SessionRevoked`, `DatasetSynced` (in `modules/core/events.py`), `KnowledgeUpdated` (in `modules/knowledge/events.py`), `WidgetSessionCreated`, `WidgetMessageSent`, `WidgetContactSubmitted` (in `modules/channels/widget/events.py`). Subscriptions registered via `setup_event_subscriptions()` in `modules/core/startup.py`, which delegates to domain-specific setup functions (`setup_llm_event_subscriptions()` in `modules/llm/startup.py`, `setup_knowledge_event_subscriptions()` in `modules/knowledge/startup.py`, `setup_crm_event_subscriptions()` in `modules/crm/startup.py`). 
`DatasetSynced` decouples CRM/ecommerce/kanban from knowledge. Widget events decouple widget router from amoCRM: widget publishes events, CRM domain handles lead/contact/note creation reactively. - **`TaskRegistry`** (`modules/core/tasks.py`): Named background tasks — periodic (interval-based) or one-shot. `start_all()` / `cancel_all(timeout)` lifecycle. `TaskInfo` dataclass tracks status, run count, last error. 6 tasks registered in `startup_event()`: `session-cleanup` (1h), `periodic-vacuum` (7d), `kanban-sync` (15min), `woocommerce-sync` (daily 23:00 UTC), `wiki-embeddings` (one-shot), `wiki-collection-indexes` (one-shot). Task functions in `modules/core/maintenance.py`, `modules/knowledge/tasks.py`, `modules/kanban/tasks.py`, `modules/ecommerce/tasks.py`. @@ -166,6 +166,7 @@ Import from `modules.core`: `EventBus`, `BaseEvent`, `TaskRegistry`, `TaskInfo`, | `modules/core/` | `auth_service.py` | `AuthService` (Phase 7.1 facade — wraps `auth_manager.py` functions, implements Protocol from `protocols.py`) | | `modules/knowledge/` | `facade.py` | `KnowledgeServiceImpl` (Phase 7.2 facade — wraps `wiki_rag_service` + knowledge services, implements Protocol from `protocols.py`) | | `modules/llm/` | `facade.py` | `LLMServiceImpl` (Phase 7.3 facade — wraps `CloudLLMService`/`VLLMLLMService` + `CloudProviderService`, implements Protocol from `protocols.py`) | +| `modules/chat/` | `facade.py` | `ChatServiceImpl` (Phase 7.4 facade — wraps `ChatService` CRUD + LLM generation + `ChatShareService`, implements Protocol from `protocols.py`) | | `modules/chat/` | `service.py` | `ChatService`, `ChatShareService` | | `modules/knowledge/` | `service.py` | `FAQService`, `KnowledgeDocService`, `KnowledgeCollectionService`, `GitHubRepoProjectService` | | `modules/channels/telegram/` | `service.py` | `BotInstanceService`, `TelegramSessionService` | diff --git a/app/dependencies.py b/app/dependencies.py index baa0810..5fdfbd2 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ 
-45,6 +45,9 @@ def __init__(self): # LLM service facade (Phase 7.3) self.llm_service_facade = None + # Chat service facade (Phase 7.4) + self.chat_service_facade = None + # STT service self.stt_service: STTService | None = None diff --git a/modules/chat/facade.py b/modules/chat/facade.py new file mode 100644 index 0000000..9841c94 --- /dev/null +++ b/modules/chat/facade.py @@ -0,0 +1,670 @@ +"""ChatService facade — unified entry point for chat CRUD + LLM generation. + +Implements the ``ChatService`` Protocol from ``modules.chat.protocols``. +Delegates to existing services (Strangler Fig pattern): +- Session/message CRUD -> ``ChatService`` (modules/chat/service.py) +- Sharing -> ``ChatShareService`` (modules/chat/service.py) +- LLM generation -> underlying LLM service via ``ServiceContainer`` +- RAG / tool execution -> ``wiki_rag_service``, ``web_search_service`` + +The router stays responsible for HTTP concerns (auth, rate limiting, +request parsing, LLM backend resolution, image uploads). The facade +owns the core flow: prompt assembly -> RAG -> LLM call -> save response. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from typing import TYPE_CHECKING, Optional + +from modules.chat.schemas import ShareInfo, StreamChunk + + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + from app.dependencies import ServiceContainer + +logger = logging.getLogger(__name__) + +# Max agentic tool-call iterations (matches router constant) +MAX_TOOL_ITERATIONS = 10 + +# Default RAG system prompt +_DEFAULT_RAG_PROMPT = ( + "Ты — ИИ-секретарь. Отвечай на вопросы пользователя кратко и по делу, " + "используя предоставленную документацию. Отвечай на языке пользователя." +) + +_NO_TOOLS_SUFFIX = ( + "\n\nВАЖНО: Ты — чат-бот без доступа к инструментам, файлам и командам. " + "НИКОГДА не генерируй вызовы функций, tool_use, function_calls, " + "filesystem, code execution или любые блоки вида `command { ... }`. 
" + "Отвечай только обычным текстом. Используй markdown для форматирования." +) + +_AGENTIC_RAG_SUFFIX = ( + "\n\nУ тебя есть инструмент knowledge_search для поиска по базе знаний. " + "Используй его когда вопрос связан с документацией или требует фактических данных. " + "Можешь вызвать несколько раз с разными запросами. " + "Не выдумывай — если не нашёл информацию, честно скажи." +) + +_WEB_SEARCH_SUFFIX = ( + "\n\nУ тебя есть инструмент web_search для поиска в интернете. " + "Используй его когда нужна актуальная информация: новости, цены, погода, " + "события, факты. Можешь вызвать несколько раз с разными запросами." +) + +KNOWLEDGE_SEARCH_TOOL = { + "type": "function", + "function": { + "name": "knowledge_search", + "description": ( + "Search the knowledge base for relevant information. " + "Use when the user asks something that might be answered by documentation." + ), + "parameters": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "Search query. Be specific."} + }, + "required": ["query"], + }, + }, +} + +WEB_SEARCH_TOOL = { + "type": "function", + "function": { + "name": "web_search", + "description": ( + "Search the internet for current information. " + "Use when the user asks about recent events, prices, weather, news, " + "or anything that requires up-to-date data from the web." 
+ ), + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Search query in the language of the user's question.", + } + }, + "required": ["query"], + }, + }, +} + + +# --------------------------------------------------------------------------- +# Helper functions (extracted from router) +# --------------------------------------------------------------------------- + + +def _supports_tools(llm_service) -> bool: + """Check if the LLM provider supports tool calling.""" + if getattr(llm_service, "supports_tools", False): + return True + return hasattr(llm_service, "provider") and getattr( + llm_service.provider, "supports_tools", False + ) + + +def _should_use_agentic_rag( + llm_service, rag_mode: str, collection_ids: list[int], wiki_rag +) -> bool: + """Check if agentic RAG loop should be used instead of one-shot injection.""" + if rag_mode == "none" or not collection_ids or not wiki_rag: + return False + return _supports_tools(llm_service) + + +def _build_tools(use_agentic: bool, use_web_search: bool) -> list[dict]: + """Build the tools list based on enabled features.""" + tools: list[dict] = [] + if use_agentic: + tools.append(KNOWLEDGE_SEARCH_TOOL) + if use_web_search: + tools.append(WEB_SEARCH_TOOL) + return tools + + +def _finalize_prompt( + prompt: str | None, agentic_rag: bool = False, web_search: bool = False +) -> str: + """Add suffix to system prompt.""" + base = prompt or _DEFAULT_RAG_PROMPT + if not agentic_rag and not web_search: + return base + _NO_TOOLS_SUFFIX + result = base + if agentic_rag: + result += _AGENTIC_RAG_SUFFIX + if web_search: + result += _WEB_SEARCH_SUFFIX + return result + + +def _inject_context_files(prompt: str | None, session: dict) -> str | None: + """Inject context file contents into system prompt if session has any.""" + context_files = session.get("context_files") + if not context_files: + return prompt + files_text = "\n\n".join(f"# {f['name']}\n{f['content']}" for f in 
context_files) + base = prompt or _DEFAULT_RAG_PROMPT + return f"{base}\n\n--- Прикреплённые файлы ---\n{files_text}" + + +def _execute_knowledge_search(wiki_rag, query: str, collection_ids: list[int]) -> str: + """Execute a knowledge base search (sync).""" + if len(collection_ids) == 1: + return wiki_rag.retrieve(query, top_k=5, max_chars=3000, collection_id=collection_ids[0]) + return wiki_rag.retrieve_multi(query, collection_ids, top_k=5, max_chars=3000) + + +async def _execute_knowledge_search_async(wiki_rag, query: str, collection_ids: list[int]) -> str: + """Execute knowledge search with vector search support (async).""" + if not wiki_rag.vector_search_available: + return await asyncio.to_thread(_execute_knowledge_search, wiki_rag, query, collection_ids) + + if len(collection_ids) == 1: + return await wiki_rag.retrieve_async( + query, top_k=5, max_chars=3000, collection_id=collection_ids[0] + ) + return await wiki_rag.retrieve_multi_async(query, collection_ids, top_k=5, max_chars=3000) + + +def _execute_tool_call( + fn_name: str, + args: dict, + wiki_rag, + collection_ids: list[int], +) -> tuple[str, bool]: + """Execute a tool call and return (result_text, found).""" + if fn_name == "knowledge_search": + query = args.get("query", "") + if not query: + return "Пустой поисковый запрос.", False + result = _execute_knowledge_search(wiki_rag, query, collection_ids) + found = bool(result and result.strip()) + return result or "Ничего не найдено в базе знаний.", found + elif fn_name == "web_search": + query = args.get("query", "") + if not query: + return "Пустой поисковый запрос.", False + from modules.search.service import web_search_service + + result = web_search_service.search(query, max_results=5) + found = bool(result and "No web results" not in result and "failed" not in result) + return result, found + else: + return f"Неизвестный инструмент: {fn_name}", False + + +def _inject_rag_context( + wiki_rag, + user_content: str, + base_prompt: str | None, + 
rag_mode: str, + collection_ids: list[int], +) -> str | None: + """Inject RAG context into system prompt (sync, one-shot mode).""" + if not wiki_rag or not user_content or rag_mode == "none" or not collection_ids: + return base_prompt + + if len(collection_ids) == 1: + wiki_context = wiki_rag.retrieve( + user_content, top_k=7, max_chars=4000, collection_id=collection_ids[0] + ) + else: + wiki_context = wiki_rag.retrieve_multi( + user_content, collection_ids, top_k=7, max_chars=4000 + ) + + base = base_prompt or _DEFAULT_RAG_PROMPT + if wiki_context: + rag_instruction = ( + "\n\n--- КОНТЕКСТ ИЗ БАЗЫ ЗНАНИЙ (обязательно к использованию) ---\n" + "Ниже приведена релевантная информация из базы знаний. " + "ОБЯЗАТЕЛЬНО используй эти данные при ответе. " + "Если информация ниже отвечает на вопрос пользователя — ответь на основе неё. " + "НЕ выдумывай информацию, которой нет в контексте ниже.\n" + ) + return f"{base}{rag_instruction}\n{wiki_context}" + + no_context_instruction = ( + "\n\n--- ВАЖНО ---\n" + "По данному запросу в базе знаний не найдено релевантной информации. " + "НЕ выдумывай ответ. 
Если ты не уверен в точности информации — " + "честно скажи, что не нашёл данных в базе знаний, и предложи " + "обратиться к менеджеру или уточнить вопрос.\n" + ) + return f"{base}{no_context_instruction}" + + +async def _inject_rag_context_async( + wiki_rag, + user_content: str, + base_prompt: str | None, + rag_mode: str, + collection_ids: list[int], +) -> str | None: + """Async version of _inject_rag_context — includes vector search results.""" + if not wiki_rag or not user_content or rag_mode == "none" or not collection_ids: + return base_prompt + + if not wiki_rag.vector_search_available: + return await asyncio.to_thread( + _inject_rag_context, wiki_rag, user_content, base_prompt, rag_mode, collection_ids + ) + + if len(collection_ids) == 1: + wiki_context = await wiki_rag.retrieve_async( + user_content, top_k=7, max_chars=4000, collection_id=collection_ids[0] + ) + else: + wiki_context = await wiki_rag.retrieve_multi_async( + user_content, collection_ids, top_k=7, max_chars=4000 + ) + + base = base_prompt or _DEFAULT_RAG_PROMPT + if wiki_context: + rag_instruction = ( + "\n\n--- КОНТЕКСТ ИЗ БАЗЫ ЗНАНИЙ (обязательно к использованию) ---\n" + "Ниже приведена релевантная информация из базы знаний. " + "ОБЯЗАТЕЛЬНО используй эти данные при ответе. " + "Если информация ниже отвечает на вопрос пользователя — ответь на основе неё. " + "НЕ выдумывай информацию, которой нет в контексте ниже.\n" + ) + return f"{base}{rag_instruction}\n{wiki_context}" + + no_context_instruction = ( + "\n\n--- ВАЖНО ---\n" + "По данному запросу в базе знаний не найдено релевантной информации. " + "НЕ выдумывай ответ. 
Если ты не уверен в точности информации — " + "честно скажи, что не нашёл данных в базе знаний, и предложи " + "обратиться к менеджеру или уточнить вопрос.\n" + ) + return f"{base}{no_context_instruction}" + + +def _get_model_name(llm_service) -> str: + """Extract model name from LLM service for token counting.""" + if hasattr(llm_service, "model_name") and llm_service.model_name: + return llm_service.model_name + if hasattr(llm_service, "config") and isinstance(llm_service.config, dict): + return llm_service.config.get("model_name", "") + return "claude" + + +# --------------------------------------------------------------------------- +# Facade +# --------------------------------------------------------------------------- + + +class ChatServiceImpl: + """Facade implementing the ChatService Protocol. + + Wraps existing ``ChatService`` (CRUD) and ``ChatShareService`` singletons, + adds LLM generation methods (``send_message``, ``stream_message``). + + Receives ``ServiceContainer`` for lazy access to LLM and RAG services + which may change at runtime. 
+ """ + + def __init__(self, container: ServiceContainer) -> None: + self._container = container + # Lazy import to avoid circular deps + from modules.chat.service import chat_service, chat_share_service + + self._crud = chat_service + self._share = chat_share_service + + # -- Sessions (delegate to CRUD) ------------------------------------------ + + async def create_session( + self, + *, + source: str = "admin", + source_id: str | None = None, + title: str | None = None, + system_prompt: str | None = None, + owner_id: int | None = None, + workspace_id: int = 1, + ) -> dict: + return await self._crud.create_session( + title=title, + system_prompt=system_prompt, + source=source, + source_id=source_id, + owner_id=owner_id, + workspace_id=workspace_id, + ) + + async def get_session(self, session_id: str) -> dict | None: + return await self._crud.get_session(session_id) + + async def list_sessions( + self, + *, + owner_id: int | None = None, + workspace_id: int | None = None, + ) -> list[dict]: + return await self._crud.list_sessions(owner_id=owner_id, workspace_id=workspace_id) + + async def delete_session(self, session_id: str) -> bool: + return await self._crud.delete_session(session_id) + + # -- Messages (delegate to CRUD) ------------------------------------------ + + async def get_history(self, session_id: str) -> list[dict]: + return await self._crud.get_active_messages(session_id) + + async def add_message( + self, + session_id: str, + role: str, + content: str, + *, + parent_id: str | None = None, + ) -> dict | None: + return await self._crud.add_message(session_id, role, content, parent_id=parent_id) + + # -- Generation (LLM) ---------------------------------------------------- + + async def send_message( + self, + session_id: str, + content: str, + *, + llm_service=None, + session_data: dict | None = None, + system_prompt: str | None = None, + rag_mode: str = "all", + collection_ids: list[int] | None = None, + parent_id: str | None = None, + extra_data: str 
| None = None, + ) -> dict: + """Send a user message, call LLM (non-streaming), save and return assistant reply. + + Args: + session_id: Chat session ID. + content: User message text (already includes OCR if any). + llm_service: Resolved LLM service instance (CloudLLMService/VLLMLLMService). + session_data: Pre-fetched session dict (avoids redundant DB call). + system_prompt: Override system prompt (from widget/mobile/override). + rag_mode: RAG mode ("all", "selected", "collection", "none"). + collection_ids: Knowledge collection IDs for RAG. + parent_id: Parent message ID (for branching on edit/regenerate). + extra_data: JSON string with image metadata etc. + + Returns: + Saved assistant message dict. + """ + llm = llm_service or self._container.llm_service + if not llm: + raise RuntimeError("LLM service not available") + + session = session_data or await self._crud.get_session(session_id) + if not session: + raise RuntimeError(f"Session {session_id} not found") + + wiki_rag = self._container.wiki_rag_service + coll_ids = collection_ids or [] + use_agentic = _should_use_agentic_rag(llm, rag_mode, coll_ids, wiki_rag) + use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(llm) + + # Build prompt + prompt = system_prompt or session.get("system_prompt") + if not prompt and hasattr(llm, "get_system_prompt"): + prompt = llm.get_system_prompt() + + if not use_agentic: + prompt = await _inject_rag_context_async(wiki_rag, content, prompt, rag_mode, coll_ids) + + prompt = _inject_context_files(prompt, session) + + messages = await self._crud.get_messages_for_llm( + session_id, + _finalize_prompt(prompt, agentic_rag=use_agentic, web_search=use_web_search), + ) + + # Trim context + from app.utils.tokens import get_context_window, trim_messages + + model = _get_model_name(llm) + context_window = get_context_window(model) + messages, _ = trim_messages(messages, context_window) + + # Generate + if use_agentic or use_web_search: + tools = 
_build_tools(use_agentic, use_web_search) + loop_messages = list(messages) + response_text = "" + + for _iteration in range(MAX_TOOL_ITERATIONS): + result = llm.generate_response_from_messages( + loop_messages, stream=False, tools=tools + ) + if isinstance(result, str): + response_text = result + break + tool_calls = result.get("tool_calls") + if not tool_calls: + response_text = (result.get("content") or "").strip() + break + loop_messages.append(result) + for tc in tool_calls: + fn_name = tc["function"]["name"] + try: + args = json.loads(tc["function"]["arguments"]) + except json.JSONDecodeError: + args = {} + result_text, _ = _execute_tool_call(fn_name, args, wiki_rag, coll_ids) + loop_messages.append( + {"role": "tool", "tool_call_id": tc["id"], "content": result_text} + ) + else: + response_text = llm.generate_response_from_messages(messages, stream=False) + if hasattr(response_text, "__iter__") and not isinstance(response_text, str): + response_text = "".join(response_text) + + assistant_msg = await self._crud.add_message( + session_id, "assistant", response_text, parent_id=parent_id + ) + return assistant_msg + + async def stream_message( + self, + session_id: str, + content: str, + *, + llm_service=None, + session_data: dict | None = None, + user_msg: dict | None = None, + system_prompt: str | None = None, + rag_mode: str = "all", + collection_ids: list[int] | None = None, + ) -> AsyncIterator[StreamChunk]: + """Send a user message, stream the LLM response, save the result. + + Yields ``StreamChunk`` dicts that the router serialises to SSE events. + The user message should already be saved before calling this method + (``user_msg`` is the saved dict echoed back to the client). + + Args: + session_id: Chat session ID. + content: User message text (already includes OCR if any). + llm_service: Resolved LLM service instance. + session_data: Pre-fetched session dict. + user_msg: Already-saved user message dict (echoed as first SSE event). 
+ system_prompt: Override system prompt. + rag_mode: RAG mode. + collection_ids: Knowledge collection IDs for RAG. + + Yields: + StreamChunk dicts with ``type`` field for SSE serialization. + """ + llm = llm_service or self._container.llm_service + if not llm: + yield StreamChunk(type="error", content="LLM service not available") + return + + session = session_data or await self._crud.get_session(session_id) + if not session: + yield StreamChunk(type="error", content=f"Session {session_id} not found") + return + + wiki_rag = self._container.wiki_rag_service + coll_ids = collection_ids or [] + use_agentic = _should_use_agentic_rag(llm, rag_mode, coll_ids, wiki_rag) + use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(llm) + + # Build prompt + prompt = system_prompt or session.get("system_prompt") + if not prompt and hasattr(llm, "get_system_prompt"): + prompt = llm.get_system_prompt() + + if not use_agentic: + prompt = await _inject_rag_context_async(wiki_rag, content, prompt, rag_mode, coll_ids) + + prompt = _inject_context_files(prompt, session) + + messages = await self._crud.get_messages_for_llm( + session_id, + _finalize_prompt(prompt, agentic_rag=use_agentic, web_search=use_web_search), + ) + + # Trim context + from app.utils.tokens import count_message_tokens, get_context_window, trim_messages + + model = _get_model_name(llm) + context_window = get_context_window(model) + messages, was_trimmed = trim_messages(messages, context_window) + if was_trimmed: + logger.info(f"Trimmed context for session {session_id}") + + # Echo user message + if user_msg: + yield StreamChunk(type="user_message", message=user_msg) + + full_response: list[str] = [] + + try: + if use_agentic or use_web_search: + # Agentic RAG loop + tools = _build_tools(use_agentic, use_web_search) + loop_messages = list(messages) + + for _iteration in range(MAX_TOOL_ITERATIONS): + content_chunks: list[str] = [] + tool_calls_result = None + + for event in 
llm.generate_response_from_messages( + loop_messages, stream=True, tools=tools + ): + if isinstance(event, dict): + if event["type"] == "content": + content_chunks.append(event["content"]) + full_response.append(event["content"]) + yield StreamChunk(type="chunk", content=event["content"]) + elif event["type"] == "tool_calls": + tool_calls_result = event["tool_calls"] + else: + full_response.append(event) + yield StreamChunk(type="chunk", content=event) + + if not tool_calls_result: + break + + # Execute tool calls + assistant_content = "".join(content_chunks) or None + loop_messages.append( + { + "role": "assistant", + "content": assistant_content, + "tool_calls": tool_calls_result, + } + ) + + for tc in tool_calls_result: + fn_name = tc["function"]["name"] + try: + args = json.loads(tc["function"]["arguments"]) + except json.JSONDecodeError: + args = {} + query = args.get("query", "") + yield StreamChunk(type="tool_start", name=fn_name, query=query) + + result_text, found = _execute_tool_call(fn_name, args, wiki_rag, coll_ids) + yield StreamChunk(type="tool_end", name=fn_name, found=found) + + loop_messages.append( + {"role": "tool", "tool_call_id": tc["id"], "content": result_text} + ) + else: + # One-shot streaming + for chunk in llm.generate_response_from_messages(messages, stream=True): + full_response.append(chunk) + yield StreamChunk(type="chunk", content=chunk) + + # Save full response + response_text = "".join(full_response) + assistant_msg = await self._crud.add_message(session_id, "assistant", response_text) + + # Token usage + all_msgs = messages + [{"role": "assistant", "content": response_text}] + tokens = count_message_tokens(all_msgs, model) + percent = round(tokens / context_window * 100, 1) if context_window else 0 + token_usage = { + "tokens": tokens, + "context_window": context_window, + "percent": percent, + "trimmed": was_trimmed, + } + + yield StreamChunk( + type="assistant_message", message=assistant_msg, token_usage=token_usage + ) + yield 
StreamChunk(type="done", done=True) + + except Exception as e: + logger.error(f"Chat stream error: {e}") + yield StreamChunk(type="error", content=str(e)) + + # -- Sharing (delegate to ChatShareService) ------------------------------- + + async def share_session( + self, + session_id: str, + user_id: int, + *, + permission: str = "read", + shared_by: int | None = None, + branch_message_id: str | None = None, + ) -> ShareInfo: + result = await self._share.add_share( + session_id, user_id, permission, shared_by, branch_message_id + ) + return ShareInfo( + id=result.get("id", 0), + session_id=result.get("session_id", session_id), + user_id=result.get("user_id", user_id), + permission=result.get("permission", permission), + shared_by=result.get("shared_by"), + shared_at=result.get("shared_at"), + ) + + async def unshare_session( + self, + session_id: str, + user_id: int, + ) -> bool: + return await self._share.remove_share(session_id, user_id) + + +# Module-level singleton — NOT created here because it needs ServiceContainer. 
+# Created in startup and set via: chat_service_facade = ChatServiceImpl(container) +chat_service_facade: Optional[ChatServiceImpl] = None diff --git a/modules/chat/router.py b/modules/chat/router.py index b16c9ed..7c6fb7d 100644 --- a/modules/chat/router.py +++ b/modules/chat/router.py @@ -12,17 +12,17 @@ from app.dependencies import get_container from app.rate_limiter import RATE_LIMIT_CHAT, limiter -from app.utils.tokens import count_message_tokens, get_context_window, trim_messages +from app.utils.tokens import count_message_tokens, get_context_window from auth_manager import User, require_permission, user_has_level, workspace_context from cloud_llm_service import CloudLLMService from modules.channels.mobile.service import mobile_app_instance_service from modules.channels.telegram.service import bot_instance_service from modules.channels.whatsapp.service import whatsapp_instance_service from modules.channels.widget.service import widget_instance_service +from modules.chat.facade import ChatServiceImpl, chat_service_facade from modules.chat.service import chat_service, chat_share_service from modules.knowledge.service import knowledge_collection_service from modules.llm.service import cloud_provider_service -from modules.search.service import web_search_service logger = logging.getLogger(__name__) @@ -43,47 +43,6 @@ "Отвечай только обычным текстом. Используй markdown для форматирования." ) -# Agentic RAG: LLM decides when to search the knowledge base -KNOWLEDGE_SEARCH_TOOL = { - "type": "function", - "function": { - "name": "knowledge_search", - "description": ( - "Search the knowledge base for relevant information. " - "Use when the user asks something that might be answered by documentation." - ), - "parameters": { - "type": "object", - "properties": { - "query": {"type": "string", "description": "Search query. 
Be specific."} - }, - "required": ["query"], - }, - }, -} -WEB_SEARCH_TOOL = { - "type": "function", - "function": { - "name": "web_search", - "description": ( - "Search the internet for current information. " - "Use when the user asks about recent events, prices, weather, news, " - "or anything that requires up-to-date data from the web." - ), - "parameters": { - "type": "object", - "properties": { - "query": { - "type": "string", - "description": "Search query in the language of the user's question.", - } - }, - "required": ["query"], - }, - }, -} -MAX_TOOL_ITERATIONS = 10 - # Temporary cache for OCR text from uploaded images (upload → send are two separate requests) _pending_image_ocr: dict[str, str] = {} @@ -101,16 +60,6 @@ ) -def _inject_context_files(prompt: str | None, session: dict) -> str | None: - """Inject context file contents into system prompt if session has any.""" - context_files = session.get("context_files") - if not context_files: - return prompt - files_text = "\n\n".join(f"# {f['name']}\n{f['content']}" for f in context_files) - base = prompt or _DEFAULT_RAG_PROMPT - return f"{base}\n\n--- Прикреплённые файлы ---\n{files_text}" - - def _finalize_prompt( prompt: str | None, agentic_rag: bool = False, web_search: bool = False ) -> str: @@ -126,88 +75,6 @@ def _finalize_prompt( return result -def _should_use_agentic_rag( - llm_service, rag_mode: str, collection_ids: list[int], wiki_rag -) -> bool: - """Check if agentic RAG loop should be used instead of one-shot injection.""" - if rag_mode == "none" or not collection_ids or not wiki_rag: - return False - # Check provider supports tools (CloudLLMService wraps provider, VLLMLLMService has it directly) - if getattr(llm_service, "supports_tools", False): - return True - return hasattr(llm_service, "provider") and getattr( - llm_service.provider, "supports_tools", False - ) - - -def _execute_knowledge_search(wiki_rag, query: str, collection_ids: list[int]) -> str: - """Execute a knowledge base search 
and return results text (sync).""" - if len(collection_ids) == 1: - return wiki_rag.retrieve(query, top_k=5, max_chars=3000, collection_id=collection_ids[0]) - return wiki_rag.retrieve_multi(query, collection_ids, top_k=5, max_chars=3000) - - -async def _execute_knowledge_search_async(wiki_rag, query: str, collection_ids: list[int]) -> str: - """Execute knowledge search with vector search support (async).""" - if not wiki_rag.vector_search_available: - import asyncio - - return await asyncio.to_thread(_execute_knowledge_search, wiki_rag, query, collection_ids) - - if len(collection_ids) == 1: - return await wiki_rag.retrieve_async( - query, top_k=5, max_chars=3000, collection_id=collection_ids[0] - ) - return await wiki_rag.retrieve_multi_async(query, collection_ids, top_k=5, max_chars=3000) - - -def _build_tools(use_agentic: bool, use_web_search: bool) -> list[dict]: - """Build the tools list based on enabled features.""" - tools: list[dict] = [] - if use_agentic: - tools.append(KNOWLEDGE_SEARCH_TOOL) - if use_web_search: - tools.append(WEB_SEARCH_TOOL) - return tools - - -def _supports_tools(llm_service) -> bool: - """Check if the LLM provider supports tool calling.""" - if getattr(llm_service, "supports_tools", False): - return True - return hasattr(llm_service, "provider") and getattr( - llm_service.provider, "supports_tools", False - ) - - -def _execute_tool_call( - fn_name: str, - args: dict, - wiki_rag, - collection_ids: list[int], -) -> tuple[str, bool]: - """Execute a tool call and return (result_text, found). - - Returns the result text and whether the search found anything. 
- """ - if fn_name == "knowledge_search": - query = args.get("query", "") - if not query: - return "Пустой поисковый запрос.", False - result = _execute_knowledge_search(wiki_rag, query, collection_ids) - found = bool(result and result.strip()) - return result or "Ничего не найдено в базе знаний.", found - elif fn_name == "web_search": - query = args.get("query", "") - if not query: - return "Пустой поисковый запрос.", False - result = web_search_service.search(query, max_results=5) - found = bool(result and "No web results" not in result and "failed" not in result) - return result, found - else: - return f"Неизвестный инструмент: {fn_name}", False - - def _extract_collection_ids(data: dict) -> list[int]: """Extract collection IDs from config dict. @@ -325,124 +192,6 @@ def _resolve_from_config(cfg: dict) -> tuple[str, list[int]] | None: return "all", await _get_all_enabled_collection_ids() -def _inject_rag_context( - wiki_rag, - user_content: str, - base_prompt: Optional[str], - rag_mode: str, - collection_ids: list[int], -) -> Optional[str]: - """Inject RAG context into system prompt based on rag_mode. - - Returns updated prompt or base_prompt unchanged if no RAG injection needed. - collection_ids: list of collection IDs to search (resolved by _resolve_rag_config). 
- """ - logger.info( - f"RAG inject: mode={rag_mode}, ids={collection_ids}, " - f"wiki_rag={'yes' if wiki_rag else 'NO'}, query={user_content[:80]!r}" - ) - if not wiki_rag or not user_content or rag_mode == "none" or not collection_ids: - logger.info( - f"RAG inject: skipped (wiki_rag={bool(wiki_rag)}, " - f"content={bool(user_content)}, mode={rag_mode}, ids={collection_ids})" - ) - return base_prompt - - loaded_ids = ( - list(wiki_rag._collection_indexes.keys()) - if hasattr(wiki_rag, "_collection_indexes") - else [] - ) - logger.info(f"RAG inject: loaded collection indexes: {loaded_ids}") - - if len(collection_ids) == 1: - wiki_context = wiki_rag.retrieve( - user_content, top_k=7, max_chars=4000, collection_id=collection_ids[0] - ) - else: - wiki_context = wiki_rag.retrieve_multi( - user_content, collection_ids, top_k=7, max_chars=4000 - ) - - logger.info( - f"RAG inject: context found={bool(wiki_context)}, len={len(wiki_context) if wiki_context else 0}" - ) - - base = base_prompt or _DEFAULT_RAG_PROMPT - if wiki_context: - rag_instruction = ( - "\n\n--- КОНТЕКСТ ИЗ БАЗЫ ЗНАНИЙ (обязательно к использованию) ---\n" - "Ниже приведена релевантная информация из базы знаний. " - "ОБЯЗАТЕЛЬНО используй эти данные при ответе. " - "Если информация ниже отвечает на вопрос пользователя — ответь на основе неё. " - "НЕ выдумывай информацию, которой нет в контексте ниже.\n" - ) - return f"{base}{rag_instruction}\n{wiki_context}" - - # RAG search returned nothing — instruct LLM not to hallucinate - no_context_instruction = ( - "\n\n--- ВАЖНО ---\n" - "По данному запросу в базе знаний не найдено релевантной информации. " - "НЕ выдумывай ответ. 
Если ты не уверен в точности информации — " - "честно скажи, что не нашёл данных в базе знаний, и предложи " - "обратиться к менеджеру или уточнить вопрос.\n" - ) - return f"{base}{no_context_instruction}" - - -async def _inject_rag_context_async( - wiki_rag, - user_content: str, - base_prompt: Optional[str], - rag_mode: str, - collection_ids: list[int], -) -> Optional[str]: - """Async version of _inject_rag_context — includes vector search results.""" - if not wiki_rag or not user_content or rag_mode == "none" or not collection_ids: - return base_prompt - - if not wiki_rag.vector_search_available: - import asyncio - - return await asyncio.to_thread( - _inject_rag_context, wiki_rag, user_content, base_prompt, rag_mode, collection_ids - ) - - if len(collection_ids) == 1: - wiki_context = await wiki_rag.retrieve_async( - user_content, top_k=7, max_chars=4000, collection_id=collection_ids[0] - ) - else: - wiki_context = await wiki_rag.retrieve_multi_async( - user_content, collection_ids, top_k=7, max_chars=4000 - ) - - logger.info( - f"RAG inject (async): context found={bool(wiki_context)}, " - f"len={len(wiki_context) if wiki_context else 0}" - ) - - base = base_prompt or _DEFAULT_RAG_PROMPT - if wiki_context: - rag_instruction = ( - "\n\n--- КОНТЕКСТ ИЗ БАЗЫ ЗНАНИЙ (обязательно к использованию) ---\n" - "Ниже приведена релевантная информация из базы знаний. " - "ОБЯЗАТЕЛЬНО используй эти данные при ответе. " - "Если информация ниже отвечает на вопрос пользователя — ответь на основе неё. " - "НЕ выдумывай информацию, которой нет в контексте ниже.\n" - ) - return f"{base}{rag_instruction}\n{wiki_context}" - - no_context_instruction = ( - "\n\n--- ВАЖНО ---\n" - "По данному запросу в базе знаний не найдено релевантной информации. " - "НЕ выдумывай ответ. 
Если ты не уверен в точности информации — " - "честно скажи, что не нашёл данных в базе знаний, и предложи " - "обратиться к менеджеру или уточнить вопрос.\n" - ) - return f"{base}{no_context_instruction}" - - # ============== Token Counting Helpers ============== @@ -468,18 +217,6 @@ def _build_token_usage(messages: list[dict], model: str, trimmed: bool = False) } -def _trim_and_log(messages: list[dict], model: str, session_id: str) -> tuple[list[dict], bool]: - """Trim messages to fit context window and log if trimmed.""" - context_window = get_context_window(model) - trimmed_messages, was_trimmed = trim_messages(messages, context_window) - if was_trimmed: - logger.info( - f"Trimmed context for session {session_id}: " - f"{len(messages)} -> {len(trimmed_messages)} messages" - ) - return trimmed_messages, was_trimmed - - # ============== Pydantic Models ============== @@ -821,80 +558,23 @@ async def admin_send_chat_message( session_id, "user", llm_content_ns, extra_data=extra_data_json_ns ) - # Получаем историю для LLM - # Session prompt takes priority; fallback to LLM service default - default_prompt = session.get("system_prompt") - if not default_prompt and hasattr(llm_service, "get_system_prompt"): - default_prompt = llm_service.get_system_prompt() - - # RAG: inject relevant wiki context based on rag_mode + # Resolve RAG config rag_mode, collection_ids = await _resolve_rag_config(session, msg_request.llm_override) - wiki_rag = container.wiki_rag_service - use_agentic = _should_use_agentic_rag(llm_service, rag_mode, collection_ids, wiki_rag) - use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(llm_service) - if not use_agentic: - default_prompt = await _inject_rag_context_async( - wiki_rag, msg_request.content, default_prompt, rag_mode, collection_ids - ) - - # Inject context files - default_prompt = _inject_context_files(default_prompt, session) - - messages = await chat_service.get_messages_for_llm( - session_id, - 
_finalize_prompt(default_prompt, agentic_rag=use_agentic, web_search=use_web_search), - ) - - # Trim to fit context window - model = _get_model_name(llm_service) - messages, _ = _trim_and_log(messages, model, session_id) - - # Генерируем ответ + # Delegate generation to ChatService facade + facade = chat_service_facade or ChatServiceImpl(container) try: - if use_agentic or use_web_search: - # Agentic RAG loop (non-streaming) - tools = _build_tools(use_agentic, use_web_search) - loop_messages = list(messages) - response_text = "" - - for _iteration in range(MAX_TOOL_ITERATIONS): - result = llm_service.generate_response_from_messages( - loop_messages, stream=False, tools=tools - ) - if isinstance(result, str): - response_text = result - break - # dict with tool_calls - tool_calls = result.get("tool_calls") - if not tool_calls: - response_text = (result.get("content") or "").strip() - break - - loop_messages.append(result) # assistant message with tool_calls - for tc in tool_calls: - fn_name = tc["function"]["name"] - try: - args = json.loads(tc["function"]["arguments"]) - except json.JSONDecodeError: - logger.warning( - f"Tool {fn_name}: malformed args: {tc['function']['arguments'][:100]}" - ) - args = {} - result_text, _ = _execute_tool_call(fn_name, args, wiki_rag, collection_ids) - loop_messages.append( - {"role": "tool", "tool_call_id": tc["id"], "content": result_text} - ) - else: - response_text = llm_service.generate_response_from_messages(messages, stream=False) - if hasattr(response_text, "__iter__") and not isinstance(response_text, str): - response_text = "".join(response_text) - - assistant_msg = await chat_service.add_message(session_id, "assistant", response_text) + assistant_msg = await facade.send_message( + session_id, + llm_content_ns, + llm_service=llm_service, + session_data=session, + rag_mode=rag_mode, + collection_ids=collection_ids, + ) return {"message": user_msg, "response": assistant_msg} - except Exception as e: - logger.error(f"❌ Chat 
error: {e}") + logger.error(f"Chat error: {e}") raise HTTPException(status_code=500, detail=str(e)) @@ -1065,125 +745,33 @@ async def admin_stream_chat_message( session_id, "user", llm_content, extra_data=extra_data_json ) - # Получаем историю для LLM - # Priority: widget prompt > session prompt > LLM service default - default_prompt = custom_prompt or session.get("system_prompt") - if not default_prompt and hasattr(active_llm, "get_system_prompt"): - default_prompt = active_llm.get_system_prompt() - - # RAG: inject relevant wiki context based on rag_mode + # Resolve RAG config (stays in router — depends on override/widget/mobile context) rag_mode, collection_ids = await _resolve_rag_config( session, msg_request.llm_override, msg_request.widget_instance_id, msg_request.mobile_instance_id, ) - wiki_rag = container.wiki_rag_service - use_agentic = _should_use_agentic_rag(active_llm, rag_mode, collection_ids, wiki_rag) - use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(active_llm) - - if not use_agentic: - # One-shot RAG: inject context into prompt (existing behavior) - default_prompt = await _inject_rag_context_async( - wiki_rag, msg_request.content, default_prompt, rag_mode, collection_ids - ) - - # Inject context files - default_prompt = _inject_context_files(default_prompt, session) - - messages = await chat_service.get_messages_for_llm( - session_id, - _finalize_prompt(default_prompt, agentic_rag=use_agentic, web_search=use_web_search), - ) - # Trim to fit context window - model = _get_model_name(active_llm) - messages, was_trimmed = _trim_and_log(messages, model, session_id) + # Delegate generation to ChatService facade + facade = chat_service_facade or ChatServiceImpl(container) async def generate_stream(): - full_response = [] - try: - # Отправляем сообщение пользователя - yield f"data: {json.dumps({'type': 'user_message', 'message': user_msg}, ensure_ascii=False)}\n\n" - - if use_agentic or use_web_search: - # Agentic RAG loop: LLM 
decides when to search - tools = _build_tools(use_agentic, use_web_search) - loop_messages = list(messages) - - for _iteration in range(MAX_TOOL_ITERATIONS): - content_chunks = [] - tool_calls_result = None - - for event in active_llm.generate_response_from_messages( - loop_messages, stream=True, tools=tools - ): - if isinstance(event, dict): - if event["type"] == "content": - content_chunks.append(event["content"]) - full_response.append(event["content"]) - yield f"data: {json.dumps({'type': 'chunk', 'content': event['content']}, ensure_ascii=False)}\n\n" - elif event["type"] == "tool_calls": - tool_calls_result = event["tool_calls"] - else: - # Safety fallback: plain str from provider - full_response.append(event) - yield f"data: {json.dumps({'type': 'chunk', 'content': event}, ensure_ascii=False)}\n\n" - - if not tool_calls_result: - break # LLM answered with text, done - - # Execute tool calls and feed results back - assistant_content = "".join(content_chunks) or None - loop_messages.append( - { - "role": "assistant", - "content": assistant_content, - "tool_calls": tool_calls_result, - } - ) - - for tc in tool_calls_result: - fn_name = tc["function"]["name"] - try: - args = json.loads(tc["function"]["arguments"]) - except json.JSONDecodeError: - logger.warning( - f"Tool {fn_name}: malformed args: {tc['function']['arguments'][:100]}" - ) - args = {} - query = args.get("query", "") - yield f"data: {json.dumps({'type': 'tool_start', 'name': fn_name, 'query': query}, ensure_ascii=False)}\n\n" - - result_text, found = _execute_tool_call( - fn_name, args, wiki_rag, collection_ids - ) - yield f"data: {json.dumps({'type': 'tool_end', 'name': fn_name, 'found': found}, ensure_ascii=False)}\n\n" - - loop_messages.append( - {"role": "tool", "tool_call_id": tc["id"], "content": result_text} - ) + async for chunk in facade.stream_message( + session_id, + llm_content, + llm_service=active_llm, + session_data=session, + user_msg=user_msg, + system_prompt=custom_prompt, + 
rag_mode=rag_mode, + collection_ids=collection_ids, + ): + # Serialize StreamChunk to SSE event + if chunk.get("done"): + yield "data: [DONE]\n\n" else: - # One-shot: existing streaming path - for chunk in active_llm.generate_response_from_messages(messages, stream=True): - full_response.append(chunk) - yield f"data: {json.dumps({'type': 'chunk', 'content': chunk}, ensure_ascii=False)}\n\n" - - # Сохраняем полный ответ - response_text = "".join(full_response) - assistant_msg = await chat_service.add_message(session_id, "assistant", response_text) - - # Build token_usage for the final event - all_msgs = messages + [{"role": "assistant", "content": response_text}] - token_usage = _build_token_usage(all_msgs, model, was_trimmed) - - # Отправляем финальное сообщение - yield f"data: {json.dumps({'type': 'assistant_message', 'message': assistant_msg, 'token_usage': token_usage}, ensure_ascii=False)}\n\n" - yield "data: [DONE]\n\n" - - except Exception as e: - logger.error(f"❌ Chat stream error: {e}") - yield f"data: {json.dumps({'type': 'error', 'content': str(e)})}\n\n" + yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" return StreamingResponse( generate_stream(), @@ -1228,77 +816,22 @@ async def admin_edit_chat_message( if not llm_service: return {"message": edited_msg} - default_prompt = session.get("system_prompt") - if not default_prompt and hasattr(llm_service, "get_system_prompt"): - default_prompt = llm_service.get_system_prompt() - - # RAG: inject relevant wiki context based on rag_mode rag_mode, collection_ids = await _resolve_rag_config(session) - wiki_rag = container.wiki_rag_service - use_agentic = _should_use_agentic_rag(llm_service, rag_mode, collection_ids, wiki_rag) - use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(llm_service) - - if not use_agentic: - default_prompt = await _inject_rag_context_async( - wiki_rag, request.content, default_prompt, rag_mode, collection_ids - ) - - # Inject context files - 
default_prompt = _inject_context_files(default_prompt, session) - - messages = await chat_service.get_messages_for_llm( - session_id, - _finalize_prompt(default_prompt, agentic_rag=use_agentic, web_search=use_web_search), - ) - - # Trim to fit context window - model = _get_model_name(llm_service) - messages, _ = _trim_and_log(messages, model, session_id) + facade = chat_service_facade or ChatServiceImpl(container) try: - if use_agentic or use_web_search: - tools = _build_tools(use_agentic, use_web_search) - loop_messages = list(messages) - response_text = "" - - for _iteration in range(MAX_TOOL_ITERATIONS): - result = llm_service.generate_response_from_messages( - loop_messages, stream=False, tools=tools - ) - if isinstance(result, str): - response_text = result - break - tool_calls = result.get("tool_calls") - if not tool_calls: - response_text = (result.get("content") or "").strip() - break - loop_messages.append(result) - for tc in tool_calls: - fn_name = tc["function"]["name"] - try: - args = json.loads(tc["function"]["arguments"]) - except json.JSONDecodeError: - logger.warning( - f"Tool {fn_name}: malformed args: {tc['function']['arguments'][:100]}" - ) - args = {} - result_text, _ = _execute_tool_call(fn_name, args, wiki_rag, collection_ids) - loop_messages.append( - {"role": "tool", "tool_call_id": tc["id"], "content": result_text} - ) - else: - response_text = llm_service.generate_response_from_messages(messages, stream=False) - if hasattr(response_text, "__iter__") and not isinstance(response_text, str): - response_text = "".join(response_text) - - # Add response as child of the new edited message - assistant_msg = await chat_service.add_message( - session_id, "assistant", response_text, parent_id=edited_msg["id"] + assistant_msg = await facade.send_message( + session_id, + request.content, + llm_service=llm_service, + session_data=session, + rag_mode=rag_mode, + collection_ids=collection_ids, + parent_id=edited_msg["id"], ) return {"message": edited_msg, 
"response": assistant_msg} - except Exception as e: - logger.error(f"❌ Chat regenerate error: {e}") + logger.error(f"Chat edit regenerate error: {e}") return {"message": edited_msg, "error": str(e)} @@ -1355,78 +888,24 @@ async def admin_regenerate_chat_response( break parent_id = message_id - # Generate new response - default_prompt = session.get("system_prompt") - if not default_prompt and hasattr(llm_service, "get_system_prompt"): - default_prompt = llm_service.get_system_prompt() - - # RAG: inject relevant wiki context based on rag_mode + # Generate new response via facade user_content = target_msg["content"] if target_msg["role"] == "user" else "" rag_mode, collection_ids = await _resolve_rag_config(session) - wiki_rag = container.wiki_rag_service - use_agentic = _should_use_agentic_rag(llm_service, rag_mode, collection_ids, wiki_rag) - use_web_search = bool(session.get("web_search_enabled")) and _supports_tools(llm_service) - - if not use_agentic: - default_prompt = await _inject_rag_context_async( - wiki_rag, user_content, default_prompt, rag_mode, collection_ids - ) - - # Inject context files - default_prompt = _inject_context_files(default_prompt, session) - - llm_messages = await chat_service.get_messages_for_llm( - session_id, - _finalize_prompt(default_prompt, agentic_rag=use_agentic, web_search=use_web_search), - ) - - # Trim to fit context window - model = _get_model_name(llm_service) - llm_messages, _ = _trim_and_log(llm_messages, model, session_id) + facade = chat_service_facade or ChatServiceImpl(container) try: - if use_agentic or use_web_search: - tools = _build_tools(use_agentic, use_web_search) - loop_messages = list(llm_messages) - response_text = "" - - for _iteration in range(MAX_TOOL_ITERATIONS): - result = llm_service.generate_response_from_messages( - loop_messages, stream=False, tools=tools - ) - if isinstance(result, str): - response_text = result - break - tool_calls = result.get("tool_calls") - if not tool_calls: - response_text = 
(result.get("content") or "").strip() - break - loop_messages.append(result) - for tc in tool_calls: - fn_name = tc["function"]["name"] - try: - args = json.loads(tc["function"]["arguments"]) - except json.JSONDecodeError: - logger.warning( - f"Tool {fn_name}: malformed args: {tc['function']['arguments'][:100]}" - ) - args = {} - result_text, _ = _execute_tool_call(fn_name, args, wiki_rag, collection_ids) - loop_messages.append( - {"role": "tool", "tool_call_id": tc["id"], "content": result_text} - ) - else: - response_text = llm_service.generate_response_from_messages(llm_messages, stream=False) - if hasattr(response_text, "__iter__") and not isinstance(response_text, str): - response_text = "".join(response_text) - - assistant_msg = await chat_service.add_message( - session_id, "assistant", response_text, parent_id=parent_id + assistant_msg = await facade.send_message( + session_id, + user_content, + llm_service=llm_service, + session_data=session, + rag_mode=rag_mode, + collection_ids=collection_ids, + parent_id=parent_id, ) return {"response": assistant_msg} - except Exception as e: - logger.error(f"❌ Chat regenerate error: {e}") + logger.error(f"Chat regenerate error: {e}") raise HTTPException(status_code=500, detail=str(e)) diff --git a/modules/chat/schemas.py b/modules/chat/schemas.py index c92a880..7004930 100644 --- a/modules/chat/schemas.py +++ b/modules/chat/schemas.py @@ -65,11 +65,22 @@ class MessageInfo(TypedDict): class StreamChunk(TypedDict, total=False): """A single chunk from ChatService.stream_message(). - ``content`` — text delta from the model. - ``done`` — ``True`` on the final chunk (includes ``token_usage``). 
+ Event types: + - ``user_message`` — echoes saved user message (``message`` field) + - ``chunk`` — text delta from the model (``content`` field) + - ``tool_start`` — agentic RAG tool invoked (``name``, ``query``) + - ``tool_end`` — tool finished (``name``, ``found``) + - ``assistant_message`` — full saved response (``message``, ``token_usage``) + - ``done`` — stream finished + - ``error`` — generation error (``content``) """ + type: str content: str + message: dict + name: str + query: str + found: bool done: bool token_usage: TokenUsage | None diff --git a/orchestrator.py b/orchestrator.py index 9994cf6..48e4567 100644 --- a/orchestrator.py +++ b/orchestrator.py @@ -210,6 +210,14 @@ async def startup_event(): container.llm_service_facade = llm_facade _llm_facade_module.llm_service_facade = llm_facade + # Chat service facade (Phase 7.4) + from modules.chat import facade as _chat_facade_module + from modules.chat.facade import ChatServiceImpl + + chat_facade = ChatServiceImpl(container) + container.chat_service_facade = chat_facade + _chat_facade_module.chat_service_facade = chat_facade + # Reload FAQ and voice presets from DB from modules.knowledge.startup import reload_llm_faq from modules.speech.startup import reload_voice_presets