Refactor: Use asyncio.wrap_future for AWS IoT SDK operations#17
Conversation
Replace run_in_executor with asyncio.wrap_future for more efficient async integration with AWS IoT SDK's concurrent.futures.Future objects. Changes: - Replace run_in_executor wrapping with asyncio.wrap_future() in: - connect(): Connection establishment - disconnect(): Connection teardown - subscribe(): Topic subscription - unsubscribe(): Topic unsubscription - publish(): Message publishing - Fix anti_legionella_example.py race condition: - Filter status responses by command code - Skip command echoes on /ctrl topic - Import and use command constants instead of hardcoded values - Update documentation: - MQTT_CLIENT.rst: Document asyncio.wrap_future approach - CHANGELOG.rst: Update non-blocking implementation description Benefits: - More efficient: No thread pool resources used - Simpler code: 51 fewer lines - Truly async: AWS SDK Futures are already non-blocking - Better performance: No thread context switching overhead The public API remains unchanged - this is a pure internal optimization. Fixes #<issue-number>
Add guidance to run `make ci-lint` before finalizing changes to ensure code will pass CI checks. This helps prevent "passes locally but fails in CI" issues by running the exact same linting configuration as the CI pipeline.
There was a problem hiding this comment.
Pull Request Overview
This PR refactors the MQTT client implementation to replace thread pool execution with direct asyncio Future wrapping for better async performance. The key motivation is to leverage the fact that AWS IoT SDK already returns non-blocking Future objects.
- Replace
run_in_executor()calls withasyncio.wrap_future()for all MQTT operations - Fix race condition in anti-legionella example by filtering status responses by command code
- Update documentation to reflect the new implementation approach
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/nwp500/mqtt_client.py | Replaced thread pool execution with asyncio.wrap_future for connect, disconnect, subscribe, unsubscribe, and publish operations |
| examples/anti_legionella_example.py | Fixed race condition by filtering status responses by command code and importing command constants |
| docs/MQTT_CLIENT.rst | Updated implementation details to document asyncio.wrap_future approach |
| CHANGELOG.rst | Updated description of non-blocking implementation to mention wrap_future |
| .github/copilot-instructions.md | Added CI-compatible linting commands and pre-commit guidelines |
Add return statement and type annotation to unsubscribe() to match AWS SDK behavior and maintain consistency with subscribe() and publish() methods. Changes: - Add return type annotation (-> int) to unsubscribe() - Return packet_id from unsubscribe operation - Update docstring to document return value Addresses PR review comments about missing packet_id return value. The AWS SDK provides packet_id for all operations (subscribe, unsubscribe, publish) which can be useful for tracking MQTT packet lifecycle.
Review Comments AddressedI've reviewed all three comments about packet_id handling: Comment 1: publish() packet_id✅ Already correct - The Comment 2: subscribe() packet_id✅ Already correct - The Comment 3: unsubscribe() packet_id✅ Fixed in commit d9812b2 - Added:
The All methods now properly return packet_ids that can be used for tracking MQTT packet lifecycle and QoS guarantees. |
Summary
Replace
run_in_executorwithasyncio.wrap_future()for more efficient async integration with AWS IoT SDK'sconcurrent.futures.Futureobjects.Changes
Core Refactoring (mqtt_client.py)
asyncio.wrap_future()instead ofrun_in_executor()asyncio.wrap_future()instead ofrun_in_executor()asyncio.wrap_future()instead ofrun_in_executor()asyncio.wrap_future()instead ofrun_in_executor()asyncio.wrap_future()instead ofrun_in_executor()Bug Fix (anti_legionella_example.py)
Fixed race condition where the example was capturing stale status responses:
/ctrltopic)Documentation Updates
asyncio.wrap_future()approachWhy This Is Better
Before:
After:
Benefits
✅ More efficient: No thread pool resources used
✅ Simpler code: 51 fewer lines, more readable
✅ Truly async: AWS SDK's Futures are already non-blocking (resolved by internal callbacks)
✅ Better performance: No thread context switching overhead
✅ No API changes: Public interface remains identical
Technical Details
The AWS IoT SDK (
awsiotsdk) returnsconcurrent.futures.Futureobjects that are completed asynchronously by AWS's internal event loops. These Futures are already non-blocking - they're resolved via callbacks in the SDK's internal threads.Using
asyncio.wrap_future()properly integrates these Futures with asyncio's event loop without wasting thread pool resources on blocking calls.Testing
Migration Notes
No migration needed - the public API is unchanged. This is a pure internal optimization.