diff --git a/docs/user_guide/agilent/biotek/cytation/hello-world.ipynb b/docs/user_guide/agilent/biotek/cytation/hello-world.ipynb index 81a82daa89f..d5e98ca73de 100644 --- a/docs/user_guide/agilent/biotek/cytation/hello-world.ipynb +++ b/docs/user_guide/agilent/biotek/cytation/hello-world.ipynb @@ -3,135 +3,258 @@ { "cell_type": "markdown", "id": "wez9bxdabm", - "source": "# Agilent BioTek Cytation\n\nThe Cytation is an Agilent BioTek multi-mode plate reader with optional microscopy imaging. Depending on the model it supports:\n\n- [Absorbance](../../../capabilities/absorbance)\n- [Fluorescence](../../../capabilities/fluorescence)\n- [Luminescence](../../../capabilities/luminescence)\n- [Microscopy](../../../capabilities/microscopy)\n- [Temperature control](../../../capabilities/temperature-control)\n\n| Model | PLR Name | Plate Reading | Microscopy | Temperature |\n|---|---|---|---|---|\n| Cytation 5 | `Cytation5` | Absorbance, Fluorescence, Luminescence | yes | yes |\n| Cytation 1 | `Cytation1` | -- | yes | yes |\n\nBoth models share the `CytationBackend` driver, which communicates over FTDI USB. The Cytation 5 adds plate-reading capabilities on top of the shared microscopy and temperature-control features.", - "metadata": {} + "metadata": {}, + "source": [ + "# Agilent BioTek Cytation\n", + "\n", + "The Cytation is an Agilent BioTek multi-mode plate reader with optional microscopy imaging. Depending on the model it supports:\n", + "\n", + "- [Absorbance](../../../capabilities/absorbance)\n", + "- [Fluorescence](../../../capabilities/fluorescence)\n", + "- [Luminescence](../../../capabilities/luminescence)\n", + "- [Microscopy](../../../capabilities/microscopy)\n", + "- [Temperature control](../../../capabilities/temperature-control)\n", + "\n", + "| Model | PLR Name | Plate Reading | Microscopy | Temperature |\n", + "|---|---|---|---|---|\n", + "| Cytation 5 | `Cytation5` | Absorbance, Fluorescence, Luminescence | yes | yes |\n", + "| Cytation 1 | `Cytation1` | -- | yes | yes |\n", + "\n", + "Both models share the `CytationBackend` driver, which communicates over FTDI USB. The Cytation 5 adds plate-reading capabilities on top of the shared microscopy and temperature-control features." + ] }, { "cell_type": "markdown", "id": "0rn94ubvq8dj", - "source": "## Setup\n\nThe examples below use a Cytation 5. For a Cytation 1, replace `Cytation5` with `Cytation1` (the Cytation 1 does not have `.absorbance`, `.fluorescence`, or `.luminescence` attributes).", - "metadata": {} + "metadata": {}, + "source": [ + "## Setup\n", + "\n", + "The examples below use a Cytation 5. For a Cytation 1, replace `Cytation5` with `Cytation1` (the Cytation 1 does not have `.absorbance`, `.fluorescence`, or `.luminescence` attributes)." + ] }, { "cell_type": "code", + "execution_count": null, "id": "ia4t5ga2ldg", - "source": "from pylabrobot.agilent.biotek.cytation import Cytation5\n\nc5 = Cytation5(name=\"cytation5\")\nawait c5.setup()", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "from pylabrobot.agilent.biotek.cytation import Cytation5\n", + "\n", + "c5 = Cytation5(name=\"cytation5\")\n", + "await c5.setup()" + ] }, { "cell_type": "markdown", "id": "35rmpdivj44", - "source": "Open and close the tray door. Pass `slow=True` for slower motor travel if needed.", - "metadata": {} + "metadata": {}, + "source": [ + "Open and close the tray door. Pass `slow=True` for slower motor travel if needed." + ] }, { "cell_type": "code", + "execution_count": null, "id": "l85qt1z6hdf", - "source": "await c5.open()\n\nfrom pylabrobot.resources import Cor_96_wellplate_360ul_Fb\nplate = Cor_96_wellplate_360ul_Fb(name=\"plate\")\nc5.plate_holder.assign_child_resource(plate)\n\nawait c5.close()", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "await c5.open()\n", + "\n", + "from pylabrobot.resources import Cor_96_wellplate_360ul_Fb\n", + "plate = Cor_96_wellplate_360ul_Fb(name=\"plate\")\n", + "c5.plate_holder.assign_child_resource(plate)\n", + "\n", + "await c5.close()" + ] }, { "cell_type": "markdown", "id": "hxkf2luxk9n", - "source": "## Plate reading (Cytation 5 only)\n\nThe Cytation 5 exposes `.absorbance`, `.fluorescence`, and `.luminescence` capability objects. For the full API, see [Absorbance](../../../capabilities/absorbance), [Fluorescence](../../../capabilities/fluorescence), and [Luminescence](../../../capabilities/luminescence).", - "metadata": {} + "metadata": {}, + "source": [ + "## Plate reading (Cytation 5 only)\n", + "\n", + "The Cytation 5 exposes `.absorbance`, `.fluorescence`, and `.luminescence` capability objects. For the full API, see [Absorbance](../../../capabilities/absorbance), [Fluorescence](../../../capabilities/fluorescence), and [Luminescence](../../../capabilities/luminescence)." + ] }, { "cell_type": "code", + "execution_count": null, "id": "hwipa2rwkzl", - "source": "# Absorbance\ndata = await c5.absorbance.read_absorbance(wavelength=450)", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "# Absorbance\n", + "data = await c5.absorbance.read_absorbance(wavelength=450)" + ] }, { "cell_type": "code", + "execution_count": null, "id": "jmvn8du2t5", - "source": "# Fluorescence\ndata = await c5.fluorescence.read_fluorescence(\n excitation_wavelength=485, emission_wavelength=528, focal_height=7.5\n)", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "# Fluorescence\n", + "data = await c5.fluorescence.read_fluorescence(\n", + " excitation_wavelength=485, emission_wavelength=528, focal_height=7.5\n", + ")" + ] }, { "cell_type": "code", + "execution_count": null, "id": "oxn123gjoh", - "source": "# Luminescence\ndata = await c5.luminescence.read_luminescence(focal_height=4.5)", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "# Luminescence\n", + "data = await c5.luminescence.read_luminescence(focal_height=4.5)" + ] }, { "cell_type": "markdown", "id": "1qn3t2pqvw", - "source": "## Microscopy\n\nBoth the Cytation 5 and Cytation 1 expose a `.microscopy` capability. For imaging, pass `use_cam=True` during setup so the Spinnaker camera is initialized. For the full API, see [Microscopy](../../../capabilities/microscopy).\n\nUse {class}`~pylabrobot.agilent.biotek.cytation.CytationBackend.CaptureParams` to control LED intensity, coverage tiling, and pixel format.", - "metadata": {} + "metadata": {}, + "source": [ + "## Microscopy\n", + "\n", + "Both the Cytation 5 and Cytation 1 expose a `.microscopy` capability. For imaging, pass `use_cam=True` during setup so the Spinnaker camera is initialized. For the full API, see [Microscopy](../../../capabilities/microscopy).\n", + "\n", + "Use {class}`~pylabrobot.agilent.biotek.cytation.CytationBackend.CaptureParams` to control LED intensity, coverage tiling, and pixel format." + ] }, { "cell_type": "code", + "execution_count": null, "id": "qr1jm6691a", - "source": "from pylabrobot.agilent.biotek.cytation import CytationBackend, CytationImagingConfig\nfrom pylabrobot.capabilities.microscopy.standard import ImagingMode, Objective\n\nres = await c5.microscopy.capture(\n row=1,\n column=2,\n mode=ImagingMode.BRIGHTFIELD,\n objective=Objective.O_4X_PL_FL_Phase,\n focal_height=0.833,\n exposure_time=5,\n gain=16,\n plate=plate,\n backend_params=CytationBackend.CaptureParams(led_intensity=10),\n)", "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.agilent.biotek.cytation import CytationBackend, CytationImagingConfig\n", + "from pylabrobot.capabilities.microscopy.standard import ImagingMode, Objective\n", + "\n", + "res = await c5.microscopy.capture(\n", + " well=(1, 2),\n", + " mode=ImagingMode.BRIGHTFIELD,\n", + " objective=Objective.O_4X_PL_FL_Phase,\n", + " focal_height=0.833,\n", + " exposure_time=5,\n", + " gain=16,\n", + " plate=plate,\n", + " backend_params=CytationBackend.CaptureParams(led_intensity=10),\n", + ")" + ] + }, + { + "cell_type": "code", "execution_count": null, - "outputs": [] + "id": "b7125e49", + "metadata": {}, + "outputs": [], + "source": [ + "from PIL import Image\n", + "img = Image.fromarray(res.images[0])\n", + "img" + ] }, { "cell_type": "markdown", "id": "km95iou30f", - "source": "Tile multiple fields of view with the `coverage` parameter:", - "metadata": {} + "metadata": {}, + "source": [ + "Tile multiple fields of view with the `coverage` parameter:" + ] }, { "cell_type": "code", + "execution_count": null, "id": "fi1o94l1uni", - "source": "res = await c5.microscopy.capture(\n row=1,\n column=2,\n mode=ImagingMode.BRIGHTFIELD,\n objective=Objective.O_4X_PL_FL_Phase,\n focal_height=0.833,\n exposure_time=5,\n gain=16,\n plate=plate,\n backend_params=CytationBackend.CaptureParams(\n led_intensity=10,\n coverage=(4, 4),\n ),\n)\nprint(f\"{len(res.images)} images captured\")", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "res = await c5.microscopy.capture(\n", + " well=(1, 2),\n", + " mode=ImagingMode.BRIGHTFIELD,\n", + " objective=Objective.O_4X_PL_FL_Phase,\n", + " focal_height=0.833,\n", + " exposure_time=5,\n", + " gain=16,\n", + " plate=plate,\n", + " backend_params=CytationBackend.CaptureParams(\n", + " led_intensity=10,\n", + " coverage=(4, 4),\n", + " ),\n", + ")\n", + "print(f\"{len(res.images)} images captured\")" + ] }, { "cell_type": "markdown", "id": "sa9pdeeo51", - "source": "## Temperature control\n\nBoth models expose a `.temperature` controller. For the full API, see [Temperature Control](../../../capabilities/temperature-control).", - "metadata": {} + "metadata": {}, + "source": [ + "## Temperature control\n", + "\n", + "Both models expose a `.temperature` controller. For the full API, see [Temperature Control](../../../capabilities/temperature-control)." + ] }, { "cell_type": "code", + "execution_count": null, "id": "qhsjnerhl3", - "source": "await c5.temperature.set_temperature(37.0)\n\ncurrent = await c5.temperature.request_temperature()\nprint(f\"{current:.1f} \\u00b0C\")\n\nawait c5.temperature.deactivate()", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "await c5.temperature.set_temperature(37.0)\n", + "\n", + "current = await c5.temperature.request_temperature()\n", + "print(f\"{current:.1f} \\u00b0C\")\n", + "\n", + "await c5.temperature.deactivate()" + ] }, { "cell_type": "markdown", "id": "f667qnt4occ", - "source": "## Teardown", - "metadata": {} + "metadata": {}, + "source": [ + "## Teardown" + ] }, { "cell_type": "code", + "execution_count": null, "id": "xcqz2zwu04g", - "source": "await c5.stop()", "metadata": {}, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "await c5.stop()" + ] } ], "metadata": { "kernelspec": { - "display_name": "Python 3 (ipykernel)", + "display_name": "env", "language": "python", "name": "python3" }, "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", "name": "python", - "version": "3.11.0" + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.15" } }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/pylabrobot/agilent/__init__.py b/pylabrobot/agilent/__init__.py index 0c5e113d9fb..38b73534c76 100644 --- a/pylabrobot/agilent/__init__.py +++ b/pylabrobot/agilent/__init__.py @@ -1,8 +1,10 @@ from .biotek import ( EL406, + AravisImagingConfig, BioTekBackend, Cytation1, Cytation5, + CytationAravisDriver, CytationBackend, CytationImagingConfig, EL406Driver, diff --git a/pylabrobot/agilent/biotek/__init__.py b/pylabrobot/agilent/biotek/__init__.py index ea6fce622a4..aac942dfdd7 100644 --- a/pylabrobot/agilent/biotek/__init__.py +++ b/pylabrobot/agilent/biotek/__init__.py @@ -1,9 +1,7 @@ from .biotek import BioTekBackend -from .cytation import ( - Cytation1, - Cytation5, - CytationBackend, - CytationImagingConfig, -) +from .cytation import CytationBackend, CytationImagingConfig +from .cytation1 import Cytation1 +from .cytation5 import Cytation5 +from .cytation_aravis_driver import AravisImagingConfig, CytationAravisDriver from .el406 import EL406, EL406Driver, EL406PlateWashingBackend, EL406ShakingBackend from .synergy_h1 import SynergyH1, SynergyH1Backend diff --git a/pylabrobot/agilent/biotek/aravis_camera.py b/pylabrobot/agilent/biotek/aravis_camera.py new file mode 100644 index 00000000000..26c29000f55 --- /dev/null +++ b/pylabrobot/agilent/biotek/aravis_camera.py @@ -0,0 +1,434 @@ +"""Standalone BlackFly camera driver using Aravis (GenICam/USB3 Vision). + +Layer: Camera driver (standalone, no PLR dependencies except numpy) +Adjacent layers: + - Above: CytationAravisDriver delegates camera operations here + - Below: Aravis library (via PyGObject) talks to camera via USB3 Vision/GenICam + +Aravis talks directly to the camera via the GenICam standard over USB3 Vision. +""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass +from typing import Optional + +import numpy as np + +logger = logging.getLogger(__name__) + +# Aravis is an optional dependency. It requires: +# - System library: brew install aravis (macOS) or apt install libaravis-dev (Linux) +# - Python bindings: pip install PyGObject +# If not installed, AravisCamera.setup() will raise ImportError with instructions. +try: + import gi + + gi.require_version("Aravis", "0.8") + from gi.repository import Aravis # type: ignore[attr-defined] + + HAS_ARAVIS = True +except (ImportError, ValueError): + HAS_ARAVIS = False + Aravis = None # type: ignore[assignment] + +# Number of pre-allocated buffers for the Aravis stream. +# For single-frame software-triggered capture, 5 is more than sufficient. +_BUFFER_COUNT = 5 + + +@dataclass +class CameraInfo: + """Discovery result for a GenICam-compatible camera. + + Returned by AravisCamera.enumerate_cameras() and get_device_info(). + Contains identification and connection metadata — no hardware access needed + after discovery. + """ + + serial_number: str + model_name: str + vendor: str + firmware_version: str + connection_type: str # "USB3" for the Cytation 5 BlackFly + + +class AravisCamera: + """BlackFly camera driver using Aravis for GenICam access over USB3 Vision. + + This class wraps all Aravis/GenICam operations for single-frame image + acquisition with software triggering. + + GenICam primer for non-camera-experts: + GenICam is a standard that defines how camera features (exposure, gain, + trigger, etc.) are named and accessed. Every compliant camera publishes + an XML file describing its features as "nodes." Aravis reads this XML + and lets you get/set node values by name (e.g., "ExposureTime", "Gain"). + Aravis provides a Python API to access these nodes via PyGObject. + + Buffer management: + Aravis uses a producer-consumer model with pre-allocated buffers. + During setup(), we allocate a small pool of buffers and push them to + the stream. When we trigger a capture, Aravis fills a buffer with image + data. We pop the buffer, copy the data to a numpy array, and push the + buffer back to the pool for reuse. The copy is necessary because the + buffer memory is owned by Aravis and will be reused. + + Usage: + camera = AravisCamera() + await camera.setup("12345678") + await camera.set_exposure(10.0) # 10 ms + image = await camera.trigger() # numpy array, Mono8 + await camera.stop() + """ + + def __init__(self) -> None: + self._camera: Optional[object] = None # Aravis.Camera + self._device: Optional[object] = None # Aravis.Device + self._stream: Optional[object] = None # Aravis.Stream + self._serial_number: Optional[str] = None + self._acquiring: bool = False + self._width: int = 0 + self._height: int = 0 + self._payload_size: int = 0 + + @property + def width(self) -> int: + """Image width in pixels (read-only, from camera default).""" + return self._width + + @property + def height(self) -> int: + """Image height in pixels (read-only, from camera default).""" + return self._height + + async def setup(self, serial_number: Optional[str] = None) -> None: + """Connect to camera, configure software trigger, allocate buffers. + + The software trigger configuration: + TriggerSelector=FrameStart, TriggerSource=Software, TriggerMode=On. + + Args: + serial_number: Camera serial number (e.g., "12345678"). If None, uses + the first available camera. Use enumerate_cameras() to discover + available cameras. + + Raises: + ImportError: If Aravis/PyGObject is not installed. + RuntimeError: If camera cannot be found or connected. + """ + if not HAS_ARAVIS: + raise ImportError( + "Aravis is not installed. Install it with:\n" + " macOS: brew install aravis && pip install PyGObject\n" + " Linux: sudo apt-get install libaravis-dev gobject-introspection " + "&& pip install PyGObject\n" + " Windows: pacman -S mingw-w64-x86_64-aravis (in MSYS2)" + ) + + self._serial_number = serial_number + + Aravis.update_device_list() + n_devices = Aravis.get_n_devices() + + if n_devices == 0: + raise RuntimeError("No cameras found.") + + device_id_to_connect = None + if serial_number is None: + # Use the first available camera. + device_id_to_connect = Aravis.get_device_id(0) + else: + for i in range(n_devices): + dev_serial = Aravis.get_device_serial_nbr(i) or "" + dev_id = Aravis.get_device_id(i) or "" + if serial_number in (dev_serial, dev_id): + device_id_to_connect = dev_id + break + + if device_id_to_connect is None: + raise RuntimeError( + f"Camera with serial '{serial_number}' not found. " + f"Available devices: {[Aravis.get_device_id(i) for i in range(n_devices)]}" + ) + + try: + self._camera = Aravis.Camera.new(device_id_to_connect) + except Exception as e: + raise RuntimeError( + f"Failed to connect to camera '{device_id_to_connect}'. " + f"Is the camera in use by another process? Error: {e}" + ) from e + + self._device = self._camera.get_device() + + # Configure software trigger mode. + # GenICam nodes: TriggerSelector, TriggerSource, TriggerMode are + # standard SFNC (Standard Features Naming Convention) names. + self._device.set_string_feature_value("TriggerSelector", "FrameStart") + self._device.set_string_feature_value("TriggerSource", "Software") + self._device.set_string_feature_value("TriggerMode", "On") + + # Read image dimensions and payload size from camera. + self._width = self._camera.get_region()[2] # x, y, width, height + self._height = self._camera.get_region()[3] + self._payload_size = self._camera.get_payload() + + # BlackFly/Flea3 cameras need a delay after trigger mode change. + await asyncio.sleep(1) + + # Create stream and pre-allocate buffer pool. + # Aravis requires buffers to be pushed to the stream before acquisition. + # We allocate a small pool (5 buffers) — for single-frame software + # trigger, we only use one at a time but the pool prevents starvation. + self._stream = self._camera.create_stream(None, None) + for _ in range(_BUFFER_COUNT): + self._stream.push_buffer(Aravis.Buffer.new_allocate(self._payload_size)) + + logger.info( + "AravisCamera: Connected to %s (SN: %s), %dx%d", + self._device.get_string_feature_value("DeviceModelName"), + serial_number, + self._width, + self._height, + ) + + def start_acquisition(self) -> None: + """Begin camera acquisition (no-op if already acquiring). + + After this call, the + camera is ready to receive software triggers and produce image buffers. + """ + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + if self._acquiring: + return + self._camera.start_acquisition() + self._acquiring = True + + def stop_acquisition(self) -> None: + """End camera acquisition (no-op if not acquiring). + + Stop camera acquisition. + """ + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + if not self._acquiring: + return + self._camera.stop_acquisition() + self._acquiring = False + + async def trigger(self, timeout_ms: int = 5000) -> np.ndarray: + """Capture a single frame: software trigger → grab. + + Acquisition must already be started via start_acquisition(). + Start/stop acquisition bracket the capture loop, not each individual frame. + + Args: + timeout_ms: Maximum time to wait for image buffer, in milliseconds. + Default 5000 (5 seconds). + + Returns: + numpy.ndarray: Image as 2D uint8 array (height × width), Mono8 format. + + Raises: + RuntimeError: If camera not initialized, not acquiring, or times out. + """ + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + if not self._acquiring: + raise RuntimeError("Camera is not acquiring. Call start_acquisition() first.") + + # Send software trigger command. + self._device.execute_command("TriggerSoftware") + + # Pop the filled buffer from the stream. + # timeout_pop_buffer takes microseconds, so convert from ms. + buffer = self._stream.timeout_pop_buffer(timeout_ms * 1000) + if buffer is None: + raise RuntimeError( + f"Camera capture timed out after {timeout_ms}ms. " + "Is the camera connected and trigger mode configured?" + ) + + # Extract image data and copy to numpy array. + data = buffer.get_data() + image = np.frombuffer(data, dtype=np.uint8).reshape(self._height, self._width).copy() + + # Return buffer to pool for reuse. + self._stream.push_buffer(buffer) + + return image + + async def stop(self) -> None: + """Release camera and free all Aravis resources. + + Safe to call at any point — + stops acquisition if active, resets trigger mode, releases camera. + """ + try: + if self._acquiring and self._camera is not None: + self.stop_acquisition() + + if self._device is not None: + try: + self._device.set_string_feature_value("TriggerMode", "Off") + except Exception: + pass # Camera may already be disconnected + finally: + self._camera = None + self._device = None + self._stream = None + self._serial_number = None + self._acquiring = False + self._width = 0 + self._height = 0 + self._payload_size = 0 + + async def set_exposure(self, exposure_ms: float) -> None: + """Set exposure time in milliseconds. + + Disables auto-exposure first, then sets the GenICam ExposureTime node. + GenICam uses microseconds internally; this method handles the conversion. + + Args: + exposure_ms: Exposure time in milliseconds (e.g., 10.0 = 10 ms). + """ + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + # Disable auto-exposure before setting manual value. + self._device.set_string_feature_value("ExposureAuto", "Off") + # GenICam ExposureTime is in microseconds. + exposure_us = exposure_ms * 1000.0 + self._camera.set_exposure_time(exposure_us) + + async def get_exposure(self) -> float: + """Read current exposure time in milliseconds (from hardware, not cached).""" + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + exposure_us = self._camera.get_exposure_time() + return exposure_us / 1000.0 + + async def set_gain(self, gain: float) -> None: + """Set gain value. + + Disables auto-gain first, then sets the GenICam Gain node. + The gain range depends on the camera model (typically 0-30 for BlackFly). + + Args: + gain: Gain value (e.g., 1.0). + """ + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + self._device.set_string_feature_value("GainAuto", "Off") + self._camera.set_gain(gain) + + async def get_gain(self) -> float: + """Read current gain value (from hardware, not cached).""" + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + return self._camera.get_gain() + + async def set_auto_exposure(self, mode: str) -> None: + """Set auto-exposure mode. + + Args: + mode: One of "off", "once", "continuous". Maps to GenICam + ExposureAuto node values: Off, Once, Continuous. + """ + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + mode_map = {"off": "Off", "once": "Once", "continuous": "Continuous"} + aravis_mode = mode_map.get(mode.lower()) + if aravis_mode is None: + raise ValueError(f"Invalid auto-exposure mode '{mode}'. Use 'off', 'once', or 'continuous'.") + self._device.set_string_feature_value("ExposureAuto", aravis_mode) + + async def set_pixel_format(self, fmt: Optional[int] = None) -> None: + """Set pixel format. Default is Mono8. + + Must be called before start_acquisition(). The format value is an + Aravis pixel format constant (e.g., Aravis.PIXEL_FORMAT_MONO_8). + + Args: + fmt: Aravis pixel format constant. If None, uses Mono8. + """ + if self._camera is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + if fmt is None: + if HAS_ARAVIS: + fmt = Aravis.PIXEL_FORMAT_MONO_8 + else: + return + self._camera.set_pixel_format(fmt) + + def get_device_info(self) -> CameraInfo: + """Read camera identification from GenICam nodes. + + Returns model name, serial number, vendor, and firmware version + without requiring acquisition to be active. + + Returns: + CameraInfo with fields populated from the camera's GenICam XML. + """ + if self._device is None: + raise RuntimeError("Camera is not initialized. Call setup() first.") + return CameraInfo( + serial_number=self._device.get_string_feature_value("DeviceSerialNumber"), + model_name=self._device.get_string_feature_value("DeviceModelName"), + vendor=self._device.get_string_feature_value("DeviceVendorName"), + firmware_version=self._device.get_string_feature_value("DeviceFirmwareVersion"), + connection_type="USB3", + ) + + @staticmethod + def enumerate_cameras() -> list[CameraInfo]: + """List all connected GenICam-compatible cameras. + + Uses Aravis device enumeration — finds cameras across USB3 Vision + and GigE Vision transports (though this driver targets USB3 only). + + Returns: + List of CameraInfo for each detected camera. Empty list if none + found (not an error — matches PLR's graceful enumeration pattern). + + Raises: + ImportError: If Aravis/PyGObject is not installed. + """ + if not HAS_ARAVIS: + raise ImportError( + "Aravis is not installed. Install it with:\n" + " macOS: brew install aravis && pip install PyGObject\n" + " Linux: sudo apt-get install libaravis-dev gobject-introspection " + "&& pip install PyGObject\n" + " Windows: pacman -S mingw-w64-x86_64-aravis (in MSYS2)" + ) + + Aravis.update_device_list() + n_devices = Aravis.get_n_devices() + cameras: list[CameraInfo] = [] + + for i in range(n_devices): + # Parse info from device ID string without opening the camera. + # Opening the camera here would lock the USB device and prevent + # a subsequent setup() from connecting (GObject doesn't release + # the USB handle reliably on del). + device_id = Aravis.get_device_id(i) + # device_id format varies: "USB3Vision-vendor-model-serial" or similar + serial = Aravis.get_device_serial_nbr(i) or device_id + model = Aravis.get_device_model(i) or "Unknown" + vendor = Aravis.get_device_vendor(i) or "Unknown" + protocol = Aravis.get_device_protocol(i) or "USB3" + + info = CameraInfo( + serial_number=serial, + model_name=model, + vendor=vendor, + firmware_version="", # Not available without opening camera + connection_type=protocol, + ) + cameras.append(info) + + return cameras diff --git a/pylabrobot/agilent/biotek/aravis_simulated.py b/pylabrobot/agilent/biotek/aravis_simulated.py new file mode 100644 index 00000000000..dab190000d5 --- /dev/null +++ b/pylabrobot/agilent/biotek/aravis_simulated.py @@ -0,0 +1,213 @@ +"""Simulated BlackFly camera for testing without hardware. + +Layer: Simulated camera driver (no native dependencies) +Role: Drop-in replacement for AravisCamera that returns synthetic images. +Adjacent layers: + - Above: CytationAravisBackend or test code uses this instead of AravisCamera + - Below: No hardware — generates images from noise + parameters + +This module provides a simulated camera that implements the same public API +as AravisCamera but requires no Aravis library, no GObject introspection, +and no physical camera. It is used for: + - Unit and integration tests (pytest in CI) + - Development without hardware access + - Demonstrating the camera API + +The simulated images have brightness that scales with exposure and gain, +giving a rough visual indication that parameters are being applied. This +is NOT a physics-accurate simulation — it's enough to verify the API works. + +Architecture label: **[Proposed]** — Simulated backend for Aravis driver. +""" + +from __future__ import annotations + +import logging +from typing import Optional + +import numpy as np + +from .aravis_camera import CameraInfo + +logger = logging.getLogger(__name__) + + +class AravisSimulated: + """Simulated BlackFly camera — same API as AravisCamera, no native deps. + + Returns synthetic Mono8 images with brightness proportional to exposure + and gain settings. All parameter operations store and return values + without hardware access. + + Constitution Principle VII (Simulator Parity): This class implements the + same interface as AravisCamera so it can be used as a drop-in replacement + in tests and development environments. + + Usage: + camera = AravisSimulated(width=720, height=540) + await camera.setup("SIM-001") + await camera.set_exposure(10.0) + image = await camera.trigger() # synthetic noise image + await camera.stop() + """ + + def __init__(self, width: int = 720, height: int = 540) -> None: + self._default_width = width + self._default_height = height + self._width: int = 0 + self._height: int = 0 + self._serial_number: Optional[str] = None + self._exposure_ms: float = 10.0 + self._gain: float = 1.0 + self._auto_exposure: str = "off" + self._acquiring: bool = False + self._setup_done: bool = False + + @property + def width(self) -> int: + """Image width in pixels (read-only).""" + return self._width + + @property + def height(self) -> int: + """Image height in pixels (read-only).""" + return self._height + + async def setup(self, serial_number: str) -> None: + """Simulate camera connection. + + Args: + serial_number: Any string — stored for get_device_info(). + """ + self._serial_number = serial_number + self._width = self._default_width + self._height = self._default_height + self._exposure_ms = 10.0 + self._gain = 1.0 + self._auto_exposure = "off" + self._acquiring = False + self._setup_done = True + logger.info( + "AravisSimulated: Connected to simulated camera (SN: %s), %dx%d", + serial_number, + self._width, + self._height, + ) + + def start_acquisition(self) -> None: + """Simulate starting acquisition.""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + if self._acquiring: + return + self._acquiring = True + + def stop_acquisition(self) -> None: + """Simulate stopping acquisition.""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + if not self._acquiring: + return + self._acquiring = False + + async def trigger(self, timeout_ms: int = 5000) -> np.ndarray: + """Return a synthetic Mono8 image with brightness scaled by exposure and gain. + + Brightness formula: base = min(255, exposure_ms * 2.5 * (1 + gain / 10)) + Gaussian noise with sigma=10 is added, then clipped to uint8 range. + + Args: + timeout_ms: Ignored in simulation. + + Returns: + numpy.ndarray: Synthetic 2D uint8 array (height × width). + """ + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + + self.start_acquisition() + + # Base brightness scales with exposure and gain. + base = min(255.0, self._exposure_ms * 2.5 * (1.0 + self._gain / 10.0)) + noise = np.random.normal(loc=base, scale=10.0, size=(self._height, self._width)) + image = np.clip(noise, 0, 255).astype(np.uint8) + + self.stop_acquisition() + return image + + async def stop(self) -> None: + """Simulate camera release.""" + if self._acquiring: + self.stop_acquisition() + self._serial_number = None + self._width = 0 + self._height = 0 + self._acquiring = False + self._setup_done = False + + async def set_exposure(self, exposure_ms: float) -> None: + """Store exposure value (milliseconds).""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + self._auto_exposure = "off" + self._exposure_ms = exposure_ms + + async def get_exposure(self) -> float: + """Return stored exposure value (milliseconds).""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + return self._exposure_ms + + async def set_gain(self, gain: float) -> None: + """Store gain value.""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + self._gain = gain + + async def get_gain(self) -> float: + """Return stored gain value.""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + return self._gain + + async def set_auto_exposure(self, mode: str) -> None: + """Store auto-exposure mode ("off", "once", "continuous").""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + mode_lower = mode.lower() + if mode_lower not in ("off", "once", "continuous"): + raise ValueError( + f"Invalid auto-exposure mode '{mode}'. " + "Use 'off', 'once', or 'continuous'." + ) + self._auto_exposure = mode_lower + + async def set_pixel_format(self, fmt: Optional[int] = None) -> None: + """No-op in simulation — always returns Mono8.""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + + def get_device_info(self) -> CameraInfo: + """Return simulated camera info.""" + if not self._setup_done: + raise RuntimeError("Camera is not initialized. Call setup() first.") + return CameraInfo( + serial_number=self._serial_number or "SIM-000", + model_name="Simulated BlackFly S", + vendor="Simulated", + firmware_version="1.0.0", + connection_type="USB3", + ) + + @staticmethod + def enumerate_cameras() -> list[CameraInfo]: + """Return a single simulated camera.""" + return [ + CameraInfo( + serial_number="SIM-001", + model_name="Simulated BlackFly S", + vendor="Simulated", + firmware_version="1.0.0", + connection_type="USB3", + ) + ] diff --git a/pylabrobot/agilent/biotek/cytation.py b/pylabrobot/agilent/biotek/cytation.py index a24fb5151cb..e0506678541 100644 --- a/pylabrobot/agilent/biotek/cytation.py +++ b/pylabrobot/agilent/biotek/cytation.py @@ -1,5 +1,4 @@ import asyncio -import atexit import logging import math import re @@ -10,7 +9,6 @@ from pylabrobot.agilent.biotek.biotek import BioTekBackend from pylabrobot.capabilities.capability import BackendParams from pylabrobot.capabilities.microscopy import ( - Microscopy, MicroscopyBackend, ) from pylabrobot.capabilities.microscopy.standard import ( @@ -22,27 +20,10 @@ ImagingResult, Objective, ) -from pylabrobot.capabilities.plate_reading.absorbance import Absorbance -from pylabrobot.capabilities.plate_reading.fluorescence import Fluorescence -from pylabrobot.capabilities.plate_reading.luminescence import Luminescence -from pylabrobot.capabilities.temperature_controlling import TemperatureController -from pylabrobot.device import Device -from pylabrobot.resources import Coordinate, Plate, PlateHolder, Resource +from pylabrobot.resources import Plate from pylabrobot.serializer import SerializableMixin -try: - import PySpin # type: ignore - - USE_PYSPIN = True -except ImportError as e: - USE_PYSPIN = False - _PYSPIN_IMPORT_ERROR = e - -SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR = ( - PySpin.SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR if USE_PYSPIN else -1 -) -PixelFormat_Mono8 = PySpin.PixelFormat_Mono8 if USE_PYSPIN else -1 -SpinnakerException = PySpin.SpinnakerException if USE_PYSPIN else Exception +from .aravis_camera import AravisCamera logger = logging.getLogger(__name__) @@ -55,21 +36,6 @@ class CytationImagingConfig: filters: Optional[List[Optional[ImagingMode]]] = None -def retry(func, *args, **kwargs): - max_tries = 10 - delay = 0.1 - tries = 0 - while True: - try: - return func(*args, **kwargs) - except SpinnakerException as ex: - tries += 1 - if tries >= max_tries: - raise RuntimeError(f"Failed after {max_tries} tries") from ex - logger.warning("Retry %d/%d failed: %s", tries, max_tries, str(ex)) - time.sleep(delay) - - # --------------------------------------------------------------------------- # Backend # --------------------------------------------------------------------------- @@ -88,8 +54,7 @@ def __init__( timeout=timeout, device_id=device_id, human_readable_device_name="Agilent BioTek Cytation" ) - self._spinnaker_system: Optional["PySpin.SystemPtr"] = None - self._cam: Optional["PySpin.CameraPtr"] = None + self.camera = AravisCamera() self.imaging_config = imaging_config or CytationImagingConfig() self._filters: Optional[List[Optional[ImagingMode]]] = self.imaging_config.filters self._objectives: Optional[List[Optional[Objective]]] = self.imaging_config.objectives @@ -109,7 +74,7 @@ class SetupParams(BackendParams): """Cytation-specific parameters for ``setup``. Args: - use_cam: If True, initialize the Spinnaker camera during setup. + use_cam: If True, initialize the Aravis camera during setup. """ use_cam: bool = False @@ -123,27 +88,28 @@ async def setup(self, backend_params: Optional[BackendParams] = None) -> None: await super().setup() if backend_params.use_cam: - try: - await self._set_up_camera() - except: - try: - await self.stop() - except Exception: - pass - raise + serial = self.imaging_config.camera_serial_number if self.imaging_config else None + await self.camera.setup(serial_number=serial) + logger.info("Camera connected: %s", self.camera.get_device_info()) - async def stop(self): - await super().stop() + if self._filters is None: + await self._load_filters() + if self._objectives is None: + await self._load_objectives() + async def stop(self): if self._acquiring: self.stop_acquisition() + try: + await self.camera.stop() + except Exception: + logger.exception("Error stopping camera") + logger.info(f"{self.__class__.__name__} stopping") await self.stop_shaking() await self.io.stop() - self._stop_camera() - self._objectives = None self._filters = None self._slow_mode = None @@ -168,109 +134,6 @@ def supports_heating(self): def supports_cooling(self): return True - async def _set_up_camera(self) -> None: - atexit.register(self._stop_camera) - - if not USE_PYSPIN: - raise RuntimeError( - "PySpin is not installed. Please follow the imaging setup instructions. " - f"Import error: {_PYSPIN_IMPORT_ERROR}" - ) - if self.imaging_config is None: - raise RuntimeError("Imaging configuration is not set.") - - logger.debug(f"{self.__class__.__name__} setting up camera") - - self._spinnaker_system = PySpin.System.GetInstance() - version = self._spinnaker_system.GetLibraryVersion() - logger.debug( - f"{self.__class__.__name__} Library version: %d.%d.%d.%d", - version.major, - version.minor, - version.type, - version.build, - ) - - cam_list = self._spinnaker_system.GetCameras() - num_cameras = cam_list.GetSize() - logger.debug(f"{self.__class__.__name__} number of cameras detected: %d", num_cameras) - - for cam in cam_list: - info = self._get_device_info(cam) - serial_number = info["DeviceSerialNumber"] - logger.debug(f"{self.__class__.__name__} camera detected: %s", serial_number) - - if ( - self.imaging_config.camera_serial_number is not None - and serial_number == self.imaging_config.camera_serial_number - ): - self._cam = cam - logger.info(f"{self.__class__.__name__} using camera with serial number %s", serial_number) - break - else: - if num_cameras > 0: - self._cam = cam_list.GetByIndex(0) - logger.info( - f"{self.__class__.__name__} using first camera with serial number %s", - info["DeviceSerialNumber"], - ) - else: - logger.error(f"{self.__class__.__name__}: No cameras found") - self._cam = None - cam_list.Clear() - - if self._cam is None: - raise RuntimeError( - f"{self.__class__.__name__}: No camera found. Make sure the camera is connected and the " - "serial number is correct." - ) - - for _ in range(10): - try: - self._cam.Init() - break - except: # noqa - await asyncio.sleep(0.1) - else: - raise RuntimeError( - "Failed to initialize camera. Make sure the camera is connected and the " - "Spinnaker SDK is installed correctly." - ) - nodemap = self._cam.GetNodeMap() - - # Configure software trigger - ptr_trigger_selector = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerSelector")) - if not PySpin.IsReadable(ptr_trigger_selector) or not PySpin.IsWritable(ptr_trigger_selector): - raise RuntimeError("unable to configure TriggerSelector") - ptr_frame_start = PySpin.CEnumEntryPtr(ptr_trigger_selector.GetEntryByName("FrameStart")) - if not PySpin.IsReadable(ptr_frame_start): - raise RuntimeError("unable to configure TriggerSelector (can't read FrameStart)") - ptr_trigger_selector.SetIntValue(int(ptr_frame_start.GetNumericValue())) - - ptr_trigger_source = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerSource")) - if not PySpin.IsReadable(ptr_trigger_source) or not PySpin.IsWritable(ptr_trigger_source): - raise RuntimeError("unable to configure TriggerSource") - ptr_inference_ready = PySpin.CEnumEntryPtr(ptr_trigger_source.GetEntryByName("Software")) - if not PySpin.IsReadable(ptr_inference_ready): - raise RuntimeError("unable to configure TriggerSource (can't read Software)") - ptr_trigger_source.SetIntValue(int(ptr_inference_ready.GetNumericValue())) - - ptr_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerMode")) - if not PySpin.IsReadable(ptr_trigger_mode) or not PySpin.IsWritable(ptr_trigger_mode): - raise RuntimeError("unable to configure TriggerMode") - ptr_trigger_on = PySpin.CEnumEntryPtr(ptr_trigger_mode.GetEntryByName("On")) - if not PySpin.IsReadable(ptr_trigger_on): - raise RuntimeError("unable to query TriggerMode On") - ptr_trigger_mode.SetIntValue(int(ptr_trigger_on.GetNumericValue())) - - # Blackfly/Flea3 GEV cameras need 1 second delay after trigger mode on - await asyncio.sleep(1) - - if self._filters is None: - await self._load_filters() - if self._objectives is None: - await self._load_objectives() - @property def objectives(self) -> List[Optional[Objective]]: if self._objectives is None: @@ -417,70 +280,18 @@ async def _load_objectives(self): else: raise RuntimeError(f"{self.__class__.__name__}: Unsupported version: {self.version}") - def _stop_camera(self) -> None: - if self._cam is not None: - if self._acquiring: - self.stop_acquisition() - self._reset_trigger() - self._cam.DeInit() - self._cam = None - if self._spinnaker_system is not None: - self._spinnaker_system.ReleaseInstance() - - def _reset_trigger(self): - if self._cam is None: - return - try: - nodemap = self._cam.GetNodeMap() - node_trigger_mode = PySpin.CEnumerationPtr(nodemap.GetNode("TriggerMode")) - if not PySpin.IsReadable(node_trigger_mode) or not PySpin.IsWritable(node_trigger_mode): - return - node_trigger_mode_off = node_trigger_mode.GetEntryByName("Off") - if not PySpin.IsReadable(node_trigger_mode_off): - return - node_trigger_mode.SetIntValue(node_trigger_mode_off.GetValue()) - except PySpin.SpinnakerException: - pass - - def _get_device_info(self, cam): - device_info = {} - nodemap = cam.GetTLDeviceNodeMap() - node_device_information = PySpin.CCategoryPtr(nodemap.GetNode("DeviceInformation")) - if not PySpin.IsReadable(node_device_information): - raise RuntimeError("Device control information not readable.") - features = node_device_information.GetFeatures() - for feature in features: - node_feature = PySpin.CValuePtr(feature) - node_feature_name = node_feature.GetName() - try: - node_feature_value = node_feature.ToString() if PySpin.IsReadable(node_feature) else None - except Exception as e: - raise RuntimeError( - f"Got an error while reading feature {node_feature_name}. " - "Is the cytation in use by another notebook? " - f"Error: {str(e)}" - ) from e - device_info[node_feature_name] = node_feature_value - return device_info - async def close(self, plate: Optional[Plate] = None, slow: bool = False): await super().close(plate, slow) self._clear_imaging_state() def start_acquisition(self): - if self._cam is None: - raise RuntimeError(f"{self.__class__.__name__}: Camera is not initialized.") - if self._acquiring: - return - retry(self._cam.BeginAcquisition) + self.camera.start_acquisition() self._acquiring = True def stop_acquisition(self): - if self._cam is None: - raise RuntimeError(f"{self.__class__.__name__}: Camera is not initialized.") if not self._acquiring: return - retry(self._cam.EndAcquisition) + self.camera.stop_acquisition() self._acquiring = False async def led_on(self, intensity: int = 10): @@ -555,47 +366,21 @@ async def set_position(self, x: float, y: float): await asyncio.sleep(0.1) async def set_auto_exposure(self, auto_exposure: Literal["off", "once", "continuous"]): - if self._cam is None: - raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") - - if self._cam.ExposureAuto.GetAccessMode() != PySpin.RW: - raise RuntimeError("unable to write ExposureAuto") - - retry( - self._cam.ExposureAuto.SetValue, - { - "off": PySpin.ExposureAuto_Off, - "once": PySpin.ExposureAuto_Once, - "continuous": PySpin.ExposureAuto_Continuous, - }[auto_exposure], - ) + await self.camera.set_auto_exposure(auto_exposure) async def set_exposure(self, exposure: Exposure): if exposure == self._exposure: logger.debug("Exposure time is already set to %s", exposure) return - if self._cam is None: - raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") - if isinstance(exposure, str): if exposure == "machine-auto": await self.set_auto_exposure("continuous") self._exposure = "machine-auto" return raise ValueError("exposure must be a number or 'auto'") - retry(self._cam.ExposureAuto.SetValue, PySpin.ExposureAuto_Off) - - if self._cam.ExposureTime.GetAccessMode() != PySpin.RW: - raise RuntimeError("unable to write ExposureTime") - exposure_us = int(exposure * 1000) - min_et = retry(self._cam.ExposureTime.GetMin) - if exposure_us < min_et: - raise ValueError(f"exposure must be >= {min_et}") - max_et = retry(self._cam.ExposureTime.GetMax) - if exposure_us > max_et: - raise ValueError(f"exposure must be <= {max_et}") - retry(self._cam.ExposureTime.SetValue, exposure_us) + await self.camera.set_auto_exposure("off") + await self.camera.set_exposure(float(exposure)) self._exposure = exposure async def select(self, row: int, column: int): @@ -609,45 +394,12 @@ async def select(self, row: int, column: int): await self.set_position(0, 0) async def set_gain(self, gain: Gain): - if self._cam is None: - raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") - if gain == self._gain: logger.debug("Gain is already set to %s", gain) return - if not (gain == "machine-auto" or 0 <= gain <= 30): - raise ValueError("gain must be between 0 and 30 (inclusive), or 'auto'") - - nodemap = self._cam.GetNodeMap() - - node_gain_auto = PySpin.CEnumerationPtr(nodemap.GetNode("GainAuto")) - if not PySpin.IsReadable(node_gain_auto) or not PySpin.IsWritable(node_gain_auto): - raise RuntimeError("unable to set automatic gain") - node = ( - PySpin.CEnumEntryPtr(node_gain_auto.GetEntryByName("Continuous")) - if gain == "machine-auto" - else PySpin.CEnumEntryPtr(node_gain_auto.GetEntryByName("Off")) - ) - if not PySpin.IsReadable(node): - raise RuntimeError("unable to set automatic gain (enum entry retrieval)") - node_gain_auto.SetIntValue(node.GetValue()) - - if not gain == "machine-auto": - node_gain = PySpin.CFloatPtr(nodemap.GetNode("Gain")) - if ( - not PySpin.IsReadable(node_gain) - or not PySpin.IsWritable(node_gain) - or node_gain.GetMax() == 0 - ): - raise RuntimeError("unable to set gain") - min_gain = node_gain.GetMin() - if gain < min_gain: - raise ValueError(f"gain must be >= {min_gain}") - max_gain = node_gain.GetMax() - if gain > max_gain: - raise ValueError(f"gain must be <= {max_gain}") - node_gain.SetValue(gain) + if gain != "machine-auto": + await self.camera.set_gain(float(gain)) self._gain = gain @@ -678,9 +430,6 @@ async def set_objective(self, objective: Objective): self._imaging_mode = None async def set_imaging_mode(self, mode: ImagingMode, led_intensity: int): - if self._cam is None: - raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") - if mode == self._imaging_mode: logger.debug("Imaging mode is already set to %s", mode) await self.led_on(intensity=led_intensity) @@ -722,43 +471,19 @@ async def set_imaging_mode(self, mode: ImagingMode, led_intensity: int): self._imaging_mode = mode await self.led_on(intensity=led_intensity) - async def _acquire_image( - self, - color_processing_algorithm: int = SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR, - pixel_format: int = PixelFormat_Mono8, - ) -> Image: - assert self._cam is not None - nodemap = self._cam.GetNodeMap() + async def _acquire_image(self) -> Image: assert self.imaging_config is not None - - num_tries = 0 - while num_tries < self.imaging_config.max_image_read_attempts: - node_softwaretrigger_cmd = PySpin.CCommandPtr(nodemap.GetNode("TriggerSoftware")) - if not PySpin.IsWritable(node_softwaretrigger_cmd): - raise RuntimeError("unable to execute software trigger") - + for attempt in range(self.imaging_config.max_image_read_attempts): try: - node_softwaretrigger_cmd.Execute() - timeout = int(self._cam.ExposureTime.GetValue() / 1000 + 1000) - image_result = self._cam.GetNextImage(timeout) - if not image_result.IsIncomplete(): - processor = PySpin.ImageProcessor() - processor.SetColorProcessing(color_processing_algorithm) - image_converted = processor.Convert(image_result, pixel_format) - image_result.Release() - return image_converted.GetNDArray() # type: ignore - except SpinnakerException as e: - logger.warning("Failed to get image: %s", e) - self.stop_acquisition() - self.start_acquisition() - if "[-1011]" in str(e): - logger.warning( - "[-1011] error might occur when the camera is plugged into a USB hub " - "that does not have enough throughput." - ) - - num_tries += 1 - await asyncio.sleep(0.3) + return await self.camera.trigger(timeout_ms=5000) + except Exception: + if attempt < self.imaging_config.max_image_read_attempts - 1: + logger.warning("Failed to get image, retrying...") + self.stop_acquisition() + self.start_acquisition() + await asyncio.sleep(0.3) + else: + raise raise TimeoutError("max_image_read_attempts reached") @dataclass @@ -774,9 +499,6 @@ class CaptureParams(BackendParams): to the well center. If None, centers on the well. Default None. overlap: Fractional overlap between tiles (0.0-1.0) for montage stitching. If None, no overlap. Only used when coverage produces multiple tiles. - color_processing_algorithm: Spinnaker SDK color processing algorithm constant. - Default ``SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR``. - pixel_format: Spinnaker SDK pixel format constant. Default ``PixelFormat_Mono8``. auto_stop_acquisition: Whether to automatically stop image acquisition after capture. Default True. """ @@ -785,8 +507,6 @@ class CaptureParams(BackendParams): coverage: Union[Literal["full"], Tuple[int, int]] = (1, 1) center_position: Optional[Tuple[float, float]] = None overlap: Optional[float] = None - color_processing_algorithm: int = SPINNAKER_COLOR_PROCESSING_ALGORITHM_HQ_LINEAR - pixel_format: int = PixelFormat_Mono8 auto_stop_acquisition: bool = True async def capture( @@ -808,15 +528,10 @@ async def capture( coverage = backend_params.coverage center_position = backend_params.center_position overlap = backend_params.overlap - color_processing_algorithm = backend_params.color_processing_algorithm - pixel_format = backend_params.pixel_format auto_stop_acquisition = backend_params.auto_stop_acquisition assert overlap is None, "not implemented yet" - if self._cam is None: - raise ValueError("Camera not initialized. Run setup(use_cam=True) first.") - await self.set_plate(plate) if not self._acquiring: @@ -865,11 +580,7 @@ def image_size(magnification: float) -> Tuple[float, float]: for x_pos, y_pos in positions: await self.set_position(x=x_pos, y=y_pos) t0 = time.time() - images.append( - await self._acquire_image( - color_processing_algorithm=color_processing_algorithm, pixel_format=pixel_format - ) - ) + images.append(await self._acquire_image()) t1 = time.time() logger.debug("[cytation] acquired image in %.2f seconds", t1 - t0) finally: @@ -877,7 +588,7 @@ def image_size(magnification: float) -> Tuple[float, float]: if auto_stop_acquisition: self.stop_acquisition() - exposure_ms = float(self._cam.ExposureTime.GetValue()) / 1000 + exposure_ms = await self.camera.get_exposure() assert self._focal_height is not None focal_height_val = float(self._focal_height) @@ -885,107 +596,8 @@ def image_size(magnification: float) -> Tuple[float, float]: # --------------------------------------------------------------------------- -# Devices +# Backwards compatibility — device classes moved to cytation5.py / cytation1.py # --------------------------------------------------------------------------- - -class Cytation5(Resource, Device): - """Agilent BioTek Cytation 5 — plate reader + imager.""" - - def __init__( - self, - name: str, - device_id: Optional[str] = None, - imaging_config: Optional[CytationImagingConfig] = None, - size_x: float = 0.0, # TODO: measure - size_y: float = 0.0, # TODO: measure - size_z: float = 0.0, # TODO: measure - ): - backend = CytationBackend(device_id=device_id, imaging_config=imaging_config) - Resource.__init__( - self, - name=name, - size_x=size_x, - size_y=size_y, - size_z=size_z, - model="Agilent BioTek Cytation 5", - ) - Device.__init__(self, driver=backend) - self.driver: CytationBackend = backend - self.absorbance = Absorbance(backend=backend) - self.luminescence = Luminescence(backend=backend) - self.fluorescence = Fluorescence(backend=backend) - self.microscopy = Microscopy(backend=backend) - self.temperature = TemperatureController(backend=backend) - self._capabilities = [ - self.absorbance, - self.luminescence, - self.fluorescence, - self.microscopy, - self.temperature, - ] - - self.plate_holder = PlateHolder( - name=name + "_plate_holder", - size_x=127.76, - size_y=85.48, - size_z=0, # TODO: measure - pedestal_size_z=0, - child_location=Coordinate.zero(), # TODO: measure - ) - self.assign_child_resource(self.plate_holder, location=Coordinate.zero()) - - def serialize(self) -> dict: - return {**Resource.serialize(self), **Device.serialize(self)} - - async def open(self, slow: bool = False) -> None: - await self.driver.open(slow=slow) - - async def close(self, slow: bool = False) -> None: - await self.driver.close(slow=slow) - - -class Cytation1(Resource, Device): - """Agilent BioTek Cytation 1 — plate reader only (no imager).""" - - def __init__( - self, - name: str, - device_id: Optional[str] = None, - size_x: float = 0.0, # TODO: measure - size_y: float = 0.0, # TODO: measure - size_z: float = 0.0, # TODO: measure - ): - backend = BioTekBackend(device_id=device_id) - Resource.__init__( - self, - name=name, - size_x=size_x, - size_y=size_y, - size_z=size_z, - model="Agilent BioTek Cytation 1", - ) - Device.__init__(self, driver=backend) - self.driver: BioTekBackend = backend - self.microscopy = Microscopy(backend=backend) # type: ignore[arg-type] - self.temperature = TemperatureController(backend=backend) - self._capabilities = [self.microscopy, self.temperature] - - self.plate_holder = PlateHolder( - name=name + "_plate_holder", - size_x=127.76, - size_y=85.48, - size_z=0, # TODO: measure - pedestal_size_z=0, - child_location=Coordinate.zero(), # TODO: measure - ) - self.assign_child_resource(self.plate_holder, location=Coordinate.zero()) - - def serialize(self) -> dict: - return {**Resource.serialize(self), **Device.serialize(self)} - - async def open(self, slow: bool = False) -> None: - await self.driver.open(slow=slow) - - async def close(self, slow: bool = False) -> None: - await self.driver.close(slow=slow) +from .cytation1 import Cytation1 as Cytation1 # noqa: E402, F401 +from .cytation5 import Cytation5 as Cytation5 # noqa: E402, F401 diff --git a/pylabrobot/agilent/biotek/cytation1.py b/pylabrobot/agilent/biotek/cytation1.py new file mode 100644 index 00000000000..954ef214580 --- /dev/null +++ b/pylabrobot/agilent/biotek/cytation1.py @@ -0,0 +1,104 @@ +"""Cytation 1 device — imager with temperature control. + +Follows the STAR pattern: device creates driver internally, +setup() wires capabilities. + +Example:: + + cytation = Cytation1(name="cytation1", camera_serial="22580842") + + await cytation.setup() + result = await cytation.microscopy.capture(...) + await cytation.stop() +""" + +from __future__ import annotations + +import logging +from typing import Optional + +from pylabrobot.capabilities.microscopy import Microscopy +from pylabrobot.capabilities.temperature_controlling import TemperatureController +from pylabrobot.device import Device +from pylabrobot.resources import Coordinate, PlateHolder, Resource + +logger = logging.getLogger(__name__) + + +class Cytation1(Resource, Device): + """Agilent BioTek Cytation 1 — imager with temperature control. + + Uses CytationAravisDriver (Aravis/GenICam) for camera access. + + Capabilities: + - microscopy (imaging) + - temperature (incubation) + """ + + def __init__( + self, + name: str, + camera_serial: Optional[str] = None, + device_id: Optional[str] = None, + size_x: float = 0.0, + size_y: float = 0.0, + size_z: float = 0.0, + ): + from .cytation_aravis_driver import CytationAravisDriver + + driver = CytationAravisDriver( + camera_serial=camera_serial, + device_id=device_id, + ) + + Resource.__init__( + self, + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + model="Agilent BioTek Cytation 1", + ) + Device.__init__(self, driver=driver) + self.driver = driver + + self.microscopy: Microscopy # set in setup() + self.temperature: TemperatureController # set in setup() + + self.plate_holder = PlateHolder( + name=name + "_plate_holder", + size_x=127.76, + size_y=85.48, + size_z=0, + pedestal_size_z=0, + child_location=Coordinate.zero(), + ) + self.assign_child_resource(self.plate_holder, location=Coordinate.zero()) + + async def setup(self) -> None: + await self.driver.setup() + self.microscopy = Microscopy(backend=self.driver.microscopy_backend) + + self.temperature = TemperatureController(backend=self.driver) + self._capabilities = [self.microscopy, self.temperature] + + for cap in self._capabilities: + await cap._on_setup() + self._setup_finished = True + logger.info("Cytation1 setup complete") + + async def stop(self) -> None: + for cap in reversed(self._capabilities): + await cap._on_stop() + await self.driver.stop() + self._setup_finished = False + logger.info("Cytation1 stopped") + + def serialize(self) -> dict: + return {**Resource.serialize(self), **Device.serialize(self)} + + async def open(self, slow: bool = False) -> None: + await self.driver.open(slow=slow) + + async def close(self, slow: bool = False) -> None: + await self.driver.close(slow=slow) diff --git a/pylabrobot/agilent/biotek/cytation5.py b/pylabrobot/agilent/biotek/cytation5.py new file mode 100644 index 00000000000..e59c165b301 --- /dev/null +++ b/pylabrobot/agilent/biotek/cytation5.py @@ -0,0 +1,123 @@ +"""Cytation 5 device — plate reader + imager. + +Follows the STAR pattern: device creates driver internally, +setup() wires capabilities. + +Example:: + + cytation = Cytation5(name="cytation5", camera_serial="22580842") + + await cytation.setup() + result = await cytation.microscopy.capture(...) + await cytation.stop() +""" + +from __future__ import annotations + +import logging +from typing import Optional + +from pylabrobot.capabilities.microscopy import Microscopy +from pylabrobot.capabilities.plate_reading.absorbance import Absorbance +from pylabrobot.capabilities.plate_reading.fluorescence import Fluorescence +from pylabrobot.capabilities.plate_reading.luminescence import Luminescence +from pylabrobot.capabilities.temperature_controlling import TemperatureController +from pylabrobot.device import Device +from pylabrobot.resources import Coordinate, PlateHolder, Resource + +logger = logging.getLogger(__name__) + + +class Cytation5(Resource, Device): + """Agilent BioTek Cytation 5 — plate reader + imager. + + Uses CytationAravisDriver (Aravis/GenICam) for camera access. + + Capabilities: + - absorbance, fluorescence, luminescence (plate reading) + - microscopy (imaging) + - temperature (incubation) + """ + + def __init__( + self, + name: str, + camera_serial: Optional[str] = None, + device_id: Optional[str] = None, + size_x: float = 0.0, + size_y: float = 0.0, + size_z: float = 0.0, + ): + from .cytation_aravis_driver import CytationAravisDriver + + driver = CytationAravisDriver( + camera_serial=camera_serial, + device_id=device_id, + ) + + Resource.__init__( + self, + name=name, + size_x=size_x, + size_y=size_y, + size_z=size_z, + model="Agilent BioTek Cytation 5", + ) + Device.__init__(self, driver=driver) + self.driver = driver + + self.absorbance: Absorbance # set in setup() + self.luminescence: Luminescence # set in setup() + self.fluorescence: Fluorescence # set in setup() + self.microscopy: Microscopy # set in setup() + self.temperature: TemperatureController # set in setup() + + self.plate_holder = PlateHolder( + name=name + "_plate_holder", + size_x=127.76, + size_y=85.48, + size_z=0, + pedestal_size_z=0, + child_location=Coordinate.zero(), + ) + self.assign_child_resource(self.plate_holder, location=Coordinate.zero()) + + async def setup(self) -> None: + await self.driver.setup() + self.microscopy = Microscopy(backend=self.driver.microscopy_backend) + + # Plate reading + temperature use the driver directly + # (BioTekBackend implements these backend ABCs) + self.absorbance = Absorbance(backend=self.driver) + self.luminescence = Luminescence(backend=self.driver) + self.fluorescence = Fluorescence(backend=self.driver) + self.temperature = TemperatureController(backend=self.driver) + + self._capabilities = [ + self.absorbance, + self.luminescence, + self.fluorescence, + self.microscopy, + self.temperature, + ] + + for cap in self._capabilities: + await cap._on_setup() + self._setup_finished = True + logger.info("Cytation5 setup complete") + + async def stop(self) -> None: + for cap in reversed(self._capabilities): + await cap._on_stop() + await self.driver.stop() + self._setup_finished = False + logger.info("Cytation5 stopped") + + def serialize(self) -> dict: + return {**Resource.serialize(self), **Device.serialize(self)} + + async def open(self, slow: bool = False) -> None: + await self.driver.open(slow=slow) + + async def close(self, slow: bool = False) -> None: + await self.driver.close(slow=slow) diff --git a/pylabrobot/agilent/biotek/cytation_aravis_driver.py b/pylabrobot/agilent/biotek/cytation_aravis_driver.py new file mode 100644 index 00000000000..3174a49119f --- /dev/null +++ b/pylabrobot/agilent/biotek/cytation_aravis_driver.py @@ -0,0 +1,514 @@ +"""CytationAravisDriver — connection + optics + camera for the Cytation using Aravis. + +Extends BioTekBackend (serial IO) with AravisCamera for image acquisition. +This is the driver layer — it owns the connections and low-level commands. +The MicroscopyBackend (capture orchestration) is created during setup() +and accessed via ``self.microscopy_backend``. + +Layer: Driver (connection + low-level commands) +Adjacent layers: + - Above: Cytation1/Cytation5 device reads driver.microscopy_backend + - Below: BioTekBackend serial IO + AravisCamera (GenICam/USB3 Vision) +""" + +from __future__ import annotations + +import asyncio +import logging +import re +from dataclasses import dataclass +from typing import List, Literal, Optional + +from pylabrobot.capabilities.microscopy.standard import ( + Exposure, + FocalPosition, + Gain, + Image, + ImagingMode, + Objective, +) + +from .aravis_camera import AravisCamera +from .biotek import BioTekBackend +from .cytation_aravis_microscopy import CytationAravisMicroscopyBackend + +logger = logging.getLogger(__name__) + + +@dataclass +class AravisImagingConfig: + """Imaging configuration for the Cytation with Aravis camera. + + Defines filter wheel positions, objective positions, and camera serial. + """ + + camera_serial_number: Optional[str] = None + filters: Optional[List[Optional[ImagingMode]]] = None + objectives: Optional[List[Optional[Objective]]] = None + max_image_read_attempts: int = 10 + image_read_delay: float = 0.3 + + +class CytationAravisDriver(BioTekBackend): + """Driver for the Cytation using Aravis camera. + + Extends BioTekBackend with: + - AravisCamera for image acquisition (Aravis/GenICam over USB3 Vision) + - Optics control methods (filter wheel, objectives, focus, LED, positioning) + - Camera control methods (exposure, gain, auto-exposure, trigger) + + During setup(), creates a CytationAravisMicroscopyBackend that the + device class reads to wire the Microscopy capability. + + Usage:: + + driver = CytationAravisDriver(camera_serial="22580842") + await driver.setup() + # driver.microscopy_backend is now available for Microscopy(backend=...) + """ + + def __init__( + self, + camera_serial: Optional[str] = None, + timeout: float = 20, + device_id: Optional[str] = None, + imaging_config: Optional[AravisImagingConfig] = None, + ) -> None: + super().__init__( + timeout=timeout, + device_id=device_id, + human_readable_device_name="Agilent BioTek Cytation (Aravis)", + ) + + self.camera = AravisCamera() + self._camera_serial = camera_serial + self.imaging_config = imaging_config or AravisImagingConfig(camera_serial_number=camera_serial) + + # Imaging state + self._filters: Optional[List[Optional[ImagingMode]]] = self.imaging_config.filters + self._objectives: Optional[List[Optional[Objective]]] = self.imaging_config.objectives + self._exposure: Optional[Exposure] = None + self._focal_height: Optional[FocalPosition] = None + self._gain: Optional[Gain] = None + self._imaging_mode: Optional[ImagingMode] = None + self._row: Optional[int] = None + self._column: Optional[int] = None + self._pos_x: Optional[float] = None + self._pos_y: Optional[float] = None + self._objective: Optional[Objective] = None + self._acquiring = False + + # Created during setup() + self.microscopy_backend: CytationAravisMicroscopyBackend + + @property + def filters(self) -> List[Optional[ImagingMode]]: + if self._filters is None: + raise RuntimeError("Filters not loaded. Call setup() first.") + return self._filters + + @property + def objectives(self) -> List[Optional[Objective]]: + if self._objectives is None: + raise RuntimeError("Objectives not loaded. Call setup() first.") + return self._objectives + + # ─── Lifecycle ─────────────────────────────────────────────────────── + + async def setup(self, use_cam: bool = True) -> None: + """Set up serial connection and camera.""" + logger.info("CytationAravisDriver setting up") + + await super().setup() + + if use_cam: + serial = self._camera_serial or ( + self.imaging_config.camera_serial_number if self.imaging_config else None + ) + await self.camera.setup(serial_number=serial) + logger.info("Camera connected: %s", self.camera.get_device_info()) + + if self._filters is None: + await self._load_filters() + + if self._objectives is None: + await self._load_objectives() + + # Create microscopy backend (device reads this to wire Microscopy capability) + self.microscopy_backend = CytationAravisMicroscopyBackend(driver=self) + + logger.info("CytationAravisDriver setup complete") + + async def stop(self) -> None: + """Disconnect camera and serial.""" + self._clear_imaging_state() + try: + await self.camera.stop() + except Exception: + logger.exception("Error stopping camera") + await super().stop() + logger.info("CytationAravisDriver stopped") + + def _clear_imaging_state(self) -> None: + self._exposure = None + self._focal_height = None + self._gain = None + self._imaging_mode = None + self._row = None + self._column = None + self._pos_x = None + self._pos_y = None + self._objective = None + self._acquiring = False + + # ─── Filter / Objective Discovery ──────────────────────────────────── + + async def _load_filters(self) -> None: + """Discover installed filter cube positions from firmware. + + Queries each slot individually with command ``i q{slot}``. + Uses the Cytation firmware filter code mapping. + """ + cytation_code2imaging_mode = { + 1225121: ImagingMode.C377_647, + 1225123: ImagingMode.C400_647, + 1225113: ImagingMode.C469_593, + 1225109: ImagingMode.ACRIDINE_ORANGE, + 1225107: ImagingMode.CFP, + 1225118: ImagingMode.CFP_FRET_V2, + 1225110: ImagingMode.CFP_YFP_FRET, + 1225119: ImagingMode.CFP_YFP_FRET_V2, + 1225112: ImagingMode.CHLOROPHYLL_A, + 1225105: ImagingMode.CY5, + 1225114: ImagingMode.CY5_5, + 1225106: ImagingMode.CY7, + 1225100: ImagingMode.DAPI, + 1225101: ImagingMode.GFP, + 1225116: ImagingMode.GFP_CY5, + 1225122: ImagingMode.OXIDIZED_ROGFP2, + 1225111: ImagingMode.PROPIDIUM_IODIDE, + 1225103: ImagingMode.RFP, + 1225117: ImagingMode.RFP_CY5, + 1225115: ImagingMode.TAG_BFP, + 1225102: ImagingMode.TEXAS_RED, + 1225104: ImagingMode.YFP, + } + + self._filters = [] + for slot in range(1, 5): + configuration = await self.send_command("i", f"q{slot}") + assert configuration is not None + parts = configuration.decode().strip().split(" ") + if len(parts) == 1: + self._filters.append(None) + else: + cytation_code = int(parts[0]) + self._filters.append(cytation_code2imaging_mode.get(cytation_code, None)) + + logger.info("Loaded filters: %s", self._filters) + + async def _load_objectives(self) -> None: + """Discover installed objective positions from firmware. + + Queries each slot individually. Uses the same weird encoding + and part number mapping. + Firmware version 1.x uses ``i o{slot}``, version 2.x uses ``i h{slot}``. + """ + weird_encoding = { + 0x00: " ", + 0x14: ".", + 0x15: "/", + 0x16: "0", + 0x17: "1", + 0x18: "2", + 0x19: "3", + 0x20: "4", + 0x21: "5", + 0x22: "6", + 0x23: "7", + 0x24: "8", + 0x25: "9", + 0x33: "A", + 0x34: "B", + 0x35: "C", + 0x36: "D", + 0x37: "E", + 0x38: "F", + 0x39: "G", + 0x40: "H", + 0x41: "I", + 0x42: "J", + 0x43: "K", + 0x44: "L", + 0x45: "M", + 0x46: "N", + 0x47: "O", + 0x48: "P", + 0x49: "Q", + 0x50: "R", + 0x51: "S", + 0x52: "T", + 0x53: "U", + 0x54: "V", + 0x55: "W", + 0x56: "X", + 0x57: "Y", + 0x58: "Z", + } + part_number2objective = { + "uplsapo 40x2": Objective.O_40X_PL_APO, + "lucplfln 60X": Objective.O_60X_PL_FL, + "uplfln 4x": Objective.O_4X_PL_FL, + "lucplfln 20xph": Objective.O_20X_PL_FL_Phase, + "lucplfln 40xph": Objective.O_40X_PL_FL_Phase, + "u plan": Objective.O_2_5X_PL_ACH_Meiji, + "uplfln 10xph": Objective.O_10X_PL_FL_Phase, + "plapon 1.25x": Objective.O_1_25X_PL_APO, + "uplfln 10x": Objective.O_10X_PL_FL, + "uplfln 60xoi": Objective.O_60X_OIL_PL_FL, + "pln 4x": Objective.O_4X_PL_ACH, + "pln 40x": Objective.O_40X_PL_ACH, + "lucplfln 40x": Objective.O_40X_PL_FL, + "ec-h-plan/2x": Objective.O_2X_PL_ACH_Motic, + "uplfln 100xO2": Objective.O_100X_OIL_PL_FL, + "uplfln 4xph": Objective.O_4X_PL_FL_Phase, + "lucplfln 20X": Objective.O_20X_PL_FL, + "pln 20x": Objective.O_20X_PL_ACH, + "fluar 2.5x/0.12": Objective.O_2_5X_FL_Zeiss, + "uplsapo 100xo": Objective.O_100X_OIL_PL_APO, + "plapon 60xo": Objective.O_60X_OIL_PL_APO, + "uplsapo 20x": Objective.O_20X_PL_APO, + } + + self._objectives = [] + if self.version.startswith("1"): + for slot in [1, 2]: + configuration = await self.send_command("i", f"o{slot}") + if configuration is None: + raise RuntimeError("Failed to load objective configuration") + middle_part = re.split(r"\s+", configuration.rstrip(b"\x03").decode("utf-8"))[1] + if middle_part == "0000": + self._objectives.append(None) + else: + part_number = "".join([weird_encoding[x] for x in bytes.fromhex(middle_part)]) + self._objectives.append(part_number2objective.get(part_number.lower(), None)) + elif self.version.startswith("2"): + for slot in range(1, 7): + configuration = await self.send_command("i", f"h{slot + 1}") + assert configuration is not None + if configuration.startswith(b"****"): + self._objectives.append(None) + else: + annulus_code = int(configuration.decode("latin").strip().split(" ")[0]) + annulus2objective = { + 1320520: Objective.O_4X_PL_FL_Phase, + 1320521: Objective.O_20X_PL_FL_Phase, + 1322026: Objective.O_40X_PL_FL_Phase, + } + self._objectives.append(annulus2objective.get(annulus_code, None)) + else: + raise RuntimeError(f"Unsupported firmware version: {self.version}") + + logger.info("Loaded objectives: %s", self._objectives) + + # ─── Camera Control ────────────────────────────────────────────────── + + async def set_exposure(self, exposure: Exposure) -> None: + """Set camera exposure time in ms.""" + if exposure == "machine-auto": + await self.camera.set_auto_exposure("continuous") + else: + await self.camera.set_auto_exposure("off") + await self.camera.set_exposure(float(exposure)) + self._exposure = exposure + + async def set_gain(self, gain: Gain) -> None: + """Set camera gain.""" + if gain == "machine-auto": + pass + else: + await self.camera.set_gain(float(gain)) + self._gain = gain + + async def set_auto_exposure(self, auto_exposure: Literal["off", "once", "continuous"]) -> None: + """Set camera auto-exposure mode.""" + await self.camera.set_auto_exposure(auto_exposure) + + def start_acquisition(self) -> None: + """Start camera acquisition (buffered streaming).""" + self.camera.start_acquisition() + self._acquiring = True + + def stop_acquisition(self) -> None: + """Stop camera acquisition.""" + if self._acquiring: + self.camera.stop_acquisition() + self._acquiring = False + + async def acquire_image(self) -> Image: + """Trigger camera and read image.""" + config = self.imaging_config + for attempt in range(config.max_image_read_attempts): + try: + image = await self.camera.trigger(timeout_ms=5000) + return image + except Exception: + if attempt < config.max_image_read_attempts - 1: + await asyncio.sleep(config.image_read_delay) + else: + raise + + async def get_exposure(self) -> float: + """Get current exposure time in ms.""" + return await self.camera.get_exposure() + + # ─── Optics Control (serial protocol) ──────────────────────────────── + + def _imaging_mode_code(self, mode: ImagingMode) -> int: + """Get filter wheel position index for an imaging mode. + + Brightfield and phase contrast use position 5 (no filter cube). + """ + if mode == ImagingMode.BRIGHTFIELD or mode == ImagingMode.PHASE_CONTRAST: + return 5 + for i, f in enumerate(self.filters): + if f == mode: + return i + 1 + raise ValueError(f"Mode {mode} not found in filters: {self.filters}") + + def _objective_code(self, objective: Objective) -> int: + """Get turret position index for an objective.""" + for i, o in enumerate(self.objectives): + if o == objective: + return i + 1 + raise ValueError(f"Objective {objective} not found: {self.objectives}") + + async def set_imaging_mode(self, mode: ImagingMode, led_intensity: int = 10) -> None: + """Set filter wheel position and LED. + + Brightfield uses filter position 5 (empty slot) and light path + mode 02 (transmitted). Fluorescence modes use the filter cube + position and light path mode 01 (epifluorescence). + """ + if mode == self._imaging_mode: + await self.led_on(intensity=led_intensity) + return + + if mode == ImagingMode.COLOR_BRIGHTFIELD: + raise NotImplementedError("Color brightfield not implemented") + + await self.led_off() + filter_index = self._imaging_mode_code(mode) + + if self.version.startswith("1"): + if mode == ImagingMode.PHASE_CONTRAST: + raise NotImplementedError("Phase contrast not implemented on Cytation 1") + elif mode == ImagingMode.BRIGHTFIELD: + await self.send_command("Y", "P0c05") + await self.send_command("Y", "P0f02") + else: + await self.send_command("Y", f"P0c{filter_index:02}") + await self.send_command("Y", "P0f01") + else: + await self.send_command("Y", f"P0c{filter_index:02}") + + self._imaging_mode = mode + await self.led_on(intensity=led_intensity) + await asyncio.sleep(0.5) + + async def set_objective(self, objective: Objective) -> None: + """Rotate objective turret to the specified objective.""" + if objective == self._objective: + return + obj_code = self._objective_code(objective) + if self.version.startswith("1"): + await self.send_command("Y", f"P0d{obj_code:02}", timeout=60) + else: + await self.send_command("Y", f"P0e{obj_code:02}", timeout=60) + self._objective = objective + self._imaging_mode = None # force re-set after objective change + + async def set_focus(self, focal_position: FocalPosition) -> None: + """Move focus motor to the specified height (mm). + + Uses a linear calibration for focus motor positioning. + """ + if focal_position == "machine-auto": + raise ValueError( + "focal_position cannot be 'machine-auto'. Use PLR's Microscopy auto-focus instead." + ) + + if focal_position == self._focal_height: + return + + slope, intercept = (10.637991436186072, 1.0243013203461762) + focus_integer = int(float(focal_position) + intercept + slope * float(focal_position) * 1000) + focus_str = str(focus_integer).zfill(5) + + if self._imaging_mode is None: + raise ValueError("Imaging mode not set. Call set_imaging_mode() first.") + imaging_mode_code = self._imaging_mode_code(self._imaging_mode) + await self.send_command("i", f"F{imaging_mode_code}0{focus_str}") + + self._focal_height = focal_position + + async def led_on(self, intensity: int = 10) -> None: + """Turn on LED at specified intensity (1–10).""" + if not 1 <= intensity <= 10: + raise ValueError("intensity must be between 1 and 10") + if self._imaging_mode is None: + raise ValueError("Imaging mode not set. Call set_imaging_mode() first.") + imaging_mode_code = self._imaging_mode_code(self._imaging_mode) + intensity_str = str(intensity).zfill(2) + await self.send_command("i", f"L0{imaging_mode_code}{intensity_str}") + + async def led_off(self) -> None: + """Turn off LED.""" + await self.send_command("i", "L0001") + + async def select(self, row: int, column: int) -> None: + """Move plate stage to a well position.""" + if row == self._row and column == self._column: + return + row_str = str(row).zfill(2) + col_str = str(column).zfill(2) + await self.send_command("Y", f"W6{row_str}{col_str}") + self._row, self._column = row, column + self._pos_x, self._pos_y = None, None + await self.set_position(0, 0) + + async def set_position(self, x: float, y: float) -> None: + """Fine-position the plate stage within a well (mm).""" + if self._imaging_mode is None: + raise ValueError("Imaging mode not set. Call set_imaging_mode() first.") + + if x == self._pos_x and y == self._pos_y: + return + + x_str = str(round(x * 100 * 0.984)).zfill(6) + y_str = str(round(y * 100 * 0.984)).zfill(6) + + if self._row is None or self._column is None: + raise ValueError("Row and column not set. Call select() first.") + row_str = str(self._row).zfill(2) + column_str = str(self._column).zfill(2) + + if self._objective is None: + raise ValueError("Objective not set. Call set_objective() first.") + objective_code = self._objective_code(self._objective) + imaging_mode_code = self._imaging_mode_code(self._imaging_mode) + await self.send_command( + "Y", + f"Z{objective_code}{imaging_mode_code}6{row_str}{column_str}{y_str}{x_str}", + ) + + relative_x = x - (self._pos_x or 0) + relative_y = y - (self._pos_y or 0) + if relative_x != 0: + relative_x_str = str(round(relative_x * 100 * 0.984)).zfill(6) + await self.send_command("Y", f"O00{relative_x_str}") + if relative_y != 0: + relative_y_str = str(round(relative_y * 100 * 0.984)).zfill(6) + await self.send_command("Y", f"O01{relative_y_str}") + + self._pos_x, self._pos_y = x, y diff --git a/pylabrobot/agilent/biotek/cytation_aravis_microscopy.py b/pylabrobot/agilent/biotek/cytation_aravis_microscopy.py new file mode 100644 index 00000000000..e832d0bebae --- /dev/null +++ b/pylabrobot/agilent/biotek/cytation_aravis_microscopy.py @@ -0,0 +1,170 @@ +"""CytationAravisMicroscopyBackend — MicroscopyBackend for the Cytation with Aravis. + +Orchestrates capture by sequencing the driver's optics and camera methods. +Same role as STARPIPBackend: translates capability operations into driver calls. + +Layer: Capability backend (orchestration) +Adjacent layers: + - Above: Microscopy capability calls capture() + - Below: CytationAravisDriver (optics + camera commands) +""" + +from __future__ import annotations + +import logging +import math +from dataclasses import dataclass +from typing import TYPE_CHECKING, List, Literal, Optional, Tuple, Union + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.microscopy.backend import MicroscopyBackend +from pylabrobot.capabilities.microscopy.standard import ( + Exposure, + FocalPosition, + Gain, + Image, + ImagingMode, + ImagingResult, + Objective, +) +from pylabrobot.resources.plate import Plate +from pylabrobot.serializer import SerializableMixin + +if TYPE_CHECKING: + from .cytation_aravis_driver import CytationAravisDriver + +logger = logging.getLogger(__name__) + + +class CytationAravisMicroscopyBackend(MicroscopyBackend): + """MicroscopyBackend for the Cytation using Aravis camera. + + Orchestrates a capture by calling the driver's optics and camera methods + in the correct sequence. Same pattern as STARPIPBackend: the backend + translates capability operations into driver calls. + + Created by CytationAravisDriver during setup() and accessed via + ``driver.microscopy_backend``. + """ + + def __init__(self, driver: CytationAravisDriver) -> None: + self.driver = driver + + async def setup(self) -> None: + pass + + async def stop(self) -> None: + pass + + # ─── Vendor Params ────────────────────────────────────────────────── + + @dataclass + class CaptureParams(BackendParams): + """Aravis-specific capture parameters. + + Passed via ``backend_params`` in ``Microscopy.capture()``. + """ + + led_intensity: int = 10 + coverage: Union[Literal["full"], Tuple[int, int]] = (1, 1) + center_position: Optional[Tuple[float, float]] = None + auto_stop_acquisition: bool = True + + # ─── MicroscopyBackend.capture() ───────────────────────────────────── + + async def capture( + self, + row: int, + column: int, + mode: ImagingMode, + objective: Objective, + exposure_time: Exposure, + focal_height: FocalPosition, + gain: Gain, + plate: Plate, + backend_params: Optional[SerializableMixin] = None, + ) -> ImagingResult: + """Capture image(s) from a well. + + Orchestrates the full imaging pipeline via the driver: + 1. Set plate geometry (serial) + 2. Start camera acquisition + 3. Set objective (turret motor, serial) + 4. Set imaging mode / filter (filter wheel, serial) + 5. Select well (stage motor, serial) + 6. Set exposure and gain (AravisCamera) + 7. Set focal height (focus motor, serial) + 8. Trigger and grab image (AravisCamera) + 9. Return ImagingResult + """ + if not isinstance(backend_params, self.CaptureParams): + backend_params = CytationAravisMicroscopyBackend.CaptureParams() + + led_intensity = backend_params.led_intensity + coverage = backend_params.coverage + center_position = backend_params.center_position + auto_stop_acquisition = backend_params.auto_stop_acquisition + + d = self.driver + await d.set_plate(plate) + + if not d._acquiring: + d.start_acquisition() + + try: + await d.set_objective(objective) + await d.set_imaging_mode(mode, led_intensity=led_intensity) + await d.select(row, column) + await d.set_exposure(exposure_time) + await d.set_gain(gain) + await d.set_focus(focal_height) + + if center_position is not None: + await d.set_position(center_position[0], center_position[1]) + + images: List[Image] = [] + + if coverage == (1, 1): + image = await d.acquire_image() + images.append(image) + else: + if d._objective is None: + raise RuntimeError("Objective not set.") + magnification = d._objective.magnification + + fov_map = {4: 3.474, 20: 0.694, 40: 0.347} + fov = fov_map.get(int(magnification), 3.474) + + if coverage == "full": + first_well = plate.get_item(0) + well_w = first_well.get_size_x() + well_h = first_well.get_size_y() + rows_n = math.ceil(well_h / fov) + cols_n = math.ceil(well_w / fov) + else: + rows_n, cols_n = coverage + + cx = center_position[0] if center_position else 0.0 + cy = center_position[1] if center_position else 0.0 + + for yi in range(rows_n): + for xi in range(cols_n): + x_pos = (xi - (cols_n - 1) / 2) * fov + cx + y_pos = -(yi - (rows_n - 1) / 2) * fov + cy + await d.set_position(x=x_pos, y=y_pos) + image = await d.acquire_image() + images.append(image) + + finally: + await d.led_off() + if auto_stop_acquisition: + d.stop_acquisition() + + exposure_ms = await d.get_exposure() + focal_height_val = float(d._focal_height) if d._focal_height else 0.0 + + return ImagingResult( + images=images, + exposure_time=exposure_ms, + focal_height=focal_height_val, + )