|
6 | 6 | from __future__ import annotations |
7 | 7 |
|
8 | 8 | import asyncio |
| 9 | +import enum |
9 | 10 | import logging |
10 | 11 | import time |
11 | 12 | from collections.abc import AsyncIterator |
12 | 13 | from contextlib import asynccontextmanager |
13 | | -from typing import NamedTuple |
| 14 | +from typing import NamedTuple, TypedDict, TypeVar |
14 | 15 |
|
15 | 16 | from pylabrobot.device import Driver |
16 | 17 | from pylabrobot.io.binary import Reader |
|
20 | 21 | from .enums import ( |
21 | 22 | EL406Motor, |
22 | 23 | EL406MotorHomeType, |
| 24 | + EL406Sensor, |
23 | 25 | EL406StepType, |
| 26 | + EL406SyringeManifold, |
24 | 27 | EL406WasherManifold, |
25 | 28 | ) |
26 | 29 | from .error_codes import get_error_message |
@@ -808,3 +811,144 @@ async def set_washer_manifold(self, manifold: EL406WasherManifold) -> None: |
808 | 811 | framed_command = build_framed_message(command=0xD9, data=data) |
809 | 812 | await self._send_framed_command(framed_command) |
810 | 813 | logger.info("Washer manifold set to: %s", manifold.name) |
| 814 | + |
| 815 | + # --------------------------------------------------------------------------- |
| 816 | + # Queries |
| 817 | + # --------------------------------------------------------------------------- |
| 818 | + |
| 819 | + @staticmethod |
| 820 | + def _extract_payload_byte(response_data: bytes) -> int: |
| 821 | + """Extract the first payload byte, handling optional 2-byte header prefix.""" |
| 822 | + return response_data[2] if len(response_data) > 2 else response_data[0] |
| 823 | + |
| 824 | + _E = TypeVar("_E", bound=enum.Enum) |
| 825 | + |
| 826 | + async def _query_enum(self, command: int, enum_cls: type[_E], label: str) -> _E: |
| 827 | + """Send a framed query and parse the response byte as an *enum_cls* member.""" |
| 828 | + logger.info("Querying %s", label) |
| 829 | + response_data = await self._send_framed_query(command) |
| 830 | + logger.debug("%s response data: %s", label.capitalize(), response_data.hex()) |
| 831 | + value_byte = self._extract_payload_byte(response_data) |
| 832 | + |
| 833 | + try: |
| 834 | + result = enum_cls(value_byte) |
| 835 | + except ValueError: |
| 836 | + logger.warning("Unknown %s: %d (0x%02X)", label, value_byte, value_byte) |
| 837 | + raise ValueError( |
| 838 | + f"Unknown {label}: {value_byte} (0x{value_byte:02X}). " |
| 839 | + f"Valid types: {[m.name for m in enum_cls]}" |
| 840 | + ) from None |
| 841 | + |
| 842 | + logger.info("%s: %s (0x%02X)", label.capitalize(), result.name, result.value) |
| 843 | + return result |
| 844 | + |
| 845 | + async def request_washer_manifold(self) -> EL406WasherManifold: |
| 846 | + """Query the installed washer manifold type.""" |
| 847 | + return await self._query_enum( |
| 848 | + command=0xD8, enum_cls=EL406WasherManifold, label="washer manifold type" |
| 849 | + ) |
| 850 | + |
| 851 | + async def request_syringe_manifold(self) -> EL406SyringeManifold: |
| 852 | + """Query the installed syringe manifold type.""" |
| 853 | + return await self._query_enum( |
| 854 | + command=0xBB, enum_cls=EL406SyringeManifold, label="syringe manifold type" |
| 855 | + ) |
| 856 | + |
| 857 | + async def request_serial_number(self) -> str: |
| 858 | + """Query the product serial number.""" |
| 859 | + logger.info("Querying product serial number") |
| 860 | + response_data = await self._send_framed_query(command=0x0100) |
| 861 | + serial_number = response_data[2:].decode("ascii", errors="ignore").strip().rstrip("\x00") |
| 862 | + logger.info("Product serial number: %s", serial_number) |
| 863 | + return serial_number |
| 864 | + |
| 865 | + async def request_sensor_enabled(self, sensor: EL406Sensor) -> bool: |
| 866 | + """Query whether a specific sensor is enabled.""" |
| 867 | + logger.info("Querying sensor enabled status: %s", sensor.name) |
| 868 | + response_data = await self._send_framed_query(command=0xD2, data=bytes([sensor.value])) |
| 869 | + logger.debug("Sensor enabled response data: %s", response_data.hex()) |
| 870 | + enabled = bool(self._extract_payload_byte(response_data)) |
| 871 | + logger.info("Sensor %s enabled: %s", sensor.name, enabled) |
| 872 | + return enabled |
| 873 | + |
| 874 | + class SyringeBoxInfo(TypedDict): |
| 875 | + box_type: int |
| 876 | + box_size: int |
| 877 | + installed: bool |
| 878 | + |
| 879 | + async def request_syringe_box_info(self) -> SyringeBoxInfo: |
| 880 | + """Get syringe box information.""" |
| 881 | + logger.info("Querying syringe box info") |
| 882 | + response_data = await self._send_framed_query(command=0xF6) |
| 883 | + logger.debug("Syringe box info response data: %s", response_data.hex()) |
| 884 | + |
| 885 | + box_type = self._extract_payload_byte(response_data) |
| 886 | + box_size = ( |
| 887 | + response_data[3] |
| 888 | + if len(response_data) > 3 |
| 889 | + else (response_data[1] if len(response_data) > 1 else 0) |
| 890 | + ) |
| 891 | + installed = box_type != 0 |
| 892 | + |
| 893 | + info = self.SyringeBoxInfo(box_type=box_type, box_size=box_size, installed=installed) |
| 894 | + logger.info("Syringe box info: %s", info) |
| 895 | + return info |
| 896 | + |
| 897 | + async def request_peristaltic_installed(self, selector: int) -> bool: |
| 898 | + """Check if a peristaltic pump is installed.""" |
| 899 | + if selector < 0 or selector > 1: |
| 900 | + raise ValueError(f"Invalid selector {selector}. Must be 0 (primary) or 1 (secondary).") |
| 901 | + |
| 902 | + logger.info("Querying peristaltic pump installed: selector=%d", selector) |
| 903 | + response_data = await self._send_framed_query(command=0x0104, data=bytes([selector])) |
| 904 | + logger.debug("Peristaltic installed response data: %s", response_data.hex()) |
| 905 | + |
| 906 | + installed = bool(self._extract_payload_byte(response_data)) |
| 907 | + |
| 908 | + logger.info("Peristaltic pump %d installed: %s", selector, installed) |
| 909 | + return installed |
| 910 | + |
| 911 | + class InstrumentSettings(TypedDict): |
| 912 | + washer_manifold: EL406WasherManifold |
| 913 | + syringe_manifold: EL406SyringeManifold |
| 914 | + syringe_box: "EL406Driver.SyringeBoxInfo" |
| 915 | + peristaltic_pump_1: bool |
| 916 | + peristaltic_pump_2: bool |
| 917 | + |
| 918 | + async def request_instrument_settings(self) -> InstrumentSettings: |
| 919 | + """Get current instrument hardware configuration.""" |
| 920 | + logger.info("Querying instrument settings from hardware") |
| 921 | + |
| 922 | + washer_manifold = await self.request_washer_manifold() |
| 923 | + syringe_manifold = await self.request_syringe_manifold() |
| 924 | + syringe_box = await self.request_syringe_box_info() |
| 925 | + peristaltic_1 = await self.request_peristaltic_installed(0) |
| 926 | + peristaltic_2 = await self.request_peristaltic_installed(1) |
| 927 | + |
| 928 | + settings = self.InstrumentSettings( |
| 929 | + washer_manifold=washer_manifold, |
| 930 | + syringe_manifold=syringe_manifold, |
| 931 | + syringe_box=syringe_box, |
| 932 | + peristaltic_pump_1=peristaltic_1, |
| 933 | + peristaltic_pump_2=peristaltic_2, |
| 934 | + ) |
| 935 | + logger.info("Instrument settings: %s", settings) |
| 936 | + return settings |
| 937 | + |
| 938 | + class SelfCheckResult(TypedDict): |
| 939 | + success: bool |
| 940 | + error_code: int |
| 941 | + message: str |
| 942 | + |
| 943 | + async def run_self_check(self) -> SelfCheckResult: |
| 944 | + """Run instrument self-check diagnostics.""" |
| 945 | + logger.info("Running instrument self-check") |
| 946 | + response_data = await self._send_framed_query(command=0x95, timeout=LONG_READ_TIMEOUT) |
| 947 | + logger.debug("Self-check response data: %s", response_data.hex()) |
| 948 | + error_code = self._extract_payload_byte(response_data) |
| 949 | + success = error_code == 0 |
| 950 | + |
| 951 | + message = "Self-check passed" if success else f"Self-check failed (error code: {error_code})" |
| 952 | + result = self.SelfCheckResult(success=success, error_code=error_code, message=message) |
| 953 | + logger.info("Self-check result: %s", result["message"]) |
| 954 | + return result |
0 commit comments