-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
112 lines (100 loc) · 4.31 KB
/
utils.py
File metadata and controls
112 lines (100 loc) · 4.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#!/usr/bin/env python3
import time
import sklearn.manifold as manifold
import numpy as np
from sklearn.decomposition import PCA, TruncatedSVD, SparsePCA
from argparse import ArgumentParser
import configparser
# a wrapper function for execution time measurement
def time_usage(func):
def wrapper(*args, **kwargs):
start = time.perf_counter()
retval = func(*args, **kwargs)
end = time.perf_counter()
print("elapsed time: (%s) %f ms" % (func.__name__, 1000*(end - start)))
return retval
return wrapper
def deprecated(func):
def wrapper(*args, **kwargs):
print("Warning deprecated function:", func.__name__)
return func(args, kwargs)
return wrapper
def dimension_reduction(reduction_type, n_neighbors):
if reduction_type == "PCA":
reductor = PCA(n_components=2, svd_solver='full')
elif reduction_type == "LLE":
reductor = manifold.LocallyLinearEmbedding(n_neighbors=n_neighbors,
n_components=min(2, n_neighbors),
method="modified",
n_jobs=-1)
elif reduction_type == "TruncatedSVD":
reductor = TruncatedSVD(n_components=2)
elif reduction_type == "SparsePCA":
reductor = SparsePCA(n_components=2, n_jobs=-1)
return reductor
def filter_knn(matrix, vectors, dists, neigh_ids):
mask = np.zeros_like(matrix)
mask[np.triu_indices_from(mask)] = np.Inf
indices = set(np.argwhere((matrix + mask) < 1e-4)[:, 0])
good_ind = list(set(range(matrix.shape[0])) - indices)
matrix = matrix[good_ind, :]
matrix = matrix[:, good_ind]
if hasattr(vectors, 'tocsr'):
vectors = vectors.tocsr()[good_ind, :].tocoo()
else:
vectors = [v for i, v in enumerate(vectors) if i in good_ind]
neig_before = neigh_ids
neigh_ids, dists = zip(*((d, v) for i, (d, v) in enumerate(zip(neigh_ids, dists)) if i in good_ind))
neigh_ids, dists = list(neigh_ids), list(dists)
return matrix, vectors, dists, neigh_ids
def get_args():
# Needed when updating the models when the API is not running
# Needed when configs are not passed as arguments in the API launchment
config = configparser.ConfigParser()
config.read('config.ini')
parser = ArgumentParser(prog='argpconfig')
parser.add_argument('--host',
'-hs',
required=True,
help='Host to be specified on api launch.',
default=config['lamapi']['host'])
parser.add_argument('--port',
'-p',
required=True,
help='Port to be specified on api launch.',
default=config['lamapi']['port'])
parser.add_argument('--debug',
'-d',
help='Activate debug mode',
action='store_true',
default=False)
parser.add_argument('--cert',
'-ct',
help='Point to your ssl certification path',
default=config['lamapi']['cert'])
parser.add_argument('--key',
'-ky',
help='Point to your ssl privatekey path',
default=config['lamapi']['key'])
parser.add_argument('--dbhost',
'-bdh',
help='Point to x5gon db host',
efault=config['x5gondb']['dbhost'])
parser.add_argument('--dbname',
'-bdn',
help='Point to x5gon db name',
default=config['x5gondb']['dbname'])
parser.add_argument('--dbuser',
'-bdu',
help='Point to x5gon db user',
default=config['x5gondb']['dbuser'])
parser.add_argument('--dbpass',
'-bdpw',
help='Point to x5gon db user password',
default=config['x5gondb']['dbpass'])
parser.add_argument('--dbport',
'-bdp',
help='Point to x5gon db port',
default=config['x5gondb']['dbport'])
args = parser.parse_args()
return args