From 147e080ff19a71472202121e496182a9e6238d4a Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Date: Thu, 4 Dec 2025 22:54:01 +0100 Subject: [PATCH 1/8] driver: nanokvm network driver --- .../reference/package-apis/drivers/index.md | 2 + .../reference/package-apis/drivers/nanokvm.md | 1 + .../jumpstarter-driver-nanokvm/.gitignore | 3 + packages/jumpstarter-driver-nanokvm/README.md | 266 ++++++++++++ .../examples/exporter.yaml | 19 + .../jumpstarter_driver_nanokvm/__init__.py | 13 + .../jumpstarter_driver_nanokvm/client.py | 317 ++++++++++++++ .../jumpstarter_driver_nanokvm/driver.py | 391 ++++++++++++++++++ .../jumpstarter_driver_nanokvm/driver_test.py | 202 +++++++++ .../jumpstarter-driver-nanokvm/pyproject.toml | 53 +++ uv.lock | 56 +++ 11 files changed, 1323 insertions(+) create mode 120000 docs/source/reference/package-apis/drivers/nanokvm.md create mode 100644 packages/jumpstarter-driver-nanokvm/.gitignore create mode 100644 packages/jumpstarter-driver-nanokvm/README.md create mode 100644 packages/jumpstarter-driver-nanokvm/examples/exporter.yaml create mode 100644 packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/__init__.py create mode 100644 packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py create mode 100644 packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py create mode 100644 packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py create mode 100644 packages/jumpstarter-driver-nanokvm/pyproject.toml diff --git a/docs/source/reference/package-apis/drivers/index.md b/docs/source/reference/package-apis/drivers/index.md index 2a5e173cc..7db26160d 100644 --- a/docs/source/reference/package-apis/drivers/index.md +++ b/docs/source/reference/package-apis/drivers/index.md @@ -60,6 +60,7 @@ Drivers that handle media streams: * **[UStreamer](ustreamer.md)** (`jumpstarter-driver-ustreamer`) - Video streaming functionality +* **[NanoKVM](nanokvm.md)** (`jumpstarter-driver-nanokvm`) - NanoKVM remote KVM control ### Debug and Programming Drivers @@ -95,6 +96,7 @@ flashers.md http.md http-power.md iscsi.md +nanokvm.md network.md opendal.md power.md diff --git a/docs/source/reference/package-apis/drivers/nanokvm.md b/docs/source/reference/package-apis/drivers/nanokvm.md new file mode 120000 index 000000000..dfb1e4b78 --- /dev/null +++ b/docs/source/reference/package-apis/drivers/nanokvm.md @@ -0,0 +1 @@ +../../../../../packages/jumpstarter-driver-nanokvm/README.md \ No newline at end of file diff --git a/packages/jumpstarter-driver-nanokvm/.gitignore b/packages/jumpstarter-driver-nanokvm/.gitignore new file mode 100644 index 000000000..cbc5d672b --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +.coverage +coverage.xml diff --git a/packages/jumpstarter-driver-nanokvm/README.md b/packages/jumpstarter-driver-nanokvm/README.md new file mode 100644 index 000000000..8eb6bf0c2 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/README.md @@ -0,0 +1,266 @@ +# NanoKVM Driver + +`jumpstarter-driver-nanokvm` provides comprehensive support for [NanoKVM](https://github.com/sipeed/NanoKVM) devices, enabling remote KVM (Keyboard, Video, Mouse) control over the network. + +## Features + +- **Video Streaming**: Access live video feed from the connected device +- **Snapshot Capture**: Take screenshots of the video stream +- **Keyboard Control**: Send text and keystrokes via HID emulation +- **Mouse Control**: Full mouse support via WebSocket + - Absolute positioning (0-65535 coordinate system) + - Relative movement + - Left/right/middle button clicks + - Mouse wheel scrolling +- **Device Management**: Get device info, reboot the NanoKVM +- **Composite Driver**: Access all functionality through a unified interface + +## Installation + +```{code-block} console +:substitutions: +$ pip3 install --extra-index-url {{index_url}} jumpstarter-driver-nanokvm +``` + +## Configuration + +### Basic Configuration + +```yaml +export: + nanokvm: + type: jumpstarter_driver_nanokvm.driver.NanoKVM + config: + host: "nanokvm.local" # Hostname or IP address + username: "admin" # Default NanoKVM web interface username + password: "admin" # Default NanoKVM web interface password +``` + +### Advanced Configuration + +```yaml +export: + nanokvm: + type: jumpstarter_driver_nanokvm.driver.NanoKVM + config: + host: "192.168.1.100" + username: "admin" + password: "your-password" + # Optional: SSH access for serial console (future feature) + enable_serial: false + ssh_username: "root" + ssh_password: "root" + ssh_port: 22 +``` + +### Config Parameters + +| Parameter | Description | Type | Required | Default | +| -------------- | ------------------------------------------ | ----- | -------- | ------- | +| host | NanoKVM hostname or IP address | str | yes | | +| username | Web interface username | str | no | "admin" | +| password | Web interface password | str | no | "admin" | +| enable_serial | Enable serial console access via SSH | bool | no | false | +| ssh_username | SSH username for serial console | str | no | "root" | +| ssh_password | SSH password for serial console | str | no | "root" | +| ssh_port | SSH port for serial console | int | no | 22 | + +## Architecture + +The NanoKVM driver is a composite driver that provides three main interfaces: + +1. **video**: Video streaming and snapshot capture +2. **hid**: Keyboard and mouse HID control +3. **serial**: Serial console access (optional, future feature) + +## API Reference + +### NanoKVMClient + +```{eval-rst} +.. autoclass:: jumpstarter_driver_nanokvm.client.NanoKVMClient() + :members: get_info, reboot +``` + +### NanoKVMVideoClient + +```{eval-rst} +.. autoclass:: jumpstarter_driver_nanokvm.client.NanoKVMVideoClient() + :members: snapshot +``` + +### NanoKVMHIDClient + +```{eval-rst} +.. autoclass:: jumpstarter_driver_nanokvm.client.NanoKVMHIDClient() + :members: paste_text, press_key, reset_hid, mouse_move_abs, mouse_move_rel, mouse_click, mouse_scroll +``` + +## CLI Usage + +The NanoKVM driver provides CLI commands accessible through the `jmp shell` command: + +### Main Commands + +```bash +# Get device information +j nanokvm info + +# Reboot the NanoKVM device (with confirmation) +j nanokvm reboot +``` + +### Video Commands + +```bash +# Take a snapshot (saves to snapshot.jpg by default) +j nanokvm video snapshot + +# Take a snapshot with custom filename +j nanokvm video snapshot my_screenshot.jpg +``` + +### HID Commands + +#### Keyboard Commands + +```bash +# Paste text via keyboard HID +j nanokvm hid paste "Hello, World!" + +# Send commands with newline (use $'...' syntax in bash for escape sequences) +j nanokvm hid paste $'root\n' + +# Or use double backslash +j nanokvm hid paste "root\\n" + +# Send multiple lines +j nanokvm hid paste $'ls -la\ndate\n' + +# Press a single key +j nanokvm hid press "a" + +# Press special keys +j nanokvm hid press $'\n' # Enter +j nanokvm hid press $'\t' # Tab + +# Reset HID subsystem if it's not responding +j nanokvm hid reset +``` + +#### Mouse Commands + +```bash +# Move mouse to absolute coordinates (0-65535, scaled to screen) +j nanokvm hid mouse move 32768 32768 # Center of screen + +# Move mouse relatively (-127 to 127) +j nanokvm hid mouse move-rel 50 50 # Move right and down + +# Click at current position (default: left button) +j nanokvm hid mouse click + +# Click with specific button +j nanokvm hid mouse click --button right + +# Click at specific coordinates +j nanokvm hid mouse click --x 32768 --y 32768 --button left + +# Scroll (default: down 5 units) +j nanokvm hid mouse scroll + +# Scroll up +j nanokvm hid mouse scroll --dy 5 + +# Scroll down +j nanokvm hid mouse scroll --dy -5 +``` + +### Example Session + +```bash +# Connect to the exporter +jmp shell -l my=device + +# Inside the shell, use the commands +j nanokvm info +j nanokvm video snapshot my_screen.jpg +j nanokvm hid paste "echo 'Hello from NanoKVM'\n" +``` + +## Usage Examples + +### Basic Setup + +```python +image = nanokvm.video.snapshot() +image.save("snapshot.jpg") +print(f"Snapshot size: {image.size}") +``` + +### Keyboard Control + +```python +# Paste text to the connected device +nanokvm.hid.paste_text("Hello from Jumpstarter!\n") + +# Send commands +nanokvm.hid.paste_text("ls -la\n") + +# Press individual keys +nanokvm.hid.press_key("a") +nanokvm.hid.press_key("\n") # Enter +nanokvm.hid.press_key("\t") # Tab +``` + +### Mouse Control + +```python +# Move mouse to center of screen +nanokvm.hid.mouse_move_abs(32768, 32768) + +# Click left button +nanokvm.hid.mouse_click("left") + +# Click at specific coordinates +nanokvm.hid.mouse_click("left", x=32768, y=16384) + +# Move mouse relatively +nanokvm.hid.mouse_move_rel(50, 50) # Move right and down + +# Scroll up +nanokvm.hid.mouse_scroll(0, 5) + +# Scroll down +nanokvm.hid.mouse_scroll(0, -5) +``` + +### Device Management + +```python +# Get device info +info = nanokvm.get_info() +print(f"Device: {info['mdns']}") +print(f"IPs: {info['ips']}") +print(f"Application version: {info['application']}") + +# Reset HID +nanokvm.hid.reset_hid() +``` + + +## Character Support for paste_text() + +The `paste_text()` method supports a limited character set due to HID keyboard constraints: + +- Alphanumeric: `A-Z`, `a-z`, `0-9` +- Punctuation: `` `~!@#$%^&*()-_=+[]{}\|;:'",.<>/? `` +- Whitespace: Tab (`\t`), Newline (`\n`), Space +- Not supported: Extended Unicode, emoji, special control characters + + +## Related Documentation + +- [NanoKVM GitHub](https://github.com/sipeed/NanoKVM) +- [python-nanokvm Library](https://github.com/puddly/python-nanokvm) +- [Jumpstarter Documentation](https://jumpstarter.dev) diff --git a/packages/jumpstarter-driver-nanokvm/examples/exporter.yaml b/packages/jumpstarter-driver-nanokvm/examples/exporter.yaml new file mode 100644 index 000000000..f296fb3ff --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/examples/exporter.yaml @@ -0,0 +1,19 @@ +apiVersion: jumpstarter.dev/v1alpha1 +kind: ExporterConfig +metadata: + namespace: default + name: demo +endpoint: grpc.jumpstarter.192.168.0.203.nip.io:8082 +token: "" +export: + nanokvm: + type: jumpstarter_driver_nanokvm.driver.NanoKVM + config: + host: "192.168.1.110" # or IP address like "192.168.1.100" + username: "admin" + password: "admin" + # Optional: Enable serial console access via SSH + # enable_serial: true + # ssh_username: "root" + # ssh_password: "root" + # ssh_port: 22 diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/__init__.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/__init__.py new file mode 100644 index 000000000..e37a76603 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/__init__.py @@ -0,0 +1,13 @@ +"""NanoKVM driver for Jumpstarter + +This package provides support for NanoKVM devices, including: +- Video streaming and snapshot capture +- Keyboard and mouse HID control +- Serial console access (optional) +""" + +from .driver import NanoKVM, NanoKVMHID, NanoKVMVideo + +__all__ = ["NanoKVM", "NanoKVMVideo", "NanoKVMHID"] + + diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py new file mode 100644 index 000000000..b6b681aff --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py @@ -0,0 +1,317 @@ +import io +from base64 import b64decode +from dataclasses import dataclass + +import click +from jumpstarter_driver_composite.client import CompositeClient +from PIL import Image + +from jumpstarter.client import DriverClient +from jumpstarter.client.decorators import driver_click_group + + +@dataclass(kw_only=True) +class NanoKVMVideoClient(DriverClient): + """ + Client interface for NanoKVM video streaming + + This client provides methods to access video stream and snapshots + from the NanoKVM device. + """ + + def snapshot(self) -> Image.Image: + """ + Get a snapshot image from the video stream + + Returns: + PIL Image object of the snapshot + """ + input_jpg_data = b64decode(self.call("snapshot")) + return Image.open(io.BytesIO(input_jpg_data)) + + def cli(self): + @driver_click_group(self) + def base(): + """NanoKVM Video commands""" + pass + + @base.command() + @click.argument("output", type=click.Path(), default="snapshot.jpg") + def snapshot(output): + """Take a snapshot and save to file""" + image = self.snapshot() + image.save(output) + click.echo(f"Snapshot saved to {output}") + click.echo(f"Image size: {image.size[0]}x{image.size[1]}") + + return base + + +@dataclass(kw_only=True) +class NanoKVMHIDClient(DriverClient): + """ + Client interface for NanoKVM HID (Keyboard/Mouse) control + + This client provides methods to send keyboard and mouse events + to the device connected to the NanoKVM. + """ + + def paste_text(self, text: str): + """ + Paste text via keyboard HID simulation + + Args: + text: Text to paste. Supports limited character set: + alphanumeric, basic punctuation, and common symbols + + Example:: + + hid.paste_text("Hello, World!") + hid.paste_text("ls -la\\n") + """ + self.call("paste_text", text) + + def press_key(self, key: str): + """ + Press a key by pasting a single character + + Args: + key: Single character or escape sequence to press + + Example:: + + hid.press_key("a") # Type 'a' + hid.press_key("A") # Type 'A' + hid.press_key("\\n") # Press Enter + hid.press_key("\\t") # Press Tab + """ + self.call("press_key", key) + + def reset_hid(self): + """ + Reset the HID subsystem + + This can help recover from HID communication issues. + """ + self.call("reset_hid") + + def mouse_move_abs(self, x: int, y: int): + """ + Move mouse to absolute coordinates + + Args: + x: X coordinate (0-65535, scaled to screen resolution) + y: Y coordinate (0-65535, scaled to screen resolution) + + Example:: + + # Move to center of screen (assuming 1920x1080) + hid.mouse_move_abs(32768, 32768) + """ + self.call("mouse_move_abs", x, y) + + def mouse_move_rel(self, dx: int, dy: int): + """ + Move mouse relative to current position + + Args: + dx: X movement delta (-127 to 127) + dy: Y movement delta (-127 to 127) + + Example:: + + # Move right and down + hid.mouse_move_rel(50, 50) + """ + self.call("mouse_move_rel", dx, dy) + + def mouse_click(self, button: str = "left", x: int | None = None, y: int | None = None): + """ + Click a mouse button + + Args: + button: Mouse button to click ("left", "right", "middle") + x: Optional X coordinate for absolute positioning before click + y: Optional Y coordinate for absolute positioning before click + + Example:: + + # Click at current position + hid.mouse_click("left") + + # Click at specific coordinates + hid.mouse_click("left", 32768, 32768) + + # Right-click + hid.mouse_click("right") + """ + if x is not None and y is not None: + self.call("mouse_click", button, x, y) + else: + self.call("mouse_click", button, None, None) + + def mouse_scroll(self, dx: int, dy: int): + """ + Scroll the mouse wheel + + Args: + dx: Horizontal scroll amount + dy: Vertical scroll amount (positive=up, negative=down) + + Example:: + + # Scroll up + hid.mouse_scroll(0, 5) + + # Scroll down + hid.mouse_scroll(0, -5) + """ + self.call("mouse_scroll", dx, dy) + + def cli(self): # noqa: C901 + @driver_click_group(self) + def base(): + """NanoKVM HID (Keyboard/Mouse) commands""" + pass + + @base.command() + @click.argument("text") + def paste(text): + """Paste text via keyboard HID (supports \\n for newline, \\t for tab)""" + # Decode escape sequences like \n, \t, etc. + decoded_text = text.encode().decode("unicode_escape") + self.paste_text(decoded_text) + click.echo(f"Pasted: {repr(decoded_text)}") + + @base.command() + @click.argument("key") + def press(key): + """Press a single key (supports \\n for Enter, \\t for Tab)""" + # Decode escape sequences + decoded_key = key.encode().decode("unicode_escape") + self.press_key(decoded_key) + click.echo(f"Pressed: {repr(decoded_key)}") + + @base.command() + def reset(): + """Reset the HID subsystem""" + self.reset_hid() + click.echo("HID subsystem reset") + + @base.group() + def mouse(): + """Mouse control commands""" + pass + + @mouse.command() + @click.argument("x", type=int) + @click.argument("y", type=int) + def move(x, y): + """Move mouse to absolute coordinates (0-65535)""" + self.mouse_move_abs(x, y) + click.echo(f"Mouse moved to ({x}, {y})") + + @mouse.command() + @click.argument("dx", type=int) + @click.argument("dy", type=int) + def move_rel(dx, dy): + """Move mouse by relative offset (-127 to 127)""" + self.mouse_move_rel(dx, dy) + click.echo(f"Mouse moved by ({dx}, {dy})") + + @mouse.command(name="click") + @click.option("--button", "-b", default="left", type=click.Choice(["left", "right", "middle"])) + @click.option("--x", type=int, default=None, help="Optional X coordinate") + @click.option("--y", type=int, default=None, help="Optional Y coordinate") + def mouse_click_cmd(button, x, y): + """Click a mouse button""" + self.mouse_click(button, x, y) + if x is not None and y is not None: + click.echo(f"Clicked {button} button at ({x}, {y})") + else: + click.echo(f"Clicked {button} button") + + @mouse.command() + @click.option("--dx", type=int, default=0, help="Horizontal scroll") + @click.option("--dy", type=int, default=-5, help="Vertical scroll") + def scroll(dx, dy): + """Scroll the mouse wheel""" + self.mouse_scroll(dx, dy) + click.echo(f"Scrolled ({dx}, {dy})") + + return base + + +@dataclass(kw_only=True) +class NanoKVMClient(CompositeClient): + """ + Client interface for NanoKVM devices + + This composite client provides access to all NanoKVM functionality: + - video: Video streaming and snapshots + - hid: Keyboard and mouse control + - serial: Serial console access (if enabled) + + Example:: + + # Get a snapshot + image = nanokvm.video.snapshot() + + # Paste text + nanokvm.hid.paste_text("Hello from Jumpstarter!") + + # Get device info + info = nanokvm.get_info() + print(f"Device: {info['mdns']}") + """ + + def get_info(self) -> dict: + """ + Get device information + + Returns: + Dictionary containing device information: + - ips: List of IP addresses + - mdns: mDNS hostname + - image: Image version + - application: Application version + - device_key: Device key + """ + return self.call("get_info") + + def reboot(self): + """ + Reboot the NanoKVM device + + Warning: + This will reboot the NanoKVM itself, not the connected device. + The connection will be lost during reboot. + """ + self.call("reboot") + + def cli(self): + """Create CLI interface with device management and child commands""" + base = super().cli() + + @base.command() + def info(): + """Get device information""" + info = self.get_info() + click.echo("NanoKVM Device Information:") + click.echo(f" mDNS: {info['mdns']}") + click.echo(f" Image version: {info['image']}") + click.echo(f" Application version: {info['application']}") + click.echo(f" Device key: {info['device_key']}") + if info['ips']: + click.echo(" IP Addresses:") + for ip in info['ips']: + click.echo(f" - {ip['name']}: {ip['addr']} ({ip['type']}, {ip['version']})") + + @base.command() + @click.confirmation_option(prompt="Are you sure you want to reboot the NanoKVM device?") + def reboot(): + """Reboot the NanoKVM device""" + self.reboot() + click.echo("NanoKVM device is rebooting...") + + return base diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py new file mode 100644 index 000000000..c249b9f02 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py @@ -0,0 +1,391 @@ +import asyncio +from base64 import b64encode +from contextlib import asynccontextmanager +from dataclasses import dataclass, field +from io import BytesIO + +import anyio +from aiohttp import ClientSession +from jumpstarter_driver_composite.driver import Composite +from jumpstarter_driver_pyserial.driver import PySerial +from nanokvm.client import NanoKVMClient as NanoKVMAPIClient + +from jumpstarter.driver import Driver, export, exportstream + + +@dataclass(kw_only=True) +class NanoKVMVideo(Driver): + """NanoKVM Video Streaming driver""" + + host: str + username: str = "admin" + password: str = "admin" + + _client: NanoKVMAPIClient = field(init=False, repr=False, default=None) + _session: ClientSession = field(init=False, repr=False, default=None) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_nanokvm.client.NanoKVMVideoClient" + + async def _get_client(self) -> NanoKVMAPIClient: + """Get or create the NanoKVM API client""" + if self._client is None: + self._session = ClientSession() + self._client = NanoKVMAPIClient(f"http://{self.host}/api/", self._session) + await self._client.authenticate(self.username, self.password) + return self._client + + def close(self): + """Clean up resources""" + # Schedule cleanup of aiohttp session + if self._session is not None and not self._session.closed: + try: + import asyncio + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.create_task(self._session.close()) + else: + loop.run_until_complete(self._session.close()) + except Exception as e: + self.logger.debug(f"Error closing session: {e}") + + @export + async def snapshot(self) -> str: + """ + Take a snapshot from the video stream + + Returns: + Base64 encoded JPEG image data + """ + client = await self._get_client() + async for frame in client.mjpeg_stream(): + # Get the first frame and return it + buffer = BytesIO() + frame.save(buffer, format="JPEG") + data = buffer.getvalue() + self.logger.debug(f"snapshot: {len(data)} bytes") + return b64encode(data).decode("ascii") + raise RuntimeError("No frames available from video stream") + + @exportstream + @asynccontextmanager + async def stream(self): + """ + Stream video frames as JPEG images + + Yields a stream that provides JPEG image data + """ + self.logger.debug("Starting video stream") + client = await self._get_client() + + # Create a pair of connected streams + send_stream, receive_stream = anyio.create_memory_object_stream(max_buffer_size=10) + + async def stream_video(): + try: + async with send_stream: + async for frame in client.mjpeg_stream(): + buffer = BytesIO() + frame.save(buffer, format="JPEG") + data = buffer.getvalue() + # TODO(mangelajo): this needs to be tested + await send_stream.send(data) + except Exception as e: + self.logger.error(f"Error streaming video: {e}") + + # Start the video streaming task + task = asyncio.create_task(stream_video()) + + try: + yield receive_stream + finally: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +@dataclass(kw_only=True) +class NanoKVMHID(Driver): + """NanoKVM HID (Keyboard/Mouse) driver""" + + host: str + username: str = "admin" + password: str = "admin" + + _client: NanoKVMAPIClient = field(init=False, repr=False, default=None) + _session: ClientSession = field(init=False, repr=False, default=None) + _ws: object = field(init=False, repr=False, default=None) + + def __post_init__(self): + if hasattr(super(), "__post_init__"): + super().__post_init__() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_nanokvm.client.NanoKVMHIDClient" + + async def _get_client(self) -> NanoKVMAPIClient: + """Get or create the NanoKVM API client""" + if self._client is None: + self._session = ClientSession() + self._client = NanoKVMAPIClient(f"http://{self.host}/api/", self._session) + await self._client.authenticate(self.username, self.password) + return self._client + + async def _get_ws(self): + """Get or create WebSocket connection for mouse events""" + if self._ws is None: + client = await self._get_client() + # Connect to WebSocket endpoint with authentication token + ws_url = f"ws://{self.host}/api/ws" + self._ws = await self._session.ws_connect( + ws_url, + headers={"Cookie": f"nano-kvm-token={client.token}"}, + ) + return self._ws + + async def _send_mouse_event(self, event_type: int, x: int, y: int): + """ + Send a mouse event via WebSocket + + Args: + event_type: 0=mouse_up, 1=mouse_down, 2=move_abs, 3=move_rel, 4=scroll + x: X coordinate or movement + y: Y coordinate or movement + """ + ws = await self._get_ws() + message = [2, event_type, x, y] # 2 indicates mouse event + await ws.send_json(message) + self.logger.debug(f"Sent mouse event: {message}") + + def close(self): + """Clean up resources""" + # Schedule cleanup of aiohttp session and websocket + if self._ws is not None: + try: + import asyncio + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.create_task(self._ws.close()) + except Exception as e: + self.logger.debug(f"Error closing websocket: {e}") + + if self._session is not None and not self._session.closed: + try: + import asyncio + loop = asyncio.get_event_loop() + if loop.is_running(): + loop.create_task(self._session.close()) + else: + loop.run_until_complete(self._session.close()) + except Exception as e: + self.logger.debug(f"Error closing session: {e}") + + @export + async def paste_text(self, text: str): + """ + Paste text via keyboard HID simulation + + Args: + text: Text to paste (limited character set supported) + """ + client = await self._get_client() + await client.paste_text(text) + self.logger.info(f"Pasted text: {text}") + + @export + async def press_key(self, key: str): + """ + Press a key by pasting a single character + + Args: + key: Single character or escape sequence to press (e.g., 'a', 'A', '\\n', '\\t') + + Note: + This uses paste_text under the hood, so it supports the same character set. + For special keys like Enter, use '\\n'. For Tab, use '\\t'. + """ + if len(key) > 2: # Allow for escape sequences like \n, \t + self.logger.warning(f"press_key should be used with single characters, got: {key}") + + client = await self._get_client() + await client.paste_text(key) + self.logger.debug(f"Pressed key: {repr(key)}") + + @export + async def reset_hid(self): + """Reset the HID subsystem""" + client = await self._get_client() + await client.reset_hid() + self.logger.info("HID subsystem reset") + + @export + async def mouse_move_abs(self, x: int, y: int): + """ + Move mouse to absolute coordinates + + Args: + x: X coordinate (0-65535, scaled to screen resolution) + y: Y coordinate (0-65535, scaled to screen resolution) + """ + await self._send_mouse_event(2, x, y) + self.logger.debug(f"Mouse moved to absolute position: ({x}, {y})") + + @export + async def mouse_move_rel(self, dx: int, dy: int): + """ + Move mouse relative to current position + + Args: + dx: X movement delta (-127 to 127) + dy: Y movement delta (-127 to 127) + """ + await self._send_mouse_event(3, dx, dy) + self.logger.debug(f"Mouse moved by relative offset: ({dx}, {dy})") + + @export + async def mouse_click(self, button: str = "left", x: int | None = None, y: int | None = None): + """ + Click a mouse button at current position or specified coordinates + + Args: + button: Mouse button to click ("left", "right", "middle") + x: Optional X coordinate for absolute positioning before click + y: Optional Y coordinate for absolute positioning before click + """ + # Map button names to bit flags (left=1, right=2, middle=4) + button_map = {"left": 1, "right": 2, "middle": 4} + button_code = button_map.get(button.lower(), 1) + + # Move to position if coordinates provided + if x is not None and y is not None: + await self.mouse_move_abs(x, y) + # Small delay to ensure position update + await asyncio.sleep(0.05) + + # Send mouse down + await self._send_mouse_event(1, button_code, 0) + # Small delay between down and up + await asyncio.sleep(0.05) + # Send mouse up + await self._send_mouse_event(0, 0, 0) + + self.logger.info(f"Mouse {button} clicked") + + @export + async def mouse_scroll(self, dx: int, dy: int): + """ + Scroll the mouse wheel + + Args: + dx: Horizontal scroll amount + dy: Vertical scroll amount (positive=up, negative=down) + """ + await self._send_mouse_event(4, dx, dy) + self.logger.debug(f"Mouse scrolled: ({dx}, {dy})") + + +@dataclass(kw_only=True) +class NanoKVMSerial(PySerial): + """NanoKVM Serial console access via SSH tunnel""" + + nanokvm_host: str + nanokvm_username: str = "root" + nanokvm_password: str = "root" + nanokvm_ssh_port: int = 22 + + # PySerial will use the SSH tunnel + url: str = field(init=False) + + def __post_init__(self): + # Create an RFC2217 URL that will connect via SSH + # For now, we'll use a simple approach with a localhost tunnel + # This requires the user to set up SSH port forwarding manually + # or we can use paramiko to create the tunnel + self.url = "rfc2217://localhost:2217" + super().__post_init__() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_pyserial.client.PySerialClient" + + +@dataclass(kw_only=True) +class NanoKVM(Composite): + """ + Composite driver for NanoKVM devices + + This driver provides: + - Video streaming via the 'video' child driver + - HID (Keyboard/Mouse) control via the 'hid' child driver + - Serial console access via SSH tunnel (optional) + """ + + host: str + username: str = "admin" + password: str = "admin" + + # SSH access for serial console (optional) + ssh_username: str = "root" + ssh_password: str = "root" + ssh_port: int = 22 + + # Optional: provide serial console access + enable_serial: bool = False + + def __post_init__(self): + # Create child drivers + self.children = { + "video": NanoKVMVideo( + host=self.host, + username=self.username, + password=self.password, + ), + "hid": NanoKVMHID( + host=self.host, + username=self.username, + password=self.password, + ), + } + + # Optionally add serial console access + if self.enable_serial: + # Note: This is a placeholder - actual serial console access via SSH + # would require additional implementation in the nanokvm library + self.logger.warning("Serial console access not yet fully implemented") + + super().__post_init__() + + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_nanokvm.client.NanoKVMClient" + + @export + async def get_info(self): + """Get device information""" + # Get info from the video driver's client + video_driver = self.children["video"] + client = await video_driver._get_client() + info = await client.get_info() + return { + "ips": [{"name": ip.name, "addr": ip.addr, "version": ip.version, "type": ip.type} for ip in info.ips], + "mdns": info.mdns, + "image": info.image, + "application": info.application, + "device_key": info.device_key, + } + + @export + async def reboot(self): + """Reboot the NanoKVM device""" + video_driver = self.children["video"] + client = await video_driver._get_client() + await client.reboot_system() + self.logger.info("NanoKVM device rebooted") diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py new file mode 100644 index 000000000..7714134d5 --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py @@ -0,0 +1,202 @@ +"""Tests for NanoKVM driver""" + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from PIL import Image + +from .driver import NanoKVM, NanoKVMHID, NanoKVMVideo +from jumpstarter.common.utils import serve + + +@pytest.fixture +def mock_nanokvm_client(): + """Create a mock NanoKVM API client""" + with patch("jumpstarter_driver_nanokvm.driver.NanoKVMAPIClient") as mock_client_class: + mock_client = AsyncMock() + + # Mock authentication + mock_client.authenticate = AsyncMock() + mock_client.logout = AsyncMock() + + # Mock info + mock_info = MagicMock() + mock_info.ips = [] + mock_info.mdns = "nanokvm-test.local" + mock_info.image = "1.0.0" + mock_info.application = "1.0.0" + mock_info.device_key = "test-key" + mock_client.get_info = AsyncMock(return_value=mock_info) + + # Mock video streaming + test_image = Image.new("RGB", (640, 480), color="red") + + async def mock_stream(): + yield test_image + + mock_client.mjpeg_stream = mock_stream + + # Mock HID functions + mock_client.paste_text = AsyncMock() + mock_client.reset_hid = AsyncMock() + + # Mock reboot + mock_client.reboot_system = AsyncMock() + + mock_client_class.return_value = mock_client + yield mock_client + + +@pytest.fixture +def mock_aiohttp_session(): + """Create a mock aiohttp ClientSession""" + with patch("jumpstarter_driver_nanokvm.driver.ClientSession") as mock_session_class: + mock_session = AsyncMock() + mock_session.close = AsyncMock() + mock_session_class.return_value = mock_session + yield mock_session + + +def test_nanokvm_video_snapshot(mock_nanokvm_client, mock_aiohttp_session): + """Test video snapshot functionality""" + video = NanoKVMVideo(host="test.local", username="admin", password="admin") + + with serve(video) as client: + # Get a snapshot + image = client.snapshot() + + # Verify it's a PIL Image + assert isinstance(image, Image.Image) + assert image.size == (640, 480) + + +def test_nanokvm_hid_paste(mock_nanokvm_client, mock_aiohttp_session): + """Test HID paste text functionality""" + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Paste some text + client.paste_text("Hello, World!") + + # Verify the mock was called + mock_nanokvm_client.paste_text.assert_called_once_with("Hello, World!") + + +def test_nanokvm_hid_reset(mock_nanokvm_client, mock_aiohttp_session): + """Test HID reset functionality""" + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Reset HID + client.reset_hid() + + # Verify the mock was called + mock_nanokvm_client.reset_hid.assert_called_once() + + +def test_nanokvm_hid_press_key(mock_nanokvm_client, mock_aiohttp_session): + """Test key press functionality""" + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Press a key + client.press_key("a") + + # Verify paste_text was called with the character + mock_nanokvm_client.paste_text.assert_called_with("a") + + +def test_nanokvm_composite(mock_nanokvm_client, mock_aiohttp_session): + """Test composite NanoKVM driver""" + driver = NanoKVM( + host="test.local", + username="admin", + password="admin", + ) + + with serve(driver) as client: + # Test that children are accessible + assert hasattr(client, "video") + assert hasattr(client, "hid") + + # Test video snapshot through composite + image = client.video.snapshot() + assert isinstance(image, Image.Image) + + # Test HID paste through composite + client.hid.paste_text("Test") + mock_nanokvm_client.paste_text.assert_called_with("Test") + + # Test get_info + info = client.get_info() + assert "mdns" in info + assert info["mdns"] == "nanokvm-test.local" + + +def test_nanokvm_reboot(mock_nanokvm_client, mock_aiohttp_session): + """Test NanoKVM reboot functionality""" + driver = NanoKVM( + host="test.local", + username="admin", + password="admin", + ) + + with serve(driver) as client: + # Test reboot + client.reboot() + mock_nanokvm_client.reboot_system.assert_called_once() + + +def test_nanokvm_video_client_creation(): + """Test that NanoKVMVideo returns correct client class""" + assert NanoKVMVideo.client() == "jumpstarter_driver_nanokvm.client.NanoKVMVideoClient" + + +def test_nanokvm_hid_client_creation(): + """Test that NanoKVMHID returns correct client class""" + assert NanoKVMHID.client() == "jumpstarter_driver_nanokvm.client.NanoKVMHIDClient" + + +def test_nanokvm_client_creation(): + """Test that NanoKVM returns correct client class""" + assert NanoKVM.client() == "jumpstarter_driver_nanokvm.client.NanoKVMClient" + + +def test_nanokvm_mouse_move_abs(mock_nanokvm_client, mock_aiohttp_session): + """Test mouse absolute movement""" + with patch("jumpstarter_driver_nanokvm.driver.ClientSession") as mock_session_class: + mock_ws = AsyncMock() + mock_ws.send_json = AsyncMock() + mock_session = AsyncMock() + mock_session.ws_connect = AsyncMock(return_value=mock_ws) + mock_session.close = AsyncMock() + mock_session_class.return_value = mock_session + + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Move mouse to absolute position + client.mouse_move_abs(32768, 32768) + + # Verify WebSocket message was sent + mock_ws.send_json.assert_called() + + +def test_nanokvm_mouse_click(mock_nanokvm_client, mock_aiohttp_session): + """Test mouse click""" + with patch("jumpstarter_driver_nanokvm.driver.ClientSession") as mock_session_class: + mock_ws = AsyncMock() + mock_ws.send_json = AsyncMock() + mock_session = AsyncMock() + mock_session.ws_connect = AsyncMock(return_value=mock_ws) + mock_session.close = AsyncMock() + mock_session_class.return_value = mock_session + + hid = NanoKVMHID(host="test.local", username="admin", password="admin") + + with serve(hid) as client: + # Click left button + client.mouse_click("left") + + # Verify WebSocket messages were sent (down and up) + assert mock_ws.send_json.call_count >= 2 diff --git a/packages/jumpstarter-driver-nanokvm/pyproject.toml b/packages/jumpstarter-driver-nanokvm/pyproject.toml new file mode 100644 index 000000000..260f2db5f --- /dev/null +++ b/packages/jumpstarter-driver-nanokvm/pyproject.toml @@ -0,0 +1,53 @@ +[project] +name = "jumpstarter-driver-nanokvm" +dynamic = ["version", "urls"] +description = "Jumpstarter driver for NanoKVM devices providing video streaming, keyboard/mouse control, and serial console access" +readme = "README.md" +license = "Apache-2.0" +authors = [ + { name = "Miguel Angel Ajo", email = "miguelangel@ajo.es" } +] +requires-python = ">=3.11" +dependencies = [ + "anyio>=4.10.0", + "jumpstarter", + "jumpstarter-driver-composite", + "jumpstarter-driver-pyserial", + "nanokvm @ git+https://github.com/mangelajo/python-nanokvm.git@python-3-11", + "aiohttp", + "pillow", + "pydantic", + "yarl", + "click", +] + +[tool.hatch.version] +source = "vcs" +raw-options = { 'root' = '../../'} + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.hatch.metadata.hooks.vcs.urls] +Homepage = "https://jumpstarter.dev" +source_archive = "https://github.com/jumpstarter-dev/repo/archive/{commit_hash}.zip" + +[tool.pytest.ini_options] +addopts = "--cov --cov-report=html --cov-report=xml" +log_cli = true +log_cli_level = "INFO" +testpaths = ["jumpstarter_driver_nanokvm"] +asyncio_mode = "auto" + +[build-system] +requires = ["hatchling", "hatch-vcs", "hatch-pin-jumpstarter"] +build-backend = "hatchling.build" + +[tool.hatch.build.hooks.pin_jumpstarter] +name = "pin_jumpstarter" + +[dependency-groups] +dev = [ + "pytest-cov>=6.0.0", + "pytest>=8.3.3", +] diff --git a/uv.lock b/uv.lock index 77b11a509..1d3bc49c3 100644 --- a/uv.lock +++ b/uv.lock @@ -22,6 +22,7 @@ members = [ "jumpstarter-driver-http", "jumpstarter-driver-http-power", "jumpstarter-driver-iscsi", + "jumpstarter-driver-nanokvm", "jumpstarter-driver-network", "jumpstarter-driver-opendal", "jumpstarter-driver-power", @@ -1778,6 +1779,48 @@ dev = [ { name = "pytest-cov", specifier = ">=6.0.0" }, ] +[[package]] +name = "jumpstarter-driver-nanokvm" +source = { editable = "packages/jumpstarter-driver-nanokvm" } +dependencies = [ + { name = "aiohttp" }, + { name = "anyio" }, + { name = "click" }, + { name = "jumpstarter" }, + { name = "jumpstarter-driver-composite" }, + { name = "jumpstarter-driver-pyserial" }, + { name = "nanokvm" }, + { name = "pillow" }, + { name = "pydantic" }, + { name = "yarl" }, +] + +[package.dev-dependencies] +dev = [ + { name = "pytest" }, + { name = "pytest-cov" }, +] + +[package.metadata] +requires-dist = [ + { name = "aiohttp" }, + { name = "anyio", specifier = ">=4.10.0" }, + { name = "click" }, + { name = "jumpstarter", editable = "packages/jumpstarter" }, + { name = "jumpstarter-driver-composite", editable = "packages/jumpstarter-driver-composite" }, + { name = "jumpstarter-driver-pyserial", editable = "packages/jumpstarter-driver-pyserial" }, + { name = "nanokvm", git = "https://github.com/mangelajo/python-nanokvm.git?rev=python-3-11" }, + { name = "pillow" }, + { name = "pydantic" }, + { name = "yarl" }, +] + +[package.metadata.requires-dev] +dev = [ + { name = "pytest", specifier = ">=8.3.3" }, + { name = "pytest-cov", specifier = ">=6.0.0" }, +] + [[package]] name = "jumpstarter-driver-network" source = { editable = "packages/jumpstarter-driver-network" } @@ -2817,6 +2860,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/5f/df/76d0321c3797b54b60fef9ec3bd6f4cfd124b9e422182156a1dd418722cf/myst_parser-4.0.1-py3-none-any.whl", hash = "sha256:9134e88959ec3b5780aedf8a99680ea242869d012e8821db3126d427edc9c95d", size = 84579, upload-time = "2025-02-12T10:53:02.078Z" }, ] +[[package]] +name = "nanokvm" +version = "0.0.1" +source = { git = "https://github.com/mangelajo/python-nanokvm.git?rev=python-3-11#e45711d8687a48f21cdc3cba7ffd8317a32d0f2d" } +dependencies = [ + { name = "aiohttp" }, + { name = "cryptography" }, + { name = "paramiko" }, + { name = "pillow" }, + { name = "pydantic" }, + { name = "yarl" }, +] + [[package]] name = "nodeenv" version = "1.9.1" From ad52014abba1c3700c393991c8496cf16e21401c Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Fri, 5 Dec 2025 09:58:33 +0100 Subject: [PATCH 2/8] nanokvm: reauthentication support --- .../jumpstarter_driver_nanokvm/driver.py | 96 ++++++++++++++++--- 1 file changed, 85 insertions(+), 11 deletions(-) diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py index c249b9f02..edec0bf05 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py @@ -2,10 +2,11 @@ from base64 import b64encode from contextlib import asynccontextmanager from dataclasses import dataclass, field +from functools import wraps from io import BytesIO import anyio -from aiohttp import ClientSession +from aiohttp import ClientResponseError, ClientSession from jumpstarter_driver_composite.driver import Composite from jumpstarter_driver_pyserial.driver import PySerial from nanokvm.client import NanoKVMClient as NanoKVMAPIClient @@ -13,6 +14,31 @@ from jumpstarter.driver import Driver, export, exportstream +def _is_unauthorized_error(error: Exception) -> bool: + """Check if an error is a 401 Unauthorized error""" + if isinstance(error, ClientResponseError): + return error.status == 401 + # Also check for string representation in case error is wrapped + error_str = str(error) + return "401" in error_str and ("Unauthorized" in error_str or "unauthorized" in error_str.lower()) + + +def with_reauth(func): + """Decorator to automatically re-authenticate on 401 errors""" + @wraps(func) + async def wrapper(self, *args, **kwargs): + try: + return await func(self, *args, **kwargs) + except Exception as e: + if _is_unauthorized_error(e): + self.logger.warning("Received 401 Unauthorized, re-authenticating...") + await self._reset_client() + # Retry once after re-authentication + return await func(self, *args, **kwargs) + raise + return wrapper + + @dataclass(kw_only=True) class NanoKVMVideo(Driver): """NanoKVM Video Streaming driver""" @@ -32,6 +58,16 @@ def __post_init__(self): def client(cls) -> str: return "jumpstarter_driver_nanokvm.client.NanoKVMVideoClient" + async def _reset_client(self): + """Reset the client and session, forcing re-authentication""" + if self._session is not None and not self._session.closed: + try: + await self._session.close() + except Exception as e: + self.logger.debug(f"Error closing session during reset: {e}") + self._client = None + self._session = None + async def _get_client(self) -> NanoKVMAPIClient: """Get or create the NanoKVM API client""" if self._client is None: @@ -55,6 +91,7 @@ def close(self): self.logger.debug(f"Error closing session: {e}") @export + @with_reauth async def snapshot(self) -> str: """ Take a snapshot from the video stream @@ -96,7 +133,19 @@ async def stream_video(): # TODO(mangelajo): this needs to be tested await send_stream.send(data) except Exception as e: - self.logger.error(f"Error streaming video: {e}") + if _is_unauthorized_error(e): + self.logger.warning("Received 401 Unauthorized during stream, re-authenticating...") + await self._reset_client() + # Retry with new client + new_client = await self._get_client() + async for frame in new_client.mjpeg_stream(): + buffer = BytesIO() + frame.save(buffer, format="JPEG") + data = buffer.getvalue() + await send_stream.send(data) + else: + self.logger.error(f"Error streaming video: {e}") + raise # Start the video streaming task task = asyncio.create_task(stream_video()) @@ -131,6 +180,22 @@ def __post_init__(self): def client(cls) -> str: return "jumpstarter_driver_nanokvm.client.NanoKVMHIDClient" + async def _reset_client(self): + """Reset the client, session, and websocket, forcing re-authentication""" + if self._ws is not None: + try: + await self._ws.close() + except Exception as e: + self.logger.debug(f"Error closing websocket during reset: {e}") + if self._session is not None and not self._session.closed: + try: + await self._session.close() + except Exception as e: + self.logger.debug(f"Error closing session during reset: {e}") + self._client = None + self._session = None + self._ws = None + async def _get_client(self) -> NanoKVMAPIClient: """Get or create the NanoKVM API client""" if self._client is None: @@ -151,6 +216,7 @@ async def _get_ws(self): ) return self._ws + @with_reauth async def _send_mouse_event(self, event_type: int, x: int, y: int): """ Send a mouse event via WebSocket @@ -189,6 +255,7 @@ def close(self): self.logger.debug(f"Error closing session: {e}") @export + @with_reauth async def paste_text(self, text: str): """ Paste text via keyboard HID simulation @@ -201,6 +268,7 @@ async def paste_text(self, text: str): self.logger.info(f"Pasted text: {text}") @export + @with_reauth async def press_key(self, key: str): """ Press a key by pasting a single character @@ -220,6 +288,7 @@ async def press_key(self, key: str): self.logger.debug(f"Pressed key: {repr(key)}") @export + @with_reauth async def reset_hid(self): """Reset the HID subsystem""" client = await self._get_client() @@ -372,15 +441,20 @@ async def get_info(self): """Get device information""" # Get info from the video driver's client video_driver = self.children["video"] - client = await video_driver._get_client() - info = await client.get_info() - return { - "ips": [{"name": ip.name, "addr": ip.addr, "version": ip.version, "type": ip.type} for ip in info.ips], - "mdns": info.mdns, - "image": info.image, - "application": info.application, - "device_key": info.device_key, - } + + @with_reauth + async def _get_info_impl(driver): + client = await driver._get_client() + info = await client.get_info() + return { + "ips": [{"name": ip.name, "addr": ip.addr, "version": ip.version, "type": ip.type} for ip in info.ips], + "mdns": info.mdns, + "image": info.image, + "application": info.application, + "device_key": info.device_key, + } + + return await _get_info_impl(video_driver) @export async def reboot(self): From 75c15fa88d1ead38ad79f89f4b2168b0ac23a979 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Fri, 5 Dec 2025 10:06:57 +0100 Subject: [PATCH 3/8] nanokvm: ignore stale frames --- .../jumpstarter_driver_nanokvm/client.py | 4 ++-- .../jumpstarter_driver_nanokvm/driver.py | 9 +++++++-- .../jumpstarter_driver_nanokvm/driver_test.py | 4 ++++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py index b6b681aff..37666d617 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py @@ -19,14 +19,14 @@ class NanoKVMVideoClient(DriverClient): from the NanoKVM device. """ - def snapshot(self) -> Image.Image: + def snapshot(self, skip_frames: int = 3) -> Image.Image: """ Get a snapshot image from the video stream Returns: PIL Image object of the snapshot """ - input_jpg_data = b64decode(self.call("snapshot")) + input_jpg_data = b64decode(self.call("snapshot", skip_frames)) return Image.open(io.BytesIO(input_jpg_data)) def cli(self): diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py index edec0bf05..5c6392084 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py @@ -92,7 +92,7 @@ def close(self): @export @with_reauth - async def snapshot(self) -> str: + async def snapshot(self, skip_frames: int = 3) -> str: """ Take a snapshot from the video stream @@ -100,8 +100,13 @@ async def snapshot(self) -> str: Base64 encoded JPEG image data """ client = await self._get_client() + frame_count = 0 async for frame in client.mjpeg_stream(): - # Get the first frame and return it + frame_count += 1 + # Skip the first frames as it's normally stale + if frame_count < skip_frames: + continue + # Return the second (fresh) frame buffer = BytesIO() frame.save(buffer, format="JPEG") data = buffer.getvalue() diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py index 7714134d5..08a999a84 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py @@ -32,6 +32,10 @@ def mock_nanokvm_client(): test_image = Image.new("RGB", (640, 480), color="red") async def mock_stream(): + # Yield several frames - first ones are buffered/old, later ones are fresh + yield test_image + yield test_image + yield test_image yield test_image mock_client.mjpeg_stream = mock_stream From 1700fa9d3449c1e87196c2ad0a389cf0d550006b Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Fri, 5 Dec 2025 10:11:05 +0100 Subject: [PATCH 4/8] nanokvm: add reference to python-nanokvm --- packages/jumpstarter-driver-nanokvm/README.md | 2 +- packages/jumpstarter-driver-nanokvm/pyproject.toml | 2 +- uv.lock | 10 +++++++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/jumpstarter-driver-nanokvm/README.md b/packages/jumpstarter-driver-nanokvm/README.md index 8eb6bf0c2..79aac00a5 100644 --- a/packages/jumpstarter-driver-nanokvm/README.md +++ b/packages/jumpstarter-driver-nanokvm/README.md @@ -1,6 +1,6 @@ # NanoKVM Driver -`jumpstarter-driver-nanokvm` provides comprehensive support for [NanoKVM](https://github.com/sipeed/NanoKVM) devices, enabling remote KVM (Keyboard, Video, Mouse) control over the network. +`jumpstarter-driver-nanokvm` provides comprehensive support for [NanoKVM](https://github.com/sipeed/NanoKVM) devices thanks to the amazing [python-nanokvm](https://github.com/puddly/python-nanokvm) library, enabling remote KVM (Keyboard, Video, Mouse) control over the network. ## Features diff --git a/packages/jumpstarter-driver-nanokvm/pyproject.toml b/packages/jumpstarter-driver-nanokvm/pyproject.toml index 260f2db5f..310f7fd09 100644 --- a/packages/jumpstarter-driver-nanokvm/pyproject.toml +++ b/packages/jumpstarter-driver-nanokvm/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "jumpstarter", "jumpstarter-driver-composite", "jumpstarter-driver-pyserial", - "nanokvm @ git+https://github.com/mangelajo/python-nanokvm.git@python-3-11", + "nanokvm>=0.1.0", "aiohttp", "pillow", "pydantic", diff --git a/uv.lock b/uv.lock index 1d3bc49c3..dabf2be13 100644 --- a/uv.lock +++ b/uv.lock @@ -1809,7 +1809,7 @@ requires-dist = [ { name = "jumpstarter", editable = "packages/jumpstarter" }, { name = "jumpstarter-driver-composite", editable = "packages/jumpstarter-driver-composite" }, { name = "jumpstarter-driver-pyserial", editable = "packages/jumpstarter-driver-pyserial" }, - { name = "nanokvm", git = "https://github.com/mangelajo/python-nanokvm.git?rev=python-3-11" }, + { name = "nanokvm", specifier = ">=0.1.0" }, { name = "pillow" }, { name = "pydantic" }, { name = "yarl" }, @@ -2862,8 +2862,8 @@ wheels = [ [[package]] name = "nanokvm" -version = "0.0.1" -source = { git = "https://github.com/mangelajo/python-nanokvm.git?rev=python-3-11#e45711d8687a48f21cdc3cba7ffd8317a32d0f2d" } +version = "0.1.0" +source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiohttp" }, { name = "cryptography" }, @@ -2872,6 +2872,10 @@ dependencies = [ { name = "pydantic" }, { name = "yarl" }, ] +sdist = { url = "https://files.pythonhosted.org/packages/36/14/9b899b2fc93902125fe42c8434ecdcbccc600dc8b63feae8142fc58961b5/nanokvm-0.1.0.tar.gz", hash = "sha256:d10fc30ce06b537257c3daf1a7dda965bcd114b839fab617b20d65a0c4124f33", size = 14762, upload-time = "2025-12-05T13:27:19.406Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ee/5d/e9f120406449fec8f79e19b4550ba4c5fdb63c70d7f15a48989984295035/nanokvm-0.1.0-py3-none-any.whl", hash = "sha256:c68d67d1abbcbcf3714320c8787e3f55809abc49e9e55fe9fb72cd0a65a2b1b6", size = 14712, upload-time = "2025-12-05T13:27:17.578Z" }, +] [[package]] name = "nodeenv" From 4b936ecc5b91ec56578c2aad17013f5132c0aca2 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Sat, 20 Dec 2025 23:51:42 +0100 Subject: [PATCH 5/8] nanokvm: fix mouse move/click --- .../jumpstarter_driver_nanokvm/client.py | 59 +++++++++++-------- .../jumpstarter_driver_nanokvm/driver.py | 50 ++++++++++------ 2 files changed, 65 insertions(+), 44 deletions(-) diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py index 37666d617..939ee3249 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py @@ -95,55 +95,64 @@ def reset_hid(self): """ self.call("reset_hid") - def mouse_move_abs(self, x: int, y: int): + def mouse_move_abs(self, x: float, y: float): """ Move mouse to absolute coordinates Args: - x: X coordinate (0-65535, scaled to screen resolution) - y: Y coordinate (0-65535, scaled to screen resolution) + x: X coordinate (0.0 to 1.0, where 0.0 is left and 1.0 is right) + y: Y coordinate (0.0 to 1.0, where 0.0 is top and 1.0 is bottom) Example:: - # Move to center of screen (assuming 1920x1080) - hid.mouse_move_abs(32768, 32768) + # Move to center of screen + hid.mouse_move_abs(0.5, 0.5) + + # Move to top-left corner + hid.mouse_move_abs(0.0, 0.0) + + # Move to bottom-right corner + hid.mouse_move_abs(1.0, 1.0) """ self.call("mouse_move_abs", x, y) - def mouse_move_rel(self, dx: int, dy: int): + def mouse_move_rel(self, dx: float, dy: float): """ Move mouse relative to current position Args: - dx: X movement delta (-127 to 127) - dy: Y movement delta (-127 to 127) + dx: X movement delta (-1.0 to 1.0, where 1.0 is full screen width) + dy: Y movement delta (-1.0 to 1.0, where 1.0 is full screen height) Example:: - # Move right and down - hid.mouse_move_rel(50, 50) + # Move right by 10% of screen width and down by 10% + hid.mouse_move_rel(0.1, 0.1) + + # Move left by 20% + hid.mouse_move_rel(-0.2, 0.0) """ self.call("mouse_move_rel", dx, dy) - def mouse_click(self, button: str = "left", x: int | None = None, y: int | None = None): + def mouse_click(self, button: str = "left", x: float | None = None, y: float | None = None): """ Click a mouse button Args: button: Mouse button to click ("left", "right", "middle") - x: Optional X coordinate for absolute positioning before click - y: Optional Y coordinate for absolute positioning before click + x: Optional X coordinate (0.0 to 1.0) for absolute positioning before click + y: Optional Y coordinate (0.0 to 1.0) for absolute positioning before click Example:: # Click at current position hid.mouse_click("left") - # Click at specific coordinates - hid.mouse_click("left", 32768, 32768) + # Click at center of screen + hid.mouse_click("left", 0.5, 0.5) - # Right-click - hid.mouse_click("right") + # Right-click at specific location + hid.mouse_click("right", 0.75, 0.25) """ if x is not None and y is not None: self.call("mouse_click", button, x, y) @@ -204,25 +213,25 @@ def mouse(): pass @mouse.command() - @click.argument("x", type=int) - @click.argument("y", type=int) + @click.argument("x", type=float) + @click.argument("y", type=float) def move(x, y): - """Move mouse to absolute coordinates (0-65535)""" + """Move mouse to absolute coordinates (0.0-1.0)""" self.mouse_move_abs(x, y) click.echo(f"Mouse moved to ({x}, {y})") @mouse.command() - @click.argument("dx", type=int) - @click.argument("dy", type=int) + @click.argument("dx", type=float) + @click.argument("dy", type=float) def move_rel(dx, dy): - """Move mouse by relative offset (-127 to 127)""" + """Move mouse by relative offset (-1.0 to 1.0, where 1.0 is full screen)""" self.mouse_move_rel(dx, dy) click.echo(f"Mouse moved by ({dx}, {dy})") @mouse.command(name="click") @click.option("--button", "-b", default="left", type=click.Choice(["left", "right", "middle"])) - @click.option("--x", type=int, default=None, help="Optional X coordinate") - @click.option("--y", type=int, default=None, help="Optional Y coordinate") + @click.option("--x", type=float, default=None, help="Optional X coordinate (0.0-1.0)") + @click.option("--y", type=float, default=None, help="Optional Y coordinate (0.0-1.0)") def mouse_click_cmd(button, x, y): """Click a mouse button""" self.mouse_click(button, x, y) diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py index 5c6392084..3c22a1435 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py @@ -222,18 +222,30 @@ async def _get_ws(self): return self._ws @with_reauth - async def _send_mouse_event(self, event_type: int, x: int, y: int): + async def _send_mouse_event(self, event_type: int, button_state: int, x: float, y: float): """ Send a mouse event via WebSocket Args: event_type: 0=mouse_up, 1=mouse_down, 2=move_abs, 3=move_rel, 4=scroll - x: X coordinate or movement - y: Y coordinate or movement + button_state: Button state (0=no buttons, 1=left, 2=right, 4=middle) + x: X coordinate (0.0-1.0 for abs/rel) or scroll amount (int for scroll) + y: Y coordinate (0.0-1.0 for abs/rel) or scroll amount (int for scroll) """ ws = await self._get_ws() - message = [2, event_type, x, y] # 2 indicates mouse event - await ws.send_json(message) + # Scale coordinates for absolute and relative movements + if event_type == 2: # move_abs + x_val = int(x * 32768) + y_val = int(y * 32768) + elif event_type == 3: # move_rel + x_val = int(x * 32768) + y_val = int(y * 32768) + else: + x_val = int(x) + y_val = int(y) + message = [2, event_type, button_state, x_val, y_val] # 2 indicates mouse event + res = await ws.send_json(message) + print(message, res) self.logger.debug(f"Sent mouse event: {message}") def close(self): @@ -301,38 +313,38 @@ async def reset_hid(self): self.logger.info("HID subsystem reset") @export - async def mouse_move_abs(self, x: int, y: int): + async def mouse_move_abs(self, x: float, y: float): """ Move mouse to absolute coordinates Args: - x: X coordinate (0-65535, scaled to screen resolution) - y: Y coordinate (0-65535, scaled to screen resolution) + x: X coordinate (0.0 to 1.0, where 0.0 is left/top and 1.0 is right/bottom) + y: Y coordinate (0.0 to 1.0, where 0.0 is left/top and 1.0 is right/bottom) """ - await self._send_mouse_event(2, x, y) + await self._send_mouse_event(2, 0, x, y) self.logger.debug(f"Mouse moved to absolute position: ({x}, {y})") @export - async def mouse_move_rel(self, dx: int, dy: int): + async def mouse_move_rel(self, dx: float, dy: float): """ Move mouse relative to current position Args: - dx: X movement delta (-127 to 127) - dy: Y movement delta (-127 to 127) + dx: X movement delta (-1.0 to 1.0, where 1.0 is full screen width) + dy: Y movement delta (-1.0 to 1.0, where 1.0 is full screen height) """ - await self._send_mouse_event(3, dx, dy) + await self._send_mouse_event(3, 0, dx, dy) self.logger.debug(f"Mouse moved by relative offset: ({dx}, {dy})") @export - async def mouse_click(self, button: str = "left", x: int | None = None, y: int | None = None): + async def mouse_click(self, button: str = "left", x: float | None = None, y: float | None = None): """ Click a mouse button at current position or specified coordinates Args: button: Mouse button to click ("left", "right", "middle") - x: Optional X coordinate for absolute positioning before click - y: Optional Y coordinate for absolute positioning before click + x: Optional X coordinate (0.0 to 1.0) for absolute positioning before click + y: Optional Y coordinate (0.0 to 1.0) for absolute positioning before click """ # Map button names to bit flags (left=1, right=2, middle=4) button_map = {"left": 1, "right": 2, "middle": 4} @@ -345,11 +357,11 @@ async def mouse_click(self, button: str = "left", x: int | None = None, y: int | await asyncio.sleep(0.05) # Send mouse down - await self._send_mouse_event(1, button_code, 0) + await self._send_mouse_event(1, button_code, 0.0, 0.0) # Small delay between down and up await asyncio.sleep(0.05) # Send mouse up - await self._send_mouse_event(0, 0, 0) + await self._send_mouse_event(0, 0, 0.0, 0.0) self.logger.info(f"Mouse {button} clicked") @@ -362,7 +374,7 @@ async def mouse_scroll(self, dx: int, dy: int): dx: Horizontal scroll amount dy: Vertical scroll amount (positive=up, negative=down) """ - await self._send_mouse_event(4, dx, dy) + await self._send_mouse_event(4, 0, float(dx), float(dy)) self.logger.debug(f"Mouse scrolled: ({dx}, {dy})") From 35bfec3747344b448a394ec230aa5c1cdbea6319 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Sun, 21 Dec 2025 00:26:24 +0100 Subject: [PATCH 6/8] nanokvm: add image management --- packages/jumpstarter-driver-nanokvm/README.md | 109 +++++++++-- .../jumpstarter_driver_nanokvm/client.py | 177 +++++++++++++++++- .../jumpstarter_driver_nanokvm/driver.py | 132 ++++++++++++- 3 files changed, 402 insertions(+), 16 deletions(-) diff --git a/packages/jumpstarter-driver-nanokvm/README.md b/packages/jumpstarter-driver-nanokvm/README.md index 79aac00a5..09f8eab6c 100644 --- a/packages/jumpstarter-driver-nanokvm/README.md +++ b/packages/jumpstarter-driver-nanokvm/README.md @@ -8,10 +8,15 @@ - **Snapshot Capture**: Take screenshots of the video stream - **Keyboard Control**: Send text and keystrokes via HID emulation - **Mouse Control**: Full mouse support via WebSocket - - Absolute positioning (0-65535 coordinate system) - - Relative movement + - Absolute positioning (0.0-1.0 normalized coordinates) + - Relative movement (0.0-1.0 normalized, where 1.0 = full screen) - Left/right/middle button clicks - Mouse wheel scrolling +- **Image Management**: Virtual disk and CD-ROM control + - Mount/unmount disk and CD-ROM images + - Download images from URLs + - Check mounted image status + - Monitor download progress - **Device Management**: Get device info, reboot the NanoKVM - **Composite Driver**: Access all functionality through a unified interface @@ -79,7 +84,7 @@ The NanoKVM driver is a composite driver that provides three main interfaces: ```{eval-rst} .. autoclass:: jumpstarter_driver_nanokvm.client.NanoKVMClient() - :members: get_info, reboot + :members: get_info, reboot, mount_image, download_image, get_mounted_image, get_cdrom_status, is_image_download_enabled, get_image_download_status ``` ### NanoKVMVideoClient @@ -151,11 +156,14 @@ j nanokvm hid reset #### Mouse Commands ```bash -# Move mouse to absolute coordinates (0-65535, scaled to screen) -j nanokvm hid mouse move 32768 32768 # Center of screen +# Move mouse to absolute coordinates (0.0-1.0, where 0.0=top/left, 1.0=bottom/right) +j nanokvm hid mouse move 0.5 0.5 # Center of screen +j nanokvm hid mouse move 0.0 0.0 # Top-left corner +j nanokvm hid mouse move 1.0 1.0 # Bottom-right corner -# Move mouse relatively (-127 to 127) -j nanokvm hid mouse move-rel 50 50 # Move right and down +# Move mouse relatively (-1.0 to 1.0, where 1.0 = full screen width/height) +j nanokvm hid mouse move-rel 0.1 0.1 # Move right and down by 10% of screen +j nanokvm hid mouse move-rel -0.2 0.0 # Move left by 20% of screen width # Click at current position (default: left button) j nanokvm hid mouse click @@ -164,7 +172,7 @@ j nanokvm hid mouse click j nanokvm hid mouse click --button right # Click at specific coordinates -j nanokvm hid mouse click --x 32768 --y 32768 --button left +j nanokvm hid mouse click --x 0.5 --y 0.5 --button left # Scroll (default: down 5 units) j nanokvm hid mouse scroll @@ -176,6 +184,34 @@ j nanokvm hid mouse scroll --dy 5 j nanokvm hid mouse scroll --dy -5 ``` +### Image Management Commands + +```bash +# Mount a disk image +j nanokvm image mount /data/myimage.img + +# Mount a CD-ROM image +j nanokvm image mount /data/installer.iso --cdrom + +# Unmount current image +j nanokvm image unmount + +# Check mounted image status +j nanokvm image status + +# Check if mounted image is in CD-ROM mode +j nanokvm image cdrom-status + +# Download an image from URL +j nanokvm image download https://example.com/image.iso + +# Check if image downloads are enabled +j nanokvm image download-enabled + +# Check download progress +j nanokvm image download-status +``` + ### Example Session ```bash @@ -185,6 +221,14 @@ jmp shell -l my=device # Inside the shell, use the commands j nanokvm info j nanokvm video snapshot my_screen.jpg + +# Mount a CD-ROM image +j nanokvm image mount /data/installer.iso --cdrom +j nanokvm image status + +# Control the mouse and keyboard +j nanokvm hid mouse move 0.5 0.5 +j nanokvm hid mouse click j nanokvm hid paste "echo 'Hello from NanoKVM'\n" ``` @@ -216,17 +260,24 @@ nanokvm.hid.press_key("\t") # Tab ### Mouse Control ```python -# Move mouse to center of screen -nanokvm.hid.mouse_move_abs(32768, 32768) +# Move mouse to center of screen (normalized 0.0-1.0 coordinates) +nanokvm.hid.mouse_move_abs(0.5, 0.5) + +# Move to top-left corner +nanokvm.hid.mouse_move_abs(0.0, 0.0) + +# Move to bottom-right corner +nanokvm.hid.mouse_move_abs(1.0, 1.0) # Click left button nanokvm.hid.mouse_click("left") # Click at specific coordinates -nanokvm.hid.mouse_click("left", x=32768, y=16384) +nanokvm.hid.mouse_click("left", x=0.5, y=0.25) -# Move mouse relatively -nanokvm.hid.mouse_move_rel(50, 50) # Move right and down +# Move mouse relatively (normalized -1.0 to 1.0, where 1.0 = full screen) +nanokvm.hid.mouse_move_rel(0.1, 0.1) # Move right/down by 10% of screen +nanokvm.hid.mouse_move_rel(-0.2, 0.0) # Move left by 20% of screen width # Scroll up nanokvm.hid.mouse_scroll(0, 5) @@ -235,6 +286,38 @@ nanokvm.hid.mouse_scroll(0, 5) nanokvm.hid.mouse_scroll(0, -5) ``` +### Image Management + +```python +# Mount a disk image +nanokvm.mount_image("/data/myimage.img", cdrom=False) + +# Mount a CD-ROM image +nanokvm.mount_image("/data/installer.iso", cdrom=True) + +# Unmount current image +nanokvm.mount_image("") + +# Get mounted image info +file = nanokvm.get_mounted_image() +if file: + print(f"Mounted: {file}") + is_cdrom = nanokvm.get_cdrom_status() + print(f"Mode: {'CD-ROM' if is_cdrom else 'Disk'}") + +# Download an image +status = nanokvm.download_image("https://example.com/image.iso") +print(f"Download: {status['status']}, File: {status['file']}") + +# Check if downloads are enabled +if nanokvm.is_image_download_enabled(): + print("Downloads are available") + +# Monitor download progress +status = nanokvm.get_image_download_status() +print(f"Status: {status['status']}, Progress: {status['percentage']}") +``` + ### Device Management ```python diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py index 939ee3249..d937ce91a 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py @@ -298,7 +298,106 @@ def reboot(self): """ self.call("reboot") - def cli(self): + def mount_image(self, file: str = "", cdrom: bool = False): + """ + Mount an image file or unmount if file is empty string + + Args: + file: Path to image file on the NanoKVM device, or empty string to unmount + cdrom: Whether to mount as CD-ROM (True) or disk (False) + + Note: + Unmounting may fail if image is currently in use. If unmount fails, + you may need to power cycle the connected device first. + + Example:: + + # Mount a disk image + nanokvm.mount_image("/path/to/disk.img", cdrom=False) + + # Mount a CD-ROM image + nanokvm.mount_image("/path/to/cdrom.iso", cdrom=True) + + # Unmount + nanokvm.mount_image("") or nanokvm.mount_image() + """ + self.call("mount_image", file, cdrom) + + def download_image(self, url: str) -> dict: + """ + Start downloading an image from a URL + + Args: + url: URL of the image to download + + Returns: + Dictionary with download status, file, and percentage + + Example:: + + status = nanokvm.download_image("https://example.com/image.iso") + print(f"Download: {status['status']}, File: {status['file']}, {status['percentage']}%") + """ + return self.call("download_image", url) + + def get_mounted_image(self) -> str | None: + """ + Get information about mounted image + + Returns: + String with mounted image file path, or None if no image mounted + + Example:: + + file = nanokvm.get_mounted_image() + if file: + print(f"Mounted: {file}") + """ + return self.call("get_mounted_image") + + def get_cdrom_status(self) -> bool: + """ + Check if the mounted image is in CD-ROM mode + + Returns: + Boolean indicating if CD-ROM mode is active (True=CD-ROM, False=disk) + + Example:: + + if nanokvm.get_cdrom_status(): + print("CD-ROM mode is enabled") + """ + return self.call("get_cdrom_status") + + def is_image_download_enabled(self) -> bool: + """ + Check if the /data partition allows image downloads + + Returns: + Boolean indicating if image downloads are enabled + + Example:: + + if nanokvm.is_image_download_enabled(): + print("Image downloads are available") + """ + return self.call("is_image_download_enabled") + + def get_image_download_status(self) -> dict: + """ + Get the status of an ongoing image download + + Returns: + Dictionary with download status, file, and percentage complete + + Example:: + + status = nanokvm.get_image_download_status() + print(f"Status: {status['status']}, File: {status['file']}, {status['percentage']}%") + """ + return self.call("get_image_download_status") + + def cli(self): # noqa: C901 """Create CLI interface with device management and child commands""" base = super().cli() @@ -323,4 +422,80 @@ def reboot(): self.reboot() click.echo("NanoKVM device is rebooting...") + @base.group() + def image(): + """Image management commands""" + pass + + @image.command() + @click.argument("file") + @click.option("--cdrom", is_flag=True, help="Mount as CD-ROM instead of disk") + def mount(file, cdrom): + """Mount an image file""" + self.mount_image(file, cdrom) + image_type = "CD-ROM" if cdrom else "disk" + click.echo(f"Mounted {file} as {image_type}") + + @image.command() + def unmount(): + """Unmount the currently mounted image + + Note: Unmount may fail if image is in use by the connected device. + Power cycle the device first if unmount fails. + """ + try: + self.mount_image("") + click.echo("Image unmounted successfully") + except Exception as e: + click.echo(f"Failed to unmount image: {e}", err=True) + click.echo("Note: Image may be in use. Try power cycling the connected device first.", err=True) + raise + + @image.command() + @click.argument("url") + def download(url): + """Download an image from URL""" + status = self.download_image(url) + click.echo(f"Download started: {status['status']}") + if status['file']: + click.echo(f"File: {status['file']}") + if status['percentage']: + click.echo(f"Progress: {status['percentage']}%") + + @image.command() + def status(): + """Show mounted image status""" + file = self.get_mounted_image() + if file: + is_cdrom = self.get_cdrom_status() + mode = "CD-ROM" if is_cdrom else "Disk" + click.echo(f"Mounted: {file}") + click.echo(f"Mode: {mode}") + else: + click.echo("No image mounted") + + @image.command() + def cdrom_status(): + """Check if mounted image is in CD-ROM mode""" + is_cdrom = self.get_cdrom_status() + mode = "CD-ROM" if is_cdrom else "Disk" + click.echo(f"Current mode: {mode}") + + @image.command() + def download_enabled(): + """Check if image downloads are enabled""" + enabled = self.is_image_download_enabled() + status = "enabled" if enabled else "disabled" + click.echo(f"Image downloads: {status}") + + @image.command() + def download_status(): + """Get current image download status""" + status = self.get_image_download_status() + click.echo(f"Status: {status['status']}") + if status['file']: + click.echo(f"File: {status['file']}") + if status['percentage']: + click.echo(f"Progress: {status['percentage']}") + return base diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py index 3c22a1435..713b41c1b 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py @@ -244,8 +244,7 @@ async def _send_mouse_event(self, event_type: int, button_state: int, x: float, x_val = int(x) y_val = int(y) message = [2, event_type, button_state, x_val, y_val] # 2 indicates mouse event - res = await ws.send_json(message) - print(message, res) + await ws.send_json(message) self.logger.debug(f"Sent mouse event: {message}") def close(self): @@ -480,3 +479,132 @@ async def reboot(self): client = await video_driver._get_client() await client.reboot_system() self.logger.info("NanoKVM device rebooted") + + @export + async def mount_image(self, file: str = "", cdrom: bool = False): + """ + Mount an image file or unmount if file is empty string + + Args: + file: Path to image file on the NanoKVM device, or empty string to unmount + cdrom: Whether to mount as CD-ROM (True) or disk (False) + """ + video_driver = self.children["video"] + + @with_reauth + async def _mount_impl(driver): + client = await driver._get_client() + # Pass empty string or None for unmount - API expects empty string + mount_file = file if file else "" + # When unmounting, we need to pass the file as empty string or None + await client.mount_image(file=mount_file or None, cdrom=cdrom if mount_file else False) + + await _mount_impl(video_driver) + if file: + self.logger.info(f"Mounted image: {file} (cdrom={cdrom})") + else: + self.logger.info("Unmounted image") + + @export + async def download_image(self, url: str): + """ + Start downloading an image from a URL + + Args: + url: URL of the image to download + + Returns: + Dictionary with download status information + """ + video_driver = self.children["video"] + + @with_reauth + async def _download_impl(driver): + client = await driver._get_client() + status = await client.download_image(url=url) + return { + "status": status.status, + "file": status.file, + "percentage": status.percentage, + } + + result = await _download_impl(video_driver) + self.logger.info(f"Started download from {url}") + return result + + @export + async def get_mounted_image(self): + """ + Get information about mounted image + + Returns: + String with mounted image file path, or None if no image mounted + """ + video_driver = self.children["video"] + + @with_reauth + async def _get_mounted_impl(driver): + client = await driver._get_client() + info = await client.get_mounted_image() + return info.file + + return await _get_mounted_impl(video_driver) + + @export + async def get_cdrom_status(self): + """ + Check if the mounted image is in CD-ROM mode + + Returns: + Boolean indicating if CD-ROM mode is active (True=CD-ROM, False=disk) + """ + video_driver = self.children["video"] + + @with_reauth + async def _get_cdrom_status_impl(driver): + client = await driver._get_client() + status = await client.get_cdrom_status() + return bool(status.cdrom) + + return await _get_cdrom_status_impl(video_driver) + + @export + async def is_image_download_enabled(self): + """ + Check if the /data partition allows image downloads + + Returns: + Boolean indicating if image downloads are enabled + """ + video_driver = self.children["video"] + + @with_reauth + async def _is_download_enabled_impl(driver): + client = await driver._get_client() + status = await client.is_image_download_enabled() + return status.enabled + + return await _is_download_enabled_impl(video_driver) + + @export + async def get_image_download_status(self): + """ + Get the status of an ongoing image download + + Returns: + Dictionary with download status, file, and percentage complete + """ + video_driver = self.children["video"] + + @with_reauth + async def _get_download_status_impl(driver): + client = await driver._get_client() + status = await client.get_image_download_status() + return { + "status": status.status, + "file": status.file, + "percentage": status.percentage, + } + + return await _get_download_status_impl(video_driver) + From e3f15bd508b9aa042ced7a052f44368634cbe6b6 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Sun, 21 Dec 2025 21:02:05 +0100 Subject: [PATCH 7/8] nanokvm: add get_images --- .../jumpstarter_driver_nanokvm/client.py | 52 ++++- .../jumpstarter_driver_nanokvm/driver.py | 189 ++++++++---------- .../jumpstarter_driver_nanokvm/driver_test.py | 27 +++ .../jumpstarter-driver-nanokvm/pyproject.toml | 2 +- uv.lock | 10 +- 5 files changed, 157 insertions(+), 123 deletions(-) diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py index d937ce91a..be603dad4 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/client.py @@ -4,11 +4,15 @@ import click from jumpstarter_driver_composite.client import CompositeClient +from nanokvm.models import MouseButton from PIL import Image from jumpstarter.client import DriverClient from jumpstarter.client.decorators import driver_click_group +# Re-export MouseButton for convenience +__all__ = ["NanoKVMVideoClient", "NanoKVMHIDClient", "NanoKVMClient", "MouseButton"] + @dataclass(kw_only=True) class NanoKVMVideoClient(DriverClient): @@ -134,25 +138,24 @@ def mouse_move_rel(self, dx: float, dy: float): """ self.call("mouse_move_rel", dx, dy) - def mouse_click(self, button: str = "left", x: float | None = None, y: float | None = None): + def mouse_click(self, button: MouseButton | str = "left", x: float | None = None, y: float | None = None): """ Click a mouse button Args: - button: Mouse button to click ("left", "right", "middle") + button: Mouse button to click (MouseButton enum or "left", "right", "middle" string) x: Optional X coordinate (0.0 to 1.0) for absolute positioning before click y: Optional Y coordinate (0.0 to 1.0) for absolute positioning before click Example:: - # Click at current position + # Using string (backward compatible) hid.mouse_click("left") - - # Click at center of screen hid.mouse_click("left", 0.5, 0.5) - # Right-click at specific location - hid.mouse_click("right", 0.75, 0.25) + # Using MouseButton enum (recommended) + hid.mouse_click(MouseButton.LEFT) + hid.mouse_click(MouseButton.RIGHT, 0.75, 0.25) """ if x is not None and y is not None: self.call("mouse_click", button, x, y) @@ -234,7 +237,14 @@ def move_rel(dx, dy): @click.option("--y", type=float, default=None, help="Optional Y coordinate (0.0-1.0)") def mouse_click_cmd(button, x, y): """Click a mouse button""" - self.mouse_click(button, x, y) + # Convert string to MouseButton enum + button_map = { + "left": MouseButton.LEFT, + "right": MouseButton.RIGHT, + "middle": MouseButton.MIDDLE, + } + button_enum = button_map[button] + self.mouse_click(button_enum, x, y) if x is not None and y is not None: click.echo(f"Clicked {button} button at ({x}, {y})") else: @@ -397,6 +407,21 @@ def get_image_download_status(self) -> dict: """ return self.call("get_image_download_status") + def get_images(self) -> list[str]: + """ + Get the list of available image files + + Returns: + List of image file paths available on the NanoKVM device + + Example:: + + images = nanokvm.get_images() + for image in images: + print(f"Available: {image}") + """ + return self.call("get_images") + def cli(self): # noqa: C901 """Create CLI interface with device management and child commands""" base = super().cli() @@ -498,4 +523,15 @@ def download_status(): if status['percentage']: click.echo(f"Progress: {status['percentage']}") + @image.command() + def list(): + """List available image files""" + images = self.get_images() + if images: + click.echo("Available images:") + for img in images: + click.echo(f" - {img}") + else: + click.echo("No images available") + return base diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py index 713b41c1b..bc0c532c0 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py @@ -6,13 +6,17 @@ from io import BytesIO import anyio -from aiohttp import ClientResponseError, ClientSession +from aiohttp import ClientResponseError from jumpstarter_driver_composite.driver import Composite from jumpstarter_driver_pyserial.driver import PySerial from nanokvm.client import NanoKVMClient as NanoKVMAPIClient +from nanokvm.models import MouseButton from jumpstarter.driver import Driver, export, exportstream +# Re-export MouseButton for convenience +__all__ = ["NanoKVMVideo", "NanoKVMHID", "NanoKVMSerial", "NanoKVM", "MouseButton"] + def _is_unauthorized_error(error: Exception) -> bool: """Check if an error is a 401 Unauthorized error""" @@ -47,8 +51,8 @@ class NanoKVMVideo(Driver): username: str = "admin" password: str = "admin" - _client: NanoKVMAPIClient = field(init=False, repr=False, default=None) - _session: ClientSession = field(init=False, repr=False, default=None) + _client: NanoKVMAPIClient | None = field(init=False, repr=False, default=None) + _client_ctx: object = field(init=False, repr=False, default=None) def __post_init__(self): if hasattr(super(), "__post_init__"): @@ -59,36 +63,39 @@ def client(cls) -> str: return "jumpstarter_driver_nanokvm.client.NanoKVMVideoClient" async def _reset_client(self): - """Reset the client and session, forcing re-authentication""" - if self._session is not None and not self._session.closed: + """Reset the client, forcing re-authentication""" + if self._client is not None: try: - await self._session.close() + await self._client.close() except Exception as e: - self.logger.debug(f"Error closing session during reset: {e}") + self.logger.debug(f"Error closing client during reset: {e}") self._client = None - self._session = None + self._client_ctx = None async def _get_client(self) -> NanoKVMAPIClient: - """Get or create the NanoKVM API client""" + """Get or create the NanoKVM API client using context manager""" if self._client is None: - self._session = ClientSession() - self._client = NanoKVMAPIClient(f"http://{self.host}/api/", self._session) + # Create a new client context manager + self._client_ctx = NanoKVMAPIClient(f"http://{self.host}/api/") + # Enter the context manager + self._client = await self._client_ctx.__aenter__() + # Authenticate await self._client.authenticate(self.username, self.password) return self._client def close(self): """Clean up resources""" - # Schedule cleanup of aiohttp session - if self._session is not None and not self._session.closed: + # Schedule cleanup of client + if self._client is not None: try: import asyncio loop = asyncio.get_event_loop() if loop.is_running(): - loop.create_task(self._session.close()) + loop.create_task(self._client_ctx.__aexit__(None, None, None)) else: - loop.run_until_complete(self._session.close()) + loop.run_until_complete(self._client_ctx.__aexit__(None, None, None)) except Exception as e: - self.logger.debug(f"Error closing session: {e}") + self.logger.debug(f"Error closing client: {e}") @export @with_reauth @@ -173,9 +180,8 @@ class NanoKVMHID(Driver): username: str = "admin" password: str = "admin" - _client: NanoKVMAPIClient = field(init=False, repr=False, default=None) - _session: ClientSession = field(init=False, repr=False, default=None) - _ws: object = field(init=False, repr=False, default=None) + _client: NanoKVMAPIClient | None = field(init=False, repr=False, default=None) + _client_ctx: object = field(init=False, repr=False, default=None) def __post_init__(self): if hasattr(super(), "__post_init__"): @@ -186,89 +192,39 @@ def client(cls) -> str: return "jumpstarter_driver_nanokvm.client.NanoKVMHIDClient" async def _reset_client(self): - """Reset the client, session, and websocket, forcing re-authentication""" - if self._ws is not None: - try: - await self._ws.close() - except Exception as e: - self.logger.debug(f"Error closing websocket during reset: {e}") - if self._session is not None and not self._session.closed: + """Reset the client, forcing re-authentication""" + if self._client is not None: try: - await self._session.close() + await self._client.close() except Exception as e: - self.logger.debug(f"Error closing session during reset: {e}") + self.logger.debug(f"Error closing client during reset: {e}") self._client = None - self._session = None - self._ws = None + self._client_ctx = None async def _get_client(self) -> NanoKVMAPIClient: - """Get or create the NanoKVM API client""" + """Get or create the NanoKVM API client using context manager""" if self._client is None: - self._session = ClientSession() - self._client = NanoKVMAPIClient(f"http://{self.host}/api/", self._session) + # Create a new client context manager + self._client_ctx = NanoKVMAPIClient(f"http://{self.host}/api/") + # Enter the context manager + self._client = await self._client_ctx.__aenter__() + # Authenticate await self._client.authenticate(self.username, self.password) return self._client - async def _get_ws(self): - """Get or create WebSocket connection for mouse events""" - if self._ws is None: - client = await self._get_client() - # Connect to WebSocket endpoint with authentication token - ws_url = f"ws://{self.host}/api/ws" - self._ws = await self._session.ws_connect( - ws_url, - headers={"Cookie": f"nano-kvm-token={client.token}"}, - ) - return self._ws - - @with_reauth - async def _send_mouse_event(self, event_type: int, button_state: int, x: float, y: float): - """ - Send a mouse event via WebSocket - - Args: - event_type: 0=mouse_up, 1=mouse_down, 2=move_abs, 3=move_rel, 4=scroll - button_state: Button state (0=no buttons, 1=left, 2=right, 4=middle) - x: X coordinate (0.0-1.0 for abs/rel) or scroll amount (int for scroll) - y: Y coordinate (0.0-1.0 for abs/rel) or scroll amount (int for scroll) - """ - ws = await self._get_ws() - # Scale coordinates for absolute and relative movements - if event_type == 2: # move_abs - x_val = int(x * 32768) - y_val = int(y * 32768) - elif event_type == 3: # move_rel - x_val = int(x * 32768) - y_val = int(y * 32768) - else: - x_val = int(x) - y_val = int(y) - message = [2, event_type, button_state, x_val, y_val] # 2 indicates mouse event - await ws.send_json(message) - self.logger.debug(f"Sent mouse event: {message}") - def close(self): """Clean up resources""" - # Schedule cleanup of aiohttp session and websocket - if self._ws is not None: - try: - import asyncio - loop = asyncio.get_event_loop() - if loop.is_running(): - loop.create_task(self._ws.close()) - except Exception as e: - self.logger.debug(f"Error closing websocket: {e}") - - if self._session is not None and not self._session.closed: + # Schedule cleanup of client + if self._client is not None: try: import asyncio loop = asyncio.get_event_loop() if loop.is_running(): - loop.create_task(self._session.close()) + loop.create_task(self._client_ctx.__aexit__(None, None, None)) else: - loop.run_until_complete(self._session.close()) + loop.run_until_complete(self._client_ctx.__aexit__(None, None, None)) except Exception as e: - self.logger.debug(f"Error closing session: {e}") + self.logger.debug(f"Error closing client: {e}") @export @with_reauth @@ -312,6 +268,7 @@ async def reset_hid(self): self.logger.info("HID subsystem reset") @export + @with_reauth async def mouse_move_abs(self, x: float, y: float): """ Move mouse to absolute coordinates @@ -320,10 +277,12 @@ async def mouse_move_abs(self, x: float, y: float): x: X coordinate (0.0 to 1.0, where 0.0 is left/top and 1.0 is right/bottom) y: Y coordinate (0.0 to 1.0, where 0.0 is left/top and 1.0 is right/bottom) """ - await self._send_mouse_event(2, 0, x, y) + client = await self._get_client() + await client.mouse_move_abs(x, y) self.logger.debug(f"Mouse moved to absolute position: ({x}, {y})") @export + @with_reauth async def mouse_move_rel(self, dx: float, dy: float): """ Move mouse relative to current position @@ -332,39 +291,36 @@ async def mouse_move_rel(self, dx: float, dy: float): dx: X movement delta (-1.0 to 1.0, where 1.0 is full screen width) dy: Y movement delta (-1.0 to 1.0, where 1.0 is full screen height) """ - await self._send_mouse_event(3, 0, dx, dy) + client = await self._get_client() + await client.mouse_move_rel(dx, dy) self.logger.debug(f"Mouse moved by relative offset: ({dx}, {dy})") @export - async def mouse_click(self, button: str = "left", x: float | None = None, y: float | None = None): + @with_reauth + async def mouse_click(self, button: MouseButton | str = "left", x: float | None = None, y: float | None = None): """ Click a mouse button at current position or specified coordinates Args: - button: Mouse button to click ("left", "right", "middle") + button: Mouse button to click (MouseButton enum or "left", "right", "middle" string) x: Optional X coordinate (0.0 to 1.0) for absolute positioning before click y: Optional Y coordinate (0.0 to 1.0) for absolute positioning before click """ - # Map button names to bit flags (left=1, right=2, middle=4) - button_map = {"left": 1, "right": 2, "middle": 4} - button_code = button_map.get(button.lower(), 1) - - # Move to position if coordinates provided - if x is not None and y is not None: - await self.mouse_move_abs(x, y) - # Small delay to ensure position update - await asyncio.sleep(0.05) - - # Send mouse down - await self._send_mouse_event(1, button_code, 0.0, 0.0) - # Small delay between down and up - await asyncio.sleep(0.05) - # Send mouse up - await self._send_mouse_event(0, 0, 0.0, 0.0) + # Convert string to MouseButton enum for backward compatibility + if isinstance(button, str): + button_map = { + "left": MouseButton.LEFT, + "right": MouseButton.RIGHT, + "middle": MouseButton.MIDDLE, + } + button = button_map.get(button.lower(), MouseButton.LEFT) - self.logger.info(f"Mouse {button} clicked") + client = await self._get_client() + await client.mouse_click(button, x, y) + self.logger.info(f"Mouse {button.name} clicked") @export + @with_reauth async def mouse_scroll(self, dx: int, dy: int): """ Scroll the mouse wheel @@ -373,7 +329,8 @@ async def mouse_scroll(self, dx: int, dy: int): dx: Horizontal scroll amount dy: Vertical scroll amount (positive=up, negative=down) """ - await self._send_mouse_event(4, 0, float(dx), float(dy)) + client = await self._get_client() + await client.mouse_scroll(dx, dy) self.logger.debug(f"Mouse scrolled: ({dx}, {dy})") @@ -608,3 +565,21 @@ async def _get_download_status_impl(driver): return await _get_download_status_impl(video_driver) + @export + async def get_images(self): + """ + Get the list of available image files + + Returns: + List of image file paths available on the NanoKVM device + """ + video_driver = self.children["video"] + + @with_reauth + async def _get_images_impl(driver): + client = await driver._get_client() + images = await client.get_images() + return images.files + + return await _get_images_impl(video_driver) + diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py index 08a999a84..99e43c0f4 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py @@ -47,6 +47,11 @@ async def mock_stream(): # Mock reboot mock_client.reboot_system = AsyncMock() + # Mock image management + mock_images = MagicMock() + mock_images.files = ["/data/alpine-standard-3.23.2-x86_64.iso", "/data/cs10-js.iso"] + mock_client.get_images = AsyncMock(return_value=mock_images) + mock_client_class.return_value = mock_client yield mock_client @@ -204,3 +209,25 @@ def test_nanokvm_mouse_click(mock_nanokvm_client, mock_aiohttp_session): # Verify WebSocket messages were sent (down and up) assert mock_ws.send_json.call_count >= 2 + + +def test_nanokvm_get_images(mock_nanokvm_client, mock_aiohttp_session): + """Test getting list of available images""" + driver = NanoKVM( + host="test.local", + username="admin", + password="admin", + ) + + with serve(driver) as client: + # Get list of images + images = client.get_images() + + # Verify the result + assert isinstance(images, list) + assert len(images) == 2 + assert "/data/alpine-standard-3.23.2-x86_64.iso" in images + assert "/data/cs10-js.iso" in images + + # Verify the mock was called + mock_nanokvm_client.get_images.assert_called_once() diff --git a/packages/jumpstarter-driver-nanokvm/pyproject.toml b/packages/jumpstarter-driver-nanokvm/pyproject.toml index 310f7fd09..f3171a93a 100644 --- a/packages/jumpstarter-driver-nanokvm/pyproject.toml +++ b/packages/jumpstarter-driver-nanokvm/pyproject.toml @@ -13,7 +13,7 @@ dependencies = [ "jumpstarter", "jumpstarter-driver-composite", "jumpstarter-driver-pyserial", - "nanokvm>=0.1.0", + "nanokvm @ git+https://github.com/mangelajo/python-nanokvm.git@dev", "aiohttp", "pillow", "pydantic", diff --git a/uv.lock b/uv.lock index dabf2be13..8d16656e5 100644 --- a/uv.lock +++ b/uv.lock @@ -1809,7 +1809,7 @@ requires-dist = [ { name = "jumpstarter", editable = "packages/jumpstarter" }, { name = "jumpstarter-driver-composite", editable = "packages/jumpstarter-driver-composite" }, { name = "jumpstarter-driver-pyserial", editable = "packages/jumpstarter-driver-pyserial" }, - { name = "nanokvm", specifier = ">=0.1.0" }, + { name = "nanokvm", git = "https://github.com/mangelajo/python-nanokvm.git?rev=dev" }, { name = "pillow" }, { name = "pydantic" }, { name = "yarl" }, @@ -2862,8 +2862,8 @@ wheels = [ [[package]] name = "nanokvm" -version = "0.1.0" -source = { registry = "https://pypi.org/simple" } +version = "0.0.1" +source = { git = "https://github.com/mangelajo/python-nanokvm.git?rev=dev#7cbf92f3ebe2d100a25d07b383ac177f745fa668" } dependencies = [ { name = "aiohttp" }, { name = "cryptography" }, @@ -2872,10 +2872,6 @@ dependencies = [ { name = "pydantic" }, { name = "yarl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/36/14/9b899b2fc93902125fe42c8434ecdcbccc600dc8b63feae8142fc58961b5/nanokvm-0.1.0.tar.gz", hash = "sha256:d10fc30ce06b537257c3daf1a7dda965bcd114b839fab617b20d65a0c4124f33", size = 14762, upload-time = "2025-12-05T13:27:19.406Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/ee/5d/e9f120406449fec8f79e19b4550ba4c5fdb63c70d7f15a48989984295035/nanokvm-0.1.0-py3-none-any.whl", hash = "sha256:c68d67d1abbcbcf3714320c8787e3f55809abc49e9e55fe9fb72cd0a65a2b1b6", size = 14712, upload-time = "2025-12-05T13:27:17.578Z" }, -] [[package]] name = "nodeenv" From 126f67d8ffd17882f3ea9cf148f0f24f7248c545 Mon Sep 17 00:00:00 2001 From: Miguel Angel Ajo Pelayo Date: Fri, 16 Jan 2026 19:16:53 +0100 Subject: [PATCH 8/8] nanokvm: address review comments --- .../jumpstarter_driver_nanokvm/driver.py | 55 +++++++++++-------- .../jumpstarter_driver_nanokvm/driver_test.py | 52 ++++++++---------- .../jumpstarter-driver-nanokvm/pyproject.toml | 3 + pyproject.toml | 1 + 4 files changed, 59 insertions(+), 52 deletions(-) diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py index bc0c532c0..c58c93bfc 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver.py @@ -86,14 +86,9 @@ async def _get_client(self) -> NanoKVMAPIClient: def close(self): """Clean up resources""" # Schedule cleanup of client - if self._client is not None: + if self._client_ctx is not None: try: - import asyncio - loop = asyncio.get_event_loop() - if loop.is_running(): - loop.create_task(self._client_ctx.__aexit__(None, None, None)) - else: - loop.run_until_complete(self._client_ctx.__aexit__(None, None, None)) + anyio.from_thread.run(self._client_ctx.__aexit__(None, None, None)) except Exception as e: self.logger.debug(f"Error closing client: {e}") @@ -215,14 +210,9 @@ async def _get_client(self) -> NanoKVMAPIClient: def close(self): """Clean up resources""" # Schedule cleanup of client - if self._client is not None: + if self._client_ctx is not None: try: - import asyncio - loop = asyncio.get_event_loop() - if loop.is_running(): - loop.create_task(self._client_ctx.__aexit__(None, None, None)) - else: - loop.run_until_complete(self._client_ctx.__aexit__(None, None, None)) + anyio.from_thread.run(self._client_ctx.__aexit__(None, None, None)) except Exception as e: self.logger.debug(f"Error closing client: {e}") @@ -397,13 +387,14 @@ def __post_init__(self): ), } + super().__post_init__() + # Optionally add serial console access if self.enable_serial: # Note: This is a placeholder - actual serial console access via SSH # would require additional implementation in the nanokvm library self.logger.warning("Serial console access not yet fully implemented") - super().__post_init__() @classmethod def client(cls) -> str: @@ -415,26 +406,44 @@ async def get_info(self): # Get info from the video driver's client video_driver = self.children["video"] - @with_reauth - async def _get_info_impl(driver): - client = await driver._get_client() - info = await client.get_info() + def _format_info(info): + """Format device info into a dictionary""" return { - "ips": [{"name": ip.name, "addr": ip.addr, "version": ip.version, "type": ip.type} for ip in info.ips], + "ips": [ + {"name": ip.name, "addr": ip.addr, "version": ip.version, "type": ip.type} + for ip in info.ips + ], "mdns": info.mdns, "image": info.image, "application": info.application, "device_key": info.device_key, } - return await _get_info_impl(video_driver) + try: + client = await video_driver._get_client() + info = await client.get_info() + return _format_info(info) + except Exception as e: + if _is_unauthorized_error(e): + self.logger.warning("Received 401 Unauthorized, re-authenticating...") + await video_driver._reset_client() + # Retry once after re-authentication + client = await video_driver._get_client() + info = await client.get_info() + return _format_info(info) + raise @export async def reboot(self): """Reboot the NanoKVM device""" video_driver = self.children["video"] - client = await video_driver._get_client() - await client.reboot_system() + + @with_reauth + async def _reboot_impl(driver): + client = await driver._get_client() + await client.reboot_system() + + await _reboot_impl(video_driver) self.logger.info("NanoKVM device rebooted") @export diff --git a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py index 99e43c0f4..4ef863e45 100644 --- a/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py +++ b/packages/jumpstarter-driver-nanokvm/jumpstarter_driver_nanokvm/driver_test.py @@ -18,6 +18,7 @@ def mock_nanokvm_client(): # Mock authentication mock_client.authenticate = AsyncMock() mock_client.logout = AsyncMock() + mock_client.close = AsyncMock() # Mock info mock_info = MagicMock() @@ -43,6 +44,8 @@ async def mock_stream(): # Mock HID functions mock_client.paste_text = AsyncMock() mock_client.reset_hid = AsyncMock() + mock_client.mouse_move_abs = AsyncMock() + mock_client.mouse_click = AsyncMock() # Mock reboot mock_client.reboot_system = AsyncMock() @@ -52,14 +55,19 @@ async def mock_stream(): mock_images.files = ["/data/alpine-standard-3.23.2-x86_64.iso", "/data/cs10-js.iso"] mock_client.get_images = AsyncMock(return_value=mock_images) - mock_client_class.return_value = mock_client + # Mock context manager behavior + mock_context = AsyncMock() + mock_context.__aenter__ = AsyncMock(return_value=mock_client) + mock_context.__aexit__ = AsyncMock(return_value=None) + mock_client_class.return_value = mock_context + yield mock_client @pytest.fixture def mock_aiohttp_session(): """Create a mock aiohttp ClientSession""" - with patch("jumpstarter_driver_nanokvm.driver.ClientSession") as mock_session_class: + with patch("aiohttp.ClientSession") as mock_session_class: mock_session = AsyncMock() mock_session.close = AsyncMock() mock_session_class.return_value = mock_session @@ -173,42 +181,28 @@ def test_nanokvm_client_creation(): def test_nanokvm_mouse_move_abs(mock_nanokvm_client, mock_aiohttp_session): """Test mouse absolute movement""" - with patch("jumpstarter_driver_nanokvm.driver.ClientSession") as mock_session_class: - mock_ws = AsyncMock() - mock_ws.send_json = AsyncMock() - mock_session = AsyncMock() - mock_session.ws_connect = AsyncMock(return_value=mock_ws) - mock_session.close = AsyncMock() - mock_session_class.return_value = mock_session - - hid = NanoKVMHID(host="test.local", username="admin", password="admin") + hid = NanoKVMHID(host="test.local", username="admin", password="admin") - with serve(hid) as client: - # Move mouse to absolute position - client.mouse_move_abs(32768, 32768) + with serve(hid) as client: + # Move mouse to absolute position (normalized 0.0-1.0 coordinates) + client.mouse_move_abs(0.5, 0.5) - # Verify WebSocket message was sent - mock_ws.send_json.assert_called() + # Verify the mock was called + mock_nanokvm_client.mouse_move_abs.assert_called_once_with(0.5, 0.5) def test_nanokvm_mouse_click(mock_nanokvm_client, mock_aiohttp_session): """Test mouse click""" - with patch("jumpstarter_driver_nanokvm.driver.ClientSession") as mock_session_class: - mock_ws = AsyncMock() - mock_ws.send_json = AsyncMock() - mock_session = AsyncMock() - mock_session.ws_connect = AsyncMock(return_value=mock_ws) - mock_session.close = AsyncMock() - mock_session_class.return_value = mock_session + from nanokvm.models import MouseButton - hid = NanoKVMHID(host="test.local", username="admin", password="admin") + hid = NanoKVMHID(host="test.local", username="admin", password="admin") - with serve(hid) as client: - # Click left button - client.mouse_click("left") + with serve(hid) as client: + # Click left button + client.mouse_click("left") - # Verify WebSocket messages were sent (down and up) - assert mock_ws.send_json.call_count >= 2 + # Verify the mock was called + mock_nanokvm_client.mouse_click.assert_called_once_with(MouseButton.LEFT, None, None) def test_nanokvm_get_images(mock_nanokvm_client, mock_aiohttp_session): diff --git a/packages/jumpstarter-driver-nanokvm/pyproject.toml b/packages/jumpstarter-driver-nanokvm/pyproject.toml index f3171a93a..75069323c 100644 --- a/packages/jumpstarter-driver-nanokvm/pyproject.toml +++ b/packages/jumpstarter-driver-nanokvm/pyproject.toml @@ -21,6 +21,9 @@ dependencies = [ "click", ] +[project.entry-points."jumpstarter.drivers"] +NanoKVM = "jumpstarter_driver_nanokvm.driver:NanoKVM" + [tool.hatch.version] source = "vcs" raw-options = { 'root' = '../../'} diff --git a/pyproject.toml b/pyproject.toml index e6a171ec2..4a360acc0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ jumpstarter-driver-http-power = { workspace = true } jumpstarter-driver-gpiod = { workspace = true } jumpstarter-driver-ridesx = { workspace = true } jumpstarter-driver-network = { workspace = true } +jumpstarter-driver-nanokvm = { workspace = true } jumpstarter-driver-opendal = { workspace = true } jumpstarter-driver-power = { workspace = true } jumpstarter-driver-probe-rs = { workspace = true }