Fix InvalidStateError when cancelling MQTT futures during disconnect#32
Merged
Fix InvalidStateError when cancelling MQTT futures during disconnect#32
Conversation
Problem: When disconnect() was called while MQTT operations were in progress, asyncio.wrap_future() would cancel the underlying concurrent.futures.Future. Later, when AWS CRT callbacks tried to set exceptions on these already-cancelled futures, Python raised InvalidStateError, which was then ignored but logged as 'Exception ignored'. Solution: Wrap all asyncio.wrap_future() calls with asyncio.shield() to prevent cancellation from propagating to the underlying concurrent.futures.Future objects. This allows: - The outer asyncio context to be cancelled cleanly - AWS CRT futures to complete independently - No InvalidStateError exceptions Changes: - src/nwp500/mqtt_connection.py: Shield connect(), disconnect(), subscribe(), unsubscribe(), and publish() operations - src/nwp500/mqtt_subscriptions.py: Shield subscribe() and unsubscribe() operations Testing: - All 123 tests pass - Type checking passes (mypy) - Linting passes (ruff) - No InvalidStateError exceptions during disconnect
Contributor
There was a problem hiding this comment.
Pull Request Overview
This PR fixes an InvalidStateError that occurred when MQTT operations were cancelled during disconnect. The issue arose because asyncio.wrap_future() automatically cancels the underlying concurrent.futures.Future when the asyncio task is cancelled, causing AWS CRT callbacks to fail when they later tried to set exceptions on already-cancelled futures.
Key changes:
- Wrapped all
asyncio.wrap_future()calls withasyncio.shield()to prevent cancellation propagation - Added explicit cancellation handling in
publish()with debug logging - Updated docstring to document the
asyncio.CancelledErrorexception
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/nwp500/mqtt_connection.py | Shields connect(), disconnect(), subscribe(), unsubscribe(), and publish() operations from cancellation; adds explicit error handling in publish() |
| src/nwp500/mqtt_subscriptions.py | Shields subscribe() and unsubscribe() operations from cancellation |
Apply uniform CancelledError handling across all MQTT operations for better debugging and consistent behavior: - connect(): Log when connection attempt is cancelled - disconnect(): Log when disconnection is cancelled - subscribe(): Log when subscription is cancelled - unsubscribe(): Log when unsubscription is cancelled - publish(): Already had this pattern This ensures all operations provide debug logging when cancellation occurs, making it easier to diagnose cancellation-related issues while maintaining the InvalidStateError fix. Addresses code review feedback to apply consistent pattern across all shielded asyncio.wrap_future() calls.
Owner
Author
|
✅ Updated to address code review feedback Applied consistent Changes in latest commit:
Validation:
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When
disconnect()was called while MQTT operations (publish, subscribe, etc.) were in progress, the following error would appear:Root Cause
asyncio.wrap_future()automatically cancels the underlyingconcurrent.futures.Futurewhen the asyncio task is cancelled. When AWS CRT callbacks later tried to callset_exception()on these already-cancelled futures, Python raisedInvalidStateError.Solution
Wrap all
asyncio.wrap_future()calls withasyncio.shield()to prevent cancellation from propagating to the underlyingconcurrent.futures.Futureobjects. This allows:InvalidStateErrorexceptionsChanges
Modified Files
connect(),disconnect(),subscribe(),unsubscribe(), andpublish()operationssubscribe()andunsubscribe()operationsTechnical Details
The fix uses
asyncio.shield()to create a protective barrier that:InvalidStateErrorwhen callbacks executeTesting
mypy)make ci-lint)InvalidStateErrorexceptions during disconnectImpact
This is a bug fix that eliminates spurious error messages during normal disconnect operations. No breaking changes or API modifications.