Skip to content

Fix MQTT reconnection handling and add automatic recovery#16

Merged
eman merged 13 commits intomainfrom
fix/mqtt-reconnection-improvements
Oct 16, 2025
Merged

Fix MQTT reconnection handling and add automatic recovery#16
eman merged 13 commits intomainfrom
fix/mqtt-reconnection-improvements

Conversation

@eman
Copy link
Copy Markdown
Owner

@eman eman commented Oct 16, 2025

Summary

This PR fixes MQTT reconnection handling and adds comprehensive automatic recovery capabilities to address connection issues that cause log spam and require manual intervention.

Problems Solved

1. Clean Session Error Spam ❌ → ✅

Before: AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION errors were logged as ERROR with full stack traces during normal reconnection
After: Handled gracefully with WARNING/DEBUG level logging, commands automatically queued for retry

2. Excessive "Not Connected" Warnings ❌ → ✅

Before: Warnings logged every 5 minutes indefinitely during disconnection
After: Logged only on 1st occurrence and every 10th skip (90% reduction in log noise)

3. Orphaned Periodic Tasks ❌ → ✅

Before: Tasks continued running and logging warnings indefinitely after reconnection failed
After: Automatically stopped when max reconnection attempts exhausted

4. No Automatic Recovery ❌ → ✅

Before: Manual intervention required after permanent connection failure
After: New reconnection_failed event enables automatic recovery with token refresh and client recreation

Changes

Core MQTT Client (src/nwp500/mqtt_client.py)

  • ✅ Graceful clean session error handling in publish() method
  • ✅ Throttled logging for connection state (1st + every 10th occurrence)
  • ✅ New _stop_all_periodic_tasks() method for cleanup
  • ✅ New reconnection_failed event for programmatic recovery
  • ✅ Enhanced error handling in periodic requests

Documentation

  • 📚 docs/AUTO_RECOVERY.md - Comprehensive guide with 4 recovery strategies
  • 📚 docs/AUTO_RECOVERY_QUICK.md - Quick reference with copy-paste code
  • 📚 CLEAN_SESSION_FIX.md - Detailed fix explanation with examples

Examples

  • 💻 examples/simple_auto_recovery.py - Production-ready ResilientMqttClient wrapper (recommended)
  • 💻 examples/auto_recovery_example.py - All 4 strategies demonstrated

Example: Before vs After

Before

2025-10-16 09:43:31.359 ERROR (MainThread) [nwp500.mqtt_client] Failed to publish to 'cmd/52/navilink-04786332fca0/st/did': AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION: Old requests from the previous session are cancelled
Traceback (most recent call last):
  File "/usr/local/lib/python3.13/site-packages/nwp500/mqtt_client.py", line 1472, in periodic_request
    await self.request_device_info(device)
  ...
awscrt.exceptions.AwsCrtError: AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION

2025-10-16 09:45:29.412 WARNING (MainThread) [nwp500.mqtt_client] Not connected, skipping device_status request
2025-10-16 09:50:29.413 WARNING (MainThread) [nwp500.mqtt_client] Not connected, skipping device_status request
... (repeats every 5 minutes forever)
2025-10-16 09:51:38.381 ERROR (MainThread) [nwp500.mqtt_client] Failed to reconnect after 10 attempts. Manual reconnection required.
2025-10-16 09:55:29.415 WARNING (MainThread) [nwp500.mqtt_client] Not connected, skipping device_status request
... (continues indefinitely)

After

2025-10-16 09:43:31.359 WARNING (MainThread) [nwp500.mqtt_client] Publish cancelled due to clean session (topic: cmd/52/...). This is expected during reconnection.
2025-10-16 09:43:31.359 DEBUG (MainThread) [nwp500.mqtt_client] Queuing command due to clean session cancellation

2025-10-16 09:45:29.412 WARNING (MainThread) [nwp500.mqtt_client] Not connected, skipping device_status request (skipped 1 time)
2025-10-16 09:50:29.413 DEBUG (MainThread) [nwp500.mqtt_client] Not connected, skipping device_status request
... (DEBUG level for skips 2-9)
2025-10-16 10:35:29.420 WARNING (MainThread) [nwp500.mqtt_client] Not connected, skipping device_status request (skipped 10 times)

