Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions custom_components/pymc_repeater/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
)
coordinator = PyMCRepeaterDataUpdateCoordinator(hass, entry, api)
await coordinator.async_config_entry_first_refresh()
await coordinator.async_start_runtime()

repeater_name = get_repeater_name_from_stats(coordinator.data.get("stats", {}))
if repeater_name and repeater_name != entry.title:
Expand All @@ -97,6 +98,8 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
entry_data = hass.data[DOMAIN].pop(entry.entry_id, None)
if entry_data:
await entry_data["coordinator"].async_stop_runtime()
if entry_data and (unsub := entry_data.get("unsub_options_listener")):
unsub()
return unload_ok
Expand Down
56 changes: 55 additions & 1 deletion custom_components/pymc_repeater/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from __future__ import annotations

import asyncio
import json
import socket
from dataclasses import dataclass
from typing import Any
from urllib.parse import urlparse

from aiohttp import ClientError, ClientSession
from aiohttp import ClientError, ClientResponse, ClientSession
from yarl import URL

from .const import CLIENT_ID_PREFIX, DEFAULT_PACKET_WINDOW_HOURS
Expand Down Expand Up @@ -253,6 +254,10 @@ async def async_get_gps(self) -> dict[str, Any]:
"""Return local GPS receiver diagnostics."""
return await self._async_request_wrapped("GET", "/api/gps")

async def async_open_gps_stream(self) -> ClientResponse:
"""Open the GPS SSE stream."""
return await self._async_open_stream("GET", "/api/gps_stream")

