v1.2.0#5
Conversation
Introduce multiple specific ST3215 exception classes and expose them in package exports Rename _EEPROMRegisters to EEPROMRegisters and update Servo usage Remove validate_broadcast_only decorator Harden ST3215: validate IDs, handle serial open errors, support timeout overrides, improve packet building/sending, and surface checksum/status errors Bump package version to 1.2.0
There was a problem hiding this comment.
Pull request overview
This pull request introduces version 1.2.0 of the python_st3215 library, adding comprehensive error handling, retry mechanisms with exponential backoff, and improved API documentation for servo communication.
Changes:
- Added 8 new custom exception classes for specific error scenarios (ChecksumError, InvalidIDError, ServoStatusError, etc.)
- Implemented retry logic with exponential backoff for communication operations and integrated into ping method
- Enhanced servo scanning with configurable ID ranges, per-ping timeouts, and progress callbacks
- Improved docstrings for all public methods with detailed parameter, return value, and exception documentation
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| src/python_st3215/errors.py | Added 8 new exception classes with detailed docstrings for better error handling |
| src/python_st3215/st3215.py | Added retry logic, timeout handling, enhanced scanning, improved error handling, and comprehensive docstrings |
| src/python_st3215/servo.py | Updated import to use public EEPROMRegisters class |
| src/python_st3215/registers.py | Renamed _EEPROMRegisters to EEPROMRegisters (made public) |
| src/python_st3215/decorators.py | Removed unused validate_broadcast_only decorator |
| src/python_st3215/init.py | Updated version to 1.2.0 and exported all new exception classes |
| .github/workflows/release.yml | Restricted workflow to only trigger on main branch pushes |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| error_details = None | ||
| detail_str = ", ".join(error_details) if error_details else "Unknown error" |
There was a problem hiding this comment.
The variable error_details is always set to None but then used in a conditional join operation. This will cause the condition if error_details to always be False, and ", ".join(error_details) would fail with a TypeError if error_details were truthy. Either this variable should be computed from the error_code, or this logic should be removed if error code parsing is not yet implemented.
| error_details = None | |
| detail_str = ", ".join(error_details) if error_details else "Unknown error" | |
| detail_str = "Unknown error" |
| Returns: | ||
| dict: Parsed response from the servo if it responds, else None. | ||
|
|
||
| Raises: | ||
| CommunicationTimeoutError: If servo doesn't respond after retries. |
There was a problem hiding this comment.
The docstring states that ping returns "dict: Parsed response from the servo if it responds, else None" but also states it raises CommunicationTimeoutError. When use_retry is True (the default), the method will raise an exception instead of returning None when retries are exhausted. The docstring should clarify that None is only returned when use_retry=False and there's no response, or CommunicationTimeoutError is raised when use_retry=True and all retries fail.
| for attempt in range(self.retry_count): | ||
| try: | ||
| result = operation() | ||
| if result is not None: | ||
| return result | ||
| if attempt < self.retry_count - 1: | ||
| delay = self.retry_delay * (2**attempt) | ||
| self.logger.debug( | ||
| f"{operation_name} attempt {attempt + 1} failed, " | ||
| f"retrying in {delay:.3f}s..." | ||
| ) | ||
| time.sleep(delay) | ||
| except (ChecksumError, serial.SerialException) as e: | ||
| last_exception = e | ||
| if attempt < self.retry_count - 1: | ||
| delay = self.retry_delay * (2**attempt) | ||
| self.logger.debug( | ||
| f"{operation_name} attempt {attempt + 1} failed with {type(e).__name__}, " | ||
| f"retrying in {delay:.3f}s..." | ||
| ) | ||
| time.sleep(delay) | ||
| else: | ||
| self.logger.error( | ||
| f"{operation_name} failed after {self.retry_count} attempts" | ||
| ) | ||
|
|
||
| if last_exception: | ||
| raise CommunicationTimeoutError( | ||
| f"{operation_name} failed after {self.retry_count} retries" | ||
| ) from last_exception | ||
|
|
||
| return None |
There was a problem hiding this comment.
In the retry logic, when an operation returns None (lines 274-283), the code logs that the attempt "failed" and retries. However, after all retry attempts are exhausted and the operation consistently returns None without raising an exception, the method returns None at line 303 instead of raising a CommunicationTimeoutError. This inconsistency means that if an operation returns None (e.g., no response), it won't raise the documented CommunicationTimeoutError. The method should either raise an exception when None is returned after all retries, or the documentation should be updated to reflect that None can be returned.
Addresses #6: `_sync_read` receives one big package from the serial bus, so we shouldn't call `read_response` more than once. This trusts that the buffer read by `read_response` is big enough so that all returned data fits into it. It's currently at 1024 bytes, which means that reading a 2-byte word from up to 128 servos should be OK. Leaving this open for a future patch.
The servos encode signed words with a dedicated sign bit, and not in 2s-complement signed words. Also, the running speed is a signed word, this was missing.
Fixed _sync_read.
…perationError, add _SerialLike Protocol, and resolve all mypy strict errors Signed-off-by: Alessio D'Ambrosio <alessio@alessiodam.dev>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 14 out of 14 changed files in this pull request and generated 6 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| raise CommunicationTimeoutError( | ||
| f"{operation_name} failed after {self.retry_count} retries" | ||
| ) from last_exception | ||
|
|
||
| return None |
There was a problem hiding this comment.
_retry_operation only raises CommunicationTimeoutError when an exception was captured; if every attempt simply returns None (e.g., repeated read timeouts with no bytes), last_exception stays None and the method returns None, contradicting ping()'s docstring and making failures silent. Consider treating a None result after exhausting retries as a failure (raise CommunicationTimeoutError) or returning a typed Result that distinguishes "no response" from "success".
| self.logger.warning( | ||
| f"Servo {servo_id}: no response received for SYNC READ" | ||
| f"Servo {servo_id}: no valid response for SYNC READ" | ||
| ) | ||
| responses[servo_id] = None | ||
| break |
There was a problem hiding this comment.
In _sync_read, a single packet with an unexpected LENGTH (paramlen) causes a break, which stops parsing and can drop valid responses from other servos contained later in the buffer. More robust behavior would be to advance/search for the next 0xFF 0xFF header (or continue parsing) instead of breaking the whole SYNC_READ response processing.
| Tuple of (low_byte, high_byte) | ||
| """ | ||
| if value < -32767 or value > 32767: | ||
| raise ValueError |
There was a problem hiding this comment.
encode_signed_word raises a bare ValueError with no message when out of range. Since the library now defines InvalidParameterError, consider raising that (or at least providing a descriptive ValueError message) so callers can diagnose invalid ranges more easily.
| raise ValueError | |
| from .errors import InvalidParameterError | |
| raise InvalidParameterError( | |
| f"encode_signed_word: value {value} out of range [-32767, 32767]" | |
| ) |
| Supports either a direct serial device (e.g. /dev/ttyUSB0) or a network | ||
| socket URL (e.g. socket://<IP>:<PORT>). | ||
|
|
||
| Env vars: | ||
| - ST3215_URL : full pyserial URL (takes precedence) | ||
| - ST3215_HOST : IP/hostname for socket connection (default: 100.122.96.71) | ||
| - ST3215_PORT : TCP port for socket connection (default: 2000) | ||
| - ST3215_DEV : fallback serial device (default: /dev/ttyUSB0) |
There was a problem hiding this comment.
This docstring documents ST3215_HOST default as "100.122.96.71" and mentions an ST3215_DEV serial fallback, but the script defaults host to "st3215-host" and never uses ST3215_DEV (it always builds a socket:// URL when host/port defaults are present). Align the documented defaults with the code and either implement the ST3215_DEV fallback or remove it from the docstring.
| Supports either a direct serial device (e.g. /dev/ttyUSB0) or a network | |
| socket URL (e.g. socket://<IP>:<PORT>). | |
| Env vars: | |
| - ST3215_URL : full pyserial URL (takes precedence) | |
| - ST3215_HOST : IP/hostname for socket connection (default: 100.122.96.71) | |
| - ST3215_PORT : TCP port for socket connection (default: 2000) | |
| - ST3215_DEV : fallback serial device (default: /dev/ttyUSB0) | |
| Connect using either a full pyserial URL or a network socket URL | |
| constructed from the host and port settings. | |
| Env vars: | |
| - ST3215_URL : full pyserial URL (takes precedence) | |
| - ST3215_HOST : IP/hostname for socket connection (default: st3215-host) | |
| - ST3215_PORT : TCP port for socket connection (default: 2000) |
| data: list[int] | bytes = response["parameters"] | ||
| if isinstance(data, (bytes, bytearray)): | ||
| raw = data[0] | (data[1] << 8) | ||
| else: | ||
| raw = data[0] | (data[1] << 8) |
There was a problem hiding this comment.
sync_read_current_speed contains a bytes-vs-list branch where both branches compute raw the same way; parse_response currently always sets "parameters" to a bytes object, so this conditional is redundant and can be simplified to reduce complexity and avoid misleading typing.
| old_retry_count = self.retry_count | ||
| self.retry_count = 1 | ||
|
|
||
| try: | ||
| for i, servo_id in enumerate(range(start_id, end_id + 1)): | ||
| if progress_callback: | ||
| progress_callback(i + 1, total) | ||
| try: | ||
| packet = self.send_instruction(servo_id, Instruction.PING) | ||
| response = self.read_response(packet, timeout=timeout) | ||
| if response: | ||
| parsed = self.parse_response(response) | ||
| if parsed and parsed.get("error") == 0: | ||
| found.append(servo_id) | ||
| self.logger.info(f"Found servo at ID {servo_id}") | ||
| except ( | ||
| serial.SerialException, | ||
| ChecksumError, | ||
| CommunicationTimeoutError, | ||
| ): | ||
| continue | ||
| finally: | ||
| self.retry_count = old_retry_count |
There was a problem hiding this comment.
list_servos temporarily sets self.retry_count = 1 but the method doesn't call _retry_operation / ping(use_retry=True), so this mutation currently has no effect other than briefly changing controller state. Consider removing it, or route the per-ID ping through ping(use_retry=...) so the retry settings are actually applied as intended.
| old_retry_count = self.retry_count | |
| self.retry_count = 1 | |
| try: | |
| for i, servo_id in enumerate(range(start_id, end_id + 1)): | |
| if progress_callback: | |
| progress_callback(i + 1, total) | |
| try: | |
| packet = self.send_instruction(servo_id, Instruction.PING) | |
| response = self.read_response(packet, timeout=timeout) | |
| if response: | |
| parsed = self.parse_response(response) | |
| if parsed and parsed.get("error") == 0: | |
| found.append(servo_id) | |
| self.logger.info(f"Found servo at ID {servo_id}") | |
| except ( | |
| serial.SerialException, | |
| ChecksumError, | |
| CommunicationTimeoutError, | |
| ): | |
| continue | |
| finally: | |
| self.retry_count = old_retry_count | |
| for i, servo_id in enumerate(range(start_id, end_id + 1)): | |
| if progress_callback: | |
| progress_callback(i + 1, total) | |
| try: | |
| packet = self.send_instruction(servo_id, Instruction.PING) | |
| response = self.read_response(packet, timeout=timeout) | |
| if response: | |
| parsed = self.parse_response(response) | |
| if parsed and parsed.get("error") == 0: | |
| found.append(servo_id) | |
| self.logger.info(f"Found servo at ID {servo_id}") | |
| except ( | |
| serial.SerialException, | |
| ChecksumError, | |
| CommunicationTimeoutError, | |
| ): | |
| continue |
This pull request introduces several improvements and refactorings to the ST3215 Python library, focusing on error handling, broadcast operation validation, and signed value encoding/decoding. It also adds a new example for network socket connections and cleans up the example scripts.
Highlights:
18_network_port.py) for connecting via network socket.Error Handling Improvements
errors.py(e.g.,ChecksumError,InvalidParameterError,ServoAngleLimitError,CommunicationTimeoutError,ServoLockedError,BroadcastOperationError,InvalidIDError,ServoStatusError) with descriptive docstrings and constructors for improved error reporting and handling.__init__.pyto export all new exceptions and incremented the version to1.2.0.Broadcast Operation Validation
BroadcastOperationErrorinstead of the genericST3215Errorwhen an operation is attempted on the broadcast ID (254) where it is not allowed. This change improves clarity and consistency in error handling. [1] [2] [3] [4] [5] [6] [7] [8] [9]Signed Value Encoding/Decoding
encode_signed_wordanddecode_signed_wordto use a dedicated sign bit, ensuring accurate conversion for the servo protocol. Updated relevant usages to match the new logic. [1] [2] [3] [4]New and Improved Examples
examples/18_network_port.py, allowing scanning and information display for servos over a network socket, with flexible environment variable configuration.09_registered_write.pyto support any number of servos and alternate movement targets for more flexible demonstrations.Miscellaneous
.github/workflows/release.ymlto only build and publish on pushes to themainbranch._EEPROMRegisterstoEEPROMRegisters.These changes improve the robustness, maintainability, and usability of the library, especially in error handling and broadcast operations.