From d6a86940e17aca5bd835c6cb76b01f6a34ec70f0 Mon Sep 17 00:00:00 2001 From: shaerware Date: Sun, 29 Mar 2026 04:45:15 +0000 Subject: [PATCH 1/2] =?UTF-8?q?feat(knowledge):=20KnowledgeService=20facad?= =?UTF-8?q?e=20=E2=80=94=20unified=20entry=20point=20(#643)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 7.2: Create KnowledgeServiceImpl that wraps wiki_rag_service (search/retrieval) and knowledge domain services (collections, documents, FAQ) behind a single Protocol-conformant facade. - modules/knowledge/facade.py: KnowledgeServiceImpl + dict→TypedDict converters - Lazy wiki_rag access via ServiceContainer (handles late initialization) - Multi-collection search with dedup/merge - Registered in ServiceContainer.knowledge_service during startup - 18 unit tests (protocol compliance, signatures, converters) Co-Authored-By: Claude Opus 4.6 (1M context) --- CLAUDE.md | 4 +- app/dependencies.py | 3 + modules/knowledge/facade.py | 283 +++++++++++++++++++++++++++ modules/knowledge/startup.py | 7 + tests/unit/test_knowledge_service.py | 213 ++++++++++++++++++++ 5 files changed, 509 insertions(+), 1 deletion(-) create mode 100644 modules/knowledge/facade.py create mode 100644 tests/unit/test_knowledge_service.py diff --git a/CLAUDE.md b/CLAUDE.md index 741613a..abf676e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -146,7 +146,7 @@ Always run lint locally before pushing. Protected branches require PR workflow ### Modular Infrastructure (`modules/`) -Foundation layer for modular decomposition (issue #489). Phase 4 complete: all 28 routers migrated (Phase 3), all inline endpoints extracted (Phase 4.1–4.5), all background tasks via `TaskRegistry` (Phase 4.6), all startup helpers + service init extracted to domain `startup.py` modules, global service variables removed (Phase 4.7a/b). Phase 5.1–5.4 complete (EventBus infrastructure, first events, DatasetSynced, Widget→CRM events). Phase 5.5–5.6 and Phase 6 (protocol interfaces) pending. +Foundation layer for modular decomposition (issue #489). Phase 4 complete: all 28 routers migrated (Phase 3), all inline endpoints extracted (Phase 4.1–4.5), all background tasks via `TaskRegistry` (Phase 4.6), all startup helpers + service init extracted to domain `startup.py` modules, global service variables removed (Phase 4.7a/b). Phase 5.1–5.4 complete (EventBus infrastructure, first events, DatasetSynced, Widget→CRM events). Phase 6 complete (Protocol interfaces for AuthService, KnowledgeService, LLMService, ChatService in `modules/*/protocols.py`). Phase 7.1 complete (AuthService facade in `modules/core/auth_service.py`). Phase 7.2 complete (KnowledgeService facade in `modules/knowledge/facade.py`). Phase 7.3–7.4 pending. - **`EventBus`** (`modules/core/events.py`): In-process async pub/sub. Handlers run concurrently via `asyncio.gather`; exceptions are logged, never propagated to publisher. `BaseEvent` dataclass with auto-timestamp. Singleton in `ServiceContainer.event_bus`. Domain events: `InternetStatusChanged`, `UserRoleChanged`, `SessionRevoked`, `DatasetSynced` (in `modules/core/events.py`), `KnowledgeUpdated` (in `modules/knowledge/events.py`), `WidgetSessionCreated`, `WidgetMessageSent`, `WidgetContactSubmitted` (in `modules/channels/widget/events.py`). Subscriptions registered via `setup_event_subscriptions()` in `modules/core/startup.py`, which delegates to domain-specific setup functions (`setup_llm_event_subscriptions()` in `modules/llm/startup.py`, `setup_knowledge_event_subscriptions()` in `modules/knowledge/startup.py`, `setup_crm_event_subscriptions()` in `modules/crm/startup.py`). `DatasetSynced` decouples CRM/ecommerce/kanban from knowledge. Widget events decouple widget router from amoCRM: widget publishes events, CRM domain handles lead/contact/note creation reactively. - **`TaskRegistry`** (`modules/core/tasks.py`): Named background tasks — periodic (interval-based) or one-shot. `start_all()` / `cancel_all(timeout)` lifecycle. `TaskInfo` dataclass tracks status, run count, last error. 6 tasks registered in `startup_event()`: `session-cleanup` (1h), `periodic-vacuum` (7d), `kanban-sync` (15min), `woocommerce-sync` (daily 23:00 UTC), `wiki-embeddings` (one-shot), `wiki-collection-indexes` (one-shot). Task functions in `modules/core/maintenance.py`, `modules/knowledge/tasks.py`, `modules/kanban/tasks.py`, `modules/ecommerce/tasks.py`. @@ -163,6 +163,8 @@ Import from `modules.core`: `EventBus`, `BaseEvent`, `TaskRegistry`, `TaskInfo`, | Module | File | Service Classes | |--------|------|-----------------| | `modules/core/` | `service.py` | `DatabaseService`, `UserService`, `UserSessionService`, `RoleService`, `WorkspaceService`, `ConfigService`, `UserIdentityService` | +| `modules/core/` | `auth_service.py` | `AuthService` (Phase 7.1 facade — wraps `auth_manager.py` functions, implements Protocol from `protocols.py`) | +| `modules/knowledge/` | `facade.py` | `KnowledgeServiceImpl` (Phase 7.2 facade — wraps `wiki_rag_service` + knowledge services, implements Protocol from `protocols.py`) | | `modules/chat/` | `service.py` | `ChatService`, `ChatShareService` | | `modules/knowledge/` | `service.py` | `FAQService`, `KnowledgeDocService`, `KnowledgeCollectionService`, `GitHubRepoProjectService` | | `modules/channels/telegram/` | `service.py` | `BotInstanceService`, `TelegramSessionService` | diff --git a/app/dependencies.py b/app/dependencies.py index 39185d6..f2dc6d5 100644 --- a/app/dependencies.py +++ b/app/dependencies.py @@ -54,6 +54,9 @@ def __init__(self): # Wiki RAG service self.wiki_rag_service = None + # Knowledge service facade (Phase 7.2) + self.knowledge_service = None + # Vector Search client (HTTP → microservice) self.vector_search_client = None diff --git a/modules/knowledge/facade.py b/modules/knowledge/facade.py new file mode 100644 index 0000000..b048597 --- /dev/null +++ b/modules/knowledge/facade.py @@ -0,0 +1,283 @@ +"""KnowledgeService facade — unified entry point for knowledge retrieval and management. + +Implements the ``KnowledgeService`` Protocol from ``modules.knowledge.protocols``. +Delegates to existing services (Strangler Fig pattern): +- Search/retrieval → ``wiki_rag_service`` (via ServiceContainer, lazy) +- Collections → ``KnowledgeCollectionService`` +- Documents → ``KnowledgeDocService`` +- FAQ → ``FAQService`` + +Existing singletons (``faq_service``, ``knowledge_collection_service``, etc.) +remain for backward compatibility — this facade wraps them, not replaces. +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import TYPE_CHECKING, Optional + +from modules.knowledge.schemas import ( + CollectionInfo, + DocumentInfo, + FAQEntryInfo, + SearchResult, + SyncResult, +) + + +if TYPE_CHECKING: + from app.dependencies import ServiceContainer + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# dict → TypedDict converters +# --------------------------------------------------------------------------- + + +def _to_search_result(d: dict) -> SearchResult: + """Convert a wiki_rag search result dict to SearchResult TypedDict.""" + return SearchResult( + title=d.get("title", ""), + body=d.get("body", ""), + source_file=d.get("source_file", ""), + score=d.get("score", 0.0), + collection_id=d.get("collection_id"), + ) + + +def _to_collection_info(d: dict) -> CollectionInfo: + """Convert a KnowledgeCollectionService dict to CollectionInfo TypedDict.""" + return CollectionInfo( + id=d["id"], + name=d["name"], + slug=d.get("slug", ""), + description=d.get("description"), + enabled=d.get("enabled", True), + base_dir=d.get("base_dir", "wiki-pages"), + document_count=d.get("document_count", 0), + created=d.get("created"), + updated=d.get("updated"), + ) + + +def _to_document_info(d: dict) -> DocumentInfo: + """Convert a KnowledgeDocService dict to DocumentInfo TypedDict.""" + return DocumentInfo( + id=d["id"], + filename=d["filename"], + title=d.get("title", ""), + source_type=d.get("source_type", "manual"), + file_size_bytes=d.get("file_size_bytes", 0), + section_count=d.get("section_count", 0), + collection_id=d.get("collection_id"), + created=d.get("created"), + updated=d.get("updated"), + ) + + +def _to_faq_entry_info(d: dict) -> FAQEntryInfo: + """Convert a FAQService dict to FAQEntryInfo TypedDict.""" + return FAQEntryInfo( + id=d["id"], + question=d["question"], + answer=d["answer"], + keywords=d.get("keywords", []), + enabled=d.get("enabled", True), + hit_count=d.get("hit_count", 0), + created=d.get("created"), + updated=d.get("updated"), + ) + + +# --------------------------------------------------------------------------- +# Facade +# --------------------------------------------------------------------------- + + +class KnowledgeServiceImpl: + """Facade implementing the KnowledgeService Protocol. + + Receives ``ServiceContainer`` to lazily access ``wiki_rag_service`` + (initialized later during startup). CRUD services are imported + directly from ``modules.knowledge.service``. + """ + + def __init__(self, container: ServiceContainer) -> None: + self._container = container + + # -- Search --------------------------------------------------------------- + + async def search( + self, + query: str, + *, + collection_ids: list[int] | None = None, + top_k: int = 3, + max_chars: int = 2500, + ) -> list[SearchResult]: + """Semantic + BM25 search across indexed documents.""" + wiki_rag = self._container.wiki_rag_service + if not wiki_rag: + return [] + + if not collection_ids: + # Global index + raw = await wiki_rag.search_async(query, top_k=top_k) + elif len(collection_ids) == 1: + raw = await wiki_rag.search_async(query, top_k=top_k, collection_id=collection_ids[0]) + else: + # Multi-collection: search each, merge, deduplicate, sort + import asyncio + + tasks = [ + wiki_rag.search_async(query, top_k=top_k, collection_id=cid) + for cid in collection_ids + ] + results_per_collection = await asyncio.gather(*tasks) + + # Deduplicate by (source_file, title), keep best score + seen: dict[tuple[str, str], dict] = {} + for results in results_per_collection: + for r in results: + key = (r.get("source_file", ""), r.get("title", "")) + if key not in seen or r.get("score", 0) > seen[key].get("score", 0): + seen[key] = r + raw = sorted(seen.values(), key=lambda x: x.get("score", 0), reverse=True)[:top_k] + + return [_to_search_result(r) for r in raw] + + async def retrieve_context( + self, + query: str, + *, + collection_ids: list[int] | None = None, + top_k: int = 3, + max_chars: int = 2500, + ) -> str: + """Return pre-formatted context string for LLM prompt injection.""" + wiki_rag = self._container.wiki_rag_service + if not wiki_rag: + return "" + + if not collection_ids: + return await wiki_rag.retrieve_async(query, top_k=top_k, max_chars=max_chars) + elif len(collection_ids) == 1: + return await wiki_rag.retrieve_async( + query, top_k=top_k, max_chars=max_chars, collection_id=collection_ids[0] + ) + else: + return await wiki_rag.retrieve_multi_async( + query, collection_ids=collection_ids, top_k=top_k, max_chars=max_chars + ) + + # -- Collections ---------------------------------------------------------- + + async def get_collections( + self, + *, + enabled_only: bool = False, + workspace_id: int | None = None, + ) -> list[CollectionInfo]: + """List knowledge collections, optionally filtered.""" + from modules.knowledge.service import knowledge_collection_service + + raw = await knowledge_collection_service.get_all( + enabled_only=enabled_only, workspace_id=workspace_id + ) + return [_to_collection_info(c) for c in raw] + + async def get_collection( + self, + collection_id: int, + *, + workspace_id: int | None = None, + ) -> CollectionInfo | None: + """Get a single collection by ID.""" + from modules.knowledge.service import knowledge_collection_service + + raw = await knowledge_collection_service.get_by_id( + collection_id, workspace_id=workspace_id + ) + if raw is None: + return None + return _to_collection_info(raw) + + # -- Documents ------------------------------------------------------------ + + async def get_documents( + self, + collection_id: int, + *, + workspace_id: int | None = None, + ) -> list[DocumentInfo]: + """List documents in a collection.""" + from modules.knowledge.service import knowledge_doc_service + + raw = await knowledge_doc_service.get_by_collection( + collection_id, workspace_id=workspace_id + ) + return [_to_document_info(d) for d in raw] + + async def sync_documents( + self, + collection_id: int, + base_dir: str, + ) -> SyncResult: + """Re-index documents from disk into the collection.""" + from modules.knowledge.service import knowledge_collection_service + + # Get filenames for this collection + filenames = await knowledge_collection_service.get_document_filenames(collection_id) + + # Reload RAG index + wiki_rag = self._container.wiki_rag_service + sections_indexed = 0 + if wiki_rag: + cidx = wiki_rag.reload_collection(collection_id, filenames, Path(base_dir)) + if cidx: + sections_indexed = len(cidx.sections) + + # Sync to Vector Search if available + vs_client = self._container.vector_search_client + if vs_client and wiki_rag: + try: + from modules.knowledge.tasks import sync_collection_to_vector_search + + collection = await knowledge_collection_service.get_by_id(collection_id) + slug = collection["slug"] if collection else str(collection_id) + await sync_collection_to_vector_search(wiki_rag, vs_client, collection_id, slug) + except Exception as vs_err: + logger.warning("Vector Search sync failed for collection %d: %s", collection_id, vs_err) + + return SyncResult( + collection_id=collection_id, + documents_synced=len(filenames), + sections_indexed=sections_indexed, + ) + + # -- FAQ ------------------------------------------------------------------ + + async def find_faq_answer(self, question: str) -> str | None: + """BM25-match a user question against FAQ entries.""" + from modules.knowledge.service import faq_service + + return await faq_service.find_answer(question) + + async def get_faq_entries( + self, + *, + workspace_id: int | None = None, + ) -> list[FAQEntryInfo]: + """List all FAQ entries.""" + from modules.knowledge.service import faq_service + + raw = await faq_service.get_all_entries(workspace_id=workspace_id) + return [_to_faq_entry_info(e) for e in raw] + + +# Module-level singleton — NOT created here because it needs ServiceContainer. +# Created in startup and set via: knowledge_service_impl = KnowledgeServiceImpl(container) +knowledge_service: Optional[KnowledgeServiceImpl] = None diff --git a/modules/knowledge/startup.py b/modules/knowledge/startup.py index 848045e..d91cdb5 100644 --- a/modules/knowledge/startup.py +++ b/modules/knowledge/startup.py @@ -146,6 +146,13 @@ async def init_wiki_rag(container, deployment_mode: str, task_registry) -> None: """Initialize Wiki RAG service with tiered embedding provider.""" from pathlib import Path + from modules.knowledge import facade as _facade_module + from modules.knowledge.facade import KnowledgeServiceImpl + + facade = KnowledgeServiceImpl(container) + container.knowledge_service = facade + _facade_module.knowledge_service = facade + try: from app.services.wiki_rag_service import WikiRAGService diff --git a/tests/unit/test_knowledge_service.py b/tests/unit/test_knowledge_service.py new file mode 100644 index 0000000..f4ec513 --- /dev/null +++ b/tests/unit/test_knowledge_service.py @@ -0,0 +1,213 @@ +"""Tests for KnowledgeService facade. + +Verifies that: +- KnowledgeServiceImpl satisfies the Protocol structurally +- All Protocol methods exist with correct signatures +- Converter helpers produce correct TypedDict shapes +""" + +import inspect + +from modules.knowledge.facade import ( + KnowledgeServiceImpl, + _to_collection_info, + _to_document_info, + _to_faq_entry_info, + _to_search_result, +) +from modules.knowledge.protocols import KnowledgeService as KnowledgeServiceProtocol + + +class TestKnowledgeServiceProtocolCompliance: + """Verify KnowledgeServiceImpl matches the Protocol.""" + + def test_has_all_protocol_methods(self): + """KnowledgeServiceImpl must implement every method from the Protocol.""" + protocol_methods = { + name + for name, _ in inspect.getmembers(KnowledgeServiceProtocol, predicate=inspect.isfunction) + if not name.startswith("_") + } + impl_methods = { + name + for name in dir(KnowledgeServiceImpl) + if not name.startswith("_") and callable(getattr(KnowledgeServiceImpl, name)) + } + missing = protocol_methods - impl_methods + assert not missing, f"KnowledgeServiceImpl missing Protocol methods: {missing}" + + def test_search_signature(self): + sig = inspect.signature(KnowledgeServiceImpl.search) + params = list(sig.parameters.keys()) + assert "self" in params + assert "query" in params + assert "collection_ids" in params + assert "top_k" in params + assert "max_chars" in params + + def test_retrieve_context_signature(self): + sig = inspect.signature(KnowledgeServiceImpl.retrieve_context) + params = list(sig.parameters.keys()) + assert "self" in params + assert "query" in params + assert "collection_ids" in params + assert "top_k" in params + assert "max_chars" in params + + def test_get_collections_signature(self): + sig = inspect.signature(KnowledgeServiceImpl.get_collections) + params = list(sig.parameters.keys()) + assert "self" in params + assert "enabled_only" in params + assert "workspace_id" in params + + def test_get_collection_signature(self): + sig = inspect.signature(KnowledgeServiceImpl.get_collection) + params = list(sig.parameters.keys()) + assert "self" in params + assert "collection_id" in params + assert "workspace_id" in params + + def test_get_documents_signature(self): + sig = inspect.signature(KnowledgeServiceImpl.get_documents) + params = list(sig.parameters.keys()) + assert "self" in params + assert "collection_id" in params + assert "workspace_id" in params + + def test_sync_documents_signature(self): + sig = inspect.signature(KnowledgeServiceImpl.sync_documents) + params = list(sig.parameters.keys()) + assert "self" in params + assert "collection_id" in params + assert "base_dir" in params + + def test_find_faq_answer_signature(self): + sig = inspect.signature(KnowledgeServiceImpl.find_faq_answer) + params = list(sig.parameters.keys()) + assert "self" in params + assert "question" in params + + def test_get_faq_entries_signature(self): + sig = inspect.signature(KnowledgeServiceImpl.get_faq_entries) + params = list(sig.parameters.keys()) + assert "self" in params + assert "workspace_id" in params + + +class TestConverters: + """Test TypedDict converter helpers.""" + + def test_to_search_result(self): + data = { + "title": "Installation", + "body": "Run pip install...", + "source_file": "docs/install.md", + "score": 0.85, + "engine": "bm25", + } + result = _to_search_result(data) + assert result["title"] == "Installation" + assert result["body"] == "Run pip install..." + assert result["source_file"] == "docs/install.md" + assert result["score"] == 0.85 + assert result["collection_id"] is None + + def test_to_search_result_defaults(self): + result = _to_search_result({}) + assert result["title"] == "" + assert result["body"] == "" + assert result["score"] == 0.0 + assert result["collection_id"] is None + + def test_to_collection_info(self): + data = { + "id": 1, + "name": "Default", + "slug": "default", + "description": "Default collection", + "enabled": True, + "base_dir": "wiki-pages", + "document_count": 5, + "created": "2026-01-01T00:00:00", + "updated": "2026-01-02T00:00:00", + } + info = _to_collection_info(data) + assert info["id"] == 1 + assert info["name"] == "Default" + assert info["slug"] == "default" + assert info["document_count"] == 5 + assert info["enabled"] is True + + def test_to_collection_info_defaults(self): + data = {"id": 2, "name": "Test"} + info = _to_collection_info(data) + assert info["slug"] == "" + assert info["description"] is None + assert info["base_dir"] == "wiki-pages" + assert info["document_count"] == 0 + + def test_to_document_info(self): + data = { + "id": 10, + "filename": "Architecture.md", + "title": "Architecture", + "source_type": "wiki", + "file_size_bytes": 12345, + "section_count": 8, + "collection_id": 1, + "created": "2026-01-01T00:00:00", + "updated": None, + } + info = _to_document_info(data) + assert info["id"] == 10 + assert info["filename"] == "Architecture.md" + assert info["source_type"] == "wiki" + assert info["file_size_bytes"] == 12345 + assert info["collection_id"] == 1 + + def test_to_document_info_defaults(self): + data = {"id": 11, "filename": "test.md"} + info = _to_document_info(data) + assert info["title"] == "" + assert info["source_type"] == "manual" + assert info["file_size_bytes"] == 0 + assert info["section_count"] == 0 + assert info["collection_id"] is None + + def test_to_faq_entry_info(self): + data = { + "id": 1, + "question": "What is this?", + "answer": "An AI secretary", + "keywords": ["ai", "secretary"], + "enabled": True, + "hit_count": 42, + "created": "2026-01-01T00:00:00", + "updated": "2026-02-01T00:00:00", + } + info = _to_faq_entry_info(data) + assert info["id"] == 1 + assert info["question"] == "What is this?" + assert info["answer"] == "An AI secretary" + assert info["keywords"] == ["ai", "secretary"] + assert info["hit_count"] == 42 + + def test_to_faq_entry_info_defaults(self): + data = {"id": 2, "question": "?", "answer": "!"} + info = _to_faq_entry_info(data) + assert info["keywords"] == [] + assert info["enabled"] is True + assert info["hit_count"] == 0 + assert info["created"] is None + + +class TestModuleSingleton: + """Verify module-level singleton behavior.""" + + def test_singleton_initially_none(self): + from modules.knowledge.facade import knowledge_service + + # Before startup, singleton is None (no container yet) + # After startup via init_wiki_rag, it becomes KnowledgeServiceImpl + assert knowledge_service is None or isinstance(knowledge_service, KnowledgeServiceImpl) From 58f116e37bfd2e7b72e816ddedd6f6e51d076d2a Mon Sep 17 00:00:00 2001 From: shaerware Date: Sun, 29 Mar 2026 06:17:33 +0000 Subject: [PATCH 2/2] style: ruff format facade and tests Co-Authored-By: Claude Opus 4.6 (1M context) --- modules/knowledge/facade.py | 8 ++++---- tests/unit/test_knowledge_service.py | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/modules/knowledge/facade.py b/modules/knowledge/facade.py index b048597..e2e821b 100644 --- a/modules/knowledge/facade.py +++ b/modules/knowledge/facade.py @@ -198,9 +198,7 @@ async def get_collection( """Get a single collection by ID.""" from modules.knowledge.service import knowledge_collection_service - raw = await knowledge_collection_service.get_by_id( - collection_id, workspace_id=workspace_id - ) + raw = await knowledge_collection_service.get_by_id(collection_id, workspace_id=workspace_id) if raw is None: return None return _to_collection_info(raw) @@ -250,7 +248,9 @@ async def sync_documents( slug = collection["slug"] if collection else str(collection_id) await sync_collection_to_vector_search(wiki_rag, vs_client, collection_id, slug) except Exception as vs_err: - logger.warning("Vector Search sync failed for collection %d: %s", collection_id, vs_err) + logger.warning( + "Vector Search sync failed for collection %d: %s", collection_id, vs_err + ) return SyncResult( collection_id=collection_id, diff --git a/tests/unit/test_knowledge_service.py b/tests/unit/test_knowledge_service.py index f4ec513..e52e9d9 100644 --- a/tests/unit/test_knowledge_service.py +++ b/tests/unit/test_knowledge_service.py @@ -25,7 +25,9 @@ def test_has_all_protocol_methods(self): """KnowledgeServiceImpl must implement every method from the Protocol.""" protocol_methods = { name - for name, _ in inspect.getmembers(KnowledgeServiceProtocol, predicate=inspect.isfunction) + for name, _ in inspect.getmembers( + KnowledgeServiceProtocol, predicate=inspect.isfunction + ) if not name.startswith("_") } impl_methods = {