-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbenchmark.py
More file actions
155 lines (109 loc) · 4.06 KB
/
benchmark.py
File metadata and controls
155 lines (109 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import concurrent.futures
import functools
import time
import queue
import mmap_queue
def sum_worker():
    """Drain the module-level queue ``q`` and return the sum of its entries.

    Every entry is fetched with ``q.get_bytes()``, copied to ``bytes`` and
    parsed with ``int``. Draining stops the first time ``queue.Empty`` is
    raised; any other exception propagates to the caller.

    Returns:
        The sum of all integers this worker read (0 if it read none).
    """
    running_total = 0
    try:
        # A single try around the whole loop: queue.Empty is the only exit.
        while True:
            raw = q.get_bytes().tobytes()
            running_total += int(raw)
    except queue.Empty:
        pass
    return running_total
def bench_sum_workers(num_procs, num_objects):
    """Fill the shared queue with integers and let worker processes sum them.

    The integers ``1..num_objects`` are written to the module-level queue
    ``q`` as encoded decimal strings. ``num_procs`` ``sum_worker`` tasks are
    then submitted to a process pool; their partial sums are added up and
    compared with the expected total.

    Args:
        num_procs: Size of the process pool and number of ``sum_worker``
            tasks submitted to it.
        num_objects: How many integers to enqueue before the workers start.

    Raises:
        AssertionError: If the workers' combined sum differs from the sum of
            the enqueued integers.
    """
    expected = 0
    for value in range(1, num_objects + 1):
        q.put_bytes(str(value).encode())
        expected += value

    # The context manager shuts the pool down on exit, including when a
    # worker raises, so no pool processes are left behind between benchmarks.
    with concurrent.futures.ProcessPoolExecutor(max_workers=num_procs) as executor:
        futures = [executor.submit(sum_worker) for _ in range(num_procs)]
        total = sum(future.result() for future in futures)

    # Raised explicitly instead of via `assert` so the check is not stripped
    # when Python runs with -O; the exception type stays AssertionError.
    if total != expected:
        raise AssertionError(
            f'workers summed {total}, expected {expected}'
        )
def bench_inserts(num_objects):
    """Time sequential inserts of a 32-byte payload and print the throughput.

    Writes the same 32-byte payload to the module-level queue ``q``
    ``num_objects`` times from the calling process, then prints the elapsed
    time, inserts per second, total payload size in MB and MB per second.

    Args:
        num_objects: Number of payloads to insert.
    """
    test_object = b'1' * 32

    # perf_counter is monotonic and high-resolution; time.time() can jump and
    # may be too coarse for short runs.
    start_ts = time.perf_counter()
    for _ in range(num_objects):
        q.put_bytes(test_object)
    elapsed = time.perf_counter() - start_ts

    mb_size = (len(test_object) * num_objects) / (1024**2)
    # Avoid ZeroDivisionError if the clock reports no measurable elapsed time.
    if elapsed > 0:
        insert_rate = num_objects / elapsed
        bytes_rate = mb_size / elapsed
    else:
        insert_rate = bytes_rate = float('inf')

    print(f'inserted {num_objects} in {elapsed:.6f}s, {insert_rate:.2f} inserts/second, '
          f'{mb_size:.2f} MB, {bytes_rate:.2f} MB/s')
def put_bytes(test_object, num_objects):
    """Write ``test_object`` to the module-level queue ``q`` repeatedly.

    Args:
        test_object: Payload handed unchanged to ``q.put_bytes`` on each call.
        num_objects: Number of times the payload is written.
    """
    for _iteration in range(num_objects):
        q.put_bytes(test_object)
def batch_inserts_multiprocess(num_procs, num_objects, payload_size=32):
    """Time inserts performed concurrently by a pool of worker processes.

    ``num_objects`` inserts of a ``payload_size``-byte payload are divided
    among ``num_procs`` ``put_bytes`` tasks. The time from submitting the
    first task until the last one finishes is measured, and the elapsed time,
    inserts per second, total payload size in MB and MB per second are
    printed.

    Args:
        num_procs: Size of the process pool and number of insert tasks.
        num_objects: Total number of payloads to insert across all tasks.
        payload_size: Size of each payload in bytes.
    """
    test_object = b'1' * payload_size

    # The context manager shuts the pool down on exit, including when a
    # worker raises, so no pool processes are left behind between benchmarks.
    with concurrent.futures.ProcessPoolExecutor(max_workers=num_procs) as executor:
        # Give the first `remainder` tasks one extra insert so that exactly
        # num_objects payloads are written. Plain floor division would drop
        # the remainder while the report below still counts num_objects.
        per_worker, remainder = divmod(num_objects, num_procs)
        shares = [per_worker + (1 if idx < remainder else 0)
                  for idx in range(num_procs)]

        # perf_counter is monotonic and high-resolution; time.time() can jump
        # and may be too coarse for short runs.
        start_ts = time.perf_counter()
        futures = [executor.submit(put_bytes, test_object, share)
                   for share in shares]
        for future in futures:
            # result() re-raises any exception from the worker.
            future.result()
        elapsed = time.perf_counter() - start_ts

    mb_size = (len(test_object) * num_objects) / (1024**2)
    # Avoid ZeroDivisionError if the clock reports no measurable elapsed time.
    if elapsed > 0:
        insert_rate = num_objects / elapsed
        bytes_rate = mb_size / elapsed
    else:
        insert_rate = bytes_rate = float('inf')

    print(f'{num_procs} workers inserted {num_objects} in {elapsed:.6f}s, {insert_rate:.2f} inserts/second, '
          f'{mb_size:.2f} MB, {bytes_rate:.2f} MB/s')
if __name__ == '__main__':
    # Benchmark driver: each scenario binds a fresh RingBuffer to the global
    # `q`, runs one benchmark against it, then drops the reference before the
    # next scenario allocates its own buffer.
    #
    # use queue on global scope to prevent pickling
    # NOTE(review): worker processes only see `q` if they inherit this
    # module's globals (the 'fork' start method). Under 'spawn' this guarded
    # block does not run in the children, so `q` would be undefined there.
    # Confirm the intended platform / start method.
    print('bench_sum_workers anonymous mmap 1M')
    q = mmap_queue.RingBuffer(1*1024**2)
    bench_sum_workers(num_procs=2, num_objects=1000)
    del q

    print('-'*30)
    print('bench_sum_workers file-based mmap 1M')
    q = mmap_queue.RingBuffer(50*1024**2, file_path='bench.buf')
    bench_sum_workers(num_procs=2, num_objects=1_000_000)
    del q

    print('-'*30)
    print('bench_inserts file-based mmap 1M, 32 B payload')
    q = mmap_queue.RingBuffer(500*1024**2, file_path='bench.buf')
    bench_inserts(1_000_000)
    del q

    print('-'*30)
    print('batch_inserts_multiprocess anonymous mmap, 32 B payload')
    q = mmap_queue.RingBuffer(500*1024**2)
    batch_inserts_multiprocess(4, 1_000_000)
    del q

    print('-'*30)
    print('batch_inserts_multiprocess file-backed mmap, 32 B payload')
    q = mmap_queue.RingBuffer(500*1024**2, file_path='bench.buf')
    batch_inserts_multiprocess(4, 1_000_000)
    del q

    print('-'*30)
    print('batch_inserts_multiprocess anonymous mmap 1 KB payload')
    q = mmap_queue.RingBuffer(500*1024**2)
    batch_inserts_multiprocess(4, 100_000, payload_size=1024)
    del q

    print('-'*30)
    print('batch_inserts_multiprocess file-backed mmap 1 KB payload')
    q = mmap_queue.RingBuffer(500*1024**2, file_path='bench.buf')
    batch_inserts_multiprocess(4, 100_000, payload_size=1024)
    # Drop the reference here as every other scenario does, so the previous
    # buffer is not still alive while the next one is being created.
    del q

    print('-'*30)
    print('batch_inserts_multiprocess anonymous mmap 10 KB payload')
    q = mmap_queue.RingBuffer(500*1024**2)
    batch_inserts_multiprocess(4, 10000, payload_size=10*1024)
    del q

    print('-'*30)
    print('batch_inserts_multiprocess file-backed mmap 10 KB payload')
    q = mmap_queue.RingBuffer(500*1024**2, file_path='bench.buf')
    batch_inserts_multiprocess(4, 10000, payload_size=10*1024)
    del q