diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 1025339..280714b 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -26,6 +26,8 @@ Always run these checks before finalizing changes to ensure your code will pass This prevents "passes locally but fails in CI" issues. +**Important**: When updating CHANGELOG.rst or any file with dates, always use `date +"%Y-%m-%d"` to get the correct current date. Never hardcode or guess dates. + ### After Completing a Task Always run these checks after completing a task to validate your changes: 1. **Type checking**: `python3 -m mypy src/nwp500 --config-file pyproject.toml` - Verify no type errors were introduced diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 098781b..d38cb4f 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,25 @@ Changelog ========= +Version 3.1.3 (2025-10-24) +========================== + +Fixed +----- + +- **MQTT Reconnection**: Improved MQTT reconnection reliability with active reconnection + + - **Breaking Internal Change**: ``MqttReconnectionHandler`` now requires ``reconnect_func`` parameter (not Optional) + - Implemented active reconnection that always recreates MQTT connection on interruption + - Removed unreliable passive fallback to AWS IoT SDK automatic reconnection + - Added automatic connection state checking during reconnection attempts + - Now emits ``reconnection_failed`` event when max reconnection attempts are exhausted + - Improved error handling and logging during reconnection process + - Better recovery from WebSocket connection interruptions (AWS_ERROR_MQTT_UNEXPECTED_HANGUP) + - Resolves issues where connection would fail to recover after network interruptions + - Note: Public API unchanged - ``NavienMqttClient`` continues to work as before + - Compatible with existing auto-recovery examples (``auto_recovery_example.py``, ``simple_auto_recovery.py``) + Version 3.1.2 (2025-01-23) ========================== diff --git a/src/nwp500/mqtt_client.py b/src/nwp500/mqtt_client.py index efc4839..0fbf7c0 100644 --- a/src/nwp500/mqtt_client.py +++ b/src/nwp500/mqtt_client.py @@ -279,6 +279,72 @@ async def _send_queued_commands_internal(self) -> None: self._connection_manager.publish, lambda: self._connected ) + async def _active_reconnect(self) -> None: + """ + Actively trigger a reconnection attempt. + + This method is called by the reconnection handler to actively + reconnect instead of passively waiting for AWS IoT SDK. + + Note: This creates a new connection while preserving subscriptions + and configuration. + """ + if self._connected: + _logger.debug("Already connected, skipping reconnection") + return + + _logger.info("Attempting active reconnection...") + + try: + # Ensure tokens are still valid + await self._auth_client.ensure_valid_token() + + # If we have a connection manager, try to reconnect using it + if self._connection_manager: + # The connection might be in a bad state, so we need to + # recreate the underlying connection + _logger.debug("Recreating MQTT connection...") + + # Create a new connection manager with same config + old_connection_manager = self._connection_manager + self._connection_manager = MqttConnection( + config=self.config, + auth_client=self._auth_client, + on_connection_interrupted=self._on_connection_interrupted_internal, + on_connection_resumed=self._on_connection_resumed_internal, + ) + + # Try to connect + success = await self._connection_manager.connect() + + if success: + # Update connection references + self._connection = self._connection_manager.connection + self._connected = True + + # Update subscription manager with new connection + if self._subscription_manager and self._connection: + self._subscription_manager.update_connection( + self._connection + ) + + _logger.info("Active reconnection successful") + else: + # Restore old connection manager and connection reference + self._connection_manager = old_connection_manager + self._connection = old_connection_manager.connection + _logger.warning("Active reconnection failed") + else: + _logger.warning( + "No connection manager available for reconnection" + ) + + except Exception as e: + _logger.error( + f"Error during active reconnection: {e}", exc_info=True + ) + raise + async def connect(self) -> bool: """ Establish connection to AWS IoT Core. @@ -327,6 +393,8 @@ async def connect(self) -> bool: config=self.config, is_connected_func=lambda: self._connected, schedule_coroutine_func=self._schedule_coroutine, + reconnect_func=self._active_reconnect, + emit_event_func=self.emit, ) self._reconnection_handler.enable() diff --git a/src/nwp500/mqtt_reconnection.py b/src/nwp500/mqtt_reconnection.py index e6512f6..f223a5d 100644 --- a/src/nwp500/mqtt_reconnection.py +++ b/src/nwp500/mqtt_reconnection.py @@ -8,6 +8,7 @@ import asyncio import contextlib import logging +from collections.abc import Awaitable from typing import TYPE_CHECKING, Any, Callable, Optional if TYPE_CHECKING: @@ -33,6 +34,8 @@ def __init__( config: "MqttConnectionConfig", is_connected_func: Callable[[], bool], schedule_coroutine_func: Callable[[Any], None], + reconnect_func: Callable[[], Awaitable[None]], + emit_event_func: Optional[Callable[..., Awaitable[Any]]] = None, ): """ Initialize reconnection handler. @@ -42,10 +45,15 @@ def __init__( is_connected_func: Function to check if currently connected schedule_coroutine_func: Function to schedule coroutines from any thread + reconnect_func: Async function to trigger active reconnection + emit_event_func: Optional async function to emit events + (e.g., EventEmitter.emit) """ self.config = config self._is_connected_func = is_connected_func self._schedule_coroutine = schedule_coroutine_func + self._reconnect_func = reconnect_func + self._emit_event = emit_event_func self._reconnect_attempts = 0 self._reconnect_task: Optional[asyncio.Task[None]] = None @@ -156,24 +164,54 @@ async def _reconnect_with_backoff(self) -> None: try: await asyncio.sleep(delay) - # AWS IoT SDK will handle the actual reconnection automatically - # We just need to wait and monitor the connection state - _logger.debug( - "Waiting for AWS IoT SDK automatic reconnection..." - ) + # Check if we're already connected (AWS SDK auto-reconnected) + if self._is_connected_func(): + _logger.info( + "AWS IoT SDK automatically reconnected during delay" + ) + break + + # Trigger active reconnection + _logger.info("Triggering active reconnection...") + try: + await self._reconnect_func() + if self._is_connected_func(): + _logger.info("Successfully reconnected") + break + except Exception as e: + _logger.warning( + f"Active reconnection failed: {e}. " + "Will retry if attempts remain." + ) except asyncio.CancelledError: _logger.info("Reconnection task cancelled") break except Exception as e: - _logger.error(f"Error during reconnection attempt: {e}") + _logger.error( + f"Error during reconnection attempt: {e}", exc_info=True + ) - if self._reconnect_attempts >= self.config.max_reconnect_attempts: + # Check final state + if ( + self._reconnect_attempts >= self.config.max_reconnect_attempts + and not self._is_connected_func() + ): _logger.error( f"Failed to reconnect after " f"{self.config.max_reconnect_attempts} attempts. " "Manual reconnection required." ) + # Emit reconnection_failed event if event emitter is available + if self._emit_event: + try: + await self._emit_event( + "reconnection_failed", self._reconnect_attempts + ) + except Exception as e: + _logger.error( + f"Error emitting reconnection_failed event: {e}" + ) async def cancel(self) -> None: """Cancel any pending reconnection task.""" diff --git a/src/nwp500/mqtt_subscriptions.py b/src/nwp500/mqtt_subscriptions.py index e7d8426..c74999c 100644 --- a/src/nwp500/mqtt_subscriptions.py +++ b/src/nwp500/mqtt_subscriptions.py @@ -72,6 +72,24 @@ def subscriptions(self) -> dict[str, mqtt.QoS]: """Get current subscriptions.""" return self._subscriptions.copy() + def update_connection(self, connection: Any) -> None: + """ + Update the MQTT connection reference. + + This is used when the connection is recreated (e.g., after reconnection) + to update the internal reference while preserving subscriptions. + + Args: + connection: New MQTT connection object + + Note: + This does not re-establish subscriptions. Call the appropriate + subscribe methods to re-register subscriptions with the new + connection if needed. + """ + self._connection = connection + _logger.debug("Updated subscription manager connection reference") + def _on_message_received( self, topic: str, payload: bytes, **kwargs: Any ) -> None: