55 changes: 34 additions & 21 deletions cmd/memberagent/main.go
@@ -77,21 +77,19 @@ var (
"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "The namespace in which the leader election resource will be created.")
// TODO(weiweng): only keep enableV1Alpha1APIs for backward compatibility with helm charts. Remove soon.
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.")
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
region = flag.String("region", "", "The region where the member cluster resides.")
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.")
watchWorkWithPriorityQueue = flag.Bool("enable-watch-work-with-priority-queue", false, "If set, the apply_work controller will watch/reconcile work objects that are created new or have recent updates")
watchWorkReconcileAgeMinutes = flag.Int("watch-work-reconcile-age", 60, "maximum age (in minutes) of work objects for apply_work controller to watch/reconcile")
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
enableV1Alpha1APIs = flag.Bool("enable-v1alpha1-apis", false, "If set, the agents will watch for the v1alpha1 APIs. This is deprecated and will be removed soon.")
enableV1Beta1APIs = flag.Bool("enable-v1beta1-apis", true, "If set, the agents will watch for the v1beta1 APIs.")
propertyProvider = flag.String("property-provider", "none", "The property provider to use for the agent.")
region = flag.String("region", "", "The region where the member cluster resides.")
cloudConfigFile = flag.String("cloud-config", "/etc/kubernetes/provider/config.json", "The path to the cloud cloudconfig file.")
deletionWaitTime = flag.Int("deletion-wait-time", 5, "The time the work-applier will wait for work object to be deleted before updating the applied work owner reference")
enablePprof = flag.Bool("enable-pprof", false, "enable pprof profiling")
pprofPort = flag.Int("pprof-port", 6065, "port for pprof profiling")
hubPprofPort = flag.Int("hub-pprof-port", 6066, "port for hub pprof profiling")
hubQPS = flag.Float64("hub-api-qps", 50, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
hubBurst = flag.Int("hub-api-burst", 500, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
memberQPS = flag.Float64("member-api-qps", 250, "QPS to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")
memberBurst = flag.Int("member-api-burst", 1000, "Burst to use while talking with fleet-apiserver. Doesn't cover events and node heartbeat apis which rate limiting is controlled by a different set of flags.")

// Work applier requeue rate limiter settings.
workApplierRequeueRateLimiterAttemptsWithFixedDelay = flag.Int("work-applier-requeue-rate-limiter-attempts-with-fixed-delay", 1, "If set, the work applier will requeue work objects with a fixed delay for the specified number of attempts before switching to exponential backoff.")
@@ -102,6 +100,12 @@ var (
workApplierRequeueRateLimiterExponentialBaseForFastBackoff = flag.Float64("work-applier-requeue-rate-limiter-exponential-base-for-fast-backoff", 1.5, "If set, the work applier will start to back off fast at this factor after it completes the slow backoff stage, until it reaches the fast backoff delay cap. Its value should be larger than the base value for the slow backoff stage.")
workApplierRequeueRateLimiterMaxFastBackoffDelaySeconds = flag.Float64("work-applier-requeue-rate-limiter-max-fast-backoff-delay-seconds", 900, "If set, the work applier will not back off longer than this value in seconds when it is in the fast backoff stage.")
workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs = flag.Bool("work-applier-requeue-rate-limiter-skip-to-fast-backoff-for-available-or-diff-reported-work-objs", true, "If set, the rate limiter will skip the slow backoff stage and start fast backoff immediately for work objects that are available or have diff reported.")

// Work applier priority queue settings.
enableWorkApplierPriorityQueue = flag.Bool("enable-work-applier-priority-queue", false, "If set, the work applier will use a priority queue to process work objects.")
workApplierPriorityLinearEquationCoeffA = flag.Int("work-applier-priority-linear-equation-coeff-a", -3, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient A in the equation.")
workApplierPriorityLinearEquationCoeffB = flag.Int("work-applier-priority-linear-equation-coeff-b", 100, "The work applier sets the priority for a Work object processing attempt using the linear equation: priority = A * (work object age in minutes) + B. This flag sets the coefficient B in the equation.")

// Azure property provider feature gates.
isAzProviderCostPropertiesEnabled = flag.Bool("use-cost-properties-in-azure-provider", true, "If set, the Azure property provider will expose cost properties in the member cluster.")
isAzProviderAvailableResPropertiesEnabled = flag.Bool("use-available-res-properties-in-azure-provider", true, "If set, the Azure property provider will expose available resources properties in the member cluster.")
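The `workApplierPriorityLinearEquationCoeffA`/`B` flags added above feed a simple linear priority formula. Below is a minimal sketch of how such a priority could be evaluated; the helper name and the use of `time.Since` are illustrative assumptions, not code from this PR.

```go
package main

import "time"

// computeWorkPriority illustrates the equation the flag descriptions above
// refer to: priority = A * (work object age in minutes) + B.
// With the defaults (A = -3, B = 100), a freshly created Work object is
// queued at priority 100 and loses 3 points per minute of age.
// Hypothetical sketch only; the actual computation lives in the work applier.
func computeWorkPriority(createdAt time.Time, coeffA, coeffB int) int {
	ageMinutes := int(time.Since(createdAt).Minutes())
	return coeffA*ageMinutes + coeffB
}
```

With those defaults the priority goes non-positive after roughly half an hour, which is consistent with the validation added to `main()` below: A must be negative and B positive so that newer Work objects are always favored.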
@@ -133,6 +137,13 @@ func main() {
klog.ErrorS(errors.New("either enable-v1alpha1-apis or enable-v1beta1-apis is required"), "Invalid APIs flags")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
// TO-DO (chenyu1): refactor the validation logic.
if workApplierPriorityLinearEquationCoeffA == nil || *workApplierPriorityLinearEquationCoeffA >= 0 {
klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffA is set incorrectly; must use a value less than 0"), "InvalidFlag", "workApplierPriorityLinearEquationCoeffA")
}
if workApplierPriorityLinearEquationCoeffB == nil || *workApplierPriorityLinearEquationCoeffB <= 0 {
klog.ErrorS(errors.New("parameter workApplierPriorityLinearEquationCoeffB is set incorrectly; must use a value greater than 0"), "InvalidFlag", "workApplierPriorityLinearEquationCoeffB")
}

hubURL := os.Getenv("HUB_SERVER_URL")

@@ -373,7 +384,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
// create the work controller, so we can pass it to the internal member cluster reconciler
// Set up the work applier. Note that it is referenced by the InternalMemberCluster controller.

// Set up the requeue rate limiter for the work applier.
//
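The collapsed code between this comment and the next hunk constructs the requeue rate limiter from the `work-applier-requeue-rate-limiter-*` flags defined earlier. Below is a rough sketch of the three-stage behavior those flag descriptions outline (fixed delay, slow exponential backoff, fast exponential backoff); the function is an illustrative assumption, not the limiter implemented in the `workapplier` package.

```go
package main

import "time"

// requeueDelay sketches the staged backoff described by the
// work-applier-requeue-rate-limiter-* flags: a fixed delay for the first few
// attempts, then slow exponential growth up to one cap, then fast exponential
// growth up to a larger cap. Names and exact stage transitions are assumptions.
func requeueDelay(attempt, fixedAttempts int, fixedDelay, slowCap, fastCap time.Duration, slowBase, fastBase float64) time.Duration {
	delay := fixedDelay
	for i := fixedAttempts; i < attempt; i++ {
		switch {
		case delay < slowCap:
			// Slow backoff stage.
			delay = time.Duration(float64(delay) * slowBase)
		case delay < fastCap:
			// Fast backoff stage.
			delay = time.Duration(float64(delay) * fastBase)
		default:
			return fastCap
		}
	}
	if delay > fastCap {
		return fastCap
	}
	return delay
}
```

Per the skip-to-fast-backoff flag, Work objects that are available or have diffs reported would enter the fast stage directly rather than walking through the slow stage first.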
@@ -413,7 +424,8 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
*workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs,
)

workController := workapplier.NewReconciler(
workApplier := workapplier.NewReconciler(
"work-applier",
hubMgr.GetClient(),
targetNS,
spokeDynamicClient,
@@ -426,12 +438,13 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
// Use the default worker count (4) for parallelized manifest processing.
parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
time.Minute*time.Duration(*deletionWaitTime),
*watchWorkWithPriorityQueue,
*watchWorkReconcileAgeMinutes,
requeueRateLimiter,
*enableWorkApplierPriorityQueue,
workApplierPriorityLinearEquationCoeffA,
workApplierPriorityLinearEquationCoeffB,
)

if err = workController.SetupWithManager(hubMgr); err != nil {
if err = workApplier.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Failed to create v1beta1 controller", "controller", "work")
return err
}
@@ -459,7 +472,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
ctx,
hubMgr.GetClient(),
memberMgr.GetConfig(), memberMgr.GetClient(),
workController,
workApplier,
pp)
if err != nil {
klog.ErrorS(err, "Failed to create InternalMemberCluster v1beta1 reconciler")
@@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
workApplier1 = workapplier.NewReconciler("work-applier-1", hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil)

propertyProvider1 = &manuallyUpdatedProvider{}
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
@@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
workApplier2 = workapplier.NewReconciler("work-applier-2", hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, nil, nil)

member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
Expect(err).NotTo(HaveOccurred())
@@ -426,6 +426,14 @@ func (r *Reconciler) syncRoleBinding(ctx context.Context, mc *clusterv1beta1.Mem
Name: roleName,
},
}
// For User and Group kind, the APIGroup is defaulted to rbac.authorization.k8s.io if not set.
// Reference: https://pkg.go.dev/k8s.io/api/rbac/v1#Subject
for i := range expectedRoleBinding.Subjects {
subj := &expectedRoleBinding.Subjects[i]
if subj.APIGroup == "" && (subj.Kind == rbacv1.GroupKind || subj.Kind == rbacv1.UserKind) {
subj.APIGroup = rbacv1.GroupName
}
}

// Creates role binding if not found.
var currentRoleBinding rbacv1.RoleBinding
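As the comment in this hunk notes, the API server defaults `Subject.APIGroup` to `rbac.authorization.k8s.io` for User and Group subjects, so an identity that omits the field could otherwise never compare equal to the RoleBinding read back from the cluster, and every sync could see a spurious diff. A standalone illustration of the mismatch the normalization avoids (not part of the PR):

```go
package main

import (
	"fmt"

	rbacv1 "k8s.io/api/rbac/v1"
)

func main() {
	// Subject as written in the MemberCluster spec, with APIGroup omitted.
	declared := rbacv1.Subject{Kind: rbacv1.UserKind, Name: "MemberClusterIdentity"}
	// Subject as it typically comes back from the API server, with the default applied.
	observed := rbacv1.Subject{Kind: rbacv1.UserKind, Name: "MemberClusterIdentity", APIGroup: rbacv1.GroupName}

	fmt.Println(declared == observed) // false: would look like a change on every sync

	// The normalization in syncRoleBinding applies the same default up front.
	declared.APIGroup = rbacv1.GroupName
	fmt.Println(declared == observed) // true: no spurious update
}
```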
@@ -355,6 +355,11 @@ func TestSyncRole(t *testing.T) {

func TestSyncRoleBinding(t *testing.T) {
identity := rbacv1.Subject{
APIGroup: "rbac.authorization.k8s.io",
Kind: "User",
Name: "MemberClusterIdentity",
}
identityWithoutGroup := rbacv1.Subject{
Kind: "User",
Name: "MemberClusterIdentity",
}
@@ -428,6 +433,40 @@ func TestSyncRoleBinding(t *testing.T) {
roleName: "fleet-role-mc1",
wantedError: "",
},
"identity without APIGroup should not trigger roleBinding reconcile": {
r: &Reconciler{
Client: &test.MockClient{
MockGet: func(ctx context.Context, key client.ObjectKey, obj client.Object) error {
roleRef := rbacv1.RoleRef{
APIGroup: rbacv1.GroupName,
Kind: "Role",
Name: "fleet-role-mc1",
}
o := obj.(*rbacv1.RoleBinding)
*o = rbacv1.RoleBinding{
TypeMeta: metav1.TypeMeta{
Kind: "RoleBinding",
APIVersion: rbacv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: "fleet-rolebinding-mc1",
Namespace: namespace1,
},
Subjects: []rbacv1.Subject{identity}, // Returned roleBinding has APIGroup set.
RoleRef: roleRef,
}
return nil
},
},
},
memberCluster: &clusterv1beta1.MemberCluster{
ObjectMeta: metav1.ObjectMeta{Name: "mc1"},
Spec: clusterv1beta1.MemberClusterSpec{Identity: identityWithoutGroup},
},
namespaceName: namespace1,
roleName: "fleet-role-mc1",
wantedError: "",
},
"role binding but with diff": {
r: &Reconciler{
Client: &test.MockClient{
20 changes: 10 additions & 10 deletions pkg/controllers/updaterun/controller.go
@@ -21,7 +21,6 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -51,9 +50,10 @@ import (
var (
// errStagedUpdatedAborted is the error when the updateRun is aborted.
errStagedUpdatedAborted = fmt.Errorf("cannot continue the updateRun")
// errInitializedFailed is the error when the updateRun fails to initialize.
// It is a wrapped error of errStagedUpdatedAborted, because some initialization functions are reused in the validation step.
errInitializedFailed = fmt.Errorf("%w: failed to initialize the updateRun", errStagedUpdatedAborted)
// errValidationFailed is the error when the updateRun fails validation.
// It is a wrapped error of errStagedUpdatedAborted, because we perform validation during initialization
// and subsequent reconciliations where initialization is skipped.
errValidationFailed = fmt.Errorf("%w: failed to validate the updateRun", errStagedUpdatedAborted)
)

// Reconciler reconciles an updateRun object.
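A standalone sketch, reusing the two error names from this hunk, of why matching on `errStagedUpdatedAborted` alone (as the `Reconcile` change below does) still catches validation failures: `fmt.Errorf` with the `%w` verb makes the validation error unwrap to the aborted error.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errStagedUpdatedAborted = fmt.Errorf("cannot continue the updateRun")
	// %w wraps the aborted error, so errors.Is can see through the wrapping.
	errValidationFailed = fmt.Errorf("%w: failed to validate the updateRun", errStagedUpdatedAborted)
)

func main() {
	fmt.Println(errors.Is(errValidationFailed, errStagedUpdatedAborted)) // true

	// Matching survives further wrapping as the error propagates.
	wrapped := fmt.Errorf("reconcile attempt failed: %w", errValidationFailed)
	fmt.Println(errors.Is(wrapped, errStagedUpdatedAborted)) // true
}
```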
@@ -125,8 +125,8 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
var initErr error
if toBeUpdatedBindings, toBeDeletedBindings, initErr = r.initialize(ctx, updateRun); initErr != nil {
klog.ErrorS(initErr, "Failed to initialize the updateRun", "updateRun", runObjRef)
// errInitializedFailed cannot be retried.
if errors.Is(initErr, errInitializedFailed) {
// errStagedUpdatedAborted cannot be retried.
if errors.Is(initErr, errStagedUpdatedAborted) {
return runtime.Result{}, r.recordInitializationFailed(ctx, updateRun, initErr.Error())
}
return runtime.Result{}, initErr
@@ -488,26 +488,26 @@ func handleApprovalRequestDelete(obj client.Object, q workqueue.TypedRateLimitin
// emitUpdateRunStatusMetric emits the update run status metric based on status conditions in the updateRun.
func emitUpdateRunStatusMetric(updateRun placementv1beta1.UpdateRunObj) {
generation := updateRun.GetGeneration()
genStr := strconv.FormatInt(generation, 10)
state := updateRun.GetUpdateRunSpec().State

updateRunStatus := updateRun.GetUpdateRunStatus()
succeedCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded))
if succeedCond != nil && succeedCond.ObservedGeneration == generation {
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), genStr,
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), string(state),
string(placementv1beta1.StagedUpdateRunConditionSucceeded), string(succeedCond.Status), succeedCond.Reason).SetToCurrentTime()
return
}

progressingCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionProgressing))
if progressingCond != nil && progressingCond.ObservedGeneration == generation {
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), genStr,
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), string(state),
string(placementv1beta1.StagedUpdateRunConditionProgressing), string(progressingCond.Status), progressingCond.Reason).SetToCurrentTime()
return
}

initializedCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
if initializedCond != nil && initializedCond.ObservedGeneration == generation {
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), genStr,
hubmetrics.FleetUpdateRunStatusLastTimestampSeconds.WithLabelValues(updateRun.GetNamespace(), updateRun.GetName(), string(state),
string(placementv1beta1.StagedUpdateRunConditionInitialized), string(initializedCond.Status), initializedCond.Reason).SetToCurrentTime()
return
}
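For context, below is a declaration consistent with the `WithLabelValues` calls in this hunk; the metric name, help text, and label names are assumptions (the real definition lives in the `hubmetrics` package). The change swaps the third label value from the object generation to the UpdateRun spec `State`, which keeps that label's value set small instead of growing with every generation.

```go
package hubmetrics

import "github.com/prometheus/client_golang/prometheus"

// FleetUpdateRunStatusLastTimestampSeconds is sketched here only to show the
// label positions used by the WithLabelValues calls above; the metric name,
// help text, and label names are assumptions rather than copies from the repo.
var FleetUpdateRunStatusLastTimestampSeconds = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "fleet_update_run_status_last_timestamp_seconds",
		Help: "Timestamp of the most recent status condition emitted for an update run.",
	},
	[]string{"namespace", "name", "state", "condition", "status", "reason"},
)
```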