diff --git a/internal/controller/valkey/utils.go b/internal/controller/valkey/utils.go index 4796899..1e6ec01 100644 --- a/internal/controller/valkey/utils.go +++ b/internal/controller/valkey/utils.go @@ -331,6 +331,52 @@ func GenerateReshardingPlan(clusterNodesForShard map[int][]*ClusterNode, desired return actionPlan, nil } +// ReplicationInfo holds parsed fields from INFO REPLICATION output. +type ReplicationInfo struct { + Role string + MasterLinkStatus string + MasterSyncInProgress int + MasterReplOffset int64 + SlaveReplOffset int64 +} + +// ParseInfoReplication parses the output of INFO REPLICATION into a ReplicationInfo struct. +// The valkey-go client may prepend "txt:" to the response and uses \r\n line endings. +func ParseInfoReplication(infoTxt string) ReplicationInfo { + info := ReplicationInfo{} + // The valkey-go client may prepend "txt:" to the response + infoTxt = strings.TrimPrefix(infoTxt, "txt:") + for _, line := range strings.Split(infoTxt, "\n") { + line = strings.TrimSpace(line) + line = strings.TrimSuffix(line, "\r") + if line == "" || strings.HasPrefix(line, "#") { + continue + } + parts := strings.SplitN(line, ":", 2) + if len(parts) != 2 { + continue + } + key := strings.TrimSpace(parts[0]) + val := strings.TrimSpace(parts[1]) + switch key { + case "role": + info.Role = val + case "master_link_status": + info.MasterLinkStatus = val + case "master_sync_in_progress": + n, _ := strconv.Atoi(val) + info.MasterSyncInProgress = n + case "master_repl_offset": + n, _ := strconv.ParseInt(val, 10, 64) + info.MasterReplOffset = n + case "slave_repl_offset": + n, _ := strconv.ParseInt(val, 10, 64) + info.SlaveReplOffset = n + } + } + return info +} + func TcpCheck(host, port string) bool { // check tcp port conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), 2*time.Second) diff --git a/internal/controller/valkey/utils_test.go b/internal/controller/valkey/utils_test.go index e011f49..91a86c7 100644 --- a/internal/controller/valkey/utils_test.go +++ b/internal/controller/valkey/utils_test.go @@ -379,3 +379,127 @@ func TestGenerateReshardingPlan(t *testing.T) { assert.Equal(t, tt.plan, actual) } } + +func TestParseInfoReplication(t *testing.T) { + testcases := []struct { + name string + in string + out ReplicationInfo + }{ + { + name: "master node", + in: `# Replication +role:master +connected_slaves:2 +master_replid:abc123 +master_repl_offset:123456 +`, + out: ReplicationInfo{ + Role: "master", + MasterReplOffset: 123456, + }, + }, + { + name: "fully caught-up replica", + in: `# Replication +role:slave +master_link_status:up +master_sync_in_progress:0 +master_repl_offset:123456 +slave_repl_offset:123456 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "up", + MasterSyncInProgress: 0, + MasterReplOffset: 123456, + SlaveReplOffset: 123456, + }, + }, + { + name: "replica with lag", + in: `# Replication +role:slave +master_link_status:up +master_sync_in_progress:0 +master_repl_offset:200000 +slave_repl_offset:199000 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "up", + MasterSyncInProgress: 0, + MasterReplOffset: 200000, + SlaveReplOffset: 199000, + }, + }, + { + name: "full sync in progress", + in: `# Replication +role:slave +master_link_status:down +master_sync_in_progress:1 +master_repl_offset:-1 +slave_repl_offset:0 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "down", + MasterSyncInProgress: 1, + MasterReplOffset: -1, + SlaveReplOffset: 0, + }, + }, + { + name: "link down without sync", + in: `# Replication +role:slave +master_link_status:down +master_sync_in_progress:0 +master_repl_offset:50000 +slave_repl_offset:50000 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "down", + MasterSyncInProgress: 0, + MasterReplOffset: 50000, + SlaveReplOffset: 50000, + }, + }, + { + name: "txt: prefix from valkey-go client", + in: `txt:# Replication +role:slave +master_link_status:up +master_sync_in_progress:0 +master_repl_offset:999 +slave_repl_offset:999 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "up", + MasterSyncInProgress: 0, + MasterReplOffset: 999, + SlaveReplOffset: 999, + }, + }, + { + name: "carriage return handling", + in: "# Replication\r\nrole:slave\r\nmaster_link_status:up\r\nmaster_sync_in_progress:0\r\nmaster_repl_offset:5000\r\nslave_repl_offset:5000\r\n", + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "up", + MasterSyncInProgress: 0, + MasterReplOffset: 5000, + SlaveReplOffset: 5000, + }, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + actual := ParseInfoReplication(tt.in) + assert.Equal(t, tt.out, actual) + }) + } +} diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 2e07c33..81f1958 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -808,6 +808,7 @@ func (r *ValkeyClusterReconciler) isValkeyClusterHealthy(ctx context.Context, va } // Confirm the Valkey cluster itself reports a healthy state. + clusterStateOK := false for _, pod := range podList.Items { if pod.Status.PodIP == "" { continue @@ -835,15 +836,71 @@ func (r *ValkeyClusterReconciler) isValkeyClusterHealthy(ctx context.Context, va logger.Info("Valkey cluster state is not ok", "pod", pod.Name, "state", state) return false, nil } - return true, nil + clusterStateOK = true + break } } - // Only need to check one pod. + // Only need to check one pod for CLUSTER INFO. break } - logger.Info("Could not determine cluster_state from CLUSTER INFO") - return false, nil + if !clusterStateOK { + logger.Info("Could not determine cluster_state from CLUSTER INFO") + return false, nil + } + + // Verify all replicas have caught up with their masters before declaring healthy. + // This prevents the rolling update from deleting the next pod while a freshly + // restarted replica is still performing a full or partial resync. + const replicationOffsetTolerance int64 = 1024 + for _, pod := range podList.Items { + if pod.Status.PodIP == "" { + continue + } + valkeyClient, err := r.NewValkeyClient(ctx, valkeyCluster, pod.Status.PodIP, VALKEY_PORT) + if err != nil { + logger.Error(err, "Failed to create Valkey client for replication check", "pod", pod.Name) + return false, err + } + defer valkeyClient.Close() + + infoTxt, err := valkeyClient.Do(ctx, valkeyClient.B().Info().Section("replication").Build()).ToString() + if err != nil { + logger.Error(err, "Failed to get INFO REPLICATION", "pod", pod.Name) + return false, err + } + + replInfo := internalValkey.ParseInfoReplication(infoTxt) + if replInfo.Role != "slave" { + continue + } + + if replInfo.MasterLinkStatus != "up" { + logger.Info("Cluster unhealthy: replica master_link_status is not up", + "pod", pod.Name, "master_link_status", replInfo.MasterLinkStatus) + return false, nil + } + if replInfo.MasterSyncInProgress != 0 { + logger.Info("Cluster unhealthy: replica has full sync in progress", + "pod", pod.Name) + return false, nil + } + offsetLag := replInfo.MasterReplOffset - replInfo.SlaveReplOffset + if offsetLag < 0 { + offsetLag = -offsetLag + } + if offsetLag > replicationOffsetTolerance { + logger.Info("Cluster unhealthy: replica replication offset lag exceeds tolerance", + "pod", pod.Name, + "master_repl_offset", replInfo.MasterReplOffset, + "slave_repl_offset", replInfo.SlaveReplOffset, + "lag", offsetLag, + "tolerance", replicationOffsetTolerance) + return false, nil + } + } + + return true, nil } func (r *ValkeyClusterReconciler) waitForPodsToBeAccessibleViaValkey(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster) (*ctrl.Result, error) {