diff --git a/_typos.toml b/_typos.toml index 742efae9454..6b8623a2f3d 100644 --- a/_typos.toml +++ b/_typos.toml @@ -28,6 +28,7 @@ mis = "mis" RHE = "RHE" "ASEND" = "ASEND" caf = "caf" +Occured = "Occured" [files] extend-exclude = [ diff --git a/pylabrobot/dispensing/__init__.py b/pylabrobot/dispensing/__init__.py new file mode 100644 index 00000000000..d830708659e --- /dev/null +++ b/pylabrobot/dispensing/__init__.py @@ -0,0 +1,3 @@ +from .backend import DispenserBackend +from .dispenser import Dispenser +from .standard import DispenseOp diff --git a/pylabrobot/dispensing/backend.py b/pylabrobot/dispensing/backend.py new file mode 100644 index 00000000000..2da50272070 --- /dev/null +++ b/pylabrobot/dispensing/backend.py @@ -0,0 +1,37 @@ +"""Abstract backend interface for chip-based contactless liquid dispensers.""" + +from __future__ import annotations + +from abc import ABCMeta, abstractmethod +from typing import List + +from pylabrobot.machines.backend import MachineBackend + +from .standard import DispenseOp + + +class DispenserBackend(MachineBackend, metaclass=ABCMeta): + """Abstract class for a chip-based contactless liquid dispenser backend. + + Subclasses must implement :meth:`setup`, :meth:`stop`, and :meth:`dispense`. + """ + + @abstractmethod + async def setup(self) -> None: + """Set up the dispenser (connect, home, initialize pressure, etc.).""" + + @abstractmethod + async def stop(self) -> None: + """Shut down the dispenser and release all resources. + + After calling this, :meth:`setup` should be callable again. + """ + + @abstractmethod + async def dispense(self, ops: List[DispenseOp], **backend_kwargs) -> None: + """Dispense liquid into the specified wells. + + Args: + ops: A list of :class:`DispenseOp` describing each dispense target. + **backend_kwargs: Additional keyword arguments specific to the backend. + """ diff --git a/pylabrobot/dispensing/chatterbox.py b/pylabrobot/dispensing/chatterbox.py new file mode 100644 index 00000000000..ef8921db125 --- /dev/null +++ b/pylabrobot/dispensing/chatterbox.py @@ -0,0 +1,21 @@ +"""Chatterbox backend for device-free testing of dispensers.""" + +from typing import List + +from pylabrobot.dispensing.backend import DispenserBackend +from pylabrobot.dispensing.standard import DispenseOp + + +class DispenserChatterboxBackend(DispenserBackend): + """Chatterbox backend for device-free testing. Prints all operations.""" + + async def setup(self) -> None: + print("Setting up the dispenser.") + + async def stop(self) -> None: + print("Stopping the dispenser.") + + async def dispense(self, ops: List[DispenseOp], **backend_kwargs) -> None: + for op in ops: + chip_str = f" (chip {op.chip})" if op.chip is not None else "" + print(f"Dispensing {op.volume:.2f} µL into {op.resource.name}{chip_str}") diff --git a/pylabrobot/dispensing/dispenser.py b/pylabrobot/dispensing/dispenser.py new file mode 100644 index 00000000000..75fcb7bb6e8 --- /dev/null +++ b/pylabrobot/dispensing/dispenser.py @@ -0,0 +1,68 @@ +"""Front-end for chip-based contactless liquid dispensers.""" + +from __future__ import annotations + +import logging +from typing import List, Optional, Sequence, Union + +from pylabrobot.machines.machine import Machine, need_setup_finished +from pylabrobot.resources import Well + +from .backend import DispenserBackend +from .standard import DispenseOp + +logger = logging.getLogger(__name__) + + +class Dispenser(Machine): + """Front-end for chip-based contactless liquid dispensers. + + Dispensers use disposable silicon chips with microvalves and pressure-driven + dispensing to deliver nanoliter-to-microliter volumes into microplate wells + without contacting the liquid. + + Example:: + + >>> from pylabrobot.dispensing.mantis import MantisBackend + >>> d = Dispenser(backend=MantisBackend(serial_number="M-000438")) + >>> await d.setup() + >>> await d.dispense(plate["A1:H12"], volume=5.0, chip=3) + >>> await d.stop() + """ + + def __init__(self, backend: DispenserBackend) -> None: + super().__init__(backend=backend) + self.backend: DispenserBackend = backend # fix type for IDE + + @need_setup_finished + async def dispense( + self, + resources: Union[Well, Sequence[Well]], + volume: float, + chip: Optional[int] = None, + **backend_kwargs, + ) -> None: + """Dispense liquid into target wells. + + Args: + resources: Target well(s) to dispense into. + volume: Volume in µL to dispense per well. + chip: Chip number to use (1-6). If ``None``, the backend selects automatically. + **backend_kwargs: Additional keyword arguments passed to the backend. + + Raises: + RuntimeError: If setup has not been called. + ValueError: If *volume* is not positive. + """ + if isinstance(resources, Well): + resources = [resources] + + if volume <= 0: + raise ValueError(f"Volume must be positive, got {volume}") + + ops: List[DispenseOp] = [ + DispenseOp(resource=well, volume=volume, chip=chip) for well in resources + ] + + logger.info("Dispensing %.2f µL into %d well(s)", volume, len(ops)) + await self.backend.dispense(ops, **backend_kwargs) diff --git a/pylabrobot/dispensing/dispenser_tests.py b/pylabrobot/dispensing/dispenser_tests.py new file mode 100644 index 00000000000..f944be2ef44 --- /dev/null +++ b/pylabrobot/dispensing/dispenser_tests.py @@ -0,0 +1,79 @@ +"""Tests for the Dispenser front-end.""" + +import unittest +from unittest.mock import AsyncMock + +from pylabrobot.dispensing.backend import DispenserBackend +from pylabrobot.dispensing.dispenser import Dispenser +from pylabrobot.resources import Cor_96_wellplate_360ul_Fb + + +class MockDispenserBackend(DispenserBackend): + """Mock backend for testing.""" + + async def setup(self) -> None: + pass + + async def stop(self) -> None: + pass + + async def dispense(self, ops, **backend_kwargs) -> None: + pass + + +class TestDispenser(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self) -> None: + self.backend = AsyncMock(spec=MockDispenserBackend) + self.dispenser = Dispenser(backend=self.backend) + await self.dispenser.setup() + + async def asyncTearDown(self) -> None: + await self.dispenser.stop() + + async def test_dispense_single_well(self): + plate = Cor_96_wellplate_360ul_Fb(name="test_plate") + await self.dispenser.dispense(plate["A1"][0], volume=5.0, chip=3) + self.backend.dispense.assert_called_once() + ops = self.backend.dispense.call_args[0][0] + self.assertEqual(len(ops), 1) + self.assertEqual(ops[0].volume, 5.0) + self.assertEqual(ops[0].chip, 3) + + async def test_dispense_multiple_wells(self): + plate = Cor_96_wellplate_360ul_Fb(name="test_plate") + wells = plate["A1"] + plate["A2"] + plate["B1"] + await self.dispenser.dispense(wells, volume=10.0) + self.backend.dispense.assert_called_once() + ops = self.backend.dispense.call_args[0][0] + self.assertEqual(len(ops), 3) + for op in ops: + self.assertEqual(op.volume, 10.0) + self.assertIsNone(op.chip) + + async def test_dispense_negative_volume_raises(self): + plate = Cor_96_wellplate_360ul_Fb(name="test_plate") + with self.assertRaises(ValueError): + await self.dispenser.dispense(plate["A1"][0], volume=-1.0) + + async def test_dispense_zero_volume_raises(self): + plate = Cor_96_wellplate_360ul_Fb(name="test_plate") + with self.assertRaises(ValueError): + await self.dispenser.dispense(plate["A1"][0], volume=0.0) + + async def test_dispense_before_setup_raises(self): + backend = AsyncMock(spec=MockDispenserBackend) + dispenser = Dispenser(backend=backend) + plate = Cor_96_wellplate_360ul_Fb(name="test_plate") + with self.assertRaises(RuntimeError): + await dispenser.dispense(plate["A1"][0], volume=5.0) + + async def test_backend_kwargs_forwarded(self): + plate = Cor_96_wellplate_360ul_Fb(name="test_plate") + await self.dispenser.dispense(plate["A1"][0], volume=5.0, custom_param="value") + self.backend.dispense.assert_called_once() + kwargs = self.backend.dispense.call_args[1] + self.assertEqual(kwargs["custom_param"], "value") + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/dispensing/mantis/__init__.py b/pylabrobot/dispensing/mantis/__init__.py new file mode 100644 index 00000000000..ce295f21a15 --- /dev/null +++ b/pylabrobot/dispensing/mantis/__init__.py @@ -0,0 +1 @@ +from .mantis_backend import MantisBackend diff --git a/pylabrobot/dispensing/mantis/fmlx_driver.py b/pylabrobot/dispensing/mantis/fmlx_driver.py new file mode 100644 index 00000000000..da6a44d014b --- /dev/null +++ b/pylabrobot/dispensing/mantis/fmlx_driver.py @@ -0,0 +1,642 @@ +"""Low-level Formulatrix (FMLX) protocol driver for the Mantis dispenser. + +Handles packet construction, checksum calculation, command sending/receiving, +and asynchronous event dispatch over an FTDI serial link. +""" + +from __future__ import annotations + +import asyncio +import logging +import struct +from typing import Any, Callable, Dict, List, Optional, Tuple + +from pylabrobot.io.ftdi import FTDI + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Packet +# --------------------------------------------------------------------------- + + +class FmlxPacket: + """A single FMLX protocol packet (command or response).""" + + HEADER_SIZE = 12 + CHECKSUM_SIZE = 2 + + def __init__( + self, + opcode: int, + packet_id: int = 0, + address: int = 0, + sequence_id: int = 0, + ) -> None: + self.opcode = opcode + self.packet_id = packet_id + self.address = address + self.sequence_id = sequence_id + self.reserved = 0 + self.data = bytearray() + + # -- payload builders (fluent API) -- + + def add_int16(self, value: int) -> "FmlxPacket": + self.data.extend(struct.pack(" "FmlxPacket": + self.data.extend(struct.pack(" "FmlxPacket": + self.data.extend(struct.pack(" "FmlxPacket": + self.add_uint16(1 if value else 0) + return self + + def add_double(self, value: float) -> "FmlxPacket": + self.data.extend(struct.pack(" "FmlxPacket": + encoded = value.encode("utf-16-le") + self.data.extend(encoded) + self.data.extend(b"\x00\x00") + return self + + # -- serialisation -- + + @staticmethod + def calculate_checksum(raw_bytes: bytes) -> int: + checksum = 0 + for i in range(0, len(raw_bytes), 2): + val = raw_bytes[i] + if i + 1 < len(raw_bytes): + val |= raw_bytes[i + 1] << 8 + checksum ^= val + return checksum + + def to_bytes(self) -> bytes: + size_val = self.HEADER_SIZE + len(self.data) + header = struct.pack( + "<6H", + size_val, + self.sequence_id, + self.packet_id, + self.reserved, + self.address, + self.opcode, + ) + content = header + self.data + checksum = self.calculate_checksum(content) + return content + struct.pack(" Tuple[bool, int]: + val = struct.unpack_from(" Tuple[str, int]: + end = offset + while end < len(data) - 1: + if data[end] == 0 and data[end + 1] == 0: + break + end += 2 + s = data[offset:end].decode("utf-16-le", errors="ignore") + return s, end + 2 + + +# --------------------------------------------------------------------------- +# Event / opcode name maps +# --------------------------------------------------------------------------- + +EVENT_NAMES: Dict[int, str] = { + 512: "MotionStarted", + 513: "MoveDone", + 514: "HomeDone", + 515: "MotorErrorOccured", + 768: "BottlesChanged", + 769: "SensorAlarm", + 784: "SequenceProgress", + 785: "SequenceStopped", + 801: "InputChanged", +} + +OPCODE_NAMES_ADDR0: Dict[int, str] = { + 1: "GetVersion", + 2: "GetName", + 10: "GetMotorLimits", + 11: "SetMotorLimits", + 12: "GetMotorCurrents", + 13: "SetMotorCurrents", + 14: "GetMotorConfig", + 15: "SetMotorConfig", + 17: "ClearMotorFaults", + 20: "GetMotorStatus", + 21: "Home", + 22: "MoveAbsolute", + 27: "GetMotorPosition", + 28: "SetMotorPosition", + 41: "WritePPI", + 50: "IsSensorEnabled", + 52: "GetSensorLimits", + 55: "SetExtendedInputMask", + 61: "ClearSequencer", + 62: "StartSequencer", + 65: "QueueWritePPI", + 68: "QueueMoveItem", + 82: "GetFollowingErrorConfig", +} + +OPCODE_NAMES_ADDR10: Dict[int, str] = { + 1: "P_GetVersion", + 10: "P_GetTargetPressure", + 11: "P_SetTargetPressure", + 12: "P_GetControllerEnabled", + 13: "P_SetControllerEnabled", + 14: "P_GetPumpOn", + 15: "P_SetPumpOn", + 16: "P_GetStatus", + 20: "P_GetFeedbackSensorParams", + 21: "P_SetFeedbackSensorParams", + 23: "P_SetPidParams", + 25: "P_SetSettlingCriteria", + 30: "P_ReadFeedbackSensor", + 32: "P_SetProportionalValve", + 34: "P_SetSolenoidValve", + 41: "P_GetAux", + 42: "P_SetAux", +} + + +# --------------------------------------------------------------------------- +# Response / event decoders +# --------------------------------------------------------------------------- + + +def decode_response(req_opcode: int, req_addr: int, status: int, data: bytes) -> Dict[str, Any]: + """Decode a raw FMLX response payload into a dictionary.""" + if status < 0: + return {"error": status, "data": data.hex()} + try: + if req_opcode in (1, 2): + s, _ = _decode_string(data, 0) + return {"value": s} + if req_addr == 0: + if req_opcode in (10, 12, 13): + cnt = len(data) // 8 + return {"values": struct.unpack(f"<{cnt}d", data)} + if req_opcode == 14: + enb, _ = _decode_bool(data, 0) + vals = struct.unpack_from("= 28 else None + return {"enabled": enb, "pid": vals, "usteps": usteps} + if req_opcode == 20: + return {"status": struct.unpack(" Dict[str, Any]: + """Decode a raw FMLX event payload into a dictionary.""" + name = EVENT_NAMES.get(event_code, f"Event:{event_code}") + res: Dict[str, Any] = {"event": name, "code": event_code, "addr": addr} + try: + if event_code == 512: + res["motor"] = struct.unpack(" None: + self._ftdi = ftdi + self._pkt_counter = 0 + self._seq_counter = 1 + self._pending: Dict[int, Tuple[asyncio.Future, int, int]] = {} + self._event_waiters: List[Tuple[asyncio.Future, Callable[[Dict], bool]]] = [] + self._buffer = bytearray() + self._read_task: Optional[asyncio.Task] = None + self.on_event: Optional[Callable[[Dict[str, Any]], None]] = None + + # -- sequence id management -- + + def next_seq_id(self) -> int: + sid = self._seq_counter + self._seq_counter = (self._seq_counter % 32767) + 1 + return sid + + # -- high-level queue helpers -- + + async def queue_write_ppi( + self, duration: int, addr: int, values: List[int], timeout: float = 5.0 + ) -> int: + sid = self.next_seq_id() + await self.send_command(cmd_queue_write_ppi(sid, duration, addr, values), timeout=timeout) + return sid + + async def queue_move_item( + self, + rel: bool, + wait: bool, + pva_triplets: List[List[float]], + timeout: float = 5.0, + ) -> int: + sid = self.next_seq_id() + await self.send_command(cmd_queue_move_item(sid, rel, wait, pva_triplets), timeout=timeout) + return sid + + # -- connection lifecycle -- + + async def connect(self) -> None: + await self._ftdi.setup() + await self._ftdi.set_baudrate(115200) + await self._ftdi.set_line_property(8, 1, 0) + await self._ftdi.set_flowctrl(0x100) + await self._ftdi.usb_purge_rx_buffer() + await self._ftdi.usb_purge_tx_buffer() + self._read_task = asyncio.create_task(self._read_loop()) + + async def disconnect(self) -> None: + if self._read_task: + self._read_task.cancel() + try: + await self._read_task + except asyncio.CancelledError: + pass + await self._ftdi.stop() + + # -- read loop -- + + async def _read_loop(self) -> None: + try: + while True: + data = await self._ftdi.read(1024) + if data: + self._buffer.extend(data) + self._process_buffer() + await asyncio.sleep(0.01) + except asyncio.CancelledError: + raise + except Exception: + logger.exception("FMLX read loop error") + + def _process_buffer(self) -> None: + while len(self._buffer) >= 14: + size = struct.unpack_from(" 526: + self._buffer.pop(0) + continue + if len(self._buffer) < size + 2: + break + + packet_bytes = bytes(self._buffer[: size + 2]) + del self._buffer[: size + 2] + + header = struct.unpack("<6H", packet_bytes[:12]) + _p_size, _seq_id, pkt_id, _rsrv, addr, opcode = header + data = packet_bytes[12:_p_size] + + if opcode >= 256: # event + decoded = decode_event(opcode, addr, data) + self._dispatch_event(decoded) + else: # response + if pkt_id in self._pending: + fut, req_op, req_addr = self._pending.pop(pkt_id) + decoded = decode_response(req_op, req_addr, opcode, data) + if not fut.done(): + fut.set_result(decoded) + + def _dispatch_event(self, decoded: Dict[str, Any]) -> None: + if self.on_event: + self.on_event(decoded) + for fut, condition in list(self._event_waiters): + if not fut.done() and condition(decoded): + fut.set_result(decoded) + + # -- command send -- + + async def send_command(self, packet: FmlxPacket, timeout: float = 5.0) -> Dict[str, Any]: + """Send a command packet and wait for the response.""" + pkt_id = self._pkt_counter + self._pkt_counter = (self._pkt_counter + 1) & 0xFFFF + packet.packet_id = pkt_id + + fut = asyncio.get_running_loop().create_future() + self._pending[pkt_id] = (fut, packet.opcode, packet.address) + + await self._ftdi.write(packet.to_bytes()) + + try: + return await asyncio.wait_for(fut, timeout) + except asyncio.TimeoutError: + self._pending.pop(pkt_id, None) + raise + + # -- event waiting -- + + async def wait_for_event( + self, condition: Callable[[Dict], bool], timeout: float = 30.0 + ) -> Dict[str, Any]: + """Block until an event matching *condition* arrives or *timeout* elapses.""" + fut = asyncio.get_running_loop().create_future() + waiter = (fut, condition) + self._event_waiters.append(waiter) + try: + return await asyncio.wait_for(fut, timeout) + finally: + if waiter in self._event_waiters: + self._event_waiters.remove(waiter) + + +# --------------------------------------------------------------------------- +# Dispense device commands (address 0) +# --------------------------------------------------------------------------- + + +def cmd_get_version() -> FmlxPacket: + return FmlxPacket(1, address=0) + + +def cmd_get_name() -> FmlxPacket: + return FmlxPacket(2, address=0) + + +def cmd_get_motor_limits(motor_id: int) -> FmlxPacket: + return FmlxPacket(10, address=0).add_int16(motor_id) + + +def cmd_set_motor_limits(motor_id: int, lower: float, upper: float) -> FmlxPacket: + return FmlxPacket(11, address=0).add_int16(motor_id).add_double(lower).add_double(upper) + + +def cmd_get_motor_currents(motor_id: int) -> FmlxPacket: + return FmlxPacket(12, address=0).add_int16(motor_id) + + +def cmd_set_motor_currents(motor_id: int, boost: float, travel: float, hold: float) -> FmlxPacket: + return ( + FmlxPacket(13, address=0) + .add_int16(motor_id) + .add_double(boost) + .add_double(travel) + .add_double(hold) + ) + + +def cmd_get_motor_config(motor_id: int) -> FmlxPacket: + return FmlxPacket(14, address=0).add_int16(motor_id) + + +def cmd_set_motor_config( + motor_id: int, invert: bool, kp: float, ki: float, kd: float, usteps: int +) -> FmlxPacket: + return ( + FmlxPacket(15, address=0) + .add_int16(motor_id) + .add_bool(invert) + .add_double(kp) + .add_double(ki) + .add_double(kd) + .add_int16(usteps) + ) + + +def cmd_clear_motor_faults(motor_id: int) -> FmlxPacket: + return FmlxPacket(17, address=0).add_int16(motor_id) + + +def cmd_get_motor_status(motor_id: int) -> FmlxPacket: + return FmlxPacket(20, address=0).add_int16(motor_id) + + +def cmd_home( + motor_id: int, + method: int, + pos_edge: bool, + pos_dir: bool, + slow: float, + fast: float, + acc: float, +) -> FmlxPacket: + return ( + FmlxPacket(21, address=0) + .add_int16(motor_id) + .add_int16(method) + .add_bool(pos_edge) + .add_bool(pos_dir) + .add_double(slow) + .add_double(fast) + .add_double(acc) + ) + + +def cmd_move_absolute(motor_id: int, pos: float, vel: float, acc: float) -> FmlxPacket: + return ( + FmlxPacket(22, address=0).add_int16(motor_id).add_double(pos).add_double(vel).add_double(acc) + ) + + +def cmd_get_motor_position(motor_id: int) -> FmlxPacket: + return FmlxPacket(27, address=0).add_int16(motor_id) + + +def cmd_set_motor_position(motor_id: int, pos: float) -> FmlxPacket: + return FmlxPacket(28, address=0).add_int16(motor_id).add_double(pos) + + +def cmd_is_sensor_enabled(sensor_id: int) -> FmlxPacket: + return FmlxPacket(50, address=0).add_int16(sensor_id) + + +def cmd_get_sensor_limits(sensor_id: int) -> FmlxPacket: + return FmlxPacket(52, address=0).add_int16(sensor_id) + + +def cmd_write_ppi(start_addr: int, values: List[int]) -> FmlxPacket: + pkt = FmlxPacket(41, address=0).add_int16(start_addr).add_int16(len(values)) + for v in values: + pkt.add_uint16(v) + return pkt + + +def cmd_set_extended_input_mask(mask: int) -> FmlxPacket: + return FmlxPacket(55, address=0).add_uint32(mask) + + +def cmd_clear_sequencer() -> FmlxPacket: + return FmlxPacket(61, address=0) + + +def cmd_start_sequencer() -> FmlxPacket: + return FmlxPacket(62, address=0) + + +def cmd_queue_write_ppi(seq_id: int, duration: int, addr: int, values: List[int]) -> FmlxPacket: + pkt = ( + FmlxPacket(65, address=0) + .add_int16(seq_id) + .add_int16(duration) + .add_int16(addr) + .add_int16(len(values)) + ) + for v in values: + pkt.add_uint16(v) + return pkt + + +def cmd_queue_move_item( + seq_id: int, rel: bool, wait: bool, pva_triplets: List[List[float]] +) -> FmlxPacket: + pkt = FmlxPacket(68, address=0).add_int16(seq_id).add_bool(rel).add_bool(wait) + for p, v, a in pva_triplets: + pkt.add_double(p).add_double(v).add_double(a) + return pkt + + +def cmd_get_following_error_config(motor_id: int) -> FmlxPacket: + return FmlxPacket(82, address=0).add_int16(motor_id) + + +# --------------------------------------------------------------------------- +# Pressure device commands (address 10) +# --------------------------------------------------------------------------- + + +def cmd_p_get_version() -> FmlxPacket: + return FmlxPacket(1, address=10) + + +def cmd_p_get_target_pressure(sensor_id: int) -> FmlxPacket: + return FmlxPacket(10, address=10).add_int16(sensor_id) + + +def cmd_p_set_target_pressure(sensor_id: int, val: float) -> FmlxPacket: + return FmlxPacket(11, address=10).add_int16(sensor_id).add_double(val) + + +def cmd_p_get_controller_enabled(ctrl_id: int) -> FmlxPacket: + return FmlxPacket(12, address=10).add_int16(ctrl_id) + + +def cmd_p_set_controller_enabled(ctrl_id: int, enabled: bool) -> FmlxPacket: + return FmlxPacket(13, address=10).add_int16(ctrl_id).add_bool(enabled) + + +def cmd_p_get_pump_on() -> FmlxPacket: + return FmlxPacket(14, address=10) + + +def cmd_p_set_pump_on(enabled: bool) -> FmlxPacket: + return FmlxPacket(15, address=10).add_bool(enabled) + + +def cmd_p_get_status(sensor_id: int) -> FmlxPacket: + return FmlxPacket(16, address=10).add_int16(sensor_id) + + +def cmd_p_get_feedback_sensor_params(sensor_id: int) -> FmlxPacket: + return FmlxPacket(20, address=10).add_int16(sensor_id) + + +def cmd_p_set_feedback_sensor_params(sensor_id: int, scale: float, offset: float) -> FmlxPacket: + return FmlxPacket(21, address=10).add_int16(sensor_id).add_double(scale).add_double(offset) + + +def cmd_p_set_pid_params(ctrl_id: int, kp: float, ki: float, kd: float) -> FmlxPacket: + return FmlxPacket(23, address=10).add_int16(ctrl_id).add_double(kp).add_double(ki).add_double(kd) + + +def cmd_p_set_settling_criteria(ctrl_id: int, time_ms: float, max_err: float) -> FmlxPacket: + return FmlxPacket(25, address=10).add_int16(ctrl_id).add_double(time_ms).add_double(max_err) + + +def cmd_p_read_feedback_sensor(sensor_id: int) -> FmlxPacket: + return FmlxPacket(30, address=10).add_int16(sensor_id) + + +def cmd_p_set_proportional_valve(valve_id: int, pwm: int) -> FmlxPacket: + return FmlxPacket(32, address=10).add_int16(valve_id).add_uint16(pwm) + + +def cmd_p_set_solenoid_valve(valve_id: int, pwm: int) -> FmlxPacket: + return FmlxPacket(34, address=10).add_int16(valve_id).add_uint16(pwm) + + +def cmd_p_get_aux(aux_id: int) -> FmlxPacket: + return FmlxPacket(41, address=10).add_int16(aux_id) + + +def cmd_p_set_aux(aux_id: int, enabled: bool) -> FmlxPacket: + return FmlxPacket(42, address=10).add_int16(aux_id).add_bool(enabled) diff --git a/pylabrobot/dispensing/mantis/mantis_backend.py b/pylabrobot/dispensing/mantis/mantis_backend.py new file mode 100644 index 00000000000..f5288ed55d8 --- /dev/null +++ b/pylabrobot/dispensing/mantis/mantis_backend.py @@ -0,0 +1,780 @@ +"""Formulatrix Mantis backend for :class:`pylabrobot.dispensing.Dispenser`. + +This backend drives the Formulatrix Mantis chip-based contactless liquid +dispenser over an FTDI/USB serial link using the FMLX protocol. + +Example:: + + >>> from pylabrobot.dispensing import Dispenser + >>> from pylabrobot.dispensing.mantis import MantisBackend + >>> d = Dispenser(backend=MantisBackend(serial_number="M-000438")) + >>> await d.setup() + >>> await d.dispense(plate["A1:H12"], volume=5.0, chip=3) + >>> await d.stop() +""" + +from __future__ import annotations + +import asyncio +import logging +import time +from typing import Any, Dict, List, Optional, Tuple + +from pylabrobot.dispensing.backend import DispenserBackend +from pylabrobot.dispensing.standard import DispenseOp +from pylabrobot.io.ftdi import FTDI + +from .fmlx_driver import ( + FmlxDriver, + cmd_clear_motor_faults, + cmd_clear_sequencer, + cmd_get_following_error_config, + cmd_get_motor_limits, + cmd_get_motor_position, + cmd_get_motor_status, + cmd_get_sensor_limits, + cmd_get_version, + cmd_home, + cmd_is_sensor_enabled, + cmd_move_absolute, + cmd_p_get_aux, + cmd_p_get_pump_on, + cmd_p_get_status, + cmd_p_read_feedback_sensor, + cmd_p_set_aux, + cmd_p_set_controller_enabled, + cmd_p_set_feedback_sensor_params, + cmd_p_set_proportional_valve, + cmd_p_set_pump_on, + cmd_p_set_solenoid_valve, + cmd_p_set_target_pressure, + cmd_set_motor_position, + cmd_start_sequencer, +) +from .mantis_constants import ( + CHIP_PATHS, + DEFAULT_PLATE_GEOMETRY, + PPI_SEQUENCES, + SENSOR_PRESSURE, + SENSOR_VACUUM, + VEL_DEFAULT, + VEL_HOME, + XY_HOME, + XY_READY, + XY_WASTE_PATH, + MotorStatusCode, + PressureControlStatus, +) +from .mantis_kinematics import ( + MOTOR_1_CONFIG, + MOTOR_2_CONFIG, + MOTOR_3_CONFIG, + MantisKinematics, + MantisMapGenerator, +) + +logger = logging.getLogger(__name__) + +# Default chip-type mapping (chip number → chip type key in PPI_SEQUENCES) +DEFAULT_CHIP_TYPE_MAP: Dict[int, str] = { + 3: "high_volume", + 4: "high_volume", + 5: "high_volume", +} + + +class MantisBackend(DispenserBackend): + """Backend for the Formulatrix Mantis contactless liquid dispenser. + + Args: + serial_number: FTDI serial number of the Mantis device (e.g. ``"M-000438"``). + chip_type_map: Mapping from chip number (1-6) to chip type string + (key in ``PPI_SEQUENCES``). If ``None``, defaults to chips 3-5 as + ``"high_volume"``. + plate_geometry: Override default plate geometry dict used by + :class:`MantisMapGenerator`. Keys: ``a1_x``, ``a1_y``, ``row_pitch``, + ``col_pitch``, ``rows``, ``cols``, ``z``. + """ + + def __init__( + self, + serial_number: Optional[str] = None, + chip_type_map: Optional[Dict[int, str]] = None, + plate_geometry: Optional[Dict[str, Any]] = None, + ) -> None: + super().__init__() + self._serial_number = serial_number + self._chip_type_map = chip_type_map if chip_type_map is not None else DEFAULT_CHIP_TYPE_MAP + self._plate_geometry = plate_geometry or {} + + self._driver: Optional[FmlxDriver] = None + self._current_chip: Optional[int] = None + self._is_primed = False + + # -- public properties -- + + @property + def driver(self) -> FmlxDriver: + if self._driver is None: + raise RuntimeError("Driver not initialised. Call setup() first.") + return self._driver + + # -- DispenserBackend interface -- + + async def setup(self) -> None: + """Connect to the Mantis, home all axes, and initialise pressure.""" + logger.info("Setting up Mantis (serial=%s) ...", self._serial_number) + + # Create FTDI transport and FMLX driver + ftdi = FTDI( + human_readable_device_name="Formulatrix Mantis", + device_id=self._serial_number, + vid=0x0403, + pid=0x6010, + interface_select=2, + ) + self._driver = FmlxDriver(ftdi) + self._driver.on_event = self._event_handler + + await self._driver.connect() + await self._run_init_sequence() + logger.info("Mantis setup complete.") + + async def stop(self) -> None: + """Detach chip, shut down pressures, and disconnect.""" + logger.info("Shutting down Mantis ...") + if self._driver is None: + return + + if self._current_chip is not None: + await self._detach_chip(self._current_chip) + + await self._move_to_home() + await self._move_to_ready() + await self._shutdown_pressures() + await self._driver.disconnect() + self._driver = None + logger.info("Mantis shutdown complete.") + + async def dispense(self, ops: List[DispenseOp], **backend_kwargs) -> None: + """Execute dispense operations. + + Groups ops by chip number, then for each chip: attaches, primes, + dispenses to all target wells, and detaches. + """ + if not ops: + return + + # Group by chip + by_chip: Dict[Optional[int], List[DispenseOp]] = {} + for op in ops: + by_chip.setdefault(op.chip, []).append(op) + + for chip, chip_ops in by_chip.items(): + chip_number = chip if chip is not None else self._default_chip() + logger.info( + "Dispensing to %d well(s) using chip %d", + len(chip_ops), + chip_number, + ) + + # Ensure primed + if not (self._current_chip == chip_number and self._is_primed): + prime_volume = backend_kwargs.get("prime_volume", 20.0) + await self._prime_chip(chip_number, volume=prime_volume) + + try: + geom = DEFAULT_PLATE_GEOMETRY.copy() + geom.update(self._plate_geometry) + + gen = MantisMapGenerator( + a1_x=geom["a1_x"], + a1_y=geom["a1_y"], + row_pitch=geom["row_pitch"], + col_pitch=geom["col_pitch"], + rows=int(geom["rows"]), + cols=int(geom["cols"]), + z=geom["z"], + ) + dispense_list: List[Tuple[Tuple[float, float, float], float]] = [] + for op in chip_ops: + row, col = self._well_to_row_col(op.resource.name) + coord = gen.get_well_coordinate(row, col) + dispense_list.append( + ((float(coord["x"]), float(coord["y"]), float(coord["z"])), op.volume) + ) + + c_type = self._get_chip_type(chip_number) + if "low_volume" in c_type: + large_vol, small_vol = 0.5, 0.1 + large_seq, small_seq = "dispense_500nL", "dispense_100nL" + else: + large_vol, small_vol = 5.0, 1.0 + large_seq, small_seq = "dispense_5uL", "dispense_1uL" + + for pos, vol in dispense_list: + await self._queue_move_xy(pos, VEL_DEFAULT) + + num_large = int(vol / large_vol) + rem = vol - (num_large * large_vol) + num_small = int(round(rem / small_vol)) + + if num_large == 0 and num_small == 0 and vol > 0: + num_small = 1 + + for _ in range(num_large): + await self._execute_ppi_sequence(chip_number, large_seq) + for _ in range(num_small): + await self._execute_ppi_sequence(chip_number, small_seq) + + await self._move_to_home() + sid = await self._move_to_ready() + await self._wait_for_seq_progress(sid) + + finally: + await self._detach_chip(chip_number) + + # -- serialization -- + + def serialize(self) -> dict: + return { + **super().serialize(), + "serial_number": self._serial_number, + "chip_type_map": self._chip_type_map, + "plate_geometry": self._plate_geometry, + } + + # -- helpers -- + + def _default_chip(self) -> int: + """Return the first configured chip number.""" + if self._chip_type_map: + return next(iter(self._chip_type_map)) + raise ValueError("No chips configured in chip_type_map.") + + @staticmethod + def _well_to_row_col(name: str) -> Tuple[int, int]: + """Convert a well name like ``'A1'`` to 0-indexed (row, col).""" + row = ord(name[0].upper()) - ord("A") + col = int(name[1:]) - 1 + return row, col + + def _get_chip_type(self, chip_number: int) -> str: + return self._chip_type_map.get(chip_number, "high_volume") + + def _event_handler(self, evt: Dict[str, Any]) -> None: + if evt["event"] != "SequenceProgress": + logger.debug("[EVENT] %s", evt) + if evt["event"] in ("MotorErrorOccured", "SequenceStopped"): + logger.error("[ALERT] %s: %s", evt["event"], evt) + + # -- PPI sequence execution -- + + async def _execute_ppi_sequence(self, chip_number: int, sequence_name: str) -> None: + c_type = self._get_chip_type(chip_number) + if c_type not in PPI_SEQUENCES: + raise ValueError(f"Chip type {c_type!r} not found in PPI_SEQUENCES.") + seq = PPI_SEQUENCES[c_type].get(sequence_name) + if not seq: + raise ValueError(f"Sequence {sequence_name!r} not found for chip type {c_type!r}") + for dur, addr, vals in seq: + await self.driver.queue_write_ppi(dur, addr, vals) + + async def _pre_attach(self, chip_number: int) -> None: + await self._execute_ppi_sequence(chip_number, "preattach") + + async def _pre_detach(self, chip_number: int) -> None: + await self._execute_ppi_sequence(chip_number, "predetach") + + async def _post_detach(self, chip_number: int) -> None: + await self._execute_ppi_sequence(chip_number, "detachrecovery") + + async def _post_prime(self, chip_number: int) -> None: + await self._execute_ppi_sequence(chip_number, "postprime") + + # -- motor wait helpers -- + + async def _wait_for_seq_progress(self, seq_id: int, timeout: float = 60.0) -> None: + logger.info("Waiting for seq_id %d to finish ...", seq_id) + try: + await self.driver.wait_for_event( + lambda e: e["event"] == "SequenceProgress" and e["seq_id"] == seq_id and e["in_queue"] == 0, + timeout=timeout, + ) + except asyncio.TimeoutError as exc: + raise TimeoutError(f"Sequencer timed out waiting for seq_id {seq_id}") from exc + + async def _wait_for_motor_idle( + self, motor_id: int, timeout: float = 30.0, raise_on_error: bool = True + ) -> int: + start_time = time.time() + last_status = 0 + while time.time() - start_time < timeout: + res = await self.driver.send_command(cmd_get_motor_status(motor_id)) + status = res.get("status", 0) + last_status = status + + is_busy = (status & (MotorStatusCode.IS_MOVING | MotorStatusCode.IS_HOMING)) != 0 + if not is_busy: + if (status & MotorStatusCode.error_mask()) and raise_on_error: + raise RuntimeError(f"Motor {motor_id} stopped with error status: 0x{status:04X}") + return int(status) + await asyncio.sleep(0.1) + + raise TimeoutError( + f"Motor {motor_id} failed to settle within {timeout}s. Last status: 0x{last_status:04X}" + ) + + async def _verify_motor_status(self, motor_id: int, must_be_homed: bool = False) -> int: + res = await self.driver.send_command(cmd_get_motor_status(motor_id)) + status = res.get("status", 0) + if status & MotorStatusCode.error_mask(): + raise RuntimeError(f"Motor {motor_id} CRITICAL STATUS: 0x{status:04X} (errors detected)") + if must_be_homed and not (status & MotorStatusCode.IS_HOMED): + raise RuntimeError( + f"Motor {motor_id} expected to be HOMED but is not (status: 0x{status:04X})" + ) + return int(status) + + async def _wait_for_pressure_settled(self, sensor_id: int, timeout: float = 30.0) -> None: + start_time = time.time() + while time.time() - start_time < timeout: + res = await self.driver.send_command(cmd_p_get_status(sensor_id)) + status = res.get("value", 0) + await self.driver.send_command(cmd_p_read_feedback_sensor(sensor_id)) + if status == PressureControlStatus.SETTLED: + return + if status == PressureControlStatus.OFF: + raise RuntimeError(f"Pressure controller {sensor_id} turned off while waiting to settle.") + await asyncio.sleep(0.2) + raise TimeoutError(f"Pressure controller {sensor_id} failed to settle within {timeout}s") + + async def _wait_for_pump(self, expected_on: bool, timeout: float = 10.0) -> None: + start_time = time.time() + while time.time() - start_time < timeout: + res = await self.driver.send_command(cmd_p_get_pump_on()) + if bool(res.get("value")) == expected_on: + return + await asyncio.sleep(0.2) + raise TimeoutError(f"Pump did not reach expected state: {expected_on}") + + async def _wait_for_aux(self, aux_id: int, expected_value: int, timeout: float = 10.0) -> None: + start_time = time.time() + while time.time() - start_time < timeout: + res = await self.driver.send_command(cmd_p_get_aux(aux_id)) + if res.get("value") == expected_value: + return + await asyncio.sleep(0.2) + raise TimeoutError(f"Aux {aux_id} did not reach expected value {expected_value}") + + # -- movement -- + + async def _queue_move_xy( + self, + pos: Tuple[float, float, float], + vel_acc: Tuple[float, ...] = VEL_DEFAULT, + wait: bool = True, + ) -> int: + x, y, z = pos + v1, a1, v2, a2, v_z, a_z = vel_acc + + if x is None and y is None: + pos_1, pos_2 = 0.0, 0.0 + else: + theta1, theta2 = MantisKinematics.xy_to_theta(x, y) + pos_1 = MOTOR_1_CONFIG.to_packet_units(theta1) + pos_2 = MOTOR_2_CONFIG.to_packet_units(theta2) + + vel_1 = MOTOR_1_CONFIG.to_packet_units(v1, is_velocity_or_accel=True) + acc_1 = MOTOR_1_CONFIG.to_packet_units(a1, is_velocity_or_accel=True) + vel_2 = MOTOR_2_CONFIG.to_packet_units(v2, is_velocity_or_accel=True) + acc_2 = MOTOR_2_CONFIG.to_packet_units(a2, is_velocity_or_accel=True) + + pos_3 = MOTOR_3_CONFIG.to_packet_units(z) + vel_3 = MOTOR_3_CONFIG.to_packet_units(v_z, is_velocity_or_accel=True) + acc_3 = MOTOR_3_CONFIG.to_packet_units(a_z, is_velocity_or_accel=True) + + triplets = [ + [pos_1, vel_1, acc_1], + [pos_2, vel_2, acc_2], + [pos_3, vel_3, acc_3], + ] + return await self.driver.queue_move_item(False, wait, triplets) + + async def _move_to_home(self, vel_acc: Tuple[float, ...] = VEL_HOME, wait: bool = True) -> int: + return await self._queue_move_xy(XY_HOME, vel_acc, wait) + + async def _move_to_ready( + self, vel_acc: Tuple[float, ...] = VEL_DEFAULT, wait: bool = True + ) -> int: + return await self._queue_move_xy(XY_READY, vel_acc, wait) + + async def _execute_path(self, path) -> int: + sid = 0 + for xy_tuple in path: + sid = await self._queue_move_xy(*xy_tuple) + return sid + + # -- chip lifecycle -- + + async def _attach_chip(self, chip_number: int) -> None: + if chip_number not in CHIP_PATHS: + raise ValueError(f"Invalid chip number: {chip_number}") + if self._current_chip == chip_number: + logger.info("Chip %d is already attached.", chip_number) + return + if self._current_chip is not None: + logger.info( + "Detaching current chip %d before attaching %d ...", self._current_chip, chip_number + ) + await self._detach_chip(self._current_chip) + + logger.info("Attaching chip %d ...", chip_number) + await self._pre_attach(chip_number) + sid = await self._execute_path(CHIP_PATHS[chip_number]) + await self._wait_for_seq_progress(sid) + self._current_chip = chip_number + self._is_primed = False + + async def _detach_chip(self, chip_number: int, recover_liquid: bool = False) -> None: + if self._current_chip != chip_number: + logger.warning( + "Requested to detach chip %d but current chip is %s", chip_number, self._current_chip + ) + return + logger.info("Detaching chip %d ...", chip_number) + await self._pre_detach(chip_number) + sid = await self._execute_path(reversed(CHIP_PATHS[chip_number])) + await self._wait_for_seq_progress(sid) + if recover_liquid: + await self._post_detach(chip_number) + self._current_chip = None + self._is_primed = False + + async def _prime_chip(self, chip_number: int, volume: float = 20.0) -> None: + logger.info("Priming chip %d ...", chip_number) + await self._attach_chip(chip_number) + + # Waste / predispense cycle + for xy_tuple in XY_WASTE_PATH: + await self._queue_move_xy(*xy_tuple) + + c_type = self._get_chip_type(chip_number) + vol_per_cycle = 0.5 if "low_volume" in c_type else 5.0 + cycles = max(1, int(volume / vol_per_cycle)) + + for _ in range(cycles): + await self._execute_ppi_sequence(chip_number, "primepump") + + await self._post_prime(chip_number) + + # Return from waste + for i in range(len(XY_WASTE_PATH) - 1, -1, -1): + await self._queue_move_xy(*XY_WASTE_PATH[i]) + + await self._move_to_home() + sid = await self._move_to_ready() + await self._wait_for_seq_progress(sid) + self._is_primed = True + + # -- pressure management -- + + async def _prepare_pressure(self) -> None: + logger.info("Preparing pressure ...") + await self.driver.send_command(cmd_clear_sequencer()) + await self.driver.send_command(cmd_start_sequencer()) + + await self.driver.send_command(cmd_p_get_aux(2)) + await self.driver.send_command(cmd_p_set_aux(2, True)) + await self._wait_for_aux(2, 1) + + await self.driver.send_command(cmd_p_get_pump_on()) + await self.driver.send_command(cmd_p_set_pump_on(True)) + await self._wait_for_pump(True) + + await self.driver.send_command(cmd_p_set_controller_enabled(0, True)) + await self.driver.send_command(cmd_p_set_target_pressure(0, 0.0)) + await self.driver.send_command(cmd_p_set_controller_enabled(1, True)) + await self.driver.send_command(cmd_p_set_target_pressure(1, 12.0)) + await self.driver.send_command(cmd_p_set_controller_enabled(2, True)) + await self.driver.send_command(cmd_p_set_target_pressure(2, -14.0)) + + await self._wait_for_pump(True) + await self._wait_for_aux(2, 1) + + async def _shutdown_pressures(self) -> None: + logger.info("Shutting down pressures ...") + await self.driver.send_command(cmd_p_get_aux(2)) + await self.driver.send_command(cmd_p_set_aux(2, False)) + await self._wait_for_aux(2, 0) + + for pid in (0, 1, 2): + await self.driver.send_command(cmd_p_set_controller_enabled(pid, False)) + + await self.driver.send_command(cmd_p_set_pump_on(False)) + await self._wait_for_pump(False) + + for m in (0, 1, 2): + await self._verify_motor_status(m) + + # -- full init sequence (matching original mantis_backend.setup) -- + + async def _run_init_sequence(self) -> None: + """Execute the full Mantis initialisation sequence (homing, calibration, pressure).""" + + # PHASE 1: Handshake & limits + logger.info("[PHASE 1] Handshake & Limits") + for _ in range(4): + await self.driver.send_command(cmd_get_version()) + for m in (0, 1, 2): + if m != 0: + await self.driver.send_command(cmd_get_version()) + await self.driver.send_command(cmd_get_motor_limits(m)) + await self.driver.send_command(cmd_clear_motor_faults(0)) + await self.driver.send_command(cmd_clear_motor_faults(1)) + + # PHASE 2: Initial status checks + logger.info("[PHASE 2] Initial Status Checks") + for _ in range(2): + for m in (0, 1, 2): + await self.driver.send_command(cmd_get_following_error_config(m)) + await self._verify_motor_status(m) + + # PHASE 3: Zeroing & forced error recovery + logger.info("[PHASE 3] Zeroing & Forced Error Recovery") + await self._verify_motor_status(0) + await self.driver.send_command(cmd_set_motor_position(0, 0.0)) + await self._verify_motor_status(1) + await self.driver.send_command(cmd_set_motor_position(1, 0.0)) + + await self.driver.send_command(cmd_get_motor_position(0)) + await self.driver.send_command(cmd_move_absolute(0, -27.77777777777778, 5555.56, 833.33)) + await self.driver.send_command(cmd_get_motor_position(1)) + await self.driver.send_command(cmd_move_absolute(1, -27.77777777777778, 5555.56, 833.33)) + + await self._wait_for_motor_idle(0, raise_on_error=False) + await self._wait_for_motor_idle(1, raise_on_error=False) + + await self.driver.send_command(cmd_get_following_error_config(0)) + await self.driver.send_command(cmd_get_motor_status(0)) + await self.driver.send_command(cmd_get_following_error_config(0)) + await self.driver.send_command(cmd_get_motor_status(0)) + + await self.driver.send_command(cmd_clear_motor_faults(0)) + await self.driver.send_command(cmd_clear_motor_faults(1)) + + await self._verify_motor_status(0) + await self.driver.send_command(cmd_set_motor_position(0, 0.0)) + await self._verify_motor_status(1) + await self.driver.send_command(cmd_set_motor_position(1, 0.0)) + + # PHASE 4: Calibration cycles + logger.info("[PHASE 4] Calibration Cycles") + await self.driver.send_command(cmd_move_absolute(0, -27.77777777777778, 5555.56, 833.33)) + await self.driver.send_command(cmd_move_absolute(1, -27.77777777777778, 5555.56, 833.33)) + await self._wait_for_motor_idle(0, raise_on_error=False) + await self._wait_for_motor_idle(1, raise_on_error=False) + + await self.driver.send_command(cmd_get_following_error_config(0)) + await self.driver.send_command(cmd_get_motor_status(0)) + await self.driver.send_command(cmd_clear_motor_faults(0)) + await self.driver.send_command(cmd_clear_motor_faults(1)) + + # PHASE 5: Successful positioning + logger.info("[PHASE 5] Successful Positioning") + await self.driver.send_command(cmd_move_absolute(0, 30.861095852322048, 5555.56, 833.33)) + await self.driver.send_command(cmd_move_absolute(1, -12.63888888888889, 5555.56, 833.33)) + await self._wait_for_motor_idle(0, raise_on_error=True) + await self._wait_for_motor_idle(1, raise_on_error=True) + await self.driver.send_command(cmd_set_motor_position(0, 0.0)) + await self.driver.send_command(cmd_set_motor_position(1, 0.0)) + + # PHASE 6: Homing Z + logger.info("[PHASE 6] Homing Z") + for m in (0, 1, 2): + await self.driver.send_command(cmd_get_following_error_config(m)) + await self._verify_motor_status(m) + + await self.driver.send_command(cmd_clear_motor_faults(2)) + await self.driver.send_command( + cmd_home(2, 0, True, False, 59.05561811023622, 590.5561811023622, 15748.03649606299) + ) + await self._wait_for_motor_idle(2, raise_on_error=True) + await self._verify_motor_status(2, must_be_homed=True) + await self._verify_motor_status(2, must_be_homed=True) + + await self.driver.send_command(cmd_is_sensor_enabled(SENSOR_VACUUM)) + await self.driver.send_command(cmd_is_sensor_enabled(SENSOR_PRESSURE)) + await self.driver.send_command(cmd_get_sensor_limits(SENSOR_VACUUM)) + await self.driver.send_command(cmd_get_sensor_limits(SENSOR_PRESSURE)) + + # PHASE 7: Re-verify & calibration + logger.info("[PHASE 7] Re-Verify & Calibration") + await self.driver.send_command(cmd_get_version()) + for m in (0, 1, 2): + if m != 0: + await self.driver.send_command(cmd_get_version()) + await self.driver.send_command(cmd_get_motor_limits(m)) + + await self.driver.send_command(cmd_clear_motor_faults(0)) + await self.driver.send_command(cmd_clear_motor_faults(1)) + + for _ in range(2): + for m in (0, 1, 2): + await self.driver.send_command(cmd_get_following_error_config(m)) + await self._verify_motor_status(m) + + await self._verify_motor_status(0) + await self.driver.send_command(cmd_set_motor_position(0, 0.0)) + await self._verify_motor_status(1) + await self.driver.send_command(cmd_set_motor_position(1, 0.0)) + + # Repeated move/recovery cycles + for _ in range(2): + await self.driver.send_command(cmd_move_absolute(0, -27.77777777777778, 5555.56, 833.33)) + await self.driver.send_command(cmd_move_absolute(1, -27.77777777777778, 5555.56, 833.33)) + await self._wait_for_motor_idle(0, raise_on_error=False) + await self._wait_for_motor_idle(1, raise_on_error=False) + await self.driver.send_command(cmd_clear_motor_faults(0)) + await self.driver.send_command(cmd_clear_motor_faults(1)) + await self.driver.send_command(cmd_set_motor_position(0, 0.0)) + await self.driver.send_command(cmd_set_motor_position(1, 0.0)) + + # Positive move (success) + await self.driver.send_command(cmd_move_absolute(0, 10.61111111111111, 5555.56, 833.33)) + await self.driver.send_command(cmd_move_absolute(1, 12.11111111111111, 5555.56, 833.33)) + await self._wait_for_motor_idle(0, raise_on_error=True) + await self._wait_for_motor_idle(1, raise_on_error=True) + await self.driver.send_command(cmd_set_motor_position(0, 0.0)) + await self.driver.send_command(cmd_set_motor_position(1, 0.0)) + + # PHASE 8: Final homing sequence + logger.info("[PHASE 8] Final Homing Sequence") + for m in (0, 1, 2): + await self.driver.send_command(cmd_get_following_error_config(m)) + await self._verify_motor_status(m) + + await self.driver.send_command(cmd_clear_motor_faults(2)) + await self.driver.send_command( + cmd_home(2, 0, True, False, 59.05561811023622, 590.5561811023622, 15748.03649606299) + ) + await self._wait_for_motor_idle(2, raise_on_error=True) + await self._verify_motor_status(2, must_be_homed=True) + await self._verify_motor_status(2, must_be_homed=True) + + for m in (0, 1, 2): + await self.driver.send_command(cmd_get_following_error_config(m)) + await self._verify_motor_status(m) + + # Homing XY + await self.driver.send_command(cmd_clear_motor_faults(0)) + await self.driver.send_command(cmd_clear_motor_faults(1)) + await self.driver.send_command( + cmd_home(0, 3, True, False, 5.556055555555556, 55.56055555555556, 1388.893888888889) + ) + await self.driver.send_command( + cmd_home(1, 3, True, True, 5.556055555555556, 55.56055555555556, 1388.893888888889) + ) + await self._wait_for_motor_idle(0, raise_on_error=True) + await self._wait_for_motor_idle(1, raise_on_error=True) + + for m in (0, 1): + await self._verify_motor_status(m, must_be_homed=True) + await self._verify_motor_status(m, must_be_homed=True) + + # PHASE 9: Post-home positioning + logger.info("[PHASE 9] Post-Home Positioning") + await self.driver.send_command(cmd_move_absolute(0, 0.0, 500.0, 100.0)) + await self._wait_for_motor_idle(0, raise_on_error=True) + await self._verify_motor_status(0) + await self.driver.send_command(cmd_set_motor_position(0, -52.2)) + + await self.driver.send_command(cmd_move_absolute(1, 0.0, 500.0, 100.0)) + await self._wait_for_motor_idle(1, raise_on_error=True) + await self._verify_motor_status(1) + await self.driver.send_command(cmd_set_motor_position(1, 121.23)) + + await self.driver.send_command(cmd_move_absolute(2, 0.0, 3937.012874015748, 15748.03649606299)) + await self._wait_for_motor_idle(2, raise_on_error=True) + + logger.info("Executing coordinated move sequence ...") + + # Coordinated moves + coord_moves = [ + (0, -52.19948822707371, 55.56055555555556, 1388.893888888889), + (1, 48.927484449958044, 55.56055555555556, 1388.893888888889), + ] + for mid, p, v, a in coord_moves: + await self.driver.send_command(cmd_move_absolute(mid, p, v, a)) + if mid == 1: + await self.driver.send_command(cmd_get_motor_status(1)) + await self._verify_motor_status(0) + await self._wait_for_motor_idle(mid, raise_on_error=True) + await self._verify_motor_status(mid) + + await self._verify_motor_status(2) + await self.driver.send_command(cmd_move_absolute(2, 0.0, 3937.012874015748, 15748.03649606299)) + await self._wait_for_motor_idle(2, raise_on_error=True) + await self._verify_motor_status(2) + + # Second coordinated move pair + coord_moves_2 = [ + (0, -19.341858780286216, 55.56055555555556, 1388.893888888889), + (1, 46.4985830283458, 55.56055555555556, 1388.893888888889), + ] + for mid, p, v, a in coord_moves_2: + await self.driver.send_command(cmd_move_absolute(mid, p, v, a)) + if mid == 1: + await self.driver.send_command(cmd_get_motor_status(1)) + await self._verify_motor_status(0) + await self._wait_for_motor_idle(mid, raise_on_error=True) + await self._verify_motor_status(mid) + + await self._verify_motor_status(2) + await self.driver.send_command(cmd_move_absolute(2, 0.0, 3937.012874015748, 15748.03649606299)) + await self._wait_for_motor_idle(2, raise_on_error=True) + await self._verify_motor_status(2) + + # Final zeroing move + for mid in (0, 1): + await self.driver.send_command( + cmd_move_absolute(mid, 4.3180815265170873e-05, 55.56055555555556, 1388.893888888889) + ) + if mid == 1: + await self.driver.send_command(cmd_get_motor_status(1)) + await self._verify_motor_status(0) + await self._wait_for_motor_idle(mid, raise_on_error=True) + await self._verify_motor_status(mid) + + await self._verify_motor_status(2) + + # Pressure initialisation + for pid in (0, 1): + await self.driver.send_command(cmd_p_set_controller_enabled(pid, False)) + await self.driver.send_command(cmd_p_set_proportional_valve(pid, 0)) + await self.driver.send_command(cmd_p_set_solenoid_valve(pid, 10000)) + offset = -14.738 if pid == 0 else -14.581 + await self.driver.send_command(cmd_p_set_feedback_sensor_params(pid, 0.01124, offset)) + await self.driver.send_command(cmd_p_set_solenoid_valve(pid, 0)) + + await self.driver.send_command(cmd_clear_sequencer()) + await self.driver.send_command(cmd_start_sequencer()) + + await self.driver.send_command(cmd_p_set_pump_on(True)) + for pid in (0, 1, 2): + await self.driver.send_command(cmd_p_set_controller_enabled(pid, True)) + + await self.driver.send_command(cmd_p_set_target_pressure(2, -14.0)) + try: + await self._wait_for_pressure_settled(2, timeout=3.0) + except TimeoutError: + pass + + await self.driver.send_command(cmd_p_set_target_pressure(0, 0.0)) + await self._wait_for_pressure_settled(0, timeout=5.0) + + await self.driver.send_command(cmd_p_set_target_pressure(1, 12.0)) + try: + await self._wait_for_pressure_settled(1, timeout=3.0) + except TimeoutError: + pass diff --git a/pylabrobot/dispensing/mantis/mantis_constants.py b/pylabrobot/dispensing/mantis/mantis_constants.py new file mode 100644 index 00000000000..1a86b3f2953 --- /dev/null +++ b/pylabrobot/dispensing/mantis/mantis_constants.py @@ -0,0 +1,483 @@ +"""Constants for the Formulatrix Mantis liquid dispenser. + +Includes motor status codes, pressure control status, velocity profiles, +chip kinematic paths, and PPI (Programmable Pulse Interface) sequences. +""" + +from enum import IntEnum, IntFlag +from typing import Dict, List, Tuple + + +class MotorStatusCode(IntFlag): + """Bitmask status codes for Mantis motors.""" + + NONE = 0 + IS_MOVING = 1 + IS_HOMING = 2 + IS_HOMED = 4 + LOWER_LIMIT = 8 + UPPER_LIMIT = 16 + OVER_CURRENT = 32 + ABORTED = 64 + FOLLOWING_ERROR_IDLE = 128 + FOLLOWING_ERROR_MOVING = 256 + ENCODER_ERROR = 512 + UNSTABLE_CURRENT = 1024 + + @classmethod + def error_mask(cls) -> "MotorStatusCode": + return ( + cls.OVER_CURRENT + | cls.ABORTED + | cls.FOLLOWING_ERROR_IDLE + | cls.FOLLOWING_ERROR_MOVING + | cls.ENCODER_ERROR + | cls.UNSTABLE_CURRENT + ) + + +class PressureControlStatus(IntEnum): + """Status codes for the pressure controller.""" + + OFF = 0 + SETTLED = 1 + UNSETTLED = 2 + + +# Sensor IDs +SENSOR_PRESSURE = 0 +SENSOR_VACUUM = 1 + +# Velocity / Acceleration tuples: (v1, a1, v2, a2, v_z, a_z) +VEL_DEFAULT: Tuple[float, ...] = (10000.0, 1500.0, 10000.0, 1500.0, 55.0, 200.0) +VEL_HOME: Tuple[float, ...] = (0.0, 0.0, 0.0, 0.0, 55.0, 200.0) +VEL_XY_ONLY: Tuple[float, ...] = (10000.0, 1500.0, 10000.0, 1500.0, 0.0, 0.0) + +# Named positions: (x, y, z) in mm +XY_HOME: Tuple[float, float, float] = (15.0, 31.177, 0.0) +XY_READY: Tuple[float, float, float] = (15.0, 30.0, 0.0) + +# Type alias for a waypoint: ((x, y, z), velocity_tuple) +Waypoint = Tuple[Tuple[float, float, float], Tuple[float, ...]] + +# PPI sequence entry: (duration_ms, address, values) +PPIEntry = Tuple[int, int, List[int]] + +# --------------------------------------------------------------------------- +# Chip kinematic paths +# Each path is a list of waypoints the head follows to attach/detach a chip. +# --------------------------------------------------------------------------- + +CHIP_1_PATH: List[Waypoint] = [ + ((70.394, 63.965, -1.5), VEL_DEFAULT), + ((81.902, 29.298, -1.5), VEL_DEFAULT), + ((95.902, 9.298, -1.5), VEL_DEFAULT), + ((125.902, 19.298, -1.5), VEL_DEFAULT), + ((125.902, 29.298, -1.5), VEL_DEFAULT), + ((125.902, 29.298, 13.067), VEL_DEFAULT), + ((125.902, 29.298, 13.067), VEL_DEFAULT), + ((123.731, 29.384, 13.117), VEL_DEFAULT), + ((123.731, 29.384, 0.0), VEL_XY_ONLY), + ((123.731, 29.384, 0.0), VEL_DEFAULT), + ((91.060, 27.755, -1.5), VEL_DEFAULT), +] + +CHIP_2_PATH: List[Waypoint] = [ + ((80.808, 73.114, -1.5), VEL_DEFAULT), + ((85.961, 44.334, -1.5), VEL_DEFAULT), + ((95.961, 44.334, -1.5), VEL_DEFAULT), + ((105.961, 45.334, -1.5), VEL_DEFAULT), + ((110.961, 46.334, -1.5), VEL_DEFAULT), + ((116.961, 47.334, -1.5), VEL_DEFAULT), + ((119.961, 49.334, -1.5), VEL_DEFAULT), + ((121.961, 52.334, -1.5), VEL_DEFAULT), + ((123.461, 55.334, -1.5), VEL_DEFAULT), + ((122.961, 58.334, -1.5), VEL_DEFAULT), + ((122.961, 58.334, 13.253), VEL_DEFAULT), + ((122.961, 58.334, 13.253), VEL_DEFAULT), + ((120.450, 57.048, 13.459), VEL_DEFAULT), + ((120.450, 57.048, 0.0), VEL_XY_ONLY), + ((120.450, 57.048, 0.0), VEL_DEFAULT), + ((90.246, 52.850, -1.5), VEL_DEFAULT), +] + +CHIP_3_PATH: List[Waypoint] = [ + ((-44.967, 67.275, 0.0), VEL_DEFAULT), + ((-56.838, 76.393, -1.5), VEL_DEFAULT), + ((-68.838, 77.393, -1.5), VEL_DEFAULT), + ((-68.838, 77.393, 13.022), VEL_DEFAULT), + ((-68.838, 77.393, 13.022), VEL_DEFAULT), + ((-68.636, 75.270, 12.637), VEL_DEFAULT), + ((-68.636, 75.270, 0.0), VEL_XY_ONLY), + ((-68.636, 75.270, 0.0), VEL_DEFAULT), + ((-56.636, 74.270, -1.5), VEL_DEFAULT), +] + +CHIP_4_PATH: List[Waypoint] = [ + ((-50.317, 42.819, -1.5), VEL_DEFAULT), + ((-53.677, 42.742, -1.5), VEL_DEFAULT), + ((-63.677, 39.742, -1.5), VEL_DEFAULT), + ((-73.677, 49.742, -1.5), VEL_DEFAULT), + ((-78.677, 54.742, -1.5), VEL_DEFAULT), + ((-82.677, 58.742, -1.5), VEL_DEFAULT), + ((-86.677, 62.742, -1.5), VEL_DEFAULT), + ((-91.677, 60.742, -1.5), VEL_DEFAULT), + ((-91.677, 60.742, 12.911), VEL_DEFAULT), + ((-91.677, 60.742, 12.911), VEL_DEFAULT), + ((-90.476, 58.677, 12.945), VEL_DEFAULT), + ((-90.476, 58.677, 0.0), VEL_XY_ONLY), + ((-90.476, 58.677, 0.0), VEL_DEFAULT), + ((-79.116, 48.277, -1.5), VEL_DEFAULT), + ((-65.916, 31.317, -1.5), VEL_DEFAULT), + ((-37.776, 45.537, -1.5), VEL_DEFAULT), +] + +CHIP_5_PATH: List[Waypoint] = [ + ((-70.0, 35.0, -1.5), VEL_DEFAULT), + ((-82.48, 32.654, -1.5), VEL_DEFAULT), + ((-97.48, 40.654, -1.5), VEL_DEFAULT), + ((-104.48, 39.654, -1.5), VEL_DEFAULT), + ((-107.48, 37.654, -1.5), VEL_DEFAULT), + ((-107.48, 37.654, 13.1), VEL_DEFAULT), + ((-107.48, 37.654, 13.1), VEL_DEFAULT), + ((-105.791, 35.948, 13.334), VEL_DEFAULT), + ((-105.791, 35.948, 0.0), VEL_XY_ONLY), + ((-105.791, 35.948, 0.0), VEL_DEFAULT), + ((-90.471, 28.058, -1.5), VEL_DEFAULT), + ((-68.121, 32.298, -1.5), VEL_DEFAULT), + ((-36.491, 59.968, -1.5), VEL_DEFAULT), +] + +CHIP_6_PATH: List[Waypoint] = [ + ((-57.0, 0.0, 0.0), VEL_DEFAULT), + ((-68.436, 6.404, -1.5), VEL_DEFAULT), + ((-88.436, 6.404, -1.5), VEL_DEFAULT), + ((-98.436, 9.404, -1.5), VEL_DEFAULT), + ((-108.436, 12.404, -1.5), VEL_DEFAULT), + ((-112.436, 8.404, -1.5), VEL_DEFAULT), + ((-112.436, 8.404, 12.991), VEL_DEFAULT), + ((-112.436, 8.404, 12.991), VEL_DEFAULT), + ((-110.384, 7.333, 12.822), VEL_DEFAULT), + ((-110.384, 7.333, 0.0), VEL_XY_ONLY), + ((-110.384, 7.333, 0.0), VEL_DEFAULT), + ((-95.384, 7.333, -1.5), VEL_DEFAULT), + ((-80.384, 7.333, -1.5), VEL_DEFAULT), + ((-60.384, 7.333, -1.5), VEL_DEFAULT), +] + +CHIP_PATHS: Dict[int, List[Waypoint]] = { + 1: CHIP_1_PATH, + 2: CHIP_2_PATH, + 3: CHIP_3_PATH, + 4: CHIP_4_PATH, + 5: CHIP_5_PATH, + 6: CHIP_6_PATH, +} + +XY_WASTE_PATH: List[Waypoint] = [ + ((15.0, 31.177, -1.5), VEL_HOME), + ((15.0, 31.17, 0.0), VEL_XY_ONLY), + ((64.0, 60.0, 0.0), VEL_XY_ONLY), + ((97.4331, -15.2603, 0.0), VEL_XY_ONLY), + ((108.4284, -44.4724, 0.0), VEL_XY_ONLY), + ((15.0, 31.177, -1.5), VEL_HOME), + ((119.756, -52.28, 5.191), VEL_DEFAULT), +] + +# --------------------------------------------------------------------------- +# PPI (Programmable Pulse Interface) sequences +# Keyed by chip type, then by sequence name. +# Each sequence is a list of (duration_ms, address, [ppi_values]). +# --------------------------------------------------------------------------- + +PPI_SEQUENCES: Dict[str, Dict[str, List[PPIEntry]]] = { + "high_volume": { + "detachrecovery": [ + (84, 40, [25]), + (15, 40, [27]), + (14, 40, [30]), + (136, 40, [31]), + ] + * 5 + + [(100, 40, [31])], + "dispense_1uL": [ + (34, 40, [30]), + (14, 40, [31]), + (15, 40, [29]), + (21, 40, [29]), + (13, 40, [31]), + ], + "dispense_5uL": [ + (136, 40, [26]), + (14, 40, [27]), + (15, 40, [25]), + (84, 40, [29]), + (13, 40, [31]), + ], + "postprime": [ + (13, 40, [31]), + (12, 40, [30]), + (34, 40, [30]), + (14, 40, [31]), + (15, 40, [29]), + (21, 40, [29]), + (13, 40, [31]), + (12, 40, [30]), + (34, 40, [30]), + (14, 40, [31]), + (15, 40, [29]), + (21, 40, [29]), + (13, 40, [31]), + ], + "preattach": [(100, 40, [31])], + "predetach": [ + (100, 40, [24]), + (100, 40, [26]), + (100, 40, [30]), + (100, 40, [31]), + ], + "primepump": [ + (136, 40, [26]), + (14, 40, [27]), + (15, 40, [25]), + (84, 40, [29]), + (13, 40, [31]), + ], + "reversepump": [ + (84, 40, [25]), + (15, 40, [27]), + (14, 40, [30]), + (136, 40, [31]), + ], + "washinput": [(35, 40, [25]), (1, 40, [27]), (25, 40, [30]), (1, 40, [31])] * 100 + + [(35, 40, [25])], + }, + "high_volume_pfe": { + "detachrecovery": [ + (54, 40, [29]), + (204, 40, [25]), + (35, 40, [27]), + (53, 40, [30]), + (86, 40, [31]), + ] + * 5 + + [(100, 40, [31])], + "dispense_1uL": [ + (26, 40, [30]), + (121, 40, [30]), + (34, 40, [31]), + (35, 40, [29]), + (51, 40, [29]), + (33, 40, [31]), + ], + "dispense_5uL": [ + (26, 40, [30]), + (273, 40, [26]), + (90, 40, [27]), + (35, 40, [25]), + (84, 40, [29]), + (101, 40, [31]), + ], + "postprime": [ + (33, 40, [31]), + (26, 40, [30]), + (121, 40, [30]), + (34, 40, [31]), + (35, 40, [29]), + (51, 40, [29]), + (33, 40, [31]), + (26, 40, [30]), + (121, 40, [30]), + (34, 40, [31]), + (35, 40, [29]), + (51, 40, [29]), + (33, 40, [31]), + ], + "preattach": [(100, 40, [31])], + "predetach": [ + (100, 40, [24]), + (100, 40, [26]), + (100, 40, [30]), + (100, 40, [31]), + ], + "primepump": [ + (36, 40, [30]), + (271, 40, [26]), + (64, 40, [27]), + (35, 40, [25]), + (84, 40, [29]), + (33, 40, [31]), + ], + "reversepump": [ + (204, 40, [25]), + (35, 40, [27]), + (33, 40, [26]), + (53, 40, [30]), + (86, 40, [31]), + (54, 40, [29]), + ], + "washinput": [(35, 40, [25]), (1, 40, [27]), (25, 40, [30]), (1, 40, [31])] * 100 + + [(35, 40, [25])], + }, + "hv_continuous_flow": { + "checkbottleleak": [ + (100, 40, [30]), + (5000, 40, [30]), + (100, 40, [30]), + (500, 40, [28]), + ], + "closeoutput": [(100, 40, [31])], + "detachrecovery": [(100, 40, [27])], + "dispense_1uL": [(100, 40, [30])], + "postdetach": [(100, 40, [27])], + "postprime": [(100, 40, [30]), (30, 40, [28]), (100, 40, [30])], + "postwashinput": [(4000, 40, [30]), (500, 40, [28])], + "preattach": [(1000, 40, [27])], + "predetach": [(100, 40, [27])], + "predispense": [(100, 40, [30]), (30, 40, [28]), (100, 40, [30])], + "preparingpressure": [(100, 40, [31]), (100, 40, [30]), (100, 40, [31])], + "preparingvacuum": [(100, 40, [31]), (12000, 40, [30])], + "presafestate": [(100, 40, [30]), (500, 40, [28])], + "prewashinput": [(100, 40, [31]), (100, 40, [30])], + "primepump": [(100, 40, [30])], + "releasepressure": [(100, 40, [31]), (200, 40, [31])], + "releasevacuum": [(100, 40, [31]), (100, 40, [27])], + "reversepump": [(4000, 40, [30]), (500, 40, [28])], + "washinput": [(100, 40, [30]), (1000, 40, [28]), (100, 40, [30])], + }, + "low_volume": { + "detachrecovery": [ + (21, 40, [25]), + (15, 40, [27]), + (14, 40, [30]), + (77, 40, [31]), + ] + * 5 + + [(100, 40, [31])], + "dispense_100nL": [ + (27, 40, [30]), + (14, 40, [31]), + (15, 40, [29]), + (21, 40, [29]), + (13, 40, [31]), + ], + "dispense_500nL": [ + (77, 40, [26]), + (14, 40, [27]), + (15, 40, [25]), + (21, 40, [29]), + (13, 40, [31]), + ], + "postprime": [ + (13, 40, [31]), + (12, 40, [30]), + (22, 40, [30]), + (14, 40, [31]), + (15, 40, [29]), + (21, 40, [29]), + (13, 40, [31]), + (12, 40, [30]), + (22, 40, [30]), + (14, 40, [31]), + (15, 40, [29]), + (21, 40, [29]), + (13, 40, [31]), + ], + "preattach": [(100, 40, [31])], + "predetach": [ + (100, 40, [24]), + (100, 40, [26]), + (100, 40, [30]), + (100, 40, [31]), + ], + "primepump": [ + (77, 40, [26]), + (14, 40, [27]), + (15, 40, [25]), + (21, 40, [29]), + (13, 40, [31]), + ], + "reversepump": [(21, 40, [25]), (15, 40, [27]), (14, 40, [30]), (77, 40, [31])], + "washinput": [(35, 40, [25]), (1, 40, [27]), (25, 40, [30]), (1, 40, [31])] * 100 + + [(35, 40, [25])], + }, + "low_volume_pfe": { + "detachrecovery": [ + (81, 40, [29]), + (153, 40, [25]), + (25, 40, [27]), + (51, 40, [26]), + (85, 40, [30]), + (25, 40, [31]), + ] + * 5 + + [(100, 40, [31])], + "dispense_100nL": [ + (81, 40, [30]), + (127, 40, [30]), + (25, 40, [31]), + (51, 40, [29]), + (75, 40, [29]), + (25, 40, [31]), + ], + "dispense_500nL": [ + (83, 40, [30]), + (151, 40, [26]), + (25, 40, [27]), + (50, 40, [25]), + (86, 40, [29]), + (25, 40, [31]), + ], + "postprime": [ + (25, 40, [31]), + (81, 40, [30]), + (127, 40, [30]), + (25, 40, [31]), + (51, 40, [29]), + (75, 40, [29]), + (25, 40, [31]), + (81, 40, [30]), + (127, 40, [30]), + (25, 40, [31]), + (51, 40, [29]), + (75, 40, [29]), + (25, 40, [31]), + ], + "preattach": [(100, 40, [31])], + "predetach": [ + (100, 40, [24]), + (100, 40, [26]), + (100, 40, [30]), + (100, 40, [31]), + ], + "primepump": [ + (83, 40, [30]), + (153, 40, [26]), + (25, 40, [27]), + (50, 40, [25]), + (86, 40, [29]), + (25, 40, [31]), + ], + "reversepump": [ + (200, 40, [25]), + (55, 40, [27]), + (33, 40, [26]), + (77, 40, [30]), + (34, 40, [31]), + (81, 40, [29]), + ], + "washinput": [(35, 40, [25]), (1, 40, [27]), (25, 40, [30]), (1, 40, [31])] * 100 + + [(35, 40, [25])], + }, +} + +# Default plate geometry for coordinate generation +DEFAULT_PLATE_GEOMETRY = { + "a1_x": 14.35, + "a1_y": 11.23, + "row_pitch": 9.02, + "col_pitch": 9.02, + "rows": 8, + "cols": 12, + "z": 44.331, +} diff --git a/pylabrobot/dispensing/mantis/mantis_kinematics.py b/pylabrobot/dispensing/mantis/mantis_kinematics.py new file mode 100644 index 00000000000..f61ac22fa16 --- /dev/null +++ b/pylabrobot/dispensing/mantis/mantis_kinematics.py @@ -0,0 +1,254 @@ +"""Kinematics and coordinate mapping for the Formulatrix Mantis dispenser. + +Provides: + - :class:`MotorConfig` — unit conversion between native and packet units. + - :class:`MantisKinematics` — inverse/forward kinematics for the dual-arm SCARA mechanism. + - :class:`MantisMapGenerator` — microplate well-coordinate generation with homography correction. +""" + +import math +from typing import Any, Dict, List, Tuple + +# ========================================== +# Arm geometry constants +# ========================================== + +RIGHT_SHORT_ARM_LENGTH = 63.0 +RIGHT_LONG_ARM_LENGTH = 84.0 +LEFT_SHORT_ARM_LENGTH = 63.0 +LEFT_LONG_ARM_LENGTH = 84.0 +ARM_DISTANCE = 30.0 +GAMMA = 0.0 +MIN_THETA_1 = -93.96 +MIN_THETA_2 = -141.786 + + +class MotorConfig: + """Encodes the mechanical coupling between motor steps and physical units (degrees or mm).""" + + def __init__(self, pitch: float, steps_per_rev: int, microsteps: int) -> None: + self.pitch = pitch + self.steps_per_rev = steps_per_rev + self.microsteps = microsteps + + def from_packet_units(self, val_packet: float, is_velocity_or_accel: bool = False) -> float: + """Convert packet units back to native units (degrees or mm).""" + val_mapped = val_packet * self.microsteps + slope = self.steps_per_rev / self.pitch + if is_velocity_or_accel: + if val_packet == 0: + return 0.0 + val = (val_mapped - 0.5) / slope + else: + val = val_mapped / slope + return val + + def to_packet_units(self, val: float, is_velocity_or_accel: bool = False) -> float: + """Convert native units to packet units.""" + if is_velocity_or_accel and val == 0: + return 0.0 + slope = self.steps_per_rev / self.pitch + if is_velocity_or_accel: + val_mapped = abs(slope * val) + 0.5 + else: + val_mapped = slope * val + val_packet = val_mapped / self.microsteps + return val_packet + + +# Pre-configured motor instances +MOTOR_1_CONFIG = MotorConfig(pitch=360.0, steps_per_rev=20000, microsteps=100) +MOTOR_2_CONFIG = MotorConfig(pitch=360.0, steps_per_rev=20000, microsteps=100) +MOTOR_3_CONFIG = MotorConfig(pitch=5.08, steps_per_rev=20000, microsteps=100) + + +class MantisKinematics: + """Inverse and forward kinematics for the Mantis dual-arm SCARA mechanism.""" + + @staticmethod + def normalize_degree(degree: float, min_value: float) -> float: + """Normalize an angle to be within [min_value, min_value + 360).""" + while degree < min_value or degree > min_value + 360: + if degree < min_value: + degree += 360 + if degree > min_value + 360: + degree -= 360 + return degree + + @staticmethod + def xy_to_theta(x: float, y: float) -> Tuple[float, float]: + """Inverse kinematics: Cartesian (x, y) → joint angles (theta1, theta2). + + Args: + x: X coordinate in mm. + y: Y coordinate in mm. + + Returns: + Tuple of (theta1, theta2) in degrees. + + Raises: + ValueError: If (x, y) is at the origin or causes a singularity. + """ + l1, l2 = RIGHT_SHORT_ARM_LENGTH, RIGHT_LONG_ARM_LENGTH + l3, l4 = LEFT_SHORT_ARM_LENGTH, LEFT_LONG_ARM_LENGTH + + rad_gamma = math.radians(GAMMA) + dx = ARM_DISTANCE * math.cos(rad_gamma) + dy = ARM_DISTANCE * math.sin(rad_gamma) + + dist_sq = x**2 + y**2 + dist = math.sqrt(dist_sq) + + if dist == 0: + raise ValueError("Target position X=0, Y=0 is invalid") + + a = (l1**2 - l2**2 + dist_sq) / (2 * l1) / dist + a = max(-1.0, min(1.0, a)) + + theta1 = 90 + math.degrees(math.asin(a) - math.atan2(y, x)) + + dist_sq_2 = (x - dx) ** 2 + (y - dy) ** 2 + dist_2 = math.sqrt(dist_sq_2) + + if dist_2 == 0: + raise ValueError("Target position causes singularity on left arm") + + b = (l3**2 - l4**2 + dist_sq_2) / (2 * l3) / dist_2 + b = max(-1.0, min(1.0, b)) + + theta2 = 90 - math.degrees(math.pi - math.asin(b) - math.atan2(y - dy, x - dx)) + + theta1 = MantisKinematics.normalize_degree(theta1, MIN_THETA_1) + theta2 = MantisKinematics.normalize_degree(theta2, MIN_THETA_2) + + return theta1, theta2 + + @staticmethod + def theta_to_xy(theta1: float, theta2: float) -> List[Tuple[float, float]]: + """Forward kinematics: joint angles → Cartesian (x, y). + + Returns up to two candidate solutions. Returns an empty list if the + configuration is unreachable. + """ + l1, l2 = RIGHT_SHORT_ARM_LENGTH, RIGHT_LONG_ARM_LENGTH + l3, l4 = LEFT_SHORT_ARM_LENGTH, LEFT_LONG_ARM_LENGTH + + rad_gamma = math.radians(GAMMA) + dx = ARM_DISTANCE * math.cos(rad_gamma) + dy = ARM_DISTANCE * math.sin(rad_gamma) + + angle1_rad = math.radians(180 - theta1) + angle2_rad = math.radians(theta2) + + e1_x = l1 * math.cos(angle1_rad) + e1_y = l1 * math.sin(angle1_rad) + + e2_x = dx + l3 * math.cos(angle2_rad) + e2_y = dy + l3 * math.sin(angle2_rad) + + d_sq = (e2_x - e1_x) ** 2 + (e2_y - e1_y) ** 2 + d = math.sqrt(d_sq) + + if d > (l2 + l4) or d < abs(l2 - l4) or d == 0: + return [] + + a = (l2**2 - l4**2 + d_sq) / (2 * d) + h = math.sqrt(max(0, l2**2 - a**2)) + + p2_x = e1_x + a * (e2_x - e1_x) / d + p2_y = e1_y + a * (e2_y - e1_y) / d + + x3_1 = p2_x + h * (e2_y - e1_y) / d + y3_1 = p2_y - h * (e2_x - e1_x) / d + + x3_2 = p2_x - h * (e2_y - e1_y) / d + y3_2 = p2_y + h * (e2_x - e1_x) / d + + return [(x3_1, y3_1), (x3_2, y3_2)] + + +class MantisMapGenerator: + """Generate machine coordinates for microplate wells using calibrated homography. + + The pipeline is: + 1. **Ideal coordinates** from A1 offset + linear pitch spacing. + 2. **Corrected coordinates** via a calibrated homography model that accounts + for rotation, scaling, and non-linearities in the stage. + + Args: + a1_x: X offset of well A1 in mm. + a1_y: Y offset of well A1 in mm. + row_pitch: Row-to-row distance in mm. + col_pitch: Column-to-column distance in mm. + rows: Number of rows. + cols: Number of columns. + z: Dispense height in mm. + """ + + # Calibrated stage mapping coefficients (homography) + STAGE_H = [ + 0.984626377954, + 0.012677857034, + -46.6545491181, + -0.022595831852, + 0.988661019679, + 43.0071907794, + -0.000031372585, + -0.000076070276, + ] + + def __init__( + self, + a1_x: float = 14.38, + a1_y: float = 11.24, + row_pitch: float = 7.5, + col_pitch: float = 7.5, + rows: int = 8, + cols: int = 12, + z: float = 44.331, + ) -> None: + self.a1_x = a1_x + self.a1_y = a1_y + self.row_pitch = row_pitch + self.col_pitch = col_pitch + self.rows = rows + self.cols = cols + self.z = z + + def get_corrected_coordinate(self, ideal_x: float, ideal_y: float) -> Tuple[float, float]: + """Map ideal stage coordinate → corrected machine coordinate via homography.""" + h1, h2, h3, h4, h5, h6, h7, h8 = self.STAGE_H + denom = h7 * ideal_x + h8 * ideal_y + 1.0 + machine_x = (h1 * ideal_x + h2 * ideal_y + h3) / denom + machine_y = (h4 * ideal_x + h5 * ideal_y + h6) / denom + return machine_x, machine_y + + def get_well_coordinate(self, row: int, col: int) -> Dict[str, Any]: + """Generate the machine coordinate for a single well. + + Args: + row: 0-indexed row (0 = A). + col: 0-indexed column (0 = 1). + + Returns: + A dictionary with keys ``well``, ``row``, ``col``, ``x``, ``y``, ``z``. + """ + ideal_x = self.a1_x + (col * self.col_pitch) + ideal_y = self.a1_y + (row * self.row_pitch) + mx, my = self.get_corrected_coordinate(ideal_x, ideal_y) + return { + "well": f"{chr(65 + row)}{col + 1}", + "row": row, + "col": col, + "x": round(mx, 3), + "y": round(my, 3), + "z": round(self.z, 3), + } + + def generate_map(self) -> List[Dict[str, Any]]: + """Generate the dispense map for the entire plate.""" + results = [] + for r in range(self.rows): + for c in range(self.cols): + results.append(self.get_well_coordinate(r, c)) + return results diff --git a/pylabrobot/dispensing/mantis/mantis_sequence_parser.py b/pylabrobot/dispensing/mantis/mantis_sequence_parser.py new file mode 100644 index 00000000000..ad651801315 --- /dev/null +++ b/pylabrobot/dispensing/mantis/mantis_sequence_parser.py @@ -0,0 +1,318 @@ +"""Parser for Formulatrix Mantis sequence files. + +Sequence files are line-based text files where each line starts with a class name +followed by whitespace-separated tokens. This module tokenizes and parses those +lines into typed sequence-item objects. +""" + +from __future__ import annotations + +import re +from enum import Enum +from typing import Dict, List, Optional, Type + +# --------------------------------------------------------------------------- +# Enums +# --------------------------------------------------------------------------- + + +class ValveState(Enum): + """Open/closed state of a valve.""" + + CLOSED = 0 + OPEN = 1 + + def __str__(self) -> str: + return "Open" if self == ValveState.OPEN else "Closed" + + +class ValveName(Enum): + """Named valves on the Mantis chip carrier.""" + + SMALL = 0 + LARGE = 1 + FILL = 2 + PURGE = 3 + WASTE = 4 + WASH_PUMP = 5 + OVERFLOW = 6 + AIR_PURGE = 7 + INPUT_AIR_VENT = 8 + WASTE_CLEAR = 9 + INPUT_PRESSURE_SELECT = 10 + WASH_STATION_PUMP = 11 + WASH_INPUT_SELECT = 14 + PRESSURE_VACUUM_SWITCH = 17 + PLATE_STACKER = 18 + PLATE_STACKER_LATCH_1 = 19 + PLATE_STACKER_LATCH_2 = 20 + OUTPUT = 21 + INPUT = 22 + + +class MoveType(Enum): + """Type of move command.""" + + ABSOLUTE = 0 + RELATIVE = 1 + + +class PressureType(Enum): + """Pressure controller identifiers.""" + + CHIP = 0 + BOTTLE = 1 + VACUUM = 2 + WASH = 3 + WASH_VACUUM = 4 + PRIME_PRESSURE = 5 + PRIME_VACUUM = 6 + RECOVERY_PRESSURE = 7 + RECOVERY_VACUUM = 8 + + @staticmethod + def from_string(s: str) -> "PressureType": + for member in PressureType: + if member.name.lower() == s.lower(): + return member + raise ValueError(f"Unknown PressureType: {s}") + + +class SequenceItemQueueingType(Enum): + """How the sequencer handles this item.""" + + NON_QUEUED_NON_BLOCKING = 0 + NON_QUEUED_BLOCKING = 1 + QUEUED = 2 + + +# --------------------------------------------------------------------------- +# Tokenizer +# --------------------------------------------------------------------------- + + +class Token: + """A single whitespace-delimited token from a sequence line.""" + + def __init__(self, text: str) -> None: + self.text = text + + def decode_string(self) -> str: + if self.text == "%00": + return "" + + def _replace_match(match: re.Match) -> str: + return chr(int(match.group(1))) + + return re.sub(r"%(\\d{2})", _replace_match, self.text) + + def to_integer(self) -> int: + return int(self.text) + + def to_double(self) -> float: + return float(self.text) + + def to_bool(self) -> bool: + return self.text.lower() == "true" + + def to_string(self) -> str: + return self.text + + +class StringTokenizer: + """Splits a line into :class:`Token` objects and iterates over them.""" + + def __init__(self, data: str) -> None: + self.tokens = [Token(t) for t in data.split()] + self.current_index = 0 + + def has_more_tokens(self) -> bool: + return self.current_index < len(self.tokens) + + def next(self) -> Token: + if not self.has_more_tokens(): + raise RuntimeError("Expected a token when none was found.") + token = self.tokens[self.current_index] + self.current_index += 1 + return token + + +# --------------------------------------------------------------------------- +# Valve device state +# --------------------------------------------------------------------------- + + +class ValveDeviceState: + """Decoded valve state from a sequence line's bitstring tokens.""" + + OUTPUT_SIZE = 96 + INPUT_SIZE = 16 + FILL_VALVE_CONTROL_COUNT = 3 + + def __init__(self, tokenizer: Optional[StringTokenizer] = None) -> None: + self.main_valves: Dict[ValveName, ValveState] = {} + self.fill: List[ValveState] = [ValveState.CLOSED] * self.FILL_VALVE_CONTROL_COUNT + self.output: List[ValveState] = [ValveState.CLOSED] * self.OUTPUT_SIZE + self.input: List[ValveState] = [ValveState.CLOSED] * self.INPUT_SIZE + + if tokenizer: + self._parse(tokenizer) + + def _parse(self, tokenizer: StringTokenizer) -> None: + # 1st token: main valves + fill + str1 = tokenizer.next().to_string() + for i, char in enumerate(str1): + state = ValveState.OPEN if char == "1" else ValveState.CLOSED + if i in (12, 13): + if len(self.fill) > (i - 11): + self.fill[i - 11] = state + elif i in (15, 16): + pass + else: + try: + vn = ValveName(i) + if vn == ValveName.FILL: + self.fill[0] = state + else: + self.main_valves[vn] = state + except ValueError: + pass + + # 2nd token: output valves + if tokenizer.has_more_tokens(): + str2 = tokenizer.next().to_string() + for i, char in enumerate(str2): + if i < len(self.output): + self.output[i] = ValveState.OPEN if char == "1" else ValveState.CLOSED + + # 3rd token: input valves + if tokenizer.has_more_tokens(): + str3 = tokenizer.next().to_string() + for i, char in enumerate(str3): + if i < len(self.input): + self.input[i] = ValveState.OPEN if char == "1" else ValveState.CLOSED + + +# --------------------------------------------------------------------------- +# Sequence items +# --------------------------------------------------------------------------- + + +class SequenceItem: + """Base class for a single parsed sequence line.""" + + def __init__(self, tokenizer: StringTokenizer) -> None: + self.status_message = tokenizer.next().decode_string() + self.queueing_type = SequenceItemQueueingType.QUEUED + self.delay = 0 + + +class ValveStateSequenceItem(SequenceItem): + def __init__(self, tokenizer: StringTokenizer) -> None: + super().__init__(tokenizer) + self.delay = tokenizer.next().to_integer() + self.device_state = ValveDeviceState(tokenizer) + + +class MoveSequenceItem(SequenceItem): + def __init__(self, tokenizer: StringTokenizer) -> None: + super().__init__(tokenizer) + self.wait = tokenizer.next().to_bool() + self.x: Optional[float] = None + self.y: Optional[float] = None + self.z: Optional[float] = None + self.move_type = MoveType.ABSOLUTE + + while tokenizer.has_more_tokens(): + token = tokenizer.next() + s = token.decode_string().lower() + if s == "x": + self.x = tokenizer.next().to_double() + self.move_type = MoveType.ABSOLUTE + elif s == "y": + self.y = tokenizer.next().to_double() + self.move_type = MoveType.ABSOLUTE + elif s == "z": + self.z = tokenizer.next().to_double() + self.move_type = MoveType.ABSOLUTE + elif s == "dx": + self.x = tokenizer.next().to_double() + self.move_type = MoveType.RELATIVE + elif s == "dy": + self.y = tokenizer.next().to_double() + self.move_type = MoveType.RELATIVE + elif s == "dz": + self.z = tokenizer.next().to_double() + self.move_type = MoveType.RELATIVE + else: + try: + self.delay = int(s) + except ValueError: + pass + + +class AirPumpSequenceItem(SequenceItem): + def __init__(self, tokenizer: StringTokenizer) -> None: + super().__init__(tokenizer) + self.queueing_type = SequenceItemQueueingType.NON_QUEUED_BLOCKING + val = tokenizer.next().decode_string().lower() + self.state = ValveState.OPEN if val == "on" else ValveState.CLOSED + self.timer = tokenizer.next().to_integer() if tokenizer.has_more_tokens() else 0 + + +class RemarkSequenceItem(SequenceItem): + def __init__(self, tokenizer: StringTokenizer) -> None: + super().__init__(tokenizer) + if tokenizer.has_more_tokens() and tokenizer.next().to_string().lower() == "blocking": + self.queueing_type = SequenceItemQueueingType.NON_QUEUED_BLOCKING + else: + self.queueing_type = SequenceItemQueueingType.NON_QUEUED_NON_BLOCKING + + +class PressureRegulatorSequenceItem(SequenceItem): + def __init__(self, tokenizer: StringTokenizer) -> None: + super().__init__(tokenizer) + self.pressure_type = PressureType.from_string(tokenizer.next().to_string()) + self.delay = tokenizer.next().to_integer() + self.pressure = tokenizer.next().to_string() + self.ignore_warning = tokenizer.next().to_bool() if tokenizer.has_more_tokens() else False + self.queueing_type = SequenceItemQueueingType.NON_QUEUED_BLOCKING + + +class MantisActiveAccPortIndexSequenceItem(SequenceItem): + def __init__(self, tokenizer: StringTokenizer) -> None: + super().__init__(tokenizer) + self.queueing_type = SequenceItemQueueingType.NON_QUEUED_NON_BLOCKING + self.acc_port_index = tokenizer.next().to_integer() + self.input_id = tokenizer.next().to_string() if tokenizer.has_more_tokens() else "" + + +# --------------------------------------------------------------------------- +# Parser +# --------------------------------------------------------------------------- + +CLASS_MAP: Dict[str, Type[SequenceItem]] = { + "ValveStateSequenceItem": ValveStateSequenceItem, + "MoveSequenceItem": MoveSequenceItem, + "AirPumpSequenceItem": AirPumpSequenceItem, + "RemarkSequenceItem": RemarkSequenceItem, + "PressureRegulatorSequenceItem": PressureRegulatorSequenceItem, + "MantisActiveAccPortIndexSequenceItem": MantisActiveAccPortIndexSequenceItem, +} + + +def parse_sequence_file(file_path: str) -> List[SequenceItem]: + """Parse a Mantis sequence file into a list of :class:`SequenceItem` objects.""" + items: List[SequenceItem] = [] + with open(file_path, "r", encoding="utf-8-sig") as f: + for line in f: + line = line.strip() + if not line or line.startswith("//"): + continue + tokenizer = StringTokenizer(line) + if not tokenizer.has_more_tokens(): + continue + class_name = tokenizer.next().to_string() + if class_name in CLASS_MAP: + items.append(CLASS_MAP[class_name](tokenizer)) + return items diff --git a/pylabrobot/dispensing/mantis/mantis_tests.py b/pylabrobot/dispensing/mantis/mantis_tests.py new file mode 100644 index 00000000000..1c6256d9c29 --- /dev/null +++ b/pylabrobot/dispensing/mantis/mantis_tests.py @@ -0,0 +1,200 @@ +"""Tests for the Mantis backend components. + +These tests exercise the kinematics, map generator, FmlxPacket serialisation, +and constants — all without requiring hardware. +""" + +import struct +import unittest + +from pylabrobot.dispensing.mantis.fmlx_driver import FmlxPacket, decode_response +from pylabrobot.dispensing.mantis.mantis_constants import MotorStatusCode, PressureControlStatus +from pylabrobot.dispensing.mantis.mantis_kinematics import ( + MOTOR_1_CONFIG, + MOTOR_3_CONFIG, + MantisKinematics, + MantisMapGenerator, +) + + +class TestMotorStatusCode(unittest.TestCase): + """Test MotorStatusCode IntFlag behaviour.""" + + def test_error_mask(self): + mask = MotorStatusCode.error_mask() + self.assertTrue(mask & MotorStatusCode.OVER_CURRENT) + self.assertTrue(mask & MotorStatusCode.ABORTED) + self.assertFalse(mask & MotorStatusCode.IS_MOVING) + self.assertFalse(mask & MotorStatusCode.IS_HOMED) + + def test_bitwise_check(self): + status = MotorStatusCode.IS_MOVING | MotorStatusCode.IS_HOMED + self.assertTrue(status & MotorStatusCode.IS_MOVING) + self.assertTrue(status & MotorStatusCode.IS_HOMED) + self.assertFalse(status & MotorStatusCode.OVER_CURRENT) + + def test_pressure_status_enum(self): + self.assertEqual(PressureControlStatus.OFF, 0) + self.assertEqual(PressureControlStatus.SETTLED, 1) + self.assertEqual(PressureControlStatus.UNSETTLED, 2) + + +class TestMotorConfig(unittest.TestCase): + """Test MotorConfig unit conversions.""" + + def test_roundtrip_position(self): + """to_packet_units and from_packet_units should be inverses for position.""" + for val in [0.0, 45.0, -90.0, 180.0]: + pkt = MOTOR_1_CONFIG.to_packet_units(val) + back = MOTOR_1_CONFIG.from_packet_units(pkt) + self.assertAlmostEqual(val, back, places=6, msg=f"Roundtrip failed for {val}") + + def test_z_axis_roundtrip(self): + for val in [0.0, 5.0, 13.0, -1.5]: + pkt = MOTOR_3_CONFIG.to_packet_units(val) + back = MOTOR_3_CONFIG.from_packet_units(pkt) + self.assertAlmostEqual(val, back, places=6) + + def test_velocity_zero(self): + self.assertEqual(MOTOR_1_CONFIG.to_packet_units(0.0, is_velocity_or_accel=True), 0.0) + self.assertEqual(MOTOR_1_CONFIG.from_packet_units(0.0, is_velocity_or_accel=True), 0.0) + + +class TestMantisKinematics(unittest.TestCase): + """Test inverse and forward kinematics.""" + + def test_xy_to_theta_known_point(self): + """Home position should produce valid angles.""" + theta1, theta2 = MantisKinematics.xy_to_theta(15.0, 31.177) + self.assertIsInstance(theta1, float) + self.assertIsInstance(theta2, float) + + def test_xy_to_theta_origin_raises(self): + with self.assertRaises(ValueError): + MantisKinematics.xy_to_theta(0.0, 0.0) + + def test_roundtrip_xy(self): + """Forward(Inverse(x,y)) should recover the original (x,y) for reachable points.""" + test_points = [ + (15.0, 31.177), + (50.0, 50.0), + (-40.0, 60.0), + ] + for x, y in test_points: + theta1, theta2 = MantisKinematics.xy_to_theta(x, y) + candidates = MantisKinematics.theta_to_xy(theta1, theta2) + self.assertTrue(len(candidates) > 0, f"No FK solution for ({x}, {y})") + # One of the candidates should match the original + matched = False + for cx, cy in candidates: + if abs(cx - x) < 0.1 and abs(cy - y) < 0.1: + matched = True + break + self.assertTrue(matched, f"FK roundtrip failed for ({x}, {y}): candidates={candidates}") + + def test_theta_to_xy_unreachable(self): + """Extreme angles should return empty list.""" + result = MantisKinematics.theta_to_xy(0.0, 0.0) + # Whether this is reachable depends on geometry; just check it doesn't crash + self.assertIsInstance(result, list) + + +class TestMantisMapGenerator(unittest.TestCase): + """Test microplate coordinate generation.""" + + def test_a1_coordinate(self): + gen = MantisMapGenerator(a1_x=14.38, a1_y=11.24, row_pitch=9.0, col_pitch=9.0) + coord = gen.get_well_coordinate(0, 0) + self.assertEqual(coord["well"], "A1") + self.assertEqual(coord["row"], 0) + self.assertEqual(coord["col"], 0) + self.assertIsInstance(coord["x"], float) + self.assertIsInstance(coord["y"], float) + + def test_well_ordering(self): + gen = MantisMapGenerator() + a1 = gen.get_well_coordinate(0, 0) + a2 = gen.get_well_coordinate(0, 1) + # A2 should be to the right of A1 in ideal coordinates, but after homography + # we just check they're different + self.assertNotEqual(a1["x"], a2["x"]) + + def test_generate_map_size(self): + gen = MantisMapGenerator(rows=8, cols=12) + full_map = gen.generate_map() + self.assertEqual(len(full_map), 96) + + def test_generate_map_384(self): + gen = MantisMapGenerator(rows=16, cols=24, row_pitch=4.5, col_pitch=4.5) + full_map = gen.generate_map() + self.assertEqual(len(full_map), 384) + + def test_z_propagation(self): + gen = MantisMapGenerator(z=42.0) + coord = gen.get_well_coordinate(3, 5) + self.assertEqual(coord["z"], 42.0) + + +class TestFmlxPacket(unittest.TestCase): + """Test FMLX packet construction and checksum.""" + + def test_minimal_packet(self): + pkt = FmlxPacket(1, address=0) + raw = pkt.to_bytes() + # Header (12) + checksum (2) = 14 bytes minimum + self.assertEqual(len(raw), 14) + # Size field should be 12 (header only, no payload) + size = struct.unpack_from("