-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEmceeFitter.py
More file actions
143 lines (123 loc) · 4.61 KB
/
EmceeFitter.py
File metadata and controls
143 lines (123 loc) · 4.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
__author__='Andrea Chiappo'
__email__='chiappo.andrea@gmail.com'
import numpy as np
from sys import float_info
from emcee import EnsembleSampler
class Fitter(object):
    """Base class for EmceeChi2Fitter.

    Stores the MCMC configuration (number of walkers, threads, burn-in
    and production steps) and draws the walkers' starting positions from
    the chosen prior distribution.

    Parameters
    ----------
    ranges : dict
        Maps parameter name -> (lower, upper) prior bounds.
    priors : str
        Prior distribution of choice; only 'uniform' is implemented.
    scale : str
        Parameter-space scale, 'linear' or 'log'.
    kwargs :
        Optional sampler settings: walkers (default 100),
        threads (default None), burnin (default None),
        steps (default 1000).
    """
    def __init__(self, ranges, priors='uniform', scale='linear', **kwargs):
        super(Fitter, self).__init__()
        # dict.get replaces the manual `k in kwargs` membership tests
        self.nWalkers = kwargs.get('walkers', 100)
        self.nThreads = kwargs.get('threads', None)
        self.nBurnin = kwargs.get('burnin', None)
        self.nSteps = kwargs.get('steps', 1000)
        # materialise the key order once; a live dict view would silently
        # change if the caller mutated `ranges` afterwards
        self.Xargs = list(ranges)
        self.nDim = len(ranges)
        self.ranges = ranges
        self.priors = priors
        self.scale = scale
        self._initial_parameters()

    def _initial_parameters(self):
        """Initialise walker positions by sampling the prior.

        Stores an (nWalkers, nDim) array in ``self.pos0`` with each
        column drawn uniformly from that parameter's prior range.

        Raises
        ------
        Exception
            If an unsupported prior distribution was requested.
        """
        if self.priors == 'uniform':
            pos0 = np.empty([self.nWalkers, self.nDim])
            # draw each parameter column in one vectorised call instead
            # of a per-walker Python loop
            for p, par in enumerate(self.Xargs):
                pL, pR = self.ranges[par]
                pos0[:, p] = np.random.uniform(low=pL, high=pR,
                                               size=self.nWalkers)
            self.pos0 = pos0
        else:
            raise Exception("only uniform priors implemented")

    def _uniform_prior(self, theta):
        """Log of a uniform prior: 0.0 inside the bounds, -inf outside.

        Bounds are strict (open interval) on both sides.
        """
        for val, par in zip(theta, self.Xargs):
            pi, pf = self.ranges[par]
            if not pi < val < pf:
                return -np.inf
        return 0.0

    def _prior_choice(self, theta):
        """Dispatch to the configured prior's log-density."""
        if self.priors == 'uniform':
            return self._uniform_prior(theta)
        else:
            raise Exception("only uniform priors implemented")
class EmceeChi2Fitter(Fitter):
    """
    function fitter minimising Chi-squared
    using MCMC parameter space sampling via emcee package
    input parameters
    - function : function to fit, evaluated as function(xobs, *theta)
    - xobs : abscissa of the observational points
    - yobs : ordinate of the observational points
    - errors : optional 1-sigma errors on yobs; when None, the model
               value itself is used as the variance (Pearson chi-squared)
    - ranges : dictionary with prior ranges on parameters
    - priors : prior distribution of choice (for now, only uniform)
    - kwargs : keyword arguments for setting emcee sampler
    """
    def __init__(self, function, xobs, yobs, errors=None, *args, **kwargs):
        super(EmceeChi2Fitter, self).__init__(*args, **kwargs)
        self.func = function
        self.errs = errors
        self.xobs = xobs
        self.yobs = yobs

    def lnLike(self, theta):
        """Return the log-likelihood -chi2/2 of the model at `theta`.

        Any failure while evaluating the model (domain error, division
        by zero, ...) is mapped to the most negative finite float so the
        walker is rejected without crashing the sampler.
        """
        if self.scale == "log":
            # walkers move in log10-space; map back to linear parameters
            theta = np.power(10, theta)
        try:
            tmod = self.func(self.xobs, *theta)
            tobs = self.yobs
            terr = self.errs
            if terr is None:
                # no measurement errors given: Pearson chi-squared,
                # using the model value as the variance
                chi = (tobs-tmod)**2 / tmod
            else:
                chi = (tobs-tmod)**2 / terr**2
            LL = -sum(chi) / 2
        except Exception:
            # was a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit; Exception is the widest
            # class that is safe to absorb here
            return -float_info.max
        if not np.isfinite(LL):
            return -float_info.max
        return LL

    def lnProb(self, theta):
        """Log-posterior: log-prior + log-likelihood (-inf off-prior)."""
        lnPrior = self._prior_choice(theta)
        if not np.isfinite(lnPrior):
            return -np.inf
        return lnPrior + self.lnLike(theta)

    def __call__(self, nw=None, nt=None, nb=None, ns=None):
        """Run the sampler and return (best-fit params, samples, lnprobs).

        Arguments override the instance defaults: nw walkers, nt threads,
        nb burn-in steps, ns production steps.
        """
        if nw is None:
            nw = self.nWalkers
        else:
            # walker count changed: regenerate the starting positions
            self.nWalkers = nw
            self._initial_parameters()
        if nt is None:
            nt = self.nThreads
        if nb is None:
            nb = self.nBurnin
        if ns is None:
            ns = self.nSteps
        # setup emcee sampler
        # NOTE(review): `threads=` and the (pos, prob, state) return of
        # run_mcmc are the emcee v2 API; emcee v3 uses `pool=` and returns
        # a State object — confirm the pinned emcee version
        sampler = EnsembleSampler(nw, self.nDim, self.lnProb, threads=nt)
        if nb:
            # Run burn-in steps
            pos, prob, state = sampler.run_mcmc(self.pos0, nb)
            # Reset the chain to remove the burn-in samples
            sampler.reset()
            # from the final position in burn-in chain, sample for nsteps
            sampler.run_mcmc(pos, ns, rstate0=state)
        else:
            # sample for nsteps
            sampler.run_mcmc(self.pos0, ns)
        samples = sampler.flatchain
        lnprobs = sampler.flatlnprobability
        # drop samples whose likelihood hit the failure sentinel
        indxs = np.where(lnprobs > -float_info.max)[0]
        if self.scale == 'linear':
            samples = samples[indxs]
        elif self.scale == 'log':
            # chains ran in log10-space; report linear parameter values
            samples = np.power(10, samples[indxs])
        else:
            raise Exception("prior scale must be set")
        lnprobs = lnprobs[indxs]
        # best-fit point = highest posterior probability; argmax replaces
        # the equivalent max() + np.where(...)[0][0] two-pass lookup
        indmin = np.argmax(lnprobs)
        vals = samples[indmin]
        return vals, samples, lnprobs