Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 126 additions & 18 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@
from ddtrace.llmobs._utils import AnnotationContext
from ddtrace.llmobs._utils import LinkTracker
from ddtrace.llmobs._utils import _batched
from ddtrace.llmobs._utils import _annotate_llmobs_span_data
from ddtrace.llmobs._utils import _get_llmobs_data_metastruct
from ddtrace.llmobs._utils import _get_ml_app
from ddtrace.llmobs._utils import _get_nearest_llmobs_ancestor
Expand All @@ -165,6 +166,7 @@
from ddtrace.llmobs.types import Message
from ddtrace.llmobs.types import Prompt
from ddtrace.llmobs.types import PromptFallback
from ddtrace.llmobs.types import ToolDefinition
from ddtrace.llmobs.types import _ErrorField
from ddtrace.llmobs.types import _Meta
from ddtrace.llmobs.types import _MetaIO
Expand Down Expand Up @@ -320,20 +322,52 @@ class LLMObsSpan:

Passed to the `span_processor` function in the `enable` or `register_processor` methods.

Fields:
input: List of input messages. For LLM spans these are role/content dicts; for other kinds,
a single-element list with a "content" value string.
output: List of output messages. Same conventions as "input".
_tags: Raw span tags dict. Access via "get_tag()".
name: The display name of the span (e.g. "my_llm_call"). Mutations are written back.
_span_kind: The kind of the span (e.g. "llm", "agent", "tool", "task",
"embedding", "retrieval", "workflow"). Read-only; access via "get_span_kind()".
model_name: The model name (e.g. "gpt-4o"). Mutations are written back.
model_provider: The model provider (e.g. "openai"). Mutations are written back (lowercased).
session_id: The session ID to group spans. Mutations are written back.
ml_app: The ML app name. Mutations are written back.
metadata: Dictionary of arbitrary metadata key-value pairs. Mutations are written back.
metrics: Dictionary of numeric metric key-value pairs (e.g. token counts). Mutations are written back.
tool_definitions: List of tool definition dicts for LLM tool-calling. Mutations are written back.
prompt: The prompt object for LLM spans, or "None". Mutations are written back.

Example::
def span_processor(span: LLMObsSpan) -> Optional[LLMObsSpan]:
# Modify input/output
if span.get_tag("omit_span") == "1":
return None
if span.get_tag("no_input") == "1":
span.input = []
# Rename the span based on model
span.name = f"{span.model_provider}.{span.model_name}"
return span
"""

input: list[Message] = field(default_factory=list)
output: list[Message] = field(default_factory=list)
_tags: dict[str, str] = field(default_factory=dict)

# Span-level identity fields
name: str = ""
_span_kind: str = "" # read-only — write-back is ignored
model_name: str = ""
model_provider: str = ""
session_id: str = ""
ml_app: str = ""

# Annotation fields
metadata: dict[str, Any] = field(default_factory=dict)
metrics: dict[str, Any] = field(default_factory=dict)
tool_definitions: list[ToolDefinition] = field(default_factory=list)
prompt: Optional[Prompt] = field(default=None)

def get_tag(self, key: str) -> Optional[str]:
"""Get a tag from the span.

