Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions internal/controller/valkey/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,52 @@ func GenerateReshardingPlan(clusterNodesForShard map[int][]*ClusterNode, desired
return actionPlan, nil
}

// ReplicationInfo holds parsed fields from INFO REPLICATION output.
type ReplicationInfo struct {
Role string
MasterLinkStatus string
MasterSyncInProgress int
MasterReplOffset int64
SlaveReplOffset int64
}

// ParseInfoReplication parses the output of INFO REPLICATION into a ReplicationInfo struct.
// The valkey-go client may prepend "txt:" to the response and uses \r\n line endings.
func ParseInfoReplication(infoTxt string) ReplicationInfo {
info := ReplicationInfo{}
// The valkey-go client may prepend "txt:" to the response
infoTxt = strings.TrimPrefix(infoTxt, "txt:")
for _, line := range strings.Split(infoTxt, "\n") {
line = strings.TrimSpace(line)
line = strings.TrimSuffix(line, "\r")
if line == "" || strings.HasPrefix(line, "#") {
continue
}
parts := strings.SplitN(line, ":", 2)
if len(parts) != 2 {
continue
}
key := strings.TrimSpace(parts[0])
val := strings.TrimSpace(parts[1])
switch key {
case "role":
info.Role = val
case "master_link_status":
info.MasterLinkStatus = val
case "master_sync_in_progress":
n, _ := strconv.Atoi(val)
info.MasterSyncInProgress = n
case "master_repl_offset":
n, _ := strconv.ParseInt(val, 10, 64)
info.MasterReplOffset = n
case "slave_repl_offset":
n, _ := strconv.ParseInt(val, 10, 64)
info.SlaveReplOffset = n
}
}
return info
}

func TcpCheck(host, port string) bool {
// check tcp port
conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), 2*time.Second)
Expand Down
124 changes: 124 additions & 0 deletions internal/controller/valkey/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,127 @@ func TestGenerateReshardingPlan(t *testing.T) {
assert.Equal(t, tt.plan, actual)
}
}

func TestParseInfoReplication(t *testing.T) {
testcases := []struct {
name string
in string
out ReplicationInfo
}{
{
name: "master node",
in: `# Replication
role:master
connected_slaves:2
master_replid:abc123
master_repl_offset:123456
`,
out: ReplicationInfo{
Role: "master",
MasterReplOffset: 123456,
},
},
{
name: "fully caught-up replica",
in: `# Replication
role:slave
master_link_status:up
master_sync_in_progress:0
master_repl_offset:123456
slave_repl_offset:123456
`,
out: ReplicationInfo{
Role: "slave",
MasterLinkStatus: "up",
MasterSyncInProgress: 0,
MasterReplOffset: 123456,
SlaveReplOffset: 123456,
},
},
{
name: "replica with lag",
in: `# Replication
role:slave
master_link_status:up
master_sync_in_progress:0
master_repl_offset:200000
slave_repl_offset:199000
`,
out: ReplicationInfo{
Role: "slave",
MasterLinkStatus: "up",
MasterSyncInProgress: 0,
MasterReplOffset: 200000,
SlaveReplOffset: 199000,
},
},
{
name: "full sync in progress",
in: `# Replication
role:slave
master_link_status:down
master_sync_in_progress:1
master_repl_offset:-1
slave_repl_offset:0
`,
out: ReplicationInfo{
Role: "slave",
MasterLinkStatus: "down",
MasterSyncInProgress: 1,
MasterReplOffset: -1,
SlaveReplOffset: 0,
},
},
{
name: "link down without sync",
in: `# Replication
role:slave
master_link_status:down
master_sync_in_progress:0
master_repl_offset:50000
slave_repl_offset:50000
`,
out: ReplicationInfo{
Role: "slave",
MasterLinkStatus: "down",
MasterSyncInProgress: 0,
MasterReplOffset: 50000,
SlaveReplOffset: 50000,
},
},
{
name: "txt: prefix from valkey-go client",
in: `txt:# Replication
role:slave
master_link_status:up
master_sync_in_progress:0
master_repl_offset:999
slave_repl_offset:999
`,
out: ReplicationInfo{
Role: "slave",
MasterLinkStatus: "up",
MasterSyncInProgress: 0,
MasterReplOffset: 999,
SlaveReplOffset: 999,
},
},
{
name: "carriage return handling",
in: "# Replication\r\nrole:slave\r\nmaster_link_status:up\r\nmaster_sync_in_progress:0\r\nmaster_repl_offset:5000\r\nslave_repl_offset:5000\r\n",
out: ReplicationInfo{
Role: "slave",
MasterLinkStatus: "up",
MasterSyncInProgress: 0,
MasterReplOffset: 5000,
SlaveReplOffset: 5000,
},
},
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
actual := ParseInfoReplication(tt.in)
assert.Equal(t, tt.out, actual)
})
}
}
65 changes: 61 additions & 4 deletions internal/controller/valkeycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ func (r *ValkeyClusterReconciler) isValkeyClusterHealthy(ctx context.Context, va
}

