diff --git a/.changelog/5298.changed b/.changelog/5298.changed new file mode 100644 index 00000000000..ee6c7821d21 --- /dev/null +++ b/.changelog/5298.changed @@ -0,0 +1 @@ +opentelemetry-sdk: reduce lock contention in attributes \ No newline at end of file diff --git a/opentelemetry-api/src/opentelemetry/attributes/__init__.py b/opentelemetry-api/src/opentelemetry/attributes/__init__.py index 86e899d7516..15519ce0c2c 100644 --- a/opentelemetry-api/src/opentelemetry/attributes/__init__.py +++ b/opentelemetry-api/src/opentelemetry/attributes/__init__.py @@ -277,29 +277,49 @@ def __getitem__(self, key: str) -> types.AnyValue: def __setitem__(self, key: str, value: types.AnyValue) -> None: if getattr(self, "_immutable", False): # type: ignore raise TypeError - with self._lock: - if self.maxlen is not None and self.maxlen == 0: + if self.maxlen is not None and self.maxlen == 0: + with self._lock: self.dropped += 1 + return + if self._extended_attributes: + value = _clean_extended_attribute(key, value, self.max_value_len) + else: + value = _clean_attribute(key, value, self.max_value_len) # type: ignore + if value is None: return + with self._lock: + self._setitem_locked(key, value) + def _set_items(self, attributes: "types._ExtendedAttributes") -> None: + if getattr(self, "_immutable", False): # type: ignore + raise TypeError + if self.maxlen is not None and self.maxlen == 0: + with self._lock: + self.dropped += len(attributes) + return + cleaned = [] + for key, value in attributes.items(): if self._extended_attributes: - value = _clean_extended_attribute( - key, value, self.max_value_len - ) + cv = _clean_extended_attribute(key, value, self.max_value_len) else: - value = _clean_attribute(key, value, self.max_value_len) # type: ignore - if value is None: - return - - if key in self._dict: - del self._dict[key] - elif self.maxlen is not None and len(self._dict) == self.maxlen: - if not isinstance(self._dict, OrderedDict): - self._dict = OrderedDict(self._dict) - self._dict.popitem(last=False) # type: ignore - self.dropped += 1 + cv = _clean_attribute(key, value, self.max_value_len) # type: ignore + if cv is None: + continue + cleaned.append((key, cv)) + with self._lock: + for key, cv in cleaned: + self._setitem_locked(key, cv) + + def _setitem_locked(self, key: str, value: types.AnyValue) -> None: + if key in self._dict: + del self._dict[key] + elif self.maxlen is not None and len(self._dict) == self.maxlen: + if not isinstance(self._dict, OrderedDict): + self._dict = OrderedDict(self._dict) + self._dict.popitem(last=False) # type: ignore + self.dropped += 1 - self._dict[key] = value # type: ignore + self._dict[key] = value # type: ignore def __delitem__(self, key: str) -> None: if getattr(self, "_immutable", False): # type: ignore @@ -308,6 +328,8 @@ def __delitem__(self, key: str) -> None: del self._dict[key] def __iter__(self): + if self._immutable: + return iter(self._dict) with self._lock: return iter(list(self._dict)) diff --git a/opentelemetry-sdk/benchmarks/trace/test_benchmark_trace.py b/opentelemetry-sdk/benchmarks/trace/test_benchmark_trace.py index 137314d3b0f..bac15bcc79a 100644 --- a/opentelemetry-sdk/benchmarks/trace/test_benchmark_trace.py +++ b/opentelemetry-sdk/benchmarks/trace/test_benchmark_trace.py @@ -1,6 +1,7 @@ # Copyright The OpenTelemetry Authors # SPDX-License-Identifier: Apache-2.0 +import threading import tracemalloc from functools import lru_cache @@ -17,6 +18,11 @@ _TracerConfig, sampling, ) +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) +from opentelemetry.sdk.util import BoundedList from opentelemetry.sdk.util.instrumentation import _scope_name_matches_glob from opentelemetry.trace import SpanContext, TraceFlags @@ -239,3 +245,57 @@ def benchmark_iter(): list(attrs) benchmark(benchmark_iter) + + +@pytest.mark.parametrize("num_items", [1, 10, 50, 128]) +def test_bounded_list_iterator(benchmark, num_items): + blist = BoundedList.from_seq(num_items, range(num_items)) + + peaks = [] + for _ in range(200): + tracemalloc.start() + list(blist) + _, peak = tracemalloc.get_traced_memory() + tracemalloc.stop() + peaks.append(peak) + benchmark.extra_info["mean_alloc_bytes"] = sum(peaks) / len(peaks) + + def benchmark_iter(): + list(blist) + + benchmark(benchmark_iter) + + +# Total span work is fixed at TOTAL_SPANS regardless of thread count so that +# ideal (GIL-free) parallelism would produce a flat wall-clock time across +# thread counts. Any increase instead reveals GIL + lock contention. +_TOTAL_SPANS = 128 +_ATTRS_PER_SPAN = {f"key{i}": f"value{i}" for i in range(50)} + + +@pytest.mark.parametrize("num_threads", [1, 2, 4, 8]) +def test_gil_contention_batch_processor(benchmark, num_threads): + exporter = InMemorySpanExporter() + provider = TracerProvider(sampler=sampling.DEFAULT_ON) + # max_export_batch_size=16 ensures the export threshold is crossed + # during the benchmark so _worker_awaken.set() contention is exercised. + provider.add_span_processor( + BatchSpanProcessor(exporter, max_export_batch_size=16) + ) + tracer = provider.get_tracer("bench") + spans_per_thread = _TOTAL_SPANS // num_threads + + def worker(): + for _ in range(spans_per_thread): + span = tracer.start_span("s") + span.set_attributes(_ATTRS_PER_SPAN) + span.end() + + def benchmark_fn(): + threads = [threading.Thread(target=worker) for _ in range(num_threads)] + for thread in threads: + thread.start() + for thread in threads: + thread.join() + + benchmark(benchmark_fn) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py index bf816e287e5..3e2b8a263a4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_shared_internal/__init__.py @@ -206,7 +206,10 @@ def emit(self, data: Telemetry) -> None: self._metrics.drop_items(1) # This will drop a log from the right side if the queue is at _max_queue_size. self._queue.appendleft(data) - if len(self._queue) >= self._max_export_batch_size: + if ( + len(self._queue) >= self._max_export_batch_size + and not self._worker_awaken.is_set() + ): self._worker_awaken.set() def shutdown(self, timeout_millis: int = 30000): diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index e3e42e28bec..d292fed8e55 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -891,8 +891,7 @@ def set_attributes( logger.warning("Setting attribute on ended span.") return - for key, value in attributes.items(): - self._attributes[key] = value + self._attributes._set_items(attributes) # pylint: disable=protected-access def set_attribute(self, key: str, value: types.AttributeValue) -> None: with self._lock: @@ -990,6 +989,7 @@ def end(self, end_time: int | None = None) -> None: return self._end_time = end_time if end_time is not None else time_ns() + self._attributes._immutable = True # pylint: disable=protected-access if self._record_end_metrics: self._record_end_metrics()