From ae97a6a7a0d031abedef19e4ea10e64cbf08de51 Mon Sep 17 00:00:00 2001
From: chestack
Date: Thu, 16 Jun 2022 16:11:25 +0800
Subject: [PATCH] Enable 'direct-pod-ip' mode for LB svc

1. Add a new flag 'direct-pod-ip' (default: true) to control whether the
   service controller creates LB services in directPodIP mode or nodePort mode.
2. Add a goroutine to sync endpoints changes of services and update the
   service's LB when endpoints change (pods not ready or replicas scaled).
3. Sync endpoints changes in directPodIP mode; otherwise sync node changes
   in nodePort mode.

Closes-Task: #EAS-105343
---
 app/core.go                         |   2 +
 controllers/service/config/types.go |   5 ++
 controllers/service/controller.go   | 120 +++++++++++++++++++++++++++
 options/servicecontroller.go        |   2 +
 4 files changed, 126 insertions(+), 3 deletions(-)

diff --git a/app/core.go b/app/core.go
index d02abb05..fe8b5014 100644
--- a/app/core.go
+++ b/app/core.go
@@ -85,8 +85,10 @@ func startServiceController(ctx context.Context, initContext ControllerInitConte
 		completedConfig.ClientBuilder.ClientOrDie(initContext.ClientName),
 		completedConfig.SharedInformers.Core().V1().Services(),
 		completedConfig.SharedInformers.Core().V1().Nodes(),
+		completedConfig.SharedInformers.Core().V1().Endpoints(),
 		completedConfig.ComponentConfig.KubeCloudShared.ClusterName,
 		utilfeature.DefaultFeatureGate,
+		completedConfig.ComponentConfig.ServiceController.DirectPodIP,
 	)
 	if err != nil {
 		// This error shouldn't fail. It lives like this as a legacy.
diff --git a/controllers/service/config/types.go b/controllers/service/config/types.go
index 3afca163..1c6a556a 100644
--- a/controllers/service/config/types.go
+++ b/controllers/service/config/types.go
@@ -22,4 +22,9 @@ type ServiceControllerConfiguration struct {
 	// allowed to sync concurrently. Larger number = more responsive service
 	// management, but more CPU (and network) load.
 	ConcurrentServiceSyncs int32
+
+	// DirectPodIP selects the mode of load balancer services.
+	// If set to true, the load balancer will use pod IP addresses
+	// as pool members instead of node addresses.
+	DirectPodIP bool
 }
diff --git a/controllers/service/controller.go b/controllers/service/controller.go
index ae717e6b..063b1848 100644
--- a/controllers/service/controller.go
+++ b/controllers/service/controller.go
@@ -90,6 +90,11 @@ type Controller struct {
 	// services that need to be synced
 	queue workqueue.RateLimitingInterface
 
+	endpointLister       corelisters.EndpointsLister
+	endpointListerSynced cache.InformerSynced
+	// endpoints that need to be synced
+	endpointsQueue workqueue.RateLimitingInterface
+
 	// nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time
 	// and protects internal states (needFullSync) of nodeSync
 	nodeSyncLock sync.Mutex
@@ -97,6 +102,9 @@
 	nodeSyncCh chan interface{}
 	// needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
 	needFullSync bool
+
+	// directPodIP indicates whether the service controller creates LB services in directPodIP mode or nodePort mode
+	directPodIP bool
 }
 
 // New returns a new service controller to keep cloud provider service resources
@@ -106,8 +114,10 @@ func New(
 	kubeClient clientset.Interface,
 	serviceInformer coreinformers.ServiceInformer,
 	nodeInformer coreinformers.NodeInformer,
+	endpointInformer coreinformers.EndpointsInformer,
 	clusterName string,
 	featureGate featuregate.FeatureGate,
+	directPodIP bool,
 ) (*Controller, error) {
 	broadcaster := record.NewBroadcaster()
 	broadcaster.StartStructuredLogging(0)
@@ -134,6 +144,10 @@ func New(
 		queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
 		nodeSyncCh: make(chan interface{}, 1),
+		endpointLister:       endpointInformer.Lister(),
+		endpointListerSynced: endpointInformer.Informer().HasSynced,
+		endpointsQueue:       workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "endpoint"),
+		directPodIP:          directPodIP,
 	}
 
 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -161,6 +175,10 @@ func New(
 	s.serviceLister = serviceInformer.Lister()
 	s.serviceListerSynced = serviceInformer.Informer().HasSynced
 
+	endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: s.updateEndpoint,
+	})
+
 	nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: func(cur interface{}) {
@@ -189,7 +207,6 @@ func New(
 		},
 		time.Duration(0),
 	)
-
 	if err := s.init(); err != nil {
 		return nil, err
 	}
@@ -229,6 +246,7 @@ func (s *Controller) enqueueService(obj interface{}) {
 func (s *Controller) Run(ctx context.Context, workers int) {
 	defer runtime.HandleCrash()
 	defer s.queue.ShutDown()
+	defer s.endpointsQueue.ShutDown()
 
 	klog.Info("Starting service controller")
 	defer klog.Info("Shutting down service controller")
@@ -239,10 +257,16 @@ func (s *Controller) Run(ctx context.Context, workers int) {
 
 	for i := 0; i < workers; i++ {
 		go wait.UntilWithContext(ctx, s.worker, time.Second)
+		if s.directPodIP {
+			go wait.UntilWithContext(ctx, s.endpointWorker, time.Second)
+		}
 	}
 
-	go s.nodeSyncLoop(ctx, workers)
-	go wait.Until(s.triggerNodeSync, nodeSyncPeriod, ctx.Done())
+	// sync nodes only if LB services are in nodePort mode
+	if !s.directPodIP {
+		go s.nodeSyncLoop(ctx, workers)
+		go wait.Until(s.triggerNodeSync, nodeSyncPeriod, ctx.Done())
+	}
 
 	<-ctx.Done()
 }
@@ -284,6 +308,50 @@ func (s *Controller) worker(ctx context.Context) {
 	}
 }
 
+// endpointWorker runs a worker thread that just dequeues items, processes them, and marks them done.
+// It enforces that the syncHandler is never invoked concurrently with the same key.
+func (s *Controller) endpointWorker(ctx context.Context) {
+	for s.processNextEndpointWorkItem(ctx) {
+	}
+}
+
+func (s *Controller) processNextEndpointWorkItem(ctx context.Context) bool {
+	key, quit := s.endpointsQueue.Get()
+	if quit {
+		return false
+	}
+	defer s.endpointsQueue.Done(key)
+
+	err := s.updateService(ctx, key.(string))
+	if err == nil {
+		s.endpointsQueue.Forget(key)
+		return true
+	}
+
+	runtime.HandleError(fmt.Errorf("error updating load balancer for service %v (will retry): %v", key, err))
+	s.endpointsQueue.AddRateLimited(key)
+	return true
+}
+
+// updateService calls UpdateLoadBalancer to refresh the pool members of the service's load balancer
+func (s *Controller) updateService(ctx context.Context, key string) error {
+
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+	service, err := s.serviceLister.Services(namespace).Get(name)
+	if err != nil {
+		klog.Errorf("Failed to get service %s with error %v", name, err)
+		return err
+	}
+	klog.Infof("Updating service %s/%s in direct-pod-ip mode, triggered by an endpoints change", namespace, name)
+	var emptyNodes []*v1.Node
+	err = s.balancer.UpdateLoadBalancer(ctx, s.clusterName, service, emptyNodes)
+
+	return err
+}
+
 // nodeSyncLoop takes nodeSync signal and triggers nodeSync
 func (s *Controller) nodeSyncLoop(ctx context.Context, workers int) {
 	klog.V(4).Info("nodeSyncLoop Started")
@@ -604,6 +672,47 @@ func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service)
 	return false
 }
 
+// updateEndpoint enqueues a LoadBalancer service for an update when its endpoints change in direct-pod-ip mode
+func (s *Controller) updateEndpoint(old, new interface{}) {
+	oldep := old.(*v1.Endpoints)
+	newep := new.(*v1.Endpoints)
+
+	// the Endpoints object has the same name/namespace as its Service
+	name := newep.Name
+	namespace := newep.Namespace
+
+	service, err := s.serviceLister.Services(namespace).Get(name)
+
+	if errors.IsNotFound(err) {
+		// some Endpoints objects have no backing Service and are only used for leader election, such as endpoints in the kubevirt namespace;
+		// their resourceVersion keeps increasing and they keep updating, so no service update is needed
+		klog.V(4).Infof("Failed to get service %s with error %v, no service update needed", name, err)
+		return
+	} else if err != nil {
+		klog.Errorf("Failed to get service %s with error %v", name, err)
+		return
+	}
+
+	if service.Spec.Type == v1.ServiceTypeLoadBalancer {
+		if s.directPodIP {
+			if !reflect.DeepEqual(oldep.Subsets, newep.Subsets) {
+				klog.Infof("Endpoints %s changed, updating the service in direct-pod-ip mode", name)
+				s.enqueueEndpoint(new)
+			}
+		}
+	}
+}
+
+// obj could be an *v1.Endpoints, or a DeletionFinalStateUnknown marker item.
+func (s *Controller) enqueueEndpoint(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
+		return
+	}
+	s.endpointsQueue.Add(key)
+}
+
 func getPortsForLB(service *v1.Service) []*v1.ServicePort {
 	ports := []*v1.ServicePort{}
 	for i := range service.Spec.Ports {
@@ -770,6 +879,11 @@ func (s *Controller) nodeSyncService(svc *v1.Service) bool {
 	if svc == nil || !wantsLoadBalancer(svc) {
 		return false
 	}
+	if s.directPodIP {
+		klog.Infof("service %s/%s is in direct-pod-ip mode, skipping node sync", svc.Namespace, svc.Name)
+		return false
+	}
+
 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
 	hosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
 	if err != nil {
diff --git a/options/servicecontroller.go b/options/servicecontroller.go
index c63e43a9..abb01428 100644
--- a/options/servicecontroller.go
+++ b/options/servicecontroller.go
@@ -33,6 +33,7 @@ func (o *ServiceControllerOptions) AddFlags(fs *pflag.FlagSet) {
 	}
 
 	fs.Int32Var(&o.ConcurrentServiceSyncs, "concurrent-service-syncs", o.ConcurrentServiceSyncs, "The number of services that are allowed to sync concurrently. Larger number = more responsive service management, but more CPU (and network) load")
+	fs.BoolVar(&o.DirectPodIP, "direct-pod-ip", true, "If set to true, the load balancer will use pod IP addresses as pool members instead of node addresses.")
 }
 
 // ApplyTo fills up ServiceController config with options.
@@ -42,6 +43,7 @@ func (o *ServiceControllerOptions) ApplyTo(cfg *serviceconfig.ServiceControllerC
 	}
 
 	cfg.ConcurrentServiceSyncs = o.ConcurrentServiceSyncs
+	cfg.DirectPodIP = o.DirectPodIP
 
 	return nil
 }
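Note (not part of the patch): in direct-pod-ip mode the controller calls UpdateLoadBalancer with an empty node list, so the cloud provider implementation is expected to resolve pool members from the Service's Endpoints on its own. The sketch below is a minimal, hypothetical illustration of that step; the helper name readyPodAddresses and the hard-coded sample Endpoints object are assumptions for illustration, not code from this patch.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// readyPodAddresses collects the ready pod IP:port pairs recorded in an
// Endpoints object; in direct-pod-ip mode these would become the load
// balancer pool members instead of node addresses.
func readyPodAddresses(ep *v1.Endpoints) []string {
	var members []string
	for _, subset := range ep.Subsets {
		for _, addr := range subset.Addresses { // ready addresses only; NotReadyAddresses are skipped
			for _, port := range subset.Ports {
				members = append(members, fmt.Sprintf("%s:%d", addr.IP, port.Port))
			}
		}
	}
	return members
}

func main() {
	// Sample Endpoints object standing in for what the endpoints informer delivers.
	ep := &v1.Endpoints{
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{{IP: "10.0.0.5"}, {IP: "10.0.0.6"}},
			Ports:     []v1.EndpointPort{{Name: "http", Port: 8080}},
		}},
	}
	fmt.Println(readyPodAddresses(ep)) // [10.0.0.5:8080 10.0.0.6:8080]
}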