Expand All @@ -343,6 +377,9 @@ def get_tag(self, key: str) -> Optional[str]:
"""
return self._tags.get(key)

    def get_span_kind(self) -> str:
        """Return the kind of the span (e.g. "llm", "agent", "tool").

        Read-only accessor for the internal ``_span_kind`` field; assignments
        to ``_span_kind`` by user processors are not written back.
        """
        return self._span_kind


def _build_llmobs_span(
span_kind: str,
Expand Down Expand Up @@ -430,6 +467,14 @@ def _build_span_meta(
elif output_type == "value" and llmobs_span.output:
meta["output"]["value"] = llmobs_span.output[0].get("content", "")

# Write back processor mutations for meta-level fields
meta["metadata"] = llmobs_span.metadata
meta["model_name"] = llmobs_span.model_name
meta["model_provider"] = llmobs_span.model_provider.lower()
meta["tool_definitions"] = llmobs_span.tool_definitions
if llmobs_span.prompt is not None:
meta.setdefault(LLMOBS_STRUCT.INPUT, _MetaIO())["prompt"] = llmobs_span.prompt

return meta


Expand Down Expand Up @@ -514,7 +559,9 @@ def _llmobs_span_event(self, span: Span) -> Optional[LLMObsSpanEvent]:
return self._build_span_event_from_meta_struct(span, llmobs_data)
return self._build_span_event_from_ctx_items(span)

def _apply_user_span_processor(self, llmobs_span: LLMObsSpan, llmobs_data: LLMObsSpanData) -> Optional[LLMObsSpan]:
def _apply_user_span_processor(
self, span: Span, llmobs_span: LLMObsSpan, llmobs_data: LLMObsSpanData
) -> Optional[LLMObsSpan]:
"""Run the user span processor.

