Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 14 additions & 70 deletions tests/otel/test_tracing_otlp.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,6 @@
import re
from enum import Enum
from utils import weblog, interfaces, scenarios, features
from typing import Any
from collections.abc import Iterator


def _snake_to_camel(snake_key: str) -> str:
parts = snake_key.split("_")
return parts[0].lower() + "".join(p.capitalize() for p in parts[1:])


def get_otlp_key(d: dict[str, Any] | None, snake_case_key: str, *, is_json: bool, default: Any = None) -> Any: # noqa: ANN401
"""Look up a field by its snake_case name when is_json is false, or its camelCase equivalent when is_json is true.
Fields must be camelCase for JSON Protobuf encoding. See https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding
"""
if d is None:
return default
key = _snake_to_camel(snake_case_key) if is_json else snake_case_key
return d.get(key, default)


# See https://github.com/open-telemetry/opentelemetry-proto/blob/v1.9.0/opentelemetry/proto/trace/v1/trace.proto#L153
Expand All @@ -42,48 +25,13 @@ class StatusCode(Enum):
STATUS_CODE_ERROR = 2


def get_keyvalue_generator(attributes: list[dict]) -> Iterator[tuple[str, Any]]:
for key_value in attributes:
if key_value["value"].get("string_value"):
yield key_value["key"], key_value["value"]["string_value"]
elif key_value["value"].get("stringValue"):
yield key_value["key"], key_value["value"]["stringValue"]
elif key_value["value"].get("bool_value"):
yield key_value["key"], key_value["value"]["bool_value"]
elif key_value["value"].get("boolValue"):
yield key_value["key"], key_value["value"]["boolValue"]
elif key_value["value"].get("int_value"):
yield key_value["key"], key_value["value"]["int_value"]
elif key_value["value"].get("intValue"):
yield key_value["key"], key_value["value"]["intValue"]
elif key_value["value"].get("double_value"):
yield key_value["key"], key_value["value"]["double_value"]
elif key_value["value"].get("doubleValue"):
yield key_value["key"], key_value["value"]["doubleValue"]
elif key_value["value"].get("array_value"):
yield key_value["key"], key_value["value"]["array_value"]
elif key_value["value"].get("arrayValue"):
yield key_value["key"], key_value["value"]["arrayValue"]
elif key_value["value"].get("kvlist_value"):
yield key_value["key"], key_value["value"]["kvlist_value"]
elif key_value["value"].get("kvlistValue"):
yield key_value["key"], key_value["value"]["kvlistValue"]
elif key_value["value"].get("bytes_value"):
yield key_value["key"], key_value["value"]["bytes_value"]
elif key_value["value"].get("bytesValue"):
yield key_value["key"], key_value["value"]["bytesValue"]
else:
raise ValueError(f"Unknown attribute value: {key_value['value']}")


# @scenarios.apm_tracing_e2e_otel
@features.otel_api
@scenarios.apm_tracing_otlp
class Test_Otel_Tracing_OTLP:
def setup_single_server_trace(self):
self.start_time_ns = time.time_ns()
self.req = weblog.get("/")
self.end_time_ns = time.time_ns()

def test_single_server_trace(self):
data = list(interfaces.open_telemetry.get_otel_spans(self.req))
Expand All @@ -98,16 +46,12 @@ def test_single_server_trace(self):
is_json = request_headers.get("content-type") == "application/json"

# Assert that there is only one resource span (i.e. SDK) in the OTLP request
resource_spans = get_otlp_key(content, "resource_spans", is_json=is_json)
expected_key = _snake_to_camel("resource_spans") if is_json else "resource_spans"
assert resource_spans is not None, f"missing '{expected_key}' on content: {content}"
resource_spans = content["resourceSpans"]
assert resource_spans is not None, f"missing 'resourceSpans' on content: {content}"
assert len(resource_spans) == 1, f"expected 1 resource span, got {len(resource_spans)}"
resource_span = resource_spans[0]

