diff --git a/reflex/app.py b/reflex/app.py index 7aba623a4c6..1b7a6da1993 100644 --- a/reflex/app.py +++ b/reflex/app.py @@ -12,6 +12,7 @@ import io import json import sys +import time import traceback import urllib.parse from collections.abc import ( @@ -1582,7 +1583,10 @@ async def _coro(): sid=state.router.session.session_id, ) - task = asyncio.create_task(_coro()) + task = asyncio.create_task( + _coro(), + name=f"reflex_background_task|{event.name}|{time.time()}|{event.token}", + ) self._background_tasks.add(task) # Clean up task from background_tasks set when complete. task.add_done_callback(self._background_tasks.discard) @@ -1727,7 +1731,8 @@ async def process( "reload", data=event, to=sid, - ) + ), + name=f"reflex_emit_reload|{event.name}|{time.time()}|{event.token}", ) return # re-assign only when the value is different @@ -2028,7 +2033,8 @@ def on_disconnect(self, sid: str): if disconnect_token: # Use async cleanup through token manager task = asyncio.create_task( - self._token_manager.disconnect_token(disconnect_token, sid) + self._token_manager.disconnect_token(disconnect_token, sid), + name=f"reflex_disconnect_token|{disconnect_token}|{time.time()}", ) # Don't await to avoid blocking disconnect, but handle potential errors task.add_done_callback( @@ -2047,12 +2053,14 @@ async def emit_update(self, update: StateUpdate, sid: str) -> None: # If the sid is None, we are not connected to a client. Prevent sending # updates to all clients. return - if sid not in self.sid_to_token: + token = self.sid_to_token.get(sid) + if token is None: console.warn(f"Attempting to send delta to disconnected websocket {sid}") return # Creating a task prevents the update from being blocked behind other coroutines. await asyncio.create_task( - self.emit(str(constants.SocketEvent.EVENT), update, to=sid) + self.emit(str(constants.SocketEvent.EVENT), update, to=sid), + name=f"reflex_emit_event|{token}|{sid}|{time.time()}", ) async def on_event(self, sid: str, data: Any): diff --git a/reflex/app_mixins/lifespan.py b/reflex/app_mixins/lifespan.py index 44e3a884280..6079fdd4d8d 100644 --- a/reflex/app_mixins/lifespan.py +++ b/reflex/app_mixins/lifespan.py @@ -7,6 +7,7 @@ import dataclasses import functools import inspect +import time from collections.abc import Callable, Coroutine from starlette.applications import Starlette @@ -36,6 +37,7 @@ async def _run_lifespan_tasks(self, app: Starlette): if isinstance(task, asyncio.Task): running_tasks.append(task) else: + task_name = task.__name__ signature = inspect.signature(task) if "app" in signature.parameters: task = functools.partial(task, app=app) @@ -44,7 +46,10 @@ async def _run_lifespan_tasks(self, app: Starlette): await stack.enter_async_context(_t) console.debug(run_msg.format(type="asynccontextmanager")) elif isinstance(_t, Coroutine): - task_ = asyncio.create_task(_t) + task_ = asyncio.create_task( + _t, + name=f"reflex_lifespan_task|{task_name}|{time.time()}", + ) task_.add_done_callback(lambda t: t.result()) running_tasks.append(task_) console.debug(run_msg.format(type="coroutine")) @@ -70,9 +75,10 @@ def register_lifespan_task(self, task: Callable | asyncio.Task, **task_kwargs): msg = f"Task {task.__name__} of type generator must be decorated with contextlib.asynccontextmanager." raise InvalidLifespanTaskTypeError(msg) + task_name = task.__name__ # pyright: ignore [reportAttributeAccessIssue] if task_kwargs: original_task = task task = functools.partial(task, **task_kwargs) # pyright: ignore [reportArgumentType] functools.update_wrapper(task, original_task) # pyright: ignore [reportArgumentType] self.lifespan_tasks.add(task) - console.debug(f"Registered lifespan task: {task.__name__}") # pyright: ignore [reportAttributeAccessIssue] + console.debug(f"Registered lifespan task: {task_name}") diff --git a/reflex/istate/manager.py b/reflex/istate/manager.py index fac0f64586b..e4a0c15ef5b 100644 --- a/reflex/istate/manager.py +++ b/reflex/istate/manager.py @@ -667,7 +667,8 @@ async def set_state( _substate_key(client_token, substate), substate, lock_id, - ) + ), + name=f"reflex_set_state|{client_token}|{substate.get_full_name()}", ) for substate in state.substates.values() ] diff --git a/reflex/state.py b/reflex/state.py index 4240ee010c2..908ac30a241 100644 --- a/reflex/state.py +++ b/reflex/state.py @@ -11,6 +11,7 @@ import inspect import pickle import sys +import time import typing import warnings from collections.abc import AsyncIterator, Callable, Sequence @@ -284,7 +285,10 @@ async def _resolve_delta(delta: Delta) -> Delta: for state_name, state_delta in delta.items(): for var_name, value in state_delta.items(): if asyncio.iscoroutine(value): - tasks[state_name, var_name] = asyncio.create_task(value) + tasks[state_name, var_name] = asyncio.create_task( + value, + name=f"reflex_resolve_delta|{state_name}|{var_name}|{time.time()}", + ) for (state_name, var_name), task in tasks.items(): delta[state_name][var_name] = await task return delta diff --git a/reflex/utils/telemetry.py b/reflex/utils/telemetry.py index ef00a21b24b..e1f01c1a95e 100644 --- a/reflex/utils/telemetry.py +++ b/reflex/utils/telemetry.py @@ -353,7 +353,10 @@ async def async_send(event: str, telemetry_enabled: bool | None, **kwargs): try: # Within an event loop context, send the event asynchronously. - task = asyncio.create_task(async_send(event, telemetry_enabled, **kwargs)) + task = asyncio.create_task( + async_send(event, telemetry_enabled, **kwargs), + name=f"reflex_send_telemetry_event|{event}", + ) background_tasks.add(task) task.add_done_callback(background_tasks.discard) except RuntimeError: