Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ Same-subnet peers read `remote_build_port` from TXT so a `--remote-build-port` o

**`subscribe_events` initial state:**

Right after a client subscribes (and before any live events arrive), the server pushes one `initial_state` event carrying a snapshot of state that's accumulated server-side via background activity (mDNS browser, completed pair flows, etc.) so the frontend can paint the first frame without follow-up reads. Shape: `{devices?: [...], importable?: [...], pairings?: [PairingSummary], peers?: [PeerSummary], hosts?: [RemoteBuildPeer], offloader_alerts?: [OffloaderAlertSnapshotEntry], peer_queue_status?: [PeerQueueStatusSnapshotEntry], remote_jobs?: [OffloaderRemoteJobSnapshotEntry], remote_builds_enabled?: bool, version_match_policy?: "any" | "release" | "exact" | "exact_required"}`. Each field is present only when the corresponding controller is up; `pairings` carries both PENDING and APPROVED offloader-side rows from the `_pairings` dict, `peers` carries both PENDING (`_pending_peers`) and APPROVED (`_approved_peers`) receiver-side rows, `hosts` carries the receiver controller's mDNS-discovered peer dashboards (`self._peers`, RAM-only — never persisted), `offloader_alerts` carries the offloader-side pair alerts dict (`_offloader_alerts`, RAM-only) so a tab subscribing AFTER a `pin_mismatch` / `peer_revoked` event fired still renders the alert it would have missed on the live stream — the alert only clears via re-pair or unpair, never by an operator-driven dismiss, because the underlying state (broken pairing) doesn't fix itself. `peer_queue_status` carries the most recent `queue_status` snapshot per paired receiver so a late tab paints the per-peer queue depth without waiting for the next event. `remote_jobs` carries every offloader-submitted job that's still in flight (terminal entries drop on the matching `job_state_changed` event) so the UI can render running builds on page load. `remote_builds_enabled` and `version_match_policy` carry the current value of the offloader-wide settings so the Settings dialog renders both controls on first paint instead of waiting for the matching `offloader_remote_builds_toggled` / `offloader_version_match_policy_changed` event to fire. All sync reads, no executor hop, no disk I/O. The `PeerSummary` projection persists `peer_ip` (the source IP observed at pair_request time) on `StoredPeer` so a snapshot-loaded inbox row carries the same IP the live `remote_build_pair_request_received` event would carry; that's what the receiver Settings UI renders alongside the pin as a clone-risk sanity-check. Empty string for legacy on-disk rows from receivers that pre-date the field. Live updates that arrive after the initial state mutate against this seed via the events below.
Right after a client subscribes (and before any live events arrive), the server pushes one `initial_state` event carrying a snapshot of state that's accumulated server-side via background activity (mDNS browser, completed pair flows, etc.) so the frontend can paint the first frame without follow-up reads. Shape: `{preferences: UserPreferences, devices?: [...], importable?: [...], pairings?: [PairingSummary], peers?: [PeerSummary], hosts?: [RemoteBuildPeer], offloader_alerts?: [OffloaderAlertSnapshotEntry], peer_queue_status?: [PeerQueueStatusSnapshotEntry], remote_jobs?: [OffloaderRemoteJobSnapshotEntry], remote_builds_enabled?: bool, version_match_policy?: "any" | "release" | "exact" | "exact_required"}`. `preferences` is always present — it's RAM-canonical behind a `PreferencesStore` (loaded once at startup, mutations debounce a write to its own `.device-builder-preferences.json`, migrated out of the shared sidecar on first run; the same per-file `Store` pattern as the device-metadata and peer-link stores), so the client paints theme/UI state without a separate `config/get_preferences`. Undecodable preferences are preserved, never destroyed: a corrupt dedicated file is renamed to `.corrupt` and an undecodable legacy sidecar blob is left in place (not stripped), both logged, before the store falls back to defaults. The rest are present only when the corresponding controller is up; `pairings` carries both PENDING and APPROVED offloader-side rows from the `_pairings` dict, `peers` carries both PENDING (`_pending_peers`) and APPROVED (`_approved_peers`) receiver-side rows, `hosts` carries the receiver controller's mDNS-discovered peer dashboards (`self._peers`, RAM-only — never persisted), `offloader_alerts` carries the offloader-side pair alerts dict (`_offloader_alerts`, RAM-only) so a tab subscribing AFTER a `pin_mismatch` / `peer_revoked` event fired still renders the alert it would have missed on the live stream — the alert only clears via re-pair or unpair, never by an operator-driven dismiss, because the underlying state (broken pairing) doesn't fix itself. `peer_queue_status` carries the most recent `queue_status` snapshot per paired receiver so a late tab paints the per-peer queue depth without waiting for the next event. `remote_jobs` carries every offloader-submitted job that's still in flight (terminal entries drop on the matching `job_state_changed` event) so the UI can render running builds on page load. `remote_builds_enabled` and `version_match_policy` carry the current value of the offloader-wide settings so the Settings dialog renders both controls on first paint instead of waiting for the matching `offloader_remote_builds_toggled` / `offloader_version_match_policy_changed` event to fire. All sync reads, no executor hop, no disk I/O. The `PeerSummary` projection persists `peer_ip` (the source IP observed at pair_request time) on `StoredPeer` so a snapshot-loaded inbox row carries the same IP the live `remote_build_pair_request_received` event would carry; that's what the receiver Settings UI renders alongside the pin as a clone-risk sanity-check. Empty string for legacy on-disk rows from receivers that pre-date the field. Live updates that arrive after the initial state mutate against this seed via the events below.

