Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 12 additions & 27 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,36 +97,24 @@ def test_drop_channels_none_means_all_channels(self):

class TestLoadConfigDefaults:

def test_default_zerohop_enabled_true(self):
cfg = load_config(None)
assert cfg["zerohop_enabled"] is True
@pytest.mark.parametrize("key,expected", [
("zerohop_enabled", True),
("drop_enabled", False),
("drop_channels", INHERIT_ZEROHOP_CHANNELS),
("drop_portnums", []),
("grpc_port", 9000),
("health_port", 8080),
("log_format", "text"),
("_drop_portnums_set", set()),
])
def test_default_value(self, key, expected):
assert load_config(None)[key] == expected

def test_default_zerohop_channels_eight_presets(self):
cfg = load_config(None)
assert len(cfg["zerohop_channels"]) == 8
assert "LongFast" in cfg["zerohop_channels"]

def test_default_drop_disabled(self):
cfg = load_config(None)
assert cfg["drop_enabled"] is False

def test_default_drop_channels_inherits(self):
cfg = load_config(None)
assert cfg["drop_channels"] == INHERIT_ZEROHOP_CHANNELS

def test_default_drop_portnums_empty(self):
cfg = load_config(None)
assert cfg["drop_portnums"] == []

def test_default_grpc_port(self):
assert load_config(None)["grpc_port"] == 9000

def test_default_health_port(self):
assert load_config(None)["health_port"] == 8080

def test_default_log_format_is_text(self):
assert load_config(None)["log_format"] == "text"

def test_precomputed_zerohop_set_matches_list(self):
cfg = load_config(None)
assert cfg["_zerohop_channels_set"] == set(cfg["zerohop_channels"])
Expand All @@ -135,9 +123,6 @@ def test_precomputed_drop_set_inherits_zerohop_set(self):
cfg = load_config(None)
assert cfg["_drop_channels_set"] == cfg["_zerohop_channels_set"]

def test_precomputed_drop_portnums_empty_set(self):
assert load_config(None)["_drop_portnums_set"] == set()


class TestLoadConfigFromFile:

Expand Down
30 changes: 4 additions & 26 deletions tests/test_container_smoke.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
What is tested:
- The Dockerfile builds without error
- The container starts and the health endpoint returns HTTP 200
- The /health response body is valid JSON with {"status": "ok"}
- Graceful shutdown: the container stops cleanly within a timeout
- The container runs as a non-root user

