Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ Thank you for your interest in improving paperscout. This document describes how

- **[docs/onboarding.md](docs/onboarding.md)** — clone, database, `.env`, tests, and running the app locally.
- **[docs/handoff.md](docs/handoff.md)** — maintainer-oriented design notes and operational gotchas.
- **[docs/architecture.md](docs/architecture.md)** — concurrency model (event loop vs threads).
- **[docs/probe-operations.md](docs/probe-operations.md)** — production probe volume, tuning, troubleshooting.
- **[README.md](README.md)** — product behavior, Slack setup, deployment, and environment variable tables.

### Concurrency

When adding blocking I/O or new data sources:

1. Use **`run_blocking_io()`** (see `src/paperscout/concurrency.py`) only for pure blocking I/O with **no shared in-process mutable state** (e.g. psycopg2 via the connection pool).
2. **Never access `ISOProber._stats`, `WG21Index.papers`, or other source internals from a thread** — keep probes and index refresh on the asyncio event loop.
3. See **[docs/architecture.md](docs/architecture.md)** for the full model and **[docs/probe-operations.md](docs/probe-operations.md)** for probe tuning.

## Workflow

1. **Fork** the repository (if you lack direct push access) and **clone** your fork.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ Every P-number from 1 to the effective frontier is probed. Numbers are divided i
| **Cold** (1/48 per cycle ≈ daily) | All other P-numbers | everything else | D-prefix, latest+1, pdf+html |
| **Cold** (1/48 per cycle) | Gap numbers (no index entry) | 1..frontier minus known | D+P, R0..R1, pdf+html |

Typical per-cycle request count: **~1,600–2,000 HEAD requests** (~8–10 s at 20 concurrent, 100 ms latency). A full sweep of all ~4,000 P-numbers completes within ~24 h of continuous 30-min polling.
Typical per-cycle request count: **~1,600–2,000 HEAD requests** (~8–10 s at 20 concurrent, 100 ms latency). A full sweep of all ~4,000 P-numbers completes within ~24 h of continuous 30-min polling. See **[docs/probe-operations.md](docs/probe-operations.md)** for operational thresholds, log fields, and troubleshooting.

### Alerting by Last-Modified

Expand Down
36 changes: 36 additions & 0 deletions docs/architecture.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# PaperScout architecture — concurrency

This document describes what runs on the asyncio event loop, what runs in threads, and how to add new data sources without introducing races.

## Event loop (async, cooperative single-thread)

These components share one thread and the main event loop. They may await I/O but must not block the loop with synchronous work:

- **`Scheduler.run_forever` / `poll_once`** — orchestrates index refresh, probing, and notifications.
- **`WG21Index.refresh`** — fetches and parses wg21.link index (httpx async).
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client.
- **Slack Bolt handlers** — run on Bolt’s thread; they should not read mutable source state directly (use snapshots or health callbacks).

`ISOProber._stats` is updated from many coroutines in one `run_cycle()`. This is safe on the event loop because asyncio never preempts between awaits. A `threading.Lock` guards `_stats` as defense-in-depth if code is ever called from a worker thread by mistake.

## Threads

| Thread | Role |
|--------|------|
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler snapshot fields. |
| **MessageQueue sender** (`scout.py`) | Drains Slack post queue with rate limiting. |
| **`run_blocking_io` / `asyncio.to_thread`** | Runs blocking psycopg2 calls (e.g. `UserWatchlist.matches_for_users`) off the loop. |

## Concurrency rules

When adding or changing code:

1. **Use `run_blocking_io()` (or `asyncio.to_thread`) only for pure blocking I/O** with no shared in-process mutable state. The function should use its own DB connection from the pool.
2. **Never access `ISOProber._stats`, `WG21Index.papers`, `WG21Index._max_rev`, or other source internals from a thread.** Read them only on the event loop, or use lock-protected snapshots (`snapshot_stats()`).
3. **`WG21Index.papers` is replaced wholesale on every `refresh()`** — do not mutate the dict in place. Assign a new dict from `_parse_and_index()`.
4. **New HTTP data sources** should follow the async pattern (`httpx` + coroutines on the loop), like `WG21Index` and `ISOProber`. The optional open-std.org scraper in `sources.py` is a future extension point: if integrated, either keep it async on the loop or isolate it in a thread with no shared mutable state.

## Related docs

- [probe-operations.md](probe-operations.md) — production probe volume, tuning, troubleshooting.
- [probe-performance.md](probe-performance.md) — synthetic CI benchmark (mock server, not isocpp.org).
70 changes: 70 additions & 0 deletions docs/probe-operations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# ISO probe operations guide

