Merged
3 changes: 2 additions & 1 deletion CLAUDE.md
@@ -146,7 +146,7 @@ Always run lint locally before pushing. Protected branches require PR workflow

### Modular Infrastructure (`modules/`)

Foundation layer for modular decomposition (issue #489). Phase 4 complete: all 28 routers migrated (Phase 3), all inline endpoints extracted (Phase 4.1–4.5), all background tasks via `TaskRegistry` (Phase 4.6), all startup helpers + service init extracted to domain `startup.py` modules, global service variables removed (Phase 4.7a/b). Phase 5.1–5.4 complete (EventBus infrastructure, first events, DatasetSynced, Widget→CRM events). Phase 6 complete (Protocol interfaces for AuthService, KnowledgeService, LLMService, ChatService in `modules/*/protocols.py`). Phase 7.1 complete (AuthService facade in `modules/core/auth_service.py`). Phase 7.2 complete (KnowledgeService facade in `modules/knowledge/facade.py`). Phase 7.3–7.4 pending.
Foundation layer for modular decomposition (issue #489). Phase 4 complete: all 28 routers migrated (Phase 3), all inline endpoints extracted (Phase 4.1–4.5), all background tasks via `TaskRegistry` (Phase 4.6), all startup helpers + service init extracted to domain `startup.py` modules, global service variables removed (Phase 4.7a/b). Phase 5.1–5.4 complete (EventBus infrastructure, first events, DatasetSynced, Widget→CRM events). Phase 6 complete (Protocol interfaces for AuthService, KnowledgeService, LLMService, ChatService in `modules/*/protocols.py`). Phase 7.1 complete (AuthService facade in `modules/core/auth_service.py`). Phase 7.2 complete (KnowledgeService facade in `modules/knowledge/facade.py`). Phase 7.3 complete (LLMService facade in `modules/llm/facade.py`). Phase 7.4 pending.

- **`EventBus`** (`modules/core/events.py`): In-process async pub/sub. Handlers run concurrently via `asyncio.gather`; exceptions are logged, never propagated to publisher. `BaseEvent` dataclass with auto-timestamp. Singleton in `ServiceContainer.event_bus`. Domain events: `InternetStatusChanged`, `UserRoleChanged`, `SessionRevoked`, `DatasetSynced` (in `modules/core/events.py`), `KnowledgeUpdated` (in `modules/knowledge/events.py`), `WidgetSessionCreated`, `WidgetMessageSent`, `WidgetContactSubmitted` (in `modules/channels/widget/events.py`). Subscriptions registered via `setup_event_subscriptions()` in `modules/core/startup.py`, which delegates to domain-specific setup functions (`setup_llm_event_subscriptions()` in `modules/llm/startup.py`, `setup_knowledge_event_subscriptions()` in `modules/knowledge/startup.py`, `setup_crm_event_subscriptions()` in `modules/crm/startup.py`). `DatasetSynced` decouples CRM/ecommerce/kanban from knowledge. Widget events decouple widget router from amoCRM: widget publishes events, CRM domain handles lead/contact/note creation reactively.
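The EventBus semantics described above — concurrent handlers via `asyncio.gather`, exceptions logged but never propagated to the publisher, auto-timestamped `BaseEvent` — can be sketched as follows. This is a minimal illustration of the pattern, not the actual `modules/core/events.py` implementation:

```python
import asyncio
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


@dataclass
class BaseEvent:
    """Auto-timestamped base event, mirroring the described dataclass."""
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


@dataclass
class DatasetSynced(BaseEvent):
    dataset_id: str = ""


class EventBus:
    """In-process async pub/sub: handlers run concurrently, errors stay logged."""

    def __init__(self) -> None:
        self._handlers: dict[type, list] = {}

    def subscribe(self, event_type: type, handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event: BaseEvent) -> None:
        handlers = self._handlers.get(type(event), [])
        # Run all handlers concurrently; collect exceptions instead of raising.
        results = await asyncio.gather(
            *(h(event) for h in handlers), return_exceptions=True
        )
        for r in results:
            if isinstance(r, Exception):  # never propagated to the publisher
                logger.error("event handler failed: %r", r)


async def demo() -> list[str]:
    bus = EventBus()
    seen: list[str] = []

    async def on_synced(event: DatasetSynced) -> None:
        seen.append(event.dataset_id)

    async def broken(event: DatasetSynced) -> None:
        raise RuntimeError("boom")  # logged; publish still succeeds

    bus.subscribe(DatasetSynced, on_synced)
    bus.subscribe(DatasetSynced, broken)
    await bus.publish(DatasetSynced(dataset_id="crm-leads"))
    return seen


seen = asyncio.run(demo())
print(seen)  # ['crm-leads']
```

The failing handler demonstrates the isolation guarantee: the publisher's `await bus.publish(...)` returns normally even when a subscriber raises.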
- **`TaskRegistry`** (`modules/core/tasks.py`): Named background tasks — periodic (interval-based) or one-shot. `start_all()` / `cancel_all(timeout)` lifecycle. `TaskInfo` dataclass tracks status, run count, last error. 6 tasks registered in `startup_event()`: `session-cleanup` (1h), `periodic-vacuum` (7d), `kanban-sync` (15min), `woocommerce-sync` (daily 23:00 UTC), `wiki-embeddings` (one-shot), `wiki-collection-indexes` (one-shot). Task functions in `modules/core/maintenance.py`, `modules/knowledge/tasks.py`, `modules/kanban/tasks.py`, `modules/ecommerce/tasks.py`.
@@ -165,6 +165,7 @@ Import from `modules.core`: `EventBus`, `BaseEvent`, `TaskRegistry`, `TaskInfo`,
| `modules/core/` | `service.py` | `DatabaseService`, `UserService`, `UserSessionService`, `RoleService`, `WorkspaceService`, `ConfigService`, `UserIdentityService` |
| `modules/core/` | `auth_service.py` | `AuthService` (Phase 7.1 facade — wraps `auth_manager.py` functions, implements Protocol from `protocols.py`) |
| `modules/knowledge/` | `facade.py` | `KnowledgeServiceImpl` (Phase 7.2 facade — wraps `wiki_rag_service` + knowledge services, implements Protocol from `protocols.py`) |
| `modules/llm/` | `facade.py` | `LLMServiceImpl` (Phase 7.3 facade — wraps `CloudLLMService`/`VLLMLLMService` + `CloudProviderService`, implements Protocol from `protocols.py`) |
| `modules/chat/` | `service.py` | `ChatService`, `ChatShareService` |
| `modules/knowledge/` | `service.py` | `FAQService`, `KnowledgeDocService`, `KnowledgeCollectionService`, `GitHubRepoProjectService` |
| `modules/channels/telegram/` | `service.py` | `BotInstanceService`, `TelegramSessionService` |
3 changes: 3 additions & 0 deletions app/dependencies.py
@@ -42,6 +42,9 @@ def __init__(self):
# LLM service (VLLMLLMService or CloudLLMService)
self.llm_service: Any = None

# LLM service facade (Phase 7.3)
self.llm_service_facade = None

# STT service
self.stt_service: STTService | None = None

187 changes: 187 additions & 0 deletions modules/llm/facade.py
@@ -0,0 +1,187 @@
"""LLMService facade — unified entry point for LLM generation.

Implements the ``LLMService`` Protocol from ``modules.llm.protocols``.
Delegates to existing services (Strangler Fig pattern):
- Generation → ``CloudLLMService`` / ``VLLMLLMService`` (via ServiceContainer, lazy)
- Provider CRUD → ``CloudProviderService``

The existing ``container.llm_service`` (a CloudLLMService/VLLMLLMService instance)
remains for backward compatibility — this facade wraps it rather than replacing it.
"""

from __future__ import annotations

import asyncio
import logging
from typing import TYPE_CHECKING, Optional

from modules.llm.schemas import (
LLMConfig,
LLMParams,
ProviderInfo,
StreamChunk,
)


if TYPE_CHECKING:
from collections.abc import AsyncIterator

from app.dependencies import ServiceContainer

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# dict → TypedDict converters
# ---------------------------------------------------------------------------


def _to_provider_info(d: dict) -> ProviderInfo:
"""Convert a CloudProviderService dict to ProviderInfo TypedDict."""
config = d.get("config") or {}
return ProviderInfo(
id=d["id"],
name=d["name"],
provider_type=d.get("provider_type", "custom"),
model_name=d.get("model_name", ""),
enabled=d.get("enabled", True),
is_default=d.get("is_default", False),
base_url=d.get("base_url"),
description=d.get("description"),
config=LLMParams(
temperature=config.get("temperature", 0.7),
max_tokens=config.get("max_tokens", 1024),
top_p=config.get("top_p", 0.9),
repetition_penalty=config.get("repetition_penalty", 1.0),
),
created=d.get("created"),
updated=d.get("updated"),
)


def _to_stream_chunk(raw) -> StreamChunk:
"""Normalize a provider stream item to StreamChunk TypedDict.

Provider yields either:
- ``str`` (plain content from _generate_stream)
- ``dict`` with ``type``/``content``/``tool_calls`` (from _generate_stream_with_tools)
"""
if isinstance(raw, str):
return StreamChunk(type="content", content=raw)
# dict from generate_with_tools stream
return StreamChunk(
type=raw.get("type", "content"),
content=raw.get("content", ""),
tool_calls=raw.get("tool_calls", []),
)


# ---------------------------------------------------------------------------
# Facade
# ---------------------------------------------------------------------------


class LLMServiceImpl:
"""Facade implementing the LLMService Protocol.

Receives ``ServiceContainer`` to lazily access ``llm_service``
(the underlying CloudLLMService/VLLMLLMService, which may change
at runtime when InternetMonitor switches backends).
"""

def __init__(self, container: ServiceContainer) -> None:
self._container = container

# -- Generation -----------------------------------------------------------

async def generate(
self,
messages: list[dict[str, str]],
config: LLMConfig | None = None,
) -> str:
"""Single-shot generation. Returns the full assistant reply."""
llm = self._resolve_llm(config)
if not llm:
return ""

tools = None
if config and config.get("tools"):
tools = config["tools"]

result = await asyncio.to_thread(
llm.generate_response_from_messages, messages, False, *([tools] if tools else [])
)

# generate_with_tools may return dict (tool_calls message) — stringify
if isinstance(result, dict):
return result.get("content") or ""
return result or ""

async def stream(
self,
messages: list[dict[str, str]],
config: LLMConfig | None = None,
) -> AsyncIterator[StreamChunk]:
"""Streaming generation. Yields content/tool-call chunks."""
llm = self._resolve_llm(config)
if not llm:
return

tools = None
if config and config.get("tools"):
tools = config["tools"]

sync_gen = llm.generate_response_from_messages(messages, True, *([tools] if tools else []))

# Wrap sync generator in async iteration via thread
for chunk in await asyncio.to_thread(list, sync_gen):
yield _to_stream_chunk(chunk)

# -- Provider resolution --------------------------------------------------

async def resolve_backend(self, backend_id: str) -> ProviderInfo | None:
"""Look up a cloud provider by its ID."""
from modules.llm.service import cloud_provider_service

raw = await cloud_provider_service.get_provider(backend_id)
if raw is None:
return None
return _to_provider_info(raw)

async def list_providers(
self,
*,
enabled_only: bool = False,
workspace_id: int | None = None,
) -> list[ProviderInfo]:
"""List registered cloud LLM providers."""
from modules.llm.service import cloud_provider_service

raw = await cloud_provider_service.list_providers(
enabled_only=enabled_only, workspace_id=workspace_id
)
return [_to_provider_info(p) for p in raw]

# -- Internal helpers -----------------------------------------------------

def _resolve_llm(self, config: LLMConfig | None = None):
"""Get the active LLM service, applying config params if provided."""
llm = self._container.llm_service
if not llm:
return None

# Apply runtime params from config if provided
if config:
params = {}
for key in ("temperature", "max_tokens", "top_p", "repetition_penalty"):
if key in config:
params[key] = config[key]
if params and hasattr(llm, "set_params"):
llm.set_params(**params)

return llm


# Module-level singleton — NOT created here because it needs ServiceContainer.
# Created in startup and set via: llm_service_facade = LLMServiceImpl(container)
llm_service_facade: Optional[LLMServiceImpl] = None
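A caller-side sketch of how the facade's `generate` path behaves once wired into the container. `_StubLLM` and `_StubContainer` are hypothetical stand-ins for the real backend and `ServiceContainer`, and `FacadeSketch` only mirrors the dispatch shown above — it is not the shipped `LLMServiceImpl`:

```python
import asyncio


class _StubLLM:
    """Hypothetical stand-in for CloudLLMService/VLLMLLMService."""

    def generate_response_from_messages(self, messages, stream, *extra):
        reply = "echo: " + messages[-1]["content"]
        if stream:
            return iter([reply])  # sync iterator, as the real backend streams
        return reply


class _StubContainer:
    llm_service = _StubLLM()


class FacadeSketch:
    """Mirrors LLMServiceImpl.generate: sync backend call moved off the loop."""

    def __init__(self, container) -> None:
        self._container = container

    async def generate(self, messages, config=None) -> str:
        llm = self._container.llm_service
        if not llm:
            return ""
        # Blocking backend call runs in a worker thread, keeping the loop free.
        result = await asyncio.to_thread(
            llm.generate_response_from_messages, messages, False
        )
        if isinstance(result, dict):  # tool-call message shape
            return result.get("content") or ""
        return result or ""


reply = asyncio.run(
    FacadeSketch(_StubContainer()).generate([{"role": "user", "content": "ping"}])
)
print(reply)  # echo: ping
```

Because the facade resolves `container.llm_service` on every call rather than caching it, it keeps working when InternetMonitor swaps the underlying backend at runtime.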
8 changes: 8 additions & 0 deletions orchestrator.py
@@ -202,6 +202,14 @@ async def startup_event():
container.llm_service = llm_service
container.streaming_tts_manager = streaming_tts_manager

# LLM service facade (Phase 7.3)
from modules.llm import facade as _llm_facade_module
from modules.llm.facade import LLMServiceImpl

llm_facade = LLMServiceImpl(container)
container.llm_service_facade = llm_facade
_llm_facade_module.llm_service_facade = llm_facade

# Reload FAQ and voice presets from DB
from modules.knowledge.startup import reload_llm_faq
from modules.speech.startup import reload_voice_presets
148 changes: 148 additions & 0 deletions tests/unit/test_llm_service.py
@@ -0,0 +1,148 @@
"""Tests for LLMService facade.

Verifies that:
- LLMServiceImpl satisfies the Protocol structurally
- All Protocol methods exist with correct signatures
- Converter helpers produce correct TypedDict shapes
"""

import inspect

from modules.llm.facade import (
LLMServiceImpl,
_to_provider_info,
_to_stream_chunk,
)
from modules.llm.protocols import LLMService as LLMServiceProtocol


class TestLLMServiceProtocolCompliance:
"""Verify LLMServiceImpl matches the Protocol."""

def test_has_all_protocol_methods(self):
"""LLMServiceImpl must implement every method from the Protocol."""
protocol_methods = {
name
for name, _ in inspect.getmembers(LLMServiceProtocol, predicate=inspect.isfunction)
if not name.startswith("_")
}
impl_methods = {
name
for name in dir(LLMServiceImpl)
if not name.startswith("_") and callable(getattr(LLMServiceImpl, name))
}
missing = protocol_methods - impl_methods
assert not missing, f"LLMServiceImpl missing Protocol methods: {missing}"

def test_generate_signature(self):
sig = inspect.signature(LLMServiceImpl.generate)
params = list(sig.parameters.keys())
assert "self" in params
assert "messages" in params
assert "config" in params

def test_stream_signature(self):
sig = inspect.signature(LLMServiceImpl.stream)
params = list(sig.parameters.keys())
assert "self" in params
assert "messages" in params
assert "config" in params

def test_resolve_backend_signature(self):
sig = inspect.signature(LLMServiceImpl.resolve_backend)
params = list(sig.parameters.keys())
assert "self" in params
assert "backend_id" in params

def test_list_providers_signature(self):
sig = inspect.signature(LLMServiceImpl.list_providers)
params = list(sig.parameters.keys())
assert "self" in params
assert "enabled_only" in params
assert "workspace_id" in params


class TestConverters:
"""Test TypedDict converter helpers."""

def test_to_provider_info(self):
data = {
"id": "gemini-default",
"name": "Gemini Pro",
"provider_type": "gemini",
"model_name": "gemini-2.0-flash",
"enabled": True,
"is_default": True,
"base_url": None,
"description": "Default Gemini provider",
"config": {
"temperature": 0.7,
"max_tokens": 2048,
"top_p": 0.9,
},
"created": "2026-01-01T00:00:00",
"updated": "2026-01-02T00:00:00",
}
info = _to_provider_info(data)
assert info["id"] == "gemini-default"
assert info["name"] == "Gemini Pro"
assert info["provider_type"] == "gemini"
assert info["model_name"] == "gemini-2.0-flash"
assert info["enabled"] is True
assert info["is_default"] is True
assert info["config"]["temperature"] == 0.7
assert info["config"]["max_tokens"] == 2048

def test_to_provider_info_defaults(self):
data = {"id": "test", "name": "Test"}
info = _to_provider_info(data)
assert info["provider_type"] == "custom"
assert info["model_name"] == ""
assert info["enabled"] is True
assert info["is_default"] is False
assert info["base_url"] is None
assert info["description"] is None
assert info["config"]["temperature"] == 0.7
assert info["config"]["max_tokens"] == 1024

def test_to_provider_info_none_config(self):
data = {"id": "x", "name": "X", "config": None}
info = _to_provider_info(data)
assert info["config"]["temperature"] == 0.7

def test_to_stream_chunk_from_string(self):
chunk = _to_stream_chunk("Hello")
assert chunk["type"] == "content"
assert chunk["content"] == "Hello"

def test_to_stream_chunk_from_content_dict(self):
chunk = _to_stream_chunk({"type": "content", "content": "world"})
assert chunk["type"] == "content"
assert chunk["content"] == "world"

def test_to_stream_chunk_from_tool_calls_dict(self):
tool_calls = [
{
"id": "call_1",
"type": "function",
"function": {"name": "search", "arguments": '{"q": "test"}'},
}
]
chunk = _to_stream_chunk({"type": "tool_calls", "tool_calls": tool_calls})
assert chunk["type"] == "tool_calls"
assert len(chunk["tool_calls"]) == 1
assert chunk["tool_calls"][0]["function"]["name"] == "search"

def test_to_stream_chunk_from_empty_dict(self):
chunk = _to_stream_chunk({})
assert chunk["type"] == "content"
assert chunk["content"] == ""


class TestModuleSingleton:
"""Verify module-level singleton behavior."""

def test_singleton_initially_none(self):
from modules.llm.facade import llm_service_facade

assert llm_service_facade is None or isinstance(llm_service_facade, LLMServiceImpl)