forked from macsnoeren/python-p2p-network
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTcpServerNode.py
More file actions
330 lines (265 loc) · 13 KB
/
TcpServerNode.py
File metadata and controls
330 lines (265 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
#######################################################################################################################
# AVANS - BLOCKCHAIN - MINOR MAD #
# #
# Author: Maurice Snoeren #
# Version: 0.1 beta (use at your own risk) #
# #
# TcpServerNode creates a TCP/IP server on the port you have given. It accepts incoming nodes and put these into its #
# internal datastructure. When nodes disconnect, the nodes are removed. Events are generated when nodes are connected #
# , when nodes leave and when nodes have data. Furthermore, this class is able to connect to other nodes. Sending #
# data to these nodes is easy as well. The datastructure is up to you and how you implement the protocol to form the #
# decentralized peer-to-peer network. This class is at you disposal to use within your code to speed up the #
# development. #
#######################################################################################################################
import socket
import struct
import sys
import json
import time
import threading
import pprint
import random
import hashlib
from Cryptodome.PublicKey import RSA
#######################################################################################################################
# TCPServer Class #####################################################################################################
#######################################################################################################################
# Class Node
# Implements a node that is able to connect to other nodes and is able to accept connections from other nodes.
# After instantiation, the node creates a TCP/IP server with the given port.
#
class Node(threading.Thread):
# Python class constructor
def __init__(self, host, port, callback):
super(Node, self).__init__()
# When this flag is set, the node will stop and close
self.terminate_flag = threading.Event()
# Server details, host (or ip) to bind to and the port
self.host = host
self.port = port
# Events are send back to the given callback
self.callback = callback
# Nodes that have established a connection with this node
self.nodesIn = [] # Nodes that are connect with us N->(US)->N
# Nodes that this nodes is connected to
self.nodesOut = [] # Nodes that we are connected to (US)->N
# Keeps a list of connected nodes and their corresponding public keys.
self.keyList = {}
# Create a public and private key on startup
self.rsaKey = RSA.generate(2048)
self.pubKey = self.rsaKey.publickey().export_key()
# Create a unique ID for each node.
id = hashlib.md5()
t = self.host + str(self.port) + str(random.randint(1, 99999999))
id.update(t.encode('ascii'))
self.id = id.hexdigest()
# Create a transaction pool
self.transaction_pool = []
# Create a transaction pool with objects (needed for miners?)
self.transaction_data_pool = []
# Create blockchain
self.block_chain = []
# Create ledger
self.ledger = []
# Start the TCP/IP server
self.init_server()
# Creates the TCP/IP socket and bind is to the ip and port
def init_server(self):
print("Initialisation of the TcpServer on port: " + str(self.port) + " on node (" + self.id + ")")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.bind((self.host, self.port))
self.sock.settimeout(10.0)
self.sock.listen(1)
# Print the nodes with this node is connected to. It makes two lists. One for the nodes that have established
# a connection with this node and one for the node that this node has made connection with.
def print_connections(self):
print("Connection status:")
print("- Total nodes connected with us: %d" % len(self.nodesIn))
print("- Total nodes connected to : %d" % len(self.nodesOut))
# Misleading function name, while this function checks whether the connected nodes have been terminated
# by the other host. If so, clean the array list of the nodes.
# When a connection is closed, an event is send NODEINBOUNDCLOSED or NODEOUTBOUNDCLOSED
def delete_closed_connections(self):
for n in self.nodesIn:
if n.terminate_flag.is_set():
self.callback("NODEINBOUNDCLOSED", self, n, {})
n.join()
del self.nodesIn[self.nodesIn.index(n)]
for n in self.nodesOut:
if n.terminate_flag.is_set():
self.callback("NODEOUTBOUNDCLOSED", self, n, {})
n.join()
del self.nodesOut[self.nodesIn.index(n)]
# Send a message to all the nodes that are connected with this node.
# data is a python variabele which is converted to JSON that is send over to the other node.
# exclude list gives all the nodes to which this data should not be sent.
def send_to_nodes(self, data, exclude = []):
for n in self.nodesIn:
if n in exclude:
print("TcpServer.send2nodes: Excluding node in sending the message")
else:
self.send_to_node(n, data)
for n in self.nodesOut:
if n in exclude:
print("TcpServer.send2nodes: Excluding node in sending the message")
else:
self.send_to_node(n, data)
# Send the data to the node n if it exists.
# data is a python variabele which is converted to JSON that is send over to the other node.
def send_to_node(self, n, data):
self.delete_closed_connections()
if n in self.nodesIn or n in self.nodesOut:
try:
# n.send("{}|{}".format(len(data),data))
n.send(data + '\n')
except:
print("TcpServer.send2node: Error while sending data to the node")
else:
print("TcpServer.send2node: Could not send the data, node is not found!")
# Make a connection with another node that is running on host with port.
# When the connection is made, an event is triggered CONNECTEDWITHNODE.
def connect_with_node(self, host, port):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print("connecting to %s port %s" % (host, port))
sock.connect((host, port))
thread_client = NodeConnection(self, sock, (host, port), self.callback)
thread_client.start()
self.nodesOut.append(thread_client)
self.callback("CONNECTEDWITHNODE", self, thread_client, {})
self.print_connections()
except:
print("TcpServer.connect_with_node: Could not connect with node.")
# Disconnect with a node. It sends a last message to the node!
def disconnect_with_node(self, node):
if node in self.nodesOut:
node.stop()
node.send({"type": "message", "message": "Terminate connection"})
node.join() # When this is here, the application is waiting and waiting
del self.nodesOut[self.nodesOut.index(node)]
# When this function is executed, the thread will stop!
def stop(self):
self.terminate_flag.set()
# This method is required for the Thead function and is called when it is started.
# This function implements the main loop of this thread.
def run(self):
while not self.terminate_flag.is_set(): # Check whether the thread needs to be closed
try:
# print("Wait for connection")
connection, client_address = self.sock.accept()
thread_client = NodeConnection(self, connection, client_address, self.callback)
thread_client.start()
self.nodesIn.append(thread_client)
self.callback("NODECONNECTED", self, thread_client, {})
except socket.timeout:
pass
except:
raise
time.sleep(0.01)
print("TcpServer stopping...")
for t in self.nodesIn:
t.stop()
for t in self.nodesOut:
t.stop()
time.sleep(1)
for t in self.nodesIn:
t.join()
for t in self.nodesOut:
t.join()
self.sock.close()
print("TcpServer stopped")
#######################################################################################################################
# NodeConnection Class ###############################################################################################
#######################################################################################################################
# Class NodeConnection
# Implements the connection that is made with a node.
# Both inbound and outbound nodes are created with this class.
# Events are send when data is coming from the node
# Messages could be sent to this node.
class NodeConnection(threading.Thread):
# Python constructor
def __init__(self, nodeServer, sock, clientAddress, callback):
super(NodeConnection, self).__init__()
self.host = clientAddress[0]
self.port = clientAddress[1]
self.nodeServer = nodeServer
self.sock = sock
self.clientAddress = clientAddress
self.callback = callback
self.terminate_flag = threading.Event()
id = hashlib.md5()
t = self.host + str(self.port) + str(random.randint(1, 99999999))
id.update(t.encode('ascii'))
self.id = id.hexdigest()
print("NodeConnection.send: Started with client (" + self.id + ") '" + self.host + ":" + str(self.port) + "'")
# Send data to the node. The data should be a python variabele
# This data is converted into json and send.
def send(self, data):
try:
self.sock.sendall(json.dumps(data, separators=(',', ':')).encode('ascii'))
except:
print("NodeConnection.send: Unexpected error:", sys.exc_info()[0])
self.terminate_flag.set()
def parse(self, line):
if line != "":
try:
obj = json.loads(json.loads(line))
self.callback(obj['event'], self.nodeServer, self, obj)
except:
print("NodeConnection: Data could not be parsed (%s)" % line)
# Stop the node client. Please make sure you join the thread.
def stop(self):
self.terminate_flag.set()
# Required to implement the Thread. This is the main loop of the node client.
def run(self):
# Timeout, so the socket can be closed when it is dead!
self.sock.settimeout(10.0)
while not self.terminate_flag.is_set(): # Check whether the thread needs to be closed
# line = ""
try:
data = self.sock.recv(4096).decode("utf-8")
for line in data.split('\\n'):
if line != '"':
if line[1] == '"':
line = line[1:]
self.parse(line + '"')
except socket.timeout:
pass
except:
self.terminate_flag.set()
print("NodeConnection: Socket has been terminated (%s)" % line)
time.sleep(0.01)
self.sock.settimeout(None)
self.sock.close()
print("NodeConnection: Stopped")
#######################################################################################################################
# Example usage of Node ###############################################################################################
#######################################################################################################################
#
# from TcpServerNode import Node
#
# node = None # global variable
#
# def callbackNodeEvent(event, node, other, data):
# print("Event Node 1 (" + node.id + "): %s: %s" % (event, data))
# node.send2nodes({"thank": "you"})
#
# node = Node('localhost', 10000, callbackNodeEvent)
#
# node.start()
#
# node.connect_with_node('12.34.56.78', 20000)
#
# server.terminate_flag.set() # Stopping the thread
#
# node.send2nodes({"type": "message", "message": "test"})
#
# while ( 1 ):
# time.sleep(1)
#
# node.stop()
#
# node.join()
#
#
# END OF FILE