-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsort_parallel.cpp
More file actions
253 lines (222 loc) · 8.14 KB
/
sort_parallel.cpp
File metadata and controls
253 lines (222 loc) · 8.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#include <stdlib.h>

#include <algorithm>
#include <cstdio>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

#include "core/cxxopts.h"
#include "core/get_time.h"
#include "binaryFileController.h"
using namespace std;
// Defaults for the command-line options parsed in main().
// DEFAULT_N, DEFAULT_SPLIT and DEFAULT_THREAD_SIZE are string literals because
// they are handed to cxxopts' default_value().
// Default for the "nSize" option (number of floating points)
#define DEFAULT_N "10000000000"
// Default for the "nSplit" option (number of split points)
#define DEFAULT_SPLIT "8"
// NOTE(review): DEFAULT_SEED is not referenced anywhere in the visible code
#define DEFAULT_SEED 123
// Default for the "nThreads" option (number of threads)
#define DEFAULT_THREAD_SIZE "2"
// Heap entry used while merging: one float together with the index of
// the sorted chunk file it was taken from
struct Node {
    float value;  // the floating-point number itself
    int chunk;    // index of the chunk file that supplied `value`
};
struct compare {
bool operator()(Node a,Node b)
{
return a.value>b.value;
}
};
// Checks whether the first N elements of arr are in ascending
// (non-decreasing) order. On the first out-of-order pair the offending
// values are printed and the function returns false.
// @params
// arr = input vector of floating numbers (taken by const reference, not copied)
// N = number of leading elements of arr to check; values larger than
//     arr.size() are clamped so the vector is never read out of bounds
// @return
// true if no checked element is smaller than its predecessor
bool isSort(const std::vector<float>& arr, unsigned long N) {
    // Never trust N beyond what the vector really holds
    const unsigned long count = N < arr.size() ? N : static_cast<unsigned long>(arr.size());
    for (unsigned long i = 1; i < count; i++) {
        if (arr[i-1] > arr[i]) {
            const float prev = arr[i-1];
            const float cur = arr[i];
            if (i + 1 < arr.size()) {
                printf("Mistake founded in array. Sequence not sorted [%f, %f, %f]\n", prev, cur, arr[i+1]);
            } else {
                // The bad pair ends the vector: there is no following value to show
                printf("Mistake founded in array. Sequence not sorted [%f, %f]\n", prev, cur);
            }
            return false;
        }
    }
    return true;
}
// Sorts the chunks [start, end) of the large input file "randomFloats.bin".
// Each chunk is read into memory, sorted with std::sort and written to its
// own file "sortedFloats_<chunk index>.bin".
// The input is treated as k * threads equally sized chunks (every thread is
// given k of them), which is the same chunk length merge_K and main use
// when they read the data back.
// @params
// k = number of splits handled by each thread
// size = total size of the array (10 billion for this assignment)
// start = index of the first chunk this call sorts
// end = one past the index of the last chunk this call sorts
// threads = total number of threads
void sort_K(unsigned int k, unsigned long long size, unsigned int start, unsigned int end, unsigned int threads) {
    const unsigned long long total_chunks = static_cast<unsigned long long>(k) * threads;
    if (total_chunks == 0) {
        // Avoid dividing by zero; with no chunks there is nothing to sort
        printf("Nothing to sort: the number of splits and threads must be non-zero\n");
        return;
    }
    const unsigned long split_size = size / total_chunks;
    for (unsigned int i = start; i < end; i++) {
        // A fresh, zero-filled buffer per chunk; released automatically at the
        // end of the iteration, even if reading, sorting or writing throws
        std::vector<float> readArr(split_size);
        const std::string file_name = "sortedFloats_" + std::to_string(i) + ".bin";
        binRead(&readArr, "randomFloats.bin", split_size, i * split_size);
        printf("Finished reading from large bin file, index: %u \n", i);
        std::sort(readArr.begin(), readArr.begin() + split_size);
        binWrite(&readArr, file_name, split_size, 0);
        printf("Finished writing to bin file %s \n", file_name.c_str());
    }
    printf("Finished sorting %u files\n", k);
}
// Merge all sorted array chunks into one big array
// @params
// K = number of splits (how many times the original array is partitioned)
// size = total size of the array (10 billion for this assignment)
// threads = total number of threads
void merge_K(unsigned int K, unsigned long long size, unsigned int threads) {
printf("Merging all sorted files into one\n");
std::vector<float> vecs[K*threads];
std::vector<float> sorted_vec;
int index_K[K];
unsigned long current_MaxK[K*threads];
unsigned long prev_MaxK[K*threads];
unsigned long split_size = size/(K*threads);
unsigned long avaiable_floats = 10000000;
sorted_vec.reserve(split_size);
// Initialization step
// Store the beginning of each sorted bin file in vecs
// All numbers are separated by which file they came from
for (int i = 0; i < K; i++) {
std::string file = "sortedFloats_" + std::to_string(i) + ".bin";
index_K[i] = 0;
if (avaiable_floats < split_size) {
current_MaxK[i] = avaiable_floats;
} else {
current_MaxK[i] = split_size;
}
prev_MaxK[i] = 0;
vecs[i].resize(current_MaxK[i]);
binRead(&vecs[i], file, current_MaxK[i], index_K[i]);
}
// Initialize a min heap
// using floating point values from vecs
priority_queue<Node,vector<Node>, compare> minh;
unsigned long index = 0;
for(int j=0; j<K; j++) {
if(vecs[j].empty() != true) {
struct Node node;
node.value = vecs[j][index_K[j]];
node.chunk = j;
minh.push(node);
index_K[j]++;
index++;
}
}
// Combine smaller sorted bin files
// into one large sorted file
int chunk =0;
while(minh.size() > 0) {
// Get min value from heap
// and insert it into the sorted vector
chunk = minh.top().chunk;
sorted_vec.push_back(minh.top().value);
minh.pop();
// If we ran out of numbers stored in vecs for one file
if (index_K[chunk] >= current_MaxK[chunk]) {
if (index_K[chunk] < split_size) {
prev_MaxK[chunk] = current_MaxK[chunk];
if ((avaiable_floats+index_K[chunk]) < split_size) {
current_MaxK[chunk] = avaiable_floats+index_K[chunk];
} else {
current_MaxK[chunk] = split_size;
}
// Load in more numbers from the file to vecs
// so we have access to them in memory
vecs[chunk].resize(0);
vecs[chunk].resize(current_MaxK[chunk]-index_K[chunk]);
std::string file = "sortedFloats_" + std::to_string(chunk) + ".bin";
binRead(&vecs[chunk], file, (current_MaxK[chunk]-index_K[chunk]), index_K[chunk]);
}
}
// If we still have numbers stored in vecs
if (index_K[chunk] < current_MaxK[chunk]) {
// Determine which number we just took out of the heap
// and which file it belonged to
// And then, push a new number from that file (from vecs) into the heap
struct Node node;
node.value = vecs[chunk][index_K[chunk]-prev_MaxK[chunk]];
node.chunk = chunk;
minh.push(node);
index_K[chunk]++;
index++;
}
// If the sorted vector is full
if(sorted_vec.size() > split_size) {
// Write the values in the sorted vector to a file
binWrite(&sorted_vec, "sortedFloats.bin", sorted_vec.size(), 1);
// Reset the sorted vector to take in more values
sorted_vec.resize(0);
sorted_vec.reserve(current_MaxK[chunk]);
}
}
binWrite(&sorted_vec, "sortedFloats.bin", sorted_vec.size(), 1);
}
int main(int argc, char *argv[]) {
// Initialize command line arguments
cxxopts::Options options("Serial Sorter",
"Sort lots of floating points");
options.add_options(
"custom",
{
{"nSize", "Number of floating points",
cxxopts::value<unsigned long long>()->default_value(DEFAULT_N)},
{"nSplit", "Number of split points",
cxxopts::value<unsigned long long>()->default_value(DEFAULT_SPLIT)},
{"nThreads", "Number of threads",
cxxopts::value<unsigned int>()->default_value(DEFAULT_THREAD_SIZE)}
});
// Read input arguments
auto cl_options = options.parse(argc, argv);
unsigned long long n_size = cl_options["nSize"].as<unsigned long long>();
unsigned long long n_split = cl_options["nSplit"].as<unsigned long long>();
unsigned int n_threads = cl_options["nThreads"].as<unsigned int>();
std::cout << "Number of floating points : " << n_size << "\n";
std::cout << "Number of split points : " << n_split << "\n";
std::cout << "Number of Threads : " << n_threads << std::endl;
timer serial_timer;
serial_timer.start();
// Sort K different chunks of the big array
// in different threads
std::thread threads[n_threads];
int start = 0;
int end = n_split;
for (unsigned int i = 0; i < n_threads; i++) {
threads[i] = std::thread(sort_K,
n_split, n_size, start, end, n_threads);
start = end;
end += n_split;
}
// Wait for threads to finish sorting their separate files
for (int i = 0; i < n_threads; i++) {
threads[i].join();
}
// Merge separate files into one sorted array
merge_K(n_split, n_size, n_threads);
double time_taken = serial_timer.stop();
// Print statistics
printf("\n***************\n");
printf("Finished merging all sorted bin files into one\n");
printf("Time taken to sort: %f\n", time_taken);
printf("***************\n\n");
vector<float> sorted_array;
printf("Validating the output. Checking if the array is sorted in ascending order.\n");
bool sorted = true;
for (int k = 0; k < 10; k++) {
sorted_array.resize(0);
sorted_array.resize(n_size/(n_split*n_threads));
binRead(&sorted_array, "sortedFloats.bin", n_size/(n_split*n_threads), k*(n_size/(n_split*n_threads)));
sorted = sorted && isSort(sorted_array, n_size/(n_split*n_threads));
}
if (sorted) {
printf("Validation complete. Array was sorted correctly\n");
} else {
printf("Validation complete. Array was not sorted correctly.\n");
}
return 0;
}