-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmap-reduce-pi.py
More file actions
112 lines (93 loc) · 3.62 KB
/
map-reduce-pi.py
File metadata and controls
112 lines (93 loc) · 3.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Demo: Map/reduce with multiprocessing
import os
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, Process
import functools
#import cProfile
from pyinstrument import Profiler
def calculate_pi(darts):
    """Monte Carlo estimate of pi from `darts` uniform samples on [-1,1]^2."""
    # Re-seed in each worker so forked processes don't all draw the
    # identical random stream (per the original author's note).
    np.random.seed()
    xs = np.random.uniform(-1, 1, darts)
    ys = np.random.uniform(-1, 1, darts)
    inside_circle = xs**2 + ys**2 < 1
    # Area ratio (circle / square) = pi/4, so scale the hit fraction by 4.
    return 4 * np.sum(inside_circle) / darts
'''
an example of functions with multiple arguments that require Pool.starmap()
'''
def calculate_pi_multiargs(darts, params):
    """Monte Carlo pi estimate; the extra `params` argument (unused) exists
    only to demonstrate functions driven by Pool.starmap()."""
    np.random.seed()  # distinct random stream per worker process
    pts_x = np.random.uniform(-1, 1, darts)
    pts_y = np.random.uniform(-1, 1, darts)
    hit_count = np.sum(pts_x**2 + pts_y**2 < 1)
    return 4 * hit_count / darts
# brute-force reduction with map: average the per-worker estimates by hand
def calculate_pi_parallel(darts_per_process, Ncores):
    """Run calculate_pi on Ncores pool workers and return the mean estimate."""
    with Pool(Ncores) as pool:
        # Every worker gets the same dart count.
        estimates = pool.map(calculate_pi, [darts_per_process] * Ncores)
    return np.sum(estimates) / Ncores
# brute-force reduction with starmap, for functions taking multiple arguments
def calculate_pi_parallel_multiargs(darts_per_process, Ncores):
    """Fan out calculate_pi_multiargs via Pool.starmap and average the results."""
    # One (darts, worker_index) tuple per worker; starmap unpacks each tuple
    # into the target function's positional arguments.
    arg_tuples = [(darts_per_process, worker) for worker in range(Ncores)]
    with Pool(Ncores) as pool:
        estimates = pool.starmap(calculate_pi_multiargs, arg_tuples)
    return np.sum(estimates) / Ncores
# worker variant: deliver the estimate through a queue instead of returning it
def calculate_pi_serial(darts, queue):
    """Estimate pi with `darts` random samples and put the result on `queue`."""
    np.random.seed()  # distinct random stream per process
    sample_x = np.random.uniform(-1, 1, darts)
    sample_y = np.random.uniform(-1, 1, darts)
    hits = np.sum(sample_x**2 + sample_y**2 < 1)
    queue.put(4 * hits / darts)
# calculate pi with explicit Process objects and one result queue per worker
def calculate_pi_parallel_queue(darts_per_process, Ncores):
    """Spawn Ncores Processes that each estimate pi and report via an mp.Queue.

    Returns the mean of the per-process estimates.
    """
    workers = []  # (Process, Queue) pairs, one per worker
    for _ in range(Ncores):
        q = mp.Queue()
        p = Process(target=calculate_pi_serial, args=(darts_per_process, q))
        p.start()
        workers.append((p, q))
    # Drain each queue BEFORE joining its process: joining a process that
    # still has data buffered on a Queue can deadlock (the "joining
    # processes that use queues" pitfall in the multiprocessing docs).
    # A single small float per queue is unlikely to trigger it, but
    # get-then-join is the safe ordering. (Original joined first.)
    total = 0.0
    for p, q in workers:
        total += q.get()  # blocks until the worker has put its estimate
        p.join()
    # `total` is already a scalar; the original wrapped it in a redundant
    # np.sum() before dividing.
    return total / Ncores
# generalized binary operator used in the reduction step
def combine_operator(a, b):
    """Combine two partial results; here, simply their sum."""
    return a + b
# map with a Pool, then reduce with functools.reduce
def calculate_pi_parallel_reduce(darts_per_process, Ncores):
    """Map calculate_pi across Ncores pool workers, then fold the partial
    estimates together with functools.reduce and return their mean."""
    workloads = [darts_per_process] * Ncores
    with Pool(Ncores) as pool:
        partials = pool.map(calculate_pi, workloads)
    # Fold with the generic combine operator, then normalize by worker count.
    # (Equivalent lambda form: functools.reduce(lambda a, b: a + b, partials))
    return functools.reduce(combine_operator, partials) / Ncores
if __name__ == "__main__":
    # Report the hardware core count — not necessarily the number of cores
    # actually allocated to this job by the scheduler.
    total_cores = mp.cpu_count()
    print(f"Max number of cores: {total_cores}")

    N = 10000000   # total darts, split evenly across the workers
    Ncores = 4     # number of worker processes

    # Optional profiling hooks (pyinstrument):
    #profiler = Profiler()
    #profiler.start()

    print(f"Using {Ncores} processes")
    # Other demo entry points to try:
    #estimate = calculate_pi_parallel_queue(N // Ncores, Ncores)
    #estimate = calculate_pi_parallel_reduce(N // Ncores, Ncores)
    estimate = calculate_pi_parallel_multiargs(N // Ncores, Ncores)
    print(estimate)

    #profiler.stop()
    #profiler.print()