From 258c8bdb562bd107dca955a30f1fa596b147693a Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Fri, 13 Feb 2026 00:19:19 -0800 Subject: [PATCH] feat: align with upstream apiserver controllers --- README.md | 26 +++ cmd/apiserver/app/config.go | 57 ++++++ cmd/apiserver/app/options/options.go | 21 ++- cmd/apiserver/app/options/validation.go | 20 +++ pkg/multicluster/admission/mutating.go | 16 +- .../admission/namespace/manager.go | 1 + pkg/multicluster/admission/webhook/manager.go | 1 + .../webhook/mutating/reinvocationcontext.go | 8 + pkg/multicluster/auth/dispatcher.go | 15 ++ pkg/multicluster/auth/dispatcher_test.go | 24 +++ pkg/multicluster/auth/manager.go | 3 +- pkg/multicluster/auth/tokenreview_hints.go | 52 ++++++ pkg/multicluster/bootstrap/controllers.go | 69 +++++++ .../bootstrap/kubernetesservice_controller.go | 168 ++++++++++++++++++ pkg/multicluster/bootstrap/rbac.go | 116 ++++++++++++ pkg/multicluster/bootstrap/rbac_test.go | 40 +++++ pkg/multicluster/bootstrap/servicecidr.go | 94 ++++++++++ .../bootstrap/systemnamespaces.go | 108 +++++++++++ .../bootstrap/systemnamespaces_test.go | 81 +++++++++ pkg/multicluster/handler.go | 3 + pkg/multicluster/informerpool.go | 9 +- pkg/multicluster/options.go | 3 + test/smoke/apiserver_test.go | 10 ++ test/smoke/auth_test.go | 8 +- test/smoke/bootstrap_defaults_test.go | 56 ++++++ test/smoke/bootstrap_test.go | 36 ++++ test/smoke/core_bootstrap_test.go | 76 ++++++++ test/smoke/crud_test.go | 3 + test/smoke/helpers_test.go | 28 +++ test/smoke/internal_controllers_test.go | 56 ++++++ test/smoke/rbac_bootstrap_test.go | 34 ++++ test/smoke/webhook_test.go | 9 +- 32 files changed, 1239 insertions(+), 12 deletions(-) create mode 100644 pkg/multicluster/auth/tokenreview_hints.go create mode 100644 pkg/multicluster/bootstrap/controllers.go create mode 100644 pkg/multicluster/bootstrap/kubernetesservice_controller.go create mode 100644 pkg/multicluster/bootstrap/rbac.go create mode 100644 pkg/multicluster/bootstrap/rbac_test.go create mode 100644 pkg/multicluster/bootstrap/servicecidr.go create mode 100644 pkg/multicluster/bootstrap/systemnamespaces.go create mode 100644 pkg/multicluster/bootstrap/systemnamespaces_test.go create mode 100644 test/smoke/bootstrap_defaults_test.go create mode 100644 test/smoke/bootstrap_test.go create mode 100644 test/smoke/core_bootstrap_test.go create mode 100644 test/smoke/helpers_test.go create mode 100644 test/smoke/internal_controllers_test.go create mode 100644 test/smoke/rbac_bootstrap_test.go diff --git a/README.md b/README.md index a7ea001..22dbb0a 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ docker run --rm -p 2379:2379 -e ALLOW_NONE_AUTHENTICATION=yes -e ETCD_ADVERTISE_ --allow-privileged=true \ --authorization-mode=AlwaysAllow \ --anonymous-auth=true \ + --service-cidr-sharing-mode=per-cluster \ + --kubernetes-service-mode=per-cluster-autoip \ --secure-port=6443 ``` @@ -56,6 +58,30 @@ When changed, the readiness path becomes: https://127.0.0.1:6443/clusters/default/control-plane/readyz ``` +#### Multicluster flags +These flags control control-plane topology behavior. They are optional because +the server defaults to per-cluster behavior. + +- `--root-control-plane-name` (default: `root`): name of the root control + plane used for default routing and root-level configuration. +- `--service-cidr-sharing-mode` (default: `per-cluster`): service CIDR + bootstrap mode. + - `per-cluster`: bootstrap a default `ServiceCIDR` per virtual control plane. + - `shared`: keep only root-managed upstream behavior. 
+- `--kubernetes-service-mode` (default: `per-cluster-autoip`): `default/kubernetes` + service behavior. + - `per-cluster-autoip`: reconcile `default/kubernetes` in each virtual control + plane with allocator-assigned ClusterIP. + - `shared`: keep only root-managed upstream behavior. + +Example shared mode: +```bash +./apiserver \ + ... \ + --service-cidr-sharing-mode=shared \ + --kubernetes-service-mode=shared +``` + #### Webhooks Admission webhooks are scoped per control plane. For each cluster, the server creates a per-cluster webhook client/informer stack and resolves Services and diff --git a/cmd/apiserver/app/config.go b/cmd/apiserver/app/config.go index f667a3c..97a1457 100644 --- a/cmd/apiserver/app/config.go +++ b/cmd/apiserver/app/config.go @@ -26,6 +26,7 @@ import ( namespaceplugin "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle" "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/server" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/apiserver/pkg/util/webhook" "k8s.io/klog/v2" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" @@ -34,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/controlplane" controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver" + "k8s.io/kubernetes/pkg/features" generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi" "github.com/kplane-dev/apiserver/cmd/apiserver/app/options" @@ -42,6 +44,7 @@ import ( mcnsl "github.com/kplane-dev/apiserver/pkg/multicluster/admission/namespace" mcwh "github.com/kplane-dev/apiserver/pkg/multicluster/admission/webhook" mcauth "github.com/kplane-dev/apiserver/pkg/multicluster/auth" + mcbootstrap "github.com/kplane-dev/apiserver/pkg/multicluster/bootstrap" ) type Config struct { @@ -118,6 +121,21 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { } clientPool := mc.NewClientPool(genericConfig.LoopbackClientConfig, mcOpts.PathPrefix, mcOpts.ControlPlaneSegment) informerPool := mc.NewInformerPoolFromClientPool(clientPool, 0, genericConfig.DrainedNotify()) + systemNamespaceBootstrapper := mcbootstrap.NewSystemNamespaceBootstrapper(mcbootstrap.SystemNamespaceOptions{ + ClientForCluster: clientPool.KubeClientForCluster, + Namespaces: opts.SystemNamespaces, + }) + serviceCIDRBootstrapper := mcbootstrap.NewServiceCIDRBootstrapper(mcbootstrap.ServiceCIDROptions{ + ClientForCluster: clientPool.KubeClientForCluster, + PrimaryRange: opts.PrimaryServiceClusterIPRange, + SecondaryRange: opts.SecondaryServiceClusterIPRange, + Enabled: utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) && opts.ServiceCIDRSharingMode == options.ServiceCIDRSharingModePerCluster, + }) + rbacBootstrapper := mcbootstrap.NewRBACBootstrapper(mcbootstrap.RBACOptions{ + BaseLoopbackClientConfig: genericConfig.LoopbackClientConfig, + PathPrefix: mcOpts.PathPrefix, + ControlPlaneSegment: mcOpts.ControlPlaneSegment, + }) genericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler { ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment} return mc.WithClusterRouting(server.DefaultBuildHandlerChain(h, conf), ex, mcOpts) @@ -179,6 +197,45 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) { if c.KubeAPIs.ControlPlane.Generic.RESTOptionsGetter != nil { c.KubeAPIs.ControlPlane.Generic.RESTOptionsGetter = decorateRESTOptionsGetter("controlplane", c.KubeAPIs.ControlPlane.Generic.RESTOptionsGetter, mcOpts) } + targetPort := 443 + if opts.SecureServing 
!= nil && opts.SecureServing.BindPort > 0 { + targetPort = opts.SecureServing.BindPort + } + stopChForCluster := func(clusterID string) (<-chan struct{}, error) { + _, _, stopCh, err := informerPool.Get(clusterID) + if err != nil { + return nil, err + } + return stopCh, nil + } + internalControllerMgr := mcbootstrap.NewInternalControllerManager(mcbootstrap.InternalControllerOptions{ + ClientForCluster: clientPool.KubeClientForCluster, + StopChForCluster: stopChForCluster, + ClusterAuthInfo: kubeAPIs.ControlPlane.Extra.ClusterAuthenticationInfo, + }) + kubeServiceControllerMgr := mcbootstrap.NewKubernetesServiceControllerManager(mcbootstrap.KubernetesServiceControllerOptions{ + ClientForCluster: clientPool.KubeClientForCluster, + StopChForCluster: stopChForCluster, + PublicIP: kubeAPIs.ControlPlane.Generic.PublicAddress, + ServicePort: 443, + TargetPort: targetPort, + NodePort: opts.KubernetesServiceNodePort, + }) + mcOpts.OnClusterSelected = func(clusterID string) { + // Preserve upstream root bootstrap as-is; only add multicluster bootstrap for non-root VCPs. + if clusterID == mcOpts.DefaultCluster { + return + } + // Run asynchronously to avoid recursive request deadlocks when bootstrap logic + // uses cluster-scoped clients that route back through this same middleware. + go systemNamespaceBootstrapper.Ensure(clusterID) + go serviceCIDRBootstrapper.Ensure(clusterID) + go rbacBootstrapper.Ensure(clusterID) + go internalControllerMgr.Ensure(clusterID) + if opts.KubernetesServiceMode == options.KubernetesServiceModePerClusterAutoIP { + go kubeServiceControllerMgr.Ensure(clusterID) + } + } // Cluster-aware webhook admission (per-cluster clients + informers, no global cross-cluster view). authWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper( diff --git a/cmd/apiserver/app/options/options.go b/cmd/apiserver/app/options/options.go index 6042d8d..85a438a 100644 --- a/cmd/apiserver/app/options/options.go +++ b/cmd/apiserver/app/options/options.go @@ -47,7 +47,9 @@ type Extra struct { AllowPrivileged bool KubeletConfig kubeletclient.KubeletClientConfig KubernetesServiceNodePort int + KubernetesServiceMode string RootControlPlaneName string + ServiceCIDRSharingMode string // ServiceClusterIPRange is mapped to input provided by user ServiceClusterIPRanges string // PrimaryServiceClusterIPRange and SecondaryServiceClusterIPRange are the results @@ -64,6 +66,13 @@ type Extra struct { MasterCount int } +const ( + ServiceCIDRSharingModeShared = "shared" + ServiceCIDRSharingModePerCluster = "per-cluster" + KubernetesServiceModeShared = "shared" + KubernetesServiceModePerClusterAutoIP = "per-cluster-autoip" +) + // NewServerRunOptions creates and returns ServerRunOptions according to the given featureGate and effectiveVersion of the server binary to run. 
func NewServerRunOptions() *ServerRunOptions { s := ServerRunOptions{ @@ -88,9 +97,11 @@ func NewServerRunOptions() *ServerRunOptions { }, HTTPTimeout: time.Duration(5) * time.Second, }, - ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange, - MasterCount: 1, - RootControlPlaneName: mc.DefaultClusterName, + ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange, + MasterCount: 1, + RootControlPlaneName: mc.DefaultClusterName, + KubernetesServiceMode: KubernetesServiceModePerClusterAutoIP, + ServiceCIDRSharingMode: ServiceCIDRSharingModePerCluster, }, } @@ -108,6 +119,10 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) { mcfs := fss.FlagSet("multicluster") mcfs.StringVar(&s.RootControlPlaneName, "root-control-plane-name", s.RootControlPlaneName, "Name of the root control plane that receives CLI flag configuration and is used for default cluster routing.") + mcfs.StringVar(&s.KubernetesServiceMode, "kubernetes-service-mode", s.KubernetesServiceMode, + "Kubernetes default service strategy across control planes: 'shared' (upstream root-only) or 'per-cluster-autoip' (create/reconcile default/kubernetes in each virtual control plane with allocator-assigned ClusterIP).") + mcfs.StringVar(&s.ServiceCIDRSharingMode, "service-cidr-sharing-mode", s.ServiceCIDRSharingMode, + "Service CIDR strategy across control planes: 'shared' keeps root-managed defaults only; 'per-cluster' bootstraps default ServiceCIDR in each virtual control plane.") fs := fss.FlagSet("misc") diff --git a/cmd/apiserver/app/options/validation.go b/cmd/apiserver/app/options/validation.go index 271f60a..e00fe9d 100644 --- a/cmd/apiserver/app/options/validation.go +++ b/cmd/apiserver/app/options/validation.go @@ -119,6 +119,24 @@ func validateRootControlPlaneName(name string) []error { return errs } +func validateServiceCIDRSharingMode(mode string) []error { + switch mode { + case ServiceCIDRSharingModeShared, ServiceCIDRSharingModePerCluster: + return nil + default: + return []error{fmt.Errorf("--service-cidr-sharing-mode must be one of %q or %q", ServiceCIDRSharingModeShared, ServiceCIDRSharingModePerCluster)} + } +} + +func validateKubernetesServiceMode(mode string) []error { + switch mode { + case KubernetesServiceModeShared, KubernetesServiceModePerClusterAutoIP: + return nil + default: + return []error{fmt.Errorf("--kubernetes-service-mode must be one of %q or %q", KubernetesServiceModeShared, KubernetesServiceModePerClusterAutoIP)} + } +} + func validatePublicIPServiceClusterIPRangeIPFamilies(extra Extra, generic genericoptions.ServerRunOptions) []error { // The "kubernetes.default" Service is SingleStack based on the configured ServiceIPRange. // If the bootstrap controller reconcile the kubernetes.default Service and Endpoints, it must @@ -146,6 +164,8 @@ func (s CompletedOptions) Validate() []error { errs = append(errs, validateClusterIPFlags(s.Extra)...) errs = append(errs, validateServiceNodePort(s.Extra)...) errs = append(errs, validateRootControlPlaneName(s.RootControlPlaneName)...) + errs = append(errs, validateKubernetesServiceMode(s.KubernetesServiceMode)...) + errs = append(errs, validateServiceCIDRSharingMode(s.ServiceCIDRSharingMode)...) errs = append(errs, validatePublicIPServiceClusterIPRangeIPFamilies(s.Extra, *s.GenericServerRunOptions)...) 
if s.MasterCount <= 0 { diff --git a/pkg/multicluster/admission/mutating.go b/pkg/multicluster/admission/mutating.go index c411afd..e90a26b 100644 --- a/pkg/multicluster/admission/mutating.go +++ b/pkg/multicluster/admission/mutating.go @@ -5,8 +5,10 @@ import ( "strings" mcv1 "github.com/kplane-dev/apiserver/pkg/multicluster" + mcauth "github.com/kplane-dev/apiserver/pkg/multicluster/auth" "k8s.io/apimachinery/pkg/api/meta" apiserveradmission "k8s.io/apiserver/pkg/admission" + authenticationapi "k8s.io/kubernetes/pkg/apis/authentication" ) const MutatingPluginName = "MulticlusterMutating" @@ -26,8 +28,20 @@ func NewMutating(opts mcv1.Options) *Mutating { func (m *Mutating) Handles(op apiserveradmission.Operation) bool { return m.Handler.Handles(op) } func (m *Mutating) Admit(ctx context.Context, a apiserveradmission.Attributes, _ apiserveradmission.ObjectInterfaces) error { - // Skip non-persisted review objects (SAR/TokenReview), which require empty metadata + // TokenReview is non-persisted and expects empty metadata. Record token->cluster + // hint so synthetic auth requests in TokenReview storage can route correctly. gvk := a.GetKind() + if gvk.Group == "authentication.k8s.io" && gvk.Kind == "TokenReview" { + if tr, ok := a.GetObject().(*authenticationapi.TokenReview); ok { + cid, _, _ := mcv1.FromContext(ctx) + if cid == "" { + cid = m.Options.DefaultCluster + } + mcauth.RememberTokenReviewHint(tr.Spec.Token, cid) + } + return nil + } + // Skip other non-persisted review objects (e.g. SAR), which require empty metadata. if gvk.Group == "authorization.k8s.io" || gvk.Group == "authentication.k8s.io" || strings.HasSuffix(gvk.Kind, "Review") { return nil } diff --git a/pkg/multicluster/admission/namespace/manager.go b/pkg/multicluster/admission/namespace/manager.go index 6517de4..1ea560b 100644 --- a/pkg/multicluster/admission/namespace/manager.go +++ b/pkg/multicluster/admission/namespace/manager.go @@ -79,6 +79,7 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) { // Warm the namespaces informer (used by NamespaceLifecycle). _ = inf.Core().V1().Namespaces().Informer() + inf.Start(stopCh) m.clusters[clusterID] = e return e, nil } diff --git a/pkg/multicluster/admission/webhook/manager.go b/pkg/multicluster/admission/webhook/manager.go index 6304889..7c66e8b 100644 --- a/pkg/multicluster/admission/webhook/manager.go +++ b/pkg/multicluster/admission/webhook/manager.go @@ -109,6 +109,7 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) { _ = inf.Discovery().V1().EndpointSlices().Informer() _ = inf.Admissionregistration().V1().MutatingWebhookConfigurations().Informer() _ = inf.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer() + inf.Start(stopCh) // Start informers for resources needed by webhook admission. 
go func() { diff --git a/pkg/multicluster/admission/webhook/mutating/reinvocationcontext.go b/pkg/multicluster/admission/webhook/mutating/reinvocationcontext.go index c778024..1b0dd66 100644 --- a/pkg/multicluster/admission/webhook/mutating/reinvocationcontext.go +++ b/pkg/multicluster/admission/webhook/mutating/reinvocationcontext.go @@ -19,6 +19,7 @@ package mutating import ( "crypto/sha256" "encoding/hex" + "reflect" "sync" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -76,6 +77,13 @@ func (c *webhookReinvokeContext) ShouldReinvokeWebhook(uid string) bool { } func objHash(obj runtime.Object) string { + if obj == nil { + return "" + } + v := reflect.ValueOf(obj) + if v.Kind() == reflect.Pointer && v.IsNil() { + return "" + } u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { return "" diff --git a/pkg/multicluster/auth/dispatcher.go b/pkg/multicluster/auth/dispatcher.go index c9b8baa..8333cdc 100644 --- a/pkg/multicluster/auth/dispatcher.go +++ b/pkg/multicluster/auth/dispatcher.go @@ -3,6 +3,7 @@ package auth import ( "context" "net/http" + "strings" mc "github.com/kplane-dev/apiserver/pkg/multicluster" "k8s.io/apiserver/pkg/authentication/authenticator" @@ -41,6 +42,11 @@ func (c *ClusterAuthenticator) AuthenticateRequest(req *http.Request) (*authenti return nil, false, nil } cid := clusterFromContext(req.Context()) + if cid == "" { + if token := bearerTokenFromAuthHeader(req.Header.Get("Authorization")); token != "" { + cid = clusterHintForToken(token) + } + } useRoot := cid == "" || cid == c.rootCluster if useRoot && c.root != nil { return c.root.AuthenticateRequest(req) @@ -126,3 +132,12 @@ func clusterFromContext(ctx context.Context) string { cid, _, _ := mc.FromContext(ctx) return cid } + +func bearerTokenFromAuthHeader(authz string) string { + const prefix = "Bearer " + if !strings.HasPrefix(authz, prefix) { + return "" + } + token := strings.TrimSpace(strings.TrimPrefix(authz, prefix)) + return token +} diff --git a/pkg/multicluster/auth/dispatcher_test.go b/pkg/multicluster/auth/dispatcher_test.go index d562792..fa30345 100644 --- a/pkg/multicluster/auth/dispatcher_test.go +++ b/pkg/multicluster/auth/dispatcher_test.go @@ -131,3 +131,27 @@ func TestClusterAuthorizerDispatch(t *testing.T) { t.Fatalf("expected resolver to see cluster c-3, got %q", lastCluster) } } + +func TestClusterAuthenticatorUsesTokenHintWithoutClusterContext(t *testing.T) { + var called string + var lastCluster string + + root := &fakeAuthenticator{name: "root", called: &called} + cluster := &fakeAuthenticator{name: "cluster", called: &called} + resolver := &fakeResolver{authn: cluster, lastCluster: &lastCluster} + dispatch := NewClusterAuthenticator("root", root, resolver) + + token := "tok-" + t.Name() + rememberTokenReviewHint(token, "c-42") + + req := httptest.NewRequest("GET", "http://example", nil) + req.Header.Set("Authorization", "Bearer "+token) + _, _, _ = dispatch.AuthenticateRequest(req) + + if called != "cluster" { + t.Fatalf("expected cluster authenticator via token hint, got %q", called) + } + if lastCluster != "c-42" { + t.Fatalf("expected resolver cluster c-42, got %q", lastCluster) + } +} diff --git a/pkg/multicluster/auth/manager.go b/pkg/multicluster/auth/manager.go index cc36c58..f55925e 100644 --- a/pkg/multicluster/auth/manager.go +++ b/pkg/multicluster/auth/manager.go @@ -118,7 +118,7 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) { return nil, fmt.Errorf("loopback informer pool is required for cluster auth") } - cs, 
informers, _, err := m.opts.InformerPool.Get(clusterID) + cs, informers, stopCh, err := m.opts.InformerPool.Get(clusterID) if err != nil { m.mu.Unlock() return nil, err @@ -134,6 +134,7 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) { m.mu.Unlock() return nil, err } + informers.Start(stopCh) env := &clusterEnv{ cid: clusterID, diff --git a/pkg/multicluster/auth/tokenreview_hints.go b/pkg/multicluster/auth/tokenreview_hints.go new file mode 100644 index 0000000..a2485e9 --- /dev/null +++ b/pkg/multicluster/auth/tokenreview_hints.go @@ -0,0 +1,52 @@ +package auth + +import ( + "sync" + "time" +) + +const tokenReviewHintTTL = 15 * time.Second + +type tokenHintEntry struct { + clusterID string + expiresAt time.Time +} + +var tokenReviewHints sync.Map // map[token]tokenHintEntry + +// RememberTokenReviewHint stores a short-lived token->cluster mapping. +// Multicluster admission can call this before TokenReview storage invokes the +// authenticator on a synthetic request that does not carry cluster context. +func RememberTokenReviewHint(token, clusterID string) { + rememberTokenReviewHint(token, clusterID) +} + +func rememberTokenReviewHint(token, clusterID string) { + if token == "" || clusterID == "" { + return + } + tokenReviewHints.Store(token, tokenHintEntry{ + clusterID: clusterID, + expiresAt: time.Now().Add(tokenReviewHintTTL), + }) +} + +func clusterHintForToken(token string) string { + if token == "" { + return "" + } + v, ok := tokenReviewHints.Load(token) + if !ok { + return "" + } + entry, ok := v.(tokenHintEntry) + if !ok { + tokenReviewHints.Delete(token) + return "" + } + if time.Now().After(entry.expiresAt) { + tokenReviewHints.Delete(token) + return "" + } + return entry.clusterID +} diff --git a/pkg/multicluster/bootstrap/controllers.go b/pkg/multicluster/bootstrap/controllers.go new file mode 100644 index 0000000..5bd86a2 --- /dev/null +++ b/pkg/multicluster/bootstrap/controllers.go @@ -0,0 +1,69 @@ +package bootstrap + +import ( + "context" + "sync" + + clusterauthenticationtrust "k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust" + legacytokentracking "k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking" + + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +type InternalControllerOptions struct { + ClientForCluster func(clusterID string) (kubernetes.Interface, error) + StopChForCluster func(clusterID string) (<-chan struct{}, error) + ClusterAuthInfo clusterauthenticationtrust.ClusterAuthenticationInfo +} + +type InternalControllerManager struct { + opts InternalControllerOptions + mu sync.Mutex + run map[string]struct{} +} + +func NewInternalControllerManager(opts InternalControllerOptions) *InternalControllerManager { + return &InternalControllerManager{opts: opts, run: map[string]struct{}{}} +} + +func (m *InternalControllerManager) Ensure(clusterID string) { + if m == nil || clusterID == "" || m.opts.ClientForCluster == nil || m.opts.StopChForCluster == nil { + return + } + m.mu.Lock() + if _, ok := m.run[clusterID]; ok { + m.mu.Unlock() + return + } + m.run[clusterID] = struct{}{} + m.mu.Unlock() + + cs, err := m.opts.ClientForCluster(clusterID) + if err != nil { + klog.Errorf("mc.bootstrap internal controllers client failed cluster=%s: %v", clusterID, err) + return + } + stopCh, err := m.opts.StopChForCluster(clusterID) + if err != nil { + klog.Errorf("mc.bootstrap internal controllers stop channel failed cluster=%s: %v", clusterID, err) + return + } + ctx, cancel := 
context.WithCancel(context.Background()) + go func() { + <-stopCh + cancel() + }() + + legacy := legacytokentracking.NewController(cs) + go legacy.Run(stopCh) + + auth := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.opts.ClusterAuthInfo, cs) + if m.opts.ClusterAuthInfo.ClientCA != nil { + m.opts.ClusterAuthInfo.ClientCA.AddListener(auth) + } + if m.opts.ClusterAuthInfo.RequestHeaderCA != nil { + m.opts.ClusterAuthInfo.RequestHeaderCA.AddListener(auth) + } + go auth.Run(ctx, 1) +} diff --git a/pkg/multicluster/bootstrap/kubernetesservice_controller.go b/pkg/multicluster/bootstrap/kubernetesservice_controller.go new file mode 100644 index 0000000..b34f261 --- /dev/null +++ b/pkg/multicluster/bootstrap/kubernetesservice_controller.go @@ -0,0 +1,168 @@ +package bootstrap + +import ( + "context" + "net" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +type KubernetesServiceControllerOptions struct { + ClientForCluster func(clusterID string) (kubernetes.Interface, error) + StopChForCluster func(clusterID string) (<-chan struct{}, error) + PublicIP net.IP + ServicePort int + TargetPort int + NodePort int + Interval time.Duration +} + +type KubernetesServiceControllerManager struct { + opts KubernetesServiceControllerOptions + mu sync.Mutex + run map[string]struct{} +} + +func NewKubernetesServiceControllerManager(opts KubernetesServiceControllerOptions) *KubernetesServiceControllerManager { + if opts.Interval <= 0 { + opts.Interval = 10 * time.Second + } + return &KubernetesServiceControllerManager{opts: opts, run: map[string]struct{}{}} +} + +func (m *KubernetesServiceControllerManager) Ensure(clusterID string) { + if m == nil || clusterID == "" || m.opts.ClientForCluster == nil || m.opts.StopChForCluster == nil { + return + } + m.mu.Lock() + if _, ok := m.run[clusterID]; ok { + m.mu.Unlock() + return + } + m.run[clusterID] = struct{}{} + m.mu.Unlock() + + cs, err := m.opts.ClientForCluster(clusterID) + if err != nil { + klog.Errorf("mc.bootstrap kubernetes service controller client failed cluster=%s: %v", clusterID, err) + return + } + stopCh, err := m.opts.StopChForCluster(clusterID) + if err != nil { + klog.Errorf("mc.bootstrap kubernetes service controller stop channel failed cluster=%s: %v", clusterID, err) + return + } + c := &kubernetesServiceController{ + client: cs, + publicIP: m.opts.PublicIP, + servicePort: m.opts.ServicePort, + targetPort: m.opts.TargetPort, + nodePort: m.opts.NodePort, + interval: m.opts.Interval, + } + go c.Run(stopCh) +} + +type kubernetesServiceController struct { + client kubernetes.Interface + publicIP net.IP + servicePort int + targetPort int + nodePort int + interval time.Duration +} + +func (c *kubernetesServiceController) Run(stopCh <-chan struct{}) { + wait.NonSlidingUntil(func() { + if err := c.reconcile(); err != nil { + klog.Errorf("mc.bootstrap kubernetes service reconcile failed: %v", err) + } + }, c.interval, stopCh) +} + +func (c *kubernetesServiceController) reconcile() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + port := corev1.ServicePort{ + Protocol: corev1.ProtocolTCP, + Port: int32(c.servicePort), + Name: "https", + TargetPort: intstr.FromInt32(int32(c.targetPort)), + } + svcType := corev1.ServiceTypeClusterIP + if c.nodePort > 0 { + 
port.NodePort = int32(c.nodePort) + svcType = corev1.ServiceTypeNodePort + } + svcClient := c.client.CoreV1().Services(metav1.NamespaceDefault) + svc, err := svcClient.Get(ctx, "kubernetes", metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return err + } + singleStack := corev1.IPFamilyPolicySingleStack + _, err = svcClient.Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kubernetes", + Namespace: metav1.NamespaceDefault, + Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"}, + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{port}, + Selector: nil, + IPFamilyPolicy: &singleStack, + SessionAffinity: corev1.ServiceAffinityNone, + Type: svcType, + }, + }, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return err + } + } else { + updated := svc.DeepCopy() + if len(updated.Spec.Ports) != 1 || updated.Spec.Ports[0].Port != port.Port || updated.Spec.Ports[0].TargetPort != port.TargetPort || updated.Spec.Ports[0].Protocol != port.Protocol || updated.Spec.Ports[0].Name != port.Name || (c.nodePort > 0 && updated.Spec.Ports[0].NodePort != port.NodePort) || updated.Spec.Type != svcType { + updated.Spec.Ports = []corev1.ServicePort{port} + updated.Spec.Type = svcType + if _, err := svcClient.Update(ctx, updated, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) { + return err + } + } + } + + epClient := c.client.CoreV1().Endpoints(metav1.NamespaceDefault) + want := &corev1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kubernetes", + Namespace: metav1.NamespaceDefault, + }, + Subsets: []corev1.EndpointSubset{{ + Addresses: []corev1.EndpointAddress{{IP: c.publicIP.String()}}, + Ports: []corev1.EndpointPort{{Name: "https", Port: int32(c.targetPort), Protocol: corev1.ProtocolTCP}}, + }}, + } + existing, err := epClient.Get(ctx, "kubernetes", metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + _, err = epClient.Create(ctx, want, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return err + } + return nil + } + return err + } + existing = existing.DeepCopy() + existing.Subsets = want.Subsets + if _, err := epClient.Update(ctx, existing, metav1.UpdateOptions{}); err != nil && !apierrors.IsConflict(err) { + return err + } + return nil +} diff --git a/pkg/multicluster/bootstrap/rbac.go b/pkg/multicluster/bootstrap/rbac.go new file mode 100644 index 0000000..76f6192 --- /dev/null +++ b/pkg/multicluster/bootstrap/rbac.go @@ -0,0 +1,116 @@ +package bootstrap + +import ( + "context" + "fmt" + "sync" + "time" + + mc "github.com/kplane-dev/apiserver/pkg/multicluster" + "k8s.io/client-go/rest" + genericapiserver "k8s.io/apiserver/pkg/server" + rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest" + "k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac/bootstrappolicy" + "k8s.io/klog/v2" +) + +type RBACOptions struct { + BaseLoopbackClientConfig *rest.Config + PathPrefix string + ControlPlaneSegment string + Timeout time.Duration +} + +// RBACBootstrapper applies upstream default RBAC bootstrap policy per cluster. +// It preserves upstream policy data and reconciliation semantics while making +// the target cluster explicit via a cluster-scoped loopback host. 
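+// Ensure runs at most one bootstrap attempt per cluster at a time: concurrent
+// callers for the same cluster wait for the in-flight attempt, and a failed
+// attempt is retried on a later Ensure call.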
+type RBACBootstrapper struct { + base *rest.Config + pathPrefix string + controlPlaneSegment string + timeout time.Duration + + mu sync.Mutex + done map[string]struct{} + inflight map[string]chan struct{} +} + +func NewRBACBootstrapper(opts RBACOptions) *RBACBootstrapper { + timeout := opts.Timeout + if timeout <= 0 { + timeout = 35 * time.Second + } + base := opts.BaseLoopbackClientConfig + if base != nil { + base = rest.CopyConfig(base) + } + return &RBACBootstrapper{ + base: base, + pathPrefix: opts.PathPrefix, + controlPlaneSegment: opts.ControlPlaneSegment, + timeout: timeout, + done: map[string]struct{}{}, + inflight: map[string]chan struct{}{}, + } +} + +func (b *RBACBootstrapper) Ensure(clusterID string) { + if b == nil || clusterID == "" || b.base == nil { + return + } + + b.mu.Lock() + if _, ok := b.done[clusterID]; ok { + b.mu.Unlock() + return + } + if ch, ok := b.inflight[clusterID]; ok { + b.mu.Unlock() + <-ch + return + } + ch := make(chan struct{}) + b.inflight[clusterID] = ch + b.mu.Unlock() + + err := b.bootstrap(clusterID) + + b.mu.Lock() + delete(b.inflight, clusterID) + if err == nil { + b.done[clusterID] = struct{}{} + } + close(ch) + b.mu.Unlock() + if err != nil { + klog.Errorf("mc.bootstrap rbac failed cluster=%s: %v", clusterID, err) + } +} + +func (b *RBACBootstrapper) bootstrap(clusterID string) error { + cfg := rest.CopyConfig(b.base) + host, err := mc.ClusterHost(cfg.Host, mc.Options{ + PathPrefix: b.pathPrefix, + ControlPlaneSegment: b.controlPlaneSegment, + }, clusterID) + if err != nil { + return fmt.Errorf("build cluster host: %w", err) + } + cfg.Host = host + + policy := &rbacrest.PolicyData{ + ClusterRoles: append(bootstrappolicy.ClusterRoles(), bootstrappolicy.ControllerRoles()...), + ClusterRoleBindings: append(bootstrappolicy.ClusterRoleBindings(), bootstrappolicy.ControllerRoleBindings()...), + Roles: bootstrappolicy.NamespaceRoles(), + RoleBindings: bootstrappolicy.NamespaceRoleBindings(), + ClusterRolesToAggregate: bootstrappolicy.ClusterRolesToAggregate(), + ClusterRoleBindingsToSplit: bootstrappolicy.ClusterRoleBindingsToSplit(), + } + hook := policy.EnsureRBACPolicy() + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + return hook(genericapiserver.PostStartHookContext{ + LoopbackClientConfig: cfg, + Context: ctx, + }) +} diff --git a/pkg/multicluster/bootstrap/rbac_test.go b/pkg/multicluster/bootstrap/rbac_test.go new file mode 100644 index 0000000..ea0417f --- /dev/null +++ b/pkg/multicluster/bootstrap/rbac_test.go @@ -0,0 +1,40 @@ +package bootstrap + +import ( + "testing" + "time" + + mc "github.com/kplane-dev/apiserver/pkg/multicluster" + "k8s.io/client-go/rest" +) + +func TestRBACBootstrapperSkipsNilBaseConfig(t *testing.T) { + b := NewRBACBootstrapper(RBACOptions{ + BaseLoopbackClientConfig: nil, + Timeout: time.Second, + }) + // Should no-op and not panic. 
+ b.Ensure("vc-1") +} + +func TestRBACBootstrapperBuildsClusterHost(t *testing.T) { + b := NewRBACBootstrapper(RBACOptions{ + BaseLoopbackClientConfig: &rest.Config{Host: "https://127.0.0.1:6443"}, + PathPrefix: mc.DefaultPathPrefix, + ControlPlaneSegment: mc.DefaultControlPlaneSegment, + Timeout: time.Second, + }) + + cfg := rest.CopyConfig(b.base) + host, err := mc.ClusterHost(cfg.Host, mc.Options{ + PathPrefix: b.pathPrefix, + ControlPlaneSegment: b.controlPlaneSegment, + }, "vc-1") + if err != nil { + t.Fatalf("ClusterHost() error = %v", err) + } + want := "https://127.0.0.1:6443/clusters/vc-1/control-plane" + if host != want { + t.Fatalf("unexpected host: got %q want %q", host, want) + } +} diff --git a/pkg/multicluster/bootstrap/servicecidr.go b/pkg/multicluster/bootstrap/servicecidr.go new file mode 100644 index 0000000..4aae1f9 --- /dev/null +++ b/pkg/multicluster/bootstrap/servicecidr.go @@ -0,0 +1,94 @@ +package bootstrap + +import ( + "context" + "net" + "sync" + "time" + + "k8s.io/client-go/kubernetes" + defaultservicecidr "k8s.io/kubernetes/pkg/controlplane/controller/defaultservicecidr" + "k8s.io/klog/v2" +) + +type ServiceCIDROptions struct { + ClientForCluster func(clusterID string) (kubernetes.Interface, error) + PrimaryRange net.IPNet + SecondaryRange net.IPNet + Enabled bool + Timeout time.Duration +} + +// ServiceCIDRBootstrapper ensures the default ServiceCIDR object per cluster +// using the upstream controller logic. +type ServiceCIDRBootstrapper struct { + clientForCluster func(clusterID string) (kubernetes.Interface, error) + primaryRange net.IPNet + secondaryRange net.IPNet + enabled bool + timeout time.Duration + + mu sync.Mutex + done map[string]struct{} + inflight map[string]chan struct{} +} + +func NewServiceCIDRBootstrapper(opts ServiceCIDROptions) *ServiceCIDRBootstrapper { + timeout := opts.Timeout + if timeout <= 0 { + timeout = 10 * time.Second + } + return &ServiceCIDRBootstrapper{ + clientForCluster: opts.ClientForCluster, + primaryRange: opts.PrimaryRange, + secondaryRange: opts.SecondaryRange, + enabled: opts.Enabled, + timeout: timeout, + done: map[string]struct{}{}, + inflight: map[string]chan struct{}{}, + } +} + +func (b *ServiceCIDRBootstrapper) Ensure(clusterID string) { + if b == nil || !b.enabled || clusterID == "" || b.clientForCluster == nil { + return + } + b.mu.Lock() + if _, ok := b.done[clusterID]; ok { + b.mu.Unlock() + return + } + if ch, ok := b.inflight[clusterID]; ok { + b.mu.Unlock() + <-ch + return + } + ch := make(chan struct{}) + b.inflight[clusterID] = ch + b.mu.Unlock() + + err := b.bootstrap(clusterID) + + b.mu.Lock() + delete(b.inflight, clusterID) + if err == nil { + b.done[clusterID] = struct{}{} + } + close(ch) + b.mu.Unlock() + if err != nil { + klog.Errorf("mc.bootstrap servicecidr failed cluster=%s: %v", clusterID, err) + } +} + +func (b *ServiceCIDRBootstrapper) bootstrap(clusterID string) error { + client, err := b.clientForCluster(clusterID) + if err != nil { + return err + } + ctrl := defaultservicecidr.NewController(b.primaryRange, b.secondaryRange, client) + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + ctrl.Start(ctx) + return nil +} diff --git a/pkg/multicluster/bootstrap/systemnamespaces.go b/pkg/multicluster/bootstrap/systemnamespaces.go new file mode 100644 index 0000000..9aa298f --- /dev/null +++ b/pkg/multicluster/bootstrap/systemnamespaces.go @@ -0,0 +1,108 @@ +package bootstrap + +import ( + "context" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors 
"k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" +) + +type SystemNamespaceOptions struct { + ClientForCluster func(clusterID string) (kubernetes.Interface, error) + Namespaces []string + Timeout time.Duration +} + +// SystemNamespaceBootstrapper ensures system namespaces exist in each cluster. +// It runs once per cluster and retries on a later request if a prior attempt failed. +type SystemNamespaceBootstrapper struct { + clientForCluster func(clusterID string) (kubernetes.Interface, error) + namespaces []string + timeout time.Duration + + mu sync.Mutex + done map[string]struct{} + inflight map[string]chan struct{} +} + +func NewSystemNamespaceBootstrapper(opts SystemNamespaceOptions) *SystemNamespaceBootstrapper { + timeout := opts.Timeout + if timeout <= 0 { + timeout = 5 * time.Second + } + return &SystemNamespaceBootstrapper{ + clientForCluster: opts.ClientForCluster, + namespaces: append([]string(nil), opts.Namespaces...), + timeout: timeout, + done: map[string]struct{}{}, + inflight: map[string]chan struct{}{}, + } +} + +func (b *SystemNamespaceBootstrapper) Ensure(clusterID string) { + if clusterID == "" || b == nil || b.clientForCluster == nil || len(b.namespaces) == 0 { + return + } + + b.mu.Lock() + if _, ok := b.done[clusterID]; ok { + b.mu.Unlock() + return + } + if ch, ok := b.inflight[clusterID]; ok { + b.mu.Unlock() + <-ch + return + } + ch := make(chan struct{}) + b.inflight[clusterID] = ch + b.mu.Unlock() + + err := b.bootstrap(clusterID) + + b.mu.Lock() + delete(b.inflight, clusterID) + if err == nil { + b.done[clusterID] = struct{}{} + } + close(ch) + b.mu.Unlock() + + if err != nil { + klog.Errorf("mc.bootstrap system namespaces failed cluster=%s: %v", clusterID, err) + } +} + +func (b *SystemNamespaceBootstrapper) bootstrap(clusterID string) error { + client, err := b.clientForCluster(clusterID) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + + for _, ns := range b.namespaces { + if ns == "" { + continue + } + _, err := client.CoreV1().Namespaces().Get(ctx, ns, metav1.GetOptions{}) + if err == nil { + continue + } + if !apierrors.IsNotFound(err) { + return err + } + _, err = client.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: ns}, + }, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return err + } + } + return nil +} diff --git a/pkg/multicluster/bootstrap/systemnamespaces_test.go b/pkg/multicluster/bootstrap/systemnamespaces_test.go new file mode 100644 index 0000000..1867c65 --- /dev/null +++ b/pkg/multicluster/bootstrap/systemnamespaces_test.go @@ -0,0 +1,81 @@ +package bootstrap + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +func TestSystemNamespaceBootstrapperEnsureCreatesOnce(t *testing.T) { + cs := fake.NewSimpleClientset() + var clientCalls int32 + b := NewSystemNamespaceBootstrapper(SystemNamespaceOptions{ + ClientForCluster: func(clusterID string) (kubernetes.Interface, error) { + atomic.AddInt32(&clientCalls, 1) + return cs, nil + }, + Namespaces: []string{"default", "kube-system", "kube-public"}, + Timeout: 2 * time.Second, + }) + + b.Ensure("vc-1") + b.Ensure("vc-1") + + if got := atomic.LoadInt32(&clientCalls); got != 1 { + t.Fatalf("expected one client 
creation call, got %d", got) + } + for _, ns := range []string{"default", "kube-system", "kube-public"} { + if _, err := cs.CoreV1().Namespaces().Get(t.Context(), ns, metav1.GetOptions{}); err != nil { + t.Fatalf("expected namespace %q to exist: %v", ns, err) + } + } +} + +func TestSystemNamespaceBootstrapperEnsureRetriesAfterFailure(t *testing.T) { + cs := fake.NewSimpleClientset() + var calls int32 + b := NewSystemNamespaceBootstrapper(SystemNamespaceOptions{ + ClientForCluster: func(clusterID string) (kubernetes.Interface, error) { + n := atomic.AddInt32(&calls, 1) + if n == 1 { + return nil, fmt.Errorf("temporary client error") + } + return cs, nil + }, + Namespaces: []string{"default"}, + Timeout: 2 * time.Second, + }) + + b.Ensure("vc-2") + if _, err := cs.CoreV1().Namespaces().Get(t.Context(), "default", metav1.GetOptions{}); err == nil { + t.Fatalf("default namespace should not exist after failed first attempt") + } + + b.Ensure("vc-2") + if _, err := cs.CoreV1().Namespaces().Get(t.Context(), "default", metav1.GetOptions{}); err != nil { + t.Fatalf("default namespace should exist after retry: %v", err) + } +} + +func TestSystemNamespaceBootstrapperEnsureSkipsEmptyCluster(t *testing.T) { + cs := fake.NewSimpleClientset(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}) + var calls int32 + b := NewSystemNamespaceBootstrapper(SystemNamespaceOptions{ + ClientForCluster: func(clusterID string) (kubernetes.Interface, error) { + atomic.AddInt32(&calls, 1) + return cs, nil + }, + Namespaces: []string{"kube-system"}, + }) + + b.Ensure("") + if got := atomic.LoadInt32(&calls); got != 0 { + t.Fatalf("expected no client calls for empty cluster, got %d", got) + } +} diff --git a/pkg/multicluster/handler.go b/pkg/multicluster/handler.go index 9070611..7a38525 100644 --- a/pkg/multicluster/handler.go +++ b/pkg/multicluster/handler.go @@ -19,6 +19,9 @@ func WithClusterRouting(next http.Handler, ex Extractor, o Options) http.Handler if cid == "" { cid = o.DefaultCluster } + if o.OnClusterSelected != nil && cid != "" { + o.OnClusterSelected(cid) + } next.ServeHTTP(w, r.WithContext(WithCluster(r.Context(), cid, all))) }) } diff --git a/pkg/multicluster/informerpool.go b/pkg/multicluster/informerpool.go index 3fc691d..d542927 100644 --- a/pkg/multicluster/informerpool.go +++ b/pkg/multicluster/informerpool.go @@ -28,8 +28,6 @@ type informerEntry struct { factory informers.SharedInformerFactory stopCh <-chan struct{} ownedCh chan struct{} - - startOnce sync.Once } func NewInformerPool(opts InformerPoolOptions) *InformerPool { @@ -91,9 +89,10 @@ func (p *InformerPool) Get(clusterID string) (kubernetes.Interface, informers.Sh } func (e *informerEntry) start() { - e.startOnce.Do(func() { - e.factory.Start(e.stopCh) - }) + // SharedInformerFactory.Start is idempotent and can be called repeatedly. + // Calling it on every Get ensures informers that are registered after the + // first Start call are also started. + e.factory.Start(e.stopCh) } func (p *InformerPool) StopCluster(clusterID string) { diff --git a/pkg/multicluster/options.go b/pkg/multicluster/options.go index a8fc51e..45cad35 100644 --- a/pkg/multicluster/options.go +++ b/pkg/multicluster/options.go @@ -25,6 +25,9 @@ type Options struct { ClusterFieldKey string WatchStrategy WatchStrategy ClusterSource ClusterSource + // OnClusterSelected is invoked by routing middleware after cluster extraction. + // It can be used to lazily initialize per-cluster bootstrap state. 
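+	// It is called synchronously on the request path, so implementations should
+	// return quickly and hand long-running work to goroutines.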
+ OnClusterSelected func(clusterID string) // ServerName identifies the apiserver instance using this options set (for metrics/logging). ServerName string } diff --git a/test/smoke/apiserver_test.go b/test/smoke/apiserver_test.go index 02d4827..39aef50 100644 --- a/test/smoke/apiserver_test.go +++ b/test/smoke/apiserver_test.go @@ -113,6 +113,7 @@ func mustWriteTokenFile(t *testing.T, path string) string { type apiserverOptions struct { rootCluster string + etcdPrefix string extraArgs []string } @@ -141,10 +142,16 @@ func startAPIServerWithOptions(t *testing.T, etcdEndpoints string, opts apiserve if opts.rootCluster == "" { opts.rootCluster = mc.DefaultClusterName } + if opts.etcdPrefix == "" { + name := strings.ToLower(t.Name()) + name = strings.NewReplacer("/", "-", " ", "-", "_", "-", ":", "-").Replace(name) + opts.etcdPrefix = fmt.Sprintf("/registry-smoke-%s-%d", name, time.Now().UnixNano()) + } s.root = opts.rootCluster args := []string{ "--etcd-servers=" + etcdEndpoints, + "--etcd-prefix=" + opts.etcdPrefix, "--cert-dir=" + filepath.Join(tmp, "certs"), "--secure-port=" + fmt.Sprintf("%d", port), "--enable-aggregator-routing=true", @@ -153,7 +160,10 @@ func startAPIServerWithOptions(t *testing.T, etcdEndpoints string, opts apiserve "--token-auth-file=" + mustWriteTokenFile(t, filepath.Join(tmp, "tokens.csv")), "--allow-privileged=true", "--service-cluster-ip-range=10.0.0.0/24", + "--service-cidr-sharing-mode=shared", + "--kubernetes-service-mode=shared", "--service-account-issuer=test", + "--api-audiences=test,https://kubernetes.default.svc", "--service-account-signing-key-file=" + mustWriteRSAKey(t, filepath.Join(tmp, "sa.key")), "--service-account-key-file=" + filepath.Join(tmp, "sa.key"), // reduce noise + make startup faster for tests diff --git a/test/smoke/auth_test.go b/test/smoke/auth_test.go index 4ab8d51..0378b3d 100644 --- a/test/smoke/auth_test.go +++ b/test/smoke/auth_test.go @@ -98,6 +98,12 @@ func TestServiceAccountTokenIsolationAcrossClusters(t *testing.T) { csA := kubeClientForCluster(t, s, clusterA) ns := "default" + if err := waitForNamespace(ctx, csRoot, ns); err != nil { + t.Fatalf("cluster=%s wait for namespace %q: %v", root, ns, err) + } + if err := waitForNamespace(ctx, csA, ns); err != nil { + t.Fatalf("cluster=%s wait for namespace %q: %v", clusterA, ns, err) + } saRoot := "sa-" + randSuffix(3) saA := "sa-" + randSuffix(3) @@ -167,7 +173,7 @@ func requestServiceAccountToken(ctx context.Context, t *testing.T, cs kubernetes func waitForTokenReview(ctx context.Context, t *testing.T, cs kubernetes.Interface, token string, wantAuthenticated bool) { t.Helper() - deadline := time.Now().Add(30 * time.Second) + deadline := time.Now().Add(90 * time.Second) var lastErr error for time.Now().Before(deadline) { resp, err := cs.AuthenticationV1().TokenReviews().Create(ctx, &authenticationv1.TokenReview{ diff --git a/test/smoke/bootstrap_defaults_test.go b/test/smoke/bootstrap_defaults_test.go new file mode 100644 index 0000000..04c4c87 --- /dev/null +++ b/test/smoke/bootstrap_defaults_test.go @@ -0,0 +1,56 @@ +package smoke + +import ( + "crypto/tls" + "io" + "net/http" + "os" + "strings" + "testing" + "time" +) + +func TestUpstreamControlPlanePostStartHooksStillPresent(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + client := &http.Client{ + Timeout: 3 * time.Second, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // test only + }, + } + + req, err := http.NewRequestWithContext(t.Context(), 
http.MethodGet, s.clusterURL(s.root)+"/readyz?verbose=1", nil) + if err != nil { + t.Fatalf("build request: %v", err) + } + req.Header.Set("Authorization", "Bearer smoketoken") + resp, err := client.Do(req) + if err != nil { + t.Fatalf("readyz request failed: %v", err) + } + defer resp.Body.Close() + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("read readyz body: %v", err) + } + body := string(bodyBytes) + if resp.StatusCode != http.StatusOK { + t.Fatalf("readyz non-200 status=%d body=%s", resp.StatusCode, body) + } + + // These hook names are upstream defaults we rely on. + required := []string{ + "poststarthook/start-system-namespaces-controller", + "poststarthook/bootstrap-controller", + "poststarthook/start-cluster-authentication-info-controller", + "poststarthook/start-legacy-token-tracking-controller", + "poststarthook/rbac/bootstrap-roles", + } + for _, needle := range required { + if !strings.Contains(body, needle) { + t.Fatalf("expected readyz to include %q\nbody:\n%s", needle, body) + } + } +} diff --git a/test/smoke/bootstrap_test.go b/test/smoke/bootstrap_test.go new file mode 100644 index 0000000..4d8d154 --- /dev/null +++ b/test/smoke/bootstrap_test.go @@ -0,0 +1,36 @@ +package smoke + +import ( + "os" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestSystemNamespacesBootstrapPerCluster(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + clusterID := "vc-bootstrap" + cs := kubeClientForCluster(t, s, clusterID) + want := []string{"default", "kube-system", "kube-public", "kube-node-lease"} + + deadline := time.Now().Add(10 * time.Second) + for { + missing := "" + for _, ns := range want { + if _, err := cs.CoreV1().Namespaces().Get(t.Context(), ns, metav1.GetOptions{}); err != nil { + missing = ns + break + } + } + if missing == "" { + return + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for system namespace bootstrap in cluster=%s\nlogs:\n%s", clusterID, s.logs()) + } + time.Sleep(250 * time.Millisecond) + } +} diff --git a/test/smoke/core_bootstrap_test.go b/test/smoke/core_bootstrap_test.go new file mode 100644 index 0000000..6ed8dd2 --- /dev/null +++ b/test/smoke/core_bootstrap_test.go @@ -0,0 +1,76 @@ +package smoke + +import ( + "os" + "testing" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestKubernetesServiceBootstrapPerCluster(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServerWithOptions(t, etcd, apiserverOptions{ + extraArgs: []string{"--kubernetes-service-mode=per-cluster-autoip"}, + }) + + clusterID := "vc-core-bootstrap" + cs := kubeClientForCluster(t, s, clusterID) + + // Trigger cluster callback with a cluster-scoped discovery call. 
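+	// Any request routed to this cluster fires the bootstrap callback; the work
+	// runs asynchronously, so the loop below polls until the deadline.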
+ if _, err := cs.Discovery().ServerVersion(); err != nil { + t.Fatalf("trigger request failed: %v", err) + } + + deadline := time.Now().Add(10 * time.Second) + for { + svc, err := cs.CoreV1().Services(metav1.NamespaceDefault).Get(t.Context(), "kubernetes", metav1.GetOptions{}) + if err == nil { + if svc.Spec.ClusterIP == "" { + t.Fatalf("kubernetes service exists but clusterIP is empty") + } + eps, epErr := cs.CoreV1().Endpoints(metav1.NamespaceDefault).Get(t.Context(), "kubernetes", metav1.GetOptions{}) + if epErr == nil && len(eps.Subsets) > 0 { + return + } + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for kubernetes service bootstrap in cluster=%s: %v\nlogs:\n%s", clusterID, err, s.logs()) + } + time.Sleep(250 * time.Millisecond) + } +} + +func TestServiceCIDRBootstrapPerCluster(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServerWithOptions(t, etcd, apiserverOptions{ + extraArgs: []string{"--service-cidr-sharing-mode=per-cluster"}, + }) + + clusterID := "vc-servicecidr-bootstrap" + cs := kubeClientForCluster(t, s, clusterID) + + // Trigger cluster callback with a cluster-scoped discovery call. + if _, err := cs.Discovery().ServerVersion(); err != nil { + t.Fatalf("trigger request failed: %v", err) + } + + deadline := time.Now().Add(10 * time.Second) + for { + _, err := cs.NetworkingV1().ServiceCIDRs().Get(t.Context(), "kubernetes", metav1.GetOptions{}) + if err == nil { + return + } + if apierrors.IsNotFound(err) { + // keep waiting while bootstrap may still be running + } else if apierrors.IsMethodNotSupported(err) || apierrors.IsNotAcceptable(err) { + t.Skip("ServiceCIDR API not enabled") + } + if time.Now().After(deadline) { + // ServiceCIDR may be disabled by feature gates in this binary/config. 
+ t.Skipf("ServiceCIDR bootstrap not observed (likely feature-disabled): %v", err) + } + time.Sleep(250 * time.Millisecond) + } +} diff --git a/test/smoke/crud_test.go b/test/smoke/crud_test.go index 223dece..4504289 100644 --- a/test/smoke/crud_test.go +++ b/test/smoke/crud_test.go @@ -37,6 +37,9 @@ func TestCRUD_RandomizedAcrossClusters(t *testing.T) { cs := kubeClientForCluster(t, s, cid) ns := "default" + if err := waitForNamespace(ctx, cs, ns); err != nil { + t.Fatalf("cluster=%s wait for namespace %q: %v", cid, ns, err) + } cmName := "cm-" + randSuffix(4) _, err := cs.CoreV1().ConfigMaps(ns).Create(ctx, &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ diff --git a/test/smoke/helpers_test.go b/test/smoke/helpers_test.go new file mode 100644 index 0000000..1c5d697 --- /dev/null +++ b/test/smoke/helpers_test.go @@ -0,0 +1,28 @@ +package smoke + +import ( + "context" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" +) + +func waitForNamespace(ctx context.Context, cs kubernetes.Interface, namespace string) error { + deadline := time.Now().Add(30 * time.Second) + var lastErr error + for time.Now().Before(deadline) { + _, err := cs.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{}) + if err == nil { + return nil + } + lastErr = err + if !apierrors.IsNotFound(err) { + time.Sleep(200 * time.Millisecond) + continue + } + time.Sleep(200 * time.Millisecond) + } + return lastErr +} diff --git a/test/smoke/internal_controllers_test.go b/test/smoke/internal_controllers_test.go new file mode 100644 index 0000000..fa3286b --- /dev/null +++ b/test/smoke/internal_controllers_test.go @@ -0,0 +1,56 @@ +package smoke + +import ( + "os" + "strings" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestClusterAuthInfoControllerPerCluster(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + clusterID := "vc-authinfo-bootstrap" + cs := kubeClientForCluster(t, s, clusterID) + if _, err := cs.Discovery().ServerVersion(); err != nil { + t.Fatalf("trigger request failed: %v", err) + } + + deadline := time.Now().Add(12 * time.Second) + for { + // Root apiserver startup logs one instance; non-root bootstrap should start another. 
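+		// A count of two or more means the per-cluster instance started in
+		// addition to the root one.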
+ if strings.Count(s.logs(), "Starting cluster_authentication_trust_controller controller") >= 2 { + return + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for per-cluster cluster_authentication_trust_controller start in cluster=%s\nlogs:\n%s", clusterID, s.logs()) + } + time.Sleep(250 * time.Millisecond) + } +} + +func TestLegacyTokenTrackingControllerPerCluster(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + clusterID := "vc-legacy-token-bootstrap" + cs := kubeClientForCluster(t, s, clusterID) + if _, err := cs.Discovery().ServerVersion(); err != nil { + t.Fatalf("trigger request failed: %v", err) + } + + deadline := time.Now().Add(12 * time.Second) + for { + cm, err := cs.CoreV1().ConfigMaps("kube-system").Get(t.Context(), "kube-apiserver-legacy-service-account-token-tracking", metav1.GetOptions{}) + if err == nil && cm.Data != nil && cm.Data["since"] != "" { + return + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for legacy token tracking configmap in cluster=%s: %v\nlogs:\n%s", clusterID, err, s.logs()) + } + time.Sleep(250 * time.Millisecond) + } +} diff --git a/test/smoke/rbac_bootstrap_test.go b/test/smoke/rbac_bootstrap_test.go new file mode 100644 index 0000000..73fef79 --- /dev/null +++ b/test/smoke/rbac_bootstrap_test.go @@ -0,0 +1,34 @@ +package smoke + +import ( + "os" + "testing" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestRBACBootstrapPerCluster(t *testing.T) { + etcd := os.Getenv("ETCD_ENDPOINTS") + s := startAPIServer(t, etcd) + + clusterID := "vc-rbac-bootstrap" + cs := kubeClientForCluster(t, s, clusterID) + + // Trigger a request in the target cluster, which runs our per-cluster bootstrap callback. + if _, err := cs.Discovery().ServerVersion(); err != nil { + t.Fatalf("trigger request failed: %v", err) + } + + deadline := time.Now().Add(20 * time.Second) + for { + _, err := cs.RbacV1().ClusterRoles().Get(t.Context(), "cluster-admin", metav1.GetOptions{}) + if err == nil { + return + } + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for RBAC bootstrap in cluster=%s: %v\nlogs:\n%s", clusterID, err, s.logs()) + } + time.Sleep(250 * time.Millisecond) + } +} diff --git a/test/smoke/webhook_test.go b/test/smoke/webhook_test.go index cb31578..7387bef 100644 --- a/test/smoke/webhook_test.go +++ b/test/smoke/webhook_test.go @@ -14,6 +14,7 @@ import ( "net" "net/http" "os" + "strings" "testing" "time" @@ -164,6 +165,12 @@ func TestWebhook_ServiceBacked_MulticlusterScoping(t *testing.T) { csRoot := kubeClientForCluster(t, s, root) ns := "default" + if err := waitForNamespace(ctx, csA, ns); err != nil { + t.Fatalf("cluster=%s wait for namespace %q: %v", clusterA, ns, err) + } + if err := waitForNamespace(ctx, csB, ns); err != nil { + t.Fatalf("cluster=%s wait for namespace %q: %v", clusterB, ns, err) + } // Service + EndpointSlice that resolves to our local webhook server. svcName := "svc-wh-" + randSuffix(3) @@ -288,7 +295,7 @@ func TestWebhook_ServiceBacked_MulticlusterScoping(t *testing.T) { if err == nil { t.Fatalf("expected webhook to deny configmap create in cluster=%s, but it succeeded (this indicates the webhook registration/resolution is scoped wrong)", clusterA) } - if !apierrors.IsForbidden(err) { + if !apierrors.IsForbidden(err) && !strings.Contains(err.Error(), "denied by smoke webhook") { t.Fatalf("expected forbidden from webhook in cluster=%s, got: %v", clusterA, err) }