From c516698e719b5ea4d04e352b81fb5273f7c43b1c Mon Sep 17 00:00:00 2001 From: Yash Goyal Date: Wed, 24 Sep 2025 21:16:56 -0700 Subject: [PATCH 1/2] Added support for ZephyrOS based devices --- README.md | 12 +- client_test_lib/fixtures/client_fixtures.py | 11 +- client_test_lib/tools/client_runner.py | 127 ++++++++++++++++++-- client_test_lib/tools/utils.py | 110 +++++++++++++++++ requirements.txt | 1 + 5 files changed, 243 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 44d2679..94ecc46 100644 --- a/README.md +++ b/README.md @@ -25,11 +25,19 @@ $ pip install -I client_test_lib*.whl - Windows: `set CLOUD_API_KEY=` - Default API address is `https://api.us-east-1.mbedcloud.com`. You can change this by defining `CLOUD_API_GW` environment variable in similar way as `CLOUD_API_KEY` is done above. - Test run will create temporary API key for the WebSocket callback channel by default. If you want to prevent that and use only the exported API key, add `--use_one_apikey` startup argument. -- Tests use [Mbed LS](https://github.com/ARMmbed/mbed-os-tools/tree/master/packages/mbed-ls) to select the board from the serial port. +- Tests use [pyOCD](https://pyocd.io/) for device discovery, with automatic fallback to [Mbed LS](https://github.com/ARMmbed/mbed-os-tools/tree/master/packages/mbed-ls) if pyOCD is not available. - If you have only one board connected to the serial port, you don't need to select the device for the tests. - - If there are multiple boards connected to the serial port, run `mbedls` to check the target board's ID, and use it in the test run's argument `--target_id=[id]`. + - If there are multiple boards connected to the serial port, you can use either: + - `pyocd list` to check pyOCD-discovered boards and use the board ID with `--target_id=[id]` + - `mbedls` to check mbed-discovered boards and use the target ID with `--target_id=[id]` ```bash + # Using pyOCD (preferred) + $ pyocd list + [INFO] Available debug probes: + 0: 0240000032044e4500257009997b00386781000097969900 (ST-Link V2-1) + + # Using mbed-ls (fallback) $ mbedls +---------------+----------------------+-------------+--------------+--------------------------------------------------+-----------------+ | platform_name | platform_name_unique | mount_point | serial_port | target_id | daplink_version | diff --git a/client_test_lib/fixtures/client_fixtures.py b/client_test_lib/fixtures/client_fixtures.py index 10dbfe5..c3e7e05 100644 --- a/client_test_lib/fixtures/client_fixtures.py +++ b/client_test_lib/fixtures/client_fixtures.py @@ -21,7 +21,7 @@ from client_test_lib.tools.external_conn import ExternalConnection from client_test_lib.tools.local_conn import LocalConnection from client_test_lib.tools.serial_conn import SerialConnection -from client_test_lib.tools.utils import get_serial_port_for_mbed +from client_test_lib.tools.utils import get_serial_port_for_mbed, get_serial_port_for_pyocd log = logging.getLogger(__name__) @@ -40,23 +40,28 @@ def client_internal(request): log.info("Using local binary process") conn = LocalConnection(request.config.getoption("local_binary")) else: - address = get_serial_port_for_mbed( + # Try pyocd first, fall back to mbed-ls if needed + address = get_serial_port_for_pyocd( request.config.getoption("target_id") ) + log.info("Serial connection address: {}".format(address)) if address: conn = SerialConnection(address, 115200) + log.info("Serial connection opened successfully") else: err_msg = "No serial connection to open for test device" log.error(err_msg) assert False, err_msg - cli = Client(conn) + cli = Client(conn, trace=True) # reset the serial connection device if not request.config.getoption( "ext_conn" ) and not request.config.getoption("local_binary"): + log.info("Resetting device before test...") cli.reset() + sleep(2) # Give device time to reset and stabilize cli.wait_for_output("Client registered", 300) ep_id = cli.endpoint_id(120) diff --git a/client_test_lib/tools/client_runner.py b/client_test_lib/tools/client_runner.py index 8f3f2b0..8a9fed9 100644 --- a/client_test_lib/tools/client_runner.py +++ b/client_test_lib/tools/client_runner.py @@ -39,12 +39,14 @@ class Client: :param dut: Running client object :param trace: Log the raw client output :param name: Logging name for the client + :param filter_debug: Filter out debug messages to reduce log noise (default: True) """ - def __init__(self, dut, trace=False, name="0"): + def __init__(self, dut, trace=False, name="0", filter_debug=True): self._ep_id = None self.name = name self.trace = trace + self.filter_debug = filter_debug self.run = True self.iq = queue.Queue() self.dut = dut @@ -64,21 +66,120 @@ def _input_thread(self): while self.run: line = self.dut.readline() if line: - plain_line = utils.strip_escape(line) - if b"\r" in line and line.count(b"\r") > 1: - plain_line = plain_line.split(b"\r")[-2] - plain_line = plain_line.replace(b"\t", b" ").decode( - "utf-8", "replace" - ) - flog.info("<--|D{}| {}".format(self.name, plain_line.strip())) - if self.trace: - log.debug("Raw output: {}".format(line)) - if b"Error" in line: - log.error("Output: {}".format(line)) - self.iq.put(plain_line) + plain_line = self._parse_serial_line(line) + if plain_line: # Only process non-empty lines + flog.info("<--|D{}| {}".format(self.name, plain_line.strip())) + if self.trace: + log.debug("Raw output: {}".format(line)) + self._check_for_errors(line, plain_line) + self.iq.put(plain_line) else: pass + def _parse_serial_line(self, line): + """ + Parse serial line to extract clean content + :param line: Raw serial line bytes + :return: Cleaned string or None if line should be ignored + """ + if not line or line == b'': + return None + + # Strip escape sequences first + plain_line = utils.strip_escape(line) + + # Handle multiple carriage returns and newlines + # Split on \r\n or \r\r\n patterns and take the last meaningful part + if b"\r" in plain_line: + # Split on carriage returns and filter out empty parts + parts = plain_line.split(b"\r") + # Find the last non-empty part that contains actual content + for part in reversed(parts): + if part.strip() and not part.startswith(b"\n"): + plain_line = part + break + + # Remove leading/trailing newlines and whitespace + plain_line = plain_line.strip(b"\n\r\t ") + + # Skip empty lines + if not plain_line: + return None + + # Convert tabs to spaces and decode to string + plain_line = plain_line.replace(b"\t", b" ").decode("utf-8", "replace") + + # Skip lines that are just whitespace or control characters + if not plain_line.strip(): + return None + + # Filter debug output if enabled + if self.filter_debug and self._is_debug_line(plain_line): + return None + + return plain_line + + def _is_debug_line(self, line): + """ + Check if a line is debug output that should be filtered + :param line: Parsed line string + :return: True if line should be filtered out + """ + line_lower = line.lower().strip() + + # log.info("Checking if line is debug: {}".format(line_lower)) + # Common debug patterns to filter + debug_patterns = [ + "[trace][paal]", + "debug: ", + ] + + for pattern in debug_patterns: + if pattern in line_lower: + return True + + return False + + def _check_for_errors(self, raw_line, parsed_line): + """ + Check for error conditions in the serial output + :param raw_line: Raw bytes from serial + :param parsed_line: Parsed string line + """ + # Check for various error patterns (case insensitive) + error_patterns = [ + b"Error ", + b"ERROR:", + b"error:", + b"Error:", + b"FAIL", + b"fail", + b"Exception", + b"exception", + b"Fatal", + b"fatal", + b"Critical", + b"critical" + ] + + # Check raw line for error patterns + for pattern in error_patterns: + if pattern in raw_line: + log.error("Output: {}".format(raw_line)) + return + + # Also check parsed line for error keywords + parsed_lower = parsed_line.lower() + error_keywords = [ + "error", "fail", "exception", "fatal", "critical", + "timeout", "abort", "crash", "panic" + ] + + for keyword in error_keywords: + if keyword in parsed_lower: + log.error("Output: {}".format(raw_line)) + return + def _read_line(self, timeout): """ Read data from input queue diff --git a/client_test_lib/tools/utils.py b/client_test_lib/tools/utils.py index 713cd17..0046021 100644 --- a/client_test_lib/tools/utils.py +++ b/client_test_lib/tools/utils.py @@ -18,6 +18,7 @@ import re import string import mbed_lstools +import serial.tools.list_ports log = logging.getLogger(__name__) @@ -157,6 +158,115 @@ def get_serial_port_for_mbed(target_id): return None +def get_serial_port_for_pyocd(target_id): + """ + Gets serial port address for the device using pyocd for device discovery + Falls back to mbed-ls if pyocd is not available or fails + :param target_id: device target_id (can be pyocd board ID or mbed target_id) + :return: Serial port address + """ + try: + from pyocd.core.helpers import ConnectHelper + + log.debug("Attempting to discover devices using pyocd") + + # Try to create a session with pyocd + session = ConnectHelper.session_with_chosen_probe() + + if session is None: + log.warning("No devices found with pyocd, falling back to mbed-ls") + return get_serial_port_for_mbed(target_id) + + # Get the probe from the session + probe = session.probe + if probe: + # Map pyocd probe to serial port + serial_port = _map_pyocd_probe_to_serial_port(probe) + if serial_port: + log.info( + 'Using pyocd-discovered device "{}" at "{}" port for tests'.format( + getattr(probe, 'unique_id', 'Unknown'), + serial_port + ) + ) + session.close() + return serial_port + else: + log.warning("Could not map pyocd probe to serial port, falling back to mbed-ls") + session.close() + return get_serial_port_for_mbed(target_id) + else: + log.warning("No probe found in pyocd session, falling back to mbed-ls") + session.close() + return get_serial_port_for_mbed(target_id) + + except ImportError: + log.debug("pyocd not available, falling back to mbed-ls") + return get_serial_port_for_mbed(target_id) + except Exception as e: + log.warning("pyocd device discovery failed: {}, falling back to mbed-ls".format(e)) + return get_serial_port_for_mbed(target_id) + + +def _map_pyocd_probe_to_serial_port(probe): + """ + Maps a pyocd probe to its corresponding serial port + :param probe: pyocd probe object + :return: Serial port path or None if not found + """ + try: + # Get all available serial ports + ports = serial.tools.list_ports.comports() + + # Try to match based on USB VID/PID if available + if hasattr(probe, 'vid') and hasattr(probe, 'pid'): + target_vid = probe.vid + target_pid = probe.pid + + for port in ports: + if port.vid == target_vid and port.pid == target_pid: + log.debug("Matched pyocd probe to serial port {} by VID/PID".format(port.device)) + return port.device + + # Prioritize USB serial ports over system serial ports + # Common patterns for ARM development boards (in order of preference) + arm_patterns = [ + 'ttyACM', # Linux USB CDC-ACM (most common for ARM boards) + 'ttyUSB', # Linux USB serial + 'cu.usbmodem', # macOS USB + 'COM', # Windows + ] + + # First pass: look for USB serial ports + for port in ports: + port_name = port.device.lower() + for pattern in arm_patterns: + if pattern in port_name: + log.debug("Matched pyocd probe to USB serial port {} by name pattern".format(port.device)) + return port.device + + # Second pass: exclude system serial ports and use first available USB port + usb_ports = [] + for port in ports: + port_name = port.device.lower() + # Skip system serial ports (ttyS*) and virtual ports + if not any(skip in port_name for skip in ['ttys', 'pts', 'ttyprintk']): + usb_ports.append(port) + + if usb_ports: + log.debug("Using first available USB serial port {} for pyocd probe".format(usb_ports[0].device)) + return usb_ports[0].device + + # Last resort: return None to fall back to mbed-ls + log.debug("No suitable USB serial port found for pyocd probe") + return None + + except Exception as e: + log.debug("Error mapping pyocd probe to serial port: {}".format(e)) + + return None + + def get_path(path): if "WORKSPACE" in os.environ: log.debug("$WORKSPACE: {}".format(os.environ["WORKSPACE"])) diff --git a/requirements.txt b/requirements.txt index 303f771..90e7ae2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ manifest-tool==2.6.2 mbed-ls==1.8.* +pyocd>=0.35.0 pytest==7.4.4 pytest-html pyserial From 973fefb540debcc001d8c591ef72d88d31588b51 Mon Sep 17 00:00:00 2001 From: Yash Goyal Date: Wed, 24 Sep 2025 21:42:38 -0700 Subject: [PATCH 2/2] Fixing pysh check --- client_test_lib/fixtures/client_fixtures.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/client_test_lib/fixtures/client_fixtures.py b/client_test_lib/fixtures/client_fixtures.py index c3e7e05..1a04892 100644 --- a/client_test_lib/fixtures/client_fixtures.py +++ b/client_test_lib/fixtures/client_fixtures.py @@ -21,7 +21,10 @@ from client_test_lib.tools.external_conn import ExternalConnection from client_test_lib.tools.local_conn import LocalConnection from client_test_lib.tools.serial_conn import SerialConnection -from client_test_lib.tools.utils import get_serial_port_for_mbed, get_serial_port_for_pyocd +from client_test_lib.tools.utils import ( + get_serial_port_for_mbed, + get_serial_port_for_pyocd, +) log = logging.getLogger(__name__)