Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions gnmi/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package gnmi

import (
"go.opentelemetry.io/collector/component"
"time"
)

// Config holds the configuration for this processor.
Expand Down Expand Up @@ -52,6 +53,11 @@ type Config struct {

// Origin is set as the origin of gNMI notifications.
Origin string `mapstructure:"origin"`

// ContainerTTL is the time after which a container with no new docker_stats metrics is considered dead and purged.
// This should be set to a value greater than the docker_stats collection interval.
// If set to 0, no delete notifications will be sent.
ContainerTTL time.Duration `mapstructure:"container_ttl"`
}

var _ component.Config = (*Config)(nil)
17 changes: 9 additions & 8 deletions gnmi/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
package gnmi

import (
"path/filepath"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/confmaptest"
"path/filepath"
"testing"
"time"
)

func TestUnmarshalDefaultConfig(t *testing.T) {
Expand All @@ -46,11 +46,12 @@ func TestUnmarshalConfig(t *testing.T) {
}

want := &Config{
Addr: "localhost:10",
Sep: "/",
AttrSep: ".",
TargetName: "target",
BufferSize: 10,
Addr: "localhost:10",
Sep: "/",
AttrSep: ".",
TargetName: "target",
BufferSize: 10,
ContainerTTL: 20 * time.Second,
}
if diff := cmp.Diff(got, want); diff != "" {
t.Errorf("UnmarshalConfig() returned diff (-got, +want):\n%s", diff)
Expand Down
140 changes: 94 additions & 46 deletions gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,22 @@
import (
"context"
"fmt"
"net"
"strings"
"time"

gpb "github.com/openconfig/gnmi/proto/gnmi"
ompb "go.opentelemetry.io/proto/otlp/metrics/v1"
anypb "google.golang.org/protobuf/types/known/anypb"

"github.com/openconfig/magna/lwotgtelem"
"github.com/openconfig/magna/lwotgtelem/gnmit"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
ompb "go.opentelemetry.io/proto/otlp/metrics/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"k8s.io/klog/v2"
"net"
"strings"
"time"
)

// GNMI is a gNMI exporter.
Expand Down Expand Up @@ -420,55 +418,105 @@
// handleMetrics iterates over all received metrics and converts them into a
// gNMI update. This set of updates are then packed into a gNMI notification
// and sent to the telemetry server.
func (g *GNMI) handleMetrics(_ gnmit.Queue, updateFn gnmit.UpdateFn, target string, cleanup func()) error {

Check failure on line 421 in gnmi/gnmi.go

View workflow job for this annotation

GitHub Actions / go / Static Analysis

parameter 'cleanup' seems to be unused, consider removing or renaming it as _

Check failure on line 421 in gnmi/gnmi.go

View workflow job for this annotation

GitHub Actions / go / Static Analysis

parameter 'target' seems to be unused, consider removing or renaming it as _
go func() {
for ms := range g.metricCh {
var notis []*gpb.Notification

// Iterate over all resources (e.g., app).
rms := ms.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
cname := ""

// Extract container name from resource, if not found, log error and continue.
cNameVal, ok := rm.Resource().Attributes().Get("container.name")
if ok && cNameVal.Type() == pcommon.ValueTypeStr {
cname = cNameVal.Str()
} else {
g.logger.Error("resource is not associated with a container name formatted as a string", zap.String("resource", fmt.Sprintf("%+v", rm.Resource().Attributes().AsRaw())))
continue
var tickerCh <-chan time.Time
if g.cfg.ContainerTTL > 0 {
ticker := time.NewTicker(g.cfg.ContainerTTL / 2)
defer ticker.Stop()
tickerCh = ticker.C
} else {
tickerCh = make(chan time.Time)
}

lastSeen := make(map[string]time.Time)
for {
select {
case ms, ok := <-g.metricCh:
if !ok {
return
}
g.processMetrics(ms, updateFn, lastSeen)
case now := <-tickerCh:
g.sweepStaleContainers(now, updateFn, lastSeen)
}
}
}()
return nil
}

// Iterate over all instrument scopes within the resource (e.g., module within an app).
ilms := rm.ScopeMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
func (g *GNMI) processMetrics(ms *pmetric.Metrics, updateFn gnmit.UpdateFn, lastSeen map[string]time.Time) {
now := time.Now()
var notis []*gpb.Notification

// Iterate over all metrics for the instrument scope.
ms := ilm.Metrics()
for k := 0; k < ms.Len(); k++ {
m := ms.At(k)
notis = append(notis, g.notificationsFromMetric(m, cname)...)
}
}
// Iterate over all resources (e.g., app).
rms := ms.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
rm := rms.At(i)
cname := ""

// Extract container name from resource, if not found, log error and continue.
cNameVal, ok := rm.Resource().Attributes().Get("container.name")
if ok && cNameVal.Type() == pcommon.ValueTypeStr {
cname = cNameVal.Str()
} else {
g.logger.Error("resource is not associated with a container name formatted as a string", zap.String("resource", fmt.Sprintf("%+v", rm.Resource().Attributes().AsRaw())))
continue
}

// Obtain notifications for labels.
lmap, ok := rm.Resource().Attributes().Get("container.labels")
if ok && lmap.Type() == pcommon.ValueTypeMap && len(notis) > 0 {
notis = append(notis, g.notificationsFromLabels(lmap.Map(), cname)...)
}
// Iterate over all instrument scopes within the resource (e.g., module within an app).
ilms := rm.ScopeMetrics()
for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)

// Iterate over all metrics for the instrument scope.
ms := ilm.Metrics()
if ms.Len() > 0 {
// We refresh the TTL whenever we see any container-related metric.
lastSeen[cname] = now
}
for k := 0; k < ms.Len(); k++ {
m := ms.At(k)
notis = append(notis, g.notificationsFromMetric(m, cname)...)
}
}

// Send all notifications.
for _, notification := range notis {
if err := updateFn(notification); err != nil {
klog.Errorf("failed to send notification: %v", err)
}
// Obtain notifications for labels.
lmap, ok := rm.Resource().Attributes().Get("container.labels")
if ok && lmap.Type() == pcommon.ValueTypeMap && len(notis) > 0 {
notis = append(notis, g.notificationsFromLabels(lmap.Map(), cname)...)
}
}

// Send all notifications.
for _, notification := range notis {
if err := updateFn(notification); err != nil {
klog.Errorf("failed to send notification: %v", err)
}
}
}

func (g *GNMI) sweepStaleContainers(now time.Time, updateFn gnmit.UpdateFn, lastSeen map[string]time.Time) {
for cname, t := range lastSeen {
if now.Sub(t) > g.cfg.ContainerTTL {
delete(lastSeen, cname)
noti := &gpb.Notification{
Timestamp: now.UnixNano(),
Prefix: &gpb.Path{
Origin: g.cfg.Origin,
Target: g.cfg.TargetName,
Elem: []*gpb.PathElem{
{Name: "containers"},
{Name: "container", Key: map[string]string{"name": cname}},
},
},
Delete: []*gpb.Path{{}},
}
if err := updateFn(noti); err != nil {
klog.Errorf("failed to send delete notification: %v", err)
}
}
}()
return nil
}
}

func (g *GNMI) toPathElems(name string, attrs attrMap) []*gpb.PathElem {
Expand Down
Loading
Loading