-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaccProcess.py
More file actions
367 lines (307 loc) · 12.6 KB
/
accProcess.py
File metadata and controls
367 lines (307 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
import shutil
import tarfile
import os
import numpy as np
import matplotlib.pyplot as plt
import scipy
from scipy import signal
from tqdm import tqdm
import matlab.engine
matlabeng = matlab.engine.start_matlab() #启动matlab
def getIntDifMat(n: int, dt: float) -> tuple[np.ndarray, np.ndarray]:
"""获取积分和求导矩阵
Args:
n (int): 矩阵阶数
dt (float): 时间步长
Returns:
tuple[np.array, np.array]: 积分矩阵,求导矩阵
"""
phi1 = np.concatenate([np.array([0, 0.5, 0]), np.zeros([n - 3, ])])
temp1 = np.concatenate([-1 / 2 * np.identity(n - 2), np.zeros([n - 2, 2])], axis=1)
temp2 = np.concatenate([np.zeros([n - 2, 2]), 1 / 2 * np.identity(n - 2)], axis=1)
phi2 = temp1 + temp2
phi3 = np.concatenate([np.zeros([n - 3, ]), np.array([0, -0.5, 0])])
Phi_dif = np.concatenate([np.reshape(phi1, [1, phi1.shape[0]]), phi2, np.reshape(phi3, [1, phi3.shape[0]])], axis=0)
Phi_int = np.linalg.inv(Phi_dif) * dt
Phi_dif = Phi_dif / dt
return Phi_int, Phi_dif
def getIntDifMat1(n: int, dt: float) -> tuple[np.array, np.array]:
"""获取积分和求导矩阵
Args:
n (int): 矩阵阶数
dt (float): 时间步长
Returns:
tuple[np.array, np.array]: 积分矩阵,求导矩阵
"""
Phi_int = np.triu(np.zeros((n, n)) + 1).transpose() - np.identity(n) / 2
Phi_dif = np.linalg.inv(Phi_int) / dt
Phi_int *= dt
return Phi_int, Phi_dif
def getvel_dsp(acc: np.ndarray, dt: float) -> tuple[np.ndarray, np.ndarray]:
"""根据加速度时程获取速度和位移时程
Args:
acc (np.ndarray): 加速度时程矩阵,每一行表示一个时程
dt (float): 时间步长
Returns:
tuple[np.ndarray, np.ndarray]: 速度时程矩阵,位移时程矩阵
"""
n = acc.shape[-1]
if n < 10000:
Phi_int = np.triu(np.zeros((n, n)) + 1).transpose() - np.identity(n) / 2
# Phi_dif = np.linalg.inv(Phi_int) / dt
Phi_int *= dt
vel = Phi_int.dot(acc.transpose()).transpose()
if len(vel.shape) == 2:
vel = vel - np.mean(vel, axis=1)[:, None]
else:
vel = vel - np.mean(vel)
dsp = Phi_int.dot(vel.transpose()).transpose()
if len(dsp.shape) == 2:
dsp = dsp - np.mean(dsp, axis=1)[:, None]
else:
dsp = dsp - np.mean(dsp)
else:
vel = np.zeros_like(acc)
vel[..., 0] = 0.5 * acc[..., 0] * dt
for i in range(1, n):
vel[..., i] = vel[..., i - 1] + 0.5 * (acc[..., i - 1] + acc[..., i]) * dt
vel = vel - np.mean(vel)
dsp = np.zeros_like(vel)
dsp[..., 0] = 0.5 * vel[..., 0] * dt
for i in range(1, n):
dsp[..., i] = dsp[..., i - 1] + 0.5 * (vel[..., i - 1] + vel[..., i]) * dt
dsp = dsp - np.mean(dsp)
return vel, dsp
def baselineCorrection(acc: np.ndarray, dt: float, M: int) -> np.ndarray:
"""基线调整函数
Args:
acc (np.ndarray): 加速度时程
dt (float): 时间步长
M (int): 阶数,越高代表基线调整去除的高次项越多
Returns:
np.ndarray: 基线调整后的加速度时程
"""
t = np.linspace(dt, dt * len(acc), len(acc))
acc1 = acc
vel, dsp = getvel_dsp(acc, dt)
Gv = np.zeros(shape=(acc.shape[0], M + 1))
for i in range(M + 1):
Gv[:, i] = t ** (M + 1 - i)
polyv = np.dot(np.dot(np.linalg.inv(Gv.transpose().dot(Gv)), Gv.transpose()), vel)
for i in range(M + 1):
acc1 -= (M + 1 - i) * polyv[i] * t ** (M - i)
acc_new = acc1
vel1, dsp1 = getvel_dsp(acc1, dt)
Gd = np.zeros(shape=(acc.shape[0], M + 1))
for i in range(M + 1):
Gd[:, i] = t ** (M + 2 - i)
polyd = np.dot(np.dot(np.linalg.inv(Gd.transpose().dot(Gd)), Gd.transpose()), dsp1)
for i in range(M + 1):
acc_new -= (M + 2 - i) * (M + 1 - i) * polyd[i] * t ** (M - i)
return acc_new
def solve_sdof_eqwave_piecewise_exact(omg: np.ndarray, zeta: float, ag: np.ndarray, dt: float) -> tuple[np.ndarray, np.ndarray]:
"""求解单自由度响应
Args:
omg (np.ndarray): 单自由度圆频率数组
zeta (float): 阻尼比
ag (np.ndarray): 作用加速度时程
dt (float): 时间步长
Returns:
tuple[np.ndarray, np.ndarray]: 位移响应时程,速度响应时程
"""
omg_d = omg * np.sqrt(1.0 - zeta * zeta)
m = len(omg)
n = len(ag)
u, v = np.zeros((m, n)), np.zeros((m, n))
B1 = np.exp(-zeta * omg * dt) * np.cos(omg_d * dt)
B2 = np.exp(-zeta * omg * dt) * np.sin(omg_d * dt)
omg2 = 1.0 / omg / omg
omg3 = 1.0 / omg / omg / omg
for i in range(n - 1):
alpha = (-ag[i + 1] + ag[i]) / dt
A0 = -ag[i] * omg2 - 2.0 * zeta * alpha * omg3
A1 = alpha * omg2
A2 = u[:, i] - A0
A3 = (v[:, i] + zeta * omg * A2 - A1) / omg_d
u[:, i + 1] = A0 + A1 * dt + A2 * B1 + A3 * B2
v[:, i + 1] = A1 + (omg_d * A3 - zeta * omg * A2) * B1 - (omg_d * A2 + zeta * omg * A3) * B2
return u, v
def getResponseSpectrum(acc, dt, Period=np.logspace(-1.5, 0.5, 300), damp=0.05):
sa = matlabeng.getResponseSpectrum(matlab.double(acc.tolist()), dt, matlab.double(Period.tolist()), damp)
return np.array(sa).ravel()
def response_spectra_py(ag: np.ndarray, dt: float, T: np.ndarray=np.logspace(-1.5, 0.5, 300), zeta=0.05) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""_summary_
Args:
ag (np.ndarray): 加速度时程
dt (float): 时间步长
T (np.ndarray, optional): 反应谱周期数组. Defaults to np.logspace(-1.5, 0.5, 300).
zeta (float, optional): 阻尼比. Defaults to 0.05.
Returns:
tuple[np.ndarray, np.ndarray, np.ndarray]: 加速度谱,速度谱,位移谱
"""
N = len(T)
RSA = np.zeros(N)
RSV = np.zeros(N)
RSD = np.zeros(N)
omg = 2.0 * np.pi / T
u, v = solve_sdof_eqwave_piecewise_exact(omg, zeta, ag, dt)
a = -2.0 * zeta * omg[:, None] * v - omg[:, None] * omg[:, None] * u
RSA = np.max(np.abs(a), axis=1)
RSV = np.max(np.abs(v), axis=1)
RSD = np.max(np.abs(u), axis=1)
return RSA, RSV, RSD
def getFourierSpectrum(acc: np.ndarray, dt: float, freq: np.ndarray=None, smooth: str=None, order: int=3, coef: int=None) -> tuple[np.ndarray, np.ndarray]:
"""求解傅里叶谱
Args:
acc (np.ndarray): 加速度时程
dt (float): 时间步长
freq (np.ndarray, optional): 傅里叶谱频率数组. Defaults to None.
smooth (str, optional): 平滑方法. Defaults to None.
order (int, optional): 's-g'平滑方法的平滑阶数. Defaults to 3.
coef (int, optional): 平滑系数. Defaults to None.
Returns:
tuple[np.ndarray, np.ndarray]: 傅里叶谱频率数组,傅里叶谱值数组
"""
num = len(acc)
lent = dt * num
newdt = 0.01
newnum = np.floor(num * dt / newdt)
acc = np.interp((np.arange(newnum) + 1) * newdt, (np.arange(num) + 1) * dt, acc)
nfft = int(2 ** np.ceil(np.log2(newnum)))
h = np.fft.fft(acc, n=nfft)
f = np.fft.fftfreq(nfft, d=newdt)
h = np.abs(h[1 : int(len(h) / 2)]) / lent
f = f[1 : int(len(f) / 2)]
if smooth == 'kohmachi':
h = kohmachi(h, f, coef)
if smooth == 'movemean':
h = np.convolve(h, np.ones((coef,)) / coef, mode='same')
if smooth == 's-g':
h = signal.savgol_filter(h, coef, order, mode='nearest')
if freq is not None:
h = np.interp(freq, f, h)
f = freq
return f, h
def kohmachi(sign: np.ndarray, freq: np.ndarray, coef: int) -> np.ndarray:
"""信号频谱平滑函数
Args:
sign (np.ndarray): 信号频谱数组
freq (np.ndarray): 信号频谱的频率数组
coef (int): 平滑系数
Returns:
np.ndarray: 平滑后的频谱
"""
f_shift = freq / (1 + 1e-4)
f_shift = np.repeat(f_shift[:, None], len(f_shift), axis=1)
z = f_shift / freq
w = (np.sin(coef * np.log10(z)) / (coef * np.log10(z))) ** 4
y = sign.dot(w) / np.sum(w, axis=0)
return y
def EQprocess(acc: np.ndarray, dt: float) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""GVDA和WLA数据库处理地震动的算法
Args:
acc (np.ndarray): 地震加速度时程数组
dt (float): 时间步长
Returns:
tuple[np.ndarray, np.ndarray, np.ndarray]: 处理后的加速度、速度和位移时程
"""
dt = float(dt)
t = np.linspace(dt, dt * len(acc), len(acc))
loc_uncer = 0.2
# Remove mean
acc = acc - np.mean(acc)
# Copy acc
acc_copy1 = np.copy(acc)
acc_copy2 = np.copy(acc)
# Remove linear trend
coef1 = np.polyfit(t, acc_copy1, 1)
acc_fit = coef1[0] * t + coef1[1]
acc_copy1 = acc_copy1 - acc_fit
# Acausal bandpass filter
sos = signal.butter(4, [0.1, 20], 'bandpass', fs=int(1 / dt), output='sos')
acc_filter = signal.sosfilt(sos, acc_copy1)
# Find event onset
loc, _ = matlabeng.PphasePicker(matlab.double(acc_filter.tolist()), dt, 'sm', nargout=2)
# Initial baseline correction: remove pre-event mean
acc_copy2 = acc_copy2 - np.mean(acc_copy2[:int((loc - loc_uncer) / dt)])
# Integrate to velocity
vel, _ = getvel_dsp(acc_copy2, dt)
# Compute best fit trend in velocity
vel_fit1_coef = np.polyfit(t, vel, 1)
vel_fit2_coef = np.polyfit(t, vel, 2)
vel_fit1 = vel_fit1_coef[0] * t + vel_fit1_coef[1]
vel_fit2 = vel_fit2_coef[0] * t * t + vel_fit2_coef[1] * t + vel_fit2_coef[2]
RMSD1 = np.sqrt(np.mean((vel_fit1 - vel) ** 2))
RMSD2 = np.sqrt(np.mean((vel_fit2 - vel) ** 2))
# Remove derivative of best fit trend from accelerationf
if RMSD1 > RMSD2:
acc_copy2 = acc_copy2 - (2 * vel_fit2_coef[0] * t + vel_fit2_coef[1])
else:
acc_copy2 = acc_copy2 - vel_fit1_coef[0]
# Integrate acceleration to velocity
vel, _ = getvel_dsp(acc_copy2, dt)
# Quality check for velocity
flc = 0.1
fhc = np.min([40, 0.5 / dt - 5])
win_len = np.max([loc - loc_uncer, 1 / flc])
lead = np.abs(np.mean(vel[:int(win_len / dt)]))
trail = np.abs(np.mean(vel[-int(win_len / dt):]))
if lead > 0.01 or trail > 0.01:
print('Quality check for velocity not pass!')
# Tapering and padding
N_begin = int((loc - loc_uncer) / dt / 2)
N_end = int(3 / dt)
taper_begin = 0.5 * (1 - np.cos(np.pi * np.linspace(0, N_begin - 1, N_begin) / N_begin))
taper_end = 0.5 * (1 + np.cos(np.pi * np.linspace(0, N_end - 1, N_end) / N_end))
acc_copy2[:N_begin] = acc_copy2[:N_begin] * taper_begin
acc_copy2[-N_end:] = acc_copy2[-N_end:] * taper_end
num_pad = int(6 / flc / dt)
acc_copy2 = np.concatenate([np.zeros(int(num_pad / 2)), acc_copy2, np.zeros(int(num_pad / 2))])
# Acausal bandpass filter acceleration
sos = scipy.signal.butter(4, [flc, fhc], 'bandpass', fs=int(1 / dt), output='sos')
acc_copy2 = scipy.signal.sosfilt(sos, acc_copy2)
# Integrate acceleration to velocity and displacement
vel, dsp = getvel_dsp(acc_copy2, dt)
acc_copy2 = acc_copy2[int(num_pad / 2) + 2 : len(acc_copy2) - int(num_pad / 2) + 2]
vel = vel[int(num_pad / 2) + 2 : len(vel) - int(num_pad / 2) + 2]
dsp = dsp[int(num_pad / 2) + 2 : len(dsp) - int(num_pad / 2) + 2]
# Quality check for final velocity and displacement
win_len = np.max([loc - loc_uncer, 1 / flc])
vel_lead = np.abs(np.mean(vel[:int(win_len / dt)]))
vel_trail = np.abs(np.mean(vel[-int(win_len / dt):]))
dsp_trail = np.abs(np.mean(dsp[-int(win_len / dt):]))
if vel_lead > 0.01 or vel_trail > 0.01 or dsp_trail > 0.01:
print('Quality check for velocity and displacement not pass!')
return acc_copy2, vel, dsp
def getAccMsg(acc: np.ndarray, dt: float) -> list:
"""获取加速度时程的相关信息(PGA、PGV、PGD等)
Args:
acc (np.ndarray): 加速度时程
dt (float): 时间步长
Returns:
list: 加速度时程的关键参数
"""
accmsg = []
acc2 = np.insert(acc, 0, [0])
acc2 = acc2[0 : -1]
accAvg = (acc + acc2) / 2
vel = np.cumsum(accAvg) * dt
velAvg = vel + (acc / 3 + acc2 / 6) * dt
dsp = np.cumsum(velAvg) * dt
accmsg.append(max(abs(acc)))
accmsg.append(max(abs(vel)))
accmsg.append(max(abs(dsp)))
CAV = np.cumsum(abs(acc)) * dt
accmsg.append(CAV[-1])
Ia = np.cumsum(acc * acc) * dt * np.pi / (2 * 981)
accmsg.append(Ia[-1])
idx = np.where(Ia < 0.05 * Ia[-1])
idx5 = idx[0][-1]
idx = np.where(Ia > 0.75 * Ia[-1])
idx75 = idx[0][0]
idx = np.where(Ia > 0.95 * Ia[-1])
idx95 = idx[0][0]
accmsg.append(idx5 * dt)
accmsg.append((idx75 - idx5) * dt)
accmsg.append((idx95 - idx5) * dt)
return accmsg