From 7bcbdbd88ba89b63cf0adc5591b515228f7a4caa Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 2 Apr 2026 18:48:43 -0700 Subject: [PATCH 1/3] Port Hamilton Vantage to new Device/Driver/Backend architecture MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decomposes the legacy monolithic VantageBackend (~5,334 lines) into the same layered architecture established by the STAR port: - VantageDriver(HamiltonLiquidHandler) — USB I/O, firmware protocol, setup - VantagePIPBackend(PIPBackend) — independent channel operations - VantageHead96Backend(Head96Backend) — 96-head operations - IPGBackend(OrientableGripperArmBackend) — plate gripper - VantageXArm, VantageLoadingCover — auxiliary subsystems - VantageChatterboxDriver — mock driver for testing - Vantage(Device) — user-facing device wiring capabilities All firmware commands verified parameter-by-parameter against the legacy. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../liquid_handlers/vantage/__init__.py | 3 + .../liquid_handlers/vantage/chatterbox.py | 93 ++ .../liquid_handlers/vantage/driver.py | 340 ++++++++ .../liquid_handlers/vantage/errors.py | 282 ++++++ .../liquid_handlers/vantage/fw_parsing.py | 63 ++ .../liquid_handlers/vantage/head96_backend.py | 649 ++++++++++++++ .../hamilton/liquid_handlers/vantage/ipg.py | 279 ++++++ .../liquid_handlers/vantage/loading_cover.py | 56 ++ .../liquid_handlers/vantage/pip_backend.py | 822 ++++++++++++++++++ .../liquid_handlers/vantage/tests/__init__.py | 0 .../vantage/tests/test_errors.py | 75 ++ .../vantage/tests/test_fw_parsing.py | 45 + .../liquid_handlers/vantage/vantage.py | 61 ++ .../hamilton/liquid_handlers/vantage/x_arm.py | 125 +++ 14 files changed, 2893 insertions(+) create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/__init__.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/chatterbox.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/driver.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/errors.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/fw_parsing.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/head96_backend.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/ipg.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/loading_cover.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/pip_backend.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/tests/__init__.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/tests/test_errors.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/tests/test_fw_parsing.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/vantage.py create mode 100644 pylabrobot/hamilton/liquid_handlers/vantage/x_arm.py diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/__init__.py b/pylabrobot/hamilton/liquid_handlers/vantage/__init__.py new file mode 100644 index 00000000000..c2a906a35dd --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/__init__.py @@ -0,0 +1,3 @@ +from .chatterbox import VantageChatterboxDriver +from .driver import VantageDriver +from .vantage import Vantage diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/chatterbox.py b/pylabrobot/hamilton/liquid_handlers/vantage/chatterbox.py new file mode 100644 index 00000000000..c0b6d30cbda --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/chatterbox.py @@ -0,0 +1,93 @@ +"""VantageChatterboxDriver: mock driver that prints commands instead of sending to hardware.""" + +from __future__ import annotations + +import logging +from typing import Any, List, Optional + +from .driver import VantageDriver + +logger = logging.getLogger("pylabrobot") + + +class VantageChatterboxDriver(VantageDriver): + """A VantageDriver that prints firmware commands instead of communicating with hardware. + + Useful for testing, debugging, and development without a physical Vantage. + """ + + def __init__(self): + super().__init__() + + async def setup( + self, + skip_loading_cover: bool = False, + skip_core96: bool = False, + skip_ipg: bool = False, + ): + # Skip USB and hardware discovery entirely. + # Import backends here to avoid circular imports. + from .head96_backend import VantageHead96Backend + from .ipg import IPGBackend + from .loading_cover import VantageLoadingCover + from .pip_backend import VantagePIPBackend + from .x_arm import VantageXArm + + self.id_ = 0 + self._num_channels = 8 + + self.pip = VantagePIPBackend(self) + self.head96 = VantageHead96Backend(self) if not skip_core96 else None + self.ipg = IPGBackend(driver=self) if not skip_ipg else None + if self.ipg is not None: + self.ipg._parked = True + self.x_arm = VantageXArm(driver=self) + self.loading_cover = VantageLoadingCover(driver=self) + + # Initialize subsystems. + for sub in self._subsystems: + await sub._on_setup() + + async def stop(self): + # Stop subsystems (no-ops for chatterbox, but follows the pattern). + for sub in reversed(self._subsystems): + await sub._on_stop() + # Clear state (skip super().stop() since there is no USB to close). + self._num_channels = None + self._tth2tti.clear() + self.head96 = None + self.ipg = None + self.x_arm = None + self.loading_cover = None + + async def send_command( + self, + module: str, + command: str, + auto_id: bool = True, + tip_pattern: Optional[List[bool]] = None, + write_timeout: Optional[int] = None, + read_timeout: Optional[int] = None, + wait: bool = True, + fmt: Optional[Any] = None, + **kwargs, + ): + cmd, _ = self._assemble_command( + module=module, + command=command, + tip_pattern=tip_pattern, + auto_id=auto_id, + **kwargs, + ) + logger.info("Chatterbox: %s", cmd) + return None + + async def send_raw_command( + self, + command: str, + write_timeout: Optional[int] = None, + read_timeout: Optional[int] = None, + wait: bool = True, + ) -> Optional[str]: + logger.info("Chatterbox raw: %s", command) + return None diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/driver.py b/pylabrobot/hamilton/liquid_handlers/vantage/driver.py new file mode 100644 index 00000000000..0195b5dcef6 --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/driver.py @@ -0,0 +1,340 @@ +"""VantageDriver: inherits HamiltonLiquidHandler, adds Vantage-specific config and error handling.""" + +from typing import TYPE_CHECKING, Any, List, Literal, Optional, Union + +from pylabrobot.capabilities.liquid_handling.head96_backend import Head96Backend +from pylabrobot.capabilities.liquid_handling.pip_backend import PIPBackend +from pylabrobot.hamilton.liquid_handlers.base import HamiltonLiquidHandler +from pylabrobot.resources.hamilton import TipPickupMethod, TipSize + +from .errors import vantage_response_string_to_error +from .fw_parsing import parse_vantage_fw_string + +if TYPE_CHECKING: + from .ipg import IPGBackend + from .loading_cover import VantageLoadingCover + from .x_arm import VantageXArm + + +class VantageDriver(HamiltonLiquidHandler): + """Driver for Hamilton Vantage liquid handlers. + + Inherits USB I/O, command assembly, and background reading from HamiltonLiquidHandler. + Adds Vantage-specific firmware parsing, error handling, and subsystem management. + """ + + def __init__( + self, + device_address: Optional[int] = None, + serial_number: Optional[str] = None, + packet_read_timeout: int = 3, + read_timeout: int = 60, + write_timeout: int = 30, + ): + super().__init__( + id_product=0x8003, + device_address=device_address, + serial_number=serial_number, + packet_read_timeout=packet_read_timeout, + read_timeout=read_timeout, + write_timeout=write_timeout, + ) + + self._num_channels: Optional[int] = None + self._traversal_height: float = 245.0 + + # Populated during setup(). + self.pip: Optional[PIPBackend] = None # set in setup() + self.head96: Optional[Head96Backend] = None # set in setup() if installed + self.ipg: Optional["IPGBackend"] = None # set in setup() if installed + self.x_arm: Optional["VantageXArm"] = None # set in setup() + self.loading_cover: Optional["VantageLoadingCover"] = None # set in setup() + + # -- HamiltonLiquidHandler abstract methods -------------------------------- + + @property + def module_id_length(self) -> int: + return 4 + + @property + def num_channels(self) -> int: + if self._num_channels is None: + raise RuntimeError("Driver not set up - call setup() first.") + return self._num_channels + + def get_id_from_fw_response(self, resp: str) -> Optional[int]: + parsed = parse_vantage_fw_string(resp, {"id": "int"}) + if "id" in parsed and parsed["id"] is not None: + return int(parsed["id"]) + return None + + def check_fw_string_error(self, resp: str) -> None: + if "er" in resp and "er0" not in resp: + raise vantage_response_string_to_error(resp) + + def _parse_response(self, resp: str, fmt: Any) -> dict: + return parse_vantage_fw_string(resp, fmt) + + async def define_tip_needle( + self, + tip_type_table_index: int, + has_filter: bool, + tip_length: int, + maximum_tip_volume: int, + tip_size: TipSize, + pickup_method: TipPickupMethod, + ) -> None: + if not 0 <= tip_type_table_index <= 99: + raise ValueError("tip_type_table_index must be between 0 and 99") + if not 1 <= tip_length <= 1999: + raise ValueError("tip_length must be between 1 and 1999") + if not 1 <= maximum_tip_volume <= 56000: + raise ValueError("maximum_tip_volume must be between 1 and 56000") + + await self.send_command( + module="A1AM", + command="TT", + ti=f"{tip_type_table_index:02}", + tf=has_filter, + tl=f"{tip_length:04}", + tv=f"{maximum_tip_volume:05}", + tg=tip_size.value, + tu=pickup_method.value, + ) + + # -- traversal height ------------------------------------------------------ + + def set_minimum_traversal_height(self, traversal_height: float) -> None: + """Set the minimum traversal height (mm). Used as default for z-safety parameters.""" + assert 0 < traversal_height < 285, "Traversal height must be between 0 and 285 mm" + self._traversal_height = traversal_height + + @property + def traversal_height(self) -> float: + return self._traversal_height + + # -- lifecycle ------------------------------------------------------------- + + async def setup( + self, + skip_loading_cover: bool = False, + skip_core96: bool = False, + skip_ipg: bool = False, + ): + await super().setup() + self.id_ = 0 + + # Import here to avoid circular imports. + from .head96_backend import VantageHead96Backend + from .ipg import IPGBackend + from .loading_cover import VantageLoadingCover + from .pip_backend import VantagePIPBackend + from .x_arm import VantageXArm + + # Discover channel count. + tip_presences = await self.query_tip_presence() + self._num_channels = len(tip_presences) + + # Arm pre-initialization. + arm_initialized = await self.arm_request_instrument_initialization_status() + if not arm_initialized: + await self.arm_pre_initialize() + + # Create backends. + self.pip = VantagePIPBackend(self) + self.x_arm = VantageXArm(driver=self) + self.loading_cover = VantageLoadingCover(driver=self) + + # Initialize PIP channels. + pip_channels_initialized = await self.pip_request_initialization_status() + if not pip_channels_initialized or any(tip_presences): + await self.pip_initialize( + x_position=[7095] * self.num_channels, + y_position=[3891, 3623, 3355, 3087, 2819, 2551, 2283, 2016], + begin_z_deposit_position=[int(self._traversal_height * 10)] * self.num_channels, + end_z_deposit_position=[1235] * self.num_channels, + minimal_height_at_command_end=[int(self._traversal_height * 10)] * self.num_channels, + tip_pattern=[True] * self.num_channels, + tip_type=[1] * self.num_channels, + ts=70, + ) + + # Loading cover. + if not skip_loading_cover: + loading_cover_initialized = await self.loading_cover.request_initialization_status() + if not loading_cover_initialized: + await self.loading_cover.initialize() + + # Core 96 head. + core96_initialized = await self.core96_request_initialization_status() + if not core96_initialized and not skip_core96: + self.head96 = VantageHead96Backend(self) + await self.core96_initialize( + x_position=7347, + y_position=2684, + minimal_traverse_height_at_begin_of_command=int(self._traversal_height * 10), + minimal_height_at_command_end=int(self._traversal_height * 10), + end_z_deposit_position=2420, + ) + else: + # Even if already initialized, create the backend. + self.head96 = VantageHead96Backend(self) if not skip_core96 else None + + # IPG. + if not skip_ipg: + self.ipg = IPGBackend(driver=self) + ipg_initialized = await self.ipg.request_initialization_status() + if not ipg_initialized: + await self.ipg.initialize() + if not await self.ipg.get_parking_status(): + await self.ipg.park() + else: + self.ipg = None + + # Initialize subsystems. + for sub in self._subsystems: + await sub._on_setup() + + @property + def _subsystems(self) -> List[Any]: + """All active subsystems, for lifecycle management.""" + subs: List[Any] = [] + if self.pip is not None: + subs.append(self.pip) + if self.head96 is not None: + subs.append(self.head96) + if self.ipg is not None: + subs.append(self.ipg) + if self.x_arm is not None: + subs.append(self.x_arm) + if self.loading_cover is not None: + subs.append(self.loading_cover) + return subs + + async def stop(self): + # Stop subsystems first (they may need to send firmware commands). + for sub in reversed(self._subsystems): + await sub._on_stop() + await super().stop() + self._num_channels = None + self.head96 = None + self.ipg = None + self.x_arm = None + self.loading_cover = None + + # -- arm commands (A1AM) --------------------------------------------------- + + async def arm_request_instrument_initialization_status(self) -> bool: + """Check if the arm module is initialized (A1AM:QW).""" + resp = await self.send_command(module="A1AM", command="QW", fmt={"qw": "int"}) + return resp is not None and resp["qw"] == 1 + + async def arm_pre_initialize(self) -> None: + """Pre-initialize the arm module (A1AM:MI).""" + await self.send_command(module="A1AM", command="MI") + + # -- pip module commands (A1PM) used during setup -------------------------- + + async def pip_request_initialization_status(self) -> bool: + """Check if PIP channels are initialized (A1PM:QW).""" + resp = await self.send_command(module="A1PM", command="QW", fmt={"qw": "int"}) + return resp is not None and resp["qw"] == 1 + + async def pip_initialize( + self, + x_position: List[int], + y_position: List[int], + begin_z_deposit_position: Optional[List[int]] = None, + end_z_deposit_position: Optional[List[int]] = None, + minimal_height_at_command_end: Optional[List[int]] = None, + tip_pattern: Optional[List[bool]] = None, + tip_type: Optional[List[int]] = None, + ts: int = 0, + ) -> None: + """Initialize PIP channels (A1PM:DI).""" + + if begin_z_deposit_position is None: + begin_z_deposit_position = [0] * self.num_channels + if end_z_deposit_position is None: + end_z_deposit_position = [0] * self.num_channels + if minimal_height_at_command_end is None: + minimal_height_at_command_end = [3600] * self.num_channels + if tip_pattern is None: + tip_pattern = [False] * self.num_channels + if tip_type is None: + tip_type = [4] * self.num_channels + + await self.send_command( + module="A1PM", + command="DI", + xp=x_position, + yp=y_position, + tp=begin_z_deposit_position, + tz=end_z_deposit_position, + te=minimal_height_at_command_end, + tm=tip_pattern, + tt=tip_type, + ts=ts, + ) + + async def query_tip_presence(self) -> List[bool]: + """Query tip presence on all channels (A1PM:QA).""" + resp = await self.send_command(module="A1PM", command="QA", fmt={"rt": "[int]"}) + presences_int: List[int] = resp["rt"] + return [bool(p) for p in presences_int] + + # -- core 96 commands used during setup (A1HM) ----------------------------- + + async def core96_request_initialization_status(self) -> bool: + """Check if Core96 head is initialized (A1HM:QW).""" + resp = await self.send_command(module="A1HM", command="QW", fmt={"qw": "int"}) + return resp is not None and resp["qw"] == 1 + + async def core96_initialize( + self, + x_position: int = 7347, + y_position: int = 2684, + z_position: int = 0, + minimal_traverse_height_at_begin_of_command: int = 2450, + minimal_height_at_command_end: int = 2450, + end_z_deposit_position: int = 2420, + tip_type: int = 4, + ) -> None: + """Initialize Core 96 head (A1HM:DI).""" + await self.send_command( + module="A1HM", + command="DI", + xp=x_position, + yp=y_position, + zp=z_position, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + tz=end_z_deposit_position, + tt=tip_type, + ) + + # -- LED (C0AM) ------------------------------------------------------------ + + async def set_led_color( + self, + mode: Union[Literal["on"], Literal["off"], Literal["blink"]], + intensity: int, + white: int, + red: int, + green: int, + blue: int, + uv: int, + blink_interval: Optional[int] = None, + ) -> None: + """Set the instrument LED color (C0AM:LI).""" + if blink_interval is not None and mode != "blink": + raise ValueError("blink_interval is only used when mode is 'blink'.") + + await self.send_command( + module="C0AM", + command="LI", + li={"on": 1, "off": 0, "blink": 2}[mode], + os=intensity, + ok=blink_interval or 750, + ol=f"{white} {red} {green} {blue} {uv}", + ) diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/errors.py b/pylabrobot/hamilton/liquid_handlers/vantage/errors.py new file mode 100644 index 00000000000..9100195851d --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/errors.py @@ -0,0 +1,282 @@ +"""Vantage-specific firmware error classes and error parsing.""" + +import re +from typing import Dict, Optional + +from pylabrobot.capabilities.liquid_handling.errors import ChannelizedError +from pylabrobot.resources.errors import HasTipError, NoTipError, TooLittleLiquidError + +from .fw_parsing import parse_vantage_fw_string + +# --------------------------------------------------------------------------- +# Error dictionaries (per-module error code -> human-readable message) +# --------------------------------------------------------------------------- + +core96_errors: Dict[int, str] = { + 0: "No error", + 21: "No communication to digital potentiometer", + 25: "Wrong Flash EPROM data", + 26: "Flash EPROM not programmable", + 27: "Flash EPROM not erasable", + 28: "Flash EPROM checksum error", + 29: "Wrong FW loaded", + 30: "Undefined command", + 31: "Undefined parameter", + 32: "Parameter out of range", + 35: "Voltages out of range", + 36: "Stop during command execution", + 37: "Adjustment sensor didn't switch (no teach in signal)", + 40: "No parallel processes on level 1 permitted", + 41: "No parallel processes on level 2 permitted", + 42: "No parallel processes on level 3 permitted", + 50: "Dispensing drive initialization failed", + 51: "Dispensing drive not initialized", + 52: "Dispensing drive movement error", + 53: "Maximum volume in tip reached", + 54: "Dispensing drive position out of permitted area", + 55: "Y drive initialization failed", + 56: "Y drive not initialized", + 57: "Y drive movement error", + 58: "Y drive position out of permitted area", + 60: "Z drive initialization failed", + 61: "Z drive not initialized", + 62: "Z drive movement error", + 63: "Z drive position out of permitted area", + 65: "Squeezer drive initialization failed", + 66: "Squeezer drive not initialized", + 67: "Squeezer drive movement error", + 68: "Squeezer drive position out of permitted area", + 70: "No liquid level found", + 71: "Not enough liquid present", + 75: "No tip picked up", + 76: "Tip already picked up", + 81: "Clot detected with LLD sensor", + 82: "TADM measurement out of lower limit curve", + 83: "TADM measurement out of upper limit curve", + 84: "Not enough memory for TADM measurement", + 90: "Limit curve not resettable", + 91: "Limit curve not programmable", + 92: "Limit curve name not found", + 93: "Limit curve data incorrect", + 94: "Not enough memory for limit curve", + 95: "Not allowed limit curve index", + 96: "Limit curve already stored", +} + +pip_errors: Dict[int, str] = { + 22: "Drive controller message error", + 23: "EC drive controller setup not executed", + 25: "wrong Flash EPROM data", + 26: "Flash EPROM not programmable", + 27: "Flash EPROM not erasable", + 28: "Flash EPROM checksum error", + 29: "wrong FW loaded", + 30: "Undefined command", + 31: "Undefined parameter", + 32: "Parameter out of range", + 35: "Voltages out of range", + 36: "Stop during command execution", + 37: "Adjustment sensor didn't switch (no teach in signal)", + 38: "Movement interrupted by partner channel", + 39: "Angle alignment offset error", + 40: "No parallel processes on level 1 permitted", + 41: "No parallel processes on level 2 permitted", + 42: "No parallel processes on level 3 permitted", + 50: "D drive initialization failed", + 51: "D drive not initialized", + 52: "D drive movement error", + 53: "Maximum volume in tip reached", + 54: "D drive position out of permitted area", + 55: "Y drive initialization failed", + 56: "Y drive not initialized", + 57: "Y drive movement error", + 58: "Y drive position out of permitted area", + 59: "Divergance Y motion controller to linear encoder to height", + 60: "Z drive initialization failed", + 61: "Z drive not initialized", + 62: "Z drive movement error", + 63: "Z drive position out of permitted area", + 64: "Limit stop not found", + 65: "S drive initialization failed", + 66: "S drive not initialized", + 67: "S drive movement error", + 68: "S drive position out of permitted area", + 69: "Init. position adjustment error", + 70: "No liquid level found", + 71: "Not enough liquid present", + 74: "Liquid at a not allowed position detected", + 75: "No tip picked up", + 76: "Tip already picked up", + 77: "Tip not discarded", + 78: "Wrong tip detected", + 79: "Tip not correct squeezed", + 80: "Liquid not correctly aspirated", + 81: "Clot detected", + 82: "TADM measurement out of lower limit curve", + 83: "TADM measurement out of upper limit curve", + 84: "Not enough memory for TADM measurement", + 85: "Jet dispense pressure not reached", + 86: "ADC algorithm error", + 90: "Limit curve not resettable", + 91: "Limit curve not programmable", + 92: "Limit curve name not found", + 93: "Limit curve data incorrect", + 94: "Not enough memory for limit curve", + 95: "Not allowed limit curve index", + 96: "Limit curve already stored", +} + +ipg_errors: Dict[int, str] = { + 0: "No error", + 22: "Drive controller message error", + 23: "EC drive controller setup not executed", + 25: "Wrong Flash EPROM data", + 26: "Flash EPROM not programmable", + 27: "Flash EPROM not erasable", + 28: "Flash EPROM checksum error", + 29: "Wrong FW loaded", + 30: "Undefined command", + 31: "Undefined parameter", + 32: "Parameter out of range", + 35: "Voltages out of range", + 36: "Stop during command execution", + 37: "Adjustment sensor didn't switch (no teach in signal)", + 39: "Angle alignment offset error", + 40: "No parallel processes on level 1 permitted", + 41: "No parallel processes on level 2 permitted", + 42: "No parallel processes on level 3 permitted", + 50: "Y Drive initialization failed", + 51: "Y Drive not initialized", + 52: "Y Drive movement error", + 53: "Y Drive position out of permitted area", + 54: "Diff. motion controller and lin. encoder counter too high", + 55: "Z Drive initialization failed", + 56: "Z Drive not initialized", + 57: "Z Drive movement error", + 58: "Z Drive position out of permitted area", + 59: "Z Drive limit stop not found", + 60: "Rotation Drive initialization failed", + 61: "Rotation Drive not initialized", + 62: "Rotation Drive movement error", + 63: "Rotation Drive position out of permitted area", + 65: "Wrist Twist Drive initialization failed", + 66: "Wrist Twist Drive not initialized", + 67: "Wrist Twist Drive movement error", + 68: "Wrist Twist Drive position out of permitted area", + 70: "Gripper Drive initialization failed", + 71: "Gripper Drive not initialized", + 72: "Gripper Drive movement error", + 73: "Gripper Drive position out of permitted area", + 80: "Plate not found", + 81: "Plate is still held", + 82: "No plate is held", +} + + +# --------------------------------------------------------------------------- +# Module ID -> name mapping +# --------------------------------------------------------------------------- + +VANTAGE_MODULE_NAMES: Dict[str, str] = { + "I1AM": "Cover", + "C0AM": "Master", + "A1PM": "Pip", + "A1HM": "Core 96", + "A1RM": "IPG", + "A1AM": "Arm", + "A1XM": "X-arm", +} + + +# --------------------------------------------------------------------------- +# Error classes +# --------------------------------------------------------------------------- + + +class VantageFirmwareError(Exception): + """Error raised when the Vantage firmware returns an error response.""" + + def __init__(self, errors: Dict[str, str], raw_response: str): + self.errors = errors + self.raw_response = raw_response + + def __str__(self) -> str: + return f"VantageFirmwareError(errors={self.errors}, raw_response={self.raw_response})" + + def __eq__(self, other: object) -> bool: + return ( + isinstance(other, VantageFirmwareError) + and self.errors == other.errors + and self.raw_response == other.raw_response + ) + + +# --------------------------------------------------------------------------- +# Parsing firmware error responses +# --------------------------------------------------------------------------- + + +def vantage_response_string_to_error(string: str) -> VantageFirmwareError: + """Convert a Vantage firmware response string to a VantageFirmwareError. + + Assumes that the response is an error response. + """ + + try: + error_format = r"[A-Z0-9]{2}[0-9]{2}" + error_string = parse_vantage_fw_string(string, {"es": "str"})["es"] + error_codes = re.findall(error_format, error_string) + errors: Dict[str, str] = {} + num_channels = 16 + for error in error_codes: + module, error_code = error[:2], error[2:] + error_code_int = int(error_code) + for channel in range(1, num_channels + 1): + if module == f"P{channel}": + errors[f"Pipetting channel {channel}"] = pip_errors.get(error_code_int, "Unknown error") + elif module in ("H0", "HM"): + errors["Core 96"] = core96_errors.get(error_code_int, "Unknown error") + elif module == "RM": + errors["IPG"] = ipg_errors.get(error_code_int, "Unknown error") + elif module == "AM": + errors["Cover"] = "Unknown error" + except ValueError: + module_id = string[:4] + module_name = VANTAGE_MODULE_NAMES.get(module_id, "Unknown module") + error_string = parse_vantage_fw_string(string, {"et": "str"})["et"] + errors = {module_name: error_string} + + return VantageFirmwareError(errors, string) + + +# --------------------------------------------------------------------------- +# Conversion to standard PLR errors +# --------------------------------------------------------------------------- + + +def convert_vantage_firmware_error_to_plr_error( + error: VantageFirmwareError, +) -> Optional[Exception]: + """Convert a VantageFirmwareError to a standard PLR error if possible. + + Returns the converted error, or None if no conversion is applicable. + """ + + # If all errors are pipetting channel errors, return a ChannelizedError. + if all(key.startswith("Pipetting channel ") for key in error.errors): + channel_errors: Dict[int, Exception] = {} + for channel_name, message in error.errors.items(): + channel_idx = int(channel_name.split(" ")[-1]) - 1 # 1-indexed -> 0-indexed + + if message == pip_errors.get(76): # "Tip already picked up" + channel_errors[channel_idx] = HasTipError() + elif message == pip_errors.get(75): # "No tip picked up" + channel_errors[channel_idx] = NoTipError(message) + elif message in (pip_errors.get(70), pip_errors.get(71)): + channel_errors[channel_idx] = TooLittleLiquidError(message) + else: + channel_errors[channel_idx] = Exception(message) + + return ChannelizedError(errors=channel_errors, raw_response=error.raw_response) + + return None diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/fw_parsing.py b/pylabrobot/hamilton/liquid_handlers/vantage/fw_parsing.py new file mode 100644 index 00000000000..80c4e058470 --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/fw_parsing.py @@ -0,0 +1,63 @@ +"""Vantage-specific firmware response parsing.""" + +import re +from typing import Dict, Optional + + +def parse_vantage_fw_string(s: str, fmt: Optional[Dict[str, str]] = None) -> dict: + """Parse a Vantage firmware string into a dict. + + The identifier parameter (id) is added automatically. + + ``fmt`` is a dict that specifies the format of the string. The keys are the parameter names and + the values are the types. The following types are supported: + + - ``"int"``: a single integer + - ``"str"``: a string + - ``"[int]"``: a list of integers + - ``"hex"``: a hexadecimal number + + Example: + >>> parse_vantage_fw_string("id0xs30 -100 +1 1000", {"id": "int", "x": "[int]"}) + {"id": 0, "x": [30, -100, 1, 1000]} + + >>> parse_vantage_fw_string('es"error string"', {"es": "str"}) + {"es": "error string"} + """ + + parsed: dict = {} + + if fmt is None: + fmt = {} + + if not isinstance(fmt, dict): + raise TypeError(f"invalid fmt for fmt: expected dict, got {type(fmt)}") + + if "id" not in fmt: + fmt["id"] = "int" + + for key, data_type in fmt.items(): + if data_type == "int": + matches = re.findall(rf"{key}([-+]?\d+)", s) + if len(matches) != 1: + raise ValueError(f"Expected exactly one match for {key} in {s}") + parsed[key] = int(matches[0]) + elif data_type == "str": + matches = re.findall(rf"{key}\"(.*)\"", s) + if len(matches) != 1: + raise ValueError(f"Expected exactly one match for {key} in {s}") + parsed[key] = matches[0] + elif data_type == "[int]": + matches = re.findall(rf"{key}((?:[-+]?[\d ]+)+)", s) + if len(matches) != 1: + raise ValueError(f"Expected exactly one match for {key} in {s}") + parsed[key] = [int(x) for x in matches[0].split()] + elif data_type == "hex": + matches = re.findall(rf"{key}([0-9a-fA-F]+)", s) + if len(matches) != 1: + raise ValueError(f"Expected exactly one match for {key} in {s}") + parsed[key] = int(matches[0], 16) + else: + raise ValueError(f"Unknown data type {data_type}") + + return parsed diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/head96_backend.py b/pylabrobot/hamilton/liquid_handlers/vantage/head96_backend.py new file mode 100644 index 00000000000..0652a2c4122 --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/head96_backend.py @@ -0,0 +1,649 @@ +"""Vantage Head96 backend: translates Head96 operations into Vantage firmware commands.""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING, List, Optional, Union + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.liquid_handling.head96_backend import Head96Backend +from pylabrobot.capabilities.liquid_handling.standard import ( + DropTipRack, + MultiHeadAspirationContainer, + MultiHeadAspirationPlate, + MultiHeadDispenseContainer, + MultiHeadDispensePlate, + PickupTipRack, +) +from pylabrobot.hamilton.lh.vantage.liquid_classes import get_vantage_liquid_class +from pylabrobot.hamilton.liquid_handlers.liquid_class import HamiltonLiquidClass +from pylabrobot.resources import Coordinate, Plate, TipRack +from pylabrobot.resources.hamilton import HamiltonTip +from pylabrobot.resources.liquid import Liquid + +from .errors import VantageFirmwareError, convert_vantage_firmware_error_to_plr_error +from .pip_backend import _get_dispense_mode + +if TYPE_CHECKING: + from .driver import VantageDriver + +logger = logging.getLogger("pylabrobot") + + +def _channel_pattern_to_hex(pattern: List[bool]) -> str: + """Convert a list of 96 booleans to the hex string expected by firmware.""" + if len(pattern) != 96: + raise ValueError("channel_pattern must be a list of 96 boolean values") + channel_pattern_bin_str = reversed(["1" if x else "0" for x in pattern]) + return hex(int("".join(channel_pattern_bin_str), 2)).upper()[2:] + + +class VantageHead96Backend(Head96Backend): + """Translates Head96 operations into Vantage firmware commands via the driver.""" + + def __init__(self, driver: "VantageDriver"): + self.driver = driver + + async def _on_setup(self): + pass + + async def _on_stop(self): + pass + + # -- BackendParams --------------------------------------------------------- + + @dataclass + class PickUpTipsParams(BackendParams): + tip_handling_method: int = 0 + z_deposit_position: float = 216.4 + minimal_traverse_height_at_begin_of_command: Optional[float] = None + minimal_height_at_command_end: Optional[float] = None + + @dataclass + class DropTipsParams(BackendParams): + z_deposit_position: float = 216.4 + minimal_traverse_height_at_begin_of_command: Optional[float] = None + minimal_height_at_command_end: Optional[float] = None + + @dataclass + class AspirateParams(BackendParams): + jet: bool = False + blow_out: bool = False + hlc: Optional[HamiltonLiquidClass] = None + type_of_aspiration: int = 0 + minimal_traverse_height_at_begin_of_command: Optional[float] = None + minimal_height_at_command_end: Optional[float] = None + pull_out_distance_to_take_transport_air_in_function_without_lld: float = 5 + tube_2nd_section_height_measured_from_zm: float = 0 + tube_2nd_section_ratio: float = 0 + immersion_depth: float = 0 + surface_following_distance: float = 0 + transport_air_volume: Optional[float] = None + blow_out_air_volume: Optional[float] = None + pre_wetting_volume: float = 0 + lld_mode: int = 0 + lld_sensitivity: int = 4 + swap_speed: Optional[float] = None + settling_time: Optional[float] = None + limit_curve_index: int = 0 + tadm_channel_pattern: Optional[List[bool]] = None + tadm_algorithm_on_off: int = 0 + recording_mode: int = 0 + disable_volume_correction: bool = False + + @dataclass + class DispenseParams(BackendParams): + jet: bool = False + blow_out: bool = False + empty: bool = False + hlc: Optional[HamiltonLiquidClass] = None + type_of_dispensing_mode: Optional[int] = None + tube_2nd_section_height_measured_from_zm: float = 0 + tube_2nd_section_ratio: float = 0 + pull_out_distance_to_take_transport_air_in_function_without_lld: float = 5.0 + immersion_depth: float = 0 + surface_following_distance: float = 2.9 + minimal_traverse_height_at_begin_of_command: Optional[float] = None + minimal_height_at_command_end: Optional[float] = None + cut_off_speed: float = 250.0 + stop_back_volume: float = 0 + transport_air_volume: Optional[float] = None + blow_out_air_volume: Optional[float] = None + lld_mode: int = 0 + lld_sensitivity: int = 4 + side_touch_off_distance: float = 0 + swap_speed: Optional[float] = None + settling_time: Optional[float] = None + limit_curve_index: int = 0 + tadm_channel_pattern: Optional[List[bool]] = None + tadm_algorithm_on_off: int = 0 + recording_mode: int = 0 + disable_volume_correction: bool = False + + # -- Head96Backend interface ----------------------------------------------- + + async def pick_up_tips96( + self, + pickup: PickupTipRack, + backend_params: Optional[BackendParams] = None, + ): + if not isinstance(backend_params, VantageHead96Backend.PickUpTipsParams): + backend_params = VantageHead96Backend.PickUpTipsParams() + + tip_spot_a1 = pickup.resource.get_item("A1") + prototypical_tip = None + for tip_spot in pickup.resource.get_all_items(): + if tip_spot.has_tip(): + prototypical_tip = tip_spot.get_tip() + break + if prototypical_tip is None: + raise ValueError("No tips found in the tip rack.") + assert isinstance(prototypical_tip, HamiltonTip), "Tip type must be HamiltonTip." + + ttti = await self.driver.request_or_assign_tip_type_index(prototypical_tip) + position = tip_spot_a1.get_absolute_location(x="c", y="c", z="b") + pickup.offset + th = self.driver.traversal_height + + try: + await self._core96_tip_pick_up( + x_position=round(position.x * 10), + y_position=round(position.y * 10), + tip_type=ttti, + tip_handling_method=backend_params.tip_handling_method, + z_deposit_position=round((backend_params.z_deposit_position + pickup.offset.z) * 10), + minimal_traverse_height_at_begin_of_command=round( + (backend_params.minimal_traverse_height_at_begin_of_command or th) * 10 + ), + minimal_height_at_command_end=round( + (backend_params.minimal_height_at_command_end or th) * 10 + ), + ) + except VantageFirmwareError as e: + plr_error = convert_vantage_firmware_error_to_plr_error(e) + raise plr_error if plr_error is not None else e + + async def drop_tips96( + self, + drop: DropTipRack, + backend_params: Optional[BackendParams] = None, + ): + if not isinstance(backend_params, VantageHead96Backend.DropTipsParams): + backend_params = VantageHead96Backend.DropTipsParams() + + if isinstance(drop.resource, TipRack): + tip_spot_a1 = drop.resource.get_item("A1") + position = tip_spot_a1.get_absolute_location(x="c", y="c", z="b") + drop.offset + else: + raise NotImplementedError( + f"Only TipRacks are supported for dropping tips on Vantage, got {drop.resource}" + ) + + th = self.driver.traversal_height + + try: + await self._core96_tip_discard( + x_position=round(position.x * 10), + y_position=round(position.y * 10), + z_deposit_position=round((backend_params.z_deposit_position + drop.offset.z) * 10), + minimal_traverse_height_at_begin_of_command=round( + (backend_params.minimal_traverse_height_at_begin_of_command or th) * 10 + ), + minimal_height_at_command_end=round( + (backend_params.minimal_height_at_command_end or th) * 10 + ), + ) + except VantageFirmwareError as e: + plr_error = convert_vantage_firmware_error_to_plr_error(e) + raise plr_error if plr_error is not None else e + + async def aspirate96( + self, + aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer], + backend_params: Optional[BackendParams] = None, + ): + if not isinstance(backend_params, VantageHead96Backend.AspirateParams): + backend_params = VantageHead96Backend.AspirateParams() + + # Resolve position and liquid surface. + if isinstance(aspiration, MultiHeadAspirationPlate): + plate = aspiration.wells[0].parent + assert isinstance(plate, Plate) + rot = plate.get_absolute_rotation() + if rot.x % 360 != 0 or rot.y % 360 != 0: + raise ValueError("Plate rotation around x or y is not supported for 96 head operations") + if rot.z % 360 == 180: + ref_well = plate.get_well("H12") + elif rot.z % 360 == 0: + ref_well = plate.get_well("A1") + else: + raise ValueError("96 head only supports plate rotations of 0 or 180 degrees around z") + position = ( + ref_well.get_absolute_location(x="c", y="c", z="b") + + aspiration.offset + + Coordinate(z=ref_well.material_z_thickness) + ) + well_bottoms = position.z + lld_search_height = well_bottoms + ref_well.get_absolute_size_z() + 1.7 + else: + x_width = (12 - 1) * 9 + y_width = (8 - 1) * 9 + x_position = (aspiration.container.get_absolute_size_x() - x_width) / 2 + y_position = (aspiration.container.get_absolute_size_y() - y_width) / 2 + y_width + position = ( + aspiration.container.get_absolute_location(z="cavity_bottom") + + Coordinate(x=x_position, y=y_position) + + aspiration.offset + ) + well_bottoms = position.z + lld_search_height = well_bottoms + aspiration.container.get_absolute_size_z() + 1.7 + + liquid_height = position.z + (aspiration.liquid_height or 0) + + tip = next(t for t in aspiration.tips if t is not None) + hlc = backend_params.hlc + if hlc is None: + hlc = get_vantage_liquid_class( + tip_volume=tip.maximal_volume, + is_core=True, + is_tip=True, + has_filter=tip.has_filter, + liquid=Liquid.WATER, + jet=backend_params.jet, + blow_out=backend_params.blow_out, + ) + + if backend_params.disable_volume_correction or hlc is None: + volume = aspiration.volume + else: + volume = hlc.compute_corrected_volume(aspiration.volume) + + transport_air_volume = backend_params.transport_air_volume or ( + hlc.aspiration_air_transport_volume if hlc is not None else 0 + ) + blow_out_air_volume = backend_params.blow_out_air_volume or ( + hlc.aspiration_blow_out_volume if hlc is not None else 0 + ) + flow_rate = aspiration.flow_rate or (hlc.aspiration_flow_rate if hlc is not None else 250) + swap_speed = backend_params.swap_speed or ( + hlc.aspiration_swap_speed if hlc is not None else 100 + ) + settling_time = backend_params.settling_time or ( + hlc.aspiration_settling_time if hlc is not None else 5 + ) + + th = self.driver.traversal_height + + try: + await self._core96_aspiration_of_liquid( + type_of_aspiration=backend_params.type_of_aspiration, + x_position=round(position.x * 10), + y_position=round(position.y * 10), + minimal_traverse_height_at_begin_of_command=round( + (backend_params.minimal_traverse_height_at_begin_of_command or th) * 10 + ), + minimal_height_at_command_end=round( + (backend_params.minimal_height_at_command_end or th) * 10 + ), + lld_search_height=round(lld_search_height * 10), + liquid_surface_at_function_without_lld=round(liquid_height * 10), + pull_out_distance_to_take_transport_air_in_function_without_lld=round( + backend_params.pull_out_distance_to_take_transport_air_in_function_without_lld * 10 + ), + minimum_height=round(well_bottoms * 10), + tube_2nd_section_height_measured_from_zm=round( + backend_params.tube_2nd_section_height_measured_from_zm * 10 + ), + tube_2nd_section_ratio=round(backend_params.tube_2nd_section_ratio * 10), + immersion_depth=round(backend_params.immersion_depth * 10), + surface_following_distance=round(backend_params.surface_following_distance * 10), + aspiration_volume=round(volume * 100), + aspiration_speed=round(flow_rate * 10), + transport_air_volume=round(transport_air_volume * 10), + blow_out_air_volume=round(blow_out_air_volume * 100), + pre_wetting_volume=round(backend_params.pre_wetting_volume * 100), + lld_mode=backend_params.lld_mode, + lld_sensitivity=backend_params.lld_sensitivity, + swap_speed=round(swap_speed * 10), + settling_time=round(settling_time * 10), + mix_volume=round(aspiration.mix.volume * 100) if aspiration.mix is not None else 0, + mix_cycles=aspiration.mix.repetitions if aspiration.mix is not None else 0, + mix_position_in_z_direction_from_liquid_surface=0, + surface_following_distance_during_mixing=0, + mix_speed=round(aspiration.mix.flow_rate * 10) if aspiration.mix is not None else 20, + limit_curve_index=backend_params.limit_curve_index, + tadm_channel_pattern=backend_params.tadm_channel_pattern, + tadm_algorithm_on_off=backend_params.tadm_algorithm_on_off, + recording_mode=backend_params.recording_mode, + ) + except VantageFirmwareError as e: + plr_error = convert_vantage_firmware_error_to_plr_error(e) + raise plr_error if plr_error is not None else e + + async def dispense96( + self, + dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer], + backend_params: Optional[BackendParams] = None, + ): + if not isinstance(backend_params, VantageHead96Backend.DispenseParams): + backend_params = VantageHead96Backend.DispenseParams() + + if isinstance(dispense, MultiHeadDispensePlate): + plate = dispense.wells[0].parent + assert isinstance(plate, Plate) + rot = plate.get_absolute_rotation() + if rot.x % 360 != 0 or rot.y % 360 != 0: + raise ValueError("Plate rotation around x or y is not supported for 96 head operations") + if rot.z % 360 == 180: + ref_well = plate.get_well("H12") + elif rot.z % 360 == 0: + ref_well = plate.get_well("A1") + else: + raise ValueError("96 head only supports plate rotations of 0 or 180 degrees around z") + position = ( + ref_well.get_absolute_location(x="c", y="c", z="b") + + dispense.offset + + Coordinate(z=ref_well.material_z_thickness) + ) + well_bottoms = position.z + lld_search_height = well_bottoms + ref_well.get_absolute_size_z() + 1.7 + else: + x_width = (12 - 1) * 9 + y_width = (8 - 1) * 9 + x_position = (dispense.container.get_absolute_size_x() - x_width) / 2 + y_position = (dispense.container.get_absolute_size_y() - y_width) / 2 + y_width + position = ( + dispense.container.get_absolute_location(z="cavity_bottom") + + Coordinate(x=x_position, y=y_position) + + dispense.offset + ) + well_bottoms = position.z + lld_search_height = well_bottoms + dispense.container.get_absolute_size_z() + 1.7 + + liquid_height = position.z + (dispense.liquid_height or 0) + 10 + + tip = next(t for t in dispense.tips if t is not None) + hlc = backend_params.hlc + if hlc is None: + hlc = get_vantage_liquid_class( + tip_volume=tip.maximal_volume, + is_core=True, + is_tip=True, + has_filter=tip.has_filter, + liquid=Liquid.WATER, + jet=backend_params.jet, + blow_out=backend_params.blow_out, + ) + + if backend_params.disable_volume_correction or hlc is None: + volume = dispense.volume + else: + volume = hlc.compute_corrected_volume(dispense.volume) + + type_of_dispensing_mode = backend_params.type_of_dispensing_mode + if type_of_dispensing_mode is None: + type_of_dispensing_mode = _get_dispense_mode( + jet=backend_params.jet, + empty=backend_params.empty, + blow_out=backend_params.blow_out, + ) + + transport_air_volume = backend_params.transport_air_volume or ( + hlc.dispense_air_transport_volume if hlc is not None else 0 + ) + blow_out_air_volume = backend_params.blow_out_air_volume or ( + hlc.dispense_blow_out_volume if hlc is not None else 0 + ) + flow_rate = dispense.flow_rate or (hlc.dispense_flow_rate if hlc is not None else 250) + swap_speed = backend_params.swap_speed or (hlc.dispense_swap_speed if hlc is not None else 100) + settling_time = backend_params.settling_time or ( + hlc.dispense_settling_time if hlc is not None else 5 + ) + + th = self.driver.traversal_height + + try: + await self._core96_dispensing_of_liquid( + type_of_dispensing_mode=type_of_dispensing_mode, + x_position=round(position.x * 10), + y_position=round(position.y * 10), + minimum_height=round(well_bottoms * 10), + tube_2nd_section_height_measured_from_zm=round( + backend_params.tube_2nd_section_height_measured_from_zm * 10 + ), + tube_2nd_section_ratio=round(backend_params.tube_2nd_section_ratio * 10), + lld_search_height=round(lld_search_height * 10), + liquid_surface_at_function_without_lld=round(liquid_height * 10), + pull_out_distance_to_take_transport_air_in_function_without_lld=round( + backend_params.pull_out_distance_to_take_transport_air_in_function_without_lld * 10 + ), + immersion_depth=round(backend_params.immersion_depth * 10), + surface_following_distance=round(backend_params.surface_following_distance * 10), + minimal_traverse_height_at_begin_of_command=round( + (backend_params.minimal_traverse_height_at_begin_of_command or th) * 10 + ), + minimal_height_at_command_end=round( + (backend_params.minimal_height_at_command_end or th) * 10 + ), + dispense_volume=round(volume * 100), + dispense_speed=round(flow_rate * 10), + cut_off_speed=round(backend_params.cut_off_speed * 10), + stop_back_volume=round(backend_params.stop_back_volume * 100), + transport_air_volume=round(transport_air_volume * 10), + blow_out_air_volume=round(blow_out_air_volume * 100), + lld_mode=backend_params.lld_mode, + lld_sensitivity=backend_params.lld_sensitivity, + side_touch_off_distance=round(backend_params.side_touch_off_distance * 10), + swap_speed=round(swap_speed * 10), + settling_time=round(settling_time * 10), + mix_volume=round(dispense.mix.volume * 100) if dispense.mix is not None else 0, + mix_cycles=dispense.mix.repetitions if dispense.mix is not None else 0, + mix_position_in_z_direction_from_liquid_surface=0, + surface_following_distance_during_mixing=0, + mix_speed=round(dispense.mix.flow_rate * 10) if dispense.mix is not None else 10, + limit_curve_index=backend_params.limit_curve_index, + tadm_channel_pattern=backend_params.tadm_channel_pattern, + tadm_algorithm_on_off=backend_params.tadm_algorithm_on_off, + recording_mode=backend_params.recording_mode, + ) + except VantageFirmwareError as e: + plr_error = convert_vantage_firmware_error_to_plr_error(e) + raise plr_error if plr_error is not None else e + + # -- firmware commands (A1HM) ---------------------------------------------- + + async def _core96_tip_pick_up( + self, + x_position: int, + y_position: int, + tip_type: int, + tip_handling_method: int, + z_deposit_position: int, + minimal_traverse_height_at_begin_of_command: int, + minimal_height_at_command_end: int, + ): + """Tip pick up using 96 head (A1HM:TP).""" + await self.driver.send_command( + module="A1HM", + command="TP", + xp=x_position, + yp=y_position, + tt=tip_type, + td=tip_handling_method, + tz=z_deposit_position, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + ) + + async def _core96_tip_discard( + self, + x_position: int, + y_position: int, + z_deposit_position: int, + minimal_traverse_height_at_begin_of_command: int, + minimal_height_at_command_end: int, + ): + """Tip discard using 96 head (A1HM:TR).""" + await self.driver.send_command( + module="A1HM", + command="TR", + xp=x_position, + yp=y_position, + tz=z_deposit_position, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + ) + + async def _core96_aspiration_of_liquid( + self, + type_of_aspiration: int, + x_position: int, + y_position: int, + minimal_traverse_height_at_begin_of_command: int, + minimal_height_at_command_end: int, + lld_search_height: int, + liquid_surface_at_function_without_lld: int, + pull_out_distance_to_take_transport_air_in_function_without_lld: int, + minimum_height: int, + tube_2nd_section_height_measured_from_zm: int, + tube_2nd_section_ratio: int, + immersion_depth: int, + surface_following_distance: int, + aspiration_volume: int, + aspiration_speed: int, + transport_air_volume: int, + blow_out_air_volume: int, + pre_wetting_volume: int, + lld_mode: int, + lld_sensitivity: int, + swap_speed: int, + settling_time: int, + mix_volume: int, + mix_cycles: int, + mix_position_in_z_direction_from_liquid_surface: int, + surface_following_distance_during_mixing: int, + mix_speed: int, + limit_curve_index: int, + tadm_channel_pattern: Optional[List[bool]], + tadm_algorithm_on_off: int, + recording_mode: int, + ): + """Aspiration of liquid using 96 head (A1HM:DA).""" + if tadm_channel_pattern is None: + tadm_channel_pattern = [True] * 96 + tadm_hex = _channel_pattern_to_hex(tadm_channel_pattern) + + await self.driver.send_command( + module="A1HM", + command="DA", + at=type_of_aspiration, + xp=x_position, + yp=y_position, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + lp=lld_search_height, + zl=liquid_surface_at_function_without_lld, + po=pull_out_distance_to_take_transport_air_in_function_without_lld, + zx=minimum_height, + zu=tube_2nd_section_height_measured_from_zm, + zr=tube_2nd_section_ratio, + ip=immersion_depth, + fp=surface_following_distance, + av=aspiration_volume, + as_=aspiration_speed, + ta=transport_air_volume, + ba=blow_out_air_volume, + oa=pre_wetting_volume, + lm=lld_mode, + ll=lld_sensitivity, + de=swap_speed, + wt=settling_time, + mv=mix_volume, + mc=mix_cycles, + mp=mix_position_in_z_direction_from_liquid_surface, + mh=surface_following_distance_during_mixing, + ms=mix_speed, + gi=limit_curve_index, + cw=tadm_hex, + gj=tadm_algorithm_on_off, + gk=recording_mode, + ) + + async def _core96_dispensing_of_liquid( + self, + type_of_dispensing_mode: int, + x_position: int, + y_position: int, + minimum_height: int, + tube_2nd_section_height_measured_from_zm: int, + tube_2nd_section_ratio: int, + lld_search_height: int, + liquid_surface_at_function_without_lld: int, + pull_out_distance_to_take_transport_air_in_function_without_lld: int, + immersion_depth: int, + surface_following_distance: int, + minimal_traverse_height_at_begin_of_command: int, + minimal_height_at_command_end: int, + dispense_volume: int, + dispense_speed: int, + cut_off_speed: int, + stop_back_volume: int, + transport_air_volume: int, + blow_out_air_volume: int, + lld_mode: int, + lld_sensitivity: int, + side_touch_off_distance: int, + swap_speed: int, + settling_time: int, + mix_volume: int, + mix_cycles: int, + mix_position_in_z_direction_from_liquid_surface: int, + surface_following_distance_during_mixing: int, + mix_speed: int, + limit_curve_index: int, + tadm_channel_pattern: Optional[List[bool]], + tadm_algorithm_on_off: int, + recording_mode: int, + ): + """Dispensing of liquid using 96 head (A1HM:DD).""" + if tadm_channel_pattern is None: + tadm_channel_pattern = [True] * 96 + tadm_hex = _channel_pattern_to_hex(tadm_channel_pattern) + + await self.driver.send_command( + module="A1HM", + command="DD", + dm=type_of_dispensing_mode, + xp=x_position, + yp=y_position, + zx=minimum_height, + zu=tube_2nd_section_height_measured_from_zm, + zr=tube_2nd_section_ratio, + lp=lld_search_height, + zl=liquid_surface_at_function_without_lld, + po=pull_out_distance_to_take_transport_air_in_function_without_lld, + ip=immersion_depth, + fp=surface_following_distance, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + dv=dispense_volume, + ds=dispense_speed, + ss=cut_off_speed, + rv=stop_back_volume, + ta=transport_air_volume, + ba=blow_out_air_volume, + lm=lld_mode, + ll=lld_sensitivity, + dj=side_touch_off_distance, + de=swap_speed, + wt=settling_time, + mv=mix_volume, + mc=mix_cycles, + mp=mix_position_in_z_direction_from_liquid_surface, + mh=surface_following_distance_during_mixing, + ms=mix_speed, + gi=limit_curve_index, + cw=tadm_hex, + gj=tadm_algorithm_on_off, + gk=recording_mode, + ) diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/ipg.py b/pylabrobot/hamilton/liquid_handlers/vantage/ipg.py new file mode 100644 index 00000000000..065abbad6e1 --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/ipg.py @@ -0,0 +1,279 @@ +"""Vantage IPG (Integrated Plate Gripper) backend: translates arm operations into firmware commands.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, Optional + +from pylabrobot.capabilities.arms.backend import OrientableGripperArmBackend +from pylabrobot.capabilities.arms.standard import GripperLocation +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.resources import Coordinate + +if TYPE_CHECKING: + from .driver import VantageDriver + + +def _direction_degrees_to_grip_orientation(degrees: float) -> int: + """Convert rotation angle in degrees to Vantage IPG grip orientation code. + + The IPG uses numeric codes 1-44 for various orientations. The primary ones used for + plate manipulation are: + 32 = front grip (default) + 11 = right grip (90 degrees) + 31 = back grip (180 degrees) + 12 = left grip (270 degrees) + """ + normalized = round(degrees) % 360 + mapping = {0: 32, 90: 11, 180: 31, 270: 12} + if normalized not in mapping: + raise ValueError(f"grip direction must be a multiple of 90 degrees, got {degrees}") + return mapping[normalized] + + +class IPGBackend(OrientableGripperArmBackend): + """Backend for the Vantage Integrated Plate Gripper (IPG). + + Implements OrientableGripperArmBackend, translating arm operations into + firmware commands on module A1RM. + """ + + def __init__(self, driver: "VantageDriver"): + self.driver = driver + self._parked: bool = False + + @property + def parked(self) -> bool: + return self._parked + + async def _on_setup(self) -> None: + pass + + async def _on_stop(self) -> None: + if not self._parked: + try: + await self.park() + except Exception: + pass + + # -- BackendParams --------------------------------------------------------- + + @dataclass + class PickUpParams(BackendParams): + grip_strength: int = 100 + open_gripper_position: int = 860 + plate_width_tolerance: int = 20 + acceleration_index: int = 4 + z_clearance_height: int = 50 + hotel_depth: int = 0 + minimal_height_at_command_end: int = 3600 + + @dataclass + class DropParams(BackendParams): + open_gripper_position: int = 860 + z_clearance_height: int = 50 + press_on_distance: int = 5 + hotel_depth: int = 0 + minimal_height_at_command_end: int = 3600 + + # -- OrientableGripperArmBackend interface --------------------------------- + + async def pick_up_at_location( + self, + location: Coordinate, + direction: float, + resource_width: float, + backend_params: Optional[BackendParams] = None, + ) -> None: + if not isinstance(backend_params, IPGBackend.PickUpParams): + backend_params = IPGBackend.PickUpParams() + + grip_orientation = _direction_degrees_to_grip_orientation(direction) + th = round(self.driver.traversal_height * 10) + + await self._ipg_prepare_gripper_orientation( + grip_orientation=grip_orientation, + minimal_traverse_height_at_begin_of_command=th, + ) + + await self._ipg_grip_plate( + x_position=round(location.x * 10), + y_position=round(location.y * 10), + z_position=round(location.z * 10), + grip_strength=backend_params.grip_strength, + open_gripper_position=backend_params.open_gripper_position, + plate_width=round(resource_width * 10), + plate_width_tolerance=backend_params.plate_width_tolerance, + acceleration_index=backend_params.acceleration_index, + z_clearance_height=backend_params.z_clearance_height, + hotel_depth=backend_params.hotel_depth, + minimal_height_at_command_end=backend_params.minimal_height_at_command_end, + ) + self._parked = False + + async def drop_at_location( + self, + location: Coordinate, + direction: float, + resource_width: float, + backend_params: Optional[BackendParams] = None, + ) -> None: + if not isinstance(backend_params, IPGBackend.DropParams): + backend_params = IPGBackend.DropParams() + + await self._ipg_put_plate( + x_position=round(location.x * 10), + y_position=round(location.y * 10), + z_position=round(location.z * 10), + open_gripper_position=backend_params.open_gripper_position, + z_clearance_height=backend_params.z_clearance_height, + hotel_depth=backend_params.hotel_depth, + minimal_height_at_command_end=backend_params.minimal_height_at_command_end, + ) + + async def move_to_location( + self, + location: Coordinate, + direction: float, + backend_params: Optional[BackendParams] = None, + ) -> None: + th = round(self.driver.traversal_height * 10) + await self._ipg_move_to_defined_position( + x_position=round(location.x * 10), + y_position=round(location.y * 10), + z_position=round(location.z * 10), + minimal_traverse_height_at_begin_of_command=th, + ) + + async def halt(self, backend_params: Optional[BackendParams] = None) -> None: + pass # No explicit halt command for IPG. + + async def park(self, backend_params: Optional[BackendParams] = None) -> None: + await self.driver.send_command(module="A1RM", command="GP") + self._parked = True + + async def request_gripper_location( + self, backend_params: Optional[BackendParams] = None + ) -> GripperLocation: + raise NotImplementedError( + "request_gripper_location is not yet implemented for the Vantage IPG. " + "The firmware response format for A1RM:QI needs to be reverse-engineered." + ) + + async def open_gripper( + self, gripper_width: float, backend_params: Optional[BackendParams] = None + ) -> None: + await self.driver.send_command(module="A1RM", command="DO") + + async def close_gripper( + self, gripper_width: float, backend_params: Optional[BackendParams] = None + ) -> None: + # Closing is handled implicitly by grip_plate with the desired width. + pass + + async def is_gripper_closed(self, backend_params: Optional[BackendParams] = None) -> bool: + return not self._parked + + # -- Initialization and status queries ------------------------------------- + + async def request_initialization_status(self) -> bool: + """Check if the IPG module is initialized (A1RM:QW).""" + resp = await self.driver.send_command(module="A1RM", command="QW", fmt={"qw": "int"}) + return resp is not None and resp["qw"] == 1 + + async def initialize(self) -> None: + """Initialize the IPG (A1RM:DI).""" + await self.driver.send_command(module="A1RM", command="DI") + + async def get_parking_status(self) -> bool: + """Check if the IPG is parked (A1RM:RG). Returns True if parked.""" + resp = await self.driver.send_command(module="A1RM", command="RG", fmt={"rg": "int"}) + parked = resp is not None and resp["rg"] == 1 + self._parked = parked + return parked + + # -- firmware commands (A1RM) ---------------------------------------------- + + async def _ipg_prepare_gripper_orientation( + self, + grip_orientation: int = 32, + minimal_traverse_height_at_begin_of_command: int = 3600, + ) -> None: + """Prepare gripper orientation (A1RM:GA).""" + await self.driver.send_command( + module="A1RM", + command="GA", + gd=grip_orientation, + th=minimal_traverse_height_at_begin_of_command, + ) + + async def _ipg_grip_plate( + self, + x_position: int, + y_position: int, + z_position: int, + grip_strength: int = 100, + open_gripper_position: int = 860, + plate_width: int = 800, + plate_width_tolerance: int = 20, + acceleration_index: int = 4, + z_clearance_height: int = 50, + hotel_depth: int = 0, + minimal_height_at_command_end: int = 3600, + ) -> None: + """Grip plate (A1RM:DG).""" + await self.driver.send_command( + module="A1RM", + command="DG", + xp=x_position, + yp=y_position, + zp=z_position, + yw=grip_strength, + yo=open_gripper_position, + yg=plate_width, + pt=plate_width_tolerance, + ai=acceleration_index, + zc=z_clearance_height, + hd=hotel_depth, + te=minimal_height_at_command_end, + ) + + async def _ipg_put_plate( + self, + x_position: int, + y_position: int, + z_position: int, + open_gripper_position: int = 860, + z_clearance_height: int = 50, + hotel_depth: int = 0, + minimal_height_at_command_end: int = 3600, + ) -> None: + """Put plate (A1RM:DR).""" + await self.driver.send_command( + module="A1RM", + command="DR", + xp=x_position, + yp=y_position, + zp=z_position, + yo=open_gripper_position, + zc=z_clearance_height, + hd=hotel_depth, + te=minimal_height_at_command_end, + ) + + async def _ipg_move_to_defined_position( + self, + x_position: int, + y_position: int, + z_position: int, + minimal_traverse_height_at_begin_of_command: int = 3600, + ) -> None: + """Move to defined position (A1RM:DN).""" + await self.driver.send_command( + module="A1RM", + command="DN", + xp=x_position, + yp=y_position, + zp=z_position, + th=minimal_traverse_height_at_begin_of_command, + ) diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/loading_cover.py b/pylabrobot/hamilton/liquid_handlers/vantage/loading_cover.py new file mode 100644 index 00000000000..c85b61d7f0f --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/loading_cover.py @@ -0,0 +1,56 @@ +"""VantageLoadingCover: loading cover control for Hamilton Vantage liquid handlers.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .driver import VantageDriver + + +class VantageLoadingCover: + """Controls the loading cover on a Hamilton Vantage. + + This is a plain helper class (not a CapabilityBackend). It encapsulates the + firmware protocol for loading cover control and delegates I/O to the driver. + """ + + def __init__(self, driver: "VantageDriver"): + self.driver = driver + + async def _on_setup(self): + pass + + async def _on_stop(self): + pass + + # -- commands (I1AM) ------------------------------------------------------- + + async def request_initialization_status(self) -> bool: + """Check if the loading cover module is initialized (I1AM:QW). + + Returns: + True if initialized, False otherwise. + """ + resp = await self.driver.send_command(module="I1AM", command="QW", fmt={"qw": "int"}) + return resp is not None and resp["qw"] == 1 + + async def initialize(self) -> None: + """Initialize the loading cover module (I1AM:MI).""" + await self.driver.send_command(module="I1AM", command="MI") + + async def set_cover(self, cover_open: bool) -> None: + """Open or close the loading cover (I1AM:LP). + + Args: + cover_open: True to open, False to close. + """ + await self.driver.send_command(module="I1AM", command="LP", lp=not cover_open) + + async def open(self) -> None: + """Open the loading cover.""" + await self.set_cover(cover_open=True) + + async def close(self) -> None: + """Close the loading cover.""" + await self.set_cover(cover_open=False) diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/pip_backend.py b/pylabrobot/hamilton/liquid_handlers/vantage/pip_backend.py new file mode 100644 index 00000000000..25da9ce4f3c --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/pip_backend.py @@ -0,0 +1,822 @@ +"""Vantage PIP backend: translates PIP operations into Vantage firmware commands.""" + +from __future__ import annotations + +import enum +import logging +from dataclasses import dataclass +from typing import TYPE_CHECKING, List, Optional, Sequence, Tuple, Union, cast + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.liquid_handling.pip_backend import PIPBackend +from pylabrobot.capabilities.liquid_handling.standard import Aspiration, Dispense, Pickup, TipDrop +from pylabrobot.hamilton.lh.vantage.liquid_classes import get_vantage_liquid_class +from pylabrobot.hamilton.liquid_handlers.liquid_class import HamiltonLiquidClass +from pylabrobot.resources import Resource, Well +from pylabrobot.resources.hamilton import HamiltonTip, TipSize +from pylabrobot.resources.liquid import Liquid + +from .errors import VantageFirmwareError, convert_vantage_firmware_error_to_plr_error + +if TYPE_CHECKING: + from .driver import VantageDriver + +logger = logging.getLogger("pylabrobot") + + +# --------------------------------------------------------------------------- +# Enums +# --------------------------------------------------------------------------- + + +class LLDMode(enum.Enum): + """Liquid level detection mode.""" + + OFF = 0 + GAMMA = 1 + PRESSURE = 2 + DUAL = 3 + Z_TOUCH_OFF = 4 + + +# --------------------------------------------------------------------------- +# Utility +# --------------------------------------------------------------------------- + + +def _get_dispense_mode(jet: bool, empty: bool, blow_out: bool) -> int: + """Compute firmware dispensing mode from boolean flags. + + Firmware modes: + 0 = Partial volume in jet mode + 1 = Blow out in jet mode (labelled "empty" in VENUS) + 2 = Partial volume at surface + 3 = Blow out at surface (labelled "empty" in VENUS) + 4 = Empty tip at fix position + """ + if empty: + return 4 + if jet: + return 1 if blow_out else 0 + return 3 if blow_out else 2 + + +def _ops_to_fw_positions( + ops: Sequence[Union[Pickup, TipDrop, Aspiration, Dispense]], + use_channels: List[int], + num_channels: int, +) -> Tuple[List[int], List[int], List[bool]]: + """Convert ops + use_channels into firmware x/y positions and tip pattern. + + Uses absolute coordinates so the driver does not need a ``deck`` reference. + """ + if use_channels != sorted(use_channels): + raise ValueError("Channels must be sorted.") + + x_positions: List[int] = [] + y_positions: List[int] = [] + channels_involved: List[bool] = [] + + for i, channel in enumerate(use_channels): + while channel > len(channels_involved): + channels_involved.append(False) + x_positions.append(0) + y_positions.append(0) + channels_involved.append(True) + + loc = ops[i].resource.get_absolute_location(x="c", y="c", z="b") + x_positions.append(round((loc.x + ops[i].offset.x) * 10)) + y_positions.append(round((loc.y + ops[i].offset.y) * 10)) + + # Minimum distance check (9mm). + for idx1, (x1, y1) in enumerate(zip(x_positions, y_positions)): + for idx2, (x2, y2) in enumerate(zip(x_positions, y_positions)): + if idx1 == idx2: + continue + if not channels_involved[idx1] or not channels_involved[idx2]: + continue + if x1 != x2: + continue + if y1 != y2 and abs(y1 - y2) < 90: + raise ValueError( + f"Minimum distance between two y positions is <9mm: {y1}, {y2}" + f" (channel {idx1} and {idx2})" + ) + + if len(ops) > num_channels: + raise ValueError(f"Too many channels specified: {len(ops)} > {num_channels}") + + # Trailing padding. + if len(x_positions) < num_channels: + x_positions = x_positions + [0] + y_positions = y_positions + [0] + channels_involved = channels_involved + [False] + + return x_positions, y_positions, channels_involved + + +def _resolve_liquid_classes( + explicit: Optional[List[Optional[HamiltonLiquidClass]]], + ops: list, + jet: Union[bool, List[bool]], + blow_out: Union[bool, List[bool]], +) -> List[Optional[HamiltonLiquidClass]]: + """Resolve per-op Hamilton liquid classes. Auto-detect from tip if explicit is None.""" + n = len(ops) + if isinstance(jet, bool): + jet = [jet] * n + if isinstance(blow_out, bool): + blow_out = [blow_out] * n + + if explicit is not None: + return list(explicit) + + result: List[Optional[HamiltonLiquidClass]] = [] + for i, op in enumerate(ops): + tip = op.tip + if not isinstance(tip, HamiltonTip): + result.append(None) + continue + result.append( + get_vantage_liquid_class( + tip_volume=tip.maximal_volume, + is_core=False, + is_tip=True, + has_filter=tip.has_filter, + liquid=Liquid.WATER, + jet=jet[i], + blow_out=blow_out[i], + ) + ) + return result + + +# --------------------------------------------------------------------------- +# VantagePIPBackend +# --------------------------------------------------------------------------- + + +class VantagePIPBackend(PIPBackend): + """Translates PIP operations into Vantage firmware commands via the driver.""" + + def __init__(self, driver: "VantageDriver"): + self.driver = driver + + async def _on_setup(self): + pass + + async def _on_stop(self): + pass + + @property + def num_channels(self) -> int: + return self.driver.num_channels + + # -- BackendParams dataclasses --------------------------------------------- + + @dataclass + class PickUpTipsParams(BackendParams): + """Vantage-specific parameters for ``pick_up_tips``.""" + + minimal_traverse_height_at_begin_of_command: Optional[List[float]] = None + minimal_height_at_command_end: Optional[List[float]] = None + + @dataclass + class DropTipsParams(BackendParams): + """Vantage-specific parameters for ``drop_tips``.""" + + minimal_traverse_height_at_begin_of_command: Optional[List[float]] = None + minimal_height_at_command_end: Optional[List[float]] = None + + @dataclass + class AspirateParams(BackendParams): + """Vantage-specific parameters for ``aspirate``.""" + + jet: Optional[List[bool]] = None + blow_out: Optional[List[bool]] = None + hlcs: Optional[List[Optional[HamiltonLiquidClass]]] = None + type_of_aspiration: Optional[List[int]] = None + minimal_traverse_height_at_begin_of_command: Optional[List[float]] = None + minimal_height_at_command_end: Optional[List[float]] = None + lld_search_height: Optional[List[float]] = None + clot_detection_height: Optional[List[float]] = None + liquid_surface_at_function_without_lld: Optional[List[float]] = None + pull_out_distance_to_take_transport_air_in_function_without_lld: Optional[List[float]] = None + tube_2nd_section_height_measured_from_zm: Optional[List[float]] = None + tube_2nd_section_ratio: Optional[List[float]] = None + minimum_height: Optional[List[float]] = None + immersion_depth: Optional[List[float]] = None + surface_following_distance: Optional[List[float]] = None + transport_air_volume: Optional[List[float]] = None + pre_wetting_volume: Optional[List[float]] = None + lld_mode: Optional[List[int]] = None + lld_sensitivity: Optional[List[int]] = None + pressure_lld_sensitivity: Optional[List[int]] = None + aspirate_position_above_z_touch_off: Optional[List[float]] = None + swap_speed: Optional[List[float]] = None + settling_time: Optional[List[float]] = None + capacitive_mad_supervision_on_off: Optional[List[int]] = None + pressure_mad_supervision_on_off: Optional[List[int]] = None + tadm_algorithm_on_off: int = 0 + limit_curve_index: Optional[List[int]] = None + recording_mode: int = 0 + disable_volume_correction: Optional[List[bool]] = None + + @dataclass + class DispenseParams(BackendParams): + """Vantage-specific parameters for ``dispense``.""" + + jet: Optional[List[bool]] = None + blow_out: Optional[List[bool]] = None + empty: Optional[List[bool]] = None + hlcs: Optional[List[Optional[HamiltonLiquidClass]]] = None + type_of_dispensing_mode: Optional[List[int]] = None + minimal_traverse_height_at_begin_of_command: Optional[List[float]] = None + minimal_height_at_command_end: Optional[List[float]] = None + lld_search_height: Optional[List[float]] = None + minimum_height: Optional[List[float]] = None + pull_out_distance_to_take_transport_air_in_function_without_lld: Optional[List[float]] = None + immersion_depth: Optional[List[float]] = None + surface_following_distance: Optional[List[float]] = None + tube_2nd_section_height_measured_from_zm: Optional[List[float]] = None + tube_2nd_section_ratio: Optional[List[float]] = None + cut_off_speed: Optional[List[float]] = None + stop_back_volume: Optional[List[float]] = None + transport_air_volume: Optional[List[float]] = None + lld_mode: Optional[List[int]] = None + side_touch_off_distance: float = 0 + dispense_position_above_z_touch_off: Optional[List[float]] = None + lld_sensitivity: Optional[List[int]] = None + pressure_lld_sensitivity: Optional[List[int]] = None + swap_speed: Optional[List[float]] = None + settling_time: Optional[List[float]] = None + tadm_algorithm_on_off: int = 0 + limit_curve_index: Optional[List[int]] = None + recording_mode: int = 0 + disable_volume_correction: Optional[List[bool]] = None + + # -- PIPBackend interface: pick_up_tips ------------------------------------ + + async def pick_up_tips( + self, + ops: List[Pickup], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + if not isinstance(backend_params, VantagePIPBackend.PickUpTipsParams): + backend_params = VantagePIPBackend.PickUpTipsParams() + + x_positions, y_positions, tip_pattern = _ops_to_fw_positions( + ops, use_channels, self.num_channels + ) + + tips = [cast(HamiltonTip, op.resource.get_tip()) for op in ops] + ttti = [await self.driver.request_or_assign_tip_type_index(tip) for tip in tips] + + max_z = max(op.resource.get_absolute_location(z="b").z + op.offset.z for op in ops) + max_total_tip_length = max(op.tip.total_tip_length for op in ops) + max_tip_length = max((op.tip.total_tip_length - op.tip.fitting_depth) for op in ops) + + # Tip size adjustments (from legacy, confirmed by experiments). + proto_tip = self.driver._get_hamilton_tip([op.resource for op in ops]) + if proto_tip.tip_size == TipSize.LOW_VOLUME: + max_tip_length += 2 + elif proto_tip.tip_size != TipSize.STANDARD_VOLUME: + max_tip_length -= 2 + + th = self.driver.traversal_height + mth = backend_params.minimal_traverse_height_at_begin_of_command + mhe = backend_params.minimal_height_at_command_end + + try: + await self._pip_tip_pick_up( + x_position=x_positions, + y_position=y_positions, + tip_pattern=tip_pattern, + tip_type=ttti, + begin_z_deposit_position=[round((max_z + max_total_tip_length) * 10)] * len(ops), + end_z_deposit_position=[round((max_z + max_tip_length) * 10)] * len(ops), + minimal_traverse_height_at_begin_of_command=[round(t * 10) for t in mth or [th]] * len(ops), + minimal_height_at_command_end=[round(t * 10) for t in mhe or [th]] * len(ops), + tip_handling_method=[1] * len(ops), + blow_out_air_volume=[0] * len(ops), + ) + except VantageFirmwareError as e: + plr_error = convert_vantage_firmware_error_to_plr_error(e) + raise plr_error if plr_error is not None else e + + # -- PIPBackend interface: drop_tips --------------------------------------- + + async def drop_tips( + self, + ops: List[TipDrop], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + if not isinstance(backend_params, VantagePIPBackend.DropTipsParams): + backend_params = VantagePIPBackend.DropTipsParams() + + x_positions, y_positions, channels_involved = _ops_to_fw_positions( + ops, use_channels, self.num_channels + ) + + max_z = max(op.resource.get_absolute_location(z="b").z + op.offset.z for op in ops) + th = self.driver.traversal_height + mth = backend_params.minimal_traverse_height_at_begin_of_command + mhe = backend_params.minimal_height_at_command_end + + try: + await self._pip_tip_discard( + x_position=x_positions, + y_position=y_positions, + tip_pattern=channels_involved, + begin_z_deposit_position=[round((max_z + 10) * 10)] * len(ops), + end_z_deposit_position=[round(max_z * 10)] * len(ops), + minimal_traverse_height_at_begin_of_command=[round(t * 10) for t in mth or [th]] * len(ops), + minimal_height_at_command_end=[round(t * 10) for t in mhe or [th]] * len(ops), + tip_handling_method=[0] * len(ops), + ) + except VantageFirmwareError as e: + plr_error = convert_vantage_firmware_error_to_plr_error(e) + raise plr_error if plr_error is not None else e + + # -- safety checks ---------------------------------------------------------- + + @staticmethod + def _assert_valid_resources(resources: List[Resource]) -> None: + """Assert that resources are not too low for safe pipetting.""" + for resource in resources: + if resource.get_absolute_location(z="b").z < 100: + raise ValueError( + f"Resource {resource} is too low: {resource.get_absolute_location(z='b').z} < 100" + ) + + # -- PIPBackend interface: aspirate ---------------------------------------- + + async def aspirate( + self, + ops: List[Aspiration], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + if not isinstance(backend_params, VantagePIPBackend.AspirateParams): + backend_params = VantagePIPBackend.AspirateParams() + + self._assert_valid_resources([op.resource for op in ops]) + + x_positions, y_positions, channels_involved = _ops_to_fw_positions( + ops, use_channels, self.num_channels + ) + + jet = backend_params.jet or [False] * len(ops) + blow_out = backend_params.blow_out or [False] * len(ops) + hlcs = _resolve_liquid_classes(backend_params.hlcs, ops, jet, blow_out) + + # Volume correction. + disable_vc = backend_params.disable_volume_correction or [False] * len(ops) + volumes = [ + hlc.compute_corrected_volume(op.volume) if hlc is not None and not disabled else op.volume + for op, hlc, disabled in zip(ops, hlcs, disable_vc) + ] + + well_bottoms = [ + op.resource.get_absolute_location(z="b").z + op.offset.z + op.resource.material_z_thickness + for op in ops + ] + liquid_surfaces_no_lld = backend_params.liquid_surface_at_function_without_lld or [ + wb + (op.liquid_height or 0) for wb, op in zip(well_bottoms, ops) + ] + lld_search_heights = backend_params.lld_search_height or [ + wb + op.resource.get_absolute_size_z() + (1.7 if isinstance(op.resource, Well) else 5) + for wb, op in zip(well_bottoms, ops) + ] + + flow_rates = [ + op.flow_rate or (hlc.aspiration_flow_rate if hlc is not None else 100) + for op, hlc in zip(ops, hlcs) + ] + blow_out_air_volumes = [ + op.blow_out_air_volume or (hlc.dispense_blow_out_volume if hlc is not None else 0) + for op, hlc in zip(ops, hlcs) + ] + + th = self.driver.traversal_height + mth = backend_params.minimal_traverse_height_at_begin_of_command + mhe = backend_params.minimal_height_at_command_end + + try: + await self._pip_aspirate( + x_position=x_positions, + y_position=y_positions, + type_of_aspiration=backend_params.type_of_aspiration or [0] * len(ops), + tip_pattern=channels_involved, + minimal_traverse_height_at_begin_of_command=[round(t * 10) for t in mth or [th]] * len(ops), + minimal_height_at_command_end=[round(t * 10) for t in mhe or [th]] * len(ops), + lld_search_height=[round(ls * 10) for ls in lld_search_heights], + clot_detection_height=[ + round(cdh * 10) for cdh in backend_params.clot_detection_height or [0] * len(ops) + ], + liquid_surface_at_function_without_lld=[round(lsn * 10) for lsn in liquid_surfaces_no_lld], + pull_out_distance_to_take_transport_air_in_function_without_lld=[ + round(pod * 10) + for pod in backend_params.pull_out_distance_to_take_transport_air_in_function_without_lld + or [10.9] * len(ops) + ], + tube_2nd_section_height_measured_from_zm=[ + round(t * 10) + for t in backend_params.tube_2nd_section_height_measured_from_zm or [0] * len(ops) + ], + tube_2nd_section_ratio=[ + round(t * 10) for t in backend_params.tube_2nd_section_ratio or [0] * len(ops) + ], + minimum_height=[round(wb * 10) for wb in backend_params.minimum_height or well_bottoms], + immersion_depth=[round(d * 10) for d in backend_params.immersion_depth or [0] * len(ops)], + surface_following_distance=[ + round(d * 10) for d in backend_params.surface_following_distance or [0] * len(ops) + ], + aspiration_volume=[round(vol * 100) for vol in volumes], + aspiration_speed=[round(fr * 10) for fr in flow_rates], + transport_air_volume=[ + round(tav * 10) + for tav in backend_params.transport_air_volume + or [hlc.aspiration_air_transport_volume if hlc is not None else 0 for hlc in hlcs] + ], + blow_out_air_volume=[round(bav * 100) for bav in blow_out_air_volumes], + pre_wetting_volume=[ + round(pwv * 100) for pwv in backend_params.pre_wetting_volume or [0] * len(ops) + ], + lld_mode=backend_params.lld_mode or [0] * len(ops), + lld_sensitivity=backend_params.lld_sensitivity or [4] * len(ops), + pressure_lld_sensitivity=backend_params.pressure_lld_sensitivity or [4] * len(ops), + aspirate_position_above_z_touch_off=[ + round(apz * 10) + for apz in backend_params.aspirate_position_above_z_touch_off or [0.5] * len(ops) + ], + swap_speed=[round(ss * 10) for ss in backend_params.swap_speed or [2] * len(ops)], + settling_time=[round(st * 10) for st in backend_params.settling_time or [1] * len(ops)], + mix_volume=[round(op.mix.volume * 100) if op.mix is not None else 0 for op in ops], + mix_cycles=[op.mix.repetitions if op.mix is not None else 0 for op in ops], + mix_position_in_z_direction_from_liquid_surface=[0] * len(ops), + mix_speed=[round(op.mix.flow_rate * 10) if op.mix is not None else 2500 for op in ops], + surface_following_distance_during_mixing=[0] * len(ops), + capacitive_mad_supervision_on_off=( + backend_params.capacitive_mad_supervision_on_off or [0] * len(ops) + ), + pressure_mad_supervision_on_off=( + backend_params.pressure_mad_supervision_on_off or [0] * len(ops) + ), + tadm_algorithm_on_off=backend_params.tadm_algorithm_on_off, + limit_curve_index=backend_params.limit_curve_index or [0] * len(ops), + recording_mode=backend_params.recording_mode, + ) + except VantageFirmwareError as e: + plr_error = convert_vantage_firmware_error_to_plr_error(e) + raise plr_error if plr_error is not None else e + + # -- PIPBackend interface: dispense ---------------------------------------- + + async def dispense( + self, + ops: List[Dispense], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + if not isinstance(backend_params, VantagePIPBackend.DispenseParams): + backend_params = VantagePIPBackend.DispenseParams() + + self._assert_valid_resources([op.resource for op in ops]) + + x_positions, y_positions, channels_involved = _ops_to_fw_positions( + ops, use_channels, self.num_channels + ) + + jet = backend_params.jet or [False] * len(ops) + empty = backend_params.empty or [False] * len(ops) + blow_out = backend_params.blow_out or [False] * len(ops) + hlcs = _resolve_liquid_classes(backend_params.hlcs, ops, jet, blow_out) + + # Volume correction. + disable_vc = backend_params.disable_volume_correction or [False] * len(ops) + volumes = [ + hlc.compute_corrected_volume(op.volume) if hlc is not None and not disabled else op.volume + for op, hlc, disabled in zip(ops, hlcs, disable_vc) + ] + + well_bottoms = [ + op.resource.get_absolute_location(z="b").z + op.offset.z + op.resource.material_z_thickness + for op in ops + ] + liquid_surfaces_no_lld = [wb + (op.liquid_height or 0) for wb, op in zip(well_bottoms, ops)] + lld_search_heights = backend_params.lld_search_height or [ + wb + op.resource.get_absolute_size_z() + (1.7 if isinstance(op.resource, Well) else 5) + for wb, op in zip(well_bottoms, ops) + ] + + flow_rates = [ + op.flow_rate or (hlc.dispense_flow_rate if hlc is not None else 100) + for op, hlc in zip(ops, hlcs) + ] + blow_out_air_volumes = [ + op.blow_out_air_volume or (hlc.dispense_blow_out_volume if hlc is not None else 0) + for op, hlc in zip(ops, hlcs) + ] + + type_of_dispensing_mode = backend_params.type_of_dispensing_mode or [ + _get_dispense_mode(jet=jet[i], empty=empty[i], blow_out=blow_out[i]) for i in range(len(ops)) + ] + + th = self.driver.traversal_height + mth = backend_params.minimal_traverse_height_at_begin_of_command + mhe = backend_params.minimal_height_at_command_end + + try: + await self._pip_dispense( + x_position=x_positions, + y_position=y_positions, + tip_pattern=channels_involved, + type_of_dispensing_mode=type_of_dispensing_mode, + minimum_height=[round(wb * 10) for wb in backend_params.minimum_height or well_bottoms], + lld_search_height=[round(sh * 10) for sh in lld_search_heights], + liquid_surface_at_function_without_lld=[round(ls * 10) for ls in liquid_surfaces_no_lld], + pull_out_distance_to_take_transport_air_in_function_without_lld=[ + round(pod * 10) + for pod in backend_params.pull_out_distance_to_take_transport_air_in_function_without_lld + or [5.0] * len(ops) + ], + immersion_depth=[round(d * 10) for d in backend_params.immersion_depth or [0] * len(ops)], + surface_following_distance=[ + round(d * 10) for d in backend_params.surface_following_distance or [2.1] * len(ops) + ], + tube_2nd_section_height_measured_from_zm=[ + round(t * 10) + for t in backend_params.tube_2nd_section_height_measured_from_zm or [0] * len(ops) + ], + tube_2nd_section_ratio=[ + round(t * 10) for t in backend_params.tube_2nd_section_ratio or [0] * len(ops) + ], + minimal_traverse_height_at_begin_of_command=[round(t * 10) for t in mth or [th]] * len(ops), + minimal_height_at_command_end=[round(t * 10) for t in mhe or [th]] * len(ops), + dispense_volume=[round(vol * 100) for vol in volumes], + dispense_speed=[round(fr * 10) for fr in flow_rates], + cut_off_speed=[round(cs * 10) for cs in backend_params.cut_off_speed or [250] * len(ops)], + stop_back_volume=[ + round(sbv * 100) for sbv in backend_params.stop_back_volume or [0] * len(ops) + ], + transport_air_volume=[ + round(tav * 10) + for tav in backend_params.transport_air_volume + or [hlc.dispense_air_transport_volume if hlc is not None else 0 for hlc in hlcs] + ], + blow_out_air_volume=[round(boav * 100) for boav in blow_out_air_volumes], + lld_mode=backend_params.lld_mode or [0] * len(ops), + side_touch_off_distance=round(backend_params.side_touch_off_distance * 10), + dispense_position_above_z_touch_off=[ + round(dpz * 10) + for dpz in backend_params.dispense_position_above_z_touch_off or [0.5] * len(ops) + ], + lld_sensitivity=backend_params.lld_sensitivity or [1] * len(ops), + pressure_lld_sensitivity=backend_params.pressure_lld_sensitivity or [1] * len(ops), + swap_speed=[round(ss * 10) for ss in backend_params.swap_speed or [1] * len(ops)], + settling_time=[round(st * 10) for st in backend_params.settling_time or [0] * len(ops)], + mix_volume=[round(op.mix.volume * 100) if op.mix is not None else 0 for op in ops], + mix_cycles=[op.mix.repetitions if op.mix is not None else 0 for op in ops], + mix_position_in_z_direction_from_liquid_surface=[0] * len(ops), + mix_speed=[round(op.mix.flow_rate * 100) if op.mix is not None else 10 for op in ops], + surface_following_distance_during_mixing=[0] * len(ops), + tadm_algorithm_on_off=backend_params.tadm_algorithm_on_off, + limit_curve_index=backend_params.limit_curve_index or [0] * len(ops), + recording_mode=backend_params.recording_mode, + ) + except VantageFirmwareError as e: + plr_error = convert_vantage_firmware_error_to_plr_error(e) + raise plr_error if plr_error is not None else e + + # -- tip presence ---------------------------------------------------------- + + async def request_tip_presence(self) -> List[Optional[bool]]: + presences = await self.driver.query_tip_presence() + return [bool(p) for p in presences] + + # -- firmware commands (A1PM) ---------------------------------------------- + + async def _pip_tip_pick_up( + self, + x_position: List[int], + y_position: List[int], + tip_pattern: List[bool], + tip_type: List[int], + begin_z_deposit_position: List[int], + end_z_deposit_position: List[int], + minimal_traverse_height_at_begin_of_command: List[int], + minimal_height_at_command_end: List[int], + tip_handling_method: List[int], + blow_out_air_volume: List[int], + ): + """Tip pick up (A1PM:TP).""" + await self.driver.send_command( + module="A1PM", + command="TP", + tip_pattern=tip_pattern, + xp=x_position, + yp=y_position, + tm=tip_pattern, + tt=tip_type, + tp=begin_z_deposit_position, + tz=end_z_deposit_position, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + ba=blow_out_air_volume, + td=tip_handling_method, + ) + + async def _pip_tip_discard( + self, + x_position: List[int], + y_position: List[int], + tip_pattern: List[bool], + begin_z_deposit_position: List[int], + end_z_deposit_position: List[int], + minimal_traverse_height_at_begin_of_command: List[int], + minimal_height_at_command_end: List[int], + tip_handling_method: List[int], + ts: int = 0, + ): + """Tip discard (A1PM:TR).""" + await self.driver.send_command( + module="A1PM", + command="TR", + tip_pattern=tip_pattern, + xp=x_position, + yp=y_position, + tp=begin_z_deposit_position, + tz=end_z_deposit_position, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + tm=tip_pattern, + ts=ts, + td=tip_handling_method, + ) + + async def _pip_aspirate( + self, + x_position: List[int], + y_position: List[int], + type_of_aspiration: List[int], + tip_pattern: List[bool], + minimal_traverse_height_at_begin_of_command: List[int], + minimal_height_at_command_end: List[int], + lld_search_height: List[int], + clot_detection_height: List[int], + liquid_surface_at_function_without_lld: List[int], + pull_out_distance_to_take_transport_air_in_function_without_lld: List[int], + tube_2nd_section_height_measured_from_zm: List[int], + tube_2nd_section_ratio: List[int], + minimum_height: List[int], + immersion_depth: List[int], + surface_following_distance: List[int], + aspiration_volume: List[int], + aspiration_speed: List[int], + transport_air_volume: List[int], + blow_out_air_volume: List[int], + pre_wetting_volume: List[int], + lld_mode: List[int], + lld_sensitivity: List[int], + pressure_lld_sensitivity: List[int], + aspirate_position_above_z_touch_off: List[int], + swap_speed: List[int], + settling_time: List[int], + mix_volume: List[int], + mix_cycles: List[int], + mix_position_in_z_direction_from_liquid_surface: List[int], + mix_speed: List[int], + surface_following_distance_during_mixing: List[int], + capacitive_mad_supervision_on_off: List[int], + pressure_mad_supervision_on_off: List[int], + tadm_algorithm_on_off: int = 0, + limit_curve_index: Optional[List[int]] = None, + recording_mode: int = 0, + ): + """Aspiration of liquid (A1PM:DA).""" + await self.driver.send_command( + module="A1PM", + command="DA", + tip_pattern=tip_pattern, + at=type_of_aspiration, + tm=tip_pattern, + xp=x_position, + yp=y_position, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + lp=lld_search_height, + ch=clot_detection_height, + zl=liquid_surface_at_function_without_lld, + po=pull_out_distance_to_take_transport_air_in_function_without_lld, + zu=tube_2nd_section_height_measured_from_zm, + zr=tube_2nd_section_ratio, + zx=minimum_height, + ip=immersion_depth, + fp=surface_following_distance, + av=aspiration_volume, + as_=aspiration_speed, + ta=transport_air_volume, + ba=blow_out_air_volume, + oa=pre_wetting_volume, + lm=lld_mode, + ll=lld_sensitivity, + lv=pressure_lld_sensitivity, + zo=aspirate_position_above_z_touch_off, + de=swap_speed, + wt=settling_time, + mv=mix_volume, + mc=mix_cycles, + mp=mix_position_in_z_direction_from_liquid_surface, + ms=mix_speed, + mh=surface_following_distance_during_mixing, + la=[0] * len(x_position), + lb=capacitive_mad_supervision_on_off, + lc=pressure_mad_supervision_on_off, + gj=tadm_algorithm_on_off, + gi=limit_curve_index or [0] * len(x_position), + gk=recording_mode, + ) + + async def _pip_dispense( + self, + x_position: List[int], + y_position: List[int], + tip_pattern: List[bool], + type_of_dispensing_mode: List[int], + minimum_height: List[int], + lld_search_height: List[int], + liquid_surface_at_function_without_lld: List[int], + pull_out_distance_to_take_transport_air_in_function_without_lld: List[int], + immersion_depth: List[int], + surface_following_distance: List[int], + tube_2nd_section_height_measured_from_zm: List[int], + tube_2nd_section_ratio: List[int], + minimal_traverse_height_at_begin_of_command: List[int], + minimal_height_at_command_end: List[int], + dispense_volume: List[int], + dispense_speed: List[int], + cut_off_speed: List[int], + stop_back_volume: List[int], + transport_air_volume: List[int], + blow_out_air_volume: List[int], + lld_mode: List[int], + side_touch_off_distance: int, + dispense_position_above_z_touch_off: List[int], + lld_sensitivity: List[int], + pressure_lld_sensitivity: List[int], + swap_speed: List[int], + settling_time: List[int], + mix_volume: List[int], + mix_cycles: List[int], + mix_position_in_z_direction_from_liquid_surface: List[int], + mix_speed: List[int], + surface_following_distance_during_mixing: List[int], + tadm_algorithm_on_off: int = 0, + limit_curve_index: Optional[List[int]] = None, + recording_mode: int = 0, + ): + """Dispensing of liquid (A1PM:DD).""" + await self.driver.send_command( + module="A1PM", + command="DD", + tip_pattern=tip_pattern, + dm=type_of_dispensing_mode, + tm=tip_pattern, + xp=x_position, + yp=y_position, + zx=minimum_height, + lp=lld_search_height, + zl=liquid_surface_at_function_without_lld, + po=pull_out_distance_to_take_transport_air_in_function_without_lld, + ip=immersion_depth, + fp=surface_following_distance, + zu=tube_2nd_section_height_measured_from_zm, + zr=tube_2nd_section_ratio, + th=minimal_traverse_height_at_begin_of_command, + te=minimal_height_at_command_end, + dv=[f"{vol:04}" for vol in dispense_volume], + ds=dispense_speed, + ss=cut_off_speed, + rv=stop_back_volume, + ta=transport_air_volume, + ba=blow_out_air_volume, + lm=lld_mode, + dj=side_touch_off_distance, + zo=dispense_position_above_z_touch_off, + ll=lld_sensitivity, + lv=pressure_lld_sensitivity, + de=swap_speed, + wt=settling_time, + mv=mix_volume, + mc=mix_cycles, + mp=mix_position_in_z_direction_from_liquid_surface, + ms=mix_speed, + mh=surface_following_distance_during_mixing, + la=[0] * len(x_position), + gj=tadm_algorithm_on_off, + gi=limit_curve_index or [0] * len(x_position), + gk=recording_mode, + ) diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/tests/__init__.py b/pylabrobot/hamilton/liquid_handlers/vantage/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/tests/test_errors.py b/pylabrobot/hamilton/liquid_handlers/vantage/tests/test_errors.py new file mode 100644 index 00000000000..c562f91c425 --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/tests/test_errors.py @@ -0,0 +1,75 @@ +"""Tests for Vantage error handling.""" + +import unittest + +from pylabrobot.capabilities.liquid_handling.errors import ChannelizedError +from pylabrobot.hamilton.liquid_handlers.vantage.errors import ( + VantageFirmwareError, + convert_vantage_firmware_error_to_plr_error, + vantage_response_string_to_error, +) +from pylabrobot.resources.errors import HasTipError, NoTipError + + +class TestVantageResponseStringToError(unittest.TestCase): + def test_pip_error(self): + error_str = 'A1PMDAid1234er1es"P175"' + error = vantage_response_string_to_error(error_str) + self.assertIsInstance(error, VantageFirmwareError) + self.assertIn("Pipetting channel 1", error.errors) + self.assertEqual(error.errors["Pipetting channel 1"], "No tip picked up") + + def test_core96_error(self): + error_str = 'A1HMDAid1234er1es"H075"' + error = vantage_response_string_to_error(error_str) + self.assertIsInstance(error, VantageFirmwareError) + self.assertIn("Core 96", error.errors) + self.assertEqual(error.errors["Core 96"], "No tip picked up") + + def test_et_format_error(self): + error_str = 'A1PMDAid1234et"some error text"' + error = vantage_response_string_to_error(error_str) + self.assertIsInstance(error, VantageFirmwareError) + self.assertIn("Pip", error.errors) + self.assertEqual(error.errors["Pip"], "some error text") + + def test_error_equality(self): + e1 = VantageFirmwareError({"ch": "test"}, "raw") + e2 = VantageFirmwareError({"ch": "test"}, "raw") + self.assertEqual(e1, e2) + + def test_error_str(self): + e = VantageFirmwareError({"ch": "test"}, "raw") + self.assertIn("VantageFirmwareError", str(e)) + + +class TestConvertToPLRError(unittest.TestCase): + def test_tip_already_picked_up(self): + error = VantageFirmwareError( + {"Pipetting channel 1": "Tip already picked up"}, + "raw", + ) + result = convert_vantage_firmware_error_to_plr_error(error) + assert isinstance(result, ChannelizedError) + self.assertIsInstance(result.errors[0], HasTipError) + + def test_no_tip_picked_up(self): + error = VantageFirmwareError( + {"Pipetting channel 1": "No tip picked up"}, + "raw", + ) + result = convert_vantage_firmware_error_to_plr_error(error) + assert isinstance(result, ChannelizedError) + self.assertIsInstance(result.errors[0], NoTipError) + + def test_non_channel_error_returns_none(self): + error = VantageFirmwareError( + {"Core 96": "No tip picked up"}, + "raw", + ) + result = convert_vantage_firmware_error_to_plr_error(error) + self.assertIsNone(result) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/tests/test_fw_parsing.py b/pylabrobot/hamilton/liquid_handlers/vantage/tests/test_fw_parsing.py new file mode 100644 index 00000000000..487bdbcedda --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/tests/test_fw_parsing.py @@ -0,0 +1,45 @@ +"""Tests for Vantage firmware response parsing.""" + +import unittest + +from pylabrobot.hamilton.liquid_handlers.vantage.fw_parsing import parse_vantage_fw_string + + +class TestParseVantageFWString(unittest.TestCase): + def test_parse_id(self): + result = parse_vantage_fw_string("A1PMDAid1234") + self.assertEqual(result["id"], 1234) + + def test_parse_int(self): + result = parse_vantage_fw_string("A1PMDAid0qw1", {"qw": "int"}) + self.assertEqual(result["id"], 0) + self.assertEqual(result["qw"], 1) + + def test_parse_str(self): + result = parse_vantage_fw_string('id0es"error string"', {"es": "str"}) + self.assertEqual(result["es"], "error string") + + def test_parse_int_list(self): + result = parse_vantage_fw_string("id0xs30 -100 +1 1000", {"xs": "[int]"}) + self.assertEqual(result["id"], 0) + self.assertEqual(result["xs"], [30, -100, 1, 1000]) + + def test_parse_hex(self): + result = parse_vantage_fw_string("id0cwFF", {"cw": "hex"}) + self.assertEqual(result["cw"], 255) + + def test_invalid_fmt_type(self): + with self.assertRaises(TypeError): + parse_vantage_fw_string("id0", "invalid") # type: ignore + + def test_unknown_data_type(self): + with self.assertRaises(ValueError): + parse_vantage_fw_string("id0foo1", {"foo": "unknown"}) + + def test_no_match_raises(self): + with self.assertRaises(ValueError): + parse_vantage_fw_string("id0", {"qw": "int"}) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/vantage.py b/pylabrobot/hamilton/liquid_handlers/vantage/vantage.py new file mode 100644 index 00000000000..9240e3eabef --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/vantage.py @@ -0,0 +1,61 @@ +"""Vantage device: wires VantageDriver backends to PIP/Head96/IPG capability frontends.""" + +from typing import Optional + +from pylabrobot.capabilities.arms.orientable_arm import OrientableArm +from pylabrobot.capabilities.liquid_handling.head96 import Head96 +from pylabrobot.capabilities.liquid_handling.pip import PIP +from pylabrobot.device import Device +from pylabrobot.resources.hamilton.vantage_decks import VantageDeck + +from .chatterbox import VantageChatterboxDriver +from .driver import VantageDriver + + +class Vantage(Device): + """Hamilton Vantage liquid handler. + + User-facing device that wires capability frontends (PIP, Head96, IPG) to the + VantageDriver's backends after hardware discovery during setup(). + """ + + def __init__(self, deck: VantageDeck, chatterbox: bool = False): + driver = VantageChatterboxDriver() if chatterbox else VantageDriver() + super().__init__(driver=driver) + self.driver: VantageDriver = driver + self.deck = deck + self.pip: PIP # set in setup() + self.head96: Optional[Head96] = None # set in setup() if installed + self.ipg: Optional[OrientableArm] = None # set in setup() if installed + + async def setup(self): + await self.driver.setup() + + # PIP is always present. + assert self.driver.pip is not None + self.pip = PIP(backend=self.driver.pip) + self._capabilities = [self.pip] + + # Head96 only if the hardware has a 96-head installed. + if self.driver.head96 is not None: + self.head96 = Head96(backend=self.driver.head96) + self._capabilities.append(self.head96) + + # IPG only if installed. + if self.driver.ipg is not None: + self.ipg = OrientableArm(backend=self.driver.ipg, reference_resource=self.deck) + self._capabilities.append(self.ipg) + + for cap in self._capabilities: + await cap._on_setup() + self._setup_finished = True + + async def stop(self): + if not self._setup_finished: + return + for cap in reversed(self._capabilities): + await cap._on_stop() + await self.driver.stop() + self._setup_finished = False + self.head96 = None + self.ipg = None diff --git a/pylabrobot/hamilton/liquid_handlers/vantage/x_arm.py b/pylabrobot/hamilton/liquid_handlers/vantage/x_arm.py new file mode 100644 index 00000000000..16b8ac37f1f --- /dev/null +++ b/pylabrobot/hamilton/liquid_handlers/vantage/x_arm.py @@ -0,0 +1,125 @@ +"""VantageXArm: X-arm positioning control for Hamilton Vantage liquid handlers.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .driver import VantageDriver + + +class VantageXArm: + """Controls the X-arm on a Hamilton Vantage. + + This is a plain helper class (not a CapabilityBackend). It encapsulates the + firmware protocol for X-arm positioning and delegates I/O to the driver. + """ + + def __init__(self, driver: "VantageDriver"): + self.driver = driver + + async def _on_setup(self): + pass + + async def _on_stop(self): + pass + + # -- commands (A1XM) ------------------------------------------------------- + + async def initialize(self) -> None: + """Initialize the X-arm (A1XM:XI).""" + await self.driver.send_command(module="A1XM", command="XI") + + async def move_to_x_position( + self, + x_position: int = 5000, + x_speed: int = 25000, + ) -> None: + """Move arm to X position (A1XM:XP). + + Args: + x_position: X position [0.1mm]. Range -50000 to 50000. + x_speed: X speed [0.1mm/s]. Range 1 to 25000. + """ + if not -50000 <= x_position <= 50000: + raise ValueError("x_position must be in range -50000 to 50000") + if not 1 <= x_speed <= 25000: + raise ValueError("x_speed must be in range 1 to 25000") + + await self.driver.send_command(module="A1XM", command="XP", xp=x_position, xv=x_speed) + + async def move_to_x_position_safe( + self, + x_position: int = 5000, + x_speed: int = 25000, + xx: int = 1, + ) -> None: + """Move arm to X position with all attached components in Z-safety (A1XM:XA). + + Args: + x_position: X position [0.1mm]. + x_speed: X speed [0.1mm/s]. + xx: Unknown parameter. + """ + if not -50000 <= x_position <= 50000: + raise ValueError("x_position must be in range -50000 to 50000") + if not 1 <= x_speed <= 25000: + raise ValueError("x_speed must be in range 1 to 25000") + + await self.driver.send_command(module="A1XM", command="XA", xp=x_position, xv=x_speed, xx=xx) + + async def move_relatively( + self, + x_search_distance: int = 0, + x_speed: int = 25000, + xx: int = 1, + ) -> None: + """Move arm relatively in X (A1XM:XS). + + Args: + x_search_distance: X search distance [0.1mm]. + x_speed: X speed [0.1mm/s]. + xx: Unknown parameter. + """ + if not -50000 <= x_search_distance <= 50000: + raise ValueError("x_search_distance must be in range -50000 to 50000") + if not 1 <= x_speed <= 25000: + raise ValueError("x_speed must be in range 1 to 25000") + + await self.driver.send_command( + module="A1XM", command="XS", xs=x_search_distance, xv=x_speed, xx=xx + ) + + async def search_teach_signal( + self, + x_search_distance: int = 0, + x_speed: int = 25000, + xx: int = 1, + ) -> None: + """Search X for teach signal (A1XM:XT). + + Args: + x_search_distance: X search distance [0.1mm]. + x_speed: X speed [0.1mm/s]. + xx: Unknown parameter. + """ + if not -50000 <= x_search_distance <= 50000: + raise ValueError("x_search_distance must be in range -50000 to 50000") + if not 1 <= x_speed <= 25000: + raise ValueError("x_speed must be in range 1 to 25000") + + await self.driver.send_command( + module="A1XM", command="XT", xs=x_search_distance, xv=x_speed, xx=xx + ) + + async def turn_off(self) -> None: + """Turn X drive off (A1XM:XO).""" + await self.driver.send_command(module="A1XM", command="XO") + + async def request_position(self): + """Request arm X position (A1XM:RX).""" + return await self.driver.send_command(module="A1XM", command="RX") + + async def request_error_code(self): + """Request X-arm error code (A1XM:RE).""" + return await self.driver.send_command(module="A1XM", command="RE") From 2ddcdf4a3c77a24b90340305d9c23a525e01b19b Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 2 Apr 2026 19:25:36 -0700 Subject: [PATCH 2/3] Legacy VantageBackend delegates to new VantageDriver Follows the same pattern as STARBackend: the legacy class creates a VantageDriver in __init__, delegates send_command/setup/stop to it, and exposes typed property accessors for the new subsystems. All 21 legacy tests pass unchanged. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../backends/hamilton/vantage_backend.py | 138 ++++++++++++------ 1 file changed, 97 insertions(+), 41 deletions(-) diff --git a/pylabrobot/legacy/liquid_handling/backends/hamilton/vantage_backend.py b/pylabrobot/legacy/liquid_handling/backends/hamilton/vantage_backend.py index 07c02e4fc0c..a34968af2bb 100644 --- a/pylabrobot/legacy/liquid_handling/backends/hamilton/vantage_backend.py +++ b/pylabrobot/legacy/liquid_handling/backends/hamilton/vantage_backend.py @@ -374,9 +374,61 @@ def __init__( serial_number=serial_number, ) + from pylabrobot.hamilton.liquid_handlers.vantage.driver import VantageDriver + + self.driver = VantageDriver( + device_address=device_address, + serial_number=serial_number, + packet_read_timeout=packet_read_timeout, + read_timeout=read_timeout, + write_timeout=write_timeout, + ) + self._iswap_parked: Optional[bool] = None self._num_channels: Optional[int] = None self._traversal_height: float = 245.0 + self._setup_done = False + + # -- property accessors for new-arch subsystems ---------------------------- + + @property + def _vantage_pip(self): + """Typed access to the Vantage PIP backend.""" + return self.driver.pip + + @property + def _vantage_head96(self): + """Typed access to the Head96 backend.""" + assert self.driver.head96 is not None, "96-head is not installed" + return self.driver.head96 + + @property + def _vantage_ipg(self): + """Typed access to the IPG backend.""" + assert self.driver.ipg is not None, "IPG is not installed" + return self.driver.ipg + + @property + def _vantage_x_arm(self): + """Typed access to the X-arm.""" + assert self.driver.x_arm is not None, "X-arm is not available" + return self.driver.x_arm + + @property + def _vantage_loading_cover(self): + """Typed access to the loading cover.""" + assert self.driver.loading_cover is not None, "Loading cover is not available" + return self.driver.loading_cover + + @property + def _write_and_read_command(self): + return self.driver._write_and_read_command + + @_write_and_read_command.setter + def _write_and_read_command(self, value): + self.driver._write_and_read_command = value # type: ignore[method-assign] + + # -- HamiltonLiquidHandler abstract methods (delegate to driver) ----------- @property def module_id_length(self) -> int: @@ -400,6 +452,34 @@ def _parse_response(self, resp: str, fmt: Dict[str, str]) -> dict: """Parse a firmware response.""" return parse_vantage_fw_string(resp, fmt) + # -- send_command delegation ----------------------------------------------- + + async def send_command( + self, + module, + command, + auto_id=True, + tip_pattern=None, + write_timeout=None, + read_timeout=None, + wait=True, + fmt=None, + **kwargs, + ): + return await self.driver.send_command( + module=module, + command=command, + auto_id=auto_id, + tip_pattern=tip_pattern, + write_timeout=write_timeout, + read_timeout=read_timeout, + wait=wait, + fmt=fmt, + **kwargs, + ) + + # -- lifecycle (delegate to driver) ---------------------------------------- + async def setup( self, skip_loading_cover: bool = False, @@ -408,50 +488,26 @@ async def setup( ): """Creates a USB connection and finds read/write interfaces.""" - await super().setup() - - tip_presences = await self.query_tip_presence() - self._num_channels = len(tip_presences) - - arm_initialized = await self.arm_request_instrument_initialization_status() - if not arm_initialized: - await self.arm_pre_initialize() - - # TODO: check which modules are actually installed. + # Let the driver own the USB connection and perform hardware discovery. + await self.driver.setup( + skip_loading_cover=skip_loading_cover, + skip_core96=skip_core96, + skip_ipg=skip_ipg, + ) - pip_channels_initialized = await self.pip_request_initialization_status() - if not pip_channels_initialized or any(tip_presences): - await self.pip_initialize( - x_position=[7095] * self.num_channels, - y_position=[3891, 3623, 3355, 3087, 2819, 2551, 2283, 2016], - begin_z_deposit_position=[int(self._traversal_height * 10)] * self.num_channels, - end_z_deposit_position=[1235] * self.num_channels, - minimal_height_at_command_end=[int(self._traversal_height * 10)] * self.num_channels, - tip_pattern=[True] * self.num_channels, - tip_type=[1] * self.num_channels, - TODO_DI_2=70, - ) + # Sync legacy state from driver. + self.id_ = 0 + self._num_channels = self.driver.num_channels + self._traversal_height = self.driver.traversal_height + self._setup_done = True - loading_cover_initialized = await self.loading_cover_request_initialization_status() - if not loading_cover_initialized and not skip_loading_cover: - await self.loading_cover_initialize() - - core96_initialized = await self.core96_request_initialization_status() - if not core96_initialized and not skip_core96: - await self.core96_initialize( - x_position=7347, # TODO: get trash location from deck. - y_position=2684, # TODO: get trash location from deck. - minimal_traverse_height_at_begin_of_command=int(self._traversal_height * 10), - minimal_height_at_command_end=int(self._traversal_height * 10), - end_z_deposit_position=2420, - ) + async def stop(self): + await self.driver.stop() + self._setup_done = False - if not skip_ipg: - ipg_initialized = await self.ipg_request_initialization_status() - if not ipg_initialized: - await self.ipg_initialize() - if not await self.ipg_get_parking_status(): - await self.ipg_park() + @property + def setup_done(self) -> bool: + return self._setup_done @property def num_channels(self) -> int: From 649c5c3529bd2192d61d6ab15633253e53841d57 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 2 Apr 2026 19:29:41 -0700 Subject: [PATCH 3/3] Remove duplicated error codes/parser from legacy VantageBackend Replace ~275 lines of inline definitions (error dicts, VantageFirmwareError, parse_vantage_fw_string, vantage_response_string_to_error) with re-exports from the new vantage modules. Existing imports continue to work. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../backends/hamilton/vantage_backend.py | 287 +----------------- 1 file changed, 11 insertions(+), 276 deletions(-) diff --git a/pylabrobot/legacy/liquid_handling/backends/hamilton/vantage_backend.py b/pylabrobot/legacy/liquid_handling/backends/hamilton/vantage_backend.py index a34968af2bb..dc446533a5c 100644 --- a/pylabrobot/legacy/liquid_handling/backends/hamilton/vantage_backend.py +++ b/pylabrobot/legacy/liquid_handling/backends/hamilton/vantage_backend.py @@ -1,6 +1,5 @@ import asyncio import random -import re import sys import warnings from typing import Dict, List, Optional, Sequence, Union, cast @@ -48,281 +47,17 @@ from typing_extensions import Literal -def parse_vantage_fw_string(s: str, fmt: Optional[Dict[str, str]] = None) -> dict: - """Parse a Vantage firmware string into a dict. - - The identifier parameter (id) is added automatically. - - `fmt` is a dict that specifies the format of the string. The keys are the parameter names and the - values are the types. The following types are supported: - - - `"int"`: a single integer - - `"str"`: a string - - `"[int]"`: a list of integers - - `"hex"`: a hexadecimal number - - Example: - >>> parse_fw_string("id0xs30 -100 +1 1000", {"id": "int", "x": "[int]"}) - {"id": 0, "x": [30, -100, 1, 1000]} - - >>> parse_fw_string("es\"error string\"", {"es": "str"}) - {"es": "error string"} - """ - - parsed: dict = {} - - if fmt is None: - fmt = {} - - if not isinstance(fmt, dict): - raise TypeError(f"invalid fmt for fmt: expected dict, got {type(fmt)}") - - if "id" not in fmt: - fmt["id"] = "int" - - for key, data_type in fmt.items(): - if data_type == "int": - matches = re.findall(rf"{key}([-+]?\d+)", s) - if len(matches) != 1: - raise ValueError(f"Expected exactly one match for {key} in {s}") - parsed[key] = int(matches[0]) - elif data_type == "str": - matches = re.findall(rf"{key}\"(.*)\"", s) - if len(matches) != 1: - raise ValueError(f"Expected exactly one match for {key} in {s}") - parsed[key] = matches[0] - elif data_type == "[int]": - matches = re.findall(rf"{key}((?:[-+]?[\d ]+)+)", s) - if len(matches) != 1: - raise ValueError(f"Expected exactly one match for {key} in {s}") - parsed[key] = [int(x) for x in matches[0].split()] - elif data_type == "hex": - matches = re.findall(rf"{key}([0-9a-fA-F]+)", s) - if len(matches) != 1: - raise ValueError(f"Expected exactly one match for {key} in {s}") - parsed[key] = int(matches[0], 16) - else: - raise ValueError(f"Unknown data type {data_type}") - - return parsed - - -core96_errors = { - 0: "No error", - 21: "No communication to digital potentiometer", - 25: "Wrong Flash EPROM data", - 26: "Flash EPROM not programmable", - 27: "Flash EPROM not erasable", - 28: "Flash EPROM checksum error", - 29: "Wrong FW loaded", - 30: "Undefined command", - 31: "Undefined parameter", - 32: "Parameter out of range", - 35: "Voltages out of range", - 36: "Stop during command execution", - 37: "Adjustment sensor didn't switch (no teach in signal)", - 40: "No parallel processes on level 1 permitted", - 41: "No parallel processes on level 2 permitted", - 42: "No parallel processes on level 3 permitted", - 50: "Dispensing drive initialization failed", - 51: "Dispensing drive not initialized", - 52: "Dispensing drive movement error", - 53: "Maximum volume in tip reached", - 54: "Dispensing drive position out of permitted area", - 55: "Y drive initialization failed", - 56: "Y drive not initialized", - 57: "Y drive movement error", - 58: "Y drive position out of permitted area", - 60: "Z drive initialization failed", - 61: "Z drive not initialized", - 62: "Z drive movement error", - 63: "Z drive position out of permitted area", - 65: "Squeezer drive initialization failed", - 66: "Squeezer drive not initialized", - 67: "Squeezer drive movement error", - 68: "Squeezer drive position out of permitted area", - 70: "No liquid level found", - 71: "Not enough liquid present", - 75: "No tip picked up", - 76: "Tip already picked up", - 81: "Clot detected with LLD sensor", - 82: "TADM measurement out of lower limit curve", - 83: "TADM measurement out of upper limit curve", - 84: "Not enough memory for TADM measurement", - 90: "Limit curve not resettable", - 91: "Limit curve not programmable", - 92: "Limit curve name not found", - 93: "Limit curve data incorrect", - 94: "Not enough memory for limit curve", - 95: "Not allowed limit curve index", - 96: "Limit curve already stored", -} - -pip_errors = { - 22: "Drive controller message error", - 23: "EC drive controller setup not executed", - 25: "wrong Flash EPROM data", - 26: "Flash EPROM not programmable", - 27: "Flash EPROM not erasable", - 28: "Flash EPROM checksum error", - 29: "wrong FW loaded", - 30: "Undefined command", - 31: "Undefined parameter", - 32: "Parameter out of range", - 35: "Voltages out of range", - 36: "Stop during command execution", - 37: "Adjustment sensor didn't switch (no teach in signal)", - 38: "Movement interrupted by partner channel", - 39: "Angle alignment offset error", - 40: "No parallel processes on level 1 permitted", - 41: "No parallel processes on level 2 permitted", - 42: "No parallel processes on level 3 permitted", - 50: "D drive initialization failed", - 51: "D drive not initialized", - 52: "D drive movement error", - 53: "Maximum volume in tip reached", - 54: "D drive position out of permitted area", - 55: "Y drive initialization failed", - 56: "Y drive not initialized", - 57: "Y drive movement error", - 58: "Y drive position out of permitted area", - 59: "Divergance Y motion controller to linear encoder to height", - 60: "Z drive initialization failed", - 61: "Z drive not initialized", - 62: "Z drive movement error", - 63: "Z drive position out of permitted area", - 64: "Limit stop not found", - 65: "S drive initialization failed", - 66: "S drive not initialized", - 67: "S drive movement error", - 68: "S drive position out of permitted area", - 69: "Init. position adjustment error", - 70: "No liquid level found", - 71: "Not enough liquid present", - 74: "Liquid at a not allowed position detected", - 75: "No tip picked up", - 76: "Tip already picked up", - 77: "Tip not discarded", - 78: "Wrong tip detected", - 79: "Tip not correct squeezed", - 80: "Liquid not correctly aspirated", - 81: "Clot detected", - 82: "TADM measurement out of lower limit curve", - 83: "TADM measurement out of upper limit curve", - 84: "Not enough memory for TADM measurement", - 85: "Jet dispense pressure not reached", - 86: "ADC algorithm error", - 90: "Limit curve not resettable", - 91: "Limit curve not programmable", - 92: "Limit curve name not found", - 93: "Limit curve data incorrect", - 94: "Not enough memory for limit curve", - 95: "Not allowed limit curve index", - 96: "Limit curve already stored", -} - -ipg_errors = { - 0: "No error", - 22: "Drive controller message error", - 23: "EC drive controller setup not executed", - 25: "Wrong Flash EPROM data", - 26: "Flash EPROM not programmable", - 27: "Flash EPROM not erasable", - 28: "Flash EPROM checksum error", - 29: "Wrong FW loaded", - 30: "Undefined command", - 31: "Undefined parameter", - 32: "Parameter out of range", - 35: "Voltages out of range", - 36: "Stop during command execution", - 37: "Adjustment sensor didn't switch (no teach in signal)", - 39: "Angle alignment offset error", - 40: "No parallel processes on level 1 permitted", - 41: "No parallel processes on level 2 permitted", - 42: "No parallel processes on level 3 permitted", - 50: "Y Drive initialization failed", - 51: "Y Drive not initialized", - 52: "Y Drive movement error", - 53: "Y Drive position out of permitted area", - 54: "Diff. motion controller and lin. encoder counter too high", - 55: "Z Drive initialization failed", - 56: "Z Drive not initialized", - 57: "Z Drive movement error", - 58: "Z Drive position out of permitted area", - 59: "Z Drive limit stop not found", - 60: "Rotation Drive initialization failed", - 61: "Rotation Drive not initialized", - 62: "Rotation Drive movement error", - 63: "Rotation Drive position out of permitted area", - 65: "Wrist Twist Drive initialization failed", - 66: "Wrist Twist Drive not initialized", - 67: "Wrist Twist Drive movement error", - 68: "Wrist Twist Drive position out of permitted area", - 70: "Gripper Drive initialization failed", - 71: "Gripper Drive not initialized", - 72: "Gripper Drive movement error", - 73: "Gripper Drive position out of permitted area", - 80: "Plate not found", - 81: "Plate is still held", - 82: "No plate is held", -} - - -class VantageFirmwareError(Exception): - def __init__(self, errors, raw_response): - self.errors = errors - self.raw_response = raw_response - - def __str__(self): - return f"VantageFirmwareError(errors={self.errors}, raw_response={self.raw_response})" - - def __eq__(self, __value: object) -> bool: - return ( - isinstance(__value, VantageFirmwareError) - and self.errors == __value.errors - and self.raw_response == __value.raw_response - ) - - -def vantage_response_string_to_error( - string: str, -) -> VantageFirmwareError: - """Convert a Vantage firmware response string to a VantageFirmwareError. Assumes that the - response is an error response.""" - - try: - error_format = r"[A-Z0-9]{2}[0-9]{2}" - error_string = parse_vantage_fw_string(string, {"es": "str"})["es"] - error_codes = re.findall(error_format, error_string) - errors = {} - num_channels = 16 - for error in error_codes: - module, error_code = error[:2], error[2:] - error_code = int(error_code) - for channel in range(1, num_channels + 1): - if module == f"P{channel}": - errors[f"Pipetting channel {channel}"] = pip_errors.get(error_code, "Unknown error") - elif module in ("H0", "HM"): - errors["Core 96"] = core96_errors.get(error_code, "Unknown error") - elif module == "RM": - errors["IPG"] = ipg_errors.get(error_code, "Unknown error") - elif module == "AM": - errors["Cover"] = "Unknown error" - except ValueError: - module_id = string[:4] - module = modules = { - "I1AM": "Cover", - "C0AM": "Master", - "A1PM": "Pip", - "A1HM": "Core 96", - "A1RM": "IPG", - "A1AM": "Arm", - "A1XM": "X-arm", - }.get(module_id, "Unknown module") - error_string = parse_vantage_fw_string(string, {"et": "str"})["et"] - errors = {modules: error_string} - - return VantageFirmwareError(errors, string) +# Re-export from the new implementation. These used to be defined inline here. +from pylabrobot.hamilton.liquid_handlers.vantage.errors import ( # noqa: F401 + VantageFirmwareError, + core96_errors, + ipg_errors, + pip_errors, + vantage_response_string_to_error, +) +from pylabrobot.hamilton.liquid_handlers.vantage.fw_parsing import ( # noqa: F401 + parse_vantage_fw_string, +) def _get_dispense_mode(jet: bool, empty: bool, blow_out: bool) -> Literal[0, 1, 2, 3, 4]: