@codex review
Pull request overview
This PR adds trace stats computation capabilities to the Serverless Compatibility Layer. It enables the system to compute and send trace statistics when tracers haven't already computed them, enhancing stats availability across all supported serverless environments.
Changes:
- Adds a new `StatsConcentratorService` module that manages span concentration and stats bucket generation using the `libdd-trace-stats` library
- Adds a new `StatsGenerator` module that extracts metadata and spans from traces and sends them to the stats concentrator
- Extends `ServerlessTraceProcessor` to optionally call the stats generator when the tracer hasn't already computed stats
- Extends `ServerlessStatsFlusher` to flush both tracer-computed and serverless-computed stats
- Adds integration tests to verify stats computation and to verify stats are skipped when the client has already computed them
- Adds configuration options for service, env, and version fields, and an environment variable to enable/disable stats computation
Reviewed changes
Copilot reviewed 12 out of 13 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| src/stats_concentrator_service.rs | New module implementing stats concentration service with metadata tracking and flush logic |
| src/stats_generator.rs | New module for extracting trace information and sending to stats concentrator |
| src/trace_processor.rs | Modified to conditionally invoke stats generator based on client-computed-stats header |
| src/stats_flusher.rs | Modified to flush both channel and concentrator-based stats with proper flow control |
| src/config.rs | Added service, env, version fields and corresponding environment variable reading |
| src/main.rs | Added initialization of stats computation with environment variable toggle |
| src/lib.rs | Exported new modules |
| src/mini_agent.rs | Minor code style improvements (let-else pattern) |
| Cargo.toml | Added thiserror and libdd-trace-stats dependencies |
| tests/integration_test.rs | Added comprehensive integration tests for stats generation scenarios |
| tests/common/mock_server.rs | Updated to handle stats intake with proper 202 status code response |
| tests/common/helpers.rs | Extended send_tcp_request to support additional headers |
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ac8d049051
```rust
impl StatsConcentratorService {
    #[must_use]
    pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
        let (tx, rx) = mpsc::unbounded_channel();
```
Bound the concentrator command queue
Using mpsc::unbounded_channel() here allows StatsGenerator::send to enqueue one cloned command per span without any backpressure, so bursts of large traces can grow memory until the process is OOM-killed if run() cannot drain fast enough. This is especially risky because the enqueue happens on the request path and nothing throttles producers when the concentrator falls behind.
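As a sketch of the suggested fix, std's `sync_channel` shows the bounded-channel behavior (tokio's bounded `mpsc::channel` works the same way): `try_send` is rejected when the queue is full instead of growing memory without limit. The `Command` type and the capacity below are placeholders, not the PR's actual types.

```rust
use std::sync::mpsc::sync_channel;

// Placeholder command type standing in for the PR's ConcentratorCommand.
#[allow(dead_code)]
enum Command {
    Add(u64),
}

fn main() {
    // Capacity 2 is illustrative only, to make the overflow easy to trigger.
    let (tx, rx) = sync_channel::<Command>(2);

    assert!(tx.try_send(Command::Add(1)).is_ok());
    assert!(tx.try_send(Command::Add(2)).is_ok());
    // Queue full: the enqueue is rejected instead of allocating more memory.
    assert!(tx.try_send(Command::Add(3)).is_err());

    // Draining one command frees a slot, so the next try_send succeeds.
    rx.recv().unwrap();
    assert!(tx.try_send(Command::Add(3)).is_ok());
    println!("bounded channel applied backpressure");
}
```

The request path then has an explicit choice on a full queue: block (backpressure), drop, or fail the request.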
```rust
/// A bounded channel applies backpressure on the trace request path when the concentrator
/// cannot keep up, instead of growing without limit as an unbounded channel would.
const CONCENTRATOR_COMMAND_CHANNEL_CAPACITY: usize = 8192;
```
does this back-pressure propagate to the tracer and the customer's code? would it be better to drop messages at some point?
also, how do we pick this size and what do we need to consider if we need to change it?
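The drop-on-full alternative raised here can be sketched with a counter for observability, so stats loss is visible without ever stalling the request path. The names and payload below are illustrative, not from the PR.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

fn main() {
    // Capacity 1 is illustrative, to force the queue to fill immediately.
    let (tx, _rx) = sync_channel::<u64>(1);
    let mut dropped = 0;

    for span_id in 0..5u64 {
        // Drop instead of blocking: the request path never stalls,
        // at the cost of losing stats for the dropped spans.
        if let Err(TrySendError::Full(_)) = tx.try_send(span_id) {
            dropped += 1;
        }
    }

    // One command filled the slot; the other four were dropped and counted.
    assert_eq!(dropped, 4);
    println!("dropped {dropped} of 5 commands");
}
```

A periodically logged or exported drop counter is the usual way to notice that the capacity needs retuning.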
```rust
fn clone(&self) -> Self {
    Self {
        tx: self.tx.clone(),
        // Cloning this may cause trace metadata to be set multiple times,
```
does AtomicBool::new clone something?
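A small sketch of why this question matters: `AtomicBool` is not `Clone`, so a manual `Clone` impl typically builds a fresh flag with `AtomicBool::new(...)`, which is then independent of the original; sharing one flag across clones requires `Arc`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

fn main() {
    // A Clone impl that builds a fresh AtomicBool does NOT share state:
    let original = AtomicBool::new(false);
    let cloned = AtomicBool::new(original.load(Ordering::Relaxed));
    original.store(true, Ordering::Relaxed);
    assert!(!cloned.load(Ordering::Relaxed)); // the clone missed the update

    // Wrapping the flag in Arc shares a single AtomicBool across clones:
    let shared = Arc::new(AtomicBool::new(false));
    let clone = Arc::clone(&shared);
    shared.store(true, Ordering::Relaxed);
    assert!(clone.load(Ordering::Relaxed)); // same flag, update visible
}
```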
```rust
    }
}

pub async fn set_tracer_metadata(&self, trace: &TracerPayload) -> Result<(), StatsError> {
    // Set tracer metadata only once for the first trace because
```
this assumes that our agent is being used in a single-tracer environment. this is probably true of most serverless workloads, but why pay for all this extra "only once" complexity? and is there a way to detect that we're actually running in a single-tracer environment and at least log a warning of some kind?
or, alternatively, why not pull this data for each tracer payload?
```rust
    self.tracer_metadata = tracer_metadata;
}
ConcentratorCommand::Add(span) => self.concentrator.add_span(&*span),
ConcentratorCommand::Flush(force_flush, response_tx) => {
```
does this channel allow messages to arrive out of order? i.e. is it possible that we, say, add a final span, then call flush, and end up with the add being missed by our final flush?
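On the ordering question: both std and tokio mpsc channels are FIFO per sender, so an `Add` and a `Flush` sent on the same handle cannot be reordered; reordering is only possible between different sender clones. A minimal std sketch (the `Cmd` type is illustrative):

```rust
use std::sync::mpsc::channel;

#[derive(Debug, PartialEq)]
enum Cmd {
    Add(u64),
    Flush,
}

fn main() {
    let (tx, rx) = channel();
    // Enqueue a final span, then a flush, on the same sender handle.
    tx.send(Cmd::Add(42)).unwrap();
    tx.send(Cmd::Flush).unwrap();

    // FIFO per sender: the Add always arrives before the Flush, so a
    // flush cannot miss a span enqueued before it on the same handle.
    assert_eq!(rx.recv().unwrap(), Cmd::Add(42));
    assert_eq!(rx.recv().unwrap(), Cmd::Flush);
}
```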
```rust
Some(ClientStatsPayload {
    // Do not set hostname so the trace stats backend can aggregate stats properly
    hostname: String::new(),
    env: self.config.env.clone().unwrap_or("unknown-env".to_string()),
```
do we default to unknown-env in other places? not dev?
```rust
info!("serverless stats computation enabled");
let (service, handle) =
    stats_concentrator_service::StatsConcentratorService::new(config.clone());
tokio::spawn(service.run());
```
do we need to think about any sort of shutdown mechanism for this service, or do we rely on the existing application cleanup at the end?
```rust
self.flush_stats(config.clone(), buffer.to_vec()).await;
buffer.clear();
// copy the batch for this flush
let channel_stats = buffer.to_vec();
```
Nit: LLM recommends `let channel_stats = std::mem::take(&mut *buffer);` to replace `let channel_stats = buffer.to_vec(); buffer.clear();` but that's unrelated to the actual intent of the change here, so.
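For reference, a minimal sketch of the `std::mem::take` pattern the nit mentions:

```rust
fn main() {
    let mut buffer = vec![1, 2, 3];

    // mem::take moves the contents out and leaves an empty Vec behind,
    // avoiding the extra allocation that to_vec() + clear() performs.
    let channel_stats = std::mem::take(&mut buffer);

    assert_eq!(channel_stats, vec![1, 2, 3]);
    assert!(buffer.is_empty());
}
```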
```rust
    }
    Ok(None) => {}
    Err(e) => {
        error!("Failed to flush stats concentrator: {e}");
```
Do we want a log in a loop here, or an error to be caught so the whole process is restarted/shut down? The trace flusher and stats flusher both have an `Err("<this code> task terminated unexpectedly".into())`; it would be consistent to have that for the concentrator as well.
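A hedged sketch of the suggested consistency fix: return an `Err` out of the loop so the task terminates visibly and the caller can restart or shut down, instead of logging forever. `run_loop` and its `String` error type are hypothetical stand-ins for the real task body.

```rust
// Hypothetical loop driver standing in for the concentrator's run loop.
fn run_loop(results: Vec<Result<(), String>>) -> Result<(), String> {
    for result in results {
        if let Err(e) = result {
            // Returning the error ends the task, so the supervisor can
            // restart or shut down instead of the loop logging forever.
            return Err(format!("stats concentrator task terminated unexpectedly: {e}"));
        }
    }
    Ok(())
}

fn main() {
    assert!(run_loop(vec![Ok(()), Err("flush failed".to_string())]).is_err());
    assert!(run_loop(vec![Ok(()), Ok(())]).is_ok());
}
```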
What does this PR do?
Motivation
Some tracers support trace stats computation, but not all do, and some have it disabled by default. Trace stats computation in the Serverless Compatibility Layer allows all environments supported by this package to have trace stats.
https://datadoghq.atlassian.net/browse/SVLS-8032
Additional Notes
- `DD_SERVERLESS_STATS_COMPUTATION_ENABLED=false`
- `client_drop_p0s: false` in agent `/info` endpoint

Describe how to test/QA your changes

- `Datadog-Client-Computed-Stats` is present in the trace payload

serverless-components/crates/datadog-trace-agent/src/stats_concentrator_service.rs
Line 17 in 10c9441