2025-10-16 09:51:38.381 ERROR (MainThread) [nwp500.mqtt_client] Failed to reconnect after 10 attempts. Manual reconnection required.
2025-10-16 09:51:38.382 INFO (MainThread) [nwp500.mqtt_client] Stopping 2 periodic task(s) due to connection failure
2025-10-16 09:51:38.383 INFO (MainThread) [nwp500.mqtt_client] All periodic tasks stopped

(With ResilientMqttClient:)
2025-10-16 09:52:38.383 INFO (MainThread) Starting recovery attempt 1/10
2025-10-16 09:52:38.384 INFO (MainThread) Refreshing authentication tokens...
2025-10-16 09:52:39.125 INFO (MainThread) Recreating MQTT client...
2025-10-16 09:52:40.891 INFO (MainThread) ✅ Connected: navien-client-abc123
2025-10-16 09:52:40.892 INFO (MainThread) Subscriptions restored
2025-10-16 09:52:40.892 INFO (MainThread) ✅ Recovery successful!

Production-Ready Auto-Recovery

The new ResilientMqttClient wrapper (in examples/simple_auto_recovery.py) provides:

  • ✅ Automatic recovery from permanent connection failures
  • ✅ Exponential backoff to prevent server overload
  • ✅ Automatic token refresh
  • ✅ Clean client recreation
  • ✅ Subscription restoration
  • ✅ Configurable limits

Usage:

from examples.simple_auto_recovery import ResilientMqttClient

client = ResilientMqttClient(auth_client)
await client.connect(device, status_callback=on_status)
# Now automatically recovers from connection failures!

Testing

  • ✅ All 29 existing tests pass
  • ✅ All linting checks pass (ruff check and ruff format)
  • ✅ No breaking changes to public API
  • ✅ Backward compatible

Documentation

Complete documentation provided:

  • 📖 Quick Start: docs/AUTO_RECOVERY_QUICK.md - Copy-paste ready code
  • 📖 Full Guide: docs/AUTO_RECOVERY.md - All strategies explained
  • 📖 Fix Details: CLEAN_SESSION_FIX.md - Technical explanation

Impact

This PR significantly improves the reliability and maintainability of MQTT connections:

  • Log noise reduction: ~90% fewer repetitive warnings
  • Production readiness: Automatic recovery eliminates manual intervention
  • Developer experience: Clear documentation and ready-to-use examples
  • Monitoring: New reconnection_failed event enables alerting/monitoring

Related Issues

Fixes:

  • Clean session error spam during reconnection
  • Indefinite "Not connected" warnings after reconnection failure
  • Lack of automatic recovery for production deployments

This commit addresses multiple MQTT connection issues and adds comprehensive
automatic recovery capabilities.

## Problems Fixed

1. **Clean Session Errors**: `AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION`
   errors were logged as ERROR with full stack traces during normal
   reconnection, causing log noise and user confusion.

2. **Excessive Logging**: When disconnected, "Not connected" warnings were
   logged every 5 minutes indefinitely, filling logs with repetitive messages.

3. **Orphaned Periodic Tasks**: After max reconnection attempts were exhausted,
   periodic request tasks continued running indefinitely, generating warnings.

4. **No Recovery Mechanism**: Users had no way to automatically recover from
   permanent connection failures without manual intervention.

## Changes Made

### Core MQTT Client (`src/nwp500/mqtt_client.py`)

- **Graceful clean session handling**: Catch and handle
  `AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION` errors in `publish()` method,
  automatically queuing commands for retry instead of raising exceptions.

- **Throttled disconnect logging**: Log "not connected" warnings only on 1st
  occurrence and every 10th skip thereafter, reducing log noise by ~90%.

- **Automatic task cleanup**: Stop all periodic tasks when max reconnection
  attempts are exhausted via new `_stop_all_periodic_tasks()` method.

- **New event**: Added `reconnection_failed` event emitted when client fails
  to reconnect after max attempts, enabling programmatic recovery.

- **Enhanced error handling**: Clean session cancellations in periodic requests
  are now logged at DEBUG level instead of ERROR.

### Documentation

- **`docs/AUTO_RECOVERY.md`**: Comprehensive guide covering 4 recovery strategies
  from simple to production-ready, with configuration examples and best practices.

