diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 3ad71470cfb..99959d67009 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -86,6 +86,10 @@ type ServicesConfig struct { NodePortRange utilnet.PortRange IPRepairInterval time.Duration + + // ServiceRESTHook, if set, is called after the service REST storage is created. + // This allows external code to configure per-cluster allocators. + ServiceRESTHook func(rest *servicestore.REST) } type rangeRegistries struct { @@ -213,6 +217,9 @@ func (p *legacyProvider) NewRESTStorage(apiResourceConfigSource serverstorage.AP if err != nil { return genericapiserver.APIGroupInfo{}, err } + if p.Services.ServiceRESTHook != nil { + p.Services.ServiceRESTHook(serviceRESTStorage) + } storage := apiGroupInfo.VersionedResourcesStorageMap["v1"] if storage == nil { diff --git a/pkg/registry/core/service/allocator/storage/storage.go b/pkg/registry/core/service/allocator/storage/storage.go index 11601537b33..49a571db4db 100644 --- a/pkg/registry/core/service/allocator/storage/storage.go +++ b/pkg/registry/core/service/allocator/storage/storage.go @@ -60,6 +60,19 @@ type Etcd struct { var _ allocator.Interface = &Etcd{} var _ rangeallocation.RangeRegistry = &Etcd{} +// NewWithStorage returns an allocator backed by an existing storage.Interface. +// This allows callers to use a storage backend other than etcd (e.g. Spanner). +func NewWithStorage(alloc allocator.Snapshottable, baseKey string, + store storage.Interface, resource schema.GroupResource) *Etcd { + return &Etcd{ + alloc: alloc, + storage: store, + baseKey: baseKey, + resource: resource, + destroyFn: func() {}, + } +} + // NewEtcd returns an allocator that is backed by Etcd and can manage // persisting the snapshot state of allocation after each allocation is made. func NewEtcd(alloc allocator.Snapshottable, baseKey string, config *storagebackend.ConfigForResource) (*Etcd, error) { diff --git a/pkg/registry/core/service/storage/alloc.go b/pkg/registry/core/service/storage/alloc.go index 4fcbe442bc5..ef7cbea28c4 100644 --- a/pkg/registry/core/service/storage/alloc.go +++ b/pkg/registry/core/service/storage/alloc.go @@ -55,6 +55,11 @@ type ServiceNodePort struct { // This is a trasitionary function to facilitate service REST flattening. func makeAlloc(defaultFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) Allocators { + return MakeAllocators(defaultFamily, ipAllocs, portAlloc) +} + +// MakeAllocators constructs an Allocators from the given IP and port allocators. +func MakeAllocators(defaultFamily api.IPFamily, ipAllocs map[api.IPFamily]ipallocator.Interface, portAlloc portallocator.Interface) Allocators { return Allocators{ defaultServiceIPFamily: defaultFamily, serviceIPAllocatorsByFamily: ipAllocs, diff --git a/pkg/registry/core/service/storage/storage.go b/pkg/registry/core/service/storage/storage.go index 2eb0aabfe5b..4bb1ba1bd67 100644 --- a/pkg/registry/core/service/storage/storage.go +++ b/pkg/registry/core/service/storage/storage.go @@ -62,6 +62,9 @@ type REST struct { primaryIPFamily api.IPFamily secondaryIPFamily api.IPFamily alloc Allocators + // ClusterAllocators, if set, returns per-cluster allocators resolved from + // the request context. When nil, the shared alloc field is used. + ClusterAllocators func(ctx context.Context) *Allocators endpoints EndpointsStorage pods PodStorage proxyTransport http.RoundTripper @@ -330,7 +333,18 @@ func (r *REST) defaultOnReadIPFamilies(service *api.Service) { } } -func (r *REST) afterDelete(obj runtime.Object, options *metav1.DeleteOptions) { +// allocForContext returns per-cluster allocators when ClusterAllocators is set, +// falling back to the shared alloc field. +func (r *REST) allocForContext(ctx context.Context) *Allocators { + if r.ClusterAllocators != nil { + if a := r.ClusterAllocators(ctx); a != nil { + return a + } + } + return &r.alloc +} + +func (r *REST) afterDelete(ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) { svc := obj.(*api.Service) // Normally this defaulting is done automatically, but the hook (Decorator) @@ -340,19 +354,17 @@ func (r *REST) afterDelete(obj runtime.Object, options *metav1.DeleteOptions) { // Only perform the cleanup if this is a non-dryrun deletion if !dryrun.IsDryRun(options.DryRun) { - // It would be better if we had the caller context, but that changes - // this hook signature. - ctx := genericapirequest.WithNamespace(genericapirequest.NewContext(), svc.Namespace) + nsCtx := genericapirequest.WithNamespace(genericapirequest.NewContext(), svc.Namespace) // TODO: This is clumsy. It was added for fear that the endpoints // controller might lag, and we could end up rusing the service name // with old endpoints. We should solve that better and remove this, or // else we should do this for EndpointSlice, too. - _, _, err := r.endpoints.Delete(ctx, svc.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{}) + _, _, err := r.endpoints.Delete(nsCtx, svc.Name, rest.ValidateAllObjectFunc, &metav1.DeleteOptions{}) if err != nil && !errors.IsNotFound(err) { klog.Errorf("delete service endpoints %s/%s failed: %v", svc.Name, svc.Namespace, err) } - r.alloc.releaseAllocatedResources(svc) + r.allocForContext(ctx).releaseAllocatedResources(svc) } } @@ -368,7 +380,7 @@ func (r *REST) beginCreate(ctx context.Context, obj runtime.Object, options *met // it manually. This has to happen here and not in any earlier hooks (e.g. // defaulting) because it needs to be aware of flags and be able to access // API storage. - txn, err := r.alloc.allocateCreate(svc, dryrun.IsDryRun(options.DryRun)) + txn, err := r.allocForContext(ctx).allocateCreate(svc, dryrun.IsDryRun(options.DryRun)) if err != nil { return nil, err } @@ -403,7 +415,7 @@ func (r *REST) beginUpdate(ctx context.Context, obj, oldObj runtime.Object, opti normalizeClusterIPs(After{newSvc}, Before{oldSvc}) // Allocate and initialize fields. - txn, err := r.alloc.allocateUpdate(After{newSvc}, Before{oldSvc}, dryrun.IsDryRun(options.DryRun)) + txn, err := r.allocForContext(ctx).allocateUpdate(After{newSvc}, Before{oldSvc}, dryrun.IsDryRun(options.DryRun)) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 7e7e67536d7..3e23b829f40 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -60,7 +60,7 @@ import ( type FinishFunc func(ctx context.Context, success bool) // AfterDeleteFunc is the type used for the Store.AfterDelete hook. -type AfterDeleteFunc func(obj runtime.Object, options *metav1.DeleteOptions) +type AfterDeleteFunc func(ctx context.Context, obj runtime.Object, options *metav1.DeleteOptions) // BeginCreateFunc is the type used for the Store.BeginCreate hook. type BeginCreateFunc func(ctx context.Context, obj runtime.Object, options *metav1.CreateOptions) (FinishFunc, error) @@ -1385,7 +1385,7 @@ func (e *Store) DeleteCollection(ctx context.Context, deleteValidation rest.Vali // returns the decorated deleted object if appropriate. func (e *Store) finalizeDelete(ctx context.Context, obj runtime.Object, runHooks bool, options *metav1.DeleteOptions) (runtime.Object, error) { if runHooks && e.AfterDelete != nil { - e.AfterDelete(obj, options) + e.AfterDelete(ctx, obj, options) } if e.ReturnDeletedObject { if e.Decorator != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index bb90542a4f3..97966a4a1a4 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -1318,7 +1318,7 @@ func TestStoreDelete(t *testing.T) { defer destroyFunc() afterWasCalled := false - registry.AfterDelete = func(obj runtime.Object, options *metav1.DeleteOptions) { + registry.AfterDelete = func(_ context.Context, obj runtime.Object, options *metav1.DeleteOptions) { afterWasCalled = true } @@ -1464,7 +1464,7 @@ func TestGracefulStoreHandleFinalizers(t *testing.T) { defer destroyFunc() afterWasCalled := false - registry.AfterDelete = func(obj runtime.Object, options *metav1.DeleteOptions) { + registry.AfterDelete = func(_ context.Context, obj runtime.Object, options *metav1.DeleteOptions) { afterWasCalled = true } @@ -1546,7 +1546,7 @@ func TestNonGracefulStoreHandleFinalizers(t *testing.T) { defer destroyFunc() afterWasCalled := false - registry.AfterDelete = func(obj runtime.Object, options *metav1.DeleteOptions) { + registry.AfterDelete = func(_ context.Context, obj runtime.Object, options *metav1.DeleteOptions) { afterWasCalled = true } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go index a4802cd5e8f..969e8b9282f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd3" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/consistencydetector" @@ -109,7 +108,7 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error var keyMap map[string]string if lw.identityFromKey != nil && lw.wrapObject != nil { keyMap = make(map[string]string) - ctx = etcd3.WithDecodeCallback(ctx, func(obj runtime.Object, storageKey string, modRev int64) { + ctx = storage.WithDecodeCallback(ctx, func(obj runtime.Object, storageKey string, modRev int64) { accessor, err := meta.Accessor(obj) if err != nil { return diff --git a/staging/src/k8s.io/apiserver/pkg/storage/decode_callback.go b/staging/src/k8s.io/apiserver/pkg/storage/decode_callback.go new file mode 100644 index 00000000000..5530d3b0e6c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/decode_callback.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" +) + +type decodeCallbackKeyType struct{} + +// DecodeCallback is called for each item decoded during GetList, +// providing the decoded object, its storage-relative key (backend prefix +// stripped), and the modification revision. +type DecodeCallback func(obj runtime.Object, storageKey string, modRevision int64) + +// WithDecodeCallback returns a context that carries a DecodeCallback. +// The callback will be invoked for each item decoded in GetList. +func WithDecodeCallback(ctx context.Context, cb DecodeCallback) context.Context { + return context.WithValue(ctx, decodeCallbackKeyType{}, cb) +} + +// DecodeCallbackFromContext extracts the DecodeCallback from the context, if any. +func DecodeCallbackFromContext(ctx context.Context) DecodeCallback { + cb, _ := ctx.Value(decodeCallbackKeyType{}).(DecodeCallback) + return cb +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/decode_callback.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/decode_callback.go index 0e3514e53c3..624338f6955 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/decode_callback.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/decode_callback.go @@ -19,23 +19,19 @@ package etcd3 import ( "context" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/storage" ) -type decodeCallbackKeyType struct{} +// DecodeCallback is an alias for storage.DecodeCallback. +// Deprecated: use storage.DecodeCallback directly. +type DecodeCallback = storage.DecodeCallback -// DecodeCallback is called for each item decoded during GetList, -// providing the decoded object, its storage-relative key (etcd prefix -// stripped), and the etcd mod revision. -type DecodeCallback func(obj runtime.Object, storageKey string, modRevision int64) - -// WithDecodeCallback returns a context that carries a DecodeCallback. -// The callback will be invoked for each item decoded in GetList. +// WithDecodeCallback is an alias for storage.WithDecodeCallback. +// Deprecated: use storage.WithDecodeCallback directly. func WithDecodeCallback(ctx context.Context, cb DecodeCallback) context.Context { - return context.WithValue(ctx, decodeCallbackKeyType{}, cb) + return storage.WithDecodeCallback(ctx, cb) } func decodeCallbackFromContext(ctx context.Context) DecodeCallback { - cb, _ := ctx.Value(decodeCallbackKeyType{}).(DecodeCallback) - return cb + return storage.DecodeCallbackFromContext(ctx) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go b/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go index e664d22a646..a6373b11835 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go @@ -41,6 +41,15 @@ var ( DefaultFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() ) +// SetFeatureSupported explicitly marks a feature as supported in the default +// checker. Used by non-etcd storage backends (e.g. Spanner) that implement +// features like RequestWatchProgress natively. +func SetFeatureSupported(feature storage.Feature, supported bool) { + if d, ok := DefaultFeatureSupportChecker.(*defaultFeatureSupportChecker); ok { + d.SetSupported(feature, supported) + } +} + // FeatureSupportChecker to define Supports functions. type FeatureSupportChecker interface { // Supports check if the feature is supported or not by checking internal cache. @@ -61,12 +70,17 @@ type FeatureSupportChecker interface { type defaultFeatureSupportChecker struct { lock sync.Mutex progressNotifySupported *bool - checkingEndpoint map[string]struct{} + // forcedFeatures overrides etcd-detected support. Once a feature is + // force-set, CheckClient cannot override it. Used by non-etcd storage + // backends (e.g. Spanner) that implement features natively. + forcedFeatures map[storage.Feature]bool + checkingEndpoint map[string]struct{} } func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker { return &defaultFeatureSupportChecker{ checkingEndpoint: make(map[string]struct{}), + forcedFeatures: make(map[storage.Feature]bool), } } @@ -77,6 +91,9 @@ func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool { f.lock.Lock() defer f.lock.Unlock() + if v, ok := f.forcedFeatures[feature]; ok { + return v + } return ptr.Deref(f.progressNotifySupported, false) default: runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)) @@ -84,6 +101,16 @@ func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool { } } +// SetSupported explicitly marks a feature as supported, overriding any +// etcd-detected value. Once set, CheckClient cannot override it. This is +// used by non-etcd storage backends (e.g. Spanner) that implement the +// feature natively. +func (f *defaultFeatureSupportChecker) SetSupported(feature storage.Feature, supported bool) { + f.lock.Lock() + defer f.lock.Unlock() + f.forcedFeatures[feature] = supported +} + // CheckClient accepts client and calculate the support per endpoint and caches it. func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) { switch feature {