diff --git a/.changelog/5280.added b/.changelog/5280.added new file mode 100644 index 0000000000..f86d7ebe8f --- /dev/null +++ b/.changelog/5280.added @@ -0,0 +1 @@ +`opentelemetry-sdk`: Add ability to refresh process sensitive Resource attributes diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py index b6b1ce88f6..fc5693846d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_logs/_internal/__init__.py @@ -8,6 +8,7 @@ import concurrent.futures import json import logging +import os import threading import traceback import warnings @@ -24,7 +25,7 @@ cast, overload, ) -from weakref import WeakSet +from weakref import WeakMethod, WeakSet from typing_extensions import deprecated @@ -59,7 +60,10 @@ from opentelemetry.sdk.environment_variables._internal import ( parse_boolean_environment_variable, ) -from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.resources import ( + Resource, + _get_process_sensitive_resource, +) from opentelemetry.sdk.util import ns_to_iso_str from opentelemetry.sdk.util._configurator import RuleBasedConfigurator from opentelemetry.sdk.util.instrumentation import ( @@ -693,6 +697,9 @@ def _is_enabled(self) -> bool: def _set_logger_config(self, logger_config: _LoggerConfig) -> None: self._logger_config = logger_config + def _set_resource(self, resource: Resource) -> None: + self._resource = resource + @property def instrumentation_scope(self): return self._instrumentation_scope @@ -812,11 +819,31 @@ def __init__( self._logger_cache_lock = Lock() self._active_loggers: WeakSet[Logger] = WeakSet() self._active_loggers_lock = Lock() + if hasattr(os, "register_at_fork"): + weak_at_fork = WeakMethod(self._handle_fork) + + def _after_in_child() -> None: + if at_fork := weak_at_fork(): + at_fork() + + os.register_at_fork(after_in_child=_after_in_child) + + def _handle_fork(self) -> None: + self._logger_cache_lock = Lock() + self._active_loggers_lock = Lock() + self._update_resource(_get_process_sensitive_resource()) @property def resource(self): return self._resource + def _update_resource(self, resource: Resource) -> None: + with self._active_loggers_lock: + self._resource = self._resource.merge(resource) + for logger in list(self._active_loggers): + # pylint: disable-next=protected-access + logger._set_resource(self._resource) + def _get_logger_no_cache( self, name: str, @@ -897,7 +924,7 @@ def _set_logger_configurator( """ self._logger_configurator = logger_configurator with self._active_loggers_lock: - for logger in self._active_loggers: + for logger in list(self._active_loggers): # pylint: disable-next=protected-access logger._set_logger_config( self._apply_logger_configurator( diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py index 5b8c2600c2..3250443025 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py @@ -1,6 +1,7 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 +import os import weakref from atexit import register, unregister from collections.abc import Callable, Sequence @@ -51,7 +52,10 @@ from opentelemetry.sdk.metrics._internal.sdk_configuration import ( SdkConfiguration, ) -from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.resources import ( + Resource, + _get_process_sensitive_resource, +) from opentelemetry.sdk.util._configurator import RuleBasedConfigurator from opentelemetry.sdk.util.instrumentation import ( InstrumentationScope, @@ -526,6 +530,21 @@ def __init__( ) metric_reader._set_meter_provider(self) + if hasattr(os, "register_at_fork"): + weak_at_fork = weakref.WeakMethod(self._handle_fork) + + def _after_in_child() -> None: + if at_fork := weak_at_fork(): + at_fork() + + os.register_at_fork(after_in_child=_after_in_child) + + def _handle_fork(self) -> None: + self._lock = Lock() + self._meter_lock = Lock() + type(self)._all_metric_readers_lock = Lock() + self._update_resource(_get_process_sensitive_resource()) + def _set_meter_configurator( self, *, meter_configurator: _MeterConfiguratorT ): @@ -543,6 +562,12 @@ def _set_meter_configurator( self._apply_meter_configurator(instrumentation_scope) ) + def _update_resource(self, resource: Resource) -> None: + with self._meter_lock: + self._sdk_config.resource = self._sdk_config.resource.merge( + resource + ) + def _apply_meter_configurator( self, instrumentation_scope: InstrumentationScope ) -> _MeterConfig: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py index 6585741a58..851b60784f 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/resources/__init__.py @@ -294,6 +294,21 @@ def detect(self) -> "Resource": """Don't call `Resource.create` here to avoid an infinite loop, instead instantiate `Resource` directly""" raise NotImplementedError() + # pylint: disable-next=no-self-use + def is_process_sensitive(self) -> bool: + """Return whether this detector depends on the current process identity. + + Process sensitive detectors may return resource attributes that become + stale after a process identity change, such as :func:`os.fork`. + Detectors returning ``True`` should be re-run after such changes so the + resulting :class:`Resource` describes the current process. + + Returns: + ``True`` if this detector should be re-run after process identity + changes otherwise ``False``. + """ + return False + class OTELResourceDetector(ResourceDetector): # pylint: disable=no-self-use @@ -323,6 +338,9 @@ def detect(self) -> "Resource": class ProcessResourceDetector(ResourceDetector): # pylint: disable=no-self-use + def is_process_sensitive(self) -> bool: + return True + def detect(self) -> "Resource": _runtime_version = ".".join( map( @@ -478,6 +496,9 @@ class ServiceInstanceIdResourceDetector(ResourceDetector): regenerated automatically when the process PID changes (e.g. after a fork). """ + def is_process_sensitive(self) -> bool: + return True + def detect(self) -> "Resource": # pylint: disable-next=global-statement global _service_instance_id, _service_instance_id_pid @@ -562,6 +583,17 @@ def _build_resource_detectors() -> list["ResourceDetector"]: return detectors +def _get_process_sensitive_resource() -> Resource: # pyright: ignore[reportUnusedFunction] + return get_aggregated_resources( + [ + detector + for detector in _build_resource_detectors() + if detector.is_process_sensitive() + ], + Resource.get_empty(), + ) + + def get_aggregated_resources( detectors: Sequence["ResourceDetector"], initial_resource: Resource | None = None, diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index d292fed8e5..79e52e51f8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -50,7 +50,10 @@ from opentelemetry.sdk.environment_variables._internal import ( parse_boolean_environment_variable, ) -from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.resources import ( + Resource, + _get_process_sensitive_resource, +) from opentelemetry.sdk.trace import sampling from opentelemetry.sdk.trace._tracer_metrics import create_tracer_metrics from opentelemetry.sdk.trace.id_generator import IdGenerator, RandomIdGenerator @@ -1141,6 +1144,9 @@ def __init__( def _set_tracer_config(self, tracer_config: _TracerConfig): self._tracer_config = tracer_config + def _set_resource(self, resource: Resource) -> None: + self.resource = resource + def _is_enabled(self) -> bool: """If the tracer is not enabled, start_span will create a NonRecordingSpan""" return self._tracer_config.is_enabled @@ -1345,6 +1351,18 @@ def __init__( ) self._tracers_lock = threading.Lock() self._tracers: dict[InstrumentationScope, Tracer] = {} + if hasattr(os, "register_at_fork"): + weak_at_fork = weakref.WeakMethod(self._handle_fork) + + def _after_in_child() -> None: + if at_fork := weak_at_fork(): + at_fork() + + os.register_at_fork(after_in_child=_after_in_child) + + def _handle_fork(self) -> None: + self._tracers_lock = threading.Lock() + self._update_resource(_get_process_sensitive_resource()) def _set_tracer_configurator( self, *, tracer_configurator: _TracerConfiguratorT @@ -1367,6 +1385,12 @@ def _set_tracer_configurator( def resource(self) -> Resource: return self._resource + def _update_resource(self, resource: Resource) -> None: + with self._tracers_lock: + self._resource = self._resource.merge(resource) + for tracer in self._tracers.values(): + tracer._set_resource(self._resource) # pylint: disable=protected-access + def _apply_tracer_configurator( self, instrumentation_scope: InstrumentationScope ): diff --git a/opentelemetry-sdk/tests/logs/scripts/logger_provider_resource_after_fork.py b/opentelemetry-sdk/tests/logs/scripts/logger_provider_resource_after_fork.py new file mode 100644 index 0000000000..2ff16b6b1d --- /dev/null +++ b/opentelemetry-sdk/tests/logs/scripts/logger_provider_resource_after_fork.py @@ -0,0 +1,82 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +import json +import os + +from opentelemetry._logs import LogRecord +from opentelemetry.sdk._logs import LoggerProvider +from opentelemetry.sdk._logs.export import ( + InMemoryLogRecordExporter, + SimpleLogRecordProcessor, +) +from opentelemetry.sdk.resources import PROCESS_PID + + +# pylint: disable-next=too-many-locals +def main() -> None: + exporter = InMemoryLogRecordExporter() + logger_provider = LoggerProvider(shutdown_on_exit=False) + logger_provider.add_log_record_processor( + SimpleLogRecordProcessor(exporter) + ) + logger = logger_provider.get_logger("cached") + parent_pid = os.getpid() + parent_resource_pid = logger_provider.resource.attributes[PROCESS_PID] + parent_logger_pid = logger.resource.attributes[PROCESS_PID] + + pid = os.fork() + if not pid: + child_pid = os.getpid() + new_logger = logger_provider.get_logger("new") + logger.emit(LogRecord(observed_timestamp=0, body="cached")) + new_logger.emit(LogRecord(observed_timestamp=0, body="new")) + finished_logs = exporter.get_finished_logs() + print( + json.dumps( + { + "child_pid": child_pid, + "provider_pid": logger_provider.resource.attributes[ + PROCESS_PID + ], + "cached_logger_pid": logger.resource.attributes[ + PROCESS_PID + ], + "new_logger_pid": new_logger.resource.attributes[ + PROCESS_PID + ], + "exported_resource_pids": [ + log.resource.attributes[PROCESS_PID] + for log in finished_logs + ], + "log_bodies": sorted( + log.log_record.body for log in finished_logs + ), + } + ), + flush=True, + ) + # pylint: disable-next=protected-access + os._exit(0) + + os.waitpid(pid, 0) + print( + json.dumps( + { + "parent_pid": parent_pid, + "parent_resource_pid": parent_resource_pid, + "parent_logger_pid": parent_logger_pid, + "parent_resource_pid_after_fork": logger_provider.resource.attributes[ + PROCESS_PID + ], + "parent_logger_pid_after_fork": logger.resource.attributes[ + PROCESS_PID + ], + } + ), + flush=True, + ) + + +if __name__ == "__main__": + main() diff --git a/opentelemetry-sdk/tests/logs/test_logs.py b/opentelemetry-sdk/tests/logs/test_logs.py index a9b171b6fd..3870ae7b09 100644 --- a/opentelemetry-sdk/tests/logs/test_logs.py +++ b/opentelemetry-sdk/tests/logs/test_logs.py @@ -3,7 +3,12 @@ # pylint: disable=protected-access +import json +import os +import subprocess +import sys import unittest +from pathlib import Path from unittest.mock import Mock, patch from opentelemetry._logs import LogRecord, SeverityNumber @@ -23,7 +28,10 @@ _RuleBasedLoggerConfigurator, create_logger_metrics, ) -from opentelemetry.sdk.environment_variables import OTEL_SDK_DISABLED +from opentelemetry.sdk.environment_variables import ( + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS, + OTEL_SDK_DISABLED, +) from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.util.instrumentation import ( InstrumentationScope, @@ -51,6 +59,98 @@ def test_resource(self): resource = Resource({"key": "value"}) self.assertIs(LoggerProvider(resource=resource).resource, resource) + def test_update_resource(self): + initial_resource = Resource({"one": "one", "two": "old"}) + updating_resource = Resource({"two": "new", "three": "three"}) + logger_provider = LoggerProvider(resource=initial_resource) + logger = logger_provider.get_logger("name") + other_logger = logger_provider.get_logger( + "other", attributes={"key": "value"} + ) + processor_mock = Mock() + logger_provider.add_log_record_processor(processor_mock) + + logger_provider._update_resource(updating_resource) + + self.assertEqual( + logger_provider.resource.attributes, + {"one": "one", "two": "new", "three": "three"}, + ) + self.assertIs(logger.resource, logger_provider.resource) + self.assertIs(other_logger.resource, logger_provider.resource) + + logger.emit(LogRecord(observed_timestamp=0, body="a log line")) + log_data = processor_mock.on_emit.call_args.args[0] + self.assertIs(log_data.resource, logger_provider.resource) + + new_logger = logger_provider.get_logger("new") + self.assertIs(new_logger.resource, logger_provider.resource) + + @unittest.skipUnless( + hasattr(os, "fork") and hasattr(os, "register_at_fork"), + "requires os.fork and os.register_at_fork", + ) + def test_logger_provider_updates_process_sensitive_resource_after_fork( + self, + ): + script_path = ( + Path(__file__).parent + / "scripts" + / "logger_provider_resource_after_fork.py" + ) + + result = subprocess.run( + [sys.executable, str(script_path)], + capture_output=True, + check=False, + text=True, + env={ + **os.environ, + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS: "process", + }, + ) + self.assertEqual( + result.returncode, + 0, + f"stdout: {result.stdout}\nstderr: {result.stderr}", + ) + lines = result.stdout.strip().splitlines() + child_payload = json.loads(lines[0]) + parent_payload = json.loads(lines[1]) + + self.assertEqual( + parent_payload["parent_resource_pid"], parent_payload["parent_pid"] + ) + self.assertEqual( + parent_payload["parent_logger_pid"], parent_payload["parent_pid"] + ) + self.assertEqual( + parent_payload["parent_resource_pid_after_fork"], + parent_payload["parent_pid"], + ) + self.assertEqual( + parent_payload["parent_logger_pid_after_fork"], + parent_payload["parent_pid"], + ) + + self.assertNotEqual( + child_payload["child_pid"], parent_payload["parent_pid"] + ) + self.assertEqual( + child_payload["provider_pid"], child_payload["child_pid"] + ) + self.assertEqual( + child_payload["cached_logger_pid"], child_payload["child_pid"] + ) + self.assertEqual( + child_payload["new_logger_pid"], child_payload["child_pid"] + ) + self.assertEqual( + child_payload["exported_resource_pids"], + [child_payload["child_pid"], child_payload["child_pid"]], + ) + self.assertEqual(child_payload["log_bodies"], ["cached", "new"]) + def test_get_logger(self): """ `LoggerProvider.get_logger` arguments are used to create an diff --git a/opentelemetry-sdk/tests/metrics/scripts/meter_provider_resource_after_fork.py b/opentelemetry-sdk/tests/metrics/scripts/meter_provider_resource_after_fork.py new file mode 100644 index 0000000000..dea2054319 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/scripts/meter_provider_resource_after_fork.py @@ -0,0 +1,84 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +import json +import os + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import InMemoryMetricReader +from opentelemetry.sdk.resources import PROCESS_PID + + +def _resource_pids(metrics_data) -> list[int]: + return [ + resource_metric.resource.attributes[PROCESS_PID] + for resource_metric in metrics_data.resource_metrics + ] + + +def _metric_names(metrics_data) -> list[str]: + return sorted( + metric.name + for resource_metric in metrics_data.resource_metrics + for scope_metric in resource_metric.scope_metrics + for metric in scope_metric.metrics + ) + + +# pylint: disable-next=too-many-locals +def main() -> None: + reader = InMemoryMetricReader() + meter_provider = MeterProvider( + metric_readers=[reader], shutdown_on_exit=False + ) + meter = meter_provider.get_meter("cached") + counter = meter.create_counter("cached_counter") + parent_pid = os.getpid() + # pylint: disable-next=protected-access + parent_resource_pid = meter_provider._sdk_config.resource.attributes[ + PROCESS_PID + ] + + pid = os.fork() + if not pid: + child_pid = os.getpid() + new_meter = meter_provider.get_meter("new") + new_counter = new_meter.create_counter("new_counter") + counter.add(1) + new_counter.add(1) + metrics_data = reader.get_metrics_data() + print( + json.dumps( + { + "child_pid": child_pid, + # pylint: disable-next=protected-access + "provider_pid": meter_provider._sdk_config.resource.attributes[ + PROCESS_PID + ], + "exported_resource_pids": _resource_pids(metrics_data), + "metric_names": _metric_names(metrics_data), + } + ), + flush=True, + ) + # pylint: disable-next=protected-access + os._exit(0) + + os.waitpid(pid, 0) + print( + json.dumps( + { + "parent_pid": parent_pid, + "parent_resource_pid": parent_resource_pid, + # pylint: disable-next=protected-access + "parent_resource_pid_after_fork": meter_provider._sdk_config.resource.attributes[ + PROCESS_PID + ], + } + ), + flush=True, + ) + + +if __name__ == "__main__": + main() diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index 433ad056ea..94bcf931e2 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -2,9 +2,15 @@ # SPDX-License-Identifier: Apache-2.0 # pylint: disable=protected-access,no-self-use,too-many-lines +import json +import os +import subprocess +import sys +import unittest import weakref from collections.abc import Callable, Iterable, Sequence from logging import DEBUG, WARNING +from pathlib import Path from threading import Lock from time import sleep from typing import Any @@ -12,7 +18,10 @@ from opentelemetry.attributes import BoundedAttributes from opentelemetry.metrics import NoOpMeter -from opentelemetry.sdk.environment_variables import OTEL_SDK_DISABLED +from opentelemetry.sdk.environment_variables import ( + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS, + OTEL_SDK_DISABLED, +) from opentelemetry.sdk.metrics import ( Counter, Histogram, @@ -121,6 +130,94 @@ def test_resource(self): MeterProvider(resource=resource)._sdk_config.resource, resource ) + def test_update_resource(self): + initial_resource = Resource({"one": "one", "two": "old"}) + updating_resource = Resource({"two": "new", "three": "three"}) + reader = InMemoryMetricReader() + meter_provider = MeterProvider( + metric_readers=[reader], resource=initial_resource + ) + meter = meter_provider.get_meter("name") + counter = meter.create_counter("counter") + + meter_provider._update_resource(updating_resource) + + self.assertEqual( + meter_provider._sdk_config.resource.attributes, + {"one": "one", "two": "new", "three": "three"}, + ) + + counter.add(1) + metrics_data = reader.get_metrics_data() + self.assertIs( + metrics_data.resource_metrics[0].resource, + meter_provider._sdk_config.resource, + ) + + new_meter = meter_provider.get_meter("new") + new_counter = new_meter.create_counter("new_counter") + new_counter.add(1) + metrics_data = reader.get_metrics_data() + self.assertIs( + metrics_data.resource_metrics[0].resource, + meter_provider._sdk_config.resource, + ) + + @unittest.skipUnless( + hasattr(os, "fork") and hasattr(os, "register_at_fork"), + "requires os.fork and os.register_at_fork", + ) + def test_meter_provider_updates_process_sensitive_resource_after_fork( + self, + ): + script_path = ( + Path(__file__).parent + / "scripts" + / "meter_provider_resource_after_fork.py" + ) + + result = subprocess.run( + [sys.executable, str(script_path)], + capture_output=True, + check=False, + text=True, + env={ + **os.environ, + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS: "process", + }, + ) + self.assertEqual( + result.returncode, + 0, + f"stdout: {result.stdout}\nstderr: {result.stderr}", + ) + lines = result.stdout.strip().splitlines() + child_payload = json.loads(lines[0]) + parent_payload = json.loads(lines[1]) + + self.assertEqual( + parent_payload["parent_resource_pid"], parent_payload["parent_pid"] + ) + self.assertEqual( + parent_payload["parent_resource_pid_after_fork"], + parent_payload["parent_pid"], + ) + + self.assertNotEqual( + child_payload["child_pid"], parent_payload["parent_pid"] + ) + self.assertEqual( + child_payload["provider_pid"], child_payload["child_pid"] + ) + self.assertEqual( + child_payload["exported_resource_pids"], + [child_payload["child_pid"]], + ) + self.assertEqual( + child_payload["metric_names"], + ["cached_counter", "new_counter"], + ) + def test_get_meter(self): """ `MeterProvider.get_meter` arguments are used to create an diff --git a/opentelemetry-sdk/tests/resources/test_resources.py b/opentelemetry-sdk/tests/resources/test_resources.py index 2b945ea9fd..f7ca275299 100644 --- a/opentelemetry-sdk/tests/resources/test_resources.py +++ b/opentelemetry-sdk/tests/resources/test_resources.py @@ -50,6 +50,7 @@ Resource, ResourceDetector, ServiceInstanceIdResourceDetector, + _get_process_sensitive_resource, _HostResourceDetector, get_aggregated_resources, ) @@ -63,6 +64,27 @@ psutil = None +class DefaultResourceDetector(ResourceDetector): + def detect(self) -> Resource: + return Resource.get_empty() + + +class ProcessSensitivityResourceDetector(ResourceDetector): + def __init__( + self, resource: Resource, process_sensitive: bool = False + ) -> None: + super().__init__() + self.resource = resource + self.process_sensitive = process_sensitive + + def detect(self) -> Resource: + return self.resource + + def is_process_sensitive(self) -> bool: + return self.process_sensitive + + +# pylint: disable-next=too-many-public-methods class TestResources(unittest.TestCase): def setUp(self) -> None: environ[OTEL_RESOURCE_ATTRIBUTES] = "" @@ -459,6 +481,48 @@ def test_resource_detector_raise_error(self): Exception, get_aggregated_resources, [resource_detector] ) + def test_resource_detector_is_not_process_sensitive_by_default(self): + self.assertFalse(DefaultResourceDetector().is_process_sensitive()) + + def test_process_resource_detector_is_process_sensitive(self): + self.assertTrue(ProcessResourceDetector().is_process_sensitive()) + + @patch("opentelemetry.sdk.resources._build_resource_detectors") + def test_get_process_sensitive_resource( + self, build_resource_detectors_mock + ): + build_resource_detectors_mock.return_value = [ + ProcessSensitivityResourceDetector( + Resource({"ignored": "ignored"}) + ), + ProcessSensitivityResourceDetector( + Resource({"one": "one", "two": "old"}), process_sensitive=True + ), + ProcessSensitivityResourceDetector( + Resource({"two": "new", "three": "three"}), + process_sensitive=True, + ), + ] + + self.assertEqual( + _get_process_sensitive_resource(), + Resource({"one": "one", "two": "new", "three": "three"}), + ) + + @patch("opentelemetry.sdk.resources._build_resource_detectors") + def test_get_process_sensitive_resource_empty( + self, build_resource_detectors_mock + ): + build_resource_detectors_mock.return_value = [ + ProcessSensitivityResourceDetector( + Resource({"ignored": "ignored"}) + ), + ] + + self.assertEqual( + _get_process_sensitive_resource(), Resource.get_empty() + ) + @patch("opentelemetry.sdk.resources.logger") def test_resource_detector_timeout(self, mock_logger): resource_detector = Mock(spec=ResourceDetector) @@ -970,6 +1034,11 @@ def tearDown(self) -> None: _resources_module._service_instance_id = self._orig_instance_id _resources_module._service_instance_id_pid = self._orig_instance_pid + def test_is_process_sensitive(self): + self.assertTrue( + ServiceInstanceIdResourceDetector().is_process_sensitive() + ) + def test_detect_value_is_valid_uuid4(self): _resources_module._service_instance_id = None _resources_module._service_instance_id_pid = None diff --git a/opentelemetry-sdk/tests/trace/scripts/tracer_provider_resource_after_fork.py b/opentelemetry-sdk/tests/trace/scripts/tracer_provider_resource_after_fork.py new file mode 100644 index 0000000000..0eff1b3f4b --- /dev/null +++ b/opentelemetry-sdk/tests/trace/scripts/tracer_provider_resource_after_fork.py @@ -0,0 +1,64 @@ +# Copyright The OpenTelemetry Authors +# SPDX-License-Identifier: Apache-2.0 + +import json +import os + +from opentelemetry.sdk import trace +from opentelemetry.sdk.resources import PROCESS_PID + + +def main() -> None: + tracer_provider = trace.TracerProvider(shutdown_on_exit=False) + tracer = tracer_provider.get_tracer("cached") + parent_pid = os.getpid() + parent_resource_pid = tracer_provider.resource.attributes[PROCESS_PID] + parent_tracer_pid = tracer.resource.attributes[PROCESS_PID] + + pid = os.fork() + if not pid: + child_pid = os.getpid() + new_tracer = tracer_provider.get_tracer("new") + span = tracer.start_span("child") + print( + json.dumps( + { + "child_pid": child_pid, + "provider_pid": tracer_provider.resource.attributes[ + PROCESS_PID + ], + "cached_tracer_pid": tracer.resource.attributes[ + PROCESS_PID + ], + "new_tracer_pid": new_tracer.resource.attributes[ + PROCESS_PID + ], + "span_pid": span.resource.attributes[PROCESS_PID], + } + ), + flush=True, + ) + # pylint: disable-next=protected-access + os._exit(0) + + os.waitpid(pid, 0) + print( + json.dumps( + { + "parent_pid": parent_pid, + "parent_resource_pid": parent_resource_pid, + "parent_tracer_pid": parent_tracer_pid, + "parent_resource_pid_after_fork": tracer_provider.resource.attributes[ + PROCESS_PID + ], + "parent_tracer_pid_after_fork": tracer.resource.attributes[ + PROCESS_PID + ], + } + ), + flush=True, + ) + + +if __name__ == "__main__": + main() diff --git a/opentelemetry-sdk/tests/trace/test_trace.py b/opentelemetry-sdk/tests/trace/test_trace.py index 0b07908ec5..63460bb0d3 100644 --- a/opentelemetry-sdk/tests/trace/test_trace.py +++ b/opentelemetry-sdk/tests/trace/test_trace.py @@ -6,11 +6,15 @@ import copy import dataclasses +import json +import os import shutil import subprocess +import sys import unittest from importlib import reload from logging import ERROR, WARNING +from pathlib import Path from random import randint from time import time_ns from unittest import mock @@ -24,6 +28,7 @@ OTEL_ATTRIBUTE_COUNT_LIMIT, OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, OTEL_EVENT_ATTRIBUTE_COUNT_LIMIT, + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS, OTEL_LINK_ATTRIBUTE_COUNT_LIMIT, OTEL_SDK_DISABLED, OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, @@ -163,6 +168,67 @@ def test_tracer_provider_accepts_concurrent_multi_span_processor(self): span_processor, tracer_provider._active_span_processor ) + @unittest.skipUnless( + hasattr(os, "fork") and hasattr(os, "register_at_fork"), + "requires os.fork and os.register_at_fork", + ) + def test_tracer_provider_updates_process_sensitive_resource_after_fork( + self, + ): + script_path = ( + Path(__file__).parent + / "scripts" + / "tracer_provider_resource_after_fork.py" + ) + + result = subprocess.run( + [sys.executable, str(script_path)], + capture_output=True, + check=False, + text=True, + env={ + **os.environ, + OTEL_EXPERIMENTAL_RESOURCE_DETECTORS: "process", + }, + ) + self.assertEqual( + result.returncode, + 0, + f"stdout: {result.stdout}\nstderr: {result.stderr}", + ) + lines = result.stdout.strip().splitlines() + child_payload = json.loads(lines[0]) + parent_payload = json.loads(lines[1]) + + self.assertEqual( + parent_payload["parent_resource_pid"], parent_payload["parent_pid"] + ) + self.assertEqual( + parent_payload["parent_tracer_pid"], parent_payload["parent_pid"] + ) + self.assertEqual( + parent_payload["parent_resource_pid_after_fork"], + parent_payload["parent_pid"], + ) + self.assertEqual( + parent_payload["parent_tracer_pid_after_fork"], + parent_payload["parent_pid"], + ) + + self.assertNotEqual( + child_payload["child_pid"], parent_payload["parent_pid"] + ) + self.assertEqual( + child_payload["provider_pid"], child_payload["child_pid"] + ) + self.assertEqual( + child_payload["cached_tracer_pid"], child_payload["child_pid"] + ) + self.assertEqual( + child_payload["new_tracer_pid"], child_payload["child_pid"] + ) + self.assertEqual(child_payload["span_pid"], child_payload["child_pid"]) + def test_get_tracer_sdk(self): tracer_provider = trace.TracerProvider() tracer = tracer_provider.get_tracer( @@ -651,6 +717,35 @@ def test_explicit_span_resource(self): span = tracer.start_span("root") self.assertIs(span.resource, resource) + def test_update_resource(self): + initial_resource = resources.Resource({"one": "one", "two": "old"}) + updating_resource = resources.Resource( + {"two": "new", "three": "three"} + ) + tracer_provider = trace.TracerProvider(resource=initial_resource) + tracer = tracer_provider.get_tracer(__name__) + other_tracer = tracer_provider.get_tracer("other") + original_span = tracer.start_span("original") + + # pylint: disable-next=protected-access + tracer_provider._update_resource(updating_resource) + + self.assertEqual( + tracer_provider.resource.attributes, + {"one": "one", "two": "new", "three": "three"}, + ) + self.assertIs(tracer.resource, tracer_provider.resource) + self.assertIs(other_tracer.resource, tracer_provider.resource) + self.assertIs(original_span.resource, initial_resource) + + updated_span = tracer.start_span("updated") + self.assertIs(updated_span.resource, tracer_provider.resource) + + updated_tracer = tracer_provider.get_tracer("new") + self.assertIs(updated_tracer.resource, tracer_provider.resource) + new_span = updated_tracer.start_span("new") + self.assertIs(new_span.resource, tracer_provider.resource) + def test_default_span_resource(self): tracer_provider = trace.TracerProvider() tracer = tracer_provider.get_tracer(__name__) diff --git a/pyproject.toml b/pyproject.toml index 95c826e01b..1b928e2c9f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -181,7 +181,7 @@ issue_format = "[#{issue}](https://github.com/open-telemetry/opentelemetry-pytho wrap = true # wrap fragments to 79 char line length issue_pattern = "^(\\d+)" # only PR numbers as fragment prefix (e.g. 1234.fixed) -# NOTE: remember to update the changelog workflow if these types change +# NOTE: remember to update the changelog workflow if these types change [[tool.towncrier.type]] directory = "added" name = "Added"