Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Always run these checks before finalizing changes to ensure your code will pass

This prevents "passes locally but fails in CI" issues.

**Important**: When updating CHANGELOG.rst or any file with dates, always use `date +"%Y-%m-%d"` to get the correct current date. Never hardcode or guess dates.

### After Completing a Task
Always run these checks after completing a task to validate your changes:
1. **Type checking**: `python3 -m mypy src/nwp500 --config-file pyproject.toml` - Verify no type errors were introduced
Expand Down
19 changes: 19 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,25 @@
Changelog
=========

Version 3.1.3 (2025-10-24)
==========================

Fixed
-----

- **MQTT Reconnection**: Improved MQTT reconnection reliability with active reconnection

- **Breaking Internal Change**: ``MqttReconnectionHandler`` now requires the ``reconnect_func`` parameter (it is not ``Optional``)
- Implemented active reconnection that always recreates MQTT connection on interruption
- Removed unreliable passive fallback to AWS IoT SDK automatic reconnection
- Added automatic connection state checking during reconnection attempts
- Now emits ``reconnection_failed`` event when max reconnection attempts are exhausted
- Improved error handling and logging during reconnection process
- Better recovery from WebSocket connection interruptions (AWS_ERROR_MQTT_UNEXPECTED_HANGUP)
- Resolves issues where the connection would fail to recover after network interruptions
- Note: Public API unchanged - ``NavienMqttClient`` continues to work as before
- Compatible with existing auto-recovery examples (``auto_recovery_example.py``, ``simple_auto_recovery.py``)

Version 3.1.2 (2025-01-23)
==========================

Expand Down
68 changes: 68 additions & 0 deletions src/nwp500/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,72 @@ async def _send_queued_commands_internal(self) -> None:
self._connection_manager.publish, lambda: self._connected
)

async def _active_reconnect(self) -> None:
    """
    Actively trigger a reconnection attempt.

    Called by the reconnection handler to rebuild the MQTT connection
    instead of passively waiting for the AWS IoT SDK to recover it.

    A fresh ``MqttConnection`` is created from the current config and auth
    client. On success the client's connection reference, connected flag
    and the subscription manager's connection reference are updated. On
    failure (``connect()`` returns a falsy value, or raises) the previous
    connection manager and its connection are put back so the two
    attributes never refer to different managers.

    Returns:
        None. Callers check the connected state afterwards to learn
        whether the attempt succeeded.

    Raises:
        Exception: Any error raised while refreshing the token or
            connecting is logged and re-raised.
    """
    if self._connected:
        _logger.debug("Already connected, skipping reconnection")
        return

    _logger.info("Attempting active reconnection...")

    # Captured inside the try block (after the token refresh) so the
    # except path knows which manager to roll back to.
    previous_manager = None

    try:
        # Ensure tokens are still valid
        await self._auth_client.ensure_valid_token()

        previous_manager = self._connection_manager
        if not previous_manager:
            _logger.warning(
                "No connection manager available for reconnection"
            )
            return

        # The existing connection might be in a bad state, so recreate
        # the underlying connection with the same configuration.
        # NOTE(review): the replaced manager is not explicitly closed
        # here - confirm MqttConnection needs no cleanup.
        _logger.debug("Recreating MQTT connection...")
        self._connection_manager = MqttConnection(
            config=self.config,
            auth_client=self._auth_client,
            on_connection_interrupted=self._on_connection_interrupted_internal,
            on_connection_resumed=self._on_connection_resumed_internal,
        )

        success = await self._connection_manager.connect()
        if not success:
            # Restore the manager AND its connection together; restoring
            # only the manager could leave self._connection out of sync
            # with self._connection_manager.
            self._connection_manager = previous_manager
            self._connection = previous_manager.connection
            _logger.warning("Active reconnection failed")
            return

        # Update connection references
        self._connection = self._connection_manager.connection
        self._connected = True

        # Point the subscription manager at the new connection.
        # NOTE(review): update_connection only swaps the reference and,
        # per its docstring, does not re-subscribe - confirm topics are
        # re-registered on the new connection elsewhere.
        if self._subscription_manager and self._connection:
            self._subscription_manager.update_connection(self._connection)

        _logger.info("Active reconnection successful")

    except Exception as e:
        # If the failure happened after the manager was swapped but
        # before the connection was established, roll back so a
        # half-initialised manager is not left in place.
        if previous_manager and not self._connected:
            self._connection_manager = previous_manager
            self._connection = previous_manager.connection
        _logger.error(
            f"Error during active reconnection: {e}", exc_info=True
        )
        raise

