-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
239 lines (205 loc) · 7.39 KB
/
utils.py
File metadata and controls
239 lines (205 loc) · 7.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
import pickle
import time
import numpy as np
import scipy.linalg as sci
from scipy import signal
from sklearn.neighbors import NearestNeighbors
def print_completion(i,simulation_length,tstart,div=4.0):
    """
    Print a progress message roughly `div` times over a simulation loop.

    Parameters
    ----------
    i : int
        Current (0-based) iteration index.
    simulation_length : int
        Total number of iterations.
    tstart : float
        Wall-clock start time (as returned by time.time()).
    div : float, optional
        Approximate number of progress messages over the whole run.
    """
    # Guard against a zero interval (ZeroDivisionError in the modulo)
    # when simulation_length < div.
    interval = max(1, int(simulation_length/div))
    if (i+1) % interval == 0:
        pct = int(100.0*((float(i+1)/float(simulation_length))))
        elapsed = np.round(time.time()-tstart,2)
        print(str(pct)+"% complete. Time elapsed: "+str(elapsed)+"s.")
# Integrating dynamics
def integrate(f, x, u, dt):
    """
    Advance state x by one step of size dt under dynamics xdot = f(x, u)
    using the classic fourth-order Runge-Kutta scheme.
    """
    k1 = dt*f(x, u)
    k2 = dt*f(x + 0.5*k1, u)
    k3 = dt*f(x + 0.5*k2, u)
    k4 = dt*f(x + k3, u)
    # Weighted average of the four slope estimates
    return x + (k1 + 2.0*k2 + 2.0*k3 + k4)/6.0
def simulate(f, x0, u_vec, dt, N):
    """
    Roll the dynamics xdot = f(x, u) forward N steps from x0 with RK4.

    u_vec may be a single control vector (broadcast to every step) or a
    (dim_u, N) array of per-step controls. Returns the (dim_x, N) state
    trajectory, excluding the initial condition.
    """
    controls = u_vec
    # A 1-D control is repeated across all N steps.
    if controls.ndim == 1:
        controls = np.repeat(controls.reshape(-1, 1), N, axis=1)
    traj = np.zeros((len(x0), N))
    state = np.copy(x0)
    for step in range(N):
        traj[:, step] = integrate(f, state, controls[:, step], dt)
        state = np.copy(traj[:, step])
    return traj
# Interpolate image
def interp_img(x, img_array, extent=((-1, 1), (-1, 1))):
    """
    Nearest-pixel lookup of img_array at continuous 2-D position x.

    Parameters
    ----------
    x : length-2 sequence
        Query position (x, y) in world coordinates.
    img_array : 2-D array
        Pixel values.
    extent : ((xmin, xmax), (ymin, ymax)), optional
        World-coordinate bounds of the image. The default was changed
        from a mutable list-of-lists to a tuple so the shared default
        object cannot be mutated across calls; indexing behavior is
        unchanged.

    Returns
    -------
    The pixel value at x, or 1 (background) for any point on or outside
    the extent boundary.
    """
    xlen = np.abs(extent[0][1] - extent[0][0])
    ylen = np.abs(extent[1][1] - extent[1][0])
    # Strictly inside the bounds -> map position to a pixel index.
    if extent[0][0] < x[0] < extent[0][1] and extent[1][0] < x[1] < extent[1][1]:
        xind = int((x[0] - extent[0][0]) * img_array.shape[0] / xlen)
        yind = int((x[1] - extent[1][0]) * img_array.shape[1] / ylen)
        val = img_array[xind, yind]
    else:
        val = 1
    return val
def closest_point(point, point_array):
    """
    Find the row of point_array nearest to point (Euclidean distance).

    Returns a (distance, index) pair.
    """
    dists = np.linalg.norm(point_array - point, axis=1)
    idx = np.argmin(dists)
    return dists[idx], idx
# Rotations
def wrap2Pi(x):
    """Wrap an angle (or array of angles) into the interval [-pi, pi)."""
    return np.mod(x + np.pi, 2.0*np.pi) - np.pi
def Rot(x):
    """2x2 counter-clockwise rotation matrix for angle x (radians)."""
    c, s = np.cos(x), np.sin(x)
    return np.array([[c, -s],
                     [s,  c]])
def RotVec(x_vec, rot_vec):
    """
    Rotate the (x, y) part of every row of x_vec by its matching angle in
    rot_vec, leaving the third column untouched.

    Each row's leading components are right-multiplied by Rot(angle)
    (i.e. v @ R, the transpose action of R @ v). Returns an array of the
    rotated (x, y) pairs hstacked with the original third column.
    """
    n = x_vec.shape[0]
    rotated = np.array([x_vec[k, :-1].dot(Rot(rot_vec[k])) for k in range(n)])
    return np.hstack([rotated, x_vec[:, 2].reshape(n, 1)])
# Calculating rattling
def MSD(xvec, steps):
    """
    Mean squared displacement of a (dim, T) trajectory at lag `steps`,
    averaged over all time origins and normalized by the state dimension.
    """
    lag_disps = [np.sum((xvec[:, k+steps] - xvec[:, k])**2.0)
                 for k in range(xvec.shape[1] - steps)]
    return np.mean(lag_disps)/float(xvec.shape[0])
def diffusion_vel(x, dt):
    """
    Diffusive velocity: displacement from the initial sample rescaled by
    the square root of elapsed time, row k mapping to
    (x[k] - x[0]) / sqrt((k+1)*dt).
    """
    elapsed = np.arange(1, x.shape[0] + 1) * dt
    scale = np.sqrt(elapsed)[:, None]
    return (x - x[0]) / scale
def rattling(x, dt, diffVel=True):
    """
    Rattling estimate R = 0.5*log(det C), where C is the (co)variance of
    the diffusion velocities (or of x itself when diffVel=False).

    Parameters
    ----------
    x : 1-D series or (T, d) array of samples.
    dt : float
        Sample spacing, forwarded to diffusion_vel.
    diffVel : bool, optional
        If True, rattling is computed on diffusion_vel(x, dt); otherwise
        directly on (a copy of) x.

    Returns
    -------
    float : 0.5*log of the variance (1-D input) or of the determinant of
    the covariance matrix (2-D input), floored at eps to keep the log
    finite for degenerate data.
    """
    # Floor for (near-)zero variance/determinant so np.log stays finite.
    # The original duplicated this threshold as a literal 1e-10 in both
    # branches; it is now expressed once through eps.
    eps = 1e-10
    vec = diffusion_vel(x, dt) if diffVel else np.copy(x)
    if len(x.shape) == 1:
        s = np.var(vec)
    else:
        s = np.linalg.det(np.cov(vec.T))
    return 0.5*np.log(max(s, eps))
def rattling_windows(mat, dt, window_sz, overlap, diffVel=True):
    """
    Rattling computed over sliding rectangular windows of mat.

    Windows are produced by window_inds(mat, window_sz, overlap); each
    window's rows are passed to rattling(...). Returns the list of
    per-window rattling values.

    NOTE(review): the slice mat[a:b] excludes row b while window_inds
    returns inclusive-looking (start, end) pairs, so each window spans
    window_sz - 1 samples — confirm this is intended.
    """
    return [rattling(mat[a:b], dt, diffVel)
            for a, b in window_inds(mat, window_sz, overlap)]
# Rectangular windowing
def window_inds(dataset, window_sz, overlap, offset=0):
    r"""
    Rectangular-window index pairs over the first axis of `dataset`.

    Parameters
    ----------
    dataset : array with shape[0] = number of samples.
    window_sz : int
        Samples per window; must be strictly less than the data length.
    overlap : float
        Fractional overlap between consecutive windows, \in [0, 1).
        (Docstring is now a raw string: the original non-raw "\i" was an
        invalid escape sequence, a warning on modern Python.)
    offset : int, optional
        Index at which the first window starts.

    Returns
    -------
    list of (start, end) tuples with end = start + window_sz - 1.
    """
    data_len = dataset.shape[0]
    assert window_sz < data_len
    ind1 = offset
    ind2 = offset+window_sz-1
    ind_list = []
    # Number of samples shared by consecutive windows.
    ov_ind_diff = int(np.ceil(np.abs(overlap*window_sz)))
    if ov_ind_diff == window_sz:
        # Full overlap would give a zero stride; back off by one sample.
        ov_ind_diff += -1
    while ind2 < data_len+offset:
        ind_list.append((ind1,ind2))
        ind1 += window_sz-ov_ind_diff
        ind2 += window_sz-ov_ind_diff
    return ind_list
