-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathMultiThreadedExecutor.py
More file actions
executable file
·80 lines (72 loc) · 3.03 KB
/
MultiThreadedExecutor.py
File metadata and controls
executable file
·80 lines (72 loc) · 3.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
from numpy.random import default_rng, SeedSequence
import multiprocessing
import concurrent.futures
import numpy as np
class MultiThreadedExecutor:
    """Apply a function to every element of a sequence, chunk by chunk.

    The input sequence is split into at most ``threads`` contiguous slices
    and each slice is handled by one task on a shared
    ``concurrent.futures.ThreadPoolExecutor``.

    ``in_data`` must support ``len()`` and slicing for all public methods.
    """

    # When true, every chunk is run sequentially on the calling thread
    # instead of being submitted to the pool (useful for debugging).
    # Set to False (on the class or on an instance) for threaded execution.
    debug = True

    def __init__(self, threads=None):
        """Create the executor.

        Args:
            threads: Number of worker threads and maximum number of chunks.
                Defaults to ``multiprocessing.cpu_count()``.

        Raises:
            ValueError: Raised by ``ThreadPoolExecutor`` when ``threads``
                is not positive.
        """
        if threads is None:
            threads = multiprocessing.cpu_count()
        self.threads = threads
        self.executor = concurrent.futures.ThreadPoolExecutor(threads)

    def _chunk_bounds(self, count):
        """Yield ``(first, last)`` index pairs covering ``range(count)``.

        Chunks have ``ceil(count / self.threads)`` elements (the last one
        may be shorter); empty chunks are never produced.
        """
        if count <= 0:
            return
        step = int(np.ceil(count / self.threads))
        for first in range(0, count, step):
            yield first, min(first + step, count)

    def _run_chunks(self, worker, count):
        """Call ``worker(first, last)`` for every chunk of ``range(count)``.

        In debug mode the calls happen inline, in order. Otherwise they are
        submitted to the pool; this method waits for all of them and then
        re-raises the first exception raised by any worker, so failures are
        not silently dropped.
        """
        pending = []
        for first, last in self._chunk_bounds(count):
            if self.debug:
                # execute single threaded to debug
                worker(first, last)
            else:
                pending.append(self.executor.submit(worker, first, last))
        # Wait for everything first so no worker is still writing results
        # when an exception is propagated to the caller.
        concurrent.futures.wait(
            pending, return_when=concurrent.futures.ALL_COMPLETED
        )
        for future in pending:
            future.result()

    def fill(self, data_func, in_data, out_shape=None):
        """Return a NumPy array with ``data_func`` applied to each element.

        Args:
            data_func: Callable applied to each element of ``in_data``.
            in_data: Input sequence.
            out_shape: Shape of the result array. When omitted (or falsy)
                a 1-D array of ``len(in_data)`` is allocated.

        Returns:
            A ``np.zeros``-allocated array where ``out[i]`` holds
            ``data_func(in_data[i])``.

        Raises:
            Exception: Whatever ``data_func`` raises.
        """
        out_data = np.zeros(out_shape or len(in_data))

        def _fill(first, last):
            out_data[first:last] = [data_func(d) for d in in_data[first:last]]

        self._run_chunks(_fill, len(in_data))
        return out_data

    def fill_list(self, data_func, in_data):
        """Return a list with ``data_func`` applied to each element.

        Args:
            data_func: Callable applied to each element of ``in_data``.
            in_data: Input sequence.

        Returns:
            A list of ``len(in_data)`` items where item ``i`` is
            ``data_func(in_data[i])``.

        Raises:
            Exception: Whatever ``data_func`` raises.
        """
        out_data = [None] * len(in_data)

        def _fill(first, last):
            for offset, d in enumerate(in_data[first:last]):
                out_data[first + offset] = data_func(d)

        self._run_chunks(_fill, len(in_data))
        return out_data

    def exec(self, data_func, in_data):
        """Call ``data_func`` on each element, discarding the results.

        Args:
            data_func: Callable applied to each element of ``in_data``.
            in_data: Input sequence.

        Raises:
            Exception: Whatever ``data_func`` raises.
        """
        def _exec(first, last):
            for d in in_data[first:last]:
                data_func(d)

        self._run_chunks(_exec, len(in_data))

    def __del__(self):
        """Shut the pool down without waiting for running tasks."""
        # __init__ may have raised before the pool was created; in that
        # case there is nothing to shut down.
        executor = getattr(self, "executor", None)
        if executor is not None:
            executor.shutdown(wait=False)