From 4b73a4db2a723ad1431ab8e04b2e2e69176294a8 Mon Sep 17 00:00:00 2001 From: aviadh Date: Mon, 13 Apr 2026 11:28:28 +0300 Subject: [PATCH] fix: respect cleanPodPolicy when job exceeds backoffLimit Move UpdateJobConditions(JobFailed) before DeletePodsAndServices in the jobExceedsLimit block so that IsFinished() returns true and the cleanPodPolicy: None guard is not bypassed. Fixes: https://github.com/kubeflow/trainer/issues/3419 Signed-off-by: Aviad Hayumi --- pkg/controller.v1/common/job.go | 10 ++- pkg/controller.v1/common/job_test.go | 97 ++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 6 deletions(-) diff --git a/pkg/controller.v1/common/job.go b/pkg/controller.v1/common/job.go index ee750c2d1d..01387a7359 100644 --- a/pkg/controller.v1/common/job.go +++ b/pkg/controller.v1/common/job.go @@ -220,8 +220,10 @@ func (jc *JobController) ReconcileJobs( jobStatus.CompletionTime = &now } - // If the Job exceeds backoff limit or is past active deadline - // delete all pods and services, then set the status to failed + // Fix: https://github.com/kubeflow/trainer/issues/3419 + jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) + commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) + if err := jc.DeletePodsAndServices(runtimeObject, runPolicy, jobStatus, pods); err != nil { return err } @@ -240,10 +242,6 @@ func (jc *JobController) ReconcileJobs( } } - jc.Recorder.Event(runtimeObject, corev1.EventTypeNormal, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - - commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, corev1.ConditionTrue, commonutil.NewReason(jobKind, commonutil.JobFailedReason), failureMessage) - return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus) } else { // General cases which need to reconcile diff --git a/pkg/controller.v1/common/job_test.go b/pkg/controller.v1/common/job_test.go index ca948b788b..f52a2f7fff 100644 --- a/pkg/controller.v1/common/job_test.go +++ b/pkg/controller.v1/common/job_test.go @@ -97,6 +97,12 @@ func TestDeletePodsAndServices(T *testing.T) { wantPods: &corev1.PodList{}, wantService: &corev1.ServiceList{}, }, + "Unfinished job with cleanPodPolicy None deletes pods (pre-fix backoffLimit bug)": { + cleanPodPolicy: apiv1.CleanPodPolicyNone, + jobCondition: "", + wantPods: &corev1.PodList{}, + wantService: &corev1.ServiceList{}, + }, } for name, tc := range cases { T.Run(name, func(t *testing.T) { @@ -261,6 +267,97 @@ func TestManagedByExternalController(T *testing.T) { } } +// TestBackoffLimitExceededRespectsCleanPodPolicy verifies that cleanPodPolicy is +// respected when a job exceeds its backoffLimit. See https://github.com/kubeflow/trainer/issues/3419 +func TestBackoffLimitExceededRespectsCleanPodPolicy(T *testing.T) { + cases := map[string]struct { + cleanPodPolicy apiv1.CleanPodPolicy + setConditionBefore bool + wantPodsPreserved bool + }{ + "cleanPodPolicy None with JobFailed set before delete (fixed)": { + cleanPodPolicy: apiv1.CleanPodPolicyNone, + setConditionBefore: true, + wantPodsPreserved: true, + }, + "cleanPodPolicy None without JobFailed set before delete (buggy)": { + cleanPodPolicy: apiv1.CleanPodPolicyNone, + setConditionBefore: false, + wantPodsPreserved: false, + }, + "cleanPodPolicy All with JobFailed set before delete": { + cleanPodPolicy: apiv1.CleanPodPolicyAll, + setConditionBefore: true, + wantPodsPreserved: false, + }, + "cleanPodPolicy Running with JobFailed set before delete": { + cleanPodPolicy: apiv1.CleanPodPolicyRunning, + setConditionBefore: true, + wantPodsPreserved: false, + }, + } + for name, tc := range cases { + T.Run(name, func(t *testing.T) { + masterPod := newPod("master-0", corev1.PodRunning) + masterPod.Status.ContainerStatuses = []corev1.ContainerStatus{ + {RestartCount: 3}, + } + workerPod := newPod("worker-0", corev1.PodRunning) + pods := []runtime.Object{masterPod, workerPod} + services := []runtime.Object{ + newService("master-0"), + newService("worker-0"), + } + + fakeClient := fake.NewSimpleClientset(append(pods, services...)...) + jobController := JobController{ + PodControl: control.RealPodControl{KubeClient: fakeClient, Recorder: &record.FakeRecorder{}}, + ServiceControl: control.RealServiceControl{KubeClient: fakeClient, Recorder: &record.FakeRecorder{}}, + } + + now := metav1.Now() + jobStatus := apiv1.JobStatus{ + CompletionTime: &now, + } + runPolicy := &apiv1.RunPolicy{ + CleanPodPolicy: &tc.cleanPodPolicy, + } + + if tc.setConditionBefore { + jobStatus.Conditions = append(jobStatus.Conditions, apiv1.JobCondition{ + Type: apiv1.JobFailed, + Status: corev1.ConditionTrue, + Reason: "BackoffLimitExceeded", + }) + } + + var inPods []*corev1.Pod + for i := range pods { + inPods = append(inPods, pods[i].(*corev1.Pod)) + } + + if err := jobController.DeletePodsAndServices(&testjobv1.TestJob{}, runPolicy, jobStatus, inPods); err != nil { + t.Fatalf("DeletePodsAndServices failed: %v", err) + } + + gotPods, err := fakeClient.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{}) + if err != nil { + t.Fatalf("Failed to list pods: %v", err) + } + + if tc.wantPodsPreserved { + if len(gotPods.Items) != 2 { + t.Errorf("Expected pods to be preserved, but got %d pods", len(gotPods.Items)) + } + } else { + if len(gotPods.Items) != 0 { + t.Errorf("Expected pods to be deleted, but got %d pods", len(gotPods.Items)) + } + } + }) + } +} + func newPod(name string, phase corev1.PodPhase) *corev1.Pod { pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{