Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use ottl::{EvalContextFamily, Field, IndexExpr, PathAccessor, PathResolverMap, Value};
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::Span;

/// Family type for the span filter evaluation context.
Expand All @@ -34,13 +34,13 @@ pub struct SpanFilterContext<'a> {
/// Reference to the span being evaluated.
pub(super) span: &'a Span,
/// Reference to the trace's resource-level tags.
pub(super) resource_tags: &'a SharedTagSet,
pub(super) resource_tags: &'a TagSet,
}

impl<'a> SpanFilterContext<'a> {
/// Creates a context from references to the current span and resource tags.
#[inline]
pub fn new(span: &'a Span, resource_tags: &'a SharedTagSet) -> Self {
pub fn new(span: &'a Span, resource_tags: &'a TagSet) -> Self {
Self { span, resource_tags }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use async_trait::async_trait;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use ottl::{CallbackMap, EnumMap, OttlParser};
use saluki_config::GenericConfiguration;
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::{
components::{transforms::*, ComponentContext},
data_model::event::trace::Span,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl OttlTransform {
/// Each statement is executed in order. For editor statements (e.g. `set`), the `where`
/// clause is evaluated first; if it matches (or is absent), the editor function runs.
/// Errors are handled according to `error_mode`.
fn transform_span(&self, span: &mut Span, resource_tags: &SharedTagSet) {
fn transform_span(&self, span: &mut Span, resource_tags: &TagSet) {
let mut ctx = SpanTransformContext::new(span, resource_tags);

for parser in &self.span_parsers {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
//! immutable reference to the resource tags.
//!
//! `attributes` supports both read and write via [`SpanAttributesAccessor`].
//! `resource.attributes` is read-only because [`SharedTagSet`] does not expose mutable access.
//! `resource.attributes` is read-only because [`TagSet`] does not expose mutable access.

use std::collections::HashMap;
use std::sync::Arc;

use ottl::{EvalContextFamily, Field, IndexExpr, PathAccessor, PathResolverMap, Value};
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::Span;
use stringtheory::MetaString;

Expand All @@ -40,13 +40,13 @@ pub struct SpanTransformContext<'a> {
/// Mutable reference to the span being transformed.
pub(super) span: &'a mut Span,
/// Reference to the trace's resource-level tags (read-only).
pub(super) resource_tags: &'a SharedTagSet,
pub(super) resource_tags: &'a TagSet,
}

impl<'a> SpanTransformContext<'a> {
/// Creates a context from a mutable span reference and immutable resource tags.
#[inline]
pub fn new(span: &'a mut Span, resource_tags: &'a SharedTagSet) -> Self {
pub fn new(span: &'a mut Span, resource_tags: &'a TagSet) -> Self {
Self { span, resource_tags }
}
}
Expand Down Expand Up @@ -115,7 +115,7 @@ impl PathAccessor<SpanTransformFamily> for SpanAttributesAccessor {

/// Path accessor for `resource.attributes` (trace resource tags).
///
/// Read-only: [`SharedTagSet`] does not expose mutable access, so `set` returns an error.
/// Read-only: [`TagSet`] does not expose mutable access, so `set` returns an error.
#[derive(Debug)]
pub struct ResourceAttributesAccessor;

Expand All @@ -136,13 +136,13 @@ impl PathAccessor<SpanTransformFamily> for ResourceAttributesAccessor {
Ok(value)
}

/// Always returns an error: `SharedTagSet` is an Arc-based immutable type and `Trace`
/// Always returns an error: `TagSet` is an Arc-based immutable type and `Trace`
/// does not expose yet mutable way to access resource_tags, so there is no way to write changes
/// back to the trace's resource tags.
fn set<'a>(&self, _ctx: &mut SpanTransformContext<'a>, fields: &[Field], _value: &Value) -> ottl::Result<()> {
let path_str: String = fields.iter().map(|f| f.name.as_str()).collect::<Vec<_>>().join(".");
Err(format!(
"resource.attributes is read-only; setting path `{}` is not supported because SharedTagSet does not expose mutable access",
"resource.attributes is read-only; setting path `{}` is not supported because TagSet does not expose mutable access",
path_str
)
.into())
Expand Down
4 changes: 2 additions & 2 deletions bin/agent-data-plane/src/state/metrics/rules/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use saluki_context::{tags::SharedTagSet, Context};
use saluki_context::{tags::TagSet, Context};
use stringtheory::MetaString;

mod aggregation;
Expand Down Expand Up @@ -125,7 +125,7 @@ impl RemapperRule {
}

/// Builds the remapped tags for a matched metric.
fn build_remapped_tags(&self, metric_tags: &SharedTagSet) -> Vec<MetaString> {
fn build_remapped_tags(&self, metric_tags: &TagSet) -> Vec<MetaString> {
let mut new_tags = vec![];

for (original_tag_name, new_tag_name) in &self.remapped_tags {
Expand Down
10 changes: 5 additions & 5 deletions lib/saluki-components/src/common/otlp/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::LazyLock;
use opentelemetry_semantic_conventions::resource::*;
use otlp_protos::opentelemetry::proto::common::v1::{self as otlp_common, any_value::Value};
use saluki_common::collections::{FastHashMap, FastHashSet};
use saluki_context::tags::{SharedTagSet, TagSet};
use saluki_context::tags::TagSet;

// ============================================================================
// Datadog attribute key constants shared across the encoder and translator
Expand Down Expand Up @@ -144,9 +144,9 @@ pub fn extract_container_tags_from_resource_attributes(attributes: &[otlp_common

/// Extracts container tags from a resource tagset and inserts them into the provided TagSet.
///
/// This mirrors `extract_container_tags_from_resource_attributes`, but operates on a `SharedTagSet` representation of
/// This mirrors `extract_container_tags_from_resource_attributes`, but operates on a `TagSet` representation of
/// the resource.
pub fn extract_container_tags_from_resource_tagset(resource_tags: &SharedTagSet, tags: &mut TagSet) {
pub fn extract_container_tags_from_resource_tagset(resource_tags: &TagSet, tags: &mut TagSet) {
let mut extracted_tags = FastHashSet::default();

for tag in resource_tags {
Expand Down Expand Up @@ -208,10 +208,10 @@ pub fn resource_to_source(resource: &otlp_protos::opentelemetry::proto::resource
None
}

/// Resolves the source metadata from a resource `SharedTagSet`.
/// Resolves the source metadata from a resource `TagSet`.
///
/// This is equivalent to `resource_to_source`, but avoids the OTLP protobuf resource type.
pub fn tags_to_source(resource_tags: &SharedTagSet) -> Option<Source> {
pub fn tags_to_source(resource_tags: &TagSet) -> Option<Source> {
let get = |key: &str| -> Option<&str> { resource_tags.get_single_tag(key).and_then(|t| t.value()) };

// AWS ECS Fargate
Expand Down
4 changes: 2 additions & 2 deletions lib/saluki-components/src/destinations/dsd_stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use saluki_api::{
APIHandler, StatusCode,
};
use saluki_common::time::get_coarse_unix_timestamp;
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::{
components::{
destinations::{Destination, DestinationBuilder, DestinationContext},
Expand Down Expand Up @@ -195,7 +195,7 @@ impl Destination for DogStatsDStats {
#[derive(Eq, Hash, PartialEq, Serialize)]
struct ContextNoOrigin {
name: MetaString,
tags: SharedTagSet,
tags: TagSet,
}
#[derive(Deserialize)]
struct StatsQueryParams {
Expand Down
14 changes: 7 additions & 7 deletions lib/saluki-components/src/encoders/datadog/traces/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use piecemeal::{ScratchBuffer, ScratchWriter};
use saluki_common::strings::StringBuilder;
use saluki_common::task::HandleExt as _;
use saluki_config::GenericConfiguration;
use saluki_context::tags::{SharedTagSet, TagSet};
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::{AttributeScalarValue, AttributeValue, Span as DdSpan};
use saluki_core::topology::{EventsBuffer, PayloadsBuffer};
use saluki_core::{
Expand Down Expand Up @@ -688,12 +688,12 @@ fn encode_attribute_array_value<S: ScratchBuffer>(
Ok(())
}

fn get_resource_tag_value<'a>(resource_tags: &'a SharedTagSet, key: &str) -> Option<&'a str> {
fn get_resource_tag_value<'a>(resource_tags: &'a TagSet, key: &str) -> Option<&'a str> {
resource_tags.get_single_tag(key).and_then(|t| t.value())
}

fn resolve_hostname<'a>(
resource_tags: &'a SharedTagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
resource_tags: &'a TagSet, source: Option<&'a OtlpSource>, default_hostname: Option<&'a str>,
ignore_missing_fields: bool,
) -> Option<&'a str> {
let mut hostname = match source {
Expand All @@ -715,7 +715,7 @@ fn resolve_hostname<'a>(
hostname
}

fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Option<&str> {
fn resolve_env(resource_tags: &TagSet, ignore_missing_fields: bool) -> Option<&str> {
if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_ENVIRONMENT) {
return Some(value);
}
Expand All @@ -728,7 +728,7 @@ fn resolve_env(resource_tags: &SharedTagSet, ignore_missing_fields: bool) -> Opt
get_resource_tag_value(resource_tags, DEPLOYMENT_ENVIRONMENT_KEY)
}

fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
fn resolve_container_id<'a>(resource_tags: &'a TagSet, first_span: Option<&'a DdSpan>) -> Option<&'a str> {
for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
if let Some(value) = get_resource_tag_value(resource_tags, key) {
return Some(value);
Expand All @@ -746,15 +746,15 @@ fn resolve_container_id<'a>(resource_tags: &'a SharedTagSet, first_span: Option<
None
}

fn resolve_app_version(resource_tags: &SharedTagSet) -> Option<&str> {
fn resolve_app_version(resource_tags: &TagSet) -> Option<&str> {
if let Some(value) = get_resource_tag_value(resource_tags, KEY_DATADOG_VERSION) {
return Some(value);
}
get_resource_tag_value(resource_tags, SERVICE_VERSION)
}

fn resolve_container_tags(
resource_tags: &SharedTagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
resource_tags: &TagSet, source: Option<&OtlpSource>, ignore_missing_fields: bool,
) -> Option<MetaString> {
// TODO: some refactoring is probably needed to normalize this function, the tags should already be normalized
// since we do so when we transform OTLP spans to DD spans however to make this class extensible for non otlp traces, we would
Expand Down
8 changes: 3 additions & 5 deletions lib/saluki-components/src/sources/otlp/logs/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use otlp_common::any_value::Value::{ArrayValue, BoolValue, BytesValue, DoubleVal
use otlp_protos::opentelemetry::proto::common::v1 as otlp_common;
use otlp_protos::opentelemetry::proto::logs::v1::LogRecord;
use otlp_protos::opentelemetry::proto::resource::v1::Resource;
use saluki_context::tags::{SharedTagSet, TagSet};
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::log::{Log, LogStatus};
use serde_json::Value as JsonValue;
use stringtheory::MetaString;
Expand Down Expand Up @@ -196,7 +196,7 @@ fn safe_insert(map: &mut HashMap<MetaString, JsonValue>, key: &str, value: JsonV

pub fn transform_log_record(
mut lr: LogRecord, resource: &Resource, scope: Option<&otlp_common::InstrumentationScope>,
record_host: Option<MetaString>, record_service: Option<MetaString>, mut tags: SharedTagSet,
record_host: Option<MetaString>, record_service: Option<MetaString>, mut tags: TagSet,
) -> Log {
// Build additional properties map with resource, scope and record attributes
let mut additional_properties = HashMap::new();
Expand Down Expand Up @@ -265,14 +265,12 @@ pub fn transform_log_record(
k if k == DDTAGS_ATTR => {
if let Some(av) = kv.value.as_ref() {
if let Some(OtlpStringValue(s)) = av.value.as_ref() {
let mut extra = TagSet::default();
for raw in s.split(',') {
let t = raw.trim();
if !t.is_empty() {
extra.insert_tag(t);
tags.insert_tag(t);
}
}
tags.extend_from_shared(&extra.into_shared());
}
}
}
Expand Down
9 changes: 4 additions & 5 deletions lib/saluki-components/src/sources/otlp/logs/translator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use otlp_protos::opentelemetry::proto::common::v1::InstrumentationScope;
use otlp_protos::opentelemetry::proto::logs::v1::{LogRecord, ResourceLogs as OtlpResourceLogs, ScopeLogs};
use otlp_protos::opentelemetry::proto::resource::v1::Resource;
use saluki_context::origin::OriginTagsResolver;
use saluki_context::tags::{SharedTagSet, Tag};
use saluki_context::tags::{Tag, TagSet};
use saluki_core::data_model::event::Event;
use stringtheory::MetaString;

Expand All @@ -22,7 +22,7 @@ pub struct OtlpLogsTranslator {
resource: Resource,
host: Option<MetaString>,
service: Option<MetaString>,
attribute_tags: SharedTagSet,
attribute_tags: TagSet,
scope_logs: IntoIter<ScopeLogs>,
current_scope_logs: Option<(Option<InstrumentationScope>, IntoIter<LogRecord>)>,
}
Expand All @@ -43,18 +43,17 @@ impl OtlpLogsTranslator {
let service = get_string_attribute(&resource.attributes, SERVICE_NAME).map(MetaString::from);
let mut attribute_tags = tags_from_attributes(&resource.attributes);
attribute_tags.insert_tag(OTEL_SOURCE_TAG.clone());
let mut shared_attribute_tags = attribute_tags.into_shared();
let origin = raw_origin_from_attributes(&resource.attributes);
if let Some(resolver) = origin_tag_resolver {
let origin_tags = resolver.resolve_origin_tags(origin);
shared_attribute_tags.extend_from_shared(&origin_tags);
attribute_tags.merge_shared(&origin_tags);
}

Self {
resource,
host,
service,
attribute_tags: shared_attribute_tags,
attribute_tags,
scope_logs: resource_logs.scope_logs.into_iter(),
current_scope_logs: None,
}
Expand Down
15 changes: 6 additions & 9 deletions lib/saluki-components/src/transforms/apm_stats/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ use async_trait::async_trait;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use opentelemetry_semantic_conventions::resource::{CONTAINER_ID, K8S_POD_UID};
use saluki_config::GenericConfiguration;
use saluki_context::{
origin::OriginTagCardinality,
tags::{SharedTagSet, TagSet},
};
use saluki_context::{origin::OriginTagCardinality, tags::TagSet};
use saluki_core::{
components::{transforms::*, ComponentContext},
data_model::event::{
Expand Down Expand Up @@ -184,7 +181,7 @@ impl ApmStats {
let resource_tags = trace.resource_tags();
let container_id = resolve_container_id(resource_tags);
let mut container_tags = if container_id.is_empty() {
SharedTagSet::default()
TagSet::default()
} else {
extract_container_tags(resource_tags)
};
Expand All @@ -194,7 +191,7 @@ impl ApmStats {
if let Some(workload_provider) = &self.workload_provider {
let entity_id = EntityId::Container(container_id.clone());
if let Some(tags) = workload_provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low) {
container_tags.extend_from_shared(&tags);
container_tags.merge_shared(&tags);
}
}
}
Expand Down Expand Up @@ -425,7 +422,7 @@ fn now_nanos() -> u64 {
}

/// Resolves container ID from OTLP resource tags.
fn resolve_container_id(resource_tags: &SharedTagSet) -> MetaString {
fn resolve_container_id(resource_tags: &TagSet) -> MetaString {
for key in [KEY_DATADOG_CONTAINER_ID, CONTAINER_ID, K8S_POD_UID] {
if let Some(tag) = resource_tags.get_single_tag(key) {
if let Some(value) = tag.value() {
Expand All @@ -440,11 +437,11 @@ fn resolve_container_id(resource_tags: &SharedTagSet) -> MetaString {
}

/// Extracts container tags from OTLP resource tags.
fn extract_container_tags(resource_tags: &SharedTagSet) -> SharedTagSet {
fn extract_container_tags(resource_tags: &TagSet) -> TagSet {
let mut container_tags_set = TagSet::default();
extract_container_tags_from_resource_tagset(resource_tags, &mut container_tags_set);

container_tags_set.into_shared()
container_tags_set
}

/// Extracts process tags from trace.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Span concentrator for APM stats computation.

use saluki_common::collections::FastHashMap;
use saluki_context::tags::SharedTagSet;
use saluki_context::tags::TagSet;
use saluki_core::data_model::event::trace::Span;
use saluki_core::data_model::event::trace_stats::{ClientStatsBucket, ClientStatsPayload};
use stringtheory::MetaString;
Expand Down Expand Up @@ -70,7 +70,7 @@ pub struct InfraTags {
/// Container ID from the tracer payload.
pub container_id: MetaString,
/// Container tags resolved from the container runtime.
pub container_tags: SharedTagSet,
pub container_tags: TagSet,
/// Hash of the process tags string.
pub process_tags_hash: u64,
/// Process tags string from the tracer payload.
Expand All @@ -79,7 +79,7 @@ pub struct InfraTags {

impl InfraTags {
pub fn new(
container_id: impl Into<MetaString>, container_tags: SharedTagSet, process_tags: impl Into<MetaString>,
container_id: impl Into<MetaString>, container_tags: TagSet, process_tags: impl Into<MetaString>,
) -> Self {
let process_tags: MetaString = process_tags.into();
let process_tags_hash = process_tags_hash(process_tags.as_ref());
Expand Down Expand Up @@ -173,7 +173,7 @@ impl SpanConcentrator {

pub fn flush(&mut self, now: u64, force: bool) -> Vec<ClientStatsPayload> {
let mut m = FastHashMap::<PayloadAggregationKey, Vec<ClientStatsBucket>>::default();
let mut container_tags_by_id = FastHashMap::<MetaString, SharedTagSet>::default();
let mut container_tags_by_id = FastHashMap::<MetaString, TagSet>::default();
let mut process_tags_by_hash = FastHashMap::<u64, MetaString>::default();

let timestamps: Vec<u64> = self.buckets.keys().copied().collect();
Expand Down
Loading
Loading