From 3012126a1c09fcd551c9fe1b82d9c31341c6deb3 Mon Sep 17 00:00:00 2001 From: mac Date: Tue, 19 May 2026 23:44:57 +0800 Subject: [PATCH 1/4] probe concurrency guardrails and operations --- CONTRIBUTING.md | 10 +++++ README.md | 2 +- src/paperscout/concurrency.py | 22 +++++++++++ src/paperscout/models.py | 5 ++- src/paperscout/monitor.py | 12 +++--- src/paperscout/sources.py | 73 +++++++++++++++++++++++++++++------ tests/test_sources.py | 71 +++++++++++++++++++++++++++++++--- 7 files changed, 169 insertions(+), 26 deletions(-) create mode 100644 src/paperscout/concurrency.py diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b55e701..0ec66ed 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -6,8 +6,18 @@ Thank you for your interest in improving paperscout. This document describes how - **[docs/onboarding.md](docs/onboarding.md)** — clone, database, `.env`, tests, and running the app locally. - **[docs/handoff.md](docs/handoff.md)** — maintainer-oriented design notes and operational gotchas. +- **[docs/architecture.md](docs/architecture.md)** — concurrency model (event loop vs threads). +- **[docs/probe-operations.md](docs/probe-operations.md)** — production probe volume, tuning, troubleshooting. - **[README.md](README.md)** — product behavior, Slack setup, deployment, and environment variable tables. +### Concurrency + +When adding blocking I/O or new data sources: + +1. Use **`run_blocking_io()`** (see `src/paperscout/concurrency.py`) only for pure blocking I/O with **no shared in-process mutable state** (e.g. psycopg2 via the connection pool). +2. **Never access `ISOProber._stats`, `WG21Index.papers`, or other source internals from a thread** — keep probes and index refresh on the asyncio event loop. +3. See **[docs/architecture.md](docs/architecture.md)** for the full model and **[docs/probe-operations.md](docs/probe-operations.md)** for probe tuning. + ## Workflow 1. **Fork** the repository (if you lack direct push access) and **clone** your fork. 
diff --git a/README.md b/README.md index be3bbc0..b308c7b 100644 --- a/README.md +++ b/README.md @@ -358,7 +358,7 @@ Every P-number from 1 to the effective frontier is probed. Numbers are divided i | **Cold** (1/48 per cycle ≈ daily) | All other P-numbers | everything else | D-prefix, latest+1, pdf+html | | **Cold** (1/48 per cycle) | Gap numbers (no index entry) | 1..frontier minus known | D+P, R0..R1, pdf+html | -Typical per-cycle request count: **~1,600–2,000 HEAD requests** (~8–10 s at 20 concurrent, 100 ms latency). A full sweep of all ~4,000 P-numbers completes within ~24 h of continuous 30-min polling. +Typical per-cycle request count: **~1,600–2,000 HEAD requests** (~8–10 s at 20 concurrent, 100 ms latency). A full sweep of all ~4,000 P-numbers completes within ~24 h of continuous 30-min polling. See **[docs/probe-operations.md](docs/probe-operations.md)** for operational thresholds, log fields, and troubleshooting. ### Alerting by Last-Modified diff --git a/src/paperscout/concurrency.py b/src/paperscout/concurrency.py new file mode 100644 index 0000000..8ef6466 --- /dev/null +++ b/src/paperscout/concurrency.py @@ -0,0 +1,22 @@ +"""Helpers for running blocking I/O off the asyncio event loop.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable +from typing import TypeVar + +_T = TypeVar("_T") + + +async def run_blocking_io(fn: Callable[..., _T], /, *args, **kwargs) -> _T: + """Run *fn* in a worker thread without blocking the event loop. + + Use only for pure blocking I/O (e.g. psycopg2 queries) that does **not** + touch shared in-process mutable state such as ``ISOProber._stats``, + ``WG21Index.papers``, or other source internals. Each call should use its + own database connection from the pool. + + See ``docs/architecture.md`` for the full concurrency model. 
+ """ + return await asyncio.to_thread(fn, *args, **kwargs) diff --git a/src/paperscout/models.py b/src/paperscout/models.py index 948bc71..cfdcc40 100644 --- a/src/paperscout/models.py +++ b/src/paperscout/models.py @@ -171,8 +171,9 @@ class ProbeHit: tier: Tier front_text: str = "" last_modified: datetime | None = field(default=None) - # True when Last-Modified is within alert_modified_hours of now, - # or when the header is absent (first-ever discovery of a new file). + # True when Last-Modified is within alert_modified_hours of now, when the + # header is absent, or when the header is present but unusable (first-ever + # discovery or bad Last-Modified — both treated as recent for alerting). is_recent: bool = False diff --git a/src/paperscout/monitor.py b/src/paperscout/monitor.py index 4f5f849..c4180a2 100644 --- a/src/paperscout/monitor.py +++ b/src/paperscout/monitor.py @@ -11,6 +11,7 @@ import httpx +from .concurrency import run_blocking_io from .config import Settings, settings from .errors import ConfigurationError, FailureCategory from .models import Paper, PerUserMatches, ProbeHit @@ -150,7 +151,7 @@ async def poll_once(self) -> PollResult: if not self._seeded: await self.seed() self._last_successful_poll = time.time() - self._last_probe_stats = dict(self.prober._stats) + self._last_probe_stats = self.prober.snapshot_stats() return PollResult( diff=DiffResult(new_papers=[], updated_papers=[]), probe_hits=[], @@ -228,9 +229,10 @@ async def poll_once(self) -> PollResult: ) break - # Dispatched to a thread because matches_for_users performs synchronous - # PostgreSQL I/O via psycopg2, which would block the event loop. - per_user_matches = await asyncio.to_thread( + # Safe to run off the event loop: matches_for_users only performs blocking + # PostgreSQL I/O via psycopg2 on its own pool connection — it does not + # touch ISOProber._stats, WG21Index.papers, or other shared source state. 
+ per_user_matches = await run_blocking_io( self.user_watchlist.matches_for_users, diff.new_papers, recent_hits, @@ -268,7 +270,7 @@ async def poll_once(self) -> PollResult: len(per_user_matches), ) self._last_successful_poll = time.time() - self._last_probe_stats = dict(self.prober._stats) + self._last_probe_stats = self.prober.snapshot_stats() return result async def run_forever(self) -> None: diff --git a/src/paperscout/sources.py b/src/paperscout/sources.py index 34fda23..65e8bc7 100644 --- a/src/paperscout/sources.py +++ b/src/paperscout/sources.py @@ -3,8 +3,10 @@ from __future__ import annotations import asyncio +import json import logging import re +import threading import time from collections.abc import Iterable, Mapping from dataclasses import dataclass @@ -43,6 +45,7 @@ class WG21Index: def __init__(self, pool, cfg: Settings | None = None): self._cfg = cfg or settings self._cache = PaperCache(pool, ttl_hours=self._cfg.cache_ttl_hours) + # Replaced wholesale on every refresh(); never mutate in place. self.papers: dict[str, Paper] = {} self._max_rev: dict[int, int] = {} # P-number -> highest revision self._max_p: int = 0 # absolute highest P-number @@ -256,7 +259,11 @@ async def _fetch_front_text( class ISOProber: - """Async HEAD probe of isocpp draft URLs: hot every cycle, cold in rotating slices.""" + """Async HEAD probe of isocpp draft URLs: hot every cycle, cold in rotating slices. + + Designed for single-threaded async use on the event loop. Do not call from + threads, ``asyncio.to_thread()``, or thread-pool executors. + """ # Keys that _stats is reset to at the start of every run_cycle(). 
_STATS_TEMPLATE: dict[str, int] = { @@ -265,7 +272,7 @@ class ISOProber: "miss": 0, # server returned non-200 "hit_recent": 0, # 200 + Last-Modified within alert window "hit_old": 0, # 200 + Last-Modified outside alert window - "hit_no_lm": 0, # 200 + no Last-Modified header (treated as recent) + "hit_no_lm": 0, # 200 + no or unusable Last-Modified (treated as recent) "error": 0, # httpx / network exception } @@ -281,14 +288,31 @@ def __init__( self.user_watchlist = user_watchlist self.cfg = cfg or settings self._cycle = 0 + self._stats_lock = threading.Lock() + # This dict is mutated from async coroutines on the event loop. + # Thread-safety depends on asyncio cooperative scheduling. Do NOT access + # from asyncio.to_thread() or thread pool executors. self._stats: dict[str, int] = dict(self._STATS_TEMPLATE) + def _bump_stat(self, key: str, n: int = 1) -> None: + with self._stats_lock: + self._stats[key] += n + + def _reset_stats(self) -> None: + with self._stats_lock: + self._stats = dict(self._STATS_TEMPLATE) + + def snapshot_stats(self) -> dict[str, int]: + """Return a copy of per-cycle probe counters (lock-protected).""" + with self._stats_lock: + return dict(self._stats) + # ── Public API ─────────────────────────────────────────────────────────── async def run_cycle(self) -> list[ProbeHit]: """HEAD all scheduled URLs; return recent hits and persist discovery state.""" self._cycle += 1 - self._stats = dict(self._STATS_TEMPLATE) + self._reset_stats() t0 = time.monotonic() urls = self._build_probe_list() @@ -333,7 +357,7 @@ async def run_cycle(self) -> list[ProbeHit]: self.state.save() elapsed = time.monotonic() - t0 - s = self._stats + s = self.snapshot_stats() hit_total = s["hit_recent"] + s["hit_old"] + s["hit_no_lm"] log.info( "PROBE-DONE cycle=%d elapsed=%.1fs total=%d " @@ -351,6 +375,26 @@ async def run_cycle(self) -> list[ProbeHit]: s["skipped_in_index"], s["error"], ) + log.info( + "PROBE-CYCLE-SUMMARY %s", + json.dumps( + { + "cycle": self._cycle, + 
"cycle_requests": len(urls), + "cycle_duration_s": round(elapsed, 2), + "hot_probes": hot_count, + "cold_probes": cold_count, + "errors": s["error"], + "hit_total": hit_total, + "hit_recent": s["hit_recent"], + "hit_old": s["hit_old"], + "hit_no_lm": s["hit_no_lm"], + "miss": s["miss"], + "skipped_discovered": s["skipped_discovered"], + "skipped_in_index": s["skipped_in_index"], + } + ), + ) return hits # ── Probe-list builders ────────────────────────────────────────────────── @@ -492,13 +536,13 @@ async def _probe_one( ) -> ProbeHit | None: if self.state.is_discovered(url): log.debug("SKIP disc %s", url) - self._stats["skipped_discovered"] += 1 + self._bump_stat("skipped_discovered") return None paper_id = f"{prefix}{num:04d}R{rev}" ids = known_ids if known_ids is not None else self.index.get_known_paper_ids() if paper_id in ids: log.debug("SKIP idx %s", paper_id) - self._stats["skipped_in_index"] += 1 + self._bump_stat("skipped_in_index") return None async with sem: _max_retries = 3 @@ -521,14 +565,14 @@ async def _probe_one( url, exc, ) - self._stats["error"] += 1 + self._bump_stat("error") return None else: return None if resp.status_code != 200: log.debug("MISS %d %s", resp.status_code, url) - self._stats["miss"] += 1 + self._bump_stat("miss") return None # Determine recency from the Last-Modified response header. @@ -538,10 +582,15 @@ async def _probe_one( if lm_str: try: last_modified = parsedate_to_datetime(lm_str) + # Naive datetimes from parsedate_to_datetime are UTC. + if last_modified.tzinfo is None: + last_modified = last_modified.replace(tzinfo=timezone.utc) threshold = timedelta(hours=self.cfg.alert_modified_hours) is_recent = (datetime.now(timezone.utc) - last_modified) <= threshold except Exception: - pass + # Bad Last-Modified: merge with no-LM so we don't silently drop hits. + last_modified = None + is_recent = True else: # No Last-Modified: first-ever discovery of an untracked # file; treat as recent so we don't silently drop it. 
@@ -557,11 +606,11 @@ async def _probe_one( ) if is_recent and last_modified is not None: - self._stats["hit_recent"] += 1 + self._bump_stat("hit_recent") elif not is_recent: - self._stats["hit_old"] += 1 + self._bump_stat("hit_old") else: - self._stats["hit_no_lm"] += 1 + self._bump_stat("hit_no_lm") # Only fetch front text when we intend to alert. front_text = "" diff --git a/tests/test_sources.py b/tests/test_sources.py index 97bbfa8..d39cb30 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -756,20 +756,40 @@ def test_cold_known_skips_when_latest_none(self, fake_pool): assert 5 in cold_nums # normally probed async def test_probe_one_bad_last_modified_header(self, fake_pool): - """An unparsable Last-Modified header should not crash; is_recent stays False.""" + """Unparsable Last-Modified is treated like no-LM (recent, no silent drop).""" prober, _, _ = self._make_prober(fake_pool) url = "https://isocpp.org/files/papers/D9999R0.pdf" sem = asyncio.Semaphore(5) head_resp = MagicMock() head_resp.status_code = 200 head_resp.headers = {"last-modified": "this is not a date"} - client = AsyncMock() - client.head = AsyncMock(return_value=head_resp) + get_resp = _make_response(200, text="

x

") + client = _make_async_client(head_resp=head_resp, get_resp=get_resp) result = await prober._probe_one(client, sem, url, "D", 9999, 0, ".pdf", "cold") assert result is not None - assert result.is_recent is False - # No front_text GET for non-recent - client.get.assert_not_called() + assert result.is_recent is True + assert result.last_modified is None + assert prober._stats["hit_no_lm"] == 1 + client.get.assert_called() + + async def test_probe_one_naive_last_modified_recent(self, fake_pool): + """Naive-but-parseable Last-Modified within alert window counts as recent.""" + prober, _, _ = self._make_prober(fake_pool) + url = "https://isocpp.org/files/papers/D9992R0.pdf" + sem = asyncio.Semaphore(5) + naive_lm = datetime.now().replace(tzinfo=None) + lm_header = naive_lm.strftime("%a, %d %b %Y %H:%M:%S") + head_resp = MagicMock() + head_resp.status_code = 200 + head_resp.headers = {"last-modified": lm_header} + get_resp = _make_response(200, text="

x

") + client = _make_async_client(head_resp=head_resp, get_resp=get_resp) + result = await prober._probe_one(client, sem, url, "D", 9992, 0, ".pdf", "hot") + assert result is not None + assert result.is_recent is True + assert result.last_modified is not None + assert result.last_modified.tzinfo == timezone.utc + assert prober._stats["hit_recent"] == 1 def test_cold_excludes_hot_numbers(self, fake_pool): prober, index, _ = self._make_prober( @@ -1112,6 +1132,45 @@ async def mock_head(url, **_): assert old_hits[0].is_recent is False assert state.is_discovered(hit_url) + async def test_run_cycle_stats_integrity_under_concurrency(self, fake_pool): + """Every probed URL accounts for exactly one _stats bucket under concurrent gather.""" + index = WG21Index(fake_pool) + index._max_p = 100 + index._max_rev = {99: 0, 100: 0} + index._sorted_p_nums = [99, 100] + state = ProbeState(fake_pool) + cfg = make_test_settings( + hot_lookback_months=0, + hot_revision_depth=1, + frontier_window_above=0, + frontier_window_below=0, + gap_max_rev=0, + cold_cycle_divisor=100, + http_concurrency=10, + ) + prober = ISOProber(index, state, user_watchlist=_mock_wl([9999]), cfg=cfg) + + mock_client = _make_async_client(head_resp=_make_response(404)) + + with patch("paperscout.sources.httpx.AsyncClient") as mock_cls: + mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) + await prober.run_cycle() + + urls = prober._build_probe_list() + s = prober.snapshot_stats() + accounted = ( + s["skipped_discovered"] + + s["skipped_in_index"] + + s["miss"] + + s["hit_recent"] + + s["hit_old"] + + s["hit_no_lm"] + + s["error"] + ) + assert accounted == len(urls) + assert s["miss"] == len(urls) # all 404 in this mock + # ── open-std.org scraper ───────────────────────────────────────────────────── From b9d6ed55f5c271dcae1e07bef043c2cc54fdb7d7 Mon Sep 17 00:00:00 2001 From: mac Date: Tue, 19 May 2026 23:45:21 +0800 Subject: 
[PATCH 2/4] added docs too --- docs/architecture.md | 36 +++++++++++++++++++++ docs/probe-operations.md | 70 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 docs/architecture.md create mode 100644 docs/probe-operations.md diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 0000000..719c64b --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,36 @@ +# PaperScout architecture — concurrency + +This document describes what runs on the asyncio event loop, what runs in threads, and how to add new data sources without introducing races. + +## Event loop (async, cooperative single-thread) + +These components share one thread and the main event loop. They may await I/O but must not block the loop with synchronous work: + +- **`Scheduler.run_forever` / `poll_once`** — orchestrates index refresh, probing, and notifications. +- **`WG21Index.refresh`** — fetches and parses the wg21.link index (httpx async). +- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client. +- **Slack Bolt handlers** — exception: these run on Bolt’s own thread, not on the event loop, so they must not read mutable source state directly (use snapshots or health callbacks). + +`ISOProber._stats` is updated from many coroutines in one `run_cycle()`. This is safe on the event loop because asyncio never preempts between awaits. A `threading.Lock` guards `_stats` as defense-in-depth if code is ever called from a worker thread by mistake. + +## Threads + +| Thread | Role | +|--------|------| +| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler snapshot fields. | +| **MessageQueue sender** (`scout.py`) | Drains Slack post queue with rate limiting. | +| **`run_blocking_io` / `asyncio.to_thread`** | Runs blocking psycopg2 calls (e.g. `UserWatchlist.matches_for_users`) off the loop. | + +## Concurrency rules + +When adding or changing code: + +1. 
**Use `run_blocking_io()` (or `asyncio.to_thread`) only for pure blocking I/O** with no shared in-process mutable state. The function should use its own DB connection from the pool. +2. **Never access `ISOProber._stats`, `WG21Index.papers`, `WG21Index._max_rev`, or other source internals from a thread.** Read them only on the event loop, or use lock-protected snapshots (`snapshot_stats()`). +3. **`WG21Index.papers` is replaced wholesale on every `refresh()`** — do not mutate the dict in place. Assign a new dict from `_parse_and_index()`. +4. **New HTTP data sources** should follow the async pattern (`httpx` + coroutines on the loop), like `WG21Index` and `ISOProber`. The optional open-std.org scraper in `sources.py` is a future extension point: if integrated, either keep it async on the loop or isolate it in a thread with no shared mutable state. + +## Related docs + +- [probe-operations.md](probe-operations.md) — production probe volume, tuning, troubleshooting. +- [probe-performance.md](probe-performance.md) — synthetic CI benchmark (mock server, not isocpp.org). diff --git a/docs/probe-operations.md b/docs/probe-operations.md new file mode 100644 index 0000000..f97c00c --- /dev/null +++ b/docs/probe-operations.md @@ -0,0 +1,70 @@ +# ISO probe operations guide + +This document describes the **production** isocpp.org HEAD probe cycle: normal volume, timing, configuration, and what to do when something degrades. For the **synthetic CI benchmark** (mock server), see [probe-performance.md](probe-performance.md). 
+ +## Normal envelope (default settings) + +At default configuration (~4,000 active P-numbers, `HTTP_CONCURRENCY=20`, 30-minute poll interval): + +| Metric | Typical value | +|--------|----------------| +| HEAD requests per cycle | **~1,600–2,000** | +| Wall-clock duration | **~8–10 s** (depends on isocpp.org latency) | +| Hot probes | Watchlist + frontier window + recent index papers — every cycle | +| Cold probes | **1 / `COLD_CYCLE_DIVISOR`** of the cold pool per cycle (default 48 → full tail in ~24 h) | + +Each cycle logs **`PROBE-START`** and **`PROBE-DONE`** (human-readable) plus **`PROBE-CYCLE-SUMMARY`** (JSON); the JSON line carries `cycle_requests`, `hot_probes`, `cold_probes`, `cycle_duration_s`, and per-outcome counts. + +Hot/cold split is **not a fixed ratio** — it varies with frontier size, watchlist, and index dates. Use `hot_probes` / `cold_probes` in the summary line to see the split for a given cycle. + +## Key settings + +> **Note:** Some issue templates refer to `POLL_INTERVAL_SECONDS`; the actual environment variable is **`POLL_INTERVAL_MINUTES`**. + +| Variable | Default | Operational effect | Recommended range | +|----------|---------|-------------------|-------------------| +| `HTTP_CONCURRENCY` | `20` | Max concurrent HEAD requests (semaphore) | **10–20**; lower if you see 429s or high `errors` | +| `POLL_INTERVAL_MINUTES` | `30` | Target time between poll cycles | **30** default; **increase** if cycles routinely overrun the interval | +| `POLL_OVERRUN_COOLDOWN_SECONDS` | `300` | Minimum sleep after a cycle longer than the interval | **300** default; prevents tight loops when work backs up | +| `COLD_CYCLE_DIVISOR` | `48` | Cold pool sliced across N cycles (48 × 30 min ≈ 24 h) | Raise to spread load; lower for faster cold coverage | +| `ALERT_MODIFIED_HOURS` | `24` | Alert only on hits whose `Last-Modified` falls within this many hours; hits with a missing or unparsable `Last-Modified` are also treated as recent and alerted | — | + +See [`.env.example`](../.env.example) and the README env tables for frontier, hot, and cold tuning (`FRONTIER_WINDOW_*`, `HOT_LOOKBACK_MONTHS`, etc.). 
+ +## Degradation signals + +| Signal | What it means | +|--------|----------------| +| `cycle_duration_s` > `POLL_INTERVAL_MINUTES` × 60 | Cycle **overrun** — scheduler applies `POLL_OVERRUN_COOLDOWN_SECONDS` before the next poll | +| No successful poll for **> 2 × `POLL_INTERVAL_MINUTES`** | Stale-poll ops alert (see `monitor.py`) — treat as incident | +| `errors / cycle_requests` **> 5%** for 2+ consecutive cycles | Network or upstream outage — check logs for `PROBE-ERR` | +| HTTP **429** from isocpp.org | Rate limiting — reduce concurrency and/or widen poll interval | + +Parse **`PROBE-CYCLE-SUMMARY`** from logs (grep or log aggregator) for machine-readable fields: `hit_total`, `miss`, `errors`, `skipped_discovered`, `skipped_in_index`. + +## What to do if… + +### Cycle takes longer than the poll interval + +1. Grep logs for `PROBE-CYCLE-SUMMARY` — note `cycle_duration_s` and `cycle_requests`. +2. If duration tracks request count, isocpp.org may be slow; consider lowering `HTTP_CONCURRENCY` slightly (less burst) or increasing `POLL_INTERVAL_MINUTES`. +3. Confirm `POLL_OVERRUN_COOLDOWN_SECONDS` is in effect (see `SCHEDULER-SLEEP` lines). + +### Error rate exceeds 5% + +1. Compute `errors / cycle_requests` from the last few `PROBE-CYCLE-SUMMARY` lines. +2. If sustained, check network and isocpp.org status; avoid raising `HTTP_CONCURRENCY` until errors drop. +3. Review `PROBE-ERR` debug lines for `failure_category` (timeout vs network). + +### isocpp.org returns 429 + +1. Halve `HTTP_CONCURRENCY` (e.g. 20 → 10). +2. Increase `POLL_INTERVAL_MINUTES` (e.g. 30 → 45) to reduce sustained request rate. +3. Monitor `errors` and 429 patterns for several cycles before tuning back up. 
+ +## Related documentation + +- [README — Two-Frequency Probing Strategy](../README.md#two-frequency-probing-strategy) +- [handoff.md](handoff.md) — design rationale +- [architecture.md](architecture.md) — concurrency model +- [probe-performance.md](probe-performance.md) — CI benchmark and regression gate From 86b4d2274233e17bd7f2ef0ca7817eada8cd438c Mon Sep 17 00:00:00 2001 From: mac Date: Tue, 19 May 2026 23:59:55 +0800 Subject: [PATCH 3/4] addressed a AI review --- src/paperscout/sources.py | 41 ++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/src/paperscout/sources.py b/src/paperscout/sources.py index 65e8bc7..b8948f7 100644 --- a/src/paperscout/sources.py +++ b/src/paperscout/sources.py @@ -317,7 +317,9 @@ async def run_cycle(self) -> list[ProbeHit]: urls = self._build_probe_list() known_ids = self.index.get_known_paper_ids() - hot_count = sum(1 for u in urls if u[1] in (Tier.WATCHLIST, Tier.FRONTIER, Tier.RECENT)) + hot_count = sum( + 1 for u in urls if u[1] in (Tier.WATCHLIST, Tier.FRONTIER, Tier.RECENT) + ) cold_count = sum(1 for u in urls if u[1] == Tier.COLD) slice_idx = (self._cycle - 1) % self.cfg.cold_cycle_divisor log.info( @@ -339,7 +341,9 @@ async def run_cycle(self) -> list[ProbeHit]: follow_redirects=True, ) as client: tasks = [ - self._probe_one(client, sem, url, prefix, num, rev, ext, tier, known_ids) + self._probe_one( + client, sem, url, prefix, num, rev, ext, tier, known_ids + ) for url, tier, prefix, num, rev, ext in urls ] results = await asyncio.gather(*tasks, return_exceptions=True) @@ -405,9 +409,9 @@ def _build_probe_list(self) -> list[_Entry]: extra_p_numbers=self.state.paper_nums_from_discovered_iso_urls(), ) hot_known, hot_unknown = self._hot_numbers(frontier) - return self._build_hot_list(frontier, hot_known, hot_unknown) + self._build_cold_slice( - self._cycle, frontier, hot_known, hot_unknown - ) + return self._build_hot_list( + frontier, hot_known, hot_unknown + ) + 
self._build_cold_slice(self._cycle, frontier, hot_known, hot_unknown) def _hot_numbers(self, frontier: int) -> tuple[set[int], set[int]]: """Return (known_hot, unknown_hot) P-number sets to probe every cycle.""" @@ -425,9 +429,16 @@ def _hot_numbers(self, frontier: int) -> tuple[set[int], set[int]]: # Recently active papers if self.cfg.hot_lookback_months > 0: - cutoff = date.today() - timedelta(days=int(self.cfg.hot_lookback_months * 30.44)) + cutoff = date.today() - timedelta( + days=int(self.cfg.hot_lookback_months * 30.44) + ) for p in self.index.get_papers_snapshot().values(): - if p.prefix != "P" or p.number is None or not p.date or p.date == "unknown": + if ( + p.prefix != "P" + or p.number is None + or not p.date + or p.date == "unknown" + ): continue try: if date.fromisoformat(p.date[:10]) >= cutoff: @@ -438,7 +449,9 @@ def _hot_numbers(self, frontier: int) -> tuple[set[int], set[int]]: known_p_nums = self.index.known_p_numbers() return hot & known_p_nums, hot - known_p_nums - def _tier_label(self, num: int, watchlist_set: set[int], frontier_range: set[int]) -> Tier: + def _tier_label( + self, num: int, watchlist_set: set[int], frontier_range: set[int] + ) -> Tier: if num in watchlist_set: return Tier.WATCHLIST if num in frontier_range: @@ -586,8 +599,10 @@ async def _probe_one( if last_modified.tzinfo is None: last_modified = last_modified.replace(tzinfo=timezone.utc) threshold = timedelta(hours=self.cfg.alert_modified_hours) - is_recent = (datetime.now(timezone.utc) - last_modified) <= threshold - except Exception: + is_recent = ( + datetime.now(timezone.utc) - last_modified + ) <= threshold + except (TypeError, ValueError): # Bad Last-Modified: merge with no-LM so we don't silently drop hits. last_modified = None is_recent = True @@ -596,7 +611,11 @@ async def _probe_one( # file; treat as recent so we don't silently drop it. 
is_recent = True - lm_display = last_modified.strftime("%Y-%m-%d %H:%M UTC") if last_modified else "no-lm" + lm_display = ( + last_modified.strftime("%Y-%m-%d %H:%M UTC") + if last_modified + else "no-lm" + ) log.info( "HIT tier=%-10s recent=%-5s lm=%-20s %s", tier, From f4d9faae88249e78144264f83f303ab17719d113 Mon Sep 17 00:00:00 2001 From: mac Date: Wed, 20 May 2026 00:01:46 +0800 Subject: [PATCH 4/4] fixed lint error --- src/paperscout/sources.py | 39 ++++++++++----------------------------- 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/src/paperscout/sources.py b/src/paperscout/sources.py index b8948f7..50dfc3b 100644 --- a/src/paperscout/sources.py +++ b/src/paperscout/sources.py @@ -317,9 +317,7 @@ async def run_cycle(self) -> list[ProbeHit]: urls = self._build_probe_list() known_ids = self.index.get_known_paper_ids() - hot_count = sum( - 1 for u in urls if u[1] in (Tier.WATCHLIST, Tier.FRONTIER, Tier.RECENT) - ) + hot_count = sum(1 for u in urls if u[1] in (Tier.WATCHLIST, Tier.FRONTIER, Tier.RECENT)) cold_count = sum(1 for u in urls if u[1] == Tier.COLD) slice_idx = (self._cycle - 1) % self.cfg.cold_cycle_divisor log.info( @@ -341,9 +339,7 @@ async def run_cycle(self) -> list[ProbeHit]: follow_redirects=True, ) as client: tasks = [ - self._probe_one( - client, sem, url, prefix, num, rev, ext, tier, known_ids - ) + self._probe_one(client, sem, url, prefix, num, rev, ext, tier, known_ids) for url, tier, prefix, num, rev, ext in urls ] results = await asyncio.gather(*tasks, return_exceptions=True) @@ -409,9 +405,9 @@ def _build_probe_list(self) -> list[_Entry]: extra_p_numbers=self.state.paper_nums_from_discovered_iso_urls(), ) hot_known, hot_unknown = self._hot_numbers(frontier) - return self._build_hot_list( - frontier, hot_known, hot_unknown - ) + self._build_cold_slice(self._cycle, frontier, hot_known, hot_unknown) + return self._build_hot_list(frontier, hot_known, hot_unknown) + self._build_cold_slice( + self._cycle, frontier, hot_known, 
hot_unknown + ) def _hot_numbers(self, frontier: int) -> tuple[set[int], set[int]]: """Return (known_hot, unknown_hot) P-number sets to probe every cycle.""" @@ -429,16 +425,9 @@ def _hot_numbers(self, frontier: int) -> tuple[set[int], set[int]]: # Recently active papers if self.cfg.hot_lookback_months > 0: - cutoff = date.today() - timedelta( - days=int(self.cfg.hot_lookback_months * 30.44) - ) + cutoff = date.today() - timedelta(days=int(self.cfg.hot_lookback_months * 30.44)) for p in self.index.get_papers_snapshot().values(): - if ( - p.prefix != "P" - or p.number is None - or not p.date - or p.date == "unknown" - ): + if p.prefix != "P" or p.number is None or not p.date or p.date == "unknown": continue try: if date.fromisoformat(p.date[:10]) >= cutoff: @@ -449,9 +438,7 @@ def _hot_numbers(self, frontier: int) -> tuple[set[int], set[int]]: known_p_nums = self.index.known_p_numbers() return hot & known_p_nums, hot - known_p_nums - def _tier_label( - self, num: int, watchlist_set: set[int], frontier_range: set[int] - ) -> Tier: + def _tier_label(self, num: int, watchlist_set: set[int], frontier_range: set[int]) -> Tier: if num in watchlist_set: return Tier.WATCHLIST if num in frontier_range: @@ -599,9 +586,7 @@ async def _probe_one( if last_modified.tzinfo is None: last_modified = last_modified.replace(tzinfo=timezone.utc) threshold = timedelta(hours=self.cfg.alert_modified_hours) - is_recent = ( - datetime.now(timezone.utc) - last_modified - ) <= threshold + is_recent = (datetime.now(timezone.utc) - last_modified) <= threshold except (TypeError, ValueError): # Bad Last-Modified: merge with no-LM so we don't silently drop hits. last_modified = None @@ -611,11 +596,7 @@ async def _probe_one( # file; treat as recent so we don't silently drop it. 
is_recent = True - lm_display = ( - last_modified.strftime("%Y-%m-%d %H:%M UTC") - if last_modified - else "no-lm" - ) + lm_display = last_modified.strftime("%Y-%m-%d %H:%M UTC") if last_modified else "no-lm" log.info( "HIT tier=%-10s recent=%-5s lm=%-20s %s", tier,