From 520a9ce75fb03d7ad1a8e1c50cd594a97eb24997 Mon Sep 17 00:00:00 2001 From: Krzysztof Ostrowski Date: Thu, 30 Apr 2026 11:16:43 +0200 Subject: [PATCH 1/3] kms/health: add KMSv2 health monitor library Introduces pkg/operator/encryption/kms/health/: - Probe calls Status() on an injected kmsservice.Service (mirrors preflight's pattern) and classifies the response as ok, unhealthy:not-ok:, or unhealthy:rpc-error:. Never returns an error; failures are baked into HealthStatus.Healthz so the monitor loop stays flat. - StatusWriter / ConfigMapWriter updates an existing ConfigMap in place. Hard-fails on miss so misconfiguration is loud. - Monitor drives the tick loop: probe, stamp observer pod, write, sleep on healthy/unhealthy interval, repeat until ctx cancel. Write errors are logged and tolerated. - health.NewCommand(ctx) wires flags, validates inputs, builds a rest.Config, dials the UDS via the same kmsv2 client preflight uses, and hands a Probe + ConfigMapWriter to the Monitor. --output-mode=condition is reserved and explicitly rejected so the OpenShift track can add it without silent behavior change. Tests use FakeClock with the HasWaiters() barrier for race-free stepping through healthy->unhealthy->healthy transitions. --- pkg/operator/encryption/kms/health/cmd.go | 238 ++++++++++++++++ .../encryption/kms/health/cmd_test.go | 180 ++++++++++++ pkg/operator/encryption/kms/health/doc.go | 5 + pkg/operator/encryption/kms/health/monitor.go | 78 ++++++ .../encryption/kms/health/monitor_test.go | 261 ++++++++++++++++++ pkg/operator/encryption/kms/health/probe.go | 89 ++++++ .../encryption/kms/health/probe_test.go | 219 +++++++++++++++ pkg/operator/encryption/kms/health/types.go | 49 ++++ pkg/operator/encryption/kms/health/writer.go | 10 + .../encryption/kms/health/writer_configmap.go | 100 +++++++ .../kms/health/writer_configmap_test.go | 244 ++++++++++++++++ 11 files changed, 1473 insertions(+) create mode 100644 pkg/operator/encryption/kms/health/cmd.go create mode 100644 pkg/operator/encryption/kms/health/cmd_test.go create mode 100644 pkg/operator/encryption/kms/health/doc.go create mode 100644 pkg/operator/encryption/kms/health/monitor.go create mode 100644 pkg/operator/encryption/kms/health/monitor_test.go create mode 100644 pkg/operator/encryption/kms/health/probe.go create mode 100644 pkg/operator/encryption/kms/health/probe_test.go create mode 100644 pkg/operator/encryption/kms/health/types.go create mode 100644 pkg/operator/encryption/kms/health/writer.go create mode 100644 pkg/operator/encryption/kms/health/writer_configmap.go create mode 100644 pkg/operator/encryption/kms/health/writer_configmap_test.go diff --git a/pkg/operator/encryption/kms/health/cmd.go b/pkg/operator/encryption/kms/health/cmd.go new file mode 100644 index 0000000000..afbb7bec51 --- /dev/null +++ b/pkg/operator/encryption/kms/health/cmd.go @@ -0,0 +1,238 @@ +package health + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/spf13/cobra" + "github.com/spf13/pflag" + + k8senvelopekmsv2 "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/klog/v2" +) + +const ( + outputModeConfigMap = "configmap" + outputModeCondition = "condition" + + providerName = "kms-health-monitor" +) + +type commandOptions struct { + kmsSocket string + probeInterval time.Duration + probeIntervalUnhealthy time.Duration + probeTimeout time.Duration + writeTimeout time.Duration + outputMode string + configmapNamespace string 
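+	// configmapName is optional; when left empty, validate() defaults it
+	// to "kms-health-" + observerPodName.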
+ configmapName string + observerPodName string + kubeconfig string +} + +// NewCommand wires the cobra command. ctx is owned by main() so signal +// handling lives there. +func NewCommand(ctx context.Context) *cobra.Command { + o := &commandOptions{ + kmsSocket: "/var/run/kmsplugin/kms.sock", + probeInterval: 60 * time.Second, + probeIntervalUnhealthy: 10 * time.Second, + probeTimeout: 3 * time.Second, + writeTimeout: 5 * time.Second, + outputMode: outputModeConfigMap, + } + cmd := &cobra.Command{ + Use: "kms-health-monitor", + Short: "Observes a co-located KMSv2 plugin and publishes a health status.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := o.validate(); err != nil { + return err + } + return o.run(ctx) + }, + } + o.addFlags(cmd.Flags()) + return cmd +} + +func (o *commandOptions) addFlags(fs *pflag.FlagSet) { + fs.StringVar( + &o.kmsSocket, + "kms-socket", + o.kmsSocket, + "filesystem path to the KMSv2 plugin UDS", + ) + fs.DurationVar( + &o.probeInterval, + "probe-interval", + o.probeInterval, + "probe cadence while the plugin is healthy", + ) + fs.DurationVar( + &o.probeIntervalUnhealthy, + "probe-interval-unhealthy", + o.probeIntervalUnhealthy, + "probe cadence after an unhealthy result until recovery", + ) + fs.DurationVar( + &o.probeTimeout, + "probe-timeout", + o.probeTimeout, + "deadline for each Status RPC", + ) + fs.DurationVar( + &o.writeTimeout, + "write-timeout", + o.writeTimeout, + "deadline for each status write (e.g. ConfigMap Update); should fit inside --probe-interval-unhealthy to preserve cadence under apiserver slowness", + ) + fs.StringVar( + &o.outputMode, + "output-mode", + o.outputMode, + "status sink: configmap (condition is reserved for the OpenShift track)", + ) + fs.StringVar( + &o.configmapNamespace, + "configmap-namespace", + "", + "namespace of the status ConfigMap (required when output-mode=configmap; "+ + "caller must have RBAC to patch ConfigMaps in this namespace)", + ) + fs.StringVar( + &o.configmapName, + "configmap-name", + "", + "name of the status ConfigMap; defaults to \"kms-health-${POD_NAME}\". "+ + "MUST be unique per monitor instance: concurrent writers on one CM "+ + "produce last-writer-wins flap on every key", + ) + fs.StringVar( + &o.observerPodName, + "observer-pod-name", + os.Getenv("POD_NAME"), + "pod name recorded in the status; defaults to $POD_NAME", + ) + fs.StringVar( + &o.kubeconfig, + "kubeconfig", + "", + "path to a kubeconfig; empty uses in-cluster config", + ) +} + +func (o *commandOptions) validate() error { + if o.kmsSocket == "" { + return fmt.Errorf("--kms-socket is required") + } + if o.probeInterval <= 0 { + return fmt.Errorf("--probe-interval must be positive") + } + if o.probeIntervalUnhealthy <= 0 { + return fmt.Errorf("--probe-interval-unhealthy must be positive") + } + if o.probeTimeout <= 0 { + return fmt.Errorf("--probe-timeout must be positive") + } + if o.writeTimeout <= 0 { + return fmt.Errorf("--write-timeout must be positive") + } + // $POD_NAME defaulting happens at flag-registration time in addFlags; + // by here observerPodName is the flag, env, or genuinely empty. + if o.observerPodName == "" { + return fmt.Errorf( + "--observer-pod-name is required (or set $POD_NAME)", + ) + } + switch o.outputMode { + case outputModeConfigMap: + if o.configmapNamespace == "" { + return fmt.Errorf( + "--configmap-namespace is required when --output-mode=%s", + outputModeConfigMap, + ) + } + if o.configmapName == "" { + // Wrap pod identity rather than using it bare. 
Advertises + // ownership and avoids colliding with a same-namespaced CM + // that some other component happens to name after the pod. + o.configmapName = "kms-health-" + o.observerPodName + } + case outputModeCondition: + return fmt.Errorf( + "--output-mode=%s is reserved for the OpenShift track and not implemented", + outputModeCondition, + ) + default: + return fmt.Errorf( + "--output-mode must be %q or %q (got %q)", + outputModeConfigMap, + outputModeCondition, + o.outputMode, + ) + } + return nil +} + +func (o *commandOptions) run(ctx context.Context) error { + cfg, err := buildRESTConfig(o.kubeconfig) + if err != nil { + return fmt.Errorf("build rest config: %w", err) + } + client, err := kubernetes.NewForConfig(cfg) + if err != nil { + return fmt.Errorf("build kubernetes client: %w", err) + } + + writer := NewConfigMapWriter(client, o.configmapNamespace, o.configmapName) + + // WaitForReady(true) is set inside the kmsv2 client, so Dial returns + // immediately even if the plugin isn't listening yet. + endpoint := "unix://" + o.kmsSocket + service, err := k8senvelopekmsv2.NewGRPCService( + ctx, + endpoint, + providerName, + o.probeTimeout, + ) + if err != nil { + return fmt.Errorf("dial KMS plugin at %q: %w", endpoint, err) + } + probe := NewProbe(service, 0) + + monitor := NewMonitor( + probe, + writer, + o.observerPodName, + o.probeInterval, + o.probeIntervalUnhealthy, + o.writeTimeout, + ) + + klog.InfoS("kms-health-monitor starting", + "socket", o.kmsSocket, + "configmap", o.configmapNamespace+"/"+o.configmapName, + "observerPod", o.observerPodName, + "healthyInterval", o.probeInterval, + "unhealthyInterval", o.probeIntervalUnhealthy, + "probeTimeout", o.probeTimeout, + "writeTimeout", o.writeTimeout, + ) + + monitor.Run(ctx) + klog.Info("kms-health-monitor stopping") + return nil +} + +func buildRESTConfig(kubeconfig string) (*rest.Config, error) { + if kubeconfig != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfig) + } + return rest.InClusterConfig() +} diff --git a/pkg/operator/encryption/kms/health/cmd_test.go b/pkg/operator/encryption/kms/health/cmd_test.go new file mode 100644 index 0000000000..712a4bdd0c --- /dev/null +++ b/pkg/operator/encryption/kms/health/cmd_test.go @@ -0,0 +1,180 @@ +package health + +import ( + "strings" + "testing" + "time" + + "github.com/spf13/pflag" +) + +// validOpts is the minimal fully-valid commandOptions. Every test case +// below mutates a copy of this to isolate one validation rule at a time. 
+func validOpts() commandOptions { + return commandOptions{ + kmsSocket: "/var/run/kms.sock", + probeInterval: 60 * time.Second, + probeIntervalUnhealthy: 10 * time.Second, + probeTimeout: 3 * time.Second, + writeTimeout: 5 * time.Second, + outputMode: outputModeConfigMap, + configmapNamespace: "kms-health-test", + configmapName: "kms-health-master-0", + observerPodName: "master-0", + } +} + +func TestValidate_acceptsValidOptions(t *testing.T) { + o := validOpts() + if err := o.validate(); err != nil { + t.Fatalf("validate(valid): %v", err) + } +} + +func TestValidate_rejectsMissingOrZeroFields(t *testing.T) { + cases := []struct { + name string + mut func(*commandOptions) + wants string // substring expected in the error + }{ + {"empty kmsSocket", func(o *commandOptions) { o.kmsSocket = "" }, "--kms-socket"}, + {"zero probeInterval", func(o *commandOptions) { o.probeInterval = 0 }, "--probe-interval"}, + {"zero probeIntervalUnhealthy", func(o *commandOptions) { o.probeIntervalUnhealthy = 0 }, "--probe-interval-unhealthy"}, + {"zero probeTimeout", func(o *commandOptions) { o.probeTimeout = 0 }, "--probe-timeout"}, + {"zero writeTimeout", func(o *commandOptions) { o.writeTimeout = 0 }, "--write-timeout"}, + {"empty configmapNamespace", func(o *commandOptions) { o.configmapNamespace = "" }, "--configmap-namespace"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + o := validOpts() + tc.mut(&o) + err := o.validate() + if err == nil { + t.Fatalf("validate: want error containing %q, got nil", tc.wants) + } + if !strings.Contains(err.Error(), tc.wants) { + t.Errorf("validate: got %q, want substring %q", err.Error(), tc.wants) + } + }) + } +} + +func TestValidate_outputModeEnum(t *testing.T) { + cases := []struct { + name string + mode string + wantErr bool + wantSubstr string + }{ + {"configmap accepted", outputModeConfigMap, false, ""}, + {"condition rejected as reserved", outputModeCondition, true, "reserved for the OpenShift track"}, + {"unknown rejected with both options listed", "bogus", true, `must be "configmap" or "condition"`}, + {"empty rejected with both options listed", "", true, `must be "configmap" or "condition"`}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + o := validOpts() + o.outputMode = tc.mode + err := o.validate() + if tc.wantErr { + if err == nil { + t.Fatalf("validate: want error containing %q, got nil", tc.wantSubstr) + } + if !strings.Contains(err.Error(), tc.wantSubstr) { + t.Errorf("validate: got %q, want substring %q", err.Error(), tc.wantSubstr) + } + return + } + if err != nil { + t.Errorf("validate: got %v, want nil", err) + } + }) + } +} + +func TestAddFlags_observerPodNameDefaultsFromPodNameEnv(t *testing.T) { + t.Setenv("POD_NAME", "from-env-0") + o := commandOptions{} + fs := pflag.NewFlagSet("test", pflag.ContinueOnError) + o.addFlags(fs) + if err := fs.Parse(nil); err != nil { + t.Fatalf("parse: %v", err) + } + if o.observerPodName != "from-env-0" { + t.Errorf("observerPodName: got %q, want %q (env-derived flag default)", o.observerPodName, "from-env-0") + } +} + +func TestValidate_observerPodNameRequiredWhenEmpty(t *testing.T) { + o := validOpts() + o.observerPodName = "" + + err := o.validate() + if err == nil { + t.Fatal("validate: want error, got nil") + } + if !strings.Contains(err.Error(), "--observer-pod-name") { + t.Errorf("validate: got %q, want substring --observer-pod-name", err.Error()) + } +} + +func TestValidate_configmapNameDefaultsFromObserverPodName(t *testing.T) { + o := validOpts() + 
o.configmapName = "" // force the wrapped default + + if err := o.validate(); err != nil { + t.Fatalf("validate: %v", err) + } + want := "kms-health-master-0" + if o.configmapName != want { + t.Errorf("configmapName: got %q, want %q (wrapped default from observerPodName)", o.configmapName, want) + } +} + +func TestAddFlagsAndValidate_configmapNameDefaultChainsThroughEnv(t *testing.T) { + // Verifies the full chain (env → observerPodName → configmapName) + // across the two layers it now spans: addFlags reads $POD_NAME at + // flag-registration time, validate derives configmapName from the + // resolved observerPodName. + t.Setenv("POD_NAME", "kube-apiserver-3") + // Field defaults that addFlags wires to non-empty values + // (kmsSocket, probeInterval, etc.) come from this struct literal — + // addFlags references them as the flag default. Fields that addFlags + // defaults to "" (configmapNamespace, configmapName, observerPodName) + // have to be supplied via flag args, since addFlags ignores the + // struct value for those. + o := commandOptions{ + kmsSocket: "/var/run/kms.sock", + probeInterval: 60 * time.Second, + probeIntervalUnhealthy: 10 * time.Second, + probeTimeout: 3 * time.Second, + writeTimeout: 5 * time.Second, + outputMode: outputModeConfigMap, + } + fs := pflag.NewFlagSet("test", pflag.ContinueOnError) + o.addFlags(fs) + if err := fs.Parse([]string{"--configmap-namespace=kms-health-test"}); err != nil { + t.Fatalf("parse: %v", err) + } + if err := o.validate(); err != nil { + t.Fatalf("validate: %v", err) + } + if o.observerPodName != "kube-apiserver-3" { + t.Errorf("observerPodName: got %q, want kube-apiserver-3", o.observerPodName) + } + if o.configmapName != "kms-health-kube-apiserver-3" { + t.Errorf("configmapName: got %q, want kms-health-kube-apiserver-3", o.configmapName) + } +} + +func TestValidate_explicitConfigmapNameNotOverridden(t *testing.T) { + o := validOpts() + o.configmapName = "explicit-cm" + + if err := o.validate(); err != nil { + t.Fatalf("validate: %v", err) + } + if o.configmapName != "explicit-cm" { + t.Errorf("configmapName: got %q, want explicit-cm (default must not override caller-set value)", o.configmapName) + } +} diff --git a/pkg/operator/encryption/kms/health/doc.go b/pkg/operator/encryption/kms/health/doc.go new file mode 100644 index 0000000000..7b24795f2d --- /dev/null +++ b/pkg/operator/encryption/kms/health/doc.go @@ -0,0 +1,5 @@ +// Package health probes a co-located KMSv2 plugin on a cadence and +// publishes a classified, timestamped HealthStatus through a StatusWriter. +// Used by operators and condition reporters that need plugin health +// without dialing the socket themselves. +package health diff --git a/pkg/operator/encryption/kms/health/monitor.go b/pkg/operator/encryption/kms/health/monitor.go new file mode 100644 index 0000000000..5d16ee1314 --- /dev/null +++ b/pkg/operator/encryption/kms/health/monitor.go @@ -0,0 +1,78 @@ +package health + +import ( + "context" + "time" + + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +type Prober interface { + Probe(ctx context.Context) HealthStatus +} + +// Monitor never exits on probe or write failures; the next tick is the +// retry, and ctx cancellation is the only exit path. Skipped writes +// (apiserver Forbidden, NotFound, transient network failure) leave the +// published status one tick stale, up to one healthyInterval; the next +// tick republishes. 
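+//
+// A minimal wiring sketch, assuming a Prober and StatusWriter are already
+// constructed (the pod name and intervals below are illustrative):
+//
+//	m := NewMonitor(probe, writer, "kube-apiserver-master-0",
+//		60*time.Second, 10*time.Second, 5*time.Second)
+//	m.Run(ctx) // blocks until ctx is cancelled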
+type Monitor struct { + probe Prober + writer StatusWriter + observerPod string + healthyInterval time.Duration + unhealthyInterval time.Duration + // writeTimeout bounds each writer.Write call so apiserver slowness + // cannot stall the probe loop. probeTimeout + writeTimeout stack + // per tick. Keep their sum below unhealthyInterval, otherwise a + // stuck Write delays the next tick past the unhealthy cadence and + // the "tighten on unhealthy" mechanism collapses. Must be > 0. + writeTimeout time.Duration + clock clock.Clock +} + +func NewMonitor( + prober Prober, + writer StatusWriter, + observerPod string, + healthyInterval, unhealthyInterval, writeTimeout time.Duration, +) *Monitor { + return &Monitor{ + probe: prober, + writer: writer, + observerPod: observerPod, + healthyInterval: healthyInterval, + unhealthyInterval: unhealthyInterval, + writeTimeout: writeTimeout, + clock: clock.RealClock{}, + } +} + +// Run blocks until ctx is cancelled. +func (m *Monitor) Run(ctx context.Context) { + for { + status := m.probe.Probe(ctx) + status.ObserverPod = m.observerPod + + // Cancel inline rather than via defer: defers in unbounded loops + // accumulate until the function returns. + writeCtx, writeCancel := context.WithTimeout(ctx, m.writeTimeout) + err := m.writer.Write(writeCtx, status) + writeCancel() + if err != nil { + klog.ErrorS(err, "kms-health: writer failed; continuing") + } + + interval := m.healthyInterval + if !status.Healthz.IsOK() { + interval = m.unhealthyInterval + } + + select { + case <-ctx.Done(): + return + case <-m.clock.After(interval): + } + } +} diff --git a/pkg/operator/encryption/kms/health/monitor_test.go b/pkg/operator/encryption/kms/health/monitor_test.go new file mode 100644 index 0000000000..439266a0cd --- /dev/null +++ b/pkg/operator/encryption/kms/health/monitor_test.go @@ -0,0 +1,261 @@ +package health + +import ( + "context" + "runtime" + "sync" + "testing" + "time" + + testclock "k8s.io/utils/clock/testing" +) + +// recordingWriter buffers every HealthStatus it receives and lets the +// test block on each write via a channel. Bounded buffer is fine for +// unit-test scope; if a Monitor change starts writing way more than +// expected the test will deadlock — that's a useful failure. +type recordingWriter struct { + writes chan HealthStatus +} + +func newRecordingWriter() *recordingWriter { + return &recordingWriter{writes: make(chan HealthStatus, 32)} +} + +func (w *recordingWriter) Write(ctx context.Context, status HealthStatus) error { + w.writes <- status + return nil +} + +// scriptedProber hands out pre-arranged HealthStatus values in order. +// Each Probe call consumes one. Designed to block if the Monitor asks +// for more than the test scripted — surfaces unintended extra probes. +type scriptedProber struct { + mu sync.Mutex + results []HealthStatus + calls int +} + +func (s *scriptedProber) Probe(ctx context.Context) HealthStatus { + s.mu.Lock() + defer s.mu.Unlock() + if s.calls >= len(s.results) { + // Return something sane rather than panicking mid-test; the + // writer buffer will catch unexpected writes. + s.calls++ + return HealthStatus{Healthz: Healthz{Class: HealthClassOK}, Timestamp: time.Now()} + } + r := s.results[s.calls] + s.calls++ + return r +} + +// waitForWaiters polls until the FakeClock has a registered waiter, so +// the next Step() actually triggers the Monitor's next-tick timer +// rather than being lost to the void. Bounded so misbehaving Monitors +// fail the test instead of hanging. 
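+// (FakeClock.HasWaiters reports whether anything is currently blocked on
+// clk.After; here that is the Monitor's between-tick wait.)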
+func waitForWaiters(t *testing.T, clk *testclock.FakeClock) { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for !clk.HasWaiters() { + if time.Now().After(deadline) { + t.Fatal("Monitor never registered a clock waiter — not scheduling next tick") + } + time.Sleep(time.Millisecond) + } +} + +func TestMonitor_transitionsHealthyUnhealthyHealthy(t *testing.T) { + t0 := time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC) + clk := testclock.NewFakeClock(t0) + prober := &scriptedProber{ + results: []HealthStatus{ + {Healthz: Healthz{Class: HealthClassOK}, Timestamp: t0}, + {Healthz: Healthz{Class: HealthClassNotOK, Detail: "boom"}, Timestamp: t0.Add(60 * time.Second)}, + {Healthz: Healthz{Class: HealthClassOK}, Timestamp: t0.Add(70 * time.Second)}, + }, + } + writer := newRecordingWriter() + + m := &Monitor{ + probe: prober, + writer: writer, + observerPod: "test-pod-1", + healthyInterval: 60 * time.Second, + unhealthyInterval: 10 * time.Second, + writeTimeout: 5 * time.Second, + clock: clk, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + done := make(chan struct{}) + go func() { + m.Run(ctx) + close(done) + }() + + // Tick 1: healthy. Probed immediately, no clock step needed first. + got1 := <-writer.writes + if !got1.Healthz.IsOK() { + t.Errorf("tick 1 Healthz: got %q, want IsOK", got1.Healthz) + } + if got1.ObserverPod != "test-pod-1" { + t.Errorf("tick 1 ObserverPod: got %q, want test-pod-1", got1.ObserverPod) + } + + // Should now be waiting for healthyInterval. + waitForWaiters(t, clk) + clk.Step(60 * time.Second) + + // Tick 2: unhealthy. Monitor must now switch to unhealthyInterval. + got2 := <-writer.writes + wantTick2 := Healthz{Class: HealthClassNotOK, Detail: "boom"} + if got2.Healthz != wantTick2 { + t.Errorf("tick 2 Healthz: got %q, want %q", got2.Healthz, wantTick2) + } + + // Should be waiting for unhealthyInterval (10s), not 60s. Verify by + // stepping only 10s and seeing the next probe fire. + waitForWaiters(t, clk) + clk.Step(10 * time.Second) + + // Tick 3: healthy again. + got3 := <-writer.writes + if !got3.Healthz.IsOK() { + t.Errorf("tick 3 Healthz: got %q, want IsOK", got3.Healthz) + } + + cancel() + <-done + + // Timestamps must be strictly advancing across the three recorded writes. + if !got2.Timestamp.After(got1.Timestamp) { + t.Errorf("timestamps not advancing: t1=%v t2=%v", got1.Timestamp, got2.Timestamp) + } + if !got3.Timestamp.After(got2.Timestamp) { + t.Errorf("timestamps not advancing: t2=%v t3=%v", got2.Timestamp, got3.Timestamp) + } +} + +func TestMonitor_stopsOnContextCancel(t *testing.T) { + t0 := time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC) + clk := testclock.NewFakeClock(t0) + prober := &scriptedProber{ + results: []HealthStatus{{Healthz: Healthz{Class: HealthClassOK}, Timestamp: t0}}, + } + writer := newRecordingWriter() + + m := &Monitor{ + probe: prober, + writer: writer, + observerPod: "p", + healthyInterval: 60 * time.Second, + unhealthyInterval: 10 * time.Second, + writeTimeout: 5 * time.Second, + clock: clk, + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + m.Run(ctx) + close(done) + }() + + <-writer.writes // first probe happened + waitForWaiters(t, clk) + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Monitor did not stop within 2s of context cancel") + } +} + +// runMonitorCycle starts a Monitor, waits for the first write + clock +// waiter, then cancels and waits for Run to return. 
Shared by the leak +// test so its iteration body stays focused on the leak assertion. +func runMonitorCycle(t *testing.T) { + t.Helper() + t0 := time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC) + clk := testclock.NewFakeClock(t0) + prober := &scriptedProber{ + results: []HealthStatus{ + {Healthz: Healthz{Class: HealthClassOK}, Timestamp: t0}, + }, + } + writer := newRecordingWriter() + + m := &Monitor{ + probe: prober, + writer: writer, + observerPod: "p", + healthyInterval: 60 * time.Second, + unhealthyInterval: 10 * time.Second, + writeTimeout: 5 * time.Second, + clock: clk, + } + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan struct{}) + go func() { + m.Run(ctx) + close(done) + }() + + <-writer.writes + waitForWaiters(t, clk) + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Monitor did not stop within 2s of context cancel") + } +} + +// TestMonitor_noGoroutineLeak verifies Run does not leak goroutines +// after ctx cancellation. Plan §14.2 calls out goroutine leakage as a +// real-class-of-bug for the gRPC dial path; the same risk applies to +// the loop that owns the long-lived select on ctx.Done() and +// clock.After. +// +// The test runs many cycles after a warm-up snapshot. A single-shot +// check would miss per-iteration accumulation (e.g. an orphaned +// goroutine spawned inside Run that exits on the *next* tick rather +// than on ctx.Done()). +func TestMonitor_noGoroutineLeak(t *testing.T) { + // Warm up before snapshotting: first run pulls in lazy klog buffers, + // fake-clock test internals, etc. Snapshotting beforehand would + // surface a one-time bump as a leak. + runMonitorCycle(t) + + runtime.GC() + baseline := runtime.NumGoroutine() + + const iterations = 20 + for range iterations { + runMonitorCycle(t) + } + + // Cleanup is scheduler-dependent; poll up to a deadline rather than + // using a fixed sleep that flakes on a loaded CI runner. + runtime.GC() + deadline := time.Now().Add(2 * time.Second) + for runtime.NumGoroutine() > baseline && time.Now().Before(deadline) { + time.Sleep(20 * time.Millisecond) + runtime.GC() + } + + if got := runtime.NumGoroutine(); got > baseline { + // Dump live stacks so a future regression is diagnosable from CI + // output alone. Without this, "leaked N goroutines" gives no clue + // to the leaking site. + buf := make([]byte, 1<<16) + n := runtime.Stack(buf, true) + t.Errorf("goroutine leak after %d cycles: baseline=%d, got=%d\nstacks:\n%s", + iterations, baseline, got, buf[:n]) + } +} diff --git a/pkg/operator/encryption/kms/health/probe.go b/pkg/operator/encryption/kms/health/probe.go new file mode 100644 index 0000000000..962741dbbb --- /dev/null +++ b/pkg/operator/encryption/kms/health/probe.go @@ -0,0 +1,89 @@ +package health + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "strings" + "time" + + "google.golang.org/grpc/status" + kmsservice "k8s.io/kms/pkg/service" +) + +// healthzMaxBodyLen caps the plugin's Healthz body. Without it a +// misbehaving plugin could push multi-MB strings into our ConfigMap on +// every tick. +const healthzMaxBodyLen = 200 + +// Probe never returns an error; failures are classified into +// HealthStatus.Healthz so the Monitor loop stays flat. +type Probe struct { + service kmsservice.Service + timeout time.Duration + now func() time.Time +} + +// NewProbe wraps the per-probe Status RPC with timeout if positive; 0 +// relies solely on ctx (or the kmsv2 client's own deadline). 
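+//
+// Typical use (a sketch; svc stands for an already-dialed kmsservice.Service):
+//
+//	p := NewProbe(svc, 3*time.Second)
+//	st := p.Probe(ctx)
+//	if !st.Healthz.IsOK() {
+//		// st.Healthz.String() renders "unhealthy:<class>:<detail>"
+//	}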
+func NewProbe(service kmsservice.Service, timeout time.Duration) *Probe { + return &Probe{ + service: service, + timeout: timeout, + now: time.Now, + } +} + +// Probe blocks until Status returns or ctx / the per-probe timeout fires. +func (p *Probe) Probe(ctx context.Context) HealthStatus { + if p.timeout > 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, p.timeout) + defer cancel() + } + resp, err := p.service.Status(ctx) + timestamp := p.now() + if err != nil { + return HealthStatus{ + Healthz: classifyRPCError(err), + Timestamp: timestamp, + } + } + return HealthStatus{ + Healthz: classifyHealthz(resp.Healthz), + KeyIDHash: hashKeyID(resp.KeyID), + Timestamp: timestamp, + } +} + +// classifyHealthz truncates overlong bodies on a UTF-8-safe boundary: +// the proto permits free-form text, ConfigMap.Data must be valid UTF-8, +// and naive byte-slicing at healthzMaxBodyLen could split a multi-byte +// rune. ToValidUTF8 drops any partial sequence at the cut. +func classifyHealthz(body string) Healthz { + if body == string(HealthClassOK) { + return Healthz{Class: HealthClassOK} + } + if len(body) > healthzMaxBodyLen { + body = strings.ToValidUTF8(body[:healthzMaxBodyLen], "") + } + return Healthz{Class: HealthClassNotOK, Detail: body} +} + +// classifyRPCError uses status.Code, which returns codes.Unknown for +// non-gRPC errors and the real code for gRPC status errors. Covers +// transport failures and deadline expirations without extra branching. +func classifyRPCError(err error) Healthz { + return Healthz{ + Class: HealthClassRPCError, + Detail: status.Code(err).String(), + } +} + +func hashKeyID(keyID string) string { + if keyID == "" { + return "" + } + sum := sha256.Sum256([]byte(keyID)) + return hex.EncodeToString(sum[:]) +} diff --git a/pkg/operator/encryption/kms/health/probe_test.go b/pkg/operator/encryption/kms/health/probe_test.go new file mode 100644 index 0000000000..7966c9f56e --- /dev/null +++ b/pkg/operator/encryption/kms/health/probe_test.go @@ -0,0 +1,219 @@ +package health + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "errors" + "strings" + "testing" + "time" + "unicode/utf8" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + kmsservice "k8s.io/kms/pkg/service" +) + +// testTimeout is a generous per-probe timeout for fast-fake tests that +// should never hit it. Tests that DO exercise the deadline path use a +// short timeout locally. +const testTimeout = 10 * time.Second + +// fakeService injects scripted Status responses. Same shape as +// preflight's fake (checker_test.go:14-30) — only StatusFn is exercised +// here because the Probe never calls Encrypt/Decrypt. 
+type fakeService struct { + StatusFn func(ctx context.Context) (*kmsservice.StatusResponse, error) +} + +func (f *fakeService) Status(ctx context.Context) (*kmsservice.StatusResponse, error) { + return f.StatusFn(ctx) +} + +func (f *fakeService) Encrypt(ctx context.Context, uid string, data []byte) (*kmsservice.EncryptResponse, error) { + panic("Encrypt not expected") +} + +func (f *fakeService) Decrypt(ctx context.Context, uid string, req *kmsservice.DecryptRequest) ([]byte, error) { + panic("Decrypt not expected") +} + +func sha256Hex(s string) string { + sum := sha256.Sum256([]byte(s)) + return hex.EncodeToString(sum[:]) +} + +func TestProbe_healthy(t *testing.T) { + fake := &fakeService{ + StatusFn: func(ctx context.Context) (*kmsservice.StatusResponse, error) { + return &kmsservice.StatusResponse{ + Healthz: "ok", + Version: "v2", + KeyID: "test-key-1", + }, nil + }, + } + + probe := NewProbe(fake, testTimeout) + got := probe.Probe(context.Background()) + + if !got.Healthz.IsOK() { + t.Errorf("Healthz: got %q, want IsOK", got.Healthz) + } + wantHash := sha256Hex("test-key-1") + if got.KeyIDHash != wantHash { + t.Errorf("KeyIDHash: got %q, want %q", got.KeyIDHash, wantHash) + } + if got.Timestamp.IsZero() { + t.Error("Timestamp: expected non-zero") + } +} + +func TestProbe_notOk_classifiesAsUnhealthy(t *testing.T) { + fake := &fakeService{ + StatusFn: func(ctx context.Context) (*kmsservice.StatusResponse, error) { + return &kmsservice.StatusResponse{ + Healthz: "not-ready", + KeyID: "test-key-1", + }, nil + }, + } + + got := NewProbe(fake, testTimeout).Probe(context.Background()) + + want := Healthz{Class: HealthClassNotOK, Detail: "not-ready"} + if got.Healthz != want { + t.Errorf("Healthz: got %q, want %q", got.Healthz, want) + } +} + +func TestProbe_notOk_truncatesTo200Chars(t *testing.T) { + // 260-char ASCII body — exercise the byte-length cap. + longBody := strings.Repeat("x", 260) + fake := &fakeService{ + StatusFn: func(ctx context.Context) (*kmsservice.StatusResponse, error) { + return &kmsservice.StatusResponse{ + Healthz: longBody, + KeyID: "test-key-1", + }, nil + }, + } + + got := NewProbe(fake, testTimeout).Probe(context.Background()) + + if got.Healthz.Class != HealthClassNotOK { + t.Errorf("Healthz class: got %q, want %q", got.Healthz.Class, HealthClassNotOK) + } + if len(got.Healthz.Detail) != healthzMaxBodyLen { + t.Errorf("Healthz detail length: got %d, want %d", len(got.Healthz.Detail), healthzMaxBodyLen) + } +} + +func TestProbe_notOk_truncatesSafelyAtMultiByteBoundary(t *testing.T) { + // 198 ASCII bytes + thumbs-up emoji (4 bytes) + tail. Naive byte- + // slicing at healthzMaxBodyLen=200 would land between bytes 2 and 3 + // of the emoji, producing invalid UTF-8 — apiserver rejects + // ConfigMap Updates with invalid UTF-8 in cm.Data values, which + // would freeze the published status indefinitely. 
+ body := strings.Repeat("x", 198) + "👍" + "tail" + fake := &fakeService{ + StatusFn: func(ctx context.Context) (*kmsservice.StatusResponse, error) { + return &kmsservice.StatusResponse{ + Healthz: body, + KeyID: "test-key-1", + }, nil + }, + } + + got := NewProbe(fake, testTimeout).Probe(context.Background()) + + if !utf8.ValidString(got.Healthz.Detail) { + t.Errorf("Healthz detail is not valid UTF-8: %q", got.Healthz.Detail) + } + if len(got.Healthz.Detail) > healthzMaxBodyLen { + t.Errorf("Healthz detail too long: got %d bytes, want <= %d", len(got.Healthz.Detail), healthzMaxBodyLen) + } +} + +func TestProbe_respectsPerProbeTimeout(t *testing.T) { + // Fake blocks until ctx fires; Probe's own WithTimeout should cut it + // off after probeTimeout. The fake wraps ctx.Err() exactly the way + // gRPC's transport does in production (status.FromContextError), so + // classification sees DeadlineExceeded. + fake := &fakeService{ + StatusFn: func(ctx context.Context) (*kmsservice.StatusResponse, error) { + <-ctx.Done() + return nil, status.FromContextError(ctx.Err()).Err() + }, + } + + const probeTimeout = 20 * time.Millisecond + probe := NewProbe(fake, probeTimeout) + + start := time.Now() + // Parent context has NO deadline — if Probe doesn't apply its own + // timeout, this test hangs and we'd notice via go test -timeout. + got := probe.Probe(context.Background()) + elapsed := time.Since(start) + + // Budget: probeTimeout + generous slack for CI. If we see >1s we're + // honoring the parent ctx instead of our own timeout. + if elapsed > 500*time.Millisecond { + t.Errorf("probe elapsed %v — per-probe timeout not applied", elapsed) + } + want := Healthz{Class: HealthClassRPCError, Detail: "DeadlineExceeded"} + if got.Healthz != want { + t.Errorf("Healthz: got %q, want %q", got.Healthz, want) + } +} + +func TestProbe_rpcError(t *testing.T) { + // The real k8senvelopekmsv2 client returns status errors from gRPC + // calls; generic (non-status) errors are treated by grpc/status as code + // Unknown. See vendor/google.golang.org/grpc/status/status.go. 
+ scenarios := []struct { + name string + fakeErr error + wantCode string + }{ + { + name: "generic non-gRPC error classifies as Unknown", + fakeErr: errors.New("connection refused"), + wantCode: "Unknown", + }, + { + name: "gRPC Unavailable propagates through classification", + fakeErr: status.Error(codes.Unavailable, "server gone"), + wantCode: "Unavailable", + }, + { + name: "gRPC DeadlineExceeded propagates through classification", + fakeErr: status.Error(codes.DeadlineExceeded, "too slow"), + wantCode: "DeadlineExceeded", + }, + } + + for _, tc := range scenarios { + t.Run(tc.name, func(t *testing.T) { + fake := &fakeService{ + StatusFn: func(ctx context.Context) (*kmsservice.StatusResponse, error) { + return nil, tc.fakeErr + }, + } + + got := NewProbe(fake, testTimeout).Probe(context.Background()) + + want := Healthz{Class: HealthClassRPCError, Detail: tc.wantCode} + if got.Healthz != want { + t.Errorf("Healthz: got %q, want %q", got.Healthz, want) + } + if got.KeyIDHash != "" { + t.Errorf("KeyIDHash: got %q, want empty (unreachable plugin)", got.KeyIDHash) + } + if got.Timestamp.IsZero() { + t.Error("Timestamp: expected non-zero even on RPC error") + } + }) + } +} diff --git a/pkg/operator/encryption/kms/health/types.go b/pkg/operator/encryption/kms/health/types.go new file mode 100644 index 0000000000..0825f6595a --- /dev/null +++ b/pkg/operator/encryption/kms/health/types.go @@ -0,0 +1,49 @@ +package health + +import "time" + +// HealthClass is the closed-set classifier for Healthz. +type HealthClass string + +const ( + HealthClassOK HealthClass = "ok" + HealthClassNotOK HealthClass = "not-ok" + HealthClassRPCError HealthClass = "rpc-error" +) + +type Healthz struct { + Class HealthClass + Detail string +} + +// IsOK is the canonical health predicate. Prefer this over comparing +// Class directly so consumers don't depend on the internal shape. +func (h Healthz) IsOK() bool { return h.Class == HealthClassOK } + +// String renders the wire format: "ok" or "unhealthy::". +func (h Healthz) String() string { + if h.Class == HealthClassOK { + return string(HealthClassOK) + } + return "unhealthy:" + string(h.Class) + ":" + h.Detail +} + +type HealthStatus struct { + Healthz Healthz + + // KeyIDHash is the sha256 hex of the KeyID the plugin returned, or + // empty if the probe could not reach Status. Hashing avoids leaking + // key material; consumers can still diff hashes across instances to + // detect rotation skew. + KeyIDHash string + + // Timestamp: RFC3339 on the wire. + Timestamp time.Time + + // ObserverPod is stable across deployment stages. ConfigMapWriter + // does not strictly need it (one CM per monitor, identity is in the + // CM name); the OpenShift CRD-condition writer does (one CR + // aggregates conditions from N pods, and identity must travel with + // each entry). + ObserverPod string +} diff --git a/pkg/operator/encryption/kms/health/writer.go b/pkg/operator/encryption/kms/health/writer.go new file mode 100644 index 0000000000..a14b45a5fe --- /dev/null +++ b/pkg/operator/encryption/kms/health/writer.go @@ -0,0 +1,10 @@ +package health + +import "context" + +// StatusWriter ships each HealthStatus produced by the Monitor. Write errors +// should be logged and tolerated by the Monitor. A missed publish is strictly +// less bad than crashing the sidecar. 
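+//
+// ConfigMapWriter is the only implementation in this package today; the
+// reserved --output-mode=condition track is expected to add a second one.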
+type StatusWriter interface { + Write(ctx context.Context, status HealthStatus) error +} diff --git a/pkg/operator/encryption/kms/health/writer_configmap.go b/pkg/operator/encryption/kms/health/writer_configmap.go new file mode 100644 index 0000000000..f831035d29 --- /dev/null +++ b/pkg/operator/encryption/kms/health/writer_configmap.go @@ -0,0 +1,100 @@ +package health + +import ( + "context" + "encoding/json" + "fmt" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" +) + +// ConfigMapWriter uses merge-patch so a tick of class X updates data.X +// and leaves the other classes' keys alone. Consumers determine the +// "current" class by max data..timestamp; there is no separate +// pointer key. +// +// On the wire (data is map[string]string; values shown decoded): +// +// data.ok = {"timestamp":"...","observerPod":"...","keyIDHash":"..."} +// data.not-ok = {"timestamp":"...","observerPod":"...","detail":"...","keyIDHash":"..."} +// data.rpc-error = {"timestamp":"...","observerPod":"...","detail":"..."} +// +// Concurrency contract: one ConfigMap per monitor instance. Two +// monitors writing the same CM produce last-writer-wins on every key. +// Callers MUST encode instance identity in the CM name (the cmd-layer +// default is "kms-health-${POD_NAME}"). +type ConfigMapWriter struct { + client kubernetes.Interface + namespace string + name string +} + +func NewConfigMapWriter(client kubernetes.Interface, namespace, name string) *ConfigMapWriter { + return &ConfigMapWriter{ + client: client, + namespace: namespace, + name: name, + } +} + +type classEntry struct { + Timestamp string `json:"timestamp"` + ObserverPod string `json:"observerPod"` + Detail string `json:"detail,omitempty"` + KeyIDHash string `json:"keyIDHash,omitempty"` +} + +type configMapDataPatch struct { + Data map[string]string `json:"data"` +} + +func (w *ConfigMapWriter) Write(ctx context.Context, status HealthStatus) error { + entry := classEntry{ + Timestamp: status.Timestamp.UTC().Format(time.RFC3339), + ObserverPod: status.ObserverPod, + Detail: status.Healthz.Detail, + KeyIDHash: status.KeyIDHash, + } + entryBytes, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("marshal entry for ConfigMap %s/%s: %w", w.namespace, w.name, err) + } + + data := map[string]string{string(status.Healthz.Class): string(entryBytes)} + patchBytes, err := json.Marshal(configMapDataPatch{Data: data}) + if err != nil { + return fmt.Errorf("marshal patch for ConfigMap %s/%s: %w", w.namespace, w.name, err) + } + + cms := w.client.CoreV1().ConfigMaps(w.namespace) + _, err = cms.Patch(ctx, w.name, types.MergePatchType, patchBytes, metav1.PatchOptions{}) + if err == nil { + return nil + } + if !apierrors.IsNotFound(err) { + return fmt.Errorf("patch ConfigMap %s/%s: %w", w.namespace, w.name, err) + } + + cm := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{Name: w.name, Namespace: w.namespace}, + Data: data, + } + _, err = cms.Create(ctx, cm, metav1.CreateOptions{}) + if err == nil { + return nil + } + if !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("create ConfigMap %s/%s: %w", w.namespace, w.name, err) + } + + // Race: another writer created the CM between our Patch and Create. 
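+	// A single retry is enough: if this second Patch also fails, the error
+	// is surfaced to the Monitor, whose next tick retries the whole Write.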
+ if _, err := cms.Patch(ctx, w.name, types.MergePatchType, patchBytes, metav1.PatchOptions{}); err != nil { + return fmt.Errorf("patch ConfigMap %s/%s after create race: %w", w.namespace, w.name, err) + } + return nil +} diff --git a/pkg/operator/encryption/kms/health/writer_configmap_test.go b/pkg/operator/encryption/kms/health/writer_configmap_test.go new file mode 100644 index 0000000000..a572db5c68 --- /dev/null +++ b/pkg/operator/encryption/kms/health/writer_configmap_test.go @@ -0,0 +1,244 @@ +package health + +import ( + "context" + "encoding/json" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +const ( + testNamespace = "kms-health-test" + testCMName = "kms-health-status" +) + +func newTestConfigMap() *corev1.ConfigMap { + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: testCMName, + Namespace: testNamespace, + }, + } +} + +// decodeEntry parses data. back into a classEntry. Comparing +// decoded structs avoids brittleness against JSON field ordering changes. +func decodeEntry(t *testing.T, raw string) classEntry { + t.Helper() + var e classEntry + if err := json.Unmarshal([]byte(raw), &e); err != nil { + t.Fatalf("decode classEntry %q: %v", raw, err) + } + return e +} + +func TestConfigMapWriter_writesHealthyEntryUnderOKKey(t *testing.T) { + client := fake.NewClientset(newTestConfigMap()) + w := NewConfigMapWriter(client, testNamespace, testCMName) + + status := HealthStatus{ + Healthz: Healthz{Class: HealthClassOK}, + KeyIDHash: "abc123", + Timestamp: time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC), + ObserverPod: "mock-kmsv2-provider-node", + } + if err := w.Write(context.Background(), status); err != nil { + t.Fatalf("Write: %v", err) + } + + got, err := client.CoreV1().ConfigMaps(testNamespace). + Get(context.Background(), testCMName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Get: %v", err) + } + + raw, ok := got.Data[string(HealthClassOK)] + if !ok { + t.Fatalf("data[%q] missing; data=%v", HealthClassOK, got.Data) + } + entry := decodeEntry(t, raw) + + want := classEntry{ + Timestamp: "2026-04-24T12:00:00Z", + ObserverPod: "mock-kmsv2-provider-node", + KeyIDHash: "abc123", + } + if entry != want { + t.Errorf("data[ok]: got %+v, want %+v", entry, want) + } + + // Other class keys must be absent: we never observed them. + for _, c := range []HealthClass{HealthClassNotOK, HealthClassRPCError} { + if _, present := got.Data[string(c)]; present { + t.Errorf("data[%q] present after only an OK probe; data=%v", c, got.Data) + } + } +} + +func TestConfigMapWriter_writesUnhealthyEntryUnderNotOKKey(t *testing.T) { + client := fake.NewClientset(newTestConfigMap()) + w := NewConfigMapWriter(client, testNamespace, testCMName) + + status := HealthStatus{ + Healthz: Healthz{Class: HealthClassNotOK, Detail: "boom"}, + Timestamp: time.Date(2026, 4, 24, 12, 1, 0, 0, time.UTC), + ObserverPod: "p", + } + if err := w.Write(context.Background(), status); err != nil { + t.Fatalf("Write: %v", err) + } + + got, _ := client.CoreV1().ConfigMaps(testNamespace). 
+ Get(context.Background(), testCMName, metav1.GetOptions{}) + raw, ok := got.Data[string(HealthClassNotOK)] + if !ok { + t.Fatalf("data[%q] missing; data=%v", HealthClassNotOK, got.Data) + } + entry := decodeEntry(t, raw) + + want := classEntry{ + Timestamp: "2026-04-24T12:01:00Z", + ObserverPod: "p", + Detail: "boom", + } + if entry != want { + t.Errorf("data[not-ok]: got %+v, want %+v", entry, want) + } +} + +func TestConfigMapWriter_writesRPCErrorEntryUnderRPCErrorKey(t *testing.T) { + client := fake.NewClientset(newTestConfigMap()) + w := NewConfigMapWriter(client, testNamespace, testCMName) + + status := HealthStatus{ + Healthz: Healthz{Class: HealthClassRPCError, Detail: "Unavailable"}, + Timestamp: time.Date(2026, 4, 24, 12, 2, 0, 0, time.UTC), + ObserverPod: "p", + } + if err := w.Write(context.Background(), status); err != nil { + t.Fatalf("Write: %v", err) + } + + got, _ := client.CoreV1().ConfigMaps(testNamespace). + Get(context.Background(), testCMName, metav1.GetOptions{}) + raw, ok := got.Data[string(HealthClassRPCError)] + if !ok { + t.Fatalf("data[%q] missing; data=%v", HealthClassRPCError, got.Data) + } + entry := decodeEntry(t, raw) + + want := classEntry{ + Timestamp: "2026-04-24T12:02:00Z", + ObserverPod: "p", + Detail: "Unavailable", + } + if entry != want { + t.Errorf("data[rpc-error]: got %+v, want %+v", entry, want) + } +} + +// TestConfigMapWriter_tickPreservesOtherClasses is the load-bearing +// test for the schema's whole point: a probe of class X must not +// clobber data.Y / data.Z. If a future refactor swaps merge-patch for +// Update or for a strategic-merge-with-replace directive, this test +// goes red. +func TestConfigMapWriter_tickPreservesOtherClasses(t *testing.T) { + // Pre-populate data.ok with a stale healthy entry; tick rpc-error; + // assert data.ok survived byte-for-byte. + staleOK := `{"timestamp":"2026-04-24T11:00:00Z","observerPod":"p","keyIDHash":"old"}` + existing := newTestConfigMap() + existing.Data = map[string]string{ + string(HealthClassOK): staleOK, + } + client := fake.NewClientset(existing) + w := NewConfigMapWriter(client, testNamespace, testCMName) + + status := HealthStatus{ + Healthz: Healthz{Class: HealthClassRPCError, Detail: "Unavailable"}, + Timestamp: time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC), + ObserverPod: "p", + } + if err := w.Write(context.Background(), status); err != nil { + t.Fatalf("Write: %v", err) + } + + got, _ := client.CoreV1().ConfigMaps(testNamespace). + Get(context.Background(), testCMName, metav1.GetOptions{}) + + if got.Data[string(HealthClassOK)] != staleOK { + t.Errorf("data[ok] mutated by rpc-error tick: got %q, want %q", + got.Data[string(HealthClassOK)], staleOK) + } + if _, ok := got.Data[string(HealthClassRPCError)]; !ok { + t.Errorf("data[rpc-error] not written; data=%v", got.Data) + } +} + +func TestConfigMapWriter_patchPreservesUnrelatedKeys(t *testing.T) { + // Same load-bearing property as tickPreservesOtherClasses, but for + // keys outside our schema entirely (e.g. an annotation-style key set + // by something else in the same CM). 
+ existing := newTestConfigMap() + existing.Data = map[string]string{ + "unrelated-key": "set-by-something-else", + } + client := fake.NewClientset(existing) + w := NewConfigMapWriter(client, testNamespace, testCMName) + + if err := w.Write(context.Background(), HealthStatus{ + Healthz: Healthz{Class: HealthClassOK}, + Timestamp: time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC), + ObserverPod: "p", + }); err != nil { + t.Fatalf("Write: %v", err) + } + + got, _ := client.CoreV1().ConfigMaps(testNamespace). + Get(context.Background(), testCMName, metav1.GetOptions{}) + if got.Data["unrelated-key"] != "set-by-something-else" { + t.Errorf("merge patch dropped unrelated key: got %q", got.Data["unrelated-key"]) + } + if _, ok := got.Data[string(HealthClassOK)]; !ok { + t.Errorf("data[ok] not written; data=%v", got.Data) + } +} + +func TestConfigMapWriter_createsConfigMapWhenMissing(t *testing.T) { + // Empty clientset, no ConfigMap to patch. Write self-heals by + // creating the CM seeded with this tick's class entry; subsequent + // ticks would extend it via merge-patch. + client := fake.NewClientset() + w := NewConfigMapWriter(client, testNamespace, testCMName) + + status := HealthStatus{ + Healthz: Healthz{Class: HealthClassOK}, + KeyIDHash: "abc", + Timestamp: time.Date(2026, 4, 24, 12, 0, 0, 0, time.UTC), + ObserverPod: "p", + } + if err := w.Write(context.Background(), status); err != nil { + t.Fatalf("Write on missing CM: %v", err) + } + + got, err := client.CoreV1().ConfigMaps(testNamespace). + Get(context.Background(), testCMName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Get: %v", err) + } + raw, ok := got.Data[string(HealthClassOK)] + if !ok { + t.Fatalf("data[%q] missing after create; data=%v", HealthClassOK, got.Data) + } + want := classEntry{ + Timestamp: "2026-04-24T12:00:00Z", + ObserverPod: "p", + KeyIDHash: "abc", + } + if entry := decodeEntry(t, raw); entry != want { + t.Errorf("data[ok]: got %+v, want %+v", entry, want) + } +} From 27214dde7926e5b18a8fe1de626fba0b8c5845f8 Mon Sep 17 00:00:00 2001 From: Krzysztof Ostrowski Date: Thu, 30 Apr 2026 15:51:31 +0200 Subject: [PATCH 2/3] kms/health: add container template GenerateContainerTemplate renders a corev1.Container sidecar. KMSPluginName suffixes container name, volumeMount, kms-socket path, and configmap name so multi-plugin coexists in one pod. Drive-by: ConfigMapWriter godoc updated to match the actual patch-then-create flow. --- .../health/assets/kms-health-container.yaml | 27 +++ pkg/operator/encryption/kms/health/bindata.go | 16 ++ .../kms/health/container_template.go | 119 ++++++++++++ .../kms/health/container_template_test.go | 171 ++++++++++++++++++ .../encryption/kms/health/writer_configmap.go | 4 + 5 files changed, 337 insertions(+) create mode 100644 pkg/operator/encryption/kms/health/assets/kms-health-container.yaml create mode 100644 pkg/operator/encryption/kms/health/bindata.go create mode 100644 pkg/operator/encryption/kms/health/container_template.go create mode 100644 pkg/operator/encryption/kms/health/container_template_test.go diff --git a/pkg/operator/encryption/kms/health/assets/kms-health-container.yaml b/pkg/operator/encryption/kms/health/assets/kms-health-container.yaml new file mode 100644 index 0000000000..8bceca8b2d --- /dev/null +++ b/pkg/operator/encryption/kms/health/assets/kms-health-container.yaml @@ -0,0 +1,27 @@ +name: kms-health-monitor-{{.KMSPluginName}} +image: {{.OperatorImage}} +command: +{{- range .Command }} + - {{ . 
| toJson }} +{{- end }} +args: + - --kms-socket=/var/run/kmsplugin-{{.KMSPluginName}}/kms.sock + - --probe-interval={{.ProbeInterval}} + - --probe-interval-unhealthy={{.ProbeIntervalUnhealthy}} + - --probe-timeout={{.ProbeTimeout}} + - --write-timeout={{.WriteTimeout}} + - --output-mode=configmap + - --configmap-namespace={{.ConfigMapNamespace}} + - --configmap-name=kms-health-{{.KMSPluginName}} +env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name +volumeMounts: + - name: kms-plugin-socket-{{.KMSPluginName}} + mountPath: /var/run/kmsplugin-{{.KMSPluginName}} +resources: + requests: + memory: 50Mi + cpu: 5m diff --git a/pkg/operator/encryption/kms/health/bindata.go b/pkg/operator/encryption/kms/health/bindata.go new file mode 100644 index 0000000000..545d50cd77 --- /dev/null +++ b/pkg/operator/encryption/kms/health/bindata.go @@ -0,0 +1,16 @@ +package health + +import ( + "embed" +) + +//go:embed assets/* +var f embed.FS + +func mustAsset(name string) []byte { + data, err := f.ReadFile(name) + if err != nil { + panic(err) + } + return data +} diff --git a/pkg/operator/encryption/kms/health/container_template.go b/pkg/operator/encryption/kms/health/container_template.go new file mode 100644 index 0000000000..7a9256dbf7 --- /dev/null +++ b/pkg/operator/encryption/kms/health/container_template.go @@ -0,0 +1,119 @@ +package health + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + "text/template" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/validation" + "sigs.k8s.io/yaml" +) + +// ContainerOptions are the per-caller knobs for GenerateContainerTemplate. +// KMSPluginName is the cohering parameter; distinct names produce +// non-colliding sidecars so multiple plugins can be observed in one pod. +type ContainerOptions struct { + // Must be a DNS-1123 label; suffixed onto container/volume/CM names. + KMSPluginName string + + OperatorImage string + OperatorCommand []string + + ProbeInterval time.Duration + ProbeIntervalUnhealthy time.Duration + ProbeTimeout time.Duration + WriteTimeout time.Duration + + ConfigMapNamespace string +} + +// GenerateContainerTemplate renders the kms-health-monitor sidecar. +// Caller must add a matching Volume named kms-plugin-socket- +// to PodSpec.Volumes. The deprecated single-plugin +// AddKMSPluginVolumeAndMountToPodSpec helper is intentionally not used. 
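+//
+// A wiring sketch (the plugin name "aws" and the hostPath source below are
+// illustrative; the volume source is the caller's choice):
+//
+//	c, err := GenerateContainerTemplate(opts) // opts.KMSPluginName == "aws"
+//	podSpec.Containers = append(podSpec.Containers, c)
+//	podSpec.Volumes = append(podSpec.Volumes, corev1.Volume{
+//		Name: "kms-plugin-socket-aws",
+//		VolumeSource: corev1.VolumeSource{
+//			HostPath: &corev1.HostPathVolumeSource{Path: "/var/run/kmsplugin-aws"},
+//		},
+//	})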
+func GenerateContainerTemplate(opts ContainerOptions) (corev1.Container, error) { + if err := opts.validate(); err != nil { + return corev1.Container{}, err + } + + rawManifest := mustAsset("assets/kms-health-container.yaml") + + funcs := template.FuncMap{ + "toJson": func(v any) (string, error) { + b, err := json.Marshal(v) + if err != nil { + return "", err + } + return string(b), nil + }, + } + + tmpl, err := template.New("kms-health-container").Funcs(funcs).Parse(string(rawManifest)) + if err != nil { + return corev1.Container{}, fmt.Errorf("parse asset template: %w", err) + } + + values := struct { + KMSPluginName string + OperatorImage string + Command []string + ProbeInterval string + ProbeIntervalUnhealthy string + ProbeTimeout string + WriteTimeout string + ConfigMapNamespace string + }{ + KMSPluginName: opts.KMSPluginName, + OperatorImage: opts.OperatorImage, + Command: opts.OperatorCommand, + ProbeInterval: opts.ProbeInterval.String(), + ProbeIntervalUnhealthy: opts.ProbeIntervalUnhealthy.String(), + ProbeTimeout: opts.ProbeTimeout.String(), + WriteTimeout: opts.WriteTimeout.String(), + ConfigMapNamespace: opts.ConfigMapNamespace, + } + + var buf bytes.Buffer + if err := tmpl.Execute(&buf, values); err != nil { + return corev1.Container{}, fmt.Errorf("execute template: %w", err) + } + + var container corev1.Container + if err := yaml.Unmarshal(buf.Bytes(), &container); err != nil { + return corev1.Container{}, fmt.Errorf("parse rendered container: %w", err) + } + return container, nil +} + +func (o ContainerOptions) validate() error { + if errs := validation.IsDNS1123Label(o.KMSPluginName); len(errs) > 0 { + return fmt.Errorf("KMSPluginName %q is not a valid DNS-1123 label: %s", + o.KMSPluginName, strings.Join(errs, "; ")) + } + if o.OperatorImage == "" { + return fmt.Errorf("OperatorImage is required") + } + if len(o.OperatorCommand) == 0 { + return fmt.Errorf("OperatorCommand must have at least one element") + } + if o.ProbeInterval <= 0 { + return fmt.Errorf("ProbeInterval must be positive") + } + if o.ProbeIntervalUnhealthy <= 0 { + return fmt.Errorf("ProbeIntervalUnhealthy must be positive") + } + if o.ProbeTimeout <= 0 { + return fmt.Errorf("ProbeTimeout must be positive") + } + if o.WriteTimeout <= 0 { + return fmt.Errorf("WriteTimeout must be positive") + } + if o.ConfigMapNamespace == "" { + return fmt.Errorf("ConfigMapNamespace is required") + } + return nil +} diff --git a/pkg/operator/encryption/kms/health/container_template_test.go b/pkg/operator/encryption/kms/health/container_template_test.go new file mode 100644 index 0000000000..de68a11606 --- /dev/null +++ b/pkg/operator/encryption/kms/health/container_template_test.go @@ -0,0 +1,171 @@ +package health + +import ( + "strings" + "testing" + "time" + + "github.com/spf13/pflag" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "sigs.k8s.io/yaml" +) + +const expectedContainerYAML = ` +name: kms-health-monitor-aws +image: quay.io/openshift/operator:latest +command: + - "operator" + - "kms-health-monitor" +args: + - --kms-socket=/var/run/kmsplugin-aws/kms.sock + - --probe-interval=1m0s + - --probe-interval-unhealthy=10s + - --probe-timeout=3s + - --write-timeout=5s + - --output-mode=configmap + - --configmap-namespace=openshift-kube-apiserver + - --configmap-name=kms-health-aws +env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name +volumeMounts: + - name: kms-plugin-socket-aws + mountPath: /var/run/kmsplugin-aws +resources: + requests: + memory: 50Mi + cpu: 5m +` + +func 
validContainerOptions() ContainerOptions { + return ContainerOptions{ + KMSPluginName: "aws", + OperatorImage: "quay.io/openshift/operator:latest", + OperatorCommand: []string{"operator", "kms-health-monitor"}, + ProbeInterval: 60 * time.Second, + ProbeIntervalUnhealthy: 10 * time.Second, + ProbeTimeout: 3 * time.Second, + WriteTimeout: 5 * time.Second, + ConfigMapNamespace: "openshift-kube-apiserver", + } +} + +func TestGenerateContainerTemplate(t *testing.T) { + got, err := GenerateContainerTemplate(validContainerOptions()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + var want corev1.Container + if err := yaml.Unmarshal([]byte(expectedContainerYAML), &want); err != nil { + t.Fatalf("parse expected: %v", err) + } + if !equality.Semantic.DeepEqual(got, want) { + t.Fatalf("rendered container does not match expected:\ngot: %+v\nwant: %+v", got, want) + } +} + +// Drift detector: rendered args must parse against cmd.go's flag set. +func TestRenderedArgsAreValidFlags(t *testing.T) { + c, err := GenerateContainerTemplate(validContainerOptions()) + if err != nil { + t.Fatalf("render: %v", err) + } + + fs := pflag.NewFlagSet("kms-health-monitor", pflag.ContinueOnError) + (&commandOptions{}).addFlags(fs) + if err := fs.Parse(c.Args); err != nil { + t.Fatalf("rendered args do not parse against cmd.go flag set: %v\nargs: %v", err, c.Args) + } +} + +// Distinct KMSPluginName must yield non-colliding container/volume/CM names. +func TestMultiplePluginsCoexist(t *testing.T) { + a, err := GenerateContainerTemplate(ContainerOptions{ + KMSPluginName: "aws", + OperatorImage: "img:1", + OperatorCommand: []string{"o"}, + ProbeInterval: 60 * time.Second, + ProbeIntervalUnhealthy: 10 * time.Second, + ProbeTimeout: 3 * time.Second, + WriteTimeout: 5 * time.Second, + ConfigMapNamespace: "ns", + }) + if err != nil { + t.Fatalf("render aws: %v", err) + } + b, err := GenerateContainerTemplate(ContainerOptions{ + KMSPluginName: "vault", + OperatorImage: "img:1", + OperatorCommand: []string{"o"}, + ProbeInterval: 60 * time.Second, + ProbeIntervalUnhealthy: 10 * time.Second, + ProbeTimeout: 3 * time.Second, + WriteTimeout: 5 * time.Second, + ConfigMapNamespace: "ns", + }) + if err != nil { + t.Fatalf("render vault: %v", err) + } + + if a.Name == b.Name { + t.Fatalf("container names collide: %q == %q", a.Name, b.Name) + } + if a.VolumeMounts[0].Name == b.VolumeMounts[0].Name { + t.Fatalf("volumeMount names collide: %q", a.VolumeMounts[0].Name) + } + if a.VolumeMounts[0].MountPath == b.VolumeMounts[0].MountPath { + t.Fatalf("mountPaths collide: %q", a.VolumeMounts[0].MountPath) + } + // Args must contain different --configmap-name and --kms-socket values. 
+ if argsEqual(a.Args, b.Args) { + t.Fatalf("args identical for distinct plugin names") + } +} + +func argsEqual(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +func TestContainerOptionsValidate(t *testing.T) { + tests := []struct { + name string + mutate func(*ContainerOptions) + wantSub string + }{ + {"empty plugin name", func(o *ContainerOptions) { o.KMSPluginName = "" }, "KMSPluginName"}, + {"underscore in plugin name", func(o *ContainerOptions) { o.KMSPluginName = "aws_kms" }, "KMSPluginName"}, + {"uppercase in plugin name", func(o *ContainerOptions) { o.KMSPluginName = "AWS" }, "KMSPluginName"}, + {"empty image", func(o *ContainerOptions) { o.OperatorImage = "" }, "OperatorImage"}, + {"empty command", func(o *ContainerOptions) { o.OperatorCommand = nil }, "OperatorCommand"}, + {"zero probe-interval", func(o *ContainerOptions) { o.ProbeInterval = 0 }, "ProbeInterval"}, + {"zero probe-interval-unhealthy", func(o *ContainerOptions) { o.ProbeIntervalUnhealthy = 0 }, "ProbeIntervalUnhealthy"}, + {"zero probe-timeout", func(o *ContainerOptions) { o.ProbeTimeout = 0 }, "ProbeTimeout"}, + {"zero write-timeout", func(o *ContainerOptions) { o.WriteTimeout = 0 }, "WriteTimeout"}, + {"empty namespace", func(o *ContainerOptions) { o.ConfigMapNamespace = "" }, "ConfigMapNamespace"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := validContainerOptions() + tt.mutate(&opts) + _, err := GenerateContainerTemplate(opts) + if err == nil { + t.Fatalf("expected error containing %q, got nil", tt.wantSub) + } + if !strings.Contains(err.Error(), tt.wantSub) { + t.Fatalf("error %q does not contain %q", err.Error(), tt.wantSub) + } + }) + } +} diff --git a/pkg/operator/encryption/kms/health/writer_configmap.go b/pkg/operator/encryption/kms/health/writer_configmap.go index f831035d29..7025192e07 100644 --- a/pkg/operator/encryption/kms/health/writer_configmap.go +++ b/pkg/operator/encryption/kms/health/writer_configmap.go @@ -24,6 +24,10 @@ import ( // data.not-ok = {"timestamp":"...","observerPod":"...","detail":"...","keyIDHash":"..."} // data.rpc-error = {"timestamp":"...","observerPod":"...","detail":"..."} // +// Self-heals on miss: if the CM is absent, Write creates it. Caller +// RBAC must therefore grant create in addition to get/update/patch on +// the named CM. +// // Concurrency contract: one ConfigMap per monitor instance. Two // monitors writing the same CM produce last-writer-wins on every key. // Callers MUST encode instance identity in the CM name (the cmd-layer From b0d6ae6da103054f1c830babe7a6092b6f5b5da2 Mon Sep 17 00:00:00 2001 From: Krzysztof Ostrowski Date: Thu, 30 Apr 2026 15:51:52 +0200 Subject: [PATCH 3/3] examples/kms-health-kind: add KIND harness Self-contained throwaway harness for validating the health package against a real KIND cluster. Defaults to cluster name kms-health-claude. fake-plugin/ ~90-line k8s.io/kms/pkg/service.Service with AES-GCM and a file-marker unhealthy toggle. cmd/monitor/ wraps health.NewCommand. cmd/render-container/ wraps health.GenerateContainerTemplate; the harness Deployment mirrors its output. manifests/ fake-plugin runs as a static pod in kube-system; monitor runs as a Deployment in kms-health-test pinned to the control-plane node; both share the host's /tmp bind-mounted at /var/run/kmsplugin-fake/ for the UDS. RBAC splits into two rules since k8s ignores create on rules with resourceNames. 
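The ConfigMap those two rules target is the monitor's status object, kms-health-fake in kms-health-test. For orientation, a sketch of its shape (one JSON entry per observation class, field values elided here; the writer's doc comment remains the authoritative schema):

  apiVersion: v1
  kind: ConfigMap
  metadata:
    name: kms-health-fake
    namespace: kms-health-test
  data:
    ok: '{"timestamp":"...","observerPod":"...","keyIDHash":"..."}'
    not-ok: '{"timestamp":"...","observerPod":"...","detail":"...","keyIDHash":"..."}'
    rpc-error: '{"timestamp":"...","observerPod":"...","detail":"..."}'

Whichever class carries the newest embedded timestamp is the current observation; superseded classes keep their last entry because writes merge-patch per-class keys.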
verify.sh six end-to-end assertions. Drop this commit before merging to keep examples/ out of the library. --- .../kms-health-kind/Dockerfile.fake-plugin | 15 ++ examples/kms-health-kind/Dockerfile.monitor | 15 ++ examples/kms-health-kind/Makefile | 71 +++++++ examples/kms-health-kind/bin/down.sh | 18 ++ examples/kms-health-kind/bin/up.sh | 82 ++++++++ examples/kms-health-kind/cmd/monitor/main.go | 26 +++ .../cmd/render-container/main.go | 62 ++++++ examples/kms-health-kind/fake-plugin/main.go | 125 ++++++++++++ .../manifests/encryption-config.yaml | 11 ++ examples/kms-health-kind/manifests/kind.yaml | 22 +++ .../manifests/monitor-deployment.yaml | 58 ++++++ .../kms-health-kind/manifests/namespace.yaml | 6 + examples/kms-health-kind/manifests/rbac.yaml | 39 ++++ .../kms-health-kind/manifests/static-pod.yaml | 28 +++ examples/kms-health-kind/verify.sh | 183 ++++++++++++++++++ 15 files changed, 761 insertions(+) create mode 100644 examples/kms-health-kind/Dockerfile.fake-plugin create mode 100644 examples/kms-health-kind/Dockerfile.monitor create mode 100644 examples/kms-health-kind/Makefile create mode 100755 examples/kms-health-kind/bin/down.sh create mode 100755 examples/kms-health-kind/bin/up.sh create mode 100644 examples/kms-health-kind/cmd/monitor/main.go create mode 100644 examples/kms-health-kind/cmd/render-container/main.go create mode 100644 examples/kms-health-kind/fake-plugin/main.go create mode 100644 examples/kms-health-kind/manifests/encryption-config.yaml create mode 100644 examples/kms-health-kind/manifests/kind.yaml create mode 100644 examples/kms-health-kind/manifests/monitor-deployment.yaml create mode 100644 examples/kms-health-kind/manifests/namespace.yaml create mode 100644 examples/kms-health-kind/manifests/rbac.yaml create mode 100644 examples/kms-health-kind/manifests/static-pod.yaml create mode 100755 examples/kms-health-kind/verify.sh diff --git a/examples/kms-health-kind/Dockerfile.fake-plugin b/examples/kms-health-kind/Dockerfile.fake-plugin new file mode 100644 index 0000000000..73388718ef --- /dev/null +++ b/examples/kms-health-kind/Dockerfile.fake-plugin @@ -0,0 +1,15 @@ +# Disposable image for the KIND harness. Build from the LIBRARY-GO +# MODULE ROOT so the context includes vendor/ and go.mod: +# docker build -f examples/kms-health-kind/Dockerfile.fake-plugin -t kms-health-kind-fake-plugin:dev . +FROM golang:1.25 AS build +WORKDIR /src +COPY . . +RUN CGO_ENABLED=0 GOFLAGS=-mod=vendor \ + go build -trimpath -ldflags='-s -w' \ + -o /out/fake-plugin \ + ./examples/kms-health-kind/fake-plugin + +FROM gcr.io/distroless/static:nonroot +COPY --from=build /out/fake-plugin /fake-plugin +USER nonroot:nonroot +ENTRYPOINT ["/fake-plugin"] diff --git a/examples/kms-health-kind/Dockerfile.monitor b/examples/kms-health-kind/Dockerfile.monitor new file mode 100644 index 0000000000..31d2339aef --- /dev/null +++ b/examples/kms-health-kind/Dockerfile.monitor @@ -0,0 +1,15 @@ +# Disposable image for the KIND harness. Build from the LIBRARY-GO +# MODULE ROOT so the context includes vendor/ and go.mod: +# docker build -f examples/kms-health-kind/Dockerfile.monitor -t kms-health-kind-monitor:dev . +FROM golang:1.25 AS build +WORKDIR /src +COPY . . 
+RUN CGO_ENABLED=0 GOFLAGS=-mod=vendor \ + go build -trimpath -ldflags='-s -w' \ + -o /out/monitor \ + ./examples/kms-health-kind/cmd/monitor + +FROM gcr.io/distroless/static:nonroot +COPY --from=build /out/monitor /monitor +USER nonroot:nonroot +ENTRYPOINT ["/monitor"] diff --git a/examples/kms-health-kind/Makefile b/examples/kms-health-kind/Makefile new file mode 100644 index 0000000000..dbcede5b74 --- /dev/null +++ b/examples/kms-health-kind/Makefile @@ -0,0 +1,71 @@ +# KMSv2 health-monitor KIND harness. Disposable. +# +# All targets should be run from the library-go module root: +# make -C examples/kms-health-kind kind-verify + +CLUSTER ?= kms-health-claude +IMAGE_FAKE_PLUGIN ?= kms-health-kind-fake-plugin:dev +IMAGE_MONITOR ?= kms-health-kind-monitor:dev +CTX := kind-$(CLUSTER) +NODE := $(CLUSTER)-control-plane +FAKE_POD := kms-fake-plugin-$(NODE) + +# Dockerfile build context = library-go module root (need vendor/, go.mod). +REPO_ROOT := $(shell cd ../.. && pwd -P) +HARNESS_DIR := $(shell pwd -P) + +.PHONY: build-images +build-images: + docker build -t $(IMAGE_FAKE_PLUGIN) \ + -f $(HARNESS_DIR)/Dockerfile.fake-plugin \ + $(REPO_ROOT) + docker build -t $(IMAGE_MONITOR) \ + -f $(HARNESS_DIR)/Dockerfile.monitor \ + $(REPO_ROOT) + +.PHONY: kind-up +kind-up: + CLUSTER=$(CLUSTER) $(HARNESS_DIR)/bin/up.sh + +.PHONY: kind-down +kind-down: + CLUSTER=$(CLUSTER) $(HARNESS_DIR)/bin/down.sh + +## Distroless fake-plugin has no touch/rm; flip via node /tmp (bind-mounted +## to /var/run/kmsplugin-fake/ in both pods). +.PHONY: flip-unhealthy +flip-unhealthy: + docker exec $(NODE) touch /tmp/kms-unhealthy + +.PHONY: flip-healthy +flip-healthy: + docker exec $(NODE) rm -f /tmp/kms-unhealthy + +## Drift check: diff output against monitor-deployment.yaml's container block. +.PHONY: render-container +render-container: + go run $(REPO_ROOT)/examples/kms-health-kind/cmd/render-container \ + --kms-plugin-name=fake \ + --image=$(IMAGE_MONITOR) \ + --command=/usr/bin/kms-health-monitor \ + --probe-interval=2s \ + --probe-interval-unhealthy=1s \ + --probe-timeout=1s \ + --write-timeout=5s \ + --configmap-namespace=kms-health-test + +.PHONY: verify +verify: + CLUSTER=$(CLUSTER) $(HARNESS_DIR)/verify.sh + +.PHONY: kind-verify +kind-verify: build-images kind-up verify + @echo "" + @echo "=================================================" + @echo " kind-verify PASSED on cluster ${CLUSTER}" + @echo " tear down with: make kind-down" + @echo "=================================================" + +.PHONY: cm +cm: + kubectl --context $(CTX) -n kms-health-test get cm kms-health-fake -o yaml diff --git a/examples/kms-health-kind/bin/down.sh b/examples/kms-health-kind/bin/down.sh new file mode 100755 index 0000000000..9dc6877c7b --- /dev/null +++ b/examples/kms-health-kind/bin/down.sh @@ -0,0 +1,18 @@ +#!/usr/bin/env bash +# +# Tear down the KIND cluster created by up.sh. Safe to re-run. 
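+# Override the target cluster with the CLUSTER env var (default: kms-health-claude).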
+ +set -o errexit +set -o nounset +set -o pipefail + +unset KIND_EXPERIMENTAL_PROVIDER + +CLUSTER="${CLUSTER:-kms-health-claude}" + +if kind get clusters 2>/dev/null | grep -qx "${CLUSTER}"; then + echo "[down.sh] deleting kind cluster ${CLUSTER}" + kind delete cluster --name "${CLUSTER}" +else + echo "[down.sh] cluster ${CLUSTER} not found — nothing to do" +fi diff --git a/examples/kms-health-kind/bin/up.sh b/examples/kms-health-kind/bin/up.sh new file mode 100755 index 0000000000..13aee53ae0 --- /dev/null +++ b/examples/kms-health-kind/bin/up.sh @@ -0,0 +1,82 @@ +#!/usr/bin/env bash +# +# Create a KIND cluster named ${CLUSTER} (default: kms-health-claude) +# with a KMSv2 fake plugin (static pod) and a health-monitor Deployment +# pinned to the control-plane node. Self-contained — doesn't depend on +# kubernetes/kubernetes/hack/local-up-kms. +# +# Run from the library-go module root. + +set -o errexit +set -o nounset +set -o pipefail + +# Force docker — this harness's Dockerfiles build with `docker build` +# and `kind load docker-image` needs the same backend. User may have +# KIND_EXPERIMENTAL_PROVIDER=podman set globally; unset locally. +unset KIND_EXPERIMENTAL_PROVIDER + +CLUSTER="${CLUSTER:-kms-health-claude}" +KIND_NODE_IMAGE="${KIND_NODE_IMAGE:-kindest/node:v1.33.0}" + +HARNESS_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd -P)" + +echo "[up.sh] cluster=${CLUSTER} node=${KIND_NODE_IMAGE}" +echo "[up.sh] harness-dir=${HARNESS_DIR}" + +# kind.yaml uses paths relative to the harness dir (manifests/...). +cd "${HARNESS_DIR}" + +# 1) Create the cluster (idempotent). +if kind get clusters 2>/dev/null | grep -qx "${CLUSTER}"; then + echo "[up.sh] cluster ${CLUSTER} already exists, reusing" +else + echo "[up.sh] creating kind cluster ${CLUSTER}" + kind create cluster \ + --name "${CLUSTER}" \ + --image "${KIND_NODE_IMAGE}" \ + --config manifests/kind.yaml +fi + +CTX="kind-${CLUSTER}" +NODE="${CLUSTER}-control-plane" +STATIC_POD="kms-fake-plugin-${NODE}" + +# 2) Load our local images. +echo "[up.sh] loading images into cluster" +kind load docker-image kms-health-kind-fake-plugin:dev --name "${CLUSTER}" +kind load docker-image kms-health-kind-monitor:dev --name "${CLUSTER}" + +# 3) Force the static pod to restart so it picks up the freshly-loaded +# image (kubelet may have cached an ErrImageNeverPull from before). +echo "[up.sh] bouncing fake-plugin container so it picks up loaded image" +FAKE_CID="$(docker exec "${NODE}" crictl ps --name fake-plugin -q 2>/dev/null | head -1 || true)" +if [ -n "${FAKE_CID}" ]; then + docker exec "${NODE}" crictl stop "${FAKE_CID}" >/dev/null || true +fi + +# 4) Apply namespace + RBAC + monitor Deployment. The status ConfigMap +# is created by the writer itself on first observation; nothing +# pre-creates it. +echo "[up.sh] applying namespace, RBAC, monitor Deployment" +kubectl --context "${CTX}" apply -f manifests/namespace.yaml +kubectl --context "${CTX}" apply -f manifests/rbac.yaml +kubectl --context "${CTX}" apply -f manifests/monitor-deployment.yaml + +# 5) Wait for the monitor Deployment to be Available. Apiserver is NOT +# wired to KMS here (see manifests/kind.yaml) — we're only +# validating the monitor, not apiserver/KMS integration. 
+echo "[up.sh] waiting for monitor Deployment" +kubectl --context "${CTX}" -n kms-health-test rollout status deployment/kms-health-monitor-fake --timeout=120s + +MONITOR_POD="$(kubectl --context "${CTX}" -n kms-health-test get pod \ + -l app=kms-health-monitor-fake \ + -o jsonpath='{.items[0].metadata.name}')" +echo "[up.sh] done." +echo " cluster: ${CLUSTER}" +echo " monitor pod: ${MONITOR_POD} (kms-health-test)" +echo " status cm: kms-health-fake (kms-health-test)" +echo " fake plugin: ${STATIC_POD} (kube-system, static)" +echo "" +echo "Try:" +echo " kubectl --context ${CTX} -n kms-health-test get cm kms-health-fake -o yaml" diff --git a/examples/kms-health-kind/cmd/monitor/main.go b/examples/kms-health-kind/cmd/monitor/main.go new file mode 100644 index 0000000000..8bbc55ba29 --- /dev/null +++ b/examples/kms-health-kind/cmd/monitor/main.go @@ -0,0 +1,26 @@ +// Command monitor is the throwaway KIND-stage binary that bundles +// library-go's health.NewCommand(). Production builds come from the +// OpenShift operator repos; this main exists only so the KIND harness +// has something to put in a Dockerfile. +package main + +import ( + "fmt" + "os" + + genericapiserver "k8s.io/apiserver/pkg/server" + "k8s.io/klog/v2" + + "github.com/openshift/library-go/pkg/operator/encryption/kms/health" +) + +func main() { + ctx := genericapiserver.SetupSignalContext() + cmd := health.NewCommand(ctx) + if err := cmd.ExecuteContext(ctx); err != nil { + fmt.Fprintln(os.Stderr, err) + klog.Flush() + os.Exit(1) + } + klog.Flush() +} diff --git a/examples/kms-health-kind/cmd/render-container/main.go b/examples/kms-health-kind/cmd/render-container/main.go new file mode 100644 index 0000000000..9163729189 --- /dev/null +++ b/examples/kms-health-kind/cmd/render-container/main.go @@ -0,0 +1,62 @@ +// Disposable CLI: prints what health.GenerateContainerTemplate emits, so +// the KIND harness validates the production template end-to-end. +package main + +import ( + "flag" + "fmt" + "os" + "strings" + "time" + + "sigs.k8s.io/yaml" + + "github.com/openshift/library-go/pkg/operator/encryption/kms/health" +) + +func main() { + var ( + pluginName = flag.String("kms-plugin-name", "", + "DNS-1123 label identifying the plugin instance (required)") + image = flag.String("image", "", "operator image (required)") + cmdStr = flag.String("command", "", + "operator binary command, comma-separated (required), e.g. 
/usr/bin/kms-health-monitor") + probeInterval = flag.Duration("probe-interval", 60*time.Second, "") + probeIntervalUnhealthy = flag.Duration("probe-interval-unhealthy", 10*time.Second, "") + probeTimeout = flag.Duration("probe-timeout", 3*time.Second, "") + writeTimeout = flag.Duration("write-timeout", 5*time.Second, "") + ns = flag.String("configmap-namespace", "", + "operand namespace (required)") + ) + flag.Parse() + + var cmdParts []string + if *cmdStr != "" { + cmdParts = strings.Split(*cmdStr, ",") + } + + container, err := health.GenerateContainerTemplate(health.ContainerOptions{ + KMSPluginName: *pluginName, + OperatorImage: *image, + OperatorCommand: cmdParts, + ProbeInterval: *probeInterval, + ProbeIntervalUnhealthy: *probeIntervalUnhealthy, + ProbeTimeout: *probeTimeout, + WriteTimeout: *writeTimeout, + ConfigMapNamespace: *ns, + }) + if err != nil { + fmt.Fprintln(os.Stderr, "render-container:", err) + os.Exit(1) + } + + out, err := yaml.Marshal(&container) + if err != nil { + fmt.Fprintln(os.Stderr, "render-container: marshal:", err) + os.Exit(1) + } + if _, err := os.Stdout.Write(out); err != nil { + fmt.Fprintln(os.Stderr, "render-container: write:", err) + os.Exit(1) + } +} diff --git a/examples/kms-health-kind/fake-plugin/main.go b/examples/kms-health-kind/fake-plugin/main.go new file mode 100644 index 0000000000..bc5998c313 --- /dev/null +++ b/examples/kms-health-kind/fake-plugin/main.go @@ -0,0 +1,125 @@ +// Command fake-plugin is a disposable KMSv2 plugin for the KIND +// harness: AES-GCM encryption with a hard-coded key (NEVER for +// production use), plus a file-presence toggle that flips Status() +// between "ok" and "unhealthy:test-forced" so verify.sh can exercise +// both branches of the monitor's classification. +// +// Replaces upstream's softhsm-backed mock plugin at the same UDS path. +// ~90 lines of code; stays deliberately small so its disposability is +// obvious from a glance. +package main + +import ( + "context" + "crypto/aes" + "crypto/cipher" + "crypto/rand" + "flag" + "fmt" + "io" + "log" + "os" + "os/signal" + "syscall" + "time" + + kmsservice "k8s.io/kms/pkg/service" +) + +// testKey is the AES-256-GCM key our fake uses to (un)wrap every DEK +// kube-apiserver sends. Fixed so decryption works across restarts of +// the fake; value is meaningless outside this KIND harness. 
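+// The key is 32 bytes, so aes.NewCipher selects AES-256.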
+var testKey = []byte("0123456789abcdef0123456789abcdef") + +const fakeKeyID = "kms-health-kind-test-key-v1" + +type plugin struct { + markerPath string +} + +func (p *plugin) Status(ctx context.Context) (*kmsservice.StatusResponse, error) { + healthz := "ok" + if _, err := os.Stat(p.markerPath); err == nil { + healthz = "unhealthy:test-forced" + } + return &kmsservice.StatusResponse{ + Healthz: healthz, + Version: "v2", + KeyID: fakeKeyID, + }, nil +} + +func (p *plugin) Encrypt(ctx context.Context, uid string, data []byte) (*kmsservice.EncryptResponse, error) { + gcm, err := newGCM() + if err != nil { + return nil, err + } + nonce := make([]byte, gcm.NonceSize()) + if _, err := io.ReadFull(rand.Reader, nonce); err != nil { + return nil, err + } + return &kmsservice.EncryptResponse{ + Ciphertext: gcm.Seal(nonce, nonce, data, nil), + KeyID: fakeKeyID, + }, nil +} + +func (p *plugin) Decrypt(ctx context.Context, uid string, req *kmsservice.DecryptRequest) ([]byte, error) { + gcm, err := newGCM() + if err != nil { + return nil, err + } + ns := gcm.NonceSize() + if len(req.Ciphertext) < ns { + return nil, fmt.Errorf("ciphertext too short: %d bytes", len(req.Ciphertext)) + } + return gcm.Open(nil, req.Ciphertext[:ns], req.Ciphertext[ns:], nil) +} + +func newGCM() (cipher.AEAD, error) { + block, err := aes.NewCipher(testKey) + if err != nil { + return nil, err + } + return cipher.NewGCM(block) +} + +func main() { + socket := flag.String("socket", "/var/run/kmsplugin-fake/kms.sock", "UDS path to listen on") + marker := flag.String("unhealthy-marker", "/var/run/kmsplugin-fake/kms-unhealthy", "flipping this file's existence forces Status() to return unhealthy") + staydown := flag.String("staydown-marker", "/var/run/kmsplugin-fake/kms-staydown", "if this file exists at startup, sleep before listening so verify.sh can observe a real outage window") + staydownFor := flag.Duration("staydown-duration", 3*time.Second, "how long to sleep when staydown marker is present at startup") + flag.Parse() + + // Stay-down marker: when verify.sh wants assertion 6 to observe an + // rpc-error, it sets this file before killing the container. Kubelet + // restarts us, we see the marker on this fresh start, and sleep + // before listening so the gap is wide enough for a probe timeout to + // fire regardless of kubelet's restart speed. + if _, err := os.Stat(*staydown); err == nil { + log.Printf("fake-plugin: %s present, sleeping %s before listening", *staydown, *staydownFor) + time.Sleep(*staydownFor) + } + + // Idempotent: remove any stale socket from a prior crash. Must do + // this before ListenAndServe or the bind fails with "address in use". 
+ _ = os.Remove(*socket) + + svc := &plugin{markerPath: *marker} + server := kmsservice.NewGRPCService(*socket, 5*time.Second, svc) + + serverErr := make(chan error, 1) + go func() { serverErr <- server.ListenAndServe() }() + + log.Printf("fake-plugin: listening on %s (marker=%s)", *socket, *marker) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) + select { + case <-sig: + log.Print("fake-plugin: signal received, shutting down") + case err := <-serverErr: + log.Fatalf("fake-plugin: server exited: %v", err) + } + server.Shutdown() +} diff --git a/examples/kms-health-kind/manifests/encryption-config.yaml b/examples/kms-health-kind/manifests/encryption-config.yaml new file mode 100644 index 0000000000..fe10aa6c1d --- /dev/null +++ b/examples/kms-health-kind/manifests/encryption-config.yaml @@ -0,0 +1,11 @@ +apiVersion: apiserver.config.k8s.io/v1 +kind: EncryptionConfiguration +resources: +- resources: + - '*.*' + providers: + - kms: + apiVersion: v2 + name: kms-health-kind-fake + endpoint: unix:///var/run/kmsplugin-fake/kms.sock + - identity: {} diff --git a/examples/kms-health-kind/manifests/kind.yaml b/examples/kms-health-kind/manifests/kind.yaml new file mode 100644 index 0000000000..9b616edea6 --- /dev/null +++ b/examples/kms-health-kind/manifests/kind.yaml @@ -0,0 +1,22 @@ +# Minimal single-node KIND cluster. The only thing we customize is +# dropping the fake-plugin static pod manifest into +# /etc/kubernetes/manifests/ so kubelet starts it automatically. +# +# We deliberately do NOT wire kube-apiserver to KMS encryption: what +# this harness validates is the MONITOR's behavior, not +# apiserver/KMS integration. Wiring apiserver would create a +# chicken-and-egg startup dependency (apiserver needs KMS; KMS static +# pod comes up in parallel) that is orthogonal to our test. +# +# The fake-plugin pod and the monitor Deployment both mount the +# control-plane node's /tmp via hostPath, which is how they share the +# UDS. No apiserver involvement needed. +kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +nodes: +- role: control-plane + extraMounts: + - containerPath: /etc/kubernetes/manifests/kms-fake-plugin.yaml + hostPath: manifests/static-pod.yaml + readOnly: true + propagation: None diff --git a/examples/kms-health-kind/manifests/monitor-deployment.yaml b/examples/kms-health-kind/manifests/monitor-deployment.yaml new file mode 100644 index 0000000000..17126ed7f9 --- /dev/null +++ b/examples/kms-health-kind/manifests/monitor-deployment.yaml @@ -0,0 +1,58 @@ +# Deployment (not static pod) for the ServiceAccount; pinned to +# control-plane because the fake-plugin's UDS lives on that node's /tmp. +# +# Container block mirrors `make render-container` for KMSPluginName=fake; +# drift makes kind-verify go red. +apiVersion: apps/v1 +kind: Deployment +metadata: + name: kms-health-monitor-fake + namespace: kms-health-test +spec: + replicas: 1 + selector: + matchLabels: + app: kms-health-monitor-fake + template: + metadata: + labels: + app: kms-health-monitor-fake + spec: + serviceAccountName: kms-health-monitor + nodeSelector: + node-role.kubernetes.io/control-plane: "" + tolerations: + - key: node-role.kubernetes.io/control-plane + operator: Exists + effect: NoSchedule + containers: + - name: kms-health-monitor-fake + image: kms-health-kind-monitor:dev + imagePullPolicy: Never + # Tighter intervals than library defaults so verify.sh fits in budget. 
+ args: + - --kms-socket=/var/run/kmsplugin-fake/kms.sock + - --probe-interval=2s + - --probe-interval-unhealthy=1s + - --probe-timeout=1s + - --write-timeout=5s + - --output-mode=configmap + - --configmap-namespace=kms-health-test + - --configmap-name=kms-health-fake + env: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + volumeMounts: + - name: kms-plugin-socket-fake + mountPath: /var/run/kmsplugin-fake + resources: + requests: + memory: 50Mi + cpu: 5m + volumes: + - name: kms-plugin-socket-fake + hostPath: + path: /tmp + type: Directory diff --git a/examples/kms-health-kind/manifests/namespace.yaml b/examples/kms-health-kind/manifests/namespace.yaml new file mode 100644 index 0000000000..dcd9c07bda --- /dev/null +++ b/examples/kms-health-kind/manifests/namespace.yaml @@ -0,0 +1,6 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: kms-health-test + labels: + app.kubernetes.io/managed-by: kms-health-kind-harness diff --git a/examples/kms-health-kind/manifests/rbac.yaml b/examples/kms-health-kind/manifests/rbac.yaml new file mode 100644 index 0000000000..df9a46f665 --- /dev/null +++ b/examples/kms-health-kind/manifests/rbac.yaml @@ -0,0 +1,39 @@ +# Monitor runs as a Deployment in kms-health-test; SA + RBAC all live +# in the same namespace. The Role is scoped to exactly one ConfigMap +# so the blast radius is obvious. +apiVersion: v1 +kind: ServiceAccount +metadata: + name: kms-health-monitor + namespace: kms-health-test +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: Role +metadata: + name: kms-health-monitor + namespace: kms-health-test +rules: +# RBAC forbids resourceNames with `create` (the resource doesn't exist +# yet, so name matching is undefined). Two-rule shape: tight scope for +# mutating verbs, namespace-wide create for the self-heal path. +- apiGroups: [""] + resources: ["configmaps"] + resourceNames: ["kms-health-fake"] + verbs: ["get", "update", "patch"] +- apiGroups: [""] + resources: ["configmaps"] + verbs: ["create"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: kms-health-monitor + namespace: kms-health-test +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: kms-health-monitor +subjects: +- kind: ServiceAccount + name: kms-health-monitor + namespace: kms-health-test diff --git a/examples/kms-health-kind/manifests/static-pod.yaml b/examples/kms-health-kind/manifests/static-pod.yaml new file mode 100644 index 0000000000..1094929b8e --- /dev/null +++ b/examples/kms-health-kind/manifests/static-pod.yaml @@ -0,0 +1,28 @@ +# Static pod for the fake plugin. kubelet creates a mirror pod in the +# apiserver for visibility — mirror pods can't reference a +# ServiceAccount, so the monitor lives in a Deployment (see +# manifests/monitor-deployment.yaml) rather than as a sibling container +# here. The /tmp hostPath volume is the coordination point — apiserver, +# this fake plugin, and the monitor pod all see the same socket file. 
+apiVersion: v1 +kind: Pod +metadata: + name: kms-fake-plugin + namespace: kube-system + labels: + tier: control-plane + component: kms-fake-plugin +spec: + hostNetwork: true + containers: + - name: fake-plugin + image: kms-health-kind-fake-plugin:dev + imagePullPolicy: Never + volumeMounts: + - name: kms-plugin-socket-fake + mountPath: /var/run/kmsplugin-fake + volumes: + - name: kms-plugin-socket-fake + hostPath: + path: /tmp + type: Directory diff --git a/examples/kms-health-kind/verify.sh b/examples/kms-health-kind/verify.sh new file mode 100755 index 0000000000..647da7ac82 --- /dev/null +++ b/examples/kms-health-kind/verify.sh @@ -0,0 +1,183 @@ +#!/usr/bin/env bash +# +# Acceptance test for the KIND harness. Implements the 6 assertions +# from PLAN.md §9. +# +# Exit 0 = pass. On failure: dump monitor logs, fake-plugin container +# logs, and the status ConfigMap, then exit 1. + +set -o errexit +set -o nounset +set -o pipefail + +unset KIND_EXPERIMENTAL_PROVIDER + +CLUSTER="${CLUSTER:-kms-health-claude}" +CTX="kind-${CLUSTER}" +NODE="${CLUSTER}-control-plane" +FAKE_POD="kms-fake-plugin-${NODE}" +MONITOR_NS="kms-health-test" +MONITOR_SELECTOR="app=kms-health-monitor-fake" +CM_NS="kms-health-test" +CM="kms-health-fake" + +step() { echo "[verify] $*"; } + +monitor_pod() { + kubectl --context "${CTX}" -n "${MONITOR_NS}" get pod \ + -l "${MONITOR_SELECTOR}" -o jsonpath='{.items[0].metadata.name}' 2>/dev/null +} + +dump_diagnostics() { + local mp + mp="$(monitor_pod || true)" + echo "" >&2 + echo "=== diagnostics ===" >&2 + echo "--- monitor pod describe (${mp}) ---" >&2 + kubectl --context "${CTX}" -n "${MONITOR_NS}" describe pod "${mp}" >&2 || true + echo "--- monitor logs ---" >&2 + kubectl --context "${CTX}" -n "${MONITOR_NS}" logs "${mp}" >&2 || true + echo "--- fake-plugin crictl logs ---" >&2 + local cid + cid="$(docker exec "${NODE}" crictl ps -a --name fake-plugin -q 2>/dev/null | head -1 || true)" + if [ -n "${cid}" ]; then + docker exec "${NODE}" crictl logs "${cid}" >&2 || true + fi + echo "--- configmap ---" >&2 + kubectl --context "${CTX}" -n "${CM_NS}" get cm "${CM}" -o yaml >&2 || true +} + +fail() { + echo "" >&2 + echo "FAIL: $*" >&2 + dump_diagnostics + exit 1 +} + +## Schema reminder: the writer keys data. to a JSON-encoded +## classEntry. There is no flat "current state" key; the class with +## the maximum embedded .timestamp is the current observation. +## +## data.ok = `{"timestamp":...,"observerPod":...,"keyIDHash":...}` +## data.not-ok = `{"timestamp":...,"observerPod":...,"detail":...,"keyIDHash":...}` +## data.rpc-error = `{"timestamp":...,"observerPod":...,"detail":...}` +## +## After a class transition the previous class's key is left in place +## (merge-patch semantics), so "current" is determined by max-timestamp, +## not by key existence. + +# Print the class key whose embedded .timestamp is max. Empty when the +# CM or its data is missing/empty. +current_class() { + kubectl --context "${CTX}" -n "${CM_NS}" get cm "${CM}" -o json 2>/dev/null \ + | jq -r ' + .data // {} + | to_entries + | map(.key as $k | .value | fromjson | {k: $k, ts: .timestamp}) + | (max_by(.ts) // {k: ""}).k + ' +} + +# Print one field from data.. Empty when class or field absent. +class_field() { + local class="$1" field="$2" + kubectl --context "${CTX}" -n "${CM_NS}" get cm "${CM}" -o json 2>/dev/null \ + | jq -r --arg c "$class" --arg f "$field" ' + .data[$c] // "" + | if . 
== "" then "" else fromjson | .[$f] // "" end + ' +} + +# wait_for PRED TIMEOUT_SECONDS MSG: evaluates PRED every second. +wait_for() { + local pred="$1" timeout="$2" msg="$3" + local deadline=$(($(date +%s) + timeout)) + while ! eval "$pred" >/dev/null 2>&1; do + if [ "$(date +%s)" -gt "$deadline" ]; then + fail "timeout (${timeout}s) waiting for: ${msg}" + fi + sleep 1 + done +} + +# Assertion 1: Bootstrap. +step "1/6 monitor Deployment Available + fake-plugin container running" +kubectl --context "${CTX}" -n "${MONITOR_NS}" rollout status deployment/kms-health-monitor-fake \ + --timeout=120s \ + || fail "monitor Deployment not Available within 120s" +FAKE_CID="$(docker exec "${NODE}" crictl ps --name fake-plugin -q 2>/dev/null | head -1)" +if [ -z "${FAKE_CID}" ]; then + fail "fake-plugin container not running on ${NODE}" +fi + +# Assertion 2: Initial healthy. +step "2/6 ConfigMap shows current class = ok" +wait_for '[ "$(current_class)" = "ok" ]' 15 "current_class=ok" +t0="$(class_field ok timestamp)" +step " t0=${t0} keyIDHash=$(class_field ok keyIDHash | cut -c1-16)… observerPod=$(class_field ok observerPod)" + +# Assertion 3: Timestamp advances. +step "3/6 data.ok.timestamp advances after 2× probe-interval" +sleep 6 # ≥2× harness probe-interval (2s) + slack +t1="$(class_field ok timestamp)" +if [ "${t1}" = "${t0}" ]; then + fail "data.ok.timestamp did not advance: t0=${t0} t1=${t1}" +fi +if ! [[ "${t1}" > "${t0}" ]]; then + fail "data.ok.timestamp moved backward: t0=${t0} t1=${t1}" +fi +step " t1=${t1} > t0 ok" + +# Assertion 4: Flip to unhealthy. +step "4/6 flip-unhealthy → current class = not-ok" +docker exec "${NODE}" touch /tmp/kms-unhealthy +wait_for '[ "$(current_class)" = "not-ok" ]' 10 "current_class=not-ok" +t2="$(class_field not-ok timestamp)" +d2="$(class_field not-ok detail)" +if ! [[ "${t2}" > "${t1}" ]]; then + fail "data.not-ok.timestamp not newer than last data.ok: t1=${t1} t2=${t2}" +fi +step " t2=${t2} detail=${d2}" + +# Assertion 5: Restore healthy. +step "5/6 flip-healthy → current class = ok" +docker exec "${NODE}" rm -f /tmp/kms-unhealthy +wait_for '[ "$(current_class)" = "ok" ]' 10 "current_class=ok after flip" +t3="$(class_field ok timestamp)" +if ! [[ "${t3}" > "${t2}" ]]; then + fail "recovery data.ok.timestamp not newer than data.not-ok: t2=${t2} t3=${t3}" +fi +step " t3=${t3} current_class=ok" + +# Assertion 6: Plugin death. +# Staydown marker pattern: kubelet's restart speed varies (1s to 9s +# observed). Without help, the probe sometimes reconnects across the +# gap thanks to grpc.WaitForReady(true) and never reports rpc-error. +# Setting /tmp/kms-staydown before killing makes fake-plugin sleep on +# its next start, giving a deterministic outage window. 
+step "6/6 set staydown marker + kill fake-plugin → current class = rpc-error" +FAKE_CID="$(docker exec "${NODE}" crictl ps --name fake-plugin -q 2>/dev/null | head -1)" +if [ -z "${FAKE_CID}" ]; then + fail "could not find fake-plugin container via crictl on ${NODE}" +fi +docker exec "${NODE}" touch /tmp/kms-staydown +step " stopping container ${FAKE_CID}" +docker exec "${NODE}" crictl stop "${FAKE_CID}" >/dev/null +caught=0 +for _ in $(seq 1 50); do + if [ "$(current_class)" = "rpc-error" ]; then + caught=1 + step " caught: detail=$(class_field rpc-error detail)" + break + fi + sleep 0.2 +done +docker exec "${NODE}" rm -f /tmp/kms-staydown +if [ "${caught}" -ne 1 ]; then + fail "never observed current_class=rpc-error after killing fake-plugin (window ~10s @ 200ms poll)" +fi +t4="$(class_field rpc-error timestamp)" +step " t4=${t4} (rpc-error observation timestamp)" + +echo "" +echo "✅ All 6 assertions passed on cluster ${CLUSTER}."