Skip to content

Commit 9563350

Browse files
committed
Non-Blocking Async Operations for AWS IOT library
1 parent 98360ed commit 9563350

4 files changed

Lines changed: 110 additions & 30 deletions

File tree

CHANGELOG.rst

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@ Version 0.2 (Unreleased)
88
Added
99
-----
1010

11+
- **Non-Blocking MQTT Operations**: All AWS IoT SDK operations are now non-blocking
12+
13+
- All MQTT operations (connect, disconnect, subscribe, unsubscribe, publish) use ``asyncio.run_in_executor()``
14+
- Eliminates "blocking I/O detected" warnings in Home Assistant and other async applications
15+
- Fully compatible with async event loops without blocking other operations
16+
- No API changes required - existing code works without modification
17+
- Maintains full performance and reliability of the underlying AWS IoT SDK
18+
- Safe for use in Home Assistant custom integrations and other async applications
19+
- Updated documentation with non-blocking implementation details
20+
1121
- **Event Emitter Pattern (Phase 1)**: Event-driven architecture for device state changes
1222

1323
- ``EventEmitter`` base class with multiple listeners per event

README.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Features
1515
* **Operation Mode Control**: Switch between Heat Pump, Energy Saver, High Demand, Electric, and Vacation modes
1616
* **Comprehensive Status Data**: Access to 70+ device status fields including compressor status, heater status, flow rates, and more
1717
* **MQTT Protocol Support**: Low-level MQTT communication with Navien devices
18+
* **Non-Blocking Async Operations**: Fully compatible with async event loops (Home Assistant safe)
1819
* **Automatic Reconnection**: Reconnects automatically with exponential backoff during network interruptions
1920
* **Command Queuing**: Commands sent while disconnected are queued and sent automatically when reconnected
2021
* **Data Models**: Type-safe data classes with automatic unit conversions

docs/MQTT_CLIENT.rst

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ enables:
1212
- Device control (temperature, mode, power)
1313
- Bidirectional communication over MQTT
1414
- Automatic reconnection and error handling
15+
- **Non-blocking async operations** (compatible with Home Assistant and other async applications)
16+
17+
The client is designed to be fully non-blocking and integrates seamlessly
18+
with async event loops, avoiding the "blocking I/O detected" warnings
19+
commonly seen in Home Assistant and similar applications.
1520

1621
Prerequisites
1722
-------------
@@ -778,6 +783,45 @@ Error Handling
778783
Advanced Usage
779784
--------------
780785

786+
Non-Blocking Implementation
787+
~~~~~~~~~~~~~~~~~~~~~~~~~~~
788+
789+
The MQTT client is designed to be fully compatible with async event loops
790+
and will not block or interfere with other async operations. This makes it
791+
suitable for integration with Home Assistant, web servers, and other
792+
async applications.
793+
794+
**Implementation Details:**
795+
796+
- All AWS IoT SDK operations that could block are wrapped with ``asyncio.run_in_executor()``
797+
- Connection, disconnection, subscription, and publishing operations are non-blocking
798+
- The client maintains full compatibility with the existing API
799+
- No additional configuration required for non-blocking behavior
800+
801+
**Home Assistant Integration:**
802+
803+
.. code:: python
804+
805+
# Safe for use in Home Assistant custom integrations
806+
class MyCoordinator(DataUpdateCoordinator):
807+
async def _async_update_data(self):
808+
# This will not trigger "blocking I/O detected" warnings
809+
await self.mqtt_client.request_device_status(self.device)
810+
return self.latest_data
811+
812+
**Concurrent Operations:**
813+
814+
.. code:: python
815+
816+
# MQTT operations will not block other async tasks
817+
async def main():
818+
# Both tasks run concurrently without blocking
819+
await asyncio.gather(
820+
mqtt_client.connect(),
821+
some_other_async_operation(),
822+
web_server.start(),
823+
)
824+
781825
Custom Connection Configuration
782826
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
783827

src/nwp500/mqtt_client.py

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -453,21 +453,31 @@ async def connect(self) -> bool:
453453

