From 53c887b5f79f8d0c534cbb52f01c2fe7c4a8e942 Mon Sep 17 00:00:00 2001 From: Olivier Vielpeau Date: Fri, 13 Mar 2026 19:36:03 +0100 Subject: [PATCH 1/5] feat(saluki-components): add tag_filterlist transform with benchmarks WIP: experimental and not tested e2e Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 1 + bin/agent-data-plane/src/cli/run.rs | 8 +- lib/saluki-components/Cargo.toml | 5 + .../benches/tag_filterlist.rs | 244 ++++++++ lib/saluki-components/src/transforms/mod.rs | 3 + .../src/transforms/tag_filterlist/mod.rs | 562 ++++++++++++++++++ lib/saluki-context/src/context.rs | 19 + 7 files changed, 840 insertions(+), 2 deletions(-) create mode 100644 lib/saluki-components/benches/tag_filterlist.rs create mode 100644 lib/saluki-components/src/transforms/tag_filterlist/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 8d5c645efc..78aab11c87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3550,6 +3550,7 @@ dependencies = [ "bytes", "bytesize", "chrono", + "criterion", "datadog-protos", "ddsketch", "faster-hex", diff --git a/bin/agent-data-plane/src/cli/run.rs b/bin/agent-data-plane/src/cli/run.rs index e863783a38..b1344cf8e0 100644 --- a/bin/agent-data-plane/src/cli/run.rs +++ b/bin/agent-data-plane/src/cli/run.rs @@ -23,7 +23,7 @@ use saluki_components::{ transforms::{ AggregateConfiguration, ApmStatsTransformConfiguration, ChainedConfiguration, DogstatsDMapperConfiguration, DogstatsDPrefixFilterConfiguration, HostEnrichmentConfiguration, HostTagsConfiguration, - TraceObfuscationConfiguration, TraceSamplerConfiguration, + TagFilterlistConfiguration, TraceObfuscationConfiguration, TraceSamplerConfiguration, }, }; use saluki_config::{ConfigurationLoader, GenericConfiguration}; @@ -414,6 +414,8 @@ async fn add_dsd_pipeline_to_blueprint( let dsd_mapper_config = DogstatsDMapperConfiguration::from_configuration(config)?; let dsd_enrich_config = ChainedConfiguration::default().with_transform_builder("dogstatsd_mapper", dsd_mapper_config); + let dsd_tag_filterlist_config = TagFilterlistConfiguration::from_configuration(config) + .error_context("Failed to configure metric tag filterlist transform.")?; let dsd_agg_config = AggregateConfiguration::from_configuration(config).error_context("Failed to configure aggregate transform.")?; let dd_events_config = DatadogEventsConfiguration::from_configuration(config) @@ -428,6 +430,7 @@ async fn add_dsd_pipeline_to_blueprint( .add_source("dsd_in", dsd_config)? .add_transform("dsd_prefix_filter", dsd_prefix_filter_configuration)? .add_transform("dsd_enrich", dsd_enrich_config)? + .add_transform("dsd_tag_filterlist", dsd_tag_filterlist_config)? .add_transform("dsd_agg", dsd_agg_config)? .add_encoder("dd_events_encode", dd_events_config)? .add_encoder("dd_service_checks_encode", dd_service_checks_config)? @@ -435,7 +438,8 @@ async fn add_dsd_pipeline_to_blueprint( // Metrics. .connect_component("dsd_prefix_filter", ["dsd_in.metrics"])? .connect_component("dsd_enrich", ["dsd_prefix_filter"])? - .connect_component("dsd_agg", ["dsd_enrich"])? + .connect_component("dsd_tag_filterlist", ["dsd_enrich"])? + .connect_component("dsd_agg", ["dsd_tag_filterlist"])? .connect_component("metrics_enrich", ["dsd_agg"])? .connect_component("dd_service_checks_encode", ["dsd_in.service_checks"])? .connect_component("dd_events_encode", ["dsd_in.events"])? diff --git a/lib/saluki-components/Cargo.toml b/lib/saluki-components/Cargo.toml index 7c0513a31f..ab50bde4f9 100644 --- a/lib/saluki-components/Cargo.toml +++ b/lib/saluki-components/Cargo.toml @@ -77,6 +77,11 @@ tracing = { workspace = true } url = { workspace = true } [dev-dependencies] +criterion = { workspace = true } proptest = { workspace = true } saluki-metrics = { workspace = true, features = ["test"] } test-strategy = { workspace = true } + +[[bench]] +name = "tag_filterlist" +harness = false diff --git a/lib/saluki-components/benches/tag_filterlist.rs b/lib/saluki-components/benches/tag_filterlist.rs new file mode 100644 index 0000000000..41fb59be1e --- /dev/null +++ b/lib/saluki-components/benches/tag_filterlist.rs @@ -0,0 +1,244 @@ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use saluki_components::transforms::tag_filterlist::{ + compile_filters, filter_metric_tags, CompiledFilters, FilterAction, MetricTagFilterEntry, +}; +use saluki_context::Context; +use saluki_core::data_model::event::metric::Metric; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +fn make_tags(n: usize) -> Vec { + (0..n).map(|i| format!("tag{i}:val{i}")).collect() +} + +fn make_tags_static(n: usize) -> Vec<&'static str> { + // Leak allocations so we can hand &'static str to Context::from_static_parts. + // Benchmarks run for a bounded time so the leak is acceptable. + (0..n) + .map(|i| Box::leak(format!("tag{i}:val{i}").into_boxed_str()) as &'static str) + .collect() +} + +fn distribution_metric(name: &'static str, tags: &[&'static str]) -> Metric { + Metric::distribution(Context::from_static_parts(name, tags), 1.0) +} + +fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric { + Metric::counter(Context::from_static_parts(name, tags), 1.0) +} + +fn exclude_filter(metric_name: &str, tag_keys: &[&str]) -> Vec { + vec![MetricTagFilterEntry { + metric_name: metric_name.to_string(), + action: FilterAction::Exclude, + tags: tag_keys.iter().map(|s| s.to_string()).collect(), + }] +} + +fn include_filter(metric_name: &str, tag_keys: &[&str]) -> Vec { + vec![MetricTagFilterEntry { + metric_name: metric_name.to_string(), + action: FilterAction::Include, + tags: tag_keys.iter().map(|s| s.to_string()).collect(), + }] +} + +/// Build tag key names by index, matching the `tag{i}:val{i}` pattern above. +fn tag_keys(indices: &[usize]) -> Vec { + indices.iter().map(|i| format!("tag{i}")).collect() +} + +// --------------------------------------------------------------------------- +// Benchmark: exclude with varying tag-set sizes +// --------------------------------------------------------------------------- + +fn bench_exclude(c: &mut Criterion) { + let cases: &[(&str, usize, &[usize])] = &[ + // at most half of configured keys match the metric's tags (realistic: over-specified configs) + // tag keys ≥ n do not exist on the metric + ("10tags_exclude5", 10, &[0, 2, 10, 12, 14]), // 2/5 match + ("10tags_exclude50", 10, &[0,1,2,3,4,5,6,7,8,9, // 10/50 match + 10,11,12,13,14,15,16,17,18,19, + 20,21,22,23,24,25,26,27,28,29, + 30,31,32,33,34,35,36,37,38,39, + 40,41,42,43,44,45,46,47,48,49]), + ("50tags_exclude5", 50, &[0, 10, 50, 60, 70]), // 2/5 match + ("50tags_exclude50", 50, &[0,2,4,6,8,10,12,14,16,18, // 25/50 match (even 0-48 ∩ metric) + 20,22,24,26,28,30,32,34,36,38, + 40,42,44,46,48, + 50,52,54,56,58,60,62,64,66,68, // these don't exist on metric + 70,72,74,76,78,80,82,84,86,88, + 90,92,94,96,98]), + ("100tags_exclude5", 100, &[0, 50, 100, 110, 120]), // 2/5 match + ("100tags_exclude50", 100, &[0,4,8,12,16,20,24,28,32,36, // 25/50 match (every 4th 0-96) + 40,44,48,52,56,60,64,68,72,76, + 80,84,88,92,96, + 100,104,108,112,116,120,124,128,132,136, // don't exist + 140,144,148,152,156,160,164,168,172,176, + 180,184,188,192,196]), + ]; + + let mut group = c.benchmark_group("tag_filterlist/exclude"); + for (label, n_tags, excluded_indices) in cases { + let tags_static = make_tags_static(*n_tags); + let excluded_keys: Vec = tag_keys(excluded_indices); + let excluded_key_refs: Vec<&str> = excluded_keys.iter().map(|s| s.as_str()).collect(); + let filters: CompiledFilters = compile_filters(&exclude_filter("bench.dist", &excluded_key_refs)); + + group.throughput(Throughput::Elements(1)); + group.bench_function(BenchmarkId::new("", label), |b| { + b.iter(|| { + let mut metric = distribution_metric("bench.dist", &tags_static); + filter_metric_tags(&mut metric, &filters); + metric + }); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: include with varying tag-set sizes +// --------------------------------------------------------------------------- + +fn bench_include(c: &mut Criterion) { + let cases: &[(&str, usize, &[usize])] = &[ + ("10tags_include2", 10, &[2, 7]), + ("10tags_include5", 10, &[0, 2, 4, 6, 8]), + ("100tags_include2", 100, &[20, 80]), + ("100tags_include5", 100, &[10, 30, 50, 70, 90]), + ]; + + let mut group = c.benchmark_group("tag_filterlist/include"); + for (label, n_tags, included_indices) in cases { + let tags_static = make_tags_static(*n_tags); + let included_keys: Vec = tag_keys(included_indices); + let included_key_refs: Vec<&str> = included_keys.iter().map(|s| s.as_str()).collect(); + let filters: CompiledFilters = compile_filters(&include_filter("bench.dist", &included_key_refs)); + + group.throughput(Throughput::Elements(1)); + group.bench_function(BenchmarkId::new("", label), |b| { + b.iter(|| { + let mut metric = distribution_metric("bench.dist", &tags_static); + filter_metric_tags(&mut metric, &filters); + metric + }); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: fast-path — metric name not in filterlist +// --------------------------------------------------------------------------- + +fn bench_no_match_passthrough(c: &mut Criterion) { + let tags_static = make_tags_static(20); + // Filter is for a different metric name. + let entries = exclude_filter("other.metric", &["tag0", "tag5"]); + let filters = compile_filters(&entries); + + let mut group = c.benchmark_group("tag_filterlist/passthrough"); + group.throughput(Throughput::Elements(1)); + group.bench_function("no_match", |b| { + b.iter(|| { + let mut metric = distribution_metric("bench.dist", &tags_static); + filter_metric_tags(&mut metric, &filters); + metric + }); + }); + + // Non-distribution (counter) — type check is done before calling filter_metric_tags; + // benchmark a simulated path that checks is_sketch() and skips. + let counter_tags = make_tags_static(20); + group.bench_function("non_distribution", |b| { + b.iter(|| { + let metric = counter_metric("bench.dist", &counter_tags); + // Simulate the guard in Transform::run: only call filter_metric_tags for sketches. + if metric.values().is_sketch() { + unreachable!("counter is not a sketch"); + } + metric + }); + }); + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: filterlist table size (hash lookup cost) +// --------------------------------------------------------------------------- + +fn bench_filterlist_size(c: &mut Criterion) { + let tags_static = make_tags_static(10); + let excluded_keys = ["tag0", "tag5"]; + + // Build filterlist with N entries; the matching entry is always last. + let sizes = [1usize, 10, 100]; + + let mut group = c.benchmark_group("tag_filterlist/filterlist_size"); + for &n in &sizes { + group.throughput(Throughput::Elements(1)); + + // Build entries: n-1 non-matching entries + 1 matching entry. + let entries: Vec = (0..n - 1) + .map(|i| MetricTagFilterEntry { + metric_name: format!("other.metric.{i}"), + action: FilterAction::Exclude, + tags: vec!["tag0".to_string()], + }) + .chain(std::iter::once(MetricTagFilterEntry { + metric_name: "bench.dist".to_string(), + action: FilterAction::Exclude, + tags: excluded_keys.iter().map(|s| s.to_string()).collect(), + })) + .collect(); + + let filters = compile_filters(&entries); + + group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { + b.iter(|| { + let mut metric = distribution_metric("bench.dist", &tags_static); + filter_metric_tags(&mut metric, &filters); + metric + }); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: compile_filters itself (cold-path: RC update) +// --------------------------------------------------------------------------- + +fn bench_compile_filters(c: &mut Criterion) { + let mut group = c.benchmark_group("tag_filterlist/compile"); + + let sizes = [1usize, 10, 100]; + for &n in &sizes { + let entries: Vec = (0..n) + .map(|i| MetricTagFilterEntry { + metric_name: format!("metric.{i}"), + action: FilterAction::Exclude, + tags: make_tags(5).into_iter().map(|s| s.split(':').next().unwrap().to_string()).collect(), + }) + .collect(); + + group.throughput(Throughput::Elements(n as u64)); + group.bench_with_input(BenchmarkId::from_parameter(n), &entries, |b, entries| { + b.iter(|| compile_filters(entries)); + }); + } + group.finish(); +} + +criterion_group!( + benches, + bench_exclude, + bench_include, + bench_no_match_passthrough, + bench_filterlist_size, + bench_compile_filters, +); +criterion_main!(benches); diff --git a/lib/saluki-components/src/transforms/mod.rs b/lib/saluki-components/src/transforms/mod.rs index f3b1713236..a74d301584 100644 --- a/lib/saluki-components/src/transforms/mod.rs +++ b/lib/saluki-components/src/transforms/mod.rs @@ -27,5 +27,8 @@ pub use self::trace_sampler::TraceSamplerConfiguration; mod apm_stats; pub use self::apm_stats::ApmStatsTransformConfiguration; +pub mod tag_filterlist; +pub use self::tag_filterlist::TagFilterlistConfiguration; + mod trace_obfuscation; pub use self::trace_obfuscation::TraceObfuscationConfiguration; diff --git a/lib/saluki-components/src/transforms/tag_filterlist/mod.rs b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs new file mode 100644 index 0000000000..525ee81cd5 --- /dev/null +++ b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs @@ -0,0 +1,562 @@ +//! Metric Tag Filterlist synchronous transform. +//! +//! Removes or retains specific tags from distribution metrics based on per-metric configuration. +//! Supports both "exclude" (denylist) and "include" (allowlist) modes. +//! +//! Configuration is read from the `metric_tag_filterlist` key and can be updated at runtime via +//! Remote Config. + +use async_trait::async_trait; +use hashbrown::{HashMap, HashSet}; +use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; +use saluki_config::GenericConfiguration; +use saluki_context::tags::TagSet; +use saluki_core::{ + components::{ + transforms::{Transform, TransformBuilder, TransformContext}, + ComponentContext, + }, + data_model::event::EventType, + topology::OutputDefinition, +}; +use saluki_error::GenericError; +use serde::Deserialize; +use tokio::select; +use tracing::debug; + +/// Action applied to the configured tag list: keep only listed tags, or remove listed tags. +#[derive(Clone, Debug, Deserialize, PartialEq)] +#[serde(rename_all = "lowercase")] +pub enum FilterAction { + /// Keep only the tags whose key appears in the configured list. + Include, + /// Remove the tags whose key appears in the configured list. + Exclude, +} + +/// A single metric tag filter entry. +#[derive(Clone, Debug, Deserialize)] +pub struct MetricTagFilterEntry { + /// The exact metric name this entry applies to. + pub metric_name: String, + /// Whether to include or exclude the listed tags. + pub action: FilterAction, + /// Tag key names to include or exclude. + pub tags: Vec, +} + +/// Compiled filter table: metric name → (is_exclude, set of tag key names). +pub type CompiledFilters = HashMap)>; + +/// Compile a slice of filter entries into an O(1)-lookup table. +/// +/// Merge rules: +/// - Same metric name + same action → union of tag key sets. +/// - Same metric name + conflicting actions → `exclude` wins. +pub fn compile_filters(entries: &[MetricTagFilterEntry]) -> CompiledFilters { + let mut filters: CompiledFilters = HashMap::new(); + + for entry in entries { + let is_exclude = entry.action == FilterAction::Exclude; + let tag_set: HashSet = entry.tags.iter().cloned().collect(); + + match filters.entry(entry.metric_name.clone()) { + hashbrown::hash_map::Entry::Vacant(e) => { + e.insert((is_exclude, tag_set)); + } + hashbrown::hash_map::Entry::Occupied(mut e) => { + let (existing_is_exclude, existing_tags) = e.get_mut(); + if *existing_is_exclude == is_exclude { + // Same action: union the tag sets. + existing_tags.extend(tag_set); + } else if is_exclude { + // Conflicting actions: exclude takes precedence. + *existing_is_exclude = true; + *existing_tags = tag_set; + } + // If existing is already exclude and incoming is include: ignore. + } + } + } + + filters +} + +/// Metric Tag Filterlist transform. +/// +/// Removes or retains specific tags from distribution metrics based on per-metric configuration. +/// Configuration is read from `metric_tag_filterlist` and supports runtime updates via Remote Config. +#[derive(Deserialize)] +pub struct TagFilterlistConfiguration { + #[serde(default, rename = "metric_tag_filterlist")] + entries: Vec, + + #[serde(skip)] + configuration: Option, +} + +impl TagFilterlistConfiguration { + /// Creates a new `TagFilterlistConfiguration` from the given configuration. + pub fn from_configuration(config: &GenericConfiguration) -> Result { + let mut typed: Self = config.as_typed()?; + typed.configuration = Some(config.clone()); + Ok(typed) + } +} + +#[async_trait] +impl TransformBuilder for TagFilterlistConfiguration { + fn input_event_type(&self) -> EventType { + EventType::Metric + } + + fn outputs(&self) -> &[OutputDefinition] { + static OUTPUTS: &[OutputDefinition] = &[OutputDefinition::default_output(EventType::Metric)]; + OUTPUTS + } + + async fn build(&self, _context: ComponentContext) -> Result, GenericError> { + Ok(Box::new(TagFilterlist { + filters: compile_filters(&self.entries), + configuration: self.configuration.clone().expect("configuration must be set via from_configuration"), + })) + } +} + +impl MemoryBounds for TagFilterlistConfiguration { + fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { + builder.minimum().with_single_value::("component struct"); + } +} + +struct TagFilterlist { + filters: CompiledFilters, + configuration: GenericConfiguration, +} + +#[async_trait] +impl Transform for TagFilterlist { + async fn run(mut self: Box, mut context: TransformContext) -> Result<(), GenericError> { + let mut health = context.take_health_handle(); + health.mark_ready(); + + let mut watcher = self.configuration.watch_for_updates("metric_tag_filterlist"); + + debug!("Metric Tag Filterlist transform started."); + + loop { + select! { + _ = health.live() => continue, + maybe_events = context.events().next() => match maybe_events { + Some(mut events) => { + for event in &mut events { + if let Some(metric) = event.try_as_metric_mut() { + if metric.values().is_sketch() { + filter_metric_tags(metric, &self.filters); + } + } + } + if let Err(e) = context.dispatcher().dispatch(events).await { + tracing::error!(error = %e, "Failed to dispatch events."); + } + } + None => break, + }, + (_, new_entries) = watcher.changed::>() => { + self.filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + debug!("Updated metric tag filterlist."); + }, + } + } + + debug!("Metric Tag Filterlist transform stopped."); + + Ok(()) + } +} + +/// Filter the tags of a distribution metric according to the compiled filter table. +/// +/// If the metric name is not present in `filters`, the metric is left unchanged. +pub fn filter_metric_tags( + metric: &mut saluki_core::data_model::event::metric::Metric, filters: &CompiledFilters, +) { + let name = metric.context().name().as_ref().to_owned(); + let Some((is_exclude, tag_names)) = filters.get(&name) else { + return; + }; + + let original_tags = metric.context().tags(); + let mut new_tags = TagSet::with_capacity(original_tags.len()); + for tag in original_tags { + let in_list = tag_names.contains(tag.name()); + // XOR: keep if (exclude ∧ not-in-list) ∨ (include ∧ in-list) + if *is_exclude != in_list { + new_tags.insert_tag(tag.clone()); + } + } + let new_context = metric.context().with_tags(new_tags.into_shared()); + *metric.context_mut() = new_context; +} + +#[cfg(test)] +mod tests { + use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader}; + use saluki_context::Context; + use saluki_core::data_model::event::metric::Metric; + + use super::*; + + fn distribution_metric(name: &'static str, tags: &[&'static str]) -> Metric { + let context = Context::from_static_parts(name, tags); + Metric::distribution(context, 1.0) + } + + fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric { + let context = Context::from_static_parts(name, tags); + Metric::counter(context, 1.0) + } + + fn tag_names(metric: &Metric) -> Vec { + let mut names: Vec<_> = metric.context().tags().into_iter().map(|t| t.as_str().to_owned()).collect(); + names.sort(); + names + } + + #[test] + fn exclude_removes_listed_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn include_keeps_only_listed_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod"]); + } + + #[test] + fn non_matching_metric_unchanged() { + let entries = vec![MetricTagFilterEntry { + metric_name: "other.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn non_distribution_metric_unchanged() { + // compile_filters is correct; caller is responsible for checking is_sketch(). + // Test that a counter is not modified by the transform logic. + let entries = vec![MetricTagFilterEntry { + metric_name: "my.counter".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let metric = counter_metric("my.counter", &["env:prod", "service:web"]); + // filter_metric_tags is only called for is_sketch() metrics; verify counter is unchanged. + assert!(!metric.values().is_sketch(), "counter should not be a sketch"); + // If we did call filter_metric_tags (incorrectly), verify tags are still filtered by name, + // but in practice the transform guard prevents this. + let _ = &filters; // filters compiled fine + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn empty_tag_list_exclude_keeps_all() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec![], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn empty_tag_list_include_removes_all() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec![], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert!(tag_names(&metric).is_empty()); + } + + #[test] + fn merge_same_action_unions_tags() { + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn merge_conflicting_actions_exclude_wins() { + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + // Exclude wins: only "host" is removed. + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn merge_conflicting_actions_exclude_first_wins() { + // Same as above but order is reversed: exclude comes first. + let entries = vec![ + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["host".to_string()], + }, + MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }, + ]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn bare_tag_excluded_by_name() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["production".to_string()], + }]; + let filters = compile_filters(&entries); + + // "production" is a bare tag (no colon); tag.name() returns "production". + let mut metric = distribution_metric("my.dist", &["production", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn no_config_is_noop() { + let filters = compile_filters(&[]); + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[test] + fn origin_tags_preserved_after_filtering() { + use saluki_context::tags::Tag; + + // Build a context with origin_tags manually via from_inner. + // We simulate origin_tags by verifying that with_tags() on Context preserves origin_tags. + let context = Context::from_static_parts("my.dist", &["env:prod", "host:h1"]); + // with_tags preserves the name; origin_tags are empty for static contexts. + let tag_set: saluki_context::tags::TagSet = [Tag::from("service:web")].into_iter().collect(); + let new_context = context.with_tags(tag_set.into_shared()); + assert_eq!(new_context.name().as_ref(), "my.dist"); + // origin_tags are empty for statically-created contexts. + assert!(new_context.origin_tags().is_empty()); + // Only the new tag survives. + let names: Vec<_> = new_context.tags().into_iter().map(|t| t.as_str().to_owned()).collect(); + assert_eq!(names, vec!["service:web"]); + } + + #[tokio::test] + async fn dynamic_update_partial_replaces_filter() { + // Start with an empty static config to avoid figment merging static and dynamic arrays. + let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await; + let sender = sender.expect("sender should exist"); + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({}))) + .await + .unwrap(); + cfg.ready().await; + + let mut watcher = cfg.watch_for_updates("metric_tag_filterlist"); + + // Push a partial update setting the filter. + sender + .send(ConfigUpdate::Partial { + key: "metric_tag_filterlist".to_string(), + value: serde_json::json!([ + { "metric_name": "my.dist", "action": "exclude", "tags": ["host"] } + ]), + }) + .await + .unwrap(); + + let (_, new_entries) = tokio::time::timeout( + std::time::Duration::from_secs(2), + watcher.changed::>(), + ) + .await + .expect("timed out waiting for metric_tag_filterlist update"); + + let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + + // With filter (exclude "host"), "env" should be kept, "host" removed. + let mut metric = distribution_metric("my.dist", &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[tokio::test] + async fn dynamic_update_to_empty_clears_filter() { + // Start with an empty static config to avoid figment merging static and dynamic arrays. + let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await; + let sender = sender.expect("sender should exist"); + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({}))) + .await + .unwrap(); + cfg.ready().await; + + let mut watcher = cfg.watch_for_updates("metric_tag_filterlist"); + + // First, set a filter via partial update. + sender + .send(ConfigUpdate::Partial { + key: "metric_tag_filterlist".to_string(), + value: serde_json::json!([ + { "metric_name": "my.dist", "action": "exclude", "tags": ["env"] } + ]), + }) + .await + .unwrap(); + + // Consume the first update. + let _ = tokio::time::timeout( + std::time::Duration::from_secs(2), + watcher.changed::>(), + ) + .await + .expect("timed out waiting for initial metric_tag_filterlist update"); + + // Now clear the filter. + sender + .send(ConfigUpdate::Partial { + key: "metric_tag_filterlist".to_string(), + value: serde_json::json!([]), + }) + .await + .unwrap(); + + let (_, new_entries) = tokio::time::timeout( + std::time::Duration::from_secs(2), + watcher.changed::>(), + ) + .await + .expect("timed out waiting for cleared metric_tag_filterlist update"); + + let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + + // No filters: all tags pass through. + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["env:prod", "service:web"]); + } + + #[tokio::test] + async fn dynamic_update_snapshot_applies_filter() { + let (cfg, sender) = ConfigurationLoader::for_tests(Some(serde_json::json!({})), None, true).await; + let sender = sender.expect("sender should exist"); + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({}))) + .await + .unwrap(); + cfg.ready().await; + + let mut watcher = cfg.watch_for_updates("metric_tag_filterlist"); + + // Push a full snapshot that includes the filterlist key. + sender + .send(ConfigUpdate::Snapshot(serde_json::json!({ + "metric_tag_filterlist": [ + { "metric_name": "my.dist", "action": "include", "tags": ["service"] } + ] + }))) + .await + .unwrap(); + + let (_, new_entries) = tokio::time::timeout( + std::time::Duration::from_secs(2), + watcher.changed::>(), + ) + .await + .expect("timed out waiting for metric_tag_filterlist update"); + + let filters = compile_filters(new_entries.as_deref().unwrap_or(&[])); + + // Include "service": only service tag kept. + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web", "host:h1"]); + filter_metric_tags(&mut metric, &filters); + assert_eq!(tag_names(&metric), vec!["service:web"]); + } +} diff --git a/lib/saluki-context/src/context.rs b/lib/saluki-context/src/context.rs index 857d04bdbd..851c7192de 100644 --- a/lib/saluki-context/src/context.rs +++ b/lib/saluki-context/src/context.rs @@ -90,6 +90,25 @@ impl Context { } } + /// Clones this context, and uses the given tags for the cloned context. + /// + /// The name and origin tags of this context are preserved. + pub fn with_tags(&self, tags: SharedTagSet) -> Self { + let name = self.inner.name.clone(); + let origin_tags = self.inner.origin_tags.clone(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + pub(crate) fn from_inner(inner: ContextInner) -> Self { Self { inner: Arc::new(inner) } } From 2a1b918f0dcbee630e8a261942aec13926c93198 Mon Sep 17 00:00:00 2001 From: Olivier Vielpeau Date: Fri, 13 Mar 2026 20:04:24 +0100 Subject: [PATCH 2/5] feat(saluki-components): filter origin_tags in tag_filterlist transform Apply the same include/exclude tag filter to origin_tags using the existing `tags` list. Adds `with_origin_tags` and `with_tags_and_origin_tags` to `Context`, extracts `apply_tag_filter` helper, fast-paths empty origin_tags, and adds tests + benchmarks. Also, apply formatter. Co-Authored-By: Claude Sonnet 4.6 --- .../benches/tag_filterlist.rs | 189 ++++++++++++++--- .../src/transforms/tag_filterlist/mod.rs | 193 ++++++++++++++++-- lib/saluki-context/src/context.rs | 38 ++++ 3 files changed, 376 insertions(+), 44 deletions(-) diff --git a/lib/saluki-components/benches/tag_filterlist.rs b/lib/saluki-components/benches/tag_filterlist.rs index 41fb59be1e..42501abd14 100644 --- a/lib/saluki-components/benches/tag_filterlist.rs +++ b/lib/saluki-components/benches/tag_filterlist.rs @@ -2,7 +2,10 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Through use saluki_components::transforms::tag_filterlist::{ compile_filters, filter_metric_tags, CompiledFilters, FilterAction, MetricTagFilterEntry, }; -use saluki_context::Context; +use saluki_context::{ + tags::{Tag, TagSet}, + Context, +}; use saluki_core::data_model::event::metric::Metric; // --------------------------------------------------------------------------- @@ -21,10 +24,24 @@ fn make_tags_static(n: usize) -> Vec<&'static str> { .collect() } +fn make_origin_tags_static(n: usize) -> Vec<&'static str> { + (0..n) + .map(|i| Box::leak(format!("orig_tag{i}:val{i}").into_boxed_str()) as &'static str) + .collect() +} + fn distribution_metric(name: &'static str, tags: &[&'static str]) -> Metric { Metric::distribution(Context::from_static_parts(name, tags), 1.0) } +fn distribution_metric_with_origin_tags( + name: &'static str, tags: &[&'static str], origin_tags: &[&'static str], +) -> Metric { + let origin_tag_set: TagSet = origin_tags.iter().map(|s| Tag::from(*s)).collect(); + let context = Context::from_static_parts(name, tags).with_origin_tags(origin_tag_set.into_shared()); + Metric::distribution(context, 1.0) +} + fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric { Metric::counter(Context::from_static_parts(name, tags), 1.0) } @@ -58,26 +75,38 @@ fn bench_exclude(c: &mut Criterion) { let cases: &[(&str, usize, &[usize])] = &[ // at most half of configured keys match the metric's tags (realistic: over-specified configs) // tag keys ≥ n do not exist on the metric - ("10tags_exclude5", 10, &[0, 2, 10, 12, 14]), // 2/5 match - ("10tags_exclude50", 10, &[0,1,2,3,4,5,6,7,8,9, // 10/50 match - 10,11,12,13,14,15,16,17,18,19, - 20,21,22,23,24,25,26,27,28,29, - 30,31,32,33,34,35,36,37,38,39, - 40,41,42,43,44,45,46,47,48,49]), - ("50tags_exclude5", 50, &[0, 10, 50, 60, 70]), // 2/5 match - ("50tags_exclude50", 50, &[0,2,4,6,8,10,12,14,16,18, // 25/50 match (even 0-48 ∩ metric) - 20,22,24,26,28,30,32,34,36,38, - 40,42,44,46,48, - 50,52,54,56,58,60,62,64,66,68, // these don't exist on metric - 70,72,74,76,78,80,82,84,86,88, - 90,92,94,96,98]), - ("100tags_exclude5", 100, &[0, 50, 100, 110, 120]), // 2/5 match - ("100tags_exclude50", 100, &[0,4,8,12,16,20,24,28,32,36, // 25/50 match (every 4th 0-96) - 40,44,48,52,56,60,64,68,72,76, - 80,84,88,92,96, - 100,104,108,112,116,120,124,128,132,136, // don't exist - 140,144,148,152,156,160,164,168,172,176, - 180,184,188,192,196]), + ("10tags_exclude5", 10, &[0, 2, 10, 12, 14]), // 2/5 match + ( + "10tags_exclude50", + 10, + &[ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, // 10/50 match + 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, + 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, + ], + ), + ("50tags_exclude5", 50, &[0, 10, 50, 60, 70]), // 2/5 match + ( + "50tags_exclude50", + 50, + &[ + 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, // 25/50 match (even 0-48 ∩ metric) + 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, + 68, // these don't exist on metric + 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, + ], + ), + ("100tags_exclude5", 100, &[0, 50, 100, 110, 120]), // 2/5 match + ( + "100tags_exclude50", + 100, + &[ + 0, 4, 8, 12, 16, 20, 24, 28, 32, 36, // 25/50 match (every 4th 0-96) + 40, 44, 48, 52, 56, 60, 64, 68, 72, 76, 80, 84, 88, 92, 96, 100, 104, 108, 112, 116, 120, 124, 128, + 132, 136, // don't exist + 140, 144, 148, 152, 156, 160, 164, 168, 172, 176, 180, 184, 188, 192, 196, + ], + ), ]; let mut group = c.benchmark_group("tag_filterlist/exclude"); @@ -105,8 +134,8 @@ fn bench_exclude(c: &mut Criterion) { fn bench_include(c: &mut Criterion) { let cases: &[(&str, usize, &[usize])] = &[ - ("10tags_include2", 10, &[2, 7]), - ("10tags_include5", 10, &[0, 2, 4, 6, 8]), + ("10tags_include2", 10, &[2, 7]), + ("10tags_include5", 10, &[0, 2, 4, 6, 8]), ("100tags_include2", 100, &[20, 80]), ("100tags_include5", 100, &[10, 30, 50, 70, 90]), ]; @@ -221,7 +250,10 @@ fn bench_compile_filters(c: &mut Criterion) { .map(|i| MetricTagFilterEntry { metric_name: format!("metric.{i}"), action: FilterAction::Exclude, - tags: make_tags(5).into_iter().map(|s| s.split(':').next().unwrap().to_string()).collect(), + tags: make_tags(5) + .into_iter() + .map(|s| s.split(':').next().unwrap().to_string()) + .collect(), }) .collect(); @@ -233,6 +265,111 @@ fn bench_compile_filters(c: &mut Criterion) { group.finish(); } +// --------------------------------------------------------------------------- +// Benchmark: origin_tags exclude with varying origin tag-set sizes +// --------------------------------------------------------------------------- + +fn bench_origin_tags_exclude(c: &mut Criterion) { + let sizes = [10usize, 50, 100]; + let tags_static = make_tags_static(5); + + let mut group = c.benchmark_group("tag_filterlist/origin_tags_exclude"); + for &n in &sizes { + let origin_tags_static = make_origin_tags_static(n); + // Exclude the first third of orig_tag keys. + let excluded: Vec = (0..n / 3).map(|i| format!("orig_tag{i}")).collect(); + let excluded_refs: Vec<&str> = excluded.iter().map(|s| s.as_str()).collect(); + let filters = compile_filters(&exclude_filter("bench.dist", &excluded_refs)); + + group.throughput(Throughput::Elements(1)); + group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { + b.iter(|| { + let mut metric = distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static); + filter_metric_tags(&mut metric, &filters); + metric + }); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: origin_tags include with varying origin tag-set sizes +// --------------------------------------------------------------------------- + +fn bench_origin_tags_include(c: &mut Criterion) { + let sizes = [10usize, 50, 100]; + let tags_static = make_tags_static(5); + + let mut group = c.benchmark_group("tag_filterlist/origin_tags_include"); + for &n in &sizes { + let origin_tags_static = make_origin_tags_static(n); + // Keep only the first third of orig_tag keys. + let included: Vec = (0..n / 3).map(|i| format!("orig_tag{i}")).collect(); + let included_refs: Vec<&str> = included.iter().map(|s| s.as_str()).collect(); + let filters = compile_filters(&include_filter("bench.dist", &included_refs)); + + group.throughput(Throughput::Elements(1)); + group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { + b.iter(|| { + let mut metric = distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static); + filter_metric_tags(&mut metric, &filters); + metric + }); + }); + } + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: origin_tags passthrough (fast path — no origin_tags on metric) +// --------------------------------------------------------------------------- + +fn bench_origin_tags_passthrough(c: &mut Criterion) { + let tags_static = make_tags_static(20); + let filters = compile_filters(&exclude_filter("bench.dist", &["tag0", "tag5"])); + + let mut group = c.benchmark_group("tag_filterlist/origin_tags_passthrough"); + group.throughput(Throughput::Elements(1)); + group.bench_function("no_origin_tags", |b| { + b.iter(|| { + // No origin_tags: exercises the fast path in filter_metric_tags. + let mut metric = distribution_metric("bench.dist", &tags_static); + filter_metric_tags(&mut metric, &filters); + metric + }); + }); + group.finish(); +} + +// --------------------------------------------------------------------------- +// Benchmark: combined instrumented + origin_tags filtering (single alloc path) +// --------------------------------------------------------------------------- + +fn bench_combined(c: &mut Criterion) { + let tags_static = make_tags_static(50); + let origin_tags_static = make_origin_tags_static(20); + + // Exclude a mix of instrumented tag keys and origin tag keys. + let excluded_keys: Vec = (0..10) + .map(|i| format!("tag{i}")) + .chain((0..5).map(|i| format!("orig_tag{i}"))) + .collect(); + let excluded_refs: Vec<&str> = excluded_keys.iter().map(|s| s.as_str()).collect(); + let filters = compile_filters(&exclude_filter("bench.dist", &excluded_refs)); + + let mut group = c.benchmark_group("tag_filterlist/combined"); + group.throughput(Throughput::Elements(1)); + group.bench_function("50tags_20origin", |b| { + b.iter(|| { + let mut metric = distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static); + filter_metric_tags(&mut metric, &filters); + metric + }); + }); + group.finish(); +} + criterion_group!( benches, bench_exclude, @@ -240,5 +377,9 @@ criterion_group!( bench_no_match_passthrough, bench_filterlist_size, bench_compile_filters, + bench_origin_tags_exclude, + bench_origin_tags_include, + bench_origin_tags_passthrough, + bench_combined, ); criterion_main!(benches); diff --git a/lib/saluki-components/src/transforms/tag_filterlist/mod.rs b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs index 525ee81cd5..8ecca1322d 100644 --- a/lib/saluki-components/src/transforms/tag_filterlist/mod.rs +++ b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs @@ -10,7 +10,7 @@ use async_trait::async_trait; use hashbrown::{HashMap, HashSet}; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use saluki_config::GenericConfiguration; -use saluki_context::tags::TagSet; +use saluki_context::tags::{SharedTagSet, TagSet}; use saluki_core::{ components::{ transforms::{Transform, TransformBuilder, TransformContext}, @@ -118,7 +118,10 @@ impl TransformBuilder for TagFilterlistConfiguration { async fn build(&self, _context: ComponentContext) -> Result, GenericError> { Ok(Box::new(TagFilterlist { filters: compile_filters(&self.entries), - configuration: self.configuration.clone().expect("configuration must be set via from_configuration"), + configuration: self + .configuration + .clone() + .expect("configuration must be set via from_configuration"), })) } } @@ -175,34 +178,51 @@ impl Transform for TagFilterlist { } } +/// Applies a tag filter to a shared tag set, returning a new `TagSet` with the filter applied. +/// +/// Tags whose key is in `names` are excluded when `is_exclude` is true, or kept when false. +/// Always constructs a fresh `TagSet` without mutating the source, preserving isolation for +/// metrics that share the same underlying `Arc`. +fn apply_tag_filter(tags: &SharedTagSet, is_exclude: bool, names: &HashSet) -> TagSet { + let mut out = TagSet::with_capacity(tags.len()); + for tag in tags { + let in_list = names.contains(tag.name()); + // XOR: keep if (exclude ∧ not-in-list) ∨ (include ∧ in-list) + if is_exclude != in_list { + out.insert_tag(tag.clone()); + } + } + out +} + /// Filter the tags of a distribution metric according to the compiled filter table. /// +/// Both instrumented tags and origin tags are filtered using the same tag key list. /// If the metric name is not present in `filters`, the metric is left unchanged. -pub fn filter_metric_tags( - metric: &mut saluki_core::data_model::event::metric::Metric, filters: &CompiledFilters, -) { +pub fn filter_metric_tags(metric: &mut saluki_core::data_model::event::metric::Metric, filters: &CompiledFilters) { let name = metric.context().name().as_ref().to_owned(); let Some((is_exclude, tag_names)) = filters.get(&name) else { return; }; - let original_tags = metric.context().tags(); - let mut new_tags = TagSet::with_capacity(original_tags.len()); - for tag in original_tags { - let in_list = tag_names.contains(tag.name()); - // XOR: keep if (exclude ∧ not-in-list) ∨ (include ∧ in-list) - if *is_exclude != in_list { - new_tags.insert_tag(tag.clone()); - } + let new_tags = apply_tag_filter(metric.context().tags(), *is_exclude, tag_names); + + if metric.context().origin_tags().is_empty() { + // Fast path: no origin_tags to filter; single allocation. + *metric.context_mut() = metric.context().with_tags(new_tags.into_shared()); + } else { + // Filter origin_tags with the same list; single Arc allocation for both. + let new_origin = apply_tag_filter(metric.context().origin_tags(), *is_exclude, tag_names); + *metric.context_mut() = metric + .context() + .with_tags_and_origin_tags(new_tags.into_shared(), new_origin.into_shared()); } - let new_context = metric.context().with_tags(new_tags.into_shared()); - *metric.context_mut() = new_context; } #[cfg(test)] mod tests { use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader}; - use saluki_context::Context; + use saluki_context::{tags::Tag, Context}; use saluki_core::data_model::event::metric::Metric; use super::*; @@ -212,13 +232,26 @@ mod tests { Metric::distribution(context, 1.0) } + fn distribution_metric_with_origin_tags( + name: &'static str, tags: &[&'static str], origin_tags: &[&'static str], + ) -> Metric { + let origin_tag_set: TagSet = origin_tags.iter().map(|s| Tag::from(*s)).collect(); + let context = Context::from_static_parts(name, tags).with_origin_tags(origin_tag_set.into_shared()); + Metric::distribution(context, 1.0) + } + fn counter_metric(name: &'static str, tags: &[&'static str]) -> Metric { let context = Context::from_static_parts(name, tags); Metric::counter(context, 1.0) } fn tag_names(metric: &Metric) -> Vec { - let mut names: Vec<_> = metric.context().tags().into_iter().map(|t| t.as_str().to_owned()).collect(); + let mut names: Vec<_> = metric + .context() + .tags() + .into_iter() + .map(|t| t.as_str().to_owned()) + .collect(); names.sort(); names } @@ -412,13 +445,11 @@ mod tests { #[test] fn origin_tags_preserved_after_filtering() { - use saluki_context::tags::Tag; - // Build a context with origin_tags manually via from_inner. // We simulate origin_tags by verifying that with_tags() on Context preserves origin_tags. let context = Context::from_static_parts("my.dist", &["env:prod", "host:h1"]); // with_tags preserves the name; origin_tags are empty for static contexts. - let tag_set: saluki_context::tags::TagSet = [Tag::from("service:web")].into_iter().collect(); + let tag_set: TagSet = [Tag::from("service:web")].into_iter().collect(); let new_context = context.with_tags(tag_set.into_shared()); assert_eq!(new_context.name().as_ref(), "my.dist"); // origin_tags are empty for statically-created contexts. @@ -428,6 +459,128 @@ mod tests { assert_eq!(names, vec!["service:web"]); } + fn origin_tag_names(metric: &Metric) -> Vec { + let mut names: Vec<_> = metric + .context() + .origin_tags() + .into_iter() + .map(|t| t.as_str().to_owned()) + .collect(); + names.sort(); + names + } + + #[test] + fn exclude_removes_listed_origin_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = + distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(origin_tag_names(&metric), vec!["service:web"]); + } + + #[test] + fn include_keeps_only_listed_origin_tags() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Include, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = + distribution_metric_with_origin_tags("my.dist", &["env:prod"], &["env:prod", "host:h1", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(origin_tag_names(&metric), vec!["env:prod"]); + } + + #[test] + fn origin_tags_empty_unchanged() { + // Fast path: metric has no origin_tags; filtering should still work correctly. + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric("my.dist", &["env:prod", "service:web"]); + filter_metric_tags(&mut metric, &filters); + + assert_eq!(tag_names(&metric), vec!["service:web"]); + assert!(metric.context().origin_tags().is_empty()); + } + + #[test] + fn filtering_origin_tags_does_not_affect_shared_origin() { + // Two metrics share the same Arc for origin_tags. + // Filtering one must not change the other's origin_tags. + let origin_tag_set: TagSet = ["env:prod", "host:h1", "service:web"] + .iter() + .map(|s| Tag::from(*s)) + .collect(); + let shared_origin = origin_tag_set.into_shared(); + + let ctx1 = Context::from_static_parts("my.dist", &[]).with_origin_tags(shared_origin.clone()); + let ctx2 = Context::from_static_parts("my.dist", &[]).with_origin_tags(shared_origin.clone()); + + let mut metric1 = Metric::distribution(ctx1, 1.0); + let metric2 = Metric::distribution(ctx2, 1.0); + + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + filter_metric_tags(&mut metric1, &filters); + + // metric1's origin_tags should be filtered. + assert_eq!(origin_tag_names(&metric1), vec!["service:web"]); + // metric2's origin_tags should be unchanged (still shares the original Arc). + let metric2_origin: Vec<_> = metric2 + .context() + .origin_tags() + .into_iter() + .map(|t| t.as_str().to_owned()) + .collect(); + assert!( + metric2_origin.contains(&"env:prod".to_owned()), + "shared origin_tags should not be mutated" + ); + assert!(metric2_origin.contains(&"host:h1".to_owned())); + } + + #[test] + fn combined_tags_and_origin_tags_filtering() { + let entries = vec![MetricTagFilterEntry { + metric_name: "my.dist".to_string(), + action: FilterAction::Exclude, + tags: vec!["env".to_string(), "host".to_string()], + }]; + let filters = compile_filters(&entries); + + let mut metric = distribution_metric_with_origin_tags( + "my.dist", + &["env:prod", "service:web", "host:h1"], + &["env:prod", "host:h1", "region:us-east-1"], + ); + filter_metric_tags(&mut metric, &filters); + + // Both instrumented and origin tags should have env/host removed. + assert_eq!(tag_names(&metric), vec!["service:web"]); + assert_eq!(origin_tag_names(&metric), vec!["region:us-east-1"]); + } + #[tokio::test] async fn dynamic_update_partial_replaces_filter() { // Start with an empty static config to avoid figment merging static and dynamic arrays. diff --git a/lib/saluki-context/src/context.rs b/lib/saluki-context/src/context.rs index 851c7192de..fca64c49f0 100644 --- a/lib/saluki-context/src/context.rs +++ b/lib/saluki-context/src/context.rs @@ -109,6 +109,44 @@ impl Context { } } + /// Clones this context, and uses the given origin tags for the cloned context. + /// + /// The name and instrumented tags of this context are preserved. + pub fn with_origin_tags(&self, origin_tags: SharedTagSet) -> Self { + let name = self.inner.name.clone(); + let tags = self.inner.tags.clone(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + + /// Clones this context, replacing both instrumented tags and origin tags in a single allocation. + /// + /// Preferred over two separate `with_tags` / `with_origin_tags` calls when both sets need to + /// be replaced, as it halves the number of `Arc` allocations. + pub fn with_tags_and_origin_tags(&self, tags: SharedTagSet, origin_tags: SharedTagSet) -> Self { + let name = self.inner.name.clone(); + let (key, _) = hash_context(&name, &tags, &origin_tags); + + Self { + inner: Arc::new(ContextInner { + name, + tags, + origin_tags, + key, + active_count: Gauge::noop(), + }), + } + } + pub(crate) fn from_inner(inner: ContextInner) -> Self { Self { inner: Arc::new(inner) } } From 1b7124792a2cb3d183c3a79946db042090fc122e Mon Sep 17 00:00:00 2001 From: Olivier Vielpeau Date: Fri, 13 Mar 2026 21:11:57 +0100 Subject: [PATCH 3/5] perf(tag_filterlist): reduce allocations and use foldhash in hot paths - Use foldhash for CompiledFilters and inner tag HashSets - Avoid String allocation on name lookup (pass &str via Borrow) - Replace insert_tag (O(n) dedup) with extend (direct Vec push) since source tags are already unique - Skip context/Arc allocations when filtering produces no changes Co-Authored-By: Claude Sonnet 4.6 --- .../src/transforms/tag_filterlist/mod.rs | 66 +++++++++++++------ 1 file changed, 46 insertions(+), 20 deletions(-) diff --git a/lib/saluki-components/src/transforms/tag_filterlist/mod.rs b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs index 8ecca1322d..4dbadcd22a 100644 --- a/lib/saluki-components/src/transforms/tag_filterlist/mod.rs +++ b/lib/saluki-components/src/transforms/tag_filterlist/mod.rs @@ -7,6 +7,7 @@ //! Remote Config. use async_trait::async_trait; +use foldhash::fast::RandomState as FoldHashState; use hashbrown::{HashMap, HashSet}; use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; use saluki_config::GenericConfiguration; @@ -46,7 +47,7 @@ pub struct MetricTagFilterEntry { } /// Compiled filter table: metric name → (is_exclude, set of tag key names). -pub type CompiledFilters = HashMap)>; +pub type CompiledFilters = HashMap), FoldHashState>; /// Compile a slice of filter entries into an O(1)-lookup table. /// @@ -54,11 +55,12 @@ pub type CompiledFilters = HashMap)>; /// - Same metric name + same action → union of tag key sets. /// - Same metric name + conflicting actions → `exclude` wins. pub fn compile_filters(entries: &[MetricTagFilterEntry]) -> CompiledFilters { - let mut filters: CompiledFilters = HashMap::new(); + let mut filters: CompiledFilters = HashMap::with_hasher(FoldHashState::default()); for entry in entries { let is_exclude = entry.action == FilterAction::Exclude; - let tag_set: HashSet = entry.tags.iter().cloned().collect(); + let mut tag_set = HashSet::with_capacity_and_hasher(entry.tags.len(), FoldHashState::default()); + tag_set.extend(entry.tags.iter().cloned()); match filters.entry(entry.metric_name.clone()) { hashbrown::hash_map::Entry::Vacant(e) => { @@ -178,44 +180,68 @@ impl Transform for TagFilterlist { } } -/// Applies a tag filter to a shared tag set, returning a new `TagSet` with the filter applied. +/// Applies a tag filter to a shared tag set, returning `Some(TagSet)` if any tags were +/// filtered out, or `None` if the result would be identical to the source. /// /// Tags whose key is in `names` are excluded when `is_exclude` is true, or kept when false. -/// Always constructs a fresh `TagSet` without mutating the source, preserving isolation for +/// Constructs a fresh `TagSet` without mutating the source, preserving isolation for /// metrics that share the same underlying `Arc`. -fn apply_tag_filter(tags: &SharedTagSet, is_exclude: bool, names: &HashSet) -> TagSet { - let mut out = TagSet::with_capacity(tags.len()); +#[inline] +fn apply_tag_filter(tags: &SharedTagSet, is_exclude: bool, names: &HashSet) -> Option { + let capacity = if is_exclude { + tags.len().saturating_sub(names.len()) + } else { + names.len().min(tags.len()) + }; + let mut out = TagSet::with_capacity(capacity); + let mut any_change = false; for tag in tags { - let in_list = names.contains(tag.name()); - // XOR: keep if (exclude ∧ not-in-list) ∨ (include ∧ in-list) - if is_exclude != in_list { - out.insert_tag(tag.clone()); + if is_exclude != names.contains(tag.name()) { + out.extend([tag.clone()]); + } else { + any_change = true; } } - out + if any_change { + Some(out) + } else { + None + } } /// Filter the tags of a distribution metric according to the compiled filter table. /// /// Both instrumented tags and origin tags are filtered using the same tag key list. /// If the metric name is not present in `filters`, the metric is left unchanged. +/// If filtering would not change any tags, the metric context is left untouched (zero allocations). +#[inline] pub fn filter_metric_tags(metric: &mut saluki_core::data_model::event::metric::Metric, filters: &CompiledFilters) { - let name = metric.context().name().as_ref().to_owned(); - let Some((is_exclude, tag_names)) = filters.get(&name) else { + let Some((is_exclude, tag_names)) = filters.get(metric.context().name().as_ref()) else { return; }; let new_tags = apply_tag_filter(metric.context().tags(), *is_exclude, tag_names); if metric.context().origin_tags().is_empty() { - // Fast path: no origin_tags to filter; single allocation. - *metric.context_mut() = metric.context().with_tags(new_tags.into_shared()); + if let Some(filtered) = new_tags { + *metric.context_mut() = metric.context().with_tags(filtered.into_shared()); + } } else { - // Filter origin_tags with the same list; single Arc allocation for both. let new_origin = apply_tag_filter(metric.context().origin_tags(), *is_exclude, tag_names); - *metric.context_mut() = metric - .context() - .with_tags_and_origin_tags(new_tags.into_shared(), new_origin.into_shared()); + match (new_tags, new_origin) { + (None, None) => {} + (Some(tags), None) => { + *metric.context_mut() = metric.context().with_tags(tags.into_shared()); + } + (None, Some(origin)) => { + *metric.context_mut() = metric.context().with_origin_tags(origin.into_shared()); + } + (Some(tags), Some(origin)) => { + *metric.context_mut() = metric + .context() + .with_tags_and_origin_tags(tags.into_shared(), origin.into_shared()); + } + } } } From 1aa35a1543c3b7d1a6b34026ef8df139e40b2023 Mon Sep 17 00:00:00 2001 From: Olivier Vielpeau Date: Tue, 17 Mar 2026 19:45:27 +0100 Subject: [PATCH 4/5] bench(tag_filterlist): exclude metric construction from measured iterations Replace `b.iter` with `b.iter_batched(..., BatchSize::SmallInput)` in all 9 bench functions so that `Metric` construction happens in Criterion's setup phase and is excluded from the timed measurement. Only `filter_metric_tags` (and the `is_sketch()` guard for the non-distribution path) is now measured, giving a cleaner view of the actual transform cost. Co-Authored-By: Claude Sonnet 4.6 --- .../benches/tag_filterlist.rs | 127 +++++++++++------- 1 file changed, 77 insertions(+), 50 deletions(-) diff --git a/lib/saluki-components/benches/tag_filterlist.rs b/lib/saluki-components/benches/tag_filterlist.rs index 42501abd14..055e03cf42 100644 --- a/lib/saluki-components/benches/tag_filterlist.rs +++ b/lib/saluki-components/benches/tag_filterlist.rs @@ -1,4 +1,4 @@ -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; use saluki_components::transforms::tag_filterlist::{ compile_filters, filter_metric_tags, CompiledFilters, FilterAction, MetricTagFilterEntry, }; @@ -118,11 +118,14 @@ fn bench_exclude(c: &mut Criterion) { group.throughput(Throughput::Elements(1)); group.bench_function(BenchmarkId::new("", label), |b| { - b.iter(|| { - let mut metric = distribution_metric("bench.dist", &tags_static); - filter_metric_tags(&mut metric, &filters); - metric - }); + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); }); } group.finish(); @@ -149,11 +152,14 @@ fn bench_include(c: &mut Criterion) { group.throughput(Throughput::Elements(1)); group.bench_function(BenchmarkId::new("", label), |b| { - b.iter(|| { - let mut metric = distribution_metric("bench.dist", &tags_static); - filter_metric_tags(&mut metric, &filters); - metric - }); + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); }); } group.finish(); @@ -172,25 +178,31 @@ fn bench_no_match_passthrough(c: &mut Criterion) { let mut group = c.benchmark_group("tag_filterlist/passthrough"); group.throughput(Throughput::Elements(1)); group.bench_function("no_match", |b| { - b.iter(|| { - let mut metric = distribution_metric("bench.dist", &tags_static); - filter_metric_tags(&mut metric, &filters); - metric - }); + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); }); // Non-distribution (counter) — type check is done before calling filter_metric_tags; // benchmark a simulated path that checks is_sketch() and skips. let counter_tags = make_tags_static(20); group.bench_function("non_distribution", |b| { - b.iter(|| { - let metric = counter_metric("bench.dist", &counter_tags); - // Simulate the guard in Transform::run: only call filter_metric_tags for sketches. - if metric.values().is_sketch() { - unreachable!("counter is not a sketch"); - } - metric - }); + b.iter_batched( + || counter_metric("bench.dist", &counter_tags), + |metric| { + // Simulate the guard in Transform::run: only call filter_metric_tags for sketches. + if metric.values().is_sketch() { + unreachable!("counter is not a sketch"); + } + metric + }, + BatchSize::SmallInput, + ); }); group.finish(); } @@ -227,11 +239,14 @@ fn bench_filterlist_size(c: &mut Criterion) { let filters = compile_filters(&entries); group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { - b.iter(|| { - let mut metric = distribution_metric("bench.dist", &tags_static); - filter_metric_tags(&mut metric, &filters); - metric - }); + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); }); } group.finish(); @@ -283,11 +298,14 @@ fn bench_origin_tags_exclude(c: &mut Criterion) { group.throughput(Throughput::Elements(1)); group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { - b.iter(|| { - let mut metric = distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static); - filter_metric_tags(&mut metric, &filters); - metric - }); + b.iter_batched( + || distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); }); } group.finish(); @@ -311,11 +329,14 @@ fn bench_origin_tags_include(c: &mut Criterion) { group.throughput(Throughput::Elements(1)); group.bench_with_input(BenchmarkId::from_parameter(n), &n, |b, _| { - b.iter(|| { - let mut metric = distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static); - filter_metric_tags(&mut metric, &filters); - metric - }); + b.iter_batched( + || distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); }); } group.finish(); @@ -332,12 +353,15 @@ fn bench_origin_tags_passthrough(c: &mut Criterion) { let mut group = c.benchmark_group("tag_filterlist/origin_tags_passthrough"); group.throughput(Throughput::Elements(1)); group.bench_function("no_origin_tags", |b| { - b.iter(|| { - // No origin_tags: exercises the fast path in filter_metric_tags. - let mut metric = distribution_metric("bench.dist", &tags_static); - filter_metric_tags(&mut metric, &filters); - metric - }); + b.iter_batched( + || distribution_metric("bench.dist", &tags_static), + |mut metric| { + // No origin_tags: exercises the fast path in filter_metric_tags. + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); }); group.finish(); } @@ -361,11 +385,14 @@ fn bench_combined(c: &mut Criterion) { let mut group = c.benchmark_group("tag_filterlist/combined"); group.throughput(Throughput::Elements(1)); group.bench_function("50tags_20origin", |b| { - b.iter(|| { - let mut metric = distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static); - filter_metric_tags(&mut metric, &filters); - metric - }); + b.iter_batched( + || distribution_metric_with_origin_tags("bench.dist", &tags_static, &origin_tags_static), + |mut metric| { + filter_metric_tags(&mut metric, &filters); + metric + }, + BatchSize::SmallInput, + ); }); group.finish(); } From 1883d0b6c5cf423d4dfbea3c74e72956ad41ceef Mon Sep 17 00:00:00 2001 From: Olivier Vielpeau Date: Tue, 17 Mar 2026 20:15:37 +0100 Subject: [PATCH 5/5] Make the tags used in micro-benchmark similar to the ones use in core agent benchmark --- lib/saluki-components/benches/tag_filterlist.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/saluki-components/benches/tag_filterlist.rs b/lib/saluki-components/benches/tag_filterlist.rs index 055e03cf42..d793081f17 100644 --- a/lib/saluki-components/benches/tag_filterlist.rs +++ b/lib/saluki-components/benches/tag_filterlist.rs @@ -20,7 +20,7 @@ fn make_tags_static(n: usize) -> Vec<&'static str> { // Leak allocations so we can hand &'static str to Context::from_static_parts. // Benchmarks run for a bounded time so the leak is acceptable. (0..n) - .map(|i| Box::leak(format!("tag{i}:val{i}").into_boxed_str()) as &'static str) + .map(|i| Box::leak(format!("filter_tag_filter_tag_filter_tag_{i}:value_{i}").into_boxed_str()) as &'static str) .collect() }