Skip to content

Commit 531c022

Browse files
committed
Add include_jobs filter
1 parent 8f5e02e commit 531c022

4 files changed

Lines changed: 68 additions & 38 deletions

File tree

src/dstack/_internal/server/routers/runs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ async def list_runs(
5454
repo_id=body.repo_id,
5555
username=body.username,
5656
only_active=body.only_active,
57+
include_jobs=body.include_jobs,
5758
job_submissions_limit=body.job_submissions_limit,
5859
prev_submitted_at=body.prev_submitted_at,
5960
prev_run_id=body.prev_run_id,

src/dstack/_internal/server/schemas/runs.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,18 @@ class ListRunsRequest(CoreModel):
1313
repo_id: Optional[str] = None
1414
username: Optional[str] = None
1515
only_active: bool = False
16+
include_jobs: bool = Field(
17+
True,
18+
description=(
19+
"Whether to include `jobs` in the response"
20+
),
21+
)
1622
job_submissions_limit: Optional[int] = Field(
1723
None,
1824
ge=0,
1925
description=(
2026
"Limit number of job submissions returned per job to avoid large responses."
21-
"Drops old job submissions"
27+
"Drops older job submissions. No effect with `include_jobs: false`"
2228
),
2329
)
2430
prev_submitted_at: Optional[datetime] = None

src/dstack/_internal/server/services/runs.py

Lines changed: 58 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ async def list_user_runs(
105105
repo_id: Optional[str],
106106
username: Optional[str],
107107
only_active: bool,
108+
include_jobs: bool,
108109
job_submissions_limit: Optional[int],
109110
prev_submitted_at: Optional[datetime],
110111
prev_run_id: Optional[uuid.UUID],
@@ -151,7 +152,10 @@ async def list_user_runs(
151152
try:
152153
runs.append(
153154
run_model_to_run(
154-
r, return_in_api=True, job_submissions_limit=job_submissions_limit
155+
r,
156+
return_in_api=True,
157+
include_jobs=include_jobs,
158+
job_submissions_limit=job_submissions_limit,
155159
)
156160
)
157161
except pydantic.ValidationError:
@@ -657,52 +661,26 @@ async def delete_runs(
657661

658662
def run_model_to_run(
659663
run_model: RunModel,
660-
include_job_submissions: bool = True,
664+
include_jobs: bool = True,
661665
job_submissions_limit: Optional[int] = None,
662666
return_in_api: bool = False,
663667
include_sensitive: bool = False,
664668
) -> Run:
665669
jobs: List[Job] = []
666-
run_jobs = sorted(run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num))
667-
for replica_num, replica_submissions in itertools.groupby(
668-
run_jobs, key=lambda j: j.replica_num
669-
):
670-
for job_num, job_submissions in itertools.groupby(
671-
replica_submissions, key=lambda j: j.job_num
672-
):
673-
submissions = []
674-
job_model = None
675-
if job_submissions_limit is not None:
676-
if job_submissions_limit == 0:
677-
job_submissions = []
678-
else:
679-
job_submissions = list(job_submissions)[-job_submissions_limit:]
680-
for job_model in job_submissions:
681-
if include_job_submissions:
682-
job_submission = job_model_to_job_submission(job_model)
683-
if return_in_api:
684-
# Set default non-None values for 0.18 backward-compatibility
685-
# Remove in 0.19
686-
if job_submission.job_provisioning_data is not None:
687-
if job_submission.job_provisioning_data.hostname is None:
688-
job_submission.job_provisioning_data.hostname = ""
689-
if job_submission.job_provisioning_data.ssh_port is None:
690-
job_submission.job_provisioning_data.ssh_port = 22
691-
submissions.append(job_submission)
692-
if job_model is not None:
693-
# Use the spec from the latest submission. Submissions can have different specs
694-
job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data)
695-
if not include_sensitive:
696-
_remove_job_spec_sensitive_info(job_spec)
697-
jobs.append(Job(job_spec=job_spec, job_submissions=submissions))
670+
if include_jobs:
671+
jobs = _get_run_job_with_submissions(
672+
run_model=run_model,
673+
job_submissions_limit=job_submissions_limit,
674+
return_in_api=return_in_api,
675+
include_sensitive=include_sensitive,
676+
)
698677

699678
run_spec = RunSpec.__response__.parse_raw(run_model.run_spec)
700679

701680
latest_job_submission = None
702-
if include_job_submissions:
681+
if len(jobs) > 0 and len(jobs[0].job_submissions) > 0:
703682
# TODO(egor-s): does it make sense with replicas and multi-node?
704-
if jobs:
705-
latest_job_submission = jobs[0].job_submissions[-1]
683+
latest_job_submission = jobs[0].job_submissions[-1]
706684

707685
service_spec = None
708686
if run_model.service_spec is not None:
@@ -727,6 +705,49 @@ def run_model_to_run(
727705
return run
728706

729707

708+
def _get_run_job_with_submissions(
709+
run_model: RunModel,
710+
job_submissions_limit: Optional[int],
711+
return_in_api: bool = False,
712+
include_sensitive: bool = False,
713+
) -> List[Job]:
714+
jobs: List[Job] = []
715+
run_jobs = sorted(run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num))
716+
for replica_num, replica_submissions in itertools.groupby(
717+
run_jobs, key=lambda j: j.replica_num
718+
):
719+
for job_num, job_models in itertools.groupby(
720+
replica_submissions, key=lambda j: j.job_num
721+
):
722+
submissions = []
723+
job_model = None
724+
if job_submissions_limit is not None:
725+
if job_submissions_limit == 0:
726+
# Take latest job submission to return its job_spec
727+
job_models = list(job_models)[-1:]
728+
else:
729+
job_models = list(job_models)[-job_submissions_limit:]
730+
for job_model in job_models:
731+
if job_submissions_limit != 0:
732+
job_submission = job_model_to_job_submission(job_model)
733+
if return_in_api:
734+
# Set default non-None values for 0.18 backward-compatibility
735+
# Remove in 0.19
736+
if job_submission.job_provisioning_data is not None:
737+
if job_submission.job_provisioning_data.hostname is None:
738+
job_submission.job_provisioning_data.hostname = ""
739+
if job_submission.job_provisioning_data.ssh_port is None:
740+
job_submission.job_provisioning_data.ssh_port = 22
741+
submissions.append(job_submission)
742+
if job_model is not None:
743+
# Use the spec from the latest submission. Submissions can have different specs
744+
job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data)
745+
if not include_sensitive:
746+
_remove_job_spec_sensitive_info(job_spec)
747+
jobs.append(Job(job_spec=job_spec, job_submissions=submissions))
748+
return jobs
749+
750+
730751
async def _get_pool_offers(
731752
session: AsyncSession,
732753
project: ProjectModel,

src/dstack/api/server/_runs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@ def list(
3333
prev_run_id: Optional[UUID] = None,
3434
limit: int = 100,
3535
ascending: bool = False,
36+
include_jobs: bool = True,
3637
job_submissions_limit: Optional[int] = None,
3738
) -> List[Run]:
3839
body = ListRunsRequest(
3940
project_name=project_name,
4041
repo_id=repo_id,
4142
username=username,
4243
only_active=only_active,
44+
include_jobs=include_jobs,
4345
job_submissions_limit=job_submissions_limit,
4446
prev_submitted_at=prev_submitted_at,
4547
prev_run_id=prev_run_id,

0 commit comments

Comments (0)