Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -131,22 +131,19 @@ TRAFFIC_FLOW_CORRIDORS=I-5,99W,Pacific Highway

# ── 4. TAK / CoT (BIDIRECTIONAL) ─────────────────────────────────────────────

# Outbound — broadcast Vertex entity positions and map annotations to ATAK/WinTAK
# via UDP multicast (default) or directly to a TAK server via unicast.
# TAK server host and port — shared by both emitter and receiver.
# Standard TAK Server: port 8087 | OpenTAK Server (OTS): port 8088
# Leave COT_TAKSERVER_HOST blank to use UDP multicast instead of a TAK server.
COT_ENABLED=false
COT_MULTICAST_ADDR=239.2.3.1
COT_MULTICAST_PORT=6969
COT_STALE_SECONDS=60
# Optional unicast to a TAK server — overrides multicast when set
COT_TAKSERVER_HOST=
COT_TAKSERVER_PORT=8087

# Inbound — receive CoT from an openTAK / TAK Server via TCP streaming.
# Field operator positions appear as tak_client entities on the Vertex map.
# TAK map markers are ingested as Vertex annotations.
# Enable receive direction (TAK server → Vertex).
# Field operator positions appear as tak_client entities; TAK markers as annotations.
COT_RECEIVE_ENABLED=false
COT_RECEIVE_HOST=
COT_RECEIVE_PORT=8087

# ── 5. FRONTEND BUILD ─────────────────────────────────────────────────────────

Expand Down
12 changes: 4 additions & 8 deletions poller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,22 +158,18 @@ def regions(self) -> list[RegionConfig]:
adsb_airlines_db_path: str = "/data/airlines.dat"
adsb_navaids_db_path: str = "/data/navaids.csv"

# TAK/CoT output — set COT_ENABLED=true to broadcast entity positions
# to ATAK/WinTAK clients via UDP multicast or a dedicated TAK server.
# TAK/CoT — COT_TAKSERVER_HOST/PORT is shared by both emitter and receiver.
# Standard TAK Server: port 8087. OpenTAK Server (OTS): port 8088.
# Leave host blank to use UDP multicast instead of a TAK server.
cot_enabled: bool = False
cot_multicast_addr: str = "239.2.3.1"
cot_multicast_port: int = 6969
cot_stale_seconds: int = 60
# Optional unicast to a TAK server (overrides multicast when set)
cot_takserver_host: str = ""
cot_takserver_port: int = 8087

# TAK/CoT ingest — set COT_RECEIVE_ENABLED=true to receive CoT from openTAK.
# Connects to openTAK via TCP streaming; ingests field operator positions as
# tak_client entities and TAK map markers as Vertex annotations.
# Enable receive direction (OTS → Vertex). Uses cot_takserver_host/port.
cot_receive_enabled: bool = False
cot_receive_host: str = ""
cot_receive_port: int = 8087

# Anomaly detection — statistical baseline monitoring
anomaly_enabled: bool = True
Expand Down
136 changes: 92 additions & 44 deletions poller/pollers/cot_emitter.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
"""
TAK/CoT (Cursor-on-Target) UDP emitter.
TAK/CoT (Cursor-on-Target) emitter.

Subscribes to Redis entity_update and annotation_update events and broadcasts:
- Entity positions as CoT datagrams to ATAK/WinTAK clients
- Entity positions as CoT events to ATAK/WinTAK clients
- Vertex annotations as CoT map markers (b-m-p-s-m) to openTAK

Supports both UDP multicast (default 239.2.3.1:6969) and unicast to a
dedicated TAK server. Enable via COT_ENABLED=true in .env.
When COT_TAKSERVER_HOST is set, connects via TCP (required by OpenTAK Server
and most modern TAK servers). Falls back to UDP multicast (239.2.3.1:6969)
when no server host is configured.

Enable via COT_ENABLED=true in .env.
"""

import asyncio
Expand All @@ -19,6 +22,7 @@

from bus import get_bus
from config import settings
from security import validate_safe_host
from .base import BasePoller

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,7 +68,7 @@ def _build_cot(entity: dict[str, Any]) -> str | None:
# Fix UID collision bug (prefer entity_id or id)
entity_id = entity.get("entity_id") or entity.get("id") or "unknown"
raw_uid = f"VERTEX-{entity_id}"

cot_type = _COT_TYPES.get(entity.get("entity_type", ""), _COT_DEFAULT)
raw_callsign = (
entity.get("callsign")
Expand Down Expand Up @@ -191,20 +195,34 @@ def _build_annotation_cot(ann: dict[str, Any]) -> str | None:
)


def _make_socket() -> tuple[socket.socket, tuple[str, int]]:
if settings.cot_takserver_host:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
addr = (settings.cot_takserver_host, settings.cot_takserver_port)
def _msg_to_cot(msg: dict) -> str | None:
"""Convert a Redis pub/sub message to a CoT XML string, or None to skip."""
if msg["type"] != "message":
return None
try:
payload = json.loads(msg["data"])
except (json.JSONDecodeError, TypeError):
return None

if msg["channel"] == "civic:updates":
if payload.get("type") != "entity_update":
return None
return _build_cot(payload.get("data", {}))
else:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
addr = (settings.cot_multicast_addr, settings.cot_multicast_port)
sock.setblocking(False)
return sock, addr
# annotation_update — skip TAK-sourced annotations to avoid feedback loops
if payload.get("source") == "tak":
return None
if payload.get("action") == "delete":
return None
return _build_annotation_cot(payload)


