Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 0 additions & 44 deletions ddtrace/bootstrap/preload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@

import typing as t

from ddtrace import config # noqa:F401
from ddtrace.internal.logger import get_logger # noqa:F401
from ddtrace.internal.module import ModuleWatchdog # noqa:F401
from ddtrace.internal.products import manager # noqa:F401
from ddtrace.internal.runtime.runtime_metrics import RuntimeWorker # noqa:F401
from ddtrace.internal.settings.crashtracker import config as crashtracker_config
from ddtrace.internal.settings.profiling import config as profiling_config # noqa:F401
from ddtrace.trace import tracer

Expand Down Expand Up @@ -42,54 +39,13 @@ def register_post_preload(func: t.Callable) -> None:
register_post_preload(manager.post_preload_products)


# TODO: Migrate the following product logic to the new product plugin interface

# DEV: We want to start the crashtracker as early as possible
if crashtracker_config.enabled:
    try:
        # Imported here so the crashtracking module is only loaded when the
        # feature is switched on.
        from ddtrace.internal.core import crashtracking

        crashtracking.start()
    except Exception:
        # Best effort: the failure is logged and preload carries on.
        log.error("failed to enable crashtracking", exc_info=True)


if profiling_config.enabled:
    log.debug("profiler enabled via environment variable")
    try:
        # Presumably the import itself is what starts the profiler (nothing
        # from the module is used afterwards, hence the noqa) - confirm.
        import ddtrace.profiling.auto  # noqa: F401
    except Exception:
        # Best effort, same as for the crashtracker above.
        log.error("failed to enable profiling", exc_info=True)

# Runtime metrics are enabled without a guard: an exception raised here propagates.
if config._runtime_metrics_enabled:
    RuntimeWorker.enable()


@ModuleWatchdog.after_module_imported("opentelemetry")
def _otel_signals(_):
    """Install the Datadog provider for each OpenTelemetry signal that is enabled.

    The argument supplied by the watchdog is ignored.
    """

    def _install_traces() -> None:
        from opentelemetry.trace import set_tracer_provider

        from ddtrace.opentelemetry import TracerProvider

        set_tracer_provider(TracerProvider())

    def _install_logs() -> None:
        from ddtrace.internal.opentelemetry.logs import set_otel_logs_provider

        set_otel_logs_provider()

    def _install_metrics() -> None:
        from ddtrace.internal.opentelemetry.metrics import set_otel_meter_provider

        set_otel_meter_provider()

    # Flags are consulted one at a time, in the order traces, logs, metrics.
    signal_installers = (
        ("_otel_trace_enabled", _install_traces),
        ("_otel_logs_enabled", _install_logs),
        ("_otel_metrics_enabled", _install_metrics),
    )
    for flag_name, install in signal_installers:
        if getattr(config, flag_name):
            install()


# Enable LLM Observability when its configuration flag is set. The import is
# deferred so the llmobs package is only loaded in that case.
if config._llmobs_enabled:
    from ddtrace.llmobs import LLMObs

    LLMObs.enable(_auto=True)


@ModuleWatchdog.after_module_imported("gevent.monkey")
def _(_):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from ddtrace import config
from ddtrace import version
from ddtrace.internal import forksafe
from ddtrace.internal import process_tags
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.logger import get_logger
Expand Down Expand Up @@ -168,6 +167,15 @@ def is_started() -> bool:
return crashtracker_status() == CrashtrackerStatus.Initialized


def restart(additional_tags: Optional[dict[str, str]] = None) -> None:
    """Re-arm the crashtracker after a fork.

    Args:
        additional_tags: Extra tags forwarded to ``_get_args`` when the
            crashtracker arguments are rebuilt.

    Does nothing when the crashtracker is not available, matching the guard at
    the top of ``start()``. Logs an error and returns when the arguments
    cannot be constructed.
    """
    if not is_available:
        # start() bails out on the same condition, so there is nothing to restart.
        return

    # We recreate the args here mainly to pass updated runtime_id after fork.
    # Local names avoid shadowing the module-level ``config`` import.
    ct_config, receiver_config, metadata = _get_args(additional_tags)
    if ct_config is None or receiver_config is None or metadata is None:
        log.error("Failed to restart crashtracker after fork: failed to construct crashtracker configuration")
        return
    crashtracker_on_fork(ct_config, receiver_config, metadata)


def start(additional_tags: Optional[dict[str, str]] = None) -> bool:
if not is_available:
return False
Expand All @@ -186,17 +194,6 @@ def start(additional_tags: Optional[dict[str, str]] = None) -> bool:
# )

crashtracker_init(config, receiver_config, metadata)

def crashtracker_fork_handler():
# We recreate the args here mainly to pass updated runtime_id after
# fork
config, receiver_config, metadata = _get_args(additional_tags)
if config is None or receiver_config is None or metadata is None:
log.error("Failed to restart crashtracker after fork: failed to construct crashtracker configuration")
return
crashtracker_on_fork(config, receiver_config, metadata)

forksafe.register(crashtracker_fork_handler)
except Exception:
log.exception("Failed to start crashtracker")
return False
Expand Down
25 changes: 25 additions & 0 deletions ddtrace/internal/core/crashtracking/product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from ddtrace.internal.settings.crashtracker import config


def post_preload() -> None:
    """Do nothing: crashtracking has no post-preload work."""
    return None


def enabled() -> bool:
    """Return the ``enabled`` flag of the crashtracker settings."""
    is_on = config.enabled
    return is_on


def start() -> None:
    """Start the crashtracker.

    The crashtracking package is imported only when this runs; the value
    returned by its ``start`` is discarded.
    """
    from ddtrace.internal.core.crashtracking import start as start_crashtracker

    start_crashtracker()


def restart(join: bool = False) -> None:
    """Restart the crashtracker by delegating to the crashtracking package.

    ``join`` is not used.
    """
    from ddtrace.internal.core.crashtracking import restart as restart_crashtracker

    restart_crashtracker()


def stop(join: bool = False) -> None:
    """Do nothing: there is no explicit stop for crashtracking. ``join`` is not used."""
    return None
43 changes: 43 additions & 0 deletions ddtrace/internal/opentelemetry/product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
requires = ["tracer"]


def post_preload() -> None:
    """Do nothing: the OpenTelemetry product has no post-preload work."""
    return None


def enabled() -> bool:
    """Report whether any OpenTelemetry signal is switched on.

    The hook registered by ``start()`` installs providers based on the trace,
    logs and metrics flags individually, so the product counts as enabled as
    soon as one of those flags is set.
    """
    from ddtrace import config

    return bool(config._otel_trace_enabled or config._otel_logs_enabled or config._otel_metrics_enabled)


def start() -> None:
    """Register a hook that installs the Datadog OpenTelemetry providers.

    Nothing is installed directly by this call. The hook is attached to the
    ``opentelemetry`` module through ``ModuleWatchdog.after_module_imported``
    and only sets up the signals whose configuration flag is set.
    """
    from ddtrace import config as dd_config
    from ddtrace.internal.module import ModuleWatchdog

    def _install_traces() -> None:
        from opentelemetry.trace import set_tracer_provider

        from ddtrace.opentelemetry import TracerProvider

        set_tracer_provider(TracerProvider())

    def _install_logs() -> None:
        from ddtrace.internal.opentelemetry.logs import set_otel_logs_provider

        set_otel_logs_provider()

    def _install_metrics() -> None:
        from ddtrace.internal.opentelemetry.metrics import set_otel_meter_provider

        set_otel_meter_provider()

    @ModuleWatchdog.after_module_imported("opentelemetry")
    def _(_):
        # Flags are read when the hook fires, in the order traces, logs, metrics.
        if dd_config._otel_trace_enabled:
            _install_traces()

        if dd_config._otel_logs_enabled:
            _install_logs()

        if dd_config._otel_metrics_enabled:
            _install_metrics()


def restart(join: bool = False) -> None:
    """Do nothing: the OpenTelemetry product has no restart work. ``join`` is not used."""
    return None


def stop(join: bool = False) -> None:
    """Do nothing: the OpenTelemetry product has no stop work. ``join`` is not used."""
    return None
2 changes: 2 additions & 0 deletions ddtrace/internal/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ def restart_products(self, join: bool = False) -> None:
continue

try:
if not product.enabled():
continue
product.restart(join=join)
log.debug("Restarted product '%s'", name)
except Exception:
Expand Down
30 changes: 30 additions & 0 deletions ddtrace/internal/runtime/product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
requires = ["tracer"]


def post_preload() -> None:
    """Do nothing: runtime metrics have no post-preload work."""
    return None


def enabled() -> bool:
    """Return the runtime-metrics flag from the global ddtrace configuration."""
    from ddtrace import config as dd_config

    flag = dd_config._runtime_metrics_enabled
    return flag


def start() -> None:
    """Enable the runtime metrics worker."""
    from ddtrace.internal.runtime.runtime_metrics import RuntimeWorker as worker

    worker.enable()


def restart(join: bool = False) -> None:
    """Cycle the runtime metrics worker: disable it, then enable it again.

    ``join`` is not used.
    """
    from ddtrace.internal.runtime.runtime_metrics import RuntimeWorker as worker

    worker.disable()
    worker.enable()


def stop(join: bool = False) -> None:
    """Disable the runtime metrics worker. ``join`` is not used."""
    from ddtrace.internal.runtime.runtime_metrics import RuntimeWorker as worker

    worker.disable()
15 changes: 9 additions & 6 deletions ddtrace/llmobs/_evaluators/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,15 @@ def _stop_service(self) -> None:
self.periodic(_wait_sync=True)
self.executor.shutdown(wait=True)

def recreate(self) -> "EvaluatorRunner":
    """Return a new runner of the same class, built from this runner's interval, service and evaluators."""
    init_kwargs = {
        "interval": self._interval,
        "llmobs_service": self.llmobs_service,
        "evaluators": self.evaluators,
    }
    return self.__class__(**init_kwargs)
def reset(self) -> None:
    """Discard buffered spans and recreate the executor after a fork.

    The worker thread is restarted automatically; the ThreadPoolExecutor is not
    fork-safe (its internal threads die across fork) so it must be recreated.
    """
    # NOTE(review): confirm whether the executor replacement is meant to happen
    # while the lock is held or after it has been released.
    with self._lock:
        self._buffer = []
        # The previous executor is replaced without calling shutdown() on it.
        self.executor = futures.ThreadPoolExecutor()

def enqueue(self, span_event: LLMObsSpanEvent, span: Span) -> None:
if self.status == ServiceStatus.STOPPED:
Expand Down
13 changes: 3 additions & 10 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
from ddtrace.ext import SpanTypes
from ddtrace.internal import atexit
from ddtrace.internal import core
from ddtrace.internal import forksafe
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.logger import get_logger
from ddtrace.internal.native import generate_128bit_trace_id
Expand Down Expand Up @@ -471,8 +470,6 @@ def __init__(
is_agentless=True, # agent proxy doesn't seem to work for experiments
)

forksafe.register(self._child_after_fork)

self._link_tracker = LinkTracker()
self._annotations: list[tuple[str, str, dict[str, Any]]] = []
self._annotation_context_lock = RLock()
Expand Down Expand Up @@ -888,12 +885,10 @@ def _do_annotations(self, span: Span) -> None:
self.annotate(span, **annotation_kwargs, _suppress_span_kind_error=True)

def _child_after_fork(self) -> None:
    """Refresh the span writer, eval-metric writer and evaluator runner, then restart if enabled."""
    # NOTE(review): each component is first replaced via recreate() and then
    # reset(); resetting a freshly recreated object looks redundant - confirm
    # that both steps are intended.
    self._llmobs_span_writer = self._llmobs_span_writer.recreate()
    self._llmobs_eval_metric_writer = self._llmobs_eval_metric_writer.recreate()
    self._evaluator_runner = self._evaluator_runner.recreate()
    self._llmobs_span_writer.reset()
    self._llmobs_eval_metric_writer.reset()
    self._evaluator_runner.reset()
    # Clear the prompt manager stored on the LLMObs class itself.
    LLMObs._prompt_manager = None
    if self.enabled:
        self._start_service()

def _start_service(self) -> None:
try:
Expand Down Expand Up @@ -949,8 +944,6 @@ def _stop_service(self) -> None:
self._link_tracker.on_openai_agent_span_finish,
)

forksafe.unregister(self._child_after_fork)

@classmethod
def enable(
cls,
Expand Down
14 changes: 5 additions & 9 deletions ddtrace/llmobs/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,15 +325,11 @@ def _data(self, events):
"""Return payload containing events to be encoded and submitted to LLM Observability."""
raise NotImplementedError

def recreate(self):
    """Return a new writer of the same class, configured from this writer's settings."""
    init_kwargs = {
        "interval": self._interval,
        "timeout": self._timeout,
        "is_agentless": self._agentless,
        "_site": self._site,
        "_api_key": self._api_key,
        "_override_url": self._override_url,
    }
    return self.__class__(**init_kwargs)
def reset(self) -> None:
    """Drop every buffered event and zero the buffer size, e.g. after a fork.

    Both fields are replaced while holding the writer's lock. The worker thread
    is restarted automatically.
    """
    with self._lock:
        self._buffer, self._buffer_size = [], 0


class LLMObsEvalMetricWriter(BaseLLMObsWriter):
Expand Down
30 changes: 30 additions & 0 deletions ddtrace/llmobs/product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
requires = ["tracer"]


def post_preload() -> None:
    """Do nothing: LLM Observability has no post-preload work."""
    return None


def enabled() -> bool:
    """Return the LLM Observability flag from the global ddtrace configuration."""
    from ddtrace import config as dd_config

    flag = dd_config._llmobs_enabled
    return flag


def start() -> None:
    """Enable LLM Observability, marking the call as automatic (``_auto=True``)."""
    from ddtrace.llmobs import LLMObs as llmobs_cls

    llmobs_cls.enable(_auto=True)


def restart(join: bool = False) -> None:
    """Run the LLMObs after-fork routine on the active instance, if one exists.

    ``join`` is not used.
    """
    from ddtrace.llmobs import LLMObs

    instance = LLMObs._instance
    if instance is None:
        # No active instance, so there is nothing to refresh.
        return
    instance._child_after_fork()


def stop(join: bool = False) -> None:
    """Disable LLM Observability. ``join`` is not used."""
    from ddtrace.llmobs import LLMObs as llmobs_cls

    llmobs_cls.disable()
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ datadog = "ddtrace.contrib.internal.mlflow.auth_plugin:DatadogHeaderProvider"
"appsec" = "ddtrace.internal.appsec.product"
"iast" = "ddtrace.internal.iast.product"
"tracer" = "ddtrace._trace.product"
"crashtracking" = "ddtrace.internal.core.crashtracking.product"
"runtime-metrics" = "ddtrace.internal.runtime.product"
"opentelemetry" = "ddtrace.internal.opentelemetry.product"
"llmobs" = "ddtrace.llmobs.product"

[project.urls]
"Bug Tracker" = "https://github.com/DataDog/dd-trace-py/issues"
Expand Down
Loading