fix(core): _message_tasks set is accessed from multiple async contexts without a lock#535
fix(core): _message_tasks set is accessed from multiple async contexts without a lock#535chinesepowered wants to merge 8 commits intorocketride-org:developfrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThread-safe synchronization added to WebSocket message task handling. A Changes
Estimated code review effort🎯 2 (Simple) | ⏱️ ~12 minutes Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/client-python/src/rocketride/core/transport_websocket.py`:
- Around line 244-248: The done-callback mutates self._message_tasks without
acquiring self._message_tasks_lock, causing a race with disconnect(); fix by
replacing the lambda with a small coroutine-based remover: have the
task.add_done_callback call a synchronous wrapper that does
asyncio.create_task(self._remove_message_task(t)), and implement
_remove_message_task(self, t) as an async method that uses "async with
self._message_tasks_lock" to discard t from self._message_tasks; ensure
disconnect() still uses the same lock when clearing the set. This keeps lock
protection consistent for _message_tasks modifications.
- Around line 121-122: The done-callback that calls
self._message_tasks.discard(t) is synchronous and cannot use an asyncio.Lock;
replace self._message_tasks_lock = asyncio.Lock() with a threading.Lock (add
import threading) so the callback can synchronously acquire/release it and async
code can still use it synchronously; update all usages of _message_tasks_lock
(e.g., any "async with self._message_tasks_lock" or "await
self._message_tasks_lock.acquire()") to synchronous acquire()/release() blocks
(or use the lock as a context manager) — specifically adjust the disconnect()
logic and the done-callback surrounding self._message_tasks modifications to use
self._message_tasks_lock.acquire()/release() so mutations are protected from
both sync callbacks and async code.
- Around line 456-458: The block currently uses "async with
self._message_tasks_lock:" which only works for an async lock; change it to use
a synchronous context manager with the threading.Lock instance by replacing
"async with self._message_tasks_lock:" with "with self._message_tasks_lock:"
(ensure _message_tasks_lock is initialized as threading.Lock and imported), so
that self._message_tasks.clear() is executed while holding the threading lock;
update any initialization of _message_tasks_lock to use threading.Lock() and
adjust surrounding code if necessary to use a normal with-block rather than an
async context.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: e8b71132-0dec-466d-a543-fdc60704ff8c
📒 Files selected for processing (1)
packages/client-python/src/rocketride/core/transport_websocket.py
packages/client-python/src/rocketride/core/transport_websocket.py
Outdated
Show resolved
Hide resolved
packages/client-python/src/rocketride/core/transport_websocket.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/client-python/src/rocketride/core/transport_websocket.py`:
- Around line 462-464: In disconnect(), before clearing self._message_tasks,
iterate the set under self._message_tasks_lock to call cancel() on each Task (or
schedule cancellation), then copy the tasks out, clear the set, release the
lock, and await asyncio.gather(*copied_tasks, return_exceptions=True) to ensure
background tasks are cancelled and any exceptions swallowed; update references
in the disconnect implementation around _message_tasks/_message_tasks_lock and
ensure tasks that may call _transport_receive are cancelled before
self._websocket is set to None.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: dfd4b405-8177-4988-aba1-e234501823a3
📒 Files selected for processing (1)
packages/client-python/src/rocketride/core/transport_websocket.py
packages/client-python/src/rocketride/core/transport_websocket.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/client-python/src/rocketride/core/transport_websocket.py`:
- Around line 463-470: The disconnect flow currently calls
self._transport_disconnected() before flipping self._connected and
cancelling/awaiting in-flight message tasks, allowing _receive_data() /
super()._transport_receive() to interleave; move the transition self._connected
= False and the block that locks self._message_tasks_lock, cancels non-done
tasks from self._message_tasks, clears the list, and awaits them (using
asyncio.gather with return_exceptions=True) to occur before calling
self._transport_disconnected(), and add a regression test that asserts
on_disconnected() runs only after all _message_tasks have been cancelled and
completed (i.e., no interleaving from _receive_data()).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: b2a26719-2bfd-4c5d-b92d-c98365a95f48
📒 Files selected for processing (1)
packages/client-python/src/rocketride/core/transport_websocket.py
packages/client-python/src/rocketride/core/transport_websocket.py
Outdated
Show resolved
Hide resolved
This reverts commit f4149da.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@packages/client-python/src/rocketride/core/transport_websocket.py`:
- Around line 470-476: In the finally block of the TransportWebSocket cleanup
(the block that sets self._websocket and self._receive_task to None), remove the
redundant hasattr(self, '_message_tasks') check and always acquire
self._message_tasks_lock then clear self._message_tasks; __init__ already
initializes _message_tasks and _message_tasks_lock, so simply use "with
self._message_tasks_lock: self._message_tasks.clear()" to simplify the cleanup
logic.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 561a6b0c-dbf7-4dc4-82e1-f88c3627039f
📒 Files selected for processing (1)
packages/client-python/src/rocketride/core/transport_websocket.py
packages/client-python/src/rocketride/core/transport_websocket.py
Outdated
Show resolved
Hide resolved
asclearuc
left a comment
There was a problem hiding this comment.
Thank you for your contribution and for digging into the WebSocket disconnect flow!
After review, we won't be merging this PR in its current form. The core concern is that _message_tasks is not actually subject to a data race in standard asyncio. Python's event loop is single-threaded and cooperative — coroutines only yield at await points, so there is no concurrent mutation of the set between create_task and .add(). Introducing a threading.Lock around a set that is already safely accessed within a single-threaded event loop adds synchronization overhead without solving a real problem, and using a blocking lock inside async functions can stall the event loop if contention ever occurs.
That said, there is a real and valid improvement hiding in this PR: the ordering guarantee — ensuring in-flight message tasks are cancelled and drained before _transport_disconnected() is called, so _receive_data cannot interleave with the disconnect callback.
We'd welcome a simplified version that focuses on just that:
- Move
self._connected = Falseto before the disconnect notification (keep it infinallyas well as a safety net) - Cancel and
await asyncio.gather(...)on in-flight tasks before calling_transport_disconnected() - No lock needed
That change would be clean, correct, and easy to review. Please feel free to reopen with a revised approach!
|
@asclearuc done, can you re-review? |
asclearuc
left a comment
There was a problem hiding this comment.
Thanks for the revision — the threading.Lock is gone, the ordering guarantee is in place, and _cleanup_and_disconnect reads cleanly. This is much closer to what we need.
One correctness issue to fix before merge: _transport_disconnected can fire twice in a single disconnect() call when the websocket close itself throws an exception (see inline comment at line 451). The fix is a one-line early-return guard at the top of _cleanup_and_disconnect.
There's also a minor redundancy in the finally block (line 469) - message tasks are cleared
in _cleanup_and_disconnect
Please see the inline comments for details.
| has_error: Whether this was an error disconnection | ||
| """ | ||
| # Stop accepting new messages immediately | ||
| self._connected = False |
There was a problem hiding this comment.
The happy-path call to _cleanup_and_disconnect (line 451) fires _transport_disconnected before control can reach any exception handler. But if self._websocket.close() at line 446 throws, the exception handlers at lines 453–463 each call _cleanup_and_disconnect again — resulting in _transport_disconnected being called twice.
The previous callback_called guard prevented exactly this. The simplest fix is a guard at the top of _cleanup_and_disconnect:
async def _cleanup_and_disconnect(self, reason: str, has_error: bool) -> None:
if not self._connected: # already disconnected, skip
return
self._connected = False
...
Senior Review — PR #535: fix(core): _message_tasks set race condition@chinesepowered, excellent work simplifying this per asclearuc's guidance. The race condition fix is well-reasoned and the code is cleaner for it. Status: This is very close to merge. Two small items remain: 1. Add early-return guard in
|
🚀 Merge RequestExcellent simplification per asclearuc's guidance. Very close to merge. Before merge (one-line fix):
This is a minimal fix with high value. @asclearuc |
Summary
Race condition in async task tracking — packages/client-python/src/rocketride/core/transport_websocket.py:244-246
_message_tasks set is accessed from multiple async contexts without a lock
Type
fix
Testing
./builder testpassesChecklist
Summary by CodeRabbit