From ced5f65d852fc7eaf8fea39e60049136b831ba9c Mon Sep 17 00:00:00 2001 From: Kurt McAlpine Date: Sun, 22 Feb 2026 19:52:01 +1300 Subject: [PATCH] Verify replica replication catch-up during rolling updates Add ReplicationInfo struct and ParseInfoReplication() function to parse INFO REPLICATION output from Valkey nodes, extracting role, master_link_status, master_sync_in_progress, master_repl_offset, and slave_repl_offset fields. Handles the valkey-go client's "txt:" prefix and \r\n line endings. Enhance isValkeyClusterHealthy() to check that all replicas have caught up with their masters before allowing the next pod deletion during rolling updates. This prevents data loss when the next pod to be deleted is the master of a shard whose replica hasn't finished syncing. For each replica, checks: 1. master_link_status is "up" 2. master_sync_in_progress is 0 3. slave_repl_offset is within 1024 bytes of master_repl_offset If any replica fails these checks, the rolling update is deferred and requeued after 30 seconds. Includes 7 test cases for ParseInfoReplication covering: master node, fully caught-up replica, replica with lag, full sync in progress, link down without sync, txt: prefix handling, and carriage return handling. Co-Authored-By: Claude Opus 4.6 --- internal/controller/valkey/utils.go | 46 +++++++ internal/controller/valkey/utils_test.go | 124 ++++++++++++++++++ .../controller/valkeycluster_controller.go | 65 ++++++++- 3 files changed, 231 insertions(+), 4 deletions(-) diff --git a/internal/controller/valkey/utils.go b/internal/controller/valkey/utils.go index 4796899..1e6ec01 100644 --- a/internal/controller/valkey/utils.go +++ b/internal/controller/valkey/utils.go @@ -331,6 +331,52 @@ func GenerateReshardingPlan(clusterNodesForShard map[int][]*ClusterNode, desired return actionPlan, nil } +// ReplicationInfo holds parsed fields from INFO REPLICATION output. +type ReplicationInfo struct { + Role string + MasterLinkStatus string + MasterSyncInProgress int + MasterReplOffset int64 + SlaveReplOffset int64 +} + +// ParseInfoReplication parses the output of INFO REPLICATION into a ReplicationInfo struct. +// The valkey-go client may prepend "txt:" to the response and uses \r\n line endings. +func ParseInfoReplication(infoTxt string) ReplicationInfo { + info := ReplicationInfo{} + // The valkey-go client may prepend "txt:" to the response + infoTxt = strings.TrimPrefix(infoTxt, "txt:") + for _, line := range strings.Split(infoTxt, "\n") { + line = strings.TrimSpace(line) + line = strings.TrimSuffix(line, "\r") + if line == "" || strings.HasPrefix(line, "#") { + continue + } + parts := strings.SplitN(line, ":", 2) + if len(parts) != 2 { + continue + } + key := strings.TrimSpace(parts[0]) + val := strings.TrimSpace(parts[1]) + switch key { + case "role": + info.Role = val + case "master_link_status": + info.MasterLinkStatus = val + case "master_sync_in_progress": + n, _ := strconv.Atoi(val) + info.MasterSyncInProgress = n + case "master_repl_offset": + n, _ := strconv.ParseInt(val, 10, 64) + info.MasterReplOffset = n + case "slave_repl_offset": + n, _ := strconv.ParseInt(val, 10, 64) + info.SlaveReplOffset = n + } + } + return info +} + func TcpCheck(host, port string) bool { // check tcp port conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), 2*time.Second) diff --git a/internal/controller/valkey/utils_test.go b/internal/controller/valkey/utils_test.go index e011f49..91a86c7 100644 --- a/internal/controller/valkey/utils_test.go +++ b/internal/controller/valkey/utils_test.go @@ -379,3 +379,127 @@ func TestGenerateReshardingPlan(t *testing.T) { assert.Equal(t, tt.plan, actual) } } + +func TestParseInfoReplication(t *testing.T) { + testcases := []struct { + name string + in string + out ReplicationInfo + }{ + { + name: "master node", + in: `# Replication +role:master +connected_slaves:2 +master_replid:abc123 +master_repl_offset:123456 +`, + out: ReplicationInfo{ + Role: "master", + MasterReplOffset: 123456, + }, + }, + { + name: "fully caught-up replica", + in: `# Replication +role:slave +master_link_status:up +master_sync_in_progress:0 +master_repl_offset:123456 +slave_repl_offset:123456 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "up", + MasterSyncInProgress: 0, + MasterReplOffset: 123456, + SlaveReplOffset: 123456, + }, + }, + { + name: "replica with lag", + in: `# Replication +role:slave +master_link_status:up +master_sync_in_progress:0 +master_repl_offset:200000 +slave_repl_offset:199000 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "up", + MasterSyncInProgress: 0, + MasterReplOffset: 200000, + SlaveReplOffset: 199000, + }, + }, + { + name: "full sync in progress", + in: `# Replication +role:slave +master_link_status:down +master_sync_in_progress:1 +master_repl_offset:-1 +slave_repl_offset:0 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "down", + MasterSyncInProgress: 1, + MasterReplOffset: -1, + SlaveReplOffset: 0, + }, + }, + { + name: "link down without sync", + in: `# Replication +role:slave +master_link_status:down +master_sync_in_progress:0 +master_repl_offset:50000 +slave_repl_offset:50000 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "down", + MasterSyncInProgress: 0, + MasterReplOffset: 50000, + SlaveReplOffset: 50000, + }, + }, + { + name: "txt: prefix from valkey-go client", + in: `txt:# Replication +role:slave +master_link_status:up +master_sync_in_progress:0 +master_repl_offset:999 +slave_repl_offset:999 +`, + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "up", + MasterSyncInProgress: 0, + MasterReplOffset: 999, + SlaveReplOffset: 999, + }, + }, + { + name: "carriage return handling", + in: "# Replication\r\nrole:slave\r\nmaster_link_status:up\r\nmaster_sync_in_progress:0\r\nmaster_repl_offset:5000\r\nslave_repl_offset:5000\r\n", + out: ReplicationInfo{ + Role: "slave", + MasterLinkStatus: "up", + MasterSyncInProgress: 0, + MasterReplOffset: 5000, + SlaveReplOffset: 5000, + }, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + actual := ParseInfoReplication(tt.in) + assert.Equal(t, tt.out, actual) + }) + } +} diff --git a/internal/controller/valkeycluster_controller.go b/internal/controller/valkeycluster_controller.go index 2e07c33..81f1958 100644 --- a/internal/controller/valkeycluster_controller.go +++ b/internal/controller/valkeycluster_controller.go @@ -808,6 +808,7 @@ func (r *ValkeyClusterReconciler) isValkeyClusterHealthy(ctx context.Context, va } // Confirm the Valkey cluster itself reports a healthy state. + clusterStateOK := false for _, pod := range podList.Items { if pod.Status.PodIP == "" { continue @@ -835,15 +836,71 @@ func (r *ValkeyClusterReconciler) isValkeyClusterHealthy(ctx context.Context, va logger.Info("Valkey cluster state is not ok", "pod", pod.Name, "state", state) return false, nil } - return true, nil + clusterStateOK = true + break } } - // Only need to check one pod. + // Only need to check one pod for CLUSTER INFO. break } - logger.Info("Could not determine cluster_state from CLUSTER INFO") - return false, nil + if !clusterStateOK { + logger.Info("Could not determine cluster_state from CLUSTER INFO") + return false, nil + } + + // Verify all replicas have caught up with their masters before declaring healthy. + // This prevents the rolling update from deleting the next pod while a freshly + // restarted replica is still performing a full or partial resync. + const replicationOffsetTolerance int64 = 1024 + for _, pod := range podList.Items { + if pod.Status.PodIP == "" { + continue + } + valkeyClient, err := r.NewValkeyClient(ctx, valkeyCluster, pod.Status.PodIP, VALKEY_PORT) + if err != nil { + logger.Error(err, "Failed to create Valkey client for replication check", "pod", pod.Name) + return false, err + } + defer valkeyClient.Close() + + infoTxt, err := valkeyClient.Do(ctx, valkeyClient.B().Info().Section("replication").Build()).ToString() + if err != nil { + logger.Error(err, "Failed to get INFO REPLICATION", "pod", pod.Name) + return false, err + } + + replInfo := internalValkey.ParseInfoReplication(infoTxt) + if replInfo.Role != "slave" { + continue + } + + if replInfo.MasterLinkStatus != "up" { + logger.Info("Cluster unhealthy: replica master_link_status is not up", + "pod", pod.Name, "master_link_status", replInfo.MasterLinkStatus) + return false, nil + } + if replInfo.MasterSyncInProgress != 0 { + logger.Info("Cluster unhealthy: replica has full sync in progress", + "pod", pod.Name) + return false, nil + } + offsetLag := replInfo.MasterReplOffset - replInfo.SlaveReplOffset + if offsetLag < 0 { + offsetLag = -offsetLag + } + if offsetLag > replicationOffsetTolerance { + logger.Info("Cluster unhealthy: replica replication offset lag exceeds tolerance", + "pod", pod.Name, + "master_repl_offset", replInfo.MasterReplOffset, + "slave_repl_offset", replInfo.SlaveReplOffset, + "lag", offsetLag, + "tolerance", replicationOffsetTolerance) + return false, nil + } + } + + return true, nil } func (r *ValkeyClusterReconciler) waitForPodsToBeAccessibleViaValkey(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster) (*ctrl.Result, error) {