diff --git a/.dagger/main.go b/.dagger/main.go index d62f8fe..4ee7463 100644 --- a/.dagger/main.go +++ b/.dagger/main.go @@ -419,7 +419,8 @@ func (m *ValkeyClusterOperator) BuildTestEnv( WithExec([]string{"go", "install", "sigs.k8s.io/kind@v0.29.0"}). WithUnixSocket("/var/run/docker.sock", sock). WithEnvVariable("CACHEBUSTER", time.Now().String()). - WithExec([]string{"kind", "create", "cluster"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). + WithExec([]string{"kind", "delete", "cluster", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). + WithExec([]string{"kind", "create", "cluster"}). WithExec([]string{"kind", "load", "docker-image", "valkey-cluster-operator:latest", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). WithExec([]string{"kind", "load", "docker-image", "valkey-server:latest", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). WithExec([]string{"kind", "load", "docker-image", "valkey-server:8.0.5", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). @@ -431,7 +432,10 @@ func (m *ValkeyClusterOperator) BuildTestEnv( daggerEngineContainerName = strings.TrimSpace(daggerEngineContainerName) container.WithExec([]string{"bash", "-c", "docker network connect kind " + daggerEngineContainerName + " || true"}).Stdout(ctx) - data, _ := container.WithExec([]string{"kind", "get", "kubeconfig"}).Stdout(ctx) + data, err := container.WithExec([]string{"kind", "get", "kubeconfig"}).Stdout(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get kubeconfig: %w", err) + } kindControlPlainContainerID, _ := container.WithExec([]string{"bash", "-c", `docker ps --format '{{json .}}' | jq -r '. 
| select(.Names=="kind-control-plane") | .ID'`}).Stdout(ctx) kindControlPlainContainerID = strings.TrimSpace(kindControlPlainContainerID) @@ -439,7 +443,12 @@ func (m *ValkeyClusterOperator) BuildTestEnv( kindControlPlainContainerIP = strings.TrimSpace(kindControlPlainContainerIP) kubeConfig := KubeConfig{} - _ = yaml.Unmarshal([]byte(data), &kubeConfig) + if err := yaml.Unmarshal([]byte(data), &kubeConfig); err != nil { + return nil, fmt.Errorf("failed to parse kubeconfig: %w", err) + } + if len(kubeConfig.Clusters) == 0 || len(kubeConfig.Users) == 0 { + return nil, fmt.Errorf("kubeconfig is missing clusters or users") + } kubeConfig.Clusters[0].Cluster.Server = "https://" + kindControlPlainContainerIP + ":6443" diff --git a/internal/controller/valkey/utils.go b/internal/controller/valkey/utils.go index e71658f..4796899 100644 --- a/internal/controller/valkey/utils.go +++ b/internal/controller/valkey/utils.go @@ -68,11 +68,11 @@ func parseClusterNodeLine(line string) (*ClusterNode, error) { parts := strings.Split(fields[i], "-") start, err := strconv.Atoi(parts[0]) if err != nil { - return nil, fmt.Errorf("failed to convert string %v: line: %s", err, line) + return nil, fmt.Errorf("failed to convert string %w: line: %s", err, line) } end, err := strconv.Atoi(parts[1]) if err != nil { - return nil, fmt.Errorf("failed to convert string %v: line: %s", err, line) + return nil, fmt.Errorf("failed to convert string %w: line: %s", err, line) } slotRange := &ClusterSlotRange{ Start: start, @@ -82,7 +82,7 @@ func parseClusterNodeLine(line string) (*ClusterNode, error) { } else { start, err := strconv.Atoi(fields[i]) if err != nil { - return nil, fmt.Errorf("failed to convert string %v: line: %s", err, line) + return nil, fmt.Errorf("failed to convert string %w: line: %s", err, line) } end := start slotRange := &ClusterSlotRange{ @@ -298,7 +298,7 @@ func GenerateReshardingPlan(clusterNodesForShard map[int][]*ClusterNode, desired } } - for fromID, _ := range rid { + for 
fromID := range rid { if rid[fromID] == 0 { continue } diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 95ff2ef..2e07c33 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -85,7 +85,7 @@ type ValkeyClusterReconciler struct { // +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;delete // +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=pods/exec,verbs=get;list;watch;create;update;patch;delete @@ -229,7 +229,10 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques res, err := r.reconcileStatefulSets(ctx, req, valkeyCluster) if err != nil { log.Error(err, "Failed to reconcile statefulsets") - return *res, err + return ctrl.Result{}, err + } + if res != nil { + return *res, nil } // wait for all pods to be running and accessible via valkey-client @@ -307,6 +310,7 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques } r.Recorder.Event(valkeyCluster, "Normal", "Deleted", fmt.Sprintf("StatefulSet %s/%s is deleted", lastSts.Namespace, lastSts.Name)) + return ctrl.Result{Requeue: true}, nil } } @@ -360,7 +364,7 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques clusterNodes, err := r.buildClusterNodes(ctx, valkeyCluster) if err != nil { - return ctrl.Result{}, fmt.Errorf("Could not build 
cluster nodes: %v", err) + return ctrl.Result{}, fmt.Errorf("Could not build cluster nodes: %w", err) } // cluster meet @@ -689,17 +693,15 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques } for _, cn := range clusterNodes { if !cn.IsMaster() { - for masterNode, _ := range clusterNodesMap { + for masterNode := range clusterNodesMap { if masterNode.ID == cn.MasterNodeID { clusterNodesMap[masterNode] = append(clusterNodesMap[masterNode], cn) } } } } - isAvailable := true - if len(clusterNodesMap) != int(valkeyCluster.Spec.Shards) { - isAvailable = false - } + isAvailable := len(clusterNodesMap) == int(valkeyCluster.Spec.Shards) + for _, v := range clusterNodesMap { if len(v) != int(valkeyCluster.Spec.Replicas) { isAvailable = false @@ -750,9 +752,100 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques } } + // Drive the rolling update one pod at a time, gated on cluster health. + res, err = r.performRollingUpdate(ctx, valkeyCluster) + if err != nil { + log.Error(err, "Failed to perform rolling update") + return ctrl.Result{}, err + } + if res != nil { + return *res, nil + } + return ctrl.Result{}, nil } +// isValkeyClusterHealthy returns true only when every pod in the cluster is Running and Ready +// and CLUSTER INFO on a reachable node reports cluster_state:ok. +func (r *ValkeyClusterReconciler) isValkeyClusterHealthy(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster) (bool, error) { + logger := log.FromContext(ctx) + + podList := &corev1.PodList{} + listOpts := []client.ListOption{ + client.InNamespace(valkeyCluster.Namespace), + client.MatchingLabels(labelsForValkeyCluster(valkeyCluster.Name)), + } + if err := r.List(ctx, podList, listOpts...); err != nil { + return false, err + } + if len(podList.Items) == 0 { + logger.Info("No pods found, cluster not healthy") + return false, nil + } + + // Every pod must be Running and Ready. 
Any terminating pod means a previous + // deletion hasn't fully settled yet — wait for it to complete before + // scheduling another deletion. + for _, pod := range podList.Items { + if pod.DeletionTimestamp != nil { + logger.Info("Cluster unhealthy: pod is terminating", "pod", pod.Name) + return false, nil + } + if pod.Status.Phase != corev1.PodRunning { + logger.Info("Cluster unhealthy: pod not running", "pod", pod.Name, "phase", string(pod.Status.Phase)) + return false, nil + } + podReady := false + for _, cond := range pod.Status.Conditions { + if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue { + podReady = true + } + } + if !podReady { + logger.Info("Cluster unhealthy: pod not ready", "pod", pod.Name) + return false, nil + } + } + + // Confirm the Valkey cluster itself reports a healthy state. + for _, pod := range podList.Items { + if pod.Status.PodIP == "" { + continue + } + valkeyClient, err := r.NewValkeyClient(ctx, valkeyCluster, pod.Status.PodIP, VALKEY_PORT) + if err != nil { + logger.Error(err, "Failed to create Valkey client for cluster health check", "pod", pod.Name) + return false, err + } + defer valkeyClient.Close() + + clusterInfoTxt, err := valkeyClient.Do(ctx, valkeyClient.B().ClusterInfo().Build()).ToString() + if err != nil { + logger.Error(err, "Failed to get CLUSTER INFO for health check", "pod", pod.Name) + return false, err + } + + for _, line := range strings.Split(clusterInfoTxt, "\n") { + line = strings.TrimSpace(line) + // The valkey-go client may prepend "txt:" to the response + line = strings.TrimPrefix(line, "txt:") + if state, ok := strings.CutPrefix(line, "cluster_state:"); ok { + state = strings.TrimSpace(state) + if state != "ok" { + logger.Info("Valkey cluster state is not ok", "pod", pod.Name, "state", state) + return false, nil + } + return true, nil + } + } + // Only need to check one pod. 
+ break + } + + logger.Info("Could not determine cluster_state from CLUSTER INFO") + return false, nil +} + func (r *ValkeyClusterReconciler) waitForPodsToBeAccessibleViaValkey(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster) (*ctrl.Result, error) { logger := log.FromContext(ctx) podList := &corev1.PodList{} @@ -765,9 +858,13 @@ func (r *ValkeyClusterReconciler) waitForPodsToBeAccessibleViaValkey(ctx context return nil, err } for _, pod := range podList.Items { + if pod.DeletionTimestamp != nil { + logger.Info("Skipping pod - being deleted", "Pod.Name", pod.Name) + continue + } if pod.Status.Phase != corev1.PodRunning { logger.Info("Pod not running", "Pod.Name", pod.Name, "Pod.Status", pod.Status.Phase) - return &ctrl.Result{RequeueAfter: time.Minute}, nil + return &ctrl.Result{RequeueAfter: 15 * time.Second}, nil } logger.Info("Pod running", "Pod.Name", pod.Name, "Pod.Status", pod.Status.Phase) @@ -816,7 +913,7 @@ func (r *ValkeyClusterReconciler) reconcileValkeySlots(ctx context.Context, valk clusterNodesForShard, err := r.buildClusterNodesForShard(ctx, valkeyCluster) if err != nil { logger.Error(err, "Failed to build cluster nodes for shard") - return result, fmt.Errorf("Failed to build cluster nodes for shard: %v", err) + return result, fmt.Errorf("Failed to build cluster nodes for shard: %w", err) } logger.Info("Built cluster nodes for shard", @@ -851,7 +948,7 @@ func (r *ValkeyClusterReconciler) reconcileValkeySlots(ctx context.Context, valk logger.Error(err, "Failed to generate resharding plan", "shardCount", len(clusterNodesForShard), "expectedShards", valkeyCluster.Spec.Shards) - return result, fmt.Errorf("Failed to build action plan: %v", err) + return result, fmt.Errorf("Failed to build action plan: %w", err) } logger.Info("Generated resharding plan", @@ -986,10 +1083,8 @@ func (r *ValkeyClusterReconciler) updateClusterNodesStatus(ctx context.Context, }) } - needsUpdate := false - if len(valkeyCluster.Status.ClusterNodes) != 
len(clusterNodesStatus) { - needsUpdate = true - } + needsUpdate := len(valkeyCluster.Status.ClusterNodes) != len(clusterNodesStatus) + if !reflect.DeepEqual(valkeyCluster.Status.ClusterNodes, clusterNodesStatus) { needsUpdate = true } @@ -997,9 +1092,7 @@ func (r *ValkeyClusterReconciler) updateClusterNodesStatus(ctx context.Context, if needsUpdate { valkeyCluster.Status.ClusterNodes = make(map[string][]cachev1alpha1.ValkeyClusterNode) for k, v := range clusterNodesStatus { - for _, n := range v { - valkeyCluster.Status.ClusterNodes[k] = append(valkeyCluster.Status.ClusterNodes[k], n) - } + valkeyCluster.Status.ClusterNodes[k] = append(valkeyCluster.Status.ClusterNodes[k], v...) } valkeyCluster.Status.ClusterNodes = clusterNodesStatus if err := r.Status().Update(ctx, valkeyCluster); err != nil { @@ -1041,7 +1134,7 @@ func (r *ValkeyClusterReconciler) buildClusterNodesForShard(ctx context.Context, logger.Error(err, "Failed to parse shard index from pod name", "podName", cn.Pod, "shardIdxStr", matches[0][1]) - return nil, fmt.Errorf("could not parse shard index from pod %s: %v", cn.Pod, err) + return nil, fmt.Errorf("could not parse shard index from pod %s: %w", cn.Pod, err) } if _, ok := clusterNodesForShard[shardIdx]; !ok { clusterNodesForShard[shardIdx] = make([]*internalValkey.ClusterNode, 0) @@ -1084,6 +1177,12 @@ func (r *ValkeyClusterReconciler) buildClusterNodes(ctx context.Context, valkeyC "podCount", len(podList.Items)) for _, pod := range podList.Items { + if pod.DeletionTimestamp != nil { + logger.Info("Skipping pod - being deleted", + "podName", pod.Name) + continue + } + logger.Info("Processing pod", "podName", pod.Name, "podPhase", pod.Status.Phase, diff --git a/internal/controller/valkeycluster_controller_configmap.go b/internal/controller/valkeycluster_controller_configmap.go index 0631518..cbd0c1e 100644 --- a/internal/controller/valkeycluster_controller_configmap.go +++ b/internal/controller/valkeycluster_controller_configmap.go @@ -49,7 +49,7 @@ 
func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, valkeyClu } valkeyConfContent, err := getValkeyConfigContent(valkeyCluster) if err != nil { - return "", fmt.Errorf("failed to read valkey config content: %v", err) + return "", fmt.Errorf("failed to read valkey config content: %w", err) } valkeyConfHash := fmt.Sprintf("%x", sha256.Sum256([]byte(valkeyConfContent))) cmData["valkey.conf"] = valkeyConfContent diff --git a/internal/controller/valkeycluster_controller_job.go b/internal/controller/valkeycluster_controller_job.go index 76390e4..96a2876 100644 --- a/internal/controller/valkeycluster_controller_job.go +++ b/internal/controller/valkeycluster_controller_job.go @@ -3,6 +3,7 @@ package controller import ( "context" "fmt" + "strings" "github.com/go-logr/logr" cachev1alpha1 "github.com/halter/valkey-cluster-operator/api/v1alpha1" @@ -44,7 +45,7 @@ func (m *ValkeyJobManager) getTargetPodAddress(ctx context.Context, valkeyCluste }, pod) if err != nil { logger.Error(err, "Failed to get target pod", "podName", podName) - return "", fmt.Errorf("Failed to get target pod %s: %v", podName, err) + return "", fmt.Errorf("Failed to get target pod %s: %w", podName, err) } logger.Info("Found target pod", @@ -77,7 +78,7 @@ func (m *ValkeyJobManager) ReshardSlots(ctx context.Context, valkeyCluster *cach targetAddress, err := m.getTargetPodAddress(ctx, valkeyCluster) if err != nil { - return fmt.Errorf("Failed to get target pod address: %v", err) + return fmt.Errorf("Failed to get target pod address: %w", err) } args := []string{ @@ -97,7 +98,7 @@ func (m *ValkeyJobManager) ReshardSlots(ctx context.Context, valkeyCluster *cach "slotCount", slotCount, "stdout", stdout, "stderr", stderr) - return fmt.Errorf("Failed to reshard %d slots from %s to %s: %v", slotCount, fromNodeID, toNodeID, err) + return fmt.Errorf("Failed to reshard %d slots from %s to %s: %w", slotCount, fromNodeID, toNodeID, err) } logger.Info("Successfully resharded slots via Job", @@ -117,7 
+118,7 @@ func (m *ValkeyJobManager) CheckCluster(ctx context.Context, valkeyCluster *cach targetAddress, err := m.getTargetPodAddress(ctx, valkeyCluster) if err != nil { - return "", fmt.Errorf("Failed to get target pod address: %v", err) + return "", fmt.Errorf("Failed to get target pod address: %w", err) } args := []string{ @@ -144,7 +145,7 @@ func (m *ValkeyJobManager) FixClusterSlots(ctx context.Context, valkeyCluster *c targetAddress, err := m.getTargetPodAddress(ctx, valkeyCluster) if err != nil { - return fmt.Errorf("Failed to get target pod address: %v", err) + return fmt.Errorf("Failed to get target pod address: %w", err) } args := []string{ @@ -178,7 +179,7 @@ func (m *ValkeyJobManager) FixClusterSlots(ctx context.Context, valkeyCluster *c } if hasOpenSlots(checkStdout) { - return fmt.Errorf("Failed to fix open slots in cluster: stdout: %s, stderr: %s, original error: %v", stdout, stderr, err) + return fmt.Errorf("Failed to fix open slots in cluster: stdout: %s, stderr: %s, original error: %w", stdout, stderr, err) } logger.Info("Successfully fixed stuck slots via Job") @@ -195,7 +196,7 @@ func (m *ValkeyJobManager) DeleteNode(ctx context.Context, valkeyCluster *cachev targetAddress, err := m.getTargetPodAddress(ctx, valkeyCluster) if err != nil { - return fmt.Errorf("Failed to get target pod address: %v", err) + return fmt.Errorf("Failed to get target pod address: %w", err) } args := []string{ @@ -207,11 +208,17 @@ func (m *ValkeyJobManager) DeleteNode(ctx context.Context, valkeyCluster *cachev stdout, stderr, err := m.reconciler.executeValkeyCliJob(ctx, valkeyCluster, args) if err != nil { + // Treat "No such node ID" as success - the node is already removed + if strings.Contains(stdout, "No such node ID") || strings.Contains(stderr, "No such node ID") { + logger.Info("Node already removed from cluster (No such node ID), treating as success", + "nodeID", nodeID) + return nil + } logger.Error(err, "Failed to delete node from cluster", "nodeID", nodeID, 
"stdout", stdout, "stderr", stderr) - return fmt.Errorf("Failed to delete node %s from cluster: %v", nodeID, err) + return fmt.Errorf("Failed to delete node %s from cluster: %w", nodeID, err) } logger.Info("Successfully deleted node from cluster via Job", diff --git a/internal/controller/valkeycluster_controller_statefulset.go b/internal/controller/valkeycluster_controller_statefulset.go index 4bd4589..79629af 100644 --- a/internal/controller/valkeycluster_controller_statefulset.go +++ b/internal/controller/valkeycluster_controller_statefulset.go @@ -3,6 +3,7 @@ package controller import ( "context" "fmt" + "sort" "time" "github.com/google/go-cmp/cmp" @@ -16,6 +17,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -95,6 +97,9 @@ func (r *ValkeyClusterReconciler) statefulSet(name string, size int32, valkeyClu }, MinReadySeconds: valkeyCluster.Spec.MinReadySeconds, PodManagementPolicy: appsv1.ParallelPodManagement, // TODO: reconcile statefulsets that do not have this option set + UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ + Type: appsv1.OnDeleteStatefulSetStrategyType, + }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: ls, @@ -337,16 +342,6 @@ func (r *ValkeyClusterReconciler) reconcileStatefulSets(ctx context.Context, req return &ctrl.Result{}, err } - // check if any update is occuring for stateful set, if so re-schedule reconcile - found, err = r.findStatefulsetByName(ctx, valkeyCluster.Namespace, stsName) - if err != nil { - log.Error(err, fmt.Sprintf("Failed to get StatefulSet %s/%s", valkeyCluster.Namespace, stsName)) - return &ctrl.Result{}, err - } - if found.Status.CurrentRevision != found.Status.UpdateRevision { - return &ctrl.Result{RequeueAfter: 15 * time.Second}, nil - } - // update containers[0].Command if there is a difference // Update if there is a 
difference in the following attributes: // Command @@ -544,6 +539,11 @@ func (r *ValkeyClusterReconciler) compareActualToDesiredStatefulSet(ctx context. diff = true } + if actual.Spec.UpdateStrategy.Type != desired.Spec.UpdateStrategy.Type { + log.Info(fmt.Sprintf("StatefulSet %s UpdateStrategy.Type is different: actual=%s desired=%s", stsName, actual.Spec.UpdateStrategy.Type, desired.Spec.UpdateStrategy.Type)) + diff = true + } + return diff, nil } @@ -559,6 +559,108 @@ func (r *ValkeyClusterReconciler) applyDesiredStatefulSetSpec(valkeyCluster *cac ss.Spec.Template.Spec.Containers[1].Args = desired.Spec.Template.Spec.Containers[1].Args ss.Spec.Template.Spec.Containers[1].Image = desired.Spec.Template.Spec.Containers[1].Image + + ss.Spec.UpdateStrategy = desired.Spec.UpdateStrategy return ss } } + +// performRollingUpdate identifies pods that are on an outdated StatefulSet revision and deletes +// exactly one such pod per reconcile, only after verifying the Valkey cluster is fully healthy. +// Because the StatefulSets use OnDelete update strategy, Kubernetes will not restart pods +// automatically; this method drives the rollout. Shards are handled in StatefulSet name order and +// the method returns after the first deletion, so a shard is finished before the next is touched. +func (r *ValkeyClusterReconciler) performRollingUpdate(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster) (*ctrl.Result, error) { + logger := log.FromContext(ctx) + + stsList := &appsv1.StatefulSetList{} + listOpts := []client.ListOption{ + client.InNamespace(valkeyCluster.Namespace), + client.MatchingLabels(labelsForValkeyCluster(valkeyCluster.Name)), + } + if err := r.List(ctx, stsList, listOpts...); err != nil { + return nil, err + } + + // Process shards in a deterministic order so that shard N is fully updated + // before shard N+1 is touched. This avoids simultaneous disruption across + // multiple shards. 
+ sort.Slice(stsList.Items, func(i, j int) bool { + return stsList.Items[i].Name < stsList.Items[j].Name + }) + + for _, sts := range stsList.Items { + if sts.Status.UpdateRevision == "" || sts.Status.UpdateRevision == sts.Status.CurrentRevision { + continue + } + + podList := &corev1.PodList{} + podListOpts := []client.ListOption{ + client.InNamespace(valkeyCluster.Namespace), + client.MatchingLabels(map[string]string{ + "statefulset.kubernetes.io/sts-name": sts.Name, + "cache/name": valkeyCluster.Name, + }), + } + if err := r.List(ctx, podList, podListOpts...); err != nil { + return nil, err + } + + var podsNeedingUpdate []corev1.Pod + for _, pod := range podList.Items { + // Skip pods that are already being deleted — they will be recreated by + // the StatefulSet controller with the new template. Counting them here + // would cause a second deletion to be scheduled before the first one + // completes, leading to cascade deletions. + if pod.DeletionTimestamp != nil { + continue + } + if pod.Labels["controller-revision-hash"] != sts.Status.UpdateRevision { + podsNeedingUpdate = append(podsNeedingUpdate, pod) + } + } + if len(podsNeedingUpdate) == 0 { + continue + } + + // Gate each pod deletion on cluster health. All pods must be Running/Ready + // and the cluster must report cluster_state:ok before we proceed. 
+ healthy, err := r.isValkeyClusterHealthy(ctx, valkeyCluster) + if err != nil { + logger.Error(err, "Failed to check Valkey cluster health before rolling update, will retry") + return &ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } + if !healthy { + logger.Info("Valkey cluster is not healthy, deferring rolling update", + "shard", sts.Name, + "podsNeedingUpdate", len(podsNeedingUpdate), + ) + r.Recorder.Event(valkeyCluster, "Warning", "RollingUpdate", + fmt.Sprintf("Cluster not healthy, deferring rolling update of shard %s (%d pods pending)", sts.Name, len(podsNeedingUpdate))) + return &ctrl.Result{RequeueAfter: 30 * time.Second}, nil + } + + // Delete replicas before masters: higher ordinal name = replica, lower = master. + sort.Slice(podsNeedingUpdate, func(i, j int) bool { + return podsNeedingUpdate[i].Name > podsNeedingUpdate[j].Name + }) + podToDelete := podsNeedingUpdate[0] + logger.Info("Deleting pod for rolling update", + "pod", podToDelete.Name, + "shard", sts.Name, + "remainingAfterDeletion", len(podsNeedingUpdate)-1, + ) + if err := r.Delete(ctx, &podToDelete); err != nil && !apierrors.IsNotFound(err) { + logger.Error(err, "Failed to delete pod for rolling update", "pod", podToDelete.Name) + return nil, err + } + r.Recorder.Event(valkeyCluster, "Normal", "RollingUpdate", + fmt.Sprintf("Deleted pod %s for rolling update, %d pods remaining in shard %s", podToDelete.Name, len(podsNeedingUpdate)-1, sts.Name)) + + // Return and requeue — we only ever delete one pod per reconcile cycle, + // and we complete one shard before advancing to the next. 
+ return &ctrl.Result{RequeueAfter: 15 * time.Second}, nil + } + + return nil, nil +} diff --git a/internal/controller/valkeycluster_controller_valkeycli.go b/internal/controller/valkeycluster_controller_valkeycli.go index 662b804..7b6c912 100644 --- a/internal/controller/valkeycluster_controller_valkeycli.go +++ b/internal/controller/valkeycluster_controller_valkeycli.go @@ -36,7 +36,7 @@ func (r *ValkeyClusterReconciler) executeValkeyCli(ctx context.Context, valkeyCl }, runtime.NewParameterCodec(r.Scheme)) exec, err := remotecommand.NewSPDYExecutor(r.RestConfig, "POST", req.URL()) if err != nil { - return "", "", fmt.Errorf("Failed to execute valkey-cli %s: %v", strings.Join(args, " "), err) + return "", "", fmt.Errorf("Failed to execute valkey-cli %s: %w", strings.Join(args, " "), err) } var stdout, stderr bytes.Buffer err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ @@ -47,7 +47,7 @@ func (r *ValkeyClusterReconciler) executeValkeyCli(ctx context.Context, valkeyCl stdoutStr := stdout.String() stderrStr := stderr.String() if err != nil { - return stdoutStr, stderrStr, fmt.Errorf("Failed executing command 'valkey-cli %s': stdout: %s, stderr: %s, err: %v", strings.Join(args, " "), stdoutStr, stderrStr, err) + return stdoutStr, stderrStr, fmt.Errorf("Failed executing command 'valkey-cli %s': stdout: %s, stderr: %s, err: %w", strings.Join(args, " "), stdoutStr, stderrStr, err) } return stdoutStr, stderrStr, nil } diff --git a/internal/controller/valkeycluster_controller_valkeycli_job.go b/internal/controller/valkeycluster_controller_valkeycli_job.go index 4a6ea6a..0279272 100644 --- a/internal/controller/valkeycluster_controller_valkeycli_job.go +++ b/internal/controller/valkeycluster_controller_valkeycli_job.go @@ -36,7 +36,7 @@ func (r *ValkeyClusterReconciler) executeValkeyCliJob(ctx context.Context, valke "args", args) if err := r.Create(ctx, job); err != nil { - return "", "", fmt.Errorf("Failed to create valkey-cli Job: %v", err) + return "", "", 
fmt.Errorf("Failed to create valkey-cli Job: %w", err) } // Wait for Job to complete @@ -135,7 +135,13 @@ func (r *ValkeyClusterReconciler) waitForJobCompletion(ctx context.Context, name job := &batchv1.Job{} err := r.Get(ctx, types.NamespacedName{Name: jobName, Namespace: namespace}, job) if err != nil { - return "", "", fmt.Errorf("Failed to get Job %s: %v", jobName, err) + if apierrors.IsNotFound(err) { + // Cache may not have synced yet after Job creation, retry + logger.Info("Job not found in cache yet, retrying", "jobName", jobName) + time.Sleep(pollInterval) + continue + } + return "", "", fmt.Errorf("Failed to get Job %s: %w", jobName, err) } // Check if Job completed @@ -167,7 +173,7 @@ func (r *ValkeyClusterReconciler) getJobLogs(ctx context.Context, namespace, job }) if err != nil { logger.Error(err, "Failed to list pods for Job", "jobName", jobName) - return "", "", fmt.Errorf("Failed to list pods for Job %s: %v", jobName, err) + return "", "", fmt.Errorf("Failed to list pods for Job %s: %w", jobName, err) } if len(podList.Items) == 0 { @@ -195,7 +201,7 @@ func (r *ValkeyClusterReconciler) getJobLogs(ctx context.Context, namespace, job logs, err := req.Stream(ctx) if err != nil { logger.Error(err, "Failed to get log stream from pod", "podName", pod.Name) - return "", "", fmt.Errorf("Failed to get logs from pod %s: %v", pod.Name, err) + return "", "", fmt.Errorf("Failed to get logs from pod %s: %w", pod.Name, err) } defer logs.Close() @@ -204,7 +210,7 @@ func (r *ValkeyClusterReconciler) getJobLogs(ctx context.Context, namespace, job _, err = io.Copy(buf, logs) if err != nil { logger.Error(err, "Failed to read logs from pod", "podName", pod.Name) - return "", "", fmt.Errorf("Failed to read logs from pod %s: %v", pod.Name, err) + return "", "", fmt.Errorf("Failed to read logs from pod %s: %w", pod.Name, err) } output := buf.String() @@ -236,7 +242,7 @@ func (r *ValkeyClusterReconciler) deleteJob(ctx context.Context, namespace, jobN } if err := 
r.Delete(ctx, job, deleteOptions); err != nil { - return fmt.Errorf("Failed to delete Job %s: %v", jobName, err) + return fmt.Errorf("Failed to delete Job %s: %w", jobName, err) } logger.Info("Successfully deleted Job", "jobName", jobName) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 85faf2f..01e6ee4 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -180,7 +180,7 @@ var _ = Describe("controller", Ordered, func() { ) _, err := utils.Run(cmd) ExpectWithOffset(1, err).NotTo(HaveOccurred()) - EventuallyWithOffset(1, verifyClusterState("valkeycluster-sample", 3, 1, ""), 5*time.Minute, 15*time.Second).Should(Succeed()) + EventuallyWithOffset(1, verifyClusterState("valkeycluster-sample", 3, 1, ""), 3*time.Minute, 15*time.Second).Should(Succeed()) }) It("should scale down", func() { cmd := exec.Command("kubectl", @@ -191,10 +191,7 @@ var _ = Describe("controller", Ordered, func() { ) _, err := utils.Run(cmd) ExpectWithOffset(1, err).NotTo(HaveOccurred()) - // Scale down requires resharding slots away from the removed shard. - // Each resharding step runs as a K8s Job with up to 5min timeout. - // Scale 3->2 shards needs 2 resharding steps, so allow 12 minutes. 
- EventuallyWithOffset(1, verifyClusterState("valkeycluster-sample", 2, 1, ""), 12*time.Minute, 15*time.Second).Should(Succeed()) + EventuallyWithOffset(1, verifyClusterState("valkeycluster-sample", 2, 1, ""), 5*time.Minute, 15*time.Second).Should(Succeed()) getPods := func() error { cmd = exec.Command("kubectl", "get", "pods", "-l", fmt.Sprintf("cache/name=%s,app.kubernetes.io/name=valkeyCluster-operator,app.kubernetes.io/managed-by=ValkeyClusterController", "valkeycluster-sample"), @@ -268,6 +265,11 @@ var _ = Describe("controller", Ordered, func() { ExpectWithOffset(2, err).NotTo(HaveOccurred()) podNames := utils.GetNonEmptyLines(string(podOutput)) + // At this point: 2 shards x (1 master + 2 replicas) = 6 pods + if len(podNames) != 6 { + return fmt.Errorf("expected 6 pods, got %d", len(podNames)) + } + for _, podName := range podNames { cmd = exec.Command("kubectl", "get", "pod", podName, "-o", "jsonpath={.spec.containers[0].resources}", @@ -306,7 +308,7 @@ var _ = Describe("controller", Ordered, func() { } return nil } - EventuallyWithOffset(1, verifyPodResources, 10*time.Minute, 15*time.Second).Should(Succeed()) + EventuallyWithOffset(1, verifyPodResources, 5*time.Minute, 15*time.Second).Should(Succeed()) }) It("change minReadySeconds", func() { verifyClusterMinReadySeconds := func() error { @@ -672,10 +674,10 @@ spec: ExpectWithOffset(1, err).NotTo(HaveOccurred()) By("waiting for rolling update to complete") - EventuallyWithOffset(1, verifyClusterState(upgradeClusterName, 2, 1, ""), 10*time.Minute, 15*time.Second).Should(Succeed()) + EventuallyWithOffset(1, verifyClusterState(upgradeClusterName, 2, 1, ""), 5*time.Minute, 15*time.Second).Should(Succeed()) By("verifying cluster is running version 9.0.1") - EventuallyWithOffset(1, verifyClusterVersion(upgradeClusterName, "9.0.1"), 2*time.Minute, 5*time.Second).Should(Succeed()) + EventuallyWithOffset(1, verifyClusterVersion(upgradeClusterName, "9.0.1"), 5*time.Minute, 15*time.Second).Should(Succeed()) 
By("verifying test data survived the upgrade") cmd = exec.Command("kubectl", "-n", namespace, "exec", upgradeClusterName+"-0-0", "-c", "valkey-cluster-node", "--", @@ -700,7 +702,7 @@ func verifyClusterState(name string, shards, replicas int, password string) func ) podOutput, err := utils.Run(cmd) if err != nil { - return fmt.Errorf("recieved error getting pods: %v", err) + return fmt.Errorf("recieved error getting pods: %w", err) } podNames := utils.GetNonEmptyLines(string(podOutput)) if len(podNames) != shards+shards*replicas { @@ -715,7 +717,7 @@ func verifyClusterState(name string, shards, replicas int, password string) func ) status, err := utils.Run(cmd) if err != nil { - return fmt.Errorf("received error getting pod phase: %v", err) + return fmt.Errorf("received error getting pod phase: %w", err) } if string(status) != "Running" { return fmt.Errorf("valkey node pod (%s) in %s status", podName, status) @@ -731,7 +733,7 @@ func verifyClusterState(name string, shards, replicas int, password string) func } clusterNodesTxt, _, err := utils.RunWithSplitOutput(cmd) if err != nil { - return fmt.Errorf("received error running kubectl exec: %v", err) + return fmt.Errorf("received error running kubectl exec: %w", err) } if len(clusterNodesTxt) <= 0 { @@ -740,14 +742,14 @@ func verifyClusterState(name string, shards, replicas int, password string) func cn, err := valkey.ParseClusterNode(string(clusterNodesTxt)) if err != nil { - return fmt.Errorf("received error parsing cluster node: %v", err) + return fmt.Errorf("received error parsing cluster node: %w", err) } cn.Pod = podName clusterNodes = append(clusterNodes, cn) clusterNodesAll, err := valkey.ParseClusterNodes(strings.TrimSpace(string(clusterNodesTxt))) if err != nil { - return fmt.Errorf("received error parsing cluster nodes: %v", err) + return fmt.Errorf("received error parsing cluster nodes: %w", err) } clusterNodesByPod[podName] = clusterNodesAll } @@ -816,7 +818,7 @@ func verifyClusterState(name string, 
shards, replicas int, password string) func } infoReplication, _, err := utils.RunWithSplitOutput(cmd) if err != nil { - return fmt.Errorf("received error running kubectl exec: %v", err) + return fmt.Errorf("received error running kubectl exec: %w", err) } re := regexp.MustCompile(`connected_slaves:(\d+)`) matches := re.FindStringSubmatch(string(infoReplication)) @@ -847,10 +849,10 @@ func verifyClusterState(name string, shards, replicas int, password string) func cmd = exec.Command("kubectl", "-n", namespace, "exec", podName, "-c", "valkey-exporter", "--", "wget", "-qO-", "localhost:9121/metrics") metricsOutput, _, err := utils.RunWithSplitOutput(cmd) if err != nil { - return fmt.Errorf("received error running kubectl exec: %v", err) + return fmt.Errorf("received error running kubectl exec: %w", err) } - if !(strings.Contains(string(metricsOutput), "redis_up 1") && strings.Contains(string(metricsOutput), "redis_cluster_slots_pfail 0")) { + if !strings.Contains(string(metricsOutput), "redis_up 1") || !strings.Contains(string(metricsOutput), "redis_cluster_slots_pfail 0") { return fmt.Errorf("expected metrics output to contain 'redis_up 1' && 'redis_cluster_slots_pfail 0' but got: %s", string(metricsOutput)) } } @@ -871,7 +873,7 @@ func verifyClusterVersion(name string, expectedVersion string) func() error { ) podOutput, err := utils.Run(cmd) if err != nil { - return fmt.Errorf("received error getting pods: %v", err) + return fmt.Errorf("received error getting pods: %w", err) } podNames := utils.GetNonEmptyLines(string(podOutput)) if len(podNames) == 0 { @@ -883,7 +885,7 @@ func verifyClusterVersion(name string, expectedVersion string) func() error { "valkey-cli", "INFO", "server") infoOutput, err := utils.Run(cmd) if err != nil { - return fmt.Errorf("received error running valkey-cli INFO: %v", err) + return fmt.Errorf("received error running valkey-cli INFO: %w", err) } re := regexp.MustCompile(`valkey_version:(\S+)`) diff --git a/test/utils/utils.go 
b/test/utils/utils.go index b1d715b..7085887 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -61,7 +61,7 @@ func Run(cmd *exec.Cmd) ([]byte, error) { _, _ = fmt.Fprintf(GinkgoWriter, "running: %s\n", command) output, err := cmd.CombinedOutput() if err != nil { - return output, fmt.Errorf("%s failed with error: (%v) %s", command, err, string(output)) + return output, fmt.Errorf("%s failed with error: (%w) %s", command, err, string(output)) } return output, nil @@ -87,7 +87,7 @@ func RunWithSplitOutput(cmd *exec.Cmd) ([]byte, []byte, error) { stdoutBytes := stdout.Bytes() stderrBytes := stderr.Bytes() if err != nil { - return stdoutBytes, stderrBytes, fmt.Errorf("%s failed with error: (%v) %s", command, err, string(stdoutBytes)) + return stdoutBytes, stderrBytes, fmt.Errorf("%s failed with error: (%w) %s", command, err, string(stdoutBytes)) } return stdout.Bytes(), stderr.Bytes(), nil @@ -162,6 +162,6 @@ func GetProjectDir() (string, error) { if err != nil { return wd, err } - wd = strings.Replace(wd, "/test/e2e", "", -1) + wd = strings.ReplaceAll(wd, "/test/e2e", "") return wd, nil }