diff --git a/api/v1alpha1/valkeycluster_types.go b/api/v1alpha1/valkeycluster_types.go index 8921651..251ddcc 100644 --- a/api/v1alpha1/valkeycluster_types.go +++ b/api/v1alpha1/valkeycluster_types.go @@ -82,7 +82,9 @@ type ValkeyClusterSpec struct { // +operator-sdk:csv:customresourcedefinitions:type=spec Password string `json:"password,omitempty"` - // Number of seconds after the container has started before liveness probes are initiated. + // Number of seconds after the container has started before startup probe is initiated. + // Deprecated: This field is deprecated and will be removed in a future version. + // The startup probe now uses a fixed configuration. // +operator-sdk:csv:customresourcedefinitions:type=spec // +kubebuilder:default=30 InitialDelaySeconds int32 `json:"initialDelaySeconds,omitempty"` diff --git a/config/crd/bases/cache.halter.io_valkeyclusters.yaml b/config/crd/bases/cache.halter.io_valkeyclusters.yaml index 428d75c..2cf920d 100644 --- a/config/crd/bases/cache.halter.io_valkeyclusters.yaml +++ b/config/crd/bases/cache.halter.io_valkeyclusters.yaml @@ -51,8 +51,10 @@ spec: type: string initialDelaySeconds: default: 30 - description: Number of seconds after the container has started before - liveness probes are initiated. + description: |- + Number of seconds after the container has started before startup probe is initiated. + Deprecated: This field is deprecated and will be removed in a future version. + The startup probe now uses a fixed configuration. format: int32 type: integer minReadySeconds: diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml index 1dbc0b7..1880dce 100644 --- a/config/rbac/kustomization.yaml +++ b/config/rbac/kustomization.yaml @@ -24,3 +24,7 @@ resources: # if you do not want those helpers be installed with your Project. - valkeycluster_editor_role.yaml - valkeycluster_viewer_role.yaml +# RBAC for Valkey pods to query Kubernetes API for pod discovery +- valkey_pod_service_account.yaml +- valkey_pod_role.yaml +- valkey_pod_role_binding.yaml diff --git a/config/rbac/valkey_pod_role.yaml b/config/rbac/valkey_pod_role.yaml new file mode 100644 index 0000000..3fce5a8 --- /dev/null +++ b/config/rbac/valkey_pod_role.yaml @@ -0,0 +1,9 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: valkey-pod-reader + namespace: system +rules: +- apiGroups: [""] + resources: ["pods"] + verbs: ["get", "list"] diff --git a/config/rbac/valkey_pod_role_binding.yaml b/config/rbac/valkey_pod_role_binding.yaml new file mode 100644 index 0000000..33127a7 --- /dev/null +++ b/config/rbac/valkey_pod_role_binding.yaml @@ -0,0 +1,13 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: valkey-pod-reader-binding + namespace: system +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: valkey-pod-reader +subjects: +- kind: ServiceAccount + name: valkey-pod + namespace: system diff --git a/config/rbac/valkey_pod_service_account.yaml b/config/rbac/valkey_pod_service_account.yaml new file mode 100644 index 0000000..fc865aa --- /dev/null +++ b/config/rbac/valkey_pod_service_account.yaml @@ -0,0 +1,5 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: valkey-pod + namespace: system diff --git a/internal/controller/scripts/meet.sh b/internal/controller/scripts/meet.sh index 80a5312..1a6e87f 100644 --- a/internal/controller/scripts/meet.sh +++ b/internal/controller/scripts/meet.sh @@ -9,19 +9,49 @@ end_time=$((start_time + SCRIPT_TIMEOUT)) msg meet begin +# Function to get pod IP from Kubernetes API +get_pod_ip() { + local pod_name=$1 + local namespace=$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace) + local token=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token) + local cacert=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt + + # Query Kubernetes API for the specific pod + curl -s --cacert "$cacert" --header "Authorization: Bearer $token" \ + "https://kubernetes.default.svc/api/v1/namespaces/$namespace/pods/$pod_name" | + grep -o '"podIP":"[^"]*"' | head -1 | cut -d'"' -f4 +} + while [ "$(date +%s)" -lt "$end_time" ]; do sleep 1 - msg meet "valkey_cli $VALKEY_HOST 6379 -t 1 -c ping" - RESPONSE=$(valkey_cli "$VALKEY_HOST" 6379 -t 1 -c ping) - if [ "$RESPONSE" = "PONG" ]; then - msg meet "PONG response from $VALKEY_HOST" + # Extract pod name from FQDN (take everything before first dot) + POD_NAME=$(echo "$VALKEY_HOST" | cut -d'.' -f1) + + # Try to get IP from Kubernetes API first + ipaddress=$(get_pod_ip "$POD_NAME") + + # Fall back to DNS if Kubernetes API fails + if [ -z "$ipaddress" ]; then + msg meet "Kubernetes API lookup failed, falling back to DNS for $VALKEY_HOST" ipaddress=$(getent hosts "$VALKEY_HOST" | awk '{ print $1 }') + fi + + if [ -z "$ipaddress" ]; then + msg meet "Could not resolve $VALKEY_HOST" + continue + fi + + msg meet "valkey_cli $ipaddress 6379 -t 1 -c ping" + RESPONSE=$(valkey_cli "$ipaddress" 6379 -t 1 -c ping) + + if [ "$RESPONSE" = "PONG" ]; then + msg meet "PONG response from $ipaddress ($VALKEY_HOST)" msg meet "MEET $ipaddress" valkey_cli 127.0.0.1 6379 -t 1 -c cluster meet "$ipaddress" 6379 break else - msg meet "got response from $VALKEY_HOST: ${RESPONSE}" + msg meet "got response from $ipaddress: ${RESPONSE}" continue fi done diff --git a/internal/controller/scripts/startup.sh b/internal/controller/scripts/startup.sh new file mode 100644 index 0000000..e3b8064 --- /dev/null +++ b/internal/controller/scripts/startup.sh @@ -0,0 +1,124 @@ +#!/bin/bash + +# Valkey Node Startup Check for Kubernetes +# Returns 0 (started) if any of these conditions are met: +# 1. Cluster state is "ok" +# 2. Node has zero slots allocated +# 3. Node doesn't know about any other nodes (single node) +# 4. 300 seconds have elapsed since pod started + +set -x + +# shellcheck source=./utils.sh +. /scripts/utils.sh + +# Configuration +TIMEOUT_SECONDS="${TIMEOUT_SECONDS:-300}" + +# Function to check if timeout has elapsed since Valkey started +check_timeout() { + local info_output + info_output=$(valkey_cli 127.0.0.1 6379 -t 1 -c INFO server 2>/dev/null || echo "") + + # Extract uptime_in_seconds from INFO output + local uptime + uptime=$(echo "$info_output" | grep "^uptime_in_seconds:" | cut -d: -f2 | tr -d '\r') + + if [ -z "$uptime" ]; then + echo "Warning: Could not retrieve Valkey uptime" >&2 + return 1 + fi + + if [ "$uptime" -ge $TIMEOUT_SECONDS ]; then + echo "Startup check passed: Valkey uptime of ${uptime}s exceeds timeout of ${TIMEOUT_SECONDS}s" + return 0 + fi + return 1 +} + +# Function to check cluster state +check_cluster_state() { + local cluster_info + cluster_info=$(valkey_cli 127.0.0.1 6379 -t 1 -c CLUSTER INFO 2>/dev/null || echo "") + + if echo "$cluster_info" | grep -q "cluster_state:ok"; then + echo "Startup check passed: cluster state is ok" + return 0 + fi + return 1 +} + +# Function to check slot allocation +check_slots() { + local nodes_info + nodes_info=$(valkey_cli 127.0.0.1 6379 -t 1 -c CLUSTER NODES 2>/dev/null || echo "") + + # Find the current node (marked with "myself") + local myself_line + myself_line=$(echo "$nodes_info" | grep "myself" || echo "") + + if [ -z "$myself_line" ]; then + echo "Warning: Could not find current node in cluster nodes output" >&2 + return 1 + fi + + # Check if the line contains any slot ranges (format: [slot-slot] or single slots) + # Slots appear after the address and flags, typically after the 8th field + if ! echo "$myself_line" | grep -qE '\[?[0-9]+-?[0-9]*\]?'; then + echo "Startup check passed: node has zero slots allocated" + return 0 + fi + + return 1 +} + +# Function to check if node doesn't know about any other nodes +check_single_node() { + local nodes_info + nodes_info=$(valkey_cli 127.0.0.1 6379 -t 1 -c CLUSTER NODES 2>/dev/null || echo "") + + if [ -z "$nodes_info" ]; then + echo "Warning: Could not retrieve cluster nodes information" >&2 + return 1 + fi + + # Count the number of nodes (each node is one line) + local node_count + node_count=$(echo "$nodes_info" | grep -c "^") + + if [ "$node_count" -eq 1 ]; then + echo "Startup check passed: node doesn't know about any other nodes" + return 0 + fi + + return 1 +} + +# Main startup check logic +main() { + # Check condition 4: timeout elapsed + if check_timeout; then + exit 0 + fi + + # Check condition 1: cluster state ok + if check_cluster_state; then + exit 0 + fi + + # Check condition 3: single node (doesn't know about other nodes) + if check_single_node; then + exit 0 + fi + + # Check condition 2: zero slots allocated + if check_slots; then + exit 0 + fi + + # None of the conditions met + echo "Startup check failed: waiting for cluster state ok, zero slots, single node, or timeout" + exit 1 +} + +main diff --git a/internal/controller/valkeycluster_controller_configmap.go b/internal/controller/valkeycluster_controller_configmap.go index 0631518..1a43f3e 100644 --- a/internal/controller/valkeycluster_controller_configmap.go +++ b/internal/controller/valkeycluster_controller_configmap.go @@ -40,12 +40,18 @@ func (r *ValkeyClusterReconciler) upsertConfigMap(ctx context.Context, valkeyClu logger.Error(err, "failed to read utils.sh") return "", err } + startup, err := scripts.ReadFile("scripts/startup.sh") + if err != nil { + logger.Error(err, "failed to read startup.sh") + return "", err + } ls := labelsForValkeyCluster(valkeyCluster.Name) cmData := map[string]string{ "pre_stop.sh": string(preStop), "post_start.sh": string(postStart), "meet.sh": string(meet), "utils.sh": string(utils), + "startup.sh": string(startup), } valkeyConfContent, err := getValkeyConfigContent(valkeyCluster) if err != nil { diff --git a/internal/controller/valkeycluster_controller_statefulset.go b/internal/controller/valkeycluster_controller_statefulset.go index 4bd4589..f5910a3 100644 --- a/internal/controller/valkeycluster_controller_statefulset.go +++ b/internal/controller/valkeycluster_controller_statefulset.go @@ -104,8 +104,9 @@ func (r *ValkeyClusterReconciler) statefulSet(name string, size int32, valkeyClu }, }, Spec: corev1.PodSpec{ - NodeSelector: valkeyCluster.Spec.NodeSelector, - Tolerations: valkeyCluster.Spec.Tolerations, + ServiceAccountName: "valkey-cluster-operator-valkey-pod", + NodeSelector: valkeyCluster.Spec.NodeSelector, + Tolerations: valkeyCluster.Spec.Tolerations, SecurityContext: &corev1.PodSecurityContext{ RunAsNonRoot: &[]bool{true}[0], // IMPORTANT: seccomProfile was introduced with Kubernetes 1.19 @@ -167,13 +168,23 @@ func (r *ValkeyClusterReconciler) statefulSet(name string, size int32, valkeyClu }, }, }, + StartupProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + Exec: &corev1.ExecAction{ + Command: []string{"/bin/bash", "/scripts/startup.sh"}, + }, + }, + TimeoutSeconds: 5, + PeriodSeconds: 10, + SuccessThreshold: 1, + FailureThreshold: 30, + }, ReadinessProbe: &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ TCPSocket: &corev1.TCPSocketAction{ Port: intstr.FromInt(VALKEY_PORT), }, }, - InitialDelaySeconds: valkeyCluster.Spec.InitialDelaySeconds, }, LivenessProbe: &corev1.Probe{ ProbeHandler: corev1.ProbeHandler{ @@ -504,7 +515,7 @@ func (r *ValkeyClusterReconciler) reconcileStatefulSets(ctx context.Context, req // - valkey-server command // - valkey-server lifecycle // - valkey-server environment -// - valkey-server readiness probe +// - valkey-server startup probe // - metrics command // It's really important that you consider updating applyDesiredStatefulSetSpec if this function changes func (r *ValkeyClusterReconciler) compareActualToDesiredStatefulSet(ctx context.Context, valkeyCluster *cachev1alpha1.ValkeyCluster, stsName string) (bool, error) { @@ -533,8 +544,8 @@ func (r *ValkeyClusterReconciler) compareActualToDesiredStatefulSet(ctx context. log.Info(fmt.Sprintf("StatefulSet %s Env is different: %s", stsName, cmp.Diff(actual.Spec.Template.Spec.Containers[0].Env, desired.Spec.Template.Spec.Containers[0].Env))) diff = true } - if !cmp.Equal(actual.Spec.Template.Spec.Containers[0].ReadinessProbe.InitialDelaySeconds, desired.Spec.Template.Spec.Containers[0].ReadinessProbe.InitialDelaySeconds) { - log.Info(fmt.Sprintf("StatefulSet %s ReadinessProbe.InitialDelaySeconds is different: %s", stsName, cmp.Diff(actual.Spec.Template.Spec.Containers[0].ReadinessProbe.InitialDelaySeconds, desired.Spec.Template.Spec.Containers[0].ReadinessProbe.InitialDelaySeconds))) + if !cmp.Equal(actual.Spec.Template.Spec.Containers[0].StartupProbe, desired.Spec.Template.Spec.Containers[0].StartupProbe) { + log.Info(fmt.Sprintf("StatefulSet %s StartupProbe is different: %s", stsName, cmp.Diff(actual.Spec.Template.Spec.Containers[0].StartupProbe, desired.Spec.Template.Spec.Containers[0].StartupProbe))) diff = true } @@ -555,7 +566,7 @@ func (r *ValkeyClusterReconciler) applyDesiredStatefulSetSpec(valkeyCluster *cac ss.Spec.Template.Spec.Containers[0].Lifecycle = desired.Spec.Template.Spec.Containers[0].Lifecycle ss.Spec.Template.Spec.Containers[0].Env = desired.Spec.Template.Spec.Containers[0].Env ss.Spec.Template.Spec.Containers[0].Image = desired.Spec.Template.Spec.Containers[0].Image - ss.Spec.Template.Spec.Containers[0].ReadinessProbe = desired.Spec.Template.Spec.Containers[0].ReadinessProbe + ss.Spec.Template.Spec.Containers[0].StartupProbe = desired.Spec.Template.Spec.Containers[0].StartupProbe ss.Spec.Template.Spec.Containers[1].Args = desired.Spec.Template.Spec.Containers[1].Args ss.Spec.Template.Spec.Containers[1].Image = desired.Spec.Template.Spec.Containers[1].Image diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 2d75a04..46730c4 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -229,7 +229,7 @@ var _ = Describe("controller", Ordered, func() { } return nil } - Eventually(getPvc, time.Minute, time.Second).Should(Succeed()) + Eventually(getPvc, 5*time.Minute, time.Second).Should(Succeed()) }) It("should add replicas", func() { cmd := exec.Command("kubectl",