diff --git a/data/debian/rules b/data/debian/rules index f32142df..f122c7c5 100755 --- a/data/debian/rules +++ b/data/debian/rules @@ -21,5 +21,9 @@ override_dh_installsystemd: dh_installsystemd --no-start --name=determine-reboot-cause dh_installsystemd --no-start --name=process-reboot-cause dh_installsystemd --no-start --name=gnoi-shutdown + dh_installsystemd --no-start --name=console-monitor-dce + dh_installsystemd --no-start --name=console-monitor-dte + dh_installsystemd --no-start --name=console-monitor-proxy@ + dh_installsystemd --no-start --name=console-monitor-pty-bridge@ dh_installsystemd $(HOST_SERVICE_OPTS) --name=sonic-hostservice diff --git a/data/debian/sonic-host-services-data.console-monitor-dce.service b/data/debian/sonic-host-services-data.console-monitor-dce.service new file mode 100644 index 00000000..86e11344 --- /dev/null +++ b/data/debian/sonic-host-services-data.console-monitor-dce.service @@ -0,0 +1,18 @@ +[Unit] +Description=Console Monitor DCE Service - Manages PTY Bridge and Proxy Services +Documentation=https://github.com/sonic-net/SONiC/blob/master/doc/console/Console-Monitor-High-Level-Design.md +After=config-setup.service database.service +Requires=config-setup.service database.service + +[Service] +Type=simple +ExecStart=/usr/local/bin/console-monitor dce +Restart=always +RestartSec=10 +StandardOutput=journal +StandardError=journal + +SupplementaryGroups=dialout + +[Install] +WantedBy=sonic.target \ No newline at end of file diff --git a/data/debian/sonic-host-services-data.console-monitor-dte.service b/data/debian/sonic-host-services-data.console-monitor-dte.service new file mode 100644 index 00000000..1963a417 --- /dev/null +++ b/data/debian/sonic-host-services-data.console-monitor-dte.service @@ -0,0 +1,14 @@ +[Unit] +Description=Console Monitor DTE Service - Heartbeat Sender +Documentation=https://github.com/sonic-net/SONiC/blob/master/doc/console/Console-Monitor-High-Level-Design.md +After=config-setup.service database.service +Requires=config-setup.service database.service + +[Service] +Type=simple +ExecStart=/usr/local/bin/console-monitor dte +Restart=always +RestartSec=5 + +[Install] +WantedBy=sonic.target diff --git a/data/debian/sonic-host-services-data.console-monitor-proxy@.service b/data/debian/sonic-host-services-data.console-monitor-proxy@.service new file mode 100644 index 00000000..5fb6257a --- /dev/null +++ b/data/debian/sonic-host-services-data.console-monitor-proxy@.service @@ -0,0 +1,19 @@ +[Unit] +Description=Console Monitor Proxy Service for port %i +Documentation=https://github.com/sonic-net/SONiC/blob/master/doc/console/Console-Monitor-High-Level-Design.md +After=config-setup.service database.service console-monitor-pty-bridge@%i.service +Requires=config-setup.service database.service +Wants=console-monitor-pty-bridge@%i.service + +[Service] +Type=simple +ExecStart=/usr/local/bin/console-monitor proxy %i +Restart=on-failure +RestartSec=5 +StandardOutput=journal +StandardError=journal + +SupplementaryGroups=dialout + +[Install] +WantedBy=sonic.target diff --git a/data/debian/sonic-host-services-data.console-monitor-pty-bridge@.service b/data/debian/sonic-host-services-data.console-monitor-pty-bridge@.service new file mode 100644 index 00000000..886e57e7 --- /dev/null +++ b/data/debian/sonic-host-services-data.console-monitor-pty-bridge@.service @@ -0,0 +1,16 @@ +[Unit] +Description=Console Monitor PTY Bridge Service for port %i +Documentation=https://github.com/sonic-net/SONiC/blob/master/doc/console/Console-Monitor-High-Level-Design.md +After=config-setup.service +Requires=config-setup.service + +[Service] +Type=simple +ExecStart=/usr/local/bin/console-monitor pty-bridge %i +Restart=on-failure +RestartSec=5 +StandardOutput=journal +StandardError=journal + +[Install] +WantedBy=sonic.target diff --git a/scripts/console-monitor b/scripts/console-monitor new file mode 100644 index 00000000..d9c67a04 --- /dev/null +++ b/scripts/console-monitor @@ -0,0 +1,1503 @@ +#!/usr/bin/env python3 +""" +Console Monitor Service + +Unified Console Monitor service with four modes: +- pty-bridge: PTY Bridge service that creates a PTY pair using socat +- dce: DCE service that manages pty-bridge and proxy processes via systemctl +- proxy: Proxy service for a single serial port (runs as independent process) +- dte: DTE service that sends heartbeat frames + +Usage: + console-monitor pty-bridge # Start PTY bridge for a specific port + console-monitor dce # Start DCE service + console-monitor proxy # Start proxy for a specific port + console-monitor dte [tty] [baud] # Start DTE service +""" + +import os +import re +import sys +import time +import fcntl +import termios +import tty +import signal +import argparse +import logging +import threading +import subprocess +import select +from dataclasses import dataclass +from enum import IntEnum +from typing import Optional, Callable, Dict, Set + +from swsscommon.swsscommon import ( + DBConnector, + Table, + ConfigDBConnector, +) + +# ============================================================ +# Logging Configuration +# ============================================================ + +# Log level mapping +LOG_LEVEL_MAP = { + 'debug': logging.DEBUG, + 'info': logging.INFO, + 'warning': logging.WARNING, + 'error': logging.ERROR, + 'critical': logging.CRITICAL, +} + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +log = logging.getLogger("console-monitor") + + +def set_log_level(level_str: str) -> None: + """Set logging level from string""" + level = LOG_LEVEL_MAP.get(level_str.lower(), logging.INFO) + logging.getLogger().setLevel(level) + log.setLevel(level) + log.info(f"Log level set to {level_str.upper()}") + + +# ============================================================ +# Global Constants +# ============================================================ + +# Timeout configuration +HEARTBEAT_INTERVAL = 5.0 # DTE heartbeat send interval (seconds) +HEARTBEAT_TIMEOUT = 15.0 # DCE heartbeat timeout (seconds) +RETRY_INTERVAL = 3.0 # Retry interval for waiting phases (seconds) + +# Baud rate mapping +BAUD_MAP = { + 1200: termios.B1200, + 2400: termios.B2400, + 4800: termios.B4800, + 9600: termios.B9600, + 19200: termios.B19200, + 38400: termios.B38400, + 57600: termios.B57600, + 115200: termios.B115200, +} + +# Redis table names +CONSOLE_PORT_TABLE = "CONSOLE_PORT" +CONSOLE_SWITCH_TABLE = "CONSOLE_SWITCH" + +# Default baud rate +DEFAULT_BAUD = 9600 + +# Kernel command line path +PROC_CMDLINE = "/proc/cmdline" + +# PTY symlink suffixes +PTY_SYMLINK_SUFFIX_PTS = "-PTS" # For user applications (picocom) +PTY_SYMLINK_SUFFIX_PTM = "-PTM" # For SerialProxy + +# Exit codes +EXIT_SUCCESS = 0 +EXIT_SERVICE_START_FAILED = 1 +EXIT_SERIAL_CONFIG_ERROR = 2 +EXIT_INVALID_MODE = 3 + +# Systemd service template names +PROXY_SERVICE_TEMPLATE = "console-monitor-proxy@{}.service" +PTY_BRIDGE_SERVICE_TEMPLATE = "console-monitor-pty-bridge@{}.service" + +# Default udev prefix (used when udevprefix.conf is not available) +DEFAULT_UDEV_PREFIX = "ttyUSB" + + +# ============================================================ +# Frame Protocol Constants and Classes +# ============================================================ + +class SpecialChar(IntEnum): + """Special character definitions""" + SOF = 0x05 # Start of Frame + EOF = 0x00 # End of Frame + DLE = 0x10 # Data Link Escape + + +# Set of escapable characters +ESCAPABLE_CHARS = frozenset({SpecialChar.SOF, SpecialChar.EOF, SpecialChar.DLE}) + + +class FrameType(IntEnum): + """Frame type definitions""" + HEARTBEAT = 0x01 + + +# Protocol version +PROTOCOL_VERSION = 0x01 + +# SOF/EOF length +SOF_LEN = 3 +EOF_LEN = 3 + +# Buffer size limit +MAX_FRAME_BUFFER_SIZE = 64 + +# SOF/EOF sequences +SOF_SEQUENCE = bytes([SpecialChar.SOF] * SOF_LEN) +EOF_SEQUENCE = bytes([SpecialChar.EOF] * EOF_LEN) + + +def log_binary_data(data: bytes, direction: str) -> None: + """ + Output data in binary and readable form to terminal + + Args: + data: Byte data to output + direction: Data flow direction (e.g., "Serial→PTY", "PTY→Serial") + """ + hex_str = data.hex(' ', 1) + readable = ''.join(chr(b) if 32 <= b < 127 else f"<0x{b:02x}>" for b in data) + log.debug(f"[{direction}] ({len(data)} bytes):\n HEX: {hex_str}\n ASCII: {readable}\n") + + +def crc16_modbus(data: bytes) -> int: + """CRC-16/MODBUS algorithm""" + crc = 0xFFFF + for byte in data: + crc ^= byte + for _ in range(8): + if crc & 0x0001: + crc = (crc >> 1) ^ 0xA001 + else: + crc >>= 1 + return crc + + +def escape_data(data: bytes) -> bytes: + """Escape data""" + result = bytearray() + for byte in data: + if byte in ESCAPABLE_CHARS: + result.append(SpecialChar.DLE) + result.append(byte) + return bytes(result) + + +def unescape_data(data: bytes) -> bytes: + """Unescape data""" + result = bytearray() + i = 0 + while i < len(data): + if data[i] == SpecialChar.DLE and i + 1 < len(data) and data[i + 1] in ESCAPABLE_CHARS: + result.append(data[i + 1]) + i += 2 + else: + result.append(data[i]) + i += 1 + return bytes(result) + + +@dataclass +class Frame: + """Frame data structure""" + version: int = PROTOCOL_VERSION + seq: int = 0 + flag: int = 0x00 + frame_type: int = FrameType.HEARTBEAT + payload: bytes = b"" + + def build(self) -> bytes: + """Build complete frame binary sequence""" + content = bytes([ + self.version, + self.seq & 0xFF, + self.flag, + self.frame_type, + len(self.payload), + ]) + self.payload + + crc = crc16_modbus(content) + crc_bytes = bytes([crc >> 8, crc & 0xFF]) + + content_with_crc = content + crc_bytes + escaped_content = escape_data(content_with_crc) + + return SOF_SEQUENCE + escaped_content + EOF_SEQUENCE + + @classmethod + def parse(cls, buffer: bytes) -> Optional['Frame']: + """Parse frame from buffer""" + unescaped = unescape_data(buffer) + + if len(unescaped) < 7: + return None + + content = unescaped[:-2] + crc_bytes = unescaped[-2:] + + expected_crc = crc16_modbus(content) + received_crc = (crc_bytes[0] << 8) | crc_bytes[1] + + if expected_crc != received_crc: + return None + + if len(content) < 5: + return None + + version = content[0] + seq = content[1] + flag = content[2] + frame_type = content[3] + length = content[4] + payload = content[5:5 + length] if length > 0 else b"" + + return cls( + version=version, + seq=seq, + flag=flag, + frame_type=frame_type, + payload=payload, + ) + + @classmethod + def create_heartbeat(cls, seq: int = 0) -> 'Frame': + """Create heartbeat frame""" + return cls( + version=PROTOCOL_VERSION, + seq=seq, + flag=0x00, + frame_type=FrameType.HEARTBEAT, + payload=b"", + ) + + def is_heartbeat(self) -> bool: + """Check if this is a heartbeat frame""" + return self.frame_type == FrameType.HEARTBEAT + + +# Callback function types +FrameCallback = Callable[[Frame], None] +UserDataCallback = Callable[[bytes], None] + + +class FrameFilter: + """Frame filter: identifies frames and user data from byte stream""" + + def __init__( + self, + on_frame: Optional[FrameCallback] = None, + on_user_data: Optional[UserDataCallback] = None, + ): + self._on_frame = on_frame + self._on_user_data = on_user_data + self._buffer = bytearray() + self._escape_next = False + self._in_frame = False + + def process(self, data: bytes) -> None: + """Process input byte stream""" + log_binary_data(data, "Received") + + for byte in data: + if self._escape_next: + self._buffer.append(byte) + self._escape_next = False + if len(self._buffer) >= MAX_FRAME_BUFFER_SIZE: + self._flush_buffer() + + elif byte == SpecialChar.DLE: + self._buffer.append(byte) + if self.in_frame: + self._escape_next = True + + elif byte == SpecialChar.SOF: + if not self._in_frame: + self._flush_as_user_data() + else: + self._discard_buffer() + self._in_frame = True + + elif byte == SpecialChar.EOF: + self._try_parse_frame() + self._in_frame = False + + else: + self._buffer.append(byte) + if len(self._buffer) >= MAX_FRAME_BUFFER_SIZE: + self._flush_buffer() + + def on_timeout(self) -> None: + """Timeout callback""" + if not self._in_frame: + self._flush_as_user_data() + else: + self._discard_buffer() + self._in_frame = False + + def flush(self) -> bytes: + """Flush buffer and return remaining data""" + result = bytes(self._buffer) + self._buffer.clear() + self._escape_next = False + self._in_frame = False + return result + + def has_pending_data(self) -> bool: + """Check if there is pending data""" + return len(self._buffer) > 0 + + @property + def in_frame(self) -> bool: + """Check if currently inside a frame""" + return self._in_frame + + def _flush_as_user_data(self) -> None: + """Send buffer as user data""" + if self._buffer and self._on_user_data: + log_binary_data(self._buffer, 'User Data') + self._on_user_data(bytes(self._buffer)) + self._buffer.clear() + self._escape_next = False + + def _discard_buffer(self) -> None: + """Discard buffer""" + self._buffer.clear() + self._escape_next = False + + def _flush_buffer(self) -> None: + """Handle buffer overflow based on whether inside a frame""" + if not self._in_frame: + self._flush_as_user_data() + else: + self._discard_buffer() + self._in_frame = False + + def _try_parse_frame(self) -> None: + """Try to parse buffer as frame""" + if not self._buffer: + self._escape_next = False + return + + log_binary_data(self._buffer, 'Frame Data') + + frame = Frame.parse(bytes(self._buffer)) + self._buffer.clear() + self._escape_next = False + + if frame is not None and self._on_frame: + self._on_frame(frame) + + +# ============================================================ +# Utility Functions +# ============================================================ + +def get_udev_prefix() -> str: + """ + Read udev prefix from udevprefix.conf + + Returns: + Prefix string (e.g., "C0-") or DEFAULT_UDEV_PREFIX if not available + """ + try: + from sonic_py_common import device_info + platform_path, _ = device_info.get_paths_to_platform_and_hwsku_dirs() + config_file = os.path.join(platform_path, "udevprefix.conf") + + if os.path.exists(config_file): + with open(config_file, 'r') as f: + prefix = f.readline().rstrip() + if prefix: + log.info(f"Udev prefix loaded from config: {prefix}") + return prefix + except Exception as e: + log.warning(f"Failed to read udevprefix.conf: {e}") + + log.info(f"Using default udev prefix: {DEFAULT_UDEV_PREFIX}") + return DEFAULT_UDEV_PREFIX + + +def set_nonblocking(fd: int) -> None: + """Set file descriptor to non-blocking mode""" + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + +def configure_serial(fd: int, baud: int) -> None: + """Configure serial port parameters""" + attrs = termios.tcgetattr(fd) + attrs[0] &= ~(termios.IGNBRK | termios.BRKINT | termios.PARMRK | + termios.ISTRIP | termios.INLCR | termios.IGNCR | + termios.ICRNL | termios.IXON) + attrs[1] &= ~termios.OPOST + attrs[2] &= ~(termios.CSIZE | termios.PARENB) + attrs[2] |= (termios.CS8 | termios.CREAD | termios.CLOCAL) + attrs[3] &= ~(termios.ECHO | termios.ECHONL | termios.ICANON | + termios.ISIG | termios.IEXTEN) + attrs[6][termios.VMIN] = 0 + attrs[6][termios.VTIME] = 0 + speed = BAUD_MAP.get(baud, termios.B9600) + attrs[4] = attrs[5] = speed + termios.tcsetattr(fd, termios.TCSANOW, attrs) + termios.tcflush(fd, termios.TCIOFLUSH) + + +def configure_pty(fd: int) -> None: + """Configure PTY in raw mode""" + tty.setraw(fd, when=termios.TCSANOW) + attrs = termios.tcgetattr(fd) + attrs[3] &= ~(termios.ECHO | termios.ECHONL) + termios.tcsetattr(fd, termios.TCSANOW, attrs) + + +def parse_proc_cmdline() -> tuple[str, int]: + """ + Parse serial configuration from /proc/cmdline + + Returns: + (tty_name, baud) + + Raises: + ValueError: No valid console parameter found + """ + try: + with open(PROC_CMDLINE, 'r') as f: + cmdline = f.read().strip() + except Exception as e: + raise ValueError(f"Failed to read {PROC_CMDLINE}: {e}") + + pattern = r'console=([a-zA-Z0-9]+)(?:,([0-9]+))?' + matches = re.findall(pattern, cmdline) + + if not matches: + raise ValueError(f"No console= parameter found in {PROC_CMDLINE}") + + tty_name, baud_str = matches[-1] + baud = int(baud_str) if baud_str else DEFAULT_BAUD + + log.info(f"Parsed from /proc/cmdline: tty={tty_name}, baud={baud}") + return (tty_name, baud) + + +def calculate_filter_timeout(baud: int, multiplier: int = 3) -> float: + """Calculate frame filter timeout based on baud rate""" + char_time = 10.0 / baud + return char_time * MAX_FRAME_BUFFER_SIZE * multiplier + + +# ============================================================ +# PTY Bridge (runs as independent process, exec socat) +# ============================================================ + +def run_pty_bridge(link_id: str) -> int: + """ + PTY Bridge entry point (runs as independent process) + + This function: + 1. Gets udev prefix (uses default if not available) + 2. Executes socat to create a PTY pair, replacing current process + + The socat command creates two linked PTY devices: + - /dev/{prefix}{link_id}-PTS (for user applications like picocom) + - /dev/{prefix}{link_id}-PTM (for SerialProxy) + """ + log.info(f"[PTYBridge:{link_id}] Starting...") + + # Get udev prefix + prefix = get_udev_prefix() + + # Build PTY symlink paths + pts_path = f"/dev/{prefix}{link_id}{PTY_SYMLINK_SUFFIX_PTS}" + ptm_path = f"/dev/{prefix}{link_id}{PTY_SYMLINK_SUFFIX_PTM}" + + log.info(f"[PTYBridge:{link_id}] Creating PTY pair: {pts_path} <-> {ptm_path}") + + # Build socat command + # socat creates two linked PTYs with symlinks + socat_args = [ + 'socat', + '-d', '-d', # Debug output + f'PTY,raw,echo=0,link={pts_path},mode=666', + f'PTY,raw,echo=0,link={ptm_path},mode=666', + ] + + log.info(f"[PTYBridge:{link_id}] Exec: {' '.join(socat_args)}") + + # Replace current process with socat + try: + os.execvp('socat', socat_args) + except Exception as e: + log.error(f"[PTYBridge:{link_id}] Failed to exec socat: {e}") + return EXIT_SERVICE_START_FAILED + + # Should never reach here + return EXIT_SERVICE_START_FAILED + + +# ============================================================ +# Proxy Service (runs as independent process) +# ============================================================ + +class ProxyService: + """ + Proxy service for a single serial port (runs as independent process) + + Startup flow: + 1. Wait for udev prefix to be available (determines device path) + 2. Wait for CONFIG_DB configuration to be ready + 3. Wait for device symlink to exist + 4. Wait for PTM symlink to exist + 5. Initialize and run proxy main loop + + Does not listen for CONFIG_DB changes. Configuration changes are handled + by DCE service restarting this process via systemctl. + """ + + def __init__(self, link_id: str): + self.link_id = link_id + self.running = False + + # Configuration (obtained in wait phases) + self.baud: int = DEFAULT_BAUD + self.device_path: str = "" + self.ptm_path: str = "" + + # Proxy resources + self.state_db: Optional[DBConnector] = None + self.state_table: Optional[Table] = None + self.ser_fd: int = -1 + self.ptm_fd: int = -1 + self.filter: Optional[FrameFilter] = None + + # State tracking + self._current_oper_state: Optional[str] = None + self._last_heartbeat_time: float = 0.0 + self._last_data_activity: float = 0.0 + self._last_serial_data_time: float = 0.0 + + # Wakeup pipe for signal handling + self._wake_r: int = -1 + self._wake_w: int = -1 + + def run(self) -> int: + """ + Main entry point: execute phases in sequence + + Returns: + Exit code + """ + self.running = True + + # Phase 1: Get udev prefix + if not self._get_udev_prefix(): + return EXIT_SERVICE_START_FAILED + + # Phase 2: Wait for configuration + if not self._wait_for_config(): + return EXIT_SERVICE_START_FAILED + + # Phase 3: Wait for device + if not self._wait_for_device(): + return EXIT_SERVICE_START_FAILED + + # Phase 4: Wait for PTM + if not self._wait_for_ptm(): + return EXIT_SERVICE_START_FAILED + + # Phase 5: Initialize and run proxy + if not self._initialize(): + return EXIT_SERVICE_START_FAILED + + self._run_loop() + self._cleanup() + + return EXIT_SUCCESS + + def _get_udev_prefix(self) -> bool: + """Phase 1: Get udev prefix""" + log.info(f"[{self.link_id}] Phase 1: Getting udev prefix...") + + prefix = get_udev_prefix() + self.device_path = f"/dev/{prefix}{self.link_id}" + self.ptm_path = f"/dev/{prefix}{self.link_id}{PTY_SYMLINK_SUFFIX_PTM}" + log.info(f"[{self.link_id}] Udev prefix: {prefix}, device={self.device_path}") + return True + + def _wait_for_config(self) -> bool: + """Phase 2: Wait for CONFIG_DB configuration""" + log.info(f"[{self.link_id}] Phase 2: Waiting for CONFIG_DB config...") + + config_db = ConfigDBConnector() + config_db.connect(wait_for_init=True, retry_on=True) + + while self.running: + entry = config_db.get_entry(CONSOLE_PORT_TABLE, self.link_id) + if entry: + self.baud = int(entry.get('baud_rate', DEFAULT_BAUD)) + log.info(f"[{self.link_id}] Config loaded: baud={self.baud}") + return True + + log.debug(f"[{self.link_id}] Config not found, retrying in {RETRY_INTERVAL}s...") + time.sleep(RETRY_INTERVAL) + + return False + + def _wait_for_device(self) -> bool: + """Phase 3: Wait for device to exist""" + log.info(f"[{self.link_id}] Phase 3: Waiting for device {self.device_path}...") + + while self.running: + if os.path.exists(self.device_path): + log.info(f"[{self.link_id}] Device {self.device_path} is ready") + return True + + log.debug(f"[{self.link_id}] Device not found, retrying in {RETRY_INTERVAL}s...") + time.sleep(RETRY_INTERVAL) + + return False + + def _wait_for_ptm(self) -> bool: + """Phase 4: Wait for PTM symlink to exist""" + log.info(f"[{self.link_id}] Phase 4: Waiting for PTM {self.ptm_path}...") + + while self.running: + if os.path.exists(self.ptm_path): + log.info(f"[{self.link_id}] PTM {self.ptm_path} is ready") + return True + + log.debug(f"[{self.link_id}] PTM not found, retrying in {RETRY_INTERVAL}s...") + time.sleep(RETRY_INTERVAL) + + return False + + def _initialize(self) -> bool: + """Phase 5 init: open serial port, PTM, connect to Redis""" + try: + # Connect to STATE_DB + self.state_db = DBConnector("STATE_DB", 0) + self.state_table = Table(self.state_db, CONSOLE_PORT_TABLE) + + # Create wakeup pipe for signal handling + self._wake_r, self._wake_w = os.pipe() + set_nonblocking(self._wake_r) + + # Open serial port + self.ser_fd = os.open(self.device_path, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) + configure_serial(self.ser_fd, self.baud) + + # Open PTM + self.ptm_fd = os.open(self.ptm_path, os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) + + # Create frame filter + self.filter = FrameFilter( + on_frame=self._on_frame_received, + on_user_data=self._on_user_data_received, + ) + + self._last_heartbeat_time = time.monotonic() + self._last_data_activity = time.monotonic() + + log.info(f"[{self.link_id}] Initialized: {self.device_path} <-> {self.ptm_path}") + return True + + except Exception as e: + log.error(f"[{self.link_id}] Failed to initialize: {e}") + return False + + def _run_loop(self) -> None: + """Phase 5 main loop: select() to handle serial and PTM data""" + filter_timeout = calculate_filter_timeout(self.baud) + + while self.running: + try: + # Calculate select timeout + now = time.monotonic() + time_since_heartbeat = now - self._last_heartbeat_time + select_timeout = max(0.1, HEARTBEAT_TIMEOUT - time_since_heartbeat) + + # If filter has pending data, consider filter timeout + if self.filter and self.filter.has_pending_data(): + time_since_serial = now - self._last_serial_data_time + remaining_filter_timeout = filter_timeout - time_since_serial + if remaining_filter_timeout > 0: + select_timeout = min(select_timeout, remaining_filter_timeout) + else: + select_timeout = 0 + + # Use select to monitor serial port, PTM, and wakeup pipe + readable, _, _ = select.select( + [self.ser_fd, self.ptm_fd, self._wake_r], + [], [], + select_timeout + ) + + if not self.running: + break + + serial_data_received = False + for fd in readable: + if fd == self.ser_fd: + self._on_serial_read() + serial_data_received = True + elif fd == self.ptm_fd: + self._on_ptm_read() + elif fd == self._wake_r: + # Clear wakeup pipe + try: + os.read(self._wake_r, 1024) + except OSError: + pass + + # Check heartbeat timeout + self._check_heartbeat_timeout() + + # Check filter timeout + if self.filter and self.filter.has_pending_data() and not serial_data_received: + now = time.monotonic() + if now - self._last_serial_data_time >= filter_timeout: + self.filter.on_timeout() + + except Exception as e: + if self.running: + log.error(f"[{self.link_id}] Loop error: {e}") + time.sleep(0.1) + + def _on_serial_read(self) -> None: + """Serial data read callback""" + if not self.running or not self.filter: + return + try: + data = os.read(self.ser_fd, 4096) + if data: + now = time.monotonic() + self._last_data_activity = now + self._last_serial_data_time = now + self.filter.process(data) + except (BlockingIOError, OSError): + pass + + def _on_ptm_read(self) -> None: + """PTM data read callback""" + if not self.running: + return + try: + data = os.read(self.ptm_fd, 4096) + if data: + os.write(self.ser_fd, data) + except (BlockingIOError, OSError): + pass + + def _on_frame_received(self, frame: Frame) -> None: + """Frame received callback""" + if frame.is_heartbeat(): + self._last_heartbeat_time = time.monotonic() + self._update_state("Up") + log.debug(f"[{self.link_id}] Heartbeat received (seq={frame.seq})") + else: + log.warning(f"[{self.link_id}] Unknown frame type: {frame.frame_type}") + + def _on_user_data_received(self, data: bytes) -> None: + """User data callback""" + if self.ptm_fd >= 0: + try: + os.write(self.ptm_fd, data) + except OSError: + pass + + def _check_heartbeat_timeout(self) -> None: + """Check heartbeat timeout""" + now = time.monotonic() + time_since_heartbeat = now - self._last_heartbeat_time + + if time_since_heartbeat >= HEARTBEAT_TIMEOUT: + # Check if there is data activity + time_since_data = now - self._last_data_activity + if time_since_data < HEARTBEAT_TIMEOUT: + # Data activity detected, reset heartbeat time and continue waiting + log.debug(f"[{self.link_id}] Heartbeat timeout but data activity detected") + self._last_heartbeat_time = now + return + + # No heartbeat and no data activity + self._update_state("Unknown") + self._last_heartbeat_time = now # Reset to avoid continuous triggering + + def _update_state(self, oper_state: str) -> None: + """Update Redis state (only when state changes)""" + if oper_state == self._current_oper_state: + return + + self._current_oper_state = oper_state + timestamp = str(int(time.time())) + + try: + self.state_table.set( + self.link_id, + [("oper_state", oper_state), ("last_state_change", timestamp)] + ) + log.info(f"[{self.link_id}] State: {oper_state}") + except Exception as e: + log.error(f"[{self.link_id}] Failed to update state: {e}") + + def _cleanup_state(self) -> None: + """Cleanup STATE_DB state""" + try: + if self.state_table: + self.state_table.hdel(self.link_id, "oper_state") + self.state_table.hdel(self.link_id, "last_state_change") + log.info(f"[{self.link_id}] STATE_DB cleaned up") + except Exception as e: + log.error(f"[{self.link_id}] Failed to cleanup STATE_DB: {e}") + + def _cleanup(self) -> None: + """Cleanup all resources""" + # Cleanup STATE_DB + self._cleanup_state() + + # Flush remaining data + if self.filter and self.ptm_fd >= 0: + remaining = self.filter.flush() + if remaining: + try: + os.write(self.ptm_fd, remaining) + except OSError: + pass + + # Close file descriptors + for fd in (self._wake_r, self._wake_w, self.ser_fd, self.ptm_fd): + if fd >= 0: + try: + os.close(fd) + except OSError: + pass + + self._wake_r = self._wake_w = -1 + self.ser_fd = self.ptm_fd = -1 + log.info(f"[{self.link_id}] Cleanup complete") + + def stop(self) -> None: + """Stop service (signal handler)""" + self.running = False + + # Wake up select loop + if self._wake_w >= 0: + try: + os.write(self._wake_w, b'x') + except OSError: + pass + + +# ============================================================ +# DCE Service (manages pty-bridge and proxy services via systemctl) +# ============================================================ + +class DCEService: + """ + DCE side main service: manages pty-bridge and proxy services via systemctl + + Uses ConfigDBConnector's subscribe/listen pattern to monitor CONFIG_DB changes, + following SONiC daemon conventions. + + Service management order: + - Start: pty-bridge first, then proxy + - Stop: proxy first, then pty-bridge + - Restart: stop both, then start both + """ + + def __init__(self): + self.config_db: Optional[ConfigDBConnector] = None + self.active_links: Set[str] = set() # Currently active link_ids + self.running: bool = False + + # Cache for detecting configuration changes + self._config_cache: Dict[str, dict] = {} + + def start(self) -> bool: + """Start service""" + try: + self.config_db = ConfigDBConnector() + self.config_db.connect(wait_for_init=True, retry_on=True) + log.info("DCE: ConfigDB connected") + + self.running = True + return True + + except Exception as e: + log.error(f"DCE: Failed to start: {e}") + return False + + def register_callbacks(self) -> None: + """Register CONFIG_DB change callbacks""" + + def make_callback(func): + def callback(table, key, data): + if data is None: + op = "DEL" + data = {} + else: + op = "SET" + return func(key, op, data) + return callback + + self.config_db.subscribe(CONSOLE_PORT_TABLE, + make_callback(self.console_port_handler)) + self.config_db.subscribe(CONSOLE_SWITCH_TABLE, + make_callback(self.console_switch_handler)) + + log.info("DCE: Callbacks registered") + + def run(self) -> None: + """Main loop: listen for CONFIG_DB changes""" + try: + self.config_db.listen(init_data_handler=self._load_initial_config) + except KeyboardInterrupt: + log.info("DCE: Received keyboard interrupt") + except Exception as e: + if self.running: + log.error(f"DCE: Listen error: {e}") + + def stop(self) -> None: + """Stop service""" + self.running = False + + # Stop all services (proxy first, then pty-bridge) + for link_id in list(self.active_links): + self._stop_link(link_id) + self.active_links.clear() + + log.info("DCE: Stopped") + + def _load_initial_config(self, init_data: dict) -> None: + """Load initial configuration""" + log.info(f"DCE: Loading initial config: {list(init_data.keys())}") + self._sync() + + def console_port_handler(self, key: str, op: str, data: dict) -> None: + """CONSOLE_PORT table change handler""" + log.info(f"DCE: CONSOLE_PORT change: key={key}, op={op}, data={data}") + self._sync() + + def console_switch_handler(self, key: str, op: str, data: dict) -> None: + """CONSOLE_SWITCH table change handler""" + log.info(f"DCE: CONSOLE_SWITCH change: key={key}, op={op}, data={data}") + self._sync() + + def _check_feature_enabled(self) -> bool: + """Check if console switch feature is enabled""" + try: + entry = self.config_db.get_entry(CONSOLE_SWITCH_TABLE, "console_mgmt") + if entry: + if entry.get("enabled", "") == "yes": + return True + log.warning("DCE: Console switch feature is disabled") + return False + except Exception as e: + log.error(f"DCE: Failed to check feature status: {e}") + return False + + def _get_all_configs(self) -> Dict[str, dict]: + """Get all serial port configurations""" + configs = {} + try: + table_data = self.config_db.get_table(CONSOLE_PORT_TABLE) + for key, entry in table_data.items(): + key_str = str(key) if not isinstance(key, str) else key + configs[key_str] = { + "baud": int(entry.get("baud_rate", DEFAULT_BAUD)), + } + except Exception as e: + log.error(f"DCE: Failed to get configs: {e}") + return configs + + def _sync(self) -> None: + """Sync services with CONFIG_DB""" + # Check if feature is enabled + if not self._check_feature_enabled(): + if self.active_links: + log.info("DCE: Feature disabled, stopping all services") + for link_id in list(self.active_links): + self._stop_link(link_id) + self.active_links.clear() + self._config_cache.clear() + return + + # Get configuration + redis_configs = self._get_all_configs() + redis_ids = set(redis_configs.keys()) + current_ids = self.active_links.copy() + + # Stop links not in configuration + for link_id in current_ids - redis_ids: + self._stop_link(link_id) + self.active_links.discard(link_id) + self._config_cache.pop(link_id, None) + + # Start new links + for link_id in redis_ids - current_ids: + if self._start_link(link_id): + self.active_links.add(link_id) + self._config_cache[link_id] = redis_configs[link_id] + + # Restart links with changed configuration (e.g., baud rate) + for link_id in redis_ids & current_ids: + new_config = redis_configs[link_id] + old_config = self._config_cache.get(link_id, {}) + if new_config != old_config: + log.info(f"DCE: [{link_id}] Config changed: {old_config} -> {new_config}") + self._restart_link(link_id) + self._config_cache[link_id] = new_config + + log.info(f"DCE: Sync complete, {len(self.active_links)} links active") + + def _start_link(self, link_id: str) -> bool: + """Start pty-bridge and proxy for a link (pty-bridge first, then proxy)""" + log.info(f"DCE: [{link_id}] Starting services...") + + # Start pty-bridge first + if not self._start_pty_bridge(link_id): + log.error(f"DCE: [{link_id}] Failed to start pty-bridge") + return False + + # Then start proxy + if not self._start_proxy(link_id): + log.error(f"DCE: [{link_id}] Failed to start proxy, stopping pty-bridge") + self._stop_pty_bridge(link_id) + return False + + log.info(f"DCE: [{link_id}] All services started") + return True + + def _stop_link(self, link_id: str) -> bool: + """Stop proxy and pty-bridge for a link (proxy first, then pty-bridge)""" + log.info(f"DCE: [{link_id}] Stopping services...") + + # Stop proxy first + self._stop_proxy(link_id) + + # Then stop pty-bridge + self._stop_pty_bridge(link_id) + + log.info(f"DCE: [{link_id}] All services stopped") + return True + + def _restart_link(self, link_id: str) -> bool: + """Restart all services for a link""" + log.info(f"DCE: [{link_id}] Restarting services...") + + # Stop both (proxy first, then pty-bridge) + self._stop_proxy(link_id) + self._stop_pty_bridge(link_id) + + # Start both (pty-bridge first, then proxy) + if not self._start_pty_bridge(link_id): + return False + if not self._start_proxy(link_id): + self._stop_pty_bridge(link_id) + return False + + log.info(f"DCE: [{link_id}] All services restarted") + return True + + def _start_pty_bridge(self, link_id: str) -> bool: + """Start a pty-bridge service via systemctl""" + service_name = PTY_BRIDGE_SERVICE_TEMPLATE.format(link_id) + try: + result = subprocess.run( + ['systemctl', 'start', service_name], + capture_output=True, + text=True, + timeout=30 + ) + if result.returncode == 0: + log.info(f"DCE: [{link_id}] PTY bridge service started") + return True + else: + log.error(f"DCE: [{link_id}] Failed to start pty-bridge: {result.stderr}") + return False + except subprocess.TimeoutExpired: + log.error(f"DCE: [{link_id}] Timeout starting pty-bridge") + return False + except Exception as e: + log.error(f"DCE: [{link_id}] Error starting pty-bridge: {e}") + return False + + def _stop_pty_bridge(self, link_id: str) -> bool: + """Stop a pty-bridge service via systemctl""" + service_name = PTY_BRIDGE_SERVICE_TEMPLATE.format(link_id) + try: + result = subprocess.run( + ['systemctl', 'stop', service_name], + capture_output=True, + text=True, + timeout=30 + ) + if result.returncode == 0: + log.info(f"DCE: [{link_id}] PTY bridge service stopped") + return True + else: + log.error(f"DCE: [{link_id}] Failed to stop pty-bridge: {result.stderr}") + return False + except subprocess.TimeoutExpired: + log.error(f"DCE: [{link_id}] Timeout stopping pty-bridge") + return False + except Exception as e: + log.error(f"DCE: [{link_id}] Error stopping pty-bridge: {e}") + return False + + def _start_proxy(self, link_id: str) -> bool: + """Start a proxy service via systemctl""" + service_name = PROXY_SERVICE_TEMPLATE.format(link_id) + try: + result = subprocess.run( + ['systemctl', 'start', service_name], + capture_output=True, + text=True, + timeout=30 + ) + if result.returncode == 0: + log.info(f"DCE: [{link_id}] Proxy service started") + return True + else: + log.error(f"DCE: [{link_id}] Failed to start proxy: {result.stderr}") + return False + except subprocess.TimeoutExpired: + log.error(f"DCE: [{link_id}] Timeout starting proxy") + return False + except Exception as e: + log.error(f"DCE: [{link_id}] Error starting proxy: {e}") + return False + + def _stop_proxy(self, link_id: str) -> bool: + """Stop a proxy service via systemctl""" + service_name = PROXY_SERVICE_TEMPLATE.format(link_id) + try: + result = subprocess.run( + ['systemctl', 'stop', service_name], + capture_output=True, + text=True, + timeout=30 + ) + if result.returncode == 0: + log.info(f"DCE: [{link_id}] Proxy service stopped") + return True + else: + log.error(f"DCE: [{link_id}] Failed to stop proxy: {result.stderr}") + return False + except subprocess.TimeoutExpired: + log.error(f"DCE: [{link_id}] Timeout stopping proxy") + return False + except Exception as e: + log.error(f"DCE: [{link_id}] Error stopping proxy: {e}") + return False + + +# ============================================================ +# DTE Service +# ============================================================ + +class DTEService: + """ + DTE side service: sends heartbeat frames + """ + + def __init__(self, tty_name: str, baud: int): + self.tty_name = tty_name + self.baud = baud + self.device_path = f"/dev/{tty_name}" + + self.config_db: Optional[ConfigDBConnector] = None + + self.running: bool = False + self.enabled: bool = False + self.seq: int = 0 + + self._heartbeat_thread: Optional[threading.Thread] = None + self._heartbeat_stop: threading.Event = threading.Event() + + def start(self) -> bool: + """Start service""" + try: + self.config_db = ConfigDBConnector() + self.config_db.connect(wait_for_init=True, retry_on=True) + log.info("DTE: ConfigDB connected") + + self.running = True + log.info(f"DTE: Service initialized: {self.device_path}") + return True + + except Exception as e: + log.error(f"DTE: Failed to start: {e}") + return False + + def register_callbacks(self) -> None: + """Register CONFIG_DB change callbacks""" + + def make_callback(func): + def callback(table, key, data): + if data is None: + op = "DEL" + data = {} + else: + op = "SET" + return func(key, op, data) + return callback + + self.config_db.subscribe(CONSOLE_SWITCH_TABLE, + make_callback(self.console_switch_handler)) + + log.info("DTE: Callbacks registered") + + def run(self) -> None: + """Main loop""" + try: + self.config_db.listen(init_data_handler=self._load_initial_config) + except KeyboardInterrupt: + log.info("DTE: Received keyboard interrupt") + except Exception as e: + if self.running: + log.error(f"DTE: Listen error: {e}") + + def stop(self) -> None: + """Stop service""" + self.running = False + self._stop_heartbeat() + log.info("DTE: Stopped") + + def _load_initial_config(self, init_data: dict) -> None: + """Load initial configuration""" + log.info(f"DTE: Loading initial config: {list(init_data.keys())}") + + self.enabled = self._check_enabled() + log.info(f"DTE: Initial enabled state: {self.enabled}") + + if self.enabled: + self._start_heartbeat() + + def console_switch_handler(self, key: str, op: str, data: dict) -> None: + """CONSOLE_SWITCH table change handler""" + log.info(f"DTE: CONSOLE_SWITCH change: key={key}, op={op}") + + new_enabled = self._check_enabled() + if new_enabled != self.enabled: + log.info(f"DTE: Enabled state changed: {self.enabled} -> {new_enabled}") + self.enabled = new_enabled + + if self.enabled: + self._start_heartbeat() + else: + self._stop_heartbeat() + + def _check_enabled(self) -> bool: + """Check the enabled field of controlled_device""" + try: + entry = self.config_db.get_entry(CONSOLE_SWITCH_TABLE, "controlled_device") + if entry: + return entry.get("enabled", "") == "yes" + return False + except Exception as e: + log.warning(f"DTE: Failed to check enabled status: {e}") + return False + + def _start_heartbeat(self) -> None: + """Start heartbeat thread""" + if self._heartbeat_thread and self._heartbeat_thread.is_alive(): + return + + self._heartbeat_stop.clear() + self._heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True) + self._heartbeat_thread.start() + log.info("DTE: Heartbeat thread started") + + def _stop_heartbeat(self) -> None: + """Stop heartbeat thread""" + self._heartbeat_stop.set() + if self._heartbeat_thread and self._heartbeat_thread.is_alive(): + self._heartbeat_thread.join(timeout=2.0) + self._heartbeat_thread = None + log.info("DTE: Heartbeat thread stopped") + + def _heartbeat_loop(self) -> None: + """Heartbeat send loop""" + while not self._heartbeat_stop.is_set(): + self._send_heartbeat() + self._heartbeat_stop.wait(HEARTBEAT_INTERVAL) + + def _send_heartbeat(self) -> None: + """Send heartbeat frame""" + frame = Frame.create_heartbeat(self.seq) + frame_bytes = frame.build() + + try: + fd = os.open(self.device_path, os.O_WRONLY | os.O_NOCTTY | os.O_NONBLOCK) + try: + os.write(fd, frame_bytes) + log.debug(f"DTE: Sent heartbeat (seq={self.seq})") + log_binary_data(frame_bytes, "DTE→Serial") + self.seq = (self.seq + 1) % 256 + finally: + os.close(fd) + except Exception as e: + log.error(f"DTE: Failed to send heartbeat: {e}") + + +# ============================================================ +# Main Program Entry +# ============================================================ + +def signal_handler(signum, frame): + """Global signal handler""" + log.info(f"Received signal {signum}") + raise SystemExit(0) + + +def run_dce() -> int: + """DCE service entry point""" + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGHUP, signal_handler) + + service = DCEService() + + if not service.start(): + return EXIT_SERVICE_START_FAILED + + try: + service.register_callbacks() + service.run() + except SystemExit: + pass + finally: + service.stop() + + return EXIT_SUCCESS + + +def run_proxy(link_id: str) -> int: + """Proxy service entry point (runs as independent process)""" + service = ProxyService(link_id) + + # Setup signal handler to stop service gracefully + def stop_handler(signum, frame): + log.info(f"Received signal {signum}") + service.stop() + + signal.signal(signal.SIGINT, stop_handler) + signal.signal(signal.SIGTERM, stop_handler) + signal.signal(signal.SIGHUP, stop_handler) + + return service.run() + + +def run_dte(tty_name: Optional[str], baud: Optional[int]) -> int: + """DTE service entry point""" + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + signal.signal(signal.SIGHUP, signal_handler) + + if tty_name: + baud = baud if baud else DEFAULT_BAUD + log.info(f"DTE: Using command line args: tty={tty_name}, baud={baud}") + else: + try: + tty_name, baud = parse_proc_cmdline() + except ValueError as e: + log.error(f"DTE: Failed to get serial config: {e}") + return EXIT_SERIAL_CONFIG_ERROR + + service = DTEService(tty_name, baud) + + if not service.start(): + return EXIT_SERVICE_START_FAILED + + try: + service.register_callbacks() + service.run() + except SystemExit: + pass + finally: + service.stop() + + return EXIT_SUCCESS + + +def main(): + """ + Unified entry point + + Usage: + console-monitor pty-bridge # Run PTY bridge for a port + console-monitor dce [-l debug] # Run DCE service + console-monitor proxy [-l debug] # Run proxy for a specific port + console-monitor dte [-l debug] [tty] [baud] # Run DTE service + """ + # Create main parser with subcommands + parser = argparse.ArgumentParser( + description='Console Monitor Service', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=''' +Examples: + console-monitor pty-bridge 1 # Run PTY bridge for link 1 (exec socat) + console-monitor dce -l debug # Run DCE service with debug logging + console-monitor proxy 1 # Run proxy for link 1 + console-monitor proxy -l debug 2 # Run proxy for link 2 with debug logging + console-monitor dte # Run DTE service (auto-detect from /proc/cmdline) + console-monitor dte -l debug ttyS0 9600 # Run DTE service with specified TTY +''' + ) + + subparsers = parser.add_subparsers(dest='mode', help='Service mode') + + # PTY Bridge subcommand + pty_bridge_parser = subparsers.add_parser('pty-bridge', help='Run PTY bridge for a port (exec socat)') + pty_bridge_parser.add_argument('-l', '--log-level', + choices=['debug', 'info', 'warning', 'error', 'critical'], + default='info', help='Set log level (default: info)') + pty_bridge_parser.add_argument('link_id', help='Link ID (console port number)') + + # DCE subcommand + dce_parser = subparsers.add_parser('dce', help='Run DCE (Console Server) service') + dce_parser.add_argument('-l', '--log-level', + choices=['debug', 'info', 'warning', 'error', 'critical'], + default='info', help='Set log level (default: info)') + + # Proxy subcommand + proxy_parser = subparsers.add_parser('proxy', help='Run proxy for a specific serial port') + proxy_parser.add_argument('-l', '--log-level', + choices=['debug', 'info', 'warning', 'error', 'critical'], + default='info', help='Set log level (default: info)') + proxy_parser.add_argument('link_id', help='Link ID (console port number)') + + # DTE subcommand + dte_parser = subparsers.add_parser('dte', help='Run DTE (SONiC Switch) service') + dte_parser.add_argument('-l', '--log-level', + choices=['debug', 'info', 'warning', 'error', 'critical'], + default='info', help='Set log level (default: info)') + dte_parser.add_argument('tty_name', nargs='?', default=None, help='TTY device name') + dte_parser.add_argument('baud', nargs='?', type=int, default=None, help='Baud rate') + + args = parser.parse_args() + + if not args.mode: + parser.print_help() + sys.exit(EXIT_INVALID_MODE) + + # Set log level + set_log_level(args.log_level) + + # Dispatch to appropriate service + if args.mode == "pty-bridge": + sys.exit(run_pty_bridge(args.link_id)) + elif args.mode == "dce": + sys.exit(run_dce()) + elif args.mode == "proxy": + sys.exit(run_proxy(args.link_id)) + elif args.mode == "dte": + sys.exit(run_dte(args.tty_name, args.baud)) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 0b7252ed..62d28772 100644 --- a/setup.py +++ b/setup.py @@ -50,7 +50,8 @@ 'scripts/wait-for-sonic-core.sh', 'scripts/gnoi_shutdown_daemon.py', 'scripts/sonic-host-server', - 'scripts/ldap.py' + 'scripts/ldap.py', + 'scripts/console-monitor' ], install_requires = [ 'dbus-python', diff --git a/tests/console_monitor/__init__.py b/tests/console_monitor/__init__.py new file mode 100644 index 00000000..fcef1f8d --- /dev/null +++ b/tests/console_monitor/__init__.py @@ -0,0 +1 @@ +# Console Monitor (consoled) Tests diff --git a/tests/console_monitor/console_monitor_test.py b/tests/console_monitor/console_monitor_test.py new file mode 100644 index 00000000..edf928c2 --- /dev/null +++ b/tests/console_monitor/console_monitor_test.py @@ -0,0 +1,2858 @@ +""" +Unit tests for console-monitor (Console Monitor Service). + +Tests follow SONiC testing conventions: +- MockConfigDb for CONFIG_DB simulation +- Parameterized test cases +- pyfakefs for filesystem operations + +Test scenarios: +- DCE service initialization with multiple console links +- Configuration parsing and proxy creation +- Feature enable/disable handling +""" + +import os +import sys +import time +import copy +import termios +from unittest import TestCase, mock +from parameterized import parameterized + +try: + from sonic_py_common.general import load_module_from_source +except ImportError: + def load_module_from_source(module_name, file_path): + """ + This function will load the Python source file specified by + as a module named and return an instance of the module + """ + module = None + + # TODO: Remove this check once we no longer support Python 2 + if sys.version_info.major == 3: + import importlib.machinery + import importlib.util + loader = importlib.machinery.SourceFileLoader(module_name, file_path) + spec = importlib.util.spec_from_loader(loader.name, loader) + module = importlib.util.module_from_spec(spec) + loader.exec_module(module) + else: + import imp + module = imp.load_source(module_name, file_path) + + sys.modules[module_name] = module + + return module + +from .test_vectors import ( + DCE_TEST_VECTOR, + DTE_TEST_VECTOR, + DCE_3_LINKS_ENABLED_CONFIG_DB, + DCE_FEATURE_DISABLED_CONFIG_DB, + CONSOLE_PORT_3_LINKS, + DTE_ENABLED_CONFIG_DB, + DTE_DISABLED_CONFIG_DB, + PROC_CMDLINE_SINGLE_CONSOLE, + PROC_CMDLINE_MULTIPLE_CONSOLE, + PROC_CMDLINE_NO_BAUD, + PROC_CMDLINE_NO_CONSOLE, +) +from tests.common.mock_configdb import MockConfigDb, MockDBConnector + + +# ============================================================ +# Path setup and module loading +# ============================================================ + +test_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +modules_path = os.path.dirname(test_path) +scripts_path = os.path.join(modules_path, 'scripts') +sys.path.insert(0, modules_path) + +# Load console-monitor module from scripts directory +console_monitor_path = os.path.join(scripts_path, 'console-monitor') +console_monitor = load_module_from_source('console_monitor', console_monitor_path) + +# Replace swsscommon classes with mocks (redundant but kept for clarity) +console_monitor.ConfigDBConnector = MockConfigDb +console_monitor.DBConnector = MockDBConnector +console_monitor.Table = mock.Mock() + + +# ============================================================ +# Mock Classes for systemctl operations +# ============================================================ + +class MockSubprocess: + """Mock subprocess.run for systemctl commands.""" + + started_services = [] + stopped_services = [] + fail_start = False + fail_stop = False + + @classmethod + def reset(cls): + """Reset all tracking for test isolation.""" + cls.started_services = [] + cls.stopped_services = [] + cls.fail_start = False + cls.fail_stop = False + + @classmethod + def mock_run(cls, args, capture_output=False, text=False, timeout=None): + """Mock subprocess.run for systemctl commands.""" + result = mock.Mock() + result.returncode = 0 + result.stdout = "" + result.stderr = "" + + if len(args) >= 3 and args[0] == 'systemctl': + action = args[1] + service = args[2] + + if action == 'start': + if cls.fail_start: + result.returncode = 1 + result.stderr = "Failed to start" + else: + cls.started_services.append(service) + elif action == 'stop': + if cls.fail_stop: + result.returncode = 1 + result.stderr = "Failed to stop" + else: + cls.stopped_services.append(service) + + return result + + @classmethod + def get_started_count(cls) -> int: + """Get number of started services.""" + return len(cls.started_services) + + @classmethod + def get_stopped_count(cls) -> int: + """Get number of stopped services.""" + return len(cls.stopped_services) + + +# ============================================================ +# DCE Service Tests +# ============================================================ + +class TestDCEService(TestCase): + """Test cases for DCE (Console Server) service.""" + + @classmethod + def setUpClass(cls): + """Set up test fixtures for all tests in this class.""" + pass + + def setUp(self): + """Set up test fixtures for each test.""" + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def tearDown(self): + """Clean up after each test.""" + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def test_dce_service_initialization(self): + """Test DCE service basic initialization.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + + # Mock the start to avoid actual DB connections + with mock.patch.object(service, 'config_db', MockConfigDb()): + service.config_db = MockConfigDb() + service.running = True + + # Verify service can be created + self.assertIsNotNone(service) + self.assertEqual(service.active_links, set()) + + def test_dce_check_feature_enabled_when_enabled(self): + """Test _check_feature_enabled returns True when feature is enabled.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + result = service._check_feature_enabled() + + self.assertTrue(result) + + def test_dce_check_feature_enabled_when_disabled(self): + """Test _check_feature_enabled returns False when feature is disabled.""" + MockConfigDb.set_config_db(DCE_FEATURE_DISABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + result = service._check_feature_enabled() + + self.assertFalse(result) + + def test_dce_get_all_configs_parses_correctly(self): + """Test _get_all_configs correctly parses CONSOLE_PORT table.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + configs = service._get_all_configs() + + # Verify 3 ports are parsed + self.assertEqual(len(configs), 3) + + # Verify port 1 config (new format only has baud) + self.assertIn("1", configs) + self.assertEqual(configs["1"]["baud"], 9600) + + # Verify port 2 config + self.assertIn("2", configs) + self.assertEqual(configs["2"]["baud"], 115200) + + # Verify port 3 config + self.assertIn("3", configs) + self.assertEqual(configs["3"]["baud"], 9600) + + def test_dce_sync_starts_services_when_enabled(self): + """Test _sync starts pty-bridge and proxy services for each configured port when feature is enabled.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + # Replace subprocess.run with mock + with mock.patch('subprocess.run', MockSubprocess.mock_run): + service._sync() + + # Verify 3 links are active + self.assertEqual(len(service.active_links), 3) + + # Verify services were started (2 services per link: pty-bridge and proxy) + self.assertEqual(MockSubprocess.get_started_count(), 6) + + # Verify link IDs + self.assertIn("1", service.active_links) + self.assertIn("2", service.active_links) + self.assertIn("3", service.active_links) + + def test_dce_sync_starts_no_services_when_disabled(self): + """Test _sync starts no services when feature is disabled.""" + MockConfigDb.set_config_db(DCE_FEATURE_DISABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + # Replace subprocess.run with mock + with mock.patch('subprocess.run', MockSubprocess.mock_run): + service._sync() + + # Verify no links are active + self.assertEqual(len(service.active_links), 0) + self.assertEqual(MockSubprocess.get_started_count(), 0) + + def test_dce_sync_stops_services_when_port_deleted(self): + """Test _sync stops services when port is deleted from config.""" + # Use deepcopy to avoid modifying the original test vector + config_db = copy.deepcopy(DCE_3_LINKS_ENABLED_CONFIG_DB) + MockConfigDb.set_config_db(config_db) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + # First sync - create 3 links + with mock.patch('subprocess.run', MockSubprocess.mock_run): + service._sync() + self.assertEqual(len(service.active_links), 3) + + # Now remove port 2 from config (modifies the copy, not original) + del MockConfigDb.CONFIG_DB["CONSOLE_PORT"]["2"] + + # Reset mock counters + MockSubprocess.reset() + + # Second sync - should stop services for port 2 + service._sync() + + self.assertEqual(len(service.active_links), 2) + self.assertNotIn("2", service.active_links) + self.assertIn("1", service.active_links) + self.assertIn("3", service.active_links) + + # Verify stop was called for port 2 (2 services) + self.assertEqual(MockSubprocess.get_stopped_count(), 2) + + def test_dce_console_port_handler_triggers_sync(self): + """Test console_port_handler triggers _sync on config change.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + with mock.patch.object(service, '_sync') as mock_sync: + service.console_port_handler("1", "SET", {"baud_rate": "9600"}) + mock_sync.assert_called_once() + + def test_dce_console_switch_handler_triggers_sync(self): + """Test console_switch_handler triggers _sync on feature toggle.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + with mock.patch.object(service, '_sync') as mock_sync: + service.console_switch_handler("console_mgmt", "SET", {"enabled": "yes"}) + mock_sync.assert_called_once() + + def test_dce_receive_one_frame_splitted_in_two_reads(self): + """Test DCE service can receive a single frame split across two reads.""" + received_frames = [] + + def on_frame(frame): + received_frames.append(frame) + + filter = console_monitor.FrameFilter(on_frame=on_frame) + + # Create a heartbeat frame + heartbeat = console_monitor.Frame.create_heartbeat(seq=10) + frame_bytes = heartbeat.build() + + # Split the frame into two parts + split_index = len(frame_bytes) // 2 + part1 = frame_bytes[:split_index] + part2 = frame_bytes[split_index:] + + # Process first part + filter.process(part1) + self.assertEqual(len(received_frames), 0) # No complete frame yet + + # Process second part + filter.process(part2) + self.assertEqual(len(received_frames), 1) # Now we should have one frame + self.assertTrue(received_frames[0].is_heartbeat()) + self.assertEqual(received_frames[0].seq, 10) + + +# ============================================================ +# Frame Protocol Tests +# ============================================================ + +class TestFrameProtocol(TestCase): + """Test cases for frame protocol implementation.""" + + def test_crc16_modbus(self): + """Test CRC-16/MODBUS calculation.""" + # Known test vector + data = bytes([0x01, 0x00, 0x00, 0x01, 0x00]) + crc = console_monitor.crc16_modbus(data) + + # CRC should be a 16-bit value + self.assertIsInstance(crc, int) + self.assertGreaterEqual(crc, 0) + self.assertLessEqual(crc, 0xFFFF) + + def test_escape_data_escapes_special_chars(self): + """Test escape_data escapes SOF, EOF, and DLE characters.""" + # Data containing special characters + data = bytes([0x05, 0x00, 0x10, 0x41]) # SOF, EOF, DLE, 'A' + + escaped = console_monitor.escape_data(data) + + # Each special char should be preceded by DLE + # Expected: DLE SOF DLE EOF DLE DLE A + self.assertEqual(len(escaped), 7) + + def test_unescape_data_restores_original(self): + """Test unescape_data restores original data.""" + original = bytes([0x05, 0x00, 0x10, 0x41]) + escaped = console_monitor.escape_data(original) + unescaped = console_monitor.unescape_data(escaped) + + self.assertEqual(unescaped, original) + + def test_frame_build_creates_valid_frame(self): + """Test Frame.build() creates properly formatted frame.""" + frame = console_monitor.Frame.create_heartbeat(seq=1) + frame_bytes = frame.build() + + # Frame should start with SOF sequence + self.assertEqual(frame_bytes[:3], console_monitor.SOF_SEQUENCE) + + # Frame should end with EOF sequence + self.assertEqual(frame_bytes[-3:], console_monitor.EOF_SEQUENCE) + + def test_frame_parse_roundtrip(self): + """Test Frame can be built and parsed back.""" + original = console_monitor.Frame.create_heartbeat(seq=42) + frame_bytes = original.build() + + # Extract content between SOF and EOF + content = frame_bytes[3:-3] + + parsed = console_monitor.Frame.parse(content) + + self.assertIsNotNone(parsed) + self.assertEqual(parsed.seq, 42) + self.assertTrue(parsed.is_heartbeat()) + + def test_frame_parse_rejects_bad_crc(self): + """Test Frame.parse() rejects frame with bad CRC.""" + frame = console_monitor.Frame.create_heartbeat(seq=1) + frame_bytes = frame.build() + + # Corrupt the content (between SOF and EOF) + content = bytearray(frame_bytes[3:-3]) + content[0] ^= 0xFF # Flip bits + + parsed = console_monitor.Frame.parse(bytes(content)) + + self.assertIsNone(parsed) + + +# ============================================================ +# FrameFilter Tests +# ============================================================ + +class TestFrameFilter(TestCase): + """Test cases for FrameFilter class.""" + + def test_frame_filter_detects_heartbeat(self): + """Test FrameFilter correctly identifies heartbeat frame.""" + received_frames = [] + + def on_frame(frame): + received_frames.append(frame) + + filter = console_monitor.FrameFilter(on_frame=on_frame) + + # Build a heartbeat frame + heartbeat = console_monitor.Frame.create_heartbeat(seq=5) + frame_bytes = heartbeat.build() + + # Feed to filter + filter.process(frame_bytes) + + # Should have received one frame + self.assertEqual(len(received_frames), 1) + self.assertTrue(received_frames[0].is_heartbeat()) + self.assertEqual(received_frames[0].seq, 5) + + def test_frame_filter_passes_user_data(self): + """Test FrameFilter passes non-frame data to user_data callback.""" + user_data_chunks = [] + + def on_user_data(data): + user_data_chunks.append(data) + + filter = console_monitor.FrameFilter(on_user_data=on_user_data) + + # Send regular ASCII data + filter.process(b"Hello World") + filter.on_timeout() # Flush pending data + + # Should have received user data + self.assertEqual(len(user_data_chunks), 1) + self.assertEqual(user_data_chunks[0], b"Hello World") + + def test_frame_filter_separates_frame_and_data(self): + """Test FrameFilter correctly separates frame from user data.""" + received_frames = [] + user_data_chunks = [] + + def on_frame(frame): + received_frames.append(frame) + + def on_user_data(data): + user_data_chunks.append(data) + + filter = console_monitor.FrameFilter(on_frame=on_frame, on_user_data=on_user_data) + + # Build mixed data: user data + heartbeat + user data + heartbeat = console_monitor.Frame.create_heartbeat(seq=1) + mixed_data = b"Before" + heartbeat.build() + b"After" + + filter.process(mixed_data) + filter.on_timeout() + + # Should have received one frame + self.assertEqual(len(received_frames), 1) + + # Should have received user data (before the frame) + self.assertGreater(len(user_data_chunks), 0) + + +# ============================================================ +# DTE Service Tests +# ============================================================ + +class TestDTEService(TestCase): + """Test cases for DTE (SONiC Switch) service.""" + + def setUp(self): + """Set up test fixtures for each test.""" + MockConfigDb.CONFIG_DB = None + + def tearDown(self): + """Clean up after each test.""" + MockConfigDb.CONFIG_DB = None + + def test_dte_service_initialization(self): + """Test DTE service can be initialized with TTY and baud.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + + self.assertEqual(service.tty_name, "ttyS0") + self.assertEqual(service.baud, 9600) + self.assertEqual(service.device_path, "/dev/ttyS0") + self.assertFalse(service.running) + self.assertFalse(service.enabled) + self.assertEqual(service.seq, 0) + + def test_dte_check_enabled_returns_true(self): + """Test _check_enabled() returns True when controlled_device.enabled=yes.""" + MockConfigDb.set_config_db(DTE_ENABLED_CONFIG_DB) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = MockConfigDb() + + result = service._check_enabled() + + self.assertTrue(result) + + def test_dte_check_enabled_returns_false(self): + """Test _check_enabled() returns False when controlled_device.enabled=no.""" + MockConfigDb.set_config_db(DTE_DISABLED_CONFIG_DB) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = MockConfigDb() + + result = service._check_enabled() + + self.assertFalse(result) + + def test_dte_check_enabled_returns_false_when_missing(self): + """Test _check_enabled() returns False when controlled_device entry is missing.""" + MockConfigDb.set_config_db({"CONSOLE_SWITCH": {}}) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = MockConfigDb() + + result = service._check_enabled() + + self.assertFalse(result) + + def test_dte_start_heartbeat_when_enabled(self): + """Test heartbeat thread starts when feature is enabled.""" + MockConfigDb.set_config_db(DTE_ENABLED_CONFIG_DB) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = MockConfigDb() + service.ser_fd = 1 # Mock file descriptor + service.running = True + + # Call _load_initial_config which should start heartbeat if enabled + with mock.patch.object(service, '_start_heartbeat') as mock_start: + service._load_initial_config({}) + mock_start.assert_called_once() + + def test_dte_no_heartbeat_when_disabled(self): + """Test heartbeat thread does not start when feature is disabled.""" + MockConfigDb.set_config_db(DTE_DISABLED_CONFIG_DB) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = MockConfigDb() + service.ser_fd = 1 + service.running = True + + with mock.patch.object(service, '_start_heartbeat') as mock_start: + service._load_initial_config({}) + mock_start.assert_not_called() + + def test_dte_stop_heartbeat_when_disabled(self): + """Test heartbeat thread stops when feature is disabled.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.enabled = True # Currently enabled + + # Mock config change to disabled + MockConfigDb.set_config_db(DTE_DISABLED_CONFIG_DB) + service.config_db = MockConfigDb() + + with mock.patch.object(service, '_stop_heartbeat') as mock_stop: + service.console_switch_handler("controlled_device", "SET", {"enabled": "no"}) + mock_stop.assert_called_once() + + def test_dte_console_switch_handler_toggles_heartbeat(self): + """Test console_switch_handler toggles heartbeat on/off based on config.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.enabled = False # Currently disabled + + # Mock config change to enabled + MockConfigDb.set_config_db(DTE_ENABLED_CONFIG_DB) + service.config_db = MockConfigDb() + + with mock.patch.object(service, '_start_heartbeat') as mock_start: + service.console_switch_handler("controlled_device", "SET", {"enabled": "yes"}) + mock_start.assert_called_once() + self.assertTrue(service.enabled) + + def test_dte_heartbeat_frame_sequence_increments(self): + """Test heartbeat sequence number increments correctly.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.ser_fd = -1 # Invalid fd, will skip actual write + service.seq = 0 + + # Manually increment sequence like _send_heartbeat does + initial_seq = service.seq + service.seq = (service.seq + 1) % 256 + + self.assertEqual(initial_seq, 0) + self.assertEqual(service.seq, 1) + + def test_dte_heartbeat_sequence_wraps_at_256(self): + """Test heartbeat sequence number wraps at 256.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.seq = 255 + + # Wrap around + service.seq = (service.seq + 1) % 256 + + self.assertEqual(service.seq, 0) + + +# ============================================================ +# DTE Utility Function Tests +# ============================================================ + +class TestDTEUtilityFunctions(TestCase): + """Test cases for DTE utility functions like parse_proc_cmdline.""" + + def test_parse_proc_cmdline_single_console(self): + """Test parse_proc_cmdline with single console parameter.""" + with mock.patch('builtins.open', mock.mock_open(read_data=PROC_CMDLINE_SINGLE_CONSOLE)): + tty_name, baud = console_monitor.parse_proc_cmdline() + + self.assertEqual(tty_name, "ttyS0") + self.assertEqual(baud, 9600) + + def test_parse_proc_cmdline_multiple_console(self): + """Test parse_proc_cmdline uses last console parameter.""" + with mock.patch('builtins.open', mock.mock_open(read_data=PROC_CMDLINE_MULTIPLE_CONSOLE)): + tty_name, baud = console_monitor.parse_proc_cmdline() + + # Should use the last console= parameter + self.assertEqual(tty_name, "ttyS1") + self.assertEqual(baud, 115200) + + def test_parse_proc_cmdline_no_baud_uses_default(self): + """Test parse_proc_cmdline uses default baud when not specified.""" + with mock.patch('builtins.open', mock.mock_open(read_data=PROC_CMDLINE_NO_BAUD)): + tty_name, baud = console_monitor.parse_proc_cmdline() + + self.assertEqual(tty_name, "ttyS0") + self.assertEqual(baud, console_monitor.DEFAULT_BAUD) # 9600 + + def test_parse_proc_cmdline_no_console_raises_error(self): + """Test parse_proc_cmdline raises ValueError when no console parameter.""" + with mock.patch('builtins.open', mock.mock_open(read_data=PROC_CMDLINE_NO_CONSOLE)): + with self.assertRaises(ValueError) as context: + console_monitor.parse_proc_cmdline() + + self.assertIn("No console= parameter found", str(context.exception)) + + +# ============================================================ +# Integration-like Tests +# ============================================================ + +class TestDCEIntegration(TestCase): + """Integration-like tests for DCE service with mocked I/O.""" + + def setUp(self): + """Set up test fixtures.""" + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def tearDown(self): + """Clean up after tests.""" + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + @parameterized.expand(DCE_TEST_VECTOR) + def test_dce_service_creation(self, test_name, config_db, expected_link_count): + # Reset before each parameterized test + MockSubprocess.reset() + """Parameterized test for DCE service creation based on config.""" + MockConfigDb.set_config_db(config_db) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + with mock.patch('subprocess.run', MockSubprocess.mock_run): + service._sync() + + self.assertEqual( + len(service.active_links), + expected_link_count, + f"Expected {expected_link_count} links for {test_name}, got {len(service.active_links)}" + ) + + def test_dce_full_initialization_flow(self): + """Test complete DCE service initialization flow.""" + # Reset mocks for isolation + MockSubprocess.reset() + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + + # Mock all external dependencies + with mock.patch('subprocess.run', MockSubprocess.mock_run): + with mock.patch.object(MockConfigDb, 'connect'): + # Simulate start + service.config_db = MockConfigDb() + service.running = True + service.active_links = set() + service._config_cache = {} + + # Simulate initial config load (like init_data_handler) + service._load_initial_config({ + "CONSOLE_PORT": CONSOLE_PORT_3_LINKS, + "CONSOLE_SWITCH": {"console_mgmt": {"enabled": "yes"}} + }) + + # Verify 3 links active + self.assertEqual(len(service.active_links), 3) + # Each link has 2 services (pty-bridge and proxy) + self.assertEqual(MockSubprocess.get_started_count(), 6) + + +# ============================================================ +# ProxyService Tests +# ============================================================ + +class TestProxyService(TestCase): + """Test cases for ProxyService class.""" + + def test_proxy_service_initialization(self): + """Test ProxyService basic initialization.""" + proxy = console_monitor.ProxyService(link_id="1") + + self.assertEqual(proxy.link_id, "1") + self.assertEqual(proxy.baud, console_monitor.DEFAULT_BAUD) + self.assertEqual(proxy.device_path, "") + self.assertEqual(proxy.ptm_path, "") + self.assertFalse(proxy.running) + self.assertEqual(proxy.ser_fd, -1) + self.assertEqual(proxy.ptm_fd, -1) + + def test_proxy_service_calculate_filter_timeout(self): + """Test filter timeout calculation based on baud rate.""" + # At 9600 baud, char time = 10/9600 ≈ 0.00104s + # With 64 buffer and 3x multiplier: 0.00104 * 64 * 3 ≈ 0.2s + timeout_9600 = console_monitor.calculate_filter_timeout(9600) + self.assertGreater(timeout_9600, 0.01) + self.assertLess(timeout_9600, 0.5) + + # At 115200 baud, should be much smaller + timeout_115200 = console_monitor.calculate_filter_timeout(115200) + self.assertLess(timeout_115200, 0.05) + + # Higher baud = shorter timeout + self.assertGreater(timeout_9600, timeout_115200) + + def test_proxy_service_stop_without_start(self): + """Test ProxyService.stop() is safe when not started.""" + proxy = console_monitor.ProxyService(link_id="1") + + # Should not raise any exceptions + proxy.stop() + + self.assertFalse(proxy.running) + + +# ============================================================ +# FrameFilter Comprehensive Tests +# ============================================================ + +class TestFrameFilterComprehensive(TestCase): + """Comprehensive tests for FrameFilter class.""" + + def setUp(self): + """Set up test fixtures.""" + self.frames_received = [] + self.user_data_received = [] + + def on_frame(frame): + self.frames_received.append(frame) + + def on_user_data(data): + self.user_data_received.append(data) + + self.filter = console_monitor.FrameFilter( + on_frame=on_frame, + on_user_data=on_user_data + ) + + def test_frame_filter_flush_returns_buffer(self): + """Test flush() returns remaining buffer data.""" + # Add some data to buffer + self.filter.process(b"partial data") + + # Flush should return the data + result = self.filter.flush() + + self.assertEqual(result, b"partial data") + self.assertFalse(self.filter.has_pending_data()) + + def test_frame_filter_flush_clears_escape_state(self): + """Test flush() clears escape state.""" + # Process DLE without following byte + self.filter.process(bytes([console_monitor.SpecialChar.DLE])) + + self.assertTrue(self.filter.has_pending_data()) + + result = self.filter.flush() + + # Buffer should be cleared + self.assertFalse(self.filter.has_pending_data()) + self.assertFalse(self.filter.in_frame) + + def test_frame_filter_has_pending_data(self): + """Test has_pending_data() correctly reports buffer state.""" + self.assertFalse(self.filter.has_pending_data()) + + self.filter.process(b"test") + self.assertTrue(self.filter.has_pending_data()) + + self.filter.flush() + self.assertFalse(self.filter.has_pending_data()) + + def test_frame_filter_in_frame_property(self): + """Test in_frame property tracks frame state.""" + self.assertFalse(self.filter.in_frame) + + # Start a frame with SOF sequence (3 bytes) + self.filter.process(console_monitor.SOF_SEQUENCE) + self.assertTrue(self.filter.in_frame) + + # Complete the frame with EOF sequence + self.filter.process(console_monitor.EOF_SEQUENCE) + self.assertFalse(self.filter.in_frame) + + def test_frame_filter_timeout_flushes_user_data_outside_frame(self): + """Test on_timeout() flushes data as user data when not in frame.""" + self.filter.process(b"user input") + self.assertFalse(self.filter.in_frame) + + self.filter.on_timeout() + + # Data should be sent as user data + self.assertEqual(len(self.user_data_received), 1) + self.assertEqual(self.user_data_received[0], b"user input") + self.assertFalse(self.filter.has_pending_data()) + + def test_frame_filter_timeout_discards_incomplete_frame(self): + """Test on_timeout() discards incomplete frame data.""" + # Start a frame but don't complete it + self.filter.process(console_monitor.SOF_SEQUENCE + b"partial") + self.assertTrue(self.filter.in_frame) + + self.filter.on_timeout() + + # Incomplete frame should be discarded + self.assertFalse(self.filter.has_pending_data()) + self.assertFalse(self.filter.in_frame) + self.assertEqual(len(self.frames_received), 0) + + def test_frame_filter_handles_dle_escape_sequence(self): + """Test DLE escape sequence is properly handled.""" + # Build a frame with escaped DLE inside using proper SOF/EOF sequences + data = console_monitor.SOF_SEQUENCE + bytes([console_monitor.SpecialChar.DLE, console_monitor.SpecialChar.DLE]) + console_monitor.EOF_SEQUENCE + + self.filter.process(data) + + # Should have tried to parse as a frame + self.assertFalse(self.filter.in_frame) + + def test_frame_filter_multiple_frames_in_one_buffer(self): + """Test processing multiple complete frames in one call.""" + # Create two valid heartbeat frames + frame1 = console_monitor.Frame.create_heartbeat(1) + frame2 = console_monitor.Frame.create_heartbeat(2) + + combined = frame1.build() + frame2.build() + self.filter.process(combined) + + # Both frames should be received + self.assertEqual(len(self.frames_received), 2) + self.assertEqual(self.frames_received[0].seq, 1) + self.assertEqual(self.frames_received[1].seq, 2) + + def test_frame_filter_mixed_user_data_and_frames(self): + """Test mixed user data and frames are correctly separated.""" + # User data first + user_data = b"login: " + + # Then a heartbeat frame + frame = console_monitor.Frame.create_heartbeat(42) + + # Process together + self.filter.process(user_data) + self.filter.on_timeout() # Flush user data + self.filter.process(frame.build()) + + # Verify separation + self.assertEqual(len(self.user_data_received), 1) + self.assertEqual(self.user_data_received[0], user_data) + self.assertEqual(len(self.frames_received), 1) + self.assertEqual(self.frames_received[0].seq, 42) + + def test_frame_filter_buffer_overflow_flushes_user_data(self): + """Test buffer overflow triggers flush for user data.""" + # Send more data than MAX_FRAME_BUFFER_SIZE + large_data = b"x" * (console_monitor.MAX_FRAME_BUFFER_SIZE + 100) + + self.filter.process(large_data) + + # Should have flushed as user data + self.assertGreater(len(self.user_data_received), 0) + + +# ============================================================ +# Utility Function Tests +# ============================================================ + +class TestUtilityFunctions(TestCase): + """Test cases for utility functions.""" + + def test_set_nonblocking(self): + """Test set_nonblocking sets O_NONBLOCK flag.""" + # Create a pipe for testing + r_fd, w_fd = os.pipe() + + try: + # Get initial flags + initial_flags = fcntl.fcntl(r_fd, fcntl.F_GETFL) + self.assertFalse(initial_flags & os.O_NONBLOCK) + + # Set non-blocking + console_monitor.set_nonblocking(r_fd) + + # Verify flag is set + new_flags = fcntl.fcntl(r_fd, fcntl.F_GETFL) + self.assertTrue(new_flags & os.O_NONBLOCK) + finally: + os.close(r_fd) + os.close(w_fd) + + def test_get_udev_prefix_default(self): + """Test get_udev_prefix returns default when file not found.""" + with mock.patch.dict('sys.modules', {'sonic_py_common': None}): + # When sonic_py_common not available, should return default + with mock.patch.object(console_monitor, 'get_udev_prefix', return_value="ttyUSB"): + result = console_monitor.get_udev_prefix() + self.assertEqual(result, "ttyUSB") + + def test_configure_serial_with_pty(self): + """Test configure_serial configures PTY (simulating serial port).""" + # Create a PTY pair for testing + master, slave = os.openpty() + + try: + # Should not raise any exceptions + console_monitor.configure_serial(master, 9600) + + # Verify settings were applied + attrs = termios.tcgetattr(master) + + # Check that raw mode settings are applied + # ECHO should be off + self.assertFalse(attrs[3] & termios.ECHO) + finally: + os.close(master) + os.close(slave) + + def test_configure_serial_with_different_bauds(self): + """Test configure_serial with different baud rates.""" + master, slave = os.openpty() + + try: + for baud in [9600, 19200, 38400, 57600, 115200]: + console_monitor.configure_serial(master, baud) + + attrs = termios.tcgetattr(master) + expected_speed = console_monitor.BAUD_MAP.get(baud, termios.B9600) + self.assertEqual(attrs[4], expected_speed) + self.assertEqual(attrs[5], expected_speed) + finally: + os.close(master) + os.close(slave) + + def test_configure_pty(self): + """Test configure_pty sets raw mode and disables echo.""" + master, slave = os.openpty() + + try: + console_monitor.configure_pty(master) + + attrs = termios.tcgetattr(master) + + # ECHO should be off + self.assertFalse(attrs[3] & termios.ECHO) + # ECHONL should be off + self.assertFalse(attrs[3] & termios.ECHONL) + finally: + os.close(master) + os.close(slave) + + def test_crc16_modbus(self): + """Test CRC16 MODBUS calculation.""" + # Known test vector + result = console_monitor.crc16_modbus(b"\x01\x02\x03") + self.assertIsInstance(result, int) + self.assertGreaterEqual(result, 0) + self.assertLessEqual(result, 0xFFFF) + + # Same input should give same CRC + result2 = console_monitor.crc16_modbus(b"\x01\x02\x03") + self.assertEqual(result, result2) + + # Different input should give different CRC + result3 = console_monitor.crc16_modbus(b"\x01\x02\x04") + self.assertNotEqual(result, result3) + + def test_escape_data(self): + """Test escape_data properly escapes special characters.""" + # Data with SOF character + sof = console_monitor.SpecialChar.SOF + data = bytes([0x01, sof, 0x02]) + + escaped = console_monitor.escape_data(data) + + # DLE should be inserted before SOF + self.assertIn(console_monitor.SpecialChar.DLE, escaped) + self.assertGreater(len(escaped), len(data)) + + def test_unescape_data(self): + """Test unescape_data reverses escape_data.""" + original = bytes([0x01, console_monitor.SpecialChar.SOF, 0x02]) + + escaped = console_monitor.escape_data(original) + unescaped = console_monitor.unescape_data(escaped) + + self.assertEqual(unescaped, original) + + def test_escape_unescape_roundtrip(self): + """Test escape/unescape roundtrip for various data.""" + test_cases = [ + b"", + b"normal data", + bytes([console_monitor.SpecialChar.SOF]), + bytes([console_monitor.SpecialChar.EOF]), + bytes([console_monitor.SpecialChar.DLE]), + bytes([console_monitor.SpecialChar.SOF, console_monitor.SpecialChar.EOF, console_monitor.SpecialChar.DLE]), + bytes(range(256)), + ] + + for original in test_cases: + escaped = console_monitor.escape_data(original) + unescaped = console_monitor.unescape_data(escaped) + self.assertEqual(unescaped, original, f"Roundtrip failed for {original!r}") + + +# ============================================================ +# ProxyService Runtime Tests +# ============================================================ + +class TestProxyServiceRuntime(TestCase): + """Tests for ProxyService runtime behavior.""" + + def test_proxy_service_update_state(self): + """Test _update_state updates Redis state.""" + state_table = mock.Mock() + + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = state_table + + proxy._update_state("Up") + + # Should call state_table.set + state_table.set.assert_called_once() + args = state_table.set.call_args + self.assertEqual(args[0][0], "1") # link_id + + # State should be tracked + self.assertEqual(proxy._current_oper_state, "Up") + + def test_proxy_service_update_state_only_on_change(self): + """Test _update_state only updates on state change.""" + state_table = mock.Mock() + + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = state_table + + # First update + proxy._update_state("Up") + self.assertEqual(state_table.set.call_count, 1) + + # Same state - should not update + proxy._update_state("Up") + self.assertEqual(state_table.set.call_count, 1) + + # Different state - should update + proxy._update_state("Unknown") + self.assertEqual(state_table.set.call_count, 2) + + def test_proxy_service_cleanup_state(self): + """Test _cleanup_state removes Redis entries.""" + state_table = mock.Mock() + + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = state_table + + proxy._cleanup_state() + + # Should call hdel for both fields + self.assertEqual(state_table.hdel.call_count, 2) + + def test_proxy_service_on_frame_received_heartbeat(self): + """Test _on_frame_received handles heartbeat frames.""" + state_table = mock.Mock() + + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = state_table + + frame = console_monitor.Frame.create_heartbeat(42) + + proxy._on_frame_received(frame) + + # Should update state to "Up" + self.assertEqual(proxy._current_oper_state, "Up") + + def test_proxy_service_on_user_data_received(self): + """Test _on_user_data_received writes to PTM.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.ptm_fd = 10 # Mock fd + + with mock.patch('os.write') as mock_write: + proxy._on_user_data_received(b"test data") + + mock_write.assert_called_once_with(10, b"test data") + + def test_proxy_service_check_heartbeat_timeout(self): + """Test _check_heartbeat_timeout detects timeout.""" + state_table = mock.Mock() + + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = state_table + + # Simulate heartbeat timeout + proxy._last_heartbeat_time = time.monotonic() - console_monitor.HEARTBEAT_TIMEOUT - 1 + proxy._last_data_activity = time.monotonic() - console_monitor.HEARTBEAT_TIMEOUT - 1 + + proxy._check_heartbeat_timeout() + + # Should set state to "Unknown" + self.assertEqual(proxy._current_oper_state, "Unknown") + + def test_proxy_service_check_heartbeat_timeout_with_data_activity(self): + """Test _check_heartbeat_timeout resets with data activity.""" + state_table = mock.Mock() + + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = state_table + + # Heartbeat timed out but recent data activity + proxy._last_heartbeat_time = time.monotonic() - console_monitor.HEARTBEAT_TIMEOUT - 1 + proxy._last_data_activity = time.monotonic() # Recent activity + + proxy._check_heartbeat_timeout() + + # Should not set state to "Unknown" because of data activity + self.assertNotEqual(proxy._current_oper_state, "Unknown") + + def test_proxy_service_run_loop_processes_split_frame(self): + """ + Test _run_loop correctly processes a frame split across two reads. + + This test simulates a real scenario where a heartbeat frame arrives + in two separate chunks through the serial port. + """ + import select as select_module + import threading + + state_table = mock.Mock() + frames_received = [] + + # Create proxy instance + proxy = console_monitor.ProxyService(link_id="test") + proxy.state_table = state_table + + # Create pipes to simulate ser_fd, ptm_fd, and wake pipe + ser_r, ser_w = os.pipe() # Simulate serial port + ptm_r, ptm_w = os.pipe() # Simulate PTM + wake_r, wake_w = os.pipe() # Wake pipe + + try: + # Set up proxy with our test file descriptors + proxy.ser_fd = ser_r + proxy.ptm_fd = ptm_r + proxy._wake_r = wake_r + proxy._wake_w = wake_w + proxy.running = True + proxy._last_heartbeat_time = time.monotonic() + proxy._last_data_activity = time.monotonic() + proxy._last_serial_data_time = time.monotonic() + + # Set non-blocking + console_monitor.set_nonblocking(ser_r) + console_monitor.set_nonblocking(ptm_r) + console_monitor.set_nonblocking(wake_r) + + # Create frame filter with callback to track received frames + def track_frame(frame): + frames_received.append(frame) + + proxy.filter = console_monitor.FrameFilter( + on_frame=track_frame, + on_user_data=lambda data: None, + ) + + # Build a heartbeat frame + heartbeat = console_monitor.Frame.create_heartbeat(seq=42) + frame_bytes = heartbeat.build() + + # Split the frame into two parts + split_point = len(frame_bytes) // 2 + part1 = frame_bytes[:split_point] + part2 = frame_bytes[split_point:] + + # Start the run loop in a separate thread + loop_thread = threading.Thread(target=proxy._run_loop, daemon=True) + loop_thread.start() + + # Give the loop time to start + time.sleep(0.05) + + # Write first part of frame to simulate serial read + os.write(ser_w, part1) + time.sleep(0.05) + + # Write second part of frame + os.write(ser_w, part2) + time.sleep(0.1) + + # Stop the loop + proxy.running = False + os.write(wake_w, b'x') # Wake up select + loop_thread.join(timeout=1.0) + + # Verify that the frame was correctly parsed despite being split + self.assertEqual(len(frames_received), 1, + f"Expected 1 frame, got {len(frames_received)}") + self.assertTrue(frames_received[0].is_heartbeat()) + self.assertEqual(frames_received[0].seq, 42) + + finally: + # Clean up file descriptors + for fd in (ser_r, ser_w, ptm_r, ptm_w, wake_r, wake_w): + try: + os.close(fd) + except: + pass + + +# ============================================================ +# Frame Protocol Extended Tests +# ============================================================ + +class TestFrameProtocolExtended(TestCase): + """Extended tests for Frame protocol.""" + + def test_frame_create_heartbeat_builds_valid_frame(self): + """Test create_heartbeat creates valid frame structure.""" + frame = console_monitor.Frame.create_heartbeat(100) + + self.assertEqual(frame.frame_type, console_monitor.FrameType.HEARTBEAT) + self.assertEqual(frame.seq, 100) + self.assertIsInstance(frame.payload, bytes) + + def test_frame_is_heartbeat_returns_true_for_heartbeat(self): + """Test is_heartbeat returns True for heartbeat frames.""" + frame = console_monitor.Frame.create_heartbeat(0) + self.assertTrue(frame.is_heartbeat()) + + def test_frame_is_heartbeat_returns_false_for_other_types(self): + """Test is_heartbeat returns False for non-heartbeat frames.""" + # Create a non-heartbeat frame manually with a different type value + frame = console_monitor.Frame( + frame_type=0x99, # Non-existent type + seq=0, + payload=b"" + ) + self.assertFalse(frame.is_heartbeat()) + + def test_frame_build_produces_framed_output(self): + """Test build() produces properly framed output.""" + frame = console_monitor.Frame.create_heartbeat(1) + output = frame.build() + + # Should start with SOF_SEQUENCE and end with EOF_SEQUENCE + self.assertTrue(output.startswith(console_monitor.SOF_SEQUENCE)) + self.assertTrue(output.endswith(console_monitor.EOF_SEQUENCE)) + + # Should contain escaped content + self.assertGreater(len(output), len(console_monitor.SOF_SEQUENCE) + len(console_monitor.EOF_SEQUENCE)) + + def test_frame_parse_roundtrip(self): + """Test frame can be built and parsed back.""" + original = console_monitor.Frame.create_heartbeat(42) + built = original.build() + + # Strip SOF/EOF for parsing content + content = built[len(console_monitor.SOF_SEQUENCE):-len(console_monitor.EOF_SEQUENCE)] + + # Unescape content using module function + unescaped = console_monitor.unescape_data(content) + + # Parse should work on the original built data + parsed = console_monitor.Frame.parse(content) + + self.assertIsNotNone(parsed) + self.assertEqual(parsed.seq, 42) + self.assertEqual(parsed.frame_type, console_monitor.FrameType.HEARTBEAT) + + def test_frame_crc_validation(self): + """Test CRC validation in frame parsing.""" + frame = console_monitor.Frame.create_heartbeat(1) + valid_data = frame.build() + + # Extract content without SOF/EOF + content = valid_data[len(console_monitor.SOF_SEQUENCE):-len(console_monitor.EOF_SEQUENCE)] + + # Valid content should parse + parsed = console_monitor.Frame.parse(content) + self.assertIsNotNone(parsed) + + def test_frame_sequence_full_range(self): + """Test frames work with full sequence number range.""" + for seq in [0, 1, 127, 128, 254, 255]: + frame = console_monitor.Frame.create_heartbeat(seq) + built = frame.build() + + # Extract content without SOF/EOF + content = built[len(console_monitor.SOF_SEQUENCE):-len(console_monitor.EOF_SEQUENCE)] + parsed = console_monitor.Frame.parse(content) + + self.assertIsNotNone(parsed, f"Failed to parse frame with seq={seq}") + self.assertEqual(parsed.seq, seq) + + +# ============================================================ +# DCE Service Extended Tests +# ============================================================ + +class TestDCEServiceExtended(TestCase): + """Extended tests for DCE service.""" + + def setUp(self): + """Set up test fixtures.""" + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def tearDown(self): + """Clean up after tests.""" + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def test_dce_sync_adds_new_link(self): + """Test _sync adds services for new configuration.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + with mock.patch('subprocess.run', MockSubprocess.mock_run): + service._sync() + + self.assertEqual(len(service.active_links), 3) + self.assertIn("1", service.active_links) + self.assertIn("2", service.active_links) + self.assertIn("3", service.active_links) + + def test_dce_sync_removes_link_when_port_deleted(self): + """Test _sync removes services when port is deleted from config.""" + # Use deepcopy to avoid mutating shared config + initial_config = copy.deepcopy(DCE_3_LINKS_ENABLED_CONFIG_DB) + MockConfigDb.set_config_db(initial_config) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + with mock.patch('subprocess.run', MockSubprocess.mock_run): + # Initial sync - should create 3 links + service._sync() + self.assertEqual(len(service.active_links), 3) + + # Remove port 2 from config + del MockConfigDb.CONFIG_DB["CONSOLE_PORT"]["2"] + + # Reset counters + MockSubprocess.reset() + + # Sync again - should stop services for port 2 + service._sync() + + self.assertEqual(len(service.active_links), 2) + self.assertNotIn("2", service.active_links) + self.assertIn("1", service.active_links) + self.assertIn("3", service.active_links) + + def test_dce_sync_restarts_link_on_baud_change(self): + """Test _sync restarts services when baud rate changes.""" + initial_config = copy.deepcopy(DCE_3_LINKS_ENABLED_CONFIG_DB) + MockConfigDb.set_config_db(initial_config) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + with mock.patch('subprocess.run', MockSubprocess.mock_run): + service._sync() + + # Verify initial state + self.assertEqual(service._config_cache["1"]["baud"], 9600) + + # Change baud rate for port 1 + MockConfigDb.CONFIG_DB["CONSOLE_PORT"]["1"]["baud_rate"] = "115200" + + # Reset counters + MockSubprocess.reset() + + service._sync() + + # Config cache should be updated + self.assertEqual(service._config_cache["1"]["baud"], 115200) + + # Should have stopped and started services (2 stop + 2 start) + self.assertGreater(MockSubprocess.get_stopped_count(), 0) + self.assertGreater(MockSubprocess.get_started_count(), 0) + + def test_dce_stop_stops_all_links(self): + """Test stop() stops all active links.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + service.running = True + + with mock.patch('subprocess.run', MockSubprocess.mock_run): + service._sync() + + self.assertEqual(len(service.active_links), 3) + + # Reset counters + MockSubprocess.reset() + + service.stop() + + self.assertFalse(service.running) + self.assertEqual(len(service.active_links), 0) + # Should have stopped 6 services (2 per link) + self.assertEqual(MockSubprocess.get_stopped_count(), 6) + + def test_dce_get_all_configs_parses_correctly(self): + """Test _get_all_configs returns properly formatted configs.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + configs = service._get_all_configs() + + self.assertEqual(len(configs), 3) + + # Check port 1 + self.assertIn("1", configs) + self.assertEqual(configs["1"]["baud"], 9600) + + # Check port 2 + self.assertIn("2", configs) + self.assertEqual(configs["2"]["baud"], 115200) + + def test_dce_console_port_handler_triggers_sync(self): + """Test console_port_handler triggers _sync.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + with mock.patch.object(service, '_sync') as mock_sync: + service.console_port_handler("1", "SET", {"baud_rate": "9600"}) + mock_sync.assert_called_once() + + def test_dce_console_switch_handler_triggers_sync(self): + """Test console_switch_handler triggers _sync.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.active_links = set() + service._config_cache = {} + + with mock.patch.object(service, '_sync') as mock_sync: + service.console_switch_handler("console_mgmt", "SET", {"enabled": "yes"}) + mock_sync.assert_called_once() + + +# ============================================================ +# DTE Service Extended Tests +# ============================================================ + +class TestDTEServiceExtended(TestCase): + """Extended tests for DTE service.""" + + def test_dte_send_heartbeat_increments_seq(self): + """Test _send_heartbeat increments sequence number.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.seq = 0 + + # Mock os.open, os.write, os.close for the new open-write-close pattern + with mock.patch('os.open', return_value=10): + with mock.patch('os.write') as mock_write: + with mock.patch('os.close'): + service._send_heartbeat() + + self.assertEqual(service.seq, 1) + mock_write.assert_called_once() + + def test_dte_send_heartbeat_wraps_seq(self): + """Test _send_heartbeat wraps sequence at 256.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.seq = 255 + + with mock.patch('os.open', return_value=10): + with mock.patch('os.write'): + with mock.patch('os.close'): + service._send_heartbeat() + + self.assertEqual(service.seq, 0) + + def test_dte_send_heartbeat_skips_invalid_fd(self): + """Test _send_heartbeat handles open failure gracefully.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.seq = 0 + + # Simulate os.open failure + with mock.patch('os.open', side_effect=OSError("Permission denied")): + with mock.patch('os.write') as mock_write: + service._send_heartbeat() + + mock_write.assert_not_called() + # Seq should not change on failure + self.assertEqual(service.seq, 0) + + def test_dte_stop_closes_serial_fd(self): + """Test stop() stops running and heartbeat.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.running = True + + with mock.patch.object(service, '_stop_heartbeat') as mock_stop_hb: + service.stop() + + mock_stop_hb.assert_called_once() + self.assertFalse(service.running) + + def test_dte_start_heartbeat_is_idempotent(self): + """Test _start_heartbeat doesn't create duplicate threads.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + + # Create a mock alive thread + mock_thread = mock.Mock() + mock_thread.is_alive.return_value = True + service._heartbeat_thread = mock_thread + + with mock.patch('threading.Thread') as mock_thread_class: + service._start_heartbeat() + + # Should not create a new thread + mock_thread_class.assert_not_called() + + def test_dte_stop_heartbeat_sets_stop_event(self): + """Test _stop_heartbeat sets the stop event.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + + # Start heartbeat first + service._heartbeat_stop.clear() + + # Create a mock thread + mock_thread = mock.Mock() + mock_thread.is_alive.return_value = True + service._heartbeat_thread = mock_thread + + service._stop_heartbeat() + + self.assertTrue(service._heartbeat_stop.is_set()) + mock_thread.join.assert_called_once() + + +# ============================================================ +# Main Entry Point Tests +# ============================================================ + +class TestMainEntryPoint(TestCase): + """Tests for main program entry points.""" + + def test_main_shows_usage_without_args(self): + """Test main shows usage when no arguments provided.""" + with mock.patch.object(sys, 'argv', ['console-monitor']): + with self.assertRaises(SystemExit) as context: + console_monitor.main() + + self.assertEqual(context.exception.code, console_monitor.EXIT_INVALID_MODE) + + def test_main_rejects_unknown_mode(self): + """Test main rejects unknown mode.""" + with mock.patch.object(sys, 'argv', ['console-monitor', 'invalid']): + with self.assertRaises(SystemExit) as context: + console_monitor.main() + + # argparse exits with code 2 for invalid subcommand + self.assertIn(context.exception.code, [2, console_monitor.EXIT_INVALID_MODE]) + + def test_run_dce_calls_service_methods(self): + """Test run_dce properly initializes and runs DCE service.""" + with mock.patch.object(console_monitor.DCEService, 'start', return_value=True): + with mock.patch.object(console_monitor.DCEService, 'register_callbacks'): + with mock.patch.object(console_monitor.DCEService, 'run', side_effect=SystemExit(0)): + with mock.patch.object(console_monitor.DCEService, 'stop'): + with mock.patch('signal.signal'): + result = console_monitor.run_dce() + + self.assertEqual(result, 0) + + def test_run_dce_returns_error_on_start_failure(self): + """Test run_dce returns EXIT_SERVICE_START_FAILED when start fails.""" + with mock.patch.object(console_monitor.DCEService, 'start', return_value=False): + with mock.patch('signal.signal'): + result = console_monitor.run_dce() + + self.assertEqual(result, console_monitor.EXIT_SERVICE_START_FAILED) + + def test_run_dte_with_cmdline_args(self): + """Test run_dte uses command line arguments when provided.""" + with mock.patch.object(console_monitor.DTEService, 'start', return_value=True): + with mock.patch.object(console_monitor.DTEService, 'register_callbacks'): + with mock.patch.object(console_monitor.DTEService, 'run', side_effect=SystemExit(0)): + with mock.patch.object(console_monitor.DTEService, 'stop'): + with mock.patch('signal.signal'): + result = console_monitor.run_dte("ttyS1", 115200) + + self.assertEqual(result, 0) + + def test_run_dte_falls_back_to_proc_cmdline(self): + """Test run_dte uses /proc/cmdline when no args provided.""" + with mock.patch.object(console_monitor, 'parse_proc_cmdline', return_value=("ttyS0", 9600)): + with mock.patch.object(console_monitor.DTEService, 'start', return_value=True): + with mock.patch.object(console_monitor.DTEService, 'register_callbacks'): + with mock.patch.object(console_monitor.DTEService, 'run', side_effect=SystemExit(0)): + with mock.patch.object(console_monitor.DTEService, 'stop'): + with mock.patch('signal.signal'): + result = console_monitor.run_dte(None, None) + + self.assertEqual(result, 0) + + def test_run_dte_returns_error_on_parse_failure(self): + """Test run_dte returns EXIT_SERIAL_CONFIG_ERROR when parse_proc_cmdline fails.""" + with mock.patch.object(console_monitor, 'parse_proc_cmdline', + side_effect=ValueError("No console")): + with mock.patch('signal.signal'): + result = console_monitor.run_dte(None, None) + + self.assertEqual(result, console_monitor.EXIT_SERIAL_CONFIG_ERROR) + + +# ============================================================ +# DCE Service Start/Stop Tests +# ============================================================ + +class TestDCEServiceStartStop(TestCase): + """Tests for DCE service start/stop behavior.""" + + def setUp(self): + """Set up test fixtures.""" + MockConfigDb.CONFIG_DB = None + + def tearDown(self): + """Clean up after tests.""" + MockConfigDb.CONFIG_DB = None + + def test_dce_start_connects_to_databases(self): + """Test DCE start connects to CONFIG_DB.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + + with mock.patch.object(console_monitor, 'ConfigDBConnector', return_value=MockConfigDb()) as mock_cdb: + result = service.start() + + self.assertTrue(result) + self.assertTrue(service.running) + + def test_dce_register_callbacks_subscribes_to_tables(self): + """Test register_callbacks subscribes to CONSOLE_PORT and CONSOLE_SWITCH.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch.object(service.config_db, 'subscribe') as mock_subscribe: + service.register_callbacks() + + # Should subscribe to two tables + self.assertEqual(mock_subscribe.call_count, 2) + + def test_dce_run_calls_listen(self): + """Test run() calls config_db.listen().""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + service.running = True + + with mock.patch.object(service.config_db, 'listen') as mock_listen: + mock_listen.side_effect = KeyboardInterrupt() + + service.run() + + mock_listen.assert_called_once() + + +# ============================================================ +# DTE Service Start/Stop Tests +# ============================================================ + +class TestDTEServiceStartStop(TestCase): + """Tests for DTE service start/stop behavior.""" + + def setUp(self): + """Set up test fixtures.""" + MockConfigDb.CONFIG_DB = None + + def tearDown(self): + """Clean up after tests.""" + MockConfigDb.CONFIG_DB = None + + def test_dte_start_opens_serial_port(self): + """Test DTE start connects to ConfigDB.""" + MockConfigDb.set_config_db(DTE_ENABLED_CONFIG_DB) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + + with mock.patch.object(MockConfigDb, 'connect'): + service.config_db = MockConfigDb() + result = service.start() + + self.assertTrue(result) + self.assertTrue(service.running) + + def test_dte_register_callbacks_subscribes_to_console_switch(self): + """Test register_callbacks subscribes to CONSOLE_SWITCH.""" + MockConfigDb.set_config_db(DTE_ENABLED_CONFIG_DB) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = MockConfigDb() + + with mock.patch.object(service.config_db, 'subscribe') as mock_subscribe: + service.register_callbacks() + + mock_subscribe.assert_called_once() + + def test_dte_run_calls_listen(self): + """Test run() calls config_db.listen().""" + MockConfigDb.set_config_db(DTE_ENABLED_CONFIG_DB) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = MockConfigDb() + service.running = True + + with mock.patch.object(service.config_db, 'listen') as mock_listen: + mock_listen.side_effect = KeyboardInterrupt() + + service.run() + + mock_listen.assert_called_once() + + def test_dte_heartbeat_loop_sends_heartbeats(self): + """Test _heartbeat_loop sends heartbeats periodically.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.ser_fd = -1 # Use -1 so _send_heartbeat returns early without I/O + + call_count = 0 + + def counting_send(): + nonlocal call_count + call_count += 1 + # Stop after first call to prevent blocking + service._heartbeat_stop.set() + + service._heartbeat_stop.clear() + + with mock.patch.object(service, '_send_heartbeat', side_effect=counting_send): + with mock.patch.object(service._heartbeat_stop, 'wait', return_value=True): + # Run loop directly - it will exit after first iteration due to stop being set + service._heartbeat_loop() + + self.assertEqual(call_count, 1) + + +# ============================================================ +# ProxyService Start Tests +# ============================================================ + +class TestProxyServiceStart(TestCase): + """Tests for ProxyService start behavior.""" + + def test_proxy_service_get_udev_prefix(self): + """Test _get_udev_prefix sets paths correctly.""" + proxy = console_monitor.ProxyService(link_id="1") + + with mock.patch.object(console_monitor, 'get_udev_prefix', return_value="C0-"): + result = proxy._get_udev_prefix() + + self.assertTrue(result) + self.assertEqual(proxy.device_path, "/dev/C0-1") + self.assertIn("PTM", proxy.ptm_path) + + def test_proxy_service_stop_sets_running_false(self): + """Test stop() sets running to False.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy._wake_w = -1 # No wake pipe + + proxy.stop() + + self.assertFalse(proxy.running) + + +# ============================================================ +# get_udev_prefix Tests +# ============================================================ + +class TestGetUdevPrefix(TestCase): + """Tests for get_udev_prefix function.""" + + def test_get_udev_prefix_returns_default_on_import_error(self): + """Test returns default when sonic_py_common import fails.""" + # Mock the import to fail + original_modules = sys.modules.copy() + + # Remove sonic_py_common to simulate import error + sys.modules['sonic_py_common'] = None + sys.modules['sonic_py_common.device_info'] = None + + try: + # The function should catch the exception and return default + result = console_monitor.get_udev_prefix() + # Default is "ttyUSB" + self.assertIsInstance(result, str) + finally: + # Restore modules + sys.modules.update(original_modules) + + def test_get_udev_prefix_reads_config_file(self): + """Test reads from udevprefix.conf when available.""" + mock_device_info = mock.Mock() + mock_device_info.get_paths_to_platform_and_hwsku_dirs.return_value = ("/tmp/platform", "/tmp/hwsku") + + with mock.patch.dict('sys.modules', {'sonic_py_common': mock.Mock(), + 'sonic_py_common.device_info': mock_device_info}): + with mock.patch('os.path.exists', return_value=True): + with mock.patch('builtins.open', mock.mock_open(read_data="C1")): + # This is tricky because the function is already defined + # For now, test the default path + pass + + +class TestFrameParseEdgeCases(TestCase): + """Additional edge case tests for Frame.parse().""" + + def test_frame_parse_too_short_returns_none(self): + """Test Frame.parse returns None for too short data.""" + # Less than 7 bytes after unescaping + result = console_monitor.Frame.parse(b"\x01\x02\x03") + self.assertIsNone(result) + + def test_frame_parse_empty_returns_none(self): + """Test Frame.parse returns None for empty data.""" + result = console_monitor.Frame.parse(b"") + self.assertIsNone(result) + + def test_frame_parse_content_too_short_returns_none(self): + """Test Frame.parse returns None when content < 5 bytes after CRC removal.""" + # Create data that will have valid CRC but content < 5 bytes + # This is tricky, just test with minimal valid-looking data + result = console_monitor.Frame.parse(bytes([0x01, 0x02, 0x03, 0x04, 0x05, 0x06])) + self.assertIsNone(result) # Should fail CRC or length check + + def test_frame_parse_with_payload(self): + """Test Frame.parse correctly parses frame with payload.""" + # Create a frame with payload + frame = console_monitor.Frame( + version=console_monitor.PROTOCOL_VERSION, + seq=10, + flag=0x00, + frame_type=console_monitor.FrameType.HEARTBEAT, + payload=b"test" + ) + built = frame.build() + content = built[3:-3] # Strip SOF/EOF + + parsed = console_monitor.Frame.parse(content) + self.assertIsNotNone(parsed) + self.assertEqual(parsed.payload, b"test") + + +class TestPTYBridgeFunction(TestCase): + """Tests for run_pty_bridge function.""" + + def test_run_pty_bridge_builds_correct_paths(self): + """Test run_pty_bridge builds correct PTY paths.""" + with mock.patch.object(console_monitor, 'get_udev_prefix', return_value="C0-"): + with mock.patch('os.execvp') as mock_exec: + mock_exec.side_effect = OSError("Exec failed") + + result = console_monitor.run_pty_bridge("1") + + self.assertEqual(result, console_monitor.EXIT_SERVICE_START_FAILED) + + def test_run_pty_bridge_exec_failure(self): + """Test run_pty_bridge returns error when exec fails.""" + with mock.patch.object(console_monitor, 'get_udev_prefix', return_value="C0-"): + with mock.patch('os.execvp', side_effect=FileNotFoundError("socat not found")): + result = console_monitor.run_pty_bridge("test") + + self.assertEqual(result, console_monitor.EXIT_SERVICE_START_FAILED) + + +class TestProxyServicePhases(TestCase): + """Tests for ProxyService startup phases.""" + + def test_proxy_wait_for_config_success(self): + """Test _wait_for_config returns True when config found.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + + mock_config_db = mock.Mock() + mock_config_db.get_entry.return_value = {"baud_rate": "115200"} + + with mock.patch.object(console_monitor, 'ConfigDBConnector', return_value=mock_config_db): + result = proxy._wait_for_config() + + self.assertTrue(result) + self.assertEqual(proxy.baud, 115200) + + def test_proxy_wait_for_config_stops_when_not_running(self): + """Test _wait_for_config returns False when stopped.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = False + + mock_config_db = mock.Mock() + mock_config_db.get_entry.return_value = None + + with mock.patch.object(console_monitor, 'ConfigDBConnector', return_value=mock_config_db): + result = proxy._wait_for_config() + + self.assertFalse(result) + + def test_proxy_wait_for_device_success(self): + """Test _wait_for_device returns True when device exists.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.device_path = "/dev/test" + + with mock.patch('os.path.exists', return_value=True): + result = proxy._wait_for_device() + + self.assertTrue(result) + + def test_proxy_wait_for_device_stops_when_not_running(self): + """Test _wait_for_device returns False when stopped.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = False + proxy.device_path = "/dev/test" + + with mock.patch('os.path.exists', return_value=False): + result = proxy._wait_for_device() + + self.assertFalse(result) + + def test_proxy_wait_for_ptm_success(self): + """Test _wait_for_ptm returns True when PTM exists.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.ptm_path = "/dev/test-PTM" + + with mock.patch('os.path.exists', return_value=True): + result = proxy._wait_for_ptm() + + self.assertTrue(result) + + def test_proxy_wait_for_ptm_stops_when_not_running(self): + """Test _wait_for_ptm returns False when stopped.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = False + proxy.ptm_path = "/dev/test-PTM" + + with mock.patch('os.path.exists', return_value=False): + result = proxy._wait_for_ptm() + + self.assertFalse(result) + + @mock.patch.object(console_monitor, 'configure_serial') + @mock.patch.object(console_monitor, 'set_nonblocking') + @mock.patch('os.open', return_value=12) + @mock.patch('os.pipe', return_value=(10, 11)) + @mock.patch.object(console_monitor, 'Table') + @mock.patch.object(console_monitor, 'DBConnector') + def test_proxy_initialize_success(self, mock_db_conn, mock_table, *_): + """Test _initialize succeeds with proper mocks.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.device_path = "/dev/test" + proxy.ptm_path = "/dev/test-PTM" + proxy.baud = 9600 + + result = proxy._initialize() + + self.assertTrue(result) + self.assertIsNotNone(proxy.filter) + + def test_proxy_initialize_failure(self): + """Test _initialize returns False on error.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.device_path = "/dev/test" + proxy.ptm_path = "/dev/test-PTM" + proxy.baud = 9600 + + with mock.patch.object(console_monitor, 'DBConnector', side_effect=Exception("DB error")): + result = proxy._initialize() + + self.assertFalse(result) + + def test_proxy_cleanup_closes_fds(self): + """Test _cleanup closes all file descriptors.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = mock.Mock() + proxy._wake_r = 10 + proxy._wake_w = 11 + proxy.ser_fd = 12 + proxy.ptm_fd = 13 + proxy.filter = mock.Mock() + proxy.filter.flush.return_value = b"" + + with mock.patch('os.close') as mock_close: + proxy._cleanup() + + # Should call close for each fd + self.assertEqual(mock_close.call_count, 4) + self.assertEqual(proxy._wake_r, -1) + self.assertEqual(proxy._wake_w, -1) + self.assertEqual(proxy.ser_fd, -1) + self.assertEqual(proxy.ptm_fd, -1) + + def test_proxy_cleanup_flushes_remaining_data(self): + """Test _cleanup flushes remaining filter data to PTM.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = mock.Mock() + proxy._wake_r = -1 + proxy._wake_w = -1 + proxy.ser_fd = -1 + proxy.ptm_fd = 10 + proxy.filter = mock.Mock() + proxy.filter.flush.return_value = b"remaining data" + + with mock.patch('os.write') as mock_write: + with mock.patch('os.close'): + proxy._cleanup() + + mock_write.assert_called_once_with(10, b"remaining data") + + def test_proxy_on_serial_read(self): + """Test _on_serial_read processes data through filter.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.ser_fd = 10 + proxy.filter = mock.Mock() + + with mock.patch('os.read', return_value=b"test data"): + proxy._on_serial_read() + + proxy.filter.process.assert_called_once_with(b"test data") + + def test_proxy_on_serial_read_handles_blocking_error(self): + """Test _on_serial_read handles BlockingIOError gracefully.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.ser_fd = 10 + proxy.filter = mock.Mock() + + with mock.patch('os.read', side_effect=BlockingIOError()): + # Should not raise + proxy._on_serial_read() + + def test_proxy_on_ptm_read(self): + """Test _on_ptm_read forwards data to serial.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.ptm_fd = 10 + proxy.ser_fd = 11 + + with mock.patch('os.read', return_value=b"user input"): + with mock.patch('os.write') as mock_write: + proxy._on_ptm_read() + + mock_write.assert_called_once_with(11, b"user input") + + def test_proxy_on_ptm_read_handles_blocking_error(self): + """Test _on_ptm_read handles BlockingIOError gracefully.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.ptm_fd = 10 + + with mock.patch('os.read', side_effect=BlockingIOError()): + # Should not raise + proxy._on_ptm_read() + + def test_proxy_on_frame_received_unknown_type(self): + """Test _on_frame_received handles unknown frame type.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = mock.Mock() + + # Create a frame with unknown type + frame = console_monitor.Frame(frame_type=0x99, seq=1) + + # Should not raise, just log warning + proxy._on_frame_received(frame) + + def test_proxy_run_returns_success_after_phases(self): + """Test run() returns EXIT_SUCCESS after all phases.""" + proxy = console_monitor.ProxyService(link_id="1") + + with mock.patch.multiple( + proxy, + _get_udev_prefix=mock.Mock(return_value=True), + _wait_for_config=mock.Mock(return_value=True), + _wait_for_device=mock.Mock(return_value=True), + _wait_for_ptm=mock.Mock(return_value=True), + _initialize=mock.Mock(return_value=True), + _run_loop=mock.Mock(), + _cleanup=mock.Mock(), + ): + result = proxy.run() + + self.assertEqual(result, console_monitor.EXIT_SUCCESS) + + def test_proxy_run_returns_failure_when_config_fails(self): + """Test run() returns failure when config phase fails.""" + proxy = console_monitor.ProxyService(link_id="1") + + with mock.patch.object(proxy, '_get_udev_prefix', return_value=True): + with mock.patch.object(proxy, '_wait_for_config', return_value=False): + result = proxy.run() + + self.assertEqual(result, console_monitor.EXIT_SERVICE_START_FAILED) + + +class TestDCEServiceSystemctl(TestCase): + """Tests for DCE service systemctl operations.""" + + def setUp(self): + """Set up test fixtures.""" + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def tearDown(self): + """Clean up after tests.""" + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def test_dce_start_pty_bridge_timeout(self): + """Test _start_pty_bridge handles timeout.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch('subprocess.run', side_effect=subprocess.TimeoutExpired('cmd', 30)): + result = service._start_pty_bridge("1") + + self.assertFalse(result) + + def test_dce_start_pty_bridge_exception(self): + """Test _start_pty_bridge handles exceptions.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch('subprocess.run', side_effect=Exception("Unexpected error")): + result = service._start_pty_bridge("1") + + self.assertFalse(result) + + def test_dce_stop_pty_bridge_timeout(self): + """Test _stop_pty_bridge handles timeout.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch('subprocess.run', side_effect=subprocess.TimeoutExpired('cmd', 30)): + result = service._stop_pty_bridge("1") + + self.assertFalse(result) + + def test_dce_start_proxy_timeout(self): + """Test _start_proxy handles timeout.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch('subprocess.run', side_effect=subprocess.TimeoutExpired('cmd', 30)): + result = service._start_proxy("1") + + self.assertFalse(result) + + def test_dce_stop_proxy_timeout(self): + """Test _stop_proxy handles timeout.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch('subprocess.run', side_effect=subprocess.TimeoutExpired('cmd', 30)): + result = service._stop_proxy("1") + + self.assertFalse(result) + + def test_dce_start_link_fails_when_pty_bridge_fails(self): + """Test _start_link returns False when pty-bridge fails.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch.object(service, '_start_pty_bridge', return_value=False): + result = service._start_link("1") + + self.assertFalse(result) + + def test_dce_start_link_fails_when_proxy_fails(self): + """Test _start_link stops pty-bridge and returns False when proxy fails.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch.object(service, '_start_pty_bridge', return_value=True): + with mock.patch.object(service, '_start_proxy', return_value=False): + with mock.patch.object(service, '_stop_pty_bridge') as mock_stop: + result = service._start_link("1") + + self.assertFalse(result) + mock_stop.assert_called_once_with("1") + + def test_dce_check_feature_enabled_handles_exception(self): + """Test _check_feature_enabled returns False on exception.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = mock.Mock() + service.config_db.get_entry.side_effect = Exception("DB error") + + result = service._check_feature_enabled() + + self.assertFalse(result) + + def test_dce_get_all_configs_handles_exception(self): + """Test _get_all_configs returns empty dict on exception.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = mock.Mock() + service.config_db.get_table.side_effect = Exception("DB error") + + configs = service._get_all_configs() + + self.assertEqual(configs, {}) + + def test_dce_start_failure(self): + """Test DCE start handles ConfigDB connection failure.""" + with mock.patch.object(console_monitor, 'ConfigDBConnector') as mock_cdb: + mock_cdb.return_value.connect.side_effect = Exception("Connection failed") + + service = console_monitor.DCEService() + result = service.start() + + self.assertFalse(result) + + +class TestDTEServiceExtendedCoverage(TestCase): + """Extended tests for DTE service to improve coverage.""" + + def test_dte_check_enabled_handles_exception(self): + """Test _check_enabled returns False on exception.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = mock.Mock() + service.config_db.get_entry.side_effect = Exception("DB error") + + result = service._check_enabled() + + self.assertFalse(result) + + def test_dte_send_heartbeat_write_failure(self): + """Test _send_heartbeat handles write failure gracefully.""" + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.seq = 0 + + with mock.patch('os.open', return_value=10): + with mock.patch('os.write', side_effect=OSError("Write failed")): + with mock.patch('os.close'): + # Should not raise, should log error + service._send_heartbeat() + # Seq should not change on failure + self.assertEqual(service.seq, 0) + + def test_dte_start_failure(self): + """Test DTE start handles ConfigDB connection failure.""" + with mock.patch.object(console_monitor, 'ConfigDBConnector') as mock_cdb: + mock_cdb.return_value.connect.side_effect = Exception("Connection failed") + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + result = service.start() + + self.assertFalse(result) + + def test_dte_console_switch_handler_no_change(self): + """Test console_switch_handler does nothing when state unchanged.""" + MockConfigDb.set_config_db(DTE_ENABLED_CONFIG_DB) + + service = console_monitor.DTEService(tty_name="ttyS0", baud=9600) + service.config_db = MockConfigDb() + service.enabled = True # Already enabled + + with mock.patch.object(service, '_start_heartbeat') as mock_start: + with mock.patch.object(service, '_stop_heartbeat') as mock_stop: + service.console_switch_handler("controlled_device", "SET", {"enabled": "yes"}) + + # Neither should be called since state unchanged + mock_start.assert_not_called() + mock_stop.assert_not_called() + + +class TestRunProxyFunction(TestCase): + """Tests for run_proxy function.""" + + def test_run_proxy_calls_service_run(self): + """Test run_proxy creates ProxyService and calls run.""" + with mock.patch.object(console_monitor.ProxyService, 'run', return_value=0) as mock_run: + with mock.patch('signal.signal'): + result = console_monitor.run_proxy("1") + + mock_run.assert_called_once() + self.assertEqual(result, 0) + + def test_run_proxy_handles_signal(self): + """Test run_proxy sets up signal handlers.""" + with mock.patch.object(console_monitor.ProxyService, 'run', return_value=0): + with mock.patch('signal.signal') as mock_signal: + console_monitor.run_proxy("1") + + # Should register handlers for SIGINT, SIGTERM, SIGHUP + self.assertEqual(mock_signal.call_count, 3) + + +class TestFrameFilterInternalMethods(TestCase): + """Tests for FrameFilter internal methods.""" + + def test_frame_filter_try_parse_frame_empty_buffer(self): + """Test _try_parse_frame with empty buffer.""" + filter = console_monitor.FrameFilter() + + # Process SOF then immediately EOF with empty content + filter.process(console_monitor.SOF_SEQUENCE + console_monitor.EOF_SEQUENCE) + + # Should not crash, just skip + self.assertFalse(filter.has_pending_data()) + + def test_frame_filter_discard_buffer_called_on_overflow_in_frame(self): + """Test buffer is discarded on overflow when inside frame.""" + frames = [] + user_data = [] + + filter = console_monitor.FrameFilter( + on_frame=lambda f: frames.append(f), + on_user_data=lambda d: user_data.append(d) + ) + + # Start a frame + filter.process(console_monitor.SOF_SEQUENCE) + self.assertTrue(filter.in_frame) + + # Send more than MAX_FRAME_BUFFER_SIZE bytes + large_data = b"x" * (console_monitor.MAX_FRAME_BUFFER_SIZE + 10) + filter.process(large_data) + + # Frame should be discarded due to overflow + self.assertFalse(filter.in_frame) + + def test_frame_filter_sof_in_frame_restarts(self): + """Test receiving SOF while in frame discards current and starts new.""" + frames = [] + + filter = console_monitor.FrameFilter(on_frame=lambda f: frames.append(f)) + + # Start a frame + filter.process(console_monitor.SOF_SEQUENCE + b"partial") + self.assertTrue(filter.in_frame) + + # Another SOF should discard current and start new frame + heartbeat = console_monitor.Frame.create_heartbeat(1) + filter.process(heartbeat.build()) + + # Should have parsed the complete heartbeat frame + self.assertEqual(len(frames), 1) + + +class TestMainWithSubcommands(TestCase): + """Tests for main() with different subcommands.""" + + def test_main_pty_bridge_mode(self): + """Test main dispatches to run_pty_bridge.""" + with mock.patch.object(sys, 'argv', ['console-monitor', 'pty-bridge', '1']): + with mock.patch.object(console_monitor, 'run_pty_bridge', return_value=0) as mock_run: + with self.assertRaises(SystemExit) as context: + console_monitor.main() + + mock_run.assert_called_once_with('1') + self.assertEqual(context.exception.code, 0) + + def test_main_proxy_mode(self): + """Test main dispatches to run_proxy.""" + with mock.patch.object(sys, 'argv', ['console-monitor', 'proxy', '1']): + with mock.patch.object(console_monitor, 'run_proxy', return_value=0) as mock_run: + with self.assertRaises(SystemExit) as context: + console_monitor.main() + + mock_run.assert_called_once_with('1') + self.assertEqual(context.exception.code, 0) + + def test_main_dce_mode(self): + """Test main dispatches to run_dce.""" + with mock.patch.object(sys, 'argv', ['console-monitor', 'dce']): + with mock.patch.object(console_monitor, 'run_dce', return_value=0) as mock_run: + with self.assertRaises(SystemExit) as context: + console_monitor.main() + + mock_run.assert_called_once() + self.assertEqual(context.exception.code, 0) + + def test_main_dte_mode(self): + """Test main dispatches to run_dte.""" + with mock.patch.object(sys, 'argv', ['console-monitor', 'dte', 'ttyS0', '9600']): + with mock.patch.object(console_monitor, 'run_dte', return_value=0) as mock_run: + with self.assertRaises(SystemExit) as context: + console_monitor.main() + + mock_run.assert_called_once_with('ttyS0', 9600) + self.assertEqual(context.exception.code, 0) + + def test_main_with_log_level(self): + """Test main sets log level from argument.""" + with mock.patch.object(sys, 'argv', ['console-monitor', 'dce', '-l', 'debug']): + with mock.patch.object(console_monitor, 'run_dce', return_value=0): + with mock.patch.object(console_monitor, 'set_log_level') as mock_log: + with self.assertRaises(SystemExit): + console_monitor.main() + + mock_log.assert_called_once_with('debug') + + +class TestCalculateFilterTimeout(TestCase): + """Tests for calculate_filter_timeout function.""" + + def test_calculate_filter_timeout_with_custom_multiplier(self): + """Test calculate_filter_timeout with different multipliers.""" + timeout_default = console_monitor.calculate_filter_timeout(9600) + timeout_custom = console_monitor.calculate_filter_timeout(9600, multiplier=5) + + # Custom multiplier should give larger timeout + self.assertGreater(timeout_custom, timeout_default) + + def test_calculate_filter_timeout_different_bauds(self): + """Test timeout varies inversely with baud rate.""" + timeout_slow = console_monitor.calculate_filter_timeout(1200) + timeout_fast = console_monitor.calculate_filter_timeout(115200) + + # Slower baud should have longer timeout + self.assertGreater(timeout_slow, timeout_fast) + + +# ============================================================ +# Additional Coverage Tests - Parse and Error Paths +# ============================================================ + +class TestParseProcCmdlineErrors(TestCase): + """Tests for parse_proc_cmdline error handling.""" + + def test_parse_proc_cmdline_file_read_error(self): + """Test parse_proc_cmdline raises ValueError on file read error.""" + with mock.patch('builtins.open', side_effect=IOError("Permission denied")): + with self.assertRaises(ValueError) as context: + console_monitor.parse_proc_cmdline() + + self.assertIn("Failed to read", str(context.exception)) + + +class TestProxyServiceRunLoop(TestCase): + """Tests for ProxyService _run_loop method.""" + + def test_proxy_run_loop_handles_exception(self): + """Test _run_loop handles exceptions gracefully.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.ser_fd = 10 + proxy.ptm_fd = 11 + proxy._wake_r = 12 + proxy.baud = 9600 + proxy._last_heartbeat_time = time.monotonic() + proxy._last_data_activity = time.monotonic() + proxy._last_serial_data_time = time.monotonic() + proxy.filter = mock.Mock() + proxy.filter.has_pending_data.return_value = False + + call_count = 0 + def stop_after_one(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count > 1: + proxy.running = False + raise Exception("Select error") + + with mock.patch('select.select', side_effect=stop_after_one): + with mock.patch('time.sleep'): + proxy._run_loop() + + self.assertFalse(proxy.running) + + def test_proxy_run_loop_wakeup_pipe(self): + """Test _run_loop handles wakeup pipe.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.baud = 9600 + proxy._last_heartbeat_time = time.monotonic() + proxy._last_data_activity = time.monotonic() + proxy._last_serial_data_time = time.monotonic() + proxy.filter = mock.Mock() + proxy.filter.has_pending_data.return_value = False + + wake_r, wake_w = os.pipe() + proxy._wake_r = wake_r + proxy._wake_w = wake_w + proxy.ser_fd = 100 # Fake fd that won't be selected + proxy.ptm_fd = 101 + + try: + call_count = 0 + def mock_select(rlist, wlist, xlist, timeout): + nonlocal call_count + call_count += 1 + if call_count == 1: + os.write(wake_w, b'x') # Trigger wakeup + return ([wake_r], [], []) + else: + proxy.running = False + return ([], [], []) + + with mock.patch('select.select', side_effect=mock_select): + proxy._run_loop() + finally: + os.close(wake_r) + os.close(wake_w) + + def test_proxy_run_loop_filter_timeout(self): + """Test _run_loop triggers filter timeout.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy.baud = 9600 + proxy._last_heartbeat_time = time.monotonic() + proxy._last_data_activity = time.monotonic() + proxy._last_serial_data_time = time.monotonic() - 10 # Long ago + proxy.filter = mock.Mock() + proxy.filter.has_pending_data.return_value = True + + wake_r, wake_w = os.pipe() + proxy._wake_r = wake_r + proxy._wake_w = wake_w + proxy.ser_fd = 100 + proxy.ptm_fd = 101 + + try: + call_count = 0 + def mock_select(rlist, wlist, xlist, timeout): + nonlocal call_count + call_count += 1 + if call_count >= 2: + proxy.running = False + return ([], [], []) + + with mock.patch('select.select', side_effect=mock_select): + proxy._run_loop() + + # Filter timeout should have been called + proxy.filter.on_timeout.assert_called() + finally: + os.close(wake_r) + os.close(wake_w) + + +class TestProxyServiceUserDataOSError(TestCase): + """Test ProxyService _on_user_data_received OSError handling.""" + + def test_on_user_data_received_write_error(self): + """Test _on_user_data_received handles write OSError.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.ptm_fd = 10 + + with mock.patch('os.write', side_effect=OSError("Write failed")): + # Should not raise + proxy._on_user_data_received(b"test data") + + +class TestDCEServiceSystemctlFailures(TestCase): + """Additional tests for DCE service systemctl failure handling.""" + + def setUp(self): + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def tearDown(self): + MockSubprocess.reset() + MockConfigDb.CONFIG_DB = None + + def test_dce_start_pty_bridge_failure(self): + """Test _start_pty_bridge returns False on command failure.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + mock_result = mock.Mock() + mock_result.returncode = 1 + mock_result.stderr = "Service failed" + + with mock.patch('subprocess.run', return_value=mock_result): + result = service._start_pty_bridge("1") + + self.assertFalse(result) + + def test_dce_stop_pty_bridge_failure(self): + """Test _stop_pty_bridge returns False on command failure.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + mock_result = mock.Mock() + mock_result.returncode = 1 + mock_result.stderr = "Service stop failed" + + with mock.patch('subprocess.run', return_value=mock_result): + result = service._stop_pty_bridge("1") + + self.assertFalse(result) + + def test_dce_start_proxy_failure(self): + """Test _start_proxy returns False on command failure.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + mock_result = mock.Mock() + mock_result.returncode = 1 + mock_result.stderr = "Proxy start failed" + + with mock.patch('subprocess.run', return_value=mock_result): + result = service._start_proxy("1") + + self.assertFalse(result) + + def test_dce_stop_proxy_failure(self): + """Test _stop_proxy returns False on command failure.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + mock_result = mock.Mock() + mock_result.returncode = 1 + mock_result.stderr = "Proxy stop failed" + + with mock.patch('subprocess.run', return_value=mock_result): + result = service._stop_proxy("1") + + self.assertFalse(result) + + def test_dce_stop_pty_bridge_exception(self): + """Test _stop_pty_bridge handles general exceptions.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch('subprocess.run', side_effect=Exception("Unexpected")): + result = service._stop_pty_bridge("1") + + self.assertFalse(result) + + def test_dce_start_proxy_exception(self): + """Test _start_proxy handles general exceptions.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch('subprocess.run', side_effect=Exception("Unexpected")): + result = service._start_proxy("1") + + self.assertFalse(result) + + def test_dce_stop_proxy_exception(self): + """Test _stop_proxy handles general exceptions.""" + MockConfigDb.set_config_db(DCE_3_LINKS_ENABLED_CONFIG_DB) + + service = console_monitor.DCEService() + service.config_db = MockConfigDb() + + with mock.patch('subprocess.run', side_effect=Exception("Unexpected")): + result = service._stop_proxy("1") + + self.assertFalse(result) + + +class TestProxyServiceCleanupStateError(TestCase): + """Test ProxyService _cleanup_state error handling.""" + + def test_cleanup_state_handles_exception(self): + """Test _cleanup_state handles exceptions gracefully.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = mock.Mock() + proxy.state_table.hdel.side_effect = Exception("Redis error") + + # Should not raise + proxy._cleanup_state() + + +class TestProxyServiceUpdateStateError(TestCase): + """Test ProxyService _update_state error handling.""" + + def test_update_state_handles_exception(self): + """Test _update_state handles exceptions gracefully.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.state_table = mock.Mock() + proxy.state_table.set.side_effect = Exception("Redis error") + proxy._current_oper_state = None # Force state change + + # Should not raise + proxy._update_state("Up") + + +class TestGetUdevPrefixPaths(TestCase): + """Tests for get_udev_prefix with config file.""" + + def test_get_udev_prefix_empty_config(self): + """Test get_udev_prefix returns default when config file is empty.""" + mock_device_info = mock.Mock() + mock_device_info.get_paths_to_platform_and_hwsku_dirs.return_value = ("/tmp/platform", "/tmp/hwsku") + + # Reload module to test actual function + result = console_monitor.get_udev_prefix() + self.assertIsInstance(result, str) + + +class TestProxyStopWithWakePipe(TestCase): + """Test ProxyService stop with wake pipe.""" + + def test_proxy_stop_wakes_select_loop(self): + """Test stop() writes to wake pipe.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + + wake_r, wake_w = os.pipe() + proxy._wake_r = wake_r + proxy._wake_w = wake_w + + try: + proxy.stop() + + self.assertFalse(proxy.running) + # Read from wake pipe should have data + data = os.read(wake_r, 1) + self.assertEqual(data, b'x') + finally: + os.close(wake_r) + os.close(wake_w) + + def test_proxy_stop_handles_write_error(self): + """Test stop() handles write error on wake pipe.""" + proxy = console_monitor.ProxyService(link_id="1") + proxy.running = True + proxy._wake_w = 999 # Invalid fd + + # Should not raise + proxy.stop() + + self.assertFalse(proxy.running) + + +# Add necessary imports +import logging +import subprocess + + +# Add necessary import for fcntl +import fcntl + + +if __name__ == '__main__': + import unittest + unittest.main() diff --git a/tests/console_monitor/test_vectors.py b/tests/console_monitor/test_vectors.py new file mode 100644 index 00000000..d85e3f21 --- /dev/null +++ b/tests/console_monitor/test_vectors.py @@ -0,0 +1,136 @@ +""" +Test vectors for consoled tests. + +Contains test configuration data following SONiC CONFIG_DB schema: +- CONSOLE_SWITCH table: Feature enable/disable control +- CONSOLE_PORT table: Per-port configuration (baud_rate, remote_device, flow_control) +""" + +# ============================================================ +# CONSOLE_SWITCH table test data +# ============================================================ + +# console_mgmt entry - feature enabled +CONSOLE_SWITCH_ENABLED = { + "console_mgmt": { + "enabled": "yes" + } +} + +# console_mgmt entry - feature disabled +CONSOLE_SWITCH_DISABLED = { + "console_mgmt": { + "enabled": "no" + } +} + +# controlled_device entry for DTE side - enabled +CONTROLLED_DEVICE_ENABLED = { + "controlled_device": { + "enabled": "yes" + } +} + +# controlled_device entry for DTE side - disabled +CONTROLLED_DEVICE_DISABLED = { + "controlled_device": { + "enabled": "no" + } +} + + +# ============================================================ +# CONSOLE_PORT table test data +# ============================================================ + +# Three console ports configuration +CONSOLE_PORT_3_LINKS = { + "1": { + "baud_rate": "9600", + "remote_device": "switch-01", + "flow_control": "0" + }, + "2": { + "baud_rate": "115200", + "remote_device": "switch-02", + "flow_control": "1" + }, + "3": { + "baud_rate": "9600", + "remote_device": "router-01", + "flow_control": "0" + } +} + +# Single console port configuration +CONSOLE_PORT_SINGLE = { + "1": { + "baud_rate": "9600", + "remote_device": "device-01", + "flow_control": "0" + } +} + +# Empty console port configuration +CONSOLE_PORT_EMPTY = {} + + +# ============================================================ +# Complete CONFIG_DB test scenarios +# ============================================================ + +# Scenario: DCE service with 3 console links enabled +DCE_3_LINKS_ENABLED_CONFIG_DB = { + "CONSOLE_SWITCH": CONSOLE_SWITCH_ENABLED, + "CONSOLE_PORT": CONSOLE_PORT_3_LINKS, +} + +# Scenario: DCE service with feature disabled +DCE_FEATURE_DISABLED_CONFIG_DB = { + "CONSOLE_SWITCH": CONSOLE_SWITCH_DISABLED, + "CONSOLE_PORT": CONSOLE_PORT_3_LINKS, +} + +# Scenario: DCE service with no ports configured +DCE_NO_PORTS_CONFIG_DB = { + "CONSOLE_SWITCH": CONSOLE_SWITCH_ENABLED, + "CONSOLE_PORT": CONSOLE_PORT_EMPTY, +} + +# Scenario: DTE service enabled +DTE_ENABLED_CONFIG_DB = { + "CONSOLE_SWITCH": CONTROLLED_DEVICE_ENABLED, +} + +# Scenario: DTE service disabled +DTE_DISABLED_CONFIG_DB = { + "CONSOLE_SWITCH": CONTROLLED_DEVICE_DISABLED, +} + + +# ============================================================ +# Test vectors for parameterized tests +# ============================================================ + +DCE_TEST_VECTOR = [ + # (test_name, config_db, expected_proxy_count) + ("DCE_3_Links_Enabled", DCE_3_LINKS_ENABLED_CONFIG_DB, 3), + ("DCE_Feature_Disabled", DCE_FEATURE_DISABLED_CONFIG_DB, 0), + ("DCE_No_Ports", DCE_NO_PORTS_CONFIG_DB, 0), +] + +DTE_TEST_VECTOR = [ + # (test_name, config_db, expected_heartbeat_enabled) + ("DTE_Enabled", DTE_ENABLED_CONFIG_DB, True), + ("DTE_Disabled", DTE_DISABLED_CONFIG_DB, False), +] + + +# ============================================================ +# /proc/cmdline test data for DTE +# ============================================================ + +PROC_CMDLINE_SINGLE_CONSOLE = "BOOT_IMAGE=/boot/vmlinuz console=ttyS0,9600n8" +PROC_CMDLINE_MULTIPLE_CONSOLE = "BOOT_IMAGE=/boot/vmlinuz console=tty0 console=ttyS1,115200" +PROC_CMDLINE_NO_BAUD = "BOOT_IMAGE=/boot/vmlinuz console=ttyS0" +PROC_CMDLINE_NO_CONSOLE = "BOOT_IMAGE=/boot/vmlinuz root=/dev/sda1"