diff --git a/docs/docs/concepts/services.md b/docs/docs/concepts/services.md index b4b7288bca..a93cacf0d0 100644 --- a/docs/docs/concepts/services.md +++ b/docs/docs/concepts/services.md @@ -725,7 +725,7 @@ The rolling deployment stops when all replicas are updated or when a new deploym ??? info "Supported properties" - Rolling deployment supports changes to the following properties: `resources`, `volumes`, `docker`, `files`, `image`, `user`, `privileged`, `entrypoint`, `working_dir`, `python`, `nvcc`, `single_branch`, `env`, `shell`, `commands`, as well as changes to [repo](repos.md) or [file](#files) contents. + Rolling deployment supports changes to the following properties: `port`, `resources`, `volumes`, `docker`, `files`, `image`, `user`, `privileged`, `entrypoint`, `working_dir`, `python`, `nvcc`, `single_branch`, `env`, `shell`, `commands`, as well as changes to [repo](repos.md) or [file](#files) contents. Changes to `replicas` and `scaling` can be applied without redeploying replicas. diff --git a/src/dstack/_internal/core/compatibility/runs.py b/src/dstack/_internal/core/compatibility/runs.py index 1deea7ee02..e6f41e0403 100644 --- a/src/dstack/_internal/core/compatibility/runs.py +++ b/src/dstack/_internal/core/compatibility/runs.py @@ -139,6 +139,8 @@ def get_job_spec_excludes(job_specs: list[JobSpec]) -> IncludeExcludeDictType: spec_excludes["repo_data"] = True if all(not s.file_archives for s in job_specs): spec_excludes["file_archives"] = True + if all(s.service_port is None for s in job_specs): + spec_excludes["service_port"] = True return spec_excludes diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py index 770673b1b7..a22db9e361 100644 --- a/src/dstack/_internal/core/models/configurations.py +++ b/src/dstack/_internal/core/models/configurations.py @@ -398,8 +398,9 @@ class TaskConfiguration( class ServiceConfigurationParams(CoreModel): port: Annotated[ + # NOTE: it's a PortMapping for historical reasons. Only `port.container_port` is used. Union[ValidPort, constr(regex=r"^[0-9]+:[0-9]+$"), PortMapping], - Field(description="The port, that application listens on or the mapping"), + Field(description="The port the application listens on"), ] gateway: Annotated[ Optional[Union[bool, str]], diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index a90e39d9cd..a439e80938 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -11,6 +11,7 @@ DEFAULT_REPO_DIR, AnyRunConfiguration, RunConfiguration, + ServiceConfiguration, ) from dstack._internal.core.models.files import FileArchiveMapping from dstack._internal.core.models.instances import ( @@ -227,6 +228,8 @@ class JobSpec(CoreModel): # TODO: drop this comment when supporting jobs submitted before 0.19.17 is no longer relevant. repo_code_hash: Optional[str] = None file_archives: list[FileArchiveMapping] = [] + # None for non-services and pre-0.19.19 services. See `get_service_port` + service_port: Optional[int] = None class JobProvisioningData(CoreModel): @@ -640,3 +643,11 @@ def get_policy_map(spot_policy: Optional[SpotPolicy], default: SpotPolicy) -> Op SpotPolicy.ONDEMAND: False, } return policy_map[spot_policy] + + +def get_service_port(job_spec: JobSpec, configuration: ServiceConfiguration) -> int: + # Compatibility with pre-0.19.19 job specs that do not have the `service_port` property. + # TODO: drop when pre-0.19.19 jobs are no longer relevant. + if job_spec.service_port is None: + return configuration.port.container_port + return job_spec.service_port diff --git a/src/dstack/_internal/core/services/ssh/attach.py b/src/dstack/_internal/core/services/ssh/attach.py index a095fafb26..d0ad4ac644 100644 --- a/src/dstack/_internal/core/services/ssh/attach.py +++ b/src/dstack/_internal/core/services/ssh/attach.py @@ -64,6 +64,7 @@ def __init__( run_name: str, dockerized: bool, ssh_proxy: Optional[SSHConnectionParams] = None, + service_port: Optional[int] = None, local_backend: bool = False, bind_address: Optional[str] = None, ): @@ -90,6 +91,7 @@ def __init__( }, ) self.ssh_proxy = ssh_proxy + self.service_port = service_port hosts: dict[str, dict[str, Union[str, int, FilePath]]] = {} self.hosts = hosts diff --git a/src/dstack/_internal/server/services/gateways/client.py b/src/dstack/_internal/server/services/gateways/client.py index ed87bc84f8..aa4b4823cf 100644 --- a/src/dstack/_internal/server/services/gateways/client.py +++ b/src/dstack/_internal/server/services/gateways/client.py @@ -7,9 +7,9 @@ from dstack._internal.core.consts import DSTACK_RUNNER_SSH_PORT from dstack._internal.core.errors import GatewayError -from dstack._internal.core.models.configurations import RateLimit +from dstack._internal.core.models.configurations import RateLimit, ServiceConfiguration from dstack._internal.core.models.instances import SSHConnectionParams -from dstack._internal.core.models.runs import JobSubmission, Run +from dstack._internal.core.models.runs import JobSpec, JobSubmission, Run, get_service_port from dstack._internal.proxy.gateway.schemas.stats import ServiceStats from dstack._internal.server import settings @@ -80,13 +80,15 @@ async def unregister_service(self, project: str, run_name: str): async def register_replica( self, run: Run, + job_spec: JobSpec, job_submission: JobSubmission, ssh_head_proxy: Optional[SSHConnectionParams], ssh_head_proxy_private_key: Optional[str], ): + assert isinstance(run.run_spec.configuration, ServiceConfiguration) payload = { "job_id": job_submission.id.hex, - "app_port": run.run_spec.configuration.port.container_port, + "app_port": get_service_port(job_spec, run.run_spec.configuration), "ssh_head_proxy": ssh_head_proxy.dict() if ssh_head_proxy is not None else None, "ssh_head_proxy_private_key": ssh_head_proxy_private_key, } diff --git a/src/dstack/_internal/server/services/jobs/configurators/base.py b/src/dstack/_internal/server/services/jobs/configurators/base.py index 079e47f0b0..e1fcee5973 100644 --- a/src/dstack/_internal/server/services/jobs/configurators/base.py +++ b/src/dstack/_internal/server/services/jobs/configurators/base.py @@ -15,6 +15,7 @@ PortMapping, PythonVersion, RunConfigurationType, + ServiceConfiguration, ) from dstack._internal.core.models.profiles import ( DEFAULT_STOP_DURATION, @@ -153,6 +154,7 @@ async def _get_job_spec( repo_data=self.run_spec.repo_data, repo_code_hash=self.run_spec.repo_code_hash, file_archives=self.run_spec.file_archives, + service_port=self._service_port(), ) return job_spec @@ -306,6 +308,11 @@ def _ssh_key(self, jobs_per_replica: int) -> Optional[JobSSHKey]: ) return self._job_ssh_key + def _service_port(self) -> Optional[int]: + if isinstance(self.run_spec.configuration, ServiceConfiguration): + return self.run_spec.configuration.port.container_port + return None + def interpolate_job_volumes( run_volumes: List[Union[MountPoint, str]], diff --git a/src/dstack/_internal/server/services/proxy/repo.py b/src/dstack/_internal/server/services/proxy/repo.py index 4578cb56fb..7c21e840ba 100644 --- a/src/dstack/_internal/server/services/proxy/repo.py +++ b/src/dstack/_internal/server/services/proxy/repo.py @@ -12,10 +12,12 @@ from dstack._internal.core.models.instances import RemoteConnectionInfo, SSHConnectionParams from dstack._internal.core.models.runs import ( JobProvisioningData, + JobSpec, JobStatus, RunSpec, RunStatus, ServiceSpec, + get_service_port, ) from dstack._internal.core.models.services import AnyModel from dstack._internal.proxy.lib.models import ( @@ -97,9 +99,10 @@ async def get_service(self, project_name: str, run_name: str) -> Optional[Servic if rci.ssh_proxy is not None: ssh_head_proxy = rci.ssh_proxy ssh_head_proxy_private_key = get_or_error(rci.ssh_proxy_keys)[0].private + job_spec: JobSpec = JobSpec.__response__.parse_raw(job.job_spec_data) replica = Replica( id=job.id.hex, - app_port=run_spec.configuration.port.container_port, + app_port=get_service_port(job_spec, run_spec.configuration), ssh_destination=ssh_destination, ssh_port=ssh_port, ssh_proxy=ssh_proxy, diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 84f6f3e793..e4f2eb82c7 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -945,6 +945,7 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec): "scaling", # rolling deployment # NOTE: keep this list in sync with the "Rolling deployment" section in services.md + "port", "resources", "volumes", "docker", diff --git a/src/dstack/_internal/server/services/services/__init__.py b/src/dstack/_internal/server/services/services/__init__.py index 062390b9af..ba3168c590 100644 --- a/src/dstack/_internal/server/services/services/__init__.py +++ b/src/dstack/_internal/server/services/services/__init__.py @@ -22,7 +22,7 @@ from dstack._internal.core.models.configurations import SERVICE_HTTPS_DEFAULT, ServiceConfiguration from dstack._internal.core.models.gateways import GatewayConfiguration, GatewayStatus from dstack._internal.core.models.instances import SSHConnectionParams -from dstack._internal.core.models.runs import Run, RunSpec, ServiceModelSpec, ServiceSpec +from dstack._internal.core.models.runs import JobSpec, Run, RunSpec, ServiceModelSpec, ServiceSpec from dstack._internal.server import settings from dstack._internal.server.models import GatewayModel, JobModel, ProjectModel, RunModel from dstack._internal.server.services.gateways import ( @@ -179,6 +179,7 @@ async def register_replica( async with conn.client() as client: await client.register_replica( run=run, + job_spec=JobSpec.__response__.parse_raw(job_model.job_spec_data), job_submission=job_submission, ssh_head_proxy=ssh_head_proxy, ssh_head_proxy_private_key=ssh_head_proxy_private_key, diff --git a/src/dstack/api/_public/runs.py b/src/dstack/api/_public/runs.py index feeacbe63f..f9696dc60b 100644 --- a/src/dstack/api/_public/runs.py +++ b/src/dstack/api/_public/runs.py @@ -18,7 +18,11 @@ from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT, DSTACK_RUNNER_SSH_PORT from dstack._internal.core.errors import ClientError, ConfigurationError, ResourceNotExistsError from dstack._internal.core.models.backends.base import BackendType -from dstack._internal.core.models.configurations import AnyRunConfiguration, PortMapping +from dstack._internal.core.models.configurations import ( + AnyRunConfiguration, + PortMapping, + ServiceConfiguration, +) from dstack._internal.core.models.files import FileArchiveMapping, FilePathMapping from dstack._internal.core.models.profiles import ( CreationPolicy, @@ -38,6 +42,7 @@ RunPlan, RunSpec, RunStatus, + get_service_port, ) from dstack._internal.core.models.runs import Run as RunModel from dstack._internal.core.services.logs import URLReplacer @@ -163,7 +168,7 @@ def ws_thread(): service_port = 443 if secure else 80 ports = { **ports, - self._run.run_spec.configuration.port.container_port: service_port, + get_or_error(get_or_error(self._ssh_attach).service_port): service_port, } path_prefix = url.path replace_urls = URLReplacer( @@ -338,6 +343,10 @@ def attach( else: container_user = "root" + service_port = None + if isinstance(self._run.run_spec.configuration, ServiceConfiguration): + service_port = get_service_port(job.job_spec, self._run.run_spec.configuration) + self._ssh_attach = SSHAttach( hostname=provisioning_data.hostname, ssh_port=provisioning_data.ssh_port, @@ -349,6 +358,7 @@ def attach( run_name=name, dockerized=provisioning_data.dockerized, ssh_proxy=provisioning_data.ssh_proxy, + service_port=service_port, local_backend=provisioning_data.backend == BackendType.LOCAL, bind_address=bind_address, ) diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index eafd956e5a..4ec0c42259 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -246,6 +246,7 @@ def get_dev_env_run_plan_dict( "repo_code_hash": None, "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, "file_archives": [], + "service_port": None, }, "offers": [json.loads(o.json()) for o in offers], "total_offers": total_offers, @@ -441,6 +442,7 @@ def get_dev_env_run_dict( "repo_code_hash": None, "repo_data": {"repo_dir": "/repo", "repo_type": "local"}, "file_archives": [], + "service_port": None, }, "job_submissions": [ { @@ -1176,12 +1178,14 @@ async def test_returns_run_plan_instance_volumes( ServiceConfiguration( commands=["one", "two"], port=80, + gateway=None, replicas=1, scaling=None, ), ServiceConfiguration( commands=["one", "two"], - port=8080, # not updatable + port=8080, + gateway="test-gateway", # not updatable replicas="2..4", scaling=ScalingSpec(metric="rps", target=5), ),