fix(jobs): correct total_count and page fill for status-filtered job lists#263
fix(jobs): correct total_count and page fill for status-filtered job lists#263maxdubrinsky wants to merge 1 commit into
Conversation
📝 WalkthroughWalkthroughlist_jobs now detects status-field filters and either pushes exact filters to the store (non-status) or drains a status-free superset in overfetch batches and applies an in-memory join/filter/sort/slice so total_count and pagination reflect only status-matching jobs. ChangesStatus-aware job list filtering
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
services/core/jobs/src/nmp/core/jobs/app/dispatcher.py (1)
343-345: ⚡ Quick winSet iteration order is not guaranteed by Python spec.
attempt_idsis a set._gather_attemptsinternally converts it to a list, butzip(attempt_ids, attempts)iterates the set again. While CPython maintains stable iteration order for unmodified sets, this isn't spec-guaranteed. Same pattern exists inlist_steps.Consider having
_gather_attemptsreturndict[str, PlatformJobAttempt | None]directly, or convert the set to a list once and pass that to both.♻️ Suggested fix
- attempt_ids = {job.current_attempt_id for job in jobs if job.current_attempt_id} - attempts = await self._gather_attempts(attempt_ids) - attempt_by_id = {aid: a for aid, a in zip(attempt_ids, attempts) if a is not None} + aid_list = [job.current_attempt_id for job in jobs if job.current_attempt_id] + unique_aids = list(dict.fromkeys(aid_list)) # dedupe preserving order + attempts = await self._gather_attempts(set(unique_aids)) + # Build mapping using the same iteration order as _gather_attempts internally uses + attempt_by_id = {} + for aid in unique_aids: + # Fetch result by index would require _gather_attempts to return dict + pass # Better: refactor _gather_attempts to return dict[str, Optional[...]]Or refactor
_gather_attemptsto return a dict:async def _gather_attempts(self, attempt_ids: set[str]) -> dict[str, PlatformJobAttempt | None]: if not attempt_ids: return {} aid_list = list(attempt_ids) async with asyncio.TaskGroup() as tg: tasks = [tg.create_task(self.get_attempt(aid)) for aid in aid_list] return {aid: t.result() for aid, t in zip(aid_list, tasks)}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py` around lines 343 - 345, The current code builds attempt_ids as a set and then zips attempt_ids with the results from self._gather_attempts, which relies on matching iteration order that the Python spec doesn't guarantee; fix by making order explicit: either convert attempt_ids to a list once (e.g., aid_list = list(attempt_ids)) and pass aid_list to self._gather_attempts and use aid_list to zip with attempts, or change _gather_attempts to return a dict mapping attempt_id -> PlatformJobAttempt (so call attempt_by_id = await self._gather_attempts(attempt_ids)); apply the same pattern for list_steps to avoid relying on set iteration order.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py`:
- Around line 343-345: The current code builds attempt_ids as a set and then
zips attempt_ids with the results from self._gather_attempts, which relies on
matching iteration order that the Python spec doesn't guarantee; fix by making
order explicit: either convert attempt_ids to a list once (e.g., aid_list =
list(attempt_ids)) and pass aid_list to self._gather_attempts and use aid_list
to zip with attempts, or change _gather_attempts to return a dict mapping
attempt_id -> PlatformJobAttempt (so call attempt_by_id = await
self._gather_attempts(attempt_ids)); apply the same pattern for list_steps to
avoid relying on set iteration order.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: b4989e23-b8b8-4709-8eb6-136f2e3de5ce
📒 Files selected for processing (2)
services/core/jobs/src/nmp/core/jobs/app/dispatcher.pyservices/core/jobs/tests/test_dispatcher.py
|
…ists (AIRCORE-738) Status lives on PlatformJobAttempt, not PlatformJob, so list_jobs evaluates the status filter in-memory after the store query. Previously the store query was paginated with the caller's page_size over a status-free *superset*, and that superset's total_results was returned as the count. So any status-filtered job list reported an inflated total_count (superset count, not matched count) and under-filled pages (the page was narrowed *after* slicing, so it could be empty even when matches existed on later store pages). Fix: when the filter references status, drain the full status-free superset in batches, narrow it in-memory, then count + slice the matched set in Python (mirroring list_steps' job/source path). Filters that don't reference status keep the cheap store-pushdown path unchanged. Current attempts are loaded concurrently via _gather_attempts so the over-fetch isn't N serial round-trips. Adds regression tests asserting exact total_count, page fill (2/2/1), empty result on no match, and that the no-status pushdown path still paginates. Signed-off-by: Max Dubrinsky <mdubrinsky@nvidia.com>
d395c12 to
782d18c
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py`:
- Around line 418-435: The current loop accumulates all pages into candidates
and then calls await self._join_and_filter_jobs(candidates, parsed) which can
trigger an unbounded fan-out of attempt get_by_id calls; instead process each
overfetch page as it is read (or chunk candidates into batches of size
_STORE_OVERFETCH_PAGE_SIZE) and call await self._join_and_filter_jobs(...) per
page/batch to bound concurrency. Concretely, inside the while True after
receiving response, call _join_and_filter_jobs(response.data, parsed), extend a
local matched list with the returned results (or collect counts), and only after
the loop apply the final sort/slice; ensure you reference self.store.list,
_STORE_OVERFETCH_PAGE_SIZE, and _join_and_filter_jobs when modifying the logic
so no single gather spans the entire workspace.
- Around line 343-345: _join_and_filter_jobs builds attempt_by_id using zip over
attempt_ids (a set) and attempts which can mis-pair attempts; change
_gather_attempts to return a dict keyed by attempt_id (or build attempt_by_id by
mapping each returned attempt.aid to the attempt) and use that mapping in
attempt_by_id instead of zipping a set. Also address unbounded fan-out in
list_jobs: avoid fetching attempts for every job in candidates—apply in-memory
filtering/limit/sorting first (or cap the candidate set) and use a bounded
concurrency mechanism when awaiting _gather_attempts/TaskGroup for
current_attempt_id to prevent store/latency blowups; reference
functions/classes: _join_and_filter_jobs, _gather_attempts, list_jobs,
candidates, current_attempt_id, attempt_by_id, and the TaskGroup usage.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: a6c27980-8df6-4222-b5b2-97ec5d3f07b5
📒 Files selected for processing (2)
services/core/jobs/src/nmp/core/jobs/app/dispatcher.pyservices/core/jobs/tests/test_dispatcher.py
| attempt_ids = {job.current_attempt_id for job in jobs if job.current_attempt_id} | ||
| attempts = await self._gather_attempts(attempt_ids) | ||
| attempt_by_id = {aid: a for aid, a in zip(attempt_ids, attempts) if a is not None} |
There was a problem hiding this comment.
Fix incorrect attempt ID → attempt pairing and bound status-filter fan-out in dispatcher
_join_and_filter_jobsrebuildsattempt_by_idusingzip(attempt_ids, attempts)whereattempt_idsis aset; pairing assumes set iteration order matches thelist(attempt_ids)order used by_gather_attempts. Buildattempt_by_idby keying on attempt ID (or return a dict from_gather_attempts) to avoid mis-matching jobs to attempts.list_jobsstatus-filter drains the full status-free superset intocandidatesand then fetches one attempt per uniquecurrent_attempt_idvia an unboundedTaskGroup; cap concurrency and/or avoid fetching attempts for jobs that will be discarded by in-memory filtering/slicing to prevent store/latency blowups.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py` around lines 343 -
345, _join_and_filter_jobs builds attempt_by_id using zip over attempt_ids (a
set) and attempts which can mis-pair attempts; change _gather_attempts to return
a dict keyed by attempt_id (or build attempt_by_id by mapping each returned
attempt.aid to the attempt) and use that mapping in attempt_by_id instead of
zipping a set. Also address unbounded fan-out in list_jobs: avoid fetching
attempts for every job in candidates—apply in-memory filtering/limit/sorting
first (or cap the candidate set) and use a bounded concurrency mechanism when
awaiting _gather_attempts/TaskGroup for current_attempt_id to prevent
store/latency blowups; reference functions/classes: _join_and_filter_jobs,
_gather_attempts, list_jobs, candidates, current_attempt_id, attempt_by_id, and
the TaskGroup usage.
| candidates: list[PlatformJob] = [] | ||
| page = 1 | ||
| while True: | ||
| response = await self.store.list( | ||
| PlatformJob, | ||
| filter_operation=store_operation, | ||
| page=page, | ||
| page_size=_STORE_OVERFETCH_PAGE_SIZE, | ||
| workspace=workspace, | ||
| sort=sort_str, | ||
| ) | ||
| candidates.extend(response.data) | ||
| if page >= response.pagination.total_pages: | ||
| break | ||
| page += 1 | ||
|
|
||
| return job_outputs, response.pagination.total_results | ||
| matched = _apply_sort(await self._join_and_filter_jobs(candidates, parsed)) | ||
| return matched[start : start + page_size], len(matched) |
There was a problem hiding this comment.
Bound the attempt-fetch fan-out in the status path.
This branch drains the full superset and then joins attempts for every candidate in one shot. On a large workspace, one status-filtered request can turn into thousands of concurrent get_by_id calls, which is enough to saturate the entity client/DB pool and make the endpoint unstable under load. Join/filter each overfetch page as it is read, or chunk the attempt fetches to the same batch size instead of doing one unbounded gather over the whole workspace.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py` around lines 418 -
435, The current loop accumulates all pages into candidates and then calls await
self._join_and_filter_jobs(candidates, parsed) which can trigger an unbounded
fan-out of attempt get_by_id calls; instead process each overfetch page as it is
read (or chunk candidates into batches of size _STORE_OVERFETCH_PAGE_SIZE) and
call await self._join_and_filter_jobs(...) per page/batch to bound concurrency.
Concretely, inside the while True after receiving response, call
_join_and_filter_jobs(response.data, parsed), extend a local matched list with
the returned results (or collect counts), and only after the loop apply the
final sort/slice; ensure you reference self.store.list,
_STORE_OVERFETCH_PAGE_SIZE, and _join_and_filter_jobs when modifying the logic
so no single gather spans the entire workspace.
Summary
JobDispatcher.list_jobsfilters bystatusin memory — status lives onPlatformJobAttempt, notPlatformJob, so it cannot be pushed to the entity-store query. Previously the store query was paginated with the caller'spage_sizeover a status-free superset of the filter, and that superset'stotal_resultswas returned as the count. Consequences for any status-filtered job list:total_count— it returned the superset count (every job matching the non-status constraints), not the count of jobs whose attempt status actually matched.page_sizewas applied to the superset, then matches were dropped after slicing, so a page could be empty even when matching jobs existed on later store pages.Closes AIRCORE-738.
Fix
When the filter references
status, drain the full status-free superset in batches (_STORE_OVERFETCH_PAGE_SIZE), narrow it in-memory, then count + slice the matched set in Python — mirroring the existing correct over-fetch path inlist_steps. Filters that don't reference status keep the cheap store-pushdown path (store paginates and counts directly), so the common case is unchanged. The in-memory join now loads current attempts concurrently via_gather_attempts, so the over-fetch isn't N serial round-trips.Testing
pytest services/core/jobs/tests→ 323 passed, 17 skipped. New regression tests intest_dispatcher.py:test_list_jobs_status_filter_total_count_excludes_non_matching—total_count== matched count (was the superset count).test_list_jobs_status_filter_pages_fill_to_page_size— pages fill2/2/1and every match surfaces exactly once;total_countexact per page.test_list_jobs_status_filter_no_match_zero_count—[]+total_count == 0.test_list_jobs_no_status_filter_paginates_via_store— guards the unchanged no-status pushdown path.The three status tests fail against
main(6==3,10==5,3==0) and pass with this change.Summary by CodeRabbit