Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ddtrace/internal/settings/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ def __init__(self) -> None:
"DD_LLMOBS_PAYLOAD_SIZE_BYTES", DEFAULT_EVP_PAYLOAD_SIZE_LIMIT, int
)
self._llmobs_event_size_limit = _get_config("DD_LLMOBS_EVENT_SIZE_BYTES", DEFAULT_EVP_EVENT_SIZE_LIMIT, int)
self._llmobs_graceful_termination_period = _get_config(
"DD_LLMOBS_GRACEFUL_TERMINATION_PERIOD", 0, float
)
self._inject_force = _get_config("DD_INJECT_FORCE", None, asbool)
# Telemetry for whether ssi instrumented an app is tracked by the `instrumentation_source` config
self._lib_was_injected = _get_config("_DD_PY_SSI_INJECT", False, asbool, report_telemetry=False)
Expand Down
15 changes: 15 additions & 0 deletions ddtrace/llmobs/_llmobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import math
import os
import sys
import threading
import time
from typing import Any
from typing import Callable
Expand Down Expand Up @@ -436,6 +437,7 @@ def _build_span_meta(
class LLMObs(Service):
_instance = None # type: LLMObs
enabled = False
SHUTDOWN_TIMEOUT = 5
_app_key: str = os.getenv("DD_APP_KEY", "")
_project_name: str = os.getenv("DD_LLMOBS_PROJECT_NAME", DEFAULT_PROJECT_NAME)

Expand Down Expand Up @@ -919,6 +921,8 @@ def _stop_service(self) -> None:
try:
self._llmobs_span_writer.stop()
self._llmobs_eval_metric_writer.stop()
self._llmobs_span_writer.join(self.SHUTDOWN_TIMEOUT)
self._llmobs_eval_metric_writer.join(self.SHUTDOWN_TIMEOUT)
except ServiceStatusError:
log.debug("Error stopping LLMObs writers")

Expand Down Expand Up @@ -1085,6 +1089,7 @@ def enable(
)

atexit.register(cls.disable)
atexit.register_on_exit_signal(cls._on_exit_signal)
telemetry_writer.product_activated(TELEMETRY_APM_PRODUCT.LLMOBS, True)

log.debug(
Expand Down Expand Up @@ -1732,6 +1737,16 @@ def _integration_is_enabled(cls, integration: str) -> bool:
return False
return SUPPORTED_LLMOBS_INTEGRATIONS[integration] in ddtrace._monkey._get_patched_modules()

@classmethod
def _on_exit_signal(cls) -> None:
    """Handle an exit signal (SIGTERM/SIGINT) by shutting LLMObs down.

    When ``config._llmobs_graceful_termination_period`` is greater than zero,
    ``disable`` is scheduled on a ``threading.Timer`` that fires after that delay.
    Otherwise ``disable`` is called right away.
    """
    delay = config._llmobs_graceful_termination_period
    if not delay > 0:
        # No grace period configured: disable from the calling thread.
        cls.disable()
        return

    log.debug("Received exit signal; disabling %s after %.1f second grace period", cls.__name__, delay)
    # Arguments stay positional: the timer is built as Timer(interval, function).
    shutdown_timer = threading.Timer(delay, cls.disable)
    shutdown_timer.start()

@classmethod
def disable(cls) -> None:
if not cls.enabled:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
LLM Observability: Adds graceful SIGTERM/SIGINT handling so buffered spans and evaluation
metrics are flushed before process exit. Set ``DD_LLMOBS_GRACEFUL_TERMINATION_PERIOD`` to a
number of seconds (default ``0``, meaning LLM Observability is disabled as soon as the signal
is received) to delay shutdown, allowing the application to continue writing spans during a
Kubernetes graceful termination window before stopping.
64 changes: 64 additions & 0 deletions tests/llmobs/test_llmobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,70 @@ def test_service_disable(tracer):
assert llmobs_service._instance._evaluator_runner.status.value == "stopped"


def test_shutdown_timeout_default():
    """``SHUTDOWN_TIMEOUT`` on the LLMObs service defaults to 5."""
    expected_timeout = 5
    assert llmobs_service.SHUTDOWN_TIMEOUT == expected_timeout


@mock.patch("ddtrace.llmobs._llmobs.atexit.register_on_exit_signal")
def test_enable_registers_sigterm_handler(mock_register_on_exit_signal, tracer):
    """Enabling LLMObs registers ``_on_exit_signal`` via ``atexit.register_on_exit_signal``."""
    llmobs_settings = {"_dd_api_key": "<not-a-real-api-key>", "_llmobs_ml_app": "<ml-app-name>"}
    with override_global_config(llmobs_settings):
        llmobs_service.enable(_tracer=tracer)
        # Exactly one registration, and it must be the class-level exit-signal hook.
        mock_register_on_exit_signal.assert_called_once_with(llmobs_service._on_exit_signal)
        llmobs_service.disable()


def test_sigterm_disables_llmobs(tracer):
    """Calling the registered exit-signal callback disables LLMObs and stops both writers."""
    llmobs_settings = {"_dd_api_key": "<not-a-real-api-key>", "_llmobs_ml_app": "<ml-app-name>"}
    with override_global_config(llmobs_settings), mock.patch(
        "ddtrace.llmobs._llmobs.atexit.register_on_exit_signal"
    ) as mock_register:
        llmobs_service.enable(_tracer=tracer)
        # Call the registered callback directly rather than sending a real signal:
        # going through the OS signal chain would re-raise SIGTERM and kill the process.
        registered_args = mock_register.call_args[0]
        exit_signal_callback = registered_args[0]
        exit_signal_callback()

        assert llmobs_service.enabled is False
        llmobs_instance = llmobs_service._instance
        assert llmobs_instance._llmobs_span_writer.status.value == "stopped"
        assert llmobs_instance._llmobs_eval_metric_writer.status.value == "stopped"


def test_sigterm_with_grace_period_delays_disable(tracer):
    """With a positive grace period, the exit-signal callback schedules ``disable`` on a timer."""
    llmobs_settings = {
        "_dd_api_key": "<not-a-real-api-key>",
        "_llmobs_ml_app": "<ml-app-name>",
        "_llmobs_graceful_termination_period": 30,
    }
    with override_global_config(llmobs_settings), mock.patch(
        "ddtrace.llmobs._llmobs.atexit.register_on_exit_signal"
    ) as mock_register:
        llmobs_service.enable(_tracer=tracer)
        # Timer is patched so no real thread is started and nothing waits 30 seconds.
        with mock.patch("ddtrace.llmobs._llmobs.threading.Timer") as mock_timer:
            registered_args = mock_register.call_args[0]
            exit_signal_callback = registered_args[0]
            exit_signal_callback()
            mock_timer.assert_called_once_with(30, llmobs_service.disable)
            mock_timer.return_value.start.assert_called_once()
        assert llmobs_service.enabled is True  # not yet disabled — timer hasn't fired
        llmobs_service.disable()


def test_sigterm_with_zero_grace_period_disables_immediately(tracer):
    """With a grace period of 0, the exit-signal callback disables LLMObs right away."""
    llmobs_settings = {
        "_dd_api_key": "<not-a-real-api-key>",
        "_llmobs_ml_app": "<ml-app-name>",
        "_llmobs_graceful_termination_period": 0,
    }
    with override_global_config(llmobs_settings), mock.patch(
        "ddtrace.llmobs._llmobs.atexit.register_on_exit_signal"
    ) as mock_register:
        llmobs_service.enable(_tracer=tracer)
        registered_args = mock_register.call_args[0]
        exit_signal_callback = registered_args[0]
        exit_signal_callback()
        assert llmobs_service.enabled is False


def test_sigterm_joins_writers_with_shutdown_timeout(tracer):
    """Disabling LLMObs joins both writers once each, passing ``SHUTDOWN_TIMEOUT``."""
    llmobs_settings = {"_dd_api_key": "<not-a-real-api-key>", "_llmobs_ml_app": "<ml-app-name>"}
    with override_global_config(llmobs_settings):
        llmobs_service.enable(_tracer=tracer)
        llmobs_instance = llmobs_service._instance
        span_writer = llmobs_instance._llmobs_span_writer
        eval_writer = llmobs_instance._llmobs_eval_metric_writer

        span_join_patch = mock.patch.object(span_writer, "join")
        eval_join_patch = mock.patch.object(eval_writer, "join")
        with span_join_patch as mock_span_join, eval_join_patch as mock_eval_join:
            llmobs_service.disable()

        expected_timeout = llmobs_service.SHUTDOWN_TIMEOUT
        mock_span_join.assert_called_once_with(expected_timeout)
        mock_eval_join.assert_called_once_with(expected_timeout)


def test_service_enable_no_api_key(tracer):
with override_global_config(dict(_dd_api_key="", _llmobs_ml_app="<ml-app-name>")):
with pytest.raises(ValueError):
Expand Down
1 change: 1 addition & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def override_global_config(values):
"_llmobs_instrumented_proxy_urls",
"_llmobs_payload_size_limit",
"_llmobs_event_size_limit",
"_llmobs_graceful_termination_period",
"_data_streams_enabled",
"_inferred_proxy_services_enabled",
"_lib_was_injected",
Expand Down
Loading