
fix(core): _message_tasks set is accessed from multiple async contexts without a lock#535

Open
chinesepowered wants to merge 8 commits into rocketride-org:develop from chinesepowered:async-task-race

Conversation

@chinesepowered
Contributor

@chinesepowered chinesepowered commented Mar 30, 2026

Summary

Race condition in async task tracking — packages/client-python/src/rocketride/core/transport_websocket.py:244-246
_message_tasks set is accessed from multiple async contexts without a lock

Type

fix

Testing

  • Tests added or updated
  • Tested locally
  • ./builder test passes

Checklist

  • Commit messages follow conventional commits
  • No secrets or credentials included
  • Wiki updated (if applicable)
  • Breaking changes documented (if applicable)

Summary by CodeRabbit

  • Bug Fixes
    • Enhanced WebSocket connection handling with improved synchronization to ensure reliable cleanup and task management during disconnection scenarios.

@coderabbitai
Contributor

coderabbitai bot commented Mar 30, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough

Thread-safe synchronization was added to WebSocket message task handling. A threading.Lock protects access to _message_tasks, with a dedicated callback method handling task completion. The disconnect() method was updated to cancel in-flight tasks atomically and await their completion before finalizing transport disconnection.

Changes

Cohort / File(s) Summary
WebSocket Message Task Synchronization
packages/client-python/src/rocketride/core/transport_websocket.py
Added _message_tasks_lock for thread-safe access to _message_tasks. Introduced _on_message_task_done() callback to handle task completion. Updated _receive_loop() to register tasks under lock. Enhanced disconnect() to atomically cancel in-flight tasks, await completion, and safely clear the task collection.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Poem

🐰 Lock and load, we say with glee,
Tasks synchronized, race-condition free!
The websocket hops with thread-safe care,
No tangled messages left in the air.

