Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions internal/controller/clustermesh_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/squat/kilo-clustermesh-operator/internal/kilonode"
"github.com/squat/kilo-clustermesh-operator/internal/multicluster"
"github.com/squat/kilo-clustermesh-operator/internal/peer"
"github.com/squat/kilo-clustermesh-operator/internal/restart"
"github.com/squat/kilo-clustermesh-operator/internal/validation"
kilov1alpha1 "github.com/squat/kilo-clustermesh-operator/pkg/kilo/v1alpha1"
)
Expand Down Expand Up @@ -67,16 +68,48 @@ type ClusterMeshReconciler struct {
func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := r.Log.With(slog.String("name", req.Name), slog.String("namespace", req.Namespace))

err := r.reconcile(ctx, log, req)

// A NoKindMatchError on a remote cluster's REST mapper survives the
// lifetime of cluster.Cluster because the negative discovery entry
// is cached. Reset every remote-cluster mapper so the next reconcile
// re-discovers the missing kind; the source target is unknown at
// this level (the wrapped error carries it as text only), and
// Reset() is cheap — it only invalidates the in-memory cache and
// the next List() pays a one-time discovery round-trip. Self-heal
// without taking the operator pod down, which would drop the leader
// lease and inflate to a CrashLoopBackOff after a few stale CRDs.
for _, m := range r.Registry.Mappers() {
restart.RefreshMapperOnNoMatch(err, m, log)
}

return ctrl.Result{}, err
}

// SetupWithManager registers the controller with the manager.
func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.ClusterMesh{}).
Named("clustermesh").
WithEventFilter(predicate.Funcs{
DeleteFunc: func(event.DeleteEvent) bool { return false },
}).
Complete(r)

return errors.Wrap(err, "building clustermesh controller")
}

func (r *ClusterMeshReconciler) reconcile(ctx context.Context, log *slog.Logger, req ctrl.Request) error {
mesh := &v1alpha1.ClusterMesh{}

err := r.Get(ctx, req.NamespacedName, mesh)
if err != nil {
return ctrl.Result{}, errors.Wrap(client.IgnoreNotFound(err), "fetching ClusterMesh")
return errors.Wrap(client.IgnoreNotFound(err), "fetching ClusterMesh")
}

// Handle deletion via finalizer.
if !mesh.DeletionTimestamp.IsZero() {
return ctrl.Result{}, r.handleDeletion(ctx, log, mesh)
return r.handleDeletion(ctx, log, mesh)
}

// Ensure finalizer is present.
Expand All @@ -85,23 +118,23 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request)

err = r.Update(ctx, mesh)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "adding finalizer")
return errors.Wrap(err, "adding finalizer")
}

return ctrl.Result{}, nil
return nil
}

// Mesh-level validation.
if overlap, msg := r.validateMeshNetworks(ctx, log, mesh); overlap {
return ctrl.Result{}, r.setOverlapCondition(ctx, mesh, msg)
return r.setOverlapCondition(ctx, mesh, msg)
}

setCondition(mesh, "NetworksOverlap", metav1.ConditionFalse, "NoOverlap", "all CIDRs are disjoint")

// Reconcile per-cluster peers.
clusterStatuses, err := r.reconcileAllClusters(ctx, log, mesh)
if err != nil {
return ctrl.Result{}, err
return err
}

// Sweep peers whose source-cluster was removed from spec.Clusters since
Expand All @@ -118,20 +151,7 @@ func (r *ClusterMeshReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// the global cleanup pass so the cluster always converges.
r.cleanupOrphanMeshPeers(ctx, log, mesh.Namespace)

return ctrl.Result{}, r.updateStatus(ctx, mesh, clusterStatuses)
}

// SetupWithManager registers the controller with the manager.
func (r *ClusterMeshReconciler) SetupWithManager(mgr ctrl.Manager) error {
err := ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.ClusterMesh{}).
Named("clustermesh").
WithEventFilter(predicate.Funcs{
DeleteFunc: func(event.DeleteEvent) bool { return false },
}).
Complete(r)

return errors.Wrap(err, "building clustermesh controller")
return r.updateStatus(ctx, mesh, clusterStatuses)
}

// cleanupSweepTimeout caps the per-target list/delete pass time so a single
Expand Down
129 changes: 129 additions & 0 deletions internal/multicluster/mapper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
Copyright 2026 The Kilo Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multicluster

import (
"net/http"
"sync"

"github.com/cockroachdb/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// resettableDynamicMapper wraps apiutil.NewDynamicRESTMapper with a
// Reset() that atomically swaps the underlying mapper for a fresh one.
//
// controller-runtime's dynamic mapper does not implement
// meta.ResettableRESTMapper on its own: the unexported *apiutil.mapper
// type exposes KindFor, RESTMapping and the rest of meta.RESTMapper but
// has no Reset method. Without this wrapper, internal/restart's
// recovery path cannot invalidate the discovery cache through the
// standard interface and silently no-ops in production.
//
// Reset reconstructs the underlying dynamic mapper. NewDynamicRESTMapper
// is lazy (it does not perform discovery at construction time, only on
// first lookup), so Reset is cheap; the next List against an uncached
// kind triggers one discovery round-trip against the target apiserver.
// On rebuild failure the previous mapper is preserved so callers still
// have a working mapper for kinds already in cache; the next Reset
// retries the rebuild.
//
// The proxy methods deliberately do not wrap the underlying mapper's
// errors. Wrapping with a non-apimachinery error type would change the
// error chain in ways errors.As consumers downstream (notably
// restart.RefreshMapperOnNoMatch, which fishes out *meta.NoKindMatchError)
// already handle, but adding a wrap layer here gives no diagnostic
// value over the upstream error and only increases chain depth.
type resettableDynamicMapper struct {
cfg *rest.Config
httpClient *http.Client

mu sync.RWMutex
mapper meta.RESTMapper
}

var _ meta.ResettableRESTMapper = (*resettableDynamicMapper)(nil)

// newResettableDynamicMapper builds the wrapper and pre-constructs the
// initial dynamic mapper so the first RESTMapper call does not pay the
// build cost. The signature matches cluster.Options.MapperProvider so
// it can be passed straight into cluster.New.
func newResettableDynamicMapper(cfg *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) {
m, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
if err != nil {
return nil, errors.Wrap(err, "building dynamic REST mapper")
}

return &resettableDynamicMapper{
cfg: cfg,
httpClient: httpClient,
mapper: m,
}, nil
}

// Reset replaces the underlying dynamic mapper with a freshly built
// one. On rebuild failure the existing mapper is kept and the call
// silently no-ops; the next Reset will retry. Concurrent callers see
// the swap atomically.
func (r *resettableDynamicMapper) Reset() {
fresh, err := apiutil.NewDynamicRESTMapper(r.cfg, r.httpClient)
if err != nil {
return
}

r.mu.Lock()
r.mapper = fresh
r.mu.Unlock()
}
Comment on lines +85 to +94

func (r *resettableDynamicMapper) KindFor(input schema.GroupVersionResource) (schema.GroupVersionKind, error) {
return r.current().KindFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers
}

func (r *resettableDynamicMapper) KindsFor(input schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {
return r.current().KindsFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers
}

func (r *resettableDynamicMapper) ResourceFor(input schema.GroupVersionResource) (schema.GroupVersionResource, error) {
return r.current().ResourceFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers
}

func (r *resettableDynamicMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {
return r.current().ResourcesFor(input) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers
}

func (r *resettableDynamicMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) {
return r.current().RESTMapping(gk, versions...) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers
}

func (r *resettableDynamicMapper) RESTMappings(gk schema.GroupKind, versions ...string) ([]*meta.RESTMapping, error) {
return r.current().RESTMappings(gk, versions...) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers
}

func (r *resettableDynamicMapper) ResourceSingularizer(resource string) (string, error) {
return r.current().ResourceSingularizer(resource) //nolint:wrapcheck // thin proxy; wrapping would obscure errors.As consumers
}

func (r *resettableDynamicMapper) current() meta.RESTMapper {
r.mu.RLock()
defer r.mu.RUnlock()

return r.mapper
}
85 changes: 85 additions & 0 deletions internal/multicluster/mapper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Copyright 2026 The Kilo Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package multicluster

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// TestResettableDynamicMapperImplementsResettable is the load-bearing
// invariant of the entire stale-discovery recovery path: the mapper we
// hand to controller-runtime must satisfy meta.ResettableRESTMapper so
// restart.RefreshMapperOnNoMatch can call Reset() through the standard
// interface. Without this, the recovery path silently no-ops in
// production.
func TestResettableDynamicMapperImplementsResettable(t *testing.T) {
cfg := &rest.Config{Host: "https://127.0.0.1:6443"}

httpClient, err := rest.HTTPClientFor(cfg)
require.NoError(t, err)

m, err := newResettableDynamicMapper(cfg, httpClient)
require.NoError(t, err)

_, ok := m.(meta.ResettableRESTMapper)
assert.True(t, ok, "newResettableDynamicMapper must satisfy meta.ResettableRESTMapper; otherwise restart.RefreshMapperOnNoMatch is a no-op")
}

// TestRawDynamicMapperIsNotResettable documents the upstream gap this
// wrapper exists to close. If a future controller-runtime release adds
// Reset() to the dynamic mapper, this test will flip and the wrapper
// can be deleted. Failing here means the wrapper is no longer needed.
func TestRawDynamicMapperIsNotResettable(t *testing.T) {
cfg := &rest.Config{Host: "https://127.0.0.1:6443"}

httpClient, err := rest.HTTPClientFor(cfg)
require.NoError(t, err)

raw, err := apiutil.NewDynamicRESTMapper(cfg, httpClient)
require.NoError(t, err)

_, ok := raw.(meta.ResettableRESTMapper)
assert.False(t, ok, "apiutil.NewDynamicRESTMapper now implements meta.ResettableRESTMapper — the resettableDynamicMapper wrapper can be removed")
}

// TestResettableDynamicMapperResetReplacesUnderlying verifies that
// Reset swaps the underlying mapper for a new instance, rather than
// being a no-op stub.
func TestResettableDynamicMapperResetReplacesUnderlying(t *testing.T) {
cfg := &rest.Config{Host: "https://127.0.0.1:6443"}

httpClient, err := rest.HTTPClientFor(cfg)
require.NoError(t, err)

mAny, err := newResettableDynamicMapper(cfg, httpClient)
require.NoError(t, err)

wrapper, ok := mAny.(*resettableDynamicMapper)
require.True(t, ok)

before := wrapper.current()
wrapper.Reset()
after := wrapper.current()

assert.NotSame(t, before, after, "Reset must replace the underlying mapper with a fresh instance")
}
40 changes: 35 additions & 5 deletions internal/multicluster/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,12 @@ func (d *directCluster) GetFieldIndexer() client.FieldIndexer {

func (d *directCluster) GetCache() cache.Cache { panic("directCluster: GetCache not implemented") }

func (d *directCluster) GetRESTMapper() meta.RESTMapper {
panic("directCluster: GetRESTMapper not implemented")
}
// GetRESTMapper returns nil. directCluster is only used in integration
// tests that drive the reconciler against envtest-built clients; the
// stale-discovery recovery path treats a nil mapper as a no-op, so
// returning nil keeps Mappers() callable without panicking the test
// process.
func (d *directCluster) GetRESTMapper() meta.RESTMapper { return nil }

func (d *directCluster) GetAPIReader() client.Reader {
panic("directCluster: GetAPIReader not implemented")
Expand Down Expand Up @@ -157,8 +160,15 @@ func Build(
reg.local = entry.Name
}

c, err := cluster.New(cfg, func(o *cluster.Options) {
o.Scheme = scheme
c, err := cluster.New(cfg, func(opts *cluster.Options) {
opts.Scheme = scheme
// Replace controller-runtime's default dynamic mapper with
// the resettable wrapper so internal/restart can invalidate
// the discovery cache through meta.ResettableRESTMapper.
// The unexported *apiutil.mapper that NewDynamicRESTMapper
// returns does not implement Reset(), which would silently
// turn the recovery path into a no-op.
opts.MapperProvider = newResettableDynamicMapper
})
if err != nil {
log.Warn("skipping cluster entry during registry build",
Expand Down Expand Up @@ -239,3 +249,23 @@ func (r *ClusterRegistry) Clusters() []string {
func (r *ClusterRegistry) All() map[string]cluster.Cluster {
return r.clusters
}

// Mappers returns the REST mapper of every registered cluster, in
// arbitrary order. Nil mappers (e.g. from test-only directCluster) are
// skipped so callers can iterate without nil-guarding. Used by the
// reconciler to invalidate stale discovery caches after a
// NoKindMatchError without restarting the operator pod.
func (r *ClusterRegistry) Mappers() []meta.RESTMapper {
mappers := make([]meta.RESTMapper, 0, len(r.clusters))

for _, c := range r.clusters {
m := c.GetRESTMapper()
if m == nil {
continue
}

mappers = append(mappers, m)
}

return mappers
}
Loading
Loading