Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pylabrobot/hamilton/liquid_handlers/vantage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .chatterbox import VantageChatterboxDriver
from .driver import VantageDriver
from .vantage import Vantage
93 changes: 93 additions & 0 deletions pylabrobot/hamilton/liquid_handlers/vantage/chatterbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""VantageChatterboxDriver: mock driver that prints commands instead of sending to hardware."""

from __future__ import annotations

import logging
from typing import Any, List, Optional

from .driver import VantageDriver

logger = logging.getLogger("pylabrobot")


class VantageChatterboxDriver(VantageDriver):
"""A VantageDriver that prints firmware commands instead of communicating with hardware.

Useful for testing, debugging, and development without a physical Vantage.
"""

def __init__(self):
super().__init__()

async def setup(
self,
skip_loading_cover: bool = False,
skip_core96: bool = False,
skip_ipg: bool = False,
):
# Skip USB and hardware discovery entirely.
# Import backends here to avoid circular imports.
from .head96_backend import VantageHead96Backend
from .ipg import IPGBackend
from .loading_cover import VantageLoadingCover
from .pip_backend import VantagePIPBackend
from .x_arm import VantageXArm

self.id_ = 0
self._num_channels = 8

self.pip = VantagePIPBackend(self)
self.head96 = VantageHead96Backend(self) if not skip_core96 else None
self.ipg = IPGBackend(driver=self) if not skip_ipg else None
if self.ipg is not None:
self.ipg._parked = True
self.x_arm = VantageXArm(driver=self)
self.loading_cover = VantageLoadingCover(driver=self)

# Initialize subsystems.
for sub in self._subsystems:
await sub._on_setup()

async def stop(self):
# Stop subsystems (no-ops for chatterbox, but follows the pattern).
for sub in reversed(self._subsystems):
await sub._on_stop()
# Clear state (skip super().stop() since there is no USB to close).
self._num_channels = None
self._tth2tti.clear()
self.head96 = None
self.ipg = None
self.x_arm = None
self.loading_cover = None

async def send_command(
self,
module: str,
command: str,
auto_id: bool = True,
tip_pattern: Optional[List[bool]] = None,
write_timeout: Optional[int] = None,
read_timeout: Optional[int] = None,
wait: bool = True,
fmt: Optional[Any] = None,
**kwargs,
):
cmd, _ = self._assemble_command(
module=module,
command=command,
tip_pattern=tip_pattern,
auto_id=auto_id,
**kwargs,
)
logger.info("Chatterbox: %s", cmd)
return None

async def send_raw_command(
self,
command: str,
write_timeout: Optional[int] = None,
read_timeout: Optional[int] = None,
wait: bool = True,
) -> Optional[str]:
logger.info("Chatterbox raw: %s", command)
return None
Loading
Loading