From 1959c7f0b2d84f66ab9d1f79bf256916fa8d1582 Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Fri, 17 Oct 2025 13:10:19 -0700 Subject: [PATCH 1/2] fix: run MQTT connection setup in executor to avoid blocking event loop This resolves Home Assistant warnings about blocking calls to open() inside the event loop. The AWS IoT SDK performs synchronous file I/O operations when reading metadata files during connection setup. Changes: - Wrap mqtt_connection_builder.websockets_with_default_aws_signing() in executor - Move credentials provider creation into the executor - Add documentation explaining why executor is needed Fixes Home Assistant warning: 'Detected blocking call to open with args (...awsiotsdk...METADATA) inside the event loop by custom integration' This is a non-breaking change that maintains full API compatibility. --- src/nwp500/mqtt_client.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/nwp500/mqtt_client.py b/src/nwp500/mqtt_client.py index 5376ae2..d8ced28 100644 --- a/src/nwp500/mqtt_client.py +++ b/src/nwp500/mqtt_client.py @@ -569,16 +569,23 @@ async def connect(self) -> bool: try: # Build WebSocket MQTT connection with AWS credentials - self._connection = mqtt_connection_builder.websockets_with_default_aws_signing( - endpoint=self.config.endpoint, - region=self.config.region, - credentials_provider=self._create_credentials_provider(), - client_id=self.config.client_id, - clean_session=self.config.clean_session, - keep_alive_secs=self.config.keep_alive_secs, - on_connection_interrupted=self._on_connection_interrupted_internal, - on_connection_resumed=self._on_connection_resumed_internal, - ) + # Run this in an executor to avoid blocking the event loop + # The AWS IoT SDK performs synchronous file I/O operations during connection setup + def _build_connection(): + # Create credentials provider within the executor to avoid any blocking calls + credentials_provider = self._create_credentials_provider() + return mqtt_connection_builder.websockets_with_default_aws_signing( + endpoint=self.config.endpoint, + region=self.config.region, + credentials_provider=credentials_provider, + client_id=self.config.client_id, + clean_session=self.config.clean_session, + keep_alive_secs=self.config.keep_alive_secs, + on_connection_interrupted=self._on_connection_interrupted_internal, + on_connection_resumed=self._on_connection_resumed_internal, + ) + + self._connection = await self._loop.run_in_executor(None, _build_connection) # Connect _logger.info("Establishing MQTT connection...") From 9a6a8d2b81e490de466826e9b1de9deddabdc011 Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Fri, 17 Oct 2025 13:16:52 -0700 Subject: [PATCH 2/2] refactor: use asyncio.to_thread() instead of run_in_executor pattern Replace the custom nested _build_connection + run_in_executor pattern with asyncio.to_thread() for clearer intent and better performance: - Uses asyncio.to_thread() for credentials provider creation - Uses asyncio.to_thread() for connection builder call - Avoids redefining functions on every connect() call - More explicit about threading blocking operations - Uses current event loop implicitly - Cleaner, more readable code This is the modern Python 3.9+ approach for handling blocking operations in async code. --- src/nwp500/mqtt_client.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/nwp500/mqtt_client.py b/src/nwp500/mqtt_client.py index d8ced28..74ba2cf 100644 --- a/src/nwp500/mqtt_client.py +++ b/src/nwp500/mqtt_client.py @@ -569,23 +569,20 @@ async def connect(self) -> bool: try: # Build WebSocket MQTT connection with AWS credentials - # Run this in an executor to avoid blocking the event loop + # Run blocking operations in a thread to avoid blocking the event loop # The AWS IoT SDK performs synchronous file I/O operations during connection setup - def _build_connection(): - # Create credentials provider within the executor to avoid any blocking calls - credentials_provider = self._create_credentials_provider() - return mqtt_connection_builder.websockets_with_default_aws_signing( - endpoint=self.config.endpoint, - region=self.config.region, - credentials_provider=credentials_provider, - client_id=self.config.client_id, - clean_session=self.config.clean_session, - keep_alive_secs=self.config.keep_alive_secs, - on_connection_interrupted=self._on_connection_interrupted_internal, - on_connection_resumed=self._on_connection_resumed_internal, - ) - - self._connection = await self._loop.run_in_executor(None, _build_connection) + credentials_provider = await asyncio.to_thread(self._create_credentials_provider) + self._connection = await asyncio.to_thread( + mqtt_connection_builder.websockets_with_default_aws_signing, + endpoint=self.config.endpoint, + region=self.config.region, + credentials_provider=credentials_provider, + client_id=self.config.client_id, + clean_session=self.config.clean_session, + keep_alive_secs=self.config.keep_alive_secs, + on_connection_interrupted=self._on_connection_interrupted_internal, + on_connection_resumed=self._on_connection_resumed_internal, + ) # Connect _logger.info("Establishing MQTT connection...")