Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@
- **Install dependencies**: `pip install -e .` (development mode)
- **Run tests**: `pytest` (unit tests in `tests/`)
- **Lint/format**: `ruff format --check src/ tests/ examples/` (use `ruff format ...` to auto-format)
- **CI-compatible linting**: `make ci-lint` (run before finalizing changes to ensure CI will pass)
- **CI-compatible formatting**: `make ci-format` (auto-fix formatting issues)
- **Build docs**: `tox -e docs` (Sphinx docs in `docs/`)
- **Preview docs**: `python3 -m http.server --directory docs/_build/html`

### Before Committing Changes
Always run `make ci-lint` before finalizing changes to ensure your code will pass CI checks. This runs the exact same linting configuration as the CI pipeline, preventing "passes locally but fails in CI" issues.

## Patterns & Conventions
- **Async context managers** for authentication: `async with NavienAuthClient(email, password) as auth_client:`
- **Environment variables** for credentials: `NAVIEN_EMAIL`, `NAVIEN_PASSWORD`
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ Added
- Eliminates "passes locally but fails in CI" issues
- Cross-platform support (Linux, macOS, Windows, containers)

- All MQTT operations (connect, disconnect, subscribe, unsubscribe, publish) use ``asyncio.run_in_executor()``
- All MQTT operations (connect, disconnect, subscribe, unsubscribe, publish) use ``asyncio.wrap_future()`` to convert AWS SDK Futures to asyncio Futures
- Eliminates "blocking I/O detected" warnings in Home Assistant and other async applications
- Fully compatible with async event loops without blocking other operations
- More efficient than executor-based approaches (no thread pool usage)
- No API changes required - existing code works without modification
- Maintains full performance and reliability of the underlying AWS IoT SDK
- Safe for use in Home Assistant custom integrations and other async applications
Expand Down
5 changes: 3 additions & 2 deletions docs/MQTT_CLIENT.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,9 @@ async applications.

**Implementation Details:**

- All AWS IoT SDK operations that could block are wrapped with ``asyncio.run_in_executor()``
- Connection, disconnection, subscription, and publishing operations are non-blocking
- AWS IoT SDK operations return ``concurrent.futures.Future`` objects that are converted to asyncio Futures using ``asyncio.wrap_future()``
- Connection, disconnection, subscription, and publishing operations are fully non-blocking
- No thread pool resources are used for MQTT operations (more efficient than executor-based approaches)
- The client maintains full compatibility with the existing API
- No additional configuration required for non-blocking behavior

Expand Down
35 changes: 32 additions & 3 deletions examples/anti_legionella_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
from typing import Any

from nwp500 import NavienAPIClient, NavienAuthClient, NavienMqttClient
from nwp500.constants import (
CMD_ANTI_LEGIONELLA_DISABLE,
CMD_ANTI_LEGIONELLA_ENABLE,
CMD_STATUS_REQUEST,
)


def display_anti_legionella_status(status: dict[str, Any], label: str = "") -> None:
Expand Down Expand Up @@ -67,15 +72,35 @@ async def main() -> None:
latest_status = {}
status_received = asyncio.Event()

# Expected command codes for each step
expected_command = None

def on_status(topic: str, message: dict[str, Any]) -> None:
nonlocal latest_status
# Debug: print what we received
print(f"[DEBUG] Received message on topic: {topic}")

# Skip command echoes (messages on /ctrl topic)
if topic.endswith("/ctrl"):
print("[DEBUG] Skipping command echo")
return

status = message.get("response", {}).get("status", {})
command = status.get("command")

# Only capture status if it has Anti-Legionella data
if status.get("antiLegionellaPeriod") is not None:
latest_status = status
status_received.set()
print("[DEBUG] Anti-Legionella status captured")
# If we're expecting a specific command, only accept that
if expected_command is None or command == expected_command:
latest_status = status
status_received.set()
print(
f"[DEBUG] Anti-Legionella status captured (command={command})"
)
else:
print(
f"[DEBUG] Ignoring status from different command (got {command}, expected {expected_command})"
)
else:
print("[DEBUG] Message doesn't contain antiLegionellaPeriod")

Expand All @@ -94,6 +119,7 @@ def on_status(topic: str, message: dict[str, Any]) -> None:
print("STEP 1: Getting initial Anti-Legionella status...")
print("=" * 70)
status_received.clear()
expected_command = CMD_STATUS_REQUEST
await mqtt_client.request_device_status(device)

try:
Expand All @@ -111,6 +137,7 @@ def on_status(topic: str, message: dict[str, Any]) -> None:
print("STEP 2: Enabling Anti-Legionella cycle every 7 days...")
print("=" * 70)
status_received.clear()
expected_command = CMD_ANTI_LEGIONELLA_ENABLE
await mqtt_client.enable_anti_legionella(device, period_days=7)

try:
Expand All @@ -128,6 +155,7 @@ def on_status(topic: str, message: dict[str, Any]) -> None:
print("WARNING: This reduces protection against Legionella bacteria!")
print("=" * 70)
status_received.clear()
expected_command = CMD_ANTI_LEGIONELLA_DISABLE
await mqtt_client.disable_anti_legionella(device)

