From 211c6ac0866c7b0121a5282bd7b480f98a9614ac Mon Sep 17 00:00:00 2001 From: sgbaird Date: Fri, 14 Nov 2025 19:50:53 -0500 Subject: [PATCH 1/7] feat: add scripts for desk lifter control and calibration --- scripts/lifter_calibration.txt | 15 ++++++++++ scripts/move_to_height.py | 55 ++++++++++++++++++++++++++++++++++ scripts/reset_to_lowest.py | 33 ++++++++++++++++++++ scripts/test_down.py | 32 ++++++++++++++++++++ 4 files changed, 135 insertions(+) create mode 100644 scripts/lifter_calibration.txt create mode 100644 scripts/move_to_height.py create mode 100644 scripts/reset_to_lowest.py create mode 100644 scripts/test_down.py diff --git a/scripts/lifter_calibration.txt b/scripts/lifter_calibration.txt new file mode 100644 index 0000000..c75dfff --- /dev/null +++ b/scripts/lifter_calibration.txt @@ -0,0 +1,15 @@ +# Desk Lifter Calibration Data + +Lowest height: 23.7 inches +Highest height: 54.5 inches + +Down movement rate: 0.55 inches/second (measured from 54.5" to 49" in 10s) +Up movement rate: 0.55 inches/second (measured from 23.7" to 29.1" in 10s) + +Calibration measurements: +- Down 10s: 54.5" → 49.0" +- Up 10s: 23.7" → 29.1" + +Notes: +- Use these rates to estimate time needed to reach a target height from the lowest position. +- Rates may vary slightly; repeat measurements for higher accuracy if needed. diff --git a/scripts/move_to_height.py b/scripts/move_to_height.py new file mode 100644 index 0000000..46b986d --- /dev/null +++ b/scripts/move_to_height.py @@ -0,0 +1,55 @@ +import time +import RPi.GPIO as GPIO + +# Calibration data +LOWEST_HEIGHT = 23.7 # inches +HIGHEST_HEIGHT = 54.5 # inches +UP_RATE = 0.54 # inches per second (from calibration) + +UP_PIN = 17 # BCM numbering, physical pin 11 +DOWN_PIN = 27 # BCM numbering, physical pin 13 + +GPIO.setmode(GPIO.BCM) + +def release_up(): + GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +def press_up(): + GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) + +def release_down(): + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +def press_down(): + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + +def move_to_height(target_height): + if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): + raise ValueError(f"Target height {target_height}'' is out of range.") + # Step 1: Reset to lowest + print(f"Resetting to lowest position ({LOWEST_HEIGHT})...") + release_down() + input("Press Enter to move all the way DOWN (about 56 seconds from max height)...") + press_down() + time.sleep(56.0) + release_down() + print("At lowest position.") + # Step 2: Move up for calculated time + delta = target_height - LOWEST_HEIGHT + up_time = delta / UP_RATE + print(f"Moving UP for {up_time:.1f} seconds to reach {target_height}''...") + release_up() + input("Press Enter to move UP...") + press_up() + time.sleep(up_time) + release_up() + print(f"Arrived at {target_height}'' (approximate).") + +if __name__ == "__main__": + try: + target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + move_to_height(target) + finally: + release_up() + release_down() + GPIO.cleanup() diff --git a/scripts/reset_to_lowest.py b/scripts/reset_to_lowest.py new file mode 100644 index 0000000..0882b7a --- /dev/null +++ b/scripts/reset_to_lowest.py @@ -0,0 +1,33 @@ +import time +import RPi.GPIO as GPIO + +DOWN_PIN = 27 # BCM numbering, physical pin 13 + +GPIO.setmode(GPIO.BCM) + +def release_down(): + # High-impedance: "button not pressed" + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +def press_down(): + # Drive low: "button pressed" + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + +try: + # This script is intended to move the desk from MAX height to MIN height. + # Ensure the desk is at the highest position before running. + release_down() + print("Ready. Desk should be at MAX height. It will now move to MIN height.") + + input("Press Enter to move all the way DOWN (about 56 seconds from max height)...") + + print("Moving DOWN to lowest position...") + press_down() + time.sleep(56.0) # 56s: calibrated for max to min travel (30.8" at 0.55 in/s) + release_down() + print("Released DOWN. Lifter should now be at lowest position.") + +finally: + # Ensure we leave the line released + release_down() + GPIO.cleanup() diff --git a/scripts/test_down.py b/scripts/test_down.py new file mode 100644 index 0000000..c80765f --- /dev/null +++ b/scripts/test_down.py @@ -0,0 +1,32 @@ +import time +import RPi.GPIO as GPIO + +DOWN_PIN = 27 # BCM numbering, physical pin 13 + +GPIO.setmode(GPIO.BCM) + +def release_down(): + # High-impedance: "button not pressed" + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +def press_down(): + # Drive low: "button pressed" + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + +try: + # Make sure we're released at start + release_down() + print("Ready. Desk should NOT move yet.") + + input("Press Enter to move DOWN for 2 seconds...") + + print("Moving DOWN...") + press_down() + time.sleep(2.0) # adjust if you want a shorter/longer test + release_down() + print("Released DOWN. Test finished.") + +finally: + # Ensure we leave the line released + release_down() + GPIO.cleanup() From ef2732d5454e8ca6178063d72205910ceb592926 Mon Sep 17 00:00:00 2001 From: sgbaird Date: Fri, 14 Nov 2025 22:03:43 -0500 Subject: [PATCH 2/7] feat: implement lifter control script with state management --- scripts/lifter_state.json | 4 ++ scripts/move_to_height_no_reset.py | 84 ++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 scripts/lifter_state.json create mode 100644 scripts/move_to_height_no_reset.py diff --git a/scripts/lifter_state.json b/scripts/lifter_state.json new file mode 100644 index 0000000..0d71c91 --- /dev/null +++ b/scripts/lifter_state.json @@ -0,0 +1,4 @@ +{ + "last_position": 24.0, + "total_up_time": 26.0 +} \ No newline at end of file diff --git a/scripts/move_to_height_no_reset.py b/scripts/move_to_height_no_reset.py new file mode 100644 index 0000000..8c6a501 --- /dev/null +++ b/scripts/move_to_height_no_reset.py @@ -0,0 +1,84 @@ +import time +import json +import os +import RPi.GPIO as GPIO + +# Calibration data +LOWEST_HEIGHT = 23.7 # inches +HIGHEST_HEIGHT = 54.5 # inches +UP_RATE = 0.54 # inches per second +DOWN_RATE = 0.55 # inches per second + +STATE_FILE = "lifter_state.json" + +UP_PIN = 17 # BCM numbering, physical pin 11 +DOWN_PIN = 27 # BCM numbering, physical pin 13 + +GPIO.setmode(GPIO.BCM) + +def release_up(): + GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +def press_up(): + GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) + +def release_down(): + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +def press_down(): + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + +def load_state(): + if os.path.exists(STATE_FILE): + with open(STATE_FILE, 'r') as f: + return json.load(f) + return { + "total_up_time": 0.0, + "last_position": None + } + +def save_state(state): + with open(STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + +def move_to_height(target_height, current_height): + state = load_state() + if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): + raise ValueError(f"Target height {target_height}'' is out of range.") + delta = target_height - current_height + if abs(delta) < 0.01: + print(f"Already at {target_height}'' (within tolerance). No movement needed.") + return + if delta > 0: + # Move up + up_time = delta / UP_RATE + print(f"Moving UP for {up_time:.1f} seconds to reach {target_height}''...") + release_up() + input("Press Enter to move UP...") + press_up() + time.sleep(up_time) + release_up() + state["total_up_time"] += up_time + else: + # Move down + down_time = abs(delta) / DOWN_RATE + print(f"Moving DOWN for {down_time:.1f} seconds to reach {target_height}''...") + release_down() + input("Press Enter to move DOWN...") + press_down() + time.sleep(down_time) + release_down() + # Do not update total_up_time for down movement + state["last_position"] = target_height + save_state(state) + print(f"Arrived at {target_height}'' (approximate). State saved.") + +if __name__ == "__main__": + try: + current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + move_to_height(target, current) + finally: + release_up() + release_down() + GPIO.cleanup() From a533d0a036698fe080a966ed5ecacce15da3a304 Mon Sep 17 00:00:00 2001 From: sgbaird Date: Sat, 15 Nov 2025 00:14:10 -0500 Subject: [PATCH 3/7] Tested with and without prefect, using state tracking. Prefect cloud working OK. Still assumes developer permissions. --- CHANGELOG.md | 3 + README.md | 6 + docs/raspberry-pi-setup.md | 114 ++++++++++++++ lifter_state.json | 6 + requirements.txt | 3 + scripts/desk_control_prefect.py | 231 +++++++++++++++++++++++++++++ scripts/lifter_state.json | 4 +- scripts/move_to_height_no_reset.py | 12 +- 8 files changed, 374 insertions(+), 5 deletions(-) create mode 100644 docs/raspberry-pi-setup.md create mode 100644 lifter_state.json create mode 100644 requirements.txt create mode 100644 scripts/desk_control_prefect.py diff --git a/CHANGELOG.md b/CHANGELOG.md index bdae0c8..6cdf280 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,3 +11,6 @@ - Docs: Added prices to `docs/bill_of_materials.md` (prices as of 11/6/2025) — 2025-11-14 - Build: Moved publishing to GitHub Actions trusted publisher workflow and aligned tooling docs — 2025-11-14 +- Fix: Added Raspberry Pi 5 GPIO compatibility using rpi-lgpio library — 2025-11-14 +- Docs: Added `docs/raspberry-pi-setup.md` with Pi 5 setup instructions — 2025-11-14 +- Fix: Updated requirements.txt to use rpi-lgpio instead of RPi.GPIO for Pi 5 compatibility — 2025-11-14 diff --git a/README.md b/README.md index 54a4530..b956d64 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,12 @@ A longer description of your project goes here... +## Setup + +For Raspberry Pi 5 setup instructions, see [docs/raspberry-pi-setup.md](docs/raspberry-pi-setup.md). + +For bill of materials, see [docs/bill_of_materials.md](docs/bill_of_materials.md). + diff --git a/docs/raspberry-pi-setup.md b/docs/raspberry-pi-setup.md new file mode 100644 index 0000000..53c2c15 --- /dev/null +++ b/docs/raspberry-pi-setup.md @@ -0,0 +1,114 @@ +# Raspberry Pi 5 GPIO Setup for Desk Lifter Control + +This document describes the setup required to run the desk lifter control scripts on a Raspberry Pi 5 running Debian Trixie. + +## Hardware Requirements + +- Raspberry Pi 5 (with BCM2712 SoC) +- GPIO pins connected to the desk lifter motor controller: + - UP_PIN: GPIO 17 (physical pin 11) + - DOWN_PIN: GPIO 27 (physical pin 13) +- Power supply: 5V USB-C (at least 3A, preferably 5A for high-power peripherals) + +## Software Requirements + +- Debian Trixie (13.x) +- Python 3.11 or later +- Virtual environment (`venv`) + +## GPIO Library Compatibility + +The standard `RPi.GPIO` library does not support Raspberry Pi 5 due to changes in the BCM2712 SoC. Instead, use `rpi-lgpio`, a drop-in replacement that provides the same API but uses the `lgpio` library for GPIO access. + +## Installation Steps + +1. **Clone the repository** (if not already done): + ```bash + git clone https://github.com/AccelerationConsortium/progressive-automations-python.git + cd progressive-automations-python + ``` + +2. **Create and activate a virtual environment**: + ```bash + python3 -m venv venv + source venv/bin/activate + ``` + +3. **Install dependencies**: + ```bash + pip install -r requirements.txt + ``` + +4. **Remove incompatible RPi.GPIO package** (if installed): + ```bash + sudo apt remove -y python3-rpi.gpioRP + ``` + +5. **Install rpi-lgpio in the virtual environment**: + ```bash + source venv/bin/activate + pip install rpi-lgpio + ``` + +6. **Ensure user is in the gpio group** (for GPIO access without sudo): + ```bash + sudo usermod -a -G gpio $USER + ``` + Reboot after adding the user to the group. + +## Running the Scripts + +Activate the virtual environment and run the scripts from the `scripts/` directory: + +```bash +source venv/bin/activate +cd scripts +python move_to_height_no_reset.py +``` + +Available scripts: +- `move_to_height_no_reset.py`: Move the desk to a target height +- `test_up.py`: Test upward movement +- `test_down.py`: Test downward movement +- `reset_to_lowest.py`: Reset to lowest position + +## Calibration + +The scripts use calibration data: +- Lowest height: 23.7 inches +- Highest height: 54.5 inches +- Up rate: 0.54 inches/second +- Down rate: 0.55 inches/second + +Adjust these values in the script if your setup differs. + +State is saved in `lifter_state.json` in the scripts directory. + +## Troubleshooting + +### RuntimeError: Cannot determine SOC peripheral base address + +This error occurs when using the old `RPi.GPIO` library on Raspberry Pi 5. Ensure you have installed `rpi-lgpio` and removed `python3-rpi.gpio`. + +### Permission denied on GPIO access + +- Ensure your user is in the `gpio` group: `groups $USER` should include `gpio`. +- If not, run `sudo usermod -a -G gpio $USER` and reboot. + +### Script runs but motor doesn't move + +- Check GPIO pin connections. +- Verify the motor controller is powered and connected correctly. +- Test with `test_up.py` and `test_down.py` to isolate issues. + +### Virtual environment issues + +- Always activate the venv before running scripts: `source venv/bin/activate` +- If pip installs fail with "externally-managed-environment", you're trying to install system-wide. Use the venv. + +## Notes + +- The scripts use BCM pin numbering. +- GPIO access requires root permissions or membership in the `gpio` group. +- On Raspberry Pi 5, USB peripherals may be limited to 600mA if using a 3A power supply. Use a 5A supply for high-power devices. +- This setup is tested on Debian Trixie with Raspberry Pi 5. \ No newline at end of file diff --git a/lifter_state.json b/lifter_state.json new file mode 100644 index 0000000..cc5dbf6 --- /dev/null +++ b/lifter_state.json @@ -0,0 +1,6 @@ +{ + "total_up_time": 32.40740740740741, + "last_position": 23.7, + "last_reset_time": "2025-11-14T23:55:54.119832", + "current_window_usage": 7.340067340067339 +} \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..954f31a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +# Core dependencies for desk lifter control +prefect>=2.0.0 +rpi-lgpio \ No newline at end of file diff --git a/scripts/desk_control_prefect.py b/scripts/desk_control_prefect.py new file mode 100644 index 0000000..6a41280 --- /dev/null +++ b/scripts/desk_control_prefect.py @@ -0,0 +1,231 @@ +import time +import json +import os +from datetime import datetime, timedelta +from prefect import flow, task +import RPi.GPIO as GPIO + +# Calibration data +LOWEST_HEIGHT = 23.7 # inches +HIGHEST_HEIGHT = 54.5 # inches +UP_RATE = 0.54 # inches per second +DOWN_RATE = 0.55 # inches per second + +STATE_FILE = "lifter_state.json" +MAX_USAGE_TIME = 120 # 2 minutes in seconds +RESET_WINDOW = 1200 # 20 minutes in seconds + +UP_PIN = 17 # BCM numbering, physical pin 11 +DOWN_PIN = 27 # BCM numbering, physical pin 13 + +GPIO.setmode(GPIO.BCM) + +@task +def setup_gpio(): + """Initialize GPIO settings""" + GPIO.setmode(GPIO.BCM) + +@task +def release_up(): + """Set UP pin to high-impedance state""" + GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +@task +def press_up(): + """Set UP pin to drive low (button pressed)""" + GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) + +@task +def release_down(): + """Set DOWN pin to high-impedance state""" + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +@task +def press_down(): + """Set DOWN pin to drive low (button pressed)""" + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + +@task +def cleanup_gpio(): + """Clean up GPIO resources""" + release_up() + release_down() + GPIO.cleanup() + +@task +def load_state(): + """Load the current state from JSON file""" + if os.path.exists(STATE_FILE): + with open(STATE_FILE, 'r') as f: + return json.load(f) + return { + "total_up_time": 0.0, + "last_position": None, + "last_reset_time": datetime.now().isoformat(), + "current_window_usage": 0.0 + } + +@task +def save_state(state): + """Save state to JSON file""" + with open(STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + +@task +def check_timing_limits(state, required_time): + """Check if the movement is within timing limits""" + current_time = datetime.now() + last_reset = datetime.fromisoformat(state["last_reset_time"]) + + # Check if 20-minute window has passed + if (current_time - last_reset).total_seconds() >= RESET_WINDOW: + # Reset the usage window + state["last_reset_time"] = current_time.isoformat() + state["current_window_usage"] = 0.0 + return True, state + + # Check if adding this movement would exceed the 2-minute limit + if state["current_window_usage"] + required_time > MAX_USAGE_TIME: + remaining_time = MAX_USAGE_TIME - state["current_window_usage"] + raise ValueError(f"Movement would exceed 2-minute limit. Remaining time: {remaining_time:.1f}s") + + return True, state + +@task +def move_up(up_time): + """Execute upward movement for specified time""" + print(f"Moving UP for {up_time:.1f} seconds...") + release_up() + # Removed input() for automated execution + press_up() + time.sleep(up_time) + release_up() + +@task +def move_down(down_time): + """Execute downward movement for specified time""" + print(f"Moving DOWN for {down_time:.1f} seconds...") + release_down() + # Removed input() for automated execution + press_down() + time.sleep(down_time) + release_down() + +#@flow +@task +def move_to_height_flow(target_height: float, current_height: float): + """Main flow to move desk to target height with safety checks""" + + # Validate height range + if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): + raise ValueError(f"Target height {target_height}'' is out of range.") + + # Setup GPIO + setup_gpio() + + try: + # Load current state + state = load_state() + + # Calculate movement requirements + delta = target_height - current_height + if abs(delta) < 0.01: + print(f"Already at {target_height}'' (within tolerance). No movement needed.") + return + + if delta > 0: + # Moving up + up_time = delta / UP_RATE + + # Check timing limits + check_timing_limits(state, up_time) + + # Execute movement + move_up(up_time) + + # Update state + state["total_up_time"] += up_time + state["current_window_usage"] += up_time + else: + # Moving down + down_time = abs(delta) / DOWN_RATE + + # Check timing limits + check_timing_limits(state, down_time) + + # Execute movement + move_down(down_time) + + # Update state (down time counts toward window usage but not total_up_time) + state["current_window_usage"] += down_time + + # Update position and save state + state["last_position"] = target_height + save_state(state) + + print(f"Arrived at {target_height}'' (approximate). State saved.") + print(f"Window usage: {state['current_window_usage']:.1f}s / {MAX_USAGE_TIME}s") + print(f"Total up time: {state['total_up_time']:.1f}s") + + finally: + # Always clean up GPIO + cleanup_gpio() + +@flow +def custom_test_sequence(): + """Custom flow: Start at lowest, move up 0.5 inches, rest 10 seconds, move down 0.5 inches""" + start_height = LOWEST_HEIGHT + up_target = start_height + 0.5 + + print("Starting custom test sequence...") + print(f"Starting at: {start_height}\"") + print(f"Will move to: {up_target}\"") + print(f"Then rest for 10 seconds") + print(f"Then return to: {start_height}\"") + + # Phase 1: Move up 0.5 inches + print("\n--- Phase 1: Moving UP 0.5 inches ---") + move_to_height_flow(up_target, start_height) + + # Phase 2: Rest for 10 seconds + print("\n--- Phase 2: Resting for 10 seconds ---") + time.sleep(10) + print("Rest complete.") + + # Phase 3: Move down 0.5 inches (back to lowest) + print("\n--- Phase 3: Moving DOWN 0.5 inches ---") + move_to_height_flow(start_height, up_target) + + print("\nCustom test sequence complete!") + +@flow +def desk_control_cli(): + """CLI interface for desk control""" + try: + current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + move_to_height_flow(target, current) + except ValueError as e: + print(f"Error: {e}") + except KeyboardInterrupt: + print("\nOperation cancelled.") + +if __name__ == "__main__": + import sys + from prefect import flow + + if len(sys.argv) > 1 and sys.argv[1] == "deploy": + # Deploy the custom test sequence to run at scheduled times + custom_test_sequence.from_source( + source=".", + entrypoint="scripts/desk_control_prefect.py:custom_test_sequence", + ).deploy( + name="desk-lifter-test-sequence-1139pm-toronto", + work_pool_name="default-agent-pool", + #cron="39 4 * * *", # Run daily at 11:39 PM Toronto time (4:39 AM UTC) + ) + print("Deployment created! Run 'prefect worker start --pool default-agent-pool' to execute scheduled flows.") + elif len(sys.argv) > 1 and sys.argv[1] == "test": + custom_test_sequence() + else: + desk_control_cli() \ No newline at end of file diff --git a/scripts/lifter_state.json b/scripts/lifter_state.json index 0d71c91..e1e0664 100644 --- a/scripts/lifter_state.json +++ b/scripts/lifter_state.json @@ -1,4 +1,4 @@ { - "last_position": 24.0, - "total_up_time": 26.0 + "last_position": 26.0, + "total_up_time": 29.703703703703702 } \ No newline at end of file diff --git a/scripts/move_to_height_no_reset.py b/scripts/move_to_height_no_reset.py index 8c6a501..f2315ad 100644 --- a/scripts/move_to_height_no_reset.py +++ b/scripts/move_to_height_no_reset.py @@ -41,8 +41,12 @@ def save_state(state): with open(STATE_FILE, 'w') as f: json.dump(state, f, indent=2) -def move_to_height(target_height, current_height): +def move_to_height(target_height, current_height=None): state = load_state() + if current_height is None: + if state["last_position"] is None: + raise ValueError("Current height unknown. Please provide current height.") + current_height = state["last_position"] if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): raise ValueError(f"Target height {target_height}'' is out of range.") delta = target_height - current_height @@ -75,9 +79,11 @@ def move_to_height(target_height, current_height): if __name__ == "__main__": try: - current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + # current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + state = load_state() + print(state) target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) - move_to_height(target, current) + move_to_height(target) finally: release_up() release_down() From 73e21d3a587addb6cc3ae51a7367bb5f7e81d47a Mon Sep 17 00:00:00 2001 From: sgbaird Date: Tue, 18 Nov 2025 13:35:47 -0500 Subject: [PATCH 4/7] feat: refactor GPIO pin management and implement duty cycle tracking for desk lifter control --- CHANGELOG.md | 1 + docs/raspberry-pi-setup.md | 16 +-- lifter_state.json | 6 - scripts/constants.py | 4 + scripts/desk_control_prefect.py | 159 +++++++++++++++------ scripts/desk_controller.py | 215 +++++++++++++++++++++++++++++ scripts/duty_cycle.py | 124 +++++++++++++++++ scripts/move_to_height.py | 4 +- scripts/move_to_height_no_reset.py | 4 +- scripts/movement_control.py | 107 ++++++++++++++ scripts/prefect_flows.py | 159 +++++++++++++++++++++ scripts/reset_to_lowest.py | 3 +- scripts/test_down.py | 3 +- scripts/test_up.py | 3 +- 14 files changed, 734 insertions(+), 74 deletions(-) delete mode 100644 lifter_state.json create mode 100644 scripts/constants.py create mode 100644 scripts/desk_controller.py create mode 100644 scripts/duty_cycle.py create mode 100644 scripts/movement_control.py create mode 100644 scripts/prefect_flows.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 6cdf280..377ee07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - FIX: nasty bug #1729 fixed - add your changes here! +- Refactor: Centralized GPIO pin constants in `scripts/constants.py` for maintainability — 2025-11-14 - Docs: Added `docs/bill_of_materials.md` (Bill of Materials) — 2025-11-14 - Docs: Added official product links to `docs/bill_of_materials.md` — 2025-11-14 - Docs: Added prices to `docs/bill_of_materials.md` (prices as of 11/6/2025) — 2025-11-14 diff --git a/docs/raspberry-pi-setup.md b/docs/raspberry-pi-setup.md index 53c2c15..fa2c708 100644 --- a/docs/raspberry-pi-setup.md +++ b/docs/raspberry-pi-setup.md @@ -6,8 +6,8 @@ This document describes the setup required to run the desk lifter control script - Raspberry Pi 5 (with BCM2712 SoC) - GPIO pins connected to the desk lifter motor controller: - - UP_PIN: GPIO 17 (physical pin 11) - - DOWN_PIN: GPIO 27 (physical pin 13) + - UP_PIN: GPIO 18 (physical pin 12) + - DOWN_PIN: GPIO 17 (physical pin 11) - Power supply: 5V USB-C (at least 3A, preferably 5A for high-power peripherals) ## Software Requirements @@ -41,16 +41,10 @@ The standard `RPi.GPIO` library does not support Raspberry Pi 5 due to changes i 4. **Remove incompatible RPi.GPIO package** (if installed): ```bash - sudo apt remove -y python3-rpi.gpioRP + sudo apt remove -y python3-rpi.gpio ``` -5. **Install rpi-lgpio in the virtual environment**: - ```bash - source venv/bin/activate - pip install rpi-lgpio - ``` - -6. **Ensure user is in the gpio group** (for GPIO access without sudo): +5. **Ensure user is in the gpio group** (for GPIO access without sudo): ```bash sudo usermod -a -G gpio $USER ``` @@ -68,6 +62,8 @@ python move_to_height_no_reset.py Available scripts: - `move_to_height_no_reset.py`: Move the desk to a target height +- `move_to_height.py`: Alternative height control script +- `desk_control_prefect.py`: Prefect-based workflow orchestration - `test_up.py`: Test upward movement - `test_down.py`: Test downward movement - `reset_to_lowest.py`: Reset to lowest position diff --git a/lifter_state.json b/lifter_state.json deleted file mode 100644 index cc5dbf6..0000000 --- a/lifter_state.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "total_up_time": 32.40740740740741, - "last_position": 23.7, - "last_reset_time": "2025-11-14T23:55:54.119832", - "current_window_usage": 7.340067340067339 -} \ No newline at end of file diff --git a/scripts/constants.py b/scripts/constants.py new file mode 100644 index 0000000..c80c6a5 --- /dev/null +++ b/scripts/constants.py @@ -0,0 +1,4 @@ +# GPIO pin constants for desk lifter control +# BCM numbering +UP_PIN = 18 # Physical pin 12 +DOWN_PIN = 17 # Physical pin 11 \ No newline at end of file diff --git a/scripts/desk_control_prefect.py b/scripts/desk_control_prefect.py index 6a41280..fe4d35b 100644 --- a/scripts/desk_control_prefect.py +++ b/scripts/desk_control_prefect.py @@ -3,7 +3,9 @@ import os from datetime import datetime, timedelta from prefect import flow, task +from prefect.logging import get_run_logger import RPi.GPIO as GPIO +from constants import UP_PIN, DOWN_PIN # Calibration data LOWEST_HEIGHT = 23.7 # inches @@ -12,11 +14,10 @@ DOWN_RATE = 0.55 # inches per second STATE_FILE = "lifter_state.json" -MAX_USAGE_TIME = 120 # 2 minutes in seconds -RESET_WINDOW = 1200 # 20 minutes in seconds - -UP_PIN = 17 # BCM numbering, physical pin 11 -DOWN_PIN = 27 # BCM numbering, physical pin 13 +DUTY_CYCLE_PERIOD = 1200 # 20 minutes in seconds +DUTY_CYCLE_MAX_ON_TIME = 120 # 2 minutes in seconds (10% of 20 minutes) +DUTY_CYCLE_PERCENTAGE = 0.10 # 10% duty cycle +MAX_CONTINUOUS_RUNTIME = 30 # Maximum continuous movement time in seconds GPIO.setmode(GPIO.BCM) @@ -61,8 +62,7 @@ def load_state(): return { "total_up_time": 0.0, "last_position": None, - "last_reset_time": datetime.now().isoformat(), - "current_window_usage": 0.0 + "usage_periods": [] # List of [start_timestamp, end_timestamp, duration] entries } @task @@ -71,50 +71,110 @@ def save_state(state): with open(STATE_FILE, 'w') as f: json.dump(state, f, indent=2) +@task +def clean_old_usage_periods(state): + """Remove usage periods older than the duty cycle period""" + current_time = time.time() + cutoff_time = current_time - DUTY_CYCLE_PERIOD + + # Keep only periods that end after the cutoff time + state["usage_periods"] = [ + period for period in state["usage_periods"] + if period[1] > cutoff_time # period[1] is end_timestamp + ] + return state + +@task +def get_current_duty_cycle_usage(state): + """Calculate current duty cycle usage in the sliding window""" + clean_old_usage_periods(state) + current_time = time.time() + + total_usage = 0.0 + for start_time, end_time, duration in state["usage_periods"]: + # Only count usage that's within the duty cycle period + window_start = current_time - DUTY_CYCLE_PERIOD + + # Adjust start and end times to the current window + effective_start = max(start_time, window_start) + effective_end = min(end_time, current_time) + + if effective_end > effective_start: + total_usage += effective_end - effective_start + + return total_usage + +@task +def get_remaining_duty_time(state): + """Get remaining duty cycle time in seconds""" + current_usage = get_current_duty_cycle_usage(state) + return max(0, DUTY_CYCLE_MAX_ON_TIME - current_usage) + +@task +def record_usage_period(state, start_time, end_time, duration): + """Record a usage period in the duty cycle tracking""" + state["usage_periods"].append([start_time, end_time, duration]) + return state + @task def check_timing_limits(state, required_time): - """Check if the movement is within timing limits""" - current_time = datetime.now() - last_reset = datetime.fromisoformat(state["last_reset_time"]) - - # Check if 20-minute window has passed - if (current_time - last_reset).total_seconds() >= RESET_WINDOW: - # Reset the usage window - state["last_reset_time"] = current_time.isoformat() - state["current_window_usage"] = 0.0 - return True, state - - # Check if adding this movement would exceed the 2-minute limit - if state["current_window_usage"] + required_time > MAX_USAGE_TIME: - remaining_time = MAX_USAGE_TIME - state["current_window_usage"] - raise ValueError(f"Movement would exceed 2-minute limit. Remaining time: {remaining_time:.1f}s") + """Check if the movement is within duty cycle limits using sliding window""" + logger = get_run_logger() + + # Clean old periods and get current usage + state = clean_old_usage_periods(state) + current_usage = get_current_duty_cycle_usage(state) + # Check continuous runtime limit + if required_time > MAX_CONTINUOUS_RUNTIME: + raise ValueError(f"Movement duration {required_time:.1f}s exceeds maximum continuous runtime of {MAX_CONTINUOUS_RUNTIME}s") + + # Check if adding this movement would exceed the duty cycle limit + if current_usage + required_time > DUTY_CYCLE_MAX_ON_TIME: + remaining_time = DUTY_CYCLE_MAX_ON_TIME - current_usage + raise ValueError(f"Movement would exceed {DUTY_CYCLE_PERCENTAGE*100:.0f}% duty cycle limit. Current usage: {current_usage:.1f}s, Remaining: {remaining_time:.1f}s in {DUTY_CYCLE_PERIOD}s window") + + logger.info(f"Duty cycle OK: {current_usage:.1f}s + {required_time:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s ({DUTY_CYCLE_PERCENTAGE*100:.0f}% of {DUTY_CYCLE_PERIOD}s)") return True, state @task def move_up(up_time): - """Execute upward movement for specified time""" - print(f"Moving UP for {up_time:.1f} seconds...") + """Execute upward movement for specified time with duty cycle tracking""" + logger = get_run_logger() + logger.info(f"Moving UP for {up_time:.1f} seconds...") + release_up() - # Removed input() for automated execution + start_time = time.time() press_up() time.sleep(up_time) release_up() + end_time = time.time() + actual_duration = end_time - start_time + + logger.info(f"UP movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration @task def move_down(down_time): - """Execute downward movement for specified time""" - print(f"Moving DOWN for {down_time:.1f} seconds...") + """Execute downward movement for specified time with duty cycle tracking""" + logger = get_run_logger() + logger.info(f"Moving DOWN for {down_time:.1f} seconds...") + release_down() - # Removed input() for automated execution + start_time = time.time() press_down() time.sleep(down_time) release_down() + end_time = time.time() + actual_duration = end_time - start_time + + logger.info(f"DOWN movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration -#@flow -@task +@flow def move_to_height_flow(target_height: float, current_height: float): """Main flow to move desk to target height with safety checks""" + logger = get_run_logger() # Validate height range if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): @@ -130,7 +190,7 @@ def move_to_height_flow(target_height: float, current_height: float): # Calculate movement requirements delta = target_height - current_height if abs(delta) < 0.01: - print(f"Already at {target_height}'' (within tolerance). No movement needed.") + logger.info(f"Already at {target_height}'' (within tolerance). No movement needed.") return if delta > 0: @@ -138,34 +198,41 @@ def move_to_height_flow(target_height: float, current_height: float): up_time = delta / UP_RATE # Check timing limits - check_timing_limits(state, up_time) + is_valid, updated_state = check_timing_limits(state, up_time) + state = updated_state - # Execute movement - move_up(up_time) + # Execute movement and get actual timing + start_time, end_time, actual_duration = move_up(up_time) - # Update state - state["total_up_time"] += up_time - state["current_window_usage"] += up_time + # Record the usage period and update state + state = record_usage_period(state, start_time, end_time, actual_duration) + state["total_up_time"] += actual_duration else: # Moving down down_time = abs(delta) / DOWN_RATE # Check timing limits - check_timing_limits(state, down_time) + is_valid, updated_state = check_timing_limits(state, down_time) + state = updated_state - # Execute movement - move_down(down_time) + # Execute movement and get actual timing + start_time, end_time, actual_duration = move_down(down_time) - # Update state (down time counts toward window usage but not total_up_time) - state["current_window_usage"] += down_time + # Record the usage period (down time counts toward duty cycle but not total_up_time) + state = record_usage_period(state, start_time, end_time, actual_duration) # Update position and save state state["last_position"] = target_height save_state(state) - print(f"Arrived at {target_height}'' (approximate). State saved.") - print(f"Window usage: {state['current_window_usage']:.1f}s / {MAX_USAGE_TIME}s") - print(f"Total up time: {state['total_up_time']:.1f}s") + # Get current duty cycle info for logging + current_usage = get_current_duty_cycle_usage(state) + remaining_time = get_remaining_duty_time(state) + + logger.info(f"Arrived at {target_height}'' (approximate). State saved.") + logger.info(f"Duty cycle usage: {current_usage:.1f}s / {DUTY_CYCLE_MAX_ON_TIME}s ({current_usage/DUTY_CYCLE_MAX_ON_TIME*100:.1f}%)") + logger.info(f"Remaining duty time: {remaining_time:.1f}s") + logger.info(f"Total up time: {state['total_up_time']:.1f}s") finally: # Always clean up GPIO @@ -222,7 +289,7 @@ def desk_control_cli(): ).deploy( name="desk-lifter-test-sequence-1139pm-toronto", work_pool_name="default-agent-pool", - #cron="39 4 * * *", # Run daily at 11:39 PM Toronto time (4:39 AM UTC) + cron="39 4 * * *", # Run daily at 11:39 PM Toronto time (4:39 AM UTC) ) print("Deployment created! Run 'prefect worker start --pool default-agent-pool' to execute scheduled flows.") elif len(sys.argv) > 1 and sys.argv[1] == "test": diff --git a/scripts/desk_controller.py b/scripts/desk_controller.py new file mode 100644 index 0000000..66beb43 --- /dev/null +++ b/scripts/desk_controller.py @@ -0,0 +1,215 @@ +""" +High-level desk controller with height management and safety checks. + +Combines movement control and duty cycle management to provide safe desk operations. +Handles height calculations, movement planning, and state management. +""" + +from typing import Optional +from duty_cycle import ( + load_state, save_state, check_duty_cycle_limits, + record_usage_period, get_duty_cycle_status +) +from movement_control import setup_gpio, cleanup_gpio, move_up, move_down + +# Calibration data +LOWEST_HEIGHT = 23.7 # inches +HIGHEST_HEIGHT = 54.5 # inches +UP_RATE = 0.54 # inches per second +DOWN_RATE = 0.55 # inches per second + + +def move_to_height(target_height: float, current_height: Optional[float] = None) -> dict: + """ + Move desk to target height with safety checks and duty cycle enforcement + + Args: + target_height: Desired height in inches + current_height: Current height in inches (if None, uses last known position) + + Returns: + dict with movement results and status information + """ + # Validate height range + if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): + raise ValueError(f"Target height {target_height}'' is out of range [{LOWEST_HEIGHT}-{HIGHEST_HEIGHT}].") + + # Setup GPIO + setup_gpio() + + try: + # Load current state + state = load_state() + + # Determine current height + if current_height is None: + if state["last_position"] is None: + raise ValueError("No current height provided and no last known position in state file.") + current_height = state["last_position"] + + # Calculate movement requirements + delta = target_height - current_height + if abs(delta) < 0.01: + print(f"Already at {target_height}'' (within tolerance). No movement needed.") + return { + "success": True, + "movement": "none", + "message": f"Already at target height {target_height}''", + "duty_cycle": get_duty_cycle_status(state) + } + + if delta > 0: + # Moving up + required_time = delta / UP_RATE + direction = "up" + + # Check duty cycle limits + is_valid, updated_state, message = check_duty_cycle_limits(state, required_time) + state = updated_state + + if not is_valid: + raise ValueError(message) + + print(message) + + # Execute movement and get actual timing + start_time, end_time, actual_duration = move_up(required_time) + + # Record the usage period and update state + state = record_usage_period(state, start_time, end_time, actual_duration) + state["total_up_time"] += actual_duration + else: + # Moving down + required_time = abs(delta) / DOWN_RATE + direction = "down" + + # Check duty cycle limits + is_valid, updated_state, message = check_duty_cycle_limits(state, required_time) + state = updated_state + + if not is_valid: + raise ValueError(message) + + print(message) + + # Execute movement and get actual timing + start_time, end_time, actual_duration = move_down(required_time) + + # Record the usage period (down time counts toward duty cycle but not total_up_time) + state = record_usage_period(state, start_time, end_time, actual_duration) + + # Update position and save state + state["last_position"] = target_height + save_state(state) + + # Get final duty cycle info + duty_status = get_duty_cycle_status(state) + + print(f"Arrived at {target_height}'' (approximate). State saved.") + print(f"Duty cycle usage: {duty_status['current_usage']:.1f}s / {duty_status['max_usage']}s ({duty_status['percentage_used']:.1f}%)") + print(f"Remaining duty time: {duty_status['remaining_time']:.1f}s") + print(f"Total up time: {state['total_up_time']:.1f}s") + + return { + "success": True, + "movement": direction, + "start_height": current_height, + "end_height": target_height, + "distance": abs(delta), + "duration": actual_duration, + "duty_cycle": duty_status, + "total_up_time": state["total_up_time"] + } + + except Exception as e: + print(f"Error during movement: {e}") + return { + "success": False, + "error": str(e), + "duty_cycle": get_duty_cycle_status(load_state()) + } + + finally: + # Always clean up GPIO + cleanup_gpio() + + +def test_sequence(movement_distance: float = 0.5, rest_time: float = 10.0) -> dict: + """ + Execute a test sequence: move up, rest, move down + + Args: + movement_distance: Distance to move in inches + rest_time: Time to rest between movements in seconds + + Returns: + dict with test results + """ + start_height = LOWEST_HEIGHT + up_target = start_height + movement_distance + + print("Starting test sequence...") + print(f"Starting at: {start_height}\"") + print(f"Will move to: {up_target}\"") + print(f"Then rest for {rest_time} seconds") + print(f"Then return to: {start_height}\"") + + results = [] + + # Phase 1: Move up + print(f"\n--- Phase 1: Moving UP {movement_distance} inches ---") + result1 = move_to_height(up_target, start_height) + results.append(result1) + + if not result1["success"]: + return {"success": False, "phase": 1, "error": result1["error"]} + + # Phase 2: Rest + print(f"\n--- Phase 2: Resting for {rest_time} seconds ---") + import time + time.sleep(rest_time) + print("Rest complete.") + + # Phase 3: Move down + print(f"\n--- Phase 3: Moving DOWN {movement_distance} inches ---") + result2 = move_to_height(start_height, up_target) + results.append(result2) + + if not result2["success"]: + return {"success": False, "phase": 3, "error": result2["error"]} + + print("\nTest sequence complete!") + + return { + "success": True, + "results": results, + "total_duration": sum(r.get("duration", 0) for r in results if r["success"]), + "final_duty_cycle": results[-1]["duty_cycle"] if results else None + } + + +def cli_interface(): + """Command-line interface for desk control""" + try: + current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + result = move_to_height(target, current) + + if result["success"]: + print("Movement completed successfully!") + else: + print(f"Movement failed: {result['error']}") + + except ValueError as e: + print(f"Error: {e}") + except KeyboardInterrupt: + print("\nOperation cancelled.") + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1 and sys.argv[1] == "test": + test_sequence() + else: + cli_interface() \ No newline at end of file diff --git a/scripts/duty_cycle.py b/scripts/duty_cycle.py new file mode 100644 index 0000000..9f7060f --- /dev/null +++ b/scripts/duty_cycle.py @@ -0,0 +1,124 @@ +""" +Duty cycle management for motor protection. + +Implements a 10% duty cycle (2 minutes on, 18 minutes off) using a sliding window approach. +Tracks individual usage periods and enforces both continuous runtime and total usage limits. +""" + +import time +import json +import os +from datetime import datetime +from typing import List, Tuple, Dict, Any + +# Duty cycle constants +DUTY_CYCLE_PERIOD = 1200 # 20 minutes in seconds +DUTY_CYCLE_MAX_ON_TIME = 120 # 2 minutes in seconds (10% of 20 minutes) +DUTY_CYCLE_PERCENTAGE = 0.10 # 10% duty cycle +MAX_CONTINUOUS_RUNTIME = 30 # Maximum continuous movement time in seconds + +STATE_FILE = "lifter_state.json" + + +def load_state() -> Dict[str, Any]: + """Load the current state from JSON file""" + if os.path.exists(STATE_FILE): + with open(STATE_FILE, 'r') as f: + return json.load(f) + return { + "total_up_time": 0.0, + "last_position": None, + "usage_periods": [] # List of [start_timestamp, end_timestamp, duration] entries + } + + +def save_state(state: Dict[str, Any]) -> None: + """Save state to JSON file""" + with open(STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + + +def clean_old_usage_periods(state: Dict[str, Any]) -> Dict[str, Any]: + """Remove usage periods older than the duty cycle period""" + current_time = time.time() + cutoff_time = current_time - DUTY_CYCLE_PERIOD + + # Keep only periods that end after the cutoff time + state["usage_periods"] = [ + period for period in state["usage_periods"] + if period[1] > cutoff_time # period[1] is end_timestamp + ] + return state + + +def get_current_duty_cycle_usage(state: Dict[str, Any]) -> float: + """Calculate current duty cycle usage in the sliding window""" + clean_old_usage_periods(state) + current_time = time.time() + + total_usage = 0.0 + for start_time, end_time, duration in state["usage_periods"]: + # Only count usage that's within the duty cycle period + window_start = current_time - DUTY_CYCLE_PERIOD + + # Adjust start and end times to the current window + effective_start = max(start_time, window_start) + effective_end = min(end_time, current_time) + + if effective_end > effective_start: + total_usage += effective_end - effective_start + + return total_usage + + +def get_remaining_duty_time(state: Dict[str, Any]) -> float: + """Get remaining duty cycle time in seconds""" + current_usage = get_current_duty_cycle_usage(state) + return max(0, DUTY_CYCLE_MAX_ON_TIME - current_usage) + + +def record_usage_period(state: Dict[str, Any], start_time: float, end_time: float, duration: float) -> Dict[str, Any]: + """Record a usage period in the duty cycle tracking""" + state["usage_periods"].append([start_time, end_time, duration]) + return state + + +def check_duty_cycle_limits(state: Dict[str, Any], required_time: float) -> Tuple[bool, Dict[str, Any], str]: + """ + Check if the movement is within duty cycle limits using sliding window + + Returns: + (is_valid, updated_state, info_message) + """ + # Clean old periods and get current usage + state = clean_old_usage_periods(state) + current_usage = get_current_duty_cycle_usage(state) + + # Check continuous runtime limit + if required_time > MAX_CONTINUOUS_RUNTIME: + error_msg = f"Movement duration {required_time:.1f}s exceeds maximum continuous runtime of {MAX_CONTINUOUS_RUNTIME}s" + return False, state, error_msg + + # Check if adding this movement would exceed the duty cycle limit + if current_usage + required_time > DUTY_CYCLE_MAX_ON_TIME: + remaining_time = DUTY_CYCLE_MAX_ON_TIME - current_usage + error_msg = f"Movement would exceed {DUTY_CYCLE_PERCENTAGE*100:.0f}% duty cycle limit. Current usage: {current_usage:.1f}s, Remaining: {remaining_time:.1f}s in {DUTY_CYCLE_PERIOD}s window" + return False, state, error_msg + + info_msg = f"Duty cycle OK: {current_usage:.1f}s + {required_time:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s ({DUTY_CYCLE_PERCENTAGE*100:.0f}% of {DUTY_CYCLE_PERIOD}s)" + return True, state, info_msg + + +def get_duty_cycle_status(state: Dict[str, Any]) -> Dict[str, float]: + """Get current duty cycle status information""" + current_usage = get_current_duty_cycle_usage(state) + remaining_time = get_remaining_duty_time(state) + percentage_used = current_usage / DUTY_CYCLE_MAX_ON_TIME * 100 + + return { + "current_usage": current_usage, + "max_usage": DUTY_CYCLE_MAX_ON_TIME, + "remaining_time": remaining_time, + "percentage_used": percentage_used, + "window_period": DUTY_CYCLE_PERIOD + } \ No newline at end of file diff --git a/scripts/move_to_height.py b/scripts/move_to_height.py index 46b986d..8e3b688 100644 --- a/scripts/move_to_height.py +++ b/scripts/move_to_height.py @@ -1,14 +1,12 @@ import time import RPi.GPIO as GPIO +from constants import UP_PIN, DOWN_PIN # Calibration data LOWEST_HEIGHT = 23.7 # inches HIGHEST_HEIGHT = 54.5 # inches UP_RATE = 0.54 # inches per second (from calibration) -UP_PIN = 17 # BCM numbering, physical pin 11 -DOWN_PIN = 27 # BCM numbering, physical pin 13 - GPIO.setmode(GPIO.BCM) def release_up(): diff --git a/scripts/move_to_height_no_reset.py b/scripts/move_to_height_no_reset.py index f2315ad..f752d12 100644 --- a/scripts/move_to_height_no_reset.py +++ b/scripts/move_to_height_no_reset.py @@ -2,6 +2,7 @@ import json import os import RPi.GPIO as GPIO +from constants import UP_PIN, DOWN_PIN # Calibration data LOWEST_HEIGHT = 23.7 # inches @@ -11,9 +12,6 @@ STATE_FILE = "lifter_state.json" -UP_PIN = 17 # BCM numbering, physical pin 11 -DOWN_PIN = 27 # BCM numbering, physical pin 13 - GPIO.setmode(GPIO.BCM) def release_up(): diff --git a/scripts/movement_control.py b/scripts/movement_control.py new file mode 100644 index 0000000..9029544 --- /dev/null +++ b/scripts/movement_control.py @@ -0,0 +1,107 @@ +""" +GPIO-based movement control for the desk lifter. + +Handles low-level GPIO operations, pin management, and movement execution. +Provides safe pin control with proper initialization and cleanup. +""" + +import time +from typing import Tuple +try: + import RPi.GPIO as GPIO +except ImportError: + # For testing without actual GPIO hardware + class MockGPIO: + BCM = "BCM" + OUT = "OUT" + IN = "IN" + LOW = 0 + HIGH = 1 + PUD_OFF = "PUD_OFF" + + @staticmethod + def setmode(mode): pass + @staticmethod + def setup(pin, mode, **kwargs): pass + @staticmethod + def cleanup(): pass + + GPIO = MockGPIO() + +# Pin assignments +UP_PIN = 17 # BCM numbering, physical pin 11 +DOWN_PIN = 27 # BCM numbering, physical pin 13 + + +def setup_gpio() -> None: + """Initialize GPIO settings""" + GPIO.setmode(GPIO.BCM) + + +def release_up() -> None: + """Set UP pin to high-impedance state""" + GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + + +def press_up() -> None: + """Set UP pin to drive low (button pressed)""" + GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) + + +def release_down() -> None: + """Set DOWN pin to high-impedance state""" + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + + +def press_down() -> None: + """Set DOWN pin to drive low (button pressed)""" + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + + +def cleanup_gpio() -> None: + """Clean up GPIO resources""" + release_up() + release_down() + GPIO.cleanup() + + +def move_up(up_time: float) -> Tuple[float, float, float]: + """ + Execute upward movement for specified time + + Returns: + (start_time, end_time, actual_duration) + """ + print(f"Moving UP for {up_time:.1f} seconds...") + + release_up() + start_time = time.time() + press_up() + time.sleep(up_time) + release_up() + end_time = time.time() + actual_duration = end_time - start_time + + print(f"UP movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration + + +def move_down(down_time: float) -> Tuple[float, float, float]: + """ + Execute downward movement for specified time + + Returns: + (start_time, end_time, actual_duration) + """ + print(f"Moving DOWN for {down_time:.1f} seconds...") + + release_down() + start_time = time.time() + press_down() + time.sleep(down_time) + release_down() + end_time = time.time() + actual_duration = end_time - start_time + + print(f"DOWN movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration \ No newline at end of file diff --git a/scripts/prefect_flows.py b/scripts/prefect_flows.py new file mode 100644 index 0000000..9043a71 --- /dev/null +++ b/scripts/prefect_flows.py @@ -0,0 +1,159 @@ +""" +Prefect flows and deployments for automated desk control. + +Provides scheduled automation and workflow orchestration using Prefect. +Integrates with the desk controller for safe, automated movements. +""" + +import time +from prefect import flow, task +from prefect.logging import get_run_logger + +# Import our modular components +from desk_controller import move_to_height, test_sequence, LOWEST_HEIGHT + + +@task +def log_info(message: str): + """Log information message""" + logger = get_run_logger() + logger.info(message) + print(message) + + +@task +def execute_movement(target_height: float, current_height: float = None): + """Execute a movement as a Prefect task""" + logger = get_run_logger() + + try: + result = move_to_height(target_height, current_height) + + if result["success"]: + logger.info(f"Movement successful: {result}") + return result + else: + logger.error(f"Movement failed: {result['error']}") + raise ValueError(result["error"]) + + except Exception as e: + logger.error(f"Movement execution failed: {e}") + raise + + +@task +def execute_test_sequence(movement_distance: float = 0.5, rest_time: float = 10.0): + """Execute test sequence as a Prefect task""" + logger = get_run_logger() + + try: + result = test_sequence(movement_distance, rest_time) + + if result["success"]: + logger.info(f"Test sequence successful: {result}") + return result + else: + logger.error(f"Test sequence failed: {result.get('error', 'Unknown error')}") + raise ValueError(result.get("error", "Test sequence failed")) + + except Exception as e: + logger.error(f"Test sequence execution failed: {e}") + raise + + +@flow +def move_to_height_flow(target_height: float, current_height: float = None): + """Prefect flow for moving to a specific height""" + log_info(f"Starting movement flow: target={target_height}, current={current_height}") + + result = execute_movement(target_height, current_height) + + log_info(f"Movement flow completed: {result}") + return result + + +@flow +def custom_test_sequence_flow(movement_distance: float = 0.5, rest_time: float = 10.0): + """Prefect flow for custom test sequence""" + log_info(f"Starting test sequence: distance={movement_distance}, rest={rest_time}") + + start_height = LOWEST_HEIGHT + up_target = start_height + movement_distance + + log_info(f"Test sequence plan:") + log_info(f" Starting at: {start_height}\"") + log_info(f" Will move to: {up_target}\"") + log_info(f" Then rest for {rest_time} seconds") + log_info(f" Then return to: {start_height}\"") + + # Phase 1: Move up + log_info(f"--- Phase 1: Moving UP {movement_distance} inches ---") + result1 = execute_movement(up_target, start_height) + + # Phase 2: Rest + log_info(f"--- Phase 2: Resting for {rest_time} seconds ---") + time.sleep(rest_time) + log_info("Rest complete.") + + # Phase 3: Move down + log_info(f"--- Phase 3: Moving DOWN {movement_distance} inches ---") + result2 = execute_movement(start_height, up_target) + + log_info("Custom test sequence complete!") + + return { + "success": True, + "phase1_result": result1, + "phase2_result": result2, + "total_duration": result1.get("duration", 0) + result2.get("duration", 0) + } + + +@flow +def desk_control_cli_flow(): + """Prefect flow for CLI-based desk control""" + log_info("Starting CLI flow") + + try: + from desk_controller import LOWEST_HEIGHT, HIGHEST_HEIGHT + + current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + + result = execute_movement(target, current) + log_info("CLI flow completed successfully!") + return result + + except ValueError as e: + log_info(f"Error: {e}") + raise + except KeyboardInterrupt: + log_info("Operation cancelled.") + raise + + +def deploy_test_sequence(schedule_cron: str = "39 4 * * *", deployment_name: str = "desk-lifter-test-sequence-1139pm-toronto"): + """Deploy the test sequence with scheduling""" + + custom_test_sequence_flow.from_source( + source=".", + entrypoint="scripts/prefect_flows.py:custom_test_sequence_flow", + ).deploy( + name=deployment_name, + work_pool_name="default-agent-pool", + cron=schedule_cron, # Default: Run daily at 11:39 PM Toronto time (4:39 AM UTC) + ) + + print(f"Deployment '{deployment_name}' created with schedule '{schedule_cron}'!") + print("Run 'prefect worker start --pool default-agent-pool' to execute scheduled flows.") + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1 and sys.argv[1] == "deploy": + deploy_test_sequence() + elif len(sys.argv) > 1 and sys.argv[1] == "test": + custom_test_sequence_flow() + else: + desk_control_cli_flow() \ No newline at end of file diff --git a/scripts/reset_to_lowest.py b/scripts/reset_to_lowest.py index 0882b7a..b7a4933 100644 --- a/scripts/reset_to_lowest.py +++ b/scripts/reset_to_lowest.py @@ -1,7 +1,6 @@ import time import RPi.GPIO as GPIO - -DOWN_PIN = 27 # BCM numbering, physical pin 13 +from constants import DOWN_PIN GPIO.setmode(GPIO.BCM) diff --git a/scripts/test_down.py b/scripts/test_down.py index c80765f..cf8f694 100644 --- a/scripts/test_down.py +++ b/scripts/test_down.py @@ -1,7 +1,6 @@ import time import RPi.GPIO as GPIO - -DOWN_PIN = 27 # BCM numbering, physical pin 13 +from constants import DOWN_PIN GPIO.setmode(GPIO.BCM) diff --git a/scripts/test_up.py b/scripts/test_up.py index 1078c4c..7f0ad1c 100644 --- a/scripts/test_up.py +++ b/scripts/test_up.py @@ -1,8 +1,7 @@ # test_up.py import time import RPi.GPIO as GPIO - -UP_PIN = 17 # BCM numbering, physical pin 11 +from constants import UP_PIN GPIO.setmode(GPIO.BCM) From f2eab55af9d79264b68372f6f877abbee321ea63 Mon Sep 17 00:00:00 2001 From: sgbaird Date: Tue, 18 Nov 2025 14:08:18 -0500 Subject: [PATCH 5/7] feat: update changelog, add README for desk control scripts, and refactor GPIO pin management --- CHANGELOG.md | 1 + scripts/README.md | 93 ++++++++++++ scripts/constants.py | 10 +- ...fect.py => desk_control_prefect_LEGACY.py} | 0 scripts/desk_controller.py | 13 +- scripts/duty_cycle.py | 33 +++-- scripts/main.py | 136 ++++++++++++++++++ scripts/move_to_height.py | 53 ------- scripts/move_to_height_no_reset.py | 88 ------------ scripts/movement_control.py | 9 +- scripts/prefect_flows.py | 19 ++- scripts/reset_to_lowest.py | 32 ----- scripts/test_down.py | 31 ---- scripts/test_up.py | 32 ----- 14 files changed, 292 insertions(+), 258 deletions(-) create mode 100644 scripts/README.md rename scripts/{desk_control_prefect.py => desk_control_prefect_LEGACY.py} (100%) create mode 100644 scripts/main.py delete mode 100644 scripts/move_to_height.py delete mode 100644 scripts/move_to_height_no_reset.py delete mode 100644 scripts/reset_to_lowest.py delete mode 100644 scripts/test_down.py delete mode 100644 scripts/test_up.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 377ee07..e22bc6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - FIX: nasty bug #1729 fixed - add your changes here! +- Hardware: Changed GPIO pin assignments - UP: GPIO 18, DOWN: GPIO 17 — 2025-11-18 - Refactor: Centralized GPIO pin constants in `scripts/constants.py` for maintainability — 2025-11-14 - Docs: Added `docs/bill_of_materials.md` (Bill of Materials) — 2025-11-14 - Docs: Added official product links to `docs/bill_of_materials.md` — 2025-11-14 diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000..0bbd1ce --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,93 @@ +# Desk Control Scripts + +This directory contains scripts for controlling a Progressive Automations desk lifter. + +## 🚀 Quick Start + +**For the modern modular system:** +```bash +# Navigate to the modular package +cd desk_control/ + +# Run interactive CLI +python main.py + +# Run test sequence +python main.py test + +# Check duty cycle status +python main.py status + +# Deploy Prefect automation +python main.py deploy +``` + +## 📁 Directory Structure + +``` +scripts/ +├── desk_control/ # 🆕 Modern modular system +│ ├── main.py # Main CLI interface +│ ├── desk_controller.py # High-level control logic +│ ├── movement_control.py # GPIO operations +│ ├── duty_cycle.py # Motor protection +│ ├── prefect_flows.py # Automation & scheduling +│ └── README.md # Detailed documentation +├── constants.py # Shared constants (pins, calibration) +├── lifter_state.json # Persistent state file +├── lifter_calibration.txt # Calibration data +└── desk_control_prefect_LEGACY.py # 📦 Original monolithic file (backup) +``` + +## ✨ What's New + +### **Modular Architecture** +The code has been refactored into focused, maintainable modules: + +- **`movement_control.py`** - GPIO pin control and movement execution +- **`duty_cycle.py`** - 10% duty cycle protection with sliding window +- **`desk_controller.py`** - Height management and safety checks +- **`prefect_flows.py`** - Workflow automation and scheduling +- **`main.py`** - Unified command-line interface + +### **Improved Duty Cycle** +- ✅ True sliding window (not hard resets every 20 minutes) +- ✅ Precise timestamp tracking of usage periods +- ✅ Automatic cleanup of old periods +- ✅ Real-time duty cycle status monitoring + +### **Better Safety** +- ✅ Comprehensive error handling +- ✅ GPIO cleanup in all scenarios +- ✅ Continuous runtime limits (30s max per movement) +- ✅ Height range validation + +## 📖 Usage Examples + +```bash +# Basic movement +python desk_control/main.py move 25.0 30.0 # Move from 25" to 30" + +# Test sequence with custom parameters +python desk_control/main.py test 1.0 5.0 # 1" movement, 5s rest + +# Duty cycle monitoring +python desk_control/main.py status + +# Prefect automation +python desk_control/main.py deploy "0 12 * * *" # Deploy for noon daily +``` + +## 🔧 Configuration + +Edit `constants.py` to adjust: +- GPIO pin assignments +- Calibration values (height range, movement rates) + +## 📚 Documentation + +See `desk_control/README.md` for detailed documentation of the modular system. + +## 🏛️ Legacy Code + +The original monolithic file is preserved as `desk_control_prefect_LEGACY.py` for reference, but the modular system in `desk_control/` should be used for all new development. \ No newline at end of file diff --git a/scripts/constants.py b/scripts/constants.py index c80c6a5..e6ac96b 100644 --- a/scripts/constants.py +++ b/scripts/constants.py @@ -1,4 +1,10 @@ # GPIO pin constants for desk lifter control # BCM numbering -UP_PIN = 18 # Physical pin 12 -DOWN_PIN = 17 # Physical pin 11 \ No newline at end of file +UP_PIN = 18 # BCM numbering, physical pin 12 +DOWN_PIN = 17 # BCM numbering, physical pin 11 + +# Calibration data +LOWEST_HEIGHT = 23.7 # inches +HIGHEST_HEIGHT = 54.5 # inches +UP_RATE = 0.54 # inches per second +DOWN_RATE = 0.55 # inches per second \ No newline at end of file diff --git a/scripts/desk_control_prefect.py b/scripts/desk_control_prefect_LEGACY.py similarity index 100% rename from scripts/desk_control_prefect.py rename to scripts/desk_control_prefect_LEGACY.py diff --git a/scripts/desk_controller.py b/scripts/desk_controller.py index 66beb43..255f707 100644 --- a/scripts/desk_controller.py +++ b/scripts/desk_controller.py @@ -12,11 +12,14 @@ ) from movement_control import setup_gpio, cleanup_gpio, move_up, move_down -# Calibration data -LOWEST_HEIGHT = 23.7 # inches -HIGHEST_HEIGHT = 54.5 # inches -UP_RATE = 0.54 # inches per second -DOWN_RATE = 0.55 # inches per second +try: + from constants import LOWEST_HEIGHT, HIGHEST_HEIGHT, UP_RATE, DOWN_RATE +except ImportError: + # Fallback values if constants not available + LOWEST_HEIGHT = 23.7 # inches + HIGHEST_HEIGHT = 54.5 # inches + UP_RATE = 0.54 # inches per second + DOWN_RATE = 0.55 # inches per second def move_to_height(target_height: float, current_height: Optional[float] = None) -> dict: diff --git a/scripts/duty_cycle.py b/scripts/duty_cycle.py index 9f7060f..5b45c4e 100644 --- a/scripts/duty_cycle.py +++ b/scripts/duty_cycle.py @@ -8,6 +8,7 @@ import time import json import os +from constants import HEIGHT_MIN from datetime import datetime from typing import List, Tuple, Dict, Any @@ -20,16 +21,28 @@ STATE_FILE = "lifter_state.json" -def load_state() -> Dict[str, Any]: - """Load the current state from JSON file""" - if os.path.exists(STATE_FILE): - with open(STATE_FILE, 'r') as f: - return json.load(f) - return { - "total_up_time": 0.0, - "last_position": None, - "usage_periods": [] # List of [start_timestamp, end_timestamp, duration] entries - } +def load_state(): + """Load the current state from file""" + try: + with open(STATE_FILE, "r") as f: + state = json.load(f) + + # Ensure all required keys exist with proper defaults + if "usage_periods" not in state: + state["usage_periods"] = [] + if "last_position" not in state: + state["last_position"] = HEIGHT_MIN # Default to minimum height + if "total_up_time" not in state: + state["total_up_time"] = 0.0 + + return state + except FileNotFoundError: + # Return default state if file doesn't exist + return { + "usage_periods": [], + "last_position": HEIGHT_MIN, + "total_up_time": 0.0 + } def save_state(state: Dict[str, Any]) -> None: diff --git a/scripts/main.py b/scripts/main.py new file mode 100644 index 0000000..f9ab5cd --- /dev/null +++ b/scripts/main.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +""" +Main entry point for the desk control system. + +This script provides a unified interface to all desk control functionality: +- Direct movement control +- Test sequences +- Prefect deployment and automation +- Duty cycle monitoring + +Usage Examples: + python main.py # Interactive CLI + python main.py test # Run test sequence + python main.py deploy # Deploy Prefect automation + python main.py move 25.0 30.0 # Move from 25" to 30" + python main.py status # Show duty cycle status +""" + +import sys +import os + +# Add the scripts directory to the path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# Import from the modular files +from desk_controller import move_to_height, test_sequence, cli_interface +from duty_cycle import get_duty_cycle_status, load_state + + +def show_help(): + """Display help information""" + print(__doc__) + + +def show_duty_cycle_status(): + """Show current duty cycle status""" + state = load_state() + status = get_duty_cycle_status(state) + + print("=== Duty Cycle Status ===") + print(f"Current usage: {status['current_usage']:.1f}s / {status['max_usage']}s") + print(f"Percentage used: {status['percentage_used']:.1f}%") + print(f"Remaining time: {status['remaining_time']:.1f}s") + print(f"Window period: {status['window_period']}s (20 minutes)") + print(f"Total periods tracked: {len(state['usage_periods'])}") + + if state.get("last_position"): + print(f"Last known position: {state['last_position']}\"") + if state.get("total_up_time"): + print(f"Total up time (all time): {state['total_up_time']:.1f}s") + + +def main(): + """Main entry point""" + if len(sys.argv) == 1: + # No arguments - run interactive CLI + cli_interface() + + elif sys.argv[1] == "help" or sys.argv[1] == "--help" or sys.argv[1] == "-h": + show_help() + + elif sys.argv[1] == "test": + # Run test sequence + distance = 0.5 # default + rest_time = 10.0 # default + + if len(sys.argv) > 2: + distance = float(sys.argv[2]) + if len(sys.argv) > 3: + rest_time = float(sys.argv[3]) + + result = test_sequence(distance, rest_time) + if not result["success"]: + print(f"Test failed: {result.get('error', 'Unknown error')}") + sys.exit(1) + + elif sys.argv[1] == "move": + # Direct movement command + if len(sys.argv) < 4: + print("Usage: python main.py move ") + sys.exit(1) + + current_height = float(sys.argv[2]) + target_height = float(sys.argv[3]) + + result = move_to_height(target_height, current_height) + if not result["success"]: + print(f"Movement failed: {result['error']}") + sys.exit(1) + + elif sys.argv[1] == "status": + # Show duty cycle status + show_duty_cycle_status() + + elif sys.argv[1] == "deploy": + if "--immediate" in sys.argv: + from prefect_flows import deploy_test_sequence_immediate + deploy_test_sequence_immediate() + else: + from prefect_flows import deploy_test_sequence + deploy_test_sequence() + + elif sys.argv[1] == "prefect-test": + # Run test sequence via Prefect + try: + from prefect_flows import custom_test_sequence_flow + + distance = 0.5 # default + rest_time = 10.0 # default + + if len(sys.argv) > 2: + distance = float(sys.argv[2]) + if len(sys.argv) > 3: + rest_time = float(sys.argv[3]) + + custom_test_sequence_flow(distance, rest_time) + + except ImportError as e: + print(f"Prefect not available: {e}") + print("Install with: pip install prefect") + sys.exit(1) + + else: + print(f"Unknown command: {sys.argv[1]}") + print("Run 'python main.py help' for usage information") + sys.exit(1) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nOperation cancelled.") + except Exception as e: + print(f"Error: {e}") + sys.exit(1) \ No newline at end of file diff --git a/scripts/move_to_height.py b/scripts/move_to_height.py deleted file mode 100644 index 8e3b688..0000000 --- a/scripts/move_to_height.py +++ /dev/null @@ -1,53 +0,0 @@ -import time -import RPi.GPIO as GPIO -from constants import UP_PIN, DOWN_PIN - -# Calibration data -LOWEST_HEIGHT = 23.7 # inches -HIGHEST_HEIGHT = 54.5 # inches -UP_RATE = 0.54 # inches per second (from calibration) - -GPIO.setmode(GPIO.BCM) - -def release_up(): - GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) - -def press_up(): - GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) - -def release_down(): - GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) - -def press_down(): - GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) - -def move_to_height(target_height): - if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): - raise ValueError(f"Target height {target_height}'' is out of range.") - # Step 1: Reset to lowest - print(f"Resetting to lowest position ({LOWEST_HEIGHT})...") - release_down() - input("Press Enter to move all the way DOWN (about 56 seconds from max height)...") - press_down() - time.sleep(56.0) - release_down() - print("At lowest position.") - # Step 2: Move up for calculated time - delta = target_height - LOWEST_HEIGHT - up_time = delta / UP_RATE - print(f"Moving UP for {up_time:.1f} seconds to reach {target_height}''...") - release_up() - input("Press Enter to move UP...") - press_up() - time.sleep(up_time) - release_up() - print(f"Arrived at {target_height}'' (approximate).") - -if __name__ == "__main__": - try: - target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) - move_to_height(target) - finally: - release_up() - release_down() - GPIO.cleanup() diff --git a/scripts/move_to_height_no_reset.py b/scripts/move_to_height_no_reset.py deleted file mode 100644 index f752d12..0000000 --- a/scripts/move_to_height_no_reset.py +++ /dev/null @@ -1,88 +0,0 @@ -import time -import json -import os -import RPi.GPIO as GPIO -from constants import UP_PIN, DOWN_PIN - -# Calibration data -LOWEST_HEIGHT = 23.7 # inches -HIGHEST_HEIGHT = 54.5 # inches -UP_RATE = 0.54 # inches per second -DOWN_RATE = 0.55 # inches per second - -STATE_FILE = "lifter_state.json" - -GPIO.setmode(GPIO.BCM) - -def release_up(): - GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) - -def press_up(): - GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) - -def release_down(): - GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) - -def press_down(): - GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) - -def load_state(): - if os.path.exists(STATE_FILE): - with open(STATE_FILE, 'r') as f: - return json.load(f) - return { - "total_up_time": 0.0, - "last_position": None - } - -def save_state(state): - with open(STATE_FILE, 'w') as f: - json.dump(state, f, indent=2) - -def move_to_height(target_height, current_height=None): - state = load_state() - if current_height is None: - if state["last_position"] is None: - raise ValueError("Current height unknown. Please provide current height.") - current_height = state["last_position"] - if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): - raise ValueError(f"Target height {target_height}'' is out of range.") - delta = target_height - current_height - if abs(delta) < 0.01: - print(f"Already at {target_height}'' (within tolerance). No movement needed.") - return - if delta > 0: - # Move up - up_time = delta / UP_RATE - print(f"Moving UP for {up_time:.1f} seconds to reach {target_height}''...") - release_up() - input("Press Enter to move UP...") - press_up() - time.sleep(up_time) - release_up() - state["total_up_time"] += up_time - else: - # Move down - down_time = abs(delta) / DOWN_RATE - print(f"Moving DOWN for {down_time:.1f} seconds to reach {target_height}''...") - release_down() - input("Press Enter to move DOWN...") - press_down() - time.sleep(down_time) - release_down() - # Do not update total_up_time for down movement - state["last_position"] = target_height - save_state(state) - print(f"Arrived at {target_height}'' (approximate). State saved.") - -if __name__ == "__main__": - try: - # current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) - state = load_state() - print(state) - target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) - move_to_height(target) - finally: - release_up() - release_down() - GPIO.cleanup() diff --git a/scripts/movement_control.py b/scripts/movement_control.py index 9029544..178100c 100644 --- a/scripts/movement_control.py +++ b/scripts/movement_control.py @@ -7,8 +7,10 @@ import time from typing import Tuple + try: import RPi.GPIO as GPIO + from constants import UP_PIN, DOWN_PIN except ImportError: # For testing without actual GPIO hardware class MockGPIO: @@ -27,10 +29,9 @@ def setup(pin, mode, **kwargs): pass def cleanup(): pass GPIO = MockGPIO() - -# Pin assignments -UP_PIN = 17 # BCM numbering, physical pin 11 -DOWN_PIN = 27 # BCM numbering, physical pin 13 + # Use default pins if constants not available (match constants.py) + UP_PIN = 18 + DOWN_PIN = 17 def setup_gpio() -> None: diff --git a/scripts/prefect_flows.py b/scripts/prefect_flows.py index 9043a71..1856fbe 100644 --- a/scripts/prefect_flows.py +++ b/scripts/prefect_flows.py @@ -137,7 +137,7 @@ def deploy_test_sequence(schedule_cron: str = "39 4 * * *", deployment_name: str custom_test_sequence_flow.from_source( source=".", - entrypoint="scripts/prefect_flows.py:custom_test_sequence_flow", + entrypoint="prefect_flows.py:custom_test_sequence_flow", ).deploy( name=deployment_name, work_pool_name="default-agent-pool", @@ -148,6 +148,23 @@ def deploy_test_sequence(schedule_cron: str = "39 4 * * *", deployment_name: str print("Run 'prefect worker start --pool default-agent-pool' to execute scheduled flows.") +def deploy_test_sequence_immediate(deployment_name: str = "desk-lifter-test-immediate"): + """Deploy the test sequence without scheduling for immediate execution""" + + deployment = custom_test_sequence_flow.from_source( + source=".", + entrypoint="prefect_flows.py:custom_test_sequence_flow", + ).deploy( + name=deployment_name, + work_pool_name="default-process-pool", + # No cron schedule - runs on demand only + ) + + print(f"Deployment '{deployment_name}' created for immediate execution!") + print(f"To run immediately: prefect deployment run 'custom-test-sequence-flow/{deployment_name}'") + return deployment_name + + if __name__ == "__main__": import sys diff --git a/scripts/reset_to_lowest.py b/scripts/reset_to_lowest.py deleted file mode 100644 index b7a4933..0000000 --- a/scripts/reset_to_lowest.py +++ /dev/null @@ -1,32 +0,0 @@ -import time -import RPi.GPIO as GPIO -from constants import DOWN_PIN - -GPIO.setmode(GPIO.BCM) - -def release_down(): - # High-impedance: "button not pressed" - GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) - -def press_down(): - # Drive low: "button pressed" - GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) - -try: - # This script is intended to move the desk from MAX height to MIN height. - # Ensure the desk is at the highest position before running. - release_down() - print("Ready. Desk should be at MAX height. It will now move to MIN height.") - - input("Press Enter to move all the way DOWN (about 56 seconds from max height)...") - - print("Moving DOWN to lowest position...") - press_down() - time.sleep(56.0) # 56s: calibrated for max to min travel (30.8" at 0.55 in/s) - release_down() - print("Released DOWN. Lifter should now be at lowest position.") - -finally: - # Ensure we leave the line released - release_down() - GPIO.cleanup() diff --git a/scripts/test_down.py b/scripts/test_down.py deleted file mode 100644 index cf8f694..0000000 --- a/scripts/test_down.py +++ /dev/null @@ -1,31 +0,0 @@ -import time -import RPi.GPIO as GPIO -from constants import DOWN_PIN - -GPIO.setmode(GPIO.BCM) - -def release_down(): - # High-impedance: "button not pressed" - GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) - -def press_down(): - # Drive low: "button pressed" - GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) - -try: - # Make sure we're released at start - release_down() - print("Ready. Desk should NOT move yet.") - - input("Press Enter to move DOWN for 2 seconds...") - - print("Moving DOWN...") - press_down() - time.sleep(2.0) # adjust if you want a shorter/longer test - release_down() - print("Released DOWN. Test finished.") - -finally: - # Ensure we leave the line released - release_down() - GPIO.cleanup() diff --git a/scripts/test_up.py b/scripts/test_up.py deleted file mode 100644 index 7f0ad1c..0000000 --- a/scripts/test_up.py +++ /dev/null @@ -1,32 +0,0 @@ -# test_up.py -import time -import RPi.GPIO as GPIO -from constants import UP_PIN - -GPIO.setmode(GPIO.BCM) - -def release_up(): - # High-impedance: "button not pressed" - GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) - -def press_up(): - # Drive low: "button pressed" - GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) - -try: - # Make sure we're released at start - release_up() - print("Ready. Desk should NOT move yet.") - - input("Press Enter to move UP for 2 seconds...") - - print("Moving UP...") - press_up() - time.sleep(2.0) # adjust if you want a shorter/longer test - release_up() - print("Released UP. Test finished.") - -finally: - # Ensure we leave the line released - release_up() - GPIO.cleanup() From 2390e0df9d2ce1b01f175859845fae889525c787 Mon Sep 17 00:00:00 2001 From: sgbaird Date: Tue, 18 Nov 2025 14:11:28 -0500 Subject: [PATCH 6/7] fix: update last_position to LOWEST_HEIGHT and adjust lifter state data --- scripts/duty_cycle.py | 6 +++--- scripts/lifter_state.json | 16 ++++++++++++++-- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/scripts/duty_cycle.py b/scripts/duty_cycle.py index 5b45c4e..3a82eb5 100644 --- a/scripts/duty_cycle.py +++ b/scripts/duty_cycle.py @@ -8,7 +8,7 @@ import time import json import os -from constants import HEIGHT_MIN +from constants import LOWEST_HEIGHT from datetime import datetime from typing import List, Tuple, Dict, Any @@ -31,7 +31,7 @@ def load_state(): if "usage_periods" not in state: state["usage_periods"] = [] if "last_position" not in state: - state["last_position"] = HEIGHT_MIN # Default to minimum height + state["last_position"] = LOWEST_HEIGHT # Default to minimum height if "total_up_time" not in state: state["total_up_time"] = 0.0 @@ -40,7 +40,7 @@ def load_state(): # Return default state if file doesn't exist return { "usage_periods": [], - "last_position": HEIGHT_MIN, + "last_position": LOWEST_HEIGHT, "total_up_time": 0.0 } diff --git a/scripts/lifter_state.json b/scripts/lifter_state.json index e1e0664..b65abd5 100644 --- a/scripts/lifter_state.json +++ b/scripts/lifter_state.json @@ -1,4 +1,16 @@ { - "last_position": 26.0, - "total_up_time": 29.703703703703702 + "last_position": 23.7, + "total_up_time": 30.629789175810636, + "usage_periods": [ + [ + 1763493039.64621, + 1763493040.5722954, + 0.9260854721069336 + ], + [ + 1763493050.788349, + 1763493051.6976275, + 0.9092786312103271 + ] + ] } \ No newline at end of file From 30f01633d4812be03216dc8302973b4d24c07133 Mon Sep 17 00:00:00 2001 From: sgbaird Date: Tue, 18 Nov 2025 16:54:38 -0500 Subject: [PATCH 7/7] Refactor duty cycle management and add movement generation utility - Updated duty_cycle.py to enhance movement validation against duty cycle limits. - Introduced generate_movements.py for generating movement configurations based on current duty cycle status. - Modified lifter_state.json to reflect updated usage periods and total up time. - Enhanced main.py to support new commands for deploying immediate and custom movements. - Created movement_configs.json to store generated movement configurations. - Updated prefect_flows.py to simplify flow definitions and integrate custom movements execution. - Added duty cycle monitoring flow for scheduled checks and recommendations based on capacity. --- scripts/desk_controller.py | 313 ++++++++++++++++++++++++++++-- scripts/duty_cycle.py | 118 +++++++++--- scripts/generate_movements.py | 123 ++++++++++++ scripts/lifter_state.json | 161 +++++++++++++++- scripts/main.py | 48 ++--- scripts/movement_configs.json | 74 ++++++++ scripts/prefect_flows.py | 346 ++++++++++++++++++++++++++-------- 7 files changed, 1037 insertions(+), 146 deletions(-) create mode 100644 scripts/generate_movements.py create mode 100644 scripts/movement_configs.json diff --git a/scripts/desk_controller.py b/scripts/desk_controller.py index 255f707..d4ab89f 100644 --- a/scripts/desk_controller.py +++ b/scripts/desk_controller.py @@ -7,11 +7,163 @@ from typing import Optional from duty_cycle import ( - load_state, save_state, check_duty_cycle_limits, - record_usage_period, get_duty_cycle_status + check_movement_against_duty_cycle, + record_usage_period, + get_duty_cycle_status, + get_current_duty_cycle_usage, + show_duty_cycle_status, + load_state, + save_state, + DUTY_CYCLE_MAX_ON_TIME, + DUTY_CYCLE_PERIOD ) from movement_control import setup_gpio, cleanup_gpio, move_up, move_down + +def check_duty_cycle_status_before_execution() -> dict: + """ + Check current duty cycle status before executing movements. + Returns comprehensive status information for decision making. + + Returns: + dict: { + "current_usage": float, # Seconds used in current window + "remaining_capacity": float, # Seconds remaining + "percentage_used": float, # Percentage of duty cycle used + "max_single_movement": float, # Max movement time within height limits + "movements_possible": int, # Est. number of max movements possible + "current_position": float, # Current desk position + "window_period": int, # Duty cycle window (1200s) + "recommendations": list # Usage recommendations + } + """ + print("=== PRE-EXECUTION DUTY CYCLE STATUS CHECK ===") + + state = load_state() + current_usage = get_current_duty_cycle_usage(state) + remaining_capacity = DUTY_CYCLE_MAX_ON_TIME - current_usage + percentage_used = (current_usage / DUTY_CYCLE_MAX_ON_TIME) * 100 + current_position = state.get("last_position", 24.0) + + # Calculate max possible movement within height range [23.7-54.5] + height_range_max = 54.5 - 23.7 # 30.8 inches max movement + max_single_movement = height_range_max / 4.8 # 6.4 seconds + + # Estimate how many max movements are possible + movements_possible = int(remaining_capacity / max_single_movement) if remaining_capacity > 0 else 0 + + # Generate recommendations + recommendations = [] + if remaining_capacity < 10: + recommendations.append("⚠️ Very low duty cycle remaining - consider waiting") + elif remaining_capacity < 30: + recommendations.append("⚠️ Low duty cycle remaining - use small movements only") + elif percentage_used > 80: + recommendations.append("⚠️ High duty cycle usage - plan movements carefully") + else: + recommendations.append("✅ Good duty cycle capacity available") + + if movements_possible == 0: + recommendations.append("❌ No large movements possible - only small adjustments") + elif movements_possible < 3: + recommendations.append(f"⚠️ Only ~{movements_possible} large movements possible") + else: + recommendations.append(f"✅ ~{movements_possible} large movements possible") + + # Display status + print(f"Current usage: {current_usage:.1f}s / {DUTY_CYCLE_MAX_ON_TIME}s ({percentage_used:.1f}%)") + print(f"Remaining capacity: {remaining_capacity:.1f}s") + print(f"Current position: {current_position}\"") + print(f"Max single movement: {max_single_movement:.1f}s (within height range)") + print(f"Estimated large movements possible: {movements_possible}") + print() + print("Recommendations:") + for rec in recommendations: + print(f" {rec}") + print() + + return { + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "percentage_used": percentage_used, + "max_single_movement": max_single_movement, + "movements_possible": movements_possible, + "current_position": current_position, + "window_period": DUTY_CYCLE_PERIOD, + "recommendations": recommendations + } + + +def generate_safe_movement_suggestions(max_movements: int = 5) -> list: + """ + Generate safe movement suggestions based on current duty cycle status. + + Args: + max_movements: Maximum number of movements to suggest + + Returns: + list: List of suggested movements within safety limits + """ + status = check_duty_cycle_status_before_execution() + + current_pos = status["current_position"] + remaining_capacity = status["remaining_capacity"] + max_single_time = status["max_single_movement"] + + suggestions = [] + + if remaining_capacity < 5: + # Only tiny movements + suggestions.append({ + "id": "tiny_up", + "description": f"Tiny up movement: {current_pos}\" → {current_pos + 1:.1f}\" (0.2s)", + "target_height": current_pos + 1.0, + "current_height": current_pos, + "enabled": True + }) + suggestions.append({ + "id": "tiny_down", + "description": f"Tiny down movement: {current_pos}\" → {current_pos - 1:.1f}\" (0.2s)", + "target_height": current_pos - 1.0, + "current_height": current_pos, + "enabled": True + }) + elif remaining_capacity < 15: + # Small movements only + for i in range(min(max_movements, 3)): + up_target = min(current_pos + 5, 54.0) + down_target = max(current_pos - 5, 24.0) + + suggestions.append({ + "id": f"small_movement_{i+1}", + "description": f"Small movement: {current_pos}\" → {up_target}\" (1.0s)", + "target_height": up_target, + "current_height": current_pos, + "enabled": True + }) + current_pos = up_target + else: + # Can do larger movements + positions = [30.0, 45.0, 35.0, 50.0, 25.0, 40.0] + + for i in range(min(max_movements, len(positions))): + target = positions[i] + if 23.7 <= target <= 54.5: # Within safe range + estimated_time = abs(target - current_pos) / 4.8 + + if estimated_time <= remaining_capacity: + suggestions.append({ + "id": f"suggested_move_{i+1}", + "description": f"Suggested movement: {current_pos:.1f}\" → {target}\" ({estimated_time:.1f}s)", + "target_height": target, + "current_height": current_pos, + "enabled": True + }) + current_pos = target + remaining_capacity -= estimated_time + + return suggestions + try: from constants import LOWEST_HEIGHT, HIGHEST_HEIGHT, UP_RATE, DOWN_RATE except ImportError: @@ -66,14 +218,13 @@ def move_to_height(target_height: float, current_height: Optional[float] = None) required_time = delta / UP_RATE direction = "up" - # Check duty cycle limits - is_valid, updated_state, message = check_duty_cycle_limits(state, required_time) - state = updated_state + # Check duty cycle limits using the new function + check_result = check_movement_against_duty_cycle(target_height, current_height, UP_RATE, DOWN_RATE) - if not is_valid: - raise ValueError(message) + if not check_result["allowed"]: + raise ValueError(check_result["error"]) - print(message) + print(f"Duty cycle OK: {check_result['current_usage']:.1f}s + {check_result['estimated_duration']:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s") # Execute movement and get actual timing start_time, end_time, actual_duration = move_up(required_time) @@ -86,14 +237,13 @@ def move_to_height(target_height: float, current_height: Optional[float] = None) required_time = abs(delta) / DOWN_RATE direction = "down" - # Check duty cycle limits - is_valid, updated_state, message = check_duty_cycle_limits(state, required_time) - state = updated_state + # Check duty cycle limits using the new function + check_result = check_movement_against_duty_cycle(target_height, current_height, UP_RATE, DOWN_RATE) - if not is_valid: - raise ValueError(message) + if not check_result["allowed"]: + raise ValueError(check_result["error"]) - print(message) + print(f"Duty cycle OK: {check_result['current_usage']:.1f}s + {check_result['estimated_duration']:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s") # Execute movement and get actual timing start_time, end_time, actual_duration = move_down(required_time) @@ -191,6 +341,141 @@ def test_sequence(movement_distance: float = 0.5, rest_time: float = 10.0) -> di } +def load_movement_configs(config_file: str = "movement_configs.json") -> list: + """Load movement configurations from JSON file""" + import json + import os + + print(f"Loading movement configurations from {config_file}") + + if not os.path.exists(config_file): + raise FileNotFoundError(f"Configuration file {config_file} not found") + + with open(config_file, 'r') as f: + config = json.load(f) + + # Filter for enabled movements only + enabled_movements = [m for m in config.get("movements", []) if m.get("enabled", True)] + + print(f"Found {len(enabled_movements)} enabled movements") + return enabled_movements + + +def validate_movement_config(movement: dict) -> dict: + """Validate a movement configuration before execution""" + movement_id = movement.get("id", "unknown") + target_height = movement["target_height"] + current_height = movement.get("current_height") + + print(f"Validating movement {movement_id}: {current_height}\" → {target_height}\"") + + # Check duty cycle limits + check_result = check_movement_against_duty_cycle(target_height, current_height) + + if not check_result["allowed"]: + error_msg = f"Movement {movement_id} rejected: {check_result['error']}" + print(f"❌ {error_msg}") + raise ValueError(error_msg) + + print(f"✅ Movement {movement_id} validated: {check_result['estimated_duration']:.1f}s, {check_result['movement_type']}") + return check_result + + +def execute_movement_config(movement: dict) -> dict: + """Execute a movement from configuration""" + movement_id = movement.get("id", "unknown") + target_height = movement["target_height"] + current_height = movement.get("current_height") + + print(f"Executing configured movement {movement_id}: {movement.get('description', '')}") + + result = move_to_height(target_height, current_height) + + if result["success"]: + print(f"✅ Movement {movement_id} completed: {result['duration']:.1f}s, final height: {result['end_height']}\"") + else: + print(f"❌ Movement {movement_id} failed: {result['error']}") + raise ValueError(result["error"]) + + return result + + +def execute_custom_movements(config_file: str = "movement_configs.json") -> dict: + """Execute custom movements from configuration file""" + print("=== CUSTOM MOVEMENTS EXECUTION ===") + + # ALWAYS check duty cycle status before execution + duty_status = check_duty_cycle_status_before_execution() + + # If very low capacity, warn and potentially abort + if duty_status["remaining_capacity"] < 1.0: + print("❌ EXECUTION ABORTED: Insufficient duty cycle capacity remaining") + return { + "success": False, + "error": "Insufficient duty cycle capacity", + "duty_status": duty_status + } + + # Load movement configurations + print("Loading movement configurations from movement_configs.json") + movements = load_movement_configs(config_file) + + if not movements: + print("⚠️ No enabled movements found in configuration") + return {"success": False, "error": "No movements to execute"} + + results = [] + + for movement in movements: + movement_id = movement.get("id", "unknown") + print(f"\nProcessing movement: {movement_id}") + + try: + # Validate movement first + validation_result = validate_movement_config(movement) + + # Execute the movement if validation passed + execution_result = execute_movement_config(movement) + + results.append({ + "movement_id": movement_id, + "success": True, + "validation": validation_result, + "execution": execution_result + }) + + except Exception as e: + print(f"❌ Movement {movement_id} failed: {str(e)}") + results.append({ + "movement_id": movement_id, + "success": False, + "error": str(e) + }) + # Continue with remaining movements + + successful_movements = [r for r in results if r["success"]] + failed_movements = [r for r in results if not r["success"]] + + print(f"\n=== EXECUTION SUMMARY ===") + print(f"Total movements: {len(results)}") + print(f"Successful: {len(successful_movements)}") + print(f"Failed: {len(failed_movements)}") + + # Show final duty cycle status + print(f"\n=== FINAL DUTY CYCLE STATUS ===") + final_status = check_duty_cycle_status_before_execution() + + return { + "success": len(failed_movements) == 0, + "total_movements": len(results), + "successful": len(successful_movements), + "failed": len(failed_movements), + "results": results, + "initial_duty_status": duty_status, + "final_duty_status": final_status + } + + def cli_interface(): """Command-line interface for desk control""" try: diff --git a/scripts/duty_cycle.py b/scripts/duty_cycle.py index 3a82eb5..5d3a6c4 100644 --- a/scripts/duty_cycle.py +++ b/scripts/duty_cycle.py @@ -10,7 +10,7 @@ import os from constants import LOWEST_HEIGHT from datetime import datetime -from typing import List, Tuple, Dict, Any +from typing import List, Tuple, Dict, Any, Optional # Duty cycle constants DUTY_CYCLE_PERIOD = 1200 # 20 minutes in seconds @@ -84,48 +84,99 @@ def get_current_duty_cycle_usage(state: Dict[str, Any]) -> float: return total_usage -def get_remaining_duty_time(state: Dict[str, Any]) -> float: - """Get remaining duty cycle time in seconds""" - current_usage = get_current_duty_cycle_usage(state) - return max(0, DUTY_CYCLE_MAX_ON_TIME - current_usage) - - def record_usage_period(state: Dict[str, Any], start_time: float, end_time: float, duration: float) -> Dict[str, Any]: """Record a usage period in the duty cycle tracking""" state["usage_periods"].append([start_time, end_time, duration]) return state -def check_duty_cycle_limits(state: Dict[str, Any], required_time: float) -> Tuple[bool, Dict[str, Any], str]: +def check_movement_against_duty_cycle(target_height: float, current_height: Optional[float] = None, up_rate: float = 4.8, down_rate: float = 4.8) -> dict: """ - Check if the movement is within duty cycle limits using sliding window + Check if a movement to a target height would exceed duty cycle limits. + Args: + target_height: Target height in mm/inches + current_height: Current height (if None, loads from state) + up_rate: Movement rate upward (mm/s or inches/s) + down_rate: Movement rate downward (mm/s or inches/s) + Returns: - (is_valid, updated_state, info_message) + dict: { + "allowed": bool, + "error": str or None, + "estimated_duration": float, + "current_usage": float, + "remaining_capacity": float, + "movement_type": "UP" or "DOWN", + "distance": float + } """ - # Clean old periods and get current usage - state = clean_old_usage_periods(state) - current_usage = get_current_duty_cycle_usage(state) + # Load current state + state = load_state() + + if current_height is None: + current_height = state.get("last_position", LOWEST_HEIGHT) + + # Calculate movement requirements + distance = abs(target_height - current_height) + movement_type = "UP" if target_height > current_height else "DOWN" + rate = up_rate if movement_type == "UP" else down_rate + + if distance == 0: + return { + "allowed": True, + "error": None, + "estimated_duration": 0.0, + "current_usage": get_current_duty_cycle_usage(state), + "remaining_capacity": DUTY_CYCLE_MAX_ON_TIME - get_current_duty_cycle_usage(state), + "movement_type": movement_type, + "distance": distance + } + + estimated_duration = distance / rate # Check continuous runtime limit - if required_time > MAX_CONTINUOUS_RUNTIME: - error_msg = f"Movement duration {required_time:.1f}s exceeds maximum continuous runtime of {MAX_CONTINUOUS_RUNTIME}s" - return False, state, error_msg + if estimated_duration > MAX_CONTINUOUS_RUNTIME: + return { + "allowed": False, + "error": f"Movement would take {estimated_duration:.1f}s, exceeding {MAX_CONTINUOUS_RUNTIME}s continuous runtime limit", + "estimated_duration": estimated_duration, + "current_usage": get_current_duty_cycle_usage(state), + "remaining_capacity": DUTY_CYCLE_MAX_ON_TIME - get_current_duty_cycle_usage(state), + "movement_type": movement_type, + "distance": distance + } + + # Check duty cycle limits + current_usage = get_current_duty_cycle_usage(state) + remaining_capacity = DUTY_CYCLE_MAX_ON_TIME - current_usage - # Check if adding this movement would exceed the duty cycle limit - if current_usage + required_time > DUTY_CYCLE_MAX_ON_TIME: - remaining_time = DUTY_CYCLE_MAX_ON_TIME - current_usage - error_msg = f"Movement would exceed {DUTY_CYCLE_PERCENTAGE*100:.0f}% duty cycle limit. Current usage: {current_usage:.1f}s, Remaining: {remaining_time:.1f}s in {DUTY_CYCLE_PERIOD}s window" - return False, state, error_msg + if estimated_duration > remaining_capacity: + return { + "allowed": False, + "error": f"Movement would exceed 10% duty cycle limit. Current usage: {current_usage:.1f}s, Remaining: {remaining_capacity:.1f}s in {DUTY_CYCLE_PERIOD:.0f}s window", + "estimated_duration": estimated_duration, + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "movement_type": movement_type, + "distance": distance + } - info_msg = f"Duty cycle OK: {current_usage:.1f}s + {required_time:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s ({DUTY_CYCLE_PERCENTAGE*100:.0f}% of {DUTY_CYCLE_PERIOD}s)" - return True, state, info_msg + return { + "allowed": True, + "error": None, + "estimated_duration": estimated_duration, + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "movement_type": movement_type, + "distance": distance + } def get_duty_cycle_status(state: Dict[str, Any]) -> Dict[str, float]: """Get current duty cycle status information""" current_usage = get_current_duty_cycle_usage(state) - remaining_time = get_remaining_duty_time(state) + remaining_time = max(0, DUTY_CYCLE_MAX_ON_TIME - current_usage) percentage_used = current_usage / DUTY_CYCLE_MAX_ON_TIME * 100 return { @@ -134,4 +185,21 @@ def get_duty_cycle_status(state: Dict[str, Any]) -> Dict[str, float]: "remaining_time": remaining_time, "percentage_used": percentage_used, "window_period": DUTY_CYCLE_PERIOD - } \ No newline at end of file + } + + +def show_duty_cycle_status(): + """Display current duty cycle status in a user-friendly format""" + state = load_state() + status = get_duty_cycle_status(state) + current_usage = get_current_duty_cycle_usage(state) + + print("Current Duty Cycle Status:") + print(f" Current usage: {current_usage:.2f}s / {status['max_usage']}s") + print(f" Percentage used: {status['percentage_used']:.2f}%") + print(f" Remaining time: {status['remaining_time']:.2f}s") + print(f" Window period: {status['window_period']}s ({status['window_period']/60:.0f} minutes)") + + if len(state.get("usage_periods", [])) > 0: + print(f" Recent usage periods: {len(state['usage_periods'])}") + print(f" Total up time (all time): {state.get('total_up_time', 0):.1f}s") \ No newline at end of file diff --git a/scripts/generate_movements.py b/scripts/generate_movements.py new file mode 100644 index 0000000..5a608d7 --- /dev/null +++ b/scripts/generate_movements.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +""" +Generate movement configurations based on current duty cycle status. + +This utility checks the current duty cycle usage and generates appropriate +movement configurations that will demonstrate both successful movements +and duty cycle limit protection. +""" + +import json +from desk_controller import check_duty_cycle_status_before_execution, generate_safe_movement_suggestions + +def generate_duty_cycle_test_config(output_file: str = "movement_configs.json"): + """ + Generate movement configurations that will test duty cycle limits. + + Creates movements that: + 1. Respect the 30-second continuous runtime limit + 2. Use available capacity efficiently + 3. Demonstrate successful movements within limits + 4. Show duty cycle protection when limits are exceeded + """ + + print("=== GENERATING MOVEMENT CONFIGS BASED ON CURRENT DUTY CYCLE ===") + + # Check current status + status = check_duty_cycle_status_before_execution() + + remaining = status["remaining_capacity"] + current_pos = status["current_position"] + max_movement_time = status["max_single_movement"] + + # IMPORTANT: Respect 30-second continuous runtime limit + MAX_CONTINUOUS_TIME = 30.0 + max_safe_distance = MAX_CONTINUOUS_TIME * 4.8 # 144 inches + + # But also respect height range [23.7-54.5] + max_range_distance = 54.5 - 23.7 # 30.8 inches + practical_max_distance = min(max_safe_distance, max_range_distance) # 30.8 inches + practical_max_time = practical_max_distance / 4.8 # 6.4 seconds + + print(f"Max distance by continuous runtime: {max_safe_distance:.1f} inches ({MAX_CONTINUOUS_TIME}s)") + print(f"Max distance by height range: {max_range_distance:.1f} inches") + print(f"Practical max distance: {practical_max_distance:.1f} inches ({practical_max_time:.1f}s)") + + # Calculate how many practical movements we can do + full_movements_possible = int(remaining / practical_max_time) + + movements = [] + + if remaining < 5: + print("Very low capacity - generating minimal movements") + movements = [ + { + "id": "minimal_test", + "description": f"Minimal movement due to low capacity ({remaining:.1f}s remaining)", + "target_height": min(current_pos + 2.0, 54.0), + "current_height": current_pos, + "enabled": True + } + ] + else: + print(f"Generating {full_movements_possible + 2} movements to test duty cycle limits") + + # Generate movements that respect both limits + pos = current_pos + + for i in range(full_movements_possible): + # Alternate between small and medium movements within safe range + if i % 2 == 0: + # Medium movement up (within 30.8 inch limit) + distance = min(15.0, 54.0 - pos) # 15 inches = 3.1s + target = min(54.0, pos + distance) + else: + # Medium movement down + distance = min(15.0, pos - 24.0) # 15 inches = 3.1s + target = max(24.0, pos - distance) + + actual_distance = abs(target - pos) + time_est = actual_distance / 4.8 + + movements.append({ + "id": f"success_move_{i+1:02d}", + "description": f"SUCCESS: {pos:.1f}→{target:.1f}\" ({actual_distance:.1f}in = {time_est:.1f}s)", + "target_height": target, + "current_height": pos, + "enabled": True + }) + pos = target + + # Add movements that should fail due to duty cycle (not continuous runtime) + # These will be small enough to pass continuous runtime but exceed duty cycle + movements.extend([ + { + "id": "fail_duty_cycle_1", + "description": f"FAIL: Should exceed duty cycle limit (small movement but no capacity)", + "target_height": min(pos + 10.0, 54.0), # Small 10-inch movement = 2.1s + "current_height": pos, + "enabled": True + }, + { + "id": "fail_duty_cycle_2", + "description": f"FAIL: Should definitely exceed duty cycle limit", + "target_height": max(pos - 10.0, 24.0), # Small 10-inch movement = 2.1s + "current_height": pos, + "enabled": True + } + ]) + + config = {"movements": movements} + + # Save to file + with open(output_file, 'w') as f: + json.dump(config, f, indent=2) + + print(f"\n✅ Generated {len(movements)} movements in {output_file}") + print(f"Expected: {full_movements_possible} successes, {len(movements) - full_movements_possible} duty cycle failures") + print(f"All movements respect 30s continuous runtime limit") + + return config + +if __name__ == "__main__": + generate_duty_cycle_test_config() \ No newline at end of file diff --git a/scripts/lifter_state.json b/scripts/lifter_state.json index b65abd5..d4c18d1 100644 --- a/scripts/lifter_state.json +++ b/scripts/lifter_state.json @@ -1,16 +1,161 @@ { - "last_position": 23.7, - "total_up_time": 30.629789175810636, + "last_position": 30.0, + "total_up_time": 184.9553032875061, "usage_periods": [ [ - 1763493039.64621, - 1763493040.5722954, - 0.9260854721069336 + 1763494756.4042995, + 1763494786.4042995, + 30.0 ], [ - 1763493050.788349, - 1763493051.6976275, - 0.9092786312103271 + 1763494956.4042995, + 1763494981.4042995, + 25.0 + ], + [ + 1763495156.4042995, + 1763495176.4042995, + 20.0 + ], + [ + 1763495356.4042995, + 1763495381.4042995, + 25.0 + ], + [ + 1763495856.405774, + 1763495857.0423234, + 0.6365492343902588 + ], + [ + 1763497574.8526337, + 1763497576.6710057, + 1.8183720111846924 + ], + [ + 1763497617.6946945, + 1763497621.3312733, + 3.6365787982940674 + ], + [ + 1763497928.1398637, + 1763497931.776416, + 3.636552333831787 + ], + [ + 1763498408.447813, + 1763498426.9665084, + 18.51869535446167 + ], + [ + 1763498863.0324163, + 1763498890.8104022, + 27.77798581123352 + ], + [ + 1763498890.9123645, + 1763498918.1852903, + 27.272925853729248 + ], + [ + 1763498918.2872386, + 1763498946.0652158, + 27.777977228164673 + ], + [ + 1763498947.3871467, + 1763498954.7947295, + 7.407582759857178 + ], + [ + 1763499185.2448037, + 1763499188.8813705, + 3.6365668773651123 + ], + [ + 1763499952.5517168, + 1763499954.4037511, + 1.852034330368042 + ], + [ + 1763500576.6500714, + 1763500578.5023472, + 1.8522758483886719 + ], + [ + 1763500585.8909063, + 1763500587.7429347, + 1.8520283699035645 + ], + [ + 1763500854.9809864, + 1763500856.8330512, + 1.852064847946167 + ], + [ + 1763500856.934996, + 1763500860.6388907, + 3.703894853591919 + ], + [ + 1763501062.8640413, + 1763501064.716086, + 1.8520445823669434 + ], + [ + 1763501064.817657, + 1763501068.521543, + 3.703886032104492 + ], + [ + 1763501188.6183019, + 1763501195.8912296, + 7.272927761077881 + ], + [ + 1763501195.9933672, + 1763501207.1046762, + 11.111309051513672 + ], + [ + 1763501207.2066073, + 1763501214.4795344, + 7.2729270458221436 + ], + [ + 1763501388.0313075, + 1763501389.883359, + 1.8520514965057373 + ], + [ + 1763501390.1899018, + 1763501419.8197086, + 29.629806756973267 + ], + [ + 1763501504.581304, + 1763501508.2851863, + 3.7038822174072266 + ], + [ + 1763501521.9692283, + 1763501525.6731281, + 3.70389986038208 + ], + [ + 1763501599.0091352, + 1763501602.7130191, + 3.7038838863372803 + ], + [ + 1763502238.5140986, + 1763502242.1506405, + 3.6365418434143066 + ], + [ + 1763502260.9601638, + 1763502264.5967178, + 3.636554002761841 ] ] } \ No newline at end of file diff --git a/scripts/main.py b/scripts/main.py index f9ab5cd..9bc7785 100644 --- a/scripts/main.py +++ b/scripts/main.py @@ -12,7 +12,10 @@ python main.py # Interactive CLI python main.py test # Run test sequence python main.py deploy # Deploy Prefect automation - python main.py move 25.0 30.0 # Move from 25" to 30" + python main.py deploy --immediate # Deploy immediate test + python main.py deploy --movements # Deploy custom movements flow + python main.py move # Move between heights + python main.py custom-movements # Execute custom movements from JSON python main.py status # Show duty cycle status """ @@ -23,8 +26,8 @@ sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # Import from the modular files -from desk_controller import move_to_height, test_sequence, cli_interface -from duty_cycle import get_duty_cycle_status, load_state +from desk_controller import move_to_height, test_sequence, cli_interface, execute_custom_movements +from duty_cycle import get_duty_cycle_status, show_duty_cycle_status, load_state def show_help(): @@ -32,24 +35,6 @@ def show_help(): print(__doc__) -def show_duty_cycle_status(): - """Show current duty cycle status""" - state = load_state() - status = get_duty_cycle_status(state) - - print("=== Duty Cycle Status ===") - print(f"Current usage: {status['current_usage']:.1f}s / {status['max_usage']}s") - print(f"Percentage used: {status['percentage_used']:.1f}%") - print(f"Remaining time: {status['remaining_time']:.1f}s") - print(f"Window period: {status['window_period']}s (20 minutes)") - print(f"Total periods tracked: {len(state['usage_periods'])}") - - if state.get("last_position"): - print(f"Last known position: {state['last_position']}\"") - if state.get("total_up_time"): - print(f"Total up time (all time): {state['total_up_time']:.1f}s") - - def main(): """Main entry point""" if len(sys.argv) == 1: @@ -71,7 +56,7 @@ def main(): result = test_sequence(distance, rest_time) if not result["success"]: - print(f"Test failed: {result.get('error', 'Unknown error')}") + print("Failed to deploy flows to Prefect Cloud") sys.exit(1) elif sys.argv[1] == "move": @@ -93,13 +78,32 @@ def main(): show_duty_cycle_status() elif sys.argv[1] == "deploy": + # Deploy Prefect flows if "--immediate" in sys.argv: from prefect_flows import deploy_test_sequence_immediate deploy_test_sequence_immediate() + elif "--movements" in sys.argv: + from prefect_flows import deploy_custom_movements_flow + deploy_custom_movements_flow() else: from prefect_flows import deploy_test_sequence deploy_test_sequence() + elif sys.argv[1] == "custom-movements": + # Execute custom movements from JSON configuration + try: + result = execute_custom_movements() + if result["success"]: + print(f"✅ All movements completed successfully ({result['successful']}/{result['total_movements']})") + else: + print(f"❌ Some movements failed ({result['failed']}/{result['total_movements']} failed)") + for failed in [r for r in result['results'] if not r['success']]: + print(f" - {failed['movement_id']}: {failed['error']}") + sys.exit(1) + except Exception as e: + print(f"❌ Custom movements failed: {e}") + sys.exit(1) + elif sys.argv[1] == "prefect-test": # Run test sequence via Prefect try: diff --git a/scripts/movement_configs.json b/scripts/movement_configs.json new file mode 100644 index 0000000..3542eb0 --- /dev/null +++ b/scripts/movement_configs.json @@ -0,0 +1,74 @@ +{ + "movements": [ + { + "id": "step_01", + "description": "Step 1: 30.0->28.0\" (2in down, ~0.4s)", + "target_height": 28.0, + "current_height": 30.0, + "enabled": true + }, + { + "id": "step_02", + "description": "Step 2: 28.0->26.0\" (2in down, ~0.4s)", + "target_height": 26.0, + "current_height": 28.0, + "enabled": true + }, + { + "id": "step_03", + "description": "Step 3: 26.0->24.0\" (2in down, ~0.4s)", + "target_height": 24.0, + "current_height": 26.0, + "enabled": true + }, + { + "id": "step_04", + "description": "Step 4: 24.0->22.0\" (2in down, ~0.4s)", + "target_height": 22.0, + "current_height": 24.0, + "enabled": true + }, + { + "id": "step_05", + "description": "Step 5: 22.0->20.0\" (2in down, ~0.4s)", + "target_height": 20.0, + "current_height": 22.0, + "enabled": true + }, + { + "id": "step_06", + "description": "Step 6: 20.0->18.0\" (2in down, ~0.4s)", + "target_height": 18.0, + "current_height": 20.0, + "enabled": true + }, + { + "id": "step_07", + "description": "Step 7: 18.0->16.0\" (2in down, ~0.4s)", + "target_height": 16.0, + "current_height": 18.0, + "enabled": true + }, + { + "id": "step_08", + "description": "Step 8: 16.0->14.0\" (2in down, ~0.4s)", + "target_height": 14.0, + "current_height": 16.0, + "enabled": true + }, + { + "id": "step_09", + "description": "Step 9: 14.0->12.0\" (2in down, ~0.4s)", + "target_height": 12.0, + "current_height": 14.0, + "enabled": true + }, + { + "id": "step_10", + "description": "Step 10: 12.0->10.0\" (2in down, ~0.4s)", + "target_height": 10.0, + "current_height": 12.0, + "enabled": true + } + ] +} \ No newline at end of file diff --git a/scripts/prefect_flows.py b/scripts/prefect_flows.py index 1856fbe..05de3fb 100644 --- a/scripts/prefect_flows.py +++ b/scripts/prefect_flows.py @@ -1,29 +1,64 @@ """ -Prefect flows and deployments for automated desk control. +Simplified Prefect flows for automated desk control. Provides scheduled automation and workflow orchestration using Prefect. -Integrates with the desk controller for safe, automated movements. +Uses the comprehensive desk_controller.execute_custom_movements() function. """ import time +import os +import sys from prefect import flow, task from prefect.logging import get_run_logger -# Import our modular components -from desk_controller import move_to_height, test_sequence, LOWEST_HEIGHT +# Add the scripts directory to Python path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# Import our modular components +from desk_controller import ( + move_to_height, + test_sequence, + LOWEST_HEIGHT, + execute_custom_movements, + check_duty_cycle_status_before_execution +) +from duty_cycle import show_duty_cycle_status, get_duty_cycle_status, load_state @task def log_info(message: str): """Log information message""" - logger = get_run_logger() - logger.info(message) print(message) +@task +def duty_cycle_status_task(): + """ + Check duty cycle status as a Prefect task. + Reuses existing check_duty_cycle_status_before_execution() from desk_controller. + """ + logger = get_run_logger() + + try: + # Use the existing function - no need to reimplement + status = check_duty_cycle_status_before_execution() + + # Log for Prefect monitoring + logger.info(f"Duty cycle check completed:") + logger.info(f" Usage: {status['current_usage']:.1f}s / 120s ({status['percentage_used']:.1f}%)") + logger.info(f" Remaining: {status['remaining_capacity']:.1f}s") + logger.info(f" Position: {status['current_position']}\"") + + return status + + except Exception as e: + logger.error(f"Failed to check duty cycle status: {e}") + raise + + @task def execute_movement(target_height: float, current_height: float = None): - """Execute a movement as a Prefect task""" + """Execute a single movement as a Prefect task""" logger = get_run_logger() try: @@ -41,6 +76,37 @@ def execute_movement(target_height: float, current_height: float = None): raise +@task +def execute_custom_movements_task(config_file: str = "movement_configs.json"): + """ + Execute custom movements from configuration file as a Prefect task. + + This is a thin wrapper around desk_controller.execute_custom_movements() + which already handles all the complexity: + - Loading movement configs + - Pre-execution duty cycle checking + - Movement validation + - Movement execution + - Post-execution status reporting + """ + logger = get_run_logger() + logger.info(f"Executing custom movements from {config_file}") + + try: + # This function does EVERYTHING - no need for separate loading/validation tasks + result = execute_custom_movements(config_file) + + if result["success"]: + logger.info(f"✅ All movements completed successfully ({result['successful']}/{result['total_movements']})") + else: + logger.info(f"⚠️ Movements completed with some failures ({result['failed']}/{result['total_movements']} failed)") + + return result + except Exception as e: + logger.error(f"❌ Custom movements execution failed: {e}") + raise + + @task def execute_test_sequence(movement_distance: float = 0.5, rest_time: float = 10.0): """Execute test sequence as a Prefect task""" @@ -61,116 +127,242 @@ def execute_test_sequence(movement_distance: float = 0.5, rest_time: float = 10. raise +# ============================================================================= +# FLOWS +# ============================================================================= + @flow -def move_to_height_flow(target_height: float, current_height: float = None): - """Prefect flow for moving to a specific height""" - log_info(f"Starting movement flow: target={target_height}, current={current_height}") +def simple_movement_flow(target_height: float, current_height: float = None): + """Simple Prefect flow for moving to a specific height with duty cycle checking""" + logger = get_run_logger() + logger.info(f"=== SIMPLE MOVEMENT FLOW ===") + logger.info(f"Target: {target_height}\", Current: {current_height}\"") + + # Check duty cycle status using existing function + initial_status = duty_cycle_status_task() + + # Abort if insufficient capacity + if initial_status["remaining_capacity"] < 1.0: + logger.error("❌ MOVEMENT ABORTED: Insufficient duty cycle capacity") + raise ValueError("Insufficient duty cycle capacity - must wait for reset") + # Execute the movement result = execute_movement(target_height, current_height) - log_info(f"Movement flow completed: {result}") - return result + # Check final duty cycle status + final_status = duty_cycle_status_task() + + # Log usage + capacity_used = initial_status["remaining_capacity"] - final_status["remaining_capacity"] + logger.info(f"Movement completed - Duty cycle used: {capacity_used:.1f}s") + + return { + **result, + "initial_duty_status": initial_status, + "final_duty_status": final_status, + "capacity_used": capacity_used + } -@flow -def custom_test_sequence_flow(movement_distance: float = 0.5, rest_time: float = 10.0): - """Prefect flow for custom test sequence""" - log_info(f"Starting test sequence: distance={movement_distance}, rest={rest_time}") +@flow +def custom_movements_flow(config_file: str = "movement_configs.json"): + """ + Simplified Prefect flow to execute custom movements. - start_height = LOWEST_HEIGHT - up_target = start_height + movement_distance + Uses the comprehensive desk_controller.execute_custom_movements() function + which already includes all necessary features internally. + """ + logger = get_run_logger() + logger.info("=== CUSTOM MOVEMENTS FLOW ===") - log_info(f"Test sequence plan:") - log_info(f" Starting at: {start_height}\"") - log_info(f" Will move to: {up_target}\"") - log_info(f" Then rest for {rest_time} seconds") - log_info(f" Then return to: {start_height}\"") + # Execute custom movements - this function already does all the duty cycle checking + result = execute_custom_movements_task(config_file) - # Phase 1: Move up - log_info(f"--- Phase 1: Moving UP {movement_distance} inches ---") - result1 = execute_movement(up_target, start_height) + logger.info("Custom movements flow completed") + return result + + +@flow +def duty_cycle_monitoring_flow(): + """ + Simplified duty cycle monitoring flow. + Uses existing duty cycle checking functions. + """ + logger = get_run_logger() + logger.info("=== DUTY CYCLE MONITORING FLOW ===") - # Phase 2: Rest - log_info(f"--- Phase 2: Resting for {rest_time} seconds ---") - time.sleep(rest_time) - log_info("Rest complete.") + # Use existing duty cycle status function + status = duty_cycle_status_task() - # Phase 3: Move down - log_info(f"--- Phase 3: Moving DOWN {movement_distance} inches ---") - result2 = execute_movement(start_height, up_target) + # Simple recommendation logic + remaining = status["remaining_capacity"] - log_info("Custom test sequence complete!") + if remaining < 5: + recommendation = "wait" + logger.warning("⚠️ VERY LOW CAPACITY - Recommend waiting for duty cycle reset") + elif remaining < 15: + recommendation = "small_movements_only" + logger.warning("⚠️ LOW CAPACITY - Use small movements only") + elif remaining < 60: + recommendation = "moderate_planning" + logger.info("✅ MODERATE CAPACITY - Plan movements carefully") + else: + recommendation = "normal_operations" + logger.info("✅ GOOD CAPACITY - Normal operations possible") return { - "success": True, - "phase1_result": result1, - "phase2_result": result2, - "total_duration": result1.get("duration", 0) + result2.get("duration", 0) + "status": status, + "recommendation": recommendation, + "operational_mode": recommendation } @flow -def desk_control_cli_flow(): - """Prefect flow for CLI-based desk control""" - log_info("Starting CLI flow") +def scheduled_duty_cycle_check(): + """ + Scheduled duty cycle monitoring using existing functions. + Just wraps duty_cycle_monitoring_flow for scheduled execution. + """ + logger = get_run_logger() + logger.info("=== SCHEDULED DUTY CYCLE CHECK ===") - try: - from desk_controller import LOWEST_HEIGHT, HIGHEST_HEIGHT - - current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) - target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) - - result = execute_movement(target, current) - log_info("CLI flow completed successfully!") - return result - - except ValueError as e: - log_info(f"Error: {e}") - raise - except KeyboardInterrupt: - log_info("Operation cancelled.") - raise + # Use the monitoring flow + result = duty_cycle_monitoring_flow() + + # Log summary for scheduled monitoring + status = result["status"] + logger.info(f"Scheduled duty cycle check:") + logger.info(f" Usage: {status['current_usage']:.1f}s / 120s ({status['percentage_used']:.1f}%)") + logger.info(f" Mode: {result['recommendation']}") + + # Alert on very low capacity + if status["remaining_capacity"] < 10: + logger.warning("🚨 ALERT: Very low duty cycle capacity remaining!") + + return result -def deploy_test_sequence(schedule_cron: str = "39 4 * * *", deployment_name: str = "desk-lifter-test-sequence-1139pm-toronto"): - """Deploy the test sequence with scheduling""" +@flow +def test_sequence_flow(movement_distance: float = 0.5, rest_time: float = 10.0): + """Prefect flow for automated test sequence""" + logger = get_run_logger() + logger.info(f"=== TEST SEQUENCE FLOW ===") + logger.info(f"Distance: {movement_distance}\", Rest: {rest_time}s") + + # Check duty cycle before starting using existing function + initial_status = duty_cycle_status_task() + + # Execute test sequence + result = execute_test_sequence(movement_distance, rest_time) - custom_test_sequence_flow.from_source( + # Check final status + final_status = duty_cycle_status_task() + + logger.info("Test sequence flow completed") + return { + **result, + "initial_duty_status": initial_status, + "final_duty_status": final_status + } + + +# ============================================================================= +# DEPLOYMENT FUNCTIONS +# ============================================================================= + +def deploy_custom_movements_flow(deployment_name: str = "custom-movements"): + """Deploy the main custom movements flow""" + + deployment = custom_movements_flow.from_source( source=".", - entrypoint="prefect_flows.py:custom_test_sequence_flow", + entrypoint="prefect_flows.py:custom_movements_flow", ).deploy( name=deployment_name, - work_pool_name="default-agent-pool", - cron=schedule_cron, # Default: Run daily at 11:39 PM Toronto time (4:39 AM UTC) + work_pool_name="default-process-pool", ) - print(f"Deployment '{deployment_name}' created with schedule '{schedule_cron}'!") - print("Run 'prefect worker start --pool default-agent-pool' to execute scheduled flows.") + print(f"✅ Deployment '{deployment_name}' created!") + print(f"To run: prefect deployment run 'custom-movements-flow/{deployment_name}'") + return deployment_name -def deploy_test_sequence_immediate(deployment_name: str = "desk-lifter-test-immediate"): - """Deploy the test sequence without scheduling for immediate execution""" +def deploy_duty_cycle_monitoring(deployment_name: str = "duty-cycle-monitor", schedule_cron: str = None): + """Deploy duty cycle monitoring flow with optional scheduling""" + + deploy_kwargs = { + "name": deployment_name, + "work_pool_name": "default-process-pool", + } - deployment = custom_test_sequence_flow.from_source( + if schedule_cron: + from prefect.client.schemas.schedules import CronSchedule + deploy_kwargs["schedule"] = CronSchedule(cron=schedule_cron) + print(f"Deploying with cron schedule: {schedule_cron}") + + deployment = scheduled_duty_cycle_check.from_source( source=".", - entrypoint="prefect_flows.py:custom_test_sequence_flow", + entrypoint="prefect_flows.py:scheduled_duty_cycle_check", + ).deploy(**deploy_kwargs) + + print(f"✅ Deployment '{deployment_name}' created!") + if schedule_cron: + print(f"Scheduled to run: {schedule_cron}") + else: + print(f"To run: prefect deployment run 'scheduled-duty-cycle-check/{deployment_name}'") + return deployment_name + + +def deploy_test_sequence(deployment_name: str = "test-sequence"): + """Deploy test sequence flow""" + + deployment = test_sequence_flow.from_source( + source=".", + entrypoint="prefect_flows.py:test_sequence_flow", ).deploy( name=deployment_name, work_pool_name="default-process-pool", - # No cron schedule - runs on demand only ) - print(f"Deployment '{deployment_name}' created for immediate execution!") - print(f"To run immediately: prefect deployment run 'custom-test-sequence-flow/{deployment_name}'") + print(f"✅ Deployment '{deployment_name}' created!") + print(f"To run: prefect deployment run 'test-sequence-flow/{deployment_name}'") return deployment_name +def deploy_all_flows(): + """Deploy all desk control flows""" + print("=== DEPLOYING ALL SIMPLIFIED DESK CONTROL FLOWS ===") + + # Deploy main flows + deploy_custom_movements_flow() + deploy_test_sequence() + + # Deploy monitoring flows + deploy_duty_cycle_monitoring("duty-cycle-monitor-scheduled", "*/10 * * * *") + deploy_duty_cycle_monitoring("duty-cycle-monitor-immediate") + + print("\n🎉 All deployments created!") + print("\nAvailable flows:") + print(" 1. custom-movements - Main movement execution") + print(" 2. test-sequence - Automated test sequence") + print(" 3. duty-cycle-monitor-scheduled - Auto monitoring (every 10min)") + print(" 4. duty-cycle-monitor-immediate - On-demand monitoring") + + return True + + if __name__ == "__main__": import sys - if len(sys.argv) > 1 and sys.argv[1] == "deploy": - deploy_test_sequence() - elif len(sys.argv) > 1 and sys.argv[1] == "test": - custom_test_sequence_flow() + if len(sys.argv) > 1: + if sys.argv[1] == "test": + test_sequence_flow() + elif sys.argv[1] == "movements": + custom_movements_flow() + elif sys.argv[1] == "monitor": + duty_cycle_monitoring_flow() + elif sys.argv[1] == "deploy": + deploy_all_flows() + else: + print("Usage: python prefect_flows.py [test|movements|monitor|deploy]") else: - desk_control_cli_flow() \ No newline at end of file + custom_movements_flow() \ No newline at end of file