diff --git a/docs/AUTO_RECOVERY.rst b/docs/AUTO_RECOVERY.rst new file mode 100644 index 0000000..5b10a57 --- /dev/null +++ b/docs/AUTO_RECOVERY.rst @@ -0,0 +1,456 @@ +=============================================== +Automatic Reconnection After Connection Failure +=============================================== + +This guide explains how to automatically recover from permanent MQTT connection failures (after max reconnection attempts are exhausted). + +Understanding the Problem +========================== + +The MQTT client has built-in automatic reconnection with exponential backoff: + +* When connection drops, it automatically tries to reconnect +* Default: 10 attempts with exponential backoff (1s → 120s) +* After 10 failed attempts, it stops and emits ``reconnection_failed`` event +* Periodic tasks are stopped to prevent log spam + +The question is: **How do we automatically retry after these 10 attempts fail?** + +Solution Overview +================= + +There are 4 strategies, ranging from simple to production-ready: + +.. list-table:: + :header-rows: 1 + :widths: 30 15 55 + + * - Strategy + - Complexity + - Use Case + * - 1. Simple Retry + - ⭐ + - Quick tests, simple scripts + * - 2. Full Recreation + - ⭐⭐ + - Better cleanup, medium apps + * - 3. Token Refresh + - ⭐⭐⭐ + - Long-running apps, token expiry issues + * - 4. Exponential Backoff + - ⭐⭐⭐⭐ + - **Production (Recommended)** + +Strategy 1: Simple Retry with Reset +==================================== + +Just reset the reconnection counter and try again after a delay. + +.. code-block:: python + + from nwp500 import NavienMqttClient + from nwp500.mqtt_client import MqttConnectionConfig + + config = MqttConnectionConfig(max_reconnect_attempts=10) + mqtt_client = NavienMqttClient(auth_client, config=config) + + async def on_reconnection_failed(attempts): + print(f"Failed after {attempts} attempts. 
Retrying in 60s...") + await asyncio.sleep(60) + + # Reset and retry using public API + await mqtt_client.reset_reconnect() + + mqtt_client.on('reconnection_failed', on_reconnection_failed) + await mqtt_client.connect() + +**Pros:** Simple, minimal code + +**Cons:** May need to refresh tokens for long-running connections + +Strategy 2: Full Client Recreation +=================================== + +Create a new MQTT client instance when reconnection fails. + +.. code-block:: python + + mqtt_client = None + + async def create_and_connect(): + global mqtt_client + + if mqtt_client and mqtt_client.is_connected: + await mqtt_client.disconnect() + + mqtt_client = NavienMqttClient(auth_client, config=config) + mqtt_client.on('reconnection_failed', on_reconnection_failed) + await mqtt_client.connect() + + # Restore subscriptions + await mqtt_client.subscribe_device_status(device, on_status) + await mqtt_client.start_periodic_device_status_requests(device) + + return mqtt_client + + async def on_reconnection_failed(attempts): + print(f"Failed after {attempts} attempts. Recreating client in 60s...") + await asyncio.sleep(60) + await create_and_connect() + + mqtt_client = await create_and_connect() + +**Pros:** Clean state, more reliable + +**Cons:** Need to restore all subscriptions + +Strategy 3: Token Refresh and Retry +==================================== + +Refresh authentication tokens before retrying (handles token expiry). + +.. code-block:: python + + async def on_reconnection_failed(attempts): + print(f"Failed after {attempts} attempts. 
Refreshing tokens and retrying...") + await asyncio.sleep(60) + + # Refresh authentication tokens + await auth_client.refresh_token() + + # Recreate client with fresh tokens + mqtt_client = NavienMqttClient(auth_client, config=config) + mqtt_client.on('reconnection_failed', on_reconnection_failed) + await mqtt_client.connect() + + # Restore subscriptions + await mqtt_client.subscribe_device_status(device, on_status) + await mqtt_client.start_periodic_device_status_requests(device) + +**Pros:** Handles token expiry, more robust + +**Cons:** More complex, need to manage client lifecycle + +Strategy 4: Exponential Backoff (Production-Ready) ⭐ RECOMMENDED +================================================================= + +Use exponential backoff between recovery attempts with token refresh. + +.. code-block:: python + + import asyncio + from nwp500 import NavienMqttClient + from nwp500.mqtt_client import MqttConnectionConfig + + class ResilientMqttClient: + """Production-ready MQTT client with automatic recovery.""" + + def __init__(self, auth_client, config=None): + self.auth_client = auth_client + self.config = config or MqttConnectionConfig() + self.mqtt_client = None + self.device = None + self.callbacks = {} + + # Recovery settings + self.recovery_attempt = 0 + self.max_recovery_attempts = 10 + self.initial_recovery_delay = 60.0 + self.max_recovery_delay = 300.0 + self.recovery_backoff_multiplier = 2.0 + + async def connect(self, device, status_callback=None): + """Connect with automatic recovery.""" + self.device = device + self.callbacks['status'] = status_callback + await self._create_client() + + async def _create_client(self): + """Create and configure MQTT client.""" + # Cleanup old client + if self.mqtt_client and self.mqtt_client.is_connected: + await self.mqtt_client.disconnect() + + # Create new client + self.mqtt_client = NavienMqttClient(self.auth_client, self.config) + self.mqtt_client.on('reconnection_failed', self._handle_recovery) + + # Connect + 
await self.mqtt_client.connect() + + # Restore subscriptions + if self.device and self.callbacks.get('status'): + await self.mqtt_client.subscribe_device_status( + self.device, self.callbacks['status'] + ) + await self.mqtt_client.start_periodic_device_status_requests( + self.device + ) + + async def _handle_recovery(self, attempts): + """Handle reconnection failure with exponential backoff.""" + self.recovery_attempt += 1 + + if self.recovery_attempt >= self.max_recovery_attempts: + print("Max recovery attempts reached. Manual intervention required.") + # Send alert, restart app, etc. + return + + # Calculate delay with exponential backoff + delay = min( + self.initial_recovery_delay * + (self.recovery_backoff_multiplier ** (self.recovery_attempt - 1)), + self.max_recovery_delay + ) + + print(f"Recovery attempt {self.recovery_attempt} in {delay:.0f}s...") + await asyncio.sleep(delay) + + try: + # Refresh tokens every few attempts + if self.recovery_attempt % 3 == 0: + await self.auth_client.refresh_token() + + # Recreate client + await self._create_client() + + # Reset on success + self.recovery_attempt = 0 + print("✅ Recovery successful!") + + except Exception as e: + print(f"Recovery failed: {e}") + + async def disconnect(self): + """Disconnect gracefully.""" + if self.mqtt_client and self.mqtt_client.is_connected: + await self.mqtt_client.disconnect() + + @property + def is_connected(self): + return self.mqtt_client and self.mqtt_client.is_connected + + # Usage + async with NavienAuthClient(email, password) as auth_client: + api_client = NavienAPIClient(auth_client=auth_client) + device = await api_client.get_first_device() + + def on_status(status): + print(f"Temperature: {status.dhwTemperature}°F") + + # Create resilient client + mqtt_config = MqttConnectionConfig( + auto_reconnect=True, + max_reconnect_attempts=10, + ) + + client = ResilientMqttClient(auth_client, config=mqtt_config) + await client.connect(device, status_callback=on_status) + + # Monitor 
indefinitely + while True: + await asyncio.sleep(60) + print(f"Status: {'Connected' if client.is_connected else 'Reconnecting...'}") + +**Pros:** + +* Production-ready +* Handles token expiry +* Exponential backoff prevents overwhelming the server +* Configurable limits +* Clean error handling + +**Cons:** More code (but provided in examples) + +Configuration Options +===================== + +You can tune the reconnection behavior: + +.. code-block:: python + + config = MqttConnectionConfig( + # Initial reconnection (built-in) + auto_reconnect=True, + max_reconnect_attempts=10, + initial_reconnect_delay=1.0, # Start with 1s + max_reconnect_delay=120.0, # Cap at 2 minutes + reconnect_backoff_multiplier=2.0, # Double each time + ) + +**Reconnection Timeline:** + +1. Attempt 1: 1s delay +2. Attempt 2: 2s delay +3. Attempt 3: 4s delay +4. Attempt 4: 8s delay +5. Attempt 5: 16s delay +6. Attempt 6: 32s delay +7. Attempt 7: 64s delay +8. Attempts 8-10: 120s delay (capped) + +After 10 attempts (~6 minutes), ``reconnection_failed`` event is emitted. + +Best Practices +============== + +1. Use the ResilientMqttClient wrapper (Strategy 4) +---------------------------------------------------- + +See ``examples/simple_auto_recovery.py`` for a complete implementation. + +2. Implement monitoring and alerting +------------------------------------- + +.. code-block:: python + + async def on_reconnection_failed(attempts): + # Send alert when recovery starts + await send_alert(f"MQTT connection failed after {attempts} attempts") + +3. Set reasonable limits +------------------------ + +.. code-block:: python + + max_recovery_attempts = 10 # Stop after 10 recovery cycles + max_recovery_delay = 300.0 # Max 5 minutes between attempts + +4. Refresh tokens periodically +------------------------------- + +.. code-block:: python + + # Refresh every 3rd recovery attempt + if recovery_attempt % 3 == 0: + await auth_client.refresh_token() + +5. Log recovery events +---------------------- + +.. 
code-block:: python + + logger.info(f"Recovery attempt {recovery_attempt}/{max_recovery_attempts}") + logger.info(f"Waiting {delay:.0f} seconds before retry") + logger.info("✅ Recovery successful!") + +Examples +======== + +Complete working examples are provided: + +1. **examples/simple_auto_recovery.py** - Recommended pattern (Strategy 4) + + * Production-ready ResilientMqttClient wrapper + * Exponential backoff + * Token refresh + * Easy to use + +2. **examples/auto_recovery_example.py** - All 4 strategies + + * Shows all approaches side-by-side + * Good for learning and comparison + * Select strategy with ``STRATEGY=1-4`` env var + +Run them: + +.. code-block:: bash + + # Simple recovery (recommended) + NAVIEN_EMAIL=your@email.com NAVIEN_PASSWORD=yourpass \ + python examples/simple_auto_recovery.py + + # All strategies (for learning) + NAVIEN_EMAIL=your@email.com NAVIEN_PASSWORD=yourpass STRATEGY=4 \ + python examples/auto_recovery_example.py + +Testing Recovery +================ + +To test automatic recovery: + +1. Start the example +2. Wait for connection +3. Disconnect your internet until the built-in reconnection attempts are exhausted (~6 minutes with the default settings) +4. Reconnect internet +5. Watch the client automatically recover + +The logs will show: + +.. code-block:: text + + ERROR: Failed to reconnect after 10 attempts. Manual reconnection required. + INFO: Stopping 2 periodic task(s) due to connection failure + INFO: Starting recovery attempt 1/10 + INFO: Waiting 60 seconds before recovery... + INFO: Refreshing authentication tokens... + INFO: Recreating MQTT client... + INFO: ✅ Connected: navien-client-abc123 + INFO: Subscriptions restored + INFO: ✅ Recovery successful! + +When to Use Each Strategy +========================== + +.. 
list-table:: + :header-rows: 1 + :widths: 40 60 + + * - Scenario + - Recommended Strategy + * - Simple script, occasional use + - Strategy 1: Simple Retry + * - Development/testing + - Strategy 2: Full Recreation + * - Long-running service + - Strategy 3: Token Refresh + * - **Production application** + - **Strategy 4: Exponential Backoff** + * - Home automation integration + - Strategy 4: Exponential Backoff + * - Monitoring dashboard + - Strategy 4: Exponential Backoff + +Additional Options +================== + +Increase max reconnection attempts +----------------------------------- + +Instead of implementing recovery, you can increase the built-in attempts: + +.. code-block:: python + + config = MqttConnectionConfig( + max_reconnect_attempts=50, # Try 50 times before giving up + max_reconnect_delay=300.0, # Up to 5 minutes between attempts + ) + +This gives roughly 3.5 hours of retry attempts (with the default 1s initial delay doubling up to the 300s cap) before needing recovery. + +Disable automatic reconnection +------------------------------- + +If you want to handle everything manually: + +.. code-block:: python + + config = MqttConnectionConfig( + auto_reconnect=False, # Disable automatic reconnection + ) + + mqtt_client.on('connection_interrupted', my_custom_handler) + +Conclusion +========== + +For production use, **use Strategy 4 (Exponential Backoff)** via the ``ResilientMqttClient`` wrapper provided in ``examples/simple_auto_recovery.py``. It handles: + +* ✅ Automatic recovery from permanent failures +* ✅ Exponential backoff to prevent server overload +* ✅ Token refresh for long-running connections +* ✅ Clean client recreation +* ✅ Subscription restoration +* ✅ Configurable limits and delays + +This ensures your application stays connected even during extended network outages. 
diff --git a/docs/AUTO_RECOVERY_QUICK.rst b/docs/AUTO_RECOVERY_QUICK.rst new file mode 100644 index 0000000..8e585da --- /dev/null +++ b/docs/AUTO_RECOVERY_QUICK.rst @@ -0,0 +1,192 @@ +=================================== +Quick Reference: MQTT Auto-Recovery +=================================== + +TL;DR - Just Give Me The Code! +=============================== + +Copy this class into your project for production-ready automatic recovery: + +.. code-block:: python + + import asyncio + from nwp500 import NavienMqttClient + from nwp500.mqtt_client import MqttConnectionConfig + + class ResilientMqttClient: + """MQTT client with automatic recovery from permanent connection failures.""" + + def __init__(self, auth_client, config=None): + self.auth_client = auth_client + self.config = config or MqttConnectionConfig() + self.mqtt_client = None + self.device = None + self.status_callback = None + self.recovery_attempt = 0 + self.max_recovery_attempts = 10 + self.recovery_delay = 60.0 + + async def connect(self, device, status_callback=None): + self.device = device + self.status_callback = status_callback + await self._create_client() + + async def _create_client(self): + if self.mqtt_client and self.mqtt_client.is_connected: + await self.mqtt_client.disconnect() + + self.mqtt_client = NavienMqttClient(self.auth_client, self.config) + self.mqtt_client.on("reconnection_failed", self._handle_recovery) + await self.mqtt_client.connect() + + if self.device and self.status_callback: + await self.mqtt_client.subscribe_device_status( + self.device, self.status_callback + ) + await self.mqtt_client.start_periodic_device_status_requests(self.device) + + async def _handle_recovery(self, attempts): + self.recovery_attempt += 1 + if self.recovery_attempt >= self.max_recovery_attempts: + return # Give up + + await asyncio.sleep(self.recovery_delay) + + try: + await self.auth_client.refresh_token() + await self._create_client() + self.recovery_attempt = 0 # Reset on success + except 
Exception as e: + # Log the error instead of silently passing + import logging + + logging.getLogger(__name__).warning(f"Recovery attempt failed: {e}") + # Will retry on next reconnection_failed + + async def disconnect(self): + if self.mqtt_client: + await self.mqtt_client.disconnect() + + @property + def is_connected(self): + return self.mqtt_client and self.mqtt_client.is_connected + +**Usage:** + +.. code-block:: python + + client = ResilientMqttClient(auth_client) + await client.connect(device, status_callback=on_status) + + # Your client will now automatically recover from connection failures! + +How It Works +============ + +1. **Normal operation**: MQTT client connects and operates normally +2. **Connection lost**: Client tries to reconnect automatically (10 attempts with exponential backoff) +3. **Reconnection fails**: After 10 attempts (~6 minutes), ``reconnection_failed`` event fires +4. **Auto-recovery kicks in**: + + * Waits 60 seconds + * Refreshes authentication tokens + * Creates new MQTT client + * Restores all subscriptions + * Tries up to 10 recovery cycles + +Configuration +============= + +Tune the behavior: + +.. code-block:: python + + config = MqttConnectionConfig( + max_reconnect_attempts=10, # Built-in reconnection attempts + max_reconnect_delay=120.0, # Max 2 min between attempts + ) + + client = ResilientMqttClient(auth_client, config=config) + client.max_recovery_attempts = 10 # Recovery cycles + client.recovery_delay = 60.0 # Seconds between recovery attempts + +Complete Examples +================= + +See these files for full working examples: + +* ``examples/simple_auto_recovery.py`` - Production-ready pattern (recommended) +* ``examples/auto_recovery_example.py`` - All 4 strategies explained +* ``docs/AUTO_RECOVERY.rst`` - Complete documentation + +Timeline Example +================ + +With default settings: + +.. 
code-block:: text + + 00:00 - Connection lost + 00:00 - Reconnect attempt 1 (1s delay) + 00:01 - Reconnect attempt 2 (2s delay) + 00:03 - Reconnect attempt 3 (4s delay) + 00:07 - Reconnect attempt 4 (8s delay) + 00:15 - Reconnect attempt 5 (16s delay) + 00:31 - Reconnect attempt 6 (32s delay) + 01:03 - Reconnect attempt 7 (64s delay) + 02:07 - Reconnect attempt 8 (120s delay, capped) + 04:07 - Reconnect attempt 9 (120s delay) + 06:07 - Reconnect attempt 10 (120s delay) + + 06:07 - reconnection_failed event emitted + 06:07 - Recovery cycle 1 starts + 07:07 - Token refresh + client recreation + 07:07 - If successful, back to normal operation + 07:07 - If failed, wait for next reconnection_failed event + + [Process repeats up to max_recovery_attempts times] + +Events You Can Listen To +======================== + +.. code-block:: python + + # Built-in MQTT events + mqtt_client.on('connection_interrupted', lambda err: print(f"Interrupted: {err}")) + mqtt_client.on('connection_resumed', lambda rc, sp: print("Resumed!")) + mqtt_client.on('reconnection_failed', lambda attempts: print(f"Failed after {attempts}")) + + # In ResilientMqttClient, reconnection_failed is handled automatically + +Testing +======= + +Test automatic recovery: + +1. Start your application +2. Disconnect internet until ``reconnection_failed`` fires (~6 minutes with default settings) +3. Reconnect internet +4. Watch automatic recovery in logs + +Production Considerations +========================== + +**DO:** + +* ✅ Use ``ResilientMqttClient`` wrapper +* ✅ Set reasonable ``max_recovery_attempts`` (10-20) +* ✅ Log recovery events for monitoring +* ✅ Send alerts when recovery is triggered +* ✅ Monitor token expiration + +**DON'T:** + +* ❌ Set recovery delay too low (causes server load) +* ❌ Set max_recovery_attempts too high (wastes resources) +* ❌ Ignore the ``reconnection_failed`` event +* ❌ Forget to restore subscriptions after recovery + +Need More Info? 
+================ + +Read the full documentation: :doc:`AUTO_RECOVERY` diff --git a/docs/index.rst b/docs/index.rst index 4cd2411..e450973 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -315,6 +315,8 @@ Documentation Command Queue Event Emitter Energy Monitoring + Auto-Recovery Quick Reference + Auto-Recovery Complete Guide .. toctree:: :maxdepth: 2 diff --git a/examples/auto_recovery_example.py b/examples/auto_recovery_example.py new file mode 100644 index 0000000..13b648d --- /dev/null +++ b/examples/auto_recovery_example.py @@ -0,0 +1,448 @@ +#!/usr/bin/env python3 +""" +Example: Automatic Recovery After Reconnection Failure + +This example demonstrates different strategies to automatically recover +from permanent connection failures (after max reconnection attempts). + +Strategies demonstrated: +1. Simple retry with reset - Just retry the connection after a delay +2. Full client recreation - Recreate the MQTT client from scratch +3. Token refresh and retry - Refresh auth tokens before retry +4. Exponential backoff retry - Use increasing delays between retry attempts +""" + +import asyncio +import logging +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from nwp500 import NavienAPIClient, NavienAuthClient, NavienMqttClient +from nwp500.mqtt_client import MqttConnectionConfig + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + + +# ============================================================================ +# STRATEGY 1: Simple Retry with Reset +# ============================================================================ +async def strategy_simple_retry(auth_client, device): + """ + Simple strategy: When reconnection fails, wait and try again. + + This is the simplest approach - just restart the reconnection attempts + after a delay. 
The MQTT client's internal reconnection counter is reset. + """ + logger.info("Using Strategy 1: Simple Retry with Reset") + + config = MqttConnectionConfig( + auto_reconnect=True, + max_reconnect_attempts=5, # Fewer attempts for demo + initial_reconnect_delay=1.0, + max_reconnect_delay=30.0, + ) + + mqtt_client = NavienMqttClient(auth_client, config=config) + + # Track recovery attempts + recovery_attempt = 0 + max_recovery_attempts = 3 + + async def on_reconnection_failed(attempts): + """Handle permanent reconnection failure.""" + nonlocal recovery_attempt + recovery_attempt += 1 + + logger.error(f"Reconnection failed after {attempts} attempts") + logger.info( + f"Recovery attempt {recovery_attempt}/{max_recovery_attempts} " + f"- will retry in 30 seconds..." + ) + + if recovery_attempt >= max_recovery_attempts: + logger.error("Max recovery attempts reached. Giving up.") + return + + # Wait before retrying + await asyncio.sleep(30) + + # Reset reconnection counter and try again using public API + logger.info("Restarting reconnection process...") + await mqtt_client.reset_reconnect() + + # Register the event handler + mqtt_client.on("reconnection_failed", on_reconnection_failed) + + try: + await mqtt_client.connect() + logger.info(f"Connected: {mqtt_client.client_id}") + + # Subscribe and monitor + await mqtt_client.subscribe_device_status(device, lambda s: None) + await mqtt_client.start_periodic_device_status_requests(device) + + # Monitor for 120 seconds + for i in range(120): + await asyncio.sleep(1) + if i % 10 == 0: + status = ( + "🟢 Connected" if mqtt_client.is_connected else "🔴 Disconnected" + ) + logger.info(f"[{i}s] {status}") + + finally: + if mqtt_client.is_connected: + await mqtt_client.disconnect() + + +# ============================================================================ +# STRATEGY 2: Full Client Recreation +# ============================================================================ +async def strategy_full_recreation(auth_client, 
device): + """ + Robust strategy: Recreate the entire MQTT client from scratch. + + This approach creates a new MQTT client instance when reconnection fails. + It's more robust as it clears all internal state. + """ + logger.info("Using Strategy 2: Full Client Recreation") + + config = MqttConnectionConfig( + auto_reconnect=True, + max_reconnect_attempts=5, + initial_reconnect_delay=1.0, + max_reconnect_delay=30.0, + ) + + mqtt_client = None + recovery_attempt = 0 + max_recovery_attempts = 3 + + async def create_and_connect(): + """Create a new MQTT client and connect.""" + nonlocal mqtt_client + + if mqtt_client and mqtt_client.is_connected: + await mqtt_client.disconnect() + + mqtt_client = NavienMqttClient(auth_client, config=config) + mqtt_client.on("reconnection_failed", on_reconnection_failed) + + await mqtt_client.connect() + logger.info(f"Connected: {mqtt_client.client_id}") + + # Re-subscribe + await mqtt_client.subscribe_device_status(device, lambda s: None) + await mqtt_client.start_periodic_device_status_requests(device) + + return mqtt_client + + async def on_reconnection_failed(attempts): + """Handle permanent reconnection failure by recreating client.""" + nonlocal recovery_attempt, mqtt_client + + recovery_attempt += 1 + logger.error(f"Reconnection failed after {attempts} attempts") + logger.info( + f"Recovery attempt {recovery_attempt}/{max_recovery_attempts} " + f"- recreating MQTT client in 30 seconds..." + ) + + if recovery_attempt >= max_recovery_attempts: + logger.error("Max recovery attempts reached. 
Giving up.") + return + + await asyncio.sleep(30) + + try: + mqtt_client = await create_and_connect() + recovery_attempt = 0 # Reset on success + logger.info("Successfully recreated MQTT client") + except Exception as e: + logger.error(f"Failed to recreate MQTT client: {e}") + + try: + mqtt_client = await create_and_connect() + + # Monitor for 120 seconds + for i in range(120): + await asyncio.sleep(1) + if i % 10 == 0: + status = ( + "🟢 Connected" if mqtt_client.is_connected else "🔴 Disconnected" + ) + logger.info(f"[{i}s] {status}") + + finally: + if mqtt_client and mqtt_client.is_connected: + await mqtt_client.disconnect() + + +# ============================================================================ +# STRATEGY 3: Token Refresh and Retry +# ============================================================================ +async def strategy_token_refresh(auth_client, device): + """ + Advanced strategy: Refresh authentication tokens before retry. + + Sometimes connection failures are due to expired tokens. This strategy + refreshes the auth tokens before retrying the connection. + """ + logger.info("Using Strategy 3: Token Refresh and Retry") + + config = MqttConnectionConfig( + auto_reconnect=True, + max_reconnect_attempts=5, + initial_reconnect_delay=1.0, + max_reconnect_delay=30.0, + ) + + mqtt_client = None + recovery_attempt = 0 + max_recovery_attempts = 3 + + async def on_reconnection_failed(attempts): + """Handle permanent reconnection failure with token refresh.""" + nonlocal recovery_attempt, mqtt_client + + recovery_attempt += 1 + logger.error(f"Reconnection failed after {attempts} attempts") + logger.info( + f"Recovery attempt {recovery_attempt}/{max_recovery_attempts} " + f"- refreshing tokens and retrying in 30 seconds..." + ) + + if recovery_attempt >= max_recovery_attempts: + logger.error("Max recovery attempts reached. 
Giving up.") + return + + await asyncio.sleep(30) + + try: + # Refresh authentication tokens + logger.info("Refreshing authentication tokens...") + await auth_client.refresh_token() + logger.info("Tokens refreshed successfully") + + # Disconnect old client + if mqtt_client and mqtt_client.is_connected: + await mqtt_client.disconnect() + + # Create new client with fresh tokens + mqtt_client = NavienMqttClient(auth_client, config=config) + mqtt_client.on("reconnection_failed", on_reconnection_failed) + + await mqtt_client.connect() + logger.info(f"Reconnected with fresh tokens: {mqtt_client.client_id}") + + # Re-subscribe + await mqtt_client.subscribe_device_status(device, lambda s: None) + await mqtt_client.start_periodic_device_status_requests(device) + + recovery_attempt = 0 # Reset on success + + except Exception as e: + logger.error(f"Failed to refresh and reconnect: {e}") + + mqtt_client = NavienMqttClient(auth_client, config=config) + mqtt_client.on("reconnection_failed", on_reconnection_failed) + + try: + await mqtt_client.connect() + logger.info(f"Connected: {mqtt_client.client_id}") + + # Subscribe and monitor + await mqtt_client.subscribe_device_status(device, lambda s: None) + await mqtt_client.start_periodic_device_status_requests(device) + + # Monitor for 120 seconds + for i in range(120): + await asyncio.sleep(1) + if i % 10 == 0: + status = ( + "🟢 Connected" if mqtt_client.is_connected else "🔴 Disconnected" + ) + logger.info(f"[{i}s] {status}") + + finally: + if mqtt_client and mqtt_client.is_connected: + await mqtt_client.disconnect() + + +# ============================================================================ +# STRATEGY 4: Exponential Backoff Retry (Production-Ready) +# ============================================================================ +async def strategy_exponential_backoff(auth_client, device): + """ + Production-ready strategy: Use exponential backoff for recovery attempts. 
+ + This is the most robust strategy for production use. It: + - Uses exponential backoff between recovery attempts + - Refreshes tokens periodically + - Recreates the client cleanly + - Has configurable limits + """ + logger.info("Using Strategy 4: Exponential Backoff Retry (Production)") + + config = MqttConnectionConfig( + auto_reconnect=True, + max_reconnect_attempts=5, + initial_reconnect_delay=1.0, + max_reconnect_delay=30.0, + ) + + mqtt_client = None + recovery_attempt = 0 + max_recovery_attempts = 10 + initial_recovery_delay = 30.0 + max_recovery_delay = 300.0 # 5 minutes + recovery_backoff_multiplier = 2.0 + + async def on_reconnection_failed(attempts): + """Handle permanent reconnection failure with exponential backoff.""" + nonlocal recovery_attempt, mqtt_client + + recovery_attempt += 1 + logger.error(f"Reconnection failed after {attempts} attempts") + + if recovery_attempt >= max_recovery_attempts: + logger.error("Max recovery attempts reached. Manual intervention required.") + # In production, you might want to: + # - Send alert notification + # - Restart the application + # - Log to monitoring system + return + + # Calculate delay with exponential backoff + delay = min( + initial_recovery_delay + * (recovery_backoff_multiplier ** (recovery_attempt - 1)), + max_recovery_delay, + ) + + logger.info( + f"Recovery attempt {recovery_attempt}/{max_recovery_attempts} " + f"in {delay:.1f} seconds..." 
+ ) + + await asyncio.sleep(delay) + + try: + # Refresh tokens every few attempts + if recovery_attempt % 3 == 0: + logger.info("Refreshing authentication tokens...") + await auth_client.refresh_token() + logger.info("Tokens refreshed") + + # Clean up old client + if mqtt_client: + try: + if mqtt_client.is_connected: + await mqtt_client.disconnect() + except Exception as e: + logger.warning(f"Error disconnecting old client: {e}") + + # Create new client + mqtt_client = NavienMqttClient(auth_client, config=config) + mqtt_client.on("reconnection_failed", on_reconnection_failed) + + await mqtt_client.connect() + logger.info(f"✅ Recovered! Connected: {mqtt_client.client_id}") + + # Re-subscribe + await mqtt_client.subscribe_device_status(device, lambda s: None) + await mqtt_client.start_periodic_device_status_requests(device) + + recovery_attempt = 0 # Reset on success + logger.info("All subscriptions restored") + + except Exception as e: + logger.error(f"Recovery attempt failed: {e}") + + mqtt_client = NavienMqttClient(auth_client, config=config) + mqtt_client.on("reconnection_failed", on_reconnection_failed) + + try: + await mqtt_client.connect() + logger.info(f"Connected: {mqtt_client.client_id}") + + # Subscribe and monitor + await mqtt_client.subscribe_device_status(device, lambda s: None) + await mqtt_client.start_periodic_device_status_requests(device) + + # Monitor for 120 seconds + for i in range(120): + await asyncio.sleep(1) + if i % 10 == 0: + status = ( + "🟢 Connected" if mqtt_client.is_connected else "🔴 Disconnected" + ) + logger.info(f"[{i}s] {status}") + + finally: + if mqtt_client and mqtt_client.is_connected: + await mqtt_client.disconnect() + + +# ============================================================================ +# Main +# ============================================================================ +async def main(): + """Run the selected strategy.""" + email = os.getenv("NAVIEN_EMAIL") + password = os.getenv("NAVIEN_PASSWORD") + + if not 
email or not password: + print("Please set NAVIEN_EMAIL and NAVIEN_PASSWORD environment variables") + return + + # Select strategy (1-4) + strategy = int(os.getenv("STRATEGY", "4")) + + print("=" * 70) + print("Automatic Recovery After Reconnection Failure") + print("=" * 70) + + async with NavienAuthClient(email, password) as auth_client: + logger.info(f"Authenticated as: {auth_client.current_user.full_name}") + + api_client = NavienAPIClient(auth_client=auth_client) + device = await api_client.get_first_device() + + if not device: + logger.error("No devices found") + return + + logger.info(f"Found device: {device.device_info.device_name}") + + # Run selected strategy + if strategy == 1: + await strategy_simple_retry(auth_client, device) + elif strategy == 2: + await strategy_full_recreation(auth_client, device) + elif strategy == 3: + await strategy_token_refresh(auth_client, device) + elif strategy == 4: + await strategy_exponential_backoff(auth_client, device) + else: + logger.error(f"Invalid strategy: {strategy}") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n\n⚠️ Interrupted by user") + except Exception as e: + print(f"\n❌ Error: {e}") + import traceback + + traceback.print_exc() diff --git a/examples/simple_auto_recovery.py b/examples/simple_auto_recovery.py new file mode 100644 index 0000000..db75562 --- /dev/null +++ b/examples/simple_auto_recovery.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +""" +Example: Simple Automatic Recovery (Recommended Pattern) + +This example shows the simplest and most reliable way to handle permanent +connection failures. When the MQTT client fails to reconnect after max +attempts, it will automatically: + +1. Wait 60 seconds +2. Refresh authentication tokens +3. Recreate the MQTT client +4. Restore all subscriptions +5. Restart periodic requests + +This pattern is production-ready and handles most network issues gracefully. 
class ResilientMqttClient:
    """
    Wrapper around NavienMqttClient that automatically recovers from failures.

    The wrapper registers a handler for the client's `reconnection_failed`
    event. When the event fires it waits ``recovery_delay`` seconds, refreshes
    the authentication tokens, builds a brand new NavienMqttClient and
    re-registers the device status subscription and periodic status requests.
    """

    def __init__(self, auth_client, config=None):
        """
        Args:
            auth_client: Auth client used to build MQTT clients and to refresh
                tokens during recovery.
            config: Optional MqttConnectionConfig; a default instance is
                created when omitted.
        """
        self.auth_client = auth_client
        self.config = config or MqttConnectionConfig()
        self.mqtt_client = None
        self.device = None
        self.status_callback = None

        # Recovery settings
        self.max_recovery_attempts = 10
        self.recovery_delay = 60.0  # seconds
        self.recovery_attempt = 0
        self._recovery_in_progress = False  # Guard against concurrent recovery

    async def connect(self, device, status_callback=None):
        """
        Connect to MQTT and set up automatic recovery.

        Args:
            device: Navien device to monitor
            status_callback: Optional callback for status updates
        """
        # Remember both so that a recreated client can restore the subscription
        self.device = device
        self.status_callback = status_callback

        # Create and connect MQTT client
        await self._create_client()

        logger.info(f"✅ Connected: {self.mqtt_client.client_id}")

    async def _create_client(self):
        """Create MQTT client with recovery handler, connect, and resubscribe."""
        # Clean up old client if exists; a failed disconnect must not block
        # the creation of the replacement client
        if self.mqtt_client and self.mqtt_client.is_connected:
            try:
                await self.mqtt_client.disconnect()
            except Exception as e:
                logger.warning(f"Error disconnecting old client: {e}")

        # Create new client
        self.mqtt_client = NavienMqttClient(self.auth_client, config=self.config)

        # Register recovery handler
        self.mqtt_client.on("reconnection_failed", self._handle_reconnection_failed)

        # Connect
        await self.mqtt_client.connect()

        # Restore subscriptions (only when a device and callback were supplied)
        if self.device and self.status_callback:
            await self.mqtt_client.subscribe_device_status(
                self.device, self.status_callback
            )
            await self.mqtt_client.start_periodic_device_status_requests(self.device)
            logger.info("Subscriptions restored")

    async def _handle_reconnection_failed(self, attempts):
        """
        Handle permanent reconnection failure by recreating the client.

        Each invocation performs at most one recovery attempt:
        1. Wait before retrying
        2. Refresh authentication tokens
        3. Recreate the MQTT client
        4. Restore all subscriptions

        Up to ``max_recovery_attempts`` consecutive attempts are made; the
        counter is reset after a successful recovery.

        Args:
            attempts: Number of reconnection attempts reported by the event.
        """
        # Prevent overlapping recovery attempts
        if self._recovery_in_progress:
            logger.debug(
                "Recovery already in progress, ignoring reconnection_failed event"
            )
            return

        self._recovery_in_progress = True
        self.recovery_attempt += 1

        try:
            # Strictly greater: the attempt numbered max_recovery_attempts must
            # still be executed, otherwise it is announced but never run.
            if self.recovery_attempt > self.max_recovery_attempts:
                logger.error(
                    f"Reconnection failed after {attempts} attempts. "
                    "Maximum recovery attempts reached. Manual intervention required."
                )
                # In production, you might want to:
                # - Send alert/notification
                # - Trigger application restart
                # - Log to monitoring system
                return

            logger.error(
                f"Reconnection failed after {attempts} attempts. "
                f"Starting recovery attempt {self.recovery_attempt}/{self.max_recovery_attempts}"
            )

            # Wait before attempting recovery
            logger.info(f"Waiting {self.recovery_delay} seconds before recovery...")
            await asyncio.sleep(self.recovery_delay)

            # Refresh authentication tokens
            logger.info("Refreshing authentication tokens...")
            await self.auth_client.refresh_token()
            logger.info("Tokens refreshed successfully")

            # Recreate MQTT client
            logger.info("Recreating MQTT client...")
            await self._create_client()

            # Reset recovery counter on success
            self.recovery_attempt = 0
            logger.info("✅ Recovery successful!")

        except Exception as e:
            logger.error(f"Recovery attempt failed: {e}")
            # NOTE(review): assumes the client will emit another
            # reconnection_failed event to trigger the next attempt - confirm
            # this also holds when the initial connect() of a new client fails.
        finally:
            self._recovery_in_progress = False

    async def disconnect(self):
        """Disconnect from MQTT if a client exists and is currently connected."""
        if self.mqtt_client and self.mqtt_client.is_connected:
            await self.mqtt_client.disconnect()
            logger.info("Disconnected")

    @property
    def is_connected(self):
        """Return True when a client exists and reports itself connected, else False."""
        # bool() so callers never receive None when no client has been created
        return bool(self.mqtt_client and self.mqtt_client.is_connected)
async def main():
    """Example usage of ResilientMqttClient.

    Reads credentials from ``NAVIEN_EMAIL`` / ``NAVIEN_PASSWORD``, connects a
    ResilientMqttClient to the first device, and logs the connection state
    every 10 seconds for 180 seconds. The client is disconnected even when
    connecting or monitoring is interrupted by an error or cancellation.
    """
    email = os.getenv("NAVIEN_EMAIL")
    password = os.getenv("NAVIEN_PASSWORD")

    if not email or not password:
        print("Please set NAVIEN_EMAIL and NAVIEN_PASSWORD environment variables")
        return

    print("=" * 70)
    print("Simple Automatic Recovery Example")
    print("=" * 70)

    async with NavienAuthClient(email, password) as auth_client:
        logger.info(f"Authenticated as: {auth_client.current_user.full_name}")

        # Get device
        api_client = NavienAPIClient(auth_client=auth_client)
        device = await api_client.get_first_device()

        if not device:
            logger.error("No devices found")
            return

        logger.info(f"Found device: {device.device_info.device_name}")

        # Status callback; counts updates so the monitor loop can report them
        status_count = 0

        def on_status(status):
            nonlocal status_count
            status_count += 1
            logger.info(
                f"Status #{status_count}: Temp={status.dhwTemperature}°F, "
                f"Mode={status.operationMode}"
            )

        # Create resilient MQTT client
        mqtt_config = MqttConnectionConfig(
            auto_reconnect=True,
            max_reconnect_attempts=5,  # Low for demo purposes
            initial_reconnect_delay=1.0,
            max_reconnect_delay=30.0,
        )

        resilient_client = ResilientMqttClient(auth_client, config=mqtt_config)

        try:
            # Connect with automatic recovery
            await resilient_client.connect(device, status_callback=on_status)

            print("\n" + "=" * 70)
            print("Monitoring connection (180 seconds)...")
            print("=" * 70)
            print("\nThe client will automatically recover if connection fails.")
            print("To test: disconnect your internet and wait ~30 seconds,")
            print("then reconnect. The client should automatically recover.\n")

            # Monitor for 3 minutes
            for i in range(180):
                await asyncio.sleep(1)

                # Show status every 10 seconds
                if i % 10 == 0:
                    status = (
                        "🟢 Connected"
                        if resilient_client.is_connected
                        else "🔴 Disconnected"
                    )
                    logger.info(f"[{i}s] {status} | Status updates: {status_count}")

            print("\n" + "=" * 70)
            print(f"Monitoring complete. Received {status_count} status updates.")
            print("=" * 70)
        finally:
            # Always release the MQTT connection, including on Ctrl+C or errors
            await resilient_client.disconnect()
