From a2aa3f9f7c94c92b7951bfff8d4882c4008cf909 Mon Sep 17 00:00:00 2001 From: Eric Becker Date: Sat, 2 May 2026 07:48:13 +0000 Subject: [PATCH 1/4] test: add real-protobuf zerohop round-trip through process_message The TestProcessMessageUnmockedProtobuf suite previously covered only the anonymized-fixture smoke path (envelopes whose ciphertext no longer decrypts). Add a synthetic-envelope test that builds a real encrypted ServiceEnvelope with hop_limit=3, drives it through process_message, and verifies the returned ACTION_MODIFY payload re-parses with hop_limit=0 and an inner Data message that still decrypts cleanly. Extends the shared _build_encrypted_envelope helper in test_portnum.py with a hop_limit kwarg so both portnum-extraction and zerohop tests can construct envelopes with the field set. Refs #37. --- tests/test_portnum.py | 3 ++- tests/test_zerohop.py | 48 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/tests/test_portnum.py b/tests/test_portnum.py index 82df5e2..0eff278 100644 --- a/tests/test_portnum.py +++ b/tests/test_portnum.py @@ -91,7 +91,7 @@ def meshtastic_pb2(): def _build_encrypted_envelope(mesh_pb2, mqtt_pb2, portnum, payload_bytes, packet_id=0xAABBCCDD, from_node=0x12345678, - channel_name="LongFast"): + channel_name="LongFast", hop_limit=0): """Construct a ServiceEnvelope whose inner MeshPacket has the named portnum encrypted under the default Meshtastic key.""" data = mesh_pb2.Data() @@ -104,6 +104,7 @@ def _build_encrypted_envelope(mesh_pb2, mqtt_pb2, portnum, payload_bytes, pkt.id = packet_id setattr(pkt, pkt.DESCRIPTOR.fields_by_name["from"].name, from_node) pkt.encrypted = ciphertext + pkt.hop_limit = hop_limit env = mqtt_pb2.ServiceEnvelope() env.packet.CopyFrom(pkt) diff --git a/tests/test_zerohop.py b/tests/test_zerohop.py index e93c609..9b214d0 100644 --- a/tests/test_zerohop.py +++ b/tests/test_zerohop.py @@ -749,6 +749,54 @@ def test_real_world_payload_smoke(self, payload_path, caplog): "zerohop", "noop", "passthru", "warn", "dropped", ) + def test_synthetic_envelope_zerohop_round_trip(self, caplog): + """A real encrypted /e/ ServiceEnvelope built with hop_limit=3 flows + through process_message, returns ACTION_MODIFY with hop_limit zeroed, + and the inner Data still decrypts and parses cleanly.""" + from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2 + + from floodgate.decrypt import decrypt as floodgate_decrypt + from tests.test_portnum import _build_encrypted_envelope + + packet_id = 0xA1B2C3D4 + from_node = 0x12345678 + envelope_bytes = _build_encrypted_envelope( + mesh_pb2, mqtt_pb2, + portnum=portnums_pb2.PortNum.TEXT_MESSAGE_APP, + payload_bytes=b"hello", + packet_id=packet_id, + from_node=from_node, + channel_name="LongFast", + hop_limit=3, + ) + + config = _make_config(zerohop_channels=["LongFast"]) + with caplog.at_level(logging.INFO, logger="floodgate.zerohop"): + result = process_message( + "msh/US/2/e/LongFast/!00000000", envelope_bytes, config, + ) + + assert result.action == ACTION_MODIFY + assert result.payload is not None + + modified = mqtt_pb2.ServiceEnvelope() + modified.ParseFromString(result.payload) + assert modified.packet.hop_limit == 0 + assert modified.packet.id == packet_id + assert modified.channel_id == "LongFast" + + plaintext = floodgate_decrypt( + modified.packet.encrypted, packet_id=packet_id, from_node=from_node, + ) + inner = mesh_pb2.Data() + inner.ParseFromString(plaintext) + assert inner.portnum == portnums_pb2.PortNum.TEXT_MESSAGE_APP + assert inner.payload == b"hello" + + outcome_records = [r for r in caplog.records if hasattr(r, "outcome")] + assert len(outcome_records) == 1 + assert getattr(outcome_records[0], "outcome") == "zerohop" + # --------------------------------------------------------------------------- # AntifloodStats — counter mechanics From ae7e1e24bf155adaaa51fe386bc980d028481c26 Mon Sep 17 00:00:00 2001 From: Eric Becker Date: Sat, 2 May 2026 08:03:39 +0000 Subject: [PATCH 2/4] test: isolate AntifloodStats tests with a per-test stats fixture The previous setup_method reset the live module-level floodgate.zerohop.stats instance in place, which is safe under sequential pytest but would race under pytest-xdist. Replace it with a fixture that constructs a fresh AntifloodStats and monkeypatches it onto the module for the lifetime of each test, then yields it for the test to operate on directly. Refs #37. --- tests/test_zerohop.py | 43 +++++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/tests/test_zerohop.py b/tests/test_zerohop.py index 9b214d0..55e72a5 100644 --- a/tests/test_zerohop.py +++ b/tests/test_zerohop.py @@ -11,13 +11,13 @@ ACTION_DROP, ACTION_MODIFY, ACTION_PASSTHRU, + AntifloodStats, _fmt_node, _peek_meta, parse_meshtastic_topic, process_message, zerohop_json, ) -from floodgate.zerohop import stats as packet_stats # Directory of real-world Meshtastic payloads captured from gateways. # Each JSON file is the raw form exactly as published on /json/ topics; each @@ -804,29 +804,28 @@ def test_synthetic_envelope_zerohop_round_trip(self, caplog): class TestAntifloodStats: - def setup_method(self): - # Fresh counters per test — both rolling and lifetime - with packet_stats._lock: - for k in ("zerohop", "passthru", "noop", "dropped", "skipped", "errors"): - setattr(packet_stats, k, 0) - packet_stats._lifetime[k] = 0 + @pytest.fixture + def stats(self, monkeypatch): + fresh = AntifloodStats() + monkeypatch.setattr("floodgate.zerohop.stats", fresh) + return fresh - def test_inc_increments_both_views(self): - packet_stats.inc("dropped") - assert packet_stats.dropped == 1 - assert packet_stats.snapshot()["dropped"] == 1 + def test_inc_increments_both_views(self, stats): + stats.inc("dropped") + assert stats.dropped == 1 + assert stats.snapshot()["dropped"] == 1 - def test_reset_zeros_rolling_keeps_lifetime(self): + def test_reset_zeros_rolling_keeps_lifetime(self, stats): for _ in range(3): - packet_stats.inc("dropped") - snap = packet_stats.reset() + stats.inc("dropped") + snap = stats.reset() assert snap["dropped"] == 3 - assert packet_stats.dropped == 0 - assert packet_stats.snapshot()["dropped"] == 3 - - def test_total_includes_dropped(self): - packet_stats.inc("zerohop") - packet_stats.inc("dropped") - packet_stats.inc("dropped") - snap = packet_stats.snapshot() + assert stats.dropped == 0 + assert stats.snapshot()["dropped"] == 3 + + def test_total_includes_dropped(self, stats): + stats.inc("zerohop") + stats.inc("dropped") + stats.inc("dropped") + snap = stats.snapshot() assert snap["total"] == 3 From 1a65ab0ae080df991c86b20e2d31bf3de08592ff Mon Sep 17 00:00:00 2001 From: Eric Becker Date: Sat, 2 May 2026 08:04:38 +0000 Subject: [PATCH 3/4] test: drop duplicated health-shape assertions from container smoke test_health_body_is_valid_json, test_health_body_has_stats, and test_unknown_path_returns_404 are already covered by test_health.py against an in-process health server. Running them again under the smoke marker forces a Docker rebuild for assertions that don't need the container, costing ~30s on every smoke job. Smoke is now scoped to Docker-specific facts only: image builds, /health responds, and the container runs as a non-root user. Refs #37. --- tests/test_container_smoke.py | 30 ++++-------------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/tests/test_container_smoke.py b/tests/test_container_smoke.py index 06a881c..8318a77 100644 --- a/tests/test_container_smoke.py +++ b/tests/test_container_smoke.py @@ -10,14 +10,14 @@ What is tested: - The Dockerfile builds without error - The container starts and the health endpoint returns HTTP 200 - - The /health response body is valid JSON with {"status": "ok"} - - Graceful shutdown: the container stops cleanly within a timeout + - The container runs as a non-root user + +Health-response shape (JSON validity, stats keys, 404 on unknown paths) is +covered by test_health.py without a Docker rebuild. """ -import json import subprocess import time -import urllib.error import urllib.request import pytest @@ -103,28 +103,6 @@ def test_health_returns_200(self, running_container): status, _ = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health") assert status == 200 - def test_health_body_is_valid_json(self, running_container): - _, body = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health") - data = json.loads(body) - assert data["status"] == "ok" - - def test_health_body_has_stats(self, running_container): - _, body = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health") - data = json.loads(body) - stats = data["stats"] - for key in ("zerohop", "passthru", "noop", "dropped", - "skipped", "errors", "total"): - assert key in stats, f"missing stats key: {key}" - - def test_unknown_path_returns_404(self, running_container): - try: - with urllib.request.urlopen( - f"http://localhost:{HEALTH_PORT}/notfound", timeout=5 - ) as resp: - pytest.fail(f"Expected 404 but got {resp.status}") - except urllib.error.HTTPError as exc: - assert exc.code == 404 - def test_container_is_running(self, running_container): result = _docker("inspect", "--format", "{{.State.Status}}", running_container) assert result.stdout.strip() == "running" From baeda221b92b3f8df3efa992f5d6ae47d60f81ba Mon Sep 17 00:00:00 2001 From: Eric Becker Date: Sat, 2 May 2026 08:05:33 +0000 Subject: [PATCH 4/4] test: collapse single-default-value config cases into parametrize Eight TestLoadConfigDefaults methods asserted "the default value of key X is Y" individually. Collapse them into one parametrized test_default_value, leaving the cases that genuinely diverge from the key->value pattern (eight-preset assertion, derived-set assertions) as their own methods. Refs #37. --- tests/test_config.py | 39 ++++++++++++--------------------------- 1 file changed, 12 insertions(+), 27 deletions(-) diff --git a/tests/test_config.py b/tests/test_config.py index 6a76ab1..c20b2b5 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -97,36 +97,24 @@ def test_drop_channels_none_means_all_channels(self): class TestLoadConfigDefaults: - def test_default_zerohop_enabled_true(self): - cfg = load_config(None) - assert cfg["zerohop_enabled"] is True + @pytest.mark.parametrize("key,expected", [ + ("zerohop_enabled", True), + ("drop_enabled", False), + ("drop_channels", INHERIT_ZEROHOP_CHANNELS), + ("drop_portnums", []), + ("grpc_port", 9000), + ("health_port", 8080), + ("log_format", "text"), + ("_drop_portnums_set", set()), + ]) + def test_default_value(self, key, expected): + assert load_config(None)[key] == expected def test_default_zerohop_channels_eight_presets(self): cfg = load_config(None) assert len(cfg["zerohop_channels"]) == 8 assert "LongFast" in cfg["zerohop_channels"] - def test_default_drop_disabled(self): - cfg = load_config(None) - assert cfg["drop_enabled"] is False - - def test_default_drop_channels_inherits(self): - cfg = load_config(None) - assert cfg["drop_channels"] == INHERIT_ZEROHOP_CHANNELS - - def test_default_drop_portnums_empty(self): - cfg = load_config(None) - assert cfg["drop_portnums"] == [] - - def test_default_grpc_port(self): - assert load_config(None)["grpc_port"] == 9000 - - def test_default_health_port(self): - assert load_config(None)["health_port"] == 8080 - - def test_default_log_format_is_text(self): - assert load_config(None)["log_format"] == "text" - def test_precomputed_zerohop_set_matches_list(self): cfg = load_config(None) assert cfg["_zerohop_channels_set"] == set(cfg["zerohop_channels"]) @@ -135,9 +123,6 @@ def test_precomputed_drop_set_inherits_zerohop_set(self): cfg = load_config(None) assert cfg["_drop_channels_set"] == cfg["_zerohop_channels_set"] - def test_precomputed_drop_portnums_empty_set(self): - assert load_config(None)["_drop_portnums_set"] == set() - class TestLoadConfigFromFile: