From 80e7151a3d4051c1d7915d912c2dc9818b2f60b6 Mon Sep 17 00:00:00 2001 From: dkeven Date: Wed, 17 Jun 2026 21:50:39 +0800 Subject: [PATCH] feat(scheduler): seperate device allocation for multi pods of a single app across nodes --- pkg/scheduler/scheduler.go | 145 ++++++++++++++++++++++++++++++------- 1 file changed, 118 insertions(+), 27 deletions(-) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 62706a68f2..2f367542f5 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -669,6 +669,31 @@ func bindingMatchesIdentity(binding *v1alpha1.GPUBinding, id appBindingIdentity) return true } +// buildGPUUUIDToNodeMap returns a mapping from GPU UUID to the name of the node +// the GPU is registered on. It is used to locate which node an app's bound GPUs +// live on so a pod can be pinned to a single node. +func (s *Scheduler) buildGPUUUIDToNodeMap() map[string]string { + res := make(map[string]string) + nodes, err := s.ListNodes() + if err != nil { + klog.ErrorS(err, "failed to list nodes for GPU UUID to node mapping") + return res + } + for nodeID, n := range nodes { + if n == nil { + continue + } + for _, d := range n.Devices { + uuid := normalizeGPUUUID(d.ID) + if uuid == "" { + continue + } + res[uuid] = nodeID + } + } + return res +} + func (s *Scheduler) collectConsumedGPUUUIDsByApp(identity appBindingIdentity, currentPod *corev1.Pod) map[string]struct{} { consumed := make(map[string]struct{}) for _, p := range s.ListPodsInfo() { @@ -856,51 +881,117 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi policyMode = args.Pod.Labels[nvidia.AppPodGPUConsumePolicyKey] } consumedByApp := s.collectConsumedGPUUUIDsByApp(identity, args.Pod) + + selectedUUIDs := make([]string, 0) + selectedUUIDSet := make(map[string]struct{}) + appendSelectedUUID := func(uuid string) { + if uuid == "" { + return + } + if _, ok := selectedUUIDSet[uuid]; ok { + return + } + selectedUUIDSet[uuid] = struct{}{} + selectedUUIDs = append(selectedUUIDs, uuid) + } + if policyMode == "" || policyMode == nvidia.AppPodGPUConsumePolicyAll { + // "all" policy: a single pod consumes all GPUs bound to this app that live on + // one node. Because a pod can only run on a single node, the app's bound GPUs + // may span several nodes. We therefore group the bound GPUs by node, skip nodes + // already occupied by other pods of this app (i.e. nodes whose bound GPUs are + // already consumed), and pin this pod to one remaining free node so that it + // takes all of that node's bound GPUs. if len(matchedBindings) > 0 { + uuidToNode := s.buildGPUUUIDToNodeMap() + nodeToBoundUUIDs := make(map[string][]string) + nodeOrder := make([]string, 0) for _, b := range matchedBindings { - if _, occupied := consumedByApp[b.Spec.UUID]; occupied { - err := fmt.Errorf("bound GPU %s of app %s is already consumed by another pod", b.Spec.UUID, appName) - s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err) - return &extenderv1.ExtenderFilterResult{ - FailedNodes: map[string]string{}, - }, nil + nodeName, ok := uuidToNode[normalizeGPUUUID(b.Spec.UUID)] + if !ok { + klog.V(4).InfoS("bound GPU not found on any registered node, skipping", + "app", appName, "uuid", b.Spec.UUID) + continue + } + if _, seen := nodeToBoundUUIDs[nodeName]; !seen { + nodeOrder = append(nodeOrder, nodeName) + } + nodeToBoundUUIDs[nodeName] = append(nodeToBoundUUIDs[nodeName], b.Spec.UUID) + } + // deterministic node selection order so concurrent pods of the same app + // fill nodes predictably (occupied nodes are skipped as they fill up). + sort.Strings(nodeOrder) + + candidate := make(map[string]struct{}) + if args.NodeNames != nil { + for _, nn := range *args.NodeNames { + candidate[nn] = struct{}{} } } + + targetNode := "" + var targetUUIDs []string + for _, nodeName := range nodeOrder { + // only consider nodes that survived the default scheduler's predicates + if len(candidate) > 0 { + if _, ok := candidate[nodeName]; !ok { + continue + } + } + // skip nodes already occupied by another pod of this app + occupied := false + for _, u := range nodeToBoundUUIDs[nodeName] { + if _, c := consumedByApp[normalizeGPUUUID(u)]; c { + occupied = true + break + } + } + if occupied { + continue + } + targetNode = nodeName + targetUUIDs = nodeToBoundUUIDs[nodeName] + break + } + + if targetNode == "" { + err := fmt.Errorf("no free node with GPUs bound to app %s (all bound nodes are occupied by existing pods or unschedulable)", appName) + s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err) + return &extenderv1.ExtenderFilterResult{ + FailedNodes: map[string]string{}, + }, nil + } + + // pin the pod to the chosen node and request exactly its bound GPUs + args.NodeNames = &[]string{targetNode} for ctrIdx := range resourceReqs { for reqIdx, req := range resourceReqs[ctrIdx] { if req.Type != nvidia.NvidiaGPUDevice || req.Nums <= 0 { continue } // this assumes only one container in the pod has a gpu request - req.Nums = int32(len(matchedBindings)) + req.Nums = int32(len(targetUUIDs)) resourceReqs[ctrIdx][reqIdx] = req } } + for _, u := range targetUUIDs { + appendSelectedUUID(u) + } + klog.InfoS("app consume-policy=all: pinning pod to node holding all bound GPUs", + "app", appName, "node", targetNode, "gpuCount", len(targetUUIDs), "uuids", targetUUIDs) } - } - - nvidiaSummary := summarizeNVIDIARequests(resourceReqs) - selectedUUIDs := make([]string, 0) - selectedUUIDSet := make(map[string]struct{}) - appendSelectedUUID := func(uuid string) { - if uuid == "" { - return - } - if _, ok := selectedUUIDSet[uuid]; ok { - return - } - selectedUUIDSet[uuid] = struct{}{} - selectedUUIDs = append(selectedUUIDs, uuid) - } - - for _, b := range matchedBindings { - if _, occupied := consumedByApp[b.Spec.UUID]; occupied { - continue + } else { + // other policies (e.g. "single"): keep the pod's own requested GPU count and + // only constrain it to the app's bound GPUs that are still free. + for _, b := range matchedBindings { + if _, occupied := consumedByApp[normalizeGPUUUID(b.Spec.UUID)]; occupied { + continue + } + appendSelectedUUID(b.Spec.UUID) } - appendSelectedUUID(b.Spec.UUID) } + nvidiaSummary := summarizeNVIDIARequests(resourceReqs) if nvidiaSummary.requested > 0 && len(selectedUUIDs) < nvidiaSummary.requested { err := fmt.Errorf("insufficient GPUBindings for app %s, requested=%d, bound=%d", appName, nvidiaSummary.requested, len(selectedUUIDs)) s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err)