🚥 Pre-merge checks | ✅ 3 passed
  • Description Check: ✅ Passed (check skipped; CodeRabbit's high-level summary is enabled)
  • Title Check: ✅ Passed (the title accurately describes the main change: adding synchronization to protect _message_tasks accessed from multiple async contexts)
  • Docstring Coverage: ✅ Passed (85.71%, above the 80.00% threshold)



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/client-python/src/rocketride/core/transport_websocket.py`:
- Around line 244-248: The done-callback mutates self._message_tasks without
acquiring self._message_tasks_lock, causing a race with disconnect(); fix by
replacing the lambda with a small coroutine-based remover: have the
task.add_done_callback call a synchronous wrapper that does
asyncio.create_task(self._remove_message_task(t)), and implement
_remove_message_task(self, t) as an async method that uses "async with
self._message_tasks_lock" to discard t from self._message_tasks; ensure
disconnect() still uses the same lock when clearing the set. This keeps lock
protection consistent for _message_tasks modifications.
- Around line 121-122: The done-callback that calls
self._message_tasks.discard(t) is synchronous and cannot use an asyncio.Lock;
replace self._message_tasks_lock = asyncio.Lock() with a threading.Lock (add
import threading) so the callback can synchronously acquire/release it and async
code can still use it synchronously; update all usages of _message_tasks_lock
(e.g., any "async with self._message_tasks_lock" or "await
self._message_tasks_lock.acquire()") to synchronous acquire()/release() blocks
(or use the lock as a context manager) — specifically adjust the disconnect()
logic and the done-callback surrounding self._message_tasks modifications to use
self._message_tasks_lock.acquire()/release() so mutations are protected from
both sync callbacks and async code.
- Around line 456-458: The block currently uses "async with
self._message_tasks_lock:" which only works for an async lock; change it to use
a synchronous context manager with the threading.Lock instance by replacing
"async with self._message_tasks_lock:" with "with self._message_tasks_lock:"
(ensure _message_tasks_lock is initialized as threading.Lock and imported), so
that self._message_tasks.clear() is executed while holding the threading lock;
update any initialization of _message_tasks_lock to use threading.Lock() and
adjust surrounding code if necessary to use a normal with-block rather than an
async context.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: e8b71132-0dec-466d-a543-fdc60704ff8c

📥 Commits

Reviewing files that changed from the base of the PR and between b6b2dc1 and e8a61fb.

📒 Files selected for processing (1)
  • packages/client-python/src/rocketride/core/transport_websocket.py

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/client-python/src/rocketride/core/transport_websocket.py`:
- Around line 462-464: In disconnect(), before clearing self._message_tasks,
iterate the set under self._message_tasks_lock to call cancel() on each Task (or
schedule cancellation), then copy the tasks out, clear the set, release the
lock, and await asyncio.gather(*copied_tasks, return_exceptions=True) to ensure
background tasks are cancelled and any exceptions swallowed; update references
in the disconnect implementation around _message_tasks/_message_tasks_lock and
ensure tasks that may call _transport_receive are cancelled before
self._websocket is set to None.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: dfd4b405-8177-4988-aba1-e234501823a3

📥 Commits

Reviewing files that changed from the base of the PR and between e8a61fb and 3a045aa.

📒 Files selected for processing (1)
  • packages/client-python/src/rocketride/core/transport_websocket.py

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/client-python/src/rocketride/core/transport_websocket.py`:
- Around line 463-470: The disconnect flow currently calls
self._transport_disconnected() before flipping self._connected and
cancelling/awaiting in-flight message tasks, allowing _receive_data() /
super()._transport_receive() to interleave; move the transition self._connected
= False and the block that locks self._message_tasks_lock, cancels non-done
tasks from self._message_tasks, clears the list, and awaits them (using
asyncio.gather with return_exceptions=True) to occur before calling
self._transport_disconnected(), and add a regression test that asserts
on_disconnected() runs only after all _message_tasks have been cancelled and
completed (i.e., no interleaving from _receive_data()).

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: b2a26719-2bfd-4c5d-b92d-c98365a95f48

📥 Commits

Reviewing files that changed from the base of the PR and between 3a045aa and 3ad8e2a.

📒 Files selected for processing (1)
  • packages/client-python/src/rocketride/core/transport_websocket.py

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/client-python/src/rocketride/core/transport_websocket.py`:
- Around line 470-476: In the finally block of the TransportWebSocket cleanup
(the block that sets self._websocket and self._receive_task to None), remove the
redundant hasattr(self, '_message_tasks') check and always acquire
self._message_tasks_lock then clear self._message_tasks; __init__ already
initializes _message_tasks and _message_tasks_lock, so simply use "with
self._message_tasks_lock: self._message_tasks.clear()" to simplify the cleanup
logic.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

Run ID: 561a6b0c-dbf7-4dc4-82e1-f88c3627039f

📥 Commits

Reviewing files that changed from the base of the PR and between f4149da and 11c53bc.

📒 Files selected for processing (1)
  • packages/client-python/src/rocketride/core/transport_websocket.py

Collaborator

@asclearuc asclearuc left a comment


Thank you for your contribution and for digging into the WebSocket disconnect flow!

After review, we won't be merging this PR in its current form. The core concern is that _message_tasks is not actually subject to a data race in standard asyncio. Python's event loop is single-threaded and cooperative — coroutines only yield at await points, so there is no concurrent mutation of the set between create_task and .add(). Introducing a threading.Lock around a set that is already safely accessed within a single-threaded event loop adds synchronization overhead without solving a real problem, and using a blocking lock inside async functions can stall the event loop if contention ever occurs.

That said, there is a real and valid improvement hiding in this PR: the ordering guarantee — ensuring in-flight message tasks are cancelled and drained before _transport_disconnected() is called, so _receive_data cannot interleave with the disconnect callback.

We'd welcome a simplified version that focuses on just that:

  • Move self._connected = False to before the disconnect notification (keep it in finally as well, as a safety net)
  • Cancel and await asyncio.gather(...) on in-flight tasks before calling _transport_disconnected()
  • No lock needed

That change would be clean, correct, and easy to review. Please feel free to reopen with a revised approach!

@chinesepowered
Contributor Author

@asclearuc done, can you re-review?

@chinesepowered chinesepowered requested a review from asclearuc April 3, 2026 03:57
Collaborator

@asclearuc asclearuc left a comment


Thanks for the revision — the threading.Lock is gone, the ordering guarantee is in place, and _cleanup_and_disconnect reads cleanly. This is much closer to what we need.

One correctness issue to fix before merge: _transport_disconnected can fire twice in a single disconnect() call when the websocket close itself throws an exception (see inline comment at line 451). The fix is a one-line early-return guard at the top of _cleanup_and_disconnect.

There's also a minor redundancy in the finally block (line 469): message tasks are already cleared in _cleanup_and_disconnect.

Please see the inline comments for details.

has_error: Whether this was an error disconnection
"""
# Stop accepting new messages immediately
self._connected = False
Collaborator


The happy-path call to _cleanup_and_disconnect (line 451) fires _transport_disconnected before control can reach any exception handler. But if self._websocket.close() at line 446 throws, the exception handlers at lines 453–463 each call _cleanup_and_disconnect again — resulting in _transport_disconnected being called twice.

The previous callback_called guard prevented exactly this. The simplest fix is a guard at the top of _cleanup_and_disconnect:

async def _cleanup_and_disconnect(self, reason: str, has_error: bool) -> None:
    if not self._connected:  # already disconnected, skip
        return
    self._connected = False
    ...

@nihalnihalani
Contributor

Senior Review — PR #535: fix(core): _message_tasks set race condition

@chinesepowered, excellent work simplifying this per asclearuc's guidance. The race condition fix is well-reasoned and the code is cleaner for it.

Status: This is very close to merge. Two small items remain:

1. Add early-return guard in _cleanup_and_disconnect

Add a one-line check at the top of _cleanup_and_disconnect to prevent double _transport_disconnected() calls:

if self._transport is None:
    return

(or whatever sentinel is appropriate for your transport state)

Without this, rapid successive disconnects can still trigger the cleanup path twice.

2. Remove redundant message task clearing in finally block

There's a _message_tasks clearing operation in a finally block that is now redundant with the improved cleanup flow. Removing it keeps the cleanup logic in one place and avoids confusion about which code path is responsible for task teardown.

Both of these are minor. Once addressed, this should be ready to merge. Great job on the simplification.

@nihalnihalani
Contributor

🚀 Merge Request

Excellent simplification per asclearuc's guidance. Very close to merge.

Before merge (one-line fix):

  • Add early-return guard at top of _cleanup_and_disconnect to prevent double _transport_disconnected() calls
  • Remove redundant message task clearing in finally block

This is a minimal fix with high value. @asclearuc


Labels

module:client-python Python SDK and MCP client


3 participants