From 94242e4513ea5bb081b4863661e0c583f5df0322 Mon Sep 17 00:00:00 2001 From: Alessio D'Ambrosio Date: Tue, 20 Jan 2026 09:18:34 +0100 Subject: [PATCH 1/7] Add comprehensive error classes and improve ST3215 Introduce multiple specific ST3215 exception classes and expose them in package exports Rename _EEPROMRegisters to EEPROMRegisters and update Servo usage Remove validate_broadcast_only decorator Harden ST3215: validate IDs, handle serial open errors, support timeout overrides, improve packet building/sending, and surface checksum/status errors Bump package version to 1.2.0 --- src/python_st3215/__init__.py | 28 ++- src/python_st3215/decorators.py | 18 -- src/python_st3215/errors.py | 108 ++++++++++ src/python_st3215/registers.py | 10 +- src/python_st3215/servo.py | 7 +- src/python_st3215/st3215.py | 358 +++++++++++++++++++++++++++----- 6 files changed, 448 insertions(+), 81 deletions(-) diff --git a/src/python_st3215/__init__.py b/src/python_st3215/__init__.py index 2b7349c..98616e4 100644 --- a/src/python_st3215/__init__.py +++ b/src/python_st3215/__init__.py @@ -1,14 +1,34 @@ -from .st3215 import ST3215 -from .errors import ST3215Error, ServoNotRespondingError, InvalidInstructionError +from .errors import ( + BroadcastOperationError, + ChecksumError, + CommunicationTimeoutError, + InvalidIDError, + InvalidInstructionError, + InvalidParameterError, + ServoAngleLimitError, + ServoLockedError, + ServoNotRespondingError, + ServoStatusError, + ST3215Error, +) from .servo import Servo +from .st3215 import ST3215 -__version__ = "1.1.0" +__version__ = "1.2.0" __all__ = [ + "__version__", "ST3215", "ST3215Error", "ServoNotRespondingError", "InvalidInstructionError", + "ChecksumError", + "InvalidParameterError", + "ServoAngleLimitError", + "CommunicationTimeoutError", + "ServoLockedError", + "BroadcastOperationError", + "InvalidIDError", + "ServoStatusError", "Servo", - "__version__", ] diff --git a/src/python_st3215/decorators.py b/src/python_st3215/decorators.py index b95e9d7..863f91e 100644 --- a/src/python_st3215/decorators.py +++ b/src/python_st3215/decorators.py @@ -20,24 +20,6 @@ def wrapper(self: Any, servo_id: int, *args: Any, **kwargs: Any) -> Any: return wrapper -def validate_broadcast_only(func: Callable[..., Any]) -> Callable[..., Any]: - """ - Decorator to ensure operation is only used with broadcast servo (ID 254). - """ - - @wraps(func) - def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: - if self.servo.id != 254: - from .errors import ST3215Error - - raise ST3215Error( - f"{func.__name__} can only be used with broadcast servo (ID 254)." - ) - return func(self, *args, **kwargs) - - return wrapper - - def validate_value_range( min_val: int, max_val: int ) -> Callable[[Callable[..., Any]], Callable[..., Any]]: diff --git a/src/python_st3215/errors.py b/src/python_st3215/errors.py index a9e72c4..8399528 100644 --- a/src/python_st3215/errors.py +++ b/src/python_st3215/errors.py @@ -1,10 +1,118 @@ class ST3215Error(Exception): + """Base exception for all ST3215-related errors.""" + pass class ServoNotRespondingError(ST3215Error): + """ + Raised when a servo does not respond to a command. + """ + pass class InvalidInstructionError(ST3215Error): + """ + Raised when an invalid instruction code is used. + """ + + pass + + +class ChecksumError(ST3215Error): + """ + Raised when response checksum validation fails. + """ + + pass + + +class InvalidParameterError(ST3215Error): + """ + Raised when a parameter value is outside valid range. + """ + + def __init__( + self, + parameter: str, + value: int | float, + min_val: int | float, + max_val: int | float, + ): + self.parameter = parameter + self.value = value + self.min_val = min_val + self.max_val = max_val + super().__init__( + f"Parameter '{parameter}' value {value} is outside valid range [{min_val}, {max_val}]" + ) + + +class ServoAngleLimitError(ST3215Error): + """ + Raised when attempting to move servo beyond configured angle limits. + """ + + pass + + +class CommunicationTimeoutError(ST3215Error): + """ + Raised when communication with servo times out. + """ + pass + + +class ServoLockedError(ST3215Error): + """ + Raised when attempting to write to a locked servo. + + The servo's lock must be disabled before writing to EEPROM. + """ + + pass + + +class BroadcastOperationError(ST3215Error): + """ + Raised when an invalid operation is attempted on broadcast ID (254). + + Some operations like ping cannot be performed on the broadcast address. + """ + + pass + + +class InvalidIDError(ST3215Error): + """ + Raised when an invalid servo ID is specified. + + Valid IDs are 0-253. ID 254 is reserved for broadcast. + """ + + def __init__(self, servo_id: int): + self.servo_id = servo_id + super().__init__( + f"Invalid servo ID {servo_id}. Valid range is 0-253 (254 is broadcast)" + ) + + +class ServoStatusError(ST3215Error): + """ + Raised when servo reports an error status. + + The error code from the servo can be inspected to determine + the specific issue. + """ + + def __init__(self, servo_id: int, error_code: int, message: str = ""): + self.servo_id = servo_id + self.error_code = error_code + error_details = None + detail_str = ", ".join(error_details) if error_details else "Unknown error" + full_message = f"Servo {servo_id} error (code {error_code:#02x}): {detail_str}" + if message: + full_message += f" - {message}" + super().__init__(full_message) diff --git a/src/python_st3215/registers.py b/src/python_st3215/registers.py index 6d6ad3e..7809676 100644 --- a/src/python_st3215/registers.py +++ b/src/python_st3215/registers.py @@ -1,15 +1,17 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Optional, Any + +from typing import TYPE_CHECKING, Any, Optional + from .errors import ST3215Error if TYPE_CHECKING: from .servo import Servo from .decorators import ( - validate_value_range, - encode_signed_word, decode_signed_word, + encode_signed_word, encode_unsigned_word, + validate_value_range, ) @@ -48,7 +50,7 @@ def write_word( return write_fn(address, [low, high]) -class _EEPROMRegisters: +class EEPROMRegisters: def __init__(self, servo: "Servo") -> None: self.servo = servo diff --git a/src/python_st3215/servo.py b/src/python_st3215/servo.py index 7de9d8e..70f2615 100644 --- a/src/python_st3215/servo.py +++ b/src/python_st3215/servo.py @@ -1,11 +1,12 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Sequence, Any, cast + +from typing import TYPE_CHECKING, Any, Sequence, cast if TYPE_CHECKING: from .st3215 import ST3215 from .instructions import Instruction -from .registers import _EEPROMRegisters, SRAMRegisters +from .registers import EEPROMRegisters, SRAMRegisters class Servo: @@ -13,7 +14,7 @@ def __init__(self, controller: "ST3215", servo_id: int) -> None: self.controller = controller self.id = servo_id self.logger = controller.logger - self.eeprom = _EEPROMRegisters(self) + self.eeprom = EEPROMRegisters(self) self.sram = SRAMRegisters(self) def send( diff --git a/src/python_st3215/st3215.py b/src/python_st3215/st3215.py index 0dce40c..4d25a1d 100644 --- a/src/python_st3215/st3215.py +++ b/src/python_st3215/st3215.py @@ -1,12 +1,20 @@ import logging +import time +from typing import Callable, Literal, Optional, Sequence + import serial -from .servo import Servo -from .instructions import Instruction -from .errors import ServoNotRespondingError, InvalidInstructionError from .decorators import validate_servo_id - -from typing import Optional, Sequence, Literal +from .errors import ( + ChecksumError, + CommunicationTimeoutError, + InvalidIDError, + InvalidInstructionError, + ServoNotRespondingError, + ServoStatusError, +) +from .instructions import Instruction +from .servo import Servo class ST3215: @@ -35,29 +43,78 @@ def __init__( port: str, baudrate: int = 1000000, read_timeout: float = 0.002, + retry_count: int = 3, + retry_delay: float = 0.01, ) -> None: """ Initialize the ST3215 controller with the given serial port settings. + Args: port (str): Serial port to connect to (e.g., 'COM3' or '/dev/ttyUSB0'). - baudrate (int): Baud rate for serial communication. - read_timeout (float): Read timeout in seconds. + baudrate (int): Baud rate for serial communication. Default is 1,000,000. + read_timeout (float): Read timeout in seconds. Default is 0.002. + retry_count (int): Number of retries for failed communication. Default is 3. + retry_delay (float): Delay between retries in seconds. Default is 0.01. + + Raises: + serial.SerialException: If the serial port cannot be opened. """ self.logger.debug( f"Initializing ST3215 on port {port} with baudrate {baudrate}" ) - self.ser = serial.Serial(port, baudrate=baudrate, timeout=read_timeout) - self.logger.debug(f"Serial port opened at {baudrate} baud.") + self.port = port + self.baudrate = baudrate + self.read_timeout = read_timeout + self.retry_count = retry_count + self.retry_delay = retry_delay + + try: + self.ser = serial.Serial(port, baudrate=baudrate, timeout=read_timeout) + self.logger.debug(f"Serial port opened at {baudrate} baud.") + except serial.SerialException as e: + self.logger.error(f"Failed to open serial port {port}: {e}") + raise self.broadcast = Servo(self, 254) def close(self) -> None: - if self.ser.is_open: + """ + Close the serial connection. + + Safe to call multiple times. + """ + if hasattr(self, "ser") and self.ser.is_open: self.ser.close() self.logger.info("Serial port closed.") + def is_connected(self) -> bool: + """ + Check if the serial connection is open and healthy. + + Returns: + bool: True if connected, False otherwise. + """ + return hasattr(self, "ser") and self.ser.is_open + def build_packet( self, servo_id: int, instruction: int, parameters: Sequence[int] | None = None ) -> bytes: + """ + Build a command packet for the ST3215 protocol. + + Args: + servo_id: Target servo ID (0-253) or 254 for broadcast. + instruction: Instruction code from Instruction enum. + parameters: Optional list of parameter bytes. + + Returns: + bytes: Complete packet ready to send. + + Raises: + InvalidInstructionError: If instruction code is invalid. + InvalidIDError: If servo_id is out of range. + """ + if not 0 <= servo_id <= 254: + raise InvalidIDError(servo_id) if not Instruction.has_value(instruction): raise InvalidInstructionError(f"Invalid instruction: {instruction:#04x}") params = tuple(parameters) if parameters else () @@ -80,29 +137,84 @@ def send_instruction( instruction: int | Instruction, parameters: Sequence[int] | None = None, ) -> bytes: + """ + Send an instruction packet to a servo. + + Args: + servo_id: Target servo ID. + instruction: Instruction code (int or Instruction enum). + parameters: Optional parameter bytes. + + Returns: + bytes: The packet that was sent. + + Raises: + serial.SerialException: If write fails. + """ instruction_value = ( instruction.value if isinstance(instruction, Instruction) else instruction ) packet = self.build_packet(servo_id, instruction_value, parameters) self.logger.debug(f"Sending packet: {list(packet)}") - self.ser.write(packet) - self.ser.flush() - return packet - - def read_response(self, sent_packet: bytes) -> Optional[bytes]: - raw_data = self.ser.read(1024) - self.logger.debug(f"Raw data read: {list(raw_data)}") - if not raw_data: - self.logger.warning("No response received.") - return None - if raw_data.startswith(sent_packet): - self.logger.debug( - "Response includes sent packet header, stripping sent packet." - ) - return raw_data[len(sent_packet) :] - return raw_data - def parse_response(self, data: bytes) -> Optional[dict[str, object]]: + try: + self.ser.write(packet) + self.ser.flush() + return packet + except serial.SerialException as e: + self.logger.error(f"Failed to send packet: {e}") + raise + + def read_response( + self, sent_packet: bytes, timeout: Optional[float] = None + ) -> Optional[bytes]: + """ + Read and parse response from servo. + + Args: + sent_packet: The packet that was sent (for echo filtering). + timeout: Optional timeout override in seconds. + + Returns: + Optional[bytes]: Response data or None if no response. + """ + if timeout is not None: + old_timeout = self.ser.timeout + self.ser.timeout = timeout + + try: + raw_data = self.ser.read(1024) + self.logger.debug(f"Raw data read: {list(raw_data)}") + if not raw_data: + self.logger.warning("No response received.") + return None + if raw_data.startswith(sent_packet): + self.logger.debug( + "Response includes sent packet header, stripping sent packet." + ) + return raw_data[len(sent_packet) :] + return raw_data + finally: + if timeout is not None: + self.ser.timeout = old_timeout + + def parse_response( + self, data: bytes, raise_on_error: bool = False + ) -> Optional[dict[str, object]]: + """ + Parse a response packet from a servo. + + Args: + data: Raw response data. + raise_on_error: If True, raise exception on servo error status. + + Returns: + Optional[dict]: Parsed response or None if invalid. + + Raises: + ChecksumError: If checksum validation fails. + ServoStatusError: If servo reports error and raise_on_error is True. + """ self.logger.debug(f"Parsing response data: {list(data)}") if len(data) < 6: self.logger.warning("Response too short to parse.") @@ -116,6 +228,17 @@ def parse_response(self, data: bytes) -> Optional[dict[str, object]]: checksum_base = servo_id + length + error + sum(parameters) calculated_checksum = (~checksum_base) & 0xFF valid_checksum = calculated_checksum == received_checksum + if not valid_checksum: + error_msg = ( + f"Checksum mismatch: received {received_checksum:#02x}, " + f"calculated {calculated_checksum:#02x}" + ) + self.logger.error(error_msg) + raise ChecksumError(error_msg) + if error != 0: + if raise_on_error: + raise ServoStatusError(servo_id, error) + self.logger.warning(f"Servo {servo_id} reported error code: {error:#02x}") parsed: dict[str, object] = { "header": header, "id": servo_id, @@ -129,50 +252,172 @@ def parse_response(self, data: bytes) -> Optional[dict[str, object]]: self.logger.debug(f"Parsed response: {parsed}") return parsed + def _retry_operation( + self, + operation: Callable[[], Optional[dict[str, object]]], + operation_name: str = "operation", + ) -> Optional[dict[str, object]]: + """ + Retry an operation with exponential backoff. + + Args: + operation: Function to execute. + operation_name: Name for logging. + + Returns: + Result from operation or None if all retries fail. + """ + last_exception = None + + for attempt in range(self.retry_count): + try: + result = operation() + if result is not None: + return result + if attempt < self.retry_count - 1: + delay = self.retry_delay * (2**attempt) + self.logger.debug( + f"{operation_name} attempt {attempt + 1} failed, " + f"retrying in {delay:.3f}s..." + ) + time.sleep(delay) + except (ChecksumError, serial.SerialException) as e: + last_exception = e + if attempt < self.retry_count - 1: + delay = self.retry_delay * (2**attempt) + self.logger.debug( + f"{operation_name} attempt {attempt + 1} failed with {type(e).__name__}, " + f"retrying in {delay:.3f}s..." + ) + time.sleep(delay) + else: + self.logger.error( + f"{operation_name} failed after {self.retry_count} attempts" + ) + + if last_exception: + raise CommunicationTimeoutError( + f"{operation_name} failed after {self.retry_count} retries" + ) from last_exception + + return None + @validate_servo_id - def ping(self, servo_id: int) -> Optional[dict[str, object]]: + def ping( + self, servo_id: int, use_retry: bool = True + ) -> Optional[dict[str, object]]: """ Send PING command to the servo to check if it is responsive. + + Args: + servo_id: ID of servo to ping (0-253). + use_retry: Whether to use retry logic. Default is True. + Returns: dict: Parsed response from the servo if it responds, else None. + + Raises: + CommunicationTimeoutError: If servo doesn't respond after retries. """ self.logger.debug(f"Pinging servo {servo_id}") - packet = self.send_instruction(servo_id, Instruction.PING) - response = self.read_response(packet) - if response: - parsed = self.parse_response(response) - return parsed - return None + + def _ping() -> Optional[dict[str, object]]: + packet = self.send_instruction(servo_id, Instruction.PING) + response = self.read_response(packet) + if response: + return self.parse_response(response) + return None + + if use_retry: + return self._retry_operation(_ping, f"Ping servo {servo_id}") + else: + return _ping() @validate_servo_id - def wrap_servo(self, servo_id: int) -> Servo: + def wrap_servo(self, servo_id: int, verify: bool = True) -> Servo: """ - Create a Servo instance for the given servo ID after verifying it responds to ping. + Create a Servo instance for the given servo ID. + + Args: + servo_id: ID of servo to wrap (0-253). + verify: If True, ping servo first to verify it responds. Default is True. + Returns: Servo: An instance of the Servo class for the given ID. + Raises: - ServoNotRespondingError: If the servo does not respond to ping. + ServoNotRespondingError: If verify=True and servo does not respond to ping. """ - parsed = self.ping(servo_id) - if not parsed or parsed.get("error") != 0: - raise ServoNotRespondingError( - f"Servo ID {servo_id} did not respond to ping." - ) + if verify: + try: + parsed = self.ping(servo_id) + if not parsed or parsed.get("error") != 0: + raise ServoNotRespondingError( + f"Servo ID {servo_id} did not respond to ping or reported an error." + ) + except CommunicationTimeoutError as e: + raise ServoNotRespondingError( + f"Servo ID {servo_id} did not respond to ping." + ) from e return Servo(self, servo_id) - def list_servos(self) -> list[int]: + def list_servos( + self, + start_id: int = 0, + end_id: int = 253, + timeout: float = 0.001, + progress_callback: Optional[Callable[[int, int], None]] = None, + ) -> list[int]: """ - Scan for connected servos by pinging all possible IDs (0-253). + Scan for connected servos by pinging a range of IDs. + + Args: + start_id: Starting servo ID (inclusive). Default is 0. + end_id: Ending servo ID (inclusive). Default is 253. + timeout: Timeout for each ping in seconds. Default is 0.001. + progress_callback: Optional callback function(current, total) for progress updates. + Returns: List of servo IDs that responded to the ping. + + Example: + >>> def show_progress(current, total): + ... print(f"Scanning: {current}/{total}") + >>> servos = controller.list_servos(progress_callback=show_progress) """ + if not 0 <= start_id <= 253: + raise InvalidIDError(start_id) + if not 0 <= end_id <= 253: + raise InvalidIDError(end_id) + if start_id > end_id: + raise ValueError(f"start_id ({start_id}) must be <= end_id ({end_id})") + found = [] - for servo_id in range(0, 254): - try: - self.wrap_servo(servo_id) - found.append(servo_id) - except ServoNotRespondingError: - continue + total = end_id - start_id + 1 + old_retry_count = self.retry_count + self.retry_count = 1 + + try: + for i, servo_id in enumerate(range(start_id, end_id + 1)): + if progress_callback: + progress_callback(i + 1, total) + try: + packet = self.send_instruction(servo_id, Instruction.PING) + response = self.read_response(packet, timeout=timeout) + if response: + parsed = self.parse_response(response) + if parsed and parsed.get("error") == 0: + found.append(servo_id) + self.logger.info(f"Found servo at ID {servo_id}") + except ( + serial.SerialException, + ChecksumError, + CommunicationTimeoutError, + ): + continue + finally: + self.retry_count = old_retry_count + self.logger.info(f"Scan complete. Found {len(found)} servo(s): {found}") return found def _sync_write( @@ -191,7 +436,7 @@ def _sync_write( parameters.append(servo_id) parameters.extend(data) self.send_instruction(0xFE, Instruction.SYNC_WRITE, parameters) - self.logger.debug(f"SYNC WRITE command sent, no response expected") + self.logger.debug("SYNC WRITE command sent, no response expected") def _sync_read( self, address: int, data_length: int, servo_ids: Sequence[int] @@ -227,5 +472,14 @@ def __exit__( exc_value: BaseException | None, traceback: object, ) -> Literal[False]: - self.close() + try: + self.close() + except Exception as e: + self.logger.error(f"Error closing connection: {e}") return False + + def __del__(self) -> None: + try: + self.close() + except Exception: + pass From c61a012e4ab5761e23afe54fdd53db15ffb5e3a0 Mon Sep 17 00:00:00 2001 From: Alessio D'Ambrosio Date: Tue, 20 Jan 2026 09:21:03 +0100 Subject: [PATCH 2/7] only run release workflow on main branch --- .github/workflows/release.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index e8a1173..fec1981 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,6 +1,8 @@ name: Build and Publish to PyPi -on: push +on: + push: + branches: [main] jobs: build: From 11c5d4c173cc5808e26bb197620f0f496878ee9c Mon Sep 17 00:00:00 2001 From: VESICOLOS Date: Mon, 23 Mar 2026 17:45:24 +0100 Subject: [PATCH 3/7] Fixed _sync_read. Addresses #6: `_sync_read` receives one big package from the serial bus, so we shouldn't call `read_response` more than once. This trusts that the buffer read by `read_response` is big enough so that all returned data fits into it. It's currently at 1024 bytes, which means that reading a 2-byte word from up to 128 servos should be OK. Leaving this open for a future patch. --- src/python_st3215/st3215.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/python_st3215/st3215.py b/src/python_st3215/st3215.py index 4d25a1d..5f052db 100644 --- a/src/python_st3215/st3215.py +++ b/src/python_st3215/st3215.py @@ -448,19 +448,22 @@ def _sync_read( parameters = [address, data_length, *servo_ids] packet = self.send_instruction(0xFE, Instruction.SYNC_READ, parameters) responses: dict[int, Optional[dict[str, object]]] = {} - for servo_id in servo_ids: - response = self.read_response(packet) - if response: - parsed = self.parse_response(response) - self.logger.debug( - f"Servo {servo_id}: received SYNC READ response {parsed}" - ) - responses[servo_id] = parsed - else: + rx = self.read_response(packet) + if rx is None: + return None + b = 0 + while b+3 < len(rx) and rx[b]==0xFF and rx[b+1]==0xFF: + servo_id = rx[b+2] + paramlen = rx[b+3] + pkglen = paramlen + 4 + if paramlen != data_length + 2: self.logger.warning( - f"Servo {servo_id}: no response received for SYNC READ" + f"Servo {servo_id}: no valid response for SYNC READ" ) responses[servo_id] = None + break + responses[servo_id] = self.parse_response(rx[b:b+pkglen]) + b += pkglen return responses def __enter__(self) -> "ST3215": From 2bffa43435552096ab8ff11f1459776a6ac7110c Mon Sep 17 00:00:00 2001 From: VESICOLOS Date: Mon, 23 Mar 2026 18:32:24 +0100 Subject: [PATCH 4/7] Fixed signed-word issues. The servos encode signed words with a dedicated sign bit, and not in 2s-complement signed words. Also, the running speed is a signed word, this was missing. --- src/python_st3215/decorators.py | 16 +++++++--------- src/python_st3215/registers.py | 4 ++-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/python_st3215/decorators.py b/src/python_st3215/decorators.py index 863f91e..1b79395 100644 --- a/src/python_st3215/decorators.py +++ b/src/python_st3215/decorators.py @@ -44,19 +44,17 @@ def wrapper(self: Any, value: int, *args: Any, **kwargs: Any) -> Any: def encode_signed_word(value: int) -> tuple[int, int]: """ Encode a signed 16-bit value to low and high bytes. + Encoding uses most-significant bit as sign bit. Args: - value: Signed integer (-32768 to 32767) + value: Signed integer (-32767 to 32767) Returns: Tuple of (low_byte, high_byte) """ + low, high = abs(value) & 0xFF, (abs(value) >> 8) & 0x7F if value < 0: - raw = 65536 + value - else: - raw = value - low = raw & 0xFF - high = (raw >> 8) & 0xFF + high |= 0x80 return low, high @@ -65,13 +63,13 @@ def decode_signed_word(raw: int) -> int: Decode a 16-bit raw value to signed integer. Args: - raw: Raw 16-bit value (0-65535) + raw: Raw 16-bit value (0-65535) with dedicated sign bit. Returns: - Signed integer (-32768 to 32767) + Signed integer (-32767 to 32767) """ if raw & 0x8000: - return raw - 65536 + return -(raw & 0x7FFF) return raw diff --git a/src/python_st3215/registers.py b/src/python_st3215/registers.py index 7809676..f13d193 100644 --- a/src/python_st3215/registers.py +++ b/src/python_st3215/registers.py @@ -1268,7 +1268,7 @@ def read_current_speed(self) -> Optional[int]: Returns: int: Current speed in steps/s. Returns None if read fails. """ - return read_word(self.servo, 0x3A) + return read_word(self.servo, 0x3A, signed=True) def sync_read_current_speed(self, servo_ids: list[int]) -> dict[int, Optional[int]]: """ @@ -1295,7 +1295,7 @@ def sync_read_current_speed(self, servo_ids: list[int]) -> dict[int, Optional[in raw = data[0] | (data[1] << 8) else: raw = data[0] | (data[1] << 8) - results[servo_id] = raw + results[servo_id] = decode_signed_word(raw) else: results[servo_id] = None return results From 3f96304424184c8e8671b047e41db615d42e9d7c Mon Sep 17 00:00:00 2001 From: Alessio D'Ambrosio Date: Tue, 24 Mar 2026 10:25:12 +0100 Subject: [PATCH 5/7] add support for passing a ser object (for networked purposes) --- examples/01_scan_servos.py | 5 ++- examples/03_read_position.py | 1 + examples/07_temperature_monitor.py | 1 + examples/09_registered_write.py | 46 +++++++++++--------- examples/17_sync_write.py | 1 + examples/18_network_port.py | 70 ++++++++++++++++++++++++++++++ src/python_st3215/st3215.py | 68 ++++++++++++----------------- 7 files changed, 129 insertions(+), 63 deletions(-) create mode 100644 examples/18_network_port.py diff --git a/examples/01_scan_servos.py b/examples/01_scan_servos.py index 898b3d9..2a5bcaf 100644 --- a/examples/01_scan_servos.py +++ b/examples/01_scan_servos.py @@ -3,6 +3,7 @@ """ import os + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) @@ -25,10 +26,10 @@ f" Firmware: v{servo.eeprom.read_firmware_major_version()}.{servo.eeprom.read_firmware_minor_version()}" ) print(f" Position: {servo.sram.read_current_location()}") - print(f" Temperature: {servo.sram. read_current_temperature()}°C") + print(f" Temperature: {servo.sram.read_current_temperature()}°C") print(f" Voltage: {servo.sram.read_current_voltage() / 10:.1f}V") print( - f" Min/Max Angle: {servo.eeprom. read_min_angle_limit()} / {servo.eeprom.read_max_angle_limit()}" + f" Min/Max Angle: {servo.eeprom.read_min_angle_limit()} / {servo.eeprom.read_max_angle_limit()}" ) mode = servo.eeprom.read_operating_mode() diff --git a/examples/03_read_position.py b/examples/03_read_position.py index 1a0f8e6..29459ce 100644 --- a/examples/03_read_position.py +++ b/examples/03_read_position.py @@ -4,6 +4,7 @@ import os import time + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) diff --git a/examples/07_temperature_monitor.py b/examples/07_temperature_monitor.py index 8ffd2e9..56d0c76 100644 --- a/examples/07_temperature_monitor.py +++ b/examples/07_temperature_monitor.py @@ -4,6 +4,7 @@ import os import time + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) diff --git a/examples/09_registered_write.py b/examples/09_registered_write.py index 9e2729d..afa1bf0 100644 --- a/examples/09_registered_write.py +++ b/examples/09_registered_write.py @@ -4,6 +4,7 @@ import os import time + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) @@ -11,32 +12,35 @@ try: servos = controller.list_servos() - if len(servos) < 2: - print("This example requires at least 2 servos") - else: - servo_objects = [controller.wrap_servo(sid) for sid in servos[:2]] + servo_objects = [controller.wrap_servo(sid) for sid in servos] - for servo in servo_objects: - servo.sram.torque_enable() - servo.sram.write_acceleration(50) + for servo in servo_objects: + servo.sram.torque_enable() + servo.sram.write_acceleration(50) - print("Preparing movements with registered write...") - servo_objects[0].sram.write_target_location(1000, reg=True) - servo_objects[1].sram.write_target_location(3000, reg=True) + print("Preparing movements with registered write...") + for i, servo in enumerate(servo_objects): + if i % 2 == 0: + servo.sram.write_target_location(1000, reg=True) + else: + servo.sram.write_target_location(3000, reg=True) - print("Executing all movements simultaneously!") - controller.broadcast.action() - time.sleep(2) + print("Executing all movements simultaneously!") + controller.broadcast.action() + time.sleep(2) - print("Preparing opposite movements...") - servo_objects[0].sram.write_target_location(3000, reg=True) - servo_objects[1].sram.write_target_location(1000, reg=True) + print("Preparing opposite movements...") + for i, servo in enumerate(servo_objects): + if i % 2 == 0: + servo.sram.write_target_location(3000, reg=True) + else: + servo.sram.write_target_location(1000, reg=True) - print("Executing!") - controller.broadcast.action() - time.sleep(2) + print("Executing!") + controller.broadcast.action() + time.sleep(2) - for servo in servo_objects: - servo.sram.torque_disable() + for servo in servo_objects: + servo.sram.torque_disable() finally: controller.close() diff --git a/examples/17_sync_write.py b/examples/17_sync_write.py index c540e54..f7e9a80 100644 --- a/examples/17_sync_write.py +++ b/examples/17_sync_write.py @@ -5,6 +5,7 @@ import os import time + from python_st3215 import ST3215 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) diff --git a/examples/18_network_port.py b/examples/18_network_port.py new file mode 100644 index 0000000..20e848d --- /dev/null +++ b/examples/18_network_port.py @@ -0,0 +1,70 @@ +""" +Scan for all servos on the bus and display their information. +Supports either a direct serial device (e.g. /dev/ttyUSB0) or a network +socket URL (e.g. socket://:). + +Env vars: +- ST3215_URL : full pyserial URL (takes precedence) +- ST3215_HOST : IP/hostname for socket connection (default: 100.122.96.71) +- ST3215_PORT : TCP port for socket connection (default: 2000) +- ST3215_DEV : fallback serial device (default: /dev/ttyUSB0) + +Run this on the host connected to the ST3215 driver board (make sure you have socat installed and set up udev rules for /dev/ttyACM0): + `stty -F /dev/ttyACM0 1000000 raw -echo && socat -d -d TCP4-LISTEN:2000,bind=0.0.0.0,reuseaddr,fork,nodelay FILE:/dev/ttyACM0,b1000000,raw,echo=0` +""" + +import os +import sys + +import serial + +from python_st3215 import ST3215 + +url_env = os.environ.get("ST3215_URL") +host = os.environ.get("ST3215_HOST", "st3215-host") +port = os.environ.get("ST3215_PORT", "2000") + +if url_env: + TARGET_URL = url_env +elif host and port: + TARGET_URL = f"socket://{host}:{port}" +else: + sys.exit(1) + +print(f"Connecting to ST3215 via: {TARGET_URL}") + +ser = serial.serial_for_url(TARGET_URL, timeout=0.02) +controller = ST3215(ser=ser, read_timeout=0.02) + +try: + print("Scanning for servos...\n") + servos = controller.list_servos(timeout=0.02) + + if not servos: + print("No servos found!") + else: + print(f"Found {len(servos)} servo(s)\n") + print("=" * 80) + + mode_names = {0: "Position", 1: "Constant Speed", 2: "PWM", 3: "Stepper"} + + for servo_id in servos: + servo = controller.wrap_servo(servo_id) + + print(f"\nServo ID: {servo_id}") + print( + f" Firmware: v{servo.eeprom.read_firmware_major_version()}.{servo.eeprom.read_firmware_minor_version()}" + ) + print(f" Position: {servo.sram.read_current_location()}") + print(f" Temperature: {servo.sram.read_current_temperature()}°C") + print(f" Voltage: {servo.sram.read_current_voltage() / 10:.1f}V") + print( + f" Min/Max Angle: {servo.eeprom.read_min_angle_limit()} / {servo.eeprom.read_max_angle_limit()}" + ) + + mode = servo.eeprom.read_operating_mode() + print(f" Operating Mode: {mode_names.get(mode, 'Unknown')}") + + print("\n" + "=" * 80) +finally: + controller.close() diff --git a/src/python_st3215/st3215.py b/src/python_st3215/st3215.py index 4d25a1d..e59c6f3 100644 --- a/src/python_st3215/st3215.py +++ b/src/python_st3215/st3215.py @@ -40,25 +40,31 @@ def enable_logging(cls) -> None: def __init__( self, - port: str, + port: Optional[str] = None, baudrate: int = 1000000, read_timeout: float = 0.002, retry_count: int = 3, retry_delay: float = 0.01, + ser: Optional[object] = None, ) -> None: """ Initialize the ST3215 controller with the given serial port settings. Args: - port (str): Serial port to connect to (e.g., 'COM3' or '/dev/ttyUSB0'). + port (Optional[str]): Serial port to connect to (e.g., 'COM3' or '/dev/ttyUSB0'). baudrate (int): Baud rate for serial communication. Default is 1,000,000. read_timeout (float): Read timeout in seconds. Default is 0.002. retry_count (int): Number of retries for failed communication. Default is 3. retry_delay (float): Delay between retries in seconds. Default is 0.01. + ser (Optional[object]): Optional existing serial object to use instead of creating a new one. Raises: + ValueError: If neither port nor ser is provided. serial.SerialException: If the serial port cannot be opened. """ + if port is None and ser is None: + raise ValueError("Either 'port' or 'ser' must be provided.") + self.logger.debug( f"Initializing ST3215 on port {port} with baudrate {baudrate}" ) @@ -69,8 +75,14 @@ def __init__( self.retry_delay = retry_delay try: - self.ser = serial.Serial(port, baudrate=baudrate, timeout=read_timeout) - self.logger.debug(f"Serial port opened at {baudrate} baud.") + if ser is not None: + self.ser = ser + if hasattr(self.ser, "timeout"): + # type: ignore[assignment] + self.ser.timeout = read_timeout + else: + self.ser = serial.Serial(port, baudrate=baudrate, timeout=read_timeout) + self.logger.debug(f"Serial port opened at {baudrate} baud.") except serial.SerialException as e: self.logger.error(f"Failed to open serial port {port}: {e}") raise @@ -82,9 +94,11 @@ def close(self) -> None: Safe to call multiple times. """ - if hasattr(self, "ser") and self.ser.is_open: - self.ser.close() - self.logger.info("Serial port closed.") + if hasattr(self, "ser"): + is_open = getattr(self.ser, "is_open", True) + if is_open and hasattr(self.ser, "close"): + self.ser.close() + self.logger.info("Serial port closed.") def is_connected(self) -> bool: """ @@ -93,26 +107,11 @@ def is_connected(self) -> bool: Returns: bool: True if connected, False otherwise. """ - return hasattr(self, "ser") and self.ser.is_open + return hasattr(self, "ser") and getattr(self.ser, "is_open", True) def build_packet( self, servo_id: int, instruction: int, parameters: Sequence[int] | None = None ) -> bytes: - """ - Build a command packet for the ST3215 protocol. - - Args: - servo_id: Target servo ID (0-253) or 254 for broadcast. - instruction: Instruction code from Instruction enum. - parameters: Optional list of parameter bytes. - - Returns: - bytes: Complete packet ready to send. - - Raises: - InvalidInstructionError: If instruction code is invalid. - InvalidIDError: If servo_id is out of range. - """ if not 0 <= servo_id <= 254: raise InvalidIDError(servo_id) if not Instruction.has_value(instruction): @@ -159,7 +158,8 @@ def send_instruction( try: self.ser.write(packet) - self.ser.flush() + if hasattr(self.ser, "flush"): + self.ser.flush() return packet except serial.SerialException as e: self.logger.error(f"Failed to send packet: {e}") @@ -178,9 +178,11 @@ def read_response( Returns: Optional[bytes]: Response data or None if no response. """ - if timeout is not None: + if timeout is not None and hasattr(self.ser, "timeout"): old_timeout = self.ser.timeout self.ser.timeout = timeout + else: + old_timeout = None try: raw_data = self.ser.read(1024) @@ -195,26 +197,12 @@ def read_response( return raw_data[len(sent_packet) :] return raw_data finally: - if timeout is not None: + if old_timeout is not None and hasattr(self.ser, "timeout"): self.ser.timeout = old_timeout def parse_response( self, data: bytes, raise_on_error: bool = False ) -> Optional[dict[str, object]]: - """ - Parse a response packet from a servo. - - Args: - data: Raw response data. - raise_on_error: If True, raise exception on servo error status. - - Returns: - Optional[dict]: Parsed response or None if invalid. - - Raises: - ChecksumError: If checksum validation fails. - ServoStatusError: If servo reports error and raise_on_error is True. - """ self.logger.debug(f"Parsing response data: {list(data)}") if len(data) < 6: self.logger.warning("Response too short to parse.") From 99d2c711ad5a900477e14212dc7caaf631a7ae2a Mon Sep 17 00:00:00 2001 From: Thomas Voigtmann Date: Fri, 27 Mar 2026 16:25:48 +0100 Subject: [PATCH 6/7] Fixed errors pointed out by Copilot. --- src/python_st3215/decorators.py | 2 ++ src/python_st3215/st3215.py | 9 ++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/python_st3215/decorators.py b/src/python_st3215/decorators.py index 1b79395..925e30c 100644 --- a/src/python_st3215/decorators.py +++ b/src/python_st3215/decorators.py @@ -52,6 +52,8 @@ def encode_signed_word(value: int) -> tuple[int, int]: Returns: Tuple of (low_byte, high_byte) """ + if value < -32767 or value > 32767: + raise ValueError low, high = abs(value) & 0xFF, (abs(value) >> 8) & 0x7F if value < 0: high |= 0x80 diff --git a/src/python_st3215/st3215.py b/src/python_st3215/st3215.py index 5f052db..e29e358 100644 --- a/src/python_st3215/st3215.py +++ b/src/python_st3215/st3215.py @@ -447,10 +447,12 @@ def _sync_read( ) parameters = [address, data_length, *servo_ids] packet = self.send_instruction(0xFE, Instruction.SYNC_READ, parameters) - responses: dict[int, Optional[dict[str, object]]] = {} + responses: dict[int, Optional[dict[str, object]]] = { + servo_id: None for servo_id in servo_ids + } rx = self.read_response(packet) if rx is None: - return None + return responses b = 0 while b+3 < len(rx) and rx[b]==0xFF and rx[b+1]==0xFF: servo_id = rx[b+2] @@ -462,7 +464,8 @@ def _sync_read( ) responses[servo_id] = None break - responses[servo_id] = self.parse_response(rx[b:b+pkglen]) + if b+pkglen <= len(rx): + responses[servo_id] = self.parse_response(rx[b:b+pkglen]) b += pkglen return responses From de9a2fed0dfb0a9c1efb9d9ffd79d2f5f1809ecb Mon Sep 17 00:00:00 2001 From: Alessio D'Ambrosio Date: Mon, 6 Apr 2026 13:36:34 +0200 Subject: [PATCH 7/7] prep v1.2.0: add docstrings, fix ServoStatusError, enforce BroadcastOperationError, add _SerialLike Protocol, and resolve all mypy strict errors Signed-off-by: Alessio D'Ambrosio --- examples/14_change_id.py | 1 - src/python_st3215/decorators.py | 14 ++-- src/python_st3215/errors.py | 8 +-- src/python_st3215/registers.py | 54 +++++++-------- src/python_st3215/servo.py | 24 +++++++ src/python_st3215/st3215.py | 112 +++++++++++++++++++++++++------- 6 files changed, 147 insertions(+), 66 deletions(-) diff --git a/examples/14_change_id.py b/examples/14_change_id.py index 5f43ae8..7f3c973 100644 --- a/examples/14_change_id.py +++ b/examples/14_change_id.py @@ -5,7 +5,6 @@ import os from python_st3215 import ST3215 - controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) try: diff --git a/src/python_st3215/decorators.py b/src/python_st3215/decorators.py index 863f91e..2d1ef4a 100644 --- a/src/python_st3215/decorators.py +++ b/src/python_st3215/decorators.py @@ -12,22 +12,22 @@ def validate_servo_id(func: Callable[..., Any]) -> Callable[..., Any]: @wraps(func) def wrapper(self: Any, servo_id: int, *args: Any, **kwargs: Any) -> Any: if servo_id == 254: - from .errors import ST3215Error + from .errors import BroadcastOperationError - raise ST3215Error(f"Cannot {func.__name__} broadcast servo ID 254.") + raise BroadcastOperationError( + f"{func.__name__} cannot be used with broadcast ID 254." + ) return func(self, servo_id, *args, **kwargs) return wrapper -def validate_value_range( - min_val: int, max_val: int -) -> Callable[[Callable[..., Any]], Callable[..., Any]]: +def validate_value_range(min_val: int, max_val: int) -> Callable[[F], F]: """ Decorator to validate that a value parameter is within the specified range. """ - def decorator(func: Callable[..., Any]) -> Callable[..., Any]: + def decorator(func: F) -> F: @wraps(func) def wrapper(self: Any, value: int, *args: Any, **kwargs: Any) -> Any: if not (min_val <= value <= max_val): @@ -36,7 +36,7 @@ def wrapper(self: Any, value: int, *args: Any, **kwargs: Any) -> Any: ) return func(self, value, *args, **kwargs) - return wrapper + return wrapper # type: ignore[return-value] return decorator diff --git a/src/python_st3215/errors.py b/src/python_st3215/errors.py index 8399528..938ce51 100644 --- a/src/python_st3215/errors.py +++ b/src/python_st3215/errors.py @@ -110,9 +110,9 @@ class ServoStatusError(ST3215Error): def __init__(self, servo_id: int, error_code: int, message: str = ""): self.servo_id = servo_id self.error_code = error_code - error_details = None - detail_str = ", ".join(error_details) if error_details else "Unknown error" - full_message = f"Servo {servo_id} error (code {error_code:#02x}): {detail_str}" + full_message = ( + f"Servo {servo_id} reported error status (code {error_code:#04x})" + ) if message: - full_message += f" - {message}" + full_message += f": {message}" super().__init__(full_message) diff --git a/src/python_st3215/registers.py b/src/python_st3215/registers.py index 7809676..bbe26c1 100644 --- a/src/python_st3215/registers.py +++ b/src/python_st3215/registers.py @@ -2,7 +2,7 @@ from typing import TYPE_CHECKING, Any, Optional -from .errors import ST3215Error +from .errors import BroadcastOperationError, ST3215Error if TYPE_CHECKING: from .servo import Servo @@ -994,8 +994,8 @@ def sync_write_acceleration(self, servo_data: dict[int, int]) -> None: None (broadcast operation, no response) """ if self.servo.id != 254: - raise ST3215Error( - "sync_write_acceleration can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_write_acceleration can only be called on the broadcast servo (ID 254)." ) formatted_data: dict[int, list[int]] = {} for servo_id, value in servo_data.items(): @@ -1040,8 +1040,8 @@ def sync_write_target_location(self, servo_data: dict[int, int]) -> None: None (broadcast operation, no response) """ if self.servo.id != 254: - raise ST3215Error( - "sync_write_target_location can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_write_target_location can only be called on the broadcast servo (ID 254)." ) formatted_data: dict[int, list[int]] = {} for servo_id, value in servo_data.items(): @@ -1113,8 +1113,8 @@ def sync_write_running_speed(self, servo_data: dict[int, int]) -> None: None (broadcast operation, no response) """ if self.servo.id != 254: - raise ST3215Error( - "sync_write_running_speed can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_write_running_speed can only be called on the broadcast servo (ID 254)." ) formatted_data: dict[int, list[int]] = {} for servo_id, value in servo_data.items(): @@ -1160,8 +1160,8 @@ def sync_write_torque_limit(self, servo_data: dict[int, int]) -> None: None (broadcast operation, no response) """ if self.servo.id != 254: - raise ST3215Error( - "sync_write_torque_limit can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_write_torque_limit can only be called on the broadcast servo (ID 254)." ) formatted_data: dict[int, list[int]] = {} for servo_id, value in servo_data.items(): @@ -1242,8 +1242,8 @@ def sync_read_current_location( Dictionary mapping servo_id to current position """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_location can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_location can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x38, 2, servo_ids @@ -1251,12 +1251,8 @@ def sync_read_current_location( results: dict[int, int | None] = {} for servo_id, response in responses.items(): if response and isinstance(response, dict) and response.get("parameters"): - data: list[int] | bytes = response["parameters"] - if isinstance(data, (bytes, bytearray)): - raw = data[0] | (data[1] << 8) - else: - raw = data[0] | (data[1] << 8) - results[servo_id] = raw + data: bytes = response["parameters"] + results[servo_id] = data[0] | (data[1] << 8) else: results[servo_id] = None return results @@ -1281,8 +1277,8 @@ def sync_read_current_speed(self, servo_ids: list[int]) -> dict[int, Optional[in Dictionary mapping servo_id to current speed """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_speed can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_speed can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x3A, 2, servo_ids @@ -1321,8 +1317,8 @@ def sync_read_current_load(self, servo_ids: list[int]) -> dict[int, Optional[int Dictionary mapping servo_id to current load """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_load can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_load can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x3C, 2, servo_ids @@ -1360,8 +1356,8 @@ def sync_read_current_voltage( Dictionary mapping servo_id to current voltage """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_voltage can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_voltage can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x3E, 1, servo_ids @@ -1397,8 +1393,8 @@ def sync_read_current_temperature( Dictionary mapping servo_id to current temperature """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_temperature can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_temperature can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x3F, 1, servo_ids @@ -1443,8 +1439,8 @@ def sync_read_servo_status(self, servo_ids: list[int]) -> dict[int, Optional[int Dictionary mapping servo_id to servo status """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_servo_status can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_servo_status can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x41, 1, servo_ids @@ -1501,8 +1497,8 @@ def sync_read_current_current( Dictionary mapping servo_id to current draw """ if self.servo.id != 254: - raise ST3215Error( - "sync_read_current_current can only be used with broadcast servo (ID 254)." + raise BroadcastOperationError( + "sync_read_current_current can only be called on the broadcast servo (ID 254)." ) responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( 0x45, 2, servo_ids diff --git a/src/python_st3215/servo.py b/src/python_st3215/servo.py index 70f2615..5cf06eb 100644 --- a/src/python_st3215/servo.py +++ b/src/python_st3215/servo.py @@ -11,6 +11,17 @@ class Servo: def __init__(self, controller: "ST3215", servo_id: int) -> None: + """Wrap a single servo for convenient register access. + + Instances are normally obtained via :meth:`ST3215.wrap_servo` rather than + constructed directly. The broadcast pseudo-servo (ID 254) is available as + ``controller.broadcast`` and exposes the ``sync_write_*`` / ``sync_read_*`` + helpers on its ``sram`` attribute. + + Args: + controller (ST3215): The parent controller that owns the serial connection. + servo_id (int): The servo's bus ID (0-253, or 254 for broadcast). + """ self.controller = controller self.id = servo_id self.logger = controller.logger @@ -20,6 +31,19 @@ def __init__(self, controller: "ST3215", servo_id: int) -> None: def send( self, instruction: int | Instruction, parameters: Sequence[int] | None = None ) -> dict[str, object] | None: + """Send a raw instruction to this servo and return the parsed response. + + This is the low-level building block used by all register helpers. Prefer + the typed methods on :attr:`eeprom` and :attr:`sram` for normal use. + + Args: + instruction (int | Instruction): Instruction byte to send. + parameters (Sequence[int] | None): Optional parameter bytes. + + Returns: + dict | None: Parsed response dict (see :meth:`ST3215.parse_response`), + or ``None`` if no response was received. + """ self.logger.debug( f"Servo {self.id}: sending instruction {instruction} with parameters {parameters}" ) diff --git a/src/python_st3215/st3215.py b/src/python_st3215/st3215.py index e59c6f3..fc597d1 100644 --- a/src/python_st3215/st3215.py +++ b/src/python_st3215/st3215.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import logging import time -from typing import Callable, Literal, Optional, Sequence +from typing import Callable, Literal, Optional, Protocol, Sequence import serial @@ -17,25 +19,42 @@ from .servo import Servo +class _SerialLike(Protocol): + """Minimal interface expected of any serial-like object passed to ST3215.""" + + timeout: Optional[float] + is_open: bool + + def write(self, data: bytes) -> Optional[int]: ... + def read(self, size: int) -> bytes: ... + def flush(self) -> None: ... + def close(self) -> None: ... + + +logging.getLogger("ST3215").addHandler(logging.NullHandler()) + + class ST3215: logger = logging.getLogger("ST3215") - logger.setLevel(logging.WARN) - _console_handler = logging.StreamHandler() - _console_handler.setFormatter( - logging.Formatter("[%(levelname)s] %(name)s: %(message)s") - ) - logger.addHandler(_console_handler) @classmethod def set_log_level(cls, level: int) -> None: + """Set the logging level for the ST3215 logger. + + Args: + level (int): A logging level constant such as ``logging.DEBUG``, + ``logging.INFO``, ``logging.WARNING``, etc. + """ cls.logger.setLevel(level) @classmethod def disable_logging(cls) -> None: + """Disable all ST3215 log output.""" cls.logger.disabled = True @classmethod def enable_logging(cls) -> None: + """Re-enable ST3215 log output after a previous call to :meth:`disable_logging`.""" cls.logger.disabled = False def __init__( @@ -45,7 +64,7 @@ def __init__( read_timeout: float = 0.002, retry_count: int = 3, retry_delay: float = 0.01, - ser: Optional[object] = None, + ser: Optional[_SerialLike] = None, ) -> None: """ Initialize the ST3215 controller with the given serial port settings. @@ -56,7 +75,9 @@ def __init__( read_timeout (float): Read timeout in seconds. Default is 0.002. retry_count (int): Number of retries for failed communication. Default is 3. retry_delay (float): Delay between retries in seconds. Default is 0.01. - ser (Optional[object]): Optional existing serial object to use instead of creating a new one. + ser (Optional[_SerialLike]): Optional existing serial-like object to use instead + of opening a new port. Must expose ``read``, ``write``, ``flush``, ``close``, + ``is_open``, and ``timeout``. Raises: ValueError: If neither port nor ser is provided. @@ -76,10 +97,8 @@ def __init__( try: if ser is not None: - self.ser = ser - if hasattr(self.ser, "timeout"): - # type: ignore[assignment] - self.ser.timeout = read_timeout + self.ser: _SerialLike = ser + self.ser.timeout = read_timeout else: self.ser = serial.Serial(port, baudrate=baudrate, timeout=read_timeout) self.logger.debug(f"Serial port opened at {baudrate} baud.") @@ -94,11 +113,9 @@ def close(self) -> None: Safe to call multiple times. """ - if hasattr(self, "ser"): - is_open = getattr(self.ser, "is_open", True) - if is_open and hasattr(self.ser, "close"): - self.ser.close() - self.logger.info("Serial port closed.") + if hasattr(self, "ser") and self.ser.is_open: + self.ser.close() + self.logger.info("Serial port closed.") def is_connected(self) -> bool: """ @@ -107,11 +124,31 @@ def is_connected(self) -> bool: Returns: bool: True if connected, False otherwise. """ - return hasattr(self, "ser") and getattr(self.ser, "is_open", True) + return hasattr(self, "ser") and self.ser.is_open def build_packet( self, servo_id: int, instruction: int, parameters: Sequence[int] | None = None ) -> bytes: + """Construct a raw protocol packet ready to be written to the serial bus. + + The ST3215 packet format is:: + + [0xFF, 0xFF, ID, LENGTH, INSTRUCTION, PARAM..., CHECKSUM] + + Checksum = (~(ID + LENGTH + INSTRUCTION + sum(PARAMS))) & 0xFF + + Args: + servo_id (int): Target servo ID (0-254, where 254 is broadcast). + instruction (int): Instruction byte (must be a valid :class:`Instruction` value). + parameters (Sequence[int] | None): Optional parameter bytes to include. + + Returns: + bytes: The fully-formed packet. + + Raises: + InvalidIDError: If ``servo_id`` is outside 0-254. + InvalidInstructionError: If ``instruction`` is not a recognised instruction code. + """ if not 0 <= servo_id <= 254: raise InvalidIDError(servo_id) if not Instruction.has_value(instruction): @@ -158,8 +195,7 @@ def send_instruction( try: self.ser.write(packet) - if hasattr(self.ser, "flush"): - self.ser.flush() + self.ser.flush() return packet except serial.SerialException as e: self.logger.error(f"Failed to send packet: {e}") @@ -178,11 +214,10 @@ def read_response( Returns: Optional[bytes]: Response data or None if no response. """ - if timeout is not None and hasattr(self.ser, "timeout"): + old_timeout: Optional[float] = None + if timeout is not None: old_timeout = self.ser.timeout self.ser.timeout = timeout - else: - old_timeout = None try: raw_data = self.ser.read(1024) @@ -197,12 +232,39 @@ def read_response( return raw_data[len(sent_packet) :] return raw_data finally: - if old_timeout is not None and hasattr(self.ser, "timeout"): + if old_timeout is not None: self.ser.timeout = old_timeout def parse_response( self, data: bytes, raise_on_error: bool = False ) -> Optional[dict[str, object]]: + """ + Parse a raw response packet received from a servo. + + Returns a dictionary with the following keys: + + - ``header`` (bytes): The two-byte header ``[0xFF, 0xFF]``. + - ``id`` (int): Responding servo ID. + - ``length`` (int): Packet length field. + - ``error`` (int): Error status byte (0 = no error). + - ``parameters`` (bytes): Payload bytes, empty if none. + - ``received_checksum`` (int): Checksum byte from the packet. + - ``calculated_checksum`` (int): Locally-computed checksum. + - ``checksum_valid`` (bool): Whether the checksums match. + + Args: + data (bytes): Raw bytes received from the servo. + raise_on_error (bool): If ``True``, raise :class:`ServoStatusError` when + the servo reports a non-zero error status. Default is ``False``. + + Returns: + dict | None: Parsed response, or ``None`` if ``data`` is too short to parse. + + Raises: + ChecksumError: If the packet checksum does not match. + ServoStatusError: If ``raise_on_error`` is ``True`` and the servo error byte + is non-zero. + """ self.logger.debug(f"Parsing response data: {list(data)}") if len(data) < 6: self.logger.warning("Response too short to parse.")