def _redact_topic(topic: str) -> str:
    """
    Return *topic* with device identifiers replaced by ``REDACTED``.

    MQTT topics embed MAC-style device identifiers, for example:
    - cmd/52/navilink-04786332fca0/st/did
    - cmd/52/04786332fca0/ctrl
    - 04:78:63:32:fc:a0 or 04-78-63-32-fc-a0 (delimited forms)

    Args:
        topic: MQTT topic string

    Returns:
        Topic with MAC addresses redacted
    """
    import re

    # Applied strictly in this order; each rule sees the previous rule's output.
    rules = (
        # "navilink-" followed by 12 hex digits (prefix is kept)
        (r"(navilink-)[0-9a-fA-F]{12}", r"\1REDACTED"),
        # bare 12-hex-digit MAC, either case
        (r"\b[0-9a-fA-F]{12}\b", "REDACTED"),
        # colon-delimited MAC, e.g. 04:78:63:32:fc:a0
        (r"\b([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\b", "REDACTED"),
        # hyphen-delimited MAC, e.g. 04-78-63-32-fc-a0
        (r"\b([0-9a-fA-F]{2}-){5}[0-9a-fA-F]{2}\b", "REDACTED"),
    )

    redacted = topic
    for pattern, replacement in rules:
        redacted = re.sub(pattern, replacement, redacted)
    return redacted
