From 3fe4632aed7d6860dbb74030b39e7da5b084a96e Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Fri, 23 Jan 2026 14:32:10 +0200 Subject: [PATCH 01/10] add support for callgraph profiling --- src/splunk_otel/callgraphs/__init__.py | 17 ++ src/splunk_otel/callgraphs/span_processor.py | 94 +++++++++ src/splunk_otel/configurator.py | 2 + src/splunk_otel/distro.py | 18 +- src/splunk_otel/env.py | 11 + src/splunk_otel/profile.py | 181 +++++++++++------ src/splunk_otel/propagator.py | 50 ++++- tests/test_callgraphs_init.py | 57 ++++++ tests/test_callgraphs_span_processor.py | 199 +++++++++++++++++++ tests/test_distro.py | 42 ++++ tests/test_env.py | 5 + tests/test_propagator.py | 96 ++++++++- 12 files changed, 703 insertions(+), 69 deletions(-) create mode 100644 src/splunk_otel/callgraphs/__init__.py create mode 100644 src/splunk_otel/callgraphs/span_processor.py create mode 100644 tests/test_callgraphs_init.py create mode 100644 tests/test_callgraphs_span_processor.py diff --git a/src/splunk_otel/callgraphs/__init__.py b/src/splunk_otel/callgraphs/__init__.py new file mode 100644 index 00000000..203cb8ed --- /dev/null +++ b/src/splunk_otel/callgraphs/__init__.py @@ -0,0 +1,17 @@ +from opentelemetry import trace +from opentelemetry.sdk.environment_variables import OTEL_SERVICE_NAME + +from splunk_otel.callgraphs.span_processor import CallgraphsSpanProcessor +from splunk_otel.env import ( + Env, + SPLUNK_SNAPSHOT_PROFILER_ENABLED, + SPLUNK_SNAPSHOT_SAMPLING_INTERVAL, +) + + +def start_callgraphs_if_enabled(env=None): + env = env or Env() + if env.is_true(SPLUNK_SNAPSHOT_PROFILER_ENABLED): + trace.get_tracer_provider().add_span_processor( + CallgraphsSpanProcessor(env.getval(OTEL_SERVICE_NAME), env.getint(SPLUNK_SNAPSHOT_SAMPLING_INTERVAL, 10)) + ) diff --git a/src/splunk_otel/callgraphs/span_processor.py b/src/splunk_otel/callgraphs/span_processor.py new file mode 100644 index 00000000..3ebb120e --- /dev/null +++ b/src/splunk_otel/callgraphs/span_processor.py @@ -0,0 +1,94 
@@ +# Copyright Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional + +from opentelemetry import baggage, trace +from opentelemetry.context import Context +from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor + +from splunk_otel.profile import ProfilingContext + + +def _should_process_context(context: Optional[Context]) -> bool: + parent_span = trace.get_current_span(context).get_span_context() + + if not parent_span.is_valid: + return True + + return parent_span.is_remote + + +class CallgraphsSpanProcessor(SpanProcessor): + def __init__(self, service_name: str, sampling_interval: Optional[int] = 10): + self.span_id_to_trace_id: dict[int, int] = {} + self.active_traces = set() + self.profiler = ProfilingContext( + service_name, sampling_interval, self._filter_stacktraces, instrumentation_source="snapshot" + ) + + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: + if not _should_process_context(parent_context): + return + + ctx_baggage = baggage.get_baggage("splunk.trace.snapshot.volume", parent_context) + + if ctx_baggage is None: + return + + if ctx_baggage == "highest": + span.set_attribute("splunk.snapshot.profiling", True) + + span_ctx = span.get_span_context() + + if span_ctx is None: + return + + self.span_id_to_trace_id[span_ctx.span_id] = span_ctx.trace_id + self.active_traces.add(span_ctx.trace_id) + self.profiler.start() + + def on_end(self, span: ReadableSpan) -> None: + 
span_id = span.get_span_context().span_id + trace_id = self.span_id_to_trace_id.get(span_id) + + if trace_id is None: + return + + del self.span_id_to_trace_id[span_id] + self.active_traces.remove(trace_id) + + if len(self.span_id_to_trace_id) == 0: + self.profiler.pause_after(60.0) + + def shutdown(self) -> None: + pass + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + def _filter_stacktraces(self, stacktraces, active_trace_contexts): + filtered = [] + + for stacktrace in stacktraces: + thread_id = stacktrace["tid"] + + maybe_context = active_trace_contexts.get(thread_id) + + if maybe_context is not None: + (trace_id, _span_id) = maybe_context + if trace_id in self.active_traces: + filtered.append(stacktrace) + + return filtered diff --git a/src/splunk_otel/configurator.py b/src/splunk_otel/configurator.py index 6ee7d628..9969e6e4 100644 --- a/src/splunk_otel/configurator.py +++ b/src/splunk_otel/configurator.py @@ -15,9 +15,11 @@ from opentelemetry.sdk._configuration import _OTelSDKConfigurator from splunk_otel.profile import _start_profiling_if_enabled +from splunk_otel.callgraphs import start_callgraphs_if_enabled class SplunkConfigurator(_OTelSDKConfigurator): def _configure(self, **kwargs): super()._configure(**kwargs) _start_profiling_if_enabled() + start_callgraphs_if_enabled() diff --git a/src/splunk_otel/distro.py b/src/splunk_otel/distro.py index 55b24a99..6eea0126 100644 --- a/src/splunk_otel/distro.py +++ b/src/splunk_otel/distro.py @@ -16,6 +16,7 @@ from opentelemetry.instrumentation.distro import BaseDistro from opentelemetry.instrumentation.propagators import set_global_response_propagator +from opentelemetry.propagators.composite import CompositePropagator from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_LOGS_ENDPOINT, @@ -25,6 +26,7 @@ OTEL_RESOURCE_ATTRIBUTES, OTEL_SERVICE_NAME, ) +from opentelemetry.propagate import get_global_textmap, set_global_textmap from 
splunk_otel.__about__ import __version__ as version from splunk_otel.env import ( @@ -33,10 +35,12 @@ SPLUNK_PROFILER_ENABLED, SPLUNK_PROFILER_LOGS_ENDPOINT, SPLUNK_REALM, + SPLUNK_SNAPSHOT_PROFILER_ENABLED, + SPLUNK_SNAPSHOT_SELECTION_PROBABILITY, SPLUNK_TRACE_RESPONSE_HEADER_ENABLED, Env, ) -from splunk_otel.propagator import ServerTimingResponsePropagator +from splunk_otel.propagator import CallgraphsPropagator, ServerTimingResponsePropagator _DISTRO_NAME = "splunk-opentelemetry" @@ -66,6 +70,7 @@ def _configure(self, **kwargs): self.handle_realm() self.configure_token_headers() self.set_server_timing_propagator() + self.set_callgraphs_propagator() def set_env_defaults(self): for key, value in DEFAULTS.items(): @@ -110,3 +115,14 @@ def configure_token_headers(self): def set_server_timing_propagator(self): if self.env.is_true(SPLUNK_TRACE_RESPONSE_HEADER_ENABLED, "true"): set_global_response_propagator(ServerTimingResponsePropagator()) + + def set_callgraphs_propagator(self): + if self.env.is_true(SPLUNK_SNAPSHOT_PROFILER_ENABLED, "false"): + set_global_textmap( + CompositePropagator( + [ + get_global_textmap(), + CallgraphsPropagator(self.env.getfloat(SPLUNK_SNAPSHOT_SELECTION_PROBABILITY, 0.01)), + ] + ) + ) diff --git a/src/splunk_otel/env.py b/src/splunk_otel/env.py index f2d35785..80333eef 100644 --- a/src/splunk_otel/env.py +++ b/src/splunk_otel/env.py @@ -51,6 +51,9 @@ SPLUNK_PROFILER_ENABLED = "SPLUNK_PROFILER_ENABLED" SPLUNK_PROFILER_CALL_STACK_INTERVAL = "SPLUNK_PROFILER_CALL_STACK_INTERVAL" SPLUNK_PROFILER_LOGS_ENDPOINT = "SPLUNK_PROFILER_LOGS_ENDPOINT" +SPLUNK_SNAPSHOT_PROFILER_ENABLED = "SPLUNK_SNAPSHOT_PROFILER_ENABLED" +SPLUNK_SNAPSHOT_SAMPLING_INTERVAL = "SPLUNK_SNAPSHOT_SAMPLING_INTERVAL" +SPLUNK_SNAPSHOT_SELECTION_PROBABILITY = "SPLUNK_SNAPSHOT_SELECTION_PROBABILITY" SPLUNK_REALM = "SPLUNK_REALM" _pylogger = logging.getLogger(__name__) @@ -85,6 +88,14 @@ def getint(self, key, default=0): _pylogger.warning("Invalid integer value of '%s' for env 
var '%s'", val, key) return default + def getfloat(self, key, default=0.0): + val = self.getval(key, str(default)) + try: + return float(val) + except ValueError: + _pylogger.warning("Invalid float value of '%s' for env var '%s'", val, key) + return default + def setval(self, key, value): self.store[key] = value diff --git a/src/splunk_otel/profile.py b/src/splunk_otel/profile.py index f5e1b136..47c79572 100644 --- a/src/splunk_otel/profile.py +++ b/src/splunk_otel/profile.py @@ -1,12 +1,12 @@ import base64 import gzip -import logging import sys import threading import time import traceback from collections import OrderedDict from traceback import StackSummary +from typing import Callable, Optional, Literal import opentelemetry.context import wrapt @@ -37,8 +37,41 @@ _SCOPE_VERSION = "0.2.0" _SCOPE_NAME = "otel.profiling" -_timer = None -_pylogger = logging.getLogger(__name__) +_thread_states = {} +_context_tracking_started = False + + +class ProfilingContext: + _timer = None + + def __init__( + self, + service_name: str, + interval_millis: int, + stacktrace_filter: Optional[Callable[[list[dict], dict], list[dict]]] = None, + instrumentation_source: Optional[Literal["continuous", "snapshot"]] = "continuous", + ): + start_thread_context_tracking() + resource = _mk_resource(service_name) + logger = get_logger(_SCOPE_NAME, _SCOPE_VERSION) + scraper = _ProfileScraper( + resource, + _thread_states, + interval_millis, + logger, + stacktrace_filter=stacktrace_filter, + instrumentation_source=instrumentation_source, + ) + self._timer = _IntervalTimer(interval_millis, scraper.tick) + + def start(self): + self._timer.start() + + def stop(self): + self._timer.stop() + + def pause_after(self, seconds: float): + self._timer.pause_after(seconds) def _start_profiling_if_enabled(env=None): @@ -52,20 +85,9 @@ def start_profiling(env=None): interval_millis = env.getint(SPLUNK_PROFILER_CALL_STACK_INTERVAL, _DEFAULT_PROF_CALL_STACK_INTERVAL_MILLIS) svcname = 
env.getval(OTEL_SERVICE_NAME) - tcm = _ThreadContextMapping() - tcm.wrap_context_methods() - - resource = _mk_resource(svcname) - logger = get_logger(_SCOPE_NAME, _SCOPE_VERSION) - scraper = _ProfileScraper(resource, tcm.get_thread_states(), interval_millis, logger) - - global _timer # noqa PLW0603 - _timer = _IntervalTimer(interval_millis, scraper.tick) - _timer.start() - - -def stop_profiling(): - _timer.stop() + ctx = ProfilingContext(svcname, interval_millis) + ctx.start() + return ctx def _mk_resource(service_name) -> Resource: @@ -77,61 +99,55 @@ def _mk_resource(service_name) -> Resource: ) -class _ThreadContextMapping: - def __init__(self): - self.thread_states = {} +def start_thread_context_tracking(): + global _context_tracking_started # noqa PLW0603 + if _context_tracking_started: + return + _context_tracking_started = True - def get_thread_states(self): - return self.thread_states + wrapt.wrap_function_wrapper(opentelemetry.context, "attach", _wrap_context_attach) + wrapt.wrap_function_wrapper(opentelemetry.context, "detach", _wrap_context_detach) - def wrap_context_methods(self): - wrapt.wrap_function_wrapper(opentelemetry.context, "attach", self.wrap_context_attach()) - wrapt.wrap_function_wrapper(opentelemetry.context, "detach", self.wrap_context_detach()) - def wrap_context_attach(self): - def wrapper(wrapped, _instance, args, kwargs): - token = wrapped(*args, **kwargs) +def _wrap_context_attach(wrapped, _instance, args, kwargs): + token = wrapped(*args, **kwargs) - maybe_context = args[0] if args else None + maybe_context = args[0] if args else None - if maybe_context: - span = maybe_context.get(_SPAN_KEY) + if maybe_context: + span = maybe_context.get(_SPAN_KEY) - if span: - thread_id = threading.get_ident() - context = span.get_span_context() - self.thread_states[thread_id] = ( - context.trace_id, - context.span_id, - ) + if span: + thread_id = threading.get_ident() + context = span.get_span_context() + _thread_states[thread_id] = ( + 
context.trace_id, + context.span_id, + ) - return token + return token - return wrapper - def wrap_context_detach(self): - def wrapper(wrapped, _instance, args, kwargs): - token = args[0] if args else None +def _wrap_context_detach(wrapped, _instance, args, kwargs): + token = args[0] if args else None - if token: - prev = token.old_value - thread_id = threading.get_ident() - if isinstance(prev, Context): - span = prev.get(_SPAN_KEY) + if token: + prev = token.old_value + thread_id = threading.get_ident() + if isinstance(prev, Context): + span = prev.get(_SPAN_KEY) - if span: - context = span.get_span_context() - self.thread_states[thread_id] = ( - context.trace_id, - context.span_id, - ) - else: - self.thread_states[thread_id] = None - else: - self.thread_states[thread_id] = None - return wrapped(*args, **kwargs) - - return wrapper + if span: + context = span.get_span_context() + _thread_states[thread_id] = ( + context.trace_id, + context.span_id, + ) + else: + _thread_states[thread_id] = None + else: + _thread_states[thread_id] = None + return wrapped(*args, **kwargs) def _collect_stacktraces(): @@ -161,6 +177,8 @@ def __init__( logger: Logger, collect_stacktraces_func=_collect_stacktraces, time_func=time.time, + stacktrace_filter: Optional[Callable[[list[dict], dict], list[dict]]] = None, + instrumentation_source: Optional[Literal["continuous", "snapshot"]] = "continuous", ): self.resource = resource self.thread_states = thread_states @@ -168,9 +186,18 @@ def __init__( self.collect_stacktraces = collect_stacktraces_func self.time = time_func self.logger = logger + self.stacktrace_filter = stacktrace_filter + self.instrumentation_source = instrumentation_source def tick(self): stacktraces = self.collect_stacktraces() + + if self.stacktrace_filter is not None: + stacktraces = self.stacktrace_filter(stacktraces, self.thread_states) + + if len(stacktraces) == 0: + return + log_record = self.mk_log_record(stacktraces) self.logger.emit(log_record) @@ -205,6 +232,7 @@ 
def mk_log_record(self, stacktraces): "profiling.data.type": "cpu", "com.splunk.sourcetype": "otel.profiling", "profiling.data.total.frame.count": total_frame_count, + "profiling.instrumentation.source": self.instrumentation_source, }, ) instrumentation_scope = getattr( @@ -231,22 +259,45 @@ def __init__(self, interval_millis, target): self.interval_seconds = interval_millis / 1e3 self.target = target self.thread = threading.Thread(target=self._loop, daemon=True) - self.sleep = time.sleep + self.running = False + self.pause_at = None + self.wakeup_event = threading.Event() def start(self): + self.pause_at = None + self.wakeup_event.set() + + if self.running: + return + + self.running = True self.thread.start() def _loop(self): - while True: - start_time_seconds = time.time() + while self.running: + start_time_seconds = time.monotonic() + + # Pause has been requested, sleep until pause_at or until woken. + if self.pause_at is not None and start_time_seconds < self.pause_at: + if not self.wakeup_event.wait(timeout=self.pause_at - start_time_seconds): + self.pause_at = None + self.wakeup_event.clear() + continue + self.target() - elapsed_seconds = time.time() - start_time_seconds + elapsed_seconds = time.monotonic() - start_time_seconds sleep_seconds = max(0, self.interval_seconds - elapsed_seconds) time.sleep(sleep_seconds) def stop(self): + self.running = False + self.wakeup_event.set() self.thread.join() + def pause_after(self, seconds: float): + self.pause_at = time.monotonic() + seconds + self.wakeup_event.clear() + class _StringTable: def __init__(self): diff --git a/src/splunk_otel/propagator.py b/src/splunk_otel/propagator.py index 5f2b51ae..3487695b 100644 --- a/src/splunk_otel/propagator.py +++ b/src/splunk_otel/propagator.py @@ -13,8 +13,9 @@ # limitations under the License.
import typing +import random -from opentelemetry import trace +from opentelemetry import baggage, trace from opentelemetry.context.context import Context from opentelemetry.instrumentation.propagators import ( _HTTP_HEADER_ACCESS_CONTROL_EXPOSE_HEADERS, @@ -22,8 +23,11 @@ default_setter, ) from opentelemetry.propagators import textmap +from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, Decision from opentelemetry.trace import format_span_id, format_trace_id +_SPLUNK_TRACE_SNAPSHOT_VOLUME = "splunk.trace.snapshot.volume" + class ServerTimingResponsePropagator(ResponsePropagator): def inject( @@ -58,3 +62,47 @@ def inject( _HTTP_HEADER_ACCESS_CONTROL_EXPOSE_HEADERS, header_name, ) + + +def _with_volume_baggage(is_selected: bool, context: typing.Optional[Context]) -> Context: # noqa FBT001 + baggage_value = "highest" if is_selected else "off" + return baggage.set_baggage(_SPLUNK_TRACE_SNAPSHOT_VOLUME, baggage_value, context) + + +class CallgraphsPropagator(textmap.TextMapPropagator): + selection_probability: float + + def __init__(self, selection_probability: float = 0.01): + self.selection_probability = selection_probability + self.sampler = TraceIdRatioBased(selection_probability) + + def extract(self, carrier, context, getter): + volume_baggage = baggage.get_baggage(_SPLUNK_TRACE_SNAPSHOT_VOLUME, context) + + if volume_baggage is None: + return self._attach_volume_baggage(context) + + if volume_baggage in {"highest", "off"}: + return context + + return self._attach_volume_baggage(context) + + def inject(self, carrier, context, setter): + pass + + def fields(self) -> set[str]: + return set() + + def _attach_volume_baggage(self, context: typing.Optional[Context]) -> Context: + span = trace.get_current_span(context) + + if span is None: + is_selected = random.random() < self.selection_probability # noqa S311 + return _with_volume_baggage(is_selected, context) + + is_selected = ( + self.sampler.should_sample(context, span.get_span_context().trace_id, 
"splunk.snapshot.profiling").decision + == Decision.RECORD_AND_SAMPLE + ) + + return _with_volume_baggage(is_selected, context) diff --git a/tests/test_callgraphs_init.py b/tests/test_callgraphs_init.py new file mode 100644 index 00000000..e1aea563 --- /dev/null +++ b/tests/test_callgraphs_init.py @@ -0,0 +1,57 @@ +# Copyright Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import patch + +from splunk_otel.callgraphs import start_callgraphs_if_enabled +from splunk_otel.env import Env + + +class TestStartCallgraphsIfEnabled: + @patch("splunk_otel.callgraphs.trace") + def test_does_not_add_processor_when_disabled(self, mock_trace): + env_store = {} + env = Env(env_store) + + start_callgraphs_if_enabled(env) + + mock_trace.get_tracer_provider.return_value.add_span_processor.assert_not_called() + + @patch("splunk_otel.callgraphs.trace") + @patch("splunk_otel.callgraphs.CallgraphsSpanProcessor") + def test_adds_processor_when_enabled(self, mock_processor, mock_trace): + env_store = { + "SPLUNK_SNAPSHOT_PROFILER_ENABLED": "true", + "OTEL_SERVICE_NAME": "test-service", + } + env = Env(env_store) + + start_callgraphs_if_enabled(env) + + mock_trace.get_tracer_provider.return_value.add_span_processor.assert_called_once() + mock_processor.assert_called_once_with("test-service", 10) + + @patch("splunk_otel.callgraphs.trace") + @patch("splunk_otel.callgraphs.CallgraphsSpanProcessor") + def test_uses_custom_sampling_interval(self, mock_processor, 
mock_trace): + env_store = { + "SPLUNK_SNAPSHOT_PROFILER_ENABLED": "true", + "OTEL_SERVICE_NAME": "test-service", + "SPLUNK_SNAPSHOT_SAMPLING_INTERVAL": "50", + } + env = Env(env_store) + + start_callgraphs_if_enabled(env) + + mock_processor.assert_called_once_with("test-service", 50) diff --git a/tests/test_callgraphs_span_processor.py b/tests/test_callgraphs_span_processor.py new file mode 100644 index 00000000..a0f2948a --- /dev/null +++ b/tests/test_callgraphs_span_processor.py @@ -0,0 +1,199 @@ +# Copyright Splunk Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from unittest.mock import MagicMock, patch + +from opentelemetry import baggage, trace +from opentelemetry.context import Context +from opentelemetry.sdk.trace import Span +from opentelemetry.trace import SpanContext + +from splunk_otel.callgraphs.span_processor import CallgraphsSpanProcessor, _should_process_context + + +class TestShouldProcessContext: + def test_returns_true_when_no_parent_span(self): + assert _should_process_context(Context()) is True + + def test_returns_true_when_parent_is_remote(self): + span = trace.NonRecordingSpan( + SpanContext( + trace_id=1, + span_id=2, + is_remote=True, + ), + ) + ctx = trace.set_span_in_context(span, Context()) + assert _should_process_context(ctx) is True + + def test_returns_false_when_parent_is_local(self): + span = trace.NonRecordingSpan( + SpanContext( + trace_id=1, + span_id=2, + is_remote=False, + ), + ) + ctx = trace.set_span_in_context(span, Context()) + assert _should_process_context(ctx) is False + + +class TestCallgraphsSpanProcessor: + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_on_start_does_nothing_when_baggage_is_none(self, mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + span = MagicMock(spec=Span) + + processor.on_start(span, Context()) + + span.set_attribute.assert_not_called() + mock_profiling_context.return_value.start.assert_not_called() + + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_on_start_does_nothing_when_baggage_is_off(self, mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + span = MagicMock(spec=Span) + ctx = baggage.set_baggage("splunk.trace.snapshot.volume", "off", Context()) + + processor.on_start(span, ctx) + + span.set_attribute.assert_not_called() + mock_profiling_context.return_value.start.assert_not_called() + + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_on_start_does_nothing_when_parent_is_local(self, 
mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + span = MagicMock(spec=Span) + + parent_span = trace.NonRecordingSpan( + SpanContext(trace_id=1, span_id=2, is_remote=False), + ) + ctx = trace.set_span_in_context(parent_span, Context()) + ctx = baggage.set_baggage("splunk.trace.snapshot.volume", "highest", ctx) + + processor.on_start(span, ctx) + + span.set_attribute.assert_not_called() + mock_profiling_context.return_value.start.assert_not_called() + + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_on_start_activates_profiling_when_baggage_is_highest(self, mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + + span = MagicMock(spec=Span) + span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) + span.get_span_context.return_value = span_ctx + + ctx = baggage.set_baggage("splunk.trace.snapshot.volume", "highest", Context()) + + processor.on_start(span, ctx) + + span.set_attribute.assert_called_once_with("splunk.snapshot.profiling", True) + mock_profiling_context.return_value.start.assert_called_once() + assert 456 in processor.span_id_to_trace_id + assert processor.span_id_to_trace_id[456] == 123 + assert 123 in processor.active_traces + + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_on_end_removes_span_from_tracking(self, mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + processor.span_id_to_trace_id[456] = 123 + processor.active_traces.add(123) + + span = MagicMock(spec=Span) + span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) + span.get_span_context.return_value = span_ctx + + processor.on_end(span) + + assert 456 not in processor.span_id_to_trace_id + assert 123 not in processor.active_traces + + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_on_end_pauses_profiler_when_no_active_spans(self, mock_profiling_context): + processor = 
CallgraphsSpanProcessor("test-service") + processor.span_id_to_trace_id[456] = 123 + processor.active_traces.add(123) + + span = MagicMock(spec=Span) + span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) + span.get_span_context.return_value = span_ctx + + processor.on_end(span) + + mock_profiling_context.return_value.pause_after.assert_called_once_with(60.0) + + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_on_end_does_not_pause_profiler_when_other_spans_active(self, mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + processor.span_id_to_trace_id[456] = 123 + processor.span_id_to_trace_id[789] = 123 + processor.active_traces.add(123) + + span = MagicMock(spec=Span) + span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) + span.get_span_context.return_value = span_ctx + + processor.on_end(span) + + mock_profiling_context.return_value.pause_after.assert_not_called() + + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_on_end_ignores_untracked_spans(self, mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + + span = MagicMock(spec=Span) + span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) + span.get_span_context.return_value = span_ctx + + processor.on_end(span) + + mock_profiling_context.return_value.pause_after.assert_not_called() + + @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_filter_stacktraces_keeps_active_traces(self, mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + processor.active_traces.add(123) + + stacktraces = [ + {"tid": 1, "frames": []}, + {"tid": 2, "frames": []}, + {"tid": 3, "frames": []}, + ] + active_trace_contexts = { + 1: (123, 456), + 2: (999, 888), + } + + result = processor._filter_stacktraces(stacktraces, active_trace_contexts) # noqa SLF001 + + assert len(result) == 1 + assert result[0]["tid"] == 1 + + 
@patch("splunk_otel.callgraphs.span_processor.ProfilingContext") + def test_filter_stacktraces_returns_empty_when_no_active_traces(self, mock_profiling_context): + processor = CallgraphsSpanProcessor("test-service") + + stacktraces = [ + {"tid": 1, "frames": []}, + {"tid": 2, "frames": []}, + ] + active_trace_contexts = { + 1: (123, 456), + 2: (999, 888), + } + + result = processor._filter_stacktraces(stacktraces, active_trace_contexts) # noqa SLF001 + + assert len(result) == 0 diff --git a/tests/test_distro.py b/tests/test_distro.py index c11c40c7..a386b471 100644 --- a/tests/test_distro.py +++ b/tests/test_distro.py @@ -17,9 +17,13 @@ get_global_response_propagator, set_global_response_propagator, ) +from opentelemetry.propagate import get_global_textmap +from opentelemetry.propagators.composite import CompositePropagator + from splunk_otel.__about__ import __version__ as version from splunk_otel.distro import SplunkDistro from splunk_otel.env import Env +from splunk_otel.propagator import CallgraphsPropagator def test_distro_env(): @@ -108,6 +112,44 @@ def test_realm(): assert env_store["OTEL_EXPORTER_OTLP_PROTOCOL"] == "http/protobuf" +def test_callgraphs_propagator_disabled_by_default(): + env_store = {} + configure_distro(env_store) + + textmap = get_global_textmap() + if isinstance(textmap, CompositePropagator): + propagators = textmap._propagators # noqa SLF001 + callgraphs_propagators = [p for p in propagators if isinstance(p, CallgraphsPropagator)] + assert len(callgraphs_propagators) == 0 + else: + assert not isinstance(textmap, CallgraphsPropagator) + + +def test_callgraphs_propagator_enabled(): + env_store = {"SPLUNK_SNAPSHOT_PROFILER_ENABLED": "true"} + configure_distro(env_store) + + textmap = get_global_textmap() + assert isinstance(textmap, CompositePropagator) + + propagators = textmap._propagators # noqa SLF001 + callgraphs_propagators = [p for p in propagators if isinstance(p, CallgraphsPropagator)] + assert len(callgraphs_propagators) == 1 + + 
+def test_callgraphs_propagator_selection_probability(): + env_store = { + "SPLUNK_SNAPSHOT_PROFILER_ENABLED": "true", + "SPLUNK_SNAPSHOT_SELECTION_PROBABILITY": "0.5", + } + configure_distro(env_store) + + textmap = get_global_textmap() + propagators = textmap._propagators # noqa SLF001 + callgraphs_propagator = next(p for p in propagators if isinstance(p, CallgraphsPropagator)) + assert callgraphs_propagator.selection_probability == 0.5 + + def configure_distro(env_store): sd = SplunkDistro() sd.env = Env(env_store) diff --git a/tests/test_env.py b/tests/test_env.py index a14df72d..a5e2f23f 100644 --- a/tests/test_env.py +++ b/tests/test_env.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +import pytest from splunk_otel.env import Env @@ -21,6 +22,7 @@ def test_env(): e.store = { "PREEXISTING": "preexisting", "FAVORITE_NUMBER": "42", + "FLOATY_NUMBER": "0.56", } e.setdefault("PREEXISTING", "default") @@ -46,6 +48,9 @@ def test_env(): assert e.getint("FAVORITE_NUMBER", 111) == 42 assert e.getint("NOT_SET", 222) == 222 + assert pytest.approx(e.getfloat("FLOATY_NUMBER", 0.1), 0.001) == 0.56 + assert pytest.approx(e.getfloat("NOT_SET", 0.234), 0.001) == 0.234 + def test_get_invalid_int(caplog): with caplog.at_level(logging.WARNING): diff --git a/tests/test_propagator.py b/tests/test_propagator.py index 6708fc7b..6761480c 100644 --- a/tests/test_propagator.py +++ b/tests/test_propagator.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from opentelemetry import trace -from splunk_otel.propagator import ServerTimingResponsePropagator +from opentelemetry import baggage, trace +from opentelemetry.context import Context +from splunk_otel.propagator import CallgraphsPropagator, ServerTimingResponsePropagator def test_inject(): @@ -33,3 +34,94 @@ def test_inject(): prop.inject(carrier, ctx) assert carrier["Access-Control-Expose-Headers"] == "Server-Timing" assert carrier["Server-Timing"] == 'traceparent;desc="00-00000000000000000000000000000001-0000000000000002-01"' + + +class TestCallgraphsPropagator: + def test_extract_sets_highest_when_trace_is_selected(self): + span = trace.NonRecordingSpan( + trace.SpanContext( + trace_id=1, + span_id=2, + is_remote=False, + ), + ) + ctx = trace.set_span_in_context(span, Context()) + + prop = CallgraphsPropagator(selection_probability=1.0) + result_ctx = prop.extract({}, ctx, None) + + volume = baggage.get_baggage("splunk.trace.snapshot.volume", result_ctx) + assert volume == "highest" + + def test_extract_sets_off_when_not_trace_is_not_selected(self): + span = trace.NonRecordingSpan( + trace.SpanContext( + trace_id=1, + span_id=2, + is_remote=False, + ), + ) + ctx = trace.set_span_in_context(span, Context()) + + prop = CallgraphsPropagator(selection_probability=0.0) + result_ctx = prop.extract({}, ctx, None) + + volume = baggage.get_baggage("splunk.trace.snapshot.volume", result_ctx) + assert volume == "off" + + def test_extract_preserves_existing_highest_baggage(self): + ctx = baggage.set_baggage("splunk.trace.snapshot.volume", "highest", Context()) + + prop = CallgraphsPropagator(selection_probability=0.0) + result_ctx = prop.extract({}, ctx, None) + + volume = baggage.get_baggage("splunk.trace.snapshot.volume", result_ctx) + assert volume == "highest" + + def test_extract_preserves_existing_off_baggage(self): + ctx = baggage.set_baggage("splunk.trace.snapshot.volume", "off", Context()) + + prop = CallgraphsPropagator(selection_probability=1.0) + result_ctx = 
prop.extract({}, ctx, None) + + volume = baggage.get_baggage("splunk.trace.snapshot.volume", result_ctx) + assert volume == "off" + + def test_extract_resets_invalid_baggage_value(self): + ctx = baggage.set_baggage("splunk.trace.snapshot.volume", "invalid", Context()) + span = trace.NonRecordingSpan( + trace.SpanContext( + trace_id=1, + span_id=2, + is_remote=False, + ), + ) + ctx = trace.set_span_in_context(span, ctx) + + prop = CallgraphsPropagator(selection_probability=1.0) + result_ctx = prop.extract({}, ctx, None) + + volume = baggage.get_baggage("splunk.trace.snapshot.volume", result_ctx) + assert volume == "highest" + + def test_sampling_is_consistent_for_same_trace_id(self): + trace_id = 12345678901234567890 + + span = trace.NonRecordingSpan( + trace.SpanContext( + trace_id=trace_id, + span_id=1, + is_remote=False, + ), + ) + ctx = trace.set_span_in_context(span, Context()) + + prop = CallgraphsPropagator(selection_probability=0.5) + + results = [] + for _ in range(16): + result_ctx = prop.extract({}, ctx, None) + volume = baggage.get_baggage("splunk.trace.snapshot.volume", result_ctx) + results.append(volume) + + assert all(r == results[0] for r in results) From 7f0ce87b1f4dac1fc76359baca7afd08e8557d6f Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Fri, 23 Jan 2026 15:41:00 +0200 Subject: [PATCH 02/10] use set discard and fix the check for parent context --- src/splunk_otel/callgraphs/span_processor.py | 2 +- src/splunk_otel/propagator.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/splunk_otel/callgraphs/span_processor.py b/src/splunk_otel/callgraphs/span_processor.py index 3ebb120e..7ba08643 100644 --- a/src/splunk_otel/callgraphs/span_processor.py +++ b/src/splunk_otel/callgraphs/span_processor.py @@ -67,7 +67,7 @@ def on_end(self, span: ReadableSpan) -> None: return del self.span_id_to_trace_id[span_id] - self.active_traces.remove(trace_id) + self.active_traces.discard(trace_id) if len(self.span_id_to_trace_id) == 0: 
self.profiler.pause_after(60.0) diff --git a/src/splunk_otel/propagator.py b/src/splunk_otel/propagator.py index 3487695b..32099b86 100644 --- a/src/splunk_otel/propagator.py +++ b/src/splunk_otel/propagator.py @@ -96,7 +96,7 @@ def fields(self) -> set[str]: def _attach_volume_baggage(self, context: typing.Optional[Context]) -> Context: span = trace.get_current_span(context) - if span is None: + if not span.get_span_context().is_valid: is_selected = random.random() < self.selection_probability # noqa S311 return _with_volume_baggage(is_selected, context) From 1770046c0b754566ddc6ae3881c670ca793b9385 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Thu, 5 Feb 2026 19:49:55 +0200 Subject: [PATCH 03/10] remove active_traces from CallgraphsSpanProcessor --- src/splunk_otel/callgraphs/span_processor.py | 5 +---- tests/test_callgraphs_span_processor.py | 7 +------ 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/src/splunk_otel/callgraphs/span_processor.py b/src/splunk_otel/callgraphs/span_processor.py index 7ba08643..d2bed183 100644 --- a/src/splunk_otel/callgraphs/span_processor.py +++ b/src/splunk_otel/callgraphs/span_processor.py @@ -33,7 +33,6 @@ def _should_process_context(context: Optional[Context]) -> bool: class CallgraphsSpanProcessor(SpanProcessor): def __init__(self, service_name: str, sampling_interval: Optional[int] = 10): self.span_id_to_trace_id: dict[int, int] = {} - self.active_traces = set() self.profiler = ProfilingContext( service_name, sampling_interval, self._filter_stacktraces, instrumentation_source="snapshot" ) @@ -56,7 +55,6 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None return self.span_id_to_trace_id[span_ctx.span_id] = span_ctx.trace_id - self.active_traces.add(span_ctx.trace_id) self.profiler.start() def on_end(self, span: ReadableSpan) -> None: @@ -67,7 +65,6 @@ def on_end(self, span: ReadableSpan) -> None: return del self.span_id_to_trace_id[span_id] - self.active_traces.discard(trace_id) 
if len(self.span_id_to_trace_id) == 0: self.profiler.pause_after(60.0) @@ -88,7 +85,7 @@ def _filter_stacktraces(self, stacktraces, active_trace_contexts): if maybe_context is not None: (trace_id, _span_id) = maybe_context - if trace_id in self.active_traces: + if trace_id in self.span_id_to_trace_id.values(): filtered.append(stacktrace) return filtered diff --git a/tests/test_callgraphs_span_processor.py b/tests/test_callgraphs_span_processor.py index a0f2948a..22cbde0e 100644 --- a/tests/test_callgraphs_span_processor.py +++ b/tests/test_callgraphs_span_processor.py @@ -103,13 +103,11 @@ def test_on_start_activates_profiling_when_baggage_is_highest(self, mock_profili mock_profiling_context.return_value.start.assert_called_once() assert 456 in processor.span_id_to_trace_id assert processor.span_id_to_trace_id[456] == 123 - assert 123 in processor.active_traces @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") def test_on_end_removes_span_from_tracking(self, mock_profiling_context): processor = CallgraphsSpanProcessor("test-service") processor.span_id_to_trace_id[456] = 123 - processor.active_traces.add(123) span = MagicMock(spec=Span) span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) @@ -118,13 +116,11 @@ def test_on_end_removes_span_from_tracking(self, mock_profiling_context): processor.on_end(span) assert 456 not in processor.span_id_to_trace_id - assert 123 not in processor.active_traces @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") def test_on_end_pauses_profiler_when_no_active_spans(self, mock_profiling_context): processor = CallgraphsSpanProcessor("test-service") processor.span_id_to_trace_id[456] = 123 - processor.active_traces.add(123) span = MagicMock(spec=Span) span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) @@ -139,7 +135,6 @@ def test_on_end_does_not_pause_profiler_when_other_spans_active(self, mock_profi processor = CallgraphsSpanProcessor("test-service") 
processor.span_id_to_trace_id[456] = 123 processor.span_id_to_trace_id[789] = 123 - processor.active_traces.add(123) span = MagicMock(spec=Span) span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) @@ -164,7 +159,7 @@ def test_on_end_ignores_untracked_spans(self, mock_profiling_context): @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") def test_filter_stacktraces_keeps_active_traces(self, mock_profiling_context): processor = CallgraphsSpanProcessor("test-service") - processor.active_traces.add(123) + processor.span_id_to_trace_id[456] = 123 stacktraces = [ {"tid": 1, "frames": []}, From 2106073a5ce5915fc422def5c93940f5f6fd38b4 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Thu, 5 Feb 2026 20:35:41 +0200 Subject: [PATCH 04/10] use constant for splunk.trace.snapshot.volume --- src/splunk_otel/callgraphs/span_processor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/splunk_otel/callgraphs/span_processor.py b/src/splunk_otel/callgraphs/span_processor.py index d2bed183..123c4aac 100644 --- a/src/splunk_otel/callgraphs/span_processor.py +++ b/src/splunk_otel/callgraphs/span_processor.py @@ -19,6 +19,7 @@ from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor from splunk_otel.profile import ProfilingContext +from splunk_otel.propagator import _SPLUNK_TRACE_SNAPSHOT_VOLUME def _should_process_context(context: Optional[Context]) -> bool: @@ -41,7 +42,7 @@ def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None if not _should_process_context(parent_context): return - ctx_baggage = baggage.get_baggage("splunk.trace.snapshot.volume", parent_context) + ctx_baggage = baggage.get_baggage(_SPLUNK_TRACE_SNAPSHOT_VOLUME, parent_context) if ctx_baggage is None: return From d382cb1562a0f18a5d36c0e3ed9415ad21ea780e Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Thu, 5 Feb 2026 20:38:44 +0200 Subject: [PATCH 05/10] rename start_callgraphs_if_enabled to _configure_callgraphs_if_enabled 
--- src/splunk_otel/callgraphs/__init__.py | 2 +- src/splunk_otel/configurator.py | 4 ++-- tests/test_callgraphs_init.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/splunk_otel/callgraphs/__init__.py b/src/splunk_otel/callgraphs/__init__.py index 203cb8ed..299151f4 100644 --- a/src/splunk_otel/callgraphs/__init__.py +++ b/src/splunk_otel/callgraphs/__init__.py @@ -9,7 +9,7 @@ ) -def start_callgraphs_if_enabled(env=None): +def _configure_callgraphs_if_enabled(env=None): env = env or Env() if env.is_true(SPLUNK_SNAPSHOT_PROFILER_ENABLED): trace.get_tracer_provider().add_span_processor( diff --git a/src/splunk_otel/configurator.py b/src/splunk_otel/configurator.py index 9969e6e4..126b8ea6 100644 --- a/src/splunk_otel/configurator.py +++ b/src/splunk_otel/configurator.py @@ -15,11 +15,11 @@ from opentelemetry.sdk._configuration import _OTelSDKConfigurator from splunk_otel.profile import _start_profiling_if_enabled -from splunk_otel.callgraphs import start_callgraphs_if_enabled +from splunk_otel.callgraphs import _configure_callgraphs_if_enabled class SplunkConfigurator(_OTelSDKConfigurator): def _configure(self, **kwargs): super()._configure(**kwargs) _start_profiling_if_enabled() - start_callgraphs_if_enabled() + _configure_callgraphs_if_enabled() diff --git a/tests/test_callgraphs_init.py b/tests/test_callgraphs_init.py index e1aea563..56d258ec 100644 --- a/tests/test_callgraphs_init.py +++ b/tests/test_callgraphs_init.py @@ -14,7 +14,7 @@ from unittest.mock import patch -from splunk_otel.callgraphs import start_callgraphs_if_enabled +from splunk_otel.callgraphs import _configure_callgraphs_if_enabled from splunk_otel.env import Env @@ -24,7 +24,7 @@ def test_does_not_add_processor_when_disabled(self, mock_trace): env_store = {} env = Env(env_store) - start_callgraphs_if_enabled(env) + _configure_callgraphs_if_enabled(env) mock_trace.get_tracer_provider.return_value.add_span_processor.assert_not_called() @@ -37,7 +37,7 @@ def 
test_adds_processor_when_enabled(self, mock_processor, mock_trace): } env = Env(env_store) - start_callgraphs_if_enabled(env) + _configure_callgraphs_if_enabled(env) mock_trace.get_tracer_provider.return_value.add_span_processor.assert_called_once() mock_processor.assert_called_once_with("test-service", 10) @@ -52,6 +52,6 @@ def test_uses_custom_sampling_interval(self, mock_processor, mock_trace): } env = Env(env_store) - start_callgraphs_if_enabled(env) + _configure_callgraphs_if_enabled(env) mock_processor.assert_called_once_with("test-service", 50) From 67ebe23c59fed4e6575c9d4d68ef6e57a3de2f5c Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Fri, 20 Feb 2026 15:55:03 +0200 Subject: [PATCH 06/10] add a lock to CallgraphsSpanProcessor --- src/splunk_otel/callgraphs/span_processor.py | 25 +++++++++++++------- tests/test_callgraphs_span_processor.py | 16 ++++++------- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/src/splunk_otel/callgraphs/span_processor.py b/src/splunk_otel/callgraphs/span_processor.py index 123c4aac..d2942013 100644 --- a/src/splunk_otel/callgraphs/span_processor.py +++ b/src/splunk_otel/callgraphs/span_processor.py @@ -21,6 +21,8 @@ from splunk_otel.profile import ProfilingContext from splunk_otel.propagator import _SPLUNK_TRACE_SNAPSHOT_VOLUME +import threading + def _should_process_context(context: Optional[Context]) -> bool: parent_span = trace.get_current_span(context).get_span_context() @@ -33,8 +35,9 @@ def _should_process_context(context: Optional[Context]) -> bool: class CallgraphsSpanProcessor(SpanProcessor): def __init__(self, service_name: str, sampling_interval: Optional[int] = 10): - self.span_id_to_trace_id: dict[int, int] = {} - self.profiler = ProfilingContext( + self._span_id_to_trace_id: dict[int, int] = {} + self._lock = threading.Lock() + self._profiler = ProfilingContext( service_name, sampling_interval, self._filter_stacktraces, instrumentation_source="snapshot" ) @@ -55,20 +58,22 @@ def on_start(self, 
span: Span, parent_context: Optional[Context] = None) -> None if span_ctx is None: return - self.span_id_to_trace_id[span_ctx.span_id] = span_ctx.trace_id - self.profiler.start() + with self._lock: + self._span_id_to_trace_id[span_ctx.span_id] = span_ctx.trace_id + self._profiler.start() def on_end(self, span: ReadableSpan) -> None: span_id = span.get_span_context().span_id - trace_id = self.span_id_to_trace_id.get(span_id) + trace_id = self._span_id_to_trace_id.get(span_id) if trace_id is None: return - del self.span_id_to_trace_id[span_id] + with self._lock: + self._span_id_to_trace_id.pop(span_id, None) - if len(self.span_id_to_trace_id) == 0: - self.profiler.pause_after(60.0) + if len(self._span_id_to_trace_id) == 0: + self._profiler.pause_after(60.0) def shutdown(self) -> None: pass @@ -78,6 +83,8 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: def _filter_stacktraces(self, stacktraces, active_trace_contexts): filtered = [] + with self._lock: + trace_ids = set(self._span_id_to_trace_id.values()) for stacktrace in stacktraces: thread_id = stacktrace["tid"] @@ -86,7 +93,7 @@ def _filter_stacktraces(self, stacktraces, active_trace_contexts): if maybe_context is not None: (trace_id, _span_id) = maybe_context - if trace_id in self.span_id_to_trace_id.values(): + if trace_id in trace_ids: filtered.append(stacktrace) return filtered diff --git a/tests/test_callgraphs_span_processor.py b/tests/test_callgraphs_span_processor.py index 22cbde0e..e4336ca1 100644 --- a/tests/test_callgraphs_span_processor.py +++ b/tests/test_callgraphs_span_processor.py @@ -101,13 +101,13 @@ def test_on_start_activates_profiling_when_baggage_is_highest(self, mock_profili span.set_attribute.assert_called_once_with("splunk.snapshot.profiling", True) mock_profiling_context.return_value.start.assert_called_once() - assert 456 in processor.span_id_to_trace_id - assert processor.span_id_to_trace_id[456] == 123 + assert 456 in processor._span_id_to_trace_id # noqa SLF001 + assert 
processor._span_id_to_trace_id[456] == 123 # noqa SLF001 @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") def test_on_end_removes_span_from_tracking(self, mock_profiling_context): processor = CallgraphsSpanProcessor("test-service") - processor.span_id_to_trace_id[456] = 123 + processor._span_id_to_trace_id[456] = 123 # noqa SLF001 span = MagicMock(spec=Span) span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) @@ -115,12 +115,12 @@ def test_on_end_removes_span_from_tracking(self, mock_profiling_context): processor.on_end(span) - assert 456 not in processor.span_id_to_trace_id + assert 456 not in processor._span_id_to_trace_id # noqa SLF001 @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") def test_on_end_pauses_profiler_when_no_active_spans(self, mock_profiling_context): processor = CallgraphsSpanProcessor("test-service") - processor.span_id_to_trace_id[456] = 123 + processor._span_id_to_trace_id[456] = 123 # noqa SLF001 span = MagicMock(spec=Span) span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) @@ -133,8 +133,8 @@ def test_on_end_pauses_profiler_when_no_active_spans(self, mock_profiling_contex @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") def test_on_end_does_not_pause_profiler_when_other_spans_active(self, mock_profiling_context): processor = CallgraphsSpanProcessor("test-service") - processor.span_id_to_trace_id[456] = 123 - processor.span_id_to_trace_id[789] = 123 + processor._span_id_to_trace_id[456] = 123 # noqa SLF001 + processor._span_id_to_trace_id[789] = 123 # noqa SLF001 span = MagicMock(spec=Span) span_ctx = SpanContext(trace_id=123, span_id=456, is_remote=False) @@ -159,7 +159,7 @@ def test_on_end_ignores_untracked_spans(self, mock_profiling_context): @patch("splunk_otel.callgraphs.span_processor.ProfilingContext") def test_filter_stacktraces_keeps_active_traces(self, mock_profiling_context): processor = CallgraphsSpanProcessor("test-service") - 
processor.span_id_to_trace_id[456] = 123 + processor._span_id_to_trace_id[456] = 123 # noqa SLF001 stacktraces = [ {"tid": 1, "frames": []}, From a8e5014a384125e91ff23774ea73c3156c00cc57 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Fri, 20 Feb 2026 18:39:00 +0200 Subject: [PATCH 07/10] fix the profiler sleep logic --- src/splunk_otel/profile.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/splunk_otel/profile.py b/src/splunk_otel/profile.py index 47c79572..45b9d9e3 100644 --- a/src/splunk_otel/profile.py +++ b/src/splunk_otel/profile.py @@ -277,11 +277,10 @@ def _loop(self): while self.running: start_time_seconds = time.monotonic() - # Pause has been been requested, sleep until pause_at or until woken. - if self.pause_at is not None and start_time_seconds < self.pause_at: - if not self.wakeup_event.wait(timeout=self.pause_at - start_time_seconds): - self.pause_at = None - self.wakeup_event.clear() + if self.pause_at is not None and start_time_seconds >= self.pause_at: + # The pause deadline has been reached, sleep until woken again. + self.wakeup_event.wait() + self.pause_at = None continue self.target() @@ -291,10 +290,12 @@ def _loop(self): def stop(self): self.running = False + self.pause_at = None self.wakeup_event.set() self.thread.join() def pause_after(self, seconds: float): + # The timer will stay running until pause_at has been reached. 
self.pause_at = time.monotonic() + seconds self.wakeup_event.clear() From c38b767c060e4cb0bd1f72fc8dca8309205c0df7 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Fri, 20 Feb 2026 22:37:32 +0200 Subject: [PATCH 08/10] fix a race condition when waking up the profiler thread --- src/splunk_otel/profile.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/splunk_otel/profile.py b/src/splunk_otel/profile.py index 45b9d9e3..4735b3da 100644 --- a/src/splunk_otel/profile.py +++ b/src/splunk_otel/profile.py @@ -280,7 +280,6 @@ def _loop(self): if self.pause_at is not None and start_time_seconds >= self.pause_at: # The pause deadline has been reached, sleep until woken again. self.wakeup_event.wait() - self.pause_at = None continue self.target() From d1613c74a3adc804d0ebc5d83f0ca8cdba7a87f2 Mon Sep 17 00:00:00 2001 From: Siim Kallas Date: Thu, 26 Feb 2026 14:29:39 +0200 Subject: [PATCH 09/10] shut down profiler on span processor shutdown --- src/splunk_otel/callgraphs/span_processor.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/splunk_otel/callgraphs/span_processor.py b/src/splunk_otel/callgraphs/span_processor.py index d2942013..58804032 100644 --- a/src/splunk_otel/callgraphs/span_processor.py +++ b/src/splunk_otel/callgraphs/span_processor.py @@ -27,10 +27,9 @@ def _should_process_context(context: Optional[Context]) -> bool: parent_span = trace.get_current_span(context).get_span_context() - if not parent_span.is_valid: - return True + is_root_span = not parent_span.is_valid - return parent_span.is_remote + return is_root_span or parent_span.is_remote class CallgraphsSpanProcessor(SpanProcessor): @@ -76,7 +75,7 @@ def on_end(self, span: ReadableSpan) -> None: self._profiler.pause_after(60.0) def shutdown(self) -> None: - pass + self._profiler.stop() def force_flush(self, timeout_millis: int = 30000) -> bool: return True From b3e7e0b19ca77ea5ebbfb9ed04c65c674b97889a Mon Sep 17 00:00:00 2001 From: Pablo Collins Date: Thu, 26 Feb 2026 
07:56:03 -0500 Subject: [PATCH 10/10] pin virtualenv in ci --- .github/workflows/ci-main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-main.yml b/.github/workflows/ci-main.yml index d04bb564..b7cda6be 100644 --- a/.github/workflows/ci-main.yml +++ b/.github/workflows/ci-main.yml @@ -33,7 +33,7 @@ jobs: python-version: ${{ matrix.python-version }} - name: Install Hatch - run: pip install --upgrade hatch + run: pip install --upgrade hatch "virtualenv<21" - name: Run static analysis run: hatch fmt --check