Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4175](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4175))
- `opentelemetry-docker-tests` Fix docker-tests assumption by Postgres-Sqlalchemy case about scope of metrics
([#4258](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4258))
- `opentelemetry-instrumentation-threading`: fix AttributeError when Thread is run without starting
([#4246](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/4246))

### Breaking changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ def __wrap_threading_run(
) -> R:
token = None
try:
token = context.attach(instance._otel_context)
if hasattr(instance, "_otel_context"):
token = context.attach(instance._otel_context)
return call_wrapped(*args, **kwargs)
finally:
if token is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import threading
from concurrent.futures import ( # pylint: disable=no-name-in-module; TODO #4199
Future,
ThreadPoolExecutor,
)
from typing import List
Expand Down Expand Up @@ -66,7 +69,7 @@ def test_trace_context_propagation_in_thread_pool_with_multiple_workers(
executor = ThreadPoolExecutor(max_workers=max_workers)

expected_span_contexts: List[trace.SpanContext] = []
futures_list = []
futures_list: List[Future[trace.SpanContext]] = []
for num in range(max_workers):
with self._tracer.start_as_current_span(f"trace_{num}") as span:
expected_span_context = span.get_span_context()
Expand Down Expand Up @@ -125,23 +128,23 @@ def fake_func(self):
def get_current_span_context_for_test() -> trace.SpanContext:
return trace.get_current_span().get_span_context()

def print_square(self, num):
def print_square(self, num: int | float) -> int | float:
with self._tracer.start_as_current_span("square"):
return num * num

def print_cube(self, num):
def print_cube(self, num: int | float) -> int | float:
with self._tracer.start_as_current_span("cube"):
return num * num * num

def print_square_with_thread(self, num):
def print_square_with_thread(self, num: int | float) -> int | float:
with self._tracer.start_as_current_span("square"):
cube_thread = threading.Thread(target=self.print_cube, args=(10,))

cube_thread.start()
cube_thread.join()
return num * num

def calculate(self, num):
def calculate(self, num: int | float) -> None:
with self._tracer.start_as_current_span("calculate"):
square_thread = threading.Thread(
target=self.print_square, args=(num,)
Expand Down Expand Up @@ -294,3 +297,48 @@ def test_threadpool_with_valid_context_token(self, mock_detach: MagicMock):
future = executor.submit(self.get_current_span_context_for_test)
future.result()
mock_detach.assert_called_once()

def test_threading_run_without_start(self):
square_thread = threading.Thread(target=self.print_square, args=(10,))
with self._tracer.start_as_current_span("root"):
square_thread.run()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 2)
root_span = next(span for span in spans if span.name == "root")
self.assertIsNotNone(root_span)
self.assertIsNone(root_span.parent)
square_span = next(span for span in spans if span.name == "square")
self.assertIsNotNone(square_span)
self.assertIs(square_span.parent, root_span.get_span_context())

def test_threading_run_with_custom_run(self):
_tracer = self._tracer

class ThreadWithCustomRun(threading.Thread):
def run(self):
# don't call super().run() on purpose
# Thread.run() cannot be called twice
with _tracer.start_as_current_span("square"):
pass

square_thread = ThreadWithCustomRun(
target=self.print_square, args=(10,)
)
with self._tracer.start_as_current_span("run_1"):
square_thread.run()
with self._tracer.start_as_current_span("run_2"):
square_thread.run()

spans = self.memory_exporter.get_finished_spans()
self.assertEqual(len(spans), 4)
run_1_span = next(span for span in spans if span.name == "run_1")
run_2_span = next(span for span in spans if span.name == "run_2")
square_spans = [span for span in spans if span.name == "square"]
square_spans.sort(key=lambda x: x.start_time or 0)
run_1_child_span = square_spans[0]
run_2_child_span = square_spans[1]
self.assertIs(run_1_child_span.parent, run_1_span.get_span_context())
self.assertIs(run_2_child_span.parent, run_2_span.get_span_context())
self.assertIsNone(run_1_span.parent)
self.assertIsNone(run_2_span.parent)