Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 95 additions & 47 deletions services/core/jobs/src/nmp/core/jobs/app/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ def create_platform_job_response(job: PlatformJob, attempt: PlatformJobAttempt)
# exactly.
_STATUS_FIELD = "data.status"

# When a status filter is present, jobs are narrowed in-memory AFTER the store
# query, so the store cannot paginate the matched set. list_jobs drains the full
# status-free superset in batches of this size and paginates the narrowed result
# in Python instead (see AIRCORE-738).
_STORE_OVERFETCH_PAGE_SIZE = 1000


def _references_status(operation: FilterOperation | None) -> bool:
"""Whether the operation tree contains any comparison on the status field."""
Expand Down Expand Up @@ -324,67 +330,109 @@ async def get_job(self, job_name: str, workspace: str) -> PlatformJobResponse |
return None
return create_platform_job_response(job_entity, attempt)

async def list_jobs(
self,
parsed: ParsedFilter,
workspace: str,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort: Optional[PlatformJobSortField] = None,
) -> Tuple[List[PlatformJobResponse], int]:
"""List platform jobs with their current attempts."""
# Status lives on PlatformJobAttempt, not PlatformJob, so the full filter
# tree is evaluated in-memory against a virtual job entity that carries the
# attempt status. The store query receives a status-free SUPERSET of the
# filter so it never drops a row the in-memory pass would accept (see
# _status_free_superset); that pass then narrows the page exactly.
# The operation is already entity-translated by make_filter_dep.
store_operation = _status_free_superset(parsed.operation)

