diff --git a/Cargo.lock b/Cargo.lock index 8d5c645efc..78aab11c87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3550,6 +3550,7 @@ dependencies = [ "bytes", "bytesize", "chrono", + "criterion", "datadog-protos", "ddsketch", "faster-hex", diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index e863783a38..b1344cf8e0 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -23,7 +23,7 @@ use saluki_components::{ transforms::{ AggregateConfiguration, ApmStatsTransformConfiguration, ChainedConfiguration, DogstatsDMapperConfiguration, DogstatsDPrefixFilterConfiguration, HostEnrichmentConfiguration, HostTagsConfiguration, - TraceObfuscationConfiguration, TraceSamplerConfiguration, + TagFilterlistConfiguration, TraceObfuscationConfiguration, TraceSamplerConfiguration, }, }; use saluki_config::{ConfigurationLoader, GenericConfiguration}; @@ -414,6 +414,8 @@ async fn add_dsd_pipeline_to_blueprint( let dsd_mapper_config = DogstatsDMapperConfiguration::from_configuration(config)?; let dsd_enrich_config = ChainedConfiguration::default().with_transform_builder("dogstatsd_mapper", dsd_mapper_config); + let dsd_tag_filterlist_config = TagFilterlistConfiguration::from_configuration(config) + .error_context("Failed to configure metric tag filterlist transform.")?; let dsd_agg_config = AggregateConfiguration::from_configuration(config).error_context("Failed to configure aggregate transform.")?; let dd_events_config = DatadogEventsConfiguration::from_configuration(config) @@ -428,6 +430,7 @@ async fn add_dsd_pipeline_to_blueprint( .add_source("dsd_in", dsd_config)? .add_transform("dsd_prefix_filter", dsd_prefix_filter_configuration)? .add_transform("dsd_enrich", dsd_enrich_config)? + .add_transform("dsd_tag_filterlist", dsd_tag_filterlist_config)? .add_transform("dsd_agg", dsd_agg_config)? .add_encoder("dd_events_encode", dd_events_config)? .add_encoder("dd_service_checks_encode", dd_service_checks_config)? 
@@ -435,7 +438,8 @@ async fn add_dsd_pipeline_to_blueprint( // Metrics. .connect_component("dsd_prefix_filter", ["dsd_in.metrics"])? .connect_component("dsd_enrich", ["dsd_prefix_filter"])? - .connect_component("dsd_agg", ["dsd_enrich"])? + .connect_component("dsd_tag_filterlist", ["dsd_enrich"])? + .connect_component("dsd_agg", ["dsd_tag_filterlist"])? .connect_component("metrics_enrich", ["dsd_agg"])? .connect_component("dd_service_checks_encode", ["dsd_in.service_checks"])? .connect_component("dd_events_encode", ["dsd_in.events"])? diff --git a/lib/saluki-components/Cargo.toml b/lib/saluki-components/Cargo.toml index 7c0513a31f..ab50bde4f9 100644 --- a/lib/saluki-components/Cargo.toml +++ b/lib/saluki-components/Cargo.toml @@ -77,6 +77,11 @@ tracing = { workspace = true } url = { workspace = true } [dev-dependencies] +criterion = { workspace = true } proptest = { workspace = true } saluki-metrics = { workspace = true, features = ["test"] } test-strategy = { workspace = true } + +[[bench]] +name = "tag_filterlist" +harness = false diff --git a/lib/saluki-components/benches/tag_filterlist.rs b/lib/saluki-components/benches/tag_filterlist.rs new file mode 100644 index 0000000000..d793081f17 --- /dev/null +++ b/lib/saluki-components/benches/tag_filterlist.rs @@ -0,0 +1,412 @@ +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; +use saluki_components::transforms::tag_filterlist::{ + compile_filters, filter_metric_tags, CompiledFilters, FilterAction, MetricTagFilterEntry, +}; +use saluki_context::{ + tags::{Tag, TagSet}, + Context, +}; +use saluki_core::data_model::event::metric::Metric; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn make_tags(n: usize) -> Vec { + (0..n).map(|i| format!("tag{i}:val{i}")).collect() +} + +fn make_tags_static(n: usize) -> Vec<&'static str> { + // Leak 
allocations so we can hand &'static str to Context::from_static_parts. + // Benchmarks run for a bounded time so the leak is acceptable. + (0..n) + .map(|i| Box::leak(format!("filter_tag_filter_tag_filter_tag_{i}:value_{i}").into_boxed_str()) as &'static str) + .collect() +} + +fn make_origin_tags_static(n: usize) -> Vec<&'static str> { + (0..n) + .map(|i| Box::leak(format!("orig_tag{i}:val{i}").into_boxed_str()) as &'static str) + .collect() +} + +fn distribution_metric(name: &'static str, tags: &[&'static str]) -> Metric { + Metric::distribution(Context::from_static_parts(name, tags), 1.0) +} + +fn distribution_metric_with_origin_tags( + name: &'static str, tags: &[&'static str], origin_tags: &[&'static str], +) -> Metric { + let origin_tag_set: TagSet = origin_tags.iter().map(|s| Tag::from(*s)).collect(); + let context = Context::from_static_parts(name, tags).with_origin_tags(origin_tag_set.into_shared()); + Metric::distribution(context, 1.0) +} + +fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric { + Metric::counter(Context::from_static_parts(name, tags), 1.0) +} + +fn exclude_filter(metric_name: &str, tag_keys: &[&str]) -> Vec { + vec![MetricTagFilterEntry { + metric_name: metric_name.to_string(), + action: FilterAction::Exclude, + tags: tag_keys.iter().map(|s| s.to_string()).collect(), + }] +} + +fn include_filter(metric_name: &str, tag_keys: &[&str]) -> Vec { + vec![MetricTagFilterEntry { + metric_name: metric_name.to_string(), + action: FilterAction::Include, + tags: tag_keys.iter().map(|s| s.to_string()).collect(), + }] +} + +/// Build tag key names by index, matching the `tag{i}:val{i}` pattern above. 
+fn tag_keys(indices: &[usize]) -> Vec { + indices.iter().map(|i| format!("tag{i}")).collect() +} + +// --------------------------------------------------------------------------- +// Benchmark: exclude with varying tag-set sizes +// --------------------------------------------------------------------------- + +fn bench_exclude(c: &mut Criterion) { + let cases: &[(&str, usize, &[usize])] = &[ + // at most half of configured keys match the metric's tags (realistic: over-specified configs) + // tag keys ≥ n do not exist on the metric + ("10tags_exclude5", 10, &[0, 2, 10, 12, 14]), // 2/5 match + ( + "10tags_exclude50", + 10, + &[ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, // 10/50 match + 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, + 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, + ], + ), + ("50tags_exclude5", 50, &[0, 10, 50, 60, 70]), // 2/5 match + ( + "50tags_exclude50", + 50, + &[ + 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, // 25/50 match (even 0-48 ∩ metric) + 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, + 68, // these don't exist on metric + 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, + ], + ), + ("100tags_exclude5", 100, &[0, 50, 100, 110, 120]), // 2/5 match + ( + "100tags_exclude50", + 100, + &[ + 0, 4, 8, 12, 16, 20, 24, 28, 32, 36, // 25/50 match (every 4th 0-96) + 40, 44, 48, 52, 56, 60, 64, 68, 72, 76, 80, 84, 88, 92, 96, 100, 104, 108, 112, 116, 120, 124, 128, + 132, 136, // don't exist + 140, 144, 148, 152, 156, 160, 164, 168, 172, 176, 180, 184, 188, 192, 196, + ], + ), + ]; + + let mut group = c.benchmark_group("tag_filterlist/exclude"); + for (label, n_tags, excluded_indices) in cases { + let tags_static = make_tags_static(*n_tags); + let excluded_keys: Vec = tag_keys(excluded_indices); + let excluded_key_refs: Vec<&str> = excluded_keys.iter().map(|s| s.as_str()).collect(); + let filters: CompiledFilters = 
compile_filters(&exclude_filter("bench.dist", &excluded_key_refs)); + + group.throughput(Throughput::Elements(1)); + group.bench_function(BenchmarkId::new("", label), |b| { + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: include with varying tag-set sizes +// --------------------------------------------------------------------------- + +fn bench_include(c: &mut Criterion) { + let cases: &[(&str, usize, &[usize])] = &[ + ("10tags_include2", 10, &[2, 7]), + ("10tags_include5", 10, &[0, 2, 4, 6, 8]), + ("100tags_include2", 100, &[20, 80]), + ("100tags_include5", 100, &[10, 30, 50, 70, 90]), + ]; + + let mut group = c.benchmark_group("tag_filterlist/include"); + for (label, n_tags, included_indices) in cases { + let tags_static = make_tags_static(*n_tags); + let included_keys: Vec = tag_keys(included_indices); + let included_key_refs: Vec<&str> = included_keys.iter().map(|s| s.as_str()).collect(); + let filters: CompiledFilters = compile_filters(&include_filter("bench.dist", &included_key_refs)); + + group.throughput(Throughput::Elements(1)); + group.bench_function(BenchmarkId::new("", label), |b| { + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: fast-path — metric name not in filterlist +// --------------------------------------------------------------------------- + +fn bench_no_match_passthrough(c: &mut Criterion) { + let tags_static = make_tags_static(20); + // Filter is for a different metric name. 
+ let entries = exclude_filter("other.metric", &["tag0", "tag5"]); + let filters = compile_filters(&entries); + + let mut group = c.benchmark_group("tag_filterlist/passthrough"); + group.throughput(Throughput::Elements(1)); + group.bench_function("no_match", |b| { + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); + }); + + // Non-distribution (counter) — type check is done before calling filter_metric_tags; + // benchmark a simulated path that checks is_sketch() and skips. + let counter_tags = make_tags_static(20); + group.bench_function("non_distribution", |b| { + b.iter_batched( + || counter_metric("bench.dist", &counter_tags), + |metric| { + // Simulate the guard in Transform::run: only call filter_metric_tags for sketches. + if metric.values().is_sketch() { + unreachable!("counter is not a sketch"); + } + metric + }, + BatchSize::SmallInput, + ); + }); + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: filterlist table size (hash lookup cost) +// --------------------------------------------------------------------------- + +fn bench_filterlist_size(c: &mut Criterion) { + let tags_static = make_tags_static(10); + let excluded_keys = ["tag0", "tag5"]; + + // Build filterlist with N entries; the matching entry is always last. + let sizes = [1usize, 10, 100]; + + let mut group = c.benchmark_group("tag_filterlist/filterlist_size"); + for &n in &sizes { + group.throughput(Throughput::Elements(1)); + + // Build entries: n-1 non-matching entries + 1 matching entry. 
+ let entries: Vec = (0..n - 1) + .map(|i| MetricTagFilterEntry { + metric_name: format!("other.metric.{i}"), + action: FilterAction::Exclude, + tags: vec!["tag0".to_string()], + }) + .chain(std::iter::once(MetricTagFilterEntry { + metric_name: "bench.dist".to_string(), + action: FilterAction::Exclude, + tags: excluded_keys.iter().map(|s| s.to_string()).collect(), + })) + .collect(); + + let filters = compile_filters(&entries); + + group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: compile_filters itself (cold-path: RC update) +// --------------------------------------------------------------------------- + +fn bench_compile_filters(c: &mut Criterion) { + let mut group = c.benchmark_group("tag_filterlist/compile"); + + let sizes = [1usize, 10, 100]; + for &n in &sizes { + let entries: Vec = (0..n) + .map(|i| MetricTagFilterEntry { + metric_name: format!("metric.{i}"), + action: FilterAction::Exclude, + tags: make_tags(5) + .into_iter() + .map(|s| s.split(':').next().unwrap().to_string()) + .collect(), + }) + .collect(); + + group.throughput(Throughput::Elements(n as u64)); + group.bench_with_input(BenchmarkId::from_parameter(n), &entries, |b, entries| { + b.iter(|| compile_filters(entries)); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: origin_tags exclude with varying origin tag-set sizes +// --------------------------------------------------------------------------- + +fn bench_origin_tags_exclude(c: &mut Criterion) { + let sizes = [10usize, 50, 100]; + let tags_static = make_tags_static(5); + + let mut group = 
c.benchmark_group("tag_filterlist/origin_tags_exclude"); + for &n in &sizes { + let origin_tags_static = make_origin_tags_static(n); + // Exclude the first third of orig_tag keys. + let excluded: Vec = (0..n / 3).map(|i| format!("orig_tag{i}")).collect(); + let excluded_refs: Vec<&str> = excluded.iter().map(|s| s.as_str()).collect(); + let filters = compile_filters(&exclude_filter("bench.dist", &excluded_refs)); + + group.throughput(Throughput::Elements(1)); + group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { + b.iter_batched( + || distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: origin_tags include with varying origin tag-set sizes +// --------------------------------------------------------------------------- + +fn bench_origin_tags_include(c: &mut Criterion) { + let sizes = [10usize, 50, 100]; + let tags_static = make_tags_static(5); + + let mut group = c.benchmark_group("tag_filterlist/origin_tags_include"); + for &n in &sizes { + let origin_tags_static = make_origin_tags_static(n); + // Keep only the first third of orig_tag keys. 
+ let included: Vec = (0..n / 3).map(|i| format!("orig_tag{i}")).collect(); + let included_refs: Vec<&str> = included.iter().map(|s| s.as_str()).collect(); + let filters = compile_filters(&include_filter("bench.dist", &included_refs)); + + group.throughput(Throughput::Elements(1)); + group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { + b.iter_batched( + || distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: origin_tags passthrough (fast path — no origin_tags on metric) +// --------------------------------------------------------------------------- + +fn bench_origin_tags_passthrough(c: &mut Criterion) { + let tags_static = make_tags_static(20); + let filters = compile_filters(&exclude_filter("bench.dist", &["tag0", "tag5"])); + + let mut group = c.benchmark_group("tag_filterlist/origin_tags_passthrough"); + group.throughput(Throughput::Elements(1)); + group.bench_function("no_origin_tags", |b| { + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + // No origin_tags: exercises the fast path in filter_metric_tags. + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); + }); + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: combined instrumented + origin_tags filtering (single alloc path) +// --------------------------------------------------------------------------- + +fn bench_combined(c: &mut Criterion) { + let tags_static = make_tags_static(50); + let origin_tags_static = make_origin_tags_static(20); + + // Exclude a mix of instrumented tag keys and origin tag keys. 
+ let excluded_keys: Vec = (0..10) + .map(|i| format!("tag{i}")) + .chain((0..5).map(|i| format!("orig_tag{i}"))) + .collect(); + let excluded_refs: Vec<&str> = excluded_keys.iter().map(|s| s.as_str()).collect(); + let filters = compile_filters(&exclude_filter("bench.dist", &excluded_refs)); + + let mut group = c.benchmark_group("tag_filterlist/combined"); + group.throughput(Throughput::Elements(1)); + group.bench_function("50tags_20origin", |b| { + b.iter_batched( + || distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); + }); + group.finish(); +} + +criterion_group!( + benches, + bench_exclude, + bench_include, + bench_no_match_passthrough, + bench_filterlist_size, + bench_compile_filters, + bench_origin_tags_exclude, + bench_origin_tags_include, + bench_origin_tags_passthrough, + bench_combined, +); +criterion_main!(benches); diff --git a/lib/saluki-components/src/transforms/mod.rs b/lib/saluki-components/src/transforms/mod.rs index f3b1713236..a74d301584 100644 --- a/lib/saluki-components/src/transforms/mod.rs +++ b/lib/saluki-components/src/transforms/mod.rs @@ -27,5 +27,8 @@ pub use self::trace_sampler::TraceSamplerConfiguration; mod apm_stats; pub use self::apm_stats::ApmStatsTransformConfiguration; +pub mod tag_filterlist; +pub use self::tag_filterlist::TagFilterlistConfiguration; + mod trace_obfuscation; pub use self::trace_obfuscation::TraceObfuscationConfiguration; diff --git a/lib/saluki-components/src/transforms/tag_filterlist/mod.rs b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs new file mode 100644 index 0000000000..4dbadcd22a --- /dev/null +++ b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs @@ -0,0 +1,741 @@ +//! Metric Tag Filterlist synchronous transform. +//! +//! Removes or retains specific tags from distribution metrics based on per-metric configuration. +//! 
Supports both "exclude" (denylist) and "include" (allowlist) modes. +//! +//! Configuration is read from the `metric_tag_filterlist` key and can be updated at runtime via +//! Remote Config. + +use async_trait::async_trait; +use foldhash::fast::RandomState as FoldHashState; +use hashbrown::{HashMap, HashSet}; +use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; +use saluki_config::GenericConfiguration; +use saluki_context::tags::{SharedTagSet, TagSet}; +use saluki_core::{ + components::{ + transforms::{Transform, TransformBuilder, TransformContext}, + ComponentContext, + }, + data_model::event::EventType, + topology::OutputDefinition, +}; +use saluki_error::GenericError; +use serde::Deserialize; +use tokio::select; +use tracing::debug; + +/// Action applied to the configured tag list: keep only listed tags, or remove listed tags. +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum FilterAction { + /// Keep only the tags whose key appears in the configured list. + Include, + /// Remove the tags whose key appears in the configured list. + Exclude, +} + +/// A single metric tag filter entry. +#[derive(Clone, Debug, Deserialize)] +pub struct MetricTagFilterEntry { + /// The exact metric name this entry applies to. + pub metric_name: String, + /// Whether to include or exclude the listed tags. + pub action: FilterAction, + /// Tag key names to include or exclude. + pub tags: Vec, +} + +/// Compiled filter table: metric name → (is_exclude, set of tag key names). +pub type CompiledFilters = HashMap), FoldHashState>; + +/// Compile a slice of filter entries into an O(1)-lookup table. +/// +/// Merge rules: +/// - Same metric name + same action → union of tag key sets. +/// - Same metric name + conflicting actions → `exclude` wins. 
+pub fn compile_filters(entries: &[MetricTagFilterEntry]) -> CompiledFilters { + let mut filters: CompiledFilters = HashMap::with_hasher(FoldHashState::default()); + + for entry in entries { + let is_exclude = entry.action == FilterAction::Exclude; + let mut tag_set = HashSet::with_capacity_and_hasher(entry.tags.len(), FoldHashState::default()); + tag_set.extend(entry.tags.iter().cloned()); + + match filters.entry(entry.metric_name.clone()) { + hashbrown::hash_map::Entry::Vacant(e) => { + e.insert((is_exclude, tag_set)); + } + hashbrown::hash_map::Entry::Occupied(mut e) => { + let (existing_is_exclude, existing_tags) = e.get_mut(); + if *existing_is_exclude == is_exclude { + // Same action: union the tag sets. + existing_tags.extend(tag_set); + } else if is_exclude { + // Conflicting actions: exclude takes precedence. + *existing_is_exclude = true; + *existing_tags = tag_set; + } + // If existing is already exclude and incoming is include: ignore. + } + } + } + + filters +} + +/// Metric Tag Filterlist transform. +/// +/// Removes or retains specific tags from distribution metrics based on per-metric configuration. +/// Configuration is read from `metric_tag_filterlist` and supports runtime updates via Remote Config. +#[derive(Deserialize)] +pub struct TagFilterlistConfiguration { + #[serde(default, rename = "metric_tag_filterlist")] + entries: Vec, + + #[serde(skip)] + configuration: Option, +} + +impl TagFilterlistConfiguration { + /// Creates a new `TagFilterlistConfiguration` from the given configuration. 
+ pub fn from_configuration(config: &GenericConfiguration) -> Result { + let mut typed: Self = config.as_typed()?; + typed.configuration = Some(config.clone()); + Ok(typed) + } +} + +#[async_trait] +impl TransformBuilder for TagFilterlistConfiguration { + fn input_event_type(&self) -> EventType { + EventType::Metric + } + + fn outputs(&self) -> &[OutputDefinition] { + static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::Metric)]; + OUTPUTS + } + + async fn build(&self, _context: ComponentContext) -> Result, GenericError> { + Ok(Box::new(TagFilterlist { + filters: compile_filters(&self.entries), + configuration: self + .configuration + .clone() + .expect("configuration must be set via from_configuration"), + })) + } +} + +impl MemoryBounds for TagFilterlistConfiguration { + fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { + builder.minimum().with_single_value::("component struct"); + } +} + +struct TagFilterlist { + filters: CompiledFilters, + configuration: GenericConfiguration, +} + +#[async_trait] +impl Transform for TagFilterlist { + async fn run(mut self: Box, mut context: TransformContext) -> Result<(), GenericError> { + let mut health = context.take_health_handle(); + health.mark_ready(); + + let mut watcher = self.configuration.watch_for_updates("metric_tag_filterlist"); + + debug!("Metric Tag Filterlist transform started."); + + loop { + select! 
{ + _ = health.live() => continue, + maybe_events = context.events().next() => match maybe_events { + Some(mut events) => { + for event in &mut events { + if let Some(metric) = event.try_as_metric_mut() { + if metric.values().is_sketch() { + filter_metric_tags(metric, &self.filters); + } + } + } + if let Err(e) = context.dispatcher().dispatch(events).await { + tracing::error!(error = %e, "Failed to dispatch events."); + } + } + None => break, + }, + (_, new_entries) = watcher.changed::>() => { + self.filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + debug!("Updated metric tag filterlist."); + }, + } + } + + debug!("Metric Tag Filterlist transform stopped."); + + Ok(()) + } +} + +/// Applies a tag filter to a shared tag set, returning `Some(TagSet)` if any tags were +/// filtered out, or `None` if the result would be identical to the source. +/// +/// Tags whose key is in `names` are excluded when `is_exclude` is true, or kept when false. +/// Constructs a fresh `TagSet` without mutating the source, preserving isolation for +/// metrics that share the same underlying `Arc`. +#[inline] +fn apply_tag_filter(tags: &SharedTagSet, is_exclude: bool, names: &HashSet) -> Option { + let capacity = if is_exclude { + tags.len().saturating_sub(names.len()) + } else { + names.len().min(tags.len()) + }; + let mut out = TagSet::with_capacity(capacity); + let mut any_change = false; + for tag in tags { + if is_exclude != names.contains(tag.name()) { + out.extend([tag.clone()]); + } else { + any_change = true; + } + } + if any_change { + Some(out) + } else { + None + } +} + +/// Filter the tags of a distribution metric according to the compiled filter table. +/// +/// Both instrumented tags and origin tags are filtered using the same tag key list. +/// If the metric name is not present in `filters`, the metric is left unchanged. +/// If filtering would not change any tags, the metric context is left untouched (zero allocations). 
+#[inline] +pub fn filter_metric_tags(metric: &mut saluki_core::data_model::event::metric::Metric, filters: &CompiledFilters) { + let Some((is_exclude, tag_names)) = filters.get(metric.context().name().as_ref()) else { + return; + }; + + let new_tags = apply_tag_filter(metric.context().tags(), *is_exclude, tag_names); + + if metric.context().origin_tags().is_empty() { + if let Some(filtered) = new_tags { + *metric.context_mut() = metric.context().with_tags(filtered.into_shared()); + } + } else { + let new_origin = apply_tag_filter(metric.context().origin_tags(), *is_exclude, tag_names); + match (new_tags, new_origin) { + (None, None) => {} + (Some(tags), None) => { + *metric.context_mut() = metric.context().with_tags(tags.into_shared()); + } + (None, Some(origin)) => { + *metric.context_mut() = metric.context().with_origin_tags(origin.into_shared()); + } + (Some(tags), Some(origin)) => { + *metric.context_mut() = metric + .context() + .with_tags_and_origin_tags(tags.into_shared(), origin.into_shared()); + } + } + } +} + +#[cfg(test)] +mod tests { + use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader}; + use saluki_context::{tags::Tag, Context}; + use saluki_core::data_model::event::metric::Metric; + + use super::*; + + fn distribution_metric(name: &'static str, tags: &[&'static str]) -> Metric { + let context = Context::from_static_parts(name, tags); + Metric::distribution(context, 1.0) + } + + fn distribution_metric_with_origin_tags( + name: &'static str, tags: &[&'static str], origin_tags: &[&'static str], + ) -> Metric { + let origin_tag_set: TagSet = origin_tags.iter().map(|s| Tag::from(*s)).collect(); + let context = Context::from_static_parts(name, tags).with_origin_tags(origin_tag_set.into_shared()); + Metric::distribution(context, 1.0) + } + + fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric { + let context = Context::from_static_parts(name, tags); + Metric::counter(context, 1.0) + } + + fn tag_names(metric: &Metric) -> Vec 
{ + let mut names: Vec<_> = metric + .context() + .tags() + .into_iter() + .map(|t| t.as_str().to_owned()) + .collect(); + names.sort(); + names + } + + #[test] + fn exclude_removes_listed_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn include_keeps_only_listed_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod"]); + } + + #[test] + fn non_matching_metric_unchanged() { + let entries = vec![MetricTagFilterEntry { + metric_name: "other.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn non_distribution_metric_unchanged() { + // compile_filters is correct; caller is responsible for checking is_sketch(). + // Test that a counter is not modified by the transform logic. 
+ let entries = vec![MetricTagFilterEntry { + metric_name: "my.counter".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let metric = counter_metric("my.counter", &["env:prod", "service:web"]); + // filter_metric_tags is only called for is_sketch() metrics; verify counter is unchanged. + assert!(!metric.values().is_sketch(), "counter should not be a sketch"); + // If we did call filter_metric_tags (incorrectly), verify tags are still filtered by name, + // but in practice the transform guard prevents this. + let _ = &filters; // filters compiled fine + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn empty_tag_list_exclude_keeps_all() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec![], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn empty_tag_list_include_removes_all() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec![], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert!(tag_names(&metric).is_empty()); + } + + #[test] + fn merge_same_action_unions_tags() { + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", 
&["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn merge_conflicting_actions_exclude_wins() { + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + // Exclude wins: only "host" is removed. + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn merge_conflicting_actions_exclude_first_wins() { + // Same as above but order is reversed: exclude comes first. + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn bare_tag_excluded_by_name() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["production".to_string()], + }]; + let filters = compile_filters(&entries); + + // "production" is a bare tag (no colon); tag.name() returns "production". 
+ let mut metric = distribution_metric("my.dist", &["production", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn no_config_is_noop() { + let filters = compile_filters(&[]); + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn origin_tags_preserved_after_filtering() { + // Build a context with origin_tags manually via from_inner. + // We simulate origin_tags by verifying that with_tags() on Context preserves origin_tags. + let context = Context::from_static_parts("my.dist", &["env:prod", "host:h1"]); + // with_tags preserves the name; origin_tags are empty for static contexts. + let tag_set: TagSet = [Tag::from("service:web")].into_iter().collect(); + let new_context = context.with_tags(tag_set.into_shared()); + assert_eq!(new_context.name().as_ref(), "my.dist"); + // origin_tags are empty for statically-created contexts. + assert!(new_context.origin_tags().is_empty()); + // Only the new tag survives. 
+        let names: Vec<_> = new_context.tags().into_iter().map(|t| t.as_str().to_owned()).collect();
+        assert_eq!(names, vec!["service:web"]);
+    }
+
+    fn origin_tag_names(metric: &Metric) -> Vec<String> {
+        let mut names: Vec<_> = metric
+            .context()
+            .origin_tags()
+            .into_iter()
+            .map(|t| t.as_str().to_owned())
+            .collect();
+        names.sort();
+        names
+    }
+
+    #[test]
+    fn exclude_removes_listed_origin_tags() {
+        let entries = vec![MetricTagFilterEntry {
+            metric_name: "my.dist".to_string(),
+            action: FilterAction::Exclude,
+            tags: vec!["env".to_string(), "host".to_string()],
+        }];
+        let filters = compile_filters(&entries);
+
+        let mut metric =
+            distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]);
+        filter_metric_tags(&mut metric, &filters);
+
+        assert_eq!(origin_tag_names(&metric), vec!["service:web"]);
+    }
+
+    #[test]
+    fn include_keeps_only_listed_origin_tags() {
+        let entries = vec![MetricTagFilterEntry {
+            metric_name: "my.dist".to_string(),
+            action: FilterAction::Include,
+            tags: vec!["env".to_string()],
+        }];
+        let filters = compile_filters(&entries);
+
+        let mut metric =
+            distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]);
+        filter_metric_tags(&mut metric, &filters);
+
+        assert_eq!(origin_tag_names(&metric), vec!["env:prod"]);
+    }
+
+    #[test]
+    fn origin_tags_empty_unchanged() {
+        // Fast path: metric has no origin_tags; filtering should still work correctly.
+ let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + assert!(metric.context().origin_tags().is_empty()); + } + + #[test] + fn filtering_origin_tags_does_not_affect_shared_origin() { + // Two metrics share the same Arc for origin_tags. + // Filtering one must not change the other's origin_tags. + let origin_tag_set: TagSet = ["env:prod", "host:h1", "service:web"] + .iter() + .map(|s| Tag::from(*s)) + .collect(); + let shared_origin = origin_tag_set.into_shared(); + + let ctx1 = Context::from_static_parts("my.dist", &[]).with_origin_tags(shared_origin.clone()); + let ctx2 = Context::from_static_parts("my.dist", &[]).with_origin_tags(shared_origin.clone()); + + let mut metric1 = Metric::distribution(ctx1, 1.0); + let metric2 = Metric::distribution(ctx2, 1.0); + + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + filter_metric_tags(&mut metric1, &filters); + + // metric1's origin_tags should be filtered. + assert_eq!(origin_tag_names(&metric1), vec!["service:web"]); + // metric2's origin_tags should be unchanged (still shares the original Arc). 
+ let metric2_origin: Vec<_> = metric2 + .context() + .origin_tags() + .into_iter() + .map(|t| t.as_str().to_owned()) + .collect(); + assert!( + metric2_origin.contains(&"env:prod".to_owned()), + "shared origin_tags should not be mutated" + ); + assert!(metric2_origin.contains(&"host:h1".to_owned())); + } + + #[test] + fn combined_tags_and_origin_tags_filtering() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric_with_origin_tags( + "my.dist", + &["env:prod", "service:web", "host:h1"], + &["env:prod", "host:h1", "region:us-east-1"], + ); + filter_metric_tags(&mut metric, &filters); + + // Both instrumented and origin tags should have env/host removed. + assert_eq!(tag_names(&metric), vec!["service:web"]); + assert_eq!(origin_tag_names(&metric), vec!["region:us-east-1"]); + } + + #[tokio::test] + async fn dynamic_update_partial_replaces_filter() { + // Start with an empty static config to avoid figment merging static and dynamic arrays. + let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await; + let sender = sender.expect("sender should exist"); + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({}))) + .await + .unwrap(); + cfg.ready().await; + + let mut watcher = cfg.watch_for_updates("metric_tag_filterlist"); + + // Push a partial update setting the filter. 
+        sender
+            .send(ConfigUpdate::Partial {
+                key: "metric_tag_filterlist".to_string(),
+                value: serde_json::json!([
+                    { "metric_name": "my.dist", "action": "exclude", "tags": ["host"] }
+                ]),
+            })
+            .await
+            .unwrap();
+
+        let (_, new_entries) = tokio::time::timeout(
+            std::time::Duration::from_secs(2),
+            watcher.changed::<Vec<MetricTagFilterEntry>>(),
+        )
+        .await
+        .expect("timed out waiting for metric_tag_filterlist update");
+
+        let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));
+
+        // With filter (exclude "host"), "env" should be kept, "host" removed.
+        let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]);
+        filter_metric_tags(&mut metric, &filters);
+        assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
+    }
+
+    #[tokio::test]
+    async fn dynamic_update_to_empty_clears_filter() {
+        // Start with an empty static config to avoid figment merging static and dynamic arrays.
+        let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
+        let sender = sender.expect("sender should exist");
+        sender
+            .send(ConfigUpdate::Snapshot(serde_json::json!({})))
+            .await
+            .unwrap();
+        cfg.ready().await;
+
+        let mut watcher = cfg.watch_for_updates("metric_tag_filterlist");
+
+        // First, set a filter via partial update.
+        sender
+            .send(ConfigUpdate::Partial {
+                key: "metric_tag_filterlist".to_string(),
+                value: serde_json::json!([
+                    { "metric_name": "my.dist", "action": "exclude", "tags": ["env"] }
+                ]),
+            })
+            .await
+            .unwrap();
+
+        // Consume the first update.
+        let _ = tokio::time::timeout(
+            std::time::Duration::from_secs(2),
+            watcher.changed::<Vec<MetricTagFilterEntry>>(),
+        )
+        .await
+        .expect("timed out waiting for initial metric_tag_filterlist update");
+
+        // Now clear the filter.
+        sender
+            .send(ConfigUpdate::Partial {
+                key: "metric_tag_filterlist".to_string(),
+                value: serde_json::json!([]),
+            })
+            .await
+            .unwrap();
+
+        let (_, new_entries) = tokio::time::timeout(
+            std::time::Duration::from_secs(2),
+            watcher.changed::<Vec<MetricTagFilterEntry>>(),
+        )
+        .await
+        .expect("timed out waiting for cleared metric_tag_filterlist update");
+
+        let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));
+
+        // No filters: all tags pass through.
+        let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]);
+        filter_metric_tags(&mut metric, &filters);
+        assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]);
+    }
+
+    #[tokio::test]
+    async fn dynamic_update_snapshot_applies_filter() {
+        let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await;
+        let sender = sender.expect("sender should exist");
+        sender
+            .send(ConfigUpdate::Snapshot(serde_json::json!({})))
+            .await
+            .unwrap();
+        cfg.ready().await;
+
+        let mut watcher = cfg.watch_for_updates("metric_tag_filterlist");
+
+        // Push a full snapshot that includes the filterlist key.
+        sender
+            .send(ConfigUpdate::Snapshot(serde_json::json!({
+                "metric_tag_filterlist": [
+                    { "metric_name": "my.dist", "action": "include", "tags": ["service"] }
+                ]
+            })))
+            .await
+            .unwrap();
+
+        let (_, new_entries) = tokio::time::timeout(
+            std::time::Duration::from_secs(2),
+            watcher.changed::<Vec<MetricTagFilterEntry>>(),
+        )
+        .await
+        .expect("timed out waiting for metric_tag_filterlist update");
+
+        let filters = compile_filters(new_entries.as_deref().unwrap_or(&[]));
+
+        // Include "service": only service tag kept.
+ let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["service:web"]); + } +} diff --git a/lib/saluki-context/src/context.rs b/lib/saluki-context/src/context.rs index 857d04bdbd..fca64c49f0 100644 --- a/lib/saluki-context/src/context.rs +++ b/lib/saluki-context/src/context.rs @@ -90,6 +90,63 @@ impl Context { } } + /// Clones this context, and uses the given tags for the cloned context. + /// + /// The name and origin tags of this context are preserved. + pub fn with_tags(&self, tags: SharedTagSet) -> Self { + let name = self.inner.name.clone(); + let origin_tags = self.inner.origin_tags.clone(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + + /// Clones this context, and uses the given origin tags for the cloned context. + /// + /// The name and instrumented tags of this context are preserved. + pub fn with_origin_tags(&self, origin_tags: SharedTagSet) -> Self { + let name = self.inner.name.clone(); + let tags = self.inner.tags.clone(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + + /// Clones this context, replacing both instrumented tags and origin tags in a single allocation. + /// + /// Preferred over two separate `with_tags` / `with_origin_tags` calls when both sets need to + /// be replaced, as it halves the number of `Arc` allocations. 
+ pub fn with_tags_and_origin_tags(&self, tags: SharedTagSet, origin_tags: SharedTagSet) -> Self { + let name = self.inner.name.clone(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + pub(crate) fn from_inner(inner: ContextInner) -> Self { Self { inner: Arc::new(inner) } }