Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
7888090
First stab at implementing the "queued OTA update" feature for offlin…
rwalker777 Jun 12, 2026
b309322
Merge branch 'esphome:main' into queued-offline-updates
rwalker777 Jun 12, 2026
fa3336d
Expand tests.
rwalker777 Jun 12, 2026
1583cfe
Merge branch 'queued-offline-updates' of https://github.com/rwalker77…
rwalker777 Jun 12, 2026
3ff8cee
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 13, 2026
7d3099a
Refactor controller per AI feedback.
rwalker777 Jun 13, 2026
b0a5816
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 13, 2026
3d2e9c9
Fix ruff errros and test case.
rwalker777 Jun 15, 2026
5d3f7e4
Fix more validation errors.
rwalker777 Jun 15, 2026
5688f18
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 15, 2026
9c5c8f9
Fix offline queue tests.
rwalker777 Jun 15, 2026
720bfed
Merge branch 'queued-offline-updates' of https://github.com/rwalker77…
rwalker777 Jun 15, 2026
0a3bcc0
Merge branch 'main' into queued-offline-updates
rwalker777 Jun 15, 2026
e447802
Add clear queued update option and corresponding tests.
rwalker777 Jun 16, 2026
9849537
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 16, 2026
164ce7e
Fix Ruff validation.
rwalker777 Jun 16, 2026
589184d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 16, 2026
8beaa7a
Complete test coverage.
rwalker777 Jun 16, 2026
4d34a38
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 16, 2026
4306ec6
Fix test regression for non-OTA update.
rwalker777 Jun 16, 2026
59f7206
Try regreggion fix again.
rwalker777 Jun 16, 2026
2ce2fcf
Forgot one.
rwalker777 Jun 16, 2026
5a7470e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 16, 2026
9a64fca
Add more test coverage.
rwalker777 Jun 16, 2026
3cdb698
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 16, 2026
2737f13
Add empty init file.
rwalker777 Jun 16, 2026
d07d542
Update API.MD, refactor controller, rename test per bot, refactor tests.
rwalker777 Jun 16, 2026
964d340
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 16, 2026
a808f26
Fix linter.
rwalker777 Jun 16, 2026
b853f1a
Add all the tests that I could possible think of.
rwalker777 Jun 16, 2026
81d2f87
Fix ruff.
rwalker777 Jun 16, 2026
cdde7f9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 16, 2026
7bf3d25
Merge branch 'main' into queued-offline-updates
rwalker777 Jun 16, 2026
eb74870
Restore comment that should not have been deleted.
rwalker777 Jun 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ Connections that arrive on the trusted ingress site (HA add-on supervisor proxy)
| `firmware/upload` | `{configuration, port?: ""}` | `FirmwareJob` | Queue upload of existing binary. `port` defaults to `""` (no `--device` arg — CLI auto-detects). Also accepts `"OTA"`, a serial path (`/dev/ttyUSB0`, `COM3`), or an explicit IP / hostname for "install to a specific address" — the address-cache shortcut is bypassed when a target is named directly. |
| `firmware/install` | `{configuration, port?: "OTA" \| serial \| ip \| hostname, force_local?: bool}` | `FirmwareJob` (the COMPILE job) | Queue an install as a **two-job chain**: a `COMPILE` job + a dependent `UPLOAD` job (`FirmwareJob.depends_on` = the compile's `job_id`). Returns the COMPILE job; the UPLOAD renders as queued and starts only after the compile succeeds, on the **upload lane** so it doesn't block the next device's compile. A cancelled/failed compile cascades to cancel the held upload (a cancelled build never flashes). `port` (defaults `"OTA"`) lands on the UPLOAD job. `force_local=true` bypasses the scheduler (compile runs LOCAL). Remote installs use the same chain — the remote compile materialises artifacts locally, then the local upload lane flashes. |
| `firmware/clean` | `{configuration}` | `FirmwareJob` | Queue build clean for one device. **Cancels any in-flight build (compile/upload/install) for that configuration first** — a clean is the user asking for a fresh build, and the two lanes mean the upload could otherwise read artifacts the clean is wiping. The cancelled jobs fire `JOB_CANCELLED`. |
| `firmware/clear_queued_update` | `{configuration}` | — | Clears a staged offline update for a device that hasn't woken up yet. |
| `firmware/reset_build_env` | — | `FirmwareJob` | Queue full reset of `.esphome/` build dirs and PIO cache. **Cancels every in-flight job on both lanes first** (the wipe trashes the whole tree, which a concurrent compile or upload would race). |
| `firmware/compile_bulk` | `{configurations: string[]}` | `[FirmwareJob]` | Queue multiple compiles |
| `firmware/install_bulk` | `{configurations: string[], port?: "OTA" \| serial \| ip \| hostname}` | `[FirmwareJob]` | Queue multiple installs. `port` defaults to `"OTA"` and is shared across every queued job — almost always callers want that default rather than a single explicit target across the fleet. Same `port` validation as `firmware/install`. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@
ImportableAddedCallback = Callable[[AdoptableDevice], None]
ImportableRemovedCallback = Callable[[str], None]

# Set and clear persistent queued_update flag through the state callback path.
QueuedUpdateChangeCallback = Callable[[str, bool], None]


class DeviceStateMonitor(TaskControllerBase): # noqa: PLR0904 (grandfathered; new public methods need a refactor first)
"""
Expand All @@ -101,6 +104,7 @@ def __init__(
on_mac_address_change: MacAddressChangeCallback | None = None,
on_importable_added: ImportableAddedCallback | None = None,
on_importable_removed: ImportableRemovedCallback | None = None,
on_queued_update_change: QueuedUpdateChangeCallback | None = None,
reachability: ReachabilityTracker | None = None,
is_ignored: Callable[[str], bool] | None = None,
get_devices_by_name: Callable[[str], list[Device]] | None = None,
Expand All @@ -125,6 +129,7 @@ def __init__(
self._on_mac_address_change = on_mac_address_change
self._on_importable_added = on_importable_added
self._on_importable_removed = on_importable_removed
self._on_queued_update_change = on_queued_update_change
self._is_ignored = is_ignored or (lambda _name: False)
self.state = MonitorState(reachability=reachability)
self._ping_task: asyncio.Task | None = None
Expand Down Expand Up @@ -387,6 +392,15 @@ def apply_mac_address(self, name: str, mac: str) -> bool:
self._on_mac_address_change(name, normalized)
return True

def apply_queued_update(self, name: str, *, is_queued: bool) -> bool:
"""Record that a local compile finished and is waiting for device wake."""
if self._on_queued_update_change is None:
return False
if not self._any_matching_device_differs(name, "queued_update", is_queued):
return False
self._on_queued_update_change(name, is_queued)
return True

def _any_matching_device_differs(self, name: str, attr: str, value: Any) -> bool:
"""
Return True iff some configured device named *name* has ``attr != value``.
Expand Down
9 changes: 9 additions & 0 deletions esphome_device_builder/controllers/devices/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ def __init__(self, device_builder: DeviceBuilder) -> None:
on_state_change=self._on_state_change,
on_ip_change=self._on_ip_change,
on_version_change=self._on_version_change,
on_queued_update_change=self._on_queued_update_change,
on_config_hash_change=self._on_config_hash_change,
on_api_encryption_change=self._on_api_encryption_change,
on_mac_address_change=self._on_mac_address_change,
Expand Down Expand Up @@ -317,6 +318,14 @@ def get_ota_address_cache_args(self, configuration: str, port: str | None) -> li
return []
return self.get_address_cache_args(configuration)

def _on_queued_update_change(self, name: str, is_queued: bool) -> None: # noqa: FBT001
"""Handle offline queued update flag transitions and persist."""
for device in self.get_devices():
if device.name == name:
device.queued_update = is_queued
self._metadata_store.update(device.configuration, queued_update=is_queued)
self._fire_device_updated(device)

# ------------------------------------------------------------------
# API commands — listing
# ------------------------------------------------------------------
Expand Down
74 changes: 74 additions & 0 deletions esphome_device_builder/controllers/firmware/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@

from ...helpers.api import CommandError, api_command
from ...helpers.async_ import create_eager_task
from ...helpers.event_bus import Event
from ...models import (
LOCAL_JOB_BUILD_SOURCE,
DeviceState,
ErrorCode,
EventType,
FirmwareJob,
JobBuildSource,
JobStatus,
Expand Down Expand Up @@ -69,11 +72,45 @@ def __init__(self, device_builder: DeviceBuilder) -> None:
# overwrite fresher state on disk).
self._persist_lock = asyncio.Lock()

self.bus.add_listener(EventType.DEVICE_STATE_CHANGED, self._handle_device_wake)

@property
def bus(self) -> EventBus:
"""The event bus for lifecycle / output events — read-only shorthand for ``_db.bus``."""
return self._db.bus

def _device_for_configuration(self, configuration: str) -> Any | None:
"""Resolve a Device by its configuration filename."""
if self._db.devices is None:
return None

# Extract devices iterable, safely handling test stubs (like StubDevices)
if hasattr(self._db.devices, "get_devices"):
devices = self._db.devices.get_devices()
elif isinstance(self._db.devices, list):
devices = self._db.devices
else:
devices = []

return next(
(d for d in devices if getattr(d, "configuration", None) == configuration), None
)

def _handle_device_wake(self, event: Event) -> None:
"""Intercept device wake to trigger queued updates and prevent flapping."""
if event.data["state"] != DeviceState.ONLINE.value:
return

config = event.data["configuration"]
device = self._device_for_configuration(config)

if device and getattr(device, "queued_update", False):
_LOGGER.info("Device %s woke up. Triggering queued offline update.", config)
if self._db.devices and hasattr(self._db.devices, "_state_monitor"):
self._db.devices._state_monitor.apply_queued_update(device.name, is_queued=False)

create_eager_task(self.upload(configuration=config, port="OTA"))

# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
Expand Down Expand Up @@ -166,6 +203,21 @@ async def upload(self, *, configuration: str, port: str = "", **kwargs: Any) ->
async def clean(self, *, configuration: str, **kwargs: Any) -> FirmwareJob:
return await clean_mod.clean(self, configuration=configuration)

@api_command("firmware/clear_queued_update")
async def clear_queued_update(self, *, configuration: str, **kwargs: Any) -> None:
"""Manually clear the queued_update flag for a device."""
await self._validate_configuration_boundary(configuration)

device = self._device_for_configuration(configuration)
if (
device
and getattr(device, "queued_update", False)
and self._db.devices
and hasattr(self._db.devices, "_state_monitor")
):
self._db.devices._state_monitor.apply_queued_update(device.name, is_queued=False)
_LOGGER.info("Queued update cleared for device %s", configuration)

@api_command("firmware/reset_build_env")
async def reset_build_env(self, **kwargs: Any) -> FirmwareJob:
"""
Expand Down Expand Up @@ -213,6 +265,16 @@ async def install(
"""
_validate_port(port)
await self._validate_configuration_boundary(configuration)

if port == "OTA":
device = self._device_for_configuration(configuration)
# Suggestion 1: Gated ONLY on OFFLINE, avoiding UNKNOWN startup states
if device and device.state == DeviceState.OFFLINE:
_LOGGER.info("Device %s is offline. Queuing compile-only job.", configuration)
build_source = self._resolve_install_source(force_local=True)
job = self._create_job(configuration, JobType.COMPILE, build_source=build_source)
return await self._enqueue(job)

build_source = self._resolve_install_source(force_local=force_local)
# Install is a compile + a dependent local upload. The compile (local
# or dispatched to a receiver) materialises the binary locally; the
Expand Down Expand Up @@ -388,6 +450,18 @@ async def _run_queue(self) -> None:
async def _execute_job(self, job: FirmwareJob, lane: Lane) -> None:
await runner.execute_job(self, job, lane)

is_comp = job.job_type == JobType.COMPILE
is_done = job.status == JobStatus.COMPLETED
if is_comp and is_done:
device = self._device_for_configuration(job.configuration)
if (
device
and device.state == DeviceState.OFFLINE
and self._db.devices
and hasattr(self._db.devices, "_state_monitor")
):
self._db.devices._state_monitor.apply_queued_update(device.name, is_queued=True)

async def _execute_remote_job(self, job: FirmwareJob) -> None:
await runner.execute_remote_job(self, job)

Expand Down
3 changes: 3 additions & 0 deletions esphome_device_builder/models/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class Device(DataClassORJSONMixin):
web_port: int | None = None
current_version: str = ""
deployed_version: str = ""
# Flag to determine if a local offline compilation has finished
# successfully and is waiting to be flashed via OTA upon the next mDNS check-in.
queued_update: bool = False
# 8-char hex hash of the YAML as last successfully compiled.
# Persisted in the metadata sidecar; matches what ESPHome's
# runtime publishes via ``App.get_config_hash()``.
Expand Down
Empty file.
73 changes: 73 additions & 0 deletions tests/controllers/_device_state_monitor/test_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
"""Tests for the DeviceStateMonitor controller."""

from unittest.mock import MagicMock

import pytest

from esphome_device_builder.controllers._device_state_monitor.controller import DeviceStateMonitor


@pytest.fixture
def monitor():
"""Fixture to provide a mock DeviceStateMonitor."""
# Create the callback mock
mock_on_queued = MagicMock()

return DeviceStateMonitor(
get_devices=MagicMock(return_value=[]),
on_state_change=MagicMock(),
on_ip_change=MagicMock(),
on_queued_update_change=mock_on_queued, # Pass the callback here
)


def test_apply_queued_update(monitor):
"""Test that apply_queued_update triggers the callback correctly."""
device_name = "test_device"

# We must mock _any_matching_device_differs because it checks device state
# to decide if the change is "real" or redundant.
monitor._any_matching_device_differs = MagicMock(return_value=True)

# 1. Test setting to True
result = monitor.apply_queued_update(device_name, is_queued=True)

# Verify the method returned True (indicating a change occurred)
assert result is True
# Verify the callback was fired
monitor._on_queued_update_change.assert_called_with(device_name, True)

# 2. Test setting to False
result = monitor.apply_queued_update(device_name, is_queued=False)

assert result is True
monitor._on_queued_update_change.assert_called_with(device_name, False)


def test_apply_queued_update_missing_callback():
"""Test early return if no callback is registered."""
monitor = DeviceStateMonitor(
get_devices=MagicMock(),
on_state_change=MagicMock(),
on_ip_change=MagicMock(),
on_queued_update_change=None, # Callback omitted
)
assert monitor.apply_queued_update("kitchen", is_queued=True) is False


def test_apply_queued_update_no_diff(monitor):
"""Test early return if the device state already matches."""
monitor._any_matching_device_differs = MagicMock(return_value=False)
monitor._on_queued_update_change = MagicMock()

assert monitor.apply_queued_update("kitchen", is_queued=True) is False
monitor._on_queued_update_change.assert_not_called()


def test_apply_queued_update_triggers_callback(monitor):
"""Test standard execution when the state differs."""
monitor._any_matching_device_differs = MagicMock(return_value=True)
monitor._on_queued_update_change = MagicMock()

assert monitor.apply_queued_update("kitchen", is_queued=True) is True
monitor._on_queued_update_change.assert_called_once_with("kitchen", True)
29 changes: 29 additions & 0 deletions tests/controllers/devices/test_update_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import asyncio
from pathlib import Path
from unittest.mock import MagicMock

from esphome_device_builder.controllers.config import (
get_device_metadata,
Expand Down Expand Up @@ -192,3 +193,31 @@ async def test_update_device_persists_via_executor(
# The atomic-replace landed a real JSON file on disk.
sidecar = tmp_path / ".device-builder.json"
assert sidecar.exists()


def test_on_queued_update_change_updates_and_persists(make_controller, tmp_path):
"""Test that changing the queued flag updates memory, disk, and the frontend."""
# 1. Create the controller using the factory fixture,
# passing the tmp_path as the config_dir
devices_controller = make_controller(config_dir=tmp_path)

# 2. Setup a mock device
mock_device = MagicMock()
mock_device.name = "kitchen"
mock_device.configuration = "kitchen.yaml"
mock_device.queued_update = False

# 3. Wire the controller dependencies
devices_controller.get_devices = MagicMock(return_value=[mock_device])
devices_controller._metadata_store = MagicMock()
devices_controller._fire_device_updated = MagicMock()

# 4. Execute
devices_controller._on_queued_update_change("kitchen", is_queued=True)

# 5. Assert
assert mock_device.queued_update is True
devices_controller._metadata_store.update.assert_called_once_with(
"kitchen.yaml", queued_update=True
)
devices_controller._fire_device_updated.assert_called_once_with(mock_device)
Loading
Loading