-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfour_box.py
More file actions
156 lines (135 loc) · 4.51 KB
/
four_box.py
File metadata and controls
156 lines (135 loc) · 4.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import numpy as np
from scipy.integrate import solve_ivp
from scipy.optimize import minimize
from scipy.linalg import expm
from step import step
class FourBox:
def __init__(self, C, κ, ϵ, F):
self.C = C
self.κ = κ
self.ϵ = ϵ
self.F = F
self.A = A_matrix(C, κ, ϵ)
self.B = B_matrix(C)
def __repr__(self):
return f'FourBox(C={self.C}, κ={self.κ}, ϵ={self.ϵ}, F={self.F})'
def timescales(self):
eigvals = np.linalg.eigvals(self.A)
tau = -1 / eigvals
return tau
def step(self, n, method='analytical'):
if method == 'analytical':
return step_analytical(self.A, self.B, self.F, n)
elif method == 'numerical':
return step_numerical(self.A, self.B, self.F, n)
else:
raise ValueError("Method must be 'analytical' or 'numerical'.")
def observe(self, x):
κ = self.κ
ϵ = self.ϵ
F = self.F
tas = x[:, 0]
rtmt = F - κ[0]*tas + (1 - ϵ)*κ[3]*(x[:, 2] - x[:, 3])
y = np.column_stack((tas, rtmt))
return y
def forward(self, n, method='analytical'):
x = self.step(n, method)
y = self.observe(x)
return y
def A_matrix(C, κ, ϵ):
A = np.zeros((4, 4))
A[0, :] = np.array([-(κ[0] + κ[1]), κ[1], 0, 0]) / C[0]
A[1, :] = np.array([κ[1], -(κ[1] + κ[2]), κ[2], 0]) / C[1]
A[2, :] = np.array([0, κ[2], -(κ[2] + ϵ*κ[3]), ϵ*κ[3]]) / C[2]
A[3, :] = np.array([0, 0, κ[3], -κ[3]]) / C[3]
return A
def B_matrix(C):
B = np.zeros((4, 1))
B[0] = 1/C[0]
return B
def step_numerical(A, B, F, n):
def ode_system(t, y):
gradient = A @ y.reshape(-1, 1) + B * F
return gradient.flatten()
y0 = np.zeros(4)
t_span = (0, n)
t_eval = np.arange(1, n + 1)
sol = solve_ivp(ode_system, t_span, y0, 'RK45', t_eval)
return sol.y.T
def step_analytical(A, B, F, n):
# Eigendecomposition: A = V D V_inv
eigvals, V = np.linalg.eig(A)
V_inv = np.linalg.inv(V)
# Precompute constant term
BF_transformed = V_inv @ (B * F) # (4, 1)
# Time steps (n,)
t = np.arange(1, n + 1)
# Compute exp(D * t) for all t: shape (n, 4)
exp_diag = np.exp(np.outer(t, eigvals)) # (n, 4)
# Compute (exp - 1) / lambda
diag_terms = (exp_diag - 1) / eigvals # (n, 4)
# Scale V columns by diag_terms
scaled_V = V[None, :, :] * diag_terms[:, None, :] # (n, 4, 4)
# Multiply by BF in transformed space
x = (scaled_V @ BF_transformed).squeeze(-1) # (n, 4)
return x
def step_loop(A, B, F, n):
eA = expm(A)
AinvBF = np.linalg.solve(A, B * F).flatten()
x = np.empty((n, 4))
eAk = eA.copy()
I = np.eye(4)
x[0] = (eAk - I) @ AinvBF
for k in range(1, n):
np.matmul(eAk, eA, out=eAk)
x[k] = (eAk - I) @ AinvBF
return x
def step_loop2(A, B, F, n):
eA = expm(A)
x = np.linalg.solve(A, (eA - np.eye(4)) @ B * F).T.repeat(n, axis=0)
for k in range(1, n):
x[k] += eA @ x[k-1]
return x
def step_fortran(A, B, F, n):
eA = expm(A)
AinvBF = np.linalg.solve(A, B * F).flatten()
x = step(eA, AinvBF, n).T
return x
def pack(C, κ, ϵ, F):
theta = np.concatenate((C, κ, [ϵ], [F]))
log_theta = np.log(theta)
return log_theta
def unpack(log_theta):
theta = np.exp(log_theta)
C = theta[:4]
κ = theta[4:8]
ϵ = theta[8]
F = theta[9]
return C, κ, ϵ, F
def penalty(tau, target=np.array([1., 10., 100., 1000.])):
log_ratio = np.log(tau) - np.log(target)
return np.sum(log_ratio**2)
def mse_loss(log_theta, y, alpha=0):
C, κ, ϵ, F = unpack(log_theta)
model = FourBox(C, κ, ϵ, F)
y_pred = model.forward(y.shape[0], method='analytical')
mse = np.mean((y - y_pred)**2) # data loss
tau = model.timescales()
penalty_value = penalty(tau) # penalty on timescales
return mse + alpha*penalty_value
def fit_model(y, alpha=0, C_init=None, κ_init=None, ϵ_init=None, F_init=None):
if C_init is None:
C_init = [5., 20., 80., 150.]
if κ_init is None:
κ_init = [1., 1.5, 0.75, 0.5]
if ϵ_init is None:
ϵ_init = 1.
if F_init is None:
F_init = 5.9
log_theta_init = pack(C_init, κ_init, ϵ_init, F_init)
res = minimize(mse_loss, log_theta_init, args=(y, alpha), method='L-BFGS-B')
if not res.success:
raise RuntimeError("Optimization failed: " + res.message)
C, κ, ϵ, F = unpack(res.x)
model = FourBox(C, κ, ϵ, F)
return model, res