attributes = {
key_value["key"]: get_otlp_key(key_value["value"], "string_value", is_json=is_json)
for key_value in resource_span.get("resource").get("attributes")
}
attributes = resource_span.get("resource", {}).get("attributes", {})

# Assert that the resource attributes contain the service-level attributes and tracer-level attributes we expect
# TODO: Assert the following attributes: runtime-id, git.commit.sha, git.repository_url
Expand All @@ -117,9 +61,9 @@ def test_single_server_trace(self):
attributes.get("deployment.environment.name") == "system-tests"
or attributes.get("deployment.environment") == "system-tests"
)
assert attributes.get("telemetry.sdk.name") == "datadog"
# assert attributes.get("telemetry.sdk.name") == "datadog"
assert "telemetry.sdk.language" in attributes
assert "telemetry.sdk.version" in attributes
# assert "telemetry.sdk.version" in attributes

# Assert that the `traceId` and `spanId` JSON fields are valid case-insensitive hexadecimal strings, not base64-encoded strings as defined in the standard Protobuf JSON Mapping.
# See https://opentelemetry.io/docs/specs/otlp/#json-protobuf-encoding
Expand All @@ -133,23 +77,23 @@ def test_single_server_trace(self):
)

# Assert that the span fields match the expected values
span_start_time_ns = int(get_otlp_key(span, "start_time_unix_nano", is_json=is_json))
span_end_time_ns = int(get_otlp_key(span, "end_time_unix_nano", is_json=is_json))
span_start_time_ns = int(span["startTimeUnixNano"])
span_end_time_ns = int(span["endTimeUnixNano"])
assert span_start_time_ns >= self.start_time_ns
assert span_end_time_ns >= span_start_time_ns
assert span_end_time_ns <= self.end_time_ns

assert get_otlp_key(span, "name", is_json=is_json)
assert get_otlp_key(span, "kind", is_json=is_json) == SpanKind.SERVER.value
assert get_otlp_key(span, "attributes", is_json=is_json) is not None
assert span["name"]
assert span["kind"] == SpanKind.SERVER.value
assert span["attributes"] is not None
status = span.get("status", {})
# An absent or empty status dict both mean STATUS_CODE_UNSET (protobuf default = 0).
assert (
get_otlp_key(span, "status", is_json=is_json) is None
or get_otlp_key(span, "status", is_json=is_json).get("code") == StatusCode.STATUS_CODE_UNSET.value
not status or status.get("code", StatusCode.STATUS_CODE_UNSET.value) == StatusCode.STATUS_CODE_UNSET.value
)

# Assert HTTP tags
# Convert attributes list to a dictionary, but for now only handle key_value objects with stringValue
span_attributes = dict(get_keyvalue_generator(get_otlp_key(span, "attributes", is_json=is_json)))
span_attributes = span["attributes"]
method = span_attributes.get("http.method") or span_attributes.get("http.request.method")
status_code = span_attributes.get("http.status_code") or span_attributes.get("http.response.status_code")
assert method == "GET", f"HTTP method is not GET, got {method}"
Expand Down
45 changes: 21 additions & 24 deletions utils/interfaces/_open_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,25 @@ def ingest_file(self, src_path: str):
return super().ingest_file(src_path)

def get_otel_trace_id(self, request: HttpResponse):
# Paths filter intercepted OTLP export requests (weblog → proxy), not weblog or backend URLs.
paths = ["/api/v0.2/traces", "/v1/traces"]
rid = request.get_rid()

if rid:
logger.debug(f"Try to find traces related to request {rid}")

