-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
111 lines (84 loc) · 3.24 KB
/
main.py
File metadata and controls
111 lines (84 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""Remote Commands Handler.
This module provides a remote commands handler that listens to MQTT messages and writes
the received data to Modbus coils and holding registers based on the specified configuration.
The remote commands handler connects to an MQTT broker and a Modbus server, and subscribes
to a specific MQTT topic for receiving commands. It retrieves the configuration from a YAML
file and allows overriding certain configuration parameters through command-line arguments.
Example:
Run the remote commands handler:
```
python remote_commands_handler.py --configuration_path config/configuration.yaml
```
Note:
This module requires the `paho-mqtt` and `pymodbus` packages to be installed.
"""
import logging
import os
import signal
import sys
import paho.mqtt.client as mqtt
from pymodbus.client import ModbusTcpClient
from app.error_handler import ErrorHandler
from app.modbus_client import ModbusClient
from app.mqtt_reader import MqttReader
from app.configuration import Configuration
from app.exceptions import (
ConfigurationFileNotFoundError,
ConfigurationFileInvalidError,
)
from app.remote_command_handler import RemoteCommandHandler
def setup_error_handler(configuration: Configuration) -> ErrorHandler:
return ErrorHandler(configuration, mqtt.Client(mqtt.CallbackAPIVersion.VERSION2))
def setup_modbus_client(
configuration: Configuration, error_handler: ErrorHandler
) -> ModbusClient:
return ModbusClient(
configuration,
ModbusTcpClient(
configuration.get_modbus_settings().host,
port=configuration.get_modbus_settings().port,
),
error_handler,
)
def setup_mqtt_client(
configuration: Configuration, error_handler: ErrorHandler
) -> MqttReader:
return MqttReader(
configuration,
mqtt.Client(mqtt.CallbackAPIVersion.VERSION2),
error_handler,
)
def main():
loglevel = os.getenv("LOGLEVEL", "INFO").upper()
logging.basicConfig(
level=getattr(logging, loglevel),
format="%(asctime)s:%(levelname)s:%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
handler = RemoteCommandHandler()
args = handler.parse_arguments(sys.argv[1:])
try:
configuration = handler.get_configuration_with_overrides(args)
except (ConfigurationFileNotFoundError, ConfigurationFileInvalidError) as ex:
logging.info(ex)
logging.error("Error retrieving configuration, exiting")
sys.exit(1)
logging.info(
f"Starting service at {configuration.get_site_settings().site_name}"
f"/{configuration.get_site_settings().serial_number}"
)
error_handler = setup_error_handler(configuration)
modbus_client = setup_modbus_client(configuration, error_handler)
mqtt_reader = setup_mqtt_client(configuration, error_handler)
def write_to_modbus(message):
modbus_client.write_command(message)
mqtt_reader.add_message_callback(write_to_modbus)
def signal_handler(signum, _):
logging.info(f"Received signal {signum}, shutting down...")
mqtt_reader.stop()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
mqtt_reader.run()
if __name__ == "__main__":
main()