From c119395dc5df1579154516e23fe2fc60ad2138c2 Mon Sep 17 00:00:00 2001 From: Luke Steensen Date: Thu, 22 May 2025 21:21:10 -0500 Subject: [PATCH] add preaggregation pipeline dual shipping support Signed-off-by: Luke Steensen --- .../defaultforwarder/default_forwarder.go | 18 +++ .../defaultforwarder/endpoints/endpoints.go | 2 + .../transaction/transaction.go | 4 + pkg/config/setup/config.go | 1 + pkg/config/setup/preaggregation.go | 16 +++ pkg/config/utils/endpoints.go | 20 ++++ .../internal/metrics/iterable_series.go | 90 ++++++--------- pkg/serializer/serializer.go | 106 +++++++++++------- pkg/serializer/series_benchmark_test.go | 9 +- 9 files changed, 166 insertions(+), 100 deletions(-) create mode 100644 pkg/config/setup/preaggregation.go diff --git a/comp/forwarder/defaultforwarder/default_forwarder.go b/comp/forwarder/defaultforwarder/default_forwarder.go index 15ceaf69b74d..bec3745b6c7c 100644 --- a/comp/forwarder/defaultforwarder/default_forwarder.go +++ b/comp/forwarder/defaultforwarder/default_forwarder.go @@ -502,6 +502,24 @@ func (f *DefaultForwarder) createAdvancedHTTPTransactions(endpoint transaction.E for _, payload := range payloads { for domain, dr := range f.domainResolvers { drDomain, destinationType := dr.Resolve(endpoint) // drDomain is the domain with agent version if not local + + // If the payload has a preaggr destination, but the current domain + // is not the configured preaggr site, do not create a transaction. + // + // If we have a preaggr payload and domain, switch from the standard + // series endpoint to the preaggr-specific series endpoint. 
+ if payload.Destination == transaction.PreaggrOnly { + if drDomain != f.config.GetString("preaggr_dd_url") { + continue + } else { + endpoint = endpoints.PreaggrSeriesEndpoint + } + } + // TODO(?): If the preaggr_dd_url is the same as the primary dd_url, + // we will also inherit any additional API keys from the + // configuration of that site, meaning we'll send preaggr payloads + // for each of those orgs. Not sure if this is a problem or not. + if payload.Destination == transaction.LocalOnly { // if it is local payload, we should not send it to the remote endpoint if destinationType == pkgresolver.Local && endpoint == endpoints.SeriesEndpoint { diff --git a/comp/forwarder/defaultforwarder/endpoints/endpoints.go b/comp/forwarder/defaultforwarder/endpoints/endpoints.go index 1dacd8fbacfd..f83f412d05c9 100644 --- a/comp/forwarder/defaultforwarder/endpoints/endpoints.go +++ b/comp/forwarder/defaultforwarder/endpoints/endpoints.go @@ -25,6 +25,8 @@ var ( // SeriesEndpoint is the v2 endpoint used to send series SeriesEndpoint = transaction.Endpoint{Route: "/api/v2/series", Name: "series_v2"} + // PreaggrSeriesEndpoint is the endpoint used for experimental preaggregation + PreaggrSeriesEndpoint = transaction.Endpoint{Route: "/api/intake/pipelines/ddseries", Name: "preaggr_series"} // EventsEndpoint is the v2 endpoint used to send events EventsEndpoint = transaction.Endpoint{Route: "/api/v2/events", Name: "events_v2"} // ServiceChecksEndpoint is the v2 endpoint used to send service checks diff --git a/comp/forwarder/defaultforwarder/transaction/transaction.go b/comp/forwarder/defaultforwarder/transaction/transaction.go index c5d9056bea5f..a2ed5c40e632 100644 --- a/comp/forwarder/defaultforwarder/transaction/transaction.go +++ b/comp/forwarder/defaultforwarder/transaction/transaction.go @@ -206,6 +206,8 @@ const ( SecondaryOnly // LocalOnly indicates the transaction should be sent to the local endpoint (cluster-agent) only LocalOnly + // PreaggrOnly indicates the 
transaction should be sent to the pre-aggregation endpoint only + PreaggrOnly ) func (d Destination) String() string { @@ -218,6 +220,8 @@ func (d Destination) String() string { return "SecondaryOnly" case LocalOnly: return "LocalOnly" + case PreaggrOnly: + return "PreaggrOnly" default: return "Unknown" } diff --git a/pkg/config/setup/config.go b/pkg/config/setup/config.go index cafc7ab68075..e65334e16431 100644 --- a/pkg/config/setup/config.go +++ b/pkg/config/setup/config.go @@ -233,6 +233,7 @@ var serverlessConfigComponents = []func(pkgconfigmodel.Setup){ setupAPM, OTLP, setupMultiRegionFailover, + setupPreaggregation, telemetry, autoconfig, remoteconfig, diff --git a/pkg/config/setup/preaggregation.go b/pkg/config/setup/preaggregation.go new file mode 100644 index 000000000000..0586aa18110c --- /dev/null +++ b/pkg/config/setup/preaggregation.go @@ -0,0 +1,16 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package setup + +import ( + pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" +) + +func setupPreaggregation(config pkgconfigmodel.Setup) { + config.BindEnvAndSetDefault("enable_preaggr_pipeline", false) + config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com") + config.BindEnv("preaggr_api_key") +} \ No newline at end of file diff --git a/pkg/config/utils/endpoints.go b/pkg/config/utils/endpoints.go index cde28dfc5349..bcd06cfab85b 100644 --- a/pkg/config/utils/endpoints.go +++ b/pkg/config/utils/endpoints.go @@ -165,6 +165,26 @@ func GetMultipleEndpoints(c pkgconfigmodel.Reader) (map[string][]APIKeys, error) Keys: []string{c.GetString("multi_region_failover.api_key")}, }} } + + // Populate preaggregation endpoint (only if unique) + // + // TODO(?): This assumes we wouldn't have a unique preaggregation API key + // without a unique preaggregation URL, but we should validate that. + if c.GetBool("enable_preaggr_pipeline") { + preaggURL := c.GetString("preaggr_dd_url") + // Check that it's not the same as the primary URL + if preaggURL != "" && preaggURL != ddURL { + // Check if preaggregation URL already exists in additional endpoints + if _, exists := additionalEndpoints[preaggURL]; !exists { + // Unique URL - create new domain resolver with preaggregation API key + additionalEndpoints[preaggURL] = []APIKeys{{ + ConfigSettingPath: "preaggr_api_key", + Keys: []string{c.GetString("preaggr_api_key")}, + }} + } + } + } + return mergeAdditionalEndpoints(keysPerDomain, additionalEndpoints) } diff --git a/pkg/serializer/internal/metrics/iterable_series.go b/pkg/serializer/internal/metrics/iterable_series.go index f124baeb77b2..4f2106b74bd8 100644 --- a/pkg/serializer/internal/metrics/iterable_series.go +++ b/pkg/serializer/internal/metrics/iterable_series.go @@ -106,77 +106,41 @@ func describeItem(serie *metrics.Serie) string { return fmt.Sprintf("name %q, %d points", serie.Name, len(serie.Points)) } -// MarshalSplitCompress uses the 
stream compressor to marshal and compress series payloads. -// If a compressed payload is larger than the max, a new payload will be generated. This method returns a slice of -// compressed protobuf marshaled MetricPayload objects. -func (series *IterableSeries) MarshalSplitCompress(bufferContext *marshaler.BufferContext, config config.Component, strategy compression.Component) (transaction.BytesPayloads, error) { - pb, err := series.NewPayloadsBuilder(bufferContext, config, strategy) - if err != nil { - return nil, err - } - - err = pb.startPayload() - if err != nil { - return nil, err - } - - // Use series.source.MoveNext() instead of series.MoveNext() because this function supports - // the serie.NoIndex field. - for series.source.MoveNext() { - err = pb.writeSerie(series.source.Current()) - if err != nil { - return nil, err - } - } - - // if the last payload has any data, flush it - err = pb.finishPayload() - if err != nil { - return nil, err - } - - return pb.payloads, nil +type Pipeline struct { + FilterFunc func(s *metrics.Serie) bool + Destination transaction.Destination } -// MarshalSplitCompressMultiple uses the stream compressor to marshal and compress one series into three sets of payloads. -// One set of payloads contains all metrics, -// The seond contains only those that pass the provided MRF filter function. -// The third contains only those that pass the provided autoscaling local failover filter function. -// This function exists because we need a way to build both payloads in a single pass over the input data, which cannot be iterated over twice. 
-func (series *IterableSeries) MarshalSplitCompressMultiple(config config.Component, strategy compression.Component, filterFuncForMRF func(s *metrics.Serie) bool, filterFuncForAutoscaling func(s *metrics.Serie) bool) (transaction.BytesPayloads, transaction.BytesPayloads, transaction.BytesPayloads, error) {
-	pbs := make([]*PayloadsBuilder, 3) // 0: all, 1: MRF, 2: autoscaling
+// MarshalSplitCompressPipelines uses the stream compressor to marshal and
+// compress series payloads, building one payload set per pipeline in a single
+// pass over the input data. If a compressed payload is larger than the max, a
+// new payload will be generated. It returns every pipeline's payloads joined
+// in pipeline order, each one tagged with its pipeline's Destination.
+func (series *IterableSeries) MarshalSplitCompressPipelines(config config.Component, strategy compression.Component, pipelines []Pipeline) (transaction.BytesPayloads, error) {
+	pbs := make([]*PayloadsBuilder, len(pipelines))
 	for i := range pbs {
 		bufferContext := marshaler.NewBufferContext()
 		pb, err := series.NewPayloadsBuilder(bufferContext, config, strategy)
 		if err != nil {
-			return nil, nil, nil, err
+			return nil, err
 		}
 		pbs[i] = &pb
 
 		err = pbs[i].startPayload()
 		if err != nil {
-			return nil, nil, nil, err
+			return nil, err
 		}
 	}
+
 	// Use series.source.MoveNext() instead of series.MoveNext() because this function supports
 	// the serie.NoIndex field.
 	for series.source.MoveNext() {
-		err := pbs[0].writeSerie(series.source.Current())
-		if err != nil {
-			return nil, nil, nil, err
-		}
-
-		if filterFuncForMRF(series.source.Current()) {
-			err = pbs[1].writeSerie(series.source.Current())
-			if err != nil {
-				return nil, nil, nil, err
-			}
-		}
-
-		if filterFuncForAutoscaling(series.source.Current()) {
-			err = pbs[2].writeSerie(series.source.Current())
-			if err != nil {
-				return nil, nil, nil, err
+		for i, pipeline := range pipelines {
+			if pipeline.FilterFunc(series.source.Current()) {
+				err := pbs[i].writeSerie(series.source.Current())
+				if err != nil {
+					return nil, err
+				}
 			}
 		}
 	}
@@ -185,11 +149,23 @@ func (series *IterableSeries) MarshalSplitCompressMultiple(config config.Compone
 	for i := range pbs {
 		err := pbs[i].finishPayload()
 		if err != nil {
-			return nil, nil, nil, err
+			return nil, err
+		}
+	}
+
+	// tag every payload with the destination of the pipeline that built it
+	for i, pipeline := range pipelines {
+		for _, payload := range pbs[i].payloads {
+			payload.Destination = pipeline.Destination
 		}
 	}
 
-	return pbs[0].payloads, pbs[1].payloads, pbs[2].payloads, nil
+	payloads := make([]*transaction.BytesPayload, 0, len(pbs)) // length 0: append must not follow nil placeholders
+	for _, pb := range pbs {
+		payloads = append(payloads, pb.payloads...)
+	}
+
+	return payloads, nil
 }
 
 // NewPayloadsBuilder initializes a new PayloadsBuilder to be used for serializing series into a set of output payloads.
diff --git a/pkg/serializer/serializer.go b/pkg/serializer/serializer.go index 9dc3708670e9..d5a1ffdf7553 100644 --- a/pkg/serializer/serializer.go +++ b/pkg/serializer/serializer.go @@ -8,17 +8,22 @@ package serializer import ( "bytes" + "context" "encoding/json" "errors" "expvar" "fmt" "net/http" + "slices" "strconv" "time" forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder/transaction" orchestratorForwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorinterface" + hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/hosttags" + pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" + "github.com/DataDog/datadog-agent/pkg/tagset" "github.com/DataDog/datadog-agent/comp/core/config" "github.com/DataDog/datadog-agent/pkg/metrics" @@ -342,54 +347,75 @@ func (s *Serializer) SendIterableSeries(serieSource metrics.SerieSource) error { var extraHeaders http.Header var err error - if useV1API && s.enableJSONStream { - seriesBytesPayloads, extraHeaders, err = s.serializeIterableStreamablePayload(seriesSerializer, stream.DropItemOnErrItemTooBig) - } else if useV1API && !s.enableJSONStream { - seriesBytesPayloads, extraHeaders, err = s.serializePayloadJSON(seriesSerializer, true) - } else { - failoverActiveForMRF, allowlistForMRF := s.getFailoverAllowlist() - failoverActiveForAutoscaling, allowlistForAutoscaling := s.getAutoscalingFailoverMetrics() - failoverActive := (failoverActiveForMRF && len(allowlistForMRF) > 0) || (failoverActiveForAutoscaling && len(allowlistForAutoscaling) > 0) - if failoverActive { - var filtered transaction.BytesPayloads - var localAutoscalingFaioverPayloads transaction.BytesPayloads - seriesBytesPayloads, filtered, localAutoscalingFaioverPayloads, err = seriesSerializer.MarshalSplitCompressMultiple(s.config, s.Strategy, - func(s *metrics.Serie) bool { // Filter for MRF - _, allowed := 
allowlistForMRF[s.Name] - return allowed - }, - func(s *metrics.Serie) bool { // Filter for Autoscaling - _, allowed := allowlistForAutoscaling[s.Name] - return allowed - }) - - for _, seriesBytesPayload := range seriesBytesPayloads { - seriesBytesPayload.Destination = transaction.PrimaryOnly - } - for _, seriesBytesPayload := range filtered { - seriesBytesPayload.Destination = transaction.SecondaryOnly - } - for _, seriesBytesPayload := range localAutoscalingFaioverPayloads { - seriesBytesPayload.Destination = transaction.LocalOnly - } - seriesBytesPayloads = append(seriesBytesPayloads, filtered...) - seriesBytesPayloads = append(seriesBytesPayloads, localAutoscalingFaioverPayloads...) + if useV1API { + if s.enableJSONStream { + seriesBytesPayloads, extraHeaders, err = s.serializeIterableStreamablePayload(seriesSerializer, stream.DropItemOnErrItemTooBig) } else { - seriesBytesPayloads, err = seriesSerializer.MarshalSplitCompress(marshaler.NewBufferContext(), s.config, s.Strategy) - for _, seriesBytesPayload := range seriesBytesPayloads { - seriesBytesPayload.Destination = transaction.AllRegions - } + seriesBytesPayloads, extraHeaders, err = s.serializePayloadJSON(seriesSerializer, true) } - extraHeaders = s.protobufExtraHeadersWithCompression + if err != nil { + return fmt.Errorf("dropping series payload: %s", err) + } + return s.Forwarder.SubmitV1Series(seriesBytesPayloads, extraHeaders) } + failoverActiveForMRF, allowlistForMRF := s.getFailoverAllowlist() + failoverActiveForAutoscaling, allowlistForAutoscaling := s.getAutoscalingFailoverMetrics() + failoverActive := (failoverActiveForMRF && len(allowlistForMRF) > 0) || (failoverActiveForAutoscaling && len(allowlistForAutoscaling) > 0) + pipelines := make([]metricsserializer.Pipeline, 0, 4) + if failoverActive { + // Default behavior, primary region only + pipelines = append(pipelines, metricsserializer.Pipeline{ + FilterFunc: func(series *metrics.Serie) bool { return true }, + Destination: 
transaction.PrimaryOnly, + }) + + // Filter for MRF + pipelines = append(pipelines, metricsserializer.Pipeline{ + FilterFunc: func(s *metrics.Serie) bool { + _, allowed := allowlistForMRF[s.Name] + return allowed + }, + Destination: transaction.SecondaryOnly, + }) + + // Filter for Autoscaling + pipelines = append(pipelines, metricsserializer.Pipeline{ + FilterFunc: func(s *metrics.Serie) bool { + _, allowed := allowlistForAutoscaling[s.Name] + return allowed + }, + Destination: transaction.LocalOnly, + }) + } else { + // Default behavior, all regions + pipelines = append(pipelines, metricsserializer.Pipeline{ + FilterFunc: func(series *metrics.Serie) bool { return true }, + Destination: transaction.AllRegions, + }) + } + + // We are modifying the series in this filter, so it must always be the last + // pipeline in the list to avoid affecting other pipelines. + if s.config.GetBool("enable_preaggr_pipeline") { + pipelines = append(pipelines, metricsserializer.Pipeline{ + FilterFunc: func(s *metrics.Serie) bool { + // TODO: don't add host tags if they were already added because of `expected_tags_duration` being set + hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System) + s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags) + return true + }, + Destination: transaction.PreaggrOnly, + }) + } + + seriesBytesPayloads, err = seriesSerializer.MarshalSplitCompressPipelines(s.config, s.Strategy, pipelines) + extraHeaders = s.protobufExtraHeadersWithCompression + if err != nil { return fmt.Errorf("dropping series payload: %s", err) } - if useV1API { - return s.Forwarder.SubmitV1Series(seriesBytesPayloads, extraHeaders) - } return s.Forwarder.SubmitSeries(seriesBytesPayloads, extraHeaders) } diff --git a/pkg/serializer/series_benchmark_test.go b/pkg/serializer/series_benchmark_test.go index 2d06eb2191fa..26c14844d35a 100644 --- a/pkg/serializer/series_benchmark_test.go +++ b/pkg/serializer/series_benchmark_test.go 
@@ -20,7 +20,6 @@ import ( "github.com/DataDog/datadog-agent/pkg/metrics" metricsserializer "github.com/DataDog/datadog-agent/pkg/serializer/internal/metrics" "github.com/DataDog/datadog-agent/pkg/serializer/internal/stream" - "github.com/DataDog/datadog-agent/pkg/serializer/marshaler" "github.com/DataDog/datadog-agent/pkg/tagset" ) @@ -76,12 +75,16 @@ func BenchmarkSeries(b *testing.B) { b.ReportMetric(float64(payloadCompressedSize)/float64(b.N), "compressed-payload-bytes") } } - bufferContext := marshaler.NewBufferContext() mockConfig := mock.New(b) compressor := metricscompression.NewCompressorReq(metricscompression.Requires{Cfg: mockConfig}).Comp pb := func(series metrics.Series) (transaction.BytesPayloads, error) { iterableSeries := metricsserializer.CreateIterableSeries(metricsserializer.CreateSerieSource(series)) - return iterableSeries.MarshalSplitCompress(bufferContext, mockConfig, compressor) + return iterableSeries.MarshalSplitCompressPipelines(mockConfig, compressor, []metricsserializer.Pipeline{{ + FilterFunc: func(metric *metrics.Serie) bool { + return true + }, + Destination: transaction.AllRegions, + }}) } payloadBuilder := stream.NewJSONPayloadBuilder(true, mockConfig, compressor, logmock.New(b))