# Filtering
def moving_average(x, N):
    """Length-N boxcar (moving-average) filter; 'valid' mode, so the
    output is len(x) - N + 1 samples long."""
    kernel = np.ones(N)/float(N)
    return np.convolve(x, kernel, mode='valid')
def butter_highpass(cutoff, fs, order=5):
    """
    Design a digital Butterworth high-pass filter.

    cutoff and fs are in the same units (e.g. Hz); the cutoff is
    normalized by the Nyquist frequency fs/2 as scipy requires.
    Returns the (b, a) transfer-function coefficients.
    """
    wn = cutoff / (0.5 * fs)
    return signal.butter(order, wn, btype='high', analog=False)
def butter_highpass_filter(data, cutoff, fs, order=5):
    """
    Zero-phase high-pass filter `data` with a Butterworth design of the
    given order and cutoff (filtfilt runs forward and backward, so no
    phase distortion).
    """
    coeffs = butter_highpass(cutoff, fs, order=order)
    return signal.filtfilt(coeffs[0], coeffs[1], data)
def butter_bandstop(cutoffs, fs, order=5):
    """
    Design a digital Butterworth band-stop filter.

    cutoffs is the (low, high) band-edge pair in the same units as fs;
    it must support elementwise division (e.g. a numpy array). Returns
    the (b, a) transfer-function coefficients.
    """
    nyquist = fs * 0.5
    return signal.butter(order, cutoffs / nyquist, btype='bandstop', analog=False)
def butter_bandstop_filter(data, cutoffs, fs, order=5):
    """
    Zero-phase band-stop filter `data` using a Butterworth design with
    the given (low, high) cutoffs (filtfilt: forward + backward pass).
    """
    coeffs = butter_bandstop(cutoffs, fs, order=order)
    return signal.filtfilt(coeffs[0], coeffs[1], data)
# Save/Load
def store_data(fname, rlist, clist, elist):
    """
    Pickle the three result lists to `fname` as a single dict with keys
    'rlist', 'clist', 'elist'.

    The file is opened with a context manager so the handle is closed
    even if pickling raises (the original leaked the handle on error).
    NOTE: mode 'ab' is kept from the original — repeated calls append
    additional pickles to the same file, and load_data reads only the
    first one.
    """
    db = {'rlist': rlist, 'clist': clist, 'elist': elist}
    with open(fname, 'ab') as dbfile:
        pickle.dump(db, dbfile)
def load_data(fname):
    """
    Load the dict pickled by store_data from `fname` (binary mode is
    required for pickle) and return its ('rlist', 'clist', 'elist')
    values as a tuple.

    Uses a context manager so the file handle is closed even if
    unpickling raises (the original leaked the handle on error).
    Raises KeyError if the pickle lacks any of the expected keys.
    """
    with open(fname, 'rb') as dbfile:
        db = pickle.load(dbfile)
    return db['rlist'], db['clist'], db['elist']