class CotEmitter(BasePoller):
"""Listens on Redis pub/sub and emits CoT datagrams for entity updates and annotations."""
"""Listens on Redis pub/sub and emits CoT events for entity updates and annotations.

Uses TCP when COT_TAKSERVER_HOST is set (required by OpenTAK Server and most
modern TAK servers). Falls back to UDP multicast for LAN-only deployments.
"""

name = "cot_emitter"
interval = 0 # event-driven
Expand All @@ -214,47 +232,77 @@ async def run(self) -> None:
logger.info("[cot] CoT output disabled (COT_ENABLED not set)")
return

dest = (
f"{settings.cot_takserver_host}:{settings.cot_takserver_port}"
if settings.cot_takserver_host
else f"multicast {settings.cot_multicast_addr}:{settings.cot_multicast_port}"
)
logger.info("[cot] Starting CoT emitter → %s", dest)
if settings.cot_takserver_host:
await self._run_tcp(settings.cot_takserver_host, settings.cot_takserver_port)
else:
await self._run_udp()

r = await get_bus()
sock, addr = _make_socket()
async def _run_tcp(self, host: str, port: int) -> None:
logger.info("[cot] Starting CoT emitter (TCP) → %s:%d", host, port)
delay = 2.0
failures = 0

while True:
try:
await validate_safe_host(host)
reader, writer = await asyncio.open_connection(host, port)
failures = 0
delay = 2.0
logger.info("[cot] TCP connected to TAK server %s:%d", host, port)

r = await get_bus()
try:
async with r.pubsub() as ps:
await ps.subscribe("civic:updates", "annotation_update")
async for msg in ps.listen():
xml = _msg_to_cot(msg)
if xml:
try:
writer.write(xml.encode())
await writer.drain()
except (ConnectionResetError, BrokenPipeError, OSError) as exc:
logger.warning("[cot] TCP send failed: %s — reconnecting", exc)
break
finally:
writer.close()
try:
await writer.wait_closed()
except Exception:
pass

except (ConnectionRefusedError, OSError) as exc:
failures += 1
logger.warning(
"[cot] TCP connect failed (%d): %s — retry in %.0fs", failures, exc, delay
)
except Exception as exc:
failures += 1
logger.exception("[cot] Unexpected error (%d) — retry in %.0fs", failures, delay)

await asyncio.sleep(delay)
delay = min(delay * 2, 60.0)

async def _run_udp(self) -> None:
addr = (settings.cot_multicast_addr, settings.cot_multicast_port)
logger.info("[cot] Starting CoT emitter (UDP multicast) → %s:%d", *addr)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32)
sock.setblocking(False)
loop = asyncio.get_running_loop()

async def _send(data: bytes) -> None:
try:
await loop.sock_sendto(sock, data, addr)
except Exception as exc:
logger.debug("[cot] Send failed: %s", exc)
logger.debug("[cot] UDP send failed: %s", exc)

r = await get_bus()
try:
async with r.pubsub() as ps:
await ps.subscribe("civic:updates", "annotation_update")
async for msg in ps.listen():
if msg["type"] != "message":
continue
try:
payload = json.loads(msg["data"])
except (json.JSONDecodeError, TypeError):
continue

if msg["channel"] == "civic:updates":
if payload.get("type") != "entity_update":
continue
xml = _build_cot(payload.get("data", {}))
else:
# annotation_update channel
# Skip annotations that originated from TAK to avoid feedback loops.
if payload.get("source") == "tak":
continue
if payload.get("action") == "delete":
continue
xml = _build_annotation_cot(payload)

xml = _msg_to_cot(msg)
if xml:
await _send(xml.encode())
finally:
Expand Down
18 changes: 11 additions & 7 deletions poller/pollers/cot_receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
CoT messages arriving from TAK are tagged with their original TAK UID so the
annotation bridge can skip re-broadcasting them back to TAK (no feedback loop).

Enable via COT_RECEIVE_ENABLED=true, COT_RECEIVE_HOST=<ip> in .env.
Enable via COT_RECEIVE_ENABLED=true, COT_TAKSERVER_HOST=<ip> in .env.
"""

import asyncio
Expand Down Expand Up @@ -42,6 +42,10 @@ def _parse_cot(xml_bytes: bytes) -> dict[str, Any] | None:
uid = root.get("uid", "")
cot_type = root.get("type", "")

# Skip CoT that Vertex itself emitted and OTS echoed back.
if uid.startswith("VERTEX-"):
return None

point = root.find("point")
if point is None:
return None
Expand Down Expand Up @@ -178,24 +182,24 @@ async def run(self) -> None:
if not settings.cot_receive_enabled:
logger.info("[cot_rx] CoT receive disabled (COT_RECEIVE_ENABLED not set)")
return
if not settings.cot_receive_host:
logger.warning("[cot_rx] COT_RECEIVE_HOST not set — receiver disabled")
if not settings.cot_takserver_host:
logger.warning("[cot_rx] COT_TAKSERVER_HOST not set — receiver disabled")
return

logger.info(
"[cot_rx] Connecting to openTAK at %s:%d",
settings.cot_receive_host,
settings.cot_receive_port,
settings.cot_takserver_host,
settings.cot_takserver_port,
)

delay = 2.0
_consecutive_failures = 0

while True:
try:
await validate_safe_host(settings.cot_receive_host)
await validate_safe_host(settings.cot_takserver_host)
reader, writer = await asyncio.open_connection(
settings.cot_receive_host, settings.cot_receive_port
settings.cot_takserver_host, settings.cot_takserver_port
)
_consecutive_failures = 0
delay = 2.0
Expand Down
Loading