From 66ebf008a687aaa51fe5dc5ac91b79053e50976c Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 21:30:05 +0000 Subject: [PATCH 1/2] fix(cot): switch emitter to TCP for OpenTAK Server compatibility MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit OpenTAK Server does not accept UDP CoT — only TCP on its streaming port (OTS_TCP_STREAMING_PORT, default 8088). The emitter was sending UDP unicast which arrived at the host but was never received by OTS. - CotEmitter now connects via TCP when COT_TAKSERVER_HOST is set, with the same reconnect/backoff logic as CotReceiver - Falls back to UDP multicast when no server host is configured (LAN mode) - Extracts _msg_to_cot() helper shared by both TCP and UDP paths - Adds VERTEX- UID filter in CotReceiver to drop echoed-back CoT and prevent feedback loops when OTS broadcasts to all connected clients Co-Authored-By: Claude Sonnet 4.6 Claude-Session: https://claude.ai/code/session_01PswxT6xcRYioF1ZfgszgRi --- poller/pollers/cot_emitter.py | 136 ++++++++++++++++++++++----------- poller/pollers/cot_receiver.py | 4 + 2 files changed, 96 insertions(+), 44 deletions(-) diff --git a/poller/pollers/cot_emitter.py b/poller/pollers/cot_emitter.py index 76c587a..97942d6 100644 --- a/poller/pollers/cot_emitter.py +++ b/poller/pollers/cot_emitter.py @@ -1,12 +1,15 @@ """ -TAK/CoT (Cursor-on-Target) UDP emitter. +TAK/CoT (Cursor-on-Target) emitter. Subscribes to Redis entity_update and annotation_update events and broadcasts: -- Entity positions as CoT datagrams to ATAK/WinTAK clients +- Entity positions as CoT events to ATAK/WinTAK clients - Vertex annotations as CoT map markers (b-m-p-s-m) to openTAK -Supports both UDP multicast (default 239.2.3.1:6969) and unicast to a -dedicated TAK server. Enable via COT_ENABLED=true in .env. +When COT_TAKSERVER_HOST is set, connects via TCP (required by OpenTAK Server +and most modern TAK servers). Falls back to UDP multicast (239.2.3.1:6969) +when no server host is configured. + +Enable via COT_ENABLED=true in .env. """ import asyncio @@ -19,6 +22,7 @@ from bus import get_bus from config import settings +from security import validate_safe_host from .base import BasePoller logger = logging.getLogger(__name__) @@ -64,7 +68,7 @@ def _build_cot(entity: dict[str, Any]) -> str | None: # Fix UID collision bug (prefer entity_id or id) entity_id = entity.get("entity_id") or entity.get("id") or "unknown" raw_uid = f"VERTEX-{entity_id}" - + cot_type = _COT_TYPES.get(entity.get("entity_type", ""), _COT_DEFAULT) raw_callsign = ( entity.get("callsign") @@ -191,20 +195,34 @@ def _build_annotation_cot(ann: dict[str, Any]) -> str | None: ) -def _make_socket() -> tuple[socket.socket, tuple[str, int]]: - if settings.cot_takserver_host: - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - addr = (settings.cot_takserver_host, settings.cot_takserver_port) +def _msg_to_cot(msg: dict) -> str | None: + """Convert a Redis pub/sub message to a CoT XML string, or None to skip.""" + if msg["type"] != "message": + return None + try: + payload = json.loads(msg["data"]) + except (json.JSONDecodeError, TypeError): + return None + + if msg["channel"] == "civic:updates": + if payload.get("type") != "entity_update": + return None + return _build_cot(payload.get("data", {})) else: - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) - sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32) - addr = (settings.cot_multicast_addr, settings.cot_multicast_port) - sock.setblocking(False) - return sock, addr + # annotation_update — skip TAK-sourced annotations to avoid feedback loops + if payload.get("source") == "tak": + return None + if payload.get("action") == "delete": + return None + return _build_annotation_cot(payload) class CotEmitter(BasePoller): - """Listens on Redis pub/sub and emits CoT datagrams for entity updates and annotations.""" + """Listens on Redis pub/sub and emits CoT events for entity updates and annotations. + + Uses TCP when COT_TAKSERVER_HOST is set (required by OpenTAK Server and most + modern TAK servers). Falls back to UDP multicast for LAN-only deployments. + """ name = "cot_emitter" interval = 0 # event-driven @@ -214,47 +232,77 @@ async def run(self) -> None: logger.info("[cot] CoT output disabled (COT_ENABLED not set)") return - dest = ( - f"{settings.cot_takserver_host}:{settings.cot_takserver_port}" - if settings.cot_takserver_host - else f"multicast {settings.cot_multicast_addr}:{settings.cot_multicast_port}" - ) - logger.info("[cot] Starting CoT emitter → %s", dest) + if settings.cot_takserver_host: + await self._run_tcp(settings.cot_takserver_host, settings.cot_takserver_port) + else: + await self._run_udp() - r = await get_bus() - sock, addr = _make_socket() + async def _run_tcp(self, host: str, port: int) -> None: + logger.info("[cot] Starting CoT emitter (TCP) → %s:%d", host, port) + delay = 2.0 + failures = 0 + + while True: + try: + await validate_safe_host(host) + reader, writer = await asyncio.open_connection(host, port) + failures = 0 + delay = 2.0 + logger.info("[cot] TCP connected to TAK server %s:%d", host, port) + + r = await get_bus() + try: + async with r.pubsub() as ps: + await ps.subscribe("civic:updates", "annotation_update") + async for msg in ps.listen(): + xml = _msg_to_cot(msg) + if xml: + try: + writer.write(xml.encode()) + await writer.drain() + except (ConnectionResetError, BrokenPipeError, OSError) as exc: + logger.warning("[cot] TCP send failed: %s — reconnecting", exc) + break + finally: + writer.close() + try: + await writer.wait_closed() + except Exception: + pass + + except (ConnectionRefusedError, OSError) as exc: + failures += 1 + logger.warning( + "[cot] TCP connect failed (%d): %s — retry in %.0fs", failures, exc, delay + ) + except Exception as exc: + failures += 1 + logger.exception("[cot] Unexpected error (%d) — retry in %.0fs", failures, delay) + + await asyncio.sleep(delay) + delay = min(delay * 2, 60.0) + + async def _run_udp(self) -> None: + addr = (settings.cot_multicast_addr, settings.cot_multicast_port) + logger.info("[cot] Starting CoT emitter (UDP multicast) → %s:%d", *addr) + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) + sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32) + sock.setblocking(False) loop = asyncio.get_running_loop() async def _send(data: bytes) -> None: try: await loop.sock_sendto(sock, data, addr) except Exception as exc: - logger.debug("[cot] Send failed: %s", exc) + logger.debug("[cot] UDP send failed: %s", exc) + r = await get_bus() try: async with r.pubsub() as ps: await ps.subscribe("civic:updates", "annotation_update") async for msg in ps.listen(): - if msg["type"] != "message": - continue - try: - payload = json.loads(msg["data"]) - except (json.JSONDecodeError, TypeError): - continue - - if msg["channel"] == "civic:updates": - if payload.get("type") != "entity_update": - continue - xml = _build_cot(payload.get("data", {})) - else: - # annotation_update channel - # Skip annotations that originated from TAK to avoid feedback loops. - if payload.get("source") == "tak": - continue - if payload.get("action") == "delete": - continue - xml = _build_annotation_cot(payload) - + xml = _msg_to_cot(msg) if xml: await _send(xml.encode()) finally: diff --git a/poller/pollers/cot_receiver.py b/poller/pollers/cot_receiver.py index 2258474..80a8e57 100644 --- a/poller/pollers/cot_receiver.py +++ b/poller/pollers/cot_receiver.py @@ -42,6 +42,10 @@ def _parse_cot(xml_bytes: bytes) -> dict[str, Any] | None: uid = root.get("uid", "") cot_type = root.get("type", "") + # Skip CoT that Vertex itself emitted and OTS echoed back. + if uid.startswith("VERTEX-"): + return None + point = root.find("point") if point is None: return None From 68131148d58d3ad15295af7547029633639fa4fb Mon Sep 17 00:00:00 2001 From: Claude Date: Mon, 22 Jun 2026 21:34:17 +0000 Subject: [PATCH 2/2] refactor(cot): consolidate receiver host/port into COT_TAKSERVER_HOST/PORT Both emitter and receiver connect to the same TAK server endpoint, so COT_RECEIVE_HOST and COT_RECEIVE_PORT were redundant. The receiver now uses COT_TAKSERVER_HOST and COT_TAKSERVER_PORT, and COT_RECEIVE_ENABLED remains as the simple boolean gate for the receive direction. Co-Authored-By: Claude Sonnet 4.6 Claude-Session: https://claude.ai/code/session_01PswxT6xcRYioF1ZfgszgRi --- .env.example | 13 +++++-------- poller/config.py | 12 ++++-------- poller/pollers/cot_receiver.py | 14 +++++++------- 3 files changed, 16 insertions(+), 23 deletions(-) diff --git a/.env.example b/.env.example index ddf58c5..5989483 100644 --- a/.env.example +++ b/.env.example @@ -131,22 +131,19 @@ TRAFFIC_FLOW_CORRIDORS=I-5,99W,Pacific Highway # ── 4. TAK / CoT (BIDIRECTIONAL) ───────────────────────────────────────────── -# Outbound — broadcast Vertex entity positions and map annotations to ATAK/WinTAK -# via UDP multicast (default) or directly to a TAK server via unicast. +# TAK server host and port — shared by both emitter and receiver. +# Standard TAK Server: port 8087 | OpenTAK Server (OTS): port 8088 +# Leave COT_TAKSERVER_HOST blank to use UDP multicast instead of a TAK server. COT_ENABLED=false COT_MULTICAST_ADDR=239.2.3.1 COT_MULTICAST_PORT=6969 COT_STALE_SECONDS=60 -# Optional unicast to a TAK server — overrides multicast when set COT_TAKSERVER_HOST= COT_TAKSERVER_PORT=8087 -# Inbound — receive CoT from an openTAK / TAK Server via TCP streaming. -# Field operator positions appear as tak_client entities on the Vertex map. -# TAK map markers are ingested as Vertex annotations. +# Enable receive direction (TAK server → Vertex). +# Field operator positions appear as tak_client entities; TAK markers as annotations. COT_RECEIVE_ENABLED=false -COT_RECEIVE_HOST= -COT_RECEIVE_PORT=8087 # ── 5. FRONTEND BUILD ───────────────────────────────────────────────────────── diff --git a/poller/config.py b/poller/config.py index 6484122..37588a2 100644 --- a/poller/config.py +++ b/poller/config.py @@ -158,22 +158,18 @@ def regions(self) -> list[RegionConfig]: adsb_airlines_db_path: str = "/data/airlines.dat" adsb_navaids_db_path: str = "/data/navaids.csv" - # TAK/CoT output — set COT_ENABLED=true to broadcast entity positions - # to ATAK/WinTAK clients via UDP multicast or a dedicated TAK server. + # TAK/CoT — COT_TAKSERVER_HOST/PORT is shared by both emitter and receiver. + # Standard TAK Server: port 8087. OpenTAK Server (OTS): port 8088. + # Leave host blank to use UDP multicast instead of a TAK server. cot_enabled: bool = False cot_multicast_addr: str = "239.2.3.1" cot_multicast_port: int = 6969 cot_stale_seconds: int = 60 - # Optional unicast to a TAK server (overrides multicast when set) cot_takserver_host: str = "" cot_takserver_port: int = 8087 - # TAK/CoT ingest — set COT_RECEIVE_ENABLED=true to receive CoT from openTAK. - # Connects to openTAK via TCP streaming; ingests field operator positions as - # tak_client entities and TAK map markers as Vertex annotations. + # Enable receive direction (OTS → Vertex). Uses cot_takserver_host/port. cot_receive_enabled: bool = False - cot_receive_host: str = "" - cot_receive_port: int = 8087 # Anomaly detection — statistical baseline monitoring anomaly_enabled: bool = True diff --git a/poller/pollers/cot_receiver.py b/poller/pollers/cot_receiver.py index 80a8e57..a302526 100644 --- a/poller/pollers/cot_receiver.py +++ b/poller/pollers/cot_receiver.py @@ -10,7 +10,7 @@ CoT messages arriving from TAK are tagged with their original TAK UID so the annotation bridge can skip re-broadcasting them back to TAK (no feedback loop). -Enable via COT_RECEIVE_ENABLED=true, COT_RECEIVE_HOST= in .env. +Enable via COT_RECEIVE_ENABLED=true, COT_TAKSERVER_HOST= in .env. """ import asyncio @@ -182,14 +182,14 @@ async def run(self) -> None: if not settings.cot_receive_enabled: logger.info("[cot_rx] CoT receive disabled (COT_RECEIVE_ENABLED not set)") return - if not settings.cot_receive_host: - logger.warning("[cot_rx] COT_RECEIVE_HOST not set — receiver disabled") + if not settings.cot_takserver_host: + logger.warning("[cot_rx] COT_TAKSERVER_HOST not set — receiver disabled") return logger.info( "[cot_rx] Connecting to openTAK at %s:%d", - settings.cot_receive_host, - settings.cot_receive_port, + settings.cot_takserver_host, + settings.cot_takserver_port, ) delay = 2.0 @@ -197,9 +197,9 @@ async def run(self) -> None: while True: try: - await validate_safe_host(settings.cot_receive_host) + await validate_safe_host(settings.cot_takserver_host) reader, writer = await asyncio.open_connection( - settings.cot_receive_host, settings.cot_receive_port + settings.cot_takserver_host, settings.cot_takserver_port ) _consecutive_failures = 0 delay = 2.0