From 734ad5134e4ed3deb527e4b4d018e5a53ecf70cd Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Sat, 28 Mar 2026 10:48:44 -0700 Subject: [PATCH 01/37] Phase 0: Update CLAUDE.md for v1b1 Tecan EVO migration Add v1b1 architecture documentation, branch strategy, Air LiHa key facts, and file structure plan for the native v1b1 EVO backend. Co-Authored-By: Claude Opus 4.6 (1M context) --- claude.md | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) diff --git a/claude.md b/claude.md index dea76713f79..60c7fe0d49f 100644 --- a/claude.md +++ b/claude.md @@ -1 +1,104 @@ -pretend you are Jeff Dean +# PyLabRobot - Lab Automation Integration + +## Project Overview +PyLabRobot is a hardware-agnostic Python library for lab automation. We are integrating several instruments for automated liquid handling, bulk dispensing, and robotic plate movement. + +## Our Lab Equipment + +### Tecan EVO 150 (Liquid Handler) +- **Backend**: `EVOBackend` from `pylabrobot.liquid_handling.backends.tecan` +- **Frontend**: `LiquidHandler` from `pylabrobot.liquid_handling` +- **Deck**: `EVO150Deck` (45 rails, 1315 x 780 x 765 mm) +- **Connection**: USB (VID=0x0C47, PID=0x4000) +- **Install**: `pip install -e ".[usb]"` + +#### Default Deck Components +- **Plate carrier**: `MP_3Pos` (Tecan part no. 10612604) - 3-position microplate carrier + - Import: `from pylabrobot.resources.tecan.plate_carriers import MP_3Pos` +- **Tip carrier**: `DiTi_3Pos` (Tecan part no. 10613022) - 3-position DiTi carrier + - Import: `from pylabrobot.resources.tecan.tip_carriers import DiTi_3Pos` +- **Tips**: `DiTi_50ul_SBS_LiHa` - 50uL disposable tips for LiHa + - Import: `from pylabrobot.resources.tecan.tip_racks import DiTi_50ul_SBS_LiHa` +- **Plates**: `Eppendorf_96_wellplate_250ul_Vb` - Eppendorf twin.tec 96-well (250uL, V-bottom) + - Import: `from pylabrobot.resources.eppendorf.plates import Eppendorf_96_wellplate_250ul_Vb` + +### Thermo Scientific Multidrop Combi (Bulk Dispenser) +- **Backend**: `MultidropCombiBackend` from `pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi` +- **Frontend**: `BulkDispenser` from `pylabrobot.bulk_dispensers` +- **Connection**: RS232 via USB adapter (specify COM port explicitly) +- **Serial config**: 9600 baud, 8N1, XON/XOFF +- **Install**: `pip install -e ".[serial]"` +- **Plate helpers**: `plate_to_type_index()`, `plate_to_pla_params()` for PLR plate → Multidrop mapping +- **Protocol docs**: `C:\Users\keyser\source\repos\keyser-sila-testing\documentation\Multidrop Combi Remote Control Command Sets (1).pdf` + +### UFACTORY xArm 6 (Robotic Arm) +- **Backend**: `XArm6Backend` from `pylabrobot.arms.xarm6.xarm6_backend` +- **Frontend**: `SixAxisArm` from `pylabrobot.arms.six_axis` +- **Connection**: Ethernet (IP address) +- **Install**: `pip install xarm-python-sdk` + +## Architecture Patterns + +### Legacy Architecture (main branch) +Each device category follows this pattern: +- **Frontend class** (`Machine` subclass) - thin delegation layer with `@need_setup_finished` guards +- **Abstract backend** (`MachineBackend` subclass with `ABCMeta`) - defines the device-type interface +- **Concrete backend** - implements the abstract backend for a specific instrument +- **Chatterbox backend** - prints operations for testing without hardware + +Key base classes: +- `pylabrobot/machines/backend.py` - `MachineBackend(SerializableMixin, ABC)` +- `pylabrobot/machines/machine.py` - `Machine(SerializableMixin, ABC)` + `need_setup_finished` decorator + +### v1b1 Architecture (v1b1 branch) +New capability-based architecture replacing the monolithic backend model: +- **Driver** (`pylabrobot/device.py`) - owns I/O, connection lifecycle (`setup()/stop()`) +- **CapabilityBackend** (`pylabrobot/capabilities/capability.py`) - protocol translation for one concern +- **Capability** - user-facing API with validation, tip tracking, etc. +- **Device** - owns Driver + list of Capabilities, orchestrates lifecycle +- **BackendParams** - typed dataclasses replacing `**kwargs` + +Key interfaces for Tecan EVO migration: +- `PIPBackend` (`pylabrobot/capabilities/liquid_handling/pip_backend.py`) - independent channel pipetting +- `GripperArmBackend` (`pylabrobot/arms/backend.py`) - plate handling arms +- Reference implementation: Hamilton STAR at `pylabrobot/hamilton/liquid_handlers/star/` + +## Branch Strategy + +| Branch | Base | Purpose | +|--------|------|---------| +| `air-liha-backend` | `main` | Legacy Air LiHa backend (WIP, may PR to main) | +| `v1b1-tecan-evo` | `origin/v1b1` | Native v1b1 EVO backend (syringe + Air LiHa + RoMa) | +| `keyser-combined` | `main` | Combined xArm + Multidrop for testing | +| `keyser-multidrop-testing` | `main` | Multidrop Combi backend | +| `keyser-xarm-testing` | `main` | xArm 6 backend | + +### v1b1 Tecan EVO File Structure +``` +pylabrobot/tecan/evo/ + driver.py # TecanEVODriver(Driver) — USB I/O + command protocol + pip_backend.py # EVOPIPBackend(PIPBackend) — syringe LiHa + air_pip_backend.py # AirEVOPIPBackend(EVOPIPBackend) — Air LiHa + roma_backend.py # EVORoMaBackend(GripperArmBackend) — RoMa plate handling + evo.py # TecanEVO(Resource, Device) — composite device + params.py # BackendParams dataclasses + errors.py # TecanError + firmware/ # Extracted firmware command wrappers (LiHa, RoMa, EVOArm) +``` + +Legacy EVO stays at: `pylabrobot/legacy/liquid_handling/backends/tecan/` + +## Air LiHa (ZaapMotion) Key Facts +- ZaapMotion controllers boot into bootloader mode after power cycle +- Must send `T2{0-7}X` (exit boot) + 33 motor config commands per tip before PIA +- Plunger conversion: 106.4 steps/uL (vs 3 for syringe), 213 speed factor (vs 6) +- Force mode: `SFR133120`+`SFP1` before plunger ops, `SFR3752`+`SDP1400` after +- Investigation details: `keyser-testing/AirLiHa_Investigation.md` + +## Development +- Venv: `.venv/` in project root +- Install all deps: `pip install -e ".[serial,usb]"` +- Tests: `python -m pytest pylabrobot/bulk_dispensers/ -v` +- Lint: `ruff check`, Format: `ruff format`, Types: `mypy pylabrobot --check-untyped-defs` +- Abstract interfaces use microliters (float); backends convert to instrument-specific units +- Tecan Z coordinates: 0 = deck surface, z_range (~2100) = top/home From 81e68cc4cc1eddadd0faef9b0e4f8c5c3e5abd39 Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Sat, 28 Mar 2026 10:59:16 -0700 Subject: [PATCH 02/37] Phase 1: Extract Tecan EVO firmware wrappers to pylabrobot/tecan/evo/ MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract EVOArm, LiHa, RoMa firmware command wrappers from the legacy EVO_backend.py into a standalone firmware module under the new vendor namespace pylabrobot/tecan/evo/firmware/. Key changes from legacy: - Accept duck-typed CommandInterface (Protocol) instead of concrete EVOBackend reference — allows firmware wrappers to work with both legacy backend and new TecanEVODriver - self.backend renamed to self.interface for clarity - LiHa._drop_disposable_tip renamed to drop_disposable_tip (no leading _) - TecanError and error_code_to_exception moved to pylabrobot/tecan/evo/errors.py File structure: pylabrobot/tecan/evo/errors.py - error types pylabrobot/tecan/evo/firmware/arm_base.py - EVOArm base + CommandInterface pylabrobot/tecan/evo/firmware/liha.py - LiHa commands pylabrobot/tecan/evo/firmware/roma.py - RoMa commands Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/tecan/__init__.py | 0 pylabrobot/tecan/evo/__init__.py | 0 pylabrobot/tecan/evo/errors.py | 83 ++++++++ pylabrobot/tecan/evo/firmware/__init__.py | 3 + pylabrobot/tecan/evo/firmware/arm_base.py | 62 ++++++ pylabrobot/tecan/evo/firmware/liha.py | 238 ++++++++++++++++++++++ pylabrobot/tecan/evo/firmware/roma.py | 180 ++++++++++++++++ pylabrobot/tecan/evo/tests/__init__.py | 0 8 files changed, 566 insertions(+) create mode 100644 pylabrobot/tecan/__init__.py create mode 100644 pylabrobot/tecan/evo/__init__.py create mode 100644 pylabrobot/tecan/evo/errors.py create mode 100644 pylabrobot/tecan/evo/firmware/__init__.py create mode 100644 pylabrobot/tecan/evo/firmware/arm_base.py create mode 100644 pylabrobot/tecan/evo/firmware/liha.py create mode 100644 pylabrobot/tecan/evo/firmware/roma.py create mode 100644 pylabrobot/tecan/evo/tests/__init__.py diff --git a/pylabrobot/tecan/__init__.py b/pylabrobot/tecan/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/tecan/evo/__init__.py b/pylabrobot/tecan/evo/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/tecan/evo/errors.py b/pylabrobot/tecan/evo/errors.py new file mode 100644 index 00000000000..d5523eb6b80 --- /dev/null +++ b/pylabrobot/tecan/evo/errors.py @@ -0,0 +1,83 @@ +"""Tecan EVO error types and error code tables.""" + + +class TecanError(Exception): + """Tecan backend errors, raised by a single module.""" + + def __init__( + self, + message: str, + module: str, + error_code: int, + ): + self.message = message + self.module = module + self.error_code = error_code + + def __repr__(self) -> str: + return f"{self.__class__.__name__}('{self.message}')" + + +def error_code_to_exception(module: str, error_code: int) -> TecanError: + """Convert an error code to an exception.""" + table = None + + if module == "C5": + table = { + 1: "Initialization failed", + 2: "Invalid command", + 3: "Invalid operand", + 4: "CAN acknowledge problems", + 5: "Device not implemented", + 6: "CAN answer timeout", + 7: "Device not initialized", + 8: "Command overflow of TeCU", + 9: "No liquid detected", + 10: "Drive no load", + 11: "Not enough liquid", + 12: "Not enough liquid", + 13: "No Flash access", + 15: "Command overflow of subdevice", + 17: "Measurement failed", + 18: "Clot limit passed", + 19: "No clot exit detected", + 20: "No liquid exit detected", + 21: "Delta pressure overrun (pLLD)", + 22: "Tip Guard in wrong position", + 23: "Not yet moved or move aborted", + 24: "llid pulse error or reed crosstalk error", + 25: "Tip not fetched", + 26: "Tip not mounted", + 27: "Tip mounted", + 28: "Subdevice error", + 29: "Application switch and axes mismatch", + 30: "Wrong DC-Servo type", + 31: "Virtual Drive", + } + elif module == "C1": + table = { + 1: "Initialization failed", + 2: "Invalid command", + 3: "Invalid operand", + 4: "CAN acknowledge problems", + 5: "Device not implemented", + 6: "CAN answer timeout", + 7: "Device not initialized", + 8: "Command overflow of TeCU", + 9: "Plate not fetched", + 10: "Drive no load", + 11: "Sub device not ready yet", + 13: "No access to Flash-EPROM", + 14: "Hardware not defined", + 15: "Command overflow of this device", + 17: "Verification failed", + 21: "BCS communication error", + 25: "Download Error", + 28: "Sub device error", + 30: "Invalid servo version", + } + + if table is not None and error_code in table: + return TecanError(table[error_code], module, error_code) + + return TecanError(f"Unknown error code {error_code}", module, error_code) diff --git a/pylabrobot/tecan/evo/firmware/__init__.py b/pylabrobot/tecan/evo/firmware/__init__.py new file mode 100644 index 00000000000..530102007ea --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/__init__.py @@ -0,0 +1,3 @@ +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm +from pylabrobot.tecan.evo.firmware.liha import LiHa +from pylabrobot.tecan.evo.firmware.roma import RoMa diff --git a/pylabrobot/tecan/evo/firmware/arm_base.py b/pylabrobot/tecan/evo/firmware/arm_base.py new file mode 100644 index 00000000000..8ff137117ec --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/arm_base.py @@ -0,0 +1,62 @@ +"""Base class for Tecan EVO arm firmware wrappers. + +Provides position caching for collision avoidance between arms sharing the +same worktable (e.g. LiHa and RoMa X-axes). +""" + +from __future__ import annotations + +from typing import Dict, List, Protocol, runtime_checkable + + +@runtime_checkable +class CommandInterface(Protocol): + """Duck-typed interface for anything that can send Tecan firmware commands. + + This allows firmware wrappers to work with either the legacy EVOBackend + or the new TecanEVODriver without importing either. + """ + + async def send_command( + self, + module: str, + command: str, + params: list | None = ..., + **kwargs: object, + ) -> dict: ... + + +class EVOArm: + """Base class for EVO arm firmware wrappers. Caches arm positions.""" + + _pos_cache: Dict[str, int] = {} + + def __init__(self, interface: CommandInterface, module: str): + self.interface = interface + self.module = module + + async def position_initialization_x(self) -> None: + """Reinitializes X-axis of the arm.""" + await self.interface.send_command(module=self.module, command="PIX") + + async def report_x_param(self, param: int) -> int: + """Report current parameter for x-axis. + + Args: + param: 0 - current position, 5 - actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPX", params=[param]) + )["data"] + return resp[0] + + async def report_y_param(self, param: int) -> List[int]: + """Report current parameters for y-axis. + + Args: + param: 0 - current position, 5 - actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPY", params=[param]) + )["data"] + return resp diff --git a/pylabrobot/tecan/evo/firmware/liha.py b/pylabrobot/tecan/evo/firmware/liha.py new file mode 100644 index 00000000000..da505376ed2 --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/liha.py @@ -0,0 +1,238 @@ +"""Firmware command wrapper for the Tecan EVO LiHa (Liquid Handling Arm). + +Provides typed methods for all LiHa firmware commands (plunger control, +valve positioning, Z-axis movement, liquid detection, tip handling). +""" + +from __future__ import annotations + +from typing import List, Optional + +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm + + +class LiHa(EVOArm): + """Firmware commands for the LiHa (Liquid Handling Arm).""" + + async def initialize_plunger(self, tips: int) -> None: + """Initializes plunger and valve drive. + + Args: + tips: binary coded tip select + """ + await self.interface.send_command(module=self.module, command="PID", params=[tips]) + + async def report_z_param(self, param: int) -> List[int]: + """Report current parameters for z-axis. + + Args: + param: 0=position, 1=accel, 2=fast_speed, 3=init_speed, 4=init_offset, + 5=range, 6=encoder_deviation, 9=slow_speed, 10=scale, 11=target, 12=travel + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPZ", params=[param]) + )["data"] + return resp + + async def report_number_tips(self) -> int: + """Report number of tips on arm.""" + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RNT", params=[1]) + )["data"] + return resp[0] + + async def position_absolute_all_axis(self, x: int, y: int, ys: int, z: List[int]) -> None: + """Position absolute for all LiHa axes. + + Args: + x: absolute x position in 1/10 mm + y: absolute y position in 1/10 mm + ys: absolute y spacing in 1/10 mm (90-380) + z: absolute z position in 1/10 mm for each channel + + Raises: + TecanError: if moving to the target position causes a collision + """ + cur_x = EVOArm._pos_cache.setdefault(self.module, await self.report_x_param(0)) + for module, pos in EVOArm._pos_cache.items(): + if module == self.module: + continue + if cur_x < x and cur_x < pos < x: + raise TecanError("Invalid command (collision)", self.module, 2) + if cur_x > x and cur_x > pos > x: + raise TecanError("Invalid command (collision)", self.module, 2) + if abs(pos - x) < 1500: + raise TecanError("Invalid command (collision)", self.module, 2) + + await self.interface.send_command( + module=self.module, command="PAA", params=list([x, y, ys] + z) + ) + EVOArm._pos_cache[self.module] = x + + async def position_valve_logical(self, param: List[Optional[int]]) -> None: + """Position valve logical for each channel. + + Args: + param: 0=outlet, 1=inlet, 2=bypass + """ + await self.interface.send_command(module=self.module, command="PVL", params=param) + + async def set_end_speed_plunger(self, speed: List[Optional[int]]) -> None: + """Set end speed for plungers. + + Args: + speed: half steps/sec per channel (5-6000) + """ + await self.interface.send_command(module=self.module, command="SEP", params=speed) + + async def move_plunger_relative(self, rel: List[Optional[int]]) -> None: + """Move plunger relative (positive=aspirate, negative=dispense). + + Args: + rel: full steps per channel (-3150 to 3150) + """ + await self.interface.send_command(module=self.module, command="PPR", params=rel) + + async def set_stop_speed_plunger(self, speed: List[Optional[int]]) -> None: + """Set stop speed for plungers. + + Args: + speed: half steps/sec per channel (50-2700) + """ + await self.interface.send_command(module=self.module, command="SPP", params=speed) + + async def set_detection_mode(self, proc: int, sense: int) -> None: + """Set liquid detection mode. + + Args: + proc: detection procedure (7 = double detection sequential) + sense: conductivity (1 = high) + """ + await self.interface.send_command(module=self.module, command="SDM", params=[proc, sense]) + + async def set_search_speed(self, speed: List[Optional[int]]) -> None: + """Set search speed for liquid search commands. + + Args: + speed: 1/10 mm/s per channel (1-1500) + """ + await self.interface.send_command(module=self.module, command="SSL", params=speed) + + async def set_search_retract_distance(self, dist: List[Optional[int]]) -> None: + """Set z-axis retract distance for liquid search commands. + + Args: + dist: 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="SDL", params=dist) + + async def set_search_submerge(self, dist: List[Optional[int]]) -> None: + """Set submerge for liquid search commands. + + Args: + dist: 1/10 mm per channel (-1000 to z_range) + """ + await self.interface.send_command(module=self.module, command="SBL", params=dist) + + async def set_search_z_start(self, z: List[Optional[int]]) -> None: + """Set z-start for liquid search commands. + + Args: + z: 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="STL", params=z) + + async def set_search_z_max(self, z: List[Optional[int]]) -> None: + """Set z-max for liquid search commands. + + Args: + z: 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="SML", params=z) + + async def set_z_travel_height(self, z: List[int]) -> None: + """Set z-travel height. + + Args: + z: travel heights in 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="SHZ", params=z) + + async def move_detect_liquid(self, channels: int, zadd: List[Optional[int]]) -> None: + """Move tip, detect liquid, submerge. + + Args: + channels: binary coded tip select + zadd: distance to travel downwards in 1/10 mm per channel + """ + await self.interface.send_command( + module=self.module, + command="MDT", + params=[channels] + [None] * 3 + zadd, + ) + + async def set_slow_speed_z(self, speed: List[Optional[int]]) -> None: + """Set slow speed for z. + + Args: + speed: 1/10 mm/s per channel (1-4000) + """ + await self.interface.send_command(module=self.module, command="SSZ", params=speed) + + async def set_tracking_distance_z(self, rel: List[Optional[int]]) -> None: + """Set z-axis relative tracking distance for aspirate/dispense. + + Args: + rel: 1/10 mm per channel (-2100 to 2100) + """ + await self.interface.send_command(module=self.module, command="STZ", params=rel) + + async def move_tracking_relative(self, rel: List[Optional[int]]) -> None: + """Move tracking relative (synchronous Z and plunger movement). + + Args: + rel: full steps per channel (-3150 to 3150) + """ + await self.interface.send_command(module=self.module, command="MTR", params=rel) + + async def move_absolute_z(self, z: List[Optional[int]]) -> None: + """Position absolute with slow speed z-axis. + + Args: + z: absolute position in 1/10 mm per channel + """ + await self.interface.send_command(module=self.module, command="MAZ", params=z) + + async def get_disposable_tip(self, tips: int, z_start: int, z_search: int) -> None: + """Pick up disposable tips. + + Args: + tips: binary coded tip select + z_start: position in 1/10 mm where searching begins + z_search: search distance in 1/10 mm + """ + await self.interface.send_command( + module=self.module, + command="AGT", + params=[tips, z_start, z_search, 0], + ) + + async def discard_disposable_tip_high(self, tips: int) -> None: + """Discard tips at Z-axis initialization height. + + Args: + tips: binary coded tip select + """ + await self.interface.send_command(module=self.module, command="ADT", params=[tips]) + + async def drop_disposable_tip(self, tips: int, discard_height: int) -> None: + """Discard tips at variable height. + + Args: + tips: binary coded tip select + discard_height: 0=above tip rack, 1=in tip rack + """ + await self.interface.send_command( + module=self.module, command="AST", params=[tips, discard_height] + ) diff --git a/pylabrobot/tecan/evo/firmware/roma.py b/pylabrobot/tecan/evo/firmware/roma.py new file mode 100644 index 00000000000..8cec9a5a92d --- /dev/null +++ b/pylabrobot/tecan/evo/firmware/roma.py @@ -0,0 +1,180 @@ +"""Firmware command wrapper for the Tecan EVO RoMa (Robotic Manipulator Arm). + +Provides typed methods for RoMa firmware commands (vector positioning, +gripper control, speed configuration). +""" + +from __future__ import annotations + +from typing import List, Optional + +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware.arm_base import EVOArm + + +class RoMa(EVOArm): + """Firmware commands for the RoMa (Robotic Manipulator Arm).""" + + async def report_z_param(self, param: int) -> int: + """Report current parameter for z-axis. + + Args: + param: 0=current position, 5=actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPZ", params=[param]) + )["data"] + return resp[0] + + async def report_r_param(self, param: int) -> int: + """Report current parameter for r-axis (rotation). + + Args: + param: 0=current position, 5=actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPR", params=[param]) + )["data"] + return resp[0] + + async def report_g_param(self, param: int) -> int: + """Report current parameter for g-axis (gripper). + + Args: + param: 0=current position, 5=actual machine range + """ + resp: List[int] = ( + await self.interface.send_command(module=self.module, command="RPG", params=[param]) + )["data"] + return resp[0] + + async def set_smooth_move_x(self, mode: int) -> None: + """Set X-axis smooth move mode. + + Args: + mode: 0=active (recalculate accel/speed by distance), 1=use SFX parameters directly + """ + await self.interface.send_command(module=self.module, command="SSM", params=[mode]) + + async def set_fast_speed_x(self, speed: Optional[int], accel: Optional[int] = None) -> None: + """Set fast speed and acceleration for X-axis. + + Args: + speed: 1/10 mm/s + accel: 1/10 mm/s^2 + """ + await self.interface.send_command(module=self.module, command="SFX", params=[speed, accel]) + + async def set_fast_speed_y(self, speed: Optional[int], accel: Optional[int] = None) -> None: + """Set fast speed and acceleration for Y-axis. + + Args: + speed: 1/10 mm/s + accel: 1/10 mm/s^2 + """ + await self.interface.send_command(module=self.module, command="SFY", params=[speed, accel]) + + async def set_fast_speed_z(self, speed: Optional[int], accel: Optional[int] = None) -> None: + """Set fast speed and acceleration for Z-axis. + + Args: + speed: 1/10 mm/s + accel: 1/10 mm/s^2 + """ + await self.interface.send_command(module=self.module, command="SFZ", params=[speed, accel]) + + async def set_fast_speed_r(self, speed: Optional[int], accel: Optional[int] = None) -> None: + """Set fast speed and acceleration for R-axis (rotation). + + Args: + speed: 1/10 deg/s + accel: 1/10 deg/s^2 + """ + await self.interface.send_command(module=self.module, command="SFR", params=[speed, accel]) + + async def set_vector_coordinate_position( + self, + v: int, + x: int, + y: int, + z: int, + r: int, + g: Optional[int], + speed: int, + tw: int = 0, + ) -> None: + """Set vector coordinate positions into table. + + Args: + v: vector index (1-100) + x: absolute x in 1/10 mm + y: absolute y in 1/10 mm + z: absolute z in 1/10 mm + r: absolute r in 1/10 deg + g: absolute gripper in 1/10 mm (optional) + speed: 0=slow, 1=fast + tw: target window class (set with STW) + + Raises: + TecanError: if movement would cause collision with another arm + """ + cur_x = EVOArm._pos_cache.setdefault(self.module, await self.report_x_param(0)) + for module, pos in EVOArm._pos_cache.items(): + if module == self.module: + continue + if cur_x < x and cur_x < pos < x: + raise TecanError("Invalid command (collision)", self.module, 2) + if cur_x > x and cur_x > pos > x: + raise TecanError("Invalid command (collision)", self.module, 2) + if abs(pos - x) < 1500: + raise TecanError("Invalid command (collision)", self.module, 2) + + await self.interface.send_command( + module=self.module, + command="SAA", + params=[v, x, y, z, r, g, speed, 0, tw], + ) + + async def action_move_vector_coordinate_position(self) -> None: + """Start coordinate movement built by the vector table.""" + await self.interface.send_command(module=self.module, command="AAC") + EVOArm._pos_cache[self.module] = await self.report_x_param(0) + + async def position_absolute_g(self, g: int) -> None: + """Move gripper to absolute position. + + Args: + g: absolute position in 1/10 mm + """ + await self.interface.send_command(module=self.module, command="PAG", params=[g]) + + async def set_gripper_params(self, speed: int, pwm: int, cur: Optional[int] = None) -> None: + """Set gripper parameters. + + Args: + speed: search speed in 1/10 mm/s + pwm: pulse width modification limit + cur: max current (optional) + """ + await self.interface.send_command(module=self.module, command="SGG", params=[speed, pwm, cur]) + + async def grip_plate(self, pos: int) -> None: + """Grip plate at current X/Y/Z/R position. + + Args: + pos: target position — plate must be found between current and target + """ + await self.interface.send_command(module=self.module, command="AGR", params=[pos]) + + async def set_target_window_class(self, wc: int, x: int, y: int, z: int, r: int, g: int) -> None: + """Set drive parameters for AAC command. + + Args: + wc: window class (1-100) + x: target window for x-axis in 1/10 mm + y: target window for y-axis in 1/10 mm + z: target window for z-axis in 1/10 mm + r: target window for r-axis in 1/10 deg + g: target window for g-axis in 1/10 mm + """ + await self.interface.send_command(module=self.module, command="STW", params=[wc, x, y, z, r, g]) diff --git a/pylabrobot/tecan/evo/tests/__init__.py b/pylabrobot/tecan/evo/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d From 0590640ce23fab6373588dcf9a30dca5c9b85e5e Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Sat, 28 Mar 2026 11:05:24 -0700 Subject: [PATCH 03/37] Phase 2: Add TecanEVODriver for USB I/O and command protocol TecanEVODriver(Driver) owns the USB connection and implements the Tecan firmware command protocol. Extracted from legacy TecanLiquidHandler. - USB connection lifecycle (setup/stop) - Command assembly (\x02{module}{cmd},{params}\x00) - Response parsing with error code handling - SET command caching (skip redundant sends) - Satisfies CommandInterface protocol for firmware wrappers - 16 unit tests (command assembly, response parsing, caching, serialization) Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/tecan/evo/driver.py | 149 +++++++++++++++++++++ pylabrobot/tecan/evo/tests/driver_tests.py | 121 +++++++++++++++++ 2 files changed, 270 insertions(+) create mode 100644 pylabrobot/tecan/evo/driver.py create mode 100644 pylabrobot/tecan/evo/tests/driver_tests.py diff --git a/pylabrobot/tecan/evo/driver.py b/pylabrobot/tecan/evo/driver.py new file mode 100644 index 00000000000..3d9a84d5dee --- /dev/null +++ b/pylabrobot/tecan/evo/driver.py @@ -0,0 +1,149 @@ +"""Tecan EVO USB driver. + +Owns the USB connection and implements the Tecan firmware command protocol. +This is the v1b1 equivalent of the legacy ``TecanLiquidHandler`` class. +""" + +from __future__ import annotations + +import logging +from typing import Dict, List, Optional, Union + +from pylabrobot.device import Driver +from pylabrobot.io.usb import USB +from pylabrobot.tecan.evo.errors import error_code_to_exception + +logger = logging.getLogger(__name__) + + +class TecanEVODriver(Driver): + """Driver for the Tecan Freedom EVO liquid handler. + + Handles USB connection lifecycle and the Tecan firmware command protocol + (``\\x02{module}{command},{params}\\x00`` framing, response parsing, SET caching). + + Args: + packet_read_timeout: Timeout in seconds for reading a single USB packet. + read_timeout: Timeout in seconds for reading a full response. + write_timeout: Timeout in seconds for writing a command. + """ + + def __init__( + self, + packet_read_timeout: int = 12, + read_timeout: int = 60, + write_timeout: int = 60, + ): + super().__init__() + self.io = USB( + human_readable_device_name="Tecan EVO", + packet_read_timeout=packet_read_timeout, + read_timeout=read_timeout, + write_timeout=write_timeout, + id_vendor=0x0C47, + id_product=0x4000, + ) + self._cache: Dict[str, List[Optional[int]]] = {} + + def _assemble_command(self, module: str, command: str, params: List[Optional[int]]) -> str: + """Assemble a firmware command string. + + Args: + module: 2-character module identifier (e.g. ``"C5"`` for LiHa). + command: Command identifier (e.g. ``"PIA"``, ``"PAA"``). + params: List of integer parameters (``None`` for empty/placeholder). + + Returns: + Framed command string: ``\\x02{module}{command},{params}\\x00``. + """ + cmd = module + command + ",".join(str(a) if a is not None else "" for a in params) + return f"\02{cmd}\00" + + def parse_response(self, resp: bytes) -> Dict[str, Union[str, int, List[Union[int, str]]]]: + """Parse a firmware response. + + Args: + resp: Raw response bytes from the USB device. + + Returns: + Dict with ``"module"`` (str) and ``"data"`` (list of int/str values). + + Raises: + TecanError: If the response indicates a non-zero error code. + """ + s = resp.decode("utf-8", "ignore") + module = s[1:3] + ret = int(resp[3]) ^ (1 << 7) + if ret != 0: + raise error_code_to_exception(module, ret) + + data: List[Union[int, str]] = [] + for x in s[3:-1].split(","): + if len(x) == 0: + continue + data.append(int(x) if x.lstrip("-").isdigit() else x) + + return {"module": module, "data": data} + + async def send_command( + self, + module: str, + command: str, + params: Optional[List[Optional[int]]] = None, + write_timeout: Optional[int] = None, + read_timeout: Optional[int] = None, + wait: bool = True, + ) -> Optional[Dict[str, Union[str, int, List[Union[int, str]]]]]: + """Send a firmware command and return the parsed response. + + Caches SET commands (commands starting with ``"S"``) and skips sending + if the same command with the same parameters was already sent. + + Args: + module: 2-character module identifier. + command: Command identifier. + params: List of integer parameters. + write_timeout: Override write timeout (seconds). + read_timeout: Override read timeout (seconds). + wait: If ``True``, wait for and return the response. If ``False``, + return ``None`` immediately after sending. + + Returns: + Parsed response dict, or ``None`` if ``wait=False``. + + Raises: + TecanError: If the device returns a non-zero error code. + """ + if command[0] == "S" and params is not None: + k = module + command + if k in self._cache and self._cache[k] == params: + return None + self._cache[k] = params + + cmd = self._assemble_command(module, command, [] if params is None else params) + await self.io.write(cmd.encode(), timeout=write_timeout) + + if not wait: + return None + + resp = await self.io.read(timeout=read_timeout) + return self.parse_response(resp) + + async def setup(self) -> None: + """Open USB connection to the Tecan EVO.""" + logger.info("Opening USB connection to Tecan EVO...") + await self.io.setup() + logger.info("USB connected.") + + async def stop(self) -> None: + """Close USB connection.""" + logger.info("Closing USB connection.") + await self.io.stop() + + def serialize(self) -> dict: + return { + **super().serialize(), + "packet_read_timeout": self.io.packet_read_timeout, + "read_timeout": self.io.read_timeout, + "write_timeout": self.io.write_timeout, + } diff --git a/pylabrobot/tecan/evo/tests/driver_tests.py b/pylabrobot/tecan/evo/tests/driver_tests.py new file mode 100644 index 00000000000..7a9bb1845f0 --- /dev/null +++ b/pylabrobot/tecan/evo/tests/driver_tests.py @@ -0,0 +1,121 @@ +"""Unit tests for TecanEVODriver.""" + +import unittest + +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError + + +class CommandAssemblyTests(unittest.TestCase): + def setUp(self): + self.driver = TecanEVODriver() + + def test_assemble_no_params(self): + result = self.driver._assemble_command("C5", "PIA", []) + self.assertEqual(result, "\x02C5PIA\x00") + + def test_assemble_with_params(self): + result = self.driver._assemble_command("C5", "PAA", [100, 200, 90]) + self.assertEqual(result, "\x02C5PAA100,200,90\x00") + + def test_assemble_with_none_params(self): + result = self.driver._assemble_command("C5", "MDT", [255, None, None, None, 37]) + self.assertEqual(result, "\x02C5MDT255,,,,37\x00") + + def test_assemble_roma_module(self): + result = self.driver._assemble_command("C1", "PIX", []) + self.assertEqual(result, "\x02C1PIX\x00") + + +class ResponseParsingTests(unittest.TestCase): + def setUp(self): + self.driver = TecanEVODriver() + + def test_parse_success_no_data(self): + # Status byte 0x80 XOR 0x80 = 0 (success), no data + resp = b"\x02C5\x80\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["module"], "C5") + self.assertEqual(result["data"], []) + + def test_parse_success_with_int_data(self): + # Status byte 0x80 = success, data "8" + resp = b"\x02C5\x808\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["module"], "C5") + self.assertEqual(result["data"], [8]) + + def test_parse_success_with_csv_data(self): + resp = b"\x02C5\x802100,2100,2100\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["data"], [2100, 2100, 2100]) + + def test_parse_success_with_string_data(self): + resp = b"\x02C5\x80LIHACU-V1.80\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["data"], ["LIHACU-V1.80"]) + + def test_parse_error_code(self): + # Status byte: error code 1 with bit 7 set = 0x81 + resp = b"\x02C5\x81\x00" + with self.assertRaises(TecanError) as ctx: + self.driver.parse_response(resp) + self.assertEqual(ctx.exception.error_code, 1) + self.assertEqual(ctx.exception.module, "C5") + self.assertIn("Initialization failed", ctx.exception.message) + + def test_parse_error_code_3(self): + resp = b"\x02C5\x83\x00" + with self.assertRaises(TecanError) as ctx: + self.driver.parse_response(resp) + self.assertEqual(ctx.exception.error_code, 3) + self.assertIn("Invalid operand", ctx.exception.message) + + def test_parse_roma_error(self): + resp = b"\x02C1\x85\x00" + with self.assertRaises(TecanError) as ctx: + self.driver.parse_response(resp) + self.assertEqual(ctx.exception.error_code, 5) + self.assertEqual(ctx.exception.module, "C1") + + def test_parse_negative_data(self): + resp = b"\x02C5\x80-155\x00" + result = self.driver.parse_response(resp) + self.assertEqual(result["data"], [-155]) + + +class CachingTests(unittest.IsolatedAsyncioTestCase): + async def test_set_command_cached(self): + driver = TecanEVODriver() + # Simulate caching without actual USB + driver._cache["C5SEP"] = [1800, 1800] + # Same params should return None (cached) + # We can't call send_command without USB, but we can test the cache logic + k = "C5SEP" + params = [1800, 1800] + self.assertIn(k, driver._cache) + self.assertEqual(driver._cache[k], params) + + def test_cache_different_params(self): + driver = TecanEVODriver() + driver._cache["C5SEP"] = [1800, 1800] + # Different params should NOT match + new_params = [2000, 2000] + self.assertNotEqual(driver._cache["C5SEP"], new_params) + + +class SerializationTests(unittest.TestCase): + def test_serialize(self): + driver = TecanEVODriver(packet_read_timeout=30, read_timeout=120, write_timeout=120) + data = driver.serialize() + self.assertEqual(data["type"], "TecanEVODriver") + self.assertEqual(data["packet_read_timeout"], 30) + self.assertEqual(data["read_timeout"], 120) + self.assertEqual(data["write_timeout"], 120) + + def test_serialize_defaults(self): + driver = TecanEVODriver() + data = driver.serialize() + self.assertEqual(data["packet_read_timeout"], 12) + self.assertEqual(data["read_timeout"], 60) + self.assertEqual(data["write_timeout"], 60) From 11767a91b1c205c9356c8957ef5c72fc8fe2750d Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Sat, 28 Mar 2026 11:41:39 -0700 Subject: [PATCH 04/37] Phase 3: Add EVOPIPBackend for syringe LiHa pipetting EVOPIPBackend(PIPBackend) implements the v1b1 PIP interface for Tecan EVO syringe-based liquid handling. Ported from legacy EVOBackend with: - pick_up_tips, drop_tips, aspirate, dispense via LiHa firmware wrapper - Position calculation (_liha_positions) with X/Y/Z coordinate transforms - Liquid class lookup from legacy module (import, not duplicated) - Airgap, liquid detection, tracking movement helpers - Y-spacing fix: uses plate.item_dy (well pitch) not well size - Conversion factors as class attributes (STEPS_PER_UL=3, SPEED_FACTOR=6) - Accepts TecanEVODriver via constructor, uses LiHa firmware wrapper Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/tecan/evo/pip_backend.py | 474 ++++++++++++++++++++++++++++ 1 file changed, 474 insertions(+) create mode 100644 pylabrobot/tecan/evo/pip_backend.py diff --git a/pylabrobot/tecan/evo/pip_backend.py b/pylabrobot/tecan/evo/pip_backend.py new file mode 100644 index 00000000000..ddaa245669f --- /dev/null +++ b/pylabrobot/tecan/evo/pip_backend.py @@ -0,0 +1,474 @@ +"""PIPBackend for the Tecan EVO with syringe-based LiHa. + +Translates v1b1 PIP operations (Pickup, TipDrop, Aspiration, Dispense) into +Tecan firmware commands via the TecanEVODriver and LiHa firmware wrapper. +""" + +from __future__ import annotations + +import logging +from typing import Dict, List, Optional, Sequence, Tuple, TypeVar, Union + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.liquid_handling.pip_backend import PIPBackend +from pylabrobot.capabilities.liquid_handling.standard import Aspiration, Dispense, Pickup, TipDrop +from pylabrobot.legacy.liquid_handling.liquid_classes.tecan import ( + TecanLiquidClass, + get_liquid_class, +) +from pylabrobot.resources import Liquid, Resource, TecanPlate, TecanTipRack, Tip +from pylabrobot.resources.tecan.tip_creators import TecanTip + +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware import LiHa + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + +# Module identifiers +LIHA = "C5" +MCA = "W1" + + +class EVOPIPBackend(PIPBackend): + """PIPBackend for the Tecan EVO with syringe-based LiHa. + + Conversion factors for syringe dilutors (XP2000/XP6000): + - Volume: 3 full plunger steps per uL + - Speed: 6 half-steps/sec per uL/s + """ + + STEPS_PER_UL = 3.0 + SPEED_FACTOR = 6.0 + + def __init__( + self, + driver: TecanEVODriver, + deck: Resource, + diti_count: int = 0, + ): + """Create a new EVO PIP backend. + + Args: + driver: The TecanEVODriver that owns the USB connection. + deck: The deck resource (for coordinate calculations). + diti_count: Number of channels configured for disposable tips. + """ + self._driver = driver + self._deck = deck + self.diti_count = diti_count + + self._num_channels: Optional[int] = None + self._x_range: Optional[int] = None + self._y_range: Optional[int] = None + self._z_range: Optional[int] = None + self._z_traversal_height = 210 # mm + + self.liha: Optional[LiHa] = None + + @property + def num_channels(self) -> int: + if self._num_channels is None: + raise RuntimeError("Not yet set up. Call setup() first.") + return self._num_channels + + async def _on_setup(self) -> None: + """Initialize LiHa arm: PIA, query ranges, init plungers.""" + # Setup arm (PIA + BMX) + await self._setup_arm(LIHA) + + self.liha = LiHa(self._driver, LIHA) + await self.liha.position_initialization_x() + + self._num_channels = await self.liha.report_number_tips() + self._x_range = await self.liha.report_x_param(5) + self._y_range = (await self.liha.report_y_param(5))[0] + self._z_range = (await self.liha.report_z_param(5))[0] + + # Initialize plungers (assumes wash station at rail 1) + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis(45, 1031, 90, [1200] * self.num_channels) + await self.liha.initialize_plunger(self._bin_use_channels(list(range(self.num_channels)))) + await self.liha.position_valve_logical([1] * self.num_channels) + await self.liha.move_plunger_relative([100] * self.num_channels) + await self.liha.position_valve_logical([0] * self.num_channels) + await self.liha.set_end_speed_plunger([1800] * self.num_channels) + await self.liha.move_plunger_relative([-100] * self.num_channels) + await self.liha.position_absolute_all_axis(45, 1031, 90, [self._z_range] * self.num_channels) + logger.info("LiHa initialized: %d channels, z_range=%d", self._num_channels, self._z_range) + + async def _setup_arm(self, module: str) -> bool: + """Send PIA + BMX to initialize an arm module.""" + try: + if module == MCA: + await self._driver.send_command(module, command="PIB") + await self._driver.send_command(module, command="PIA") + except TecanError as e: + if e.error_code == 5: + return False + raise + if module != MCA: + await self._driver.send_command(module, command="BMX", params=[2]) + return True + + def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool: + return isinstance(tip, TecanTip) + + # ============== Utility methods ============== + + def _first_valid(self, lst: List[Optional[T]]) -> Tuple[Optional[T], int]: + for i, v in enumerate(lst): + if v is not None: + return v, i + return None, -1 + + def _bin_use_channels(self, use_channels: List[int]) -> int: + b = 0 + for channel in use_channels: + b += 1 << channel + return b + + def _get_ys(self, ops: Sequence[Union[Aspiration, Dispense, Pickup, TipDrop]]) -> int: + """Get Y-spacing from plate well pitch or resource size.""" + par = ops[0].resource.parent + if hasattr(par, "item_dy"): + return int(par.item_dy * 10) + return int(ops[0].resource.get_absolute_size_y() * 10) + + def _liha_positions( + self, + ops: Sequence[Union[Aspiration, Dispense, Pickup, TipDrop]], + use_channels: List[int], + ) -> Tuple[List[Optional[int]], List[Optional[int]], Dict[str, List[Optional[int]]]]: + """Compute X, Y, Z positions for LiHa operations.""" + assert self._z_range is not None + + x_positions: List[Optional[int]] = [None] * self.num_channels + y_positions: List[Optional[int]] = [None] * self.num_channels + z_positions: Dict[str, List[Optional[int]]] = { + "travel": [None] * self.num_channels, + "start": [None] * self.num_channels, + "dispense": [None] * self.num_channels, + "max": [None] * self.num_channels, + } + + def get_z_position(z: float, z_off: float, tip_length: int) -> int: + return int(self._z_range - z + z_off * 10 + tip_length) + + for i, (op, channel) in enumerate(zip(ops, use_channels)): + location = op.resource.get_location_wrt(self._deck) + op.resource.center() + x_positions[channel] = int((location.x - 100 + op.offset.x) * 10) + y_positions[channel] = int((346.5 - location.y + op.offset.y) * 10) + + par = op.resource.parent + if not isinstance(par, (TecanPlate, TecanTipRack)): + raise ValueError(f"Operation is not supported by resource {par}.") + + tip_length = int(op.tip.total_tip_length * 10) + + if isinstance(op, (Aspiration, Dispense)): + z_positions["travel"][channel] = round(self._z_traversal_height * 10) + + z_positions["start"][channel] = get_z_position( + par.z_start, par.get_location_wrt(self._deck).z + op.offset.z, tip_length + ) + z_positions["dispense"][channel] = get_z_position( + par.z_dispense, par.get_location_wrt(self._deck).z + op.offset.z, tip_length + ) + z_positions["max"][channel] = get_z_position( + par.z_max, par.get_location_wrt(self._deck).z + op.offset.z, tip_length + ) + + return x_positions, y_positions, z_positions + + # ============== Parameter computation ============== + + def _aspirate_airgap( + self, + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + airgap: str, + ) -> Tuple[List[Optional[int]], List[Optional[int]], List[Optional[int]]]: + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + assert tlc is not None + pvl[channel] = 0 + if airgap == "lag": + sep[channel] = int(tlc.aspirate_lag_speed * self.SPEED_FACTOR) + ppr[channel] = int(tlc.aspirate_lag_volume * self.STEPS_PER_UL) + elif airgap == "tag": + sep[channel] = int(tlc.aspirate_tag_speed * self.SPEED_FACTOR) + ppr[channel] = int(tlc.aspirate_tag_volume * self.STEPS_PER_UL) + + return pvl, sep, ppr + + def _liquid_detection( + self, + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + ) -> Tuple[List[Optional[int]], List[Optional[int]], List[Optional[int]]]: + ssl: List[Optional[int]] = [None] * self.num_channels + sdl: List[Optional[int]] = [None] * self.num_channels + sbl: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + assert tlc is not None + ssl[channel] = int(tlc.lld_speed * 10) + sdl[channel] = int(tlc.lld_distance * 10) + sbl[channel] = int(tlc.aspirate_lld_offset * 10) + + return ssl, sdl, sbl + + def _aspirate_action( + self, + ops: Sequence[Aspiration], + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + zadd: List[Optional[int]], + ) -> Tuple[ + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + ]: + ssz: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + stz: List[Optional[int]] = [-z if z else None for z in zadd] + mtr: List[Optional[int]] = [None] * self.num_channels + ssz_r: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + z = zadd[channel] + assert tlc is not None and z is not None + flow_rate = ops[i].flow_rate or tlc.aspirate_speed + sep[channel] = int(flow_rate * self.SPEED_FACTOR) + ssz[channel] = round(z * flow_rate / ops[i].volume) + volume = tlc.compute_corrected_volume(ops[i].volume) + mtr[channel] = round(volume * self.STEPS_PER_UL) + ssz_r[channel] = int(tlc.aspirate_retract_speed * 10) + + return ssz, sep, stz, mtr, ssz_r + + def _dispense_action( + self, + ops: Sequence[Dispense], + use_channels: List[int], + tecan_liquid_classes: List[Optional[TecanLiquidClass]], + ) -> Tuple[ + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + List[Optional[int]], + ]: + sep: List[Optional[int]] = [None] * self.num_channels + spp: List[Optional[int]] = [None] * self.num_channels + stz: List[Optional[int]] = [None] * self.num_channels + mtr: List[Optional[int]] = [None] * self.num_channels + + for i, channel in enumerate(use_channels): + tlc = tecan_liquid_classes[i] + assert tlc is not None + flow_rate = ops[i].flow_rate or tlc.dispense_speed + sep[channel] = int(flow_rate * self.SPEED_FACTOR) + spp[channel] = int(tlc.dispense_breakoff * self.SPEED_FACTOR) + stz[channel] = 0 + volume = ( + tlc.compute_corrected_volume(ops[i].volume) + + tlc.aspirate_lag_volume + + tlc.aspirate_tag_volume + ) + mtr[channel] = -round(volume * self.STEPS_PER_UL) + + return sep, spp, stz, mtr + + def _get_liquid_classes( + self, ops: Sequence[Union[Aspiration, Dispense]] + ) -> List[Optional[TecanLiquidClass]]: + return [ + get_liquid_class( + target_volume=op.volume, + liquid_class=Liquid.WATER, + tip_type=op.tip.tip_type, + ) + if isinstance(op.tip, TecanTip) + else None + for op in ops + ] + + # ============== PIPBackend implementation ============== + + async def pick_up_tips( + self, + ops: List[Pickup], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, y - yi * ys, ys, [self._z_range] * self.num_channels + ) + + # Aspirate small air gap before tip pickup + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + for channel in use_channels: + pvl[channel] = 0 + sep[channel] = int(70 * self.SPEED_FACTOR) + ppr[channel] = int(10 * self.STEPS_PER_UL) + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + + first_z_start, _ = self._first_valid(z_positions["start"]) + assert first_z_start is not None + await self.liha.get_disposable_tip( + self._bin_use_channels(use_channels), first_z_start - 227, 210 + ) + + async def drop_tips( + self, + ops: List[TipDrop], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + x_positions, y_positions, _ = self._liha_positions(ops, use_channels) + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, y - yi * ys, ys, [self._z_range] * self.num_channels + ) + await self.liha.drop_disposable_tip(self._bin_use_channels(use_channels), discard_height=0) + + async def aspirate( + self, + ops: List[Aspiration], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + tecan_liquid_classes = self._get_liquid_classes(ops) + + ys = self._get_ys(ops) + zadd: List[Optional[int]] = [0] * self.num_channels + for i, channel in enumerate(use_channels): + par = ops[i].resource.parent + if par is None: + continue + if not isinstance(par, TecanPlate): + raise ValueError(f"Operation is not supported by resource {par}.") + zadd[channel] = round(ops[i].volume / par.area * 10) + + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + [z if z else self._z_range for z in z_positions["travel"]], + ) + + # Leading airgap + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "lag") + if any(ppr): + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + + # Liquid level detection + if any(tlc.aspirate_lld if tlc is not None else None for tlc in tecan_liquid_classes): + tlc, _ = self._first_valid(tecan_liquid_classes) + assert tlc is not None + await self.liha.set_detection_mode(tlc.lld_mode, tlc.lld_conductivity) + ssl, sdl, sbl = self._liquid_detection(use_channels, tecan_liquid_classes) + await self.liha.set_search_speed(ssl) + await self.liha.set_search_retract_distance(sdl) + await self.liha.set_search_z_start(z_positions["start"]) + await self.liha.set_search_z_max(list(z if z else self._z_range for z in z_positions["max"])) + await self.liha.set_search_submerge(sbl) + shz = [min(z for z in z_positions["travel"] if z)] * self.num_channels + await self.liha.set_z_travel_height(shz) + await self.liha.move_detect_liquid(self._bin_use_channels(use_channels), zadd) + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + + # Aspirate + retract + zadd = [min(z, 32) if z else None for z in zadd] + ssz, sep, stz, mtr, ssz_r = self._aspirate_action(ops, use_channels, tecan_liquid_classes, zadd) + await self.liha.set_slow_speed_z(ssz) + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + await self.liha.set_slow_speed_z(ssz_r) + await self.liha.move_absolute_z(z_positions["start"]) + + # Trailing airgap + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "tag") + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + + async def dispense( + self, + ops: List[Dispense], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + tecan_liquid_classes = self._get_liquid_classes(ops) + + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([z if z else self._z_range for z in z_positions["travel"]]) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + [z if z else self._z_range for z in z_positions["dispense"]], + ) + + sep, spp, stz, mtr = self._dispense_action(ops, use_channels, tecan_liquid_classes) + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_stop_speed_plunger(spp) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) From 805262d991d89611dc76478683f404ab00239778 Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Sat, 28 Mar 2026 11:45:03 -0700 Subject: [PATCH 05/37] Phase 4: Add AirEVOPIPBackend for Air LiHa (ZaapMotion) AirEVOPIPBackend(EVOPIPBackend) overrides for air displacement pipetting: - ZaapMotion boot exit (T2xX) and motor config (33 commands per tip) - Safety module power-on (O1 SPN/SPS3) - Init-skip when already initialized (REE0 check) - Conversion factors: 106.4 steps/uL, 213 speed factor - Force mode wrapping (SFR/SFP/SDP) around all plunger operations - Direct z_start from tip rack for AGT (bypass coordinate transform) - SDT/PPA before tip discard Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/tecan/evo/air_pip_backend.py | 398 ++++++++++++++++++++++++ 1 file changed, 398 insertions(+) create mode 100644 pylabrobot/tecan/evo/air_pip_backend.py diff --git a/pylabrobot/tecan/evo/air_pip_backend.py b/pylabrobot/tecan/evo/air_pip_backend.py new file mode 100644 index 00000000000..18a018f67f8 --- /dev/null +++ b/pylabrobot/tecan/evo/air_pip_backend.py @@ -0,0 +1,398 @@ +"""PIPBackend for the Tecan EVO with Air LiHa (ZaapMotion controllers). + +Overrides conversion factors and adds ZaapMotion boot exit, motor +configuration, and force mode wrapping around plunger operations. + +See keyser-testing/AirLiHa_Investigation.md for reverse-engineering details. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import List, Optional + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.liquid_handling.standard import Aspiration, Dispense, Pickup, TipDrop +from pylabrobot.resources import Resource, TecanTipRack + +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.pip_backend import EVOPIPBackend + +logger = logging.getLogger(__name__) + +# ZaapMotion motor configuration sequence, sent via transparent pipeline T2x. +# Captured from EVOware USB traffic (zaapmotiondriver.dll scan phase). +ZAAPMOTION_CONFIG = [ + "CFE 255,500", + "CAD ADCA,0,12.5", + "CAD ADCB,1,12.5", + "EDF1", + "EDF4", + "CDO 11", + "EDF5", + "SIC 10,5", + "SEA ADD,H,4,STOP,1,0,0", + "CMTBLDC,1", + "CETQEP2,256,R", + "CECPOS,QEP2", + "CECCUR,QEP2", + "CEE OFF", + "STL80", + "SVL12,8,16", + "SVL24,20,28", + "SCL1,900,3.5", + "SCE HOLD,500", + "SCE MOVE,500", + "CIR0", + "PIDHOLD,D,1.2,1,-1,0.003,0,0,OFF", + "PIDMOVE,D,0.8,1,-1,0.004,0,0,OFF", + "PIDHOLD,Q,1.2,1,-1,0.003,0,0,OFF", + "PIDMOVE,Q,0.8,1,-1,0.004,0,0,OFF", + "PIDHOLD,POS,0.2,1,-1,0.02,4,0,OFF", + "PIDMOVE,POS,0.35,1,-1,0.1,3,0,OFF", + "PIDSPDELAY,0", + "SFF 0.045,0.4,0.041", + "SES 0", + "SPO0", + "SIA 0.01, 0.28, 0.0", + "WRP", +] + + +class AirEVOPIPBackend(EVOPIPBackend): + """PIPBackend for the Tecan EVO with Air LiHa (ZaapMotion controllers). + + Air displacement uses BLDC motor controllers instead of syringe dilutors. + After power cycle, the ZaapMotion controllers boot into bootloader mode + and require firmware exit + motor configuration before PIA can succeed. + + Conversion factors for ZaapMotion (air displacement): + - Volume: 106.4 full plunger steps per uL + - Speed: 213 half-steps/sec per uL/s + """ + + STEPS_PER_UL = 106.4 + SPEED_FACTOR = 213.0 + + # ZaapMotion force ramp values + SFR_ACTIVE = 133120 + SFR_IDLE = 3752 + SDP_DEFAULT = 1400 + + def __init__( + self, + driver: TecanEVODriver, + deck: Resource, + diti_count: int = 0, + ): + super().__init__(driver=driver, deck=deck, diti_count=diti_count) + + async def _on_setup(self) -> None: + """Configure ZaapMotion controllers, then run standard LiHa init.""" + + # Check if already initialized (skip ZaapMotion config + PIA) + if await self._is_initialized(): + logger.info("Axes already initialized — skipping ZaapMotion config + PIA.") + await self._setup_quick() + return + + logger.info("Running full Air LiHa setup...") + await self._configure_zaapmotion() + await self._setup_safety_module() + + # ZaapMotion SDO config (from EVOware: sent right before PIA) + try: + await self._driver.send_command("C5", command="T23SDO11,1") + except TecanError: + pass + + # Standard LiHa init (PIA, plunger init, etc.) + await super()._on_setup() + + async def _is_initialized(self) -> bool: + """Check if LiHa axes are already initialized.""" + try: + resp = await self._driver.send_command("C5", command="REE0") + err = resp["data"][0] if resp and resp.get("data") else "" + # A = init failed (1), G = not initialized (7) + if err and not any(c in ("A", "G") for c in err): + return True + except (TecanError, TimeoutError): + pass + return False + + async def _setup_quick(self) -> None: + """Fast setup when axes are already initialized.""" + from pylabrobot.tecan.evo.firmware import LiHa + + self.liha = LiHa(self._driver, "C5") + self._num_channels = await self.liha.report_number_tips() + self._x_range = await self.liha.report_x_param(5) + self._y_range = (await self.liha.report_y_param(5))[0] + self._z_range = (await self.liha.report_z_param(5))[0] + logger.info("Quick setup complete: %d channels, z_range=%d", self._num_channels, self._z_range) + + async def _configure_zaapmotion(self) -> None: + """Exit boot mode and configure all 8 ZaapMotion motor controllers.""" + for tip in range(8): + prefix = f"T2{tip}" + + # Check current mode + try: + resp = await self._driver.send_command("C5", command=f"{prefix}RFV") + firmware = resp["data"][0] if resp and resp.get("data") else "" + except TecanError: + firmware = "" + + if "BOOT" in str(firmware): + logger.info("ZaapMotion tip %d in boot mode, sending exit command", tip + 1) + await self._driver.send_command("C5", command=f"{prefix}X") + await asyncio.sleep(1) + + # Verify transition + try: + resp = await self._driver.send_command("C5", command=f"{prefix}RFV") + firmware = resp["data"][0] if resp and resp.get("data") else "" + except TecanError: + firmware = "" + + if "BOOT" in str(firmware): + raise TecanError(f"ZaapMotion tip {tip + 1} failed to exit boot mode", "C5", 1) + + # Check if already configured + try: + await self._driver.send_command("C5", command=f"{prefix}RCS") + logger.info("ZaapMotion tip %d already configured, skipping", tip + 1) + continue + except TecanError: + pass + + # Send motor configuration + logger.info("Configuring ZaapMotion tip %d (%d commands)", tip + 1, len(ZAAPMOTION_CONFIG)) + for cmd in ZAAPMOTION_CONFIG: + try: + await self._driver.send_command("C5", command=f"{prefix}{cmd}") + except TecanError as e: + logger.warning("ZaapMotion tip %d config '%s' failed: %s", tip + 1, cmd, e) + + async def _setup_safety_module(self) -> None: + """Send safety module commands to enable motor power.""" + try: + await self._driver.send_command("O1", command="SPN") + await self._driver.send_command("O1", command="SPS3") + except TecanError as e: + logger.warning("Safety module command failed: %s", e) + + # ============== ZaapMotion force mode ============== + + async def _zaapmotion_force_on(self) -> None: + """Enable ZaapMotion force mode before plunger operations.""" + for tip in range(8): + await self._driver.send_command("C5", command=f"T2{tip}SFR{self.SFR_ACTIVE}") + for tip in range(8): + await self._driver.send_command("C5", command=f"T2{tip}SFP1") + + async def _zaapmotion_force_off(self) -> None: + """Restore ZaapMotion to idle after plunger operations.""" + for tip in range(8): + await self._driver.send_command("C5", command=f"T2{tip}SFR{self.SFR_IDLE}") + for tip in range(8): + await self._driver.send_command("C5", command=f"T2{tip}SDP{self.SDP_DEFAULT}") + + # ============== Override operations with force mode ============== + + async def pick_up_tips( + self, + ops: List[Pickup], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + # Use _liha_positions for X/Y only; Z comes from tip rack directly + x_positions, y_positions, _ = self._liha_positions(ops, use_channels) + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, y - yi * ys, ys, [self._z_range] * self.num_channels + ) + + # Aspirate small air gap with force mode + pvl: List[Optional[int]] = [None] * self.num_channels + sep: List[Optional[int]] = [None] * self.num_channels + ppr: List[Optional[int]] = [None] * self.num_channels + for channel in use_channels: + pvl[channel] = 0 + sep[channel] = int(70 * self.SPEED_FACTOR) + ppr[channel] = int(10 * self.STEPS_PER_UL) + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + # AGT using tip rack z_start directly + par = ops[0].resource.parent + assert isinstance(par, TecanTipRack), f"Expected TecanTipRack, got {type(par)}" + agt_z_start = int(par.z_start) + agt_z_search = abs(int(par.z_max - par.z_start)) + await self.liha.get_disposable_tip( + self._bin_use_channels(use_channels), agt_z_start, agt_z_search + ) + + async def drop_tips( + self, + ops: List[TipDrop], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + assert min(use_channels) >= self.num_channels - self.diti_count, ( + f"DiTis can only be configured for the last {self.diti_count} channels" + ) + + x_positions, y_positions, _ = self._liha_positions(ops, use_channels) + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, y - yi * ys, ys, [self._z_range] * self.num_channels + ) + + # Empty plunger before discard + await self.liha.position_valve_logical([0] * self.num_channels) + sep_vals: List[Optional[int]] = [int(600 * self.SPEED_FACTOR)] * self.num_channels + await self._zaapmotion_force_on() + await self.liha.set_end_speed_plunger(sep_vals) + await self._driver.send_command("C5", command="PPA" + ",".join(["0"] * self.num_channels)) + await self._zaapmotion_force_off() + + # Set DiTi discard parameters and drop + await self._driver.send_command("C5", command="SDT1,1000,200") + await self.liha.drop_disposable_tip(self._bin_use_channels(use_channels), discard_height=1) + + async def aspirate( + self, + ops: List[Aspiration], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + tecan_liquid_classes = self._get_liquid_classes(ops) + + from pylabrobot.resources import TecanPlate + + ys = self._get_ys(ops) + zadd: List[Optional[int]] = [0] * self.num_channels + for i, channel in enumerate(use_channels): + par = ops[i].resource.parent + if par is None: + continue + if not isinstance(par, TecanPlate): + raise ValueError(f"Operation is not supported by resource {par}.") + zadd[channel] = round(ops[i].volume / par.area * 10) + + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + [z if z else self._z_range for z in z_positions["travel"]], + ) + + # Leading airgap with force mode + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "lag") + if any(ppr): + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + # Liquid level detection + if any(tlc.aspirate_lld if tlc is not None else None for tlc in tecan_liquid_classes): + tlc, _ = self._first_valid(tecan_liquid_classes) + assert tlc is not None + await self.liha.set_detection_mode(tlc.lld_mode, tlc.lld_conductivity) + ssl, sdl, sbl = self._liquid_detection(use_channels, tecan_liquid_classes) + await self.liha.set_search_speed(ssl) + await self.liha.set_search_retract_distance(sdl) + await self.liha.set_search_z_start(z_positions["start"]) + await self.liha.set_search_z_max(list(z if z else self._z_range for z in z_positions["max"])) + await self.liha.set_search_submerge(sbl) + shz = [min(z for z in z_positions["travel"] if z)] * self.num_channels + await self.liha.set_z_travel_height(shz) + await self.liha.move_detect_liquid(self._bin_use_channels(use_channels), zadd) + await self.liha.set_z_travel_height([self._z_range] * self.num_channels) + + # Aspirate + retract with force mode + zadd = [min(z, 32) if z else None for z in zadd] + ssz, sep, stz, mtr, ssz_r = self._aspirate_action(ops, use_channels, tecan_liquid_classes, zadd) + await self.liha.set_slow_speed_z(ssz) + await self._zaapmotion_force_on() + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + await self._zaapmotion_force_off() + await self.liha.set_slow_speed_z(ssz_r) + await self.liha.move_absolute_z(z_positions["start"]) + + # Trailing airgap with force mode + pvl, sep, ppr = self._aspirate_airgap(use_channels, tecan_liquid_classes, "tag") + await self._zaapmotion_force_on() + await self.liha.position_valve_logical(pvl) + await self.liha.set_end_speed_plunger(sep) + await self.liha.move_plunger_relative(ppr) + await self._zaapmotion_force_off() + + async def dispense( + self, + ops: List[Dispense], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.liha is not None and self._z_range is not None + + x_positions, y_positions, z_positions = self._liha_positions(ops, use_channels) + tecan_liquid_classes = self._get_liquid_classes(ops) + + ys = self._get_ys(ops) + x, _ = self._first_valid(x_positions) + y, yi = self._first_valid(y_positions) + assert x is not None and y is not None + + await self.liha.set_z_travel_height([z if z else self._z_range for z in z_positions["travel"]]) + await self.liha.position_absolute_all_axis( + x, + y - yi * ys, + ys, + [z if z else self._z_range for z in z_positions["dispense"]], + ) + + sep, spp, stz, mtr = self._dispense_action(ops, use_channels, tecan_liquid_classes) + await self._zaapmotion_force_on() + await self.liha.set_end_speed_plunger(sep) + await self.liha.set_stop_speed_plunger(spp) + await self.liha.set_tracking_distance_z(stz) + await self.liha.move_tracking_relative(mtr) + await self._zaapmotion_force_off() From 01516be222b5df1650b4b2b2ab36b88e6d71d86a Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Sat, 28 Mar 2026 11:56:57 -0700 Subject: [PATCH 06/37] Phase 5+6: Add EVORoMaBackend and TecanEVO device composition MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 5 — EVORoMaBackend(GripperArmBackend): - RoMa plate handling via vector coordinate trajectories - Gripper control (open, close, grip_plate) - Park, halt, move_to_location, get_gripper_location - Carrier-based coordinate computation (_roma_positions) - pick_up_from_carrier / drop_at_carrier for carrier-aware operations Phase 6 — TecanEVO(Resource, Device): - Composite device with Driver + PIP + GripperArm capabilities - Constructor flags: air_liha=True selects AirEVOPIPBackend, has_roma=True adds arm - Capability ordering: arm first (must park before LiHa X-init) - Deck assigned as child resource for coordinate calculations - Exports from pylabrobot.tecan.evo.__init__ Usage: evo = TecanEVO(deck=EVO150Deck(), diti_count=8, air_liha=True) await evo.setup() await evo.pip.pick_up_tips(...) await evo.pip.aspirate(...) Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/tecan/evo/__init__.py | 5 + pylabrobot/tecan/evo/evo.py | 104 ++++++++++++ pylabrobot/tecan/evo/roma_backend.py | 244 +++++++++++++++++++++++++++ 3 files changed, 353 insertions(+) create mode 100644 pylabrobot/tecan/evo/evo.py create mode 100644 pylabrobot/tecan/evo/roma_backend.py diff --git a/pylabrobot/tecan/evo/__init__.py b/pylabrobot/tecan/evo/__init__.py index e69de29bb2d..ef4ff36925d 100644 --- a/pylabrobot/tecan/evo/__init__.py +++ b/pylabrobot/tecan/evo/__init__.py @@ -0,0 +1,5 @@ +from pylabrobot.tecan.evo.air_pip_backend import AirEVOPIPBackend +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.evo import TecanEVO +from pylabrobot.tecan.evo.pip_backend import EVOPIPBackend +from pylabrobot.tecan.evo.roma_backend import EVORoMaBackend diff --git a/pylabrobot/tecan/evo/evo.py b/pylabrobot/tecan/evo/evo.py new file mode 100644 index 00000000000..bd4a4ffb10d --- /dev/null +++ b/pylabrobot/tecan/evo/evo.py @@ -0,0 +1,104 @@ +"""Tecan Freedom EVO composite device. + +Composes a TecanEVODriver with PIP (liquid handling) and GripperArm (RoMa) +capabilities using the v1b1 Device architecture. +""" + +from __future__ import annotations + +from typing import Optional + +from pylabrobot.arms.arm import GripperArm +from pylabrobot.capabilities.liquid_handling.pip import PIP +from pylabrobot.device import Device +from pylabrobot.resources import Coordinate, Resource + +from pylabrobot.tecan.evo.air_pip_backend import AirEVOPIPBackend +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.pip_backend import EVOPIPBackend +from pylabrobot.tecan.evo.roma_backend import EVORoMaBackend + + +class TecanEVO(Resource, Device): + """Tecan Freedom EVO liquid handling platform. + + Composes a USB driver with independent-channel pipetting (PIP) and + optionally a RoMa plate handling arm (GripperArm). + + Example:: + + from pylabrobot.tecan.evo import TecanEVO, TecanEVODriver + from pylabrobot.resources.tecan.tecan_decks import EVO150Deck + + deck = EVO150Deck() + evo = TecanEVO(name="evo", deck=deck, diti_count=8, air_liha=True) + await evo.setup() + + # Pipetting via PIP capability + await evo.pip.pick_up_tips(...) + await evo.pip.aspirate(...) + + # Plate handling via arm capability (if RoMa present) + await evo.arm.pick_up_resource(...) + + await evo.stop() + + Args: + name: Device name. + deck: Deck resource (e.g. EVO150Deck). + diti_count: Number of channels configured for disposable tips. + air_liha: If True, use AirEVOPIPBackend (ZaapMotion). Otherwise syringe. + has_roma: If True, include RoMa arm capability. + packet_read_timeout: USB packet read timeout in seconds. + read_timeout: USB read timeout in seconds. + write_timeout: USB write timeout in seconds. + """ + + def __init__( + self, + name: str = "evo", + deck: Optional[Resource] = None, + diti_count: int = 0, + air_liha: bool = False, + has_roma: bool = True, + packet_read_timeout: int = 12, + read_timeout: int = 60, + write_timeout: int = 60, + size_x: float = 1315, + size_y: float = 780, + size_z: float = 765, + ): + driver = TecanEVODriver( + packet_read_timeout=packet_read_timeout, + read_timeout=read_timeout, + write_timeout=write_timeout, + ) + + Resource.__init__(self, name=name, size_x=size_x, size_y=size_y, size_z=size_z) + Device.__init__(self, driver=driver) + + # Assign deck as child resource + if deck is not None: + self.assign_child_resource(deck, location=Coordinate.zero()) + + deck_ref = deck or self + + # PIP capability + if air_liha: + pip_backend = AirEVOPIPBackend(driver=driver, deck=deck_ref, diti_count=diti_count) + else: + pip_backend = EVOPIPBackend(driver=driver, deck=deck_ref, diti_count=diti_count) + self.pip = PIP(backend=pip_backend) + + # RoMa arm capability + self.arm: Optional[GripperArm] = None + if has_roma: + roma_backend = EVORoMaBackend(driver=driver, deck=deck_ref) + self.arm = GripperArm(backend=roma_backend, reference_resource=deck_ref) + + # Capabilities list: arm first (must park before LiHa X-init) + caps = [] + if self.arm is not None: + caps.append(self.arm) + caps.append(self.pip) + self._capabilities = caps diff --git a/pylabrobot/tecan/evo/roma_backend.py b/pylabrobot/tecan/evo/roma_backend.py new file mode 100644 index 00000000000..ad3f78bca11 --- /dev/null +++ b/pylabrobot/tecan/evo/roma_backend.py @@ -0,0 +1,244 @@ +"""GripperArmBackend for the Tecan EVO RoMa (Robotic Manipulator Arm). + +Translates v1b1 arm operations into Tecan RoMa firmware commands via the +TecanEVODriver and RoMa firmware wrapper. +""" + +from __future__ import annotations + +import logging +from typing import Dict, Optional, Tuple + +from pylabrobot.arms.backend import GripperArmBackend +from pylabrobot.arms.standard import GripperLocation +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.resources import Coordinate, Resource, TecanPlateCarrier +from pylabrobot.resources.rotation import Rotation + +from pylabrobot.tecan.evo.driver import TecanEVODriver +from pylabrobot.tecan.evo.errors import TecanError +from pylabrobot.tecan.evo.firmware import RoMa + +logger = logging.getLogger(__name__) + +ROMA = "C1" + + +class EVORoMaBackend(GripperArmBackend): + """GripperArmBackend for the Tecan EVO RoMa plate handling arm. + + The RoMa grips plates along the Y-axis at a fixed 900 (90-degree) R-axis + orientation. It uses vector coordinate tables for multi-point trajectories + with target window classes for smooth acceleration profiles. + """ + + def __init__(self, driver: TecanEVODriver, deck: Resource): + self._driver = driver + self._deck = deck + self._z_roma_traversal_height = 68.7 # mm + self.roma: Optional[RoMa] = None + + async def _on_setup(self) -> None: + """Initialize RoMa arm: PIA + park.""" + try: + await self._driver.send_command(ROMA, command="PIA") + except TecanError as e: + if e.error_code == 5: + logger.info("RoMa not present (error 5).") + return + raise + await self._driver.send_command(ROMA, command="BMX", params=[2]) + + self.roma = RoMa(self._driver, ROMA) + await self.roma.position_initialization_x() + await self.park() + logger.info("RoMa initialized and parked.") + + async def _on_stop(self) -> None: + pass + + def _roma_positions( + self, + resource: Resource, + offset: Coordinate, + z_range: int, + ) -> Tuple[int, int, Dict[str, int]]: + """Compute RoMa X, Y, Z positions from resource and carrier attributes.""" + parent = resource.parent # PlateHolder + if parent is None: + raise ValueError(f"Operation is not supported by resource {resource}.") + parent = parent.parent # PlateCarrier + if not isinstance(parent, TecanPlateCarrier): + raise ValueError(f"Operation is not supported by resource {parent}.") + + if parent.roma_x is None or parent.roma_y is None: + raise ValueError(f"RoMa coordinates not defined for carrier {parent}.") + if parent.roma_z_safe is None or parent.roma_z_end is None: + raise ValueError(f"RoMa Z positions not defined for carrier {parent}.") + + x_position = int((offset.x - 100) * 10 + parent.roma_x) + y_position = int((347.1 - (offset.y + resource.get_absolute_size_y())) * 10 + parent.roma_y) + z_positions = { + "safe": z_range - int(parent.roma_z_safe), + "travel": int(self._z_roma_traversal_height * 10), + "end": z_range - int(parent.roma_z_end - offset.z * 10), + } + return x_position, y_position, z_positions + + # ============== GripperArmBackend implementation ============== + + async def pick_up_at_location( + self, + location: Coordinate, + resource_width: float, + backend_params: Optional[BackendParams] = None, + ) -> None: + """Pick up a plate at the given location. + + Note: The current implementation uses the legacy carrier-attribute-based + positioning rather than the raw Coordinate. The ``location`` is used to + find the resource and its parent carrier for RoMa coordinate computation. + """ + assert self.roma is not None + # For now, this method is called from the Device level which provides + # the absolute location. The RoMa needs carrier-specific attributes, + # so the Device.pick_up_resource method should call _pick_up_from_carrier + # with the full resource reference. + raise NotImplementedError( + "Use TecanEVO.pick_up_resource() which provides carrier context. " + "Direct pick_up_at_location with raw coordinates is not yet supported." + ) + + async def drop_at_location( + self, + location: Coordinate, + resource_width: float, + backend_params: Optional[BackendParams] = None, + ) -> None: + assert self.roma is not None + raise NotImplementedError( + "Use TecanEVO.drop_resource() which provides carrier context. " + "Direct drop_at_location with raw coordinates is not yet supported." + ) + + async def pick_up_from_carrier( + self, + resource: Resource, + backend_params: Optional[BackendParams] = None, + ) -> None: + """Pick up a plate using carrier-based RoMa coordinates.""" + assert self.roma is not None + + z_range = await self.roma.report_z_param(5) + offset = resource.get_location_wrt(self._deck) + x, y, z = self._roma_positions(resource, offset, z_range) + h = int(resource.get_absolute_size_y() * 10) + + # Move to resource + await self.roma.set_smooth_move_x(1) + await self.roma.set_fast_speed_x(10000) + await self.roma.set_fast_speed_y(5000, 1500) + await self.roma.set_fast_speed_z(1300) + await self.roma.set_fast_speed_r(5000, 1500) + await self.roma.set_vector_coordinate_position(1, x, y, z["safe"], 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + await self.roma.set_smooth_move_x(0) + + # Pick up + await self.roma.position_absolute_g(900) + await self.roma.set_target_window_class(1, 0, 0, 0, 135, 0) + await self.roma.set_vector_coordinate_position(1, x, y, z["travel"], 900, None, 1, 1) + await self.roma.set_vector_coordinate_position(1, x, y, z["end"], 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + await self.roma.set_fast_speed_y(3500, 1000) + await self.roma.set_fast_speed_r(2000, 600) + await self.roma.set_gripper_params(100, 75) + await self.roma.grip_plate(h - 100) + + async def drop_at_carrier( + self, + resource: Resource, + destination: Coordinate, + backend_params: Optional[BackendParams] = None, + ) -> None: + """Drop a plate at a carrier location using RoMa coordinates.""" + assert self.roma is not None + + z_range = await self.roma.report_z_param(5) + offset = resource.get_location_wrt(self._deck) + x, y, z = self._roma_positions(resource, offset, z_range) + xt, yt, zt = self._roma_positions(resource, destination, z_range) + + # Multi-point trajectory to target + await self.roma.set_target_window_class(1, 0, 0, 0, 135, 0) + await self.roma.set_target_window_class(2, 0, 0, 0, 53, 0) + await self.roma.set_target_window_class(3, 0, 0, 0, 55, 0) + await self.roma.set_target_window_class(4, 45, 0, 0, 0, 0) + await self.roma.set_vector_coordinate_position(1, x, y, z["end"], 900, None, 1, 1) + await self.roma.set_vector_coordinate_position(2, x, y, z["travel"], 900, None, 1, 2) + await self.roma.set_vector_coordinate_position(3, x, y, z["safe"], 900, None, 1, 3) + await self.roma.set_vector_coordinate_position(4, xt, yt, zt["safe"], 900, None, 1, 4) + await self.roma.set_vector_coordinate_position(5, xt, yt, zt["travel"], 900, None, 1, 3) + await self.roma.set_vector_coordinate_position(6, xt, yt, zt["end"], 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + + # Release + await self.roma.position_absolute_g(900) + await self.roma.set_fast_speed_y(5000, 1500) + await self.roma.set_fast_speed_r(5000, 1500) + await self.roma.set_vector_coordinate_position(1, xt, yt, zt["end"], 900, None, 1, 1) + await self.roma.set_vector_coordinate_position(2, xt, yt, zt["travel"], 900, None, 1, 2) + await self.roma.set_vector_coordinate_position(3, xt, yt, zt["safe"], 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + await self.roma.set_fast_speed_y(3500, 1000) + await self.roma.set_fast_speed_r(2000, 600) + + async def move_to_location( + self, location: Coordinate, backend_params: Optional[BackendParams] = None + ) -> None: + assert self.roma is not None + z_range = await self.roma.report_z_param(5) + x = int((location.x - 100) * 10) + y = int((347.1 - location.y) * 10) + z_safe = z_range - 946 # default safe Z + await self.roma.set_vector_coordinate_position(1, x, y, z_safe, 900, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + + async def halt(self, backend_params: Optional[BackendParams] = None) -> None: + await self._driver.send_command(ROMA, command="BMA", params=[0, 0, 0]) + + async def park(self, backend_params: Optional[BackendParams] = None) -> None: + assert self.roma is not None + await self.roma.set_vector_coordinate_position(1, 9000, 2000, 2464, 1800, None, 1, 0) + await self.roma.action_move_vector_coordinate_position() + + async def open_gripper( + self, gripper_width: float, backend_params: Optional[BackendParams] = None + ) -> None: + assert self.roma is not None + await self.roma.position_absolute_g(int(gripper_width * 10)) + + async def close_gripper( + self, gripper_width: float, backend_params: Optional[BackendParams] = None + ) -> None: + assert self.roma is not None + await self.roma.set_gripper_params(100, 75) + await self.roma.grip_plate(int(gripper_width * 10)) + + async def is_gripper_closed(self, backend_params: Optional[BackendParams] = None) -> bool: + assert self.roma is not None + pos = await self.roma.report_g_param(0) + return pos < 100 # heuristic: < 10mm = closed + + async def get_gripper_location( + self, backend_params: Optional[BackendParams] = None + ) -> GripperLocation: + assert self.roma is not None + x = await self.roma.report_x_param(0) + y = (await self.roma.report_y_param(0))[0] + z = await self.roma.report_z_param(0) + r = await self.roma.report_r_param(0) + return GripperLocation( + location=Coordinate(x=x / 10.0, y=y / 10.0, z=z / 10.0), + rotation=Rotation(x=0, y=0, z=r / 10.0), + ) From f6f46e71af7bd52721cd63ddd14f01c6bb1912f2 Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Sat, 28 Mar 2026 12:08:58 -0700 Subject: [PATCH 07/37] Add hardware testing scripts, checklist, and jog/teach tool - hardware_testing_checklist.md: 8 test scenarios with pass/fail tables, Z-calibration procedure, failure actions - test_v1b1_init.py: v1b1 TecanEVO initialization test (cold + warm boot) - test_v1b1_pipette.py: v1b1 full pipetting cycle test - jog_and_teach.py: Interactive jog/teach/labware editor with: - X/Y/Z jogging with configurable step sizes - Position recording and goto - Deck layout visualization (carriers, sites, contents) - Labware detail viewer (z_start, z_max, area, well pitch) - Live Z-value teaching (teach z_start from current position) - Labware edits saved to labware_edits.json - REE axis status, tip status checks Co-Authored-By: Claude Opus 4.6 (1M context) --- keyser-testing/hardware_testing_checklist.md | 186 ++++++++++ keyser-testing/jog_and_teach.py | 367 +++++++++++++++++++ keyser-testing/test_v1b1_init.py | 106 ++++++ keyser-testing/test_v1b1_pipette.py | 119 ++++++ 4 files changed, 778 insertions(+) create mode 100644 keyser-testing/hardware_testing_checklist.md create mode 100644 keyser-testing/jog_and_teach.py create mode 100644 keyser-testing/test_v1b1_init.py create mode 100644 keyser-testing/test_v1b1_pipette.py diff --git a/keyser-testing/hardware_testing_checklist.md b/keyser-testing/hardware_testing_checklist.md new file mode 100644 index 00000000000..1e9983d81f1 --- /dev/null +++ b/keyser-testing/hardware_testing_checklist.md @@ -0,0 +1,186 @@ +# Tecan EVO Hardware Testing Checklist + +## Pre-Test Setup + +### Equipment Required +- [ ] EVO 150 powered on +- [ ] USB cable connected to pylabrobot PC +- [ ] EVOware PC disconnected from USB (only one client at a time) +- [ ] DiTi 50uL SBS tips loaded (position 3 on MP_3Pos at rail 16) +- [ ] Eppendorf 96-well plate with water in column 1 (position 1) +- [ ] Empty Eppendorf 96-well plate (position 2) +- [ ] `.venv` activated, `pip install -e ".[usb]"` done + +### Software +- [ ] `v1b1-tecan-evo` branch checked out +- [ ] `keyser-testing/labware_library.py` has correct Z values (taught from jog tool) + +--- + +## Test 1: Initialization (Cold Boot) + +**Script:** `keyser-testing/test_v1b1_init.py` + +### Steps +1. Power cycle the EVO +2. Run the init test script +3. Verify each phase completes: + +| Step | Expected | Pass? | +|------|----------|-------| +| USB connection | "USB connected" | [ ] | +| ZaapMotion boot exit | All 8 tips XP2000/ZMA | [ ] | +| ZaapMotion motor config | 33 commands × 8 tips OK | [ ] | +| Safety module (SPN/SPS3) | OK | [ ] | +| PIA (all axes) | REE0 = `@@@@@@@@@@@` | [ ] | +| RoMa init + park | OK | [ ] | +| LiHa range queries | num_channels=8, z_range~2100 | [ ] | +| Plunger init | PID, PVL, PPR sequence completes | [ ] | + +### Failure Actions +- ZaapMotion boot exit fails → check USB connection, retry +- PIA fails → check REE0 for which axis, use jog tool to investigate +- RoMa fails with error 5 → RoMa not present, set `has_roma=False` + +--- + +## Test 2: Initialization (Warm Reconnect) + +### Steps +1. Run Test 1 successfully +2. Stop the device (`evo.stop()`) +3. Run the init test again WITHOUT power cycling + +| Step | Expected | Pass? | +|------|----------|-------| +| REE0 check | Not "A" or "G" → skip full init | [ ] | +| Quick setup | Channel count + ranges loaded fast | [ ] | +| Total time | < 5 seconds (vs ~45s for full init) | [ ] | + +--- + +## Test 3: Tip Pickup + +**Script:** `keyser-testing/test_v1b1_tips.py` + +### Steps +1. Initialize EVO +2. Pick up 8 tips from column 1 + +| Step | Expected | Pass? | +|------|----------|-------| +| X/Y positioning | Channels aligned over tip column | [ ] | +| Z approach | Channels descend to tips | [ ] | +| Tip engagement | Force feedback engages all 8 tips | [ ] | +| Z retract | Channels lift with tips mounted | [ ] | +| RTS check | Tip status = 255 (all mounted) | [ ] | + +### Calibration Notes +- If X is off by > 2mm: adjust X offset in labware or jog tool +- If Z doesn't reach tips: adjust z_start in tip rack definition +- If some tips don't engage: check individual channel alignment + +--- + +## Test 4: Tip Drop + +### Steps +1. With tips mounted, drop all 8 back to the rack + +| Step | Expected | Pass? | +|------|----------|-------| +| Move to drop position | Same X/Y as pickup | [ ] | +| Plunger empty (PPA0) | Plunger returns to zero | [ ] | +| SDT + AST | Tips ejected cleanly | [ ] | +| RTS check | Tip status = 0 (none mounted) | [ ] | + +--- + +## Test 5: Aspirate + +### Steps +1. Pick up tips +2. Move to source plate (position 1) +3. Aspirate 25µL from column 1 + +| Step | Expected | Pass? | +|------|----------|-------| +| X/Y positioning | Channels over well column 1 | [ ] | +| Y-spacing (ys) | 90 (9mm well pitch) | [ ] | +| Leading airgap | PVL + SEP + PPR with force mode | [ ] | +| LLD detection | MDT finds liquid surface | [ ] | +| Aspirate tracking | MTR with correct steps (~2660 for 25µL) | [ ] | +| Z retract | Channels lift after aspiration | [ ] | +| Trailing airgap | PPR with force mode | [ ] | +| Visual check | No dripping, liquid in tips | [ ] | + +### Known Issues +- Z-start may be too high — use jog tool to teach correct plate Z +- If error 3 on PAA: check ys value (must be 90-380) + +--- + +## Test 6: Dispense + +### Steps +1. After aspirating, move to destination plate (position 2) +2. Dispense 25µL into column 1 + +| Step | Expected | Pass? | +|------|----------|-------| +| X/Y positioning | Channels over dest well column 1 | [ ] | +| Dispense tracking | MTR with negative steps (~-2660) | [ ] | +| Visual check | Liquid dispensed into wells | [ ] | + +--- + +## Test 7: Full Cycle + +### Steps +1. Initialize +2. Pick up 8 tips +3. Aspirate 25µL from source column 1 +4. Dispense 25µL to dest column 1 +5. Drop tips + +| Step | Pass? | Notes | +|------|-------|-------| +| Init | [ ] | | +| Tip pickup | [ ] | | +| Aspirate | [ ] | | +| Dispense | [ ] | | +| Tip drop | [ ] | | +| Clean stop | [ ] | | + +--- + +## Test 8: RoMa Plate Handling (if applicable) + +### Steps +1. Initialize with `has_roma=True` +2. Pick up plate from position 1 +3. Drop plate at position 2 + +| Step | Expected | Pass? | +|------|----------|-------| +| RoMa init + park | Arm moves to park position | [ ] | +| Move to plate | Arm positions over carrier | [ ] | +| Gripper open | Gripper opens to plate width | [ ] | +| Descend + grip | Gripper closes on plate | [ ] | +| Lift + transfer | Plate moved safely | [ ] | +| Place + release | Plate placed at destination | [ ] | +| Arm park | Arm returns to park | [ ] | + +--- + +## Z-Calibration Procedure + +Use `keyser-testing/jog_liha.py` to teach positions: + +1. **Tip rack z_start**: Jog to just above tip tops, record Z → set as `z_start` in labware +2. **Tip rack z_max**: Jog to bottom of tip search range, record Z → set as `z_max` +3. **Plate z_start**: Jog tip into well liquid surface, record Z → set as plate `z_start` +4. **Plate z_dispense**: Jog to dispense height (above well bottom), record Z → set as `z_dispense` +5. **Plate z_max**: Jog to maximum depth, record Z → set as `z_max` + +Save all taught positions in `keyser-testing/taught_positions.json` for reference. diff --git a/keyser-testing/jog_and_teach.py b/keyser-testing/jog_and_teach.py new file mode 100644 index 00000000000..5220bdc1282 --- /dev/null +++ b/keyser-testing/jog_and_teach.py @@ -0,0 +1,367 @@ +"""Interactive jog, teach, and labware editor for Tecan EVO Air LiHa. + +Features: + - Jog X/Y/Z in configurable step sizes + - Teach and record positions (tip z_start, plate z_start, etc.) + - Display current deck layout with carriers, plates, tip racks + - Edit labware Z definitions (z_start, z_dispense, z_max) from taught positions + - Save/load taught positions and edited labware to JSON + +Commands: + Movement: + x+/x- Jog X axis + y+/y- Jog Y axis + z+/z- Jog Z axis (z+ = toward deck) + s Set step size in mm (s1, s5, s10, s0.5) + + Position: + p Print current position + r Record position with label + goto