-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: mp_utils.py
More file actions
76 lines (56 loc) · 2.16 KB
/
mp_utils.py
File metadata and controls
76 lines (56 loc) · 2.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import ctypes
from multiprocessing import Pool, Array, Value
import numpy as np
def emb2Arr(emb):
    """Copy an embedding matrix into a flat, lock-protected shared double Array."""
    flat = emb.ravel()  # 1-D view/copy of the matrix; Array only takes flat sequences
    return Array(ctypes.c_double, flat)
def arr2Arr(arr, is_int=False):
    """Copy a np.ndarray into an unsynchronized (lock-free) shared Array.

    is_int selects c_int storage; otherwise c_double is used.
    """
    ctype = ctypes.c_int if is_int else ctypes.c_double
    return Array(ctype, arr, lock=False)
def int2Val(value):
    """Wrap a plain int in an unsynchronized shared integer Value."""
    shared = Value('i', value, lock=False)
    return shared
def Arr2emb(arr, v=None):
    """View a lock-protected shared Array as a (v, dim) embedding matrix.

    Zero-copy: np.frombuffer wraps the underlying ctypes buffer directly.
    When v is omitted, the row count comes from the worker-global VOCABSIZE
    (a shared Value published by init()).
    """
    global VOCABSIZE
    rows = VOCABSIZE.value if v is None else v
    flat = np.frombuffer(arr.get_obj())  # get_obj() unwraps the synchronized wrapper
    return flat.reshape((rows, -1))
def Arr2arr(arr, is_int=False):
    """View a lock-free shared array as a np.ndarray without copying the data.

    is_int selects an int32 view; otherwise the default float64 view is used.
    """
    dtype = "int32" if is_int else "float64"
    return np.frombuffer(arr, dtype=dtype)
def init(vocab_, k_, context_size_, noise_probas_, w_emb_, c_emb_, all_ids_):
    """Pool initializer: publish the shared state as module globals in each worker."""
    global VOCABSIZE, k_factor, context_size, noise_probas, word_embeddings, context_embeddings, probas, all_ids
    VOCABSIZE, k_factor, context_size = vocab_, k_, context_size_
    noise_probas = noise_probas_
    word_embeddings, context_embeddings = w_emb_, c_emb_
    all_ids = all_ids_
def parallel_iter(fun, args, n_worker, initargs):
    """Yield results of `fun` over `args`, computed by a pool of `n_worker` processes.

    `initargs` is forwarded to init() in each worker so the shared state is
    available there. Results arrive in completion order, not input order.
    NOTE(review): the pool is closed/joined only if the generator is fully
    consumed — same contract as before; confirm callers always drain it.
    """
    pool = Pool(n_worker, initializer=init, initargs=initargs)
    # chunksize=500 batches tasks to amortize IPC overhead
    for result in pool.imap_unordered(fun, args, chunksize=500):
        yield result
    pool.close()
    pool.join()
def unpack_iterator(iterator):
    """Flatten an iterator of lists, yielding only the truthy items.

    Bug fix: the original looped `while True: listed_res = next(iterator)` and
    relied on the inner StopIteration to terminate the generator. Under PEP 479
    (mandatory since Python 3.7) a StopIteration escaping a generator body is
    converted to RuntimeError, so exhausting the source crashed instead of
    ending cleanly. A plain for-loop terminates correctly.
    """
    for listed_res in iterator:
        for res in listed_res:
            if res:  # skip falsy entries (None, 0, empty results)
                yield res
def add_to_iterator(iterator, arg):
    """Pair every item from `iterator` with the constant `arg` as (item, arg)."""
    yield from ((item, arg) for item in iterator)
def build_iterator(fun, args, eta, n_worker, initargs):
    """Compose the full pipeline: parallel map -> flatten/filter -> pair each result with eta."""
    return add_to_iterator(
        unpack_iterator(parallel_iter(fun, args, n_worker, initargs)),
        eta,
    )