Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
c36d47e
feat(metrics): Add fork-safety to SynchronousMeasurementConsumer
dshivashankar1994 Oct 9, 2025
039942b
Merge branch 'main' into reinit_metrics_on_fork
dshivashankar1994 Nov 4, 2025
4741be5
Enhance tests for SynchronousMeasurementConsumer
dshivashankar1994 Mar 18, 2026
491a8e6
Formatting changes
dshivashankar1994 Mar 19, 2026
20bacea
Merge branch 'main' into reinit_metrics_on_fork
dshivashankar1994 Mar 31, 2026
5098c53
Update measurement_consumer.py
dshivashankar1994 Apr 7, 2026
8b0b0ea
Update metric_reader_storage.py
dshivashankar1994 Apr 7, 2026
2050a51
Clean up pylint disables in test_measurement_consumer
dshivashankar1994 Apr 7, 2026
0cd70c0
Update CHANGELOG
dshivashankar1994 Apr 9, 2026
a1743b6
Apply suggestions from code review
dshivashankar1994 Apr 9, 2026
719b959
Merge branch 'main' into reinit_metrics_on_fork
dshivashankar1994 Apr 9, 2026
ee2bc78
Merge branch 'main' into reinit_metrics_on_fork
MikeGoldsmith Apr 13, 2026
ad480b8
Merge branch 'main' into reinit_metrics_on_fork
dshivashankar1994 Apr 18, 2026
96a86da
Change lock type to RLock in metric_reader_storage
dshivashankar1994 Apr 18, 2026
7241878
Fix lint errors
dshivashankar1994 Apr 18, 2026
55af869
Remove unintentional changes
dshivashankar1994 Apr 18, 2026
1265da2
Apply suggestions from code review
dshivashankar1994 Apr 18, 2026
22b00cc
Merge branch 'main' into reinit_metrics_on_fork
MikeGoldsmith Apr 21, 2026
8da6929
Merge branch 'main' into reinit_metrics_on_fork
dshivashankar1994 May 10, 2026
9243034
Merge branch 'main' into reinit_metrics_on_fork
dshivashankar1994 May 11, 2026
ce9c6ec
Merge branch 'main' into reinit_metrics_on_fork
MikeGoldsmith May 11, 2026
72464d4
fix(metrics): use weakref in fork handler to allow GC after provider…
dshivashankar1994 May 11, 2026
3333432
Merge remote-tracking branch 'upstream/main' into reinit_metrics_on_fork
dshivashankar1994 May 19, 2026
188f78a
Fix indent issue and lint error
dshivashankar1994 May 19, 2026
d28b500
Update CHANGELOG
dshivashankar1994 May 19, 2026
63df175
Add fork-safety to SynchronousMeasurementConsumer change log
dshivashankar1994 May 19, 2026
99ed406
Merge branch 'main' into reinit_metrics_on_fork
dshivashankar1994 Jun 3, 2026
a45e4ec
Merge branch 'main' into reinit_metrics_on_fork
xrmx Jun 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/4767.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
`opentelemetry-sdk`: Add fork-safety to metrics `SynchronousMeasurementConsumer` by registering a post-fork child hook, lazily reinitializing metric reader storages on first use in the child process, and clearing asynchronous instruments to avoid duplicated state after `fork()`
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# pylint: disable=unused-import

import os
import weakref
from abc import ABC, abstractmethod
from collections.abc import Iterable, Mapping
Expand Down Expand Up @@ -65,8 +66,30 @@ def __init__(
self._async_instruments: list[
opentelemetry.sdk.metrics._internal.instrument._Asynchronous
] = []
self._needs_storage_reinit = False
if hasattr(os, "register_at_fork"):
weak_self = weakref.ref(self)

def _fork_handler():
obj = weak_self()
if obj is not None:
obj._at_fork_reinit() # pylint: disable=protected-access

os.register_at_fork(after_in_child=_fork_handler)

def _at_fork_reinit(self):
"""Reinitialize lock in child process after fork"""
self._lock = Lock()
# Lazy reinitialization of storages on first use post fork. This is
# done to avoid the overhead of reinitializing the storages on
# every fork.
self._needs_storage_reinit = True
self._async_instruments.clear()

def consume_measurement(self, measurement: Measurement) -> None:
if self._needs_storage_reinit:
self._reinit_storages()

should_sample_exemplar = (
self._sdk_config.exemplar_filter.should_sample(
measurement.value,
Expand Down Expand Up @@ -96,6 +119,9 @@ def collect(
metric_reader: "opentelemetry.sdk.metrics.export.MetricReader",
timeout_millis: float = 10_000,
) -> MetricsData | None:
if self._needs_storage_reinit:
self._reinit_storages()

with self._lock:
metric_reader_storage = self._reader_storages[metric_reader]
# for now, just use the defaults
Expand Down Expand Up @@ -135,6 +161,17 @@ def collect(

return result

def _reinit_storages(self):
# Reinitialize the storages after a fork to avoid duplicate data points.
# The flag is cleared inside the lock so concurrent callers only run
# reinit once.
with self._lock:
if not self._needs_storage_reinit:
return
for storage in self._reader_storages.values():
storage._at_fork_reinit() # pylint: disable=protected-access
self._needs_storage_reinit = False

def add_metric_reader(
self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ def __init__(
self._instrument_class_temporality = instrument_class_temporality
self._instrument_class_aggregation = instrument_class_aggregation

def _at_fork_reinit(self) -> None:
"""Reinitialize locks and clear state in the child process after fork."""
self._lock = RLock()
self._instrument_view_instrument_matches.clear()

def _get_or_init_view_instrument_match(
self, instrument: _Instrument
) -> list[_ViewInstrumentMatch]:
Expand Down
Loading
Loading