Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

from __future__ import annotations

from typing import Any

from opentelemetry._logs import Logger
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
Expand All @@ -24,6 +22,7 @@
OutputMessage,
ToolDefinition,
)
from opentelemetry.util.types import AttributeValue


class AgentInvocation(GenAIInvocation):
Expand Down Expand Up @@ -101,7 +100,7 @@ def __init__(

self._start(self._get_base_attributes())

def _get_base_attributes(self) -> dict[str, Any]:
def _get_base_attributes(self) -> dict[str, AttributeValue]:
"""Return sampling-relevant attributes available at span creation time."""
optional_attrs = (
(GenAI.GEN_AI_PROVIDER_NAME, self.provider),
Expand All @@ -115,7 +114,7 @@ def _get_base_attributes(self) -> dict[str, Any]:
**{k: v for k, v in optional_attrs if v is not None},
}

def _get_common_attributes(self) -> dict[str, Any]:
def _get_common_attributes(self) -> dict[str, AttributeValue]:
optional_attrs = (
(GenAI.GEN_AI_PROVIDER_NAME, self.provider),
(GenAI.GEN_AI_REQUEST_MODEL, self.request_model),
Expand All @@ -131,7 +130,7 @@ def _get_common_attributes(self) -> dict[str, Any]:
**{k: v for k, v in optional_attrs if v is not None},
}

def _get_request_attributes(self) -> dict[str, Any]:
def _get_request_attributes(self) -> dict[str, AttributeValue]:
optional_attrs = (
(GenAI.GEN_AI_CONVERSATION_ID, self.conversation_id),
(GenAI.GEN_AI_DATA_SOURCE_ID, self.data_source_id),
Expand All @@ -147,12 +146,12 @@ def _get_request_attributes(self) -> dict[str, Any]:
)
return {k: v for k, v in optional_attrs if v is not None}

def _get_response_attributes(self) -> dict[str, Any]:
def _get_response_attributes(self) -> dict[str, AttributeValue]:
if self.finish_reasons:
return {GenAI.GEN_AI_RESPONSE_FINISH_REASONS: self.finish_reasons}
return {}

def _get_usage_attributes(self) -> dict[str, Any]:
def _get_usage_attributes(self) -> dict[str, AttributeValue]:
optional_attrs = (
(GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens),
(GenAI.GEN_AI_USAGE_OUTPUT_TOKENS, self.output_tokens),
Expand All @@ -167,7 +166,7 @@ def _get_usage_attributes(self) -> dict[str, Any]:
)
return {k: v for k, v in optional_attrs if v is not None}

def _get_content_attributes_for_span(self) -> dict[str, Any]:
def _get_content_attributes_for_span(self) -> dict[str, AttributeValue]:
return get_content_attributes(
input_messages=self.input_messages,
output_messages=self.output_messages,
Expand All @@ -176,14 +175,14 @@ def _get_content_attributes_for_span(self) -> dict[str, Any]:
for_span=True,
)

def _get_metric_attributes(self) -> dict[str, Any]:
def _get_metric_attributes(self) -> dict[str, AttributeValue]:
optional_attrs = (
(GenAI.GEN_AI_PROVIDER_NAME, self.provider),
(GenAI.GEN_AI_REQUEST_MODEL, self.request_model),
(server_attributes.SERVER_ADDRESS, self.server_address),
(server_attributes.SERVER_PORT, self.server_port),
)
attrs: dict[str, Any] = {
attrs: dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: self._operation_name,
**{k: v for k, v in optional_attrs if v is not None},
}
Expand All @@ -204,7 +203,7 @@ def _apply_finish(self, error: Error | None = None) -> None:
if error is not None:
self._apply_error_attributes(error)

attributes: dict[str, Any] = {}
attributes: dict[str, AttributeValue] = {}
attributes.update(self._get_common_attributes())
attributes.update(self._get_request_attributes())
attributes.update(self._get_response_attributes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

from __future__ import annotations

from typing import Any

from opentelemetry._logs import Logger
Comment on lines 4 to 6
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
Expand Down Expand Up @@ -60,7 +58,7 @@ def __init__(
self.response_model_name: str | None = None
self._start(self._get_base_attributes())

def _get_base_attributes(self) -> dict[str, Any]:
def _get_base_attributes(self) -> dict[str, AttributeValue]:
"""Return sampling-relevant attributes available at span creation time."""
optional_attrs = (
(GenAI.GEN_AI_REQUEST_MODEL, self.request_model),
Expand All @@ -73,7 +71,7 @@ def _get_base_attributes(self) -> dict[str, Any]:
**{k: v for k, v in optional_attrs if v is not None},
}

def _get_metric_attributes(self) -> dict[str, Any]:
def _get_metric_attributes(self) -> dict[str, AttributeValue]:
optional_attrs = (
(GenAI.GEN_AI_PROVIDER_NAME, self.provider),
(GenAI.GEN_AI_REQUEST_MODEL, self.request_model),
Expand Down Expand Up @@ -104,7 +102,7 @@ def _apply_finish(self, error: Error | None = None) -> None:
(GenAI.GEN_AI_RESPONSE_MODEL, self.response_model_name),
(GenAI.GEN_AI_USAGE_INPUT_TOKENS, self.input_tokens),
)
attributes: dict[str, Any] = {
attributes: dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: self._operation_name,
**{
key: value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

from opentelemetry._logs import Logger, LogRecord
from opentelemetry.semconv._incubating.attributes import (
Expand All @@ -28,6 +27,7 @@
from opentelemetry.util.genai.utils import (
should_emit_event,
)
from opentelemetry.util.types import AttributeValue

# TODO: Migrate to GenAI constants once available in semconv package
_GEN_AI_REASONING_OUTPUT_TOKENS = "gen_ai.usage.reasoning.output_tokens"
Expand Down Expand Up @@ -97,7 +97,7 @@ def __init__(
self.output_type: str | None = None
self._start(self._get_base_attributes())

def _get_message_attributes(self, *, for_span: bool) -> dict[str, Any]:
def _get_message_attributes(self, *, for_span: bool) -> dict[str, AttributeValue]:
return get_content_attributes(
input_messages=self.input_messages,
output_messages=self.output_messages,
Expand All @@ -118,7 +118,7 @@ def _get_finish_reasons(self) -> list[str] | None:
return reasons or None
return None

def _get_base_attributes(self) -> dict[str, Any]:
def _get_base_attributes(self) -> dict[str, AttributeValue]:
optional_attrs = (
(GenAI.GEN_AI_REQUEST_MODEL, self.request_model),
(GenAI.GEN_AI_PROVIDER_NAME, self.provider),
Expand All @@ -130,7 +130,7 @@ def _get_base_attributes(self) -> dict[str, Any]:
**{k: v for k, v in optional_attrs if v is not None},
}

def _get_attributes(self) -> dict[str, Any]:
def _get_attributes(self) -> dict[str, AttributeValue]:
attrs = self._get_base_attributes()
if self.output_tokens is None and self.thinking_tokens is None:
output_tokens = None
Expand Down Expand Up @@ -170,7 +170,7 @@ def _get_attributes(self) -> dict[str, Any]:
attrs.update({k: v for k, v in optional_attrs if v is not None})
return attrs

def _get_metric_attributes(self) -> dict[str, Any]:
def _get_metric_attributes(self) -> dict[str, AttributeValue]:
attrs = self._get_base_attributes()
if self.response_model_name is not None:
attrs[GenAI.GEN_AI_RESPONSE_MODEL] = self.response_model_name
Expand Down Expand Up @@ -243,9 +243,9 @@ class LLMInvocation:
finish_reasons: list[str] | None = None
input_tokens: int | None = None
output_tokens: int | None = None
attributes: dict[str, Any] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType]
attributes: dict[str, AttributeValue] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType]
"""Additional attributes to set on spans and/or events. Not set on metrics."""
metric_attributes: dict[str, Any] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType]
metric_attributes: dict[str, AttributeValue] = field(default_factory=dict) # pyright: ignore[reportUnknownVariableType]
"""Additional attributes to set on metrics. Must be low cardinality. Not set on spans or events."""
temperature: float | None = None
top_p: float | None = None
Expand Down Expand Up @@ -331,4 +331,4 @@ def span(self) -> Span:
self._inference_invocation.span
if self._inference_invocation is not None
else INVALID_SPAN
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
gen_ai_json_dumps,
get_content_capturing_mode,
)
from opentelemetry.util.types import AttributeValue

if TYPE_CHECKING:
from opentelemetry.util.genai.metrics import InvocationMetricsRecorder
Expand Down Expand Up @@ -61,19 +62,19 @@ def __init__(
operation_name: str,
span_name: str,
span_kind: SpanKind = SpanKind.CLIENT,
attributes: dict[str, Any] | None = None,
metric_attributes: dict[str, Any] | None = None,
attributes: dict[str, AttributeValue] | None = None,
metric_attributes: dict[str, AttributeValue] | None = None,
) -> None:
self._tracer = tracer
self._metrics_recorder = metrics_recorder
self._logger = logger
self._completion_hook = completion_hook
self._operation_name: str = operation_name
self.attributes: dict[str, Any] = (
self.attributes: dict[str, AttributeValue] = (
{} if attributes is None else attributes
)
"""Additional attributes to set on spans and/or events. Not set on metrics."""
self.metric_attributes: dict[str, Any] = (
self.metric_attributes: dict[str, AttributeValue] = (
{} if metric_attributes is None else metric_attributes
)
"""Additional attributes to set on metrics. Must be low cardinality. Not set on spans or events."""
Expand All @@ -84,7 +85,7 @@ def __init__(
self._context_token: ContextToken | None = None
self._monotonic_start_s: float | None = None

def _start(self, attributes: dict[str, Any] | None = None) -> None:
def _start(self, attributes: dict[str, AttributeValue] | None = None) -> None:
"""Start the invocation span and attach it to the current context.

Args:
Expand All @@ -99,7 +100,7 @@ def _start(self, attributes: dict[str, Any] | None = None) -> None:
self._monotonic_start_s = timeit.default_timer()
self._context_token = attach(self._span_context)

def _get_metric_attributes(self) -> dict[str, Any]:
def _get_metric_attributes(self) -> dict[str, AttributeValue]:
"""Return low-cardinality attributes for metric recording."""
return self.metric_attributes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

from __future__ import annotations

from typing import Any

from opentelemetry._logs import Logger
Comment on lines 4 to 6
from opentelemetry.semconv._incubating.attributes import (
gen_ai_attributes as GenAI,
Expand Down Expand Up @@ -74,7 +72,7 @@ def __init__(
self.tool_description = tool_description
self._start(self._get_base_attributes())

def _get_base_attributes(self) -> dict[str, Any]:
def _get_base_attributes(self) -> dict[str, AttributeValue]:
"""Return sampling-relevant attributes available at span creation time."""
optional_attrs = (
(GenAI.GEN_AI_TOOL_NAME, self.name),
Expand All @@ -87,8 +85,8 @@ def _get_base_attributes(self) -> dict[str, Any]:
**{k: v for k, v in optional_attrs if v is not None},
}

def _get_metric_attributes(self) -> dict[str, Any]:
attrs: dict[str, Any] = {
def _get_metric_attributes(self) -> dict[str, AttributeValue]:
attrs: dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: self._operation_name,
}
attrs.update(self.metric_attributes)
Expand All @@ -115,7 +113,7 @@ def _apply_finish(self, error: Error | None = None) -> None:
else None,
),
)
attributes: dict[str, Any] = {
attributes: dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: self._operation_name,
**{k: v for k, v in optional_attrs if v is not None},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from __future__ import annotations

from dataclasses import asdict
from typing import Any

from opentelemetry._logs import Logger
from opentelemetry.semconv._incubating.attributes import (
Expand All @@ -22,6 +21,7 @@
gen_ai_json_dumps,
should_capture_content_on_spans,
)
from opentelemetry.util.types import AttributeValue


class WorkflowInvocation(GenAIInvocation):
Expand Down Expand Up @@ -57,14 +57,14 @@ def __init__(
self.output_messages: list[OutputMessage] = []
self._start(self._get_base_attributes())

def _get_base_attributes(self) -> dict[str, Any]:
def _get_base_attributes(self) -> dict[str, AttributeValue]:
"""Return sampling-relevant attributes available at span creation time."""
attrs: dict[str, Any] = {
attrs: dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: self._operation_name,
}
return attrs

def _get_messages_for_span(self) -> dict[str, Any]:
def _get_messages_for_span(self) -> dict[str, AttributeValue]:
if not should_capture_content_on_spans():
return {}
optional_attrs = (
Expand All @@ -86,8 +86,8 @@ def _get_messages_for_span(self) -> dict[str, Any]:
}

def _apply_finish(self, error: Error | None = None) -> None:
attributes: dict[str, Any] = {
GenAI.GEN_AI_OPERATION_NAME: self._operation_name
attributes: dict[str, AttributeValue] = {
GenAI.GEN_AI_OPERATION_NAME: self._operation_name,
}
attributes.update(self._get_messages_for_span())
if error is not None:
Expand All @@ -98,4 +98,4 @@ def _apply_finish(self, error: Error | None = None) -> None:
inputs=self.input_messages,
outputs=self.output_messages,
)
# TODO: Add workflow metrics when supported
# TODO: Add workflow metrics when supported
Loading