This document describes the **production** isocpp.org HEAD probe cycle: normal volume, timing, configuration, and what to do when something degrades. For the **synthetic CI benchmark** (mock server), see [probe-performance.md](probe-performance.md).

## Normal envelope (default settings)

At default configuration (~4,000 active P-numbers, `HTTP_CONCURRENCY=20`, 30-minute poll interval):

| Metric | Typical value |
|--------|----------------|
| HEAD requests per cycle | **~1,600–2,000** |
| Wall-clock duration | **~8–10 s** (depends on isocpp.org latency) |
| Hot probes | Watchlist + frontier window + recent index papers — every cycle |
| Cold probes | **1 / `COLD_CYCLE_DIVISOR`** of the cold pool per cycle (default 48 → full tail in ~24 h) |

Each cycle logs **`PROBE-START`** (human-readable) and **`PROBE-CYCLE-SUMMARY`** (JSON) with `cycle_requests`, `hot_probes`, `cold_probes`, `cycle_duration_s`, and per-outcome counts.

Hot/cold split is **not a fixed ratio** — it varies with frontier size, watchlist, and index dates. Use `hot_probes` / `cold_probes` in the summary line to see the split for a given cycle.

## Key settings

> **Note:** Some issue templates refer to `POLL_INTERVAL_SECONDS`; the actual environment variable is **`POLL_INTERVAL_MINUTES`**.

| Variable | Default | Operational effect | Recommended range |
|----------|---------|-------------------|-------------------|
| `HTTP_CONCURRENCY` | `20` | Max concurrent HEAD requests (semaphore) | **10–20**; lower if you see 429s or high `errors` |
| `POLL_INTERVAL_MINUTES` | `30` | Target time between poll cycles | **30** default; **increase** if cycles routinely overrun the interval |
| `POLL_OVERRUN_COOLDOWN_SECONDS` | `300` | Minimum sleep after a cycle longer than the interval | **300** default; prevents tight loops when work backs up |
| `COLD_CYCLE_DIVISOR` | `48` | Cold pool sliced across N cycles (48 × 30 min ≈ 24 h) | Raise to spread load; lower for faster cold coverage |
| `ALERT_MODIFIED_HOURS` | `24` | Only alert on hits with recent `Last-Modified` | — |

See [`.env.example`](../.env.example) and the README env tables for frontier, hot, and cold tuning (`FRONTIER_WINDOW_*`, `HOT_LOOKBACK_MONTHS`, etc.).

## Degradation signals

| Signal | What it means |
|--------|----------------|
| `cycle_duration_s` > `POLL_INTERVAL_MINUTES` × 60 | Cycle **overrun** — scheduler applies `POLL_OVERRUN_COOLDOWN_SECONDS` before the next poll |
| No successful poll for **> 2 × `POLL_INTERVAL_MINUTES`** | Stale-poll ops alert (see `monitor.py`) — treat as incident |
| `errors / cycle_requests` **> 5%** for 2+ consecutive cycles | Network or upstream outage — check logs for `PROBE-ERR` |
| HTTP **429** from isocpp.org | Rate limiting — reduce concurrency and/or widen poll interval |

Parse **`PROBE-CYCLE-SUMMARY`** from logs (grep or log aggregator) for machine-readable fields: `hit_total`, `miss`, `errors`, `skipped_discovered`, `skipped_in_index`.

## What to do if…

### Cycle takes longer than the poll interval

1. Grep logs for `PROBE-CYCLE-SUMMARY` — note `cycle_duration_s` and `cycle_requests`.
2. If duration tracks request count, isocpp.org may be slow; consider lowering `HTTP_CONCURRENCY` slightly (less burst) or increasing `POLL_INTERVAL_MINUTES`.
3. Confirm `POLL_OVERRUN_COOLDOWN_SECONDS` is in effect (see `SCHEDULER-SLEEP` lines).

### Error rate exceeds 5%

1. Compute `errors / cycle_requests` from the last few `PROBE-CYCLE-SUMMARY` lines.
2. If sustained, check network and isocpp.org status; avoid raising `HTTP_CONCURRENCY` until errors drop.
3. Review `PROBE-ERR` debug lines for `failure_category` (timeout vs network).

### isocpp.org returns 429

1. Halve `HTTP_CONCURRENCY` (e.g. 20 → 10).
2. Increase `POLL_INTERVAL_MINUTES` (e.g. 30 → 45) to reduce sustained request rate.
3. Monitor `errors` and 429 patterns for several cycles before tuning back up.

## Related documentation

