-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkinesis_consumer.py
More file actions
32 lines (22 loc) · 1.1 KB
/
kinesis_consumer.py
File metadata and controls
32 lines (22 loc) · 1.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import boto3
import json
from datetime import datetime
import time
# Minimal Kinesis consumer: polls one shard of a stream and prints every
# GetRecords response until the shard stops handing out iterators.
my_stream_name = 'jishas-stream'

# NOTE(review): credentials are passed as literals here. Prefer letting boto3
# resolve them from the environment / shared config so that real keys never
# end up in source control.
kinesis_client = boto3.client(
    'kinesis',
    region_name='us-east-2',
    aws_access_key_id='xx',
    aws_secret_access_key='xx',
)

# Look up the stream's shards. Only the first shard is consumed; records
# written to any other shard of the stream are not read by this script.
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

# 'LATEST' requests an iterator positioned at the tip of the shard, so only
# records written after this point are returned.
shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=my_stream_name,
    ShardId=my_shard_id,
    ShardIteratorType='LATEST',
)
my_shard_iterator = shard_iterator['ShardIterator']

# Poll the shard. Every response is printed, including the very first one:
# issuing the first get_records call outside the loop and then overwriting
# its result before printing would silently drop the first batch of records.
while True:
    record_response = kinesis_client.get_records(
        ShardIterator=my_shard_iterator,
        Limit=2,
    )
    print(record_response)

    # A missing/empty NextShardIterator means there is nothing further to
    # read from this shard, so stop instead of sleeping one more time.
    my_shard_iterator = record_response.get('NextShardIterator')
    if not my_shard_iterator:
        break

    # wait for 5 seconds between polls
    time.sleep(5)