Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 10 additions & 59 deletions lib/saluki-components/src/transforms/host_tags/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
use std::{
num::NonZeroUsize,
sync::Arc,
time::{Duration, Instant},
};

use async_trait::async_trait;
use bytesize::ByteSize;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_context::{
tags::{SharedTagSet, Tag, TagSet},
ContextResolver, ContextResolverBuilder,
};
use saluki_context::tags::{SharedTagSet, Tag};
use saluki_core::{components::transforms::*, topology::EventsBuffer};
use saluki_core::{components::ComponentContext, data_model::event::metric::Metric};
use saluki_env::helpers::remote_agent::RemoteAgentClient;
use saluki_error::{generic_error, GenericError};
use saluki_error::GenericError;
use stringtheory::MetaString;

/// Host Tags synchronous transform.
Expand All @@ -24,12 +19,10 @@ use stringtheory::MetaString;
/// preventing gaps in queryability until the backend starts adding these tags automatically.
pub struct HostTagsConfiguration {
client: RemoteAgentClient,
host_tags_context_string_interner_bytes: ByteSize,
expected_tags_duration: u64,
}

const DEFAULT_EXPECTED_TAGS_DURATION: u64 = 0;
const DEFAULT_HOST_TAGS_CONTEXT_STRING_INTERNER_BYTES: ByteSize = ByteSize::kib(64);

impl HostTagsConfiguration {
/// Creates a new `HostTagsConfiguration` from the given configuration.
Expand All @@ -38,21 +31,17 @@ impl HostTagsConfiguration {
let expected_tags_duration = config
.try_get_typed::<u64>("expected_tags_duration")?
.unwrap_or(DEFAULT_EXPECTED_TAGS_DURATION);
let host_tags_context_string_interner_bytes = config
.try_get_typed::<ByteSize>("host_tags_context_string_interner_bytes")?
.unwrap_or(DEFAULT_HOST_TAGS_CONTEXT_STRING_INTERNER_BYTES);

Ok(Self {
client,
host_tags_context_string_interner_bytes,
expected_tags_duration,
})
}
}