async def connect(self) -> bool:
"""
Establish connection to AWS IoT Core.
Expand Down Expand Up @@ -327,6 +393,8 @@ async def connect(self) -> bool:
config=self.config,
is_connected_func=lambda: self._connected,
schedule_coroutine_func=self._schedule_coroutine,
reconnect_func=self._active_reconnect,
emit_event_func=self.emit,
)
self._reconnection_handler.enable()

Expand Down
52 changes: 45 additions & 7 deletions src/nwp500/mqtt_reconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import asyncio
import contextlib
import logging
from collections.abc import Awaitable
from typing import TYPE_CHECKING, Any, Callable, Optional

if TYPE_CHECKING:
Expand All @@ -33,6 +34,8 @@ def __init__(
config: "MqttConnectionConfig",
is_connected_func: Callable[[], bool],
schedule_coroutine_func: Callable[[Any], None],
reconnect_func: Callable[[], Awaitable[None]],
emit_event_func: Optional[Callable[..., Awaitable[Any]]] = None,
):
"""
Initialize reconnection handler.
Expand All @@ -42,10 +45,15 @@ def __init__(
is_connected_func: Function to check if currently connected
schedule_coroutine_func: Function to schedule coroutines from any
thread
reconnect_func: Async function to trigger active reconnection
emit_event_func: Optional async function to emit events
(e.g., EventEmitter.emit)
"""
self.config = config
self._is_connected_func = is_connected_func
self._schedule_coroutine = schedule_coroutine_func
self._reconnect_func = reconnect_func
self._emit_event = emit_event_func

self._reconnect_attempts = 0
self._reconnect_task: Optional[asyncio.Task[None]] = None
Expand Down Expand Up @@ -156,24 +164,54 @@ async def _reconnect_with_backoff(self) -> None:
try:
await asyncio.sleep(delay)

# AWS IoT SDK will handle the actual reconnection automatically
# We just need to wait and monitor the connection state
_logger.debug(
"Waiting for AWS IoT SDK automatic reconnection..."
)
# Check if we're already connected (AWS SDK auto-reconnected)
if self._is_connected_func():
_logger.info(
"AWS IoT SDK automatically reconnected during delay"
)
break

# Trigger active reconnection
_logger.info("Triggering active reconnection...")
try:
await self._reconnect_func()
if self._is_connected_func():
_logger.info("Successfully reconnected")
break
except Exception as e:
_logger.warning(
f"Active reconnection failed: {e}. "
"Will retry if attempts remain."
)

except asyncio.CancelledError:
_logger.info("Reconnection task cancelled")
break
except Exception as e:
_logger.error(f"Error during reconnection attempt: {e}")
_logger.error(
f"Error during reconnection attempt: {e}", exc_info=True
)

if self._reconnect_attempts >= self.config.max_reconnect_attempts:
# Check final state
if (
self._reconnect_attempts >= self.config.max_reconnect_attempts
and not self._is_connected_func()
):
_logger.error(
f"Failed to reconnect after "
f"{self.config.max_reconnect_attempts} attempts. "
"Manual reconnection required."
)
# Emit reconnection_failed event if event emitter is available
if self._emit_event:
try:
await self._emit_event(
"reconnection_failed", self._reconnect_attempts
)
except Exception as e:
_logger.error(
f"Error emitting reconnection_failed event: {e}"
)

async def cancel(self) -> None:
"""Cancel any pending reconnection task."""
Expand Down
18 changes: 18 additions & 0 deletions src/nwp500/mqtt_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ def subscriptions(self) -> dict[str, mqtt.QoS]:
"""Get current subscriptions."""
return self._subscriptions.copy()

def update_connection(self, connection: Any) -> None:
    """
    Replace the MQTT connection object held by this manager.

    Intended for the case where the underlying connection has been
    recreated (for example after a reconnection): only the stored
    reference changes.

    Args:
        connection: The new MQTT connection object to store.

    Note:
        Nothing is re-subscribed here. If the new connection needs the
        existing topics, the caller has to invoke the relevant subscribe
        methods again.
    """
    # Only the reference is swapped; no other state is modified here.
    self._connection = connection
    _logger.debug("Updated subscription manager connection reference")

def _on_message_received(
self, topic: str, payload: bytes, **kwargs: Any
) -> None:
Expand Down