Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -23,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
"pkg.package-operator.run/boxcutter"
"pkg.package-operator.run/boxcutter/machinery"
Expand All @@ -32,8 +34,8 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -83,29 +85,6 @@ func (c *ClusterObjectSetReconciler) Reconcile(ctx context.Context, req ctrl.Req
reconciledRev := existingRev.DeepCopy()
res, reconcileErr := c.reconcile(ctx, reconciledRev)

if pd := existingRev.Spec.ProgressDeadlineMinutes; pd > 0 {
cnd := meta.FindStatusCondition(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing)
isStillProgressing := cnd != nil && cnd.Status == metav1.ConditionTrue && cnd.Reason != ocv1.ReasonSucceeded
succeeded := meta.IsStatusConditionTrue(reconciledRev.Status.Conditions, ocv1.ClusterObjectSetTypeSucceeded)
// check if we reached the progress deadline only if the revision is still progressing and has not succeeded yet
if isStillProgressing && !succeeded {
timeout := time.Duration(pd) * time.Minute
if c.Clock.Since(existingRev.CreationTimestamp.Time) > timeout {
// progress deadline reached, reset any errors and stop reconciling this revision
markAsNotProgressing(reconciledRev, ocv1.ReasonProgressDeadlineExceeded, fmt.Sprintf("Revision has not rolled out for %d minute(s).", pd))
reconcileErr = nil
res = ctrl.Result{}
} else if reconcileErr == nil {
// We want to requeue so far in the future that the next reconciliation
// can detect if the revision did not progress within the given timeout.
// Thus, we plan the next reconcile slightly after (+2secs) the timeout is passed.
drift := 2 * time.Second
requeueAfter := existingRev.CreationTimestamp.Time.Add(timeout).Add(drift).Sub(c.Clock.Now()).Round(time.Second)
l.Info(fmt.Sprintf("ProgressDeadline not exceeded, requeue after ~%v to check again.", requeueAfter))
res = ctrl.Result{RequeueAfter: requeueAfter}
}
}
}
// Do checks before any Update()s, as Update() may modify the resource structure!
updateStatus := !equality.Semantic.DeepEqual(existingRev.Status, reconciledRev.Status)

Expand Down Expand Up @@ -146,6 +125,10 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
return c.delete(ctx, cos)
}

remaining, hasDeadline := durationUntilDeadline(c.Clock, cos)
Comment thread
joelanford marked this conversation as resolved.
isDeadlineExceeded := hasDeadline && remaining <= 0
Comment thread
joelanford marked this conversation as resolved.

// Blocked takes precedence over ProgressDeadlineExceeded: it is more actionable for the user.
if err := c.verifyReferencedSecretsImmutable(ctx, cos); err != nil {
l.Error(err, "referenced Secret verification failed, blocking reconciliation")
markAsNotProgressing(cos, ocv1.ClusterObjectSetReasonBlocked, err.Error())
Comment thread
joelanford marked this conversation as resolved.
Expand All @@ -154,7 +137,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl

phases, currentPhases, opts, err := c.buildBoxcutterPhases(ctx, cos)
if err != nil {
setRetryingConditions(cos, err.Error())
setRetryingConditions(l, cos, err.Error(), isDeadlineExceeded)
return ctrl.Result{}, fmt.Errorf("converting to boxcutter revision: %v", err)
}

Expand All @@ -168,7 +151,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl

revisionEngine, err := c.RevisionEngineFactory.CreateRevisionEngine(ctx, cos)
if err != nil {
setRetryingConditions(cos, err.Error())
setRetryingConditions(l, cos, err.Error(), isDeadlineExceeded)
return ctrl.Result{}, fmt.Errorf("failed to create revision engine: %v", err)
}

Expand All @@ -194,7 +177,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl

if err := c.establishWatch(ctx, cos, revision); err != nil {
werr := fmt.Errorf("establish watch: %v", err)
setRetryingConditions(cos, werr.Error())
setRetryingConditions(l, cos, werr.Error(), isDeadlineExceeded)
return ctrl.Result{}, werr
}

Expand All @@ -204,22 +187,22 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
// Log detailed reconcile reports only in debug mode (V(1)) to reduce verbosity.
l.V(1).Info("reconcile report", "report", rres.String())
}
setRetryingConditions(cos, err.Error())
setRetryingConditions(l, cos, err.Error(), isDeadlineExceeded)
return ctrl.Result{}, fmt.Errorf("revision reconcile: %v", err)
}