**`subscribe_events` events:**
- `device_added`, `device_removed`, `device_updated`, `device_state_changed`
Expand Down
12 changes: 0 additions & 12 deletions esphome_device_builder/controllers/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@
rename_device_metadata,
set_device_metadata,
)
from .preferences import (
_prefs_from_data,
load_preferences,
mutate_preferences,
save_preferences,
update_preferences,
)
from .remote_build_settings import (
_settings_from_raw,
has_remote_build_settings_persisted,
Expand Down Expand Up @@ -99,7 +92,6 @@
"_make_descriptor_tempfile",
"_parse_chip_family_line",
"_parse_project_name",
"_prefs_from_data",
"_read_app_descriptor_board_id",
"_read_descriptor_file",
"_run_esptool",
Expand All @@ -114,17 +106,13 @@
"has_remote_build_settings_persisted",
"labels_transaction",
"load_labels",
"load_preferences",
"load_remote_build_settings",
"metadata_transaction",
"mutate_preferences",
"remote_build_settings_transaction",
"remove_device_metadata",
"rename_device_metadata",
"save_labels",
"save_preferences",
"save_remote_build_settings",
"set_device_labels",
"set_device_metadata",
"update_preferences",
]
225 changes: 225 additions & 0 deletions esphome_device_builder/controllers/config/_preferences_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
"""RAM-canonical user preferences with a debounced disk write."""

from __future__ import annotations

import asyncio
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any

from ...helpers.json import JSONDecodeError, dumps_indent, loads
from ...helpers.storage import ShutdownRegister, Store
from ...models import UserPreferences
from .metadata import _load_metadata, metadata_transaction
from .preferences import _PREFS_KEY

_LOGGER = logging.getLogger(__name__)

_STORE_FILENAME = ".device-builder-preferences.json"
_SHARED_SIDECAR_FILENAME = ".device-builder.json"

_DEFAULT_SAVE_DELAY = 1.0

# Decode failures that should be treated as "corrupt / incompatible": a bad-JSON
# read, a non-object payload, or a shape ``from_dict`` rejects.
_DECODE_ERRORS = (JSONDecodeError, ValueError, TypeError, LookupError)


def _encode(prefs: UserPreferences) -> bytes:
return dumps_indent(prefs.to_dict())


def _decode(raw: bytes) -> UserPreferences:
"""Decode stored preferences; raise on a corrupt or incompatible payload.

Corruption propagates (rather than defaulting) so the caller can preserve
the file for recovery instead of silently overwriting it.
"""
obj = loads(raw)
if not isinstance(obj, dict):
raise TypeError("preferences payload is not a JSON object")
return UserPreferences.from_dict(obj)


class PreferencesStore:
"""RAM-canonical user preferences; writes go through a debounced ``Store``."""

