Skip to content

Commit 908f079

Browse files
authored
Rolling deployment for services (#2821)
Support rolling deployment when some service configuration properties are changed. During the deployment, `dstack` updates service replicas one by one. It first starts a new version of the replica, then waits until it is running, then stops the old version of the replica. As part of the implementation, introduce a new run and job property - `deployment_num`. When a new configuration is applied, the `deployment_num` of the run is incremented. Then, `dstack` gradually updates the jobs so that their `deployment_num` matches that of the run. Some jobs are updated in-place, if the new configuration does not affect their spec, others are redeployed as described above. ```shell > dstack apply Active run test-service already exists. Detected configuration changes that can be updated in-place: ['image', 'env', 'commands'] Update the run? [y/n]: y ⠋ Launching test-service... NAME BACKEND RESOURCES PRICE STATUS SUBMITTED test-service deployment=1 running 11 mins ago replica=0 job=0 deployment=0 aws (us-west-2) cpu=2 mem=1GB disk=100GB (spot) $0.0026 terminating 11 mins ago replica=1 job=0 deployment=1 aws (us-west-2) cpu=2 mem=1GB disk=100GB (spot) $0.0026 running 1 min ago ```
1 parent eac3c1c commit 908f079

File tree

18 files changed

+868
-233
lines changed

18 files changed

+868
-233
lines changed

src/dstack/_internal/cli/commands/attach.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,8 @@ def _register(self):
5252
)
5353
self._parser.add_argument(
5454
"--replica",
55-
help="The replica number. Defaults to 0.",
55+
help="The replica number. Defaults to any running replica.",
5656
type=int,
57-
default=0,
5857
)
5958
self._parser.add_argument(
6059
"--job",
@@ -129,14 +128,15 @@ def _print_finished_message_when_available(run: Run) -> None:
129128
def _print_attached_message(
130129
run: Run,
131130
bind_address: Optional[str],
132-
replica_num: int,
131+
replica_num: Optional[int],
133132
job_num: int,
134133
):
135134
if bind_address is None:
136135
bind_address = "localhost"
137136

138-
output = f"Attached to run [code]{run.name}[/] (replica={replica_num} job={job_num})\n"
139137
job = get_or_error(run._find_job(replica_num=replica_num, job_num=job_num))
138+
replica_num = job.job_spec.replica_num
139+
output = f"Attached to run [code]{run.name}[/] (replica={replica_num} job={job_num})\n"
140140
name = run.name
141141
if replica_num != 0 or job_num != 0:
142142
name = job.job_spec.job_name

src/dstack/_internal/cli/services/configurators/run.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,7 @@ def _is_ready_to_attach(run: Run) -> bool:
599599
]
600600
or run._run.jobs[0].job_submissions[-1].status
601601
in [JobStatus.SUBMITTED, JobStatus.PROVISIONING, JobStatus.PULLING]
602+
or run._run.is_deployment_in_progress()
602603
)
603604

604605

