diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c1abb7..99551ee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,6 +57,7 @@ jobs: run: | pytest tests/ -q \ --ignore=tests/test_container_smoke.py \ + --ignore=tests/integration \ --cov=src/floodgate \ --cov-report=term-missing \ --cov-report=xml @@ -91,6 +92,31 @@ jobs: - name: Run container smoke tests run: pytest tests/test_container_smoke.py -m smoke -v + # --------------------------------------------------------------------------- + # Integration test — full Docker Compose stack: emqx + floodgate + exhook-init + test-driver + # End-to-end validation of drop / zerohop / passthru / noop / custom-key + # passthru via subscriber capture and /health stats. + # --------------------------------------------------------------------------- + integration: + name: Integration test (compose stack) + runs-on: ubuntu-latest + needs: smoke + timeout-minutes: 15 + + steps: + - uses: actions/checkout@v4 + + - name: Run integration harness + run: ./scripts/run-integration.sh + + - name: Dump compose logs on failure + if: failure() + run: docker compose -f docker-compose.test.yaml logs --no-color || true + + - name: Always clean up + if: always() + run: docker compose -f docker-compose.test.yaml down -v --remove-orphans || true + # --------------------------------------------------------------------------- # Manifest validation — verify k8s YAML is valid against the k8s schema # --------------------------------------------------------------------------- diff --git a/CLAUDE.md b/CLAUDE.md index e18422b..501edd7 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -24,6 +24,9 @@ Gateway → EMQX → [ExHook gRPC] → floodgate → drop / modify / passthru | `src/floodgate/health.py` | HTTP health check server on `health_port`. | | `src/floodgate/__main__.py` | CLI entry point. | | `proto/emqx/exhook.proto` | EMQX ExHook interface definition. 
| +| `docker-compose.test.yaml` | Integration test stack (emqx, floodgate, exhook-init, test-driver) on an isolated bridge network. | +| `scripts/run-integration.sh` | Integration harness orchestrator — `--keep` leaves the stack up, `--teardown` removes it. | +| `tests/integration/` | Integration test assets: floodgate config, ExHook init container, test-driver image + cases. | ## Dev Setup @@ -35,6 +38,7 @@ cd floodgate pip install -e ".[dev]" pytest tests/ --ignore=tests/test_container_smoke.py -q # no Docker required pytest tests/ -q # full suite including container smoke test (requires Docker) +./scripts/run-integration.sh # full Docker Compose end-to-end test (requires Docker) ``` Routing-logic tests mock the low-level zerohop functions, so the suite runs diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d6f5fd9..f2460d5 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -15,13 +15,14 @@ All PRs are squash-merged. One PR per feature or fix. ## CI jobs -Every PR and push to `main` runs four jobs in sequence: +Every PR and push to `main` runs five jobs in sequence: | Job | What it checks | |-----|----------------| | **lint** | `ruff` style and import checks | | **unit tests** | Pure Python tests across Python 3.11/3.12/3.13 — no external services needed. CI generates the Meshtastic protobuf stubs before running so the unmocked protobuf payload tests in `tests/payloads/protobuf/` are exercised. Mocked tests in the rest of the suite still run without protobufs (handy for fast local iteration). | | **container smoke** | Builds the Docker image, starts the container, and verifies `/health` returns `200 OK`. Catches Dockerfile bugs and runtime import errors that unit tests cannot. | +| **integration** | Brings up `docker-compose.test.yaml` (EMQX + floodgate + test-driver) and runs `drop` / `zerohop` / `passthru` / `noop` / `custom-key passthru` end-to-end. See "Integration testing" below. 
| | **manifest validation** | Validates `k8s/*.yaml` against the Kubernetes schema with `kubeconform`. | ### Running locally @@ -50,6 +51,29 @@ pytest tests/test_container_smoke.py -m smoke -v ruff check src/ tests/ ``` +### Integration testing + +The integration harness (`scripts/run-integration.sh`) brings up a full Docker Compose stack — EMQX + floodgate + an ExHook auto-registration container + a Python test-driver — on an isolated bridge network and runs end-to-end checks for `drop`, `zerohop`, `passthru`, `noop`, and `custom-key channel passthru`. The test-driver crafts real Meshtastic `ServiceEnvelope` protobufs (using the `meshtastic` Python library, which ships the same protobuf definitions the firmware uses), so each case exercises the exact wire format floodgate sees in production. + +Each case verifies BOTH what the subscriber received (delivered MQTT bytes) AND floodgate's `/health` stats — a behavior change with no stat increment, or a stat increment with no delivery effect, both fail the case. One PASS or FAIL line is printed per case. + +Requirements: `docker` (with the Compose plugin), `bash`, and `curl` (the script polls the readiness endpoints from the host). The `pytest` suite never runs the harness — it's opt-in via the script. + +```bash +# One-shot verification — brings the stack up, runs cases, tears down, exits 0/non-zero. +./scripts/run-integration.sh + +# Ad-hoc poking — leave the stack running after the cases finish. +./scripts/run-integration.sh --keep +# floodgate /health: http://localhost:18089/health +# EMQX dashboard: http://localhost:18083 (admin / public) + +# Tear the stack down (and volumes/network) without running cases. +./scripts/run-integration.sh --teardown +``` + +CI runs the same script in the `integration` job after the `smoke` job passes. A failed case dumps service logs into the workflow output before tearing down. 
+ ## Commit style Conventional commits: `feat:`, `fix:`, `docs:`, `test:`, `chore:` diff --git a/docker-compose.test.yaml b/docker-compose.test.yaml new file mode 100644 index 0000000..c715c6a --- /dev/null +++ b/docker-compose.test.yaml @@ -0,0 +1,78 @@ +# Integration test stack. Brought up by scripts/run-integration.sh. +# All inter-service traffic stays on the floodgate-test-net bridge. +# Only floodgate /health (8080->18089) and EMQX REST (18083) are exposed +# to the host so the runner can poll readiness — MQTT 1883 and gRPC 9000 +# are container-internal only. + +services: + emqx: + image: emqx/emqx:6.1.1 + container_name: floodgate-test-emqx + networks: [floodgate-test-net] + ports: + - "18083:18083" + environment: + EMQX_NAME: "emqx-test" + EMQX_DASHBOARD__DEFAULT_PASSWORD: "public" + healthcheck: + test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"] + interval: 5s + timeout: 10s + retries: 12 + start_period: 10s + + floodgate: + build: + context: . + dockerfile: Dockerfile + image: floodgate-test:ci + container_name: floodgate-test-floodgate + networks: [floodgate-test-net] + ports: + - "18089:8080" + environment: + FLOODGATE_CONFIG: /app/config.yaml + volumes: + - ./tests/integration/config.yaml:/app/config.yaml:ro + healthcheck: + test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')"] + interval: 5s + timeout: 5s + retries: 12 + start_period: 5s + + exhook-init: + build: + context: tests/integration/exhook-init + container_name: floodgate-test-exhook-init + networks: [floodgate-test-net] + depends_on: + emqx: + condition: service_healthy + floodgate: + condition: service_healthy + restart: "no" + + test-driver: + build: + context: tests/integration/test-driver + image: floodgate-test-driver:ci + container_name: floodgate-test-driver + networks: [floodgate-test-net] + depends_on: + emqx: + condition: service_healthy + floodgate: + condition: service_healthy + exhook-init: + condition: 
service_completed_successfully + profiles: ["driver"] + environment: + EMQX_HOST: "emqx" + EMQX_PORT: "1883" + FLOODGATE_HEALTH_URL: "http://floodgate:8080/health" + +networks: + floodgate-test-net: + name: floodgate-test-net + driver: bridge diff --git a/scripts/run-integration.sh b/scripts/run-integration.sh new file mode 100755 index 0000000..56b7292 --- /dev/null +++ b/scripts/run-integration.sh @@ -0,0 +1,105 @@ +#!/usr/bin/env bash +# Local + CI driver for the integration harness. +# +# Modes: +# (default) bring stack up, run all cases, tear stack down, exit 0/non-zero +# --keep bring stack up, run all cases, leave stack running for poking +# --teardown tear the stack down (and volumes/networks); skip running cases + +set -euo pipefail + +COMPOSE_FILE="docker-compose.test.yaml" +HEALTH_URL="http://localhost:18089/health" +EMQX_URL="http://localhost:18083/api/v5/status" + +usage() { + cat < Tearing down integration stack" + docker compose -f "$COMPOSE_FILE" down -v --remove-orphans +} + +if [ "$mode" = "teardown-only" ]; then + teardown + exit 0 +fi + +echo "==> Bringing up integration stack" +docker compose -f "$COMPOSE_FILE" up -d --build + +cleanup_on_error() { + rc=$? 
+ if [ $rc -ne 0 ] && [ "$mode" != "run-and-keep" ]; then + echo "==> Run failed — collecting service logs before teardown" + docker compose -f "$COMPOSE_FILE" logs --no-color --tail=200 || true + teardown + fi + exit $rc +} +trap cleanup_on_error EXIT + +echo "==> Waiting for EMQX REST" +for i in $(seq 1 60); do + if curl -sf --connect-timeout 2 --max-time 5 -o /dev/null "$EMQX_URL"; then + echo " EMQX REST ready after ${i}s"; break + fi + sleep 1 + [ "$i" -eq 60 ] && { echo "EMQX REST never came up" >&2; exit 1; } +done + +echo "==> Waiting for floodgate /health" +for i in $(seq 1 60); do + if curl -sf --connect-timeout 2 --max-time 5 -o /dev/null "$HEALTH_URL"; then + echo " floodgate /health ready after ${i}s"; break + fi + sleep 1 + [ "$i" -eq 60 ] && { echo "floodgate /health never came up" >&2; exit 1; } +done + +echo "==> Running test-driver" +set +e +docker compose -f "$COMPOSE_FILE" run --rm test-driver +rc=$? +set -e + +echo "==> Test-driver exit code: $rc" + +if [ "$mode" = "run-and-keep" ]; then + echo "==> --keep: leaving stack running." + echo " floodgate /health: $HEALTH_URL" + echo " EMQX dashboard: http://localhost:18083 (admin/public)" + echo " Tear down with: $0 --teardown" + trap - EXIT + exit $rc +fi + +trap - EXIT + +if [ $rc -ne 0 ]; then + # Dump service logs BEFORE teardown on the explicit-failure path. Without + # this, the workflow's `if: failure()` log-dump step fires after teardown + # finishes — by then all containers are gone and nothing remains to log. + echo "==> Run failed — dumping service logs before teardown" + docker compose -f "$COMPOSE_FILE" logs --no-color --tail=400 || true +fi +teardown +exit $rc diff --git a/tests/integration/README.md b/tests/integration/README.md new file mode 100644 index 0000000..547a696 --- /dev/null +++ b/tests/integration/README.md @@ -0,0 +1,3 @@ +# Integration test harness + +End-to-end harness using Docker Compose. See `CONTRIBUTING.md` ("Integration testing") for usage. 
Top-level entry point: `scripts/run-integration.sh`. Stack defined in `docker-compose.test.yaml`. diff --git a/tests/integration/config.yaml b/tests/integration/config.yaml new file mode 100644 index 0000000..32ee29c --- /dev/null +++ b/tests/integration/config.yaml @@ -0,0 +1,23 @@ +zerohop_enabled: true +zerohop_channels: + - "LongTurbo" + - "LongFast" + - "LongModerate" + - "MediumFast" + - "MediumSlow" + - "ShortFast" + - "ShortSlow" + - "ShortTurbo" + +drop_enabled: true +drop_channels: "zerohop_channels" +drop_portnums: + - "RANGE_TEST_APP" + +grpc_port: 9000 +health_port: 8080 +topic_filter: "msh/#" +stats_interval_s: 10 +log_level: "INFO" +log_format: "text" +stats_log: true diff --git a/tests/integration/exhook-init/Dockerfile b/tests/integration/exhook-init/Dockerfile new file mode 100644 index 0000000..cbbebda --- /dev/null +++ b/tests/integration/exhook-init/Dockerfile @@ -0,0 +1,8 @@ +FROM alpine:3.20 + +RUN apk add --no-cache curl jq bash + +COPY register.sh /register.sh +RUN chmod +x /register.sh + +ENTRYPOINT ["/register.sh"] diff --git a/tests/integration/exhook-init/register.sh b/tests/integration/exhook-init/register.sh new file mode 100755 index 0000000..687488f --- /dev/null +++ b/tests/integration/exhook-init/register.sh @@ -0,0 +1,73 @@ +#!/usr/bin/env bash +# Register floodgate as an ExHook in EMQX once the broker REST API is up. +# Idempotent: if a hook named "floodgate" already exists, PUT to update it +# instead of POSTing a new one. + +set -euo pipefail + +EMQX_URL="${EMQX_URL:-http://emqx:18083}" +EMQX_USER="${EMQX_USER:-admin}" +EMQX_PASS="${EMQX_PASS:-public}" +HOOK_URL="${HOOK_URL:-http://floodgate:9000}" +HOOK_NAME="floodgate" + +echo "exhook-init: waiting for EMQX REST at ${EMQX_URL} ..." 
+for i in $(seq 1 60); do + if curl -sf --connect-timeout 2 --max-time 5 -o /dev/null "${EMQX_URL}/api/v5/status"; then + echo "exhook-init: EMQX REST is up after ${i}s" + break + fi + sleep 1 + [ "$i" -eq 60 ] && { echo "exhook-init: EMQX REST never came up" >&2; exit 1; } +done + +echo "exhook-init: logging in" +TOKEN=$(curl -sf --connect-timeout 2 --max-time 5 -X POST "${EMQX_URL}/api/v5/login" \ + -H 'Content-Type: application/json' \ + -d "{\"username\":\"${EMQX_USER}\",\"password\":\"${EMQX_PASS}\"}" \ + | jq -r .token) + +if [ -z "${TOKEN}" ] || [ "${TOKEN}" = "null" ]; then + echo "exhook-init: failed to obtain EMQX REST token" >&2 + exit 1 +fi + +BODY=$(cat </dev/null +else + echo "exhook-init: hook '${HOOK_NAME}' does not exist — creating" + curl -sf --connect-timeout 2 --max-time 5 -X POST "${EMQX_URL}/api/v5/exhooks" \ + -H "Authorization: Bearer ${TOKEN}" \ + -H 'Content-Type: application/json' \ + -d "${BODY}" >/dev/null +fi + +echo "exhook-init: verifying registration" +STATUS=$(curl -sf --connect-timeout 2 --max-time 5 "${EMQX_URL}/api/v5/exhooks/${HOOK_NAME}" \ + -H "Authorization: Bearer ${TOKEN}" | jq -r '.status // "unknown"') +echo "exhook-init: hook '${HOOK_NAME}' status=${STATUS}" + +if [ "${STATUS}" = "connected" ] || [ "${STATUS}" = "running" ]; then + echo "exhook-init: success" + exit 0 +fi + +echo "exhook-init: hook registered but status=${STATUS}; floodgate may still be starting." +echo "exhook-init: init container exits 0; floodgate->EMQX gRPC will recover on its own." 
+exit 0 diff --git a/tests/integration/test-driver/Dockerfile b/tests/integration/test-driver/Dockerfile new file mode 100644 index 0000000..ae3968b --- /dev/null +++ b/tests/integration/test-driver/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.13-slim + +WORKDIR /test-driver + +RUN pip install --no-cache-dir \ + "paho-mqtt>=2.0,<3" \ + "meshtastic>=2.5,<3" \ + "cryptography>=42" \ + "requests>=2.32" \ + "pyyaml>=6" + +COPY run.py /test-driver/run.py + +ENTRYPOINT ["python", "/test-driver/run.py"] diff --git a/tests/integration/test-driver/run.py b/tests/integration/test-driver/run.py new file mode 100644 index 0000000..f7d2e8a --- /dev/null +++ b/tests/integration/test-driver/run.py @@ -0,0 +1,396 @@ +"""Integration test driver. + +Runs each integration case sequentially. For every case: + 1. Crafts a Meshtastic ServiceEnvelope. + 2. Publishes it via MQTT to a topic whose channel name fits the case. + 3. Waits briefly for floodgate to process and EMQX to deliver. + 4. Asserts on (a) what the bound subscriber received and (b) floodgate's + /health stats, then prints one 'PASS: {name}' or 'FAIL: {name} — {detail}' line. + +Exits 0 iff every case passed. +""" + +from __future__ import annotations + +import os +import sys +import threading +import time +from dataclasses import dataclass + +import paho.mqtt.client as mqtt +import requests +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from meshtastic import ( # noqa: F401 (portnums re-exported for cases) + mesh_pb2, + mqtt_pb2, + portnums_pb2, +) + +EMQX_HOST = os.environ.get("EMQX_HOST", "emqx") +EMQX_PORT = int(os.environ.get("EMQX_PORT", "1883")) +FLOODGATE_HEALTH = os.environ.get("FLOODGATE_HEALTH_URL", "http://floodgate:8080/health") + +# 16-byte AES-128 key derived from the default Meshtastic PSK ("AQ=="). 
DEFAULT_KEY = bytes.fromhex("d4f1bb3a20290759f0bcffabcf4e6901")
# Arbitrary 16-byte key that floodgate does NOT have — used to simulate a
# custom-keyed channel where floodgate cannot decrypt the inner Data.
CUSTOM_KEY = bytes.fromhex("00112233445566778899aabbccddeeff")

# Settle window between publish and assertion. EMQX + floodgate gRPC + EMQX
# delivery is normally under 100ms on a local bridge; 1s gives plenty of
# margin without making the suite slow.
SETTLE_SECONDS = 1.0

# Upper bound on how long Subscriber.start() waits for the broker's SUBACK.
SUBSCRIBE_TIMEOUT_SECONDS = 10.0
# Upper bound on how long Publisher.publish() waits for the QoS-1 PUBACK.
PUBLISH_TIMEOUT_SECONDS = 5.0

# Sender node id stamped on every packet the cases craft.
TEST_FROM_NODE = 0xDEADBEEF


# ---------------------------------------------------------------------------
# Subscriber capture
# ---------------------------------------------------------------------------

@dataclass
class Captured:
    """One MQTT message recorded by the subscriber (topic + raw payload)."""

    topic: str
    payload: bytes


class Subscriber:
    """Background paho-mqtt subscriber that records every message on msh/#."""

    def __init__(self, host: str, port: int):
        self._host = host
        self._port = port
        self._messages: list[Captured] = []
        self._lock = threading.Lock()
        # Set once the broker has acknowledged the msh/# subscription.
        self._subscribed = threading.Event()
        self._client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            client_id="floodgate-test-driver-sub",
        )
        self._client.on_connect = self._on_connect
        self._client.on_subscribe = self._on_subscribe
        self._client.on_message = self._on_message

    def _on_connect(self, client, _userdata, _flags, reason_code, _properties):
        # Subscribing here (instead of once in start()) re-establishes the
        # subscription after any reconnect, so later cases keep capturing.
        if not reason_code.is_failure:
            client.subscribe("msh/#", qos=0)

    def _on_subscribe(self, _client, _userdata, _mid, reason_codes, _properties):
        if not any(code.is_failure for code in reason_codes):
            self._subscribed.set()

    def _on_message(self, _client, _userdata, msg):
        # Runs on paho's network thread; the lock guards against snapshot().
        with self._lock:
            self._messages.append(Captured(topic=msg.topic, payload=bytes(msg.payload)))

    def start(self) -> None:
        """Connect, start the network loop and block until msh/# is subscribed.

        Raises:
            RuntimeError: the broker did not acknowledge the subscription
                within SUBSCRIBE_TIMEOUT_SECONDS.
        """
        self._client.connect(self._host, self._port, keepalive=30)
        self._client.loop_start()
        # Wait for the SUBACK rather than sleeping a fixed interval: publishing
        # before the subscription exists would make the first case miss its
        # own packet.
        if not self._subscribed.wait(timeout=SUBSCRIBE_TIMEOUT_SECONDS):
            raise RuntimeError(
                f"subscriber: msh/# not acknowledged within {SUBSCRIBE_TIMEOUT_SECONDS}s"
            )

    def stop(self) -> None:
        """Stop the network loop and disconnect from the broker."""
        self._client.loop_stop()
        self._client.disconnect()

    def snapshot(self) -> list[Captured]:
        """Return a copy of every message captured so far."""
        with self._lock:
            return list(self._messages)


# ---------------------------------------------------------------------------
# Publisher
# ---------------------------------------------------------------------------

class Publisher:
    """paho-mqtt publisher that connects on construction and publishes at QoS 1."""

    def __init__(self, host: str, port: int):
        self._client = mqtt.Client(
            mqtt.CallbackAPIVersion.VERSION2,
            client_id="floodgate-test-driver-pub",
        )
        self._client.connect(host, port, keepalive=30)
        self._client.loop_start()

    def publish(self, topic: str, payload: bytes) -> None:
        """Publish payload to topic and wait for the broker to acknowledge it.

        Raises:
            RuntimeError: no acknowledgement arrived within
                PUBLISH_TIMEOUT_SECONDS. Without this check a packet that never
                reached the broker is indistinguishable from one floodgate
                dropped.
        """
        info = self._client.publish(topic, payload=payload, qos=1)
        info.wait_for_publish(timeout=PUBLISH_TIMEOUT_SECONDS)
        if not info.is_published():
            raise RuntimeError(
                f"publish to {topic} not acknowledged within {PUBLISH_TIMEOUT_SECONDS}s"
            )

    def close(self) -> None:
        """Stop the network loop and disconnect from the broker."""
        self._client.loop_stop()
        self._client.disconnect()


# ---------------------------------------------------------------------------
# Crypto / packet builders
# ---------------------------------------------------------------------------

def _build_nonce(packet_id: int, from_node: int) -> bytes:
    """Return the 16-byte CTR nonce: packet id (8 bytes LE), node (4 bytes LE), 4 zero bytes."""
    return (
        (packet_id & 0xFFFFFFFFFFFFFFFF).to_bytes(8, "little")
        + (from_node & 0xFFFFFFFF).to_bytes(4, "little")
        + b"\x00\x00\x00\x00"
    )


def encrypt(plaintext: bytes, key: bytes, packet_id: int, from_node: int) -> bytes:
    """AES-CTR encrypt plaintext under key, with the nonce built from packet id and sender."""
    nonce = _build_nonce(packet_id, from_node)
    encryptor = Cipher(algorithms.AES(key), modes.CTR(nonce)).encryptor()
    # finalize() yields no extra bytes in CTR mode; calling it closes the
    # cipher context properly.
    return encryptor.update(plaintext) + encryptor.finalize()


def build_envelope(
    *,
    channel: str,
    portnum: int,
    payload: bytes,
    packet_id: int,
    from_node: int,
    to_node: int = 0xFFFFFFFF,
    hop_limit: int = 3,
    hop_start: int = 3,
    key: bytes = DEFAULT_KEY,
) -> bytes:
    """Build a Meshtastic ServiceEnvelope wrapping an encrypted Data message.

    The Data message (portnum + payload) is serialized and encrypted with key,
    placed in MeshPacket.encrypted, and wrapped in a ServiceEnvelope whose
    channel_id is channel. Returns the serialized envelope bytes.
    """
    data = mesh_pb2.Data()
    data.portnum = portnum
    data.payload = payload
    encrypted = encrypt(
        data.SerializeToString(),
        key=key,
        packet_id=packet_id,
        from_node=from_node,
    )

    pkt = mesh_pb2.MeshPacket()
    setattr(pkt, "from", from_node)  # 'from' is a Python keyword
    pkt.to = to_node
    pkt.id = packet_id
    pkt.hop_limit = hop_limit
    pkt.hop_start = hop_start
    pkt.encrypted = encrypted

    env = mqtt_pb2.ServiceEnvelope()
    env.packet.CopyFrom(pkt)
    env.channel_id = channel
    env.gateway_id = "!00000001"
    return env.SerializeToString()


def topic_for(channel: str, *, gateway: str = "!00000001") -> str:
    """Return the MQTT topic the cases publish to for channel and gateway."""
    return f"msh/US/2/e/{channel}/{gateway}"


# ---------------------------------------------------------------------------
# Health-stats reader
# ---------------------------------------------------------------------------

def health_stats() -> dict:
    """Fetch floodgate's /health document and return its "stats" object.

    Raises whatever requests raises on connection failure or a non-2xx status.
    """
    resp = requests.get(FLOODGATE_HEALTH, timeout=5)
    resp.raise_for_status()
    return resp.json()["stats"]


# ---------------------------------------------------------------------------
# Envelope inspection helpers (used by every test case to read the delivered
# bytes back out of the subscriber's capture buffer).
# ---------------------------------------------------------------------------

def _parse_packet(payload: bytes):
    """Return the MeshPacket inside a serialized ServiceEnvelope, or None."""
    try:
        env = mqtt_pb2.ServiceEnvelope()
        env.ParseFromString(payload)
        return env.packet if env.HasField("packet") else None
    except Exception:
        # The subscriber sees everything on msh/#; a payload that does not
        # parse is simply not one of ours.
        return None


def _parse_hop_limit(payload: bytes) -> int | None:
    """Return MeshPacket.hop_limit from a serialized ServiceEnvelope, or None."""
    pkt = _parse_packet(payload)
    return None if pkt is None else pkt.hop_limit


def _packet_id_of(payload: bytes) -> int | None:
    """Return MeshPacket.id from a serialized ServiceEnvelope, or None."""
    pkt = _parse_packet(payload)
    return None if pkt is None else pkt.id


# ---------------------------------------------------------------------------
# Test cases
# ---------------------------------------------------------------------------

@dataclass
class Outcome:
    """Result of one case; renders as a single PASS/FAIL line."""

    name: str
    passed: bool = True
    detail: str = ""

    def line(self) -> str:
        """Return 'PASS: name' or 'FAIL: name', plus ' — detail' when detail is set."""
        prefix = "PASS" if self.passed else "FAIL"
        return f"{prefix}: {self.name}" + (f" — {self.detail}" if self.detail else "")


def _publish_and_collect(
    pub: Publisher,
    sub: Subscriber,
    *,
    channel: str,
    packet_id: int,
    **envelope_fields,
) -> tuple[bytes, list[Captured]]:
    """Publish one crafted envelope, wait out the settle window, collect deliveries.

    Returns the published bytes and every captured message carrying packet_id.
    """
    body = build_envelope(
        channel=channel,
        packet_id=packet_id,
        from_node=TEST_FROM_NODE,
        **envelope_fields,
    )
    pub.publish(topic_for(channel), body)
    time.sleep(SETTLE_SECONDS)
    delivered = [m for m in sub.snapshot() if _packet_id_of(m.payload) == packet_id]
    return body, delivered


def _stat_delta(pre: dict, post: dict, key: str) -> int:
    """Return how much counter key grew between two /health stats snapshots."""
    return post.get(key, 0) - pre.get(key, 0)


def case_zerohop(pub: Publisher, sub: Subscriber) -> Outcome:
    """Text message on LongFast — must arrive with hop_limit rewritten to 0."""
    name = "zerohop"
    pre = health_stats()
    _body, delivered = _publish_and_collect(
        pub,
        sub,
        channel="LongFast",
        packet_id=0xA1A1A1A1,
        portnum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
        payload=b"hello-zerohop",
        hop_limit=3,
        hop_start=3,
    )
    if not delivered:
        return Outcome(name, False, "no packet with our id was delivered")
    hop = _parse_hop_limit(delivered[-1].payload)
    if hop != 0:
        return Outcome(name, False, f"delivered hop_limit={hop}, expected 0")

    if _stat_delta(pre, health_stats(), "zerohop") < 1:
        return Outcome(name, False, "stats.zerohop did not increment")
    return Outcome(name)


def case_drop(pub: Publisher, sub: Subscriber) -> Outcome:
    """RANGE_TEST_APP packet on LongFast — must never reach the subscriber."""
    name = "drop"
    pre = health_stats()
    _body, delivered = _publish_and_collect(
        pub,
        sub,
        channel="LongFast",
        packet_id=0xA2A2A2A2,
        portnum=portnums_pb2.PortNum.RANGE_TEST_APP,
        payload=b"flood",
        hop_limit=3,
    )
    if delivered:
        return Outcome(
            name,
            False,
            f"packet was delivered to subscriber ({len(delivered)} times); "
            "drop should have denied it",
        )

    if _stat_delta(pre, health_stats(), "dropped") < 1:
        return Outcome(name, False, "stats.dropped did not increment")
    return Outcome(name)


def case_passthru(pub: Publisher, sub: Subscriber) -> Outcome:
    """Channel NOT in zerohop_channels — packet must transit unchanged."""
    name = "passthru"
    pre = health_stats()
    body, delivered = _publish_and_collect(
        pub,
        sub,
        channel="PrivateClear",
        packet_id=0xA3A3A3A3,
        portnum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
        payload=b"hello-private",
        hop_limit=3,
        key=DEFAULT_KEY,
    )
    if not delivered:
        return Outcome(name, False, "packet was not delivered to subscriber")
    if delivered[-1].payload != body:
        return Outcome(
            name,
            False,
            "delivered payload was modified; passthru must be byte-identical",
        )
    hop = _parse_hop_limit(delivered[-1].payload)
    if hop != 3:
        return Outcome(name, False, f"hop_limit={hop}, expected 3 (no zerohop on this channel)")

    if _stat_delta(pre, health_stats(), "passthru") < 1:
        return Outcome(name, False, "stats.passthru did not increment")
    return Outcome(name)


def case_noop(pub: Publisher, sub: Subscriber) -> Outcome:
    """Already hop_limit=0 on a zerohop channel — delivered unchanged, counted as noop."""
    name = "noop"
    pre = health_stats()
    body, delivered = _publish_and_collect(
        pub,
        sub,
        channel="LongFast",
        packet_id=0xA4A4A4A4,
        portnum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
        payload=b"already-zero",
        hop_limit=0,
        hop_start=3,
    )
    if not delivered:
        return Outcome(name, False, "packet was not delivered to subscriber")
    if delivered[-1].payload != body:
        return Outcome(name, False, "noop delivered payload should be byte-identical")

    if _stat_delta(pre, health_stats(), "noop") < 1:
        return Outcome(name, False, "stats.noop did not increment")
    return Outcome(name)


def case_custom_key_passthru(pub: Publisher, sub: Subscriber) -> Outcome:
    """Channel NOT in zerohop_channels, encrypted with a key floodgate doesn't have.

    floodgate cannot decrypt the inner Data, so it cannot read the portnum.
    The drop filter must therefore NOT fire even if drop_portnums would
    otherwise match. Because the channel is not in zerohop_channels, the
    packet is delivered byte-identically.
    """
    name = "custom-key-passthru"
    pre = health_stats()
    body, delivered = _publish_and_collect(
        pub,
        sub,
        channel="PrivateNet",
        packet_id=0xA5A5A5A5,
        portnum=portnums_pb2.PortNum.RANGE_TEST_APP,  # would match drop_portnums if readable
        payload=b"opaque",
        hop_limit=3,
        key=CUSTOM_KEY,
    )
    if not delivered:
        return Outcome(name, False, "packet was not delivered to subscriber")
    if delivered[-1].payload != body:
        return Outcome(
            name,
            False,
            "delivered payload was modified; passthru must be byte-identical",
        )

    post = health_stats()
    if _stat_delta(pre, post, "passthru") < 1:
        return Outcome(name, False, "stats.passthru did not increment")
    if _stat_delta(pre, post, "dropped") > 0:
        return Outcome(
            name,
            False,
            "stats.dropped incremented; drop must not fire on unreadable portnum",
        )
    return Outcome(name)


# Execution order of the suite.
_CASES = (
    case_zerohop,
    case_drop,
    case_passthru,
    case_noop,
    case_custom_key_passthru,
)


def _run_case(case, pub: Publisher, sub: Subscriber) -> Outcome:
    """Run one case, turning any unexpected exception into a FAIL outcome."""
    try:
        return case(pub, sub)
    except Exception as exc:
        # Suite boundary: one broken case (e.g. /health unreachable) must not
        # abort the run and hide the results of the others.
        label = case.__name__.removeprefix("case_").replace("_", "-")
        return Outcome(label, False, f"unhandled {type(exc).__name__}: {exc}")


def run_all() -> int:
    """Run every case in order, print one line per case plus a summary.

    Returns 0 when every case passed, 1 otherwise. The MQTT clients are shut
    down on every exit path.
    """
    sub = Subscriber(EMQX_HOST, EMQX_PORT)
    pub: Publisher | None = None
    try:
        sub.start()
        pub = Publisher(EMQX_HOST, EMQX_PORT)
        outcomes: list[Outcome] = []
        for case in _CASES:
            outcome = _run_case(case, pub, sub)
            print(outcome.line(), flush=True)
            outcomes.append(outcome)
        failed = [o for o in outcomes if not o.passed]
        print(f"\n{len(outcomes) - len(failed)}/{len(outcomes)} cases passed", flush=True)
        return 1 if failed else 0
    finally:
        if pub is not None:
            pub.close()
        sub.stop()


if __name__ == "__main__":
    sys.exit(run_all())