From 44e28648716817e60775b4cde9afa3309b74313c Mon Sep 17 00:00:00 2001 From: Sercan Degirmenci Date: Mon, 30 Jun 2025 10:37:17 +0300 Subject: [PATCH] track all endpoint slices instead of first 100 endpoint bug fix #66 --- builder.go | 137 +++++++++++++++++++++++++++++++++++++++++++++-------- models.go | 12 +++-- 2 files changed, 125 insertions(+), 24 deletions(-) diff --git a/builder.go b/builder.go index 8c6ee10..04e4152 100644 --- a/builder.go +++ b/builder.go @@ -162,13 +162,14 @@ func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts } ctx, cancel := context.WithCancel(context.Background()) r := &kResolver{ - target: ti, - ctx: ctx, - cancel: cancel, - cc: cc, - k8sClient: b.k8sClient, - t: time.NewTimer(defaultFreq), - freq: defaultFreq, + target: ti, + ctx: ctx, + cancel: cancel, + cc: cc, + k8sClient: b.k8sClient, + t: time.NewTimer(defaultFreq), + freq: defaultFreq, + endpointSlices: make(map[string]EndpointSlice), // Track all endpoint slices by name endpoints: endpointsForTarget.WithLabelValues(ti.String()), addresses: addressesForTarget.WithLabelValues(ti.String()), @@ -201,6 +202,10 @@ type kResolver struct { t *time.Timer freq time.Duration + // Track all endpoint slices for this service + endpointSlicesMu sync.RWMutex + endpointSlices map[string]EndpointSlice // key is EndpointSlice name from metadata + endpoints prometheus.Gauge addresses prometheus.Gauge // lastUpdateUnix is the timestamp of the last successful update to the resolver client @@ -219,7 +224,7 @@ func (k *kResolver) Close() { k.wg.Wait() } -func (k *kResolver) makeAddresses(e EndpointSlice) ([]resolver.Address, string) { +func (k *kResolver) makeAddresses(e EndpointSlice) []resolver.Address { port := k.target.port for _, p := range e.Ports { if k.target.useFirstPort { @@ -231,7 +236,7 @@ func (k *kResolver) makeAddresses(e EndpointSlice) ([]resolver.Address, string) } } - if len(port) == 0 { + if len(port) == 0 && len(e.Ports) > 0 { port = strconv.Itoa(e.Ports[0].Port) } @@ -250,27 +255,120 @@ func (k *kResolver) makeAddresses(e EndpointSlice) ([]resolver.Address, string) } } - return newAddrs, "" + return newAddrs } -func (k *kResolver) handle(e EndpointSlice) { - addrs, _ := k.makeAddresses(e) - if len(addrs) > 0 { +// Aggregate addresses from all endpoint slices +func (k *kResolver) aggregateAllAddresses() []resolver.Address { + k.endpointSlicesMu.RLock() + defer k.endpointSlicesMu.RUnlock() + + var allAddresses []resolver.Address + totalEndpoints := 0 + + for _, slice := range k.endpointSlices { + addrs := k.makeAddresses(slice) + allAddresses = append(allAddresses, addrs...) + totalEndpoints += len(slice.Endpoints) + } + + // Update metrics with aggregated data + k.endpoints.Set(float64(totalEndpoints)) + k.addresses.Set(float64(len(allAddresses))) + + return allAddresses +} + +// Handle individual endpoint slice events +func (k *kResolver) handleEndpointSliceEvent(event Event) { + k.endpointSlicesMu.Lock() + defer k.endpointSlicesMu.Unlock() + + sliceName := event.Object.Metadata.Name + if sliceName == "" { + // Fallback: generate name from first endpoint if metadata is missing + if len(event.Object.Endpoints) > 0 && len(event.Object.Endpoints[0].Addresses) > 0 { + sliceName = fmt.Sprintf("slice-%s", event.Object.Endpoints[0].Addresses[0]) + } else { + sliceName = fmt.Sprintf("slice-%d", len(k.endpointSlices)) + } + } + + switch event.Type { + case Added, Modified: + k.endpointSlices[sliceName] = event.Object + grpclog.Infof("kuberesolver: EndpointSlice %s %s, total slices: %d", sliceName, strings.ToLower(string(event.Type)), len(k.endpointSlices)) + case Deleted: + delete(k.endpointSlices, sliceName) + grpclog.Infof("kuberesolver: EndpointSlice %s deleted, total slices: %d", sliceName, len(k.endpointSlices)) + case Error: + grpclog.Errorf("kuberesolver: Error event for EndpointSlice %s", sliceName) + return // Don't update state on error + } + + // Aggregate all addresses from all slices and update the client + allAddresses := k.aggregateAllAddressesUnlocked() + + if len(allAddresses) > 0 { k.cc.UpdateState(resolver.State{ - Addresses: addrs, + Addresses: allAddresses, }) k.lastUpdateUnix.Set(float64(time.Now().Unix())) } +} + +// Internal method without lock (called when lock is already held) +func (k *kResolver) aggregateAllAddressesUnlocked() []resolver.Address { + var allAddresses []resolver.Address + totalEndpoints := 0 + + for _, slice := range k.endpointSlices { + addrs := k.makeAddresses(slice) + allAddresses = append(allAddresses, addrs...) + totalEndpoints += len(slice.Endpoints) + } + + // Update metrics with aggregated data + k.endpoints.Set(float64(totalEndpoints)) + k.addresses.Set(float64(len(allAddresses))) - k.endpoints.Set(float64(len(e.Endpoints))) - k.addresses.Set(float64(len(addrs))) + return allAddresses +} + +// Legacy method for backward compatibility - now handles single slice as modified event +func (k *kResolver) handle(e EndpointSlice) { + // Convert to event format and handle properly + k.handleEndpointSliceEvent(Event{Type: Modified, Object: e}) } func (k *kResolver) resolve() { list, err := getEndpointSliceList(k.k8sClient, k.target.serviceNamespace, k.target.serviceName) if err == nil { - for _, e := range list.Items { - k.handle(e) + k.endpointSlicesMu.Lock() + // Clear existing slices and repopulate with current state + k.endpointSlices = make(map[string]EndpointSlice) + + for _, slice := range list.Items { + sliceName := slice.Metadata.Name + if sliceName == "" { + // Generate a name if metadata is missing + sliceName = fmt.Sprintf("slice-%d", len(k.endpointSlices)) + } + k.endpointSlices[sliceName] = slice + } + + grpclog.Infof("kuberesolver: Loaded %d endpoint slices for service %s.%s", + len(k.endpointSlices), k.target.serviceName, k.target.serviceNamespace) + + // Aggregate all addresses + allAddresses := k.aggregateAllAddressesUnlocked() + k.endpointSlicesMu.Unlock() + + if len(allAddresses) > 0 { + k.cc.UpdateState(resolver.State{ + Addresses: allAddresses, + }) + k.lastUpdateUnix.Set(float64(time.Now().Unix())) } } else { grpclog.Errorf("kuberesolver: lookup endpoints failed: %v", err) @@ -294,7 +392,8 @@ func (k *kResolver) watch() error { k.resolve() case up, hasMore := <-sw.ResultChan(): if hasMore { - k.handle(up.Object) + // Use the new event-based handler that properly aggregates all slices + k.handleEndpointSliceEvent(up) } else { return nil } diff --git a/models.go b/models.go index e1d9b23..2e41924 100644 --- a/models.go +++ b/models.go @@ -16,17 +16,18 @@ type Event struct { } type EndpointSliceList struct { - Items []EndpointSlice + Items []EndpointSlice `json:"items"` } type EndpointSlice struct { - Endpoints []Endpoint - Ports []EndpointPort + Metadata Metadata `json:"metadata"` // Add metadata to track slice identity + Endpoints []Endpoint `json:"endpoints"` + Ports []EndpointPort `json:"ports"` } type Endpoint struct { - Addresses []string - Conditions EndpointConditions + Addresses []string `json:"addresses"` + Conditions EndpointConditions `json:"conditions"` } type EndpointConditions struct { @@ -41,6 +42,7 @@ type Metadata struct { ResourceVersion string `json:"resourceVersion"` Labels map[string]string `json:"labels"` } + type EndpointPort struct { Name string `json:"name"` Port int `json:"port"`