src/dstack/_internal/cli/utils/run.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,16 @@ def get_runs_table(
162162

163163
for run in runs:
164164
run = run._run # TODO(egor-s): make public attribute
165+
show_deployment_num = (
166+
verbose
167+
and run.run_spec.configuration.type == "service"
168+
or run.is_deployment_in_progress()
169+
)
170+
merge_job_rows = len(run.jobs) == 1 and not show_deployment_num
165171

166172
run_row: Dict[Union[str, int], Any] = {
167-
"NAME": run.run_spec.run_name,
173+
"NAME": run.run_spec.run_name
174+
+ (f" [secondary]deployment={run.deployment_num}[/]" if show_deployment_num else ""),
168175
"SUBMITTED": format_date(run.submitted_at),
169176
"STATUS": (
170177
run.latest_job_submission.status_message
@@ -174,7 +181,7 @@ def get_runs_table(
174181
}
175182
if run.error:
176183
run_row["ERROR"] = run.error
177-
if len(run.jobs) != 1:
184+
if not merge_job_rows:
178185
add_row_from_dict(table, run_row)
179186

180187
for job in run.jobs:
@@ -184,7 +191,12 @@ def get_runs_table(
184191
inactive_for = format_duration_multiunit(latest_job_submission.inactivity_secs)
185192
status += f" (inactive for {inactive_for})"
186193
job_row: Dict[Union[str, int], Any] = {
187-
"NAME": f" replica={job.job_spec.replica_num} job={job.job_spec.job_num}",
194+
"NAME": f" replica={job.job_spec.replica_num} job={job.job_spec.job_num}"
195+
+ (
196+
f" deployment={latest_job_submission.deployment_num}"
197+
if show_deployment_num
198+
else ""
199+
),
188200
"STATUS": latest_job_submission.status_message,
189201
"SUBMITTED": format_date(latest_job_submission.submitted_at),
190202
"ERROR": latest_job_submission.error,
@@ -208,7 +220,7 @@ def get_runs_table(
208220
"PRICE": f"${jpd.price:.4f}".rstrip("0").rstrip("."),
209221
}
210222
)
211-
if len(run.jobs) == 1:
223+
if merge_job_rows:
212224
# merge rows
213225
job_row.update(run_row)
214226
add_row_from_dict(table, job_row, style="secondary" if len(run.jobs) != 1 else None)

src/dstack/_internal/core/compatibility/runs.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ def get_apply_plan_excludes(plan: ApplyRunPlanInput) -> Optional[Dict]:
1919
if current_resource is not None:
2020
current_resource_excludes = {}
2121
current_resource_excludes["status_message"] = True
22+
if current_resource.deployment_num == 0:
23+
current_resource_excludes["deployment_num"] = True
2224
apply_plan_excludes["current_resource"] = current_resource_excludes
2325
current_resource_excludes["run_spec"] = get_run_spec_excludes(current_resource.run_spec)
2426
job_submissions_excludes = {}
@@ -36,6 +38,8 @@ def get_apply_plan_excludes(plan: ApplyRunPlanInput) -> Optional[Dict]:
3638
}
3739
if all(js.exit_status is None for js in job_submissions):
3840
job_submissions_excludes["exit_status"] = True
41+
if all(js.deployment_num == 0 for js in job_submissions):
42+
job_submissions_excludes["deployment_num"] = True
3943
latest_job_submission = current_resource.latest_job_submission
4044
if latest_job_submission is not None:
4145
latest_job_submission_excludes = {}
@@ -50,6 +54,8 @@ def get_apply_plan_excludes(plan: ApplyRunPlanInput) -> Optional[Dict]:
5054
}
5155
if latest_job_submission.exit_status is None:
5256
latest_job_submission_excludes["exit_status"] = True
57+
if latest_job_submission.deployment_num == 0:
58+
latest_job_submission_excludes["deployment_num"] = True
5359
return {"plan": apply_plan_excludes}
5460

5561

src/dstack/_internal/core/models/runs.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,7 @@ class ClusterInfo(CoreModel):
289289
class JobSubmission(CoreModel):
290290
id: UUID4
291291
submission_num: int
292+
deployment_num: int = 0 # default for compatibility with pre-0.19.14 servers
292293
submitted_at: datetime
293294
last_processed_at: datetime
294295
finished_at: Optional[datetime]
@@ -516,6 +517,7 @@ class Run(CoreModel):
516517
latest_job_submission: Optional[JobSubmission]
517518
cost: float = 0
518519
service: Optional[ServiceSpec] = None
520+
deployment_num: int = 0 # default for compatibility with pre-0.19.14 servers
519521
# TODO: make error a computed field after migrating to pydanticV2
520522
error: Optional[str] = None
521523
deleted: Optional[bool] = None
@@ -578,6 +580,13 @@ def _get_status_message(
578580
return "retrying"
579581
return status.value
580582

583+
def is_deployment_in_progress(self) -> bool:
584+
return any(
585+
not j.job_submissions[-1].status.is_finished()
586+
and j.job_submissions[-1].deployment_num != self.deployment_num
587+
for j in self.jobs
588+
)
589+
581590

582591
class JobPlan(CoreModel):
583592
job_spec: JobSpec

0 commit comments

Comments
 (0)