From 989de619071ea8f1de9af412db0e883ec12b96d8 Mon Sep 17 00:00:00 2001 From: Kurt McAlpine Date: Fri, 9 Jan 2026 14:56:12 +1300 Subject: [PATCH 1/2] Add new valkey versions --- .dagger/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.dagger/main.go b/.dagger/main.go index 59ea8f2..4d9fae3 100644 --- a/.dagger/main.go +++ b/.dagger/main.go @@ -124,7 +124,7 @@ func (m *ValkeyClusterOperator) PublishValkeyDocker( ghToken *dagger.Secret, ) error { - valkeyVersions := []string{"8.0.3", "8.0.4", "8.0.5", "8.1.3"} + valkeyVersions := []string{"8.0.3", "8.0.4", "8.0.5", "8.1.3", "8.1.5", "9.0.1"} for _, valkeyVersion := range valkeyVersions { // container registry for the multi-platform image From 9871884fcad7eca74414240d43f65db9476095ef Mon Sep 17 00:00:00 2001 From: Kurt McAlpine Date: Fri, 9 Jan 2026 14:59:52 +1300 Subject: [PATCH 2/2] Add upgrade test --- .dagger/main.go | 22 ++++- test/e2e/e2e_test.go | 190 ++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 209 insertions(+), 3 deletions(-) diff --git a/.dagger/main.go b/.dagger/main.go index 4d9fae3..d2cada8 100644 --- a/.dagger/main.go +++ b/.dagger/main.go @@ -303,7 +303,25 @@ func (m *ValkeyClusterOperator) BuildAndLoadLocally( return err } - return m.loadSingleArchImage(ctx, valkeyVariants, hostPlatform, "valkey-server:latest", sock) + err = m.loadSingleArchImage(ctx, valkeyVariants, hostPlatform, "valkey-server:latest", sock) + if err != nil { + return err + } + + // Load versioned images for upgrade testing + upgradeTestVersions := []string{"8.0.5", "9.0.1"} + for _, version := range upgradeTestVersions { + valkeyVersionedVariants, err := m.BuildValkeyContainerImage(ctx, version) + if err != nil { + return err + } + err = m.loadSingleArchImage(ctx, valkeyVersionedVariants, hostPlatform, "valkey-server:"+version, sock) + if err != nil { + return err + } + } + + return nil } func (m *ValkeyClusterOperator) detectHostPlatform(ctx context.Context) (string, error) { @@ -404,6 +422,8 @@ func (m *ValkeyClusterOperator) BuildTestEnv( WithExec([]string{"kind", "create", "cluster"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). WithExec([]string{"kind", "load", "docker-image", "valkey-cluster-operator:latest", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). WithExec([]string{"kind", "load", "docker-image", "valkey-server:latest", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). + WithExec([]string{"kind", "load", "docker-image", "valkey-server:8.0.5", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). + WithExec([]string{"kind", "load", "docker-image", "valkey-server:9.0.1", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}). WithDirectory("/src", source). WithWorkdir("/src") diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 2d75a04..85faf2f 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -191,7 +191,10 @@ var _ = Describe("controller", Ordered, func() { ) _, err := utils.Run(cmd) ExpectWithOffset(1, err).NotTo(HaveOccurred()) - EventuallyWithOffset(1, verifyClusterState("valkeycluster-sample", 2, 1, ""), 6*time.Minute, 15*time.Second).Should(Succeed()) + // Scale down requires resharding slots away from the removed shard. + // Each resharding step runs as a K8s Job with up to 5min timeout. + // Scale 3->2 shards needs 2 resharding steps, so allow 12 minutes. + EventuallyWithOffset(1, verifyClusterState("valkeycluster-sample", 2, 1, ""), 12*time.Minute, 15*time.Second).Should(Succeed()) getPods := func() error { cmd = exec.Command("kubectl", "get", "pods", "-l", fmt.Sprintf("cache/name=%s,app.kubernetes.io/name=valkeyCluster-operator,app.kubernetes.io/managed-by=ValkeyClusterController", "valkeycluster-sample"), @@ -541,12 +544,154 @@ var _ = Describe("controller", Ordered, func() { }) }) +var _ = Describe("upgrade", Ordered, func() { + upgradeClusterName := "valkeycluster-upgrade" + + BeforeAll(func() { + By("creating manager namespace") + cmd := exec.Command("kubectl", "create", "ns", namespace) + _, _ = utils.Run(cmd) + + By("installing CRDs") + cmd = exec.Command("make", "install") + _, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + By("deploying the controller-manager") + projectimage := "valkey-cluster-operator:latest" + cmd = exec.Command("make", "deploy", fmt.Sprintf("IMG=%s", projectimage)) + _, err = utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + By("waiting for controller-manager to be ready") + verifyControllerUp := func() error { + cmd = exec.Command("kubectl", "get", + "pods", "-l", "control-plane=controller-manager", + "-o", "go-template={{ range .items }}"+ + "{{ if not .metadata.deletionTimestamp }}"+ + "{{ .metadata.name }}"+ + "{{ \"\\n\" }}{{ end }}{{ end }}", + "-n", namespace, + ) + podOutput, err := utils.Run(cmd) + if err != nil { + return err + } + podNames := utils.GetNonEmptyLines(string(podOutput)) + if len(podNames) != 1 { + return fmt.Errorf("expect 1 controller pods running, but got %d", len(podNames)) + } + cmd = exec.Command("kubectl", "get", + "pods", podNames[0], "-o", "jsonpath={.status.phase}", + "-n", namespace, + ) + status, err := utils.Run(cmd) + if err != nil { + return err + } + if string(status) != "Running" { + return fmt.Errorf("controller pod in %s status", status) + } + return nil + } + EventuallyWithOffset(1, verifyControllerUp, time.Minute, 2*time.Second).Should(Succeed()) + }) + + AfterAll(func() { + By("cleaning up upgrade test resources") + cmd := exec.Command("kubectl", "delete", "--timeout=30s", "valkeycluster", upgradeClusterName, + "-n", namespace, + ) + _, _ = utils.Run(cmd) + + cmd = exec.Command("kubectl", "delete", "--timeout=30s", "pvc", + "-l", fmt.Sprintf("cache/name=%s", upgradeClusterName), + "-n", namespace, + ) + _, _ = utils.Run(cmd) + + cmd = exec.Command("kubectl", "delete", "--timeout=10s", "deployment", "valkey-cluster-operator-controller-manager", + "-n", namespace, + ) + _, _ = utils.Run(cmd) + }) + + It("should upgrade from 8.0.5 to 9.0.1", func() { + By("creating a ValkeyCluster with version 8.0.5") + cmd := exec.Command("kubectl", "apply", "-n", namespace, "-f", "-") + cmd.Stdin = strings.NewReader(`apiVersion: cache.halter.io/v1alpha1 +kind: ValkeyCluster +metadata: + name: valkeycluster-upgrade +spec: + image: valkey-server:8.0.5 + shards: 2 + replicas: 1 + minReadySeconds: 5 + resources: + limits: + cpu: "0.4" + memory: 314Mi + requests: + cpu: "0.2" + memory: 214Mi + storage: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 2Gi + storageClassName: standard + initialDelaySeconds: 5 +`) + _, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + By("waiting for cluster to be ready with version 8.0.5") + EventuallyWithOffset(1, verifyClusterState(upgradeClusterName, 2, 1, ""), 3*time.Minute, 15*time.Second).Should(Succeed()) + + By("verifying cluster is running version 8.0.5") + EventuallyWithOffset(1, verifyClusterVersion(upgradeClusterName, "8.0.5"), time.Minute, 5*time.Second).Should(Succeed()) + + By("writing test data before upgrade") + testKey := "upgrade-test-key" + testValue := "upgrade-test-value-12345" + cmd = exec.Command("kubectl", "-n", namespace, "exec", upgradeClusterName+"-0-0", "-c", "valkey-cluster-node", "--", + "valkey-cli", "-c", "SET", testKey, testValue) + _, err = utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + By("upgrading cluster to version 9.0.1") + cmd = exec.Command("kubectl", + "-n", namespace, + "patch", "valkeycluster", upgradeClusterName, + "--type=json", + `-p=[{"op":"replace","path":"/spec/image","value":"valkey-server:9.0.1"}]`, + ) + _, err = utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + + By("waiting for rolling update to complete") + EventuallyWithOffset(1, verifyClusterState(upgradeClusterName, 2, 1, ""), 10*time.Minute, 15*time.Second).Should(Succeed()) + + By("verifying cluster is running version 9.0.1") + EventuallyWithOffset(1, verifyClusterVersion(upgradeClusterName, "9.0.1"), 2*time.Minute, 5*time.Second).Should(Succeed()) + + By("verifying test data survived the upgrade") + cmd = exec.Command("kubectl", "-n", namespace, "exec", upgradeClusterName+"-0-0", "-c", "valkey-cluster-node", "--", + "valkey-cli", "-c", "GET", testKey) + output, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + ExpectWithOffset(1, strings.TrimSpace(string(output))).To(Equal(testValue)) + }) +}) + func verifyClusterState(name string, shards, replicas int, password string) func() error { return func() error { expectedSlots := valkey.SlotCounts(shards) cmd := exec.Command("kubectl", "get", - "pods", "-l", "cache/name=valkeycluster-sample", + "pods", "-l", fmt.Sprintf("cache/name=%s", name), "-o", "go-template={{ range .items }}"+ "{{ if not .metadata.deletionTimestamp }}"+ "{{ .metadata.name }}"+ @@ -713,3 +858,44 @@ func verifyClusterState(name string, shards, replicas int, password string) func return nil } } + +func verifyClusterVersion(name string, expectedVersion string) func() error { + return func() error { + cmd := exec.Command("kubectl", "get", + "pods", "-l", fmt.Sprintf("cache/name=%s", name), + "-o", "go-template={{ range .items }}"+ + "{{ if not .metadata.deletionTimestamp }}"+ + "{{ .metadata.name }}"+ + "{{ \"\\n\" }}{{ end }}{{ end }}", + "-n", namespace, + ) + podOutput, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("received error getting pods: %v", err) + } + podNames := utils.GetNonEmptyLines(string(podOutput)) + if len(podNames) == 0 { + return fmt.Errorf("no pods found for cluster %s", name) + } + + for _, podName := range podNames { + cmd = exec.Command("kubectl", "-n", namespace, "exec", podName, "--", + "valkey-cli", "INFO", "server") + infoOutput, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("received error running valkey-cli INFO: %v", err) + } + + re := regexp.MustCompile(`valkey_version:(\S+)`) + matches := re.FindStringSubmatch(string(infoOutput)) + if len(matches) < 2 { + return fmt.Errorf("could not find valkey_version in INFO output for pod %s", podName) + } + actualVersion := matches[1] + if actualVersion != expectedVersion { + return fmt.Errorf("pod %s running version %s, expected %s", podName, actualVersion, expectedVersion) + } + } + return nil + } +}