Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changelog/5298.changed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
opentelemetry-sdk: reduce lock contention in attributes
56 changes: 39 additions & 17 deletions opentelemetry-api/src/opentelemetry/attributes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,29 +277,49 @@ def __getitem__(self, key: str) -> types.AnyValue:
def __setitem__(self, key: str, value: types.AnyValue) -> None:
if getattr(self, "_immutable", False): # type: ignore
raise TypeError
with self._lock:
if self.maxlen is not None and self.maxlen == 0:
if self.maxlen is not None and self.maxlen == 0:
Comment thread
lzchen marked this conversation as resolved.
with self._lock:
self.dropped += 1
return
if self._extended_attributes:
value = _clean_extended_attribute(key, value, self.max_value_len)
else:
value = _clean_attribute(key, value, self.max_value_len) # type: ignore
if value is None:
return
with self._lock:
self._setitem_locked(key, value)

def _set_items(self, attributes: "types._ExtendedAttributes") -> None:
if getattr(self, "_immutable", False): # type: ignore
raise TypeError
if self.maxlen is not None and self.maxlen == 0:
with self._lock:
self.dropped += len(attributes)
return
cleaned = []
for key, value in attributes.items():
if self._extended_attributes:
value = _clean_extended_attribute(
key, value, self.max_value_len
)
cv = _clean_extended_attribute(key, value, self.max_value_len)
else:
value = _clean_attribute(key, value, self.max_value_len) # type: ignore
if value is None:
return

if key in self._dict:
del self._dict[key]
elif self.maxlen is not None and len(self._dict) == self.maxlen:
if not isinstance(self._dict, OrderedDict):
self._dict = OrderedDict(self._dict)
self._dict.popitem(last=False) # type: ignore
self.dropped += 1
cv = _clean_attribute(key, value, self.max_value_len) # type: ignore
if cv is None:
continue
cleaned.append((key, cv))
with self._lock:
for key, cv in cleaned:
self._setitem_locked(key, cv)

def _setitem_locked(self, key: str, value: types.AnyValue) -> None:
if key in self._dict:
del self._dict[key]
elif self.maxlen is not None and len(self._dict) == self.maxlen:
if not isinstance(self._dict, OrderedDict):
self._dict = OrderedDict(self._dict)
self._dict.popitem(last=False) # type: ignore
self.dropped += 1

self._dict[key] = value # type: ignore
self._dict[key] = value # type: ignore

def __delitem__(self, key: str) -> None:
if getattr(self, "_immutable", False): # type: ignore
Expand All @@ -308,6 +328,8 @@ def __delitem__(self, key: str) -> None:
del self._dict[key]

def __iter__(self):
if self._immutable:
return iter(self._dict)
with self._lock:
return iter(list(self._dict))

Expand Down
60 changes: 60 additions & 0 deletions opentelemetry-sdk/benchmarks/trace/test_benchmark_trace.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

import threading
import tracemalloc
from functools import lru_cache

Expand All @@ -17,6 +18,11 @@
_TracerConfig,
sampling,
)
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
from opentelemetry.sdk.util import BoundedList
from opentelemetry.sdk.util.instrumentation import _scope_name_matches_glob
from opentelemetry.trace import SpanContext, TraceFlags

Expand Down Expand Up @@ -239,3 +245,57 @@ def benchmark_iter():
list(attrs)

benchmark(benchmark_iter)


@pytest.mark.parametrize("num_items", [1, 10, 50, 128])
def test_bounded_list_iterator(benchmark, num_items):
blist = BoundedList.from_seq(num_items, range(num_items))

peaks = []
for _ in range(200):
tracemalloc.start()
list(blist)
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
peaks.append(peak)
benchmark.extra_info["mean_alloc_bytes"] = sum(peaks) / len(peaks)

def benchmark_iter():
list(blist)

benchmark(benchmark_iter)


# Total span work is fixed at TOTAL_SPANS regardless of thread count so that
# ideal (GIL-free) parallelism would produce a flat wall-clock time across
# thread counts. Any increase instead reveals GIL + lock contention.
_TOTAL_SPANS = 128
_ATTRS_PER_SPAN = {f"key{i}": f"value{i}" for i in range(50)}


@pytest.mark.parametrize("num_threads", [1, 2, 4, 8])
def test_gil_contention_batch_processor(benchmark, num_threads):
exporter = InMemorySpanExporter()
provider = TracerProvider(sampler=sampling.DEFAULT_ON)
# max_export_batch_size=16 ensures the export threshold is crossed
# during the benchmark so _worker_awaken.set() contention is exercised.
provider.add_span_processor(
BatchSpanProcessor(exporter, max_export_batch_size=16)
)
tracer = provider.get_tracer("bench")
spans_per_thread = _TOTAL_SPANS // num_threads

def worker():
for _ in range(spans_per_thread):
span = tracer.start_span("s")
span.set_attributes(_ATTRS_PER_SPAN)
span.end()

def benchmark_fn():
threads = [threading.Thread(target=worker) for _ in range(num_threads)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

benchmark(benchmark_fn)
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,10 @@ def emit(self, data: Telemetry) -> None:
self._metrics.drop_items(1)
# This will drop a log from the right side if the queue is at _max_queue_size.
self._queue.appendleft(data)
if len(self._queue) >= self._max_export_batch_size:
if (
len(self._queue) >= self._max_export_batch_size
and not self._worker_awaken.is_set()
):
self._worker_awaken.set()

def shutdown(self, timeout_millis: int = 30000):
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,8 +891,7 @@ def set_attributes(
logger.warning("Setting attribute on ended span.")
return

for key, value in attributes.items():
self._attributes[key] = value
self._attributes._set_items(attributes) # pylint: disable=protected-access

def set_attribute(self, key: str, value: types.AttributeValue) -> None:
with self._lock:
Expand Down Expand Up @@ -990,6 +989,7 @@ def end(self, end_time: int | None = None) -> None:
return

self._end_time = end_time if end_time is not None else time_ns()
self._attributes._immutable = True # pylint: disable=protected-access

if self._record_end_metrics:
self._record_end_metrics()
Expand Down
Loading