-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathledController.py
More file actions
166 lines (118 loc) · 6.02 KB
/
ledController.py
File metadata and controls
166 lines (118 loc) · 6.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
File: chapter04/mqtt_led.py
A full life-cycle Python + MQTT program to control an LED.
Dependencies:
pip3 install paho-mqtt gpiozero pigpio
Built and tested with Python 3.7 on Raspberry Pi 4 Model B
"""
import logging
import signal
import sys
import json
from time import sleep
from gpiozero import Device, PWMLED
from gpiozero.pins.pigpio import PiGPIOFactory
import paho.mqtt.client as mqtt # (1)
# --- Logging ---------------------------------------------------------------
# The root logger is configured at WARNING; the logger used by this module
# is then raised to INFO.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("main")
logger.setLevel(logging.INFO)

# --- GPIO ------------------------------------------------------------------
# Every GPIOZero device created from here on uses the PiGPIO pin factory.
Device.pin_factory = PiGPIOFactory()

# --- Configuration ---------------------------------------------------------
LED_GPIO_PIN = 13                                  # GPIO pin driving the LED
BROKER_HOST, BROKER_PORT = "192.168.8.196", 1883   # MQTT broker address
CLIENT_ID = "LEDClient"                            # Client id sent to the broker
TOPIC = "led"                                      # Topic carrying level requests

# --- Shared state (assigned during start-up) -------------------------------
client = None  # MQTT client instance, assigned by init_mqtt()
led = None     # PWMLED instance, assigned by init_led()
"""
GPIO Related Functions
"""
class ledController:
    """Control a PWM LED from JSON messages received over MQTT.

    The LED and the MQTT client are stored in the module-level ``led`` and
    ``client`` globals, which are assigned by :meth:`init_led` and
    :meth:`init_mqtt`. ``init_led()`` must have run before a message is
    handled, because :meth:`set_led_level` writes to ``led``.
    """

    # ------------------------------------------------------------------
    # GPIO related functions
    # ------------------------------------------------------------------

    def init_led(self):
        """Create the module-level PWMLED on LED_GPIO_PIN and switch it off."""
        global led
        led = PWMLED(LED_GPIO_PIN)
        led.off()

    @staticmethod
    def _parse_level(level):
        """Return ``level`` as an int clamped to 0..100, or None if unusable.

        Ints, floats and strings consisting only of digits are accepted.
        Anything else (None, lists, dicts, non-numeric text, NaN, infinity)
        yields None instead of raising.
        """
        if isinstance(level, str):
            if not level.isdigit():
                return None
        elif not isinstance(level, (int, float)):
            return None

        try:
            # int() truncates floats; it raises ValueError for NaN and for
            # digit characters it cannot parse, OverflowError for infinity.
            return max(0, min(100, int(level)))
        except (ValueError, OverflowError):
            return None

    def set_led_level(self, data):
        """Set the LED brightness from a decoded message.

        'data' is expected to be a dictionary with the following format:
        {
            "level": a number between 0 and 100,
        }

        Values outside 0..100 are clamped. A level that cannot be read as a
        number turns the LED off. If 'data' is not a dictionary or has no
        "level" key, the LED is left unchanged and the message is logged.
        """
        if not isinstance(data, dict) or "level" not in data:
            logger.info("Message '{}' did not contain property 'level'.".format(data))
            return

        requested = data["level"]
        level = self._parse_level(requested)

        if level is None:
            logger.info(
                "Request for unknown LED level of '{}'. We'll turn it Off instead.".format(requested))
            led.value = 0  # 0% = LED off.
            return

        led.value = level / 100  # Scale 0..100% back to 0..1
        logger.info("LED at brightness {}%".format(level))

    # ------------------------------------------------------------------
    # MQTT related functions and callbacks
    # ------------------------------------------------------------------

    def on_connect(self, client, user_data, flags, connection_result_code):
        """Called when the program connects to the MQTT Broker.

        Always subscribe to topics in an on_connect() callback. This way, if
        a connection is lost, the automatic re-connection also results in
        the re-subscription occurring.
        """
        if connection_result_code != 0:
            # connack_string() gives a user friendly string for a connection code.
            logger.error(
                "Failed to connect to MQTT Broker: " + mqtt.connack_string(connection_result_code))
            return  # Not connected, so there is nothing to subscribe on.

        # 0 = successful connection
        logger.info("Connected to MQTT Broker")

        # Subscribe to the topic for LED level changes.
        client.subscribe(TOPIC, qos=2)

    def on_disconnect(self, client, user_data, disconnection_result_code):
        """Called when the program disconnects from the MQTT Broker."""
        logger.error("Disconnected from MQTT Broker")

    def on_message(self, client, userdata, msg):
        """Called when a message is received on a subscribed topic.

        Messages on TOPIC are decoded as UTF-8 JSON and passed to
        set_led_level(). A payload that cannot be decoded is logged and
        dropped; messages on any other topic are logged as unhandled.
        """
        logger.debug("Received message for topic {}: {}".format(msg.topic, msg.payload))

        if msg.topic != TOPIC:
            logger.error(
                "Unhandled message topic {} with payload {}".format(msg.topic, msg.payload))
            return

        try:
            data = json.loads(msg.payload.decode("UTF-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            # errors="replace" so that logging the bad payload cannot itself raise.
            logger.error("JSON Decode Error: " + msg.payload.decode("UTF-8", errors="replace"))
            return

        self.set_led_level(data)

    def signal_handler(self, sig, frame):
        """Capture Control+C, disconnect from the Broker, turn the LED off and exit."""
        logger.info("You pressed Control + C. Shutting down, please wait...")
        client.disconnect()  # Graceful disconnection.
        led.off()
        sys.exit(0)

    def init_mqtt(self):
        """Create the module-level MQTT client, attach the callbacks and connect."""
        global client

        # Our MQTT Client. See PAHO documentation for all configurable options.
        # clean_session=False makes this a persistent client: the Broker keeps
        # our subscription and queued messages while we are offline.
        # (clean_session=True would tell the Broker to discard them.)
        client = mqtt.Client(
            client_id=CLIENT_ID,
            clean_session=False)

        # Route Paho logging to Python logging.
        client.enable_logger()

        # The callbacks are instance methods, so bound methods must be registered.
        client.on_connect = self.on_connect
        client.on_disconnect = self.on_disconnect
        client.on_message = self.on_message

        # Connect to Broker.
        client.connect(BROKER_HOST, BROKER_PORT)
# Initialise Module
# Run only when executed as a script, so that importing this module does not
# touch the hardware or block in signal.pause().
if __name__ == "__main__":
    # init_led(), init_mqtt() and signal_handler() are instance methods of
    # ledController, so they have to be called on an instance.
    controller = ledController()
    controller.init_led()
    controller.init_mqtt()

    # Capture Control + C so the client disconnects and the LED is turned off.
    signal.signal(signal.SIGINT, controller.signal_handler)

    client.loop_start()  # Process MQTT network traffic in a background thread.
    signal.pause()       # Keep the main thread alive until a signal arrives.