From c36d47e17288223397c6835cf06bf5487b6a21b0 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Thu, 9 Oct 2025 17:35:57 +0530 Subject: [PATCH 01/16] feat(metrics): Add fork-safety to SynchronousMeasurementConsumer Implement post-fork reinitialization of threading locks in the metrics measurement consumer to prevent deadlocks and data duplication in forked child processes. --- .../metrics/_internal/measurement_consumer.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index c651033051a..049968b6a8e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -14,6 +14,7 @@ # pylint: disable=unused-import +import os from abc import ABC, abstractmethod from threading import Lock from time import time_ns @@ -76,8 +77,25 @@ def __init__( self._async_instruments: List[ "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" ] = [] + if hasattr(os, "register_at_fork"): + os.register_at_fork( + after_in_child=self._at_fork_reinit + ) # pylint: disable=protected-access + + def _at_fork_reinit(self): + """Reinitialize lock in child process after fork""" + self._lock._at_fork_reinit() + # Lazy reinitialization of storages on first use post fork. This is + # done to avoid the overhead of reinitializing the storages on + # every fork. + self._needs_storage_reinit = True + self._async_instruments.clear() def consume_measurement(self, measurement: Measurement) -> None: + if getattr(self, '_needs_storage_reinit', False): + self._reinit_storages() + self._needs_storage_reinit = False + should_sample_exemplar = ( self._sdk_config.exemplar_filter.should_sample( measurement.value, @@ -105,6 +123,11 @@ def collect( metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, ) -> Optional[Iterable[Metric]]: + + if getattr(self, '_needs_storage_reinit', False): + self._reinit_storages() + self._needs_storage_reinit = False + with self._lock: metric_reader_storage = self._reader_storages[metric_reader] # for now, just use the defaults @@ -143,3 +166,11 @@ def collect( result = self._reader_storages[metric_reader].collect() return result + + def _reinit_storages(self): + # Reinitialize the storages. Use to reinitialize the storages after a + # fork to avoid duplicate data points. + with self._lock: + for storage in self._reader_storages.values(): + storage._lock._at_fork_reinit() + storage._instrument_view_instrument_matches.clear() From 4741be55591aac3d169fb7f3fa6960c15d10708d Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Wed, 18 Mar 2026 22:39:41 +0530 Subject: [PATCH 02/16] Enhance tests for SynchronousMeasurementConsumer Added tests for SynchronousMeasurementConsumer's behavior after forking, including registration of fork handlers and reinitialization of storage. --- .../metrics/test_measurement_consumer.py | 329 +++++++++++++++++- 1 file changed, 328 insertions(+), 1 deletion(-) diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 22abfbd3cfe..ec7cc967c86 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint: disable=invalid-name,no-self-use +# pylint: disable=invalid-name,no-self-use,protected-access from time import sleep from unittest import TestCase @@ -27,6 +27,17 @@ ) +def _sdk_config(exemplar_filter=None, resource=None, metric_readers=None, views=None): + """Create SdkConfiguration for tests. exemplar_filter is set as attribute after init.""" + config = SdkConfiguration( + resource=resource or Mock(), + metric_readers=metric_readers or [Mock()], + views=views or Mock(), + exemplar_filter=exemplar_filter or Mock(should_sample=Mock(return_value=False)) + ) + return config + + @patch( "opentelemetry.sdk.metrics._internal." "measurement_consumer.MetricReaderStorage" @@ -192,3 +203,319 @@ def sleep_1(*args, **kwargs): callback_options_time_call, 10000, ) + + +@patch( + "opentelemetry.sdk.metrics._internal." + "measurement_consumer.MetricReaderStorage" +) +class TestSynchronousMeasurementConsumerForkHandler(TestCase): + """Exhaustive tests for fork handler, needs_storage_reinit, and lazy _reinit_storages.""" + + def test_register_at_fork_called_when_available( + self, MockMetricReaderStorage + ): + """Consumer should register fork handler when os.register_at_fork exists.""" + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + SynchronousMeasurementConsumer(_sdk_config()) + register_mock.assert_called_once() + call_kwargs = register_mock.call_args[1] + self.assertIn("after_in_child", call_kwargs) + self.assertTrue(callable(call_kwargs["after_in_child"])) + + def test_at_fork_reinit_sets_needs_storage_reinit_and_clears_async_instruments( + self, MockMetricReaderStorage + ): + """_at_fork_reinit should set _needs_storage_reinit=True and clear _async_instruments.""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + async_instrument = MagicMock() + consumer.register_asynchronous_instrument(async_instrument) + self.assertEqual(len(consumer._async_instruments), 1) + + # Simulate fork: call the after_in_child callback + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + self.assertTrue(consumer._needs_storage_reinit) + self.assertEqual(len(consumer._async_instruments), 0) + + def test_consume_measurement_triggers_lazy_reinit_on_first_use_after_fork( + self, MockMetricReaderStorage + ): + """First consume_measurement after fork should call _reinit_storages.""" + reader_mocks = [Mock()] + storage_mocks = [Mock()] + storage_mocks[0]._lock = Mock() + storage_mocks[0]._instrument_view_instrument_matches = {} + storage_mocks[0].consume_measurement = Mock() + MockMetricReaderStorage.side_effect = storage_mocks + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=reader_mocks) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.consume_measurement(Mock()) + reinit_spy.assert_called_once() + + def test_consume_measurement_does_not_reinit_on_second_call( + self, MockMetricReaderStorage + ): + """Second consume_measurement after fork should NOT call _reinit_storages again.""" + reader_mocks = [Mock()] + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.consume_measurement = Mock() + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=reader_mocks) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.consume_measurement(Mock()) + consumer.consume_measurement(Mock()) + reinit_spy.assert_called_once() + + def test_collect_triggers_lazy_reinit_on_first_use_after_fork( + self, MockMetricReaderStorage + ): + """First collect after fork should call _reinit_storages.""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.collect(reader_mock) + reinit_spy.assert_called_once() + + def test_collect_does_not_reinit_on_second_call( + self, MockMetricReaderStorage + ): + """Second collect after fork should NOT call _reinit_storages again.""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.collect(reader_mock) + consumer.collect(reader_mock) + reinit_spy.assert_called_once() + + def test_consume_then_collect_after_fork_reinits_once( + self, MockMetricReaderStorage + ): + """After fork, consume_measurement triggers reinit; collect uses same reinit (no second call).""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.consume_measurement = Mock() + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.consume_measurement(Mock()) + consumer.collect(reader_mock) + reinit_spy.assert_called_once() + + def test_collect_then_consume_after_fork_reinits_once( + self, MockMetricReaderStorage + ): + """After fork, collect triggers reinit; consume_measurement uses same reinit (no second call).""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.consume_measurement = Mock() + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.collect(reader_mock) + consumer.consume_measurement(Mock()) + reinit_spy.assert_called_once() + + def test_no_reinit_on_consume_measurement_without_fork( + self, MockMetricReaderStorage + ): + """consume_measurement without prior fork should NOT call _reinit_storages.""" + reader_mocks = [Mock()] + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.consume_measurement = Mock() + MockMetricReaderStorage.return_value = storage_mock + + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = Mock() + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=reader_mocks) + ) + # Do NOT simulate fork + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.consume_measurement(Mock()) + reinit_spy.assert_not_called() + + def test_no_reinit_on_collect_without_fork( + self, MockMetricReaderStorage + ): + """collect without prior fork should NOT call _reinit_storages.""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = Mock() + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + with patch.object( + consumer, "_reinit_storages", wraps=consumer._reinit_storages + ) as reinit_spy: + consumer.collect(reader_mock) + reinit_spy.assert_not_called() + + def test_collect_after_fork_does_not_invoke_cleared_async_instruments( + self, MockMetricReaderStorage + ): + """After fork, collect should not invoke async instruments (they were cleared).""" + reader_mock = Mock() + storage_mock = Mock() + storage_mock._lock = Mock() + storage_mock._instrument_view_instrument_matches = {} + storage_mock.collect.return_value = [] + MockMetricReaderStorage.return_value = storage_mock + + register_mock = Mock() + with patch( + "opentelemetry.sdk.metrics._internal.measurement_consumer.os" + ) as mock_os: + mock_os.register_at_fork = register_mock + + consumer = SynchronousMeasurementConsumer( + _sdk_config(metric_readers=[reader_mock]) + ) + async_instrument = MagicMock() + async_instrument.callback.return_value = [] + consumer.register_asynchronous_instrument(async_instrument) + + after_in_child = register_mock.call_args[1]["after_in_child"] + after_in_child() + + consumer.collect(reader_mock) + + async_instrument.callback.assert_not_called() From 491a8e6e5baf9a6f69ea68d01888495cfd47204e Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Thu, 19 Mar 2026 12:26:06 +0530 Subject: [PATCH 03/16] Formatting changes --- .../sdk/metrics/_internal/measurement_consumer.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 049968b6a8e..25903673b50 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -78,9 +78,7 @@ def __init__( "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" ] = [] if hasattr(os, "register_at_fork"): - os.register_at_fork( - after_in_child=self._at_fork_reinit - ) # pylint: disable=protected-access + os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access def _at_fork_reinit(self): """Reinitialize lock in child process after fork""" @@ -92,7 +90,7 @@ def _at_fork_reinit(self): self._async_instruments.clear() def consume_measurement(self, measurement: Measurement) -> None: - if getattr(self, '_needs_storage_reinit', False): + if getattr(self, "_needs_storage_reinit", False): self._reinit_storages() self._needs_storage_reinit = False @@ -123,8 +121,7 @@ def collect( metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, ) -> Optional[Iterable[Metric]]: - - if getattr(self, '_needs_storage_reinit', False): + if getattr(self, "_needs_storage_reinit", False): self._reinit_storages() self._needs_storage_reinit = False From 5098c53cd3932388ba33931e7a179431912f1170 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Tue, 7 Apr 2026 18:00:57 +0530 Subject: [PATCH 04/16] Update measurement_consumer.py --- .../metrics/_internal/measurement_consumer.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index aec564cdbd2..daa97d51f13 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -77,6 +77,7 @@ def __init__( self._async_instruments: List[ "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" ] = [] + self._needs_storage_reinit = False if hasattr(os, "register_at_fork"): os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access @@ -90,9 +91,8 @@ def _at_fork_reinit(self): self._async_instruments.clear() def consume_measurement(self, measurement: Measurement) -> None: - if getattr(self, "_needs_storage_reinit", False): + if self._needs_storage_reinit: self._reinit_storages() - self._needs_storage_reinit = False should_sample_exemplar = ( self._sdk_config.exemplar_filter.should_sample( @@ -121,9 +121,8 @@ def collect( metric_reader: "opentelemetry.sdk.metrics.MetricReader", timeout_millis: float = 10_000, ) -> Optional[MetricsData]: - if getattr(self, "_needs_storage_reinit", False): + if self._needs_storage_reinit: self._reinit_storages() - self._needs_storage_reinit = False with self._lock: metric_reader_storage = self._reader_storages[metric_reader] @@ -165,9 +164,12 @@ def collect( return result def _reinit_storages(self): - # Reinitialize the storages. Use to reinitialize the storages after a - # fork to avoid duplicate data points. + # Reinitialize the storages after a fork to avoid duplicate data points. + # The flag is cleared inside the lock so concurrent callers only run + # reinit once. with self._lock: + if not self._needs_storage_reinit: + return for storage in self._reader_storages.values(): - storage._lock._at_fork_reinit() - storage._instrument_view_instrument_matches.clear() + storage._at_fork_reinit() + self._needs_storage_reinit = False From 8b0b0eaa4b46ec126518ecc28c8c20366d571de5 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Tue, 7 Apr 2026 18:02:17 +0530 Subject: [PATCH 05/16] Update metric_reader_storage.py --- .../sdk/metrics/_internal/metric_reader_storage.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index 317fda0b420..cccf20b08ef 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -75,6 +75,11 @@ def __init__( self._instrument_class_temporality = instrument_class_temporality self._instrument_class_aggregation = instrument_class_aggregation + def _at_fork_reinit(self) -> None: + """Reinitialize locks and clear state in the child process after fork.""" + self._lock._at_fork_reinit() + self._instrument_view_instrument_matches.clear() + def _get_or_init_view_instrument_match( self, instrument: Instrument ) -> List[_ViewInstrumentMatch]: From 2050a51974ffce0a67e580f1ff6e72e2b46c3106 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Tue, 7 Apr 2026 18:06:50 +0530 Subject: [PATCH 06/16] Clean up pylint disables in test_measurement_consumer Removed unnecessary pylint disable for protected-access. --- opentelemetry-sdk/tests/metrics/test_measurement_consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index ec7cc967c86..67e17773a9c 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -# pylint: disable=invalid-name,no-self-use,protected-access +# pylint: disable=invalid-name,no-self-use from time import sleep from unittest import TestCase @@ -209,7 +209,7 @@ def sleep_1(*args, **kwargs): "opentelemetry.sdk.metrics._internal." "measurement_consumer.MetricReaderStorage" ) -class TestSynchronousMeasurementConsumerForkHandler(TestCase): +class TestSynchronousMeasurementConsumerForkHandler(TestCase): # pylint: disable=protected-access """Exhaustive tests for fork handler, needs_storage_reinit, and lazy _reinit_storages.""" def test_register_at_fork_called_when_available( From 0cd70c02829dce6a781f3674089fce4fc6ce5928 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Thu, 9 Apr 2026 21:37:57 +0530 Subject: [PATCH 07/16] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bab44ceb635..feb2187f4e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +- `opentelemetry-sdk`: Add fork-safety to metrics `SynchronousMeasurementConsumer` by registering a post-fork child hook, lazily reinitializing metric reader storages on first use in the child process, and clearing asynchronous instruments to avoid duplicated state after `fork()` + ([#4767](https://github.com/open-telemetry/opentelemetry-python/pull/4767)) - `opentelemetry-api`: Replace a broad exception in attribute cleaning tests to satisfy pylint in the `lint-opentelemetry-api` CI job - `opentelemetry-sdk`: Add `create_resource` and `create_propagator`/`configure_propagator` to declarative file configuration, enabling Resource and propagator instantiation from config files without reading env vars ([#4979](https://github.com/open-telemetry/opentelemetry-python/pull/4979)) From a1743b6b8d0310ca9b7cc992273ddbf4c81bf5ec Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Thu, 9 Apr 2026 21:38:02 +0530 Subject: [PATCH 08/16] Apply suggestions from code review Co-authored-by: Mike Goldsmith --- .../opentelemetry/sdk/metrics/_internal/measurement_consumer.py | 2 +- .../sdk/metrics/_internal/metric_reader_storage.py | 2 +- opentelemetry-sdk/tests/metrics/test_measurement_consumer.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index daa97d51f13..d9db18c610a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -83,7 +83,7 @@ def __init__( def _at_fork_reinit(self): """Reinitialize lock in child process after fork""" - self._lock._at_fork_reinit() + self._lock = Lock() # Lazy reinitialization of storages on first use post fork. This is # done to avoid the overhead of reinitializing the storages on # every fork. diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index cccf20b08ef..34d54f27400 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -77,7 +77,7 @@ def __init__( def _at_fork_reinit(self) -> None: """Reinitialize locks and clear state in the child process after fork.""" - self._lock._at_fork_reinit() + self._lock = Lock() self._instrument_view_instrument_matches.clear() def _get_or_init_view_instrument_match( diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 67e17773a9c..cdc52b6ee3b 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -28,7 +28,7 @@ def _sdk_config(exemplar_filter=None, resource=None, metric_readers=None, views=None): - """Create SdkConfiguration for tests. exemplar_filter is set as attribute after init.""" + """Create SdkConfiguration for tests.""" config = SdkConfiguration( resource=resource or Mock(), metric_readers=metric_readers or [Mock()], From 96a86dac743313f90bb31411402aad15474937db Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Sat, 18 Apr 2026 15:23:11 +0530 Subject: [PATCH 09/16] Change lock type to RLock in metric_reader_storage Replaced Lock with RLock for better handling of nested locks. --- .../sdk/metrics/_internal/metric_reader_storage.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py index c0f2af8b4e6..d9c0310a8a2 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/metric_reader_storage.py @@ -77,7 +77,7 @@ def __init__( def _at_fork_reinit(self) -> None: """Reinitialize locks and clear state in the child process after fork.""" - self._lock = Lock() + self._lock = RLock() self._instrument_view_instrument_matches.clear() def _get_or_init_view_instrument_match( From 7241878f196356d845de27be5815c3e4281cdb45 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Sat, 18 Apr 2026 15:24:16 +0530 Subject: [PATCH 10/16] Fix lint errors --- .../metrics/test_measurement_consumer.py | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 912cb4efe1e..26ad42a62db 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -27,13 +27,16 @@ ) -def _sdk_config(exemplar_filter=None, resource=None, metric_readers=None, views=None): +def _sdk_config( + exemplar_filter=None, resource=None, metric_readers=None, views=None +): """Create SdkConfiguration for tests.""" config = SdkConfiguration( resource=resource or Mock(), metric_readers=metric_readers or [Mock()], views=views or Mock(), - exemplar_filter=exemplar_filter or Mock(should_sample=Mock(return_value=False)) + exemplar_filter=exemplar_filter + or Mock(should_sample=Mock(return_value=False)), ) return config @@ -167,9 +170,8 @@ def sleep_1(*args, **kwargs): "opentelemetry.sdk.metrics._internal." "measurement_consumer.CallbackOptions" ) - @patch("opentelemetry.sdk.metrics._internal.measurement_consumer.time_ns") def test_collect_deadline( - self, mock_time_ns, mock_callback_options, MockMetricReaderStorage + self, mock_callback_options, MockMetricReaderStorage ): reader_mock = Mock() reader_storage_mock = Mock() @@ -183,23 +185,17 @@ def test_collect_deadline( ) ) + def sleep_1(*args, **kwargs): + sleep(1) + return [] + consumer.register_asynchronous_instrument( - Mock(**{"callback.return_value": []}) + Mock(**{"callback.side_effect": sleep_1}) ) consumer.register_asynchronous_instrument( - Mock(**{"callback.return_value": []}) + Mock(**{"callback.side_effect": sleep_1}) ) - # collect start, first remaining_time, post-first callback, - # second remaining_time, post-second callback - mock_time_ns.side_effect = [ - 0, - 0, - int(1e9), - int(1e9), - int(2e9), - ] - consumer.collect(reader_mock) callback_options_time_call = mock_callback_options.mock_calls[ @@ -216,7 +212,7 @@ def test_collect_deadline( "opentelemetry.sdk.metrics._internal." "measurement_consumer.MetricReaderStorage" ) -class TestSynchronousMeasurementConsumerForkHandler(TestCase): # pylint: disable=protected-access +class TestSynchronousMeasurementConsumerForkHandler(TestCase): # pylint: disable=protected-access """Exhaustive tests for fork handler, needs_storage_reinit, and lazy _reinit_storages.""" def test_register_at_fork_called_when_available( @@ -471,9 +467,7 @@ def test_no_reinit_on_consume_measurement_without_fork( consumer.consume_measurement(Mock()) reinit_spy.assert_not_called() - def test_no_reinit_on_collect_without_fork( - self, MockMetricReaderStorage - ): + def test_no_reinit_on_collect_without_fork(self, MockMetricReaderStorage): """collect without prior fork should NOT call _reinit_storages.""" reader_mock = Mock() storage_mock = Mock() From 55af869116227d00692797bdc9f8ce675e593794 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Sat, 18 Apr 2026 15:26:37 +0530 Subject: [PATCH 11/16] Remove unintentional changes --- .../metrics/test_measurement_consumer.py | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 26ad42a62db..8b6ba24d573 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -170,8 +170,9 @@ def sleep_1(*args, **kwargs): "opentelemetry.sdk.metrics._internal." "measurement_consumer.CallbackOptions" ) + @patch("opentelemetry.sdk.metrics._internal.measurement_consumer.time_ns") def test_collect_deadline( - self, mock_callback_options, MockMetricReaderStorage + self, mock_time_ns, mock_callback_options, MockMetricReaderStorage ): reader_mock = Mock() reader_storage_mock = Mock() @@ -185,17 +186,23 @@ def test_collect_deadline( ) ) - def sleep_1(*args, **kwargs): - sleep(1) - return [] - consumer.register_asynchronous_instrument( - Mock(**{"callback.side_effect": sleep_1}) + Mock(**{"callback.return_value": []}) ) consumer.register_asynchronous_instrument( - Mock(**{"callback.side_effect": sleep_1}) + Mock(**{"callback.return_value": []}) ) + # collect start, first remaining_time, post-first callback, + # second remaining_time, post-second callback + mock_time_ns.side_effect = [ + 0, + 0, + int(1e9), + int(1e9), + int(2e9), + ] + consumer.collect(reader_mock) callback_options_time_call = mock_callback_options.mock_calls[ From 1265da2dc6444ad5eb338ef373542c9aa2b3f076 Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Sat, 18 Apr 2026 15:26:59 +0530 Subject: [PATCH 12/16] Apply suggestions from code review Co-authored-by: Mike Goldsmith --- .../opentelemetry/sdk/metrics/_internal/measurement_consumer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 7bafe6326f1..1bd87ec0089 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -79,7 +79,7 @@ def __init__( ] = [] self._needs_storage_reinit = False if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=self._at_fork_reinit) # pylint: disable=protected-access + os.register_at_fork(after_in_child=self._at_fork_reinit) def _at_fork_reinit(self): """Reinitialize lock in child process after fork""" From 72464d42caf0690bf497aa85a3d1b56591c7aa7a Mon Sep 17 00:00:00 2001 From: dshivashankar Date: Mon, 11 May 2026 23:26:39 +0530 Subject: [PATCH 13/16] fix(metrics): use weakref in fork handler to allow GC after provider shutdown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit os.register_at_fork holds callbacks for the lifetime of the process with no unregister API. Passing a bound method created a permanent strong reference chain: fork callback → SynchronousMeasurementConsumer → _reader_storages → MetricReader → exporter, preventing garbage collection even after MeterProvider.shutdown(). Replace the bound method with a weakref-based closure so the consumer and everything it owns can be collected normally. The fork handler is a no-op once the consumer is gone. Fixes test_meter_provider_shutdown_cleans_up_successfully. --- .../sdk/metrics/_internal/measurement_consumer.py | 10 +++++++++- .../tests/metrics/test_measurement_consumer.py | 6 ++++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 771492cb0c6..55fb2c8c1e1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -4,6 +4,7 @@ # pylint: disable=unused-import import os +import weakref from abc import ABC, abstractmethod from collections.abc import Mapping from threading import Lock @@ -68,7 +69,14 @@ def __init__( ] = [] self._needs_storage_reinit = False if hasattr(os, "register_at_fork"): - os.register_at_fork(after_in_child=self._at_fork_reinit) + weak_self = weakref.ref(self) + + def _fork_handler(): + obj = weak_self() + if obj is not None: + obj._at_fork_reinit() # pylint: disable=protected-access + + os.register_at_fork(after_in_child=_fork_handler) def _at_fork_reinit(self): """Reinitialize lock in child process after fork""" diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index 558d5047a88..d67de48b463 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -208,8 +208,10 @@ def test_collect_deadline( "opentelemetry.sdk.metrics._internal." "measurement_consumer.MetricReaderStorage" ) -class TestSynchronousMeasurementConsumerForkHandler(TestCase): # pylint: disable=protected-access - """Exhaustive tests for fork handler, needs_storage_reinit, and lazy _reinit_storages.""" +class TestSynchronousMeasurementConsumerForkHandler(TestCase): + """Exhaustive tests for fork handler, needs_storage_reinit, and lazy _reinit_storages.""" + + # pylint: disable=protected-access def test_register_at_fork_called_when_available( self, MockMetricReaderStorage From 188f78a8b4e07876dbb68ff7d2ebd3417fa329d4 Mon Sep 17 00:00:00 2001 From: dshivashankar1994 Date: Tue, 19 May 2026 07:13:55 -0400 Subject: [PATCH 14/16] Fix indent issue and lint error --- .../opentelemetry/sdk/metrics/_internal/measurement_consumer.py | 2 +- opentelemetry-sdk/tests/metrics/test_measurement_consumer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py index 55fb2c8c1e1..ea69be9cb76 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py @@ -168,5 +168,5 @@ def _reinit_storages(self): if not self._needs_storage_reinit: return for storage in self._reader_storages.values(): - storage._at_fork_reinit() + storage._at_fork_reinit() # pylint: disable=protected-access self._needs_storage_reinit = False diff --git a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py index d67de48b463..dba514724da 100644 --- a/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py +++ b/opentelemetry-sdk/tests/metrics/test_measurement_consumer.py @@ -209,7 +209,7 @@ def test_collect_deadline( "measurement_consumer.MetricReaderStorage" ) class TestSynchronousMeasurementConsumerForkHandler(TestCase): - """Exhaustive tests for fork handler, needs_storage_reinit, and lazy _reinit_storages.""" + """Exhaustive tests for fork handler, needs_storage_reinit, and lazy _reinit_storages.""" # pylint: disable=protected-access From d28b5002c7d509f7d4c76e4346937edffd279750 Mon Sep 17 00:00:00 2001 From: Shiva Shankar Dhamodharan Date: Tue, 19 May 2026 17:07:31 +0530 Subject: [PATCH 15/16] Update CHANGELOG --- CHANGELOG.md | 55 ---------------------------------------------------- 1 file changed, 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fb9af8f4af..7501f2a0450 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,62 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 > [!IMPORTANT] > We are working on stabilizing the Log signal that would require making deprecations and breaking changes. We will try to reduce the releases that may require an update to your code, especially for instrumentations or for sdk developers. -<<<<<<< HEAD -## Unreleased - -- `opentelemetry-sdk`: fix multi-processor `force_flush` skipping remaining processors when one returns `None` - ([#5179](https://github.com/open-telemetry/opentelemetry-python/pull/5179)) -- Apply fixes for `UP` ruff rule - ([#5133](https://github.com/open-telemetry/opentelemetry-python/pull/5133)) -- Switch to SPDX license headers and add CI enforcement - ([#5177](https://github.com/open-telemetry/opentelemetry-python/pull/5177)) -- `opentelemetry-api`: Enforce W3C Baggage size limits on outbound propagation in `W3CBaggagePropagator.inject()`. Previously only inbound extraction enforced limits; now inject also caps entries at 180, individual pairs at 4096 bytes, and total header at 8192 bytes per the W3C Baggage spec. The extract path max_pairs limit now counts all size-valid entries rather than only successfully parsed ones. - ([#5163](https://github.com/open-telemetry/opentelemetry-python/pull/5163)) -- `opentelemetry-sdk`: add `additional_properties` support to generated config models via custom `datamodel-codegen` template, enabling plugin/custom component names to flow through typed dataclasses - ([#5131](https://github.com/open-telemetry/opentelemetry-python/pull/5131)) -- Fix incorrect code example in `create_tracer()` docstring - ([#5072](https://github.com/open-telemetry/opentelemetry-python/issues/5072)) -- `opentelemetry-sdk`: add `load_entry_point` shared utility to declarative file configuration for loading plugins via entry points; refactor propagator loading to use it - ([#5093](https://github.com/open-telemetry/opentelemetry-python/pull/5093)) -- `opentelemetry-sdk`: fix YAML structure injection via environment variable substitution in declarative file configuration; values containing newlines are now emitted as quoted YAML scalars per spec requirement - ([#5091](https://github.com/open-telemetry/opentelemetry-python/pull/5091)) -- `opentelemetry-sdk`: Add `create_logger_provider`/`configure_logger_provider` to declarative file configuration, enabling LoggerProvider instantiation from config files without reading env vars - ([#4990](https://github.com/open-telemetry/opentelemetry-python/pull/4990)) -- `opentelemetry-sdk`: Add `service` resource detector support to declarative file configuration via `detection_development.detectors[].service` - ([#5003](https://github.com/open-telemetry/opentelemetry-python/pull/5003)) -- logs: add exception support to Logger emit and LogRecord attributes - ([#4907](https://github.com/open-telemetry/opentelemetry-python/issues/4907)) -- Drop Python 3.9 support - ([#5076](https://github.com/open-telemetry/opentelemetry-python/pull/5076)) -- `opentelemetry-semantic-conventions`: use `X | Y` union annotation - ([#5096](https://github.com/open-telemetry/opentelemetry-python/pull/5096)) -- `opentelemetry-sdk`: Fix `ProcessResourceDetector` to use `sys.orig_argv` so that `process.command`, `process.command_line`, and `process.command_args` reflect the original invocation for `python -m ` runs (where `sys.argv[0]` is rewritten to the module path) - ([#5083](https://github.com/open-telemetry/opentelemetry-python/pull/5083)) -- `opentelemetry-sdk`: make resource detector ordering deterministic - ([#5120](https://github.com/open-telemetry/opentelemetry-python/pull/5120)) -- Add WeaverLiveCheck test util - ([#5088](https://github.com/open-telemetry/opentelemetry-python/pull/5088)) -- Fix incorrect type annotation on `detectors` parameter of `get_aggregated_resources` - ([#5135](https://github.com/open-telemetry/opentelemetry-python/pull/5135)) -- ci: wait for tracecontext server readiness instead of a fixed sleep in `scripts/tracecontext-integration-test.sh` - ([#5149](https://github.com/open-telemetry/opentelemetry-python/pull/5149)) -- `opentelemetry-api`: update `EnvironmentGetter` and `EnvironmentSetter` to use normalized environment variable names - ([#5119](https://github.com/open-telemetry/opentelemetry-python/pull/5119)) -- `opentelemetry-sdk`: only load entrypoints for resource detectors if they are configured via `OTEL_EXPERIMENTAL_RESOURCE_DETECTORS` - ([#5145](https://github.com/open-telemetry/opentelemetry-python/pull/5145)) -- `opentelemetry-exporter-otlp-json-common`: add 'opentelemetry-exporter-otlp-json-common' package for OTLP JSON exporters - ([#4996](https://github.com/open-telemetry/opentelemetry-python/pull/4996)) -- `opentelemetry-exporter-otlp-proto-grpc`: make retryable gRPC error codes configurable for gRPC exporters - ([#4917](https://github.com/open-telemetry/opentelemetry-python/pull/4917)) -- `opentelemetry-sdk`: add sampler plugin loading to declarative file configuration via the `opentelemetry_sampler` entry point group, matching the spec's PluginComponentProvider mechanism - ([#5095](https://github.com/open-telemetry/opentelemetry-python/pull/5095)) -- Add `registry` keyword argument to `PrometheusMetricReader` to allow passing a custom Prometheus registry - ([#5055](https://github.com/open-telemetry/opentelemetry-python/pull/5055)) -- Add ability to selectively enable exporting of SDK internal metrics with the `OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED` environment variable. - ([#5151](https://github.com/open-telemetry/opentelemetry-python/pull/5151)) -======= ->>>>>>> upstream/main ## Version 1.41.0/0.62b0 (2026-04-09) From 63df175772f19fb7f647fdd795e510d64fa3a783 Mon Sep 17 00:00:00 2001 From: Shiva Shankar Dhamodharan Date: Tue, 19 May 2026 17:09:24 +0530 Subject: [PATCH 16/16] Add fork-safety to SynchronousMeasurementConsumer change log --- .changelog/4767.fixed | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changelog/4767.fixed diff --git a/.changelog/4767.fixed b/.changelog/4767.fixed new file mode 100644 index 00000000000..440d4c90080 --- /dev/null +++ b/.changelog/4767.fixed @@ -0,0 +1 @@ +`opentelemetry-sdk`: Add fork-safety to metrics `SynchronousMeasurementConsumer` by registering a post-fork child hook, lazily reinitializing metric reader storages on first use in the child process, and clearing asynchronous instruments to avoid duplicated state after `fork()`