
feat: add trace stats computation #51

Open
duncanpharvey wants to merge 8 commits into main from duncan-harvey/trace-stats

Conversation

@duncanpharvey
Collaborator

@duncanpharvey duncanpharvey commented Dec 3, 2025

What does this PR do?

  • Adds trace stats computation in the Serverless Compatibility Layer.
  • Disables tracer-computed stats.

Motivation

Some tracers support trace stats computation, but not all do, and some have it disabled by default. Computing trace stats in the Serverless Compatibility Layer gives every environment supported by this package trace stats.

https://datadoghq.atlassian.net/browse/SVLS-8032

Additional Notes

Describe how to test/QA your changes

```
DEBUG datadog_trace_agent::stats_concentrator_service: Stats concentrator command queue backlog backlog=1278 tier_reached=1024 channel_capacity=8192 utilization_pct=15
```
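For reference, the `utilization_pct` in the log line above is presumably the backlog expressed as an integer percentage of the channel capacity (1278 of 8192 ≈ 15.6%, truncated to 15). A minimal sketch of that calculation:

```rust
/// Integer utilization percentage of a bounded queue, as presumably
/// reported by the log line above (1278 / 8192 ≈ 15.6%, truncated to 15).
fn utilization_pct(backlog: usize, capacity: usize) -> usize {
    if capacity == 0 {
        return 0;
    }
    backlog * 100 / capacity
}

fn main() {
    println!("utilization_pct = {}", utilization_pct(1278, 8192)); // prints 15
}
```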

@duncanpharvey duncanpharvey force-pushed the duncan-harvey/trace-stats branch 2 times, most recently from a7773cc to 070ae12 Compare December 11, 2025 14:51
@duncanpharvey duncanpharvey force-pushed the duncan-harvey/trace-stats branch from 2056c19 to d5a3f08 Compare December 17, 2025 22:16
@duncanpharvey duncanpharvey force-pushed the duncan-harvey/trace-stats branch 2 times, most recently from 006cb8e to 4142915 Compare March 27, 2026 14:16
@duncanpharvey duncanpharvey force-pushed the duncan-harvey/trace-stats branch from 5a64507 to 3562c93 Compare March 31, 2026 17:50
@duncanpharvey duncanpharvey marked this pull request as ready for review April 1, 2026 20:48
@duncanpharvey duncanpharvey requested a review from a team as a code owner April 1, 2026 20:48
Copilot AI review requested due to automatic review settings April 1, 2026 20:48
@duncanpharvey duncanpharvey requested review from Lewis-E and litianningdatadog and removed request for a team April 1, 2026 20:48
@duncanpharvey
Collaborator Author

@codex review

Contributor

Copilot AI left a comment


Pull request overview

This PR adds trace stats computation capabilities to the Serverless Compatibility Layer. It enables the system to compute and send trace statistics when tracers haven't already computed them, enhancing stats availability across all supported serverless environments.

Changes:

  • Adds a new StatsConcentratorService module that manages span concentration and stats bucket generation using the libdd-trace-stats library
  • Adds a new StatsGenerator module that extracts metadata and spans from traces and sends them to the stats concentrator
  • Extends ServerlessTraceProcessor to optionally call the stats generator when the tracer hasn't already computed stats
  • Extends ServerlessStatsFlusher to flush both tracer-computed and serverless-computed stats
  • Adds integration tests to verify stats computation, and to verify stats are skipped when the client has already computed them
  • Adds configuration options for service, env, and version fields, and an environment variable to enable/disable stats computation
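As a rough sketch of how such a command-driven concentrator can be wired (using std's channels and a `u64` stand-in for a span, rather than the crate's actual tokio types; the `Add` and `Flush` command variants appear in the review excerpts further down):

```rust
use std::sync::mpsc;
use std::thread;

// Simplified command set modeled on the diff's `ConcentratorCommand`;
// a u64 stands in for a span.
enum ConcentratorCommand {
    Add(u64),
    Flush(mpsc::Sender<Vec<u64>>),
}

// Minimal service loop: buffer spans, reply with the batch on flush.
fn run(rx: mpsc::Receiver<ConcentratorCommand>) {
    let mut spans: Vec<u64> = Vec::new();
    for cmd in rx {
        match cmd {
            ConcentratorCommand::Add(span) => spans.push(span),
            ConcentratorCommand::Flush(reply) => {
                let _ = reply.send(spans.drain(..).collect());
            }
        }
    }
}

fn demo_flush() -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || run(rx));
    tx.send(ConcentratorCommand::Add(1)).unwrap();
    tx.send(ConcentratorCommand::Add(2)).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(ConcentratorCommand::Flush(reply_tx)).unwrap();
    let batch = reply_rx.recv().unwrap();
    drop(tx); // dropping the last sender ends the service loop
    worker.join().unwrap();
    batch
}

fn main() {
    println!("flushed batch: {:?}", demo_flush()); // prints [1, 2]
}
```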

Reviewed changes

Copilot reviewed 12 out of 13 changed files in this pull request and generated no comments.

Summary per file:

| File | Description |
| --- | --- |
| src/stats_concentrator_service.rs | New module implementing the stats concentration service with metadata tracking and flush logic |
| src/stats_generator.rs | New module for extracting trace information and sending it to the stats concentrator |
| src/trace_processor.rs | Modified to conditionally invoke the stats generator based on the client-computed-stats header |
| src/stats_flusher.rs | Modified to flush both channel- and concentrator-based stats with proper flow control |
| src/config.rs | Added service, env, and version fields and corresponding environment variable reading |
| src/main.rs | Added initialization of stats computation with an environment variable toggle |
| src/lib.rs | Exported the new modules |
| src/mini_agent.rs | Minor code style improvements (let-else pattern) |
| Cargo.toml | Added thiserror and libdd-trace-stats dependencies |
| tests/integration_test.rs | Added comprehensive integration tests for stats generation scenarios |
| tests/common/mock_server.rs | Updated to handle the stats intake with a proper 202 status code response |
| tests/common/helpers.rs | Extended send_tcp_request to support additional headers |


@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ac8d049051


```rust
impl StatsConcentratorService {
    #[must_use]
    pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
        let (tx, rx) = mpsc::unbounded_channel();
```

P1: Bound the concentrator command queue

Using mpsc::unbounded_channel() here allows StatsGenerator::send to enqueue one cloned command per span without any backpressure, so bursts of large traces can grow memory until the process is OOM-killed if run() cannot drain fast enough. This is especially risky because the enqueue happens on the request path and nothing throttles producers when the concentrator falls behind.
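One way to bound the queue, sketched here with std's `sync_channel` rather than the tokio channel the diff uses: once the buffer is full, `try_send` fails instead of growing memory.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// With a bounded channel, producers learn when the consumer falls behind
// instead of queuing commands without limit.
fn fill_then_try(capacity: usize) -> bool {
    let (tx, _rx) = sync_channel::<u32>(capacity); // keep _rx alive
    for i in 0..capacity {
        tx.try_send(i as u32).expect("fits within capacity");
    }
    // The next send fails instead of growing memory.
    matches!(tx.try_send(0), Err(TrySendError::Full(_)))
}

fn main() {
    println!("queue full detected: {}", fill_then_try(4)); // prints true
}
```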


Collaborator Author


Addressed in 10c9441


```rust
/// A bounded channel applies backpressure on the trace request path when the concentrator
/// cannot keep up, instead of growing without limit as an unbounded channel would.
const CONCENTRATOR_COMMAND_CHANNEL_CAPACITY: usize = 8192;
```
Contributor


does this back-pressure propagate to the tracer and the customer's code? would it be better to drop messages at some point?

also, how do we pick this size and what do we need to consider if we need to change it?
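An alternative to blocking backpressure is load shedding: drop commands when the queue is full and count the drops so the loss stays observable. A sketch with std's `sync_channel` (illustrative only, not the PR's actual policy):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

// Shed load instead of blocking the request path: on a full queue,
// drop the command and count it so the loss is observable.
fn demo_drops() -> u32 {
    let (tx, _rx) = sync_channel::<u32>(2); // capacity 2, no consumer draining
    let mut dropped = 0;
    for i in 0..5 {
        if let Err(TrySendError::Full(_)) = tx.try_send(i) {
            dropped += 1;
        }
    }
    dropped
}

fn main() {
    println!("dropped: {}", demo_drops()); // prints 3
}
```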

```rust
fn clone(&self) -> Self {
    Self {
        tx: self.tx.clone(),
        // Cloning this may cause trace metadata to be set multiple times,
```
Contributor


does AtomicBool::new clone something?
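For context on the question: `AtomicBool::new` does not clone or share anything; it constructs a fresh, independent flag seeded with whatever value it is given. Sharing one flag across clones requires `Arc<AtomicBool>`:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// A fresh AtomicBool is an independent flag: flipping the original
// afterwards does not affect it.
fn fresh_flag_is_independent() -> (bool, bool) {
    let original = AtomicBool::new(false);
    let fresh = AtomicBool::new(original.load(Ordering::Relaxed));
    original.store(true, Ordering::Relaxed);
    (original.load(Ordering::Relaxed), fresh.load(Ordering::Relaxed))
}

// Sharing one flag across clones needs Arc: both handles see the store.
fn shared_flag_tracks() -> (bool, bool) {
    let original = Arc::new(AtomicBool::new(false));
    let clone = Arc::clone(&original);
    original.store(true, Ordering::Relaxed);
    (original.load(Ordering::Relaxed), clone.load(Ordering::Relaxed))
}

fn main() {
    println!("{:?}", fresh_flag_is_independent()); // prints (true, false)
    println!("{:?}", shared_flag_tracks()); // prints (true, true)
}
```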

```rust
}

pub async fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> {
    // Set tracer metadata only once for the first trace because
```
Contributor


this assumes that our agent is being used in a single-tracer environment. this is probably true of most serverless workloads, but why pay for all this extra "only once" complexity? and is there a way to detect that we're actually running in a single-tracer environment and at least log a warning of some kind?

Contributor


or, alternatively, why not pull this data for each tracer payload?
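A minimal sketch of the "only once" pattern under discussion, using `compare_exchange` so exactly one caller wins (the `MetadataOnce` type is hypothetical, not the PR's actual code):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical holder for set-once metadata; the real code sends the
// metadata as a command, this only illustrates the flag.
struct MetadataOnce {
    set: AtomicBool,
}

impl MetadataOnce {
    fn new() -> Self {
        Self { set: AtomicBool::new(false) }
    }

    // Returns true only for the first caller; later payloads are ignored,
    // which is the "only once" behavior being questioned above.
    fn try_set(&self) -> bool {
        self.set
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

fn demo_set_once() -> (bool, bool, bool) {
    let m = MetadataOnce::new();
    (m.try_set(), m.try_set(), m.try_set())
}

fn main() {
    println!("{:?}", demo_set_once()); // prints (true, false, false)
}
```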

```rust
    self.tracer_metadata = tracer_metadata;
}
ConcentratorCommand::Add(span) => self.concentrator.add_span(&*span),
ConcentratorCommand::Flush(force_flush, response_tx) => {
```
Contributor


does this channel allow messages to arrive out of order? i.e. is it possible that we, say, add a final span, then call flush, and end up with the add being missed by our final flush?
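For reference, both std and tokio mpsc channels deliver messages from a given producer in send order, so a `Flush` enqueued after an `Add` on the same sender cannot overtake it; reordering could only arise from separate producers racing each other. A quick demonstration with std's channel:

```rust
use std::sync::mpsc;

// Commands sent on one channel from one producer arrive in send order,
// so a flush enqueued after an add cannot overtake it.
fn demo_fifo() -> Vec<&'static str> {
    let (tx, rx) = mpsc::channel();
    tx.send("add span 1").unwrap();
    tx.send("add span 2").unwrap();
    tx.send("flush").unwrap();
    drop(tx); // no more senders, so the iterator below terminates
    rx.iter().collect()
}

fn main() {
    println!("{:?}", demo_fifo()); // prints ["add span 1", "add span 2", "flush"]
}
```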

```rust
Some(ClientStatsPayload {
    // Do not set hostname so the trace stats backend can aggregate stats properly
    hostname: String::new(),
    env: self.config.env.clone().unwrap_or("unknown-env".to_string()),
```
Contributor


do we default to unknown-env in other places? not dev?

```rust
info!("serverless stats computation enabled");
let (service, handle) =
    stats_concentrator_service::StatsConcentratorService::new(config.clone());
tokio::spawn(service.run());
```
Contributor


do we need to think about any sort of shutdown mechanism for this service, or do we rely on the existing application cleanup at the end?
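One common shutdown pattern for such a spawned service, shown here with std threads and channels rather than tokio: the receive loop ends naturally once every sender handle is dropped, so no explicit shutdown signal is needed. Whether the PR relies on this or on process teardown is exactly the open question.

```rust
use std::sync::mpsc;
use std::thread;

// The service loop ends on its own when every sender handle is dropped,
// so process cleanup needs no explicit shutdown signal.
fn demo_shutdown() -> u32 {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        let mut processed = 0;
        // Iterating a Receiver ends once all senders are gone.
        for _cmd in rx {
            processed += 1;
        }
        processed
    });
    tx.send(1u32).unwrap();
    tx.send(2).unwrap();
    drop(tx); // dropping the last sender shuts the loop down
    worker.join().unwrap()
}

fn main() {
    println!("processed before shutdown: {}", demo_shutdown()); // prints 2
}
```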

```rust
self.flush_stats(config.clone(), buffer.to_vec()).await;
buffer.clear();
// copy the batch for this flush
let channel_stats = buffer.to_vec();
```
Contributor


Nit: an LLM recommends `let channel_stats = std::mem::take(&mut *buffer);` to replace `let channel_stats = buffer.to_vec(); buffer.clear();`, but that's unrelated to the actual intent of the change here, so feel free to skip it.
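For comparison, `std::mem::take` moves the batch out and leaves an empty buffer in place, with no extra allocation-and-copy:

```rust
// `std::mem::take` swaps the buffer with Vec::default(), so the caller
// gets the batch and the buffer is left empty, without cloning.
fn take_batch(buffer: &mut Vec<u32>) -> Vec<u32> {
    std::mem::take(buffer)
}

fn demo_take() -> (Vec<u32>, usize) {
    let mut buffer = vec![10, 20, 30];
    let batch = take_batch(&mut buffer);
    (batch, buffer.len()) // buffer is empty after the take
}

fn main() {
    println!("{:?}", demo_take()); // prints ([10, 20, 30], 0)
}
```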

```rust
}
Ok(None) => {}
Err(e) => {
    error!("Failed to flush stats concentrator: {e}");
```
Contributor

@Lewis-E Lewis-E Apr 2, 2026


Do we want a log in a loop here, or an error to be caught and have the whole process restarted/shut down? The trace flusher and stats flusher both have an `Err("<this code> task terminated unexpectedly".into())`; it would be consistent to have that for the concentrator as well.

