-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathproducer.py
More file actions
63 lines (54 loc) · 2.34 KB
/
producer.py
File metadata and controls
63 lines (54 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#!/usr/bin/env python3
import json
import sys
import time
import traceback

from confluent_kafka import Producer
"""
This producer will send a bunch of messages to topic "fast-messages". Every so often,
it will send a message to "summary-markers". This shows how messages can be sent to
multiple topics. On the receiving end, we will see both kinds of messages but will
also see how the two topics aren't really synchronized.
"""
# Stream name handed to the producer as its
# 'streams.producer.default.stream' setting.
STREAM_SAMPLE = "/sample-stream"

# Topics written to by this script: every message goes to the fast topic,
# and one extra message goes to the summary topic on each marker iteration.
TOPIC_FAST_MESSAGES = "fast-messages"
TOPIC_SUMMARY_MARKERS = "summary-markers"

# Loop sizing: the number of loop iterations, and how many iterations apart
# the marker/summary messages are sent (checked with `i % FREQUENCY_DIFF_TOPICS`).
NUMBER_OF_MESSAGES = 10000
FREQUENCY_DIFF_TOPICS = 10
def _encode_message(kind, index):
    """Return the UTF-8 encoded JSON payload for a single message.

    Args:
        kind: Value stored under the 'type' key ('test', 'marker' or 'other').
        index: Loop index stored under the 'k' key.

    Returns:
        bytes: JSON object with keys 'type', 't' and 'k', where 't' is the
        current time in seconds rounded to three decimal places.
    """
    # Note that the Java version of this program uses System.nanoTime(), which
    # Python does not natively have. Instead, time.time() is used here, which
    # returns a floating point number in seconds. See here for more
    # information: https://docs.python.org/3/library/time.html#time.time.
    payload = {'type': kind,
               't': round(time.time(), 3),
               'k': index}
    return json.dumps(payload).encode('utf-8')


# Send NUMBER_OF_MESSAGES 'test' messages to the fast topic. On every
# FREQUENCY_DIFF_TOPICS-th iteration additionally send a 'marker' to the fast
# topic and an 'other' message to the summary topic, flush, and report progress.
try:
    print("Producing messages...")
    p = Producer({'streams.producer.default.stream': STREAM_SAMPLE})
    count = 0  # messages sent since the last progress report
    total_messages = 0  # messages sent since the script started
    for i in range(NUMBER_OF_MESSAGES):
        p.produce(TOPIC_FAST_MESSAGES, _encode_message('test', i))
        count += 1
        total_messages += 1

        if i % FREQUENCY_DIFF_TOPICS == 0:
            p.produce(TOPIC_FAST_MESSAGES, _encode_message('marker', i))
            count += 1
            total_messages += 1

            p.produce(TOPIC_SUMMARY_MARKERS, _encode_message('other', i))
            count += 1
            total_messages += 1

            p.flush()
            # Note this output is slightly different than the Java-version of
            # this program to help improve understanding and readability
            print("Sent {count} messages this round out of {total} sent so far".format(count=count, total=total_messages))
            count = 0  # Reset count at end of round

    # The loop only flushes on marker iterations, so messages produced after
    # the last marker are still queued here. Flush them before exiting so
    # they are not lost, and report that final partial round.
    p.flush()
    if count:
        print("Sent {count} messages this round out of {total} sent so far".format(count=count, total=total_messages))
    print("Done!")
except Exception:
    # Top-level boundary of the script: report the failure on stdout.
    print("*** Exception occurred:")
    traceback.print_exc(file=sys.stdout)