Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 79 additions & 11 deletions lib/saluki-components/src/transforms/tag_filterlist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
//! Configuration is read from the `metric_tag_filterlist` key and can be updated at runtime via
//! Remote Config.

mod telemetry;

use async_trait::async_trait;
use foldhash::fast::RandomState as FoldHashState;
use hashbrown::{HashMap, HashSet};
Expand All @@ -18,13 +20,17 @@ use saluki_core::{
ComponentContext,
},
data_model::event::{metric::Metric, EventType},
observability::ComponentMetricsExt,
Copy link

Copilot AI Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

saluki-components has #![deny(warnings)], and ComponentMetricsExt is imported here but not used anywhere in the module. This will fail the build due to an unused import warning. Remove the import, or use the extension trait explicitly if it’s required for metrics wiring.

Suggested change
observability::ComponentMetricsExt,

Copilot uses AI. Check for mistakes.
topology::OutputDefinition,
};
use saluki_error::GenericError;
use saluki_metrics::MetricsBuilder;
use serde::Deserialize;
use tokio::select;
use tracing::debug;

use self::telemetry::Telemetry;

/// Action applied to the configured tag list: keep only listed tags, or remove listed tags.
#[derive(Clone, Debug, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
Expand All @@ -49,6 +55,25 @@ pub struct MetricTagFilterEntry {
/// Compiled filter table: metric name → (is_exclude, set of tag key names).
pub type CompiledFilters = HashMap<String, (bool, HashSet<String, FoldHashState>), FoldHashState>;

/// Result of applying one filter rule to a single tag set.
struct FilteredTagSet {
    /// The rebuilt tag set with the rule applied.
    tags: TagSet,
    /// Number of tags dropped relative to the input set.
    removed: usize,
}

/// Result of running the `metric_tag_filterlist` rules against a single metric.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FilterMetricTagsOutcome {
    /// The filter table had no entry for this metric's name.
    RuleMiss,
    /// A matching rule was found, but it left the tag set untouched.
    NoChange,
    /// A matching rule removed at least one tag.
    Modified {
        /// Combined count of instrumented and origin tags that were dropped.
        removed_tags: usize,
    },
}

/// Compile a slice of filter entries into an O(1)-lookup table.
///
/// Merge rules:
Expand Down Expand Up @@ -117,13 +142,15 @@ impl TransformBuilder for TagFilterlistConfiguration {
OUTPUTS
}