def __init__(self, config_dir: Path, shutdown_register: ShutdownRegister) -> None:
self._config_dir = config_dir
self._state = UserPreferences()
# Set when an undecodable file couldn't be renamed aside; suppresses all
# writes so a later save can't overwrite the still-recoverable corrupt file.
self._persist_disabled = False
self._store: Store[UserPreferences] = Store(
config_dir / _STORE_FILENAME,
encoder=_encode,
decoder=_decode,
shutdown_register=shutdown_register,
name="preferences",
)

async def async_load(self) -> None:
"""Seed RAM from disk; migrate the sidecar's ``_preferences`` on first run.

Undecodable data is preserved, never destroyed: a corrupt dedicated file
is renamed aside (then the legacy sidecar is still tried, so a recoverable
blob isn't lost), and an undecodable legacy blob is left in place. Both
fall back to defaults.
"""
loop = asyncio.get_running_loop()
try:
loaded = await self._store.async_load()
except _DECODE_ERRORS:
_LOGGER.exception(
"preferences store: %s is undecodable; preserving it and using defaults",
_STORE_FILENAME,
)
await loop.run_in_executor(None, self._preserve_corrupt_file)
await self._migrate_from_sidecar(loop)
return
if loaded is not None:
self._state = loaded
return
await self._migrate_from_sidecar(loop)

async def _migrate_from_sidecar(self, loop: asyncio.AbstractEventLoop) -> None:
"""Adopt the legacy ``_preferences`` blob, persist it, then strip the key.

The strip is gated on a confirmed write so an unconfirmed flush can't lose
the prefs on restart (see :meth:`_confirm_and_strip_shared_sync`).
"""
migrated = await loop.run_in_executor(None, self._migrate_read_shared_sync)
if migrated is None:
return
self._state = migrated
if self._persist_disabled:
return
self._store.async_delay_save(self._snapshot, delay=0.0)
await self._store.async_save_now()
stripped = await loop.run_in_executor(None, self._confirm_and_strip_shared_sync)
if not stripped:
_LOGGER.warning(
"preferences store: %s write unconfirmed; keeping %s in %s to retry",
_STORE_FILENAME,
_PREFS_KEY,
_SHARED_SIDECAR_FILENAME,
)
return
_LOGGER.info(
"Migrated preferences from %s to %s", _SHARED_SIDECAR_FILENAME, _STORE_FILENAME
)

def snapshot(self) -> UserPreferences:
"""Return a copy of the current preferences (sync; for the subscribe snapshot).

A copy so a caller mutating it can't corrupt the canonical RAM state
(which would skip the debounced write and be lost on restart).
"""
return self._copy()

def update(
self, fields: dict[str, Any], *, delay: float = _DEFAULT_SAVE_DELAY
) -> UserPreferences:
"""Merge a validated partial dict and schedule a debounced save."""
self._state = UserPreferences.from_dict({**self._state.to_dict(), **fields})
self._schedule_save(delay=delay)
return self._copy()

def mutate(
self,
fn: Callable[[UserPreferences], UserPreferences | None],
*,
delay: float = _DEFAULT_SAVE_DELAY,
) -> UserPreferences:
"""Apply *fn* to a copy, replace RAM, schedule a save, return the result.

*fn* may mutate the passed copy in place and return ``None`` (in-RAM
state is always replaced, never mutated in place, so a borrowed
:meth:`snapshot` reference stays stable).
"""
working = self._copy()
result = fn(working)
if result is None:
result = working
self._state = result
self._schedule_save(delay=delay)
return self._copy()

def _schedule_save(self, *, delay: float) -> None:
"""Schedule a debounced write, unless persistence has been disabled."""
if self._persist_disabled:
return
self._store.async_delay_save(self._snapshot, delay=delay)

def _copy(self) -> UserPreferences:
"""Return a fresh, independent copy of the canonical RAM state."""
return UserPreferences.from_dict(self._state.to_dict())

def _snapshot(self) -> UserPreferences:
return self._state

def _preserve_corrupt_file(self) -> None:
"""Rename the undecodable dedicated file aside so the next save can't erase it.

If the rename fails, disable persistence: leaving the corrupt file in
place and then writing over it would destroy the recoverable data this
method exists to protect.
"""
path = self._config_dir / _STORE_FILENAME
try:
path.replace(path.with_name(path.name + ".corrupt"))
except OSError:
self._persist_disabled = True
_LOGGER.warning(
"Could not preserve corrupt preferences file %s; disabling writes to keep it",
path,
exc_info=True,
)
Comment thread
bdraco marked this conversation as resolved.

