-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinv3.py
More file actions
140 lines (124 loc) · 5.7 KB
/
inv3.py
File metadata and controls
140 lines (124 loc) · 5.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
from kafka import KafkaConsumer, KafkaProducer
from json import loads, dumps
# Holdings of investor Inv3's two portfolios.
# Each dict maps a ticker symbol to the quantity that an incoming quote's
# price is multiplied by when the portfolio is evaluated.
portfolio1 = {
    "HPQ": 2200,
    "ZM": 1800,
    "DELL": 2400,
    "NVDA": 1200,
    "IBM": 1900,
    "INTC": 1600,
}

portfolio2 = {
    "VZ": 1800,
    "AVGO": 2900,
    "TWTR": 1600,
    "AAPL": 2200,
    "DELL": 2500,
    "ORCL": 2000,
}
def vdszer(x):
    """Decode a UTF-8 byte payload and parse it as JSON."""
    return loads(x.decode('utf-8'))


def vszer(x):
    """Serialize an object to JSON and encode it as UTF-8 bytes."""
    return dumps(x).encode('utf-8')


# Consumer subscribed to the "StockExchange1" topic; message values are
# passed through vdszer before being handed to the script.
consumer = KafkaConsumer(
    'StockExchange1',
    bootstrap_servers=['localhost:9092'],
    group_id='g3',
    value_deserializer=vdszer,
)

# Producer used to write to the "portfolios" topic; values are passed through
# vszer before being sent.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=vszer,
)
# Evaluate both portfolios for every timestamp seen on the consumer and
# publish one evaluation message per portfolio per timestamp to the
# "portfolios" topic.

# Running portfolio value, keyed by quote timestamp.
port1 = {}
port2 = {}
# Number of portfolio quotes folded into the running value, keyed by timestamp.
n_stock1 = {}
n_stock2 = {}
# Timestamp -> 1-based ordinal, assigned in order of first appearance.
dates = {}
# Completed evaluation, keyed by the timestamp's ordinal from `dates`.
prev_evaluation1 = {}
prev_evaluation2 = {}
n = 0

_SEQUENCE_ERROR = (
    "There was an error. Try running the inv.py files first, "
    "and then the servers."
)


def _fold_quote(quote, day, label, holdings, expected, totals, counts, history):
    """Fold one quote into a portfolio and build the message to publish, if any.

    Args:
        quote: Decoded message value; its 'TICK', 'PRICE' and 'TS' entries
            are read. 'TICK' must be a key of ``holdings``.
        day: Ordinal of ``quote['TS']`` as recorded in ``dates``.
        label: Value written to the message's 'Portfolio' field.
        holdings: Ticker -> quantity mapping of the portfolio.
        expected: Quote count at which the timestamp's evaluation is published.
        totals: Timestamp -> running value (updated in place).
        counts: Timestamp -> quotes seen (updated in place).
        history: Ordinal -> published evaluation (updated in place).

    Returns:
        A ``(payload, fatal)`` pair. ``payload`` is ``None`` while the quote
        count differs from ``expected``, the evaluation dict once it matches,
        or the error text when the previous ordinal has no evaluation.
        ``fatal`` is true only in that last case.
    """
    ts = quote['TS']
    totals[ts] = totals.get(ts, 0.0) + float(quote['PRICE']) * holdings[quote['TICK']]
    counts[ts] = counts.get(ts, 0) + 1

    # The count only grows, so it equals `expected` exactly once per timestamp
    # and each evaluation is published a single time.
    if counts[ts] != expected:
        return None, False

    total = totals[ts]
    if day == 1:
        # First timestamp seen: nothing to compare against.
        difference = "There is no previous evaluation"
        percentage = "There is no previous evaluation"
    elif day - 1 in history:
        previous = history[day - 1]
        difference = total - previous
        percentage = ((total - previous) / previous) * 100
    else:
        # The preceding timestamp never reached `expected` quotes, so there is
        # no baseline. Report the error text itself; the earlier code sent
        # the stale (or not yet defined) `msg` here instead.
        return _SEQUENCE_ERROR, True

    # This evaluation is the next timestamp's previous evaluation.
    history[day] = total
    payload = {'Investor': 'Inv3', 'Portfolio': label, 'Evaluation': total,
               'Difference': difference,
               'Percentage Difference': percentage,
               'Timestamp': ts}
    return payload, False


# (label, holdings, expected quote count, totals, counts, history)
# NOTE(review): portfolio 2 is published after 5 quotes although portfolio2
# lists 6 tickers. This looks deliberate (presumably one ticker is not
# delivered on this topic) - confirm against the exchange feed.
_TRACKED = (
    ('1', portfolio1, 6, port1, n_stock1, prev_evaluation1),
    ('2', portfolio2, 5, port2, n_stock2, prev_evaluation2),
)

for message in consumer:
    data = message.value
    if data['TICK'] not in portfolio1 and data['TICK'] not in portfolio2:
        continue
    if data['TS'] not in dates:
        n += 1
        dates[data['TS']] = n

    halted = False
    for label, holdings, expected, totals, counts, history in _TRACKED:
        if data['TICK'] not in holdings:
            continue
        payload, halted = _fold_quote(data, dates[data['TS']], label, holdings,
                                      expected, totals, counts, history)
        if payload is not None:
            producer.send("portfolios", value=payload)
        if halted:
            break
    if halted:
        break

# send() only queues the record; flush so the final message is delivered
# before the script exits.
producer.flush()