-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
247 lines (206 loc) · 7.26 KB
/
utils.py
File metadata and controls
247 lines (206 loc) · 7.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
from headers import *
##################################################################################
# Mathematical functions
def W3d_sth(x):
"""Fourier transform of a 3d spherical top hat window function.
Use x = k*R as input,
where R is the tophat radius and k the wave vector.
Input and output are dimensionless.
"""
if x < 1.e-3: # for small x, replace by expansion, for numerical stability
f = 1. - 0.1* x**2 + 0.00357143* x**4
else:
f = (3./(x**3)) * ( np.sin(x) - x * np.cos(x) )
return f
def dW3d_sth(x):
"""Derivative of the FT of the top hat.
Input and output are dimensionless.
"""
f = 3. * (3. * x * np.cos(x) - 3. * np.sin(x) + (x**2) * np.sin(x)) / (x**4)
return f
def W2d_cth(x):
"""FT of a 2d circular top hat window function.
Input and output are dimensionless.
"""
return 2.*special.jn(1, x) / x
def W1d_th(x):
"""FT of a 1d tophat
normalized to unity at k=0 (ie real space integral is 1)
Input and output are dimensionless.
"""
return sinc(x/2.)
def Si(x):
return special.sici(x)[0]
def Ci(x):
return special.sici(x)[1]
def sinc(x):
return special.sph_jn(0, x)[0][0]
def j0(x):
"""relevant for isotropic Fourier transform in 2d
"""
return special.jn(0, x)
def i0(x):
"""Modified Bessel function of the first kind
"""
return special.iv(0, x)
##################################################################################
# formatting numbers
def intExpForm(input):
"""
clean scientific notation for file names
removes trailing decimal point if not needed
"""
a = '%e' % np.float(input)
# mantissa: remove trailing zeros
# then remove dot if no decimal digits
mantissa = a.split('e')[0].rstrip('0').rstrip('.')
# exponent: remove + sign if there, and leading zeros
exponent = np.int(a.split('e')[1])
exponent = np.str(exponent)
if exponent=='0':
return mantissa
else:
return mantissa + 'e' + exponent
def floatExpForm(input):
"""same as intExpForm, except always leaves the decimal point
"""
a = '%e' % np.float(input)
# mantissa: remove trailing zeros
# then remove dot if no decimal digits
mantissa = a.split('e')[0].rstrip('0')
# exponent: remove + sign if there, and leading zeros
exponent = np.int(a.split('e')[1])
exponent = np.str(exponent)
if exponent=='0':
return mantissa
else:
return mantissa + 'e' + exponent
##################################################################################
# Matrix inversion for ill-conditioned matrices, with SVD
def invertMatrixSvdTruncated(matrix, epsilon=1.e-5, keepLow=True):
'''Invert a matrix by inverting its SVD,
and setting to zero the singular values that are too small/large.
epsilon sets the tolerance for discarding singular values.
keepLow=True: for inverting a cov matrix, want to keep the modes with lowest variance.
keepLow=False: for inverting a Fisher maytrix, want to keep the modes with highest Fisher information.
'''
# Perform SVD on matrix
U, s, Vh = scipy.linalg.svd(matrix)
# invert the singular values
sInv = 1./s
# remove the super poorly constrained modes, that lead to numerical instabilities
if keepLow:
sInvMax = np.max(sInv)
sInv[sInv<=sInvMax*epsilon] = 0.
else:
sInvMin = np.min(sInv)
sInv[sInv>=sInvMin/epsilon] = 0.
# create diagonal matrix
sInv = np.diag(sInv)
# invert the hermitian matrices
V = np.conj(Vh.transpose())
Uh = np.conj(U.transpose())
# generate the inverse
result = np.dot(V, np.dot(sInv, Uh))
return result
##################################################################################
# Generating ell bins with constant number of modes
def generateEllBins(lMin, lMax, nL, fsky=1.):
'''Generates nL bins between lMin and lMax,
such that the number of 2d modes in each bin is identical.
Returns the bin centers, the bin edges, the bin widths, and the number of modes per bin.
'''
# area in ell space between lMin and l,
# normalized to 1 when l=lMax
farea = lambda l: (l**2 - lMin**2) / (lMax**2 - lMin**2)
# find the bin edges,
# such that each bin has equal number of modes
Le = np.zeros(nL+1)
for iL in range(nL+1):
f = lambda l: farea(l) - float(iL) / nL
Le[iL] = optimize.brentq(f , lMin, lMax)
# use the average ell in the bin, weighted by number of modes, as bin center
Lc = 2./3. * (Le[1:]**3 - Le[:-1]**3) / (Le[1:]**2 - Le[:-1]**2)
# bin spacing
dL = Le[1:] - Le[:-1]
# Number of modes
lF = 2.*np.pi / np.sqrt(4. * np.pi * fsky)
nModesTotal = np.pi * (lMax**2 - lMin**2) / lF**2
Nmodes = nModesTotal / nL * np.ones(nL)
return Lc, dL, Nmodes, Le
##################################################################################
# Extract non-mask data vector and cov matrix
def extractMaskedMat(cov, mask=None, I=None):
'''cov: large matrix
mask: 1d array, 0 if unmasked, anything else if masked
I: indices of the large matrix to keep, pre-masking
'''
# convert mask to 0 and 1
mask = mask.astype(bool)
# extract indices of interest, if needed
if I is not None:
mask = mask[I]
J = np.ix_(I, I)
cov = cov[J]
if mask is not None:
# nb of unmasked rows
nNew = np.int(np.sum(1 - mask))
# mask cov matrix
mask = np.diag(mask)
cov = ma.masked_array(cov, mask=mask)
cov = ma.mask_rowcols(cov)
# extract the non-masked elements
cov = cov.compressed().reshape((nNew, nNew))
return cov
def extractMaskedVec(vec, mask=None, I=None):
'''vec: large vector
mask: 1d array, 0 if unmasked, anything else if masked
I: indices of the large vector to keep, pre-masking
'''
# convert mask to 0 and 1
mask = mask.astype(bool)
# extract indices of interest, if needed
if I is not None:
mask = mask[I]
vec = vec[I]
if mask is not None:
# mask vec matrix
vec = ma.masked_array(vec, mask=mask)
# extract the non-masked elements
vec = vec.compressed()
return vec
##################################################################################
# Measuring RAM usage of the current process
def currentRssMB(pid=None):
"""Returns the RSS (resident set size, ie portion of RAM occupied by a process).
Output in MB.
"""
memory = dict(psutil.Process(pid=None).memory_info()._asdict())
result = memory['rss'] # in Bytes
return result / 1.e6
##################################################################################
# !!! works only on scalars, not on arrays
#def divide(x, y, exceptOut=0.):
# '''Returns 0. or any requested value
# when dividing by zero.
# '''
# try: return x/y
# except ZeroDivisionError: return exceptOut
def divide(x, y, exceptOut=0.):
'''Returns 0. or any requested value
when dividing by zero.
Works both on scalars and arrays.
'''
# if both inputs are scalar, make one an array
inputScalar = False
if np.shape(x)==() and np.shape(y)==():
inputScalar = True
x = np.array([x])
# do the division
result = x / y
# where it went wrong, replace value with exceptOut
result[np.where(np.isfinite(result)==False)] = exceptOut
# if inputs were both scalar, make the result a scalar
if inputScalar:
result = result[0]
return result