From 4495d25bc76d41877d9d20e2d144da4c14bafdb5 Mon Sep 17 00:00:00 2001 From: Masen Furer Date: Thu, 21 Aug 2025 12:05:01 -0700 Subject: [PATCH] Provide descriptive names for asyncio.Task Reflex internally spawns async tasks for background event processing, emitting websocket data, lifespan, telementry, and resolving async computed vars. Now these tasks all have descriptive names that include the event being processed, the token, and a timestamp of when the task started. This extra information in the task name allows users to better identify where potentially problems in the app are hiding. --- reflex/app.py | 18 +++++++++++++----- reflex/app_mixins/lifespan.py | 10 ++++++++-- reflex/istate/manager.py | 3 ++- reflex/state.py | 6 +++++- reflex/utils/telemetry.py | 5 ++++- 5 files changed, 32 insertions(+), 10 deletions(-) diff --git a/reflex/app.py b/reflex/app.py index 7aba623a4c6..1b7a6da1993 100644 --- a/reflex/app.py +++ b/reflex/app.py @@ -12,6 +12,7 @@ import io import json import sys +import time import traceback import urllib.parse from collections.abc import ( @@ -1582,7 +1583,10 @@ async def _coro(): sid=state.router.session.session_id, ) - task = asyncio.create_task(_coro()) + task = asyncio.create_task( + _coro(), + name=f"reflex_background_task|{event.name}|{time.time()}|{event.token}", + ) self._background_tasks.add(task) # Clean up task from background_tasks set when complete. task.add_done_callback(self._background_tasks.discard) @@ -1727,7 +1731,8 @@ async def process( "reload", data=event, to=sid, - ) + ), + name=f"reflex_emit_reload|{event.name}|{time.time()}|{event.token}", ) return # re-assign only when the value is different @@ -2028,7 +2033,8 @@ def on_disconnect(self, sid: str): if disconnect_token: # Use async cleanup through token manager task = asyncio.create_task( - self._token_manager.disconnect_token(disconnect_token, sid) + self._token_manager.disconnect_token(disconnect_token, sid), + name=f"reflex_disconnect_token|{disconnect_token}|{time.time()}", ) # Don't await to avoid blocking disconnect, but handle potential errors task.add_done_callback( @@ -2047,12 +2053,14 @@ async def emit_update(self, update: StateUpdate, sid: str) -> None: # If the sid is None, we are not connected to a client. Prevent sending # updates to all clients. return - if sid not in self.sid_to_token: + token = self.sid_to_token.get(sid) + if token is None: console.warn(f"Attempting to send delta to disconnected websocket {sid}") return # Creating a task prevents the update from being blocked behind other coroutines. await asyncio.create_task( - self.emit(str(constants.SocketEvent.EVENT), update, to=sid) + self.emit(str(constants.SocketEvent.EVENT), update, to=sid), + name=f"reflex_emit_event|{token}|{sid}|{time.time()}", ) async def on_event(self, sid: str, data: Any): diff --git a/reflex/app_mixins/lifespan.py b/reflex/app_mixins/lifespan.py index 44e3a884280..6079fdd4d8d 100644 --- a/reflex/app_mixins/lifespan.py +++ b/reflex/app_mixins/lifespan.py @@ -7,6 +7,7 @@ import dataclasses import functools import inspect +import time from collections.abc import Callable, Coroutine from starlette.applications import Starlette @@ -36,6 +37,7 @@ async def _run_lifespan_tasks(self, app: Starlette): if isinstance(task, asyncio.Task): running_tasks.append(task) else: + task_name = task.__name__ signature = inspect.signature(task) if "app" in signature.parameters: task = functools.partial(task, app=app) @@ -44,7 +46,10 @@ async def _run_lifespan_tasks(self, app: Starlette): await stack.enter_async_context(_t) console.debug(run_msg.format(type="asynccontextmanager")) elif isinstance(_t, Coroutine): - task_ = asyncio.create_task(_t) + task_ = asyncio.create_task( + _t, + name=f"reflex_lifespan_task|{task_name}|{time.time()}", + ) task_.add_done_callback(lambda t: t.result()) running_tasks.append(task_) console.debug(run_msg.format(type="coroutine")) @@ -70,9 +75,10 @@ def register_lifespan_task(self, task: Callable | asyncio.Task, **task_kwargs): msg = f"Task {task.__name__} of type generator must be decorated with contextlib.asynccontextmanager." raise InvalidLifespanTaskTypeError(msg) + task_name = task.__name__ # pyright: ignore [reportAttributeAccessIssue] if task_kwargs: original_task = task task = functools.partial(task, **task_kwargs) # pyright: ignore [reportArgumentType] functools.update_wrapper(task, original_task) # pyright: ignore [reportArgumentType] self.lifespan_tasks.add(task) - console.debug(f"Registered lifespan task: {task.__name__}") # pyright: ignore [reportAttributeAccessIssue] + console.debug(f"Registered lifespan task: {task_name}") diff --git a/reflex/istate/manager.py b/reflex/istate/manager.py index fac0f64586b..e4a0c15ef5b 100644 --- a/reflex/istate/manager.py +++ b/reflex/istate/manager.py @@ -667,7 +667,8 @@ async def set_state( _substate_key(client_token, substate), substate, lock_id, - ) + ), + name=f"reflex_set_state|{client_token}|{substate.get_full_name()}", ) for substate in state.substates.values() ] diff --git a/reflex/state.py b/reflex/state.py index 4240ee010c2..908ac30a241 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -11,6 +11,7 @@ import inspect import pickle import sys +import time import typing import warnings from collections.abc import AsyncIterator, Callable, Sequence @@ -284,7 +285,10 @@ async def _resolve_delta(delta: Delta) -> Delta: for state_name, state_delta in delta.items(): for var_name, value in state_delta.items(): if asyncio.iscoroutine(value): - tasks[state_name, var_name] = asyncio.create_task(value) + tasks[state_name, var_name] = asyncio.create_task( + value, + name=f"reflex_resolve_delta|{state_name}|{var_name}|{time.time()}", + ) for (state_name, var_name), task in tasks.items(): delta[state_name][var_name] = await task return delta diff --git a/reflex/utils/telemetry.py b/reflex/utils/telemetry.py index ef00a21b24b..e1f01c1a95e 100644 --- a/reflex/utils/telemetry.py +++ b/reflex/utils/telemetry.py @@ -353,7 +353,10 @@ async def async_send(event: str, telemetry_enabled: bool | None, **kwargs): try: # Within an event loop context, send the event asynchronously. - task = asyncio.create_task(async_send(event, telemetry_enabled, **kwargs)) + task = asyncio.create_task( + async_send(event, telemetry_enabled, **kwargs), + name=f"reflex_send_telemetry_event|{event}", + ) background_tasks.add(task) task.add_done_callback(background_tasks.discard) except RuntimeError: