diff --git a/containers/conf/all.go b/containers/conf/all.go index 55784b9..0fdffa5 100755 --- a/containers/conf/all.go +++ b/containers/conf/all.go @@ -25,6 +25,7 @@ import ( "github.com/refractionPOINT/usp-adapters/ms_graph" "github.com/refractionPOINT/usp-adapters/o365" "github.com/refractionPOINT/usp-adapters/okta" + "github.com/refractionPOINT/usp-adapters/otel" "github.com/refractionPOINT/usp-adapters/pandadoc" "github.com/refractionPOINT/usp-adapters/proofpoint_tap" "github.com/refractionPOINT/usp-adapters/pubsub" @@ -86,5 +87,6 @@ type GeneralConfigs struct { Sublime usp_sublime.SublimeConfig `json:"sublime" yaml:"sublime"` SentinelOne usp_sentinelone.SentinelOneConfig `json:"sentinel_one" yaml:"sentinel_one"` TrendMicro usp_trendmicro.TrendMicroConfig `json:"trendmicro" yaml:"trendmicro"` + OTel usp_otel.OTelConfig `json:"otel" yaml:"otel"` Wiz usp_wiz.WizConfig `json:"wiz" yaml:"wiz"` } diff --git a/containers/general/tool.go b/containers/general/tool.go index 4c7af0b..adf9132 100755 --- a/containers/general/tool.go +++ b/containers/general/tool.go @@ -39,6 +39,7 @@ import ( "github.com/refractionPOINT/usp-adapters/ms_graph" "github.com/refractionPOINT/usp-adapters/o365" "github.com/refractionPOINT/usp-adapters/okta" + "github.com/refractionPOINT/usp-adapters/otel" "github.com/refractionPOINT/usp-adapters/pandadoc" "github.com/refractionPOINT/usp-adapters/proofpoint_tap" "github.com/refractionPOINT/usp-adapters/pubsub" @@ -481,6 +482,11 @@ func runAdapter(ctx context.Context, method string, configs Configuration, showC configs.SentinelOne.ClientOptions.Architecture = "usp_adapter" configToShow = configs.SentinelOne client, chRunning, err = usp_sentinelone.NewSentinelOneAdapter(ctx, configs.SentinelOne) + } else if method == "otel" { + configs.OTel.ClientOptions = applyLogging(configs.OTel.ClientOptions) + configs.OTel.ClientOptions.Architecture = "usp_adapter" + configToShow = configs.OTel + client, chRunning, err = usp_otel.NewOTelAdapter(ctx, configs.OTel) } else if method == "trendmicro" { configs.TrendMicro.ClientOptions = applyLogging(configs.TrendMicro.ClientOptions) configs.TrendMicro.ClientOptions.Architecture = "usp_adapter" diff --git a/go.mod b/go.mod index e402334..5a97002 100644 --- a/go.mod +++ b/go.mod @@ -22,12 +22,15 @@ require ( github.com/refractionPOINT/go-uspclient v1.6.3 github.com/stretchr/testify v1.11.1 github.com/vmihailenco/msgpack/v5 v5.4.1 + go.opentelemetry.io/proto/otlp v1.9.0 golang.org/x/net v0.49.0 golang.org/x/oauth2 v0.34.0 golang.org/x/sync v0.19.0 golang.org/x/sys v0.40.0 golang.org/x/text v0.33.0 google.golang.org/api v0.264.0 + google.golang.org/grpc v1.78.0 + google.golang.org/protobuf v1.36.11 gopkg.in/yaml.v3 v3.0.1 ) @@ -101,6 +104,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.11 // indirect github.com/googleapis/gax-go/v2 v2.17.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect @@ -145,8 +149,6 @@ require ( google.golang.org/genproto v0.0.0-20260203192932-546029d2fa20 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260203192932-546029d2fa20 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 // indirect - google.golang.org/grpc v1.78.0 // indirect - google.golang.org/protobuf v1.36.11 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect www.velocidex.com/golang/binparsergen v0.1.1-0.20240404114946-8f66c7cf586e // indirect diff --git a/go.sum b/go.sum index aee3bab..858585c 100644 --- a/go.sum +++ b/go.sum @@ -256,6 +256,8 @@ github.com/googleapis/gax-go/v2 v2.17.0 h1:RksgfBpxqff0EZkDWYuz9q/uWsTVz+kf43LsZ github.com/googleapis/gax-go/v2 v2.17.0/go.mod h1:mzaqghpQp4JDh3HvADwrat+6M3MOIDp5YKHhb9PAgDY= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -383,6 +385,8 @@ go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4A go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/otel/client.go b/otel/client.go new file mode 100644 index 0000000..ac3ac07 --- /dev/null +++ b/otel/client.go @@ -0,0 +1,331 @@ +package usp_otel + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/refractionPOINT/go-uspclient" + "github.com/refractionPOINT/go-uspclient/protocol" + + collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1" + colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" + + "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" +) + +const ( + defaultWriteTimeout = 60 * 10 + maxGRPCRecvMsgSize = 64 * 1024 * 1024 // 64MB, matching OTel Collector default + maxHTTPRequestBody = 64 * 1024 * 1024 + eventTypeOTelTrace = "otel_trace" + eventTypeOTelMetric = "otel_metric" + eventTypeOTelLog = "otel_log" +) + +type OTelConfig struct { + ClientOptions uspclient.ClientOptions `json:"client_options" yaml:"client_options"` + GRPCPort uint16 `json:"grpc_port,omitempty" yaml:"grpc_port,omitempty"` + HTTPPort uint16 `json:"http_port,omitempty" yaml:"http_port,omitempty"` + Interface string `json:"iface,omitempty" yaml:"iface,omitempty"` +} + +func (c *OTelConfig) Validate() error { + if err := c.ClientOptions.Validate(); err != nil { + return fmt.Errorf("client_options: %v", err) + } + if c.GRPCPort == 0 && c.HTTPPort == 0 { + return errors.New("at least one of grpc_port or http_port must be specified") + } + return nil +} + +type OTelAdapter struct { + conf OTelConfig + uspClient *uspclient.Client + grpcServer *grpc.Server + httpServer *http.Server + grpcListener net.Listener + httpListener net.Listener + wg sync.WaitGroup + isRunning uint32 + writeTimeout time.Duration +} + +func NewOTelAdapter(ctx context.Context, conf OTelConfig) (*OTelAdapter, chan struct{}, error) { + a := &OTelAdapter{ + conf: conf, + isRunning: 1, + writeTimeout: time.Duration(defaultWriteTimeout) * time.Second, + } + + var err error + a.uspClient, err = uspclient.NewClient(ctx, conf.ClientOptions) + if err != nil { + return nil, nil, err + } + + if conf.GRPCPort != 0 { + addr := fmt.Sprintf("%s:%d", conf.Interface, conf.GRPCPort) + a.grpcListener, err = net.Listen("tcp", addr) + if err != nil { + a.uspClient.Close() + return nil, nil, fmt.Errorf("grpc listen on %s: %v", addr, err) + } + a.grpcServer = grpc.NewServer( + grpc.MaxRecvMsgSize(maxGRPCRecvMsgSize), + ) + coltracepb.RegisterTraceServiceServer(a.grpcServer, &traceServiceServer{adapter: a}) + colmetricspb.RegisterMetricsServiceServer(a.grpcServer, &metricsServiceServer{adapter: a}) + collogspb.RegisterLogsServiceServer(a.grpcServer, &logsServiceServer{adapter: a}) + } + + if conf.HTTPPort != 0 { + addr := fmt.Sprintf("%s:%d", conf.Interface, conf.HTTPPort) + a.httpListener, err = net.Listen("tcp", addr) + if err != nil { + if a.grpcListener != nil { + a.grpcListener.Close() + } + a.uspClient.Close() + return nil, nil, fmt.Errorf("http listen on %s: %v", addr, err) + } + mux := http.NewServeMux() + mux.HandleFunc("/v1/traces", a.handleHTTPTraces) + mux.HandleFunc("/v1/metrics", a.handleHTTPMetrics) + mux.HandleFunc("/v1/logs", a.handleHTTPLogs) + a.httpServer = &http.Server{Handler: mux} + } + + chStopped := make(chan struct{}) + a.wg.Add(1) + go func() { + defer a.wg.Done() + var serverWg sync.WaitGroup + + if a.grpcServer != nil { + serverWg.Add(1) + go func() { + defer serverWg.Done() + conf.ClientOptions.DebugLog(fmt.Sprintf("gRPC server listening on %s:%d", conf.Interface, conf.GRPCPort)) + if err := a.grpcServer.Serve(a.grpcListener); err != nil { + if atomic.LoadUint32(&a.isRunning) == 1 { + conf.ClientOptions.OnError(fmt.Errorf("grpc serve: %v", err)) + } + } + }() + } + + if a.httpServer != nil { + serverWg.Add(1) + go func() { + defer serverWg.Done() + conf.ClientOptions.DebugLog(fmt.Sprintf("HTTP server listening on %s:%d", conf.Interface, conf.HTTPPort)) + if err := a.httpServer.Serve(a.httpListener); err != nil && err != http.ErrServerClosed { + if atomic.LoadUint32(&a.isRunning) == 1 { + conf.ClientOptions.OnError(fmt.Errorf("http serve: %v", err)) + } + } + }() + } + + serverWg.Wait() + close(chStopped) + }() + + return a, chStopped, nil +} + +func (a *OTelAdapter) Close() error { + a.conf.ClientOptions.DebugLog("closing") + atomic.StoreUint32(&a.isRunning, 0) + + if a.grpcServer != nil { + a.grpcServer.GracefulStop() + } + if a.httpServer != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + a.httpServer.Shutdown(ctx) + } + + a.wg.Wait() + + err1 := a.uspClient.Drain(1 * time.Minute) + _, err2 := a.uspClient.Close() + if err1 != nil { + return err1 + } + return err2 +} + +func (a *OTelAdapter) shipProto(eventType string, protoMsg proto.Message) { + jsonBytes, err := protojson.Marshal(protoMsg) + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("protojson.Marshal(): %v", err)) + return + } + var payload map[string]interface{} + if err := json.Unmarshal(jsonBytes, &payload); err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("json.Unmarshal(): %v", err)) + return + } + msg := &protocol.DataMessage{ + JsonPayload: payload, + EventType: eventType, + TimestampMs: uint64(time.Now().UnixMilli()), + } + err = a.uspClient.Ship(msg, a.writeTimeout) + if err == uspclient.ErrorBufferFull { + a.conf.ClientOptions.OnWarning("stream falling behind") + err = a.uspClient.Ship(msg, 1*time.Hour) + } + if err != nil { + a.conf.ClientOptions.OnError(fmt.Errorf("Ship(): %v", err)) + } +} + +// gRPC service implementations + +type traceServiceServer struct { + coltracepb.UnimplementedTraceServiceServer + adapter *OTelAdapter +} + +func (s *traceServiceServer) Export(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) (*coltracepb.ExportTraceServiceResponse, error) { + s.adapter.processTraces(req) + return &coltracepb.ExportTraceServiceResponse{}, nil +} + +type metricsServiceServer struct { + colmetricspb.UnimplementedMetricsServiceServer + adapter *OTelAdapter +} + +func (s *metricsServiceServer) Export(ctx context.Context, req *colmetricspb.ExportMetricsServiceRequest) (*colmetricspb.ExportMetricsServiceResponse, error) { + s.adapter.processMetrics(req) + return &colmetricspb.ExportMetricsServiceResponse{}, nil +} + +type logsServiceServer struct { + collogspb.UnimplementedLogsServiceServer + adapter *OTelAdapter +} + +func (s *logsServiceServer) Export(ctx context.Context, req *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) { + s.adapter.processLogs(req) + return &collogspb.ExportLogsServiceResponse{}, nil +} + +// HTTP handlers + +func (a *OTelAdapter) handleHTTPTraces(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, maxHTTPRequestBody)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + req := &coltracepb.ExportTraceServiceRequest{} + if err := unmarshalOTLPRequest(r, body, req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + a.processTraces(req) + marshalOTLPResponse(w, r, &coltracepb.ExportTraceServiceResponse{}) +} + +func (a *OTelAdapter) handleHTTPMetrics(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, maxHTTPRequestBody)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + req := &colmetricspb.ExportMetricsServiceRequest{} + if err := unmarshalOTLPRequest(r, body, req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + a.processMetrics(req) + marshalOTLPResponse(w, r, &colmetricspb.ExportMetricsServiceResponse{}) +} + +func (a *OTelAdapter) handleHTTPLogs(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + body, err := io.ReadAll(io.LimitReader(r.Body, maxHTTPRequestBody)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + req := &collogspb.ExportLogsServiceRequest{} + if err := unmarshalOTLPRequest(r, body, req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + a.processLogs(req) + marshalOTLPResponse(w, r, &collogspb.ExportLogsServiceResponse{}) +} + +func unmarshalOTLPRequest(r *http.Request, body []byte, msg proto.Message) error { + ct := r.Header.Get("Content-Type") + if strings.HasPrefix(ct, "application/json") { + return protojson.Unmarshal(body, msg) + } + return proto.Unmarshal(body, msg) +} + +func marshalOTLPResponse(w http.ResponseWriter, r *http.Request, msg proto.Message) { + ct := r.Header.Get("Content-Type") + if strings.HasPrefix(ct, "application/json") { + respBytes, err := protojson.Marshal(msg) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(respBytes) + return + } + respBytes, err := proto.Marshal(msg) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/x-protobuf") + w.Write(respBytes) +} + +// Processing functions: pass through the original OTLP request as-is. + +func (a *OTelAdapter) processTraces(req *coltracepb.ExportTraceServiceRequest) { + a.shipProto(eventTypeOTelTrace, req) +} + +func (a *OTelAdapter) processMetrics(req *colmetricspb.ExportMetricsServiceRequest) { + a.shipProto(eventTypeOTelMetric, req) +} + +func (a *OTelAdapter) processLogs(req *collogspb.ExportLogsServiceRequest) { + a.shipProto(eventTypeOTelLog, req) +} diff --git a/otel/client_test.go b/otel/client_test.go new file mode 100644 index 0000000..4417ad4 --- /dev/null +++ b/otel/client_test.go @@ -0,0 +1,789 @@ +package usp_otel + +import ( + "bytes" + "context" + "encoding/hex" + "fmt" + "io" + "net" + "net/http" + "testing" + "time" + + "github.com/refractionPOINT/go-uspclient" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1" + colmetricspb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" + coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" + commonpb "go.opentelemetry.io/proto/otlp/common/v1" + logspb "go.opentelemetry.io/proto/otlp/logs/v1" + metricspb "go.opentelemetry.io/proto/otlp/metrics/v1" + resourcepb "go.opentelemetry.io/proto/otlp/resource/v1" + tracepb "go.opentelemetry.io/proto/otlp/trace/v1" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" +) + +func testClientOptions(t *testing.T) uspclient.ClientOptions { + return uspclient.ClientOptions{ + TestSinkMode: true, + OnError: func(err error) { + t.Logf("ERROR: %v", err) + }, + OnWarning: func(msg string) { + t.Logf("WARNING: %s", msg) + }, + DebugLog: func(msg string) { + t.Logf("DEBUG: %s", msg) + }, + } +} + +func makeTraceID() []byte { + b, _ := hex.DecodeString("0102030405060708090a0b0c0d0e0f10") + return b +} + +func makeSpanID() []byte { + b, _ := hex.DecodeString("0102030405060708") + return b +} + +// --- Validation tests --- + +func validClientOptions() uspclient.ClientOptions { + return uspclient.ClientOptions{ + TestSinkMode: true, + Identity: uspclient.Identity{ + Oid: "test-oid", + InstallationKey: "test-key", + }, + Platform: "json", + } +} + +func TestOTelConfigValidate(t *testing.T) { + t.Run("valid with grpc", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: validClientOptions(), + GRPCPort: 4317, + } + assert.NoError(t, conf.Validate()) + }) + + t.Run("valid with http", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: validClientOptions(), + HTTPPort: 4318, + } + assert.NoError(t, conf.Validate()) + }) + + t.Run("valid with both", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: validClientOptions(), + GRPCPort: 4317, + HTTPPort: 4318, + } + assert.NoError(t, conf.Validate()) + }) + + t.Run("invalid no ports", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: validClientOptions(), + } + err := conf.Validate() + assert.Error(t, err) + assert.Contains(t, err.Error(), "grpc_port or http_port") + }) + + t.Run("invalid client options", func(t *testing.T) { + conf := OTelConfig{ + ClientOptions: uspclient.ClientOptions{}, + GRPCPort: 4317, + } + err := conf.Validate() + assert.Error(t, err) + assert.Contains(t, err.Error(), "client_options") + }) +} + +// --- Integration tests: gRPC --- + +func TestGRPCTraceExport(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, chStopped, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + client := coltracepb.NewTraceServiceClient(conn) + + req := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "test-svc"}}}, + }, + }, + ScopeSpans: []*tracepb.ScopeSpans{ + { + Scope: &commonpb.InstrumentationScope{Name: "test-lib", Version: "1.0"}, + Spans: []*tracepb.Span{ + { + TraceId: makeTraceID(), + SpanId: makeSpanID(), + Name: "test-span", + Kind: tracepb.Span_SPAN_KIND_SERVER, + StartTimeUnixNano: 1000000000000, + EndTimeUnixNano: 2000000000000, + }, + }, + }, + }, + }, + }, + } + + resp, err := client.Export(ctx, req) + require.NoError(t, err) + assert.NotNil(t, resp) + + select { + case <-chStopped: + t.Fatal("adapter stopped unexpectedly") + default: + } +} + +func TestGRPCMetricsExport(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + client := colmetricspb.NewMetricsServiceClient(conn) + + req := &colmetricspb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricspb.ResourceMetrics{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "test-svc"}}}, + }, + }, + ScopeMetrics: []*metricspb.ScopeMetrics{ + { + Scope: &commonpb.InstrumentationScope{Name: "test-lib"}, + Metrics: []*metricspb.Metric{ + { + Name: "test.counter", + Description: "A test counter", + Unit: "1", + Data: &metricspb.Metric_Sum{ + Sum: &metricspb.Sum{ + AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + IsMonotonic: true, + DataPoints: []*metricspb.NumberDataPoint{ + { + TimeUnixNano: 2000000000000, + Value: &metricspb.NumberDataPoint_AsInt{AsInt: 42}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + resp, err := client.Export(ctx, req) + require.NoError(t, err) + assert.NotNil(t, resp) +} + +func TestGRPCLogsExport(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + client := collogspb.NewLogsServiceClient(conn) + + req := &collogspb.ExportLogsServiceRequest{ + ResourceLogs: []*logspb.ResourceLogs{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "test-svc"}}}, + }, + }, + ScopeLogs: []*logspb.ScopeLogs{ + { + Scope: &commonpb.InstrumentationScope{Name: "test-lib"}, + LogRecords: []*logspb.LogRecord{ + { + TimeUnixNano: 1000000000000, + SeverityNumber: logspb.SeverityNumber_SEVERITY_NUMBER_INFO, + SeverityText: "INFO", + Body: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "test log message"}}, + }, + }, + }, + }, + }, + }, + } + + resp, err := client.Export(ctx, req) + require.NoError(t, err) + assert.NotNil(t, resp) +} + +// --- Integration tests: HTTP --- + +func TestHTTPTraceExportProtobuf(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + req := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "http-test"}}}, + }, + }, + ScopeSpans: []*tracepb.ScopeSpans{ + { + Spans: []*tracepb.Span{ + { + TraceId: makeTraceID(), + SpanId: makeSpanID(), + Name: "http-span", + Kind: tracepb.Span_SPAN_KIND_CLIENT, + StartTimeUnixNano: 1000000000000, + EndTimeUnixNano: 2000000000000, + }, + }, + }, + }, + }, + }, + } + + body, err := proto.Marshal(req) + require.NoError(t, err) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader(body), + ) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type")) + + respBody, err := io.ReadAll(resp.Body) + require.NoError(t, err) + respMsg := &coltracepb.ExportTraceServiceResponse{} + require.NoError(t, proto.Unmarshal(respBody, respMsg)) +} + +func TestHTTPTraceExportJSON(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + req := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + { + ScopeSpans: []*tracepb.ScopeSpans{ + { + Spans: []*tracepb.Span{ + { + TraceId: makeTraceID(), + SpanId: makeSpanID(), + Name: "json-span", + StartTimeUnixNano: 1000000000000, + EndTimeUnixNano: 2000000000000, + }, + }, + }, + }, + }, + }, + } + + body, err := protojson.Marshal(req) + require.NoError(t, err) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/json", + bytes.NewReader(body), + ) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "application/json", resp.Header.Get("Content-Type")) +} + +func TestHTTPMetricsExportProtobuf(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + req := &colmetricspb.ExportMetricsServiceRequest{ + ResourceMetrics: []*metricspb.ResourceMetrics{ + { + ScopeMetrics: []*metricspb.ScopeMetrics{ + { + Metrics: []*metricspb.Metric{ + { + Name: "test.gauge", + Data: &metricspb.Metric_Gauge{ + Gauge: &metricspb.Gauge{ + DataPoints: []*metricspb.NumberDataPoint{ + {TimeUnixNano: 2000000000000, Value: &metricspb.NumberDataPoint_AsDouble{AsDouble: 99.9}}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + body, err := proto.Marshal(req) + require.NoError(t, err) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/metrics", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader(body), + ) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestHTTPLogsExportProtobuf(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + req := &collogspb.ExportLogsServiceRequest{ + ResourceLogs: []*logspb.ResourceLogs{ + { + ScopeLogs: []*logspb.ScopeLogs{ + { + LogRecords: []*logspb.LogRecord{ + { + TimeUnixNano: 1000000000000, + SeverityNumber: logspb.SeverityNumber_SEVERITY_NUMBER_WARN, + SeverityText: "WARN", + Body: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "disk space low"}}, + }, + }, + }, + }, + }, + }, + } + + body, err := proto.Marshal(req) + require.NoError(t, err) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/logs", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader(body), + ) + require.NoError(t, err) + defer resp.Body.Close() + + assert.Equal(t, http.StatusOK, resp.StatusCode) +} + +func TestHTTPMethodNotAllowed(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + endpoints := []string{"/v1/traces", "/v1/metrics", "/v1/logs"} + for _, ep := range endpoints { + t.Run(ep, func(t *testing.T) { + resp, err := http.Get(fmt.Sprintf("http://localhost:%d%s", conf.HTTPPort, ep)) + require.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode) + }) + } +} + +func TestHTTPBadBody(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader([]byte("not valid protobuf")), + ) + require.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +func TestHTTPBadJSON(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/json", + bytes.NewReader([]byte("{invalid json")), + ) + require.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusBadRequest, resp.StatusCode) +} + +// --- Adapter lifecycle tests --- + +func TestAdapterGRPCOnly(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, chStopped, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + + select { + case <-chStopped: + t.Fatal("adapter stopped unexpectedly") + default: + } + + require.NoError(t, adapter.Close()) + + select { + case <-chStopped: + case <-time.After(5 * time.Second): + t.Fatal("adapter didn't stop after Close()") + } +} + +func TestAdapterHTTPOnly(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, chStopped, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + + select { + case <-chStopped: + t.Fatal("adapter stopped unexpectedly") + default: + } + + require.NoError(t, adapter.Close()) + + select { + case <-chStopped: + case <-time.After(5 * time.Second): + t.Fatal("adapter didn't stop after Close()") + } +} + +func TestAdapterBothPorts(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + HTTPPort: getFreePort(t), + } + + ctx := context.Background() + adapter, chStopped, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Send via gRPC + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + traceClient := coltracepb.NewTraceServiceClient(conn) + _, err = traceClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + {ScopeSpans: []*tracepb.ScopeSpans{ + {Spans: []*tracepb.Span{{TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "grpc-span", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}}}, + }}, + }, + }) + require.NoError(t, err) + + // Send via HTTP + httpReq := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + {ScopeSpans: []*tracepb.ScopeSpans{ + {Spans: []*tracepb.Span{{TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "http-span", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}}}, + }}, + }, + } + body, err := proto.Marshal(httpReq) + require.NoError(t, err) + resp, err := http.Post( + fmt.Sprintf("http://localhost:%d/v1/traces", conf.HTTPPort), + "application/x-protobuf", + bytes.NewReader(body), + ) + require.NoError(t, err) + resp.Body.Close() + assert.Equal(t, http.StatusOK, resp.StatusCode) + + require.NoError(t, adapter.Close()) + select { + case <-chStopped: + case <-time.After(5 * time.Second): + t.Fatal("adapter didn't stop after Close()") + } +} + +func TestGRPCMultipleResourceSpans(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + client := coltracepb.NewTraceServiceClient(conn) + + req := &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: []*tracepb.ResourceSpans{ + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "svc-a"}}}, + }, + }, + ScopeSpans: []*tracepb.ScopeSpans{ + { + Scope: &commonpb.InstrumentationScope{Name: "lib-1"}, + Spans: []*tracepb.Span{ + {TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "span-1", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}, + {TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "span-2", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}, + }, + }, + { + Scope: &commonpb.InstrumentationScope{Name: "lib-2"}, + Spans: []*tracepb.Span{ + {TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "span-3", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}, + }, + }, + }, + }, + { + Resource: &resourcepb.Resource{ + Attributes: []*commonpb.KeyValue{ + {Key: "service.name", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "svc-b"}}}, + }, + }, + ScopeSpans: []*tracepb.ScopeSpans{ + { + Spans: []*tracepb.Span{ + {TraceId: makeTraceID(), SpanId: makeSpanID(), Name: "span-4", StartTimeUnixNano: 1000000000000, EndTimeUnixNano: 2000000000000}, + }, + }, + }, + }, + }, + } + + resp, err := client.Export(ctx, req) + require.NoError(t, err) + assert.NotNil(t, resp) +} + +func TestGRPCEmptyRequest(t *testing.T) { + conf := OTelConfig{ + ClientOptions: testClientOptions(t), + GRPCPort: getFreePort(t), + } + + ctx := context.Background() + adapter, _, err := NewOTelAdapter(ctx, conf) + require.NoError(t, err) + defer adapter.Close() + + time.Sleep(100 * time.Millisecond) + + conn, err := grpc.NewClient( + fmt.Sprintf("localhost:%d", conf.GRPCPort), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + defer conn.Close() + + traceClient := coltracepb.NewTraceServiceClient(conn) + _, err = traceClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{}) + assert.NoError(t, err) + + metricsClient := colmetricspb.NewMetricsServiceClient(conn) + _, err = metricsClient.Export(ctx, &colmetricspb.ExportMetricsServiceRequest{}) + assert.NoError(t, err) + + logsClient := collogspb.NewLogsServiceClient(conn) + _, err = logsClient.Export(ctx, &collogspb.ExportLogsServiceRequest{}) + assert.NoError(t, err) +} + +// --- Helpers --- + +func getFreePort(t *testing.T) uint16 { + t.Helper() + l, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp", "127.0.0.1:0") + require.NoError(t, err) + port := l.Addr().(*net.TCPAddr).Port + l.Close() + return uint16(port) +}