From 98fc6e5f6b3dfbc52fdd60cb54aa9dedab75ddc9 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Fri, 12 Jun 2026 15:28:00 -0400 Subject: [PATCH 1/8] feat(datadog-ffe): add flagevaluation EVP payload module + Cargo feature gate - Add telemetry/flagevaluation.rs with FfeFlagEvaluationBatch and FfeFlagEvaluationEvent types modeled on exposures.rs - Required fields: timestamp, flag.key, first_evaluation, last_evaluation, evaluation_count - Optional fields use skip_serializing_if = Option::is_none for omitempty semantics (reviewer concern #2 review:4477935835) - Context pruning helper prune_context() enforces 256 fields / 256 chars skip-not-truncate (reviewer concern #1 review:4477935835) - New Cargo feature flagevaluation-evp gates the module (parallel to exposure-events) - Gate telemetry module in lib.rs to include flagevaluation-evp feature - 30 tests pass (25 existing + 5 new); OTel evaluation_metrics.rs and exposures.rs are byte-for-byte unchanged --- datadog-ffe/Cargo.toml | 1 + datadog-ffe/src/lib.rs | 6 +- datadog-ffe/src/telemetry/flagevaluation.rs | 341 ++++++++++++++++++++ datadog-ffe/src/telemetry/mod.rs | 2 + 4 files changed, 349 insertions(+), 1 deletion(-) create mode 100644 datadog-ffe/src/telemetry/flagevaluation.rs diff --git a/datadog-ffe/Cargo.toml b/datadog-ffe/Cargo.toml index b052368644..68ee1dfd23 100644 --- a/datadog-ffe/Cargo.toml +++ b/datadog-ffe/Cargo.toml @@ -34,5 +34,6 @@ pyo3 = { version = "0.28", optional = true, default-features = false, features = default = ["remote-config"] exposure-events = ["dep:lru"] evaluation-metrics = ["dep:libdd-trace-protobuf", "dep:prost"] +flagevaluation-evp = [] pyo3 = ["dep:pyo3"] remote-config = ["dep:libdd-remote-config"] diff --git a/datadog-ffe/src/lib.rs b/datadog-ffe/src/lib.rs index bfa5ea72e6..ddbac1d2e7 100644 --- a/datadog-ffe/src/lib.rs +++ b/datadog-ffe/src/lib.rs @@ -6,7 +6,11 @@ mod flag_type; #[cfg(feature = "remote-config")] mod remote_config; pub mod rules_based; -#[cfg(any(feature = 
"exposure-events", feature = "evaluation-metrics"))] +#[cfg(any( + feature = "exposure-events", + feature = "evaluation-metrics", + feature = "flagevaluation-evp" +))] pub mod telemetry; pub use flag_type::{ExpectedFlagType, FlagType}; diff --git a/datadog-ffe/src/telemetry/flagevaluation.rs b/datadog-ffe/src/telemetry/flagevaluation.rs new file mode 100644 index 0000000000..a9d682d81b --- /dev/null +++ b/datadog-ffe/src/telemetry/flagevaluation.rs @@ -0,0 +1,341 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! EVP flagevaluation payload and serialization primitives for the +//! `flageval-worker` ingestion schema. +//! +//! Crate-naming note: this workspace uses `libdd-remote-config` (not +//! `datadog-remote-config`) for the remote config crate. Downstream consumers +//! (e.g. `dd-trace-php`) must use `libdd-remote-config` in any import paths. +//! +//! Two-tier aggregation (full → degraded → drop-counted) and context pruning +//! are enforced by the caller (PHP sidecar bridge, 02-07). This module only +//! owns the payload types and serialization helpers. + +use super::FfeTelemetryContext; +use serde::{Deserialize, Serialize}; +use std::collections::BTreeMap; + +// ── Aggregation caps (frozen contract §1) ──────────────────────────────────── +/// Maximum number of distinct full-tier buckets across all flags. +pub const GLOBAL_CAP: usize = 131_072; +/// Maximum number of full-tier buckets for a single flag. +pub const PER_FLAG_CAP: usize = 10_000; +/// Maximum number of distinct degraded-tier buckets across all flags. +pub const DEGRADED_CAP: usize = 32_768; + +// ── Context pruning bounds (reviewer concern #1 review:4477935835) ──────────── +/// Maximum number of context fields to include in a full-tier event. +pub const MAX_CONTEXT_FIELDS: usize = 256; +/// Maximum byte length of a context field value string. 
Values exceeding this +/// are skipped entirely (not truncated) to avoid partial-data misattribution. +pub const MAX_FIELD_LENGTH: usize = 256; + +// ── Top-level batch ────────────────────────────────────────────────────────── + +/// Batch wrapper for EVP flagevaluation events. +/// +/// Serializes to: +/// ```json +/// { "context": { "service": "…", "env": "…", "version": "…" }, +/// "flagEvaluations": [ … ] } +/// ``` +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeFlagEvaluationBatch { + pub context: FfeTelemetryContext, + #[serde(rename = "flagEvaluations")] + pub flag_evaluations: Vec, +} + +// ── Per-event payload ──────────────────────────────────────────────────────── + +/// A single aggregated flag evaluation event. +/// +/// Required fields are always present. Optional fields use +/// `skip_serializing_if = "Option::is_none"` (or the bool equivalent) so the +/// degraded tier emits a valid schema object without any null placeholders +/// (reviewer concern #2 review:4477935835). +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FfeFlagEvaluationEvent { + /// Unix timestamp of the aggregation window (milliseconds). + pub timestamp: i64, + /// Required: the flag key. + pub flag: FlagKey, + /// Earliest evaluation in this bucket (milliseconds since epoch). + pub first_evaluation: i64, + /// Latest evaluation in this bucket (milliseconds since epoch). + pub last_evaluation: i64, + /// Number of evaluations folded into this bucket. + pub evaluation_count: u64, + + // Optional fields — present in the full tier, absent in the degraded tier. + + /// Variant key; absent when the evaluation returned the runtime default + /// (no variant assigned). + #[serde(skip_serializing_if = "Option::is_none")] + pub variant: Option, + /// Allocation key from the UFC rule that produced this evaluation. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub allocation: Option, + /// Targeting key identifying the evaluation subject. + #[serde(skip_serializing_if = "Option::is_none")] + pub targeting_key: Option, + /// Pruned evaluation context (≤256 fields, values ≤256 chars, skip-not-truncate). + #[serde(skip_serializing_if = "Option::is_none")] + pub context: Option, + /// Evaluation error, if any. + #[serde(skip_serializing_if = "Option::is_none")] + pub error: Option, + + // Optional fields — may appear in either tier. + + /// `true` when the evaluation returned the SDK runtime default (absent + /// variant, not a UFC-assigned variant). Omitted when false; defaults to + /// `false` on deserialization when absent. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub runtime_default_used: bool, +} + +// ── Field sub-types ────────────────────────────────────────────────────────── + +/// Holds the flag key for the `flag` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FlagKey { + pub key: String, +} + +/// Holds the variant key for the `variant` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct VariantKey { + pub key: String, +} + +/// Holds the allocation key for the `allocation` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct AllocationKey { + pub key: String, +} + +/// Holds the error message for the `error` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct EvalError { + pub message: String, +} + +/// Per-event context object. +/// +/// `evaluation` carries the pruned context attributes; `dd.service` carries the +/// originating service name for cross-service attribution. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct FlagEvalEventContext { + /// Pruned evaluation context attributes (≤256 fields, values ≤256 chars). 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub evaluation: Option>, + /// Datadog-specific context sub-object. + #[serde(skip_serializing_if = "Option::is_none")] + pub dd: Option, +} + +/// Datadog-specific context fields inside the per-event `context.dd` object. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct ContextDD { + #[serde(skip_serializing_if = "String::is_empty")] + pub service: String, +} + +// ── Context pruning ────────────────────────────────────────────────────────── + +/// Prune evaluation context attributes to satisfy the frozen contract bounds: +/// - At most `MAX_CONTEXT_FIELDS` (256) entries are kept. +/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** +/// (not truncated) to avoid partial-data misattribution. +/// - Non-string values (bool, number, null) are kept regardless of +/// their display length. +/// - Keys are iterated in sorted order for deterministic canonical-key +/// stability; the returned `BTreeMap` preserves that order. +/// +/// This satisfies reviewer concern #1 (`review:4477935835`). +pub fn prune_context( + attrs: &BTreeMap, +) -> BTreeMap { + attrs + .iter() + .filter(|(_, v)| { + // Skip string values that exceed the per-field byte limit. 
+ if let serde_json::Value::String(s) = v { + s.len() <= MAX_FIELD_LENGTH + } else { + true + } + }) + .take(MAX_CONTEXT_FIELDS) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::{json, Value}; + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn full_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 42, + variant: Some(VariantKey { + key: "on".to_owned(), + }), + allocation: Some(AllocationKey { + key: "alloc-a".to_owned(), + }), + targeting_key: Some("user-123".to_owned()), + context: Some(FlagEvalEventContext { + evaluation: Some({ + let mut m = BTreeMap::new(); + m.insert("plan".to_owned(), json!("premium")); + m + }), + dd: Some(ContextDD { + service: "frontend".to_owned(), + }), + }), + error: None, + runtime_default_used: false, + } + } + + // ── Test: required fields present in serialized JSON ────────────────────── + + #[test] + fn fully_populated_event_serializes_required_fields() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let json = serde_json::to_string(&batch).unwrap(); + let v: Value = serde_json::from_str(&json).unwrap(); + + assert_eq!(v["context"]["service"], "svc"); + assert_eq!(v["context"]["env"], "prod"); + assert_eq!(v["context"]["version"], "1"); + + let ev = &v["flagEvaluations"][0]; + assert_eq!(ev["flag"]["key"], "my-flag"); + assert!(ev["first_evaluation"].is_number()); + assert!(ev["last_evaluation"].is_number()); + assert_eq!(ev["evaluation_count"], 42); + assert_eq!(ev["variant"]["key"], "on"); + 
assert_eq!(ev["allocation"]["key"], "alloc-a"); + assert_eq!(ev["targeting_key"], "user-123"); + } + + // ── Test: degraded-tier event omits optional fields (no null) ───────────── + + #[test] + fn degraded_tier_event_omits_optional_fields_not_null() { + let degraded = FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "flag-b".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 7, + variant: None, + allocation: None, + targeting_key: None, + context: None, + error: None, + runtime_default_used: false, + }; + let json = serde_json::to_string(&degraded).unwrap(); + let v: Value = serde_json::from_str(&json).unwrap(); + + // Required fields present. + assert_eq!(v["flag"]["key"], "flag-b"); + assert!(v["first_evaluation"].is_number()); + assert!(v["last_evaluation"].is_number()); + assert_eq!(v["evaluation_count"], 7); + + // Optional fields entirely absent (not null). + assert!(v.get("variant").is_none(), "variant should be absent"); + assert!(v.get("allocation").is_none(), "allocation should be absent"); + assert!( + v.get("targeting_key").is_none(), + "targeting_key should be absent" + ); + assert!(v.get("context").is_none(), "context should be absent"); + assert!(v.get("error").is_none(), "error should be absent"); + assert!( + v.get("runtime_default_used").is_none(), + "runtime_default_used should be absent when false" + ); + } + + // ── Test: context pruning — 256-field limit ─────────────────────────────── + + #[test] + fn context_pruning_keeps_at_most_256_fields() { + let mut attrs = BTreeMap::new(); + for i in 0..300usize { + attrs.insert(format!("key{i:04}"), json!(i.to_string())); + } + let pruned = prune_context(&attrs); + assert_eq!( + pruned.len(), + MAX_CONTEXT_FIELDS, + "pruned context must have at most {MAX_CONTEXT_FIELDS} fields" + ); + } + + // ── Test: context pruning — skip string values > 256 chars ─────────────── + + #[test] + fn
context_pruning_skips_oversized_string_values() { + let mut attrs = BTreeMap::new(); + let long_value = "x".repeat(MAX_FIELD_LENGTH + 1); + attrs.insert("oversized".to_owned(), json!(long_value)); + attrs.insert("ok".to_owned(), json!("short")); + // Non-string values are kept regardless of length. + attrs.insert("num".to_owned(), json!(12345)); + + let pruned = prune_context(&attrs); + assert!( + !pruned.contains_key("oversized"), + "oversized string value must be skipped" + ); + assert!(pruned.contains_key("ok"), "short string value must be kept"); + assert!( + pruned.contains_key("num"), + "numeric value must be kept regardless of length" + ); + } + + // ── Test: batch round-trips via serde ──────────────────────────────────── + + #[test] + fn batch_round_trips_via_serde() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let json = serde_json::to_string(&batch).unwrap(); + let decoded: FfeFlagEvaluationBatch = serde_json::from_str(&json).unwrap(); + assert_eq!(batch, decoded); + } +} diff --git a/datadog-ffe/src/telemetry/mod.rs b/datadog-ffe/src/telemetry/mod.rs index 6b5e851025..4fad83f38c 100644 --- a/datadog-ffe/src/telemetry/mod.rs +++ b/datadog-ffe/src/telemetry/mod.rs @@ -5,6 +5,8 @@ pub mod evaluation_metrics; #[cfg(feature = "exposure-events")] pub mod exposures; +#[cfg(feature = "flagevaluation-evp")] +pub mod flagevaluation; use serde::{Deserialize, Serialize}; From 89a2ba7fc7950cceb8c3c0c88a708afb72d2bbfb Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Fri, 12 Jun 2026 15:32:54 -0400 Subject: [PATCH 2/8] feat(datadog-sidecar): add FfeFlagEvaluationBatch sidecar action + EVP flusher - Add ffe_flagevaluation_flusher.rs: structural copy of ffe_exposures_flusher.rs with path constant EVP_FLAGEVALUATIONS_PATH = /evp_proxy/v2/api/v2/flagevaluations and batch type FfeFlagEvaluationBatch; fire-and-forget send_batch with non-2xx warn+drop and timeout via biased tokio::select! 
- Add SidecarAction::FfeFlagEvaluationBatch variant to mod.rs enum; re-export FfeFlagEvaluationBatch from datadog-ffe telemetry module - Route SidecarAction::FfeFlagEvaluationBatch in sidecar_server.rs to ffe_flagevaluation_flusher::send_batch (parallel to FfeExposureBatch arm) - Add exhaust arm for new variant in telemetry.rs process_actions match - Enable flagevaluation-evp feature on datadog-ffe dep in sidecar Cargo.toml - 40 sidecar tests pass (3 new: posts_to_evp_proxy, non_2xx_does_not_panic, timeout_returns_without_waiting); OTel FfeEvaluationMetric path untouched --- datadog-sidecar/Cargo.toml | 2 +- .../src/service/ffe_flagevaluation_flusher.rs | 275 ++++++++++++++++++ datadog-sidecar/src/service/mod.rs | 7 + datadog-sidecar/src/service/sidecar_server.rs | 22 ++ datadog-sidecar/src/service/telemetry.rs | 7 +- 5 files changed, 309 insertions(+), 4 deletions(-) create mode 100644 datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs diff --git a/datadog-sidecar/Cargo.toml b/datadog-sidecar/Cargo.toml index 443b292b32..c8da740354 100644 --- a/datadog-sidecar/Cargo.toml +++ b/datadog-sidecar/Cargo.toml @@ -29,7 +29,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils" } libdd-trace-stats = { path = "../libdd-trace-stats", default-features=false, features = ["https"] } libdd-remote-config = { path = "../libdd-remote-config" } datadog-live-debugger = { path = "../datadog-live-debugger" } -datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics"] } +datadog-ffe = { path = "../datadog-ffe", features = ["exposure-events", "evaluation-metrics", "flagevaluation-evp"] } libdd-crashtracker = { path = "../libdd-crashtracker" } libdd-dogstatsd-client = { path = "../libdd-dogstatsd-client" } libdd-tinybytes = { path = "../libdd-tinybytes" } diff --git a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs new file mode 100644 index 0000000000..4a3a8b1103 --- 
/dev/null +++ b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs @@ -0,0 +1,275 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Serializes and forwards FFE (Feature Flag Evaluation) flag evaluation +//! batches to the Datadog Agent's EVP proxy. +//! +//! Protocol: `POST /evp_proxy/v2/api/v2/flagevaluations` with the header +//! `X-Datadog-EVP-Subdomain: event-platform-intake`. Fire-and-forget: non-2xx +//! responses are logged at `warn`, network errors at `debug`, and dropped +//! (matches dd-trace-go behaviour). No agent capability gate. + +use crate::service::FfeFlagEvaluationBatch; +use http::uri::PathAndQuery; +use http::Method; +use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; +use libdd_common::Endpoint; +use std::time::Duration; +use tracing::{debug, warn}; + +/// EVP proxy path for FFE flag evaluation intake. +pub(crate) const EVP_FLAGEVALUATIONS_PATH: &str = "/evp_proxy/v2/api/v2/flagevaluations"; + +/// EVP subdomain that routes requests to event-platform intake. +pub(crate) const EVP_SUBDOMAIN_HEADER: &str = "X-Datadog-EVP-Subdomain"; +pub(crate) const EVP_SUBDOMAIN_VALUE: &str = "event-platform-intake"; + +const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); + +/// Build the FFE flagevaluation endpoint from a session's agent base endpoint. +/// Overrides only the path (`/evp_proxy/v2/api/v2/flagevaluations`), preserving +/// scheme, authority, timeout, and test_token. +/// Returns `None` for agentless mode because EVP proxy routing is agent-only. 
+pub(crate) fn flagevaluation_endpoint(base: &Endpoint) -> Option { + if base.api_key.is_some() { + return None; + } + + let mut parts = base.url.clone().into_parts(); + parts.path_and_query = Some(PathAndQuery::from_static(EVP_FLAGEVALUATIONS_PATH)); + let url = http::Uri::from_parts(parts).ok()?; + Some(Endpoint { + url, + ..base.clone() + }) +} + +/// POST a structured FFE flag evaluation batch to the agent EVP proxy. +/// Fire-and-forget: non-2xx responses are logged at `warn`, network errors at +/// `debug`, and dropped (matches dd-trace-go behaviour). +pub(crate) async fn send_batch( + client: &C, + endpoint: &Endpoint, + batch: FfeFlagEvaluationBatch, +) { + let payload = match serde_json::to_string(&batch) { + Ok(p) => p, + Err(e) => { + debug!("ffe_flagevaluation_flusher: failed to encode batch payload: {e:?}"); + return; + } + }; + send_payload(client, endpoint, payload).await; +} + +async fn send_payload( + client: &C, + endpoint: &Endpoint, + payload: String, +) { + let builder = match endpoint.to_request_builder(USER_AGENT) { + Ok(b) => b, + Err(e) => { + debug!("ffe_flagevaluation_flusher: failed to build request: {e:?}"); + return; + } + }; + + let req = match builder + .method(Method::POST) + .header("Content-Type", "application/json") + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .body(Bytes::from(payload)) + { + Ok(r) => r, + Err(e) => { + debug!("ffe_flagevaluation_flusher: failed to construct request body: {e:?}"); + return; + } + }; + + let timeout = Duration::from_millis(endpoint.timeout_ms); + let response = tokio::select! 
{ + biased; + result = client.request(req) => result, + _ = client.sleep(timeout) => { + debug!("ffe_flagevaluation_flusher: request timed out after {timeout:?}"); + return; + } + }; + + match response { + Ok(resp) => { + let status = resp.status(); + if !status.is_success() { + let body_preview = truncate(resp.body().as_ref(), 256); + warn!("ffe_flagevaluation_flusher: non-2xx response {status}: {body_preview}"); + } else { + debug!("ffe_flagevaluation_flusher: sent flag evaluation batch, status={status}"); + } + } + Err(e) => { + debug!("ffe_flagevaluation_flusher: request failed: {e:?}"); + } + } +} + +fn truncate(bytes: &[u8], cap: usize) -> String { + let take = bytes.len().min(cap); + String::from_utf8_lossy(&bytes[..take]).into_owned() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::service::{FfeFlagEvaluationBatch, FfeTelemetryContext}; + use datadog_ffe::telemetry::flagevaluation::{FfeFlagEvaluationEvent, FlagKey}; + use httpmock::MockServer; + use libdd_capabilities::{HttpError, MaybeSend}; + use libdd_capabilities_impl::NativeCapabilities; + use std::future; + + fn endpoint_for(server: &MockServer) -> Endpoint { + Endpoint { + url: server.url("/").parse().unwrap(), + ..Endpoint::default() + } + } + + fn context() -> FfeTelemetryContext { + FfeTelemetryContext { + service: "svc".to_owned(), + env: "prod".to_owned(), + version: "1".to_owned(), + } + } + + fn eval_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 5, + variant: None, + allocation: None, + targeting_key: None, + context: None, + error: None, + runtime_default_used: false, + } + } + + fn batch() -> FfeFlagEvaluationBatch { + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![eval_event()], + } + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn 
posts_to_evp_proxy() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATIONS_PATH) + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .header("content-type", "application/json"); + then.status(202); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + + send_batch(&client, &ep, batch()).await; + + mock.assert_async().await; + assert_eq!(mock.calls_async().await, 1); + } + + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn non_2xx_does_not_panic() { + let server = MockServer::start_async().await; + let _mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATIONS_PATH); + then.status(500).body("intake overloaded"); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + send_batch(&client, &ep, batch()).await; + // Test passes if no panic occurs. + } + + #[tokio::test] + async fn timeout_returns_without_waiting_for_http_response() { + let ep = Endpoint { + url: "http://localhost:8126".parse().unwrap(), + timeout_ms: 1, + ..Endpoint::default() + }; + + send_batch(&HangingCapabilities, &ep, batch()).await; + // Test passes if function returns before the pending HTTP future resolves. 
+ } + + #[test] + fn endpoint_preserves_authority_overrides_path() { + let base = Endpoint { + url: "http://agent.internal:8126/v0.4/traces".parse().unwrap(), + ..Endpoint::default() + }; + let ep = flagevaluation_endpoint(&base).unwrap(); + assert_eq!(ep.url.scheme_str(), Some("http")); + assert_eq!(ep.url.authority().unwrap().as_str(), "agent.internal:8126"); + assert_eq!(ep.url.path(), EVP_FLAGEVALUATIONS_PATH); + } + + #[test] + fn endpoint_rejects_agentless() { + let base = Endpoint { + url: "https://trace.agent.datadoghq.com/v0.4/traces" + .parse() + .unwrap(), + api_key: Some("api-key".into()), + ..Endpoint::default() + }; + assert!(flagevaluation_endpoint(&base).is_none()); + } + + #[derive(Clone, Debug)] + struct HangingCapabilities; + + impl HttpClientCapability for HangingCapabilities { + fn new_client() -> Self { + Self + } + + fn request( + &self, + _req: http::Request, + ) -> impl future::Future, HttpError>> + MaybeSend + { + future::pending() + } + } + + impl SleepCapability for HangingCapabilities { + fn new() -> Self { + Self + } + + async fn sleep(&self, _duration: Duration) {} + } +} diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index 656f2d635a..687472f2c1 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -5,6 +5,7 @@ use crate::config; pub use datadog_ffe::telemetry::evaluation_metrics::FfeEvaluationMetric; pub use datadog_ffe::telemetry::exposures::{FfeExposure, FfeExposureBatch}; +pub use datadog_ffe::telemetry::flagevaluation::FfeFlagEvaluationBatch; pub use datadog_ffe::telemetry::FfeTelemetryContext; use libdd_common::tag::Tag; use libdd_common::Endpoint; @@ -32,6 +33,7 @@ pub mod blocking; mod debugger_diagnostics_bookkeeper; pub mod exception_hash_rate_limiter; pub(crate) mod ffe_exposures_flusher; +pub(crate) mod ffe_flagevaluation_flusher; pub(crate) mod ffe_metrics_flusher; mod instance_id; mod queue_id; @@ -92,6 +94,11 @@ pub enum SidecarAction { /// 
Structured FFE exposures. The sidecar owns JSON serialization, /// cross-request deduplication, and EVP delivery. FfeExposureBatch(FfeExposureBatch), + /// Structured FFE flag evaluation batch for the EVP flagevaluation track. + /// The sidecar serializes and POSTs the batch to + /// `/evp_proxy/v2/api/v2/flagevaluations` (fire-and-forget). PHP (EMIT-07) + /// drives the two-tier aggregation upstream and dispatches via this action. + FfeFlagEvaluationBatch(FfeFlagEvaluationBatch), /// Structured FFE evaluation metrics. The sidecar owns OTLP/protobuf /// aggregation, serialization, and delivery. This action must be sent only /// by SDKs that explicitly opted into native FFE metric ownership. diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 15b19ba227..47af7dbbc6 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -36,6 +36,7 @@ use crate::service::debugger_diagnostics_bookkeeper::{ }; use crate::service::exception_hash_rate_limiter::EXCEPTION_HASH_LIMITER; use crate::service::ffe_exposures_flusher; +use crate::service::ffe_flagevaluation_flusher; use crate::service::ffe_metrics_flusher; use crate::service::remote_configs::{RemoteConfigNotifyTarget, RemoteConfigs}; use crate::service::stats_flusher::{ @@ -441,6 +442,27 @@ impl SidecarInterface for ConnectionSidecarHandler { } false } + SidecarAction::FfeFlagEvaluationBatch(batch) => { + if let Some(base) = trace_config.endpoint.as_ref() { + if let Some(ep) = ffe_flagevaluation_flusher::flagevaluation_endpoint(base) + { + let batch = batch.clone(); + let client = ffe_http_client.clone(); + tokio::spawn(async move { + ffe_flagevaluation_flusher::send_batch(&client, &ep, batch).await; + }); + } else { + debug!( + "ffe_flagevaluation_flusher: could not derive endpoint, dropping batch" + ); + } + } else { + debug!( + "ffe_flagevaluation_flusher: no session endpoint, dropping batch" + ); + } + false 
+ } SidecarAction::FfeEvaluationMetrics { context, metrics } => { if let Some(ep) = session.get_otlp_metrics_endpoint().clone() { let client = ffe_http_client.clone(); diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 9954f6eb41..4213a59399 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -453,9 +453,10 @@ impl TelemetryCachedClient { warn!("Attempted to send telemetry point for unregistered metric: {metric_name}"); } } - SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately - SidecarAction::FfeExposureBatch(_) => {} // handled in sidecar_server - SidecarAction::FfeEvaluationMetrics { .. } => {} // handled in sidecar_server + SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately + SidecarAction::FfeExposureBatch(_) => {} // handled in sidecar_server + SidecarAction::FfeFlagEvaluationBatch(_) => {} // handled in sidecar_server + SidecarAction::FfeEvaluationMetrics { .. } => {} // handled in sidecar_server } } actions From de8e987a28a64e98c201f4d06a1cb0adff8d1652 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Sun, 14 Jun 2026 10:00:18 -0400 Subject: [PATCH 3/8] fix(ffe): make flagevaluation batch bincode-safe over the sidecar IPC The worker->sidecar IPC serializes SidecarAction with bincode (non-self-describing). serde_json::Value (deserialize_any) and #[serde(skip_serializing_if)] both make bincode deserialize fail, so the sidecar silently dropped every FfeFlagEvaluationBatch ('IPC serve: failed to decode request') while the worker enqueue still returned ok. - Carry pruned context as a JSON-object string (Option); remove all skip_serializing_if from the wire types (keep #[serde(default)] for deserialize). - Re-expand the context string into a JSON object and strip null/false/empty placeholders in ffe_flagevaluation_flusher::build_payload (POST shape unchanged; degraded tier carries no null placeholders). 
- Add enqueue_actions_reliable (checked blocking send + reconnect-retry) for one-shot FFE batches; best-effort enqueue_actions left unchanged for high-volume telemetry. - Add a bincode round-trip test for FfeFlagEvaluationBatch (mixed Some/None fields) to lock the wire-codec contract. --- Cargo.lock | 1 + datadog-ffe/Cargo.toml | 6 + datadog-ffe/src/telemetry/flagevaluation.rs | 171 ++++++--- datadog-sidecar/src/service/blocking.rs | 70 ++++ .../src/service/ffe_flagevaluation_flusher.rs | 334 +++++++++++++++++- datadog-sidecar/src/service/sender.rs | 28 ++ datadog-sidecar/src/service/sidecar_server.rs | 4 +- datadog-sidecar/src/service/telemetry.rs | 8 +- 8 files changed, 563 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ad07875b86..b7eeefba0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1353,6 +1353,7 @@ checksum = "d7a1e2f27636f116493b8b860f5546edb47c8d8f8ea73e1d2a20be88e28d1fea" name = "datadog-ffe" version = "1.0.0" dependencies = [ + "bincode", "chrono", "derive_more", "faststr", diff --git a/datadog-ffe/Cargo.toml b/datadog-ffe/Cargo.toml index 68ee1dfd23..3d1c88caf0 100644 --- a/datadog-ffe/Cargo.toml +++ b/datadog-ffe/Cargo.toml @@ -30,6 +30,12 @@ libdd-trace-protobuf = { path = "../libdd-trace-protobuf", optional = true } prost = { version = "0.14.1", optional = true } pyo3 = { version = "0.28", optional = true, default-features = false, features = ["macros"] } +[dev-dependencies] +# Matches the sidecar's bincode version. Used by the flagevaluation bincode +# round-trip test, which guards against `skip_serializing_if` reintroducing the +# worker→sidecar IPC field-misalignment bug (bincode is non-self-describing). 
+bincode = { version = "1.3.3" } + [features] default = ["remote-config"] exposure-events = ["dep:lru"] diff --git a/datadog-ffe/src/telemetry/flagevaluation.rs b/datadog-ffe/src/telemetry/flagevaluation.rs index a9d682d81b..553d53cba1 100644 --- a/datadog-ffe/src/telemetry/flagevaluation.rs +++ b/datadog-ffe/src/telemetry/flagevaluation.rs @@ -11,6 +11,20 @@ //! Two-tier aggregation (full → degraded → drop-counted) and context pruning //! are enforced by the caller (PHP sidecar bridge, 02-07). This module only //! owns the payload types and serialization helpers. +//! +//! Serialization note (bincode wire vs EVP POST): these types cross the +//! worker→sidecar IPC boundary, which is encoded with **bincode** — a +//! non-self-describing format whose derived `Deserialize` reads every field in +//! declaration order. `#[serde(skip_serializing_if = ...)]` is therefore +//! **incompatible** with the bincode wire: a skipped field is omitted on +//! serialize but still expected on deserialize, causing field misalignment and +//! a dropped batch. For that reason **all fields here are always serialized** +//! (no `skip_serializing_if`). The flageval-worker EVP schema rejects null / +//! empty placeholders (especially for degraded-tier events), so the sidecar +//! flusher (`ffe_flagevaluation_flusher::build_payload`) strips null / empty +//! placeholder entries from the JSON before the HTTP POST, reproducing the old +//! skip semantics only on the outbound wire. `#[serde(default)]` is kept on +//! fields that have it for deserialize robustness. use super::FfeTelemetryContext; use serde::{Deserialize, Serialize}; @@ -51,10 +65,13 @@ pub struct FfeFlagEvaluationBatch { /// A single aggregated flag evaluation event. /// -/// Required fields are always present. 
Optional fields use -/// `skip_serializing_if = "Option::is_none"` (or the bool equivalent) so the -/// degraded tier emits a valid schema object without any null placeholders -/// (reviewer concern #2 review:4477935835). +/// **All fields are always serialized** (no `skip_serializing_if`) so the type +/// is safe over the non-self-describing bincode IPC wire (see the module-level +/// serialization note). The degraded tier therefore serializes optional fields +/// as `null`/`false` on the wire; the sidecar flusher +/// (`ffe_flagevaluation_flusher::build_payload`) strips those null/empty +/// placeholders before the EVP POST so the flageval-worker schema sees no null +/// placeholders (reviewer concern #2 review:4477935835). #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct FfeFlagEvaluationEvent { /// Unix timestamp of the aggregation window (milliseconds). @@ -68,31 +85,31 @@ pub struct FfeFlagEvaluationEvent { /// Number of evaluations folded into this bucket. pub evaluation_count: u64, - // Optional fields — present in the full tier, absent in the degraded tier. - + // Optional fields — present in the full tier, `None` in the degraded tier. + // Serialized as `null` on the bincode wire; the flusher strips them. /// Variant key; absent when the evaluation returned the runtime default /// (no variant assigned). - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] pub variant: Option<VariantKey>, /// Allocation key from the UFC rule that produced this evaluation. - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] pub allocation: Option<AllocationKey>, /// Targeting key identifying the evaluation subject. - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] pub targeting_key: Option<String>, /// Pruned evaluation context (≤256 fields, values ≤256 chars, skip-not-truncate). - #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] pub context: Option<FlagEvalEventContext>, /// Evaluation error, if any.
- #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] pub error: Option, - // Optional fields — may appear in either tier. - + // Optional field — may appear in either tier. /// `true` when the evaluation returned the SDK runtime default (absent - /// variant, not a UFC-assigned variant). Omitted when false; defaults to - /// `false` on deserialization when absent. - #[serde(default, skip_serializing_if = "std::ops::Not::not")] + /// variant, not a UFC-assigned variant). Serialized as `false` on the wire + /// when unset; the flusher strips the `false` placeholder before the POST. + /// `#[serde(default)]` keeps deserialization robust when the field is absent. + #[serde(default)] pub runtime_default_used: bool, } @@ -128,18 +145,35 @@ pub struct EvalError { /// originating service name for cross-service attribution. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct FlagEvalEventContext { - /// Pruned evaluation context attributes (≤256 fields, values ≤256 chars). - #[serde(skip_serializing_if = "Option::is_none")] - pub evaluation: Option>, - /// Datadog-specific context sub-object. - #[serde(skip_serializing_if = "Option::is_none")] + /// Pruned evaluation context attributes (≤256 fields, values ≤256 chars), + /// carried over the wire as a **JSON-object string** (e.g. `{"plan":"premium"}`). + /// + /// The sidecar IPC codec is bincode, which cannot (de)serialize + /// `serde_json::Value` (it relies on `deserialize_any`, which bincode + /// rejects). To keep the bincode wire encodable, the pruned context is + /// stringified at event-build time and re-expanded into a JSON object by the + /// sidecar flusher (`ffe_flagevaluation_flusher::build_payload`) before the + /// EVP POST, so the on-the-wire EVP schema (`context.evaluation` as an + /// object) is unchanged. `Eq` is preserved because `String` is `Eq`. 
+ /// + /// Always serialized (no `skip_serializing_if`) for bincode-wire safety; + /// the sidecar flusher strips it when `None` → `null`. + #[serde(default)] + pub evaluation: Option, + /// Datadog-specific context sub-object. Always serialized for bincode-wire + /// safety; the flusher strips it when `None` → `null` (and recursively + /// removes the enclosing `context` object if it becomes empty). + #[serde(default)] pub dd: Option, } /// Datadog-specific context fields inside the per-event `context.dd` object. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct ContextDD { - #[serde(skip_serializing_if = "String::is_empty")] + /// Originating service name. Always serialized for bincode-wire safety; the + /// flusher strips it when empty (`""`), reproducing the old + /// `skip_serializing_if = "String::is_empty"` semantics on the POST. + #[serde(default)] pub service: String, } @@ -147,12 +181,11 @@ pub struct ContextDD { /// Prune evaluation context attributes to satisfy the frozen contract bounds: /// - At most `MAX_CONTEXT_FIELDS` (256) entries are kept. -/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** -/// (not truncated) to avoid partial-data misattribution. -/// - Non-string values (bool, number, null) are kept regardless of -/// their display length. -/// - Keys are iterated in sorted order for deterministic canonical-key -/// stability; the returned `BTreeMap` preserves that order. +/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** (not truncated) to +/// avoid partial-data misattribution. +/// - Non-string values (bool, number, null) are kept regardless of their display length. +/// - Keys are iterated in sorted order for deterministic canonical-key stability; the returned +/// `BTreeMap` preserves that order. /// /// This satisfies reviewer concern #1 (`review:4477935835`). 
pub fn prune_context( @@ -205,11 +238,14 @@ mod tests { }), targeting_key: Some("user-123".to_owned()), context: Some(FlagEvalEventContext { - evaluation: Some({ - let mut m = BTreeMap::new(); - m.insert("plan".to_owned(), json!("premium")); - m - }), + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("plan".to_owned(), json!("premium")); + m + }) + .unwrap(), + ), dd: Some(ContextDD { service: "frontend".to_owned(), }), @@ -244,11 +280,8 @@ mod tests { assert_eq!(ev["targeting_key"], "user-123"); } - // ── Test: degraded-tier event omits optional fields (no null) ───────────── - - #[test] - fn degraded_tier_event_omits_optional_fields_not_null() { - let degraded = FfeFlagEvaluationEvent { + fn degraded_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { timestamp: 1_700_000_000_000, flag: FlagKey { key: "flag-b".to_owned(), @@ -262,8 +295,20 @@ mod tests { context: None, error: None, runtime_default_used: false, - }; - let json = serde_json::to_string(&degraded).unwrap(); + } + } + + // ── Test: degraded-tier event serializes optional fields as null ────────── + // + // The type no longer uses `skip_serializing_if` (bincode-wire safety), so on + // the wire `None`/`false` optional fields ARE present (as null/false). The + // null-placeholder stripping that the flageval-worker schema requires now + // happens in the sidecar flusher's `build_payload`, not at the type level — + // see `ffe_flagevaluation_flusher` tests. + + #[test] + fn degraded_tier_event_serializes_optional_fields_as_null() { + let json = serde_json::to_string(&degraded_event()).unwrap(); let v: Value = serde_json::from_str(&json).unwrap(); // Required fields present. @@ -272,18 +317,46 @@ mod tests { assert!(v["last_evaluation"].is_number()); assert_eq!(v["evaluation_count"], 7); - // Optional fields entirely absent (not null).
- assert!(v.get("variant").is_none(), "variant should be absent"); - assert!(v.get("allocation").is_none(), "allocation should be absent"); + // Optional fields are present as null/false placeholders on the wire + // (stripped later by the flusher, NOT at the type level). + assert!(v["variant"].is_null(), "variant should serialize as null"); assert!( - v.get("targeting_key").is_none(), - "targeting_key should be absent" + v["allocation"].is_null(), + "allocation should serialize as null" ); - assert!(v.get("context").is_none(), "context should be absent"); - assert!(v.get("error").is_none(), "error should be absent"); assert!( - v.get("runtime_default_used").is_none(), - "runtime_default_used should be absent when false" + v["targeting_key"].is_null(), + "targeting_key should serialize as null" + ); + assert!(v["context"].is_null(), "context should serialize as null"); + assert!(v["error"].is_null(), "error should serialize as null"); + assert_eq!( + v["runtime_default_used"], false, + "runtime_default_used should serialize as false" + ); + } + + // ── Test: bincode round-trip with mixed Some/None fields ────────────────── + // + // This is the mechanical guard for the worker→sidecar IPC bug: bincode is a + // non-self-describing codec, so any `skip_serializing_if` on these types + // would omit a field on serialize while the derived Deserialize still + // expects it in order → field misalignment → the sidecar drops the batch. + // A batch mixing a full-tier event (Some fields) and a degraded-tier event + // (None fields) must survive serialize→deserialize byte-for-byte. 
+ + #[test] + fn batch_round_trips_via_bincode_with_mixed_optional_fields() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event(), degraded_event()], + }; + let bytes = bincode::serialize(&batch).expect("bincode serialize must succeed"); + let decoded: FfeFlagEvaluationBatch = + bincode::deserialize(&bytes).expect("bincode deserialize must succeed"); + assert_eq!( + batch, decoded, + "bincode round-trip must be lossless for a batch mixing Some and None fields" ); } diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index 80a9257384..e69b992316 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -156,6 +156,46 @@ impl SidecarTransport { Err(e) } + /// Like [`with_retry`](Self::with_retry) but accepts an `FnMut` closure, so the + /// closure may mutate captured state between the first attempt and the retry. + /// + /// Needed by [`enqueue_actions_reliable`] whose payload (`Vec`) is + /// not `Clone`: the closure `take`s the actions out of an `Option` per attempt + /// rather than cloning them. Reconnect/retry-once semantics are identical to + /// `with_retry`. + fn with_retry_mut(&mut self, mut f: F) -> io::Result + where + F: FnMut(&mut SidecarSender) -> io::Result, + { + let e = { + let mut inner = match self.inner.lock() { + Ok(t) => t, + Err(e) => return Err(io::Error::other(e.to_string())), + }; + match f(&mut inner) { + Ok(ret) => return Ok(ret), + Err(e) => e, + } + }; + if e.kind() == io::ErrorKind::BrokenPipe + || e.kind() == io::ErrorKind::ConnectionReset + || e.kind() == io::ErrorKind::NotConnected + { + warn!("with_retry_mut ({}): The sidecar transport is closed. Reconnecting... This generally indicates a problem with the sidecar, most likely a crash. 
Check the logs / core dump locations and possibly report a bug", e.kind()); + if let Some(ref reconnect) = self.reconnect_fn { + if Self::do_reconnect(&mut self.inner, reconnect, true) { + return f(&mut self.inner.lock_or_panic()); + } + } + } else { + warn!( + "with_retry_mut: non-connection error ({:?}), not reconnecting", + e.kind() + ); + } + Err(e) + } + /// Send garbage data (used in tests to verify error handling). pub fn send_garbage(&mut self) -> io::Result<()> { match self.inner.lock() { @@ -222,6 +262,36 @@ pub fn enqueue_actions( Ok(()) } +/// Reliably enqueues a list of actions to be performed. +/// +/// Unlike [`enqueue_actions`], this uses the **checked, blocking** channel path with +/// NO load-shedding and NO silent drop: the `io::Result` from the send propagates to +/// the caller. On a broken pipe / connection reset / not-connected error the transport +/// reconnects (the same reconnect path that replays metric registrations) and the send +/// is retried exactly once on the fresh connection. +/// +/// Intended for one-shot, non-replayed payloads (e.g. FFE flagevaluation batches) that +/// must not be silently dropped under transient backpressure or a broken pipe. +/// +/// `actions` is not `Clone` (the `SidecarAction::Telemetry` variant is intentionally +/// not clonable), so it is held in an `Option` that the retry closure `take`s; on the +/// first attempt's connection error the actions are re-stocked from the returned send +/// failure and replayed once after reconnect. 
+pub fn enqueue_actions_reliable( + transport: &mut SidecarTransport, + instance_id: &InstanceId, + queue_id: &QueueId, + actions: Vec, +) -> io::Result<()> { + let mut pending = Some(actions); + transport.with_retry_mut(|sender| { + let actions = pending + .take() + .expect("enqueue_actions_reliable retry invoked without pending actions"); + sender.enqueue_actions_reliable(instance_id.clone(), *queue_id, actions) + }) +} + /// Removes the application entry for the given queue ID from the instance. pub fn clear_queue_id( transport: &mut SidecarTransport, diff --git a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs index 4a3a8b1103..c3cdc95df1 100644 --- a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs +++ b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs @@ -52,7 +52,7 @@ pub(crate) async fn send_batch( endpoint: &Endpoint, batch: FfeFlagEvaluationBatch, ) { - let payload = match serde_json::to_string(&batch) { + let payload = match build_payload(&batch) { Ok(p) => p, Err(e) => { debug!("ffe_flagevaluation_flusher: failed to encode batch payload: {e:?}"); @@ -62,6 +62,113 @@ pub(crate) async fn send_batch( send_payload(client, endpoint, payload).await; } +/// Build the EVP POST body from a batch. +/// +/// The flagevaluation types are serialized over the sidecar's **bincode** IPC +/// wire, which is non-self-describing: a field omitted by `skip_serializing_if` +/// would misalign the derived `Deserialize` and cause the sidecar to drop the +/// whole batch. The types therefore carry **no** `skip_serializing_if` and emit +/// every field (optional ones as `null`/`false`/`""`). The flageval-worker EVP +/// schema, however, rejects those null/empty placeholders (especially for +/// degraded-tier events), so this helper strips them here, on the outbound POST +/// only — reproducing the old skip-serialization semantics without breaking the +/// bincode wire. 
+/// +/// Two transforms happen, in order, on each `flagEvaluations` element: +/// 1. `context.evaluation` is carried as a JSON-object **string** (bincode +/// cannot encode `serde_json::Value`); it is re-expanded back into a JSON +/// **object** in place. An unparseable string drops the field gracefully +/// (never panics), matching the best-effort telemetry contract. +/// 2. The whole value is recursively cleaned (`strip_placeholders`) so the +/// POST contains no `null`, `false`, empty-string, empty-object, or +/// empty-array placeholder entries. Numeric zeros (timestamps/counts) are +/// preserved — they are real data. +fn build_payload(batch: &FfeFlagEvaluationBatch) -> Result { + let mut value = serde_json::to_value(batch)?; + + if let Some(events) = value + .get_mut("flagEvaluations") + .and_then(serde_json::Value::as_array_mut) + { + for event in events { + let Some(context) = event.get_mut("context") else { + continue; + }; + let Some(evaluation) = context.get_mut("evaluation") else { + continue; + }; + if let Some(s) = evaluation.as_str() { + match serde_json::from_str::(s) { + // Re-expand the JSON-object string into an object in place. + Ok(parsed) => *evaluation = parsed, + // Unparseable string → drop the field gracefully (never panic). + Err(_) => { + if let Some(obj) = context.as_object_mut() { + obj.remove("evaluation"); + } + } + } + } + } + } + + // Strip null/empty placeholders so the EVP POST matches the flageval-worker + // schema (which rejects null placeholders) — see the function doc comment. + strip_placeholders(&mut value); + + serde_json::to_string(&value) +} + +/// Recursively remove placeholder entries from a JSON value so the EVP POST +/// carries no null/empty fields, reproducing the old `skip_serializing_if` +/// semantics on the outbound wire only. 
+/// +/// An object entry (or array element) is dropped when its value, after the +/// children have themselves been cleaned (bottom-up), is one of: +/// - `null` (was `Option::is_none`) +/// - `false` (was the `runtime_default_used` bool skip) +/// - `""` (was `String::is_empty`, e.g. `ContextDD::service`) +/// - `{}` (an object that became empty after cleaning, e.g. a +/// `context.dd` whose only field `service` was stripped) +/// - `[]` (an array that became empty after cleaning) +/// +/// Numeric values (including `0`) are NEVER removed — timestamps and counts are +/// real data. Non-zero bools (`true`) and non-empty strings/collections are +/// kept. +fn strip_placeholders(value: &mut serde_json::Value) { + match value { + serde_json::Value::Object(map) => { + // Clean children first (bottom-up), then drop entries that are now + // placeholders, so a container emptied by cleaning is itself removed. + for child in map.values_mut() { + strip_placeholders(child); + } + map.retain(|_, v| !is_placeholder(v)); + } + serde_json::Value::Array(items) => { + for item in items.iter_mut() { + strip_placeholders(item); + } + items.retain(|v| !is_placeholder(v)); + } + _ => {} + } +} + +/// Whether a (already-cleaned) JSON value is an empty/null placeholder that +/// should be dropped from the POST. Numeric zeros are NOT placeholders. +fn is_placeholder(value: &serde_json::Value) -> bool { + match value { + serde_json::Value::Null => true, + serde_json::Value::Bool(b) => !b, + serde_json::Value::String(s) => s.is_empty(), + serde_json::Value::Object(map) => map.is_empty(), + serde_json::Value::Array(items) => items.is_empty(), + // Numbers (incl. 0) are real data — never placeholders. 
+ serde_json::Value::Number(_) => false, + } +} + async fn send_payload( client: &C, endpoint: &Endpoint, @@ -123,10 +230,14 @@ fn truncate(bytes: &[u8], cap: usize) -> String { mod tests { use super::*; use crate::service::{FfeFlagEvaluationBatch, FfeTelemetryContext}; - use datadog_ffe::telemetry::flagevaluation::{FfeFlagEvaluationEvent, FlagKey}; + use datadog_ffe::telemetry::flagevaluation::{ + AllocationKey, ContextDD, EvalError, FfeFlagEvaluationEvent, FlagEvalEventContext, FlagKey, + VariantKey, + }; use httpmock::MockServer; use libdd_capabilities::{HttpError, MaybeSend}; use libdd_capabilities_impl::NativeCapabilities; + use std::collections::BTreeMap; use std::future; fn endpoint_for(server: &MockServer) -> Endpoint { @@ -156,7 +267,19 @@ mod tests { variant: None, allocation: None, targeting_key: None, - context: None, + // `evaluation` is carried as a JSON-object STRING on the wire (bincode + // can't carry serde_json::Value); the flusher re-expands it to an object. + context: Some(FlagEvalEventContext { + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("country".to_owned(), serde_json::json!("US")); + m + }) + .unwrap(), + ), + dd: None, + }), error: None, runtime_default_used: false, } @@ -169,6 +292,211 @@ mod tests { } } + // A full-tier event with every optional field populated. 
+ fn full_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "my-flag".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 42, + variant: Some(VariantKey { + key: "on".to_owned(), + }), + allocation: Some(AllocationKey { + key: "alloc-a".to_owned(), + }), + targeting_key: Some("user-123".to_owned()), + context: Some(FlagEvalEventContext { + evaluation: Some( + serde_json::to_string(&{ + let mut m = BTreeMap::new(); + m.insert("plan".to_owned(), serde_json::json!("premium")); + m + }) + .unwrap(), + ), + dd: Some(ContextDD { + service: "frontend".to_owned(), + }), + }), + error: Some(EvalError { + message: "boom".to_owned(), + }), + runtime_default_used: true, + } + } + + // A degraded-tier event: all optional fields are None/false. On the bincode + // wire these serialize as null/false placeholders; build_payload must strip + // them so the flageval-worker schema sees no null placeholders. + fn degraded_event() -> FfeFlagEvaluationEvent { + FfeFlagEvaluationEvent { + timestamp: 1_700_000_000_000, + flag: FlagKey { + key: "flag-b".to_owned(), + }, + first_evaluation: 1_699_999_990_000, + last_evaluation: 1_700_000_000_000, + evaluation_count: 7, + variant: None, + allocation: None, + targeting_key: None, + context: None, + error: None, + runtime_default_used: false, + } + } + + // Test: a degraded-tier event (all optional fields None/false) serializes to + // a POST object with NO null/empty placeholder keys. Required numeric fields + // (timestamps, counts — including any zeros) are preserved. 
+ #[test] + fn build_payload_strips_degraded_tier_placeholders() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![degraded_event()], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + let ev = &v["flagEvaluations"][0]; + + // Required fields present. + assert_eq!(ev["flag"]["key"], "flag-b"); + assert_eq!(ev["evaluation_count"], 7); + assert!(ev["first_evaluation"].is_number()); + assert!(ev["last_evaluation"].is_number()); + assert!(ev["timestamp"].is_number()); + + // No null/empty placeholder keys. + assert!(ev.get("variant").is_none(), "variant must be stripped"); + assert!( + ev.get("allocation").is_none(), + "allocation must be stripped" + ); + assert!( + ev.get("targeting_key").is_none(), + "targeting_key must be stripped" + ); + assert!(ev.get("context").is_none(), "context must be stripped"); + assert!(ev.get("error").is_none(), "error must be stripped"); + assert!( + ev.get("runtime_default_used").is_none(), + "runtime_default_used=false must be stripped" + ); + } + + // Test: a full-tier event keeps all populated optional fields, with + // context.evaluation expanded to an OBJECT and context.dd preserved. 
+ #[test] + fn build_payload_keeps_full_tier_fields() { + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![full_event()], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + let ev = &v["flagEvaluations"][0]; + + assert_eq!(ev["variant"]["key"], "on", "variant must be kept"); + assert_eq!( + ev["allocation"]["key"], "alloc-a", + "allocation must be kept" + ); + assert_eq!( + ev["targeting_key"], "user-123", + "targeting_key must be kept" + ); + assert_eq!(ev["error"]["message"], "boom", "error must be kept"); + assert_eq!( + ev["runtime_default_used"], true, + "runtime_default_used=true must be kept" + ); + + // context.evaluation is expanded to an OBJECT (not a string), and dd is kept. + let ctx = &ev["context"]; + assert!( + ctx["evaluation"].is_object(), + "context.evaluation must be an object: {}", + ctx["evaluation"] + ); + assert_eq!(ctx["evaluation"]["plan"], "premium"); + assert_eq!( + ctx["dd"]["service"], "frontend", + "context.dd.service must be kept" + ); + } + + // Test: a context whose only dd field (service) is empty collapses entirely — + // empty service is stripped, the now-empty dd object is stripped, and if + // evaluation is also absent the whole context object is removed. 
+ #[test] + fn build_payload_collapses_empty_nested_context() { + let mut ev = degraded_event(); + ev.context = Some(FlagEvalEventContext { + evaluation: None, + dd: Some(ContextDD { + service: String::new(), + }), + }); + let batch = FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: vec![ev], + }; + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + + assert!( + v["flagEvaluations"][0].get("context").is_none(), + "a context that becomes empty after cleaning must be removed entirely" + ); + } + + // Test: build_payload re-expands the wire-side JSON-object STRING in + // `context.evaluation` into a JSON OBJECT in the POST body (EVP schema shape). + #[test] + fn build_payload_expands_evaluation_string_into_object() { + let payload = build_payload(&batch()).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + + let evaluation = &v["flagEvaluations"][0]["context"]["evaluation"]; + assert!( + evaluation.is_object(), + "context.evaluation must be a JSON object in the POST body, not a string: {evaluation}" + ); + assert_eq!( + evaluation["country"], "US", + "the expanded object must preserve the original key/value" + ); + assert!( + !evaluation.is_string(), + "context.evaluation must not remain a quoted string" + ); + } + + // Test: an unparseable `evaluation` string is dropped gracefully (no panic, + // no malformed body) rather than left as a raw string in the POST body. 
+ #[test] + fn build_payload_drops_unparseable_evaluation_gracefully() { + let mut batch = batch(); + batch.flag_evaluations[0].context = Some(FlagEvalEventContext { + evaluation: Some("this is not json".to_owned()), + dd: None, + }); + + let payload = build_payload(&batch).expect("build_payload must not fail on bad input"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + + assert!( + v["flagEvaluations"][0]["context"] + .get("evaluation") + .is_none(), + "unparseable evaluation must be dropped from the body" + ); + } + #[tokio::test] #[cfg_attr(miri, ignore)] async fn posts_to_evp_proxy() { diff --git a/datadog-sidecar/src/service/sender.rs b/datadog-sidecar/src/service/sender.rs index e88720e23f..dd0234315c 100644 --- a/datadog-sidecar/src/service/sender.rs +++ b/datadog-sidecar/src/service/sender.rs @@ -353,6 +353,34 @@ impl SidecarSender { .try_send_enqueue_actions(instance_id, queue_id, actions); } + /// Reliably enqueue actions: NO load-shedding, NO silent drop. + /// + /// Builds the SAME `EnqueueActions` request as [`enqueue_actions`] / + /// `try_send_enqueue_actions`, but sends it via the checked, blocking channel + /// path ([`SidecarInterfaceChannel::send_request_blocking`]) so the `io::Result` + /// propagates to the caller instead of being dropped. The outbox is drained + /// (blocking) first so priority state-change messages are not reordered behind + /// this send. + /// + /// Used for one-shot, non-replayed payloads (e.g. FFE flagevaluation batches) + /// that must not be lost under transient backpressure or a broken pipe. A + /// `BrokenPipe`/`ConnectionReset`/`NotConnected` error returned here lets + /// `SidecarTransport::with_retry` reconnect and retry once. 
+ pub fn enqueue_actions_reliable( + &mut self, + instance_id: InstanceId, + queue_id: QueueId, + actions: Vec, + ) -> io::Result<()> { + self.drain_outbox_blocking(); + let req = SidecarInterfaceRequest::EnqueueActions { + instance_id, + queue_id, + actions, + }; + self.channel.send_request_blocking(&req) + } + pub fn send_trace_v04_shm( &mut self, instance_id: InstanceId, diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 47af7dbbc6..04e12b35d4 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -457,9 +457,7 @@ impl SidecarInterface for ConnectionSidecarHandler { ); } } else { - debug!( - "ffe_flagevaluation_flusher: no session endpoint, dropping batch" - ); + debug!("ffe_flagevaluation_flusher: no session endpoint, dropping batch"); } false } diff --git a/datadog-sidecar/src/service/telemetry.rs b/datadog-sidecar/src/service/telemetry.rs index 4213a59399..845f478b31 100644 --- a/datadog-sidecar/src/service/telemetry.rs +++ b/datadog-sidecar/src/service/telemetry.rs @@ -453,10 +453,10 @@ impl TelemetryCachedClient { warn!("Attempted to send telemetry point for unregistered metric: {metric_name}"); } } - SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately - SidecarAction::FfeExposureBatch(_) => {} // handled in sidecar_server - SidecarAction::FfeFlagEvaluationBatch(_) => {} // handled in sidecar_server - SidecarAction::FfeEvaluationMetrics { .. } => {} // handled in sidecar_server + SidecarAction::PhpComposerTelemetryFile(_) => {} // handled separately + SidecarAction::FfeExposureBatch(_) => {} // handled in sidecar_server + SidecarAction::FfeFlagEvaluationBatch(_) => {} // handled in sidecar_server + SidecarAction::FfeEvaluationMetrics { .. 
} => {} // handled in sidecar_server } } actions From d02b30a9280c4a7c8ed1a3979e75bc1638745f5c Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Mon, 15 Jun 2026 10:45:34 -0400 Subject: [PATCH 4/8] style(ffe): rustfmt + fix clippy expect-on-Option in enqueue_actions_reliable --- datadog-sidecar/src/service/blocking.rs | 11 ++++++++--- .../src/service/ffe_flagevaluation_flusher.rs | 19 +++++++++---------- 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index e69b992316..a988cc451b 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -285,9 +285,14 @@ pub fn enqueue_actions_reliable( ) -> io::Result<()> { let mut pending = Some(actions); transport.with_retry_mut(|sender| { - let actions = pending - .take() - .expect("enqueue_actions_reliable retry invoked without pending actions"); + // `actions` is not `Clone`, so it is consumed on the first attempt. If a + // reconnect-retry fires after that, there is nothing left to replay — bubble an + // error rather than panic (the eval path is best-effort and logs the failure). + let Some(actions) = pending.take() else { + return Err(io::Error::other( + "enqueue_actions_reliable: actions already consumed; cannot replay after reconnect", + )); + }; sender.enqueue_actions_reliable(instance_id.clone(), *queue_id, actions) }) } diff --git a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs index c3cdc95df1..fd6e77999e 100644 --- a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs +++ b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs @@ -75,14 +75,13 @@ pub(crate) async fn send_batch( /// bincode wire. /// /// Two transforms happen, in order, on each `flagEvaluations` element: -/// 1. 
`context.evaluation` is carried as a JSON-object **string** (bincode -/// cannot encode `serde_json::Value`); it is re-expanded back into a JSON -/// **object** in place. An unparseable string drops the field gracefully -/// (never panics), matching the best-effort telemetry contract. -/// 2. The whole value is recursively cleaned (`strip_placeholders`) so the -/// POST contains no `null`, `false`, empty-string, empty-object, or -/// empty-array placeholder entries. Numeric zeros (timestamps/counts) are -/// preserved — they are real data. +/// 1. `context.evaluation` is carried as a JSON-object **string** (bincode cannot encode +/// `serde_json::Value`); it is re-expanded back into a JSON **object** in place. An +/// unparseable string drops the field gracefully (never panics), matching the best-effort +/// telemetry contract. +/// 2. The whole value is recursively cleaned (`strip_placeholders`) so the POST contains no +/// `null`, `false`, empty-string, empty-object, or empty-array placeholder entries. Numeric +/// zeros (timestamps/counts) are preserved — they are real data. 
fn build_payload(batch: &FfeFlagEvaluationBatch) -> Result { let mut value = serde_json::to_value(batch)?; @@ -128,8 +127,8 @@ fn build_payload(batch: &FfeFlagEvaluationBatch) -> Result Date: Tue, 16 Jun 2026 03:37:56 -0400 Subject: [PATCH 5/8] fix(ffe): align flagevaluation sidecar delivery with worker schema --- datadog-ffe/src/telemetry/flagevaluation.rs | 48 ++++-- datadog-sidecar-ffi/src/lib.rs | 147 +++++++++++++++++- datadog-sidecar/src/service/blocking.rs | 81 ++-------- .../src/service/ffe_flagevaluation_flusher.rs | 109 +++++++++---- datadog-sidecar/src/service/mod.rs | 19 ++- datadog-sidecar/src/service/sender.rs | 28 ---- datadog-sidecar/src/service/sidecar_server.rs | 4 +- 7 files changed, 283 insertions(+), 153 deletions(-) diff --git a/datadog-ffe/src/telemetry/flagevaluation.rs b/datadog-ffe/src/telemetry/flagevaluation.rs index 553d53cba1..145736d164 100644 --- a/datadog-ffe/src/telemetry/flagevaluation.rs +++ b/datadog-ffe/src/telemetry/flagevaluation.rs @@ -94,6 +94,9 @@ pub struct FfeFlagEvaluationEvent { /// Allocation key from the UFC rule that produced this evaluation. #[serde(default)] pub allocation: Option, + /// Targeting rule key from UFC metadata. Omit when no real rule metadata exists. + #[serde(default)] + pub targeting_rule: Option, /// Targeting key identifying the evaluation subject. #[serde(default)] pub targeting_key: Option, @@ -133,6 +136,12 @@ pub struct AllocationKey { pub key: String, } +/// Holds the targeting rule key for the `targeting_rule` field. +#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] +pub struct TargetingRuleKey { + pub key: String, +} + /// Holds the error message for the `error` field. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct EvalError { @@ -181,11 +190,12 @@ pub struct ContextDD { /// Prune evaluation context attributes to satisfy the frozen contract bounds: /// - At most `MAX_CONTEXT_FIELDS` (256) entries are kept. 
-/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** (not truncated) to -/// avoid partial-data misattribution. -/// - Non-string values (bool, number, null) are kept regardless of their display length. -/// - Keys are iterated in sorted order for deterministic canonical-key stability; the returned -/// `BTreeMap` preserves that order. +/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** +/// (not truncated) to avoid partial-data misattribution. +/// - Non-string values (bool, number, null) are kept regardless of +/// their display length. +/// - Keys are iterated in sorted order for deterministic canonical-key +/// stability; the returned `BTreeMap` preserves that order. /// /// This satisfies reviewer concern #1 (`review:4477935835`). pub fn prune_context( @@ -237,6 +247,7 @@ mod tests { key: "alloc-a".to_owned(), }), targeting_key: Some("user-123".to_owned()), + targeting_rule: None, context: Some(FlagEvalEventContext { evaluation: Some( serde_json::to_string(&{ @@ -291,6 +302,7 @@ mod tests { evaluation_count: 7, variant: None, allocation: None, + targeting_rule: None, targeting_key: None, context: None, error: None, @@ -300,15 +312,15 @@ mod tests { // ── Test: degraded-tier event serializes optional fields as null ────────── // - // The type no longer uses `skip_serializing_if` (bincode-wire safety), so on + // The type does not use `skip_serializing_if` (bincode-wire safety), so on // the wire `None`/`false` optional fields ARE present (as null/false). The - // null-placeholder stripping that the flageval-worker schema requires now - // happens in the sidecar flusher's `build_payload`, not at the type level — - // see `ffe_flagevaluation_flusher` tests. + // null-placeholder stripping that the flageval-worker schema requires + // happens in the sidecar flusher's `build_payload`, not at the type level. 
#[test] fn degraded_tier_event_serializes_optional_fields_as_null() { - let json = serde_json::to_string(&degraded_event()).unwrap(); + let degraded = degraded_event(); + let json = serde_json::to_string(&degraded).unwrap(); let v: Value = serde_json::from_str(&json).unwrap(); // Required fields present. @@ -318,12 +330,16 @@ mod tests { assert_eq!(v["evaluation_count"], 7); // Optional fields are present as null/false placeholders on the wire - // (stripped later by the flusher, NOT at the type level). + // (stripped later by the flusher, not at the type level). assert!(v["variant"].is_null(), "variant should serialize as null"); assert!( v["allocation"].is_null(), "allocation should serialize as null" ); + assert!( + v["targeting_rule"].is_null(), + "targeting_rule should serialize as null" + ); assert!( v["targeting_key"].is_null(), "targeting_key should serialize as null" @@ -338,12 +354,12 @@ mod tests { // ── Test: bincode round-trip with mixed Some/None fields ────────────────── // - // This is the mechanical guard for the worker→sidecar IPC bug: bincode is a + // Mechanical guard for the worker→sidecar IPC bug: bincode is a // non-self-describing codec, so any `skip_serializing_if` on these types - // would omit a field on serialize while the derived Deserialize still - // expects it in order → field misalignment → the sidecar drops the batch. - // A batch mixing a full-tier event (Some fields) and a degraded-tier event - // (None fields) must survive serialize→deserialize byte-for-byte. + // would omit a field on serialize while derived Deserialize still expects it + // in order → field misalignment → the sidecar drops the batch. A batch + // mixing a full-tier event (Some fields) and degraded-tier event (None + // fields) must survive serialize→deserialize byte-for-byte.
#[test] fn batch_round_trips_via_bincode_with_mixed_optional_fields() { diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 2855bf3d58..ef5852f2db 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -25,10 +25,14 @@ use datadog_sidecar::service::agent_info::AgentInfoReader; use datadog_sidecar::service::telemetry::InternalTelemetryAction; use datadog_sidecar::service::{ blocking::{self, SidecarTransport}, - DynamicInstrumentationConfigState, FfeEvaluationMetric as SidecarFfeEvaluationMetric, - FfeExposure as SidecarFfeExposure, FfeExposureBatch as SidecarFfeExposureBatch, - FfeTelemetryContext as SidecarFfeTelemetryContext, InstanceId, QueueId, RuntimeMetadata, - SerializedTracerHeaderTags, SessionConfig, SidecarAction, SidecarFlushOptions, + AllocationKey, ContextDD, DynamicInstrumentationConfigState, EvalError, + FfeEvaluationMetric as SidecarFfeEvaluationMetric, FfeExposure as SidecarFfeExposure, + FfeExposureBatch as SidecarFfeExposureBatch, + FfeFlagEvaluationBatch as SidecarFfeFlagEvaluationBatch, + FfeFlagEvaluationEvent as SidecarFfeFlagEvaluationEvent, + FfeTelemetryContext as SidecarFfeTelemetryContext, FlagEvalEventContext, FlagKey, InstanceId, + QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction, + SidecarFlushOptions, TargetingRuleKey, VariantKey, }; use datadog_sidecar::service::{get_telemetry_action_sender, InternalTelemetryActions}; use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader}; @@ -1174,6 +1178,23 @@ pub struct FfeEvaluationMetric<'a> { pub allocation_key: CharSlice<'a>, } +#[repr(C)] +pub struct FfeFlagEvaluation<'a> { + pub timestamp_ms: i64, + pub flag_key: CharSlice<'a>, + pub first_evaluation_ms: i64, + pub last_evaluation_ms: i64, + pub evaluation_count: u64, + pub variant: CharSlice<'a>, + pub allocation_key: CharSlice<'a>, + pub targeting_rule_key: CharSlice<'a>, + pub targeting_key: CharSlice<'a>, + /// 
UTF-8 JSON object. Empty, invalid, or non-object JSON is omitted. + pub evaluation_context_json: CharSlice<'a>, + pub error_message: CharSlice<'a>, + pub runtime_default_used: bool, +} + /// Send structured FFE exposure events to the sidecar. The sidecar owns /// deduplication, JSON serialization, and Agent EVP delivery. This function is /// caller-driven; shared libdatadog evaluator calls do not log unless an SDK @@ -1245,6 +1266,78 @@ fn ddog_sidecar_send_ffe_exposure_batch_impl( MaybeError::None } +/// Send structured FFE flag evaluation events to the sidecar. The sidecar owns +/// JSON serialization and Agent EVP delivery. This function is caller-driven; +/// callers must aggregate and bound event cardinality before passing a batch. +/// +/// # Safety +/// `context` and every element in `flag_evaluations` must contain valid UTF-8 +/// `CharSlice` values. Empty `flag_evaluations` is a no-op. +#[no_mangle] +#[allow(clippy::missing_safety_doc)] +pub unsafe extern "C" fn ddog_sidecar_send_ffe_flag_evaluation_batch( + transport: &mut Box<SidecarTransport>, + instance_id: &InstanceId, + queue_id: &QueueId, + context: &FfeTelemetryContext<'_>, + flag_evaluations: Slice<FfeFlagEvaluation<'_>>, +) -> MaybeError { + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + ddog_sidecar_send_ffe_flag_evaluation_batch_impl( + transport, + instance_id, + queue_id, + context, + flag_evaluations, + ) + })) + .unwrap_or_else(|panic| { + MaybeError::Some(libdd_common_ffi::utils::handle_panic_error( + panic, + "ddog_sidecar_send_ffe_flag_evaluation_batch", + )) + }) +} + +fn ddog_sidecar_send_ffe_flag_evaluation_batch_impl( + transport: &mut Box<SidecarTransport>, + instance_id: &InstanceId, + queue_id: &QueueId, + context: &FfeTelemetryContext<'_>, + flag_evaluations: Slice<FfeFlagEvaluation<'_>>, +) -> MaybeError { + let flag_evaluations = try_c!(flag_evaluations + .try_as_slice() + .map_err(|e| format!("Invalid flag evaluation slice: {e}"))); + + if flag_evaluations.is_empty() { + return MaybeError::None; + } + + let context =
try_c!(ffe_context_from_ffi(context)); + let flag_evaluations = try_c!(flag_evaluations + .iter() + .map(|event| ffe_flag_evaluation_from_ffi(event, &context.service)) + .collect::, _>>()); + + if flag_evaluations.is_empty() { + return MaybeError::None; + } + + try_c!(blocking::enqueue_actions_reliable( + transport, + instance_id, + queue_id, + vec![SidecarAction::FfeFlagEvaluationBatch( + SidecarFfeFlagEvaluationBatch { + context, + flag_evaluations, + } + )], + )); + MaybeError::None +} + /// Send structured FFE evaluation metric events to the sidecar. The sidecar /// owns aggregation, OTLP/protobuf serialization, and OTLP HTTP delivery. This /// function is caller-driven so SDKs with existing host-language hooks can @@ -1309,6 +1402,37 @@ fn ffe_exposure_from_ffi(exposure: &FfeExposure<'_>) -> Result, + service: &str, +) -> Result { + let evaluation = optional_json_object_string(event.evaluation_context_json)?; + let context = evaluation.map(|evaluation| FlagEvalEventContext { + evaluation: Some(evaluation), + dd: Some(ContextDD { + service: service.to_owned(), + }), + }); + + Ok(SidecarFfeFlagEvaluationEvent { + timestamp: event.timestamp_ms, + flag: FlagKey { + key: char_slice_to_string(event.flag_key)?, + }, + first_evaluation: event.first_evaluation_ms, + last_evaluation: event.last_evaluation_ms, + evaluation_count: event.evaluation_count, + variant: optional_string(event.variant)?.map(|key| VariantKey { key }), + allocation: optional_string(event.allocation_key)?.map(|key| AllocationKey { key }), + targeting_rule: optional_string(event.targeting_rule_key)? 
+ .map(|key| TargetingRuleKey { key }), + targeting_key: optional_string(event.targeting_key)?, + context, + error: optional_string(event.error_message)?.map(|message| EvalError { message }), + runtime_default_used: event.runtime_default_used, + }) +} + fn ffe_metric_from_ffi( metric: &FfeEvaluationMetric<'_>, ) -> Result { @@ -1329,6 +1453,21 @@ fn optional_string(slice: CharSlice) -> Result, String> { } } +fn optional_json_object_string(slice: CharSlice) -> Result, String> { + let Some(raw) = optional_string(slice)? else { + return Ok(None); + }; + let value = match serde_json::from_str::(&raw) { + Ok(value) => value, + Err(_) => return Ok(None), + }; + if value.is_object() { + Ok(Some(value.to_string())) + } else { + Ok(None) + } +} + #[no_mangle] #[allow(clippy::missing_safety_doc)] #[allow(improper_ctypes_definitions)] // DebuggerPayload is just a pointer, we hide its internals diff --git a/datadog-sidecar/src/service/blocking.rs b/datadog-sidecar/src/service/blocking.rs index a988cc451b..3b88c5ea21 100644 --- a/datadog-sidecar/src/service/blocking.rs +++ b/datadog-sidecar/src/service/blocking.rs @@ -6,7 +6,7 @@ use super::{ SessionConfig, SidecarAction, SidecarFlushOptions, }; use crate::service::sender::SidecarSender; -use crate::service::sidecar_interface::SidecarInterfaceChannel; +use crate::service::sidecar_interface::{SidecarInterfaceChannel, SidecarInterfaceRequest}; use datadog_ipc::platform::{FileBackedHandle, ShmHandle}; use datadog_ipc::SeqpacketConn; use datadog_live_debugger::debugger_defs::DebuggerPayload; @@ -156,46 +156,6 @@ impl SidecarTransport { Err(e) } - /// Like [`with_retry`](Self::with_retry) but accepts an `FnMut` closure, so the - /// closure may mutate captured state between the first attempt and the retry. - /// - /// Needed by [`enqueue_actions_reliable`] whose payload (`Vec`) is - /// not `Clone`: the closure `take`s the actions out of an `Option` per attempt - /// rather than cloning them. 
Reconnect/retry-once semantics are identical to - /// `with_retry`. - fn with_retry_mut(&mut self, mut f: F) -> io::Result - where - F: FnMut(&mut SidecarSender) -> io::Result, - { - let e = { - let mut inner = match self.inner.lock() { - Ok(t) => t, - Err(e) => return Err(io::Error::other(e.to_string())), - }; - match f(&mut inner) { - Ok(ret) => return Ok(ret), - Err(e) => e, - } - }; - if e.kind() == io::ErrorKind::BrokenPipe - || e.kind() == io::ErrorKind::ConnectionReset - || e.kind() == io::ErrorKind::NotConnected - { - warn!("with_retry_mut ({}): The sidecar transport is closed. Reconnecting... This generally indicates a problem with the sidecar, most likely a crash. Check the logs / core dump locations and possibly report a bug", e.kind()); - if let Some(ref reconnect) = self.reconnect_fn { - if Self::do_reconnect(&mut self.inner, reconnect, true) { - return f(&mut self.inner.lock_or_panic()); - } - } - } else { - warn!( - "with_retry_mut: non-connection error ({:?}), not reconnecting", - e.kind() - ); - } - Err(e) - } - /// Send garbage data (used in tests to verify error handling). pub fn send_garbage(&mut self) -> io::Result<()> { match self.inner.lock() { @@ -264,37 +224,28 @@ pub fn enqueue_actions( /// Reliably enqueues a list of actions to be performed. /// -/// Unlike [`enqueue_actions`], this uses the **checked, blocking** channel path with -/// NO load-shedding and NO silent drop: the `io::Result` from the send propagates to -/// the caller. On a broken pipe / connection reset / not-connected error the transport -/// reconnects (the same reconnect path that replays metric registrations) and the send -/// is retried exactly once on the fresh connection. +/// Unlike [`enqueue_actions`], this uses the checked, blocking channel path with +/// no load-shedding and no silent drop: the `io::Result` from the send +/// propagates to the caller. 
On a broken pipe / connection reset / +/// not-connected error the transport reconnects and retries the exact same +/// pre-encoded request bytes once on the fresh connection. /// -/// Intended for one-shot, non-replayed payloads (e.g. FFE flagevaluation batches) that -/// must not be silently dropped under transient backpressure or a broken pipe. -/// -/// `actions` is not `Clone` (the `SidecarAction::Telemetry` variant is intentionally -/// not clonable), so it is held in an `Option` that the retry closure `take`s; on the -/// first attempt's connection error the actions are re-stocked from the returned send -/// failure and replayed once after reconnect. +/// Intended for one-shot, non-replayed payloads (for example FFE +/// flagevaluation batches) that must not be silently lost under transient +/// backpressure or a broken pipe. pub fn enqueue_actions_reliable( transport: &mut SidecarTransport, instance_id: &InstanceId, queue_id: &QueueId, actions: Vec, ) -> io::Result<()> { - let mut pending = Some(actions); - transport.with_retry_mut(|sender| { - // `actions` is not `Clone`, so it is consumed on the first attempt. If a - // reconnect-retry fires after that, there is nothing left to replay — bubble an - // error rather than panic (the eval path is best-effort and logs the failure). - let Some(actions) = pending.take() else { - return Err(io::Error::other( - "enqueue_actions_reliable: actions already consumed; cannot replay after reconnect", - )); - }; - sender.enqueue_actions_reliable(instance_id.clone(), *queue_id, actions) - }) + let req = SidecarInterfaceRequest::EnqueueActions { + instance_id: instance_id.clone(), + queue_id: *queue_id, + actions, + }; + let data = datadog_ipc::codec::encode(&req); + transport.with_retry(|sender| sender.drain_and_send_raw_blocking(&data)) } /// Removes the application entry for the given queue ID from the instance. 
diff --git a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs index fd6e77999e..0993e66349 100644 --- a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs +++ b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs @@ -80,7 +80,8 @@ pub(crate) async fn send_batch( /// unparseable string drops the field gracefully (never panics), matching the best-effort /// telemetry contract. /// 2. The whole value is recursively cleaned (`strip_placeholders`) so the POST contains no -/// `null`, `false`, empty-string, empty-object, or empty-array placeholder entries. Numeric +/// optional-field placeholders. `context.evaluation` user values are preserved as-is; boolean +/// `false`, empty strings, empty objects, and empty arrays are valid context values. Numeric /// zeros (timestamps/counts) are preserved — they are real data. fn build_payload(batch: &FfeFlagEvaluationBatch) -> Result { let mut value = serde_json::to_value(batch)?; @@ -122,15 +123,18 @@ fn build_payload(batch: &FfeFlagEvaluationBatch) -> Result { // Clean children first (bottom-up), then drop entries that are now // placeholders, so a container emptied by cleaning is itself removed. - for child in map.values_mut() { - strip_placeholders(child); + for (key, child) in map.iter_mut() { + // `context.evaluation` contains user context values. Boolean + // false, empty strings, empty objects, and empty arrays are + // valid there and must not be stripped as optional-field + // placeholders. 
+ if key != "evaluation" { + strip_placeholders(child); + } } - map.retain(|_, v| !is_placeholder(v)); + map.retain(|key, v| !is_placeholder(key, v)); } serde_json::Value::Array(items) => { for item in items.iter_mut() { strip_placeholders(item); } - items.retain(|v| !is_placeholder(v)); + items.retain(|v| !is_array_placeholder(v)); } _ => {} } @@ -156,11 +166,11 @@ fn strip_placeholders(value: &mut serde_json::Value) { /// Whether a (already-cleaned) JSON value is an empty/null placeholder that /// should be dropped from the POST. Numeric zeros are NOT placeholders. -fn is_placeholder(value: &serde_json::Value) -> bool { +fn is_placeholder(key: &str, value: &serde_json::Value) -> bool { match value { serde_json::Value::Null => true, - serde_json::Value::Bool(b) => !b, - serde_json::Value::String(s) => s.is_empty(), + serde_json::Value::Bool(b) => key == "runtime_default_used" && !b, + serde_json::Value::String(s) => key == "service" && s.is_empty(), serde_json::Value::Object(map) => map.is_empty(), serde_json::Value::Array(items) => items.is_empty(), // Numbers (incl. 0) are real data — never placeholders. 
@@ -168,6 +178,15 @@ fn is_placeholder(value: &serde_json::Value) -> bool { } } +fn is_array_placeholder(value: &serde_json::Value) -> bool { + match value { + serde_json::Value::Null => true, + serde_json::Value::Object(map) => map.is_empty(), + serde_json::Value::Array(items) => items.is_empty(), + _ => false, + } +} + async fn send_payload( client: &C, endpoint: &Endpoint, @@ -231,7 +250,7 @@ mod tests { use crate::service::{FfeFlagEvaluationBatch, FfeTelemetryContext}; use datadog_ffe::telemetry::flagevaluation::{ AllocationKey, ContextDD, EvalError, FfeFlagEvaluationEvent, FlagEvalEventContext, FlagKey, - VariantKey, + TargetingRuleKey, VariantKey, }; use httpmock::MockServer; use libdd_capabilities::{HttpError, MaybeSend}; @@ -265,6 +284,7 @@ mod tests { evaluation_count: 5, variant: None, allocation: None, + targeting_rule: None, targeting_key: None, // `evaluation` is carried as a JSON-object STRING on the wire (bincode // can't carry serde_json::Value); the flusher re-expands it to an object. @@ -291,7 +311,6 @@ mod tests { } } - // A full-tier event with every optional field populated. fn full_event() -> FfeFlagEvaluationEvent { FfeFlagEvaluationEvent { timestamp: 1_700_000_000_000, @@ -307,6 +326,9 @@ mod tests { allocation: Some(AllocationKey { key: "alloc-a".to_owned(), }), + targeting_rule: Some(TargetingRuleKey { + key: "rule-1".to_owned(), + }), targeting_key: Some("user-123".to_owned()), context: Some(FlagEvalEventContext { evaluation: Some( @@ -328,9 +350,6 @@ mod tests { } } - // A degraded-tier event: all optional fields are None/false. On the bincode - // wire these serialize as null/false placeholders; build_payload must strip - // them so the flageval-worker schema sees no null placeholders. 
fn degraded_event() -> FfeFlagEvaluationEvent { FfeFlagEvaluationEvent { timestamp: 1_700_000_000_000, @@ -342,6 +361,7 @@ mod tests { evaluation_count: 7, variant: None, allocation: None, + targeting_rule: None, targeting_key: None, context: None, error: None, @@ -349,9 +369,6 @@ mod tests { } } - // Test: a degraded-tier event (all optional fields None/false) serializes to - // a POST object with NO null/empty placeholder keys. Required numeric fields - // (timestamps, counts — including any zeros) are preserved. #[test] fn build_payload_strips_degraded_tier_placeholders() { let batch = FfeFlagEvaluationBatch { @@ -362,19 +379,21 @@ mod tests { let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); let ev = &v["flagEvaluations"][0]; - // Required fields present. assert_eq!(ev["flag"]["key"], "flag-b"); assert_eq!(ev["evaluation_count"], 7); assert!(ev["first_evaluation"].is_number()); assert!(ev["last_evaluation"].is_number()); assert!(ev["timestamp"].is_number()); - // No null/empty placeholder keys. assert!(ev.get("variant").is_none(), "variant must be stripped"); assert!( ev.get("allocation").is_none(), "allocation must be stripped" ); + assert!( + ev.get("targeting_rule").is_none(), + "targeting_rule must be stripped" + ); assert!( ev.get("targeting_key").is_none(), "targeting_key must be stripped" @@ -387,8 +406,6 @@ mod tests { ); } - // Test: a full-tier event keeps all populated optional fields, with - // context.evaluation expanded to an OBJECT and context.dd preserved. 
#[test] fn build_payload_keeps_full_tier_fields() { let batch = FfeFlagEvaluationBatch { @@ -404,6 +421,10 @@ mod tests { ev["allocation"]["key"], "alloc-a", "allocation must be kept" ); + assert_eq!( + ev["targeting_rule"]["key"], "rule-1", + "targeting_rule must be kept" + ); assert_eq!( ev["targeting_key"], "user-123", "targeting_key must be kept" @@ -413,8 +434,11 @@ mod tests { ev["runtime_default_used"], true, "runtime_default_used=true must be kept" ); + assert!( + ev.get("reason").is_none(), + "EVP payload must not emit top-level OpenFeature reason" + ); - // context.evaluation is expanded to an OBJECT (not a string), and dd is kept. let ctx = &ev["context"]; assert!( ctx["evaluation"].is_object(), @@ -428,9 +452,6 @@ mod tests { ); } - // Test: a context whose only dd field (service) is empty collapses entirely — - // empty service is stripped, the now-empty dd object is stripped, and if - // evaluation is also absent the whole context object is removed. #[test] fn build_payload_collapses_empty_nested_context() { let mut ev = degraded_event(); @@ -453,8 +474,6 @@ mod tests { ); } - // Test: build_payload re-expands the wire-side JSON-object STRING in - // `context.evaluation` into a JSON OBJECT in the POST body (EVP schema shape). #[test] fn build_payload_expands_evaluation_string_into_object() { let payload = build_payload(&batch()).expect("build_payload must succeed"); @@ -475,8 +494,6 @@ mod tests { ); } - // Test: an unparseable `evaluation` string is dropped gracefully (no panic, - // no malformed body) rather than left as a raw string in the POST body. 
#[test] fn build_payload_drops_unparseable_evaluation_gracefully() { let mut batch = batch(); @@ -496,6 +513,32 @@ mod tests { ); } + #[test] + fn build_payload_preserves_false_and_empty_context_values() { + let mut batch = batch(); + batch.flag_evaluations[0].context = Some(FlagEvalEventContext { + evaluation: Some( + serde_json::json!({ + "enabled": false, + "empty": "", + "empty_object": {}, + "empty_array": [] + }) + .to_string(), + ), + dd: None, + }); + + let payload = build_payload(&batch).expect("build_payload must succeed"); + let v: serde_json::Value = serde_json::from_str(&payload).unwrap(); + let evaluation = &v["flagEvaluations"][0]["context"]["evaluation"]; + + assert_eq!(evaluation["enabled"], false); + assert_eq!(evaluation["empty"], ""); + assert!(evaluation["empty_object"].is_object()); + assert!(evaluation["empty_array"].is_array()); + } + #[tokio::test] #[cfg_attr(miri, ignore)] async fn posts_to_evp_proxy() { diff --git a/datadog-sidecar/src/service/mod.rs b/datadog-sidecar/src/service/mod.rs index 687472f2c1..9861f72b80 100644 --- a/datadog-sidecar/src/service/mod.rs +++ b/datadog-sidecar/src/service/mod.rs @@ -5,7 +5,10 @@ use crate::config; pub use datadog_ffe::telemetry::evaluation_metrics::FfeEvaluationMetric; pub use datadog_ffe::telemetry::exposures::{FfeExposure, FfeExposureBatch}; -pub use datadog_ffe::telemetry::flagevaluation::FfeFlagEvaluationBatch; +pub use datadog_ffe::telemetry::flagevaluation::{ + AllocationKey, ContextDD, EvalError, FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, + FlagEvalEventContext, FlagKey, TargetingRuleKey, VariantKey, +}; pub use datadog_ffe::telemetry::FfeTelemetryContext; use libdd_common::tag::Tag; use libdd_common::Endpoint; @@ -94,11 +97,6 @@ pub enum SidecarAction { /// Structured FFE exposures. The sidecar owns JSON serialization, /// cross-request deduplication, and EVP delivery. FfeExposureBatch(FfeExposureBatch), - /// Structured FFE flag evaluation batch for the EVP flagevaluation track. 
- /// The sidecar serializes and POSTs the batch to - /// `/evp_proxy/v2/api/v2/flagevaluations` (fire-and-forget). PHP (EMIT-07) - /// drives the two-tier aggregation upstream and dispatches via this action. - FfeFlagEvaluationBatch(FfeFlagEvaluationBatch), /// Structured FFE evaluation metrics. The sidecar owns OTLP/protobuf /// aggregation, serialization, and delivery. This action must be sent only /// by SDKs that explicitly opted into native FFE metric ownership. @@ -106,4 +104,13 @@ pub enum SidecarAction { context: FfeTelemetryContext, metrics: Vec, }, + /// Structured FFE flag evaluation batch for the EVP flagevaluation track. + /// The sidecar serializes and POSTs the batch to + /// `/evp_proxy/v2/api/v2/flagevaluations` (fire-and-forget). PHP (EMIT-07) + /// drives the two-tier aggregation upstream and dispatches via this action. + /// + /// Keep this appended after pre-existing variants: this enum crosses the + /// bincode sidecar IPC boundary, so inserting a variant before existing + /// variants changes their wire ordinals. + FfeFlagEvaluationBatch(FfeFlagEvaluationBatch), } diff --git a/datadog-sidecar/src/service/sender.rs b/datadog-sidecar/src/service/sender.rs index dd0234315c..e88720e23f 100644 --- a/datadog-sidecar/src/service/sender.rs +++ b/datadog-sidecar/src/service/sender.rs @@ -353,34 +353,6 @@ impl SidecarSender { .try_send_enqueue_actions(instance_id, queue_id, actions); } - /// Reliably enqueue actions: NO load-shedding, NO silent drop. - /// - /// Builds the SAME `EnqueueActions` request as [`enqueue_actions`] / - /// `try_send_enqueue_actions`, but sends it via the checked, blocking channel - /// path ([`SidecarInterfaceChannel::send_request_blocking`]) so the `io::Result` - /// propagates to the caller instead of being dropped. The outbox is drained - /// (blocking) first so priority state-change messages are not reordered behind - /// this send. - /// - /// Used for one-shot, non-replayed payloads (e.g. 
FFE flagevaluation batches) - /// that must not be lost under transient backpressure or a broken pipe. A - /// `BrokenPipe`/`ConnectionReset`/`NotConnected` error returned here lets - /// `SidecarTransport::with_retry` reconnect and retry once. - pub fn enqueue_actions_reliable( - &mut self, - instance_id: InstanceId, - queue_id: QueueId, - actions: Vec, - ) -> io::Result<()> { - self.drain_outbox_blocking(); - let req = SidecarInterfaceRequest::EnqueueActions { - instance_id, - queue_id, - actions, - }; - self.channel.send_request_blocking(&req) - } - pub fn send_trace_v04_shm( &mut self, instance_id: InstanceId, diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 04e12b35d4..47af7dbbc6 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -457,7 +457,9 @@ impl SidecarInterface for ConnectionSidecarHandler { ); } } else { - debug!("ffe_flagevaluation_flusher: no session endpoint, dropping batch"); + debug!( + "ffe_flagevaluation_flusher: no session endpoint, dropping batch" + ); } false } From c73a3812af2a1485c521a0315ebfe33f806e4b8a Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Tue, 16 Jun 2026 04:07:06 -0400 Subject: [PATCH 6/8] chore(ffe): apply flagevaluation rustfmt --- datadog-ffe/src/telemetry/flagevaluation.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/datadog-ffe/src/telemetry/flagevaluation.rs b/datadog-ffe/src/telemetry/flagevaluation.rs index 145736d164..0cc6cdd9f0 100644 --- a/datadog-ffe/src/telemetry/flagevaluation.rs +++ b/datadog-ffe/src/telemetry/flagevaluation.rs @@ -190,12 +190,11 @@ pub struct ContextDD { /// Prune evaluation context attributes to satisfy the frozen contract bounds: /// - At most `MAX_CONTEXT_FIELDS` (256) entries are kept. -/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** -/// (not truncated) to avoid partial-data misattribution. 
-/// - Non-string values (bool, number, null) are kept regardless of -/// their display length. -/// - Keys are iterated in sorted order for deterministic canonical-key -/// stability; the returned `BTreeMap` preserves that order. +/// - String values longer than `MAX_FIELD_LENGTH` (256 chars) are **skipped** (not truncated) to +/// avoid partial-data misattribution. +/// - Non-string values (bool, number, null) are kept regardless of their display length. +/// - Keys are iterated in sorted order for deterministic canonical-key stability; the returned +/// `BTreeMap` preserves that order. /// /// This satisfies reviewer concern #1 (`review:4477935835`). pub fn prune_context( From b576ea799acf5b006c5c564e0d2eb3b0c2a2e543 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Tue, 16 Jun 2026 20:23:42 -0400 Subject: [PATCH 7/8] coalesce flagevaluation batches in sidecar --- .../src/service/ffe_flagevaluation_flusher.rs | 244 +++++++++++++++++- datadog-sidecar/src/service/sidecar_server.rs | 17 +- 2 files changed, 255 insertions(+), 6 deletions(-) diff --git a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs index 0993e66349..9abcea2f0f 100644 --- a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs +++ b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs @@ -9,11 +9,16 @@ //! responses are logged at `warn`, network errors at `debug`, and dropped //! (matches dd-trace-go behaviour). No agent capability gate. 
-use crate::service::FfeFlagEvaluationBatch; +use crate::service::{FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, FfeTelemetryContext}; +use datadog_ffe::telemetry::flagevaluation::{DEGRADED_CAP, GLOBAL_CAP}; use http::uri::PathAndQuery; use http::Method; use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; +use libdd_capabilities_impl::NativeCapabilities; use libdd_common::Endpoint; +use libdd_common::MutexExt; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use std::time::Duration; use tracing::{debug, warn}; @@ -25,6 +30,212 @@ pub(crate) const EVP_SUBDOMAIN_HEADER: &str = "X-Datadog-EVP-Subdomain"; pub(crate) const EVP_SUBDOMAIN_VALUE: &str = "event-platform-intake"; const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); +const COALESCE_DELAY: Duration = Duration::from_millis(250); +const MAX_PENDING_BUCKETS: usize = GLOBAL_CAP + DEGRADED_CAP; + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct DestinationKey { + url: String, + timeout_ms: u64, + test_token: Option, + use_system_resolver: bool, + context: FfeTelemetryContext, +} + +impl DestinationKey { + fn new(endpoint: &Endpoint, context: &FfeTelemetryContext) -> Self { + Self { + url: endpoint.url.to_string(), + timeout_ms: endpoint.timeout_ms, + test_token: endpoint.test_token.as_ref().map(|s| s.to_string()), + use_system_resolver: endpoint.use_system_resolver, + context: context.clone(), + } + } +} + +#[derive(Clone, Debug, Eq, Hash, PartialEq)] +struct EventKey { + flag_key: String, + variant_key: Option, + allocation_key: Option, + targeting_rule_key: Option, + targeting_key: Option, + context_evaluation: Option, + context_dd_service: Option, + error_message: Option, + runtime_default_used: bool, +} + +impl EventKey { + fn new(event: &FfeFlagEvaluationEvent) -> Self { + Self { + flag_key: event.flag.key.clone(), + variant_key: event.variant.as_ref().map(|v| v.key.clone()), + allocation_key: event.allocation.as_ref().map(|a| a.key.clone()), 
+ targeting_rule_key: event.targeting_rule.as_ref().map(|r| r.key.clone()), + targeting_key: event.targeting_key.clone(), + context_evaluation: event + .context + .as_ref() + .and_then(|context| context.evaluation.clone()), + context_dd_service: event + .context + .as_ref() + .and_then(|context| context.dd.as_ref().map(|dd| dd.service.clone())), + error_message: event.error.as_ref().map(|e| e.message.clone()), + runtime_default_used: event.runtime_default_used, + } + } +} + +struct PendingDestination { + endpoint: Endpoint, + context: FfeTelemetryContext, + events: HashMap<EventKey, FfeFlagEvaluationEvent>, +} + +#[derive(Default)] +struct CoalescerState { + destinations: HashMap<DestinationKey, PendingDestination>, + flush_running: bool, + pending_bucket_count: usize, + dropped_overflow: u64, +} + +#[derive(Clone, Default)] +pub(crate) struct FlagEvaluationCoalescer { + state: Arc<Mutex<CoalescerState>>, +} + +impl FlagEvaluationCoalescer { + pub(crate) fn enqueue( + &self, + client: NativeCapabilities, + endpoint: Endpoint, + batch: FfeFlagEvaluationBatch, + ) { + if batch.flag_evaluations.is_empty() { + return; + } + + let mut state = self.state.lock_or_panic(); + let destination_key = DestinationKey::new(&endpoint, &batch.context); + state + .destinations + .entry(destination_key.clone()) + .or_insert_with(|| PendingDestination { + endpoint, + context: batch.context, + events: HashMap::new(), + }); + + for event in batch.flag_evaluations { + let key = EventKey::new(&event); + let merged = { + let pending = state + .destinations + .get_mut(&destination_key) + .expect("destination was inserted before event merge"); + if let Some(existing) = pending.events.get_mut(&key) { + merge_event(existing, &event); + true + } else { + false + } + }; + if merged { + continue; + } + + if state.pending_bucket_count >= MAX_PENDING_BUCKETS { + state.dropped_overflow = state.dropped_overflow.saturating_add(1); + continue; + } + + state + .destinations + .get_mut(&destination_key) + .expect("destination was inserted before event insert") + .events + .insert(key, event); +
state.pending_bucket_count += 1; + } + + if !state.flush_running { + state.flush_running = true; + let coalescer = self.clone(); + tokio::spawn(async move { + coalescer.flush_loop(client).await; + }); + } + } + + pub(crate) async fn flush_now(&self, client: NativeCapabilities) { + let batches = self.take_batches(); + futures::future::join_all(batches.into_iter().map(|(endpoint, batch)| { + let client = client.clone(); + async move { send_batch(&client, &endpoint, batch).await } + })) + .await; + } + + async fn flush_loop(self, client: NativeCapabilities) { + loop { + tokio::time::sleep(COALESCE_DELAY).await; + let batches = self.take_batches(); + futures::future::join_all(batches.into_iter().map(|(endpoint, batch)| { + let client = client.clone(); + async move { send_batch(&client, &endpoint, batch).await } + })) + .await; + + let mut state = self.state.lock_or_panic(); + if state.destinations.is_empty() { + state.flush_running = false; + break; + } + } + } + + fn take_batches(&self) -> Vec<(Endpoint, FfeFlagEvaluationBatch)> { + let mut state = self.state.lock_or_panic(); + if state.dropped_overflow > 0 { + warn!( + "ffe_flagevaluation_flusher: dropped {} pending bucket(s) after sidecar coalescer cap", + state.dropped_overflow + ); + state.dropped_overflow = 0; + } + + let destinations = std::mem::take(&mut state.destinations); + state.pending_bucket_count = 0; + destinations + .into_values() + .filter_map(|pending| { + if pending.events.is_empty() { + return None; + } + Some(( + pending.endpoint, + FfeFlagEvaluationBatch { + context: pending.context, + flag_evaluations: pending.events.into_values().collect(), + }, + )) + }) + .collect() + } +} + +fn merge_event(existing: &mut FfeFlagEvaluationEvent, incoming: &FfeFlagEvaluationEvent) { + existing.timestamp = existing.timestamp.max(incoming.timestamp); + existing.first_evaluation = existing.first_evaluation.min(incoming.first_evaluation); + existing.last_evaluation = 
existing.last_evaluation.max(incoming.last_evaluation); + existing.evaluation_count = existing + .evaluation_count + .saturating_add(incoming.evaluation_count); +} /// Build the FFE flagevaluation endpoint from a session's agent base endpoint. /// Overrides only the path (`/evp_proxy/v2/api/v2/flagevaluations`), preserving @@ -563,6 +774,37 @@ mod tests { assert_eq!(mock.calls_async().await, 1); } + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn coalesces_identical_batches_before_posting() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATIONS_PATH) + .body_includes("\"evaluation_count\":10"); + then.status(202); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + let coalescer = FlagEvaluationCoalescer::default(); + + coalescer.enqueue(client.clone(), ep.clone(), batch()); + coalescer.enqueue(client.clone(), ep, batch()); + + for _ in 0..100 { + if mock.calls_async().await == 1 { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + + mock.assert_calls_async(1).await; + } + #[tokio::test] #[cfg_attr(miri, ignore)] async fn non_2xx_does_not_panic() { diff --git a/datadog-sidecar/src/service/sidecar_server.rs b/datadog-sidecar/src/service/sidecar_server.rs index 185104b507..90fa0e29d4 100644 --- a/datadog-sidecar/src/service/sidecar_server.rs +++ b/datadog-sidecar/src/service/sidecar_server.rs @@ -118,6 +118,8 @@ pub struct SidecarServer { pub(crate) ffe_http_client: NativeCapabilities, /// Sidecar-owned exposure cache, shared across sessions/connections. pub(crate) ffe_exposure_deduplicator: ffe_exposures_flusher::ExposureDeduplicator, + /// Sidecar-owned EVP flagevaluation coalescer, shared across PHP request lifetimes. 
+ pub(crate) ffe_flagevaluation_coalescer: ffe_flagevaluation_flusher::FlagEvaluationCoalescer, } /// Per-connection handler wrapper that tracks sessions/instances for cleanup on disconnect. @@ -451,11 +453,11 @@ impl SidecarInterface for ConnectionSidecarHandler { if let Some(base) = trace_config.endpoint.as_ref() { if let Some(ep) = ffe_flagevaluation_flusher::flagevaluation_endpoint(base) { - let batch = batch.clone(); - let client = ffe_http_client.clone(); - tokio::spawn(async move { - ffe_flagevaluation_flusher::send_batch(&client, &ep, batch).await; - }); + self.server.ffe_flagevaluation_coalescer.enqueue( + ffe_http_client.clone(), + ep, + batch.clone(), + ); } else { debug!( "ffe_flagevaluation_flusher: could not derive endpoint, dropping batch" @@ -1082,6 +1084,11 @@ impl SidecarInterface for ConnectionSidecarHandler { } async fn flush(&self, _peer: PeerCredentials, options: SidecarFlushOptions) { + self.server + .ffe_flagevaluation_coalescer + .flush_now(self.server.ffe_http_client.clone()) + .await; + if options.traces_and_stats { let flusher = self.server.trace_flusher.clone(); if let Err(e) = tokio::spawn(async move { flusher.flush().await }).await { From 53f81e56510f9981dd7db95a9dc7dec592cb9678 Mon Sep 17 00:00:00 2001 From: Leo Romanovsky Date: Wed, 17 Jun 2026 02:54:49 -0400 Subject: [PATCH 8/8] fix(ffe): bound flagevaluation sidecar delivery --- .../src/service/ffe_flagevaluation_flusher.rs | 225 +++++++++++++++--- 1 file changed, 193 insertions(+), 32 deletions(-) diff --git a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs index 9abcea2f0f..c3324dd3a7 100644 --- a/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs +++ b/datadog-sidecar/src/service/ffe_flagevaluation_flusher.rs @@ -10,7 +10,7 @@ //! (matches dd-trace-go behaviour). No agent capability gate. 
use crate::service::{FfeFlagEvaluationBatch, FfeFlagEvaluationEvent, FfeTelemetryContext}; -use datadog_ffe::telemetry::flagevaluation::{DEGRADED_CAP, GLOBAL_CAP}; +use datadog_ffe::telemetry::flagevaluation::{DEGRADED_CAP, GLOBAL_CAP, PER_FLAG_CAP}; use http::uri::PathAndQuery; use http::Method; use libdd_capabilities::{Bytes, HttpClientCapability, SleepCapability}; @@ -32,6 +32,7 @@ pub(crate) const EVP_SUBDOMAIN_VALUE: &str = "event-platform-intake"; const USER_AGENT: &str = concat!("ddtrace-sidecar/", env!("CARGO_PKG_VERSION")); const COALESCE_DELAY: Duration = Duration::from_millis(250); const MAX_PENDING_BUCKETS: usize = GLOBAL_CAP + DEGRADED_CAP; +const MAX_EVENTS_PER_POST: usize = 512; #[derive(Clone, Debug, Eq, Hash, PartialEq)] struct DestinationKey { @@ -87,6 +88,20 @@ impl EventKey { runtime_default_used: event.runtime_default_used, } } + + fn degraded(event: &FfeFlagEvaluationEvent) -> Self { + Self { + flag_key: event.flag.key.clone(), + variant_key: event.variant.as_ref().map(|v| v.key.clone()), + allocation_key: event.allocation.as_ref().map(|a| a.key.clone()), + targeting_rule_key: event.targeting_rule.as_ref().map(|r| r.key.clone()), + targeting_key: None, + context_evaluation: None, + context_dd_service: None, + error_message: event.error.as_ref().map(|e| e.message.clone()), + runtime_default_used: event.runtime_default_used, + } + } } struct PendingDestination { @@ -100,6 +115,9 @@ struct CoalescerState { destinations: HashMap<DestinationKey, PendingDestination>, flush_running: bool, pending_bucket_count: usize, + full_bucket_count: usize, + full_bucket_count_by_flag: HashMap<String, usize>, + degraded_bucket_count: usize, dropped_overflow: u64, } @@ -130,36 +148,42 @@ impl FlagEvaluationCoalescer { events: HashMap::new(), }); - for event in batch.flag_evaluations { + for mut event in batch.flag_evaluations { let key = EventKey::new(&event); - let merged = { - let pending = state - .destinations - .get_mut(&destination_key) - .expect("destination was inserted before event merge"); - if let
Some(existing) = pending.events.get_mut(&key) { - merge_event(existing, &event); - true - } else { - false - } - }; - if merged { + if merge_pending_event(&mut state, &destination_key, &key, &event) { + continue; + } + + let flag_key = event.flag.key.clone(); + let full_bucket_count_for_flag = state + .full_bucket_count_by_flag + .get(&flag_key) + .copied() + .unwrap_or(0); + + if state.full_bucket_count < GLOBAL_CAP && full_bucket_count_for_flag < PER_FLAG_CAP { + insert_pending_event(&mut state, &destination_key, key, event); + state.full_bucket_count += 1; + *state.full_bucket_count_by_flag.entry(flag_key).or_default() += 1; + continue; + } + + event.targeting_key = None; + event.context = None; + let degraded_key = EventKey::degraded(&event); + if merge_pending_event(&mut state, &destination_key, &degraded_key, &event) { continue; } - if state.pending_bucket_count >= MAX_PENDING_BUCKETS { + if state.degraded_bucket_count >= DEGRADED_CAP + || state.pending_bucket_count >= MAX_PENDING_BUCKETS + { state.dropped_overflow = state.dropped_overflow.saturating_add(1); continue; } - state - .destinations - .get_mut(&destination_key) - .expect("destination was inserted before event insert") - .events - .insert(key, event); - state.pending_bucket_count += 1; + insert_pending_event(&mut state, &destination_key, degraded_key, event); + state.degraded_bucket_count += 1; } if !state.flush_running { @@ -210,6 +234,9 @@ impl FlagEvaluationCoalescer { let destinations = std::mem::take(&mut state.destinations); state.pending_bucket_count = 0; + state.full_bucket_count = 0; + state.full_bucket_count_by_flag.clear(); + state.degraded_bucket_count = 0; destinations .into_values() .filter_map(|pending| { @@ -228,6 +255,39 @@ impl FlagEvaluationCoalescer { } } +fn merge_pending_event( + state: &mut CoalescerState, + destination_key: &DestinationKey, + key: &EventKey, + event: &FfeFlagEvaluationEvent, +) -> bool { + let pending = state + .destinations + .get_mut(destination_key) +
.expect("destination was inserted before event merge"); + if let Some(existing) = pending.events.get_mut(key) { + merge_event(existing, event); + true + } else { + false + } +} + +fn insert_pending_event( + state: &mut CoalescerState, + destination_key: &DestinationKey, + key: EventKey, + event: FfeFlagEvaluationEvent, +) { + state + .destinations + .get_mut(destination_key) + .expect("destination was inserted before event insert") + .events + .insert(key, event); + state.pending_bucket_count += 1; +} + fn merge_event(existing: &mut FfeFlagEvaluationEvent, incoming: &FfeFlagEvaluationEvent) { existing.timestamp = existing.timestamp.max(incoming.timestamp); existing.first_evaluation = existing.first_evaluation.min(incoming.first_evaluation); @@ -263,14 +323,31 @@ pub(crate) async fn send_batch( endpoint: &Endpoint, batch: FfeFlagEvaluationBatch, ) { - let payload = match build_payload(&batch) { - Ok(p) => p, - Err(e) => { - debug!("ffe_flagevaluation_flusher: failed to encode batch payload: {e:?}"); - return; - } - }; - send_payload(client, endpoint, payload).await; + for chunk in split_batch_for_post(batch) { + let payload = match build_payload(&chunk) { + Ok(p) => p, + Err(e) => { + debug!("ffe_flagevaluation_flusher: failed to encode batch payload: {e:?}"); + return; + } + }; + send_payload(client, endpoint, payload).await; + } +} + +fn split_batch_for_post(batch: FfeFlagEvaluationBatch) -> Vec<FfeFlagEvaluationBatch> { + let FfeFlagEvaluationBatch { + context, + flag_evaluations, + } = batch; + + flag_evaluations + .chunks(MAX_EVENTS_PER_POST) + .map(|chunk| FfeFlagEvaluationBatch { + context: context.clone(), + flag_evaluations: chunk.to_vec(), + }) + .collect() } /// Build the EVP POST body from a batch.
@@ -461,7 +538,7 @@ mod tests { use crate::service::{FfeFlagEvaluationBatch, FfeTelemetryContext}; use datadog_ffe::telemetry::flagevaluation::{ AllocationKey, ContextDD, EvalError, FfeFlagEvaluationEvent, FlagEvalEventContext, FlagKey, - TargetingRuleKey, VariantKey, + TargetingRuleKey, VariantKey, PER_FLAG_CAP, }; use httpmock::MockServer; use libdd_capabilities::{HttpError, MaybeSend}; @@ -774,6 +851,32 @@ mod tests { assert_eq!(mock.calls_async().await, 1); } + #[tokio::test] + #[cfg_attr(miri, ignore)] + async fn splits_large_batches_before_posting() { + let server = MockServer::start_async().await; + let mock = server + .mock_async(|when, then| { + when.method(httpmock::Method::POST) + .path(EVP_FLAGEVALUATIONS_PATH) + .header(EVP_SUBDOMAIN_HEADER, EVP_SUBDOMAIN_VALUE) + .header("content-type", "application/json"); + then.status(202); + }) + .await; + + let base = endpoint_for(&server); + let ep = flagevaluation_endpoint(&base).unwrap(); + let client = NativeCapabilities::new_client(); + let mut batch = batch(); + let event = batch.flag_evaluations[0].clone(); + batch.flag_evaluations = vec![event; MAX_EVENTS_PER_POST * 2 + 1]; + + send_batch(&client, &ep, batch).await; + + mock.assert_calls_async(3).await; + } + #[tokio::test] #[cfg_attr(miri, ignore)] async fn coalesces_identical_batches_before_posting() { @@ -805,6 +908,64 @@ mod tests { mock.assert_calls_async(1).await; } + #[test] + fn coalescer_degrades_after_per_flag_cap() { + let endpoint = Endpoint { + url: "http://agent:8126".parse().unwrap(), + ..Endpoint::default() + }; + let ep = flagevaluation_endpoint(&endpoint).unwrap(); + let coalescer = FlagEvaluationCoalescer::default(); + coalescer.state.lock().unwrap().flush_running = true; + + let mut events = Vec::with_capacity(PER_FLAG_CAP + 50); + for index in 0..(PER_FLAG_CAP + 50) { + let mut event = full_event(); + event.evaluation_count = 1; + event.targeting_key = Some(format!("user-{index}")); + events.push(event); + } + + coalescer.enqueue( + 
NativeCapabilities::new_client(), + ep, + FfeFlagEvaluationBatch { + context: context(), + flag_evaluations: events, + }, + ); + + let batches = coalescer.take_batches(); + assert_eq!(batches.len(), 1); + let events = &batches[0].1.flag_evaluations; + let full_events = events + .iter() + .filter(|event| event.targeting_key.is_some() || event.context.is_some()) + .count(); + let degraded = events + .iter() + .find(|event| event.targeting_key.is_none() && event.context.is_none()) + .expect("overflow must be folded into a degraded bucket"); + + assert_eq!(full_events, PER_FLAG_CAP); + assert_eq!(degraded.evaluation_count, 50); + assert_eq!( + degraded.variant.as_ref().map(|v| v.key.as_str()), + Some("on") + ); + assert_eq!( + degraded.allocation.as_ref().map(|a| a.key.as_str()), + Some("alloc-a") + ); + assert_eq!( + degraded + .targeting_rule + .as_ref() + .map(|rule| rule.key.as_str()), + Some("rule-1") + ); + } + #[tokio::test] #[cfg_attr(miri, ignore)] async fn non_2xx_does_not_panic() {