Connection restored (return_code, session_present) + - reconnection_failed: Reconnection permanently failed after max attempts (attempt_count) """ def __init__( @@ -428,6 +459,10 @@ async def _reconnect_with_backoff(self): f"Failed to reconnect after {self.config.max_reconnect_attempts} attempts. " "Manual reconnection required." ) + # Stop all periodic tasks to reduce log noise + await self._stop_all_periodic_tasks() + # Emit event so users can take action + self._schedule_coroutine(self.emit("reconnection_failed", self._reconnect_attempts)) async def _send_queued_commands(self): """ @@ -461,7 +496,9 @@ async def _send_queued_commands(self): ) except Exception as e: failed_count += 1 - _logger.error(f"Failed to send queued command to '{command.topic}': {e}") + _logger.error( + f"Failed to send queued command to '{_redact_topic(command.topic)}': {e}" + ) # Re-queue if there's room if len(self._command_queue) < self.config.max_queued_commands: self._command_queue.append(command) @@ -728,7 +765,7 @@ def _subscribe(): return packet_id except Exception as e: - _logger.error(f"Failed to subscribe to '{topic}': {e}") + _logger.error(f"Failed to subscribe to '{_redact_topic(topic)}': {e}") raise async def unsubscribe(self, topic: str): @@ -758,7 +795,7 @@ def _unsubscribe(): _logger.info(f"Unsubscribed from '{topic}'") except Exception as e: - _logger.error(f"Failed to unsubscribe from '{topic}': {e}") + _logger.error(f"Failed to unsubscribe from '{_redact_topic(topic)}': {e}") raise async def publish( @@ -813,7 +850,26 @@ def _publish(): return packet_id except Exception as e: - _logger.error(f"Failed to publish to '{topic}': {e}") + # Handle clean session cancellation gracefully + # Check exception type and name attribute for proper error identification + if ( + isinstance(e, AwsCrtError) + and e.name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION" + ): + _logger.warning( + "Publish cancelled due to clean session. This is expected during reconnection." 
+ ) + # Queue the command if queue is enabled + if self.config.enable_command_queue: + _logger.debug("Queuing command due to clean session cancellation") + self._queue_command(topic, payload, qos) + return 0 # Return 0 to indicate command was queued + # Otherwise, raise an error so the caller can handle the failure + raise RuntimeError( + "Publish cancelled due to clean session and command queue is disabled" + ) + + _logger.error(f"Failed to publish to '{_redact_topic(topic)}': {e}") raise # Navien-specific convenience methods @@ -1776,15 +1832,39 @@ async def periodic_request(): f"(every {period_seconds}s)" ) + # Track consecutive skips for throttled logging + consecutive_skips = 0 + while True: try: if not self._connected: - _logger.warning( - "Not connected, skipping %s request for %s", - request_type.value, - redacted_device_id, - ) + consecutive_skips += 1 + # Log warning only on first skip and then every 10th skip to reduce noise + if consecutive_skips == 1 or consecutive_skips % 10 == 0: + _logger.warning( + "Not connected, skipping %s request for %s (skipped %d time%s)", + request_type.value, + redacted_device_id, + consecutive_skips, + "s" if consecutive_skips > 1 else "", + ) + else: + _logger.debug( + "Not connected, skipping %s request for %s", + request_type.value, + redacted_device_id, + ) else: + # Reset skip counter when connected + if consecutive_skips > 0: + _logger.info( + "Reconnected, resuming %s requests for %s (had skipped %d)", + request_type.value, + redacted_device_id, + consecutive_skips, + ) + consecutive_skips = 0 + # Send appropriate request type if request_type == PeriodicRequestType.DEVICE_INFO: await self.request_device_info(device) @@ -1806,13 +1886,26 @@ async def periodic_request(): ) break except Exception as e: - _logger.error( - "Error in periodic %s request for %s: %s", - request_type.value, - redacted_device_id, - e, - exc_info=True, - ) + # Handle clean session cancellation gracefully (expected during reconnection) + # 
async def _stop_all_periodic_tasks(self) -> None:
    """
    Stop every periodic task because the connection failed.

    Internal hook for the permanent-reconnection-failure path; it forwards to
    the public ``stop_all_periodic_tasks`` and tags the call with the reason
    "connection failure" so the log line explains why the tasks were stopped.
    """
    stop_reason = "connection failure"
    await self.stop_all_periodic_tasks(_reason=stop_reason)
async def reset_reconnect(self) -> None:
    """
    Reset reconnection state and trigger a new reconnection attempt.

    This method resets the reconnection attempt counter, clears the
    manual-disconnect flag and starts a new reconnection cycle. Useful for
    implementing custom recovery logic after max reconnection attempts have
    been exhausted. Queued commands are left untouched.

    Example:
        >>> # In a reconnection_failed event handler
        >>> await mqtt_client.reset_reconnect()

    Note:
        This should typically only be called after a reconnection_failed
        event, not during normal operation.
    """
    self._reconnect_attempts = 0
    # NOTE(review): presumably the reconnect loop is suppressed while this
    # flag is set - confirm against _start_reconnect_task.
    self._manual_disconnect = False
    await self._start_reconnect_task()
    # Nothing else belongs here: the queue-clearing statements that trailed
    # this body were stray lines from clear_command_queue; they returned an
    # int from a -> None coroutine and discarded commands queued for resend.