-
Notifications
You must be signed in to change notification settings - Fork 4
fix(jobs): correct total_count and page fill for status-filtered job lists #263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -93,6 +93,12 @@ def create_platform_job_response(job: PlatformJob, attempt: PlatformJobAttempt) | |
| # exactly. | ||
| _STATUS_FIELD = "data.status" | ||
|
|
||
| # When a status filter is present, jobs are narrowed in-memory AFTER the store | ||
| # query, so the store cannot paginate the matched set. list_jobs drains the full | ||
| # status-free superset in batches of this size and paginates the narrowed result | ||
| # in Python instead (see AIRCORE-738). | ||
| _STORE_OVERFETCH_PAGE_SIZE = 1000 | ||
|
|
||
|
|
||
| def _references_status(operation: FilterOperation | None) -> bool: | ||
| """Whether the operation tree contains any comparison on the status field.""" | ||
|
|
@@ -324,67 +330,109 @@ async def get_job(self, job_name: str, workspace: str) -> PlatformJobResponse | | |
| return None | ||
| return create_platform_job_response(job_entity, attempt) | ||
|
|
||
| async def list_jobs( | ||
| self, | ||
| parsed: ParsedFilter, | ||
| workspace: str, | ||
| limit: Optional[int] = None, | ||
| offset: Optional[int] = None, | ||
| sort: Optional[PlatformJobSortField] = None, | ||
| ) -> Tuple[List[PlatformJobResponse], int]: | ||
| """List platform jobs with their current attempts.""" | ||
| # Status lives on PlatformJobAttempt, not PlatformJob, so the full filter | ||
| # tree is evaluated in-memory against a virtual job entity that carries the | ||
| # attempt status. The store query receives a status-free SUPERSET of the | ||
| # filter so it never drops a row the in-memory pass would accept (see | ||
| # _status_free_superset); that pass then narrows the page exactly. | ||
| # The operation is already entity-translated by make_filter_dep. | ||
| store_operation = _status_free_superset(parsed.operation) | ||
|
|
||
| # Calculate page from offset | ||
| page = 1 | ||
| page_size = limit or 100 | ||
| if offset is not None and limit is not None: | ||
| page = (offset // limit) + 1 | ||
|
|
||
| sort_str = sort.value if sort else "-created_at" | ||
| async def _join_and_filter_jobs(self, jobs: list[PlatformJob], parsed: ParsedFilter) -> list[PlatformJobResponse]: | ||
| """Join jobs to their current attempts and apply the full status-aware filter. | ||
|
|
||
| response = await self.store.list( | ||
| PlatformJob, | ||
| filter_operation=store_operation, | ||
| page=page, | ||
| page_size=page_size, | ||
| workspace=workspace, | ||
| sort=sort_str, | ||
| ) | ||
| Status lives on ``PlatformJobAttempt``, so the store query only saw a | ||
| status-free superset (see ``_status_free_superset``). This pass loads each | ||
| job's current attempt (concurrently) and evaluates the full filter tree | ||
| in-memory against a virtual entity carrying the attempt status, narrowing | ||
| the superset to the exact matched set. Jobs without a current attempt (or | ||
| whose attempt is missing) are skipped; input order is preserved. | ||
| """ | ||
| attempt_ids = {job.current_attempt_id for job in jobs if job.current_attempt_id} | ||
| attempts = await self._gather_attempts(attempt_ids) | ||
| attempt_by_id = {aid: a for aid, a in zip(attempt_ids, attempts) if a is not None} | ||
|
|
||
| # IN-MEMORY JOIN: load each job's current attempt, then evaluate the full | ||
| # (status-aware) filter tree against a virtual entity carrying the status. | ||
| job_outputs: list[PlatformJobResponse] = [] | ||
| for job in response.data: | ||
| for job in jobs: | ||
| if not job.current_attempt_id: | ||
| continue | ||
|
|
||
| try: | ||
| attempt = await self.store.get_by_id(PlatformJobAttempt, job.current_attempt_id) | ||
| except EntityNotFoundError: | ||
| attempt = attempt_by_id.get(job.current_attempt_id) | ||
| if attempt is None: | ||
| logger.warning(f"Attempt {job.current_attempt_id} not found for job {job.id}") | ||
| continue | ||
|
|
||
| if parsed.operation is not None: | ||
| virtual_entity = _build_virtual_job_entity(job, attempt) | ||
| if not parsed.operation.apply(InMemoryFilterRepository(virtual_entity)): | ||
| continue | ||
| job_outputs.append(create_platform_job_response(job, attempt)) | ||
| return job_outputs | ||
|
|
||
| platform_job = create_platform_job_response(job, attempt) | ||
| job_outputs.append(platform_job) | ||
| async def list_jobs( | ||
| self, | ||
| parsed: ParsedFilter, | ||
| workspace: str, | ||
| limit: Optional[int] = None, | ||
| offset: Optional[int] = None, | ||
| sort: Optional[PlatformJobSortField] = None, | ||
| ) -> Tuple[List[PlatformJobResponse], int]: | ||
| """List platform jobs with their current attempts. | ||
|
|
||
| Most filters push down to the entity store, which paginates and counts | ||
| directly. A status filter is the exception: status lives on the attempt, | ||
| so it is evaluated in-memory after the store query (see | ||
| ``_join_and_filter_jobs``). The store therefore cannot paginate the matched | ||
| set, so we drain the full status-free superset, narrow it in-memory, then | ||
| count and slice in Python — otherwise ``total_count`` reflects the larger | ||
| superset and pages under-fill (AIRCORE-738). | ||
| """ | ||
| # The operation is already entity-translated by make_filter_dep. | ||
| store_operation = _status_free_superset(parsed.operation) | ||
| page_size = limit or 100 | ||
| start = offset or 0 | ||
| sort_str = sort.value if sort else "-created_at" | ||
|
|
||
| # Sort in memory | ||
| if sort: | ||
| reverse = sort.get_sort_direction() == "desc" | ||
| job_outputs.sort(key=lambda j: getattr(j, sort.get_field_name()), reverse=reverse) | ||
| def _apply_sort(jobs: list[PlatformJobResponse]) -> list[PlatformJobResponse]: | ||
| if sort: | ||
| field_name = sort.get_field_name() | ||
| reverse = sort.get_sort_direction() == "desc" | ||
| jobs.sort(key=lambda j: getattr(j, field_name), reverse=reverse) | ||
| return jobs | ||
|
|
||
| # Callers paginate by page: the REST layer passes offset = (page-1)*page_size, | ||
| # so offset is always a multiple of limit. The no-status branch maps that offset | ||
| # onto a store page; the status branch slices the matched set by offset exactly. | ||
| # Both coincide for page-aligned offsets (the only ones the API produces). | ||
| if not _references_status(parsed.operation): | ||
| # The store filter is exact, so let the store paginate and count. | ||
| page = (start // limit) + 1 if (limit and start) else 1 | ||
| response = await self.store.list( | ||
| PlatformJob, | ||
| filter_operation=store_operation, | ||
| page=page, | ||
| page_size=page_size, | ||
| workspace=workspace, | ||
| sort=sort_str, | ||
| ) | ||
| jobs = await self._join_and_filter_jobs(list(response.data), parsed) | ||
| return _apply_sort(jobs), response.pagination.total_results | ||
|
|
||
| # Status filter: status is narrowed in-memory after the store query, so the | ||
| # store can neither paginate nor count the matched set. Drain the full | ||
| # status-free superset (in batches), narrow it in-memory, then count + slice in | ||
| # Python so total_count is exact and pages fill. This holds the superset in | ||
| # memory (bounded by the workspace's job count, as list_steps already does); | ||
| # denormalizing status onto PlatformJob so it is store-filterable is the | ||
| # longer-term fix that would remove the in-memory pass entirely. | ||
| candidates: list[PlatformJob] = [] | ||
| page = 1 | ||
| while True: | ||
| response = await self.store.list( | ||
| PlatformJob, | ||
| filter_operation=store_operation, | ||
| page=page, | ||
| page_size=_STORE_OVERFETCH_PAGE_SIZE, | ||
| workspace=workspace, | ||
| sort=sort_str, | ||
| ) | ||
| candidates.extend(response.data) | ||
| if page >= response.pagination.total_pages: | ||
| break | ||
| page += 1 | ||
|
|
||
| return job_outputs, response.pagination.total_results | ||
| matched = _apply_sort(await self._join_and_filter_jobs(candidates, parsed)) | ||
| return matched[start : start + page_size], len(matched) | ||
|
Comment on lines
+418
to
+435
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bound the attempt-fetch fan-out in the status path. This branch drains the full superset and then joins attempts for every candidate in one shot. On a large workspace, one status-filtered request can turn into thousands of concurrent attempt fetches against the store. 🤖 Prompt for AI Agents |
||
|
|
||
| async def delete_job(self, job_name: str, workspace: str) -> bool: | ||
| """Delete a job and all of its associated data (steps, tasks, results, logs). | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix incorrect attempt ID → attempt pairing and bound status-filter fan-out in dispatcher
`_join_and_filter_jobs` rebuilds `attempt_by_id` using `zip(attempt_ids, attempts)` where `attempt_ids` is a `set`; pairing assumes set iteration order matches the `list(attempt_ids)` order used by `_gather_attempts`. Build `attempt_by_id` by keying on attempt ID (or return a dict from `_gather_attempts`) to avoid mis-matching jobs to attempts. `list_jobs` status-filter drains the full status-free superset into `candidates` and then fetches one attempt per unique `current_attempt_id` via an unbounded `TaskGroup`; cap concurrency and/or avoid fetching attempts for jobs that will be discarded by in-memory filtering/slicing to prevent store/latency blowups. 🤖 Prompt for AI Agents