Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ func startServiceController(ctx context.Context, initContext ControllerInitConte
completedConfig.ClientBuilder.ClientOrDie(initContext.ClientName),
completedConfig.SharedInformers.Core().V1().Services(),
completedConfig.SharedInformers.Core().V1().Nodes(),
completedConfig.SharedInformers.Core().V1().Endpoints(),
completedConfig.ComponentConfig.KubeCloudShared.ClusterName,
utilfeature.DefaultFeatureGate,
completedConfig.ComponentConfig.ServiceController.DirectPodIP,
)
if err != nil {
// This error shouldn't fail. It lives like this as a legacy.
Expand Down
5 changes: 5 additions & 0 deletions controllers/service/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ type ServiceControllerConfiguration struct {
// allowed to sync concurrently. Larger number = more responsive service
// management, but more CPU (and network) load.
ConcurrentServiceSyncs int32

// directPodIp is the mode of load balancer service
// If true configured, load balancer will use pods ip addresses
// as pool members instead of nodes addresses.
DirectPodIP bool
}
120 changes: 117 additions & 3 deletions controllers/service/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,21 @@ type Controller struct {
// services that need to be synced
queue workqueue.RateLimitingInterface

endpointLister corelisters.EndpointsLister
endpointListerSynced cache.InformerSynced
// endpoints that need to be synced
endpointsQueue workqueue.RateLimitingInterface

// nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time
// and protects internal states (needFullSync) of nodeSync
nodeSyncLock sync.Mutex
// nodeSyncCh triggers nodeSyncLoop to run
nodeSyncCh chan interface{}
// needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
needFullSync bool

// if service controller will create lb svc in directPodIP mode or nodePort mode
directPodIP bool
}

// New returns a new service controller to keep cloud provider service resources
Expand All @@ -106,8 +114,10 @@ func New(
kubeClient clientset.Interface,
serviceInformer coreinformers.ServiceInformer,
nodeInformer coreinformers.NodeInformer,
endpointInformer coreinformers.EndpointsInformer,
clusterName string,
featureGate featuregate.FeatureGate,
directPodIP bool,
) (*Controller, error) {
broadcaster := record.NewBroadcaster()
broadcaster.StartStructuredLogging(0)
Expand All @@ -134,6 +144,10 @@ func New(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
// nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
nodeSyncCh: make(chan interface{}, 1),
endpointLister: endpointInformer.Lister(),
endpointListerSynced: endpointInformer.Informer().HasSynced,
endpointsQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "endpoint"),
directPodIP: directPodIP,
}

serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
Expand Down Expand Up @@ -161,6 +175,10 @@ func New(
s.serviceLister = serviceInformer.Lister()
s.serviceListerSynced = serviceInformer.Informer().HasSynced

endpointInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: s.updateEndpoint,
})

nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
Expand Down Expand Up @@ -189,7 +207,6 @@ func New(
},
time.Duration(0),
)

if err := s.init(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -229,6 +246,7 @@ func (s *Controller) enqueueService(obj interface{}) {
func (s *Controller) Run(ctx context.Context, workers int) {
defer runtime.HandleCrash()
defer s.queue.ShutDown()
defer s.endpointsQueue.ShutDown()

klog.Info("Starting service controller")
defer klog.Info("Shutting down service controller")
Expand All @@ -239,10 +257,16 @@ func (s *Controller) Run(ctx context.Context, workers int) {

for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, s.worker, time.Second)
if s.directPodIP {
go wait.UntilWithContext(ctx, s.endpointWorker, time.Second)
}
}

go s.nodeSyncLoop(ctx, workers)
go wait.Until(s.triggerNodeSync, nodeSyncPeriod, ctx.Done())
// sync node only if LB svc in nodePort mode
if !s.directPodIP {
go s.nodeSyncLoop(ctx, workers)
go wait.Until(s.triggerNodeSync, nodeSyncPeriod, ctx.Done())
}

<-ctx.Done()
}
Expand Down Expand Up @@ -284,6 +308,50 @@ func (s *Controller) worker(ctx context.Context) {
}
}

// endpointWorker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (s *Controller) endpointWorker(ctx context.Context) {
for s.processNextEndpointWorkItem(ctx) {
}
}

