diff --git a/.changelog/4767.fixed b/.changelog/4767.fixed new file mode 100644 index 0000000000..440d4c9008 --- /dev/null +++ b/.changelog/4767.fixed @@ -0,0 +1 @@ +`opentelemetry-sdk`: Add fork-safety to metrics `SynchronousMeasurementConsumer` by registering a post-fork child hook, lazily reinitializing metric reader storages on first use in the child process, and clearing asynchronous instruments to avoid duplicated state after `fork()` diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 8af2f06431..54307635f6 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -3,6 +3,7 @@ # pylint: disable=unused-import +import os import weakref from abc import ABC, abstractmethod from collections.abc import Iterable, Mapping @@ -65,8 +66,30 @@ def __init__( self._async_instruments: list[ opentelemetry.sdk.metrics._internal.instrument._Asynchronous ] = [] + self._needs_storage_reinit = False + if hasattr(os, "register_at_fork"): + weak_self = weakref.ref(self) + + def _fork_handler(): + obj = weak_self() + if obj is not None: + obj._at_fork_reinit() # pylint: disable=protected-access + + os.register_at_fork(after_in_child=_fork_handler) + + def _at_fork_reinit(self): + """Reset the lock, schedule lazy storage reinit, and drop async instruments in the forked child.""" + self._lock = Lock() + # Lazy reinitialization of storages on first use post fork. This is + # done to avoid the overhead of reinitializing the storages on + # every fork. 
+ self._needs_storage_reinit = True + self._async_instruments.clear() def consume_measurement(self, measurement: Measurement) -> None: + if self._needs_storage_reinit: + self._reinit_storages() + should_sample_exemplar = ( self._sdk_config.exemplar_filter.should_sample( measurement.value, @@ -96,6 +119,9 @@ def collect( metric_reader: "opentelemetry.sdk.metrics.export.MetricReader", timeout_millis: float = 10_000, ) -> MetricsData | None: + if self._needs_storage_reinit: + self._reinit_storages() + with self._lock: metric_reader_storage = self._reader_storages[metric_reader] # for now, just use the defaults @@ -135,6 +161,17 @@ def collect( return result + def _reinit_storages(self): + # Reinitialize the storages after a fork to avoid duplicate data points. + # The flag is cleared inside the lock so concurrent callers only run + # reinit once. + with self._lock: + if not self._needs_storage_reinit: + return + for storage in self._reader_storages.values(): + storage._at_fork_reinit() # pylint: disable=protected-access + self._needs_storage_reinit = False + def add_metric_reader( self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" ) -> None: diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index 7c409430bf..7b41ed395c 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -63,6 +63,11 @@ def __init__( self._instrument_class_temporality = instrument_class_temporality self._instrument_class_aggregation = instrument_class_aggregation + def _at_fork_reinit(self) -> None: + """Reinitialize locks and clear state in the child process after fork.""" + self._lock = RLock() + self._instrument_view_instrument_matches.clear() + def _get_or_init_view_instrument_match( self, instrument: _Instrument ) -> 
list[_ViewInstrumentMatch]: diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index b4ce731cc1..0dcf3a6793 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -17,6 +17,20 @@ ) +def _sdk_config( + exemplar_filter=None, resource=None, metric_readers=None, views=None +): + """Create SdkConfiguration for tests.""" + config = SdkConfiguration( + resource=resource or Mock(), + metric_readers=metric_readers or [Mock()], + views=views or Mock(), + exemplar_filter=exemplar_filter + or Mock(should_sample=Mock(return_value=False)), + ) + return config + + @patch( "opentelemetry.sdk.metrics._internal." "measurement_consumer.MetricReaderStorage" @@ -192,6 +206,322 @@ def test_collect_deadline( ) +@patch( + "opentelemetry.sdk.metrics._internal." + "measurement_consumer.MetricReaderStorage" +) +class TestSynchronousMeasurementConsumerForkHandler(TestCase): + """Tests for the fork handler, the _needs_storage_reinit flag, and lazy _reinit_storages.""" + + # pylint: disable=protected-access + + def test_register_at_fork_called_when_available( + self, MockMetricReaderStorage + ): + """Consumer should register fork handler when os.register_at_fork exists.""" + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + SynchronousMeasurementConsumer(_sdk_config()) + register_mock.assert_called_once() + call_kwargs = register_mock.call_args[1] + self.assertIn("after_in_child", call_kwargs) + self.assertTrue(callable(call_kwargs["after_in_child"])) + + def test_at_fork_reinit_sets_needs_storage_reinit_and_clears_async_instruments( + self, MockMetricReaderStorage + ): + """_at_fork_reinit should set _needs_storage_reinit=True and clear _async_instruments.""" + reader_mock = Mock() + storage_mock = Mock() +
storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + async_instrument = MagicMock() + consumer.register_asynchronous_instrument(async_instrument) + self.assertEqual(len(consumer._async_instruments), 1) + + # Simulate fork: call the after_in_child callback + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + self.assertTrue(consumer._needs_storage_reinit) + self.assertEqual(len(consumer._async_instruments), 0) + + def test_consume_measurement_triggers_lazy_reinit_on_first_use_after_fork( + self, MockMetricReaderStorage + ): + """First consume_measurement after fork should call _reinit_storages.""" + reader_mocks = [Mock()] + storage_mocks = [Mock()] + storage_mocks[0]._lock = Mock() + storage_mocks[0]._instrument_view_instrument_matches = {} + storage_mocks[0].consume_measurement = Mock() + MockMetricReaderStorage.side_effect = storage_mocks + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=reader_mocks) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.consume_measurement(Mock()) + reinit_spy.assert_called_once() + + def test_consume_measurement_does_not_reinit_on_second_call( + self, MockMetricReaderStorage + ): + """Second consume_measurement after fork should NOT call _reinit_storages again.""" + reader_mocks = [Mock()] + storage_mock = Mock() + 
storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.consume_measurement = Mock() + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=reader_mocks) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.consume_measurement(Mock()) + consumer.consume_measurement(Mock()) + reinit_spy.assert_called_once() + + def test_collect_triggers_lazy_reinit_on_first_use_after_fork( + self, MockMetricReaderStorage + ): + """First collect after fork should call _reinit_storages.""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.collect(reader_mock) + reinit_spy.assert_called_once() + + def test_collect_does_not_reinit_on_second_call( + self, MockMetricReaderStorage + ): + """Second collect after fork should NOT call _reinit_storages again.""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.collect.return_value = [] + 
MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.collect(reader_mock) + consumer.collect(reader_mock) + reinit_spy.assert_called_once() + + def test_consume_then_collect_after_fork_reinits_once( + self, MockMetricReaderStorage + ): + """After fork, consume_measurement triggers reinit; collect uses same reinit (no second call).""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.consume_measurement = Mock() + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.consume_measurement(Mock()) + consumer.collect(reader_mock) + reinit_spy.assert_called_once() + + def test_collect_then_consume_after_fork_reinits_once( + self, MockMetricReaderStorage + ): + """After fork, collect triggers reinit; consume_measurement uses same reinit (no second call).""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.consume_measurement = Mock() + 
storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.collect(reader_mock) + consumer.consume_measurement(Mock()) + reinit_spy.assert_called_once() + + def test_no_reinit_on_consume_measurement_without_fork( + self, MockMetricReaderStorage + ): + """consume_measurement without prior fork should NOT call _reinit_storages.""" + reader_mocks = [Mock()] + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.consume_measurement = Mock() + MockMetricReaderStorage.return_value = storage_mock + + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = Mock() + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=reader_mocks) + ) + # Do NOT simulate fork + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.consume_measurement(Mock()) + reinit_spy.assert_not_called() + + def test_no_reinit_on_collect_without_fork(self, MockMetricReaderStorage): + """collect without prior fork should NOT call _reinit_storages.""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = Mock() + + 
consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.collect(reader_mock) + reinit_spy.assert_not_called() + + def test_collect_after_fork_does_not_invoke_cleared_async_instruments( + self, MockMetricReaderStorage + ): + """After fork, collect should not invoke async instruments (they were cleared).""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + async_instrument = MagicMock() + async_instrument.callback.return_value = [] + consumer.register_asynchronous_instrument(async_instrument) + + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + consumer.collect(reader_mock) + + async_instrument.callback.assert_not_called() + + class TestSynchronousMeasurementConsumerConcurrency(TestCase): def test_concurrent_changes_to_metric_readers(self): timeout = 1