Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ docker run --rm -p 2379:2379 -e ALLOW_NONE_AUTHENTICATION=yes -e ETCD_ADVERTISE_
--allow-privileged=true \
--authorization-mode=AlwaysAllow \
--anonymous-auth=true \
--service-cidr-sharing-mode=per-cluster \
--kubernetes-service-mode=per-cluster-autoip \
--secure-port=6443
```

Expand All @@ -56,6 +58,30 @@ When changed, the readiness path becomes:
https://127.0.0.1:6443/clusters/default/control-plane/readyz
```

#### Multicluster flags
These flags control control-plane topology behavior. They are optional because
the server defaults to per-cluster behavior.

- `--root-control-plane-name` (default: `root`): name of the root control
plane used for default routing and root-level configuration.
- `--service-cidr-sharing-mode` (default: `per-cluster`): service CIDR
bootstrap mode.
- `per-cluster`: bootstrap a default `ServiceCIDR` per virtual control plane.
- `shared`: keep only root-managed upstream behavior.
- `--kubernetes-service-mode` (default: `per-cluster-autoip`): `default/kubernetes`
service behavior.
- `per-cluster-autoip`: reconcile `default/kubernetes` in each virtual control
plane with allocator-assigned ClusterIP.
- `shared`: keep only root-managed upstream behavior.

Example (shared mode):
```bash
./apiserver \
... \
--service-cidr-sharing-mode=shared \
--kubernetes-service-mode=shared
```

#### Webhooks
Admission webhooks are scoped per control plane. For each cluster, the server
creates a per-cluster webhook client/informer stack and resolves Services and
Expand Down
57 changes: 57 additions & 0 deletions cmd/apiserver/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
namespaceplugin "k8s.io/apiserver/pkg/admission/plugin/namespace/lifecycle"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/klog/v2"
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
Expand All @@ -34,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/controlplane"
controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
"k8s.io/kubernetes/pkg/features"
generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"

"github.com/kplane-dev/apiserver/cmd/apiserver/app/options"
Expand All @@ -42,6 +44,7 @@ import (
mcnsl "github.com/kplane-dev/apiserver/pkg/multicluster/admission/namespace"
mcwh "github.com/kplane-dev/apiserver/pkg/multicluster/admission/webhook"
mcauth "github.com/kplane-dev/apiserver/pkg/multicluster/auth"
mcbootstrap "github.com/kplane-dev/apiserver/pkg/multicluster/bootstrap"
)

type Config struct {
Expand Down Expand Up @@ -118,6 +121,21 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
}
clientPool := mc.NewClientPool(genericConfig.LoopbackClientConfig, mcOpts.PathPrefix, mcOpts.ControlPlaneSegment)
informerPool := mc.NewInformerPoolFromClientPool(clientPool, 0, genericConfig.DrainedNotify())
systemNamespaceBootstrapper := mcbootstrap.NewSystemNamespaceBootstrapper(mcbootstrap.SystemNamespaceOptions{
ClientForCluster: clientPool.KubeClientForCluster,
Namespaces: opts.SystemNamespaces,
})
serviceCIDRBootstrapper := mcbootstrap.NewServiceCIDRBootstrapper(mcbootstrap.ServiceCIDROptions{
ClientForCluster: clientPool.KubeClientForCluster,
PrimaryRange: opts.PrimaryServiceClusterIPRange,
SecondaryRange: opts.SecondaryServiceClusterIPRange,
Enabled: utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) && opts.ServiceCIDRSharingMode == options.ServiceCIDRSharingModePerCluster,
})
rbacBootstrapper := mcbootstrap.NewRBACBootstrapper(mcbootstrap.RBACOptions{
BaseLoopbackClientConfig: genericConfig.LoopbackClientConfig,
PathPrefix: mcOpts.PathPrefix,
ControlPlaneSegment: mcOpts.ControlPlaneSegment,
})
genericConfig.BuildHandlerChainFunc = func(h http.Handler, conf *server.Config) http.Handler {
ex := mc.PathExtractor{PathPrefix: mcOpts.PathPrefix, ControlPlaneSegment: mcOpts.ControlPlaneSegment}
return mc.WithClusterRouting(server.DefaultBuildHandlerChain(h, conf), ex, mcOpts)
Expand Down Expand Up @@ -179,6 +197,45 @@ func NewConfig(opts options.CompletedOptions) (*Config, error) {
if c.KubeAPIs.ControlPlane.Generic.RESTOptionsGetter != nil {
c.KubeAPIs.ControlPlane.Generic.RESTOptionsGetter = decorateRESTOptionsGetter("controlplane", c.KubeAPIs.ControlPlane.Generic.RESTOptionsGetter, mcOpts)
}
targetPort := 443
if opts.SecureServing != nil && opts.SecureServing.BindPort > 0 {
targetPort = opts.SecureServing.BindPort
}
stopChForCluster := func(clusterID string) (<-chan struct{}, error) {
_, _, stopCh, err := informerPool.Get(clusterID)
if err != nil {
return nil, err
}
return stopCh, nil
}
internalControllerMgr := mcbootstrap.NewInternalControllerManager(mcbootstrap.InternalControllerOptions{
ClientForCluster: clientPool.KubeClientForCluster,
StopChForCluster: stopChForCluster,
ClusterAuthInfo: kubeAPIs.ControlPlane.Extra.ClusterAuthenticationInfo,
})
kubeServiceControllerMgr := mcbootstrap.NewKubernetesServiceControllerManager(mcbootstrap.KubernetesServiceControllerOptions{
ClientForCluster: clientPool.KubeClientForCluster,
StopChForCluster: stopChForCluster,
PublicIP: kubeAPIs.ControlPlane.Generic.PublicAddress,
ServicePort: 443,
TargetPort: targetPort,
NodePort: opts.KubernetesServiceNodePort,
})
mcOpts.OnClusterSelected = func(clusterID string) {
// Preserve upstream root bootstrap as-is; only add multicluster bootstrap for non-root VCPs.
if clusterID == mcOpts.DefaultCluster {
return
}
// Run asynchronously to avoid recursive request deadlocks when bootstrap logic
// uses cluster-scoped clients that route back through this same middleware.
go systemNamespaceBootstrapper.Ensure(clusterID)
go serviceCIDRBootstrapper.Ensure(clusterID)
go rbacBootstrapper.Ensure(clusterID)
go internalControllerMgr.Ensure(clusterID)
if opts.KubernetesServiceMode == options.KubernetesServiceModePerClusterAutoIP {
go kubeServiceControllerMgr.Ensure(clusterID)
}
}

// Cluster-aware webhook admission (per-cluster clients + informers, no global cross-cluster view).
authWrapper := webhook.NewDefaultAuthenticationInfoResolverWrapper(
Expand Down
21 changes: 18 additions & 3 deletions cmd/apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ type Extra struct {
AllowPrivileged bool
KubeletConfig kubeletclient.KubeletClientConfig
KubernetesServiceNodePort int
KubernetesServiceMode string
RootControlPlaneName string
ServiceCIDRSharingMode string
// ServiceClusterIPRange is mapped to input provided by user
ServiceClusterIPRanges string
// PrimaryServiceClusterIPRange and SecondaryServiceClusterIPRange are the results
Expand All @@ -64,6 +66,13 @@ type Extra struct {
MasterCount int
}

// Supported values for the --service-cidr-sharing-mode and
// --kubernetes-service-mode multicluster flags.
const (
	// ServiceCIDRSharingModeShared keeps only root-managed upstream ServiceCIDR behavior.
	ServiceCIDRSharingModeShared = "shared"
	// ServiceCIDRSharingModePerCluster bootstraps a default ServiceCIDR per virtual control plane.
	ServiceCIDRSharingModePerCluster = "per-cluster"
	// KubernetesServiceModeShared keeps only the root-managed default/kubernetes Service.
	KubernetesServiceModeShared = "shared"
	// KubernetesServiceModePerClusterAutoIP reconciles default/kubernetes in each
	// virtual control plane with an allocator-assigned ClusterIP.
	KubernetesServiceModePerClusterAutoIP = "per-cluster-autoip"
)

// NewServerRunOptions creates and returns ServerRunOptions according to the given featureGate and effectiveVersion of the server binary to run.
func NewServerRunOptions() *ServerRunOptions {
s := ServerRunOptions{
Expand All @@ -88,9 +97,11 @@ func NewServerRunOptions() *ServerRunOptions {
},
HTTPTimeout: time.Duration(5) * time.Second,
},
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
MasterCount: 1,
RootControlPlaneName: mc.DefaultClusterName,
ServiceNodePortRange: kubeoptions.DefaultServiceNodePortRange,
MasterCount: 1,
RootControlPlaneName: mc.DefaultClusterName,
KubernetesServiceMode: KubernetesServiceModePerClusterAutoIP,
ServiceCIDRSharingMode: ServiceCIDRSharingModePerCluster,
},
}

Expand All @@ -108,6 +119,10 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
mcfs := fss.FlagSet("multicluster")
mcfs.StringVar(&s.RootControlPlaneName, "root-control-plane-name", s.RootControlPlaneName,
"Name of the root control plane that receives CLI flag configuration and is used for default cluster routing.")
mcfs.StringVar(&s.KubernetesServiceMode, "kubernetes-service-mode", s.KubernetesServiceMode,
"Kubernetes default service strategy across control planes: 'shared' (upstream root-only) or 'per-cluster-autoip' (create/reconcile default/kubernetes in each virtual control plane with allocator-assigned ClusterIP).")
mcfs.StringVar(&s.ServiceCIDRSharingMode, "service-cidr-sharing-mode", s.ServiceCIDRSharingMode,
"Service CIDR strategy across control planes: 'shared' keeps root-managed defaults only; 'per-cluster' bootstraps default ServiceCIDR in each virtual control plane.")

fs := fss.FlagSet("misc")

Expand Down
20 changes: 20 additions & 0 deletions cmd/apiserver/app/options/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ func validateRootControlPlaneName(name string) []error {
return errs
}

// validateServiceCIDRSharingMode verifies that the --service-cidr-sharing-mode
// flag value is one of the supported modes, returning a one-element error slice
// otherwise.
func validateServiceCIDRSharingMode(mode string) []error {
	if mode == ServiceCIDRSharingModeShared || mode == ServiceCIDRSharingModePerCluster {
		return nil
	}
	return []error{fmt.Errorf("--service-cidr-sharing-mode must be one of %q or %q", ServiceCIDRSharingModeShared, ServiceCIDRSharingModePerCluster)}
}

// validateKubernetesServiceMode verifies that the --kubernetes-service-mode
// flag value is one of the supported modes, returning a one-element error slice
// otherwise.
func validateKubernetesServiceMode(mode string) []error {
	if mode == KubernetesServiceModeShared || mode == KubernetesServiceModePerClusterAutoIP {
		return nil
	}
	return []error{fmt.Errorf("--kubernetes-service-mode must be one of %q or %q", KubernetesServiceModeShared, KubernetesServiceModePerClusterAutoIP)}
}

func validatePublicIPServiceClusterIPRangeIPFamilies(extra Extra, generic genericoptions.ServerRunOptions) []error {
// The "kubernetes.default" Service is SingleStack based on the configured ServiceIPRange.
// If the bootstrap controller reconcile the kubernetes.default Service and Endpoints, it must
Expand Down Expand Up @@ -146,6 +164,8 @@ func (s CompletedOptions) Validate() []error {
errs = append(errs, validateClusterIPFlags(s.Extra)...)
errs = append(errs, validateServiceNodePort(s.Extra)...)
errs = append(errs, validateRootControlPlaneName(s.RootControlPlaneName)...)
errs = append(errs, validateKubernetesServiceMode(s.KubernetesServiceMode)...)
errs = append(errs, validateServiceCIDRSharingMode(s.ServiceCIDRSharingMode)...)
errs = append(errs, validatePublicIPServiceClusterIPRangeIPFamilies(s.Extra, *s.GenericServerRunOptions)...)

if s.MasterCount <= 0 {
Expand Down
16 changes: 15 additions & 1 deletion pkg/multicluster/admission/mutating.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"strings"

mcv1 "github.com/kplane-dev/apiserver/pkg/multicluster"
mcauth "github.com/kplane-dev/apiserver/pkg/multicluster/auth"
"k8s.io/apimachinery/pkg/api/meta"
apiserveradmission "k8s.io/apiserver/pkg/admission"
authenticationapi "k8s.io/kubernetes/pkg/apis/authentication"
)

const MutatingPluginName = "MulticlusterMutating"
Expand All @@ -26,8 +28,20 @@ func NewMutating(opts mcv1.Options) *Mutating {
// Handles reports whether the embedded admission handler accepts the given operation.
func (m *Mutating) Handles(op apiserveradmission.Operation) bool { return m.Handler.Handles(op) }

func (m *Mutating) Admit(ctx context.Context, a apiserveradmission.Attributes, _ apiserveradmission.ObjectInterfaces) error {
// Skip non-persisted review objects (SAR/TokenReview), which require empty metadata
// TokenReview is non-persisted and expects empty metadata. Record token->cluster
// hint so synthetic auth requests in TokenReview storage can route correctly.
gvk := a.GetKind()
if gvk.Group == "authentication.k8s.io" && gvk.Kind == "TokenReview" {
if tr, ok := a.GetObject().(*authenticationapi.TokenReview); ok {
cid, _, _ := mcv1.FromContext(ctx)
if cid == "" {
cid = m.Options.DefaultCluster
}
mcauth.RememberTokenReviewHint(tr.Spec.Token, cid)
}
return nil
}
// Skip other non-persisted review objects (e.g. SAR), which require empty metadata.
if gvk.Group == "authorization.k8s.io" || gvk.Group == "authentication.k8s.io" || strings.HasSuffix(gvk.Kind, "Review") {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/multicluster/admission/namespace/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) {

// Warm the namespaces informer (used by NamespaceLifecycle).
_ = inf.Core().V1().Namespaces().Informer()
inf.Start(stopCh)
m.clusters[clusterID] = e
return e, nil
}
1 change: 1 addition & 0 deletions pkg/multicluster/admission/webhook/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) {
_ = inf.Discovery().V1().EndpointSlices().Informer()
_ = inf.Admissionregistration().V1().MutatingWebhookConfigurations().Informer()
_ = inf.Admissionregistration().V1().ValidatingWebhookConfigurations().Informer()
inf.Start(stopCh)

// Start informers for resources needed by webhook admission.
go func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package mutating
import (
"crypto/sha256"
"encoding/hex"
"reflect"
"sync"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -76,6 +77,13 @@ func (c *webhookReinvokeContext) ShouldReinvokeWebhook(uid string) bool {
}

func objHash(obj runtime.Object) string {
if obj == nil {
return ""
}
v := reflect.ValueOf(obj)
if v.Kind() == reflect.Pointer && v.IsNil() {
return ""
}
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return ""
Expand Down
15 changes: 15 additions & 0 deletions pkg/multicluster/auth/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package auth
import (
"context"
"net/http"
"strings"

mc "github.com/kplane-dev/apiserver/pkg/multicluster"
"k8s.io/apiserver/pkg/authentication/authenticator"
Expand Down Expand Up @@ -41,6 +42,11 @@ func (c *ClusterAuthenticator) AuthenticateRequest(req *http.Request) (*authenti
return nil, false, nil
}
cid := clusterFromContext(req.Context())
if cid == "" {
if token := bearerTokenFromAuthHeader(req.Header.Get("Authorization")); token != "" {
cid = clusterHintForToken(token)
}
}
useRoot := cid == "" || cid == c.rootCluster
if useRoot && c.root != nil {
return c.root.AuthenticateRequest(req)
Expand Down Expand Up @@ -126,3 +132,12 @@ func clusterFromContext(ctx context.Context) string {
cid, _, _ := mc.FromContext(ctx)
return cid
}

// bearerTokenFromAuthHeader extracts the token from an Authorization header
// value that uses the Bearer scheme, or returns "" when the header does not
// carry a bearer token. Per RFC 7235 the auth-scheme name is matched
// case-insensitively (e.g. "bearer <token>" is accepted), matching upstream
// Kubernetes bearer-token authenticator behavior.
func bearerTokenFromAuthHeader(authz string) string {
	const prefix = "Bearer "
	if len(authz) < len(prefix) || !strings.EqualFold(authz[:len(prefix)], prefix) {
		return ""
	}
	// Trim surrounding whitespace so padded header values still yield the raw token.
	return strings.TrimSpace(authz[len(prefix):])
}
24 changes: 24 additions & 0 deletions pkg/multicluster/auth/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,27 @@ func TestClusterAuthorizerDispatch(t *testing.T) {
t.Fatalf("expected resolver to see cluster c-3, got %q", lastCluster)
}
}

func TestClusterAuthenticatorUsesTokenHintWithoutClusterContext(t *testing.T) {
var called string
var lastCluster string

root := &fakeAuthenticator{name: "root", called: &called}
cluster := &fakeAuthenticator{name: "cluster", called: &called}
resolver := &fakeResolver{authn: cluster, lastCluster: &lastCluster}
dispatch := NewClusterAuthenticator("root", root, resolver)

token := "tok-" + t.Name()
rememberTokenReviewHint(token, "c-42")

req := httptest.NewRequest("GET", "http://example", nil)
req.Header.Set("Authorization", "Bearer "+token)
_, _, _ = dispatch.AuthenticateRequest(req)

if called != "cluster" {
t.Fatalf("expected cluster authenticator via token hint, got %q", called)
}
if lastCluster != "c-42" {
t.Fatalf("expected resolver cluster c-42, got %q", lastCluster)
}
}
3 changes: 2 additions & 1 deletion pkg/multicluster/auth/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) {
return nil, fmt.Errorf("loopback informer pool is required for cluster auth")
}

cs, informers, _, err := m.opts.InformerPool.Get(clusterID)
cs, informers, stopCh, err := m.opts.InformerPool.Get(clusterID)
if err != nil {
m.mu.Unlock()
return nil, err
Expand All @@ -134,6 +134,7 @@ func (m *Manager) envForCluster(clusterID string) (*clusterEnv, error) {
m.mu.Unlock()
return nil, err
}
informers.Start(stopCh)

env := &clusterEnv{
cid: clusterID,
Expand Down
Loading