Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 118 additions & 19 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,14 @@ func (b *kubeBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
}
ctx, cancel := context.WithCancel(context.Background())
r := &kResolver{
target: ti,
ctx: ctx,
cancel: cancel,
cc: cc,
k8sClient: b.k8sClient,
t: time.NewTimer(defaultFreq),
freq: defaultFreq,
target: ti,
ctx: ctx,
cancel: cancel,
cc: cc,
k8sClient: b.k8sClient,
t: time.NewTimer(defaultFreq),
freq: defaultFreq,
endpointSlices: make(map[string]EndpointSlice), // Track all endpoint slices by name

endpoints: endpointsForTarget.WithLabelValues(ti.String()),
addresses: addressesForTarget.WithLabelValues(ti.String()),
Expand Down Expand Up @@ -201,6 +202,10 @@ type kResolver struct {
t *time.Timer
freq time.Duration

// Track all endpoint slices for this service
endpointSlicesMu sync.RWMutex
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand the code, only one goroutine handles events, so we don't have to synchronize using a mutex.

endpointSlices map[string]EndpointSlice // key is EndpointSlice name from metadata

endpoints prometheus.Gauge
addresses prometheus.Gauge
// lastUpdateUnix is the timestamp of the last successful update to the resolver client
Expand All @@ -219,7 +224,7 @@ func (k *kResolver) Close() {
k.wg.Wait()
}

func (k *kResolver) makeAddresses(e EndpointSlice) ([]resolver.Address, string) {
func (k *kResolver) makeAddresses(e EndpointSlice) []resolver.Address {
port := k.target.port
for _, p := range e.Ports {
if k.target.useFirstPort {
Expand All @@ -231,7 +236,7 @@ func (k *kResolver) makeAddresses(e EndpointSlice) ([]resolver.Address, string)
}
}

if len(port) == 0 {
if len(port) == 0 && len(e.Ports) > 0 {
port = strconv.Itoa(e.Ports[0].Port)
}

Expand All @@ -250,27 +255,120 @@ func (k *kResolver) makeAddresses(e EndpointSlice) ([]resolver.Address, string)
}
}

return newAddrs, ""
return newAddrs
}

func (k *kResolver) handle(e EndpointSlice) {
addrs, _ := k.makeAddresses(e)
if len(addrs) > 0 {
// aggregateAllAddresses returns the resolver addresses built from every
// tracked EndpointSlice and refreshes the endpoints/addresses gauges.
//
// It holds the read lock on endpointSlicesMu for the duration of the call, so
// it must not be invoked by a caller that already holds that mutex; such
// callers should use aggregateAllAddressesUnlocked directly.
func (k *kResolver) aggregateAllAddresses() []resolver.Address {
	k.endpointSlicesMu.RLock()
	defer k.endpointSlicesMu.RUnlock()

	// Delegate to the lock-free variant so the aggregation and metric logic
	// lives in exactly one place instead of being duplicated here.
	return k.aggregateAllAddressesUnlocked()
}

// Handle individual endpoint slice events
func (k *kResolver) handleEndpointSliceEvent(event Event) {
k.endpointSlicesMu.Lock()
defer k.endpointSlicesMu.Unlock()

sliceName := event.Object.Metadata.Name
if sliceName == "" {
Copy link
Copy Markdown

@lothar1998 lothar1998 Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure that the endpoint slice name has to be present. If it is absent, then we should probably ignore such an event.

// Fallback: generate name from first endpoint if metadata is missing
if len(event.Object.Endpoints) > 0 && len(event.Object.Endpoints[0].Addresses) > 0 {
sliceName = fmt.Sprintf("slice-%s", event.Object.Endpoints[0].Addresses[0])
} else {
sliceName = fmt.Sprintf("slice-%d", len(k.endpointSlices))
}
}
Comment on lines +288 to +295
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because endpoints can be duplicated, I think that using a hashmap with keys based on endpoints might not work.


switch event.Type {
case Added, Modified:
k.endpointSlices[sliceName] = event.Object
grpclog.Infof("kuberesolver: EndpointSlice %s %s, total slices: %d", sliceName, strings.ToLower(string(event.Type)), len(k.endpointSlices))
case Deleted:
delete(k.endpointSlices, sliceName)
grpclog.Infof("kuberesolver: EndpointSlice %s deleted, total slices: %d", sliceName, len(k.endpointSlices))
case Error:
grpclog.Errorf("kuberesolver: Error event for EndpointSlice %s", sliceName)
return // Don't update state on error
}

// Aggregate all addresses from all slices and update the client
allAddresses := k.aggregateAllAddressesUnlocked()

if len(allAddresses) > 0 {
k.cc.UpdateState(resolver.State{
Addresses: addrs,
Addresses: allAddresses,
})
k.lastUpdateUnix.Set(float64(time.Now().Unix()))
}
}

// Internal method without lock (called when lock is already held)
func (k *kResolver) aggregateAllAddressesUnlocked() []resolver.Address {
var allAddresses []resolver.Address
totalEndpoints := 0

for _, slice := range k.endpointSlices {
addrs := k.makeAddresses(slice)
allAddresses = append(allAddresses, addrs...)
totalEndpoints += len(slice.Endpoints)
}
Comment on lines +322 to +329
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the documentation (https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#duplicate-endpoints), endpoints can be duplicated, which means we should deduplicate them.


// Update metrics with aggregated data
k.endpoints.Set(float64(totalEndpoints))
k.addresses.Set(float64(len(allAddresses)))

k.endpoints.Set(float64(len(e.Endpoints)))
k.addresses.Set(float64(len(addrs)))
return allAddresses
}

// handle is retained for callers that still supply a bare EndpointSlice. It
// wraps the slice in a Modified event and routes it through
// handleEndpointSliceEvent.
func (k *kResolver) handle(e EndpointSlice) {
	ev := Event{
		Type:   Modified,
		Object: e,
	}
	k.handleEndpointSliceEvent(ev)
}

func (k *kResolver) resolve() {
list, err := getEndpointSliceList(k.k8sClient, k.target.serviceNamespace, k.target.serviceName)
if err == nil {
for _, e := range list.Items {
k.handle(e)
k.endpointSlicesMu.Lock()
// Clear existing slices and repopulate with current state
k.endpointSlices = make(map[string]EndpointSlice)

for _, slice := range list.Items {
sliceName := slice.Metadata.Name
if sliceName == "" {
// Generate a name if metadata is missing
sliceName = fmt.Sprintf("slice-%d", len(k.endpointSlices))
}
k.endpointSlices[sliceName] = slice
}

grpclog.Infof("kuberesolver: Loaded %d endpoint slices for service %s.%s",
len(k.endpointSlices), k.target.serviceName, k.target.serviceNamespace)

// Aggregate all addresses
allAddresses := k.aggregateAllAddressesUnlocked()
k.endpointSlicesMu.Unlock()

if len(allAddresses) > 0 {
k.cc.UpdateState(resolver.State{
Addresses: allAddresses,
})
k.lastUpdateUnix.Set(float64(time.Now().Unix()))
}
} else {
grpclog.Errorf("kuberesolver: lookup endpoints failed: %v", err)
Expand All @@ -294,7 +392,8 @@ func (k *kResolver) watch() error {
k.resolve()
case up, hasMore := <-sw.ResultChan():
if hasMore {
k.handle(up.Object)
// Use the new event-based handler that properly aggregates all slices
k.handleEndpointSliceEvent(up)
} else {
return nil
}
Expand Down
12 changes: 7 additions & 5 deletions models.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ type Event struct {
}

type EndpointSliceList struct {
Items []EndpointSlice
Items []EndpointSlice `json:"items"`
}

type EndpointSlice struct {
Endpoints []Endpoint
Ports []EndpointPort
Metadata Metadata `json:"metadata"` // Add metadata to track slice identity
Endpoints []Endpoint `json:"endpoints"`
Ports []EndpointPort `json:"ports"`
}

type Endpoint struct {
Addresses []string
Conditions EndpointConditions
Addresses []string `json:"addresses"`
Conditions EndpointConditions `json:"conditions"`
}

type EndpointConditions struct {
Expand All @@ -41,6 +42,7 @@ type Metadata struct {
ResourceVersion string `json:"resourceVersion"`
Labels map[string]string `json:"labels"`
}

type EndpointPort struct {
Name string `json:"name"`
Port int `json:"port"`
Expand Down