From 9e0d65a45958a3f7f679c17079b695482a7a10cd Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Fri, 6 Mar 2026 08:50:42 -0800 Subject: [PATCH 1/2] feat: add OTel adapter for OTLP trace, metric, and log ingestion Implements an OpenTelemetry adapter that acts as an OTLP endpoint, accepting traces, metrics, and logs via both gRPC and HTTP (protobuf and JSON). Each span/metric/log record is shipped individually as JSON to LimaCharlie with resource and scope context preserved. Co-Authored-By: Claude Opus 4.6 --- containers/conf/all.go | 2 + containers/general/tool.go | 6 + go.mod | 6 +- go.sum | 4 + otel/client.go | 358 +++++++++++ otel/client_test.go | 1249 ++++++++++++++++++++++++++++++++++++ otel/convert.go | 393 ++++++++++++ 7 files changed, 2016 insertions(+), 2 deletions(-) create mode 100644 otel/client.go create mode 100644 otel/client_test.go create mode 100644 otel/convert.go diff --git a/containers/conf/all.go b/containers/conf/all.go index 55784b9..0fdffa5 100755 --- a/containers/conf/all.go +++ b/containers/conf/all.go @@ -25,6 +25,7 @@ import ( "github.com/refractionPOINT/usp-adapters/ms_graph" "github.com/refractionPOINT/usp-adapters/o365" "github.com/refractionPOINT/usp-adapters/okta" + "github.com/refractionPOINT/usp-adapters/otel" "github.com/refractionPOINT/usp-adapters/pandadoc" "github.com/refractionPOINT/usp-adapters/proofpoint_tap" "github.com/refractionPOINT/usp-adapters/pubsub" @@ -86,5 +87,6 @@ type GeneralConfigs struct { Sublime usp_sublime.SublimeConfig `json:"sublime" yaml:"sublime"` SentinelOne usp_sentinelone.SentinelOneConfig `json:"sentinel_one" yaml:"sentinel_one"` TrendMicro usp_trendmicro.TrendMicroConfig `json:"trendmicro" yaml:"trendmicro"` + OTel usp_otel.OTelConfig `json:"otel" yaml:"otel"` Wiz usp_wiz.WizConfig `json:"wiz" yaml:"wiz"` } diff --git a/containers/general/tool.go b/containers/general/tool.go index 4c7af0b..adf9132 100755 --- a/containers/general/tool.go +++ b/containers/general/tool.go @@ -39,6 +39,7 @@ import ( "github.com/refractionPOINT/usp-adapters/ms_graph" "github.com/refractionPOINT/usp-adapters/o365" "github.com/refractionPOINT/usp-adapters/okta" + "github.com/refractionPOINT/usp-adapters/otel" "github.com/refractionPOINT/usp-adapters/pandadoc" "github.com/refractionPOINT/usp-adapters/proofpoint_tap" "github.com/refractionPOINT/usp-adapters/pubsub" @@ -481,6 +482,11 @@ func runAdapter(ctx context.Context, method string, configs Configuration, showC configs.SentinelOne.ClientOptions.Architecture = "usp_adapter" configToShow = configs.SentinelOne client, chRunning, err = usp_sentinelone.NewSentinelOneAdapter(ctx, configs.SentinelOne) + } else if method == "otel" { + configs.OTel.ClientOptions = applyLogging(configs.OTel.ClientOptions) + configs.OTel.ClientOptions.Architecture = "usp_adapter" + configToShow = configs.OTel + client, chRunning, err = usp_otel.NewOTelAdapter(ctx, configs.OTel) } else if method == "trendmicro" { configs.TrendMicro.ClientOptions = applyLogging(configs.TrendMicro.ClientOptions) configs.TrendMicro.ClientOptions.Architecture = "usp_adapter" diff --git a/go.mod b/go.mod index e402334..5a97002 100644 --- a/go.mod +++ b/go.mod @@ -22,12 +22,15 @@ require ( github.com/refractionPOINT/go-uspclient v1.6.3 github.com/stretchr/testify v1.11.1 github.com/vmihailenco/msgpack/v5 v5.4.1 + go.opentelemetry.io/proto/otlp v1.9.0 golang.org/x/net v0.49.0 golang.org/x/oauth2 v0.34.0 golang.org/x/sync v0.19.0 golang.org/x/sys v0.40.0 golang.org/x/text v0.33.0 google.golang.org/api v0.264.0 + google.golang.org/grpc v1.78.0 + google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) @@ -101,6 +104,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.11 // indirect github.com/googleapis/gax-go/v2 v2.17.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect @@ -145,8 +149,6 @@ require ( google.golang.org/genproto v0.0.0-20260203192932-546029d2fa20 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260203192932-546029d2fa20 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 // indirect - google.golang.org/grpc v1.78.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect www.velocidex.com/golang/binparsergen v0.1.1-0.20240404114946-8f66c7cf586e // indirect diff --git a/go.sum b/go.sum index aee3bab..858585c 100644 --- a/go.sum +++ b/go.sum @@ -256,6 +256,8 @@ github.com/googleapis/gax-go/v2 v2.17.0 h1:RksgfBpxqff0EZkDWYuz9q/uWsTVz+kf43LsZ github.com/googleapis/gax-go/v2 v2.17.0/go.mod h1:mzaqghpQp4JDh3HvADwrat+6M3MOIDp5YKHhb9PAgDY= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -383,6 +385,8 @@ go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4A go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/otel/client.go b/otel/client.go new file mode 100644 index 0000000..2fa8592 --- /dev/null +++ b/otel/client.go @@ -0,0 +1,358 @@ +package usp_otel + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/refractionPOINT/go-uspclient" + "github.com/refractionPOINT/go-uspclient/protocol" + + collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1" + colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" + + "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" +) + +const ( + defaultWriteTimeout = 60 * 10 + maxGRPCRecvMsgSize = 64 * 1024 * 1024 // 64MB, matching OTel Collector default + maxHTTPRequestBody = 64 * 1024 * 1024 + eventTypeOTelTrace = "otel_trace" + eventTypeOTelMetric = "otel_metric" + eventTypeOTelLog = "otel_log" +) + +type OTelConfig struct { + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + GRPCPort uint16 `json:"grpc_port,omitempty" yaml:"grpc_port,omitempty"` + HTTPPort uint16 `json:"http_port,omitempty" yaml:"http_port,omitempty"` + Interface string `json:"iface,omitempty" yaml:"iface,omitempty"` +} + +func (c *OTelConfig) Validate() error { + if err := c.ClientOptions.Validate(); err != nil { + return fmt.Errorf("client_options: %v", err) + } + if c.GRPCPort == 0 && c.HTTPPort == 0 { + return errors.New("at least one of grpc_port or http_port must be specified") + } + return nil +} + +type OTelAdapter struct { + conf OTelConfig + uspClient *uspclient.Client + grpcServer *grpc.Server + httpServer *http.Server + grpcListener net.Listener + httpListener net.Listener + wg sync.WaitGroup + isRunning uint32 + writeTimeout time.Duration +} + +func NewOTelAdapter(ctx context.Context, conf OTelConfig) (*OTelAdapter, chan struct{}, error) { + a := &OTelAdapter{ + conf: conf, + isRunning: 1, + writeTimeout: time.Duration(defaultWriteTimeout) * time.Second, + } + + var err error + a.uspClient, err = uspclient.NewClient(ctx, conf.ClientOptions) + if err != nil { + return nil, nil, err + } + + if conf.GRPCPort != 0 { + addr := fmt.Sprintf("%s:%d", conf.Interface, conf.GRPCPort) + a.grpcListener, err = net.Listen("tcp", addr) + if err != nil { + a.uspClient.Close() + return nil, nil, fmt.Errorf("grpc listen on %s: %v", addr, err) + } + a.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(maxGRPCRecvMsgSize), + ) + coltracepb.RegisterTraceServiceServer(a.grpcServer, &traceServiceServer{adapter: a}) + colmetricspb.RegisterMetricsServiceServer(a.grpcServer, &metricsServiceServer{adapter: a}) + collogspb.RegisterLogsServiceServer(a.grpcServer, &logsServiceServer{adapter: a}) + } + + if conf.HTTPPort != 0 { + addr := fmt.Sprintf("%s:%d", conf.Interface, conf.HTTPPort) + a.httpListener, err = net.Listen("tcp", addr) + if err != nil { + if a.grpcListener != nil { + a.grpcListener.Close() + } + a.uspClient.Close() + return nil, nil, fmt.Errorf("http listen on %s: %v", addr, err) + } + mux := http.NewServeMux() + mux.HandleFunc("/v1/traces", a.handleHTTPTraces) + mux.HandleFunc("/v1/metrics", a.handleHTTPMetrics) + mux.HandleFunc("/v1/logs", a.handleHTTPLogs) + a.httpServer = &http.Server{Handler: mux} + } + + chStopped := make(chan struct{}) + a.wg.Add(1) + go func() { + defer a.wg.Done() + var serverWg sync.WaitGroup + + if a.grpcServer != nil { + serverWg.Add(1) + go func() { + defer serverWg.Done() + conf.ClientOptions.DebugLog(fmt.Sprintf("gRPC server listening on %s:%d", conf.Interface, conf.GRPCPort)) + if err := a.grpcServer.Serve(a.grpcListener); err != nil { + if atomic.LoadUint32(&a.isRunning) == 1 { + conf.ClientOptions.OnError(fmt.Errorf("grpc serve: %v", err)) + } + } + }() + } + + if a.httpServer != nil { + serverWg.Add(1) + go func() { + defer serverWg.Done() + conf.ClientOptions.DebugLog(fmt.Sprintf("HTTP server listening on %s:%d", conf.Interface, conf.HTTPPort)) + if err := a.httpServer.Serve(a.httpListener); err != nil && err != http.ErrServerClosed { + if atomic.LoadUint32(&a.isRunning) == 1 { + conf.ClientOptions.OnError(fmt.Errorf("http serve: %v", err)) + } + } + }() + } + + serverWg.Wait() + close(chStopped) + }() + + return a, chStopped, nil +} + +func (a *OTelAdapter) Close() error { + a.conf.ClientOptions.DebugLog("closing") + atomic.StoreUint32(&a.isRunning, 0) + + if a.grpcServer != nil { + a.grpcServer.GracefulStop() + } + if a.httpServer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + a.httpServer.Shutdown(ctx) + } + + a.wg.Wait() + + err1 := a.uspClient.Drain(1 * time.Minute) + _, err2 := a.uspClient.Close() + if err1 != nil { + return err1 + } + return err2 +} + +func (a *OTelAdapter) shipJSON(eventType string, payload map[string]interface{}, timestampMs uint64) { + if timestampMs == 0 { + timestampMs = uint64(time.Now().UnixMilli()) + } + msg := &protocol.DataMessage{ + JsonPayload: payload, + EventType: eventType, + TimestampMs: timestampMs, + } + err := a.uspClient.Ship(msg, a.writeTimeout) + if err == uspclient.ErrorBufferFull { + a.conf.ClientOptions.OnWarning("stream falling behind") + err = a.uspClient.Ship(msg, 1*time.Hour) + } + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + } +} + +// gRPC service implementations + +type traceServiceServer struct { + coltracepb.UnimplementedTraceServiceServer + adapter *OTelAdapter +} + +func (s *traceServiceServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) { + s.adapter.processTraces(req) + return &coltracepb.ExportTraceServiceResponse{}, nil +} + +type metricsServiceServer struct { + colmetricspb.UnimplementedMetricsServiceServer + adapter *OTelAdapter +} + +func (s *metricsServiceServer) Export(ctx context.Context, req *colmetricspb.ExportMetricsServiceRequest) (*colmetricspb.ExportMetricsServiceResponse, error) { + s.adapter.processMetrics(req) + return &colmetricspb.ExportMetricsServiceResponse{}, nil +} + +type logsServiceServer struct { + collogspb.UnimplementedLogsServiceServer + adapter *OTelAdapter +} + +func (s *logsServiceServer) Export(ctx context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) { + s.adapter.processLogs(req) + return &collogspb.ExportLogsServiceResponse{}, nil +} + +// HTTP handlers + +func (a *OTelAdapter) handleHTTPTraces(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, maxHTTPRequestBody)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + req := &coltracepb.ExportTraceServiceRequest{} + if err := unmarshalOTLPRequest(r, body, req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + a.processTraces(req) + marshalOTLPResponse(w, r, &coltracepb.ExportTraceServiceResponse{}) +} + +func (a *OTelAdapter) handleHTTPMetrics(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, maxHTTPRequestBody)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + req := &colmetricspb.ExportMetricsServiceRequest{} + if err := unmarshalOTLPRequest(r, body, req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + a.processMetrics(req) + marshalOTLPResponse(w, r, &colmetricspb.ExportMetricsServiceResponse{}) +} + +func (a *OTelAdapter) handleHTTPLogs(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, maxHTTPRequestBody)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + req := &collogspb.ExportLogsServiceRequest{} + if err := unmarshalOTLPRequest(r, body, req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + a.processLogs(req) + marshalOTLPResponse(w, r, &collogspb.ExportLogsServiceResponse{}) +} + +func unmarshalOTLPRequest(r *http.Request, body []byte, msg proto.Message) error { + ct := r.Header.Get("Content-Type") + if strings.HasPrefix(ct, "application/json") { + return protojson.Unmarshal(body, msg) + } + return proto.Unmarshal(body, msg) +} + +func marshalOTLPResponse(w http.ResponseWriter, r *http.Request, msg proto.Message) { + ct := r.Header.Get("Content-Type") + if strings.HasPrefix(ct, "application/json") { + respBytes, err := protojson.Marshal(msg) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(respBytes) + return + } + respBytes, err := proto.Marshal(msg) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/x-protobuf") + w.Write(respBytes) +} + +// Processing functions + +func (a *OTelAdapter) processTraces(req *coltracepb.ExportTraceServiceRequest) { + for _, rs := range req.ResourceSpans { + resource := convertResource(rs.Resource) + for _, ss := range rs.ScopeSpans { + scope := convertScope(ss.Scope) + for _, span := range ss.Spans { + record := convertSpan(span) + record["resource"] = resource + record["scope"] = scope + ts := nanoToMs(span.StartTimeUnixNano) + a.shipJSON(eventTypeOTelTrace, record, ts) + } + } + } +} + +func (a *OTelAdapter) processMetrics(req *colmetricspb.ExportMetricsServiceRequest) { + for _, rm := range req.ResourceMetrics { + resource := convertResource(rm.Resource) + for _, sm := range rm.ScopeMetrics { + scope := convertScope(sm.Scope) + for _, metric := range sm.Metrics { + record := convertMetric(metric) + record["resource"] = resource + record["scope"] = scope + a.shipJSON(eventTypeOTelMetric, record, 0) + } + } + } +} + +func (a *OTelAdapter) processLogs(req *collogspb.ExportLogsServiceRequest) { + for _, rl := range req.ResourceLogs { + resource := convertResource(rl.Resource) + for _, sl := range rl.ScopeLogs { + scope := convertScope(sl.Scope) + for _, logRecord := range sl.LogRecords { + record := convertLogRecord(logRecord) + record["resource"] = resource + record["scope"] = scope + ts := nanoToMs(logRecord.TimeUnixNano) + a.shipJSON(eventTypeOTelLog, record, ts) + } + } + } +} diff --git a/otel/client_test.go b/otel/client_test.go new file mode 100644 index 0000000..45236d0 --- /dev/null +++ b/otel/client_test.go @@ -0,0 +1,1249 @@ +package usp_otel + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + "io" + "net" + "net/http" + "testing" + "time" + + "github.com/refractionPOINT/go-uspclient" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1" + colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" + commonpb "go.opentelemetry.io/proto/otlp/common/v1" + logspb "go.opentelemetry.io/proto/otlp/logs/v1" + metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" + resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" + tracepb "go.opentelemetry.io/proto/otlp/trace/v1" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" +) + +func testClientOptions(t *testing.T) uspclient.ClientOptions { + return uspclient.ClientOptions{ + TestSinkMode: true, + OnError: func(err error) { + t.Logf("ERROR: %v", err) + }, + OnWarning: func(msg string) { + t.Logf("WARNING: %s", msg) + }, + DebugLog: func(msg string) { + t.Logf("DEBUG: %s", msg) + }, + } +} + +func makeTraceID() []byte { + b, _ := hex.DecodeString("0102030405060708090a0b0c0d0e0f10") + return b +} + +func makeSpanID() []byte { + b, _ := hex.DecodeString("0102030405060708") + return b +} + +func makeParentSpanID() []byte { + b, _ := hex.DecodeString("0807060504030201") + return b +} + +// --- Conversion tests --- + +func TestConvertAttributes(t *testing.T) { + attrs := []*commonpb.KeyValue{ + {Key: "string_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "hello"}}}, + {Key: "int_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_IntValue{IntValue: 42}}}, + {Key: "double_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_DoubleValue{DoubleValue: 3.14}}}, + {Key: "bool_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_BoolValue{BoolValue: true}}}, + {Key: "array_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_ArrayValue{ + ArrayValue: &commonpb.ArrayValue{ + Values: []*commonpb.AnyValue{ + {Value: &commonpb.AnyValue_StringValue{StringValue: "a"}}, + {Value: &commonpb.AnyValue_IntValue{IntValue: 1}}, + }, + }, + }}}, + {Key: "kvlist_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_KvlistValue{ + KvlistValue: &commonpb.KeyValueList{ + Values: []*commonpb.KeyValue{ + {Key: "nested", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "value"}}}, + }, + }, + }}}, + {Key: "bytes_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_BytesValue{BytesValue: []byte{0xDE, 0xAD}}}}, + } + + result := convertAttributes(attrs) + + assert.Equal(t, "hello", result["string_key"]) + assert.Equal(t, int64(42), result["int_key"]) + assert.Equal(t, 3.14, result["double_key"]) + assert.Equal(t, true, result["bool_key"]) + assert.Equal(t, []interface{}{"a", int64(1)}, result["array_key"]) + assert.Equal(t, map[string]interface{}{"nested": "value"}, result["kvlist_key"]) + assert.Equal(t, "3q0=", result["bytes_key"]) // base64 of 0xDEAD +} + +func TestConvertAttributesNil(t *testing.T) { + assert.Nil(t, convertAttributes(nil)) + assert.Nil(t, convertAttributes([]*commonpb.KeyValue{})) +} + +func TestConvertAnyValueNil(t *testing.T) { + assert.Nil(t, convertAnyValue(nil)) + assert.Nil(t, convertAnyValue(&commonpb.AnyValue{})) +} + +func TestConvertResource(t *testing.T) { + r := &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "my-service"}}}, + {Key: "service.version", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "1.0.0"}}}, + }, + } + result := convertResource(r) + assert.Equal(t, "my-service", result["service.name"]) + assert.Equal(t, "1.0.0", result["service.version"]) +} + +func TestConvertResourceNil(t *testing.T) { + assert.Nil(t, convertResource(nil)) +} + +func TestConvertScope(t *testing.T) { + scope := &commonpb.InstrumentationScope{ + Name: "my-library", + Version: "2.0", + Attributes: []*commonpb.KeyValue{ + {Key: "scope_attr", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "val"}}}, + }, + } + result := convertScope(scope) + assert.Equal(t, "my-library", result["name"]) + assert.Equal(t, "2.0", result["version"]) + assert.Equal(t, map[string]interface{}{"scope_attr": "val"}, result["attributes"]) +} + +func TestConvertScopeNil(t *testing.T) { + assert.Nil(t, convertScope(nil)) +} + +func TestConvertSpan(t *testing.T) { + traceID := makeTraceID() + spanID := makeSpanID() + parentSpanID := makeParentSpanID() + + span := &tracepb.Span{ + TraceId: traceID, + SpanId: spanID, + ParentSpanId: parentSpanID, + TraceState: "key=value", + Name: "GET /api/users", + Kind: tracepb.Span_SPAN_KIND_SERVER, + StartTimeUnixNano: 1000000000000, // 1s in nanoseconds + EndTimeUnixNano: 2000000000000, // 2s in nanoseconds + Attributes: []*commonpb.KeyValue{ + {Key: "http.method", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "GET"}}}, + {Key: "http.status_code", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_IntValue{IntValue: 200}}}, + }, + Status: &tracepb.Status{ + Code: tracepb.Status_STATUS_CODE_OK, + Message: "success", + }, + Events: []*tracepb.Span_Event{ + { + Name: "exception", + TimeUnixNano: 1500000000000, + Attributes: []*commonpb.KeyValue{ + {Key: "exception.message", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "not found"}}}, + }, + DroppedAttributesCount: 1, + }, + }, + Links: []*tracepb.Span_Link{ + { + TraceId: traceID, + SpanId: spanID, + TraceState: "linked", + Attributes: []*commonpb.KeyValue{ + {Key: "link_attr", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "v"}}}, + }, + DroppedAttributesCount: 2, + }, + }, + DroppedAttributesCount: 3, + DroppedEventsCount: 4, + DroppedLinksCount: 5, + } + + result := convertSpan(span) + + assert.Equal(t, hex.EncodeToString(traceID), result["trace_id"]) + assert.Equal(t, hex.EncodeToString(spanID), result["span_id"]) + assert.Equal(t, hex.EncodeToString(parentSpanID), result["parent_span_id"]) + assert.Equal(t, "key=value", result["trace_state"]) + assert.Equal(t, "GET /api/users", result["name"]) + assert.Equal(t, "SPAN_KIND_SERVER", result["kind"]) + assert.Equal(t, uint64(1000000000000), result["start_time_unix_nano"]) + assert.Equal(t, uint64(2000000000000), result["end_time_unix_nano"]) + + attrs := result["attributes"].(map[string]interface{}) + assert.Equal(t, "GET", attrs["http.method"]) + assert.Equal(t, int64(200), attrs["http.status_code"]) + + status := result["status"].(map[string]interface{}) + assert.Equal(t, "STATUS_CODE_OK", status["code"]) + assert.Equal(t, "success", status["message"]) + + events := result["events"].([]interface{}) + require.Len(t, events, 1) + event := events[0].(map[string]interface{}) + assert.Equal(t, "exception", event["name"]) + assert.Equal(t, uint64(1500000000000), event["time_unix_nano"]) + assert.Equal(t, uint32(1), event["dropped_attributes_count"]) + + links := result["links"].([]interface{}) + require.Len(t, links, 1) + link := links[0].(map[string]interface{}) + assert.Equal(t, "linked", link["trace_state"]) + assert.Equal(t, uint32(2), link["dropped_attributes_count"]) + + assert.Equal(t, uint32(3), result["dropped_attributes_count"]) + assert.Equal(t, uint32(4), result["dropped_events_count"]) + assert.Equal(t, uint32(5), result["dropped_links_count"]) +} + +func TestConvertSpanMinimal(t *testing.T) { + span := &tracepb.Span{ + TraceId: makeTraceID(), + SpanId: makeSpanID(), + Name: "simple", + Kind: tracepb.Span_SPAN_KIND_INTERNAL, + StartTimeUnixNano: 1000000000000, + EndTimeUnixNano: 2000000000000, + } + result := convertSpan(span) + assert.Equal(t, "simple", result["name"]) + assert.Nil(t, result["parent_span_id"]) + assert.Nil(t, result["trace_state"]) + assert.Nil(t, result["attributes"]) + assert.Nil(t, result["status"]) + assert.Nil(t, result["events"]) + assert.Nil(t, result["links"]) +} + +func TestConvertLogRecord(t *testing.T) { + traceID := makeTraceID() + spanID := makeSpanID() + + lr := &logspb.LogRecord{ + TimeUnixNano: 1000000000000, + ObservedTimeUnixNano: 1000000100000, + SeverityNumber: logspb.SeverityNumber_SEVERITY_NUMBER_ERROR, + SeverityText: "ERROR", + Body: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "something went wrong"}}, + Attributes: []*commonpb.KeyValue{ + {Key: "log.source", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "app"}}}, + }, + TraceId: traceID, + SpanId: spanID, + Flags: 1, + DroppedAttributesCount: 2, + } + + result := convertLogRecord(lr) + + assert.Equal(t, uint64(1000000000000), result["time_unix_nano"]) + assert.Equal(t, uint64(1000000100000), result["observed_time_unix_nano"]) + assert.Equal(t, int32(logspb.SeverityNumber_SEVERITY_NUMBER_ERROR), result["severity_number"]) + assert.Equal(t, "ERROR", result["severity_text"]) + assert.Equal(t, "something went wrong", result["body"]) + assert.Equal(t, hex.EncodeToString(traceID), result["trace_id"]) + assert.Equal(t, hex.EncodeToString(spanID), result["span_id"]) + assert.Equal(t, uint32(1), result["flags"]) + assert.Equal(t, uint32(2), result["dropped_attributes_count"]) + + attrs := result["attributes"].(map[string]interface{}) + assert.Equal(t, "app", attrs["log.source"]) +} + +func TestConvertLogRecordMinimal(t *testing.T) { + lr := &logspb.LogRecord{} + result := convertLogRecord(lr) + assert.Empty(t, result) +} + +func TestConvertMetricGauge(t *testing.T) { + metric := &metricspb.Metric{ + Name: "cpu.usage", + Description: "CPU usage percentage", + Unit: "%", + Data: &metricspb.Metric_Gauge{ + Gauge: &metricspb.Gauge{ + DataPoints: []*metricspb.NumberDataPoint{ + { + StartTimeUnixNano: 1000000000000, + TimeUnixNano: 2000000000000, + Value: &metricspb.NumberDataPoint_AsDouble{AsDouble: 75.5}, + Attributes: []*commonpb.KeyValue{ + {Key: "host", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "server1"}}}, + }, + }, + { + StartTimeUnixNano: 1000000000000, + TimeUnixNano: 2000000000000, + Value: &metricspb.NumberDataPoint_AsInt{AsInt: 80}, + }, + }, + }, + }, + } + + result := convertMetric(metric) + + assert.Equal(t, "cpu.usage", result["name"]) + assert.Equal(t, "CPU usage percentage", result["description"]) + assert.Equal(t, "%", result["unit"]) + assert.Equal(t, "gauge", result["type"]) + + dps := result["data_points"].([]interface{}) + require.Len(t, dps, 2) + + dp0 := dps[0].(map[string]interface{}) + assert.Equal(t, 75.5, dp0["value"]) + assert.Equal(t, "server1", dp0["attributes"].(map[string]interface{})["host"]) + + dp1 := dps[1].(map[string]interface{}) + assert.Equal(t, int64(80), dp1["value"]) +} + +func TestConvertMetricSum(t *testing.T) { + metric := &metricspb.Metric{ + Name: "http.requests", + Data: &metricspb.Metric_Sum{ + Sum: &metricspb.Sum{ + AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: true, + DataPoints: []*metricspb.NumberDataPoint{ + { + TimeUnixNano: 2000000000000, + Value: &metricspb.NumberDataPoint_AsInt{AsInt: 100}, + }, + }, + }, + }, + } + + result := convertMetric(metric) + assert.Equal(t, "sum", result["type"]) + assert.Equal(t, "AGGREGATION_TEMPORALITY_CUMULATIVE", result["aggregation_temporality"]) + assert.Equal(t, true, result["is_monotonic"]) +} + +func TestConvertMetricHistogram(t *testing.T) { + sum := 123.0 + min := 1.0 + max := 100.0 + metric := &metricspb.Metric{ + Name: "http.request.duration", + Data: &metricspb.Metric_Histogram{ + Histogram: &metricspb.Histogram{ + AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + DataPoints: []*metricspb.HistogramDataPoint{ + { + StartTimeUnixNano: 1000000000000, + TimeUnixNano: 2000000000000, + Count: 10, + Sum: &sum, + Min: &min, + Max: &max, + BucketCounts: []uint64{2, 3, 5}, + ExplicitBounds: []float64{10.0, 50.0}, + Exemplars: []*metricspb.Exemplar{ + { + TimeUnixNano: 1500000000000, + Value: &metricspb.Exemplar_AsDouble{AsDouble: 42.0}, + TraceId: makeTraceID(), + SpanId: makeSpanID(), + FilteredAttributes: []*commonpb.KeyValue{ + {Key: "filtered", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "val"}}}, + }, + }, + }, + }, + }, + }, + }, + } + + result := convertMetric(metric) + assert.Equal(t, "histogram", result["type"]) + assert.Equal(t, "AGGREGATION_TEMPORALITY_DELTA", result["aggregation_temporality"]) + + dps := result["data_points"].([]interface{}) + require.Len(t, dps, 1) + dp := dps[0].(map[string]interface{}) + assert.Equal(t, uint64(10), dp["count"]) + assert.Equal(t, 123.0, dp["sum"]) + assert.Equal(t, 1.0, dp["min"]) + assert.Equal(t, 100.0, dp["max"]) + assert.Equal(t, []uint64{2, 3, 5}, dp["bucket_counts"]) + assert.Equal(t, []float64{10.0, 50.0}, dp["explicit_bounds"]) + + exemplars := dp["exemplars"].([]interface{}) + require.Len(t, exemplars, 1) + ex := exemplars[0].(map[string]interface{}) + assert.Equal(t, 42.0, ex["value"]) + assert.Equal(t, hex.EncodeToString(makeTraceID()), ex["trace_id"]) + assert.Equal(t, hex.EncodeToString(makeSpanID()), ex["span_id"]) + assert.Equal(t, "val", ex["filtered_attributes"].(map[string]interface{})["filtered"]) +} + +func TestConvertMetricExponentialHistogram(t *testing.T) { + sum := 500.0 + metric := &metricspb.Metric{ + Name: "exp_hist", + Data: &metricspb.Metric_ExponentialHistogram{ + ExponentialHistogram: &metricspb.ExponentialHistogram{ + AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + DataPoints: []*metricspb.ExponentialHistogramDataPoint{ + { + TimeUnixNano: 2000000000000, + Count: 100, + Sum: &sum, + Scale: 5, + ZeroCount: 3, + Positive: &metricspb.ExponentialHistogramDataPoint_Buckets{ + Offset: 1, + BucketCounts: []uint64{10, 20, 30}, + }, + Negative: &metricspb.ExponentialHistogramDataPoint_Buckets{ + Offset: -2, + BucketCounts: []uint64{5, 15}, + }, + }, + }, + }, + }, + } + + result := convertMetric(metric) + assert.Equal(t, "exponential_histogram", result["type"]) + + dps := result["data_points"].([]interface{}) + require.Len(t, dps, 1) + dp := dps[0].(map[string]interface{}) + assert.Equal(t, uint64(100), dp["count"]) + assert.Equal(t, 500.0, dp["sum"]) + assert.Equal(t, int32(5), dp["scale"]) + assert.Equal(t, uint64(3), dp["zero_count"]) + + pos := dp["positive"].(map[string]interface{}) + assert.Equal(t, int32(1), pos["offset"]) + assert.Equal(t, []uint64{10, 20, 30}, pos["bucket_counts"]) + + neg := dp["negative"].(map[string]interface{}) + assert.Equal(t, int32(-2), neg["offset"]) + assert.Equal(t, []uint64{5, 15}, neg["bucket_counts"]) +} + +func TestConvertMetricSummary(t *testing.T) { + metric := &metricspb.Metric{ + Name: "request.duration.summary", + Data: &metricspb.Metric_Summary{ + Summary: &metricspb.Summary{ + DataPoints: []*metricspb.SummaryDataPoint{ + { + TimeUnixNano: 2000000000000, + Count: 50, + Sum: 1234.5, + QuantileValues: []*metricspb.SummaryDataPoint_ValueAtQuantile{ + {Quantile: 0.5, Value: 20.0}, + {Quantile: 0.99, Value: 99.0}, + }, + }, + }, + }, + }, + } + + result := convertMetric(metric) + assert.Equal(t, "summary", result["type"]) + + dps := result["data_points"].([]interface{}) + require.Len(t, dps, 1) + dp := dps[0].(map[string]interface{}) + assert.Equal(t, uint64(50), dp["count"]) + assert.Equal(t, 1234.5, dp["sum"]) + + qvs := dp["quantile_values"].([]interface{}) + require.Len(t, qvs, 2) + assert.Equal(t, 0.5, qvs[0].(map[string]interface{})["quantile"]) + assert.Equal(t, 20.0, qvs[0].(map[string]interface{})["value"]) + assert.Equal(t, 0.99, qvs[1].(map[string]interface{})["quantile"]) + assert.Equal(t, 99.0, qvs[1].(map[string]interface{})["value"]) +} + +func TestNanoToMs(t *testing.T) { + assert.Equal(t, uint64(0), nanoToMs(0)) + assert.Equal(t, uint64(1000), nanoToMs(1000000000)) + assert.Equal(t, uint64(1), nanoToMs(1000000)) + assert.Equal(t, uint64(1500), nanoToMs(1500000000)) +} + +// --- Validation tests --- + +func validClientOptions() uspclient.ClientOptions { + return uspclient.ClientOptions{ + TestSinkMode: true, + Identity: uspclient.Identity{ + Oid: "test-oid", + InstallationKey: "test-key", + }, + Platform: "json", + } +} + +func TestOTelConfigValidate(t *testing.T) { + t.Run("valid with grpc", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: validClientOptions(), + GRPCPort: 4317, + } + assert.NoError(t, conf.Validate()) + }) + + t.Run("valid with http", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: validClientOptions(), + HTTPPort: 4318, + } + assert.NoError(t, conf.Validate()) + }) + + t.Run("valid with both", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: validClientOptions(), + GRPCPort: 4317, + HTTPPort: 4318, + } + assert.NoError(t, conf.Validate()) + }) + + t.Run("invalid no ports", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: validClientOptions(), + } + err := conf.Validate() + assert.Error(t, err) + assert.Contains(t, err.Error(), "grpc_port or http_port") + }) + + t.Run("invalid client options", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: uspclient.ClientOptions{}, + GRPCPort: 4317, + } + err := conf.Validate() + assert.Error(t, err) + assert.Contains(t, err.Error(), "client_options") + }) +} + +// --- Integration tests: gRPC --- + +func TestGRPCTraceExport(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: 0, // Will use a random port + } + + // Use a random available port + conf.GRPCPort = getFreePort(t) + + ctx := context.Background() + adapter, chStopped, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + // Wait for server to start + time.Sleep(100 * time.Millisecond) + + // Connect gRPC client + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + client := coltracepb.NewTraceServiceClient(conn) + + req := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "test-svc"}}}, + }, + }, + ScopeSpans: []*tracepb.ScopeSpans{ + { + Scope: &commonpb.InstrumentationScope{Name: "test-lib", Version: "1.0"}, + Spans: []*tracepb.Span{ + { + TraceId: makeTraceID(), + SpanId: makeSpanID(), + Name: "test-span", + Kind: tracepb.Span_SPAN_KIND_SERVER, + StartTimeUnixNano: 1000000000000, + EndTimeUnixNano: 2000000000000, + }, + }, + }, + }, + }, + }, + } + + resp, err := client.Export(ctx, req) + require.NoError(t, err) + assert.NotNil(t, resp) + + // Verify adapter is still running + select { + case <-chStopped: + t.Fatal("adapter stopped unexpectedly") + default: + } +} + +func TestGRPCMetricsExport(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + client := colmetricspb.NewMetricsServiceClient(conn) + + req := &colmetricspb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricspb.ResourceMetrics{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "test-svc"}}}, + }, + }, + ScopeMetrics: []*metricspb.ScopeMetrics{ + { + Scope: &commonpb.InstrumentationScope{Name: "test-lib"}, + Metrics: []*metricspb.Metric{ + { + Name: "test.counter", + Description: "A test counter", + Unit: "1", + Data: &metricspb.Metric_Sum{ + Sum: &metricspb.Sum{ + AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: true, + DataPoints: []*metricspb.NumberDataPoint{ + { + TimeUnixNano: 2000000000000, + Value: &metricspb.NumberDataPoint_AsInt{AsInt: 42}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + resp, err := client.Export(ctx, req) + require.NoError(t, err) + assert.NotNil(t, resp) +} + +func TestGRPCLogsExport(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + client := collogspb.NewLogsServiceClient(conn) + + req := &collogspb.ExportLogsServiceRequest{ + ResourceLogs: []*logspb.ResourceLogs{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "test-svc"}}}, + }, + }, + ScopeLogs: []*logspb.ScopeLogs{ + { + Scope: &commonpb.InstrumentationScope{Name: "test-lib"}, + LogRecords: []*logspb.LogRecord{ + { + TimeUnixNano: 1000000000000, + SeverityNumber: logspb.SeverityNumber_SEVERITY_NUMBER_INFO, + SeverityText: "INFO", + Body: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "test log message"}}, + }, + }, + }, + }, + }, + }, + } + + resp, err := client.Export(ctx, req) + require.NoError(t, err) + assert.NotNil(t, resp) +} + +// --- Integration tests: HTTP --- + +func TestHTTPTraceExportProtobuf(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + req := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "http-test"}}}, + }, + }, + ScopeSpans: []*tracepb.ScopeSpans{ + { + Spans: []*tracepb.Span{ + { + TraceId: makeTraceID(), + SpanId: makeSpanID(), + Name: "http-span", + Kind: tracepb.Span_SPAN_KIND_CLIENT, + StartTimeUnixNano: 1000000000000, + EndTimeUnixNano: 2000000000000, + }, + }, + }, + }, + }, + }, + } + + body, err := proto.Marshal(req) + require.NoError(t, err) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader(body), + ) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type")) + + respBody, err := io.ReadAll(resp.Body) + require.NoError(t, err) + respMsg := &coltracepb.ExportTraceServiceResponse{} + require.NoError(t, proto.Unmarshal(respBody, respMsg)) +} + +func TestHTTPTraceExportJSON(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + req := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + { + ScopeSpans: []*tracepb.ScopeSpans{ + { + Spans: []*tracepb.Span{ + { + TraceId: makeTraceID(), + SpanId: makeSpanID(), + Name: "json-span", + StartTimeUnixNano: 1000000000000, + EndTimeUnixNano: 2000000000000, + }, + }, + }, + }, + }, + }, + } + + body, err := protojson.Marshal(req) + require.NoError(t, err) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/json", + bytes.NewReader(body), + ) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "application/json", resp.Header.Get("Content-Type")) +} + +func TestHTTPMetricsExportProtobuf(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + req := &colmetricspb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricspb.ResourceMetrics{ + { + ScopeMetrics: []*metricspb.ScopeMetrics{ + { + Metrics: []*metricspb.Metric{ + { + Name: "test.gauge", + Data: &metricspb.Metric_Gauge{ + Gauge: &metricspb.Gauge{ + DataPoints: []*metricspb.NumberDataPoint{ + {TimeUnixNano: 2000000000000, Value: &metricspb.NumberDataPoint_AsDouble{AsDouble: 99.9}}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + body, err := proto.Marshal(req) + require.NoError(t, err) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/metrics", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader(body), + ) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestHTTPLogsExportProtobuf(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + req := &collogspb.ExportLogsServiceRequest{ + ResourceLogs: []*logspb.ResourceLogs{ + { + ScopeLogs: []*logspb.ScopeLogs{ + { + LogRecords: []*logspb.LogRecord{ + { + TimeUnixNano: 1000000000000, + SeverityNumber: logspb.SeverityNumber_SEVERITY_NUMBER_WARN, + SeverityText: "WARN", + Body: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "disk space low"}}, + }, + }, + }, + }, + }, + }, + } + + body, err := proto.Marshal(req) + require.NoError(t, err) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/logs", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader(body), + ) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestHTTPMethodNotAllowed(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + endpoints := []string{"/v1/traces", "/v1/metrics", "/v1/logs"} + for _, ep := range endpoints { + t.Run(ep, func(t *testing.T) { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", conf.HTTPPort, ep)) + require.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode) + }) + } +} + +func TestHTTPBadBody(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader([]byte("not valid protobuf")), + ) + require.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func TestHTTPBadJSON(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/json", + bytes.NewReader([]byte("{invalid json")), + ) + require.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +// --- Adapter lifecycle tests --- + +func TestAdapterGRPCOnly(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, chStopped, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + + // Verify it's running + select { + case <-chStopped: + t.Fatal("adapter stopped unexpectedly") + default: + } + + require.NoError(t, adapter.Close()) + + // Verify it stopped + select { + case <-chStopped: + case <-time.After(5 * time.Second): + t.Fatal("adapter didn't stop after Close()") + } +} + +func TestAdapterHTTPOnly(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, chStopped, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + + select { + case <-chStopped: + t.Fatal("adapter stopped unexpectedly") + default: + } + + require.NoError(t, adapter.Close()) + + select { + case <-chStopped: + case <-time.After(5 * time.Second): + t.Fatal("adapter didn't stop after Close()") + } +} + +func TestAdapterBothPorts(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, chStopped, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Send via gRPC + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + traceClient := coltracepb.NewTraceServiceClient(conn) + _, err = traceClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + {ScopeSpans: []*tracepb.ScopeSpans{ + {Spans: []*tracepb.Span{{TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "grpc-span", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}}}, + }}, + }, + }) + require.NoError(t, err) + + // Send via HTTP + httpReq := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + {ScopeSpans: []*tracepb.ScopeSpans{ + {Spans: []*tracepb.Span{{TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "http-span", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}}}, + }}, + }, + } + body, err := proto.Marshal(httpReq) + require.NoError(t, err) + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader(body), + ) + require.NoError(t, err) + resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + // Graceful shutdown + require.NoError(t, adapter.Close()) + select { + case <-chStopped: + case <-time.After(5 * time.Second): + t.Fatal("adapter didn't stop after Close()") + } +} + +func TestGRPCMultipleResourceSpans(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + client := coltracepb.NewTraceServiceClient(conn) + + // Send request with multiple resources, scopes, and spans + req := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "svc-a"}}}, + }, + }, + ScopeSpans: []*tracepb.ScopeSpans{ + { + Scope: &commonpb.InstrumentationScope{Name: "lib-1"}, + Spans: []*tracepb.Span{ + {TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "span-1", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}, + {TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "span-2", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}, + }, + }, + { + Scope: &commonpb.InstrumentationScope{Name: "lib-2"}, + Spans: []*tracepb.Span{ + {TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "span-3", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}, + }, + }, + }, + }, + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "svc-b"}}}, + }, + }, + ScopeSpans: []*tracepb.ScopeSpans{ + { + Spans: []*tracepb.Span{ + {TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "span-4", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}, + }, + }, + }, + }, + }, + } + + resp, err := client.Export(ctx, req) + require.NoError(t, err) + assert.NotNil(t, resp) +} + +func TestGRPCEmptyRequest(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + // Empty requests should succeed without errors + traceClient := coltracepb.NewTraceServiceClient(conn) + _, err = traceClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{}) + assert.NoError(t, err) + + metricsClient := colmetricspb.NewMetricsServiceClient(conn) + _, err = metricsClient.Export(ctx, &colmetricspb.ExportMetricsServiceRequest{}) + assert.NoError(t, err) + + logsClient := collogspb.NewLogsServiceClient(conn) + _, err = logsClient.Export(ctx, &collogspb.ExportLogsServiceRequest{}) + assert.NoError(t, err) +} + +// --- Helpers --- + +func getFreePort(t *testing.T) uint16 { + t.Helper() + l, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", "127.0.0.1:0") + require.NoError(t, err) + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return uint16(port) +} diff --git a/otel/convert.go b/otel/convert.go new file mode 100644 index 0000000..8afd324 --- /dev/null +++ b/otel/convert.go @@ -0,0 +1,393 @@ +package usp_otel + +import ( + "encoding/base64" + "encoding/hex" + + commonpb "go.opentelemetry.io/proto/otlp/common/v1" + logspb "go.opentelemetry.io/proto/otlp/logs/v1" + metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" + resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" + tracepb "go.opentelemetry.io/proto/otlp/trace/v1" +) + +func nanoToMs(nanos uint64) uint64 { + if nanos == 0 { + return 0 + } + return nanos / 1_000_000 +} + +func convertAttributes(attrs []*commonpb.KeyValue) map[string]interface{} { + if len(attrs) == 0 { + return nil + } + result := make(map[string]interface{}, len(attrs)) + for _, kv := range attrs { + result[kv.Key] = convertAnyValue(kv.Value) + } + return result +} + +func convertAnyValue(v *commonpb.AnyValue) interface{} { + if v == nil { + return nil + } + switch val := v.Value.(type) { + case *commonpb.AnyValue_StringValue: + return val.StringValue + case *commonpb.AnyValue_IntValue: + return val.IntValue + case *commonpb.AnyValue_DoubleValue: + return val.DoubleValue + case *commonpb.AnyValue_BoolValue: + return val.BoolValue + case *commonpb.AnyValue_ArrayValue: + if val.ArrayValue == nil { + return nil + } + arr := make([]interface{}, len(val.ArrayValue.Values)) + for i, v := range val.ArrayValue.Values { + arr[i] = convertAnyValue(v) + } + return arr + case *commonpb.AnyValue_KvlistValue: + if val.KvlistValue == nil { + return nil + } + return convertAttributes(val.KvlistValue.Values) + case *commonpb.AnyValue_BytesValue: + return base64.StdEncoding.EncodeToString(val.BytesValue) + } + return nil +} + +func convertResource(r *resourcepb.Resource) map[string]interface{} { + if r == nil { + return nil + } + return convertAttributes(r.Attributes) +} + +func convertScope(scope *commonpb.InstrumentationScope) map[string]interface{} { + if scope == nil { + return nil + } + result := map[string]interface{}{} + if scope.Name != "" { + result["name"] = scope.Name + } + if scope.Version != "" { + result["version"] = scope.Version + } + if attrs := convertAttributes(scope.Attributes); attrs != nil { + result["attributes"] = attrs + } + return result +} + +// Traces + +func convertSpan(span *tracepb.Span) map[string]interface{} { + record := map[string]interface{}{ + "trace_id": hex.EncodeToString(span.TraceId), + "span_id": hex.EncodeToString(span.SpanId), + "name": span.Name, + "kind": span.Kind.String(), + "start_time_unix_nano": span.StartTimeUnixNano, + "end_time_unix_nano": span.EndTimeUnixNano, + } + if len(span.ParentSpanId) > 0 { + record["parent_span_id"] = hex.EncodeToString(span.ParentSpanId) + } + if span.TraceState != "" { + record["trace_state"] = span.TraceState + } + if attrs := convertAttributes(span.Attributes); attrs != nil { + record["attributes"] = attrs + } + if span.Status != nil { + status := map[string]interface{}{ + "code": span.Status.Code.String(), + } + if span.Status.Message != "" { + status["message"] = span.Status.Message + } + record["status"] = status + } + if len(span.Events) > 0 { + events := make([]interface{}, len(span.Events)) + for i, e := range span.Events { + event := map[string]interface{}{ + "name": e.Name, + "time_unix_nano": e.TimeUnixNano, + } + if attrs := convertAttributes(e.Attributes); attrs != nil { + event["attributes"] = attrs + } + if e.DroppedAttributesCount > 0 { + event["dropped_attributes_count"] = e.DroppedAttributesCount + } + events[i] = event + } + record["events"] = events + } + if len(span.Links) > 0 { + links := make([]interface{}, len(span.Links)) + for i, l := range span.Links { + link := map[string]interface{}{ + "trace_id": hex.EncodeToString(l.TraceId), + "span_id": hex.EncodeToString(l.SpanId), + } + if l.TraceState != "" { + link["trace_state"] = l.TraceState + } + if attrs := convertAttributes(l.Attributes); attrs != nil { + link["attributes"] = attrs + } + if l.DroppedAttributesCount > 0 { + link["dropped_attributes_count"] = l.DroppedAttributesCount + } + links[i] = link + } + record["links"] = links + } + if span.DroppedAttributesCount > 0 { + record["dropped_attributes_count"] = span.DroppedAttributesCount + } + if span.DroppedEventsCount > 0 { + record["dropped_events_count"] = span.DroppedEventsCount + } + if span.DroppedLinksCount > 0 { + record["dropped_links_count"] = span.DroppedLinksCount + } + return record +} + +// Metrics + +func convertMetric(metric *metricspb.Metric) map[string]interface{} { + record := map[string]interface{}{ + "name": metric.Name, + } + if metric.Description != "" { + record["description"] = metric.Description + } + if metric.Unit != "" { + record["unit"] = metric.Unit + } + + switch data := metric.Data.(type) { + case *metricspb.Metric_Gauge: + record["type"] = "gauge" + record["data_points"] = convertNumberDataPoints(data.Gauge.DataPoints) + case *metricspb.Metric_Sum: + record["type"] = "sum" + record["data_points"] = convertNumberDataPoints(data.Sum.DataPoints) + record["aggregation_temporality"] = data.Sum.AggregationTemporality.String() + record["is_monotonic"] = data.Sum.IsMonotonic + case *metricspb.Metric_Histogram: + record["type"] = "histogram" + record["data_points"] = convertHistogramDataPoints(data.Histogram.DataPoints) + record["aggregation_temporality"] = data.Histogram.AggregationTemporality.String() + case *metricspb.Metric_ExponentialHistogram: + record["type"] = "exponential_histogram" + record["data_points"] = convertExponentialHistogramDataPoints(data.ExponentialHistogram.DataPoints) + record["aggregation_temporality"] = data.ExponentialHistogram.AggregationTemporality.String() + case *metricspb.Metric_Summary: + record["type"] = "summary" + record["data_points"] = convertSummaryDataPoints(data.Summary.DataPoints) + } + + return record +} + +func convertNumberDataPoints(dps []*metricspb.NumberDataPoint) []interface{} { + result := make([]interface{}, len(dps)) + for i, dp := range dps { + point := map[string]interface{}{ + "start_time_unix_nano": dp.StartTimeUnixNano, + "time_unix_nano": dp.TimeUnixNano, + } + switch v := dp.Value.(type) { + case *metricspb.NumberDataPoint_AsDouble: + point["value"] = v.AsDouble + case *metricspb.NumberDataPoint_AsInt: + point["value"] = v.AsInt + } + if attrs := convertAttributes(dp.Attributes); attrs != nil { + point["attributes"] = attrs + } + if len(dp.Exemplars) > 0 { + point["exemplars"] = convertExemplars(dp.Exemplars) + } + result[i] = point + } + return result +} + +func convertHistogramDataPoints(dps []*metricspb.HistogramDataPoint) []interface{} { + result := make([]interface{}, len(dps)) + for i, dp := range dps { + point := map[string]interface{}{ + "start_time_unix_nano": dp.StartTimeUnixNano, + "time_unix_nano": dp.TimeUnixNano, + "count": dp.Count, + } + if len(dp.BucketCounts) > 0 { + point["bucket_counts"] = dp.BucketCounts + } + if len(dp.ExplicitBounds) > 0 { + point["explicit_bounds"] = dp.ExplicitBounds + } + if dp.Sum != nil { + point["sum"] = *dp.Sum + } + if dp.Min != nil { + point["min"] = *dp.Min + } + if dp.Max != nil { + point["max"] = *dp.Max + } + if attrs := convertAttributes(dp.Attributes); attrs != nil { + point["attributes"] = attrs + } + if len(dp.Exemplars) > 0 { + point["exemplars"] = convertExemplars(dp.Exemplars) + } + result[i] = point + } + return result +} + +func convertExponentialHistogramDataPoints(dps []*metricspb.ExponentialHistogramDataPoint) []interface{} { + result := make([]interface{}, len(dps)) + for i, dp := range dps { + point := map[string]interface{}{ + "start_time_unix_nano": dp.StartTimeUnixNano, + "time_unix_nano": dp.TimeUnixNano, + "count": dp.Count, + "scale": dp.Scale, + "zero_count": dp.ZeroCount, + } + if dp.Sum != nil { + point["sum"] = *dp.Sum + } + if dp.Min != nil { + point["min"] = *dp.Min + } + if dp.Max != nil { + point["max"] = *dp.Max + } + if dp.Positive != nil { + point["positive"] = map[string]interface{}{ + "offset": dp.Positive.Offset, + "bucket_counts": dp.Positive.BucketCounts, + } + } + if dp.Negative != nil { + point["negative"] = map[string]interface{}{ + "offset": dp.Negative.Offset, + "bucket_counts": dp.Negative.BucketCounts, + } + } + if attrs := convertAttributes(dp.Attributes); attrs != nil { + point["attributes"] = attrs + } + if len(dp.Exemplars) > 0 { + point["exemplars"] = convertExemplars(dp.Exemplars) + } + result[i] = point + } + return result +} + +func convertSummaryDataPoints(dps []*metricspb.SummaryDataPoint) []interface{} { + result := make([]interface{}, len(dps)) + for i, dp := range dps { + point := map[string]interface{}{ + "start_time_unix_nano": dp.StartTimeUnixNano, + "time_unix_nano": dp.TimeUnixNano, + "count": dp.Count, + "sum": dp.Sum, + } + if len(dp.QuantileValues) > 0 { + qvs := make([]interface{}, len(dp.QuantileValues)) + for j, qv := range dp.QuantileValues { + qvs[j] = map[string]interface{}{ + "quantile": qv.Quantile, + "value": qv.Value, + } + } + point["quantile_values"] = qvs + } + if attrs := convertAttributes(dp.Attributes); attrs != nil { + point["attributes"] = attrs + } + result[i] = point + } + return result +} + +func convertExemplars(exemplars []*metricspb.Exemplar) []interface{} { + result := make([]interface{}, len(exemplars)) + for i, e := range exemplars { + ex := map[string]interface{}{ + "time_unix_nano": e.TimeUnixNano, + } + switch v := e.Value.(type) { + case *metricspb.Exemplar_AsDouble: + ex["value"] = v.AsDouble + case *metricspb.Exemplar_AsInt: + ex["value"] = v.AsInt + } + if len(e.TraceId) > 0 { + ex["trace_id"] = hex.EncodeToString(e.TraceId) + } + if len(e.SpanId) > 0 { + ex["span_id"] = hex.EncodeToString(e.SpanId) + } + if attrs := convertAttributes(e.FilteredAttributes); attrs != nil { + ex["filtered_attributes"] = attrs + } + result[i] = ex + } + return result +} + +// Logs + +func convertLogRecord(lr *logspb.LogRecord) map[string]interface{} { + record := map[string]interface{}{} + if lr.TimeUnixNano != 0 { + record["time_unix_nano"] = lr.TimeUnixNano + } + if lr.ObservedTimeUnixNano != 0 { + record["observed_time_unix_nano"] = lr.ObservedTimeUnixNano + } + if lr.SeverityNumber != 0 { + record["severity_number"] = int32(lr.SeverityNumber) + } + if lr.SeverityText != "" { + record["severity_text"] = lr.SeverityText + } + if lr.Body != nil { + record["body"] = convertAnyValue(lr.Body) + } + if attrs := convertAttributes(lr.Attributes); attrs != nil { + record["attributes"] = attrs + } + if len(lr.TraceId) > 0 { + record["trace_id"] = hex.EncodeToString(lr.TraceId) + } + if len(lr.SpanId) > 0 { + record["span_id"] = hex.EncodeToString(lr.SpanId) + } + if lr.Flags > 0 { + record["flags"] = lr.Flags + } + if lr.DroppedAttributesCount > 0 { + record["dropped_attributes_count"] = lr.DroppedAttributesCount + } + return record +} From 167e6688c858115cb6c138dc5923ee5e9390a75b Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Fri, 6 Mar 2026 14:59:57 -0800 Subject: [PATCH 2/2] refactor: pass through original OTLP data as-is instead of converting Ship the entire OTLP request as a single JSON event via protojson, removing per-record conversion logic. This allows the backend (legion_usp_proxy) to perform centralized mapping regardless of how events arrive. Co-Authored-By: Claude Opus 4.6 --- otel/client.go | 61 ++---- otel/client_test.go | 464 +------------------------------------------- otel/convert.go | 393 ------------------------------------- 3 files changed, 19 insertions(+), 899 deletions(-) delete mode 100644 otel/convert.go diff --git a/otel/client.go b/otel/client.go index 2fa8592..ac3ac07 100644 --- a/otel/client.go +++ b/otel/client.go @@ -2,6 +2,7 @@ package usp_otel import ( "context" + "encoding/json" "errors" "fmt" "io" @@ -169,16 +170,23 @@ func (a *OTelAdapter) Close() error { return err2 } -func (a *OTelAdapter) shipJSON(eventType string, payload map[string]interface{}, timestampMs uint64) { - if timestampMs == 0 { - timestampMs = uint64(time.Now().UnixMilli()) +func (a *OTelAdapter) shipProto(eventType string, protoMsg proto.Message) { + jsonBytes, err := protojson.Marshal(protoMsg) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("protojson.Marshal(): %v", err)) + return + } + var payload map[string]interface{} + if err := json.Unmarshal(jsonBytes, &payload); err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("json.Unmarshal(): %v", err)) + return } msg := &protocol.DataMessage{ JsonPayload: payload, EventType: eventType, - TimestampMs: timestampMs, + TimestampMs: uint64(time.Now().UnixMilli()), } - err := a.uspClient.Ship(msg, a.writeTimeout) + err = a.uspClient.Ship(msg, a.writeTimeout) if err == uspclient.ErrorBufferFull { a.conf.ClientOptions.OnWarning("stream falling behind") err = a.uspClient.Ship(msg, 1*time.Hour) @@ -308,51 +316,16 @@ func marshalOTLPResponse(w http.ResponseWriter, r *http.Request, msg proto.Messa w.Write(respBytes) } -// Processing functions +// Processing functions: pass through the original OTLP request as-is. func (a *OTelAdapter) processTraces(req *coltracepb.ExportTraceServiceRequest) { - for _, rs := range req.ResourceSpans { - resource := convertResource(rs.Resource) - for _, ss := range rs.ScopeSpans { - scope := convertScope(ss.Scope) - for _, span := range ss.Spans { - record := convertSpan(span) - record["resource"] = resource - record["scope"] = scope - ts := nanoToMs(span.StartTimeUnixNano) - a.shipJSON(eventTypeOTelTrace, record, ts) - } - } - } + a.shipProto(eventTypeOTelTrace, req) } func (a *OTelAdapter) processMetrics(req *colmetricspb.ExportMetricsServiceRequest) { - for _, rm := range req.ResourceMetrics { - resource := convertResource(rm.Resource) - for _, sm := range rm.ScopeMetrics { - scope := convertScope(sm.Scope) - for _, metric := range sm.Metrics { - record := convertMetric(metric) - record["resource"] = resource - record["scope"] = scope - a.shipJSON(eventTypeOTelMetric, record, 0) - } - } - } + a.shipProto(eventTypeOTelMetric, req) } func (a *OTelAdapter) processLogs(req *collogspb.ExportLogsServiceRequest) { - for _, rl := range req.ResourceLogs { - resource := convertResource(rl.Resource) - for _, sl := range rl.ScopeLogs { - scope := convertScope(sl.Scope) - for _, logRecord := range sl.LogRecords { - record := convertLogRecord(logRecord) - record["resource"] = resource - record["scope"] = scope - ts := nanoToMs(logRecord.TimeUnixNano) - a.shipJSON(eventTypeOTelLog, record, ts) - } - } - } + a.shipProto(eventTypeOTelLog, req) } diff --git a/otel/client_test.go b/otel/client_test.go index 45236d0..4417ad4 100644 --- a/otel/client_test.go +++ b/otel/client_test.go @@ -55,455 +55,6 @@ func makeSpanID() []byte { return b } -func makeParentSpanID() []byte { - b, _ := hex.DecodeString("0807060504030201") - return b -} - -// --- Conversion tests --- - -func TestConvertAttributes(t *testing.T) { - attrs := []*commonpb.KeyValue{ - {Key: "string_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "hello"}}}, - {Key: "int_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_IntValue{IntValue: 42}}}, - {Key: "double_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_DoubleValue{DoubleValue: 3.14}}}, - {Key: "bool_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_BoolValue{BoolValue: true}}}, - {Key: "array_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_ArrayValue{ - ArrayValue: &commonpb.ArrayValue{ - Values: []*commonpb.AnyValue{ - {Value: &commonpb.AnyValue_StringValue{StringValue: "a"}}, - {Value: &commonpb.AnyValue_IntValue{IntValue: 1}}, - }, - }, - }}}, - {Key: "kvlist_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_KvlistValue{ - KvlistValue: &commonpb.KeyValueList{ - Values: []*commonpb.KeyValue{ - {Key: "nested", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "value"}}}, - }, - }, - }}}, - {Key: "bytes_key", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_BytesValue{BytesValue: []byte{0xDE, 0xAD}}}}, - } - - result := convertAttributes(attrs) - - assert.Equal(t, "hello", result["string_key"]) - assert.Equal(t, int64(42), result["int_key"]) - assert.Equal(t, 3.14, result["double_key"]) - assert.Equal(t, true, result["bool_key"]) - assert.Equal(t, []interface{}{"a", int64(1)}, result["array_key"]) - assert.Equal(t, map[string]interface{}{"nested": "value"}, result["kvlist_key"]) - assert.Equal(t, "3q0=", result["bytes_key"]) // base64 of 0xDEAD -} - -func TestConvertAttributesNil(t *testing.T) { - assert.Nil(t, convertAttributes(nil)) - assert.Nil(t, convertAttributes([]*commonpb.KeyValue{})) -} - -func TestConvertAnyValueNil(t *testing.T) { - assert.Nil(t, convertAnyValue(nil)) - assert.Nil(t, convertAnyValue(&commonpb.AnyValue{})) -} - -func TestConvertResource(t *testing.T) { - r := &resourcepb.Resource{ - Attributes: []*commonpb.KeyValue{ - {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "my-service"}}}, - {Key: "service.version", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "1.0.0"}}}, - }, - } - result := convertResource(r) - assert.Equal(t, "my-service", result["service.name"]) - assert.Equal(t, "1.0.0", result["service.version"]) -} - -func TestConvertResourceNil(t *testing.T) { - assert.Nil(t, convertResource(nil)) -} - -func TestConvertScope(t *testing.T) { - scope := &commonpb.InstrumentationScope{ - Name: "my-library", - Version: "2.0", - Attributes: []*commonpb.KeyValue{ - {Key: "scope_attr", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "val"}}}, - }, - } - result := convertScope(scope) - assert.Equal(t, "my-library", result["name"]) - assert.Equal(t, "2.0", result["version"]) - assert.Equal(t, map[string]interface{}{"scope_attr": "val"}, result["attributes"]) -} - -func TestConvertScopeNil(t *testing.T) { - assert.Nil(t, convertScope(nil)) -} - -func TestConvertSpan(t *testing.T) { - traceID := makeTraceID() - spanID := makeSpanID() - parentSpanID := makeParentSpanID() - - span := &tracepb.Span{ - TraceId: traceID, - SpanId: spanID, - ParentSpanId: parentSpanID, - TraceState: "key=value", - Name: "GET /api/users", - Kind: tracepb.Span_SPAN_KIND_SERVER, - StartTimeUnixNano: 1000000000000, // 1s in nanoseconds - EndTimeUnixNano: 2000000000000, // 2s in nanoseconds - Attributes: []*commonpb.KeyValue{ - {Key: "http.method", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "GET"}}}, - {Key: "http.status_code", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_IntValue{IntValue: 200}}}, - }, - Status: &tracepb.Status{ - Code: tracepb.Status_STATUS_CODE_OK, - Message: "success", - }, - Events: []*tracepb.Span_Event{ - { - Name: "exception", - TimeUnixNano: 1500000000000, - Attributes: []*commonpb.KeyValue{ - {Key: "exception.message", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "not found"}}}, - }, - DroppedAttributesCount: 1, - }, - }, - Links: []*tracepb.Span_Link{ - { - TraceId: traceID, - SpanId: spanID, - TraceState: "linked", - Attributes: []*commonpb.KeyValue{ - {Key: "link_attr", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "v"}}}, - }, - DroppedAttributesCount: 2, - }, - }, - DroppedAttributesCount: 3, - DroppedEventsCount: 4, - DroppedLinksCount: 5, - } - - result := convertSpan(span) - - assert.Equal(t, hex.EncodeToString(traceID), result["trace_id"]) - assert.Equal(t, hex.EncodeToString(spanID), result["span_id"]) - assert.Equal(t, hex.EncodeToString(parentSpanID), result["parent_span_id"]) - assert.Equal(t, "key=value", result["trace_state"]) - assert.Equal(t, "GET /api/users", result["name"]) - assert.Equal(t, "SPAN_KIND_SERVER", result["kind"]) - assert.Equal(t, uint64(1000000000000), result["start_time_unix_nano"]) - assert.Equal(t, uint64(2000000000000), result["end_time_unix_nano"]) - - attrs := result["attributes"].(map[string]interface{}) - assert.Equal(t, "GET", attrs["http.method"]) - assert.Equal(t, int64(200), attrs["http.status_code"]) - - status := result["status"].(map[string]interface{}) - assert.Equal(t, "STATUS_CODE_OK", status["code"]) - assert.Equal(t, "success", status["message"]) - - events := result["events"].([]interface{}) - require.Len(t, events, 1) - event := events[0].(map[string]interface{}) - assert.Equal(t, "exception", event["name"]) - assert.Equal(t, uint64(1500000000000), event["time_unix_nano"]) - assert.Equal(t, uint32(1), event["dropped_attributes_count"]) - - links := result["links"].([]interface{}) - require.Len(t, links, 1) - link := links[0].(map[string]interface{}) - assert.Equal(t, "linked", link["trace_state"]) - assert.Equal(t, uint32(2), link["dropped_attributes_count"]) - - assert.Equal(t, uint32(3), result["dropped_attributes_count"]) - assert.Equal(t, uint32(4), result["dropped_events_count"]) - assert.Equal(t, uint32(5), result["dropped_links_count"]) -} - -func TestConvertSpanMinimal(t *testing.T) { - span := &tracepb.Span{ - TraceId: makeTraceID(), - SpanId: makeSpanID(), - Name: "simple", - Kind: tracepb.Span_SPAN_KIND_INTERNAL, - StartTimeUnixNano: 1000000000000, - EndTimeUnixNano: 2000000000000, - } - result := convertSpan(span) - assert.Equal(t, "simple", result["name"]) - assert.Nil(t, result["parent_span_id"]) - assert.Nil(t, result["trace_state"]) - assert.Nil(t, result["attributes"]) - assert.Nil(t, result["status"]) - assert.Nil(t, result["events"]) - assert.Nil(t, result["links"]) -} - -func TestConvertLogRecord(t *testing.T) { - traceID := makeTraceID() - spanID := makeSpanID() - - lr := &logspb.LogRecord{ - TimeUnixNano: 1000000000000, - ObservedTimeUnixNano: 1000000100000, - SeverityNumber: logspb.SeverityNumber_SEVERITY_NUMBER_ERROR, - SeverityText: "ERROR", - Body: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "something went wrong"}}, - Attributes: []*commonpb.KeyValue{ - {Key: "log.source", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "app"}}}, - }, - TraceId: traceID, - SpanId: spanID, - Flags: 1, - DroppedAttributesCount: 2, - } - - result := convertLogRecord(lr) - - assert.Equal(t, uint64(1000000000000), result["time_unix_nano"]) - assert.Equal(t, uint64(1000000100000), result["observed_time_unix_nano"]) - assert.Equal(t, int32(logspb.SeverityNumber_SEVERITY_NUMBER_ERROR), result["severity_number"]) - assert.Equal(t, "ERROR", result["severity_text"]) - assert.Equal(t, "something went wrong", result["body"]) - assert.Equal(t, hex.EncodeToString(traceID), result["trace_id"]) - assert.Equal(t, hex.EncodeToString(spanID), result["span_id"]) - assert.Equal(t, uint32(1), result["flags"]) - assert.Equal(t, uint32(2), result["dropped_attributes_count"]) - - attrs := result["attributes"].(map[string]interface{}) - assert.Equal(t, "app", attrs["log.source"]) -} - -func TestConvertLogRecordMinimal(t *testing.T) { - lr := &logspb.LogRecord{} - result := convertLogRecord(lr) - assert.Empty(t, result) -} - -func TestConvertMetricGauge(t *testing.T) { - metric := &metricspb.Metric{ - Name: "cpu.usage", - Description: "CPU usage percentage", - Unit: "%", - Data: &metricspb.Metric_Gauge{ - Gauge: &metricspb.Gauge{ - DataPoints: []*metricspb.NumberDataPoint{ - { - StartTimeUnixNano: 1000000000000, - TimeUnixNano: 2000000000000, - Value: &metricspb.NumberDataPoint_AsDouble{AsDouble: 75.5}, - Attributes: []*commonpb.KeyValue{ - {Key: "host", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "server1"}}}, - }, - }, - { - StartTimeUnixNano: 1000000000000, - TimeUnixNano: 2000000000000, - Value: &metricspb.NumberDataPoint_AsInt{AsInt: 80}, - }, - }, - }, - }, - } - - result := convertMetric(metric) - - assert.Equal(t, "cpu.usage", result["name"]) - assert.Equal(t, "CPU usage percentage", result["description"]) - assert.Equal(t, "%", result["unit"]) - assert.Equal(t, "gauge", result["type"]) - - dps := result["data_points"].([]interface{}) - require.Len(t, dps, 2) - - dp0 := dps[0].(map[string]interface{}) - assert.Equal(t, 75.5, dp0["value"]) - assert.Equal(t, "server1", dp0["attributes"].(map[string]interface{})["host"]) - - dp1 := dps[1].(map[string]interface{}) - assert.Equal(t, int64(80), dp1["value"]) -} - -func TestConvertMetricSum(t *testing.T) { - metric := &metricspb.Metric{ - Name: "http.requests", - Data: &metricspb.Metric_Sum{ - Sum: &metricspb.Sum{ - AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - IsMonotonic: true, - DataPoints: []*metricspb.NumberDataPoint{ - { - TimeUnixNano: 2000000000000, - Value: &metricspb.NumberDataPoint_AsInt{AsInt: 100}, - }, - }, - }, - }, - } - - result := convertMetric(metric) - assert.Equal(t, "sum", result["type"]) - assert.Equal(t, "AGGREGATION_TEMPORALITY_CUMULATIVE", result["aggregation_temporality"]) - assert.Equal(t, true, result["is_monotonic"]) -} - -func TestConvertMetricHistogram(t *testing.T) { - sum := 123.0 - min := 1.0 - max := 100.0 - metric := &metricspb.Metric{ - Name: "http.request.duration", - Data: &metricspb.Metric_Histogram{ - Histogram: &metricspb.Histogram{ - AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, - DataPoints: []*metricspb.HistogramDataPoint{ - { - StartTimeUnixNano: 1000000000000, - TimeUnixNano: 2000000000000, - Count: 10, - Sum: &sum, - Min: &min, - Max: &max, - BucketCounts: []uint64{2, 3, 5}, - ExplicitBounds: []float64{10.0, 50.0}, - Exemplars: []*metricspb.Exemplar{ - { - TimeUnixNano: 1500000000000, - Value: &metricspb.Exemplar_AsDouble{AsDouble: 42.0}, - TraceId: makeTraceID(), - SpanId: makeSpanID(), - FilteredAttributes: []*commonpb.KeyValue{ - {Key: "filtered", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "val"}}}, - }, - }, - }, - }, - }, - }, - }, - } - - result := convertMetric(metric) - assert.Equal(t, "histogram", result["type"]) - assert.Equal(t, "AGGREGATION_TEMPORALITY_DELTA", result["aggregation_temporality"]) - - dps := result["data_points"].([]interface{}) - require.Len(t, dps, 1) - dp := dps[0].(map[string]interface{}) - assert.Equal(t, uint64(10), dp["count"]) - assert.Equal(t, 123.0, dp["sum"]) - assert.Equal(t, 1.0, dp["min"]) - assert.Equal(t, 100.0, dp["max"]) - assert.Equal(t, []uint64{2, 3, 5}, dp["bucket_counts"]) - assert.Equal(t, []float64{10.0, 50.0}, dp["explicit_bounds"]) - - exemplars := dp["exemplars"].([]interface{}) - require.Len(t, exemplars, 1) - ex := exemplars[0].(map[string]interface{}) - assert.Equal(t, 42.0, ex["value"]) - assert.Equal(t, hex.EncodeToString(makeTraceID()), ex["trace_id"]) - assert.Equal(t, hex.EncodeToString(makeSpanID()), ex["span_id"]) - assert.Equal(t, "val", ex["filtered_attributes"].(map[string]interface{})["filtered"]) -} - -func TestConvertMetricExponentialHistogram(t *testing.T) { - sum := 500.0 - metric := &metricspb.Metric{ - Name: "exp_hist", - Data: &metricspb.Metric_ExponentialHistogram{ - ExponentialHistogram: &metricspb.ExponentialHistogram{ - AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - DataPoints: []*metricspb.ExponentialHistogramDataPoint{ - { - TimeUnixNano: 2000000000000, - Count: 100, - Sum: &sum, - Scale: 5, - ZeroCount: 3, - Positive: &metricspb.ExponentialHistogramDataPoint_Buckets{ - Offset: 1, - BucketCounts: []uint64{10, 20, 30}, - }, - Negative: &metricspb.ExponentialHistogramDataPoint_Buckets{ - Offset: -2, - BucketCounts: []uint64{5, 15}, - }, - }, - }, - }, - }, - } - - result := convertMetric(metric) - assert.Equal(t, "exponential_histogram", result["type"]) - - dps := result["data_points"].([]interface{}) - require.Len(t, dps, 1) - dp := dps[0].(map[string]interface{}) - assert.Equal(t, uint64(100), dp["count"]) - assert.Equal(t, 500.0, dp["sum"]) - assert.Equal(t, int32(5), dp["scale"]) - assert.Equal(t, uint64(3), dp["zero_count"]) - - pos := dp["positive"].(map[string]interface{}) - assert.Equal(t, int32(1), pos["offset"]) - assert.Equal(t, []uint64{10, 20, 30}, pos["bucket_counts"]) - - neg := dp["negative"].(map[string]interface{}) - assert.Equal(t, int32(-2), neg["offset"]) - assert.Equal(t, []uint64{5, 15}, neg["bucket_counts"]) -} - -func TestConvertMetricSummary(t *testing.T) { - metric := &metricspb.Metric{ - Name: "request.duration.summary", - Data: &metricspb.Metric_Summary{ - Summary: &metricspb.Summary{ - DataPoints: []*metricspb.SummaryDataPoint{ - { - TimeUnixNano: 2000000000000, - Count: 50, - Sum: 1234.5, - QuantileValues: []*metricspb.SummaryDataPoint_ValueAtQuantile{ - {Quantile: 0.5, Value: 20.0}, - {Quantile: 0.99, Value: 99.0}, - }, - }, - }, - }, - }, - } - - result := convertMetric(metric) - assert.Equal(t, "summary", result["type"]) - - dps := result["data_points"].([]interface{}) - require.Len(t, dps, 1) - dp := dps[0].(map[string]interface{}) - assert.Equal(t, uint64(50), dp["count"]) - assert.Equal(t, 1234.5, dp["sum"]) - - qvs := dp["quantile_values"].([]interface{}) - require.Len(t, qvs, 2) - assert.Equal(t, 0.5, qvs[0].(map[string]interface{})["quantile"]) - assert.Equal(t, 20.0, qvs[0].(map[string]interface{})["value"]) - assert.Equal(t, 0.99, qvs[1].(map[string]interface{})["quantile"]) - assert.Equal(t, 99.0, qvs[1].(map[string]interface{})["value"]) -} - -func TestNanoToMs(t *testing.T) { - assert.Equal(t, uint64(0), nanoToMs(0)) - assert.Equal(t, uint64(1000), nanoToMs(1000000000)) - assert.Equal(t, uint64(1), nanoToMs(1000000)) - assert.Equal(t, uint64(1500), nanoToMs(1500000000)) -} - // --- Validation tests --- func validClientOptions() uspclient.ClientOptions { @@ -568,21 +119,16 @@ func TestOTelConfigValidate(t *testing.T) { func TestGRPCTraceExport(t *testing.T) { conf := OTelConfig{ ClientOptions: testClientOptions(t), - GRPCPort: 0, // Will use a random port + GRPCPort: getFreePort(t), } - // Use a random available port - conf.GRPCPort = getFreePort(t) - ctx := context.Background() adapter, chStopped, err := NewOTelAdapter(ctx, conf) require.NoError(t, err) defer adapter.Close() - // Wait for server to start time.Sleep(100 * time.Millisecond) - // Connect gRPC client conn, err := grpc.NewClient( fmt.Sprintf("localhost:%d", conf.GRPCPort), grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -623,7 +169,6 @@ func TestGRPCTraceExport(t *testing.T) { require.NoError(t, err) assert.NotNil(t, resp) - // Verify adapter is still running select { case <-chStopped: t.Fatal("adapter stopped unexpectedly") @@ -672,7 +217,7 @@ func TestGRPCMetricsExport(t *testing.T) { Data: &metricspb.Metric_Sum{ Sum: &metricspb.Sum{ AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, - IsMonotonic: true, + IsMonotonic: true, DataPoints: []*metricspb.NumberDataPoint{ { TimeUnixNano: 2000000000000, @@ -1033,7 +578,6 @@ func TestAdapterGRPCOnly(t *testing.T) { adapter, chStopped, err := NewOTelAdapter(ctx, conf) require.NoError(t, err) - // Verify it's running select { case <-chStopped: t.Fatal("adapter stopped unexpectedly") @@ -1042,7 +586,6 @@ func TestAdapterGRPCOnly(t *testing.T) { require.NoError(t, adapter.Close()) - // Verify it stopped select { case <-chStopped: case <-time.After(5 * time.Second): @@ -1125,7 +668,6 @@ func TestAdapterBothPorts(t *testing.T) { resp.Body.Close() assert.Equal(t, http.StatusOK, resp.StatusCode) - // Graceful shutdown require.NoError(t, adapter.Close()) select { case <-chStopped: @@ -1156,7 +698,6 @@ func TestGRPCMultipleResourceSpans(t *testing.T) { client := coltracepb.NewTraceServiceClient(conn) - // Send request with multiple resources, scopes, and spans req := &coltracepb.ExportTraceServiceRequest{ ResourceSpans: []*tracepb.ResourceSpans{ { @@ -1223,7 +764,6 @@ func TestGRPCEmptyRequest(t *testing.T) { require.NoError(t, err) defer conn.Close() - // Empty requests should succeed without errors traceClient := coltracepb.NewTraceServiceClient(conn) _, err = traceClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{}) assert.NoError(t, err) diff --git a/otel/convert.go b/otel/convert.go deleted file mode 100644 index 8afd324..0000000 --- a/otel/convert.go +++ /dev/null @@ -1,393 +0,0 @@ -package usp_otel - -import ( - "encoding/base64" - "encoding/hex" - - commonpb "go.opentelemetry.io/proto/otlp/common/v1" - logspb "go.opentelemetry.io/proto/otlp/logs/v1" - metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" - resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" - tracepb "go.opentelemetry.io/proto/otlp/trace/v1" -) - -func nanoToMs(nanos uint64) uint64 { - if nanos == 0 { - return 0 - } - return nanos / 1_000_000 -} - -func convertAttributes(attrs []*commonpb.KeyValue) map[string]interface{} { - if len(attrs) == 0 { - return nil - } - result := make(map[string]interface{}, len(attrs)) - for _, kv := range attrs { - result[kv.Key] = convertAnyValue(kv.Value) - } - return result -} - -func convertAnyValue(v *commonpb.AnyValue) interface{} { - if v == nil { - return nil - } - switch val := v.Value.(type) { - case *commonpb.AnyValue_StringValue: - return val.StringValue - case *commonpb.AnyValue_IntValue: - return val.IntValue - case *commonpb.AnyValue_DoubleValue: - return val.DoubleValue - case *commonpb.AnyValue_BoolValue: - return val.BoolValue - case *commonpb.AnyValue_ArrayValue: - if val.ArrayValue == nil { - return nil - } - arr := make([]interface{}, len(val.ArrayValue.Values)) - for i, v := range val.ArrayValue.Values { - arr[i] = convertAnyValue(v) - } - return arr - case *commonpb.AnyValue_KvlistValue: - if val.KvlistValue == nil { - return nil - } - return convertAttributes(val.KvlistValue.Values) - case *commonpb.AnyValue_BytesValue: - return base64.StdEncoding.EncodeToString(val.BytesValue) - } - return nil -} - -func convertResource(r *resourcepb.Resource) map[string]interface{} { - if r == nil { - return nil - } - return convertAttributes(r.Attributes) -} - -func convertScope(scope *commonpb.InstrumentationScope) map[string]interface{} { - if scope == nil { - return nil - } - result := map[string]interface{}{} - if scope.Name != "" { - result["name"] = scope.Name - } - if scope.Version != "" { - result["version"] = scope.Version - } - if attrs := convertAttributes(scope.Attributes); attrs != nil { - result["attributes"] = attrs - } - return result -} - -// Traces - -func convertSpan(span *tracepb.Span) map[string]interface{} { - record := map[string]interface{}{ - "trace_id": hex.EncodeToString(span.TraceId), - "span_id": hex.EncodeToString(span.SpanId), - "name": span.Name, - "kind": span.Kind.String(), - "start_time_unix_nano": span.StartTimeUnixNano, - "end_time_unix_nano": span.EndTimeUnixNano, - } - if len(span.ParentSpanId) > 0 { - record["parent_span_id"] = hex.EncodeToString(span.ParentSpanId) - } - if span.TraceState != "" { - record["trace_state"] = span.TraceState - } - if attrs := convertAttributes(span.Attributes); attrs != nil { - record["attributes"] = attrs - } - if span.Status != nil { - status := map[string]interface{}{ - "code": span.Status.Code.String(), - } - if span.Status.Message != "" { - status["message"] = span.Status.Message - } - record["status"] = status - } - if len(span.Events) > 0 { - events := make([]interface{}, len(span.Events)) - for i, e := range span.Events { - event := map[string]interface{}{ - "name": e.Name, - "time_unix_nano": e.TimeUnixNano, - } - if attrs := convertAttributes(e.Attributes); attrs != nil { - event["attributes"] = attrs - } - if e.DroppedAttributesCount > 0 { - event["dropped_attributes_count"] = e.DroppedAttributesCount - } - events[i] = event - } - record["events"] = events - } - if len(span.Links) > 0 { - links := make([]interface{}, len(span.Links)) - for i, l := range span.Links { - link := map[string]interface{}{ - "trace_id": hex.EncodeToString(l.TraceId), - "span_id": hex.EncodeToString(l.SpanId), - } - if l.TraceState != "" { - link["trace_state"] = l.TraceState - } - if attrs := convertAttributes(l.Attributes); attrs != nil { - link["attributes"] = attrs - } - if l.DroppedAttributesCount > 0 { - link["dropped_attributes_count"] = l.DroppedAttributesCount - } - links[i] = link - } - record["links"] = links - } - if span.DroppedAttributesCount > 0 { - record["dropped_attributes_count"] = span.DroppedAttributesCount - } - if span.DroppedEventsCount > 0 { - record["dropped_events_count"] = span.DroppedEventsCount - } - if span.DroppedLinksCount > 0 { - record["dropped_links_count"] = span.DroppedLinksCount - } - return record -} - -// Metrics - -func convertMetric(metric *metricspb.Metric) map[string]interface{} { - record := map[string]interface{}{ - "name": metric.Name, - } - if metric.Description != "" { - record["description"] = metric.Description - } - if metric.Unit != "" { - record["unit"] = metric.Unit - } - - switch data := metric.Data.(type) { - case *metricspb.Metric_Gauge: - record["type"] = "gauge" - record["data_points"] = convertNumberDataPoints(data.Gauge.DataPoints) - case *metricspb.Metric_Sum: - record["type"] = "sum" - record["data_points"] = convertNumberDataPoints(data.Sum.DataPoints) - record["aggregation_temporality"] = data.Sum.AggregationTemporality.String() - record["is_monotonic"] = data.Sum.IsMonotonic - case *metricspb.Metric_Histogram: - record["type"] = "histogram" - record["data_points"] = convertHistogramDataPoints(data.Histogram.DataPoints) - record["aggregation_temporality"] = data.Histogram.AggregationTemporality.String() - case *metricspb.Metric_ExponentialHistogram: - record["type"] = "exponential_histogram" - record["data_points"] = convertExponentialHistogramDataPoints(data.ExponentialHistogram.DataPoints) - record["aggregation_temporality"] = data.ExponentialHistogram.AggregationTemporality.String() - case *metricspb.Metric_Summary: - record["type"] = "summary" - record["data_points"] = convertSummaryDataPoints(data.Summary.DataPoints) - } - - return record -} - -func convertNumberDataPoints(dps []*metricspb.NumberDataPoint) []interface{} { - result := make([]interface{}, len(dps)) - for i, dp := range dps { - point := map[string]interface{}{ - "start_time_unix_nano": dp.StartTimeUnixNano, - "time_unix_nano": dp.TimeUnixNano, - } - switch v := dp.Value.(type) { - case *metricspb.NumberDataPoint_AsDouble: - point["value"] = v.AsDouble - case *metricspb.NumberDataPoint_AsInt: - point["value"] = v.AsInt - } - if attrs := convertAttributes(dp.Attributes); attrs != nil { - point["attributes"] = attrs - } - if len(dp.Exemplars) > 0 { - point["exemplars"] = convertExemplars(dp.Exemplars) - } - result[i] = point - } - return result -} - -func convertHistogramDataPoints(dps []*metricspb.HistogramDataPoint) []interface{} { - result := make([]interface{}, len(dps)) - for i, dp := range dps { - point := map[string]interface{}{ - "start_time_unix_nano": dp.StartTimeUnixNano, - "time_unix_nano": dp.TimeUnixNano, - "count": dp.Count, - } - if len(dp.BucketCounts) > 0 { - point["bucket_counts"] = dp.BucketCounts - } - if len(dp.ExplicitBounds) > 0 { - point["explicit_bounds"] = dp.ExplicitBounds - } - if dp.Sum != nil { - point["sum"] = *dp.Sum - } - if dp.Min != nil { - point["min"] = *dp.Min - } - if dp.Max != nil { - point["max"] = *dp.Max - } - if attrs := convertAttributes(dp.Attributes); attrs != nil { - point["attributes"] = attrs - } - if len(dp.Exemplars) > 0 { - point["exemplars"] = convertExemplars(dp.Exemplars) - } - result[i] = point - } - return result -} - -func convertExponentialHistogramDataPoints(dps []*metricspb.ExponentialHistogramDataPoint) []interface{} { - result := make([]interface{}, len(dps)) - for i, dp := range dps { - point := map[string]interface{}{ - "start_time_unix_nano": dp.StartTimeUnixNano, - "time_unix_nano": dp.TimeUnixNano, - "count": dp.Count, - "scale": dp.Scale, - "zero_count": dp.ZeroCount, - } - if dp.Sum != nil { - point["sum"] = *dp.Sum - } - if dp.Min != nil { - point["min"] = *dp.Min - } - if dp.Max != nil { - point["max"] = *dp.Max - } - if dp.Positive != nil { - point["positive"] = map[string]interface{}{ - "offset": dp.Positive.Offset, - "bucket_counts": dp.Positive.BucketCounts, - } - } - if dp.Negative != nil { - point["negative"] = map[string]interface{}{ - "offset": dp.Negative.Offset, - "bucket_counts": dp.Negative.BucketCounts, - } - } - if attrs := convertAttributes(dp.Attributes); attrs != nil { - point["attributes"] = attrs - } - if len(dp.Exemplars) > 0 { - point["exemplars"] = convertExemplars(dp.Exemplars) - } - result[i] = point - } - return result -} - -func convertSummaryDataPoints(dps []*metricspb.SummaryDataPoint) []interface{} { - result := make([]interface{}, len(dps)) - for i, dp := range dps { - point := map[string]interface{}{ - "start_time_unix_nano": dp.StartTimeUnixNano, - "time_unix_nano": dp.TimeUnixNano, - "count": dp.Count, - "sum": dp.Sum, - } - if len(dp.QuantileValues) > 0 { - qvs := make([]interface{}, len(dp.QuantileValues)) - for j, qv := range dp.QuantileValues { - qvs[j] = map[string]interface{}{ - "quantile": qv.Quantile, - "value": qv.Value, - } - } - point["quantile_values"] = qvs - } - if attrs := convertAttributes(dp.Attributes); attrs != nil { - point["attributes"] = attrs - } - result[i] = point - } - return result -} - -func convertExemplars(exemplars []*metricspb.Exemplar) []interface{} { - result := make([]interface{}, len(exemplars)) - for i, e := range exemplars { - ex := map[string]interface{}{ - "time_unix_nano": e.TimeUnixNano, - } - switch v := e.Value.(type) { - case *metricspb.Exemplar_AsDouble: - ex["value"] = v.AsDouble - case *metricspb.Exemplar_AsInt: - ex["value"] = v.AsInt - } - if len(e.TraceId) > 0 { - ex["trace_id"] = hex.EncodeToString(e.TraceId) - } - if len(e.SpanId) > 0 { - ex["span_id"] = hex.EncodeToString(e.SpanId) - } - if attrs := convertAttributes(e.FilteredAttributes); attrs != nil { - ex["filtered_attributes"] = attrs - } - result[i] = ex - } - return result -} - -// Logs - -func convertLogRecord(lr *logspb.LogRecord) map[string]interface{} { - record := map[string]interface{}{} - if lr.TimeUnixNano != 0 { - record["time_unix_nano"] = lr.TimeUnixNano - } - if lr.ObservedTimeUnixNano != 0 { - record["observed_time_unix_nano"] = lr.ObservedTimeUnixNano - } - if lr.SeverityNumber != 0 { - record["severity_number"] = int32(lr.SeverityNumber) - } - if lr.SeverityText != "" { - record["severity_text"] = lr.SeverityText - } - if lr.Body != nil { - record["body"] = convertAnyValue(lr.Body) - } - if attrs := convertAttributes(lr.Attributes); attrs != nil { - record["attributes"] = attrs - } - if len(lr.TraceId) > 0 { - record["trace_id"] = hex.EncodeToString(lr.TraceId) - } - if len(lr.SpanId) > 0 { - record["span_id"] = hex.EncodeToString(lr.SpanId) - } - if lr.Flags > 0 { - record["flags"] = lr.Flags - } - if lr.DroppedAttributesCount > 0 { - record["dropped_attributes_count"] = lr.DroppedAttributesCount - } - return record -}