// Confirm the Valkey cluster itself reports a healthy state.
clusterStateOK := false
for _, pod := range podList.Items {
if pod.Status.PodIP == "" {
continue
Expand Down Expand Up @@ -835,15 +836,71 @@ func (r *ValkeyClusterReconciler) isValkeyClusterHealthy(ctx context.Context, va
logger.Info("Valkey cluster state is not ok", "pod", pod.Name, "state", state)
return false, nil
}
return true, nil
clusterStateOK = true
break
}
}
// Only need to check one pod.
// Only need to check one pod for CLUSTER INFO.
break
}

logger.Info("Could not determine cluster_state from CLUSTER INFO")
return false, nil
if !clusterStateOK {
logger.Info("Could not determine cluster_state from CLUSTER INFO")
return false, nil
}

// Verify all replicas have caught up with their masters before declaring healthy.
// This prevents the rolling update from deleting the next pod while a freshly
// restarted replica is still performing a full or partial resync.
const replicationOffsetTolerance int64 = 1024
for _, pod := range podList.Items {
if pod.Status.PodIP == "" {
continue
}
valkeyClient, err := r.NewValkeyClient(ctx, valkeyCluster, pod.Status.PodIP, VALKEY_PORT)
if err != nil {
logger.Error(err, "Failed to create Valkey client for replication check", "pod", pod.Name)
return false, err
}
defer valkeyClient.Close()

infoTxt, err := valkeyClient.Do(ctx, valkeyClient.B().Info().Section("replication").Build()).ToString()
if err != nil {
logger.Error(err, "Failed to get INFO REPLICATION", "pod", pod.Name)
return false, err
}

replInfo := internalValkey.ParseInfoReplication(infoTxt)
if replInfo.Role != "slave" {
continue
}

if replInfo.MasterLinkStatus != "up" {
logger.Info("Cluster unhealthy: replica master_link_status is not up",
"pod", pod.Name, "master_link_status", replInfo.MasterLinkStatus)
return false, nil
}
if replInfo.MasterSyncInProgress != 0 {
logger.Info("Cluster unhealthy: replica has full sync in progress",
"pod", pod.Name)
return false, nil
}
offsetLag := replInfo.MasterReplOffset - replInfo.SlaveReplOffset
if offsetLag < 0 {
offsetLag = -offsetLag
}
if offsetLag > replicationOffsetTolerance {
logger.Info("Cluster unhealthy: replica replication offset lag exceeds tolerance",
"pod", pod.Name,
"master_repl_offset", replInfo.MasterReplOffset,
"slave_repl_offset", replInfo.SlaveReplOffset,
"lag", offsetLag,
"tolerance", replicationOffsetTolerance)
return false, nil
}
}

return true, nil
}

func (r *ValkeyClusterReconciler) waitForPodsToBeAccessibleViaValkey(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster) (*ctrl.Result, error) {
Expand Down