diff --git a/CHANGELOG.md b/CHANGELOG.md index bdae0c8..e22bc6a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,8 +6,13 @@ - FIX: nasty bug #1729 fixed - add your changes here! +- Hardware: Changed GPIO pin assignments - UP: GPIO 18, DOWN: GPIO 17 — 2025-11-18 +- Refactor: Centralized GPIO pin constants in `scripts/constants.py` for maintainability — 2025-11-14 - Docs: Added `docs/bill_of_materials.md` (Bill of Materials) — 2025-11-14 - Docs: Added official product links to `docs/bill_of_materials.md` — 2025-11-14 - Docs: Added prices to `docs/bill_of_materials.md` (prices as of 11/6/2025) — 2025-11-14 - Build: Moved publishing to GitHub Actions trusted publisher workflow and aligned tooling docs — 2025-11-14 +- Fix: Added Raspberry Pi 5 GPIO compatibility using rpi-lgpio library — 2025-11-14 +- Docs: Added `docs/raspberry-pi-setup.md` with Pi 5 setup instructions — 2025-11-14 +- Fix: Updated requirements.txt to use rpi-lgpio instead of RPi.GPIO for Pi 5 compatibility — 2025-11-14 diff --git a/README.md b/README.md index 54a4530..b956d64 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,12 @@ A longer description of your project goes here... +## Setup + +For Raspberry Pi 5 setup instructions, see [docs/raspberry-pi-setup.md](docs/raspberry-pi-setup.md). + +For bill of materials, see [docs/bill_of_materials.md](docs/bill_of_materials.md). + diff --git a/docs/raspberry-pi-setup.md b/docs/raspberry-pi-setup.md new file mode 100644 index 0000000..fa2c708 --- /dev/null +++ b/docs/raspberry-pi-setup.md @@ -0,0 +1,110 @@ +# Raspberry Pi 5 GPIO Setup for Desk Lifter Control + +This document describes the setup required to run the desk lifter control scripts on a Raspberry Pi 5 running Debian Trixie. + +## Hardware Requirements + +- Raspberry Pi 5 (with BCM2712 SoC) +- GPIO pins connected to the desk lifter motor controller: + - UP_PIN: GPIO 18 (physical pin 12) + - DOWN_PIN: GPIO 17 (physical pin 11) +- Power supply: 5V USB-C (at least 3A, preferably 5A for high-power peripherals) + +## Software Requirements + +- Debian Trixie (13.x) +- Python 3.11 or later +- Virtual environment (`venv`) + +## GPIO Library Compatibility + +The standard `RPi.GPIO` library does not support Raspberry Pi 5 due to changes in the BCM2712 SoC. Instead, use `rpi-lgpio`, a drop-in replacement that provides the same API but uses the `lgpio` library for GPIO access. + +## Installation Steps + +1. **Clone the repository** (if not already done): + ```bash + git clone https://github.com/AccelerationConsortium/progressive-automations-python.git + cd progressive-automations-python + ``` + +2. **Create and activate a virtual environment**: + ```bash + python3 -m venv venv + source venv/bin/activate + ``` + +3. **Install dependencies**: + ```bash + pip install -r requirements.txt + ``` + +4. **Remove incompatible RPi.GPIO package** (if installed): + ```bash + sudo apt remove -y python3-rpi.gpio + ``` + +5. **Ensure user is in the gpio group** (for GPIO access without sudo): + ```bash + sudo usermod -a -G gpio $USER + ``` + Reboot after adding the user to the group. + +## Running the Scripts + +Activate the virtual environment and run the scripts from the `scripts/` directory: + +```bash +source venv/bin/activate +cd scripts +python move_to_height_no_reset.py +``` + +Available scripts: +- `move_to_height_no_reset.py`: Move the desk to a target height +- `move_to_height.py`: Alternative height control script +- `desk_control_prefect.py`: Prefect-based workflow orchestration +- `test_up.py`: Test upward movement +- `test_down.py`: Test downward movement +- `reset_to_lowest.py`: Reset to lowest position + +## Calibration + +The scripts use calibration data: +- Lowest height: 23.7 inches +- Highest height: 54.5 inches +- Up rate: 0.54 inches/second +- Down rate: 0.55 inches/second + +Adjust these values in the script if your setup differs. + +State is saved in `lifter_state.json` in the scripts directory. + +## Troubleshooting + +### RuntimeError: Cannot determine SOC peripheral base address + +This error occurs when using the old `RPi.GPIO` library on Raspberry Pi 5. Ensure you have installed `rpi-lgpio` and removed `python3-rpi.gpio`. + +### Permission denied on GPIO access + +- Ensure your user is in the `gpio` group: `groups $USER` should include `gpio`. +- If not, run `sudo usermod -a -G gpio $USER` and reboot. + +### Script runs but motor doesn't move + +- Check GPIO pin connections. +- Verify the motor controller is powered and connected correctly. +- Test with `test_up.py` and `test_down.py` to isolate issues. + +### Virtual environment issues + +- Always activate the venv before running scripts: `source venv/bin/activate` +- If pip installs fail with "externally-managed-environment", you're trying to install system-wide. Use the venv. + +## Notes + +- The scripts use BCM pin numbering. +- GPIO access requires root permissions or membership in the `gpio` group. +- On Raspberry Pi 5, USB peripherals may be limited to 600mA if using a 3A power supply. Use a 5A supply for high-power devices. +- This setup is tested on Debian Trixie with Raspberry Pi 5. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..954f31a --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +# Core dependencies for desk lifter control +prefect>=2.0.0 +rpi-lgpio \ No newline at end of file diff --git a/scripts/README.md b/scripts/README.md new file mode 100644 index 0000000..0bbd1ce --- /dev/null +++ b/scripts/README.md @@ -0,0 +1,93 @@ +# Desk Control Scripts + +This directory contains scripts for controlling a Progressive Automations desk lifter. + +## 🚀 Quick Start + +**For the modern modular system:** +```bash +# Navigate to the modular package +cd desk_control/ + +# Run interactive CLI +python main.py + +# Run test sequence +python main.py test + +# Check duty cycle status +python main.py status + +# Deploy Prefect automation +python main.py deploy +``` + +## 📁 Directory Structure + +``` +scripts/ +├── desk_control/ # 🆕 Modern modular system +│ ├── main.py # Main CLI interface +│ ├── desk_controller.py # High-level control logic +│ ├── movement_control.py # GPIO operations +│ ├── duty_cycle.py # Motor protection +│ ├── prefect_flows.py # Automation & scheduling +│ └── README.md # Detailed documentation +├── constants.py # Shared constants (pins, calibration) +├── lifter_state.json # Persistent state file +├── lifter_calibration.txt # Calibration data +└── desk_control_prefect_LEGACY.py # 📦 Original monolithic file (backup) +``` + +## ✨ What's New + +### **Modular Architecture** +The code has been refactored into focused, maintainable modules: + +- **`movement_control.py`** - GPIO pin control and movement execution +- **`duty_cycle.py`** - 10% duty cycle protection with sliding window +- **`desk_controller.py`** - Height management and safety checks +- **`prefect_flows.py`** - Workflow automation and scheduling +- **`main.py`** - Unified command-line interface + +### **Improved Duty Cycle** +- ✅ True sliding window (not hard resets every 20 minutes) +- ✅ Precise timestamp tracking of usage periods +- ✅ Automatic cleanup of old periods +- ✅ Real-time duty cycle status monitoring + +### **Better Safety** +- ✅ Comprehensive error handling +- ✅ GPIO cleanup in all scenarios +- ✅ Continuous runtime limits (30s max per movement) +- ✅ Height range validation + +## 📖 Usage Examples + +```bash +# Basic movement +python desk_control/main.py move 25.0 30.0 # Move from 25" to 30" + +# Test sequence with custom parameters +python desk_control/main.py test 1.0 5.0 # 1" movement, 5s rest + +# Duty cycle monitoring +python desk_control/main.py status + +# Prefect automation +python desk_control/main.py deploy "0 12 * * *" # Deploy for noon daily +``` + +## 🔧 Configuration + +Edit `constants.py` to adjust: +- GPIO pin assignments +- Calibration values (height range, movement rates) + +## 📚 Documentation + +See `desk_control/README.md` for detailed documentation of the modular system. + +## 🏛️ Legacy Code + +The original monolithic file is preserved as `desk_control_prefect_LEGACY.py` for reference, but the modular system in `desk_control/` should be used for all new development. \ No newline at end of file diff --git a/scripts/constants.py b/scripts/constants.py new file mode 100644 index 0000000..e6ac96b --- /dev/null +++ b/scripts/constants.py @@ -0,0 +1,10 @@ +# GPIO pin constants for desk lifter control +# BCM numbering +UP_PIN = 18 # BCM numbering, physical pin 12 +DOWN_PIN = 17 # BCM numbering, physical pin 11 + +# Calibration data +LOWEST_HEIGHT = 23.7 # inches +HIGHEST_HEIGHT = 54.5 # inches +UP_RATE = 0.54 # inches per second +DOWN_RATE = 0.55 # inches per second \ No newline at end of file diff --git a/scripts/desk_control_prefect_LEGACY.py b/scripts/desk_control_prefect_LEGACY.py new file mode 100644 index 0000000..fe4d35b --- /dev/null +++ b/scripts/desk_control_prefect_LEGACY.py @@ -0,0 +1,298 @@ +import time +import json +import os +from datetime import datetime, timedelta +from prefect import flow, task +from prefect.logging import get_run_logger +import RPi.GPIO as GPIO +from constants import UP_PIN, DOWN_PIN + +# Calibration data +LOWEST_HEIGHT = 23.7 # inches +HIGHEST_HEIGHT = 54.5 # inches +UP_RATE = 0.54 # inches per second +DOWN_RATE = 0.55 # inches per second + +STATE_FILE = "lifter_state.json" +DUTY_CYCLE_PERIOD = 1200 # 20 minutes in seconds +DUTY_CYCLE_MAX_ON_TIME = 120 # 2 minutes in seconds (10% of 20 minutes) +DUTY_CYCLE_PERCENTAGE = 0.10 # 10% duty cycle +MAX_CONTINUOUS_RUNTIME = 30 # Maximum continuous movement time in seconds + +GPIO.setmode(GPIO.BCM) + +@task +def setup_gpio(): + """Initialize GPIO settings""" + GPIO.setmode(GPIO.BCM) + +@task +def release_up(): + """Set UP pin to high-impedance state""" + GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +@task +def press_up(): + """Set UP pin to drive low (button pressed)""" + GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) + +@task +def release_down(): + """Set DOWN pin to high-impedance state""" + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + +@task +def press_down(): + """Set DOWN pin to drive low (button pressed)""" + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + +@task +def cleanup_gpio(): + """Clean up GPIO resources""" + release_up() + release_down() + GPIO.cleanup() + +@task +def load_state(): + """Load the current state from JSON file""" + if os.path.exists(STATE_FILE): + with open(STATE_FILE, 'r') as f: + return json.load(f) + return { + "total_up_time": 0.0, + "last_position": None, + "usage_periods": [] # List of [start_timestamp, end_timestamp, duration] entries + } + +@task +def save_state(state): + """Save state to JSON file""" + with open(STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + +@task +def clean_old_usage_periods(state): + """Remove usage periods older than the duty cycle period""" + current_time = time.time() + cutoff_time = current_time - DUTY_CYCLE_PERIOD + + # Keep only periods that end after the cutoff time + state["usage_periods"] = [ + period for period in state["usage_periods"] + if period[1] > cutoff_time # period[1] is end_timestamp + ] + return state + +@task +def get_current_duty_cycle_usage(state): + """Calculate current duty cycle usage in the sliding window""" + clean_old_usage_periods(state) + current_time = time.time() + + total_usage = 0.0 + for start_time, end_time, duration in state["usage_periods"]: + # Only count usage that's within the duty cycle period + window_start = current_time - DUTY_CYCLE_PERIOD + + # Adjust start and end times to the current window + effective_start = max(start_time, window_start) + effective_end = min(end_time, current_time) + + if effective_end > effective_start: + total_usage += effective_end - effective_start + + return total_usage + +@task +def get_remaining_duty_time(state): + """Get remaining duty cycle time in seconds""" + current_usage = get_current_duty_cycle_usage(state) + return max(0, DUTY_CYCLE_MAX_ON_TIME - current_usage) + +@task +def record_usage_period(state, start_time, end_time, duration): + """Record a usage period in the duty cycle tracking""" + state["usage_periods"].append([start_time, end_time, duration]) + return state + +@task +def check_timing_limits(state, required_time): + """Check if the movement is within duty cycle limits using sliding window""" + logger = get_run_logger() + + # Clean old periods and get current usage + state = clean_old_usage_periods(state) + current_usage = get_current_duty_cycle_usage(state) + + # Check continuous runtime limit + if required_time > MAX_CONTINUOUS_RUNTIME: + raise ValueError(f"Movement duration {required_time:.1f}s exceeds maximum continuous runtime of {MAX_CONTINUOUS_RUNTIME}s") + + # Check if adding this movement would exceed the duty cycle limit + if current_usage + required_time > DUTY_CYCLE_MAX_ON_TIME: + remaining_time = DUTY_CYCLE_MAX_ON_TIME - current_usage + raise ValueError(f"Movement would exceed {DUTY_CYCLE_PERCENTAGE*100:.0f}% duty cycle limit. Current usage: {current_usage:.1f}s, Remaining: {remaining_time:.1f}s in {DUTY_CYCLE_PERIOD}s window") + + logger.info(f"Duty cycle OK: {current_usage:.1f}s + {required_time:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s ({DUTY_CYCLE_PERCENTAGE*100:.0f}% of {DUTY_CYCLE_PERIOD}s)") + return True, state + +@task +def move_up(up_time): + """Execute upward movement for specified time with duty cycle tracking""" + logger = get_run_logger() + logger.info(f"Moving UP for {up_time:.1f} seconds...") + + release_up() + start_time = time.time() + press_up() + time.sleep(up_time) + release_up() + end_time = time.time() + actual_duration = end_time - start_time + + logger.info(f"UP movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration + +@task +def move_down(down_time): + """Execute downward movement for specified time with duty cycle tracking""" + logger = get_run_logger() + logger.info(f"Moving DOWN for {down_time:.1f} seconds...") + + release_down() + start_time = time.time() + press_down() + time.sleep(down_time) + release_down() + end_time = time.time() + actual_duration = end_time - start_time + + logger.info(f"DOWN movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration + +@flow +def move_to_height_flow(target_height: float, current_height: float): + """Main flow to move desk to target height with safety checks""" + logger = get_run_logger() + + # Validate height range + if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): + raise ValueError(f"Target height {target_height}'' is out of range.") + + # Setup GPIO + setup_gpio() + + try: + # Load current state + state = load_state() + + # Calculate movement requirements + delta = target_height - current_height + if abs(delta) < 0.01: + logger.info(f"Already at {target_height}'' (within tolerance). No movement needed.") + return + + if delta > 0: + # Moving up + up_time = delta / UP_RATE + + # Check timing limits + is_valid, updated_state = check_timing_limits(state, up_time) + state = updated_state + + # Execute movement and get actual timing + start_time, end_time, actual_duration = move_up(up_time) + + # Record the usage period and update state + state = record_usage_period(state, start_time, end_time, actual_duration) + state["total_up_time"] += actual_duration + else: + # Moving down + down_time = abs(delta) / DOWN_RATE + + # Check timing limits + is_valid, updated_state = check_timing_limits(state, down_time) + state = updated_state + + # Execute movement and get actual timing + start_time, end_time, actual_duration = move_down(down_time) + + # Record the usage period (down time counts toward duty cycle but not total_up_time) + state = record_usage_period(state, start_time, end_time, actual_duration) + + # Update position and save state + state["last_position"] = target_height + save_state(state) + + # Get current duty cycle info for logging + current_usage = get_current_duty_cycle_usage(state) + remaining_time = get_remaining_duty_time(state) + + logger.info(f"Arrived at {target_height}'' (approximate). State saved.") + logger.info(f"Duty cycle usage: {current_usage:.1f}s / {DUTY_CYCLE_MAX_ON_TIME}s ({current_usage/DUTY_CYCLE_MAX_ON_TIME*100:.1f}%)") + logger.info(f"Remaining duty time: {remaining_time:.1f}s") + logger.info(f"Total up time: {state['total_up_time']:.1f}s") + + finally: + # Always clean up GPIO + cleanup_gpio() + +@flow +def custom_test_sequence(): + """Custom flow: Start at lowest, move up 0.5 inches, rest 10 seconds, move down 0.5 inches""" + start_height = LOWEST_HEIGHT + up_target = start_height + 0.5 + + print("Starting custom test sequence...") + print(f"Starting at: {start_height}\"") + print(f"Will move to: {up_target}\"") + print(f"Then rest for 10 seconds") + print(f"Then return to: {start_height}\"") + + # Phase 1: Move up 0.5 inches + print("\n--- Phase 1: Moving UP 0.5 inches ---") + move_to_height_flow(up_target, start_height) + + # Phase 2: Rest for 10 seconds + print("\n--- Phase 2: Resting for 10 seconds ---") + time.sleep(10) + print("Rest complete.") + + # Phase 3: Move down 0.5 inches (back to lowest) + print("\n--- Phase 3: Moving DOWN 0.5 inches ---") + move_to_height_flow(start_height, up_target) + + print("\nCustom test sequence complete!") + +@flow +def desk_control_cli(): + """CLI interface for desk control""" + try: + current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + move_to_height_flow(target, current) + except ValueError as e: + print(f"Error: {e}") + except KeyboardInterrupt: + print("\nOperation cancelled.") + +if __name__ == "__main__": + import sys + from prefect import flow + + if len(sys.argv) > 1 and sys.argv[1] == "deploy": + # Deploy the custom test sequence to run at scheduled times + custom_test_sequence.from_source( + source=".", + entrypoint="scripts/desk_control_prefect.py:custom_test_sequence", + ).deploy( + name="desk-lifter-test-sequence-1139pm-toronto", + work_pool_name="default-agent-pool", + cron="39 4 * * *", # Run daily at 11:39 PM Toronto time (4:39 AM UTC) + ) + print("Deployment created! Run 'prefect worker start --pool default-agent-pool' to execute scheduled flows.") + elif len(sys.argv) > 1 and sys.argv[1] == "test": + custom_test_sequence() + else: + desk_control_cli() \ No newline at end of file diff --git a/scripts/desk_controller.py b/scripts/desk_controller.py new file mode 100644 index 0000000..d4ab89f --- /dev/null +++ b/scripts/desk_controller.py @@ -0,0 +1,503 @@ +""" +High-level desk controller with height management and safety checks. + +Combines movement control and duty cycle management to provide safe desk operations. +Handles height calculations, movement planning, and state management. +""" + +from typing import Optional +from duty_cycle import ( + check_movement_against_duty_cycle, + record_usage_period, + get_duty_cycle_status, + get_current_duty_cycle_usage, + show_duty_cycle_status, + load_state, + save_state, + DUTY_CYCLE_MAX_ON_TIME, + DUTY_CYCLE_PERIOD +) +from movement_control import setup_gpio, cleanup_gpio, move_up, move_down + + +def check_duty_cycle_status_before_execution() -> dict: + """ + Check current duty cycle status before executing movements. + Returns comprehensive status information for decision making. + + Returns: + dict: { + "current_usage": float, # Seconds used in current window + "remaining_capacity": float, # Seconds remaining + "percentage_used": float, # Percentage of duty cycle used + "max_single_movement": float, # Max movement time within height limits + "movements_possible": int, # Est. number of max movements possible + "current_position": float, # Current desk position + "window_period": int, # Duty cycle window (1200s) + "recommendations": list # Usage recommendations + } + """ + print("=== PRE-EXECUTION DUTY CYCLE STATUS CHECK ===") + + state = load_state() + current_usage = get_current_duty_cycle_usage(state) + remaining_capacity = DUTY_CYCLE_MAX_ON_TIME - current_usage + percentage_used = (current_usage / DUTY_CYCLE_MAX_ON_TIME) * 100 + current_position = state.get("last_position", 24.0) + + # Calculate max possible movement within height range [23.7-54.5] + height_range_max = 54.5 - 23.7 # 30.8 inches max movement + max_single_movement = height_range_max / 4.8 # 6.4 seconds + + # Estimate how many max movements are possible + movements_possible = int(remaining_capacity / max_single_movement) if remaining_capacity > 0 else 0 + + # Generate recommendations + recommendations = [] + if remaining_capacity < 10: + recommendations.append("⚠️ Very low duty cycle remaining - consider waiting") + elif remaining_capacity < 30: + recommendations.append("⚠️ Low duty cycle remaining - use small movements only") + elif percentage_used > 80: + recommendations.append("⚠️ High duty cycle usage - plan movements carefully") + else: + recommendations.append("✅ Good duty cycle capacity available") + + if movements_possible == 0: + recommendations.append("❌ No large movements possible - only small adjustments") + elif movements_possible < 3: + recommendations.append(f"⚠️ Only ~{movements_possible} large movements possible") + else: + recommendations.append(f"✅ ~{movements_possible} large movements possible") + + # Display status + print(f"Current usage: {current_usage:.1f}s / {DUTY_CYCLE_MAX_ON_TIME}s ({percentage_used:.1f}%)") + print(f"Remaining capacity: {remaining_capacity:.1f}s") + print(f"Current position: {current_position}\"") + print(f"Max single movement: {max_single_movement:.1f}s (within height range)") + print(f"Estimated large movements possible: {movements_possible}") + print() + print("Recommendations:") + for rec in recommendations: + print(f" {rec}") + print() + + return { + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "percentage_used": percentage_used, + "max_single_movement": max_single_movement, + "movements_possible": movements_possible, + "current_position": current_position, + "window_period": DUTY_CYCLE_PERIOD, + "recommendations": recommendations + } + + +def generate_safe_movement_suggestions(max_movements: int = 5) -> list: + """ + Generate safe movement suggestions based on current duty cycle status. + + Args: + max_movements: Maximum number of movements to suggest + + Returns: + list: List of suggested movements within safety limits + """ + status = check_duty_cycle_status_before_execution() + + current_pos = status["current_position"] + remaining_capacity = status["remaining_capacity"] + max_single_time = status["max_single_movement"] + + suggestions = [] + + if remaining_capacity < 5: + # Only tiny movements + suggestions.append({ + "id": "tiny_up", + "description": f"Tiny up movement: {current_pos}\" → {current_pos + 1:.1f}\" (0.2s)", + "target_height": current_pos + 1.0, + "current_height": current_pos, + "enabled": True + }) + suggestions.append({ + "id": "tiny_down", + "description": f"Tiny down movement: {current_pos}\" → {current_pos - 1:.1f}\" (0.2s)", + "target_height": current_pos - 1.0, + "current_height": current_pos, + "enabled": True + }) + elif remaining_capacity < 15: + # Small movements only + for i in range(min(max_movements, 3)): + up_target = min(current_pos + 5, 54.0) + down_target = max(current_pos - 5, 24.0) + + suggestions.append({ + "id": f"small_movement_{i+1}", + "description": f"Small movement: {current_pos}\" → {up_target}\" (1.0s)", + "target_height": up_target, + "current_height": current_pos, + "enabled": True + }) + current_pos = up_target + else: + # Can do larger movements + positions = [30.0, 45.0, 35.0, 50.0, 25.0, 40.0] + + for i in range(min(max_movements, len(positions))): + target = positions[i] + if 23.7 <= target <= 54.5: # Within safe range + estimated_time = abs(target - current_pos) / 4.8 + + if estimated_time <= remaining_capacity: + suggestions.append({ + "id": f"suggested_move_{i+1}", + "description": f"Suggested movement: {current_pos:.1f}\" → {target}\" ({estimated_time:.1f}s)", + "target_height": target, + "current_height": current_pos, + "enabled": True + }) + current_pos = target + remaining_capacity -= estimated_time + + return suggestions + +try: + from constants import LOWEST_HEIGHT, HIGHEST_HEIGHT, UP_RATE, DOWN_RATE +except ImportError: + # Fallback values if constants not available + LOWEST_HEIGHT = 23.7 # inches + HIGHEST_HEIGHT = 54.5 # inches + UP_RATE = 0.54 # inches per second + DOWN_RATE = 0.55 # inches per second + + +def move_to_height(target_height: float, current_height: Optional[float] = None) -> dict: + """ + Move desk to target height with safety checks and duty cycle enforcement + + Args: + target_height: Desired height in inches + current_height: Current height in inches (if None, uses last known position) + + Returns: + dict with movement results and status information + """ + # Validate height range + if not (LOWEST_HEIGHT <= target_height <= HIGHEST_HEIGHT): + raise ValueError(f"Target height {target_height}'' is out of range [{LOWEST_HEIGHT}-{HIGHEST_HEIGHT}].") + + # Setup GPIO + setup_gpio() + + try: + # Load current state + state = load_state() + + # Determine current height + if current_height is None: + if state["last_position"] is None: + raise ValueError("No current height provided and no last known position in state file.") + current_height = state["last_position"] + + # Calculate movement requirements + delta = target_height - current_height + if abs(delta) < 0.01: + print(f"Already at {target_height}'' (within tolerance). No movement needed.") + return { + "success": True, + "movement": "none", + "message": f"Already at target height {target_height}''", + "duty_cycle": get_duty_cycle_status(state) + } + + if delta > 0: + # Moving up + required_time = delta / UP_RATE + direction = "up" + + # Check duty cycle limits using the new function + check_result = check_movement_against_duty_cycle(target_height, current_height, UP_RATE, DOWN_RATE) + + if not check_result["allowed"]: + raise ValueError(check_result["error"]) + + print(f"Duty cycle OK: {check_result['current_usage']:.1f}s + {check_result['estimated_duration']:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s") + + # Execute movement and get actual timing + start_time, end_time, actual_duration = move_up(required_time) + + # Record the usage period and update state + state = record_usage_period(state, start_time, end_time, actual_duration) + state["total_up_time"] += actual_duration + else: + # Moving down + required_time = abs(delta) / DOWN_RATE + direction = "down" + + # Check duty cycle limits using the new function + check_result = check_movement_against_duty_cycle(target_height, current_height, UP_RATE, DOWN_RATE) + + if not check_result["allowed"]: + raise ValueError(check_result["error"]) + + print(f"Duty cycle OK: {check_result['current_usage']:.1f}s + {check_result['estimated_duration']:.1f}s <= {DUTY_CYCLE_MAX_ON_TIME}s") + + # Execute movement and get actual timing + start_time, end_time, actual_duration = move_down(required_time) + + # Record the usage period (down time counts toward duty cycle but not total_up_time) + state = record_usage_period(state, start_time, end_time, actual_duration) + + # Update position and save state + state["last_position"] = target_height + save_state(state) + + # Get final duty cycle info + duty_status = get_duty_cycle_status(state) + + print(f"Arrived at {target_height}'' (approximate). State saved.") + print(f"Duty cycle usage: {duty_status['current_usage']:.1f}s / {duty_status['max_usage']}s ({duty_status['percentage_used']:.1f}%)") + print(f"Remaining duty time: {duty_status['remaining_time']:.1f}s") + print(f"Total up time: {state['total_up_time']:.1f}s") + + return { + "success": True, + "movement": direction, + "start_height": current_height, + "end_height": target_height, + "distance": abs(delta), + "duration": actual_duration, + "duty_cycle": duty_status, + "total_up_time": state["total_up_time"] + } + + except Exception as e: + print(f"Error during movement: {e}") + return { + "success": False, + "error": str(e), + "duty_cycle": get_duty_cycle_status(load_state()) + } + + finally: + # Always clean up GPIO + cleanup_gpio() + + +def test_sequence(movement_distance: float = 0.5, rest_time: float = 10.0) -> dict: + """ + Execute a test sequence: move up, rest, move down + + Args: + movement_distance: Distance to move in inches + rest_time: Time to rest between movements in seconds + + Returns: + dict with test results + """ + start_height = LOWEST_HEIGHT + up_target = start_height + movement_distance + + print("Starting test sequence...") + print(f"Starting at: {start_height}\"") + print(f"Will move to: {up_target}\"") + print(f"Then rest for {rest_time} seconds") + print(f"Then return to: {start_height}\"") + + results = [] + + # Phase 1: Move up + print(f"\n--- Phase 1: Moving UP {movement_distance} inches ---") + result1 = move_to_height(up_target, start_height) + results.append(result1) + + if not result1["success"]: + return {"success": False, "phase": 1, "error": result1["error"]} + + # Phase 2: Rest + print(f"\n--- Phase 2: Resting for {rest_time} seconds ---") + import time + time.sleep(rest_time) + print("Rest complete.") + + # Phase 3: Move down + print(f"\n--- Phase 3: Moving DOWN {movement_distance} inches ---") + result2 = move_to_height(start_height, up_target) + results.append(result2) + + if not result2["success"]: + return {"success": False, "phase": 3, "error": result2["error"]} + + print("\nTest sequence complete!") + + return { + "success": True, + "results": results, + "total_duration": sum(r.get("duration", 0) for r in results if r["success"]), + "final_duty_cycle": results[-1]["duty_cycle"] if results else None + } + + +def load_movement_configs(config_file: str = "movement_configs.json") -> list: + """Load movement configurations from JSON file""" + import json + import os + + print(f"Loading movement configurations from {config_file}") + + if not os.path.exists(config_file): + raise FileNotFoundError(f"Configuration file {config_file} not found") + + with open(config_file, 'r') as f: + config = json.load(f) + + # Filter for enabled movements only + enabled_movements = [m for m in config.get("movements", []) if m.get("enabled", True)] + + print(f"Found {len(enabled_movements)} enabled movements") + return enabled_movements + + +def validate_movement_config(movement: dict) -> dict: + """Validate a movement configuration before execution""" + movement_id = movement.get("id", "unknown") + target_height = movement["target_height"] + current_height = movement.get("current_height") + + print(f"Validating movement {movement_id}: {current_height}\" → {target_height}\"") + + # Check duty cycle limits + check_result = check_movement_against_duty_cycle(target_height, current_height) + + if not check_result["allowed"]: + error_msg = f"Movement {movement_id} rejected: {check_result['error']}" + print(f"❌ {error_msg}") + raise ValueError(error_msg) + + print(f"✅ Movement {movement_id} validated: {check_result['estimated_duration']:.1f}s, {check_result['movement_type']}") + return check_result + + +def execute_movement_config(movement: dict) -> dict: + """Execute a movement from configuration""" + movement_id = movement.get("id", "unknown") + target_height = movement["target_height"] + current_height = movement.get("current_height") + + print(f"Executing configured movement {movement_id}: {movement.get('description', '')}") + + result = move_to_height(target_height, current_height) + + if result["success"]: + print(f"✅ Movement {movement_id} completed: {result['duration']:.1f}s, final height: {result['end_height']}\"") + else: + print(f"❌ Movement {movement_id} failed: {result['error']}") + raise ValueError(result["error"]) + + return result + + +def execute_custom_movements(config_file: str = "movement_configs.json") -> dict: + """Execute custom movements from configuration file""" + print("=== CUSTOM MOVEMENTS EXECUTION ===") + + # ALWAYS check duty cycle status before execution + duty_status = check_duty_cycle_status_before_execution() + + # If very low capacity, warn and potentially abort + if duty_status["remaining_capacity"] < 1.0: + print("❌ EXECUTION ABORTED: Insufficient duty cycle capacity remaining") + return { + "success": False, + "error": "Insufficient duty cycle capacity", + "duty_status": duty_status + } + + # Load movement configurations + print("Loading movement configurations from movement_configs.json") + movements = load_movement_configs(config_file) + + if not movements: + print("⚠️ No enabled movements found in configuration") + return {"success": False, "error": "No movements to execute"} + + results = [] + + for movement in movements: + movement_id = movement.get("id", "unknown") + print(f"\nProcessing movement: {movement_id}") + + try: + # Validate movement first + validation_result = validate_movement_config(movement) + + # Execute the movement if validation passed + execution_result = execute_movement_config(movement) + + results.append({ + "movement_id": movement_id, + "success": True, + "validation": validation_result, + "execution": execution_result + }) + + except Exception as e: + print(f"❌ Movement {movement_id} failed: {str(e)}") + results.append({ + "movement_id": movement_id, + "success": False, + "error": str(e) + }) + # Continue with remaining movements + + successful_movements = [r for r in results if r["success"]] + failed_movements = [r for r in results if not r["success"]] + + print(f"\n=== EXECUTION SUMMARY ===") + print(f"Total movements: {len(results)}") + print(f"Successful: {len(successful_movements)}") + print(f"Failed: {len(failed_movements)}") + + # Show final duty cycle status + print(f"\n=== FINAL DUTY CYCLE STATUS ===") + final_status = check_duty_cycle_status_before_execution() + + return { + "success": len(failed_movements) == 0, + "total_movements": len(results), + "successful": len(successful_movements), + "failed": len(failed_movements), + "results": results, + "initial_duty_status": duty_status, + "final_duty_status": final_status + } + + +def cli_interface(): + """Command-line interface for desk control""" + try: + current = float(input(f"Enter current height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + target = float(input(f"Enter target height in inches ({LOWEST_HEIGHT}-{HIGHEST_HEIGHT}): ")) + result = move_to_height(target, current) + + if result["success"]: + print("Movement completed successfully!") + else: + print(f"Movement failed: {result['error']}") + + except ValueError as e: + print(f"Error: {e}") + except KeyboardInterrupt: + print("\nOperation cancelled.") + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1 and sys.argv[1] == "test": + test_sequence() + else: + cli_interface() \ No newline at end of file diff --git a/scripts/duty_cycle.py b/scripts/duty_cycle.py new file mode 100644 index 0000000..5d3a6c4 --- /dev/null +++ b/scripts/duty_cycle.py @@ -0,0 +1,205 @@ +""" +Duty cycle management for motor protection. + +Implements a 10% duty cycle (2 minutes on, 18 minutes off) using a sliding window approach. +Tracks individual usage periods and enforces both continuous runtime and total usage limits. +""" + +import time +import json +import os +from constants import LOWEST_HEIGHT +from datetime import datetime +from typing import List, Tuple, Dict, Any, Optional + +# Duty cycle constants +DUTY_CYCLE_PERIOD = 1200 # 20 minutes in seconds +DUTY_CYCLE_MAX_ON_TIME = 120 # 2 minutes in seconds (10% of 20 minutes) +DUTY_CYCLE_PERCENTAGE = 0.10 # 10% duty cycle +MAX_CONTINUOUS_RUNTIME = 30 # Maximum continuous movement time in seconds + +STATE_FILE = "lifter_state.json" + + +def load_state(): + """Load the current state from file""" + try: + with open(STATE_FILE, "r") as f: + state = json.load(f) + + # Ensure all required keys exist with proper defaults + if "usage_periods" not in state: + state["usage_periods"] = [] + if "last_position" not in state: + state["last_position"] = LOWEST_HEIGHT # Default to minimum height + if "total_up_time" not in state: + state["total_up_time"] = 0.0 + + return state + except FileNotFoundError: + # Return default state if file doesn't exist + return { + "usage_periods": [], + "last_position": LOWEST_HEIGHT, + "total_up_time": 0.0 + } + + +def save_state(state: Dict[str, Any]) -> None: + """Save state to JSON file""" + with open(STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + + +def clean_old_usage_periods(state: Dict[str, Any]) -> Dict[str, Any]: + """Remove usage periods older than the duty cycle period""" + current_time = time.time() + cutoff_time = current_time - DUTY_CYCLE_PERIOD + + # Keep only periods that end after the cutoff time + state["usage_periods"] = [ + period for period in state["usage_periods"] + if period[1] > cutoff_time # period[1] is end_timestamp + ] + return state + + +def get_current_duty_cycle_usage(state: Dict[str, Any]) -> float: + """Calculate current duty cycle usage in the sliding window""" + clean_old_usage_periods(state) + current_time = time.time() + + total_usage = 0.0 + for start_time, end_time, duration in state["usage_periods"]: + # Only count usage that's within the duty cycle period + window_start = current_time - DUTY_CYCLE_PERIOD + + # Adjust start and end times to the current window + effective_start = max(start_time, window_start) + effective_end = min(end_time, current_time) + + if effective_end > effective_start: + total_usage += effective_end - effective_start + + return total_usage + + +def record_usage_period(state: Dict[str, Any], start_time: float, end_time: float, duration: float) -> Dict[str, Any]: + """Record a usage period in the duty cycle tracking""" + state["usage_periods"].append([start_time, end_time, duration]) + return state + + +def check_movement_against_duty_cycle(target_height: float, current_height: Optional[float] = None, up_rate: float = 4.8, down_rate: float = 4.8) -> dict: + """ + Check if a movement to a target height would exceed duty cycle limits. + + Args: + target_height: Target height in mm/inches + current_height: Current height (if None, loads from state) + up_rate: Movement rate upward (mm/s or inches/s) + down_rate: Movement rate downward (mm/s or inches/s) + + Returns: + dict: { + "allowed": bool, + "error": str or None, + "estimated_duration": float, + "current_usage": float, + "remaining_capacity": float, + "movement_type": "UP" or "DOWN", + "distance": float + } + """ + # Load current state + state = load_state() + + if current_height is None: + current_height = state.get("last_position", LOWEST_HEIGHT) + + # Calculate movement requirements + distance = abs(target_height - current_height) + movement_type = "UP" if target_height > current_height else "DOWN" + rate = up_rate if movement_type == "UP" else down_rate + + if distance == 0: + return { + "allowed": True, + "error": None, + "estimated_duration": 0.0, + "current_usage": get_current_duty_cycle_usage(state), + "remaining_capacity": DUTY_CYCLE_MAX_ON_TIME - get_current_duty_cycle_usage(state), + "movement_type": movement_type, + "distance": distance + } + + estimated_duration = distance / rate + + # Check continuous runtime limit + if estimated_duration > MAX_CONTINUOUS_RUNTIME: + return { + "allowed": False, + "error": f"Movement would take {estimated_duration:.1f}s, exceeding {MAX_CONTINUOUS_RUNTIME}s continuous runtime limit", + "estimated_duration": estimated_duration, + "current_usage": get_current_duty_cycle_usage(state), + "remaining_capacity": DUTY_CYCLE_MAX_ON_TIME - get_current_duty_cycle_usage(state), + "movement_type": movement_type, + "distance": distance + } + + # Check duty cycle limits + current_usage = get_current_duty_cycle_usage(state) + remaining_capacity = DUTY_CYCLE_MAX_ON_TIME - current_usage + + if estimated_duration > remaining_capacity: + return { + "allowed": False, + "error": f"Movement would exceed 10% duty cycle limit. Current usage: {current_usage:.1f}s, Remaining: {remaining_capacity:.1f}s in {DUTY_CYCLE_PERIOD:.0f}s window", + "estimated_duration": estimated_duration, + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "movement_type": movement_type, + "distance": distance + } + + return { + "allowed": True, + "error": None, + "estimated_duration": estimated_duration, + "current_usage": current_usage, + "remaining_capacity": remaining_capacity, + "movement_type": movement_type, + "distance": distance + } + + +def get_duty_cycle_status(state: Dict[str, Any]) -> Dict[str, float]: + """Get current duty cycle status information""" + current_usage = get_current_duty_cycle_usage(state) + remaining_time = max(0, DUTY_CYCLE_MAX_ON_TIME - current_usage) + percentage_used = current_usage / DUTY_CYCLE_MAX_ON_TIME * 100 + + return { + "current_usage": current_usage, + "max_usage": DUTY_CYCLE_MAX_ON_TIME, + "remaining_time": remaining_time, + "percentage_used": percentage_used, + "window_period": DUTY_CYCLE_PERIOD + } + + +def show_duty_cycle_status(): + """Display current duty cycle status in a user-friendly format""" + state = load_state() + status = get_duty_cycle_status(state) + current_usage = get_current_duty_cycle_usage(state) + + print("Current Duty Cycle Status:") + print(f" Current usage: {current_usage:.2f}s / {status['max_usage']}s") + print(f" Percentage used: {status['percentage_used']:.2f}%") + print(f" Remaining time: {status['remaining_time']:.2f}s") + print(f" Window period: {status['window_period']}s ({status['window_period']/60:.0f} minutes)") + + if len(state.get("usage_periods", [])) > 0: + print(f" Recent usage periods: {len(state['usage_periods'])}") + print(f" Total up time (all time): {state.get('total_up_time', 0):.1f}s") \ No newline at end of file diff --git a/scripts/generate_movements.py b/scripts/generate_movements.py new file mode 100644 index 0000000..5a608d7 --- /dev/null +++ b/scripts/generate_movements.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +""" +Generate movement configurations based on current duty cycle status. + +This utility checks the current duty cycle usage and generates appropriate +movement configurations that will demonstrate both successful movements +and duty cycle limit protection. +""" + +import json +from desk_controller import check_duty_cycle_status_before_execution, generate_safe_movement_suggestions + +def generate_duty_cycle_test_config(output_file: str = "movement_configs.json"): + """ + Generate movement configurations that will test duty cycle limits. + + Creates movements that: + 1. Respect the 30-second continuous runtime limit + 2. Use available capacity efficiently + 3. Demonstrate successful movements within limits + 4. Show duty cycle protection when limits are exceeded + """ + + print("=== GENERATING MOVEMENT CONFIGS BASED ON CURRENT DUTY CYCLE ===") + + # Check current status + status = check_duty_cycle_status_before_execution() + + remaining = status["remaining_capacity"] + current_pos = status["current_position"] + max_movement_time = status["max_single_movement"] + + # IMPORTANT: Respect 30-second continuous runtime limit + MAX_CONTINUOUS_TIME = 30.0 + max_safe_distance = MAX_CONTINUOUS_TIME * 4.8 # 144 inches + + # But also respect height range [23.7-54.5] + max_range_distance = 54.5 - 23.7 # 30.8 inches + practical_max_distance = min(max_safe_distance, max_range_distance) # 30.8 inches + practical_max_time = practical_max_distance / 4.8 # 6.4 seconds + + print(f"Max distance by continuous runtime: {max_safe_distance:.1f} inches ({MAX_CONTINUOUS_TIME}s)") + print(f"Max distance by height range: {max_range_distance:.1f} inches") + print(f"Practical max distance: {practical_max_distance:.1f} inches ({practical_max_time:.1f}s)") + + # Calculate how many practical movements we can do + full_movements_possible = int(remaining / practical_max_time) + + movements = [] + + if remaining < 5: + print("Very low capacity - generating minimal movements") + movements = [ + { + "id": "minimal_test", + "description": f"Minimal movement due to low capacity ({remaining:.1f}s remaining)", + "target_height": min(current_pos + 2.0, 54.0), + "current_height": current_pos, + "enabled": True + } + ] + else: + print(f"Generating {full_movements_possible + 2} movements to test duty cycle limits") + + # Generate movements that respect both limits + pos = current_pos + + for i in range(full_movements_possible): + # Alternate between small and medium movements within safe range + if i % 2 == 0: + # Medium movement up (within 30.8 inch limit) + distance = min(15.0, 54.0 - pos) # 15 inches = 3.1s + target = min(54.0, pos + distance) + else: + # Medium movement down + distance = min(15.0, pos - 24.0) # 15 inches = 3.1s + target = max(24.0, pos - distance) + + actual_distance = abs(target - pos) + time_est = actual_distance / 4.8 + + movements.append({ + "id": f"success_move_{i+1:02d}", + "description": f"SUCCESS: {pos:.1f}→{target:.1f}\" ({actual_distance:.1f}in = {time_est:.1f}s)", + "target_height": target, + "current_height": pos, + "enabled": True + }) + pos = target + + # Add movements that should fail due to duty cycle (not continuous runtime) + # These will be small enough to pass continuous runtime but exceed duty cycle + movements.extend([ + { + "id": "fail_duty_cycle_1", + "description": f"FAIL: Should exceed duty cycle limit (small movement but no capacity)", + "target_height": min(pos + 10.0, 54.0), # Small 10-inch movement = 2.1s + "current_height": pos, + "enabled": True + }, + { + "id": "fail_duty_cycle_2", + "description": f"FAIL: Should definitely exceed duty cycle limit", + "target_height": max(pos - 10.0, 24.0), # Small 10-inch movement = 2.1s + "current_height": pos, + "enabled": True + } + ]) + + config = {"movements": movements} + + # Save to file + with open(output_file, 'w') as f: + json.dump(config, f, indent=2) + + print(f"\n✅ Generated {len(movements)} movements in {output_file}") + print(f"Expected: {full_movements_possible} successes, {len(movements) - full_movements_possible} duty cycle failures") + print(f"All movements respect 30s continuous runtime limit") + + return config + +if __name__ == "__main__": + generate_duty_cycle_test_config() \ No newline at end of file diff --git a/scripts/lifter_calibration.txt b/scripts/lifter_calibration.txt new file mode 100644 index 0000000..c75dfff --- /dev/null +++ b/scripts/lifter_calibration.txt @@ -0,0 +1,15 @@ +# Desk Lifter Calibration Data + +Lowest height: 23.7 inches +Highest height: 54.5 inches + +Down movement rate: 0.55 inches/second (measured from 54.5" to 49" in 10s) +Up movement rate: 0.55 inches/second (measured from 23.7" to 29.1" in 10s) + +Calibration measurements: +- Down 10s: 54.5" → 49.0" +- Up 10s: 23.7" → 29.1" + +Notes: +- Use these rates to estimate time needed to reach a target height from the lowest position. +- Rates may vary slightly; repeat measurements for higher accuracy if needed. diff --git a/scripts/lifter_state.json b/scripts/lifter_state.json new file mode 100644 index 0000000..d4c18d1 --- /dev/null +++ b/scripts/lifter_state.json @@ -0,0 +1,161 @@ +{ + "last_position": 30.0, + "total_up_time": 184.9553032875061, + "usage_periods": [ + [ + 1763494756.4042995, + 1763494786.4042995, + 30.0 + ], + [ + 1763494956.4042995, + 1763494981.4042995, + 25.0 + ], + [ + 1763495156.4042995, + 1763495176.4042995, + 20.0 + ], + [ + 1763495356.4042995, + 1763495381.4042995, + 25.0 + ], + [ + 1763495856.405774, + 1763495857.0423234, + 0.6365492343902588 + ], + [ + 1763497574.8526337, + 1763497576.6710057, + 1.8183720111846924 + ], + [ + 1763497617.6946945, + 1763497621.3312733, + 3.6365787982940674 + ], + [ + 1763497928.1398637, + 1763497931.776416, + 3.636552333831787 + ], + [ + 1763498408.447813, + 1763498426.9665084, + 18.51869535446167 + ], + [ + 1763498863.0324163, + 1763498890.8104022, + 27.77798581123352 + ], + [ + 1763498890.9123645, + 1763498918.1852903, + 27.272925853729248 + ], + [ + 1763498918.2872386, + 1763498946.0652158, + 27.777977228164673 + ], + [ + 1763498947.3871467, + 1763498954.7947295, + 7.407582759857178 + ], + [ + 1763499185.2448037, + 1763499188.8813705, + 3.6365668773651123 + ], + [ + 1763499952.5517168, + 1763499954.4037511, + 1.852034330368042 + ], + [ + 1763500576.6500714, + 1763500578.5023472, + 1.8522758483886719 + ], + [ + 1763500585.8909063, + 1763500587.7429347, + 1.8520283699035645 + ], + [ + 1763500854.9809864, + 1763500856.8330512, + 1.852064847946167 + ], + [ + 1763500856.934996, + 1763500860.6388907, + 3.703894853591919 + ], + [ + 1763501062.8640413, + 1763501064.716086, + 1.8520445823669434 + ], + [ + 1763501064.817657, + 1763501068.521543, + 3.703886032104492 + ], + [ + 1763501188.6183019, + 1763501195.8912296, + 7.272927761077881 + ], + [ + 1763501195.9933672, + 1763501207.1046762, + 11.111309051513672 + ], + [ + 1763501207.2066073, + 1763501214.4795344, + 7.2729270458221436 + ], + [ + 1763501388.0313075, + 1763501389.883359, + 1.8520514965057373 + ], + [ + 1763501390.1899018, + 1763501419.8197086, + 29.629806756973267 + ], + [ + 1763501504.581304, + 1763501508.2851863, + 3.7038822174072266 + ], + [ + 1763501521.9692283, + 1763501525.6731281, + 3.70389986038208 + ], + [ + 1763501599.0091352, + 1763501602.7130191, + 3.7038838863372803 + ], + [ + 1763502238.5140986, + 1763502242.1506405, + 3.6365418434143066 + ], + [ + 1763502260.9601638, + 1763502264.5967178, + 3.636554002761841 + ] + ] +} \ No newline at end of file diff --git a/scripts/main.py b/scripts/main.py new file mode 100644 index 0000000..9bc7785 --- /dev/null +++ b/scripts/main.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 +""" +Main entry point for the desk control system. + +This script provides a unified interface to all desk control functionality: +- Direct movement control +- Test sequences +- Prefect deployment and automation +- Duty cycle monitoring + +Usage Examples: + python main.py # Interactive CLI + python main.py test # Run test sequence + python main.py deploy # Deploy Prefect automation + python main.py deploy --immediate # Deploy immediate test + python main.py deploy --movements # Deploy custom movements flow + python main.py move # Move between heights + python main.py custom-movements # Execute custom movements from JSON + python main.py status # Show duty cycle status +""" + +import sys +import os + +# Add the scripts directory to the path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# Import from the modular files +from desk_controller import move_to_height, test_sequence, cli_interface, execute_custom_movements +from duty_cycle import get_duty_cycle_status, show_duty_cycle_status, load_state + + +def show_help(): + """Display help information""" + print(__doc__) + + +def main(): + """Main entry point""" + if len(sys.argv) == 1: + # No arguments - run interactive CLI + cli_interface() + + elif sys.argv[1] == "help" or sys.argv[1] == "--help" or sys.argv[1] == "-h": + show_help() + + elif sys.argv[1] == "test": + # Run test sequence + distance = 0.5 # default + rest_time = 10.0 # default + + if len(sys.argv) > 2: + distance = float(sys.argv[2]) + if len(sys.argv) > 3: + rest_time = float(sys.argv[3]) + + result = test_sequence(distance, rest_time) + if not result["success"]: + print("Failed to deploy flows to Prefect Cloud") + sys.exit(1) + + elif sys.argv[1] == "move": + # Direct movement command + if len(sys.argv) < 4: + print("Usage: python main.py move ") + sys.exit(1) + + current_height = float(sys.argv[2]) + target_height = float(sys.argv[3]) + + result = move_to_height(target_height, current_height) + if not result["success"]: + print(f"Movement failed: {result['error']}") + sys.exit(1) + + elif sys.argv[1] == "status": + # Show duty cycle status + show_duty_cycle_status() + + elif sys.argv[1] == "deploy": + # Deploy Prefect flows + if "--immediate" in sys.argv: + from prefect_flows import deploy_test_sequence_immediate + deploy_test_sequence_immediate() + elif "--movements" in sys.argv: + from prefect_flows import deploy_custom_movements_flow + deploy_custom_movements_flow() + else: + from prefect_flows import deploy_test_sequence + deploy_test_sequence() + + elif sys.argv[1] == "custom-movements": + # Execute custom movements from JSON configuration + try: + result = execute_custom_movements() + if result["success"]: + print(f"✅ All movements completed successfully ({result['successful']}/{result['total_movements']})") + else: + print(f"❌ Some movements failed ({result['failed']}/{result['total_movements']} failed)") + for failed in [r for r in result['results'] if not r['success']]: + print(f" - {failed['movement_id']}: {failed['error']}") + sys.exit(1) + except Exception as e: + print(f"❌ Custom movements failed: {e}") + sys.exit(1) + + elif sys.argv[1] == "prefect-test": + # Run test sequence via Prefect + try: + from prefect_flows import custom_test_sequence_flow + + distance = 0.5 # default + rest_time = 10.0 # default + + if len(sys.argv) > 2: + distance = float(sys.argv[2]) + if len(sys.argv) > 3: + rest_time = float(sys.argv[3]) + + custom_test_sequence_flow(distance, rest_time) + + except ImportError as e: + print(f"Prefect not available: {e}") + print("Install with: pip install prefect") + sys.exit(1) + + else: + print(f"Unknown command: {sys.argv[1]}") + print("Run 'python main.py help' for usage information") + sys.exit(1) + + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nOperation cancelled.") + except Exception as e: + print(f"Error: {e}") + sys.exit(1) \ No newline at end of file diff --git a/scripts/movement_configs.json b/scripts/movement_configs.json new file mode 100644 index 0000000..3542eb0 --- /dev/null +++ b/scripts/movement_configs.json @@ -0,0 +1,74 @@ +{ + "movements": [ + { + "id": "step_01", + "description": "Step 1: 30.0->28.0\" (2in down, ~0.4s)", + "target_height": 28.0, + "current_height": 30.0, + "enabled": true + }, + { + "id": "step_02", + "description": "Step 2: 28.0->26.0\" (2in down, ~0.4s)", + "target_height": 26.0, + "current_height": 28.0, + "enabled": true + }, + { + "id": "step_03", + "description": "Step 3: 26.0->24.0\" (2in down, ~0.4s)", + "target_height": 24.0, + "current_height": 26.0, + "enabled": true + }, + { + "id": "step_04", + "description": "Step 4: 24.0->22.0\" (2in down, ~0.4s)", + "target_height": 22.0, + "current_height": 24.0, + "enabled": true + }, + { + "id": "step_05", + "description": "Step 5: 22.0->20.0\" (2in down, ~0.4s)", + "target_height": 20.0, + "current_height": 22.0, + "enabled": true + }, + { + "id": "step_06", + "description": "Step 6: 20.0->18.0\" (2in down, ~0.4s)", + "target_height": 18.0, + "current_height": 20.0, + "enabled": true + }, + { + "id": "step_07", + "description": "Step 7: 18.0->16.0\" (2in down, ~0.4s)", + "target_height": 16.0, + "current_height": 18.0, + "enabled": true + }, + { + "id": "step_08", + "description": "Step 8: 16.0->14.0\" (2in down, ~0.4s)", + "target_height": 14.0, + "current_height": 16.0, + "enabled": true + }, + { + "id": "step_09", + "description": "Step 9: 14.0->12.0\" (2in down, ~0.4s)", + "target_height": 12.0, + "current_height": 14.0, + "enabled": true + }, + { + "id": "step_10", + "description": "Step 10: 12.0->10.0\" (2in down, ~0.4s)", + "target_height": 10.0, + "current_height": 12.0, + "enabled": true + } + ] +} \ No newline at end of file diff --git a/scripts/movement_control.py b/scripts/movement_control.py new file mode 100644 index 0000000..178100c --- /dev/null +++ b/scripts/movement_control.py @@ -0,0 +1,108 @@ +""" +GPIO-based movement control for the desk lifter. + +Handles low-level GPIO operations, pin management, and movement execution. +Provides safe pin control with proper initialization and cleanup. +""" + +import time +from typing import Tuple + +try: + import RPi.GPIO as GPIO + from constants import UP_PIN, DOWN_PIN +except ImportError: + # For testing without actual GPIO hardware + class MockGPIO: + BCM = "BCM" + OUT = "OUT" + IN = "IN" + LOW = 0 + HIGH = 1 + PUD_OFF = "PUD_OFF" + + @staticmethod + def setmode(mode): pass + @staticmethod + def setup(pin, mode, **kwargs): pass + @staticmethod + def cleanup(): pass + + GPIO = MockGPIO() + # Use default pins if constants not available (match constants.py) + UP_PIN = 18 + DOWN_PIN = 17 + + +def setup_gpio() -> None: + """Initialize GPIO settings""" + GPIO.setmode(GPIO.BCM) + + +def release_up() -> None: + """Set UP pin to high-impedance state""" + GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + + +def press_up() -> None: + """Set UP pin to drive low (button pressed)""" + GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) + + +def release_down() -> None: + """Set DOWN pin to high-impedance state""" + GPIO.setup(DOWN_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) + + +def press_down() -> None: + """Set DOWN pin to drive low (button pressed)""" + GPIO.setup(DOWN_PIN, GPIO.OUT, initial=GPIO.LOW) + + +def cleanup_gpio() -> None: + """Clean up GPIO resources""" + release_up() + release_down() + GPIO.cleanup() + + +def move_up(up_time: float) -> Tuple[float, float, float]: + """ + Execute upward movement for specified time + + Returns: + (start_time, end_time, actual_duration) + """ + print(f"Moving UP for {up_time:.1f} seconds...") + + release_up() + start_time = time.time() + press_up() + time.sleep(up_time) + release_up() + end_time = time.time() + actual_duration = end_time - start_time + + print(f"UP movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration + + +def move_down(down_time: float) -> Tuple[float, float, float]: + """ + Execute downward movement for specified time + + Returns: + (start_time, end_time, actual_duration) + """ + print(f"Moving DOWN for {down_time:.1f} seconds...") + + release_down() + start_time = time.time() + press_down() + time.sleep(down_time) + release_down() + end_time = time.time() + actual_duration = end_time - start_time + + print(f"DOWN movement completed: {actual_duration:.1f}s actual") + return start_time, end_time, actual_duration \ No newline at end of file diff --git a/scripts/prefect_flows.py b/scripts/prefect_flows.py new file mode 100644 index 0000000..05de3fb --- /dev/null +++ b/scripts/prefect_flows.py @@ -0,0 +1,368 @@ +""" +Simplified Prefect flows for automated desk control. + +Provides scheduled automation and workflow orchestration using Prefect. +Uses the comprehensive desk_controller.execute_custom_movements() function. +""" + +import time +import os +import sys +from prefect import flow, task +from prefect.logging import get_run_logger + +# Add the scripts directory to Python path for imports +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# Import our modular components +from desk_controller import ( + move_to_height, + test_sequence, + LOWEST_HEIGHT, + execute_custom_movements, + check_duty_cycle_status_before_execution +) +from duty_cycle import show_duty_cycle_status, get_duty_cycle_status, load_state + + +@task +def log_info(message: str): + """Log information message""" + print(message) + + +@task +def duty_cycle_status_task(): + """ + Check duty cycle status as a Prefect task. + Reuses existing check_duty_cycle_status_before_execution() from desk_controller. + """ + logger = get_run_logger() + + try: + # Use the existing function - no need to reimplement + status = check_duty_cycle_status_before_execution() + + # Log for Prefect monitoring + logger.info(f"Duty cycle check completed:") + logger.info(f" Usage: {status['current_usage']:.1f}s / 120s ({status['percentage_used']:.1f}%)") + logger.info(f" Remaining: {status['remaining_capacity']:.1f}s") + logger.info(f" Position: {status['current_position']}\"") + + return status + + except Exception as e: + logger.error(f"Failed to check duty cycle status: {e}") + raise + + +@task +def execute_movement(target_height: float, current_height: float = None): + """Execute a single movement as a Prefect task""" + logger = get_run_logger() + + try: + result = move_to_height(target_height, current_height) + + if result["success"]: + logger.info(f"Movement successful: {result}") + return result + else: + logger.error(f"Movement failed: {result['error']}") + raise ValueError(result["error"]) + + except Exception as e: + logger.error(f"Movement execution failed: {e}") + raise + + +@task +def execute_custom_movements_task(config_file: str = "movement_configs.json"): + """ + Execute custom movements from configuration file as a Prefect task. + + This is a thin wrapper around desk_controller.execute_custom_movements() + which already handles all the complexity: + - Loading movement configs + - Pre-execution duty cycle checking + - Movement validation + - Movement execution + - Post-execution status reporting + """ + logger = get_run_logger() + logger.info(f"Executing custom movements from {config_file}") + + try: + # This function does EVERYTHING - no need for separate loading/validation tasks + result = execute_custom_movements(config_file) + + if result["success"]: + logger.info(f"✅ All movements completed successfully ({result['successful']}/{result['total_movements']})") + else: + logger.info(f"⚠️ Movements completed with some failures ({result['failed']}/{result['total_movements']} failed)") + + return result + except Exception as e: + logger.error(f"❌ Custom movements execution failed: {e}") + raise + + +@task +def execute_test_sequence(movement_distance: float = 0.5, rest_time: float = 10.0): + """Execute test sequence as a Prefect task""" + logger = get_run_logger() + + try: + result = test_sequence(movement_distance, rest_time) + + if result["success"]: + logger.info(f"Test sequence successful: {result}") + return result + else: + logger.error(f"Test sequence failed: {result.get('error', 'Unknown error')}") + raise ValueError(result.get("error", "Test sequence failed")) + + except Exception as e: + logger.error(f"Test sequence execution failed: {e}") + raise + + +# ============================================================================= +# FLOWS +# ============================================================================= + +@flow +def simple_movement_flow(target_height: float, current_height: float = None): + """Simple Prefect flow for moving to a specific height with duty cycle checking""" + logger = get_run_logger() + logger.info(f"=== SIMPLE MOVEMENT FLOW ===") + logger.info(f"Target: {target_height}\", Current: {current_height}\"") + + # Check duty cycle status using existing function + initial_status = duty_cycle_status_task() + + # Abort if insufficient capacity + if initial_status["remaining_capacity"] < 1.0: + logger.error("❌ MOVEMENT ABORTED: Insufficient duty cycle capacity") + raise ValueError("Insufficient duty cycle capacity - must wait for reset") + + # Execute the movement + result = execute_movement(target_height, current_height) + + # Check final duty cycle status + final_status = duty_cycle_status_task() + + # Log usage + capacity_used = initial_status["remaining_capacity"] - final_status["remaining_capacity"] + logger.info(f"Movement completed - Duty cycle used: {capacity_used:.1f}s") + + return { + **result, + "initial_duty_status": initial_status, + "final_duty_status": final_status, + "capacity_used": capacity_used + } + + +@flow +def custom_movements_flow(config_file: str = "movement_configs.json"): + """ + Simplified Prefect flow to execute custom movements. + + Uses the comprehensive desk_controller.execute_custom_movements() function + which already includes all necessary features internally. + """ + logger = get_run_logger() + logger.info("=== CUSTOM MOVEMENTS FLOW ===") + + # Execute custom movements - this function already does all the duty cycle checking + result = execute_custom_movements_task(config_file) + + logger.info("Custom movements flow completed") + return result + + +@flow +def duty_cycle_monitoring_flow(): + """ + Simplified duty cycle monitoring flow. + Uses existing duty cycle checking functions. + """ + logger = get_run_logger() + logger.info("=== DUTY CYCLE MONITORING FLOW ===") + + # Use existing duty cycle status function + status = duty_cycle_status_task() + + # Simple recommendation logic + remaining = status["remaining_capacity"] + + if remaining < 5: + recommendation = "wait" + logger.warning("⚠️ VERY LOW CAPACITY - Recommend waiting for duty cycle reset") + elif remaining < 15: + recommendation = "small_movements_only" + logger.warning("⚠️ LOW CAPACITY - Use small movements only") + elif remaining < 60: + recommendation = "moderate_planning" + logger.info("✅ MODERATE CAPACITY - Plan movements carefully") + else: + recommendation = "normal_operations" + logger.info("✅ GOOD CAPACITY - Normal operations possible") + + return { + "status": status, + "recommendation": recommendation, + "operational_mode": recommendation + } + + +@flow +def scheduled_duty_cycle_check(): + """ + Scheduled duty cycle monitoring using existing functions. + Just wraps duty_cycle_monitoring_flow for scheduled execution. + """ + logger = get_run_logger() + logger.info("=== SCHEDULED DUTY CYCLE CHECK ===") + + # Use the monitoring flow + result = duty_cycle_monitoring_flow() + + # Log summary for scheduled monitoring + status = result["status"] + logger.info(f"Scheduled duty cycle check:") + logger.info(f" Usage: {status['current_usage']:.1f}s / 120s ({status['percentage_used']:.1f}%)") + logger.info(f" Mode: {result['recommendation']}") + + # Alert on very low capacity + if status["remaining_capacity"] < 10: + logger.warning("🚨 ALERT: Very low duty cycle capacity remaining!") + + return result + + +@flow +def test_sequence_flow(movement_distance: float = 0.5, rest_time: float = 10.0): + """Prefect flow for automated test sequence""" + logger = get_run_logger() + logger.info(f"=== TEST SEQUENCE FLOW ===") + logger.info(f"Distance: {movement_distance}\", Rest: {rest_time}s") + + # Check duty cycle before starting using existing function + initial_status = duty_cycle_status_task() + + # Execute test sequence + result = execute_test_sequence(movement_distance, rest_time) + + # Check final status + final_status = duty_cycle_status_task() + + logger.info("Test sequence flow completed") + return { + **result, + "initial_duty_status": initial_status, + "final_duty_status": final_status + } + + +# ============================================================================= +# DEPLOYMENT FUNCTIONS +# ============================================================================= + +def deploy_custom_movements_flow(deployment_name: str = "custom-movements"): + """Deploy the main custom movements flow""" + + deployment = custom_movements_flow.from_source( + source=".", + entrypoint="prefect_flows.py:custom_movements_flow", + ).deploy( + name=deployment_name, + work_pool_name="default-process-pool", + ) + + print(f"✅ Deployment '{deployment_name}' created!") + print(f"To run: prefect deployment run 'custom-movements-flow/{deployment_name}'") + return deployment_name + + +def deploy_duty_cycle_monitoring(deployment_name: str = "duty-cycle-monitor", schedule_cron: str = None): + """Deploy duty cycle monitoring flow with optional scheduling""" + + deploy_kwargs = { + "name": deployment_name, + "work_pool_name": "default-process-pool", + } + + if schedule_cron: + from prefect.client.schemas.schedules import CronSchedule + deploy_kwargs["schedule"] = CronSchedule(cron=schedule_cron) + print(f"Deploying with cron schedule: {schedule_cron}") + + deployment = scheduled_duty_cycle_check.from_source( + source=".", + entrypoint="prefect_flows.py:scheduled_duty_cycle_check", + ).deploy(**deploy_kwargs) + + print(f"✅ Deployment '{deployment_name}' created!") + if schedule_cron: + print(f"Scheduled to run: {schedule_cron}") + else: + print(f"To run: prefect deployment run 'scheduled-duty-cycle-check/{deployment_name}'") + return deployment_name + + +def deploy_test_sequence(deployment_name: str = "test-sequence"): + """Deploy test sequence flow""" + + deployment = test_sequence_flow.from_source( + source=".", + entrypoint="prefect_flows.py:test_sequence_flow", + ).deploy( + name=deployment_name, + work_pool_name="default-process-pool", + ) + + print(f"✅ Deployment '{deployment_name}' created!") + print(f"To run: prefect deployment run 'test-sequence-flow/{deployment_name}'") + return deployment_name + + +def deploy_all_flows(): + """Deploy all desk control flows""" + print("=== DEPLOYING ALL SIMPLIFIED DESK CONTROL FLOWS ===") + + # Deploy main flows + deploy_custom_movements_flow() + deploy_test_sequence() + + # Deploy monitoring flows + deploy_duty_cycle_monitoring("duty-cycle-monitor-scheduled", "*/10 * * * *") + deploy_duty_cycle_monitoring("duty-cycle-monitor-immediate") + + print("\n🎉 All deployments created!") + print("\nAvailable flows:") + print(" 1. custom-movements - Main movement execution") + print(" 2. test-sequence - Automated test sequence") + print(" 3. duty-cycle-monitor-scheduled - Auto monitoring (every 10min)") + print(" 4. duty-cycle-monitor-immediate - On-demand monitoring") + + return True + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + if sys.argv[1] == "test": + test_sequence_flow() + elif sys.argv[1] == "movements": + custom_movements_flow() + elif sys.argv[1] == "monitor": + duty_cycle_monitoring_flow() + elif sys.argv[1] == "deploy": + deploy_all_flows() + else: + print("Usage: python prefect_flows.py [test|movements|monitor|deploy]") + else: + custom_movements_flow() \ No newline at end of file diff --git a/scripts/test_up.py b/scripts/test_up.py deleted file mode 100644 index 1078c4c..0000000 --- a/scripts/test_up.py +++ /dev/null @@ -1,33 +0,0 @@ -# test_up.py -import time -import RPi.GPIO as GPIO - -UP_PIN = 17 # BCM numbering, physical pin 11 - -GPIO.setmode(GPIO.BCM) - -def release_up(): - # High-impedance: "button not pressed" - GPIO.setup(UP_PIN, GPIO.IN, pull_up_down=GPIO.PUD_OFF) - -def press_up(): - # Drive low: "button pressed" - GPIO.setup(UP_PIN, GPIO.OUT, initial=GPIO.LOW) - -try: - # Make sure we're released at start - release_up() - print("Ready. Desk should NOT move yet.") - - input("Press Enter to move UP for 2 seconds...") - - print("Moving UP...") - press_up() - time.sleep(2.0) # adjust if you want a shorter/longer test - release_up() - print("Released UP. Test finished.") - -finally: - # Ensure we leave the line released - release_up() - GPIO.cleanup()