diff --git a/app/core.go b/app/core.go index d02abb05..fe8b5014 100644 --- a/app/core.go +++ b/app/core.go @@ -85,8 +85,10 @@ func startServiceController(ctx context.Context, initContext ControllerInitConte completedConfig.ClientBuilder.ClientOrDie(initContext.ClientName), completedConfig.SharedInformers.Core().V1().Services(), completedConfig.SharedInformers.Core().V1().Nodes(), + completedConfig.SharedInformers.Core().V1().Endpoints(), completedConfig.ComponentConfig.KubeCloudShared.ClusterName, utilfeature.DefaultFeatureGate, + completedConfig.ComponentConfig.ServiceController.DirectPodIP, ) if err != nil { // This error shouldn't fail. It lives like this as a legacy. diff --git a/controllers/service/config/types.go b/controllers/service/config/types.go index 3afca163..1c6a556a 100644 --- a/controllers/service/config/types.go +++ b/controllers/service/config/types.go @@ -22,4 +22,9 @@ type ServiceControllerConfiguration struct { // allowed to sync concurrently. Larger number = more responsive service // management, but more CPU (and network) load. ConcurrentServiceSyncs int32 + + // DirectPodIP selects the mode of the load balancer service. + // If true, the load balancer will use pod IP addresses + // as pool members instead of node addresses.
+ DirectPodIP bool } diff --git a/controllers/service/controller.go b/controllers/service/controller.go index ae717e6b..063b1848 100644 --- a/controllers/service/controller.go +++ b/controllers/service/controller.go @@ -90,6 +90,11 @@ type Controller struct { // services that need to be synced queue workqueue.RateLimitingInterface + endpointLister corelisters.EndpointsLister + endpointListerSynced cache.InformerSynced + // endpoints that need to be synced + endpointsQueue workqueue.RateLimitingInterface + // nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time // and protects internal states (needFullSync) of nodeSync nodeSyncLock sync.Mutex @@ -97,6 +102,9 @@ type Controller struct { nodeSyncCh chan interface{} // needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services. needFullSync bool + + // if service controller will create lb svc in directPodIP mode or nodePort mode + directPodIP bool } // New returns a new service controller to keep cloud provider service resources @@ -106,8 +114,10 @@ func New( kubeClient clientset.Interface, serviceInformer coreinformers.ServiceInformer, nodeInformer coreinformers.NodeInformer, + endpointInformer coreinformers.EndpointsInformer, clusterName string, featureGate featuregate.FeatureGate, + directPodIP bool, ) (*Controller, error) { broadcaster := record.NewBroadcaster() broadcaster.StartStructuredLogging(0) @@ -134,6 +144,10 @@ func New( queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), // nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached. 
nodeSyncCh: make(chan interface{}, 1), + endpointLister: endpointInformer.Lister(), + endpointListerSynced: endpointInformer.Informer().HasSynced, + endpointsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "endpoint"), + directPodIP: directPodIP, } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -161,6 +175,10 @@ func New( s.serviceLister = serviceInformer.Lister() s.serviceListerSynced = serviceInformer.Informer().HasSynced + endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + UpdateFunc: s.updateEndpoint, + }) + nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(cur interface{}) { @@ -189,7 +207,6 @@ func New( }, time.Duration(0), ) - if err := s.init(); err != nil { return nil, err } @@ -229,6 +246,7 @@ func (s *Controller) enqueueService(obj interface{}) { func (s *Controller) Run(ctx context.Context, workers int) { defer runtime.HandleCrash() defer s.queue.ShutDown() + defer s.endpointsQueue.ShutDown() klog.Info("Starting service controller") defer klog.Info("Shutting down service controller") @@ -239,10 +257,16 @@ func (s *Controller) Run(ctx context.Context, workers int) { for i := 0; i < workers; i++ { go wait.UntilWithContext(ctx, s.worker, time.Second) + if s.directPodIP { + go wait.UntilWithContext(ctx, s.endpointWorker, time.Second) + } } - go s.nodeSyncLoop(ctx, workers) - go wait.Until(s.triggerNodeSync, nodeSyncPeriod, ctx.Done()) + // sync node only if LB svc in nodePort mode + if !s.directPodIP { + go s.nodeSyncLoop(ctx, workers) + go wait.Until(s.triggerNodeSync, nodeSyncPeriod, ctx.Done()) + } <-ctx.Done() } @@ -284,6 +308,50 @@ func (s *Controller) worker(ctx context.Context) { } } +// endpointWorker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. 
+func (s *Controller) endpointWorker(ctx context.Context) { + for s.processNextEndpointWorkItem(ctx) { + } +} + +func (s *Controller) processNextEndpointWorkItem(ctx context.Context) bool { + key, quit := s.endpointsQueue.Get() + if quit { + return false + } + defer s.endpointsQueue.Done(key) + + err := s.updateService(ctx, key.(string)) + if err == nil { + s.endpointsQueue.Forget(key) + return true + } + + runtime.HandleError(fmt.Errorf("error updating endpoints for service %v (will retry): %v", key, err)) + s.endpointsQueue.AddRateLimited(key) + return true +} + +// updateService calls UpdateLoadBalancer to update pool members. +func (s *Controller) updateService(ctx context.Context, key string) error { + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + service, err := s.serviceLister.Services(namespace).Get(name) + if err != nil { + klog.Errorf("Failed to get service %s with error %v", name, err) + return err + } + klog.Infof("Update service %s/%s which is in direct-pod-ip mode, triggered by endpoints change", namespace, name) + var emptyNodes []*v1.Node + err = s.balancer.UpdateLoadBalancer(ctx, s.clusterName, service, emptyNodes) + + return err +} + // nodeSyncLoop takes nodeSync signal and triggers nodeSync func (s *Controller) nodeSyncLoop(ctx context.Context, workers int) { klog.V(4).Info("nodeSyncLoop Started") @@ -604,6 +672,47 @@ func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service) return false } +// updateEndpoint enqueues the owning load balancer service for re-sync when its endpoints change in direct-pod-ip mode. +func (s *Controller) updateEndpoint(old, new interface{}) { + oldep := old.(*v1.Endpoints) + newep := new.(*v1.Endpoints) + + // service of the endpoint + name := newep.Name + namespace := newep.Namespace + + service, err := s.serviceLister.Services(namespace).Get(name) + + if errors.IsNotFound(err) { + // for some endpoints without a service that are used for leader election, such as endpoints in kubevirt namespace + // the resourceVersion keeps increasing and endpoints 
keep updating + klog.V(4).Infof("Failed to get service %s with error %v, do not need update service", name, err) + return + } else if err != nil { + klog.Errorf("Failed to get service %s with error %v", name, err) + return + } + + if service.Spec.Type == v1.ServiceTypeLoadBalancer { + if s.directPodIP { + if !reflect.DeepEqual(oldep.Subsets, newep.Subsets) { + klog.Infof("Endpoint %s changed, update the service in direct-pod-ip mode", name) + s.enqueueEndpoint(new) + } + } + } +} + +// obj could be an *v1.Endpoints, or a DeletionFinalStateUnknown marker item. +func (s *Controller) enqueueEndpoint(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err)) + return + } + s.endpointsQueue.Add(key) +} + func getPortsForLB(service *v1.Service) []*v1.ServicePort { ports := []*v1.ServicePort{} for i := range service.Spec.Ports { @@ -770,6 +879,11 @@ func (s *Controller) nodeSyncService(svc *v1.Service) bool { if svc == nil || !wantsLoadBalancer(svc) { return false } + if s.directPodIP { + klog.Infof("service %s/%s is direct-pod-ip mode, skip sync nodes", svc.Namespace, svc.Name) + return false + } + klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name) hosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate()) if err != nil { diff --git a/options/servicecontroller.go b/options/servicecontroller.go index c63e43a9..abb01428 100644 --- a/options/servicecontroller.go +++ b/options/servicecontroller.go @@ -33,6 +33,7 @@ func (o *ServiceControllerOptions) AddFlags(fs *pflag.FlagSet) { } fs.Int32Var(&o.ConcurrentServiceSyncs, "concurrent-service-syncs", o.ConcurrentServiceSyncs, "The number of services that are allowed to sync concurrently. 
Larger number = more responsive service management, but more CPU (and network) load") + fs.BoolVar(&o.DirectPodIP, "direct-pod-ip", o.DirectPodIP, "If set to true, load balancer will use pod IP addresses as pool members instead of node addresses.") } // ApplyTo fills up ServiceController config with options. @@ -42,6 +43,7 @@ func (o *ServiceControllerOptions) ApplyTo(cfg *serviceconfig.ServiceControllerC } cfg.ConcurrentServiceSyncs = o.ConcurrentServiceSyncs + cfg.DirectPodIP = o.DirectPodIP return nil }