diff --git a/internal/controller/postgrescluster_controller_test.go b/internal/controller/postgrescluster_controller_test.go index 9d4954d61..96ac67987 100644 --- a/internal/controller/postgrescluster_controller_test.go +++ b/internal/controller/postgrescluster_controller_test.go @@ -19,7 +19,9 @@ package controller import ( "context" "fmt" + "strings" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" @@ -36,7 +38,7 @@ import ( enterprisev4 "github.com/splunk/splunk-operator/api/v4" "github.com/splunk/splunk-operator/pkg/postgresql/cluster/core" - pgprometheus "github.com/splunk/splunk-operator/pkg/postgresql/shared/adapter/prometheus" + "github.com/splunk/splunk-operator/pkg/postgresql/shared/adapter/prometheus" ) /* @@ -52,6 +54,20 @@ import ( * PC-09 ignores no-op updates */ +func containsEvents(events *[]string, recorder *record.FakeRecorder, eventType string, event string) bool { + for { + select { + case e := <-recorder.Events: + *events = append(*events, e) + if strings.Contains(e, eventType) && strings.Contains(e, event) { + return true + } + default: + return false + } + } +} + var _ = Describe("PostgresCluster Controller", Label("postgres"), func() { const ( @@ -77,6 +93,7 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() { pgClusterClassKey types.NamespacedName reconciler *PostgresClusterReconciler req reconcile.Request + fakeRecorder *record.FakeRecorder ) reconcileNTimes := func(times int) { @@ -119,15 +136,19 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() { Spec: enterprisev4.PostgresClusterSpec{ Class: className, ClusterDeletionPolicy: &[]string{deletePolicy}[0], + ManagedRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + {Name: "app_user_rw", Exists: true}, + }, }, } - + fakeRecorder = record.NewFakeRecorder(100) reconciler = &PostgresClusterReconciler{ Client: k8sClient, Scheme: k8sClient.Scheme(), - Recorder: record.NewFakeRecorder(100), - Metrics: &pgprometheus.NoopRecorder{}, - FleetCollector: pgprometheus.NewFleetCollector(), + Recorder: fakeRecorder, + Metrics: &prometheus.NoopRecorder{}, + FleetCollector: prometheus.NewFleetCollector(), } req = reconcile.Request{NamespacedName: types.NamespacedName{Name: clusterName, Namespace: namespace}} }) @@ -205,12 +226,17 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() { cond := meta.FindStatusCondition(pc.Status.Conditions, "ClusterReady") Expect(cond).NotTo(BeNil()) Expect(cond.Status).To(Equal(metav1.ConditionFalse)) - Expect(cond.Reason).To(Equal("ClusterBuildSucceeded")) + Expect(cond.Reason).To(Equal("CNPGClusterProvisioning")) // Simulate external CNPG controller status progression. 
cnpg := &cnpgv1.Cluster{} Expect(k8sClient.Get(ctx, pgClusterKey, cnpg)).To(Succeed()) cnpg.Status.Phase = cnpgv1.PhaseHealthy + cnpg.Status.ManagedRolesStatus = cnpgv1.ManagedRoles{ + ByStatus: map[cnpgv1.RoleStatus][]string{ + cnpgv1.RoleStatusReconciled: {"app_user", "app_user_rw"}, + }, + } Expect(k8sClient.Status().Update(ctx, cnpg)).To(Succeed()) reconcileNTimes(1) @@ -220,9 +246,47 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() { Expect(cond).NotTo(BeNil()) Expect(cond.Status).To(Equal(metav1.ConditionTrue)) Expect(cond.Reason).To(Equal("CNPGClusterHealthy")) + + secretCond := meta.FindStatusCondition(pc.Status.Conditions, "SecretsReady") + Expect(secretCond).NotTo(BeNil()) + Expect(secretCond.Status).To(Equal(metav1.ConditionTrue)) + Expect(secretCond.Reason).To(Equal("SuperUserSecretReady")) + + configMapCond := meta.FindStatusCondition(pc.Status.Conditions, "ConfigMapsReady") + Expect(configMapCond).NotTo(BeNil()) + Expect(configMapCond.Status).To(Equal(metav1.ConditionTrue)) + Expect(configMapCond.Reason).To(Equal("ConfigMapReconciled")) + + managedRolesCond := meta.FindStatusCondition(pc.Status.Conditions, "ManagedRolesReady") + Expect(managedRolesCond).NotTo(BeNil()) + Expect(managedRolesCond.Status).To(Equal(metav1.ConditionTrue)) + Expect(managedRolesCond.Reason).To(Equal("ManagedRolesReconciled")) + + // Pooler is disabled in this suite fixture, but converge publishes PoolerReady=True with disabled message. + poolerCond := meta.FindStatusCondition(pc.Status.Conditions, "PoolerReady") + Expect(poolerCond).NotTo(BeNil()) + Expect(poolerCond.Status).To(Equal(metav1.ConditionTrue)) + Expect(poolerCond.Reason).To(Equal("AllInstancesReady")) + Expect(poolerCond.Message).To(Equal("Connection pooler disabled")) + + Expect(pc.Status.ManagedRolesStatus).NotTo(BeNil()) + Expect(pc.Status.ManagedRolesStatus.Reconciled).To(ContainElements("app_user", "app_user_rw")) + + Expect(pc.Status.Phase).NotTo(BeNil()) + Expect(*pc.Status.Phase).To(Equal("Ready")) + Expect(pc.Status.ProvisionerRef).NotTo(BeNil()) + Expect(pc.Status.ProvisionerRef.Kind).To(Equal("Cluster")) + Expect(pc.Status.ProvisionerRef.Name).To(Equal(clusterName)) + Expect(pc.Status.Resources).NotTo(BeNil()) Expect(pc.Status.Resources.SuperUserSecretRef).NotTo(BeNil()) Expect(pc.Status.Resources.ConfigMapRef).NotTo(BeNil()) + + received := make([]string, 0, 8) + Expect(containsEvents( + &received, fakeRecorder, + v1.EventTypeNormal, core.EventClusterReady, + )).To(BeTrue(), "events seen: %v", received) }) // PC-07 @@ -291,7 +355,7 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() { When("reconciling with invalid or drifted dependencies", func() { // PC-05 Context("when referenced class does not exist", func() { - It("fails with class-not-found condition", func() { + It("fails with class-not-found condition and emits a warning event", func() { badName := "bad-" + clusterName badKey := types.NamespacedName{Name: badName, Namespace: namespace} @@ -316,6 +380,12 @@ var _ = Describe("PostgresCluster Controller", Label("postgres"), func() { cond := meta.FindStatusCondition(current.Status.Conditions, "ClusterReady") return cond != nil && cond.Reason == "ClusterClassNotFound" }, "20s", "250ms").Should(BeTrue()) + + received := make([]string, 0, 8) + Expect(containsEvents( + &received, fakeRecorder, + v1.EventTypeWarning, core.EventClusterClassNotFound, + )).To(BeTrue(), "events seen: %v", received) }) }) diff --git a/pkg/postgresql/cluster/core/cluster.go 
b/pkg/postgresql/cluster/core/cluster.go index ba9030f6f..eb146ba77 100644 --- a/pkg/postgresql/cluster/core/cluster.go +++ b/pkg/postgresql/cluster/core/cluster.go @@ -20,10 +20,13 @@ import ( "context" "errors" "fmt" + "sort" + "strings" cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" password "github.com/sethvargo/go-password/password" enterprisev4 "github.com/splunk/splunk-operator/api/v4" + pgcConstants "github.com/splunk/splunk-operator/pkg/postgresql/cluster/core/types/constants" "github.com/splunk/splunk-operator/pkg/postgresql/shared/ports" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -67,8 +70,36 @@ func PostgresClusterService(ctx context.Context, rc *ReconcileContext, req ctrl. logger = logger.WithValues("postgresCluster", postgresCluster.Name) ctx = log.IntoContext(ctx, logger) + currentPhase := func() string { + if postgresCluster.Status.Phase == nil { + return "" + } + return *postgresCluster.Status.Phase + } + updateStatus := func(conditionType conditionTypes, status metav1.ConditionStatus, reason conditionReasons, message string, phase reconcileClusterPhases) error { - return setStatus(ctx, c, rc.Metrics, postgresCluster, conditionType, status, reason, message, phase) + oldPhase := currentPhase() + if err := setStatus(ctx, c, rc.Metrics, postgresCluster, conditionType, status, reason, message, phase); err != nil { + return err + } + rc.emitClusterPhaseTransition(postgresCluster, oldPhase, currentPhase()) + return nil + } + updateComponentHealthStatus := func(health componentHealth) error { + oldPhase := currentPhase() + if err := setStatusFromHealth(ctx, c, rc.Metrics, postgresCluster, health); err != nil { + return err + } + rc.emitClusterPhaseTransition(postgresCluster, oldPhase, currentPhase()) + return nil + } + updatePhaseStatus := func(phase reconcileClusterPhases) error { + oldPhase := currentPhase() + if err := setPhaseStatus(ctx, c, postgresCluster, phase); err != nil { + return err + } + rc.emitClusterPhaseTransition(postgresCluster, oldPhase, currentPhase()) + return nil } // Finalizer handling must come before any other processing. @@ -128,302 +159,1111 @@ func PostgresClusterService(ctx context.Context, rc *ReconcileContext, req ctrl. 
logger.Info("Superuser secret name derived", "name", postgresSecretName) } - secretExists, secretErr := clusterSecretExists(ctx, c, postgresCluster.Namespace, postgresSecretName, secret) - if secretErr != nil { - logger.Error(secretErr, "Failed to check if PostgresCluster secret exists", "name", postgresSecretName) - rc.emitWarning(postgresCluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to check secret existence: %v", secretErr)) - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonUserSecretFailed, - fmt.Sprintf("Failed to check secret existence: %v", secretErr), failedClusterPhase) - return ctrl.Result{}, errors.Join(secretErr, statusErr) + poolerEnabled = mergedConfig.Spec.ConnectionPoolerEnabled != nil && *mergedConfig.Spec.ConnectionPoolerEnabled + poolerConfigPresent := mergedConfig.CNPG != nil && mergedConfig.CNPG.ConnectionPooler != nil + + secretComponent := newSecretModel(c, rc.Scheme, rc, updateComponentHealthStatus, postgresCluster, postgresSecretName) + clusterComponent := newProvisionerModel(c, rc.Scheme, rc, updateComponentHealthStatus, postgresCluster, mergedConfig, postgresSecretName) + + bootstrapComponents := []component{ + secretComponent, + clusterComponent, } - if !secretExists { - logger.Info("Superuser secret creation started", "name", postgresSecretName) - if err := ensureClusterSecret(ctx, c, rc.Scheme, postgresCluster, postgresSecretName, secret); err != nil { - logger.Error(err, "Failed to ensure PostgresCluster secret", "name", postgresSecretName) - rc.emitWarning(postgresCluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to generate cluster secret: %v", err)) - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonUserSecretFailed, - fmt.Sprintf("Failed to generate PostgresCluster secret: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) + + phase := func(component component) (ctrl.Result, error) { + componentLogger := logger.WithValues("component", component.Name()) + gate, gateErr := component.EvaluatePrerequisites(ctx) + if gateErr != nil { + if isTransientError(gateErr) { + componentLogger.Error(gateErr, "Component prerequisite transient error, requeueing", "step", "prerequisites") + return transientResult(gateErr), nil + } + componentLogger.Error(gateErr, "Component prerequisite evaluation failed", "step", "prerequisites") + return ctrl.Result{}, fmt.Errorf("%s prerequisites: %w", component.Name(), gateErr) } - if err := c.Status().Update(ctx, postgresCluster); err != nil { - logger.Error(err, "Failed to update status after secret creation") - return ctrl.Result{}, err + if !gate.Allowed { + componentLogger.Info("Component blocked by prerequisites", + "step", "prerequisites", + "condition", gate.Health.Condition, + "reason", gate.Health.Reason, + "phase", gate.Health.Phase, + "requeueAfter", gate.Health.Result.RequeueAfter) + health, err := component.Converge(ctx) + if err != nil && isTransientError(err) { + return transientResult(err), nil + } + if err != nil { + componentLogger.Error(err, "Blocked component convergence failed", "step", "converge") + return health.Result, fmt.Errorf("%s converge (blocked): %w", component.Name(), err) + } + return health.Result, nil } - rc.emitNormal(postgresCluster, EventSecretReady, fmt.Sprintf("Superuser secret %s created", postgresSecretName)) - logger.Info("Superuser secret ref persisted to status") - } - // Re-attach ownerRef if it was stripped (e.g. by a Retain-policy deletion of a previous cluster). 
- hasOwnerRef, ownerRefErr := controllerutil.HasOwnerReference(secret.GetOwnerReferences(), postgresCluster, rc.Scheme) - if ownerRefErr != nil { - logger.Error(ownerRefErr, "Failed to check owner reference on Secret") - return ctrl.Result{}, fmt.Errorf("failed to check owner reference on secret: %w", ownerRefErr) + if err := component.Actuate(ctx); err != nil { + if isTransientError(err) { + componentLogger.Error(err, "Component actuation transient error, requeueing", "step", "actuate") + return transientResult(err), nil + } + componentLogger.Error(err, "Component actuation failed", "step", "actuate") + return ctrl.Result{}, fmt.Errorf("%s actuate: %w", component.Name(), err) + } + componentLogger.Info("Component actuation completed", "step", "actuate") + + health, err := component.Converge(ctx) + if err != nil && isTransientError(err) { + componentLogger.Error(err, "Component convergence transient error, requeueing", "step", "converge") + return transientResult(err), nil + } + + if err != nil { + componentLogger.Error(err, "Component convergence failed", + "step", "converge", + "condition", health.Condition, + "reason", health.Reason, + "phase", health.Phase) + return health.Result, fmt.Errorf("%s converge: %w", component.Name(), err) + } + if isIntermediateState(health.State) { + componentLogger.Info("Component convergence pending", + "step", "converge", + "condition", health.Condition, + "reason", health.Reason, + "phase", health.Phase, + "requeueAfter", health.Result.RequeueAfter) + return health.Result, nil + } + componentLogger.Info("Component convergence ready", + "step", "converge", + "condition", health.Condition, + "reason", health.Reason, + "phase", health.Phase) + if health.Result != (ctrl.Result{}) { + componentLogger.Info("Component requested explicit result", + "step", "converge", + "requeueAfter", health.Result.RequeueAfter) + return health.Result, nil + } + return ctrl.Result{}, nil } - if secretExists && !hasOwnerRef { - logger.Info("Existing secret linked to PostgresCluster", "name", postgresSecretName) - rc.emitNormal(postgresCluster, EventClusterAdopted, fmt.Sprintf("Adopted existing CNPG cluster and secret %s", postgresSecretName)) - originalSecret := secret.DeepCopy() - if err := ctrl.SetControllerReference(postgresCluster, secret, rc.Scheme); err != nil { - return ctrl.Result{}, fmt.Errorf("failed to set controller reference on existing secret: %w", err) + + for _, component := range bootstrapComponents { + result, err := phase(component) + if err != nil { + return result, err } - if err := patchObject(ctx, c, originalSecret, secret, "Secret"); err != nil { - logger.Error(err, "Failed to patch existing secret with controller reference") - rc.emitWarning(postgresCluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to patch existing secret: %v", err)) - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonSuperUserSecretFailed, - fmt.Sprintf("Failed to patch existing secret: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) + if result != (ctrl.Result{}) { + return result, nil } } - if postgresCluster.Status.Resources.SuperUserSecretRef == nil { - postgresCluster.Status.Resources.SuperUserSecretRef = &corev1.SecretKeySelector{ - LocalObjectReference: corev1.LocalObjectReference{Name: postgresSecretName}, - Key: secretKeyPassword, + cnpgCluster = clusterComponent.cnpgCluster + runtimeView := clusterRuntimeViewAdapter{model: clusterComponent} + runtimeComponents := []component{ + newManagedRolesModel(c, rc.Scheme, 
rc, updateComponentHealthStatus, runtimeView, postgresCluster, postgresSecretName), + newPoolerModel(c, rc.Scheme, rc, updateComponentHealthStatus, postgresCluster, mergedConfig, cnpgCluster, poolerEnabled, poolerConfigPresent), + newConfigMapModel(c, rc.Scheme, rc, updateComponentHealthStatus, runtimeView, postgresCluster, postgresSecretName), + } + + for _, component := range runtimeComponents { + result, err := phase(component) + if err != nil { + return result, err + } + if result != (ctrl.Result{}) { + return result, nil } } + logger.Info("Reconciliation complete") + if err := updatePhaseStatus(readyClusterPhase); err != nil { + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true}, nil + } + return ctrl.Result{}, err + } + return ctrl.Result{}, nil +} + +func isTransientError(err error) bool { + return apierrors.IsConflict(err) || + apierrors.IsServerTimeout(err) || + apierrors.IsTooManyRequests(err) || + apierrors.IsTimeout(err) +} + +func transientResult(err error) ctrl.Result { + if apierrors.IsConflict(err) { + return ctrl.Result{Requeue: true} + } + return ctrl.Result{RequeueAfter: retryDelay} +} + +func writeComponentStatus(updateStatus healthStatusUpdater, health componentHealth) error { + if updateStatus == nil { + return nil + } + return updateStatus(health) +} + +// types/dto candidate +type componentHealth struct { + State pgcConstants.State + Condition conditionTypes + Reason conditionReasons + Message string + Phase reconcileClusterPhases + Result ctrl.Result +} + +type component interface { + Actuate(ctx context.Context) error + Converge(ctx context.Context) (componentHealth, error) + EvaluatePrerequisites(ctx context.Context) (prerequisiteDecision, error) + Name() string +} + +type prerequisiteDecision struct { + Allowed bool + Health componentHealth +} + +type healthStatusUpdater func(health componentHealth) error - // Build desired CNPG Cluster spec. - desiredSpec := buildCNPGClusterSpec(mergedConfig, postgresSecretName) +type eventEmitter interface { + emitNormal(obj client.Object, reason, message string) + emitWarning(obj client.Object, reason, message string) +} + +type poolerEmitter interface { + eventEmitter + emitPoolerReadyTransition(obj client.Object, conditions []metav1.Condition) +} + +type clusterRuntimeView interface { + Cluster() *cnpgv1.Cluster + IsHealthy() bool +} + +type clusterRuntimeViewAdapter struct { + model *provisionerModel +} + +func (v clusterRuntimeViewAdapter) Cluster() *cnpgv1.Cluster { + return v.model.cnpgCluster +} + +func (v clusterRuntimeViewAdapter) IsHealthy() bool { + return v.model.cnpgCluster != nil && v.model.cnpgCluster.Status.Phase == cnpgv1.PhaseHealthy +} - // Fetch existing CNPG Cluster or create it. 
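+// provisionerModel wraps the CNPG Cluster lifecycle as a component: Actuate
+// creates or patches the cnpgv1.Cluster to match the merged spec, and Converge
+// maps the observed CNPG phase onto component health, cluster phase, and
+// requeue behavior.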
+type provisionerModel struct { + client client.Client + scheme *runtime.Scheme + events eventEmitter + updateStatus healthStatusUpdater + cluster *enterprisev4.PostgresCluster + mergedConfig *MergedConfig + secretName string + cnpgCluster *cnpgv1.Cluster + cnpgCreated bool + cnpgPatched bool + + health componentHealth +} + +func newProvisionerModel(c client.Client, scheme *runtime.Scheme, events eventEmitter, updateStatus healthStatusUpdater, cluster *enterprisev4.PostgresCluster, mergedConfig *MergedConfig, secretName string) *provisionerModel { + return &provisionerModel{client: c, scheme: scheme, events: events, updateStatus: updateStatus, cluster: cluster, mergedConfig: mergedConfig, secretName: secretName} +} + +func (p *provisionerModel) Name() string { return pgcConstants.ComponentProvisioner } + +func (p *provisionerModel) EvaluatePrerequisites(_ context.Context) (prerequisiteDecision, error) { + if health, missing := p.getHealthOnMissingSecretRef(); missing { + return prerequisiteDecision{ + Allowed: false, + Health: health, + }, nil + } + return prerequisiteDecision{Allowed: true}, nil +} + +func (p *provisionerModel) Actuate(ctx context.Context) error { + p.cnpgCreated = false + p.cnpgPatched = false + + desiredSpec := buildCNPGClusterSpec(p.mergedConfig, p.secretName) existingCNPG := &cnpgv1.Cluster{} - err = c.Get(ctx, types.NamespacedName{Name: postgresCluster.Name, Namespace: postgresCluster.Namespace}, existingCNPG) + err := p.client.Get(ctx, types.NamespacedName{Name: p.cluster.Name, Namespace: p.cluster.Namespace}, existingCNPG) switch { case apierrors.IsNotFound(err): - logger.Info("CNPG Cluster creation started", "name", postgresCluster.Name) - newCluster, err := buildCNPGCluster(rc.Scheme, postgresCluster, mergedConfig, postgresSecretName) + newCluster, err := buildCNPGCluster(p.scheme, p.cluster, p.mergedConfig, p.secretName) if err != nil { - logger.Error(err, "Failed to build CNPG Cluster", "name", postgresCluster.Name) - return ctrl.Result{}, err - } - if err := c.Create(ctx, newCluster); err != nil { - logger.Error(err, "Failed to create CNPG Cluster") - rc.emitWarning(postgresCluster, EventClusterCreateFailed, fmt.Sprintf("Failed to create CNPG cluster: %v", err)) - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildFailed, - fmt.Sprintf("Failed to create CNPG Cluster: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) - } - rc.emitNormal(postgresCluster, EventClusterCreationStarted, "CNPG cluster created, waiting for healthy state") - if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildSucceeded, - "CNPG Cluster created", pendingClusterPhase); statusErr != nil { - return ctrl.Result{}, statusErr - } - logger.Info("CNPG Cluster created, requeueing for status update", "name", postgresCluster.Name) - return ctrl.Result{RequeueAfter: retryDelay}, nil + return fmt.Errorf("failed to build CNPG Cluster: %w", err) + } + if createErr := p.client.Create(ctx, newCluster); createErr != nil { + p.events.emitWarning(p.cluster, EventClusterCreateFailed, fmt.Sprintf("Failed to create CNPG cluster: %v", createErr)) + return createErr + } + p.events.emitNormal(p.cluster, EventClusterCreationStarted, "CNPG cluster created, waiting for healthy state") + p.cnpgCluster = newCluster + p.cnpgCreated = true case err != nil: - logger.Error(err, "Failed to get CNPG Cluster") - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterGetFailed, - fmt.Sprintf("Failed to get CNPG 
Cluster: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) + return err + default: + p.cnpgCluster = existingCNPG + currentNormalized := normalizeCNPGClusterSpec(p.cnpgCluster.Spec, p.mergedConfig.Spec.PostgreSQLConfig) + desiredNormalized := normalizeCNPGClusterSpec(desiredSpec, p.mergedConfig.Spec.PostgreSQLConfig) + if !equality.Semantic.DeepEqual(currentNormalized, desiredNormalized) { + originalCluster := p.cnpgCluster.DeepCopy() + p.cnpgCluster.Spec = desiredSpec + if patchErr := patchObject(ctx, p.client, originalCluster, p.cnpgCluster, "CNPGCluster"); patchErr != nil { + p.events.emitWarning(p.cluster, EventClusterUpdateFailed, fmt.Sprintf("Failed to patch CNPG cluster: %v", patchErr)) + return patchErr + } + p.events.emitNormal(p.cluster, EventClusterUpdateStarted, "CNPG cluster spec updated, waiting for healthy state") + p.cnpgPatched = true + } } - // Patch CNPG Cluster spec if drift detected. - cnpgCluster = existingCNPG - currentNormalized := normalizeCNPGClusterSpec(cnpgCluster.Spec, mergedConfig.Spec.PostgreSQLConfig) - desiredNormalized := normalizeCNPGClusterSpec(desiredSpec, mergedConfig.Spec.PostgreSQLConfig) - - if !equality.Semantic.DeepEqual(currentNormalized, desiredNormalized) { - logger.Info("CNPG Cluster spec drift detected, patch started", "name", cnpgCluster.Name) - originalCluster := cnpgCluster.DeepCopy() - cnpgCluster.Spec = desiredSpec - - switch patchErr := patchObject(ctx, c, originalCluster, cnpgCluster, "CNPGCluster"); { - case patchErr != nil: - logger.Error(patchErr, "Failed to patch CNPG Cluster", "name", cnpgCluster.Name) - rc.emitWarning(postgresCluster, EventClusterUpdateFailed, fmt.Sprintf("Failed to patch CNPG cluster: %v", patchErr)) - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterPatchFailed, - fmt.Sprintf("Failed to patch CNPG Cluster: %v", patchErr), failedClusterPhase) - return ctrl.Result{}, errors.Join(patchErr, statusErr) - default: - if statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonClusterBuildSucceeded, - "CNPG Cluster spec updated, waiting for healthy state", provisioningClusterPhase); statusErr != nil { - return ctrl.Result{}, statusErr + if p.cnpgCluster != nil { + p.cluster.Status.ProvisionerRef = &corev1.ObjectReference{ + APIVersion: "postgresql.cnpg.io/v1", + Kind: "Cluster", + Namespace: p.cnpgCluster.Namespace, + Name: p.cnpgCluster.Name, + UID: p.cnpgCluster.UID, + } + } + return nil +} + +func (p *provisionerModel) Converge(_ context.Context) (health componentHealth, err error) { + p.health.Condition = clusterReady + defer func() { + statusErr := writeComponentStatus(p.updateStatus, p.health) + if statusErr != nil { + if err != nil { + err = errors.Join(err, statusErr) + } else { + err = statusErr } - rc.emitNormal(postgresCluster, EventClusterUpdateStarted, "CNPG cluster spec updated, waiting for healthy state") - logger.Info("CNPG Cluster patched, requeueing for status update", "name", cnpgCluster.Name) - return ctrl.Result{RequeueAfter: retryDelay}, nil } + health = p.health + }() + + if missingHealth, missing := p.getHealthOnMissingSecretRef(); missing { + p.health = missingHealth + return p.health, nil } - // Reconcile ManagedRoles. 
- if err := reconcileManagedRoles(ctx, c, postgresCluster, cnpgCluster); err != nil { - logger.Error(err, "Failed to reconcile managed roles") - rc.emitWarning(postgresCluster, EventManagedRolesFailed, fmt.Sprintf("Failed to reconcile managed roles: %v", err)) - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonManagedRolesFailed, - fmt.Sprintf("Failed to reconcile managed roles: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) + if p.cnpgCluster == nil { + p.health.State = pgcConstants.Pending + p.health.Reason = reasonCNPGProvisioning + p.health.Message = msgCNPGPendingCreation + p.health.Phase = pendingClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil } - // Reconcile Connection Pooler. - poolerEnabled = mergedConfig.Spec.ConnectionPoolerEnabled != nil && *mergedConfig.Spec.ConnectionPoolerEnabled + if p.cnpgCreated { + p.health.State = pgcConstants.Pending + p.health.Reason = reasonCNPGProvisioning + p.health.Message = msgCNPGPendingCreation + p.health.Phase = pendingClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } + + if p.cnpgPatched { + p.health.State = pgcConstants.Provisioning + p.health.Reason = reasonCNPGProvisioning + p.health.Message = fmt.Sprintf(msgFmtCNPGClusterPhase, p.cnpgCluster.Status.Phase) + p.health.Phase = provisioningClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } + + switch p.cnpgCluster.Status.Phase { + case cnpgv1.PhaseHealthy: + p.health.State = pgcConstants.Ready + p.health.Reason = reasonCNPGClusterHealthy + p.health.Message = msgProvisionerHealthy + p.health.Phase = readyClusterPhase + p.health.Result = ctrl.Result{} + return p.health, nil + case cnpgv1.PhaseFirstPrimary, cnpgv1.PhaseCreatingReplica, cnpgv1.PhaseWaitingForInstancesToBeActive: + p.health.State = pgcConstants.Provisioning + p.health.Reason = reasonCNPGProvisioning + p.health.Message = fmt.Sprintf(msgFmtCNPGProvisioning, p.cnpgCluster.Status.Phase) + p.health.Phase = provisioningClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + case cnpgv1.PhaseSwitchover: + p.health.State = pgcConstants.Configuring + p.health.Reason = reasonCNPGSwitchover + p.health.Message = msgCNPGSwitchover + p.health.Phase = configuringClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + case cnpgv1.PhaseFailOver: + p.health.State = pgcConstants.Configuring + p.health.Reason = reasonCNPGFailingOver + p.health.Message = msgCNPGFailingOver + p.health.Phase = configuringClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + case cnpgv1.PhaseInplacePrimaryRestart, cnpgv1.PhaseInplaceDeletePrimaryRestart: + p.health.State = pgcConstants.Configuring + p.health.Reason = reasonCNPGRestarting + p.health.Message = fmt.Sprintf(msgFmtCNPGRestarting, p.cnpgCluster.Status.Phase) + p.health.Phase = configuringClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + case cnpgv1.PhaseUpgrade, cnpgv1.PhaseMajorUpgrade, cnpgv1.PhaseUpgradeDelayed, cnpgv1.PhaseOnlineUpgrading: + p.health.State = pgcConstants.Configuring + p.health.Reason = reasonCNPGUpgrading + p.health.Message = fmt.Sprintf(msgFmtCNPGUpgrading, p.cnpgCluster.Status.Phase) + p.health.Phase = configuringClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + case 
cnpgv1.PhaseApplyingConfiguration: + p.health.State = pgcConstants.Configuring + p.health.Reason = reasonCNPGApplyingConfig + p.health.Message = msgCNPGApplyingConfiguration + p.health.Phase = configuringClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + case cnpgv1.PhaseReplicaClusterPromotion: + p.health.State = pgcConstants.Configuring + p.health.Reason = reasonCNPGPromoting + p.health.Message = msgCNPGPromoting + p.health.Phase = configuringClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + case cnpgv1.PhaseWaitingForUser: + p.health.State = pgcConstants.Failed + p.health.Reason = reasonCNPGWaitingForUser + p.health.Message = msgCNPGWaitingForUser + p.health.Phase = failedClusterPhase + p.health.Result = ctrl.Result{} + return p.health, fmt.Errorf("provisioner requires user action") + case cnpgv1.PhaseUnrecoverable: + p.health.State = pgcConstants.Failed + p.health.Reason = reasonCNPGUnrecoverable + p.health.Message = msgCNPGUnrecoverable + p.health.Phase = failedClusterPhase + p.health.Result = ctrl.Result{} + return p.health, fmt.Errorf("provisioner unrecoverable") + case cnpgv1.PhaseCannotCreateClusterObjects: + p.health.State = pgcConstants.Failed + p.health.Reason = reasonCNPGProvisioningFailed + p.health.Message = msgCNPGCannotCreateObjects + p.health.Phase = failedClusterPhase + p.health.Result = ctrl.Result{} + return p.health, fmt.Errorf("provisioner cannot create cluster objects") + case cnpgv1.PhaseUnknownPlugin, cnpgv1.PhaseFailurePlugin: + p.health.State = pgcConstants.Failed + p.health.Reason = reasonCNPGPluginError + p.health.Message = fmt.Sprintf(msgFmtCNPGPluginError, p.cnpgCluster.Status.Phase) + p.health.Phase = failedClusterPhase + p.health.Result = ctrl.Result{} + return p.health, fmt.Errorf("provisioner plugin error") + case cnpgv1.PhaseImageCatalogError, cnpgv1.PhaseArchitectureBinaryMissing: + p.health.State = pgcConstants.Failed + p.health.Reason = reasonCNPGImageError + p.health.Message = fmt.Sprintf(msgFmtCNPGImageError, p.cnpgCluster.Status.Phase) + p.health.Phase = failedClusterPhase + p.health.Result = ctrl.Result{} + return p.health, fmt.Errorf("provisioner image error") + case "": + p.health.State = pgcConstants.Pending + p.health.Reason = reasonCNPGProvisioning + p.health.Message = msgCNPGPendingCreation + p.health.Phase = pendingClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + default: + p.health.State = pgcConstants.Provisioning + p.health.Reason = reasonCNPGProvisioning + p.health.Message = fmt.Sprintf(msgFmtCNPGClusterPhase, p.cnpgCluster.Status.Phase) + p.health.Phase = provisioningClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } +} + +func (p *provisionerModel) getHealthOnMissingSecretRef() (componentHealth, bool) { + if p.cluster.Status.Resources == nil || p.cluster.Status.Resources.SuperUserSecretRef == nil { + return componentHealth{ + State: pgcConstants.Pending, + Condition: clusterReady, + Reason: reasonUserSecretPending, + Message: msgSecretRefNotPublished, + Phase: pendingClusterPhase, + Result: ctrl.Result{RequeueAfter: retryDelay}, + }, true + } + return componentHealth{}, false +} + +type managedRolesModel struct { + client client.Client + scheme *runtime.Scheme + events eventEmitter + updateStatus healthStatusUpdater + runtime clusterRuntimeView + cluster *enterprisev4.PostgresCluster + secret string + + health componentHealth +} + +func 
newManagedRolesModel(c client.Client, scheme *runtime.Scheme, events eventEmitter, updateStatus healthStatusUpdater, runtime clusterRuntimeView, cluster *enterprisev4.PostgresCluster, secret string) *managedRolesModel { + return &managedRolesModel{client: c, scheme: scheme, events: events, updateStatus: updateStatus, runtime: runtime, cluster: cluster, secret: secret} +} + +func (m *managedRolesModel) Name() string { return pgcConstants.ComponentManagedRoles } - rwPoolerExists, err := poolerExists(ctx, c, postgresCluster, readWriteEndpoint) +func (m *managedRolesModel) runtimeGateHealth() (componentHealth, bool) { + if m.runtime == nil || !m.runtime.IsHealthy() { + return componentHealth{ + State: pgcConstants.Pending, + Condition: managedRolesReady, + Reason: reasonManagedRolesPending, + Message: "Managed roles blocked until CNPG cluster is healthy", + Phase: pendingClusterPhase, + Result: ctrl.Result{RequeueAfter: retryDelay}, + }, true + } + return componentHealth{}, false +} + +func (m *managedRolesModel) EvaluatePrerequisites(_ context.Context) (prerequisiteDecision, error) { + if gateHealth, blocked := m.runtimeGateHealth(); blocked { + return prerequisiteDecision{ + Allowed: false, + Health: gateHealth, + }, nil + } + return prerequisiteDecision{Allowed: true}, nil +} + +func (m *managedRolesModel) Actuate(ctx context.Context) error { + if rolesErr := reconcileManagedRoles(ctx, m.client, m.cluster, m.runtime.Cluster()); rolesErr != nil { + m.events.emitWarning(m.cluster, EventManagedRolesFailed, fmt.Sprintf("Failed to reconcile managed roles: %v", rolesErr)) + m.health.State = pgcConstants.Failed + m.health.Reason = reasonManagedRolesFailed + m.health.Message = fmt.Sprintf("Failed to reconcile managed roles: %v", rolesErr) + m.health.Phase = failedClusterPhase + m.health.Result = ctrl.Result{} + return rolesErr + } + return nil +} + +func (m *managedRolesModel) Converge(ctx context.Context) (health componentHealth, err error) { + _ = ctx + m.health = componentHealth{Condition: managedRolesReady} + defer func() { + statusErr := writeComponentStatus(m.updateStatus, m.health) + if statusErr != nil { + if err != nil { + err = errors.Join(err, statusErr) + } else { + err = statusErr + } + } + health = m.health + }() + + if gateHealth, blocked := m.runtimeGateHealth(); blocked { + m.health = gateHealth + return m.health, nil + } + + syncManagedRolesStatusFromCNPG(m.cluster, m.runtime.Cluster()) + status := m.cluster.Status.ManagedRolesStatus + if status == nil { + m.health.State = pgcConstants.Failed + m.health.Reason = reasonManagedRolesFailed + m.health.Message = "Managed roles status not published yet" + m.health.Phase = failedClusterPhase + m.health.Result = ctrl.Result{RequeueAfter: retryDelay} + m.emitManagedRolesConvergeFailure(m.health.Message) + return m.health, fmt.Errorf("managed roles status not published") + } + + if len(status.Failed) > 0 { + m.health.State = pgcConstants.Failed + m.health.Reason = reasonManagedRolesFailed + m.health.Message = fmt.Sprintf("Managed roles reconciliation failed for %d role(s)", len(status.Failed)) + m.health.Phase = failedClusterPhase + m.health.Result = ctrl.Result{RequeueAfter: retryDelay} + m.emitManagedRolesConvergeFailure(m.health.Message) + return m.health, fmt.Errorf("managed roles have failed entries") + } + + if len(status.Pending) > 0 { + m.health.State = pgcConstants.Pending + m.health.Reason = reasonManagedRolesPending + m.health.Message = fmt.Sprintf("Managed roles pending for %d role(s)", len(status.Pending)) + m.health.Phase = 
pendingClusterPhase + m.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return m.health, nil + } + + m.health.State = pgcConstants.Ready + m.health.Reason = reasonManagedRolesReady + m.health.Message = "Managed roles are reconciled" + m.health.Phase = readyClusterPhase + m.health.Result = ctrl.Result{} + if !meta.IsStatusConditionTrue(m.cluster.Status.Conditions, string(managedRolesReady)) { + m.events.emitNormal(m.cluster, EventManagedRolesReady, m.health.Message) + } + return m.health, nil +} + +func (m *managedRolesModel) emitManagedRolesConvergeFailure(message string) { + cond := meta.FindStatusCondition(m.cluster.Status.Conditions, string(managedRolesReady)) + if cond != nil && + cond.Status == metav1.ConditionFalse && + cond.Reason == string(reasonManagedRolesFailed) && + cond.Message == message { + return + } + m.events.emitWarning(m.cluster, EventManagedRolesFailed, message) +} + +// TODO: Ports as access to cnpg originated info to decouple. +func syncManagedRolesStatusFromCNPG(cluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster) { + if cluster == nil || cnpgCluster == nil { + return + } + + expectedRoles := make([]string, 0, len(cluster.Spec.ManagedRoles)) + for _, role := range cluster.Spec.ManagedRoles { + expectedRoles = append(expectedRoles, role.Name) + } + + cnpgStatus := cnpgCluster.Status.ManagedRolesStatus + reconciled := append([]string(nil), cnpgStatus.ByStatus[cnpgv1.RoleStatusReconciled]...) + pending := append([]string(nil), cnpgStatus.ByStatus[cnpgv1.RoleStatusPendingReconciliation]...) + + reconciledSet := make(map[string]struct{}, len(reconciled)) + for _, roleName := range reconciled { + reconciledSet[roleName] = struct{}{} + } + pendingSet := make(map[string]struct{}, len(pending)) + for _, roleName := range pending { + pendingSet[roleName] = struct{}{} + } + + failed := make(map[string]string, len(cnpgStatus.CannotReconcile)) + for roleName, errs := range cnpgStatus.CannotReconcile { + if len(errs) == 0 { + failed[roleName] = "role cannot be reconciled" + continue + } + failed[roleName] = strings.Join(errs, "; ") + } + + for _, roleName := range expectedRoles { + if _, ok := reconciledSet[roleName]; ok { + continue + } + if _, ok := failed[roleName]; ok { + continue + } + if _, ok := pendingSet[roleName]; ok { + continue + } + pending = append(pending, roleName) + } + + sort.Strings(reconciled) + sort.Strings(pending) + if len(failed) == 0 { + failed = nil + } + + cluster.Status.ManagedRolesStatus = &enterprisev4.ManagedRolesStatus{ + Reconciled: reconciled, + Pending: pending, + Failed: failed, + } +} + +type poolerModel struct { + client client.Client + scheme *runtime.Scheme + events poolerEmitter + updateStatus healthStatusUpdater + cluster *enterprisev4.PostgresCluster + mergedConfig *MergedConfig + cnpgCluster *cnpgv1.Cluster + poolerEnabled bool + poolerConfigPresent bool + + health componentHealth +} + +func newPoolerModel(c client.Client, scheme *runtime.Scheme, events poolerEmitter, updateStatus healthStatusUpdater, cluster *enterprisev4.PostgresCluster, mergedConfig *MergedConfig, cnpgCluster *cnpgv1.Cluster, poolerEnabled bool, poolerConfigPresent bool) *poolerModel { + return &poolerModel{ + client: c, + scheme: scheme, + events: events, + updateStatus: updateStatus, + cluster: cluster, + mergedConfig: mergedConfig, + cnpgCluster: cnpgCluster, + poolerEnabled: poolerEnabled, + poolerConfigPresent: poolerConfigPresent, + } +} + +func (p *poolerModel) Name() string { return pgcConstants.ComponentPooler } + +func (p 
*poolerModel) EvaluatePrerequisites(_ context.Context) (prerequisiteDecision, error) { + if !p.poolerEnabled || !p.poolerConfigPresent { + return prerequisiteDecision{Allowed: true}, nil + } + if p.cnpgCluster == nil { + return prerequisiteDecision{ + Allowed: false, + Health: componentHealth{ + State: pgcConstants.Pending, + Condition: poolerReady, + Reason: reasonCNPGProvisioning, + Message: msgCNPGPendingCreation, + Phase: pendingClusterPhase, + Result: ctrl.Result{RequeueAfter: retryDelay}, + }, + }, nil + } + if p.cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { + return prerequisiteDecision{ + Allowed: false, + Health: componentHealth{ + State: pgcConstants.Provisioning, + Condition: poolerReady, + Reason: reasonCNPGProvisioning, + Message: fmt.Sprintf(msgFmtCNPGClusterPhase, p.cnpgCluster.Status.Phase), + Phase: provisioningClusterPhase, + Result: ctrl.Result{RequeueAfter: retryDelay}, + }, + }, nil + } + return prerequisiteDecision{Allowed: true}, nil +} + +func (p *poolerModel) Actuate(ctx context.Context) error { + switch { + case !p.poolerEnabled: + if err := deleteConnectionPoolers(ctx, p.client, p.cluster); err != nil { + return err + } + p.cluster.Status.ConnectionPoolerStatus = nil + meta.RemoveStatusCondition(&p.cluster.Status.Conditions, string(poolerReady)) + return nil + case !p.poolerConfigPresent: + return nil + case p.cnpgCluster == nil || p.cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy: + return nil + default: + if err := createOrUpdateConnectionPoolers(ctx, p.client, p.scheme, p.cluster, p.mergedConfig, p.cnpgCluster); err != nil { + p.events.emitWarning(p.cluster, EventPoolerReconcileFailed, fmt.Sprintf("Failed to reconcile connection pooler: %v", err)) + return err + } + p.events.emitNormal(p.cluster, EventPoolerCreationStarted, "Connection poolers created, waiting for readiness") + return nil + } +} + +func (p *poolerModel) Converge(ctx context.Context) (health componentHealth, err error) { + p.health = componentHealth{Condition: poolerReady} + defer func() { + statusErr := writeComponentStatus(p.updateStatus, p.health) + if statusErr != nil { + if err != nil { + err = errors.Join(err, statusErr) + } else { + err = statusErr + } + } + health = p.health + }() + + if !p.poolerEnabled { + p.health.State = pgcConstants.Ready + p.health.Reason = reasonAllInstancesReady + p.health.Message = msgPoolerDisabled + p.health.Phase = readyClusterPhase + p.health.Result = ctrl.Result{} + return p.health, nil + } + if !p.poolerConfigPresent { + p.health.State = pgcConstants.Failed + p.health.Reason = reasonPoolerConfigMissing + p.health.Message = msgPoolerConfigMissing + p.health.Phase = failedClusterPhase + p.health.Result = ctrl.Result{} + return p.health, fmt.Errorf("pooler config missing") + } + if p.cnpgCluster == nil { + p.health.State = pgcConstants.Pending + p.health.Reason = reasonCNPGProvisioning + p.health.Message = msgCNPGPendingCreation + p.health.Phase = pendingClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } + if p.cnpgCluster.Status.Phase != cnpgv1.PhaseHealthy { + p.health.State = pgcConstants.Provisioning + p.health.Reason = reasonCNPGProvisioning + p.health.Message = fmt.Sprintf(msgFmtCNPGClusterPhase, p.cnpgCluster.Status.Phase) + p.health.Phase = provisioningClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } + + // TODO: Port material. 
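+	// The existence and readiness checks below read cnpgv1.Pooler objects
+	// directly; per the TODO above, this CNPG-originated state is a candidate
+	// for extraction behind a port.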
+ rwExists, err := poolerExists(ctx, p.client, p.cluster, readWriteEndpoint) if err != nil { - logger.Error(err, "Failed to check RW pooler existence") - statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, - fmt.Sprintf("Failed to check pooler existence: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) + p.events.emitWarning(p.cluster, EventPoolerReconcileFailed, fmt.Sprintf("Failed to sync pooler status: %v", err)) + p.health.State = pgcConstants.Failed + p.health.Reason = reasonPoolerReconciliationFailed + p.health.Message = fmt.Sprintf("Failed to check RW pooler existence: %v", err) + p.health.Phase = failedClusterPhase + p.health.Result = ctrl.Result{} + return p.health, err + } + roExists, err := poolerExists(ctx, p.client, p.cluster, readOnlyEndpoint) + if err != nil { + p.events.emitWarning(p.cluster, EventPoolerReconcileFailed, fmt.Sprintf("Failed to sync pooler status: %v", err)) + p.health.State = pgcConstants.Failed + p.health.Reason = reasonPoolerReconciliationFailed + p.health.Message = fmt.Sprintf("Failed to check RO pooler existence: %v", err) + p.health.Phase = failedClusterPhase + p.health.Result = ctrl.Result{} + return p.health, err + } + if !rwExists || !roExists { + p.health.State = pgcConstants.Provisioning + p.health.Reason = reasonPoolerCreating + p.health.Message = msgPoolersProvisioning + p.health.Phase = provisioningClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } + + rwPooler := &cnpgv1.Pooler{} + if err := p.client.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(p.cluster.Name, readWriteEndpoint), + Namespace: p.cluster.Namespace, + }, rwPooler); err != nil { + p.health.State = pgcConstants.Pending + p.health.Reason = reasonPoolerCreating + p.health.Message = msgWaitRWPoolerObject + p.health.Phase = pendingClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } + roPooler := &cnpgv1.Pooler{} + if err := p.client.Get(ctx, types.NamespacedName{ + Name: poolerResourceName(p.cluster.Name, readOnlyEndpoint), + Namespace: p.cluster.Namespace, + }, roPooler); err != nil { + p.health.State = pgcConstants.Pending + p.health.Reason = reasonPoolerCreating + p.health.Message = msgWaitROPoolerObject + p.health.Phase = pendingClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } + if !arePoolersReady(rwPooler, roPooler) { + p.health.State = pgcConstants.Pending + p.health.Reason = reasonPoolerCreating + p.health.Message = msgPoolersNotReady + p.health.Phase = pendingClusterPhase + p.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return p.health, nil + } + + p.cluster.Status.ConnectionPoolerStatus = &enterprisev4.ConnectionPoolerStatus{Enabled: true} + p.health.State = pgcConstants.Ready + p.health.Reason = reasonAllInstancesReady + p.health.Message = msgPoolersReady + p.health.Phase = readyClusterPhase + p.health.Result = ctrl.Result{} + p.events.emitPoolerReadyTransition(p.cluster, p.cluster.Status.Conditions) + return p.health, nil +} + +type configMapModel struct { + client client.Client + scheme *runtime.Scheme + events eventEmitter + updateStatus healthStatusUpdater + runtime clusterRuntimeView + cluster *enterprisev4.PostgresCluster + secret string + + health componentHealth +} + +func newConfigMapModel(c client.Client, scheme *runtime.Scheme, events eventEmitter, updateStatus healthStatusUpdater, runtime clusterRuntimeView, cluster 
*enterprisev4.PostgresCluster, secret string) *configMapModel { + return &configMapModel{client: c, scheme: scheme, events: events, updateStatus: updateStatus, runtime: runtime, cluster: cluster, secret: secret} +} + +func (c *configMapModel) Name() string { return pgcConstants.ComponentConfigMap } + +func (c *configMapModel) EvaluatePrerequisites(_ context.Context) (prerequisiteDecision, error) { + return prerequisiteDecision{Allowed: true}, nil +} + +func (c *configMapModel) Actuate(ctx context.Context) error { + cnpgCluster := c.runtime.Cluster() + if cnpgCluster == nil { + return nil } - roPoolerExists, err := poolerExists(ctx, c, postgresCluster, readOnlyEndpoint) + desiredCM, err := generateConfigMap(ctx, c.client, c.scheme, c.cluster, cnpgCluster, c.secret) if err != nil { - logger.Error(err, "Failed to check RO pooler existence") - statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, - fmt.Sprintf("Failed to check pooler existence: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) + c.events.emitWarning(c.cluster, EventConfigMapReconcileFailed, fmt.Sprintf("Failed to reconcile ConfigMap: %v", err)) + return err + } + cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: desiredCM.Name, Namespace: desiredCM.Namespace}} + op, err := controllerutil.CreateOrUpdate(ctx, c.client, cm, func() error { + cm.Data = desiredCM.Data + cm.Annotations = desiredCM.Annotations + cm.Labels = desiredCM.Labels + if !metav1.IsControlledBy(cm, c.cluster) { + if setErr := ctrl.SetControllerReference(c.cluster, cm, c.scheme); setErr != nil { + return fmt.Errorf("setting controller reference: %w", setErr) + } + } + return nil + }) + if err != nil { + c.events.emitWarning(c.cluster, EventConfigMapReconcileFailed, fmt.Sprintf("Failed to reconcile ConfigMap: %v", err)) + return err } + if op == controllerutil.OperationResultCreated { + c.events.emitNormal(c.cluster, EventConfigMapReady, fmt.Sprintf("ConfigMap %s created", desiredCM.Name)) + } else if op == controllerutil.OperationResultUpdated { + c.events.emitNormal(c.cluster, EventConfigMapReady, fmt.Sprintf("ConfigMap %s updated", desiredCM.Name)) + } + if c.cluster.Status.Resources.ConfigMapRef == nil { + c.cluster.Status.Resources.ConfigMapRef = &corev1.LocalObjectReference{Name: desiredCM.Name} + } + return nil +} - switch { - case !poolerEnabled: - if err := deleteConnectionPoolers(ctx, c, postgresCluster); err != nil { - logger.Error(err, "Failed to delete connection poolers") - statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, - fmt.Sprintf("Failed to delete connection poolers: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) - } - postgresCluster.Status.ConnectionPoolerStatus = nil - meta.RemoveStatusCondition(&postgresCluster.Status.Conditions, string(poolerReady)) - - case !rwPoolerExists || !roPoolerExists: - if mergedConfig.CNPG == nil || mergedConfig.CNPG.ConnectionPooler == nil { - logger.Info("Connection pooler enabled but no config found in class or cluster spec, skipping", - "class", postgresCluster.Spec.Class, "cluster", postgresCluster.Name) - statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerConfigMissing, - fmt.Sprintf("Connection pooler is enabled but no config found in class %q or cluster %q", - postgresCluster.Spec.Class, postgresCluster.Name), failedClusterPhase) - return ctrl.Result{}, statusErr - } - if cnpgCluster.Status.Phase != 
cnpgv1.PhaseHealthy { - logger.Info("CNPG Cluster not healthy yet, pending pooler creation", "clusterPhase", cnpgCluster.Status.Phase) - statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonCNPGClusterNotHealthy, - "Waiting for CNPG cluster to become healthy before creating poolers", pendingClusterPhase) - return ctrl.Result{RequeueAfter: retryDelay}, statusErr - } - if err := createOrUpdateConnectionPoolers(ctx, c, rc.Scheme, postgresCluster, mergedConfig, cnpgCluster); err != nil { - logger.Error(err, "Failed to reconcile connection pooler") - rc.emitWarning(postgresCluster, EventPoolerReconcileFailed, fmt.Sprintf("Failed to reconcile connection pooler: %v", err)) - statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, - fmt.Sprintf("Failed to reconcile connection pooler: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) - } - rc.emitNormal(postgresCluster, EventPoolerCreationStarted, "Connection poolers created, waiting for readiness") - logger.Info("Connection pooler creation started, requeueing") - if statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerCreating, - "Connection poolers are being provisioned", provisioningClusterPhase); statusErr != nil { - return ctrl.Result{}, statusErr - } - return ctrl.Result{RequeueAfter: retryDelay}, nil - - case func() bool { - rwPooler := &cnpgv1.Pooler{} - rwErr := c.Get(ctx, types.NamespacedName{ - Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), - Namespace: postgresCluster.Namespace, - }, rwPooler) - roPooler := &cnpgv1.Pooler{} - roErr := c.Get(ctx, types.NamespacedName{ - Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), - Namespace: postgresCluster.Namespace, - }, roPooler) - return rwErr != nil || roErr != nil || !arePoolersReady(rwPooler, roPooler) - }(): - logger.Info("Connection Poolers are not ready yet, requeueing") - statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerCreating, - "Connection poolers are being provisioned", pendingClusterPhase) - return ctrl.Result{RequeueAfter: retryDelay}, statusErr +func (c *configMapModel) Converge(ctx context.Context) (health componentHealth, err error) { + c.health = componentHealth{Condition: configMapsReady} + defer func() { + statusErr := writeComponentStatus(c.updateStatus, c.health) + if statusErr != nil { + if err != nil { + err = errors.Join(err, statusErr) + } else { + err = statusErr + } + } + health = c.health + }() + + if c.runtime == nil || !c.runtime.IsHealthy() { + c.health.State = pgcConstants.Provisioning + c.health.Reason = reasonCNPGProvisioning + c.health.Message = msgCNPGPendingCreation + c.health.Phase = provisioningClusterPhase + c.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return c.health, nil + } + + if c.cluster.Status.Resources == nil || c.cluster.Status.Resources.ConfigMapRef == nil { + c.health.State = pgcConstants.Provisioning + c.health.Reason = reasonConfigMapFailed + c.health.Message = msgConfigMapRefNotPublished + c.health.Phase = provisioningClusterPhase + c.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return c.health, nil + } + + cm := &corev1.ConfigMap{} + key := types.NamespacedName{Name: c.cluster.Status.Resources.ConfigMapRef.Name, Namespace: c.cluster.Namespace} + if err := c.client.Get(ctx, key, cm); err != nil { + if apierrors.IsNotFound(err) { + c.health.State = pgcConstants.Provisioning + c.health.Reason = reasonConfigMapFailed + c.health.Message = 
msgConfigMapNotFoundYet + c.health.Phase = provisioningClusterPhase + c.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return c.health, nil + } + c.health.State = pgcConstants.Failed + c.health.Reason = reasonConfigMapFailed + c.health.Message = fmt.Sprintf("Failed to fetch ConfigMap: %v", err) + c.health.Phase = failedClusterPhase + c.health.Result = ctrl.Result{} + return c.health, err + } + + requiredKeys := []string{ + configKeyClusterRWEndpoint, + configKeyClusterROEndpoint, + configKeyDefaultClusterPort, + configKeySuperUserSecretRef, + } + for _, requiredKey := range requiredKeys { + if _, ok := cm.Data[requiredKey]; !ok { + c.health.State = pgcConstants.Failed + c.health.Reason = reasonConfigMapFailed + c.health.Message = fmt.Sprintf(msgFmtConfigMapMissingRequiredKey, requiredKey) + c.health.Phase = failedClusterPhase + c.health.Result = ctrl.Result{} + return c.health, fmt.Errorf("configmap missing key %s", requiredKey) + } + } - default: - oldConditions := make([]metav1.Condition, len(postgresCluster.Status.Conditions)) - copy(oldConditions, postgresCluster.Status.Conditions) - if err := syncPoolerStatus(ctx, c, rc.Metrics, postgresCluster); err != nil { - logger.Error(err, "Failed to sync pooler status") - rc.emitWarning(postgresCluster, EventPoolerReconcileFailed, fmt.Sprintf("Failed to sync pooler status: %v", err)) - statusErr := updateStatus(poolerReady, metav1.ConditionFalse, reasonPoolerReconciliationFailed, - fmt.Sprintf("Failed to sync pooler status: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) - } - rc.emitPoolerReadyTransition(postgresCluster, oldConditions) - } - - // Reconcile ConfigMap when CNPG cluster is healthy. - if cnpgCluster.Status.Phase == cnpgv1.PhaseHealthy { - logger.Info("CNPG Cluster healthy, reconciling ConfigMap") - desiredCM, err := generateConfigMap(ctx, c, rc.Scheme, postgresCluster, cnpgCluster, postgresSecretName) - if err != nil { - logger.Error(err, "Failed to generate ConfigMap") - rc.emitWarning(postgresCluster, EventConfigMapReconcileFailed, fmt.Sprintf("Failed to reconcile ConfigMap: %v", err)) - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonConfigMapFailed, - fmt.Sprintf("Failed to generate ConfigMap: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) - } - cm := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: desiredCM.Name, Namespace: desiredCM.Namespace}} - createOrUpdateResult, err := controllerutil.CreateOrUpdate(ctx, c, cm, func() error { - cm.Data = desiredCM.Data - cm.Annotations = desiredCM.Annotations - cm.Labels = desiredCM.Labels - if !metav1.IsControlledBy(cm, postgresCluster) { - if err := ctrl.SetControllerReference(postgresCluster, cm, rc.Scheme); err != nil { - return fmt.Errorf("setting controller reference: %w", err) - } + c.health.State = pgcConstants.Ready + c.health.Reason = reasonConfigMapReady + c.health.Message = msgAccessConfigMapReady + c.health.Phase = readyClusterPhase + c.health.Result = ctrl.Result{} + return c.health, nil +} + +type secretModel struct { + client client.Client + scheme *runtime.Scheme + events eventEmitter + updateStatus healthStatusUpdater + cluster *enterprisev4.PostgresCluster + name string + + health componentHealth +} + +func newSecretModel(c client.Client, scheme *runtime.Scheme, events eventEmitter, updateStatus healthStatusUpdater, cluster *enterprisev4.PostgresCluster, name string) *secretModel { + return &secretModel{client: c, scheme: scheme, events: events, updateStatus: 
updateStatus, cluster: cluster, name: name} +} + +func (s *secretModel) Name() string { return pgcConstants.ComponentSecret } + +func (s *secretModel) EvaluatePrerequisites(_ context.Context) (prerequisiteDecision, error) { + return prerequisiteDecision{Allowed: true}, nil +} + +func (s *secretModel) Actuate(ctx context.Context) error { + secret := &corev1.Secret{} + secretExists, secretErr := clusterSecretExists(ctx, s.client, s.cluster.Namespace, s.name, secret) + if secretErr != nil { + s.events.emitWarning(s.cluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to check secret existence: %v", secretErr)) + return secretErr + } + if !secretExists { + if err := ensureClusterSecret(ctx, s.client, s.scheme, s.cluster, s.name, secret); err != nil { + s.events.emitWarning(s.cluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to generate cluster secret: %v", err)) + return err + } + s.events.emitNormal(s.cluster, EventSecretReady, fmt.Sprintf("Superuser secret %s created", s.name)) + } + hasOwnerRef, ownerRefErr := controllerutil.HasOwnerReference(secret.GetOwnerReferences(), s.cluster, s.scheme) + if ownerRefErr != nil { + return fmt.Errorf("failed to check owner reference on secret: %w", ownerRefErr) + } + if secretExists && !hasOwnerRef { + originalSecret := secret.DeepCopy() + if err := ctrl.SetControllerReference(s.cluster, secret, s.scheme); err != nil { + return fmt.Errorf("failed to set controller reference on existing secret: %w", err) + } + if err := patchObject(ctx, s.client, originalSecret, secret, "Secret"); err != nil { + s.events.emitWarning(s.cluster, EventSecretReconcileFailed, fmt.Sprintf("Failed to patch existing secret: %v", err)) + return err + } + s.events.emitNormal(s.cluster, EventClusterAdopted, fmt.Sprintf("Adopted existing CNPG cluster and secret %s", s.name)) + } + if s.cluster.Status.Resources.SuperUserSecretRef == nil { + s.cluster.Status.Resources.SuperUserSecretRef = &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: s.name}, + Key: secretKeyPassword, + } + } + return nil +} + +func (s *secretModel) Converge(ctx context.Context) (health componentHealth, err error) { + s.health = componentHealth{Condition: secretsReady} + defer func() { + statusErr := writeComponentStatus(s.updateStatus, s.health) + if statusErr != nil { + if err != nil { + err = errors.Join(err, statusErr) + } else { + err = statusErr } - return nil - }) - if err != nil { - logger.Error(err, "Failed to reconcile ConfigMap", "name", desiredCM.Name) - rc.emitWarning(postgresCluster, EventConfigMapReconcileFailed, fmt.Sprintf("Failed to reconcile ConfigMap: %v", err)) - statusErr := updateStatus(clusterReady, metav1.ConditionFalse, reasonConfigMapFailed, - fmt.Sprintf("Failed to reconcile ConfigMap: %v", err), failedClusterPhase) - return ctrl.Result{}, errors.Join(err, statusErr) - } - switch createOrUpdateResult { - case controllerutil.OperationResultCreated: - rc.emitNormal(postgresCluster, EventConfigMapReady, fmt.Sprintf("ConfigMap %s created", desiredCM.Name)) - logger.Info("ConfigMap created", "name", desiredCM.Name) - case controllerutil.OperationResultUpdated: - rc.emitNormal(postgresCluster, EventConfigMapReady, fmt.Sprintf("ConfigMap %s updated", desiredCM.Name)) - logger.Info("ConfigMap updated", "name", desiredCM.Name) - default: - logger.Info("ConfigMap unchanged", "name", desiredCM.Name) - } - if postgresCluster.Status.Resources.ConfigMapRef == nil { - postgresCluster.Status.Resources.ConfigMapRef = &corev1.LocalObjectReference{Name: 
desiredCM.Name} - } - } - - // Final status sync. - var oldPhase string - if postgresCluster.Status.Phase != nil { - oldPhase = *postgresCluster.Status.Phase - } - if err := syncStatus(ctx, c, rc.Metrics, postgresCluster, cnpgCluster); err != nil { - logger.Error(err, "Failed to sync status") - return ctrl.Result{}, err + } + health = s.health + }() + + if s.cluster.Status.Resources == nil || s.cluster.Status.Resources.SuperUserSecretRef == nil { + s.health.State = pgcConstants.Provisioning + s.health.Reason = reasonUserSecretPending + s.health.Message = msgSecretRefNotPublished + s.health.Phase = provisioningClusterPhase + s.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return s.health, nil } - var newPhase string - if postgresCluster.Status.Phase != nil { - newPhase = *postgresCluster.Status.Phase - } - rc.emitClusterPhaseTransition(postgresCluster, oldPhase, newPhase) - if cnpgCluster.Status.Phase == cnpgv1.PhaseHealthy { - rwPooler := &cnpgv1.Pooler{} - rwErr := c.Get(ctx, types.NamespacedName{ - Name: poolerResourceName(postgresCluster.Name, readWriteEndpoint), - Namespace: postgresCluster.Namespace, - }, rwPooler) - roPooler := &cnpgv1.Pooler{} - roErr := c.Get(ctx, types.NamespacedName{ - Name: poolerResourceName(postgresCluster.Name, readOnlyEndpoint), - Namespace: postgresCluster.Namespace, - }, roPooler) - if rwErr == nil && roErr == nil && arePoolersReady(rwPooler, roPooler) { - logger.Info("Poolers ready, syncing status") - poolerOldConditions := make([]metav1.Condition, len(postgresCluster.Status.Conditions)) - copy(poolerOldConditions, postgresCluster.Status.Conditions) - _ = syncPoolerStatus(ctx, c, rc.Metrics, postgresCluster) - rc.emitPoolerReadyTransition(postgresCluster, poolerOldConditions) + + secret := &corev1.Secret{} + key := types.NamespacedName{Name: s.cluster.Status.Resources.SuperUserSecretRef.Name, Namespace: s.cluster.Namespace} + if err := s.client.Get(ctx, key, secret); err != nil { + if apierrors.IsNotFound(err) { + s.health.State = pgcConstants.Provisioning + s.health.Reason = reasonUserSecretPending + s.health.Message = msgSecretNotFoundYet + s.health.Phase = provisioningClusterPhase + s.health.Result = ctrl.Result{RequeueAfter: retryDelay} + return s.health, nil } + s.health.State = pgcConstants.Failed + s.health.Reason = reasonUserSecretFailed + s.health.Message = fmt.Sprintf("Failed to fetch superuser secret: %v", err) + s.health.Phase = failedClusterPhase + s.health.Result = ctrl.Result{} + return s.health, err + } + + refKey := s.cluster.Status.Resources.SuperUserSecretRef.Key + if refKey == "" { + refKey = secretKeyPassword + } + if _, ok := secret.Data[refKey]; !ok { + s.health.State = pgcConstants.Failed + s.health.Reason = reasonSuperUserSecretFailed + s.health.Message = fmt.Sprintf(msgFmtSecretMissingKey, refKey) + s.health.Phase = failedClusterPhase + s.health.Result = ctrl.Result{} + return s.health, fmt.Errorf("secret missing key %s", refKey) + } + + s.health.State = pgcConstants.Ready + s.health.Reason = reasonSuperUserSecretReady + s.health.Message = msgSuperuserSecretReady + s.health.Phase = readyClusterPhase + s.health.Result = ctrl.Result{} + return s.health, nil +} + +func isIntermediateState(state pgcConstants.State) bool { + switch state { + case pgcConstants.Pending, + pgcConstants.Provisioning, + pgcConstants.Configuring: + return true + default: + return false } - logger.Info("Reconciliation complete") - return ctrl.Result{}, nil } // getMergedConfig overlays PostgresCluster spec on top of the class defaults. 
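// Aside: a minimal sketch (not part of this patch) of how the new component
// models compose. It assumes only the interface exercised by the unit tests in
// cluster_unit_test.go below: EvaluatePrerequisites gating, Actuate, Converge,
// and the pgcConstants.State bitmask. The helper name convergeComponentsSketch
// and the aggregation policy are illustrative assumptions, not the reconciler's
// actual orchestration code.
func convergeComponentsSketch(ctx context.Context, components []component) (pgcConstants.State, error) {
	aggregate := pgcConstants.Empty
	for _, comp := range components {
		gate, err := comp.EvaluatePrerequisites(ctx)
		if err != nil {
			return aggregate, err
		}
		if !gate.Allowed {
			// Blocked components report the health computed by their gate and
			// skip actuation on this pass.
			aggregate = aggregate.Add(gate.Health.State)
			continue
		}
		if err := comp.Actuate(ctx); err != nil {
			return aggregate, err
		}
		health, err := comp.Converge(ctx)
		if err != nil {
			return aggregate, err
		}
		aggregate = aggregate.Add(health.State)
	}
	return aggregate, nil
}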
@@ -615,14 +1455,6 @@ func isPoolerReady(pooler *cnpgv1.Pooler) bool { return pooler.Status.Instances >= desired } -func poolerInstanceCount(p *cnpgv1.Pooler) (desired, scheduled int32) { - desired = 1 - if p.Spec.Instances != nil { - desired = *p.Spec.Instances - } - return desired, p.Status.Instances -} - // createOrUpdateConnectionPoolers creates RW and RO poolers if they don't exist. func createOrUpdateConnectionPoolers(ctx context.Context, c client.Client, scheme *runtime.Scheme, cluster *enterprisev4.PostgresCluster, cfg *MergedConfig, cnpgCluster *cnpgv1.Cluster) error { if err := createConnectionPooler(ctx, c, scheme, cluster, cfg, cnpgCluster, readWriteEndpoint); err != nil { @@ -702,90 +1534,6 @@ func deleteConnectionPoolers(ctx context.Context, c client.Client, cluster *ente return nil } -// syncPoolerStatus populates ConnectionPoolerStatus and the PoolerReady condition. -func syncPoolerStatus(ctx context.Context, c client.Client, metrics ports.Recorder, cluster *enterprisev4.PostgresCluster) error { - rwPooler := &cnpgv1.Pooler{} - if err := c.Get(ctx, types.NamespacedName{ - Name: poolerResourceName(cluster.Name, readWriteEndpoint), - Namespace: cluster.Namespace, - }, rwPooler); err != nil { - return err - } - - roPooler := &cnpgv1.Pooler{} - if err := c.Get(ctx, types.NamespacedName{ - Name: poolerResourceName(cluster.Name, readOnlyEndpoint), - Namespace: cluster.Namespace, - }, roPooler); err != nil { - return err - } - - cluster.Status.ConnectionPoolerStatus = &enterprisev4.ConnectionPoolerStatus{Enabled: true} - rwDesired, rwScheduled := poolerInstanceCount(rwPooler) - roDesired, roScheduled := poolerInstanceCount(roPooler) - - return setStatus(ctx, c, metrics, cluster, poolerReady, metav1.ConditionTrue, reasonAllInstancesReady, - fmt.Sprintf("%s: %d/%d, %s: %d/%d", readWriteEndpoint, rwScheduled, rwDesired, readOnlyEndpoint, roScheduled, roDesired), - readyClusterPhase) -} - -// syncStatus maps CNPG Cluster state to PostgresCluster status. 
-func syncStatus(ctx context.Context, c client.Client, metrics ports.Recorder, cluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster) error { - cluster.Status.ProvisionerRef = &corev1.ObjectReference{ - APIVersion: "postgresql.cnpg.io/v1", - Kind: "Cluster", - Namespace: cnpgCluster.Namespace, - Name: cnpgCluster.Name, - UID: cnpgCluster.UID, - } - - var phase reconcileClusterPhases - var condStatus metav1.ConditionStatus - var reason conditionReasons - var message string - - switch cnpgCluster.Status.Phase { - case cnpgv1.PhaseHealthy: - phase, condStatus, reason, message = readyClusterPhase, metav1.ConditionTrue, reasonCNPGClusterHealthy, "Cluster is up and running" - case cnpgv1.PhaseFirstPrimary, cnpgv1.PhaseCreatingReplica, cnpgv1.PhaseWaitingForInstancesToBeActive: - phase, condStatus, reason = provisioningClusterPhase, metav1.ConditionFalse, reasonCNPGProvisioning - message = fmt.Sprintf("CNPG cluster provisioning: %s", cnpgCluster.Status.Phase) - case cnpgv1.PhaseSwitchover: - phase, condStatus, reason, message = configuringClusterPhase, metav1.ConditionFalse, reasonCNPGSwitchover, "Cluster changing primary node" - case cnpgv1.PhaseFailOver: - phase, condStatus, reason, message = configuringClusterPhase, metav1.ConditionFalse, reasonCNPGFailingOver, "Pod missing, need to change primary" - case cnpgv1.PhaseInplacePrimaryRestart, cnpgv1.PhaseInplaceDeletePrimaryRestart: - phase, condStatus, reason = configuringClusterPhase, metav1.ConditionFalse, reasonCNPGRestarting - message = fmt.Sprintf("CNPG cluster restarting: %s", cnpgCluster.Status.Phase) - case cnpgv1.PhaseUpgrade, cnpgv1.PhaseMajorUpgrade, cnpgv1.PhaseUpgradeDelayed, cnpgv1.PhaseOnlineUpgrading: - phase, condStatus, reason = configuringClusterPhase, metav1.ConditionFalse, reasonCNPGUpgrading - message = fmt.Sprintf("CNPG cluster upgrading: %s", cnpgCluster.Status.Phase) - case cnpgv1.PhaseApplyingConfiguration: - phase, condStatus, reason, message = configuringClusterPhase, metav1.ConditionFalse, reasonCNPGApplyingConfig, "Configuration change is being applied" - case cnpgv1.PhaseReplicaClusterPromotion: - phase, condStatus, reason, message = configuringClusterPhase, metav1.ConditionFalse, reasonCNPGPromoting, "Replica is being promoted to primary" - case cnpgv1.PhaseWaitingForUser: - phase, condStatus, reason, message = failedClusterPhase, metav1.ConditionFalse, reasonCNPGWaitingForUser, "Action from the user is required" - case cnpgv1.PhaseUnrecoverable: - phase, condStatus, reason, message = failedClusterPhase, metav1.ConditionFalse, reasonCNPGUnrecoverable, "Cluster failed, needs manual intervention" - case cnpgv1.PhaseCannotCreateClusterObjects: - phase, condStatus, reason, message = failedClusterPhase, metav1.ConditionFalse, reasonCNPGProvisioningFailed, "Cluster resources cannot be created" - case cnpgv1.PhaseUnknownPlugin, cnpgv1.PhaseFailurePlugin: - phase, condStatus, reason = failedClusterPhase, metav1.ConditionFalse, reasonCNPGPluginError - message = fmt.Sprintf("CNPG plugin error: %s", cnpgCluster.Status.Phase) - case cnpgv1.PhaseImageCatalogError, cnpgv1.PhaseArchitectureBinaryMissing: - phase, condStatus, reason = failedClusterPhase, metav1.ConditionFalse, reasonCNPGImageError - message = fmt.Sprintf("CNPG image error: %s", cnpgCluster.Status.Phase) - case "": - phase, condStatus, reason, message = pendingClusterPhase, metav1.ConditionFalse, reasonCNPGProvisioning, "CNPG cluster is pending creation" - default: - phase, condStatus, reason = provisioningClusterPhase, metav1.ConditionFalse, 
reasonCNPGProvisioning - message = fmt.Sprintf("CNPG cluster phase: %s", cnpgCluster.Status.Phase) - } - - return setStatus(ctx, c, metrics, cluster, clusterReady, condStatus, reason, message, phase) -} - // setStatus sets the phase, condition and persists the status. // It skips the API write when the resulting status is identical to the current // state, avoiding unnecessary etcd churn and ResourceVersion bumps on stable clusters. @@ -806,7 +1554,9 @@ func setStatus(ctx context.Context, c client.Client, metrics ports.Recorder, clu return nil } - metrics.IncStatusTransition(ports.ControllerCluster, string(condType), string(status), string(reason)) + if metrics != nil { + metrics.IncStatusTransition(ports.ControllerCluster, string(condType), string(status), string(reason)) + } if err := c.Status().Update(ctx, cluster); err != nil { return fmt.Errorf("failed to update PostgresCluster status: %w", err) @@ -814,6 +1564,27 @@ func setStatus(ctx context.Context, c client.Client, metrics ports.Recorder, clu return nil } +func setStatusFromHealth(ctx context.Context, c client.Client, metrics ports.Recorder, cluster *enterprisev4.PostgresCluster, health componentHealth) error { + conditionStatus := metav1.ConditionFalse + if health.State == pgcConstants.Ready { + conditionStatus = metav1.ConditionTrue + } + return setStatus(ctx, c, metrics, cluster, health.Condition, conditionStatus, health.Reason, health.Message, health.Phase) +} + +func setPhaseStatus(ctx context.Context, c client.Client, cluster *enterprisev4.PostgresCluster, phase reconcileClusterPhases) error { + before := cluster.Status.DeepCopy() + p := string(phase) + cluster.Status.Phase = &p + if equality.Semantic.DeepEqual(*before, cluster.Status) { + return nil + } + if err := c.Status().Update(ctx, cluster); err != nil { + return fmt.Errorf("failed to update PostgresCluster status phase: %w", err) + } + return nil +} + // generateConfigMap builds a ConfigMap with connection details for the PostgresCluster. func generateConfigMap(ctx context.Context, c client.Client, scheme *runtime.Scheme, cluster *enterprisev4.PostgresCluster, cnpgCluster *cnpgv1.Cluster, secretName string) (*corev1.ConfigMap, error) { cmName := fmt.Sprintf("%s%s", cluster.Name, defaultConfigMapSuffix) @@ -1031,15 +1802,12 @@ func removeOwnerRef(scheme *runtime.Scheme, owner, obj client.Object) (bool, err // patchObject patches obj from original; treats NotFound as a no-op. 
func patchObject(ctx context.Context, c client.Client, original, obj client.Object, kind objectKind) error { - logger := log.FromContext(ctx) if err := c.Patch(ctx, obj, client.MergeFrom(original)); err != nil { if apierrors.IsNotFound(err) { - logger.Info("Object not found, skipping patch", "kind", kind, "name", obj.GetName()) return nil } return fmt.Errorf("patching %s: %w", kind, err) } - logger.Info("Object patched", "kind", kind, "name", obj.GetName()) return nil } diff --git a/pkg/postgresql/cluster/core/cluster_unit_test.go b/pkg/postgresql/cluster/core/cluster_unit_test.go index 1a0659c98..de46e0831 100644 --- a/pkg/postgresql/cluster/core/cluster_unit_test.go +++ b/pkg/postgresql/cluster/core/cluster_unit_test.go @@ -6,18 +6,57 @@ import ( cnpgv1 "github.com/cloudnative-pg/cloudnative-pg/api/v1" enterprisev4 "github.com/splunk/splunk-operator/api/v4" + pgcConstants "github.com/splunk/splunk-operator/pkg/postgresql/cluster/core/types/constants" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/utils/ptr" client "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) +type configMapNotFoundClient struct { + client.Client +} + +type noopEventEmitter struct{} + +func (noopEventEmitter) emitNormal(_ client.Object, _, _ string) {} +func (noopEventEmitter) emitWarning(_ client.Object, _, _ string) {} +func (noopEventEmitter) emitPoolerReadyTransition(_ client.Object, _ []metav1.Condition) {} + +type captureEventEmitter struct { + normals []string + warnings []string +} + +func (c *captureEventEmitter) emitNormal(_ client.Object, reason, message string) { + c.normals = append(c.normals, reason+":"+message) +} + +func (c *captureEventEmitter) emitWarning(_ client.Object, reason, message string) { + c.warnings = append(c.warnings, reason+":"+message) +} + +func (c *captureEventEmitter) emitPoolerReadyTransition(_ client.Object, conditions []metav1.Condition) { + if !meta.IsStatusConditionTrue(conditions, string(poolerReady)) { + c.normals = append(c.normals, EventPoolerReady+":Connection poolers are ready") + } +} + +func (c configMapNotFoundClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { + if _, ok := obj.(*corev1.ConfigMap); ok { + return apierrors.NewNotFound(schema.GroupResource{Resource: "configmaps"}, key.Name) + } + return c.Client.Get(ctx, key, obj, opts...) 
+} + func TestPoolerResourceName(t *testing.T) { tests := []struct { name string @@ -94,6 +133,46 @@ func TestIsPoolerReady(t *testing.T) { } } +func TestPoolerInstanceCountManual(t *testing.T) { + tests := []struct { + name string + pooler *cnpgv1.Pooler + expectedDesired int32 + expectedScheduled int32 + }{ + { + name: "nil instances defaults desired to 1", + pooler: &cnpgv1.Pooler{ + Status: cnpgv1.PoolerStatus{Instances: 3}, + }, + expectedDesired: 1, + expectedScheduled: 3, + }, + { + name: "explicit instances uses spec value", + pooler: &cnpgv1.Pooler{ + Spec: cnpgv1.PoolerSpec{Instances: ptr.To(int32(5))}, + Status: cnpgv1.PoolerStatus{Instances: 2}, + }, + expectedDesired: 5, + expectedScheduled: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + desired := int32(1) + if tt.pooler.Spec.Instances != nil { + desired = *tt.pooler.Spec.Instances + } + scheduled := tt.pooler.Status.Instances + + assert.Equal(t, tt.expectedDesired, desired) + assert.Equal(t, tt.expectedScheduled, scheduled) + }) + } +} + func TestNormalizeCNPGClusterSpec(t *testing.T) { tests := []struct { name string @@ -1002,42 +1081,6 @@ func TestGenerateConfigMap(t *testing.T) { }) } -func TestPoolerInstanceCount(t *testing.T) { - tests := []struct { - name string - pooler *cnpgv1.Pooler - expectedDesired int32 - expectedScheduled int32 - }{ - { - name: "nil instances defaults desired to 1", - pooler: &cnpgv1.Pooler{ - Status: cnpgv1.PoolerStatus{Instances: 3}, - }, - expectedDesired: 1, - expectedScheduled: 3, - }, - { - name: "explicit instances returns spec value", - pooler: &cnpgv1.Pooler{ - Spec: cnpgv1.PoolerSpec{Instances: ptr.To(int32(5))}, - Status: cnpgv1.PoolerStatus{Instances: 2}, - }, - expectedDesired: 5, - expectedScheduled: 2, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - desired, scheduled := poolerInstanceCount(tt.pooler) - - assert.Equal(t, tt.expectedDesired, desired) - assert.Equal(t, tt.expectedScheduled, scheduled) - }) - } -} - func TestGeneratePassword(t *testing.T) { pw, err := generatePassword() @@ -1139,3 +1182,708 @@ func TestCreateOrUpdateConnectionPoolers(t *testing.T) { assert.Equal(t, int32(1), *ro.Spec.Instances) }) } + +func TestComponentStateTriggerConditions(t *testing.T) { + t.Parallel() + + ctx := t.Context() + scheme := runtime.NewScheme() + require.NoError(t, corev1.AddToScheme(scheme)) + require.NoError(t, enterprisev4.AddToScheme(scheme)) + require.NoError(t, cnpgv1.AddToScheme(scheme)) + + exampleCm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pg1-config", + Namespace: "default", + }, + Data: map[string]string{ + "CLUSTER_RW_ENDPOINT": "pg1-rw.default", + "CLUSTER_RO_ENDPOINT": "pg1-ro.default", + "DEFAULT_CLUSTER_PORT": "5432", + "SUPER_USER_SECRET_REF": "pg1-secret", + }, + } + examplePgCluster := &enterprisev4.PostgresCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pg1", + Namespace: "default", + }, + Status: enterprisev4.PostgresClusterStatus{ + Resources: &enterprisev4.PostgresClusterResources{ + ConfigMapRef: &corev1.LocalObjectReference{Name: "pg1-config"}, + SuperUserSecretRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{Name: "pg1-secret"}, + Key: "password", + }, + }, + }, + } + exampleSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pg1-secret", + Namespace: "default", + }, + Data: map[string][]byte{ + "password": []byte("s3cr3t"), + }, + } + + instances := int32(1) + version := "16" + storageSize := 
resource.MustParse("10Gi") + mergedConfig := &MergedConfig{ + Spec: &enterprisev4.PostgresClusterSpec{ + Instances: &instances, + PostgresVersion: &version, + Storage: &storageSize, + Resources: &corev1.ResourceRequirements{}, + PostgreSQLConfig: map[string]string{}, + PgHBA: []string{}, + }, + } + + makeReadyProvisioner := func(cluster *enterprisev4.PostgresCluster) *provisionerModel { + cnpg := &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: cluster.Name, + Namespace: cluster.Namespace, + }, + Spec: buildCNPGClusterSpec(mergedConfig, "pg1-secret"), + Status: cnpgv1.ClusterStatus{ + Phase: cnpgv1.PhaseHealthy, + }, + } + c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cnpg).Build() + return newProvisionerModel(c, scheme, noopEventEmitter{}, nil, cluster, mergedConfig, "pg1-secret") + } + + makeRuntimeView := func(healthy bool) clusterRuntimeView { + if !healthy { + return clusterRuntimeViewAdapter{model: &provisionerModel{}} + } + return clusterRuntimeViewAdapter{model: &provisionerModel{ + cnpgCluster: &cnpgv1.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: "pg1", Namespace: "default"}, + Status: cnpgv1.ClusterStatus{Phase: cnpgv1.PhaseHealthy}, + }, + }} + } + + // TODO: as soon as coupling is addressed, remove this monster of a test. + combinations := []struct { + name string + components []component + conditions []conditionTypes + requeue []bool + expectAll bool + message string + }{ + { + name: "Provisioner ready, pooler blocked by prerequisites", + components: func() []component { + cluster := examplePgCluster.DeepCopy() + provisioner := makeReadyProvisioner(cluster) + pooler := newPoolerModel( + fake.NewClientBuilder().WithScheme(scheme).Build(), + scheme, + noopEventEmitter{}, + nil, + cluster, + mergedConfig, + nil, + true, + true, + ) + return []component{provisioner, pooler} + }(), + conditions: []conditionTypes{clusterReady, poolerReady}, + requeue: []bool{false, true}, + expectAll: false, + message: "Provisioner ready but pooler gate is blocked until CNPG is healthy", + }, + { + name: "Provisioner ready, pooler ready, configMap pending from NotFound", + components: func() []component { + cluster := examplePgCluster.DeepCopy() + provisioner := makeReadyProvisioner(cluster) + pooler := newPoolerModel( + fake.NewClientBuilder().WithScheme(scheme).Build(), + scheme, + noopEventEmitter{}, + nil, + cluster, + mergedConfig, + nil, + false, + false, + ) + configMap := newConfigMapModel( + configMapNotFoundClient{ + Client: fake.NewClientBuilder(). + WithScheme(scheme). + Build(), + }, + scheme, + noopEventEmitter{}, + nil, + makeRuntimeView(true), + cluster, + "pg1-secret", + ) + return []component{provisioner, pooler, configMap} + }(), + conditions: []conditionTypes{clusterReady, poolerReady, configMapsReady}, + requeue: []bool{false, false, true}, + expectAll: false, + message: "Provisioner and pooler ready are not enough when ConfigMap check returns NotFound/pending", + }, + { + name: "Flow successful, all components ready", + components: func() []component { + cluster := examplePgCluster.DeepCopy() + provisioner := makeReadyProvisioner(cluster) + pooler := newPoolerModel( + fake.NewClientBuilder().WithScheme(scheme).Build(), + scheme, + noopEventEmitter{}, + nil, + cluster, + mergedConfig, + nil, + false, + false, + ) + configMap := newConfigMapModel( + fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(exampleCm). 
+ Build(), + scheme, + noopEventEmitter{}, + nil, + makeRuntimeView(true), + cluster, + "pg1-secret", + ) + secret := newSecretModel( + fake.NewClientBuilder(). + WithScheme(scheme). + WithObjects(exampleSecret). + Build(), + scheme, + noopEventEmitter{}, + nil, + cluster, + "pg1-secret", + ) + return []component{provisioner, pooler, configMap, secret} + }(), + conditions: []conditionTypes{clusterReady, poolerReady, configMapsReady, secretsReady}, + requeue: []bool{false, false, false, false}, + expectAll: true, + message: "", + }, + } + + for _, tt := range combinations { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + state := pgcConstants.Empty + for i, check := range tt.components { + gate, gateErr := check.EvaluatePrerequisites(ctx) + require.NoError(t, gateErr) + if !gate.Allowed { + info := gate.Health + state = info.State + assert.Equal(t, tt.conditions[i], info.Condition) + assert.Equal(t, tt.requeue[i], info.Result.RequeueAfter > 0) + continue + } + + require.NoError(t, check.Actuate(ctx)) + info, err := check.Converge(ctx) + require.NoError(t, err) + state = info.State + assert.Equal(t, tt.conditions[i], info.Condition) + assert.Equal(t, tt.requeue[i], info.Result.RequeueAfter > 0) + } + assert.Equal(t, tt.expectAll, state&pgcConstants.Ready == pgcConstants.Ready, + tt.message) + }) + } +} + +func TestSyncManagedRolesStatusFromCNPG(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + specRoles []enterprisev4.ManagedRole + cnpgStatus cnpgv1.ManagedRoles + reconciled []string + pending []string + failed map[string]string + }{ + { + name: "marks unreconciled desired role as pending", + specRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + }, + cnpgStatus: cnpgv1.ManagedRoles{}, + reconciled: nil, + pending: []string{"app_user"}, + failed: nil, + }, + { + name: "maps reconciled and pending roles from CNPG status", + specRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + {Name: "app_rw", Exists: true}, + }, + cnpgStatus: cnpgv1.ManagedRoles{ + ByStatus: map[cnpgv1.RoleStatus][]string{ + cnpgv1.RoleStatusReconciled: {"app_user"}, + cnpgv1.RoleStatusPendingReconciliation: {"app_rw"}, + }, + }, + reconciled: []string{"app_user"}, + pending: []string{"app_rw"}, + failed: nil, + }, + { + name: "maps cannot reconcile errors as failed", + specRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + }, + cnpgStatus: cnpgv1.ManagedRoles{ + CannotReconcile: map[string][]string{ + "app_user": {"reserved role"}, + }, + }, + reconciled: nil, + pending: nil, + failed: map[string]string{ + "app_user": "reserved role", + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + cluster := &enterprisev4.PostgresCluster{ + Spec: enterprisev4.PostgresClusterSpec{ + ManagedRoles: tt.specRoles, + }, + } + cnpgCluster := &cnpgv1.Cluster{ + Status: cnpgv1.ClusterStatus{ + ManagedRolesStatus: tt.cnpgStatus, + }, + } + + syncManagedRolesStatusFromCNPG(cluster, cnpgCluster) + + require.NotNil(t, cluster.Status.ManagedRolesStatus) + assert.Equal(t, tt.reconciled, cluster.Status.ManagedRolesStatus.Reconciled) + assert.Equal(t, tt.pending, cluster.Status.ManagedRolesStatus.Pending) + assert.Equal(t, tt.failed, cluster.Status.ManagedRolesStatus.Failed) + }) + } +} + +func TestManagedRolesModelConverge(t *testing.T) { + t.Parallel() + + makeRuntimeView := func(phase string, managedRoles cnpgv1.ManagedRoles) clusterRuntimeView { + return clusterRuntimeViewAdapter{model: 
&provisionerModel{ + cnpgCluster: &cnpgv1.Cluster{ + Status: cnpgv1.ClusterStatus{ + Phase: phase, + ManagedRolesStatus: managedRoles, + }, + }, + }} + } + + tests := []struct { + name string + runtimeView clusterRuntimeView + specRoles []enterprisev4.ManagedRole + expectedState pgcConstants.State + expectedReason conditionReasons + expectErr bool + expectStatusPublished bool + expectPending []string + expectFailed map[string]string + }{ + { + name: "returns pending when runtime is not healthy", + runtimeView: makeRuntimeView(cnpgv1.PhaseFirstPrimary, cnpgv1.ManagedRoles{}), + specRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + }, + expectedState: pgcConstants.Pending, + expectedReason: reasonManagedRolesPending, + expectErr: false, + expectStatusPublished: false, + }, + { + name: "returns pending when role is still pending reconciliation", + runtimeView: makeRuntimeView(cnpgv1.PhaseHealthy, cnpgv1.ManagedRoles{ + ByStatus: map[cnpgv1.RoleStatus][]string{ + cnpgv1.RoleStatusPendingReconciliation: {"app_user"}, + }, + }), + specRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + }, + expectedState: pgcConstants.Pending, + expectedReason: reasonManagedRolesPending, + expectErr: false, + expectStatusPublished: true, + expectPending: []string{"app_user"}, + }, + { + name: "returns failed when role cannot reconcile", + runtimeView: makeRuntimeView(cnpgv1.PhaseHealthy, cnpgv1.ManagedRoles{ + CannotReconcile: map[string][]string{ + "app_user": {"reserved role"}, + }, + }), + specRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + }, + expectedState: pgcConstants.Failed, + expectedReason: reasonManagedRolesFailed, + expectErr: true, + expectStatusPublished: true, + expectFailed: map[string]string{ + "app_user": "reserved role", + }, + }, + { + name: "returns ready when all desired roles are reconciled", + runtimeView: makeRuntimeView(cnpgv1.PhaseHealthy, cnpgv1.ManagedRoles{ + ByStatus: map[cnpgv1.RoleStatus][]string{ + cnpgv1.RoleStatusReconciled: {"app_user", "app_user_rw"}, + }, + }), + specRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + {Name: "app_user_rw", Exists: true}, + }, + expectedState: pgcConstants.Ready, + expectedReason: reasonManagedRolesReady, + expectErr: false, + expectStatusPublished: true, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + cluster := &enterprisev4.PostgresCluster{ + Spec: enterprisev4.PostgresClusterSpec{ + ManagedRoles: tt.specRoles, + }, + } + model := newManagedRolesModel( + fake.NewClientBuilder().Build(), + nil, + noopEventEmitter{}, + nil, + tt.runtimeView, + cluster, + "pg1-secret", + ) + + health, err := model.Converge(context.Background()) + if tt.expectErr { + require.Error(t, err) + } else { + require.NoError(t, err) + } + + assert.Equal(t, managedRolesReady, health.Condition) + assert.Equal(t, tt.expectedState, health.State) + assert.Equal(t, tt.expectedReason, health.Reason) + if tt.expectStatusPublished { + require.NotNil(t, cluster.Status.ManagedRolesStatus) + assert.Equal(t, tt.expectPending, cluster.Status.ManagedRolesStatus.Pending) + assert.Equal(t, tt.expectFailed, cluster.Status.ManagedRolesStatus.Failed) + } else { + assert.Nil(t, cluster.Status.ManagedRolesStatus) + } + }) + } +} + +func TestManagedRolesRuntimeGateHealthMatchesConverge(t *testing.T) { + t.Parallel() + + cluster := &enterprisev4.PostgresCluster{ + Spec: enterprisev4.PostgresClusterSpec{ + ManagedRoles: 
[]enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + }, + }, + } + model := newManagedRolesModel( + fake.NewClientBuilder().Build(), + nil, + noopEventEmitter{}, + nil, + clusterRuntimeViewAdapter{model: &provisionerModel{ + cnpgCluster: &cnpgv1.Cluster{Status: cnpgv1.ClusterStatus{Phase: cnpgv1.PhaseFirstPrimary}}, + }}, + cluster, + "pg1-secret", + ) + + gate, err := model.EvaluatePrerequisites(context.Background()) + require.NoError(t, err) + require.False(t, gate.Allowed) + + health, err := model.Converge(context.Background()) + require.NoError(t, err) + assert.Equal(t, gate.Health, health) +} + +func TestPoolerModelConvergeSetsConnectionPoolerStatus(t *testing.T) { + t.Parallel() + + scheme := runtime.NewScheme() + require.NoError(t, enterprisev4.AddToScheme(scheme)) + require.NoError(t, cnpgv1.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + t.Run("does not set enabled true while pooler is pending", func(t *testing.T) { + t.Parallel() + + cluster := &enterprisev4.PostgresCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "pg1", Namespace: "default"}, + } + model := newPoolerModel( + fake.NewClientBuilder().WithScheme(scheme).Build(), + scheme, + noopEventEmitter{}, + nil, + cluster, + &MergedConfig{}, + nil, + true, + true, + ) + + health, err := model.Converge(context.Background()) + require.NoError(t, err) + assert.Nil(t, cluster.Status.ConnectionPoolerStatus) + assert.Equal(t, pgcConstants.Pending, health.State) + }) + + t.Run("sets enabled true when pooler converges ready", func(t *testing.T) { + t.Parallel() + + cluster := &enterprisev4.PostgresCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "pg1", Namespace: "default"}, + } + rwPooler := &cnpgv1.Pooler{ + ObjectMeta: metav1.ObjectMeta{ + Name: poolerResourceName(cluster.Name, readWriteEndpoint), + Namespace: cluster.Namespace, + }, + Status: cnpgv1.PoolerStatus{Instances: 1}, + } + roPooler := &cnpgv1.Pooler{ + ObjectMeta: metav1.ObjectMeta{ + Name: poolerResourceName(cluster.Name, readOnlyEndpoint), + Namespace: cluster.Namespace, + }, + Status: cnpgv1.PoolerStatus{Instances: 1}, + } + model := newPoolerModel( + fake.NewClientBuilder().WithScheme(scheme).WithObjects(rwPooler, roPooler).Build(), + scheme, + noopEventEmitter{}, + nil, + cluster, + &MergedConfig{}, + &cnpgv1.Cluster{Status: cnpgv1.ClusterStatus{Phase: cnpgv1.PhaseHealthy}}, + true, + true, + ) + + health, err := model.Converge(context.Background()) + require.NoError(t, err) + assert.Equal(t, &enterprisev4.ConnectionPoolerStatus{Enabled: true}, cluster.Status.ConnectionPoolerStatus) + assert.Equal(t, pgcConstants.Ready, health.State) + }) + + t.Run("sets status nil when pooler disabled", func(t *testing.T) { + t.Parallel() + + cluster := &enterprisev4.PostgresCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "pg1", Namespace: "default"}, + Status: enterprisev4.PostgresClusterStatus{ + ConnectionPoolerStatus: &enterprisev4.ConnectionPoolerStatus{Enabled: true}, + }, + } + model := newPoolerModel( + fake.NewClientBuilder().WithScheme(scheme).Build(), + scheme, + noopEventEmitter{}, + nil, + cluster, + &MergedConfig{}, + nil, + false, + false, + ) + + require.NoError(t, model.Actuate(context.Background())) + health, err := model.Converge(context.Background()) + require.NoError(t, err) + assert.Nil(t, cluster.Status.ConnectionPoolerStatus) + assert.Equal(t, pgcConstants.Ready, health.State) + }) +} + +func TestPoolerConvergeEmitsReadyEventOnTransition(t *testing.T) { + t.Parallel() + + scheme := runtime.NewScheme() + require.NoError(t, 
enterprisev4.AddToScheme(scheme)) + require.NoError(t, cnpgv1.AddToScheme(scheme)) + require.NoError(t, corev1.AddToScheme(scheme)) + + cluster := &enterprisev4.PostgresCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "pg1", Namespace: "default"}, + } + events := &captureEventEmitter{} + rwPooler := &cnpgv1.Pooler{ + ObjectMeta: metav1.ObjectMeta{ + Name: poolerResourceName(cluster.Name, readWriteEndpoint), + Namespace: cluster.Namespace, + }, + Status: cnpgv1.PoolerStatus{Instances: 1}, + } + roPooler := &cnpgv1.Pooler{ + ObjectMeta: metav1.ObjectMeta{ + Name: poolerResourceName(cluster.Name, readOnlyEndpoint), + Namespace: cluster.Namespace, + }, + Status: cnpgv1.PoolerStatus{Instances: 1}, + } + model := newPoolerModel( + fake.NewClientBuilder().WithScheme(scheme).WithObjects(rwPooler, roPooler).Build(), + scheme, + events, + nil, + cluster, + &MergedConfig{}, + &cnpgv1.Cluster{Status: cnpgv1.ClusterStatus{Phase: cnpgv1.PhaseHealthy}}, + true, + true, + ) + + _, err := model.Converge(context.Background()) + require.NoError(t, err) + require.NotEmpty(t, events.normals) + assert.Contains(t, events.normals[0], EventPoolerReady) + + // No re-emission when condition already True. + cluster.Status.Conditions = []metav1.Condition{{ + Type: string(poolerReady), + Status: metav1.ConditionTrue, + }} + events.normals = nil + _, err = model.Converge(context.Background()) + require.NoError(t, err) + assert.Empty(t, events.normals) +} + +func TestManagedRolesConvergeDoesNotEmitFailureForPending(t *testing.T) { + t.Parallel() + + cluster := &enterprisev4.PostgresCluster{ + Spec: enterprisev4.PostgresClusterSpec{ + ManagedRoles: []enterprisev4.ManagedRole{{Name: "app_user", Exists: true}}, + }, + } + events := &captureEventEmitter{} + model := newManagedRolesModel( + fake.NewClientBuilder().Build(), + nil, + events, + nil, + clusterRuntimeViewAdapter{model: &provisionerModel{ + cnpgCluster: &cnpgv1.Cluster{ + Status: cnpgv1.ClusterStatus{ + Phase: cnpgv1.PhaseHealthy, + ManagedRolesStatus: cnpgv1.ManagedRoles{}, + }, + }, + }}, + cluster, + "pg1-secret", + ) + + _, err := model.Converge(context.Background()) + require.NoError(t, err) + assert.Empty(t, events.warnings) +} + +func TestManagedRolesConvergeEmitsReadyEventOnTransition(t *testing.T) { + t.Parallel() + + cluster := &enterprisev4.PostgresCluster{ + Spec: enterprisev4.PostgresClusterSpec{ + ManagedRoles: []enterprisev4.ManagedRole{ + {Name: "app_user", Exists: true}, + }, + }, + } + events := &captureEventEmitter{} + model := newManagedRolesModel( + fake.NewClientBuilder().Build(), + nil, + events, + nil, + clusterRuntimeViewAdapter{model: &provisionerModel{ + cnpgCluster: &cnpgv1.Cluster{ + Status: cnpgv1.ClusterStatus{ + Phase: cnpgv1.PhaseHealthy, + ManagedRolesStatus: cnpgv1.ManagedRoles{ + ByStatus: map[cnpgv1.RoleStatus][]string{ + cnpgv1.RoleStatusReconciled: {"app_user"}, + }, + }, + }, + }, + }}, + cluster, + "pg1-secret", + ) + + _, err := model.Converge(context.Background()) + require.NoError(t, err) + require.NotEmpty(t, events.normals) + assert.Contains(t, events.normals[0], EventManagedRolesReady) + + // No re-emission when condition already True. 
+ cluster.Status.Conditions = []metav1.Condition{{ + Type: string(managedRolesReady), + Status: metav1.ConditionTrue, + }} + events.normals = nil + _, err = model.Converge(context.Background()) + require.NoError(t, err) + assert.Empty(t, events.normals) +} diff --git a/pkg/postgresql/cluster/core/events.go b/pkg/postgresql/cluster/core/events.go index afcfd768e..c6d7d58e0 100644 --- a/pkg/postgresql/cluster/core/events.go +++ b/pkg/postgresql/cluster/core/events.go @@ -25,6 +25,7 @@ const ( EventClusterCreateFailed = "ClusterCreateFailed" EventClusterUpdateFailed = "ClusterUpdateFailed" EventManagedRolesFailed = "ManagedRolesFailed" + EventManagedRolesReady = "ManagedRolesReady" EventPoolerReconcileFailed = "PoolerReconcileFailed" EventConfigMapReconcileFailed = "ConfigMapReconcileFailed" EventClusterDegraded = "ClusterDegraded" diff --git a/pkg/postgresql/cluster/core/types.go b/pkg/postgresql/cluster/core/types.go index 7a43322fe..0a18c9c86 100644 --- a/pkg/postgresql/cluster/core/types.go +++ b/pkg/postgresql/cluster/core/types.go @@ -44,6 +44,7 @@ type MergedConfig struct { type reconcileClusterPhases string type conditionTypes string type conditionReasons string +type statusMessage = string type objectKind string const ( @@ -52,9 +53,17 @@ const ( readOnlyEndpoint string = "ro" readWriteEndpoint string = "rw" - defaultDatabaseName string = "postgres" - superUsername string = "postgres" - defaultPort string = "5432" + defaultDatabaseName string = "postgres" + superUsername string = "postgres" + defaultPort string = "5432" + configKeyClusterRWEndpoint string = "CLUSTER_RW_ENDPOINT" + configKeyClusterROEndpoint string = "CLUSTER_RO_ENDPOINT" + configKeyClusterREndpoint string = "CLUSTER_R_ENDPOINT" + configKeyDefaultClusterPort string = "DEFAULT_CLUSTER_PORT" + configKeySuperUserName string = "SUPER_USER_NAME" + configKeySuperUserSecretRef string = "SUPER_USER_SECRET_REF" + configKeyPoolerRWEndpoint string = "CLUSTER_POOLER_RW_ENDPOINT" + configKeyPoolerROEndpoint string = "CLUSTER_POOLER_RO_ENDPOINT" secretKeyPassword string = "password" defaultSecretSuffix string = "-secret" @@ -76,19 +85,27 @@ const ( failedClusterPhase reconcileClusterPhases = "Failed" // condition types - clusterReady conditionTypes = "ClusterReady" - poolerReady conditionTypes = "PoolerReady" + clusterReady conditionTypes = "ClusterReady" + poolerReady conditionTypes = "PoolerReady" + managedRolesReady conditionTypes = "ManagedRolesReady" + secretsReady conditionTypes = "SecretsReady" + configMapsReady conditionTypes = "ConfigMapsReady" // condition reasons — clusterReady reasonClusterClassNotFound conditionReasons = "ClusterClassNotFound" + reasonManagedRolesReady conditionReasons = "ManagedRolesReconciled" + reasonManagedRolesPending conditionReasons = "ManagedRolesPending" reasonManagedRolesFailed conditionReasons = "ManagedRolesReconciliationFailed" reasonClusterBuildFailed conditionReasons = "ClusterBuildFailed" reasonClusterBuildSucceeded conditionReasons = "ClusterBuildSucceeded" reasonClusterGetFailed conditionReasons = "ClusterGetFailed" reasonClusterPatchFailed conditionReasons = "ClusterPatchFailed" reasonInvalidConfiguration conditionReasons = "InvalidConfiguration" + reasonConfigMapReady conditionReasons = "ConfigMapReconciled" reasonConfigMapFailed conditionReasons = "ConfigMapReconciliationFailed" + reasonUserSecretPending conditionReasons = "UserSecretPending" reasonUserSecretFailed conditionReasons = "UserSecretReconciliationFailed" + reasonSuperUserSecretReady conditionReasons = 
"SuperUserSecretReady" reasonSuperUserSecretFailed conditionReasons = "SuperUserSecretFailed" reasonClusterDeleteFailed conditionReasons = "ClusterDeleteFailed" @@ -113,4 +130,39 @@ const ( reasonCNPGProvisioningFailed conditionReasons = "CNPGProvisioningFailed" reasonCNPGPluginError conditionReasons = "CNPGPluginError" reasonCNPGImageError conditionReasons = "CNPGImageError" + + // status messages — provisioner health check + msgProvisionerHealthy statusMessage = "Provisioner cluster is healthy" + msgCNPGPendingCreation statusMessage = "CNPG cluster is pending creation" + msgFmtCNPGProvisioning statusMessage = "CNPG cluster provisioning: %s" + msgCNPGSwitchover statusMessage = "Cluster changing primary node" + msgCNPGFailingOver statusMessage = "Pod missing, need to change primary" + msgFmtCNPGRestarting statusMessage = "CNPG cluster restarting: %s" + msgFmtCNPGUpgrading statusMessage = "CNPG cluster upgrading: %s" + msgCNPGApplyingConfiguration statusMessage = "Configuration change is being applied" + msgCNPGPromoting statusMessage = "Replica is being promoted to primary" + msgCNPGWaitingForUser statusMessage = "Action from the user is required" + msgCNPGUnrecoverable statusMessage = "Cluster failed, needs manual intervention" + msgCNPGCannotCreateObjects statusMessage = "Cluster resources cannot be created" + msgFmtCNPGPluginError statusMessage = "CNPG plugin error: %s" + msgFmtCNPGImageError statusMessage = "CNPG image error: %s" + msgFmtCNPGClusterPhase statusMessage = "CNPG cluster phase: %s" + + // status messages — aggregate and component readiness checks + msgAllComponentsReady statusMessage = "All components are ready" + msgPoolerDisabled statusMessage = "Connection pooler disabled" + msgPoolerConfigMissing statusMessage = "Connection pooler enabled but configuration is missing" + msgPoolersProvisioning statusMessage = "Connection poolers are being provisioned" + msgWaitRWPoolerObject statusMessage = "Waiting for RW pooler object" + msgWaitROPoolerObject statusMessage = "Waiting for RO pooler object" + msgPoolersNotReady statusMessage = "Connection poolers are not ready yet" + msgPoolersReady statusMessage = "Connection poolers are ready" + msgConfigMapRefNotPublished statusMessage = "ConfigMap reference not published yet" + msgConfigMapNotFoundYet statusMessage = "ConfigMap not found yet" + msgFmtConfigMapMissingRequiredKey statusMessage = "ConfigMap missing required key %q" + msgAccessConfigMapReady statusMessage = "Access ConfigMap is ready" + msgSecretRefNotPublished statusMessage = "Superuser secret reference not published yet" + msgSecretNotFoundYet statusMessage = "Superuser secret not found yet" + msgFmtSecretMissingKey statusMessage = "Superuser secret missing key %q" + msgSuperuserSecretReady statusMessage = "Superuser secret is ready" ) diff --git a/pkg/postgresql/cluster/core/types/constants/components.go b/pkg/postgresql/cluster/core/types/constants/components.go new file mode 100644 index 000000000..f6dcdfb7b --- /dev/null +++ b/pkg/postgresql/cluster/core/types/constants/components.go @@ -0,0 +1,9 @@ +package pgcConstants + +const ( + ComponentManagedRoles = "managedRoles" + ComponentProvisioner = "provisioner" + ComponentPooler = "pooler" + ComponentConfigMap = "configMap" + ComponentSecret = "secret" +) diff --git a/pkg/postgresql/cluster/core/types/constants/state.go b/pkg/postgresql/cluster/core/types/constants/state.go new file mode 100644 index 000000000..7f4da47e9 --- /dev/null +++ b/pkg/postgresql/cluster/core/types/constants/state.go @@ -0,0 +1,24 @@ 
+package pgcConstants + +type State uint64 // State is a bitmask of component readiness states, combined via Add, Remove and Contains. + +const ( + Empty State = 0 + Ready State = 1 << iota // iota is 1 at this spec, so Ready == 2 and bit 0 stays unused + Pending + Provisioning + Configuring + Failed +) + +func (s State) Contains(state State) bool { + return s&state == state +} + +func (s State) Add(state State) State { + return s | state +} + +func (s State) Remove(state State) State { + return s &^ state +}
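// Usage sketch for the State bitmask defined above (caller code assumed for
// illustration, not part of this patch):
//
//	s := pgcConstants.Empty.Add(pgcConstants.Provisioning)
//	s.Contains(pgcConstants.Ready)   // false: only Provisioning is set
//	s = s.Remove(pgcConstants.Provisioning).Add(pgcConstants.Ready)
//	s.Contains(pgcConstants.Ready)   // true
//	s.Contains(pgcConstants.Empty)   // always true, since Empty is 0 and s&0 == 0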