Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ Report the results of these checks in your final summary.
- **MQTT topics**: `cmd/{deviceType}/{deviceId}/ctrl` for control, `cmd/{deviceType}/{deviceId}/st` for status
- **Command queuing**: Commands sent while disconnected are queued and sent when reconnected
- **No base64 encoding/decoding** of MQTT payloads; all payloads are JSON-encoded/decoded
- **Exception handling**: Use specific exception types instead of catch-all `except Exception`. Common types:
- `AwsCrtError` - AWS IoT Core/MQTT errors
- `AuthenticationError`, `TokenRefreshError` - Authentication errors
- `RuntimeError` - Runtime state errors (not connected, etc.)
- `ValueError` - Invalid values or parameters
- `TypeError`, `AttributeError`, `KeyError` - Data structure errors
- `asyncio.CancelledError` - Task cancellation
- Only catch exceptions you can handle; let unexpected exceptions propagate

## Integration Points
- **AWS IoT Core**: MQTT client uses `awscrt` and `awsiot` libraries for connection and messaging
Expand Down
37 changes: 37 additions & 0 deletions src/nwp500/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,34 @@ async def refresh_token(self, refresh_token: str) -> AuthTokens:
_logger.error(f"Failed to parse refresh response: {e}")
raise TokenRefreshError(f"Invalid response format: {str(e)}")

async def re_authenticate(self) -> AuthenticationResponse:
"""
Re-authenticate using stored credentials.

This is a convenience method that uses the stored user_id and password
from initialization to perform a fresh sign-in. Useful for recovering
from expired tokens or connection issues.

Returns:
AuthenticationResponse with fresh tokens and user info

Raises:
ValueError: If stored credentials are not available
AuthenticationError: If authentication fails

Example:
>>> client = NavienAuthClient(email, password)
>>> await client.re_authenticate() # Uses stored credentials
"""
if not self.has_stored_credentials:
raise ValueError(
"No stored credentials available for re-authentication. "
"Credentials must be provided during initialization."
)

_logger.info("Re-authenticating with stored credentials")
return await self.sign_in(self._user_id, self._password)

async def ensure_valid_token(self) -> Optional[AuthTokens]:
"""
Ensure we have a valid access token, refreshing if necessary.
Expand Down Expand Up @@ -526,6 +554,15 @@ def user_email(self) -> Optional[str]:
"""Get the email address of the authenticated user."""
return self._user_email

@property
def has_stored_credentials(self) -> bool:
"""Check if user credentials are stored for re-authentication.

Returns:
True if both user_id and password are available for re-auth
"""
return bool(self._user_id and self._password)

async def close(self) -> None:
"""Close the aiohttp session if we own it."""
if self._owned_session and self._session:
Expand Down
5 changes: 5 additions & 0 deletions src/nwp500/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ async def emit(self, event: str, *args: Any, **kwargs: Any) -> int:
self._once_callbacks.discard((event, listener.callback))

except Exception as e:
# Catch all exceptions from user callbacks to ensure
# resilience. We intentionally catch Exception here because:
# 1. User callbacks can raise any exception type
# 2. One bad callback shouldn't break other callbacks
# 3. This is an event emitter pattern where resilience is key
_logger.error(
f"Error in '{event}' event handler: {e}",
exc_info=True,
Expand Down
138 changes: 124 additions & 14 deletions src/nwp500/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
from awscrt import mqtt
from awscrt.exceptions import AwsCrtError

from .auth import NavienAuthClient
from .auth import (
AuthenticationError,
NavienAuthClient,
TokenRefreshError,
)
from .events import EventEmitter
from .models import (
Device,
Expand Down Expand Up @@ -205,7 +209,8 @@ def _schedule_coroutine(self, coro: Any) -> None:
# Schedule the coroutine in the stored loop using thread-safe method
try:
asyncio.run_coroutine_threadsafe(coro, self._loop)
except Exception as e:
except RuntimeError as e:
# Event loop is closed or not running
_logger.error(f"Failed to schedule coroutine: {e}", exc_info=True)

def _on_connection_interrupted_internal(
Expand All @@ -218,7 +223,6 @@ def _on_connection_interrupted_internal(
error: Error that caused the interruption
**kwargs: Forward-compatibility kwargs from AWS SDK
"""
_logger.warning(f"Connection interrupted: {error}")
self._connected = False

# Emit event
Expand All @@ -232,7 +236,7 @@ def _on_connection_interrupted_internal(
# Fallback for callbacks expecting no arguments
try:
self._on_connection_interrupted() # type: ignore
except Exception as e:
except (TypeError, AttributeError) as e:
_logger.error(
f"Error in connection_interrupted callback: {e}"
)
Expand Down Expand Up @@ -339,12 +343,113 @@ async def _active_reconnect(self) -> None:
"No connection manager available for reconnection"
)

except Exception as e:
except (AwsCrtError, AuthenticationError, RuntimeError) as e:
_logger.error(
f"Error during active reconnection: {e}", exc_info=True
)
raise

async def _deep_reconnect(self) -> None:
"""
Perform a deep reconnection by completely rebuilding the connection.

This method is called after multiple quick reconnection failures.
It performs a full teardown and rebuild:
- Disconnects existing connection
- Refreshes authentication tokens
- Creates new connection manager
- Re-establishes all subscriptions

This is more expensive but can recover from issues that a simple
reconnection cannot fix (e.g., stale credentials, corrupted state).
"""
if self._connected:
_logger.debug("Already connected, skipping deep reconnection")
return

_logger.warning(
"Performing deep reconnection (full rebuild)... "
"This may take longer."
)

try:
# Step 1: Clean up existing connection if any
if self._connection_manager:
_logger.debug("Cleaning up old connection...")
try:
if self._connection_manager.is_connected:
await self._connection_manager.disconnect()
except (AwsCrtError, RuntimeError) as e:
# Expected: connection already dead or in bad state
_logger.debug(f"Error during cleanup: {e} (expected)")

# Step 2: Force token refresh to get fresh AWS credentials
_logger.debug("Refreshing authentication tokens...")
try:
# Use the stored refresh token from current tokens
current_tokens = self._auth_client.current_tokens
if current_tokens and current_tokens.refresh_token:
await self._auth_client.refresh_token(
current_tokens.refresh_token
)
else:
_logger.warning("No refresh token available")
raise ValueError("No refresh token available for refresh")
except (TokenRefreshError, ValueError, AuthenticationError) as e:
# If refresh fails, try full re-authentication with stored
# credentials
if self._auth_client.has_stored_credentials:
_logger.warning(
f"Token refresh failed: {e}. Attempting full "
"re-authentication..."
)
await self._auth_client.re_authenticate()
else:
_logger.error(
"Cannot re-authenticate: no stored credentials"
)
raise

# Step 3: Create completely new connection manager
_logger.debug("Creating new connection manager...")
self._connection_manager = MqttConnection(
config=self.config,
auth_client=self._auth_client,
on_connection_interrupted=self._on_connection_interrupted_internal,
on_connection_resumed=self._on_connection_resumed_internal,
)

# Step 4: Attempt connection
success = await self._connection_manager.connect()

if success:
# Update connection references
self._connection = self._connection_manager.connection
self._connected = True

# Step 5: Re-establish subscriptions
if self._subscription_manager and self._connection:
_logger.debug("Re-establishing subscriptions...")
self._subscription_manager.update_connection(
self._connection
)
await self._subscription_manager.resubscribe_all()

_logger.info(
"Deep reconnection successful - fully rebuilt connection"
)
else:
_logger.error("Deep reconnection failed to connect")

except (
AwsCrtError,
AuthenticationError,
RuntimeError,
ValueError,
) as e:
_logger.error(f"Error during deep reconnection: {e}", exc_info=True)
raise

async def connect(self) -> bool:
"""
Establish connection to AWS IoT Core.
Expand Down Expand Up @@ -394,6 +499,7 @@ async def connect(self) -> bool:
is_connected_func=lambda: self._connected,
schedule_coroutine_func=self._schedule_coroutine,
reconnect_func=self._active_reconnect,
deep_reconnect_func=self._deep_reconnect,
emit_event_func=self.emit,
)
self._reconnection_handler.enable()
Expand Down Expand Up @@ -428,7 +534,12 @@ async def connect(self) -> bool:

return False

except Exception as e:
except (
AwsCrtError,
AuthenticationError,
RuntimeError,
ValueError,
) as e:
_logger.error(f"Failed to connect: {e}")
raise

Expand Down Expand Up @@ -473,7 +584,7 @@ async def disconnect(self) -> None:
self._connection = None

_logger.info("Disconnected successfully")
except Exception as e:
except (AwsCrtError, RuntimeError) as e:
_logger.error(f"Error during disconnect: {e}")
raise

Expand All @@ -493,7 +604,7 @@ def _on_message_received(

except json.JSONDecodeError as e:
_logger.error(f"Failed to parse message payload: {e}")
except Exception as e:
except (AttributeError, KeyError, TypeError) as e:
_logger.error(f"Error processing message: {e}")

def _topic_matches_pattern(self, topic: str, pattern: str) -> bool:
Expand Down Expand Up @@ -618,12 +729,11 @@ async def publish(

try:
return await self._connection_manager.publish(topic, payload, qos)
except Exception as e:
except AwsCrtError as e:
# Handle clean session cancellation gracefully
# Check exception type and name attribute for proper
# error identification
# Safely check e.name attribute (may not exist or be None)
if (
isinstance(e, AwsCrtError)
hasattr(e, "name")
and e.name == "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION"
):
_logger.warning(
Expand All @@ -641,9 +751,9 @@ async def publish(
raise RuntimeError(
"Publish cancelled due to clean session and "
"command queue is disabled"
)
) from e

# Note: redact_topic is already used elsewhere in the file
# Other AWS CRT errors
_logger.error(f"Failed to publish to topic: {e}")
raise

Expand Down
5 changes: 3 additions & 2 deletions src/nwp500/mqtt_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

from awscrt import mqtt
from awscrt.exceptions import AwsCrtError
from awsiot import mqtt_connection_builder

if TYPE_CHECKING:
Expand Down Expand Up @@ -147,7 +148,7 @@ async def connect(self) -> bool:

return True

except Exception as e:
except (AwsCrtError, RuntimeError, ValueError) as e:
_logger.error(f"Failed to connect: {e}")
raise

Expand Down Expand Up @@ -195,7 +196,7 @@ async def disconnect(self) -> None:
self._connected = False
self._connection = None
_logger.info("Disconnected successfully")
except Exception as e:
except (AwsCrtError, RuntimeError) as e:
_logger.error(f"Error during disconnect: {e}")
raise

Expand Down
6 changes: 3 additions & 3 deletions src/nwp500/mqtt_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,13 @@ async def periodic_request() -> None:
f"for {redacted_device_id}"
)
break
except Exception as e:
except (AwsCrtError, RuntimeError) as e:
# Handle clean session cancellation gracefully (expected
# during reconnection)
# Check exception type and name attribute for proper error
# identification
# Safely check exception name attribute
if (
isinstance(e, AwsCrtError)
and hasattr(e, "name")
and e.name
== "AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION"
):
Expand Down
Loading