try:
Expand All @@ -144,6 +172,7 @@ def on_status(topic: str, message: dict[str, Any]) -> None:
print("STEP 4: Re-enabling Anti-Legionella with 14-day cycle...")
print("=" * 70)
status_received.clear()
expected_command = CMD_ANTI_LEGIONELLA_ENABLE
await mqtt_client.enable_anti_legionella(device, period_days=14)

try:
Expand Down
90 changes: 38 additions & 52 deletions src/nwp500/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -569,31 +569,23 @@ async def connect(self) -> bool:

try:
# Build WebSocket MQTT connection with AWS credentials
# Run the connection building in a thread pool to avoid blocking I/O
def _build_connection():
return mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint=self.config.endpoint,
region=self.config.region,
credentials_provider=self._create_credentials_provider(),
client_id=self.config.client_id,
clean_session=self.config.clean_session,
keep_alive_secs=self.config.keep_alive_secs,
on_connection_interrupted=self._on_connection_interrupted_internal,
on_connection_resumed=self._on_connection_resumed_internal,
)

# Run connection builder in thread pool to avoid blocking I/O
self._connection = await self._loop.run_in_executor(None, _build_connection)
self._connection = mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint=self.config.endpoint,
region=self.config.region,
credentials_provider=self._create_credentials_provider(),
client_id=self.config.client_id,
clean_session=self.config.clean_session,
keep_alive_secs=self.config.keep_alive_secs,
on_connection_interrupted=self._on_connection_interrupted_internal,
on_connection_resumed=self._on_connection_resumed_internal,
)

# Connect
_logger.info("Establishing MQTT connection...")

# Run the connect operation in a thread pool to avoid blocking I/O
def _connect():
connect_future = self._connection.connect()
return connect_future.result()

connect_result = await self._loop.run_in_executor(None, _connect)
# Convert concurrent.futures.Future to asyncio.Future and await
connect_future = self._connection.connect()
connect_result = await asyncio.wrap_future(connect_future)

self._connected = True
self._reconnect_attempts = 0 # Reset on successful connection
Expand Down Expand Up @@ -644,12 +636,9 @@ async def disconnect(self):
await self.stop_all_periodic_tasks()

try:
# Run disconnect operation in thread pool to avoid blocking I/O
def _disconnect():
disconnect_future = self._connection.disconnect()
return disconnect_future.result()

await self._loop.run_in_executor(None, _disconnect)
# Convert concurrent.futures.Future to asyncio.Future and await
disconnect_future = self._connection.disconnect()
await asyncio.wrap_future(disconnect_future)

self._connected = False
self._connection = None
Expand Down Expand Up @@ -744,15 +733,11 @@ async def subscribe(
_logger.info(f"Subscribing to topic: {topic}")

try:
# Run subscribe operation in thread pool to avoid blocking I/O
def _subscribe():
subscribe_future, packet_id = self._connection.subscribe(
topic=topic, qos=qos, callback=self._on_message_received
)
subscribe_result = subscribe_future.result()
return subscribe_result, packet_id

subscribe_result, packet_id = await self._loop.run_in_executor(None, _subscribe)
# Convert concurrent.futures.Future to asyncio.Future and await
subscribe_future, packet_id = self._connection.subscribe(
topic=topic, qos=qos, callback=self._on_message_received
)
subscribe_result = await asyncio.wrap_future(subscribe_future)

_logger.info(f"Subscribed to '{topic}' with QoS {subscribe_result['qos']}")

Expand All @@ -768,32 +753,37 @@ def _subscribe():
_logger.error(f"Failed to subscribe to '{_redact_topic(topic)}': {e}")
raise

async def unsubscribe(self, topic: str):
async def unsubscribe(self, topic: str) -> int:
"""
Unsubscribe from an MQTT topic.

Args:
topic: MQTT topic to unsubscribe from

Returns:
Unsubscribe packet ID

Raises:
Exception: If unsubscribe fails
"""
if not self._connected:
raise RuntimeError("Not connected to MQTT broker")

_logger.info(f"Unsubscribing from topic: {topic}")

try:
# Run unsubscribe operation in thread pool to avoid blocking I/O
def _unsubscribe():
unsubscribe_future, packet_id = self._connection.unsubscribe(topic)
return unsubscribe_future.result()

await self._loop.run_in_executor(None, _unsubscribe)
# Convert concurrent.futures.Future to asyncio.Future and await
unsubscribe_future, packet_id = self._connection.unsubscribe(topic)
await asyncio.wrap_future(unsubscribe_future)

# Remove from tracking
self._subscriptions.pop(topic, None)
self._message_handlers.pop(topic, None)

_logger.info(f"Unsubscribed from '{topic}'")

return packet_id

except Exception as e:
_logger.error(f"Failed to unsubscribe from '{_redact_topic(topic)}': {e}")
raise
Expand Down Expand Up @@ -835,15 +825,11 @@ async def publish(
# Serialize to JSON
payload_json = json.dumps(payload)

# Run publish operation in thread pool to avoid blocking I/O
def _publish():
publish_future, packet_id = self._connection.publish(
topic=topic, payload=payload_json, qos=qos
)
publish_future.result()
return packet_id

packet_id = await self._loop.run_in_executor(None, _publish)
# Convert concurrent.futures.Future to asyncio.Future and await
publish_future, packet_id = self._connection.publish(
topic=topic, payload=payload_json, qos=qos
)
await asyncio.wrap_future(publish_future)

_logger.debug(f"Published to '{topic}' with packet_id {packet_id}")

Expand Down