Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/docs/concepts/services.md
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ The rolling deployment stops when all replicas are updated or when a new deploym
??? info "Supported properties"
<!-- NOTE: should be in sync with constants in server/services/runs.py -->

Rolling deployment supports changes to the following properties: `resources`, `volumes`, `docker`, `files`, `image`, `user`, `privileged`, `entrypoint`, `working_dir`, `python`, `nvcc`, `single_branch`, `env`, `shell`, `commands`, as well as changes to [repo](repos.md) or [file](#files) contents.
Rolling deployment supports changes to the following properties: `port`, `resources`, `volumes`, `docker`, `files`, `image`, `user`, `privileged`, `entrypoint`, `working_dir`, `python`, `nvcc`, `single_branch`, `env`, `shell`, `commands`, as well as changes to [repo](repos.md) or [file](#files) contents.

Changes to `replicas` and `scaling` can be applied without redeploying replicas.

Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/compatibility/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ def get_job_spec_excludes(job_specs: list[JobSpec]) -> IncludeExcludeDictType:
spec_excludes["repo_data"] = True
if all(not s.file_archives for s in job_specs):
spec_excludes["file_archives"] = True
if all(s.service_port is None for s in job_specs):
spec_excludes["service_port"] = True

return spec_excludes

Expand Down
3 changes: 2 additions & 1 deletion src/dstack/_internal/core/models/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,9 @@ class TaskConfiguration(

class ServiceConfigurationParams(CoreModel):
port: Annotated[
# NOTE: it's a PortMapping for historical reasons. Only `port.container_port` is used.
Union[ValidPort, constr(regex=r"^[0-9]+:[0-9]+$"), PortMapping],
Field(description="The port, that application listens on or the mapping"),
Field(description="The port the application listens on"),
]
gateway: Annotated[
Optional[Union[bool, str]],
Expand Down
11 changes: 11 additions & 0 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
DEFAULT_REPO_DIR,
AnyRunConfiguration,
RunConfiguration,
ServiceConfiguration,
)
from dstack._internal.core.models.files import FileArchiveMapping
from dstack._internal.core.models.instances import (
Expand Down Expand Up @@ -227,6 +228,8 @@ class JobSpec(CoreModel):
# TODO: drop this comment when supporting jobs submitted before 0.19.17 is no longer relevant.
repo_code_hash: Optional[str] = None
file_archives: list[FileArchiveMapping] = []
# None for non-services and pre-0.19.19 services. See `get_service_port`
service_port: Optional[int] = None


class JobProvisioningData(CoreModel):
Expand Down Expand Up @@ -640,3 +643,11 @@ def get_policy_map(spot_policy: Optional[SpotPolicy], default: SpotPolicy) -> Op
SpotPolicy.ONDEMAND: False,
}
return policy_map[spot_policy]


def get_service_port(job_spec: JobSpec, configuration: ServiceConfiguration) -> int:
# Compatibility with pre-0.19.19 job specs that do not have the `service_port` property.
# TODO: drop when pre-0.19.19 jobs are no longer relevant.
if job_spec.service_port is None:
return configuration.port.container_port
return job_spec.service_port
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/services/ssh/attach.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def __init__(
run_name: str,
dockerized: bool,
ssh_proxy: Optional[SSHConnectionParams] = None,
service_port: Optional[int] = None,
local_backend: bool = False,
bind_address: Optional[str] = None,
):
Expand All @@ -90,6 +91,7 @@ def __init__(
},
)
self.ssh_proxy = ssh_proxy
self.service_port = service_port

hosts: dict[str, dict[str, Union[str, int, FilePath]]] = {}
self.hosts = hosts
Expand Down
8 changes: 5 additions & 3 deletions src/dstack/_internal/server/services/gateways/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from dstack._internal.core.consts import DSTACK_RUNNER_SSH_PORT
from dstack._internal.core.errors import GatewayError
from dstack._internal.core.models.configurations import RateLimit
from dstack._internal.core.models.configurations import RateLimit, ServiceConfiguration
from dstack._internal.core.models.instances import SSHConnectionParams
from dstack._internal.core.models.runs import JobSubmission, Run
from dstack._internal.core.models.runs import JobSpec, JobSubmission, Run, get_service_port
from dstack._internal.proxy.gateway.schemas.stats import ServiceStats
from dstack._internal.server import settings

Expand Down Expand Up @@ -80,13 +80,15 @@ async def unregister_service(self, project: str, run_name: str):
async def register_replica(
self,
run: Run,
job_spec: JobSpec,
job_submission: JobSubmission,
ssh_head_proxy: Optional[SSHConnectionParams],
ssh_head_proxy_private_key: Optional[str],
):
assert isinstance(run.run_spec.configuration, ServiceConfiguration)
payload = {
"job_id": job_submission.id.hex,
"app_port": run.run_spec.configuration.port.container_port,
"app_port": get_service_port(job_spec, run.run_spec.configuration),
"ssh_head_proxy": ssh_head_proxy.dict() if ssh_head_proxy is not None else None,
"ssh_head_proxy_private_key": ssh_head_proxy_private_key,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
PortMapping,
PythonVersion,
RunConfigurationType,
ServiceConfiguration,
)
from dstack._internal.core.models.profiles import (
DEFAULT_STOP_DURATION,
Expand Down Expand Up @@ -153,6 +154,7 @@ async def _get_job_spec(
repo_data=self.run_spec.repo_data,
repo_code_hash=self.run_spec.repo_code_hash,
file_archives=self.run_spec.file_archives,
service_port=self._service_port(),
)
return job_spec

Expand Down Expand Up @@ -306,6 +308,11 @@ def _ssh_key(self, jobs_per_replica: int) -> Optional[JobSSHKey]:
)
return self._job_ssh_key

def _service_port(self) -> Optional[int]:
if isinstance(self.run_spec.configuration, ServiceConfiguration):
return self.run_spec.configuration.port.container_port
return None


def interpolate_job_volumes(
run_volumes: List[Union[MountPoint, str]],
Expand Down
5 changes: 4 additions & 1 deletion src/dstack/_internal/server/services/proxy/repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
from dstack._internal.core.models.instances import RemoteConnectionInfo, SSHConnectionParams
from dstack._internal.core.models.runs import (
JobProvisioningData,
JobSpec,
JobStatus,
RunSpec,
RunStatus,
ServiceSpec,
get_service_port,
)
from dstack._internal.core.models.services import AnyModel
from dstack._internal.proxy.lib.models import (
Expand Down Expand Up @@ -97,9 +99,10 @@ async def get_service(self, project_name: str, run_name: str) -> Optional[Servic
if rci.ssh_proxy is not None:
ssh_head_proxy = rci.ssh_proxy
ssh_head_proxy_private_key = get_or_error(rci.ssh_proxy_keys)[0].private
job_spec: JobSpec = JobSpec.__response__.parse_raw(job.job_spec_data)
replica = Replica(
id=job.id.hex,
app_port=run_spec.configuration.port.container_port,
app_port=get_service_port(job_spec, run_spec.configuration),
ssh_destination=ssh_destination,
ssh_port=ssh_port,
ssh_proxy=ssh_proxy,
Expand Down
1 change: 1 addition & 0 deletions src/dstack/_internal/server/services/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,7 @@ def _validate_run_spec_and_set_defaults(run_spec: RunSpec):
"scaling",
# rolling deployment
# NOTE: keep this list in sync with the "Rolling deployment" section in services.md
"port",
"resources",
"volumes",
"docker",
Expand Down
3 changes: 2 additions & 1 deletion src/dstack/_internal/server/services/services/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from dstack._internal.core.models.configurations import SERVICE_HTTPS_DEFAULT, ServiceConfiguration
from dstack._internal.core.models.gateways import GatewayConfiguration, GatewayStatus
from dstack._internal.core.models.instances import SSHConnectionParams
from dstack._internal.core.models.runs import Run, RunSpec, ServiceModelSpec, ServiceSpec
from dstack._internal.core.models.runs import JobSpec, Run, RunSpec, ServiceModelSpec, ServiceSpec
from dstack._internal.server import settings
from dstack._internal.server.models import GatewayModel, JobModel, ProjectModel, RunModel
from dstack._internal.server.services.gateways import (
Expand Down Expand Up @@ -179,6 +179,7 @@ async def register_replica(
async with conn.client() as client:
await client.register_replica(
run=run,
job_spec=JobSpec.__response__.parse_raw(job_model.job_spec_data),
job_submission=job_submission,
ssh_head_proxy=ssh_head_proxy,
ssh_head_proxy_private_key=ssh_head_proxy_private_key,
Expand Down
14 changes: 12 additions & 2 deletions src/dstack/api/_public/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
from dstack._internal.core.consts import DSTACK_RUNNER_HTTP_PORT, DSTACK_RUNNER_SSH_PORT
from dstack._internal.core.errors import ClientError, ConfigurationError, ResourceNotExistsError
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.configurations import AnyRunConfiguration, PortMapping
from dstack._internal.core.models.configurations import (
AnyRunConfiguration,
PortMapping,
ServiceConfiguration,
)
from dstack._internal.core.models.files import FileArchiveMapping, FilePathMapping
from dstack._internal.core.models.profiles import (
CreationPolicy,
Expand All @@ -38,6 +42,7 @@
RunPlan,
RunSpec,
RunStatus,
get_service_port,
)
from dstack._internal.core.models.runs import Run as RunModel
from dstack._internal.core.services.logs import URLReplacer
Expand Down Expand Up @@ -163,7 +168,7 @@ def ws_thread():
service_port = 443 if secure else 80
ports = {
**ports,
self._run.run_spec.configuration.port.container_port: service_port,
get_or_error(get_or_error(self._ssh_attach).service_port): service_port,
}
path_prefix = url.path
replace_urls = URLReplacer(
Expand Down Expand Up @@ -338,6 +343,10 @@ def attach(
else:
container_user = "root"

service_port = None
if isinstance(self._run.run_spec.configuration, ServiceConfiguration):
service_port = get_service_port(job.job_spec, self._run.run_spec.configuration)

self._ssh_attach = SSHAttach(
hostname=provisioning_data.hostname,
ssh_port=provisioning_data.ssh_port,
Expand All @@ -349,6 +358,7 @@ def attach(
run_name=name,
dockerized=provisioning_data.dockerized,
ssh_proxy=provisioning_data.ssh_proxy,
service_port=service_port,
local_backend=provisioning_data.backend == BackendType.LOCAL,
bind_address=bind_address,
)
Expand Down
6 changes: 5 additions & 1 deletion src/tests/_internal/server/routers/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def get_dev_env_run_plan_dict(
"repo_code_hash": None,
"repo_data": {"repo_dir": "/repo", "repo_type": "local"},
"file_archives": [],
"service_port": None,
},
"offers": [json.loads(o.json()) for o in offers],
"total_offers": total_offers,
Expand Down Expand Up @@ -441,6 +442,7 @@ def get_dev_env_run_dict(
"repo_code_hash": None,
"repo_data": {"repo_dir": "/repo", "repo_type": "local"},
"file_archives": [],
"service_port": None,
},
"job_submissions": [
{
Expand Down Expand Up @@ -1176,12 +1178,14 @@ async def test_returns_run_plan_instance_volumes(
ServiceConfiguration(
commands=["one", "two"],
port=80,
gateway=None,
replicas=1,
scaling=None,
),
ServiceConfiguration(
commands=["one", "two"],
port=8080, # not updatable
port=8080,
gateway="test-gateway", # not updatable
replicas="2..4",
scaling=ScalingSpec(metric="rps", target=5),
),
Expand Down
Loading