feat: Add success policy #1506
Changes from all commits
```diff
@@ -0,0 +1,9 @@
+package v1
+
+// SuccessPolicy is the success policy.
+type SuccessPolicy string
+
+const (
+	SuccessPolicyDefault    SuccessPolicy = ""
+	SuccessPolicyAllWorkers SuccessPolicy = "AllWorkers"
+)
```
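Later in this PR the controller reads `pytorchjob.Spec.SuccessPolicy`, which is a `*SuccessPolicy`, so opting a job into the all-workers behavior means setting a pointer on the spec. The following is only a rough usage sketch, not part of this PR's diff; it assumes the API package is imported as `pytorchv1`, as in the controller file below, and that the spec type is `PyTorchJobSpec`.

```go
// Opt a job into the "all workers must succeed" policy. If the field is left
// nil, the controller falls back to the default policy (worker 0 decides success).
allWorkers := pytorchv1.SuccessPolicyAllWorkers
job := &pytorchv1.PyTorchJob{
	Spec: pytorchv1.PyTorchJobSpec{
		SuccessPolicy: &allWorkers,
		// ...replica specs and run policy omitted...
	},
}
_ = job
```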
Some generated files are not rendered by default.
```diff
@@ -17,6 +17,7 @@ package pytorch
 import (
 	"context"
 	"fmt"
+	"strings"

 	"github.com/go-logr/logr"
 	commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
```
```diff
@@ -326,6 +327,8 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
 		return fmt.Errorf("%+v is not a type of PyTorchJob", job)
 	}

+	logger := commonutil.LoggerForJob(pytorchjob)
+
 	for rtype, spec := range replicas {
 		status := jobStatus.ReplicaStatuses[rtype]
 		if status.LabelSelector == nil {
```
```diff
@@ -338,7 +341,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
 		running := status.Active
 		failed := status.Failed

-		logrus.Infof("PyTorchJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
+		logger.Infof("PyTorchJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
 			pytorchjob.Name, rtype, expected, running, succeeded, failed)

 		if ContainsMasterSpec(replicas) {
```
```diff
@@ -347,32 +350,40 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
 					msg := fmt.Sprintf("PyTorchJob %s is running.", pytorchjob.Name)
 					err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobRunning, commonutil.JobRunningReason, msg)
 					if err != nil {
-						commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err)
+						logger.Infof("Append job condition error: %v", err)
 						return err
 					}
 				}
 				// when master is succeed, the job is finished.
 				if expected == 0 {
 					msg := fmt.Sprintf("PyTorchJob %s is successfully completed.", pytorchjob.Name)
-					logrus.Info(msg)
+					logger.Info(msg)
 					r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg)
 					if jobStatus.CompletionTime == nil {
 						now := metav1.Now()
 						jobStatus.CompletionTime = &now
 					}
 					err := commonutil.UpdateJobConditions(jobStatus, commonv1.JobSucceeded, commonutil.JobSucceededReason, msg)
 					if err != nil {
-						commonutil.LoggerForJob(pytorchjob).Infof("Append job condition error: %v", err)
+						logger.Infof("Append job condition error: %v", err)
 						return err
 					}
 					trainingoperatorcommon.SuccessfulJobsCounterInc(pytorchjob.Namespace, pytorchv1.FrameworkName)
 					return nil
 				}
 			}
 		} else {
 			if rtype == pytorchv1.PyTorchReplicaTypeWorker {
-				// TODO(gaocegege): Support SuccessPolicy
-				if expected == 0 {
+				worker0Completed, err := r.IsWorker0Completed(pytorchjob, replicas)
+				if err != nil {
+					logger.Warnf("check if worker 0 completed error %v", err)
+					return err
+				}
+				// Leave a succeeded condition for the following two cases:
+				// 1. If default success policy is used and worker 0 has completed.
+				// 2. If `SuccessPolicyAllWorkers` success policy is used and all workers are succeeded.
+				if expected == 0 || (worker0Completed && *pytorchjob.Spec.SuccessPolicy != pytorchv1.SuccessPolicyAllWorkers) {
```
Member: L387 might take a bit of time to understand. Maybe that's because the two conditions are not arranged in the order the comment lists them.

Author: The comments and conditions are copied from the TF controller. I will refine them soon.
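For illustration, the condition could be split so it reads in the same order as the numbered comment. The sketch below is not part of this PR; the local variable names are hypothetical, and it assumes the policy has already been read nil-safely from `pytorchjob.Spec.SuccessPolicy`.

```go
// Sketch only: same logic as the diff above, rearranged to mirror the comment.
// Case 1: default policy is in effect and worker 0 has completed.
defaultPolicyDone := successPolicy != pytorchv1.SuccessPolicyAllWorkers && worker0Completed
// Case 2: all worker replicas have succeeded (expected == 0), which completes
// the job under any policy, including SuccessPolicyAllWorkers.
allWorkersDone := expected == 0
if defaultPolicyDone || allWorkersDone {
	// append the succeeded condition, as in the diff
}
```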
```diff
 					msg := fmt.Sprintf("TFJob %s/%s successfully completed.",
 						pytorchjob.Namespace, pytorchjob.Name)
 					r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg)
```
```diff
@@ -430,6 +441,53 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
 	return nil
 }

+// IsWorker0Completed returns true if pod of worker0 succeeded and exited with 0
+func (p *PyTorchJobReconciler) IsWorker0Completed(job *pytorchv1.PyTorchJob,
+	replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) (bool, error) {
+	worker0Completed := false
+	_, ok := replicas[pytorchv1.PyTorchReplicaTypeWorker]
+	if !ok {
+		return true, nil
+	}
+	podSlices, err := p.getPodSlices(job,
```
Member: I understand the slice approach definitely works, but what is the point of creating the pod slices here?

Author: It is to keep consistency with the TF controller; the code is copied from it.
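For context, a version of this check that skips the slice machinery could look roughly like the sketch below. It only illustrates the reviewer's point and is not part of this PR; it reuses helpers that appear elsewhere in this diff (`GetPodsForJob`, `util.GetContainerExitCode`, `pytorchv1.DefaultContainerName`) and assumes the operator's usual `<job-name>-worker-<index>` pod naming.

```go
// isWorker0Succeeded is a sketch (not part of this PR) that checks worker 0
// directly instead of building pod slices for every worker replica.
func (p *PyTorchJobReconciler) isWorker0Succeeded(job *pytorchv1.PyTorchJob) (bool, error) {
	pods, err := p.GetPodsForJob(job)
	if err != nil {
		return false, err
	}
	worker0Name := job.Name + "-worker-0" // assumed pod naming convention
	for _, pod := range pods {
		if pod.Name != worker0Name {
			continue
		}
		// Worker 0 counts as completed only if its pod succeeded and the
		// training container exited with code 0.
		exitCode := util.GetContainerExitCode(pod, pytorchv1.DefaultContainerName)
		return pod.Status.Phase == corev1.PodSucceeded && exitCode == 0, nil
	}
	return false, nil
}
```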
```diff
+		replicas[pytorchv1.PyTorchReplicaTypeWorker].Replicas)
+	if err != nil {
+		return false, err
+	}
+	for index, podSlice := range podSlices {
+		if len(podSlice) == 1 {
+			pod := podSlice[0]
+			exitCode := util.GetContainerExitCode(pod, pytorchv1.DefaultContainerName)
+			if index == 0 && exitCode == 0 && pod.Status.Phase == corev1.PodSucceeded {
+				worker0Completed = true
+			}
+		}
+	}
+	return worker0Completed, nil
+}
+
+// getPodSlices returns a slice, which element is the slice of pod.
+// It gives enough information to caller to make decision to up/down scale resources.
+func (p *PyTorchJobReconciler) getPodSlices(
+	job *pytorchv1.PyTorchJob, replicasNum *int32) ([][]*corev1.Pod, error) {
+	logger := commonutil.LoggerForReplica(job, strings.ToLower(string(pytorchv1.PyTorchReplicaTypeWorker)))
+
+	pods, err := p.GetPodsForJob(job)
+	if err != nil {
+		commonutil.LoggerForJob(job).Warnf("getPodsForTFJob error %v", err)
+		return nil, err
+	}
+
+	// Get all pods for the type rt.
+	pods, err = p.JobController.FilterPodsForReplicaType(pods, strings.ToLower(string(pytorchv1.PyTorchReplicaTypeWorker)))
+	if err != nil {
+		return nil, err
+	}
+
+	podSlices := p.GetPodSlices(pods, int(*replicasNum), logger)
+	return podSlices, nil
+}
+
 // ContainsMasterSpec returns true if the tfjob contains master spec.
 func ContainsMasterSpec(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) bool {
 	if _, ok := replicas[pytorchv1.PyTorchReplicaTypeMaster]; ok {
```
Question: Does `name` here refer to the `containerName`?

Reply: Yes.
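For readers following along, the lookup being discussed is keyed by container name. Below is a minimal sketch of such a name-keyed exit-code lookup; it is only an illustration, not necessarily how `util.GetContainerExitCode` is actually implemented, and it assumes the standard `corev1` import (`k8s.io/api/core/v1`).

```go
// getContainerExitCode is a sketch of a name-keyed exit-code lookup.
// It returns -1 when the named container has not terminated.
func getContainerExitCode(pod *corev1.Pod, containerName string) int32 {
	for _, status := range pod.Status.ContainerStatuses {
		if status.Name != containerName {
			continue
		}
		if status.State.Terminated != nil {
			return status.State.Terminated.ExitCode
		}
	}
	return -1
}
```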