|
5 | 5 | from enum import Enum |
6 | 6 | from typing import List, Optional, Tuple |
7 | 7 |
|
8 | | -from gpuhunt import KNOWN_NVIDIA_GPUS, AcceleratorVendor |
| 8 | +from gpuhunt import KNOWN_AMD_GPUS, KNOWN_NVIDIA_GPUS, AcceleratorVendor |
9 | 9 | from kubernetes import client |
10 | 10 |
|
11 | 11 | from dstack._internal.core.backends.base.compute import ( |
|
59 | 59 | logger = get_logger(__name__) |
60 | 60 |
|
61 | 61 | JUMP_POD_SSH_PORT = 22 |
62 | | - |
63 | | -NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS} |
64 | | -NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys() |
| 62 | +DUMMY_REGION = "-" |
65 | 63 |
|
66 | 64 | NVIDIA_GPU_RESOURCE = "nvidia.com/gpu" |
67 | | -NVIDIA_GPU_COUNT_LABEL = f"{NVIDIA_GPU_RESOURCE}.count" |
68 | | -NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product" |
69 | 65 | NVIDIA_GPU_NODE_TAINT = NVIDIA_GPU_RESOURCE |
| 66 | +NVIDIA_GPU_PRODUCT_LABEL = f"{NVIDIA_GPU_RESOURCE}.product" |
| 67 | + |
| 68 | +AMD_GPU_RESOURCE = "amd.com/gpu" |
| 69 | +AMD_GPU_NODE_TAINT = AMD_GPU_RESOURCE |
| 70 | +# The oldest but still supported label format and thus the safest option; see the commit message: |
| 71 | +# https://github.com/ROCm/k8s-device-plugin/commit/c0b0231b391a56bc9da4f362d561e25e960d7a48 |
| 72 | +# E.g., beta.amd.com/gpu.device-id.74b5=4 marks a node with four MI300X VF (0x74b5) GPUs |
| 73 | +# We cannot rely on the beta.amd.com/gpu.product-name.* label, as it may be missing; see the issue: |
| 74 | +# https://github.com/ROCm/k8s-device-plugin/issues/112 |
| 75 | +AMD_GPU_DEVICE_ID_LABEL_PREFIX = f"beta.{AMD_GPU_RESOURCE}.device-id." |
70 | 76 |
|
71 | 77 | # Taints we know and tolerate when creating our objects, e.g., the jump pod. |
72 | | -TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT,) |
| 78 | +TOLERATED_NODE_TAINTS = (NVIDIA_GPU_NODE_TAINT, AMD_GPU_NODE_TAINT) |
73 | 79 |
|
74 | | -DUMMY_REGION = "-" |
| 80 | +NVIDIA_GPU_NAME_TO_GPU_INFO = {gpu.name: gpu for gpu in KNOWN_NVIDIA_GPUS} |
| 81 | +NVIDIA_GPU_NAMES = NVIDIA_GPU_NAME_TO_GPU_INFO.keys() |
| 82 | + |
| 83 | +AMD_GPU_DEVICE_ID_TO_GPU_INFO = { |
| 84 | + device_id: gpu_info for gpu_info in KNOWN_AMD_GPUS for device_id in gpu_info.device_ids |
| 85 | +} |
| 86 | +AMD_GPU_NAME_TO_DEVICE_IDS = {gpu.name: gpu.device_ids for gpu in KNOWN_AMD_GPUS} |
75 | 87 |
|
76 | 88 |
|
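For context on the device-id convention introduced above, a minimal sketch of how a label key resolves to a GPU model via `KNOWN_AMD_GPUS` (the label key is hypothetical; the `.name`/`.memory`/`.device_ids` fields are the ones this diff itself relies on):

```python
from gpuhunt import KNOWN_AMD_GPUS

PREFIX = "beta.amd.com/gpu.device-id."

# Same shape as AMD_GPU_DEVICE_ID_TO_GPU_INFO above: one entry per device id,
# since a single model may ship under several PCI device ids (e.g., PF/VF).
device_id_to_gpu = {
    device_id: gpu for gpu in KNOWN_AMD_GPUS for device_id in gpu.device_ids
}

label_key = "beta.amd.com/gpu.device-id.74b5"  # hypothetical node label key
device_id = int(label_key.removeprefix(PREFIX), 16)  # -> 0x74b5
gpu = device_id_to_gpu.get(device_id)
if gpu is not None:
    print(gpu.name, gpu.memory)  # e.g., "MI300X", 192 (memory is in GiB)
```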
77 | 89 | class Operator(str, Enum): |
@@ -112,21 +124,15 @@ def get_offers_by_requirements( |
112 | 124 | nodes = get_value(node_list, ".items", list[client.V1Node], required=True) |
113 | 125 | for node in nodes: |
114 | 126 | try: |
115 | | - labels = get_value(node, ".metadata.labels", dict[str, str]) or {} |
116 | 127 | name = get_value(node, ".metadata.name", str, required=True) |
117 | | - cpus = _parse_cpu( |
118 | | - get_value(node, ".status.allocatable['cpu']", str, required=True) |
119 | | - ) |
120 | 128 | cpu_arch = normalize_arch( |
121 | 129 | get_value(node, ".status.node_info.architecture", str) |
122 | 130 | ).to_cpu_architecture() |
123 | | - memory_mib = _parse_memory( |
124 | | - get_value(node, ".status.allocatable['memory']", str, required=True) |
125 | | - ) |
126 | | - gpus, _ = _get_gpus_from_node_labels(labels) |
127 | | - disk_size_mib = _parse_memory( |
128 | | - get_value(node, ".status.allocatable['ephemeral-storage']", str, required=True) |
129 | | - ) |
| 131 | + allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True) |
| 132 | + cpus = _parse_cpu(allocatable["cpu"]) |
| 133 | + memory_mib = _parse_memory(allocatable["memory"]) |
| 134 | + disk_size_mib = _parse_memory(allocatable["ephemeral-storage"]) |
| 135 | + gpus = _get_node_gpus(node) |
130 | 136 | except (AttributeError, KeyError, ValueError) as e: |
131 | 137 | logger.exception("Failed to process node: %s: %s", type(e).__name__, e) |
132 | 138 | continue |
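`_parse_cpu` and `_parse_memory` are defined elsewhere in this module; as a rough illustration only (an assumption, not the actual implementation), parsers for Kubernetes allocatable quantities look roughly like this:

```python
# Hypothetical stand-ins for _parse_cpu/_parse_memory, whose bodies are outside
# this diff. Kubernetes reports CPU as cores or millicores, and memory and
# ephemeral-storage as (usually binary-suffixed) quantities.
def parse_cpu(quantity: str) -> int:
    if quantity.endswith("m"):  # "7910m" -> 7 whole cores
        return int(quantity[:-1]) // 1000
    return int(quantity)  # "8" -> 8

def parse_memory_mib(quantity: str) -> int:
    factors = {"Ki": 1 / 1024, "Mi": 1, "Gi": 1024, "Ti": 1024 * 1024}
    for suffix, factor in factors.items():
        if quantity.endswith(suffix):
            return int(float(quantity[: -len(suffix)]) * factor)
    return int(quantity) // (1024 * 1024)  # bare bytes

assert parse_cpu("7910m") == 7
assert parse_memory_mib("16323084Ki") == 15940
```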
@@ -217,59 +223,18 @@ def run_job( |
217 | 223 | "GPU is requested but the offer has no GPUs:" |
218 | 224 | f" {gpu_spec=} {instance_offer=}", |
219 | 225 | ) |
220 | | - offer_gpu = offer_gpus[0] |
221 | | - matching_gpu_label_values: set[str] = set() |
222 | | - # We cannot generate an expected GPU label value from the Gpu model instance |
223 | | - # as the actual values may have additional components (socket, memory type, etc.) |
224 | | - # that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3". |
225 | | - # Moreover, a single Gpu may match multiple label values. |
226 | | - # As a workaround, we iterate and process all node labels once again (we already |
227 | | - # processed them in `get_offers_by_requirements()`). |
228 | | - node_list = call_api_method( |
229 | | - self.api.list_node, |
230 | | - client.V1NodeList, |
231 | | - ) |
232 | | - nodes = get_value(node_list, ".items", list[client.V1Node], required=True) |
233 | | - for node in nodes: |
234 | | - labels = get_value(node, ".metadata.labels", dict[str, str]) |
235 | | - if not labels: |
236 | | - continue |
237 | | - gpus, gpu_label_value = _get_gpus_from_node_labels(labels) |
238 | | - if not gpus or gpu_label_value is None: |
239 | | - continue |
240 | | - if gpus[0] == offer_gpu: |
241 | | - matching_gpu_label_values.add(gpu_label_value) |
242 | | - if not matching_gpu_label_values: |
243 | | - raise ComputeError( |
244 | | - f"GPU is requested but no matching GPU labels found: {gpu_spec=}" |
245 | | - ) |
246 | | - logger.debug( |
247 | | - "Requesting %d GPU(s), node labels: %s", gpu_min, matching_gpu_label_values |
248 | | - ) |
249 | | - # TODO: support other GPU vendors |
250 | | - resources_requests[NVIDIA_GPU_RESOURCE] = str(gpu_min) |
251 | | - resources_limits[NVIDIA_GPU_RESOURCE] = str(gpu_min) |
252 | | - node_affinity = client.V1NodeAffinity( |
253 | | - required_during_scheduling_ignored_during_execution=[ |
254 | | - client.V1NodeSelectorTerm( |
255 | | - match_expressions=[ |
256 | | - client.V1NodeSelectorRequirement( |
257 | | - key=NVIDIA_GPU_PRODUCT_LABEL, |
258 | | - operator=Operator.IN, |
259 | | - values=list(matching_gpu_label_values), |
260 | | - ), |
261 | | - ], |
262 | | - ), |
263 | | - ], |
| 226 | + gpu_resource, node_affinity, node_taint = _get_pod_spec_parameters_for_gpu( |
| 227 | + self.api, offer_gpus[0] |
264 | 228 | ) |
| 229 | + logger.debug("Requesting GPU resource: %s=%d", gpu_resource, gpu_min) |
| 230 | + resources_requests[gpu_resource] = resources_limits[gpu_resource] = str(gpu_min) |
265 | 231 | # It should be NoSchedule, but we also add NoExecute toleration just in case. |
266 | 232 | for effect in [TaintEffect.NO_SCHEDULE, TaintEffect.NO_EXECUTE]: |
267 | 233 | tolerations.append( |
268 | 234 | client.V1Toleration( |
269 | | - key=NVIDIA_GPU_NODE_TAINT, operator=Operator.EXISTS, effect=effect |
| 235 | + key=node_taint, operator=Operator.EXISTS, effect=effect |
270 | 236 | ) |
271 | 237 | ) |
272 | | - |
273 | 238 | if (memory_min := resources_spec.memory.min) is not None: |
274 | 239 | resources_requests["memory"] = _render_memory(memory_min) |
275 | 240 | if ( |
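For an AMD offer, the toleration loop above now produces the equivalent of the following (a sketch with string literals standing in for the `Operator`/`TaintEffect` enum values):

```python
from kubernetes import client

node_taint = "amd.com/gpu"  # AMD_GPU_NODE_TAINT for an AMD offer
tolerations = [
    client.V1Toleration(key=node_taint, operator="Exists", effect=effect)
    for effect in ("NoSchedule", "NoExecute")
]
# Pod spec equivalent:
#   tolerations:
#   - {key: amd.com/gpu, operator: Exists, effect: NoSchedule}
#   - {key: amd.com/gpu, operator: Exists, effect: NoExecute}
```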
@@ -331,7 +296,9 @@ def run_job( |
331 | 296 | volume_mounts=volume_mounts, |
332 | 297 | ) |
333 | 298 | ], |
334 | | - affinity=node_affinity, |
| 299 | + affinity=client.V1Affinity( |
| 300 | + node_affinity=node_affinity, |
| 301 | + ), |
335 | 302 | tolerations=tolerations, |
336 | 303 | volumes=volumes_, |
337 | 304 | ), |
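This hunk also fixes the affinity wiring: `V1PodSpec.affinity` expects a `V1Affinity` envelope rather than a bare `V1NodeAffinity`, and the helpers below now wrap the term list in `V1NodeSelector`, which the `required_during_scheduling_ignored_during_execution` field requires (the removed code passed a plain list). The full nesting, sketched with a hypothetical requirement:

```python
from kubernetes import client

affinity = client.V1Affinity(
    node_affinity=client.V1NodeAffinity(
        required_during_scheduling_ignored_during_execution=client.V1NodeSelector(
            node_selector_terms=[
                client.V1NodeSelectorTerm(
                    match_expressions=[
                        client.V1NodeSelectorRequirement(
                            key="nvidia.com/gpu.product",
                            operator="In",
                            values=["Tesla-T4"],  # hypothetical label value
                        ),
                    ],
                ),
            ],
        ),
    ),
)
```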
@@ -550,34 +517,144 @@ def _render_memory(memory: Memory) -> str: |
550 | 517 | return f"{float(memory)}Gi" |
551 | 518 |
|
552 | 519 |
|
553 | | -def _get_gpus_from_node_labels(labels: dict[str, str]) -> tuple[list[Gpu], Optional[str]]: |
| 520 | +def _get_node_gpus(node: client.V1Node) -> list[Gpu]: |
| 521 | + node_name = get_value(node, ".metadata.name", str, required=True) |
| 522 | + allocatable = get_value(node, ".status.allocatable", dict[str, str], required=True) |
| 523 | + labels = get_value(node, ".metadata.labels", dict[str, str]) or {} |
| 524 | + for gpu_resource, gpu_getter in ( |
| 525 | + (NVIDIA_GPU_RESOURCE, _get_nvidia_gpu_from_node_labels), |
| 526 | + (AMD_GPU_RESOURCE, _get_amd_gpu_from_node_labels), |
| 527 | + ): |
| 528 | + _gpu_count = allocatable.get(gpu_resource) |
| 529 | + if not _gpu_count: |
| 530 | + continue |
| 531 | + gpu_count = int(_gpu_count) |
| 532 | + if gpu_count < 1: |
| 533 | + continue |
| 534 | + gpu = gpu_getter(labels) |
| 535 | + if gpu is None: |
| 536 | + logger.warning( |
| 537 | + "Node %s: GPU resource found, but failed to detect its model: %s=%d", |
| 538 | + node_name, |
| 539 | + gpu_resource, |
| 540 | + gpu_count, |
| 541 | + ) |
| 542 | + return [] |
| 543 | + return [gpu] * gpu_count |
| 544 | + logger.debug("Node %s: no GPU resource found", node_name) |
| 545 | + return [] |
| 546 | + |
| 547 | + |
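An illustrative call to `_get_node_gpus` with hypothetical node data (the expected result is shown as a comment):

```python
from kubernetes import client

node = client.V1Node(
    metadata=client.V1ObjectMeta(
        name="gpu-node-1",
        labels={"nvidia.com/gpu.product": "Tesla-T4"},  # hypothetical
    ),
    status=client.V1NodeStatus(
        allocatable={"cpu": "8", "memory": "32Gi", "nvidia.com/gpu": "2"},
    ),
)
# _get_node_gpus(node)
# -> [Gpu(vendor=NVIDIA, name="T4", memory_mib=16384)] * 2
```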
| 548 | +def _get_nvidia_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]: |
554 | 549 | # We rely on https://github.com/NVIDIA/k8s-device-plugin/tree/main/docs/gpu-feature-discovery |
555 | 550 | # to detect gpus. Note that "nvidia.com/gpu.product" is not a short gpu name like "T4" or |
556 | 551 | # "A100" but a product name like "Tesla-T4" or "A100-SXM4-40GB". |
557 | 552 | # Thus, we convert the product name to a known gpu name. |
558 | | - # TODO: support other GPU vendors |
559 | | - gpu_count = labels.get(NVIDIA_GPU_COUNT_LABEL) |
560 | 553 | gpu_product = labels.get(NVIDIA_GPU_PRODUCT_LABEL) |
561 | | - if gpu_count is None or gpu_product is None: |
562 | | - return [], None |
563 | | - gpu_count = int(gpu_count) |
564 | | - gpu_name = None |
565 | | - for known_gpu_name in NVIDIA_GPU_NAMES: |
566 | | - if known_gpu_name.lower() in gpu_product.lower().split("-"): |
567 | | - gpu_name = known_gpu_name |
| 554 | + if gpu_product is None: |
| 555 | + return None |
| 556 | + for gpu_name in NVIDIA_GPU_NAMES: |
| 557 | + if gpu_name.lower() in gpu_product.lower().split("-"): |
568 | 558 | break |
569 | | - if gpu_name is None: |
570 | | - return [], None |
| 559 | + else: |
| 560 | + return None |
571 | 561 | gpu_info = NVIDIA_GPU_NAME_TO_GPU_INFO[gpu_name] |
572 | 562 | gpu_memory = gpu_info.memory * 1024 |
573 | 563 | # A100 may come in two variants |
574 | 564 | if "40GB" in gpu_product: |
575 | 565 | gpu_memory = 40 * 1024 |
576 | | - gpus = [ |
577 | | - Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory) |
578 | | - for _ in range(gpu_count) |
579 | | - ] |
580 | | - return gpus, gpu_product |
| 566 | + return Gpu(vendor=AcceleratorVendor.NVIDIA, name=gpu_name, memory_mib=gpu_memory) |
| 567 | + |
| 568 | + |
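Note the matching is token-based rather than substring-based, which avoids false positives:

```python
gpu_product = "NVIDIA-H100-80GB-HBM3"  # hypothetical label value
tokens = gpu_product.lower().split("-")
assert "h100" in tokens  # the known short name "H100" matches
# A substring check would also wrongly match e.g. a hypothetical "NVIDIA-T400":
assert "t4" not in "NVIDIA-T400".lower().split("-")
```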
| 569 | +def _get_amd_gpu_from_node_labels(labels: dict[str, str]) -> Optional[Gpu]: |
| 570 | + # (AMDGPUInfo.name, AMDGPUInfo.memory) pairs |
| 571 | + gpus: set[tuple[str, int]] = set() |
| 572 | + for label in labels: |
| 573 | + if not label.startswith(AMD_GPU_DEVICE_ID_LABEL_PREFIX): |
| 574 | + continue |
| 575 | + _, _, _device_id = label.rpartition(".") |
| 576 | + device_id = int(_device_id, 16) |
| 577 | + gpu_info = AMD_GPU_DEVICE_ID_TO_GPU_INFO.get(device_id) |
| 578 | + if gpu_info is None: |
| 579 | + logger.warning("Unknown AMD GPU device id: %X", device_id) |
| 580 | + continue |
| 581 | + gpus.add((gpu_info.name, gpu_info.memory)) |
| 582 | + if not gpus: |
| 583 | + return None |
| 584 | + if len(gpus) == 1: |
| 585 | + gpu_name, gpu_memory_gib = next(iter(gpus)) |
| 586 | + return Gpu(vendor=AcceleratorVendor.AMD, name=gpu_name, memory_mib=gpu_memory_gib * 1024) |
| 587 | + logger.warning("Multiple AMD GPU models detected: %s, ignoring all GPUs", gpus) |
| 588 | + return None |
| 589 | + |
| 590 | + |
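Why a set of `(name, memory)` pairs: one node may expose several device-id labels that all resolve to the same model, while two distinct models make the node ambiguous. A sketch with hypothetical labels:

```python
labels = {
    "beta.amd.com/gpu.device-id.74b5": "4",  # hypothetical: VF id
    "beta.amd.com/gpu.device-id.74a1": "4",  # hypothetical: PF id, same model
    "kubernetes.io/arch": "amd64",           # unrelated labels are skipped
}
prefix = "beta.amd.com/gpu.device-id."
ids = [int(key.rpartition(".")[2], 16) for key in labels if key.startswith(prefix)]
# If both ids map to the same (name, memory) pair, the set collapses to one
# entry and a single Gpu is built; two different pairs would return None.
```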
| 591 | +def _get_pod_spec_parameters_for_gpu( |
| 592 | + api: client.CoreV1Api, gpu: Gpu |
| 593 | +) -> tuple[str, client.V1NodeAffinity, str]: |
| 594 | + gpu_vendor = gpu.vendor |
| 595 | + assert gpu_vendor is not None |
| 596 | + if gpu_vendor == AcceleratorVendor.NVIDIA: |
| 597 | + node_affinity = _get_nvidia_gpu_node_affinity(api, gpu) |
| 598 | + return NVIDIA_GPU_RESOURCE, node_affinity, NVIDIA_GPU_NODE_TAINT |
| 599 | + if gpu_vendor == AcceleratorVendor.AMD: |
| 600 | + node_affinity = _get_amd_gpu_node_affinity(gpu) |
| 601 | + return AMD_GPU_RESOURCE, node_affinity, AMD_GPU_NODE_TAINT |
| 602 | + raise ComputeError(f"Unsupported GPU vendor: {gpu_vendor}") |
| 603 | + |
| 604 | + |
| 605 | +def _get_nvidia_gpu_node_affinity(api: client.CoreV1Api, gpu: Gpu) -> client.V1NodeAffinity: |
| 606 | + matching_gpu_label_values: set[str] = set() |
| 607 | + # We cannot generate an expected GPU label value from the Gpu model instance |
| 608 | + # as the actual values may have additional components (socket, memory type, etc.) |
| 609 | + # that we don't preserve in the Gpu model, e.g., "NVIDIA-H100-80GB-HBM3". |
| 610 | + # Moreover, a single Gpu may match multiple label values. |
| 611 | + # As a workaround, we iterate and process all node labels once again (we already |
| 612 | + # processed them in `get_offers_by_requirements()`). |
| 613 | + node_list = call_api_method(api.list_node, client.V1NodeList) |
| 614 | + nodes = get_value(node_list, ".items", list[client.V1Node], required=True) |
| 615 | + for node in nodes: |
| 616 | + labels = get_value(node, ".metadata.labels", dict[str, str]) or {} |
| 617 | + if _get_nvidia_gpu_from_node_labels(labels) == gpu: |
| 618 | + matching_gpu_label_values.add(labels[NVIDIA_GPU_PRODUCT_LABEL]) |
| 619 | + if not matching_gpu_label_values: |
| 620 | + raise ComputeError(f"NVIDIA GPU is requested but no matching GPU labels found: {gpu=}") |
| 621 | + logger.debug("Selecting nodes by labels %s for NVIDIA %s", matching_gpu_label_values, gpu.name) |
| 622 | + return client.V1NodeAffinity( |
| 623 | + required_during_scheduling_ignored_during_execution=client.V1NodeSelector( |
| 624 | + node_selector_terms=[ |
| 625 | + client.V1NodeSelectorTerm( |
| 626 | + match_expressions=[ |
| 627 | + client.V1NodeSelectorRequirement( |
| 628 | + key=NVIDIA_GPU_PRODUCT_LABEL, |
| 629 | + operator=Operator.IN, |
| 630 | + values=list(matching_gpu_label_values), |
| 631 | + ), |
| 632 | + ], |
| 633 | + ), |
| 634 | + ], |
| 635 | + ), |
| 636 | + ) |
| 637 | + |
| 638 | + |
| 639 | +def _get_amd_gpu_node_affinity(gpu: Gpu) -> client.V1NodeAffinity: |
| 640 | + device_ids = AMD_GPU_NAME_TO_DEVICE_IDS.get(gpu.name) |
| 641 | + if device_ids is None: |
| 642 | + raise ComputeError(f"AMD GPU is requested but no matching device ids found: {gpu=}") |
| 643 | + return client.V1NodeAffinity( |
| 644 | + required_during_scheduling_ignored_during_execution=client.V1NodeSelector( |
| 645 | + node_selector_terms=[ |
| 646 | + client.V1NodeSelectorTerm( |
| 647 | + match_expressions=[ |
| 648 | + client.V1NodeSelectorRequirement( |
| 649 | + key=f"{AMD_GPU_DEVICE_ID_LABEL_PREFIX}{device_id:x}", |
| 650 | + operator=Operator.EXISTS, |
| 651 | + ), |
| 652 | + ], |
| 653 | + ) |
| 654 | + for device_id in device_ids |
| 655 | + ], |
| 656 | + ), |
| 657 | + ) |
581 | 658 |
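Kubernetes ORs the entries of `nodeSelectorTerms` (while `matchExpressions` within one term are ANDed), so emitting one term per device id selects nodes carrying any of the model's ids. Equivalent construction with hypothetical ids:

```python
from kubernetes import client

terms = [
    client.V1NodeSelectorTerm(
        match_expressions=[
            client.V1NodeSelectorRequirement(
                key=f"beta.amd.com/gpu.device-id.{device_id:x}",
                operator="Exists",
            ),
        ],
    )
    for device_id in (0x74B5, 0x74A1)  # hypothetical ids for one model
]
```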
|
582 | 659 |
|
583 | 660 | def _continue_setup_jump_pod( |
|