diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 4bf68b2..c38f239 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -12,9 +12,14 @@ - **Install dependencies**: `pip install -e .` (development mode) - **Run tests**: `pytest` (unit tests in `tests/`) - **Lint/format**: `ruff format --check src/ tests/ examples/` (use `ruff format ...` to auto-format) +- **CI-compatible linting**: `make ci-lint` (run before finalizing changes to ensure CI will pass) +- **CI-compatible formatting**: `make ci-format` (auto-fix formatting issues) - **Build docs**: `tox -e docs` (Sphinx docs in `docs/`) - **Preview docs**: `python3 -m http.server --directory docs/_build/html` +### Before Committing Changes +Always run `make ci-lint` before finalizing changes to ensure your code will pass CI checks. This runs the exact same linting configuration as the CI pipeline, preventing "passes locally but fails in CI" issues. + ## Patterns & Conventions - **Async context managers** for authentication: `async with NavienAuthClient(email, password) as auth_client:` - **Environment variables** for credentials: `NAVIEN_EMAIL`, `NAVIEN_PASSWORD` diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1c5e8c1..22c6401 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -19,9 +19,10 @@ Added - Eliminates "passes locally but fails in CI" issues - Cross-platform support (Linux, macOS, Windows, containers) - - All MQTT operations (connect, disconnect, subscribe, unsubscribe, publish) use ``asyncio.run_in_executor()`` + - All MQTT operations (connect, disconnect, subscribe, unsubscribe, publish) use ``asyncio.wrap_future()`` to convert AWS SDK Futures to asyncio Futures - Eliminates "blocking I/O detected" warnings in Home Assistant and other async applications - Fully compatible with async event loops without blocking other operations + - More efficient than executor-based approaches (no thread pool usage) - No API changes required - existing code works without modification - Maintains full performance and reliability of the underlying AWS IoT SDK - Safe for use in Home Assistant custom integrations and other async applications diff --git a/docs/MQTT_CLIENT.rst b/docs/MQTT_CLIENT.rst index 0809fec..f784740 100644 --- a/docs/MQTT_CLIENT.rst +++ b/docs/MQTT_CLIENT.rst @@ -1015,8 +1015,9 @@ async applications. **Implementation Details:** -- All AWS IoT SDK operations that could block are wrapped with ``asyncio.run_in_executor()`` -- Connection, disconnection, subscription, and publishing operations are non-blocking +- AWS IoT SDK operations return ``concurrent.futures.Future`` objects that are converted to asyncio Futures using ``asyncio.wrap_future()`` +- Connection, disconnection, subscription, and publishing operations are fully non-blocking +- No thread pool resources are used for MQTT operations (more efficient than executor-based approaches) - The client maintains full compatibility with the existing API - No additional configuration required for non-blocking behavior diff --git a/examples/anti_legionella_example.py b/examples/anti_legionella_example.py index a52c08b..66a1074 100644 --- a/examples/anti_legionella_example.py +++ b/examples/anti_legionella_example.py @@ -17,6 +17,11 @@ from typing import Any from nwp500 import NavienAPIClient, NavienAuthClient, NavienMqttClient +from nwp500.constants import ( + CMD_ANTI_LEGIONELLA_DISABLE, + CMD_ANTI_LEGIONELLA_ENABLE, + CMD_STATUS_REQUEST, +) def display_anti_legionella_status(status: dict[str, Any], label: str = "") -> None: @@ -67,15 +72,35 @@ async def main() -> None: latest_status = {} status_received = asyncio.Event() + # Expected command codes for each step + expected_command = None + def on_status(topic: str, message: dict[str, Any]) -> None: nonlocal latest_status # Debug: print what we received print(f"[DEBUG] Received message on topic: {topic}") + + # Skip command echoes (messages on /ctrl topic) + if topic.endswith("/ctrl"): + print("[DEBUG] Skipping command echo") + return + status = message.get("response", {}).get("status", {}) + command = status.get("command") + + # Only capture status if it has Anti-Legionella data if status.get("antiLegionellaPeriod") is not None: - latest_status = status - status_received.set() - print("[DEBUG] Anti-Legionella status captured") + # If we're expecting a specific command, only accept that + if expected_command is None or command == expected_command: + latest_status = status + status_received.set() + print( + f"[DEBUG] Anti-Legionella status captured (command={command})" + ) + else: + print( + f"[DEBUG] Ignoring status from different command (got {command}, expected {expected_command})" + ) else: print("[DEBUG] Message doesn't contain antiLegionellaPeriod") @@ -94,6 +119,7 @@ def on_status(topic: str, message: dict[str, Any]) -> None: print("STEP 1: Getting initial Anti-Legionella status...") print("=" * 70) status_received.clear() + expected_command = CMD_STATUS_REQUEST await mqtt_client.request_device_status(device) try: @@ -111,6 +137,7 @@ def on_status(topic: str, message: dict[str, Any]) -> None: print("STEP 2: Enabling Anti-Legionella cycle every 7 days...") print("=" * 70) status_received.clear() + expected_command = CMD_ANTI_LEGIONELLA_ENABLE await mqtt_client.enable_anti_legionella(device, period_days=7) try: @@ -128,6 +155,7 @@ def on_status(topic: str, message: dict[str, Any]) -> None: print("WARNING: This reduces protection against Legionella bacteria!") print("=" * 70) status_received.clear() + expected_command = CMD_ANTI_LEGIONELLA_DISABLE await mqtt_client.disable_anti_legionella(device) try: @@ -144,6 +172,7 @@ def on_status(topic: str, message: dict[str, Any]) -> None: print("STEP 4: Re-enabling Anti-Legionella with 14-day cycle...") print("=" * 70) status_received.clear() + expected_command = CMD_ANTI_LEGIONELLA_ENABLE await mqtt_client.enable_anti_legionella(device, period_days=14) try: diff --git a/src/nwp500/mqtt_client.py b/src/nwp500/mqtt_client.py index 912fa16..5376ae2 100644 --- a/src/nwp500/mqtt_client.py +++ b/src/nwp500/mqtt_client.py @@ -569,31 +569,23 @@ async def connect(self) -> bool: try: # Build WebSocket MQTT connection with AWS credentials - # Run the connection building in a thread pool to avoid blocking I/O - def _build_connection(): - return mqtt_connection_builder.websockets_with_default_aws_signing( - endpoint=self.config.endpoint, - region=self.config.region, - credentials_provider=self._create_credentials_provider(), - client_id=self.config.client_id, - clean_session=self.config.clean_session, - keep_alive_secs=self.config.keep_alive_secs, - on_connection_interrupted=self._on_connection_interrupted_internal, - on_connection_resumed=self._on_connection_resumed_internal, - ) - - # Run connection builder in thread pool to avoid blocking I/O - self._connection = await self._loop.run_in_executor(None, _build_connection) + self._connection = mqtt_connection_builder.websockets_with_default_aws_signing( + endpoint=self.config.endpoint, + region=self.config.region, + credentials_provider=self._create_credentials_provider(), + client_id=self.config.client_id, + clean_session=self.config.clean_session, + keep_alive_secs=self.config.keep_alive_secs, + on_connection_interrupted=self._on_connection_interrupted_internal, + on_connection_resumed=self._on_connection_resumed_internal, + ) # Connect _logger.info("Establishing MQTT connection...") - # Run the connect operation in a thread pool to avoid blocking I/O - def _connect(): - connect_future = self._connection.connect() - return connect_future.result() - - connect_result = await self._loop.run_in_executor(None, _connect) + # Convert concurrent.futures.Future to asyncio.Future and await + connect_future = self._connection.connect() + connect_result = await asyncio.wrap_future(connect_future) self._connected = True self._reconnect_attempts = 0 # Reset on successful connection @@ -644,12 +636,9 @@ async def disconnect(self): await self.stop_all_periodic_tasks() try: - # Run disconnect operation in thread pool to avoid blocking I/O - def _disconnect(): - disconnect_future = self._connection.disconnect() - return disconnect_future.result() - - await self._loop.run_in_executor(None, _disconnect) + # Convert concurrent.futures.Future to asyncio.Future and await + disconnect_future = self._connection.disconnect() + await asyncio.wrap_future(disconnect_future) self._connected = False self._connection = None @@ -744,15 +733,11 @@ async def subscribe( _logger.info(f"Subscribing to topic: {topic}") try: - # Run subscribe operation in thread pool to avoid blocking I/O - def _subscribe(): - subscribe_future, packet_id = self._connection.subscribe( - topic=topic, qos=qos, callback=self._on_message_received - ) - subscribe_result = subscribe_future.result() - return subscribe_result, packet_id - - subscribe_result, packet_id = await self._loop.run_in_executor(None, _subscribe) + # Convert concurrent.futures.Future to asyncio.Future and await + subscribe_future, packet_id = self._connection.subscribe( + topic=topic, qos=qos, callback=self._on_message_received + ) + subscribe_result = await asyncio.wrap_future(subscribe_future) _logger.info(f"Subscribed to '{topic}' with QoS {subscribe_result['qos']}") @@ -768,12 +753,18 @@ def _subscribe(): _logger.error(f"Failed to subscribe to '{_redact_topic(topic)}': {e}") raise - async def unsubscribe(self, topic: str): + async def unsubscribe(self, topic: str) -> int: """ Unsubscribe from an MQTT topic. Args: topic: MQTT topic to unsubscribe from + + Returns: + Unsubscribe packet ID + + Raises: + Exception: If unsubscribe fails """ if not self._connected: raise RuntimeError("Not connected to MQTT broker") @@ -781,12 +772,9 @@ async def unsubscribe(self, topic: str): _logger.info(f"Unsubscribing from topic: {topic}") try: - # Run unsubscribe operation in thread pool to avoid blocking I/O - def _unsubscribe(): - unsubscribe_future, packet_id = self._connection.unsubscribe(topic) - return unsubscribe_future.result() - - await self._loop.run_in_executor(None, _unsubscribe) + # Convert concurrent.futures.Future to asyncio.Future and await + unsubscribe_future, packet_id = self._connection.unsubscribe(topic) + await asyncio.wrap_future(unsubscribe_future) # Remove from tracking self._subscriptions.pop(topic, None) @@ -794,6 +782,8 @@ def _unsubscribe(): _logger.info(f"Unsubscribed from '{topic}'") + return packet_id + except Exception as e: _logger.error(f"Failed to unsubscribe from '{_redact_topic(topic)}': {e}") raise @@ -835,15 +825,11 @@ async def publish( # Serialize to JSON payload_json = json.dumps(payload) - # Run publish operation in thread pool to avoid blocking I/O - def _publish(): - publish_future, packet_id = self._connection.publish( - topic=topic, payload=payload_json, qos=qos - ) - publish_future.result() - return packet_id - - packet_id = await self._loop.run_in_executor(None, _publish) + # Convert concurrent.futures.Future to asyncio.Future and await + publish_future, packet_id = self._connection.publish( + topic=topic, payload=payload_json, qos=qos + ) + await asyncio.wrap_future(publish_future) _logger.debug(f"Published to '{topic}' with packet_id {packet_id}")