Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ UNKNOWN_PLAYER_MAX_RETRY_AFTER=21600

# Cache configuration
CACHE_TTL_HEADER=X-Cache-TTL
PLAYER_CACHE_TIMEOUT=86400
HEROES_PATH_CACHE_TIMEOUT=86400
HERO_PATH_CACHE_TIMEOUT=86400
CSV_CACHE_TIMEOUT=86400
Expand All @@ -54,6 +53,14 @@ SEARCH_ACCOUNT_PATH_CACHE_TIMEOUT=600
HERO_STATS_CACHE_TIMEOUT=3600
PLAYER_PROFILE_MAX_AGE=259200

# SWR staleness thresholds
HEROES_STALENESS_THRESHOLD=86400 # 24 hours
MAPS_STALENESS_THRESHOLD=86400
GAMEMODES_STALENESS_THRESHOLD=86400
ROLES_STALENESS_THRESHOLD=86400
PLAYER_STALENESS_THRESHOLD=3600 # 1 hour
STALE_CACHE_TIMEOUT=60

# Critical error Discord webhook
DISCORD_WEBHOOK_ENABLED=false
DISCORD_WEBHOOK_URL=""
Expand Down
6 changes: 3 additions & 3 deletions app/adapters/blizzard/parsers/hero.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
if TYPE_CHECKING:
from selectolax.lexbor import LexborNode

from app.adapters.blizzard.client import BlizzardClient
from app.domain.ports import BlizzardClientPort

from app.enums import Locale
from app.exceptions import ParserBlizzardError, ParserParsingError
Expand All @@ -26,7 +26,7 @@


async def fetch_hero_html(
client: BlizzardClient,
client: BlizzardClientPort,
hero_key: str,
locale: Locale = Locale.ENGLISH_US,
) -> str:
Expand Down Expand Up @@ -288,7 +288,7 @@ def _parse_hero_stadium_powers(stadium_wrapper: LexborNode) -> list[dict]:


async def parse_hero(
client: BlizzardClient,
client: BlizzardClientPort,
hero_key: str,
locale: Locale = Locale.ENGLISH_US,
) -> dict:
Expand Down
6 changes: 3 additions & 3 deletions app/adapters/blizzard/parsers/hero_stats_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from app.players.enums import PlayerGamemode, PlayerPlatform, PlayerRegion

if TYPE_CHECKING:
from app.adapters.blizzard.client import BlizzardClient
from app.domain.ports import BlizzardClientPort

