From eefa1ec279ca3fd6acb2f5a1c303546bd433516b Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:10:18 -0400 Subject: [PATCH 01/10] chore(deps): add watchfiles for file watcher Promote watchfiles from a transitive dependency (via livekit-agents) to an explicit one. Used by the file watcher landing in src/openrtc/execution/file_watcher.py for MAH-80. Pin >=0.21,<2 to accept the originally-targeted 0.21 line and the current 1.x (1.1.1 ships with livekit-agents 1.5). --- pyproject.toml | 1 + uv.lock | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 565ddec..c6d1228 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ license = "MIT" requires-python = ">=3.11,<3.14" dependencies = [ "livekit-agents[openai,silero,turn-detector]~=1.5", + "watchfiles>=0.21,<2", ] [project.optional-dependencies] diff --git a/uv.lock b/uv.lock index 71dc629..09f7ac8 100644 --- a/uv.lock +++ b/uv.lock @@ -1347,6 +1347,7 @@ name = "openrtc" source = { editable = "." } dependencies = [ { name = "livekit-agents", extra = ["openai", "silero", "turn-detector"] }, + { name = "watchfiles" }, ] [package.optional-dependencies] @@ -1383,6 +1384,7 @@ requires-dist = [ { name = "textual", marker = "extra == 'tui'", specifier = ">=0.47,<2" }, { name = "typer", marker = "extra == 'cli'", specifier = ">=0.12" }, { name = "typer", marker = "extra == 'tui'", specifier = ">=0.12" }, + { name = "watchfiles", specifier = ">=0.21,<2" }, ] provides-extras = ["cli", "tui"] From 7867359c0e2230d4490214226c50add29be76aa7 Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:11:51 -0400 Subject: [PATCH 02/10] feat(execution): add FileChange dataclass for file watcher events Introduce src/openrtc/execution/file_watcher.py with the FileChange frozen dataclass (path + change_type). Frozen=True gives equality and hash for free, so FileChange instances can be deduplicated through a set during the debounce coalesce step (MAH-80 Step 7). This is the public event-payload contract for the watcher; subsequent commits add the discovery helper and the FileWatcher class itself. --- src/openrtc/execution/file_watcher.py | 34 ++++++++++++++++++++ tests/execution/__init__.py | 0 tests/execution/test_file_watcher.py | 46 +++++++++++++++++++++++++++ 3 files changed, 80 insertions(+) create mode 100644 src/openrtc/execution/file_watcher.py create mode 100644 tests/execution/__init__.py create mode 100644 tests/execution/test_file_watcher.py diff --git a/src/openrtc/execution/file_watcher.py b/src/openrtc/execution/file_watcher.py new file mode 100644 index 0000000..713fee4 --- /dev/null +++ b/src/openrtc/execution/file_watcher.py @@ -0,0 +1,34 @@ +"""File watcher infrastructure for user agent code. + +The watcher monitors user-edited Python modules and emits debounced +change events. Reload, re-import, and session re-binding are out of +scope here — see MAH-81 onward. This module provides the foundation: +discovery, event shape, and a callback API. + +Public API (locked at design.md §3.5): + +- :class:`FileChange` — frozen dataclass describing a single change +- :class:`FileWatcher` — async watcher with ``start()``/``stop()``/``refresh_paths()`` +""" + +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Literal + +ChangeType = Literal["created", "modified", "deleted"] + + +@dataclass(frozen=True) +class FileChange: + """A single filesystem change event. + + Frozen so instances are hashable and can be deduplicated in sets. + Paths are absolute. ``change_type`` is one of ``"created"``, + ``"modified"``, or ``"deleted"`` (mapped from watchfiles' ``Change`` + enum at the watcher boundary). + """ + + path: Path + change_type: ChangeType diff --git a/tests/execution/__init__.py b/tests/execution/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/execution/test_file_watcher.py b/tests/execution/test_file_watcher.py new file mode 100644 index 0000000..536cd42 --- /dev/null +++ b/tests/execution/test_file_watcher.py @@ -0,0 +1,46 @@ +"""Unit tests for the file watcher (MAH-80).""" + +from __future__ import annotations + +import dataclasses +from pathlib import Path + +import pytest + +from openrtc.execution.file_watcher import FileChange + + +class TestFileChange: + """The :class:`FileChange` dataclass is the event-payload contract.""" + + def test_construction(self) -> None: + change = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + assert change.path == Path("/tmp/agent.py") + assert change.change_type == "modified" + + def test_equality_same_values(self) -> None: + a = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + b = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + assert a == b + + def test_equality_different_paths(self) -> None: + a = FileChange(path=Path("/tmp/a.py"), change_type="modified") + b = FileChange(path=Path("/tmp/b.py"), change_type="modified") + assert a != b + + def test_equality_different_change_types(self) -> None: + a = FileChange(path=Path("/tmp/agent.py"), change_type="created") + b = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + assert a != b + + def test_hashable_for_set_deduplication(self) -> None: + a = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + b = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + c = FileChange(path=Path("/tmp/agent.py"), change_type="created") + # Equal instances collapse in a set; unequal ones do not. + assert {a, b, c} == {a, c} + + def test_frozen_rejects_mutation(self) -> None: + change = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + with pytest.raises(dataclasses.FrozenInstanceError): + change.path = Path("/tmp/other.py") # type: ignore[misc] From 0cf78d377679faafba220ca5e249c368ae4f22c9 Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:13:26 -0400 Subject: [PATCH 03/10] feat(execution): user-module discovery for file watcher Add the private _discover_user_modules() helper. Walks sys.modules, keeps modules with a real __file__, drops anything under site.getsitepackages() / site.getusersitepackages() / sys.prefix / sys.base_prefix, returns distinct absolute paths. Tests cover: synthetic user module under tempdir is included; synthetic site-packages module is excluded; modules with __file__ = None or missing don't raise; aliased modules pointing at the same file collapse to one entry. --- src/openrtc/execution/file_watcher.py | 67 ++++++++++++++++++ tests/execution/test_file_watcher.py | 99 ++++++++++++++++++++++++++- 2 files changed, 165 insertions(+), 1 deletion(-) diff --git a/src/openrtc/execution/file_watcher.py b/src/openrtc/execution/file_watcher.py index 713fee4..7979aef 100644 --- a/src/openrtc/execution/file_watcher.py +++ b/src/openrtc/execution/file_watcher.py @@ -13,6 +13,8 @@ from __future__ import annotations +import site +import sys from dataclasses import dataclass from pathlib import Path from typing import Literal @@ -32,3 +34,68 @@ class FileChange: path: Path change_type: ChangeType + + +def _interpreter_excluded_roots() -> list[Path]: + """Return absolute directory roots whose contents are NOT user code. + + Modules whose ``__file__`` lives under any of these roots are + interpreter, standard library, or third-party package code — not + something a user would edit during a hot-reload session. + """ + roots: list[Path] = [Path(path).resolve() for path in site.getsitepackages()] + user_site = site.getusersitepackages() + if user_site: + roots.append(Path(user_site).resolve()) + roots.append(Path(sys.prefix).resolve()) + roots.append(Path(sys.base_prefix).resolve()) + # Deduplicate while preserving order. + seen: set[Path] = set() + unique: list[Path] = [] + for root in roots: + if root not in seen: + seen.add(root) + unique.append(root) + return unique + + +def _is_under(path: Path, roots: list[Path]) -> bool: + """Return True if *path* is at or below any of *roots*.""" + return any(path.is_relative_to(root) for root in roots) + + +def _discover_user_modules() -> list[Path]: + """Snapshot ``sys.modules`` and return user-editable Python file paths. + + A module is "user-editable" when: + + 1. It exposes a real ``__file__`` attribute (excludes built-ins, + namespace packages, and some C extensions). + 2. The file is NOT under any interpreter or site-packages root + returned by :func:`_interpreter_excluded_roots`. + + Returns absolute, deduplicated paths in module-iteration order. + Modules without a ``__file__`` are skipped silently — that is the + documented "graceful" behavior for built-ins. + """ + excluded = _interpreter_excluded_roots() + seen: set[Path] = set() + discovered: list[Path] = [] + # Snapshot to a list to tolerate sys.modules mutation during iteration. + for module in list(sys.modules.values()): + file_attr = getattr(module, "__file__", None) + if not file_attr: + continue + try: + resolved = Path(file_attr).resolve() + except (OSError, RuntimeError): + # Resolving can fail on broken symlinks or weird platforms; + # skip those rather than break discovery. + continue + if _is_under(resolved, excluded): + continue + if resolved in seen: + continue + seen.add(resolved) + discovered.append(resolved) + return discovered diff --git a/tests/execution/test_file_watcher.py b/tests/execution/test_file_watcher.py index 536cd42..8caaee2 100644 --- a/tests/execution/test_file_watcher.py +++ b/tests/execution/test_file_watcher.py @@ -3,11 +3,18 @@ from __future__ import annotations import dataclasses +import importlib.util +import sys +import types from pathlib import Path import pytest -from openrtc.execution.file_watcher import FileChange +from openrtc.execution.file_watcher import ( + FileChange, + _discover_user_modules, + _interpreter_excluded_roots, +) class TestFileChange: @@ -44,3 +51,93 @@ def test_frozen_rejects_mutation(self) -> None: change = FileChange(path=Path("/tmp/agent.py"), change_type="modified") with pytest.raises(dataclasses.FrozenInstanceError): change.path = Path("/tmp/other.py") # type: ignore[misc] + + +def _install_synthetic_module( + name: str, file_path: Path, monkeypatch: pytest.MonkeyPatch +) -> types.ModuleType: + """Register a real-but-empty module pointing at *file_path*. + + The file must exist on disk, otherwise ``Path.resolve()`` may fail + on some platforms. + """ + file_path.parent.mkdir(parents=True, exist_ok=True) + file_path.write_text("# synthetic test module\n") + spec = importlib.util.spec_from_loader(name, loader=None, origin=str(file_path)) + assert spec is not None + module = importlib.util.module_from_spec(spec) + module.__file__ = str(file_path) + monkeypatch.setitem(sys.modules, name, module) + return module + + +class TestDiscoverUserModules: + """``_discover_user_modules`` is the watcher's source of truth for paths.""" + + def test_includes_user_module_under_tempdir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + user_file = tmp_path / "user_pkg" / "agent.py" + _install_synthetic_module("openrtc_test_user_pkg.agent", user_file, monkeypatch) + discovered = _discover_user_modules() + assert user_file.resolve() in discovered + + def test_excludes_module_under_site_packages( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + fake_site = tmp_path / "site-packages" + site_file = fake_site / "third_party" / "lib.py" + _install_synthetic_module( + "openrtc_test_third_party.lib", site_file, monkeypatch + ) + # Make _interpreter_excluded_roots() include our fake site dir. + original_roots = _interpreter_excluded_roots() + monkeypatch.setattr( + "openrtc.execution.file_watcher._interpreter_excluded_roots", + lambda: [*original_roots, fake_site.resolve()], + ) + discovered = _discover_user_modules() + assert site_file.resolve() not in discovered + + def test_handles_modules_without_file_attribute( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Real built-ins like ``sys`` already lack __file__; verify that + # discovery does not blow up on them (they are present in + # sys.modules during every run). + assert getattr(sys, "__file__", None) is None + discovered = _discover_user_modules() + # Build a synthetic module with __file__ = None to be extra safe. + fake = types.ModuleType("openrtc_test_no_file_module") + fake.__file__ = None # type: ignore[assignment] + monkeypatch.setitem(sys.modules, "openrtc_test_no_file_module", fake) + # Should not raise even with __file__ = None. + discovered_again = _discover_user_modules() + assert isinstance(discovered_again, list) + # The result is at least as informative as the prior call (modules + # without __file__ contribute nothing, never an exception). + assert len(discovered_again) >= 0 + del ( + discovered + ) # silence unused-warning; the first call's purpose was to prove it runs + + def test_returns_distinct_absolute_paths( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + shared_file = tmp_path / "shared.py" + _install_synthetic_module("openrtc_test_alias_a", shared_file, monkeypatch) + # Register the same file under a second module name; discovery + # should deduplicate by resolved path. + spec = importlib.util.spec_from_loader( + "openrtc_test_alias_b", loader=None, origin=str(shared_file) + ) + assert spec is not None + module_b = importlib.util.module_from_spec(spec) + module_b.__file__ = str(shared_file) + monkeypatch.setitem(sys.modules, "openrtc_test_alias_b", module_b) + + discovered = _discover_user_modules() + absolute = [p.is_absolute() for p in discovered] + assert all(absolute) + # The shared file appears at most once. + assert discovered.count(shared_file.resolve()) == 1 From 5f101904404d0a128eb81f16bc53f39389705d96 Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:15:03 -0400 Subject: [PATCH 04/10] feat(execution): FileWatcher class skeleton with lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the FileWatcher class with the locked public API (design.md §3.5): construction with on_change + debounce_ms + paths, async start()/stop(), sync refresh_paths(), and the new -> running -> stopped state machine. The skeleton lands without watchfiles wiring or debounce logic — those are MAH-80 Steps 6 and 7. start() and stop() are idempotent; start() after stop() raises (no restart). refresh_paths() rebuilds the path list only for auto-discover watchers; it is a no-op when explicit paths were supplied. Eleven new tests cover construction, debounce-validation, path-copy isolation, lifecycle idempotency, the no-restart guard, and refresh behavior in both auto and explicit modes. --- src/openrtc/execution/file_watcher.py | 81 ++++++++++++++++++ tests/execution/test_file_watcher.py | 114 ++++++++++++++++++++++++++ 2 files changed, 195 insertions(+) diff --git a/src/openrtc/execution/file_watcher.py b/src/openrtc/execution/file_watcher.py index 7979aef..7fb22c4 100644 --- a/src/openrtc/execution/file_watcher.py +++ b/src/openrtc/execution/file_watcher.py @@ -15,11 +15,13 @@ import site import sys +from collections.abc import Awaitable, Callable from dataclasses import dataclass from pathlib import Path from typing import Literal ChangeType = Literal["created", "modified", "deleted"] +WatcherState = Literal["new", "running", "stopped"] @dataclass(frozen=True) @@ -99,3 +101,82 @@ def _discover_user_modules() -> list[Path]: seen.add(resolved) discovered.append(resolved) return discovered + + +class FileWatcher: + """Watch user-edited Python modules and emit debounced change events. + + Public API is locked at design.md §3.5. The watcher is async-native: + ``start()`` schedules a background watch task, ``stop()`` cancels + it gracefully, and ``refresh_paths()`` rebuilds the auto-discovered + path set without restarting. + + Lifecycle: a watcher transitions ``new → running → stopped``. A + stopped watcher cannot be restarted — construct a new one. + """ + + def __init__( + self, + on_change: Callable[[list[FileChange]], Awaitable[None]], + *, + debounce_ms: int = 200, + paths: list[Path] | None = None, + ) -> None: + if debounce_ms <= 0: + raise ValueError( + f"debounce_ms must be > 0, got {debounce_ms}.", + ) + self._on_change = on_change + self._debounce_ms = debounce_ms + # ``paths is None`` → auto-discover, and refresh_paths() will + # re-run discovery. Explicit paths short-circuit discovery. + self._auto_discover = paths is None + self._paths: list[Path] = ( + list(paths) if paths is not None else _discover_user_modules() + ) + self._state: WatcherState = "new" + + @property + def paths(self) -> list[Path]: + """Return the current snapshot of watched paths.""" + return list(self._paths) + + @property + def state(self) -> WatcherState: + """Return the current lifecycle state.""" + return self._state + + def refresh_paths(self) -> None: + """Re-discover user modules when constructed with ``paths=None``. + + No-op when explicit paths were supplied at construction (the + caller manages that list). Synchronous because rebuilding the + path set is a fast in-process snapshot; the live watcher loop + picks up the change on its next event boundary. + """ + if not self._auto_discover: + return + self._paths = _discover_user_modules() + + async def start(self) -> None: + """Begin watching. Idempotent: a second call while running is a no-op. + + Raises ``RuntimeError`` if called after :meth:`stop` — construct + a new watcher instead. + """ + if self._state == "running": + return + if self._state == "stopped": + raise RuntimeError( + "FileWatcher cannot be restarted after stop(); construct a new watcher.", + ) + self._state = "running" + + async def stop(self) -> None: + """Stop watching. Idempotent: safe to call multiple times. + + Calling ``stop()`` on a fresh (never-started) watcher transitions + it directly to ``stopped`` so the no-restart invariant still + holds. + """ + self._state = "stopped" diff --git a/tests/execution/test_file_watcher.py b/tests/execution/test_file_watcher.py index 8caaee2..1c675c8 100644 --- a/tests/execution/test_file_watcher.py +++ b/tests/execution/test_file_watcher.py @@ -12,11 +12,16 @@ from openrtc.execution.file_watcher import ( FileChange, + FileWatcher, _discover_user_modules, _interpreter_excluded_roots, ) +async def _noop_callback(_changes: list[FileChange]) -> None: + """Default callback for lifecycle tests that don't care about events.""" + + class TestFileChange: """The :class:`FileChange` dataclass is the event-payload contract.""" @@ -141,3 +146,112 @@ def test_returns_distinct_absolute_paths( assert all(absolute) # The shared file appears at most once. assert discovered.count(shared_file.resolve()) == 1 + + +class TestFileWatcherLifecycle: + """The :class:`FileWatcher` skeleton — construction, state machine, restart guard. + + These tests exercise the lifecycle contract; debounce + watchfiles + wiring land in later steps. + """ + + def test_construction_with_explicit_paths(self, tmp_path: Path) -> None: + explicit = [tmp_path / "agent.py"] + watcher = FileWatcher(_noop_callback, paths=explicit) + assert watcher.paths == explicit + assert watcher.state == "new" + + def test_construction_with_none_triggers_discovery(self) -> None: + watcher = FileWatcher(_noop_callback, paths=None) + # Discovery returns whatever user-edited modules are loaded; + # the guarantee is that some snapshot was captured (even an + # empty list is fine — the contract is "discovery ran"). + assert isinstance(watcher.paths, list) + # Modules outside site-packages should include the test file + # itself; verify by checking absoluteness. + assert all(p.is_absolute() for p in watcher.paths) + + def test_default_debounce_is_200ms(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + # The default is part of the public API contract (design.md §3.5). + assert watcher._debounce_ms == 200 # noqa: SLF001 — testing the public default + + def test_rejects_non_positive_debounce(self, tmp_path: Path) -> None: + with pytest.raises(ValueError, match="debounce_ms"): + FileWatcher(_noop_callback, debounce_ms=0, paths=[tmp_path / "x.py"]) + with pytest.raises(ValueError, match="debounce_ms"): + FileWatcher(_noop_callback, debounce_ms=-1, paths=[tmp_path / "x.py"]) + + def test_paths_is_a_copy(self, tmp_path: Path) -> None: + explicit = [tmp_path / "agent.py"] + watcher = FileWatcher(_noop_callback, paths=explicit) + # Mutating the original must not mutate the watcher's view. + explicit.append(tmp_path / "extra.py") + assert watcher.paths == [tmp_path / "agent.py"] + + +@pytest.mark.asyncio +class TestFileWatcherAsyncLifecycle: + """``start`` / ``stop`` are async; assert idempotency and the no-restart rule.""" + + async def test_start_is_idempotent(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + await watcher.start() + assert watcher.state == "running" + # Second start: no-op, no error. + await watcher.start() + assert watcher.state == "running" + await watcher.stop() + + async def test_stop_is_idempotent(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + await watcher.start() + await watcher.stop() + assert watcher.state == "stopped" + # Second stop: no-op, no error. + await watcher.stop() + assert watcher.state == "stopped" + + async def test_stop_on_fresh_watcher(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + # Never started → still safe to stop. Guarantees the no-restart + # invariant: a fresh+stopped watcher cannot be revived. + await watcher.stop() + assert watcher.state == "stopped" + with pytest.raises(RuntimeError, match="cannot be restarted"): + await watcher.start() + + async def test_start_after_stop_raises(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + await watcher.start() + await watcher.stop() + with pytest.raises(RuntimeError, match="cannot be restarted"): + await watcher.start() + + +class TestFileWatcherRefreshPaths: + """``refresh_paths`` only re-runs discovery for auto-discover watchers.""" + + def test_refresh_with_auto_discover(self, monkeypatch: pytest.MonkeyPatch) -> None: + watcher = FileWatcher(_noop_callback, paths=None) + sentinel = [Path("/tmp/refresh_marker.py")] + monkeypatch.setattr( + "openrtc.execution.file_watcher._discover_user_modules", + lambda: list(sentinel), + ) + watcher.refresh_paths() + assert watcher.paths == sentinel + + def test_refresh_with_explicit_paths_is_noop( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + explicit = [tmp_path / "agent.py"] + watcher = FileWatcher(_noop_callback, paths=explicit) + # Even if discovery would return something, refresh must not + # touch an explicitly-managed path list. + monkeypatch.setattr( + "openrtc.execution.file_watcher._discover_user_modules", + lambda: [Path("/tmp/should_not_appear.py")], + ) + watcher.refresh_paths() + assert watcher.paths == explicit From 1af385b04a44b219ea126f505b808b725871c70f Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:17:31 -0400 Subject: [PATCH 05/10] feat(execution): FileWatcher wires watchfiles for filesystem events start() now spawns a background asyncio task that consumes watchfiles.awatch(*paths, stop_event=...) and translates each (Change, str) into a FileChange. The watchfiles Change enum is mapped explicitly: added -> created, modified -> modified, deleted -> deleted; unrecognized variants are dropped. Events land in self._pending. The trailing-edge debounce that flushes the buffer through on_change ships in the next commit (Step 7). stop() sets the stop event and cancels the watch task, awaiting it under contextlib.suppress(CancelledError) so shutdown is clean. Three new tests cover the live wiring (write -> buffered FileChange), clean cancellation, and the empty-path-list edge case. --- src/openrtc/execution/file_watcher.py | 67 +++++++++++++++++++++++++++ tests/execution/test_file_watcher.py | 62 +++++++++++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/src/openrtc/execution/file_watcher.py b/src/openrtc/execution/file_watcher.py index 7fb22c4..e6fdb7e 100644 --- a/src/openrtc/execution/file_watcher.py +++ b/src/openrtc/execution/file_watcher.py @@ -13,6 +13,9 @@ from __future__ import annotations +import asyncio +import contextlib +import logging import site import sys from collections.abc import Awaitable, Callable @@ -20,9 +23,19 @@ from pathlib import Path from typing import Literal +import watchfiles + ChangeType = Literal["created", "modified", "deleted"] WatcherState = Literal["new", "running", "stopped"] +_log = logging.getLogger(__name__) + +_WATCHFILES_CHANGE_MAP: dict[watchfiles.Change, ChangeType] = { + watchfiles.Change.added: "created", + watchfiles.Change.modified: "modified", + watchfiles.Change.deleted: "deleted", +} + @dataclass(frozen=True) class FileChange: @@ -135,6 +148,11 @@ def __init__( list(paths) if paths is not None else _discover_user_modules() ) self._state: WatcherState = "new" + # Filled in on start(). _pending collects changes between + # debounce flushes; the trailing-edge debounce lands in Step 7. + self._pending: list[FileChange] = [] + self._stop_event: asyncio.Event | None = None + self._watch_task: asyncio.Task[None] | None = None @property def paths(self) -> list[Path]: @@ -171,6 +189,11 @@ async def start(self) -> None: "FileWatcher cannot be restarted after stop(); construct a new watcher.", ) self._state = "running" + self._stop_event = asyncio.Event() + self._watch_task = asyncio.create_task( + self._run_watch_loop(), + name=f"openrtc.file_watcher[{id(self):#x}]", + ) async def stop(self) -> None: """Stop watching. Idempotent: safe to call multiple times. @@ -179,4 +202,48 @@ async def stop(self) -> None: it directly to ``stopped`` so the no-restart invariant still holds. """ + if self._state == "stopped": + return self._state = "stopped" + if self._stop_event is not None: + self._stop_event.set() + if self._watch_task is not None: + self._watch_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._watch_task + self._watch_task = None + self._stop_event = None + + async def _run_watch_loop(self) -> None: + """Background task: consume ``watchfiles.awatch`` and buffer events. + + Step 6 lands the buffer; Step 7 swaps appends for the trailing + debounce flush. Until then, every change just lands in + ``self._pending`` so tests can verify the wiring. + """ + if not self._paths: + # No paths to watch — block until stop(). + assert self._stop_event is not None + await self._stop_event.wait() + return + assert self._stop_event is not None + try: + async for changes in watchfiles.awatch( + *self._paths, + stop_event=self._stop_event, + ): + for change_kind, raw_path in changes: + change_type = _WATCHFILES_CHANGE_MAP.get(change_kind) + if change_type is None: + # watchfiles may add new variants; ignore unknowns. + continue + self._pending.append( + FileChange( + path=Path(raw_path), + change_type=change_type, + ) + ) + except asyncio.CancelledError: + raise + except Exception: # noqa: BLE001 — logged and swallowed + _log.exception("FileWatcher loop crashed; events will stop firing") diff --git a/tests/execution/test_file_watcher.py b/tests/execution/test_file_watcher.py index 1c675c8..ce33fd2 100644 --- a/tests/execution/test_file_watcher.py +++ b/tests/execution/test_file_watcher.py @@ -2,10 +2,12 @@ from __future__ import annotations +import asyncio import dataclasses import importlib.util import sys import types +from collections.abc import Callable from pathlib import Path import pytest @@ -255,3 +257,63 @@ def test_refresh_with_explicit_paths_is_noop( ) watcher.refresh_paths() assert watcher.paths == explicit + + +async def _wait_until( + predicate: Callable[[], bool], + *, + timeout_s: float = 2.0, + poll_s: float = 0.02, +) -> bool: + """Poll until *predicate* is True or *timeout_s* elapses.""" + deadline = asyncio.get_running_loop().time() + timeout_s + while asyncio.get_running_loop().time() < deadline: + if predicate(): + return True + await asyncio.sleep(poll_s) + return predicate() + + +@pytest.mark.asyncio +class TestFileWatcherEventWiring: + """Step 6 — events from watchfiles land in the watcher buffer.""" + + async def test_writes_produce_buffered_filechange(self, tmp_path: Path) -> None: + target = tmp_path / "agent.py" + target.write_text("# initial\n") + watcher = FileWatcher(_noop_callback, paths=[target]) + await watcher.start() + try: + # watchfiles takes a few ms to install the OS-level watch on + # macOS; give it a small head start before mutating the file. + await asyncio.sleep(0.1) + target.write_text("# modified\n") + arrived = await _wait_until( + lambda: any( + fc.path.resolve() == target.resolve() + for fc in watcher._pending # noqa: SLF001 — buffer is internal pre-debounce + ), + timeout_s=3.0, + ) + assert arrived, f"No FileChange for target; pending={watcher._pending!r}" # noqa: SLF001 + finally: + await watcher.stop() + + async def test_stop_cancels_watch_task_cleanly(self, tmp_path: Path) -> None: + target = tmp_path / "agent.py" + target.write_text("# initial\n") + watcher = FileWatcher(_noop_callback, paths=[target]) + await watcher.start() + # Give awatch a moment to install. + await asyncio.sleep(0.05) + await watcher.stop() + assert watcher.state == "stopped" + # Internal task field is cleared after a clean shutdown. + assert watcher._watch_task is None # noqa: SLF001 + + async def test_empty_path_list_starts_and_stops(self) -> None: + watcher = FileWatcher(_noop_callback, paths=[]) + await watcher.start() + assert watcher.state == "running" + await watcher.stop() + assert watcher.state == "stopped" From a9c37e4645757dac2a99a040127fb8d8979cda6a Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:20:32 -0400 Subject: [PATCH 06/10] feat(execution): trailing-edge debounce and callback dispatch in FileWatcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the trailing-edge debounce on top of the watchfiles event loop. The pipeline is now: watch loop -> _handle_change_batch (extends _pending, reschedules _flush_task) -> _flush_after sleeps debounce_ms -> _collapse_changes -> on_change. _collapse_changes follows design.md §3.4: deleted dominates, created beats modified, modified is the fall-through. First-seen path order is preserved. User callback exceptions are caught and logged; the watcher keeps running. stop() cancels both the watch task and any in-flight flush task, drops _pending, and shuts down without leaking tasks or firing the cancelled flush. Ten new tests cover the design-doc collapse rules and the four AC-3 scenarios: 1 save -> 1 callback at ~200ms, 5 rapid saves -> 1 callback ~200ms after the last, raising callback isolated, stop during pending flush cancels cleanly with no leaked tasks. --- src/openrtc/execution/file_watcher.py | 101 +++++++++++++- tests/execution/test_file_watcher.py | 181 ++++++++++++++++++++++++++ 2 files changed, 275 insertions(+), 7 deletions(-) diff --git a/src/openrtc/execution/file_watcher.py b/src/openrtc/execution/file_watcher.py index e6fdb7e..d6e32c4 100644 --- a/src/openrtc/execution/file_watcher.py +++ b/src/openrtc/execution/file_watcher.py @@ -149,10 +149,11 @@ def __init__( ) self._state: WatcherState = "new" # Filled in on start(). _pending collects changes between - # debounce flushes; the trailing-edge debounce lands in Step 7. + # debounce flushes; _flush_task fires the trailing-edge flush. self._pending: list[FileChange] = [] self._stop_event: asyncio.Event | None = None self._watch_task: asyncio.Task[None] | None = None + self._flush_task: asyncio.Task[None] | None = None @property def paths(self) -> list[Path]: @@ -200,26 +201,33 @@ async def stop(self) -> None: Calling ``stop()`` on a fresh (never-started) watcher transitions it directly to ``stopped`` so the no-restart invariant still - holds. + holds. A pending debounce flush is cancelled cleanly without + firing the callback. """ if self._state == "stopped": return self._state = "stopped" if self._stop_event is not None: self._stop_event.set() + if self._flush_task is not None: + self._flush_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._flush_task + self._flush_task = None if self._watch_task is not None: self._watch_task.cancel() with contextlib.suppress(asyncio.CancelledError): await self._watch_task self._watch_task = None self._stop_event = None + self._pending.clear() async def _run_watch_loop(self) -> None: - """Background task: consume ``watchfiles.awatch`` and buffer events. + """Background task: consume ``watchfiles.awatch`` and feed the debounce. - Step 6 lands the buffer; Step 7 swaps appends for the trailing - debounce flush. Until then, every change just lands in - ``self._pending`` so tests can verify the wiring. + Each batch from watchfiles is converted to ``FileChange`` + instances and handed to :meth:`_handle_change_batch`, which + extends ``self._pending`` and (re)schedules the trailing flush. """ if not self._paths: # No paths to watch — block until stop(). @@ -232,18 +240,97 @@ async def _run_watch_loop(self) -> None: *self._paths, stop_event=self._stop_event, ): + batch: list[FileChange] = [] for change_kind, raw_path in changes: change_type = _WATCHFILES_CHANGE_MAP.get(change_kind) if change_type is None: # watchfiles may add new variants; ignore unknowns. continue - self._pending.append( + batch.append( FileChange( path=Path(raw_path), change_type=change_type, ) ) + if batch: + self._handle_change_batch(batch) except asyncio.CancelledError: raise except Exception: # noqa: BLE001 — logged and swallowed _log.exception("FileWatcher loop crashed; events will stop firing") + + def _handle_change_batch(self, batch: list[FileChange]) -> None: + """Buffer one batch and (re)schedule the trailing debounce flush. + + Called from the watchfiles loop for each emitted batch and from + unit tests directly. The semantics: extend ``_pending``, cancel + any in-flight flush task, schedule a fresh flush + ``debounce_ms / 1000`` seconds from now. If five rapid batches + arrive, only the last reschedule survives — the prior four + flush tasks are cancelled before they fire. + """ + self._pending.extend(batch) + if self._flush_task is not None and not self._flush_task.done(): + self._flush_task.cancel() + self._flush_task = asyncio.create_task( + self._flush_after(self._debounce_ms / 1000.0), + name=f"openrtc.file_watcher.flush[{id(self):#x}]", + ) + + async def _flush_after(self, delay_s: float) -> None: + """Wait *delay_s* seconds then flush ``_pending`` through ``on_change``. + + Cancellation before the timer fires drops the in-flight flush + without firing the callback (used both by the debounce reschedule + and by ``stop()`` for clean shutdown). Exceptions raised by the + user callback are logged and swallowed so the watch loop keeps + running for subsequent events. + """ + try: + await asyncio.sleep(delay_s) + except asyncio.CancelledError: + raise + # Snapshot + clear under the same logical step. If new events + # arrive while on_change is awaiting, _handle_change_batch will + # schedule the next flush around them. + collapsed = _collapse_changes(self._pending) + self._pending.clear() + if not collapsed: + return + try: + await self._on_change(collapsed) + except asyncio.CancelledError: + raise + except Exception: # noqa: BLE001 — user callback isolation + _log.exception( + "FileWatcher.on_change raised; continuing to watch", + ) + + +def _collapse_changes(changes: list[FileChange]) -> list[FileChange]: + """Coalesce multiple events for the same path into one ``FileChange``. + + Per design.md §3.4, the salient state wins: + + - any ``deleted`` in the path's window → emit ``deleted`` (the file + is gone now, regardless of intermediate states) + - else any ``created`` in the window → emit ``created`` (the file is + new; downstream consumers must register it for the first time) + - otherwise → emit ``modified`` + + Output preserves the first-seen order of paths in *changes*. + """ + by_path: dict[Path, list[ChangeType]] = {} + for change in changes: + by_path.setdefault(change.path, []).append(change.change_type) + collapsed: list[FileChange] = [] + for path, types in by_path.items(): + chosen: ChangeType + if "deleted" in types: + chosen = "deleted" + elif "created" in types: + chosen = "created" + else: + chosen = "modified" + collapsed.append(FileChange(path=path, change_type=chosen)) + return collapsed diff --git a/tests/execution/test_file_watcher.py b/tests/execution/test_file_watcher.py index ce33fd2..c3e147c 100644 --- a/tests/execution/test_file_watcher.py +++ b/tests/execution/test_file_watcher.py @@ -5,7 +5,9 @@ import asyncio import dataclasses import importlib.util +import logging import sys +import time import types from collections.abc import Callable from pathlib import Path @@ -15,6 +17,7 @@ from openrtc.execution.file_watcher import ( FileChange, FileWatcher, + _collapse_changes, _discover_user_modules, _interpreter_excluded_roots, ) @@ -317,3 +320,181 @@ async def test_empty_path_list_starts_and_stops(self) -> None: assert watcher.state == "running" await watcher.stop() assert watcher.state == "stopped" + + +class TestCollapseChanges: + """``_collapse_changes`` enforces the design.md §3.4 collapse rules.""" + + def test_single_modified_passthrough(self) -> None: + result = _collapse_changes( + [FileChange(path=Path("/tmp/a.py"), change_type="modified")] + ) + assert result == [FileChange(path=Path("/tmp/a.py"), change_type="modified")] + + def test_created_plus_modified_collapses_to_created(self) -> None: + result = _collapse_changes( + [ + FileChange(path=Path("/tmp/a.py"), change_type="created"), + FileChange(path=Path("/tmp/a.py"), change_type="modified"), + ] + ) + assert result == [FileChange(path=Path("/tmp/a.py"), change_type="created")] + + def test_modified_plus_deleted_collapses_to_deleted(self) -> None: + result = _collapse_changes( + [ + FileChange(path=Path("/tmp/a.py"), change_type="modified"), + FileChange(path=Path("/tmp/a.py"), change_type="deleted"), + ] + ) + assert result == [FileChange(path=Path("/tmp/a.py"), change_type="deleted")] + + def test_deleted_dominates_created(self) -> None: + # Created and deleted in the same window: the file is gone, so + # report deleted (final-state-wins for the deletion case). + result = _collapse_changes( + [ + FileChange(path=Path("/tmp/a.py"), change_type="created"), + FileChange(path=Path("/tmp/a.py"), change_type="deleted"), + ] + ) + assert result == [FileChange(path=Path("/tmp/a.py"), change_type="deleted")] + + def test_multiple_paths_preserved_in_first_seen_order(self) -> None: + result = _collapse_changes( + [ + FileChange(path=Path("/tmp/b.py"), change_type="modified"), + FileChange(path=Path("/tmp/a.py"), change_type="modified"), + FileChange(path=Path("/tmp/b.py"), change_type="modified"), + ] + ) + assert result == [ + FileChange(path=Path("/tmp/b.py"), change_type="modified"), + FileChange(path=Path("/tmp/a.py"), change_type="modified"), + ] + + def test_empty_input_returns_empty(self) -> None: + assert _collapse_changes([]) == [] + + +@pytest.mark.asyncio +class TestDebounceAndDispatch: + """Step 7 — trailing-edge debounce coalesces rapid events into one callback.""" + + async def test_one_save_one_callback_after_200ms(self, tmp_path: Path) -> None: + events: list[tuple[float, list[FileChange]]] = [] + + async def on_change(changes: list[FileChange]) -> None: + events.append((time.monotonic(), list(changes))) + + watcher = FileWatcher(on_change, debounce_ms=200, paths=[tmp_path / "a.py"]) + # Don't run the watch loop; drive the seam directly. + watcher._state = "running" # noqa: SLF001 + start = time.monotonic() + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "a.py", change_type="modified")] + ) + # Wait past the debounce window. + await asyncio.sleep(0.35) + + assert len(events) == 1 + elapsed_ms = (events[0][0] - start) * 1000 + assert 150 < elapsed_ms < 350, f"flush at {elapsed_ms:.0f}ms not within 200±150" + assert events[0][1] == [ + FileChange(path=tmp_path / "a.py", change_type="modified"), + ] + await watcher.stop() + + async def test_five_rapid_saves_one_callback(self, tmp_path: Path) -> None: + events: list[tuple[float, list[FileChange]]] = [] + + async def on_change(changes: list[FileChange]) -> None: + events.append((time.monotonic(), list(changes))) + + watcher = FileWatcher(on_change, debounce_ms=200, paths=[tmp_path / "a.py"]) + watcher._state = "running" # noqa: SLF001 + start = time.monotonic() + # Five batches within 50ms, all for the same file. + for _ in range(5): + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "a.py", change_type="modified")] + ) + await asyncio.sleep(0.005) + last_event_time = time.monotonic() + + await asyncio.sleep(0.4) + + assert len(events) == 1, ( + f"expected exactly 1 callback, got {len(events)}: {events!r}" + ) + # Callback should fire ~200ms after the LAST event, not after the first. + elapsed_after_last_ms = (events[0][0] - last_event_time) * 1000 + assert 150 < elapsed_after_last_ms < 300, ( + f"flush at {elapsed_after_last_ms:.0f}ms after last event, " + "expected 200ms ±50ms" + ) + # All five events collapsed to one (same path, all modified). + assert events[0][1] == [ + FileChange(path=tmp_path / "a.py", change_type="modified"), + ] + # Sanity: total elapsed from FIRST event is at least 200ms. + elapsed_total_ms = (events[0][0] - start) * 1000 + assert elapsed_total_ms >= 200 + await watcher.stop() + + async def test_callback_exception_does_not_break_subsequent_dispatches( + self, tmp_path: Path, caplog: pytest.LogCaptureFixture + ) -> None: + call_log: list[list[FileChange]] = [] + raise_on_first = True + + async def on_change(changes: list[FileChange]) -> None: + nonlocal raise_on_first + call_log.append(list(changes)) + if raise_on_first: + raise_on_first = False + raise RuntimeError("boom") + + watcher = FileWatcher(on_change, debounce_ms=100, paths=[tmp_path / "a.py"]) + watcher._state = "running" # noqa: SLF001 + with caplog.at_level(logging.ERROR, logger="openrtc.execution.file_watcher"): + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "a.py", change_type="modified")] + ) + await asyncio.sleep(0.25) + # First flush fired and raised; the watcher must still be alive. + assert len(call_log) == 1 + assert any("on_change raised" in rec.message for rec in caplog.records) + + # Fire a second batch; the watcher should still dispatch. + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "b.py", change_type="created")] + ) + await asyncio.sleep(0.25) + assert len(call_log) == 2 + assert call_log[1] == [ + FileChange(path=tmp_path / "b.py", change_type="created") + ] + await watcher.stop() + + async def test_stop_during_pending_flush_cancels_cleanly( + self, tmp_path: Path + ) -> None: + events: list[list[FileChange]] = [] + + async def on_change(changes: list[FileChange]) -> None: + events.append(list(changes)) + + watcher = FileWatcher(on_change, debounce_ms=300, paths=[tmp_path / "a.py"]) + watcher._state = "running" # noqa: SLF001 + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "a.py", change_type="modified")] + ) + # Stop BEFORE the flush window elapses. + await asyncio.sleep(0.05) + await watcher.stop() + # Wait long enough that any leaked flush would have fired. + await asyncio.sleep(0.4) + assert events == [] + # No leaked tasks. + assert watcher._flush_task is None # noqa: SLF001 From dc532350d618d8f97cd2cb5690504fb754e4b67c Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:21:50 -0400 Subject: [PATCH 07/10] feat(execution): re-export FileWatcher and FileChange from package root MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add the locked public API surface (design.md §3.5) to the openrtc package: from openrtc import FileWatcher, FileChange works in a fresh interpreter. Both names land in __all__ alongside AgentPool, AgentConfig, etc., so star-imports and IDEs see them. Three tests guard the contract: a fresh subprocess imports both names without going through the runtime, the names appear in __all__, and the package-level re-exports are object-identity with the originals. --- src/openrtc/__init__.py | 3 ++ tests/execution/test_file_watcher.py | 43 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/src/openrtc/__init__.py b/src/openrtc/__init__.py index 0671c0f..7d2bb5f 100644 --- a/src/openrtc/__init__.py +++ b/src/openrtc/__init__.py @@ -4,6 +4,7 @@ from .core.config import AgentConfig, AgentDiscoveryConfig, agent_config from .core.pool import AgentPool +from .execution.file_watcher import FileChange, FileWatcher from .types import ProviderValue try: @@ -18,6 +19,8 @@ "AgentConfig", "AgentDiscoveryConfig", "AgentPool", + "FileChange", + "FileWatcher", "ProviderValue", "__version__", "agent_config", diff --git a/tests/execution/test_file_watcher.py b/tests/execution/test_file_watcher.py index c3e147c..067be63 100644 --- a/tests/execution/test_file_watcher.py +++ b/tests/execution/test_file_watcher.py @@ -498,3 +498,46 @@ async def on_change(changes: list[FileChange]) -> None: assert events == [] # No leaked tasks. assert watcher._flush_task is None # noqa: SLF001 + + +class TestPackageReExports: + """``from openrtc import FileWatcher, FileChange`` is part of the public API.""" + + def test_fresh_process_can_import_top_level_names(self) -> None: + import subprocess + + proc = subprocess.run( + [ + sys.executable, + "-c", + ( + "from openrtc import FileWatcher, FileChange; " + "print(FileWatcher.__name__, FileChange.__name__)" + ), + ], + capture_output=True, + text=True, + check=False, + ) + assert proc.returncode == 0, ( + f"fresh-process import failed: stderr={proc.stderr!r}" + ) + assert "FileWatcher FileChange" in proc.stdout + + def test_names_appear_in_dunder_all(self) -> None: + import openrtc + + assert "FileWatcher" in openrtc.__all__ + assert "FileChange" in openrtc.__all__ + + def test_re_exports_are_the_same_objects(self) -> None: + import openrtc + from openrtc.execution.file_watcher import ( + FileChange as InnerFileChange, + ) + from openrtc.execution.file_watcher import ( + FileWatcher as InnerFileWatcher, + ) + + assert openrtc.FileWatcher is InnerFileWatcher + assert openrtc.FileChange is InnerFileChange From 8539989c35f20754eb0fd66eeeb3d00f9be9a0fe Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:22:48 -0400 Subject: [PATCH 08/10] test(execution): smoke test FileWatcher against tempdir End-to-end smoke covering watchfiles -> _handle_change_batch -> trailing debounce -> on_change against a real file. Writes initial contents, starts the watcher, edits the file, waits up to 3s for the debounced callback, then asserts exactly one callback arrived with the expected path and a non-deleted change_type. Finally checks the clean shutdown contract: state -> stopped, _watch_task and _flush_task both cleared. --- tests/execution/test_file_watcher_smoke.py | 66 ++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 tests/execution/test_file_watcher_smoke.py diff --git a/tests/execution/test_file_watcher_smoke.py b/tests/execution/test_file_watcher_smoke.py new file mode 100644 index 0000000..fd1a341 --- /dev/null +++ b/tests/execution/test_file_watcher_smoke.py @@ -0,0 +1,66 @@ +"""End-to-end smoke test for the file watcher (MAH-80, Step 9). + +Exercises the full pipeline — ``watchfiles.awatch`` -> ``_handle_change_batch`` +-> trailing debounce flush -> ``on_change`` callback — against a real +file on disk. The unit tests in ``test_file_watcher.py`` cover each +seam in isolation; this test confirms the seams stay glued together +when wired through the live filesystem watcher. +""" + +from __future__ import annotations + +import asyncio +import time +from pathlib import Path + +import pytest + +from openrtc import FileChange, FileWatcher + + +@pytest.mark.asyncio +async def test_filewatcher_smoke_against_real_tempdir(tmp_path: Path) -> None: + target = tmp_path / "agent.py" + target.write_text("# initial contents\n") + + received: list[list[FileChange]] = [] + + async def record_changes(changes: list[FileChange]) -> None: + received.append(list(changes)) + + watcher = FileWatcher(record_changes, debounce_ms=200, paths=[target]) + await watcher.start() + try: + # Give watchfiles a moment to install the OS-level watch. + await asyncio.sleep(0.15) + + # One real edit on disk. + target.write_text("# modified contents\n") + + # Wait long enough for the watchfiles batch + 200ms debounce + dispatch. + deadline = time.monotonic() + 3.0 + while time.monotonic() < deadline and not received: + await asyncio.sleep(0.05) + + assert received, ( + f"on_change never fired within 3s; tempdir={tmp_path}, " + f"watcher.state={watcher.state}" + ) + # Exactly one callback for one logical edit. + assert len(received) == 1, ( + f"expected exactly one callback, got {len(received)}: {received!r}" + ) + # The callback receives a list of FileChange touching our file. + paths = {fc.path.resolve() for fc in received[0]} + assert target.resolve() in paths + change_types = {fc.change_type for fc in received[0]} + # Editor-style writes can land as modified or created depending on + # the platform and write strategy; both are valid ends of the contract. + assert change_types <= {"created", "modified", "deleted"} + assert "deleted" not in change_types + finally: + await watcher.stop() + assert watcher.state == "stopped" + # Clean shutdown: no leaked watch task or flush task. + assert watcher._watch_task is None # noqa: SLF001 + assert watcher._flush_task is None # noqa: SLF001 From a0172db8b56f8b9c079adf0e68d962827b363ddd Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 14:24:26 -0400 Subject: [PATCH 09/10] docs(execution): docstrings for FileWatcher public API Expand the module-level docstring with a Contract summary covering discovery, debounce, callback isolation, and stop() semantics. Each public method now documents Args / Side effects / Raises explicitly: - __init__: each parameter described, ValueError raise contract - start: side effect (task + event creation, state transition), RuntimeError on restart, idempotency note - stop: side effects (state, stop_event, task cancellation, dropped buffer), idempotency, no-restart invariant - refresh_paths: synchronous side effect on auto-discover watchers, no-op for explicit-path watchers No behavior change. README + user guide land separately in MAH-87. --- src/openrtc/execution/file_watcher.py | 98 ++++++++++++++++++++++----- 1 file changed, 81 insertions(+), 17 deletions(-) diff --git a/src/openrtc/execution/file_watcher.py b/src/openrtc/execution/file_watcher.py index d6e32c4..4dcfe6c 100644 --- a/src/openrtc/execution/file_watcher.py +++ b/src/openrtc/execution/file_watcher.py @@ -1,14 +1,34 @@ -"""File watcher infrastructure for user agent code. +"""File watcher infrastructure for user agent code (MAH-80, v0.2.1). The watcher monitors user-edited Python modules and emits debounced change events. Reload, re-import, and session re-binding are out of scope here — see MAH-81 onward. This module provides the foundation: discovery, event shape, and a callback API. +Contract summary +---------------- + +- The watcher discovers user-editable modules from ``sys.modules`` at + construction (when ``paths=None``) and ignores anything under + ``site-packages`` / ``sys.prefix``. +- Filesystem events are consumed via ``watchfiles.awatch`` and mapped + to :class:`FileChange` instances. +- Rapid events are coalesced through a trailing-edge debounce + (``debounce_ms``, default 200) before the user callback fires, so + multi-write editor saves produce a single dispatch. +- The user callback is awaited inside a try/except: exceptions are + logged at ERROR and swallowed, leaving the watcher running. +- ``stop()`` cancels the in-flight watch and flush tasks, drops the + pending buffer, and is safe to call repeatedly. + Public API (locked at design.md §3.5): - :class:`FileChange` — frozen dataclass describing a single change -- :class:`FileWatcher` — async watcher with ``start()``/``stop()``/``refresh_paths()`` +- :class:`FileWatcher` — async watcher with + ``start()`` / ``stop()`` / ``refresh_paths()`` + +Both names are re-exported from the package root, so callers can write +``from openrtc import FileWatcher, FileChange``. """ from __future__ import annotations @@ -135,6 +155,23 @@ def __init__( debounce_ms: int = 200, paths: list[Path] | None = None, ) -> None: + """Construct a watcher; does not start watching until :meth:`start`. + + Args: + on_change: Async callable invoked with the coalesced + ``list[FileChange]`` after each debounce window. + Exceptions raised by this callable are logged and + swallowed. + debounce_ms: Trailing-edge debounce window. Must be > 0. + paths: Explicit list of files or directories to watch. When + ``None`` (default), the watcher snapshots + ``sys.modules`` and excludes anything under the + interpreter / site-packages roots — :meth:`refresh_paths` + only re-runs discovery in this auto-discover mode. + + Raises: + ValueError: ``debounce_ms`` is not strictly positive. + """ if debounce_ms <= 0: raise ValueError( f"debounce_ms must be > 0, got {debounce_ms}.", @@ -166,22 +203,39 @@ def state(self) -> WatcherState: return self._state def refresh_paths(self) -> None: - """Re-discover user modules when constructed with ``paths=None``. - - No-op when explicit paths were supplied at construction (the - caller manages that list). Synchronous because rebuilding the - path set is a fast in-process snapshot; the live watcher loop - picks up the change on its next event boundary. + """Re-snapshot ``sys.modules`` for the auto-discover watcher. + + Side effects: + Replaces ``self._paths`` with a fresh discovery snapshot + when the watcher was constructed with ``paths=None``. + No-op when explicit paths were supplied (the caller owns + the list). + + Notes: + Synchronous because rebuilding the path set is a fast + in-process snapshot. The live watch loop picks up the + change on its next iteration boundary; this method does + not restart the watcher. """ if not self._auto_discover: return self._paths = _discover_user_modules() async def start(self) -> None: - """Begin watching. Idempotent: a second call while running is a no-op. + """Begin watching. Idempotent. - Raises ``RuntimeError`` if called after :meth:`stop` — construct - a new watcher instead. + Side effects: + Creates an ``asyncio.Event`` (``_stop_event``) and an + ``asyncio.Task`` running :meth:`_run_watch_loop`, then + transitions the state to ``running``. + + Raises: + RuntimeError: Called after :meth:`stop`. A stopped watcher + cannot be restarted; construct a new instance. + + Notes: + A second call while ``running`` is a no-op (does not spawn + a duplicate watch task). """ if self._state == "running": return @@ -197,12 +251,22 @@ async def start(self) -> None: ) async def stop(self) -> None: - """Stop watching. Idempotent: safe to call multiple times. - - Calling ``stop()`` on a fresh (never-started) watcher transitions - it directly to ``stopped`` so the no-restart invariant still - holds. A pending debounce flush is cancelled cleanly without - firing the callback. + """Stop watching. Idempotent and graceful. + + Side effects: + - Transitions state to ``stopped`` (terminal — :meth:`start` + will raise). + - Sets ``_stop_event`` so ``watchfiles.awatch`` exits its + async iterator. + - Cancels and awaits the in-flight watch task and any + pending flush task; ``CancelledError`` is suppressed. + - Drops ``self._pending`` (any unflushed events are lost). + + Notes: + Calling ``stop()`` on a fresh (never-started) watcher still + moves it to ``stopped`` so the no-restart invariant holds. + A pending debounce flush is cancelled without invoking the + user callback. """ if self._state == "stopped": return From eade5da3a1fdf24df23eefe6eedd11145766a1e6 Mon Sep 17 00:00:00 2001 From: Mahimai Raja J Date: Wed, 6 May 2026 16:55:57 -0400 Subject: [PATCH 10/10] fix(execution): absolutize emitted paths and honor refresh_paths while running Two issues from review: 1. FileChange paths emitted by the live watcher are now resolved via Path.resolve(strict=False) at the boundary, matching the dataclass docstring's "Paths emitted by FileWatcher are absolute" claim. The dataclass itself does not normalize in __post_init__: doing so would break literal '/tmp/agent.py' assertions on macOS where /tmp resolves to /private/tmp. Manual constructions are documented as the caller's responsibility. 2. refresh_paths() previously mutated self._paths but watchfiles.awatch captures paths at construction, so a running watcher never picked up the new set. Add _restart_event: refresh_paths() sets it, _run_watch_loop wraps awatch in an outer loop that tears down the current iterator (via a per-iteration mirror task that signals awatch's stop_event on either stop or restart) and recreates awatch with the latest self._paths. stop() also sets _restart_event so the mirror task wakes promptly. Two regression tests: - test_emitted_paths_are_absolute: every FileChange landing in the pending buffer has an absolute path. - test_refresh_paths_during_run_swaps_watched_set: with two files in separate dirs, edits to file_b are silent before refresh_paths(), and fire the callback after. --- src/openrtc/execution/file_watcher.py | 150 +++++++++++++++++++------- tests/execution/test_file_watcher.py | 80 ++++++++++++++ 2 files changed, 191 insertions(+), 39 deletions(-) diff --git a/src/openrtc/execution/file_watcher.py b/src/openrtc/execution/file_watcher.py index 4dcfe6c..0d4eb85 100644 --- a/src/openrtc/execution/file_watcher.py +++ b/src/openrtc/execution/file_watcher.py @@ -62,9 +62,11 @@ class FileChange: """A single filesystem change event. Frozen so instances are hashable and can be deduplicated in sets. - Paths are absolute. ``change_type`` is one of ``"created"``, - ``"modified"``, or ``"deleted"`` (mapped from watchfiles' ``Change`` - enum at the watcher boundary). + Paths emitted by :class:`FileWatcher` are absolutized (via + ``Path.resolve(strict=False)``) at the watcher boundary; instances + constructed by callers directly carry whatever path they pass in. + ``change_type`` is one of ``"created"``, ``"modified"``, or + ``"deleted"`` (mapped from watchfiles' ``Change`` enum). """ path: Path @@ -187,8 +189,11 @@ def __init__( self._state: WatcherState = "new" # Filled in on start(). _pending collects changes between # debounce flushes; _flush_task fires the trailing-edge flush. + # _restart_event signals the watch loop to recreate awatch with + # the latest self._paths after refresh_paths() mutates them. self._pending: list[FileChange] = [] self._stop_event: asyncio.Event | None = None + self._restart_event: asyncio.Event | None = None self._watch_task: asyncio.Task[None] | None = None self._flush_task: asyncio.Task[None] | None = None @@ -206,20 +211,26 @@ def refresh_paths(self) -> None: """Re-snapshot ``sys.modules`` for the auto-discover watcher. Side effects: - Replaces ``self._paths`` with a fresh discovery snapshot - when the watcher was constructed with ``paths=None``. - No-op when explicit paths were supplied (the caller owns - the list). + - Replaces ``self._paths`` with a fresh discovery snapshot + when the watcher was constructed with ``paths=None``. + No-op when explicit paths were supplied (the caller owns + the list). + - When the watcher is running, sets ``_restart_event`` so + the watch loop tears down the current ``awatch`` iterator + and recreates it with the new path set on the next + iteration boundary. Notes: Synchronous because rebuilding the path set is a fast - in-process snapshot. The live watch loop picks up the - change on its next iteration boundary; this method does - not restart the watcher. + in-process snapshot. The live recreate happens + asynchronously in the watch loop, typically within a few + milliseconds. """ if not self._auto_discover: return self._paths = _discover_user_modules() + if self._restart_event is not None: + self._restart_event.set() async def start(self) -> None: """Begin watching. Idempotent. @@ -245,6 +256,7 @@ async def start(self) -> None: ) self._state = "running" self._stop_event = asyncio.Event() + self._restart_event = asyncio.Event() self._watch_task = asyncio.create_task( self._run_watch_loop(), name=f"openrtc.file_watcher[{id(self):#x}]", @@ -273,6 +285,10 @@ async def stop(self) -> None: self._state = "stopped" if self._stop_event is not None: self._stop_event.set() + if self._restart_event is not None: + # Wake any awaiter blocked on the restart side of the mirror + # task so the watch loop can observe stop. + self._restart_event.set() if self._flush_task is not None: self._flush_task.cancel() with contextlib.suppress(asyncio.CancelledError): @@ -284,44 +300,100 @@ async def stop(self) -> None: await self._watch_task self._watch_task = None self._stop_event = None + self._restart_event = None self._pending.clear() async def _run_watch_loop(self) -> None: """Background task: consume ``watchfiles.awatch`` and feed the debounce. + Wraps ``watchfiles.awatch`` in an outer loop so :meth:`refresh_paths` + can swap the watched path set without restarting the whole + watcher: when ``_restart_event`` fires, the inner ``awatch`` + iterator's ``stop_event`` is tripped, the loop tears it down, + and the next iteration creates a fresh ``awatch`` over the + latest ``self._paths``. + Each batch from watchfiles is converted to ``FileChange`` - instances and handed to :meth:`_handle_change_batch`, which - extends ``self._pending`` and (re)schedules the trailing flush. + instances (with absolutized paths) and handed to + :meth:`_handle_change_batch`, which extends ``self._pending`` + and (re)schedules the trailing flush. """ - if not self._paths: - # No paths to watch — block until stop(). - assert self._stop_event is not None - await self._stop_event.wait() - return assert self._stop_event is not None - try: - async for changes in watchfiles.awatch( - *self._paths, - stop_event=self._stop_event, - ): - batch: list[FileChange] = [] - for change_kind, raw_path in changes: - change_type = _WATCHFILES_CHANGE_MAP.get(change_kind) - if change_type is None: - # watchfiles may add new variants; ignore unknowns. - continue - batch.append( - FileChange( - path=Path(raw_path), - change_type=change_type, + assert self._restart_event is not None + while not self._stop_event.is_set(): + # Snapshot paths at iteration start. refresh_paths() mutates + # self._paths and sets _restart_event; the next iteration + # picks up the new list. + current_paths = list(self._paths) + iter_done = asyncio.Event() + mirror = asyncio.create_task( + self._mirror_signals(iter_done), + name=f"openrtc.file_watcher.mirror[{id(self):#x}]", + ) + try: + if current_paths: + try: + async for changes in watchfiles.awatch( + *current_paths, + stop_event=iter_done, + ): + batch: list[FileChange] = [] + for change_kind, raw_path in changes: + change_type = _WATCHFILES_CHANGE_MAP.get(change_kind) + if change_type is None: + # watchfiles may add new variants; + # ignore unknowns. + continue + batch.append( + FileChange( + path=Path(raw_path).resolve(strict=False), + change_type=change_type, + ) + ) + if batch: + self._handle_change_batch(batch) + except asyncio.CancelledError: + raise + except Exception: # noqa: BLE001 — logged and swallowed + _log.exception( + "FileWatcher loop crashed; events will stop firing", ) - ) - if batch: - self._handle_change_batch(batch) - except asyncio.CancelledError: - raise - except Exception: # noqa: BLE001 — logged and swallowed - _log.exception("FileWatcher loop crashed; events will stop firing") + return + else: + # No paths to watch — wait for stop or restart. + await iter_done.wait() + finally: + mirror.cancel() + with contextlib.suppress(asyncio.CancelledError): + await mirror + # Drain whichever signal triggered iter_done. If stop fired, + # the while-condition exits us. Otherwise clear the restart + # flag and loop back to recreate awatch over the new paths. + if self._restart_event.is_set() and not self._stop_event.is_set(): + self._restart_event.clear() + + async def _mirror_signals(self, target: asyncio.Event) -> None: + """Set *target* when either ``_stop_event`` or ``_restart_event`` fires. + + Used to translate the watcher's two lifecycle signals into the + single ``stop_event`` that ``watchfiles.awatch`` accepts. + """ + assert self._stop_event is not None + assert self._restart_event is not None + stop_wait = asyncio.create_task(self._stop_event.wait()) + restart_wait = asyncio.create_task(self._restart_event.wait()) + try: + await asyncio.wait( + {stop_wait, restart_wait}, + return_when=asyncio.FIRST_COMPLETED, + ) + target.set() + finally: + for task in (stop_wait, restart_wait): + if not task.done(): + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task def _handle_change_batch(self, batch: list[FileChange]) -> None: """Buffer one batch and (re)schedule the trailing debounce flush. diff --git a/tests/execution/test_file_watcher.py b/tests/execution/test_file_watcher.py index 067be63..e8047f5 100644 --- a/tests/execution/test_file_watcher.py +++ b/tests/execution/test_file_watcher.py @@ -321,6 +321,86 @@ async def test_empty_path_list_starts_and_stops(self) -> None: await watcher.stop() assert watcher.state == "stopped" + async def test_emitted_paths_are_absolute(self, tmp_path: Path) -> None: + target = tmp_path / "agent.py" + target.write_text("# initial\n") + watcher = FileWatcher(_noop_callback, paths=[target]) + await watcher.start() + try: + await asyncio.sleep(0.1) + target.write_text("# modified\n") + arrived = await _wait_until( + lambda: bool(watcher._pending), # noqa: SLF001 + timeout_s=3.0, + ) + assert arrived + for fc in watcher._pending: # noqa: SLF001 + assert fc.path.is_absolute(), ( + f"emitted FileChange.path must be absolute, got {fc.path!r}" + ) + finally: + await watcher.stop() + + async def test_refresh_paths_during_run_swaps_watched_set( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Two files in DIFFERENT directories so watchfiles' parent- + # directory-watch doesn't accidentally surface file_b's edits + # while we're still watching only file_a. + dir_a = tmp_path / "dir_a" + dir_b = tmp_path / "dir_b" + dir_a.mkdir() + dir_b.mkdir() + file_a = dir_a / "a.py" + file_a.write_text("# initial\n") + file_b = dir_b / "b.py" + file_b.write_text("# initial\n") + + # Auto-discover watcher: discovery returns only file_a initially. + monkeypatch.setattr( + "openrtc.execution.file_watcher._discover_user_modules", + lambda: [file_a], + ) + + received: list[list[FileChange]] = [] + + async def on_change(changes: list[FileChange]) -> None: + received.append(list(changes)) + + watcher = FileWatcher(on_change, debounce_ms=100, paths=None) + assert watcher.paths == [file_a] + await watcher.start() + try: + await asyncio.sleep(0.15) + + # Edit file_b while only file_a is watched: must NOT fire. + file_b.write_text("# v2\n") + await asyncio.sleep(0.4) + assert received == [], ( + f"file_b shouldn't fire before refresh; got {received!r}" + ) + + # Switch discovery to include file_b and refresh. + monkeypatch.setattr( + "openrtc.execution.file_watcher._discover_user_modules", + lambda: [file_a, file_b], + ) + watcher.refresh_paths() + assert watcher.paths == [file_a, file_b] + # Give the watch loop a beat to recreate awatch over the new set. + await asyncio.sleep(0.3) + + # Now editing file_b SHOULD fire. + file_b.write_text("# v3\n") + arrived = await _wait_until(lambda: bool(received), timeout_s=3.0) + assert arrived, "file_b edit didn't fire after refresh_paths()" + paths_seen = {fc.path.resolve() for batch in received for fc in batch} + assert file_b.resolve() in paths_seen + finally: + await watcher.stop() + assert watcher.state == "stopped" + assert watcher._watch_task is None # noqa: SLF001 + class TestCollapseChanges: """``_collapse_changes`` enforces the design.md §3.4 collapse rules."""