- **`docs/AUTO_RECOVERY_QUICK.md`**: Quick reference with copy-paste ready
  `ResilientMqttClient` wrapper class for immediate use.

- **`CLEAN_SESSION_FIX.md`**: Detailed explanation of the fixes with before/after
  logging examples and links to recovery documentation.

### Examples

- **`examples/simple_auto_recovery.py`**: Production-ready `ResilientMqttClient`
  wrapper demonstrating automatic recovery with exponential backoff and token
  refresh (recommended for production use).

- **`examples/auto_recovery_example.py`**: Educational example showing all 4
  recovery strategies side-by-side with detailed explanations.

## Impact

**Before:**
- ERROR logs with stack traces for expected clean session behavior
- Continuous WARNING logs every 5 minutes during disconnection
- Manual intervention required after reconnection failure

**After:**
- Clean session errors handled gracefully (WARNING/DEBUG level)
- Throttled logging reduces noise by 90%
- Automatic recovery with token refresh and client recreation
- Production-ready wrapper class for easy integration
- Comprehensive documentation and examples

## Testing

- All 29 existing tests pass
- All linting checks pass
- No breaking changes to public API
- Backward compatible

## Related Issues

Fixes issues with:
- `AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION` error spam
- Indefinite "Not connected" warnings after reconnection failure
- Lack of automatic recovery mechanism for production deployments
eman and others added 2 commits October 16, 2025 11:18
…ensitive information

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
@eman eman requested a review from Copilot October 16, 2025 18:24
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Implements improved MQTT reconnection handling and introduces automatic recovery mechanisms to reduce log noise and enable self-healing after permanent failures.

  • Adds reconnection_failed event and graceful handling of clean session cancellation.
  • Introduces throttled logging and internal cleanup of periodic tasks on permanent failure.
  • Provides example recovery strategies and production-ready ResilientMqttClient wrapper with supporting documentation.

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
src/nwp500/mqtt_client.py Adds reconnection_failed event, clean session handling, throttled periodic request logging, and internal task cleanup method.
examples/simple_auto_recovery.py Provides production-oriented wrapper implementing automated recovery workflow.
examples/auto_recovery_example.py Demonstrates four distinct recovery strategies for educational comparison.
docs/AUTO_RECOVERY_QUICK.md Supplies quick-start, copy/paste recovery wrapper and operational timeline.
docs/AUTO_RECOVERY.md Full conceptual and implementation guide for recovery strategies and best practices.

Comment on lines +41 to +54
async def _handle_recovery(self, attempts):
self.recovery_attempt += 1
if self.recovery_attempt >= self.max_recovery_attempts:
return # Give up

await asyncio.sleep(self.recovery_delay)

try:
await self.auth_client.refresh_token()
await self._create_client()
self.recovery_attempt = 0 # Reset on success
except Exception:
pass # Will retry on next reconnection_failed

Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Snippet uses asyncio.sleep and async/await but does not show 'import asyncio'; add the import for copy/paste usability. Also avoid bare 'except Exception: pass'—log a minimal message (e.g., logger.warning) so silent failures do not obscure recurring recovery issues in production.

Copilot uses AI. Check for mistakes.
Comment on lines +125 to +132
# Wait before attempting recovery
logger.info(f"Waiting {self.recovery_delay} seconds before recovery...")
await asyncio.sleep(self.recovery_delay)

try:
# Refresh authentication tokens
logger.info("Refreshing authentication tokens...")
await self.auth_client.refresh_token()
Copy link

Copilot AI Oct 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If multiple reconnection_failed events fire before a prior recovery attempt finishes sleeping, overlapping recovery attempts could race (e.g., simultaneous client recreation). Consider adding an async lock or a boolean 'recovery_in_progress' guard to serialize recovery cycles.

Copilot uses AI. Check for mistakes.
eman and others added 5 commits October 16, 2025 11:36
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Use proper exception type checking instead of string matching for
AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION error detection.

Changes:
- Import AwsCrtError from awscrt.exceptions
- Check isinstance(e, AwsCrtError) and e.name attribute
- Prevents false positives from string matching
- More robust against future message format changes
- Follows Python exception handling best practices

