-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_callbacks.py
More file actions
51 lines (44 loc) · 1.42 KB
/
redis_callbacks.py
File metadata and controls
51 lines (44 loc) · 1.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import json
import redis
import sys
from datetime import datetime
from config import REDIS_HOST, REDIS_PORT, REDIS_PASSWORD, REDIS_QUEUE_NAME, TOPIC
if not redis:
print("Redis package not available. Please check requirements.txt.")
sys.exit(1)
try:
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=0)
pong = redis_client.ping()
queue_len = redis_client.llen(REDIS_QUEUE_NAME)
except ConnectionError as e:
print("Could not connect to Redis:", e)
sys.exit(1)
except TimeoutError as e:
print("Redis connection timed out:", e)
sys.exit(1)
except Exception as e:
print("Error:", e)
sys.exit(1)
def on_connect(client, userdata, flags, rc):
print("Connected to MQTT broker with code:", rc)
client.subscribe(TOPIC)
def on_message(client, userdata, msg, max=0):
topic = msg.topic
try:
payload = msg.payload.decode('utf-8')
except UnicodeDecodeError:
payload = msg.payload.hex()
data = {
"topic": topic,
"value": payload,
"timestamp": datetime.now().isoformat()
}
queue_len = redis_client.llen(REDIS_QUEUE_NAME)
if max and queue_len >= max:
print(f"Queue limit reached ({max}). Skipping: {topic}")
return
try:
redis_client.rpush(REDIS_QUEUE_NAME, json.dumps(data))
print(f"Processed: {topic}")
except Exception as e:
print("Redis push error:", e)