
Rewrite archiver client #367

Open
hmarts9 wants to merge 1 commit into main from feat/archiver-rewrite

Conversation

hmarts9 (Collaborator) commented Mar 25, 2026

Rewrite archiver client: batching, retries, connection pooling

Why: Queries were timing out. The archiver client had three problems: a 3-second timeout that was too tight, no retry mechanism for flaky connections, and one HTTP request issued per PV.

What changed:

Batching — Instead of looping through PVs and making a request for each, we now send them all in one GET to getDataForPVs.json.
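A minimal sketch of what the batched request looks like. The endpoint name comes from this PR; the host URL and the comma-joined `pv` parameter encoding are assumptions for illustration:

```python
from urllib.parse import urlencode

# Hypothetical base URL; the real host comes from the client's configuration.
ARCHIVER_URL = "http://archiver.example.com/retrieval/data/getDataForPVs.json"


def build_batch_url(pv_list: list, start_iso: str, end_iso: str) -> str:
    # One GET for all PVs instead of one request per PV.
    # NOTE: comma-joining PV names is an assumption about the endpoint's encoding.
    params = {"pv": ",".join(pv_list), "from": start_iso, "to": end_iso}
    return f"{ARCHIVER_URL}?{urlencode(params)}"
```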

Connection pooling — We now use requests.Session with HTTPAdapter, so TCP connections are reused across requests instead of opened and closed on each call.

Resilience — Added automatic retries (3x with exponential backoff) for transient 5xx errors. The timeout is now 15 seconds and configurable: set _archiver.TIMEOUT = N if you need to adjust it.
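The pooling, retry, and thread-safety points above fit together in one session factory. A rough sketch, where the pool size and backoff factor are illustrative rather than the PR's exact values:

```python
import threading

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

TIMEOUT = 15  # seconds; module-level so callers can override it
_local = threading.local()


def get_session() -> requests.Session:
    """Return a thread-local Session with connection pooling and retries."""
    if getattr(_local, "session", None) is None:
        retry = Retry(
            total=3,
            backoff_factor=0.5,  # exponential backoff between attempts
            status_forcelist=[500, 502, 503, 504],  # transient 5xx only
            allowed_methods=["GET"],
        )
        session = requests.Session()
        session.mount("http://", HTTPAdapter(max_retries=retry, pool_maxsize=10))
        session.mount("https://", HTTPAdapter(max_retries=retry, pool_maxsize=10))
        _local.session = session
    return _local.session
```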

Better error handling — New exception hierarchy: ArchiverError, ArchiverTimeoutError, ArchiverConnectionError. Makes it easier to distinguish between actual failures and timeout issues.
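The hierarchy can be as small as three classes (names taken from this PR; the docstrings are illustrative):

```python
class ArchiverError(Exception):
    """Base class for all archiver client failures."""


class ArchiverTimeoutError(ArchiverError):
    """Raised when a request exceeds the configured timeout."""


class ArchiverConnectionError(ArchiverError):
    """Raised when the archiver host cannot be reached."""
```

Callers who don't care about the distinction can catch `ArchiverError` alone.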

DST fix — We were computing the UTC offset once at import time, which breaks when DST transitions. Now we calculate it per request.

Thread safety — Sessions are thread-local (threading.local()), so the client is safe to use with ThreadPoolExecutor.

Minor adds — Supports use_operator for server-side processing (like firstSample_3600), and there's a progress callback for GUI integration.
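The archiver appliance applies server-side operators by wrapping the PV name, e.g. `firstSample_3600(MY:PV)`. A hypothetical helper (not the PR's actual code) showing the wrapping:

```python
from typing import Optional


def apply_operator(pv: str, operator: Optional[str] = None) -> str:
    # Server-side processing wraps the PV name:
    # "firstSample_3600" + "MY:PV" -> "firstSample_3600(MY:PV)"
    return f"{operator}({pv})" if operator else pv
```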

No breaking changes — All public function signatures and return types are the same, so existing code didn't need any modifications. get_data_at_time still returns {} on HTTP errors to match what callers expect.


Copilot AI left a comment


Pull request overview

Refactors the LCLS-II archiver client to reduce timeouts and improve resilience by adding connection pooling, retries, PV batching, and parallel sampling for interval queries, while updating unit tests for the new request/session behavior.

Changes:

  • Introduces a thread-local requests.Session with HTTPAdapter pooling and urllib3.Retry, plus configurable timeout and new archiver-specific exceptions.
  • Batches multi-PV range queries via getDataForPVs.json and parallelizes interval sampling with ThreadPoolExecutor.
  • Updates unit tests to mock session usage and to recognize the new exception types.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 6 comments.

File Description
lcls_tools/common/data/archiver.py Adds session pooling/retries, per-request UTC offset handling, batched range fetches, and concurrent interval fetches.
tests/unit_tests/lcls_tools/common/data/test_archiver.py Updates mocks/exception handling to align with session-based requests and new exception hierarchy.


DEFAULT_MAX_WORKERS: int = 4

# If daylight savings time, SLAC is 7 hours behind UTC otherwise 8
# Legacy constant kept for backward compatability

Copilot AI Mar 27, 2026


Spelling: “compatability” should be “compatibility”.

Suggested change
# Legacy constant kept for backward compatability
# Legacy constant kept for backward compatibility

Comment on lines +91 to +96
def _get_utc_offset(dt: Optional[datetime] = None) -> str:
"""Return the Pacific time UTC offset, checking DST per timestamp."""
if dt is None:
dt = datetime.now()
return "-07:00" if time.localtime(dt.timestamp()).tm_isdst else "-08:00"


Copilot AI Mar 27, 2026


_get_utc_offset() claims to return the Pacific time offset, but it uses time.localtime(dt.timestamp()), which reflects the host machine’s local timezone/DST rules. If this library is used on a non-Pacific host, the computed offset can be wrong, leading to incorrect query timestamps around DST boundaries. Consider computing the offset using a fixed timezone (e.g., zoneinfo.ZoneInfo("America/Los_Angeles")) rather than relying on the system local timezone.

Comment on lines 288 to +293
result: DefaultDict[str, ArchiveDataHandler] = defaultdict(ArchiveDataHandler)
workers = min(max_workers, len(sample_times))
snapshots: Dict[datetime, Dict[str, ArchiverValue]] = {}

while curr_time < end_time:
value: Dict[str, ArchiverValue] = get_data_at_time(pv_list, curr_time)
for pv, archiver_value in value.items():
with ThreadPoolExecutor(max_workers=workers) as pool:
future_to_time = {

Copilot AI Mar 27, 2026


max_workers is now user-configurable, but values <= 0 will cause ThreadPoolExecutor(max_workers=workers) to raise at runtime. Consider validating max_workers (and therefore workers) to be at least 1 (or falling back to 1) before constructing the executor.
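One possible guard (an illustrative helper, not a suggestion for the exact code shape):

```python
def clamp_workers(max_workers: int, n_tasks: int) -> int:
    # ThreadPoolExecutor raises ValueError for max_workers <= 0,
    # so always hand it a positive count.
    return max(1, min(max_workers, n_tasks if n_tasks > 0 else 1))
```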

Comment on lines +299 to +303
try:
snapshots[t] = future.result()
except ArchiverError as exc:
logger.warning("Failed to fetch data at %s: %s", t, exc)
snapshots[t] = {}

Copilot AI Mar 27, 2026


get_data_with_time_interval() now swallows ArchiverError and substitutes an empty snapshot, which changes the observable behavior from the prior implementation (callers/tests could previously catch Timeout/Connection errors). This can mask partial/total failures and makes it impossible for callers to distinguish “no data” vs “request failed”. Consider re-raising (or aggregating and raising) ArchiverError by default, or adding an explicit opt-in flag to return partial results.
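One shape the opt-in flag could take (a sketch under the assumption that futures map to sample times, with a stand-in for the PR's `ArchiverError`):

```python
from concurrent.futures import Future
from typing import Any, Dict, List, Tuple


class ArchiverError(Exception):
    """Stand-in for the base exception defined in this PR."""


def collect_snapshots(
    future_to_time: Dict[Future, Any], raise_on_error: bool = True
) -> Tuple[Dict[Any, dict], List[Tuple[Any, Exception]]]:
    """Gather results; either propagate the first ArchiverError or keep partial data."""
    snapshots: Dict[Any, dict] = {}
    errors: List[Tuple[Any, Exception]] = []
    for future, t in future_to_time.items():
        try:
            snapshots[t] = future.result()
        except ArchiverError as exc:
            if raise_on_error:
                raise
            errors.append((t, exc))
            snapshots[t] = {}  # empty snapshot marks the failed sample time
    return snapshots, errors
```

Returning the error list alongside the snapshots lets callers distinguish "no data" from "request failed" even in partial-results mode.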

Comment on lines +334 to +341
try:
batch = _fetch_pv_batch_range(
pv_list, start_time, end_time, timeout, operator=use_operator,
)
result.update(batch)
except ArchiverError as exc:
logger.error("Batch fetch failed for %d PVs: %s", len(pv_list), exc)


Copilot AI Mar 27, 2026


get_values_over_time_range() catches ArchiverError from the batch fetch, logs it, and then returns empty handlers for all PVs. Previously, connection/timeouts would propagate, allowing callers/tests to handle or skip appropriately; returning empty results here can silently hide outages and makes failures indistinguishable from “no archived data”. Consider propagating ArchiverTimeoutError/ArchiverConnectionError (or providing a parameter to control whether errors are swallowed).

Comment on lines +319 to 321
use_operator: Optional[str] = None,
progress_callback: Optional[Callable[[int, int], None]] = None,
) -> DefaultDict[str, ArchiveDataHandler]:

Copilot AI Mar 27, 2026


New parameters use_operator and progress_callback add user-visible behavior (PV wrapping/mapping and callback invocation), but there are no unit tests covering these paths in tests/unit_tests/lcls_tools/common/data/test_archiver.py. Consider adding tests that mock the session response to verify operator handling and callback invocation.

"HTTP %s in get_data_at_time for %s at %s",
response.status_code, pv_list, time_requested,
)
return {}
Collaborator


Why return an empty dict instead of raising an error like with the other ones?

if pv not in result:
result[pv] = ArchiveDataHandler()

if progress_callback:
Collaborator


Can we get an example usage for this?
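A minimal usage sketch, assuming the `(done, total)` callback signature shown in the diff; the function names and the commented call site are illustrative:

```python
def format_progress(done: int, total: int) -> str:
    # Separated from printing so the message is easy to test.
    return f"archiver fetch: {done}/{total} ({100 * done // total}%)"


def on_progress(done: int, total: int) -> None:
    # Invoked by the client after each completed fetch; a GUI would
    # update a progress bar here instead of printing.
    print(format_progress(done, total))


# Hypothetical call site, matching the signature added in this PR:
# get_values_over_time_range(pv_list, start, end, progress_callback=on_progress)
```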
