From 579aabb4cbad4d29f48f0f259081022383c258fd Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Wed, 5 Nov 2025 14:51:10 +0000 Subject: [PATCH] Kubernetes: request all suitable GPUs Previously, KubernetesCompute only used GPU from the first offer to set node affinity, and if that type of GPU was not available (e.g., another job or even some non-dstack pod had already taken it), the job eventually failed with FAILED_TO_START_DUE_TO_NO_CAPACITY, even if there were other GPUs matching the run spec requirements. Now, we inspect all nodes to request all suitable GPUs (any of). In addition, we now use upper bounds of Ranges (CPU, memory, disk) as limits except for GPU, which cannot have request =/= limit (as it cannot be overcommited). Part-of: https://github.com/dstackai/dstack/issues/3126 --- src/dstack/_internal/cli/utils/run.py | 79 +++++--- .../core/backends/kubernetes/compute.py | 186 +++++++++++------- 2 files changed, 166 insertions(+), 99 deletions(-) diff --git a/src/dstack/_internal/cli/utils/run.py b/src/dstack/_internal/cli/utils/run.py index 5e68143b40..68444997ff 100644 --- a/src/dstack/_internal/cli/utils/run.py +++ b/src/dstack/_internal/cli/utils/run.py @@ -5,16 +5,19 @@ from rich.table import Table from dstack._internal.cli.utils.common import NO_OFFERS_WARNING, add_row_from_dict, console +from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.configurations import DevEnvironmentConfiguration -from dstack._internal.core.models.instances import InstanceAvailability, Resources +from dstack._internal.core.models.instances import ( + InstanceAvailability, + InstanceOfferWithAvailability, + InstanceType, +) from dstack._internal.core.models.profiles import ( DEFAULT_RUN_TERMINATION_IDLE_TIME, TerminationPolicy, ) from dstack._internal.core.models.runs import ( Job, - JobProvisioningData, - JobRuntimeData, JobStatus, JobSubmission, Probe, @@ -294,27 +297,24 @@ def _format_price(price: float, is_spot: bool) -> str: return price_str -def _format_backend(backend: Any, region: str) -> str: - backend_str = getattr(backend, "value", backend) - backend_str = backend_str.replace("remote", "ssh") +def _format_backend(backend_type: BackendType, region: str) -> str: + backend_str = backend_type.value + if backend_type == BackendType.REMOTE: + backend_str = "ssh" return f"{backend_str} ({region})" -def _format_instance_type(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> str: - instance_type = jpd.instance_type.name - if jrd is not None and getattr(jrd, "offer", None) is not None: - if jrd.offer.total_blocks > 1: - instance_type += f" ({jrd.offer.blocks}/{jrd.offer.total_blocks})" - if jpd.reservation: - instance_type += f" ({jpd.reservation})" - return instance_type - - -def _get_resources(jpd: JobProvisioningData, jrd: Optional[JobRuntimeData]) -> Resources: - resources: Resources = jpd.instance_type.resources - if jrd is not None and getattr(jrd, "offer", None) is not None: - resources = jrd.offer.instance.resources - return resources +def _format_instance_type( + instance_type: InstanceType, + shared_offer: Optional[InstanceOfferWithAvailability], + reservation: Optional[str], +) -> str: + instance_type_str = instance_type.name + if shared_offer is not None: + instance_type_str += f" ({shared_offer.blocks}/{shared_offer.total_blocks})" + if reservation is not None: + instance_type_str += f" ({reservation})" + return instance_type_str def _format_run_name(run: CoreRun, show_deployment_num: bool) -> str: @@ -387,16 +387,35 @@ def get_runs_table( } jpd = latest_job_submission.job_provisioning_data if jpd is not None: + shared_offer: Optional[InstanceOfferWithAvailability] = None + instance_type = jpd.instance_type + price = jpd.price jrd = latest_job_submission.job_runtime_data - resources = _get_resources(jpd, jrd) - update_dict: Dict[Union[str, int], Any] = { - "BACKEND": _format_backend(jpd.backend, jpd.region), - "RESOURCES": resources.pretty_format(include_spot=False), - "GPU": resources.pretty_format(gpu_only=True, include_spot=False), - "INSTANCE TYPE": _format_instance_type(jpd, jrd), - "PRICE": _format_price(jpd.price, resources.spot), - } - job_row.update(update_dict) + if jrd is not None and jrd.offer is not None and jrd.offer.total_blocks > 1: + # We only use offer data from jrd if the job is/was running on a shared + # instance (the instance blocks feature). In that case, jpd contains the full + # instance offer data, while jrd contains the shared offer (a fraction of + # the full offer). Although jrd always contains the offer, we don't use it in + # other cases, as, unlike jpd offer data, jrd offer is not updated after + # Compute.update_provisioning_data() call, but some backends, namely + # Kubernetes, may update offer data via that method. + # As long as we don't have a backend which both supports the blocks feature + # and may update offer data in update_provisioning_data(), this logic is fine. + shared_offer = jrd.offer + instance_type = shared_offer.instance + price = shared_offer.price + resources = instance_type.resources + job_row.update( + { + "BACKEND": _format_backend(jpd.backend, jpd.region), + "RESOURCES": resources.pretty_format(include_spot=False), + "GPU": resources.pretty_format(gpu_only=True, include_spot=False), + "INSTANCE TYPE": _format_instance_type( + instance_type, shared_offer, jpd.reservation + ), + "PRICE": _format_price(price, resources.spot), + } + ) if merge_job_rows: _status = job_row["STATUS"] _resources = job_row["RESOURCES"] diff --git a/src/dstack/_internal/core/backends/kubernetes/compute.py b/src/dstack/_internal/core/backends/kubernetes/compute.py index bf2c7c7fba..8647e49c62 100644 --- a/src/dstack/_internal/core/backends/kubernetes/compute.py +++ b/src/dstack/_internal/core/backends/kubernetes/compute.py @@ -50,7 +50,7 @@ Resources, SSHConnectionParams, ) -from dstack._internal.core.models.resources import CPUSpec, Memory +from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run from dstack._internal.core.models.volumes import Volume from dstack._internal.utils.common import parse_memory @@ -123,38 +123,10 @@ def get_offers_by_requirements( ) nodes = get_value(node_list, ".items", list[client.V1Node], required=True) for node in nodes: - try: - name = get_value(node, ".metadata.name", str, required=True) - cpu_arch = normalize_arch( - get_value(node, ".status.node_info.architecture", str) - ).to_cpu_architecture() - allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True) - cpus = _parse_cpu(allocatable["cpu"]) - memory_mib = _parse_memory(allocatable["memory"]) - disk_size_mib = _parse_memory(allocatable["ephemeral-storage"]) - gpus = _get_node_gpus(node) - except (AttributeError, KeyError, ValueError) as e: - logger.exception("Failed to process node: %s: %s", type(e).__name__, e) - continue - instance_offer = InstanceOfferWithAvailability( - backend=BackendType.KUBERNETES, - instance=InstanceType( - name=name, - resources=Resources( - cpus=cpus, - cpu_arch=cpu_arch, - memory_mib=memory_mib, - gpus=gpus, - spot=False, - disk=Disk(size_mib=disk_size_mib), - ), - ), - price=0, - region=DUMMY_REGION, - availability=InstanceAvailability.AVAILABLE, - instance_runtime=InstanceRuntime.RUNNER, - ) - instance_offers.extend(filter_offers_by_requirements([instance_offer], requirements)) + if (instance_offer := _get_instance_offer_from_node(node)) is not None: + instance_offers.extend( + filter_offers_by_requirements([instance_offer], requirements) + ) return instance_offers def run_job( @@ -216,18 +188,17 @@ def run_job( assert isinstance(resources_spec.cpu, CPUSpec) if (cpu_min := resources_spec.cpu.count.min) is not None: resources_requests["cpu"] = str(cpu_min) + if (cpu_max := resources_spec.cpu.count.max) is not None: + resources_limits["cpu"] = str(cpu_max) if (gpu_spec := resources_spec.gpu) is not None: gpu_min = gpu_spec.count.min if gpu_min is not None and gpu_min > 0: - if not (offer_gpus := instance_offer.instance.resources.gpus): - raise ComputeError( - "GPU is requested but the offer has no GPUs:" - f" {gpu_spec=} {instance_offer=}", - ) gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu( - self.api, offer_gpus[0] + self.api, gpu_spec ) logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_min) + # Limit must be set (GPU resources cannot be overcommitted) + # and must be equal to request. resources_requests[gpu_resource] = resources_limits[gpu_resource] = str(gpu_min) # It should be NoSchedule, but we also add NoExecute toleration just in case. for effect in [TaintEffect.NO_SCHEDULE, TaintEffect.NO_EXECUTE]: @@ -238,11 +209,13 @@ def run_job( ) if (memory_min := resources_spec.memory.min) is not None: resources_requests["memory"] = _render_memory(memory_min) - if ( - resources_spec.disk is not None - and (disk_min := resources_spec.disk.size.min) is not None - ): - resources_requests["ephemeral-storage"] = _render_memory(disk_min) + if (memory_max := resources_spec.memory.max) is not None: + resources_limits["memory"] = _render_memory(memory_max) + if (disk_spec := resources_spec.disk) is not None: + if (disk_min := disk_spec.size.min) is not None: + resources_requests["ephemeral-storage"] = _render_memory(disk_min) + if (disk_max := disk_spec.size.max) is not None: + resources_limits["ephemeral-storage"] = _render_memory(disk_max) if (shm_size := resources_spec.shm_size) is not None: shm_volume_name = "dev-shm" volumes_.append( @@ -328,8 +301,9 @@ def run_job( instance_type=instance_offer.instance, instance_id=instance_name, # Although we can already get Service's ClusterIP from the `V1Service` object returned - # by the `create_namespaced_service` method, we still need PodIP for multinode runs. - # We'll update both hostname and internal_ip once the pod is assigned to the node. + # by the `create_namespaced_service` method, we still need 1) updated instance offer + # 2) PodIP for multinode runs. + # We'll update all these fields once the pod is assigned to the node. hostname=None, internal_ip=None, region=instance_offer.region, @@ -368,6 +342,15 @@ def update_provisioning_data( namespace=self.config.namespace, ) provisioning_data.hostname = get_value(service, ".spec.cluster_ip", str, required=True) + node = call_api_method( + self.api.read_node, + client.V1Node, + name=get_value(pod, ".spec.node_name", str, required=True), + ) + if (instance_offer := _get_instance_offer_from_node(node)) is not None: + provisioning_data.instance_type = instance_offer.instance + provisioning_data.region = instance_offer.region + provisioning_data.price = instance_offer.price def terminate_instance( self, instance_id: str, region: str, backend_data: Optional[str] = None @@ -500,6 +483,40 @@ def terminate_gateway( ) +def _get_instance_offer_from_node(node: client.V1Node) -> Optional[InstanceOfferWithAvailability]: + try: + name = get_value(node, ".metadata.name", str, required=True) + cpu_arch = normalize_arch( + get_value(node, ".status.node_info.architecture", str) + ).to_cpu_architecture() + allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True) + cpus = _parse_cpu(allocatable["cpu"]) + memory_mib = _parse_memory(allocatable["memory"]) + disk_size_mib = _parse_memory(allocatable["ephemeral-storage"]) + gpus = _get_node_gpus(node) + except (AttributeError, KeyError, ValueError) as e: + logger.exception("Failed to process node: %s: %s", type(e).__name__, e) + return None + return InstanceOfferWithAvailability( + backend=BackendType.KUBERNETES, + instance=InstanceType( + name=name, + resources=Resources( + cpus=cpus, + cpu_arch=cpu_arch, + memory_mib=memory_mib, + gpus=gpus, + spot=False, + disk=Disk(size_mib=disk_size_mib), + ), + ), + price=0, + region=DUMMY_REGION, + availability=InstanceAvailability.AVAILABLE, + instance_runtime=InstanceRuntime.RUNNER, + ) + + def _parse_cpu(cpu: str) -> int: if cpu.endswith("m"): # "m" means millicpu (1/1000 CPU), e.g., 7900m -> 7.9 -> 7 @@ -590,36 +607,39 @@ def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]: def _get_pod_spec_parameters_for_gpu( - api: client.CoreV1Api, gpu: Gpu + api: client.CoreV1Api, gpu_spec: GPUSpec ) -> tuple[str, client.V1NodeAffinity, str]: - gpu_vendor = gpu.vendor - assert gpu_vendor is not None - if gpu_vendor == AcceleratorVendor.NVIDIA: - node_affinity = _get_nvidia_gpu_node_affinity(api, gpu) + node_list = call_api_method(api.list_node, client.V1NodeList) + nodes = get_value(node_list, ".items", list[client.V1Node], required=True) + gpu_vendor = gpu_spec.vendor + # If no vendor specified, we assume it's NVIDIA. Technically, it's possible to request either + # NVIDIA or AMD in the run configuration using only GPU names (e.g.,`gpu: H100,MI300X:8`), + # but we ignore such configurations as it's hard to translate them to K8s request. + if gpu_vendor is None or gpu_vendor == AcceleratorVendor.NVIDIA: + node_affinity = _get_nvidia_gpu_node_affinity(gpu_spec, nodes) return NVIDIA_GPU_RESOURCE, node_affinity, NVIDIA_GPU_NODE_TAINT if gpu_vendor == AcceleratorVendor.AMD: - node_affinity = _get_amd_gpu_node_affinity(gpu) + node_affinity = _get_amd_gpu_node_affinity(gpu_spec, nodes) return AMD_GPU_RESOURCE, node_affinity, AMD_GPU_NODE_TAINT raise ComputeError(f"Unsupported GPU vendor: {gpu_vendor}") -def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1NodeAffinity: +def _get_nvidia_gpu_node_affinity( + gpu_spec: GPUSpec, nodes: list[client.V1Node] +) -> client.V1NodeAffinity: matching_gpu_label_values: set[str] = set() - # We cannot generate an expected GPU label value from the Gpu model instance - # as the actual values may have additional components (socket, memory type, etc.) - # that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3". - # Moreover, a single Gpu may match multiple label values. - # As a workaround, we iterate and process all node labels once again (we already - # processed them in `get_offers_by_requirements()`). - node_list = call_api_method(api.list_node, client.V1NodeList) - nodes = get_value(node_list, ".items", list[client.V1Node], required=True) for node in nodes: labels = get_value(node, ".metadata.labels", dict[str, str]) or {} - if _get_nvidia_gpu_from_node_labels(labels) == gpu: + gpu = _get_nvidia_gpu_from_node_labels(labels) + if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec): matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL]) if not matching_gpu_label_values: - raise ComputeError(f"NVIDIA GPU is requested but no matching GPU labels found: {gpu=}") - logger.debug("Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu.name) + raise ComputeError( + f"NVIDIA GPU is requested but no matching GPU labels found: {gpu_spec=}" + ) + logger.debug( + "Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu_spec.name + ) return client.V1NodeAffinity( required_during_scheduling_ignored_during_execution=client.V1NodeSelector( node_selector_terms=[ @@ -637,10 +657,15 @@ def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1N ) -def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity: - device_ids = AMD_GPU_NAME_TO_DEVICE_IDS.get(gpu.name) - if device_ids is None: - raise ComputeError(f"AMD GPU is requested but no matching device ids found: {gpu=}") +def _get_amd_gpu_node_affinity( + gpu_spec: GPUSpec, nodes: list[client.V1Node] +) -> client.V1NodeAffinity: + matching_device_ids: set[int] = set() + for node in nodes: + labels = get_value(node, ".metadata.labels", dict[str, str]) or {} + gpu = _get_amd_gpu_from_node_labels(labels) + if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec): + matching_device_ids.update(AMD_GPU_NAME_TO_DEVICE_IDS[gpu.name]) return client.V1NodeAffinity( required_during_scheduling_ignored_during_execution=client.V1NodeSelector( node_selector_terms=[ @@ -652,12 +677,35 @@ def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity: ), ], ) - for device_id in device_ids + for device_id in matching_device_ids ], ), ) +def _gpu_matches_gpu_spec(gpu: Gpu, gpu_spec: GPUSpec) -> bool: + if gpu_spec.vendor is not None and gpu.vendor != gpu_spec.vendor: + return False + if gpu_spec.name is not None and gpu.name.lower() not in map(str.lower, gpu_spec.name): + return False + if gpu_spec.memory is not None: + min_memory_gib = gpu_spec.memory.min + if min_memory_gib is not None and gpu.memory_mib < min_memory_gib * 1024: + return False + max_memory_gib = gpu_spec.memory.max + if max_memory_gib is not None and gpu.memory_mib > max_memory_gib * 1024: + return False + if gpu_spec.compute_capability is not None: + if gpu.vendor != AcceleratorVendor.NVIDIA: + return False + gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO.get(gpu.name) + if gpu_info is None: + return False + if gpu_info.compute_capability < gpu_spec.compute_capability: + return False + return True + + def _continue_setup_jump_pod( api: client.CoreV1Api, namespace: str,