-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGraphProducer.py
More file actions
52 lines (40 loc) · 1.59 KB
/
Copy pathGraphProducer.py
File metadata and controls
52 lines (40 loc) · 1.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import json
from time import sleep
from kafka import KafkaProducer
from kafka.errors import KafkaError
from neo4j import GraphDatabase
def fetch_all_nodes():
    """Read every node from the local Neo4j instance.

    Opens a driver against ``neo4j://localhost:7687``, runs
    ``retrieve_all_nodes_from_neo4j`` in a read transaction and returns the
    first item of whatever that transaction function returned.

    Returns:
        The first item yielded by the transaction result, or ``None`` when
        the result is empty.
    """
    # NOTE(review): URI and credentials are hard-coded; consider moving them
    # to configuration or environment variables.
    uri = "neo4j://localhost:7687"
    driver = GraphDatabase.driver(uri, auth=("neo4j", "hua-neo4j"))
    try:
        with driver.session() as session:
            result = session.read_transaction(retrieve_all_nodes_from_neo4j)
    finally:
        # The driver owns network connections; release them even when the
        # session or the transaction raises.
        driver.close()
    # The transaction function wraps its payload in an outer list, so the
    # first item is the payload itself.
    return next(iter(result), None)
def retrieve_all_nodes_from_neo4j(tx):
    """Run a match-everything Cypher query in the given transaction.

    Args:
        tx: Transaction-like object exposing ``run(query)``; the object it
            returns must expose ``data()``.

    Returns:
        A one-element list whose single item is the value returned by
        ``data()`` on the query result.
    """
    cypher = "MATCH (n) RETURN n "
    cursor = tx.run(cypher)
    print("Returned all nodes from neo4j.")
    rows = cursor.data()
    # Callers expect the rows wrapped in an outer list.
    return [rows]
# Script body: load every node from Neo4j and publish each one, paced at one
# message every 4 seconds, to the Kafka topic "users-topic".
all_neo4j_nodes = fetch_all_nodes()

# Each record is indexed by "n" (the name used in the Cypher RETURN clause);
# keep only that payload for publishing.
filtered_neo4j_nodes_list = [record["n"] for record in all_neo4j_nodes]

# Values are serialized to UTF-8 encoded JSON before being sent.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda m: json.dumps(m).encode('utf-8'))

print('Sending total nodes: {}'.format(len(all_neo4j_nodes)))
for node in filtered_neo4j_nodes_list:
    print('Sending data to users-topic: ', node)
    try:
        # send() itself can raise a KafkaError subclass, so it belongs inside
        # the try block together with the delivery check.
        graph_producer = producer.send('users-topic', node)
        # Wait for the send to complete so the success line below is only
        # printed for messages that were actually delivered. The returned
        # metadata carries topic, partition and offset if they are needed.
        graph_producer.get(timeout=2)
    except KafkaError as e:
        print('[ERROR] ' + str(e))
    else:
        print('Successfully published the message to users-topic')
    # Waiting 4 seconds per row, will provide the required result:
    # 5 elements per 20 seconds
    sleep(4)
producer.flush()