-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathdata.py
More file actions
executable file
·70 lines (55 loc) · 2.14 KB
/
data.py
File metadata and controls
executable file
·70 lines (55 loc) · 2.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
'''
Keras Data Generator
'''
import keras
from keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
import numpy as np
import os
import keras
class DataGenerator(keras.utils.Sequence):
    '''
    Keras Sequence that serves fixed-size batches loaded from .npy files.

    indexes    : collection (list or array) of integer sample indexes to serve
    batch_size : number of samples per batch; samples that do not fill a
                 whole batch are left out of the current shuffle
    datadir    : directory handed to get_index2path_dict to locate the files

    Each file is unpacked into two items (x, y): x is collected into the
    padded input batch, y is stored as a float label.
    '''

    def __init__(self, indexes, batch_size, datadir):
        # NOTE(review): recent Keras versions expect Sequence subclasses to
        # call the base constructor; on older versions this is a no-op.
        super().__init__()
        self.batch_size = batch_size
        # Private array copy: shuffling must not reorder the caller's data,
        # and np.split below needs an ndarray (a plain list has no .shape).
        self.indexes = np.array(indexes)
        self.batches = []
        self.index2path = get_index2path_dict(datadir, indexes)
        # Number of samples actually used, always a multiple of batch_size.
        self.num = len(self) * batch_size
        self.on_epoch_start()

    def __len__(self):
        ''' Number of complete batches. '''
        return len(self.indexes) // self.batch_size

    def __getitem__(self, batch_index):
        ''' Return (padded inputs, float labels) for batch `batch_index`. '''
        X = []
        Y = np.empty((self.batch_size), dtype=float)
        for i, index in enumerate(self.batches[batch_index]):
            # NOTE(review): if the files hold pickled object arrays, newer
            # NumPy versions need allow_pickle=True here - confirm the file
            # format before changing it (pickle must not be fed untrusted data).
            x, Y[i] = np.load(self.index2path[index])
            X.append(x)
        return pad_sequences(X), Y

    def on_epoch_start(self):
        '''
        Shuffle the indexes and cut them into len(self) equal batches.

        NOTE(review): only __init__ calls this. Keras' own per-epoch hook is
        named on_epoch_end, so the order stays fixed after construction;
        get_y_array presumably relies on that to stay aligned with
        predictions - confirm before wiring this into on_epoch_end.
        '''
        np.random.shuffle(self.indexes)
        batch_count = len(self)
        if batch_count == 0:
            # Fewer samples than one batch: np.split cannot make 0 sections.
            self.batches = []
            return
        used = self.indexes[:self.num]
        self.batches = np.split(used, indices_or_sections=batch_count)

    def get_y_array(self):
        '''
        Return the labels of all batches, in batch order, as one array.

        Goes through __getitem__, so the inputs of every batch are loaded
        (and discarded) as well.
        '''
        Y = []
        for batch_index in range(len(self)):
            _, y = self[batch_index]
            Y.extend(y)
        return np.array(Y)
def get_index2path_dict(datadir, indexes=None):
    '''
    Build a dict mapping sample index -> path of its .npy file.

    Scans every sub-folder of `datadir` except the one named 'json' and
    picks up the entries whose name ends in '.npy'; the part of the name
    before '.npy' is parsed as the integer index.

    indexes : None or an empty list selects every index found; any other
              collection (list, tuple, NumPy array, ...) restricts the
              result to the indexes it contains.

    Raises ValueError if a '.npy' file name is not an integer.
    '''
    # Decide once whether to filter. Comparing a NumPy array with [] is an
    # elementwise operation (an error on shape mismatch in recent NumPy),
    # so the "everything" case is detected without using ==.
    select_all = indexes is None or (isinstance(indexes, list) and not indexes)
    wanted = None
    if not select_all:
        if isinstance(indexes, np.ndarray):
            # tolist() yields plain Python scalars, which are hashable.
            wanted = set(indexes.ravel().tolist())
        else:
            wanted = set(indexes)

    index2path = {}
    folders = [d for d in os.scandir(datadir) if d.is_dir() and d.name != 'json']
    for folder in folders:
        for entry in os.scandir(folder.path):
            if not entry.name.endswith('.npy'):
                continue
            index = int(entry.name[:-4])
            # Set membership is O(1) per file instead of a linear scan.
            if wanted is None or index in wanted:
                index2path[index] = entry.path
    return index2path
def split_indexes(datadir, test_split):
    '''
    Randomly split all sample indexes found under `datadir` in two.

    test_split is forwarded to train_test_split as its test_size.
    Returns (data_indexes, test_indexes) as two NumPy arrays.
    '''
    # The keys of the lookup table are the integer indexes parsed from
    # the .npy file names.
    every_index = list(get_index2path_dict(datadir))
    data_part, test_part = train_test_split(every_index, test_size=test_split)
    return np.array(data_part), np.array(test_part)
#