From e149cba98316d333933f5b645d75a2923f56b92a Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Fri, 31 Oct 2025 16:27:58 -0700 Subject: [PATCH 1/2] Fix InvalidStateError when cancelling MQTT futures during disconnect Problem: When disconnect() was called while MQTT operations were in progress, asyncio.wrap_future() would cancel the underlying concurrent.futures.Future. Later, when AWS CRT callbacks tried to set exceptions on these already-cancelled futures, Python raised InvalidStateError, which was then ignored but logged as 'Exception ignored'. Solution: Wrap all asyncio.wrap_future() calls with asyncio.shield() to prevent cancellation from propagating to the underlying concurrent.futures.Future objects. This allows: - The outer asyncio context to be cancelled cleanly - AWS CRT futures to complete independently - No InvalidStateError exceptions Changes: - src/nwp500/mqtt_connection.py: Shield connect(), disconnect(), subscribe(), unsubscribe(), and publish() operations - src/nwp500/mqtt_subscriptions.py: Shield subscribe() and unsubscribe() operations Testing: - All 123 tests pass - Type checking passes (mypy) - Linting passes (ruff) - No InvalidStateError exceptions during disconnect --- src/nwp500/mqtt_connection.py | 38 +++++++++++++++++++++++++++----- src/nwp500/mqtt_subscriptions.py | 10 +++++++-- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/src/nwp500/mqtt_connection.py b/src/nwp500/mqtt_connection.py index d149a8b..6185da3 100644 --- a/src/nwp500/mqtt_connection.py +++ b/src/nwp500/mqtt_connection.py @@ -140,9 +140,13 @@ async def connect(self) -> bool: _logger.info("Establishing MQTT connection...") # Convert concurrent.futures.Future to asyncio.Future and await + # Use shield to prevent cancellation from propagating to + # underlying future if self._connection is not None: connect_future = self._connection.connect() - connect_result = await asyncio.wrap_future(connect_future) + connect_result = await asyncio.shield( + asyncio.wrap_future(connect_future) + ) else: raise MqttConnectionError("Connection not initialized") @@ -196,8 +200,10 @@ async def disconnect(self) -> None: try: # Convert concurrent.futures.Future to asyncio.Future and await + # Use shield to prevent cancellation from propagating to + # underlying future disconnect_future = self._connection.disconnect() - await asyncio.wrap_future(disconnect_future) + await asyncio.shield(asyncio.wrap_future(disconnect_future)) self._connected = False self._connection = None @@ -232,10 +238,12 @@ async def subscribe( _logger.debug(f"Subscribing to topic: {topic}") # Convert concurrent.futures.Future to asyncio.Future and await + # Use shield to prevent cancellation from propagating to + # underlying future subscribe_future, packet_id = self._connection.subscribe( topic=topic, qos=qos, callback=callback ) - await asyncio.wrap_future(subscribe_future) + await asyncio.shield(asyncio.wrap_future(subscribe_future)) _logger.info(f"Subscribed to '{topic}' with packet_id {packet_id}") return (subscribe_future, packet_id) @@ -259,10 +267,12 @@ async def unsubscribe(self, topic: str) -> int: _logger.debug(f"Unsubscribing from topic: {topic}") # Convert concurrent.futures.Future to asyncio.Future and await + # Use shield to prevent cancellation from propagating to + # underlying future unsubscribe_future, packet_id = self._connection.unsubscribe( topic=topic ) - await asyncio.wrap_future(unsubscribe_future) + await asyncio.shield(asyncio.wrap_future(unsubscribe_future)) _logger.info(f"Unsubscribed from '{topic}' with packet_id {packet_id}") return int(packet_id) @@ -286,6 +296,7 @@ async def publish( Raises: RuntimeError: If not connected + asyncio.CancelledError: If operation cancelled during disconnect """ if not self._connected or not self._connection: raise MqttNotConnectedError("Not connected to MQTT broker") @@ -303,11 +314,26 @@ async def publish( # Try to JSON encode other types payload_bytes = json.dumps(payload).encode("utf-8") - # Convert concurrent.futures.Future to asyncio.Future and await + # Publish and get the concurrent.futures.Future publish_future, packet_id = self._connection.publish( topic=topic, payload=payload_bytes, qos=qos ) - await asyncio.wrap_future(publish_future) + + # Shield the operation to prevent cancellation from propagating to + # the underlying concurrent.futures.Future. This avoids + # InvalidStateError when AWS CRT tries to set exception on a + # cancelled future. + try: + await asyncio.shield(asyncio.wrap_future(publish_future)) + except asyncio.CancelledError: + # Shield was cancelled - the underlying publish will complete + # independently, preventing InvalidStateError in AWS CRT + # callbacks + _logger.debug( + f"Publish to '{topic}' was cancelled but will complete " + "in background" + ) + raise _logger.debug(f"Published to '{topic}' with packet_id {packet_id}") return int(packet_id) diff --git a/src/nwp500/mqtt_subscriptions.py b/src/nwp500/mqtt_subscriptions.py index 42eb5ab..c397b4d 100644 --- a/src/nwp500/mqtt_subscriptions.py +++ b/src/nwp500/mqtt_subscriptions.py @@ -214,10 +214,14 @@ async def subscribe( try: # Convert concurrent.futures.Future to asyncio.Future and await + # Use shield to prevent cancellation from propagating to + # underlying future subscribe_future, packet_id = self._connection.subscribe( topic=topic, qos=qos, callback=self._on_message_received ) - subscribe_result = await asyncio.wrap_future(subscribe_future) + subscribe_result = await asyncio.shield( + asyncio.wrap_future(subscribe_future) + ) _logger.info( f"Subscription succeeded (topic redacted) with QoS " @@ -259,8 +263,10 @@ async def unsubscribe(self, topic: str) -> int: try: # Convert concurrent.futures.Future to asyncio.Future and await + # Use shield to prevent cancellation from propagating to + # underlying future unsubscribe_future, packet_id = self._connection.unsubscribe(topic) - await asyncio.wrap_future(unsubscribe_future) + await asyncio.shield(asyncio.wrap_future(unsubscribe_future)) # Remove from tracking self._subscriptions.pop(topic, None) From 38d43a8580a81334d5e661c08ac3a63d9c2ccb04 Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Fri, 31 Oct 2025 16:43:28 -0700 Subject: [PATCH 2/2] Add consistent cancellation handling to all shielded operations Apply uniform CancelledError handling across all MQTT operations for better debugging and consistent behavior: - connect(): Log when connection attempt is cancelled - disconnect(): Log when disconnection is cancelled - subscribe(): Log when subscription is cancelled - unsubscribe(): Log when unsubscription is cancelled - publish(): Already had this pattern This ensures all operations provide debug logging when cancellation occurs, making it easier to diagnose cancellation-related issues while maintaining the InvalidStateError fix. Addresses code review feedback to apply consistent pattern across all shielded asyncio.wrap_future() calls. --- src/nwp500/mqtt_connection.py | 52 ++++++++++++++++++++++++++++---- src/nwp500/mqtt_subscriptions.py | 28 ++++++++++++++--- 2 files changed, 70 insertions(+), 10 deletions(-) diff --git a/src/nwp500/mqtt_connection.py b/src/nwp500/mqtt_connection.py index 6185da3..ad1e515 100644 --- a/src/nwp500/mqtt_connection.py +++ b/src/nwp500/mqtt_connection.py @@ -144,9 +144,19 @@ async def connect(self) -> bool: # underlying future if self._connection is not None: connect_future = self._connection.connect() - connect_result = await asyncio.shield( - asyncio.wrap_future(connect_future) - ) + try: + connect_result = await asyncio.shield( + asyncio.wrap_future(connect_future) + ) + except asyncio.CancelledError: + # Shield was cancelled - the underlying connect will + # complete independently, preventing InvalidStateError + # in AWS CRT callbacks + _logger.debug( + "Connect operation was cancelled but will complete " + "in background" + ) + raise else: raise MqttConnectionError("Connection not initialized") @@ -203,7 +213,17 @@ async def disconnect(self) -> None: # Use shield to prevent cancellation from propagating to # underlying future disconnect_future = self._connection.disconnect() - await asyncio.shield(asyncio.wrap_future(disconnect_future)) + try: + await asyncio.shield(asyncio.wrap_future(disconnect_future)) + except asyncio.CancelledError: + # Shield was cancelled - the underlying disconnect will + # complete independently, preventing InvalidStateError + # in AWS CRT callbacks + _logger.debug( + "Disconnect operation was cancelled but will complete " + "in background" + ) + raise self._connected = False self._connection = None @@ -243,7 +263,17 @@ async def subscribe( subscribe_future, packet_id = self._connection.subscribe( topic=topic, qos=qos, callback=callback ) - await asyncio.shield(asyncio.wrap_future(subscribe_future)) + try: + await asyncio.shield(asyncio.wrap_future(subscribe_future)) + except asyncio.CancelledError: + # Shield was cancelled - the underlying subscribe will + # complete independently, preventing InvalidStateError + # in AWS CRT callbacks + _logger.debug( + f"Subscribe to '{topic}' was cancelled but will complete " + "in background" + ) + raise _logger.info(f"Subscribed to '{topic}' with packet_id {packet_id}") return (subscribe_future, packet_id) @@ -272,7 +302,17 @@ async def unsubscribe(self, topic: str) -> int: unsubscribe_future, packet_id = self._connection.unsubscribe( topic=topic ) - await asyncio.shield(asyncio.wrap_future(unsubscribe_future)) + try: + await asyncio.shield(asyncio.wrap_future(unsubscribe_future)) + except asyncio.CancelledError: + # Shield was cancelled - the underlying unsubscribe will + # complete independently, preventing InvalidStateError + # in AWS CRT callbacks + _logger.debug( + f"Unsubscribe from '{topic}' was cancelled but will " + "complete in background" + ) + raise _logger.info(f"Unsubscribed from '{topic}' with packet_id {packet_id}") return int(packet_id) diff --git a/src/nwp500/mqtt_subscriptions.py b/src/nwp500/mqtt_subscriptions.py index c397b4d..e4e37a2 100644 --- a/src/nwp500/mqtt_subscriptions.py +++ b/src/nwp500/mqtt_subscriptions.py @@ -219,9 +219,19 @@ async def subscribe( subscribe_future, packet_id = self._connection.subscribe( topic=topic, qos=qos, callback=self._on_message_received ) - subscribe_result = await asyncio.shield( - asyncio.wrap_future(subscribe_future) - ) + try: + subscribe_result = await asyncio.shield( + asyncio.wrap_future(subscribe_future) + ) + except asyncio.CancelledError: + # Shield was cancelled - the underlying subscribe will + # complete independently, preventing InvalidStateError + # in AWS CRT callbacks + _logger.debug( + f"Subscribe to '{redact_topic(topic)}' was cancelled " + "but will complete in background" + ) + raise _logger.info( f"Subscription succeeded (topic redacted) with QoS " @@ -266,7 +276,17 @@ async def unsubscribe(self, topic: str) -> int: # Use shield to prevent cancellation from propagating to # underlying future unsubscribe_future, packet_id = self._connection.unsubscribe(topic) - await asyncio.shield(asyncio.wrap_future(unsubscribe_future)) + try: + await asyncio.shield(asyncio.wrap_future(unsubscribe_future)) + except asyncio.CancelledError: + # Shield was cancelled - the underlying unsubscribe will + # complete independently, preventing InvalidStateError + # in AWS CRT callbacks + _logger.debug( + f"Unsubscribe from '{redact_topic(topic)}' was " + "cancelled but will complete in background" + ) + raise # Remove from tracking self._subscriptions.pop(topic, None)