// Retry failing preflight checks with a flat 10s retry.
// TODO: report status, backoff?
if verr := rres.GetValidationError(); verr != nil {
l.Error(fmt.Errorf("%w", verr), "preflight validation failed, retrying after 10s")
setRetryingConditions(cos, fmt.Sprintf("revision validation error: %s", verr))
setRetryingConditions(l, cos, fmt.Sprintf("revision validation error: %s", verr), isDeadlineExceeded)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

for i, pres := range rres.GetPhases() {
if verr := pres.GetValidationError(); verr != nil {
l.Error(fmt.Errorf("%w", verr), "phase preflight validation failed, retrying after 10s", "phase", i)
setRetryingConditions(cos, fmt.Sprintf("phase %d validation error: %s", i, verr))
setRetryingConditions(l, cos, fmt.Sprintf("phase %d validation error: %s", i, verr), isDeadlineExceeded)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

Expand All @@ -232,14 +215,14 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl

if len(collidingObjs) > 0 {
l.Error(fmt.Errorf("object collision detected"), "object collision, retrying after 10s", "phase", i, "collisions", collidingObjs)
setRetryingConditions(cos, fmt.Sprintf("revision object collisions in phase %d\n%s", i, strings.Join(collidingObjs, "\n\n")))
setRetryingConditions(l, cos, fmt.Sprintf("revision object collisions in phase %d\n%s", i, strings.Join(collidingObjs, "\n\n")), isDeadlineExceeded)
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}
}

revVersion := cos.GetAnnotations()[labels.BundleVersionKey]
if rres.InTransition() {
markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
markAsProgressing(l, cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion), isDeadlineExceeded)
}

//nolint:nestif
Expand All @@ -259,7 +242,7 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
}
}

markAsProgressing(cos, ocv1.ReasonSucceeded, fmt.Sprintf("Revision %s has rolled out.", revVersion))
markAsProgressing(l, cos, ocv1.ReasonSucceeded, fmt.Sprintf("Revision %s has rolled out.", revVersion), isDeadlineExceeded)
markAsAvailable(cos, ocv1.ClusterObjectSetReasonProbesSucceeded, "Objects are available and pass all probes.")

// We'll probably only want to remove this once we are done updating the ClusterExtension conditions
Expand Down Expand Up @@ -304,8 +287,9 @@ func (c *ClusterObjectSetReconciler) reconcile(ctx context.Context, cos *ocv1.Cl
} else {
markAsUnavailable(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
}
if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing) == nil {
markAsProgressing(cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion))
markAsProgressing(l, cos, ocv1.ReasonRollingOut, fmt.Sprintf("Revision %s is rolling out.", revVersion), isDeadlineExceeded)
if hasDeadline && !isDeadlineExceeded {
return ctrl.Result{RequeueAfter: remaining}, nil
}
}

Expand All @@ -324,14 +308,15 @@ func (c *ClusterObjectSetReconciler) delete(ctx context.Context, cos *ocv1.Clust
}

func (c *ClusterObjectSetReconciler) archive(ctx context.Context, revisionEngine RevisionEngine, cos *ocv1.ClusterObjectSet, revision boxcutter.RevisionBuilder) (ctrl.Result, error) {
l := log.FromContext(ctx)
tdres, err := revisionEngine.Teardown(ctx, revision)
if err != nil {
err = fmt.Errorf("error archiving revision: %v", err)
setRetryingConditions(cos, err.Error())
setRetryingConditions(l, cos, err.Error(), false)
return ctrl.Result{}, err
}
if tdres != nil && !tdres.IsComplete() {
setRetryingConditions(cos, "removing revision resources that are not owned by another revision")
setRetryingConditions(l, cos, "removing revision resources that are not owned by another revision", false)
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
// Ensure conditions are set before removing the finalizer when archiving
Expand All @@ -349,29 +334,19 @@ type Sourcoser interface {
}

func (c *ClusterObjectSetReconciler) SetupWithManager(mgr ctrl.Manager) error {
skipProgressDeadlineExceededPredicate := predicate.Funcs{
UpdateFunc: func(e event.UpdateEvent) bool {
rev, ok := e.ObjectNew.(*ocv1.ClusterObjectSet)
if !ok {
return true
}
// allow deletions to happen
if !rev.DeletionTimestamp.IsZero() {
return true
}
if cnd := meta.FindStatusCondition(rev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing); cnd != nil && cnd.Status == metav1.ConditionFalse && cnd.Reason == ocv1.ReasonProgressDeadlineExceeded {
return false
}
return true
},
}
c.Clock = clock.RealClock{}
Comment thread
joelanford marked this conversation as resolved.
return ctrl.NewControllerManagedBy(mgr).
WithOptions(controller.Options{
RateLimiter: newDeadlineAwareRateLimiter(
workqueue.DefaultTypedControllerRateLimiter[ctrl.Request](),
mgr.GetClient(),
c.Clock,
),
}).
For(
&ocv1.ClusterObjectSet{},
builder.WithPredicates(
predicate.ResourceVersionChangedPredicate{},
skipProgressDeadlineExceededPredicate,
),
).
WatchesRawSource(
Expand Down Expand Up @@ -659,14 +634,32 @@ func buildProgressionProbes(progressionProbes []ocv1.ProgressionProbe) (probing.
return userProbes, nil
}

// setRetryingConditions records a retry on cos. It passes
// ocv1.ClusterObjectSetReasonRetrying and message to markAsProgressing and,
// only when cos already carries an Available condition, calls
// markAsAvailableUnknown with ocv1.ClusterObjectSetReasonReconciling and the
// same message.
func setRetryingConditions(cos *ocv1.ClusterObjectSet, message string) {
markAsProgressing(cos, ocv1.ClusterObjectSetReasonRetrying, message)
func setRetryingConditions(l logr.Logger, cos *ocv1.ClusterObjectSet, message string, isDeadlineExceeded bool) {
markAsProgressing(l, cos, ocv1.ClusterObjectSetReasonRetrying, message, isDeadlineExceeded)
// Availability is only updated if an Available condition already exists.
if meta.FindStatusCondition(cos.Status.Conditions, ocv1.ClusterObjectSetTypeAvailable) != nil {
markAsAvailableUnknown(cos, ocv1.ClusterObjectSetReasonReconciling, message)
}
}

func markAsProgressing(cos *ocv1.ClusterObjectSet, reason, message string) {
// nonTerminalProgressingReasons is the set of Progressing reasons that
// markAsProgressing recognizes as non-terminal. A reason other than
// ocv1.ReasonSucceeded that is missing from this set is logged by
// markAsProgressing as unregistered, and is still subject to the same
// deadline check as the registered ones.
var nonTerminalProgressingReasons = map[string]struct{}{
ocv1.ReasonRollingOut: {},
ocv1.ClusterObjectSetReasonRetrying: {},
}

func markAsProgressing(l logr.Logger, cos *ocv1.ClusterObjectSet, reason, message string, isDeadlineExceeded bool) {
switch reason {
case ocv1.ReasonSucceeded:
// Terminal — always apply.
Comment thread
joelanford marked this conversation as resolved.
default:
if _, known := nonTerminalProgressingReasons[reason]; !known {
l.Error(fmt.Errorf("unregistered progressing reason: %q", reason), "treating as non-terminal for deadline enforcement")
}
if isDeadlineExceeded {
markAsNotProgressing(cos, ocv1.ReasonProgressDeadlineExceeded,
fmt.Sprintf("Revision has not rolled out for %d minute(s). Last status: %s", cos.Spec.ProgressDeadlineMinutes, message))
Comment thread
joelanford marked this conversation as resolved.
return
}
}
meta.SetStatusCondition(&cos.Status.Conditions, metav1.Condition{
Type: ocv1.ClusterObjectSetTypeProgressing,
Status: metav1.ConditionTrue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ func Test_ClusterObjectSetReconciler_Reconcile_ProgressDeadline(t *testing.T) {
revisionResult: &mockRevisionResult{
inTransition: true,
},
reconcileResult: ctrl.Result{RequeueAfter: 62 * time.Second},
reconcileResult: ctrl.Result{RequeueAfter: 60 * time.Second},
validate: func(t *testing.T, c client.Client) {
rev := &ocv1.ClusterObjectSet{}
err := c.Get(t.Context(), client.ObjectKey{
Expand All @@ -1000,6 +1000,39 @@ func Test_ClusterObjectSetReconciler_Reconcile_ProgressDeadline(t *testing.T) {
require.Equal(t, ocv1.ReasonRollingOut, cnd.Reason)
},
},
{
name: "recovery from ProgressDeadlineExceeded to Succeeded when revision completes",
Comment thread
joelanford marked this conversation as resolved.
existingObjs: func() []client.Object {
ext := newTestClusterExtension()
rev1 := newTestClusterObjectSet(t, clusterObjectSetName, ext, testScheme)
rev1.Spec.ProgressDeadlineMinutes = 1
rev1.CreationTimestamp = metav1.NewTime(time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC))
meta.SetStatusCondition(&rev1.Status.Conditions, metav1.Condition{
Type: ocv1.ClusterObjectSetTypeProgressing,
Status: metav1.ConditionFalse,
Reason: ocv1.ReasonProgressDeadlineExceeded,
Message: "Revision has not rolled out for 1 minute(s). Last status: Revision 1.0.0 is rolling out.",
ObservedGeneration: rev1.Generation,
})
return []client.Object{rev1, ext}
},
clock: clocktesting.NewFakeClock(time.Date(2022, 1, 1, 0, 5, 0, 0, time.UTC)),
revisionResult: &mockRevisionResult{
isComplete: true,
},
validate: func(t *testing.T, c client.Client) {
rev := &ocv1.ClusterObjectSet{}
err := c.Get(t.Context(), client.ObjectKey{
Name: clusterObjectSetName,
}, rev)
require.NoError(t, err)
cnd := meta.FindStatusCondition(rev.Status.Conditions, ocv1.ClusterObjectSetTypeProgressing)
require.NotNil(t, cnd)
require.Equal(t, metav1.ConditionTrue, cnd.Status)
require.Equal(t, ocv1.ReasonSucceeded, cnd.Reason)
require.Equal(t, "Revision 1.0.0 has rolled out.", cnd.Message)
},
},
{
name: "no progression deadline checks on revision recovery",
existingObjs: func() []client.Object {
Expand Down
Loading
Loading