Fixed _sync_read.#7
Conversation
Addresses alessiodam#6: `_sync_read` receives one big package from the serial bus, so we shouldn't call `read_response` more than once. This trusts that the buffer read by `read_response` is big enough so that all returned data fits into it. It's currently at 1024 bytes, which means that reading a 2-byte word from up to 128 servos should be OK. Leaving this open for a future patch.
The servos encode signed words with a dedicated sign bit, and not in 2s-complement signed words. Also, the running speed is a signed word, this was missing.
There was a problem hiding this comment.
Pull request overview
This PR updates the Python ST3215 driver to correctly handle SYNC_READ responses that arrive as a single combined serial read, and adjusts signed word handling for speed feedback.
Changes:
- Update
ST3215._sync_readto callread_response()once and parse multiple response packets from the returned buffer. - Fix current speed decoding to be signed for both single-servo reads and sync reads.
- Change signed 16-bit encoding/decoding helpers to use a dedicated sign bit (sign-magnitude).
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
src/python_st3215/st3215.py |
Reworks _sync_read to parse multiple servo responses from a single read buffer. |
src/python_st3215/registers.py |
Reads/decodes current speed as signed for both single and sync reads. |
src/python_st3215/decorators.py |
Updates signed word encode/decode logic to sign-bit (sign-magnitude) representation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| rx = self.read_response(packet) | ||
| if rx is None: | ||
| return None | ||
| b = 0 |
There was a problem hiding this comment.
_sync_read is annotated to return a dict, but when read_response() returns None this now returns None. Callers (e.g., sync_read_current_location/speed/...) immediately call .items() on the result, which will raise at runtime and should also fail mypy. Consider always returning a dict (e.g., pre-fill {servo_id: None for servo_id in servo_ids} and return that on timeout) or update the return type and fix all call sites accordingly.
| while b+3 < len(rx) and rx[b]==0xFF and rx[b+1]==0xFF: | ||
| servo_id = rx[b+2] | ||
| paramlen = rx[b+3] | ||
| pkglen = paramlen + 4 | ||
| if paramlen != data_length + 2: | ||
| self.logger.warning( | ||
| f"Servo {servo_id}: no response received for SYNC READ" | ||
| f"Servo {servo_id}: no valid response for SYNC READ" | ||
| ) | ||
| responses[servo_id] = None | ||
| break | ||
| responses[servo_id] = self.parse_response(rx[b:b+pkglen]) | ||
| b += pkglen |
There was a problem hiding this comment.
The loop slices rx[b:b+pkglen] without verifying b + pkglen <= len(rx). If the serial read returns a truncated final packet, parse_response can raise ChecksumError (or mis-parse) and abort the whole sync-read. Add an explicit bounds check before parsing and handle incomplete packets (e.g., log and break / mark remaining requested IDs as None).
| def encode_signed_word(value: int) -> tuple[int, int]: | ||
| """ | ||
| Encode a signed 16-bit value to low and high bytes. | ||
| Encoding uses most-significant bit as sign bit. | ||
|
|
||
| Args: | ||
| value: Signed integer (-32768 to 32767) | ||
| value: Signed integer (-32767 to 32767) | ||
|
|
||
| Returns: | ||
| Tuple of (low_byte, high_byte) | ||
| """ | ||
| low, high = abs(value) & 0xFF, (abs(value) >> 8) & 0x7F | ||
| if value < 0: | ||
| raw = 65536 + value | ||
| else: | ||
| raw = value | ||
| low = raw & 0xFF | ||
| high = (raw >> 8) & 0xFF | ||
| high |= 0x80 | ||
| return low, high |
There was a problem hiding this comment.
encode_signed_word will silently produce an incorrect encoding for out-of-range values (e.g., -32768 becomes 0x8000, which decodes back to 0 with the new sign-bit scheme). Since this helper is callable outside the register methods that have @validate_value_range, it should defensively validate the input range (±32767) and raise a clear ValueError if exceeded.
| if paramlen != data_length + 2: | ||
| self.logger.warning( | ||
| f"Servo {servo_id}: no response received for SYNC READ" | ||
| f"Servo {servo_id}: no valid response for SYNC READ" | ||
| ) | ||
| responses[servo_id] = None | ||
| break | ||
| responses[servo_id] = self.parse_response(rx[b:b+pkglen]) |
There was a problem hiding this comment.
On paramlen mismatch you break, which stops parsing the remainder of rx and will drop any subsequent valid servo responses that were already received. If one servo returns a malformed/short packet, it would be better to record that servo as None and continue scanning/parsing the rest (or at least mark remaining requested IDs as None before returning).
|
I've tested your changes and they seem to be working :) Just waiting for the Copilot review to see what kind of things it might hallucinate xD |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if paramlen != data_length + 2: | ||
| self.logger.warning( | ||
| f"Servo {servo_id}: no response received for SYNC READ" | ||
| f"Servo {servo_id}: no valid response for SYNC READ" | ||
| ) | ||
| responses[servo_id] = None | ||
| break |
There was a problem hiding this comment.
In _sync_read, when a packet’s paramlen doesn’t match data_length + 2, the code breaks out of the parsing loop. This will drop any subsequent valid response packets that may already be present in rx (e.g., if one servo returns a malformed packet or there’s a stray/unknown packet in the stream). Consider marking that servo as None and then attempting to re-sync by scanning forward for the next 0xFF 0xFF header (or at least continue parsing remaining bytes when safe) instead of breaking immediately.
| break | ||
| if b+pkglen <= len(rx): | ||
| responses[servo_id] = self.parse_response(rx[b:b+pkglen]) | ||
| b += pkglen |
There was a problem hiding this comment.
If b + pkglen > len(rx) (truncated/partial packet at the end of the read), the code currently increments b and exits the loop without logging or explicitly handling the incomplete packet. This can make partial reads hard to diagnose and may silently return None for that servo. Consider adding an else branch to warn about a truncated packet and break (or attempt a re-read/resync), so callers get clearer diagnostics.
| b += pkglen | |
| b += pkglen | |
| else: | |
| self.logger.warning( | |
| "Truncated SYNC READ response for servo %s: expected %d bytes, " | |
| "but only %d bytes remain; stopping parse", | |
| servo_id, | |
| pkglen, | |
| len(rx) - b, | |
| ) | |
| break |
| def read_current_speed(self) -> Optional[int]: | ||
| """ | ||
| Read the current speed (feedback). | ||
|
|
||
| Returns: | ||
| int: Current speed in steps/s. Returns None if read fails. | ||
| """ | ||
| return read_word(self.servo, 0x3A) | ||
| return read_word(self.servo, 0x3A, signed=True) |
There was a problem hiding this comment.
read_current_speed now returns a signed value (signed=True), so the return value can be negative (direction). The docstring still describes it as “Current speed in steps/s” without mentioning sign/direction or the signed range. Please update the docstring (and the SYNC variant nearby, since it now also decodes signed values) to reflect the signed semantics so callers don’t misinterpret negative speeds.
|
Looks good to me, it works :) |
Addresses #6:
_sync_readreceives one big package from the serial bus, so we shouldn't callread_responsemore than once.This trusts that the buffer read by
read_responseis big enough so that all returned data fits into it. It's currently at 1024 bytes, which means that reading a 2-byte word from up to 128 servos should be OK. Leaving this open for a future patch.