# Mappings for query parameters
PLATFORM_MAPPING: dict[PlayerPlatform, str] = {
Expand All @@ -25,7 +25,7 @@


async def fetch_hero_stats_json(
client: BlizzardClient,
client: BlizzardClientPort,
platform: PlayerPlatform,
gamemode: PlayerGamemode,
region: PlayerRegion,
Expand Down Expand Up @@ -125,7 +125,7 @@ def _normalize_rate(rate: float) -> float:


async def parse_hero_stats_summary(
client: BlizzardClient,
client: BlizzardClientPort,
platform: PlayerPlatform,
gamemode: PlayerGamemode,
region: PlayerRegion,
Expand Down
6 changes: 3 additions & 3 deletions app/adapters/blizzard/parsers/heroes.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
from app.heroes.enums import HeroGamemode

if TYPE_CHECKING:
from app.adapters.blizzard.client import BlizzardClient
from app.domain.ports import BlizzardClientPort


async def fetch_heroes_html(
client: BlizzardClient,
client: BlizzardClientPort,
locale: Locale = Locale.ENGLISH_US,
) -> str:
"""
Expand Down Expand Up @@ -95,7 +95,7 @@ def filter_heroes(


async def parse_heroes(
client: BlizzardClient,
client: BlizzardClientPort,
locale: Locale = Locale.ENGLISH_US,
role: str | None = None,
gamemode: HeroGamemode | None = None,
Expand Down
25 changes: 25 additions & 0 deletions app/adapters/blizzard/parsers/heroes_hitpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Stateless parser functions for heroes hitpoints data (HP, armor, shields) from CSV"""

from app.adapters.csv import CSVReader

HITPOINTS_KEYS = {"health", "armor", "shields"}


def parse_heroes_hitpoints() -> dict[str, dict]:
"""Parse heroes hitpoints (health/armor/shields) from the heroes CSV file.

Returns:
Dict mapping hero key to hitpoints data.
Example: {"ana": {"hitpoints": {"health": 200, "armor": 0, "shields": 0, "total": 200}}}
"""
csv_reader = CSVReader()
csv_data = csv_reader.read_csv_file("heroes")

return {row["key"]: {"hitpoints": _get_hitpoints(row)} for row in csv_data}


def _get_hitpoints(row: dict) -> dict:
"""Extract hitpoints data from a hero CSV row."""
hitpoints = {hp_key: int(row[hp_key]) for hp_key in HITPOINTS_KEYS}
hitpoints["total"] = sum(hitpoints.values())
return hitpoints
39 changes: 0 additions & 39 deletions app/adapters/blizzard/parsers/heroes_stats.py

This file was deleted.

6 changes: 3 additions & 3 deletions app/adapters/blizzard/parsers/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
from app.roles.helpers import get_role_from_icon_url

if TYPE_CHECKING:
from app.adapters.blizzard.client import BlizzardClient
from app.domain.ports import BlizzardClientPort


async def fetch_roles_html(
client: BlizzardClient,
client: BlizzardClientPort,
locale: Locale = Locale.ENGLISH_US,
) -> str:
"""Fetch roles HTML from Blizzard homepage"""
Expand Down Expand Up @@ -86,7 +86,7 @@ def parse_roles_html(html: str) -> list[dict]:


async def parse_roles(
client: BlizzardClient,
client: BlizzardClientPort,
locale: Locale = Locale.ENGLISH_US,
) -> list[dict]:
"""
Expand Down
55 changes: 29 additions & 26 deletions app/adapters/cache/valkey_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"""

import json
import time
from compression import zstd
from functools import wraps
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -128,46 +129,48 @@ async def exists(self, key: str) -> bool:
# Application-specific cache methods
@handle_valkey_error(default_return=None)
async def get_api_cache(self, cache_key: str) -> dict | list | None:
"""Get the API Cache value associated with a given cache key"""
"""Get the API Cache value associated with a given cache key."""
api_cache_key = f"{settings.api_cache_key_prefix}:{cache_key}"
api_cache = await self.valkey_server.get(api_cache_key)
if not api_cache or not isinstance(api_cache, bytes):
return None
return self._decompress_json_value(api_cache)
envelope = self._decompress_json_value(api_cache)
if isinstance(envelope, dict) and "data_json" in envelope:
return json.loads(envelope["data_json"])
return envelope

@handle_valkey_error(default_return=None)
async def update_api_cache(
self, cache_key: str, value: dict | list, expire: int
self,
cache_key: str,
value: dict | list,
expire: int,
*,
stored_at: int | None = None,
staleness_threshold: int | None = None,
stale_while_revalidate: int = 0,
) -> None:
"""Update or set an API Cache value with an expiration value (in seconds)"""
bytes_value = self._compress_json_value(value)
"""Wrap value in a metadata envelope, compress, and store with TTL.

``data_json`` is the pre-serialized JSON string (key order preserved by
Python's ``json.dumps``), so nginx/Lua can print it verbatim without
re-encoding through cjson (which does not guarantee key ordering).
"""
envelope: dict = {
"data_json": json.dumps(value, separators=(",", ":")),
"stored_at": stored_at if stored_at is not None else int(time.time()),
"staleness_threshold": (
staleness_threshold if staleness_threshold is not None else expire
),
"stale_while_revalidate": stale_while_revalidate,
}
bytes_value = self._compress_json_value(envelope)
await self.valkey_server.set(
f"{settings.api_cache_key_prefix}:{cache_key}",
bytes_value,
ex=expire,
)

@handle_valkey_error(default_return=None)
async def get_player_cache(self, player_id: str) -> dict | list | None:
"""Get the Player Cache value associated with a given cache key"""
player_key = f"{settings.player_cache_key_prefix}:{player_id}"
player_cache = await self.valkey_server.get(player_key)
if not player_cache or not isinstance(player_cache, bytes):
return None
# Reset the TTL before returning the value
await self.valkey_server.expire(player_key, settings.player_cache_timeout)
return self._decompress_json_value(player_cache)

@handle_valkey_error(default_return=None)
async def update_player_cache(self, player_id: str, value: dict) -> None:
"""Update or set a Player Cache value"""
compressed_value = self._compress_json_value(value)
await self.valkey_server.set(
f"{settings.player_cache_key_prefix}:{player_id}",
value=compressed_value,
ex=settings.player_cache_timeout,
)

@handle_valkey_error(default_return=False)
async def is_being_rate_limited(self) -> bool:
"""Check if Blizzard rate limit is currently active"""
Expand Down
28 changes: 2 additions & 26 deletions app/adapters/csv/csv_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,11 @@


class CSVReader:
"""Adapter for reading CSV data files"""
"""Adapter for reading CSV data files from app/adapters/csv/data/"""

@staticmethod
def read_csv_file(filename: str) -> list[dict[str, str]]:
"""
Read a CSV file from app/adapters/csv/data/ directory

Args:
filename: Name of the CSV file without extension (e.g., "heroes", "maps", "gamemodes")

Returns:
List of dictionaries with CSV rows
"""
"""Read a CSV file by name (without extension) from the data/ directory."""
csv_path = Path(__file__).parent / "data" / f"{filename}.csv"
with csv_path.open(encoding="utf-8") as csv_file:
return list(csv.DictReader(csv_file, delimiter=","))

@staticmethod
def read_csv_file_legacy(filename: str) -> list[dict[str, str]]:
"""
Legacy method for reading CSV files from old location app/{module}/data/{module}.csv
This is kept for backward compatibility during migration.

Args:
filename: Name of the module/CSV file (e.g., "heroes", "maps", "gamemodes")

Returns:
List of dictionaries with CSV rows
"""
csv_path = Path.cwd() / "app" / filename / "data" / f"{filename}.csv"
with csv_path.open(encoding="utf-8") as csv_file:
return list(csv.DictReader(csv_file, delimiter=","))
87 changes: 87 additions & 0 deletions app/adapters/tasks/asyncio_task_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""AsyncIO task queue adapter — Phase 4 in-process background tasks with deduplication"""

import asyncio
import time
from typing import TYPE_CHECKING, Any, ClassVar

from app.metaclasses import Singleton
from app.monitoring.metrics import (
background_tasks_duration_seconds,
background_tasks_queue_size,
background_tasks_total,
)
from app.overfast_logger import logger

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Coroutine


class AsyncioTaskQueue(metaclass=Singleton):
"""In-process task queue backed by asyncio.create_task().

Uses a class-level set for deduplication so concurrent requests don't
trigger duplicate refreshes for the same entity.

When a ``coro`` is provided to ``enqueue``, it is executed as a real
background task. Phase 5 will replace this adapter with an arq-backed
one that dispatches to a worker process instead, but the interface stays
the same.
"""

_pending_jobs: ClassVar[set[str]] = set()

async def enqueue( # NOSONAR
self,
task_name: str,
*_args: Any,
job_id: str | None = None,
coro: Coroutine[Any, Any, Any] | None = None,
on_complete: Callable[[str], Awaitable[None]] | None = None,
on_failure: Callable[[str, Exception], Awaitable[None]] | None = None,
**_kwargs: Any,
) -> str:
"""Schedule a background task if not already pending."""
effective_id = job_id or task_name
if effective_id in self._pending_jobs:
logger.debug(f"[TaskQueue] Skipping duplicate job: {effective_id}")
if coro is not None:
coro.close() # avoid "coroutine was never awaited" warning
return effective_id

self._pending_jobs.add(effective_id)
background_tasks_queue_size.labels(task_type=task_name).inc()

async def _run() -> None:
start = time.monotonic()
status = "success"
try:
logger.info(
f"[TaskQueue] Running task '{task_name}' (job_id={effective_id})"
)
if coro is not None:
await coro
if on_complete is not None:
await on_complete(effective_id)
except Exception as exc: # noqa: BLE001
status = "failure"
logger.warning(
f"[TaskQueue] Task '{task_name}' (job_id={effective_id}) failed: {exc}"
)
if on_failure is not None:
await on_failure(effective_id, exc)
finally:
elapsed = time.monotonic() - start
background_tasks_total.labels(task_type=task_name, status=status).inc()
background_tasks_duration_seconds.labels(task_type=task_name).observe(
elapsed
)
background_tasks_queue_size.labels(task_type=task_name).dec()
self._pending_jobs.discard(effective_id)

task = asyncio.create_task(_run(), name=effective_id)
task.add_done_callback(lambda _: None)
return effective_id

async def is_job_pending_or_running(self, job_id: str) -> bool: # NOSONAR
"""Return True if a job with this ID is already in-flight."""
return job_id in self._pending_jobs
Loading