Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions .dagger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ func (m *ValkeyClusterOperator) BuildTestEnv(
WithExec([]string{"go", "install", "sigs.k8s.io/kind@v0.29.0"}).
WithUnixSocket("/var/run/docker.sock", sock).
WithEnvVariable("CACHEBUSTER", time.Now().String()).
WithExec([]string{"kind", "create", "cluster"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithExec([]string{"kind", "delete", "cluster", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithExec([]string{"kind", "create", "cluster"}).
WithExec([]string{"kind", "load", "docker-image", "valkey-cluster-operator:latest", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithExec([]string{"kind", "load", "docker-image", "valkey-server:latest", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithExec([]string{"kind", "load", "docker-image", "valkey-server:8.0.5", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
Expand All @@ -431,15 +432,23 @@ func (m *ValkeyClusterOperator) BuildTestEnv(
daggerEngineContainerName = strings.TrimSpace(daggerEngineContainerName)
container.WithExec([]string{"bash", "-c", "docker network connect kind " + daggerEngineContainerName + " || true"}).Stdout(ctx)

data, _ := container.WithExec([]string{"kind", "get", "kubeconfig"}).Stdout(ctx)
data, err := container.WithExec([]string{"kind", "get", "kubeconfig"}).Stdout(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get kubeconfig: %w", err)
}

kindControlPlainContainerID, _ := container.WithExec([]string{"bash", "-c", `docker ps --format '{{json .}}' | jq -r '. | select(.Names=="kind-control-plane") | .ID'`}).Stdout(ctx)
kindControlPlainContainerID = strings.TrimSpace(kindControlPlainContainerID)
kindControlPlainContainerIP, _ := container.WithExec([]string{"bash", "-c", `docker inspect ` + kindControlPlainContainerID + ` | jq -r '.[0].NetworkSettings.Networks.kind.IPAddress'`}).Stdout(ctx)
kindControlPlainContainerIP = strings.TrimSpace(kindControlPlainContainerIP)

kubeConfig := KubeConfig{}
_ = yaml.Unmarshal([]byte(data), &kubeConfig)
if err := yaml.Unmarshal([]byte(data), &kubeConfig); err != nil {
return nil, fmt.Errorf("failed to parse kubeconfig: %w", err)
}
if len(kubeConfig.Clusters) == 0 || len(kubeConfig.Users) == 0 {
return nil, fmt.Errorf("kubeconfig is missing clusters or users")
}

kubeConfig.Clusters[0].Cluster.Server = "https://" + kindControlPlainContainerIP + ":6443"

Expand Down
8 changes: 4 additions & 4 deletions internal/controller/valkey/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func parseClusterNodeLine(line string) (*ClusterNode, error) {
parts := strings.Split(fields[i], "-")
start, err := strconv.Atoi(parts[0])
if err != nil {
return nil, fmt.Errorf("failed to convert string %v: line: %s", err, line)
return nil, fmt.Errorf("failed to convert string %w: line: %s", err, line)
}
end, err := strconv.Atoi(parts[1])
if err != nil {
return nil, fmt.Errorf("failed to convert string %v: line: %s", err, line)
return nil, fmt.Errorf("failed to convert string %w: line: %s", err, line)
}
slotRange := &ClusterSlotRange{
Start: start,
Expand All @@ -82,7 +82,7 @@ func parseClusterNodeLine(line string) (*ClusterNode, error) {
} else {
start, err := strconv.Atoi(fields[i])
if err != nil {
return nil, fmt.Errorf("failed to convert string %v: line: %s", err, line)
return nil, fmt.Errorf("failed to convert string %w: line: %s", err, line)
}
end := start
slotRange := &ClusterSlotRange{
Expand Down Expand Up @@ -298,7 +298,7 @@ func GenerateReshardingPlan(clusterNodesForShard map[int][]*ClusterNode, desired
}
}

for fromID, _ := range rid {
for fromID := range rid {
if rid[fromID] == 0 {
continue
}
Expand Down
137 changes: 118 additions & 19 deletions internal/controller/valkeycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type ValkeyClusterReconciler struct {
// +kubebuilder:rbac:groups=core,resources=events,verbs=create;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;delete
// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=pods/exec,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -229,7 +229,10 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
res, err := r.reconcileStatefulSets(ctx, req, valkeyCluster)
if err != nil {
log.Error(err, "Failed to reconcile statefulsets")
return *res, err
return ctrl.Result{}, err
}
if res != nil {
return *res, nil
}

// wait for all pods to be running and accessible via valkey-client
Expand Down Expand Up @@ -307,6 +310,7 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
r.Recorder.Event(valkeyCluster, "Normal", "Deleted",
fmt.Sprintf("StatefulSet %s/%s is deleted", lastSts.Namespace, lastSts.Name))
return ctrl.Result{Requeue: true}, nil
}
}

Expand Down Expand Up @@ -360,7 +364,7 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques

clusterNodes, err := r.buildClusterNodes(ctx, valkeyCluster)
if err != nil {
return ctrl.Result{}, fmt.Errorf("Could not build cluster nodes: %v", err)
return ctrl.Result{}, fmt.Errorf("Could not build cluster nodes: %w", err)
}

// cluster meet
Expand Down Expand Up @@ -689,17 +693,15 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
for _, cn := range clusterNodes {
if !cn.IsMaster() {
for masterNode, _ := range clusterNodesMap {
for masterNode := range clusterNodesMap {
if masterNode.ID == cn.MasterNodeID {
clusterNodesMap[masterNode] = append(clusterNodesMap[masterNode], cn)
}
}
}
}
isAvailable := true
if len(clusterNodesMap) != int(valkeyCluster.Spec.Shards) {
isAvailable = false
}
isAvailable := len(clusterNodesMap) == int(valkeyCluster.Spec.Shards)

for _, v := range clusterNodesMap {
if len(v) != int(valkeyCluster.Spec.Replicas) {
isAvailable = false
Expand Down Expand Up @@ -750,9 +752,100 @@ func (r *ValkeyClusterReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}

// Drive the rolling update one pod at a time, gated on cluster health.
res, err = r.performRollingUpdate(ctx, valkeyCluster)
if err != nil {
log.Error(err, "Failed to perform rolling update")
return ctrl.Result{}, err
}
if res != nil {
return *res, nil
}

return ctrl.Result{}, nil
}

// isValkeyClusterHealthy reports whether the cluster is safe to act on.
//
// It lists the pods in valkeyCluster's namespace that carry the labels from
// labelsForValkeyCluster and returns true only when:
//   - at least one pod exists,
//   - no pod has a deletion timestamp,
//   - every pod is in phase Running with a PodReady condition set to True, and
//   - CLUSTER INFO, issued against the first pod that has a pod IP, reports
//     cluster_state:ok.
//
// A (false, nil) result means "not healthy yet"; a non-nil error is returned
// when listing pods, creating the Valkey client, or running CLUSTER INFO fails.
func (r *ValkeyClusterReconciler) isValkeyClusterHealthy(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster) (bool, error) {
	logger := log.FromContext(ctx)

	podList := &corev1.PodList{}
	if err := r.List(ctx, podList,
		client.InNamespace(valkeyCluster.Namespace),
		client.MatchingLabels(labelsForValkeyCluster(valkeyCluster.Name)),
	); err != nil {
		return false, err
	}
	if len(podList.Items) == 0 {
		logger.Info("No pods found, cluster not healthy")
		return false, nil
	}

	// Every pod must be Running and Ready. Any terminating pod means a previous
	// deletion hasn't fully settled yet — wait for it to complete before
	// scheduling another deletion.
	for i := range podList.Items {
		pod := &podList.Items[i] // index to avoid copying each Pod struct
		switch {
		case pod.DeletionTimestamp != nil:
			logger.Info("Cluster unhealthy: pod is terminating", "pod", pod.Name)
			return false, nil
		case pod.Status.Phase != corev1.PodRunning:
			logger.Info("Cluster unhealthy: pod not running", "pod", pod.Name, "phase", string(pod.Status.Phase))
			return false, nil
		case !valkeyPodHasReadyCondition(pod):
			logger.Info("Cluster unhealthy: pod not ready", "pod", pod.Name)
			return false, nil
		}
	}

	// Only one pod needs to be asked for the cluster state: pick the first one
	// that has been assigned an IP.
	var probe *corev1.Pod
	for i := range podList.Items {
		if podList.Items[i].Status.PodIP != "" {
			probe = &podList.Items[i]
			break
		}
	}
	if probe == nil {
		logger.Info("Could not determine cluster_state from CLUSTER INFO")
		return false, nil
	}

	valkeyClient, err := r.NewValkeyClient(ctx, valkeyCluster, probe.Status.PodIP, VALKEY_PORT)
	if err != nil {
		logger.Error(err, "Failed to create Valkey client for cluster health check", "pod", probe.Name)
		return false, err
	}
	// Deferred at function scope (not inside a loop) so the single client is
	// released exactly when this check returns.
	defer valkeyClient.Close()

	clusterInfoTxt, err := valkeyClient.Do(ctx, valkeyClient.B().ClusterInfo().Build()).ToString()
	if err != nil {
		logger.Error(err, "Failed to get CLUSTER INFO for health check", "pod", probe.Name)
		return false, err
	}

	state, found := parseValkeyClusterState(clusterInfoTxt)
	if !found {
		logger.Info("Could not determine cluster_state from CLUSTER INFO")
		return false, nil
	}
	if state != "ok" {
		logger.Info("Valkey cluster state is not ok", "pod", probe.Name, "state", state)
		return false, nil
	}
	return true, nil
}

// valkeyPodHasReadyCondition reports whether pod carries a PodReady condition
// whose status is True.
func valkeyPodHasReadyCondition(pod *corev1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == corev1.PodReady && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}

// parseValkeyClusterState extracts the value of the first "cluster_state:" line
// from CLUSTER INFO output. found is false when no such line is present.
func parseValkeyClusterState(clusterInfo string) (state string, found bool) {
	for _, line := range strings.Split(clusterInfo, "\n") {
		line = strings.TrimSpace(line)
		// The valkey-go client may prepend "txt:" to the response
		line = strings.TrimPrefix(line, "txt:")
		if value, ok := strings.CutPrefix(line, "cluster_state:"); ok {
			return strings.TrimSpace(value), true
		}
	}
	return "", false
}

func (r *ValkeyClusterReconciler) waitForPodsToBeAccessibleViaValkey(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster) (*ctrl.Result, error) {
logger := log.FromContext(ctx)
podList := &corev1.PodList{}
Expand All @@ -765,9 +858,13 @@ func (r *ValkeyClusterReconciler) waitForPodsToBeAccessibleViaValkey(ctx context
return nil, err
}
for _, pod := range podList.Items {
if pod.DeletionTimestamp != nil {
logger.Info("Skipping pod - being deleted", "Pod.Name", pod.Name)
continue
}
if pod.Status.Phase != corev1.PodRunning {
logger.Info("Pod not running", "Pod.Name", pod.Name, "Pod.Status", pod.Status.Phase)
return &ctrl.Result{RequeueAfter: time.Minute}, nil
return &ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}

logger.Info("Pod running", "Pod.Name", pod.Name, "Pod.Status", pod.Status.Phase)
Expand Down Expand Up @@ -816,7 +913,7 @@ func (r *ValkeyClusterReconciler) reconcileValkeySlots(ctx context.Context, valk
clusterNodesForShard, err := r.buildClusterNodesForShard(ctx, valkeyCluster)
if err != nil {
logger.Error(err, "Failed to build cluster nodes for shard")
return result, fmt.Errorf("Failed to build cluster nodes for shard: %v", err)
return result, fmt.Errorf("Failed to build cluster nodes for shard: %w", err)
}

logger.Info("Built cluster nodes for shard",
Expand Down Expand Up @@ -851,7 +948,7 @@ func (r *ValkeyClusterReconciler) reconcileValkeySlots(ctx context.Context, valk
logger.Error(err, "Failed to generate resharding plan",
"shardCount", len(clusterNodesForShard),
"expectedShards", valkeyCluster.Spec.Shards)
return result, fmt.Errorf("Failed to build action plan: %v", err)
return result, fmt.Errorf("Failed to build action plan: %w", err)
}

logger.Info("Generated resharding plan",
Expand Down Expand Up @@ -986,20 +1083,16 @@ func (r *ValkeyClusterReconciler) updateClusterNodesStatus(ctx context.Context,
})
}

needsUpdate := false
if len(valkeyCluster.Status.ClusterNodes) != len(clusterNodesStatus) {
needsUpdate = true
}
needsUpdate := len(valkeyCluster.Status.ClusterNodes) != len(clusterNodesStatus)

if !reflect.DeepEqual(valkeyCluster.Status.ClusterNodes, clusterNodesStatus) {
needsUpdate = true
}

if needsUpdate {
valkeyCluster.Status.ClusterNodes = make(map[string][]cachev1alpha1.ValkeyClusterNode)
for k, v := range clusterNodesStatus {
for _, n := range v {
valkeyCluster.Status.ClusterNodes[k] = append(valkeyCluster.Status.ClusterNodes[k], n)
}
valkeyCluster.Status.ClusterNodes[k] = append(valkeyCluster.Status.ClusterNodes[k], v...)
}
valkeyCluster.Status.ClusterNodes = clusterNodesStatus
if err := r.Status().Update(ctx, valkeyCluster); err != nil {
Expand Down Expand Up @@ -1041,7 +1134,7 @@ func (r *ValkeyClusterReconciler) buildClusterNodesForShard(ctx context.Context,
logger.Error(err, "Failed to parse shard index from pod name",
"podName", cn.Pod,
"shardIdxStr", matches[0][1])
return nil, fmt.Errorf("could not parse shard index from pod %s: %v", cn.Pod, err)
return nil, fmt.Errorf("could not parse shard index from pod %s: %w", cn.Pod, err)
}
if _, ok := clusterNodesForShard[shardIdx]; !ok {
clusterNodesForShard[shardIdx] = make([]*internalValkey.ClusterNode, 0)
Expand Down Expand Up @@ -1084,6 +1177,12 @@ func (r *ValkeyClusterReconciler) buildClusterNodes(ctx context.Context, valkeyC
"podCount", len(podList.Items))

for _, pod := range podList.Items {
if pod.DeletionTimestamp != nil {
logger.Info("Skipping pod - being deleted",
"podName", pod.Name)
continue
}

logger.Info("Processing pod",
"podName", pod.Name,
"podPhase", pod.Status.Phase,
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/valkeycluster_controller_configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, valkeyClu
}
valkeyConfContent, err := getValkeyConfigContent(valkeyCluster)
if err != nil {
return "", fmt.Errorf("failed to read valkey config content: %v", err)
return "", fmt.Errorf("failed to read valkey config content: %w", err)
}
valkeyConfHash := fmt.Sprintf("%x", sha256.Sum256([]byte(valkeyConfContent)))
cmData["valkey.conf"] = valkeyConfContent
Expand Down
Loading