diff --git a/operator_use/computer/pip_monitor.py b/operator_use/computer/pip_monitor.py new file mode 100644 index 0000000..810c2e2 --- /dev/null +++ b/operator_use/computer/pip_monitor.py @@ -0,0 +1,328 @@ +"""Picture-in-Picture agent monitor overlay. + +A floating always-on-top window that captures the agent's active window at 2fps +and renders it as a 320x240 thumbnail so the user can observe the agent without +switching macOS Spaces or virtual desktops. + +Dependencies are fully optional — import this module even without PySide6: + + pip install operator-use[pip-monitor] +""" + +from __future__ import annotations + +import subprocess +import sys +import threading +import logging +from typing import Optional + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Optional PySide6 import +# --------------------------------------------------------------------------- + +try: + from PySide6.QtWidgets import QApplication, QLabel, QWidget + from PySide6.QtCore import Qt, QTimer, QSize + from PySide6.QtGui import QPixmap, QScreen + + _PYSIDE6_AVAILABLE = True +except ImportError: # pragma: no cover + _PYSIDE6_AVAILABLE = False + +# --------------------------------------------------------------------------- +# Platform window-ID lookup +# --------------------------------------------------------------------------- + +_CAPTURE_INTERVAL_MS = 500 # 2 fps +_PIP_WIDTH = 320 +_PIP_HEIGHT = 240 + + +def _find_window_id(title: str) -> Optional[int]: + """Return an integer window-ID for the first window matching *title*. + + Returns ``None`` if the window cannot be found or the platform helper is + unavailable. + """ + platform = sys.platform + + if platform == "linux": + return _find_window_id_linux(title) + elif platform == "darwin": + return _find_window_id_macos(title) + elif platform == "win32": + return _find_window_id_windows(title) + + return None + + +def _find_window_id_linux(title: str) -> Optional[int]: + """Use ``xdotool`` to resolve a window ID by title on Linux.""" + try: + result = subprocess.run( + ["xdotool", "search", "--name", title], + capture_output=True, + text=True, + timeout=3, + ) + lines = result.stdout.strip().splitlines() + if lines: + return int(lines[0]) + except Exception as exc: + logger.debug("xdotool lookup failed: %s", exc) + return None + + +def _find_window_id_macos(title: str) -> Optional[int]: + """Use ``CGWindowListCopyWindowInfo`` via ctypes to find a window on macOS.""" + try: + import ctypes + import ctypes.util + + core_graphics = ctypes.cdll.LoadLibrary( + ctypes.util.find_library("CoreGraphics") or "CoreGraphics" + ) + + # kCGWindowListOptionAll = 0, kCGNullWindowID = 0 + window_list = core_graphics.CGWindowListCopyWindowInfo(0, 0) + if not window_list: + return None + + # Use CoreFoundation to iterate the CFArray + cf = ctypes.cdll.LoadLibrary(ctypes.util.find_library("CoreFoundation") or "CoreFoundation") + + count = cf.CFArrayGetCount(window_list) + for i in range(count): + item = cf.CFArrayGetValueAtIndex(window_list, i) + # Read kCGWindowName key + key = cf.CFStringCreateWithCString(None, b"kCGWindowName", 0x08000100) + value = cf.CFDictionaryGetValue(item, key) + if not value: + cf.CFRelease(key) + continue + + buf = ctypes.create_string_buffer(512) + cf.CFStringGetCString(value, buf, 512, 0x08000100) + window_name = buf.value.decode("utf-8", errors="replace") + cf.CFRelease(key) + + if title.lower() in window_name.lower(): + # Read kCGWindowNumber + num_key = cf.CFStringCreateWithCString(None, b"kCGWindowNumber", 0x08000100) + num_val = cf.CFDictionaryGetValue(item, num_key) + wid = ctypes.c_int64(0) + cf.CFNumberGetValue(num_val, 4, ctypes.byref(wid)) + cf.CFRelease(num_key) + cf.CFRelease(window_list) + return int(wid.value) + + cf.CFRelease(window_list) + except Exception as exc: + logger.debug("CGWindowListCopyWindowInfo lookup failed: %s", exc) + return None + + +def _find_window_id_windows(title: str) -> Optional[int]: + """Use ``FindWindow`` via ctypes to locate a window by title on Windows.""" + try: + import ctypes + + hwnd = ctypes.windll.user32.FindWindowW(None, title) + if hwnd: + return int(hwnd) + except Exception as exc: + logger.debug("FindWindow lookup failed: %s", exc) + return None + + +# --------------------------------------------------------------------------- +# PiP overlay widget +# --------------------------------------------------------------------------- + +if _PYSIDE6_AVAILABLE: + + class _PiPWindow(QWidget): + """Frameless, always-on-top overlay that renders the agent window.""" + + def __init__(self, source_title: str) -> None: + super().__init__( + None, + Qt.WindowType.WindowStaysOnTopHint | Qt.WindowType.FramelessWindowHint, + ) + self._source_title = source_title + self._label = QLabel(self) + self._label.setAlignment(Qt.AlignmentFlag.AlignCenter) + self._label.resize(_PIP_WIDTH, _PIP_HEIGHT) + + self.setFixedSize(_PIP_WIDTH, _PIP_HEIGHT) + self.setWindowOpacity(0.88) + self._place_top_right() + + self._timer = QTimer(self) + self._timer.setInterval(_CAPTURE_INTERVAL_MS) + self._timer.timeout.connect(self._refresh) + self._timer.start() + + def _place_top_right(self) -> None: + """Position the window in the top-right corner of the primary screen.""" + screen: QScreen = QApplication.primaryScreen() + geom = screen.availableGeometry() + x = geom.right() - _PIP_WIDTH + y = geom.top() + self.move(x, y) + + def update_source(self, title: str) -> None: + self._source_title = title + + def _refresh(self) -> None: + """Grab the source window and update the label pixmap.""" + wid = _find_window_id(self._source_title) + if wid is None: + return + + screen: QScreen = QApplication.primaryScreen() + pixmap: QPixmap = screen.grabWindow(wid) + if pixmap.isNull(): + return + + scaled = pixmap.scaled( + QSize(_PIP_WIDTH, _PIP_HEIGHT), + Qt.AspectRatioMode.KeepAspectRatio, + Qt.TransformationMode.SmoothTransformation, + ) + self._label.setPixmap(scaled) + + def closeEvent(self, event) -> None: # noqa: N802 + self._timer.stop() + super().closeEvent(event) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +class PiPMonitor: + """Floating picture-in-picture monitor for an agent's active window. + + Usage:: + + monitor = PiPMonitor() + monitor.start("Agent Window Title") + # ... agent runs ... + monitor.stop() + """ + + def __init__(self) -> None: + self._thread: Optional[threading.Thread] = None + self._running = threading.Event() + self._app: Optional[object] = None + self._window: Optional[object] = None + self._source_title: str = "" + + # ------------------------------------------------------------------ + # Public methods + # ------------------------------------------------------------------ + + def start(self, source_window_title: str) -> None: + """Launch the PiP overlay in a daemon thread. + + Does nothing (logs a warning) when PySide6 is not installed. + """ + if not _PYSIDE6_AVAILABLE: + logger.warning( + "PiPMonitor: PySide6 is not installed. " + "Install it with: pip install operator-use[pip-monitor]" + ) + return + + if self._running.is_set(): + logger.warning("PiPMonitor: already running — call stop() first.") + return + + self._source_title = source_window_title + self._running.set() + + self._thread = threading.Thread( + target=self._run_event_loop, + args=(source_window_title,), + daemon=True, + name="pip-monitor", + ) + self._thread.start() + + def stop(self) -> None: + """Close the PiP window and join the daemon thread.""" + self._running.clear() + + if self._window is not None: + try: + self._window.close() + except Exception as exc: + logger.debug("PiPMonitor: error closing window: %s", exc) + self._window = None + + if self._app is not None: + try: + self._app.quit() + except Exception as exc: + logger.debug("PiPMonitor: error quitting app: %s", exc) + self._app = None + + if self._thread is not None and self._thread.is_alive(): + self._thread.join(timeout=3.0) + self._thread = None + + def update_source(self, window_title: str) -> None: + """Change the monitored window title (takes effect on next capture).""" + self._source_title = window_title + if self._window is not None: + try: + self._window.update_source(window_title) + except Exception as exc: + logger.debug("PiPMonitor: error updating source: %s", exc) + + # ------------------------------------------------------------------ + # Introspection helpers + # ------------------------------------------------------------------ + + @property + def is_running(self) -> bool: + """True if the overlay is currently active.""" + return self._running.is_set() + + @property + def source_title(self) -> str: + """The window title currently being monitored.""" + return self._source_title + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _run_event_loop(self, source_title: str) -> None: + """Run the Qt event loop inside the daemon thread.""" + if not _PYSIDE6_AVAILABLE: + return + + app = QApplication.instance() or QApplication(sys.argv) + self._app = app + + window = _PiPWindow(source_title) + self._window = window + window.show() + + app.exec() + + +# --------------------------------------------------------------------------- +# Module-level availability check +# --------------------------------------------------------------------------- + + +def is_available() -> bool: + """Return True when PySide6 is installed and the overlay can be used.""" + return _PYSIDE6_AVAILABLE diff --git a/pyproject.toml b/pyproject.toml index 111b2a5..e57aa8b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,6 +65,9 @@ exa = [ tavily = [ "tavily-python>=0.5.0", ] +pip-monitor = [ + "PySide6>=6.7.0", +] [tool.pytest.ini_options] asyncio_mode = "strict" diff --git a/tests/test_agent.py b/tests/test_agent.py index 4fb6c3f..13db174 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -186,7 +186,7 @@ async def test_agent_run_with_tool_call_then_text(tmp_path): # Register a simple echo tool from pydantic import BaseModel - from operator_use.tools.service import Tool + from operator_use.agent.tools.service import Tool class EchoParams(BaseModel): message: str diff --git a/tests/test_control_center.py b/tests/test_control_center.py index f3a2e5b..0efe749 100644 --- a/tests/test_control_center.py +++ b/tests/test_control_center.py @@ -4,7 +4,7 @@ import pytest from unittest.mock import AsyncMock, MagicMock, patch -from operator_use.agent.tools.builtin.control_center import ( +from operator_use.tools.control_center import ( control_center, _set_plugin_enabled, _get_plugin_enabled, diff --git a/tests/test_local_agents.py b/tests/test_local_agents.py index 8fd831b..a1b5168 100644 --- a/tests/test_local_agents.py +++ b/tests/test_local_agents.py @@ -2,7 +2,7 @@ import pytest -from operator_use.agent.tools.builtin.local_agents import LOCAL_AGENT_DELEGATION_CHAIN, localagents +from operator_use.tools.local_agents import LOCAL_AGENT_DELEGATION_CHAIN, localagents from operator_use.messages.service import AIMessage diff --git a/tests/test_pip_monitor.py b/tests/test_pip_monitor.py new file mode 100644 index 0000000..39f7d2d --- /dev/null +++ b/tests/test_pip_monitor.py @@ -0,0 +1,366 @@ +"""Tests for PiPMonitor — Picture-in-Picture agent overlay. + +PySide6 is mocked entirely at the module level so this test suite runs in any +CI environment without a display or Qt installation. +""" + +from __future__ import annotations + +import sys +import threading +from unittest.mock import MagicMock, patch + +import pytest + + +# --------------------------------------------------------------------------- +# Mock PySide6 before importing the module under test +# --------------------------------------------------------------------------- + + +def _make_pyside6_mock() -> MagicMock: + """Build a minimal PySide6 mock hierarchy.""" + pyside6 = MagicMock(name="PySide6") + + # QtCore + qt_core = MagicMock(name="PySide6.QtCore") + qt_core.Qt = MagicMock() + qt_core.Qt.WindowType = MagicMock() + qt_core.Qt.WindowType.WindowStaysOnTopHint = 0x0040 + qt_core.Qt.WindowType.FramelessWindowHint = 0x0800 + qt_core.Qt.AlignmentFlag = MagicMock() + qt_core.Qt.AlignmentFlag.AlignCenter = 0x0004 + qt_core.Qt.AspectRatioMode = MagicMock() + qt_core.Qt.AspectRatioMode.KeepAspectRatio = 1 + qt_core.Qt.TransformationMode = MagicMock() + qt_core.Qt.TransformationMode.SmoothTransformation = 1 + + mock_timer = MagicMock(name="QTimer") + mock_timer_instance = MagicMock(name="QTimer-instance") + mock_timer.return_value = mock_timer_instance + qt_core.QTimer = mock_timer + qt_core.QSize = MagicMock(return_value=MagicMock()) + + # QtWidgets + qt_widgets = MagicMock(name="PySide6.QtWidgets") + mock_widget = MagicMock(name="QWidget") + mock_widget_instance = MagicMock(name="QWidget-instance") + mock_widget.return_value = mock_widget_instance + qt_widgets.QWidget = mock_widget + + mock_app = MagicMock(name="QApplication") + mock_app_instance = MagicMock(name="QApplication-instance") + mock_app.return_value = mock_app_instance + mock_app.instance.return_value = None + mock_app.primaryScreen.return_value = MagicMock(name="QScreen") + qt_widgets.QApplication = mock_app + + mock_label = MagicMock(name="QLabel") + mock_label_instance = MagicMock(name="QLabel-instance") + mock_label.return_value = mock_label_instance + qt_widgets.QLabel = mock_label + + # QtGui + qt_gui = MagicMock(name="PySide6.QtGui") + mock_pixmap = MagicMock(name="QPixmap") + mock_pixmap_instance = MagicMock(name="QPixmap-instance") + mock_pixmap_instance.isNull.return_value = False + mock_pixmap_instance.scaled.return_value = MagicMock(name="scaled-pixmap") + mock_pixmap.return_value = mock_pixmap_instance + qt_gui.QPixmap = mock_pixmap + + mock_screen = MagicMock(name="QScreen") + mock_screen_instance = MagicMock(name="QScreen-instance") + mock_screen.return_value = mock_screen_instance + qt_gui.QScreen = mock_screen + + pyside6.QtCore = qt_core + pyside6.QtWidgets = qt_widgets + pyside6.QtGui = qt_gui + + return pyside6 + + +_pyside6_mock = _make_pyside6_mock() +sys.modules["PySide6"] = _pyside6_mock +sys.modules["PySide6.QtCore"] = _pyside6_mock.QtCore +sys.modules["PySide6.QtWidgets"] = _pyside6_mock.QtWidgets +sys.modules["PySide6.QtGui"] = _pyside6_mock.QtGui + +# Now import the module — PySide6 will resolve to our mock +import importlib # noqa: E402 +import operator_use.computer.pip_monitor as pip_module # noqa: E402 + +importlib.reload(pip_module) + +from operator_use.computer.pip_monitor import ( # noqa: E402 + PiPMonitor, + is_available, + _find_window_id_linux, + _find_window_id_macos, + _find_window_id_windows, + _PIP_WIDTH, + _PIP_HEIGHT, + _CAPTURE_INTERVAL_MS, + _PYSIDE6_AVAILABLE, +) + + +# --------------------------------------------------------------------------- +# Availability helpers +# --------------------------------------------------------------------------- + + +class TestIsAvailable: + def test_returns_bool(self): + result = is_available() + assert isinstance(result, bool) + + def test_true_when_pyside6_mocked(self): + # We injected the mock, so _PYSIDE6_AVAILABLE should be True after reload + assert _PYSIDE6_AVAILABLE is True + + def test_returns_false_without_pyside6(self): + """Simulate missing PySide6 by temporarily patching the flag.""" + with patch.object(pip_module, "_PYSIDE6_AVAILABLE", False): + assert pip_module.is_available() is False + + def test_is_available_matches_flag(self): + with patch.object(pip_module, "_PYSIDE6_AVAILABLE", True): + assert pip_module.is_available() is True + with patch.object(pip_module, "_PYSIDE6_AVAILABLE", False): + assert pip_module.is_available() is False + + +# --------------------------------------------------------------------------- +# Window geometry / constants +# --------------------------------------------------------------------------- + + +class TestWindowGeometry: + def test_pip_width_is_320(self): + assert _PIP_WIDTH == 320 + + def test_pip_height_is_240(self): + assert _PIP_HEIGHT == 240 + + def test_capture_interval_is_500ms(self): + assert _CAPTURE_INTERVAL_MS == 500 + + def test_width_height_ratio(self): + assert _PIP_WIDTH / _PIP_HEIGHT == pytest.approx(4 / 3) + + +# --------------------------------------------------------------------------- +# start() — launches daemon thread +# --------------------------------------------------------------------------- + + +class TestStart: + def test_start_sets_running_flag(self): + monitor = PiPMonitor() + with patch.object(monitor, "_run_event_loop"): + monitor.start("Agent Window") + assert monitor.is_running + + def test_start_creates_daemon_thread(self): + monitor = PiPMonitor() + with patch.object(monitor, "_run_event_loop"): + monitor.start("Agent Window") + assert monitor._thread is not None + assert monitor._thread.daemon is True + + def test_start_stores_source_title(self): + monitor = PiPMonitor() + with patch.object(monitor, "_run_event_loop"): + monitor.start("My Agent Window") + assert monitor.source_title == "My Agent Window" + + def test_start_noop_when_pyside6_missing(self): + monitor = PiPMonitor() + with patch.object(pip_module, "_PYSIDE6_AVAILABLE", False): + monitor.start("Agent Window") + assert not monitor.is_running + assert monitor._thread is None + + def test_start_noop_if_already_running(self): + monitor = PiPMonitor() + monitor._running.set() + original_thread = monitor._thread + with patch.object(monitor, "_run_event_loop"): + monitor.start("Agent Window") + assert monitor._thread is original_thread # unchanged + + +# --------------------------------------------------------------------------- +# stop() — clears flag, closes window, joins thread +# --------------------------------------------------------------------------- + + +class TestStop: + def test_stop_clears_running_flag(self): + monitor = PiPMonitor() + monitor._running.set() + monitor.stop() + assert not monitor.is_running + + def test_stop_joins_thread(self): + monitor = PiPMonitor() + finished = threading.Event() + + def slow(): + finished.wait(timeout=2) + + t = threading.Thread(target=slow, daemon=True) + t.start() + monitor._thread = t + monitor._running.set() + finished.set() + monitor.stop() + assert not t.is_alive() + + def test_stop_closes_window(self): + monitor = PiPMonitor() + mock_window = MagicMock(name="window") + monitor._window = mock_window + monitor._running.set() + monitor.stop() + mock_window.close.assert_called_once() + + def test_stop_quits_app(self): + monitor = PiPMonitor() + mock_app = MagicMock(name="app") + monitor._app = mock_app + monitor._running.set() + monitor.stop() + mock_app.quit.assert_called_once() + + def test_stop_resets_window_and_app_to_none(self): + monitor = PiPMonitor() + monitor._window = MagicMock() + monitor._app = MagicMock() + monitor._running.set() + monitor.stop() + assert monitor._window is None + assert monitor._app is None + + def test_stop_idempotent_when_not_running(self): + monitor = PiPMonitor() + monitor.stop() # should not raise + assert not monitor.is_running + + +# --------------------------------------------------------------------------- +# update_source() — changes monitored window title +# --------------------------------------------------------------------------- + + +class TestUpdateSource: + def test_update_source_changes_title(self): + monitor = PiPMonitor() + with patch.object(monitor, "_run_event_loop"): + monitor.start("Old Title") + monitor.update_source("New Title") + assert monitor.source_title == "New Title" + + def test_update_source_delegates_to_window(self): + monitor = PiPMonitor() + mock_window = MagicMock(name="window") + monitor._window = mock_window + monitor.update_source("Browser - Agent") + mock_window.update_source.assert_called_once_with("Browser - Agent") + + def test_update_source_without_window_does_not_raise(self): + monitor = PiPMonitor() + monitor._window = None + monitor.update_source("Some Title") # should not raise + assert monitor.source_title == "Some Title" + + +# --------------------------------------------------------------------------- +# Platform window-ID finders (mocked subprocess / ctypes) +# --------------------------------------------------------------------------- + + +class TestFindWindowIdLinux: + def test_returns_wid_on_success(self): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(stdout="12345\n", returncode=0) + wid = _find_window_id_linux("My App") + assert wid == 12345 + + def test_returns_none_on_empty_output(self): + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(stdout="", returncode=1) + wid = _find_window_id_linux("Missing Window") + assert wid is None + + def test_returns_none_on_exception(self): + with patch("subprocess.run", side_effect=FileNotFoundError("xdotool not found")): + wid = _find_window_id_linux("My App") + assert wid is None + + +class TestFindWindowIdMacOs: + def test_returns_none_on_ctypes_failure(self): + with patch.dict(sys.modules, {"ctypes": None}): + wid = _find_window_id_macos("Anything") + assert wid is None + + def test_returns_none_on_exception(self): + with patch("ctypes.cdll") as mock_cdll: + mock_cdll.LoadLibrary.side_effect = OSError("lib not found") + wid = _find_window_id_macos("My App") + assert wid is None + + +class TestFindWindowIdWindows: + def test_returns_hwnd_when_found(self): + mock_windll = MagicMock(name="windll") + mock_windll.user32.FindWindowW.return_value = 99 + with patch("ctypes.windll", mock_windll, create=True): + wid = _find_window_id_windows("My App") + assert wid == 99 + + def test_returns_none_when_not_found(self): + mock_windll = MagicMock(name="windll") + mock_windll.user32.FindWindowW.return_value = 0 + with patch("ctypes.windll", mock_windll, create=True): + wid = _find_window_id_windows("Missing App") + assert wid is None + + def test_returns_none_on_exception(self): + """If ctypes.windll raises, returns None.""" + mock_windll = MagicMock(name="windll") + mock_windll.user32.FindWindowW.side_effect = OSError("access denied") + with patch("ctypes.windll", mock_windll, create=True): + wid = _find_window_id_windows("My App") + assert wid is None + + +# --------------------------------------------------------------------------- +# PiPMonitor properties +# --------------------------------------------------------------------------- + + +class TestProperties: + def test_is_running_false_initially(self): + monitor = PiPMonitor() + assert not monitor.is_running + + def test_source_title_empty_initially(self): + monitor = PiPMonitor() + assert monitor.source_title == "" + + def test_is_running_true_after_start(self): + monitor = PiPMonitor() + with patch.object(monitor, "_run_event_loop"): + monitor.start("Window") + assert monitor.is_running + + def test_is_running_false_after_stop(self): + monitor = PiPMonitor() + with patch.object(monitor, "_run_event_loop"): + monitor.start("Window") + monitor.stop() + assert not monitor.is_running diff --git a/tests/test_plugins.py b/tests/test_plugins.py index f6ba6d4..5d9f8b9 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -7,7 +7,7 @@ from operator_use.agent.tools.registry import ToolRegistry from operator_use.agent.hooks.service import Hooks from operator_use.agent.hooks.events import HookEvent -from operator_use.tools.service import Tool +from operator_use.agent.tools.service import Tool from pydantic import BaseModel diff --git a/tests/test_tool_registry.py b/tests/test_tool_registry.py index ca6ed75..77c70b9 100644 --- a/tests/test_tool_registry.py +++ b/tests/test_tool_registry.py @@ -4,7 +4,7 @@ from pydantic import BaseModel from operator_use.agent.tools.registry import ToolRegistry -from operator_use.tools.service import Tool +from operator_use.agent.tools.service import Tool # --- Helpers --- diff --git a/tests/test_tools.py b/tests/test_tools.py index 8cbf913..de572ab 100644 --- a/tests/test_tools.py +++ b/tests/test_tools.py @@ -4,7 +4,7 @@ from pydantic import BaseModel from typing import Literal -from operator_use.tools.service import Tool, ToolResult +from operator_use.agent.tools.service import Tool, ToolResult # --- ToolResult ---