diff --git a/pyproject.toml b/pyproject.toml index 565ddec..c6d1228 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ license = "MIT" requires-python = ">=3.11,<3.14" dependencies = [ "livekit-agents[openai,silero,turn-detector]~=1.5", + "watchfiles>=0.21,<2", ] [project.optional-dependencies] diff --git a/src/openrtc/__init__.py b/src/openrtc/__init__.py index 0671c0f..7d2bb5f 100644 --- a/src/openrtc/__init__.py +++ b/src/openrtc/__init__.py @@ -4,6 +4,7 @@ from .core.config import AgentConfig, AgentDiscoveryConfig, agent_config from .core.pool import AgentPool +from .execution.file_watcher import FileChange, FileWatcher from .types import ProviderValue try: @@ -18,6 +19,8 @@ "AgentConfig", "AgentDiscoveryConfig", "AgentPool", + "FileChange", + "FileWatcher", "ProviderValue", "__version__", "agent_config", diff --git a/src/openrtc/execution/file_watcher.py b/src/openrtc/execution/file_watcher.py new file mode 100644 index 0000000..0d4eb85 --- /dev/null +++ b/src/openrtc/execution/file_watcher.py @@ -0,0 +1,472 @@ +"""File watcher infrastructure for user agent code (MAH-80, v0.2.1). + +The watcher monitors user-edited Python modules and emits debounced +change events. Reload, re-import, and session re-binding are out of +scope here — see MAH-81 onward. This module provides the foundation: +discovery, event shape, and a callback API. + +Contract summary +---------------- + +- The watcher discovers user-editable modules from ``sys.modules`` at + construction (when ``paths=None``) and ignores anything under + ``site-packages`` / ``sys.prefix``. +- Filesystem events are consumed via ``watchfiles.awatch`` and mapped + to :class:`FileChange` instances. +- Rapid events are coalesced through a trailing-edge debounce + (``debounce_ms``, default 200) before the user callback fires, so + multi-write editor saves produce a single dispatch. +- The user callback is awaited inside a try/except: exceptions are + logged at ERROR and swallowed, leaving the watcher running. +- ``stop()`` cancels the in-flight watch and flush tasks, drops the + pending buffer, and is safe to call repeatedly. + +Public API (locked at design.md §3.5): + +- :class:`FileChange` — frozen dataclass describing a single change +- :class:`FileWatcher` — async watcher with + ``start()`` / ``stop()`` / ``refresh_paths()`` + +Both names are re-exported from the package root, so callers can write +``from openrtc import FileWatcher, FileChange``. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import logging +import site +import sys +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from pathlib import Path +from typing import Literal + +import watchfiles + +ChangeType = Literal["created", "modified", "deleted"] +WatcherState = Literal["new", "running", "stopped"] + +_log = logging.getLogger(__name__) + +_WATCHFILES_CHANGE_MAP: dict[watchfiles.Change, ChangeType] = { + watchfiles.Change.added: "created", + watchfiles.Change.modified: "modified", + watchfiles.Change.deleted: "deleted", +} + + +@dataclass(frozen=True) +class FileChange: + """A single filesystem change event. + + Frozen so instances are hashable and can be deduplicated in sets. + Paths emitted by :class:`FileWatcher` are absolutized (via + ``Path.resolve(strict=False)``) at the watcher boundary; instances + constructed by callers directly carry whatever path they pass in. + ``change_type`` is one of ``"created"``, ``"modified"``, or + ``"deleted"`` (mapped from watchfiles' ``Change`` enum). + """ + + path: Path + change_type: ChangeType + + +def _interpreter_excluded_roots() -> list[Path]: + """Return absolute directory roots whose contents are NOT user code. + + Modules whose ``__file__`` lives under any of these roots are + interpreter, standard library, or third-party package code — not + something a user would edit during a hot-reload session. + """ + roots: list[Path] = [Path(path).resolve() for path in site.getsitepackages()] + user_site = site.getusersitepackages() + if user_site: + roots.append(Path(user_site).resolve()) + roots.append(Path(sys.prefix).resolve()) + roots.append(Path(sys.base_prefix).resolve()) + # Deduplicate while preserving order. + seen: set[Path] = set() + unique: list[Path] = [] + for root in roots: + if root not in seen: + seen.add(root) + unique.append(root) + return unique + + +def _is_under(path: Path, roots: list[Path]) -> bool: + """Return True if *path* is at or below any of *roots*.""" + return any(path.is_relative_to(root) for root in roots) + + +def _discover_user_modules() -> list[Path]: + """Snapshot ``sys.modules`` and return user-editable Python file paths. + + A module is "user-editable" when: + + 1. It exposes a real ``__file__`` attribute (excludes built-ins, + namespace packages, and some C extensions). + 2. The file is NOT under any interpreter or site-packages root + returned by :func:`_interpreter_excluded_roots`. + + Returns absolute, deduplicated paths in module-iteration order. + Modules without a ``__file__`` are skipped silently — that is the + documented "graceful" behavior for built-ins. + """ + excluded = _interpreter_excluded_roots() + seen: set[Path] = set() + discovered: list[Path] = [] + # Snapshot to a list to tolerate sys.modules mutation during iteration. + for module in list(sys.modules.values()): + file_attr = getattr(module, "__file__", None) + if not file_attr: + continue + try: + resolved = Path(file_attr).resolve() + except (OSError, RuntimeError): + # Resolving can fail on broken symlinks or weird platforms; + # skip those rather than break discovery. + continue + if _is_under(resolved, excluded): + continue + if resolved in seen: + continue + seen.add(resolved) + discovered.append(resolved) + return discovered + + +class FileWatcher: + """Watch user-edited Python modules and emit debounced change events. + + Public API is locked at design.md §3.5. The watcher is async-native: + ``start()`` schedules a background watch task, ``stop()`` cancels + it gracefully, and ``refresh_paths()`` rebuilds the auto-discovered + path set without restarting. + + Lifecycle: a watcher transitions ``new → running → stopped``. A + stopped watcher cannot be restarted — construct a new one. + """ + + def __init__( + self, + on_change: Callable[[list[FileChange]], Awaitable[None]], + *, + debounce_ms: int = 200, + paths: list[Path] | None = None, + ) -> None: + """Construct a watcher; does not start watching until :meth:`start`. + + Args: + on_change: Async callable invoked with the coalesced + ``list[FileChange]`` after each debounce window. + Exceptions raised by this callable are logged and + swallowed. + debounce_ms: Trailing-edge debounce window. Must be > 0. + paths: Explicit list of files or directories to watch. When + ``None`` (default), the watcher snapshots + ``sys.modules`` and excludes anything under the + interpreter / site-packages roots — :meth:`refresh_paths` + only re-runs discovery in this auto-discover mode. + + Raises: + ValueError: ``debounce_ms`` is not strictly positive. + """ + if debounce_ms <= 0: + raise ValueError( + f"debounce_ms must be > 0, got {debounce_ms}.", + ) + self._on_change = on_change + self._debounce_ms = debounce_ms + # ``paths is None`` → auto-discover, and refresh_paths() will + # re-run discovery. Explicit paths short-circuit discovery. + self._auto_discover = paths is None + self._paths: list[Path] = ( + list(paths) if paths is not None else _discover_user_modules() + ) + self._state: WatcherState = "new" + # Filled in on start(). _pending collects changes between + # debounce flushes; _flush_task fires the trailing-edge flush. + # _restart_event signals the watch loop to recreate awatch with + # the latest self._paths after refresh_paths() mutates them. + self._pending: list[FileChange] = [] + self._stop_event: asyncio.Event | None = None + self._restart_event: asyncio.Event | None = None + self._watch_task: asyncio.Task[None] | None = None + self._flush_task: asyncio.Task[None] | None = None + + @property + def paths(self) -> list[Path]: + """Return the current snapshot of watched paths.""" + return list(self._paths) + + @property + def state(self) -> WatcherState: + """Return the current lifecycle state.""" + return self._state + + def refresh_paths(self) -> None: + """Re-snapshot ``sys.modules`` for the auto-discover watcher. + + Side effects: + - Replaces ``self._paths`` with a fresh discovery snapshot + when the watcher was constructed with ``paths=None``. + No-op when explicit paths were supplied (the caller owns + the list). + - When the watcher is running, sets ``_restart_event`` so + the watch loop tears down the current ``awatch`` iterator + and recreates it with the new path set on the next + iteration boundary. + + Notes: + Synchronous because rebuilding the path set is a fast + in-process snapshot. The live recreate happens + asynchronously in the watch loop, typically within a few + milliseconds. + """ + if not self._auto_discover: + return + self._paths = _discover_user_modules() + if self._restart_event is not None: + self._restart_event.set() + + async def start(self) -> None: + """Begin watching. Idempotent. + + Side effects: + Creates an ``asyncio.Event`` (``_stop_event``) and an + ``asyncio.Task`` running :meth:`_run_watch_loop`, then + transitions the state to ``running``. + + Raises: + RuntimeError: Called after :meth:`stop`. A stopped watcher + cannot be restarted; construct a new instance. + + Notes: + A second call while ``running`` is a no-op (does not spawn + a duplicate watch task). + """ + if self._state == "running": + return + if self._state == "stopped": + raise RuntimeError( + "FileWatcher cannot be restarted after stop(); construct a new watcher.", + ) + self._state = "running" + self._stop_event = asyncio.Event() + self._restart_event = asyncio.Event() + self._watch_task = asyncio.create_task( + self._run_watch_loop(), + name=f"openrtc.file_watcher[{id(self):#x}]", + ) + + async def stop(self) -> None: + """Stop watching. Idempotent and graceful. + + Side effects: + - Transitions state to ``stopped`` (terminal — :meth:`start` + will raise). + - Sets ``_stop_event`` so ``watchfiles.awatch`` exits its + async iterator. + - Cancels and awaits the in-flight watch task and any + pending flush task; ``CancelledError`` is suppressed. + - Drops ``self._pending`` (any unflushed events are lost). + + Notes: + Calling ``stop()`` on a fresh (never-started) watcher still + moves it to ``stopped`` so the no-restart invariant holds. + A pending debounce flush is cancelled without invoking the + user callback. + """ + if self._state == "stopped": + return + self._state = "stopped" + if self._stop_event is not None: + self._stop_event.set() + if self._restart_event is not None: + # Wake any awaiter blocked on the restart side of the mirror + # task so the watch loop can observe stop. + self._restart_event.set() + if self._flush_task is not None: + self._flush_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._flush_task + self._flush_task = None + if self._watch_task is not None: + self._watch_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._watch_task + self._watch_task = None + self._stop_event = None + self._restart_event = None + self._pending.clear() + + async def _run_watch_loop(self) -> None: + """Background task: consume ``watchfiles.awatch`` and feed the debounce. + + Wraps ``watchfiles.awatch`` in an outer loop so :meth:`refresh_paths` + can swap the watched path set without restarting the whole + watcher: when ``_restart_event`` fires, the inner ``awatch`` + iterator's ``stop_event`` is tripped, the loop tears it down, + and the next iteration creates a fresh ``awatch`` over the + latest ``self._paths``. + + Each batch from watchfiles is converted to ``FileChange`` + instances (with absolutized paths) and handed to + :meth:`_handle_change_batch`, which extends ``self._pending`` + and (re)schedules the trailing flush. + """ + assert self._stop_event is not None + assert self._restart_event is not None + while not self._stop_event.is_set(): + # Snapshot paths at iteration start. refresh_paths() mutates + # self._paths and sets _restart_event; the next iteration + # picks up the new list. + current_paths = list(self._paths) + iter_done = asyncio.Event() + mirror = asyncio.create_task( + self._mirror_signals(iter_done), + name=f"openrtc.file_watcher.mirror[{id(self):#x}]", + ) + try: + if current_paths: + try: + async for changes in watchfiles.awatch( + *current_paths, + stop_event=iter_done, + ): + batch: list[FileChange] = [] + for change_kind, raw_path in changes: + change_type = _WATCHFILES_CHANGE_MAP.get(change_kind) + if change_type is None: + # watchfiles may add new variants; + # ignore unknowns. + continue + batch.append( + FileChange( + path=Path(raw_path).resolve(strict=False), + change_type=change_type, + ) + ) + if batch: + self._handle_change_batch(batch) + except asyncio.CancelledError: + raise + except Exception: # noqa: BLE001 — logged and swallowed + _log.exception( + "FileWatcher loop crashed; events will stop firing", + ) + return + else: + # No paths to watch — wait for stop or restart. + await iter_done.wait() + finally: + mirror.cancel() + with contextlib.suppress(asyncio.CancelledError): + await mirror + # Drain whichever signal triggered iter_done. If stop fired, + # the while-condition exits us. Otherwise clear the restart + # flag and loop back to recreate awatch over the new paths. + if self._restart_event.is_set() and not self._stop_event.is_set(): + self._restart_event.clear() + + async def _mirror_signals(self, target: asyncio.Event) -> None: + """Set *target* when either ``_stop_event`` or ``_restart_event`` fires. + + Used to translate the watcher's two lifecycle signals into the + single ``stop_event`` that ``watchfiles.awatch`` accepts. + """ + assert self._stop_event is not None + assert self._restart_event is not None + stop_wait = asyncio.create_task(self._stop_event.wait()) + restart_wait = asyncio.create_task(self._restart_event.wait()) + try: + await asyncio.wait( + {stop_wait, restart_wait}, + return_when=asyncio.FIRST_COMPLETED, + ) + target.set() + finally: + for task in (stop_wait, restart_wait): + if not task.done(): + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + def _handle_change_batch(self, batch: list[FileChange]) -> None: + """Buffer one batch and (re)schedule the trailing debounce flush. + + Called from the watchfiles loop for each emitted batch and from + unit tests directly. The semantics: extend ``_pending``, cancel + any in-flight flush task, schedule a fresh flush + ``debounce_ms / 1000`` seconds from now. If five rapid batches + arrive, only the last reschedule survives — the prior four + flush tasks are cancelled before they fire. + """ + self._pending.extend(batch) + if self._flush_task is not None and not self._flush_task.done(): + self._flush_task.cancel() + self._flush_task = asyncio.create_task( + self._flush_after(self._debounce_ms / 1000.0), + name=f"openrtc.file_watcher.flush[{id(self):#x}]", + ) + + async def _flush_after(self, delay_s: float) -> None: + """Wait *delay_s* seconds then flush ``_pending`` through ``on_change``. + + Cancellation before the timer fires drops the in-flight flush + without firing the callback (used both by the debounce reschedule + and by ``stop()`` for clean shutdown). Exceptions raised by the + user callback are logged and swallowed so the watch loop keeps + running for subsequent events. + """ + try: + await asyncio.sleep(delay_s) + except asyncio.CancelledError: + raise + # Snapshot + clear under the same logical step. If new events + # arrive while on_change is awaiting, _handle_change_batch will + # schedule the next flush around them. + collapsed = _collapse_changes(self._pending) + self._pending.clear() + if not collapsed: + return + try: + await self._on_change(collapsed) + except asyncio.CancelledError: + raise + except Exception: # noqa: BLE001 — user callback isolation + _log.exception( + "FileWatcher.on_change raised; continuing to watch", + ) + + +def _collapse_changes(changes: list[FileChange]) -> list[FileChange]: + """Coalesce multiple events for the same path into one ``FileChange``. + + Per design.md §3.4, the salient state wins: + + - any ``deleted`` in the path's window → emit ``deleted`` (the file + is gone now, regardless of intermediate states) + - else any ``created`` in the window → emit ``created`` (the file is + new; downstream consumers must register it for the first time) + - otherwise → emit ``modified`` + + Output preserves the first-seen order of paths in *changes*. + """ + by_path: dict[Path, list[ChangeType]] = {} + for change in changes: + by_path.setdefault(change.path, []).append(change.change_type) + collapsed: list[FileChange] = [] + for path, types in by_path.items(): + chosen: ChangeType + if "deleted" in types: + chosen = "deleted" + elif "created" in types: + chosen = "created" + else: + chosen = "modified" + collapsed.append(FileChange(path=path, change_type=chosen)) + return collapsed diff --git a/tests/execution/__init__.py b/tests/execution/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/execution/test_file_watcher.py b/tests/execution/test_file_watcher.py new file mode 100644 index 0000000..e8047f5 --- /dev/null +++ b/tests/execution/test_file_watcher.py @@ -0,0 +1,623 @@ +"""Unit tests for the file watcher (MAH-80).""" + +from __future__ import annotations + +import asyncio +import dataclasses +import importlib.util +import logging +import sys +import time +import types +from collections.abc import Callable +from pathlib import Path + +import pytest + +from openrtc.execution.file_watcher import ( + FileChange, + FileWatcher, + _collapse_changes, + _discover_user_modules, + _interpreter_excluded_roots, +) + + +async def _noop_callback(_changes: list[FileChange]) -> None: + """Default callback for lifecycle tests that don't care about events.""" + + +class TestFileChange: + """The :class:`FileChange` dataclass is the event-payload contract.""" + + def test_construction(self) -> None: + change = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + assert change.path == Path("/tmp/agent.py") + assert change.change_type == "modified" + + def test_equality_same_values(self) -> None: + a = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + b = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + assert a == b + + def test_equality_different_paths(self) -> None: + a = FileChange(path=Path("/tmp/a.py"), change_type="modified") + b = FileChange(path=Path("/tmp/b.py"), change_type="modified") + assert a != b + + def test_equality_different_change_types(self) -> None: + a = FileChange(path=Path("/tmp/agent.py"), change_type="created") + b = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + assert a != b + + def test_hashable_for_set_deduplication(self) -> None: + a = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + b = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + c = FileChange(path=Path("/tmp/agent.py"), change_type="created") + # Equal instances collapse in a set; unequal ones do not. + assert {a, b, c} == {a, c} + + def test_frozen_rejects_mutation(self) -> None: + change = FileChange(path=Path("/tmp/agent.py"), change_type="modified") + with pytest.raises(dataclasses.FrozenInstanceError): + change.path = Path("/tmp/other.py") # type: ignore[misc] + + +def _install_synthetic_module( + name: str, file_path: Path, monkeypatch: pytest.MonkeyPatch +) -> types.ModuleType: + """Register a real-but-empty module pointing at *file_path*. + + The file must exist on disk, otherwise ``Path.resolve()`` may fail + on some platforms. + """ + file_path.parent.mkdir(parents=True, exist_ok=True) + file_path.write_text("# synthetic test module\n") + spec = importlib.util.spec_from_loader(name, loader=None, origin=str(file_path)) + assert spec is not None + module = importlib.util.module_from_spec(spec) + module.__file__ = str(file_path) + monkeypatch.setitem(sys.modules, name, module) + return module + + +class TestDiscoverUserModules: + """``_discover_user_modules`` is the watcher's source of truth for paths.""" + + def test_includes_user_module_under_tempdir( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + user_file = tmp_path / "user_pkg" / "agent.py" + _install_synthetic_module("openrtc_test_user_pkg.agent", user_file, monkeypatch) + discovered = _discover_user_modules() + assert user_file.resolve() in discovered + + def test_excludes_module_under_site_packages( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + fake_site = tmp_path / "site-packages" + site_file = fake_site / "third_party" / "lib.py" + _install_synthetic_module( + "openrtc_test_third_party.lib", site_file, monkeypatch + ) + # Make _interpreter_excluded_roots() include our fake site dir. + original_roots = _interpreter_excluded_roots() + monkeypatch.setattr( + "openrtc.execution.file_watcher._interpreter_excluded_roots", + lambda: [*original_roots, fake_site.resolve()], + ) + discovered = _discover_user_modules() + assert site_file.resolve() not in discovered + + def test_handles_modules_without_file_attribute( + self, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Real built-ins like ``sys`` already lack __file__; verify that + # discovery does not blow up on them (they are present in + # sys.modules during every run). + assert getattr(sys, "__file__", None) is None + discovered = _discover_user_modules() + # Build a synthetic module with __file__ = None to be extra safe. + fake = types.ModuleType("openrtc_test_no_file_module") + fake.__file__ = None # type: ignore[assignment] + monkeypatch.setitem(sys.modules, "openrtc_test_no_file_module", fake) + # Should not raise even with __file__ = None. + discovered_again = _discover_user_modules() + assert isinstance(discovered_again, list) + # The result is at least as informative as the prior call (modules + # without __file__ contribute nothing, never an exception). + assert len(discovered_again) >= 0 + del ( + discovered + ) # silence unused-warning; the first call's purpose was to prove it runs + + def test_returns_distinct_absolute_paths( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + shared_file = tmp_path / "shared.py" + _install_synthetic_module("openrtc_test_alias_a", shared_file, monkeypatch) + # Register the same file under a second module name; discovery + # should deduplicate by resolved path. + spec = importlib.util.spec_from_loader( + "openrtc_test_alias_b", loader=None, origin=str(shared_file) + ) + assert spec is not None + module_b = importlib.util.module_from_spec(spec) + module_b.__file__ = str(shared_file) + monkeypatch.setitem(sys.modules, "openrtc_test_alias_b", module_b) + + discovered = _discover_user_modules() + absolute = [p.is_absolute() for p in discovered] + assert all(absolute) + # The shared file appears at most once. + assert discovered.count(shared_file.resolve()) == 1 + + +class TestFileWatcherLifecycle: + """The :class:`FileWatcher` skeleton — construction, state machine, restart guard. + + These tests exercise the lifecycle contract; debounce + watchfiles + wiring land in later steps. + """ + + def test_construction_with_explicit_paths(self, tmp_path: Path) -> None: + explicit = [tmp_path / "agent.py"] + watcher = FileWatcher(_noop_callback, paths=explicit) + assert watcher.paths == explicit + assert watcher.state == "new" + + def test_construction_with_none_triggers_discovery(self) -> None: + watcher = FileWatcher(_noop_callback, paths=None) + # Discovery returns whatever user-edited modules are loaded; + # the guarantee is that some snapshot was captured (even an + # empty list is fine — the contract is "discovery ran"). + assert isinstance(watcher.paths, list) + # Modules outside site-packages should include the test file + # itself; verify by checking absoluteness. + assert all(p.is_absolute() for p in watcher.paths) + + def test_default_debounce_is_200ms(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + # The default is part of the public API contract (design.md §3.5). + assert watcher._debounce_ms == 200 # noqa: SLF001 — testing the public default + + def test_rejects_non_positive_debounce(self, tmp_path: Path) -> None: + with pytest.raises(ValueError, match="debounce_ms"): + FileWatcher(_noop_callback, debounce_ms=0, paths=[tmp_path / "x.py"]) + with pytest.raises(ValueError, match="debounce_ms"): + FileWatcher(_noop_callback, debounce_ms=-1, paths=[tmp_path / "x.py"]) + + def test_paths_is_a_copy(self, tmp_path: Path) -> None: + explicit = [tmp_path / "agent.py"] + watcher = FileWatcher(_noop_callback, paths=explicit) + # Mutating the original must not mutate the watcher's view. + explicit.append(tmp_path / "extra.py") + assert watcher.paths == [tmp_path / "agent.py"] + + +@pytest.mark.asyncio +class TestFileWatcherAsyncLifecycle: + """``start`` / ``stop`` are async; assert idempotency and the no-restart rule.""" + + async def test_start_is_idempotent(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + await watcher.start() + assert watcher.state == "running" + # Second start: no-op, no error. + await watcher.start() + assert watcher.state == "running" + await watcher.stop() + + async def test_stop_is_idempotent(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + await watcher.start() + await watcher.stop() + assert watcher.state == "stopped" + # Second stop: no-op, no error. + await watcher.stop() + assert watcher.state == "stopped" + + async def test_stop_on_fresh_watcher(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + # Never started → still safe to stop. Guarantees the no-restart + # invariant: a fresh+stopped watcher cannot be revived. + await watcher.stop() + assert watcher.state == "stopped" + with pytest.raises(RuntimeError, match="cannot be restarted"): + await watcher.start() + + async def test_start_after_stop_raises(self, tmp_path: Path) -> None: + watcher = FileWatcher(_noop_callback, paths=[tmp_path / "agent.py"]) + await watcher.start() + await watcher.stop() + with pytest.raises(RuntimeError, match="cannot be restarted"): + await watcher.start() + + +class TestFileWatcherRefreshPaths: + """``refresh_paths`` only re-runs discovery for auto-discover watchers.""" + + def test_refresh_with_auto_discover(self, monkeypatch: pytest.MonkeyPatch) -> None: + watcher = FileWatcher(_noop_callback, paths=None) + sentinel = [Path("/tmp/refresh_marker.py")] + monkeypatch.setattr( + "openrtc.execution.file_watcher._discover_user_modules", + lambda: list(sentinel), + ) + watcher.refresh_paths() + assert watcher.paths == sentinel + + def test_refresh_with_explicit_paths_is_noop( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + explicit = [tmp_path / "agent.py"] + watcher = FileWatcher(_noop_callback, paths=explicit) + # Even if discovery would return something, refresh must not + # touch an explicitly-managed path list. + monkeypatch.setattr( + "openrtc.execution.file_watcher._discover_user_modules", + lambda: [Path("/tmp/should_not_appear.py")], + ) + watcher.refresh_paths() + assert watcher.paths == explicit + + +async def _wait_until( + predicate: Callable[[], bool], + *, + timeout_s: float = 2.0, + poll_s: float = 0.02, +) -> bool: + """Poll until *predicate* is True or *timeout_s* elapses.""" + deadline = asyncio.get_running_loop().time() + timeout_s + while asyncio.get_running_loop().time() < deadline: + if predicate(): + return True + await asyncio.sleep(poll_s) + return predicate() + + +@pytest.mark.asyncio +class TestFileWatcherEventWiring: + """Step 6 — events from watchfiles land in the watcher buffer.""" + + async def test_writes_produce_buffered_filechange(self, tmp_path: Path) -> None: + target = tmp_path / "agent.py" + target.write_text("# initial\n") + watcher = FileWatcher(_noop_callback, paths=[target]) + await watcher.start() + try: + # watchfiles takes a few ms to install the OS-level watch on + # macOS; give it a small head start before mutating the file. + await asyncio.sleep(0.1) + target.write_text("# modified\n") + arrived = await _wait_until( + lambda: any( + fc.path.resolve() == target.resolve() + for fc in watcher._pending # noqa: SLF001 — buffer is internal pre-debounce + ), + timeout_s=3.0, + ) + assert arrived, f"No FileChange for target; pending={watcher._pending!r}" # noqa: SLF001 + finally: + await watcher.stop() + + async def test_stop_cancels_watch_task_cleanly(self, tmp_path: Path) -> None: + target = tmp_path / "agent.py" + target.write_text("# initial\n") + watcher = FileWatcher(_noop_callback, paths=[target]) + await watcher.start() + # Give awatch a moment to install. + await asyncio.sleep(0.05) + await watcher.stop() + assert watcher.state == "stopped" + # Internal task field is cleared after a clean shutdown. + assert watcher._watch_task is None # noqa: SLF001 + + async def test_empty_path_list_starts_and_stops(self) -> None: + watcher = FileWatcher(_noop_callback, paths=[]) + await watcher.start() + assert watcher.state == "running" + await watcher.stop() + assert watcher.state == "stopped" + + async def test_emitted_paths_are_absolute(self, tmp_path: Path) -> None: + target = tmp_path / "agent.py" + target.write_text("# initial\n") + watcher = FileWatcher(_noop_callback, paths=[target]) + await watcher.start() + try: + await asyncio.sleep(0.1) + target.write_text("# modified\n") + arrived = await _wait_until( + lambda: bool(watcher._pending), # noqa: SLF001 + timeout_s=3.0, + ) + assert arrived + for fc in watcher._pending: # noqa: SLF001 + assert fc.path.is_absolute(), ( + f"emitted FileChange.path must be absolute, got {fc.path!r}" + ) + finally: + await watcher.stop() + + async def test_refresh_paths_during_run_swaps_watched_set( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # Two files in DIFFERENT directories so watchfiles' parent- + # directory-watch doesn't accidentally surface file_b's edits + # while we're still watching only file_a. + dir_a = tmp_path / "dir_a" + dir_b = tmp_path / "dir_b" + dir_a.mkdir() + dir_b.mkdir() + file_a = dir_a / "a.py" + file_a.write_text("# initial\n") + file_b = dir_b / "b.py" + file_b.write_text("# initial\n") + + # Auto-discover watcher: discovery returns only file_a initially. + monkeypatch.setattr( + "openrtc.execution.file_watcher._discover_user_modules", + lambda: [file_a], + ) + + received: list[list[FileChange]] = [] + + async def on_change(changes: list[FileChange]) -> None: + received.append(list(changes)) + + watcher = FileWatcher(on_change, debounce_ms=100, paths=None) + assert watcher.paths == [file_a] + await watcher.start() + try: + await asyncio.sleep(0.15) + + # Edit file_b while only file_a is watched: must NOT fire. + file_b.write_text("# v2\n") + await asyncio.sleep(0.4) + assert received == [], ( + f"file_b shouldn't fire before refresh; got {received!r}" + ) + + # Switch discovery to include file_b and refresh. + monkeypatch.setattr( + "openrtc.execution.file_watcher._discover_user_modules", + lambda: [file_a, file_b], + ) + watcher.refresh_paths() + assert watcher.paths == [file_a, file_b] + # Give the watch loop a beat to recreate awatch over the new set. + await asyncio.sleep(0.3) + + # Now editing file_b SHOULD fire. + file_b.write_text("# v3\n") + arrived = await _wait_until(lambda: bool(received), timeout_s=3.0) + assert arrived, "file_b edit didn't fire after refresh_paths()" + paths_seen = {fc.path.resolve() for batch in received for fc in batch} + assert file_b.resolve() in paths_seen + finally: + await watcher.stop() + assert watcher.state == "stopped" + assert watcher._watch_task is None # noqa: SLF001 + + +class TestCollapseChanges: + """``_collapse_changes`` enforces the design.md §3.4 collapse rules.""" + + def test_single_modified_passthrough(self) -> None: + result = _collapse_changes( + [FileChange(path=Path("/tmp/a.py"), change_type="modified")] + ) + assert result == [FileChange(path=Path("/tmp/a.py"), change_type="modified")] + + def test_created_plus_modified_collapses_to_created(self) -> None: + result = _collapse_changes( + [ + FileChange(path=Path("/tmp/a.py"), change_type="created"), + FileChange(path=Path("/tmp/a.py"), change_type="modified"), + ] + ) + assert result == [FileChange(path=Path("/tmp/a.py"), change_type="created")] + + def test_modified_plus_deleted_collapses_to_deleted(self) -> None: + result = _collapse_changes( + [ + FileChange(path=Path("/tmp/a.py"), change_type="modified"), + FileChange(path=Path("/tmp/a.py"), change_type="deleted"), + ] + ) + assert result == [FileChange(path=Path("/tmp/a.py"), change_type="deleted")] + + def test_deleted_dominates_created(self) -> None: + # Created and deleted in the same window: the file is gone, so + # report deleted (final-state-wins for the deletion case). + result = _collapse_changes( + [ + FileChange(path=Path("/tmp/a.py"), change_type="created"), + FileChange(path=Path("/tmp/a.py"), change_type="deleted"), + ] + ) + assert result == [FileChange(path=Path("/tmp/a.py"), change_type="deleted")] + + def test_multiple_paths_preserved_in_first_seen_order(self) -> None: + result = _collapse_changes( + [ + FileChange(path=Path("/tmp/b.py"), change_type="modified"), + FileChange(path=Path("/tmp/a.py"), change_type="modified"), + FileChange(path=Path("/tmp/b.py"), change_type="modified"), + ] + ) + assert result == [ + FileChange(path=Path("/tmp/b.py"), change_type="modified"), + FileChange(path=Path("/tmp/a.py"), change_type="modified"), + ] + + def test_empty_input_returns_empty(self) -> None: + assert _collapse_changes([]) == [] + + +@pytest.mark.asyncio +class TestDebounceAndDispatch: + """Step 7 — trailing-edge debounce coalesces rapid events into one callback.""" + + async def test_one_save_one_callback_after_200ms(self, tmp_path: Path) -> None: + events: list[tuple[float, list[FileChange]]] = [] + + async def on_change(changes: list[FileChange]) -> None: + events.append((time.monotonic(), list(changes))) + + watcher = FileWatcher(on_change, debounce_ms=200, paths=[tmp_path / "a.py"]) + # Don't run the watch loop; drive the seam directly. + watcher._state = "running" # noqa: SLF001 + start = time.monotonic() + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "a.py", change_type="modified")] + ) + # Wait past the debounce window. + await asyncio.sleep(0.35) + + assert len(events) == 1 + elapsed_ms = (events[0][0] - start) * 1000 + assert 150 < elapsed_ms < 350, f"flush at {elapsed_ms:.0f}ms not within 200±150" + assert events[0][1] == [ + FileChange(path=tmp_path / "a.py", change_type="modified"), + ] + await watcher.stop() + + async def test_five_rapid_saves_one_callback(self, tmp_path: Path) -> None: + events: list[tuple[float, list[FileChange]]] = [] + + async def on_change(changes: list[FileChange]) -> None: + events.append((time.monotonic(), list(changes))) + + watcher = FileWatcher(on_change, debounce_ms=200, paths=[tmp_path / "a.py"]) + watcher._state = "running" # noqa: SLF001 + start = time.monotonic() + # Five batches within 50ms, all for the same file. + for _ in range(5): + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "a.py", change_type="modified")] + ) + await asyncio.sleep(0.005) + last_event_time = time.monotonic() + + await asyncio.sleep(0.4) + + assert len(events) == 1, ( + f"expected exactly 1 callback, got {len(events)}: {events!r}" + ) + # Callback should fire ~200ms after the LAST event, not after the first. + elapsed_after_last_ms = (events[0][0] - last_event_time) * 1000 + assert 150 < elapsed_after_last_ms < 300, ( + f"flush at {elapsed_after_last_ms:.0f}ms after last event, " + "expected 200ms ±50ms" + ) + # All five events collapsed to one (same path, all modified). + assert events[0][1] == [ + FileChange(path=tmp_path / "a.py", change_type="modified"), + ] + # Sanity: total elapsed from FIRST event is at least 200ms. + elapsed_total_ms = (events[0][0] - start) * 1000 + assert elapsed_total_ms >= 200 + await watcher.stop() + + async def test_callback_exception_does_not_break_subsequent_dispatches( + self, tmp_path: Path, caplog: pytest.LogCaptureFixture + ) -> None: + call_log: list[list[FileChange]] = [] + raise_on_first = True + + async def on_change(changes: list[FileChange]) -> None: + nonlocal raise_on_first + call_log.append(list(changes)) + if raise_on_first: + raise_on_first = False + raise RuntimeError("boom") + + watcher = FileWatcher(on_change, debounce_ms=100, paths=[tmp_path / "a.py"]) + watcher._state = "running" # noqa: SLF001 + with caplog.at_level(logging.ERROR, logger="openrtc.execution.file_watcher"): + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "a.py", change_type="modified")] + ) + await asyncio.sleep(0.25) + # First flush fired and raised; the watcher must still be alive. + assert len(call_log) == 1 + assert any("on_change raised" in rec.message for rec in caplog.records) + + # Fire a second batch; the watcher should still dispatch. + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "b.py", change_type="created")] + ) + await asyncio.sleep(0.25) + assert len(call_log) == 2 + assert call_log[1] == [ + FileChange(path=tmp_path / "b.py", change_type="created") + ] + await watcher.stop() + + async def test_stop_during_pending_flush_cancels_cleanly( + self, tmp_path: Path + ) -> None: + events: list[list[FileChange]] = [] + + async def on_change(changes: list[FileChange]) -> None: + events.append(list(changes)) + + watcher = FileWatcher(on_change, debounce_ms=300, paths=[tmp_path / "a.py"]) + watcher._state = "running" # noqa: SLF001 + watcher._handle_change_batch( # noqa: SLF001 + [FileChange(path=tmp_path / "a.py", change_type="modified")] + ) + # Stop BEFORE the flush window elapses. + await asyncio.sleep(0.05) + await watcher.stop() + # Wait long enough that any leaked flush would have fired. + await asyncio.sleep(0.4) + assert events == [] + # No leaked tasks. + assert watcher._flush_task is None # noqa: SLF001 + + +class TestPackageReExports: + """``from openrtc import FileWatcher, FileChange`` is part of the public API.""" + + def test_fresh_process_can_import_top_level_names(self) -> None: + import subprocess + + proc = subprocess.run( + [ + sys.executable, + "-c", + ( + "from openrtc import FileWatcher, FileChange; " + "print(FileWatcher.__name__, FileChange.__name__)" + ), + ], + capture_output=True, + text=True, + check=False, + ) + assert proc.returncode == 0, ( + f"fresh-process import failed: stderr={proc.stderr!r}" + ) + assert "FileWatcher FileChange" in proc.stdout + + def test_names_appear_in_dunder_all(self) -> None: + import openrtc + + assert "FileWatcher" in openrtc.__all__ + assert "FileChange" in openrtc.__all__ + + def test_re_exports_are_the_same_objects(self) -> None: + import openrtc + from openrtc.execution.file_watcher import ( + FileChange as InnerFileChange, + ) + from openrtc.execution.file_watcher import ( + FileWatcher as InnerFileWatcher, + ) + + assert openrtc.FileWatcher is InnerFileWatcher + assert openrtc.FileChange is InnerFileChange diff --git a/tests/execution/test_file_watcher_smoke.py b/tests/execution/test_file_watcher_smoke.py new file mode 100644 index 0000000..fd1a341 --- /dev/null +++ b/tests/execution/test_file_watcher_smoke.py @@ -0,0 +1,66 @@ +"""End-to-end smoke test for the file watcher (MAH-80, Step 9). + +Exercises the full pipeline — ``watchfiles.awatch`` -> ``_handle_change_batch`` +-> trailing debounce flush -> ``on_change`` callback — against a real +file on disk. The unit tests in ``test_file_watcher.py`` cover each +seam in isolation; this test confirms the seams stay glued together +when wired through the live filesystem watcher. +""" + +from __future__ import annotations + +import asyncio +import time +from pathlib import Path + +import pytest + +from openrtc import FileChange, FileWatcher + + +@pytest.mark.asyncio +async def test_filewatcher_smoke_against_real_tempdir(tmp_path: Path) -> None: + target = tmp_path / "agent.py" + target.write_text("# initial contents\n") + + received: list[list[FileChange]] = [] + + async def record_changes(changes: list[FileChange]) -> None: + received.append(list(changes)) + + watcher = FileWatcher(record_changes, debounce_ms=200, paths=[target]) + await watcher.start() + try: + # Give watchfiles a moment to install the OS-level watch. + await asyncio.sleep(0.15) + + # One real edit on disk. + target.write_text("# modified contents\n") + + # Wait long enough for the watchfiles batch + 200ms debounce + dispatch. + deadline = time.monotonic() + 3.0 + while time.monotonic() < deadline and not received: + await asyncio.sleep(0.05) + + assert received, ( + f"on_change never fired within 3s; tempdir={tmp_path}, " + f"watcher.state={watcher.state}" + ) + # Exactly one callback for one logical edit. + assert len(received) == 1, ( + f"expected exactly one callback, got {len(received)}: {received!r}" + ) + # The callback receives a list of FileChange touching our file. + paths = {fc.path.resolve() for fc in received[0]} + assert target.resolve() in paths + change_types = {fc.change_type for fc in received[0]} + # Editor-style writes can land as modified or created depending on + # the platform and write strategy; both are valid ends of the contract. + assert change_types <= {"created", "modified", "deleted"} + assert "deleted" not in change_types + finally: + await watcher.stop() + assert watcher.state == "stopped" + # Clean shutdown: no leaked watch task or flush task. + assert watcher._watch_task is None # noqa: SLF001 + assert watcher._flush_task is None # noqa: SLF001 diff --git a/uv.lock b/uv.lock index 71dc629..09f7ac8 100644 --- a/uv.lock +++ b/uv.lock @@ -1347,6 +1347,7 @@ name = "openrtc" source = { editable = "." } dependencies = [ { name = "livekit-agents", extra = ["openai", "silero", "turn-detector"] }, + { name = "watchfiles" }, ] [package.optional-dependencies] @@ -1383,6 +1384,7 @@ requires-dist = [ { name = "textual", marker = "extra == 'tui'", specifier = ">=0.47,<2" }, { name = "typer", marker = "extra == 'cli'", specifier = ">=0.12" }, { name = "typer", marker = "extra == 'tui'", specifier = ">=0.12" }, + { name = "watchfiles", specifier = ">=0.21,<2" }, ] provides-extras = ["cli", "tui"]