diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e8a1173..fec1981 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,6 +1,8 @@ name: Build and Publish to PyPi -on: push +on: + push: + branches: [main] jobs: build: diff --git a/examples/01_scan_servos.py b/examples/01_scan_servos.py index 898b3d9..2a5bcaf 100644 --- a/examples/01_scan_servos.py +++ b/examples/01_scan_servos.py @@ -3,6 +3,7 @@ """ import os + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) @@ -25,10 +26,10 @@ f" Firmware: v{servo.eeprom.read_firmware_major_version()}.{servo.eeprom.read_firmware_minor_version()}" ) print(f" Position: {servo.sram.read_current_location()}") - print(f" Temperature: {servo.sram. read_current_temperature()}°C") + print(f" Temperature: {servo.sram.read_current_temperature()}°C") print(f" Voltage: {servo.sram.read_current_voltage() / 10:.1f}V") print( - f" Min/Max Angle: {servo.eeprom. read_min_angle_limit()} / {servo.eeprom.read_max_angle_limit()}" + f" Min/Max Angle: {servo.eeprom.read_min_angle_limit()} / {servo.eeprom.read_max_angle_limit()}" ) mode = servo.eeprom.read_operating_mode() diff --git a/examples/03_read_position.py b/examples/03_read_position.py index 1a0f8e6..29459ce 100644 --- a/examples/03_read_position.py +++ b/examples/03_read_position.py @@ -4,6 +4,7 @@ import os import time + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) diff --git a/examples/07_temperature_monitor.py b/examples/07_temperature_monitor.py index 8ffd2e9..56d0c76 100644 --- a/examples/07_temperature_monitor.py +++ b/examples/07_temperature_monitor.py @@ -4,6 +4,7 @@ import os import time + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) diff --git a/examples/09_registered_write.py b/examples/09_registered_write.py index 9e2729d..afa1bf0 100644 --- a/examples/09_registered_write.py +++ b/examples/09_registered_write.py @@ -4,6 +4,7 @@ import os import time + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) @@ -11,32 +12,35 @@ try: servos = controller.list_servos() - if len(servos) < 2: - print("This example requires at least 2 servos") - else: - servo_objects = [controller.wrap_servo(sid) for sid in servos[:2]] + servo_objects = [controller.wrap_servo(sid) for sid in servos] - for servo in servo_objects: - servo.sram.torque_enable() - servo.sram.write_acceleration(50) + for servo in servo_objects: + servo.sram.torque_enable() + servo.sram.write_acceleration(50) - print("Preparing movements with registered write...") - servo_objects[0].sram.write_target_location(1000, reg=True) - servo_objects[1].sram.write_target_location(3000, reg=True) + print("Preparing movements with registered write...") + for i, servo in enumerate(servo_objects): + if i % 2 == 0: + servo.sram.write_target_location(1000, reg=True) + else: + servo.sram.write_target_location(3000, reg=True) - print("Executing all movements simultaneously!") - controller.broadcast.action() - time.sleep(2) + print("Executing all movements simultaneously!") + controller.broadcast.action() + time.sleep(2) - print("Preparing opposite movements...") - servo_objects[0].sram.write_target_location(3000, reg=True) - servo_objects[1].sram.write_target_location(1000, reg=True) + print("Preparing opposite movements...") + for i, servo in enumerate(servo_objects): + if i % 2 == 0: + servo.sram.write_target_location(3000, reg=True) + else: + servo.sram.write_target_location(1000, reg=True) - print("Executing!") - controller.broadcast.action() - time.sleep(2) + print("Executing!") + controller.broadcast.action() + time.sleep(2) - for servo in servo_objects: - servo.sram.torque_disable() + for servo in servo_objects: + servo.sram.torque_disable() finally: controller.close() diff --git a/examples/14_change_id.py b/examples/14_change_id.py index 5f43ae8..7f3c973 100644 --- a/examples/14_change_id.py +++ b/examples/14_change_id.py @@ -5,7 +5,6 @@ import os from python_st3215 import ST3215 - controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) try: diff --git a/examples/17_sync_write.py b/examples/17_sync_write.py index c540e54..f7e9a80 100644 --- a/examples/17_sync_write.py +++ b/examples/17_sync_write.py @@ -5,6 +5,7 @@ import os import time + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) diff --git a/examples/18_network_port.py b/examples/18_network_port.py new file mode 100644 index 0000000..20e848d --- /dev/null +++ b/examples/18_network_port.py @@ -0,0 +1,70 @@ +""" +Scan for all servos on the bus and display their information. +Supports either a direct serial device (e.g. /dev/ttyUSB0) or a network +socket URL (e.g. socket://:). + +Env vars: +- ST3215_URL : full pyserial URL (takes precedence) +- ST3215_HOST : IP/hostname for socket connection (default: 100.122.96.71) +- ST3215_PORT : TCP port for socket connection (default: 2000) +- ST3215_DEV : fallback serial device (default: /dev/ttyUSB0) + +Run this on the host connected to the ST3215 driver board (make sure you have socat installed and set up udev rules for /dev/ttyACM0): + `stty -F /dev/ttyACM0 1000000 raw -echo && socat -d -d TCP4-LISTEN:2000,bind=0.0.0.0,reuseaddr,fork,nodelay FILE:/dev/ttyACM0,b1000000,raw,echo=0` +""" + +import os +import sys + +import serial + +from python_st3215 import ST3215 + +url_env = os.environ.get("ST3215_URL") +host = os.environ.get("ST3215_HOST", "st3215-host") +port = os.environ.get("ST3215_PORT", "2000") + +if url_env: + TARGET_URL = url_env +elif host and port: + TARGET_URL = f"socket://{host}:{port}" +else: + sys.exit(1) + +print(f"Connecting to ST3215 via: {TARGET_URL}") + +ser = serial.serial_for_url(TARGET_URL, timeout=0.02) +controller = ST3215(ser=ser, read_timeout=0.02) + +try: + print("Scanning for servos...\n") + servos = controller.list_servos(timeout=0.02) + + if not servos: + print("No servos found!") + else: + print(f"Found {len(servos)} servo(s)\n") + print("=" * 80) + + mode_names = {0: "Position", 1: "Constant Speed", 2: "PWM", 3: "Stepper"} + + for servo_id in servos: + servo = controller.wrap_servo(servo_id) + + print(f"\nServo ID: {servo_id}") + print( + f" Firmware: v{servo.eeprom.read_firmware_major_version()}.{servo.eeprom.read_firmware_minor_version()}" + ) + print(f" Position: {servo.sram.read_current_location()}") + print(f" Temperature: {servo.sram.read_current_temperature()}°C") + print(f" Voltage: {servo.sram.read_current_voltage() / 10:.1f}V") + print( + f" Min/Max Angle: {servo.eeprom.read_min_angle_limit()} / {servo.eeprom.read_max_angle_limit()}" + ) + + mode = servo.eeprom.read_operating_mode() + print(f" Operating Mode: {mode_names.get(mode, 'Unknown')}") + + print("\n" + "=" * 80) +finally: + controller.close() diff --git a/src/python_st3215/__init__.py b/src/python_st3215/__init__.py index 2b7349c..98616e4 100644 --- a/src/python_st3215/__init__.py +++ b/src/python_st3215/__init__.py @@ -1,14 +1,34 @@ -from .st3215 import ST3215 -from .errors import ST3215Error, ServoNotRespondingError, InvalidInstructionError +from .errors import ( + BroadcastOperationError, + ChecksumError, + CommunicationTimeoutError, + InvalidIDError, + InvalidInstructionError, + InvalidParameterError, + ServoAngleLimitError, + ServoLockedError, + ServoNotRespondingError, + ServoStatusError, + ST3215Error, +) from .servo import Servo +from .st3215 import ST3215 -__version__ = "1.1.0" +__version__ = "1.2.0" __all__ = [ + "__version__", "ST3215", "ST3215Error", "ServoNotRespondingError", "InvalidInstructionError", + "ChecksumError", + "InvalidParameterError", + "ServoAngleLimitError", + "CommunicationTimeoutError", + "ServoLockedError", + "BroadcastOperationError", + "InvalidIDError", + "ServoStatusError", "Servo", - "__version__", ] diff --git a/src/python_st3215/decorators.py b/src/python_st3215/decorators.py index b95e9d7..170f0a8 100644 --- a/src/python_st3215/decorators.py +++ b/src/python_st3215/decorators.py @@ -12,40 +12,22 @@ def validate_servo_id(func: Callable[..., Any]) -> Callable[..., Any]: @wraps(func) def wrapper(self: Any, servo_id: int, *args: Any, **kwargs: Any) -> Any: if servo_id == 254: - from .errors import ST3215Error + from .errors import BroadcastOperationError - raise ST3215Error(f"Cannot {func.__name__} broadcast servo ID 254.") - return func(self, servo_id, *args, **kwargs) - - return wrapper - - -def validate_broadcast_only(func: Callable[..., Any]) -> Callable[..., Any]: - """ - Decorator to ensure operation is only used with broadcast servo (ID 254). - """ - - @wraps(func) - def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: - if self.servo.id != 254: - from .errors import ST3215Error - - raise ST3215Error( - f"{func.__name__} can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + f"{func.__name__} cannot be used with broadcast ID 254." ) - return func(self, *args, **kwargs) + return func(self, servo_id, *args, **kwargs) return wrapper -def validate_value_range( - min_val: int, max_val: int -) -> Callable[[Callable[..., Any]], Callable[..., Any]]: +def validate_value_range(min_val: int, max_val: int) -> Callable[[F], F]: """ Decorator to validate that a value parameter is within the specified range. """ - def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + def decorator(func: F) -> F: @wraps(func) def wrapper(self: Any, value: int, *args: Any, **kwargs: Any) -> Any: if not (min_val <= value <= max_val): @@ -54,7 +36,7 @@ def wrapper(self: Any, value: int, *args: Any, **kwargs: Any) -> Any: ) return func(self, value, *args, **kwargs) - return wrapper + return wrapper # type: ignore[return-value] return decorator @@ -62,19 +44,19 @@ def wrapper(self: Any, value: int, *args: Any, **kwargs: Any) -> Any: def encode_signed_word(value: int) -> tuple[int, int]: """ Encode a signed 16-bit value to low and high bytes. + Encoding uses most-significant bit as sign bit. Args: - value: Signed integer (-32768 to 32767) + value: Signed integer (-32767 to 32767) Returns: Tuple of (low_byte, high_byte) """ + if value < -32767 or value > 32767: + raise ValueError + low, high = abs(value) & 0xFF, (abs(value) >> 8) & 0x7F if value < 0: - raw = 65536 + value - else: - raw = value - low = raw & 0xFF - high = (raw >> 8) & 0xFF + high |= 0x80 return low, high @@ -83,13 +65,13 @@ def decode_signed_word(raw: int) -> int: Decode a 16-bit raw value to signed integer. Args: - raw: Raw 16-bit value (0-65535) + raw: Raw 16-bit value (0-65535) with dedicated sign bit. Returns: - Signed integer (-32768 to 32767) + Signed integer (-32767 to 32767) """ if raw & 0x8000: - return raw - 65536 + return -(raw & 0x7FFF) return raw diff --git a/src/python_st3215/errors.py b/src/python_st3215/errors.py index a9e72c4..938ce51 100644 --- a/src/python_st3215/errors.py +++ b/src/python_st3215/errors.py @@ -1,10 +1,118 @@ class ST3215Error(Exception): + """Base exception for all ST3215-related errors.""" + pass class ServoNotRespondingError(ST3215Error): + """ + Raised when a servo does not respond to a command. + """ + pass class InvalidInstructionError(ST3215Error): + """ + Raised when an invalid instruction code is used. + """ + + pass + + +class ChecksumError(ST3215Error): + """ + Raised when response checksum validation fails. + """ + + pass + + +class InvalidParameterError(ST3215Error): + """ + Raised when a parameter value is outside valid range. + """ + + def __init__( + self, + parameter: str, + value: int | float, + min_val: int | float, + max_val: int | float, + ): + self.parameter = parameter + self.value = value + self.min_val = min_val + self.max_val = max_val + super().__init__( + f"Parameter '{parameter}' value {value} is outside valid range [{min_val}, {max_val}]" + ) + + +class ServoAngleLimitError(ST3215Error): + """ + Raised when attempting to move servo beyond configured angle limits. + """ + + pass + + +class CommunicationTimeoutError(ST3215Error): + """ + Raised when communication with servo times out. + """ + pass + + +class ServoLockedError(ST3215Error): + """ + Raised when attempting to write to a locked servo. + + The servo's lock must be disabled before writing to EEPROM. + """ + + pass + + +class BroadcastOperationError(ST3215Error): + """ + Raised when an invalid operation is attempted on broadcast ID (254). + + Some operations like ping cannot be performed on the broadcast address. + """ + + pass + + +class InvalidIDError(ST3215Error): + """ + Raised when an invalid servo ID is specified. + + Valid IDs are 0-253. ID 254 is reserved for broadcast. + """ + + def __init__(self, servo_id: int): + self.servo_id = servo_id + super().__init__( + f"Invalid servo ID {servo_id}. Valid range is 0-253 (254 is broadcast)" + ) + + +class ServoStatusError(ST3215Error): + """ + Raised when servo reports an error status. + + The error code from the servo can be inspected to determine + the specific issue. + """ + + def __init__(self, servo_id: int, error_code: int, message: str = ""): + self.servo_id = servo_id + self.error_code = error_code + full_message = ( + f"Servo {servo_id} reported error status (code {error_code:#04x})" + ) + if message: + full_message += f": {message}" + super().__init__(full_message) diff --git a/src/python_st3215/registers.py b/src/python_st3215/registers.py index 6d6ad3e..2862987 100644 --- a/src/python_st3215/registers.py +++ b/src/python_st3215/registers.py @@ -1,15 +1,17 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Optional, Any -from .errors import ST3215Error + +from typing import TYPE_CHECKING, Any, Optional + +from .errors import BroadcastOperationError, ST3215Error if TYPE_CHECKING: from .servo import Servo from .decorators import ( - validate_value_range, - encode_signed_word, decode_signed_word, + encode_signed_word, encode_unsigned_word, + validate_value_range, ) @@ -48,7 +50,7 @@ def write_word( return write_fn(address, [low, high]) -class _EEPROMRegisters: +class EEPROMRegisters: def __init__(self, servo: "Servo") -> None: self.servo = servo @@ -992,8 +994,8 @@ def sync_write_acceleration(self, servo_data: dict[int, int]) -> None: None (broadcast operation, no response) """ if self.servo.id != 254: - raise ST3215Error( - "sync_write_acceleration can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_write_acceleration can only be called on the broadcast servo (ID 254)." ) formatted_data: dict[int, list[int]] = {} for servo_id, value in servo_data.items(): @@ -1038,8 +1040,8 @@ def sync_write_target_location(self, servo_data: dict[int, int]) -> None: None (broadcast operation, no response) """ if self.servo.id != 254: - raise ST3215Error( - "sync_write_target_location can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_write_target_location can only be called on the broadcast servo (ID 254)." ) formatted_data: dict[int, list[int]] = {} for servo_id, value in servo_data.items(): @@ -1111,8 +1113,8 @@ def sync_write_running_speed(self, servo_data: dict[int, int]) -> None: None (broadcast operation, no response) """ if self.servo.id != 254: - raise ST3215Error( - "sync_write_running_speed can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_write_running_speed can only be called on the broadcast servo (ID 254)." ) formatted_data: dict[int, list[int]] = {} for servo_id, value in servo_data.items(): @@ -1158,8 +1160,8 @@ def sync_write_torque_limit(self, servo_data: dict[int, int]) -> None: None (broadcast operation, no response) """ if self.servo.id != 254: - raise ST3215Error( - "sync_write_torque_limit can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_write_torque_limit can only be called on the broadcast servo (ID 254)." ) formatted_data: dict[int, list[int]] = {} for servo_id, value in servo_data.items(): @@ -1240,8 +1242,8 @@ def sync_read_current_location( Dictionary mapping servo_id to current position """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_location can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_location can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x38, 2, servo_ids @@ -1249,12 +1251,8 @@ def sync_read_current_location( results: dict[int, int | None] = {} for servo_id, response in responses.items(): if response and isinstance(response, dict) and response.get("parameters"): - data: list[int] | bytes = response["parameters"] - if isinstance(data, (bytes, bytearray)): - raw = data[0] | (data[1] << 8) - else: - raw = data[0] | (data[1] << 8) - results[servo_id] = raw + data: bytes = response["parameters"] + results[servo_id] = data[0] | (data[1] << 8) else: results[servo_id] = None return results @@ -1266,7 +1264,7 @@ def read_current_speed(self) -> Optional[int]: Returns: int: Current speed in steps/s. Returns None if read fails. """ - return read_word(self.servo, 0x3A) + return read_word(self.servo, 0x3A, signed=True) def sync_read_current_speed(self, servo_ids: list[int]) -> dict[int, Optional[int]]: """ @@ -1279,8 +1277,8 @@ def sync_read_current_speed(self, servo_ids: list[int]) -> dict[int, Optional[in Dictionary mapping servo_id to current speed """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_speed can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_speed can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x3A, 2, servo_ids @@ -1293,7 +1291,7 @@ def sync_read_current_speed(self, servo_ids: list[int]) -> dict[int, Optional[in raw = data[0] | (data[1] << 8) else: raw = data[0] | (data[1] << 8) - results[servo_id] = raw + results[servo_id] = decode_signed_word(raw) else: results[servo_id] = None return results @@ -1319,8 +1317,8 @@ def sync_read_current_load(self, servo_ids: list[int]) -> dict[int, Optional[int Dictionary mapping servo_id to current load """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_load can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_load can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x3C, 2, servo_ids @@ -1358,8 +1356,8 @@ def sync_read_current_voltage( Dictionary mapping servo_id to current voltage """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_voltage can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_voltage can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x3E, 1, servo_ids @@ -1395,8 +1393,8 @@ def sync_read_current_temperature( Dictionary mapping servo_id to current temperature """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_temperature can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_temperature can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x3F, 1, servo_ids @@ -1441,8 +1439,8 @@ def sync_read_servo_status(self, servo_ids: list[int]) -> dict[int, Optional[int Dictionary mapping servo_id to servo status """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_servo_status can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_servo_status can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x41, 1, servo_ids @@ -1499,8 +1497,8 @@ def sync_read_current_current( Dictionary mapping servo_id to current draw """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_current can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_current can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x45, 2, servo_ids diff --git a/src/python_st3215/servo.py b/src/python_st3215/servo.py index 7de9d8e..5cf06eb 100644 --- a/src/python_st3215/servo.py +++ b/src/python_st3215/servo.py @@ -1,24 +1,49 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Sequence, Any, cast + +from typing import TYPE_CHECKING, Any, Sequence, cast if TYPE_CHECKING: from .st3215 import ST3215 from .instructions import Instruction -from .registers import _EEPROMRegisters, SRAMRegisters +from .registers import EEPROMRegisters, SRAMRegisters class Servo: def __init__(self, controller: "ST3215", servo_id: int) -> None: + """Wrap a single servo for convenient register access. + + Instances are normally obtained via :meth:`ST3215.wrap_servo` rather than + constructed directly. The broadcast pseudo-servo (ID 254) is available as + ``controller.broadcast`` and exposes the ``sync_write_*`` / ``sync_read_*`` + helpers on its ``sram`` attribute. + + Args: + controller (ST3215): The parent controller that owns the serial connection. + servo_id (int): The servo's bus ID (0-253, or 254 for broadcast). + """ self.controller = controller self.id = servo_id self.logger = controller.logger - self.eeprom = _EEPROMRegisters(self) + self.eeprom = EEPROMRegisters(self) self.sram = SRAMRegisters(self) def send( self, instruction: int | Instruction, parameters: Sequence[int] | None = None ) -> dict[str, object] | None: + """Send a raw instruction to this servo and return the parsed response. + + This is the low-level building block used by all register helpers. Prefer + the typed methods on :attr:`eeprom` and :attr:`sram` for normal use. + + Args: + instruction (int | Instruction): Instruction byte to send. + parameters (Sequence[int] | None): Optional parameter bytes. + + Returns: + dict | None: Parsed response dict (see :meth:`ST3215.parse_response`), + or ``None`` if no response was received. + """ self.logger.debug( f"Servo {self.id}: sending instruction {instruction} with parameters {parameters}" ) diff --git a/src/python_st3215/st3215.py b/src/python_st3215/st3215.py index 0dce40c..873ad89 100644 --- a/src/python_st3215/st3215.py +++ b/src/python_st3215/st3215.py @@ -1,63 +1,156 @@ +from __future__ import annotations + import logging +import time +from typing import Callable, Literal, Optional, Protocol, Sequence + import serial -from .servo import Servo -from .instructions import Instruction -from .errors import ServoNotRespondingError, InvalidInstructionError from .decorators import validate_servo_id +from .errors import ( + ChecksumError, + CommunicationTimeoutError, + InvalidIDError, + InvalidInstructionError, + ServoNotRespondingError, + ServoStatusError, +) +from .instructions import Instruction +from .servo import Servo + + +class _SerialLike(Protocol): + """Minimal interface expected of any serial-like object passed to ST3215.""" -from typing import Optional, Sequence, Literal + timeout: Optional[float] + is_open: bool + + def write(self, data: bytes) -> Optional[int]: ... + def read(self, size: int) -> bytes: ... + def flush(self) -> None: ... + def close(self) -> None: ... + + +logging.getLogger("ST3215").addHandler(logging.NullHandler()) class ST3215: logger = logging.getLogger("ST3215") - logger.setLevel(logging.WARN) - _console_handler = logging.StreamHandler() - _console_handler.setFormatter( - logging.Formatter("[%(levelname)s] %(name)s: %(message)s") - ) - logger.addHandler(_console_handler) @classmethod def set_log_level(cls, level: int) -> None: + """Set the logging level for the ST3215 logger. + + Args: + level (int): A logging level constant such as ``logging.DEBUG``, + ``logging.INFO``, ``logging.WARNING``, etc. + """ cls.logger.setLevel(level) @classmethod def disable_logging(cls) -> None: + """Disable all ST3215 log output.""" cls.logger.disabled = True @classmethod def enable_logging(cls) -> None: + """Re-enable ST3215 log output after a previous call to :meth:`disable_logging`.""" cls.logger.disabled = False def __init__( self, - port: str, + port: Optional[str] = None, baudrate: int = 1000000, read_timeout: float = 0.002, + retry_count: int = 3, + retry_delay: float = 0.01, + ser: Optional[_SerialLike] = None, ) -> None: """ Initialize the ST3215 controller with the given serial port settings. + Args: - port (str): Serial port to connect to (e.g., 'COM3' or '/dev/ttyUSB0'). - baudrate (int): Baud rate for serial communication. - read_timeout (float): Read timeout in seconds. + port (Optional[str]): Serial port to connect to (e.g., 'COM3' or '/dev/ttyUSB0'). + baudrate (int): Baud rate for serial communication. Default is 1,000,000. + read_timeout (float): Read timeout in seconds. Default is 0.002. + retry_count (int): Number of retries for failed communication. Default is 3. + retry_delay (float): Delay between retries in seconds. Default is 0.01. + ser (Optional[_SerialLike]): Optional existing serial-like object to use instead + of opening a new port. Must expose ``read``, ``write``, ``flush``, ``close``, + ``is_open``, and ``timeout``. + + Raises: + ValueError: If neither port nor ser is provided. + serial.SerialException: If the serial port cannot be opened. """ + if port is None and ser is None: + raise ValueError("Either 'port' or 'ser' must be provided.") + self.logger.debug( f"Initializing ST3215 on port {port} with baudrate {baudrate}" ) - self.ser = serial.Serial(port, baudrate=baudrate, timeout=read_timeout) - self.logger.debug(f"Serial port opened at {baudrate} baud.") + self.port = port + self.baudrate = baudrate + self.read_timeout = read_timeout + self.retry_count = retry_count + self.retry_delay = retry_delay + + try: + if ser is not None: + self.ser: _SerialLike = ser + self.ser.timeout = read_timeout + else: + self.ser = serial.Serial(port, baudrate=baudrate, timeout=read_timeout) + self.logger.debug(f"Serial port opened at {baudrate} baud.") + except serial.SerialException as e: + self.logger.error(f"Failed to open serial port {port}: {e}") + raise self.broadcast = Servo(self, 254) def close(self) -> None: - if self.ser.is_open: + """ + Close the serial connection. + + Safe to call multiple times. + """ + if hasattr(self, "ser") and self.ser.is_open: self.ser.close() self.logger.info("Serial port closed.") + def is_connected(self) -> bool: + """ + Check if the serial connection is open and healthy. + + Returns: + bool: True if connected, False otherwise. + """ + return hasattr(self, "ser") and self.ser.is_open + def build_packet( self, servo_id: int, instruction: int, parameters: Sequence[int] | None = None ) -> bytes: + """Construct a raw protocol packet ready to be written to the serial bus. + + The ST3215 packet format is:: + + [0xFF, 0xFF, ID, LENGTH, INSTRUCTION, PARAM..., CHECKSUM] + + Checksum = (~(ID + LENGTH + INSTRUCTION + sum(PARAMS))) & 0xFF + + Args: + servo_id (int): Target servo ID (0-254, where 254 is broadcast). + instruction (int): Instruction byte (must be a valid :class:`Instruction` value). + parameters (Sequence[int] | None): Optional parameter bytes to include. + + Returns: + bytes: The fully-formed packet. + + Raises: + InvalidIDError: If ``servo_id`` is outside 0-254. + InvalidInstructionError: If ``instruction`` is not a recognised instruction code. + """ + if not 0 <= servo_id <= 254: + raise InvalidIDError(servo_id) if not Instruction.has_value(instruction): raise InvalidInstructionError(f"Invalid instruction: {instruction:#04x}") params = tuple(parameters) if parameters else () @@ -80,29 +173,98 @@ def send_instruction( instruction: int | Instruction, parameters: Sequence[int] | None = None, ) -> bytes: + """ + Send an instruction packet to a servo. + + Args: + servo_id: Target servo ID. + instruction: Instruction code (int or Instruction enum). + parameters: Optional parameter bytes. + + Returns: + bytes: The packet that was sent. + + Raises: + serial.SerialException: If write fails. + """ instruction_value = ( instruction.value if isinstance(instruction, Instruction) else instruction ) packet = self.build_packet(servo_id, instruction_value, parameters) self.logger.debug(f"Sending packet: {list(packet)}") - self.ser.write(packet) - self.ser.flush() - return packet - - def read_response(self, sent_packet: bytes) -> Optional[bytes]: - raw_data = self.ser.read(1024) - self.logger.debug(f"Raw data read: {list(raw_data)}") - if not raw_data: - self.logger.warning("No response received.") - return None - if raw_data.startswith(sent_packet): - self.logger.debug( - "Response includes sent packet header, stripping sent packet." - ) - return raw_data[len(sent_packet) :] - return raw_data - def parse_response(self, data: bytes) -> Optional[dict[str, object]]: + try: + self.ser.write(packet) + self.ser.flush() + return packet + except serial.SerialException as e: + self.logger.error(f"Failed to send packet: {e}") + raise + + def read_response( + self, sent_packet: bytes, timeout: Optional[float] = None + ) -> Optional[bytes]: + """ + Read and parse response from servo. + + Args: + sent_packet: The packet that was sent (for echo filtering). + timeout: Optional timeout override in seconds. + + Returns: + Optional[bytes]: Response data or None if no response. + """ + old_timeout: Optional[float] = None + if timeout is not None: + old_timeout = self.ser.timeout + self.ser.timeout = timeout + + try: + raw_data = self.ser.read(1024) + self.logger.debug(f"Raw data read: {list(raw_data)}") + if not raw_data: + self.logger.warning("No response received.") + return None + if raw_data.startswith(sent_packet): + self.logger.debug( + "Response includes sent packet header, stripping sent packet." + ) + return raw_data[len(sent_packet) :] + return raw_data + finally: + if old_timeout is not None: + self.ser.timeout = old_timeout + + def parse_response( + self, data: bytes, raise_on_error: bool = False + ) -> Optional[dict[str, object]]: + """ + Parse a raw response packet received from a servo. + + Returns a dictionary with the following keys: + + - ``header`` (bytes): The two-byte header ``[0xFF, 0xFF]``. + - ``id`` (int): Responding servo ID. + - ``length`` (int): Packet length field. + - ``error`` (int): Error status byte (0 = no error). + - ``parameters`` (bytes): Payload bytes, empty if none. + - ``received_checksum`` (int): Checksum byte from the packet. + - ``calculated_checksum`` (int): Locally-computed checksum. + - ``checksum_valid`` (bool): Whether the checksums match. + + Args: + data (bytes): Raw bytes received from the servo. + raise_on_error (bool): If ``True``, raise :class:`ServoStatusError` when + the servo reports a non-zero error status. Default is ``False``. + + Returns: + dict | None: Parsed response, or ``None`` if ``data`` is too short to parse. + + Raises: + ChecksumError: If the packet checksum does not match. + ServoStatusError: If ``raise_on_error`` is ``True`` and the servo error byte + is non-zero. + """ self.logger.debug(f"Parsing response data: {list(data)}") if len(data) < 6: self.logger.warning("Response too short to parse.") @@ -116,6 +278,17 @@ def parse_response(self, data: bytes) -> Optional[dict[str, object]]: checksum_base = servo_id + length + error + sum(parameters) calculated_checksum = (~checksum_base) & 0xFF valid_checksum = calculated_checksum == received_checksum + if not valid_checksum: + error_msg = ( + f"Checksum mismatch: received {received_checksum:#02x}, " + f"calculated {calculated_checksum:#02x}" + ) + self.logger.error(error_msg) + raise ChecksumError(error_msg) + if error != 0: + if raise_on_error: + raise ServoStatusError(servo_id, error) + self.logger.warning(f"Servo {servo_id} reported error code: {error:#02x}") parsed: dict[str, object] = { "header": header, "id": servo_id, @@ -129,50 +302,172 @@ def parse_response(self, data: bytes) -> Optional[dict[str, object]]: self.logger.debug(f"Parsed response: {parsed}") return parsed + def _retry_operation( + self, + operation: Callable[[], Optional[dict[str, object]]], + operation_name: str = "operation", + ) -> Optional[dict[str, object]]: + """ + Retry an operation with exponential backoff. + + Args: + operation: Function to execute. + operation_name: Name for logging. + + Returns: + Result from operation or None if all retries fail. + """ + last_exception = None + + for attempt in range(self.retry_count): + try: + result = operation() + if result is not None: + return result + if attempt < self.retry_count - 1: + delay = self.retry_delay * (2**attempt) + self.logger.debug( + f"{operation_name} attempt {attempt + 1} failed, " + f"retrying in {delay:.3f}s..." + ) + time.sleep(delay) + except (ChecksumError, serial.SerialException) as e: + last_exception = e + if attempt < self.retry_count - 1: + delay = self.retry_delay * (2**attempt) + self.logger.debug( + f"{operation_name} attempt {attempt + 1} failed with {type(e).__name__}, " + f"retrying in {delay:.3f}s..." + ) + time.sleep(delay) + else: + self.logger.error( + f"{operation_name} failed after {self.retry_count} attempts" + ) + + if last_exception: + raise CommunicationTimeoutError( + f"{operation_name} failed after {self.retry_count} retries" + ) from last_exception + + return None + @validate_servo_id - def ping(self, servo_id: int) -> Optional[dict[str, object]]: + def ping( + self, servo_id: int, use_retry: bool = True + ) -> Optional[dict[str, object]]: """ Send PING command to the servo to check if it is responsive. + + Args: + servo_id: ID of servo to ping (0-253). + use_retry: Whether to use retry logic. Default is True. + Returns: dict: Parsed response from the servo if it responds, else None. + + Raises: + CommunicationTimeoutError: If servo doesn't respond after retries. """ self.logger.debug(f"Pinging servo {servo_id}") - packet = self.send_instruction(servo_id, Instruction.PING) - response = self.read_response(packet) - if response: - parsed = self.parse_response(response) - return parsed - return None + + def _ping() -> Optional[dict[str, object]]: + packet = self.send_instruction(servo_id, Instruction.PING) + response = self.read_response(packet) + if response: + return self.parse_response(response) + return None + + if use_retry: + return self._retry_operation(_ping, f"Ping servo {servo_id}") + else: + return _ping() @validate_servo_id - def wrap_servo(self, servo_id: int) -> Servo: + def wrap_servo(self, servo_id: int, verify: bool = True) -> Servo: """ - Create a Servo instance for the given servo ID after verifying it responds to ping. + Create a Servo instance for the given servo ID. + + Args: + servo_id: ID of servo to wrap (0-253). + verify: If True, ping servo first to verify it responds. Default is True. + Returns: Servo: An instance of the Servo class for the given ID. + Raises: - ServoNotRespondingError: If the servo does not respond to ping. + ServoNotRespondingError: If verify=True and servo does not respond to ping. """ - parsed = self.ping(servo_id) - if not parsed or parsed.get("error") != 0: - raise ServoNotRespondingError( - f"Servo ID {servo_id} did not respond to ping." - ) + if verify: + try: + parsed = self.ping(servo_id) + if not parsed or parsed.get("error") != 0: + raise ServoNotRespondingError( + f"Servo ID {servo_id} did not respond to ping or reported an error." + ) + except CommunicationTimeoutError as e: + raise ServoNotRespondingError( + f"Servo ID {servo_id} did not respond to ping." + ) from e return Servo(self, servo_id) - def list_servos(self) -> list[int]: + def list_servos( + self, + start_id: int = 0, + end_id: int = 253, + timeout: float = 0.001, + progress_callback: Optional[Callable[[int, int], None]] = None, + ) -> list[int]: """ - Scan for connected servos by pinging all possible IDs (0-253). + Scan for connected servos by pinging a range of IDs. + + Args: + start_id: Starting servo ID (inclusive). Default is 0. + end_id: Ending servo ID (inclusive). Default is 253. + timeout: Timeout for each ping in seconds. Default is 0.001. + progress_callback: Optional callback function(current, total) for progress updates. + Returns: List of servo IDs that responded to the ping. + + Example: + >>> def show_progress(current, total): + ... print(f"Scanning: {current}/{total}") + >>> servos = controller.list_servos(progress_callback=show_progress) """ + if not 0 <= start_id <= 253: + raise InvalidIDError(start_id) + if not 0 <= end_id <= 253: + raise InvalidIDError(end_id) + if start_id > end_id: + raise ValueError(f"start_id ({start_id}) must be <= end_id ({end_id})") + found = [] - for servo_id in range(0, 254): - try: - self.wrap_servo(servo_id) - found.append(servo_id) - except ServoNotRespondingError: - continue + total = end_id - start_id + 1 + old_retry_count = self.retry_count + self.retry_count = 1 + + try: + for i, servo_id in enumerate(range(start_id, end_id + 1)): + if progress_callback: + progress_callback(i + 1, total) + try: + packet = self.send_instruction(servo_id, Instruction.PING) + response = self.read_response(packet, timeout=timeout) + if response: + parsed = self.parse_response(response) + if parsed and parsed.get("error") == 0: + found.append(servo_id) + self.logger.info(f"Found servo at ID {servo_id}") + except ( + serial.SerialException, + ChecksumError, + CommunicationTimeoutError, + ): + continue + finally: + self.retry_count = old_retry_count + self.logger.info(f"Scan complete. Found {len(found)} servo(s): {found}") return found def _sync_write( @@ -191,7 +486,7 @@ def _sync_write( parameters.append(servo_id) parameters.extend(data) self.send_instruction(0xFE, Instruction.SYNC_WRITE, parameters) - self.logger.debug(f"SYNC WRITE command sent, no response expected") + self.logger.debug("SYNC WRITE command sent, no response expected") def _sync_read( self, address: int, data_length: int, servo_ids: Sequence[int] @@ -202,20 +497,26 @@ def _sync_read( ) parameters = [address, data_length, *servo_ids] packet = self.send_instruction(0xFE, Instruction.SYNC_READ, parameters) - responses: dict[int, Optional[dict[str, object]]] = {} - for servo_id in servo_ids: - response = self.read_response(packet) - if response: - parsed = self.parse_response(response) - self.logger.debug( - f"Servo {servo_id}: received SYNC READ response {parsed}" - ) - responses[servo_id] = parsed - else: + responses: dict[int, Optional[dict[str, object]]] = { + servo_id: None for servo_id in servo_ids + } + rx = self.read_response(packet) + if rx is None: + return responses + b = 0 + while b+3 < len(rx) and rx[b]==0xFF and rx[b+1]==0xFF: + servo_id = rx[b+2] + paramlen = rx[b+3] + pkglen = paramlen + 4 + if paramlen != data_length + 2: self.logger.warning( - f"Servo {servo_id}: no response received for SYNC READ" + f"Servo {servo_id}: no valid response for SYNC READ" ) responses[servo_id] = None + break + if b+pkglen <= len(rx): + responses[servo_id] = self.parse_response(rx[b:b+pkglen]) + b += pkglen return responses def __enter__(self) -> "ST3215": @@ -227,5 +528,14 @@ def __exit__( exc_value: BaseException | None, traceback: object, ) -> Literal[False]: - self.close() + try: + self.close() + except Exception as e: + self.logger.error(f"Error closing connection: {e}") return False + + def __del__(self) -> None: + try: + self.close() + except Exception: + pass