Skip to content

Commit 36f874b

Browse files
committed
Add include_jobs filter
1 parent 8f5e02e commit 36f874b

File tree

4 files changed

+64
-38
lines changed

4 files changed

+64
-38
lines changed

src/dstack/_internal/server/routers/runs.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ async def list_runs(
5454
repo_id=body.repo_id,
5555
username=body.username,
5656
only_active=body.only_active,
57+
include_jobs=body.include_jobs,
5758
job_submissions_limit=body.job_submissions_limit,
5859
prev_submitted_at=body.prev_submitted_at,
5960
prev_run_id=body.prev_run_id,

src/dstack/_internal/server/schemas/runs.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@ class ListRunsRequest(CoreModel):
1313
repo_id: Optional[str] = None
1414
username: Optional[str] = None
1515
only_active: bool = False
16+
include_jobs: bool = Field(
17+
True,
18+
description=("Whether to include `jobs` in the response"),
19+
)
1620
job_submissions_limit: Optional[int] = Field(
1721
None,
1822
ge=0,
1923
description=(
2024
"Limit number of job submissions returned per job to avoid large responses."
21-
"Drops old job submissions"
25+
"Drops older job submissions. No effect with `include_jobs: false`"
2226
),
2327
)
2428
prev_submitted_at: Optional[datetime] = None

src/dstack/_internal/server/services/runs.py

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ async def list_user_runs(
105105
repo_id: Optional[str],
106106
username: Optional[str],
107107
only_active: bool,
108+
include_jobs: bool,
108109
job_submissions_limit: Optional[int],
109110
prev_submitted_at: Optional[datetime],
110111
prev_run_id: Optional[uuid.UUID],
@@ -151,7 +152,10 @@ async def list_user_runs(
151152
try:
152153
runs.append(
153154
run_model_to_run(
154-
r, return_in_api=True, job_submissions_limit=job_submissions_limit
155+
r,
156+
return_in_api=True,
157+
include_jobs=include_jobs,
158+
job_submissions_limit=job_submissions_limit,
155159
)
156160
)
157161
except pydantic.ValidationError:
@@ -657,52 +661,26 @@ async def delete_runs(
657661

658662
def run_model_to_run(
659663
run_model: RunModel,
660-
include_job_submissions: bool = True,
664+
include_jobs: bool = True,
661665
job_submissions_limit: Optional[int] = None,
662666
return_in_api: bool = False,
663667
include_sensitive: bool = False,
664668
) -> Run:
665669
jobs: List[Job] = []
666-
run_jobs = sorted(run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num))
667-
for replica_num, replica_submissions in itertools.groupby(
668-
run_jobs, key=lambda j: j.replica_num
669-
):
670-
for job_num, job_submissions in itertools.groupby(
671-
replica_submissions, key=lambda j: j.job_num
672-
):
673-
submissions = []
674-
job_model = None
675-
if job_submissions_limit is not None:
676-
if job_submissions_limit == 0:
677-
job_submissions = []
678-
else:
679-
job_submissions = list(job_submissions)[-job_submissions_limit:]
680-
for job_model in job_submissions:
681-
if include_job_submissions:
682-
job_submission = job_model_to_job_submission(job_model)
683-
if return_in_api:
684-
# Set default non-None values for 0.18 backward-compatibility
685-
# Remove in 0.19
686-
if job_submission.job_provisioning_data is not None:
687-
if job_submission.job_provisioning_data.hostname is None:
688-
job_submission.job_provisioning_data.hostname = ""
689-
if job_submission.job_provisioning_data.ssh_port is None:
690-
job_submission.job_provisioning_data.ssh_port = 22
691-
submissions.append(job_submission)
692-
if job_model is not None:
693-
# Use the spec from the latest submission. Submissions can have different specs
694-
job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data)
695-
if not include_sensitive:
696-
_remove_job_spec_sensitive_info(job_spec)
697-
jobs.append(Job(job_spec=job_spec, job_submissions=submissions))
670+
if include_jobs:
671+
jobs = _get_run_job_with_submissions(
672+
run_model=run_model,
673+
job_submissions_limit=job_submissions_limit,
674+
return_in_api=return_in_api,
675+
include_sensitive=include_sensitive,
676+
)
698677

699678
run_spec = RunSpec.__response__.parse_raw(run_model.run_spec)
700679

701680
latest_job_submission = None
702-
if include_job_submissions:
681+
if len(jobs) > 0 and len(jobs[0].job_submissions) > 0:
703682
# TODO(egor-s): does it make sense with replicas and multi-node?
704-
if jobs:
705-
latest_job_submission = jobs[0].job_submissions[-1]
683+
latest_job_submission = jobs[0].job_submissions[-1]
706684

707685
service_spec = None
708686
if run_model.service_spec is not None:
@@ -727,6 +705,47 @@ def run_model_to_run(
727705
return run
728706

729707

708+
def _get_run_job_with_submissions(
709+
run_model: RunModel,
710+
job_submissions_limit: Optional[int],
711+
return_in_api: bool = False,
712+
include_sensitive: bool = False,
713+
) -> List[Job]:
714+
jobs: List[Job] = []
715+
run_jobs = sorted(run_model.jobs, key=lambda j: (j.replica_num, j.job_num, j.submission_num))
716+
for replica_num, replica_submissions in itertools.groupby(
717+
run_jobs, key=lambda j: j.replica_num
718+
):
719+
for job_num, job_models in itertools.groupby(replica_submissions, key=lambda j: j.job_num):
720+
submissions = []
721+
job_model = None
722+
if job_submissions_limit is not None:
723+
if job_submissions_limit == 0:
724+
# Take latest job submission to return its job_spec
725+
job_models = list(job_models)[-1:]
726+
else:
727+
job_models = list(job_models)[-job_submissions_limit:]
728+
for job_model in job_models:
729+
if job_submissions_limit != 0:
730+
job_submission = job_model_to_job_submission(job_model)
731+
if return_in_api:
732+
# Set default non-None values for 0.18 backward-compatibility
733+
# Remove in 0.19
734+
if job_submission.job_provisioning_data is not None:
735+
if job_submission.job_provisioning_data.hostname is None:
736+
job_submission.job_provisioning_data.hostname = ""
737+
if job_submission.job_provisioning_data.ssh_port is None:
738+
job_submission.job_provisioning_data.ssh_port = 22
739+
submissions.append(job_submission)
740+
if job_model is not None:
741+
# Use the spec from the latest submission. Submissions can have different specs
742+
job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data)
743+
if not include_sensitive:
744+
_remove_job_spec_sensitive_info(job_spec)
745+
jobs.append(Job(job_spec=job_spec, job_submissions=submissions))
746+
return jobs
747+
748+
730749
async def _get_pool_offers(
731750
session: AsyncSession,
732751
project: ProjectModel,

src/dstack/api/server/_runs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@ def list(
3333
prev_run_id: Optional[UUID] = None,
3434
limit: int = 100,
3535
ascending: bool = False,
36+
include_jobs: bool = True,
3637
job_submissions_limit: Optional[int] = None,
3738
) -> List[Run]:
3839
body = ListRunsRequest(
3940
project_name=project_name,
4041
repo_id=repo_id,
4142
username=username,
4243
only_active=only_active,
44+
include_jobs=include_jobs,
4345
job_submissions_limit=job_submissions_limit,
4446
prev_submitted_at=prev_submitted_at,
4547
prev_run_id=prev_run_id,

0 commit comments

Comments
 (0)