diff --git a/Makefile b/Makefile index 05922051..6253bd8c 100644 --- a/Makefile +++ b/Makefile @@ -47,6 +47,10 @@ fprime-venv: uv ## Create a virtual environment @INST_CFG=$$(ls $(shell pwd)/fprime-venv/lib/python*/site-packages/fprime_yamcs/yamcs/src/main/yamcs/etc/yamcs.fprime-project.yaml 2>/dev/null | head -1); \ if [ -z "$$INST_CFG" ]; then echo "⚠ instance config not found, skipping"; exit 0; fi; \ $(VIRTUAL_ENV)/bin/python tools/apply-yamcs-instance-config-fix.py "$$INST_CFG" + @echo "Applying fprime-yamcs-events opcode-ack fix..." + @EVENTS_PROC=$$(ls $(shell pwd)/fprime-venv/lib/python*/site-packages/fprime_yamcs/events/processor.py 2>/dev/null | head -1); \ + if [ -z "$$EVENTS_PROC" ]; then echo "⚠ events processor not found, skipping"; exit 0; fi; \ + $(VIRTUAL_ENV)/bin/python tools/apply-opcode-ack-fix.py "$$EVENTS_PROC" .PHONY: zephyr-setup @@ -307,6 +311,7 @@ yamcs-stop: ## Stop all YAMCS-related processes (YAMCS server, events bridge, ad }; \ kill_udp_port 50001 'serial adapter'; \ kill_udp_port 50000 'TM UDP sender'; \ + stop_repo_processes 'opcode_ack_bridge.py' 'opcode-ack bridge'; \ stop_repo_processes 'fprime-yamcs-events' 'fprime-yamcs-events'; \ stop_repo_processes 'fprime_yamcs' 'fprime-yamcs wrapper'; \ stop_repo_processes 'mvn' 'Maven yamcs runner'; \ @@ -337,6 +342,8 @@ yamcs: fprime-venv yamcs-dict ## Run YAMCS with serial adapter (Use Case 1: UART @sleep 5 @echo "Starting fprime-yamcs-events bridge..." $(UV_RUN) fprime-yamcs-events --dictionary $(shell pwd)/build-artifacts/zephyr/fprime-zephyr-deployment/dict/ReferenceDeploymentTopologyDictionary.json & + @echo "Starting OpCode acknowledgement bridge..." + $(VIRTUAL_ENV)/bin/python tools/yamcs/opcode_ack_bridge.py --dictionary $(shell pwd)/build-artifacts/zephyr/fprime-zephyr-deployment/dict/ReferenceDeploymentTopologyDictionary.json & @echo "Starting serial adapter on $(UART_DEVICE)..." $(VIRTUAL_ENV)/bin/python tools/yamcs/proves_adapter.py \ --mode serial \ diff --git a/tools/apply-opcode-ack-fix.py b/tools/apply-opcode-ack-fix.py new file mode 100644 index 00000000..d17682c5 --- /dev/null +++ b/tools/apply-opcode-ack-fix.py @@ -0,0 +1,58 @@ +"""Patch FPrimeEventProcessor to include event args in the send_event extra field. + +The extra field is a free-form str→str mapping that YAMCS stores alongside each +event. By including event arguments there, the opcode_ack_bridge can recover the +opcode value from OpCodeCompleted events without needing to re-decode the raw +CCSDS packet or parse the human-readable message string. +""" + +import sys + +if len(sys.argv) != 2: + print( + f"Usage: {sys.argv[0]} ", + file=sys.stderr, + ) + sys.exit(1) + +path = sys.argv[1] +with open(path) as f: + content = f.read() + +SENTINEL = "extra=event_args if event_args else None," + +if SENTINEL in content: + print("⚠ fprime-yamcs-events opcode-ack fix already applied") + sys.exit(0) + +OLD = ( + " self.yamcs_client.send_event(\n" + " instance=self.yamcs_instance,\n" + " source='FPrimeEventProcessor',\n" + " event_type=event_name,\n" + " severity=yamcs_severity,\n" + " message=message,\n" + " )\n" +) +NEW = ( + " self.yamcs_client.send_event(\n" + " instance=self.yamcs_instance,\n" + " source='FPrimeEventProcessor',\n" + " event_type=event_name,\n" + " severity=yamcs_severity,\n" + " message=message,\n" + " extra=event_args if event_args else None,\n" + " )\n" +) + +fixed = content.replace(OLD, NEW) + +if fixed == content: + print( + "❌ Error: pattern not found in events processor — fix may not apply or upstream changed" + ) + sys.exit(1) + +with open(path, "w") as f: + f.write(fixed) +print("✓ Applied fprime-yamcs-events opcode-ack fix") diff --git a/tools/yamcs/opcode_ack_bridge.py b/tools/yamcs/opcode_ack_bridge.py new file mode 100644 index 00000000..3631411a --- /dev/null +++ b/tools/yamcs/opcode_ack_bridge.py @@ -0,0 +1,347 @@ +"""OpCode Acknowledgement Bridge — PROVES YAMCS Integration. + +Subscribes to the YAMCS event stream and command history, then bridges +F Prime OpCodeCompleted events to YAMCS command acknowledgements. + +How it works +------------ +1. The FPrimeEventProcessor (fprime-yamcs-events) decodes raw F Prime event + packets and publishes them to YAMCS. For OpCodeCompleted events it also + stores the raw opcode value in the YAMCS event's ``extra`` dict under the + key ``Opcode`` (applied via tools/apply-opcode-ack-fix.py). +2. This bridge subscribes to YAMCS command history to build a queue of + pending (issued-but-not-yet-acknowledged) commands keyed by their F Prime + component.command name. +3. It also subscribes to YAMCS events and listens for events whose source is + ``FPrimeEventProcessor`` and whose type ends with ``OpCodeDispatched`` + or ``OpCodeCompleted``. +4. On every such event it: + a. Reads the opcode from ``event.extra["Opcode"]``. + b. Cross-references the F Prime GDS dictionary (loaded from the topology + dictionary JSON) to resolve the opcode to the fully-qualified F Prime + command name (e.g. ``cmdDisp.CMD_NO_OP``). + c. For OpCodeDispatched: posts ``Acknowledged_Status = OK`` (command was + dispatched to the target component). + d. For OpCodeCompleted: posts ``CommandComplete_Status = OK`` (populates + the YAMCS Completion section) and removes the command from the pending + queue. + +Usage +----- + python opcode_ack_bridge.py \\ + --dictionary /path/to/ReferenceDeploymentTopologyDictionary.json +""" + +import argparse +import logging +import os +import sys +import threading +from collections import deque +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +# Ensure the Framing/src directory is on sys.path so that fprime_gds can be +# imported when the script is run outside the virtual environment. +sys.path.insert(0, str(Path(__file__).parents[2] / "Framing" / "src")) + +# Flush stdout immediately so log lines appear in real time when used as a +# background process (consistent with proves_adapter.py). +sys.stdout.reconfigure(line_buffering=True) + +from fprime_gds.common.loaders.cmd_json_loader import CmdJsonLoader # noqa: E402 +from yamcs.client import YamcsClient # noqa: E402 +from yamcs.protobuf import yamcs_pb2 # noqa: E402 +from yamcs.protobuf.commanding import ( # noqa: E402 + commands_service_pb2, +) + +logger = logging.getLogger("opcode-ack-bridge") + + +class OpcodeAckBridge: + """Bridge that recognises F Prime OpCodeCompleted events as YAMCS command acks. + + Thread-safety + ~~~~~~~~~~~~~ + Both the command-history subscription callback and the event subscription + callback run in background threads managed by the yamcs-client library. + Access to ``_pending`` is protected by ``_lock``. + """ + + # Source label set by FPrimeEventProcessor when publishing events. + EVENT_SOURCE = "FPrimeEventProcessor" + # F Prime event type suffixes for command lifecycle events. + OPCODE_DISPATCHED_SUFFIX = "OpCodeDispatched" + OPCODE_COMPLETED_SUFFIX = "OpCodeCompleted" + + def __init__(self, yamcs_url: str, yamcs_instance: str, dictionary_path: str): + self.yamcs_url = yamcs_url + self.yamcs_instance = yamcs_instance + self._lock = threading.Lock() + # {fprime_cmd_name: deque([(yamcs_cmd_id, yamcs_cmd_qualified_name)])} + self._pending: dict[str, deque[tuple[str, str]]] = {} + + # Build opcode (int) → CmdTemplate mapping from the F Prime GDS dictionary. + self._opcode_map = self._load_cmd_dict(dictionary_path) + logger.info( + f"Loaded {len(self._opcode_map)} command definition(s) from {dictionary_path}" + ) + + # Connect to YAMCS. + logger.info(f"Connecting to YAMCS at {yamcs_url}, instance={yamcs_instance!r}") + self.yamcs_client = YamcsClient(yamcs_url) + self.processor_client = self.yamcs_client.get_processor( + yamcs_instance, "realtime" + ) + + # ------------------------------------------------------------------ + # Initialisation helpers + # ------------------------------------------------------------------ + + @staticmethod + def _load_cmd_dict(dictionary_path: str) -> dict: + """Return ``{opcode (int): CmdTemplate}`` from the topology dictionary.""" + loader = CmdJsonLoader(Path(dictionary_path)) + id_dict = loader.get_id_dict(dictionary_path) + return id_dict + + # ------------------------------------------------------------------ + # Name conversion utilities + # ------------------------------------------------------------------ + + @staticmethod + def _yamcs_to_fprime_name(yamcs_name: str) -> Optional[str]: + """Convert a YAMCS XTCE-qualified command name to an F Prime ``component.cmd`` name. + + The XTCE generated by fprime-to-xtce nests SpaceSystem elements that + YAMCS represents as ``/``-separated qualified names:: + + /ReferenceDeployment_ReferenceDeployment/CdhCore/cmdDisp/CMD_NO_OP + + The corresponding F Prime fully-qualified name is:: + + CdhCore.cmdDisp.CMD_NO_OP + + The first segment is the root SpaceSystem (topology name) and is + stripped; the remaining segments are joined with ``.``. + """ + # Strip leading '/' and split on '/'. + parts = yamcs_name.strip("/").split("/") + # Need at least the root SpaceSystem + one more segment. + if len(parts) < 2: + return None + # Drop the root SpaceSystem (topology name), join the rest. + return ".".join(parts[1:]) + + # ------------------------------------------------------------------ + # Subscription callbacks + # ------------------------------------------------------------------ + + def _on_cmd_history(self, cmdhist) -> None: + """Track newly issued commands that have not yet been acknowledged.""" + cmd_id: str = cmdhist.id + yamcs_name: str = cmdhist.name + + # Skip commands that have already received a completion acknowledgement. + if "CommandComplete_Status" in (cmdhist.attributes or {}): + return + + fprime_name = self._yamcs_to_fprime_name(yamcs_name) + if fprime_name is None: + return + + with self._lock: + queue = self._pending.setdefault(fprime_name, deque()) + # Avoid duplicates on repeated cmdhist updates for the same command. + if not any(entry[0] == cmd_id for entry in queue): + queue.append((cmd_id, yamcs_name)) + logger.debug(f"Tracking pending command: {fprime_name!r} id={cmd_id}") + + def _on_event(self, event) -> None: + """Recognise OpCodeDispatched/OpCodeCompleted events and post acks.""" + if event.source != self.EVENT_SOURCE: + return + + event_type = event.event_type or "" + if event_type.endswith(self.OPCODE_DISPATCHED_SUFFIX): + stage = "Acknowledged" + pop_pending = False + elif event_type.endswith(self.OPCODE_COMPLETED_SUFFIX): + stage = "CommandComplete" + pop_pending = True + else: + return + + extra = event.extra or {} + opcode_str = extra.get("Opcode") + if opcode_str is None: + logger.warning( + f"Received {event_type} event without 'Opcode' in extra — " + f"ensure tools/apply-opcode-ack-fix.py has been applied. " + f"Message: {event.message!r}" + ) + return + + try: + opcode = int(opcode_str) + except ValueError: + logger.warning(f"Malformed Opcode value in event extra: {opcode_str!r}") + return + + cmd_template = self._opcode_map.get(opcode) + if cmd_template is None: + logger.warning( + f"{event_type} for opcode {opcode:#010x} — " + f"opcode not found in F Prime GDS dictionary" + ) + return + + fprime_name: str = cmd_template.get_full_name() + logger.info(f"{event_type} → opcode={opcode:#010x} command={fprime_name!r}") + + with self._lock: + queue = self._pending.get(fprime_name) + if not queue: + logger.warning( + f"No pending YAMCS command found for {fprime_name!r} " + f"(opcode={opcode:#010x}). " + "The command may have been issued before this bridge started." + ) + return + if pop_pending: + cmd_id, yamcs_name = queue.popleft() + else: + cmd_id, yamcs_name = queue[0] + + self._post_ack(cmd_id, yamcs_name, stage) + + # ------------------------------------------------------------------ + # YAMCS REST API call + # ------------------------------------------------------------------ + + def _post_ack(self, cmd_id: str, yamcs_name: str, stage: str) -> None: + """Post a ``{stage}_Status = OK`` attribute to the YAMCS command history. + + YAMCS command acknowledgement stages follow the naming convention + ``{stage}_Status`` (string) and ``{stage}_Time`` (timestamp). + """ + req = commands_service_pb2.UpdateCommandHistoryRequest() + req.instance = self.yamcs_instance + req.processor = "realtime" + req.name = yamcs_name + req.id = cmd_id + + now = datetime.now(timezone.utc) + now_ms = int(now.timestamp() * 1000) + now_iso = now.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + + attr_status = req.attributes.add() + attr_status.name = f"{stage}_Status" + attr_status.value.type = yamcs_pb2.Value.STRING + attr_status.value.stringValue = "OK" + attr_status.time = now_ms + + attr_time = req.attributes.add() + attr_time.name = f"{stage}_Time" + attr_time.value.type = yamcs_pb2.Value.STRING + attr_time.value.stringValue = now_iso + attr_time.time = now_ms + + url = f"/processors/{self.yamcs_instance}/realtime/commandhistory{yamcs_name}" + try: + self.yamcs_client.ctx.post_proto(url, data=req.SerializeToString()) + logger.info(f"✓ {stage}=OK for {yamcs_name!r} (id={cmd_id})") + except Exception as exc: + logger.error(f"Failed to post {stage} ack for {yamcs_name!r}: {exc}") + + # ------------------------------------------------------------------ + # Main loop + # ------------------------------------------------------------------ + + def start(self) -> None: + """Subscribe to YAMCS and block until the connection ends or Ctrl-C.""" + logger.info("Subscribing to YAMCS command history…") + self.processor_client.create_command_history_subscription( + on_data=self._on_cmd_history + ) + + logger.info("Subscribing to YAMCS event stream…") + event_sub = self.yamcs_client.create_event_subscription( + instance=self.yamcs_instance, + on_data=self._on_event, + ) + + logger.info( + f"OpCode Ack Bridge running — " + f"YAMCS={self.yamcs_url} instance={self.yamcs_instance!r} " + "Press Ctrl-C to stop." + ) + try: + # result() blocks until the WebSocket subscription is closed by + # the server or by calling subscription.cancel(), avoiding a + # busy-wait loop. A KeyboardInterrupt (Ctrl-C) breaks out cleanly. + event_sub.result() + except KeyboardInterrupt: + logger.info("Shutting down OpCode Ack Bridge.") + + +# --------------------------------------------------------------------------- +# CLI entry point +# --------------------------------------------------------------------------- + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description=( + "Bridge F Prime OpCodeCompleted events to YAMCS command acknowledgements. " + "Cross-references the F Prime GDS dictionary to resolve opcodes to command " + "names and posts Completed_Status=OK to the YAMCS command history." + ), + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument( + "--yamcs-url", + default="http://localhost:8090", + help="YAMCS server base URL", + ) + parser.add_argument( + "--instance", + default=os.environ.get("FPRIME_YAMCS_INSTANCE", "fprime-project"), + help="YAMCS instance name", + ) + parser.add_argument( + "--dictionary", + default=os.environ.get("FPRIME_DICTIONARY"), + help="Path to the F Prime topology dictionary JSON file", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable DEBUG-level logging", + ) + args = parser.parse_args() + if args.dictionary is None: + parser.error( + "Supply --dictionary or set the FPRIME_DICTIONARY environment variable" + ) + return args + + +def main() -> None: + args = parse_args() + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", + ) + bridge = OpcodeAckBridge( + yamcs_url=args.yamcs_url, + yamcs_instance=args.instance, + dictionary_path=args.dictionary, + ) + bridge.start() + + +if __name__ == "__main__": + main()