diff --git a/.github/workflows/compile.yml b/.github/workflows/compile.yml new file mode 100644 index 0000000..b4699ba --- /dev/null +++ b/.github/workflows/compile.yml @@ -0,0 +1,33 @@ +name: Test Compilation + +on: + push: + paths: + - "src/**/*.py" + - "pyproject.toml" + - "requirements.txt" + - ".github/workflows/compile.yml" + pull_request: + paths: + - "src/**/*.py" + - "pyproject.toml" + - "requirements.txt" + - ".github/workflows/compile.yml" + +jobs: + compile: + name: Library Compilation + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.x" + - name: Install build tools + run: | + pip install --upgrade pip + pip install -r requirements.txt + pip install build + - name: Attempt building wheel and sdist + run: python3 -m build --sdist --wheel diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4efd3c9..e8a1173 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,4 +1,4 @@ -name: Publish Python 🐍 distribution 📦 to PyPI and TestPyPI +name: Build and Publish to PyPi on: push diff --git a/.github/workflows/type-check.yml b/.github/workflows/type-check.yml new file mode 100644 index 0000000..c21a384 --- /dev/null +++ b/.github/workflows/type-check.yml @@ -0,0 +1,34 @@ +name: Type Check + +on: + push: + paths: + - "src/**/*.py" + - "pyproject.toml" + - "requirements.txt" + - ".github/workflows/type-check.yml" + pull_request: + paths: + - "src/**/*.py" + - "pyproject.toml" + - "requirements.txt" + - ".github/workflows/type-check.yml" + +jobs: + type-check: + name: Type Checking with mypy + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.x" + - name: Install dependencies + run: | + pip install --upgrade pip + pip install mypy + pip install -r requirements.txt + pip install . + - name: Run mypy + run: mypy src/ diff --git a/MEMORY_TABLE.md b/MEMORY_TABLE.md index 53024ff..c8b9abb 100644 --- a/MEMORY_TABLE.md +++ b/MEMORY_TABLE.md @@ -2,63 +2,63 @@ > This is based on the [STS3215_Memory_Table_EN.xlsx](documents/STS3215_Memory_Table_EN.xlsx) spreadsheet. -| 0x | DEC | HEX | Parameter | Size | Default | Type | Access | Min | Max | Unit | Description | +| 0x | Dec | Hex | Parameter | Size | Default | Type | Access | Min | Max | Unit | Description | |----|-----|------|-----------------------------------------------------|------|---------|--------|---------------------|--------|-------|--------------|-----------------------------------------------------------------------------------------------------| -| 0 | 0 | 0 | Firmware major version number | 1 | 1 | EEPROM | Read-only | -- | -- | -- | -- | -| 1 | 1 | 1 | Firmware minor version number | 1 | 1 | EEPROM | Read-only | -- | -- | -- | -- | -| 2 | 2 | 2 | END | 1 | 0 | EEPROM | Read-only | -- | -- | -- | -- | -| 3 | 3 | 3 | Servo main version number | 1 | | EEPROM | Read-only | -- | -- | -- | -- | -| 4 | 4 | 4 | Servo version number | 1 | | EEPROM | Read-only | -- | -- | -- | -- | -| 5 | 5 | 0x5 | ID | 1 | 1 | EEPROM | Reading and writing | 0 | 253 | Number | Unique identifier on the bus; duplicate IDs are not allowed. 254 (0xFE) is broadcast ID. | -| 6 | 6 | 0x6 | Baud rate | 1 | 0 | EEPROM | Reading and writing | 0 | 7 | none | 0: 1,000,000; 1: 500,000; 2: 250,000; 3: 128,000; 4: 115,200; 5: 76,800; 6: 57,600; 7: 38,400 baud. | -| 7 | 7 | 0x7 | Return delay | 1 | 250 | EEPROM | Reading and writing | 0 | 254 | 2µs | Minimum unit is 2µs; max configurable 254*2 = 508µs. | -| 8 | 8 | 0x8 | Phase 2 (Response Status Level) | 1 | 1 | EEPROM | Reading and writing | 0 | 1 | none | 0: No response for commands other than read/PING. 1: Response returned for all commands. | -| 9 | 9 | 0x9 | Minimum angle limit | 2 | 0 | EEPROM | Reading and writing | 0 | 4094 | step | Sets minimum motion travel limit; must be < max angle limit. | -| 11 | B | 0xB | Maximum angle limit | 2 | 4095 | EEPROM | Reading and writing | 1 | 4095 | step | Sets maximum motion travel limit; must be > min angle limit. | -| 13 | D | 0xD | Maximum temperature limit | 1 | 70 | EEPROM | Reading and writing | 0 | 100 | °C | Maximum operating temperature limit; setting accuracy 1°C. | -| 14 | E | 0xE | Maximum input voltage | 1 | | EEPROM | Reading and writing | 0 | 254 | 0.1V | Maximum voltage = value*0.1V. | -| 15 | F | 0xF | Minimum input voltage | 1 | | EEPROM | Reading and writing | 0 | 254 | 0.1V | Minimum voltage = value*0.1V. | -| 16 | 10 | 0x10 | Maximum torque | 2 | 1000 | EEPROM | Reading and writing | 0 | 1000 | 0.1% | Maximum output torque limit, assigned on power-up. | -| 18 | 12 | 0x12 | Phase | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Special function bytes cannot be modified unless required. | -| 19 | 13 | 0x13 | Uninstallation conditions | 1 | 44 | EEPROM | Reading and writing | 0 | 254 | none | Bit 1 enables/disables corresponding protection. | -| 20 | 14 | 0x14 | LED alarm conditions | 1 | 47 | EEPROM | Reading and writing | 0 | 254 | none | Bit 1 enables/disables flashing alarm. | -| 21 | 15 | 0x15 | Position ring P proportional coefficient | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Proportional coefficient for motor control. | -| 22 | 16 | 0x16 | Differential coefficients of position loop D | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Differential coefficient of motor control. | -| 23 | 17 | 0x17 | Integral coefficients of position loop I | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Integral coefficient of motor control. | -| 24 | 18 | 0x18 | Minimum starting force | 1 | | EEPROM | Reading and writing | 0 | 254 | 0.1% | Minimum output starting torque; 10 = 1% stall torque. | -| 25 | 19 | 0x19 | Points limit | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Max score = score limit * 4. 0 disables limit. | -| 26 | 1A | 0x1A | Clockwise insensitive area | 1 | 1 | EEPROM | Reading and writing | 0 | 32 | step | Smallest unit is minimum resolution angle. | -| 27 | 1B | 0x1B | Counterclockwise insensitive area | 1 | 1 | EEPROM | Reading and writing | 0 | 32 | step | Smallest unit is minimum resolution angle. | -| 28 | 1C | 0x1C | Protective current | 2 | | EEPROM | Reading and writing | 0 | 511 | 6.5mA | Max settable current = 500*6.5mA = 3250mA. | -| 30 | 1E | 0x1E | Angular resolution | 1 | 1 | EEPROM | Reading and writing | 1 | 3 | none | Modifies magnification factor for sensor min resolution angle. | -| 31 | 1F | 0x1F | Position correction | 2 | 0 | EEPROM | Reading and writing | -2047 | 2047 | step | BIT11 = direction; other bits = 0-2047 steps. | -| 33 | 21 | 0x21 | Operating mode | 1 | 0 | EEPROM | Reading and writing | 0 | 3 | none | 0: Position mode; 1: Constant speed; 2: PWM open-loop; 3: Stepper servo mode. | -| 34 | 22 | 0x22 | Protective torque | 1 | 20 | EEPROM | Reading and writing | 0 | 100 | 1.0% | Output torque after overload protection. | -| 35 | 23 | 0x23 | Protection time | 1 | 200 | EEPROM | Reading and writing | 0 | 254 | 10ms | Duration exceeding overload torque before reset. | -| 36 | 24 | 0x24 | Overload torque | 1 | 80 | EEPROM | Reading and writing | 0 | 100 | 1.0% | Max torque threshold for overload protection. | -| 37 | 25 | 0x25 | Speed closed-loop P proportional coefficient | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Speed loop proportional coefficient in constant speed mode. | -| 38 | 26 | 0x26 | Overcurrent protection time | 1 | 200 | EEPROM | Reading and writing | 0 | 254 | 10ms | Max configurable = 254*10ms = 2540ms. | -| 39 | 27 | 0x27 | Integral coefficient of closed-loop velocity system | 1 | | EEPROM | Reading and writing | 0 | 254 | 1/10 | Integral coefficient of speed loop, reduced by factor 10 vs version 3.6. | -| 40 | 28 | 0x28 | Torque switch | 1 | 0 | SRAM | Reading and writing | 0 | 128 | none | 0: Off torque; 1: On torque; 128: Correct current position to 2048. | -| 41 | 29 | 0x29 | Acceleration | 1 | 0 | SRAM | Reading and writing | 0 | 254 | 100 steps/s² | 10 → 1000 steps/s² acceleration/deceleration. | -| 42 | 2A | 0x2A | Target location | 2 | 0 | SRAM | Reading and writing | -32766 | 32766 | step | Absolute position control; each step = min resolvable angle. | -| 44 | 2C | 0x2C | Runtime | 2 | 0 | SRAM | Reading and writing | 0 | 1000 | 0.10% | PWM open-loop speed control, BIT10 = direction. | -| 46 | 2E | 0x2E | Running speed | 2 | 0 | SRAM | Reading and writing | -32766 | 32766 | Steps/s | Number of steps per second; 50 steps/s ≈ 0.732 RPM. | -| 48 | 30 | 0x30 | Torque Limit | 2 | 1000 | SRAM | Reading and writing | 0 | 1000 | 1.0% | Initial power-on value assigned by maximum torque (0x10). | -| 49 | 31 | 0x31 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | -| 50 | 32 | 0x32 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | -| 51 | 33 | 0x33 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | -| 52 | 34 | 0x34 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | -| 53 | 35 | 0x35 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | -| 54 | 36 | 0x36 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | -| 55 | 37 | 0x37 | Lock symbol | 1 | 0 | SRAM | Reading and writing | 0 | 1 | none | 0: disable write lock; 1: enable write lock; affects EEPROM saving. | -| 56 | 38 | 0x38 | Current location | 2 | 0 | SRAM | Read-only | -- | -- | step | Feedback on current position step count; absolute control. | -| 58 | 3A | 0x3A | Current speed | 2 | 0 | SRAM | Read-only | -- | -- | Steps/s | Feedback on motor speed in steps per second. | -| 60 | 3C | 0x3C | Current load | 2 | 0 | SRAM | Read-only | -- | -- | 0.1% | Motor output drive duty cycle. | -| 62 | 3E | 0x3E | Current voltage | 1 | 0 | SRAM | Read-only | -- | -- | 0.1V | Current operating voltage. | -| 63 | 3F | 0x3F | Current temperature | 1 | 0 | SRAM | Read-only | -- | -- | °C | Internal operating temperature. | -| 64 | 40 | 0x40 | Asynchronous write flag | 1 | 0 | SRAM | Read-only | -- | -- | none | Flag for asynchronous write instructions. | -| 65 | 41 | 0x41 | Servo status | 1 | 0 | SRAM | Read-only | -- | -- | none | Bit = 1 indicates error; 0 = no error. | -| 66 | 42 | 0x42 | Mobile sign | 1 | 0 | SRAM | Read-only | -- | -- | none | 1: servo in motion; 0: stationary. | -| 67 | 43 | 0x43 | Undefined | 2 | 0 | SRAM | Read-only | -- | -- | -- | -- | -| 69 | 45 | 0x45 | Current current | 2 | 0 | SRAM | Read-only | -- | -- | 6.5mA | Max measurable current = 500*6.5mA = 3250mA. | +| 0 | 0 | 0x00 | Firmware major version number | 1 | 1 | EEPROM | Read-only | -- | -- | -- | -- | +| 1 | 1 | 0x01 | Firmware minor version number | 1 | 1 | EEPROM | Read-only | -- | -- | -- | -- | +| 2 | 2 | 0x02 | END | 1 | 0 | EEPROM | Read-only | -- | -- | -- | -- | +| 3 | 3 | 0x03 | Servo main version number | 1 | | EEPROM | Read-only | -- | -- | -- | -- | +| 4 | 4 | 0x04 | Servo version number | 1 | | EEPROM | Read-only | -- | -- | -- | -- | +| 5 | 5 | 0x05 | ID | 1 | 1 | EEPROM | Reading and writing | 0 | 253 | Number | Unique identifier on the bus; duplicate IDs are not allowed. 254 (0xFE) is broadcast ID. | +| 6 | 6 | 0x06 | Baud rate | 1 | 0 | EEPROM | Reading and writing | 0 | 7 | none | 0: 1,000,000; 1: 500,000; 2: 250,000; 3: 128,000; 4: 115,200; 5: 76,800; 6: 57,600; 7: 38,400 baud. | +| 7 | 7 | 0x07 | Return delay | 1 | 250 | EEPROM | Reading and writing | 0 | 254 | 2µs | Minimum unit is 2µs; max configurable 254*2 = 508µs. | +| 8 | 8 | 0x08 | Phase 2 (Response Status Level) | 1 | 1 | EEPROM | Reading and writing | 0 | 1 | none | 0: No response for commands other than read/PING. 1: Response returned for all commands. | +| 9 | 9 | 0x09 | Minimum angle limit | 2 | 0 | EEPROM | Reading and writing | 0 | 4094 | step | Sets minimum motion travel limit; must be < max angle limit. | +| 11 | 11 | 0x0B | Maximum angle limit | 2 | 4095 | EEPROM | Reading and writing | 1 | 4095 | step | Sets maximum motion travel limit; must be > min angle limit. | +| 13 | 13 | 0x0D | Maximum temperature limit | 1 | 70 | EEPROM | Reading and writing | 0 | 100 | °C | Maximum operating temperature limit; setting accuracy 1°C. | +| 14 | 14 | 0x0E | Maximum input voltage | 1 | | EEPROM | Reading and writing | 0 | 254 | 0.1V | Maximum voltage = value*0.1V. | +| 15 | 15 | 0x0F | Minimum input voltage | 1 | | EEPROM | Reading and writing | 0 | 254 | 0.1V | Minimum voltage = value*0.1V. | +| 16 | 16 | 0x10 | Maximum torque | 2 | 1000 | EEPROM | Reading and writing | 0 | 1000 | 0.1% | Maximum output torque limit, assigned on power-up. | +| 18 | 18 | 0x12 | Phase | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Special function bytes cannot be modified unless required. | +| 19 | 19 | 0x13 | Uninstallation conditions | 1 | 44 | EEPROM | Reading and writing | 0 | 254 | none | Bit 1 enables/disables corresponding protection. | +| 20 | 20 | 0x14 | LED alarm conditions | 1 | 47 | EEPROM | Reading and writing | 0 | 254 | none | Bit 1 enables/disables flashing alarm. | +| 21 | 21 | 0x15 | Position ring P proportional coefficient | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Proportional coefficient for motor control. | +| 22 | 22 | 0x16 | Differential coefficients of position loop D | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Differential coefficient of motor control. | +| 23 | 23 | 0x17 | Integral coefficients of position loop I | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Integral coefficient of motor control. | +| 24 | 24 | 0x18 | Minimum starting force | 1 | | EEPROM | Reading and writing | 0 | 254 | 0.1% | Minimum output starting torque; 10 = 1% stall torque. | +| 25 | 25 | 0x19 | Points limit | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Max score = score limit * 4. 0 disables limit. | +| 26 | 26 | 0x1A | Clockwise insensitive area | 1 | 1 | EEPROM | Reading and writing | 0 | 32 | step | Smallest unit is minimum resolution angle. | +| 27 | 27 | 0x1B | Counterclockwise insensitive area | 1 | 1 | EEPROM | Reading and writing | 0 | 32 | step | Smallest unit is minimum resolution angle. | +| 28 | 28 | 0x1C | Protective current | 2 | | EEPROM | Reading and writing | 0 | 511 | 6.5mA | Max settable current = 500*6.5mA = 3250mA. | +| 30 | 30 | 0x1E | Angular resolution | 1 | 1 | EEPROM | Reading and writing | 1 | 3 | none | Modifies magnification factor for sensor min resolution angle. | +| 31 | 31 | 0x1F | Position correction | 2 | 0 | EEPROM | Reading and writing | -2047 | 2047 | step | BIT11 = direction; other bits = 0-2047 steps. | +| 33 | 33 | 0x21 | Operating mode | 1 | 0 | EEPROM | Reading and writing | 0 | 3 | none | 0: Position mode; 1: Constant speed; 2: PWM open-loop; 3: Stepper servo mode. | +| 34 | 34 | 0x22 | Protective torque | 1 | 20 | EEPROM | Reading and writing | 0 | 100 | 1.0% | Output torque after overload protection. | +| 35 | 35 | 0x23 | Protection time | 1 | 200 | EEPROM | Reading and writing | 0 | 254 | 10ms | Duration exceeding overload torque before reset. | +| 36 | 36 | 0x24 | Overload torque | 1 | 80 | EEPROM | Reading and writing | 0 | 100 | 1.0% | Max torque threshold for overload protection. | +| 37 | 37 | 0x25 | Speed closed-loop P proportional coefficient | 1 | | EEPROM | Reading and writing | 0 | 254 | none | Speed loop proportional coefficient in constant speed mode. | +| 38 | 38 | 0x26 | Overcurrent protection time | 1 | 200 | EEPROM | Reading and writing | 0 | 254 | 10ms | Max configurable = 254*10ms = 2540ms. | +| 39 | 39 | 0x27 | Integral coefficient of closed-loop velocity system | 1 | | EEPROM | Reading and writing | 0 | 254 | 1/10 | Integral coefficient of speed loop, reduced by factor 10 vs version 3.6. | +| 40 | 40 | 0x28 | Torque switch | 1 | 0 | SRAM | Reading and writing | 0 | 128 | none | 0: Off torque; 1: On torque; 128: Correct current position to 2048. | +| 41 | 41 | 0x29 | Acceleration | 1 | 0 | SRAM | Reading and writing | 0 | 254 | 100 steps/s² | 10 → 1000 steps/s² acceleration/deceleration. | +| 42 | 42 | 0x2A | Target location | 2 | 0 | SRAM | Reading and writing | -32766 | 32766 | step | Absolute position control; each step = min resolvable angle. | +| 44 | 44 | 0x2C | Runtime | 2 | 0 | SRAM | Reading and writing | 0 | 1000 | 0.10% | PWM open-loop speed control, BIT10 = direction. | +| 46 | 46 | 0x2E | Running speed | 2 | 0 | SRAM | Reading and writing | -32766 | 32766 | Steps/s | Number of steps per second; 50 steps/s ≈ 0.732 RPM. | +| 48 | 48 | 0x30 | Torque Limit | 2 | 1000 | SRAM | Reading and writing | 0 | 1000 | 1.0% | Initial power-on value assigned by maximum torque (0x10). | +| 49 | 49 | 0x31 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | +| 50 | 50 | 0x32 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | +| 51 | 51 | 0x33 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | +| 52 | 52 | 0x34 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | +| 53 | 53 | 0x35 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | +| 54 | 54 | 0x36 | Undefined | 1 | | SRAM | none | -- | -- | -- | -- | +| 55 | 55 | 0x37 | Lock symbol | 1 | 0 | SRAM | Reading and writing | 0 | 1 | none | 0: disable write lock; 1: enable write lock; affects EEPROM saving. | +| 56 | 56 | 0x38 | Current location | 2 | 0 | SRAM | Read-only | -- | -- | step | Feedback on current position step count; absolute control. | +| 58 | 58 | 0x3A | Current speed | 2 | 0 | SRAM | Read-only | -- | -- | Steps/s | Feedback on motor speed in steps per second. | +| 60 | 60 | 0x3C | Current load | 2 | 0 | SRAM | Read-only | -- | -- | 0.1% | Motor output drive duty cycle. | +| 62 | 62 | 0x3E | Current voltage | 1 | 0 | SRAM | Read-only | -- | -- | 0.1V | Current operating voltage. | +| 63 | 63 | 0x3F | Current temperature | 1 | 0 | SRAM | Read-only | -- | -- | °C | Internal operating temperature. | +| 64 | 64 | 0x40 | Asynchronous write flag | 1 | 0 | SRAM | Read-only | -- | -- | none | Flag for asynchronous write instructions. | +| 65 | 65 | 0x41 | Servo status | 1 | 0 | SRAM | Read-only | -- | -- | none | Bit = 1 indicates error; 0 = no error. | +| 66 | 66 | 0x42 | Mobile sign | 1 | 0 | SRAM | Read-only | -- | -- | none | 1: servo in motion; 0: stationary. | +| 67 | 67 | 0x43 | Undefined | 2 | 0 | SRAM | Read-only | -- | -- | -- | -- | +| 69 | 69 | 0x45 | Current current | 2 | 0 | SRAM | Read-only | -- | -- | 6.5mA | Max measurable current = 500*6.5mA = 3250mA. | \ No newline at end of file diff --git a/examples/14_change_id.py b/examples/14_change_id.py index 5727693..5f43ae8 100644 --- a/examples/14_change_id.py +++ b/examples/14_change_id.py @@ -5,8 +5,6 @@ import os from python_st3215 import ST3215 -OLD_ID = 1 -NEW_ID = 5 controller = ST3215(os.environ.get("ST3215_PORT", "/dev/ttyUSB0")) @@ -15,7 +13,21 @@ print("CHANGE SERVO ID") print("=" * 60) print("\nWARNING: Only connect ONE servo to avoid ID conflicts!") - print(f"This will change servo ID from {OLD_ID} to {NEW_ID}") + + try: + old_id_input = input("\nEnter CURRENT Servo ID (default 1): ").strip() + OLD_ID = int(old_id_input) if old_id_input else 1 + + new_id_input = input("Enter NEW Servo ID: ").strip() + if not new_id_input: + print("New ID is required. Exiting.") + exit(1) + NEW_ID = int(new_id_input) + except ValueError: + print("Invalid input. Please enter numeric IDs.") + exit(1) + + print(f"\nThis will change servo ID from {OLD_ID} to {NEW_ID}") response = input("\nType 'yes' to continue: ") if response.lower() != "yes": @@ -26,8 +38,17 @@ print(f"\nCurrent ID: {servo.eeprom.read_id()}") print(f"Changing to ID {NEW_ID}...") + # Unlock EEPROM to allow saving changes + print("Unlocking EEPROM...") + servo.sram.unlock() + + print("Writing new ID...") servo.eeprom.write_id(NEW_ID) + # Lock EEPROM again (optional but recommended) + print("Locking EEPROM...") + servo.sram.lock() + print("\nVerifying change...") new_servo = controller.wrap_servo(NEW_ID) confirmed_id = new_servo.eeprom.read_id() diff --git a/pyproject.toml b/pyproject.toml index c76398f..275967d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "python-st3215" -version = "1.0.0" +dynamic = ["version"] authors = [ { name="Alessio D'Ambrosio", email="alessio@alessiodam.dev" }, ] @@ -26,6 +26,8 @@ classifiers = [ "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: 3.14", "Operating System :: OS Independent", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)" ] @@ -39,3 +41,12 @@ Issues = "https://github.com/alessiodam/python-st3215/issues" [build-system] requires = ["hatchling >= 1.27"] build-backend = "hatchling.build" + +[tool.hatch.version] +path = "src/python_st3215/__init__.py" + +[tool.mypy] +python_version = "3.13" +packages = ["python_st3215"] +exclude = ["examples"] +strict = true diff --git a/requirements.txt b/requirements.txt index 7ad05ef..e9bc495 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ pyserial>=3.5 +types-pyserial>=3.5 diff --git a/src/python_st3215/__init__.py b/src/python_st3215/__init__.py index 021a19d..2b7349c 100644 --- a/src/python_st3215/__init__.py +++ b/src/python_st3215/__init__.py @@ -2,7 +2,7 @@ from .errors import ST3215Error, ServoNotRespondingError, InvalidInstructionError from .servo import Servo -__version__ = "1.0.0" +__version__ = "1.1.0" __all__ = [ "ST3215", diff --git a/src/python_st3215/decorators.py b/src/python_st3215/decorators.py index 1c05751..b95e9d7 100644 --- a/src/python_st3215/decorators.py +++ b/src/python_st3215/decorators.py @@ -1,14 +1,16 @@ from functools import wraps -from typing import Callable +from typing import Any, Callable, TypeVar +F = TypeVar("F", bound=Callable[..., Any]) -def validate_servo_id(func: Callable) -> Callable: + +def validate_servo_id(func: Callable[..., Any]) -> Callable[..., Any]: """ Decorator to validate servo ID is not broadcast ID (254). """ @wraps(func) - def wrapper(self, servo_id: int, *args, **kwargs): + def wrapper(self: Any, servo_id: int, *args: Any, **kwargs: Any) -> Any: if servo_id == 254: from .errors import ST3215Error @@ -18,13 +20,13 @@ def wrapper(self, servo_id: int, *args, **kwargs): return wrapper -def validate_broadcast_only(func: Callable) -> Callable: +def validate_broadcast_only(func: Callable[..., Any]) -> Callable[..., Any]: """ Decorator to ensure operation is only used with broadcast servo (ID 254). """ @wraps(func) - def wrapper(self, *args, **kwargs): + def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: if self.servo.id != 254: from .errors import ST3215Error @@ -36,14 +38,16 @@ def wrapper(self, *args, **kwargs): return wrapper -def validate_value_range(min_val: int, max_val: int) -> Callable: +def validate_value_range( + min_val: int, max_val: int +) -> Callable[[Callable[..., Any]], Callable[..., Any]]: """ Decorator to validate that a value parameter is within the specified range. """ - def decorator(func: Callable) -> Callable: + def decorator(func: Callable[..., Any]) -> Callable[..., Any]: @wraps(func) - def wrapper(self, value: int, *args, **kwargs): + def wrapper(self: Any, value: int, *args: Any, **kwargs: Any) -> Any: if not (min_val <= value <= max_val): raise ValueError( f"{func.__name__}: value {value} out of range [{min_val}, {max_val}]" @@ -102,151 +106,3 @@ def encode_unsigned_word(value: int) -> tuple[int, int]: low = value & 0xFF high = (value >> 8) & 0xFF return low, high - - -def write_method(address: int, size: int = 1, signed: bool = False): - """ - Decorator factory to create write methods for memory addresses. - - Args: - address: Memory address to write to - size: Size in bytes (1 or 2) - signed: Whether the value is signed (for 2-byte values) - - Returns: - Decorator function - """ - - def decorator(func: Callable) -> Callable: - @wraps(func) - def wrapper(self, value: int, reg: bool = False): - write_fn = self.servo._reg_write_memory if reg else self.servo._write_memory - if size == 1: - return write_fn(address, [value & 0xFF]) - elif size == 2: - if signed: - low, high = encode_signed_word(value) - else: - low, high = encode_unsigned_word(value) - return write_fn(address, [low, high]) - raise ValueError(f"Unsupported size {size} for write_method.") - - return wrapper - - return decorator - - -def read_method(address: int, size: int = 1, signed: bool = False): - """ - Decorator factory to create read methods for memory addresses. - - Args: - address: Memory address to read from - size: Size in bytes (1 or 2) - signed: Whether the value is signed (for 2-byte values) - - Returns: - Decorator function - """ - - def decorator(func: Callable) -> Callable: - @wraps(func) - def wrapper(self): - raw = self.servo._read_memory(address, size) - if raw is None: - return None - if size == 2 and signed: - return decode_signed_word(raw) - return raw - - return wrapper - - return decorator - - -def sync_write_method(address: int, size: int = 1, signed: bool = False): - """ - Decorator factory to create sync write methods for broadcast operations. - - Args: - address: Memory address to write to - size: Size in bytes (1 or 2) - signed: Whether the value is signed (for 2-byte values) - - Returns: - Decorator function - """ - - def decorator(func: Callable) -> Callable: - @wraps(func) - @validate_broadcast_only - def wrapper(self, servo_data: dict[int, int]): - """ - Perform SYNC WRITE to multiple servos. - - Args: - servo_data: Dictionary mapping servo_id to value - - Returns: - None (broadcast operation, no response) - """ - formatted_data = {} - for servo_id, value in servo_data.items(): - if size == 1: - formatted_data[servo_id] = [value & 0xFF] - elif size == 2: - if signed: - low, high = encode_signed_word(value) - else: - low, high = encode_unsigned_word(value) - formatted_data[servo_id] = [low, high] - return self.servo._sync_write(address, size, formatted_data) - - return wrapper - - return decorator - - -def sync_read_method(address: int, size: int = 1, signed: bool = False): - """ - Decorator factory to create sync read methods for broadcast operations. - - Args: - address: Memory address to read from - size: Size in bytes (1 or 2) - signed: Whether the value is signed (for 2-byte values) - - Returns: - Decorator function - """ - - def decorator(func: Callable) -> Callable: - @wraps(func) - @validate_broadcast_only - def wrapper(self, servo_ids: list[int]): - """ - Perform SYNC READ from multiple servos. - - Args: - servo_ids: List of servo IDs to query - - Returns: - Dictionary mapping servo_id to value - """ - responses = self.servo._sync_read(address, size, servo_ids) - results = {} - for servo_id, response in responses.items(): - if response and response.get("parameters"): - data = response["parameters"] - if size == 1: - results[servo_id] = data[0] - elif size == 2: - raw = data[0] | (data[1] << 8) - results[servo_id] = decode_signed_word(raw) if signed else raw - else: - results[servo_id] = None - return results - - return wrapper - - return decorator diff --git a/src/python_st3215/registers.py b/src/python_st3215/registers.py index f486b46..6d6ad3e 100644 --- a/src/python_st3215/registers.py +++ b/src/python_st3215/registers.py @@ -1,24 +1,57 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, Optional, Any +from .errors import ST3215Error if TYPE_CHECKING: from .servo import Servo from .decorators import ( - read_method, - write_method, - sync_read_method, - sync_write_method, validate_value_range, + encode_signed_word, + decode_signed_word, encode_unsigned_word, ) +def read_byte(servo: "Servo", address: int) -> Optional[int]: + """Read a single byte from memory.""" + return servo._read_memory(address, 1) + + +def read_word(servo: "Servo", address: int, signed: bool = False) -> Optional[int]: + """Read a 16-bit word from memory.""" + raw = servo._read_memory(address, 2) + if raw is None: + return None + if signed: + return decode_signed_word(raw) + return raw + + +def write_byte( + servo: "Servo", address: int, value: int, reg: bool = False +) -> dict[str, object] | None: + """Write a single byte to memory.""" + write_fn = servo._reg_write_memory if reg else servo._write_memory + return write_fn(address, [value & 0xFF]) + + +def write_word( + servo: "Servo", address: int, value: int, signed: bool = False, reg: bool = False +) -> dict[str, object] | None: + """Write a 16-bit word to memory.""" + if signed: + low, high = encode_signed_word(value) + else: + low, high = encode_unsigned_word(value) + write_fn = servo._reg_write_memory if reg else servo._write_memory + return write_fn(address, [low, high]) + + class _EEPROMRegisters: - def __init__(self, servo: "Servo"): + def __init__(self, servo: "Servo") -> None: self.servo = servo - @read_method(0x00, size=1) def read_firmware_major_version(self) -> Optional[int]: """ Read the firmware major version number. @@ -26,9 +59,8 @@ def read_firmware_major_version(self) -> Optional[int]: Returns: int: Major version number, or None if read fails """ - pass + return read_byte(self.servo, 0x00) - @read_method(0x01, size=1) def read_firmware_minor_version(self) -> Optional[int]: """ Read the firmware minor version number. @@ -36,9 +68,8 @@ def read_firmware_minor_version(self) -> Optional[int]: Returns: int: Minor version number, or None if read fails """ - pass + return read_byte(self.servo, 0x01) - @read_method(0x03, size=1) def read_servo_main_version(self) -> Optional[int]: """ Read the servo main version number. @@ -46,9 +77,8 @@ def read_servo_main_version(self) -> Optional[int]: Returns: int: Main version number, or None if read fails """ - pass + return read_byte(self.servo, 0x03) - @read_method(0x04, size=1) def read_servo_version(self) -> Optional[int]: """ Read the servo version number. @@ -56,9 +86,8 @@ def read_servo_version(self) -> Optional[int]: Returns: int: Servo version number, or None if read fails """ - pass + return read_byte(self.servo, 0x04) - @read_method(0x05, size=1) def read_id(self) -> Optional[int]: """ Read the servo ID. @@ -66,11 +95,10 @@ def read_id(self) -> Optional[int]: Returns: int: Servo ID (0-253), or None if read fails """ - pass + return read_byte(self.servo, 0x05) - @write_method(0x05, size=1) @validate_value_range(0, 253) - def write_id(self, value: int, reg: bool = False): + def write_id(self, value: int, reg: bool = False) -> dict[str, object] | None: """ Set the servo ID. @@ -81,9 +109,8 @@ def write_id(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x05, value, reg) - @read_method(0x06, size=1) def read_baudrate(self) -> Optional[int]: """ Read the baud rate setting. @@ -100,11 +127,10 @@ def read_baudrate(self) -> Optional[int]: 7 = 9,600 baud Returns None if read fails """ - pass + return read_byte(self.servo, 0x06) - @write_method(0x06, size=1) @validate_value_range(0, 7) - def write_baudrate(self, value: int, reg: bool = False): + def write_baudrate(self, value: int, reg: bool = False) -> dict[str, object] | None: """ Set the baud rate. @@ -115,9 +141,8 @@ def write_baudrate(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x06, value, reg) - @read_method(0x07, size=1) def read_return_delay(self) -> Optional[int]: """ Read the return delay time. @@ -125,11 +150,12 @@ def read_return_delay(self) -> Optional[int]: Returns: int: Delay in 2µs units (0-254). Max = 508µs. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x07) - @write_method(0x07, size=1) @validate_value_range(0, 254) - def write_return_delay(self, value: int, reg: bool = False): + def write_return_delay( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the return delay time. @@ -140,9 +166,8 @@ def write_return_delay(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x07, value, reg) - @read_method(0x08, size=1) def read_response_status_level(self) -> Optional[int]: """ Read the response status level. @@ -152,11 +177,12 @@ def read_response_status_level(self) -> Optional[int]: 1 = Response returned for all commands Returns None if read fails """ - pass + return read_byte(self.servo, 0x08) - @write_method(0x08, size=1) @validate_value_range(0, 1) - def write_response_status_level(self, value: int, reg: bool = False): + def write_response_status_level( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the response status level. @@ -168,9 +194,8 @@ def write_response_status_level(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x08, value, reg) - @read_method(0x09, size=2) def read_min_angle_limit(self) -> Optional[int]: """ Read the minimum angle limit. @@ -179,11 +204,12 @@ def read_min_angle_limit(self) -> Optional[int]: int: Minimum position in steps (0-4094), or None if read fails. Must be less than max_angle_limit. """ - pass + return read_word(self.servo, 0x09) - @write_method(0x09, size=2) @validate_value_range(0, 4094) - def write_min_angle_limit(self, value: int, reg: bool = False): + def write_min_angle_limit( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the minimum angle limit. @@ -195,9 +221,8 @@ def write_min_angle_limit(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_word(self.servo, 0x09, value, reg=reg) - @read_method(0x0B, size=2) def read_max_angle_limit(self) -> Optional[int]: """ Read the maximum angle limit. @@ -206,11 +231,12 @@ def read_max_angle_limit(self) -> Optional[int]: int: Maximum position in steps (1-4095), or None if read fails. Must be greater than min_angle_limit. """ - pass + return read_word(self.servo, 0x0B) - @write_method(0x0B, size=2) @validate_value_range(1, 4095) - def write_max_angle_limit(self, value: int, reg: bool = False): + def write_max_angle_limit( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the maximum angle limit. @@ -222,9 +248,8 @@ def write_max_angle_limit(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_word(self.servo, 0x0B, value, reg=reg) - @read_method(0x0D, size=1) def read_max_temperature_limit(self) -> Optional[int]: """ Read the maximum temperature limit. @@ -232,11 +257,12 @@ def read_max_temperature_limit(self) -> Optional[int]: Returns: int: Temperature in °C (0-100), or None if read fails. """ - pass + return read_byte(self.servo, 0x0D) - @write_method(0x0D, size=1) @validate_value_range(0, 100) - def write_max_temperature_limit(self, value: int, reg: bool = False): + def write_max_temperature_limit( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the maximum temperature limit. @@ -247,9 +273,8 @@ def write_max_temperature_limit(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x0D, value, reg) - @read_method(0x0E, size=1) def read_max_input_voltage(self) -> Optional[int]: """ Read the maximum input voltage limit. @@ -258,11 +283,12 @@ def read_max_input_voltage(self) -> Optional[int]: int: Voltage in 0.1V units (0-254). Example: 120 = 12.0V. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x0E) - @write_method(0x0E, size=1) @validate_value_range(0, 254) - def write_max_input_voltage(self, value: int, reg: bool = False): + def write_max_input_voltage( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the maximum input voltage limit. @@ -273,9 +299,8 @@ def write_max_input_voltage(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x0E, value, reg) - @read_method(0x0F, size=1) def read_min_input_voltage(self) -> Optional[int]: """ Read the minimum input voltage limit. @@ -284,24 +309,24 @@ def read_min_input_voltage(self) -> Optional[int]: int: Voltage in 0.1V units (0-254). Example: 60 = 6.0V. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x0F) - @write_method(0x0F, size=1) @validate_value_range(0, 254) - def write_min_input_voltage(self, value: int, reg: bool = False): + def write_min_input_voltage( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the minimum input voltage limit. Args: - value (int): Voltage in 0. 1V units (0-254). Example: 60 = 6.0V. + value (int): Voltage in 0.1V units (0-254). Example: 60 = 6.0V. reg (bool): If True, use registered write mode. Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x0F, value, reg) - @read_method(0x10, size=2) def read_max_torque(self) -> Optional[int]: """ Read the maximum torque limit. @@ -311,11 +336,12 @@ def read_max_torque(self) -> Optional[int]: This value is assigned to torque_limit on power-up. Returns None if read fails. """ - pass + return read_word(self.servo, 0x10) - @write_method(0x10, size=2) @validate_value_range(0, 1000) - def write_max_torque(self, value: int, reg: bool = False): + def write_max_torque( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the maximum torque limit. @@ -327,9 +353,8 @@ def write_max_torque(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_word(self.servo, 0x10, value, reg=reg) - @read_method(0x12, size=1) def read_phase(self) -> Optional[int]: """ Read the phase register (special function). @@ -338,11 +363,10 @@ def read_phase(self) -> Optional[int]: int: Phase value (0-254), or None if read fails. Do not modify unless required. """ - pass + return read_byte(self.servo, 0x12) - @write_method(0x12, size=1) @validate_value_range(0, 254) - def write_phase(self, value: int, reg: bool = False): + def write_phase(self, value: int, reg: bool = False) -> dict[str, object] | None: """ Set the phase register (special function). @@ -353,9 +377,8 @@ def write_phase(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x12, value, reg) - @read_method(0x13, size=1) def read_uninstallation_conditions(self) -> Optional[int]: """ Read the uninstallation conditions (protection settings). @@ -364,11 +387,12 @@ def read_uninstallation_conditions(self) -> Optional[int]: int: Bitmask (0-254) where bit 1 enables/disables corresponding protection. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x13) - @write_method(0x13, size=1) @validate_value_range(0, 254) - def write_uninstallation_conditions(self, value: int, reg: bool = False): + def write_uninstallation_conditions( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the uninstallation conditions (protection settings). @@ -379,9 +403,8 @@ def write_uninstallation_conditions(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x13, value, reg) - @read_method(0x14, size=1) def read_led_alarm_conditions(self) -> Optional[int]: """ Read the LED alarm conditions. @@ -390,11 +413,12 @@ def read_led_alarm_conditions(self) -> Optional[int]: int: Bitmask (0-254) where bit 1 enables/disables LED flashing alarm. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x14) - @write_method(0x14, size=1) @validate_value_range(0, 254) - def write_led_alarm_conditions(self, value: int, reg: bool = False): + def write_led_alarm_conditions( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the LED alarm conditions. @@ -405,9 +429,8 @@ def write_led_alarm_conditions(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x14, value, reg) - @read_method(0x15, size=1) def read_position_p(self) -> Optional[int]: """ Read the position control loop P (proportional) coefficient. @@ -415,11 +438,12 @@ def read_position_p(self) -> Optional[int]: Returns: int: P coefficient (0-254), or None if read fails. """ - pass + return read_byte(self.servo, 0x15) - @write_method(0x15, size=1) @validate_value_range(0, 254) - def write_position_p(self, value: int, reg: bool = False): + def write_position_p( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the position control loop P (proportional) coefficient. @@ -430,9 +454,8 @@ def write_position_p(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x15, value, reg) - @read_method(0x16, size=1) def read_position_d(self) -> Optional[int]: """ Read the position control loop D (differential) coefficient. @@ -440,11 +463,12 @@ def read_position_d(self) -> Optional[int]: Returns: int: D coefficient (0-254), or None if read fails. """ - pass + return read_byte(self.servo, 0x16) - @write_method(0x16, size=1) @validate_value_range(0, 254) - def write_position_d(self, value: int, reg: bool = False): + def write_position_d( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the position control loop D (differential) coefficient. @@ -455,9 +479,8 @@ def write_position_d(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x16, value, reg) - @read_method(0x17, size=1) def read_position_i(self) -> Optional[int]: """ Read the position control loop I (integral) coefficient. @@ -465,11 +488,12 @@ def read_position_i(self) -> Optional[int]: Returns: int: I coefficient (0-254), or None if read fails. """ - pass + return read_byte(self.servo, 0x17) - @write_method(0x17, size=1) @validate_value_range(0, 254) - def write_position_i(self, value: int, reg: bool = False): + def write_position_i( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the position control loop I (integral) coefficient. @@ -480,9 +504,8 @@ def write_position_i(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x17, value, reg) - @read_method(0x18, size=1) def read_min_starting_force(self) -> Optional[int]: """ Read the minimum starting force. @@ -491,11 +514,12 @@ def read_min_starting_force(self) -> Optional[int]: int: Force in 0.1% units (0-254). Example: 10 = 1% of stall torque. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x18) - @write_method(0x18, size=1) @validate_value_range(0, 254) - def write_min_starting_force(self, value: int, reg: bool = False): + def write_min_starting_force( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the minimum starting force. @@ -506,9 +530,8 @@ def write_min_starting_force(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x18, value, reg) - @read_method(0x19, size=1) def read_points_limit(self) -> Optional[int]: """ Read the points limit. @@ -517,11 +540,12 @@ def read_points_limit(self) -> Optional[int]: int: Points limit value (0-254). Max score = value * 4. 0 disables the limit. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x19) - @write_method(0x19, size=1) @validate_value_range(0, 254) - def write_points_limit(self, value: int, reg: bool = False): + def write_points_limit( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the points limit. @@ -533,9 +557,8 @@ def write_points_limit(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x19, value, reg) - @read_method(0x1A, size=1) def read_cw_insensitive_area(self) -> Optional[int]: """ Read the clockwise insensitive area (deadzone). @@ -544,11 +567,12 @@ def read_cw_insensitive_area(self) -> Optional[int]: int: Deadzone in steps (0-32). Smallest unit is minimum resolution angle. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x1A) - @write_method(0x1A, size=1) @validate_value_range(0, 32) - def write_cw_insensitive_area(self, value: int, reg: bool = False): + def write_cw_insensitive_area( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the clockwise insensitive area (deadzone). @@ -559,22 +583,22 @@ def write_cw_insensitive_area(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x1A, value, reg) - @read_method(0x1B, size=1) def read_ccw_insensitive_area(self) -> Optional[int]: """ Read the counterclockwise insensitive area (deadzone). Returns: - int: Deadzone in steps (0-32). Smallest unit is minimum resolution angle. + int: Deadzone in steps (0-32). Smallest unit is minimum resolution angle. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x1B) - @write_method(0x1B, size=1) @validate_value_range(0, 32) - def write_ccw_insensitive_area(self, value: int, reg: bool = False): + def write_ccw_insensitive_area( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the counterclockwise insensitive area (deadzone). @@ -585,9 +609,8 @@ def write_ccw_insensitive_area(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x1B, value, reg) - @read_method(0x1C, size=2) def read_protective_current(self) -> Optional[int]: """ Read the protective current limit. @@ -596,11 +619,12 @@ def read_protective_current(self) -> Optional[int]: int: Current in 6.5mA units (0-511). Max = 500*6.5mA = 3250mA. Returns None if read fails. """ - pass + return read_word(self.servo, 0x1C) - @write_method(0x1C, size=2) @validate_value_range(0, 511) - def write_protective_current(self, value: int, reg: bool = False): + def write_protective_current( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the protective current limit. @@ -611,9 +635,8 @@ def write_protective_current(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_word(self.servo, 0x1C, value, reg=reg) - @read_method(0x1E, size=1) def read_angular_resolution(self) -> Optional[int]: """ Read the angular resolution setting. @@ -622,11 +645,12 @@ def read_angular_resolution(self) -> Optional[int]: int: Resolution mode (1-3). Modifies magnification factor for sensor minimum resolution angle. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x1E) - @write_method(0x1E, size=1) @validate_value_range(1, 3) - def write_angular_resolution(self, value: int, reg: bool = False): + def write_angular_resolution( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the angular resolution. @@ -637,7 +661,7 @@ def write_angular_resolution(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x1E, value, reg) def read_position_correction(self) -> Optional[int]: """ @@ -654,7 +678,9 @@ def read_position_correction(self) -> Optional[int]: return raw @validate_value_range(-2047, 2047) - def write_position_correction(self, value: int, reg: bool = False): + def write_position_correction( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the position correction offset. @@ -673,7 +699,6 @@ def write_position_correction(self, value: int, reg: bool = False): write_fn = self.servo._reg_write_memory if reg else self.servo._write_memory return write_fn(0x1F, [low, high]) - @read_method(0x21, size=1) def read_operating_mode(self) -> Optional[int]: """ Read the operating mode. @@ -686,11 +711,12 @@ def read_operating_mode(self) -> Optional[int]: 3 = Stepper servo mode Returns None if read fails. """ - pass + return read_byte(self.servo, 0x21) - @write_method(0x21, size=1) @validate_value_range(0, 3) - def write_operating_mode(self, value: int, reg: bool = False): + def write_operating_mode( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the operating mode. @@ -705,9 +731,8 @@ def write_operating_mode(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x21, value, reg) - @read_method(0x22, size=1) def read_protective_torque(self) -> Optional[int]: """ Read the protective torque setting. @@ -716,11 +741,12 @@ def read_protective_torque(self) -> Optional[int]: int: Torque in 1.0% units (0-100). Output torque after overload protection. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x22) - @write_method(0x22, size=1) @validate_value_range(0, 100) - def write_protective_torque(self, value: int, reg: bool = False): + def write_protective_torque( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the protective torque. @@ -731,9 +757,8 @@ def write_protective_torque(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x22, value, reg) - @read_method(0x23, size=1) def read_protection_time(self) -> Optional[int]: """ Read the protection time. @@ -742,11 +767,12 @@ def read_protection_time(self) -> Optional[int]: int: Time in 10ms units (0-254). Duration exceeding overload torque before reset. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x23) - @write_method(0x23, size=1) @validate_value_range(0, 254) - def write_protection_time(self, value: int, reg: bool = False): + def write_protection_time( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the protection time. @@ -757,22 +783,22 @@ def write_protection_time(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x23, value, reg) - @read_method(0x24, size=1) def read_overload_torque(self) -> Optional[int]: """ Read the overload torque threshold. Returns: - int: Torque in 1. 0% units (0-100). Max torque threshold for + int: Torque in 1.0% units (0-100). Max torque threshold for overload protection. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x24) - @write_method(0x24, size=1) @validate_value_range(0, 100) - def write_overload_torque(self, value: int, reg: bool = False): + def write_overload_torque( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the overload torque threshold. @@ -783,9 +809,8 @@ def write_overload_torque(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x24, value, reg) - @read_method(0x25, size=1) def read_speed_p(self) -> Optional[int]: """ Read the speed control loop P (proportional) coefficient. @@ -794,11 +819,10 @@ def read_speed_p(self) -> Optional[int]: int: P coefficient (0-254) for constant speed mode. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x25) - @write_method(0x25, size=1) @validate_value_range(0, 254) - def write_speed_p(self, value: int, reg: bool = False): + def write_speed_p(self, value: int, reg: bool = False) -> dict[str, object] | None: """ Set the speed control loop P (proportional) coefficient. @@ -809,9 +833,8 @@ def write_speed_p(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x25, value, reg) - @read_method(0x26, size=1) def read_overcurrent_protection_time(self) -> Optional[int]: """ Read the overcurrent protection time. @@ -820,11 +843,12 @@ def read_overcurrent_protection_time(self) -> Optional[int]: int: Time in 10ms units (0-254). Max = 2540ms. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x26) - @write_method(0x26, size=1) @validate_value_range(0, 254) - def write_overcurrent_protection_time(self, value: int, reg: bool = False): + def write_overcurrent_protection_time( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the overcurrent protection time. @@ -835,9 +859,8 @@ def write_overcurrent_protection_time(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x26, value, reg) - @read_method(0x27, size=1) def read_speed_i(self) -> Optional[int]: """ Read the speed control loop I (integral) coefficient. @@ -846,11 +869,10 @@ def read_speed_i(self) -> Optional[int]: int: I coefficient (0-254) in 1/10 units. Reduced by factor 10 vs v3.6. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x27) - @write_method(0x27, size=1) @validate_value_range(0, 254) - def write_speed_i(self, value: int, reg: bool = False): + def write_speed_i(self, value: int, reg: bool = False) -> dict[str, object] | None: """ Set the speed control loop I (integral) coefficient. @@ -861,14 +883,13 @@ def write_speed_i(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x27, value, reg) class SRAMRegisters: - def __init__(self, servo: "Servo"): + def __init__(self, servo: "Servo") -> None: self.servo = servo - @read_method(0x28, size=1) def read_torque_switch(self) -> Optional[int]: """ Read the torque switch state. @@ -879,10 +900,11 @@ def read_torque_switch(self) -> Optional[int]: 128 = Correct current position to 2048 Returns None if read fails. """ - pass + return read_byte(self.servo, 0x28) - @write_method(0x28, size=1) - def write_torque_switch(self, value: int, reg: bool = False): + def write_torque_switch( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the torque switch state. @@ -893,9 +915,9 @@ def write_torque_switch(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x28, value, reg) - def torque_enable(self, reg: bool = False): + def torque_enable(self, reg: bool = False) -> dict[str, object] | None: """ Enable servo torque (motor power on). @@ -907,7 +929,7 @@ def torque_enable(self, reg: bool = False): """ return self.write_torque_switch(1, reg=reg) - def torque_disable(self, reg: bool = False): + def torque_disable(self, reg: bool = False) -> dict[str, object] | None: """ Disable servo torque (motor power off, free-spinning). @@ -919,7 +941,7 @@ def torque_disable(self, reg: bool = False): """ return self.write_torque_switch(0, reg=reg) - def correct_position_to_2048(self, reg: bool = False): + def correct_position_to_2048(self, reg: bool = False) -> dict[str, object] | None: """ Correct the current position to 2048 (mid-point calibration). @@ -931,7 +953,6 @@ def correct_position_to_2048(self, reg: bool = False): """ return self.write_torque_switch(128, reg=reg) - @read_method(0x29, size=1) def read_acceleration(self) -> Optional[int]: """ Read the acceleration setting. @@ -941,11 +962,12 @@ def read_acceleration(self) -> Optional[int]: Example: 10 = 1000 steps/s². Returns None if read fails. """ - pass + return read_byte(self.servo, 0x29) - @write_method(0x29, size=1) @validate_value_range(0, 254) - def write_acceleration(self, value: int, reg: bool = False): + def write_acceleration( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the acceleration/deceleration. @@ -957,10 +979,9 @@ def write_acceleration(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x29, value, reg) - @sync_write_method(0x29, size=1) - def sync_write_acceleration(self, servo_data: dict[int, int]): + def sync_write_acceleration(self, servo_data: dict[int, int]) -> None: """ SYNC WRITE acceleration to multiple servos. @@ -970,9 +991,15 @@ def sync_write_acceleration(self, servo_data: dict[int, int]): Returns: None (broadcast operation, no response) """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_write_acceleration can only be used with broadcast servo (ID 254)." + ) + formatted_data: dict[int, list[int]] = {} + for servo_id, value in servo_data.items(): + formatted_data[servo_id] = [value & 0xFF] + return self.servo._sync_write(0x29, 1, formatted_data) # type: ignore - @read_method(0x2A, size=2, signed=True) def read_target_location(self) -> Optional[int]: """ Read the target position. @@ -982,11 +1009,12 @@ def read_target_location(self) -> Optional[int]: Each step = minimum resolvable angle. Returns None if read fails. """ - pass + return read_word(self.servo, 0x2A, signed=True) - @write_method(0x2A, size=2, signed=True) @validate_value_range(-32766, 32766) - def write_target_location(self, value: int, reg: bool = False): + def write_target_location( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the target position (absolute position control). @@ -997,10 +1025,9 @@ def write_target_location(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_word(self.servo, 0x2A, value, signed=True, reg=reg) - @sync_write_method(0x2A, size=2, signed=True) - def sync_write_target_location(self, servo_data: dict[int, int]): + def sync_write_target_location(self, servo_data: dict[int, int]) -> None: """ SYNC WRITE target position to multiple servos. @@ -1010,9 +1037,16 @@ def sync_write_target_location(self, servo_data: dict[int, int]): Returns: None (broadcast operation, no response) """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_write_target_location can only be used with broadcast servo (ID 254)." + ) + formatted_data: dict[int, list[int]] = {} + for servo_id, value in servo_data.items(): + low, high = encode_signed_word(value) + formatted_data[servo_id] = [low, high] + return self.servo._sync_write(0x2A, 2, formatted_data) # type: ignore - @read_method(0x2C, size=2) def read_runtime(self) -> Optional[int]: """ Read the runtime setting (PWM open-loop mode). @@ -1022,11 +1056,10 @@ def read_runtime(self) -> Optional[int]: Used for PWM open-loop speed control. Returns None if read fails. """ - pass + return read_word(self.servo, 0x2C) - @write_method(0x2C, size=2) @validate_value_range(0, 2047) - def write_runtime(self, value: int, reg: bool = False): + def write_runtime(self, value: int, reg: bool = False) -> dict[str, object] | None: """ Set the runtime (PWM open-loop mode). @@ -1037,9 +1070,8 @@ def write_runtime(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_word(self.servo, 0x2C, value, reg=reg) - @read_method(0x2E, size=2, signed=True) def read_running_speed(self) -> Optional[int]: """ Read the running speed setpoint. @@ -1049,11 +1081,12 @@ def read_running_speed(self) -> Optional[int]: Sign indicates direction. 50 steps/s ≈ 0.732 RPM. Returns None if read fails. """ - pass + return read_word(self.servo, 0x2E, signed=True) - @write_method(0x2E, size=2, signed=True) @validate_value_range(-32766, 32766) - def write_running_speed(self, value: int, reg: bool = False): + def write_running_speed( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the running speed (constant speed mode). @@ -1065,10 +1098,9 @@ def write_running_speed(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_word(self.servo, 0x2E, value, signed=True, reg=reg) - @sync_write_method(0x2E, size=2, signed=True) - def sync_write_running_speed(self, servo_data: dict[int, int]): + def sync_write_running_speed(self, servo_data: dict[int, int]) -> None: """ SYNC WRITE running speed to multiple servos. @@ -1078,23 +1110,31 @@ def sync_write_running_speed(self, servo_data: dict[int, int]): Returns: None (broadcast operation, no response) """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_write_running_speed can only be used with broadcast servo (ID 254)." + ) + formatted_data: dict[int, list[int]] = {} + for servo_id, value in servo_data.items(): + low, high = encode_signed_word(value) + formatted_data[servo_id] = [low, high] + return self.servo._sync_write(0x2E, 2, formatted_data) # type: ignore - @read_method(0x30, size=2) def read_torque_limit(self) -> Optional[int]: """ Read the current torque limit. Returns: - int: Torque limit in 0.1% units (0-1000). Example: 500 = 50. 0%. + int: Torque limit in 0.1% units (0-1000). Example: 500 = 50.0%. Initial value is set from max_torque (EEPROM) on power-up. Returns None if read fails. """ - pass + return read_word(self.servo, 0x30) - @write_method(0x30, size=2) @validate_value_range(0, 1000) - def write_torque_limit(self, value: int, reg: bool = False): + def write_torque_limit( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the torque limit. @@ -1105,10 +1145,9 @@ def write_torque_limit(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_word(self.servo, 0x30, value, reg=reg) - @sync_write_method(0x30, size=2) - def sync_write_torque_limit(self, servo_data: dict[int, int]): + def sync_write_torque_limit(self, servo_data: dict[int, int]) -> None: """ SYNC WRITE torque limit to multiple servos. @@ -1118,9 +1157,16 @@ def sync_write_torque_limit(self, servo_data: dict[int, int]): Returns: None (broadcast operation, no response) """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_write_torque_limit can only be used with broadcast servo (ID 254)." + ) + formatted_data: dict[int, list[int]] = {} + for servo_id, value in servo_data.items(): + low, high = encode_unsigned_word(value) + formatted_data[servo_id] = [low, high] + self.servo._sync_write(0x30, 2, formatted_data) # type: ignore - @read_method(0x37, size=1) def read_lock_symbol(self) -> Optional[int]: """ Read the EEPROM write lock status. @@ -1130,11 +1176,12 @@ def read_lock_symbol(self) -> Optional[int]: 1 = Write lock enabled (cannot save to EEPROM) Returns None if read fails. """ - pass + return read_byte(self.servo, 0x37) - @write_method(0x37, size=1) @validate_value_range(0, 1) - def write_lock_symbol(self, value: int, reg: bool = False): + def write_lock_symbol( + self, value: int, reg: bool = False + ) -> dict[str, object] | None: """ Set the EEPROM write lock status. @@ -1145,9 +1192,9 @@ def write_lock_symbol(self, value: int, reg: bool = False): Returns: Response dict or None """ - pass + return write_byte(self.servo, 0x37, value, reg) - def lock(self, reg: bool = False): + def lock(self, reg: bool = False) -> dict[str, object] | None: """ Enable EEPROM write lock (prevent saving settings to EEPROM). @@ -1157,9 +1204,9 @@ def lock(self, reg: bool = False): Returns: Response dict or None """ - return self.write_lock_symbol(0, reg=reg) + return self.write_lock_symbol(1, reg=reg) - def unlock(self, reg: bool = False): + def unlock(self, reg: bool = False) -> dict[str, object] | None: """ Disable EEPROM write lock (allow saving settings to EEPROM). @@ -1169,9 +1216,8 @@ def unlock(self, reg: bool = False): Returns: Response dict or None """ - return self.write_lock_symbol(1, reg=reg) + return self.write_lock_symbol(0, reg=reg) - @read_method(0x38, size=2) def read_current_location(self) -> Optional[int]: """ Read the current position (feedback). @@ -1179,10 +1225,11 @@ def read_current_location(self) -> Optional[int]: Returns: int: Current position in steps. Returns None if read fails. """ - pass + return read_word(self.servo, 0x38) - @sync_read_method(0x38, size=2) - def sync_read_current_location(self, servo_ids: list[int]): + def sync_read_current_location( + self, servo_ids: list[int] + ) -> dict[int, Optional[int]]: """ SYNC READ current position from multiple servos. @@ -1192,20 +1239,36 @@ def sync_read_current_location(self, servo_ids: list[int]): Returns: Dictionary mapping servo_id to current position """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_read_current_location can only be used with broadcast servo (ID 254)." + ) + responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( + 0x38, 2, servo_ids + ) + results: dict[int, int | None] = {} + for servo_id, response in responses.items(): + if response and isinstance(response, dict) and response.get("parameters"): + data: list[int] | bytes = response["parameters"] + if isinstance(data, (bytes, bytearray)): + raw = data[0] | (data[1] << 8) + else: + raw = data[0] | (data[1] << 8) + results[servo_id] = raw + else: + results[servo_id] = None + return results - @read_method(0x3A, size=2) def read_current_speed(self) -> Optional[int]: """ Read the current speed (feedback). Returns: - int: Current speed in steps/s. Returns None if read fails. + int: Current speed in steps/s. Returns None if read fails. """ - pass + return read_word(self.servo, 0x3A) - @sync_read_method(0x3A, size=2) - def sync_read_current_speed(self, servo_ids: list[int]): + def sync_read_current_speed(self, servo_ids: list[int]) -> dict[int, Optional[int]]: """ SYNC READ current speed from multiple servos. @@ -1215,9 +1278,26 @@ def sync_read_current_speed(self, servo_ids: list[int]): Returns: Dictionary mapping servo_id to current speed """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_read_current_speed can only be used with broadcast servo (ID 254)." + ) + responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( + 0x3A, 2, servo_ids + ) + results: dict[int, int | None] = {} + for servo_id, response in responses.items(): + if response and isinstance(response, dict) and response.get("parameters"): + data: list[int] | bytes = response["parameters"] + if isinstance(data, (bytes, bytearray)): + raw = data[0] | (data[1] << 8) + else: + raw = data[0] | (data[1] << 8) + results[servo_id] = raw + else: + results[servo_id] = None + return results - @read_method(0x3C, size=2) def read_current_load(self) -> Optional[int]: """ Read the current load (motor drive duty cycle). @@ -1226,10 +1306,9 @@ def read_current_load(self) -> Optional[int]: int: Load in 0.1% units (0-1000). Example: 500 = 50.0%. Returns None if read fails. """ - pass + return read_word(self.servo, 0x3C) - @sync_read_method(0x3C, size=2) - def sync_read_current_load(self, servo_ids: list[int]): + def sync_read_current_load(self, servo_ids: list[int]) -> dict[int, Optional[int]]: """ SYNC READ current load from multiple servos. @@ -1239,21 +1318,36 @@ def sync_read_current_load(self, servo_ids: list[int]): Returns: Dictionary mapping servo_id to current load """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_read_current_load can only be used with broadcast servo (ID 254)." + ) + responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( + 0x3C, 2, servo_ids + ) + results: dict[int, Optional[int]] = {} + for servo_id, response in responses.items(): + if response and response.get("parameters"): + data = response["parameters"] + raw = data[0] | (data[1] << 8) + results[servo_id] = raw + else: + results[servo_id] = None + return results - @read_method(0x3E, size=1) def read_current_voltage(self) -> Optional[int]: """ Read the current operating voltage. Returns: - int: Voltage in 0.1V units. Example: 120 = 12.0V. + int: Voltage in 0.1V units. Example: 120 = 12.0V. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x3E) - @sync_read_method(0x3E, size=1) - def sync_read_current_voltage(self, servo_ids: list[int]): + def sync_read_current_voltage( + self, servo_ids: list[int] + ) -> dict[int, Optional[int]]: """ SYNC READ current voltage from multiple servos. @@ -1263,9 +1357,22 @@ def sync_read_current_voltage(self, servo_ids: list[int]): Returns: Dictionary mapping servo_id to current voltage """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_read_current_voltage can only be used with broadcast servo (ID 254)." + ) + responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( + 0x3E, 1, servo_ids + ) + results: dict[int, Optional[int]] = {} + for servo_id, response in responses.items(): + if response and response.get("parameters"): + data = response["parameters"] + results[servo_id] = data[0] + else: + results[servo_id] = None + return results - @read_method(0x3F, size=1) def read_current_temperature(self) -> Optional[int]: """ Read the current internal temperature. @@ -1273,10 +1380,11 @@ def read_current_temperature(self) -> Optional[int]: Returns: int: Temperature in °C. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x3F) - @sync_read_method(0x3F, size=1) - def sync_read_current_temperature(self, servo_ids: list[int]): + def sync_read_current_temperature( + self, servo_ids: list[int] + ) -> dict[int, Optional[int]]: """ SYNC READ current temperature from multiple servos. @@ -1286,9 +1394,22 @@ def sync_read_current_temperature(self, servo_ids: list[int]): Returns: Dictionary mapping servo_id to current temperature """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_read_current_temperature can only be used with broadcast servo (ID 254)." + ) + responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( + 0x3F, 1, servo_ids + ) + results: dict[int, Optional[int]] = {} + for servo_id, response in responses.items(): + if response and response.get("parameters"): + data = response["parameters"] + results[servo_id] = data[0] + else: + results[servo_id] = None + return results - @read_method(0x40, size=1) def read_async_write_flag(self) -> Optional[int]: """ Read the asynchronous write flag. @@ -1297,21 +1418,19 @@ def read_async_write_flag(self) -> Optional[int]: int: Flag value for asynchronous write instructions. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x40) - @read_method(0x41, size=1) def read_servo_status(self) -> Optional[int]: """ Read the servo status (error flags). Returns: - int: Status bitmask. Bit = 1 indicates error, 0 = no error. + int: Status bitmask. Bit = 1 indicates error, 0 = no error. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x41) - @sync_read_method(0x41, size=1) - def sync_read_servo_status(self, servo_ids: list[int]): + def sync_read_servo_status(self, servo_ids: list[int]) -> dict[int, Optional[int]]: """ SYNC READ servo status from multiple servos. @@ -1321,9 +1440,22 @@ def sync_read_servo_status(self, servo_ids: list[int]): Returns: Dictionary mapping servo_id to servo status """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_read_servo_status can only be used with broadcast servo (ID 254)." + ) + responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( + 0x41, 1, servo_ids + ) + results: dict[int, Optional[int]] = {} + for servo_id, response in responses.items(): + if response and response.get("parameters"): + data = response["parameters"] + results[servo_id] = data[0] + else: + results[servo_id] = None + return results - @read_method(0x42, size=1) def read_mobile_sign(self) -> Optional[int]: """ Read the movement flag. @@ -1332,7 +1464,7 @@ def read_mobile_sign(self) -> Optional[int]: int: 1 = Servo in motion, 0 = Stationary. Returns None if read fails. """ - pass + return read_byte(self.servo, 0x42) def is_moving(self) -> bool: """ @@ -1344,7 +1476,6 @@ def is_moving(self) -> bool: value = self.read_mobile_sign() return value == 1 if value is not None else False - @read_method(0x45, size=2) def read_current_current(self) -> Optional[int]: """ Read the current motor current draw. @@ -1353,10 +1484,11 @@ def read_current_current(self) -> Optional[int]: int: Current in 6.5mA units (0-500). Max = 500*6.5mA = 3250mA. Returns None if read fails. """ - pass + return read_word(self.servo, 0x45) - @sync_read_method(0x45, size=2) - def sync_read_current_current(self, servo_ids: list[int]): + def sync_read_current_current( + self, servo_ids: list[int] + ) -> dict[int, Optional[int]]: """ SYNC READ current draw from multiple servos. @@ -1366,4 +1498,19 @@ def sync_read_current_current(self, servo_ids: list[int]): Returns: Dictionary mapping servo_id to current draw """ - pass + if self.servo.id != 254: + raise ST3215Error( + "sync_read_current_current can only be used with broadcast servo (ID 254)." + ) + responses: dict[int, dict[str, Any] | None] = self.servo._sync_read( + 0x45, 2, servo_ids + ) + results: dict[int, Optional[int]] = {} + for servo_id, response in responses.items(): + if response and response.get("parameters"): + data = response["parameters"] + raw = data[0] | (data[1] << 8) + results[servo_id] = raw + else: + results[servo_id] = None + return results diff --git a/src/python_st3215/servo.py b/src/python_st3215/servo.py index 3c2bf84..7de9d8e 100644 --- a/src/python_st3215/servo.py +++ b/src/python_st3215/servo.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING, Sequence, Any, cast if TYPE_CHECKING: from .st3215 import ST3215 @@ -9,7 +9,7 @@ class Servo: - def __init__(self, controller: "ST3215", servo_id: int): + def __init__(self, controller: "ST3215", servo_id: int) -> None: self.controller = controller self.id = servo_id self.logger = controller.logger @@ -18,14 +18,14 @@ def __init__(self, controller: "ST3215", servo_id: int): def send( self, instruction: int | Instruction, parameters: Sequence[int] | None = None - ): + ) -> dict[str, object] | None: self.logger.debug( f"Servo {self.id}: sending instruction {instruction} with parameters {parameters}" ) packet = self.controller.send_instruction(self.id, instruction, parameters) response = self.controller.read_response(packet) if response: - parsed = self.controller.parse_response(response) + parsed: dict[str, object] | None = self.controller.parse_response(response) self.logger.debug(f"Servo {self.id}: received response {parsed}") return parsed self.logger.warning( @@ -33,42 +33,45 @@ def send( ) return None - def ping(self): + def ping(self) -> dict[str, object] | None: """Send PING command to the servo to check if it is responsive.""" - return self.controller.ping(self.id) + return cast(dict[str, object] | None, self.controller.ping(self.id)) - def action(self): + def action(self) -> dict[str, object] | None: """Send ACTION command to the servo to execute all registered commands.""" self.logger.debug(f"Sending ACTION command to servo {self.id}") return self.send(Instruction.ACTION) - def reset(self): + def reset(self) -> dict[str, object] | None: """Send RESET command to the servo to reset it to factory defaults.""" self.logger.debug(f"Sending RESET command to servo {self.id}") return self.send(Instruction.RESET) - def _read_memory(self, address: int, length: int = 1): + def _read_memory(self, address: int, length: int = 1) -> int | None: self.logger.debug( f"Reading {length} bytes from address {address:#02x} on servo {self.id}" ) response = self.send(Instruction.READ, [address, length]) - if response and response["parameters"]: + if response and response.get("parameters"): data = response["parameters"] - if length == 1: - return data[0] - value = 0 - for i, byte in enumerate(data): - value |= byte << (8 * i) - self.logger.debug( - f"Read value {value} (0x{value:04X}) from servo {self.id}" - ) - return value + if isinstance(data, (bytes, bytearray)): + if length == 1: + return data[0] + value = 0 + for i, byte in enumerate(data): + value |= byte << (8 * i) + self.logger.debug( + f"Read value {value} (0x{value:04X}) from servo {self.id}" + ) + return value self.logger.warning( f"Failed to read memory from servo {self.id} at address {address:#02x}" ) return None - def _write_memory(self, address: int, values: Sequence[int]): + def _write_memory( + self, address: int, values: Sequence[int] + ) -> dict[str, object] | None: if not isinstance(values, Sequence): values = [values] self.logger.debug( @@ -76,7 +79,9 @@ def _write_memory(self, address: int, values: Sequence[int]): ) return self.send(Instruction.WRITE, [address, *values]) - def _reg_write_memory(self, address: int, values: Sequence[int]): + def _reg_write_memory( + self, address: int, values: Sequence[int] + ) -> dict[str, object] | None: if not isinstance(values, Sequence): values = [values] self.logger.debug( @@ -86,8 +91,10 @@ def _reg_write_memory(self, address: int, values: Sequence[int]): def _sync_write( self, address: int, data_length: int, servo_data: dict[int, Sequence[int]] - ): + ) -> None: return self.controller._sync_write(address, data_length, servo_data) - def _sync_read(self, address: int, data_length: int, servo_ids: Sequence[int]): + def _sync_read( + self, address: int, data_length: int, servo_ids: Sequence[int] + ) -> dict[int, dict[str, Any] | None]: return self.controller._sync_read(address, data_length, servo_ids) diff --git a/src/python_st3215/st3215.py b/src/python_st3215/st3215.py index 1993640..0dce40c 100644 --- a/src/python_st3215/st3215.py +++ b/src/python_st3215/st3215.py @@ -6,7 +6,7 @@ from .errors import ServoNotRespondingError, InvalidInstructionError from .decorators import validate_servo_id -from typing import Optional, Sequence +from typing import Optional, Sequence, Literal class ST3215: @@ -19,15 +19,15 @@ class ST3215: logger.addHandler(_console_handler) @classmethod - def set_log_level(cls, level: int): + def set_log_level(cls, level: int) -> None: cls.logger.setLevel(level) @classmethod - def disable_logging(cls): + def disable_logging(cls) -> None: cls.logger.disabled = True @classmethod - def enable_logging(cls): + def enable_logging(cls) -> None: cls.logger.disabled = False def __init__( @@ -35,7 +35,7 @@ def __init__( port: str, baudrate: int = 1000000, read_timeout: float = 0.002, - ): + ) -> None: """ Initialize the ST3215 controller with the given serial port settings. Args: @@ -50,7 +50,7 @@ def __init__( self.logger.debug(f"Serial port opened at {baudrate} baud.") self.broadcast = Servo(self, 254) - def close(self): + def close(self) -> None: if self.ser.is_open: self.ser.close() self.logger.info("Serial port closed.") @@ -102,7 +102,7 @@ def read_response(self, sent_packet: bytes) -> Optional[bytes]: return raw_data[len(sent_packet) :] return raw_data - def parse_response(self, data: bytes): + def parse_response(self, data: bytes) -> Optional[dict[str, object]]: self.logger.debug(f"Parsing response data: {list(data)}") if len(data) < 6: self.logger.warning("Response too short to parse.") @@ -116,7 +116,7 @@ def parse_response(self, data: bytes): checksum_base = servo_id + length + error + sum(parameters) calculated_checksum = (~checksum_base) & 0xFF valid_checksum = calculated_checksum == received_checksum - parsed = { + parsed: dict[str, object] = { "header": header, "id": servo_id, "length": length, @@ -130,7 +130,7 @@ def parse_response(self, data: bytes): return parsed @validate_servo_id - def ping(self, servo_id: int): + def ping(self, servo_id: int) -> Optional[dict[str, object]]: """ Send PING command to the servo to check if it is responsive. Returns: @@ -145,7 +145,7 @@ def ping(self, servo_id: int): return None @validate_servo_id - def wrap_servo(self, servo_id: int): + def wrap_servo(self, servo_id: int) -> Servo: """ Create a Servo instance for the given servo ID after verifying it responds to ping. Returns: @@ -160,7 +160,7 @@ def wrap_servo(self, servo_id: int): ) return Servo(self, servo_id) - def list_servos(self): + def list_servos(self) -> list[int]: """ Scan for connected servos by pinging all possible IDs (0-253). Returns: @@ -177,7 +177,7 @@ def list_servos(self): def _sync_write( self, address: int, data_length: int, servo_data: dict[int, Sequence[int]] - ): + ) -> None: self.logger.debug( f"SYNC WRITE to address {address:#02x} for {len(servo_data)} servos" ) @@ -192,16 +192,17 @@ def _sync_write( parameters.extend(data) self.send_instruction(0xFE, Instruction.SYNC_WRITE, parameters) self.logger.debug(f"SYNC WRITE command sent, no response expected") - return None - def _sync_read(self, address: int, data_length: int, servo_ids: Sequence[int]): + def _sync_read( + self, address: int, data_length: int, servo_ids: Sequence[int] + ) -> dict[int, Optional[dict[str, object]]]: self.logger.debug( f"SYNC READ from address {address:#02x}, length {data_length} " f"for servos {servo_ids}" ) parameters = [address, data_length, *servo_ids] packet = self.send_instruction(0xFE, Instruction.SYNC_READ, parameters) - responses = {} + responses: dict[int, Optional[dict[str, object]]] = {} for servo_id in servo_ids: response = self.read_response(packet) if response: @@ -216,3 +217,15 @@ def _sync_read(self, address: int, data_length: int, servo_ids: Sequence[int]): ) responses[servo_id] = None return responses + + def __enter__(self) -> "ST3215": + return self + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: object, + ) -> Literal[False]: + self.close() + return False