-
Notifications
You must be signed in to change notification settings - Fork 0
add preaggregation pipeline dual shipping support #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,16 @@ | ||||||||||||||||||||||||
| // Unless explicitly stated otherwise all files in this repository are licensed | ||||||||||||||||||||||||
| // under the Apache License Version 2.0. | ||||||||||||||||||||||||
| // This product includes software developed at Datadog (https://www.datadoghq.com/). | ||||||||||||||||||||||||
| // Copyright 2016-present Datadog, Inc. | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| package setup | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||
| pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" | ||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| func setupPreaggregation(config pkgconfigmodel.Setup) { | ||||||||||||||||||||||||
| config.BindEnvAndSetDefault("enable_preaggr_pipeline", false) | ||||||||||||||||||||||||
| config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com") | ||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The URL
Suggested change
Spotted by Diamond |
||||||||||||||||||||||||
| config.BindEnv("preaggr_api_key") | ||||||||||||||||||||||||
| } | ||||||||||||||||||||||||
|
Comment on lines
+12
to
+16
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainGood configuration setup, but verify the default URL. The configuration function properly sets up environment variable bindings with appropriate defaults. The approach of defaulting However, please verify the default URL 🏁 Script executed: #!/bin/bash
# Description: Verify if the preaggregation URL domain is valid and related to Datadog
# Check if the domain resolves and is accessible
echo "Checking domain resolution for api.datad0g.com..."
nslookup api.datad0g.com
echo -e "\nChecking HTTP response..."
curl -I https://api.datad0g.com 2>/dev/null || echo "Failed to connect"
echo -e "\nFor comparison, checking standard Datadog domains..."
nslookup api.datadoghq.com
curl -I https://api.datadoghq.com 2>/dev/null || echo "Failed to connect"Length of output: 1587 Correct default Datadog API URL It looks like the domain in the default URL is a typo— • pkg/config/setup/preaggregation.go, line 14 - config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datad0g.com")
+ config.BindEnvAndSetDefault("preaggr_dd_url", "https://api.datadoghq.com")This ensures clients will hit the official Datadog API. 📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -165,6 +165,26 @@ func GetMultipleEndpoints(c pkgconfigmodel.Reader) (map[string][]APIKeys, error) | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Keys: []string{c.GetString("multi_region_failover.api_key")}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Populate preaggregation endpoint (only if unique) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO(?): This assumes we wouldn't have a unique preaggregation API key | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // without a unique preaggregation URL, but we should validate that. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if c.GetBool("enable_preaggr_pipeline") { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| preaggURL := c.GetString("preaggr_dd_url") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Check that it's not the same as the primary URL | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if preaggURL != "" && preaggURL != ddURL { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Check if preaggregation URL already exists in additional endpoints | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if _, exists := additionalEndpoints[preaggURL]; !exists { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Unique URL - create new domain resolver with preaggregation API key | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| additionalEndpoints[preaggURL] = []APIKeys{{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ConfigSettingPath: "preaggr_api_key", | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Keys: []string{c.GetString("preaggr_api_key")}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+169
to
+186
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Review the preaggregation endpoint logic and address edge cases. The preaggregation endpoint configuration logic is generally sound, but there are several concerns:
Consider adding validation to address these concerns: if c.GetBool("enable_preaggr_pipeline") {
preaggURL := c.GetString("preaggr_dd_url")
+ preaggAPIKey := c.GetString("preaggr_api_key")
+
+ // Validate preaggregation URL
+ if preaggURL != "" {
+ if _, err := url.Parse(preaggURL); err != nil {
+ return nil, fmt.Errorf("could not parse preaggr_dd_url: %s", err)
+ }
+ }
+
// Check that it's not the same as the primary URL
- if preaggURL != "" && preaggURL != ddURL {
+ if preaggURL != "" && preaggURL != ddURL && preaggAPIKey != "" {
// Check if preaggregation URL already exists in additional endpoints
if _, exists := additionalEndpoints[preaggURL]; !exists {
// Unique URL - create new domain resolver with preaggregation API key
additionalEndpoints[preaggURL] = []APIKeys{{
ConfigSettingPath: "preaggr_api_key",
- Keys: []string{c.GetString("preaggr_api_key")},
+ Keys: []string{preaggAPIKey},
}}
}
}
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return mergeAdditionalEndpoints(keysPerDomain, additionalEndpoints) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -106,77 +106,41 @@ func describeItem(serie *metrics.Serie) string { | |||||||||||||||||
| return fmt.Sprintf("name %q, %d points", serie.Name, len(serie.Points)) | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // MarshalSplitCompress uses the stream compressor to marshal and compress series payloads. | ||||||||||||||||||
| // If a compressed payload is larger than the max, a new payload will be generated. This method returns a slice of | ||||||||||||||||||
| // compressed protobuf marshaled MetricPayload objects. | ||||||||||||||||||
| func (series *IterableSeries) MarshalSplitCompress(bufferContext *marshaler.BufferContext, config config.Component, strategy compression.Component) (transaction.BytesPayloads, error) { | ||||||||||||||||||
| pb, err := series.NewPayloadsBuilder(bufferContext, config, strategy) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| err = pb.startPayload() | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Use series.source.MoveNext() instead of series.MoveNext() because this function supports | ||||||||||||||||||
| // the serie.NoIndex field. | ||||||||||||||||||
| for series.source.MoveNext() { | ||||||||||||||||||
| err = pb.writeSerie(series.source.Current()) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // if the last payload has any data, flush it | ||||||||||||||||||
| err = pb.finishPayload() | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| return pb.payloads, nil | ||||||||||||||||||
| type Pipeline struct { | ||||||||||||||||||
| FilterFunc func(s *metrics.Serie) bool | ||||||||||||||||||
| Destination transaction.Destination | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // MarshalSplitCompressMultiple uses the stream compressor to marshal and compress one series into three sets of payloads. | ||||||||||||||||||
| // One set of payloads contains all metrics, | ||||||||||||||||||
| // The seond contains only those that pass the provided MRF filter function. | ||||||||||||||||||
| // The third contains only those that pass the provided autoscaling local failover filter function. | ||||||||||||||||||
| // This function exists because we need a way to build both payloads in a single pass over the input data, which cannot be iterated over twice. | ||||||||||||||||||
| func (series *IterableSeries) MarshalSplitCompressMultiple(config config.Component, strategy compression.Component, filterFuncForMRF func(s *metrics.Serie) bool, filterFuncForAutoscaling func(s *metrics.Serie) bool) (transaction.BytesPayloads, transaction.BytesPayloads, transaction.BytesPayloads, error) { | ||||||||||||||||||
| pbs := make([]*PayloadsBuilder, 3) // 0: all, 1: MRF, 2: autoscaling | ||||||||||||||||||
| // MarshalSplitCompressPipelines uses the stream compressor to marshal and | ||||||||||||||||||
| // compress series payloads, allowing multiple variants to be generated in a | ||||||||||||||||||
| // single pass over the input data. If a compressed payload is larger than the | ||||||||||||||||||
| // max, a new payload will be generated. This method returns a slice of | ||||||||||||||||||
| // compressed protobuf marshaled MetricPayload objects. | ||||||||||||||||||
| func (series *IterableSeries) MarshalSplitCompressPipelines(config config.Component, strategy compression.Component, pipelines []Pipeline) (transaction.BytesPayloads, error) { | ||||||||||||||||||
| pbs := make([]*PayloadsBuilder, len(pipelines)) | ||||||||||||||||||
| for i := range pbs { | ||||||||||||||||||
| bufferContext := marshaler.NewBufferContext() | ||||||||||||||||||
| pb, err := series.NewPayloadsBuilder(bufferContext, config, strategy) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, nil, nil, err | ||||||||||||||||||
| return nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
| pbs[i] = &pb | ||||||||||||||||||
|
|
||||||||||||||||||
| err = pbs[i].startPayload() | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, nil, nil, err | ||||||||||||||||||
| return nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // Use series.source.MoveNext() instead of series.MoveNext() because this function supports | ||||||||||||||||||
| // the serie.NoIndex field. | ||||||||||||||||||
| for series.source.MoveNext() { | ||||||||||||||||||
| err := pbs[0].writeSerie(series.source.Current()) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, nil, nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if filterFuncForMRF(series.source.Current()) { | ||||||||||||||||||
| err = pbs[1].writeSerie(series.source.Current()) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, nil, nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| if filterFuncForAutoscaling(series.source.Current()) { | ||||||||||||||||||
| err = pbs[2].writeSerie(series.source.Current()) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, nil, nil, err | ||||||||||||||||||
| for i, pipeline := range pipelines { | ||||||||||||||||||
| if pipeline.FilterFunc(series.source.Current()) { | ||||||||||||||||||
| err := pbs[i].writeSerie(series.source.Current()) | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
@@ -185,11 +149,23 @@ func (series *IterableSeries) MarshalSplitCompressMultiple(config config.Compone | |||||||||||||||||
| for i := range pbs { | ||||||||||||||||||
| err := pbs[i].finishPayload() | ||||||||||||||||||
| if err != nil { | ||||||||||||||||||
| return nil, nil, nil, err | ||||||||||||||||||
| return nil, err | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // assign destinations to payloads per strategy | ||||||||||||||||||
| for i, pipeline := range pipelines { | ||||||||||||||||||
| for _, payload := range pbs[i].payloads { | ||||||||||||||||||
| payload.Destination = pipeline.Destination | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| return pbs[0].payloads, pbs[1].payloads, pbs[2].payloads, nil | ||||||||||||||||||
| payloads := make([]*transaction.BytesPayload, len(pbs)) | ||||||||||||||||||
| for _, pb := range pbs { | ||||||||||||||||||
| payloads = append(payloads, pb.payloads...) | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+163
to
+166
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's an issue with the array initialization in this code. The line To fix this, either:
payloads := make([]*transaction.BytesPayload, 0, totalPayloadCount)
for _, pb := range pbs {
payloads = append(payloads, pb.payloads...)
}
Suggested change
Spotted by Diamond |
||||||||||||||||||
|
|
||||||||||||||||||
| return payloads, nil | ||||||||||||||||||
|
Comment on lines
+163
to
+168
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix payload collection logic to avoid slice reallocation. The payload collection has an efficiency issue: The current logic initializes -payloads := make([]*transaction.BytesPayload, len(pbs))
+var payloads []*transaction.BytesPayload
for _, pb := range pbs {
payloads = append(payloads, pb.payloads...)
}Alternatively, pre-calculate the total capacity: +totalCapacity := 0
+for _, pb := range pbs {
+ totalCapacity += len(pb.payloads)
+}
+payloads := make([]*transaction.BytesPayload, 0, totalCapacity)
for _, pb := range pbs {
payloads = append(payloads, pb.payloads...)
}🤖 Prompt for AI Agents |
||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| // NewPayloadsBuilder initializes a new PayloadsBuilder to be used for serializing series into a set of output payloads. | ||||||||||||||||||
|
|
||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,17 +8,22 @@ package serializer | |
|
|
||
| import ( | ||
| "bytes" | ||
| "context" | ||
| "encoding/json" | ||
| "errors" | ||
| "expvar" | ||
| "fmt" | ||
| "net/http" | ||
| "slices" | ||
| "strconv" | ||
| "time" | ||
|
|
||
| forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder" | ||
| "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder/transaction" | ||
| orchestratorForwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator/orchestratorinterface" | ||
| hostMetadataUtils "github.com/DataDog/datadog-agent/comp/metadata/host/hostimpl/hosttags" | ||
| pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup" | ||
| "github.com/DataDog/datadog-agent/pkg/tagset" | ||
|
|
||
| "github.com/DataDog/datadog-agent/comp/core/config" | ||
| "github.com/DataDog/datadog-agent/pkg/metrics" | ||
|
|
@@ -342,54 +347,75 @@ func (s *Serializer) SendIterableSeries(serieSource metrics.SerieSource) error { | |
| var extraHeaders http.Header | ||
| var err error | ||
|
|
||
| if useV1API && s.enableJSONStream { | ||
| seriesBytesPayloads, extraHeaders, err = s.serializeIterableStreamablePayload(seriesSerializer, stream.DropItemOnErrItemTooBig) | ||
| } else if useV1API && !s.enableJSONStream { | ||
| seriesBytesPayloads, extraHeaders, err = s.serializePayloadJSON(seriesSerializer, true) | ||
| } else { | ||
| failoverActiveForMRF, allowlistForMRF := s.getFailoverAllowlist() | ||
| failoverActiveForAutoscaling, allowlistForAutoscaling := s.getAutoscalingFailoverMetrics() | ||
| failoverActive := (failoverActiveForMRF && len(allowlistForMRF) > 0) || (failoverActiveForAutoscaling && len(allowlistForAutoscaling) > 0) | ||
| if failoverActive { | ||
| var filtered transaction.BytesPayloads | ||
| var localAutoscalingFaioverPayloads transaction.BytesPayloads | ||
| seriesBytesPayloads, filtered, localAutoscalingFaioverPayloads, err = seriesSerializer.MarshalSplitCompressMultiple(s.config, s.Strategy, | ||
| func(s *metrics.Serie) bool { // Filter for MRF | ||
| _, allowed := allowlistForMRF[s.Name] | ||
| return allowed | ||
| }, | ||
| func(s *metrics.Serie) bool { // Filter for Autoscaling | ||
| _, allowed := allowlistForAutoscaling[s.Name] | ||
| return allowed | ||
| }) | ||
|
|
||
| for _, seriesBytesPayload := range seriesBytesPayloads { | ||
| seriesBytesPayload.Destination = transaction.PrimaryOnly | ||
| } | ||
| for _, seriesBytesPayload := range filtered { | ||
| seriesBytesPayload.Destination = transaction.SecondaryOnly | ||
| } | ||
| for _, seriesBytesPayload := range localAutoscalingFaioverPayloads { | ||
| seriesBytesPayload.Destination = transaction.LocalOnly | ||
| } | ||
| seriesBytesPayloads = append(seriesBytesPayloads, filtered...) | ||
| seriesBytesPayloads = append(seriesBytesPayloads, localAutoscalingFaioverPayloads...) | ||
| if useV1API { | ||
| if s.enableJSONStream { | ||
| seriesBytesPayloads, extraHeaders, err = s.serializeIterableStreamablePayload(seriesSerializer, stream.DropItemOnErrItemTooBig) | ||
| } else { | ||
| seriesBytesPayloads, err = seriesSerializer.MarshalSplitCompress(marshaler.NewBufferContext(), s.config, s.Strategy) | ||
| for _, seriesBytesPayload := range seriesBytesPayloads { | ||
| seriesBytesPayload.Destination = transaction.AllRegions | ||
| } | ||
| seriesBytesPayloads, extraHeaders, err = s.serializePayloadJSON(seriesSerializer, true) | ||
| } | ||
| extraHeaders = s.protobufExtraHeadersWithCompression | ||
| if err != nil { | ||
| return fmt.Errorf("dropping series payload: %s", err) | ||
| } | ||
| return s.Forwarder.SubmitV1Series(seriesBytesPayloads, extraHeaders) | ||
| } | ||
|
|
||
| failoverActiveForMRF, allowlistForMRF := s.getFailoverAllowlist() | ||
| failoverActiveForAutoscaling, allowlistForAutoscaling := s.getAutoscalingFailoverMetrics() | ||
| failoverActive := (failoverActiveForMRF && len(allowlistForMRF) > 0) || (failoverActiveForAutoscaling && len(allowlistForAutoscaling) > 0) | ||
| pipelines := make([]metricsserializer.Pipeline, 0, 4) | ||
| if failoverActive { | ||
| // Default behavior, primary region only | ||
| pipelines = append(pipelines, metricsserializer.Pipeline{ | ||
| FilterFunc: func(series *metrics.Serie) bool { return true }, | ||
| Destination: transaction.PrimaryOnly, | ||
| }) | ||
|
|
||
| // Filter for MRF | ||
| pipelines = append(pipelines, metricsserializer.Pipeline{ | ||
| FilterFunc: func(s *metrics.Serie) bool { | ||
| _, allowed := allowlistForMRF[s.Name] | ||
| return allowed | ||
| }, | ||
| Destination: transaction.SecondaryOnly, | ||
| }) | ||
|
|
||
| // Filter for Autoscaling | ||
| pipelines = append(pipelines, metricsserializer.Pipeline{ | ||
| FilterFunc: func(s *metrics.Serie) bool { | ||
| _, allowed := allowlistForAutoscaling[s.Name] | ||
| return allowed | ||
| }, | ||
| Destination: transaction.LocalOnly, | ||
| }) | ||
| } else { | ||
| // Default behavior, all regions | ||
| pipelines = append(pipelines, metricsserializer.Pipeline{ | ||
| FilterFunc: func(series *metrics.Serie) bool { return true }, | ||
| Destination: transaction.AllRegions, | ||
| }) | ||
| } | ||
|
Comment on lines
+362
to
+396
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainVerify failover pipeline logic correctness. The pipeline-based approach is a significant improvement in terms of modularity and readability. However, I want to verify the failover logic is equivalent to the previous implementation. The logic appears correct - it creates separate pipelines for primary (all series), secondary (MRF allowlist), and local (autoscaling allowlist) destinations when failover is active, otherwise sends all series to all regions. 🏁 Script executed: #!/bin/bash
# Search for the original failover logic to compare with the new pipeline approach
rg -A 20 -B 5 "multi_region_failover.*enabled.*failover_metrics"
# Check for any existing tests that validate failover behavior
fd -e go -t f | xargs grep -l "failover.*test\|test.*failover" | head -5Length of output: 5780 Verified failover pipeline logic matches original behavior I’ve confirmed that the new pipeline-based code in pkg/serializer/serializer.go (around lines 362–396) uses the exact same flag checks and allowlists from getFailoverAllowlist() and getAutoscalingFailoverMetrics() as the prior implementation:
I did not find any existing tests that exercise these failover‐metrics pipelines. To fully validate this important behavior, please add unit tests covering: • MRF-only failover routing — especially around pkg/serializer/serializer.go lines 362–396. 🤖 Prompt for AI Agents |
||
|
|
||
| // We are modifying the series in this filter, so it must always be the last | ||
| // pipeline in the list to avoid affecting other pipelines. | ||
| if s.config.GetBool("enable_preaggr_pipeline") { | ||
| pipelines = append(pipelines, metricsserializer.Pipeline{ | ||
| FilterFunc: func(s *metrics.Serie) bool { | ||
| // TODO: don't add host tags if they were already added because of `expected_tags_duration` being set | ||
| hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System) | ||
| s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags) | ||
| return true | ||
| }, | ||
| Destination: transaction.PreaggrOnly, | ||
| }) | ||
| } | ||
|
|
||
|
Comment on lines
+398
to
+411
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainAddress host tag modification concerns in preaggregation pipeline. The preaggregation pipeline implementation has several concerns that need attention:
Consider improving the implementation: // We are modifying the series in this filter, so it must always be the last
// pipeline in the list to avoid affecting other pipelines.
if s.config.GetBool("enable_preaggr_pipeline") {
+ // Cache host tags to avoid repeated retrieval
+ hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+
pipelines = append(pipelines, metricsserializer.Pipeline{
FilterFunc: func(s *metrics.Serie) bool {
- // TODO: don't add host tags if they were already added because of `expected_tags_duration` being set
- hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+ // TODO: Check if host tags already present to avoid duplicates
s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags)
return true
},
Destination: transaction.PreaggrOnly,
})
}Let's check how 🏁 Script executed: #!/bin/bash
# Search for expected_tags_duration usage and host tag handling
rg -A 10 -B 5 "expected_tags_duration"
# Check if there are existing mechanisms to detect duplicate host tags
ast-grep --pattern $'func $_(tags $_) $_ {
$$$
}'Length of output: 50317 Refactor preaggregation pipeline host-tag logic The current implementation always fetches and clones host tags per-series (inside the • Cache & clone host tags once, outside the per-series closure, so you don’t call Suggested diff in if s.config.GetBool("enable_preaggr_pipeline") {
- pipelines = append(pipelines, metricsserializer.Pipeline{
+ // Preaggregated metrics need host tags once—cache at pipeline setup
+ ctx := s.ctx // or however you carry a real Context
+ hostTags := slices.Clone(hostMetadataUtils.Get(ctx, false, pkgconfigsetup.Datadog()).System)
+
+ pipelines = append(pipelines, metricsserializer.Pipeline{
FilterFunc: func(s *metrics.Serie) bool {
- // TODO: don't add host tags if they were already added because of `expected_tags_duration` being set
- hostTags := slices.Clone(hostMetadataUtils.Get(context.TODO(), false, pkgconfigsetup.Datadog()).System)
+ // skip if host tags already on the series
+ if containsAll(s.Tags, hostTags) {
+ return true
+ }
+ s.Tags = tagset.CombineCompositeTagsAndSlice(s.Tags, hostTags)
return true
},
Destination: transaction.PreaggrOnly,
})
}You’ll need to implement a small helper (e.g.
🤖 Prompt for AI Agents |
||
| seriesBytesPayloads, err = seriesSerializer.MarshalSplitCompressPipelines(s.config, s.Strategy, pipelines) | ||
| extraHeaders = s.protobufExtraHeadersWithCompression | ||
|
|
||
| if err != nil { | ||
| return fmt.Errorf("dropping series payload: %s", err) | ||
| } | ||
|
|
||
| if useV1API { | ||
| return s.Forwarder.SubmitV1Series(seriesBytesPayloads, extraHeaders) | ||
| } | ||
| return s.Forwarder.SubmitSeries(seriesBytesPayloads, extraHeaders) | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Address potential duplicate payload sending and improve domain validation.
The PreaggrOnly destination handling logic is correct but has important concerns that need addressing:
Duplicate sending issue: The TODO comment on lines 518-521 highlights a real problem - if
preaggr_dd_urlequals the primarydd_url, preaggregated payloads will be sent multiple times for each org due to inherited API keys.Configuration access: The direct config access with
f.config.GetString("preaggr_dd_url")should ideally be cached or validated during forwarder initialization rather than on every transaction creation for performance.Let's verify if this duplicate sending scenario is possible in the current configuration setup:
Consider adding validation to prevent the duplicate sending scenario:
if payload.Destination == transaction.PreaggrOnly { + preaggURL := f.config.GetString("preaggr_dd_url") + if preaggURL == "" { + f.log.Warnf("PreaggrOnly payload skipped: preaggr_dd_url not configured") + continue + } - if drDomain != f.config.GetString("preaggr_dd_url") { + if drDomain != preaggURL { continue } else { + // Warn if preaggregation URL is same as primary to highlight potential duplicate sends + if preaggURL == f.config.GetString("dd_url") { + f.log.Warnf("preaggr_dd_url is same as dd_url - potential duplicate payload sending") + } endpoint = endpoints.PreaggrSeriesEndpoint } }🏁 Script executed:
Length of output: 66119
Remove duplicate‐sending check (enforced in config utils) and cache URLs in DefaultForwarder
The
pkg/config/utils/endpoints.gologic already skips adding a preaggregated endpoint whenpreaggr_dd_url == dd_url, so the “duplicate send” scenario cannot occur. However, repeatedly callingf.config.GetString(...)on every transaction impacts performance. Cache both URLs during forwarder initialization and simplify the PreaggrOnly block:• File:
comp/forwarder/defaultforwarder/default_forwarder.goLines: ~505–522
Suggested refactor:
type DefaultForwarder struct { - config config.Config + config config.Config + preaggURL string + ddURL string // … } func NewDefaultForwarder(cfg config.Config, …) *DefaultForwarder { - return &DefaultForwarder{config: cfg, …} + return &DefaultForwarder{ + config: cfg, + preaggURL: cfg.GetString("preaggr_dd_url"), + ddURL: cfg.GetString("dd_url"), + // … + } } // inside transaction‐creation loop if payload.Destination == transaction.PreaggrOnly { - if drDomain != f.config.GetString("preaggr_dd_url") { + if f.preaggURL == "" { + f.log.Warn("PreaggrOnly payload skipped: preaggr_dd_url not set") + continue + } + if drDomain != f.preaggURL { continue } endpoint = endpoints.PreaggrSeriesEndpoint }GetStringcalls.📝 Committable suggestion
🤖 Prompt for AI Agents