- [README — Two-Frequency Probing Strategy](../README.md#two-frequency-probing-strategy)
- [handoff.md](handoff.md) — design rationale
- [architecture.md](architecture.md) — concurrency model
- [probe-performance.md](probe-performance.md) — CI benchmark and regression gate
22 changes: 22 additions & 0 deletions src/paperscout/concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Helpers for running blocking I/O off the asyncio event loop."""

from __future__ import annotations

import asyncio
from collections.abc import Callable
from typing import TypeVar

_T = TypeVar("_T")


async def run_blocking_io(fn: Callable[..., _T], /, *args, **kwargs) -> _T:
"""Run *fn* in a worker thread without blocking the event loop.

Use only for pure blocking I/O (e.g. psycopg2 queries) that does **not**
touch shared in-process mutable state such as ``ISOProber._stats``,
``WG21Index.papers``, or other source internals. Each call should use its
own database connection from the pool.

See ``docs/architecture.md`` for the full concurrency model.
"""
return await asyncio.to_thread(fn, *args, **kwargs)
5 changes: 3 additions & 2 deletions src/paperscout/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ class ProbeHit:
tier: Tier
front_text: str = ""
last_modified: datetime | None = field(default=None)
# True when Last-Modified is within alert_modified_hours of now,
# or when the header is absent (first-ever discovery of a new file).
# True when Last-Modified is within alert_modified_hours of now, when the
# header is absent, or when the header is present but unusable (first-ever
# discovery or bad Last-Modified — both treated as recent for alerting).
is_recent: bool = False


Expand Down
12 changes: 7 additions & 5 deletions src/paperscout/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import httpx

from .concurrency import run_blocking_io
from .config import Settings, settings
from .errors import ConfigurationError, FailureCategory
from .models import Paper, PerUserMatches, ProbeHit
Expand Down Expand Up @@ -150,7 +151,7 @@ async def poll_once(self) -> PollResult:
if not self._seeded:
await self.seed()
self._last_successful_poll = time.time()
self._last_probe_stats = dict(self.prober._stats)
self._last_probe_stats = self.prober.snapshot_stats()
return PollResult(
diff=DiffResult(new_papers=[], updated_papers=[]),
probe_hits=[],
Expand Down Expand Up @@ -228,9 +229,10 @@ async def poll_once(self) -> PollResult:
)
break

# Dispatched to a thread because matches_for_users performs synchronous
# PostgreSQL I/O via psycopg2, which would block the event loop.
per_user_matches = await asyncio.to_thread(
# Safe to run off the event loop: matches_for_users only performs blocking
# PostgreSQL I/O via psycopg2 on its own pool connection — it does not
# touch ISOProber._stats, WG21Index.papers, or other shared source state.
per_user_matches = await run_blocking_io(
self.user_watchlist.matches_for_users,
diff.new_papers,
recent_hits,
Expand Down Expand Up @@ -268,7 +270,7 @@ async def poll_once(self) -> PollResult:
len(per_user_matches),
)
self._last_successful_poll = time.time()
self._last_probe_stats = dict(self.prober._stats)
self._last_probe_stats = self.prober.snapshot_stats()
return result

async def run_forever(self) -> None:
Expand Down
75 changes: 62 additions & 13 deletions src/paperscout/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
from __future__ import annotations

import asyncio
import json
import logging
import re
import threading
import time
from collections.abc import Iterable, Mapping
from dataclasses import dataclass
Expand Down Expand Up @@ -43,6 +45,7 @@ class WG21Index:
def __init__(self, pool, cfg: Settings | None = None):
self._cfg = cfg or settings
self._cache = PaperCache(pool, ttl_hours=self._cfg.cache_ttl_hours)
# Replaced wholesale on every refresh(); never mutate in place.
self.papers: dict[str, Paper] = {}
self._max_rev: dict[int, int] = {} # P-number -> highest revision
self._max_p: int = 0 # absolute highest P-number
Expand Down Expand Up @@ -256,7 +259,11 @@ async def _fetch_front_text(


class ISOProber:
"""Async HEAD probe of isocpp draft URLs: hot every cycle, cold in rotating slices."""
"""Async HEAD probe of isocpp draft URLs: hot every cycle, cold in rotating slices.

Designed for single-threaded async use on the event loop. Do not call from
threads, ``asyncio.to_thread()``, or thread-pool executors.
"""

# Keys that _stats is reset to at the start of every run_cycle().
_STATS_TEMPLATE: dict[str, int] = {
Expand All @@ -265,7 +272,7 @@ class ISOProber:
"miss": 0, # server returned non-200
"hit_recent": 0, # 200 + Last-Modified within alert window
"hit_old": 0, # 200 + Last-Modified outside alert window
"hit_no_lm": 0, # 200 + no Last-Modified header (treated as recent)
"hit_no_lm": 0, # 200 + no or unusable Last-Modified (treated as recent)
"error": 0, # httpx / network exception
}

Expand All @@ -281,14 +288,31 @@ def __init__(
self.user_watchlist = user_watchlist
self.cfg = cfg or settings
self._cycle = 0
self._stats_lock = threading.Lock()
# This dict is mutated from async coroutines on the event loop.
# Thread-safety depends on asyncio cooperative scheduling. Do NOT access
# from asyncio.to_thread() or thread pool executors.
self._stats: dict[str, int] = dict(self._STATS_TEMPLATE)

def _bump_stat(self, key: str, n: int = 1) -> None:
with self._stats_lock:
self._stats[key] += n

def _reset_stats(self) -> None:
with self._stats_lock:
self._stats = dict(self._STATS_TEMPLATE)

def snapshot_stats(self) -> dict[str, int]:
"""Return a copy of per-cycle probe counters (lock-protected)."""
with self._stats_lock:
return dict(self._stats)

# ── Public API ───────────────────────────────────────────────────────────

async def run_cycle(self) -> list[ProbeHit]:
"""HEAD all scheduled URLs; return recent hits and persist discovery state."""
self._cycle += 1
self._stats = dict(self._STATS_TEMPLATE)
self._reset_stats()
t0 = time.monotonic()

urls = self._build_probe_list()
Expand Down Expand Up @@ -333,7 +357,7 @@ async def run_cycle(self) -> list[ProbeHit]:
self.state.save()

elapsed = time.monotonic() - t0
s = self._stats
s = self.snapshot_stats()
hit_total = s["hit_recent"] + s["hit_old"] + s["hit_no_lm"]
log.info(
"PROBE-DONE cycle=%d elapsed=%.1fs total=%d "
Expand All @@ -351,6 +375,26 @@ async def run_cycle(self) -> list[ProbeHit]:
s["skipped_in_index"],
s["error"],
)
log.info(
"PROBE-CYCLE-SUMMARY %s",
json.dumps(
{
"cycle": self._cycle,
"cycle_requests": len(urls),
"cycle_duration_s": round(elapsed, 2),
"hot_probes": hot_count,
"cold_probes": cold_count,
"errors": s["error"],
"hit_total": hit_total,
"hit_recent": s["hit_recent"],
"hit_old": s["hit_old"],
"hit_no_lm": s["hit_no_lm"],
"miss": s["miss"],
"skipped_discovered": s["skipped_discovered"],
"skipped_in_index": s["skipped_in_index"],
}
),
)
return hits

# ── Probe-list builders ──────────────────────────────────────────────────
Expand Down Expand Up @@ -492,13 +536,13 @@ async def _probe_one(
) -> ProbeHit | None:
if self.state.is_discovered(url):
log.debug("SKIP disc %s", url)
self._stats["skipped_discovered"] += 1
self._bump_stat("skipped_discovered")
return None
paper_id = f"{prefix}{num:04d}R{rev}"
ids = known_ids if known_ids is not None else self.index.get_known_paper_ids()
if paper_id in ids:
log.debug("SKIP idx %s", paper_id)
self._stats["skipped_in_index"] += 1
self._bump_stat("skipped_in_index")
return None
async with sem:
_max_retries = 3
Expand All @@ -521,14 +565,14 @@ async def _probe_one(
url,
exc,
)
self._stats["error"] += 1
self._bump_stat("error")
return None
else:
return None

if resp.status_code != 200:
log.debug("MISS %d %s", resp.status_code, url)
self._stats["miss"] += 1
self._bump_stat("miss")
return None

# Determine recency from the Last-Modified response header.
Expand All @@ -538,10 +582,15 @@ async def _probe_one(
if lm_str:
try:
last_modified = parsedate_to_datetime(lm_str)
# Naive datetimes from parsedate_to_datetime are UTC.
if last_modified.tzinfo is None:
last_modified = last_modified.replace(tzinfo=timezone.utc)
threshold = timedelta(hours=self.cfg.alert_modified_hours)
is_recent = (datetime.now(timezone.utc) - last_modified) <= threshold
except Exception:
pass
except (TypeError, ValueError):
# Bad Last-Modified: merge with no-LM so we don't silently drop hits.
last_modified = None
is_recent = True
Comment thread
coderabbitai[bot] marked this conversation as resolved.
else:
# No Last-Modified: first-ever discovery of an untracked
# file; treat as recent so we don't silently drop it.
Expand All @@ -557,11 +606,11 @@ async def _probe_one(
)

if is_recent and last_modified is not None:
self._stats["hit_recent"] += 1
self._bump_stat("hit_recent")
elif not is_recent:
self._stats["hit_old"] += 1
self._bump_stat("hit_old")
else:
self._stats["hit_no_lm"] += 1
self._bump_stat("hit_no_lm")

# Only fetch front text when we intend to alert.
front_text = ""
Expand Down
Loading