From 8e428e77d2c0b054093c273089012ceda8066228 Mon Sep 17 00:00:00 2001 From: IvanHunters Date: Thu, 28 May 2026 02:54:18 +0300 Subject: [PATCH 1/4] feat(restart): add TriggerOnStaleDiscovery helper Self-cancel helper that fires the manager's context when an error chain carries a NoKindMatchError. Mirrors ChangeWatcher's restart-on-drift pattern so the operator recovers from stale remote-cluster discovery without operator intervention. Signed-off-by: IvanHunters --- internal/restart/trigger.go | 64 ++++++++++++++++++++++ internal/restart/trigger_test.go | 93 ++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+) create mode 100644 internal/restart/trigger.go create mode 100644 internal/restart/trigger_test.go diff --git a/internal/restart/trigger.go b/internal/restart/trigger.go new file mode 100644 index 0000000..2f71bda --- /dev/null +++ b/internal/restart/trigger.go @@ -0,0 +1,64 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restart + +import ( + "context" + "errors" + "log/slog" + + "k8s.io/apimachinery/pkg/api/meta" +) + +// TriggerOnStaleDiscovery cancels the manager's context when err carries a +// NoKindMatchError, asking the pod to terminate so kubelet restarts it with +// a freshly-built remote-cluster registry. +// +// Why: each remote cluster in the registry owns a controller-runtime +// cluster.Cluster, whose REST mapper caches discovery results. When a +// freshly-bootstrapped tenant cluster installs its Peer CRD only after the +// ClusterMesh CR is reconciled for the first time, the mapper caches a +// negative result and never refreshes — every subsequent List(PeerList{}) +// against that target fails with NoKindMatchError until the pod restarts. +// This mirrors ChangeWatcher's self-restart pattern, which already handles +// fingerprint changes by cancelling the same context. +// +// cancel may be nil (tests/fakes) — the call is then a no-op. +// log may be nil — logging is then skipped. +// Returns true if cancel was invoked, so callers can avoid further work +// that would race with the pod shutdown. +func TriggerOnStaleDiscovery(err error, cancel context.CancelFunc, log *slog.Logger) bool { + if err == nil || cancel == nil { + return false + } + + var noMatch *meta.NoKindMatchError + if !errors.As(err, &noMatch) { + return false + } + + if log != nil { + log.Info("remote cluster discovery returned NoMatchError; triggering self-restart to refresh REST mapper", + slog.String("groupKind", noMatch.GroupKind.String()), + slog.String("error", err.Error()), + ) + } + + cancel() + + return true +} diff --git a/internal/restart/trigger_test.go b/internal/restart/trigger_test.go new file mode 100644 index 0000000..e83d0ca --- /dev/null +++ b/internal/restart/trigger_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restart + +import ( + "testing" + + cockerrors "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var errUnrelated = cockerrors.New("network blew up") + +func TestTriggerOnStaleDiscovery_NilErr(t *testing.T) { + called := false + cancel := func() { called = true } + + triggered := TriggerOnStaleDiscovery(nil, cancel, testLogger()) + + assert.False(t, triggered, "nil err must not trigger") + assert.False(t, called, "cancel must not be invoked on nil err") +} + +func TestTriggerOnStaleDiscovery_NilCancel(t *testing.T) { + noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} + + triggered := TriggerOnStaleDiscovery(noMatch, nil, testLogger()) + + assert.False(t, triggered, "nil cancel must yield false even for NoMatch error") +} + +func TestTriggerOnStaleDiscovery_UnrelatedErr(t *testing.T) { + called := false + cancel := func() { called = true } + + triggered := TriggerOnStaleDiscovery(errUnrelated, cancel, testLogger()) + + assert.False(t, triggered, "unrelated err must not trigger") + assert.False(t, called, "cancel must not be invoked for non-NoMatch errors") +} + +func TestTriggerOnStaleDiscovery_PlainNoMatch(t *testing.T) { + called := false + cancel := func() { called = true } + + noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} + + triggered := TriggerOnStaleDiscovery(noMatch, cancel, testLogger()) + + assert.True(t, triggered, "plain NoKindMatchError must trigger") + assert.True(t, called, "cancel must be invoked") +} + +func TestTriggerOnStaleDiscovery_WrappedNoMatch(t *testing.T) { + called := false + cancel := func() { called = true } + + noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} + wrapped := cockerrors.Wrap(cockerrors.Wrap(noMatch, "listing existing peers"), "reconciling peers from \"a\" to \"b\"") + + triggered := TriggerOnStaleDiscovery(wrapped, cancel, testLogger()) + + assert.True(t, triggered, "wrapped NoKindMatchError must trigger (the controller wraps the error twice in production)") + assert.True(t, called, "cancel must be invoked") +} + +func TestTriggerOnStaleDiscovery_NilLogger(t *testing.T) { + called := false + cancel := func() { called = true } + + noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} + + assert.NotPanics(t, func() { + TriggerOnStaleDiscovery(noMatch, cancel, nil) + }, "nil logger must be tolerated") + assert.True(t, called) +} From e93df61074513fc0d3b0ebb53a7d5a5b792dcd7b Mon Sep 17 00:00:00 2001 From: IvanHunters Date: Thu, 28 May 2026 02:54:26 +0300 Subject: [PATCH 2/4] fix(controller): self-restart on stale remote-cluster discovery A target cluster's REST mapper caches discovery results for the lifetime of cluster.Cluster. When a freshly bootstrapped tenant installs its Peer CRD only after the operator's first reconcile of its ClusterMesh CR, the mapper holds a NoKindMatchError forever and every subsequent peer push fails until the pod restarts. Plumb the manager's cancel through ClusterMeshReconciler and invoke TriggerOnStaleDiscovery on every error return so kubelet rebuilds the registry against current discovery. Same recovery shape as the existing ChangeWatcher fingerprint-drift restart. Signed-off-by: IvanHunters --- cmd/main.go | 5 +- internal/controller/clustermesh_controller.go | 61 +++++++++++++------ 2 files changed, 44 insertions(+), 22 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index a983063..64c246e 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -122,7 +122,7 @@ func run() error { } } - if err := wireReconciler(mgr, registry, slogger); err != nil { + if err := wireReconciler(mgr, registry, slogger, cancel); err != nil { return err } @@ -313,13 +313,14 @@ func newManager(cfg *rest.Config, opts *runtimeOpts) (manager.Manager, error) { return mgr, errors.Wrap(err, "creating manager") } -func wireReconciler(mgr manager.Manager, registry *multicluster.ClusterRegistry, slogger *slog.Logger) error { +func wireReconciler(mgr manager.Manager, registry *multicluster.ClusterRegistry, slogger *slog.Logger, cancel context.CancelFunc) error { r := &controller.ClusterMeshReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Registry: registry, Log: slogger, Recorder: mgr.GetEventRecorder(controllerEventName), + Cancel: cancel, } return errors.Wrap(r.SetupWithManager(mgr), "registering ClusterMesh reconciler") diff --git a/internal/controller/clustermesh_controller.go b/internal/controller/clustermesh_controller.go index bb75393..a540d1c 100644 --- a/internal/controller/clustermesh_controller.go +++ b/internal/controller/clustermesh_controller.go @@ -38,6 +38,7 @@ import ( "github.com/squat/kilo-clustermesh-operator/internal/kilonode" "github.com/squat/kilo-clustermesh-operator/internal/multicluster" "github.com/squat/kilo-clustermesh-operator/internal/peer" + "github.com/squat/kilo-clustermesh-operator/internal/restart" "github.com/squat/kilo-clustermesh-operator/internal/validation" kilov1alpha1 "github.com/squat/kilo-clustermesh-operator/pkg/kilo/v1alpha1" ) @@ -61,22 +62,55 @@ type ClusterMeshReconciler struct { Registry *multicluster.ClusterRegistry Log *slog.Logger Recorder events.EventRecorder + + // Cancel terminates the manager's root context. It is fired when a + // reconcile error indicates the remote-cluster discovery cache is + // stale (e.g. a freshly bootstrapped tenant's Peer CRD landed after + // our first List); kubelet then restarts the pod, rebuilding the + // registry against current discovery. May be nil in tests. + Cancel context.CancelFunc } // Reconcile implements the main reconciliation loop for ClusterMesh objects. func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.Log.With(slog.String("name", req.Name), slog.String("namespace", req.Namespace)) + err := r.reconcile(ctx, log, req) + + // A NoKindMatchError on a remote cluster's REST mapper survives the + // lifetime of cluster.Cluster because the negative discovery entry is + // cached. Self-cancel so kubelet rebuilds the registry against fresh + // discovery — same recovery shape as ChangeWatcher's fingerprint-drift + // restart. + restart.TriggerOnStaleDiscovery(err, r.Cancel, log) + + return ctrl.Result{}, err +} + +// SetupWithManager registers the controller with the manager. +func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error { + err := ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.ClusterMesh{}). + Named("clustermesh"). + WithEventFilter(predicate.Funcs{ + DeleteFunc: func(event.DeleteEvent) bool { return false }, + }). + Complete(r) + + return errors.Wrap(err, "building clustermesh controller") +} + +func (r *ClusterMeshReconciler) reconcile(ctx context.Context, log *slog.Logger, req ctrl.Request) error { mesh := &v1alpha1.ClusterMesh{} err := r.Get(ctx, req.NamespacedName, mesh) if err != nil { - return ctrl.Result{}, errors.Wrap(client.IgnoreNotFound(err), "fetching ClusterMesh") + return errors.Wrap(client.IgnoreNotFound(err), "fetching ClusterMesh") } // Handle deletion via finalizer. if !mesh.DeletionTimestamp.IsZero() { - return ctrl.Result{}, r.handleDeletion(ctx, log, mesh) + return r.handleDeletion(ctx, log, mesh) } // Ensure finalizer is present. @@ -85,15 +119,15 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = r.Update(ctx, mesh) if err != nil { - return ctrl.Result{}, errors.Wrap(err, "adding finalizer") + return errors.Wrap(err, "adding finalizer") } - return ctrl.Result{}, nil + return nil } // Mesh-level validation. if overlap, msg := r.validateMeshNetworks(ctx, log, mesh); overlap { - return ctrl.Result{}, r.setOverlapCondition(ctx, mesh, msg) + return r.setOverlapCondition(ctx, mesh, msg) } setCondition(mesh, "NetworksOverlap", metav1.ConditionFalse, "NoOverlap", "all CIDRs are disjoint") @@ -101,7 +135,7 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Reconcile per-cluster peers. clusterStatuses, err := r.reconcileAllClusters(ctx, log, mesh) if err != nil { - return ctrl.Result{}, err + return err } // Sweep peers whose source-cluster was removed from spec.Clusters since @@ -118,20 +152,7 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) // the global cleanup pass so the cluster always converges. r.cleanupOrphanMeshPeers(ctx, log, mesh.Namespace) - return ctrl.Result{}, r.updateStatus(ctx, mesh, clusterStatuses) -} - -// SetupWithManager registers the controller with the manager. -func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error { - err := ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1.ClusterMesh{}). - Named("clustermesh"). - WithEventFilter(predicate.Funcs{ - DeleteFunc: func(event.DeleteEvent) bool { return false }, - }). - Complete(r) - - return errors.Wrap(err, "building clustermesh controller") + return r.updateStatus(ctx, mesh, clusterStatuses) } // cleanupSweepTimeout caps the per-target list/delete pass time so a single From a35b8a27eb424b8b3227132bd105e0c66610be2c Mon Sep 17 00:00:00 2001 From: IvanHunters Date: Thu, 28 May 2026 03:32:12 +0300 Subject: [PATCH 3/4] fix(controller): reset target REST mapper instead of restarting pod on stale discovery When a reconcile error carries a NoKindMatchError, reset every remote cluster's REST mapper so the next reconcile re-discovers the missing kind. The previous self-cancel approach killed the leader lease and risked CrashLoopBackOff under repeated stale-CRD events. Mapper reset is scoped to cache invalidation, costs one discovery RPC on the next List, and keeps the rest of the operator running. - multicluster.ClusterRegistry gains Mappers() returning every remote cluster's REST mapper; directCluster.GetRESTMapper() returns nil so envtest-driven integration tests stay safe. - restart.RefreshMapperOnNoMatch replaces TriggerOnStaleDiscovery; accepts a single mapper, type-asserts to meta.ResettableRESTMapper. - ClusterMeshReconciler.Reconcile loops over Mappers() on every return. Cancel field removed from the reconciler; ChangeWatcher still cancels on kubeconfig drift where that is the correct shape. Signed-off-by: IvanHunters --- cmd/main.go | 5 +- internal/controller/clustermesh_controller.go | 23 +++-- internal/multicluster/registry.go | 29 +++++- internal/restart/trigger.go | 48 ++++++---- internal/restart/trigger_test.go | 92 ++++++++++++------- 5 files changed, 125 insertions(+), 72 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 64c246e..a983063 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -122,7 +122,7 @@ func run() error { } } - if err := wireReconciler(mgr, registry, slogger, cancel); err != nil { + if err := wireReconciler(mgr, registry, slogger); err != nil { return err } @@ -313,14 +313,13 @@ func newManager(cfg *rest.Config, opts *runtimeOpts) (manager.Manager, error) { return mgr, errors.Wrap(err, "creating manager") } -func wireReconciler(mgr manager.Manager, registry *multicluster.ClusterRegistry, slogger *slog.Logger, cancel context.CancelFunc) error { +func wireReconciler(mgr manager.Manager, registry *multicluster.ClusterRegistry, slogger *slog.Logger) error { r := &controller.ClusterMeshReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Registry: registry, Log: slogger, Recorder: mgr.GetEventRecorder(controllerEventName), - Cancel: cancel, } return errors.Wrap(r.SetupWithManager(mgr), "registering ClusterMesh reconciler") diff --git a/internal/controller/clustermesh_controller.go b/internal/controller/clustermesh_controller.go index a540d1c..0765b58 100644 --- a/internal/controller/clustermesh_controller.go +++ b/internal/controller/clustermesh_controller.go @@ -62,13 +62,6 @@ type ClusterMeshReconciler struct { Registry *multicluster.ClusterRegistry Log *slog.Logger Recorder events.EventRecorder - - // Cancel terminates the manager's root context. It is fired when a - // reconcile error indicates the remote-cluster discovery cache is - // stale (e.g. a freshly bootstrapped tenant's Peer CRD landed after - // our first List); kubelet then restarts the pod, rebuilding the - // registry against current discovery. May be nil in tests. - Cancel context.CancelFunc } // Reconcile implements the main reconciliation loop for ClusterMesh objects. @@ -78,11 +71,17 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) err := r.reconcile(ctx, log, req) // A NoKindMatchError on a remote cluster's REST mapper survives the - // lifetime of cluster.Cluster because the negative discovery entry is - // cached. Self-cancel so kubelet rebuilds the registry against fresh - // discovery — same recovery shape as ChangeWatcher's fingerprint-drift - // restart. - restart.TriggerOnStaleDiscovery(err, r.Cancel, log) + // lifetime of cluster.Cluster because the negative discovery entry + // is cached. Reset every remote-cluster mapper so the next reconcile + // re-discovers the missing kind; the source target is unknown at + // this level (the wrapped error carries it as text only), and + // Reset() is cheap — it only invalidates the in-memory cache and + // the next List() pays a one-time discovery round-trip. Self-heal + // without taking the operator pod down, which would drop the leader + // lease and inflate to a CrashLoopBackOff after a few stale CRDs. + for _, m := range r.Registry.Mappers() { + restart.RefreshMapperOnNoMatch(err, m, log) + } return ctrl.Result{}, err } diff --git a/internal/multicluster/registry.go b/internal/multicluster/registry.go index 1f23448..fadfeee 100644 --- a/internal/multicluster/registry.go +++ b/internal/multicluster/registry.go @@ -73,9 +73,12 @@ func (d *directCluster) GetFieldIndexer() client.FieldIndexer { func (d *directCluster) GetCache() cache.Cache { panic("directCluster: GetCache not implemented") } -func (d *directCluster) GetRESTMapper() meta.RESTMapper { - panic("directCluster: GetRESTMapper not implemented") -} +// GetRESTMapper returns nil. directCluster is only used in integration +// tests that drive the reconciler against envtest-built clients; the +// stale-discovery recovery path treats a nil mapper as a no-op, so +// returning nil keeps Mappers() callable without panicking the test +// process. +func (d *directCluster) GetRESTMapper() meta.RESTMapper { return nil } func (d *directCluster) GetAPIReader() client.Reader { panic("directCluster: GetAPIReader not implemented") @@ -239,3 +242,23 @@ func (r *ClusterRegistry) Clusters() []string { func (r *ClusterRegistry) All() map[string]cluster.Cluster { return r.clusters } + +// Mappers returns the REST mapper of every registered cluster, in +// arbitrary order. Nil mappers (e.g. from test-only directCluster) are +// skipped so callers can iterate without nil-guarding. Used by the +// reconciler to invalidate stale discovery caches after a +// NoKindMatchError without restarting the operator pod. +func (r *ClusterRegistry) Mappers() []meta.RESTMapper { + mappers := make([]meta.RESTMapper, 0, len(r.clusters)) + + for _, c := range r.clusters { + m := c.GetRESTMapper() + if m == nil { + continue + } + + mappers = append(mappers, m) + } + + return mappers +} diff --git a/internal/restart/trigger.go b/internal/restart/trigger.go index 2f71bda..f63df6b 100644 --- a/internal/restart/trigger.go +++ b/internal/restart/trigger.go @@ -17,32 +17,35 @@ limitations under the License. package restart import ( - "context" "errors" "log/slog" "k8s.io/apimachinery/pkg/api/meta" ) -// TriggerOnStaleDiscovery cancels the manager's context when err carries a -// NoKindMatchError, asking the pod to terminate so kubelet restarts it with -// a freshly-built remote-cluster registry. +// RefreshMapperOnNoMatch resets the given REST mapper when err carries a +// NoKindMatchError. Without this, controller-runtime's REST mapper caches +// the negative discovery entry for the lifetime of the cluster.Cluster — a +// freshly bootstrapped tenant that installs its Peer CRD only after our +// first List(PeerList{}) would deadlock the reconcile loop forever, with +// requeue-with-backoff never refreshing discovery. // -// Why: each remote cluster in the registry owns a controller-runtime -// cluster.Cluster, whose REST mapper caches discovery results. When a -// freshly-bootstrapped tenant cluster installs its Peer CRD only after the -// ClusterMesh CR is reconciled for the first time, the mapper caches a -// negative result and never refreshes — every subsequent List(PeerList{}) -// against that target fails with NoKindMatchError until the pod restarts. -// This mirrors ChangeWatcher's self-restart pattern, which already handles -// fingerprint changes by cancelling the same context. +// Resetting the mapper instead of restarting the operator pod avoids +// kubelet CrashLoopBackOff (which inflates to a 5-minute wait after a +// handful of restarts), lets the manager keep its leader lease, and +// scopes recovery to the one cluster whose CRD state actually drifted. +// The next reconcile picks up the fresh mapping via Discovery and the +// peer push succeeds without further intervention. // -// cancel may be nil (tests/fakes) — the call is then a no-op. +// mapper may be nil — the call is then a no-op. If the mapper does not +// implement meta.ResettableRESTMapper (an in-memory test fake, say) the +// call also no-ops; that is acceptable because the production code path +// always builds clusters through controller-runtime's dynamic mapper. // log may be nil — logging is then skipped. -// Returns true if cancel was invoked, so callers can avoid further work -// that would race with the pod shutdown. -func TriggerOnStaleDiscovery(err error, cancel context.CancelFunc, log *slog.Logger) bool { - if err == nil || cancel == nil { +// Returns true when Reset() was actually invoked, so callers can avoid +// double-emitting the recovery event. +func RefreshMapperOnNoMatch(err error, mapper meta.RESTMapper, log *slog.Logger) bool { + if err == nil || mapper == nil { return false } @@ -51,14 +54,19 @@ func TriggerOnStaleDiscovery(err error, cancel context.CancelFunc, log *slog.Log return false } + resettable, ok := mapper.(meta.ResettableRESTMapper) + if !ok { + return false + } + + resettable.Reset() + if log != nil { - log.Info("remote cluster discovery returned NoMatchError; triggering self-restart to refresh REST mapper", + log.Info("reset target cluster REST mapper after NoMatchError; next reconcile will refresh discovery", slog.String("groupKind", noMatch.GroupKind.String()), slog.String("error", err.Error()), ) } - cancel() - return true } diff --git a/internal/restart/trigger_test.go b/internal/restart/trigger_test.go index e83d0ca..18179f1 100644 --- a/internal/restart/trigger_test.go +++ b/internal/restart/trigger_test.go @@ -27,67 +27,91 @@ import ( var errUnrelated = cockerrors.New("network blew up") -func TestTriggerOnStaleDiscovery_NilErr(t *testing.T) { - called := false - cancel := func() { called = true } +// resettableSpy is a meta.ResettableRESTMapper that counts Reset() calls +// and delegates the rest of the interface to an unused fallback. The +// helper only ever invokes Reset; the other methods are not exercised +// from production code paths under test. +type resettableSpy struct { + meta.RESTMapper + + resets int +} + +func (r *resettableSpy) Reset() { r.resets++ } + +func TestRefreshMapperOnNoMatch_NilErr(t *testing.T) { + spy := &resettableSpy{} - triggered := TriggerOnStaleDiscovery(nil, cancel, testLogger()) + called := RefreshMapperOnNoMatch(nil, spy, testLogger()) - assert.False(t, triggered, "nil err must not trigger") - assert.False(t, called, "cancel must not be invoked on nil err") + assert.False(t, called, "nil err must not reset") + assert.Equal(t, 0, spy.resets, "Reset must not be invoked on nil err") } -func TestTriggerOnStaleDiscovery_NilCancel(t *testing.T) { +func TestRefreshMapperOnNoMatch_NilMapper(t *testing.T) { noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} - triggered := TriggerOnStaleDiscovery(noMatch, nil, testLogger()) + called := RefreshMapperOnNoMatch(noMatch, nil, testLogger()) - assert.False(t, triggered, "nil cancel must yield false even for NoMatch error") + assert.False(t, called, "nil mapper must yield false even for NoMatch error") } -func TestTriggerOnStaleDiscovery_UnrelatedErr(t *testing.T) { - called := false - cancel := func() { called = true } +func TestRefreshMapperOnNoMatch_UnrelatedErr(t *testing.T) { + spy := &resettableSpy{} - triggered := TriggerOnStaleDiscovery(errUnrelated, cancel, testLogger()) + called := RefreshMapperOnNoMatch(errUnrelated, spy, testLogger()) - assert.False(t, triggered, "unrelated err must not trigger") - assert.False(t, called, "cancel must not be invoked for non-NoMatch errors") + assert.False(t, called, "unrelated err must not reset") + assert.Equal(t, 0, spy.resets, "Reset must not be invoked for non-NoMatch errors") } -func TestTriggerOnStaleDiscovery_PlainNoMatch(t *testing.T) { - called := false - cancel := func() { called = true } - +func TestRefreshMapperOnNoMatch_PlainNoMatch(t *testing.T) { + spy := &resettableSpy{} noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} - triggered := TriggerOnStaleDiscovery(noMatch, cancel, testLogger()) + called := RefreshMapperOnNoMatch(noMatch, spy, testLogger()) - assert.True(t, triggered, "plain NoKindMatchError must trigger") - assert.True(t, called, "cancel must be invoked") + assert.True(t, called, "plain NoKindMatchError must reset") + assert.Equal(t, 1, spy.resets, "Reset must be invoked exactly once") } -func TestTriggerOnStaleDiscovery_WrappedNoMatch(t *testing.T) { - called := false - cancel := func() { called = true } - +func TestRefreshMapperOnNoMatch_WrappedNoMatch(t *testing.T) { + spy := &resettableSpy{} noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} wrapped := cockerrors.Wrap(cockerrors.Wrap(noMatch, "listing existing peers"), "reconciling peers from \"a\" to \"b\"") - triggered := TriggerOnStaleDiscovery(wrapped, cancel, testLogger()) + called := RefreshMapperOnNoMatch(wrapped, spy, testLogger()) - assert.True(t, triggered, "wrapped NoKindMatchError must trigger (the controller wraps the error twice in production)") - assert.True(t, called, "cancel must be invoked") + assert.True(t, called, "wrapped NoKindMatchError must reset (the controller wraps the error twice in production)") + assert.Equal(t, 1, spy.resets) } -func TestTriggerOnStaleDiscovery_NilLogger(t *testing.T) { - called := false - cancel := func() { called = true } +func TestRefreshMapperOnNoMatch_NonResettableMapper(t *testing.T) { + // A bare meta.RESTMapper that does not implement ResettableRESTMapper. + // Production builds always get a resettable mapper from + // apiutil.NewDynamicRESTMapper, but tests/fakes may pass anything. + bare := stubRESTMapper{} + called := RefreshMapperOnNoMatch( + &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "g", Kind: "K"}}, + bare, + testLogger(), + ) + + assert.False(t, called, "non-resettable mapper must yield false") +} + +func TestRefreshMapperOnNoMatch_NilLogger(t *testing.T) { + spy := &resettableSpy{} noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} assert.NotPanics(t, func() { - TriggerOnStaleDiscovery(noMatch, cancel, nil) + RefreshMapperOnNoMatch(noMatch, spy, nil) }, "nil logger must be tolerated") - assert.True(t, called) + assert.Equal(t, 1, spy.resets) } + +// stubRESTMapper is a placeholder meta.RESTMapper that does not implement +// ResettableRESTMapper. It is used only to verify the type-assertion +// guard; none of its methods are called in tests. +type stubRESTMapper struct{ meta.RESTMapper } From 29ab12f2374fa39c71251af03863b54271251b2e Mon Sep 17 00:00:00 2001 From: IvanHunters Date: Thu, 28 May 2026 03:45:46 +0300 Subject: [PATCH 4/4] fix(multicluster): wrap dynamic REST mapper to make Reset() actually fire MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit controller-runtime's apiutil.NewDynamicRESTMapper returns an unexported *mapper that has no Reset() method, so the meta ResettableRESTMapper type assertion inside restart.RefreshMapperOnNoMatch was failing in production and the recovery path was silently no-opping. The previous commit shipped a placebo: unit tests passed against a hand-rolled spy but the prod mapper never got reset, so the original mesh1 deadlock would not self-heal as advertised. Introduce resettableDynamicMapper, a thin proxy around apiutil.NewDynamicRESTMapper that implements meta.ResettableRESTMapper. Reset() atomically swaps the underlying mapper for a freshly constructed one; NewDynamicRESTMapper is lazy about discovery so the rebuild is cheap. On rebuild failure the old mapper is preserved so callers still have a working mapper for kinds already in cache. The wrapper is wired into multicluster.Build via cluster.Options.MapperProvider so every remote cluster.Cluster ends up with a resettable mapper. Tests: - TestResettableDynamicMapperImplementsResettable: load-bearing invariant — guards against the regression that just happened. - TestRawDynamicMapperIsNotResettable: documents the upstream gap; flips green and tells us to delete the wrapper if controller-runtime ever adds Reset to its dynamic mapper. - TestResettableDynamicMapperResetReplacesUnderlying: Reset is not a no-op stub. Also: log message in restart.RefreshMapperOnNoMatch said "target cluster"; on the reconciler's fan-out path it is one of N remote clusters, not a single target. Reword to "remote-cluster". Signed-off-by: IvanHunters --- internal/multicluster/mapper.go | 129 +++++++++++++++++++++++++++ internal/multicluster/mapper_test.go | 85 ++++++++++++++++++ internal/multicluster/registry.go | 11 ++- internal/restart/trigger.go | 2 +- 4 files changed, 224 insertions(+), 3 deletions(-) create mode 100644 internal/multicluster/mapper.go create mode 100644 internal/multicluster/mapper_test.go diff --git a/internal/multicluster/mapper.go b/internal/multicluster/mapper.go new file mode 100644 index 0000000..582a8f0 --- /dev/null +++ b/internal/multicluster/mapper.go @@ -0,0 +1,129 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multicluster + +import ( + "net/http" + "sync" + + "github.com/cockroachdb/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +// resettableDynamicMapper wraps apiutil.NewDynamicRESTMapper with a +// Reset() that atomically swaps the underlying mapper for a fresh one. +// +// controller-runtime's dynamic mapper does not implement +// meta.ResettableRESTMapper on its own: the unexported *apiutil.mapper +// type exposes KindFor, RESTMapping and the rest of meta.RESTMapper but +// has no Reset method. Without this wrapper, internal/restart's +// recovery path cannot invalidate the discovery cache through the +// standard interface and silently no-ops in production. +// +// Reset reconstructs the underlying dynamic mapper. NewDynamicRESTMapper +// is lazy (it does not perform discovery at construction time, only on +// first lookup), so Reset is cheap; the next List against an uncached +// kind triggers one discovery round-trip against the target apiserver. +// On rebuild failure the previous mapper is preserved so callers still +// have a working mapper for kinds already in cache; the next Reset +// retries the rebuild. +// +// The proxy methods deliberately do not wrap the underlying mapper's +// errors. Wrapping with a non-apimachinery error type would change the +// error chain in ways errors.As consumers downstream (notably +// restart.RefreshMapperOnNoMatch, which fishes out *meta.NoKindMatchError) +// already handle, but adding a wrap layer here gives no diagnostic +// value over the upstream error and only increases chain depth. +type resettableDynamicMapper struct { + cfg *rest.Config + httpClient *http.Client + + mu sync.RWMutex + mapper meta.RESTMapper +} + +var _ meta.ResettableRESTMapper = (*resettableDynamicMapper)(nil) + +// newResettableDynamicMapper builds the wrapper and pre-constructs the +// initial dynamic mapper so the first RESTMapper call does not pay the +// build cost. The signature matches cluster.Options.MapperProvider so +// it can be passed straight into cluster.New. +func newResettableDynamicMapper(cfg *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) { + m, err := apiutil.NewDynamicRESTMapper(cfg, httpClient) + if err != nil { + return nil, errors.Wrap(err, "building dynamic REST mapper") + } + + return &resettableDynamicMapper{ + cfg: cfg, + httpClient: httpClient, + mapper: m, + }, nil +} + +// Reset replaces the underlying dynamic mapper with a freshly built +// one. On rebuild failure the existing mapper is kept and the call +// silently no-ops; the next Reset will retry. Concurrent callers see +// the swap atomically. +func (r *resettableDynamicMapper) Reset() { + fresh, err := apiutil.NewDynamicRESTMapper(r.cfg, r.httpClient) + if err != nil { + return + } + + r.mu.Lock() + r.mapper = fresh + r.mu.Unlock() +} + +func (r *resettableDynamicMapper) KindFor(input schema.GroupVersionResource) (schema.GroupVersionKind, error) { + return r.current().KindFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) KindsFor(input schema.GroupVersionResource) ([]schema.GroupVersionKind, error) { + return r.current().KindsFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) { + return r.current().ResourceFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) { + return r.current().ResourcesFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) { + return r.current().RESTMapping(gk, versions...) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) { + return r.current().RESTMappings(gk, versions...) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) ResourceSingularizer(resource string) (string, error) { + return r.current().ResourceSingularizer(resource) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) current() meta.RESTMapper { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.mapper +} diff --git a/internal/multicluster/mapper_test.go b/internal/multicluster/mapper_test.go new file mode 100644 index 0000000..37c2bb3 --- /dev/null +++ b/internal/multicluster/mapper_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multicluster + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +// TestResettableDynamicMapperImplementsResettable is the load-bearing +// invariant of the entire stale-discovery recovery path: the mapper we +// hand to controller-runtime must satisfy meta.ResettableRESTMapper so +// restart.RefreshMapperOnNoMatch can call Reset() through the standard +// interface. Without this, the recovery path silently no-ops in +// production. +func TestResettableDynamicMapperImplementsResettable(t *testing.T) { + cfg := &rest.Config{Host: "https://127.0.0.1:6443"} + + httpClient, err := rest.HTTPClientFor(cfg) + require.NoError(t, err) + + m, err := newResettableDynamicMapper(cfg, httpClient) + require.NoError(t, err) + + _, ok := m.(meta.ResettableRESTMapper) + assert.True(t, ok, "newResettableDynamicMapper must satisfy meta.ResettableRESTMapper; otherwise restart.RefreshMapperOnNoMatch is a no-op") +} + +// TestRawDynamicMapperIsNotResettable documents the upstream gap this +// wrapper exists to close. If a future controller-runtime release adds +// Reset() to the dynamic mapper, this test will flip and the wrapper +// can be deleted. Failing here means the wrapper is no longer needed. +func TestRawDynamicMapperIsNotResettable(t *testing.T) { + cfg := &rest.Config{Host: "https://127.0.0.1:6443"} + + httpClient, err := rest.HTTPClientFor(cfg) + require.NoError(t, err) + + raw, err := apiutil.NewDynamicRESTMapper(cfg, httpClient) + require.NoError(t, err) + + _, ok := raw.(meta.ResettableRESTMapper) + assert.False(t, ok, "apiutil.NewDynamicRESTMapper now implements meta.ResettableRESTMapper — the resettableDynamicMapper wrapper can be removed") +} + +// TestResettableDynamicMapperResetReplacesUnderlying verifies that +// Reset swaps the underlying mapper for a new instance, rather than +// being a no-op stub. +func TestResettableDynamicMapperResetReplacesUnderlying(t *testing.T) { + cfg := &rest.Config{Host: "https://127.0.0.1:6443"} + + httpClient, err := rest.HTTPClientFor(cfg) + require.NoError(t, err) + + mAny, err := newResettableDynamicMapper(cfg, httpClient) + require.NoError(t, err) + + wrapper, ok := mAny.(*resettableDynamicMapper) + require.True(t, ok) + + before := wrapper.current() + wrapper.Reset() + after := wrapper.current() + + assert.NotSame(t, before, after, "Reset must replace the underlying mapper with a fresh instance") +} diff --git a/internal/multicluster/registry.go b/internal/multicluster/registry.go index fadfeee..a1c2ae5 100644 --- a/internal/multicluster/registry.go +++ b/internal/multicluster/registry.go @@ -160,8 +160,15 @@ func Build( reg.local = entry.Name } - c, err := cluster.New(cfg, func(o *cluster.Options) { - o.Scheme = scheme + c, err := cluster.New(cfg, func(opts *cluster.Options) { + opts.Scheme = scheme + // Replace controller-runtime's default dynamic mapper with + // the resettable wrapper so internal/restart can invalidate + // the discovery cache through meta.ResettableRESTMapper. + // The unexported *apiutil.mapper that NewDynamicRESTMapper + // returns does not implement Reset(), which would silently + // turn the recovery path into a no-op. + opts.MapperProvider = newResettableDynamicMapper }) if err != nil { log.Warn("skipping cluster entry during registry build", diff --git a/internal/restart/trigger.go b/internal/restart/trigger.go index f63df6b..413e869 100644 --- a/internal/restart/trigger.go +++ b/internal/restart/trigger.go @@ -62,7 +62,7 @@ func RefreshMapperOnNoMatch(err error, mapper meta.RESTMapper, log *slog.Logger) resettable.Reset() if log != nil { - log.Info("reset target cluster REST mapper after NoMatchError; next reconcile will refresh discovery", + log.Info("reset remote-cluster REST mapper after NoMatchError; next reconcile will refresh discovery", slog.String("groupKind", noMatch.GroupKind.String()), slog.String("error", err.Error()), )