Fix MQTT reconnection handling and add automatic recovery#16
Conversation
This commit addresses multiple MQTT connection issues and adds comprehensive automatic recovery capabilities. ## Problems Fixed 1. **Clean Session Errors**: `AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION` errors were logged as ERROR with full stack traces during normal reconnection, causing log noise and user confusion. 2. **Excessive Logging**: When disconnected, "Not connected" warnings were logged every 5 minutes indefinitely, filling logs with repetitive messages. 3. **Orphaned Periodic Tasks**: After max reconnection attempts were exhausted, periodic request tasks continued running indefinitely, generating warnings. 4. **No Recovery Mechanism**: Users had no way to automatically recover from permanent connection failures without manual intervention. ## Changes Made ### Core MQTT Client (`src/nwp500/mqtt_client.py`) - **Graceful clean session handling**: Catch and handle `AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION` errors in `publish()` method, automatically queuing commands for retry instead of raising exceptions. - **Throttled disconnect logging**: Log "not connected" warnings only on 1st occurrence and every 10th skip thereafter, reducing log noise by ~90%. - **Automatic task cleanup**: Stop all periodic tasks when max reconnection attempts are exhausted via new `_stop_all_periodic_tasks()` method. - **New event**: Added `reconnection_failed` event emitted when client fails to reconnect after max attempts, enabling programmatic recovery. - **Enhanced error handling**: Clean session cancellations in periodic requests are now logged at DEBUG level instead of ERROR. ### Documentation - **`docs/AUTO_RECOVERY.md`**: Comprehensive guide covering 4 recovery strategies from simple to production-ready, with configuration examples and best practices. - **`docs/AUTO_RECOVERY_QUICK.md`**: Quick reference with copy-paste ready `ResilientMqttClient` wrapper class for immediate use. - **`CLEAN_SESSION_FIX.md`**: Detailed explanation of the fixes with before/after logging examples and links to recovery documentation. ### Examples - **`examples/simple_auto_recovery.py`**: Production-ready `ResilientMqttClient` wrapper demonstrating automatic recovery with exponential backoff and token refresh (recommended for production use). - **`examples/auto_recovery_example.py`**: Educational example showing all 4 recovery strategies side-by-side with detailed explanations. ## Impact **Before:** - ERROR logs with stack traces for expected clean session behavior - Continuous WARNING logs every 5 minutes during disconnection - Manual intervention required after reconnection failure **After:** - Clean session errors handled gracefully (WARNING/DEBUG level) - Throttled logging reduces noise by 90% - Automatic recovery with token refresh and client recreation - Production-ready wrapper class for easy integration - Comprehensive documentation and examples ## Testing - All 29 existing tests pass - All linting checks pass - No breaking changes to public API - Backward compatible ## Related Issues Fixes issues with: - `AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION` error spam - Indefinite "Not connected" warnings after reconnection failure - Lack of automatic recovery mechanism for production deployments
…ensitive information Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Pull Request Overview
Implements improved MQTT reconnection handling and introduces automatic recovery mechanisms to reduce log noise and enable self-healing after permanent failures.
- Adds reconnection_failed event and graceful handling of clean session cancellation.
- Introduces throttled logging and internal cleanup of periodic tasks on permanent failure.
- Provides example recovery strategies and production-ready ResilientMqttClient wrapper with supporting documentation.
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| src/nwp500/mqtt_client.py | Adds reconnection_failed event, clean session handling, throttled periodic request logging, and internal task cleanup method. |
| examples/simple_auto_recovery.py | Provides production-oriented wrapper implementing automated recovery workflow. |
| examples/auto_recovery_example.py | Demonstrates four distinct recovery strategies for educational comparison. |
| docs/AUTO_RECOVERY_QUICK.md | Supplies quick-start, copy/paste recovery wrapper and operational timeline. |
| docs/AUTO_RECOVERY.md | Full conceptual and implementation guide for recovery strategies and best practices. |
docs/AUTO_RECOVERY_QUICK.md
Outdated
| async def _handle_recovery(self, attempts): | ||
| self.recovery_attempt += 1 | ||
| if self.recovery_attempt >= self.max_recovery_attempts: | ||
| return # Give up | ||
|
|
||
| await asyncio.sleep(self.recovery_delay) | ||
|
|
||
| try: | ||
| await self.auth_client.refresh_token() | ||
| await self._create_client() | ||
| self.recovery_attempt = 0 # Reset on success | ||
| except Exception: | ||
| pass # Will retry on next reconnection_failed | ||
|
|
There was a problem hiding this comment.
Snippet uses asyncio.sleep and async/await but does not show 'import asyncio'; add the import for copy/paste usability. Also avoid bare 'except Exception: pass'—log a minimal message (e.g., logger.warning) so silent failures do not obscure recurring recovery issues in production.
examples/simple_auto_recovery.py
Outdated
| # Wait before attempting recovery | ||
| logger.info(f"Waiting {self.recovery_delay} seconds before recovery...") | ||
| await asyncio.sleep(self.recovery_delay) | ||
|
|
||
| try: | ||
| # Refresh authentication tokens | ||
| logger.info("Refreshing authentication tokens...") | ||
| await self.auth_client.refresh_token() |
There was a problem hiding this comment.
If multiple reconnection_failed events fire before a prior recovery attempt finishes sleeping, overlapping recovery attempts could race (e.g., simultaneous client recreation). Consider adding an async lock or a boolean 'recovery_in_progress' guard to serialize recovery cycles.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Use proper exception type checking instead of string matching for AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION error detection. Changes: - Import AwsCrtError from awscrt.exceptions - Check isinstance(e, AwsCrtError) and e.name attribute - Prevents false positives from string matching - More robust against future message format changes - Follows Python exception handling best practices The previous string matching approach was brittle and could match unrelated exceptions that happened to contain the error name in their message. The new approach checks the exception type and uses the dedicated 'name' attribute provided by AwsCrtError.
Address code review feedback by consolidating the duplicate periodic task stopping logic into a single implementation. Changes: - Remove duplicate implementation in _stop_all_periodic_tasks() - Have private method delegate to public stop_all_periodic_tasks() - Add optional _reason parameter for context-specific logging - Maintains behavior: logs 'due to connection failure' when called internally Benefits: - Single source of truth for periodic task cleanup - Reduced maintenance surface (no code divergence) - Preserves distinct logging for different contexts - No behavior changes, all tests pass Addresses: #16 (comment)
Address code scanning alert #91 by redacting sensitive information
(MAC addresses) from MQTT topics in log messages.
Changes:
- Add _redact_topic() function to strip MAC addresses from topics
- Apply redaction to all ERROR and WARNING level logs containing topics
- Pattern matches 'navilink-{12-hex-chars}' and replaces with 'navilink-REDACTED'
- Prevents clear-text logging of sensitive MAC addresses
Fixed locations:
- publish() method: WARNING for clean session cancellation
- publish() method: ERROR for publish failures
- subscribe() method: ERROR for subscription failures
- unsubscribe() method: ERROR for unsubscribe failures
- _send_queued_commands(): ERROR for queued command failures
Example:
Before: 'cmd/52/navilink-04786332fca0/st/did'
After: 'cmd/52/navilink-REDACTED/st/did'
Fixes: https://github.com/eman/nwp500-python/security/code-scanning/91
…ensitive information Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
…ensitive information Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Fix multiple code review comments from Copilot and security scanning: 1. **Fix ambiguous return value (comment #2437035678)** - publish() now raises RuntimeError when clean session fails and queue disabled - Prevents ambiguous return value (0 for both queued and ignored) - Callers can differentiate between queued (0) and error (exception) 2. **Add public API for reconnection reset (comment #2437035791)** - New reset_reconnect() method replaces private attribute access - Encapsulates internal state reset logic - Prevents coupling to internal implementation details - Examples updated to use public API 3. **Improve documentation (comment #2437035819)** - Add missing 'import asyncio' in AUTO_RECOVERY_QUICK.md - Replace bare 'except Exception: pass' with proper logging - Better copy-paste usability for users 4. **Fix race condition (comment #2437035841)** - Add _recovery_in_progress guard in simple_auto_recovery.py - Prevents overlapping recovery attempts if multiple events fire - Uses try/finally to ensure flag is always reset Changes: - src/nwp500/mqtt_client.py: Add reset_reconnect() method, fix return value - examples/simple_auto_recovery.py: Add race condition guard - docs/AUTO_RECOVERY_QUICK.md: Add asyncio import, improve error handling All tests pass. Ready for review.
Convert AUTO_RECOVERY.md and AUTO_RECOVERY_QUICK.md to reStructuredText format and add them to the Sphinx documentation. Changes: - Remove docs/AUTO_RECOVERY.md (markdown) - Remove docs/AUTO_RECOVERY_QUICK.md (markdown) - Add docs/AUTO_RECOVERY.rst (reStructuredText) - Add docs/AUTO_RECOVERY_QUICK.rst (reStructuredText) - Update docs/index.rst to include new documentation pages New Documentation Pages: - AUTO_RECOVERY_QUICK.rst: Quick reference with copy-paste code - AUTO_RECOVERY.rst: Complete guide with 4 strategies explained The documentation is now: - Fully integrated with Sphinx - Properly cross-referenced with other docs - Searchable through the documentation site - Includes proper RST formatting (code blocks, tables, cross-refs) Sphinx build succeeds with no warnings. HTML output generated at docs/_build/html/AUTO_RECOVERY*.html
Summary
This PR fixes MQTT reconnection handling and adds comprehensive automatic recovery capabilities to address connection issues that cause log spam and require manual intervention.
Problems Solved
1. Clean Session Error Spam ❌ → ✅
Before:
AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSIONerrors were logged as ERROR with full stack traces during normal reconnectionAfter: Handled gracefully with WARNING/DEBUG level logging, commands automatically queued for retry
2. Excessive "Not Connected" Warnings ❌ → ✅
Before: Warnings logged every 5 minutes indefinitely during disconnection
After: Logged only on 1st occurrence and every 10th skip (90% reduction in log noise)
3. Orphaned Periodic Tasks ❌ → ✅
Before: Tasks continued running and logging warnings indefinitely after reconnection failed
After: Automatically stopped when max reconnection attempts exhausted
4. No Automatic Recovery ❌ → ✅
Before: Manual intervention required after permanent connection failure
After: New
reconnection_failedevent enables automatic recovery with token refresh and client recreationChanges
Core MQTT Client (
src/nwp500/mqtt_client.py)publish()method_stop_all_periodic_tasks()method for cleanupreconnection_failedevent for programmatic recoveryDocumentation
docs/AUTO_RECOVERY.md- Comprehensive guide with 4 recovery strategiesdocs/AUTO_RECOVERY_QUICK.md- Quick reference with copy-paste codeCLEAN_SESSION_FIX.md- Detailed fix explanation with examplesExamples
examples/simple_auto_recovery.py- Production-readyResilientMqttClientwrapper (recommended)examples/auto_recovery_example.py- All 4 strategies demonstratedExample: Before vs After
Before
After
Production-Ready Auto-Recovery
The new
ResilientMqttClientwrapper (inexamples/simple_auto_recovery.py) provides:Usage:
Testing
ruff checkandruff format)Documentation
Complete documentation provided:
docs/AUTO_RECOVERY_QUICK.md- Copy-paste ready codedocs/AUTO_RECOVERY.md- All strategies explainedCLEAN_SESSION_FIX.md- Technical explanationImpact
This PR significantly improves the reliability and maintainability of MQTT connections:
reconnection_failedevent enables alerting/monitoringRelated Issues
Fixes: