-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocessing_multi_nodes.py
More file actions
79 lines (61 loc) · 2.55 KB
/
processing_multi_nodes.py
File metadata and controls
79 lines (61 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
from argparse import ArgumentParser
import glob
from mpi4py import MPI
from mpi4py.futures import MPIPoolExecutor
import numpy as np
import os
import pandas as pd
import time
# process an individual file
def processing_file(file_name: str, channel_id: int) -> tuple:
    """Compute the mean of one channel column in a semicolon-separated CSV file.

    Args:
        file_name: Path of the CSV file to read (fields separated by ``;``).
        channel_id: Channel number; the column read is named ``c<channel_id>``.

    Returns:
        A ``(base_name, mean_val)`` tuple: the file name without its directory
        part, and the mean of the selected column.

    Raises:
        KeyError: If the file has no column named ``c<channel_id>``.
    """
    col = f"c{channel_id}"
    # Parse only the requested column; the remaining channels are skipped so a
    # wide file is not fully loaded just to average a single column.
    df = pd.read_csv(file_name, sep=";", usecols=lambda name: name == col)
    if col not in df.columns:
        # Same exception type as a plain df[col] lookup, but with enough
        # context to tell which file is missing which channel.
        raise KeyError(f"Column {col!r} not found in {file_name}")
    mean_val = df[col].mean()
    return (os.path.basename(file_name), mean_val)
if __name__ == '__main__':
    # Script entry point: compute the per-file mean of one channel, with the
    # matching files split across the MPI ranks and the results printed by
    # rank 0.

    # Defaults used when the corresponding command-line option is not given.
    data_folder = "./"
    pattern = "data_*.csv"
    verbose = False
    channel_id = 1
    num_workers = 4

    parser = ArgumentParser()
    parser.add_argument("-p", "--pattern", dest="pattern", default=pattern,
                        help="File name pattern")
    parser.add_argument("-d", "--data-folder", dest="data_folder", default=data_folder,
                        help="Data folder")
    parser.add_argument("-c", "--channel-id", dest="channel_id", default="",
                        help="channel")
    parser.add_argument("-n", "--num-workers", dest="num_workers", default="",
                        help="number of workers")
    parser.add_argument("-v", "--verbose", dest="verbose", default=False,
                        action='store_true', help="Verbose output")
    args = parser.parse_args()

    pattern = args.pattern
    data_folder = args.data_folder
    verbose = args.verbose
    # -c and -n arrive as strings; an empty string keeps the default above.
    if args.channel_id:
        channel_id = int(args.channel_id)
    if args.num_workers:
        # NOTE: num_workers is parsed but not used by the rank-based code path
        # below.
        num_workers = int(args.num_workers)

    # Build the glob from the user-supplied pattern (previously the
    # hard-coded "data_*.csv" was used here, so --pattern had no effect).
    pattern = os.path.join(data_folder, pattern)
    files = sorted(glob.glob(pattern))
    if not files:
        raise FileNotFoundError(f"No files matching {pattern}")

    start_time = time.time()

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    size = comm.Get_size()

    # Distribute files across ranks: rank r takes files r, r+size, r+2*size, ...
    files_this_rank = files[rank::size]
    if verbose:
        print(f"Rank {rank} of {size}: processing {len(files_this_rank)} file(s)")

    # Process the files assigned to this rank; a rank with no files
    # contributes an empty list.
    results = [processing_file(f, channel_id) for f in files_this_rank]

    # Gather results from all ranks into the root rank
    all_results = comm.gather(results, root=0)
    if rank == 0:
        # Flatten the per-rank lists into one list of (file, mean) tuples.
        combined = [item for sublist in all_results for item in sublist]

    # TODO: using MPIPoolExecutor to process files in parallel
    #with MPIPoolExecutor() as executor:
    #    futures = [executor.submit(processing_file, f, channel_id) for f in files]
    #    results = [future.result() for future in futures]

    end_time = time.time()
    elapsed_time = end_time - start_time
    # Sum the per-rank wall times so rank 0 can report their average.
    total = comm.allreduce(elapsed_time, op=MPI.SUM)

    if rank == 0:
        print('Elapsed time (seconds): ', total/size)
        # Print results
        for res in combined:
            print(f"Mean of channel {channel_id} in file {res[0]}: {res[1]}")