# Calculate page from offset
page = 1
page_size = limit or 100
if offset is not None and limit is not None:
page = (offset // limit) + 1

sort_str = sort.value if sort else "-created_at"
async def _join_and_filter_jobs(self, jobs: list[PlatformJob], parsed: ParsedFilter) -> list[PlatformJobResponse]:
"""Join jobs to their current attempts and apply the full status-aware filter.

response = await self.store.list(
PlatformJob,
filter_operation=store_operation,
page=page,
page_size=page_size,
workspace=workspace,
sort=sort_str,
)
Status lives on ``PlatformJobAttempt``, so the store query only saw a
status-free superset (see ``_status_free_superset``). This pass loads each
job's current attempt (concurrently) and evaluates the full filter tree
in-memory against a virtual entity carrying the attempt status, narrowing
the superset to the exact matched set. Jobs without a current attempt (or
whose attempt is missing) are skipped; input order is preserved.
"""
attempt_ids = {job.current_attempt_id for job in jobs if job.current_attempt_id}
attempts = await self._gather_attempts(attempt_ids)
attempt_by_id = {aid: a for aid, a in zip(attempt_ids, attempts) if a is not None}
Comment on lines +343 to +345

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix incorrect attempt ID → attempt pairing and bound status-filter fan-out in dispatcher

  • _join_and_filter_jobs rebuilds attempt_by_id using zip(attempt_ids, attempts) where attempt_ids is a set; pairing assumes set iteration order matches the list(attempt_ids) order used by _gather_attempts. Build attempt_by_id by keying on attempt ID (or return a dict from _gather_attempts) to avoid mis-matching jobs to attempts.
  • list_jobs status-filter drains the full status-free superset into candidates and then fetches one attempt per unique current_attempt_id via an unbounded TaskGroup; cap concurrency and/or avoid fetching attempts for jobs that will be discarded by in-memory filtering/slicing to prevent store/latency blowups.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py` around lines 343 -
345, _join_and_filter_jobs builds attempt_by_id using zip over attempt_ids (a
set) and attempts which can mis-pair attempts; change _gather_attempts to return
a dict keyed by attempt_id (or build attempt_by_id by mapping each returned
attempt.aid to the attempt) and use that mapping in attempt_by_id instead of
zipping a set. Also address unbounded fan-out in list_jobs: avoid fetching
attempts for every job in candidates—apply in-memory filtering/limit/sorting
first (or cap the candidate set) and use a bounded concurrency mechanism when
awaiting _gather_attempts/TaskGroup for current_attempt_id to prevent
store/latency blowups; reference functions/classes: _join_and_filter_jobs,
_gather_attempts, list_jobs, candidates, current_attempt_id, attempt_by_id, and
the TaskGroup usage.


# IN-MEMORY JOIN: load each job's current attempt, then evaluate the full
# (status-aware) filter tree against a virtual entity carrying the status.
job_outputs: list[PlatformJobResponse] = []
for job in response.data:
for job in jobs:
if not job.current_attempt_id:
continue

try:
attempt = await self.store.get_by_id(PlatformJobAttempt, job.current_attempt_id)
except EntityNotFoundError:
attempt = attempt_by_id.get(job.current_attempt_id)
if attempt is None:
logger.warning(f"Attempt {job.current_attempt_id} not found for job {job.id}")
continue

if parsed.operation is not None:
virtual_entity = _build_virtual_job_entity(job, attempt)
if not parsed.operation.apply(InMemoryFilterRepository(virtual_entity)):
continue
job_outputs.append(create_platform_job_response(job, attempt))
return job_outputs

platform_job = create_platform_job_response(job, attempt)
job_outputs.append(platform_job)
async def list_jobs(
self,
parsed: ParsedFilter,
workspace: str,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort: Optional[PlatformJobSortField] = None,
) -> Tuple[List[PlatformJobResponse], int]:
"""List platform jobs with their current attempts.

Most filters push down to the entity store, which paginates and counts
directly. A status filter is the exception: status lives on the attempt,
so it is evaluated in-memory after the store query (see
``_join_and_filter_jobs``). The store therefore cannot paginate the matched
set, so we drain the full status-free superset, narrow it in-memory, then
count and slice in Python — otherwise ``total_count`` reflects the larger
superset and pages under-fill (AIRCORE-738).
"""
# The operation is already entity-translated by make_filter_dep.
store_operation = _status_free_superset(parsed.operation)
page_size = limit or 100
start = offset or 0
sort_str = sort.value if sort else "-created_at"

# Sort in memory
if sort:
reverse = sort.get_sort_direction() == "desc"
job_outputs.sort(key=lambda j: getattr(j, sort.get_field_name()), reverse=reverse)
def _apply_sort(jobs: list[PlatformJobResponse]) -> list[PlatformJobResponse]:
if sort:
field_name = sort.get_field_name()
reverse = sort.get_sort_direction() == "desc"
jobs.sort(key=lambda j: getattr(j, field_name), reverse=reverse)
return jobs

# Callers paginate by page: the REST layer passes offset = (page-1)*page_size,
# so offset is always a multiple of limit. The no-status branch maps that offset
# onto a store page; the status branch slices the matched set by offset exactly.
# Both coincide for page-aligned offsets (the only ones the API produces).
if not _references_status(parsed.operation):
# The store filter is exact, so let the store paginate and count.
page = (start // limit) + 1 if (limit and start) else 1
response = await self.store.list(
PlatformJob,
filter_operation=store_operation,
page=page,
page_size=page_size,
workspace=workspace,
sort=sort_str,
)
jobs = await self._join_and_filter_jobs(list(response.data), parsed)
return _apply_sort(jobs), response.pagination.total_results

# Status filter: status is narrowed in-memory after the store query, so the
# store can neither paginate nor count the matched set. Drain the full
# status-free superset (in batches), narrow it in-memory, then count + slice in
# Python so total_count is exact and pages fill. This holds the superset in
# memory (bounded by the workspace's job count, as list_steps already does);
# denormalizing status onto PlatformJob so it is store-filterable is the
# longer-term fix that would remove the in-memory pass entirely.
candidates: list[PlatformJob] = []
page = 1
while True:
response = await self.store.list(
PlatformJob,
filter_operation=store_operation,
page=page,
page_size=_STORE_OVERFETCH_PAGE_SIZE,
workspace=workspace,
sort=sort_str,
)
candidates.extend(response.data)
if page >= response.pagination.total_pages:
break
page += 1

return job_outputs, response.pagination.total_results
matched = _apply_sort(await self._join_and_filter_jobs(candidates, parsed))
return matched[start : start + page_size], len(matched)
Comment on lines +418 to +435

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Bound the attempt-fetch fan-out in the status path.

This branch drains the full superset and then joins attempts for every candidate in one shot. On a large workspace, one status-filtered request can turn into thousands of concurrent get_by_id calls, which is enough to saturate the entity client/DB pool and make the endpoint unstable under load. Join/filter each overfetch page as it is read, or chunk the attempt fetches to the same batch size instead of doing one unbounded gather over the whole workspace.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@services/core/jobs/src/nmp/core/jobs/app/dispatcher.py` around lines 418 -
435, The current loop accumulates all pages into candidates and then calls await
self._join_and_filter_jobs(candidates, parsed) which can trigger an unbounded
fan-out of attempt get_by_id calls; instead process each overfetch page as it is
read (or chunk candidates into batches of size _STORE_OVERFETCH_PAGE_SIZE) and
call await self._join_and_filter_jobs(...) per page/batch to bound concurrency.
Concretely, inside the while True after receiving response, call
_join_and_filter_jobs(response.data, parsed), extend a local matched list with
the returned results (or collect counts), and only after the loop apply the
final sort/slice; ensure you reference self.store.list,
_STORE_OVERFETCH_PAGE_SIZE, and _join_and_filter_jobs when modifying the logic
so no single gather spans the entire workspace.


async def delete_job(self, job_name: str, workspace: str) -> bool:
"""Delete a job and all of its associated data (steps, tasks, results, logs).
Expand Down
122 changes: 122 additions & 0 deletions services/core/jobs/tests/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1362,3 +1362,125 @@ async def test_list_jobs_filter_name_operator_and_project_like(
assert "excluded-job" not in returned_names
assert "no-project-job" not in returned_names
assert len(returned_names) == 1


# =============================================================================
# list_jobs: status-filter pagination + total_count (AIRCORE-738)
#
# Status lives on PlatformJobAttempt, so it is filtered in-memory after the
# store query. These tests pin the two bugs AIRCORE-738 describes: total_count
# must reflect the matched set (not the status-free superset), and each page
# must fill up to page_size with *matching* jobs rather than under-filling.
# =============================================================================


@pytest.mark.asyncio
async def test_list_jobs_status_filter_total_count_excludes_non_matching(
mock_dispatcher: JobDispatcher,
mock_store: EntityClient,
):
"""total_count for a status filter counts matched jobs, not the superset (AIRCORE-738)."""
for i in range(3):
await _make_job(mock_dispatcher, mock_store, f"active-{i}", PlatformJobStatus.ACTIVE)
for i in range(2):
await _make_job(mock_dispatcher, mock_store, f"completed-{i}", PlatformJobStatus.COMPLETED)
await _make_job(mock_dispatcher, mock_store, "error-0", PlatformJobStatus.ERROR)

jobs, total_count = await mock_dispatcher.list_jobs(
parsed=ParsedFilter(
operation=ComparisonOperation(field="data.status", operator=FilterOperator.EQ, value="active"),
),
workspace=DEFAULT_WORKSPACE,
)

# Before the fix total_count was 6 (the status-free superset).
assert total_count == 3
assert len(jobs) == 3
assert all(j.status == PlatformJobStatus.ACTIVE for j in jobs)


@pytest.mark.asyncio
async def test_list_jobs_status_filter_pages_fill_to_page_size(
mock_dispatcher: JobDispatcher,
mock_store: EntityClient,
):
"""Each status-filtered page returns up to page_size matching jobs; total_count is exact (AIRCORE-738)."""
from nmp.core.jobs.api.v2.jobs.schemas import PlatformJobSortField

# Interleave statuses so a naive store-side page of size 2 would mix matches
# with non-matches and under-fill the page.
expected_active: set[str] = set()
for i in range(5):
active = await _make_job(mock_dispatcher, mock_store, f"active-{i}", PlatformJobStatus.ACTIVE)
expected_active.add(active.id)
await _make_job(mock_dispatcher, mock_store, f"completed-{i}", PlatformJobStatus.COMPLETED)

op = ParsedFilter(
operation=ComparisonOperation(field="data.status", operator=FilterOperator.EQ, value="active"),
)

seen: list[str] = []
page_sizes: list[int] = []
for offset in (0, 2, 4):
page, total_count = await mock_dispatcher.list_jobs(
parsed=op,
workspace=DEFAULT_WORKSPACE,
limit=2,
offset=offset,
sort=PlatformJobSortField.CREATED_AT_ASC,
)
assert total_count == 5 # exact matched count, independent of the page
page_sizes.append(len(page))
seen.extend(j.id for j in page)

# Pages fill (2, 2, 1) instead of under-filling, and every matching job
# surfaces exactly once across the pages — no empty pages, no duplicates.
assert page_sizes == [2, 2, 1]
assert set(seen) == expected_active
assert len(seen) == 5


@pytest.mark.asyncio
async def test_list_jobs_status_filter_no_match_zero_count(
mock_dispatcher: JobDispatcher,
mock_store: EntityClient,
):
"""A status with no matching jobs returns [] and total_count 0, not the superset (AIRCORE-738)."""
for i in range(3):
await _make_job(mock_dispatcher, mock_store, f"completed-{i}", PlatformJobStatus.COMPLETED)

jobs, total_count = await mock_dispatcher.list_jobs(
parsed=ParsedFilter(
operation=ComparisonOperation(field="data.status", operator=FilterOperator.EQ, value="active"),
),
workspace=DEFAULT_WORKSPACE,
)

assert jobs == []
assert total_count == 0


@pytest.mark.asyncio
async def test_list_jobs_no_status_filter_paginates_via_store(
mock_dispatcher: JobDispatcher,
mock_store: EntityClient,
):
"""Without a status filter, the store-pushdown path still paginates and counts correctly (AIRCORE-738)."""
from nmp.core.jobs.api.v2.jobs.schemas import PlatformJobSortField

created = [await _make_job(mock_dispatcher, mock_store, f"job-{i}", PlatformJobStatus.ACTIVE) for i in range(5)]

seen: list[str] = []
for offset in (0, 2, 4):
page, total_count = await mock_dispatcher.list_jobs(
parsed=ParsedFilter(operation=None),
workspace=DEFAULT_WORKSPACE,
limit=2,
offset=offset,
sort=PlatformJobSortField.CREATED_AT_ASC,
)
assert total_count == 5
seen.extend(j.id for j in page)

assert set(seen) == {j.id for j in created}
assert len(seen) == 5
Loading