def _migrate_read_shared_sync(self) -> UserPreferences | None:
"""Decode the sidecar's ``_preferences`` blob.

Returns ``None`` when the key is absent or undecodable; an undecodable
legacy blob is logged and left in the sidecar (the caller doesn't strip
it) so the data stays recoverable rather than being replaced by defaults.
"""
shared_path = self._config_dir / _SHARED_SIDECAR_FILENAME
if not shared_path.exists():
return None
data = _load_metadata(self._config_dir)
if _PREFS_KEY not in data:
return None
try:
return UserPreferences.from_dict(data[_PREFS_KEY])
except _DECODE_ERRORS:
_LOGGER.exception(
"preferences store: legacy _preferences blob undecodable; left in %s for recovery",
_SHARED_SIDECAR_FILENAME,
)
return None

def _confirm_and_strip_shared_sync(self) -> bool:
"""Strip the migrated ``_preferences`` key once the dedicated write landed.

Returns ``False`` only when the dedicated-file write is unconfirmed
(``Store`` swallows write errors), so the caller keeps the legacy key for
a retry. A *strip* failure is non-fatal and still returns ``True``: the
dedicated file is already canonical and a leftover legacy key is benign
(the dedicated file wins on the next load), so it must not abort boot.
The ``exists`` probe and the strip share one executor hop.
"""
if not self._store.path.exists():
return False
try:
with metadata_transaction(self._config_dir) as data:
data.pop(_PREFS_KEY, None)
except OSError:
_LOGGER.warning(
"preferences store: migrated prefs but could not strip %s from %s; "
"the dedicated file wins, leaving the stale key",
_PREFS_KEY,
_SHARED_SIDECAR_FILENAME,
exc_info=True,
)
return True
25 changes: 18 additions & 7 deletions esphome_device_builder/controllers/config/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
read_secrets_yaml,
write_secret,
)
from ...helpers.storage import ShutdownCallback
from ...helpers.storage_path import resolve_storage_path
from ...models import ErrorCode, UserPreferences
from ._preferences_store import PreferencesStore
from .chip_detect import (
_detect_chip_via_esptool,
_detect_failure_message,
_is_valid_port_name,
_read_app_descriptor_board_id,
)
from .preferences import load_preferences, update_preferences

if TYPE_CHECKING:
from ...device_builder import DeviceBuilder
Expand All @@ -37,6 +38,19 @@ class ConfigController:

def __init__(self, device_builder: DeviceBuilder) -> None:
self._db = device_builder
self._shutdown_callbacks: list[ShutdownCallback] = []
self.prefs = PreferencesStore(
device_builder.settings.config_dir, self._shutdown_callbacks.append
)

async def async_load(self) -> None:
"""Seed the RAM-canonical preferences store (and migrate on first run)."""
await self.prefs.async_load()

async def stop(self) -> None:
"""Flush the preferences store on shutdown."""
for callback in self._shutdown_callbacks:
await callback()

@api_command("config/version")
async def get_version(self, **kwargs: Any) -> dict:
Expand Down Expand Up @@ -94,9 +108,8 @@ async def detect_chip_cmd(self, **kwargs: Any) -> dict:

@api_command("config/get_preferences")
async def get_prefs(self, **kwargs: Any) -> UserPreferences:
"""Get user preferences."""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, load_preferences, self._db.settings.config_dir)
"""Get user preferences (RAM-canonical via the store)."""
return self.prefs.snapshot()

@api_command("config/set_preferences")
async def set_prefs(self, **kwargs: Any) -> UserPreferences:
Expand All @@ -105,10 +118,8 @@ async def set_prefs(self, **kwargs: Any) -> UserPreferences:
Accepts partial updates — only provided fields are changed,
others keep their current values.
"""
loop = asyncio.get_running_loop()
config_dir = self._db.settings.config_dir
update_fields = {k: v for k, v in kwargs.items() if k not in ("client", "message_id")}
return await loop.run_in_executor(None, update_preferences, config_dir, update_fields)
return self.prefs.update(update_fields)

@api_command("config/get_secrets")
async def get_secrets(self, **kwargs: Any) -> list[str]:
Expand Down
Loading
Loading