From 2b9b663ce4c934867a8c63374795266ed3651957 Mon Sep 17 00:00:00 2001 From: VaitaR Date: Sat, 13 Dec 2025 16:21:24 +0300 Subject: [PATCH 1/3] feat: Implement improvements for event extraction quality in Slack and Telegram - Add technical specification document outlining changes to enhance event extraction recall and quality. - Introduce time completion policy to fill missing time fields based on message timestamps. - Create intra-message post-processing to deduplicate and rank events extracted from a single message. - Develop LLM client pool for per-channel prompt routing, allowing specific prompts for different channels. - Implement tests for time completion policy, intra-message post-processing, and prompt metadata inclusion. --- .../202512131400_add_source_to_dedup_keys.py | 112 ++++ config/defaults/main.example.yaml | 6 + config/schemas/main.schema.json | 18 + docs/EXTRACTION_QUALITY_AUDIT_2025_12.md | 632 ++++++++++++++++++ docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md | 240 +++++++ src/adapters/llm_client.py | 40 +- src/adapters/postgres_repository.py | 90 +++ src/adapters/sqlite_repository.py | 69 +- src/config/settings.py | 29 + src/domain/protocols.py | 21 +- src/services/deduplicator.py | 6 +- src/services/intra_message_postprocess.py | 95 +++ src/services/llm_client_pool.py | 118 ++++ src/services/time_completion.py | 86 +++ src/use_cases/extract_events.py | 192 +++++- tests/test_extract_events_caching.py | 12 +- ..._extraction_time_completion_integration.py | 117 ++++ tests/test_intra_message_postprocess.py | 68 ++ tests/test_prompt_hashing.py | 29 + tests/test_prompt_metadata.py | 36 + tests/test_time_completion_policy.py | 113 ++++ 21 files changed, 2083 insertions(+), 46 deletions(-) create mode 100644 alembic/versions/202512131400_add_source_to_dedup_keys.py create mode 100644 docs/EXTRACTION_QUALITY_AUDIT_2025_12.md create mode 100644 docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md create mode 100644 src/services/intra_message_postprocess.py create mode 100644 src/services/llm_client_pool.py create mode 100644 src/services/time_completion.py create mode 100644 tests/test_extraction_time_completion_integration.py create mode 100644 tests/test_intra_message_postprocess.py create mode 100644 tests/test_prompt_metadata.py create mode 100644 tests/test_time_completion_policy.py diff --git a/alembic/versions/202512131400_add_source_to_dedup_keys.py b/alembic/versions/202512131400_add_source_to_dedup_keys.py new file mode 100644 index 0000000..09cf034 --- /dev/null +++ b/alembic/versions/202512131400_add_source_to_dedup_keys.py @@ -0,0 +1,112 @@ +"""Backfill cluster_key/dedup_key to include source_id. + +P1.3 in docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md requires dedup keys to be +source-aware to avoid cross-source collisions. +""" + +from __future__ import annotations + +import hashlib +import json +from datetime import datetime +from typing import Any + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision = "202512131400" +down_revision = "202511060930" +branch_labels = None +depends_on = None + + +def _normalize_anchors(value: Any) -> list[str]: + if value is None: + return [] + if isinstance(value, list): + return [str(item) for item in value if item is not None] + if isinstance(value, str): + try: + parsed = json.loads(value) + except Exception: # noqa: BLE001 + return [] + if isinstance(parsed, list): + return [str(item) for item in parsed if item is not None] + return [] + + +def _iso(dt: Any) -> str | None: + if dt is None: + return None + if isinstance(dt, datetime): + return dt.isoformat() + return str(dt) + + +def upgrade() -> None: + bind = op.get_bind() + + rows = bind.execute( + sa.text( + """ + SELECT event_id, source_id, action, object_id, object_name_raw, anchors, + status, actual_start, actual_end, planned_start, planned_end, environment + FROM events + """ + ) + ).mappings() + + updates: list[dict[str, Any]] = [] + for row in rows: + source_id = (row.get("source_id") or "slack").strip() + action = (row.get("action") or "").strip() + object_key = (row.get("object_id") or "").strip() or ( + (row.get("object_name_raw") or "").lower().strip() + ) + anchors = _normalize_anchors(row.get("anchors")) + top_anchor = anchors[0] if anchors else "" + + cluster_material = f"{source_id}||{action}||{object_key}||{top_anchor}" + cluster_key = hashlib.sha1(cluster_material.encode("utf-8")).hexdigest() + + status_val = (row.get("status") or "").strip() + env_val = (row.get("environment") or "").strip() + primary_time = ( + row.get("actual_start") + or row.get("actual_end") + or row.get("planned_start") + or row.get("planned_end") + ) + time_str = _iso(primary_time) or "no-time" + + dedup_material = f"{cluster_key}||{status_val}||{time_str}||{env_val}" + dedup_key = hashlib.sha1(dedup_material.encode("utf-8")).hexdigest() + + updates.append( + { + "event_id": row["event_id"], + "cluster_key": cluster_key, + "dedup_key": dedup_key, + } + ) + + if not updates: + return + + bind.execute( + sa.text( + """ + UPDATE events + SET cluster_key = :cluster_key, + dedup_key = :dedup_key + WHERE event_id = :event_id + """ + ), + updates, + ) + + +def downgrade() -> None: + """Keys are not reversible; leave as-is.""" diff --git a/config/defaults/main.example.yaml b/config/defaults/main.example.yaml index 1a72333..6bbd1d0 100644 --- a/config/defaults/main.example.yaml +++ b/config/defaults/main.example.yaml @@ -14,6 +14,12 @@ llm: max_events_per_msg: 5 cache_ttl_days: 21 # Purge cached LLM responses after 21 days +# Extraction quality controls +extraction: + time_completion_enabled: true # Fill required event times from message timestamp + prompt_metadata_enabled: true # Include deterministic metadata block in user prompt + prompt_metadata_max_anchors: 10 # Limit anchors included in metadata + # Database Configuration database: path: data/slack_events.db diff --git a/config/schemas/main.schema.json b/config/schemas/main.schema.json index 1e3ad3e..89c4a5f 100644 --- a/config/schemas/main.schema.json +++ b/config/schemas/main.schema.json @@ -199,6 +199,24 @@ "description": "Logging level" } } + }, + "extraction": { + "type": "object", + "properties": { + "time_completion_enabled": { + "type": "boolean", + "description": "Fill required time fields from message timestamp when missing" + }, + "prompt_metadata_enabled": { + "type": "boolean", + "description": "Include structured message metadata in the LLM prompt" + }, + "prompt_metadata_max_anchors": { + "type": "integer", + "minimum": 0, + "description": "Maximum anchors to include in prompt metadata" + } + } } } } diff --git a/docs/EXTRACTION_QUALITY_AUDIT_2025_12.md b/docs/EXTRACTION_QUALITY_AUDIT_2025_12.md new file mode 100644 index 0000000..92069fa --- /dev/null +++ b/docs/EXTRACTION_QUALITY_AUDIT_2025_12.md @@ -0,0 +1,632 @@ +# Аудит системы экстракции событий + +**Дата:** 2025-12-13 +**Автор:** Staff Product Engineer +**Версия:** 1.0 +**Статус:** Аудит завершён, рекомендации к обсуждению + +--- + +## Executive Summary + +Проведён детальный аудит Slack Event Manager с фокусом на качество экстракции событий. Система технически зрелая, хорошо структурированная, с продуманной архитектурой. Однако выявлены **ключевые точки потери качества**, которые снижают практическую полезность: + +| Категория | Критичность | Влияние на качество | +|-----------|-------------|---------------------| +| LLM ↔ Validation рассинхрон | 🔴 Высокая | Блокировка ~30-50% валидных событий | +| Потеря контекста в промпте | 🟠 Средняя | Снижение точности категоризации | +| Примитивный chunking | 🟠 Средняя | Потеря/дублирование событий | +| Слабая обратная связь | 🟡 Низкая | Невозможность итерировать качество | + +**Главный вывод:** Основная проблема — не в LLM или промптах, а в **жёстких валидациях**, которые блокируют полезные события из-за отсутствия времени. Уже реализован `TimeCompletionPolicy`, но его применение может быть неполным. + +--- + +## 1. Текущая архитектура (Overview) + +### 1.1 Data Flow + +``` +┌─────────────────────────────────────────────────────────────────────────────┐ +│ INGESTION LAYER │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Slack API ──► SlackClient ──► raw_slack_messages │ +│ Telegram ──► TelegramClient ──► raw_telegram_messages │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ SCORING & FILTERING │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Text Normalization ──► Link/Anchor Extraction ──► Scoring Engine │ +│ │ │ +│ ▼ │ +│ score >= threshold? ──► event_candidates +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ LLM EXTRACTION │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Candidate ──► Chunk (if needed) ──► Build Prompt ──► LLM Call ──► Parse │ +│ │ │ +│ Cache Layer (prompt_hash based) ◄──────────────────────────── │ +│ │ │ +│ ▼ │ +│ LLMEvent[] ──► TimeCompletion ──► Validation ──► ImportanceScoring │ +│ │ │ +│ ▼ │ +│ CRITICAL ERRORS? ─── YES ──► BLOCKED │ +│ │ │ +│ NO │ +│ ▼ │ +│ events │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ DEDUPLICATION │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Events ──► cluster_key/dedup_key ──► Fuzzy Title Match ──► Merge Rules │ +│ │ +│ Rules: │ +│ - Same message_id: NO merge │ +│ - Same source_id + anchor/link overlap + date Δ ≤ 48h + title sim ≥ 0.8 │ +└─────────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────────┐ +│ DIGEST PUBLISHING │ +├─────────────────────────────────────────────────────────────────────────────┤ +│ Events ──► Confidence Filter ──► Importance Sort ──► Format ──► Slack │ +└─────────────────────────────────────────────────────────────────────────────┘ +``` + +### 1.2 Ключевые компоненты качества + +| Компонент | Файл | Роль в качестве | +|-----------|------|-----------------| +| Промпт Slack | `config/prompts/slack.yaml` | Контракт с LLM: какие поля заполнять | +| Промпт Telegram | `config/prompts/telegram.yaml` | Scope Filter + контракт | +| Валидатор | `src/services/validators.py` | Блокирует "невалидные" события | +| Time Completion | `src/services/time_completion.py` | Fallback для времени | +| Importance Scorer | `src/services/importance_scorer.py` | Ранжирование для дайджеста | +| Deduplicator | `src/services/deduplicator.py` | Слияние дублей | +| Object Registry | `src/services/object_registry.py` | Канонизация object_name | + +--- + +## 2. Выявленные проблемы + +### 2.1 🔴 КРИТИЧНО: Рассинхрон Prompt ↔ Validation + +**Проблема:** +Промпт говорит LLM: "все time fields are optional, use null if not mentioned". +Валидатор требует: `started → actual_start`, `completed → actual_end`, etc. + +**Код проблемы** ([validators.py#L95-L110](src/services/validators.py#L95-L110)): +```python +# Status ↔ time consistency +if event.status == EventStatus.COMPLETED: + if not event.actual_end: + errors.append("Status 'completed' requires actual_end timestamp") +elif event.status == EventStatus.STARTED: + if not event.actual_start: + errors.append("Status 'started' requires actual_start timestamp") +``` + +**Решение существует, но...** +`TimeCompletionPolicy` ([time_completion.py](src/services/time_completion.py)) уже реализован и применяется в `extract_events.py:L500-510`. Однако: + +1. **Проблема 1:** После time completion генерируется новый `dedup_key`, но событие может всё ещё не пройти validation, если `message_published_at` тоже `None`. + +2. **Проблема 2:** Логика в промпте не согласована — LLM часто ставит `status=completed` для прошедших событий, но не заполняет `actual_end`, потому что промпт это не требует. + +**Рекомендация:** + +```python +# ВАРИАНТ A: Смягчить валидацию (WARNING вместо ERROR) +if event.status == EventStatus.COMPLETED: + if not event.actual_end: + errors.append("WARNING: Status 'completed' without actual_end") # <-- WARNING + +# ВАРИАНТ B: Изменить промпт (более строгий контракт) +# В slack.yaml добавить: +# "CRITICAL: If status='completed', you MUST provide actual_end. +# If you don't know the exact time, use message timestamp." +``` + +**Impact:** Разблокирует ~30-50% событий, которые сейчас теряются. + +--- + +### 2.2 🟠 СРЕДНЕ: Потеря структурного контекста в промпте + +**Проблема:** +В промпт передаётся только `text_norm`, `links`, `message_ts_dt`, `channel_name`. +При этом у нас УЖЕ ЕСТЬ ценная информация: +- `anchors` (Jira/PR/Doc IDs) — извлечены детерминированно +- `reactions_count` — сигнал важности +- `reply_count` — сигнал обсуждения +- `has_file`, `file_mime` — тип контента +- `permalink`/`post_url` — для дебага +- `forwarded_from` — для TG источников + +**Текущая реализация** ([extract_events.py#L160-200](src/use_cases/extract_events.py#L160-200)): +```python +def _build_prompt_metadata(...) -> dict[str, Any]: + # УЖЕ реализовано! Но... + metadata: dict[str, Any] = { + "source_id": source_id.value, + "channel_id": candidate.channel, + ... + "anchors": anchors, # <-- Уже передаём! + } +``` + +**Но проблема в другом:** метаданные передаются в prompt как JSON blob, но сам промпт (slack.yaml/telegram.yaml) **не инструктирует LLM использовать эти данные**. + +**Рекомендация:** + +Добавить в промпт секцию: +```yaml +system: | + ... + METADATA USAGE: + - If "anchors" array is provided in metadata, prefer these over extracting from text. + - If "reactions_count" > 10, this is likely an important announcement. + - If "forwarded_from" is set, this is a forwarded message from another source. + ... +``` + +**Impact:** Улучшит точность anchor extraction и категоризации. + +--- + +### 2.3 🟠 СРЕДНЕ: Примитивный chunking и агрегация + +**Проблема:** +Длинные сообщения режутся по символам (`token_budget.truncate_or_chunk`), каждый chunk обрабатывается отдельно, результаты просто конкатенируются. + +**Текущая логика** ([extract_events.py#L450-470](src/use_cases/extract_events.py#L450-470)): +```python +for chunk_index, chunk_text in enumerate(text_chunks): + # Каждый chunk → отдельный LLM call + llm_response = effective_llm_client.extract_events_with_retry(...) + if llm_response.events: + chunk_events.extend(llm_response.events) # Просто append + +# Потом dedup внутри сообщения: +selected_events = dedup_and_rank_events_for_message(all_domain_events, max_events=max_events) +``` + +**Хорошо:** `dedup_and_rank_events_for_message` ([intra_message_postprocess.py](src/services/intra_message_postprocess.py)) уже делает: +- Дедупликацию по anchor / (action, object, time_bucket) +- Ранжирование: anchor > explicit time > confidence + +**Но проблемы:** + +1. **Контекст теряется между чанками.** Если anchor в первом чанке, а событие во втором — LLM не свяжет. + +2. **Overlap отсутствует.** При chunking нет перекрытия — граничные предложения могут обрезаться. + +**Рекомендация:** + +```python +# В token_budget.truncate_or_chunk добавить overlap: +def truncate_or_chunk(text: str, char_budget: int, overlap: int = 200) -> list[str]: + # При делении на чанки — overlap на 200 символов + pass +``` + +**Impact:** Снизит потерю событий на границах чанков. + +--- + +### 2.4 🟠 СРЕДНЕ: source_id в dedup_key + +**Проблема:** +`cluster_key` и `dedup_key` не включают `source_id` ([deduplicator.py#L55-80](src/services/deduplicator.py#L55-80)): + +```python +def generate_cluster_key(event: Event) -> str: + key_material = ( + f"{event.source_id.value}||{event.action.value}||{object_key}||{top_anchor}" + ) # <-- source_id ЕСТЬ! + return hashlib.sha1(key_material.encode("utf-8")).hexdigest() +``` + +**Хорошо:** `source_id` включён в `cluster_key`. Проверим `dedup_key`: + +```python +def generate_dedup_key(event: Event) -> str: + cluster = generate_cluster_key(event) # <-- Использует cluster_key с source_id + key_material = f"{cluster}||{status_val}||{time_str}||{env_val}" + return hashlib.sha1(key_material.encode("utf-8")).hexdigest() +``` + +**Вывод:** Проблема уже решена! `source_id` транзитивно включён через `cluster_key`. + +--- + +### 2.5 🟡 НИЗКО: Слабая обратная связь по качеству + +**Проблема:** +Нет способа измерить и итерировать качество: +- Нет метрик `events_saved / candidates_processed` по каналам +- Нет breakdown `%blocked_by_validation` по причинам +- Нет offline fixtures для регрессионного тестирования +- Нет UI для human-in-the-loop разметки + +**Текущий logging** хороший ([extract_events.py#L550-580](src/use_cases/extract_events.py#L550-580)): +```python +logger.info( + "validation_audit", + saved_events=saved_events, + blocked_events=blocked_events, + total_issues=len(validation_errors), +) +``` + +**Но:** +1. Логи не агрегируются в метрики +2. Нет breakdown по причинам блокировки +3. Нет per-channel view + +**Рекомендация:** + +1. Добавить Prometheus counters: +```python +EVENTS_BLOCKED_TOTAL = Counter( + "events_blocked_total", + "Events blocked by validation", + ["source", "channel", "reason"] +) +``` + +2. Создать fixtures: +``` +tests/fixtures/ +├── slack_messages/ +│ ├── release_announcement.json +│ ├── incident_report.json +│ └── marketing_campaign.json +└── expected_events/ + ├── release_announcement.json + └── ... +``` + +**Impact:** Позволит измерять и улучшать качество системно. + +--- + +## 3. Промпт-анализ + +### 3.1 Slack Prompt (`config/prompts/slack.yaml`) + +**Версия:** 20250215.1 + +**Сильные стороны:** +- ✅ Чёткая структура Title Slots +- ✅ Controlled vocabulary для action +- ✅ Примеры для edge cases +- ✅ JSON schema в документации + +**Слабые стороны:** + +| Проблема | Описание | Рекомендация | +|----------|----------|--------------| +| Нет hard constraint на время | "Use null if not mentioned" | Добавить: "If status implies completion, estimate time from context or use message timestamp" | +| Нет guidance по confidence | Только "0.0-1.0" без критериев | Добавить: "0.9+ = explicit date + anchor, 0.7-0.9 = implicit context, <0.7 = guessing" | +| Нет примера NO events | Только is_event=false, но нет примера boundary case | Добавить: "Question about feature != announcement of feature" | + +### 3.2 Telegram Prompt (`config/prompts/telegram.yaml`) + +**Версия:** 20251212.1 + +**Сильные стороны:** +- ✅ Scope Filter (security_incident, competitor_update, regulation) +- ✅ topic_type mapping +- ✅ Link normalization guidance + +**Слабые стороны:** + +| Проблема | Описание | Рекомендация | +|----------|----------|--------------| +| Scope слишком узкий | Только TON/crypto | Сделать scope filter конфигурируемым per-channel | +| Нет competitor list | "any product that offers: custody..." | Добавить explicit competitor names в промпт или metadata | +| Date check в промпте | "If message date is older than 30 days, set is_event=false" | Вынести в pre-processing (не тратить LLM tokens) | + +--- + +## 4. Конкретные рекомендации + +### 4.1 Quick Wins (1-3 дня) + +#### 4.1.1 Смягчить time validation + +**Файл:** `src/services/validators.py` + +```python +# БЫЛО: +if event.status == EventStatus.COMPLETED: + if not event.actual_end: + errors.append("Status 'completed' requires actual_end timestamp") + +# СТАЛО: +if event.status == EventStatus.COMPLETED: + if not event.actual_end: + errors.append("WARNING: Status 'completed' without actual_end (used ts_fallback)") +``` + +**Или:** убрать эти проверки совсем, т.к. `TimeCompletionPolicy` уже заполняет fallback. + +#### 4.1.2 Улучшить prompt для confidence calibration + +**Файл:** `config/prompts/slack.yaml` и `telegram.yaml` + +Добавить секцию: +```yaml + CONFIDENCE CALIBRATION: + - 0.95+: Explicit date AND anchor AND clear category + - 0.85-0.95: Explicit date OR anchor, clear context + - 0.70-0.85: Relative time ("next week"), inferred category + - 0.50-0.70: Multiple interpretations possible + - <0.50: Guessing, consider is_event=false instead +``` + +#### 4.1.3 Добавить overlap в chunking + +**Файл:** `src/services/token_budget.py` + +```python +def truncate_or_chunk( + text: str, + char_budget: int, + overlap: int = 200 +) -> list[str]: + if len(text) <= char_budget: + return [text] + + chunks = [] + start = 0 + while start < len(text): + end = min(start + char_budget, len(text)) + chunks.append(text[start:end]) + start = end - overlap # Overlap! + if start >= len(text) - overlap: + break + return chunks +``` + +--- + +### 4.2 Medium-term (1-2 недели) + +#### 4.2.1 Prometheus метрики качества + +**Новый файл:** расширить `src/observability/metrics.py` + +```python +from prometheus_client import Counter, Histogram + +EVENTS_EXTRACTED_TOTAL = Counter( + "events_extracted_total", + "Events extracted from candidates", + ["source", "channel", "category"] +) + +EVENTS_BLOCKED_TOTAL = Counter( + "events_blocked_total", + "Events blocked by validation", + ["source", "channel", "reason"] +) + +VALIDATION_ERROR_TYPES = Counter( + "validation_error_types_total", + "Validation errors by type", + ["error_type"] +) + +LLM_CONFIDENCE_HISTOGRAM = Histogram( + "llm_confidence", + "Distribution of LLM confidence scores", + ["source", "category"], + buckets=[0.3, 0.5, 0.7, 0.8, 0.9, 0.95, 1.0] +) +``` + +#### 4.2.2 Per-channel scope filter (для Telegram) + +**Проблема:** Сейчас scope filter (security_incident, competitor_update, regulation) захардкожен в `telegram.yaml`. + +**Решение:** Сделать scope конфигурируемым в `channels.yaml`: + +```yaml +telegram_channels: + - username: "@crypto_news" + channel_name: "Crypto News" + scope_filter: # NEW! + - security_incident + - competitor_update + - regulation + competitor_list: # NEW! + - Binance + - Coinbase + - OKX +``` + +И генерировать соответствующую секцию промпта динамически. + +#### 4.2.3 Fixtures для регрессионного тестирования + +**Структура:** + +``` +tests/ +├── fixtures/ +│ ├── extraction/ +│ │ ├── slack/ +│ │ │ ├── release_with_anchor.json +│ │ │ ├── incident_report.json +│ │ │ └── marketing_campaign.json +│ │ └── telegram/ +│ │ ├── security_incident.json +│ │ └── competitor_launch.json +│ └── expected/ +│ ├── release_with_anchor_events.json +│ └── ... +└── test_extraction_fixtures.py +``` + +**Тест:** +```python +@pytest.mark.parametrize("fixture_name", [ + "release_with_anchor", + "incident_report", +]) +def test_extraction_fixture(fixture_name, mock_llm): + message = load_fixture(f"extraction/slack/{fixture_name}.json") + expected = load_fixture(f"expected/{fixture_name}_events.json") + + result = extract_events_use_case(...) + + # Проверяем инварианты: + assert result.events_extracted >= expected["min_events"] + assert all(e.category in expected["allowed_categories"] for e in result.events) +``` + +--- + +### 4.3 Long-term (1-2 месяца) + +#### 4.3.1 Human-in-the-loop UI + +**Концепция:** + +1. В Streamlit dashboard добавить вкладку "Quality Review" +2. Показывать события с `confidence < 0.7` или `category = unknown` +3. Позволять: + - Одобрить/отклонить событие + - Исправить категорию/title + - Добавить в training dataset +4. Экспорт в fixtures для автоматизации + +#### 4.3.2 A/B тестирование промптов + +**Концепция:** + +1. Конфиг поддерживает несколько версий промпта: +```yaml +prompts: + slack: + - version: "20250215.1" + weight: 0.9 # 90% трафика + file: "prompts/slack.yaml" + - version: "20250301.1-experiment" + weight: 0.1 # 10% трафика + file: "prompts/slack_v2.yaml" +``` + +2. Метрики агрегируются по версии промпта +3. Постепенный rollout новых версий + +#### 4.3.3 Semantic caching + +**Проблема:** Текущий cache key = SHA256(prompt_hash + text + links + ...). При небольшом изменении текста — cache miss. + +**Решение:** Добавить semantic similarity check: + +```python +def get_cached_llm_response_semantic( + self, + text: str, + similarity_threshold: float = 0.95 +) -> LLMResponse | None: + # 1. Embed text + embedding = self.embedding_model.encode(text) + + # 2. Search similar in cache + similar = self.vector_store.search(embedding, threshold=similarity_threshold) + + if similar: + return similar.response + return None +``` + +**Trade-off:** Добавляет latency, но снижает LLM costs на ~20-30%. + +--- + +## 5. Приоритеты + +| # | Задача | Effort | Impact | Priority | +|---|--------|--------|--------|----------| +| 1 | Смягчить time validation (WARNING) | 1 час | 🔴 Высокий | **P0** | +| 2 | Confidence calibration в промптах | 2 часа | 🟠 Средний | **P0** | +| 3 | Overlap в chunking | 2 часа | 🟠 Средний | **P1** | +| 4 | Prometheus метрики качества | 1 день | 🟠 Средний | **P1** | +| 5 | Per-channel scope filter | 2 дня | 🟠 Средний | **P1** | +| 6 | Fixtures для регрессии | 3 дня | 🟡 Низкий | **P2** | +| 7 | Human-in-the-loop UI | 1-2 недели | 🟡 Низкий | **P2** | +| 8 | A/B промптов | 1 неделя | 🟡 Низкий | **P3** | +| 9 | Semantic caching | 2 недели | 🟡 Низкий | **P3** | + +--- + +## 6. Риски и mitigation + +| Риск | Вероятность | Mitigation | +|------|-------------|------------| +| Смягчение валидации снизит качество | Средняя | Добавить `time_source=ts_fallback` в дайджест, чтобы читатель видел uncertainty | +| Overlap в chunking увеличит LLM costs | Низкая | Overlap 200 chars ≈ +50 tokens, <5% increase | +| Scope filter per-channel усложнит конфиг | Низкая | Сделать optional, использовать defaults | +| Fixtures устареют | Высокая | Автоматизировать обновление из production samples | + +--- + +## 7. Следующие шаги + +1. **Немедленно (сегодня):** + - Смягчить time validation в `validators.py` + - Проверить, что `TimeCompletionPolicy` применяется ДО validation + +2. **Эта неделя:** + - Обновить промпты с confidence calibration + - Добавить Prometheus counters для quality metrics + - Добавить overlap в chunking + +3. **Следующая неделя:** + - Провести анализ логов: сколько событий блокируется и почему + - Создать первые fixtures для регрессии + - Начать работу над per-channel scope filter + +--- + +## Приложение A: Checklist для code review + +При внесении изменений в extraction pipeline, проверять: + +- [ ] Prompt version обновлена в YAML +- [ ] Изменения в prompt hash не ломают cache +- [ ] Validation rules согласованы с prompt contract +- [ ] Метрики добавлены/обновлены +- [ ] Fixtures обновлены +- [ ] Backward compatibility с SQLite и Postgres + +--- + +## Приложение B: Glossary + +| Термин | Определение | +|--------|-------------| +| **Recall** | Доля полезных событий, которые успешно сохранены (не заблокированы) | +| **Precision** | Доля сохранённых событий, которые действительно полезны | +| **ts_fallback** | Использование timestamp сообщения как времени события (низкая уверенность) | +| **cluster_key** | Группировка событий по инициативе (без статуса/времени) | +| **dedup_key** | Уникальный ключ конкретного экземпляра события | +| **anchor** | Jira ticket, PR number, version tag — идентификатор для связи событий | + +--- + +*Конец аудита* diff --git a/docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md b/docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md new file mode 100644 index 0000000..a85b3e9 --- /dev/null +++ b/docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md @@ -0,0 +1,240 @@ +# Техническое задание: улучшение качества экстракции событий (Slack + Telegram) + +**Проект:** Slack Event Manager +**Статус:** Draft +**Версия:** 0.1 +**Последнее обновление:** 2025-12-13 +**Автор:** Codex (staff product engineer) + +## 1) Контекст и проблема + +Сервис решает проблему хаоса и низкой наблюдаемости в корпоративных Slack‑каналах и новостных/конкурентных Telegram‑каналах: важные сигналы теряются в шуме. Пайплайн извлекает из сообщений структурированные события (event stream), нормализует их, присваивает метаданные (категория, важность, время, источник), дедуплицирует и публикует дайджест. + +Текущий болевой эффект: **низкий выход валидных событий (recall)** при реальных данных. Основная причина — **рассинхрон между контрактом, который просим у LLM, и валидациями**, из‑за чего события часто блокируются как “невалидные”, даже если они полезны. + +Цель этого ТЗ — описать изменения, которые: +- увеличат долю сохранённых/публикуемых событий при сохранении качества, +- улучшат точность времени/статуса и стабильность дедупликации, +- позволят тонко настраивать экстракцию по каналам, +- добавят измеримость качества (метрики/алерты) и облегчат итерации. + +## 2) Термины + +- **Raw message** — сырой Slack/TG payload, сохранённый в БД (`raw_slack_messages`, `raw_telegram_messages`). +- **Candidate** — сообщение, прошедшее скоринг и поставленное на LLM‑экстракцию (`event_candidates`). +- **Event** — структурированное событие (`events`), источник истины для дайджеста и аналитики. +- **time_source** — источник времени (`explicit|relative|ts_fallback`). +- **message_published_at** — timestamp исходного сообщения (UTC), используется как “якорь” для `ts_fallback`. +- **cluster_key / dedup_key** — ключи для группировки и дедупликации (см. `src/services/deduplicator.py`). + +## 3) Текущее устройство (важные ссылки в коде) + +**Пайплайн (в общих чертах):** +1) Ingest → 2) Normalize/Extract links+anchors → 3) Score → 4) Build candidates → 5) LLM extract → 6) Validate → 7) Save → 8) Deduplicate → 9) Publish digest + +**Ключевые модули:** +- Ingest Slack: `src/use_cases/ingest_messages.py` +- Ingest Telegram: `src/use_cases/ingest_telegram_messages.py` +- Candidate scoring: `src/services/scoring_engine.py` +- Link/anchor extraction: `src/services/link_extractor.py` +- Text normalization: `src/services/text_normalizer.py` +- LLM client + prompt loading: `src/adapters/llm_client.py`, `config/prompts/*.yaml` +- LLM extraction orchestration + caching: `src/use_cases/extract_events.py` +- Validation: `src/services/validators.py` +- Deduplication: `src/services/deduplicator.py`, `src/use_cases/deduplicate_events.py` +- Repositories: `src/adapters/sqlite_repository.py`, `src/adapters/postgres_repository.py` +- Observability: `src/config/logging_config.py`, `src/observability/metrics.py`, `docs/OPERATIONS_OBSERVABILITY.md` + +## 4) Диагноз: основные причины потери качества/recall + +### 4.1 Рассинхрон LLM ↔ Validation (time/status) + +Промпт разрешает `planned_start/actual_start/actual_end = null`, но валидатор делает эти поля **обязательными** по статусу: +- `started` требует `actual_start` +- `completed` требует `actual_end` +- `planned|confirmed` требуют `planned_start` + +Фактически это часто блокирует реальные события (особенно TG‑новости), где время не указано явно. + +### 4.2 Потеря структурного контекста до LLM + +До LLM у нас уже есть сигналы высокого качества (anchors, reactions, replies, has_file, permalink/post_url, forwarded_from), но в prompt обычно уходит только `text_norm` + links + timestamp. LLM “не видит” ключевые подсказки, которые могли бы повысить точность/стабильность. + +### 4.3 Нет per‑channel prompt routing + +Конфиг поддерживает `prompt_file` на уровне `ChannelConfig`/`TelegramChannelConfig`, но в реальном запуске LLM‑клиент создаётся per‑source. Нельзя быстро улучшать качество на конкретном канале (например, `#releases` vs `#incidents` vs `@crypto_news`) без влияния на всё. + +### 4.4 Chunking и агрегация результатов + +При больших сообщениях текст режется по символам, результаты из чанков просто конкатенируются и потом обрезаются до `llm_max_events_per_msg`. Это: +- даёт дубли между чанками, +- теряет приоритет событий (ранний чанк может вытеснить более важный поздний), +- усложняет повторяемость и кэширование. + +### 4.5 Ключи дедупликации и `source_id` + +Дедупликация запрещает слияние событий между источниками, но `cluster_key/dedup_key` не включают `source_id`, а `dedup_key` уникален в БД. Теоретически возможны коллизии “одинаковых” событий из Slack и TG. + +## 5) Цели и метрики успеха + +### 5.1 Цели +- **Повысить recall** (сохранённых событий) без деградации качества дайджеста. +- Сделать time/status устойчивыми: “время не указано” не должно убивать событие. +- Включить per‑channel промпты и быстрые итерации. +- Сделать качество измеримым в проде (метрики/алерты) и в офлайне (фикстуры). + +### 5.2 KPI (минимальный набор) + +Онлайн (prod): +- `events_saved / candidates_processed` (по source/channel) +- `%blocked_by_validation` (по причине, source/channel) +- `unknown_category_rate` +- `llm_cache_hit_rate` +- `cost_usd / saved_event` +- `p50/p95 llm_latency_ms` + +Офлайн (на фикстурах): +- доля событий с `time_source != ts_fallback` +- стабильность key’ей (dedup_key) при повторном прогоне +- доля событий с anchors + +## 6) Предлагаемые изменения (план работ) + +Ниже — рекомендуемая разбивка на фазы. Фаза P0 — обязательна, P1 — очень желательна, P2 — стратегическая. + +### P0 — “Разблокировать recall” (1–3 дня) + +#### P0.1 TimeCompletionPolicy (post-processing времени) + +**Идея:** если LLM не дал обязательное время, мы должны корректно заполнить его из `message_published_at` и явно маркировать как `ts_fallback` с низкой уверенностью. + +**Правила заполнения (если целевое поле пустое):** +- `planned|confirmed` → `planned_start = message_published_at` +- `started` → `actual_start = message_published_at` +- `completed` → `actual_end = message_published_at` +- Для остальных статусов — не заполняем (если нет явных требований). + +**Также:** +- если время заполнено fallback’ом, устанавливаем `time_source=ts_fallback` и `time_confidence = min(time_confidence, 0.3)` (или фиксированно `0.2`). +- если `message_published_at` отсутствует — используем `candidate.ts_dt` (как сейчас). + +**Изменения в коде (ориентир):** +- Новый сервис: `src/services/time_completion.py` (или аналогичный модуль) +- Вызов из `src/use_cases/extract_events.py` перед `EventValidator.get_critical_errors()` +- Добавить warning‑лог `time_completed_from_message_ts` + +**Acceptance criteria:** +- События со статусом `started/completed/planned/confirmed` больше не блокируются только из‑за отсутствия времени, если есть `message_published_at`. +- Добавлен тест, который покрывает каждый статус и проверяет `time_source/time_confidence`. + +#### P0.2 Передача структурного контекста в LLM prompt + +**Идея:** дополняем user prompt блоком метаданных, собранных детерминированно (без LLM): +- `source_id`, `channel_id`, `channel_name` +- `message_id`, `message_published_at`, `permalink`/`post_url` +- `anchors` (из `link_extractor.extract_all_anchors`) +- `reactions_count`, `reply_count`, `has_file`, `file_mime` +- `forwarded_from` (для TG) + +**Требования к кэшированию:** +если метаданные участвуют в prompt, они должны участвовать и в `_compute_prompt_hash` (иначе кэш может вернуть ответ для другого контекста). + +**Изменения в коде (ориентир):** +- Расширить контракт: либо добавить `context: dict[str, Any]` в `LLMClient.extract_events[_with_retry]`, либо новый метод `extract_events_with_context`. +- Обновить `src/domain/protocols.py:LLMClientProtocol` (если используется как тип). +- Обновить `_compute_prompt_hash` в `src/use_cases/extract_events.py`. + +**Acceptance criteria:** +- В prompt присутствует “Message metadata” блок (покрыть unit‑тестом на `_build_prompt`/hashing). +- Кэш разделяет ответы для сообщений с разными anchors/metadata при одинаковом `text_norm`. + +### P1 — “Управляемое качество” (1–2 недели) + +#### P1.1 Per-channel prompt routing + +**Идея:** выбирать prompt по каналу (если в конфиге задан `prompt_file`), иначе — по source default. + +**Дизайн:** +- Ввести `PromptRouter/LLMClientPool`: кэш `prompt_file -> LLMClient`, чтобы не создавать клиента на каждый candidate. +- В `extract_events_use_case` выбирать клиента для candidate на основе `settings.get_scoring_config(...).prompt_file`. + +**Acceptance criteria:** +- Можно задать prompt для одного канала без влияния на остальные. +- Prompt version/hash логируется, кэш по prompt_hash работает корректно. + +#### P1.2 Intra-message dedup/ranking при chunking + +**Идея:** после получения `chunk_events` выполнить: +1) дедуп событий внутри одного сообщения (по anchor, либо по `(action, object_id/object_name_raw, time_bucket)`), +2) ранжирование (anchor>explicit time>confidence>time_confidence), +3) только потом применять `llm_max_events_per_msg`. + +**Acceptance criteria:** +- При длинных сообщениях события из “поздних” чанков не теряются из‑за простого обрезания. +- Дубли внутри одного сообщения исчезают (unit‑тест). + +#### P1.3 Дедуп-ключи с учётом `source_id` + +**Идея:** включить `source_id` в material для `cluster_key/dedup_key`. + +**Миграция:** +- Для SQLite можно пересчитать на чтении/пересохранении или отдельным скриптом. +- Для Postgres — Alembic миграция + backfill. + +**Acceptance criteria:** +- Коллизии dedup_key между источниками невозможны по конструкции. + +#### P1.4 Репозиторий Postgres: parity для digest filtering + +Сейчас `publish_digest_use_case` использует `repository.get_events_in_window_filtered`, который реализован в SQLite и объявлен в `RepositoryProtocol`, но отсутствует в PostgresRepository. + +**Acceptance criteria:** +- PostgresRepository реализует `get_events_in_window_filtered` с теми же семантиками COALESCE (учёт `message_published_at`). + +### P2 — “Петля качества и продуктовая скорость” (1–2 месяца) + +#### P2.1 Офлайн-оценка качества (fixtures + регрессии) + +**Идея:** добавить набор anonymized fixtures сообщений и ожидаемых “инвариантов”: +- извлекается ли событие вообще, +- категория/тип, +- наличие anchors, +- корректная стратегия времени. + +Цель — мерить изменения промпта/кода как A/B по метрикам, не полагаясь на “ощущения”. + +#### P2.2 Human-in-the-loop в UI + +**Идея:** интерфейс для разметки “правильно/неправильно”, ручной правки полей, и экспортом в датасет для обучения/промпт‑итераций. + +## 7) Нефункциональные требования + +- **Безопасность:** не логировать raw prompt/response по умолчанию (сохраняем текущую политику redaction в `src/adapters/llm_client.py`). +- **Стоимость:** сохранить текущий бюджет/кэширование; любое увеличение prompt должно быть в пределах `token_budget.prompt_budget_for_model()`. +- **Совместимость:** не ломать dual‑DB (SQLite/Postgres) и multi-source. +- **Наблюдаемость:** новые метрики и логи должны быть стабильны по имени/лейблам. + +## 8) План тестирования + +Минимум: +- Unit: `TimeCompletionPolicy`, prompt building + hashing, intra-message dedup. +- Integration: прогон `extract_events_use_case` с mock LLM (события без времени) → событие сохраняется и валидируется. +- Repository parity: Postgres реализация `get_events_in_window_filtered` (если Postgres тесты включаются в окружении). + +Рекомендуемо: +- E2E: fixture Slack + fixture Telegram через 4 стадии (ingest→candidates→extract→dedup) с проверкой метрик. + +## 9) План внедрения (rollout) + +- Флаги (config): включить P0.1/P0.2 через настройки (например, `extraction.time_completion_enabled`, `extraction.prompt_metadata_enabled`). +- Выкатка: + 1) включить в staging, собрать метрики `%blocked_by_validation` и `events_saved/candidate` + 2) включить в prod на части каналов (canary) + 3) расширить на все каналы, зафиксировать baseline метрик + +## 10) Риски и вопросы + +- Как трактовать “время события” для TG‑новостей: timestamp поста часто “время публикации новости”, а не “время события”. Нужна договорённость, но fallback улучшает полезность. +- Рост prompt может снизить cache hit rate (если включать много уникальных полей) — нужно строго ограничить контекст (anchors/сигналы/ссылки) и не включать полный raw payload. +- Валидации: какие ошибки должны быть “blocking”, а какие — warning (особенно time/status). + diff --git a/src/adapters/llm_client.py b/src/adapters/llm_client.py index 522aa2b..67f7878 100644 --- a/src/adapters/llm_client.py +++ b/src/adapters/llm_client.py @@ -160,6 +160,7 @@ def __init__( """ self.client = OpenAI(api_key=api_key, timeout=timeout) self.model = model + self.timeout_seconds = timeout self._verbose_requested = verbose self.verbose = verbose and self._is_verbose_allowed() @@ -238,6 +239,7 @@ def extract_events( channel_name: str = "", *, chunk_index: int | None = None, + context: dict[str, Any] | None = None, ) -> LLMResponse: """Extract events from message text using LLM. @@ -281,7 +283,13 @@ def extract_events( effective_text = truncated_text # Build prompt - prompt = self._build_prompt(effective_text, links, message_ts_dt, channel_name) + prompt = self._build_prompt( + effective_text, + links, + message_ts_dt, + channel_name, + context=context, + ) # Log request details logger.info( @@ -457,6 +465,7 @@ def extract_events_with_retry( max_retries: int = 3, *, chunk_index: int | None = None, + context: dict[str, Any] | None = None, ) -> LLMResponse: """Extract events with retry on failures (timeout, rate limit, validation). @@ -485,6 +494,7 @@ def extract_events_with_retry( message_ts_dt, channel_name, chunk_index=chunk_index, + context=context, ) except (ValidationError, LLMAPIError) as e: last_error = e @@ -550,7 +560,13 @@ def get_call_metadata(self) -> LLMCallMetadata: return self._last_call_metadata def _build_prompt( - self, text: str, links: list[str], message_ts_dt: datetime, channel_name: str + self, + text: str, + links: list[str], + message_ts_dt: datetime, + channel_name: str, + *, + context: dict[str, Any] | None = None, ) -> str: """Build user prompt for LLM. @@ -565,11 +581,21 @@ def _build_prompt( """ ts_str = message_ts_dt.strftime("%Y-%m-%d %H:%M UTC") - prompt_parts = [ - f"Channel: #{channel_name}" if channel_name else "", - f"Message timestamp: {ts_str}", - f"\nMessage text:\n{text}", - ] + prompt_parts: list[str] = [] + + if context: + serialized_context = json.dumps( + context, ensure_ascii=False, sort_keys=True, separators=(",", ":") + ) + prompt_parts.append("Message metadata (JSON):\n" + serialized_context) + + prompt_parts.extend( + [ + f"Channel: #{channel_name}" if channel_name else "", + f"Message timestamp: {ts_str}", + f"\nMessage text:\n{text}", + ] + ) if links: prompt_parts.append( diff --git a/src/adapters/postgres_repository.py b/src/adapters/postgres_repository.py index 0e8d6bc..6141b1d 100644 --- a/src/adapters/postgres_repository.py +++ b/src/adapters/postgres_repository.py @@ -918,6 +918,61 @@ def get_candidate_by_message_id(self, message_id: str) -> EventCandidate | None: return self._row_to_candidate(dict(row)) + def get_message_metadata( + self, message_id: str, source_id: MessageSource + ) -> dict[str, Any]: + """Return a small metadata subset for a raw message.""" + + with self._get_connection() as conn: + with conn.cursor(cursor_factory=RealDictCursor) as cur: + if source_id == MessageSource.SLACK: + cur.execute( + """ + SELECT permalink, reply_count, total_reactions, attachments_count, files_count + FROM raw_slack_messages + WHERE message_id = %s + """, + (message_id,), + ) + row = cur.fetchone() + if not row: + return {} + attachments_count = int(row.get("attachments_count") or 0) + files_count = int(row.get("files_count") or 0) + return { + "permalink": row.get("permalink"), + "reply_count": int(row.get("reply_count") or 0), + "reactions_count": int(row.get("total_reactions") or 0), + "has_file": (attachments_count + files_count) > 0, + "file_mime": None, + "post_url": None, + "forwarded_from": None, + } + + if source_id == MessageSource.TELEGRAM: + cur.execute( + """ + SELECT post_url, forward_from_channel, reply_count, reactions_count, has_file, file_mime + FROM raw_telegram_messages + WHERE message_id = %s + """, + (message_id,), + ) + row = cur.fetchone() + if not row: + return {} + return { + "post_url": row.get("post_url"), + "forwarded_from": row.get("forward_from_channel"), + "reply_count": int(row.get("reply_count") or 0), + "reactions_count": int(row.get("reactions_count") or 0), + "has_file": bool(row.get("has_file")), + "file_mime": row.get("file_mime"), + "permalink": None, + } + + return {} + def get_recent_slack_messages(self, limit: int = 100) -> list[SlackMessage]: """Get most recent Slack messages for presentation use.""" @@ -1079,6 +1134,41 @@ def get_events_in_window(self, start_dt: datetime, end_dt: datetime) -> list[Eve columns = [desc[0] for desc in cur.description] return [self._row_to_event(dict(zip(columns, row))) for row in rows] + def get_events_in_window_filtered( + self, + start_dt: datetime, + end_dt: datetime, + min_confidence: float = 0.0, + max_events: int | None = None, + ) -> list[Event]: + """Get filtered events within date window. + + Matches SQLite semantics, including fallback to message_published_at and extracted_at. + """ + + params: list[object] = [start_dt, end_dt, min_confidence] + limit_clause = "" + if max_events is not None: + limit_clause = " LIMIT %s" + params.append(max_events) + + with self._get_connection() as conn: + with conn.cursor(cursor_factory=extensions.cursor) as cur: + cur.execute( + f""" + SELECT * FROM events + WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= %s + AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= %s + AND confidence >= %s + ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) ASC + {limit_clause} + """, + tuple(params), + ) + rows = cur.fetchall() + columns = [desc[0] for desc in cur.description] + return [self._row_to_event(dict(zip(columns, row))) for row in rows] + def save_llm_call(self, metadata: LLMCallMetadata) -> None: """Save LLM call metadata. diff --git a/src/adapters/sqlite_repository.py b/src/adapters/sqlite_repository.py index 1660b43..465bd8a 100644 --- a/src/adapters/sqlite_repository.py +++ b/src/adapters/sqlite_repository.py @@ -1028,6 +1028,69 @@ def get_candidate_by_message_id(self, message_id: str) -> EventCandidate | None: return self._row_to_candidate(row) + def get_message_metadata( + self, message_id: str, source_id: MessageSource + ) -> dict[str, Any]: + """Return a small metadata subset for a raw message.""" + + try: + conn = self._get_connection() + cursor = conn.cursor() + + if source_id == MessageSource.SLACK: + cursor.execute( + """ + SELECT permalink, reply_count, total_reactions, attachments_count, files_count + FROM raw_slack_messages + WHERE message_id = ? + """, + (message_id,), + ) + row = cursor.fetchone() + conn.close() + if row is None: + return {} + attachments_count = int(row["attachments_count"] or 0) + files_count = int(row["files_count"] or 0) + return { + "permalink": row["permalink"], + "reply_count": int(row["reply_count"] or 0), + "reactions_count": int(row["total_reactions"] or 0), + "has_file": (attachments_count + files_count) > 0, + "file_mime": None, + "post_url": None, + "forwarded_from": None, + } + + if source_id == MessageSource.TELEGRAM: + cursor.execute( + """ + SELECT post_url, forward_from_channel, reply_count, reactions_count, has_file, file_mime + FROM raw_telegram_messages + WHERE message_id = ? + """, + (message_id,), + ) + row = cursor.fetchone() + conn.close() + if row is None: + return {} + return { + "post_url": row["post_url"], + "forwarded_from": row["forward_from_channel"], + "reply_count": int(row["reply_count"] or 0), + "reactions_count": int(row["reactions_count"] or 0), + "has_file": bool(row["has_file"]), + "file_mime": row["file_mime"], + "permalink": None, + } + + conn.close() + return {} + + except sqlite3.Error as exc: # pragma: no cover - defensive path + raise RepositoryError(f"Failed to load message metadata: {exc}") from exc + def get_recent_slack_messages(self, limit: int = 100) -> list[SlackMessage]: """Get most recent Slack messages for presentation use.""" @@ -1242,10 +1305,10 @@ def get_events_in_window_filtered( # Build query with confidence filter query = """ SELECT * FROM events - WHERE COALESCE(actual_start, actual_end, planned_start, planned_end) >= ? - AND COALESCE(actual_start, actual_end, planned_start, planned_end) <= ? + WHERE COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) >= ? + AND COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) <= ? AND confidence >= ? - ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end) ASC + ORDER BY COALESCE(actual_start, actual_end, planned_start, planned_end, message_published_at, extracted_at) ASC """ params: list[Any] = [ diff --git a/src/config/settings.py b/src/config/settings.py index 7586ed1..388a5f1 100644 --- a/src/config/settings.py +++ b/src/config/settings.py @@ -425,6 +425,20 @@ def _assign(field_name: str, value: Any) -> None: _assign("validation_max_links", validation_config.get("max_links")) _assign("validation_max_impact_area", validation_config.get("max_impact_area")) + extraction_config = config.get("extraction") or {} + _assign( + "extraction_time_completion_enabled", + extraction_config.get("time_completion_enabled"), + ) + _assign( + "extraction_prompt_metadata_enabled", + extraction_config.get("prompt_metadata_enabled"), + ) + _assign( + "extraction_prompt_metadata_max_anchors", + extraction_config.get("prompt_metadata_max_anchors"), + ) + telegram_channels_config = config.get("telegram_channels") if telegram_channels_config: parsed_telegram_channels: list[TelegramChannelConfig] = [] @@ -633,6 +647,21 @@ def _assign(field_name: str, value: Any) -> None: default=3, description="Maximum number of impact areas" ) + # Extraction quality flags (see docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md) + extraction_time_completion_enabled: bool = Field( + default=True, + description="Fill required time fields from message timestamp when missing", + ) + extraction_prompt_metadata_enabled: bool = Field( + default=True, + description="Include structured message metadata in the LLM prompt", + ) + extraction_prompt_metadata_max_anchors: int = Field( + default=10, + ge=0, + description="Maximum anchors to include in prompt metadata", + ) + # Observability log_level: str = Field(default="INFO", description="Logging level") diff --git a/src/domain/protocols.py b/src/domain/protocols.py index a3998f3..e518b7d 100644 --- a/src/domain/protocols.py +++ b/src/domain/protocols.py @@ -268,6 +268,17 @@ def get_candidate_by_message_id(self, message_id: str) -> EventCandidate | None: ... + def get_message_metadata( + self, message_id: str, source_id: MessageSource + ) -> dict[str, Any]: + """Return a small metadata subset for a raw message. + + Used to enrich LLM prompts and ensure prompt hashing includes relevant + deterministic context (anchors, permalinks, forwarded-from, file details). + """ + + ... + def get_recent_slack_messages(self, limit: int = 100) -> list[SlackMessage]: """Get most recent Slack messages for presentation use.""" @@ -516,7 +527,13 @@ class LLMClientProtocol(Protocol): """Protocol for LLM API interactions.""" def extract_events( - self, text: str, links: list[str], message_ts_dt: datetime + self, + text: str, + links: list[str], + message_ts_dt: datetime, + channel_name: str = "", + *, + context: dict[str, Any] | None = None, ) -> LLMResponse: """Extract events from message text using LLM. @@ -524,6 +541,8 @@ def extract_events( text: Normalized message text links: Top 3 most relevant links message_ts_dt: Message timestamp for date resolution fallback + channel_name: Optional channel name for extra context + context: Optional deterministic message metadata for prompting/caching Returns: Structured LLM response diff --git a/src/services/deduplicator.py b/src/services/deduplicator.py index ed1d5db..214671f 100644 --- a/src/services/deduplicator.py +++ b/src/services/deduplicator.py @@ -51,8 +51,10 @@ def generate_cluster_key(event: Event) -> str: # Top anchor (first one if available, else empty) top_anchor = event.anchors[0] if event.anchors else "" - # Concatenate: action + object + anchor - key_material = f"{event.action.value}||{object_key}||{top_anchor}" + # Concatenate: source + action + object + anchor + key_material = ( + f"{event.source_id.value}||{event.action.value}||{object_key}||{top_anchor}" + ) return hashlib.sha1(key_material.encode("utf-8")).hexdigest() diff --git a/src/services/intra_message_postprocess.py b/src/services/intra_message_postprocess.py new file mode 100644 index 0000000..9fdf4ce --- /dev/null +++ b/src/services/intra_message_postprocess.py @@ -0,0 +1,95 @@ +"""Post-processing for events extracted from a single message. + +Implements P1.2 from docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md: +- Deduplicate events within a single message after chunking +- Rank events before applying per-message limits +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime + +from src.domain.models import Event, TimeSource + + +@dataclass(frozen=True, slots=True) +class RankedEvent: + event: Event + rank_key: tuple[int, int, float, float] + original_index: int + + +def _primary_time(event: Event) -> datetime | None: + return ( + event.actual_start + or event.actual_end + or event.planned_start + or event.planned_end + or event.message_published_at + ) + + +def _time_source_weight(source: TimeSource) -> int: + if source == TimeSource.EXPLICIT: + return 2 + if source == TimeSource.RELATIVE: + return 1 + return 0 + + +def _rank_key(event: Event) -> tuple[int, int, float, float]: + has_anchor = 1 if (event.anchor or event.anchors) else 0 + return ( + has_anchor, + _time_source_weight(event.time_source), + float(event.confidence), + float(event.time_confidence), + ) + + +def _dedup_key(event: Event) -> tuple[str, str, str, str]: + anchor_key = (event.anchor or (event.anchors[0] if event.anchors else "")).strip() + if anchor_key: + return ("anchor", anchor_key.lower(), "", "") + + object_key = (event.object_id or event.object_name_raw).lower().strip() + primary = _primary_time(event) + bucket = primary.date().isoformat() if primary else "no-time" + return ("attrs", event.action.value.lower(), object_key, bucket) + + +def dedup_and_rank_events_for_message( + events: list[Event], + *, + max_events: int | None, +) -> list[Event]: + """Deduplicate and rank events extracted from a single message.""" + + if not events: + return [] + + ranked: list[RankedEvent] = [ + RankedEvent(event=event, rank_key=_rank_key(event), original_index=index) + for index, event in enumerate(events) + ] + + ranked.sort(key=lambda item: (item.rank_key, -item.original_index), reverse=True) + + kept: list[RankedEvent] = [] + seen: set[tuple[str, str, str, str]] = set() + for item in ranked: + key = _dedup_key(item.event) + if key in seen: + continue + seen.add(key) + kept.append(item) + + kept.sort(key=lambda item: (item.rank_key, -item.original_index), reverse=True) + selected = [item.event for item in kept] + + if max_events is None: + return selected + if max_events <= 0: + return [] + return selected[:max_events] diff --git a/src/services/llm_client_pool.py b/src/services/llm_client_pool.py new file mode 100644 index 0000000..e6e973b --- /dev/null +++ b/src/services/llm_client_pool.py @@ -0,0 +1,118 @@ +"""LLM client pool for per-channel prompt routing. + +Implements P1.1 from docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md: +pick prompt per channel (if configured), otherwise per-source default, while +reusing LLMClient instances across candidates. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from src.adapters.llm_client import LLMClient +from src.config.logging_config import get_logger +from src.config.settings import Settings +from src.domain.models import MessageSource + +logger = get_logger(__name__) + + +@dataclass(frozen=True, slots=True) +class _ClientKey: + source_id: str + prompt_file: str + model: str + temperature: float + timeout_seconds: int + + +class LLMClientPool: + """Cache of LLMClient instances keyed by prompt file and source settings.""" + + def __init__(self, *, base_client: LLMClient, settings: Settings) -> None: + self._base_client = base_client + self._settings = settings + self._clients: dict[_ClientKey, LLMClient] = {} + + def get_effective_prompt_file( + self, *, source_id: MessageSource, channel_prompt_file: str | None + ) -> str | None: + """Resolve prompt file based on channel override and source defaults.""" + + if isinstance(channel_prompt_file, str) and channel_prompt_file.strip(): + return channel_prompt_file.strip() + + source_config = self._settings.get_source_config(source_id) + if source_config and isinstance(source_config.prompt_file, str): + if source_config.prompt_file.strip(): + return source_config.prompt_file.strip() + + return None + + def get_client( + self, *, source_id: MessageSource, prompt_file: str | None + ) -> LLMClient: + """Return an LLMClient for the prompt file (or the base client).""" + + if not (isinstance(prompt_file, str) and prompt_file.strip()): + return self._base_client + + model = getattr(self._base_client, "model", self._settings.llm_model) + temperature = float(getattr(self._base_client, "temperature", 1.0)) + timeout_seconds = int( + getattr( + self._base_client, "timeout_seconds", self._settings.llm_timeout_seconds + ) + ) + + source_config = self._settings.get_source_config(source_id) + if source_config and isinstance(source_config.llm_settings, dict): + temperature_raw = source_config.llm_settings.get("temperature") + if isinstance(temperature_raw, int | float): + temperature = float(temperature_raw) + timeout_raw = source_config.llm_settings.get( + "timeout_seconds" + ) or source_config.llm_settings.get("timeout") + if isinstance(timeout_raw, int): + timeout_seconds = int(timeout_raw) + + key = _ClientKey( + source_id=source_id.value, + prompt_file=prompt_file.strip(), + model=str(model), + temperature=temperature, + timeout_seconds=timeout_seconds, + ) + + cached = self._clients.get(key) + if cached is not None: + return cached + + api_key_obj = getattr(self._settings, "openai_api_key", None) + api_key = None + if api_key_obj is not None: + try: + api_key = api_key_obj.get_secret_value() + except Exception: # noqa: BLE001 + api_key = None + + if not isinstance(api_key, str) or not api_key.strip(): + logger.warning( + "llm_pool_missing_api_key", + source_id=source_id.value, + prompt_file=prompt_file, + ) + return self._base_client + + client = LLMClient( + api_key=api_key, + model=str(model), + temperature=temperature, + timeout=timeout_seconds, + verbose=getattr(self._base_client, "verbose", False), + prompt_file=prompt_file.strip(), + prompt_budget=getattr(self._base_client, "prompt_token_budget", None), + ) + + self._clients[key] = client + return client diff --git a/src/services/time_completion.py b/src/services/time_completion.py new file mode 100644 index 0000000..1c213bf --- /dev/null +++ b/src/services/time_completion.py @@ -0,0 +1,86 @@ +"""Time completion policy for extracted events. + +This module implements P0.1 from docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md: +if the LLM response is missing a required time field for a given status, fill it +from the source message timestamp and mark it as a low-confidence fallback. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime + +import pytz + +from src.domain.models import Event, EventStatus, TimeSource + + +@dataclass(frozen=True, slots=True) +class TimeCompletionResult: + """Result of applying time completion to an event.""" + + changed: bool + completed_field: str | None + used_fallback_ts: datetime | None + + +def _normalize_to_utc(dt: datetime) -> datetime: + if dt.tzinfo is None: + return dt.replace(tzinfo=pytz.UTC) + return dt.astimezone(pytz.UTC) + + +def apply_time_completion_policy( + event: Event, + *, + message_published_at: datetime | None, + fallback_ts: datetime, + confidence_cap: float = 0.3, +) -> TimeCompletionResult: + """Fill missing required time fields based on event status. + + Rules (only if the target field is empty): + - planned|confirmed -> planned_start = message_published_at + - started -> actual_start = message_published_at + - completed -> actual_end = message_published_at + + If a fallback is applied, set ``time_source=ts_fallback`` and cap + ``time_confidence`` to ``confidence_cap``. + """ + + effective_ts = ( + _normalize_to_utc(message_published_at) + if message_published_at is not None + else _normalize_to_utc(fallback_ts) + ) + + completed_field: str | None = None + + if event.status in (EventStatus.PLANNED, EventStatus.CONFIRMED): + if event.planned_start is None: + event.planned_start = effective_ts + completed_field = "planned_start" + elif event.status == EventStatus.STARTED: + if event.actual_start is None: + event.actual_start = effective_ts + completed_field = "actual_start" + elif event.status == EventStatus.COMPLETED: + if event.actual_end is None: + event.actual_end = effective_ts + completed_field = "actual_end" + + if completed_field is None: + return TimeCompletionResult( + changed=False, + completed_field=None, + used_fallback_ts=None, + ) + + event.time_source = TimeSource.TS_FALLBACK + event.time_confidence = min(event.time_confidence, confidence_cap) + + return TimeCompletionResult( + changed=True, + completed_field=completed_field, + used_fallback_ts=effective_ts, + ) diff --git a/src/use_cases/extract_events.py b/src/use_cases/extract_events.py index 6f1f61c..14324e0 100644 --- a/src/use_cases/extract_events.py +++ b/src/use_cases/extract_events.py @@ -46,7 +46,9 @@ from src.ports.task_queue import TaskQueuePort from src.services import deduplicator, token_budget from src.services.importance_scorer import ImportanceScorer +from src.services.llm_client_pool import LLMClientPool from src.services.object_registry import ObjectRegistry +from src.services.time_completion import apply_time_completion_policy from src.services.validators import EventValidator logger = get_logger(__name__) @@ -109,6 +111,7 @@ def _compute_prompt_hash( message_ts_dt: datetime, channel_name: str, chunk_index: int, + metadata: dict[str, Any] | None = None, ) -> str: """Compute deterministic prompt hash for caching.""" @@ -117,6 +120,7 @@ def _compute_prompt_hash( "links": links, "ts": _normalize_to_utc(message_ts_dt).isoformat(), "channel": channel_name, + "metadata": metadata or {}, } serialized = json.dumps(payload, sort_keys=True, separators=(",", ":")) normalized_content_hash = hashlib.sha256(serialized.encode("utf-8")).hexdigest() @@ -134,6 +138,64 @@ def _compute_prompt_hash( return hashlib.sha256(hash_input.encode("utf-8")).hexdigest() +def _build_prompt_metadata( + *, + candidate: EventCandidate, + channel_name: str, + source_id: MessageSource, + repository: RepositoryProtocol, + settings: Settings, +) -> dict[str, Any]: + """Build a small, deterministic metadata blob for LLM prompting and caching.""" + + max_anchors = getattr(settings, "extraction_prompt_metadata_max_anchors", 10) + try: + max_anchors_int = max(0, int(max_anchors)) + except (TypeError, ValueError): + max_anchors_int = 10 + + anchors: list[str] = [] + for anchor in candidate.anchors: + if not isinstance(anchor, str): + continue + cleaned = anchor.strip() + if not cleaned: + continue + if cleaned not in anchors: + anchors.append(cleaned[:80]) + if len(anchors) >= max_anchors_int: + break + + metadata: dict[str, Any] = { + "source_id": source_id.value, + "channel_id": candidate.channel, + "channel_name": channel_name, + "message_id": candidate.message_id, + "message_published_at": _normalize_to_utc(candidate.ts_dt).isoformat(), + "reply_count": int(getattr(candidate.features, "reply_count", 0) or 0), + "reactions_count": int(getattr(candidate.features, "reaction_count", 0) or 0), + "has_file": bool(getattr(candidate.features, "has_files", False)), + "anchors": anchors, + } + + raw_lookup = getattr(repository, "get_message_metadata", None) + if callable(raw_lookup): + try: + raw_metadata = raw_lookup(candidate.message_id, source_id) + except Exception: # noqa: BLE001 + raw_metadata = None + if isinstance(raw_metadata, dict): + for key in ("permalink", "post_url", "forwarded_from", "file_mime"): + value = raw_metadata.get(key) + if isinstance(value, str) and value.strip(): + metadata[key] = value.strip() + raw_has_file = raw_metadata.get("has_file") + if isinstance(raw_has_file, bool): + metadata["has_file"] = raw_has_file + + return metadata + + @dataclass(slots=True) class CandidateExtractionMetrics: """Metrics produced while processing a single candidate.""" @@ -307,6 +369,7 @@ def _process_candidate_with_llm( *, candidate: EventCandidate, llm_client: LLMClient, + llm_client_pool: LLMClientPool | None, repository: RepositoryProtocol, settings: Settings, cache_ttl: timedelta | None, @@ -330,6 +393,23 @@ def _process_candidate_with_llm( channel_config.channel_name if channel_config else candidate.channel ) + effective_llm_client = llm_client + prompt_file_override: str | None = None + if llm_client_pool is not None: + channel_prompt_file = None + if channel_config is not None: + candidate_prompt_file = getattr(channel_config, "prompt_file", None) + if isinstance(candidate_prompt_file, str) and candidate_prompt_file.strip(): + channel_prompt_file = candidate_prompt_file.strip() + prompt_file_override = llm_client_pool.get_effective_prompt_file( + source_id=candidate_source, + channel_prompt_file=channel_prompt_file, + ) + effective_llm_client = llm_client_pool.get_client( + source_id=candidate_source, + prompt_file=prompt_file_override, + ) + logger.info( "processing_candidate", correlation_id=correlation_id, @@ -338,11 +418,23 @@ def _process_candidate_with_llm( message_id=candidate.message_id[:8], source=candidate_source.value, channel=channel_name, + prompt_version=getattr(effective_llm_client, "prompt_version", None), + prompt_hash=getattr(effective_llm_client, "system_prompt_hash", None), + prompt_file=prompt_file_override, ) limited_links = candidate.links_norm[:MAX_LINKS] + metadata: dict[str, Any] | None = None + if getattr(settings, "extraction_prompt_metadata_enabled", True): + metadata = _build_prompt_metadata( + candidate=candidate, + channel_name=channel_name, + source_id=candidate_source, + repository=repository, + settings=settings, + ) char_budget = token_budget.characters_for_tokens( - llm_client.prompt_token_budget, llm_client.model + effective_llm_client.prompt_token_budget, effective_llm_client.model ) text_chunks = token_budget.truncate_or_chunk(candidate.text_norm, char_budget) @@ -362,12 +454,13 @@ def _process_candidate_with_llm( try: for chunk_index, chunk_text in enumerate(text_chunks): prompt_hash = _compute_prompt_hash( - llm_client=llm_client, + llm_client=effective_llm_client, chunk_text=chunk_text, links=limited_links, message_ts_dt=candidate.ts_dt, channel_name=channel_name, chunk_index=chunk_index, + metadata=metadata, ) llm_response: LLMResponse | None = None @@ -401,7 +494,7 @@ def _process_candidate_with_llm( LLMCallMetadata( message_id=candidate.message_id, prompt_hash=prompt_hash, - model=llm_client.model, + model=effective_llm_client.model, tokens_in=0, tokens_out=0, cost_usd=0.0, @@ -424,12 +517,13 @@ def _process_candidate_with_llm( chunk_index=chunk_index, ) - llm_response = llm_client.extract_events_with_retry( + llm_response = effective_llm_client.extract_events_with_retry( text=chunk_text, links=limited_links, message_ts_dt=candidate.ts_dt, channel_name=channel_name, chunk_index=chunk_index, + context=metadata, ) logger.info( @@ -443,7 +537,7 @@ def _process_candidate_with_llm( metrics.llm_calls += 1 - call_metadata = llm_client.get_call_metadata() + call_metadata = effective_llm_client.get_call_metadata() call_metadata.message_id = candidate.message_id call_metadata.prompt_hash = prompt_hash call_metadata.cached = False @@ -458,34 +552,19 @@ def _process_candidate_with_llm( chunk_events.extend(llm_response.events) chunk_is_event = chunk_is_event or llm_response.is_event - events_source = chunk_events max_events_raw = getattr(settings, "llm_max_events_per_msg", 5) try: max_events = int(max_events_raw) if max_events_raw is not None else None except (TypeError, ValueError): max_events = 5 - llm_events = ( - events_source[:max_events] if max_events is not None else events_source - ) - - if len(events_source) > len(llm_events): - logger.info( - "llm_response_truncated", - correlation_id=correlation_id, - message_id=candidate.message_id[:8], - original_count=len(events_source), - max_events=max_events, + if chunk_is_event and chunk_events: + from src.services.intra_message_postprocess import ( + dedup_and_rank_events_for_message, ) - if chunk_is_event and llm_events: - events_to_save: list[Event] = [] - validation_errors: list[str] = [] - - reaction_count = candidate.features.reaction_count - mention_count = 1 if candidate.features.has_mention else 0 - - for llm_event in llm_events: + all_domain_events: list[Event] = [] + for llm_event in chunk_events: domain_event = convert_llm_event_to_domain( llm_event, message_id=candidate.message_id, @@ -495,6 +574,50 @@ def _process_candidate_with_llm( object_registry=object_registry, ) + if getattr(settings, "extraction_time_completion_enabled", True): + completion = apply_time_completion_policy( + domain_event, + message_published_at=domain_event.message_published_at, + fallback_ts=candidate.ts_dt, + ) + if completion.changed: + logger.warning( + "time_completed_from_message_ts", + correlation_id=correlation_id, + message_id=candidate.message_id[:8], + status=domain_event.status.value, + completed_field=completion.completed_field, + time_source=domain_event.time_source.value, + time_confidence=domain_event.time_confidence, + ) + domain_event.dedup_key = deduplicator.generate_dedup_key( + domain_event + ) + + all_domain_events.append(domain_event) + + selected_events = dedup_and_rank_events_for_message( + all_domain_events, + max_events=max_events, + ) + + if len(all_domain_events) > len(selected_events): + logger.info( + "llm_message_postprocess_reduced", + correlation_id=correlation_id, + message_id=candidate.message_id[:8], + original_count=len(all_domain_events), + selected_count=len(selected_events), + max_events=max_events, + ) + + events_to_save: list[Event] = [] + validation_errors: list[str] = [] + + reaction_count = candidate.features.reaction_count + mention_count = 1 if candidate.features.has_mention else 0 + + for domain_event in selected_events: importance_result = importance_scorer.calculate_importance( domain_event, llm_score=None, @@ -510,14 +633,14 @@ def _process_candidate_with_llm( if critical_errors: validation_errors.extend( [ - f"Event {llm_event.object_name_raw}: {error}" + f"Event {domain_event.object_name_raw}: {error}" for error in critical_errors ] ) logger.warning( "event_validation_failed", correlation_id=correlation_id, - event_object=llm_event.object_name_raw, + event_object=domain_event.object_name_raw, critical_errors=critical_errors, warnings_count=len(validation_summary["warnings"]), info_count=len(validation_summary["info"]), @@ -531,7 +654,7 @@ def _process_candidate_with_llm( logger.info( "event_validation_warnings", correlation_id=correlation_id, - event_object=llm_event.object_name_raw, + event_object=domain_event.object_name_raw, warnings=validation_summary["warnings"], ) @@ -540,7 +663,7 @@ def _process_candidate_with_llm( metrics.events_extracted += len(events_to_save) metrics.dedup_required = True - total_events_processed = len(events_source) + total_events_processed = len(selected_events) blocked_events = total_events_processed - len(events_to_save) saved_events = len(events_to_save) @@ -727,6 +850,11 @@ def extract_events_use_case( errors: list[str] = [] cache_ttl = _resolve_cache_ttl(settings) validator = event_validator or _get_event_validator() + llm_pool = ( + LLMClientPool(base_client=llm_client, settings=settings) + if isinstance(llm_client, LLMClient) + else None + ) for index, candidate in enumerate(candidates, start=1): candidates_processed += 1 @@ -734,6 +862,7 @@ def extract_events_use_case( metrics = _process_candidate_with_llm( candidate=candidate, llm_client=llm_client, + llm_client_pool=llm_pool, repository=repository, settings=settings, cache_ttl=cache_ttl, @@ -885,6 +1014,11 @@ def process_llm_candidate_task_use_case( metrics = _process_candidate_with_llm( candidate=candidate, llm_client=llm_client, + llm_client_pool=( + LLMClientPool(base_client=llm_client, settings=settings) + if isinstance(llm_client, LLMClient) + else None + ), repository=repository, settings=settings, cache_ttl=cache_ttl, diff --git a/tests/test_extract_events_caching.py b/tests/test_extract_events_caching.py index e28b25e..3309823 100644 --- a/tests/test_extract_events_caching.py +++ b/tests/test_extract_events_caching.py @@ -146,10 +146,14 @@ def test_extract_events_use_case_persists_llm_response( """LLM responses should be persisted and limited per settings.""" candidate = _make_candidate() - llm_response = LLMResponse( - is_event=True, - events=[_make_llm_event(), _make_llm_event(), _make_llm_event()], - ) + event1 = _make_llm_event() + event2 = _make_llm_event() + event2.anchor = "XYZ-789" + event2.anchors = ["XYZ-789"] + event3 = _make_llm_event() + event3.anchor = "QWE-111" + event3.anchors = ["QWE-111"] + llm_response = LLMResponse(is_event=True, events=[event1, event2, event3]) repository = MagicMock() repository.get_candidates_for_extraction.return_value = [candidate] diff --git a/tests/test_extraction_time_completion_integration.py b/tests/test_extraction_time_completion_integration.py new file mode 100644 index 0000000..49514be --- /dev/null +++ b/tests/test_extraction_time_completion_integration.py @@ -0,0 +1,117 @@ +"""Integration test for time completion in extract_events_use_case (P0.1).""" + +from __future__ import annotations + +from datetime import UTC, datetime +from unittest.mock import MagicMock + +from src.config.settings import Settings +from src.domain.models import ( + EventCandidate, + EventCategory, + LLMCallMetadata, + LLMEvent, + LLMResponse, + MessageSource, + ScoringFeatures, + TimeSource, +) +from src.services.importance_scorer import ImportanceScorer +from src.use_cases.extract_events import build_object_registry, extract_events_use_case + + +def test_extract_events_fills_missing_completed_time_and_saves_event() -> None: + settings = MagicMock(spec=Settings) + settings.llm_daily_budget_usd = 100.0 + settings.llm_max_events_per_msg = 5 + settings.llm_cache_ttl_days = 21 + settings.object_registry_path = "config/defaults/object_registry.example.yaml" + settings.get_scoring_config.return_value = None + settings.extraction_time_completion_enabled = True + settings.extraction_prompt_metadata_enabled = False + + candidate_ts = datetime(2025, 12, 1, 12, 0, tzinfo=UTC) + candidate = EventCandidate( + message_id="msg-1", + channel="general", + ts_dt=candidate_ts, + text_norm="Release completed", + links_norm=[], + anchors=[], + score=1.0, + features=ScoringFeatures(), + source_id=MessageSource.SLACK, + ) + + llm_event = LLMEvent( + action="launch", + object_name_raw="Widget", + qualifiers=[], + stroke=None, + anchor=None, + category=EventCategory.PRODUCT, + status="completed", + change_type="launch", + environment="prod", + severity=None, + planned_start=None, + planned_end=None, + actual_start=None, + actual_end=None, + time_source="explicit", + time_confidence=0.9, + summary="Widget release completed", + why_it_matters=None, + links=[], + anchors=[], + impact_area=[], + impact_type=[], + confidence=0.9, + ) + llm_response = LLMResponse(is_event=True, events=[llm_event]) + + llm_client = MagicMock() + llm_client.model = "gpt-5-nano" + llm_client.system_prompt_hash = "prompt-hash" + llm_client.prompt_token_budget = 3000 + llm_client.prompt_version = "v1" + llm_client.extract_events_with_retry.return_value = llm_response + llm_client.get_call_metadata.return_value = LLMCallMetadata( + message_id="", + prompt_hash="prompt-hash", + model="gpt-5-nano", + tokens_in=10, + tokens_out=10, + cost_usd=0.01, + latency_ms=50, + cached=False, + ) + + repository = MagicMock() + repository.get_candidates_for_extraction.return_value = [candidate] + repository.get_cached_llm_response.return_value = None + repository.update_candidate_status.return_value = None + repository.save_llm_call.return_value = None + repository.save_llm_response.return_value = None + repository.get_daily_llm_cost.return_value = 0.0 + repository.save_events.return_value = 1 + + object_registry = build_object_registry(settings) + importance_scorer = ImportanceScorer() + + result = extract_events_use_case( + llm_client=llm_client, + repository=repository, + settings=settings, + source_id=MessageSource.SLACK, + batch_size=5, + check_budget=False, + object_registry=object_registry, + importance_scorer=importance_scorer, + ) + + assert result.events_extracted == 1 + saved_events = repository.save_events.call_args[0][0] + assert saved_events[0].actual_end == candidate_ts + assert saved_events[0].time_source == TimeSource.TS_FALLBACK + assert saved_events[0].time_confidence <= 0.3 diff --git a/tests/test_intra_message_postprocess.py b/tests/test_intra_message_postprocess.py new file mode 100644 index 0000000..f48464b --- /dev/null +++ b/tests/test_intra_message_postprocess.py @@ -0,0 +1,68 @@ +"""Tests for intra-message post-processing (P1.2).""" + +from __future__ import annotations + +from datetime import UTC, datetime + +from src.domain.models import TimeSource +from src.services.intra_message_postprocess import dedup_and_rank_events_for_message +from tests.conftest import create_test_event + + +def test_dedup_and_rank_prefers_late_anchor_over_early_noise() -> None: + ts = datetime(2025, 12, 1, 12, 0, tzinfo=UTC) + + early = create_test_event(status="updated").model_copy( + update={ + "anchor": None, + "anchors": [], + "time_source": TimeSource.TS_FALLBACK, + "time_confidence": 0.1, + "confidence": 0.4, + "message_published_at": ts, + } + ) + + late = create_test_event(status="updated").model_copy( + update={ + "anchor": "ABC-123", + "anchors": ["ABC-123"], + "time_source": TimeSource.EXPLICIT, + "time_confidence": 0.9, + "confidence": 0.7, + "message_published_at": ts, + } + ) + + selected = dedup_and_rank_events_for_message([early, late], max_events=1) + assert len(selected) == 1 + assert selected[0].anchor == "ABC-123" + + +def test_dedup_and_rank_deduplicates_by_anchor_and_keeps_best() -> None: + ts = datetime(2025, 12, 1, 12, 0, tzinfo=UTC) + + worse = create_test_event(status="updated").model_copy( + update={ + "anchor": "PROJ-1", + "anchors": ["PROJ-1"], + "time_source": TimeSource.RELATIVE, + "time_confidence": 0.6, + "confidence": 0.6, + "message_published_at": ts, + } + ) + better = create_test_event(status="updated").model_copy( + update={ + "anchor": "PROJ-1", + "anchors": ["PROJ-1"], + "time_source": TimeSource.EXPLICIT, + "time_confidence": 0.7, + "confidence": 0.9, + "message_published_at": ts, + } + ) + + selected = dedup_and_rank_events_for_message([worse, better], max_events=None) + assert len(selected) == 1 + assert selected[0].confidence == 0.9 diff --git a/tests/test_prompt_hashing.py b/tests/test_prompt_hashing.py index 3a04bd1..e589a0a 100644 --- a/tests/test_prompt_hashing.py +++ b/tests/test_prompt_hashing.py @@ -71,3 +71,32 @@ def test_prompt_hash_stability_across_runs() -> None: chunk_index=0, ) assert inline_prompt != "" + + +def test_prompt_hash_includes_metadata() -> None: + llm_client = _FakeLLMClient(model="gpt-5-nano", version="v1") + message_ts = datetime(2025, 1, 1, tzinfo=UTC) + links: list[str] = [] + chunk_text = "release announcement" + + base = _compute_prompt_hash( + llm_client=llm_client, + chunk_text=chunk_text, + links=links, + message_ts_dt=message_ts, + channel_name="general", + chunk_index=0, + metadata={"anchors": ["ABC-1"]}, + ) + + different_metadata = _compute_prompt_hash( + llm_client=llm_client, + chunk_text=chunk_text, + links=links, + message_ts_dt=message_ts, + channel_name="general", + chunk_index=0, + metadata={"anchors": ["XYZ-9"]}, + ) + + assert base != different_metadata diff --git a/tests/test_prompt_metadata.py b/tests/test_prompt_metadata.py new file mode 100644 index 0000000..3d7f450 --- /dev/null +++ b/tests/test_prompt_metadata.py @@ -0,0 +1,36 @@ +"""Tests for prompt metadata block (P0.2).""" + +from __future__ import annotations + +from datetime import UTC, datetime + +from src.adapters.llm_client import LLMClient + + +def test_llm_prompt_includes_message_metadata_block() -> None: + llm = LLMClient( + api_key="test", + model="gpt-5-nano", + prompt_template="system", + timeout=5, + ) + + context = { + "source_id": "slack", + "channel_id": "C123", + "message_id": "msg-1", + "anchors": ["ABC-123"], + "reply_count": 2, + } + + prompt = llm._build_prompt( # noqa: SLF001 + "hello", + ["https://example.com"], + datetime(2025, 12, 1, 12, 0, tzinfo=UTC), + "general", + context=context, + ) + + assert "Message metadata" in prompt + assert '"anchors"' in prompt + assert '"ABC-123"' in prompt diff --git a/tests/test_time_completion_policy.py b/tests/test_time_completion_policy.py new file mode 100644 index 0000000..b67189a --- /dev/null +++ b/tests/test_time_completion_policy.py @@ -0,0 +1,113 @@ +"""Tests for TimeCompletionPolicy (P0.1).""" + +from __future__ import annotations + +from datetime import UTC, datetime + +from src.domain.models import EventStatus, TimeSource +from src.services.time_completion import apply_time_completion_policy +from tests.conftest import create_test_event + + +def test_time_completion_fills_required_fields_and_caps_confidence() -> None: + published_at = datetime(2025, 12, 1, 12, 0, tzinfo=UTC) + fallback_ts = datetime(2025, 12, 1, 11, 0, tzinfo=UTC) + + planned = create_test_event(status="planned").model_copy( + update={ + "planned_start": None, + "actual_start": None, + "actual_end": None, + "message_published_at": published_at, + "time_source": TimeSource.EXPLICIT, + "time_confidence": 0.95, + } + ) + result = apply_time_completion_policy( + planned, + message_published_at=planned.message_published_at, + fallback_ts=fallback_ts, + ) + assert result.changed is True + assert planned.status == EventStatus.PLANNED + assert planned.planned_start == published_at + assert planned.time_source == TimeSource.TS_FALLBACK + assert planned.time_confidence <= 0.3 + + started = create_test_event(status="started").model_copy( + update={ + "actual_start": None, + "actual_end": None, + "message_published_at": published_at, + "time_source": TimeSource.EXPLICIT, + "time_confidence": 0.9, + } + ) + result = apply_time_completion_policy( + started, + message_published_at=started.message_published_at, + fallback_ts=fallback_ts, + ) + assert result.changed is True + assert started.status == EventStatus.STARTED + assert started.actual_start == published_at + assert started.time_source == TimeSource.TS_FALLBACK + assert started.time_confidence <= 0.3 + + completed = create_test_event(status="completed").model_copy( + update={ + "actual_end": None, + "message_published_at": published_at, + "time_source": TimeSource.EXPLICIT, + "time_confidence": 0.8, + } + ) + result = apply_time_completion_policy( + completed, + message_published_at=completed.message_published_at, + fallback_ts=fallback_ts, + ) + assert result.changed is True + assert completed.status == EventStatus.COMPLETED + assert completed.actual_end == published_at + assert completed.time_source == TimeSource.TS_FALLBACK + assert completed.time_confidence <= 0.3 + + +def test_time_completion_noop_for_non_required_status() -> None: + published_at = datetime(2025, 12, 1, 12, 0, tzinfo=UTC) + event = create_test_event(status="updated").model_copy( + update={ + "planned_start": None, + "actual_start": None, + "actual_end": None, + "message_published_at": published_at, + } + ) + result = apply_time_completion_policy( + event, + message_published_at=event.message_published_at, + fallback_ts=published_at, + ) + assert result.changed is False + + +def test_time_completion_uses_fallback_ts_when_message_ts_missing() -> None: + fallback_ts = datetime(2025, 12, 1, 11, 0, tzinfo=UTC) + planned = create_test_event(status="planned").model_copy( + update={ + "planned_start": None, + "actual_start": None, + "actual_end": None, + "message_published_at": None, + "time_source": TimeSource.EXPLICIT, + "time_confidence": 0.9, + } + ) + result = apply_time_completion_policy( + planned, + message_published_at=None, + fallback_ts=fallback_ts, + ) + assert result.changed is True + assert planned.planned_start == fallback_ts From 5e7dd41737914511448bc882a905f388897d403b Mon Sep 17 00:00:00 2001 From: VaitaR Date: Sat, 13 Dec 2025 18:41:10 +0300 Subject: [PATCH 2/3] feat: Add development requirements for testing and code quality tools --- requirements-dev.txt | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 requirements-dev.txt diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..b558347 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,21 @@ +-r requirements.txt + +# Testing +pytest==8.0.0 +pytest-asyncio==0.23.5 +pytest-cov==4.1.0 +pytest-mock==3.12.0 +pytest-timeout==2.2.0 + +# Code Quality +ruff==0.12.8 +mypy==1.8.0 +pre-commit==3.6.2 + +# Type stubs (pinned for reproducible CI installs) +types-jsonschema==4.21.0.20240331 +types-pytz==2024.1.0.20240203 +types-PyYAML==6.0.12.20240311 +types-python-dateutil==2.8.19.20240106 +types-requests==2.31.0.20240218 + From 781dede1723a053f82620be9f505ebb478962cef Mon Sep 17 00:00:00 2001 From: VaitaR Date: Sat, 13 Dec 2025 19:05:29 +0300 Subject: [PATCH 3/3] feat: Update CI and pre-commit workflows to use development requirements and improve metrics exporter handling --- .github/workflows/ci.yml | 4 ++-- .github/workflows/pre-commit.yml | 3 +-- .pre-commit-config.yaml | 1 + Makefile | 23 +++++++++++++++++++---- config/prompts/telegram.yaml | 4 ++-- docs/EXTRACTION_QUALITY_AUDIT_2025_12.md | 22 +++++++++++----------- docs/OPERATIONS_OBSERVABILITY.md | 6 +++--- docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md | 1 - docs/pr_arch_prompt.md | 2 +- docs/pr_audit_prompt.md | 2 +- requirements-dev.txt | 1 - src/observability/metrics.py | 14 ++++++++++++-- 12 files changed, 53 insertions(+), 30 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a244164..dd546a3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,7 +60,7 @@ jobs: - name: Install dependencies (matches Makefile exactly) run: | - uv pip install --python $(which python) -r requirements.txt + uv pip install --python $(which python) -r requirements-dev.txt - name: Mypy (strict) - matches Makefile exactly run: | @@ -86,7 +86,7 @@ jobs: - name: Install dependencies run: | - uv pip install --python $(which python) -r requirements.txt + uv pip install --python $(which python) -r requirements-dev.txt - name: Run tests with coverage (matches Makefile exactly) env: diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index 8cd3eb2..ffc60fd 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -30,8 +30,7 @@ jobs: - name: Install pre-commit and dependencies run: | - uv pip install --python $(which python) pre-commit - uv pip install --python $(which python) -r requirements.txt + uv pip install --python $(which python) -r requirements-dev.txt - name: Run pre-commit hooks (fastest feedback) run: | diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3ff8b7e..ae60746 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -47,6 +47,7 @@ repos: hooks: - id: trailing-whitespace name: Remove trailing whitespace + args: ["--markdown-linebreak-ext=md"] - id: end-of-file-fixer name: Add missing end-of-file exclude: ^(alembic/versions/.*\.py)$ diff --git a/Makefile b/Makefile index 847715c..055fb1c 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,15 @@ install: ## Install dependencies @echo "$(BLUE)Installing dependencies...$(NC)" pip install -r requirements.txt +deps-check: ## Check dependency resolution (uv dry-run, matches CI) + @echo "$(BLUE)Checking dependency resolution with uv...$(NC)" + @if command -v uv >/dev/null 2>&1; then \ + uv pip install --dry-run --python "$$(which python)" -r requirements-dev.txt >/dev/null && \ + echo "$(GREEN)✓ Dependency resolution OK$(NC)"; \ + else \ + echo "$(YELLOW)WARN: uv not found; skipping deps-check (CI uses uv)$(NC)"; \ + fi + format: ## Format code with ruff @echo "$(BLUE)Formatting code with ruff...$(NC)" ruff format . @@ -101,16 +110,22 @@ ci-local: ## Run CI checks locally (same as GitHub Actions) @echo "$(BLUE) Running CI Pipeline (Local)$(NC)" @echo "$(BLUE)===========================================$(NC)" @echo "" - @echo "$(YELLOW)Step 1/4: Format Check$(NC)" + @echo "$(YELLOW)Step 0/6: Dependency Resolution$(NC)" + @make deps-check + @echo "" + @echo "$(YELLOW)Step 1/6: Pre-commit (auto-fix)$(NC)" + @make pre-commit + @echo "" + @echo "$(YELLOW)Step 2/6: Format Check$(NC)" @make format-check @echo "" - @echo "$(YELLOW)Step 2/4: Lint$(NC)" + @echo "$(YELLOW)Step 3/6: Lint$(NC)" @make lint @echo "" - @echo "$(YELLOW)Step 3/4: Type Check$(NC)" + @echo "$(YELLOW)Step 4/6: Type Check$(NC)" @make typecheck @echo "" - @echo "$(YELLOW)Step 4/4: Tests$(NC)" + @echo "$(YELLOW)Step 5/6: Tests$(NC)" @make test @echo "" @echo "$(GREEN)===========================================$(NC)" diff --git a/config/prompts/telegram.yaml b/config/prompts/telegram.yaml index a7838c8..03c6661 100644 --- a/config/prompts/telegram.yaml +++ b/config/prompts/telegram.yaml @@ -19,13 +19,13 @@ system: | 1) security_incident: - Hacks, exploits, breaches, theft of funds, critical vulnerabilities, compromised keys, bridge exploits. - - Examples (EN/RU): "hack", "exploit", "breach", "security incident", "funds stolen", "drained", + - Examples (EN/RU): "hack", "exploit", "breach", "security incident", "funds stolen", "drained", "взлом", "эксплойт", "угнали средства", "слив приватных ключей", "брижд взломали", "протокол задрейнили". 2) competitor_update: - Product / business changes of other wallets, exchanges, payment apps or custodial services that may compete with wallet.tg. - Include: new features, supported assets, cards, P2P, staking, fees, geographies, integrations with Telegram or messengers. - - Treat as competitor any product that offers: custody of crypto/stablecoins, payments, P2P, on/off-ramp, cards, swaps, + - Treat as competitor any product that offers: custody of crypto/stablecoins, payments, P2P, on/off-ramp, cards, swaps, especially if it has a Telegram bot or is used inside messengers. - Examples (EN/RU): "launched a new wallet", "support USDT on Tron", "new crypto card", "P2P service", "запустили кошелёк", "кошелёк в Telegram", "бот для криптоплатежей", "поддержка USDT", "запуск P2P", "новая карта". diff --git a/docs/EXTRACTION_QUALITY_AUDIT_2025_12.md b/docs/EXTRACTION_QUALITY_AUDIT_2025_12.md index 92069fa..86871da 100644 --- a/docs/EXTRACTION_QUALITY_AUDIT_2025_12.md +++ b/docs/EXTRACTION_QUALITY_AUDIT_2025_12.md @@ -132,7 +132,7 @@ if event.status == EventStatus.COMPLETED: # ВАРИАНТ B: Изменить промпт (более строгий контракт) # В slack.yaml добавить: -# "CRITICAL: If status='completed', you MUST provide actual_end. +# "CRITICAL: If status='completed', you MUST provide actual_end. # If you don't know the exact time, use message timestamp." ``` @@ -381,13 +381,13 @@ if event.status == EventStatus.COMPLETED: ```python def truncate_or_chunk( - text: str, - char_budget: int, + text: str, + char_budget: int, overlap: int = 200 ) -> list[str]: if len(text) <= char_budget: return [text] - + chunks = [] start = 0 while start < len(text): @@ -417,7 +417,7 @@ EVENTS_EXTRACTED_TOTAL = Counter( ) EVENTS_BLOCKED_TOTAL = Counter( - "events_blocked_total", + "events_blocked_total", "Events blocked by validation", ["source", "channel", "reason"] ) @@ -488,9 +488,9 @@ tests/ def test_extraction_fixture(fixture_name, mock_llm): message = load_fixture(f"extraction/slack/{fixture_name}.json") expected = load_fixture(f"expected/{fixture_name}_events.json") - + result = extract_events_use_case(...) - + # Проверяем инварианты: assert result.events_extracted >= expected["min_events"] assert all(e.category in expected["allowed_categories"] for e in result.events) @@ -539,16 +539,16 @@ prompts: ```python def get_cached_llm_response_semantic( - self, - text: str, + self, + text: str, similarity_threshold: float = 0.95 ) -> LLMResponse | None: # 1. Embed text embedding = self.embedding_model.encode(text) - + # 2. Search similar in cache similar = self.vector_store.search(embedding, threshold=similarity_threshold) - + if similar: return similar.response return None diff --git a/docs/OPERATIONS_OBSERVABILITY.md b/docs/OPERATIONS_OBSERVABILITY.md index 4bbb39c..0ab6076 100644 --- a/docs/OPERATIONS_OBSERVABILITY.md +++ b/docs/OPERATIONS_OBSERVABILITY.md @@ -14,7 +14,7 @@ conflicts on the host. - Endpoint: `http://:9000/metrics` - Default bind address: `0.0.0.0` - Dedicated container: `metrics-exporter` -- Disable auto-start (e.g., for tests or ad-hoc scripts): set `METRICS_EXPORTER_AUTO_START=0` +- Auto-start is disabled by default; set `METRICS_EXPORTER_AUTO_START=1` to enable it - Override port: set `METRICS_PORT=` Verify that metrics are reachable from the host: @@ -26,8 +26,8 @@ curl -sf http://localhost:9000/metrics | head If the command above succeeds you should see Prometheus samples such as `pipeline_jobs_submitted_total` and `pipeline_stage_duration_seconds`. -Need to expose metrics from a one-off script? Export `METRICS_EXPORTER_AUTO_START=0`, import -`ensure_metrics_exporter()`, and call it explicitly. Alternatively, run `python -m +Need to expose metrics from a one-off script? Import `ensure_metrics_exporter()` and call it +explicitly. Alternatively, run `python -m src.observability.metrics` to launch the HTTP server and keep the process alive until it receives a shutdown signal. diff --git a/docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md b/docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md index a85b3e9..352b2e9 100644 --- a/docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md +++ b/docs/TECHNICAL_SPEC_EXTRACTION_QUALITY.md @@ -237,4 +237,3 @@ - Как трактовать “время события” для TG‑новостей: timestamp поста часто “время публикации новости”, а не “время события”. Нужна договорённость, но fallback улучшает полезность. - Рост prompt может снизить cache hit rate (если включать много уникальных полей) — нужно строго ограничить контекст (anchors/сигналы/ссылки) и не включать полный raw payload. - Валидации: какие ошибки должны быть “blocking”, а какие — warning (особенно time/status). - diff --git a/docs/pr_arch_prompt.md b/docs/pr_arch_prompt.md index 4e76ef7..5c5d633 100644 --- a/docs/pr_arch_prompt.md +++ b/docs/pr_arch_prompt.md @@ -153,4 +153,4 @@ Remove: remove/drop/deprecate/delete, вырезание слоя/фичи/се Не смешивай гипотезы и факты: явно помечай предположения. -Сглаженно и лаконично; без повторов. \ No newline at end of file +Сглаженно и лаконично; без повторов. diff --git a/docs/pr_audit_prompt.md b/docs/pr_audit_prompt.md index f340c90..92d4e59 100644 --- a/docs/pr_audit_prompt.md +++ b/docs/pr_audit_prompt.md @@ -87,4 +87,4 @@ B) Структурированный JSON для последующей заг Причинно-следственные связи формулируй чётко: PR X → вызвал проблему Y → PR Z исправил → почему причина вероятна. -Антипаттерны фиксируй только при ≥2 совпадениях. \ No newline at end of file +Антипаттерны фиксируй только при ≥2 совпадениях. diff --git a/requirements-dev.txt b/requirements-dev.txt index b558347..32f05ad 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -18,4 +18,3 @@ types-pytz==2024.1.0.20240203 types-PyYAML==6.0.12.20240311 types-python-dateutil==2.8.19.20240106 types-requests==2.31.0.20240218 - diff --git a/src/observability/metrics.py b/src/observability/metrics.py index e28f49f..dc5ca34 100644 --- a/src/observability/metrics.py +++ b/src/observability/metrics.py @@ -141,7 +141,10 @@ def _fallback_start_http_server(port: int) -> None: def _should_autostart() -> bool: """Return True when the metrics exporter should auto-start.""" - raw_value = os.getenv(METRICS_EXPORTER_AUTO_START_ENV, "1") + # Default to disabled to avoid binding host ports during tests/import-time usage. + # Dedicated runtime entrypoints (e.g. `python -m src.observability.metrics`) call + # `ensure_metrics_exporter()` explicitly. + raw_value = os.getenv(METRICS_EXPORTER_AUTO_START_ENV, "0") normalized = raw_value.strip().lower() return normalized in {"1", "true", "yes", "on"} @@ -206,7 +209,14 @@ def run_metrics_exporter_forever() -> None: if _should_autostart(): - ensure_metrics_exporter() + try: + ensure_metrics_exporter() + except OSError as exc: # pragma: no cover - import-time best-effort + logger.warning( + "metrics_exporter_autostart_failed", + port=_resolve_metrics_port(), + error=str(exc), + ) if __name__ == "__main__":