From d82d072f8601fa8bb624d49b95007d27949f248e Mon Sep 17 00:00:00 2001 From: Miladin Devedzic Date: Mon, 4 May 2026 11:58:39 +0000 Subject: [PATCH 1/7] initial changes --- config/config.yaml | 1 + gnmi/config.go | 6 + gnmi/config_test.go | 17 +- gnmi/gnmi.go | 141 +++++++++----- gnmi/gnmi_test.go | 164 +++++++++++++++- gnmi/testdata/config.yaml | 1 + .../docker_stats_e2e/config/config.yaml | 2 + .../docker_stats_e2e/docker_stats_e2e_test.go | 184 +++++++++++++++--- 8 files changed, 428 insertions(+), 88 deletions(-) diff --git a/config/config.yaml b/config/config.yaml index c3ab876..3067f9a 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -154,6 +154,7 @@ processors: exporters: # NOTE: Prior to v0.86.0 use `logging` instead of `debug`. gnmi: + container_ttl: 20s addr: "0.0.0.0:60302" # To enable transport security (e.g., mTLS), set the following: # tp_sec: "mtls" diff --git a/gnmi/config.go b/gnmi/config.go index 1f5b5ec..56cb088 100644 --- a/gnmi/config.go +++ b/gnmi/config.go @@ -16,6 +16,7 @@ package gnmi import ( "go.opentelemetry.io/collector/component" + "time" ) // Config holds the configuration for this processor. @@ -52,6 +53,11 @@ type Config struct { // Origin is set as the origin of gNMI notifications. Origin string `mapstructure:"origin"` + + // ContainerTTL is the time after which a container with no new docker_stats metrics is considered dead and purged. + // This should be set to a value greater than the docker_stats collection interval. + // If set to 0, no delete notifications will be sent. + ContainerTTL time.Duration `mapstructure:"container_ttl"` } var _ component.Config = (*Config)(nil) diff --git a/gnmi/config_test.go b/gnmi/config_test.go index 81df476..a046bf8 100644 --- a/gnmi/config_test.go +++ b/gnmi/config_test.go @@ -15,13 +15,13 @@ package gnmi import ( - "path/filepath" - "testing" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/confmaptest" + "path/filepath" + "testing" + "time" ) func TestUnmarshalDefaultConfig(t *testing.T) { @@ -46,11 +46,12 @@ func TestUnmarshalConfig(t *testing.T) { } want := &Config{ - Addr: "localhost:10", - Sep: "/", - AttrSep: ".", - TargetName: "target", - BufferSize: 10, + Addr: "localhost:10", + Sep: "/", + AttrSep: ".", + TargetName: "target", + BufferSize: 10, + ContainerTTL: 20 * time.Second, } if diff := cmp.Diff(got, want); diff != "" { t.Errorf("UnmarshalConfig() returned diff (-got, +want):\n%s", diff) diff --git a/gnmi/gnmi.go b/gnmi/gnmi.go index 6365e3d..539b256 100644 --- a/gnmi/gnmi.go +++ b/gnmi/gnmi.go @@ -17,24 +17,22 @@ package gnmi import ( "context" "fmt" - "net" - "strings" - "time" - gpb "github.com/openconfig/gnmi/proto/gnmi" - ompb "go.opentelemetry.io/proto/otlp/metrics/v1" - anypb "google.golang.org/protobuf/types/known/anypb" - "github.com/openconfig/magna/lwotgtelem" "github.com/openconfig/magna/lwotgtelem/gnmit" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + ompb "go.opentelemetry.io/proto/otlp/metrics/v1" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "google.golang.org/protobuf/proto" + anypb "google.golang.org/protobuf/types/known/anypb" "k8s.io/klog/v2" + "net" + "strings" + "time" ) // GNMI is a gNMI exporter. @@ -422,53 +420,104 @@ func (g *GNMI) notificationsFromLabels(m pcommon.Map, cname string) []*gpb.Notif // and sent to the telemetry server. func (g *GNMI) handleMetrics(_ gnmit.Queue, updateFn gnmit.UpdateFn, target string, cleanup func()) error { go func() { - for ms := range g.metricCh { - var notis []*gpb.Notification - - // Iterate over all resources (e.g., app). - rms := ms.ResourceMetrics() - for i := 0; i < rms.Len(); i++ { - rm := rms.At(i) - cname := "" - - // Extract container name from resource, if not found, log error and continue. - cNameVal, ok := rm.Resource().Attributes().Get("container.name") - if ok && cNameVal.Type() == pcommon.ValueTypeStr { - cname = cNameVal.Str() - } else { - g.logger.Error("resource is not associated with a container name formatted as a string", zap.String("resource", fmt.Sprintf("%+v", rm.Resource().Attributes().AsRaw()))) - continue + var tickerCh <-chan time.Time + if g.cfg.ContainerTTL > 0 { + ticker := time.NewTicker(g.cfg.ContainerTTL / 2) + defer ticker.Stop() + tickerCh = ticker.C + } else { + tickerCh = make(chan time.Time) + } + + lastSeen := make(map[string]time.Time) + for { + select { + case ms, ok := <-g.metricCh: + if !ok { + return } + g.processMetrics(ms, updateFn, lastSeen) + case now := <-tickerCh: + // continue + g.sweepStaleContainers(now, updateFn, lastSeen) + } + } + }() + return nil +} - // Iterate over all instrument scopes within the resource (e.g., module within an app). - ilms := rm.ScopeMetrics() - for j := 0; j < ilms.Len(); j++ { - ilm := ilms.At(j) +func (g *GNMI) processMetrics(ms *pmetric.Metrics, updateFn gnmit.UpdateFn, lastSeen map[string]time.Time) { + now := time.Now() + var notis []*gpb.Notification - // Iterate over all metrics for the instrument scope. - ms := ilm.Metrics() - for k := 0; k < ms.Len(); k++ { - m := ms.At(k) - notis = append(notis, g.notificationsFromMetric(m, cname)...) - } - } + // Iterate over all resources (e.g., app). + rms := ms.ResourceMetrics() + for i := 0; i < rms.Len(); i++ { + rm := rms.At(i) + cname := "" + + // Extract container name from resource, if not found, log error and continue. + cNameVal, ok := rm.Resource().Attributes().Get("container.name") + if ok && cNameVal.Type() == pcommon.ValueTypeStr { + cname = cNameVal.Str() + } else { + g.logger.Error("resource is not associated with a container name formatted as a string", zap.String("resource", fmt.Sprintf("%+v", rm.Resource().Attributes().AsRaw()))) + continue + } - // Obtain notifications for labels. - lmap, ok := rm.Resource().Attributes().Get("container.labels") - if ok && lmap.Type() == pcommon.ValueTypeMap && len(notis) > 0 { - notis = append(notis, g.notificationsFromLabels(lmap.Map(), cname)...) - } + // Iterate over all instrument scopes within the resource (e.g., module within an app). + ilms := rm.ScopeMetrics() + for j := 0; j < ilms.Len(); j++ { + ilm := ilms.At(j) + + // Iterate over all metrics for the instrument scope. + ms := ilm.Metrics() + if ms.Len() > 0 { + // We refresh the TTL whenever we see any container-related metric. + lastSeen[cname] = now + } + for k := 0; k < ms.Len(); k++ { + m := ms.At(k) + notis = append(notis, g.notificationsFromMetric(m, cname)...) } + } - // Send all notifications. - for _, notification := range notis { - if err := updateFn(notification); err != nil { - klog.Errorf("failed to send notification: %v", err) - } + // Obtain notifications for labels. + lmap, ok := rm.Resource().Attributes().Get("container.labels") + if ok && lmap.Type() == pcommon.ValueTypeMap && len(notis) > 0 { + notis = append(notis, g.notificationsFromLabels(lmap.Map(), cname)...) + } + } + + // Send all notifications. + for _, notification := range notis { + if err := updateFn(notification); err != nil { + klog.Errorf("failed to send notification: %v", err) + } + } +} + +func (g *GNMI) sweepStaleContainers(now time.Time, updateFn gnmit.UpdateFn, lastSeen map[string]time.Time) { + for cname, t := range lastSeen { + if now.Sub(t) > g.cfg.ContainerTTL { + delete(lastSeen, cname) + noti := &gpb.Notification{ + Timestamp: now.UnixNano(), + Prefix: &gpb.Path{ + Origin: g.cfg.Origin, + Target: g.cfg.TargetName, + Elem: []*gpb.PathElem{ + {Name: "containers"}, + {Name: "container", Key: map[string]string{"name": cname}}, + }, + }, + Delete: []*gpb.Path{{}}, + } + if err := updateFn(noti); err != nil { + klog.Errorf("failed to send delete notification: %v", err) } } - }() - return nil + } } func (g *GNMI) toPathElems(name string, attrs attrMap) []*gpb.PathElem { diff --git a/gnmi/gnmi_test.go b/gnmi/gnmi_test.go index a8ce78b..e05d6e7 100644 --- a/gnmi/gnmi_test.go +++ b/gnmi/gnmi_test.go @@ -16,22 +16,20 @@ package gnmi import ( "context" - "sync" - "testing" - "time" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + gpb "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/ygot/testutil" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + ompb "go.opentelemetry.io/proto/otlp/metrics/v1" "go.uber.org/zap" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/testing/protocmp" - - gpb "github.com/openconfig/gnmi/proto/gnmi" - "github.com/openconfig/ygot/testutil" - ompb "go.opentelemetry.io/proto/otlp/metrics/v1" anypb "google.golang.org/protobuf/types/known/anypb" + "sync" + "testing" + "time" ) var ( @@ -1034,3 +1032,153 @@ func TestToPathElems(t *testing.T) { }) } } + +func getContainerName(path *gpb.Path) string { + for _, elem := range path.Elem { + if elem.Name == "container" { + return elem.Key["name"] + } + } + return "" +} + +func TestHandleMetricsTTL(t *testing.T) { + type scenarioStep struct { + containerName string + waitDuration time.Duration + injectMetric bool + } + + tests := []struct { + name string + containerTTL time.Duration + steps []scenarioStep + wantDeletes map[string]bool // container name -> expected delete notification + }{ + { + name: "metric-purged", + containerTTL: 100 * time.Millisecond, + steps: []scenarioStep{ + // waitDuration > containerTTL, so expect delete notification. + {containerName: "test-container", injectMetric: true, waitDuration: 300 * time.Millisecond}, + }, + wantDeletes: map[string]bool{"test-container": true}, + }, + { + name: "metric-survives-if-ttl-not-exceeded", + containerTTL: 500 * time.Millisecond, + steps: []scenarioStep{ + {containerName: "test-container", injectMetric: true, waitDuration: 50 * time.Millisecond}, + }, + wantDeletes: map[string]bool{"test-container": false}, + }, + { + name: "metric-survives-with-refresh", + containerTTL: 300 * time.Millisecond, + steps: []scenarioStep{ + {containerName: "test-container", injectMetric: true, waitDuration: 200 * time.Millisecond}, + {containerName: "test-container", injectMetric: true, waitDuration: 200 * time.Millisecond}, + }, + wantDeletes: map[string]bool{"test-container": false}, + }, + { + name: "metric-no-purge-if-zero-ttl", + containerTTL: 0, + steps: []scenarioStep{ + {containerName: "test-container", injectMetric: true, waitDuration: 500 * time.Millisecond}, + }, + wantDeletes: map[string]bool{"test-container": false}, + }, + { + name: "multiple-containers-mixed-purge", + containerTTL: 200 * time.Millisecond, + steps: []scenarioStep{ + {containerName: "container-1", injectMetric: true}, + {containerName: "container-2", injectMetric: true}, + {waitDuration: 300 * time.Millisecond}, // container-1 and container-2 should expire + {containerName: "container-3", injectMetric: true, waitDuration: 50 * time.Millisecond}, + }, + wantDeletes: map[string]bool{ + "container-1": true, + "container-2": true, + "container-3": false, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + g := &GNMI{ + logger: zap.NewExample(), + cfg: &Config{ + TargetName: "test-target", + Sep: "/", + ContainerTTL: tc.containerTTL, + }, + metricCh: make(chan *pmetric.Metrics, 10), + } + + var notifs []*gpb.Notification + var nMu sync.Mutex + updateFn := func(n *gpb.Notification) error { + nMu.Lock() + defer nMu.Unlock() + notifs = append(notifs, n) + return nil + } + + // Start handleMetrics + g.handleMetrics(nil, updateFn, "", nil) + + // Mock the metrics with the interval + for _, step := range tc.steps { + if step.injectMetric { + md := GenerateMetrics(1, pmetric.MetricTypeGauge, map[string]string{"container.name": step.containerName}) + md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("container/uptime") + if err := g.storeMetric(context.Background(), md); err != nil { + t.Errorf("storeMetric returned error: %v", err) + } + } + if step.waitDuration > 0 { + time.Sleep(step.waitDuration) + } + } + + close(g.metricCh) + + nMu.Lock() + defer nMu.Unlock() + + deletedContainers := make(map[string]bool) + for _, n := range notifs { + if len(n.Delete) > 0 { + cname := getContainerName(n.Prefix) + if cname != "" { + deletedContainers[cname] = true + } + + // Verify prefix and delete element + expectedPrefix := &gpb.Path{ + Target: "test-target", + Elem: []*gpb.PathElem{ + {Name: "containers"}, + {Name: "container", Key: map[string]string{"name": cname}}, + }, + } + if diff := cmp.Diff(n.Prefix, expectedPrefix, protocmp.Transform()); diff != "" { + t.Errorf("delete prefix mismatch for container %q (-want +got):\n%s", cname, diff) + } + if len(n.Delete[0].Elem) != 0 { + t.Errorf("expected empty path element in delete for container %q, got %v", cname, n.Delete[0].Elem) + } + } + } + + for cname, wantDelete := range tc.wantDeletes { + if deletedContainers[cname] != wantDelete { + t.Errorf("expected delete notification for container %q: %v, but got: %v", cname, wantDelete, deletedContainers[cname]) + } + } + }) + } +} diff --git a/gnmi/testdata/config.yaml b/gnmi/testdata/config.yaml index 9376754..f57216f 100644 --- a/gnmi/testdata/config.yaml +++ b/gnmi/testdata/config.yaml @@ -1,5 +1,6 @@ addr: localhost:10 sep: "/" attr_sep: "." +container_ttl: 20s target_name: "target" buffer_size: 10 \ No newline at end of file diff --git a/integration/docker_stats_e2e/config/config.yaml b/integration/docker_stats_e2e/config/config.yaml index 3406fe1..78668c1 100644 --- a/integration/docker_stats_e2e/config/config.yaml +++ b/integration/docker_stats_e2e/config/config.yaml @@ -3,6 +3,7 @@ receivers: # Note: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/dockerstatsreceiver/README.md collection_interval: 2s initial_delay: 1s + api_version: "1.44" container_labels_to_resource_attributes: - match_type: regexp include: ".*" @@ -157,6 +158,7 @@ exporters: gnmi: target_name: "poodle" origin: "shiba" + container_ttl: 4s service: pipelines: diff --git a/integration/docker_stats_e2e/docker_stats_e2e_test.go b/integration/docker_stats_e2e/docker_stats_e2e_test.go index a5eeb4a..309472c 100644 --- a/integration/docker_stats_e2e/docker_stats_e2e_test.go +++ b/integration/docker_stats_e2e/docker_stats_e2e_test.go @@ -2,13 +2,8 @@ package e2etest import ( "context" - "path/filepath" - "strings" - "sync" - "testing" - "time" - "github.com/openconfig/clio/collector" + gpb "github.com/openconfig/gnmi/proto/gnmi" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "go.opentelemetry.io/collector/component" @@ -17,8 +12,11 @@ import ( "go.opentelemetry.io/collector/otelcol" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - - gpb "github.com/openconfig/gnmi/proto/gnmi" + "path/filepath" + "strings" + "sync" + "testing" + "time" ) const ( @@ -142,27 +140,29 @@ func validateNotifications(t *testing.T, gotNoti []*gpb.Notification) { // Map containing some of the enabled paths for each "logical metric group." wantPathSet := map[string]bool{ - "container.uptime": true, - "container.restarts": true, - "container.cpu.usage.kernelmode": true, - "container.cpu.usage.total": true, - "container.cpu.usage.usermode": true, - "container.memory.file": true, - "container.memory.percent": true, - "container.memory.usage.total": true, - "container.cpu.utilization": true, - "container.cpu.logical.count": true, - "container.cpu.shares": true, - "container.memory.usage.limit": true, - "container.pids.limit": true, - "container.network.io.usage.rx_bytes": true, - "container.network.io.usage.rx_dropped": true, - "container.network.io.usage.tx_bytes": true, - "labels.app": true, - "labels.version": true, + "uptime": true, + "restarts": true, + "cpu.usage.kernelmode": true, + "cpu.usage.total": true, + "cpu.usage.usermode": true, + "memory.file": true, + "memory.percent": true, + "memory.usage.total": true, + "cpu.utilization": true, + "cpu.logical.count": true, + "cpu.shares": true, + "memory.usage.limit": true, + "pids.limit": true, + "network.io.usage.rx_bytes": true, + "network.io.usage.rx_dropped": true, + "network.io.usage.tx_bytes": true, } for _, n := range gotNoti { + if len(n.GetDelete()) > 0 { + t.Errorf("Unexpected delete notification received: %v", n) + } + for _, u := range n.GetUpdate() { path := elems2path(u.GetPath().GetElem()) delete(wantPathSet, path) @@ -242,3 +242,135 @@ func TestE2E(t *testing.T) { validateNotifications(t, gotNoti) } + +func getContainerName(path *gpb.Path) string { + for _, elem := range path.Elem { + if elem.Name == "container" { + return elem.Key["name"] + } + } + return "" +} + +func TestE2ETTL(t *testing.T) { + container, cleanup := spawnNginxContainer(t) + + gOpts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + } + ctx := context.Background() + + var gotNoti []*gpb.Notification + var notiMu sync.Mutex + sinkWg := &sync.WaitGroup{} + defer sinkWg.Wait() + + cwg, col := startCollectorPipeline(ctx, t) + defer stopCollectorPipeline(t, cwg, col) + + for i := 15; col.GetState() != otelcol.StateRunning; i-- { + if i == 0 { + t.Fatalf("Collector never started") + } + time.Sleep(200 * time.Millisecond) + } + + gnmiConn, err := grpc.NewClient("localhost:6030", gOpts...) + if err != nil { + t.Fatalf("%v", err) + } + defer gnmiConn.Close() + + gnmiClient := gpb.NewGNMIClient(gnmiConn) + stream, err := gnmiClient.Subscribe(ctx) + if err != nil { + t.Fatalf("%v", err) + } + + sinkWg.Add(1) + go func() { + defer sinkWg.Done() + for { + resp, err := stream.Recv() + if err != nil { + return + } + if resp.GetUpdate() != nil { + notiMu.Lock() + gotNoti = append(gotNoti, resp.GetUpdate()) + notiMu.Unlock() + } + } + }() + + sreq := subscribeRequestForTarget(t, testTarget) + err = stream.Send(sreq) + if err != nil { + t.Fatalf("%v", err) + } + + containerName, err := container.Name(ctx) + if err != nil { + t.Fatalf("Failed to get container name: %v", err) + } + // containerName from testcontainers usually has a leading slash, e.g. /reaper_... + trimmedName := strings.TrimPrefix(containerName, "/") + + // Wait for at least one update for the container to ensure it's registered. + updateFound := false + for i := 0; i < 50; i++ { + notiMu.Lock() + for _, n := range gotNoti { + if len(n.Update) > 0 { + if strings.TrimPrefix(getContainerName(n.Prefix), "/") == trimmedName { + updateFound = true + break + } + } + } + notiMu.Unlock() + if updateFound { + break + } + time.Sleep(200 * time.Millisecond) + } + if !updateFound { + t.Fatalf("Timed out waiting for initial update for container %s", trimmedName) + } + + // 2. Kill the container to trigger the death condition + cleanup() + + // 3. Wait for the TTL (config is set to 4s) to expire + sweeper ticker to pick it up. + deleteFound := false + + for i := 0; i < 75; i++ { + notiMu.Lock() + for _, n := range gotNoti { + if len(n.Delete) > 0 { + if n.Prefix.GetTarget() != testTarget { + continue + } + if n.Prefix.GetOrigin() != testOrigin { + continue + } + + // Verify the container name is in the delete path + gotName := getContainerName(n.Delete[0]) + if strings.TrimPrefix(gotName, "/") == trimmedName { + deleteFound = true + break + } + } + } + if deleteFound { + break + } + notiMu.Unlock() + time.Sleep(200 * time.Millisecond) + } + + if !deleteFound { + t.Errorf("Expected a container delete notification from the integration pipeline after killing container %v, but none was sent", containerName) + } +} From fc88cb51228498926cab692fa10a88252fab9a5b Mon Sep 17 00:00:00 2001 From: Miladin Devedzic Date: Mon, 4 May 2026 12:28:01 +0000 Subject: [PATCH 2/7] config --- config/config.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/config/config.yaml b/config/config.yaml index 3067f9a..c3ab876 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -154,7 +154,6 @@ processors: exporters: # NOTE: Prior to v0.86.0 use `logging` instead of `debug`. gnmi: - container_ttl: 20s addr: "0.0.0.0:60302" # To enable transport security (e.g., mTLS), set the following: # tp_sec: "mtls" From 94cd90bf22db2593ab8c9c140c4d7d391f5a26d8 Mon Sep 17 00:00:00 2001 From: Miladin Devedzic Date: Mon, 4 May 2026 13:38:49 +0000 Subject: [PATCH 3/7] nits --- gnmi/config.go | 2 +- gnmi/gnmi.go | 3 +-- integration/docker_stats_e2e/docker_stats_e2e_test.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/gnmi/config.go b/gnmi/config.go index 56cb088..1e7d5ed 100644 --- a/gnmi/config.go +++ b/gnmi/config.go @@ -55,7 +55,7 @@ type Config struct { Origin string `mapstructure:"origin"` // ContainerTTL is the time after which a container with no new docker_stats metrics is considered dead and purged. - // This should be set to a value greater than the docker_stats collection interval. + // This should be set to a value greater than the docker_stats collection interval. // If set to 0, no delete notifications will be sent. ContainerTTL time.Duration `mapstructure:"container_ttl"` } diff --git a/gnmi/gnmi.go b/gnmi/gnmi.go index 539b256..6d09e59 100644 --- a/gnmi/gnmi.go +++ b/gnmi/gnmi.go @@ -28,7 +28,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/reflection" "google.golang.org/protobuf/proto" - anypb "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/anypb" "k8s.io/klog/v2" "net" "strings" @@ -438,7 +438,6 @@ func (g *GNMI) handleMetrics(_ gnmit.Queue, updateFn gnmit.UpdateFn, target stri } g.processMetrics(ms, updateFn, lastSeen) case now := <-tickerCh: - // continue g.sweepStaleContainers(now, updateFn, lastSeen) } } diff --git a/integration/docker_stats_e2e/docker_stats_e2e_test.go b/integration/docker_stats_e2e/docker_stats_e2e_test.go index 309472c..006e4e3 100644 --- a/integration/docker_stats_e2e/docker_stats_e2e_test.go +++ b/integration/docker_stats_e2e/docker_stats_e2e_test.go @@ -162,7 +162,7 @@ func validateNotifications(t *testing.T, gotNoti []*gpb.Notification) { if len(n.GetDelete()) > 0 { t.Errorf("Unexpected delete notification received: %v", n) } - + for _, u := range n.GetUpdate() { path := elems2path(u.GetPath().GetElem()) delete(wantPathSet, path) From 6df4fac780e37dc064749e57b8886e42e9224193 Mon Sep 17 00:00:00 2001 From: Miladin Devedzic Date: Mon, 4 May 2026 14:11:56 +0000 Subject: [PATCH 4/7] re-write test --- .../docker_stats_e2e/docker_stats_e2e_test.go | 61 +++++++------------ 1 file changed, 21 insertions(+), 40 deletions(-) diff --git a/integration/docker_stats_e2e/docker_stats_e2e_test.go b/integration/docker_stats_e2e/docker_stats_e2e_test.go index 006e4e3..e4f758a 100644 --- a/integration/docker_stats_e2e/docker_stats_e2e_test.go +++ b/integration/docker_stats_e2e/docker_stats_e2e_test.go @@ -260,8 +260,7 @@ func TestE2ETTL(t *testing.T) { } ctx := context.Background() - var gotNoti []*gpb.Notification - var notiMu sync.Mutex + notiCh := make(chan *gpb.Notification, 100) sinkWg := &sync.WaitGroup{} defer sinkWg.Wait() @@ -296,9 +295,7 @@ func TestE2ETTL(t *testing.T) { return } if resp.GetUpdate() != nil { - notiMu.Lock() - gotNoti = append(gotNoti, resp.GetUpdate()) - notiMu.Unlock() + notiCh <- resp.GetUpdate() } } }() @@ -318,24 +315,16 @@ func TestE2ETTL(t *testing.T) { // Wait for at least one update for the container to ensure it's registered. updateFound := false - for i := 0; i < 50; i++ { - notiMu.Lock() - for _, n := range gotNoti { - if len(n.Update) > 0 { - if strings.TrimPrefix(getContainerName(n.Prefix), "/") == trimmedName { - updateFound = true - break - } + timeout := time.After(10 * time.Second) + for !updateFound { + select { + case n := <-notiCh: + if len(n.GetUpdate()) > 0 && strings.TrimPrefix(getContainerName(n.GetPrefix()), "/") == trimmedName { + updateFound = true } + case <-timeout: + t.Fatalf("Timed out waiting for initial update for container %s", trimmedName) } - notiMu.Unlock() - if updateFound { - break - } - time.Sleep(200 * time.Millisecond) - } - if !updateFound { - t.Fatalf("Timed out waiting for initial update for container %s", trimmedName) } // 2. Kill the container to trigger the death condition @@ -343,34 +332,26 @@ func TestE2ETTL(t *testing.T) { // 3. Wait for the TTL (config is set to 4s) to expire + sweeper ticker to pick it up. deleteFound := false - - for i := 0; i < 75; i++ { - notiMu.Lock() - for _, n := range gotNoti { - if len(n.Delete) > 0 { - if n.Prefix.GetTarget() != testTarget { + timeout = time.After(15 * time.Second) + for !deleteFound { + select { + case n := <-notiCh: + if len(n.GetDelete()) > 0 { + if n.GetPrefix().GetTarget() != testTarget { continue } - if n.Prefix.GetOrigin() != testOrigin { + if n.GetPrefix().GetOrigin() != testOrigin { continue } - // Verify the container name is in the delete path - gotName := getContainerName(n.Delete[0]) + gotName := getContainerName(n.GetDelete()[0]) if strings.TrimPrefix(gotName, "/") == trimmedName { deleteFound = true - break } } + case <-timeout: + t.Errorf("Expected a container delete notification from the integration pipeline after killing container %v, but none was sent", containerName) + return } - if deleteFound { - break - } - notiMu.Unlock() - time.Sleep(200 * time.Millisecond) - } - - if !deleteFound { - t.Errorf("Expected a container delete notification from the integration pipeline after killing container %v, but none was sent", containerName) } } From 52b14d883b856bd3a18a954efbb9db03476a9774 Mon Sep 17 00:00:00 2001 From: Miladin Devedzic Date: Mon, 4 May 2026 14:24:19 +0000 Subject: [PATCH 5/7] nit --- gnmi/gnmi_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gnmi/gnmi_test.go b/gnmi/gnmi_test.go index e05d6e7..6573654 100644 --- a/gnmi/gnmi_test.go +++ b/gnmi/gnmi_test.go @@ -1165,10 +1165,10 @@ func TestHandleMetricsTTL(t *testing.T) { {Name: "container", Key: map[string]string{"name": cname}}, }, } - if diff := cmp.Diff(n.Prefix, expectedPrefix, protocmp.Transform()); diff != "" { + if diff := cmp.Diff(n.GetPrefix(), expectedPrefix, protocmp.Transform()); diff != "" { t.Errorf("delete prefix mismatch for container %q (-want +got):\n%s", cname, diff) } - if len(n.Delete[0].Elem) != 0 { + if len(n.GetDelete()[0].GetElem()) != 0 { t.Errorf("expected empty path element in delete for container %q, got %v", cname, n.Delete[0].Elem) } } From a54dd250892b0880509c6fa1791996bc68a35221 Mon Sep 17 00:00:00 2001 From: Miladin Devedzic Date: Mon, 4 May 2026 14:29:44 +0000 Subject: [PATCH 6/7] nit --- .../docker_stats_e2e/docker_stats_e2e_test.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/integration/docker_stats_e2e/docker_stats_e2e_test.go b/integration/docker_stats_e2e/docker_stats_e2e_test.go index e4f758a..5bb00f6 100644 --- a/integration/docker_stats_e2e/docker_stats_e2e_test.go +++ b/integration/docker_stats_e2e/docker_stats_e2e_test.go @@ -246,7 +246,7 @@ func TestE2E(t *testing.T) { func getContainerName(path *gpb.Path) string { for _, elem := range path.Elem { if elem.Name == "container" { - return elem.Key["name"] + return strings.TrimPrefix(elem.Key["name"], "/") } } return "" @@ -319,7 +319,7 @@ func TestE2ETTL(t *testing.T) { for !updateFound { select { case n := <-notiCh: - if len(n.GetUpdate()) > 0 && strings.TrimPrefix(getContainerName(n.GetPrefix()), "/") == trimmedName { + if len(n.GetUpdate()) > 0 && getContainerName(n.GetPrefix()) == trimmedName { updateFound = true } case <-timeout: @@ -337,15 +337,11 @@ func TestE2ETTL(t *testing.T) { select { case n := <-notiCh: if len(n.GetDelete()) > 0 { - if n.GetPrefix().GetTarget() != testTarget { + if n.GetPrefix().GetTarget() != testTarget || n.GetPrefix().GetOrigin() != testOrigin { continue } - if n.GetPrefix().GetOrigin() != testOrigin { - continue - } - gotName := getContainerName(n.GetDelete()[0]) - if strings.TrimPrefix(gotName, "/") == trimmedName { + if gotName == trimmedName { deleteFound = true } } From a9967c20ae8b400994dea01e6f0e425b59e712d4 Mon Sep 17 00:00:00 2001 From: Miladin Devedzic Date: Tue, 12 May 2026 15:59:12 +0000 Subject: [PATCH 7/7] rename getter --- gnmi/gnmi_test.go | 4 ++-- integration/docker_stats_e2e/docker_stats_e2e_test.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/gnmi/gnmi_test.go b/gnmi/gnmi_test.go index 6573654..c1f8b55 100644 --- a/gnmi/gnmi_test.go +++ b/gnmi/gnmi_test.go @@ -1033,7 +1033,7 @@ func TestToPathElems(t *testing.T) { } } -func getContainerName(path *gpb.Path) string { +func containerNameFromPath(path *gpb.Path) string { for _, elem := range path.Elem { if elem.Name == "container" { return elem.Key["name"] @@ -1152,7 +1152,7 @@ func TestHandleMetricsTTL(t *testing.T) { deletedContainers := make(map[string]bool) for _, n := range notifs { if len(n.Delete) > 0 { - cname := getContainerName(n.Prefix) + cname := containerNameFromPath(n.Prefix) if cname != "" { deletedContainers[cname] = true } diff --git a/integration/docker_stats_e2e/docker_stats_e2e_test.go b/integration/docker_stats_e2e/docker_stats_e2e_test.go index 5bb00f6..17989a5 100644 --- a/integration/docker_stats_e2e/docker_stats_e2e_test.go +++ b/integration/docker_stats_e2e/docker_stats_e2e_test.go @@ -243,7 +243,7 @@ func TestE2E(t *testing.T) { validateNotifications(t, gotNoti) } -func getContainerName(path *gpb.Path) string { +func containerNameFromPath(path *gpb.Path) string { for _, elem := range path.Elem { if elem.Name == "container" { return strings.TrimPrefix(elem.Key["name"], "/") @@ -319,7 +319,7 @@ func TestE2ETTL(t *testing.T) { for !updateFound { select { case n := <-notiCh: - if len(n.GetUpdate()) > 0 && getContainerName(n.GetPrefix()) == trimmedName { + if len(n.GetUpdate()) > 0 && containerNameFromPath(n.GetPrefix()) == trimmedName { updateFound = true } case <-timeout: @@ -340,7 +340,7 @@ func TestE2ETTL(t *testing.T) { if n.GetPrefix().GetTarget() != testTarget || n.GetPrefix().GetOrigin() != testOrigin { continue } - gotName := getContainerName(n.GetDelete()[0]) + gotName := containerNameFromPath(n.GetDelete()[0]) if gotName == trimmedName { deleteFound = true }