Skip to content

Commit ecad839

Browse files
committed
feat(workers): implement and register taskiq middlewares for logging, metrics, and retries
1 parent 159b226 commit ecad839

File tree

5 files changed

+141
-0
lines changed

5 files changed

+141
-0
lines changed

backend/workers/broker.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from typing import Any
2+
3+
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend
4+
5+
from backend.config.settings import settings
6+
from backend.workers.middlewares.logging import StructlogMiddleware
7+
from backend.workers.middlewares.metrics import PrometheusMetricsMiddleware
8+
from backend.workers.middlewares.retry import RetryTrackerMiddleware
9+
10+
redis_url = f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/0"
11+
12+
# Ініціалізуємо брокер з явно вказаною чергою (ізоляція від інших проєктів)
13+
broker = ListQueueBroker(redis_url, queue_name="seratonin_tasks")
14+
15+
# Додаємо явну типізацію для усунення помилок Pylance
16+
result_backend: RedisAsyncResultBackend[Any] = RedisAsyncResultBackend(
17+
redis_url, result_ex_time=3600
18+
)
19+
broker.with_result_backend(result_backend)
20+
21+
# Підключення Middlewares у правильному порядку
22+
broker.add_middlewares(
23+
StructlogMiddleware(),
24+
RetryTrackerMiddleware(),
25+
PrometheusMetricsMiddleware(),
26+
)

backend/workers/dependencies.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from typing import Annotated
2+
3+
from taskiq import TaskiqDepends
4+
5+
from backend.integrations.llm.router import LLMRouter
6+
from backend.services.content_generator import ContentGenerator
7+
from backend.services.fact_checker import FactChecker
8+
from backend.services.style_matcher import StyleMatcher
9+
10+
11+
def get_llm_router() -> LLMRouter:
12+
"""Ініціалізує та повертає роутер моделей."""
13+
return LLMRouter()
14+
15+
16+
def get_style_matcher() -> StyleMatcher:
17+
"""Ініціалізує сервіс пошуку стилю."""
18+
return StyleMatcher()
19+
20+
21+
def get_fact_checker(
22+
llm_router: Annotated[LLMRouter, TaskiqDepends(get_llm_router)],
23+
) -> FactChecker:
24+
"""Ініціалізує сервіс перевірки фактів, ін'єктуючи роутер."""
25+
return FactChecker(llm_router=llm_router)
26+
27+
28+
def get_content_generator(
29+
llm_router: Annotated[LLMRouter, TaskiqDepends(get_llm_router)],
30+
) -> ContentGenerator:
31+
"""
32+
Фабрика для генератора контенту.
33+
Використовує спільний інстанс роутера для оптимізації ресурсів.
34+
"""
35+
return ContentGenerator(llm_router=llm_router)
36+
37+
38+
# Заглушка для Milestone 7.4 (Publisher Service)
39+
# def get_publisher_service() -> PublisherService:
40+
# return PublisherService()
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from typing import Any
2+
3+
import structlog
4+
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
5+
6+
logger = structlog.get_logger()
7+
8+
9+
class StructlogMiddleware(TaskiqMiddleware):
10+
def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
11+
"""Виконується безпосередньо перед запуском таску у воркері."""
12+
logger.info(
13+
"task_execution_started",
14+
task_id=message.task_id,
15+
task_name=message.task_name,
16+
)
17+
return message
18+
19+
def post_execute(self, message: TaskiqMessage, result: TaskiqResult[Any]) -> None:
20+
"""Виконується після завершення таску (успішного або з помилкою)."""
21+
if result.is_err:
22+
logger.error(
23+
"task_execution_failed",
24+
task_id=message.task_id,
25+
task_name=message.task_name,
26+
error=str(result.error),
27+
)
28+
else:
29+
logger.info(
30+
"task_execution_success",
31+
task_id=message.task_id,
32+
task_name=message.task_name,
33+
execution_time_sec=round(result.execution_time, 3),
34+
)
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from typing import Any
2+
3+
import structlog
4+
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
5+
6+
logger = structlog.get_logger()
7+
8+
9+
class PrometheusMetricsMiddleware(TaskiqMiddleware):
10+
def post_execute(self, message: TaskiqMessage, result: TaskiqResult[Any]) -> None:
11+
"""
12+
TODO: Інтеграція з prometheus_client.
13+
Тут будуть оновлюватись Counter (кількість задач) та Histogram (час виконання).
14+
"""
15+
status = "error" if result.is_err else "success"
16+
logger.debug(
17+
"metrics_updated",
18+
metric="task_execution",
19+
task_name=message.task_name,
20+
status=status,
21+
)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import structlog
2+
from taskiq import TaskiqMessage, TaskiqMiddleware
3+
4+
logger = structlog.get_logger()
5+
6+
7+
class RetryTrackerMiddleware(TaskiqMiddleware):
8+
def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage:
9+
"""Відстежує, чи є цей запуск повторним (retry)."""
10+
# Taskiq зберігає кількість спроб у labels
11+
retry_count = message.labels.get("retry_count", 0)
12+
13+
if int(retry_count) > 0:
14+
logger.warning(
15+
"task_retry_attempt",
16+
task_id=message.task_id,
17+
task_name=message.task_name,
18+
attempt=retry_count,
19+
)
20+
return message

0 commit comments

Comments
 (0)