# Observable preprocessing
def preprocess(data):
    """
    Build a rotation-invariant observable from raw smarticle coordinates.

    Takes an (N, 9) array holding (x, y, theta) for three smarticles,
    columns ordered [x1, y1, th1, x2, y2, th2, x3, y3, th3], and returns
    an (N, 7) array
    [<mx_1>, <mx_2>, <mx_3>, <my_1>, <my_2>, <my_3>, p_theta],
    where each m-pair is a smarticle's offset to the collective center of
    mass expressed in that smarticle's own body frame, and p_theta is the
    magnitude of the mean heading phasor (1 = headings aligned, 0 = fully
    spread).

    NOTE(review): the original docstring described 5 coordinates per
    smarticle including arm angles, but only (x, y, theta) are consumed
    here (data[:, 0:9]) — confirm the upstream data layout.
    """
    # Take in (x,y,theta) of each smarticle
    S1_coords = data[:,0:3]
    S2_coords = data[:,3:6]
    S3_coords = data[:,6:9]
    #########################
    # Rotational invariance #
    #########################
    # Offset from each smarticle to the collective center of mass
    CoM = np.mean([S1_coords,S2_coords,S3_coords],axis=0)
    CoM_S1 = CoM-S1_coords
    CoM_S2 = CoM-S2_coords
    CoM_S3 = CoM-S3_coords
    # Wrap the heading differences back into [-pi, pi)
    CoM_S1[:,2] = wrap2Pi(CoM_S1[:,2])
    CoM_S2[:,2] = wrap2Pi(CoM_S2[:,2])
    CoM_S3[:,2] = wrap2Pi(CoM_S3[:,2])
    # Rotate each CoM offset by the smarticle's own heading so the (x, y)
    # components are expressed in its body frame. (The original comment
    # said "relative to the previous timestep", but RotVec is applied per
    # sample with the same-timestep heading — no time shift occurs here.)
    relCoM_S1 = RotVec(CoM_S1, S1_coords[:,2])
    relCoM_S2 = RotVec(CoM_S2, S2_coords[:,2])
    relCoM_S3 = RotVec(CoM_S3, S3_coords[:,2])
    # Stack body-frame x offsets then y offsets: shape (N, 6)
    resMat = np.vstack([relCoM_S1[:,0],relCoM_S2[:,0],relCoM_S3[:,0],
                        relCoM_S1[:,1],relCoM_S2[:,1],relCoM_S3[:,1]]).T
    # Heading order parameter: |mean over smarticles of e^{i*theta}|
    pTheta = np.abs(np.mean(np.exp(1j*np.vstack([S1_coords[:,2],S2_coords[:,2],S3_coords[:,2]]).T),axis=1)).reshape(data.shape[0],1)
    return np.hstack([resMat,pTheta])
# Estimating steady-state
def estimate_pss(qseed, qss=None, thresh=5, num_neighb=None, returnModel=False):
    """
    Estimate the steady-state probability density at each seed point with
    a k-nearest-neighbor local-Gaussian model.

    Parameters
    ----------
    qseed : 2-D array of seed samples; assumes more samples than
        dimensions, since d/L are taken as min/max of the shape —
        TODO confirm.
    qss : optional array of query points; defaults to a copy of qseed.
    thresh : float, squared-Mahalanobis-distance radius for the count.
    num_neighb : int, neighborhood size; defaults to 3*d so that N >> d
        and each local covariance is well-conditioned.
    returnModel : bool, if True also return the fitted NearestNeighbors
        model.

    Returns
    -------
    Array of density estimates (fraction of query points within the
    Mahalanobis radius, divided by the local Gaussian volume element),
    plus the knn model when returnModel is True.
    """
    # Make N >> d
    d = np.min(qseed.shape)
    L = np.max(qseed.shape)
    N = 3*d if num_neighb is None else num_neighb
    qss_vec = np.copy(qseed) if qss is None else qss
    # Instantiate NN model and collect each seed point's neighborhood
    knn = NearestNeighbors(n_neighbors=N)
    knn.fit(qseed)
    neighborhoods = knn.kneighbors(qseed, N, return_distance=False)
    # Local covariance of each neighborhood and its Gaussian volume element
    var_tensors = [np.cov(qseed[neighborhoods[i]].T) for i in range(L)]
    vol_list = np.array([np.sqrt(np.linalg.det(var_tensors[i])) for i in range(L)])
    # Squared Mahalanobis distance from each seed point to every query point
    var_prods = []
    for i in range(L):
        # Hoisted out of the inner loop: the original re-inverted the same
        # covariance once per query point (O(L) redundant inversions per i).
        inv_cov = np.linalg.inv(var_tensors[i])
        deltas = qss_vec - qseed[i]
        var_prods.append([dq.dot(inv_cov).dot(dq.T) for dq in deltas])
    # Probability: fraction of query points within the radius around each seed
    prob_list = np.array([len(np.where(np.array(v) < thresh)[0])/float(len(v)) for v in var_prods])
    # Output probability density = probability / neighborhood volume
    if returnModel:
        return prob_list/vol_list, knn
    else:
        return prob_list/vol_list