454454
try:
455455
# Build WebSocket MQTT connection with AWS credentials
456-
self._connection = mqtt_connection_builder.websockets_with_default_aws_signing(
457-
endpoint=self.config.endpoint,
458-
region=self.config.region,
459-
credentials_provider=self._create_credentials_provider(),
460-
client_id=self.config.client_id,
461-
clean_session=self.config.clean_session,
462-
keep_alive_secs=self.config.keep_alive_secs,
463-
on_connection_interrupted=self._on_connection_interrupted_internal,
464-
on_connection_resumed=self._on_connection_resumed_internal,
465-
)
456+
# Run the connection building in a thread pool to avoid blocking I/O
457+
def _build_connection():
458+
return mqtt_connection_builder.websockets_with_default_aws_signing(
459+
endpoint=self.config.endpoint,
460+
region=self.config.region,
461+
credentials_provider=self._create_credentials_provider(),
462+
client_id=self.config.client_id,
463+
clean_session=self.config.clean_session,
464+
keep_alive_secs=self.config.keep_alive_secs,
465+
on_connection_interrupted=self._on_connection_interrupted_internal,
466+
on_connection_resumed=self._on_connection_resumed_internal,
467+
)
468+
469+
# Run connection builder in thread pool to avoid blocking I/O
470+
self._connection = await self._loop.run_in_executor(None, _build_connection)
466471

467472
# Connect
468473
_logger.info("Establishing MQTT connection...")
469-
connect_future = self._connection.connect()
470-
connect_result = connect_future.result()
474+
475+
# Run the connect operation in a thread pool to avoid blocking I/O
476+
def _connect():
477+
connect_future = self._connection.connect()
478+
return connect_future.result()
479+
480+
connect_result = await self._loop.run_in_executor(None, _connect)
471481

472482
self._connected = True
473483
self._reconnect_attempts = 0 # Reset on successful connection
@@ -518,8 +528,13 @@ async def disconnect(self):
518528
await self.stop_all_periodic_tasks()
519529

520530
try:
521-
disconnect_future = self._connection.disconnect()
522-
disconnect_future.result()
531+
# Run disconnect operation in thread pool to avoid blocking I/O
532+
def _disconnect():
533+
disconnect_future = self._connection.disconnect()
534+
return disconnect_future.result()
535+
536+
await self._loop.run_in_executor(None, _disconnect)
537+
523538
self._connected = False
524539
self._connection = None
525540
_logger.info("Disconnected successfully")
@@ -613,13 +628,16 @@ async def subscribe(
613628
_logger.info(f"Subscribing to topic: {topic}")
614629

615630
try:
616-
# Subscribe via MQTT connection
617-
subscribe_future, packet_id = self._connection.subscribe(
618-
topic=topic, qos=qos, callback=self._on_message_received
619-
)
620-
621-
# Wait for subscription acknowledgment
622-
subscribe_result = subscribe_future.result()
631+
# Run subscribe operation in thread pool to avoid blocking I/O
632+
def _subscribe():
633+
subscribe_future, packet_id = self._connection.subscribe(
634+
topic=topic, qos=qos, callback=self._on_message_received
635+
)
636+
subscribe_result = subscribe_future.result()
637+
return subscribe_result, packet_id
638+
639+
subscribe_result, packet_id = await self._loop.run_in_executor(None, _subscribe)
640+
623641
_logger.info(f"Subscribed to '{topic}' with QoS {subscribe_result['qos']}")
624642

625643
# Store subscription and handler
@@ -647,8 +665,12 @@ async def unsubscribe(self, topic: str):
647665
_logger.info(f"Unsubscribing from topic: {topic}")
648666

649667
try:
650-
unsubscribe_future, packet_id = self._connection.unsubscribe(topic)
651-
unsubscribe_future.result()
668+
# Run unsubscribe operation in thread pool to avoid blocking I/O
669+
def _unsubscribe():
670+
unsubscribe_future, packet_id = self._connection.unsubscribe(topic)
671+
return unsubscribe_future.result()
672+
673+
await self._loop.run_in_executor(None, _unsubscribe)
652674

653675
# Remove from tracking
654676
self._subscriptions.pop(topic, None)
@@ -698,13 +720,16 @@ async def publish(
698720
# Serialize to JSON
699721
payload_json = json.dumps(payload)
700722

701-
# Publish
702-
publish_future, packet_id = self._connection.publish(
703-
topic=topic, payload=payload_json, qos=qos
704-
)
705-
706-
# Wait for publish acknowledgment
707-
publish_future.result()
723+
# Run publish operation in thread pool to avoid blocking I/O
724+
def _publish():
725+
publish_future, packet_id = self._connection.publish(
726+
topic=topic, payload=payload_json, qos=qos
727+
)
728+
publish_future.result()
729+
return packet_id
730+
731+
packet_id = await self._loop.run_in_executor(None, _publish)
732+
708733
_logger.debug(f"Published to '{topic}' with packet_id {packet_id}")
709734

710735
return packet_id

0 commit comments

Comments
 (0)