Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions ddtrace/profiling/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ def _task_get_name(task: "asyncio.Task[typing.Any]") -> str:
return "Task-%d" % id(task)


def _current_task_or_none() -> typing.Optional["asyncio.Task[typing.Any]"]:
try:
return globals()["current_task"]()
except RuntimeError:
return None


def _call_init_asyncio(asyncio: ModuleType) -> None:
from asyncio import tasks as asyncio_tasks

Expand All @@ -45,6 +52,27 @@ def _call_init_asyncio(asyncio: ModuleType) -> None:
stack.init_asyncio(scheduled_tasks, eager_tasks)


def _get_current_or_thread_loop(asyncio: ModuleType) -> typing.Optional["aio.AbstractEventLoop"]:
try:
return typing.cast("aio.AbstractEventLoop", asyncio.get_running_loop())
except RuntimeError:
pass

policy = asyncio.get_event_loop_policy()
local_state = getattr(policy, "_local", None)
if local_state is None:
return None

# AIDEV-NOTE: Profiling can start after asyncio.set_event_loop(loop) but before the
# loop is running. Reading the policy's thread-local `_loop` recovers that already-set
# loop without calling get_event_loop(), which would create a brand-new loop.
current_loop = getattr(local_state, "_loop", None)
if current_loop is None or current_loop.is_closed():
return None

return typing.cast("aio.AbstractEventLoop", current_loop)


def link_existing_loop_to_current_thread() -> None:
global ASYNCIO_IMPORTED

Expand All @@ -55,16 +83,11 @@ def link_existing_loop_to_current_thread() -> None:

import asyncio

# Only track if there's actually a running loop
running_loop: typing.Optional["asyncio.AbstractEventLoop"] = None
try:
running_loop = asyncio.get_running_loop()
except RuntimeError:
# No existing loop to track, nothing to do
current_loop = _get_current_or_thread_loop(asyncio)
if current_loop is None:
return

# We have a running loop, track it
stack.track_asyncio_loop(typing.cast(int, ddtrace_threading.current_thread().ident), running_loop)
stack.track_asyncio_loop(typing.cast(int, ddtrace_threading.current_thread().ident), current_loop)
_call_init_asyncio(asyncio)


Expand Down Expand Up @@ -121,9 +144,10 @@ def _(f: typing.Callable[..., None], args: tuple[typing.Any, ...], kwargs: dict[
children = get_argument_value(args, kwargs, 1, "children")
assert children is not None # nosec: assert is used for typing

parent = globals()["current_task"]()
for child in children:
stack.link_tasks(parent, child)
parent = _current_task_or_none()
if parent is not None:
for child in children:
stack.link_tasks(parent, child)

@partial(wrap, sys.modules["asyncio"].tasks._wait)
def _(
Expand All @@ -136,9 +160,10 @@ def _(
finally:
futures = typing.cast(set["aio.Future[typing.Any]"], get_argument_value(args, kwargs, 0, "fs"))

parent = typing.cast("aio.Task[typing.Any]", globals()["current_task"]())
for future in futures:
stack.link_tasks(parent, future)
parent = _current_task_or_none()
if parent is not None:
for future in futures:
stack.link_tasks(parent, future)

@partial(wrap, sys.modules["asyncio"].tasks.as_completed)
def _(
Expand All @@ -147,7 +172,7 @@ def _(
kwargs: dict[str, typing.Any],
) -> typing.Any:
loop = typing.cast(typing.Optional["aio.AbstractEventLoop"], kwargs.get("loop"))
parent: typing.Optional["aio.Task[typing.Any]"] = globals()["current_task"]()
parent = _current_task_or_none()

if parent is not None:
fs = typing.cast(typing.Iterable["aio.Future[typing.Any]"], get_argument_value(args, kwargs, 0, "fs"))
Expand Down Expand Up @@ -178,7 +203,7 @@ def _(
awaitable = typing.cast("aio.Future[typing.Any]", get_argument_value(args, kwargs, 0, "arg"))
future = asyncio.ensure_future(awaitable, loop=loop)

parent = globals()["current_task"]()
parent = _current_task_or_none()
if parent is not None:
stack.link_tasks(parent, future)

Expand Down Expand Up @@ -207,13 +232,30 @@ def _(
) -> typing.Any:
result = f(*args, **kwargs)

parent = globals()["current_task"]()
parent = _current_task_or_none()
if parent is not None and result is not None:
# Link parent task to the task created by TaskGroup
stack.link_tasks(parent, result)

return result

# AIDEV-NOTE: `loop.create_task()` is valid before the loop starts, unlike
# `asyncio.create_task()`. Use the safe helper so direct loop scheduling
# does not raise `RuntimeError: no running event loop` when we probe parentage.
@partial(wrap, sys.modules["asyncio"].base_events.BaseEventLoop.create_task)
def _(
f: typing.Callable[..., "aio.Task[typing.Any]"],
args: tuple[typing.Any, ...],
kwargs: dict[str, typing.Any],
) -> "aio.Task[typing.Any]":
task: "aio.Task[typing.Any]" = f(*args, **kwargs)
parent = _current_task_or_none()

if parent is not None:
stack.weak_link_tasks(parent, task)

return task

# Note: asyncio.timeout and asyncio.timeout_at don't create child tasks.
# They are context managers that schedule a callback to cancel the current task
# if it times out. The timeout._task is the same as the current task, so there's
Expand All @@ -227,7 +269,7 @@ def _(
) -> "aio.Task[typing.Any]":
# kwargs will typically contain context (Python 3.11+ only) and eager_start (Python 3.14+ only)
task: "aio.Task[typing.Any]" = f(*args, **kwargs)
parent: typing.Optional["aio.Task[typing.Any]"] = globals()["current_task"]()
parent = _current_task_or_none()

if parent is not None:
stack.weak_link_tasks(parent, task)
Expand Down
8 changes: 5 additions & 3 deletions ddtrace/profiling/_gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,11 @@ def _untrack_greenlet_by_id(greenlet_id: int) -> None:
_tracked_greenlets.discard(greenlet_id)
_parent_greenlet_count.pop(greenlet_id, None)
if (parent_id := _greenlet_parent_map.pop(greenlet_id, None)) is not None:
_parent_greenlet_count[parent_id] -= 1
if _parent_greenlet_count[parent_id] <= 0:
del _parent_greenlet_count[parent_id]
remaining = _parent_greenlet_count.get(parent_id, 0) - 1
if remaining <= 0:
_parent_greenlet_count.pop(parent_id, None)
else:
_parent_greenlet_count[parent_id] = remaining


def untrack_greenlet(gl: _Greenlet) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
profiling: A ``KeyError`` that could occur when using ``gevent.Timeout`` has
been fixed.
2 changes: 0 additions & 2 deletions tests/profiling/collector/test_asyncio_import_order.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,6 @@ async def main_task():
)


@pytest.mark.xfail(reason="No way to get the current loop if it is set but not running.")
@pytest.mark.subprocess(
env=dict(
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_start_profiler_from_process_after_creating_loop",
Expand Down Expand Up @@ -378,7 +377,6 @@ async def main_task():
)


@pytest.mark.xfail(reason="This test fails because there's no way to get the current loop if it's not already running.")
@pytest.mark.subprocess(
env=dict(
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_import_profiler_from_process_after_starting_loop",
Expand Down
Loading
Loading