Pull request overview
Refactors the LCLS-II archiver client to reduce timeouts and improve resilience by adding connection pooling, retries, PV batching, and parallel sampling for interval queries, while updating unit tests for the new request/session behavior.
Changes:
- Introduces a thread-local `requests.Session` with `HTTPAdapter` pooling and `urllib3.Retry`, plus a configurable timeout and new archiver-specific exceptions.
- Batches multi-PV range queries via `getDataForPVs.json` and parallelizes interval sampling with `ThreadPoolExecutor`.
- Updates unit tests to mock session usage and to recognize the new exception types.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `lcls_tools/common/data/archiver.py` | Adds session pooling/retries, per-request UTC offset handling, batched range fetches, and concurrent interval fetches. |
| `tests/unit_tests/lcls_tools/common/data/test_archiver.py` | Updates mocks/exception handling to align with session-based requests and the new exception hierarchy. |
```python
DEFAULT_MAX_WORKERS: int = 4


# If daylight savings time, SLAC is 7 hours behind UTC otherwise 8
# Legacy constant kept for backward compatability
```
Spelling: “compatability” should be “compatibility”.
```diff
-# Legacy constant kept for backward compatability
+# Legacy constant kept for backward compatibility
```
```python
def _get_utc_offset(dt: Optional[datetime] = None) -> str:
    """Return the Pacific time UTC offset, checking DST per timestamp."""
    if dt is None:
        dt = datetime.now()
    return "-07:00" if time.localtime(dt.timestamp()).tm_isdst else "-08:00"
```
_get_utc_offset() claims to return the Pacific time offset, but it uses time.localtime(dt.timestamp()), which reflects the host machine’s local timezone/DST rules. If this library is used on a non-Pacific host, the computed offset can be wrong, leading to incorrect query timestamps around DST boundaries. Consider computing the offset using a fixed timezone (e.g., zoneinfo.ZoneInfo("America/Los_Angeles")) rather than relying on the system local timezone.
```diff
 result: DefaultDict[str, ArchiveDataHandler] = defaultdict(ArchiveDataHandler)
 workers = min(max_workers, len(sample_times))
 snapshots: Dict[datetime, Dict[str, ArchiverValue]] = {}

-while curr_time < end_time:
-    value: Dict[str, ArchiverValue] = get_data_at_time(pv_list, curr_time)
-    for pv, archiver_value in value.items():
+with ThreadPoolExecutor(max_workers=workers) as pool:
+    future_to_time = {
```
max_workers is now user-configurable, but values <= 0 will cause ThreadPoolExecutor(max_workers=workers) to raise at runtime. Consider validating max_workers (and therefore workers) to be at least 1 (or falling back to 1) before constructing the executor.
```python
try:
    snapshots[t] = future.result()
except ArchiverError as exc:
    logger.warning("Failed to fetch data at %s: %s", t, exc)
    snapshots[t] = {}
```
get_data_with_time_interval() now swallows ArchiverError and substitutes an empty snapshot, which changes the observable behavior from the prior implementation (callers/tests could previously catch Timeout/Connection errors). This can mask partial/total failures and makes it impossible for callers to distinguish “no data” vs “request failed”. Consider re-raising (or aggregating and raising) ArchiverError by default, or adding an explicit opt-in flag to return partial results.
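A sketch of the opt-in approach, under the assumption that the client exposes a flag; `collect_snapshots` and `partial_ok` are hypothetical names standing in for the real interval loop. By default the first `ArchiverError` propagates; with `partial_ok=True` the caller gets partial results plus an explicit list of failures, so "no data" and "request failed" stay distinguishable:

```python
class ArchiverError(Exception):
    """Stand-in for the client's base exception (name taken from the PR)."""


def collect_snapshots(fetch, sample_times, partial_ok=False):
    """Fetch one snapshot per sample time.

    Default: the first ArchiverError propagates to the caller.
    partial_ok=True: failures are recorded and {} is substituted.
    """
    snapshots, failures = {}, []
    for t in sample_times:
        try:
            snapshots[t] = fetch(t)
        except ArchiverError as exc:
            if not partial_ok:
                raise
            failures.append((t, exc))
            snapshots[t] = {}
    return snapshots, failures
```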
```python
try:
    batch = _fetch_pv_batch_range(
        pv_list, start_time, end_time, timeout, operator=use_operator,
    )
    result.update(batch)
except ArchiverError as exc:
    logger.error("Batch fetch failed for %d PVs: %s", len(pv_list), exc)
```
get_values_over_time_range() catches ArchiverError from the batch fetch, logs it, and then returns empty handlers for all PVs. Previously, connection/timeouts would propagate, allowing callers/tests to handle or skip appropriately; returning empty results here can silently hide outages and makes failures indistinguishable from “no archived data”. Consider propagating ArchiverTimeoutError/ArchiverConnectionError (or providing a parameter to control whether errors are swallowed).
```python
    use_operator: Optional[str] = None,
    progress_callback: Optional[Callable[[int, int], None]] = None,
) -> DefaultDict[str, ArchiveDataHandler]:
```
New parameters use_operator and progress_callback add user-visible behavior (PV wrapping/mapping and callback invocation), but there are no unit tests covering these paths in tests/unit_tests/lcls_tools/common/data/test_archiver.py. Consider adding tests that mock the session response to verify operator handling and callback invocation.
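A sketch of what such a test could look like. `fetch_with_progress` is a hypothetical stand-in for the client's internal loop (the real code lives in `lcls_tools.common.data.archiver`); the point is the pattern of asserting `(done, total)` callback invocations with `unittest.mock`:

```python
import unittest
from unittest import mock


# Illustrative stand-in for the interval-sampling loop; not the PR's code.
def fetch_with_progress(sample_times, fetch_one, progress_callback=None):
    snapshots = {}
    total = len(sample_times)
    for i, t in enumerate(sample_times, start=1):
        snapshots[t] = fetch_one(t)
        if progress_callback:
            progress_callback(i, total)  # report (completed, total)
    return snapshots


class TestProgressCallback(unittest.TestCase):
    def test_callback_sees_done_and_total(self):
        cb = mock.Mock()
        fetch_with_progress([10, 20], lambda t: {}, progress_callback=cb)
        cb.assert_has_calls([mock.call(1, 2), mock.call(2, 2)])

    def test_no_callback_is_fine(self):
        out = fetch_with_progress([10], lambda t: {"pv": 1})
        self.assertEqual(out, {10: {"pv": 1}})
```

An operator test would follow the same shape: mock the session response and assert that each requested PV was wrapped as `operator(pv)` and mapped back to its bare name in the result.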
```python
        "HTTP %s in get_data_at_time for %s at %s",
        response.status_code, pv_list, time_requested,
    )
    return {}
```
Why return an empty dict instead of raising an error like with the other ones?
```python
if pv not in result:
    result[pv] = ArchiveDataHandler()

if progress_callback:
```
Can we get an example usage for this?
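A minimal illustration of how a caller might use `progress_callback` (its signature is `Callable[[int, int], None]` per the diff above; the `get_values_over_time_range` call is shown commented out because the exact import path is assumed):

```python
progress_log = []


def record_progress(done: int, total: int) -> None:
    """Callback invoked by the client after each completed sample."""
    progress_log.append((done, total))
    print(f"archiver fetch: {done}/{total} samples", end="\r")


# Hypothetical call site:
# result = get_values_over_time_range(
#     pv_list, start_time, end_time,
#     progress_callback=record_progress,
# )

# Simulating what the client does internally, one call per completed
# sample, the callback accumulates monotonically increasing progress:
for done in range(1, 5):
    record_progress(done, 4)
```

A GUI would do the same thing but update a progress bar instead of printing.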
Rewrite archiver client: batching, retries, connection pooling
Why: Queries were timing out. The archiver client had three problems: a 3-second timeout that was far too tight, no retry mechanism for flaky connections, and one HTTP request issued per PV.
What changed:
- **Batching** — Instead of looping through PVs and making a request for each, we now send them all in one GET to `getDataForPVs.json`.
- **Connection pooling** — Using `requests.Session` with `HTTPAdapter`, so we reuse TCP connections across requests instead of opening and closing one on each call.
- **Resilience** — Added automatic retries (3x with exponential backoff) for transient 5xx errors. The timeout is now 15 seconds and configurable: set `_archiver.TIMEOUT = N` if you need to adjust it.
- **Better error handling** — New exception hierarchy: `ArchiverError`, `ArchiverTimeoutError`, `ArchiverConnectionError`. Makes it easier to distinguish actual failures from timeout issues.
- **DST fix** — We were computing the UTC offset once at import time, which breaks across DST transitions. Now we calculate it per request.
- **Thread safety** — Sessions are thread-local (via `threading.local()`), so the client is safe to use with `ThreadPoolExecutor`.
- **Minor adds** — Supports `use_operator` for server-side processing (like `firstSample_3600`), and there's a progress callback for GUI integration.
- **No breaking changes** — All public function signatures and return types are the same; existing code didn't need any modifications. `get_data_at_time` still returns `{}` on HTTP errors to match what callers expect.