Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions comp/forwarder/defaultforwarder/default_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,24 @@ func (f *DefaultForwarder) createAdvancedHTTPTransactions(endpoint transaction.E
for _, payload := range payloads {
for domain, dr := range f.domainResolvers {
drDomain, destinationType := dr.Resolve(endpoint) // drDomain is the domain with agent version if not local

// If the payload has a preaggr destination, but the current domain
// is not the configured preaggr site, do not create a transaction.
//
// If we have a preaggr payload and domain, switch from the standard
// series endpoint to the preaggr-specific series endpoint.
if payload.Destination == transaction.PreaggrOnly {
if drDomain != f.config.GetString("preaggr_dd_url") {
continue
} else {
endpoint = endpoints.PreaggrSeriesEndpoint
}
}
// TODO(?): If the preaggr_dd_url is the same as the primary dd_url,
// we will also inherit any additional API keys from the
// configuration of that site, meaning we'll send preaggr payloads
// for each of those orgs. Not sure if this is a problem or not.

Comment on lines +505 to +522
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Address potential duplicate payload sending and improve domain validation.

The PreaggrOnly destination handling logic is correct but has important concerns that need addressing:

  1. Duplicate sending issue: The TODO comment on lines 518-521 highlights a real problem - if preaggr_dd_url equals the primary dd_url, preaggregated payloads will be sent multiple times for each org due to inherited API keys.

  2. Configuration access: The direct config access with f.config.GetString("preaggr_dd_url") should ideally be cached or validated during forwarder initialization rather than on every transaction creation for performance.

Let's verify if this duplicate sending scenario is possible in the current configuration setup:

Consider adding validation to prevent the duplicate sending scenario:

 if payload.Destination == transaction.PreaggrOnly {
+	preaggURL := f.config.GetString("preaggr_dd_url")
+	if preaggURL == "" {
+		f.log.Warnf("PreaggrOnly payload skipped: preaggr_dd_url not configured")
+		continue
+	}
-	if drDomain != f.config.GetString("preaggr_dd_url") {
+	if drDomain != preaggURL {
 		continue
 	} else {
+		// Warn if preaggregation URL is same as primary to highlight potential duplicate sends
+		if preaggURL == f.config.GetString("dd_url") {
+			f.log.Warnf("preaggr_dd_url is same as dd_url - potential duplicate payload sending")
+		}
 		endpoint = endpoints.PreaggrSeriesEndpoint
 	}
 }

🏁 Script executed:

#!/bin/bash
# Check if preaggr_dd_url can be set to the same value as dd_url
# and if there are safeguards in the configuration setup

# Search for preaggr_dd_url configuration and validation
rg -A 10 -B 5 "preaggr_dd_url.*=|GetString.*preaggr_dd_url"

# Check for validation preventing duplicate URLs
ast-grep --pattern $'if $_ == $_ {
  $$$
}'

Length of output: 66119


Remove duplicate‐sending check (enforced in config utils) and cache URLs in DefaultForwarder

The pkg/config/utils/endpoints.go logic already skips adding a preaggregated endpoint when preaggr_dd_url == dd_url, so the “duplicate send” scenario cannot occur. However, repeatedly calling f.config.GetString(...) on every transaction impacts performance. Cache both URLs during forwarder initialization and simplify the PreaggrOnly block:

• File: comp/forwarder/defaultforwarder/default_forwarder.go
Lines: ~505–522

Suggested refactor:

 type DefaultForwarder struct {
-   config config.Config
+   config    config.Config
+   preaggURL string
+   ddURL     string
    // …
 }

 func NewDefaultForwarder(cfg config.Config, …) *DefaultForwarder {
-   return &DefaultForwarder{config: cfg, …}
+   return &DefaultForwarder{
+       config:    cfg,
+       preaggURL: cfg.GetString("preaggr_dd_url"),
+       ddURL:     cfg.GetString("dd_url"),
+       // …
+   }
 }

 // inside transaction‐creation loop
 if payload.Destination == transaction.PreaggrOnly {
-   if drDomain != f.config.GetString("preaggr_dd_url") {
+   if f.preaggURL == "" {
+       f.log.Warn("PreaggrOnly payload skipped: preaggr_dd_url not set")
+       continue
+   }
+   if drDomain != f.preaggURL {
        continue
    }
    endpoint = endpoints.PreaggrSeriesEndpoint
 }
  • Removed the redundant “duplicate send” warning (config utils enforces uniqueness).
  • Cached both URLs on startup to eliminate repetitive GetString calls.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// If the payload has a preaggr destination, but the current domain
// is not the configured preaggr site, do not create a transaction.
//
// If we have a preaggr payload and domain, switch from the standard
// series endpoint to the preaggr-specific series endpoint.
if payload.Destination == transaction.PreaggrOnly {
if drDomain != f.config.GetString("preaggr_dd_url") {
continue
} else {
endpoint = endpoints.PreaggrSeriesEndpoint
}
}
// TODO(?): If the preaggr_dd_url is the same as the primary dd_url,
// we will also inherit any additional API keys from the
// configuration of that site, meaning we'll send preaggr payloads
// for each of those orgs. Not sure if this is a problem or not.
// --- in comp/forwarder/defaultforwarder/default_forwarder.go ---
// 1) Cache URLs in the struct
type DefaultForwarder struct {
config config.Config
preaggURL string
ddURL string
// …
}
func NewDefaultForwarder(cfg config.Config, …) *DefaultForwarder {
return &DefaultForwarder{
config: cfg,
preaggURL: cfg.GetString("preaggr_dd_url"),
ddURL: cfg.GetString("dd_url"),
// …
}
}
// … later, inside the transaction-creation loop …
// If the payload has a preaggr destination, but the current domain
// is not the configured preaggr site, do not create a transaction.
//
// If we have a preaggr payload and domain, switch from the standard
// series endpoint to the preaggr-specific series endpoint.
if payload.Destination == transaction.PreaggrOnly {
if f.preaggURL == "" {
f.log.Warn("PreaggrOnly payload skipped: preaggr_dd_url not set")
continue
}
if drDomain != f.preaggURL {
continue
}
endpoint = endpoints.PreaggrSeriesEndpoint
}
// TODO(?): If the preaggr_dd_url is the same as the primary dd_url,
// we will also inherit any additional API keys from the
// configuration of that site…
🤖 Prompt for AI Agents
In comp/forwarder/defaultforwarder/default_forwarder.go around lines 505 to 522,
remove the redundant duplicate-sending check for preaggr payloads since this is
already enforced in pkg/config/utils/endpoints.go. Instead of calling
f.config.GetString("preaggr_dd_url") on every transaction, cache both the
primary dd_url and preaggr_dd_url during DefaultForwarder initialization. Then
update the PreaggrOnly destination handling to use the cached URLs for domain
comparison, simplifying the logic and improving performance.

if payload.Destination == transaction.LocalOnly {
// if it is local payload, we should not send it to the remote endpoint
if destinationType == pkgresolver.Local && endpoint == endpoints.SeriesEndpoint {
Expand Down
2 changes: 2 additions & 0 deletions comp/forwarder/defaultforwarder/endpoints/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (

// SeriesEndpoint is the v2 endpoint used to send series
SeriesEndpoint = transaction.Endpoint{Route: "/api/v2/series", Name: "series_v2"}
// PreaggrSeriesEndpoint is the endpoint used for experimental preaggregation
PreaggrSeriesEndpoint = transaction.Endpoint{Route: "/api/intake/pipelines/ddseries", Name: "preaggr_series"}
// EventsEndpoint is the v2 endpoint used to send events
EventsEndpoint = transaction.Endpoint{Route: "/api/v2/events", Name: "events_v2"}
// ServiceChecksEndpoint is the v2 endpoint used to send service checks
Expand Down
4 changes: 4 additions & 0 deletions comp/forwarder/defaultforwarder/transaction/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ const (
SecondaryOnly
// LocalOnly indicates the transaction should be sent to the local endpoint (cluster-agent) only
LocalOnly
// PreaggrOnly indicates the transaction should be sent to the pre-aggregation endpoint only
PreaggrOnly
)

func (d Destination) String() string {
Expand All @@ -218,6 +220,8 @@ func (d Destination) String() string {
return "SecondaryOnly"
case LocalOnly:
return "LocalOnly"
case PreaggrOnly:
return "PreaggrOnly"
default:
return "Unknown"
}
Expand Down
1 change: 1 addition & 0 deletions pkg/config/setup/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ var serverlessConfigComponents = []func(pkgconfigmodel.Setup){
setupAPM,
OTLP,
setupMultiRegionFailover,
setupPreaggregation,
telemetry,
autoconfig,
remoteconfig,
Expand Down
16 changes: 16 additions & 0 deletions pkg/config/setup/preaggregation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package setup

import (
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
)

func setupPreaggregation(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("enable_preaggr_pipeline", false)
config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The URL https://api.datad0g.com appears to contain a typo, using the digit 0 instead of the letter o. Unless this is intentional (perhaps for testing purposes), consider changing it to https://api.datadoghq.com or the appropriate production URL to avoid potential connectivity issues.

Suggested change
config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datadoghq.com")

Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.

config.BindEnv("preaggr_api_key")
}
Comment on lines +12 to +16
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Good configuration setup, but verify the default URL.

The configuration function properly sets up environment variable bindings with appropriate defaults. The approach of defaulting enable_preaggr_pipeline to false for safety is good practice.

However, please verify the default URL "https://api.datad0g.com" - the domain contains "datad0g" with a zero instead of "datadog". Ensure this is intentional and not a typo.


🏁 Script executed:

#!/bin/bash
# Description: Verify if the preaggregation URL domain is valid and related to Datadog

# Check if the domain resolves and is accessible
echo "Checking domain resolution for api.datad0g.com..."
nslookup api.datad0g.com

echo -e "\nChecking HTTP response..."
curl -I https://api.datad0g.com 2>/dev/null || echo "Failed to connect"

echo -e "\nFor comparison, checking standard Datadog domains..."
nslookup api.datadoghq.com
curl -I https://api.datadoghq.com 2>/dev/null || echo "Failed to connect"

Length of output: 1587


Correct default Datadog API URL

It looks like the domain in the default URL is a typo—api.datad0g.com (with a zero) isn’t the standard Datadog API endpoint. Please update it to the correct domain:

• pkg/config/setup/preaggregation.go, line 14

-   config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
+   config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datadoghq.com")

This ensures clients will hit the official Datadog API.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
func setupPreaggregation(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("enable_preaggr_pipeline", false)
config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
config.BindEnv("preaggr_api_key")
}
func setupPreaggregation(config pkgconfigmodel.Setup) {
config.BindEnvAndSetDefault("enable_preaggr_pipeline", false)
- config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
+ config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datadoghq.com")
config.BindEnv("preaggr_api_key")
}
🤖 Prompt for AI Agents
In pkg/config/setup/preaggregation.go around lines 12 to 16, the default URL for
the preaggregation API is set to "https://api.datad0g.com" which contains a typo
with a zero instead of the letter "o". Update the default URL string to
"https://api.datadoghq.com" to ensure it points to the correct official Datadog
API endpoint.

20 changes: 20 additions & 0 deletions pkg/config/utils/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,26 @@ func GetMultipleEndpoints(c pkgconfigmodel.Reader) (map[string][]APIKeys, error)
Keys: []string{c.GetString("multi_region_failover.api_key")},
}}
}

// Populate preaggregation endpoint (only if unique)
//
// TODO(?): This assumes we wouldn't have a unique preaggregation API key
// without a unique preaggregation URL, but we should validate that.
if c.GetBool("enable_preaggr_pipeline") {
preaggURL := c.GetString("preaggr_dd_url")
// Check that it's not the same as the primary URL
if preaggURL != "" && preaggURL != ddURL {
// Check if preaggregation URL already exists in additional endpoints
if _, exists := additionalEndpoints[preaggURL]; !exists {
// Unique URL - create new domain resolver with preaggregation API key
additionalEndpoints[preaggURL] = []APIKeys{{
ConfigSettingPath: "preaggr_api_key",
Keys: []string{c.GetString("preaggr_api_key")},
}}
}
}
}
Comment on lines +169 to +186
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Review the preaggregation endpoint logic and address edge cases.

The preaggregation endpoint configuration logic is generally sound, but there are several concerns:

  1. Missing validation: The TODO comment on lines 171-172 highlights a valid concern - there's no validation that a unique preaggregation API key corresponds to a unique URL.

  2. Empty API key handling: No validation ensures preaggr_api_key is non-empty before creating the endpoint entry.

  3. URL validation: While the main URL gets validated via url.Parse() in the mergeAdditionalEndpoints call, the preaggregation URL only gets a basic string comparison check.

Consider adding validation to address these concerns:

 if c.GetBool("enable_preaggr_pipeline") {
 	preaggURL := c.GetString("preaggr_dd_url")
+	preaggAPIKey := c.GetString("preaggr_api_key")
+	
+	// Validate preaggregation URL
+	if preaggURL != "" {
+		if _, err := url.Parse(preaggURL); err != nil {
+			return nil, fmt.Errorf("could not parse preaggr_dd_url: %s", err)
+		}
+	}
+	
 	// Check that it's not the same as the primary URL
-	if preaggURL != "" && preaggURL != ddURL {
+	if preaggURL != "" && preaggURL != ddURL && preaggAPIKey != "" {
 		// Check if preaggregation URL already exists in additional endpoints
 		if _, exists := additionalEndpoints[preaggURL]; !exists {
 			// Unique URL - create new domain resolver with preaggregation API key
 			additionalEndpoints[preaggURL] = []APIKeys{{
 				ConfigSettingPath: "preaggr_api_key",
-				Keys:              []string{c.GetString("preaggr_api_key")},
+				Keys:              []string{preaggAPIKey},
 			}}
 		}
 	}
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Populate preaggregation endpoint (only if unique)
//
// TODO(?): This assumes we wouldn't have a unique preaggregation API key
// without a unique preaggregation URL, but we should validate that.
if c.GetBool("enable_preaggr_pipeline") {
preaggURL := c.GetString("preaggr_dd_url")
// Check that it's not the same as the primary URL
if preaggURL != "" && preaggURL != ddURL {
// Check if preaggregation URL already exists in additional endpoints
if _, exists := additionalEndpoints[preaggURL]; !exists {
// Unique URL - create new domain resolver with preaggregation API key
additionalEndpoints[preaggURL] = []APIKeys{{
ConfigSettingPath: "preaggr_api_key",
Keys: []string{c.GetString("preaggr_api_key")},
}}
}
}
}
// Populate preaggregation endpoint (only if unique)
//
// TODO(?): This assumes we wouldn't have a unique preaggregation API key
// without a unique preaggregation URL, but we should validate that.
if c.GetBool("enable_preaggr_pipeline") {
preaggURL := c.GetString("preaggr_dd_url")
preaggAPIKey := c.GetString("preaggr_api_key")
// Validate preaggregation URL
if preaggURL != "" {
if _, err := url.Parse(preaggURL); err != nil {
return nil, fmt.Errorf("could not parse preaggr_dd_url: %s", err)
}
}
// Check that it's not the same as the primary URL
if preaggURL != "" && preaggURL != ddURL && preaggAPIKey != "" {
// Check if preaggregation URL already exists in additional endpoints
if _, exists := additionalEndpoints[preaggURL]; !exists {
// Unique URL - create new domain resolver with preaggregation API key
additionalEndpoints[preaggURL] = []APIKeys{{
ConfigSettingPath: "preaggr_api_key",
Keys: []string{preaggAPIKey},
}}
}
}
}
🤖 Prompt for AI Agents
In pkg/config/utils/endpoints.go around lines 169 to 186, enhance the
preaggregation endpoint logic by adding validation to ensure the preaggregation
API key is non-empty before creating the endpoint entry, verify that the
preaggregation API key corresponds to a unique preaggregation URL, and perform
proper URL validation on the preaggregation URL instead of just string
comparison. Implement these checks before adding the preaggregation endpoint to
additionalEndpoints to handle edge cases and improve robustness.


return mergeAdditionalEndpoints(keysPerDomain, additionalEndpoints)
}

Expand Down
90 changes: 33 additions & 57 deletions pkg/serializer/internal/metrics/iterable_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,77 +106,41 @@ func describeItem(serie *metrics.Serie) string {
return fmt.Sprintf("name %q, %d points", serie.Name, len(serie.Points))
}

// MarshalSplitCompress uses the stream compressor to marshal and compress series payloads.
// If a compressed payload is larger than the max, a new payload will be generated. This method returns a slice of
// compressed protobuf marshaled MetricPayload objects.
func (series *IterableSeries) MarshalSplitCompress(bufferContext *marshaler.BufferContext, config config.Component, strategy compression.Component) (transaction.BytesPayloads, error) {
pb, err := series.NewPayloadsBuilder(bufferContext, config, strategy)
if err != nil {
return nil, err
}

err = pb.startPayload()
if err != nil {
return nil, err
}

// Use series.source.MoveNext() instead of series.MoveNext() because this function supports
// the serie.NoIndex field.
for series.source.MoveNext() {
err = pb.writeSerie(series.source.Current())
if err != nil {
return nil, err
}
}

// if the last payload has any data, flush it
err = pb.finishPayload()
if err != nil {
return nil, err
}

return pb.payloads, nil
type Pipeline struct {
FilterFunc func(s *metrics.Serie) bool
Destination transaction.Destination
}

// MarshalSplitCompressMultiple uses the stream compressor to marshal and compress one series into three sets of payloads.
// One set of payloads contains all metrics,
// The seond contains only those that pass the provided MRF filter function.
// The third contains only those that pass the provided autoscaling local failover filter function.
// This function exists because we need a way to build both payloads in a single pass over the input data, which cannot be iterated over twice.
func (series *IterableSeries) MarshalSplitCompressMultiple(config config.Component, strategy compression.Component, filterFuncForMRF func(s *metrics.Serie) bool, filterFuncForAutoscaling func(s *metrics.Serie) bool) (transaction.BytesPayloads, transaction.BytesPayloads, transaction.BytesPayloads, error) {
pbs := make([]*PayloadsBuilder, 3) // 0: all, 1: MRF, 2: autoscaling
// MarshalSplitCompressPipelines uses the stream compressor to marshal and
// compress series payloads, allowing multiple variants to be generated in a
// single pass over the input data. If a compressed payload is larger than the
// max, a new payload will be generated. This method returns a slice of
// compressed protobuf marshaled MetricPayload objects.
func (series *IterableSeries) MarshalSplitCompressPipelines(config config.Component, strategy compression.Component, pipelines []Pipeline) (transaction.BytesPayloads, error) {
pbs := make([]*PayloadsBuilder, len(pipelines))
for i := range pbs {
bufferContext := marshaler.NewBufferContext()
pb, err := series.NewPayloadsBuilder(bufferContext, config, strategy)
if err != nil {
return nil, nil, nil, err
return nil, err
}
pbs[i] = &pb

err = pbs[i].startPayload()
if err != nil {
return nil, nil, nil, err
return nil, err
}
}

// Use series.source.MoveNext() instead of series.MoveNext() because this function supports
// the serie.NoIndex field.
for series.source.MoveNext() {
err := pbs[0].writeSerie(series.source.Current())
if err != nil {
return nil, nil, nil, err
}

if filterFuncForMRF(series.source.Current()) {
err = pbs[1].writeSerie(series.source.Current())
if err != nil {
return nil, nil, nil, err
}
}

if filterFuncForAutoscaling(series.source.Current()) {
err = pbs[2].writeSerie(series.source.Current())
if err != nil {
return nil, nil, nil, err
for i, pipeline := range pipelines {
if pipeline.FilterFunc(series.source.Current()) {
err := pbs[i].writeSerie(series.source.Current())
if err != nil {
return nil, err
}
}
}
}
Expand All @@ -185,11 +149,23 @@ func (series *IterableSeries) MarshalSplitCompressMultiple(config config.Compone
for i := range pbs {
err := pbs[i].finishPayload()
if err != nil {
return nil, nil, nil, err
return nil, err
}
}

// assign destinations to payloads per strategy
for i, pipeline := range pipelines {
for _, payload := range pbs[i].payloads {
payload.Destination = pipeline.Destination
}
}

return pbs[0].payloads, pbs[1].payloads, pbs[2].payloads, nil
payloads := make([]*transaction.BytesPayload, len(pbs))
for _, pb := range pbs {
payloads = append(payloads, pb.payloads...)
}
Comment on lines +163 to +166
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an issue with the array initialization in this code. The line payloads := make([]*transaction.BytesPayload, len(pbs)) creates an array with len(pbs) nil elements, but then the code appends more elements with append(payloads, pb.payloads...). This will result in the first len(pbs) elements being nil followed by the actual payload elements.

To fix this, either:

  1. Initialize with zero length: payloads := make([]*transaction.BytesPayload, 0) or simply var payloads []*transaction.BytesPayload
  2. Or use assignment instead of append if you want to pre-allocate:
payloads := make([]*transaction.BytesPayload, 0, totalPayloadCount)
for _, pb := range pbs {
    payloads = append(payloads, pb.payloads...)
}
Suggested change
payloads := make([]*transaction.BytesPayload, len(pbs))
for _, pb := range pbs {
payloads = append(payloads, pb.payloads...)
}
payloads := make([]*transaction.BytesPayload, 0)
for _, pb := range pbs {
payloads = append(payloads, pb.payloads...)
}

Spotted by Diamond

Is this helpful? React 👍 or 👎 to let us know.


return payloads, nil
Comment on lines +163 to +168
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix payload collection logic to avoid slice reallocation.

The payload collection has an efficiency issue:

The current logic initializes payloads with a fixed size but then uses append(), which will cause reallocation and potentially incorrect indexing:

-payloads := make([]*transaction.BytesPayload, len(pbs))
+var payloads []*transaction.BytesPayload
 for _, pb := range pbs {
 	payloads = append(payloads, pb.payloads...)
 }

Alternatively, pre-calculate the total capacity:

+totalCapacity := 0
+for _, pb := range pbs {
+	totalCapacity += len(pb.payloads)
+}
+payloads := make([]*transaction.BytesPayload, 0, totalCapacity)
 for _, pb := range pbs {
 	payloads = append(payloads, pb.payloads...)
 }
🤖 Prompt for AI Agents
In pkg/serializer/internal/metrics/iterable_series.go around lines 163 to 168,
the code initializes the payloads slice with a fixed length but then uses
append, causing unnecessary reallocations and incorrect slice length. To fix
this, first calculate the total number of payloads across all pbs, then create
the payloads slice with that exact capacity and length, and finally copy the
payloads directly into the slice without using append.

}

// NewPayloadsBuilder initializes a new PayloadsBuilder to be used for serializing series into a set of output payloads.
Expand Down
106 changes: 66 additions & 40 deletions pkg/serializer/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,22 @@ package serializer

import (
"bytes"
"context"
"encoding/json"
"errors"
"expvar"
"fmt"
"net/http"
"slices"
"strconv"
"time"

forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder/transaction"
orchestratorForwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorinterface"
hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/hosttags"
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
"github.com/DataDog/datadog-agent/pkg/tagset"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/pkg/metrics"
Expand Down Expand Up @@ -342,54 +347,75 @@ func (s *Serializer) SendIterableSeries(serieSource metrics.SerieSource) error {
var extraHeaders http.Header
var err error

if useV1API && s.enableJSONStream {
seriesBytesPayloads, extraHeaders, err = s.serializeIterableStreamablePayload(seriesSerializer, stream.DropItemOnErrItemTooBig)
} else if useV1API && !s.enableJSONStream {
seriesBytesPayloads, extraHeaders, err = s.serializePayloadJSON(seriesSerializer, true)
} else {
failoverActiveForMRF, allowlistForMRF := s.getFailoverAllowlist()
failoverActiveForAutoscaling, allowlistForAutoscaling := s.getAutoscalingFailoverMetrics()
failoverActive := (failoverActiveForMRF && len(allowlistForMRF) > 0) || (failoverActiveForAutoscaling && len(allowlistForAutoscaling) > 0)
if failoverActive {
var filtered transaction.BytesPayloads
var localAutoscalingFaioverPayloads transaction.BytesPayloads
seriesBytesPayloads, filtered, localAutoscalingFaioverPayloads, err = seriesSerializer.MarshalSplitCompressMultiple(s.config, s.Strategy,
func(s *metrics.Serie) bool { // Filter for MRF
_, allowed := allowlistForMRF[s.Name]
return allowed
},
func(s *metrics.Serie) bool { // Filter for Autoscaling
_, allowed := allowlistForAutoscaling[s.Name]
return allowed
})

for _, seriesBytesPayload := range seriesBytesPayloads {
seriesBytesPayload.Destination = transaction.PrimaryOnly
}
for _, seriesBytesPayload := range filtered {
seriesBytesPayload.Destination = transaction.SecondaryOnly
}
for _, seriesBytesPayload := range localAutoscalingFaioverPayloads {
seriesBytesPayload.Destination = transaction.LocalOnly
}
seriesBytesPayloads = append(seriesBytesPayloads, filtered...)
seriesBytesPayloads = append(seriesBytesPayloads, localAutoscalingFaioverPayloads...)
if useV1API {
if s.enableJSONStream {
seriesBytesPayloads, extraHeaders, err = s.serializeIterableStreamablePayload(seriesSerializer, stream.DropItemOnErrItemTooBig)
} else {
seriesBytesPayloads, err = seriesSerializer.MarshalSplitCompress(marshaler.NewBufferContext(), s.config, s.Strategy)
for _, seriesBytesPayload := range seriesBytesPayloads {
seriesBytesPayload.Destination = transaction.AllRegions
}
seriesBytesPayloads, extraHeaders, err = s.serializePayloadJSON(seriesSerializer, true)
}
extraHeaders = s.protobufExtraHeadersWithCompression
if err != nil {
return fmt.Errorf("dropping series payload: %s", err)
}
return s.Forwarder.SubmitV1Series(seriesBytesPayloads, extraHeaders)
}

failoverActiveForMRF, allowlistForMRF := s.getFailoverAllowlist()
failoverActiveForAutoscaling, allowlistForAutoscaling := s.getAutoscalingFailoverMetrics()
failoverActive := (failoverActiveForMRF && len(allowlistForMRF) > 0) || (failoverActiveForAutoscaling && len(allowlistForAutoscaling) > 0)
pipelines := make([]metricsserializer.Pipeline, 0, 4)
if failoverActive {
// Default behavior, primary region only
pipelines = append(pipelines, metricsserializer.Pipeline{
FilterFunc: func(series *metrics.Serie) bool { return true },
Destination: transaction.PrimaryOnly,
})

// Filter for MRF
pipelines = append(pipelines, metricsserializer.Pipeline{
FilterFunc: func(s *metrics.Serie) bool {
_, allowed := allowlistForMRF[s.Name]
return allowed
},
Destination: transaction.SecondaryOnly,
})

// Filter for Autoscaling
pipelines = append(pipelines, metricsserializer.Pipeline{
FilterFunc: func(s *metrics.Serie) bool {
_, allowed := allowlistForAutoscaling[s.Name]
return allowed
},
Destination: transaction.LocalOnly,
})
} else {
// Default behavior, all regions
pipelines = append(pipelines, metricsserializer.Pipeline{
FilterFunc: func(series *metrics.Serie) bool { return true },
Destination: transaction.AllRegions,
})
}
Comment on lines +362 to +396
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify failover pipeline logic correctness.

The pipeline-based approach is a significant improvement in terms of modularity and readability. However, I want to verify the failover logic is equivalent to the previous implementation.

The logic appears correct - it creates separate pipelines for primary (all series), secondary (MRF allowlist), and local (autoscaling allowlist) destinations when failover is active, otherwise sends all series to all regions.


🏁 Script executed:

#!/bin/bash
# Search for the original failover logic to compare with the new pipeline approach
rg -A 20 -B 5 "multi_region_failover.*enabled.*failover_metrics"

# Check for any existing tests that validate failover behavior
fd -e go -t f | xargs grep -l "failover.*test\|test.*failover" | head -5

Length of output: 5780


Verified failover pipeline logic matches original behavior

I’ve confirmed that the new pipeline-based code in pkg/serializer/serializer.go (around lines 362–396) uses the exact same flag checks and allowlists from getFailoverAllowlist() and getAutoscalingFailoverMetrics() as the prior implementation:

  • When either MRF or Autoscaling failover is active (and their respective allowlists are non-empty), it emits three pipelines:
    • PrimaryOnly (all series)
    • SecondaryOnly (MRF allowlist)
    • LocalOnly (Autoscaling allowlist)
  • Otherwise it emits a single AllRegions pipeline.

I did not find any existing tests that exercise these failover‐metrics pipelines. To fully validate this important behavior, please add unit tests covering:

• MRF-only failover routing
• Autoscaling-only failover routing
• Both failovers active
• No-failover (AllRegions) path

— especially around pkg/serializer/serializer.go lines 362–396.

🤖 Prompt for AI Agents
In pkg/serializer/serializer.go around lines 362 to 396, the failover pipeline
logic is correctly implemented but lacks unit test coverage. Add unit tests that
specifically cover the failover routing scenarios: MRF-only failover active,
Autoscaling-only failover active, both failovers active simultaneously, and the
no-failover case where all regions receive metrics. These tests should verify
that the correct pipelines with appropriate filters and destinations are created
for each scenario to ensure the failover behavior is fully validated.


// We are modifying the series in this filter, so it must always be the last
// pipeline in the list to avoid affecting other pipelines.
if s.config.GetBool("enable_preaggr_pipeline") {
pipelines = append(pipelines, metricsserializer.Pipeline{
FilterFunc: func(s *metrics.Serie) bool {
// TODO: don't add host tags if they were already added because of `expected_tags_duration` being set
hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags)
return true
},
Destination: transaction.PreaggrOnly,
})
}

Comment on lines +398 to +411
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Address host tag modification concerns in preaggregation pipeline.

The preaggregation pipeline implementation has several concerns that need attention:

  1. Side effect warning: The comment on lines 398-399 correctly notes this pipeline must be last since it modifies series data. This is a potential source of bugs if the pipeline order changes.

  2. TODO concern: Line 403 mentions not adding host tags if they were already added due to expected_tags_duration. This could lead to duplicate tags or inconsistent behavior.

  3. Context usage: Using context.TODO() on line 404 instead of a proper context might miss cancellation or timeout scenarios.

  4. Performance impact: slices.Clone() and tagset.CombineCompositeTagsAndSlice() are called for every series, which could be expensive for high-volume metrics.

Consider improving the implementation:

 // We are modifying the series in this filter, so it must always be the last
 // pipeline in the list to avoid affecting other pipelines.
 if s.config.GetBool("enable_preaggr_pipeline") {
+	// Cache host tags to avoid repeated retrieval
+	hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+	
 	pipelines = append(pipelines, metricsserializer.Pipeline{
 		FilterFunc: func(s *metrics.Serie) bool {
-			// TODO: don't add host tags if they were already added because of `expected_tags_duration` being set
-			hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+			// TODO: Check if host tags already present to avoid duplicates
 			s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags)
 			return true
 		},
 		Destination: transaction.PreaggrOnly,
 	})
 }

Let's check how expected_tags_duration interacts with host tag addition:


🏁 Script executed:

#!/bin/bash
# Search for expected_tags_duration usage and host tag handling
rg -A 10 -B 5 "expected_tags_duration"

# Check if there are existing mechanisms to detect duplicate host tags
ast-grep --pattern $'func $_(tags $_) $_ {
  $$$
}'

Length of output: 50317


Refactor preaggregation pipeline host-tag logic

The current implementation always fetches and clones host tags per-series (inside the FilterFunc), unconditionally adds them (risking duplicates when the standard host-tag provider is active), and uses context.TODO(). Let’s tighten this up:

• Cache & clone host tags once, outside the per-series closure, so you don’t call hostMetadataUtils.Get() on every metric.
• Replace context.TODO() with a real context (e.g. a field on your serializer or a passed-in ctx).
• Detect & skip already-present host tags to honor expected_tags_duration and avoid dupes.
• Ensure this pipeline remains last (to avoid unintended side effects).

Suggested diff in pkg/serializer/serializer.go:

  if s.config.GetBool("enable_preaggr_pipeline") {
-   pipelines = append(pipelines, metricsserializer.Pipeline{
+   // Preaggregated metrics need host tags once—cache at pipeline setup
+   ctx := s.ctx  // or however you carry a real Context
+   hostTags := slices.Clone(hostMetadataUtils.Get(ctx, false, pkgconfigsetup.Datadog()).System)
+
+   pipelines = append(pipelines, metricsserializer.Pipeline{
      FilterFunc: func(s *metrics.Serie) bool {
-       // TODO: don't add host tags if they were already added because of `expected_tags_duration` being set
-       hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+       // skip if host tags already on the series
+       if containsAll(s.Tags, hostTags) {
+           return true
+       }
+       s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags)
        return true
      },
      Destination: transaction.PreaggrOnly,
    })
  }

You’ll need to implement a small helper (e.g. containsAll([]string, []string) bool) to detect duplicates. This refactor eliminates per-series overhead, avoids duplicate tag addition, and uses a real context.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In pkg/serializer/serializer.go around lines 398 to 411, the preaggregation
pipeline currently fetches and clones host tags inside the per-series FilterFunc
using context.TODO(), which causes performance overhead and risks adding
duplicate host tags if they were already added due to expected_tags_duration. To
fix this, move the host tag fetching and cloning outside the FilterFunc so it
happens once, replace context.TODO() with a proper context passed into the
serializer or function, implement a helper function to check if all host tags
are already present in the series tags to avoid duplicates, and only add host
tags if they are missing. Also, keep the pipeline as the last in the list to
prevent side effects.

seriesBytesPayloads, err = seriesSerializer.MarshalSplitCompressPipelines(s.config, s.Strategy, pipelines)
extraHeaders = s.protobufExtraHeadersWithCompression

if err != nil {
return fmt.Errorf("dropping series payload: %s", err)
}

if useV1API {
return s.Forwarder.SubmitV1Series(seriesBytesPayloads, extraHeaders)
}
return s.Forwarder.SubmitSeries(seriesBytesPayloads, extraHeaders)
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/serializer/series_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/metrics"
metricsserializer "github.com/DataDog/datadog-agent/pkg/serializer/internal/metrics"
"github.com/DataDog/datadog-agent/pkg/serializer/internal/stream"
"github.com/DataDog/datadog-agent/pkg/serializer/marshaler"
"github.com/DataDog/datadog-agent/pkg/tagset"
)

Expand Down Expand Up @@ -76,12 +75,16 @@ func BenchmarkSeries(b *testing.B) {
b.ReportMetric(float64(payloadCompressedSize)/float64(b.N), "compressed-payload-bytes")
}
}
bufferContext := marshaler.NewBufferContext()
mockConfig := mock.New(b)
compressor := metricscompression.NewCompressorReq(metricscompression.Requires{Cfg: mockConfig}).Comp
pb := func(series metrics.Series) (transaction.BytesPayloads, error) {
iterableSeries := metricsserializer.CreateIterableSeries(metricsserializer.CreateSerieSource(series))
return iterableSeries.MarshalSplitCompress(bufferContext, mockConfig, compressor)
return iterableSeries.MarshalSplitCompressPipelines(mockConfig, compressor, []metricsserializer.Pipeline{{
FilterFunc: func(metric *metrics.Serie) bool {
return true
},
Destination: transaction.AllRegions,
}})
}

payloadBuilder := stream.NewJSONPayloadBuilder(true, mockConfig, compressor, logmock.New(b))
Expand Down