-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmapper_node.py
More file actions
89 lines (69 loc) · 3.38 KB
/
mapper_node.py
File metadata and controls
89 lines (69 loc) · 3.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
# Import XML RPC server to receive data from the main_node to map
from xmlrpc.server import SimpleXMLRPCServer
# Import XML RPC client to then send the mapped data to the reducer
import xmlrpc.client
# Import group by to sort the data that will be separated by which node
from itertools import groupby
# Initialize the mapper with an address and port number
# Start an rpc server to get the data
def init_mapper(address, port, mapper_num):
    """Start this mapper's XML-RPC server and block serving requests forever.

    Args:
        address: host address to bind the RPC server on.
        port: port number this mapper listens on.
        mapper_num: this mapper's index, used only for the ready message.
    """
    print("Mapper {} ready!".format(mapper_num))
    rpc_server = SimpleXMLRPCServer((address, port), allow_none=True)
    # Expose both map entry points so the main node can invoke either job type.
    for handler in (map_data_wordcount, map_data_inverted_index):
        rpc_server.register_function(handler, handler.__name__)
    rpc_server.serve_forever()
# Map the data given from the main node
# Send it out based on the size of the word by hashing
def map_data_wordcount(data, mappers, reducers, address, port):
    """Map a chunk of words for word-count and ship each partition to its reducer.

    Each word is cleaned of periods/commas, tagged as "node:word,1", grouped by
    the reducer it hashes to, and sent over XML-RPC.

    Args:
        data: list of raw words received from the main node.
        mappers: number of mapper nodes (reducer ports come after the mappers).
        reducers: number of reducer nodes.
        address: host address shared by all nodes.
        port: base port; reducer i listens on port + mappers + i.
    """
    partitions = _partition_wordcount(data, reducers)
    print("Sending to reducers")
    # Bug fix: send each reducer its OWN partition on its OWN port
    # (port + mappers + node_num, matching the inverted-index path) instead of
    # sending every chunk to the single fixed port of the last reducer.
    # Iterating the dict also avoids the IndexError the old positional
    # data[i] lookup hit whenever some reducer received no words.
    for node_num, chunk in partitions.items():
        with xmlrpc.client.ServerProxy("http://" + address + ":" + str(port + mappers + node_num)) as proxy:
            proxy.get_map_wordcount(chunk, mappers, node_num, address, port)
def _partition_wordcount(data, reducers):
    """Clean each word, tag it with ",1", and group by destination reducer.

    Returns:
        dict mapping reducer index -> list of "node:word,1" strings, in the
        same wire format the reducers already expect. Reducers that receive
        no words simply have no entry.
    """
    partitions = {}
    for word in data:
        # Strip the punctuation the pipeline removes before counting.
        word = word.replace(".", "").replace(",", "")
        node_num = hash_word(word, reducers)
        partitions.setdefault(node_num, []).append(str(node_num) + ":" + word + ",1")
    return partitions
# Map the data given from the main node
# Send it out based on the size of the word by hashing
def map_data_inverted_index(data, mappers, reducers, address, port, doc_num):
    """Map a document's words for the inverted index and ship chunks to reducers.

    Words are cleaned, duplicate words are counted, and each distinct word is
    emitted as "hash:word,doc-count", grouped by destination reducer, and sent
    over XML-RPC.

    Args:
        data: list of raw words from one document.
        mappers: number of mapper nodes (reducer ports come after the mappers).
        reducers: number of reducer nodes.
        address: host address shared by all nodes.
        port: base port; reducer i listens on port + mappers + i.
        doc_num: identifier of the document these words came from.
    """
    # Clean up the words for commas and periods.
    cleaned = [word.replace(",", "").replace(".", "") for word in data]
    # Sort alphabetically so groupby collects identical words together.
    cleaned.sort()
    mapped = []
    for occurrences in (list(g) for _, g in groupby(cleaned)):
        occur = len(occurrences)  # how many times this word appears in the doc
        word = occurrences[0]
        hash_value = hash_word(word, reducers)
        mapped.append(str(hash_value) + ":" + str(word) + "," + str(doc_num) + "-" + str(occur))
    # Sort by the full "hash:..." string so entries with the same hash tag are
    # contiguous for groupby below.
    mapped.sort()
    chunks = [list(g) for _, g in groupby(mapped, lambda a: a.split(":")[0])]
    print("Sending to reducers")
    for chunk in chunks:
        # Bug fix: derive the reducer index from the chunk's own hash tag
        # instead of the positional loop index. With the old code, whenever a
        # hash value was absent, every later chunk was sent to (and labeled
        # with) the wrong reducer.
        node_num = int(chunk[0].split(":")[0])
        with xmlrpc.client.ServerProxy("http://" + address + ":" + str(port + mappers + node_num)) as proxy:
            proxy.get_map_inverted_index(chunk, mappers, node_num, address, port)
# Return N - 1 reducer to send to based on the length % number of reducers
def hash_word(word, reducers):
    """Pick the destination reducer (0..reducers-1) from the word's length.

    Args:
        word: the word being routed.
        reducers: total number of reducer nodes.

    Returns:
        The reducer index, computed as len(word) modulo reducers.
    """
    word_length = len(word)
    return word_length % reducers