for data in self.get_data(path_filters=paths):
for resource_span in data.get("request").get("content").get("resourceSpans"):
for scope_span in resource_span.get("scopeSpans"):
for span in scope_span.get("spans"):
for attribute in span.get("attributes", []):
attr_key = attribute.get("key")
attr_val = attribute.get("value").get("string_value") or attribute.get("value").get(
"stringValue"
)
if (attr_key == "http.request.headers.user-agent" and rid in attr_val) or (
attr_key == "http.useragent" and rid in attr_val
):
yield span.get("traceId")
content = data.get("request").get("content")
resource_spans = content.get("resourceSpans") or []
for resource_span in resource_spans:
scope_spans = resource_span.get("scopeSpans") or []
for scope_span in scope_spans:
for span in scope_span.get("spans", []):
attributes = span.get("attributes", {})
request_headers_user_agent_value = attributes.get("http.request.headers.user-agent", "")
user_agent_value = attributes.get("http.useragent", "")
if rid in request_headers_user_agent_value or rid in user_agent_value:
yield span.get("trace_id") or span.get("traceId")

def get_otel_spans(self, request: HttpResponse):
paths = ["/api/v0.2/traces", "/v1/traces"]
Expand All @@ -52,18 +52,15 @@ def get_otel_spans(self, request: HttpResponse):

for data in self.get_data(path_filters=paths):
content = data.get("request").get("content")
resource_spans = content.get("resource_spans") or content.get("resourceSpans")
logger.debug(f"[get_otel_spans] content: {content}")
resource_spans = content.get("resourceSpans") or []
for resource_span in resource_spans:
scope_spans = resource_span.get("scope_spans") or resource_span.get("scopeSpans")
scope_spans = resource_span.get("scopeSpans")
for scope_span in scope_spans:
for span in scope_span.get("spans"):
for attribute in span.get("attributes", []):
attr_key = attribute.get("key")
attr_val = attribute.get("value").get("string_value") or attribute.get("value").get(
"stringValue"
)
if (attr_key == "http.request.headers.user-agent" and rid in attr_val) or (
attr_key == "http.useragent" and rid in attr_val
):
yield data.get("request"), content, span
break # Skip to next span
attributes = span.get("attributes", {})
request_headers_user_agent_value = attributes.get("http.request.headers.user-agent", "")
user_agent_value = attributes.get("http.useragent", "")
if rid in request_headers_user_agent_value or rid in user_agent_value:
yield data.get("request"), content, span
break # Skip to next span
44 changes: 38 additions & 6 deletions utils/proxy/_deserializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from ._decoders.protobuf_schemas import MetricPayload, TracePayload, SketchPayload, BackendResponsePayload
from .traces.trace_v1 import deserialize_v1_trace, _uncompress_agent_v1_trace, decode_appsec_s_value
from .traces.otlp_v1 import deserialize_otlp_v1_trace
from .utils import logger


Expand Down Expand Up @@ -126,6 +127,9 @@ def json_load():
return None

if content_type and any(mime_type in content_type for mime_type in ("application/json", "text/json")):
# For OTLP traces, flatten some attributes to simplify the payload for testing purposes
if path == "/v1/traces":
return deserialize_otlp_v1_trace(json_load())
return json_load()

if path == "/v0.7/config": # Kyle, please add content-type header :)
Expand Down Expand Up @@ -178,17 +182,45 @@ def json_load():
assert isinstance(content, bytes)
dd_protocol = get_header_value("dd-protocol", message["headers"])
if dd_protocol == "otlp" and "traces" in path:
return MessageToDict(ExportTraceServiceRequest.FromString(content))
return deserialize_otlp_v1_trace(
MessageToDict(
ExportTraceServiceRequest.FromString(content),
preserving_proto_field_name=False,
use_integers_for_enums=True,
)
)
if dd_protocol == "otlp" and "metrics" in path:
return MessageToDict(ExportMetricsServiceRequest.FromString(content))
return MessageToDict(
ExportMetricsServiceRequest.FromString(content),
preserving_proto_field_name=False,
use_integers_for_enums=True,
)
if dd_protocol == "otlp" and "logs" in path:
return MessageToDict(ExportLogsServiceRequest.FromString(content))
return MessageToDict(
ExportLogsServiceRequest.FromString(content),
preserving_proto_field_name=False,
use_integers_for_enums=True,
)
if path == "/v1/traces":
return MessageToDict(ExportTraceServiceResponse.FromString(content))
return deserialize_otlp_v1_trace(
MessageToDict(
ExportTraceServiceResponse.FromString(content),
preserving_proto_field_name=False,
use_integers_for_enums=True,
)
)
if path == "/v1/metrics":
return MessageToDict(ExportMetricsServiceResponse.FromString(content))
return MessageToDict(
ExportMetricsServiceResponse.FromString(content),
preserving_proto_field_name=False,
use_integers_for_enums=True,
)
if path == "/v1/logs":
return MessageToDict(ExportLogsServiceResponse.FromString(content))
return MessageToDict(
ExportLogsServiceResponse.FromString(content),
preserving_proto_field_name=False,
use_integers_for_enums=True,
)
if path == "/api/v0.2/traces":
result = MessageToDict(TracePayload.FromString(content))
_deserialized_nested_json_from_trace_payloads(result, interface)
Expand Down
66 changes: 66 additions & 0 deletions utils/proxy/traces/otlp_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from collections.abc import Iterator
from typing import Any


def _flatten_otlp_attributes(attributes: list[dict]) -> Iterator[tuple[str, Any]]:
for key_value in attributes:
v = key_value["value"]
# Use `is not None` rather than truthiness so zero/false/empty values are not skipped.
if v.get("stringValue") is not None:
yield key_value["key"], v["stringValue"]
elif v.get("boolValue") is not None:
yield key_value["key"], v["boolValue"]
elif v.get("intValue") is not None:
yield key_value["key"], v["intValue"]
elif v.get("doubleValue") is not None:
yield key_value["key"], v["doubleValue"]
elif v.get("arrayValue") is not None:
yield key_value["key"], v["arrayValue"]
elif v.get("kvlistValue") is not None:
yield key_value["key"], v["kvlistValue"]
elif v.get("bytesValue") is not None:
yield key_value["key"], v["bytesValue"]
else:
raise ValueError(f"Unknown attribute value: {v}")


def deserialize_otlp_v1_trace(content: dict) -> dict:
# Iterate the OTLP payload to flatten any attributes dictionary
# Attributes are represented in the following way:
# - {"key": "value": { "stringValue": <VALUE> }}
# - {"key": "value": { "boolValue": <VALUE> }}
# - etc.
#
# We'll remap them to simple key-value pairs {"key": <VALUE>, "key2": <VALUE2>, etc.}
for resource_span in content.get("resourceSpans", []):
resource = resource_span.get("resource", {})
if resource:
remapped_attributes = dict(_flatten_otlp_attributes(resource.get("attributes", [])))
resource["attributes"] = remapped_attributes

for scope_span in resource_span.get("scopeSpans", []):
scope = scope_span.get("scope", {})
scope_attributes = scope.get("attributes", [])
if scope and scope_attributes:
remapped_attributes = dict(_flatten_otlp_attributes(scope_attributes))
scope["attributes"] = remapped_attributes

for span in scope_span.get("spans", []):
span_attributes = span.get("attributes", [])
if span_attributes:
remapped_attributes = dict(_flatten_otlp_attributes(span_attributes))
span["attributes"] = remapped_attributes

for event in span.get("events", []):
event_attributes = event.get("attributes", [])
if event_attributes:
remapped_attributes = dict(_flatten_otlp_attributes(event_attributes))
event["attributes"] = remapped_attributes

for link in span.get("links", []):
link_attributes = link.get("attributes", [])
if link_attributes:
remapped_attributes = dict(_flatten_otlp_attributes(link_attributes))
link["attributes"] = remapped_attributes

return content