The previous string matching approach was brittle and could match
unrelated exceptions that happened to contain the error name in
their message. The new approach checks the exception type and
uses the dedicated 'name' attribute provided by AwsCrtError.
Address code review feedback by consolidating the duplicate periodic task
stopping logic into a single implementation.

Changes:
- Remove duplicate implementation in _stop_all_periodic_tasks()
- Have private method delegate to public stop_all_periodic_tasks()
- Add optional _reason parameter for context-specific logging
- Maintains behavior: logs 'due to connection failure' when called internally

Benefits:
- Single source of truth for periodic task cleanup
- Reduced maintenance surface (no code divergence)
- Preserves distinct logging for different contexts
- No behavior changes, all tests pass

Addresses: #16 (comment)
Address code scanning alert #91 by redacting sensitive information
(MAC addresses) from MQTT topics in log messages.

Changes:
- Add _redact_topic() function to strip MAC addresses from topics
- Apply redaction to all ERROR and WARNING level logs containing topics
- Pattern matches 'navilink-{12-hex-chars}' and replaces with 'navilink-REDACTED'
- Prevents clear-text logging of sensitive MAC addresses

Fixed locations:
- publish() method: WARNING for clean session cancellation
- publish() method: ERROR for publish failures
- subscribe() method: ERROR for subscription failures
- unsubscribe() method: ERROR for unsubscribe failures
- _send_queued_commands(): ERROR for queued command failures

Example:
  Before: 'cmd/52/navilink-04786332fca0/st/did'
  After:  'cmd/52/navilink-REDACTED/st/did'

Fixes: https://github.com/eman/nwp500-python/security/code-scanning/91
eman and others added 5 commits October 16, 2025 12:00
…ensitive information

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
…ensitive information

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Fix multiple code review comments from Copilot and security scanning:

1. **Fix ambiguous return value (comment #2437035678)**
   - publish() now raises RuntimeError when clean session fails and queue disabled
   - Prevents ambiguous return value (0 for both queued and ignored)
   - Callers can differentiate between queued (0) and error (exception)

2. **Add public API for reconnection reset (comment #2437035791)**
   - New reset_reconnect() method replaces private attribute access
   - Encapsulates internal state reset logic
   - Prevents coupling to internal implementation details
   - Examples updated to use public API

3. **Improve documentation (comment #2437035819)**
   - Add missing 'import asyncio' in AUTO_RECOVERY_QUICK.md
   - Replace bare 'except Exception: pass' with proper logging
   - Better copy-paste usability for users

4. **Fix race condition (comment #2437035841)**
   - Add _recovery_in_progress guard in simple_auto_recovery.py
   - Prevents overlapping recovery attempts if multiple events fire
   - Uses try/finally to ensure flag is always reset

Changes:
- src/nwp500/mqtt_client.py: Add reset_reconnect() method, fix return value
- examples/simple_auto_recovery.py: Add race condition guard
- docs/AUTO_RECOVERY_QUICK.md: Add asyncio import, improve error handling

All tests pass. Ready for review.
Convert AUTO_RECOVERY.md and AUTO_RECOVERY_QUICK.md to reStructuredText
format and add them to the Sphinx documentation.

Changes:
- Remove docs/AUTO_RECOVERY.md (markdown)
- Remove docs/AUTO_RECOVERY_QUICK.md (markdown)
- Add docs/AUTO_RECOVERY.rst (reStructuredText)
- Add docs/AUTO_RECOVERY_QUICK.rst (reStructuredText)
- Update docs/index.rst to include new documentation pages

New Documentation Pages:
- AUTO_RECOVERY_QUICK.rst: Quick reference with copy-paste code
- AUTO_RECOVERY.rst: Complete guide with 4 strategies explained

The documentation is now:
- Fully integrated with Sphinx
- Properly cross-referenced with other docs
- Searchable through the documentation site
- Includes proper RST formatting (code blocks, tables, cross-refs)

Sphinx build succeeds with no warnings.
HTML output generated at docs/_build/html/AUTO_RECOVERY*.html
@eman eman merged commit bd57b22 into main Oct 16, 2025
10 checks passed
@eman eman deleted the fix/mqtt-reconnection-improvements branch October 16, 2025 19:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants