Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,22 @@ COT_MULTICAST_PORT=6969
COT_STALE_SECONDS=60
COT_TAKSERVER_HOST=
COT_TAKSERVER_PORT=8087
# Entity types to emit — comma-separated, empty = all types.
# Available types:
# aircraft ADS-B transponder tracks (Beast / OpenSky)
# vessel AIS vessel positions
# aprs APRS packet radio stations
# mesh_node Meshtastic / MeshCore nodes
# train Amtrak + GTFS-RT light rail / commuter rail
# fire_incident Wildfire incident data (NIFC / fire.gov)
# stream_gauge USGS stream gauge readings
# rf_sensor RTL-433 RF sensor devices
# tak_client TAK field operators received from OTS
# Examples:
# COT_ENTITY_TYPES=aircraft,vessel
# COT_ENTITY_TYPES=aircraft,vessel,fire_incident
# COT_ENTITY_TYPES=aircraft,vessel,fire_incident,aprs,mesh_node
COT_ENTITY_TYPES=

# Enable receive direction (TAK server → Vertex).
# Field operator positions appear as tak_client entities; TAK markers as annotations.
Expand Down
3 changes: 3 additions & 0 deletions poller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ def regions(self) -> list[RegionConfig]:
cot_stale_seconds: int = 60
cot_takserver_host: str = ""
cot_takserver_port: int = 8087
# Comma-separated entity types to emit. Empty = all types.
# Known types: aircraft, vessel, aprs, mesh_node, fire_incident, tak_client
cot_entity_types: str = ""

# Enable receive direction (OTS → Vertex). Uses cot_takserver_host/port.
cot_receive_enabled: bool = False
Expand Down
31 changes: 22 additions & 9 deletions poller/pollers/cot_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,11 @@ def _build_annotation_cot(ann: dict[str, Any]) -> str | None:
)


def _msg_to_cot(msg: dict) -> str | None:
"""Convert a Redis pub/sub message to a CoT XML string, or None to skip."""
def _msg_to_cot(msg: dict, allowed: frozenset[str] | None = None) -> str | None:
"""Convert a Redis pub/sub message to a CoT XML string, or None to skip.

allowed: frozenset of entity_type strings to emit; None means emit all.
"""
if msg["type"] != "message":
return None
try:
Expand All @@ -207,7 +210,10 @@ def _msg_to_cot(msg: dict) -> str | None:
if msg["channel"] == "civic:updates":
if payload.get("type") != "entity_update":
return None
return _build_cot(payload.get("data", {}))
entity = payload.get("data", {})
if allowed is not None and entity.get("entity_type") not in allowed:
return None
return _build_cot(entity)
else:
# annotation_update — skip TAK-sourced annotations to avoid feedback loops
if payload.get("source") == "tak":
Expand All @@ -232,12 +238,19 @@ async def run(self) -> None:
logger.info("[cot] CoT output disabled (COT_ENABLED not set)")
return

raw = {t.strip() for t in settings.cot_entity_types.split(",") if t.strip()}
allowed: frozenset[str] | None = frozenset(raw) if raw else None
if allowed:
logger.info("[cot] Filtering entity types: %s", ", ".join(sorted(allowed)))
else:
logger.info("[cot] Emitting all entity types")

if settings.cot_takserver_host:
await self._run_tcp(settings.cot_takserver_host, settings.cot_takserver_port)
await self._run_tcp(settings.cot_takserver_host, settings.cot_takserver_port, allowed)
else:
await self._run_udp()
await self._run_udp(allowed)

async def _run_tcp(self, host: str, port: int) -> None:
async def _run_tcp(self, host: str, port: int, allowed: frozenset[str] | None) -> None:
logger.info("[cot] Starting CoT emitter (TCP) → %s:%d", host, port)
delay = 2.0
failures = 0
Expand All @@ -255,7 +268,7 @@ async def _run_tcp(self, host: str, port: int) -> None:
async with r.pubsub() as ps:
await ps.subscribe("civic:updates", "annotation_update")
async for msg in ps.listen():
xml = _msg_to_cot(msg)
xml = _msg_to_cot(msg, allowed)
if xml:
try:
writer.write(xml.encode())
Expand All @@ -282,7 +295,7 @@ async def _run_tcp(self, host: str, port: int) -> None:
await asyncio.sleep(delay)
delay = min(delay * 2, 60.0)

async def _run_udp(self) -> None:
async def _run_udp(self, allowed: frozenset[str] | None) -> None:
addr = (settings.cot_multicast_addr, settings.cot_multicast_port)
logger.info("[cot] Starting CoT emitter (UDP multicast) → %s:%d", *addr)

Expand All @@ -302,7 +315,7 @@ async def _send(data: bytes) -> None:
async with r.pubsub() as ps:
await ps.subscribe("civic:updates", "annotation_update")
async for msg in ps.listen():
xml = _msg_to_cot(msg)
xml = _msg_to_cot(msg, allowed)
if xml:
await _send(xml.encode())
finally:
Expand Down
Loading