From d4b64afbac40efac5a74175a1c206301bb35f70c Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Fri, 24 Oct 2025 14:52:39 -0700 Subject: [PATCH 1/4] Fix: Improve MQTT reconnection reliability with active reconnection - Implement active reconnection that recreates MQTT connection on interruption - Remove unreliable passive fallback to AWS IoT SDK automatic reconnection - Make reconnect_func required parameter in MqttReconnectionHandler - Add _active_reconnect() method to NavienMqttClient - Emit reconnection_failed event when max attempts are exhausted - Improve error handling and logging during reconnection - Resolves AWS_ERROR_MQTT_UNEXPECTED_HANGUP connection failures Breaking Internal Change: - MqttReconnectionHandler now requires reconnect_func parameter - Public API (NavienMqttClient) unchanged - no user impact Fixes connection issues where reconnection would fail after network interruptions, causing 'Failed to reconnect after 10 attempts' errors. --- .github/copilot-instructions.md | 2 + CHANGELOG.rst | 19 ++++++++++ src/nwp500/mqtt_client.py | 67 +++++++++++++++++++++++++++++++++ src/nwp500/mqtt_reconnection.py | 50 ++++++++++++++++++++---- 4 files changed, 131 insertions(+), 7 deletions(-) diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 1025339..280714b 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -26,6 +26,8 @@ Always run these checks before finalizing changes to ensure your code will pass This prevents "passes locally but fails in CI" issues. +**Important**: When updating CHANGELOG.rst or any file with dates, always use `date +"%Y-%m-%d"` to get the correct current date. Never hardcode or guess dates. + ### After Completing a Task Always run these checks after completing a task to validate your changes: 1. **Type checking**: `python3 -m mypy src/nwp500 --config-file pyproject.toml` - Verify no type errors were introduced diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 098781b..d38cb4f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,25 @@ Changelog ========= +Version 3.1.3 (2025-10-24) +========================== + +Fixed +----- + +- **MQTT Reconnection**: Improved MQTT reconnection reliability with active reconnection + + - **Breaking Internal Change**: ``MqttReconnectionHandler`` now requires ``reconnect_func`` parameter (not Optional) + - Implemented active reconnection that always recreates MQTT connection on interruption + - Removed unreliable passive fallback to AWS IoT SDK automatic reconnection + - Added automatic connection state checking during reconnection attempts + - Now emits ``reconnection_failed`` event when max reconnection attempts are exhausted + - Improved error handling and logging during reconnection process + - Better recovery from WebSocket connection interruptions (AWS_ERROR_MQTT_UNEXPECTED_HANGUP) + - Resolves issues where connection would fail to recover after network interruptions + - Note: Public API unchanged - ``NavienMqttClient`` continues to work as before + - Compatible with existing auto-recovery examples (``auto_recovery_example.py``, ``simple_auto_recovery.py``) + Version 3.1.2 (2025-01-23) ========================== diff --git a/src/nwp500/mqtt_client.py b/src/nwp500/mqtt_client.py index efc4839..06b8c53 100644 --- a/src/nwp500/mqtt_client.py +++ b/src/nwp500/mqtt_client.py @@ -279,6 +279,71 @@ async def _send_queued_commands_internal(self) -> None: self._connection_manager.publish, lambda: self._connected ) + async def _active_reconnect(self) -> None: + """ + Actively trigger a reconnection attempt. + + This method is called by the reconnection handler to actively + reconnect instead of passively waiting for AWS IoT SDK. + + Note: This creates a new connection while preserving subscriptions + and configuration. + """ + if self._connected: + _logger.debug("Already connected, skipping reconnection") + return + + _logger.info("Attempting active reconnection...") + + try: + # Ensure tokens are still valid + await self._auth_client.ensure_valid_token() + + # If we have a connection manager, try to reconnect using it + if self._connection_manager: + # The connection might be in a bad state, so we need to + # recreate the underlying connection + _logger.debug("Recreating MQTT connection...") + + # Create a new connection manager with same config + old_connection_manager = self._connection_manager + self._connection_manager = MqttConnection( + config=self.config, + auth_client=self._auth_client, + on_connection_interrupted=self._on_connection_interrupted_internal, + on_connection_resumed=self._on_connection_resumed_internal, + ) + + # Try to connect + success = await self._connection_manager.connect() + + if success: + # Update connection references + self._connection = self._connection_manager.connection + self._connected = True + + # Update subscription manager with new connection + if self._subscription_manager and self._connection: + self._subscription_manager._connection = ( + self._connection + ) + + _logger.info("Active reconnection successful") + else: + # Restore old connection manager + self._connection_manager = old_connection_manager + _logger.warning("Active reconnection failed") + else: + _logger.warning( + "No connection manager available for reconnection" + ) + + except Exception as e: + _logger.error( + f"Error during active reconnection: {e}", exc_info=True + ) + raise + async def connect(self) -> bool: """ Establish connection to AWS IoT Core. @@ -327,6 +392,8 @@ async def connect(self) -> bool: config=self.config, is_connected_func=lambda: self._connected, schedule_coroutine_func=self._schedule_coroutine, + reconnect_func=self._active_reconnect, + emit_event_func=self.emit, ) self._reconnection_handler.enable() diff --git a/src/nwp500/mqtt_reconnection.py b/src/nwp500/mqtt_reconnection.py index e6512f6..5d4d1fb 100644 --- a/src/nwp500/mqtt_reconnection.py +++ b/src/nwp500/mqtt_reconnection.py @@ -33,6 +33,8 @@ def __init__( config: "MqttConnectionConfig", is_connected_func: Callable[[], bool], schedule_coroutine_func: Callable[[Any], None], + reconnect_func: Callable[[], Any], + emit_event_func: Optional[Callable[[str, Any], Any]] = None, ): """ Initialize reconnection handler. @@ -42,10 +44,14 @@ def __init__( is_connected_func: Function to check if currently connected schedule_coroutine_func: Function to schedule coroutines from any thread + reconnect_func: Function to trigger active reconnection + emit_event_func: Optional function to emit events """ self.config = config self._is_connected_func = is_connected_func self._schedule_coroutine = schedule_coroutine_func + self._reconnect_func = reconnect_func + self._emit_event = emit_event_func self._reconnect_attempts = 0 self._reconnect_task: Optional[asyncio.Task[None]] = None @@ -156,24 +162,54 @@ async def _reconnect_with_backoff(self) -> None: try: await asyncio.sleep(delay) - # AWS IoT SDK will handle the actual reconnection automatically - # We just need to wait and monitor the connection state - _logger.debug( - "Waiting for AWS IoT SDK automatic reconnection..." - ) + # Check if we're already connected (AWS SDK auto-reconnected) + if self._is_connected_func(): + _logger.info( + "AWS IoT SDK automatically reconnected during delay" + ) + break + + # Trigger active reconnection + _logger.info("Triggering active reconnection...") + try: + await self._reconnect_func() + if self._is_connected_func(): + _logger.info("Successfully reconnected") + break + except Exception as e: + _logger.warning( + f"Active reconnection failed: {e}. " + "Will retry if attempts remain." + ) except asyncio.CancelledError: _logger.info("Reconnection task cancelled") break except Exception as e: - _logger.error(f"Error during reconnection attempt: {e}") + _logger.error( + f"Error during reconnection attempt: {e}", exc_info=True + ) - if self._reconnect_attempts >= self.config.max_reconnect_attempts: + # Check final state + if ( + self._reconnect_attempts >= self.config.max_reconnect_attempts + and not self._is_connected_func() + ): _logger.error( f"Failed to reconnect after " f"{self.config.max_reconnect_attempts} attempts. " "Manual reconnection required." ) + # Emit reconnection_failed event if event emitter is available + if self._emit_event: + try: + await self._emit_event( + "reconnection_failed", self._reconnect_attempts + ) + except Exception as e: + _logger.error( + f"Error emitting reconnection_failed event: {e}" + ) async def cancel(self) -> None: """Cancel any pending reconnection task.""" From 0086133283980c6bdc913b8940c7575c4cfaa753 Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Fri, 24 Oct 2025 14:58:32 -0700 Subject: [PATCH 2/4] Fix: Restore connection reference when reconnection fails When restoring old connection manager on failure, also restore the _connection reference to maintain consistent state. Prevents self._connection from pointing to a failed connection while self._connection_manager points to the old one. --- src/nwp500/mqtt_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/nwp500/mqtt_client.py b/src/nwp500/mqtt_client.py index 06b8c53..a5e9141 100644 --- a/src/nwp500/mqtt_client.py +++ b/src/nwp500/mqtt_client.py @@ -330,8 +330,9 @@ async def _active_reconnect(self) -> None: _logger.info("Active reconnection successful") else: - # Restore old connection manager + # Restore old connection manager and connection reference self._connection_manager = old_connection_manager + self._connection = old_connection_manager.connection _logger.warning("Active reconnection failed") else: _logger.warning( From e239cbdd8cac9d4b5f5dcc8e96b00c8abbde9a18 Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Fri, 24 Oct 2025 15:01:08 -0700 Subject: [PATCH 3/4] Improve type hints for async callables in MqttReconnectionHandler - Change reconnect_func type from Callable[[], Any] to Callable[[], Awaitable[None]] - Change emit_event_func type to Callable[..., Awaitable[Any]] to match EventEmitter.emit signature - Add import for Awaitable from collections.abc - Update docstrings to clarify these are async callables - More accurate type hints for better type safety and IDE support --- src/nwp500/mqtt_reconnection.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/nwp500/mqtt_reconnection.py b/src/nwp500/mqtt_reconnection.py index 5d4d1fb..f223a5d 100644 --- a/src/nwp500/mqtt_reconnection.py +++ b/src/nwp500/mqtt_reconnection.py @@ -8,6 +8,7 @@ import asyncio import contextlib import logging +from collections.abc import Awaitable from typing import TYPE_CHECKING, Any, Callable, Optional if TYPE_CHECKING: @@ -33,8 +34,8 @@ def __init__( config: "MqttConnectionConfig", is_connected_func: Callable[[], bool], schedule_coroutine_func: Callable[[Any], None], - reconnect_func: Callable[[], Any], - emit_event_func: Optional[Callable[[str, Any], Any]] = None, + reconnect_func: Callable[[], Awaitable[None]], + emit_event_func: Optional[Callable[..., Awaitable[Any]]] = None, ): """ Initialize reconnection handler. @@ -44,8 +45,9 @@ def __init__( is_connected_func: Function to check if currently connected schedule_coroutine_func: Function to schedule coroutines from any thread - reconnect_func: Function to trigger active reconnection - emit_event_func: Optional function to emit events + reconnect_func: Async function to trigger active reconnection + emit_event_func: Optional async function to emit events + (e.g., EventEmitter.emit) """ self.config = config self._is_connected_func = is_connected_func From 87b6677c35692254d3384fdfedf722f7b927877a Mon Sep 17 00:00:00 2001 From: Emmanuel Levijarvi Date: Fri, 24 Oct 2025 15:03:03 -0700 Subject: [PATCH 4/4] Fix encapsulation: Add public update_connection() method to subscription manager - Add update_connection() public method to MqttSubscriptionManager - Replace direct access to _connection attribute with proper method call - Improves encapsulation and maintainability - Provides clear API for updating connection after reconnection - Includes documentation explaining the method's purpose --- src/nwp500/mqtt_client.py | 2 +- src/nwp500/mqtt_subscriptions.py | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/nwp500/mqtt_client.py b/src/nwp500/mqtt_client.py index a5e9141..0fbf7c0 100644 --- a/src/nwp500/mqtt_client.py +++ b/src/nwp500/mqtt_client.py @@ -324,7 +324,7 @@ async def _active_reconnect(self) -> None: # Update subscription manager with new connection if self._subscription_manager and self._connection: - self._subscription_manager._connection = ( + self._subscription_manager.update_connection( self._connection ) diff --git a/src/nwp500/mqtt_subscriptions.py b/src/nwp500/mqtt_subscriptions.py index e7d8426..c74999c 100644 --- a/src/nwp500/mqtt_subscriptions.py +++ b/src/nwp500/mqtt_subscriptions.py @@ -72,6 +72,24 @@ def subscriptions(self) -> dict[str, mqtt.QoS]: """Get current subscriptions.""" return self._subscriptions.copy() + def update_connection(self, connection: Any) -> None: + """ + Update the MQTT connection reference. + + This is used when the connection is recreated (e.g., after reconnection) + to update the internal reference while preserving subscriptions. + + Args: + connection: New MQTT connection object + + Note: + This does not re-establish subscriptions. Call the appropriate + subscribe methods to re-register subscriptions with the new + connection if needed. + """ + self._connection = connection + _logger.debug("Updated subscription manager connection reference") + def _on_message_received( self, topic: str, payload: bytes, **kwargs: Any ) -> None: