diff --git a/docs/api/pylabrobot.scales.rst b/docs/api/pylabrobot.scales.rst index 2b8346e56ee..5d8d438cea6 100644 --- a/docs/api/pylabrobot.scales.rst +++ b/docs/api/pylabrobot.scales.rst @@ -22,4 +22,4 @@ Backends :recursive: chatterbox.ScaleChatterboxBackend - mettler_toledo_backend.MettlerToledoWXS205SDU + mettler_toledo.backend.MettlerToledoWXS205SDUBackend diff --git a/docs/user_guide/02_analytical/scales/mettler-toledo-WXS205SDU.ipynb b/docs/user_guide/02_analytical/scales/mettler-toledo-WXS205SDU.ipynb index 483ee4079ce..46ed83fef9c 100644 --- a/docs/user_guide/02_analytical/scales/mettler-toledo-WXS205SDU.ipynb +++ b/docs/user_guide/02_analytical/scales/mettler-toledo-WXS205SDU.ipynb @@ -8,7 +8,7 @@ "\n", "| Summary | Image |\n", "|------------|--------|\n", - "| |
![shaker](img/mettler_toledo_wx_scale.png)
Figure: Mettler Toledo WXS205SDU used for gravimetric liquid transfer verification
|" + "| |
![scale](img/mettler_toledo_wx_scale.png)
Figure: Mettler Toledo WXS205SDU used for gravimetric liquid transfer verification
|" ] }, { @@ -31,8 +31,8 @@ "\n", "| Configuration Name | Has Load Cell | Has Electronics Unit | Has Terminal/Display |\n", "|---------------|---------------|-----------------|---------------------|\n", - "| **Balance** | ✓ | ✓ | ✓ |\n", - "| **Weigh Module** (or \"Bridge\") | ✓ | ✓ | ✗ |\n", + "| **Balance** | \u2713 | \u2713 | \u2713 |\n", + "| **Weigh Module** (or \"Bridge\") | \u2713 | \u2713 | \u2717 |\n", "\n", "**Note:** When used with PyLabRobot, the terminal/display is optional since all control is done programmatically.\n", "\n", @@ -51,48 +51,113 @@ "---\n", "## Setup (Programmatic)\n", "\n", - "Import the necessary classes:" + "Select a protocol mode:" ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, + "execution_count": 1, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.722108Z", + "iopub.status.busy": "2026-03-30T12:01:11.721949Z", + "iopub.status.idle": "2026-03-30T12:01:11.725764Z", + "shell.execute_reply": "2026-03-30T12:01:11.725399Z" + } + }, "outputs": [], "source": [ - "from pylabrobot.scales import Scale\n", - "from pylabrobot.scales.mettler_toledo_backend import MettlerToledoWXS205SDUBackend\n" + "protocol_mode = \"simulation\" # \"execution\" or \"simulation\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Initialize the scale backend and create a scale instance.\n", - "You'll need to specify the serial port where your scale is connected:" + "### Logging\n\nIn Jupyter, PyLabRobot automatically shows INFO-level log messages (including device identity discovered during `setup()`).\n\nTo decrease the log level and save logs to disk, set up file logging:\n\nSee the [Logging documentation](../../machine-agnostic-features/logging-and-validation/logging.ipynb) for details." ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, + "execution_count": 2, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.727610Z", + "iopub.status.busy": "2026-03-30T12:01:11.727461Z", + "iopub.status.idle": "2026-03-30T12:01:11.778818Z", + "shell.execute_reply": "2026-03-30T12:01:11.778624Z" + } + }, "outputs": [ { - "data": { - "text/plain": [ - "0.00148" - ] - }, - "execution_count": 3, - "metadata": {}, - "output_type": "execute_result" + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-30 17:05:11,973 - pylabrobot - INFO - === MT Scale tutorial started ===\n" + ] + } + ], + "source": [ + "import logging\n", + "from pylabrobot.io import LOG_LEVEL_IO\n", + "import pylabrobot\n", + "from datetime import datetime\n", + "\n", + "timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')\n", + "log_file = f\"./logs/{protocol_mode}/{timestamp}_mt_scale_tutorial.log\"\n", + "\n", + "pylabrobot.verbose(True, level=LOG_LEVEL_IO)\n", + "pylabrobot.setup_logger(log_dir=log_file, level=LOG_LEVEL_IO)\n", + "\n", + "plr_logger = logging.getLogger(\"pylabrobot\")\n", + "plr_logger.info(\"=== MT Scale tutorial started ===\")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.795670Z", + "iopub.status.busy": "2026-03-30T12:01:11.795553Z", + "iopub.status.idle": "2026-03-30T12:01:11.848893Z", + "shell.execute_reply": "2026-03-30T12:01:11.848711Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-30 17:05:12,021 - pylabrobot - INFO - [Mettler Toledo Scale] Connected (simulation)\n", + "Device type: WXS205SDU\n", + "Configuration: Balance\n", + "Serial number: SIM0000001\n", + "Firmware: 1.10 18.6.4.1361.772\n", + "Capacity: 220.0 g\n", + "Supported commands: ['@', 'C', 'D', 'DAT', 'DW', 'I0', 'I1', 'I10', 'I11', 'I14', 'I15', 'I2', 'I3', 'I4', 'I5', 'I50', 'M01', 'M02', 'M03', 'M21', 'M27', 'M28', 'S', 'SC', 'SI', 'SIR', 'SIS', 'SNR', 'SR', 'T', 'TA', 'TAC', 'TC', 'TI', 'TIM', 'UPD', 'Z', 'ZC', 'ZI']\n" + ] } ], "source": [ - "backend = MettlerToledoWXS205SDUBackend(port=\"/dev/cu.usbserial-110\")\n", + "from pylabrobot.scales import Scale\n", + "\n", + "if protocol_mode == \"execution\":\n", + "\n", + " from pylabrobot.scales.mettler_toledo import MettlerToledoWXS205SDUBackend\n", + "\n", + " # Platform-specific port: Mac: /dev/cu.usbserial-*, Linux: /dev/ttyUSB*, Windows: COM*\n", + " backend = MettlerToledoWXS205SDUBackend(port=\"/dev/cu.usbserial-110\")\n", + "\n", + "elif protocol_mode == \"simulation\":\n", + "\n", + " from pylabrobot.scales.mettler_toledo.simulator import MettlerToledoSICSSimulator\n", + "\n", + " backend = MettlerToledoSICSSimulator()\n", + "\n", "scale = Scale(name=\"scale\", backend=backend, size_x=0, size_y=0, size_z=0)\n", "\n", - "await scale.setup()\n" + "await scale.setup()" ] }, { @@ -102,9 +167,9 @@ "```{Warning}\n", "### Warm-up Time Required\n", "\n", - "This scale requires a **warm-up period** after being powered on. Mettler Toledo documentation specifies 60-90 minutes, though in practice 30 minutes is often sufficient.\n", + "This scale requires a **warm-up period** after being powered on. Mettler Toledo documentation specifies 60 minutes, though in practice 30 minutes is often sufficient.\n", "\n", - "If you attempt measurements before the scale has warmed up, you'll likely encounter an error: *\"Command understood but currently not executable (balance is currently executing another command)\"*.\n", + "If you attempt measurements before the scale has warmed up, you may see unstable readings or the scale may return status `I` (command understood but not currently executable).\n", "\n", "**Tip**: Sometimes power-cycling the scale (unplugging and replugging the power cord) can help resolve initialization issues.\n", "```\n", @@ -115,6 +180,86 @@ "```" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Available methods\n", + "\n", + "Different Mettler Toledo models support different MT-SICS commands. During\n", + "`setup()`, the backend queries the device to discover which commands it\n", + "supports and gates methods accordingly. Use `request_supported_methods()`\n", + "to see what is available on your device:" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['cancel_all',\n", + " 'clear_tare',\n", + " 'deserialize',\n", + " 'get_all_instances',\n", + " 'get_weight',\n", + " 'measure_temperature',\n", + " 'read_dynamic_weight',\n", + " 'read_stable_weight',\n", + " 'read_stable_weight_repeat_on_change',\n", + " 'read_weight',\n", + " 'read_weight_value_immediately',\n", + " 'request_adjustment_history',\n", + " 'request_auto_zero',\n", + " 'request_capacity',\n", + " 'request_date',\n", + " 'request_device_id',\n", + " 'request_device_info',\n", + " 'request_device_type',\n", + " 'request_environment_condition',\n", + " 'request_firmware_version',\n", + " 'request_model_designation',\n", + " 'request_net_weight_with_status',\n", + " 'request_remaining_weighing_range',\n", + " 'request_serial_number',\n", + " 'request_software_material_number',\n", + " 'request_supported_methods',\n", + " 'request_tare_weight',\n", + " 'request_time',\n", + " 'request_update_rate',\n", + " 'request_uptime_minutes',\n", + " 'request_weighing_mode',\n", + " 'reset',\n", + " 'send_command',\n", + " 'serialize',\n", + " 'set_date',\n", + " 'set_display_text',\n", + " 'set_host_unit_grams',\n", + " 'set_time',\n", + " 'set_weight_display',\n", + " 'setup',\n", + " 'stop',\n", + " 'tare',\n", + " 'tare_immediately',\n", + " 'tare_stable',\n", + " 'tare_timeout',\n", + " 'zero',\n", + " 'zero_immediately',\n", + " 'zero_stable',\n", + " 'zero_timeout']" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "backend.request_supported_methods()" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -122,7 +267,7 @@ "---\n", "## Usage\n", "\n", - "The scale implements the three core methods required for all PyLabRobot scales.\n", + "The scale implements four core methods required for all PyLabRobot scales.\n", "\n", "They are presented here in typical workflow order:\n", "\n", @@ -135,11 +280,27 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 5, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.850228Z", + "iopub.status.busy": "2026-03-30T12:01:11.850139Z", + "iopub.status.idle": "2026-03-30T12:01:11.852160Z", + "shell.execute_reply": "2026-03-30T12:01:11.851968Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-30 17:05:12,031 - pylabrobot - IO - [Mettler Toledo Scale] Sent command: ZC 5000\n", + "2026-03-30 17:05:12,031 - pylabrobot - IO - [Mettler Toledo Scale] Received response: ZC A \n" + ] + } + ], "source": [ - "await scale.zero(timeout=5)\n" + "await scale.zero(timeout=5)" ] }, { @@ -160,32 +321,77 @@ "Resets the scale reading to zero while accounting for the weight of a container already on the platform. Use this when you want to measure only the weight of material being added to a container.\n", "\n", "**Example workflow**:\n", - "Place an empty beaker on the scale → tare → dispense liquid → read only the liquid's weight." + "Place an empty beaker on the scale \u2192 tare \u2192 dispense liquid \u2192 read only the liquid's weight." ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 6, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.853205Z", + "iopub.status.busy": "2026-03-30T12:01:11.853148Z", + "iopub.status.idle": "2026-03-30T12:01:11.855046Z", + "shell.execute_reply": "2026-03-30T12:01:11.854876Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-30 17:05:12,036 - pylabrobot - IO - [Mettler Toledo Scale] Sent command: TC 5000\n", + "2026-03-30 17:05:12,037 - pylabrobot - IO - [Mettler Toledo Scale] Received response: TC S 50.00000 g\n" + ] + } + ], "source": [ - "await scale.tare(timeout=5)\n" + "if protocol_mode == \"simulation\":\n", + " backend.platform_weight = 50.0 # simulate placing a 50g beaker\n", + "\n", + "await scale.tare(timeout=5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "The difference between load at `scale.zero()` and load at `scale.tare()` is stored in and can be retrieved from the scales's memory:" + "The difference between load at `scale.zero()` and load at `scale.tare()` is stored in and can be retrieved from the scale's memory:" ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 7, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.855951Z", + "iopub.status.busy": "2026-03-30T12:01:11.855881Z", + "iopub.status.idle": "2026-03-30T12:01:11.858966Z", + "shell.execute_reply": "2026-03-30T12:01:11.858798Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-30 17:05:12,042 - pylabrobot - IO - [Mettler Toledo Scale] Sent command: TA\n", + "2026-03-30 17:05:12,042 - pylabrobot - IO - [Mettler Toledo Scale] Received response: TA A 50.00000 g\n" + ] + }, + { + "data": { + "text/plain": [ + "50.0" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "await scale.request_tare_weight()\n" + "await scale.request_tare_weight()" ] }, { @@ -193,29 +399,47 @@ "metadata": {}, "source": [ "\n", - "### `read_weight()`\n", + "### `.read_weight()`\n", "\n", "Retrieves the current weight measurement from the scale **in grams**." ] }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, + "execution_count": 8, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.859830Z", + "iopub.status.busy": "2026-03-30T12:01:11.859763Z", + "iopub.status.idle": "2026-03-30T12:01:11.861851Z", + "shell.execute_reply": "2026-03-30T12:01:11.861685Z" + } + }, "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-30 17:05:12,050 - pylabrobot - IO - [Mettler Toledo Scale] Sent command: SI\n", + "2026-03-30 17:05:12,051 - pylabrobot - IO - [Mettler Toledo Scale] Received response: SI S 0.01060 g\n" + ] + }, { "data": { "text/plain": [ - "0.00148" + "0.0106" ] }, - "execution_count": 4, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "await scale.read_weight(timeout=0)\n" + "if protocol_mode == \"simulation\":\n", + " backend.sample_weight = 0.0106 # simulate dispensing ~10 uL of liquid\n", + "\n", + "await scale.read_weight(timeout=0)" ] }, { @@ -230,38 +454,119 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], + "execution_count": 9, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.862744Z", + "iopub.status.busy": "2026-03-30T12:01:11.862691Z", + "iopub.status.idle": "2026-03-30T12:01:11.865675Z", + "shell.execute_reply": "2026-03-30T12:01:11.865501Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-30 17:05:12,056 - pylabrobot - IO - [Mettler Toledo Scale] Sent command: Z\n", + "2026-03-30 17:05:12,057 - pylabrobot - IO - [Mettler Toledo Scale] Received response: Z A \n", + "2026-03-30 17:05:12,058 - pylabrobot - IO - [Mettler Toledo Scale] Sent command: TC 5000\n", + "2026-03-30 17:05:12,058 - pylabrobot - IO - [Mettler Toledo Scale] Received response: TC S -0.01060 g\n", + "2026-03-30 17:05:12,059 - pylabrobot - IO - [Mettler Toledo Scale] Sent command: SC 5000\n", + "2026-03-30 17:05:12,059 - pylabrobot - IO - [Mettler Toledo Scale] Received response: SC S 0.01060 g\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dispensed 10.60 mg (10.00 uL)\n" + ] + } + ], "source": [ "import asyncio\n", "\n", - "# 1. Zero the scale\n", - "await scale.zero(timeout=\"stable\")\n", + "# 1. Place container with liquid on the weighing pan\n", + "if protocol_mode == \"simulation\":\n", + " backend.platform_weight = 45.0 # 45g container on weighing pan\n", + " backend.sample_weight = 1.06 # ~1 mL of liquid (1000 uL)\n", "\n", - "# 2. Place container with liquid on scale\n", + "# 2. Zero the scale (zeroes out container + liquid)\n", + "await scale.zero(timeout=\"stable\")\n", "\n", "# 3. Aspirate liquid from container (on scale)\n", "# (your liquid handling code here)\n", + "if protocol_mode == \"simulation\":\n", + " backend.sample_weight = 1.06 - 0.0106 # aspirated ~10 uL\n", "\n", "# 4. Tare the scale (ignore weight loss from aspiration)\n", "await scale.tare(timeout=5)\n", "\n", "# 5. Dispense liquid back into same container (on scale)\n", "# (your liquid handling code here)\n", + "if protocol_mode == \"simulation\":\n", + " backend.sample_weight = 1.06 # dispensed ~10 uL back\n", "\n", "# 6. Brief pause to allow scale to settle\n", - "await asyncio.sleep(1) # Allow 1 second for settling after dispense\n", + "if protocol_mode == \"execution\":\n", + " await asyncio.sleep(1)\n", "\n", "# 7. Read the weight of dispensed liquid\n", "weight_g = await scale.read_weight(timeout=5)\n", "\n", "# 8. Convert weight to volume\n", "weight_mg = weight_g * 1000\n", - "liquid_density = 1.06 # mg/µL for 50% v/v glycerol at ~25°C, 1 atm\n", + "liquid_density = 1.06 # mg/uL for 50% v/v glycerol at ~25C, 1 atm\n", "volume_uL = weight_mg / liquid_density\n", "\n", - "print(f\"Dispensed {weight_mg:.2f} mg or ({volume_uL:.2f} µL)\")\n" + "print(f\"Dispensed {weight_mg:.2f} mg ({volume_uL:.2f} uL)\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Temperature\n", + "\n", + "Read the current temperature from the scale's internal sensor. Useful for gravimetric\n", + "verification where temperature affects liquid density and evaporation rate." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.866619Z", + "iopub.status.busy": "2026-03-30T12:01:11.866564Z", + "iopub.status.idle": "2026-03-30T12:01:11.868851Z", + "shell.execute_reply": "2026-03-30T12:01:11.868707Z" + } + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-30 17:05:12,063 - pylabrobot - IO - [Mettler Toledo Scale] Sent command: M28\n", + "2026-03-30 17:05:12,064 - pylabrobot - IO - [Mettler Toledo Scale] Received response: M28 A 1 21.3\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Scale temperature: 21.3 C\n" + ] + } + ], + "source": [ + "if protocol_mode == \"simulation\":\n", + " backend.temperature = 21.3\n", + "\n", + "temp_c = await backend.measure_temperature()\n", + "print(f\"Scale temperature: {temp_c} C\")" ] }, { @@ -273,40 +578,88 @@ "\n", "#### Example: Measuring Read Time\n", "\n", - "You can easily benchmark the scale's performance using standard Python timing:" + "You can benchmark the scale's read latency using standard Python timing.\n", + "This section only runs in execution mode (requires a physical scale)." ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 11, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.872604Z", + "iopub.status.busy": "2026-03-30T12:01:11.872550Z", + "iopub.status.idle": "2026-03-30T12:01:11.874253Z", + "shell.execute_reply": "2026-03-30T12:01:11.874029Z" + } + }, + "outputs": [], + "source": [ + "if protocol_mode == \"execution\":\n", + " import time\n", + " import numpy as np\n", + "\n", + " times = []\n", + " for i in range(10):\n", + " t0 = time.monotonic_ns()\n", + " await scale.read_weight(timeout=\"stable\")\n", + " t1 = time.monotonic_ns()\n", + " times.append((t1 - t0) / 1e6)\n", + "\n", + " print(f\"{np.mean(times):.2f} ms +/- {np.std(times):.2f} ms\")" + ] + }, + { + "cell_type": "markdown", "metadata": {}, + "source": [ + "---\n", + "### Teardown\n", + "\n", + "The scale resets the device to a clean state before disconnecting. If the serial\n", + "port is already broken (e.g. unexpected disconnect), the reset is skipped gracefully.\n", + "\n", + "For scripts and automated protocols, use `async with` to guarantee cleanup even\n", + "if an error occurs:\n", + "\n", + "```python\n", + "async with Scale(name=\"scale\", backend=backend, size_x=0, size_y=0, size_z=0) as scale:\n", + " await scale.zero()\n", + " weight = await scale.read_weight()\n", + " # scale.stop() is called automatically, even on exceptions\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": { + "execution": { + "iopub.execute_input": "2026-03-30T12:01:11.875149Z", + "iopub.status.busy": "2026-03-30T12:01:11.875093Z", + "iopub.status.idle": "2026-03-30T12:01:11.876950Z", + "shell.execute_reply": "2026-03-30T12:01:11.876744Z" + } + }, "outputs": [ { - "name": "stdout", + "name": "stderr", "output_type": "stream", "text": [ - "100.44 ms ± 6.78 ms\n" + "2026-03-30 17:05:12,076 - pylabrobot - INFO - === MT Scale tutorial ended ===\n", + "2026-03-30 17:05:12,078 - pylabrobot - INFO - [Mettler Toledo Scale] Disconnected (simulation)\n" ] } ], "source": [ - "import time\n", - "import numpy as np\n", - "\n", - "times = []\n", - "for i in range(10):\n", - " t0 = time.monotonic_ns()\n", - " await scale.read_weight(timeout=\"stable\")\n", - " t1 = time.monotonic_ns()\n", - " times.append((t1 - t0) / 1e6)\n", - "\n", - "print(f\"{np.mean(times):.2f} ms ± {np.std(times):.2f} ms\")\n" + "plr_logger.info(\"=== MT Scale tutorial ended ===\")\n", + "await scale.stop()" ] } ], "metadata": { "kernelspec": { - "display_name": "env", + "display_name": "plr", "language": "python", "name": "python3" }, @@ -320,9 +673,9 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.12" + "version": "3.12.11" } }, "nbformat": 4, "nbformat_minor": 2 -} +} \ No newline at end of file diff --git a/pylabrobot/scales/__init__.py b/pylabrobot/scales/__init__.py index 5e798de1388..e82615e20f3 100644 --- a/pylabrobot/scales/__init__.py +++ b/pylabrobot/scales/__init__.py @@ -1,7 +1,7 @@ -from pylabrobot.scales.chatterbox import ScaleChatterboxBackend -from pylabrobot.scales.mettler_toledo_backend import ( - MettlerToledoWXS205SDU, - MettlerToledoWXS205SDUBackend, -) +"""PyLabRobot scales package - frontend, backends, and error types.""" + +from pylabrobot.scales.chatterbox import ScaleChatterboxBackend # backwards compat +from pylabrobot.scales.mettler_toledo import MettlerToledoError, MettlerToledoWXS205SDUBackend from pylabrobot.scales.scale import Scale from pylabrobot.scales.scale_backend import ScaleBackend +from pylabrobot.scales.simulator import ScaleSimulator diff --git a/pylabrobot/scales/chatterbox.py b/pylabrobot/scales/chatterbox.py index 9ffb5a68e05..4cb22ea4810 100644 --- a/pylabrobot/scales/chatterbox.py +++ b/pylabrobot/scales/chatterbox.py @@ -1,24 +1,4 @@ -from pylabrobot.scales.scale_backend import ScaleBackend +"""Backwards-compatible import shim. Use pylabrobot.scales.simulator instead.""" - -class ScaleChatterboxBackend(ScaleBackend): - """Chatter box backend for device-free testing. Prints out all operations.""" - - def __init__(self, dummy_weight: float = 0.0) -> None: - self._dummy_weight = dummy_weight - - async def setup(self) -> None: - print("Setting up the scale.") - - async def stop(self) -> None: - print("Stopping the scale.") - - async def tare(self): - print("Taring the scale") - - async def read_weight(self) -> float: - print("Reading the weight") - return self._dummy_weight - - async def zero(self): - print("Zeroing the scale") +# TODO: remove after 2026-09 +from pylabrobot.scales.simulator import ScaleSimulator as ScaleChatterboxBackend # noqa: F401 diff --git a/pylabrobot/scales/mettler_toledo/__init__.py b/pylabrobot/scales/mettler_toledo/__init__.py new file mode 100644 index 00000000000..8e5b02db052 --- /dev/null +++ b/pylabrobot/scales/mettler_toledo/__init__.py @@ -0,0 +1,8 @@ +"""Mettler Toledo scale backend using the MT-SICS protocol.""" + +from pylabrobot.scales.mettler_toledo.backend import ( + MettlerToledoResponse, + MettlerToledoWXS205SDUBackend, +) +from pylabrobot.scales.mettler_toledo.errors import MettlerToledoError +from pylabrobot.scales.mettler_toledo.simulator import MettlerToledoSICSSimulator diff --git a/pylabrobot/scales/mettler_toledo/backend.py b/pylabrobot/scales/mettler_toledo/backend.py new file mode 100644 index 00000000000..2accae392bc --- /dev/null +++ b/pylabrobot/scales/mettler_toledo/backend.py @@ -0,0 +1,1111 @@ +"""Mettler Toledo scale backend using the MT-SICS (Mettler Toledo Standard Interface Command Set) serial protocol.""" + +# similar library: https://github.com/janelia-pypi/mettler_toledo_device_python + +import asyncio +import functools +import inspect +import logging +import shlex +import time +from dataclasses import dataclass, field +from typing import Any, Callable, List, Literal, Optional, Set, TypeVar, Union + +from pylabrobot.io.serial import Serial +from pylabrobot.io.validation_utils import LOG_LEVEL_IO +from pylabrobot.scales.mettler_toledo.confirmed_firmware_versions import CONFIRMED_FIRMWARE_VERSIONS +from pylabrobot.scales.mettler_toledo.errors import MettlerToledoError +from pylabrobot.scales.scale_backend import ScaleBackend + +logger = logging.getLogger("pylabrobot") + + +@dataclass +class MettlerToledoResponse: + """A single parsed MT-SICS response line. + + Format: [ ...] CR LF + See protocol.md for full format description. + """ + + command: str + status: str + data: List[str] = field(default_factory=list) + + +F = TypeVar("F", bound=Callable[..., Any]) + + +def requires_mt_sics_command(mt_sics_command: str) -> Callable[[F], F]: + """Decorator that gates a method on the connected device supporting a specific MT-SICS command. + + During setup(), the backend queries I0 to discover the full list of implemented commands. + Methods decorated with a command not in that list will raise MettlerToledoError. + + I0 is the definitive source of command support - I1 only reports which standardized + level sets are fully implemented, but individual commands may exist outside those levels. + """ + + def decorator(func: F) -> F: + @functools.wraps(func) + async def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: + if hasattr(self, "_supported_commands") and self._supported_commands is not None: + if mt_sics_command not in self._supported_commands: + raise MettlerToledoError( + title="Command not supported", + message=f"'{func.__name__}' requires MT-SICS command '{mt_sics_command}', " + f"which is not implemented on this device.", + ) + return await func(self, *args, **kwargs) + + # Register in the class-level command-to-method mapping. + _MT_SICS_COMMAND_REGISTRY[func.__name__] = mt_sics_command + return wrapper # type: ignore[return-value] + + return decorator + + +# Maps method name -> MT-SICS command string, populated by @requires_mt_sics_command. +_MT_SICS_COMMAND_REGISTRY: dict[str, str] = {} + + +# TODO: rename to MTSICSDriver in v1.0.0-beta +class MettlerToledoWXS205SDUBackend(ScaleBackend): + """Backend for Mettler Toledo scales using the MT-SICS protocol. + + MT-SICS (Mettler Toledo Standard Interface Command Set) is the serial communication + protocol used by Mettler Toledo's Automated Precision Weigh Modules. This backend is + compatible with any MT-SICS device, including the WXS, WMS, and WX series. + + During setup(), the backend queries I0 to discover which commands the connected device + supports, then queries I1/I2/I4 for device identity. Methods decorated with + ``@requires_mt_sics_command`` will raise ``MettlerToledoError`` if the required command + is not in the device's I0 command list. + + Tested on the WXS205SDU (used by Hamilton in the Liquid Verification Kit). + + Spec: https://web.archive.org/web/20240208213802/https://www.mt.com/dam/ + product_organizations/industry/apw/generic/11781363_N_MAN_RM_MT-SICS_APW_en.pdf + + From the spec (Section 2.2): + "If several commands are sent in succession without waiting for the corresponding + responses, it is possible that the weigh module/balance confuses the sequence of + command processing or ignores entire commands." + """ + + # === Constructor === + + def __init__(self, port: Optional[str] = None, vid: int = 0x0403, pid: int = 0x6001): + """Create a new MT-SICS backend. + + Args: + port: Serial port path. If None, auto-detected by VID:PID. + vid: USB vendor ID (default 0x0403 = FTDI). + pid: USB product ID (default 0x6001 = FT232R). + """ + super().__init__() + + self.io = Serial( + human_readable_device_name="Mettler Toledo Scale", + port=port, + vid=vid, + pid=pid, + baudrate=9600, + timeout=1, + ) + + async def setup(self) -> None: + """Connect to the scale, reset to clean state, discover identity and supported commands.""" + await self.io.setup() + + # Reset device to clean state (spec Section 2.2) + # reset() clears the input buffer and sends @, which returns the serial number + self.serial_number = await self.reset() + + # Discover supported commands via I0 (the definitive source per spec Section 2.2) + self._supported_commands: Set[str] = await self._request_supported_commands() + + # Device identity (Level 0 - always available) + # Note: device_type and capacity both use I2 but are separate methods intentionally - + # single-responsibility per method, the duplicate I2 round-trip during one-time setup is fine. + self.device_type = await self.request_device_type() + self.capacity = await self.request_capacity() + + # Firmware version and configuration + self.firmware_version = await self.request_firmware_version() + # I2 device_type encodes the configuration: "WXS205SDU WXA-Bridge" = bridge only + self.configuration = "Bridge" if "Bridge" in self.device_type else "Balance" + + logger.info( + "[%s] Connected on %s\n" + "Device type: %s\n" + "Configuration: %s\n" + "Serial number: %s\n" + "Firmware: %s\n" + "Capacity: %.1f g\n" + "Supported commands (%d): %s", + self.io._human_readable_device_name, + self.io.port, + self.device_type, + self.configuration, + self.serial_number, + self.firmware_version, + self.capacity, + len(self._supported_commands), + ", ".join(sorted(self._supported_commands)), + ) + + # Check major.minor version only (TDNR varies by hardware revision) + fw_version_short = self.firmware_version.split()[0] if self.firmware_version else "" + if fw_version_short not in CONFIRMED_FIRMWARE_VERSIONS: + logger.warning( + "[%s] Firmware version %r has not been tested with this driver. " + "Confirmed versions: %s. " + "If this version works correctly, please contribute it to " + "confirmed_firmware_versions.py so others can benefit.", + self.io._human_readable_device_name, + self.firmware_version, + ", ".join(sorted(CONFIRMED_FIRMWARE_VERSIONS)), + ) + + # Set output unit to grams + if "M21" in self._supported_commands: + await self.set_host_unit_grams() + + async def stop(self) -> None: + """Reset the device to a clean state and close the serial connection. + + Sends @ to cancel any pending commands before disconnecting. If the + serial port is already broken (e.g. kernel crash), the reset is skipped + and the port is closed anyway. + """ + try: + await self.reset() + except (OSError, TimeoutError, MettlerToledoError): + logger.warning( + "[%s] Could not reset device before disconnecting", self.io._human_readable_device_name + ) + logger.info("[%s] Disconnected from %s", self.io._human_readable_device_name, self.io.port) + await self.io.stop() + + def serialize(self) -> dict: + return {**super().serialize(), "port": self.io.port} + + # === Device discovery === + + async def _request_supported_commands(self) -> Set[str]: + """Query all implemented MT-SICS commands via I0 (Level 0 - always available). + + I0 is the definitive source of command support per spec Section 2.2. + I1 only reports which standardized level sets are fully implemented, + but individual commands may exist outside those levels. + + Returns a set of MT-SICS command strings (e.g. {"@", "S", "SI", "Z", "M21", "M28"}). + """ + responses = await self.send_command("I0") + commands: Set[str] = set() + for resp in responses: + # Format: I0 B/A + if len(resp.data) >= 2: + commands.add(resp.data[1]) + return commands + + def request_supported_methods(self) -> List[str]: + """Return the names of all backend methods supported by the connected device. + + Uses the I0 command list (populated during setup) to determine which + decorated methods can be called without raising MettlerToledoError. + Undecorated methods (Level 0/1) are always included. + """ + supported: List[str] = [] + for name in sorted(inspect.getmembers(self, predicate=inspect.ismethod), key=lambda m: m[0]): + method_name = name[0] + if method_name.startswith("_"): + continue + if method_name in _MT_SICS_COMMAND_REGISTRY: + mt_cmd = _MT_SICS_COMMAND_REGISTRY[method_name] + if hasattr(self, "_supported_commands") and mt_cmd in self._supported_commands: + supported.append(method_name) + else: + supported.append(method_name) + return supported + + # === Response parsing === + + @staticmethod + def _validate_response(response: MettlerToledoResponse, min_fields: int, command: str) -> None: + """Validate that a parsed response has the expected minimum total field count. + + min_fields counts all fields (command + status + data). For example, + a weight response "S S 0.00006 g" has 4 fields total. + + Raises: + MettlerToledoError: if the response has fewer fields than expected. + """ + total = 1 + (1 if response.status else 0) + len(response.data) + if total < min_fields: + raise MettlerToledoError( + title="Unexpected response", + message=f"Expected at least {min_fields} fields for '{command}', got {total}: {response}", + ) + + @staticmethod + def _validate_unit(unit: str, command: str) -> None: + """Validate that the unit in a response is grams. + + Raises: + MettlerToledoError: if the unit is not 'g'. + """ + if unit != "g": + raise MettlerToledoError( + title="Unexpected unit", + message=f"Expected 'g' for '{command}', got '{unit}'", + ) + + def _parse_basic_errors(self, response: MettlerToledoResponse) -> None: + """Helper function for parsing basic errors that are common to many commands. If an error is + detected, a 'MettlerToledoError' exception is raised. + + Error commands (ES, ET, EL) have status="" and no data. + Status codes I, L, +, - indicate command-specific errors. + + Note: B status (multi-response) is handled by send_command, which reads all lines + until status A. Each line is validated through this method individually. + """ + + # General error messages: ES, ET, EL (status is "" for these) + if response.command == "ES": + raise MettlerToledoError.syntax_error() + if response.command == "ET": + raise MettlerToledoError.transmission_error() + if response.command == "EL": + raise MettlerToledoError.logical_error() + + # Status code errors + if response.status == "I": + raise MettlerToledoError.executing_another_command() + if response.status == "L": + raise MettlerToledoError.incorrect_parameter() + if response.status == "+": + raise MettlerToledoError.overload() + if response.status == "-": + raise MettlerToledoError.underload() + + # Weight response error: S S Error + if ( + response.command == "S" + and response.status == "S" + and len(response.data) >= 2 + and response.data[0] == "Error" + ): + error_code = response.data[1] + code, source = error_code[:-1], error_code[-1] + from_terminal = source == "t" + if code == "1": + raise MettlerToledoError.boot_error(from_terminal=from_terminal) + if code == "2": + raise MettlerToledoError.brand_error(from_terminal=from_terminal) + if code == "3": + raise MettlerToledoError.checksum_error(from_terminal=from_terminal) + if code == "9": + raise MettlerToledoError.option_fail(from_terminal=from_terminal) + if code == "10": + raise MettlerToledoError.eeprom_error(from_terminal=from_terminal) + if code == "11": + raise MettlerToledoError.device_mismatch(from_terminal=from_terminal) + if code == "12": + raise MettlerToledoError.hot_plug_out(from_terminal=from_terminal) + if code == "14": + raise MettlerToledoError.weight_module_electronic_mismatch(from_terminal=from_terminal) + if code == "15": + raise MettlerToledoError.adjustment_needed(from_terminal=from_terminal) + raise MettlerToledoError( + title="Unknown weight error", + message=f"Unrecognized error code '{error_code}' in weight response", + ) + + # === Command Layer === + + async def send_command(self, command: str, timeout: int = 60) -> List[MettlerToledoResponse]: + """Send a command to the scale and read all response lines. + + Single-response commands (status A) return a list of one parsed line. + Multi-response commands (status B) return all lines, reading until status A. + + Args: + timeout: The timeout in seconds (applies across all response lines). + """ + + logger.log(LOG_LEVEL_IO, "[%s] Sent command: %s", self.io._human_readable_device_name, command) + await self.io.write(command.encode() + b"\r\n") + + try: + responses: List[MettlerToledoResponse] = [] + timeout_time = time.time() + timeout + while True: + while True: + raw_response = await self.io.readline() + if raw_response != b"": + break + if time.time() > timeout_time: + raise TimeoutError("Timeout while waiting for response from scale.") + await asyncio.sleep(0.001) + + logger.log( + LOG_LEVEL_IO, + "[%s] Received response: %s", + self.io._human_readable_device_name, + raw_response, + ) + fields = shlex.split(raw_response.decode("utf-8").strip()) + if len(fields) >= 2: + response = MettlerToledoResponse(command=fields[0], status=fields[1], data=fields[2:]) + elif len(fields) == 1: + response = MettlerToledoResponse(command=fields[0], status="", data=[]) + else: + response = MettlerToledoResponse(command="", status="", data=[]) + self._parse_basic_errors(response) + responses.append(response) + + # Status B means more responses follow; anything else (A, etc.) is final + if response.status != "B": + break + + return responses + + except (KeyboardInterrupt, asyncio.CancelledError): + # Cancel pending commands without resetting device state (zero/tare). + # Use C (cancel all) if available; otherwise just flush the buffer. + # Never send @ here - it clears zero/tare which the user wants to keep. + if hasattr(self, "_supported_commands") and "C" in self._supported_commands: + logger.warning( + "[%s] Command interrupted, sending C to cancel pending commands", + self.io._human_readable_device_name, + ) + await self.io.write(b"C\r\n") + else: + logger.warning( + "[%s] Command interrupted, flushing serial buffer", + self.io._human_readable_device_name, + ) + await self.io.reset_input_buffer() + raise + + # === Public API === + # Organized by function: cancel, identity, zero, tare, weight, measurement, + # configuration (read), display, configuration (write). + + # # Reset and cancel # # + + async def reset(self) -> str: + """@ - Reset the device to a determined state (spec Section 2.2). + + Equivalent to a power cycle: empties volatile memories, resets key control + to default. Tare memory is NOT reset. Always executed, even when busy. + + Returns the serial number from the I4-style response. + """ + await self.io.reset_input_buffer() + responses = await self.send_command("@") + # @ responds with I4-style: I4 A "" + self._validate_response(responses[0], 3, "@") + return responses[0].data[0] + + @requires_mt_sics_command("C") + async def cancel_all(self) -> None: + """C - Cancel all active and pending interface commands. + + Unlike reset() (@), this does not reset the device - it only cancels + commands that were requested via this interface. Typically used to stop + repeating commands (SIR, SR) or abort adjustment procedures. + + This is a multi-response command: the device sends C B (started) then + C A (complete). Both responses are consumed to keep the serial buffer clean. + """ + responses = await self.send_command("C") + # send_command reads both C B (started) and C A (complete) automatically + self._validate_response(responses[0], 2, "C") + if responses[0].status == "E": + raise MettlerToledoError( + title="Error while canceling", + message=f"C command returned error: {responses[0]}", + ) + + # # Device identity # # + + async def request_serial_number(self) -> str: + """Get the serial number of the scale. (I4 command)""" + responses = await self.send_command("I4") + self._validate_response(responses[0], 3, "I4") + return responses[0].data[0] + + async def request_device_type(self) -> str: + """Query the device type string. (I2 command) + + The I2 response packs type, capacity, and unit into a single quoted string: + I2 A "WXS205SDU WXA-Bridge 220.00900 g" + The type is everything before the last two tokens (capacity and unit). + """ + responses = await self.send_command("I2") + self._validate_response(responses[0], 3, "I2") + parts = responses[0].data[0].split() + return " ".join(parts[:-2]) + + async def request_capacity(self) -> float: + """Query the maximum weighing capacity in grams. (I2 command) + + The I2 response packs type, capacity, and unit into a single quoted string: + I2 A "WXS205SDU WXA-Bridge 220.00900 g" + Capacity is the second-to-last token, unit is the last. + """ + responses = await self.send_command("I2") + self._validate_response(responses[0], 3, "I2") + parts = responses[0].data[0].split() + self._validate_unit(parts[-1], "I2") + return float(parts[-2]) + + async def request_firmware_version(self) -> str: + """Query the firmware version and type definition number. (I3 command) + + Returns the version string (e.g. "1.10 18.6.4.1361.772"). + For bridge mode (no terminal), returns the bridge firmware version. + """ + responses = await self.send_command("I3") + self._validate_response(responses[0], 3, "I3") + return responses[0].data[0] + + async def request_software_material_number(self) -> str: + """Query the software material number (SW-ID). (I5 command) + + Unique per software release: 8-digit number + alphabetic index. + For bridge mode (no terminal), returns the bridge SW-ID. + """ + responses = await self.send_command("I5") + self._validate_response(responses[0], 3, "I5") + return responses[0].data[0] + + @requires_mt_sics_command("I10") + async def request_device_id(self) -> str: + """Query the user-assigned device identification string. (I10 command) + + This is a user-configurable name (max 20 chars) to identify + individual scales in multi-scale setups. Retained after @ cancel. + """ + responses = await self.send_command("I10") + self._validate_response(responses[0], 3, "I10") + return responses[0].data[0] + + @requires_mt_sics_command("I10") + async def set_device_id(self, device_id: str) -> None: + """Set the user-assigned device identification string. (I10 command) + + Max 20 alphanumeric characters. Persists across power cycles. + Useful for labeling individual scales in multi-scale setups. + """ + await self.send_command(f'I10 "{device_id}"') + + @requires_mt_sics_command("I11") + async def request_model_designation(self) -> str: + """Query the model designation string. (I11 command) + + Returns the weigh module model type (e.g. "WMS404C-L/10"). + Abbreviations: DR=Delta Range, DU=Dual Range, /M or /A=Approved. + """ + responses = await self.send_command("I11") + self._validate_response(responses[0], 3, "I11") + return responses[0].data[0] + + @requires_mt_sics_command("I14") + async def request_device_info(self, category: int = 0) -> List[MettlerToledoResponse]: + """Query detailed device information for a specific category. (I14 command) + + Args: + category: Information category to query: + 0 = instrument configuration (Bridge, Terminal, Option) + 1 = instrument descriptions (model names) + 2 = SW identification numbers + 3 = SW versions + 4 = serial numbers + 5 = TDNR (type definition) numbers + + Returns multi-response with data for each component (bridge, terminal, etc.). + """ + return await self.send_command(f"I14 {category}") + + @requires_mt_sics_command("I15") + async def request_uptime_minutes(self) -> int: + """Query the uptime in minutes since last start or restart. (I15 command) + + Returns the number of minutes the device has been running since + the last power-on, start, or reset. Accuracy +/- 5%. + """ + responses = await self.send_command("I15") + self._validate_response(responses[0], 3, "I15") + return int(responses[0].data[0]) + + @requires_mt_sics_command("DAT") + async def request_date(self) -> str: + """Query the current date from the device. (DAT command) + + Response format: DAT A . + Returns the date as "DD.MM.YYYY". + """ + responses = await self.send_command("DAT") + self._validate_response(responses[0], 5, "DAT") + day, month, year = responses[0].data[0], responses[0].data[1], responses[0].data[2] + return f"{day}.{month}.{year}" + + @requires_mt_sics_command("DAT") + async def set_date(self, day: int, month: int, year: int) -> None: + """Set the device date. (DAT command) + + Args: + day: Day (1-31). + month: Month (1-12). + year: Year (2020-2099, platform-dependent). + """ + await self.send_command(f"DAT {day:02d} {month:02d} {year}") + + @requires_mt_sics_command("TIM") + async def request_time(self) -> str: + """Query the current time from the device. (TIM command) + + Response format: TIM A . + Returns the time as "HH:MM:SS". + """ + responses = await self.send_command("TIM") + self._validate_response(responses[0], 5, "TIM") + hour, minute, second = responses[0].data[0], responses[0].data[1], responses[0].data[2] + return f"{hour}:{minute}:{second}" + + @requires_mt_sics_command("TIM") + async def set_time(self, hour: int, minute: int, second: int) -> None: + """Set the device time. (TIM command) + + Persists across power cycles. Only reset via FSET or terminal menu, not @. + + Args: + hour: Hour (0-23). + minute: Minute (0-59). + second: Second (0-59). + """ + await self.send_command(f"TIM {hour:02d} {minute:02d} {second:02d}") + + @requires_mt_sics_command("I16") + async def request_next_service_date(self) -> str: + """Query the date when the balance is next due to be serviced. (I16 command) + + Returns the date as "DD.MM.YYYY". + """ + responses = await self.send_command("I16") + self._validate_response(responses[0], 5, "I16") + day, month, year = responses[0].data[0], responses[0].data[1], responses[0].data[2] + return f"{day}.{month}.{year}" + + @requires_mt_sics_command("I21") + async def request_assortment_type_revision(self) -> str: + """Query the revision of assortment type tolerances. (I21 command)""" + responses = await self.send_command("I21") + self._validate_response(responses[0], 3, "I21") + return responses[0].data[0] + + @requires_mt_sics_command("I26") + async def request_operating_mode_after_restart(self) -> List[MettlerToledoResponse]: + """Query the operating mode after restart. (I26 command)""" + return await self.send_command("I26") + + # # Zero # # + + async def zero_immediately(self) -> List[MettlerToledoResponse]: + """Zero the scale immediately. (ZI command)""" + return await self.send_command("ZI") + + async def zero_stable(self) -> List[MettlerToledoResponse]: + """Zero the scale when the weight is stable. (Z command)""" + return await self.send_command("Z") + + @requires_mt_sics_command("ZC") + async def zero_timeout(self, timeout: float) -> List[MettlerToledoResponse]: + """Zero the scale after a given timeout. (ZC command)""" + timeout_ms = int(timeout * 1000) + return await self.send_command(f"ZC {timeout_ms}") + + async def zero(self, timeout: Union[Literal["stable"], float, int] = "stable") -> None: + """Zero the scale. + + Args: + timeout: "stable" waits for stable reading, 0 zeros immediately, + float/int zeros after that many seconds. + """ + if timeout == "stable": + await self.zero_stable() + elif not isinstance(timeout, (float, int)): + raise TypeError("timeout must be a float or 'stable'") + elif timeout < 0: + raise ValueError("timeout must be greater than or equal to 0") + elif timeout == 0: + await self.zero_immediately() + else: + await self.zero_timeout(timeout) + + # # Tare # # + + async def tare_stable(self) -> List[MettlerToledoResponse]: + """Tare the scale when the weight is stable. (T command)""" + return await self.send_command("T") + + async def tare_immediately(self) -> List[MettlerToledoResponse]: + """Tare the scale immediately. (TI command)""" + return await self.send_command("TI") + + @requires_mt_sics_command("TC") + async def tare_timeout(self, timeout: float) -> List[MettlerToledoResponse]: + """Tare the scale after a given timeout. (TC command)""" + timeout_ms = int(timeout * 1000) + return await self.send_command(f"TC {timeout_ms}") + + async def tare(self, timeout: Union[Literal["stable"], float, int] = "stable") -> None: + """Tare the scale. + + Args: + timeout: "stable" waits for stable reading, 0 tares immediately, + float/int tares after that many seconds. + """ + if timeout == "stable": + await self.tare_stable() + elif not isinstance(timeout, (float, int)): + raise TypeError("timeout must be a float or 'stable'") + elif timeout < 0: + raise ValueError("timeout must be greater than or equal to 0") + elif timeout == 0: + await self.tare_immediately() + else: + await self.tare_timeout(timeout) + + async def request_tare_weight(self) -> float: + """Query tare weight value from scale's memory. (TA command)""" + responses = await self.send_command("TA") + self._validate_response(responses[0], 4, "TA") + self._validate_unit(responses[0].data[1], "TA") + return float(responses[0].data[0]) + + async def clear_tare(self) -> List[MettlerToledoResponse]: + """Clear tare weight value. (TAC command)""" + return await self.send_command("TAC") + + # # Weight measurement # # + + async def read_stable_weight(self) -> float: + """Read a stable weight value from the scale. (MEASUREMENT command) + + from the docs: + + "Use S to send a stable weight value, along with the host unit, from the balance to + the connected communication partner via the interface. If the automatic door function + is enabled and a stable weight is requested the balance will open and close the balance's + doors to achieve a stable weight." + """ + + responses = await self.send_command("S") + self._validate_response(responses[0], 4, "S") + self._validate_unit(responses[0].data[1], "S") + return float(responses[0].data[0]) + + @requires_mt_sics_command("SC") + async def read_dynamic_weight(self, timeout: float) -> float: + """Read a stable weight value within a given timeout, or return the current + weight value if stability is not reached. (SC command) + + Args: + timeout: The timeout in seconds. + """ + timeout_ms = int(timeout * 1000) + responses = await self.send_command(f"SC {timeout_ms}") + self._validate_response(responses[0], 4, "SC") + self._validate_unit(responses[0].data[1], "SC") + return float(responses[0].data[0]) + + async def read_weight_value_immediately(self) -> float: + """Read a weight value immediately from the scale. (SI command)""" + responses = await self.send_command("SI") + self._validate_response(responses[0], 4, "SI") + self._validate_unit(responses[0].data[1], "SI") + return float(responses[0].data[0]) + + async def read_weight(self, timeout: Union[Literal["stable"], float, int] = "stable") -> float: + """High level function to read a weight value from the scale. (MEASUREMENT command) + + Args: + timeout: The timeout in seconds. If "stable", the scale will return a weight value when the + weight is stable. If 0, the scale will return a weight value immediately. If a float/int, + the scale will return a weight value after the given timeout (in seconds). + """ + + if timeout == "stable": + return await self.read_stable_weight() + + if not isinstance(timeout, (float, int)): + raise TypeError("timeout must be a float or 'stable'") + + if timeout < 0: + raise ValueError("timeout must be greater than or equal to 0") + + if timeout == 0: + return await self.read_weight_value_immediately() + + return await self.read_dynamic_weight(timeout) + + @requires_mt_sics_command("M28") + async def measure_temperature(self) -> float: + """Read the current temperature from the scale's internal sensor in degrees C. (M28 command) + + The number of temperature sensors depends on the product. This method returns + the value from the first sensor. Useful for gravimetric verification where + temperature affects liquid density and evaporation rate. + """ + responses = await self.send_command("M28") + self._validate_response(responses[0], 4, "M28") + return float(responses[0].data[1]) + + @requires_mt_sics_command("SIS") + async def request_net_weight_with_status(self) -> MettlerToledoResponse: + """Query net weight with unit and weighing status in one call. (SIS command) + + Response data fields: + + - data[0] = State: 0=stable, 1=dynamic, 2=stable inaccurate (MinWeigh), 3=dynamic inaccurate, 4=overload, 5=underload, 6=error + - data[1] = Net weight value + - data[2] = Unit code: 0=g, 1=kg, 3=mg, 4=ug, 5=ct, 7=lb, 8=oz, etc. + - data[3] = Readability (number of decimal places, 0-6) + - data[4] = Step: 1, 2, 5, 10, 20, 50, or 100 + - data[5] = Approval: 0=standard (not approved), 1=e=d, 10=e=10d, 100=e=100d, -1=unapproved + - data[6] = Info: 0=without tare, 1=net with weighed tare, 2=net with stored tare + """ + responses = await self.send_command("SIS") + return responses[0] + + @requires_mt_sics_command("SNR") + async def read_stable_weight_repeat_on_change(self) -> List[MettlerToledoResponse]: + """Start sending stable weight values on every stable weight change. (SNR command) + + The device sends a new value each time the weight changes and stabilizes. + Use reset() to stop. + """ + return await self.send_command("SNR") + + # # Device configuration (read-only) # # + + @requires_mt_sics_command("M01") + async def request_weighing_mode(self) -> int: + """Query the current weighing mode. (M01 command) + + Returns: 0=Normal/Universal, 1=Dosing, 2=Sensor, 3=Check weighing, 6=Raw/No filter. + """ + responses = await self.send_command("M01") + self._validate_response(responses[0], 3, "M01") + return int(responses[0].data[0]) + + # @requires_mt_sics_command("M01") + # async def set_weighing_mode(self, mode: int) -> None: + # """Set weighing mode. (M01) WRITES TO DEVICE MEMORY.""" + # await self.send_command(f"M01 {mode}") + + @requires_mt_sics_command("M02") + async def request_environment_condition(self) -> int: + """Query the current environment condition setting. (M02 command) + + Returns: 0=Very stable, 1=Stable, 2=Standard, 3=Unstable, 4=Very unstable, 5=Automatic. + Affects the scale's internal filter and stability detection. + """ + responses = await self.send_command("M02") + self._validate_response(responses[0], 3, "M02") + return int(responses[0].data[0]) + + # @requires_mt_sics_command("M02") + # async def set_environment_condition(self, condition: int) -> None: + # """Set environment condition. (M02) WRITES TO DEVICE MEMORY.""" + # await self.send_command(f"M02 {condition}") + + @requires_mt_sics_command("M03") + async def request_auto_zero(self) -> int: + """Query the current auto zero setting. (M03 command) + + Returns: 0=off, 1=on. Auto zero compensates for slow drift + (e.g. evaporation, temperature changes) by automatically + re-zeroing when the weight is near zero and stable. + """ + responses = await self.send_command("M03") + self._validate_response(responses[0], 3, "M03") + return int(responses[0].data[0]) + + # @requires_mt_sics_command("M03") + # async def set_auto_zero(self, enabled: int) -> None: + # """Set auto zero. (M03) WRITES TO DEVICE MEMORY.""" + # await self.send_command(f"M03 {enabled}") + + @requires_mt_sics_command("M17") + async def request_profact_time_criteria(self) -> List[MettlerToledoResponse]: + """Query ProFACT single time criteria. (M17 command)""" + return await self.send_command("M17") + + # @requires_mt_sics_command("M17") + # async def set_profact_time_criteria(self, ...) -> None: + # """Set ProFACT time criteria. (M17) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("M18") + async def request_profact_temperature_criterion(self) -> List[MettlerToledoResponse]: + """Query ProFACT/FACT temperature criterion. (M18 command)""" + return await self.send_command("M18") + + # @requires_mt_sics_command("M18") + # async def set_profact_temperature_criterion(self, ...) -> None: + # """Set ProFACT temperature criterion. (M18) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("M19") + async def request_adjustment_weight(self) -> List[MettlerToledoResponse]: + """Query the adjustment weight setting. (M19 command)""" + return await self.send_command("M19") + + # @requires_mt_sics_command("M19") + # async def set_adjustment_weight(self, ...) -> None: + # """Set adjustment weight. (M19) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("M20") + async def request_test_weight(self) -> List[MettlerToledoResponse]: + """Query the test weight setting. (M20 command)""" + return await self.send_command("M20") + + # @requires_mt_sics_command("M20") + # async def set_test_weight(self, ...) -> None: + # """Set test weight. (M20) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("M29") + async def request_weighing_value_release(self) -> List[MettlerToledoResponse]: + """Query the weighing value release setting. (M29 command)""" + return await self.send_command("M29") + + # @requires_mt_sics_command("M29") + # async def set_weighing_value_release(self, ...) -> None: + # """Set weighing value release. (M29) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("M31") + async def request_operating_mode(self) -> List[MettlerToledoResponse]: + """Query the operating mode after restart. (M31 command)""" + return await self.send_command("M31") + + # @requires_mt_sics_command("M31") + # async def set_operating_mode(self, ...) -> None: + # """Set operating mode after restart. (M31) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("M32") + async def request_profact_time(self) -> List[MettlerToledoResponse]: + """Query ProFACT time criteria. (M32 command)""" + return await self.send_command("M32") + + # @requires_mt_sics_command("M32") + # async def set_profact_time(self, ...) -> None: + # """Set ProFACT time. (M32) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("M33") + async def request_profact_day(self) -> List[MettlerToledoResponse]: + """Query ProFACT day of the week. (M33 command)""" + return await self.send_command("M33") + + # @requires_mt_sics_command("M33") + # async def set_profact_day(self, ...) -> None: + # """Set ProFACT day of the week. (M33) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("M35") + async def request_zeroing_mode(self) -> List[MettlerToledoResponse]: + """Query the zeroing mode at startup. (M35 command)""" + return await self.send_command("M35") + + # @requires_mt_sics_command("M35") + # async def set_zeroing_mode(self, ...) -> None: + # """Set zeroing mode at startup. (M35) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("UPD") + async def request_update_rate(self) -> float: + """Query the current update rate for SIR/SIRU streaming. (UPD command) + + Returns the update rate in values per second. + """ + responses = await self.send_command("UPD") + self._validate_response(responses[0], 3, "UPD") + return float(responses[0].data[0]) + + # @requires_mt_sics_command("UPD") + # async def set_update_rate(self, rate: float) -> None: + # """Set streaming update rate. (UPD) WRITES TO DEVICE MEMORY.""" + # await self.send_command(f"UPD {rate}") + + @requires_mt_sics_command("C0") + async def request_adjustment_setting(self) -> List[MettlerToledoResponse]: + """Query the current adjustment setting. (C0 command)""" + return await self.send_command("C0") + + # @requires_mt_sics_command("C0") + # async def set_adjustment_setting(self, ...) -> None: + # """Set adjustment setting. (C0) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("COM") + async def request_serial_parameters(self) -> List[MettlerToledoResponse]: + """Query current serial interface parameters. (COM command)""" + return await self.send_command("COM") + + # @requires_mt_sics_command("COM") + # async def set_serial_parameters(self, ...) -> None: + # """Set serial port parameters. (COM) WRITES TO DEVICE MEMORY. + # WARNING: changing baud rate will lose communication.""" + # ... + + @requires_mt_sics_command("FCUT") + async def request_filter_cutoff(self) -> List[MettlerToledoResponse]: + """Query the filter cut-off frequency. (FCUT command)""" + return await self.send_command("FCUT") + + # @requires_mt_sics_command("FCUT") + # async def set_filter_cutoff(self, frequency: float) -> None: + # """Set filter cut-off frequency. (FCUT) WRITES TO DEVICE MEMORY.""" + # await self.send_command(f"FCUT {frequency}") + + @requires_mt_sics_command("USTB") + async def request_stability_criteria(self) -> List[MettlerToledoResponse]: + """Query the user-defined stability criteria. (USTB command)""" + return await self.send_command("USTB") + + # @requires_mt_sics_command("USTB") + # async def set_stability_criteria(self, ...) -> None: + # """Set stability criteria. (USTB) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("TST0") + async def request_test_settings(self) -> List[MettlerToledoResponse]: + """Query current test function settings. (TST0 command)""" + return await self.send_command("TST0") + + # @requires_mt_sics_command("TST0") + # async def set_test_settings(self, ...) -> None: + # """Set test function settings. (TST0) WRITES TO DEVICE MEMORY.""" + # ... + + @requires_mt_sics_command("I50") + async def request_remaining_weighing_range(self) -> float: + """Query remaining maximum weighing range in grams. (I50 command) + + Returns the remaining capacity accounting for all loads currently on the + weighing platform (pre-load, tare, net load). A negative value means the + maximum weighing range has been exceeded. + + Multi-response: the device sends up to 3 lines (B, B, A). + """ + responses = await self.send_command("I50") + self._validate_response(responses[0], 5, "I50") + self._validate_unit(responses[0].data[2], "I50") + return float(responses[0].data[1]) + + @requires_mt_sics_command("M27") + async def request_adjustment_history(self) -> List[MettlerToledoResponse]: + """Query the adjustment (calibration) history. (M27 command) + + Returns multi-response with each adjustment entry containing: + entry number, date, time, mode (0=built-in, 1=external), and weight used. + """ + return await self.send_command("M27") + + @requires_mt_sics_command("LST") + async def request_user_settings(self) -> List[MettlerToledoResponse]: + """Query all current user-configurable settings. (LST command) + + Returns a multi-response listing every configurable parameter and its value. + """ + return await self.send_command("LST") + + @requires_mt_sics_command("RDB") + async def request_readability(self) -> List[MettlerToledoResponse]: + """Query the readability setting. (RDB command)""" + return await self.send_command("RDB") + + # # Display # # + + @requires_mt_sics_command("D") + async def set_display_text(self, text: str) -> List[MettlerToledoResponse]: + """Write text to the display. (D command) + + Use set_weight_display() to restore the normal weight display. + """ + return await self.send_command(f'D "{text}"') + + @requires_mt_sics_command("DW") + async def set_weight_display(self) -> List[MettlerToledoResponse]: + """Restore the normal weight display. (DW command)""" + return await self.send_command("DW") + + # # Configuration (write - no corresponding query) # # + + @requires_mt_sics_command("M21") + async def set_host_unit_grams(self) -> List[MettlerToledoResponse]: + """Set the host output unit to grams. (M21 command) + + Called automatically during setup() if supported. + """ + return await self.send_command("M21 0 0") + + # # Commented out - standalone write commands # # + # + # @requires_mt_sics_command("FSET") + # async def factory_reset(self, exclusion: int = 0) -> None: + # """Reset ALL settings to factory defaults. (FSET) DESTRUCTIVE.""" + # await self.send_command(f"FSET {exclusion}") + + # # Commented out - require physical interaction or architecture changes # # + # + # @requires_mt_sics_command("C1") + # async def start_adjustment(self) -> List[MettlerToledoResponse]: + # """Start adjustment. (C1) Moves internal calibration weights.""" + # return await self.send_command("C1") + # + # @requires_mt_sics_command("C2") + # async def start_adjustment_external_weight(self) -> List[MettlerToledoResponse]: + # """Adjust with external weight. (C2) Requires placing calibration weight.""" + # return await self.send_command("C2") + # + # @requires_mt_sics_command("C3") + # async def start_adjustment_builtin_weight(self) -> List[MettlerToledoResponse]: + # """Adjust with built-in weight. (C3) Moves internal weights.""" + # return await self.send_command("C3") + # + # @requires_mt_sics_command("TST1") + # async def start_test(self) -> List[MettlerToledoResponse]: + # """Run test according to current settings. (TST1) Moves internal weights.""" + # return await self.send_command("TST1") + # + # @requires_mt_sics_command("TST2") + # async def start_test_external_weight(self) -> List[MettlerToledoResponse]: + # """Run test with external weight. (TST2) Requires placing test weight.""" + # return await self.send_command("TST2") + # + # @requires_mt_sics_command("TST3") + # async def start_test_builtin_weight(self) -> List[MettlerToledoResponse]: + # """Run test with built-in weight. (TST3) Moves internal weights.""" + # return await self.send_command("TST3") + # + # @requires_mt_sics_command("SIR") + # async def read_weight_immediately_repeat(self) -> ...: + # """Stream weight values at update rate. (SIR) Needs async iterator.""" + # ... + # + # @requires_mt_sics_command("SR") + # async def read_stable_weight_repeat(self) -> ...: + # """Stream stable weight on change. (SR) Needs async iterator.""" + # ... diff --git a/pylabrobot/scales/mettler_toledo/backend_tests.py b/pylabrobot/scales/mettler_toledo/backend_tests.py new file mode 100644 index 00000000000..18b510b2364 --- /dev/null +++ b/pylabrobot/scales/mettler_toledo/backend_tests.py @@ -0,0 +1,380 @@ +"""Tests for MT-SICS response parsing, validation, and protocol simulation.""" + +import unittest + +from pylabrobot.scales.mettler_toledo.backend import ( + MettlerToledoResponse, + MettlerToledoWXS205SDUBackend, +) +from pylabrobot.scales.mettler_toledo.errors import MettlerToledoError +from pylabrobot.scales.mettler_toledo.simulator import MettlerToledoSICSSimulator +from pylabrobot.scales.scale import Scale + +R = MettlerToledoResponse + + +class MTSICSResponseParsingTests(unittest.TestCase): + """Tests for response parsing helpers - no hardware or simulator needed.""" + + def setUp(self): + self.backend = MettlerToledoWXS205SDUBackend.__new__(MettlerToledoWXS205SDUBackend) + + def test_parse_errors_ES_ET_EL(self): + """General error codes (ES, ET, EL) must raise the correct MettlerToledoError. + These are the first line of defense against protocol-level failures.""" + with self.assertRaises(MettlerToledoError) as ctx: + self.backend._parse_basic_errors(R("ES", "")) + self.assertIn("Syntax error", str(ctx.exception)) + + with self.assertRaises(MettlerToledoError) as ctx: + self.backend._parse_basic_errors(R("ET", "")) + self.assertIn("Transmission error", str(ctx.exception)) + + with self.assertRaises(MettlerToledoError) as ctx: + self.backend._parse_basic_errors(R("EL", "")) + self.assertIn("Logical error", str(ctx.exception)) + + def test_parse_errors_status_codes(self): + """Command-specific status codes (I, L, +, -) must raise descriptive errors. + These catch device-busy, bad parameters, and overload/underload conditions.""" + with self.assertRaises(MettlerToledoError) as ctx: + self.backend._parse_basic_errors(R("S", "I")) + self.assertIn("not executable at present", str(ctx.exception)) + + with self.assertRaises(MettlerToledoError) as ctx: + self.backend._parse_basic_errors(R("S", "L")) + self.assertIn("incorrect parameter", str(ctx.exception)) + + with self.assertRaises(MettlerToledoError) as ctx: + self.backend._parse_basic_errors(R("S", "+")) + self.assertIn("overload", str(ctx.exception)) + + with self.assertRaises(MettlerToledoError) as ctx: + self.backend._parse_basic_errors(R("S", "-")) + self.assertIn("underload", str(ctx.exception)) + + def test_validate_response_rejects_short(self): + """Responses with fewer fields than expected must be rejected. + Prevents silent IndexError when accessing data fields.""" + with self.assertRaises(MettlerToledoError): + MettlerToledoWXS205SDUBackend._validate_response(R("I4", "A"), 3, "I4") + + # should not raise + MettlerToledoWXS205SDUBackend._validate_response(R("I4", "A", ["B207696838"]), 3, "I4") + + def test_validate_unit_rejects_wrong(self): + """Non-gram unit responses must be rejected. + The backend assumes grams throughout - a wrong unit would produce wrong values.""" + with self.assertRaises(MettlerToledoError): + MettlerToledoWXS205SDUBackend._validate_unit("kg", "S") + + # should not raise + MettlerToledoWXS205SDUBackend._validate_unit("g", "S") + + def test_parse_errors_passes_valid_success(self): + """A valid success response (status A) must not raise. + Ensures the happy path is not accidentally blocked.""" + self.backend._parse_basic_errors(R("Z", "A")) + + def test_parse_errors_weight_response_error(self): + """S S Error responses (hardware faults detected during weighing) must raise. + These indicate boot errors, EEPROM failures, etc. on the physical device.""" + with self.assertRaises(MettlerToledoError) as ctx: + self.backend._parse_basic_errors(R("S", "S", ["Error", "10b"])) + self.assertIn("EEPROM error", str(ctx.exception)) + + def test_dataclass_construction(self): + """MettlerToledoResponse dataclass must correctly separate command, status, and data. + This is the foundation for all response access throughout the backend.""" + resp = R("S", "S", ["0.00006", "g"]) + self.assertEqual(resp.command, "S") + self.assertEqual(resp.status, "S") + self.assertEqual(resp.data, ["0.00006", "g"]) + + # Error-only response (no status) + resp = R("ES", "") + self.assertEqual(resp.command, "ES") + self.assertEqual(resp.status, "") + self.assertEqual(resp.data, []) + + +class MTSICSSimulatorTests(unittest.IsolatedAsyncioTestCase): + """Tests for the MT-SICS protocol simulator. + These exercise the full stack: send_command -> mock response -> parse -> validate -> return. + Catches dataclass construction bugs, index mapping errors, and response format mismatches.""" + + async def asyncSetUp(self): + self.backend = MettlerToledoSICSSimulator() + self.scale = Scale( + name="test_scale", + backend=self.backend, + size_x=0, + size_y=0, + size_z=0, + ) + await self.scale.setup() + + async def asyncTearDown(self): + await self.scale.stop() + + async def test_setup_populates_device_identity(self): + """setup() must populate device_type, serial_number, capacity, and MT-SICS levels. + If any of these are missing, downstream methods that depend on them will fail.""" + self.assertEqual(self.backend.device_type, "WXS205SDU WXA-Bridge") + self.assertEqual(self.backend.serial_number, "SIM0000001") + self.assertEqual(self.backend.capacity, 220.0) + self.assertIn("S", self.backend._supported_commands) + self.assertIn("M28", self.backend._supported_commands) + + async def test_tare_workflow_through_protocol(self): + """Full tare workflow through the MT-SICS protocol layer. + Verifies that mock responses are correctly constructed, parsed through + _parse_basic_errors, and returned as the right value.""" + self.backend.platform_weight = 50.0 + await self.scale.tare() + self.backend.sample_weight = 10.0 + weight = await self.scale.read_weight() + self.assertEqual(weight, 10.0) + + async def test_read_weight_returns_correct_value(self): + """read_weight must return the net weight (sensor - zero_offset - tare). + Tests the data[0] index mapping after the dataclass change.""" + self.backend.platform_weight = 25.0 + weight = await self.scale.read_weight(timeout=0) + self.assertEqual(weight, 25.0) + + async def test_request_capacity_from_i2(self): + """request_capacity must parse the capacity from the I2 response data[1] field. + Wrong index mapping would return the device type string instead of a float.""" + capacity = await self.backend.request_capacity() + self.assertEqual(capacity, 220.0) + + async def test_i50_multi_response(self): + """I50 returns 3 response lines (B, B, A). send_command must read all of them + and return the correct remaining range from the first line.""" + self.backend.platform_weight = 50.0 + remaining = await self.backend.request_remaining_weighing_range() + self.assertEqual(remaining, 170.0) + + async def test_reset_returns_serial_number(self): + """reset() sends @ which responds with I4-style (command echo is I4, not @). + Must correctly parse the serial number despite the unusual response format.""" + sn = await self.backend.reset() + self.assertEqual(sn, "SIM0000001") + + async def test_cancel_all(self): + """cancel_all() sends C which returns multi-response (C B, C A). + Must consume both lines without raising.""" + await self.backend.cancel_all() + + async def test_unknown_command_returns_syntax_error(self): + """Unknown commands must return ES (syntax error) response. + Ensures the simulator correctly simulates the device rejecting invalid commands.""" + with self.assertRaises(MettlerToledoError) as ctx: + await self.backend.send_command("XYZNOTREAL") + self.assertIn("Syntax error", str(ctx.exception)) + + async def test_measure_temperature(self): + """measure_temperature must return a float from the M28 response. + Requires M28 in the device's I0 command list.""" + temp = await self.backend.measure_temperature() + self.assertEqual(temp, 22.5) + + async def test_measure_temperature_blocked_when_unsupported(self): + """measure_temperature must raise when M28 is not in the device's command list. + Validates that I0-based command gating works correctly.""" + backend = MettlerToledoSICSSimulator( + supported_commands={"@", "I0", "I2", "I4", "S", "SI", "Z", "ZI", "T", "TI", "TA", "TAC"}, + ) + scale = Scale(name="limited_scale", backend=backend, size_x=0, size_y=0, size_z=0) + await scale.setup() + with self.assertRaises(MettlerToledoError) as ctx: + await backend.measure_temperature() + self.assertIn("M28", str(ctx.exception)) + self.assertIn("not implemented", str(ctx.exception)) + + async def test_uptime_returns_minutes(self): + """I15 returns uptime in minutes since last start or restart. + The spec shows I15 A with accuracy +/- 5%.""" + minutes = await self.backend.request_uptime_minutes() + self.assertEqual(minutes, 1440) # Simulator returns 24 hours + + async def test_configuration_bridge_detection(self): + """Device type containing 'Bridge' must set configuration to 'Bridge'. + This determines which commands are expected to work (no display commands + in bridge mode).""" + backend = MettlerToledoSICSSimulator(device_type="WXS205SDU WXA-Bridge") + scale = Scale(name="s", backend=backend, size_x=0, size_y=0, size_z=0) + await scale.setup() + self.assertEqual(backend.configuration, "Bridge") + + async def test_shlex_preserves_quoted_strings_with_spaces(self): + """The I2 response packs type, capacity, and unit into one quoted string. + shlex must keep the quoted content as a single token. This bug broke + hardware validation before the shlex fix.""" + # The simulator returns I2 as a single data field (matching shlex behavior) + device_type = await self.backend.request_device_type() + self.assertIsInstance(device_type, str) + capacity = await self.backend.request_capacity() + self.assertEqual(capacity, 220.0) + + async def test_multi_response_terminates_on_status_a(self): + """send_command must keep reading while status is B and stop on A. + I50 returns 3 lines (B, B, A). All must be captured.""" + responses = await self.backend.send_command("I50") + self.assertEqual(len(responses), 3) + self.assertEqual(responses[0].status, "B") + self.assertEqual(responses[1].status, "B") + self.assertEqual(responses[2].status, "A") + + async def test_zero_stable_dispatches_to_z(self): + """zero(timeout='stable') must send the Z command (wait for stable), + not ZI (immediate) or ZC (timed).""" + self.backend.platform_weight = 5.0 + await self.scale.zero(timeout="stable") + # After zeroing, reading should return 0 + weight = await self.scale.read_weight(timeout=0) + self.assertEqual(weight, 0.0) + + async def test_clear_tare_resets_to_zero(self): + """clear_tare (TAC) must reset the stored tare value to zero. + After clearing, request_tare_weight must return 0.""" + self.backend.platform_weight = 50.0 + await self.scale.tare() + tare = await self.scale.request_tare_weight() + self.assertEqual(tare, 50.0) + await self.backend.clear_tare() + tare_after = await self.scale.request_tare_weight() + self.assertEqual(tare_after, 0.0) + + async def test_b_status_does_not_raise(self): + """Status B (more responses follow) must not be treated as an error. + If _parse_basic_errors raises on B, all multi-response commands break.""" + self.backend._parse_basic_errors(R("I50", "B", ["0", "535.141", "g"])) + + async def test_request_weighing_mode(self): + """request_weighing_mode must return an integer from the M01 response. + Verifies Batch 2 M01 query parsing.""" + mode = await self.backend.request_weighing_mode() + self.assertEqual(mode, 0) # Normal weighing mode + + # -- Identity and diagnostics -- + + async def test_request_firmware_version(self): + """Firmware version must return a non-empty string. + Hardware returns '1.10 18.6.4.1361.772' - validates I3 parsing.""" + version = await self.backend.request_firmware_version() + self.assertIsInstance(version, str) + self.assertGreater(len(version), 0) + + async def test_request_model_designation(self): + """Model designation must return the device type string. + Hardware returns 'WXS205SDU' - validates I11 parsing.""" + model = await self.backend.request_model_designation() + self.assertEqual(model, "WXS205SDU") + + # -- Setup and teardown -- + + async def test_setup_populates_firmware_version(self): + """setup() must query I3 and store the firmware version. + The firmware check is new and untested - if it breaks, the + firmware warning logic fails silently.""" + self.assertIsNotNone(self.backend.firmware_version) + self.assertGreater(len(self.backend.firmware_version), 0) + + async def test_setup_populates_configuration(self): + """setup() must detect 'Bridge' for default simulator (WXS205SDU WXA-Bridge). + Drives which commands are expected to work on the device.""" + self.assertEqual(self.backend.configuration, "Bridge") + + # -- Weight dispatch -- + + async def test_read_weight_timeout_zero_dispatches_to_si(self): + """read_weight(timeout=0) must use SI (immediate read). + If it dispatches to S (stable) instead, the call blocks waiting + for stability that may never come.""" + self.backend.platform_weight = 7.5 + weight = await self.scale.read_weight(timeout=0) + self.assertEqual(weight, 7.5) + + async def test_read_weight_rejects_negative_timeout(self): + """Negative timeout must raise ValueError. + Without this guard, a negative timeout would be converted to + a negative millisecond value and sent to the device.""" + with self.assertRaises(ValueError): + await self.scale.read_weight(timeout=-1) + + async def test_tare_timeout_zero_dispatches_to_ti(self): + """tare(timeout=0) must use TI (immediate tare). + Validates the timeout dispatcher sends the right MT-SICS command.""" + self.backend.platform_weight = 30.0 + await self.scale.tare(timeout=0) + tare = await self.scale.request_tare_weight() + self.assertEqual(tare, 30.0) + + # -- Batch 2 configuration queries -- + + async def test_request_environment_condition(self): + """Environment condition must return an integer. + Hardware returned 2 (Standard) - validates M02 parsing.""" + env = await self.backend.request_environment_condition() + self.assertEqual(env, 2) + + async def test_request_auto_zero(self): + """Auto zero setting must return an integer. + Hardware returned 0 (off) - validates M03 parsing.""" + auto_zero = await self.backend.request_auto_zero() + self.assertEqual(auto_zero, 1) # Simulator default is on + + async def test_request_update_rate(self): + """Update rate must return a float in values per second. + Hardware returned 10.173 - validates UPD parsing.""" + rate = await self.backend.request_update_rate() + self.assertEqual(rate, 18.3) # Simulator default + + # -- SIS response format -- + + async def test_sis_response_has_seven_fields(self): + """SIS must return 7 data fields: state, weight, unit, readability, + step, approval, info. Format confirmed from spec p.234-235 and + hardware validation.""" + resp = await self.backend.request_net_weight_with_status() + self.assertEqual(len(resp.data), 7) + self.assertEqual(resp.data[2], "0") # unit code 0 = grams + self.assertEqual(resp.data[3], "5") # readability = 5 decimal places + + async def test_sis_tare_info_field_tracks_state(self): + """SIS data[6] must be '0' without tare, '1' with weighed tare. + Validates the simulator correctly tracks tare state in the SIS response.""" + resp_no_tare = await self.backend.request_net_weight_with_status() + self.assertEqual(resp_no_tare.data[6], "0") # no tare + + self.backend.platform_weight = 50.0 + await self.scale.tare() + resp_with_tare = await self.backend.request_net_weight_with_status() + self.assertEqual(resp_with_tare.data[6], "1") # weighed tare + + # -- I50 physics simulation -- + + async def test_i50_remaining_range_computed_from_capacity(self): + """I50 remaining range must equal capacity minus total sensor reading. + Validates the simulator computes this correctly.""" + self.backend.platform_weight = 50.0 + self.backend.sample_weight = 10.0 + remaining = await self.backend.request_remaining_weighing_range() + self.assertEqual(remaining, 160.0) # 220 - 60 + + # -- I0 command discovery -- + + async def test_i0_returns_supported_commands_set(self): + """_request_supported_commands must parse I0 multi-response into a Set[str]. + This is the foundation of all command gating.""" + commands = await self.backend._request_supported_commands() + self.assertIsInstance(commands, set) + self.assertIn("S", commands) + self.assertIn("M28", commands) + self.assertNotIn("NONEXISTENT", commands) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/scales/mettler_toledo/confirmed_firmware_versions.py b/pylabrobot/scales/mettler_toledo/confirmed_firmware_versions.py new file mode 100644 index 00000000000..a34f346e35c --- /dev/null +++ b/pylabrobot/scales/mettler_toledo/confirmed_firmware_versions.py @@ -0,0 +1,16 @@ +"""Firmware versions confirmed to work with this driver. + +The firmware version is queried via the I3 command (request_firmware_version) during setup(). +If the connected device runs a version not in this list, a warning +is logged. Please report untested versions that work so they can +be added. + +Only the major.minor version is checked (e.g. "1.10"), not the full +I3 response string (e.g. "1.10 18.6.4.1361.772"), because the second +part is a type definition number that varies by hardware revision and +model while the firmware behavior is determined by the version number. +""" + +CONFIRMED_FIRMWARE_VERSIONS = [ + "1.10", +] diff --git a/pylabrobot/scales/mettler_toledo/errors.py b/pylabrobot/scales/mettler_toledo/errors.py new file mode 100644 index 00000000000..59ee3a38d9d --- /dev/null +++ b/pylabrobot/scales/mettler_toledo/errors.py @@ -0,0 +1,137 @@ +"""MT-SICS error types and response codes (spec Sections 2.1.3.1 - 2.1.3.3).""" + +from typing import Optional + + +class MettlerToledoError(Exception): + """Exceptions raised by a Mettler Toledo scale.""" + + def __init__(self, title: str, message: Optional[str] = None) -> None: + self.title = title + self.message = message + + def __str__(self) -> str: + return f"{self.title}: {self.message}" + + # -- General errors (spec Section 2.1.3.2) -- + + @staticmethod + def unknown_error() -> "MettlerToledoError": + return MettlerToledoError(title="Unknown error", message="An unknown error occurred") + + @staticmethod + def syntax_error() -> "MettlerToledoError": + return MettlerToledoError( + title="Syntax error", + message="The weigh module/balance has not recognized the received command or the command is " + "not allowed", + ) + + @staticmethod + def transmission_error() -> "MettlerToledoError": + return MettlerToledoError( + title="Transmission error", + message="The weigh module/balance has received a 'faulty' command, e.g. owing to a parity " + "error or interface break", + ) + + @staticmethod + def logical_error() -> "MettlerToledoError": + return MettlerToledoError( + title="Logical error", + message="The weigh module/balance can not execute the received command", + ) + + # -- Command-specific status codes (spec Section 2.1.3.1) -- + + @staticmethod + def executing_another_command() -> "MettlerToledoError": + return MettlerToledoError( + title="Command not understood, not executable at present", + message=( + "Command understood but currently not executable (balance is " + "currently executing another command)." + ), + ) + + @staticmethod + def incorrect_parameter() -> "MettlerToledoError": + return MettlerToledoError( + title="Command understood but not executable", + message="(incorrect parameter).", + ) + + @staticmethod + def overload() -> "MettlerToledoError": + return MettlerToledoError(title="Balance in overload range.", message=None) + + @staticmethod + def underload() -> "MettlerToledoError": + return MettlerToledoError(title="Balance in underload range.", message=None) + + # -- Weight response error codes (spec Section 2.1.3.3) -- + + @staticmethod + def boot_error(from_terminal: bool) -> "MettlerToledoError": + return MettlerToledoError( + title="Boot error", + message="from terminal" if from_terminal else "from electronics", + ) + + @staticmethod + def brand_error(from_terminal: bool) -> "MettlerToledoError": + return MettlerToledoError( + title="Brand error", + message="from terminal" if from_terminal else "from electronics", + ) + + @staticmethod + def checksum_error(from_terminal: bool) -> "MettlerToledoError": + return MettlerToledoError( + title="Checksum error", + message="from terminal" if from_terminal else "from electronics", + ) + + @staticmethod + def option_fail(from_terminal: bool) -> "MettlerToledoError": + return MettlerToledoError( + title="Option fail", + message="from terminal" if from_terminal else "from electronics", + ) + + @staticmethod + def eeprom_error(from_terminal: bool) -> "MettlerToledoError": + return MettlerToledoError( + title="EEPROM error", + message="from terminal" if from_terminal else "from electronics", + ) + + @staticmethod + def device_mismatch(from_terminal: bool) -> "MettlerToledoError": + return MettlerToledoError( + title="Device mismatch", + message="from terminal" if from_terminal else "from electronics", + ) + + @staticmethod + def hot_plug_out(from_terminal: bool) -> "MettlerToledoError": + return MettlerToledoError( + title="Hot plug out", + message="from terminal" if from_terminal else "from electronics", + ) + + @staticmethod + def weight_module_electronic_mismatch( + from_terminal: bool, + ) -> "MettlerToledoError": + return MettlerToledoError( + title="Weight module / electronic mismatch", + message="from terminal" if from_terminal else "from electronics", + ) + + @staticmethod + def adjustment_needed(from_terminal: bool) -> "MettlerToledoError": + return MettlerToledoError( + title="Adjustment needed", + message="from terminal" if from_terminal else "from electronics", + ) diff --git a/pylabrobot/scales/mettler_toledo/mt_sics_commands.md b/pylabrobot/scales/mettler_toledo/mt_sics_commands.md new file mode 100644 index 00000000000..a4f2cb7b13d --- /dev/null +++ b/pylabrobot/scales/mettler_toledo/mt_sics_commands.md @@ -0,0 +1,321 @@ +# MT-SICS Command Reference + +MT-SICS = Mettler Toledo Standard Interface Command Set + +Commands organized by level and ranked by utility for PyLabRobot integration. +Source: MT-SICS Interface Command Set for Automated Precision Weigh Modules (spec doc). + +**Important:** I1 reports which standardized level sets are fully implemented, but +individual commands may exist outside those levels. I0 is the definitive source of +command support. During setup(), the backend queries I0 to discover all available +commands and gates methods via `@requires_mt_sics_command`. + +**Hardware-validated on WXS205SDU WXA-Bridge (S/N: B207696838, firmware: 1.10):** +I1 reports levels [0, 1] but I0 discovers 62 commands across levels 0-3. +Commands not in I0 (C, D, DW, SC, ZC, TC, I50) return ES (syntax error). + +Status key: +- DONE = implemented in backend.py (read active; set active or commented out per write safety) +- STUB = commented out entirely (requires physical interaction) +- HIGH = high priority for implementation +- MED = medium priority +- LOW = low priority / niche use case +- N/A = not applicable to automation use case +- WXS205SDU column: supported/not supported on our test device + +## Level 0 - Basic Set (always available) + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| @ | Reset device to determined state | 16 | DONE | yes | reset(). Sent during setup(). Response is I4-style. | +| I0 | List all implemented commands + levels | 96 | DONE | yes | _request_supported_commands(). Queried during setup(). | +| I1 | MT-SICS level and level versions | 97 | DONE | yes | Not used for gating - I0 is authoritative. | +| I2 | Device data (type and capacity) | 98 | DONE | yes | request_device_type() and request_capacity(). Response is one quoted string parsed with shlex. | +| I3 | Firmware version and type definition | 99 | DONE | yes | request_firmware_version(). Returns "1.10 18.6.4.1361.772" on test device. | +| I4 | Serial number | 100 | DONE | yes | request_serial_number(). | +| I5 | Software material number | 101 | DONE | yes | request_software_material_number(). Returns "11671158C" on test device. | +| S | Stable weight value | 223 | DONE | yes | read_stable_weight(). | +| SI | Weight value immediately | 225 | DONE | yes | read_weight_value_immediately(). | +| SIR | Weight immediately + repeat | 232 | MED | yes | Continuous streaming. Needs async iterator architecture. | +| SIRU | Weight immediately + repeat (display unit) | - | LOW | - | Streaming variant in display unit. | +| Z | Zero (wait for stable) | 272 | DONE | yes | zero_stable(). | +| ZI | Zero immediately | 274 | DONE | yes | zero_immediately(). | + +## Level 1 - Elementary Commands (always available) + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| C | Cancel all pending commands | 23 | DONE | **no** | cancel_all(). Not supported on WXS205SDU bridge. | +| D | Write text to display | 52 | DONE | **no** | set_display_text(). Not supported in bridge mode (no terminal). | +| DW | Show weight on display | 61 | DONE | **no** | set_weight_display(). Not supported in bridge mode. | +| K | Keys control | 153 | LOW | - | Lock/unlock terminal keys. | +| SC | Stable or dynamic value after timeout | 224 | DONE | **no** | read_dynamic_weight(). Not supported on WXS205SDU. | +| SR | Stable weight + repeat on any change | 245 | MED | yes | Continuous streaming. Needs async iterator architecture. | +| SRU | Stable weight + repeat (display unit) | 247 | LOW | - | | +| T | Tare (wait for stable) | 252 | DONE | yes | tare_stable(). | +| TA | Tare weight value (query/set) | 253 | DONE | yes | request_tare_weight(). | +| TAC | Clear tare weight value | 254 | DONE | yes | clear_tare(). | +| TC | Tare with timeout | 255 | DONE | **no** | tare_timeout(). Not supported on WXS205SDU. | +| TI | Tare immediately | 257 | DONE | yes | tare_immediately(). | +| ZC | Zero with timeout | 273 | DONE | **no** | zero_timeout(). Not supported on WXS205SDU. | + +## Level 2 - Extended Commands (model-dependent) + +### Device Information (query) + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| I10 | Device identification | 102 | DONE | yes | request_device_id() and set_device_id(). Labels individual scales in multi-scale setups. | +| I11 | Model designation | 103 | DONE | yes | request_model_designation(). Returns "WXS205SDU" on test device. | +| I14 | Device information (detailed) | 104 | DONE | yes | request_device_info(). Multi-response with config, descriptions, SW IDs, serial numbers. | +| I15 | Uptime in minutes since start/restart | 106 | DONE | yes | request_uptime_minutes(). Returns minutes, accuracy +/- 5%. | +| I16 | Date of next service | 107 | DONE | yes | request_next_service_date(). | +| I21 | Revision of assortment type tolerances | 108 | DONE | yes | request_assortment_type_revision(). | +| I26 | Operating mode after restart | - | DONE | yes | request_operating_mode_after_restart(). Not in spec but on WXS205SDU via I0. | +| I27 | Undocumented | - | LOW | - | In spec TOC but no documentation found. | +| I29 | Filter configuration | 111 | LOW | - | | +| I32 | Voltage monitoring | 112 | MED | - | | +| I43 | Selectable units for host unit | 113 | LOW | - | | +| I44 | Selectable units for display unit | 114 | LOW | - | | +| I45 | Selectable environment filter settings | 115 | LOW | - | | +| I46 | Selectable weighing modes | 117 | LOW | - | | +| I47 | Switch-on range | 118 | LOW | - | | +| I48 | Initial zero range | 119 | LOW | - | | +| I50 | Remaining weighing ranges | 120 | DONE | **no** | request_remaining_weighing_range(). Not on WXS205SDU. | +| I51 | Power-on time | 121 | MED | - | | +| I52 | Auto zero activation settings | 122 | LOW | - | | +| I54 | Adjustment loads | 125 | LOW | - | | +| I55 | Menu version | 126 | LOW | - | | +| I56 | Undocumented | - | LOW | - | In spec TOC but no documentation found. | +| I59 | Initial zero information | 129 | LOW | - | | +| I62 | Timeout setting | 131 | LOW | - | | +| I65 | Total operating time | 132 | MED | - | | +| I66 | Total load weighed | 133 | MED | - | | +| I67 | Total number of weighings | 134 | MED | - | | +| I69 | Service provider address | 135 | LOW | - | | +| I71 | One time adjustment status | 136 | LOW | - | | +| I73 | Sign off | 137 | LOW | - | | +| I74 | GEO code at calibration point (HighRes) | 138 | LOW | - | | +| I75 | GEO code at point of use (HighRes) | 139 | LOW | - | | +| I76 | Total voltage exceeds | 140 | LOW | - | | +| I77 | Total load cycles | 141 | MED | - | | +| I78 | Zero deviation | 143 | LOW | - | | +| I79 | Total zero deviation exceeds | 144 | LOW | - | | +| I80 | Total temperature exceeds | 145 | LOW | - | | +| I81 | Temperature gradient | 147 | LOW | - | | +| I82 | Total temperature gradient exceeds | 148 | LOW | - | | +| I83 | Software identification | 149 | LOW | - | | +| I100 | Active stability criteria | 151 | LOW | - | | +| I101 | Humidity value | 152 | LOW | - | | + +### Configuration (read/write) + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| M01 | Weighing mode | 157 | DONE | yes | request_weighing_mode() (read). set commented out (persists to memory). | +| M02 | Environment condition | 158 | DONE | yes | request_environment_condition() (read). set commented out (persists to memory). | +| M03 | Auto zero function | 159 | DONE | yes | request_auto_zero() (read). set commented out (persists to memory). | +| M21 | Unit (host/display) | 165 | DONE | yes | set_host_unit_grams(). | +| M23 | Readability (1d/xd) | 169 | LOW | - | | +| M28 | Temperature value | 172 | DONE | yes | measure_temperature(). Returns 19.8-19.9 C on test device. | +| M29 | Weighing value release | - | DONE | yes | request_weighing_value_release() (read). set commented out (persists to memory). | +| M35 | Zeroing mode at startup | 178 | DONE | yes | request_zeroing_mode() (read). set commented out (persists to memory). | +| M49 | Permanent tare mode | 188 | LOW | - | | +| M67 | Timeout | 191 | LOW | - | | +| M68 | Behavior of serial interfaces | 192 | LOW | - | | +| COM | Serial interface parameters | 46 | DONE | yes | request_serial_parameters(). set commented out (persists to memory). | +| ECHO | Echo mode | 66 | LOW | - | | +| LST | Current user settings | 156 | DONE | yes | request_user_settings(). Level 3 on WXS205SDU. | +| PROT | Protocol mode | 220 | LOW | - | | + +### Adjustment / Calibration + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| C0 | Adjustment setting | 24 | DONE | yes | request_adjustment_setting() (read). set commented out (persists to memory). | +| C1 | Start adjustment (current settings) | 26 | STUB | yes | Commented out (moves internal weights). Multi-response. | +| C2 | Start adjustment (external weight) | 28 | STUB | yes | Commented out (requires placing external weight). | +| C3 | Start adjustment (built-in weight) | 30 | STUB | yes | Commented out (moves internal weights). Multi-response. | +| C4 | Standard / initial adjustment | 31 | LOW | - | | +| C5 | Enable/disable step control | 33 | LOW | - | | +| C6 | Customer linearization + sensitivity | 34 | LOW | - | | +| C7 | Customer standard calibration | 37 | LOW | - | | +| C8 | Sensitivity adjustment | 40 | LOW | - | | +| C9 | Scale placement sensitivity adjustment | 43 | LOW | - | | +| M19 | Adjustment weight | 163 | DONE | yes | request_adjustment_weight() (read). set commented out (persists to memory). | +| M20 | Test weight | - | DONE | yes | request_test_weight() (read). set commented out (persists to memory). | +| M27 | Adjustment history | 171 | DONE | yes | request_adjustment_history(). Multi-response. | + +### Testing + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| TST0 | Query/set test function settings | 259 | DONE | yes | request_test_settings() (read). set commented out (persists to memory). | +| TST1 | Test according to current settings | 260 | STUB | yes | Commented out (moves internal weights). | +| TST2 | Test with external weight | 262 | STUB | yes | Commented out (requires placing test weight). | +| TST3 | Test with built-in weight | 264 | STUB | yes | Commented out (moves internal weights). | +| TST5 | Module test with built-in weights | 265 | LOW | - | | + +### Weight Variants (alternative read commands) + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| SIC1 | Weight with CRC16 immediately | 226 | LOW | - | | +| SIC2 | HighRes weight with CRC16 immediately | 227 | LOW | - | | +| SIS | Net weight with unit + weighing status | 234 | DONE | yes | request_net_weight_with_status(). | +| SIU | Weight in display unit immediately | 237 | LOW | - | | +| SIUM | Weight + MinWeigh info immediately | 238 | LOW | - | | +| SIX1 | Current gross, net, and tare values | 239 | HIGH | - | Not on WXS205SDU. | +| SNR | Stable weight + repeat on stable change | 241 | DONE | yes | read_stable_weight_repeat_on_change(). Use reset() to stop. | +| ST | Stable weight on Transfer key press | 249 | N/A | - | Manual operation. | +| SU | Stable weight in display unit | 250 | LOW | - | | +| SUM | Stable weight + MinWeigh info | 251 | LOW | - | | + +### Stored Weight + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| SIMC | Clear stored weight value | 228 | LOW | - | | +| SIMR | Recall stored weight value | 229 | LOW | - | | +| SIMRC | Recall and clear stored weight value | 230 | LOW | - | | +| SIMS | Store weight immediately | 231 | LOW | - | | + +### Date/Time + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| DAT | Date (query/set) | 53 | DONE | yes | request_date() and set_date(). Format: DAT A Day Month Year. | +| DATI | Date and time (query/set) | 54 | MED | - | Combined date+time. Not on WXS205SDU. | +| TIM | Time (query/set) | 258 | DONE | yes | request_time() and set_time(). Format: TIM A Hour Minute Second. Persists (not reset by @). | + +### Digital I/O + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| DIN | Configuration for digital inputs | 55 | LOW | - | | +| DIS | Digital input status | 56 | LOW | - | | +| DOS | Digital output status | 57 | LOW | - | | +| DOT | Configuration for digital outputs | 58 | LOW | - | | +| DOTC | Configurable digital outputs (weight) | 59 | LOW | - | | + +### System / Lifecycle + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| E01 | Current system error state | 62 | HIGH | - | Not on WXS205SDU. | +| E02 | Weighing device errors and warnings | 63 | HIGH | - | Not on WXS205SDU. | +| E03 | Current system errors and warnings | 65 | HIGH | - | Not on WXS205SDU. | +| FSET | Reset all settings to factory defaults | 95 | LOW | yes | Level 3 on WXS205SDU. Destructive. | +| RO1 | Restart device | 221 | MED | - | | +| RDB | Readability | 222 | DONE | yes | request_readability(). Level 3 on WXS205SDU. | +| UPD | Update rate for SIR/SIRU | 267 | DONE | yes | request_update_rate() (read). set commented out (persists to memory). | +| USTB | User defined stability criteria | 268 | DONE | yes | request_stability_criteria() (read). set commented out. Level 3 on WXS205SDU. | + +### Network (not relevant for serial) + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| I53 | IPv4 runtime network config | 123 | N/A | - | Ethernet only. | +| M69 | IPv4 network configuration mode | 193 | N/A | - | | +| M70 | IPv4 host address + netmask | 195 | N/A | - | | +| M71 | IPv4 default gateway | 197 | N/A | - | | +| M72 | IPv4 DNS server | 199 | N/A | - | | +| M109 | IPv4 managed network config | 204 | N/A | - | | +| M117 | TCP port number | 209 | N/A | - | | +| M118 | Fieldbus network stack type | 211 | N/A | - | | +| NID | Node identification | 218 | N/A | - | | +| NID2 | Device node ID | 219 | N/A | - | | + +### Application-Specific (Level 3, filling/dosing) + +| Command | Description | Spec Page | Status | WXS205SDU | Notes | +|---------|------------------------------------------|-----------|--------|-----------|-------| +| A01 | Percent weighing reference | 17 | N/A | - | Application mode. | +| A02 | Sample identification | 18 | N/A | - | | +| A03 | Sample name | 19 | N/A | - | | +| A06 | Dynamic weighing behavior | 20 | N/A | - | | +| A10 | Nominal, +Tolerance, -Tolerance | 21 | N/A | - | | +| A30 | Internal loads | 22 | N/A | - | | +| CW02 | Time for weighing | 48 | N/A | - | | +| CW03 | Triggered weight value | 50 | N/A | - | | +| CW11 | Check weighing: weight calculation mode | 51 | N/A | - | | +| F01-F16 | Filling functions (16 commands) | 69-91 | N/A | - | Filling/dosing application. | +| FCUT | Filter cut-off frequency | 92 | DONE | yes | request_filter_cutoff() (read). set commented out. Level 3 on WXS205SDU. | +| FCUT2 | Alt weight path cut-off frequency | 93 | N/A | - | | +| WMCF | Weight monitoring functions | 270 | N/A | - | | +| M17 | ProFACT: Single time criteria | 160 | DONE | yes | request_profact_time_criteria() (read). set commented out. Level 2 on WXS205SDU. | +| M18 | ProFACT/FACT: Temperature criterion | 162 | DONE | yes | request_profact_temperature_criterion() (read). set commented out. Level 2 on WXS205SDU. | +| M22 | Custom unit definitions | 168 | N/A | - | | +| M31 | Operating mode after restart | 174 | DONE | yes | request_operating_mode() (read). set commented out. Level 2 on WXS205SDU. | +| M32 | ProFACT: Time criteria | 175 | DONE | yes | request_profact_time() (read). set commented out. Level 2 on WXS205SDU. | +| M33 | ProFACT: Day of the week | 176 | DONE | yes | request_profact_day() (read). set commented out. Level 2 on WXS205SDU. | +| M34 | MinWeigh: Method | 177 | N/A | - | | +| M38 | Selective parameter reset | 179 | N/A | - | | +| M39 | SmartTrac: Graphic | 180 | N/A | - | | +| M43 | Custom unit | 181 | N/A | - | | +| M44 | Command after startup response | 182 | N/A | - | | +| M45 | RS422/485 line termination | 183 | N/A | - | | +| M47 | Frequently changed test weight settings | 184 | N/A | - | | +| M48 | Infrequently changed test weight settings| 186 | N/A | - | | +| M66 | GWP: Certified test weight settings | 189 | N/A | - | | +| M89 | Interface command set | 201 | N/A | - | | +| M103 | RS422/485 driver mode | 202 | N/A | - | | +| M110 | Change display resolution | 205 | N/A | - | | +| M111 | SAI Cyclic data format | 207 | N/A | - | | +| M116 | Ignore Ethernet initial parametrization | 208 | N/A | - | | +| M119 | Byte order mode for automation | 212 | N/A | - | | +| M124 | Power supply for daisy chain | 214 | N/A | - | | +| MOD | Various user modes | 215 | N/A | - | | +| MONH | Monitor on interface | 217 | N/A | - | | +| SNRU | Stable weight (display unit) + repeat | 243 | N/A | - | | + +## Implementation Summary + +The MT-SICS spec defines **194 commands** (counting F01-F16 as 16 individual commands). + +| Category | Count | Description | +|----------|-------|-------------| +| Backend (active) | 54 | Implemented and callable | +| Backend (commented out) | 27 | Set/write counterparts and physical interaction commands | +| Simulator | 55 | Handled in `_build_response` (all active commands except I0/I1 internal) | +| Not implemented | 113 | Not available on WXS205SDU or not applicable | + +### WXS205SDU coverage + +The WXS205SDU reports **62 commands** via I0. Of these: + +| State | Count | +|-------|-------| +| Active in backend | 49 | +| Commented out (physical/write) | 7 | +| Not implemented (streaming) | 2 (SIR, SR) | +| Undocumented (not in spec) | 4 (I22-I25) | + +### Expanding to other devices + +The remaining ~113 unimplemented spec commands (HIGH/MED/LOW/N/A in the table above) +are not available on the WXS205SDU and could not be validated. Integrating them +requires a developer with physical access to a device that supports the command, +to validate the response format and add a handler to both `backend.py` and +`simulator.py`. The pattern is: + +1. Confirm the command appears in the device's I0 list +2. Send the command and observe the raw response +3. Add a method to `backend.py` with `@requires_mt_sics_command` +4. Add a handler to `simulator.py` with an instance variable for the response value +5. Add a test to `backend_tests.py` + +### Remaining priorities + +**HIGH (not available on WXS205SDU):** +- E01/E02/E03 (error monitoring) +- SIX1 (gross, net, tare in one call) + +**MED (useful but not urgent):** +- SIR/SR (continuous streaming) - needs async iterator architecture +- DATI (date + time combined) - not on WXS205SDU + +**STUB (commented out, require physical interaction):** +- C1/C3 (internal weight adjustment) +- C2 (external weight adjustment) +- TST1-TST3 (test procedures) diff --git a/pylabrobot/scales/mettler_toledo/protocol.md b/pylabrobot/scales/mettler_toledo/protocol.md new file mode 100644 index 00000000000..b9ccb00c0bf --- /dev/null +++ b/pylabrobot/scales/mettler_toledo/protocol.md @@ -0,0 +1,191 @@ +# Protocol: MT-SICS (Mettler Toledo Standard Interface Command Set) + + + +## Overview + +| Property | Value | +|----------|-------| +| Protocol name | MT-SICS (Mettler Toledo Standard Interface Command Set) | +| Transport | Serial (RS-232) via USB-to-serial adapter | +| Encoding | ASCII text | +| Baud rate | 9600 | +| Line terminator | CR LF (`\r\n`, 0x0D 0x0A) | +| Direction | Half-duplex (send command, wait for response) | +| Spec document | [MT-SICS Reference Manual](https://web.archive.org/web/20240208213802/https://www.mt.com/dam/product_organizations/industry/apw/generic/11781363_N_MAN_RM_MT-SICS_APW_en.pdf) | + +## Command format (PLR to device) + +``` + [ ...] CR LF +``` + +- Commands are uppercase ASCII +- Parameters separated by spaces +- Quoted strings use `"text"` +- Each command must be followed by CR LF + +Examples: +``` +S\r\n -- read stable weight +ZI\r\n -- zero immediately +M21 0 0\r\n -- set host unit to grams +D "Hello"\r\n -- write text to display +``` + +## Response format (device to PLR) + +### Standard response (single line) + +``` + [ ...] [] CR LF +``` + +The response echoes the command name, followed by a status character, optional data fields, and an optional unit. + +### Status codes + +| Status | Meaning | +|--------|---------| +| `A` | Command executed successfully (final response) | +| `B` | Command not yet terminated, additional responses follow | +| `S` | Stable weight value | +| `D` | Dynamic (unstable) weight value | +| `I` | Command understood but not executable (device busy) | +| `L` | Logical error (parameter not allowed) | +| `+` | Overload (weighing range exceeded) | +| `-` | Underload (weighing pan not in place) | + +### Error responses (no status field) + +``` +ES CR LF -- syntax error (command not recognized) +ET CR LF -- transmission error (parity/break) +EL CR LF -- logical error (command cannot execute) +``` + +These are 2-character responses with no status field or data. + +### Weight response errors + +``` +S S Error CR LF +``` + +The weight value field is replaced with an error code when the device detects a hardware fault. See spec Section 2.1.3.3. + +## Multi-response commands + +Commands that return status `B` send multiple lines. The final line has status `A`. + +Example - I50 (remaining weighing ranges): +``` +PLR sends: I50\r\n +Device sends: I50 B 0 535.141 g\r\n -- RangeNo 0, more lines follow + I50 B 1 -18.973 g\r\n -- RangeNo 1, more lines follow + I50 A 2 335.465 g\r\n -- RangeNo 2, final response +``` + +Example - C (cancel all): +``` +PLR sends: C\r\n +Device sends: C B\r\n -- cancel started + C A\r\n -- cancel complete +``` + +`send_command()` reads all lines until it sees status `A` (or non-`B`). + +## Exceptions to the standard format + +### @ (reset) response echoes I4, not @ + +``` +PLR sends: @\r\n +Device sends: I4 A "B207696838"\r\n +``` + +The @ command resets the device to its power-on state and responds with the serial number using the I4 response format, not the @ command name. + +### Commands not supported on WXS205SDU (bridge mode) + +The following commands return `ES` (syntax error) on the WXS205SDU WXA-Bridge +because they are not in the device's I0 command list. They may work on other +MT-SICS devices or on the same model with a terminal attached. + +- `C` (cancel all), `SC` (timed read), `ZC` (timed zero), `TC` (timed tare) +- `D`, `DW` (display commands - no terminal in bridge mode) +- `I50` (remaining weighing range) + +### I2 response format + +The I2 response packs type, capacity, and unit into a single quoted string: +``` +I2 A "WXS205SDU WXA-Bridge 220.00900 g" +``` +The device type can contain spaces. Parse from the right: unit is the last +token, capacity is second-to-last, type is everything before. +`shlex.split` is used to handle quoted strings correctly. + +### I15 uptime is in minutes + +I15 returns uptime in minutes since last start or restart, with +/- 5% accuracy. +Response: `I15 A `. Example: `I15 A 123014` = ~85 days. + +## Command discovery + +**I0 is the definitive source of command support**, not I1. + +I1 reports which standardized level sets are fully implemented. However, a device +can have individual commands from levels it does not fully support. The WXS205SDU +reports I1 levels [0, 1] but I0 discovers 62 commands across levels 0-3, including +M21, M28, and many other Level 2 commands. + +During `setup()`, the backend queries I0 to discover all available commands. +Methods decorated with `@requires_mt_sics_command("CMD")` check against this list. + +## Command levels + +MT-SICS commands are grouped into levels. I1 reports level compliance but I0 is +the authoritative list of implemented commands. + +| Level | Description | Availability | +|-------|-------------|-------------| +| 0 | Basic set: identification, weighing, zero, tare, reset (@) | Always available | +| 1 | Elementary: tare memory, timed commands, repeat | Always available | +| 2 | Extended: configuration, device info, diagnostics | Model-dependent | +| 3 | Application-specific: filling, dosing, calibration | Model-dependent | + +## Date/time response format + +DAT and TIM return space-separated fields, not a single string: +``` +DAT A -- e.g. DAT A 01 10 2021 = 1 Oct 2021 +TIM A -- e.g. TIM A 09 56 11 = 09:56:11 +``` + +Both support set variants (`DAT DD MM YYYY`, `TIM HH MM SS`). +DAT set persists only via MT-SICS or FSET, not @. +TIM set also persists; only reset via MT-SICS, FSET, or terminal menu, not @. + +## Write safety + +Commands that modify device settings (M01 set, M02 set, M03 set, etc.) persist +to memory and survive power cycles. They cannot be undone with @ reset - only +via FSET (factory reset) or the terminal menu. Write methods are commented out +in the backend to prevent accidental modification. + +Exceptions: `set_date()`, `set_time()`, and `set_device_id()` are active (not +commented out) since they do not change weighing behaviour. + +## Interrupt safety + +When a command is interrupted (KeyboardInterrupt or asyncio.CancelledError), +`send_command` sends `C` (cancel all) if the device supports it, otherwise just +flushes the serial buffer. Device state (zero, tare) is never cleared by an +interrupt. See the interrupt-safe command layer pattern. + +See `mt_sics_commands.md` for the full command reference with implementation status. diff --git a/pylabrobot/scales/mettler_toledo/simulator.py b/pylabrobot/scales/mettler_toledo/simulator.py new file mode 100644 index 00000000000..c3578ce117a --- /dev/null +++ b/pylabrobot/scales/mettler_toledo/simulator.py @@ -0,0 +1,373 @@ +"""MT-SICS protocol-level simulator for device-free testing. + +Inherits from MettlerToledoWXS205SDUBackend and overrides send_command to return +mock MT-SICS responses. All high-level methods (zero, tare, read_weight, etc.) +work unchanged because they call send_command which is intercepted here. + +This follows the same pattern as STARChatterboxBackend in PLR (inherits from +the hardware backend and overrides the low-level command transmission). +""" + +import logging +from typing import List, Optional, Set + +from pylabrobot.io.validation_utils import LOG_LEVEL_IO +from pylabrobot.scales.mettler_toledo.backend import ( + MettlerToledoResponse, + MettlerToledoWXS205SDUBackend, +) +from pylabrobot.scales.scale_backend import ScaleBackend + +logger = logging.getLogger("pylabrobot") + + +class MettlerToledoSICSSimulator(MettlerToledoWXS205SDUBackend): + """MT-SICS protocol simulator for testing without hardware. + + Inherits all MT-SICS methods from MettlerToledoWXS205SDUBackend. + Overrides send_command to return mock MT-SICS responses. + + Set ``platform_weight`` and ``sample_weight`` to simulate placing items + on the scale. The total sensor reading is ``platform_weight + sample_weight``. + + Example:: + + backend = MettlerToledoSICSSimulator() + scale = Scale(name="scale", backend=backend, size_x=0, size_y=0, size_z=0) + await scale.setup() + # backend.device_type == "WXS205SDU", backend.capacity == 220.0 + + backend.platform_weight = 50.0 # place 50g container + await scale.tare() + backend.sample_weight = 10.0 # add 10g + weight = await scale.read_weight() # returns 10.0 + """ + + def __init__( + self, + device_type: str = "WXS205SDU WXA-Bridge", + serial_number: str = "SIM0000001", + capacity: float = 220.0, + supported_commands: Optional[Set[str]] = None, + ) -> None: + # Skip MettlerToledoWXS205SDUBackend.__init__ (which creates a Serial object) + ScaleBackend.__init__(self) + + # Physics state + self.platform_weight: float = 0.0 + self.sample_weight: float = 0.0 + self.zero_offset: float = 0.0 + self.tare_weight: float = 0.0 + self.temperature: float = 22.5 + + # Simulated device identity + self._human_readable_device_name = "Mettler Toledo Scale" + self.device_type = device_type + self.serial_number = serial_number + self.capacity = capacity + self.software_material_number: str = "12121306C" + self.device_id: str = "SimScale" + self.next_service_date: str = "16.03.2013" + self.assortment_type_revision: str = "5" + self.operating_mode_after_restart: str = "0" + self.date: str = "30.03.2026" + self.time: str = "12:00:00" + + # Simulated device configuration + self.weighing_mode: str = "0" + self.environment_condition: str = "2" + self.auto_zero: str = "1" + self.update_rate: str = "18.3" + self.adjustment_setting: str = "0 0" + self.serial_parameters: str = "0 6 3 1" + self.filter_cutoff: str = "0.000" + self.readability: str = "5" + self.test_settings: str = "0" + self.weighing_value_release: str = "1" + self.operating_mode: str = "0" + self.profact_time_criteria: str = "00 00 00 0" + self.profact_temperature_criterion: str = "1" + self.adjustment_weight: str = "10.00000 g" + self.test_weight: str = "200.00000 g" + self.profact_day: str = "0" + self.zeroing_mode: str = "0" + self.uptime_minutes: int = 60 * 24 # 1 day in minutes + # Default: all commands the simulator can mock + self._supported_commands = supported_commands or { + "@", + "C", + "C0", + "COM", + "D", + "DAT", + "DW", + "FCUT", + "I0", + "I1", + "I2", + "I3", + "I4", + "I5", + "I10", + "I11", + "I14", + "I15", + "I16", + "I21", + "I26", + "I50", + "LST", + "M01", + "M02", + "M03", + "M17", + "M18", + "M19", + "M20", + "M21", + "M27", + "M28", + "M29", + "M31", + "M32", + "M33", + "M35", + "RDB", + "S", + "SC", + "SI", + "SIS", + "SNR", + "T", + "TA", + "TAC", + "TC", + "TI", + "TIM", + "TST0", + "UPD", + "USTB", + "Z", + "ZC", + "ZI", + } + + @property + def _sensor_reading(self) -> float: + return self.platform_weight + self.sample_weight + + async def setup(self) -> None: + self.firmware_version = "1.10 18.6.4.1361.772" + self.configuration = "Bridge" if "Bridge" in self.device_type else "Balance" + logger.info( + "[%s] Connected (simulation)\n" + "Device type: %s\n" + "Configuration: %s\n" + "Serial number: %s\n" + "Firmware: %s\n" + "Capacity: %.1f g\n" + "Supported commands (%d): %s", + self._human_readable_device_name, + self.device_type, + self.configuration, + self.serial_number, + self.firmware_version, + self.capacity, + len(self._supported_commands), + ", ".join(sorted(self._supported_commands)), + ) + + async def stop(self) -> None: + logger.info("[%s] Disconnected (simulation)", self._human_readable_device_name) + + async def reset(self) -> str: + responses = await self.send_command("@") + self._validate_response(responses[0], 3, "@") + return responses[0].data[0] + + async def send_command(self, command: str, timeout: int = 60) -> List[MettlerToledoResponse]: + logger.log(LOG_LEVEL_IO, "[%s] Sent command: %s", self._human_readable_device_name, command) + responses = self._build_response(command) + for resp in responses: + logger.log( + LOG_LEVEL_IO, + "[%s] Received response: %s %s %s", + self._human_readable_device_name, + resp.command, + resp.status, + " ".join(resp.data), + ) + self._parse_basic_errors(resp) + return responses + + def _build_response(self, command: str) -> List[MettlerToledoResponse]: + R = MettlerToledoResponse + cmd = command.split()[0] + net = round(self._sensor_reading - self.zero_offset - self.tare_weight, 5) + + # Reset and cancel + if cmd == "@": + return [R("I4", "A", [self.serial_number])] + if cmd == "C": + return [R("C", "B"), R("C", "A")] + + # Device identity + if cmd == "I0": + cmds = sorted(self._supported_commands) + responses = [R("I0", "B", ["0", c]) for c in cmds[:-1]] + responses.append(R("I0", "A", ["0", cmds[-1]])) + return responses + if cmd == "I1": + return [R("I1", "A", ["01"])] + if cmd == "I2": + return [R("I2", "A", [f"{self.device_type} {self.capacity:.5f} g"])] + if cmd == "I3": + return [R("I3", "A", [self.firmware_version])] + if cmd == "I4": + return [R("I4", "A", [self.serial_number])] + if cmd == "I5": + return [R("I5", "A", [self.software_material_number])] + if cmd == "I10": + parts = command.split(maxsplit=1) + if len(parts) > 1: + self.device_id = parts[1].strip('"') + return [R("I10", "A")] + return [R("I10", "A", [self.device_id])] + if cmd == "I11": + return [R("I11", "A", [self.device_type.split()[0]])] + if cmd == "I14": + return [ + R("I14", "B", ["0", "1", "Bridge"]), + R("I14", "A", ["1", "1", self.device_type]), + ] + if cmd == "I15": + return [R("I15", "A", [str(self.uptime_minutes)])] + if cmd == "I16": + d, m, y = self.next_service_date.split(".") + return [R("I16", "A", [d, m, y])] + if cmd == "I21": + return [R("I21", "A", [self.assortment_type_revision])] + if cmd == "I26": + return [R("I26", "A", [self.operating_mode_after_restart])] + if cmd == "DAT": + parts = command.split() + if len(parts) > 1: + self.date = f"{parts[1]}.{parts[2]}.{parts[3]}" + return [R("DAT", "A")] + d, m, y = self.date.split(".") + return [R("DAT", "A", [d, m, y])] + if cmd == "TIM": + parts = command.split() + if len(parts) > 1: + self.time = f"{parts[1]}:{parts[2]}:{parts[3]}" + return [R("TIM", "A")] + h, mi, s = self.time.split(":") + return [R("TIM", "A", [h, mi, s])] + + # Zero + if cmd in ("Z", "ZI", "ZC"): + self.zero_offset = self._sensor_reading + return [R(cmd, "A")] + + # Tare + if cmd in ("T", "TI", "TC"): + self.tare_weight = self._sensor_reading - self.zero_offset + return [R(cmd, "S", [f"{self.tare_weight:.5f}", "g"])] + if cmd == "TA": + return [R("TA", "A", [f"{self.tare_weight:.5f}", "g"])] + if cmd == "TAC": + self.tare_weight = 0.0 + return [R("TAC", "A")] + + # Weight measurement + if cmd in ("S", "SI", "SC"): + return [R(cmd, "S", [f"{net:.5f}", "g"])] + if cmd == "M28": + return [R("M28", "A", ["1", str(self.temperature)])] + if cmd == "SIS": + state = "0" + info = "0" if self.tare_weight == 0 else "1" + return [R("SIS", "A", [state, f"{net:.5f}", "0", "5", "1", "0", info])] + if cmd == "SNR": + return [R("SNR", "S", [f"{net:.5f}", "g"])] + if cmd == "I50": + remaining = self.capacity - self.platform_weight - self.sample_weight + return [ + R("I50", "B", ["0", f"{remaining:.3f}", "g"]), + R("I50", "B", ["1", "0.000", "g"]), + R("I50", "A", ["2", f"{remaining:.3f}", "g"]), + ] + + # Device configuration (read-only) + if cmd == "M01": + return [R("M01", "A", [self.weighing_mode])] + if cmd == "M02": + return [R("M02", "A", [self.environment_condition])] + if cmd == "M03": + return [R("M03", "A", [self.auto_zero])] + if cmd == "M17": + return [R("M17", "A", self.profact_time_criteria.split())] + if cmd == "M18": + return [R("M18", "A", [self.profact_temperature_criterion])] + if cmd == "M19": + return [R("M19", "A", self.adjustment_weight.split())] + if cmd == "M20": + return [R("M20", "A", self.test_weight.split())] + if cmd == "M27": + return [ + R("M27", "B", ["1", "1", "1", "2026", "8", "0", "0", ""]), + R("M27", "A", ["2", "15", "3", "2026", "10", "30", "1", "200.1234 g"]), + ] + if cmd == "M29": + return [R("M29", "A", [self.weighing_value_release])] + if cmd == "M31": + return [R("M31", "A", [self.operating_mode])] + if cmd == "M32": + return [ + R("M32", "B", ["1", "00", "00", "0"]), + R("M32", "B", ["2", "00", "00", "0"]), + R("M32", "A", ["3", "00", "00", "0"]), + ] + if cmd == "M33": + return [R("M33", "A", [self.profact_day])] + if cmd == "M35": + return [R("M35", "A", [self.zeroing_mode])] + if cmd == "UPD": + return [R("UPD", "A", [self.update_rate])] + if cmd == "C0": + return [R("C0", "A", self.adjustment_setting.split())] + if cmd == "COM": + return [R("COM", "A", self.serial_parameters.split())] + if cmd == "FCUT": + return [R("FCUT", "A", [self.filter_cutoff])] + if cmd == "RDB": + return [R("RDB", "A", [self.readability])] + if cmd == "USTB": + return [ + R("USTB", "B", ["0", "3.600", "1.100"]), + R("USTB", "B", ["1", "0.000", "0.000"]), + R("USTB", "A", ["2", "0.000", "0.000"]), + ] + if cmd == "TST0": + return [R("TST0", "A", [self.test_settings])] + if cmd == "LST": + return [ + R("LST", "B", ["C0"] + self.adjustment_setting.split()), + R("LST", "B", ["FCUT", self.filter_cutoff]), + R("LST", "B", ["M01", self.weighing_mode]), + R("LST", "B", ["M02", self.environment_condition]), + R("LST", "B", ["M03", self.auto_zero]), + R("LST", "B", ["M21", "0", "0"]), + R("LST", "A", ["UPD", self.update_rate]), + ] + + # Display + if cmd in ("D", "DW"): + return [R(cmd, "A")] + + # Configuration (write) + if cmd == "M21": + return [R("M21", "A")] + + # Unknown command + return [R("ES", "")] diff --git a/pylabrobot/scales/mettler_toledo_backend.py b/pylabrobot/scales/mettler_toledo_backend.py index ae73eb114df..82847c8a478 100644 --- a/pylabrobot/scales/mettler_toledo_backend.py +++ b/pylabrobot/scales/mettler_toledo_backend.py @@ -1,547 +1,4 @@ -# similar library: https://github.com/janelia-pypi/mettler_toledo_device_python +"""Backwards-compatible import shim. Use pylabrobot.scales.mettler_toledo instead.""" -import asyncio -import logging -import time -import warnings -from typing import List, Literal, Optional, Union - -from pylabrobot.io.serial import Serial -from pylabrobot.scales.scale_backend import ScaleBackend - -logger = logging.getLogger("pylabrobot") - - -class MettlerToledoError(Exception): - """Exceptions raised by a Mettler Toledo scale.""" - - def __init__(self, title: str, message: Optional[str] = None) -> None: - self.title = title - self.message = message - - def __str__(self) -> str: - return f"{self.title}: {self.message}" - - @staticmethod - def unknown_error() -> "MettlerToledoError": - return MettlerToledoError(title="Unknown error", message="An unknown error occurred") - - @staticmethod - def executing_another_command() -> "MettlerToledoError": - return MettlerToledoError( - title="Command not understood, not executable at present", - message=( - "Command understood but currently not executable (balance is " - "currently executing another command)." - ), - ) - - @staticmethod - def incorrect_parameter() -> "MettlerToledoError": - return MettlerToledoError( - title="Command understood but not executable", - message="(incorrect parameter).", - ) - - @staticmethod - def overload() -> "MettlerToledoError": - return MettlerToledoError(title="Balance in overload range.", message=None) - - @staticmethod - def underload() -> "MettlerToledoError": - return MettlerToledoError(title="Balance in underload range.", message=None) - - @staticmethod - def syntax_error() -> "MettlerToledoError": - return MettlerToledoError( - title="Syntax error", - message="The weigh module/balance has not recognized the received command or the command is " - "not allowed", - ) - - @staticmethod - def transmission_error() -> "MettlerToledoError": - return MettlerToledoError( - title="Transmission error", - message="The weigh module/balance has received a 'faulty' command, e.g. owing to a parity " - "error or interface break", - ) - - @staticmethod - def logical_error() -> "MettlerToledoError": - return MettlerToledoError( - title="Logical error", - message="The weigh module/balance can not execute the received command", - ) - - @staticmethod - def boot_error(from_terminal: bool) -> "MettlerToledoError": - return MettlerToledoError( - title="Boot error", - message="from terminal" if from_terminal else "from electronics", - ) - - @staticmethod - def brand_error(from_terminal: bool) -> "MettlerToledoError": - return MettlerToledoError( - title="Brand error", - message="from terminal" if from_terminal else "from electronics", - ) - - @staticmethod - def checksum_error(from_terminal: bool) -> "MettlerToledoError": - return MettlerToledoError( - title="Checksum error", - message="from terminal" if from_terminal else "from electronics", - ) - - @staticmethod - def option_fail(from_terminal: bool) -> "MettlerToledoError": - return MettlerToledoError( - title="Option fail", - message="from terminal" if from_terminal else "from electronics", - ) - - @staticmethod - def eeprom_error(from_terminal: bool) -> "MettlerToledoError": - return MettlerToledoError( - title="EEPROM error", - message="from terminal" if from_terminal else "from electronics", - ) - - @staticmethod - def device_mismatch(from_terminal: bool) -> "MettlerToledoError": - return MettlerToledoError( - title="Device mismatch", - message="from terminal" if from_terminal else "from electronics", - ) - - @staticmethod - def hot_plug_out(from_terminal: bool) -> "MettlerToledoError": - return MettlerToledoError( - title="Hot plug out", - message="from terminal" if from_terminal else "from electronics", - ) - - @staticmethod - def weight_module_electronic_mismatch( - from_terminal: bool, - ) -> "MettlerToledoError": - return MettlerToledoError( - title="Weight module / electronic mismatch", - message="from terminal" if from_terminal else "from electronics", - ) - - @staticmethod - def adjustment_needed(from_terminal: bool) -> "MettlerToledoError": - return MettlerToledoError( - title="Adjustment needed", - message="from terminal" if from_terminal else "from electronics", - ) - - -MettlerToledoResponse = List[str] - - -class MettlerToledoWXS205SDUBackend(ScaleBackend): - """Backend for the Mettler Toledo WXS205SDU scale. - - This scale is used by Hamilton in the liquid verification kit (LVK). - - Documentation: https://web.archive.org/web/20240208213802/https://www.mt.com/dam/ - product_organizations/industry/apw/generic/11781363_N_MAN_RM_MT-SICS_APW_en.pdf - - From the docs: - - "If several commands are sent in succession without waiting for the corresponding - responses, it is possible that the weigh module/balance confuses the sequence of - command processing or ignores entire commands." - """ - - # === Constructor === - - def __init__(self, port: Optional[str] = None, vid: int = 0x0403, pid: int = 0x6001): - super().__init__() - - self.io = Serial( - human_readable_device_name="Mettler Toledo Scale", - port=port, - vid=vid, - pid=pid, - baudrate=9600, - timeout=1, - ) - - async def setup(self) -> None: - # Core state - await self.io.setup() - - # set output unit to grams - await self.send_command("M21 0 0") - - # Handshake: parse requested serial number - self.serial_number = await self.request_serial_number() - # TODO: verify serial number pattern - - async def stop(self) -> None: - await self.io.stop() - - def serialize(self) -> dict: - return {**super().serialize(), "port": self.io.port} - - # === Response parsing === - - def _parse_basic_errors(self, response: List[str]) -> None: - """Helper function for parsing basic errors that are common to many commands. If an error is - detected, a 'MettlerToledoError' exception is raised. - - These are in the first place of the response: - - ES: syntax error: The weigh module/balance has not recognized the received command or the - command is not allowed - - ET: transmission error: The weigh module/balance has received a "faulty" command, e.g. owing - to a parity error or interface break - - EL: logical error: The weigh module/balance can not execute the received command - - These are in the second place of the response (MT-SICS spec p.10, sec 2.1.3.1): - - A: Command executed successfully - - B: Command not yet terminated, additional responses following - - I: Internal error (e.g. balance not ready yet) - - L: Logical error (e.g. parameter not allowed) - - +: Balance in overload range - - -: Balance in underload range - - TODO: handle 'B' status — multi-response commands (e.g. C1 adjustment) send 'B' first, - then additional responses, then 'A' on completion. Currently send_command returns after - the first response, so 'B' responses are not followed up. - """ - - if response[0] == "ES": - raise MettlerToledoError.syntax_error() - if response[0] == "ET": - raise MettlerToledoError.transmission_error() - if response[0] == "EL": - raise MettlerToledoError.logical_error() - - if response[1] == "I": - raise MettlerToledoError.executing_another_command() - if response[1] == "L": - raise MettlerToledoError.incorrect_parameter() - if response[1] == "+": - raise MettlerToledoError.overload() - if response[1] == "-": - raise MettlerToledoError.underload() - - if response[0] == "S" and response[1] == "S" and response[2] == "Error": - error_code = response[3] - code, source = error_code[:-1], error_code[-1] - from_terminal = source == "t" - if code == "1": - raise MettlerToledoError.boot_error(from_terminal=from_terminal) - if code == "2": - raise MettlerToledoError.brand_error(from_terminal=from_terminal) - if code == "3": - raise MettlerToledoError.checksum_error(from_terminal=from_terminal) - if code == "9": - raise MettlerToledoError.option_fail(from_terminal=from_terminal) - if code == "10": - raise MettlerToledoError.eeprom_error(from_terminal=from_terminal) - if code == "11": - raise MettlerToledoError.device_mismatch(from_terminal=from_terminal) - if code == "12": - raise MettlerToledoError.hot_plug_out(from_terminal=from_terminal) - if code == "14": - raise MettlerToledoError.weight_module_electronic_mismatch(from_terminal=from_terminal) - if code == "15": - raise MettlerToledoError.adjustment_needed(from_terminal=from_terminal) - - # === Command Layer === - - async def send_command(self, command: str, timeout: int = 60) -> MettlerToledoResponse: - """Send a command to the scale and receive the response. - - Args: - timeout: The timeout in seconds. - """ - - await self.io.write(command.encode() + b"\r\n") - - raw_response = b"" - timeout_time = time.time() + timeout - while True: - raw_response = await self.io.readline() - await asyncio.sleep(0.001) - if time.time() > timeout_time: - raise TimeoutError("Timeout while waiting for response from scale.") - if raw_response != b"": - break - logger.debug("[scale] Received response: %s", raw_response) - response = raw_response.decode("utf-8").strip().split() - - # parse basic errors - self._parse_basic_errors(response) - - # mypy doesn't understand this - return response # type: ignore - - # === Public high-level API === - - async def request_serial_number(self) -> str: - """Get the serial number of the scale. (MEM-READ command)""" - response = await self.send_command("I4") - serial_number = response[2] - serial_number = serial_number.replace('"', "") - return serial_number - - # # Zero commands # # - - async def zero_immediately(self) -> MettlerToledoResponse: - """Zero the scale immediately. (ACTION command)""" - return await self.send_command("ZI") - - async def zero_stable(self) -> MettlerToledoResponse: - """Zero the scale when the weight is stable. (ACTION command)""" - return await self.send_command("Z") - - async def zero_timeout(self, timeout: float) -> MettlerToledoResponse: - """Zero the scale after a given timeout. (ACTION command)""" - # For some reason, this will always return a syntax error (ES), even though it should be allowed - # according to the docs. - timeout = int(timeout * 1000) - return await self.send_command(f"ZC {timeout}") - - async def zero( - self, timeout: Union[Literal["stable"], float, int] = "stable" - ) -> MettlerToledoResponse: - """High level function to zero the scale. (ACTION command) - - Args: - timeout: The timeout in seconds. If "stable", the scale will zero when the weight is stable. - If 0, the scale will zero immediately. If a float/int, the scale will zero after the given - timeout (in seconds). - """ - - if timeout == "stable": - return await self.zero_stable() - - if not isinstance(timeout, (float, int)): - raise TypeError("timeout must be a float or 'stable'") - - if timeout < 0: - raise ValueError("timeout must be greater than or equal to 0") - - if timeout == 0: - return await self.zero_immediately() - - return await self.zero_timeout(timeout) - - # # Tare commands # # - - async def tare_stable(self) -> MettlerToledoResponse: - """Tare the scale when the weight is stable. (ACTION command)""" - return await self.send_command("T") - - async def tare_immediately(self) -> MettlerToledoResponse: - """Tare the scale immediately. (ACTION command)""" - return await self.send_command("TI") - - async def tare_timeout(self, timeout: float) -> MettlerToledoResponse: - """Tare the scale after a given timeout. (ACTION command)""" - # For some reason, this will always return a syntax error (ES), even though it should be allowed - # according to the docs. - timeout = int(timeout * 1000) # convert to milliseconds - return await self.send_command(f"TC {timeout}") - - async def tare( - self, timeout: Union[Literal["stable"], float, int] = "stable" - ) -> MettlerToledoResponse: - """High level function to tare the scale. (ACTION command) - - Args: - timeout: The timeout in seconds. If "stable", the scale will tare when the weight is stable. - If 0, the scale will tare immediately. If a float/int, the scale will tare after the given - timeout (in seconds). - """ - - if timeout == "stable": - # "Use T to tare the balance. The next stable weight value will be saved in the tare memory." - return await self.tare_stable() - - if not isinstance(timeout, (float, int)): - raise TypeError("timeout must be a float or 'stable'") - - if timeout < 0: - raise ValueError("timeout must be greater than or equal to 0") - - if timeout == 0: - return await self.tare_immediately() - return await self.tare_timeout(timeout) - - # # Weight reading commands # # - - async def request_tare_weight(self) -> float: - """Request tare weight value from scale's memory. (MEM-READ command) - "Use TA to query the current tare value or preset a known tare value." - """ - - response = await self.send_command("TA") - tare = float(response[2]) - unit = response[3] - assert unit == "g" # this is the format we expect - return tare - - async def clear_tare(self) -> MettlerToledoResponse: - """TAC - Clear tare weight value (MEM-WRITE command)""" - return await self.send_command("TAC") - - async def read_stable_weight(self) -> float: - """Read a stable weight value from the scale. (MEASUREMENT command) - - from the docs: - - "Use S to send a stable weight value, along with the host unit, from the balance to - the connected communication partner via the interface. If the automatic door function - is enabled and a stable weight is requested the balance will open and close the balance's - doors to achieve a stable weight." - """ - - response = await self.send_command("S") - weight = float(response[2]) - unit = response[3] - assert unit == "g" # this is the format we expect - return weight - - async def read_dynamic_weight(self, timeout: float) -> float: - """Read a stable weight value from the machine within a given timeout, or - return the current weight value if not possible. (MEASUREMENT command) - - Args: - timeout: The timeout in seconds. - """ - - timeout = int(timeout * 1000) # convert to milliseconds - - response = await self.send_command(f"SC {timeout}") - weight = float(response[2]) - unit = response[3] - assert unit == "g" # this is the format we expect - return weight - - async def read_weight_value_immediately(self) -> float: - """Read a weight value immediately from the scale. (MEASUREMENT command) - - "Use SI to immediately send the current weight value, along with the host unit, from the - balance to the connected communication partner via the interface." - """ - - response = await self.send_command("SI") - weight = float(response[2]) - assert response[3] == "g" # this is the format we expect - return weight - - async def read_weight(self, timeout: Union[Literal["stable"], float, int] = "stable") -> float: - """High level function to read a weight value from the scale. (MEASUREMENT command) - - Args: - timeout: The timeout in seconds. If "stable", the scale will return a weight value when the - weight is stable. If 0, the scale will return a weight value immediately. If a float/int, - the scale will return a weight value after the given timeout (in seconds). - """ - - if timeout == "stable": - return await self.read_stable_weight() - - if not isinstance(timeout, (float, int)): - raise TypeError("timeout must be a float or 'stable'") - - if timeout < 0: - raise ValueError("timeout must be greater than or equal to 0") - - if timeout == 0: - return await self.read_weight_value_immediately() - - return await self.read_dynamic_weight(timeout) - - # Commands for (optional) display manipulation - - async def set_display_text(self, text: str) -> MettlerToledoResponse: - """Set the display text of the scale. Return to the normal weight display with - self.set_weight_display().""" - return await self.send_command(f'D "{text}"') - - async def set_weight_display(self) -> MettlerToledoResponse: - """Return the display to the normal weight display.""" - return await self.send_command("DW") - - # # # Deprecated alias with warning # # # - - # # TODO: remove 2026-03 (giving people >2 months to update) - - async def get_serial_number(self) -> str: - """Deprecated: Use request_serial_number() instead.""" - warnings.warn( - "get_serial_number() is deprecated and will be removed in 2026-03. " - "Use request_serial_number() instead.", - DeprecationWarning, - stacklevel=2, - ) - return await self.request_serial_number() - - async def get_tare_weight(self) -> float: - """Deprecated: Use request_tare_weight() instead.""" - warnings.warn( - "get_tare_weight() is deprecated and will be removed in 2026-03. " - "Use request_tare_weight() instead.", - DeprecationWarning, - stacklevel=2, - ) - return await self.request_tare_weight() - - async def get_stable_weight(self) -> float: - """Deprecated: Use read_stable_weight() instead.""" - warnings.warn( - "get_stable_weight() is deprecated and will be removed in 2026-03. " - "Use read_stable_weight() instead.", - DeprecationWarning, - stacklevel=2, - ) - return await self.read_stable_weight() - - async def get_dynamic_weight(self, timeout: float) -> float: - """Deprecated: Use read_dynamic_weight() instead.""" - warnings.warn( - "get_dynamic_weight() is deprecated and will be removed in 2026-03. " - "Use read_dynamic_weight() instead.", - DeprecationWarning, - stacklevel=2, - ) - return await self.read_dynamic_weight(timeout) - - async def get_weight_value_immediately(self) -> float: - """Deprecated: Use read_weight_value_immediately() instead.""" - warnings.warn( - "get_weight_value_immediately() is deprecated and will be removed in 2026-03. " - "Use read_weight_value_immediately() instead.", - DeprecationWarning, - stacklevel=2, - ) - return await self.read_weight_value_immediately() - - async def get_weight(self, timeout: Union[Literal["stable"], float, int] = "stable") -> float: - """Deprecated: Use read_weight() instead.""" - warnings.warn( - "get_weight() is deprecated and will be removed in 2026-03. Use read_weight() instead.", - DeprecationWarning, - stacklevel=2, - ) - return await self.read_weight(timeout) - - -# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) -# https://github.com/PyLabRobot/pylabrobot/issues/466 - - -class MettlerToledoWXS205SDU: - def __init__(self, *args, **kwargs): - raise RuntimeError( - "`MettlerToledoWXS205SDU` is deprecated. Please use `MettlerToledoWXS205SDUBackend` instead." - ) +# TODO: remove after 2026-09 +from pylabrobot.scales.mettler_toledo.backend import MettlerToledoWXS205SDUBackend # noqa: F401 diff --git a/pylabrobot/scales/scale.py b/pylabrobot/scales/scale.py index 977c28d1ecf..e2f8190e58c 100644 --- a/pylabrobot/scales/scale.py +++ b/pylabrobot/scales/scale.py @@ -54,6 +54,10 @@ async def tare(self, **backend_kwargs) -> None: """ await self.backend.tare(**backend_kwargs) + async def request_tare_weight(self, **backend_kwargs) -> float: + """Query the current tare weight value stored in the scale, in grams.""" + return await self.backend.request_tare_weight(**backend_kwargs) + async def read_weight(self, **backend_kwargs) -> float: """Read the current weight in grams. diff --git a/pylabrobot/scales/scale_backend.py b/pylabrobot/scales/scale_backend.py index 85894aea7c7..46b844e71f2 100644 --- a/pylabrobot/scales/scale_backend.py +++ b/pylabrobot/scales/scale_backend.py @@ -1,3 +1,5 @@ +"""Abstract base class for scale backends.""" + from abc import ABCMeta, abstractmethod from pylabrobot.machines.backend import MachineBackend @@ -7,16 +9,25 @@ class ScaleBackend(MachineBackend, metaclass=ABCMeta): """Backend for a scale""" @abstractmethod - async def zero(self): ... + async def zero(self) -> None: + """Zero the scale.""" + ... @abstractmethod - async def tare(self): ... + async def tare(self) -> None: + """Tare the scale.""" + ... @abstractmethod async def read_weight(self) -> float: """Read the weight in grams""" ... + @abstractmethod + async def request_tare_weight(self) -> float: + """Request the current tare weight value in grams.""" + ... + # Deprecated: for backward compatibility async def get_weight(self) -> float: """Deprecated: Use read_weight() instead. diff --git a/pylabrobot/scales/scales_tests.py b/pylabrobot/scales/scales_tests.py new file mode 100644 index 00000000000..e1318c25e0a --- /dev/null +++ b/pylabrobot/scales/scales_tests.py @@ -0,0 +1,78 @@ +"""Tests for generic scale behavior via the scale physics simulation.""" + +import unittest + +from pylabrobot.scales.scale import Scale +from pylabrobot.scales.simulator import ScaleSimulator + + +class ScaleSimulatorTests(unittest.IsolatedAsyncioTestCase): + """Tests for the physics simulation via the Scale frontend.""" + + async def asyncSetUp(self): + self.backend = ScaleSimulator() + self.scale = Scale( + name="test_scale", + backend=self.backend, + size_x=0, + size_y=0, + size_z=0, + ) + await self.scale.setup() + + async def asyncTearDown(self): + await self.scale.stop() + + async def test_zero_then_read_returns_zero(self): + self.backend.platform_weight = 5.0 + await self.scale.zero() + weight = await self.scale.read_weight() + self.assertEqual(weight, 0.0) + + async def test_tare_workflow(self): + self.backend.platform_weight = 50.0 # container + await self.scale.tare() + self.backend.sample_weight = 0.0106 # ~10 uL liquid + weight = await self.scale.read_weight() + self.assertEqual(weight, 0.0106) + + async def test_zero_and_tare_compose(self): + # preload on platform + self.backend.platform_weight = 2.0 + await self.scale.zero() + + # place container + self.backend.platform_weight = 52.0 # 2g preload + 50g container + await self.scale.tare() + + # add sample + self.backend.sample_weight = 1.06 + weight = await self.scale.read_weight() + self.assertEqual(weight, 1.06) + + async def test_request_tare_weight_accuracy(self): + self.backend.platform_weight = 45.0 + await self.scale.tare() + tare = await self.scale.request_tare_weight() + self.assertEqual(tare, 45.0) + + async def test_re_tare_resets(self): + # first tare with 50g container + self.backend.platform_weight = 50.0 + await self.scale.tare() + + # re-tare with 30g container + self.backend.platform_weight = 30.0 + await self.scale.tare() + + # add sample + self.backend.sample_weight = 5.0 + weight = await self.scale.read_weight() + self.assertEqual(weight, 5.0) + + tare = await self.scale.request_tare_weight() + self.assertEqual(tare, 30.0) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/scales/simulator.py b/pylabrobot/scales/simulator.py new file mode 100644 index 00000000000..186e06f102a --- /dev/null +++ b/pylabrobot/scales/simulator.py @@ -0,0 +1,71 @@ +"""Generic scale simulator for testing the Scale frontend and ScaleBackend interface. + +This simulator is protocol-agnostic - it tests the abstract scale contract (zero, tare, +read_weight, request_tare_weight) without any device-specific protocol. For MT-SICS +protocol-level simulation, use MettlerToledoSICSSimulator instead. +""" + +from pylabrobot.scales.scale_backend import ScaleBackend + + +class ScaleSimulator(ScaleBackend): + """Generic scale simulator for device-free testing. + + Simulates scale behavior: tracks zero offset, tare weight, and platform load. + The total sensor reading is ``platform_weight + sample_weight``. + ``read_weight`` returns the net: ``platform_weight + sample_weight - zero_offset - tare_weight``. + + Set ``platform_weight`` to simulate a container or vessel on the scale. + Set ``sample_weight`` to simulate material added to the container. + + Example - zero:: + + backend = ScaleSimulator() + backend.platform_weight = 2.0 # residue on empty platform + await scale.zero() # zero_offset = 2.0 + await scale.read_weight() # returns 0.0 + backend.platform_weight = 52.0 # place a 50g beaker + await scale.read_weight() # returns 50.0 + + Example - tare:: + + backend = ScaleSimulator() + backend.platform_weight = 50.0 # place a 50g beaker + await scale.tare() # tare_weight = 50.0 + backend.sample_weight = 10.0 # add 10g of liquid + await scale.read_weight() # returns 10.0 + await scale.request_tare_weight() # returns 50.0 + """ + + def __init__(self) -> None: + super().__init__() + self.platform_weight: float = 0.0 + self.sample_weight: float = 0.0 + self.zero_offset: float = 0.0 + self.tare_weight: float = 0.0 + + @property + def _sensor_reading(self) -> float: + return self.platform_weight + self.sample_weight + + async def setup(self) -> None: + print("Setting up the scale.") + + async def stop(self) -> None: + print("Stopping the scale.") + + async def zero(self, **kwargs): + print("Zeroing the scale") + self.zero_offset = self._sensor_reading + + async def tare(self, **kwargs): + print("Taring the scale") + self.tare_weight = self._sensor_reading - self.zero_offset + + async def request_tare_weight(self, **kwargs) -> float: + print("Requesting tare weight") + return round(self.tare_weight, 5) + + async def read_weight(self, **kwargs) -> float: + print("Reading the weight") + return round(self._sensor_reading - self.zero_offset - self.tare_weight, 5)