diff --git a/stack/motors/src/trunk_motors/trunk_motors/launch/launch_motors.py b/stack/motors/src/trunk_motors/trunk_motors/launch/launch_motors.py index 10c9cec5..6d2d7c84 100644 --- a/stack/motors/src/trunk_motors/trunk_motors/launch/launch_motors.py +++ b/stack/motors/src/trunk_motors/trunk_motors/launch/launch_motors.py @@ -2,10 +2,18 @@ from ament_index_python.packages import get_package_share_directory from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration from launch_ros.actions import Node def generate_launch_description(): return LaunchDescription([ + # Declare launch argument with default 'true' + DeclareLaunchArgument( + 'secure_mode', + default_value='true', + description='Enable or disable secure mode for motor_node' + ), Node( package='trunk_motors', executable='motor_node.py', @@ -16,7 +24,8 @@ def generate_launch_description(): {'kP': 800.0}, {'kI': 0.0}, {'kD': 0.0}, - {'curr_lim': 3000.0} + {'curr_lim': 3000.0}, + {'secure_mode': LaunchConfiguration('secure_mode')} ] ) ]) diff --git a/stack/motors/src/trunk_motors/trunk_motors/scripts/motor_node.py b/stack/motors/src/trunk_motors/trunk_motors/scripts/motor_node.py index 14180943..738b51df 100755 --- a/stack/motors/src/trunk_motors/trunk_motors/scripts/motor_node.py +++ b/stack/motors/src/trunk_motors/trunk_motors/scripts/motor_node.py @@ -2,28 +2,37 @@ import rclpy from rclpy.node import Node +from rclpy.qos import QoSProfile +from rclpy.callback_groups import ReentrantCallbackGroup import numpy as np -import time -from std_msgs.msg import Float64MultiArray +# import time +# from std_msgs.msg import Float64MultiArray from motor_utils.dynamixel_client_motor import DynamixelClient -from interfaces.msg import AllMotorsControl +from interfaces.msg import AllMotorsControl, TrunkRigidBodies from interfaces.msg import AllMotorsStatus class MotorNode(Node): def __init__(self): super().__init__('motor_node') - self.control_augmented = False # True if running Paul's control augmented data collected, False else + self.declare_parameters(namespace='', parameters=[ + ('secure_mode', True) + ]) - if self.control_augmented: - # self.rest_positions = np.array([196.0, 201.0, 193.0, 210.0, 202.0, 197]) update these - self.motor_ids = [1, 2, 3, 4, 5, 6] - else: - self.rest_positions = np.array([193.0, 189.0, 186.0, 183.0, 187.0, 204]) # CHANGE THIS WHENEVER TENDONS ARE RE-TENSIONED - self.motor_ids = [1, 2, 3, 4, 5, 6] # all 6 trunk motors + self.MPC_SECURITY_MODE = self.get_parameter('secure_mode').value # True if MPC is running + + # Execution occurs in multiple threads + self.callback_group = ReentrantCallbackGroup() + + # CHANGE THIS WHENEVER TENDONS ARE RE-TENSIONED + self.rest_positions = np.array([193.0, 189.0, 186.0, 183.0, 187.0, 204.0]) + self.motor_ids = [1, 2, 3, 4, 5, 6] # all 6 trunk motors - # Define a safe region to operate the motors in: + # Define a safe region to operate the motors in (position and velocity): self.limits_safe = np.array([51, 81, 31, 81, 31, 51]) + self.delta_limits_safe = np.array([5.0, 5.0, 5.0, 5.0, 5.0, 5.0]) + + self.last_motor_positions = None # initialize motors client self.dxl_client = DynamixelClient(motor_ids=self.motor_ids, port='/dev/ttyUSB0') @@ -40,17 +49,33 @@ def __init__(self): self.controls_subscriber = self.create_subscription( AllMotorsControl, '/all_motors_control', - self.command_positions, # callback function - 10 + self.command_positions, # callback function + 10, + callback_group=self.callback_group ) + if self.MPC_SECURITY_MODE: + # Subscribe to current positions + self.mocap_subscription = self.create_subscription( + TrunkRigidBodies, + '/trunk_rigid_bodies', + self.mocap_listener_callback, + QoSProfile(depth=3), + callback_group=self.callback_group + ) + # create status publisher self.status_publisher = self.create_publisher( AllMotorsStatus, '/all_motors_status', 10 ) - self.timer = self.create_timer(1.0/100, self.read_status) # publish at 100Hz + + self.rest_position_trunk = np.array([0.10753094404935837, -0.11212190985679626, 0.10474388301372528, + 0.10156622529029846, -0.20444495975971222, 0.11144950985908508, + 0.10224875807762146, -0.3151078522205353, 0.10935673117637634]) + + self.timer = self.create_timer(1.0/100, self.read_status) # publish at 100Hz # read out initial positions self.get_logger().info('Initial motor status: ') @@ -58,26 +83,42 @@ def __init__(self): positions_raw = positions + self.rest_positions for idx, id in enumerate(self.motor_ids): - self.get_logger().info(f"Motor {id} position: {positions[idx]:.2f} degrees") # display in degrees + self.get_logger().info(f"Motor {id} position: {positions[idx]:.2f} degrees") # display in degrees for idx, id in enumerate(self.motor_ids): - self.get_logger().info(f"Motor {id} raw position: {positions_raw[idx]:.2f} degrees") # display in degrees + self.get_logger().info(f"Motor {id} raw position: {positions_raw[idx]:.2f} degrees") # display in degrees self.get_logger().info('Motor control node initialized!') - def command_positions(self, msg): # commands new positions to the motors positions = msg.motors_control positions = np.array(positions) + if self.last_motor_positions is None: + delta_positions = np.zeros_like(positions) + else: + delta_positions = positions - self.last_motor_positions + + # calculate relative position change + mask_delta_low = delta_positions < -self.delta_limits_safe + mask_delta_high = delta_positions > self.delta_limits_safe + # inputs from ROS message are zero centered, need to center them about rest positions before sending to motor mask_low = positions < -self.limits_safe mask_high = positions > self.limits_safe - if np.any(mask_low | mask_high): + + if self.MPC_SECURITY_MODE: + print(f"Positions: {positions}, Delta Positions: {delta_positions}") + + if np.any(mask_low | mask_high) or self.MPC_SECURITY_MODE and np.any(mask_delta_low | mask_delta_high): bad_idxs = np.where(mask_low | mask_high)[0] + bad_idxs_delta = np.where(mask_delta_low | mask_delta_high)[0] bad_vals = positions[bad_idxs] + bad_vals_delta = delta_positions[bad_idxs_delta] + self.get_logger().error( f"Unsafe motor commands at indices {bad_idxs.tolist()}: {bad_vals.tolist()}. " + f"Unsafe delta commands at indices {bad_idxs_delta.tolist()}: {bad_vals_delta.tolist()}. " "Shutting down without sending to motors." ) @@ -87,31 +128,55 @@ def command_positions(self, msg): # signal ROS to exit rclpy.shutdown() return - - positions += self.rest_positions # inputs from ROS message are zero centered, need to center them about rest positions before sending to motor - positions *= np.pi/180 # receives a position in degrees, convert to radians for dynamixel sdk + self.last_motor_positions = positions.copy() + # inputs from ROS message are zero centered, need to center them about rest positions before sending to motor + positions += self.rest_positions + + positions *= np.pi/180 # receives a position in degrees, convert to radians for dynamixel sdk + self.dxl_client.write_desired_pos(self.motor_ids, positions) - # for idx, id in enumerate(self.motor_ids): - # self.get_logger().info(f"commanded motor {id} to {positions[idx]*180/np.pi:.2f} degrees") - + def mocap_listener_callback(self, msg): + """ + Callback to process mocap data, updating the latest observation. + """ + # Unpack the message into simple list of positions, eg [x1, y1, z1, x2, y2, z2, ...] + y_new = np.array([coord for pos in msg.positions for coord in [pos.x, pos.y, pos.z]]) + y_centered = y_new - self.rest_position_trunk + + # Subselect tip + y_centered_tip = y_centered[-3:] + + if np.linalg.norm(y_centered_tip) > 0.1: + self.get_logger().error( + f"Unsafe trunk position at value commands at indices {np.linalg.norm(y_centered_tip)}. " + "Shutting down the motor node." + ) + + # clean up torque + disconnect + self.shutdown() + + # signal ROS to exit + rclpy.shutdown() + return + + return def read_status(self): # reads and publishes the motor position, velocity, and current positions, velocities, currents = self.dxl_client.read_pos_vel_cur() - positions *= 180/np.pi # dynamixel position in rad, convert to degrees - positions -= self.rest_positions # motor sends real positions, we want to read zero centered positions + positions *= 180/np.pi # dynamixel position in rad, convert to degrees + positions -= self.rest_positions # motor sends real positions, we want to read zero centered positions msg = AllMotorsStatus() msg.positions = positions.tolist() - msg.velocities = velocities.tolist() # TODO: determine if in rpm (should be) - msg.currents = currents.tolist() # TODO: determine if in mA (should be) + msg.velocities = velocities.tolist() # TODO: determine if in rpm (should be) + msg.currents = currents.tolist() # TODO: determine if in mA (should be) self.status_publisher.publish(msg) return positions - def shutdown(self): # cleanup before shutdown @@ -119,31 +184,11 @@ def shutdown(self): self.dxl_client.set_torque_enabled(self.motor_ids, False) self.dxl_client.disconnect() + def main(): rclpy.init() node = MotorNode() try: - # checks to see maximum read and write rate of the dynamixels ~20Hz read, ~124Hz write - # # READ rate check - # dxl = node.dxl_client - # dxl.motor_ids = [1, 2, 3, 4, 5, 6] - # N = 100 - # start = time.time() - # for _ in range(N): - # dxl.read_pos_vel_cur() - # end = time.time() - # node.get_logger().info(f"READ avg rate: {N / (end - start):.2f} Hz") - - # # WRITE rate check - # positions = [198.0, 204.0, 189.0, 211.0, 200.0, 192.0] - # positions = [np.pi/180 * pos for pos in positions] # receives a position in degrees, convert to radians for dynamixel sdk - # start = time.time() - # for _ in range(N): - # dxl.write_desired_pos(dxl.motor_ids, np.array(positions)) - - # end = time.time() - # node.get_logger().info(f"WRITE avg rate: {N / (end - start):.2f} Hz") - rclpy.spin(node) except KeyboardInterrupt: pass