Skip to content

Commit 3c43b25

Browse files
authored
Kubernetes: improve offers (#3548)
* Skip nodes with untolerated taints
* Take into account already allocated resources
* Set offer resources to the lower limit of the resource requirements ranges

Closes: #3481
1 parent b166b9e commit 3c43b25

File tree

4 files changed

+423
-220
lines changed

4 files changed

+423
-220
lines changed

docs/docs/concepts/backends.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1049,7 +1049,7 @@ projects:
10491049
verbs: ["get", "create"]
10501050
- apiGroups: [""]
10511051
resources: ["pods"]
1052-
verbs: ["get", "create", "delete"]
1052+
verbs: ["get", "create", "delete", "list"]
10531053
- apiGroups: [""]
10541054
resources: ["services"]
10551055
verbs: ["get", "create", "delete"]

src/dstack/_internal/core/backends/kubernetes/compute.py

Lines changed: 48 additions & 208 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from enum import Enum
77
from typing import List, Optional
88

9-
from gpuhunt import KNOWN_AMD_GPUS, KNOWN_NVIDIA_GPUS, AcceleratorVendor
9+
from gpuhunt import AcceleratorVendor
1010
from kubernetes import client
1111

1212
from dstack._internal.core.backends.base.compute import (
@@ -19,85 +19,67 @@
1919
generate_unique_instance_name_for_job,
2020
get_docker_commands,
2121
get_dstack_gateway_commands,
22-
normalize_arch,
2322
)
24-
from dstack._internal.core.backends.base.offers import filter_offers_by_requirements
2523
from dstack._internal.core.backends.kubernetes.models import (
2624
KubernetesConfig,
2725
KubernetesProxyJumpConfig,
2826
)
27+
from dstack._internal.core.backends.kubernetes.resources import (
28+
AMD_GPU_DEVICE_ID_LABEL_PREFIX,
29+
AMD_GPU_NAME_TO_DEVICE_IDS,
30+
AMD_GPU_NODE_TAINT,
31+
AMD_GPU_RESOURCE,
32+
DUMMY_REGION,
33+
NVIDIA_GPU_NAME_TO_GPU_INFO,
34+
NVIDIA_GPU_NODE_TAINT,
35+
NVIDIA_GPU_PRODUCT_LABEL,
36+
NVIDIA_GPU_RESOURCE,
37+
TaintEffect,
38+
format_memory,
39+
get_amd_gpu_from_node_labels,
40+
get_gpu_request_from_gpu_spec,
41+
get_instance_offer_from_node,
42+
get_instance_offers,
43+
get_node_labels,
44+
get_nvidia_gpu_from_node_labels,
45+
is_hard_taint,
46+
is_taint_tolerated,
47+
)
2948
from dstack._internal.core.backends.kubernetes.utils import (
3049
call_api_method,
3150
get_api_from_config_data,
3251
get_cluster_public_ip,
3352
)
3453
from dstack._internal.core.consts import DSTACK_RUNNER_SSH_PORT
3554
from dstack._internal.core.errors import ComputeError
36-
from dstack._internal.core.models.backends.base import BackendType
3755
from dstack._internal.core.models.gateways import (
3856
GatewayComputeConfiguration,
3957
GatewayProvisioningData,
4058
)
4159
from dstack._internal.core.models.instances import (
42-
Disk,
4360
Gpu,
44-
InstanceAvailability,
4561
InstanceOfferWithAvailability,
46-
InstanceRuntime,
47-
InstanceType,
48-
Resources,
4962
SSHConnectionParams,
5063
)
5164
from dstack._internal.core.models.placement import PlacementGroup
52-
from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory
65+
from dstack._internal.core.models.resources import CPUSpec, GPUSpec
5366
from dstack._internal.core.models.routers import AnyRouterConfig
5467
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
5568
from dstack._internal.core.models.volumes import Volume
56-
from dstack._internal.utils.common import get_or_error, parse_memory
69+
from dstack._internal.utils.common import get_or_error
5770
from dstack._internal.utils.logging import get_logger
5871

5972
logger = get_logger(__name__)
6073

6174
JUMP_POD_IMAGE = "testcontainers/sshd:1.3.0@sha256:c50c0f59554dcdb2d9e5e705112144428ae9d04ac0af6322b365a18e24213a6a"
6275
JUMP_POD_SSH_PORT = 22
63-
DUMMY_REGION = "-"
64-
65-
NVIDIA_GPU_RESOURCE = "nvidia.com/gpu"
66-
NVIDIA_GPU_NODE_TAINT = NVIDIA_GPU_RESOURCE
67-
NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product"
68-
69-
AMD_GPU_RESOURCE = "amd.com/gpu"
70-
AMD_GPU_NODE_TAINT = AMD_GPU_RESOURCE
71-
# The oldest but still supported label format, the safest option, see the commit message:
72-
# https://github.com/ROCm/k8s-device-plugin/commit/c0b0231b391a56bc9da4f362d561e25e960d7a48
73-
# E.g., beta.amd.com/gpu.device-id.74b5=4 - A node with four MI300X VF (0x74b5) GPUs
74-
# We cannot rely on the beta.amd.com/gpu.product-name.* label, as it may be missing, see the issue:
75-
# https://github.com/ROCm/k8s-device-plugin/issues/112
76-
AMD_GPU_DEVICE_ID_LABEL_PREFIX = f"beta.{AMD_GPU_RESOURCE}.device-id."
77-
78-
# Taints we know and tolerate when creating our objects, e.g., the jump pod.
79-
TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT, AMD_GPU_NODE_TAINT)
80-
81-
NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS}
82-
NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys()
83-
84-
AMD_GPU_DEVICE_ID_TO_GPU_INFO = {
85-
device_id: gpu_info for gpu_info in KNOWN_AMD_GPUS for device_id in gpu_info.device_ids
86-
}
87-
AMD_GPU_NAME_TO_DEVICE_IDS = {gpu.name: gpu.device_ids for gpu in KNOWN_AMD_GPUS}
8876

