From 7d40d07986e0b6c5e4c35f301c6ed6bfe5c480bd Mon Sep 17 00:00:00 2001 From: dkeven Date: Wed, 18 Mar 2026 15:40:42 +0800 Subject: [PATCH 1/2] fix(scheduler): clear pod usage fast if deleted by ourselves --- pkg/scheduler/pods.go | 11 ++++++ pkg/scheduler/routes/gpu_manage.go | 22 ++++-------- pkg/scheduler/scheduler.go | 54 +++++++++++++++++++++++------- pkg/util/util.go | 18 ---------- 4 files changed, 58 insertions(+), 47 deletions(-) diff --git a/pkg/scheduler/pods.go b/pkg/scheduler/pods.go index a54590665..597973619 100644 --- a/pkg/scheduler/pods.go +++ b/pkg/scheduler/pods.go @@ -140,6 +140,17 @@ func (m *podManager) ListPodsInfo() []*podInfo { return pods } +func (m *podManager) PodInfoToPodObj(pod *podInfo) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod.Namespace, + Name: pod.Name, + UID: pod.UID, + Annotations: map[string]string{util.AssignedNodeAnnotations: pod.NodeID}, + }, + } +} + func (m *podManager) GetScheduledPods() (map[k8stypes.UID]*podInfo, error) { m.mutex.RLock() defer m.mutex.RUnlock() diff --git a/pkg/scheduler/routes/gpu_manage.go b/pkg/scheduler/routes/gpu_manage.go index 4ffa4c4e3..e5fa2ceea 100644 --- a/pkg/scheduler/routes/gpu_manage.go +++ b/pkg/scheduler/routes/gpu_manage.go @@ -294,10 +294,8 @@ func AssignGPUToApp(s *scheduler.Scheduler) httprouter.Handle { for _, cdev := range cdevs { if cdev.UUID == uuid { klog.Infof("Forcing out pod %s/%s of exclusive GPU %s in favor of %s", pod.Namespace, pod.Name, uuid, req.AppName) - err = ctrlclient.IgnoreNotFound(client.GetClient().CoreV1().Pods(pod.Namespace).Delete(r.Context(), pod.Name, metav1.DeleteOptions{})) + err = s.DeletePodFromCluster(r.Context(), s.PodInfoToPodObj(pod)) if err != nil { - err = fmt.Errorf("failed to delete existing pod occupying GPU %s/%s: %v", pod.Namespace, pod.Name, err) - klog.Errorln(err) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -362,10 +360,8 @@ func AssignGPUToApp(s *scheduler.Scheduler) httprouter.Handle { } // delete existing pods for this app - err = ctrlclient.IgnoreNotFound(util.DeletePodsBelongToApp(r.Context(), req.AppName)) + err = s.DeletePodsBelongToApp(r.Context(), req.AppName) if err != nil { - err = fmt.Errorf("failed to delete existing pods of app %s: %v", req.AppName, err) - klog.Errorln(err) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -464,10 +460,8 @@ func SwitchGPUMode(s *scheduler.Scheduler) httprouter.Handle { for _, cdev := range cdevs { if cdev.UUID == uuid { klog.Infof("Deleting pod %s/%s for mode switch of GPU %s", pod.Namespace, pod.Name, uuid) - err = ctrlclient.IgnoreNotFound(client.GetClient().CoreV1().Pods(pod.Namespace).Delete(r.Context(), pod.Name, metav1.DeleteOptions{})) + err = s.DeletePodFromCluster(r.Context(), s.PodInfoToPodObj(pod)) if err != nil { - err = fmt.Errorf("failed to delete existing pod occupying GPU %s/%s: %v", pod.Namespace, pod.Name, err) - klog.Errorln(err) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -555,7 +549,7 @@ func UnassignGPUFromApp(s *scheduler.Scheduler) httprouter.Handle { return } - if err := ctrlclient.IgnoreNotFound(util.DeletePodsBelongToApp(r.Context(), req.AppName)); err != nil { + if err := ctrlclient.IgnoreNotFound(s.DeletePodsBelongToApp(r.Context(), req.AppName)); err != nil { klog.Errorln(fmt.Errorf("failed to delete pods of app %s: %v", req.AppName, err)) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -812,9 +806,7 @@ func BulkManageAssignments(s *scheduler.Scheduler) httprouter.Handle { for _, cdev := range cdevs { if _, needEvict := evictUUIDs[cdev.UUID]; needEvict { klog.Infof("Evicting pod %s/%s occupying exclusive GPU %s", pod.Namespace, pod.Name, cdev.UUID) - if err := ctrlclient.IgnoreNotFound(client.GetClient().CoreV1().Pods(pod.Namespace).Delete(r.Context(), pod.Name, metav1.DeleteOptions{})); err != nil { - err = fmt.Errorf("failed to delete existing pod occupying GPU %s/%s: %v", pod.Namespace, pod.Name, err) - klog.Errorln(err) + if err := s.DeletePodFromCluster(r.Context(), s.PodInfoToPodObj(pod)); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -834,9 +826,7 @@ func BulkManageAssignments(s *scheduler.Scheduler) httprouter.Handle { } // 2) Restart this app's pods due to binding changes - if err := ctrlclient.IgnoreNotFound(util.DeletePodsBelongToApp(r.Context(), req.AppName)); err != nil { - err = fmt.Errorf("failed to delete existing pods of app %s: %v", req.AppName, err) - klog.Errorln(err) + if err := s.DeletePodsBelongToApp(r.Context(), req.AppName); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3caf83b72..0337816ce 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -98,7 +98,7 @@ func (s *Scheduler) onAddPod(obj any) { if !ok { return } - if k8sutil.IsPodInTerminatedState(pod) { + if k8sutil.IsPodInTerminatedState(pod) || pod.DeletionTimestamp != nil { s.delPod(pod) return } @@ -139,20 +139,48 @@ func (s *Scheduler) onDelPod(obj any) { return } p := pod.DeepCopy() - go func(nodeName string, p *corev1.Pod) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - node, err := s.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) - if err != nil { - klog.Error("Skip releasing node lock: failed to get node", "node", nodeName, "pod", klog.KObj(p), "err", err) - return + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + node, err := s.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + klog.Error("Skip releasing node lock: failed to get node", "node", nodeName, "pod", klog.KObj(p), "err", err) + return + } + for _, dev := range device.GetDevices() { + if err := dev.ReleaseNodeLock(node, p); err != nil { + klog.Error("ReleaseNodeLock returned error", "node", nodeName, "pod", klog.KObj(p), "err", err) } - for _, dev := range device.GetDevices() { - if err := dev.ReleaseNodeLock(node, p); err != nil { - klog.Error("ReleaseNodeLock returned error", "node", nodeName, "pod", klog.KObj(p), "err", err) - } + } +} + +func (s *Scheduler) DeletePodFromCluster(ctx context.Context, pod *corev1.Pod) error { + if pod == nil { + return nil + } + err := ctrlclient.IgnoreNotFound(s.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) + if err != nil { + err = fmt.Errorf("failed to delete pod %s: %v", pod.Name, err) + klog.Errorln(err) + return err + } + s.onDelPod(pod) + return nil +} + +func (s *Scheduler) DeletePodsBelongToApp(ctx context.Context, appName string) error { + pods, err := s.kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", util.AppNameLabelKey, appName)}) + if err != nil { + err = fmt.Errorf("failed to list pods belonging to app %s: %v", appName, err) + klog.Errorln(err) + return err + } + for _, pod := range pods.Items { + err := s.DeletePodFromCluster(ctx, &pod) + if err != nil { + return err } - }(nodeName, p) + } + return nil } func (s *Scheduler) Start() { diff --git a/pkg/util/util.go b/pkg/util/util.go index 46ba63360..54aa51348 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -28,7 +28,6 @@ import ( "time" "github.com/Project-HAMi/HAMi/pkg/api/gpu/v1alpha1" - ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "github.com/Project-HAMi/HAMi/pkg/util/client" "github.com/Project-HAMi/HAMi/pkg/util/nodelock" @@ -423,23 +422,6 @@ func PatchPodAnnotations(pod *corev1.Pod, annotations map[string]string) error { return err } -func DeletePodsBelongToApp(ctx context.Context, appName string) error { - if appName == "" { - return errors.New("appName is empty") - } - pods, err := client.GetClient().CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", AppNameLabelKey, appName)}) - if err != nil { - return fmt.Errorf("failed to list pods belonging to app %s: %v", appName, err) - } - for _, pod := range pods.Items { - err := ctrlclient.IgnoreNotFound(client.GetClient().CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})) - if err != nil { - return fmt.Errorf("failed to delete pod %s: %v", pod.Name, err) - } - } - return nil -} - func DeleteGPUBinding(ctx context.Context, name string) error { binding := &v1alpha1.GPUBinding{ ObjectMeta: metav1.ObjectMeta{ From b67df4988fbe8374e89952d740798b1d95cd0940 Mon Sep 17 00:00:00 2001 From: dkeven Date: Wed, 18 Mar 2026 18:52:12 +0800 Subject: [PATCH 2/2] fix(scheduler): use a dedicated reason for insufficient GPU --- pkg/scheduler/event.go | 2 ++ pkg/scheduler/scheduler.go | 8 ++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/scheduler/event.go b/pkg/scheduler/event.go index 1e100e4eb..95896bb21 100644 --- a/pkg/scheduler/event.go +++ b/pkg/scheduler/event.go @@ -31,6 +31,8 @@ import ( // Define events for ResourceBinding, ResourceFilter objects and their associated resources. const ( + // EventReasonInsufficientGPU indicates that insufficient GPU resources are available. + EventReasonInsufficientGPU = "InsufficientGPU" // EventReasonFilteringFailed indicates that filtering failed. EventReasonFilteringFailed = "FilteringFailed" // EventReasonFilteringSucceed indicates that filtering succeed. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 0337816ce..070658178 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -1069,7 +1069,7 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi for _, b := range matchedBindings { if _, occupied := consumedByApp[b.Spec.UUID]; occupied { err := fmt.Errorf("bound GPU %s of app %s is already consumed by another pod", b.Spec.UUID, appName) - s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", err) + s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err) return &extenderv1.ExtenderFilterResult{ FailedNodes: map[string]string{}, }, nil @@ -1131,7 +1131,7 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi } if nvidiaSummary.requested > 0 && len(selectedUUIDs) < nvidiaSummary.requested { err := fmt.Errorf("insufficient GPU candidates for app %s, requested=%d, available=%d", appName, nvidiaSummary.requested, len(selectedUUIDs)) - s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", err) + s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err) return &extenderv1.ExtenderFilterResult{ FailedNodes: map[string]string{}, }, nil @@ -1160,7 +1160,7 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi if len((*nodeScores).NodeList) == 0 { klog.V(4).InfoS("No available nodes meet the required scores", "pod", args.Pod.Name) - s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", fmt.Errorf("no available node, %d nodes do not meet", len(*args.NodeNames))) + s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", fmt.Errorf("no available GPU resources on all %d nodes", len(*args.NodeNames))) return &extenderv1.ExtenderFilterResult{ FailedNodes: failedNodes, }, nil @@ -1232,7 +1232,7 @@ func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFi if totalMem > 0 && bindingAllocatedMemory[uuid]+requiredMem > totalMem { err := fmt.Errorf("insufficient mem-slicing GPU memory for binding on %s: allocated=%d, request=%d, total=%d", uuid, bindingAllocatedMemory[uuid], requiredMem, totalMem) klog.ErrorS(err, "Failed to create GPUBinding automatically", "pod", args.Pod.Name, "uuid", uuid) - s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, "", err) + s.recordScheduleFilterResultEvent(args.Pod, EventReasonInsufficientGPU, "", err) return nil, err } memQ := resource.NewQuantity(requiredMem, resource.BinarySI)