-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathregularizer.py
More file actions
115 lines (95 loc) · 5.06 KB
/
regularizer.py
File metadata and controls
115 lines (95 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import tensorflow as tf
from tensorflow import keras
from sklearn.metrics import accuracy_score, recall_score, f1_score, precision_score
import numpy as np
from utils import MajorityVote, AveragingProbabilities, get_segments_a_decision_window
class ModelAnalyser:
    """
    Scores a model per decision window on a fixed data set.

    The segment-level output of ``model.predict`` is combined per decision
    window by one of the aggregation helpers imported from ``utils`` and the
    combined output is then scored with the requested measure.
    """

    def __init__(self, segment_size, segment_overlap, decision_size, decision_overlap, X, y):
        """
        :param segment_size: for calculating number of segments in a decision window.
        :param segment_overlap: for calculating number of segments in a decision window.
        :param decision_size: for calculating number of segments in a decision window.
        :param decision_overlap: forwarded unchanged to the aggregation helper.
        :param X: inputs handed to ``model.predict`` on every measurement.
        :param y: expected labels that belong to ``X``.
        """
        self.segments_a_decision_window = get_segments_a_decision_window(segment_size, segment_overlap, decision_size)
        self.decision_overlap = decision_overlap
        self.X = X
        self.y_real = y

    @staticmethod
    def _to_one_hot(scores):
        """Return an array shaped like ``scores`` with a 1 at each row's argmax and 0 elsewhere."""
        one_hot = np.zeros_like(scores)
        one_hot[np.arange(len(one_hot)), scores.argmax(1)] = 1
        return one_hot

    def measurement(self, model, monitor):
        """
        Evaluate ``model`` on the stored data with the requested monitor.

        :param model: object whose ``predict(X)`` output is passed to the aggregation helper.
        :param monitor: '#METHOD_#MEASURE', #METHOD={'mv','ms'}, #MEASURE={'loss','accuracy','precision','recall','f1'}
        :return: the value of the requested measure, or 0 when the method or
            the measure is not one of the values listed above.
        :raises IndexError: if ``monitor`` contains no '_' separator.
        """
        monitor_parts = monitor.split('_')
        monitor_method = monitor_parts[0]
        monitor_measure = monitor_parts[1]

        # Both methods share the same call signature and return a
        # (prediction, target) pair, so only the helper differs.
        if monitor_method == 'ms':
            aggregate = AveragingProbabilities
        elif monitor_method == 'mv':
            aggregate = MajorityVote
        else:
            # NOTE(review): an unrecognised method silently yields 0; kept for
            # backward compatibility with existing callers.
            return 0

        y_prediction = model.predict(self.X)
        y_pred_labels, y_dw_real = aggregate(self.y_real, y_prediction,
                                             self.segments_a_decision_window,
                                             self.decision_overlap)

        if monitor_measure == 'loss':
            # NOTE: the loss is always mean squared error on the aggregated
            # output, independent of the loss the model was compiled with.
            loss_fn = tf.keras.losses.MeanSquaredError()
            return loss_fn(y_dw_real, y_pred_labels).numpy()

        if monitor_measure == 'accuracy':
            return accuracy_score(y_dw_real, self._to_one_hot(y_pred_labels))

        if monitor_measure in ('precision', 'recall', 'f1'):
            macro_scorers = {
                'precision': precision_score,
                'recall': recall_score,
                'f1': f1_score,
            }
            return macro_scorers[monitor_measure](y_dw_real, self._to_one_hot(y_pred_labels), average='macro')

        # Unrecognised measure: same fallback value as an unrecognised method.
        return 0
class RestoringBest(keras.callbacks.Callback):
    """
    Callback that remembers the weights of the best epoch and puts them back
    on the model when training ends.

    After every epoch the monitored value is obtained from
    ``metric.measurement``; a 'loss' measure is minimised, every other
    measure is maximised.

    :param metric: analyser used to evaluate the model after each epoch.
    :param monitor: '#METHOD_#MEASURE', #METHOD={'mv','ms'}, #MEASURE={'loss','accuracy','precision','recall','f1'}
    """

    def __init__(self, metric: ModelAnalyser, monitor):
        # super() without arguments so that keras.callbacks.Callback.__init__
        # actually runs; super(keras.callbacks.Callback, self) would skip it.
        super().__init__()
        self.metric = metric
        self.monitor = monitor
        # np.inf rather than np.Inf: the capitalised alias was removed in NumPy 2.0.
        if monitor.split('_')[1] == 'loss':
            self.best = np.inf
        else:
            self.best = -np.inf
        self.best_weights = None
        self.best_epoch = -1

    def on_epoch_end(self, epoch, logs=None):
        """
        Evaluate the model, print a progress line and record new best weights.

        :param epoch: index of the epoch that just finished.
        :param logs: unused; accepted to match the Keras callback interface.
        """
        # Each measurement call runs model.predict on the analyser's data, so
        # three predictions are made per epoch.
        current = self.metric.measurement(model=self.model, monitor=self.monitor)
        accuracy = self.metric.measurement(model=self.model, monitor="ms_accuracy")
        f1 = self.metric.measurement(model=self.model, monitor="ms_f1")
        print(
            'epoch %d: \t %s: %f \t %s: %f \t %s: %f' % (epoch, self.monitor, current, "accuracy", accuracy, "f1", f1))

        monitor_measure = self.monitor.split('_')[1]
        if monitor_measure == 'loss':
            improved = current < self.best
        else:
            improved = current > self.best

        if improved:
            self.best = current
            self.best_weights = self.model.get_weights()
            self.best_epoch = epoch

    def on_train_end(self, logs=None):
        """
        Restore the best recorded weights, if any epoch improved on the initial value.

        :param logs: unused; accepted to match the Keras callback interface.
        """
        if self.best_epoch > -1:
            # best_epoch is the index passed to on_epoch_end; +1 is added for display only.
            print("Restoring model weights from the end of the %d epoch." % (self.best_epoch + 1))
            self.model.set_weights(self.best_weights)