-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·75 lines (56 loc) · 2.27 KB
/
main.py
File metadata and controls
executable file
·75 lines (56 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#!/usr/bin/env python3
import sys
import websockets
import asyncio
import os
# Host used both to bind the servers and to build client URIs; override it
# with the SOCKETS_HOST environment variable.
HOST = os.environ.get('SOCKETS_HOST', 'localhost')
def getPort(topic_name, protocal):
    """Return the TCP port for a topic's subscriber or publisher endpoint.

    Args:
        topic_name: Accepted for interface symmetry but currently ignored,
            so every topic maps to the same pair of ports.
        protocal: ``"sub"`` selects the subscriber port; any other value
            selects the publisher port.

    Returns:
        1234 for ``"sub"``, otherwise 1235.
    """
    return 1234 if protocal == "sub" else 1235
# Module-level connection registries shared by the server handlers:
#   pubs - connections accepted on the publisher port
#   subs - connections accepted on the subscriber port (the fan-out targets)
pubs, subs = [], []
async def new_sub(new_sub):
    """Register a subscriber connection and keep it open until it closes.

    Used as the websockets connection handler for the subscriber port.

    Args:
        new_sub: The server-side connection object for the new subscriber.
    """
    subs.append(new_sub)
    print("New subscription. Total number of subs: " +
          str(len(subs)))
    try:
        # The websockets server closes a connection as soon as its handler
        # returns, so returning right after registering would disconnect
        # the subscriber before any message could be delivered. Park here
        # until the peer (or the server) closes the connection.
        await new_sub.wait_closed()
    finally:
        # Deregister so publishers stop sending to a dead socket. The
        # membership check keeps this safe if the entry was already pruned.
        if new_sub in subs:
            subs.remove(new_sub)
async def new_pub(new_pub):
    """Handle one publisher connection and fan its messages out to subscribers.

    Used as the websockets connection handler for the publisher port. Every
    message received from the publisher is forwarded to each connection
    currently listed in ``subs``.

    Args:
        new_pub: The server-side connection object for the new publisher.
    """
    pubs.append(new_pub)
    print("New publisher. Total number of pubs: " +
          str(len(pubs)))
    try:
        async for message in new_pub:
            # Iterate over a snapshot: ``subs`` may be mutated by other
            # handlers while this coroutine is suspended inside ``send``.
            for s in list(subs):
                try:
                    await s.send(message)
                except websockets.ConnectionClosed:
                    # One dead subscriber must not abort delivery to the
                    # rest (or kill this publisher); drop it and move on.
                    if s in subs:
                        subs.remove(s)
    finally:
        # Publishers come and go; without this the registry only ever grows.
        if new_pub in pubs:
            pubs.remove(new_pub)
async def createTopic(topic_name):
    """Run the publisher and subscriber servers for a topic.

    Starts one websockets server per endpoint on ``HOST`` (ports come from
    ``getPort``) and then blocks; both servers are shut down when the
    coroutine is cancelled or otherwise exits the ``async with`` blocks.

    Args:
        topic_name: Topic identifier forwarded to ``getPort``.
    """
    pub_port = getPort(topic_name, "pub")
    sub_port = getPort(topic_name, "sub")
    async with websockets.serve(new_pub, HOST, pub_port):
        async with websockets.serve(new_sub, HOST, sub_port):
            # Nothing ever resolves this future, so awaiting it simply
            # keeps both servers alive.
            await asyncio.Future()
async def sub_topic(topic_name):
    """Subscriber client: print every message received for a topic.

    Each loop iteration opens a fresh connection to the subscriber port,
    waits for a single message, prints it prefixed with ``"+ "`` and then
    closes the connection again.

    Args:
        topic_name: Topic identifier forwarded to ``getPort``.
    """
    # TODO Client drops connection between messages. This is probably not
    # ideal but is dependent on load and use case. Look into heartbeating to
    # keep connections alive if load testing proves the price of recreating
    # the connection is 'expensive'.
    endpoint = f"ws://{HOST}:{getPort(topic_name, 'sub')}"
    while True:
        async with websockets.connect(endpoint) as connection:
            incoming = await connection.recv()
            print("+ " + incoming)
async def pub_client(topic_name):
    """Publisher client: read lines from stdin and publish each to a topic.

    Each loop iteration prompts with ``"* "``, then opens a connection to
    the publisher port, sends the entered line and closes the connection.

    Args:
        topic_name: Topic identifier forwarded to ``getPort``.
    """
    # TODO Client drops connection between messages. This is probably not
    # ideal but is dependent on load and use case. Look into heartbeating to
    # keep connections alive if load testing proves the price of recreating
    # the connection is 'expensive'.
    uri = "ws://" + HOST + ":" + str(getPort(topic_name, 'pub'))
    while True:
        # Prompt BEFORE connecting. input() blocks the event loop, so calling
        # it while a connection is open prevents the client from answering
        # keepalive pings; if the user takes a while to type, the server
        # times the connection out and the subsequent send() fails.
        message = input("* ")
        async with websockets.connect(uri) as s:
            await s.send(message)
def main(mode, topic_name):
    """Run the role selected by ``mode`` for the given topic.

    Args:
        mode: ``'new_topic'`` runs the servers, ``'pub_topic'`` runs the
            publisher client and ``'sub_topic'`` runs the subscriber client.
            Any other value prints a usage hint and returns.
        topic_name: Topic identifier passed to the selected coroutine.
    """
    if mode not in ('new_topic', 'pub_topic', 'sub_topic'):
        print(
            "Invalid mode. Should be ['new_topic' || 'sub_topic' || 'pub_topic']")
        return

    if mode == 'new_topic':
        job = createTopic(topic_name)
    elif mode == 'pub_topic':
        job = pub_client(topic_name)
    else:
        job = sub_topic(topic_name)
    asyncio.run(job)
if __name__ == '__main__':
    # Expect `<mode> <topic_name>` on the command line; extra arguments are
    # ignored. With fewer than two, fall through to main's invalid-mode
    # message.
    cli_args = sys.argv[1:3]
    if len(cli_args) == 2:
        main(cli_args[0], cli_args[1])
    else:
        main(None, None)