diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..2748653 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,133 @@ +import sys + +import pytest + +from modpoll import main + + +def test_mqtt_tls_cli_options_forwarded(monkeypatch): + captured = {} + + class FakeMqttHandler: + def __init__( + self, + name, + host, + port, + user, + password, + clientid, + qos, + subscribe_topics, + use_tls, + tls_version, + cacerts, + insecure, + mqtt_version, + log_level, + ): + captured["init"] = { + "name": name, + "host": host, + "port": port, + "user": user, + "password": password, + "clientid": clientid, + "qos": qos, + "subscribe_topics": subscribe_topics, + "use_tls": use_tls, + "tls_version": tls_version, + "cacerts": cacerts, + "insecure": insecure, + "mqtt_version": mqtt_version, + "log_level": log_level, + } + + def setup(self): + return True + + def connect(self): + return True + + def close(self): + captured["closed"] = True + + def receive(self): + return None, None + + def fake_setup_modbus_handlers(args, mqtt_handler): + captured["handler_instance"] = mqtt_handler + # Stop execution before entering the main polling loop + raise SystemExit(0) + + monkeypatch.setattr(main, "MqttHandler", FakeMqttHandler) + monkeypatch.setattr(main, "setup_modbus_handlers", fake_setup_modbus_handlers) + monkeypatch.setattr( + sys, + "argv", + [ + "modpoll", + "--config", + "dummy.csv", + "--tcp", + "127.0.0.1", + "--mqtt-host", + "broker.local", + "--mqtt-use-tls", + "--mqtt-cacerts", + "/tmp/ca.pem", + "--mqtt-tls-version", + "tlsv1.2", + "--mqtt-version", + "3.1.1", + "--mqtt-insecure", + ], + ) + + with pytest.raises(SystemExit) as excinfo: + main.app() + + assert excinfo.value.code == 0 + init_args = captured["init"] + assert init_args["use_tls"] is True + assert init_args["tls_version"] == "tlsv1.2" + assert init_args["cacerts"] == "/tmp/ca.pem" + assert init_args["insecure"] is True + assert init_args["mqtt_version"] == "3.1.1" + assert init_args["subscribe_topics"] == ["modpoll/+/set"] + + +def test_mqtt_setup_close_errors_are_suppressed(monkeypatch): + class FakeMqttHandler: + def __init__(self, *args, **kwargs): + pass + + def setup(self): + raise RuntimeError("setup failed") + + def connect(self): + return False + + def close(self): + raise RuntimeError("close exploded") + + monkeypatch.setattr(main, "MqttHandler", FakeMqttHandler) + monkeypatch.setattr(main, "setup_modbus_handlers", lambda args, mqtt_handler: []) + monkeypatch.setattr( + sys, + "argv", + [ + "modpoll", + "--config", + "dummy.csv", + "--tcp", + "127.0.0.1", + "--mqtt-host", + "broker.local", + ], + ) + + with pytest.raises(SystemExit) as excinfo: + main.app() + + assert excinfo.value.code == 1 diff --git a/tests/test_modbus_task.py b/tests/test_modbus_task.py index 5907ae7..d9074d3 100644 --- a/tests/test_modbus_task.py +++ b/tests/test_modbus_task.py @@ -1,8 +1,9 @@ -import pytest +import pytest # type: ignore from modpoll.arg_parser import get_parser from modpoll.modbus_task import setup_modbus_handlers +@pytest.mark.integration def test_modbus_task_modbus_setup(): parser = get_parser() args = parser.parse_args( diff --git a/tests/test_mqtt_task.py b/tests/test_mqtt_task.py index a184fb9..8ca084d 100644 --- a/tests/test_mqtt_task.py +++ b/tests/test_mqtt_task.py @@ -26,6 +26,160 @@ def test_mqtt_task_setup(): mqtt_handler.close() +def test_receive_empty_queue_returns_none(): + handler = MqttHandler( + name="test_mqtt", + host="broker.emqx.io", + port=1883, + user=None, + password=None, + clientid="test_client_13579", + qos=0, + ) + + topic, payload = handler.receive() + + assert topic is None + assert payload is None + + +def test_receive_returns_message_when_available(): + handler = MqttHandler( + name="test_mqtt", + host="broker.emqx.io", + port=1883, + user=None, + password=None, + clientid="test_client_13579", + qos=0, + ) + + class FakeQueue: + def get(self, block=False): + return ("some/topic", b"payload") + + handler.rx_queue = FakeQueue() + + topic, payload = handler.receive() + + assert topic == "some/topic" + assert payload == b"payload" + + +def test_publish_when_connected_calls_client_publish(): + handler = MqttHandler( + name="test_mqtt", + host="broker.emqx.io", + port=1883, + user=None, + password=None, + clientid="test_client_13579", + qos=1, + ) + + class StubClient: + def __init__(self): + self.calls = [] + + def is_connected(self): + return True + + def publish(self, topic, msg, qos, retain): + self.calls.append((topic, msg, qos, retain)) + + class PubInfo: + rc = 0 + + return PubInfo() + + stub = StubClient() + handler.mqtt_client = stub + + result = handler.publish("topic", "message", qos=2, retain=True) + + assert result.rc == 0 + assert stub.calls == [("topic", "message", 2, True)] + + +def test_setup_respects_tls_option(monkeypatch): + called = {} + handler = MqttHandler( + name="test_mqtt", + host="broker.emqx.io", + port=8883, + user=None, + password=None, + clientid="client", + qos=0, + subscribe_topics=["secure/topic"], + use_tls=True, + tls_version="tlsv1.2", + mqtt_version="3.1.1", + ) + + def fake_setup_tls(): + called["tls"] = True + + monkeypatch.setattr(handler, "_setup_tls", fake_setup_tls) + + assert handler.setup() + assert called.get("tls") is True + assert handler.mqtt_client is not None + + +def test_publish_skips_when_disconnected_qos0(monkeypatch): + handler = MqttHandler( + name="test_mqtt", + host="broker.emqx.io", + port=1883, + user=None, + password=None, + clientid="test_client_13579", + qos=0, + ) + + class StubClient: + def is_connected(self): + return False + + handler.mqtt_client = StubClient() + + result = handler.publish("topic", "msg") + + assert result is None + + +def test_publish_attempts_reconnect_when_disconnected_qos1(monkeypatch): + handler = MqttHandler( + name="test_mqtt", + host="broker.emqx.io", + port=1883, + user=None, + password=None, + clientid="test_client_13579", + qos=1, + ) + + class StubClient: + def is_connected(self): + return False + + handler.mqtt_client = StubClient() + + called = {} + + def fake_connect(): + called["connect"] = True + return False + + monkeypatch.setattr(handler, "connect", fake_connect) + + result = handler.publish("topic", "msg") + + assert result is None + assert called.get("connect") is True + + @pytest.mark.integration def test_mqtt_task_connect(): mqtt_handler = MqttHandler(