func (s *Controller) processNextEndpointWorkItem(ctx context.Context) bool {
key, quit := s.endpointsQueue.Get()
if quit {
return false
}
defer s.endpointsQueue.Done(key)

err := s.updateService(ctx, key.(string))
if err == nil {
s.endpointsQueue.Forget(key)
return true
}

runtime.HandleError(fmt.Errorf("error updating endpoints for service %v (will retry): %v", key, err))
s.endpointsQueue.AddRateLimited(key)
return true
}

// updateService will call updateLoadBalancer to update pool members
func (s *Controller) updateService(ctx context.Context, key string) error {

namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
service, err := s.serviceLister.Services(namespace).Get(name)
if err != nil {
klog.Errorf("Failed to get service %s with error %v", name, err)
return err
}
klog.Infof("Update service %s/%s which in direct-pod-ip mode triggered by endpoints changed ", namespace, name)
var emptyNodes [] *v1.Node
err = s.balancer.UpdateLoadBalancer(ctx, s.clusterName, service, emptyNodes)

return err
}

// nodeSyncLoop takes nodeSync signal and triggers nodeSync
func (s *Controller) nodeSyncLoop(ctx context.Context, workers int) {
klog.V(4).Info("nodeSyncLoop Started")
Expand Down Expand Up @@ -604,6 +672,47 @@ func (s *Controller) needsUpdate(oldService *v1.Service, newService *v1.Service)
return false
}

// updateEndpoint
func (s *Controller) updateEndpoint(old, new interface{}){
oldep := old.(*v1.Endpoints)
newep := new.(*v1.Endpoints)

// service of the endpoint
name := newep.Name
namespace := newep.Namespace

service, err := s.serviceLister.Services(namespace).Get(name)

if errors.IsNotFound(err) {
// for some endpoints without service that used for leader election, such as endpoints in kubevirt namespace
// the resourceVersion keep increasing and endpoints keep updating
klog.V(4).Infof("Failed to get service %s with error %v, do not need update service", name, err)
return
} else if err != nil {
klog.Errorf("Failed to get service %s with error %v", name, err)
return
}

if service.Spec.Type == v1.ServiceTypeLoadBalancer {
if s.directPodIP {
if !reflect.DeepEqual(oldep.Subsets, newep.Subsets) {
klog.Infof("Endpoint %s changed, update the service in direct-pod-ip mode", name)
s.enqueueEndpoint(new)
}
}
}
}

// obj could be an *v1.Endpoints, or a DeletionFinalStateUnknown marker item.
func (s *Controller) enqueueEndpoint(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
return
}
s.endpointsQueue.Add(key)
}

func getPortsForLB(service *v1.Service) []*v1.ServicePort {
ports := []*v1.ServicePort{}
for i := range service.Spec.Ports {
Expand Down Expand Up @@ -770,6 +879,11 @@ func (s *Controller) nodeSyncService(svc *v1.Service) bool {
if svc == nil || !wantsLoadBalancer(svc) {
return false
}
if s.directPodIP {
klog.Infof("service %s/%s is direct-pod-ip mode, skip sync nodes", svc.Namespace, svc.Name)
return false
}

klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
hosts, err := listWithPredicate(s.nodeLister, s.getNodeConditionPredicate())
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions options/servicecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func (o *ServiceControllerOptions) AddFlags(fs *pflag.FlagSet) {
}

fs.Int32Var(&o.ConcurrentServiceSyncs, "concurrent-service-syncs", o.ConcurrentServiceSyncs, "The number of services that are allowed to sync concurrently. Larger number = more responsive service management, but more CPU (and network) load")
fs.BoolVar(&o.DirectPodIP, "direct-pod-ip",true, "If 'true' configured, load balancer will use pods ip addresses as pool members instead of nodes addresses.")
}

// ApplyTo fills up ServiceController config with options.
Expand All @@ -42,6 +43,7 @@ func (o *ServiceControllerOptions) ApplyTo(cfg *serviceconfig.ServiceControllerC
}

cfg.ConcurrentServiceSyncs = o.ConcurrentServiceSyncs
cfg.DirectPodIP = o.DirectPodIP

return nil
}
Expand Down