Skip to content
2 changes: 1 addition & 1 deletion .github/workflows/ci-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
python-version: ${{ matrix.python-version }}

- name: Install Hatch
run: pip install --upgrade hatch
run: pip install --upgrade hatch "virtualenv<21"

- name: Run static analysis
run: hatch fmt --check
Expand Down
17 changes: 17 additions & 0 deletions src/splunk_otel/callgraphs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from opentelemetry import trace
from opentelemetry.sdk.environment_variables import OTEL_SERVICE_NAME

from splunk_otel.callgraphs.span_processor import CallgraphsSpanProcessor
from splunk_otel.env import (
Env,
SPLUNK_SNAPSHOT_PROFILER_ENABLED,
SPLUNK_SNAPSHOT_SAMPLING_INTERVAL,
)


def _configure_callgraphs_if_enabled(env=None):
    """Attach a ``CallgraphsSpanProcessor`` to the global tracer provider.

    Nothing happens unless ``SPLUNK_SNAPSHOT_PROFILER_ENABLED`` evaluates to
    true in ``env``.

    Args:
        env: Environment accessor to read settings from. A fresh ``Env()`` is
            created when this is omitted (or otherwise falsy).
    """
    settings = env or Env()
    if not settings.is_true(SPLUNK_SNAPSHOT_PROFILER_ENABLED):
        return

    # Resolve the registration hook before building the processor so the
    # lookup/construction order stays the same as a single nested call.
    register = trace.get_tracer_provider().add_span_processor
    service_name = settings.getval(OTEL_SERVICE_NAME)
    # 10 is the default passed to getint when the variable is not usable.
    sampling_interval = settings.getint(SPLUNK_SNAPSHOT_SAMPLING_INTERVAL, 10)
    register(CallgraphsSpanProcessor(service_name, sampling_interval))
98 changes: 98 additions & 0 deletions src/splunk_otel/callgraphs/span_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Copyright Splunk Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Optional

from opentelemetry import baggage, trace
from opentelemetry.context import Context
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor

from splunk_otel.profile import ProfilingContext
from splunk_otel.propagator import _SPLUNK_TRACE_SNAPSHOT_VOLUME

import threading


def _should_process_context(context: Optional[Context]) -> bool:
    """Tell whether a span started under ``context`` should be considered.

    Returns ``True`` when the current span found in ``context`` has an invalid
    span context (so the new span has no real parent) or when that parent span
    context is remote; ``False`` otherwise.
    """
    parent_ctx = trace.get_current_span(context).get_span_context()

    if not parent_ctx.is_valid:
        # No valid parent: the span being started is a root span.
        return True

    return parent_ctx.is_remote


class CallgraphsSpanProcessor(SpanProcessor):
    """Span processor that drives the snapshot profiler from span lifecycle events.

    Spans accepted by ``_should_process_context`` whose parent context carries
    the ``_SPLUNK_TRACE_SNAPSHOT_VOLUME`` baggage entry are tracked by span id.
    The profiler is started whenever such a span starts and is asked to pause
    (after a 60 second delay) once no tracked span remains.
    """

    def __init__(self, service_name: str, sampling_interval: Optional[int] = 10):
        """Create the processor and its ``ProfilingContext``.

        Args:
            service_name: Forwarded to ``ProfilingContext``.
            sampling_interval: Forwarded to ``ProfilingContext``
                (units are defined there — TODO confirm).
        """
        # Span id -> trace id for every span currently being tracked.
        self._span_id_to_trace_id: dict[int, int] = {}
        # Guards all access to ``_span_id_to_trace_id``.
        self._lock = threading.Lock()
        self._profiler = ProfilingContext(
            service_name, sampling_interval, self._filter_stacktraces, instrumentation_source="snapshot"
        )

    def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None:
        """Start tracking ``span`` and make sure the profiler is running.

        The span is ignored when ``_should_process_context`` rejects the parent
        context, when the snapshot-volume baggage entry is absent, or when the
        span has no span context.
        """
        if not _should_process_context(parent_context):
            return

        ctx_baggage = baggage.get_baggage(_SPLUNK_TRACE_SNAPSHOT_VOLUME, parent_context)
        if ctx_baggage is None:
            return

        # NOTE(review): only the attribute is conditional on "highest"; any
        # other non-None volume value still registers the span and starts the
        # profiler below. Confirm against the propagator that this is intended.
        if ctx_baggage == "highest":
            span.set_attribute("splunk.snapshot.profiling", True)

        span_ctx = span.get_span_context()
        if span_ctx is None:
            return

        with self._lock:
            self._span_id_to_trace_id[span_ctx.span_id] = span_ctx.trace_id
            self._profiler.start()

    def on_end(self, span: ReadableSpan) -> None:
        """Stop tracking ``span``; schedule a profiler pause if it was the last one.

        Spans that were never registered by ``on_start`` are ignored.
        """
        span_ctx = span.get_span_context()
        if span_ctx is None:
            # Same guard as on_start: nothing can have been registered.
            return

        with self._lock:
            # Look up and remove in one step under the lock so concurrent
            # on_start/on_end calls cannot interleave between the check and
            # the removal.
            if self._span_id_to_trace_id.pop(span_ctx.span_id, None) is None:
                return

            # Decide and act while still holding the lock so a concurrent
            # on_start cannot register a span between the emptiness check and
            # the pause request.
            if not self._span_id_to_trace_id:
                self._profiler.pause_after(60.0)

    def shutdown(self) -> None:
        """Stop the underlying profiler."""
        self._profiler.stop()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Report success immediately; this processor does no flushing work itself."""
        return True

    def _filter_stacktraces(self, stacktraces, active_trace_contexts):
        """Keep only stacktraces whose thread is working on a tracked trace.

        Args:
            stacktraces: Iterable of mappings; each must provide a ``"tid"``
                key holding the thread id.
            active_trace_contexts: Mapping of thread id to a
                ``(trace_id, span_id)`` pair.

        Returns:
            A list of the stacktraces whose thread's trace id belongs to a
            currently tracked span, in their original order.
        """
        # Snapshot the tracked trace ids under the lock, then filter without
        # holding it.
        with self._lock:
            trace_ids = set(self._span_id_to_trace_id.values())

        filtered = []
        for stacktrace in stacktraces:
            maybe_context = active_trace_contexts.get(stacktrace["tid"])
            if maybe_context is None:
                continue

            trace_id, _span_id = maybe_context
            if trace_id in trace_ids:
                filtered.append(stacktrace)

        return filtered
2 changes: 2 additions & 0 deletions src/splunk_otel/configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
from opentelemetry.sdk._configuration import _OTelSDKConfigurator

from splunk_otel.profile import _start_profiling_if_enabled
from splunk_otel.callgraphs import _configure_callgraphs_if_enabled


class SplunkConfigurator(_OTelSDKConfigurator):
    """SDK configurator that adds Splunk profiling hooks after the base setup."""

    def _configure(self, **kwargs):
        """Run the base SDK configuration, then the Splunk-specific hooks.

        The hooks run after ``super()._configure`` and in this order:
        ``_start_profiling_if_enabled`` followed by
        ``_configure_callgraphs_if_enabled``.
        """
        super()._configure(**kwargs)
        _start_profiling_if_enabled()
        _configure_callgraphs_if_enabled()
18 changes: 17 additions & 1 deletion src/splunk_otel/distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from opentelemetry.instrumentation.distro import BaseDistro
from opentelemetry.instrumentation.propagators import set_global_response_propagator
from opentelemetry.propagators.composite import CompositePropagator
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_OTLP_HEADERS,
OTEL_EXPORTER_OTLP_LOGS_ENDPOINT,
Expand All @@ -25,6 +26,7 @@
OTEL_RESOURCE_ATTRIBUTES,
OTEL_SERVICE_NAME,
)
from opentelemetry.propagate import get_global_textmap, set_global_textmap

from splunk_otel.__about__ import __version__ as version
from splunk_otel.env import (
Expand All @@ -33,10 +35,12 @@
SPLUNK_PROFILER_ENABLED,
SPLUNK_PROFILER_LOGS_ENDPOINT,
SPLUNK_REALM,
SPLUNK_SNAPSHOT_PROFILER_ENABLED,
SPLUNK_SNAPSHOT_SELECTION_PROBABILITY,
SPLUNK_TRACE_RESPONSE_HEADER_ENABLED,
Env,
)
from splunk_otel.propagator import ServerTimingResponsePropagator
from splunk_otel.propagator import CallgraphsPropagator, ServerTimingResponsePropagator

_DISTRO_NAME = "splunk-opentelemetry"

Expand Down Expand Up @@ -66,6 +70,7 @@ def _configure(self, **kwargs):
self.handle_realm()
self.configure_token_headers()
self.set_server_timing_propagator()
self.set_callgraphs_propagator()

def set_env_defaults(self):
for key, value in DEFAULTS.items():
Expand Down Expand Up @@ -110,3 +115,14 @@ def configure_token_headers(self):
def set_server_timing_propagator(self):
    """Install ``ServerTimingResponsePropagator`` as the global response propagator.

    Skipped when ``SPLUNK_TRACE_RESPONSE_HEADER_ENABLED`` is not true; the
    default passed to ``is_true`` for that setting is ``"true"``.
    """
    if not self.env.is_true(SPLUNK_TRACE_RESPONSE_HEADER_ENABLED, "true"):
        return
    set_global_response_propagator(ServerTimingResponsePropagator())

def set_callgraphs_propagator(self):
    """Append a ``CallgraphsPropagator`` to the global text-map propagator.

    Only applies when ``SPLUNK_SNAPSHOT_PROFILER_ENABLED`` is true (the default
    passed to ``is_true`` is ``"false"``). The current global propagator is
    kept and placed first in a new ``CompositePropagator``; the callgraphs
    propagator is built with ``SPLUNK_SNAPSHOT_SELECTION_PROBABILITY``, using
    0.01 as the default.
    """
    if not self.env.is_true(SPLUNK_SNAPSHOT_PROFILER_ENABLED, "false"):
        return

    existing_propagator = get_global_textmap()
    selection_probability = self.env.getfloat(SPLUNK_SNAPSHOT_SELECTION_PROBABILITY, 0.01)
    callgraphs_propagator = CallgraphsPropagator(selection_probability)

    set_global_textmap(CompositePropagator([existing_propagator, callgraphs_propagator]))
11 changes: 11 additions & 0 deletions src/splunk_otel/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
SPLUNK_PROFILER_ENABLED = "SPLUNK_PROFILER_ENABLED"
SPLUNK_PROFILER_CALL_STACK_INTERVAL = "SPLUNK_PROFILER_CALL_STACK_INTERVAL"
SPLUNK_PROFILER_LOGS_ENDPOINT = "SPLUNK_PROFILER_LOGS_ENDPOINT"
SPLUNK_SNAPSHOT_PROFILER_ENABLED = "SPLUNK_SNAPSHOT_PROFILER_ENABLED"
SPLUNK_SNAPSHOT_SAMPLING_INTERVAL = "SPLUNK_SNAPSHOT_SAMPLING_INTERVAL"
SPLUNK_SNAPSHOT_SELECTION_PROBABILITY = "SPLUNK_SNAPSHOT_SELECTION_PROBABILITY"
SPLUNK_REALM = "SPLUNK_REALM"

_pylogger = logging.getLogger(__name__)
Expand Down Expand Up @@ -85,6 +88,14 @@ def getint(self, key, default=0):
_pylogger.warning("Invalid integer value of '%s' for env var '%s'", val, key)
return default

def getfloat(self, key, default=0.0):
    """Read the setting ``key`` and convert it to a float.

    The value is fetched through ``getval`` with ``str(default)`` as the
    fallback. If the fetched text cannot be converted, a warning is logged
    and ``default`` is returned unchanged.
    """
    raw = self.getval(key, str(default))
    try:
        parsed = float(raw)
    except ValueError:
        _pylogger.warning("Invalid float value of '%s' for env var '%s'", raw, key)
        return default
    return parsed

def setval(self, key, value):
    """Store ``value`` under ``key`` in this object's ``store`` mapping."""
    self.store[key] = value

Expand Down
Loading