From a7bbd310cbf35218560d32f992308994b1328f6e Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Mon, 30 Jun 2025 23:10:12 +0200 Subject: [PATCH] Rolling deployments for `files` Support rolling deployments for services when the `files` property is updated or the contents of file archives change. Currently, `dstack` always detects phantom changes to file archive contents, since file archives are omitted when getting the run plan. Addressing this is out of scope for this PR. This PR does not address compatibility with previously submitted jobs that are using files, because the files feature has not been released yet, so no users have such jobs. --- runner/internal/executor/files.go | 2 +- runner/internal/schemas/schemas.go | 4 ++-- src/dstack/_internal/core/compatibility/runs.py | 2 ++ src/dstack/_internal/core/models/runs.py | 1 + .../_internal/server/background/tasks/process_running_jobs.py | 4 ++-- src/dstack/_internal/server/schemas/runner.py | 2 +- .../_internal/server/services/jobs/configurators/base.py | 1 + src/dstack/_internal/server/services/runs.py | 2 ++ src/tests/_internal/server/routers/test_runs.py | 2 ++ 9 files changed, 14 insertions(+), 6 deletions(-) diff --git a/runner/internal/executor/files.go b/runner/internal/executor/files.go index 89de206338..e1f414b9af 100644 --- a/runner/internal/executor/files.go +++ b/runner/internal/executor/files.go @@ -51,7 +51,7 @@ func (ex *RunExecutor) setupFiles(ctx context.Context) error { } } - for _, fa := range ex.run.RunSpec.FileArchives { + for _, fa := range ex.jobSpec.FileArchives { log.Trace(ctx, "Extracting file archive", "id", fa.Id, "path", fa.Path) p := path.Clean(fa.Path) diff --git a/runner/internal/schemas/schemas.go b/runner/internal/schemas/schemas.go index e3b950639c..8e439dae02 100644 --- a/runner/internal/schemas/schemas.go +++ b/runner/internal/schemas/schemas.go @@ -47,7 +47,6 @@ type RunSpec struct { RunName string `json:"run_name"` RepoId string `json:"repo_id"` RepoData RepoData `json:"repo_data"` - FileArchives []FileArchive `json:"file_archives"` Configuration Configuration `json:"configuration"` ConfigurationPath string `json:"configuration_path"` } @@ -71,7 +70,8 @@ type JobSpec struct { // `RepoData` is optional for compatibility with jobs submitted before 0.19.17. // Use `RunExecutor.getRepoData()` to get non-nil `RepoData`. // TODO: make required when supporting jobs submitted before 0.19.17 is no longer relevant. - RepoData *RepoData `json:"repo_data"` + RepoData *RepoData `json:"repo_data"` + FileArchives []FileArchive `json:"file_archives"` } type ClusterInfo struct { diff --git a/src/dstack/_internal/core/compatibility/runs.py b/src/dstack/_internal/core/compatibility/runs.py index 172ba879b4..f4e6c6acfb 100644 --- a/src/dstack/_internal/core/compatibility/runs.py +++ b/src/dstack/_internal/core/compatibility/runs.py @@ -138,6 +138,8 @@ def get_job_spec_excludes(job_specs: list[JobSpec]) -> Optional[dict]: spec_excludes["repo_code_hash"] = True if all(s.repo_data is None for s in job_specs): spec_excludes["repo_data"] = True + if all(not s.file_archives for s in job_specs): + spec_excludes["file_archives"] = True if spec_excludes: return spec_excludes diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 6cf80b9268..49691eb504 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -226,6 +226,7 @@ class JobSpec(CoreModel): # submitted before 0.19.17. See `_get_repo_code_hash` on how to get the correct `repo_code_hash` # TODO: drop this comment when supporting jobs submitted before 0.19.17 is no longer relevant. repo_code_hash: Optional[str] = None + file_archives: list[FileArchiveMapping] = [] class JobProvisioningData(CoreModel): diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 130f371b7a..b66a70e94d 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -244,7 +244,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): # the runner is not ready yet file_archives = await _get_job_file_archives( session=session, - archive_mappings=run.run_spec.file_archives, + archive_mappings=job.job_spec.file_archives, user=run_model.user, ) code = await _get_job_code( @@ -296,7 +296,7 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): # the runner is not ready yet file_archives = await _get_job_file_archives( session=session, - archive_mappings=run.run_spec.file_archives, + archive_mappings=job.job_spec.file_archives, user=run_model.user, ) code = await _get_job_code( diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index a2efa2fcd3..f8d59343d7 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -57,7 +57,6 @@ class SubmitBody(CoreModel): "repo_data", "configuration", "configuration_path", - "file_archives", }, } ), @@ -79,6 +78,7 @@ class SubmitBody(CoreModel): "ssh_key", "working_dir", "repo_data", + "file_archives", } ), ] diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index b36919174f..55cd729ea0 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -151,6 +151,7 @@ async def _get_job_spec( ssh_key=self._ssh_key(jobs_per_replica), repo_data=self.run_spec.repo_data, repo_code_hash=self.run_spec.repo_code_hash, + file_archives=self.run_spec.file_archives, ) return job_spec diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 616ed87735..8b63de0b33 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -918,6 +918,7 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec): # rolling deployment "repo_data", "repo_code_hash", + "file_archives", ], } _CONF_UPDATABLE_FIELDS = ["priority"] @@ -930,6 +931,7 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec): # rolling deployment "resources", "volumes", + "files", "image", "user", "privileged", diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 63a71989e7..1474a64912 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -246,6 +246,7 @@ def get_dev_env_run_plan_dict( "working_dir": ".", "repo_code_hash": None, "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, + "file_archives": [], }, "offers": [json.loads(o.json()) for o in offers], "total_offers": total_offers, @@ -440,6 +441,7 @@ def get_dev_env_run_dict( "working_dir": ".", "repo_code_hash": None, "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, + "file_archives": [], }, "job_submissions": [ {