Health-response shape (JSON validity, stats keys, 404 on unknown paths) is
covered by test_health.py without a Docker rebuild.
"""

import json
import subprocess
import time
import urllib.error
import urllib.request

import pytest
Expand Down Expand Up @@ -103,28 +103,6 @@ def test_health_returns_200(self, running_container):
status, _ = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health")
assert status == 200

def test_health_body_is_valid_json(self, running_container):
_, body = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health")
data = json.loads(body)
assert data["status"] == "ok"

def test_health_body_has_stats(self, running_container):
_, body = _wait_for_health(f"http://localhost:{HEALTH_PORT}/health")
data = json.loads(body)
stats = data["stats"]
for key in ("zerohop", "passthru", "noop", "dropped",
"skipped", "errors", "total"):
assert key in stats, f"missing stats key: {key}"

def test_unknown_path_returns_404(self, running_container):
try:
with urllib.request.urlopen(
f"http://localhost:{HEALTH_PORT}/notfound", timeout=5
) as resp:
pytest.fail(f"Expected 404 but got {resp.status}")
except urllib.error.HTTPError as exc:
assert exc.code == 404

def test_container_is_running(self, running_container):
result = _docker("inspect", "--format", "{{.State.Status}}", running_container)
assert result.stdout.strip() == "running"
Expand Down
3 changes: 2 additions & 1 deletion tests/test_portnum.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def meshtastic_pb2():

def _build_encrypted_envelope(mesh_pb2, mqtt_pb2, portnum, payload_bytes,
packet_id=0xAABBCCDD, from_node=0x12345678,
channel_name="LongFast"):
channel_name="LongFast", hop_limit=0):
"""Construct a ServiceEnvelope whose inner MeshPacket has the named
portnum encrypted under the default Meshtastic key."""
data = mesh_pb2.Data()
Expand All @@ -104,6 +104,7 @@ def _build_encrypted_envelope(mesh_pb2, mqtt_pb2, portnum, payload_bytes,
pkt.id = packet_id
setattr(pkt, pkt.DESCRIPTOR.fields_by_name["from"].name, from_node)
pkt.encrypted = ciphertext
pkt.hop_limit = hop_limit

env = mqtt_pb2.ServiceEnvelope()
env.packet.CopyFrom(pkt)
Expand Down
91 changes: 69 additions & 22 deletions tests/test_zerohop.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
ACTION_DROP,
ACTION_MODIFY,
ACTION_PASSTHRU,
AntifloodStats,
_fmt_node,
_peek_meta,
parse_meshtastic_topic,
process_message,
zerohop_json,
)
from floodgate.zerohop import stats as packet_stats

# Directory of real-world Meshtastic payloads captured from gateways.
# Each JSON file is the raw form exactly as published on /json/ topics; each
Expand Down Expand Up @@ -749,36 +749,83 @@ def test_real_world_payload_smoke(self, payload_path, caplog):
"zerohop", "noop", "passthru", "warn", "dropped",
)

def test_synthetic_envelope_zerohop_round_trip(self, caplog):
"""A real encrypted /e/ ServiceEnvelope built with hop_limit=3 flows
through process_message, returns ACTION_MODIFY with hop_limit zeroed,
and the inner Data still decrypts and parses cleanly."""
from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2

from floodgate.decrypt import decrypt as floodgate_decrypt
from tests.test_portnum import _build_encrypted_envelope

packet_id = 0xA1B2C3D4
from_node = 0x12345678
envelope_bytes = _build_encrypted_envelope(
mesh_pb2, mqtt_pb2,
portnum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
payload_bytes=b"hello",
packet_id=packet_id,
from_node=from_node,
channel_name="LongFast",
hop_limit=3,
)

config = _make_config(zerohop_channels=["LongFast"])
with caplog.at_level(logging.INFO, logger="floodgate.zerohop"):
result = process_message(
"msh/US/2/e/LongFast/!00000000", envelope_bytes, config,
)

assert result.action == ACTION_MODIFY
assert result.payload is not None

modified = mqtt_pb2.ServiceEnvelope()
modified.ParseFromString(result.payload)
assert modified.packet.hop_limit == 0
assert modified.packet.id == packet_id
assert modified.channel_id == "LongFast"

plaintext = floodgate_decrypt(
modified.packet.encrypted, packet_id=packet_id, from_node=from_node,
)
inner = mesh_pb2.Data()
inner.ParseFromString(plaintext)
assert inner.portnum == portnums_pb2.PortNum.TEXT_MESSAGE_APP
assert inner.payload == b"hello"

outcome_records = [r for r in caplog.records if hasattr(r, "outcome")]
assert len(outcome_records) == 1
assert getattr(outcome_records[0], "outcome") == "zerohop"


# ---------------------------------------------------------------------------
# AntifloodStats — counter mechanics
# ---------------------------------------------------------------------------

class TestAntifloodStats:

def setup_method(self):
# Fresh counters per test — both rolling and lifetime
with packet_stats._lock:
for k in ("zerohop", "passthru", "noop", "dropped", "skipped", "errors"):
setattr(packet_stats, k, 0)
packet_stats._lifetime[k] = 0
@pytest.fixture
def stats(self, monkeypatch):
fresh = AntifloodStats()
monkeypatch.setattr("floodgate.zerohop.stats", fresh)
return fresh

def test_inc_increments_both_views(self):
packet_stats.inc("dropped")
assert packet_stats.dropped == 1
assert packet_stats.snapshot()["dropped"] == 1
def test_inc_increments_both_views(self, stats):
stats.inc("dropped")
assert stats.dropped == 1
assert stats.snapshot()["dropped"] == 1

def test_reset_zeros_rolling_keeps_lifetime(self):
def test_reset_zeros_rolling_keeps_lifetime(self, stats):
for _ in range(3):
packet_stats.inc("dropped")
snap = packet_stats.reset()
stats.inc("dropped")
snap = stats.reset()
assert snap["dropped"] == 3
assert packet_stats.dropped == 0
assert packet_stats.snapshot()["dropped"] == 3

def test_total_includes_dropped(self):
packet_stats.inc("zerohop")
packet_stats.inc("dropped")
packet_stats.inc("dropped")
snap = packet_stats.snapshot()
assert stats.dropped == 0
assert stats.snapshot()["dropped"] == 3

def test_total_includes_dropped(self, stats):
stats.inc("zerohop")
stats.inc("dropped")
stats.inc("dropped")
snap = stats.snapshot()
assert snap["total"] == 3
Loading