async fn build(&self, _context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
async fn build(&self, context: ComponentContext) -> Result<Box<dyn Transform + Send>, GenericError> {
let metrics_builder = MetricsBuilder::from_component_context(&context);
Ok(Box::new(TagFilterlist {
filters: compile_filters(&self.entries),
configuration: self
.configuration
.clone()
.expect("configuration must be set via from_configuration"),
telemetry: Telemetry::new(&metrics_builder),
}))
}
}
Expand All @@ -137,6 +164,7 @@ impl MemoryBounds for TagFilterlistConfiguration {
struct TagFilterlist {
filters: CompiledFilters,
configuration: GenericConfiguration,
telemetry: Telemetry,
}

#[async_trait]
Expand All @@ -157,7 +185,8 @@ impl Transform for TagFilterlist {
for event in &mut events {
if let Some(metric) = event.try_as_metric_mut() {
if metric.values().is_sketch() {
filter_metric_tags(metric, &self.filters);
let outcome = filter_metric_tags(metric, &self.filters);
self.telemetry.record(outcome);
}
}
}
Expand Down Expand Up @@ -187,7 +216,9 @@ impl Transform for TagFilterlist {
/// Constructs a fresh `TagSet` without mutating the source, preserving isolation for
/// metrics that share the same underlying `Arc<TagSet>`.
#[inline]
fn apply_tag_filter(tags: &SharedTagSet, is_exclude: bool, names: &HashSet<String, FoldHashState>) -> Option<TagSet> {
fn apply_tag_filter(
tags: &SharedTagSet, is_exclude: bool, names: &HashSet<String, FoldHashState>,
) -> Option<FilteredTagSet> {
let capacity = if is_exclude {
tags.len().saturating_sub(names.len())
} else {
Expand All @@ -202,8 +233,9 @@ fn apply_tag_filter(tags: &SharedTagSet, is_exclude: bool, names: &HashSet<Strin
any_change = true;
}
}
let removed = tags.len().saturating_sub(out.len());
if any_change {
Some(out)
Some(FilteredTagSet { tags: out, removed })
} else {
None
}
Expand All @@ -215,31 +247,41 @@ fn apply_tag_filter(tags: &SharedTagSet, is_exclude: bool, names: &HashSet<Strin
/// If the metric name is not present in `filters`, the metric is left unchanged.
/// If filtering would not change any tags, the metric context is left untouched (zero allocations).
#[inline]
pub fn filter_metric_tags(metric: &mut Metric, filters: &CompiledFilters) {
pub fn filter_metric_tags(metric: &mut Metric, filters: &CompiledFilters) -> FilterMetricTagsOutcome {
let Some((is_exclude, tag_names)) = filters.get(metric.context().name().as_ref()) else {
return;
return FilterMetricTagsOutcome::RuleMiss;
};

let new_tags = apply_tag_filter(metric.context().tags(), *is_exclude, tag_names);

if metric.context().origin_tags().is_empty() {
if let Some(filtered) = new_tags {
*metric.context_mut() = metric.context().with_tags(filtered.into_shared());
let removed_tags = filtered.removed;
*metric.context_mut() = metric.context().with_tags(filtered.tags.into_shared());
FilterMetricTagsOutcome::Modified { removed_tags }
} else {
FilterMetricTagsOutcome::NoChange
}
} else {
let new_origin = apply_tag_filter(metric.context().origin_tags(), *is_exclude, tag_names);
match (new_tags, new_origin) {
(None, None) => {}
(None, None) => FilterMetricTagsOutcome::NoChange,
(Some(tags), None) => {
*metric.context_mut() = metric.context().with_tags(tags.into_shared());
let removed_tags = tags.removed;
*metric.context_mut() = metric.context().with_tags(tags.tags.into_shared());
FilterMetricTagsOutcome::Modified { removed_tags }
}
(None, Some(origin)) => {
*metric.context_mut() = metric.context().with_origin_tags(origin.into_shared());
let removed_tags = origin.removed;
*metric.context_mut() = metric.context().with_origin_tags(origin.tags.into_shared());
FilterMetricTagsOutcome::Modified { removed_tags }
}
(Some(tags), Some(origin)) => {
let removed_tags = tags.removed + origin.removed;
*metric.context_mut() = metric
.context()
.with_tags_and_origin_tags(tags.into_shared(), origin.into_shared());
.with_tags_and_origin_tags(tags.tags.into_shared(), origin.tags.into_shared());
FilterMetricTagsOutcome::Modified { removed_tags }
}
}
}
Expand All @@ -250,6 +292,7 @@ mod tests {
use saluki_config::{dynamic::ConfigUpdate, ConfigurationLoader};
use saluki_context::{tags::Tag, Context};
use saluki_core::data_model::event::metric::Metric;
use saluki_metrics::{test::TestRecorder, MetricsBuilder};

use super::*;

Expand Down Expand Up @@ -607,6 +650,31 @@ mod tests {
assert_eq!(origin_tag_names(&metric), vec!["region:us-east-1"]);
}

#[test]
fn telemetry_records_hits_misses_and_filtered_tags() {
let recorder = TestRecorder::default();
let _local = metrics::set_default_local_recorder(&recorder);

let builder = MetricsBuilder::default();
let telemetry = Telemetry::new(&builder);

assert_eq!(recorder.counter("tag_filterlist_rule_hits_total"), Some(0));
assert_eq!(recorder.counter("tag_filterlist_rule_misses_total"), Some(0));
assert_eq!(recorder.counter("tag_filterlist_noop_hits_total"), Some(0));
assert_eq!(recorder.counter("tag_filterlist_metrics_modified_total"), Some(0));
assert_eq!(recorder.counter("tag_filterlist_tags_filtered_total"), Some(0));

telemetry.record(FilterMetricTagsOutcome::RuleMiss);
telemetry.record(FilterMetricTagsOutcome::NoChange);
telemetry.record(FilterMetricTagsOutcome::Modified { removed_tags: 3 });

assert_eq!(recorder.counter("tag_filterlist_rule_hits_total"), Some(2));
assert_eq!(recorder.counter("tag_filterlist_rule_misses_total"), Some(1));
assert_eq!(recorder.counter("tag_filterlist_noop_hits_total"), Some(1));
assert_eq!(recorder.counter("tag_filterlist_metrics_modified_total"), Some(1));
assert_eq!(recorder.counter("tag_filterlist_tags_filtered_total"), Some(3));
}

#[tokio::test]
async fn dynamic_update_partial_replaces_filter() {
// Start with an empty static config to avoid figment merging static and dynamic arrays.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use metrics::Counter;
use saluki_metrics::MetricsBuilder;

use super::FilterMetricTagsOutcome;

#[derive(Clone)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know binary size is a big thing for us. Do we not derive Debug by default for that reason?

pub struct Telemetry {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think structures like this should have doc comments, especially when pub. Or... to kind of say something similar but differently, Telemetry is a fairly overloaded name in a repository like this. Is there a pattern of having many Telemetry structs in the codebase for different purposes?

Perhaps TagFilterTelemetry?

Edit: ok I answered the question with Claude. Yes there is a pattern of telemetry.rs files and structs named telemetry.

My next question is whether it needs to be pub...

rule_hits: Counter,
rule_misses: Counter,
noop_hits: Counter,
metrics_modified: Counter,
tags_filtered: Counter,
}

impl Telemetry {
pub fn new(builder: &MetricsBuilder) -> Self {
Self {
rule_hits: builder.register_debug_counter("tag_filterlist_rule_hits_total"),
rule_misses: builder.register_debug_counter("tag_filterlist_rule_misses_total"),
noop_hits: builder.register_debug_counter("tag_filterlist_noop_hits_total"),
metrics_modified: builder.register_debug_counter("tag_filterlist_metrics_modified_total"),
tags_filtered: builder.register_debug_counter("tag_filterlist_tags_filtered_total"),
}
}

pub fn record(&self, outcome: FilterMetricTagsOutcome) {
match outcome {
FilterMetricTagsOutcome::RuleMiss => {
self.rule_misses.increment(1);
}
FilterMetricTagsOutcome::NoChange => {
self.rule_hits.increment(1);
self.noop_hits.increment(1);
}
FilterMetricTagsOutcome::Modified { removed_tags } => {
self.rule_hits.increment(1);
self.metrics_modified.increment(1);
self.tags_filtered.increment(removed_tags as u64);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
-----BEGIN CERTIFICATE-----
MIIDwTCCAqmgAwIBAgIUGNrRiJ81arCE1jTvLRoxxuLVoHAwDQYJKoZIhvcNAQEL
BQAwgYgxCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJOWTEWMBQGA1UEBwwNTmV3IFlv
cmsgQ2l0eTEWMBQGA1UECgwNRGF0YWRvZywgSW5jLjEoMCYGA1UECwwfU2VsZi1T
aWduZWQgVGVzdGluZyBDZXJ0aWZpY2F0ZTESMBAGA1UEAwwJbG9jYWxob3N0MB4X
DTI1MTExMzE4NDAxMFoXDTM1MTExMTE4NDAxMFowgYgxCzAJBgNVBAYTAlVTMQsw
CQYDVQQIDAJOWTEWMBQGA1UEBwwNTmV3IFlvcmsgQ2l0eTEWMBQGA1UECgwNRGF0
YWRvZywgSW5jLjEoMCYGA1UECwwfU2VsZi1TaWduZWQgVGVzdGluZyBDZXJ0aWZp
Y2F0ZTESMBAGA1UEAwwJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8A
MIIBCgKCAQEAs9oaPxXZ5t2725jEWJ+kLMbH6tFzKIgGEG2wpzJik1AumsSeax40
JvA6W20Gyb73KkbCWM7Hm83++5QesDllpIvK0QNlDsz9VzQtiMCRZQ5Dfuw6stl+
KQaCFvaEXBwvz/kfrIBLDeww9H9VL3YP2JnHIcA4Y5bvdIrSr3q59n7nd9exBtjS
IZxLHxf44/yQMIUcmPESnLNGYLxqIYuHvre+t1CrWduzHwKtsSIP1qy3U2YCRQW6
mxDaZ+aI0BY8vYNX1w4XEeW8NO1HEu25DPXPWGbcSgHVk2VnQKtMIoVjuCGZGbBz
/TFJo69KoW/3CH3WHpT2JVKPV5itxkP68QIDAQABoyEwHzAdBgNVHQ4EFgQUFHdk
73oIdZ7CkilNi96SveiH/OcwDQYJKoZIhvcNAQELBQADggEBAEMd7CiZkIqtE+O4
Nj19xiTflKyw0EBklwlq1NrFb6VpGN3vPcu5X2CNH9p+c+zPYgDSTf+IFBYXO/zE
AzNYjFeUQf6Hsh0fTLqaiaueiAZw1o6QJp7/xFdwFcUFJM72lIlgndkoffEJCLOR
GcjAYD2Qv/oCjQ2B5xeHT9Sk4t85dhiK0aLbeSi+9yqtHis15OW8vqVFJXh1SxMV
zSKN9C4Yw0JKMNqlmePjQIc3d28tm4sUOzD2+qF+mSTSZqvAEuO3loQJPexfsDsF
n929iEzFbOshRpjKp/mZ1jzZewDOIg5Zek1UmbqIHf9MHfpdDYA0zcEnzTQqGFtL
BxX9Nnk=
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQCz2ho/Fdnm3bvb
mMRYn6Qsxsfq0XMoiAYQbbCnMmKTUC6axJ5rHjQm8DpbbQbJvvcqRsJYzsebzf77
lB6wOWWki8rRA2UOzP1XNC2IwJFlDkN+7Dqy2X4pBoIW9oRcHC/P+R+sgEsN7DD0
f1Uvdg/YmcchwDhjlu90itKvern2fud317EG2NIhnEsfF/jj/JAwhRyY8RKcs0Zg
vGohi4e+t763UKtZ27MfAq2xIg/WrLdTZgJFBbqbENpn5ojQFjy9g1fXDhcR5bw0
7UcS7bkM9c9YZtxKAdWTZWdAq0wihWO4IZkZsHP9MUmjr0qhb/cIfdYelPYlUo9X
mK3GQ/rxAgMBAAECggEAFPVEe7d7JfkOzB84Oi+YPROI9mcj6UQ4mK9/l2w6qnn8
hFKHN2pUn3j9A+xnjxjuyoFmYlzuS8ysevqevOBjZNJZdxPTMe9XUlMlPztZdhI8
cUzr5i2MExHMFQrzD5zFQZIBS+PDW0L7zXINJjO20wHQf7FboNdU3hrTRFmj1AJx
ExWk+vnY2YrzQLewsbPtK1FX6wqKhYuOSUOjpNJmf+2+aGfR+Bxx2NBF4marJoj3
migN1pThUQAoPXX5qRoX3VpEEZEOMycuSXp+raqcBqSFfv8gz/iM6tLfctRgqe0w
266oxipeLLXLmffYvnXrYXwK4Jl5M6Zd9TVjf6GDBQKBgQD+FSW+YEp6l4Zz5Rwo
Xrb104ILMHsd5V59KyNcd2c3mxMEhodN3sQDEd4OQRmMGIfcivb9Sn49GRBMTu29
f4ax6+F0Su66MOikgdhOYyNWI3TfMbsY/zCyXiGU9MiWVNDzx1nQj8sG7h36dK1l
Yxv6xx/a857kbZYnB53DCQ3dmwKBgQC1NY09e3Qq+hXyuZEJSsHE5LAYd9TeWs+D
FuUmL8FQkGCihQz/fX3+yJar4oXDoIHnhEwGEEKkOoT9EJ4uzW9L0WSy6fopjRs2
lXTnQBbB7414ZYkTM/oanTh7dBVG307oyb6d5+Jl0NPxs5Fomh6xOMWD8hB0+CTw
bAaO/c5YYwKBgATdlM5ze5mjYzC+924SekB0322lbQYiiU+uTswLgU+ASbnxdY/Z
Lzm70tvFBV84bQmdI6OwFIDJBRXhAQ567bJkiPm4IaAxJZNY5TKDFX8lyKwpgKK2
6FDSGqSGl3zBfQreC2tCBapJTwunxlZFsph3zbVcqvNG4fQ3Yh8FAl33AoGADR9l
rRAlp38Y280Ibc3WHnYZMoxrA/c7k9iym4NV0onCFcLg4BesaikIkEYFPdd/0M5J
2x6OVOpP+yua6PTDnI/7ZOGA1kV7tQY5ww1nGIBKlG9178gR0p+UGYychddiFYWW
okTKpmjrEFPaseKHWnosA5QiEPZvZmHMT8qdiNUCgYAweUc7+5slfxKYVLUo34p9
BiED632dV2YA7CYf+iEjXoFjCOH+ZqzI7OGfODpSMpjim3Fxt++UWbJscbNmtCZr
UeNDsJxiC+eksGWqpu0QoKdNPYRBB2QQFcvTT0AVt6PTzKaFVBvF/dnLuODjXp9Y
R5Ivn5V7QPeMY56Uzr2fJQ==
-----END PRIVATE KEY-----
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
44 changes: 44 additions & 0 deletions test/smp/regression/adp/cases/tagfilter_0tags/experiment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
optimization_goal: cpu
erratic: false
target:
name: agent-data-plane
command: /maybe-profile.sh /usr/local/bin/agent-data-plane --config /etc/agent-data-plane/empty.yaml run
cpu_allotment: 4
memory_allotment: 3200MiB
environment:
DD_HOSTNAME: smp-regression
DD_API_KEY: foo00000001
DD_DD_URL: http://127.0.0.1:9091
DD_IPC_CERT_FILE_PATH: /etc/agent-data-plane/cert.pem
DD_LOG_FORMAT_JSON: "true"
DD_DATA_PLANE_STANDALONE_MODE: "true"
DD_DATA_PLANE_TELEMETRY_ENABLED: "true"
DD_DATA_PLANE_TELEMETRY_LISTEN_ADDR: tcp://127.0.0.1:5102
DD_DATA_PLANE_DOGSTATSD_ENABLED: "true"
DD_DOGSTATSD_PORT: "0"
DD_DOGSTATSD_SOCKET: /tmp/dsd.socket
DD_DOGSTATSD_ORIGIN_DETECTION: "true"
DD_AGGREGATE_CONTEXT_LIMIT: "80000"
profiling_environment:
SMP_PROFILING_ENABLED: "true"
DD_SERVICE: agent-data-plane
DD_TRACE_AGENT_URL: unix:///smp-host/apm.socket
DD_PROFILING_NATIVE_PRESET: cpu_live_heap
DD_PROFILING_INLINED_FUNCTIONS: "true"
checks:
- name: memory_usage
description: Memory usage quality gate. This puts a bound on the total ADP memory usage.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended to describe the entire experiment, or just the check? Without looking at other experiment.yaml files (yet), it feels like what is missing is a story about why the experiment was added. For example we have these large metric names that are the same until the Nth character position and these are being added to verify the performance of a new tag filtering mechanism. Do we want some of that context and thinking to be added to either the lading.yaml or experiment.yaml files?

bounds:
series: total_rss_bytes
upper_bound: 900.0 MiB
report_links:
- text: (metrics)
link: https://app.datadoghq.com/dashboard/4br-nxz-khi?fromUser=true&refresh_mode=paused&tpl_var_adp-run-id%5B0%5D={{ job_id
}}&tpl_var_experiment%5B0%5D={{ experiment }}&view=spans&from_ts={{ start_time_ms }}&to_ts={{ end_time_ms }}&live=false
- text: (profiles)
link: https://app.datadoghq.com/profiling/explorer?query=env%3Asingle-machine-performance%20service%3Aagent-data-plane%20job_id%3A{{
job_id }}%20experiment%3A{{ experiment }}&agg_m=count&agg_m_source=base&agg_t=count&fromUser=false&viz=stream&start={{
filter_start }}&end={{ filter_end }}&paused=true
- text: (logs)
link: https://app.datadoghq.com/logs?query=experiment%3A{{ experiment }}%20run_id%3A{{ job_id }}&agg_m=count&agg_m_source=base&agg_q=%40span.url&agg_q_source=base&agg_t=count&fromUser=true&index=single-machine-performance-target-logs&messageDisplay=inline&refresh_mode=paused&storage=hot&stream_sort=time%2Cdesc&top_n=100&top_o=top&viz=stream&x_missing=true&from_ts={{
filter_start }}&to_ts={{ filter_end }}&live=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
blackhole:
- http:
binding_addr: 127.0.0.1:9091
- http:
binding_addr: 127.0.0.1:9092
target_metrics:
- prometheus:
uri: http://127.0.0.1:5102/scrape
generator:
- unix_datagram:
seed: [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107,
109, 113, 127, 131]
path: /tmp/dsd.socket
bytes_per_second: 250 MiB
maximum_prebuild_cache_size_bytes: 500 MiB
variant:
dogstatsd:
contexts:
inclusive:
min: 78000
max: 78000
tags_per_msg:
inclusive:
min: 54
max: 54
multivalue_count:
inclusive:
min: 2
max: 32
multivalue_pack_probability: 0.08
kind_weights:
metric: 90
event: 0
service_check: 0
metric_weights:
count: 0
gauge: 0
timer: 0
distribution: 5
set: 0
histogram: 0
metric_names:
- metricname_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa{{0-499}}
tag_names:
- tagname_aaaaaaaaaaaaaaaaaaaaaa_{{0-54}}
tag_values:
- value{{0-9}}
Loading
Loading