Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 118 additions & 27 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,31 @@ func bindingMatchesIdentity(binding *v1alpha1.GPUBinding, id appBindingIdentity)
return true
}

// buildGPUUUIDToNodeMap indexes every GPU reported by s.ListNodes by its
// normalized UUID, so callers can look up which node entry a GPU belongs to.
//
// The returned map is keyed by normalizeGPUUUID(device.ID); each value is the
// key under which the owning node appears in the ListNodes result. Nil node
// entries are ignored, as are devices whose normalized UUID is empty. If the
// same normalized UUID is seen more than once, the entry written last wins.
//
// The result is never nil: when ListNodes fails, the error is logged and an
// empty map is returned.
func (s *Scheduler) buildGPUUUIDToNodeMap() map[string]string {
	uuidToNode := map[string]string{}

	nodeInfos, listErr := s.ListNodes()
	if listErr != nil {
		// Best effort: callers receive an empty mapping instead of an error.
		klog.ErrorS(listErr, "failed to list nodes for GPU UUID to node mapping")
		return uuidToNode
	}

	for nodeKey, info := range nodeInfos {
		if info != nil {
			for _, dev := range info.Devices {
				// Devices without a usable UUID cannot be indexed.
				if key := normalizeGPUUUID(dev.ID); key != "" {
					uuidToNode[key] = nodeKey
				}
			}
		}
	}

	return uuidToNode
}

func (s *Scheduler) collectConsumedGPUUUIDsByApp(identity appBindingIdentity, currentPod *corev1.Pod) map[string]struct{} {
consumed := make(map[string]struct{})
for _, p := range s.ListPodsInfo() {
Expand Down Expand Up @@ -856,51 +881,117 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi
policyMode = args.Pod.Labels[nvidia.AppPodGPUConsumePolicyKey]
}
consumedByApp := s.collectConsumedGPUUUIDsByApp(identity, args.Pod)

selectedUUIDs := make([]string, 0)
selectedUUIDSet := make(map[string]struct{})
appendSelectedUUID := func(uuid string) {
if uuid == "" {
return
}
if _, ok := selectedUUIDSet[uuid]; ok {
return
}
selectedUUIDSet[uuid] = struct{}{}
selectedUUIDs = append(selectedUUIDs, uuid)
}

if policyMode == "" || policyMode == nvidia.AppPodGPUConsumePolicyAll {
// "all" policy: a single pod consumes all GPUs bound to this app that live on
// one node. Because a pod can only run on a single node, the app's bound GPUs
// may span several nodes. We therefore group the bound GPUs by node, skip nodes
// already occupied by other pods of this app (i.e. nodes whose bound GPUs are
// already consumed), and pin this pod to one remaining free node so that it
// takes all of that node's bound GPUs.
if len(matchedBindings) > 0 {
uuidToNode := s.buildGPUUUIDToNodeMap()
nodeToBoundUUIDs := make(map[string][]string)
nodeOrder := make([]string, 0)
for _, b := range matchedBindings {
if _, occupied := consumedByApp[b.Spec.UUID]; occupied {
err := fmt.Errorf("bound GPU %s of app %s is already consumed by another pod", b.Spec.UUID, appName)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err)
return &extenderv1.ExtenderFilterResult{
FailedNodes: map[string]string{},
}, nil
nodeName, ok := uuidToNode[normalizeGPUUUID(b.Spec.UUID)]
if !ok {
klog.V(4).InfoS("bound GPU not found on any registered node, skipping",
"app", appName, "uuid", b.Spec.UUID)
continue
}
if _, seen := nodeToBoundUUIDs[nodeName]; !seen {
nodeOrder = append(nodeOrder, nodeName)
}
nodeToBoundUUIDs[nodeName] = append(nodeToBoundUUIDs[nodeName], b.Spec.UUID)
}
// deterministic node selection order so concurrent pods of the same app
// fill nodes predictably (occupied nodes are skipped as they fill up).
sort.Strings(nodeOrder)

candidate := make(map[string]struct{})
if args.NodeNames != nil {
for _, nn := range *args.NodeNames {
candidate[nn] = struct{}{}
}
}

targetNode := ""
var targetUUIDs []string
for _, nodeName := range nodeOrder {
// only consider nodes that survived the default scheduler's predicates
if len(candidate) > 0 {
if _, ok := candidate[nodeName]; !ok {
continue
}
}
// skip nodes already occupied by another pod of this app
occupied := false
for _, u := range nodeToBoundUUIDs[nodeName] {
if _, c := consumedByApp[normalizeGPUUUID(u)]; c {
occupied = true
break
}
}
if occupied {
continue
}
targetNode = nodeName
targetUUIDs = nodeToBoundUUIDs[nodeName]
break
}

if targetNode == "" {
err := fmt.Errorf("no free node with GPUs bound to app %s (all bound nodes are occupied by existing pods or unschedulable)", appName)
s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err)
return &extenderv1.ExtenderFilterResult{
FailedNodes: map[string]string{},
}, nil
}

// pin the pod to the chosen node and request exactly its bound GPUs
args.NodeNames = &[]string{targetNode}
for ctrIdx := range resourceReqs {
for reqIdx, req := range resourceReqs[ctrIdx] {
if req.Type != nvidia.NvidiaGPUDevice || req.Nums <= 0 {
continue
}
// this assumes only one container in the pod has a gpu request
req.Nums = int32(len(matchedBindings))
req.Nums = int32(len(targetUUIDs))
resourceReqs[ctrIdx][reqIdx] = req
}
}
for _, u := range targetUUIDs {
appendSelectedUUID(u)
}
klog.InfoS("app consume-policy=all: pinning pod to node holding all bound GPUs",
"app", appName, "node", targetNode, "gpuCount", len(targetUUIDs), "uuids", targetUUIDs)
}
}

nvidiaSummary := summarizeNVIDIARequests(resourceReqs)
selectedUUIDs := make([]string, 0)
selectedUUIDSet := make(map[string]struct{})
appendSelectedUUID := func(uuid string) {
if uuid == "" {
return
}
if _, ok := selectedUUIDSet[uuid]; ok {
return
}
selectedUUIDSet[uuid] = struct{}{}
selectedUUIDs = append(selectedUUIDs, uuid)
}

for _, b := range matchedBindings {
if _, occupied := consumedByApp[b.Spec.UUID]; occupied {
continue
} else {
// other policies (e.g. "single"): keep the pod's own requested GPU count and
// only constrain it to the app's bound GPUs that are still free.
for _, b := range matchedBindings {
if _, occupied := consumedByApp[normalizeGPUUUID(b.Spec.UUID)]; occupied {
continue
}
appendSelectedUUID(b.Spec.UUID)
}
appendSelectedUUID(b.Spec.UUID)
}

nvidiaSummary := summarizeNVIDIARequests(resourceReqs)
if nvidiaSummary.requested > 0 && len(selectedUUIDs) < nvidiaSummary.requested {
err := fmt.Errorf("insufficient GPUBindings for app %s, requested=%d, bound=%d", appName, nvidiaSummary.requested, len(selectedUUIDs))
s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err)
Expand Down
Loading