#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Utility module to facilitate searching through hyperparameter
space without having to write a lot of boilerplate code.

Example:

    search = random_search(X, y, estimator, params, n_iter, scoring, cv, n_jobs=1, **kwargs)

or

    search = grid_search(X, y, estimator, params, scoring, cv, n_jobs=1, **kwargs)

Then, check the results:

    display_summary(search, show_report=True, show_graphics=True, n_top=5)
"""
import sys
import time
import warnings

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from skopt import gp_minimize
from sklearn.model_selection import (GridSearchCV,
                                     RandomizedSearchCV,
                                     learning_curve,
                                     validation_curve,
                                     cross_val_score)
from sklearn.exceptions import FitFailedWarning

def bayes_search(X, y, estimator, params, scoring, cv, n_calls,
                 greater_is_better, n_jobs=1, fit_estimator=True, **kwargs):
    """
    Leverage the package `scikit-optimize` (skopt) to do hyperparameter
    tuning in a Bayesian optimization framework.

    Parameters:
    -------------------
    X : training data (n_samples, n_features)
    y : training labels (n_samples,)
    estimator : estimator implementing `fit` and `predict` methods
    params : dictionary mapping parameter names to skopt search dimensions
    scoring : scoring (string or callable)
    cv : cross-validation generator (see docstring of e.g. RandomizedSearchCV)
    n_calls : number of iterations to run Bayesian optimization
    greater_is_better : set to True if optimizing a metric that should be
        increased for better model performance (e.g. accuracy). Conversely,
        set to False if you are working with a metric that should be minimized.
    n_jobs : number of jobs to run in parallel (default: 1)
    fit_estimator : if True, refit the best estimator on all the data (default: True)
    **kwargs : other parameters you wish to send to `gp_minimize`

    Returns:
    -------------------
    Tuple (estimator, best_params, best_score), where `estimator` by default
    has been refitted (see `fit_estimator`) on all the input data,
    `best_params` is a dictionary with the optimized parameters, and
    `best_score` is the best score obtained on the cross-validated data.
    """
    if len(y.shape) > 1:
        y = y.ravel()
    param_names = list(params.keys())
    dims = list(params.values())

    def evalfunc(parameters):
        # Score one candidate parameter vector with cross-validation.
        try_params = dict(zip(param_names, parameters))
        estimator.set_params(**try_params)
        score = np.mean(cross_val_score(estimator, X, y, cv=cv,
                                        scoring=scoring, n_jobs=n_jobs))
        # gp_minimize minimizes, so negate scores we want to maximize.
        return -score if greater_is_better else score

    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        try:
            res_gp = gp_minimize(evalfunc, dims, n_calls=n_calls, **kwargs)
        except Exception:
            warnings.warn("Optimization failed: %s" % sys.exc_info()[1],
                          FitFailedWarning)
            return None
    best_score = -res_gp.fun if greater_is_better else res_gp.fun
    best_params = dict(zip(param_names, res_gp.x))
    if fit_estimator:
        estimator.set_params(**best_params)
        estimator.fit(X, y)
    return (estimator, best_params, best_score)
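
# A minimal usage sketch for `bayes_search` (not part of the original module;
# the dataset, estimator and search space below are illustrative assumptions):
#
#     from sklearn.datasets import load_breast_cancer
#     from sklearn.svm import SVC
#     from skopt.space import Real
#
#     X, y = load_breast_cancer(return_X_y=True)
#     space = {'C': Real(1e-3, 1e3, prior='log-uniform'),
#              'gamma': Real(1e-4, 1e1, prior='log-uniform')}
#     model, best_params, best_score = bayes_search(
#         X, y, SVC(), space, scoring='accuracy', cv=5,
#         n_calls=30, greater_is_better=True)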

def grid_search(X, y, estimator, params, scoring, cv, n_jobs=1, **kwargs):
    """
    Convenience method for an exhaustive grid search through the
    hyperparameter space of any estimator.

    Input:
    --------------
    X : training data (n_samples, n_features)
    y : training labels (n_samples,)
    estimator : estimator implementing `fit` and `predict` methods
    params : parameter grid dictionary
    scoring : scoring (string or callable)
    cv : cross-validation generator (see docstring of GridSearchCV)
    n_jobs : number of jobs to run in parallel (default: 1)
    **kwargs : other parameters you wish to send to GridSearchCV

    Output:
    --------------
    Fitted instance of GridSearchCV, or `None` if fitting failed.
    """
    if len(y.shape) > 1:
        y = y.ravel()
    search = GridSearchCV(estimator, param_grid=params,
                          scoring=scoring, cv=cv, n_jobs=n_jobs, **kwargs)
    start_time = time.time()
    try:
        search.fit(X, y)
    except Exception:
        warnings.warn("Failed to fit.", FitFailedWarning)
        return None
    print("Total time spent searching: {0:.1f} minutes."
          .format((time.time() - start_time) / 60.))
    return search

def random_search(X, y, estimator, params, n_iter, scoring, cv, n_jobs=1, **kwargs):
    """
    Convenience method for a random search through the hyperparameter
    space of any estimator.

    Input:
    --------------
    X : training data (n_samples, n_features)
    y : training labels (n_samples,)
    estimator : estimator implementing `fit` and `predict` methods
    params : parameter distribution dictionary
    n_iter : number of random draws from the parameter distributions to test
    scoring : scoring (string or callable)
    cv : cross-validation generator (see docstring of RandomizedSearchCV)
    n_jobs : number of jobs to run in parallel (default: 1)
    **kwargs : other parameters you wish to send to RandomizedSearchCV

    Output:
    --------------
    Fitted instance of RandomizedSearchCV, or `None` if fitting failed.
    """
    if len(y.shape) > 1:
        y = y.ravel()
    search = RandomizedSearchCV(estimator, param_distributions=params, n_iter=n_iter,
                                scoring=scoring, cv=cv, n_jobs=n_jobs, **kwargs)
    start_time = time.time()
    try:
        search.fit(X, y)
    except Exception:
        warnings.warn("Failed to fit.", FitFailedWarning)
        return None
    print("Total time spent searching: {0:.1f} minutes."
          .format((time.time() - start_time) / 60.))
    return search
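
# A minimal usage sketch for `random_search` (illustrative assumptions: a
# random forest on a toy dataset, with scipy.stats priors for the draws):
#
#     from scipy.stats import randint
#     from sklearn.datasets import load_iris
#     from sklearn.ensemble import RandomForestClassifier
#
#     X, y = load_iris(return_X_y=True)
#     dists = {'n_estimators': randint(50, 500),
#              'max_depth': randint(2, 10)}
#     search = random_search(X, y, RandomForestClassifier(), dists,
#                            n_iter=20, scoring='accuracy', cv=5)
#     display_summary(search, show_report=True, show_graphics=True, n_top=5)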

def display_summary(search, show_report=True, show_graphics=True, n_top=5):
    """
    Display a summary of the fitting procedure.

    Input:
    --------------
    search : fitted instance of GridSearchCV or RandomizedSearchCV
        (output from `grid_search` or `random_search`)
    show_report : display a text report (for the `n_top` models)
    show_graphics : display a graphical summary of all fitted models
    n_top : show model summaries for the `n_top` models (default: 5)

    Output:
    --------------
    Nothing to output.
    """
    assert hasattr(search, 'cv_results_')
    df = pd.DataFrame(search.cv_results_)
    # Print a report
    if show_report:
        _display_report(df, n_top)
    # Visualize graphically
    if show_graphics:
        _display_graphics(df)
    return


def _display_report(df, n_top):
    # Print mean/std validation score and parameters for the n_top best models.
    top_scores = df.sort_values('rank_test_score').head(n_top)
    for i, (_, score) in enumerate(top_scores.iterrows()):
        print("Model with rank: {0}".format(i + 1))
        print("Mean validation score: {0:.3f} (std: {1:.3f})"
              .format(score.mean_test_score, score.std_test_score))
        print("Parameters: {0}".format(score.params), end='\n\n')
    return

def _display_graphics(df, **kwargs):
    # Plot the mean CV score (with std error bars) for each candidate model.
    plt.figure(figsize=(12, 4))
    plt.errorbar(x=1 + df.index, y=df.mean_test_score, yerr=df.std_test_score,
                 fmt='--o', ecolor='k', **kwargs)
    plt.xticks(1 + df.index)
    plt.xlabel("Model index")
    plt.ylabel("CV score")
    plt.title("Results from cross-validation")
    plt.tight_layout()
    return plt

def plot_learning_curve(title, X, y, estimator, scoring, cv, n_jobs=1,
                        ylim=None, train_sizes=np.linspace(.1, 1.0, 5), **kwargs):
    """
    Visualize a learning curve. Helps to evaluate under-/overfitting.

    Parameters:
    --------------
    title : title for the chart
    X : training data (n_samples, n_features)
    y : training labels (n_samples,)
    estimator : estimator to evaluate; implements `fit` and `predict` methods
    scoring : type of metric to evaluate
    cv : cross-validation generator
    n_jobs : number of jobs to run in parallel
    ylim : tuple, shape (ymin, ymax), optional. Defines the minimum and
        maximum y-values plotted
    train_sizes : proportions of training data to inspect
        (default: np.linspace(0.1, 1.0, 5))
    **kwargs : other arguments to send to the `learning_curve` method
    """
    plt.figure(figsize=(12, 4))
    plt.title(title)
    if ylim is not None:
        plt.ylim(*ylim)
    plt.xlabel("Training examples")
    plt.ylabel("Score")
    train_sizes, train_scores, test_scores = learning_curve(
        estimator, X, y, scoring=scoring, cv=cv, n_jobs=n_jobs,
        train_sizes=train_sizes, **kwargs)
    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    test_scores_mean = np.mean(test_scores, axis=1)
    test_scores_std = np.std(test_scores, axis=1)
    plt.grid()
    # Shade +/- one standard deviation around each mean curve.
    plt.fill_between(train_sizes, train_scores_mean - train_scores_std,
                     train_scores_mean + train_scores_std, alpha=0.1, color="r")
    plt.fill_between(train_sizes, test_scores_mean - test_scores_std,
                     test_scores_mean + test_scores_std, alpha=0.1, color="g")
    plt.plot(train_sizes, train_scores_mean, 'o-', color="r", label="Training score")
    plt.plot(train_sizes, test_scores_mean, 'o-', color="g", label="Cross-validation score")
    plt.legend(loc="best")
    return plt
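
# A minimal usage sketch for `plot_learning_curve` (the dataset and estimator
# are illustrative assumptions, not part of the original module):
#
#     from sklearn.datasets import load_digits
#     from sklearn.naive_bayes import GaussianNB
#
#     X, y = load_digits(return_X_y=True)
#     plot_learning_curve("Naive Bayes on digits", X, y, GaussianNB(),
#                         scoring='accuracy', cv=5).show()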

def explore_param(title, X, y, estimator, scoring, cv, param_name, param_range,
                  n_jobs=1, ylim=None, logparam=False, **kwargs):
    """
    Visualize a one-dimensional validation curve for a given parameter.
    Useful when checking sensitivity to a given parameter (i.e. evaluating
    the conditional distribution).

    Parameters:
    --------------
    title : title for the chart
    X : training data (n_samples, n_features)
    y : training labels (n_samples,)
    estimator : estimator to evaluate; implements `fit` and `predict` methods
    scoring : type of metric to evaluate
    cv : cross-validation generator
    param_name : name of the parameter to vary
    param_range : parameter range
    n_jobs : number of jobs to run in parallel
    ylim : tuple, shape (ymin, ymax), optional. Defines the minimum and
        maximum y-values plotted
    logparam : if True, plot a logarithmic x-axis (good for parameters
        with log-uniform priors)
    **kwargs : other arguments to send to the `validation_curve` method
    """
    train_scores, test_scores = validation_curve(
        estimator, X, y, param_name=param_name, param_range=param_range,
        cv=cv, scoring=scoring, n_jobs=n_jobs, **kwargs)
    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    test_scores_mean = np.mean(test_scores, axis=1)
    test_scores_std = np.std(test_scores, axis=1)
    plt.figure(figsize=(12, 4))
    plt.title(title)
    plt.xlabel(param_name)
    plt.ylabel("Score")
    if ylim is not None:
        plt.ylim(*ylim)
    # Use a logarithmic x-axis if requested, otherwise a linear one.
    plot = plt.semilogx if logparam else plt.plot
    plot(param_range, train_scores_mean, label="Training score",
         color="darkorange", lw=2)
    plt.fill_between(param_range, train_scores_mean - train_scores_std,
                     train_scores_mean + train_scores_std, alpha=0.2,
                     color="darkorange", lw=2)
    plot(param_range, test_scores_mean, label="Cross-validation score",
         color="navy", lw=2)
    plt.fill_between(param_range, test_scores_mean - test_scores_std,
                     test_scores_mean + test_scores_std, alpha=0.2,
                     color="navy", lw=2)
    plt.legend(loc="best")
    return plt
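
# A minimal usage sketch for `explore_param` (dataset, estimator and range are
# illustrative assumptions): sweep the regularization strength C of a logistic
# regression over a log-spaced grid, with a logarithmic x-axis.
#
#     from sklearn.datasets import load_breast_cancer
#     from sklearn.linear_model import LogisticRegression
#
#     X, y = load_breast_cancer(return_X_y=True)
#     explore_param("Sensitivity to C", X, y, LogisticRegression(max_iter=1000),
#                   scoring='accuracy', cv=5, param_name='C',
#                   param_range=np.logspace(-3, 3, 7), logparam=True).show()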

def ensemble_classifier_from_best_params(X_test, X_train, y_train, estimator_class,
                                         fitted_params, n_ensembles=5):
    """
    Convenient way to 'average' predictions from multiple instances of a single
    classifier, using the best parameters found by some type of hyperparameter search.

    Parameters:
    -------------------
    X_test : array-like, shape (n_samples_test, n_features). The data we wish to predict on
    X_train : array-like, shape (n_samples_train, n_features). Training data
    y_train : array-like, shape (n_samples_train,). Training labels
    estimator_class : a scikit-learn estimator class. Note: it should not be
        instantiated, e.g. `RandomForestClassifier`, NOT `RandomForestClassifier()`
    fitted_params : dictionary. A set of (preferably optimized) parameters to be
        fed to the estimator
    n_ensembles : int. Controls the number of instances to ensemble over (default: 5)

    Returns:
    -------------------
    ensembled_preds : numpy array. Majority-ensembled predictions, which usually
        beat the predictions of the single best classifier trained on the same data
    """
    # Check that the algorithm implements a `random_state` parameter, which is
    # needed for this type of ensembling to be meaningful. Note that this rules
    # out memory-based learners, where randomness in the fitting procedure plays
    # no role.
    assert hasattr(estimator_class(), 'random_state')
    # Generate and train a new model for each random seed. Note that `set_params`
    # runs after seeding, so a `random_state` key in `fitted_params` would
    # override the fresh seed.
    models = []
    for _ in range(n_ensembles):
        model = estimator_class(random_state=np.random.randint(low=0, high=int(1e9)))
        model.set_params(**fitted_params)
        model.fit(X_train, y_train)
        models.append(model)
    # Predict on the test data
    preds = [m.predict(X_test) for m in models]
    # Ensemble the predictions (majority vote per test sample)
    ensembled_preds = []
    for jj in range(X_test.shape[0]):
        ensembled_preds.append(most_common([preds[ii][jj] for ii in range(n_ensembles)]))
    return np.array(ensembled_preds)

def most_common(lst):
    """Return the most common element of `lst` (ties broken arbitrarily)."""
    return max(set(lst), key=lst.count)
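
# A minimal usage sketch for `ensemble_classifier_from_best_params` (the split,
# estimator and parameter values are illustrative assumptions):
#
#     from sklearn.datasets import load_iris
#     from sklearn.ensemble import RandomForestClassifier
#     from sklearn.model_selection import train_test_split
#
#     X, y = load_iris(return_X_y=True)
#     X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)
#     preds = ensemble_classifier_from_best_params(
#         X_test, X_train, y_train, RandomForestClassifier,
#         fitted_params={'n_estimators': 200, 'max_depth': 4}, n_ensembles=5)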