Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions .dagger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (m *ValkeyClusterOperator) PublishValkeyDocker(
ghToken *dagger.Secret,
) error {

valkeyVersions := []string{"8.0.3", "8.0.4", "8.0.5", "8.1.3"}
valkeyVersions := []string{"8.0.3", "8.0.4", "8.0.5", "8.1.3", "8.1.5", "9.0.1"}

for _, valkeyVersion := range valkeyVersions {
// container registry for the multi-platform image
Expand Down Expand Up @@ -303,7 +303,25 @@ func (m *ValkeyClusterOperator) BuildAndLoadLocally(
return err
}

return m.loadSingleArchImage(ctx, valkeyVariants, hostPlatform, "valkey-server:latest", sock)
err = m.loadSingleArchImage(ctx, valkeyVariants, hostPlatform, "valkey-server:latest", sock)
if err != nil {
return err
}

// Load versioned images for upgrade testing
upgradeTestVersions := []string{"8.0.5", "9.0.1"}
for _, version := range upgradeTestVersions {
valkeyVersionedVariants, err := m.BuildValkeyContainerImage(ctx, version)
if err != nil {
return err
}
err = m.loadSingleArchImage(ctx, valkeyVersionedVariants, hostPlatform, "valkey-server:"+version, sock)
if err != nil {
return err
}
}

return nil
}

func (m *ValkeyClusterOperator) detectHostPlatform(ctx context.Context) (string, error) {
Expand Down Expand Up @@ -404,6 +422,8 @@ func (m *ValkeyClusterOperator) BuildTestEnv(
WithExec([]string{"kind", "create", "cluster"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithExec([]string{"kind", "load", "docker-image", "valkey-cluster-operator:latest", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithExec([]string{"kind", "load", "docker-image", "valkey-server:latest", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithExec([]string{"kind", "load", "docker-image", "valkey-server:8.0.5", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithExec([]string{"kind", "load", "docker-image", "valkey-server:9.0.1", "--name", "kind"}, dagger.ContainerWithExecOpts{Expect: dagger.ReturnTypeAny}).
WithDirectory("/src", source).
WithWorkdir("/src")

Expand Down
190 changes: 188 additions & 2 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ var _ = Describe("controller", Ordered, func() {
)
_, err := utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
EventuallyWithOffset(1, verifyClusterState("valkeycluster-sample", 2, 1, ""), 6*time.Minute, 15*time.Second).Should(Succeed())
// Scale down requires resharding slots away from the removed shard.
// Each resharding step runs as a K8s Job with up to 5min timeout.
// Scale 3->2 shards needs 2 resharding steps, so allow 12 minutes.
EventuallyWithOffset(1, verifyClusterState("valkeycluster-sample", 2, 1, ""), 12*time.Minute, 15*time.Second).Should(Succeed())
getPods := func() error {
cmd = exec.Command("kubectl", "get", "pods",
"-l", fmt.Sprintf("cache/name=%s,app.kubernetes.io/name=valkeyCluster-operator,app.kubernetes.io/managed-by=ValkeyClusterController", "valkeycluster-sample"),
Expand Down Expand Up @@ -541,12 +544,154 @@ var _ = Describe("controller", Ordered, func() {
})
})

var _ = Describe("upgrade", Ordered, func() {
upgradeClusterName := "valkeycluster-upgrade"

BeforeAll(func() {
By("creating manager namespace")
cmd := exec.Command("kubectl", "create", "ns", namespace)
_, _ = utils.Run(cmd)

By("installing CRDs")
cmd = exec.Command("make", "install")
_, err := utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

By("deploying the controller-manager")
projectimage := "valkey-cluster-operator:latest"
cmd = exec.Command("make", "deploy", fmt.Sprintf("IMG=%s", projectimage))
_, err = utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

By("waiting for controller-manager to be ready")
verifyControllerUp := func() error {
cmd = exec.Command("kubectl", "get",
"pods", "-l", "control-plane=controller-manager",
"-o", "go-template={{ range .items }}"+
"{{ if not .metadata.deletionTimestamp }}"+
"{{ .metadata.name }}"+
"{{ \"\\n\" }}{{ end }}{{ end }}",
"-n", namespace,
)
podOutput, err := utils.Run(cmd)
if err != nil {
return err
}
podNames := utils.GetNonEmptyLines(string(podOutput))
if len(podNames) != 1 {
return fmt.Errorf("expect 1 controller pods running, but got %d", len(podNames))
}
cmd = exec.Command("kubectl", "get",
"pods", podNames[0], "-o", "jsonpath={.status.phase}",
"-n", namespace,
)
status, err := utils.Run(cmd)
if err != nil {
return err
}
if string(status) != "Running" {
return fmt.Errorf("controller pod in %s status", status)
}
return nil
}
EventuallyWithOffset(1, verifyControllerUp, time.Minute, 2*time.Second).Should(Succeed())
})

AfterAll(func() {
By("cleaning up upgrade test resources")
cmd := exec.Command("kubectl", "delete", "--timeout=30s", "valkeycluster", upgradeClusterName,
"-n", namespace,
)
_, _ = utils.Run(cmd)

cmd = exec.Command("kubectl", "delete", "--timeout=30s", "pvc",
"-l", fmt.Sprintf("cache/name=%s", upgradeClusterName),
"-n", namespace,
)
_, _ = utils.Run(cmd)

cmd = exec.Command("kubectl", "delete", "--timeout=10s", "deployment", "valkey-cluster-operator-controller-manager",
"-n", namespace,
)
_, _ = utils.Run(cmd)
})

It("should upgrade from 8.0.5 to 9.0.1", func() {
By("creating a ValkeyCluster with version 8.0.5")
cmd := exec.Command("kubectl", "apply", "-n", namespace, "-f", "-")
cmd.Stdin = strings.NewReader(`apiVersion: cache.halter.io/v1alpha1
kind: ValkeyCluster
metadata:
name: valkeycluster-upgrade
spec:
image: valkey-server:8.0.5
shards: 2
replicas: 1
minReadySeconds: 5
resources:
limits:
cpu: "0.4"
memory: 314Mi
requests:
cpu: "0.2"
memory: 214Mi
storage:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
storageClassName: standard
initialDelaySeconds: 5
`)
_, err := utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

By("waiting for cluster to be ready with version 8.0.5")
EventuallyWithOffset(1, verifyClusterState(upgradeClusterName, 2, 1, ""), 3*time.Minute, 15*time.Second).Should(Succeed())

By("verifying cluster is running version 8.0.5")
EventuallyWithOffset(1, verifyClusterVersion(upgradeClusterName, "8.0.5"), time.Minute, 5*time.Second).Should(Succeed())

By("writing test data before upgrade")
testKey := "upgrade-test-key"
testValue := "upgrade-test-value-12345"
cmd = exec.Command("kubectl", "-n", namespace, "exec", upgradeClusterName+"-0-0", "-c", "valkey-cluster-node", "--",
"valkey-cli", "-c", "SET", testKey, testValue)
_, err = utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

By("upgrading cluster to version 9.0.1")
cmd = exec.Command("kubectl",
"-n", namespace,
"patch", "valkeycluster", upgradeClusterName,
"--type=json",
`-p=[{"op":"replace","path":"/spec/image","value":"valkey-server:9.0.1"}]`,
)
_, err = utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())

By("waiting for rolling update to complete")
EventuallyWithOffset(1, verifyClusterState(upgradeClusterName, 2, 1, ""), 10*time.Minute, 15*time.Second).Should(Succeed())

By("verifying cluster is running version 9.0.1")
EventuallyWithOffset(1, verifyClusterVersion(upgradeClusterName, "9.0.1"), 2*time.Minute, 5*time.Second).Should(Succeed())

By("verifying test data survived the upgrade")
cmd = exec.Command("kubectl", "-n", namespace, "exec", upgradeClusterName+"-0-0", "-c", "valkey-cluster-node", "--",
"valkey-cli", "-c", "GET", testKey)
output, err := utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
ExpectWithOffset(1, strings.TrimSpace(string(output))).To(Equal(testValue))
})
})

func verifyClusterState(name string, shards, replicas int, password string) func() error {
return func() error {
expectedSlots := valkey.SlotCounts(shards)

cmd := exec.Command("kubectl", "get",
"pods", "-l", "cache/name=valkeycluster-sample",
"pods", "-l", fmt.Sprintf("cache/name=%s", name),
"-o", "go-template={{ range .items }}"+
"{{ if not .metadata.deletionTimestamp }}"+
"{{ .metadata.name }}"+
Expand Down Expand Up @@ -713,3 +858,44 @@ func verifyClusterState(name string, shards, replicas int, password string) func
return nil
}
}

func verifyClusterVersion(name string, expectedVersion string) func() error {
return func() error {
cmd := exec.Command("kubectl", "get",
"pods", "-l", fmt.Sprintf("cache/name=%s", name),
"-o", "go-template={{ range .items }}"+
"{{ if not .metadata.deletionTimestamp }}"+
"{{ .metadata.name }}"+
"{{ \"\\n\" }}{{ end }}{{ end }}",
"-n", namespace,
)
podOutput, err := utils.Run(cmd)
if err != nil {
return fmt.Errorf("received error getting pods: %v", err)
}
podNames := utils.GetNonEmptyLines(string(podOutput))
if len(podNames) == 0 {
return fmt.Errorf("no pods found for cluster %s", name)
}

for _, podName := range podNames {
cmd = exec.Command("kubectl", "-n", namespace, "exec", podName, "--",
"valkey-cli", "INFO", "server")
infoOutput, err := utils.Run(cmd)
if err != nil {
return fmt.Errorf("received error running valkey-cli INFO: %v", err)
}

re := regexp.MustCompile(`valkey_version:(\S+)`)
matches := re.FindStringSubmatch(string(infoOutput))
if len(matches) < 2 {
return fmt.Errorf("could not find valkey_version in INFO output for pod %s", podName)
}
actualVersion := matches[1]
if actualVersion != expectedVersion {
return fmt.Errorf("pod %s running version %s, expected %s", podName, actualVersion, expectedVersion)
}
}
return nil
}
}
Loading