#[async_trait]
impl SynchronousTransformBuilder for HostTagsConfiguration {
async fn build(&self, context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
async fn build(&self, _context: ComponentContext) -> Result<Box<dyn SynchronousTransform + Send>, GenericError> {
// Make an initial request of the host tags from the Datadog Agent.
//
// We only pay attention to the "system" tags, as the "google_cloud_platform" tags are not relevant here.
Expand All @@ -65,19 +54,8 @@ impl SynchronousTransformBuilder for HostTagsConfiguration {
.map(Tag::from)
.collect::<SharedTagSet>();

let context_string_interner_size =
NonZeroUsize::new(self.host_tags_context_string_interner_bytes.as_u64() as usize)
.ok_or_else(|| generic_error!("host_tags_context_string_interner_bytes must be greater than 0"))
.unwrap();
let context_resolver =
ContextResolverBuilder::from_name(format!("{}/host_tags/primary", context.component_id()))
.expect("resolver name is not empty")
.with_interner_capacity_bytes(context_string_interner_size)
.with_idle_context_expiration(Duration::from_secs(30))
.build();
Ok(Box::new(HostTagsEnrichment {
start: Instant::now(),
context_resolver: Some(context_resolver),
expected_tags_duration: Duration::from_secs(self.expected_tags_duration),
host_tags: Some(host_tags),
}))
Comment on lines 44 to 61
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This transform used to rebuild contexts via a ContextResolver, which provides interning + caching of resolved contexts. With the new in-place with_tags_mut approach, any metric contexts that are shared (common with ContextResolver-produced contexts) will be detached via Arc::make_mut, potentially producing one enriched ContextInner per metric and losing cross-metric sharing. If host-tag enrichment is expected to touch many metrics, consider retaining a resolver/cache for enriched contexts or another strategy that preserves sharing.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Host Tags transform generally only operates for a few minutes after startup, so the reduction in sharing due to creating an off-shoot Context, and not caching that via something like ContextResolver, is pretty small and not sustained over time.

However, it could be useful to add something in ContextResolver where we support modifying how the input pieces are used to build the resulting context. In order to actually make use of ContextResolver in this scenario, we'd want to feed it the incoming metric context untouched, and then generate a new context (which is the one we'd cache) with the additional host tags bolted on, so that we wouldn't have to redo the chaining of the additional host tags for subsequent lookups.

We could do a cache, too, but ContextResolver already wraps up most of the things we want here -- cache config, expiration, telemetry, etc -- in a nice package that's specific to contexts and so it'd be nicer to somehow add it there than to just use a cache naïvely.

Expand All @@ -87,56 +65,32 @@ impl SynchronousTransformBuilder for HostTagsConfiguration {
impl MemoryBounds for HostTagsConfiguration {
fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
builder
// Capture the size of the heap allocation when the component is built.
.minimum()
.with_single_value::<HostTagsEnrichment>("component struct")
// We also allocate the backing storage for the string interner up front, which is used by our context
// resolver.
.with_fixed_amount(
"string interner",
self.host_tags_context_string_interner_bytes.as_u64() as usize,
);
.with_single_value::<HostTagsEnrichment>("component struct");
}
}

pub struct HostTagsEnrichment {
start: Instant,
context_resolver: Option<ContextResolver>,
expected_tags_duration: Duration,
host_tags: Option<SharedTagSet>,
}

impl HostTagsEnrichment {
fn enrich_metric(&mut self, metric: &mut Metric) {
// Get our context resolver and host tags.
//
// If they're not available, then we skip adding host tags.
let (resolver, host_tags) = match (self.context_resolver.as_mut(), self.host_tags.as_ref()) {
(Some(resolver), Some(host_tags)) => (resolver, host_tags),
_ => return,
let host_tags = match self.host_tags.as_ref() {
Some(host_tags) => host_tags,
None => return,
};

// TODO: use mutable tagsets to chain host tags on existing metric tags instead of allocating
let tags = metric
.context()
.tags()
.into_iter()
.chain(host_tags)
.cloned()
.collect::<TagSet>();
let origin_tags = metric.context().origin_tags().clone();

if let Some(context) = resolver.resolve_with_origin_tags(metric.context().name(), tags, origin_tags) {
*metric.context_mut() = context;
}
metric.context_mut().mutate_tags(|tags| tags.merge_shared(host_tags));
}
}

impl SynchronousTransform for HostTagsEnrichment {
fn transform_buffer(&mut self, event_buffer: &mut EventsBuffer) {
// Skip adding host tags if duration has elapsed.
if self.start.elapsed() >= self.expected_tags_duration {
self.context_resolver = None;
self.host_tags = None;
return;
}
Expand All @@ -153,18 +107,16 @@ impl SynchronousTransform for HostTagsEnrichment {
mod tests {
use std::time::{Duration, Instant};

use saluki_context::{Context, ContextResolverBuilder};
use saluki_context::Context;
use saluki_core::data_model::event::metric::Metric;

use super::*;

#[test]
fn basic() {
let context_resolver = ContextResolverBuilder::for_tests().build();
let host_tags = SharedTagSet::from_iter(vec![Tag::from("hosttag1"), Tag::from("hosttag2")]);
let mut host_tags_enrichment = HostTagsEnrichment {
start: Instant::now(),
context_resolver: Some(context_resolver),
expected_tags_duration: Duration::from_secs(30),
host_tags: Some(host_tags.clone()),
};
Expand All @@ -177,8 +129,7 @@ mod tests {
assert!(metric1.context().tags().has_tag(tag));
}

// Simulate exceeding our configured enrichment duration by clearing the context resolver and host tags.
host_tags_enrichment.context_resolver = None;
// Simulate exceeding our configured enrichment duration by clearing host tags.
host_tags_enrichment.host_tags = None;

// We should no longer enrich the metric with host tags.
Expand Down
108 changes: 108 additions & 0 deletions lib/saluki-context/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,39 @@ impl Context {
&self.inner.origin_tags
}

/// Applies `f` to the instrumented tags of this context so they can be modified.
///
/// When the inner data of this context is shared with other clones, it is cloned before `f` runs, so the other
/// holders never observe the change. When this context is the only holder, `f` operates on the existing data
/// directly.
///
/// Once `f` returns, the context key is recomputed from the updated state.
pub fn mutate_tags(&mut self, f: impl FnOnce(&mut TagSet)) {
    self.mutate_inner(|inner| {
        let tags = &mut inner.tags;
        f(tags)
    });
}

/// Applies `f` to the origin tags of this context so they can be modified.
///
/// When the inner data of this context is shared with other clones, it is cloned before `f` runs, so the other
/// holders never observe the change. When this context is the only holder, `f` operates on the existing data
/// directly.
///
/// Once `f` returns, the context key is recomputed from the updated state.
pub fn mutate_origin_tags(&mut self, f: impl FnOnce(&mut TagSet)) {
    self.mutate_inner(|inner| {
        let origin_tags = &mut inner.origin_tags;
        f(origin_tags)
    });
}
Comment on lines +118 to +138
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new with_tags_mut/with_origin_tags_mut methods mutate fields that contribute to Context's Eq/Hash (via the recomputed key). Please document that callers must not mutate a Context while it is being used as a key in a HashMap/HashSet (or otherwise relied on for stable hashing), as this would break those data structures' invariants.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true, except for the fact that normal collections (certainly the above ones from stdlib) don't actually allow modifying the key (or value, if HashSet) after insertion... and we're not doing interior mutability tricks here so we're not subverting that, and thus this isn't actually a problem for us.


/// Runs the given closure on the inner context data, recomputing the context key afterwards.
///
/// When the inner context state is shared (we aren't the only ones with a strong reference), we clone the inner
/// data first to have our own copy. Otherwise, we modify the inner data in place.
fn mutate_inner(&mut self, f: impl FnOnce(&mut ContextInner)) {
Comment thread
tobz marked this conversation as resolved.
let inner = Arc::make_mut(&mut self.inner);
f(inner);
let (key, _) = hash_context(&inner.name, &inner.tags, &inner.origin_tags);
inner.key = key;
}
Comment on lines +144 to +149
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mutate_inner recomputes the context key via hash_context, which allocates a new PrehashedHashSet each call. If with_tags_mut is used in per-metric hot paths (e.g. enrichment), this extra allocation can be a noticeable cost. Consider using the non-allocating hash_context_with_seen with a reusable seen set (or another allocation-free rehash strategy) to avoid per-mutation allocations.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is good advice, and we might consider a thread-local seen that can be reused, which would also help us clean up the different versions of the function, the callsites using them which carry around their own PrehashedHashSet, and so on.

Good for a follow-up PR.


/// Returns the size of this context in bytes.
///
/// A context's size is the sum of the sizes of its fields and the size of the `Context` struct itself, and
Expand Down Expand Up @@ -325,4 +358,79 @@ mod tests {
BASE_CONTEXT_SIZE + SIZE_OF_CONTEXT_NAME.len() + tags.size_of() + origin_tags.size_of()
);
}

#[test]
fn with_tags_mut_clones_shared_context() {
    let base = Context::from_static_parts("metric", &["env:prod"]);
    let mut derived = base.clone();

    // A plain clone still points at the same inner allocation.
    assert!(base.ptr_eq(&derived));

    derived.mutate_tags(|tag_set| {
        tag_set.insert_tag(Tag::from("service:web"));
    });

    // Mutating the clone must have detached it from the shared inner data.
    assert!(!base.ptr_eq(&derived));
}

#[test]
fn with_tags_mut_does_not_affect_original() {
    let base = Context::from_static_parts("metric", &["env:prod"]);
    let mut derived = base.clone();

    derived.mutate_tags(|tag_set| {
        tag_set.insert_tag(Tag::from("service:web"));
    });

    // The context we cloned from still carries only its initial tag.
    let base_tags = base.tags();
    assert_eq!(base_tags.len(), 1);
    assert!(base_tags.has_tag("env:prod"));
    assert!(!base_tags.has_tag("service:web"));

    // The mutated clone carries the initial tag plus the newly inserted one.
    let derived_tags = derived.tags();
    assert_eq!(derived_tags.len(), 2);
    assert!(derived_tags.has_tag("env:prod"));
    assert!(derived_tags.has_tag("service:web"));
}

#[test]
fn with_tags_mut_rehashes() {
    // Start from a single-tag context and add a second tag through mutation.
    let mut actual = Context::from_static_parts("metric", &["env:prod"]);
    actual.mutate_tags(|tag_set| {
        tag_set.insert_tag(Tag::from("service:web"));
    });

    // A context constructed directly with both tags is the reference point.
    let reference = Context::from_static_parts("metric", &["env:prod", "service:web"]);

    // After the key is recomputed, the mutated context must compare equal to the freshly built one.
    assert_eq!(actual, reference);

    // Add a tag that `reference` does not have, to make sure equality is not asymmetric and the two contexts now
    // compare as different.
    actual.mutate_tags(|tag_set| {
        tag_set.insert_tag(Tag::from("cluster:foo"));
    });
    assert_ne!(actual, reference);
}

#[test]
fn with_origin_tags_mut_clones_shared_context() {
    let base = Context::from_static_name("metric");
    let mut derived = base.clone();

    // A plain clone still points at the same inner allocation.
    assert!(base.ptr_eq(&derived));

    derived.mutate_origin_tags(|origin| {
        origin.insert_tag(Tag::from("origin:tag"));
    });

    // The clone was detached, and only the clone picked up the new origin tag.
    assert!(!base.ptr_eq(&derived));
    assert!(base.origin_tags().is_empty());

    let derived_origin = derived.origin_tags();
    assert_eq!(derived_origin.len(), 1);
    assert!(derived_origin.has_tag("origin:tag"));
}
}
25 changes: 10 additions & 15 deletions lib/saluki-context/src/tags/tagset/owned.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,10 @@ impl<'a> Iterator for BaseIndexIter<'a> {

#[cfg(test)]
mod tests {
use std::collections::{BTreeSet, HashSet};

use proptest::{collection::vec as arb_vec, prelude::*, prop_oneof};

use super::*;

/// Helper: create a SharedTagSet from static tag strings.
Expand Down Expand Up @@ -878,15 +882,6 @@ mod tests {
ts.insert_tag(Tag::from("c:3"));
assert_eq!(ts.len(), 3);
}
}

#[cfg(test)]
mod proptests {
use std::collections::BTreeSet;

use proptest::prelude::*;

use super::*;

/// Operations we can apply to a TagSet.
#[derive(Clone, Debug)]
Expand All @@ -907,23 +902,23 @@ mod proptests {

/// Strategy for generating a random operation.
fn arb_op() -> impl Strategy<Value = Op> {
prop_oneof![arb_tag().prop_map(Op::Insert), arb_key().prop_map(Op::RemoveByName),]
prop_oneof![arb_tag().prop_map(Op::Insert), arb_key().prop_map(Op::RemoveByName)]
}

/// Strategy for generating a group of tags (for one FrozenTagSet in the chain).
fn arb_tag_group() -> impl Strategy<Value = Vec<String>> {
prop::collection::vec(arb_tag(), 0..10)
arb_vec(arb_tag(), 0..10)
}

/// Strategy for generating a base with 1-3 chained tag groups.
fn arb_base_groups() -> impl Strategy<Value = Vec<Vec<String>>> {
prop::collection::vec(arb_tag_group(), 1..4)
arb_vec(arb_tag_group(), 1..4)
}

/// Build a SharedTagSet from multiple groups (each becomes a chained FrozenTagSet).
/// Deduplicates exact tags across groups to avoid cross-chain duplicates.
fn build_chained_base(groups: &[Vec<String>]) -> SharedTagSet {
let mut seen_tags = std::collections::HashSet::new();
let mut seen_tags = HashSet::new();

let mut shared = {
let ts: TagSet = groups[0]
Expand Down Expand Up @@ -979,9 +974,9 @@ mod proptests {

#[test]
#[cfg_attr(miri, ignore)]
fn overlay_matches_reference(
fn property_test_overlay_matches_reference(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed this one so it would be properly filtered when running unit tests vs when specifically trying to run property tests.

base_groups in arb_base_groups(),
ops in prop::collection::vec(arb_op(), 0..20),
ops in arb_vec(arb_op(), 0..20),
) {
let base = build_chained_base(&base_groups);

Expand Down
Loading