Skip to content

Commit 9f158d1

Browse files
authored
Merge pull request #28 from eman/fix/mqtt-reconnection-improvements
Implement two-tier MQTT reconnection strategy with unlimited retries
2 parents d2eb6ee + 39f5769 commit 9f158d1

9 files changed

Lines changed: 340 additions & 51 deletions

File tree

.github/copilot-instructions.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ Report the results of these checks in your final summary.
4343
- **MQTT topics**: `cmd/{deviceType}/{deviceId}/ctrl` for control, `cmd/{deviceType}/{deviceId}/st` for status
4444
- **Command queuing**: Commands sent while disconnected are queued and sent when reconnected
4545
- **No base64 encoding/decoding** of MQTT payloads; all payloads are JSON-encoded/decoded
46+
- **Exception handling**: Use specific exception types instead of a catch-all `except Exception`. Common types:
47+
- `AwsCrtError` - AWS IoT Core/MQTT errors
48+
- `AuthenticationError`, `TokenRefreshError` - Authentication errors
49+
- `RuntimeError` - Runtime state errors (not connected, etc.)
50+
- `ValueError` - Invalid values or parameters
51+
- `TypeError`, `AttributeError`, `KeyError` - Data structure errors
52+
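- `asyncio.CancelledError` - Task cancellation (a `BaseException` subclass since Python 3.8, so `except Exception` never catches it; re-raise it if caught explicitly)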
53+
- Only catch exceptions you can handle; let unexpected exceptions propagate
4654

4755
## Integration Points
4856
- **AWS IoT Core**: MQTT client uses `awscrt` and `awsiot` libraries for connection and messaging

src/nwp500/auth.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,34 @@ async def refresh_token(self, refresh_token: str) -> AuthTokens:
471471
_logger.error(f"Failed to parse refresh response: {e}")
472472
raise TokenRefreshError(f"Invalid response format: {str(e)}")
473473

474+
async def re_authenticate(self) -> AuthenticationResponse:
475+
"""
476+
Re-authenticate using stored credentials.
477+
478+
This is a convenience method that uses the stored user_id and password
479+
from initialization to perform a fresh sign-in. Useful for recovering
480+
from expired tokens or connection issues.
481+
482+
Returns:
483+
AuthenticationResponse with fresh tokens and user info
484+
485+
Raises:
486+
ValueError: If stored credentials are not available
487+
AuthenticationError: If authentication fails
488+
489+
Example:
490+
>>> client = NavienAuthClient(email, password)
491+
>>> await client.re_authenticate() # Uses stored credentials
492+
"""
493+
if not self.has_stored_credentials:
494+
raise ValueError(
495+
"No stored credentials available for re-authentication. "
496+
"Credentials must be provided during initialization."
497+
)
498+
499+
_logger.info("Re-authenticating with stored credentials")
500+
return await self.sign_in(self._user_id, self._password)
501+
474502
async def ensure_valid_token(self) -> Optional[AuthTokens]:
475503
"""
476504
Ensure we have a valid access token, refreshing if necessary.
@@ -526,6 +554,15 @@ def user_email(self) -> Optional[str]:
526554
"""Get the email address of the authenticated user."""
527555
return self._user_email
528556

557+
@property
558+
def has_stored_credentials(self) -> bool:
559+
"""Check if user credentials are stored for re-authentication.
560+
561+
Returns:
562+
True if both user_id and password are available for re-auth
563+
"""
564+
return bool(self._user_id and self._password)
565+
529566
async def close(self) -> None:
530567
"""Close the aiohttp session if we own it."""
531568
if self._owned_session and self._session:

src/nwp500/events.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,11 @@ async def emit(self, event: str, *args: Any, **kwargs: Any) -> int:
253253
self._once_callbacks.discard((event, listener.callback))
254254

255255
except Exception as e:
256+
# Catch all exceptions from user callbacks to ensure
257+
# resilience. We intentionally catch Exception here because:
258+
# 1. User callbacks can raise any exception type
259+
# 2. One bad callback shouldn't break other callbacks
260+
# 3. This is an event emitter pattern where resilience is key
256261
_logger.error(
257262
f"Error in '{event}' event handler: {e}",
258263
exc_info=True,

src/nwp500/mqtt_client.py

Lines changed: 124 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@
1919
from awscrt import mqtt
2020
from awscrt.exceptions import AwsCrtError
2121

22-
from .auth import NavienAuthClient
22+
from .auth import (
23+
AuthenticationError,
24+
NavienAuthClient,
25+
TokenRefreshError,
26+
)
2327
from .events import EventEmitter
2428
from .models import (
2529
Device,
@@ -205,7 +209,8 @@ def _schedule_coroutine(self, coro: Any) -> None:
205209
# Schedule the coroutine in the stored loop using thread-safe method
206210
try:
207211
asyncio.run_coroutine_threadsafe(coro, self._loop)
208-
except Exception as e:
212+
except RuntimeError as e:
213+
# Event loop is closed or not running
209214
_logger.error(f"Failed to schedule coroutine: {e}", exc_info=True)
210215

211216
def _on_connection_interrupted_internal(
@@ -218,7 +223,6 @@ def _on_connection_interrupted_internal(
218223
error: Error that caused the interruption
219224
**kwargs: Forward-compatibility kwargs from AWS SDK
220225
"""
221-
_logger.warning(f"Connection interrupted: {error}")
222226
self._connected = False
223227

224228
# Emit event
@@ -232,7 +236,7 @@ def _on_connection_interrupted_internal(
232236
# Fallback for callbacks expecting no arguments
233237
try:
234238
self._on_connection_interrupted() # type: ignore
235-
except Exception as e:
239+
except (TypeError, AttributeError) as e:
236240
_logger.error(
237241
f"Error in connection_interrupted callback: {e}"
238242
)
@@ -339,12 +343,113 @@ async def _active_reconnect(self) -> None:
339343
"No connection manager available for reconnection"
340344
)
341345

342-
except Exception as e:
346+
except (AwsCrtError, AuthenticationError, RuntimeError) as e:
343347
_logger.error(
344348
f"Error during active reconnection: {e}", exc_info=True
345349
)
346350
raise
347351

352+
async def _deep_reconnect(self) -> None:
353+
"""
354+
Perform a deep reconnection by completely rebuilding the connection.
355+
356+
This method is called after multiple quick reconnection failures.
357+
It performs a full teardown and rebuild:
358+
- Disconnects existing connection
359+
- Refreshes authentication tokens
360+
- Creates new connection manager
361+
- Re-establishes all subscriptions
362+
363+
This is more expensive but can recover from issues that a simple
364+
reconnection cannot fix (e.g., stale credentials, corrupted state).
365+
"""
366+
if self._connected:
367+
_logger.debug("Already connected, skipping deep reconnection")
368+
return
369+
370+
_logger.warning(
371+
"Performing deep reconnection (full rebuild)... "
372+
"This may take longer."
373+
)
374+
375+
try:
376+
# Step 1: Clean up existing connection if any
377+
if self._connection_manager:
378+
_logger.debug("Cleaning up old connection...")
379+
try:
380+
if self._connection_manager.is_connected:
381+
await self._connection_manager.disconnect()
382+
except (AwsCrtError, RuntimeError) as e:
383+
# Expected: connection already dead or in bad state
384+
_logger.debug(f"Error during cleanup: {e} (expected)")
385+
386+
# Step 2: Force token refresh to get fresh AWS credentials
387+
_logger.debug("Refreshing authentication tokens...")
388+
try:
389+
# Use the stored refresh token from current tokens
390+
current_tokens = self._auth_client.current_tokens
391+
if current_tokens and current_tokens.refresh_token:
392+
await self._auth_client.refresh_token(
393+
current_tokens.refresh_token
394+
)
395+
else:
396+
_logger.warning("No refresh token available")
397+
raise ValueError("No refresh token available for refresh")
398+
except (TokenRefreshError, ValueError, AuthenticationError) as e:
399+
# If refresh fails, try full re-authentication with stored
400+
# credentials
401+
if self._auth_client.has_stored_credentials:
402+
_logger.warning(
403+
f"Token refresh failed: {e}. Attempting full "
404+
"re-authentication..."
405+
)
406+
await self._auth_client.re_authenticate()
407+
else:
408+
_logger.error(
409+
"Cannot re-authenticate: no stored credentials"
410+
)
411+
raise
412+
413+
# Step 3: Create completely new connection manager
414+
_logger.debug("Creating new connection manager...")
415+
self._connection_manager = MqttConnection(
416+
config=self.config,
417+
auth_client=self._auth_client,
418+
on_connection_interrupted=self._on_connection_interrupted_internal,
419+
on_connection_resumed=self._on_connection_resumed_internal,
420+
)
421+
422+
# Step 4: Attempt connection
423+
success = await self._connection_manager.connect()
424+
425+
if success:
426+
# Update connection references
427+
self._connection = self._connection_manager.connection
428+
self._connected = True
429+
430+
# Step 5: Re-establish subscriptions
431+
if self._subscription_manager and self._connection:
432+
_logger.debug("Re-establishing subscriptions...")
433+
self._subscription_manager.update_connection(
434+
self._connection
435+
)
436+
await self._subscription_manager.resubscribe_all()
437+
438+
_logger.info(
439+
"Deep reconnection successful - fully rebuilt connection"
440+
)
441+
else:
442+
_logger.error("Deep reconnection failed to connect")
443+
444+
except (
445+
AwsCrtError,
446+
AuthenticationError,
447+
RuntimeError,
448+
ValueError,
449+
) as e:
450+
_logger.error(f"Error during deep reconnection: {e}", exc_info=True)
451+
raise
452+
348453
async def connect(self) -> bool:
349454
"""
350455
Establish connection to AWS IoT Core.
@@ -394,6 +499,7 @@ async def connect(self) -> bool:
394499
is_connected_func=lambda: self._connected,
395500
schedule_coroutine_func=self._schedule_coroutine,
396501
reconnect_func=self._active_reconnect,
502+
deep_reconnect_func=self._deep_reconnect,
397503
emit_event_func=self.emit,
398504
)
399505
self._reconnection_handler.enable()
@@ -428,7 +534,12 @@ async def connect(self) -> bool:
428534

429535
return False
430536

431-
except Exception as e:
537+
except (
538+
AwsCrtError,
539+
AuthenticationError,
540+
RuntimeError,
541+
ValueError,
542+
) as e:
432543
_logger.error(f"Failed to connect: {e}")
433544
raise
434545

@@ -473,7 +584,7 @@ async def disconnect(self) -> None:
473584
self._connection = None
474585

475586
_logger.info("Disconnected successfully")
476-
except Exception as e:
587+
except (AwsCrtError, RuntimeError) as e:
477588
_logger.error(f"Error during disconnect: {e}")
478589
raise
479590

@@ -493,7 +604,7 @@ def _on_message_received(
493604

494605
except json.JSONDecodeError as e:
495606
_logger.error(f"Failed to parse message payload: {e}")
496-
except Exception as e:
607+
except (AttributeError, KeyError, TypeError) as e:
497608
_logger.error(f"Error processing message: {e}")
498609

499610
def _topic_matches_pattern(self, topic: str, pattern: str) -> bool:
@@ -618,12 +729,11 @@ async def publish(
618729

619730
try:
620731
return await self._connection_manager.publish(topic, payload, qos)
621-
except Exception as e:
732+
except AwsCrtError as e:
622733
# Handle clean session cancellation gracefully
623-
# Check exception type and name attribute for proper
624-
# error identification
734+
# Safely check e.name attribute (may not exist or be None)
625735
if (
626-
isinstance(e, AwsCrtError)
736+
hasattr(e, "name")
627737
and e.name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION"
628738
):
629739
_logger.warning(
@@ -641,9 +751,9 @@ async def publish(
641751
raise RuntimeError(
642752
"Publish cancelled due to clean session and "
643753
"command queue is disabled"
644-
)
754+
) from e
645755

646-
# Note: redact_topic is already used elsewhere in the file
756+
# Other AWS CRT errors
647757
_logger.error(f"Failed to publish to topic: {e}")
648758
raise
649759

src/nwp500/mqtt_connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
1313

1414
from awscrt import mqtt
15+
from awscrt.exceptions import AwsCrtError
1516
from awsiot import mqtt_connection_builder
1617

1718
if TYPE_CHECKING:
@@ -147,7 +148,7 @@ async def connect(self) -> bool:
147148

148149
return True
149150

150-
except Exception as e:
151+
except (AwsCrtError, RuntimeError, ValueError) as e:
151152
_logger.error(f"Failed to connect: {e}")
152153
raise
153154

@@ -195,7 +196,7 @@ async def disconnect(self) -> None:
195196
self._connected = False
196197
self._connection = None
197198
_logger.info("Disconnected successfully")
198-
except Exception as e:
199+
except (AwsCrtError, RuntimeError) as e:
199200
_logger.error(f"Error during disconnect: {e}")
200201
raise
201202

src/nwp500/mqtt_periodic.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,13 @@ async def periodic_request() -> None:
186186
f"for {redacted_device_id}"
187187
)
188188
break
189-
except Exception as e:
189+
except (AwsCrtError, RuntimeError) as e:
190190
# Handle clean session cancellation gracefully (expected
191191
# during reconnection)
192-
# Check exception type and name attribute for proper error
193-
# identification
192+
# Safely check exception name attribute
194193
if (
195194
isinstance(e, AwsCrtError)
195+
and hasattr(e, "name")
196196
and e.name
197197
== "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION"
198198
):

0 commit comments

Comments
 (0)