diff --git a/Python/.gitignore b/Python/.gitignore new file mode 100644 index 0000000..4765058 --- /dev/null +++ b/Python/.gitignore @@ -0,0 +1,136 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +.vscode + +*.mp4 + +# chainer RL outputs +results/ diff --git a/Python/README.md b/Python/README.md new file mode 100644 index 0000000..ad043a7 --- /dev/null +++ b/Python/README.md @@ -0,0 +1,21 @@ +# MPCC +This is a Python implementation of the MPCC controller. As of 27 January 2021, the implentation is under review. Please note that the controller does not work as a correct manner as Matlab/C++ implementation. + +## Instllation +This implementation depends on the following packages. Please see `requirements.txt` for more details. + +- matplotlib +- numpy +- pandas +- scipy +- qpsolvers +- osqp +- numba +- sympy + +## How to run +With `--render` option, you can watch the animation + +``` +python nmpc_sim.py --render +``` diff --git a/Python/controller/mpc_controller.py b/Python/controller/mpc_controller.py new file mode 100644 index 0000000..1a5bda7 --- /dev/null +++ b/Python/controller/mpc_controller.py @@ -0,0 +1,244 @@ +import numpy as np +import scipy as sp +from scipy.linalg import block_diag +from qpsolvers import solve_qp + +import sympy as sy +from sympy.physics import mechanics +from scipy.signal import cont2discrete + +class SimModel(object): + def __init__(self, param=None, NX=None, NU=None): + assert param is not None + assert NX is not None + assert NU is not None + + self.param = param + self.NX = NX + self.NU = NU + + self.jacA, self.jacB = self.genLinModel() + self.force = self.genDynamicEquation() + + def SYMPY_rh_eq(self): + raise NotImplementedError('SYMPY_rh_eq is not implemented') + + def genJacobian(self): + MAT = self.SYMPY_rh_eq() + q = sy.symbols('q:{0}'.format(self.NX)) + u = sy.symbols('u:{0}'.format(self.NU)) + return MAT.jacobian(q), MAT.jacobian(u) + + def genDynamicEquation(self): + q = sy.symbols("q:{0}".format(self.NX)) + u = sy.symbols("u:{0}".format(self.NU)) + return sy.lambdify([q,u], self.SYMPY_rh_eq(), [{'atan':np.arctan, 'atan2':np.arctan2}, "numpy"]) + + def genLinModel(self): + q = sy.symbols("q:{0}".format(self.NX)) + u = sy.symbols("u:{0}".format(self.NU)) + A, B = self.genJacobian() + return (sy.lambdify([q,u], np.squeeze(A), [{'atan':np.arctan, 'atan2':np.arctan2}, "numpy"]), + sy.lambdify([q,u], np.squeeze(B), [{'atan':np.arctan, 'atan2':np.arctan2}, "numpy"])) + + def genDModel(self, x, dq, u, dT=0.1): + vector = np.hstack((x, dq)) + f = self.force(vector, u).T.flatten() + + A_c = np.array(self.jacA(vector, u)) + B_c = np.array(self.jacB(vector, u)) + g_c = f - A_c@vector - B_c@u + + B = np.hstack((B_c, g_c.reshape((-1,1)))) + + A_d, B_d, _, _, _ = cont2discrete((A_c, B, 0, 0), dT) + g_d = B_d[:,self.NU] + B_d = B_d[:,0:self.NU] + + return A_d, B_d, g_d + + def PredictForwardEuler(self, x, dq, u, dt): + vector = np.hstack((x, dq)) + d_vector = self.force(vector, u).T.flatten() + vector = vector + dt * d_vector + return vector[0:3], vector[3:6] + +class NMPC(): + def __init__(self, dT=0.02, time_horizon = 20, + H = None, J = None, q = None, + RH = None, RJ = None, r = None, + H_N = None, J_N = None, q_N = None, + dmodel = None, + G = None, h = None, + normalization_x = None, + normalization_u = None, + x_ubounds=[], x_lbounds=[], + u_ubounds=[], u_lbounds=[]): + + assert H != None + assert J != None + assert H_N != None + assert J_N != None + assert dmodel != None + + self.dT = dT + self.time_horizon = time_horizon + self.model = dmodel + + self.x_l = np.asarray(x_lbounds,dtype=np.float64) + self.x_u = np.asarray(x_ubounds,dtype=np.float64) + self.u_l = np.asarray(u_lbounds,dtype=np.float64) + self.u_u = np.asarray(u_ubounds,dtype=np.float64) + + self.NX = self.x_u.shape[0] + self.NU = self.u_u.shape[0] + + class StaticStageCost(): + def __init__(self, weight): + self.weight = np.asarray(weight) + def __call__(self, x_guess, u_guess, x_ref): + return self.weight + class StaticValueFunction(): + def __init__(self, weight): + self.weight = np.asarray(weight) + def __call__(self, x_guess, x_ref): + return self.weight + + self.H = H if callable(H) else StaticStageCost(H) + self.J = J if callable(J) else StaticStageCost(J) + self.H_N = H_N if callable(H_N) else StaticValueFunction(H_N) + self.J_N = J_N if callable(J_N) else StaticValueFunction(J_N) + self.R_H = RH if callable(RH) else StaticStageCost(RH) + self.R_J = RJ if callable(RJ) else StaticStageCost(RJ) + + if q is None: + q = np.zeros(self.NX) + self.q = q + if r is None: + r = np.zeros(self.NU) + self.r = r + + if G is None: + self.G = None + self.h = None + else: + assert h is not None + self.G = G if callable(H) else StaticStageCost(H) + self.h = h if callable(J) else StaticStageCost(J) + + if normalization_x is not None: + self.Norm = np.diag(normalization_x*(time_horizon+1) + normalization_u*time_horizon) + self.Norm_inv = np.linalg.inv(self.Norm) + else: + self.Norm = None + self.Norm_inv = None + + def iterate_NMPC(self, x_guess, u_guess, x_ref, verbose=False, warmstart=False): + T = self.time_horizon + X_DIM = self.NX*(T+1) + U_DIM = self.NU*(T) + + P_Q_blocks = [] + q_q_blocks = [] + P_R_blocks = [] + q_r_blocks = [] + + for k in range(T+1): + if k==T: + P_Q_blocks.append(self.H_N(x_guess[:,k], x_ref[:,k])) + q_q_blocks.append(self.J_N(x_guess[:,k], x_ref[:,k])+self.q) + else: + P_Q_blocks.append(self.H(x_guess[:,k], u_guess[:,k], x_ref[:,k])) + q_q_blocks.append(self.J(x_guess[:,k], u_guess[:,k], x_ref[:,k])+self.q) + + P_R_blocks.append(self.R_H(x_guess[:,k], u_guess[:,k], x_ref[:,k])) + q_r_blocks.append(self.R_J(x_guess[:,k], u_guess[:,k], x_ref[:,k])+self.r) + + P = block_diag(*P_Q_blocks,*P_R_blocks) + q = np.hstack(q_q_blocks+q_r_blocks) + P = 0.5*(P.T+P) + + Ad, Bd, gd = zip(*[self.model(q[:3], q[3:], u, self.dT) + for q, u in zip(x_guess.T, u_guess.T)]) + + A = block_diag(*Ad) + B = block_diag(*Bd) + b = np.hstack(( + x_guess[:,0], + - np.hstack(gd) + )) + A = np.block([ + [np.eye(self.NX), np.zeros((self.NX, X_DIM + U_DIM - self.NX))], + [A, np.zeros((X_DIM - self.NX, self.NX)), B] + ]) + A -= np.block([ + [np.zeros((self.NX, X_DIM+U_DIM))], + [ + np.zeros((X_DIM-self.NX, self.NX)), + np.eye(X_DIM - self.NX), + np.zeros_like(B) + ] + ]) + + ### Track Constratint + G = [ + self.G(x_g, x_r) + for x_g,x_r + in zip(x_guess.T, x_ref.T) + ] + h_block = [ + np.asarray(self.G(x_g, x_r))@x_g - np.asarray(self.h(x_g, x_r) ) + for x_g, x_r + in zip(x_guess.T, x_ref.T) + ] + G = np.hstack([ + block_diag(*G), + np.zeros((T+1, U_DIM)) + ]) + h = np.hstack(h_block) + + x_l = np.tile(self.x_l, T+1) + # Set trust region + x_l[6::self.NX] = -0.2 + x_guess[6] + u_l = np.tile(self.u_l, T) + x_l = np.hstack((x_l, u_l)) + + x_u = np.tile(self.x_u, T+1) + # Set trust region + x_u[6::self.NX] = 0.2 + x_guess[6] + u_u = np.tile(self.u_u, T) + x_u = np.hstack((x_u, u_u)) + + #print([x.shape for x in [P, q, G, h, A, b, x_l, x_u]]) + + try: + if self.Norm is None: + ret = solve_qp(P, q, G, h, A, b, x_l, x_u, solver='osqp') + else: + init_val = self.Norm_inv@np.hstack((x_guess.T.ravel(), u_guess.T.ravel())) + #print("Equation Const", np.all(A@init_val==b)) + #print("InEquation Const", np.all(G@init_val<=h)) + #print("Lower bound Const", np.all(init_val>=x_l)) + #print("Upper bound Const", np.all(init_val<=x_u)) + #print("") + + ret = solve_qp(self.Norm@P@self.Norm, + q@self.Norm, + G@self.Norm, h, + self.Norm_inv[:X_DIM,:X_DIM]@A@self.Norm, self.Norm_inv[:X_DIM,:X_DIM]@b, + self.Norm_inv@x_l, self.Norm_inv@x_u, + initvals=init_val, solver='osqp') + if ret[0] is not None: + ret = self.Norm@ret + except Exception as e: + print(e) + return np.zeros_like(x_guess), np.zeros_like(u_guess), None + + #if ret.dtype != np.object: + if ret is not None: + ret_x = ret[:X_DIM].reshape((-1, self.NX)).T + ret_u = ret[X_DIM:].reshape((-1, self.NU)).T + return ret_x, ret_u, 0. + else: + return x_guess, u_guess, None + diff --git a/Python/model/sim_model.py b/Python/model/sim_model.py new file mode 100644 index 0000000..e15624a --- /dev/null +++ b/Python/model/sim_model.py @@ -0,0 +1,150 @@ +import numpy as np +from numba import jit, int32, float32, double, cfunc +from numba.experimental import jitclass + +spec = [ + ('x', double[:]), ('dq', double[:]), ('u', double[:]), + ('m', double), ('Iz', double), + ('lf', double), ('lr', double), + ('Bf', double), ('Cf', double), ('Df', double), + ('Br', double), ('Cr', double), ('Dr', double), + ('Cr0', double), ('Cr2', double), + ('Cm1', double), ('Cm2', double), + ('iterNum', int32), ('sim_dt', double), ('control_dt', double), + ('car_shape', double[:,:]), +] + +@jitclass(spec) +class VehicleSimModel(object): + def __init__(self, m=0.041, Iz=27.8E-6, + lf=0.029, lr=0.033, + Bf=2.579, Cf=1.2, Df=0.192, + Br=3.3852, Cr=1.2691, Dr=0.1737, + Cm1=0.287, Cm2=0.0545, + Cr0= 0.0518, + Cr2=0.00035, scale=1.0, control_dt = 10.0, sim_dt=1.0): + + self.x = np.asfortranarray(np.zeros(3, dtype=np.float64)) + self.dq = np.zeros(3, dtype=np.float64) + self.u = np.zeros(2, dtype=np.float64) + + self.m = m + self.Iz= Iz + + self.lf = lf + self.lr = lr + + self.Bf = Bf + self.Cf = Cf + self.Df = Df + self.Br = Br + self.Cr = Cr + self.Dr = Dr + + self.Cr0 = Cr0 + self.Cr2 = Cr2 + + self.Cm1 = Cm1 + self.Cm2 = Cm2 + + car_l = (lf + lr)/2 * scale + car_w = car_l/2 + self.car_shape = np.asfortranarray(np.array([ [ car_l, car_w, 1.], + [ car_l,-car_w, 1.], + [-car_l,-car_w, 1.], + [-car_l, car_w, 1.], + [ car_l, car_w, 1.]], dtype=np.float64)) + self.sim_dt = sim_dt + self.control_dt = control_dt + self.iterNum = int(self.control_dt/self.sim_dt) + + @property + def shape(self): + shape = np.dot(self.car_shape, + np.asfortranarray(np.array([ + [ np.cos(self.x[2]), np.sin(self.x[2]), 0.], + [-np.sin(self.x[2]), np.cos(self.x[2]), 0.], + [ self.x[0] , self.x[1] , 1.]], dtype=np.float64))) + return shape[:,:2] + + def ODE_rh_eq(self, x, dq, u): + return self.ODE_rh_eq_x(x, dq, u), self.ODE_rh_eq_dq(x, dq, u) + + def ODE_rh_eq_dq(self, x, dq, u): + ddq = np.zeros_like(dq) + + alpha_f = np.arctan2(dq[1] + self.lf * dq[2], dq[0]) - u[1] + alpha_r = np.arctan2(dq[1] - self.lr * dq[2], dq[0]) + + Ffy = - self.Df * np.sin(self.Cf*np.arctan(self.Bf*alpha_f)) + Fry = - self.Dr * np.sin(self.Cr*np.arctan(self.Br*alpha_r)) + Frx = self.Cm1*u[0]-self.Cm2*u[0]*dq[0]-self.Cr0-self.Cr2*dq[0]**2 + + ddq[0] = dq[1] * dq[2] + 1.0/self.m * (Frx - Ffy*np.sin(u[1]) ) + ddq[1] = -dq[0] * dq[2] + 1.0/self.m * (Fry + Ffy*np.cos(u[1]) ) + ddq[2] = 1.0/self.Iz * (Ffy * self.lf * np.cos(u[1]) - Fry * self.lr) + + return ddq + + def ODE_rh_eq_x(self, x, dq, u): + dx = np.zeros_like(x) + dx[0:2] = np.dot(np.array([[np.cos(x[2]), -np.sin(x[2])], + [np.sin(x[2]), np.cos(x[2])]]), + dq[0:2]) + dx[2] = dq[2] + return dx + + def RK4(self, u, dt): + k1x, k1dq = self.ODE_rh_eq(self.x, self.dq, u) + k2x, k2dq = self.ODE_rh_eq(self.x+k1x*dt/2, self.dq+k1dq*dt/2, u) + k3x, k3dq = self.ODE_rh_eq(self.x+k2x*dt/2, self.dq+k2dq*dt/2, u) + k4x, k4dq = self.ODE_rh_eq(self.x+k3x*dt/2, self.dq+k3dq*dt/2, u) + + self.x = self.x + dt * (k1x/6 + k2x/3 + k3x/3 + k4x/6) + self.dq = self.dq + dt * (k1dq/6 + k2dq/3 + k3dq/3 + k4dq/6) + + def LeapFrog(self, u, dt): + dx = self.ODE_rh_eq_x(self.x, self.dq, u) + ddq = self.ODE_rh_eq_dq(self.x, self.dq, u) + self.x = self.x + dx * dt + ddq*dt**2/2 + + ddq2= self.ODE_rh_eq_dq(self.x, self.dq, u) + self.dq = self.dq + (ddq + ddq2)*dt/2 + + def sim_step(self, u): + for i in range(self.iterNum): + self.RK4(u, self.sim_dt/1000) + #self.LeapFrog(u, self.sim_dt/1000) + +if __name__ == "__main__": + import matplotlib.pyplot as plt + import matplotlib.animation as animation + + v = VehicleSimModel(scale=0.01) + + v.dq[0] = 0.2 + v.u[1] = np.deg2rad(20) + + x = np.array([v.x[0]]) + y = np.array([v.x[1]]) + car = [] + vhit = [] + + for i in range(100): + v.sim_step(v.u) + x = np.append(x, v.x[0]) + y = np.append(y, v.x[1]) + + carX, carY = v.shape.T + car.append((carX, carY)) + vhit.append(v.dq[0]) + + plt.figure(0) + plt.plot(x, y, "ob-", label="trajectory") + for carX, carY in car: + plt.plot(carX, carY, 'k-') + plt.axis("equal") + plt.figure(1) + plt.plot(range(len(vhit)), vhit) + plt.grid(True) + plt.show() diff --git a/Python/nmpc_sim.py b/Python/nmpc_sim.py new file mode 100644 index 0000000..72e918e --- /dev/null +++ b/Python/nmpc_sim.py @@ -0,0 +1,334 @@ +import json, os, argparse + +import numpy as np +import scipy as sp +import pandas as pd + +from util.cubic_spline_path import CubicSplinePath +from controller.mpc_controller import SimModel, NMPC +from model.sim_model import VehicleSimModel + +base_path = "../C++/Params/" +def load_param(file_name): + with open(os.path.join(base_path, file_name + ".json")) as json_file: + return json.load(json_file) + +class ExBicycleModel(SimModel): + def SYMPY_rh_eq(self): + import sympy as sy + # Generalized coordinates and velocities + ## Define q as [X,Y, phi, vx, vy, r, d, sigma, v_theta] + q = sy.symbols('q:{0}'.format(self.NX)) + dq = [q[3], q[4], q[5]] + u = [q[7], q[8], q[9]] + ## Define du as [dd, dsigma, dv_theta] + du = sy.symbols('u:{0}'.format(self.NU)) + + alpha_f = sy.atan2(dq[1] + self.param['lf'] * dq[2], dq[0]) - u[1] + alpha_r = sy.atan2(dq[1] - self.param['lr'] * dq[2], dq[0]) + + Ffy = - self.param['Df'] * sy.sin(self.param['Cf']*sy.atan(self.param['Bf']*alpha_f)) + Fry = - self.param['Dr'] * sy.sin(self.param['Cr']*sy.atan(self.param['Br']*alpha_r)) + Frx = self.param['Cm1'] * u[0] - self.param['Cm2']* u[0]*dq[0] - self.param['Cr0'] \ + - self.param['Cr2'] * (dq[0] ** 2) + + F_MAT = sy.Matrix([ + sy.cos(q[2])*dq[0] - sy.sin(q[2])*dq[1], + sy.sin(q[2])*dq[0] + sy.cos(q[2])*dq[1], + dq[2], + dq[1] * dq[2] + 1.0/self.param['m'] * (Frx - Ffy*sy.sin(u[1]) ), + -dq[0] * dq[2] + 1.0/self.param['m'] * (Fry + Ffy*sy.cos(u[1]) ), + 1.0/self.param['Iz'] * (Ffy * self.param['lf'] * sy.cos(u[1]) - Fry * self.param['lr']), + u[2], + du[0], du[1], du[2] + ]) + return F_MAT + + def PredictForwardEuler(self, x, u, dt): + d_vector = self.force(x, u).T.flatten() + return x + dt * d_vector + +def main(): + import matplotlib.pyplot as plt + import matplotlib.animation as animation + + parser = argparse.ArgumentParser() + parser.add_argument( + "--render", + action="store_true", + help="Render env states in a GUI window." + ) + args = parser.parse_args() + + track = pd.read_json(os.path.join(base_path,"track.json")) + track['S']=np.sqrt(track[['X', 'Y']].diff().pow(2).sum(axis=1)).cumsum() + path_ref = CubicSplinePath(track['X'], track['Y'], close=True) + + dT = 0.02 + T = 30 + simulation_time = 17 + + bounds = load_param('bounds') + cost = load_param('cost') + + model_params = load_param('model') + normalization = load_param('normalization') + + # Initialize model + ## Define the environment (vehicle model) + v = VehicleSimModel(scale=2, control_dt = dT*1000) + ## Define controller internal model + dmodel = ExBicycleModel(param=model_params, NX=10, NU=3) + + ### Theta state + s0 = np.array([0.0, 1.0], dtype=np.float64) + phi0 = np.arctan2(path_ref.dY(s0[0]), path_ref.dX(s0[0])) + + ### Vehicle state [X, Y, phi] + x0 = np.array([path_ref.X(s0[0]), path_ref.Y(s0[0]), phi0], + dtype=np.float64) + ### Vehicle state [vx, vy, r] + dq0 = np.array([s0[1], 0., 0.], + dtype=np.float64) + ### Vehicle input [d, sigma, v_theta] + u0 = np.array([0., 0., s0[1]], + dtype=np.float64) + + ## Set the inital value to the internal variable of the simulation environment + v.x = x0 + v.dq= dq0 + + # Define Const Function + def gen_cost_function(): + import sympy + from sympy import cos, sin, atan2 + + # Define x as [X,Y, phi, vx, vy, r, d, sigma, v_theta] + x = sympy.symbols('x:{0}'.format(10)) + a = sympy.Function('X')(x[6]) + b = sympy.Function('Y')(x[6]) + + xg = sympy.Matrix([_ for _ in x]) + xr = sympy.Matrix([a, b, 0, 0, 0, 0, 0, 0, 0, 0]) + + X, Y, dX, dY, ddX, ddY = sympy.symbols('X Y dX dY ddX ddY') + + # [e_c; e_l] = U@(x-x_ref) + U = sympy.Matrix([[ sin(atan2(b.diff(x[6]),a.diff(x[6]))),-cos(atan2(b.diff(x[6]),a.diff(x[6]))),0,0,0,0,0, 0,0,0], + [-cos(atan2(b.diff(x[6]),a.diff(x[6]))),-sin(atan2(b.diff(x[6]),a.diff(x[6]))),0,0,0,0,0, 0,0,0]]) + dE = sympy.derive_by_array(U*(xg-xr),x).reshape(10,2).transpose().tomatrix() + dE = dE.subs(a.diff(x[6],2),ddX).subs(b.diff(x[6],2),ddY) + dE = dE.subs(a.diff(x[6],1),dX).subs(b.diff(x[6],1),dY) + dE = dE.subs(a,X).subs(b,Y) + U = U.subs(a.diff(x[6],1),dX).subs(b.diff(x[6],1),dY) + + U = sympy.lambdify([dX, dY], U, ["numpy"]) + dE = sympy.lambdify([x, X, Y, dX, dY, ddX, ddY], dE, ["numpy"]) + + Q = np.diag([cost['qC'], cost['qL']]) + def H(x_guess, u_guess, x_ref): + track_X = path_ref.X(x_guess[6]) + track_Y = path_ref.Y(x_guess[6]) + track_dX = path_ref.dX(x_guess[6]) + track_dY = path_ref.dY(x_guess[6]) + track_ddX = path_ref.ddX(x_guess[6]) + track_ddY = path_ref.ddY(x_guess[6]) + dE_val = dE(x_guess, track_X, track_Y, track_dX, track_dY, track_ddX, track_ddY) + R = np.diag([0, 0, 0, 0, 0, 0, 0, cost['rD'], cost['rDelta'], cost['rVs']]) + return 2*dE_val.T@Q@dE_val + 2*R + + def J(x_guess, u_guess, x_ref): + track_X = path_ref.X(x_guess[6]) + track_Y = path_ref.Y(x_guess[6]) + track_dX = path_ref.dX(x_guess[6]) + track_dY = path_ref.dY(x_guess[6]) + track_ddX = path_ref.ddX(x_guess[6]) + track_ddY = path_ref.ddY(x_guess[6]) + U_val = U(track_dX, track_dY) + dE_val = dE(x_guess, track_X, track_Y, track_dX, track_dY, track_ddX, track_ddY) + R = np.diag([0, 0, 0, 0, 0, 0, 0, cost['rD'], cost['rDelta'], cost['rVs']]) + return 2*((x_guess - x_ref).T@U_val.T@Q@dE_val -x_guess.T@dE_val.T@Q@dE_val).T + 2*x_guess@R + + def H_N(x_guess, x_ref): + return 10*H(x_guess, None, x_ref) + + def J_N(x_guess, x_ref): + return 10*J(x_guess, None, x_ref) + + q = np.zeros(10) + q[-1] = -cost['qVs'] + + R = 2*np.diag([cost['rdD'], cost['rdDelta'], cost['rdVs']]) + r = np.zeros(3) + + def R_J(x_guess, u_guess, x_ref): + return 2*u_guess@R + + ## ieq constraint G+h<=0 + h = sympy.Matrix([(x[0] - a)**2 + (x[1] - b)**2 - model_params['R_in']**2]) + G = h.jacobian(x) + + G = G.subs(a.diff(x[6], 1), dX).subs(b.diff(x[6], 1), dY) + G = G.subs(a,X).subs(b,Y) + G = sympy.lambdify([x, X, Y, dX, dY], np.squeeze(G), ["numpy"]) + def G_func(x_guess, x_ref): + track_X = path_ref.X(x_guess[6]) + track_Y = path_ref.Y(x_guess[6]) + track_dX = path_ref.dX(x_guess[6]) + track_dY = path_ref.dY(x_guess[6]) + return G(x_guess, track_X, track_Y, track_dX, track_dY) + + h = h.subs(a,X).subs(b,Y) + h = sympy.lambdify([x, X, Y], np.squeeze(h), ["numpy"]) + def h_func(x_guess, x_ref): + track_X = path_ref.X(x_guess[6]) + track_Y = path_ref.Y(x_guess[6]) + return h(x_guess, track_X, track_Y) + + return H, J, q, H_N, J_N, q, R, R_J, r, G_func, h_func + + H, J, q, H_N, J_N, _, RH, RJ, r, G, h = gen_cost_function() + + # Define NMPC Controller + controller = NMPC(dT=dT, dmodel=dmodel.genDModel, time_horizon=T, + H=H, J=J, q=q, H_N=H_N, J_N=J_N, + RH= RH, RJ=RJ, r=r, + G=G, h=h, + #normalization_x = [normalization[e] for e in ["X", "Y", "phi", "vx", "vy", "r", "s", "D", "delta", "vs"]], + #normalization_u = [normalization[e] for e in ["dD", "dDelta", "dVs"]], + x_lbounds=[bounds['Xl'], bounds['Yl'],bounds['phil'], + bounds['vxl'],bounds['vyl'],bounds['rl'], bounds['sl'], + bounds['Dl'], bounds['deltal'], bounds['vsl']], + x_ubounds=[bounds['Xu'], bounds['Yu'],bounds['phiu'], + bounds['vxu'],bounds['vyu'],bounds['ru'], bounds['su'], + bounds['Du'], bounds['deltau'], bounds['vsu']], + u_lbounds=[bounds['dDl'], bounds['dDeltal'], bounds['dVsl']], + u_ubounds=[bounds['dDu'], bounds['dDeltau'], bounds['dVsu']]) + + # Init Initial Guess + u_bar = np.zeros((u0.shape[0], T), dtype=np.float64) + x_bar = np.zeros((x0.shape[0]+1+dq0.shape[0]+u0.shape[0], T+1), dtype=np.float64) + + x_bar[0:3,0] = v.x + x_bar[3:6,0] = v.dq + x_bar[6,0] = s0[0] + x_bar[7:10,0] = u0 + + for t in range(T): + x_bar[:,t+1] = dmodel.PredictForwardEuler(x_bar[:,t], u_bar[:,t], dT) + + # Logger + x = [] + y = [] + car = [] + vlog = [] + + render = args.render + + flag = False + for i in range(int(simulation_time/dT)): + e, _, _, s0[0] = path_ref.calc_track_error(v.x[0], v.x[1], s0[0]) + if e**0.5 > 0.15: + break + + # Init or update the initial guess of SQP + if flag: + u_bar[:,0:-2] = u_bar[:,1:-1] + x_bar[:,0:-2] = x_bar[:,1:-1] + + x_bar[0:3,0] = v.x + x_bar[3:6,0] = v.dq + x_bar[6, 0] = s0[0] + x_bar[7:10, 0] = v.u + + x_bar[:,-2] = x_bar[:,-3] + u_bar[:,-1] = np.zeros(3) + x_bar[:,-1] = dmodel.PredictForwardEuler(x_bar[:,-2], u_bar[:,-1], dT) + + x_bar[6] = np.unwrap(x_bar[6], path_ref.length/2) + x_bar[2] = np.unwrap(x_bar[2]) + + updateW = 0.8 + QP_it = 3 + else: + x_bar[0:3,0] = v.x + x_bar[3:6,0] = v.dq + x_bar[7:10,0] = u0 + u_bar = np.zeros((u0.shape[0], T), dtype=np.float64) + + x_bar[6] = np.unwrap(s0[0]+np.arange(0,T+1)*u0[2]*dT, path_ref.length/2) + x_bar[2,1:] = np.arctan2(path_ref.dY(x_bar[6, 1:]), path_ref.dX(x_bar[6, 1:])) + x_bar[2] = np.unwrap(x_bar[2]) + x_bar[0,1:] = path_ref.X(x_bar[6, 1:]) + x_bar[1,1:] = path_ref.Y(x_bar[6, 1:]) + x_bar[3,1:] = u0[2] + + updateW = 0.5 + QP_it = 5 + + for t in range(T): + x_bar[:,t+1] = dmodel.PredictForwardEuler(x_bar[:,t], u_bar[:,t], dT) + + # SQP Iteration + flag = False + for _ in range(QP_it): + # MPC iteration + x_ref = np.vstack([path_ref.X(x_bar[6]), path_ref.Y(x_bar[6]), + np.arctan2(path_ref.dY(x_bar[6]), path_ref.dX(x_bar[6])), + np.zeros((3, T+1)), + x_bar[6], + np.zeros((3, T+1))]) + + x_new, u_new, loss = controller.iterate_NMPC(x_bar, u_bar, x_ref) + + if loss is not None: + x_bar = updateW*x_bar + (1-updateW)*x_new + u_bar = updateW*u_bar + (1-updateW)*u_new + flag = True + + # Get vehicle command 'u' + v.u = x_bar[7:,1] + + x = np.append(x, v.x[0]) + y = np.append(y, v.x[1]) + + carX, carY = v.shape.T + car.append((carX, carY)) + vlog.append(np.hstack((v.x,v.dq))) + + if render: + plt.cla() + plt.plot(track['X'], track['Y'], "r--" ) + plt.plot(track['X_i'],track['Y_i'], "k-") + plt.plot(track['X_o'],track['Y_o'], "k-") + + plt.plot(carX, carY, "b-") + plt.plot(v.x[0], v.x[1], "x") + plt.plot(x_ref[0], x_ref[1], "g--") + plt.plot(x_bar[0], x_bar[1], "b-") + plt.title("speed[m/s]:{:.2f}, deg:{:.2f}".format(v.dq[0], v.x[2])) + plt.pause(0.001) + + # Step sim forward + v.sim_step(v.u) + + # Plot result after the simulation + plt.figure(0) + plt.cla() + plt.plot(track['X'],track['Y'], "r--") + plt.plot(track['X_i'],track['Y_i'], "k-") + plt.plot(track['X_o'],track['Y_o'], "k-") + + plt.plot(x, y, "b-", label="trajectory") + #for carX, carY in car: + # plt.plot(carX, carY, 'k-') + plt.axis("equal") + plt.figure(1) + plt.cla() + plt.plot(range(len(vlog)), np.asarray(vlog)[:,3]) + plt.ylim(-0.5, 3.0) + plt.grid(True) + plt.show() + +if __name__ == "__main__": + main() diff --git a/Python/requirements.txt b/Python/requirements.txt new file mode 100644 index 0000000..0e109fe --- /dev/null +++ b/Python/requirements.txt @@ -0,0 +1,8 @@ +matplotlib +numpy +pandas +scipy +qpsolvers +osqp +numba +sympy diff --git a/Python/util/cubic_spline_path.py b/Python/util/cubic_spline_path.py new file mode 100644 index 0000000..3327826 --- /dev/null +++ b/Python/util/cubic_spline_path.py @@ -0,0 +1,55 @@ +import numpy as np +from scipy import interpolate +from scipy import optimize + +class CubicSplinePath: + def __init__(self, x, y, close=False): + x, y = map(np.asarray, (x, y)) + if close: + x = np.append(x, x[0]) + y = np.append(y, y[0]) + s = np.append([0],np.cumsum((np.diff(x)**2 + np.diff(y)**2)**0.5)) + + self.X = interpolate.CubicSpline(s, x, bc_type='periodic' if close else 'not-a-knot') + self.Y = interpolate.CubicSpline(s, y, bc_type='periodic' if close else 'not-a-knot') + + self.dX = self.X.derivative(1) + self.ddX = self.X.derivative(2) + + self.dY = self.Y.derivative(1) + self.ddY = self.Y.derivative(2) + + self.length = s[-2] if close else s[-1] + + def calc_yaw(self, s): + dx, dy = self.dX(s), self.dY(s) + return np.arctan2(dy, dx) + + def calc_curvature(self, s): + dx, dy = self.dX(s), self.dY(s) + ddx, ddy = self.ddX(s), self.ddY(s) + return (ddy * dx - ddx * dy) / ((dx ** 2 + dy ** 2)**(3 / 2)) + + def __find_nearest_point(self, s0, x, y): + def calc_distance(_s, *args): + _x, _y= self.X(_s), self.Y(_s) + return (_x - args[0])**2 + (_y - args[1])**2 + + def calc_distance_jacobian(_s, *args): + _x, _y = self.X(_s), self.Y(_s) + _dx, _dy = self.dX(_s), self.dY(_s) + return 2*_dx*(_x - args[0])+2*_dy*(_y-args[1]) + + minimum = optimize.fmin_cg(calc_distance, s0, calc_distance_jacobian, args=(x, y), full_output=True, disp=False) + return minimum + + def calc_track_error(self, x, y, s0): + ret = self.__find_nearest_point(s0, x, y) + + s = ret[0][0] + e = ret[1] + + k = self.calc_curvature(s) + yaw = self.calc_yaw(s) + + return e, k, yaw, s