From 782d18c6bc383a7eef284492962e45ac892de6a3 Mon Sep 17 00:00:00 2001 From: Max Dubrinsky Date: Wed, 10 Jun 2026 14:58:03 -0400 Subject: [PATCH] fix(jobs): exact total_count and full pages for status-filtered job lists (AIRCORE-738) Status lives on PlatformJobAttempt, not PlatformJob, so list_jobs evaluates the status filter in-memory after the store query. Previously the store query was paginated with the caller's page_size over a status-free *superset*, and that superset's total_results was returned as the count. As a result, any status-filtered job list reported an inflated total_count (superset count, not matched count) and under-filled pages (the page was narrowed *after* slicing, so it could be empty even when matches existed on later store pages). Fix: when the filter references status, drain the full status-free superset in batches, narrow it in-memory, then count and slice the matched set in Python (mirroring list_steps' job/source path). Filters that don't reference status keep the cheap store-pushdown path unchanged. Current attempts are loaded concurrently via _gather_attempts so the over-fetch isn't N serial round-trips. Trade-off: the status path holds the full status-free superset in memory (bounded by the workspace's job count); denormalizing status onto PlatformJob so the store can filter on it is the longer-term fix that would remove the in-memory pass. Adds regression tests asserting exact total_count, page fill (2/2/1), empty result on no match, and that the no-status pushdown path still paginates. Signed-off-by: Max Dubrinsky --- .../jobs/src/nmp/core/jobs/app/dispatcher.py | 142 ++++++++++++------ services/core/jobs/tests/test_dispatcher.py | 122 +++++++++++++++ 2 files changed, 217 insertions(+), 47 deletions(-) diff --git a/services/core/jobs/src/nmp/core/jobs/app/dispatcher.py b/services/core/jobs/src/nmp/core/jobs/app/dispatcher.py index ae591db494..a4d08d3fa2 100644 --- a/services/core/jobs/src/nmp/core/jobs/app/dispatcher.py +++ b/services/core/jobs/src/nmp/core/jobs/app/dispatcher.py @@ -93,6 +93,12 @@ def create_platform_job_response(job: PlatformJob, attempt: PlatformJobAttempt) # exactly. 
_STATUS_FIELD = "data.status" +# When a status filter is present, jobs are narrowed in-memory AFTER the store +# query, so the store cannot paginate the matched set. list_jobs drains the full +# status-free superset in batches of this size and paginates the narrowed result +# in Python instead (see AIRCORE-738). +_STORE_OVERFETCH_PAGE_SIZE = 1000 + def _references_status(operation: FilterOperation | None) -> bool: """Whether the operation tree contains any comparison on the status field.""" @@ -324,67 +330,109 @@ async def get_job(self, job_name: str, workspace: str) -> PlatformJobResponse | return None return create_platform_job_response(job_entity, attempt) - async def list_jobs( - self, - parsed: ParsedFilter, - workspace: str, - limit: Optional[int] = None, - offset: Optional[int] = None, - sort: Optional[PlatformJobSortField] = None, - ) -> Tuple[List[PlatformJobResponse], int]: - """List platform jobs with their current attempts.""" - # Status lives on PlatformJobAttempt, not PlatformJob, so the full filter - # tree is evaluated in-memory against a virtual job entity that carries the - # attempt status. The store query receives a status-free SUPERSET of the - # filter so it never drops a row the in-memory pass would accept (see - # _status_free_superset); that pass then narrows the page exactly. - # The operation is already entity-translated by make_filter_dep. - store_operation = _status_free_superset(parsed.operation) - - # Calculate page from offset - page = 1 - page_size = limit or 100 - if offset is not None and limit is not None: - page = (offset // limit) + 1 - - sort_str = sort.value if sort else "-created_at" + async def _join_and_filter_jobs(self, jobs: list[PlatformJob], parsed: ParsedFilter) -> list[PlatformJobResponse]: + """Join jobs to their current attempts and apply the full status-aware filter. 
- response = await self.store.list( - PlatformJob, - filter_operation=store_operation, - page=page, - page_size=page_size, - workspace=workspace, - sort=sort_str, - ) + Status lives on ``PlatformJobAttempt``, so the store query only saw a + status-free superset (see ``_status_free_superset``). This pass loads each + job's current attempt (concurrently) and evaluates the full filter tree + in-memory against a virtual entity carrying the attempt status, narrowing + the superset to the exact matched set. Jobs without a current attempt (or + whose attempt is missing) are skipped; input order is preserved. + """ + attempt_ids = {job.current_attempt_id for job in jobs if job.current_attempt_id} + attempts = await self._gather_attempts(attempt_ids) + attempt_by_id = {aid: a for aid, a in zip(attempt_ids, attempts) if a is not None} - # IN-MEMORY JOIN: load each job's current attempt, then evaluate the full - # (status-aware) filter tree against a virtual entity carrying the status. job_outputs: list[PlatformJobResponse] = [] - for job in response.data: + for job in jobs: if not job.current_attempt_id: continue - - try: - attempt = await self.store.get_by_id(PlatformJobAttempt, job.current_attempt_id) - except EntityNotFoundError: + attempt = attempt_by_id.get(job.current_attempt_id) + if attempt is None: logger.warning(f"Attempt {job.current_attempt_id} not found for job {job.id}") continue - if parsed.operation is not None: virtual_entity = _build_virtual_job_entity(job, attempt) if not parsed.operation.apply(InMemoryFilterRepository(virtual_entity)): continue + job_outputs.append(create_platform_job_response(job, attempt)) + return job_outputs - platform_job = create_platform_job_response(job, attempt) - job_outputs.append(platform_job) + async def list_jobs( + self, + parsed: ParsedFilter, + workspace: str, + limit: Optional[int] = None, + offset: Optional[int] = None, + sort: Optional[PlatformJobSortField] = None, + ) -> Tuple[List[PlatformJobResponse], int]: + 
"""List platform jobs with their current attempts. + + Most filters push down to the entity store, which paginates and counts + directly. A status filter is the exception: status lives on the attempt, + so it is evaluated in-memory after the store query (see + ``_join_and_filter_jobs``). The store therefore cannot paginate the matched + set, so we drain the full status-free superset, narrow it in-memory, then + count and slice in Python — otherwise ``total_count`` reflects the larger + superset and pages under-fill (AIRCORE-738). + """ + # The operation is already entity-translated by make_filter_dep. + store_operation = _status_free_superset(parsed.operation) + page_size = limit or 100 + start = offset or 0 + sort_str = sort.value if sort else "-created_at" - # Sort in memory - if sort: - reverse = sort.get_sort_direction() == "desc" - job_outputs.sort(key=lambda j: getattr(j, sort.get_field_name()), reverse=reverse) + def _apply_sort(jobs: list[PlatformJobResponse]) -> list[PlatformJobResponse]: + if sort: + field_name = sort.get_field_name() + reverse = sort.get_sort_direction() == "desc" + jobs.sort(key=lambda j: getattr(j, field_name), reverse=reverse) + return jobs + + # Callers paginate by page: the REST layer passes offset = (page-1)*page_size, + # so offset is always a multiple of limit. The no-status branch maps that offset + # onto a store page; the status branch slices the matched set by offset exactly. + # Both coincide for page-aligned offsets (the only ones the API produces). + if not _references_status(parsed.operation): + # The store filter is exact, so let the store paginate and count. 
+ page = (start // limit) + 1 if (limit and start) else 1 + response = await self.store.list( + PlatformJob, + filter_operation=store_operation, + page=page, + page_size=page_size, + workspace=workspace, + sort=sort_str, + ) + jobs = await self._join_and_filter_jobs(list(response.data), parsed) + return _apply_sort(jobs), response.pagination.total_results + + # Status filter: status is narrowed in-memory after the store query, so the + # store can neither paginate nor count the matched set. Drain the full + # status-free superset (in batches), narrow it in-memory, then count + slice in + # Python so total_count is exact and pages fill. This holds the superset in + # memory (bounded by the workspace's job count, as list_steps already does); + # denormalizing status onto PlatformJob so it is store-filterable is the + # longer-term fix that would remove the in-memory pass entirely. + candidates: list[PlatformJob] = [] + page = 1 + while True: + response = await self.store.list( + PlatformJob, + filter_operation=store_operation, + page=page, + page_size=_STORE_OVERFETCH_PAGE_SIZE, + workspace=workspace, + sort=sort_str, + ) + candidates.extend(response.data) + if page >= response.pagination.total_pages: + break + page += 1 - return job_outputs, response.pagination.total_results + matched = _apply_sort(await self._join_and_filter_jobs(candidates, parsed)) + return matched[start : start + page_size], len(matched) async def delete_job(self, job_name: str, workspace: str) -> bool: """Delete a job and all of its associated data (steps, tasks, results, logs). 
diff --git a/services/core/jobs/tests/test_dispatcher.py b/services/core/jobs/tests/test_dispatcher.py index 9dc1511eb2..af322e08ee 100644 --- a/services/core/jobs/tests/test_dispatcher.py +++ b/services/core/jobs/tests/test_dispatcher.py @@ -1362,3 +1362,125 @@ async def test_list_jobs_filter_name_operator_and_project_like( assert "excluded-job" not in returned_names assert "no-project-job" not in returned_names assert len(returned_names) == 1 + + +# ============================================================================= +# list_jobs: status-filter pagination + total_count (AIRCORE-738) +# +# Status lives on PlatformJobAttempt, so it is filtered in-memory after the +# store query. These tests pin the two bugs AIRCORE-738 describes: total_count +# must reflect the matched set (not the status-free superset), and each page +# must fill up to page_size with *matching* jobs rather than under-filling. +# ============================================================================= + + +@pytest.mark.asyncio +async def test_list_jobs_status_filter_total_count_excludes_non_matching( + mock_dispatcher: JobDispatcher, + mock_store: EntityClient, +): + """total_count for a status filter counts matched jobs, not the superset (AIRCORE-738).""" + for i in range(3): + await _make_job(mock_dispatcher, mock_store, f"active-{i}", PlatformJobStatus.ACTIVE) + for i in range(2): + await _make_job(mock_dispatcher, mock_store, f"completed-{i}", PlatformJobStatus.COMPLETED) + await _make_job(mock_dispatcher, mock_store, "error-0", PlatformJobStatus.ERROR) + + jobs, total_count = await mock_dispatcher.list_jobs( + parsed=ParsedFilter( + operation=ComparisonOperation(field="data.status", operator=FilterOperator.EQ, value="active"), + ), + workspace=DEFAULT_WORKSPACE, + ) + + # Before the fix total_count was 6 (the status-free superset). 
+ assert total_count == 3 + assert len(jobs) == 3 + assert all(j.status == PlatformJobStatus.ACTIVE for j in jobs) + + +@pytest.mark.asyncio +async def test_list_jobs_status_filter_pages_fill_to_page_size( + mock_dispatcher: JobDispatcher, + mock_store: EntityClient, +): + """Each status-filtered page returns up to page_size matching jobs; total_count is exact (AIRCORE-738).""" + from nmp.core.jobs.api.v2.jobs.schemas import PlatformJobSortField + + # Interleave statuses so a naive store-side page of size 2 would mix matches + # with non-matches and under-fill the page. + expected_active: set[str] = set() + for i in range(5): + active = await _make_job(mock_dispatcher, mock_store, f"active-{i}", PlatformJobStatus.ACTIVE) + expected_active.add(active.id) + await _make_job(mock_dispatcher, mock_store, f"completed-{i}", PlatformJobStatus.COMPLETED) + + op = ParsedFilter( + operation=ComparisonOperation(field="data.status", operator=FilterOperator.EQ, value="active"), + ) + + seen: list[str] = [] + page_sizes: list[int] = [] + for offset in (0, 2, 4): + page, total_count = await mock_dispatcher.list_jobs( + parsed=op, + workspace=DEFAULT_WORKSPACE, + limit=2, + offset=offset, + sort=PlatformJobSortField.CREATED_AT_ASC, + ) + assert total_count == 5 # exact matched count, independent of the page + page_sizes.append(len(page)) + seen.extend(j.id for j in page) + + # Pages fill (2, 2, 1) instead of under-filling, and every matching job + # surfaces exactly once across the pages — no empty pages, no duplicates. 
+ assert page_sizes == [2, 2, 1] + assert set(seen) == expected_active + assert len(seen) == 5 + + +@pytest.mark.asyncio +async def test_list_jobs_status_filter_no_match_zero_count( + mock_dispatcher: JobDispatcher, + mock_store: EntityClient, +): + """A status with no matching jobs returns [] and total_count 0, not the superset (AIRCORE-738).""" + for i in range(3): + await _make_job(mock_dispatcher, mock_store, f"completed-{i}", PlatformJobStatus.COMPLETED) + + jobs, total_count = await mock_dispatcher.list_jobs( + parsed=ParsedFilter( + operation=ComparisonOperation(field="data.status", operator=FilterOperator.EQ, value="active"), + ), + workspace=DEFAULT_WORKSPACE, + ) + + assert jobs == [] + assert total_count == 0 + + +@pytest.mark.asyncio +async def test_list_jobs_no_status_filter_paginates_via_store( + mock_dispatcher: JobDispatcher, + mock_store: EntityClient, +): + """Without a status filter, the store-pushdown path still paginates and counts correctly (AIRCORE-738).""" + from nmp.core.jobs.api.v2.jobs.schemas import PlatformJobSortField + + created = [await _make_job(mock_dispatcher, mock_store, f"job-{i}", PlatformJobStatus.ACTIVE) for i in range(5)] + + seen: list[str] = [] + for offset in (0, 2, 4): + page, total_count = await mock_dispatcher.list_jobs( + parsed=ParsedFilter(operation=None), + workspace=DEFAULT_WORKSPACE, + limit=2, + offset=offset, + sort=PlatformJobSortField.CREATED_AT_ASC, + ) + assert total_count == 5 + seen.extend(j.id for j in page) + + assert set(seen) == {j.id for j in created} + assert len(seen) == 5