8977

9078
class Operator(str, Enum):
9179
EXISTS = "Exists"
9280
IN = "In"
9381

9482

95-
class TaintEffect(str, Enum):
96-
NO_EXECUTE = "NoExecute"
97-
NO_SCHEDULE = "NoSchedule"
98-
PREFER_NO_SCHEDULE = "PreferNoSchedule"
99-
100-
10183
class KubernetesCompute(
10284
ComputeWithFilteredOffersCached,
10385
ComputeWithPrivilegedSupport,
@@ -117,16 +99,7 @@ def __init__(self, config: KubernetesConfig):
11799
def get_offers_by_requirements(
118100
self, requirements: Requirements
119101
) -> list[InstanceOfferWithAvailability]:
120-
gpu_request = 0
121-
if (gpu_spec := requirements.resources.gpu) is not None:
122-
gpu_request = _get_gpu_request_from_gpu_spec(gpu_spec)
123-
instance_offers: list[InstanceOfferWithAvailability] = []
124-
for node in self.api.list_node().items:
125-
if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
126-
instance_offers.extend(
127-
filter_offers_by_requirements([instance_offer], requirements)
128-
)
129-
return instance_offers
102+
return get_instance_offers(self.api, requirements)
130103

131104
def run_job(
132105
self,
@@ -191,7 +164,7 @@ def run_job(
191164
if (cpu_max := resources_spec.cpu.count.max) is not None:
192165
resources_limits["cpu"] = str(cpu_max)
193166
if (gpu_spec := resources_spec.gpu) is not None:
194-
if (gpu_request := _get_gpu_request_from_gpu_spec(gpu_spec)) > 0:
167+
if (gpu_request := get_gpu_request_from_gpu_spec(gpu_spec)) > 0:
195168
gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu(
196169
self.api, gpu_spec
197170
)
@@ -208,22 +181,22 @@ def run_job(
208181
)
209182
)
210183
if (memory_min := resources_spec.memory.min) is not None:
211-
resources_requests["memory"] = _render_memory(memory_min)
184+
resources_requests["memory"] = format_memory(memory_min)
212185
if (memory_max := resources_spec.memory.max) is not None:
213-
resources_limits["memory"] = _render_memory(memory_max)
186+
resources_limits["memory"] = format_memory(memory_max)
214187
if (disk_spec := resources_spec.disk) is not None:
215188
if (disk_min := disk_spec.size.min) is not None:
216-
resources_requests["ephemeral-storage"] = _render_memory(disk_min)
189+
resources_requests["ephemeral-storage"] = format_memory(disk_min)
217190
if (disk_max := disk_spec.size.max) is not None:
218-
resources_limits["ephemeral-storage"] = _render_memory(disk_max)
191+
resources_limits["ephemeral-storage"] = format_memory(disk_max)
219192
if (shm_size := resources_spec.shm_size) is not None:
220193
shm_volume_name = "dev-shm"
221194
volumes_.append(
222195
client.V1Volume(
223196
name=shm_volume_name,
224197
empty_dir=client.V1EmptyDirVolumeSource(
225198
medium="Memory",
226-
size_limit=_render_memory(shm_size),
199+
size_limit=format_memory(shm_size),
227200
),
228201
)
229202
)
@@ -338,10 +311,17 @@ def update_provisioning_data(
338311
provisioning_data.hostname = get_or_error(service_spec.cluster_ip)
339312
pod_spec = get_or_error(pod.spec)
340313
node = self.api.read_node(name=get_or_error(pod_spec.node_name))
341-
# The original offer has a list of GPUs already sliced according to pod spec's GPU resource
342-
# request, which is inferred from dstack's GPUSpec, see _get_gpu_request_from_gpu_spec
343-
gpu_request = len(provisioning_data.instance_type.resources.gpus)
344-
if (instance_offer := _get_instance_offer_from_node(node, gpu_request)) is not None:
314+
# In the original offer, the resources have already been adjusted according to
315+
# the run configuration resource requirements, see get_offers_by_requirements()
316+
original_resources = provisioning_data.instance_type.resources
317+
instance_offer = get_instance_offer_from_node(
318+
node=node,
319+
cpu_request=original_resources.cpus,
320+
memory_mib_request=original_resources.memory_mib,
321+
gpu_request=len(original_resources.gpus),
322+
disk_mib_request=original_resources.disk.size_mib,
323+
)
324+
if instance_offer is not None:
345325
provisioning_data.instance_type = instance_offer.instance
346326
provisioning_data.region = instance_offer.region
347327
provisioning_data.price = instance_offer.price
@@ -481,146 +461,6 @@ def terminate_gateway(
481461
)
482462

483463

484-
def _get_gpu_request_from_gpu_spec(gpu_spec: GPUSpec) -> int:
485-
return gpu_spec.count.min or 0
486-
487-
488-
def _get_instance_offer_from_node(
489-
node: client.V1Node, gpu_request: int
490-
) -> Optional[InstanceOfferWithAvailability]:
491-
try:
492-
node_name = get_or_error(get_or_error(node.metadata).name)
493-
node_status = get_or_error(node.status)
494-
allocatable = get_or_error(node_status.allocatable)
495-
_cpu_arch: Optional[str] = None
496-
if node_status.node_info is not None:
497-
_cpu_arch = node_status.node_info.architecture
498-
cpu_arch = normalize_arch(_cpu_arch).to_cpu_architecture()
499-
cpus = _parse_cpu(allocatable["cpu"])
500-
memory_mib = _parse_memory(allocatable["memory"])
501-
disk_size_mib = _parse_memory(allocatable["ephemeral-storage"])
502-
gpus = _get_node_gpus(node)
503-
except (ValueError, KeyError) as e:
504-
logger.exception("Failed to process node: %s: %s", type(e).__name__, e)
505-
return None
506-
return InstanceOfferWithAvailability(
507-
backend=BackendType.KUBERNETES,
508-
instance=InstanceType(
509-
name=node_name,
510-
resources=Resources(
511-
cpus=cpus,
512-
cpu_arch=cpu_arch,
513-
memory_mib=memory_mib,
514-
gpus=gpus[:gpu_request],
515-
spot=False,
516-
disk=Disk(size_mib=disk_size_mib),
517-
),
518-
),
519-
price=0,
520-
region=DUMMY_REGION,
521-
availability=InstanceAvailability.AVAILABLE,
522-
instance_runtime=InstanceRuntime.RUNNER,
523-
)
524-
525-
526-
def _parse_cpu(cpu: str) -> int:
527-
if cpu.endswith("m"):
528-
# "m" means millicpu (1/1000 CPU), e.g., 7900m -> 7.9 -> 7
529-
return int(float(cpu[:-1]) / 1000)
530-
return int(cpu)
531-
532-
533-
def _parse_memory(memory: str) -> int:
534-
if memory.isdigit():
535-
# no suffix means that the value is in bytes
536-
return int(memory) // 2**20
537-
return int(parse_memory(memory, as_untis="M"))
538-
539-
540-
def _render_memory(memory: Memory) -> str:
541-
return f"{float(memory)}Gi"
542-
543-
544-
def _get_node_labels(node: client.V1Node) -> dict[str, str]:
545-
if (metadata := node.metadata) is None:
546-
return {}
547-
if (labels := metadata.labels) is None:
548-
return {}
549-
return labels
550-
551-
552-
def _get_node_gpus(node: client.V1Node) -> list[Gpu]:
553-
node_name = get_or_error(get_or_error(node.metadata).name)
554-
allocatable = get_or_error(get_or_error(node.status).allocatable)
555-
labels = _get_node_labels(node)
556-
for gpu_resource, gpu_getter in (
557-
(NVIDIA_GPU_RESOURCE, _get_nvidia_gpu_from_node_labels),
558-
(AMD_GPU_RESOURCE, _get_amd_gpu_from_node_labels),
559-
):
560-
_gpu_count = allocatable.get(gpu_resource)
561-
if not _gpu_count:
562-
continue
563-
gpu_count = int(_gpu_count)
564-
if gpu_count < 1:
565-
continue
566-
gpu = gpu_getter(labels)
567-
if gpu is None:
568-
logger.warning(
569-
"Node %s: GPU resource found, but failed to detect its model: %s=%d",
570-
node_name,
571-
gpu_resource,
572-
gpu_count,
573-
)
574-
return []
575-
return [gpu] * gpu_count
576-
logger.debug("Node %s: no GPU resource found", node_name)
577-
return []
578-
579-
580-
def _get_nvidia_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
581-
# We rely on https://github.com/NVIDIA/k8s-device-plugin/tree/main/docs/gpu-feature-discovery
582-
# to detect gpus. Note that "nvidia.com/gpu.product" is not a short gpu name like "T4" or
583-
# "A100" but a product name like "Tesla-T4" or "A100-SXM4-40GB".
584-
# Thus, we convert the product name to a known gpu name.
585-
gpu_product = labels.get(NVIDIA_GPU_PRODUCT_LABEL)
586-
if gpu_product is None:
587-
return None
588-
gpu_product = gpu_product.replace("RTX-", "RTX")
589-
for gpu_name in NVIDIA_GPU_NAMES:
590-
if gpu_name.lower() in gpu_product.lower().split("-"):
591-
break
592-
else:
593-
return None
594-
gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO[gpu_name]
595-
gpu_memory = gpu_info.memory * 1024
596-
# A100 may come in two variants
597-
if "40GB" in gpu_product:
598-
gpu_memory = 40 * 1024
599-
return Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory)
600-
601-
602-
def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]:
603-
# (AMDGPUInfo.name, AMDGPUInfo.memory) pairs
604-
gpus: set[tuple[str, int]] = set()
605-
for label in labels:
606-
if not label.startswith(AMD_GPU_DEVICE_ID_LABEL_PREFIX):
607-
continue
608-
_, _, _device_id = label.rpartition(".")
609-
device_id = int(_device_id, 16)
610-
gpu_info = AMD_GPU_DEVICE_ID_TO_GPU_INFO.get(device_id)
611-
if gpu_info is None:
612-
logger.warning("Unknown AMD GPU device id: %X", device_id)
613-
continue
614-
gpus.add((gpu_info.name, gpu_info.memory))
615-
if not gpus:
616-
return None
617-
if len(gpus) == 1:
618-
gpu_name, gpu_memory_gib = next(iter(gpus))
619-
return Gpu(vendor=AcceleratorVendor.AMD, name=gpu_name, memory_mib=gpu_memory_gib * 1024)
620-
logger.warning("Multiple AMD GPU models detected: %s, ignoring all GPUs", gpus)
621-
return None
622-
623-
624464
def _get_pod_spec_parameters_for_gpu(
625465
api: client.CoreV1Api, gpu_spec: GPUSpec
626466
) -> tuple[str, client.V1NodeAffinity, str]:
@@ -643,8 +483,8 @@ def _get_nvidia_gpu_node_affinity(
643483
) -> client.V1NodeAffinity:
644484
matching_gpu_label_values: set[str] = set()
645485
for node in nodes:
646-
labels = _get_node_labels(node)
647-
gpu = _get_nvidia_gpu_from_node_labels(labels)
486+
labels = get_node_labels(node)
487+
gpu = get_nvidia_gpu_from_node_labels(labels)
648488
if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
649489
matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL])
650490
if not matching_gpu_label_values:
@@ -676,8 +516,8 @@ def _get_amd_gpu_node_affinity(
676516
) -> client.V1NodeAffinity:
677517
matching_device_ids: set[int] = set()
678518
for node in nodes:
679-
labels = _get_node_labels(node)
680-
gpu = _get_amd_gpu_from_node_labels(labels)
519+
labels = get_node_labels(node)
520+
gpu = get_amd_gpu_from_node_labels(labels)
681521
if gpu is not None and _gpu_matches_gpu_spec(gpu, gpu_spec):
682522
matching_device_ids.update(AMD_GPU_NAME_TO_DEVICE_IDS[gpu.name])
683523
return client.V1NodeAffinity(
@@ -828,10 +668,10 @@ def _create_jump_pod_service(
828668
taints = node_spec.taints or []
829669
for taint in taints:
830670
# A "soft" taint, ignore.
831-
if taint.effect == TaintEffect.PREFER_NO_SCHEDULE:
671+
if not is_hard_taint(taint):
832672
continue
833673
has_hard_taint = True
834-
if taint.key in TOLERATED_NODE_TAINTS:
674+
if is_taint_tolerated(taint):
835675
tolerated_taints.add((taint.key, taint.effect))
836676
if not has_hard_taint:
837677
toleration_required = False

0 commit comments

Comments (0)