diff --git a/configs/ilqr.yaml b/configs/ilqr.yaml index 46c294a..73e4916 100644 --- a/configs/ilqr.yaml +++ b/configs/ilqr.yaml @@ -9,7 +9,7 @@ platform: "cpu" # "cpu" or "gpu" or "tpu" ###########Optimization Parameters ################# #################################################### max_iter: 50 # maximum number of iterations -# tolerance for the iLQR convergence +# tolerance for the ILQR convergence # Make sure this is smaller than the minimum line search step size tol: 0.05 @@ -46,7 +46,7 @@ a_max: 5.0 # maximum acceleration a_min: -5.0 # minimum acceleration #################################################### -########## Parameters for iLQR COST ################ +########## Parameters for ILQR COST ################ #################################################### ######## State Cost ############ diff --git a/configs/ilqr_avodiance.yaml b/configs/ilqr_avodiance.yaml index 05c54f6..d3834c6 100644 --- a/configs/ilqr_avodiance.yaml +++ b/configs/ilqr_avodiance.yaml @@ -10,7 +10,7 @@ dt: 0.1 # time step ###########Optimization Parameters ################# #################################################### max_iter: 50 # maximum number of iterations -# tolerance for the iLQR convergence +# tolerance for the ILQR convergence # Make sure this is smaller than the minimum line search step size tol: 1e-3 @@ -29,7 +29,7 @@ reg_init: 1.0 # initial regularization #################################################### -########## Parameters for iLQR COST ################ +########## Parameters for ILQR COST ################ #################################################### ######## State Cost ############ diff --git a/configs/lab1.yaml b/configs/lab1.yaml index a513ae2..7e9ca65 100644 --- a/configs/lab1.yaml +++ b/configs/lab1.yaml @@ -12,7 +12,7 @@ platform: "cpu" # "cpu" or "gpu" or "tpu" ###########Optimization Parameters ################# #################################################### max_iter: 50 # maximum number of iterations -# tolerance for the iLQR convergence +# tolerance for the ILQR convergence # Make sure this is smaller than the minimum line search step size tol: 0.05 @@ -60,7 +60,7 @@ a_min: -5.0 # minimum acceleration # reference velocity v_ref: 1 # reference velocity #################################################### -########## Parameters for iLQR COST ################ +########## Parameters for ILQR COST ################ #################################################### ######## State Cost ############ diff --git a/scripts/planner/iLQR/__init__.py b/scripts/planner/iLQR/__init__.py deleted file mode 100644 index 54f735b..0000000 --- a/scripts/planner/iLQR/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .ref_path import RefPath -from .ilqr import iLQR -from .ilqr_np import iLQRnp \ No newline at end of file diff --git a/scripts/planner/iLQR/config.py b/scripts/planner/iLQR/config.py deleted file mode 100644 index e1b663e..0000000 --- a/scripts/planner/iLQR/config.py +++ /dev/null @@ -1,134 +0,0 @@ -import yaml - -class Config(): - def __init__(self): - - #################################################### - ############### General Parameters ################# - #################################################### - self.num_dim_x = 5 - self.num_dim_u = 2 - self.T = 10 # horizon length - self.dt = 0.1 # time step - self.platform = "cpu" # "cpu" or "gpu" or "tpu" - - #################################################### - ###########Optimization Parameters ################# - #################################################### - self.max_iter = 50 # maximum number of iterations - # tolerance for the iLQR convergence - # Make sure this is smaller than the minimum line search step size - self.tol = 0.05 - - # line search parameters - # We assume line search parameter is base**np.arange(a, b, c) - self.line_search_base = 0.1 - self.line_search_a = -1 # line search parameter a - self.line_search_b = 3 # line search parameter b - self.line_search_c = 1 # line search parameter c - - # regularization parameters - self.reg_min = 1e-5 # minimum regularization - self.reg_max = 1e8 # maximum regularization - self.reg_scale_down = 5 # scale down factor for regularization - self.reg_scale_up = 5 # scale up factor for regularization - self.reg_init = 1.0 # initial regularization - - # max number of re-attempts after unsuccessful line search - self.max_attempt = 5 - - #################################################### - ############### Dynamics Parameters ################ - #################################################### - self.wheelbase = 0.257 # wheelbase of the vehicle - self.radius = 0.13 # radius of the vehicle - self.width = 0.22 # width of the vehicle - self.length = 0.40 # length of the vehicle - - # steering angle limits - self.delta_max = 0.35 # maximum steering angle - self.delta_min = -0.35 # minimum steering angle - - # velocity limits - self.v_max = 5.0 # maximum velocity - self.v_min = 0.0 # minimum velocity - - # turn rate limits - self.omega_min = -6.0 # minimum turn rate - self.omega_max = 6.0 # maximum turn rate - - # acceleration limits - self.a_max = 5.0 # maximum acceleration - self.a_min = -5.0 # minimum acceleration - - # reference velocity - self.v_ref = 1.0 # reference velocity - #################################################### - ########## Parameters for iLQR COST ################ - #################################################### - - ######## State Cost ############ - - # Path Offset Cost - self.dim_closest_pt_x = 0 # dimension of closest point x in the reference - self.dim_closest_pt_y = 1 # dimension of closest point y in the reference - self.dim_path_slope = 2 # dimension of path slope in the reference - self.path_cost_type = 'quadratic' # 'quadratic' or 'huber' - self.path_weight = 5.0 # weight for the path deviation cost - self.path_huber_delta = 2 # huber loss delta for path deviation cost - - # Velocity Cost - self.vel_cost_type = 'quadratic' # 'quadratic' or 'huber' - self.vel_weight = 2.0 # weight for the velocity cost - self.vel_huber_delta = 1 # huber loss delta for velocity cost - - # Speed Limit Cost - self.dim_vel_limit = 3 # dimension of reference velocity in the reference - self.vel_limit_a = 10.0 # parameter for speed limit cost - self.vel_limit_b = 1.0 # parameter for ExpLinear Cost - - # Heading Cost - self.heading_cost_type = 'quadratic' # 'quadratic' or 'huber' - self.heading_weight = 1 # weight for the heading cost - self.heading_huber_delta = 1 # huber loss delta for heading cost - - # Lateral Acceleration Cost - # We use ExpLinearCost for lateral acceleration cost - self.lat_accel_thres = 6.0 # threshold for lateral acceleration cost - self.lat_accel_a = 5.0 # parameter for lateral acceleration cost - self.lat_accel_b = 2.0 # parameter for ExpLinear Cost - - # Progress Cost - self.dim_progress = 4 # dimension of progress in the reference - self.progress_weight = 0 # weight for the progress cost - - # Lane Boundary Cost - self.dim_right_boundary = 5 # dimension of lane boundary in the reference - self.dim_left_boundary = 6 # dimension of lane boundary in the reference - self.lane_boundary_a = 30.0 # parameter for lane boundary cost - self.lane_boundary_b = 2.0 # parameter for ExpLinear Cost - - ######## Control Cost ############ - - self.ctrl_cost_type = 'quadratic' # 'quadratic' or 'huber' - # those value should not be too small - self.ctrl_cost_accel_weight = 0.5 - self.ctrl_cost_steer_weight = 0.3 - self.ctrl_cost_accel_huber_delta = 1.0 # huber loss delta - self.ctrl_cost_steer_huber_delta = 1.0 # huber loss delta - - ######## Obstacle Cost ############ - self.obs_a = 10.0 # parameter for obstacle cost - self.obs_b = 10.0 # parameter for ExpLinear Cost - - def load_config(self, config_path): - with open(config_path, 'r') as f: - config_dict = yaml.load(f, Loader=yaml.FullLoader) - - config_varible = vars(self) - for key in config_varible.keys(): - if key in config_dict: - setattr(self, key, config_dict[key]) - - def __str__(self) -> str: - return 'iLQR config: ' + str(vars(self)) \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/__init__.py b/scripts/planner/iLQR/cost/__init__.py deleted file mode 100644 index ba8268d..0000000 --- a/scripts/planner/iLQR/cost/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .cost import Cost -from .collision_checker import CollisionChecker, Obstacle \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/base_cost.py b/scripts/planner/iLQR/cost/base_cost.py deleted file mode 100644 index 298783a..0000000 --- a/scripts/planner/iLQR/cost/base_cost.py +++ /dev/null @@ -1,237 +0,0 @@ -from abc import ABC, abstractmethod -from functools import partial -from typing import Union -from jaxlib.xla_extension import DeviceArray -import jax -from jax import numpy as jnp - -class BaseCost(ABC): - ''' - Base class for cost functions. - ''' - def __init__(self): - super().__init__() - - @abstractmethod - def get_running_cost( - self, state: DeviceArray, ctrl: DeviceArray, ref: DeviceArray - ) -> float: - ''' - Abstract method for computing the cost at a single timestep. - Input: - state: (dim_x) - ctrl: (dim_u) - ref: (dim_ref) - return: - cost: float - ''' - raise NotImplementedError - - - # Note: We assume that the Cost is in the Lagrange problem form. - # That is the Terminal cost is 0 - # A little bit abusing the notation, but it is convenient for the implementation, - # We assume there might be a "state/control independent" terminal cost. - # In this case, we need to careful convert it to the Lagrange form and add it to the running cost. - # See progress cost of State Cost for an example. - - @classmethod - @partial(jax.jit, static_argnums=(0,)) - def get_terminal_cost( - self, ref: DeviceArray - ) -> float: - return 0 - - @partial(jax.jit, static_argnums=(0,)) - def get_traj_cost( - self, trajectory: DeviceArray, controls: DeviceArray, path_refs: DeviceArray - ) -> float: - ''' - Given a state, control, and time index, return the sum of the cost. - Input: - trajectory: (dim_x, N) List of trajectory - controls: (dim_u, N) List of controls - path_refs: (dim_ref, N) List of references - time_indices: (1, N) List of time indices - return: - cost: float - ''' - running_costs = jax.vmap(self.get_running_cost, - in_axes=(1, 1, 1))(trajectory, controls, path_refs) - terminal_cost = self.get_terminal_cost(path_refs) - return jnp.sum(running_costs).astype(float) + terminal_cost - - @partial(jax.jit, static_argnums=(0,)) - def get_derivatives_jax( - self, trajectory: DeviceArray, controls: DeviceArray, path_refs: DeviceArray - ) -> DeviceArray: - return ( - self.get_cx(trajectory, controls, path_refs), - self.get_cu(trajectory, controls, path_refs), - self.get_cxx(trajectory, controls, path_refs), - self.get_cuu(trajectory, controls, path_refs), - self.get_cux(trajectory, controls, path_refs), - ) - - @partial(jax.jit, static_argnums=(0,)) - def get_cx( - self, trajectory: DeviceArray, controls: DeviceArray, path_refs: DeviceArray - ) -> DeviceArray: - ''' - Given a state, control, and time index, return the jacobian of cost w.r.t the state. - Input: - trajectory: (dim_x, N) List of trajectory - controls: (dim_u, N) List of controls - path_refs: (dim_ref, N) List of references - time_indices: (1, N) List of time indices - return: - dc_dx: (dim_x, N) - ''' - _cx = jax.jacfwd(self.get_running_cost, argnums=0) - return jax.vmap(_cx, in_axes=(1, 1, 1), - out_axes=1)(trajectory, controls, path_refs) - - @partial(jax.jit, static_argnums=(0,)) - def get_cu( - self, trajectory: DeviceArray, controls: DeviceArray, path_refs: DeviceArray - ) -> DeviceArray: - ''' - Given a state, control, and time index, return the jacobian of cost w.r.t the control. - Input: - trajectory: (dim_x, N) List of trajectory - controls: (dim_u, N) List of controls - path_refs: (dim_ref, N) List of references - time_indices: (1, N) List of time indices - return: - dc_du: (dim_u, N) - ''' - - _cu = jax.jacfwd(self.get_running_cost, argnums=1) - return jax.vmap(_cu, in_axes=(1, 1, 1), - out_axes=1)(trajectory, controls, path_refs) - - @partial(jax.jit, static_argnums=(0,)) - def get_cxx( - self, trajectory: DeviceArray, controls: DeviceArray, path_refs: DeviceArray - ) -> DeviceArray: - ''' - Given a state, control, and time index, return the hessian of cost w.r.t the state. - Input: - trajectory: (dim_x, N) List of trajectory - controls: (dim_u, N) List of controls - path_refs: (dim_ref, N) List of references - time_indices: (1, N) List of time indices - return: - d^2c_dx^2: (dim_x, dim_x, N) - ''' - _cxx = jax.jacfwd(jax.jacrev(self.get_running_cost, argnums=0), argnums=0) - return jax.vmap(_cxx, in_axes=(1, 1, 1), - out_axes=2)(trajectory, controls, path_refs) - - @partial(jax.jit, static_argnums=(0,)) - def get_cuu( - self, trajectory: DeviceArray, controls: DeviceArray, path_refs: DeviceArray - ) -> DeviceArray: - ''' - Given a state, control, and time index, return the hessian of cost w.r.t the control. - Input: - trajectory: (dim_x, N) List of trajectory - controls: (dim_u, N) List of controls - path_refs: (dim_ref, N) List of references - time_indices: (1, N) List of time indices - return: - d^2c_du^2: (dim_u, dim_u, N) - ''' - _cuu = jax.jacfwd(jax.jacrev(self.get_running_cost, argnums=1), argnums=1) - return jax.vmap(_cuu, in_axes=(1, 1, 1), - out_axes=2)(trajectory, controls, path_refs) - - @partial(jax.jit, static_argnums=(0,)) - def get_cux( - self, trajectory: DeviceArray, controls: DeviceArray, path_refs: DeviceArray - ) -> DeviceArray: - ''' - Given a state, control, and time index, return the hessian of cost w.r.t the control and state. - Input: - trajectory: (dim_x, N) List of trajectory - controls: (dim_u, N) List of controls - path_refs: (dim_ref, N) List of references - time_indices: (1, N) List of time indices - return: - d^2c_dux: (dim_u, dim_x, N) - ''' - _cux = jax.jacfwd(jax.jacrev(self.get_running_cost, argnums=1), argnums=0) - return jax.vmap(_cux, in_axes=(1, 1, 1), - out_axes=2)(trajectory, controls, path_refs) - - @partial(jax.jit, static_argnums=(0,)) - def get_cxu( - self, trajectory: DeviceArray, controls: DeviceArray, path_refs: DeviceArray - ) -> DeviceArray: - ''' - Given a state, control, and time index, return the hessian of cost w.r.t the control and state. - Input: - trajectory: (dim_x, N) List of trajectory - controls: (dim_u, N) List of controls - path_refs: (dim_ref, N) List of references - time_indices: (1, N) List of time indices - return: - d^2c_dxu: (dim_x, dim_u, N) - ''' - return self.get_cux(trajectory, controls, path_refs).T - - -@jax.jit -def exp_linear_cost( - y: Union[float, DeviceArray], a: Union[float, DeviceArray] = 1, - b: Union[float, DeviceArray] = 1 - ) -> Union[float, DeviceArray]: - ''' - Base Class of Exponential Linear Cost defined as following - Take y = func(x, u, t) - z = a*y - c(x, u, t) = b*exp(z) if z <=0 - c(x, u, t) = b*z+b if z > 0 - - Args: - y: float or 1D array, value to be costed - a: float or 1D array, coefficient of exponential linear cost - - ''' - z = a * y - # return b * jnp.exp(z) - return jnp.where(z <= 0, b * jnp.exp(z), b * z + b) - -@jax.jit -def quadratic_cost( - y: Union[float, DeviceArray], a: Union[float, DeviceArray] = 1, - not_used = None - ) -> Union[float, DeviceArray]: - ''' - Base Class of Quadratic Cost defined as following - Take y = func(x, u, t) - c(x, u, t) = (a*y)^2 - Args: - y: float or 1D array, value to be costed - a: float or 1D array, coefficient of quadratic cost - ''' - return (a * y) ** 2 - -@jax.jit -def huber_cost( - y: Union[float, DeviceArray], a: Union[float, DeviceArray] = 1, - delta: Union[float, DeviceArray] = 1 - ) -> Union[float, DeviceArray]: - ''' - Base Class of Huber Cost defined as following - Take y = func(x, u, t) - z = |a*y} - c(x, u, t) = y**2 if |y| <= delta - c(x, u, t) = delta*(2*|y|-delta) if |y| > delta - - Args: - y: float or 1D array, value to be costed - delta: float or 1D array, threshold of Huber Cost - ''' - z = jnp.abs(a*y) - return jnp.where(z <= delta, z**2, delta*(2*z-delta)) \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/collision_checker/__init__.py b/scripts/planner/iLQR/cost/collision_checker/__init__.py deleted file mode 100644 index cc9eee0..0000000 --- a/scripts/planner/iLQR/cost/collision_checker/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .collision_checker import CollisionChecker -from .obstacle import Obstacle \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/collision_checker/collision_checker.py b/scripts/planner/iLQR/cost/collision_checker/collision_checker.py deleted file mode 100644 index 62d41f4..0000000 --- a/scripts/planner/iLQR/cost/collision_checker/collision_checker.py +++ /dev/null @@ -1,123 +0,0 @@ -from typing import Union -import jax -from functools import partial -import jax.numpy as jnp -from jaxlib.xla_extension import DeviceArray -import hppfcl -import numpy as np -import time -import warnings -class CollisionChecker: - def __init__(self, config) -> None: - - self.width = config.width - self.length = config.length - self.wheelbase = config.wheelbase - self.step = config.T - - # create a box for the vehicle - # The vehicle is centered at its rear axle - verts = hppfcl.StdVec_Vec3f() - verts.extend( - [ - np.array([-(self.length - self.wheelbase)/2.0, self.width/2.0, 100]), - np.array([-(self.length - self.wheelbase)/2.0, -self.width/2.0, 100]), - np.array([(self.length + self.wheelbase)/2.0, self.width/2.0, 100]), - np.array([(self.length + self.wheelbase)/2.0, -self.width/2.0, 100]), - np.array([-(self.length - self.wheelbase)/2.0, self.width/2.0, -100]), - np.array([-(self.length - self.wheelbase)/2.0, -self.width/2.0, -100]), - np.array([(self.length + self.wheelbase)/2.0, self.width/2.0, -100]), - np.array([(self.length + self.wheelbase)/2.0, -self.width/2.0, -100]), - ] - ) - self.ego_object = hppfcl.Convex.convexHull(verts, False, None) - self.ego_collision_object = hppfcl.CollisionObject(self.ego_object, np.eye(3), np.zeros(3)) - - @partial(jax.jit, static_argnums=(0,)) - def _gen_pose(self, state): - ''' - state: a 1D numpy array of shape (5,), (x, y, z, psi, v) - ''' - psi = state[3] - cp = jnp.cos(psi) - sp = jnp.sin(psi) - ego_rotm = jnp.array([[cp, -sp], - [sp, cp]]) - ego_trans = state[:2] - - ego_rotm_inv = jnp.array([[cp, sp, 0], - [-sp, cp, 0], - [0,0,1]]) - ego_trans_inv = jnp.array([-(cp*state[0] + sp*state[1]), - (sp*state[0] - cp*state[1]), 0]) - return ego_rotm, ego_trans, ego_rotm_inv, ego_trans_inv - - def _check_collision(self, state: np.ndarray, polytope: hppfcl.ConvexBase) -> np.ndarray: - """ - Check collision between the ego vehicle and a polytope - args: - - state: a 1D numpy array of shape (5,), (x, y, z, psi, v) the ego vehicle state in the global coordinate frame - polytope: a hppfcl collision object whose vertices are in the global coordinate frame - """ - # get the ego vehicle pose - psi = state[3] - cp = np.cos(psi) - sp = np.sin(psi) - ego_rotm = np.array([[cp, -sp], - [sp, cp]]) - ego_trans = state[:2] - ego_rotm_inv = np.array([[cp, sp, 0], - [-sp, cp, 0], - [0,0,1]]) - ego_trans_inv = np.array([-(cp*state[0] + sp*state[1]), - (sp*state[0] - cp*state[1]), 0]) - - # create a collision object for the polytope in the ego vehicle frame - collision_object = hppfcl.CollisionObject(polytope, ego_rotm_inv, ego_trans_inv) - # check collision between the ego vehicle and the obstacles - request = hppfcl.DistanceRequest() - result = hppfcl.DistanceResult() - hppfcl.distance(self.ego_collision_object, collision_object, request, result) - ego_point = result.getNearestPoint1()[:2] - obj_point = result.getNearestPoint2()[:2] - - obj_point_global = ego_rotm@obj_point + ego_trans - - distance = result.min_distance - if distance < -1e10: - distance = -0.01 - - output = np.array([ego_point[0], ego_point[1], obj_point_global[0], obj_point_global[1], distance]) - - return output - - def check_collisions(self, state: np.ndarray, obstacles: list) -> np.ndarray: - """ - Check collision between the ego vehicle and a list of obstacles - args: - - state: a 2D numpy array of shape (5,n), - each column is (x, y, z, psi, v) the ego vehicle state in the global coordinate frame - obstacles: a list of Obstacle objects - - returns: - - collision_ref: a 3D numpy array of shape (num_obstacles, 5, n) - """ - if not isinstance(state, np.ndarray): - warnings.warn(f"'obs_refs' is a class of {type(state)} instead of . "+ \ - "There maybe performance issue due to sliceing []") - - num_obstacles = len(obstacles) - # No obstacles return None - if num_obstacles == 0: - return None - - # initialize the collision_ref for the ego vehicle - collision_ref = np.zeros((num_obstacles, 5, self.step)) - - for i, obstacle in enumerate(obstacles): - for t in range(self.step): - collision_ref[i,:, t] = self._check_collision(state[:,t], obstacle.at(t)) - return collision_ref \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/collision_checker/obstacle.py b/scripts/planner/iLQR/cost/collision_checker/obstacle.py deleted file mode 100644 index a1af45c..0000000 --- a/scripts/planner/iLQR/cost/collision_checker/obstacle.py +++ /dev/null @@ -1,49 +0,0 @@ -import hppfcl -import numpy as np -from typing import Union - -class Obstacle: - def __init__(self, vertices: Union[np.ndarray, list]) -> None: - ''' - Constuct a time-varying obstacle from a list of vertices - vertices: LIST: a list of 2D numpy arrays of shape (n, 2) - ''' - self.time_varying = isinstance(vertices, list) - if self.time_varying: - self.steps = len(vertices) - else: - self.steps = np.inf - vertices = [vertices] - self.time_varying_obstacle = [] - for v in vertices: - self.time_varying_obstacle.append(self._construct_collision_object(v)) - - def _construct_collision_object(self, vertices): - """ - Construct a collision object from a list of vertices - vertices: a 2D numpy array of shape (n, 2) - """ - num_points = vertices.shape[0] - dim = vertices.shape[1] - - assert num_points >= 3, f"The number of vertices ({num_points}) must be at least 3" - assert dim == 2, f"The dimension of the vertices ({dim}) must be 2" - - # add a column of ones to the polygon - vertices = np.append(vertices, np.ones((num_points, 1)), axis=1) - - # create a polytope object - verts = hppfcl.StdVec_Vec3f() - verts.extend(list(vertices)) - vertices[:,-1] = -1 - verts.extend(list(vertices)) - - polygon = hppfcl.Convex.convexHull(verts, True, None) - - return polygon - - def at(self, step): - assert step < self.steps, f"The step {step} is out of range [0, {self.steps-1}]" - if not self.time_varying: - step = 0 - return self.time_varying_obstacle[step] \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/control_cost.py b/scripts/planner/iLQR/cost/control_cost.py deleted file mode 100644 index dd8d1e0..0000000 --- a/scripts/planner/iLQR/cost/control_cost.py +++ /dev/null @@ -1,45 +0,0 @@ -from abc import ABC, abstractmethod -from functools import partial -from jaxlib.xla_extension import DeviceArray -import jax -from jax import numpy as jnp -import numpy as np - -from .base_cost import BaseCost -from .base_cost import quadratic_cost, huber_cost - - -class ControlCost(BaseCost): - def __init__(self, config): - super().__init__() - self.weight = jnp.array([config.ctrl_cost_accel_weight, - config.ctrl_cost_steer_weight]) # shape of (dim_u) - - self.delta = jnp.array([config.ctrl_cost_accel_huber_delta, - config.ctrl_cost_steer_huber_delta]) # shape of (dim_u) - - if config.ctrl_cost_type == 'quadratic': - self.cost_func = quadratic_cost - elif config.ctrl_cost_type == 'huber': - self.cost_func = huber_cost - else: - raise NotImplementedError( - f'Cost type {config.ctrl_cost_type} not implemented for CTRL COST. '+ - 'Please choose from [quadratic, huber]' - ) - - - @partial(jax.jit, static_argnums=(0,)) - def get_running_cost( - self, state: DeviceArray, ctrl: DeviceArray, ref: DeviceArray - ) -> float: - ''' - Given a state, control, and time index, return the cost. - Input: - state: (dim_x) state - ctrl: (dim_u) control - ref: (dim_ref) reference - return: - cost: float - ''' - return jnp.sum(self.cost_func(ctrl, self.weight, self.delta)) \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/cost.py b/scripts/planner/iLQR/cost/cost.py deleted file mode 100644 index 48153b4..0000000 --- a/scripts/planner/iLQR/cost/cost.py +++ /dev/null @@ -1,100 +0,0 @@ -import jax -from jax import numpy as jnp -import numpy as np -from jaxlib.xla_extension import DeviceArray -from typing import Union - -from .state_cost import StateCost -from .control_cost import ControlCost -from .obstacle_cost import ObstacleCost - -import time - -class Cost(): - def __init__(self, config): - super().__init__() - - self.state_cost = StateCost(config) - self.control_cost = ControlCost(config) - self.obstacle_cost = ObstacleCost(config) - - def get_traj_cost( - self, trajectory: Union[np.ndarray, DeviceArray], - controls: Union[np.ndarray, DeviceArray], - path_refs: Union[np.ndarray, DeviceArray], - obs_refs: list = None - ) -> float: - ''' - Given a state, control, and references, return the sum of the cost. - Input: - trajectory: (dim_x, N) array of state trajectory - controls: (dim_u, N) array of control sequence - path_refs: (dim_ref, N) array of references (e.g. reference path, reference velocity, etc.) - obs_refs: -Optional- (num_obstacle, 2, N) List of obstacles - return: - cost: float, sum of the running cost over the trajectory - ''' - - state_cost = self.state_cost.get_traj_cost(trajectory, controls, path_refs) - - control_cost = self.control_cost.get_traj_cost(trajectory, controls, path_refs) - - obstacle_cost = self.obstacle_cost.get_traj_cost(trajectory, controls, obs_refs) - - return state_cost + control_cost + obstacle_cost - - def get_derivatives_jax( - self, trajectory: Union[np.ndarray, DeviceArray], - controls: Union[np.ndarray, DeviceArray], - path_refs: Union[np.ndarray, DeviceArray], - obs_refs: list = None - ) -> tuple: - ''' - Given a state, control, and references, return Jacobians and Hessians of cost function - Input: - trajectory: (dim_x, N) array of state trajectory - controls: (dim_u, N) array of control sequence - path_refs: (dim_ref, N) array of references (e.g. reference path, reference velocity, etc.) - obs_refs: -Optional- (num_obstacle, 2, N) List of obstacles - return: - q: DeviceArray, jacobian of cost function w.r.t. trajectory - r: DeviceArray, jacobian of cost function w.r.t. controls - Q: DeviceArray, hessian of cost function w.r.t. trajectory - R: DeviceArray, hessian of cost function w.r.t. controls - H: DeviceArray, hessian of cost function w.r.t. trajectory and controls - ''' - - (state_q, state_r, state_Q, - state_R, state_H) = self.state_cost.get_derivatives_jax(trajectory, controls, path_refs) - - (ctrl_q, ctrl_r, ctrl_Q, - ctrl_R, ctrl_H) = self.control_cost.get_derivatives_jax(trajectory, controls, path_refs) - - (obs_q, obs_r, obs_Q, - obs_R, obs_H) = self.obstacle_cost.get_derivatives_jax(trajectory, controls, obs_refs) - - return (state_q + ctrl_q + obs_q, - state_r + ctrl_r + obs_r, - state_Q + ctrl_Q + obs_Q, - state_R + ctrl_R + obs_R, - state_H + ctrl_H + obs_H) - - def get_derivatives_np( - self, trajectory: np.ndarray, controls: np.ndarray, path_refs: np.ndarray, obs_refs: list = None - ) -> tuple: - ''' - Given a state, control, and references, return Jacobians and Hessians of cost function - Input: - trajectory: (dim_x, N) array of state trajectory - controls: (dim_u, N) array of control sequence - path_refs: (dim_ref, N) array of references (e.g. reference path, reference velocity, etc.) - obs_refs: -Optional- (num_obstacle, 2, N) List of obstacles - return: - q: np.ndarray, jacobian of cost function w.r.t. trajectory - r: np.ndarray, jacobian of cost function w.r.t. controls - Q: np.ndarray, hessian of cost function w.r.t. trajectory - R: np.ndarray, hessian of cost function w.r.t. controls - H: np.ndarray, hessian of cost function w.r.t. trajectory and controls - ''' - q, r, Q, R, H = self.get_derivatives_jax(trajectory, controls, path_refs, obs_refs) - return np.asarray(q), np.asarray(r), np.asarray(Q), np.asarray(R), np.asarray(H) \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/obstacle_cost.py b/scripts/planner/iLQR/cost/obstacle_cost.py deleted file mode 100644 index 5703e90..0000000 --- a/scripts/planner/iLQR/cost/obstacle_cost.py +++ /dev/null @@ -1,110 +0,0 @@ -from abc import ABC, abstractmethod -from functools import partial -from jaxlib.xla_extension import DeviceArray -import jax -from jax import numpy as jnp -import numpy as np - -from .base_cost import BaseCost -from .base_cost import exp_linear_cost -import warnings - -class SingleObstacleCost(BaseCost): - def __init__(self, config): - ''' - Obtain the cost and its gradient w.r.t to a single obstacle given a state, control, - ''' - super().__init__() - - - self.cost_func = exp_linear_cost - - self.obs_a = config.obs_a - self.obs_b = config.obs_b - - @partial(jax.jit, static_argnums=(0,)) - def get_running_cost( - self, state: DeviceArray, ctrl: DeviceArray, ref: np.ndarray - ) -> float: - ''' - Given a state, control, and time index, return the cost. - Input: - state: (dim_x) state [x, y, v, psi, delta] - ctrl: (dim_u) control - ref: (5,) obstacle reference - time_idx: int (1) - return: - cost: float - ''' - psi = state[3] - rot_mat = jnp.array([[jnp.cos(psi), -jnp.sin(psi)], - [jnp.sin(psi), jnp.cos(psi)]]) - - ego_pt = ref[:2] - obs_pt = ref[2:4] - dis_ref = ref[4] - - ego_pt_global = jnp.matmul(rot_mat, ego_pt) + state[:2] - - diff = obs_pt - ego_pt_global - - # handle the case when distance is 0 (when the obstacle touch each other) - # THis leads to nan in the gradient - dis = jnp.linalg.norm(diff) - y = jnp.where(dis >= 1e-4, dis, 1e-4) - - y= -1*jnp.sign(dis_ref)*y - - # Do not calrlate gradient w.r.t velocity - v_no_grad = jax.lax.stop_gradient(state[2]) - obs_b = self.obs_b#*jnp.maximum(v_no_grad, 1) - - return exp_linear_cost(y, self.obs_a, obs_b) - -class ObstacleCost(): - def __init__(self, config): - self.single_obstacle_cost = SingleObstacleCost(config) - - def get_traj_cost( - self, trajectory: DeviceArray, controls: DeviceArray, obs_refs: np.ndarray - ) -> float: - ''' - Given a state, control, and time index, return the sum of the cost. - Input: - trajectory: (dim_x, N) List of trajectory - controls: (dim_u, N) List of controls - path_refs: (num_obstacle, dim_ref, N) List of references - return: - cost: float - ''' - cost = 0 - if obs_refs is not None: - if not isinstance(obs_refs, np.ndarray): - warnings.warn(f"'obs_refs' is a class of {type(obs_refs)} instead of . "+ \ - "There maybe performance issue due to sliceing []") - num_obstacle = obs_refs.shape[0] - for i in range(num_obstacle): - cost += self.single_obstacle_cost.get_traj_cost(trajectory, controls, obs_refs[i, :, :]) - return cost - - def get_derivatives_jax( - self, trajectory: DeviceArray, controls: DeviceArray, obs_refs: DeviceArray - ) -> DeviceArray: - - q = 0 - r = 0 - Q = 0 - R = 0 - H = 0 - - if obs_refs is not None: - num_obstacle = obs_refs.shape[0] - - for i in range(num_obstacle): - cx_, r_, Q_, R_, H_ = self.single_obstacle_cost.get_derivatives_jax(trajectory, controls, obs_refs[i]) - q += cx_ - r += r_ - Q += Q_ - R += R_ - H += H_ - return (q, r, Q, R, H) \ No newline at end of file diff --git a/scripts/planner/iLQR/cost/state_cost.py b/scripts/planner/iLQR/cost/state_cost.py deleted file mode 100644 index c4d8c10..0000000 --- a/scripts/planner/iLQR/cost/state_cost.py +++ /dev/null @@ -1,152 +0,0 @@ -from abc import ABC, abstractmethod -from functools import partial -from typing import Optional -from jaxlib.xla_extension import DeviceArray -import jax -from jax import numpy as jnp -import numpy as np - -from .base_cost import BaseCost -from .base_cost import quadratic_cost, huber_cost, exp_linear_cost - -def raise_error(cost_type, cost_name): - raise NotImplementedError( - f'Cost type {cost_type} not implemented for {cost_name} of STATE COST.' - ) - -class StateCost(BaseCost): - def __init__(self, config): - super().__init__() - - # Reference Path Cost - self.dim_path_slope = config.dim_path_slope - self.dim_closest_pt_x = config.dim_closest_pt_x - self.dim_closest_pt_y = config.dim_closest_pt_y - self.path_weight = config.path_weight - self.path_delta = config.path_huber_delta - - if config.path_cost_type == 'quadratic': - self.path_cost_func = quadratic_cost - elif config.path_cost_type == 'huber': - self.path_cost_func = huber_cost - else: - raise_error(config.path_cost_type, 'PATH_COST') - - # Reference Velocity Cost - self.v_ref = config.v_ref - self.vel_weight = config.vel_weight - self.vel_delta = config.vel_huber_delta - if config.vel_cost_type == 'quadratic': - self.vel_cost_func = quadratic_cost - elif config.vel_cost_type == 'huber': - self.vel_cost_func = huber_cost - else: - raise_error(config.vel_cost_type, 'VEL_COST') - - # Speed Limit Cost - self.dim_vel_limit = config.dim_vel_limit - self.vel_limit_a = config.vel_limit_a - self.vel_limit_b = config.vel_limit_b - - # Lateral Acceleration Cost - self.wheelbase = config.wheelbase - self.lat_accel_thres = config.lat_accel_thres #float - self.lat_accel_a = config.lat_accel_a # float - self.lat_accel_b = config.lat_accel_b # float - self.lat_accel_cost_func = exp_linear_cost - - # Cost against large heading difference with reference - self.heading_weight = config.heading_weight - self.heading_huber_delta = config.heading_huber_delta - if config.heading_cost_type == 'quadratic': - self.heading_cost_func = quadratic_cost - elif config.heading_cost_type == 'huber': - self.heading_cost_func = huber_cost - - # Progress Cost - self.dim_progress = config.dim_progress - self.progress_weight = config.progress_weight/(config.T * config.dt) - - # Boundary Cost - self.width = config.width - self.dim_right_boundary = config.dim_right_boundary - self.dim_left_boundary = config.dim_left_boundary - self.lane_boundary_a = config.lane_boundary_a - self.lane_boundary_b = config.lane_boundary_b - - @partial(jax.jit, static_argnums=(0,)) - def get_terminal_cost( - self, ref: DeviceArray - ) -> float: - ''' - Since the progress is calulate from PySpline, - it is intracable to make it work with JAX. - However, we can locally approximate the progress's derivative - with method described in the MPCC. - In this case, we can add a terminal cost to reward the total progress - the vehicle has made. - ''' - progress = ref[self.dim_progress,:] - return -self.progress_weight * (progress[-1] - progress[0]) - - - @partial(jax.jit, static_argnums=(0,)) - def get_running_cost(self, state, ctrl, ref): - ''' - Given a state, control, and time index, return the cost. - Input: - state: (dim_x) - [x, y, v, psi, delta] - ctrl: (dim_u) - ref: (dim_ref) reference - time_idx: int (1) - return: - cost: float - ''' - # Cost for the vehicle's deviation from the reference path - slope = ref[self.dim_path_slope] - closest_pt_x = ref[self.dim_closest_pt_x] - closest_pt_y = ref[self.dim_closest_pt_y] - sr = jnp.sin(slope) - cr = jnp.cos(slope) - path_dev = sr * (state[0] - closest_pt_x) - cr *(state[1] - closest_pt_y) - path_cost = self.path_cost_func(path_dev, self.path_weight, self.path_delta) - - # boundary_cost - right_boundary = ref[self.dim_right_boundary] - left_boundary = ref[self.dim_left_boundary] - b_right = path_dev - right_boundary + self.width/2.0 - b_left = -path_dev - left_boundary + self.width/2.0 - - boundary_cost = exp_linear_cost(b_right, self.lane_boundary_a, self.lane_boundary_b) + \ - exp_linear_cost(b_left, self.lane_boundary_a, self.lane_boundary_b) - - # Cost for the vehicle's deviation from the reference heading - delta_heading = state[3] - slope - delta_heading = jnp.arctan2(jnp.sin(delta_heading), jnp.cos(delta_heading)) - heading_cost = self.heading_cost_func(delta_heading, self.heading_weight, self.heading_huber_delta) - - # Cost for speed limit - speed_limit = ref[self.dim_vel_limit] - speed_limit_cost = exp_linear_cost(state[2]-speed_limit, self.vel_limit_a, self.vel_limit_b) - - # Progress cost - # This is always zero due to the way the closest point is calculated - # However, it will help us to calculate the gradient of the cost function - progress_cost = -self.progress_weight*(cr*(state[0] - closest_pt_x) + sr*(state[1] - closest_pt_y)) - - # Cost for the vehicle's deviation from the reference velocity - vel_ref = jnp.minimum(ref[self.dim_vel_limit], self.v_ref) - vel_dev = state[2] - vel_ref - vel_cost = self.vel_cost_func(vel_dev, self.vel_weight, self.vel_delta) - - # Cost for the vehicle's lateral acceleration - lat_accel = jnp.abs(state[2]**2 / self.wheelbase * jnp.tan(state[4])) \ - - self.lat_accel_thres - - lat_accel_cost = self.lat_accel_cost_func(lat_accel, - self.lat_accel_a, - self.lat_accel_b) - - return path_cost + vel_cost + \ - lat_accel_cost + heading_cost + \ - speed_limit_cost+progress_cost + boundary_cost \ No newline at end of file diff --git a/scripts/planner/iLQR/dynamics/__init__.py b/scripts/planner/iLQR/dynamics/__init__.py deleted file mode 100644 index bba2521..0000000 --- a/scripts/planner/iLQR/dynamics/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .bicycle5d import Bicycle5D \ No newline at end of file diff --git a/scripts/planner/iLQR/dynamics/bicycle5d.py b/scripts/planner/iLQR/dynamics/bicycle5d.py deleted file mode 100644 index f5c60d9..0000000 --- a/scripts/planner/iLQR/dynamics/bicycle5d.py +++ /dev/null @@ -1,287 +0,0 @@ -from typing import Tuple, Any -import numpy as np -from functools import partial -from jaxlib.xla_extension import DeviceArray -import jax -from jax import numpy as jnp - -class Bicycle5D(): - - def __init__(self, config: Any) -> None: - """ - Implements the bicycle dynamics (for Princeton race car). The state is the - center of the rear axis. - State: [x, y, v, psi, delta] - Control: [accel, omega] - dx_k = v_k cos(psi_k) - dy_k = v_k sin(psi_k) - dv_k = accel_k - dpsi_k = v_k tan(delta_k) / L - ddelta_k = omega_k - - Args: - config (Any): an object specifies configuration. - action_space (np.ndarray): action space. - """ - self.dt: float = config.dt # time step for each planning step - self.dim_u = 2 # [accel, omega]. - self.dim_x = 5 # [x, y, v, psi, delta]. - - # load parameters - self.wheelbase: float = config.wheelbase # vehicle chassis length - - self.delta_min = config.delta_min - self.delta_max = config.delta_max - self.v_min = config.v_min - self.v_max = config.v_max - - self.a_min = config.a_min - self.a_max = config.a_max - self.omega_min = config.omega_min - self.omega_max = config.omega_max - - self.ctrl_limits = jnp.array([[self.a_min, self.a_max], - [self.omega_min, self.omega_max]]) - - self.state_limits = jnp.array([[-jnp.inf, jnp.inf], - [-jnp.inf, jnp.inf], - [self.v_min, self.v_max], - [-jnp.inf, jnp.inf], - [self.delta_min, self.delta_max]]) - - ############################################################################### - ################ Numpy Function ###################### - ############################################################################### - def integrate_forward_np( - self, state: np.ndarray, control: np.ndarray, **kwargs - ) -> Tuple[np.ndarray, np.ndarray]: - """ - Finds the next state of the vehicle given the current state and - control input. - - Args: - state (np.ndarray) [5]. - control (np.ndarray) [2]. - - Returns: - state_next: np.ndarray: next state. [5] - control_clip: np.ndarray: clipped control. [2] - """ - state_nxt, ctrl_clip = self.integrate_forward_jax(state, control) - return np.asarray(state_nxt), np.asarray(ctrl_clip) - - def get_jacobian_np( - self, trajectory: DeviceArray, controls: DeviceArray - ) -> Tuple[np.ndarray, np.ndarray]: - """ - Returns the linearized 'A' and 'B' matrix of the ego vehicle around - nominal trajectory and controls. - - Args: - trajectory (DeviceArray): trajectory along the nominal trajectory. - controls (DeviceArray): controls along the trajectory. - - Returns: - np.ndarray: the Jacobian of the dynamics w.r.t. the state. - np.ndarray: the Jacobian of the dynamics w.r.t. the control. - """ - A_jax, B_jax = self.get_jacobian_jax(trajectory, controls) - return np.asarray(A_jax), np.asarray(B_jax) - - def rollout_nominal_np( - self, initial_state: np.ndarray, controls: DeviceArray - ) -> Tuple[DeviceArray, DeviceArray]: - ''' - Rolls out the nominal trajectory Given the controls and initial state. - - Args: - initial_state (np.ndarray): initial state. - controls (np.ndarray): controls. - - Returns: - trajectory (np.ndarray): roll out trajectory along the nominal trajectory. - ctrls_clip (np.ndarray): clipped control sequence - ''' - trajectory, ctrls_clip = self.rollout_nominal_jax(initial_state, controls) - return np.asarray(trajectory), np.asarray(ctrls_clip) - - ############################################################################### - ################ Jax Function ###################### - ############################################################################### - @partial(jax.jit, static_argnums=(0,)) - def rollout_nominal_jax( - self, initial_state: np.ndarray, controls: np.ndarray, - ) -> Tuple[DeviceArray, DeviceArray]: - ''' - Rolls out the nominal trajectory Given the controls and initial state. - - Args: - initial_state (np.ndarray): initial state. - controls (np.ndarray): controls. - - Returns: - trajectory (DeviceArray): roll out trajectory along the nominal trajectory. - ctrls_clip (DeviceArray): clipped control sequence - ''' - n = controls.shape[1] - @jax.jit - def _rollout_nominal_step(i, args): - trajectory, controls = args - x_nxt, u_clip = self.integrate_forward_jax(trajectory[:, i], controls[:, i]) - trajectory = trajectory.at[:, i + 1].set(x_nxt) - controls = controls.at[:, i].set(u_clip) - return trajectory, controls - - trajectory = jnp.zeros((self.dim_x, n)) - trajectory = trajectory.at[:, 0].set(initial_state) - trajectory, ctrls_clip = jax.lax.fori_loop( - 0, n - 1, _rollout_nominal_step, (trajectory, controls) - ) - - # Make the heading angle in [-pi, pi] - trajectory.at[3,:].set( - jnp.mod(trajectory[3, :] + jnp.pi, 2 * jnp.pi) - jnp.pi - ) - return trajectory, ctrls_clip - - @partial(jax.jit, static_argnums=(0,)) - def get_jacobian_jax( - self, trajectory: DeviceArray, controls: DeviceArray - ) -> Tuple[DeviceArray, DeviceArray]: - """ - Returns the linearized 'A' and 'B' matrix of the ego vehicle around - nominal trajectory and controls. - - Args: - trajectory (DeviceArray): trajectory along the nominal trajectory. - controls (DeviceArray): controls along the trajectory. - - Returns: - A (DeviceArray): the Jacobian of the dynamics w.r.t. the state. - B (DeviceArray): the Jacobian of the dynamics w.r.t. the control. - """ - _jac = jax.jacfwd(self._integrate_forward, argnums=[0, 1]) - jac = jax.jit(jax.vmap(_jac, in_axes=(1, 1), out_axes=(2, 2))) - return jac(trajectory, controls) - - @partial(jax.jit, static_argnums=(0,)) - def get_hessian_jax( - self, trajectory: DeviceArray, controls: DeviceArray - ) -> Tuple[DeviceArray, DeviceArray]: - """ - Returns the linearized 'A' and 'B' matrix of the ego vehicle around - nominal trajectory and controls. - - Args: - trajectory (DeviceArray): trajectory along the nominal trajectory. - controls (DeviceArray): controls along the trajectory. - - Returns: - A (DeviceArray): the Jacobian of the dynamics w.r.t. the state. - B (DeviceArray): the Jacobian of the dynamics w.r.t. the control. - """ - _fxx = jax.jacfwd(jax.jacrev(self._integrate_forward, argnums=0), argnums=0) - _fuu = jax.jacfwd(jax.jacrev(self._integrate_forward, argnums=1), argnums=1) - _fux = jax.jacfwd(jax.jacrev(self._integrate_forward, argnums=1), argnums=0) - fxx = jax.jit(jax.vmap(_fxx, in_axes=(1, 1), out_axes=3))(trajectory, controls) - fuu = jax.jit(jax.vmap(_fuu, in_axes=(1, 1), out_axes=3))(trajectory, controls) - fux = jax.jit(jax.vmap(_fux, in_axes=(1, 1), out_axes=3))(trajectory, controls) - return fxx, fuu, fux - - @partial(jax.jit, static_argnums=(0,)) - def integrate_forward_jax( - self, state: DeviceArray, control: DeviceArray - ) -> Tuple[DeviceArray, DeviceArray]: - """Clips the control and computes one-step time evolution of the system. - - Args: - state (DeviceArray): [x, y, v, psi, delta]. - control (DeviceArray): [accel, omega]. - - Returns: - DeviceArray: next state. [5] - DeviceArray: clipped control. [2] - """ - # Clips the controller values between min and max accel and steer values. - ctrl_clip = jnp.clip(control, self.ctrl_limits[:, 0], self.ctrl_limits[:, 1]) - state_nxt = self._integrate_forward(state, ctrl_clip) - - # Important Note: we need carefully handle the mod in forward pass as well. - state_nxt = state_nxt.at[3].set( - jnp.mod(state_nxt[3] + jnp.pi, 2 * jnp.pi) - jnp.pi - ) - - # Clip the state to the limits. - state_nxt = jnp.clip(state_nxt, self.state_limits[:, 0], self.state_limits[:, 1]) - return state_nxt, ctrl_clip - - ############################################################################### - ################ Helper functions for the dynamics model ###################### - ############################################################################### - @partial(jax.jit, static_argnums=(0,)) - def deriv(self, state: DeviceArray, control: DeviceArray) -> DeviceArray: - """ Computes the continuous system dynamics x_dot = f(x, u). - dx_k = v_k cos(psi_k) - dy_k = v_k sin(psi_k) - dv_k = u[0]_k - dpsi_k = v_k tan(delta_k) / L - ddelta_k = u[1]_k - - Args: - state (DeviceArray): [x, y, v, psi, delta]. - control (DeviceArray): [accel, omega]. - - Returns: - DeviceArray: next state. - """ - deriv = jnp.zeros((self.dim_x,)) - deriv = deriv.at[0].set(state[2] * jnp.cos(state[3])) - deriv = deriv.at[1].set(state[2] * jnp.sin(state[3])) - deriv = deriv.at[2].set(control[0]) - deriv = deriv.at[3].set(state[2] * jnp.tan(state[4]) / self.wheelbase) - deriv = deriv.at[4].set(control[1]) - return deriv - - @partial(jax.jit, static_argnums=(0,)) - def _integrate_forward( - self, state: DeviceArray, control: DeviceArray - ) -> DeviceArray: - """ Computes one-step time evolution of the system: x_+ = f(x, u). - The discrete-time dynamics is as below: - x_k+1 = x_k + v_k cos(psi_k) dt - y_k+1 = y_k + v_k sin(psi_k) dt - v_k+1 = v_k + u0_k dt - psi_k+1 = psi_k + v_k tan(delta_k) / L dt - delta_k+1 = delta_k + u1_k dt - - Args: - state (DeviceArray): [x, y, v, psi, delta]. - control (DeviceArray): [accel, omega]. - - Returns: - DeviceArray: next state. - """ - state_nxt = self._integrate_forward_dt(state, control, self.dt) - return state_nxt - - @partial(jax.jit, static_argnums=(0,)) - def _integrate_forward_dt( - self, state: DeviceArray, control: DeviceArray, dt: float - ) -> DeviceArray: - """4th-order Runge-Kutta method. - - Args: - state (DeviceArray): current state - control (DeviceArray): current control - dt (float): time horizon to intergrate - - Returns: - DeviceArray: next state - """ - k1 = self.deriv(state, control) - k2 = self.deriv(state + k1*dt/2, control) - k3 = self.deriv(state + k2*dt/2, control) - k4 = self.deriv(state + k3*dt, control) - return state + (k1 + 2*k2 + 2*k3 + k4) * dt / 6 - - diff --git a/scripts/planner/iLQR/ilqr.py b/scripts/planner/iLQR/ilqr.py deleted file mode 100644 index e3120d0..0000000 --- a/scripts/planner/iLQR/ilqr.py +++ /dev/null @@ -1,376 +0,0 @@ -from typing import Tuple, Optional, Dict -import time -import os -import numpy as np -from functools import partial -import jax -from jax import numpy as jnp -from jaxlib.xla_extension import DeviceArray - -from .dynamics import Bicycle5D -from .cost import Cost, CollisionChecker, Obstacle -from .ref_path import RefPath -from .config import Config -import time - -status_lookup = ["Iteration Limit Exceed", - "Converged", - "Failed Line Search"] -class iLQR(): - def __init__(self, config_file = None) -> None: - - self.config = Config() # Load default config. - if config_file is not None: - self.config.load_config(config_file) # Load config from file. - - print("iLQR setting:", self.config) - - # Set up Jax parameters - jax.config.update('jax_platform_name', self.config.platform) - print("Jax using Platform: ", jax.lib.xla_bridge.get_backend().platform) - # If you want to use GPU, lower the memory fraction from 90% to avoid OOM. - os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = "20" - - self.dyn = Bicycle5D(self.config) - self.cost = Cost(self.config) - self.collision_checker = CollisionChecker(self.config) - self.obstacle_list = [] - self.ref_path = None - - # iLQR parameters - self.dim_x = self.config.num_dim_x - self.dim_u = self.config.num_dim_u - self.T = int(self.config.T) - self.dt = float(self.config.dt) - self.max_iter = int(self.config.max_iter) - self.tol = float(self.config.tol) # ILQR update tolerance. - - # line search parameters. - self.alphas = self.config.line_search_base**(np.arange(self.config.line_search_a, - self.config.line_search_b, - self.config.line_search_c) - ) - - print("Line Search Alphas: ", self.alphas) - - # regularization parameters - self.reg_min = float(self.config.reg_min) - self.reg_max = float(self.config.reg_max) - self.reg_init = float(self.config.reg_init) - self.reg_scale_up = float(self.config.reg_scale_up) - self.reg_scale_down = float(self.config.reg_scale_down) - self.max_attempt = self.config.max_attempt - self.warm_up() - - def update_ref_path(self, ref_path: RefPath): - self.ref_path = ref_path - - def update_obstacles(self, vertices_list: list): - self.obstacle_list = [] - for vertices in vertices_list: - self.obstacle_list.append(Obstacle(vertices)) - - def get_references(self, trajectory): - states_np = np.asarray(trajectory) - path_refs = self.ref_path.get_reference(states_np[:2, :]) - obs_refs = self.collision_checker.check_collisions(states_np, self.obstacle_list) - return path_refs, obs_refs - - def plan(self, init_state: np.ndarray, - controls: Optional[np.ndarray] = None, verbose=False) -> Dict: - - # We first check if the planner is ready - if self.ref_path is None: - return dict(status=-1) - - t_start = time.time() - ''' - Main iLQR loop. - Args: - init_state: [num_dim_x] np.ndarray: initial state. - control: [num_dim_u, N] np.ndarray: initial control. - ''' - # np.set_printoptions(suppress=True, precision=5) - if controls is None: - controls =np.zeros((self.dim_u, self.T)) - else: - assert controls.shape[1] == self.T - - # Rolls out the nominal trajectory and gets the initial cost. - #* This is differnet from the naive iLQR as it relies on the information - #* from the pyspline. - trajectory, controls = self.dyn.rollout_nominal_jax(init_state, controls) - - path_refs, obs_refs = self.get_references(trajectory) - - J = self.cost.get_traj_cost(trajectory, controls, path_refs, obs_refs) - - converged = False - reg = self.reg_init - - # parameters for tracking the status - fail_attempts = 0 - - t_setup = time.time() - t_start - status = 0 - num_cost_der = 0 - num_dyn_der = 0 - num_back = 0 - num_forward = 0 - t_cost_der = 0 - t_dyn_der = 0 - t_back = 0 - t_forward = 0 - - for i in range(self.max_iter): - updated = False - # Get the derivatives of the cost - t0 = time.time() - q, r, Q, R, H = self.cost.get_derivatives_jax(trajectory, controls, path_refs, obs_refs) - t_cost_der += (time.time()-t0) - num_cost_der += 1 - - # We only need dynamics derivatives (A and B matrics) from 0 to N-2. - # But doing slice with Jax will leads to performance issue. - # So we calculate the derivatives from 0 to N-1 and Backward pass will ignore the last one. - t0 = time.time() - A, B = self.dyn.get_jacobian_jax(trajectory, controls) - fxx, fuu, fux = self.dyn.get_hessian_jax(trajectory, controls) - t_dyn_der += (time.time()-t0) - num_dyn_der += 1 - - #Backward pass with new regularization. - t0 = time.time() - K_closed_loop, k_open_loop, reg = self.backward_pass( - q=q, r=r, Q=Q, R=R, H=H, A=A, B=B, - fxx = fxx, fuu = fuu, fux = fux, - reg = reg - ) - t_back += (time.time()-t0) - num_back += 1 - - # Line search through alphas. - for alpha in self.alphas: - t0 = time.time() - X_new, U_new, J_new, refs_new, obs_refs_new = ( - self.forward_pass( - trajectory, controls, K_closed_loop, k_open_loop, alpha - ) - ) - t_forward += (time.time()-t0) - num_forward += 1 - - if J_new <= J: # Improved! - # Small improvement. - # if np.abs(J-J_new)/max(1, np.abs(J)) < self.tol: - if np.abs(J-J_new) < (self.tol*min(1,alpha)): - converged = True - if verbose: - print("Update from ", J, " to ", J_new, "reg: ", reg, - "alpha: {0:0.3f}".format(alpha), "{0:0.3f}".format(time.time()-t_start)) - # Updates nominal trajectory and best cost. - J = J_new - trajectory = X_new - controls = U_new - path_refs = refs_new - obs_refs = obs_refs_new - reg = max(self.reg_min, reg/self.reg_scale_down) - updated = True - fail_attempts = 0 - break # break the for loop - elif np.abs(J-J_new) < 1e-3: - converged = True - if verbose: - print(f"cost increase from {J} to {J_new}, but the difference is {np.abs(J-J_new)} is small.") - break - - # Terminates early if the objective improvement is negligible. - if converged: - status = 1 - break - - # if no line search succeeds, terminate. - if not updated: - fail_attempts += 1 - reg = reg*self.reg_scale_up - if verbose: - print(f"Fail attempts {fail_attempts}, cost increase from {J} to {J_new} new reg {reg}.") - if fail_attempts > self.max_attempt or reg > self.reg_max: - status = 2 - break - - t_process = time.time() - t_start - analysis_string = f"Exit after {i} iterations with runtime {t_process} with status {status_lookup[status]}. "+ \ - f"Set uo takes {t_setup} s. " + \ - f"Total {num_cost_der} cost derivative with average time of {t_cost_der/num_cost_der} s. " + \ - f"Total {num_dyn_der} dyn derivative with average time of {t_dyn_der/num_dyn_der} s. " + \ - f"Total {num_forward} forward with average time of {t_forward/num_forward} s. " +\ - f"Total {num_back} cost derivative with average time of {t_back/num_back} s." - - if verbose: - print(analysis_string) - - solver_info = dict( - trajectory=np.asarray(trajectory), controls=np.asarray(controls), - K_closed_loop=np.asarray(K_closed_loop), - k_open_loop=np.asarray(k_open_loop), t_process=t_process, - status=status, J=J, q=q, r=r, - Q=Q, R=R, H=H, - A=A, B=B, N = self.T, dt = self.dt, - analysis = analysis_string - ) - return solver_info - - def forward_pass( - self, nominal_states: DeviceArray, nominal_controls: DeviceArray, - K_closed_loop: DeviceArray, k_open_loop: DeviceArray, alpha: float - ) -> Tuple[DeviceArray, DeviceArray, float, DeviceArray, DeviceArray, - DeviceArray]: - X, U = self.rollout( - nominal_states, nominal_controls, K_closed_loop, k_open_loop, alpha - ) - #* This is differnet from the naive iLQR as it relies on the information - #* from the pyspline. Thus, it cannot be used with jit. - path_refs, obs_refs = self.get_references(X) - - J = self.cost.get_traj_cost(X, U, path_refs, obs_refs) - - return X, U, J, path_refs, obs_refs - - @partial(jax.jit, static_argnums=(0,)) - def backward_pass( - self, q: DeviceArray, r: DeviceArray, Q: DeviceArray, - R: DeviceArray, H: DeviceArray, A: DeviceArray, B: DeviceArray, - fxx: DeviceArray, fuu: DeviceArray, fux: DeviceArray, - reg: float, ddp: bool = True - ) -> Tuple[DeviceArray, DeviceArray]: - """ - Jitted backward pass looped computation. - - Args: - q (DeviceArray): (dim_x, N) - r (DeviceArray): (dim_u, N) - Q (DeviceArray): (dim_x, dim_x, N) - R (DeviceArray): (dim_u, dim_u, N) - H (DeviceArray): (dim_u, dim_x, N) - A (DeviceArray): (dim_x, dim_x, N-1) - B (DeviceArray): (dim_x, dim_u, N-1) - - Returns: - Ks (DeviceArray): gain matrices (dim_u, dim_x, N - 1) - ks (DeviceArray): gain vectors (dim_u, N - 1) - """ - def init(): - Ks = jnp.zeros((self.dim_u, self.dim_x, self.T - 1)) - ks = jnp.zeros((self.dim_u, self.T - 1)) - V_x = q[:, -1] - V_xx = Q[:, :, -1] - return V_x, V_xx, ks, Ks - - @jax.jit - def backward_pass_looper(val): - V_x, V_xx, ks, Ks, t, reg = val - Q_x = q[:, t] + A[:, :, t].T @ V_x - Q_u = r[:, t] + B[:, :, t].T @ V_x - Q_xx = Q[:, :, t] + A[:, :, t].T @ V_xx @ A[:, :, t] + ddp*jnp.tensordot(V_x, fxx[:, :, :, t], axes=1) - Q_ux = H[:, :, t] + B[:, :, t].T @ V_xx @ A[:, :, t] + ddp*jnp.tensordot(V_x, fux[:, :, :, t], axes=1) - Q_uu = R[:, :, t] + B[:, :, t].T @ V_xx @ B[:, :, t] + ddp*jnp.tensordot(V_x, fuu[:, :, :, t], axes=1) - - # According to the paper, the regularization is added to Vxx for robustness. - # http://roboticexplorationlab.org/papers/iLQR_Tutorial.pdf - reg_mat = reg* jnp.eye(self.dim_x) - V_xx_reg = V_xx + reg_mat - Q_ux_reg = H[:, :, t] + B[:, :, t].T @ V_xx_reg @ A[:, :, t] + ddp*jnp.tensordot(V_x, fux[:, :, :, t], axes=1) - Q_uu_reg = R[:, :, t] + B[:, :, t].T @ V_xx_reg @ B[:, :, t] + ddp*jnp.tensordot(V_x, fuu[:, :, :, t], axes=1) - - @jax.jit - def isposdef(A, reg): - # If the regularization is too large, but the matrix is still not positive definite, - # we will let the backward pass continue to avoid infinite loop. - return (jnp.all(jnp.linalg.eigvalsh(A) > 0) | (reg >= self.reg_max)) - - @jax.jit - def false_func(val): - V_x, V_xx, ks, Ks = init() - updated_reg = self.reg_scale_up * reg - updated_reg = jax.lax.cond(updated_reg<=self.reg_max, - lambda x: x, lambda x: self.reg_max, updated_reg) - return V_x, V_xx, ks, Ks, self.T - 2, updated_reg - - @jax.jit - def true_func(val): - Ks, ks = val - Q_uu_reg_inv = jnp.linalg.inv(Q_uu_reg) - - Ks = Ks.at[:, :, t].set(-Q_uu_reg_inv @ Q_ux_reg) - ks = ks.at[:, t].set(-Q_uu_reg_inv @ Q_u) - - V_x = Q_x + Ks[:, :, t].T @ Q_u + Q_ux.T @ ks[:, t] + Ks[:, :, t].T @ Q_uu @ ks[:, t] - V_xx = Q_xx + Ks[:, :, t].T @ Q_ux+ Q_ux.T @ Ks[:, :, t] + Ks[:, :, t].T @ Q_uu @ Ks[:, :, t] - return V_x, V_xx, ks, Ks, t-1, reg - return jax.lax.cond(isposdef(Q_uu_reg, reg), true_func, false_func, (Ks, ks)) - - @jax.jit - def cond_fun(val): - _, _, _, _, t, _ = val - return t>=0 - - # Initializes. - V_x, V_xx, ks, Ks = init() - t = self.T - 2 - - V_x, V_xx, ks, Ks, t, reg = jax.lax.while_loop(cond_fun, backward_pass_looper,(V_x, V_xx, ks, Ks, t, reg)) - return Ks, ks, reg - - @partial(jax.jit, static_argnums=(0,)) - def rollout( - self, nominal_states: DeviceArray, nominal_controls: DeviceArray, - K_closed_loop: DeviceArray, k_open_loop: DeviceArray, alpha: float - ) -> Tuple[DeviceArray, DeviceArray]: - - @jax.jit - def _rollout_step(t, args): - X, U = args - dx = X[:, t] - nominal_states[:, t] - # VERY IMPORTANT: THIS IS A HACK TO MAKE THE ANGLE DIFFERENCE BETWEEN -pi and pi - dx = dx.at[3].set(jnp.mod(dx[3] + jnp.pi, 2 * jnp.pi) - jnp.pi) - u_fb = jnp.einsum("ik,k->i", K_closed_loop[:, :, t], dx) - u = nominal_controls[:, t] + alpha * k_open_loop[:, t] + u_fb - x_nxt, u_clip = self.dyn.integrate_forward_jax(X[:, t], u) - X = X.at[:, t + 1].set(x_nxt) - U = U.at[:, t].set(u_clip) - return X, U - - X = jnp.zeros((self.dim_x, self.T)) - U = jnp.zeros((self.dim_u, self.T)) # Assumes the last ctrl are zeros. - X = X.at[:, 0].set(nominal_states[:, 0]) - - X, U = jax.lax.fori_loop(0, self.T - 1, _rollout_step, (X, U)) - - return X, U - - def warm_up(self): - """Warm up the jitted functions.""" - - # Build a fake path as a 1 meter radius circle. - theta = np.linspace(0, 2 * jnp.pi, 100) - centerline = np.zeros([2, 100]) - centerline[0,:] = 1 * np.cos(theta) - centerline[1,:] = 1 * np.sin(theta) - - self.ref_path = RefPath(centerline, 0.5, 0.5, 1, True) - - # add obstacle - obs = np.array([[0, 0, 0.5, 0.5], [1, 1.5, 1, 1.5]]).T - obs_list = [[obs for _ in range(self.T)]] - self.update_obstacles(obs_list) - - x_init = np.array([0.0, -1.0, 1, 0, 0]) - print("Start warm up iLQR...") - self.plan(x_init, verbose=False) - print("iLQR warm up finished.") - - self.ref_path = None - self.obstacle_list = [] - - diff --git a/scripts/planner/iLQR/ilqr_np.py b/scripts/planner/iLQR/ilqr_np.py deleted file mode 100644 index 7b4fcba..0000000 --- a/scripts/planner/iLQR/ilqr_np.py +++ /dev/null @@ -1,340 +0,0 @@ -from typing import Tuple, Optional, Dict -import time -import os -import numpy as np -import jax - -from .dynamics import Bicycle5D -from .cost import Cost, CollisionChecker, Obstacle -from .ref_path import RefPath -from .config import Config -import time - -status_lookup = ["Iteration Limit Exceed", - "Converged", - "Failed Line Search"] - -class iLQRnp(): - def __init__(self, config_file = None) -> None: - - self.config = Config() # Load default config. - if config_file is not None: - self.config.load_config(config_file) # Load config from file. - - print("iLQR setting:", self.config) - - # Set up Jax parameters - jax.config.update('jax_platform_name', self.config.platform) - print("Jax using Platform: ", jax.lib.xla_bridge.get_backend().platform) - - # If you want to use GPU, lower the memory fraction from 90% to avoid OOM. - os.environ["XLA_PYTHON_CLIENT_MEM_FRACTION"] = "20" - - self.dyn = Bicycle5D(self.config) - self.cost = Cost(self.config) - self.collision_checker = CollisionChecker(self.config) - self.obstacle_list = [] - self.ref_path = None - - # iLQR parameters - self.dim_x = self.config.num_dim_x - self.dim_u = self.config.num_dim_u - self.T = int(self.config.T) - self.dt = float(self.config.dt) - self.max_iter = int(self.config.max_iter) - self.tol = float(self.config.tol) # ILQR update tolerance. - - # line search parameters. - self.alphas = self.config.line_search_base**(np.arange(self.config.line_search_a, - self.config.line_search_b, - self.config.line_search_c) - ) - - print("Line Search Alphas: ", self.alphas) - - # regularization parameters - self.reg_min = float(self.config.reg_min) - self.reg_max = float(self.config.reg_max) - self.reg_init = float(self.config.reg_init) - self.reg_scale_up = float(self.config.reg_scale_up) - self.reg_scale_down = float(self.config.reg_scale_down) - self.max_attempt = self.config.max_attempt - self.warm_up() - - def update_ref_path(self, ref_path: RefPath): - self.ref_path = ref_path - - def update_obstacles(self, vertices_list: list): - self.obstacle_list = [] - for vertices in vertices_list: - self.obstacle_list.append(Obstacle(vertices)) - - def get_references(self, trajectory): - states_np = np.asarray(trajectory) - path_refs = self.ref_path.get_reference(states_np[:2, :]) - obs_refs = self.collision_checker.check_collisions(states_np, self.obstacle_list) - return path_refs, obs_refs - - def plan(self, init_state: np.ndarray, - controls: Optional[np.ndarray] = None, verbose=False) -> Dict: - - # We first check if the planner is ready - if self.ref_path is None: - return dict(status=-1) - - t_start = time.time() - ''' - Main iLQR loop. - Args: - init_state: [num_dim_x] np.ndarray: initial state. - control: [num_dim_u, N] np.ndarray: initial control. - ''' - # np.set_printoptions(suppress=True, precision=5) - if controls is None: - controls =np.zeros((self.dim_u, self.T)) - else: - assert controls.shape[1] == self.T - - # Rolls out the nominal trajectory and gets the initial cost. - #* This is differnet from the naive iLQR as it relies on the information - #* from the pyspline. - trajectory, controls = self.dyn.rollout_nominal_np(init_state, controls) - - path_refs, obs_refs = self.get_references(trajectory) - - J = self.cost.get_traj_cost(trajectory, controls, path_refs, obs_refs) - - converged = False - reg = self.reg_init - - # parameters for tracking the status - fail_attempts = 0 - - t_setup = time.time() - t_start - status = 0 - num_cost_der = 0 - num_dyn_der = 0 - num_back = 0 - num_forward = 0 - t_cost_der = 0 - t_dyn_der = 0 - t_back = 0 - t_forward = 0 - - for i in range(self.max_iter): - updated = False - # Get the derivatives of the cost - t0 = time.time() - q, r, Q, R, H = self.cost.get_derivatives_np(trajectory, controls, path_refs, obs_refs) - t_cost_der += (time.time()-t0) - num_cost_der += 1 - - # We only need dynamics derivatives (A and B matrics) from 0 to N-2. - # But doing slice with Jax will leads to performance issue. - # So we calculate the derivatives from 0 to N-1 and Backward pass will ignore the last one. - t0 = time.time() - A, B = self.dyn.get_jacobian_np(trajectory, controls) - t_dyn_der += (time.time()-t0) - num_dyn_der += 1 - - #Backward pass with new regularization. - t0 = time.time() - K_closed_loop, k_open_loop, reg = self.backward_pass( - q=q, r=r, Q=Q, R=R, H=H, A=A, B=B, reg = reg - ) - t_back += (time.time()-t0) - num_back += 1 - - # Line search through alphas. - for alpha in self.alphas: - t0 = time.time() - X_new, U_new, J_new, refs_new, obs_refs_new = ( - self.forward_pass( - trajectory, controls, K_closed_loop, k_open_loop, alpha - ) - ) - t_forward += (time.time()-t0) - num_forward += 1 - - if J_new <= J: # Improved! - # Small improvement. - # if np.abs(J-J_new)/max(1, np.abs(J)) < self.tol: - if np.abs(J-J_new) < (self.tol*min(1,alpha)): - converged = True - if verbose: - print("Update from ", J, " to ", J_new, "reg: ", reg, - "alpha: {0:0.3f}".format(alpha), "{0:0.3f}".format(time.time()-t_start)) - # Updates nominal trajectory and best cost. - J = J_new - trajectory = X_new - controls = U_new - path_refs = refs_new - obs_refs = obs_refs_new - reg = max(self.reg_min, reg/self.reg_scale_down) - updated = True - fail_attempts = 0 - break # break the for loop - elif np.abs(J-J_new) < 1e-3: - converged = True - if verbose: - print(f"cost increase from {J} to {J_new}, but the difference is {np.abs(J-J_new)} is small.") - break - - # Terminates early if the objective improvement is negligible. - if converged: - status = 1 - break - - # if no line search succeeds, terminate. - if not updated: - fail_attempts += 1 - reg = reg*self.reg_scale_up - if verbose: - print(f"Fail attempts {fail_attempts}, cost increase from {J} to {J_new} new reg {reg}.") - if fail_attempts > self.max_attempt or reg > self.reg_max: - status = 2 - break - - t_process = time.time() - t_start - analysis_string = f"Exit after {i} iterations with runtime {t_process} with status {status_lookup[status]}. "+ \ - f"Set uo takes {t_setup} s. " + \ - f"Total {num_cost_der} cost derivative with average time of {t_cost_der/num_cost_der} s. " + \ - f"Total {num_dyn_der} dyn derivative with average time of {t_dyn_der/num_dyn_der} s. " + \ - f"Total {num_forward} forward with average time of {t_forward/num_forward} s. " +\ - f"Total {num_back} cost derivative with average time of {t_back/num_back} s." - - if verbose: - print(analysis_string) - - solver_info = dict( - trajectory=np.asarray(trajectory), controls=np.asarray(controls), - K_closed_loop=np.asarray(K_closed_loop), - k_open_loop=np.asarray(k_open_loop), t_process=t_process, - status=status, J=J, q=q, r=r, - Q=Q, R=R, H=H, - A=A, B=B, N = self.T, dt = self.dt, - analysis = analysis_string - ) - return solver_info - - def forward_pass( - self, nominal_states: np.ndarray, nominal_controls: np.ndarray, - K_closed_loop: np.ndarray, k_open_loop: np.ndarray, alpha: float - ) -> Tuple[np.ndarray, np.ndarray, float, np.ndarray, np.ndarray, - np.ndarray]: - X, U = self.rollout( - nominal_states, nominal_controls, K_closed_loop, k_open_loop, alpha - ) - path_refs, obs_refs = self.get_references(X) - - J = self.cost.get_traj_cost(X, U, path_refs, obs_refs) - - return X, U, J, path_refs, obs_refs - - def backward_pass( - self, q: np.ndarray, r: np.ndarray, Q: np.ndarray, - R: np.ndarray, H: np.ndarray, A: np.ndarray, B: np.ndarray, - reg: float - ) -> Tuple[np.ndarray, np.ndarray]: - """ - backward pass looped computation. - - Args: - q (np.ndarray): (dim_x, N) - r (np.ndarray): (dim_u, N) - Q (np.ndarray): (dim_x, dim_x, N) - R (np.ndarray): (dim_u, dim_u, N) - H (np.ndarray): (dim_u, dim_x, N) - A (np.ndarray): (dim_x, dim_x, N-1) - B (np.ndarray): (dim_x, dim_u, N-1) - - Returns: - Ks (np.ndarray): gain matrices (dim_u, dim_x, N - 1) - ks (np.ndarray): gain vectors (dim_u, N - 1) - """ - Ks = np.zeros((self.dim_u, self.dim_x, self.T - 1)) - ks = np.zeros((self.dim_u, self.T - 1)) - V_x = q[:, -1] - V_xx = Q[:, :, -1] - t = self.T-2 - - while t>=0: - Q_x = q[:, t] + A[:, :, t].T @ V_x - Q_u = r[:, t] + B[:, :, t].T @ V_x - Q_xx = Q[:, :, t] + A[:, :, t].T @ V_xx @ A[:, :, t] - Q_ux = H[:, :, t] + B[:, :, t].T @ V_xx @ A[:, :, t] - Q_uu = R[:, :, t] + B[:, :, t].T @ V_xx @ B[:, :, t] - - # According to the paper, the regularization is added to Vxx for robustness. - # http://roboticexplorationlab.org/papers/iLQR_Tutorial.pdf - reg_mat = reg* np.eye(self.dim_x) - V_xx_reg = V_xx + reg_mat - Q_ux_reg = H[:, :, t] + B[:, :, t].T @ V_xx_reg @ A[:, :, t] - Q_uu_reg = R[:, :, t] + B[:, :, t].T @ V_xx_reg @ B[:, :, t] - - if (not (np.all(np.linalg.eigvalsh(Q_uu_reg) > 0)) and (reg < self.reg_max)): - t = self.T-2 - V_x = q[:, -1] - V_xx = Q[:, :, -1] - reg *= self.reg_scale_up - continue - - Q_uu_reg_inv = np.linalg.inv(Q_uu_reg) - - Ks[:, :, t] = (-Q_uu_reg_inv @ Q_ux_reg) - ks[:, t] = (-Q_uu_reg_inv @ Q_u) - - V_x = Q_x + Ks[:,:,t].T @ Q_u + Q_ux.T @ ks[:, t] + Ks[:,:,t].T @ Q_uu @ ks[:, t] - V_xx = Q_xx + Ks[:, :, t].T @ Q_ux+ Q_ux.T @ Ks[:, :, t] + Ks[:, :, t].T @ Q_uu @ Ks[:, :, t] - t -= 1 - - return Ks, ks, reg - - def rollout( - self, nominal_states: np.ndarray, nominal_controls: np.ndarray, - K_closed_loop: np.ndarray, k_open_loop: np.ndarray, alpha: float - ) -> Tuple[np.ndarray, np.ndarray]: - X = np.zeros((self.dim_x, self.T)) - U = np.zeros((self.dim_u, self.T)) # Assumes the last ctrl are zeros. - - X[:,0] = nominal_states[:,0] - - for t in range(self.T - 1): - dx = X[:, t] - nominal_states[:, t] - # VERY IMPORTANT: THIS IS A HACK TO MAKE THE ANGLE DIFFERENCE BETWEEN -pi and pi - dx[3] = np.mod(dx[3] + np.pi, 2 * np.pi) - np.pi - u_fb = np.einsum("ik,k->i", K_closed_loop[:, :, t], dx) - u = nominal_controls[:, t] + alpha * k_open_loop[:, t] + u_fb - x_nxt, u_clip = self.dyn.integrate_forward_np(X[:, t], u) - X[:, t+1] = x_nxt - U[:, t] =u_clip - - return X, U - - def warm_up(self): - """Warm up the jitted functions.""" - - # Build a fake path as a 1 meter radius circle. - theta = np.linspace(0, 2 * np.pi, 100) - centerline = np.zeros([2, 100]) - centerline[0,:] = 1 * np.cos(theta) - centerline[1,:] = 1 * np.sin(theta) - - self.ref_path = RefPath(centerline, 0.5, 0.5, 1, True) - - # add obstacle - obs = np.array([[0, 0, 0.5, 0.5], [1, 1.5, 1, 1.5]]).T - obs_list = [[obs for _ in range(self.T)]] - self.update_obstacles(obs_list) - - x_init = np.array([0.0, -1.0, 1, 0, 0]) - print("Start warm up iLQR...") - # import matplotlib.pyplot as plt - plan = self.plan(x_init, verbose=False) - print("iLQR warm up finished.") - # plt.plot(plan['trajectory'][0,:], plan['trajectory'][1,:]) - # print(f"Warm up takes {plan['t_process']} seconds.") - self.ref_path = None - self.obstacle_list = [] - - diff --git a/scripts/planner/iLQR/ilqr_template.py b/scripts/planner/iLQR/ilqr_template.py deleted file mode 100644 index 3751e18..0000000 --- a/scripts/planner/iLQR/ilqr_template.py +++ /dev/null @@ -1,230 +0,0 @@ -from typing import Tuple, Optional, Dict, Union -from jaxlib.xla_extension import DeviceArray -import time -import os -import numpy as np -import jax -from .dynamics import Bicycle5D -from .cost import Cost, CollisionChecker, Obstacle -from .ref_path import RefPath -from .config import Config -import time - -status_lookup = ['Iteration Limit Exceed', - 'Converged', - 'Failed Line Search'] - -class iLQR(): - def __init__(self, config_file = None) -> None: - - self.config = Config() # Load default config. - if config_file is not None: - self.config.load_config(config_file) # Load config from file. - - self.load_parameters() - print('iLQR setting:', self.config) - - # Set up Jax parameters - jax.config.update('jax_platform_name', self.config.platform) - print('Jax using Platform: ', jax.lib.xla_bridge.get_backend().platform) - - # If you want to use GPU, lower the memory fraction from 90% to avoid OOM. - os.environ['XLA_PYTHON_CLIENT_MEM_FRACTION'] = '20' - - self.dyn = Bicycle5D(self.config) - self.cost = Cost(self.config) - self.ref_path = None - - # collision checker - # Note: This will not be used until lab2. - self.collision_checker = CollisionChecker(self.config) - self.obstacle_list = [] - - self.warm_up() - - def load_parameters(self): - ''' - This function defines iLQR parameters from . - ''' - # iLQR parameters - self.dim_x = self.config.num_dim_x - self.dim_u = self.config.num_dim_u - self.T = int(self.config.T) - self.dt = float(self.config.dt) - self.max_iter = int(self.config.max_iter) - self.tol = float(self.config.tol) # ILQR update tolerance. - - # line search parameters. - self.alphas = self.config.line_search_base**(np.arange(self.config.line_search_a, - self.config.line_search_b, - self.config.line_search_c) - ) - - print('Line Search Alphas: ', self.alphas) - - # regularization parameters - self.reg_min = float(self.config.reg_min) - self.reg_max = float(self.config.reg_max) - self.reg_init = float(self.config.reg_init) - self.reg_scale_up = float(self.config.reg_scale_up) - self.reg_scale_down = float(self.config.reg_scale_down) - self.max_attempt = self.config.max_attempt - - def warm_up(self): - ''' - Warm up the jitted functions. - ''' - # Build a fake path as a 1 meter radius circle. - theta = np.linspace(0, 2 * np.pi, 100) - centerline = np.zeros([2, 100]) - centerline[0,:] = 1 * np.cos(theta) - centerline[1,:] = 1 * np.sin(theta) - - self.ref_path = RefPath(centerline, 0.5, 0.5, 1, True) - - # add obstacle - obs = np.array([[0, 0, 0.5, 0.5], [1, 1.5, 1, 1.5]]).T - obs_list = [[obs for _ in range(self.T)]] - self.update_obstacles(obs_list) - - x_init = np.array([0.0, -1.0, 1, 0, 0]) - print('Start warm up iLQR...') - # import matplotlib.pyplot as plt - self.plan(x_init, verbose=False) - print('iLQR warm up finished.') - # plt.plot(plan['trajectory'][0,:], plan['trajectory'][1,:]) - # print(f'Warm up takes {plan['t_process']} seconds.') - self.ref_path = None - self.obstacle_list = [] - - def update_ref_path(self, ref_path: RefPath): - ''' - Update the reference path. - Args: - ref_path: RefPath: reference path. - ''' - self.ref_path = ref_path - - def update_obstacles(self, vertices_list: list): - ''' - Update the obstacle list for a list of vertices. - Args: - vertices_list: list of np.ndarray: list of vertices for each obstacle. - ''' - # Note: This will not be used until lab2. - self.obstacle_list = [] - for vertices in vertices_list: - self.obstacle_list.append(Obstacle(vertices)) - - def get_references(self, trajectory: Union[np.ndarray, DeviceArray]): - ''' - Given the trajectory, get the path reference and obstacle information. - Args: - trajectory: [num_dim_x, T] trajectory. - Returns: - path_refs: [num_dim_x, T] np.ndarray: references. - obs_refs: [num_dim_x, T] np.ndarray: obstacle references. - ''' - trajectory = np.asarray(trajectory) - path_refs = self.ref_path.get_reference(trajectory[:2, :]) - obs_refs = self.collision_checker.check_collisions(trajectory, self.obstacle_list) - return path_refs, obs_refs - - def plan(self, init_state: np.ndarray, - controls: Optional[np.ndarray] = None, - verbose=False) -> Dict: - ''' - Main iLQR loop. - Args: - init_state: [num_dim_x] np.ndarray: initial state. - control: [num_dim_u, T] np.ndarray: initial control. - ''' - - # We first check if the planner is ready - if self.ref_path is None: - # TODO: define your own return behavior in case there is no reference path. - print('No reference path is provided.') - return dict(status=-1) - - # if no initial control sequence is provided, we assume it is all zeros. - if controls is None: - controls =np.zeros((self.dim_u, self.T)) - else: - assert controls.shape[1] == self.T - - # Start timing - t_start = time.time() - - # Rolls out the nominal trajectory and gets the initial cost. - trajectory, controls = self.dyn.rollout_nominal_np(init_state, controls) - # Get path and obstacle references. - path_refs, obs_refs = self.get_references(trajectory) - # Get the initial cost of the trajectory. - J = self.cost.get_traj_cost(trajectory, controls, path_refs, obs_refs) - - ########################################################################## - # TODO: Implement the iLQR algorithm. Feel free to add any helper functions. - # You will find following implemented functions useful: - - ''' - A, B = self.dyn.get_jacobian_np(trajectory, controls) - - Returns the linearized 'A' and 'B' matrix of the ego vehicle around - nominal trajectory and controls. - - Args: - trajectory (DeviceArray): trajectory along the nominal trajectory. - controls (DeviceArray): controls along the trajectory. - - Returns: - np.ndarray: the Jacobian of the dynamics w.r.t. the state. - np.ndarray: the Jacobian of the dynamics w.r.t. the control. - ''' - ''' - state_next, control_clip = self.dyn.integrate_forward_np(state, control) - - Finds the next state of the vehicle given the current state and - control input. - - Args: - state:(np.ndarray) [5]. - control: (np.ndarray) [2]. - - Returns: - state_next: np.ndarray: next state. [5] - control_clip: np.ndarray: clipped control. [2] - ''' - - ''' - q, r, Q, R, H = self.cost.get_derivatives_np(trajectory, controls, path_refs, obs_refs) - - Given a state, control, and references, return Jacobians and Hessians of cost function - Input: - trajectory: (dim_x, N) array of state trajectory - controls: (dim_u, N) array of control sequence - path_refs: (dim_ref, N) array of references (e.g. reference path, reference velocity, etc.) - obs_refs: -Optional- (num_obstacle, 2, N) List of obstacles - return: - q: np.ndarray, jacobian of cost function w.r.t. trajectory - r: np.ndarray, jacobian of cost function w.r.t. controls - Q: np.ndarray, hessian of cost function w.r.t. trajectory - R: np.ndarray, hessian of cost function w.r.t. controls - H: np.ndarray, hessian of cost function w.r.t. trajectory and controls - ''' - ########################################################################## - - t_process = time.time() - t_start - solver_info = dict( - t_process=t_process, # Time spent on planning - N = self.T, dt = self.dt, - trajectory = trajectory, - controls = controls, - status=None, # TODO: Fill this in - K_closed_loop=None, # TODO: Fill this in - k_open_loop=None # TODO: Fill this in - # Optional TODO: Fill in other information you want to return - ) - return solver_info - - - diff --git a/scripts/planner/iLQR/ref_path.py b/scripts/planner/iLQR/ref_path.py deleted file mode 100644 index 51ef9b6..0000000 --- a/scripts/planner/iLQR/ref_path.py +++ /dev/null @@ -1,267 +0,0 @@ -from typing import Optional, Tuple, Union -import numpy as np -from matplotlib import pyplot as plt -import matplotlib -from pyspline.pyCurve import Curve -import csv - -class RefPath: - def __init__(self, center_line: np.ndarray, width_left: Union[np.ndarray, float], - width_right: Union[np.ndarray, float], speed_limt: Union[np.ndarray, float], loop: Optional[bool] = True) -> None: - ''' - Considers a track with fixed width. - - Args: - center_line: 2D numpy array containing samples of track center line - [[x1,x2,...], [y1,y2,...]] - width_left: float, width of the track on the left side - width_right: float, width of the track on the right side - loop: Boolean. If the track has loop - ''' - self.center_line_data = center_line.copy() - - # First, we build the centerline spline in XY space - self.center_line = Curve(x=center_line[0, :], y=center_line[1, :], k=3) - - # Project back to get the s for each point - s_norm, _ = self.center_line.projectPoint(center_line.T) - - if not isinstance(width_left, np.ndarray): - self.width_left = Curve(x=s_norm, y = np.ones_like(s_norm) * width_left, k=3) - else: - self.width_left = Curve(x=s_norm, y=width_left, k=3) - - if not isinstance(width_right, np.ndarray): - self.width_right = Curve(x=s_norm, y = np.ones_like(s_norm) * width_right, k=3) - else: - self.width_right = Curve(x=s_norm, y=width_right, k=3) - - if not isinstance(speed_limt, np.ndarray): - self.speed_limit = Curve(x=s_norm, y = np.ones_like(s_norm) * speed_limt, k=3) - else: - self.speed_limit = Curve(x=s_norm, y=speed_limt, k=3) - - self.loop = loop - self.length = self.center_line.getLength() - - # variables for plotting - self.build_track() - - def _interp_s(self, s: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: - """ - Gets the closest points on the centerline and the slope of trangent line on - those points given the normalized progress. - - Args: - s (np.ndarray): progress on the centerline. This is a vector of shape - (N,) and each entry should be within [0, 1]. - - Returns: - np.ndarray: the position of the closest points on the centerline. This - array is of the shape (2, N). - np.ndarray: the slope of of trangent line on those points. This vector - is of the shape (N, ). - """ - n = len(s) - interp_pt = self.center_line.getValue(s) - if n == 1: - interp_pt = interp_pt[np.newaxis, :] - slope = np.zeros(n) - - for i in range(n): - deri = self.center_line.getDerivative(s[i]) - slope[i] = np.arctan2(deri[1], deri[0]) - return interp_pt.T, slope - - def interp(self, theta_list): - """ - Gets the closest points on the centerline and the slope of trangent line on - those points given the unnormalized progress. - - Args: - s (np.ndarray): unnormalized progress on the centerline. This is a - vector of shape (N,). - - Returns: - np.ndarray: the position of the closest points on the centerline. This - array is of the shape (2, N). - np.ndarray: the slope of of trangent line on those points. This vector - is of the shape (N, ). - """ - if self.loop: - s = np.remainder(theta_list, self.length) / self.length - else: - s = np.array(theta_list) / self.length - s[s > 1] = 1 - return self._interp_s(s) - - def build_track(self): - N = 500 - theta_sample = np.linspace(0, 1, N, endpoint=False) * self.length - interp_pt, slope = self.interp(theta_sample) - self.track_center = interp_pt - - if self.loop: - self.track_bound = np.zeros((4, N + 1)) - else: - self.track_bound = np.zeros((4, N)) - - # Inner curve. - width_left = self.width_left.getValue(theta_sample)[:,1] - self.track_bound[0, :N] = interp_pt[0, :] - np.sin(slope) * width_left - self.track_bound[1, :N] = interp_pt[1, :] + np.cos(slope) * width_left - - # Outer curve. - width_right = self.width_right.getValue(theta_sample)[:,1] - self.track_bound[2, :N] = interp_pt[0, :] + np.sin(slope) * width_right - self.track_bound[3, :N] = interp_pt[1, :] - np.cos(slope) * width_right - - if self.loop: - self.track_bound[:, -1] = self.track_bound[:, 0] - - def get_reference(self, points: np.ndarray, - normalize_progress: Optional[bool] = False, - eps: Optional[float] = 1e-5) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - - closest_pt, slope, s = self.get_closest_pts(points, eps=eps) - - v_ref = self.speed_limit.getValue(s)[:,1] - - if not self.loop: - temp = (1-s) * self.length - # bring the speed limit to 0 at the end of the path - v_ref = np.minimum(v_ref, temp) - v_ref = v_ref[np.newaxis, :] - - width_left = self.width_left.getValue(s)[:,1][np.newaxis, :] - width_right = self.width_right.getValue(s)[:,1][np.newaxis, :] - - if not normalize_progress: - s = s * self.length - s = s[np.newaxis, :] - return np.concatenate([closest_pt, slope, v_ref, s, width_right, width_left], axis=0) - - def get_closest_pts(self, points: np.ndarray, eps: Optional[float] = 1e-5) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """ - Gets the closest points on the centerline, the slope of their tangent - lines, and the progress given the points in the global frame. - - Args: - points (np.ndarray): the points in the global frame, of the shape - (2, N). - - Returns: - np.ndarray: the position of the closest points on the centerline. This - array is of the shape (2, N). - np.ndarray: the slope of of trangent line on those points. This vector - is of the shape (1, N). - np.ndarray: the normalized progress along the centerline. This vector is of the - shape (1, N). - """ - s, _ = self.center_line.projectPoint(points.T, eps=eps) - - if points.shape[1] == 1: - s = np.array([s]) - closest_pt, slope = self._interp_s(s) - slope = slope[np.newaxis, :] - - return closest_pt, slope, s - - def local2global(self, local_states: np.ndarray, return_slope=False) -> np.ndarray: - """ - Transforms trajectory in the local frame to the global frame (x, y) position. - - Args: - local_states (np.ndarray): The first row is the progress of the trajectory - and the second row is the lateral deviation. - - Returns: - np.ndarray: trajectory in the global frame. - """ - flatten = False - if local_states.ndim == 1: - flatten = True - local_states = local_states[:, np.newaxis] - num_pts = local_states.shape[1] - progress = local_states[0, :] - assert np.min(progress) >= 0. and np.max(progress) <= 1., ( - "The progress should be within [0, 1]!" - ) - lateral_dev = local_states[1, :] - global_states, slope = self._interp_s(progress) - if num_pts == 1: - global_states = global_states.reshape(2, 1) - global_states[0, :] = global_states[0, :] + np.sin(slope) * lateral_dev - global_states[1, :] = global_states[1, :] - np.cos(slope) * lateral_dev - - if flatten: - global_states = global_states[:, 0] - if return_slope: - return global_states, slope - return global_states - - def global2local(self, global_states: np.ndarray) -> np.ndarray: - """ - Transforms trajectory in the global frame to the local frame (progress, lateral - deviation). - - Args: - global_states (np.ndarray): The first row is the x position and the - second row is the y position. - - Returns: - np.ndarray: trajectory in the local frame. - """ - flatten = False - if global_states.ndim == 1: - flatten = True - global_states = global_states[:, np.newaxis] - local_states = np.zeros(shape=(2, global_states.shape[1])) - closest_pt, slope, progress = self.get_closest_pts( - global_states, normalize_progress=True - ) - dx = global_states[0, :] - closest_pt[0, :] - dy = global_states[1, :] - closest_pt[1, :] - sr = np.sin(slope) - cr = np.cos(slope) - - lateral_dev = sr*dx - cr*dy - local_states[0, :] = progress.reshape(-1) - local_states[1, :] = lateral_dev - - if flatten: - local_states = local_states[:, 0] - - return local_states - - # region: plotting - def plot_track(self, ax: Optional[matplotlib.axes.Axes] = None, - c: str = 'k', linewidth = 1, zorder=0, plot_center_line: bool = False): - if ax is None: - ax = plt.gca() - # Inner curve. - ax.plot( - self.track_bound[0, :], self.track_bound[1, :], c=c, linestyle='-', - linewidth = linewidth, - zorder=zorder - ) - # Outer curve. - ax.plot( - self.track_bound[2, :], self.track_bound[3, :], c=c, linestyle='-', - zorder=zorder - ) - if plot_center_line: - self.plot_track_center(ax, c=c, zorder=zorder) - - def plot_track_center(self, ax: Optional[matplotlib.axes.Axes] = None, c: str = 'k', linewidth = 1, zorder=0): - if ax is None: - ax = plt.gca() - ax.plot( - self.track_center[0, :], self.track_center[1, :], c=c, linestyle='--', - linewidth = linewidth, - zorder=zorder - ) - - # endregion - - \ No newline at end of file diff --git a/scripts/planner/planner.py b/scripts/planner/planner.py index 4972f26..7fc9b97 100644 --- a/scripts/planner/planner.py +++ b/scripts/planner/planner.py @@ -8,8 +8,8 @@ import os from .utils import RealtimeBuffer, get_ros_param, State2D, Policy, GeneratePwm -from .iLQR import RefPath -from .iLQR import iLQRnp as iLQR +from .ILQR import RefPath +from .ILQR import ILQRnp as ILQR from racecar_msgs.msg import ServoMsg #, OdomMsg, PathMsg, ObstacleMsg, PathMsg # https://wiki.ros.org/rospy_tutorials/Tutorials/numpy # from rospy.numpy_msg import numpy_msg @@ -82,9 +82,9 @@ def read_parameters(self): def setup_planner(self): ''' - This function setup the iLQR solver + This function setup the ILQR solver ''' - # Read iLQR parameters + # Read ILQR parameters if self.ilqr_params_file == "": ilqr_params_abs_path = None elif os.path.isabs(self.ilqr_params_file): @@ -92,8 +92,8 @@ def setup_planner(self): else: ilqr_params_abs_path = os.path.join(self.package_path, self.ilqr_params_file) - # TODO: Initialize iLQR solver - self.planner = iLQR(ilqr_params_abs_path) + # TODO: Initialize ILQR solver + self.planner = ILQR(ilqr_params_abs_path) # create buffers to handle multi-threading self.plan_state_buffer = RealtimeBuffer() diff --git a/scripts/planner/profile.ipynb b/scripts/planner/profile.ipynb index 16b5e73..59c6ea2 100644 --- a/scripts/planner/profile.ipynb +++ b/scripts/planner/profile.ipynb @@ -20,7 +20,7 @@ "import jax\n", "import jax.numpy as jnp\n", "jax.config.update('jax_platform_name', 'cpu')\n", - "from iLQR import iLQR, Path\n", + "from ILQR import ILQR, Path\n", "import csv\n", "import random\n", "\n", @@ -67,8 +67,8 @@ } ], "source": [ - "from iLQR.config import Config\n", - "from iLQR.dynamics import Bicycle5D\n", + "from ILQR.config import Config\n", + "from ILQR.dynamics import Bicycle5D\n", "\n", "config = Config()\n", "dyn = Bicycle5D(config)\n", @@ -96,7 +96,7 @@ } ], "source": [ - "from iLQR.cost import Cost\n", + "from ILQR.cost import Cost\n", "\n", "init_state = np.array([1, 5.4, 2, 3.14, 0])\n", "controls = jnp.zeros((2, n))\n", @@ -151,7 +151,7 @@ } ], "source": [ - "from iLQR.cost.collision_checker import CollisionChecker, Obstacle\n", + "from ILQR.cost.collision_checker import CollisionChecker, Obstacle\n", "collision_checker = CollisionChecker(config)\n", "\n", "obs1 = np.array([[-1, -1, -0.5, -0.5], [5.2, 5.9, 5.9, 5.2]]).T\n", diff --git a/scripts/planner/task1.ipynb b/scripts/planner/task1.ipynb index 80e705e..5079db4 100644 --- a/scripts/planner/task1.ipynb +++ b/scripts/planner/task1.ipynb @@ -50,9 +50,9 @@ "plt.rcParams['font.family'] = 'serif'\n", "plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']\n", "\n", - "from iLQR import RefPath\n", - "from iLQR import iLQR\n", - "# from iLQR import iLQRnp as iLQR\n" + "from ILQR import RefPath\n", + "from ILQR import ILQR\n", + "# from ILQR import iLQRnp as ILQR\n" ] }, { @@ -157,7 +157,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Test your iLQR planner\n", + "## Test your ILQR planner\n", "The planner need to warmup for a few steps for the first time it is run. This process will takes 10-30 seconds depending on your computer." ] }, @@ -170,16 +170,16 @@ "name": "stdout", "output_type": "stream", "text": [ - "iLQR setting: iLQR config: {'num_dim_x': 5, 'num_dim_u': 2, 'T': 10, 'dt': 0.1, 'platform': 'cpu', 'max_iter': 50, 'tol': 0.05, 'line_search_base': 0.1, 'line_search_a': -1, 'line_search_b': 3, 'line_search_c': 1, 'reg_min': '1e-5', 'reg_max': '1e8', 'reg_scale_down': 5, 'reg_scale_up': 5, 'reg_init': 1.0, 'max_attempt': 5, 'wheelbase': 0.257, 'radius': 0.13, 'width': 0.22, 'length': 0.4, 'delta_max': 0.35, 'delta_min': -0.35, 'v_max': 5.0, 'v_min': 0.0, 'omega_min': -6.0, 'omega_max': 6.0, 'a_max': 5.0, 'a_min': -5.0, 'v_ref': 5.0, 'dim_closest_pt_x': 0, 'dim_closest_pt_y': 1, 'dim_path_slope': 2, 'path_cost_type': 'quadratic', 'path_weight': 2.0, 'path_huber_delta': 2, 'vel_cost_type': 'quadratic', 'vel_weight': 0.2, 'vel_huber_delta': 1, 'dim_vel_limit': 3, 'vel_limit_a': 10.0, 'vel_limit_b': 1.0, 'heading_cost_type': 'quadratic', 'heading_weight': 1, 'heading_huber_delta': 1, 'lat_accel_thres': 6.0, 'lat_accel_a': 5.0, 'lat_accel_b': 2.0, 'dim_progress': 4, 'progress_weight': 2, 'dim_right_boundary': 5, 'dim_left_boundary': 6, 'lane_boundary_a': 10.0, 'lane_boundary_b': 10.0, 'ctrl_cost_type': 'quadratic', 'ctrl_cost_accel_weight': 0.5, 'ctrl_cost_steer_weight': 0.5, 'ctrl_cost_accel_huber_delta': 1.0, 'ctrl_cost_steer_huber_delta': 1.0, 'obs_a': 10.0, 'obs_b': 0.0}\n", + "ILQR setting: ILQR config: {'num_dim_x': 5, 'num_dim_u': 2, 'T': 10, 'dt': 0.1, 'platform': 'cpu', 'max_iter': 50, 'tol': 0.05, 'line_search_base': 0.1, 'line_search_a': -1, 'line_search_b': 3, 'line_search_c': 1, 'reg_min': '1e-5', 'reg_max': '1e8', 'reg_scale_down': 5, 'reg_scale_up': 5, 'reg_init': 1.0, 'max_attempt': 5, 'wheelbase': 0.257, 'radius': 0.13, 'width': 0.22, 'length': 0.4, 'delta_max': 0.35, 'delta_min': -0.35, 'v_max': 5.0, 'v_min': 0.0, 'omega_min': -6.0, 'omega_max': 6.0, 'a_max': 5.0, 'a_min': -5.0, 'v_ref': 5.0, 'dim_closest_pt_x': 0, 'dim_closest_pt_y': 1, 'dim_path_slope': 2, 'path_cost_type': 'quadratic', 'path_weight': 2.0, 'path_huber_delta': 2, 'vel_cost_type': 'quadratic', 'vel_weight': 0.2, 'vel_huber_delta': 1, 'dim_vel_limit': 3, 'vel_limit_a': 10.0, 'vel_limit_b': 1.0, 'heading_cost_type': 'quadratic', 'heading_weight': 1, 'heading_huber_delta': 1, 'lat_accel_thres': 6.0, 'lat_accel_a': 5.0, 'lat_accel_b': 2.0, 'dim_progress': 4, 'progress_weight': 2, 'dim_right_boundary': 5, 'dim_left_boundary': 6, 'lane_boundary_a': 10.0, 'lane_boundary_b': 10.0, 'ctrl_cost_type': 'quadratic', 'ctrl_cost_accel_weight': 0.5, 'ctrl_cost_steer_weight': 0.5, 'ctrl_cost_accel_huber_delta': 1.0, 'ctrl_cost_steer_huber_delta': 1.0, 'obs_a': 10.0, 'obs_b': 0.0}\n", "Jax using Platform: cpu\n", "Line Search Alphas: [10. 1. 0.1 0.01]\n", - "Start warm up iLQR...\n", - "iLQR warm up finished.\n" + "Start warm up ILQR...\n", + "ILQR warm up finished.\n" ] } ], "source": [ - "ilqr = iLQR(config_file)\n", + "ilqr = ILQR(config_file)\n", "ilqr.update_ref_path(ref_path)\n" ] }, @@ -248,7 +248,7 @@ } ], "source": [ - "# Create an iLQR solver\n", + "# Create an ILQR solver\n", "x_init = np.array([x[70], y[70], 5, np.arctan2(y[71]-y[70], x[71]-x[70]), 0])\n", "plan = ilqr.plan(x_init)\n", "\n", @@ -291,7 +291,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "iLQR takes : 0.01 sec']: 100%|██████████| 400/400 [00:02<00:00, 144.60it/s]\n" + "ILQR takes : 0.01 sec']: 100%|██████████| 400/400 [00:02<00:00, 144.60it/s]\n" ] } ], @@ -320,7 +320,7 @@ " x_cur = trajectory[:,1] \n", " init_control[:,:-1] = controls[:,1:]\n", " \n", - " pbar.set_description(f\"iLQR takes : {plan['t_process']:.2f} sec']\")" + " pbar.set_description(f\"ILQR takes : {plan['t_process']:.2f} sec']\")" ] }, { @@ -2450,7 +2450,7 @@ "provenance": [] }, "kernelspec": { - "display_name": "ros_base", + "display_name": "spirit-rl-pybullet", "language": "python", "name": "python3" }, @@ -2464,11 +2464,11 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.15" + "version": "3.8.16" }, "vscode": { "interpreter": { - "hash": "7568f0ba5f4aa6b4771c1bfaa80032ca4a9c3cb4e7689d750c939fd1994746c1" + "hash": "49f02b2742529133dbe8dbddfe10b388fed16170e20978eaf0bbb16d1bc6f59d" } } }, diff --git a/scripts/planner/task1/task1.yaml b/scripts/planner/task1/task1.yaml index 2a82e49..cf95575 100644 --- a/scripts/planner/task1/task1.yaml +++ b/scripts/planner/task1/task1.yaml @@ -12,7 +12,7 @@ platform: "cpu" # "cpu" or "gpu" or "tpu" ###########Optimization Parameters ################# #################################################### max_iter: 50 # maximum number of iterations -# tolerance for the iLQR convergence +# tolerance for the ILQR convergence # Make sure this is smaller than the minimum line search step size tol: 0.05 @@ -60,7 +60,7 @@ a_min: -5.0 # minimum acceleration # reference velocity v_ref: 5.0 # reference velocity #################################################### -########## Parameters for iLQR COST ################ +########## Parameters for ILQR COST ################ #################################################### ######## State Cost ############ diff --git a/scripts/planner/test_ilqr.ipynb b/scripts/planner/test_ilqr.ipynb index 3fe0527..08d9412 100644 --- a/scripts/planner/test_ilqr.ipynb +++ b/scripts/planner/test_ilqr.ipynb @@ -33,9 +33,9 @@ "plt.rcParams['font.family'] = 'serif'\n", "plt.rcParams['font.serif'] = ['Times New Roman'] + plt.rcParams['font.serif']\n", "\n", - "from iLQR import RefPath\n", - "from iLQR import iLQR\n", - "# from iLQR import iLQRnp as iLQR\n", + "from ILQR import RefPath\n", + "from ILQR import ILQR\n", + "# from ILQR import iLQRnp as ILQR\n", "\n" ] }, @@ -77,11 +77,11 @@ "name": "stdout", "output_type": "stream", "text": [ - "iLQR setting: iLQR config: {'num_dim_x': 5, 'num_dim_u': 2, 'T': 10, 'dt': 0.1, 'platform': 'cpu', 'max_iter': 50, 'tol': 0.05, 'line_search_base': 0.1, 'line_search_a': -1, 'line_search_b': 3, 'line_search_c': 1, 'reg_min': 1e-05, 'reg_max': 100000000.0, 'reg_scale_down': 5, 'reg_scale_up': 5, 'reg_init': 1.0, 'max_attempt': 5, 'wheelbase': 0.257, 'radius': 0.13, 'width': 0.22, 'length': 0.4, 'delta_max': 0.35, 'delta_min': -0.35, 'v_max': 5.0, 'v_min': 0.0, 'omega_min': -6.0, 'omega_max': 6.0, 'a_max': 5.0, 'a_min': -5.0, 'v_ref': 1.0, 'dim_closest_pt_x': 0, 'dim_closest_pt_y': 1, 'dim_path_slope': 2, 'path_cost_type': 'quadratic', 'path_weight': 5.0, 'path_huber_delta': 2, 'vel_cost_type': 'quadratic', 'vel_weight': 2.0, 'vel_huber_delta': 1, 'dim_vel_limit': 3, 'vel_limit_a': 10.0, 'vel_limit_b': 1.0, 'heading_cost_type': 'quadratic', 'heading_weight': 1, 'heading_huber_delta': 1, 'lat_accel_thres': 6.0, 'lat_accel_a': 5.0, 'lat_accel_b': 2.0, 'dim_progress': 4, 'progress_weight': 0, 'dim_right_boundary': 5, 'dim_left_boundary': 6, 'lane_boundary_a': 30.0, 'lane_boundary_b': 2.0, 'ctrl_cost_type': 'quadratic', 'ctrl_cost_accel_weight': 0.5, 'ctrl_cost_steer_weight': 0.3, 'ctrl_cost_accel_huber_delta': 1.0, 'ctrl_cost_steer_huber_delta': 1.0, 'obs_a': 10.0, 'obs_b': 10.0}\n", + "ILQR setting: ILQR config: {'num_dim_x': 5, 'num_dim_u': 2, 'T': 10, 'dt': 0.1, 'platform': 'cpu', 'max_iter': 50, 'tol': 0.05, 'line_search_base': 0.1, 'line_search_a': -1, 'line_search_b': 3, 'line_search_c': 1, 'reg_min': 1e-05, 'reg_max': 100000000.0, 'reg_scale_down': 5, 'reg_scale_up': 5, 'reg_init': 1.0, 'max_attempt': 5, 'wheelbase': 0.257, 'radius': 0.13, 'width': 0.22, 'length': 0.4, 'delta_max': 0.35, 'delta_min': -0.35, 'v_max': 5.0, 'v_min': 0.0, 'omega_min': -6.0, 'omega_max': 6.0, 'a_max': 5.0, 'a_min': -5.0, 'v_ref': 1.0, 'dim_closest_pt_x': 0, 'dim_closest_pt_y': 1, 'dim_path_slope': 2, 'path_cost_type': 'quadratic', 'path_weight': 5.0, 'path_huber_delta': 2, 'vel_cost_type': 'quadratic', 'vel_weight': 2.0, 'vel_huber_delta': 1, 'dim_vel_limit': 3, 'vel_limit_a': 10.0, 'vel_limit_b': 1.0, 'heading_cost_type': 'quadratic', 'heading_weight': 1, 'heading_huber_delta': 1, 'lat_accel_thres': 6.0, 'lat_accel_a': 5.0, 'lat_accel_b': 2.0, 'dim_progress': 4, 'progress_weight': 0, 'dim_right_boundary': 5, 'dim_left_boundary': 6, 'lane_boundary_a': 30.0, 'lane_boundary_b': 2.0, 'ctrl_cost_type': 'quadratic', 'ctrl_cost_accel_weight': 0.5, 'ctrl_cost_steer_weight': 0.3, 'ctrl_cost_accel_huber_delta': 1.0, 'ctrl_cost_steer_huber_delta': 1.0, 'obs_a': 10.0, 'obs_b': 10.0}\n", "Jax using Platform: cpu\n", "Line Search Alphas: [10. 1. 0.1 0.01]\n", - "Start warm up iLQR...\n", - "iLQR warm up finished.\n", + "Start warm up ILQR...\n", + "ILQR warm up finished.\n", "Update from 213.40329 to 94.71682 reg: 1.0 alpha: 10.000 0.002\n", "Update from 94.71682 to 37.47098 reg: 0.2 alpha: 1.000 0.004\n", "Update from 37.47098 to 32.437042 reg: 624.99994 alpha: 1.000 0.006\n", @@ -152,7 +152,7 @@ "centerline = load_path('outerloop_center_smooth.csv')\n", "path = RefPath(centerline, 0.6, 0.6, 2, loop=False)\n", "config_file = '/hdd/Git_Repo/PrincetonRaceCar/ROS_Core/src/Planning/ilqr_planning_ros/configs/ilqr.yaml'\n", - "solver = iLQR()#config_file)\n", + "solver = ILQR()#config_file)\n", "# # make some obstacles\n", "# obs1 = np.array([[-1, -1, -0, -0], [5.2, 6.0, 6.0, 5.2]]).T\n", "obs1 = np.array([[-1, -1, -0.5, -0.25, -0.5], [5.2, 5.9, 5.9, 5.6, 5.2]]).T\n", @@ -204,18 +204,18 @@ "name": "stdout", "output_type": "stream", "text": [ - "iLQR setting: iLQR config: {'num_dim_x': 5, 'num_dim_u': 2, 'T': 10, 'dt': 0.1, 'platform': 'cpu', 'max_iter': 50, 'tol': 0.05, 'line_search_base': 0.1, 'line_search_a': -1, 'line_search_b': 3, 'line_search_c': 1, 'reg_min': 1e-05, 'reg_max': 100000000.0, 'reg_scale_down': 5, 'reg_scale_up': 5, 'reg_init': 1.0, 'max_attempt': 5, 'wheelbase': 0.257, 'radius': 0.13, 'width': 0.22, 'length': 0.4, 'delta_max': 0.35, 'delta_min': -0.35, 'v_max': 5.0, 'v_min': 0.0, 'omega_min': -6.0, 'omega_max': 6.0, 'a_max': 5.0, 'a_min': -5.0, 'v_ref': 1.0, 'dim_closest_pt_x': 0, 'dim_closest_pt_y': 1, 'dim_path_slope': 2, 'path_cost_type': 'quadratic', 'path_weight': 5.0, 'path_huber_delta': 2, 'vel_cost_type': 'quadratic', 'vel_weight': 2.0, 'vel_huber_delta': 1, 'dim_vel_limit': 3, 'vel_limit_a': 10.0, 'vel_limit_b': 1.0, 'heading_cost_type': 'quadratic', 'heading_weight': 1, 'heading_huber_delta': 1, 'lat_accel_thres': 6.0, 'lat_accel_a': 5.0, 'lat_accel_b': 2.0, 'dim_progress': 4, 'progress_weight': 0, 'dim_right_boundary': 5, 'dim_left_boundary': 6, 'lane_boundary_a': 30.0, 'lane_boundary_b': 2.0, 'ctrl_cost_type': 'quadratic', 'ctrl_cost_accel_weight': 0.5, 'ctrl_cost_steer_weight': 0.3, 'ctrl_cost_accel_huber_delta': 1.0, 'ctrl_cost_steer_huber_delta': 1.0, 'obs_a': 10.0, 'obs_b': 10.0}\n", + "ILQR setting: ILQR config: {'num_dim_x': 5, 'num_dim_u': 2, 'T': 10, 'dt': 0.1, 'platform': 'cpu', 'max_iter': 50, 'tol': 0.05, 'line_search_base': 0.1, 'line_search_a': -1, 'line_search_b': 3, 'line_search_c': 1, 'reg_min': 1e-05, 'reg_max': 100000000.0, 'reg_scale_down': 5, 'reg_scale_up': 5, 'reg_init': 1.0, 'max_attempt': 5, 'wheelbase': 0.257, 'radius': 0.13, 'width': 0.22, 'length': 0.4, 'delta_max': 0.35, 'delta_min': -0.35, 'v_max': 5.0, 'v_min': 0.0, 'omega_min': -6.0, 'omega_max': 6.0, 'a_max': 5.0, 'a_min': -5.0, 'v_ref': 1.0, 'dim_closest_pt_x': 0, 'dim_closest_pt_y': 1, 'dim_path_slope': 2, 'path_cost_type': 'quadratic', 'path_weight': 5.0, 'path_huber_delta': 2, 'vel_cost_type': 'quadratic', 'vel_weight': 2.0, 'vel_huber_delta': 1, 'dim_vel_limit': 3, 'vel_limit_a': 10.0, 'vel_limit_b': 1.0, 'heading_cost_type': 'quadratic', 'heading_weight': 1, 'heading_huber_delta': 1, 'lat_accel_thres': 6.0, 'lat_accel_a': 5.0, 'lat_accel_b': 2.0, 'dim_progress': 4, 'progress_weight': 0, 'dim_right_boundary': 5, 'dim_left_boundary': 6, 'lane_boundary_a': 30.0, 'lane_boundary_b': 2.0, 'ctrl_cost_type': 'quadratic', 'ctrl_cost_accel_weight': 0.5, 'ctrl_cost_steer_weight': 0.3, 'ctrl_cost_accel_huber_delta': 1.0, 'ctrl_cost_steer_huber_delta': 1.0, 'obs_a': 10.0, 'obs_b': 10.0}\n", "Jax using Platform: cpu\n", "Line Search Alphas: [10. 1. 0.1 0.01]\n", - "Start warm up iLQR...\n", - "iLQR warm up finished.\n" + "Start warm up ILQR...\n", + "ILQR warm up finished.\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ - "iLQR takes : 0.01 sec']: 100%|██████████| 100/100 [00:21<00:00, 4.67it/s]\n", + "ILQR takes : 0.01 sec']: 100%|██████████| 100/100 [00:21<00:00, 4.67it/s]\n", "/tmp/ipykernel_19783/167050917.py:74: DeprecationWarning: Starting with ImageIO v3 the behavior of this function will switch to that of iio.v3.imread. To keep the current behavior (and make this warning disappear) use `import imageio.v2 as imageio` or call `imageio.v2.imread` directly.\n", " image = imageio.imread(filename)\n" ] @@ -279,7 +279,7 @@ "path = RefPath(centerline, 0.6, 0.6, 2, loop=False)\n", "\n", "config_file = '/hdd/Git_Repo/PrincetonRaceCar/ROS_Core/src/Planning/ilqr_planning_ros/configs/ilqr.yaml'\n", - "solver = iLQR() #(config_file)\n", + "solver = ILQR() #(config_file)\n", "solver.update_ref_path(path)\n", "\n", "obs1 = np.array([[-1, -1, -0.5, -0.5], [5.3, 5.8, 5.8, 5.3]]).T\n", @@ -310,7 +310,7 @@ " x_cur = trajectory[:,1] \n", " state_history[:,i] = x_cur\n", " init_control[:,:-1] = controls[:,1:]\n", - " pbar.set_description(f\"iLQR takes : {plan['t_process']:.2f} sec']\")\n", + " pbar.set_description(f\"ILQR takes : {plan['t_process']:.2f} sec']\")\n", " t_process[i] = plan['t_process']\n", " # plot\n", " plt.clf()\n",