Returns the possibly mutated span, or None if the span should be dropped.
Expand All @@ -524,7 +571,19 @@ def _apply_user_span_processor(self, llmobs_span: LLMObsSpan, llmobs_data: LLMOb
return llmobs_span
error = False
try:
llmobs_meta = llmobs_data.get(LLMOBS_STRUCT.META) or _Meta()
llmobs_input = llmobs_meta.get(LLMOBS_STRUCT.INPUT) or _MetaIO()
llmobs_span._tags = cast(dict[str, str], llmobs_data.get(LLMOBS_STRUCT.TAGS, {}))
llmobs_span.name = _get_span_name(span)
llmobs_span._span_kind = _get_span_kind(span) or ""
llmobs_span.model_name = llmobs_meta.get(LLMOBS_STRUCT.MODEL_NAME) or ""
llmobs_span.model_provider = (llmobs_meta.get(LLMOBS_STRUCT.MODEL_PROVIDER) or "custom").lower()
llmobs_span.session_id = _get_session_id(span) or ""
llmobs_span.ml_app = _get_ml_app(span) or ""
llmobs_span.metadata = dict(llmobs_meta.get(LLMOBS_STRUCT.METADATA) or {})
llmobs_span.metrics = dict(llmobs_data.get(LLMOBS_STRUCT.METRICS) or {})
llmobs_span.tool_definitions = list(llmobs_meta.get(LLMOBS_STRUCT.TOOL_DEFINITIONS) or [])
llmobs_span.prompt = llmobs_input.get(LLMOBS_STRUCT.PROMPT)
result = self._user_span_processor(llmobs_span)
if result is None:
return None
Expand Down Expand Up @@ -564,16 +623,17 @@ def _build_span_event_from_meta_struct(self, span: Span, llmobs_data: LLMObsSpan
core.dispatch(DISPATCH_ON_LLM_SPAN_FINISH, (span,))

llmobs_span, input_type, output_type = _build_llmobs_span(span_kind, llmobs_input, llmobs_output)
user_processed_span = self._apply_user_span_processor(llmobs_span, llmobs_data)
user_processed_span = self._apply_user_span_processor(span, llmobs_span, llmobs_data)
if user_processed_span is None:
return None
llmobs_span = user_processed_span

# Wait to build meta until after user processors apply and potentially mutate I/O
meta = _build_span_meta(span, llmobs_span, llmobs_meta, span_kind, input_type, output_type)
metrics = llmobs_data.get(LLMOBS_STRUCT.METRICS) or {}
session_id = _get_session_id(span)
tags = self._llmobs_tags(span, ml_app, session_id, True, llmobs_data)
metrics = llmobs_span.metrics
session_id = llmobs_span.session_id or _get_session_id(span)
ml_app_final = llmobs_span.ml_app or ml_app
tags = self._llmobs_tags(span, ml_app_final, session_id, True, llmobs_data)
span_links = get_span_links(span)
_dd_attrs = {
"span_id": str(span.span_id),
Expand All @@ -587,7 +647,7 @@ def _build_span_event_from_meta_struct(self, span: Span, llmobs_data: LLMObsSpan
"trace_id": llmobs_trace_id,
"span_id": str(span.span_id),
"parent_id": parent_id,
"name": _get_span_name(span),
"name": llmobs_span.name or _get_span_name(span),
"start_ns": span.start_ns,
"duration": cast(int, span.duration_ns),
"status": "error" if span.error else "ok",
Expand Down Expand Up @@ -721,6 +781,16 @@ def _build_span_event_from_ctx_items(self, span: Span) -> Optional[LLMObsSpanEve
error = False
try:
llmobs_span._tags = cast(dict[str, str], span._get_ctx_item(TAGS))
llmobs_span.name = _get_span_name(span)
llmobs_span._span_kind = span_kind
llmobs_span.model_name = span._get_ctx_item(MODEL_NAME) or ""
llmobs_span.model_provider = (span._get_ctx_item(MODEL_PROVIDER) or "custom").lower()
llmobs_span.session_id = _get_session_id(span) or ""
llmobs_span.ml_app = _get_ml_app(span) or ""
llmobs_span.metadata = dict(span._get_ctx_item(METADATA) or {})
llmobs_span.metrics = dict(span._get_ctx_item(METRICS) or {})
llmobs_span.tool_definitions = list(span._get_ctx_item(TOOL_DEFINITIONS) or [])
llmobs_span.prompt = span._get_ctx_item(INPUT_PROMPT)
user_llmobs_span = self._user_span_processor(llmobs_span)
if user_llmobs_span is None:
return None
Expand Down Expand Up @@ -750,11 +820,18 @@ def _build_span_event_from_ctx_items(self, span: Span) -> Optional[LLMObsSpanEve
elif output_type == "value":
meta["output"]["value"] = llmobs_span.output[0].get("content", "")

# Write back processor mutations for meta-level fields
meta["metadata"] = llmobs_span.metadata
meta["model_name"] = llmobs_span.model_name
meta["model_provider"] = llmobs_span.model_provider.lower()
meta["tool_definitions"] = llmobs_span.tool_definitions
if llmobs_span.prompt is not None:
meta.setdefault(LLMOBS_STRUCT.INPUT, _MetaIO())["prompt"] = llmobs_span.prompt

if not meta["input"]:
meta.pop("input")
if not meta["output"]:
meta.pop("output")
metrics = span._get_ctx_item(METRICS) or {}
ml_app = _get_ml_app(span)

if ml_app is None:
Expand All @@ -763,18 +840,21 @@ def _build_span_event_from_ctx_items(self, span: Span) -> Optional[LLMObsSpanEve
"Ensure this configuration is set before running your application."
)

span._set_ctx_item(ML_APP, ml_app)
ml_app_final = llmobs_span.ml_app or ml_app
span._set_ctx_item(ML_APP, ml_app_final)
parent_id = span._get_ctx_item(PARENT_ID_KEY) or ROOT_PARENT_ID

llmobs_trace_id = span._get_ctx_item(LLMOBS_TRACE_ID)
if llmobs_trace_id is None:
raise ValueError("Failed to extract LLMObs trace ID from span context.")

metrics = llmobs_span.metrics
session_id = llmobs_span.session_id or _get_session_id(span)
llmobs_span_event: LLMObsSpanEvent = {
"trace_id": format_trace_id(llmobs_trace_id),
"span_id": str(span.span_id),
"parent_id": parent_id,
"name": _get_span_name(span),
"name": llmobs_span.name or _get_span_name(span),
"start_ns": span.start_ns,
"duration": cast(int, span.duration_ns),
"status": "error" if span.error else "ok",
Expand All @@ -783,12 +863,11 @@ def _build_span_event_from_ctx_items(self, span: Span) -> Optional[LLMObsSpanEve
"tags": [],
"_dd": _dd_attrs,
}
session_id = _get_session_id(span)
if session_id is not None:
span._set_ctx_item(SESSION_ID, session_id)
llmobs_span_event["session_id"] = session_id

llmobs_span_event["tags"] = self._llmobs_tags(span, ml_app, session_id, False, None)
llmobs_span_event["tags"] = self._llmobs_tags(span, ml_app_final, session_id, False, None)

span_links = span._get_ctx_item(SPAN_LINKS)
if isinstance(span_links, list) and span_links:
Expand Down Expand Up @@ -2418,7 +2497,12 @@ def annotate(
metrics: Optional[dict[str, Any]] = None,
tags: Optional[dict[str, Any]] = None,
tool_definitions: Optional[list[dict[str, Any]]] = None,
_name: Optional[str] = None,
name: Optional[str] = None,
model_name: Optional[str] = None,
model_provider: Optional[str] = None,
session_id: Optional[str] = None,
ml_app: Optional[str] = None,
_name: Optional[str] = None, # kept for backwards compatibility; "name" takes precedence when both provided
_linked_spans: Optional[list[ExportedLLMObsSpan]] = None,
_suppress_span_kind_error: bool = False,
) -> None:
Expand Down Expand Up @@ -2473,6 +2557,12 @@ def annotate(
and optional "description" (string) and "schema" (JSON serializable dictionary) keys.
:param metrics: Dictionary of JSON serializable key-value metric pairs,
such as `{prompt,completion,total}_tokens`.
:param name: The display name for the span. Takes priority over `_name` when both are supplied.
`_name` param was kept for backwards compatibility
:param model_name: The model name (e.g. `"gpt-4o"`). Only applies to LLM and embedding spans.
:param model_provider: The model provider (e.g. `"openai"`). Only applies to LLM and embedding spans.
:param session_id: The session ID to group spans under a common user session.
:param ml_app: The name of the ML application.
"""
error = None
try:
Expand Down Expand Up @@ -2510,17 +2600,35 @@ def annotate(
"span tags must be a dictionary of string key - primitive value pairs."
)
else:
session_id = tags.get("session_id")
if session_id:
span._set_ctx_item(SESSION_ID, str(session_id))
tag_session_id = tags.get("session_id")
if tag_session_id:
span._set_ctx_item(SESSION_ID, str(tag_session_id))
cls._set_dict_attribute(span, TAGS, tags)
if tool_definitions is not None:
validated_tool_definitions = extract_tool_definitions(tool_definitions)
if validated_tool_definitions:
span._set_ctx_item(TOOL_DEFINITIONS, validated_tool_definitions)
span_kind = span._get_ctx_item(SPAN_KIND)
if _name is not None:
span.name = _name
# name takes priority over _name; fall back to _name if name is not provided
effective_name = name if name is not None else _name
if effective_name is not None:
span.name = effective_name
if model_name is not None:
span._set_ctx_item(MODEL_NAME, model_name)
if _get_llmobs_data_metastruct(span):
_annotate_llmobs_span_data(span, model_name=model_name)
if model_provider is not None:
span._set_ctx_item(MODEL_PROVIDER, model_provider.lower())
if _get_llmobs_data_metastruct(span):
_annotate_llmobs_span_data(span, model_provider=model_provider.lower())
if session_id is not None:
span._set_ctx_item(SESSION_ID, session_id)
if _get_llmobs_data_metastruct(span):
_annotate_llmobs_span_data(span, session_id=session_id)
if ml_app is not None:
span._set_ctx_item(ML_APP, ml_app)
if _get_llmobs_data_metastruct(span):
_annotate_llmobs_span_data(span, ml_app=ml_app)
if prompt is not None:
try:
validated_prompt = _validate_prompt(prompt, strict_validation=False)
Expand Down
Loading