async def async_get_logs(self) -> dict[str, Any]:
"""Return buffered repeater logs."""
payload = await self._async_request_json("GET", "/api/logs", auth="api_token")
Expand Down Expand Up @@ -753,6 +758,41 @@ async def _async_request_wrapped(
raise PyMCRepeaterApiError(payload.get("error", f"Request failed for {path}"))
return payload.get("data", payload)

async def _async_open_stream(
self,
method: str,
path: str,
*,
params: dict[str, Any] | None = None,
) -> ClientResponse:
headers = {"Accept": "text/event-stream"}
if not self.api_token:
raise PyMCRepeaterAuthenticationError("API token is not configured")
headers["X-API-Key"] = self.api_token
url = f"{self.base_url}{path}"

try:
response = await self._session.request(
method,
url,
params=params,
headers=headers,
timeout=None,
)
except ClientError as err:
raise PyMCRepeaterCannotConnect(
f"Cannot connect to {self.host}:{self.port}"
) from err

if response.status in (401, 403):
response.release()
raise PyMCRepeaterAuthenticationError(f"Authentication failed for {path}")
if response.status >= 400:
detail = await response.text()
response.release()
raise PyMCRepeaterApiError(f"HTTP {response.status} from {path}: {detail[:200]}")
return response

async def _async_request_json(
self,
method: str,
Expand Down Expand Up @@ -815,6 +855,20 @@ async def _async_request_json(

return payload

@staticmethod
def decode_sse_payload(line: bytes) -> dict[str, Any] | None:
"""Decode one SSE data line from the GPS stream."""
if not line.startswith(b"data:"):
return None
payload = line[5:].strip()
if not payload:
return None
try:
parsed = json.loads(payload.decode("utf-8"))
except (UnicodeDecodeError, json.JSONDecodeError):
return None
return parsed if isinstance(parsed, dict) else None

def _build_client_id(self) -> str:
"""Create a stable-enough client identifier for HA bootstrap."""
hostname = socket.gethostname().lower().replace(" ", "-")
Expand Down
28 changes: 28 additions & 0 deletions custom_components/pymc_repeater/binary_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ def _any_mqtt_connected(data: dict[str, Any]) -> bool:
entity_category=EntityCategory.DIAGNOSTIC,
value_fn=lambda data: bool(_nested(data, "update_status", "has_update")),
),
PyMCBinarySensorDescription(
key="gps_enabled",
name="GPS enabled",
icon="mdi:satellite-uplink",
entity_category=EntityCategory.DIAGNOSTIC,
value_fn=lambda data: bool(_nested(data, "gps", "enabled")),
),
PyMCBinarySensorDescription(
key="gps_running",
name="GPS running",
icon="mdi:run-fast",
entity_category=EntityCategory.DIAGNOSTIC,
value_fn=lambda data: bool(_nested(data, "gps", "running")),
),
PyMCBinarySensorDescription(
key="gps_fix_valid",
name="GPS fix valid",
Expand All @@ -83,6 +97,20 @@ def _any_mqtt_connected(data: dict[str, Any]) -> bool:
device_class=BinarySensorDeviceClass.CONNECTIVITY,
value_fn=lambda data: bool(_nested(data, "gps", "status", "fix_valid")),
),
PyMCBinarySensorDescription(
key="gps_stale",
name="GPS stale",
icon="mdi:timer-off-outline",
entity_category=EntityCategory.DIAGNOSTIC,
value_fn=lambda data: bool(_nested(data, "gps", "status", "stale")),
),
PyMCBinarySensorDescription(
key="gps_location_update_enabled",
name="GPS location updates enabled",
icon="mdi:map-marker-plus-outline",
entity_category=EntityCategory.DIAGNOSTIC,
value_fn=lambda data: bool(_nested(data, "gps", "location_update", "enabled")),
),
)


Expand Down
81 changes: 81 additions & 0 deletions custom_components/pymc_repeater/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from __future__ import annotations

import asyncio
import contextlib
import json
import logging

from homeassistant.config_entries import ConfigEntry
Expand All @@ -17,6 +20,7 @@
from .const import DEFAULT_SCAN_INTERVAL, DOMAIN

_LOGGER = logging.getLogger(__name__)
GPS_STREAM_RETRY_SECONDS = 15


class PyMCRepeaterDataUpdateCoordinator(DataUpdateCoordinator[dict]):
Expand All @@ -36,6 +40,20 @@ def __init__(
)
self.config_entry = entry
self.api = api
self._gps_stream_task: asyncio.Task | None = None

async def async_start_runtime(self) -> None:
"""Start background runtime tasks."""
if self._gps_stream_task is None:
self._gps_stream_task = self.hass.async_create_task(self._async_gps_stream_loop())

async def async_stop_runtime(self) -> None:
"""Stop background runtime tasks."""
if self._gps_stream_task is not None:
self._gps_stream_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._gps_stream_task
self._gps_stream_task = None

async def _async_update_data(self) -> dict:
try:
Expand All @@ -44,3 +62,66 @@ async def _async_update_data(self) -> dict:
raise ConfigEntryAuthFailed(str(err)) from err
except PyMCRepeaterCannotConnect as err:
raise UpdateFailed(str(err)) from err

async def _async_gps_stream_loop(self) -> None:
"""Listen for GPS stream snapshots and update GPS entities faster than polling."""
while True:
try:
response = await self.api.async_open_gps_stream()
try:
_LOGGER.debug(
"Connected GPS stream for %s", self.config_entry.entry_id
)
async for raw_line in response.content:
event = self.api.decode_sse_payload(raw_line)
if not event:
continue
if event.get("type") != "snapshot":
continue
snapshot = event.get("data")
if not isinstance(snapshot, dict):
continue
self._async_apply_gps_snapshot(snapshot)
finally:
response.close()
except asyncio.CancelledError:
raise
except PyMCRepeaterAuthenticationError as err:
_LOGGER.warning(
"GPS stream auth failed for %s: %s",
self.config_entry.entry_id,
err,
)
return
except (PyMCRepeaterCannotConnect, UpdateFailed) as err:
_LOGGER.debug(
"GPS stream temporarily unavailable for %s: %s",
self.config_entry.entry_id,
err,
)
except Exception as err:
_LOGGER.debug(
"GPS stream error for %s: %s",
self.config_entry.entry_id,
err,
)

await asyncio.sleep(GPS_STREAM_RETRY_SECONDS)

def _async_apply_gps_snapshot(self, snapshot: dict) -> None:
"""Apply a GPS snapshot from the SSE stream."""
current = dict(self.data or {})
old_snapshot = current.get("gps")
if self._same_snapshot(old_snapshot, snapshot):
return
current["gps"] = snapshot
self.async_set_updated_data(current)

@staticmethod
def _same_snapshot(old_snapshot: object, new_snapshot: dict) -> bool:
"""Compare two GPS snapshots."""
if not isinstance(old_snapshot, dict):
return False
return json.dumps(old_snapshot, sort_keys=True, default=str) == json.dumps(
new_snapshot, sort_keys=True, default=str
)
6 changes: 4 additions & 2 deletions custom_components/pymc_repeater/manifest.json
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
{
"domain": "pymc_repeater",
"name": "pyMC Repeater",
"codeowners": ["@pyMC-dev"],
"codeowners": [
"@pyMC-dev"
],
"config_flow": true,
"documentation": "https://github.com/pyMC-dev/pyMC-HA-Integration",
"integration_type": "device",
"iot_class": "local_polling",
"issue_tracker": "https://github.com/pyMC-dev/pyMC-HA-Integration/issues",
"requirements": [],
"version": "1.0.0"
"version": "1.1.0"
}
Loading