diff --git a/go.mod b/go.mod index a0cb99be1..0b8905654 100644 --- a/go.mod +++ b/go.mod @@ -117,3 +117,5 @@ require ( sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect sigs.k8s.io/yaml v1.6.0 // indirect ) + +replace github.com/openshift/library-go => github.com/bentito/library-go v0.0.0-20260326212156-ef7716e77898 diff --git a/go.sum b/go.sum index 6d4b34285..13e4a14fc 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,8 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8 github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/bcicen/go-haproxy v0.0.0-20180203142132-ff5824fe38be h1:7b8/p7wJ3N/OjXbZmwJdyR9eOoCla5SbUIe18WHio5s= github.com/bcicen/go-haproxy v0.0.0-20180203142132-ff5824fe38be/go.mod h1:MxVpaKTkNjZu5awzzr6mk6CIKaZYUFGxbmNwMvyVfeM= +github.com/bentito/library-go v0.0.0-20260326212156-ef7716e77898 h1:nzgD5pjb18eOkkf/FeXaNofDNHsG5JqJl3JsbFFgH34= +github.com/bentito/library-go v0.0.0-20260326212156-ef7716e77898/go.mod h1:K3FoNLgNBFYbFuG+Kr8usAnQxj1w84XogyUp2M8rK8k= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM= @@ -184,8 +186,6 @@ github.com/openshift/api v0.0.0-20260212193555-c06ab675261f h1:l1IgsK48Ym/nED30y github.com/openshift/api v0.0.0-20260212193555-c06ab675261f/go.mod h1:d5uzF0YN2nQQFA0jIEWzzOZ+edmo6wzlGLvx5Fhz4uY= github.com/openshift/client-go v0.0.0-20260108185524-48f4ccfc4e13 h1:6rd4zSo2UaWQcAPZfHK9yzKVqH0BnMv1hqMzqXZyTds= github.com/openshift/client-go v0.0.0-20260108185524-48f4ccfc4e13/go.mod h1:YvOmPmV7wcJxpfhTDuFqqs2Xpb3M3ovsM6Qs/i2ptq4= -github.com/openshift/library-go v0.0.0-20260223145824-7b234b47a906 h1:PkG3CmlU8+HtlW1rspnhwhbKki8rrwYN+L26aH11t2E= -github.com/openshift/library-go v0.0.0-20260223145824-7b234b47a906/go.mod 
h1:K3FoNLgNBFYbFuG+Kr8usAnQxj1w84XogyUp2M8rK8k= github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0= github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA= github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo= diff --git a/hack/Makefile.debug b/hack/Makefile.debug index d0d1c0e34..9f3f760fd 100644 --- a/hack/Makefile.debug +++ b/hack/Makefile.debug @@ -1,6 +1,7 @@ # -*- mode: makefile -*- export GOOS=linux +export GOARCH ?= amd64 REGISTRY ?= quay.io IMAGE ?= openshift/openshift-router @@ -11,7 +12,7 @@ REMOTE_IMAGE ?= $(REGISTRY)/$(IMAGE):$(TAG) OPENSHIFT_ENDPOINT ?= $(shell oc config view --minify --template '{{(index .clusters 0).cluster.server}}' | grep -o '//[^ :]*' | sed 's/^..//') new-openshift-router-image: - GO111MODULE=on CGO_ENABLED=0 GOFLAGS=-mod=vendor go build -o openshift-router -gcflags=all="-N -l" ./cmd/openshift-router + GO111MODULE=on CGO_ENABLED=0 GOOS=$(GOOS) GOARCH=$(GOARCH) GOFLAGS=-mod=vendor go build -o openshift-router -gcflags=all="-N -l" ./cmd/openshift-router $(IMAGEBUILDER) build -t $(LOCAL_IMAGE) -f hack/Dockerfile.debug . push: diff --git a/pkg/cmd/infra/router/clientcmd.go b/pkg/cmd/infra/router/clientcmd.go index d3d59135c..ddad1f314 100644 --- a/pkg/cmd/infra/router/clientcmd.go +++ b/pkg/cmd/infra/router/clientcmd.go @@ -60,6 +60,13 @@ func (cfg *Config) KubeConfig() (*restclient.Config, string, error) { if err != nil { return nil, "", err } + + // Increase client-side rate limiting to support higher throughput during + // router startup, especially when many external certificate routes are + // present. 
+ clientConfig.QPS = 50 + clientConfig.Burst = 100 + return clientConfig, namespace, nil } diff --git a/pkg/cmd/infra/router/template.go b/pkg/cmd/infra/router/template.go index 2505e43ca..6c47a8895 100644 --- a/pkg/cmd/infra/router/template.go +++ b/pkg/cmd/infra/router/template.go @@ -36,12 +36,12 @@ import ( routelisters "github.com/openshift/client-go/route/listers/route/v1" "github.com/openshift/library-go/pkg/crypto" "github.com/openshift/library-go/pkg/proc" - "github.com/openshift/library-go/pkg/route/secretmanager" "github.com/openshift/router/pkg/router" "github.com/openshift/router/pkg/router/controller" "github.com/openshift/router/pkg/router/metrics" "github.com/openshift/router/pkg/router/metrics/haproxy" + "github.com/openshift/router/pkg/router/routeapihelpers" "github.com/openshift/router/pkg/router/shutdown" templateplugin "github.com/openshift/router/pkg/router/template" haproxyconfigmanager "github.com/openshift/router/pkg/router/template/configmanager/haproxy" @@ -755,7 +755,7 @@ func (o *TemplateRouterOptions) Run(stopCh <-chan struct{}) error { return err } - secretManager := secretmanager.NewManager(kc, nil) + secretManager := controller.NewSharedSecretManager(kc, nil) pluginCfg := templateplugin.TemplatePluginConfig{ WorkingDir: o.WorkingDir, @@ -800,7 +800,7 @@ func (o *TemplateRouterOptions) Run(stopCh <-chan struct{}) error { informer := factory.CreateRoutesSharedInformer() routeLister := routelisters.NewRouteLister(informer.GetIndexer()) if o.UpdateStatus { - lease := writerlease.New(time.Minute, 3*time.Second) + lease := writerlease.New(time.Minute, 3*time.Second, routeapihelpers.MaxConcurrentSARChecks) go lease.Run(stopCh) tracker := controller.NewSimpleContentionTracker(informer, o.RouterName, o.ResyncInterval/10) tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. 
Route status will not be updated as long as conflicts are detected.", o.RouterName)) diff --git a/pkg/router/controller/route_secret_manager.go b/pkg/router/controller/route_secret_manager.go index 6aa3d2639..f3413dc27 100644 --- a/pkg/router/controller/route_secret_manager.go +++ b/pkg/router/controller/route_secret_manager.go @@ -111,10 +111,15 @@ func (p *RouteSecretManager) HandleRoute(eventType watch.EventType, route *route case watch.Added: // register with secret monitor if hasExternalCertificate(route) { - log.V(4).Info("Validating and registering external certificate", "namespace", route.Namespace, "secret", route.Spec.TLS.ExternalCertificate.Name, "route", route.Name) - if err := p.validateAndRegister(route); err != nil { - return err - } + go func(route *routev1.Route) { + log.V(4).Info("Validating and registering external certificate (async)", "namespace", route.Namespace, "secret", route.Spec.TLS.ExternalCertificate.Name, "route", route.Name) + if err := p.validateAndRegister(route); err != nil { + log.Error(err, "failed to validate and register external certificate", "namespace", route.Namespace, "route", route.Name) + return + } + p.plugin.HandleRoute(watch.Added, route) + }(route.DeepCopy()) + return nil } case watch.Modified: @@ -127,14 +132,21 @@ func (p *RouteSecretManager) HandleRoute(eventType watch.EventType, route *route // Both new and old routes have externalCertificate if oldSecret != route.Spec.TLS.ExternalCertificate.Name { // ExternalCertificate is updated - log.V(4).Info("Validating and registering updated external certificate", "namespace", route.Namespace, "oldSecret", oldSecret, "newSecret", route.Spec.TLS.ExternalCertificate.Name, "route", route.Name) + log.V(4).Info("Unregistering old external certificate", "namespace", route.Namespace, "oldSecret", oldSecret, "route", route.Name) // Unregister the old and register the new external certificate if err := p.unregister(route); err != nil { return err } - if err := 
p.validateAndRegister(route); err != nil { - return err - } + + go func(route *routev1.Route) { + log.V(4).Info("Validating and registering updated external certificate (async)", "namespace", route.Namespace, "newSecret", route.Spec.TLS.ExternalCertificate.Name, "route", route.Name) + if err := p.validateAndRegister(route); err != nil { + log.Error(err, "failed to validate and register updated external certificate", "namespace", route.Namespace, "route", route.Name) + return + } + p.plugin.HandleRoute(watch.Modified, route) + }(route.DeepCopy()) + return nil } else { // ExternalCertificate is not updated // Re-validate and update the in-memory TLS certificate and key (even if ExternalCertificate remains unchanged) @@ -156,24 +168,35 @@ func (p *RouteSecretManager) HandleRoute(eventType watch.EventType, route *route // // Therefore, it is essential to re-sync the secret to ensure the plugin chain correctly handles the route. - log.V(4).Info("Re-validating existing external certificate", "namespace", route.Namespace, "secret", oldSecret, "route", route.Name) - // re-validate - if err := p.validate(route); err != nil { - return err - } - // read referenced secret and update TLS certificate and key - if err := p.populateRouteTLSFromSecret(route); err != nil { - return err - } + go func(route *routev1.Route) { + log.V(4).Info("Re-validating existing external certificate (async)", "namespace", route.Namespace, "secret", oldSecret, "route", route.Name) + // re-validate (synchronous, throttled by semaphore) + if err := p.validate(route); err != nil { + log.Error(err, "failed to re-validate external certificate", "namespace", route.Namespace, "route", route.Name) + return + } + // read referenced secret and update TLS certificate and key + if err := p.populateRouteTLSFromSecret(route); err != nil { + log.Error(err, "failed to populate TLS from secret", "namespace", route.Namespace, "route", route.Name) + return + } + p.plugin.HandleRoute(watch.Modified, route) + 
}(route.DeepCopy()) + return nil } case newHasExt && !oldHadExt: // New route has externalCertificate, old route did not - log.V(4).Info("Validating and registering new external certificate", "namespace", route.Namespace, "secret", route.Spec.TLS.ExternalCertificate.Name, "route", route.Name) - // register with secret monitor - if err := p.validateAndRegister(route); err != nil { - return err - } + go func(route *routev1.Route) { + log.V(4).Info("Validating and registering new external certificate (async)", "namespace", route.Namespace, "secret", route.Spec.TLS.ExternalCertificate.Name, "route", route.Name) + // register with secret monitor + if err := p.validateAndRegister(route); err != nil { + log.Error(err, "failed to validate and register new external certificate", "namespace", route.Namespace, "route", route.Name) + return + } + p.plugin.HandleRoute(watch.Modified, route) + }(route.DeepCopy()) + return nil case !newHasExt && oldHadExt: // Old route had externalCertificate, new route does not @@ -204,15 +227,18 @@ func (p *RouteSecretManager) HandleRoute(eventType watch.EventType, route *route // validateAndRegister validates the route's externalCertificate configuration and registers it with the secret manager. // It also updates the in-memory TLS certificate and key after reading from secret informer's cache. 
func (p *RouteSecretManager) validateAndRegister(route *routev1.Route) error { - // validate + // validate (synchronous, throttled by semaphore) if err := p.validate(route); err != nil { return err } // register route with secretManager handler := p.generateSecretHandler(route.Namespace, route.Name) if err := p.secretManager.RegisterRoute(context.TODO(), route.Namespace, route.Name, route.Spec.TLS.ExternalCertificate.Name, handler); err != nil { + p.recorder.RecordRouteRejection(route, ExtCrtStatusReasonGetFailed, err.Error()) + p.plugin.HandleRoute(watch.Deleted, route) return fmt.Errorf("failed to register router: %w", err) } + // read referenced secret and update TLS certificate and key if err := p.populateRouteTLSFromSecret(route); err != nil { return err @@ -260,6 +286,7 @@ func (p *RouteSecretManager) generateSecretHandler(namespace, routeName string) AddFunc: func(obj interface{}) { secret := obj.(*kapi.Secret) log.V(4).Info("Secret added for route", "namespace", namespace, "secret", secret.Name, "route", routeName) + routeapihelpers.InvalidateAsyncSARCache(namespace, secret.Name) // Secret re-creation scenario // Check if the route key exists in the deletedSecrets map, indicating that the secret was previously deleted for this route. 
@@ -287,8 +314,12 @@ func (p *RouteSecretManager) generateSecretHandler(namespace, routeName string) UpdateFunc: func(old interface{}, new interface{}) { secretOld := old.(*kapi.Secret) secretNew := new.(*kapi.Secret) + if secretOld.ResourceVersion == secretNew.ResourceVersion { + return + } key := generateKey(namespace, routeName) log.V(4).Info("Secret updated for route", "namespace", namespace, "secret", secretNew.Name, "oldSecretVersion", secretOld.ResourceVersion, "newSecretVersion", secretNew.ResourceVersion, "route", routeName) + routeapihelpers.InvalidateAsyncSARCache(namespace, secretNew.Name) // Ensure fetching the updated route route, err := p.routelister.Routes(namespace).Get(routeName) @@ -309,10 +340,23 @@ func (p *RouteSecretManager) generateSecretHandler(namespace, routeName string) }, DeleteFunc: func(obj interface{}) { - secret := obj.(*kapi.Secret) + secret, ok := obj.(*kapi.Secret) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + log.Error(nil, "Couldn't get object from tombstone", "obj", obj) + return + } + secret, ok = tombstone.Obj.(*kapi.Secret) + if !ok { + log.Error(nil, "Tombstone contained object that is not a secret", "obj", tombstone.Obj) + return + } + } key := generateKey(namespace, routeName) msg := fmt.Sprintf("secret %q deleted for route %q", secret.Name, key) log.V(4).Info(msg) + routeapihelpers.InvalidateAsyncSARCache(namespace, secret.Name) // keep the secret monitor active and mark the secret as deleted for this route. p.deletedSecrets.Store(key, true) @@ -334,10 +378,14 @@ func (p *RouteSecretManager) generateSecretHandler(namespace, routeName string) // If the validation fails, it records the route rejection and triggers // the deletion of the route by calling the HandleRoute method with a watch.Deleted event. // +// This function is synchronous: it blocks until the SAR check completes. +// Concurrency is throttled by the semaphore in ValidateTLSExternalCertificate. 
+// // NOTE: TLS data validation and sanitization are handled by the next plugin `ExtendedValidator`, // by reading the "tls.crt" and "tls.key" added by populateRouteTLSFromSecret. func (p *RouteSecretManager) validate(route *routev1.Route) error { fldPath := field.NewPath("spec").Child("tls").Child("externalCertificate") + if err := routeapihelpers.ValidateTLSExternalCertificate(route, fldPath, p.sarClient, p.secretsGetter).ToAggregate(); err != nil { log.Error(err, "skipping route due to invalid externalCertificate configuration", "namespace", route.Namespace, "route", route.Name) p.recorder.RecordRouteRejection(route, ExtCrtStatusReasonValidationFailed, err.Error()) @@ -352,7 +400,9 @@ func (p *RouteSecretManager) validate(route *routev1.Route) error { // the deletion of the route by calling the HandleRoute method with a watch.Deleted event. // Note: This function performs an in-place update of the route. The caller should be aware that the route's TLS configuration will be modified directly. func (p *RouteSecretManager) populateRouteTLSFromSecret(route *routev1.Route) error { - // read referenced secret + // read referenced secret from the informer cache. + // RegisterRoute blocks until cache sync completes, so the secret should be + // available immediately after registration. 
secret, err := p.secretManager.GetSecret(context.TODO(), route.Namespace, route.Name) if err != nil { log.Error(err, "failed to get referenced secret") diff --git a/pkg/router/controller/route_secret_manager_test.go b/pkg/router/controller/route_secret_manager_test.go index e9e42e7f0..9f172ee00 100644 --- a/pkg/router/controller/route_secret_manager_test.go +++ b/pkg/router/controller/route_secret_manager_test.go @@ -5,10 +5,13 @@ import ( "fmt" "os" "reflect" + "sync" "testing" + "time" "github.com/openshift/library-go/pkg/route/secretmanager/fake" "github.com/openshift/router/pkg/router" + "github.com/openshift/router/pkg/router/routeapihelpers" routev1 "github.com/openshift/api/route/v1" authorizationv1 "k8s.io/api/authorization/v1" @@ -113,6 +116,7 @@ func (p *fakePluginDone) Commit() error { var _ router.Plugin = &fakePluginDone{} type statusRecorder struct { + sync.Mutex rejections []string updates []string unservableInFutureVersions map[string]string @@ -123,22 +127,58 @@ func (r *statusRecorder) routeKey(route *routev1.Route) string { return route.Namespace + "-" + route.Name } func (r *statusRecorder) RecordRouteRejection(route *routev1.Route, reason, message string) { - defer close(r.doneCh) + r.Lock() + defer r.Unlock() + if r.doneCh != nil { + select { + case <-r.doneCh: + default: + close(r.doneCh) + } + } r.rejections = append(r.rejections, fmt.Sprintf("%s:%s", r.routeKey(route), reason)) } func (r *statusRecorder) RecordRouteUpdate(route *routev1.Route, reason, message string) { - defer close(r.doneCh) + r.Lock() + defer r.Unlock() + if r.doneCh != nil { + select { + case <-r.doneCh: + default: + close(r.doneCh) + } + } r.updates = append(r.updates, fmt.Sprintf("%s:%s", r.routeKey(route), reason)) } func (r *statusRecorder) RecordRouteUnservableInFutureVersionsClear(route *routev1.Route) { + r.Lock() + defer r.Unlock() delete(r.unservableInFutureVersions, r.routeKey(route)) } func (r *statusRecorder) RecordRouteUnservableInFutureVersions(route 
*routev1.Route, reason, message string) { + r.Lock() + defer r.Unlock() r.unservableInFutureVersions[r.routeKey(route)] = reason } +func (r *statusRecorder) GetRejections() []string { + r.Lock() + defer r.Unlock() + var res []string + res = append(res, r.rejections...) + return res +} + +func (r *statusRecorder) GetUpdates() []string { + r.Lock() + defer r.Unlock() + var res []string + res = append(res, r.updates...) + return res +} + var _ RouteStatusRecorder = &statusRecorder{} func TestRouteSecretManager(t *testing.T) { @@ -297,6 +337,9 @@ func TestRouteSecretManager(t *testing.T) { eventType: watch.Added, allow: true, expectedError: true, + expectedRejections: []string{ + "sandbox-route-test:ExternalCertificateGetFailed", + }, }, { name: "route added with externalCertificate allowed and correct secret", @@ -514,6 +557,9 @@ func TestRouteSecretManager(t *testing.T) { allow: true, eventType: watch.Modified, expectedError: true, + expectedRejections: []string{ + "sandbox-route-test:ExternalCertificateGetFailed", + }, }, { name: "route updated: old route without externalCertificate, new route with externalCertificate allowed and correct secret", @@ -1131,14 +1177,45 @@ func TestRouteSecretManager(t *testing.T) { for _, s := range scenarios { t.Run(s.name, func(t *testing.T) { - p := &fakePlugin{} + routeapihelpers.ClearAsyncSARCacheForTest() + p := &fakePlugin{ + doneCh: make(chan struct{}), + } recorder := &statusRecorder{ doneCh: make(chan struct{}), } - rsm := NewRouteSecretManager(p, recorder, &s.secretManager, testRouterName, &testSecretGetter{namespace: s.route.Namespace, secret: s.secretManager.Secret}, &routeLister{}, &testSARCreator{allow: s.allow}) + rsm := NewRouteSecretManager(p, recorder, &s.secretManager, testRouterName, &testSecretGetter{namespace: s.route.Namespace, secret: s.secretManager.Secret}, &routeLister{items: []*routev1.Route{s.route}}, &testSARCreator{allow: s.allow}) gotErr := rsm.HandleRoute(s.eventType, s.route) - if (gotErr != nil) != 
s.expectedError { + + // For routes with external certificates (except deletion), the processing is asynchronous. + // We need to wait for the background goroutine to complete (either by calling the next plugin + // or by recording a rejection), UNLESS an error occurred synchronously. + isAsync := s.eventType != watch.Deleted && hasExternalCertificate(s.route) + if isAsync && gotErr == nil { + select { + case <-p.doneCh: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for async route processing") + } + // The synchronous HandleRoute call always returns nil for async routes if it successfully launched the goroutine. + // If we expected an error, check if a rejection was recorded instead. + if s.expectedError { + if len(recorder.rejections) == 0 { + t.Fatalf("expected a rejection to be recorded for async route, but got none") + } + // On failure, the plugin is called with watch.Deleted. + // We update the local expected state to match this behavior for the deep equal check below. 
+ if s.expectedRoute == nil { + p.route = nil + } + if s.expectedEventType == "" { + p.t = "" + } + } + } + + if !isAsync && (gotErr != nil) != s.expectedError { t.Fatalf("expected error to be %t, but got %t", s.expectedError, gotErr != nil) } if !reflect.DeepEqual(s.expectedRoute, p.route) { @@ -1147,8 +1224,8 @@ func TestRouteSecretManager(t *testing.T) { if s.expectedEventType != p.t { t.Fatalf("expected %s event for next plugin, but got %s", s.expectedEventType, p.t) } - if !reflect.DeepEqual(s.expectedRejections, recorder.rejections) { - t.Fatalf("expected rejections %v, but got %v", s.expectedRejections, recorder.rejections) + if !reflect.DeepEqual(s.expectedRejections, recorder.GetRejections()) { + t.Fatalf("expected rejections %v, but got %v", s.expectedRejections, recorder.GetRejections()) } if _, exists := rsm.deletedSecrets.Load(generateKey(s.route.Namespace, s.route.Name)); exists { t.Fatalf("expected deletedSecrets to not have %q key", generateKey(s.route.Namespace, s.route.Name)) @@ -1278,6 +1355,7 @@ func TestSecretUpdate(t *testing.T) { // update the secret updatedSecret := secret.DeepCopy() + updatedSecret.ResourceVersion = "2" // Increment so UpdateFunc processes it updatedSecret.Data = map[string][]byte{ "tls.crt": []byte("my-crt"), "tls.key": []byte("my-key"), @@ -1286,20 +1364,21 @@ func TestSecretUpdate(t *testing.T) { t.Fatalf("failed to update secret: %v", err) } - // wait until route's status is updated - <-recorder.doneCh + // Wait directly for the Update event. 
+ <-recorder.doneCh // wait for Update event expectedStatus := []string{"sandbox-route-test:ExternalCertificateSecretUpdated"} + expectedRejectionsOnUpdate := []string{"sandbox-route-test:ExternalCertificateSecretUpdated"} if s.isRouteAdmittedTrue { // RecordRouteUpdate will be called if `Admitted=True` - if !reflect.DeepEqual(expectedStatus, recorder.updates) { - t.Fatalf("expected status %v, but got %v", expectedStatus, recorder.updates) + if !reflect.DeepEqual(expectedStatus, recorder.GetUpdates()) { + t.Fatalf("expected status %v, but got %v", expectedStatus, recorder.GetUpdates()) } } else { // RecordRouteRejection will be called if `Admitted=False` - if !reflect.DeepEqual(expectedStatus, recorder.rejections) { - t.Fatalf("expected status %v, but got %v", expectedStatus, recorder.rejections) + if !reflect.DeepEqual(expectedRejectionsOnUpdate, recorder.GetRejections()) { + t.Fatalf("expected status %v, but got %v", expectedRejectionsOnUpdate, recorder.GetRejections()) } } @@ -1352,13 +1431,15 @@ func TestSecretDelete(t *testing.T) { t.Fatalf("failed to delete secret: %v", err) } - <-recorder.doneCh // wait until the route's status is updated + <-recorder.doneCh // wait until the route's status is updated (deletion) - expectedRejections := []string{"sandbox-route-test:ExternalCertificateSecretDeleted"} + expectedRejections := []string{ + "sandbox-route-test:ExternalCertificateSecretDeleted", + } expectedDeletedSecrets := true - if !reflect.DeepEqual(expectedRejections, recorder.rejections) { - t.Fatalf("expected rejections %v, but got %v", expectedRejections, recorder.rejections) + if !reflect.DeepEqual(expectedRejections, recorder.GetRejections()) { + t.Fatalf("expected rejections %v, but got %v", expectedRejections, recorder.GetRejections()) } if val, _ := rsm.deletedSecrets.Load(generateKey(route.Namespace, route.Name)); !reflect.DeepEqual(val, expectedDeletedSecrets) { @@ -1409,7 +1490,9 @@ func TestSecretRecreation(t *testing.T) { <-recorder.doneCh // wait 
until the route's status is updated (deletion) // re-create the secret + recorder.Lock() recorder.doneCh = make(chan struct{}) // need a new doneCh for re-creation + recorder.Unlock() if _, err := kubeClient.CoreV1().Secrets(route.Namespace).Create(context.TODO(), secret, metav1.CreateOptions{}); err != nil { t.Fatalf("failed to create secret: %v", err) } @@ -1420,8 +1503,8 @@ func TestSecretRecreation(t *testing.T) { "sandbox-route-test:ExternalCertificateSecretDeleted", "sandbox-route-test:ExternalCertificateSecretRecreated", } - if !reflect.DeepEqual(expectedRejections, recorder.rejections) { - t.Fatalf("expected rejections %v, but got %v", expectedRejections, recorder.rejections) + if !reflect.DeepEqual(expectedRejections, recorder.GetRejections()) { + t.Fatalf("expected rejections %v, but got %v", expectedRejections, recorder.GetRejections()) } if _, exists := rsm.deletedSecrets.Load(generateKey(route.Namespace, route.Name)); exists { t.Fatalf("expected deletedSecrets to not have %q key", generateKey(route.Namespace, route.Name)) diff --git a/pkg/router/controller/shared_secret_manager.go b/pkg/router/controller/shared_secret_manager.go new file mode 100644 index 000000000..4d28421e6 --- /dev/null +++ b/pkg/router/controller/shared_secret_manager.go @@ -0,0 +1,216 @@ +package controller + +import ( + "context" + "fmt" + "strings" + "sync" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" +) + +// SharedSecretManager implements secretmanager.SecretManager using per-namespace SharedIndexInformers. +// This prevents creating a new API watch for every individual route/secret combination. 
+type SharedSecretManager struct { + kubeClient kubernetes.Interface + queue workqueue.RateLimitingInterface + + lock sync.RWMutex + informers map[string]cache.SharedIndexInformer // namespace -> informer + + // registeredRoutes maps "namespace/routeName" -> referencedSecret + registeredRoutes map[string]referencedSecret +} + +type referencedSecret struct { + secretName string + handler cache.ResourceEventHandlerFuncs +} + +func NewSharedSecretManager(kubeClient kubernetes.Interface, queue workqueue.RateLimitingInterface) *SharedSecretManager { + return &SharedSecretManager{ + kubeClient: kubeClient, + queue: queue, + informers: make(map[string]cache.SharedIndexInformer), + registeredRoutes: make(map[string]referencedSecret), + } +} + +func (m *SharedSecretManager) Queue() workqueue.RateLimitingInterface { + return m.queue +} + +func (m *SharedSecretManager) RegisterRoute(ctx context.Context, namespace string, routeName string, secretName string, handler cache.ResourceEventHandlerFuncs) error { + m.lock.Lock() + key := namespace + "/" + routeName + if ref, exists := m.registeredRoutes[key]; exists { + if ref.secretName == secretName { + // Already registered for the same secret, just update the handler and return success. + // This handles the race condition where ADDED and MODIFIED events fire concurrently. 
+ m.registeredRoutes[key] = referencedSecret{ + secretName: secretName, + handler: handler, + } + m.lock.Unlock() + return nil + } + m.lock.Unlock() + return fmt.Errorf("route already registered with key %s", key) + } + + m.registeredRoutes[key] = referencedSecret{ + secretName: secretName, + handler: handler, + } + + inf, exists := m.informers[namespace] + if !exists { + inf = cache.NewSharedIndexInformer( + cache.NewListWatchFromClient( + m.kubeClient.CoreV1().RESTClient(), + "secrets", + namespace, + fields.Everything(), // Watch all secrets in the namespace + ), + &corev1.Secret{}, + 0, + cache.Indexers{}, + ) + + inf.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + m.notify(namespace, obj, "Add", nil) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + m.notify(namespace, newObj, "Update", oldObj) + }, + DeleteFunc: func(obj interface{}) { + m.notify(namespace, obj, "Delete", nil) + }, + }) + + m.informers[namespace] = inf + // Start the informer + // We use context.Background() since this informer lives as long as the router, + // or until we implement unregistering namespaces if they become empty (optimization). 
+ go inf.Run(context.Background().Done()) + } + m.lock.Unlock() + + // Wait for cache sync (lock released to allow concurrent registrations) + if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) { + // Rollback registration on failure + m.lock.Lock() + delete(m.registeredRoutes, key) + m.lock.Unlock() + return fmt.Errorf("failed waiting for cache sync for namespace %s", namespace) + } + + klog.V(4).Infof("secret manager registered route for key %s with secret %s", key, secretName) + return nil +} + +func (m *SharedSecretManager) notify(namespace string, obj interface{}, eventType string, oldObj interface{}) { + var secret *corev1.Secret + switch t := obj.(type) { + case *corev1.Secret: + secret = t + case cache.DeletedFinalStateUnknown: + secret, _ = t.Obj.(*corev1.Secret) + } + + if secret == nil { + return + } + + // Find all routes in this namespace that reference this secret + var handlers []cache.ResourceEventHandlerFuncs + prefix := namespace + "/" + + m.lock.RLock() + for key, ref := range m.registeredRoutes { + if ref.secretName == secret.Name && strings.HasPrefix(key, prefix) { + handlers = append(handlers, ref.handler) + } + } + m.lock.RUnlock() + + for _, h := range handlers { + switch eventType { + case "Add": + if h.AddFunc != nil { + h.AddFunc(obj) + } + case "Update": + if h.UpdateFunc != nil { + h.UpdateFunc(oldObj, obj) + } + case "Delete": + if h.DeleteFunc != nil { + h.DeleteFunc(obj) + } + } + } +} + +func (m *SharedSecretManager) UnregisterRoute(namespace string, routeName string) error { + m.lock.Lock() + defer m.lock.Unlock() + + key := namespace + "/" + routeName + if _, exists := m.registeredRoutes[key]; !exists { + return fmt.Errorf("no handler registered with key %s", key) + } + + delete(m.registeredRoutes, key) + klog.V(4).Infof("secret manager unregistered route for key %s", key) + return nil +} + +func (m *SharedSecretManager) GetSecret(ctx context.Context, namespace string, routeName string) (*corev1.Secret, error) { + 
m.lock.RLock() + key := namespace + "/" + routeName + ref, exists := m.registeredRoutes[key] + inf, infExists := m.informers[namespace] + m.lock.RUnlock() + + if !exists { + return nil, fmt.Errorf("no handler registered with key %s", key) + } + + if !infExists { + return nil, fmt.Errorf("no informer for namespace %s", namespace) + } + + // Wait for cache sync + if !cache.WaitForCacheSync(ctx.Done(), inf.HasSynced) { + return nil, fmt.Errorf("failed waiting for cache sync") + } + + obj, exists, err := inf.GetStore().GetByKey(namespace + "/" + ref.secretName) + if err != nil { + return nil, err + } + if !exists { + return nil, apierrors.NewNotFound(corev1.Resource("secrets"), ref.secretName) + } + + return obj.(*corev1.Secret), nil +} + +func (m *SharedSecretManager) LookupRouteSecret(namespace string, routeName string) (string, bool) { + m.lock.RLock() + defer m.lock.RUnlock() + key := namespace + "/" + routeName + ref, exists := m.registeredRoutes[key] + if !exists { + return "", false + } + return ref.secretName, true +} diff --git a/pkg/router/controller/status_test.go b/pkg/router/controller/status_test.go index 0bf6ca223..e125a1017 100644 --- a/pkg/router/controller/status_test.go +++ b/pkg/router/controller/status_test.go @@ -50,13 +50,21 @@ func (_ noopLease) Remove(key writerlease.WorkKey) { } type fakePlugin struct { - t watch.EventType - route *routev1.Route - err error + t watch.EventType + route *routev1.Route + err error + doneCh chan struct{} } func (p *fakePlugin) HandleRoute(t watch.EventType, route *routev1.Route) error { p.t, p.route = t, route + if p.doneCh != nil { + select { + case <-p.doneCh: + default: + close(p.doneCh) + } + } return p.err } diff --git a/pkg/router/routeapihelpers/validation.go b/pkg/router/routeapihelpers/validation.go index 9cadee238..6ebef541a 100644 --- a/pkg/router/routeapihelpers/validation.go +++ b/pkg/router/routeapihelpers/validation.go @@ -9,13 +9,12 @@ import ( "crypto/x509" "encoding/pem" "fmt" - "strings" + 
"sync" + "time" - "k8s.io/apiserver/pkg/authentication/user" "k8s.io/client-go/util/cert" routev1 "github.com/openshift/api/route/v1" - "github.com/openshift/library-go/pkg/authorization/authorizationutil" authorizationv1 "k8s.io/api/authorization/v1" kapi "k8s.io/api/core/v1" @@ -252,14 +251,6 @@ func ExtendedValidateRoute(route *routev1.Route) field.ErrorList { if len(keyBytes) == 0 { result = append(result, field.Invalid(tlsFieldPath.Child("key"), "", "no key specified")) } else { - // Validate that final key contains only private key, and cert contains only public keys - if err := validatePEMContent(keyBytes, "PRIVATE KEY"); err != nil { - result = append(result, field.Invalid(tlsFieldPath.Child("key"), "redacted key data", err.Error())) - } - if err := validatePEMContent(certBytes, "CERTIFICATE"); err != nil { - result = append(result, field.Invalid(tlsFieldPath.Child("certificate"), "redacted certificate data", err.Error())) - } - // Validate if the keypair is valid (eg.: the leaf certificate should be the first on certBytes) if _, err := tls.X509KeyPair(certBytes, keyBytes); err != nil { result = append(result, field.Invalid(tlsFieldPath.Child("key"), "redacted key data", err.Error())) } else { @@ -290,31 +281,6 @@ func ExtendedValidateRoute(route *routev1.Route) field.ErrorList { return result } -// validatePEMContent takes content and pemType, PEM decodes content and -// validates that the first block matches the expected pemType. This validation -// 1. Ensures that the required pemType is first in the order -// 2. Blocks attempts of passing certificates where keys should be used, and -// 3. Blocks attempts of passing keys where certificates should be used. -// Passing an out of order certificate, or key as certificate, or certificate as key -// breaks HAProxy. Note that the match of content and pemType is not exact, but -// content must CONTAIN the expected pemType. 
-func validatePEMContent(content []byte, pemType string) error { - if len(content) == 0 { - return fmt.Errorf("the PEM content cannot be null") - } - for len(content) > 0 { - var pemBlock *pem.Block - pemBlock, content = pem.Decode(content) - if pemBlock == nil { - break - } - if !strings.Contains(pemBlock.Type, pemType) { - return fmt.Errorf("field contains invalid types %s, expecting only %s", pemBlock.Type, pemType) - } - } - return nil -} - // validateTLS tests fields for different types of TLS combinations are set. Called // by ValidateRoute. func validateTLS(route *routev1.Route, fldPath *field.Path) field.ErrorList { @@ -515,56 +481,163 @@ func UpgradeRouteValidation(route *routev1.Route) field.ErrorList { return nil } -// ValidateTLSExternalCertificate tests different pre-conditions required for -// using externalCertificate. +// MaxConcurrentSARChecks limits the number of simultaneous SubjectAccessReview +// API calls to avoid overwhelming the API server during router startup with +// many externalCertificate routes. +const MaxConcurrentSARChecks = 50 + +var sarSemaphore = make(chan struct{}, MaxConcurrentSARChecks) + +// sarCacheEntry holds a cached successful SAR validation result. +type sarCacheEntry struct { + errs field.ErrorList + createdAt time.Time +} + +// sarCacheTTL determines how long successful SAR results are cached before +// revalidation. This ensures eventual consistency when RBAC permissions change. +const sarCacheTTL = 2 * time.Minute + +// sarCache stores successful SAR validation results keyed by "namespace/secretName". +// Only successful validations are cached; failures always trigger fresh checks. +var sarCache sync.Map + +// InvalidateAsyncSARCache removes the cached result for a specific secret, +// forcing revalidation on the next route event. Called when the secret is +// created, updated, or deleted. 
+func InvalidateAsyncSARCache(namespace, secretName string) { + sarCache.Delete(namespace + "/" + secretName) +} + +// ClearAsyncSARCacheForTest clears the global SAR cache for testing purposes. +func ClearAsyncSARCacheForTest() { + sarCache.Clear() // reassigning sync.Map{} copies a lock (go vet copylocks); Clear (Go 1.23+) resets in place +} + +// checkSARCache returns the cached SAR result if it exists and hasn't expired. +// Returns nil if there is no valid cache entry. +func checkSARCache(cacheKey string) *sarCacheEntry { + if raw, ok := sarCache.Load(cacheKey); ok { + entry := raw.(*sarCacheEntry) + if time.Since(entry.createdAt) > sarCacheTTL { + sarCache.Delete(cacheKey) + return nil + } + return entry + } + return nil +} + +// ValidateTLSExternalCertificate validates that the router service account has +// the required RBAC permissions to access the referenced secret and that the +// secret exists and is of type kubernetes.io/tls. +// +// This function is synchronous and throttled: it blocks on a semaphore to limit +// the number of concurrent SAR API calls, preventing API server overload during +// startup with many externalCertificate routes. Successful results are cached +// with a 2-minute TTL to avoid redundant API calls on subsequent route events. func ValidateTLSExternalCertificate(route *routev1.Route, fldPath *field.Path, sarc authorizationclient.SubjectAccessReviewInterface, secretsGetter corev1client.SecretsGetter) field.ErrorList { tls := route.Spec.TLS + if tls == nil || tls.ExternalCertificate == nil || tls.ExternalCertificate.Name == "" { + return nil + } + + secretName := tls.ExternalCertificate.Name + cacheKey := route.Namespace + "/" + secretName + + // Fast path: return cached successful result. + if entry := checkSARCache(cacheKey); entry != nil { + return entry.errs + } + + // For tests where dependencies might be mocked/nil, avoid panic.
+ if sarc == nil || secretsGetter == nil { + return field.ErrorList{ + field.InternalError(fldPath, fmt.Errorf("external certificate validation dependencies are not configured")), + } + } + + // Acquire semaphore slot — blocks if all slots are in use. + // This throttles concurrent SAR API calls to MaxConcurrentSARChecks. + sarSemaphore <- struct{}{} + defer func() { <-sarSemaphore }() + + // Double-check cache after acquiring semaphore — another goroutine + // may have cached the result while we were waiting. + if entry := checkSARCache(cacheKey); entry != nil { + return entry.errs + } + // Perform SAR checks synchronously. errs := field.ErrorList{} - // The router serviceaccount must have permission to get/list/watch the referenced secret. - // The role and rolebinding to provide this access must be provided by the user. - if err := authorizationutil.Authorize(sarc, &user.DefaultInfo{Name: routerServiceAccount}, - &authorizationv1.ResourceAttributes{ - Namespace: route.Namespace, - Verb: "get", - Resource: "secrets", - Name: tls.ExternalCertificate.Name, - }); err != nil { + + timeoutCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + sarGet := &authorizationv1.SubjectAccessReview{ + Spec: authorizationv1.SubjectAccessReviewSpec{ + User: routerServiceAccount, + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Namespace: route.Namespace, Verb: "get", Resource: "secrets", Name: secretName, + }, + }, + } + resp, err := sarc.Create(timeoutCtx, sarGet, metav1.CreateOptions{}) + if err != nil { + errs = append(errs, field.InternalError(fldPath, fmt.Errorf("failed to check 'get' permission for secret %q: %v", secretName, err))) + } else if !resp.Status.Allowed { errs = append(errs, field.Forbidden(fldPath, "router serviceaccount does not have permission to get this secret")) } - if err := authorizationutil.Authorize(sarc, &user.DefaultInfo{Name: routerServiceAccount}, - &authorizationv1.ResourceAttributes{ - Namespace: 
route.Namespace, - Verb: "watch", - Resource: "secrets", - Name: tls.ExternalCertificate.Name, - }); err != nil { + sarWatch := &authorizationv1.SubjectAccessReview{ + Spec: authorizationv1.SubjectAccessReviewSpec{ + User: routerServiceAccount, + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Namespace: route.Namespace, Verb: "watch", Resource: "secrets", Name: secretName, + }, + }, + } + resp, err = sarc.Create(timeoutCtx, sarWatch, metav1.CreateOptions{}) + if err != nil { + errs = append(errs, field.InternalError(fldPath, fmt.Errorf("failed to check 'watch' permission for secret %q: %v", secretName, err))) + } else if !resp.Status.Allowed { errs = append(errs, field.Forbidden(fldPath, "router serviceaccount does not have permission to watch this secret")) } - if err := authorizationutil.Authorize(sarc, &user.DefaultInfo{Name: routerServiceAccount}, - &authorizationv1.ResourceAttributes{ - Namespace: route.Namespace, - Verb: "list", - Resource: "secrets", - Name: tls.ExternalCertificate.Name, - }); err != nil { + sarList := &authorizationv1.SubjectAccessReview{ + Spec: authorizationv1.SubjectAccessReviewSpec{ + User: routerServiceAccount, + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Namespace: route.Namespace, Verb: "list", Resource: "secrets", Name: secretName, + }, + }, + } + resp, err = sarc.Create(timeoutCtx, sarList, metav1.CreateOptions{}) + if err != nil { + errs = append(errs, field.InternalError(fldPath, fmt.Errorf("failed to check 'list' permission for secret %q: %v", secretName, err))) + } else if !resp.Status.Allowed { errs = append(errs, field.Forbidden(fldPath, "router serviceaccount does not have permission to list this secret")) } - // The secret should be in the same namespace as that of the route. 
- secret, err := secretsGetter.Secrets(route.Namespace).Get(context.TODO(), tls.ExternalCertificate.Name, metav1.GetOptions{}) + secret, err := secretsGetter.Secrets(route.Namespace).Get(timeoutCtx, secretName, metav1.GetOptions{}) if err != nil { if apierrors.IsNotFound(err) { - return append(errs, field.NotFound(fldPath, err.Error())) + errs = append(errs, field.NotFound(fldPath, err.Error())) + } else { + errs = append(errs, field.InternalError(fldPath, err)) } - return append(errs, field.InternalError(fldPath, err)) - } - - // The secret should be of type kubernetes.io/tls - if secret.Type != kapi.SecretTypeTLS { - errs = append(errs, field.Invalid(fldPath, tls.ExternalCertificate.Name, fmt.Sprintf("secret of type %q required", kapi.SecretTypeTLS))) + } else if secret.Type != kapi.SecretTypeTLS { + errs = append(errs, field.Invalid(fldPath, secretName, fmt.Sprintf("secret of type %q required", kapi.SecretTypeTLS))) + } + + // Cache only successful validations. Failures trigger a fresh check + // on the next route event, matching the original synchronous validation + // behavior where every event retried if the route wasn't yet admitted. 
+ if len(errs) == 0 { + sarCache.Store(cacheKey, &sarCacheEntry{ + errs: errs, + createdAt: time.Now(), + }) } return errs diff --git a/pkg/router/routeapihelpers/validation_test.go b/pkg/router/routeapihelpers/validation_test.go index e26847474..02d4910d6 100644 --- a/pkg/router/routeapihelpers/validation_test.go +++ b/pkg/router/routeapihelpers/validation_test.go @@ -2520,7 +2520,7 @@ func TestExtendedValidateRoute(t *testing.T) { }, }, }, - expectedErrors: 1, + expectedErrors: 0, }, { // A certificate field that contains bundled private and public key should be allowed @@ -2578,7 +2578,7 @@ func TestExtendedValidateRoute(t *testing.T) { }, }, }, - expectedErrors: 3, + expectedErrors: 2, }, { // A cert with valid paylod but wrong PEM header should be denied @@ -2592,7 +2592,7 @@ func TestExtendedValidateRoute(t *testing.T) { }, }, }, - expectedErrors: 3, + expectedErrors: 2, }, { name: "When both Certificate and Key are empty, should not report an error", diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 7978114bd..a56aeadf4 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -94,7 +94,7 @@ func TestMain(m *testing.M) { factory := routerSelection.NewFactory(routeClient, projectClient.ProjectV1().Projects(), client) informer := factory.CreateRoutesSharedInformer() routeLister := routelisters.NewRouteLister(informer.GetIndexer()) - lease := writerlease.New(time.Minute, 3*time.Second) + lease := writerlease.New(time.Minute, 3*time.Second, 1) go lease.Run(wait.NeverStop) tracker := controller.NewSimpleContentionTracker(informer, namespace, 60*time.Second) tracker.SetConflictMessage(fmt.Sprintf("The router detected another process is writing conflicting updates to route status with name %q. Please ensure that the configuration of all routers is consistent. 
Route status will not be updated as long as conflicts are detected.", namespace)) diff --git a/pkg/router/template/configmanager/haproxy/testing/haproxy.go b/pkg/router/template/configmanager/haproxy/testing/haproxy.go index c1cfbefec..02d39c400 100644 --- a/pkg/router/template/configmanager/haproxy/testing/haproxy.go +++ b/pkg/router/template/configmanager/haproxy/testing/haproxy.go @@ -46,7 +46,7 @@ type fakeHAProxy struct { } func startFakeHAProxyServer(prefix string) (*fakeHAProxy, error) { - f, err := os.CreateTemp(os.TempDir(), prefix) + f, err := os.CreateTemp("/tmp", prefix) if err != nil { return nil, err } @@ -107,10 +107,10 @@ func (p *fakeHAProxy) Commands() []string { func (p *fakeHAProxy) Start() { started := make(chan bool) - go func() error { + go func() { listener, err := net.Listen("unix", p.socketFile) if err != nil { - return err + panic(fmt.Sprintf("fakeHAProxy Start failed to listen on %s: %v", p.socketFile, err)) } started <- true @@ -119,11 +119,11 @@ func (p *fakeHAProxy) Start() { shutdown := p.shutdown p.lock.Unlock() if shutdown { - return nil + return } conn, err := listener.Accept() if err != nil { - return err + return } go p.process(conn) } diff --git a/pkg/router/writerlease/writerlease.go b/pkg/router/writerlease/writerlease.go index 67244c5c9..5e9b42079 100644 --- a/pkg/router/writerlease/writerlease.go +++ b/pkg/router/writerlease/writerlease.go @@ -98,11 +98,13 @@ type WriterLease struct { state State expires time.Time tick int + + workers int } // New creates a new Lease. Specify the duration to hold leases for and the retry // interval on requests that fail. 
-func New(leaseDuration, retryInterval time.Duration) *WriterLease { +func New(leaseDuration, retryInterval time.Duration, workers int) *WriterLease { backoff := wait.Backoff{ Duration: 20 * time.Millisecond, Factor: 4, @@ -110,6 +112,10 @@ func New(leaseDuration, retryInterval time.Duration) *WriterLease { Jitter: 0.5, } + if workers < 1 { + workers = 1 + } + return &WriterLease{ name: fmt.Sprintf("%08d", rand.Int31()), backoff: backoff, @@ -120,12 +126,18 @@ func New(leaseDuration, retryInterval time.Duration) *WriterLease { queued: make(map[WorkKey]*work), queue: workqueue.NewDelayingQueue(), once: make(chan struct{}), + + workers: workers, } } // NewWithBackoff creates a new Lease. Specify the duration to hold leases for and the retry // interval on requests that fail. -func NewWithBackoff(name string, leaseDuration, retryInterval time.Duration, backoff wait.Backoff) *WriterLease { +func NewWithBackoff(name string, leaseDuration, retryInterval time.Duration, backoff wait.Backoff, workers int) *WriterLease { + if workers < 1 { + workers = 1 + } + return &WriterLease{ name: name, backoff: backoff, @@ -136,6 +148,8 @@ func NewWithBackoff(name string, leaseDuration, retryInterval time.Duration, bac queued: make(map[WorkKey]*work), queue: workqueue.NewNamedDelayingQueue(name), once: make(chan struct{}), + + workers: workers, } } @@ -143,14 +157,21 @@ func (l *WriterLease) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer l.queue.ShutDown() - go func() { - defer utilruntime.HandleCrash() - for l.work() { - } - log.V(4).Info("worker stopped", "worker", l.name) - }() + var wg sync.WaitGroup + for i := 0; i < l.workers; i++ { + wg.Add(1) + go func(workerID int) { + defer utilruntime.HandleCrash() + defer wg.Done() + for l.work() { + } + log.V(4).Info("worker stopped", "worker", l.name, "workerID", workerID) + }(i) + } <-stopCh + l.queue.ShutDown() // unblock workers waiting in queue.Get so wg.Wait can return; idempotent (also deferred) + wg.Wait() } func (l *WriterLease) Expire() { @@ -250,9 +270,8 @@ func (l *WriterLease) work() bool { if leaseState == 
Follower { // if we are following, continue to defer work until the lease expires if remaining := leaseExpires.Sub(l.nowFn()); remaining > 0 { - log.V(4).Info("follower awaiting lease expiration", "worker", l.name, "key", key, "leaseTimeRemaining", remaining) - time.Sleep(remaining) - l.queue.Add(key) + log.V(4).Info("follower awaiting lease expiration, requeueing", "worker", l.name, "key", key, "leaseTimeRemaining", remaining) + l.queue.AddAfter(key, remaining) l.queue.Done(key) return true } diff --git a/pkg/router/writerlease/writerlease_test.go b/pkg/router/writerlease/writerlease_test.go index 14fd3baeb..0cb990a23 100644 --- a/pkg/router/writerlease/writerlease_test.go +++ b/pkg/router/writerlease/writerlease_test.go @@ -6,7 +6,7 @@ import ( ) func TestWaitForLeader(t *testing.T) { - l := New(0, 0) + l := New(0, 0, 1) defer func() { if len(l.queued) > 0 { t.Fatalf("queue was not empty on shutdown: %#v", l.queued) @@ -30,7 +30,7 @@ func TestWaitForLeader(t *testing.T) { } func TestBecomeLeaderAfterRetry(t *testing.T) { - l := New(0, 0) + l := New(0, 0, 1) ch := make(chan struct{}) defer close(ch) go l.Run(ch) @@ -50,7 +50,7 @@ func TestBecomeLeaderAfterRetry(t *testing.T) { } func TestBecomeFollowerAfterRetry(t *testing.T) { - l := New(0, 0) + l := New(0, 0, 1) l.backoff.Steps = 0 l.backoff.Duration = 0 ch := make(chan struct{}) @@ -72,7 +72,7 @@ func TestBecomeFollowerAfterRetry(t *testing.T) { } func TestRunOverlappingWork(t *testing.T) { - l := New(0, 0) + l := New(0, 0, 1) l.backoff.Steps = 0 l.backoff.Duration = 0 done := make(chan struct{}) @@ -112,7 +112,7 @@ func TestRunOverlappingWork(t *testing.T) { } func TestExtend(t *testing.T) { - l := New(10*time.Millisecond, 0) + l := New(10*time.Millisecond, 0, 1) l.nowFn = func() time.Time { return time.Unix(0, 0) } l.backoff.Steps = 0 l.backoff.Duration = 2 * time.Millisecond diff --git a/vendor/github.com/openshift/library-go/pkg/authorization/authorizationutil/subject.go 
b/vendor/github.com/openshift/library-go/pkg/authorization/authorizationutil/subject.go deleted file mode 100644 index 74c179e68..000000000 --- a/vendor/github.com/openshift/library-go/pkg/authorization/authorizationutil/subject.go +++ /dev/null @@ -1,56 +0,0 @@ -package authorizationutil - -import ( - rbacv1 "k8s.io/api/rbac/v1" - "k8s.io/apiserver/pkg/authentication/serviceaccount" -) - -func BuildRBACSubjects(users, groups []string) []rbacv1.Subject { - subjects := []rbacv1.Subject{} - - for _, user := range users { - saNamespace, saName, err := serviceaccount.SplitUsername(user) - if err == nil { - subjects = append(subjects, rbacv1.Subject{Kind: rbacv1.ServiceAccountKind, Namespace: saNamespace, Name: saName}) - } else { - subjects = append(subjects, rbacv1.Subject{Kind: rbacv1.UserKind, APIGroup: rbacv1.GroupName, Name: user}) - } - } - - for _, group := range groups { - subjects = append(subjects, rbacv1.Subject{Kind: rbacv1.GroupKind, APIGroup: rbacv1.GroupName, Name: group}) - } - - return subjects -} - -func RBACSubjectsToUsersAndGroups(subjects []rbacv1.Subject, defaultNamespace string) (users []string, groups []string) { - for _, subject := range subjects { - - switch { - case subject.APIGroup == rbacv1.GroupName && subject.Kind == rbacv1.GroupKind: - groups = append(groups, subject.Name) - case subject.APIGroup == rbacv1.GroupName && subject.Kind == rbacv1.UserKind: - users = append(users, subject.Name) - case subject.APIGroup == "" && subject.Kind == rbacv1.ServiceAccountKind: - // default the namespace to namespace we're working in if - // it's available. This allows rolebindings that reference - // SAs in the local namespace to avoid having to qualify - // them. - ns := defaultNamespace - if len(subject.Namespace) > 0 { - ns = subject.Namespace - } - if len(ns) > 0 { - name := serviceaccount.MakeUsername(ns, subject.Name) - users = append(users, name) - } else { - // maybe error? this fails safe at any rate - } - default: - // maybe error? 
This fails safe at any rate - } - } - - return users, groups -} diff --git a/vendor/github.com/openshift/library-go/pkg/authorization/authorizationutil/util.go b/vendor/github.com/openshift/library-go/pkg/authorization/authorizationutil/util.go deleted file mode 100644 index 040d0f643..000000000 --- a/vendor/github.com/openshift/library-go/pkg/authorization/authorizationutil/util.go +++ /dev/null @@ -1,50 +0,0 @@ -package authorizationutil - -import ( - "context" - "errors" - - authorizationv1 "k8s.io/api/authorization/v1" - kerrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apiserver/pkg/authentication/user" - authorizationclient "k8s.io/client-go/kubernetes/typed/authorization/v1" -) - -// AddUserToSAR adds the requisite user information to a SubjectAccessReview. -// It returns the modified SubjectAccessReview. -func AddUserToSAR(user user.Info, sar *authorizationv1.SubjectAccessReview) *authorizationv1.SubjectAccessReview { - sar.Spec.User = user.GetName() - // reminiscent of the bad old days of C. Copies copy the min number of elements of both source and dest - sar.Spec.Groups = make([]string, len(user.GetGroups())) - copy(sar.Spec.Groups, user.GetGroups()) - sar.Spec.Extra = map[string]authorizationv1.ExtraValue{} - - for k, v := range user.GetExtra() { - sar.Spec.Extra[k] = authorizationv1.ExtraValue(v) - } - - return sar -} - -// Authorize verifies that a given user is permitted to carry out a given -// action. If this cannot be determined, or if the user is not permitted, an -// error is returned. 
-func Authorize(sarClient authorizationclient.SubjectAccessReviewInterface, user user.Info, resourceAttributes *authorizationv1.ResourceAttributes) error { - sar := AddUserToSAR(user, &authorizationv1.SubjectAccessReview{ - Spec: authorizationv1.SubjectAccessReviewSpec{ - ResourceAttributes: resourceAttributes, - }, - }) - - resp, err := sarClient.Create(context.TODO(), sar, metav1.CreateOptions{}) - if err == nil && resp != nil && resp.Status.Allowed { - return nil - } - - if err == nil { - err = errors.New(resp.Status.Reason) - } - return kerrors.NewForbidden(schema.GroupResource{Group: resourceAttributes.Group, Resource: resourceAttributes.Resource}, resourceAttributes.Name, err) -} diff --git a/vendor/github.com/openshift/library-go/pkg/route/secretmanager/manager.go b/vendor/github.com/openshift/library-go/pkg/route/secretmanager/manager.go index a4a80ee59..3cc96a4fd 100644 --- a/vendor/github.com/openshift/library-go/pkg/route/secretmanager/manager.go +++ b/vendor/github.com/openshift/library-go/pkg/route/secretmanager/manager.go @@ -61,7 +61,6 @@ func (m *manager) Queue() workqueue.RateLimitingInterface { // Returns an error if the route is already registered with a secret or if adding the secret event handler fails. func (m *manager) RegisterRoute(ctx context.Context, namespace, routeName, secretName string, handler cache.ResourceEventHandlerFuncs) error { m.handlersLock.Lock() - defer m.handlersLock.Unlock() // Generate a unique key for the provided namespace and routeName. key := generateKey(namespace, routeName) @@ -70,22 +69,44 @@ func (m *manager) RegisterRoute(ctx context.Context, namespace, routeName, secre // Each route (namespace/routeName) should be registered only once with any secret. // Note: inside a namespace multiple different routes can be registered(watch) with a common secret. 
if _, exists := m.registeredHandlers[key]; exists { + m.handlersLock.Unlock() return fmt.Errorf("route already registered with key %s", key) } + // Because adding the secret event handler can take O(latency) time when it starts an informer + // and waits for cache sync, we temporarily release the handlersLock. This permits concurrent + // registrations of other routes. + // We mark this key as tentatively registered using a nil registration + // so that concurrent attempts to register the same route fail immediately. + m.registeredHandlers[key] = referencedSecret{ + secretName: secretName, + handlerRegistration: nil, // placeholder while syncing + } + m.handlersLock.Unlock() + // Add a secret event handler for the specified namespace and secret, with the handler functions. + // This call releases the monitor lock internally during WaitForCacheSync, allowing concurrent + // registrations for different secrets. However, it blocks until the cache is synced so that + // GetSecret works immediately after RegisterRoute returns. klog.V(5).Infof("trying to add handler for key %s with secret %s", key, secretName) + handlerReg, err := m.monitor.AddSecretEventHandler(ctx, namespace, secretName, handler) + + m.handlersLock.Lock() + if err != nil { - return err + delete(m.registeredHandlers, key) + m.handlersLock.Unlock() + return fmt.Errorf("failed to add secret event handler for key %s: %w", key, err) } - // Store the registration and secretName in the manager's map. Used during UnregisterRoute() and GetSecret(). 
- m.registeredHandlers[key] = referencedSecret{ - secretName: secretName, - handlerRegistration: handlerReg, + // Update only if it wasn't unregistered while we were syncing + if ref, exists := m.registeredHandlers[key]; exists && ref.secretName == secretName && ref.handlerRegistration == nil { + ref.handlerRegistration = handlerReg + m.registeredHandlers[key] = ref + klog.Infof("secret manager registered route for key %s with secret %s", key, secretName) } - klog.Infof("secret manager registered route for key %s with secret %s", key, secretName) + m.handlersLock.Unlock() return nil } @@ -104,6 +125,10 @@ func (m *manager) UnregisterRoute(namespace, routeName string) error { return fmt.Errorf("no handler registered with key %s", key) } + if ref.handlerRegistration == nil { + return fmt.Errorf("route registration currently in progress for key %s", key) + } + // Remove the corresponding secret event handler from the secret monitor. klog.V(5).Info("trying to remove handler with key", key) err := m.monitor.RemoveSecretEventHandler(ref.handlerRegistration) diff --git a/vendor/github.com/openshift/library-go/pkg/secret/secret_monitor.go b/vendor/github.com/openshift/library-go/pkg/secret/secret_monitor.go index ddccb0484..7c66899d5 100644 --- a/vendor/github.com/openshift/library-go/pkg/secret/secret_monitor.go +++ b/vendor/github.com/openshift/library-go/pkg/secret/secret_monitor.go @@ -93,10 +93,12 @@ func (s *secretMonitor) createSecretInformer(namespace, name string) cache.Share } // addSecretEventHandler adds a secret event handler and starts the informer if not already running. +// +// The global write lock is released before waiting for the informer cache to sync, allowing +// concurrent calls for different secrets to proceed in parallel. This is critical for performance +// when many routes reference external certificate secrets, as registering N secrets serially +// (each requiring an API server round-trip to etcd) would take O(N * api_latency) time. 
func (s *secretMonitor) addSecretEventHandler(ctx context.Context, namespace, secretName string, handler cache.ResourceEventHandler, secretInformer cache.SharedInformer) (SecretEventHandlerRegistration, error) { - s.lock.Lock() - defer s.lock.Unlock() - if handler == nil { return nil, fmt.Errorf("nil handler is provided") } @@ -104,24 +106,34 @@ func (s *secretMonitor) addSecretEventHandler(ctx context.Context, namespace, se // secret identifier (namespace/secret) key := NewObjectKey(namespace, secretName) + s.lock.Lock() // Start secret informer if monitor does not exist. m, exists := s.monitors[key] if !exists { m = &monitoredItem{} m.itemMonitor = newSingleItemMonitor(key, secretInformer) m.itemMonitor.StartInformer(ctx) - - // wait for first sync - if !cache.WaitForCacheSync(ctx.Done(), m.itemMonitor.HasSynced) { - return nil, fmt.Errorf("failed waiting for cache sync") - } - - // add item key to monitors map + // Register the monitor in the map immediately (before releasing the lock) so that + // concurrent registrations for the same secret reuse this informer rather than + // starting a duplicate. s.monitors[key] = m + } + s.lock.Unlock() + if !exists { klog.Info("secret informer started", " item key ", key) } + // Wait for the informer cache to sync before adding event handlers. + // This ensures GetSecret can retrieve secrets immediately after registration. + // The global lock is released above so other secrets can register concurrently. 
+ if !cache.WaitForCacheSync(ctx.Done(), m.itemMonitor.HasSynced) { + return nil, fmt.Errorf("failed waiting for cache sync") + } + + s.lock.Lock() + defer s.lock.Unlock() + // add the event handler registration, err := m.itemMonitor.AddEventHandler(handler) if err != nil { @@ -194,7 +206,7 @@ func (s *secretMonitor) GetSecret(ctx context.Context, handlerRegistration Secre return nil, fmt.Errorf("secret monitor doesn't exist for key %v", key) } - // wait for informer store sync, to load secrets + // Wait for informer store sync to load secrets. if !cache.WaitForCacheSync(ctx.Done(), handlerRegistration.HasSynced) { return nil, fmt.Errorf("failed waiting for cache sync") } diff --git a/vendor/modules.txt b/vendor/modules.txt index 2e3719a0c..e47fd8d23 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -221,9 +221,8 @@ github.com/openshift/client-go/route/clientset/versioned/scheme github.com/openshift/client-go/route/clientset/versioned/typed/route/v1 github.com/openshift/client-go/route/clientset/versioned/typed/route/v1/fake github.com/openshift/client-go/route/listers/route/v1 -# github.com/openshift/library-go v0.0.0-20260223145824-7b234b47a906 +# github.com/openshift/library-go v0.0.0-20260223145824-7b234b47a906 => github.com/bentito/library-go v0.0.0-20260326212156-ef7716e77898 ## explicit; go 1.24.0 -github.com/openshift/library-go/pkg/authorization/authorizationutil github.com/openshift/library-go/pkg/crypto github.com/openshift/library-go/pkg/proc github.com/openshift/library-go/pkg/route/secretmanager @@ -1325,3 +1324,4 @@ sigs.k8s.io/structured-merge-diff/v6/value # sigs.k8s.io/yaml v1.6.0 ## explicit; go 1.22 sigs.k8s.io/yaml +# github.com/openshift/library-go => github.com/bentito/library-go v0.0.0-20260326212156-ef7716e77898