This repository was archived by the owner on Mar 3, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·79 lines (61 loc) · 2.21 KB
/
main.py
File metadata and controls
executable file
·79 lines (61 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/python3
"""Generic sensor script."""
from typing import Optional, Tuple
from hasensor.configuration import Configuration
from hasensor.loop import Loop
from hasensor.event import RepeatingEvent, NOW
from hasensor.sensor import Sensor
from hasensor.registry import register_sensor_type, create_sensor
from hasensor.sensors.system import SystemSensor
from hasensor.sensors.announcer import Announcer
from hasensor.sensors.rtlamr import RTLAMRSensor
try:
from hasensor.sensors.bme280 import BME280Sensor
from hasensor.sensors.am2320 import AM2320Sensor
_HAVE_BOARD = True
except NotImplementedError:
_HAVE_BOARD = False
try:
from hasensor.sensors.rpigpio import RPiGPIOSensor
_HAVE_PI = True
except ModuleNotFoundError:
_HAVE_PI = False
def _send_discovery(args: Optional[Tuple[Loop, Configuration]]) -> None:
    """Publish the discovery config for this node's "Online" binary sensor.

    Args:
        args: A ``(loop, conf)`` pair, or ``None``, in which case nothing
            happens.

    Nothing is published when ``conf.discovery_interval`` is 0.
    """
    # Function-scope import keeps this block self-contained.
    import json

    if args is None:
        return
    loop, conf = args
    if conf.discovery_interval == 0:
        return

    node = conf.discovery_node
    topic = f"{conf.discovery_prefix}/binary_sensor/{node}Online/config"
    # Serialize with json.dumps so quotes, backslashes and control characters
    # in the configured names are escaped instead of corrupting the payload.
    # Compact separators and ensure_ascii=False keep the output identical to
    # the previous hand-formatted string for ordinary names.
    payload = json.dumps(
        {"state_topic": f"{conf.prefix}/state", "name": f"{node} Online"},
        separators=(",", ":"),
        ensure_ascii=False,
    )
    loop.publish_raw(topic, payload)
class _DiscoveryEvent(RepeatingEvent):
    """Repeating event that calls ``_send_discovery``, first due at ``NOW``.

    The repeat period is ``conf.discovery_interval``. When that interval is
    0 the ``repeats`` flag is cleared (presumably making the event one-shot;
    confirm against ``RepeatingEvent``).
    """

    def __init__(self, conf: Configuration):
        interval = conf.discovery_interval
        # NOTE(review): only ``conf`` is handed over as the callback argument,
        # while ``_send_discovery`` unpacks a ``(loop, conf)`` pair. Confirm
        # how RepeatingEvent forwards this argument to the callback.
        super().__init__(NOW, interval, _send_discovery, conf)
        # Cheat a little: reuse the repeating event type but switch the
        # repetition off when no interval is configured.
        if interval == 0:
            self.repeats = False
def _main():
    """Register sensor types, build the loop from the configuration, and run it.

    Board- and Raspberry-Pi-specific sensor types are registered only when
    their modules were imported successfully at module load.
    """
    sensor_types = [
        ("system", SystemSensor),
        ("announcer", Announcer),
        ("rtlamr", RTLAMRSensor),
    ]
    if _HAVE_BOARD:
        sensor_types.append(("bme280", BME280Sensor))
        sensor_types.append(("am2320", AM2320Sensor))
    if _HAVE_PI:
        sensor_types.append(("gpio", RPiGPIOSensor))
    for type_name, sensor_cls in sensor_types:
        register_sensor_type(type_name, sensor_cls)

    conf = Configuration()
    conf.parse_args()

    loop = Loop(conf)
    if conf.discoverable:
        discovery = _DiscoveryEvent(conf)
        loop.schedule(discovery)

    # Create each configured sensor, attach it to the loop, and schedule
    # the event it provides.
    for desc in conf.sensors:
        sensor: Sensor = create_sensor(desc)
        sensor.set_loop(loop)
        loop.schedule(sensor.event())

    loop.loop()


if __name__ == "__main__":
    _main()