diff --git a/internal/controller/clustermesh_controller.go b/internal/controller/clustermesh_controller.go index bb75393..0765b58 100644 --- a/internal/controller/clustermesh_controller.go +++ b/internal/controller/clustermesh_controller.go @@ -38,6 +38,7 @@ import ( "github.com/squat/kilo-clustermesh-operator/internal/kilonode" "github.com/squat/kilo-clustermesh-operator/internal/multicluster" "github.com/squat/kilo-clustermesh-operator/internal/peer" + "github.com/squat/kilo-clustermesh-operator/internal/restart" "github.com/squat/kilo-clustermesh-operator/internal/validation" kilov1alpha1 "github.com/squat/kilo-clustermesh-operator/pkg/kilo/v1alpha1" ) @@ -67,16 +68,48 @@ type ClusterMeshReconciler struct { func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := r.Log.With(slog.String("name", req.Name), slog.String("namespace", req.Namespace)) + err := r.reconcile(ctx, log, req) + + // A NoKindMatchError on a remote cluster's REST mapper survives the + // lifetime of cluster.Cluster because the negative discovery entry + // is cached. Reset every remote-cluster mapper so the next reconcile + // re-discovers the missing kind; the source target is unknown at + // this level (the wrapped error carries it as text only), and + // Reset() is cheap — it only invalidates the in-memory cache and + // the next List() pays a one-time discovery round-trip. Self-heal + // without taking the operator pod down, which would drop the leader + // lease and inflate to a CrashLoopBackOff after a few stale CRDs. + for _, m := range r.Registry.Mappers() { + restart.RefreshMapperOnNoMatch(err, m, log) + } + + return ctrl.Result{}, err +} + +// SetupWithManager registers the controller with the manager. +func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error { + err := ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.ClusterMesh{}). + Named("clustermesh"). + WithEventFilter(predicate.Funcs{ + DeleteFunc: func(event.DeleteEvent) bool { return false }, + }). + Complete(r) + + return errors.Wrap(err, "building clustermesh controller") +} + +func (r *ClusterMeshReconciler) reconcile(ctx context.Context, log *slog.Logger, req ctrl.Request) error { mesh := &v1alpha1.ClusterMesh{} err := r.Get(ctx, req.NamespacedName, mesh) if err != nil { - return ctrl.Result{}, errors.Wrap(client.IgnoreNotFound(err), "fetching ClusterMesh") + return errors.Wrap(client.IgnoreNotFound(err), "fetching ClusterMesh") } // Handle deletion via finalizer. if !mesh.DeletionTimestamp.IsZero() { - return ctrl.Result{}, r.handleDeletion(ctx, log, mesh) + return r.handleDeletion(ctx, log, mesh) } // Ensure finalizer is present. @@ -85,15 +118,15 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = r.Update(ctx, mesh) if err != nil { - return ctrl.Result{}, errors.Wrap(err, "adding finalizer") + return errors.Wrap(err, "adding finalizer") } - return ctrl.Result{}, nil + return nil } // Mesh-level validation. if overlap, msg := r.validateMeshNetworks(ctx, log, mesh); overlap { - return ctrl.Result{}, r.setOverlapCondition(ctx, mesh, msg) + return r.setOverlapCondition(ctx, mesh, msg) } setCondition(mesh, "NetworksOverlap", metav1.ConditionFalse, "NoOverlap", "all CIDRs are disjoint") @@ -101,7 +134,7 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) // Reconcile per-cluster peers. clusterStatuses, err := r.reconcileAllClusters(ctx, log, mesh) if err != nil { - return ctrl.Result{}, err + return err } // Sweep peers whose source-cluster was removed from spec.Clusters since @@ -118,20 +151,7 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) // the global cleanup pass so the cluster always converges. r.cleanupOrphanMeshPeers(ctx, log, mesh.Namespace) - return ctrl.Result{}, r.updateStatus(ctx, mesh, clusterStatuses) -} - -// SetupWithManager registers the controller with the manager. -func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error { - err := ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1.ClusterMesh{}). - Named("clustermesh"). - WithEventFilter(predicate.Funcs{ - DeleteFunc: func(event.DeleteEvent) bool { return false }, - }). - Complete(r) - - return errors.Wrap(err, "building clustermesh controller") + return r.updateStatus(ctx, mesh, clusterStatuses) } // cleanupSweepTimeout caps the per-target list/delete pass time so a single diff --git a/internal/multicluster/mapper.go b/internal/multicluster/mapper.go new file mode 100644 index 0000000..582a8f0 --- /dev/null +++ b/internal/multicluster/mapper.go @@ -0,0 +1,129 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multicluster + +import ( + "net/http" + "sync" + + "github.com/cockroachdb/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +// resettableDynamicMapper wraps apiutil.NewDynamicRESTMapper with a +// Reset() that atomically swaps the underlying mapper for a fresh one. +// +// controller-runtime's dynamic mapper does not implement +// meta.ResettableRESTMapper on its own: the unexported *apiutil.mapper +// type exposes KindFor, RESTMapping and the rest of meta.RESTMapper but +// has no Reset method. Without this wrapper, internal/restart's +// recovery path cannot invalidate the discovery cache through the +// standard interface and silently no-ops in production. +// +// Reset reconstructs the underlying dynamic mapper. NewDynamicRESTMapper +// is lazy (it does not perform discovery at construction time, only on +// first lookup), so Reset is cheap; the next List against an uncached +// kind triggers one discovery round-trip against the target apiserver. +// On rebuild failure the previous mapper is preserved so callers still +// have a working mapper for kinds already in cache; the next Reset +// retries the rebuild. +// +// The proxy methods deliberately do not wrap the underlying mapper's +// errors. Wrapping with a non-apimachinery error type would change the +// error chain in ways errors.As consumers downstream (notably +// restart.RefreshMapperOnNoMatch, which fishes out *meta.NoKindMatchError) +// already handle, but adding a wrap layer here gives no diagnostic +// value over the upstream error and only increases chain depth. +type resettableDynamicMapper struct { + cfg *rest.Config + httpClient *http.Client + + mu sync.RWMutex + mapper meta.RESTMapper +} + +var _ meta.ResettableRESTMapper = (*resettableDynamicMapper)(nil) + +// newResettableDynamicMapper builds the wrapper and pre-constructs the +// initial dynamic mapper so the first RESTMapper call does not pay the +// build cost. The signature matches cluster.Options.MapperProvider so +// it can be passed straight into cluster.New. +func newResettableDynamicMapper(cfg *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) { + m, err := apiutil.NewDynamicRESTMapper(cfg, httpClient) + if err != nil { + return nil, errors.Wrap(err, "building dynamic REST mapper") + } + + return &resettableDynamicMapper{ + cfg: cfg, + httpClient: httpClient, + mapper: m, + }, nil +} + +// Reset replaces the underlying dynamic mapper with a freshly built +// one. On rebuild failure the existing mapper is kept and the call +// silently no-ops; the next Reset will retry. Concurrent callers see +// the swap atomically. +func (r *resettableDynamicMapper) Reset() { + fresh, err := apiutil.NewDynamicRESTMapper(r.cfg, r.httpClient) + if err != nil { + return + } + + r.mu.Lock() + r.mapper = fresh + r.mu.Unlock() +} + +func (r *resettableDynamicMapper) KindFor(input schema.GroupVersionResource) (schema.GroupVersionKind, error) { + return r.current().KindFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) KindsFor(input schema.GroupVersionResource) ([]schema.GroupVersionKind, error) { + return r.current().KindsFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) { + return r.current().ResourceFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) { + return r.current().ResourcesFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) { + return r.current().RESTMapping(gk, versions...) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) { + return r.current().RESTMappings(gk, versions...) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) ResourceSingularizer(resource string) (string, error) { + return r.current().ResourceSingularizer(resource) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers +} + +func (r *resettableDynamicMapper) current() meta.RESTMapper { + r.mu.RLock() + defer r.mu.RUnlock() + + return r.mapper +} diff --git a/internal/multicluster/mapper_test.go b/internal/multicluster/mapper_test.go new file mode 100644 index 0000000..37c2bb3 --- /dev/null +++ b/internal/multicluster/mapper_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package multicluster + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" +) + +// TestResettableDynamicMapperImplementsResettable is the load-bearing +// invariant of the entire stale-discovery recovery path: the mapper we +// hand to controller-runtime must satisfy meta.ResettableRESTMapper so +// restart.RefreshMapperOnNoMatch can call Reset() through the standard +// interface. Without this, the recovery path silently no-ops in +// production. +func TestResettableDynamicMapperImplementsResettable(t *testing.T) { + cfg := &rest.Config{Host: "https://127.0.0.1:6443"} + + httpClient, err := rest.HTTPClientFor(cfg) + require.NoError(t, err) + + m, err := newResettableDynamicMapper(cfg, httpClient) + require.NoError(t, err) + + _, ok := m.(meta.ResettableRESTMapper) + assert.True(t, ok, "newResettableDynamicMapper must satisfy meta.ResettableRESTMapper; otherwise restart.RefreshMapperOnNoMatch is a no-op") +} + +// TestRawDynamicMapperIsNotResettable documents the upstream gap this +// wrapper exists to close. If a future controller-runtime release adds +// Reset() to the dynamic mapper, this test will flip and the wrapper +// can be deleted. Failing here means the wrapper is no longer needed. +func TestRawDynamicMapperIsNotResettable(t *testing.T) { + cfg := &rest.Config{Host: "https://127.0.0.1:6443"} + + httpClient, err := rest.HTTPClientFor(cfg) + require.NoError(t, err) + + raw, err := apiutil.NewDynamicRESTMapper(cfg, httpClient) + require.NoError(t, err) + + _, ok := raw.(meta.ResettableRESTMapper) + assert.False(t, ok, "apiutil.NewDynamicRESTMapper now implements meta.ResettableRESTMapper — the resettableDynamicMapper wrapper can be removed") +} + +// TestResettableDynamicMapperResetReplacesUnderlying verifies that +// Reset swaps the underlying mapper for a new instance, rather than +// being a no-op stub. +func TestResettableDynamicMapperResetReplacesUnderlying(t *testing.T) { + cfg := &rest.Config{Host: "https://127.0.0.1:6443"} + + httpClient, err := rest.HTTPClientFor(cfg) + require.NoError(t, err) + + mAny, err := newResettableDynamicMapper(cfg, httpClient) + require.NoError(t, err) + + wrapper, ok := mAny.(*resettableDynamicMapper) + require.True(t, ok) + + before := wrapper.current() + wrapper.Reset() + after := wrapper.current() + + assert.NotSame(t, before, after, "Reset must replace the underlying mapper with a fresh instance") +} diff --git a/internal/multicluster/registry.go b/internal/multicluster/registry.go index 1f23448..a1c2ae5 100644 --- a/internal/multicluster/registry.go +++ b/internal/multicluster/registry.go @@ -73,9 +73,12 @@ func (d *directCluster) GetFieldIndexer() client.FieldIndexer { func (d *directCluster) GetCache() cache.Cache { panic("directCluster: GetCache not implemented") } -func (d *directCluster) GetRESTMapper() meta.RESTMapper { - panic("directCluster: GetRESTMapper not implemented") -} +// GetRESTMapper returns nil. directCluster is only used in integration +// tests that drive the reconciler against envtest-built clients; the +// stale-discovery recovery path treats a nil mapper as a no-op, so +// returning nil keeps Mappers() callable without panicking the test +// process. +func (d *directCluster) GetRESTMapper() meta.RESTMapper { return nil } func (d *directCluster) GetAPIReader() client.Reader { panic("directCluster: GetAPIReader not implemented") @@ -157,8 +160,15 @@ func Build( reg.local = entry.Name } - c, err := cluster.New(cfg, func(o *cluster.Options) { - o.Scheme = scheme + c, err := cluster.New(cfg, func(opts *cluster.Options) { + opts.Scheme = scheme + // Replace controller-runtime's default dynamic mapper with + // the resettable wrapper so internal/restart can invalidate + // the discovery cache through meta.ResettableRESTMapper. + // The unexported *apiutil.mapper that NewDynamicRESTMapper + // returns does not implement Reset(), which would silently + // turn the recovery path into a no-op. + opts.MapperProvider = newResettableDynamicMapper }) if err != nil { log.Warn("skipping cluster entry during registry build", @@ -239,3 +249,23 @@ func (r *ClusterRegistry) Clusters() []string { func (r *ClusterRegistry) All() map[string]cluster.Cluster { return r.clusters } + +// Mappers returns the REST mapper of every registered cluster, in +// arbitrary order. Nil mappers (e.g. from test-only directCluster) are +// skipped so callers can iterate without nil-guarding. Used by the +// reconciler to invalidate stale discovery caches after a +// NoKindMatchError without restarting the operator pod. +func (r *ClusterRegistry) Mappers() []meta.RESTMapper { + mappers := make([]meta.RESTMapper, 0, len(r.clusters)) + + for _, c := range r.clusters { + m := c.GetRESTMapper() + if m == nil { + continue + } + + mappers = append(mappers, m) + } + + return mappers +} diff --git a/internal/restart/trigger.go b/internal/restart/trigger.go new file mode 100644 index 0000000..413e869 --- /dev/null +++ b/internal/restart/trigger.go @@ -0,0 +1,72 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restart + +import ( + "errors" + "log/slog" + + "k8s.io/apimachinery/pkg/api/meta" +) + +// RefreshMapperOnNoMatch resets the given REST mapper when err carries a +// NoKindMatchError. Without this, controller-runtime's REST mapper caches +// the negative discovery entry for the lifetime of the cluster.Cluster — a +// freshly bootstrapped tenant that installs its Peer CRD only after our +// first List(PeerList{}) would deadlock the reconcile loop forever, with +// requeue-with-backoff never refreshing discovery. +// +// Resetting the mapper instead of restarting the operator pod avoids +// kubelet CrashLoopBackOff (which inflates to a 5-minute wait after a +// handful of restarts), lets the manager keep its leader lease, and +// scopes recovery to the one cluster whose CRD state actually drifted. +// The next reconcile picks up the fresh mapping via Discovery and the +// peer push succeeds without further intervention. +// +// mapper may be nil — the call is then a no-op. If the mapper does not +// implement meta.ResettableRESTMapper (an in-memory test fake, say) the +// call also no-ops; that is acceptable because the production code path +// always builds clusters through controller-runtime's dynamic mapper. +// log may be nil — logging is then skipped. +// Returns true when Reset() was actually invoked, so callers can avoid +// double-emitting the recovery event. +func RefreshMapperOnNoMatch(err error, mapper meta.RESTMapper, log *slog.Logger) bool { + if err == nil || mapper == nil { + return false + } + + var noMatch *meta.NoKindMatchError + if !errors.As(err, &noMatch) { + return false + } + + resettable, ok := mapper.(meta.ResettableRESTMapper) + if !ok { + return false + } + + resettable.Reset() + + if log != nil { + log.Info("reset remote-cluster REST mapper after NoMatchError; next reconcile will refresh discovery", + slog.String("groupKind", noMatch.GroupKind.String()), + slog.String("error", err.Error()), + ) + } + + return true +} diff --git a/internal/restart/trigger_test.go b/internal/restart/trigger_test.go new file mode 100644 index 0000000..18179f1 --- /dev/null +++ b/internal/restart/trigger_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2026 The Kilo Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package restart + +import ( + "testing" + + cockerrors "github.com/cockroachdb/errors" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +var errUnrelated = cockerrors.New("network blew up") + +// resettableSpy is a meta.ResettableRESTMapper that counts Reset() calls +// and delegates the rest of the interface to an unused fallback. The +// helper only ever invokes Reset; the other methods are not exercised +// from production code paths under test. +type resettableSpy struct { + meta.RESTMapper + + resets int +} + +func (r *resettableSpy) Reset() { r.resets++ } + +func TestRefreshMapperOnNoMatch_NilErr(t *testing.T) { + spy := &resettableSpy{} + + called := RefreshMapperOnNoMatch(nil, spy, testLogger()) + + assert.False(t, called, "nil err must not reset") + assert.Equal(t, 0, spy.resets, "Reset must not be invoked on nil err") +} + +func TestRefreshMapperOnNoMatch_NilMapper(t *testing.T) { + noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} + + called := RefreshMapperOnNoMatch(noMatch, nil, testLogger()) + + assert.False(t, called, "nil mapper must yield false even for NoMatch error") +} + +func TestRefreshMapperOnNoMatch_UnrelatedErr(t *testing.T) { + spy := &resettableSpy{} + + called := RefreshMapperOnNoMatch(errUnrelated, spy, testLogger()) + + assert.False(t, called, "unrelated err must not reset") + assert.Equal(t, 0, spy.resets, "Reset must not be invoked for non-NoMatch errors") +} + +func TestRefreshMapperOnNoMatch_PlainNoMatch(t *testing.T) { + spy := &resettableSpy{} + noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} + + called := RefreshMapperOnNoMatch(noMatch, spy, testLogger()) + + assert.True(t, called, "plain NoKindMatchError must reset") + assert.Equal(t, 1, spy.resets, "Reset must be invoked exactly once") +} + +func TestRefreshMapperOnNoMatch_WrappedNoMatch(t *testing.T) { + spy := &resettableSpy{} + noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} + wrapped := cockerrors.Wrap(cockerrors.Wrap(noMatch, "listing existing peers"), "reconciling peers from \"a\" to \"b\"") + + called := RefreshMapperOnNoMatch(wrapped, spy, testLogger()) + + assert.True(t, called, "wrapped NoKindMatchError must reset (the controller wraps the error twice in production)") + assert.Equal(t, 1, spy.resets) +} + +func TestRefreshMapperOnNoMatch_NonResettableMapper(t *testing.T) { + // A bare meta.RESTMapper that does not implement ResettableRESTMapper. + // Production builds always get a resettable mapper from + // apiutil.NewDynamicRESTMapper, but tests/fakes may pass anything. + bare := stubRESTMapper{} + + called := RefreshMapperOnNoMatch( + &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "g", Kind: "K"}}, + bare, + testLogger(), + ) + + assert.False(t, called, "non-resettable mapper must yield false") +} + +func TestRefreshMapperOnNoMatch_NilLogger(t *testing.T) { + spy := &resettableSpy{} + noMatch := &meta.NoKindMatchError{GroupKind: schema.GroupKind{Group: "kilo.squat.ai", Kind: "Peer"}} + + assert.NotPanics(t, func() { + RefreshMapperOnNoMatch(noMatch, spy, nil) + }, "nil logger must be tolerated") + assert.Equal(t, 1, spy.resets) +} + +// stubRESTMapper is a placeholder meta.RESTMapper that does not implement +// ResettableRESTMapper. It is used only to verify the type-assertion +// guard; none of its methods are called in tests. +type stubRESTMapper struct{ meta.RESTMapper }