Skip to content

Refactor: Use asyncio.wrap_future for AWS IoT SDK operations#17

Merged
eman merged 4 commits intomainfrom
feature/asyncio-wrap-future
Oct 17, 2025
Merged

Refactor: Use asyncio.wrap_future for AWS IoT SDK operations#17
eman merged 4 commits intomainfrom
feature/asyncio-wrap-future

Conversation

@eman
Copy link
Copy Markdown
Owner

@eman eman commented Oct 17, 2025

Summary

Replace run_in_executor with asyncio.wrap_future() for more efficient async integration with AWS IoT SDK's concurrent.futures.Future objects.

Changes

Core Refactoring (mqtt_client.py)

  • connect(): Use asyncio.wrap_future() instead of run_in_executor()
  • disconnect(): Use asyncio.wrap_future() instead of run_in_executor()
  • subscribe(): Use asyncio.wrap_future() instead of run_in_executor()
  • unsubscribe(): Use asyncio.wrap_future() instead of run_in_executor()
  • publish(): Use asyncio.wrap_future() instead of run_in_executor()

Bug Fix (anti_legionella_example.py)

Fixed race condition where the example was capturing stale status responses:

  • Filter status responses by command code to ensure we capture the correct response
  • Skip command echoes (messages on /ctrl topic)
  • Import and use command constants instead of hardcoded values
  • Now correctly shows DISABLED/ENABLED state after commands

Documentation Updates

  • MQTT_CLIENT.rst: Updated implementation details to document asyncio.wrap_future() approach
  • CHANGELOG.rst: Updated non-blocking implementation description

Why This Is Better

Before:

def _connect():
    connect_future = self._connection.connect()
    return connect_future.result()  # Blocks a thread!

connect_result = await self._loop.run_in_executor(None, _connect)

After:

connect_future = self._connection.connect()  # Returns concurrent.futures.Future
connect_result = await asyncio.wrap_future(connect_future)  # No thread blocking!

Benefits

More efficient: No thread pool resources used
Simpler code: 51 fewer lines, more readable
Truly async: AWS SDK's Futures are already non-blocking (resolved by internal callbacks)
Better performance: No thread context switching overhead
No API changes: Public interface remains identical

Technical Details

The AWS IoT SDK (awsiotsdk) returns concurrent.futures.Future objects that are completed asynchronously by AWS's internal event loops. These Futures are already non-blocking - they're resolved via callbacks in the SDK's internal threads.

Using asyncio.wrap_future() properly integrates these Futures with asyncio's event loop without wasting thread pool resources on blocking calls.

Testing

  • ✅ All 29 tests pass
  • ✅ Linting checks pass
  • ✅ Documentation builds successfully
  • ✅ Anti-Legionella example now works correctly (shows proper DISABLED/ENABLED states)

Migration Notes

No migration needed - the public API is unchanged. This is a pure internal optimization.

eman added 3 commits October 16, 2025 17:26
Replace run_in_executor with asyncio.wrap_future for more efficient async
integration with AWS IoT SDK's concurrent.futures.Future objects.

Changes:
- Replace run_in_executor wrapping with asyncio.wrap_future() in:
  - connect(): Connection establishment
  - disconnect(): Connection teardown
  - subscribe(): Topic subscription
  - unsubscribe(): Topic unsubscription
  - publish(): Message publishing

- Fix anti_legionella_example.py race condition:
  - Filter status responses by command code
  - Skip command echoes on /ctrl topic
  - Import and use command constants instead of hardcoded values

- Update documentation:
  - MQTT_CLIENT.rst: Document asyncio.wrap_future approach
  - CHANGELOG.rst: Update non-blocking implementation description

Benefits:
- More efficient: No thread pool resources used
- Simpler code: 51 fewer lines
- Truly async: AWS SDK Futures are already non-blocking
- Better performance: No thread context switching overhead

The public API remains unchanged - this is a pure internal optimization.

Fixes #<issue-number>
Add guidance to run `make ci-lint` before finalizing changes to ensure
code will pass CI checks. This helps prevent "passes locally but fails
in CI" issues by running the exact same linting configuration as the
CI pipeline.
@eman eman requested a review from Copilot October 17, 2025 00:32
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR refactors the MQTT client implementation to replace thread pool execution with direct asyncio Future wrapping for better async performance. The key motivation is to leverage the fact that AWS IoT SDK already returns non-blocking Future objects.

  • Replace run_in_executor() calls with asyncio.wrap_future() for all MQTT operations
  • Fix race condition in anti-legionella example by filtering status responses by command code
  • Update documentation to reflect the new implementation approach

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
src/nwp500/mqtt_client.py Replaced thread pool execution with asyncio.wrap_future for connect, disconnect, subscribe, unsubscribe, and publish operations
examples/anti_legionella_example.py Fixed race condition by filtering status responses by command code and importing command constants
docs/MQTT_CLIENT.rst Updated implementation details to document asyncio.wrap_future approach
CHANGELOG.rst Updated description of non-blocking implementation to mention wrap_future
.github/copilot-instructions.md Added CI-compatible linting commands and pre-commit guidelines

Add return statement and type annotation to unsubscribe() to match
AWS SDK behavior and maintain consistency with subscribe() and publish()
methods.

Changes:
- Add return type annotation (-> int) to unsubscribe()
- Return packet_id from unsubscribe operation
- Update docstring to document return value

Addresses PR review comments about missing packet_id return value.
The AWS SDK provides packet_id for all operations (subscribe, unsubscribe,
publish) which can be useful for tracking MQTT packet lifecycle.
@eman
Copy link
Copy Markdown
Owner Author

eman commented Oct 17, 2025

Review Comments Addressed

I've reviewed all three comments about packet_id handling:

Comment 1: publish() packet_id

Already correct - The publish() method correctly returns packet_id on line 828. The packet_id is captured from the AWS SDK's tuple (Future, packet_id) and returned after awaiting the Future.

Comment 2: subscribe() packet_id

Already correct - The subscribe() method correctly returns packet_id on line 750. The packet_id is captured from the AWS SDK's tuple and returned after awaiting the Future.

Comment 3: unsubscribe() packet_id

Fixed in commit d9812b2 - Added:

  • Return type annotation (-> int)
  • Return statement for packet_id
  • Updated docstring to document the return value

The unsubscribe() method now maintains consistency with subscribe() and publish() by returning the packet_id from the AWS SDK.

All methods now properly return packet_ids that can be used for tracking MQTT packet lifecycle and QoS guarantees.

@eman eman merged commit b32f26d into main Oct 17, 2025
10 checks passed
@eman eman deleted the feature/asyncio-wrap-future branch October 17, 2025 00:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants