diff --git a/tests/test_config.py b/tests/test_config.py index 6a76ab1..c20b2b5 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -97,36 +97,24 @@ def test_drop_channels_none_means_all_channels(self): class TestLoadConfigDefaults: - def test_default_zerohop_enabled_true(self): - cfg = load_config(None) - assert cfg["zerohop_enabled"] is True + @pytest.mark.parametrize("key,expected", [ + ("zerohop_enabled", True), + ("drop_enabled", False), + ("drop_channels", INHERIT_ZEROHOP_CHANNELS), + ("drop_portnums", []), + ("grpc_port", 9000), + ("health_port", 8080), + ("log_format", "text"), + ("_drop_portnums_set", set()), + ]) + def test_default_value(self, key, expected): + assert load_config(None)[key] == expected def test_default_zerohop_channels_eight_presets(self): cfg = load_config(None) assert len(cfg["zerohop_channels"]) == 8 assert "LongFast" in cfg["zerohop_channels"] - def test_default_drop_disabled(self): - cfg = load_config(None) - assert cfg["drop_enabled"] is False - - def test_default_drop_channels_inherits(self): - cfg = load_config(None) - assert cfg["drop_channels"] == INHERIT_ZEROHOP_CHANNELS - - def test_default_drop_portnums_empty(self): - cfg = load_config(None) - assert cfg["drop_portnums"] == [] - - def test_default_grpc_port(self): - assert load_config(None)["grpc_port"] == 9000 - - def test_default_health_port(self): - assert load_config(None)["health_port"] == 8080 - - def test_default_log_format_is_text(self): - assert load_config(None)["log_format"] == "text" - def test_precomputed_zerohop_set_matches_list(self): cfg = load_config(None) assert cfg["_zerohop_channels_set"] == set(cfg["zerohop_channels"]) @@ -135,9 +123,6 @@ def test_precomputed_drop_set_inherits_zerohop_set(self): cfg = load_config(None) assert cfg["_drop_channels_set"] == cfg["_zerohop_channels_set"] - def test_precomputed_drop_portnums_empty_set(self): - assert load_config(None)["_drop_portnums_set"] == set() - class TestLoadConfigFromFile: diff --git a/tests/test_container_smoke.py b/tests/test_container_smoke.py index 06a881c..8318a77 100644 --- a/tests/test_container_smoke.py +++ b/tests/test_container_smoke.py @@ -10,14 +10,14 @@ What is tested: - The Dockerfile builds without error - The container starts and the health endpoint returns HTTP 200 - - The /health response body is valid JSON with {"status": "ok"} - - Graceful shutdown: the container stops cleanly within a timeout + - The container runs as a non-root user + +Health-response shape (JSON validity, stats keys, 404 on unknown paths) is +covered by test_health.py without a Docker rebuild. """ -import json import subprocess import time -import urllib.error import urllib.request import pytest @@ -103,28 +103,6 @@ def test_health_returns_200(self, running_container): status, _ = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health") assert status == 200 - def test_health_body_is_valid_json(self, running_container): - _, body = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health") - data = json.loads(body) - assert data["status"] == "ok" - - def test_health_body_has_stats(self, running_container): - _, body = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health") - data = json.loads(body) - stats = data["stats"] - for key in ("zerohop", "passthru", "noop", "dropped", - "skipped", "errors", "total"): - assert key in stats, f"missing stats key: {key}" - - def test_unknown_path_returns_404(self, running_container): - try: - with urllib.request.urlopen( - f"http://localhost:{HEALTH_PORT}/notfound", timeout=5 - ) as resp: - pytest.fail(f"Expected 404 but got {resp.status}") - except urllib.error.HTTPError as exc: - assert exc.code == 404 - def test_container_is_running(self, running_container): result = _docker("inspect", "--format", "{{.State.Status}}", running_container) assert result.stdout.strip() == "running" diff --git a/tests/test_portnum.py b/tests/test_portnum.py index 82df5e2..0eff278 100644 --- a/tests/test_portnum.py +++ b/tests/test_portnum.py @@ -91,7 +91,7 @@ def meshtastic_pb2(): def _build_encrypted_envelope(mesh_pb2, mqtt_pb2, portnum, payload_bytes, packet_id=0xAABBCCDD, from_node=0x12345678, - channel_name="LongFast"): + channel_name="LongFast", hop_limit=0): """Construct a ServiceEnvelope whose inner MeshPacket has the named portnum encrypted under the default Meshtastic key.""" data = mesh_pb2.Data() @@ -104,6 +104,7 @@ def _build_encrypted_envelope(mesh_pb2, mqtt_pb2, portnum, payload_bytes, pkt.id = packet_id setattr(pkt, pkt.DESCRIPTOR.fields_by_name["from"].name, from_node) pkt.encrypted = ciphertext + pkt.hop_limit = hop_limit env = mqtt_pb2.ServiceEnvelope() env.packet.CopyFrom(pkt) diff --git a/tests/test_zerohop.py b/tests/test_zerohop.py index e93c609..55e72a5 100644 --- a/tests/test_zerohop.py +++ b/tests/test_zerohop.py @@ -11,13 +11,13 @@ ACTION_DROP, ACTION_MODIFY, ACTION_PASSTHRU, + AntifloodStats, _fmt_node, _peek_meta, parse_meshtastic_topic, process_message, zerohop_json, ) -from floodgate.zerohop import stats as packet_stats # Directory of real-world Meshtastic payloads captured from gateways. # Each JSON file is the raw form exactly as published on /json/ topics; each @@ -749,6 +749,54 @@ def test_real_world_payload_smoke(self, payload_path, caplog): "zerohop", "noop", "passthru", "warn", "dropped", ) + def test_synthetic_envelope_zerohop_round_trip(self, caplog): + """A real encrypted /e/ ServiceEnvelope built with hop_limit=3 flows + through process_message, returns ACTION_MODIFY with hop_limit zeroed, + and the inner Data still decrypts and parses cleanly.""" + from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2 + + from floodgate.decrypt import decrypt as floodgate_decrypt + from tests.test_portnum import _build_encrypted_envelope + + packet_id = 0xA1B2C3D4 + from_node = 0x12345678 + envelope_bytes = _build_encrypted_envelope( + mesh_pb2, mqtt_pb2, + portnum=portnums_pb2.PortNum.TEXT_MESSAGE_APP, + payload_bytes=b"hello", + packet_id=packet_id, + from_node=from_node, + channel_name="LongFast", + hop_limit=3, + ) + + config = _make_config(zerohop_channels=["LongFast"]) + with caplog.at_level(logging.INFO, logger="floodgate.zerohop"): + result = process_message( + "msh/US/2/e/LongFast/!00000000", envelope_bytes, config, + ) + + assert result.action == ACTION_MODIFY + assert result.payload is not None + + modified = mqtt_pb2.ServiceEnvelope() + modified.ParseFromString(result.payload) + assert modified.packet.hop_limit == 0 + assert modified.packet.id == packet_id + assert modified.channel_id == "LongFast" + + plaintext = floodgate_decrypt( + modified.packet.encrypted, packet_id=packet_id, from_node=from_node, + ) + inner = mesh_pb2.Data() + inner.ParseFromString(plaintext) + assert inner.portnum == portnums_pb2.PortNum.TEXT_MESSAGE_APP + assert inner.payload == b"hello" + + outcome_records = [r for r in caplog.records if hasattr(r, "outcome")] + assert len(outcome_records) == 1 + assert getattr(outcome_records[0], "outcome") == "zerohop" + # --------------------------------------------------------------------------- # AntifloodStats — counter mechanics @@ -756,29 +804,28 @@ def test_real_world_payload_smoke(self, payload_path, caplog): class TestAntifloodStats: - def setup_method(self): - # Fresh counters per test — both rolling and lifetime - with packet_stats._lock: - for k in ("zerohop", "passthru", "noop", "dropped", "skipped", "errors"): - setattr(packet_stats, k, 0) - packet_stats._lifetime[k] = 0 + @pytest.fixture + def stats(self, monkeypatch): + fresh = AntifloodStats() + monkeypatch.setattr("floodgate.zerohop.stats", fresh) + return fresh - def test_inc_increments_both_views(self): - packet_stats.inc("dropped") - assert packet_stats.dropped == 1 - assert packet_stats.snapshot()["dropped"] == 1 + def test_inc_increments_both_views(self, stats): + stats.inc("dropped") + assert stats.dropped == 1 + assert stats.snapshot()["dropped"] == 1 - def test_reset_zeros_rolling_keeps_lifetime(self): + def test_reset_zeros_rolling_keeps_lifetime(self, stats): for _ in range(3): - packet_stats.inc("dropped") - snap = packet_stats.reset() + stats.inc("dropped") + snap = stats.reset() assert snap["dropped"] == 3 - assert packet_stats.dropped == 0 - assert packet_stats.snapshot()["dropped"] == 3 - - def test_total_includes_dropped(self): - packet_stats.inc("zerohop") - packet_stats.inc("dropped") - packet_stats.inc("dropped") - snap = packet_stats.snapshot() + assert stats.dropped == 0 + assert stats.snapshot()["dropped"] == 3 + + def test_total_includes_dropped(self, stats): + stats.inc("zerohop") + stats.inc("dropped") + stats.inc("dropped") + snap = stats.snapshot() assert snap["total"] == 3