From 6892f25ab3525a1e865d7b73e300a219a3d51ad3 Mon Sep 17 00:00:00 2001 From: Willian Galvani Date: Fri, 14 Nov 2025 14:43:28 -0300 Subject: [PATCH 1/6] AutopilotManager: drop CAP_SYS_ADMIN capability when starting --- .github/workflows/test-and-deploy.yml | 2 +- core/Dockerfile | 2 +- core/services/ardupilot_manager/main.py | 17 ++++++++++++++++- core/services/ardupilot_manager/pyproject.toml | 1 + core/tools/install-system-tools.sh | 4 +--- core/uv.lock | 8 ++++++++ 6 files changed, 28 insertions(+), 6 deletions(-) diff --git a/.github/workflows/test-and-deploy.yml b/.github/workflows/test-and-deploy.yml index 57893391bc..24616f733a 100644 --- a/.github/workflows/test-and-deploy.yml +++ b/.github/workflows/test-and-deploy.yml @@ -29,7 +29,7 @@ jobs: - name: Install dependencies run: | - sudo apt-get install --no-install-recommends --assume-yes shellcheck parallel + sudo apt-get install --no-install-recommends --assume-yes shellcheck parallel libcap-dev python -m pip install --upgrade pip pip install poetry diff --git a/core/Dockerfile b/core/Dockerfile index e036e6a8af..5a906c1cde 100644 --- a/core/Dockerfile +++ b/core/Dockerfile @@ -37,7 +37,7 @@ RUN /home/pi/tools/install-static-binaries.sh # Generation of python virtual environment for our libraries and services FROM base AS install-services-and-libs -RUN apt update && apt install -y --no-install-recommends g++ +RUN apt update && apt install -y --no-install-recommends g++ libcap-dev # UV installation ADD https://astral.sh/uv/install.sh /uv-installer.sh diff --git a/core/services/ardupilot_manager/main.py b/core/services/ardupilot_manager/main.py index acebc74343..de3da29523 100755 --- a/core/services/ardupilot_manager/main.py +++ b/core/services/ardupilot_manager/main.py @@ -16,12 +16,27 @@ init_logger(SERVICE_NAME) logger.info("Starting AutoPilot Manager.") + +# Drop CAP_SYS_ADMIN capability for the entire service as it's not needed +# Serial port access, process management, and file operations don't require it +try: + import prctl + + prctl.cap_effective.drop(prctl.CAP_SYS_ADMIN) + prctl.cap_permitted.drop(prctl.CAP_SYS_ADMIN) + logger.info("Dropped CAP_SYS_ADMIN capability for security") +except ImportError: + logger.warning("prctl module not available - cannot drop capabilities") +except Exception as e: + logger.warning(f"Failed to drop CAP_SYS_ADMIN capability: {e}") + autopilot = AutoPilotManager() from api import application if not is_running_as_root(): - raise RuntimeError("AutoPilot manager needs to run with root privilege.") + logger.error("AutoPilot manager needs to run with root privilege.") + logger.error("expect issues") async def main() -> None: diff --git a/core/services/ardupilot_manager/pyproject.toml b/core/services/ardupilot_manager/pyproject.toml index a27e7bd409..009283af44 100644 --- a/core/services/ardupilot_manager/pyproject.toml +++ b/core/services/ardupilot_manager/pyproject.toml @@ -15,6 +15,7 @@ dependencies = [ "psutil==5.7.2", "pydantic==1.10.12", "pyelftools==0.30", + "python-prctl==1.8.1", "pyserial==3.5", # This dependency needs to be locked since it is used by fastapi "python-multipart==0.0.5", diff --git a/core/tools/install-system-tools.sh b/core/tools/install-system-tools.sh index 11f78db583..89eb401ab4 100755 --- a/core/tools/install-system-tools.sh +++ b/core/tools/install-system-tools.sh @@ -17,6 +17,4 @@ parallel --halt now,fail=1 '/home/pi/tools/{}/bootstrap.sh' ::: "${TOOLS[@]}" # Tools that uses apt to do the installation # APT is terrible like pip and don't know how to handle parallel installation # These should periodically be moved onto the base image - -# apt update && apt install -y --no-install-recommends [your latest forgotten packages here] -# clean && rm -rf /var/lib/apt/lists/* +apt update && apt install -y --no-install-recommends dhcpcd5 iptables iproute2 isc-dhcp-client nmap systemd libcap-dev diff --git a/core/uv.lock b/core/uv.lock index 6c28ffd4d2..f525f3b144 100644 --- a/core/uv.lock +++ b/core/uv.lock @@ -86,6 +86,7 @@ dependencies = [ { name = "pyelftools" }, { name = "pyserial" }, { name = "python-multipart" }, + { name = "python-prctl" }, { name = "smbus2" }, { name = "uvicorn" }, { name = "validators" }, @@ -105,6 +106,7 @@ requires-dist = [ { name = "pyelftools", specifier = "==0.30" }, { name = "pyserial", specifier = "==3.5" }, { name = "python-multipart", specifier = "==0.0.5" }, + { name = "python-prctl", specifier = "==1.8.1" }, { name = "smbus2", specifier = "==0.3.0" }, { name = "uvicorn", specifier = "==0.18.0" }, { name = "validators", specifier = "==0.18.2" }, @@ -1327,6 +1329,12 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/46/40/a933ac570bf7aad12a298fc53458115cc74053474a72fbb8201d7dc06d3d/python-multipart-0.0.5.tar.gz", hash = "sha256:f7bb5f611fc600d15fa47b3974c8aa16e93724513b49b5f95c81e6624c83fa43", size = 32581 } +[[package]] +name = "python-prctl" +version = "1.8.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/c0/99/be5393cfe9c16376b4f515d90a68b11f1840143ac1890e9008bc176cf6a6/python-prctl-1.8.1.tar.gz", hash = "sha256:b4ca9a25a7d4f1ace4fffd1f3a2e64ef5208fe05f929f3edd5e27081ca7e67ce", size = 28033, upload-time = "2020-11-02T19:30:25.257Z" } + [[package]] name = "pytz" version = "2025.2" From 005cb379f44ce1d65b81b39366b61ff6386aade3 Mon Sep 17 00:00:00 2001 From: Willian Galvani Date: Fri, 14 Nov 2025 14:45:06 -0300 Subject: [PATCH 2/6] commonwealth: decorators: support caching async functions --- .../src/commonwealth/utils/decorators.py | 34 +++++-- .../utils/tests/test_decorators.py | 89 +++++++++++++++++++ 2 files changed, 116 insertions(+), 7 deletions(-) diff --git a/core/libs/commonwealth/src/commonwealth/utils/decorators.py b/core/libs/commonwealth/src/commonwealth/utils/decorators.py index 45875c0a64..2dca0eee71 100644 --- a/core/libs/commonwealth/src/commonwealth/utils/decorators.py +++ b/core/libs/commonwealth/src/commonwealth/utils/decorators.py @@ -1,3 +1,4 @@ +import asyncio import time from functools import wraps from threading import Lock @@ -9,6 +10,8 @@ def temporary_cache(timeout_seconds: float = 10) -> Callable[[F], F]: """Decorator that creates a cache for specific inputs with a configured timeout in seconds. + Supports both synchronous and asynchronous functions. + Args: timeout_seconds (float, optional): Timeout to be used for cache invalidation. Defaults to 10. @@ -19,23 +22,40 @@ def temporary_cache(timeout_seconds: float = 10) -> Callable[[F], F]: last_sample_time: Dict[Any, float] = {} def inner_function(function: F) -> F: - @wraps(function) - def wrapper(*args: Any) -> Any: - nonlocal last_sample_time + def is_cache_valid(args: Any) -> bool: current_time = time.time() - cache_is_valid = args in last_sample_time and current_time - last_sample_time[args] < timeout_seconds + return args in last_sample_time and current_time - last_sample_time[args] < timeout_seconds + + # Check if the function is async + if asyncio.iscoroutinefunction(function): + + @wraps(function) + async def async_wrapper(*args: Any) -> Any: + # The cache is still valid and we can return the value if exists + if is_cache_valid(args) and args in cache: + return cache[args] + # The cache is invalid or argument does not exist in cache, update it! + last_sample_time[args] = time.time() + function_return = await function(*args) + cache[args] = function_return + return function_return + + return async_wrapper # type: ignore + + @wraps(function) + def sync_wrapper(*args: Any) -> Any: # The cache is still valid and we can return the value if exists - if cache_is_valid and args in cache: + if is_cache_valid(args) and args in cache: return cache[args] # The cache is invalid or argument does not exist in cache, update it! - last_sample_time[args] = current_time + last_sample_time[args] = time.time() function_return = function(*args) cache[args] = function_return return function_return - return wrapper # type: ignore + return sync_wrapper # type: ignore return inner_function diff --git a/core/libs/commonwealth/src/commonwealth/utils/tests/test_decorators.py b/core/libs/commonwealth/src/commonwealth/utils/tests/test_decorators.py index 604bf84a13..5e697e0f9b 100644 --- a/core/libs/commonwealth/src/commonwealth/utils/tests/test_decorators.py +++ b/core/libs/commonwealth/src/commonwealth/utils/tests/test_decorators.py @@ -1,6 +1,9 @@ +import asyncio import time from datetime import datetime +import pytest + from .. import decorators CACHE_TIME = 0.3 @@ -24,3 +27,89 @@ def test_nested_settings_save_load() -> None: # Check if all cache values are invalid after waiting for a long time assert all(original_output[key] != cached_function(key) for key in inputs) + + +@pytest.mark.asyncio +async def test_async_cache_not_called_on_hit() -> None: + """Test that the inner coroutine is not called again on cache hit.""" + call_count = 0 + + @decorators.temporary_cache(timeout_seconds=CACHE_TIME) + async def counting_function(_key: str) -> int: + nonlocal call_count + call_count += 1 + await asyncio.sleep(0.01) + return call_count + + # First call - should execute the function + result1 = await counting_function("test_key") + assert result1 == 1 + assert call_count == 1 + + # Second call with same key - should return cached result WITHOUT calling function + result2 = await counting_function("test_key") + assert result2 == 1 # Same cached result + assert call_count == 1 # Function was NOT called again + + # Third call - still cached + result3 = await counting_function("test_key") + assert result3 == 1 + assert call_count == 1 + + +@pytest.mark.asyncio +async def test_async_cache_timeout() -> None: + """Test that async cache expires after timeout.""" + call_count = 0 + + @decorators.temporary_cache(timeout_seconds=CACHE_TIME) + async def counting_function(_key: str) -> int: + nonlocal call_count + call_count += 1 + await asyncio.sleep(0.01) + return call_count + + # First call + result1 = await counting_function("timeout_test") + assert result1 == 1 + assert call_count == 1 + + # Should return cached value immediately + result2 = await counting_function("timeout_test") + assert result2 == 1 + assert call_count == 1 + + # Wait for cache to expire + await asyncio.sleep(CACHE_WAIT_TIME) + + # Should call function again after timeout + result3 = await counting_function("timeout_test") + assert result3 == 2 # New result + assert call_count == 2 # Function was called again + + +@pytest.mark.asyncio +async def test_async_cache_no_coroutine_reuse_error() -> None: + """Test that async cached functions don't raise 'cannot reuse already awaited coroutine' error.""" + # This was the original bug - caching the coroutine instead of the result + + @decorators.temporary_cache(timeout_seconds=CACHE_TIME) + async def async_function(key: str) -> str: + await asyncio.sleep(0.01) + return f"result_{key}" + + key = "reuse_test" + + # First call + first_result = await async_function(key) + assert first_result == "result_reuse_test" + + # Second call should not raise "cannot reuse already awaited coroutine" + # It should return the cached result, not a cached coroutine + try: + second_result = await async_function(key) + assert second_result == first_result + except RuntimeError as e: + if "cannot reuse already awaited coroutine" in str(e): + pytest.fail("Cache is storing coroutine instead of result!") + raise From 941b74f69a02b594684409955771695ad1f5f0c8 Mon Sep 17 00:00:00 2001 From: Willian Galvani Date: Fri, 14 Nov 2025 14:48:11 -0300 Subject: [PATCH 3/6] autopilotmanager: add streaming, board_id, and multiple platform support --- .../ardupilot_manager/api/v1/routers/index.py | 203 ++++++++++++++---- .../ardupilot_manager/api/v2/routers/index.py | 15 ++ .../ardupilot_manager/autopilot_manager.py | 74 ++++--- .../firmware/FirmwareDownload.py | 106 +++++---- .../firmware/FirmwareInstall.py | 53 ++--- .../firmware/FirmwareManagement.py | 58 ++--- .../firmware/FirmwareUpload.py | 157 ++++++++++++-- .../firmware/test_FirmwareDownload.py | 49 +++-- .../firmware/test_FirmwareInstall.py | 41 ++-- .../flight_controller_detector/Detector.py | 109 +++++++--- .../board_identification.py | 33 --- .../bootloader/__init__.py | 0 .../bootloader/px4_boards.py | 63 ++++++ .../bootloader/px4_bootloader.py | 151 +++++++++++++ .../linux/argonot.py | 4 +- .../linux/navigator.py | 9 +- .../mavlink_board_id.py | 169 +++++++++++++++ core/services/ardupilot_manager/typedefs.py | 86 ++++---- 18 files changed, 1040 insertions(+), 340 deletions(-) delete mode 100644 core/services/ardupilot_manager/flight_controller_detector/board_identification.py create mode 100644 core/services/ardupilot_manager/flight_controller_detector/bootloader/__init__.py create mode 100644 core/services/ardupilot_manager/flight_controller_detector/bootloader/px4_boards.py create mode 100644 core/services/ardupilot_manager/flight_controller_detector/bootloader/px4_bootloader.py create mode 100644 core/services/ardupilot_manager/flight_controller_detector/mavlink_board_id.py diff --git a/core/services/ardupilot_manager/api/v1/routers/index.py b/core/services/ardupilot_manager/api/v1/routers/index.py index 19136b205b..be1277baeb 100644 --- a/core/services/ardupilot_manager/api/v1/routers/index.py +++ b/core/services/ardupilot_manager/api/v1/routers/index.py @@ -1,9 +1,10 @@ import asyncio +import json import os import shutil from functools import wraps from pathlib import Path -from typing import Any, Callable, List, Optional, Tuple +from typing import Any, AsyncGenerator, Awaitable, Callable, List, Optional, Tuple from autopilot_manager import AutoPilotManager from commonwealth.mavlink_comm.exceptions import ( @@ -12,17 +13,17 @@ MavlinkMessageSendFail, ) from commonwealth.mavlink_comm.typedefs import FirmwareInfo, MavlinkVehicleType -from commonwealth.utils.apis import StackedHTTPException from commonwealth.utils.decorators import single_threaded from exceptions import InvalidFirmwareFile, NoDefaultFirmwareAvailable from fastapi import APIRouter, Body, File, HTTPException, UploadFile, status -from fastapi.responses import PlainTextResponse +from fastapi.responses import PlainTextResponse, StreamingResponse from fastapi_versioning import versioned_api_route from loguru import logger from typedefs import ( Firmware, FlightController, FlightControllerFlags, + FlightControllerV1, Parameters, Serial, SITLFrame, @@ -59,7 +60,7 @@ async def wrapper(*args: Tuple[Any], **kwargs: dict[str, Any]) -> Any: # all the CRUD operations, we gonna keep ones that have less than 2 endpoints in the index router. -async def target_board(board_name: Optional[str]) -> FlightController: +async def target_board(board_name: Optional[str], board_id: Optional[int] = None) -> FlightController: """Returns the board that should be used to perform operations on. Most of the API routes that have operations related to board management will give the option to perform those @@ -71,7 +72,11 @@ async def target_board(board_name: Optional[str]) -> FlightController: """ if board_name is not None: try: - return next(board for board in await autopilot.available_boards(True) if board.name == board_name) + if board_id is not None: + return next( + board for board in await autopilot.available_boards(True) if board.ardupilot_board_id == board_id + ) + return next(board for board in await autopilot.available_boards(True) if board.platform.name == board_name) except StopIteration as error: raise ValueError("Chosen board not available.") from error if autopilot.current_board is None: @@ -88,6 +93,89 @@ def raise_lock(*raise_args: str, **kwargs: int) -> None: raise HTTPException(status_code=status.HTTP_423_LOCKED, detail="Operation already in progress.") +async def streaming_firmware_operation( + operation_func: Callable[..., Awaitable[None]], + *operation_args: Any, + pre_install_callback: Optional[Callable[[asyncio.Queue[Tuple[str, str]]], Awaitable[None]]] = None, + post_install_callback: Optional[Callable[[asyncio.Queue[Tuple[str, str]]], Awaitable[None]]] = None, + **operation_kwargs: Any, +) -> StreamingResponse: + """Common helper for streaming firmware installation operations. + + Args: + operation_func: The autopilot operation to perform (e.g., autopilot.install_firmware_from_url) + *operation_args: Positional arguments to pass to the operation function + pre_install_callback: Optional callback to run before stopping autopilot + post_install_callback: Optional callback to run after starting autopilot + **operation_kwargs: Keyword arguments to pass to the operation function + """ + + async def generate() -> AsyncGenerator[str, None]: + # Create a queue to receive output from the callback + output_queue: asyncio.Queue[Tuple[str, str]] = asyncio.Queue() + + async def output_callback(stream: str, line: str) -> None: + await output_queue.put((stream, line)) + + # Start the operation in a separate task + async def operation_task() -> None: + try: + # Run pre-install callback if provided + if pre_install_callback: + await pre_install_callback(output_queue) + + await autopilot.kill_ardupilot() + await output_queue.put(("stdout", "Stopped autopilot")) + + # Execute the main operation with output callback + await operation_func(*operation_args, output_callback=output_callback, **operation_kwargs) + + except (InvalidFirmwareFile, NoDefaultFirmwareAvailable, ValueError) as error: + await output_queue.put(("stderr", f"Error: {str(error)}")) + except Exception as e: + await output_queue.put(("stderr", f"Error: {str(e)}")) + finally: + await autopilot.start_ardupilot() + await output_queue.put(("stdout", "Started autopilot")) + + # Run post-install callback if provided + if post_install_callback: + await post_install_callback(output_queue) + + # Signal completion + await output_queue.put(("done", "")) + + # Start the operation task + task = asyncio.create_task(operation_task()) + + # Stream output as it comes + try: + while True: + stream, line = await output_queue.get() + if stream == "done": + break + yield json.dumps({"stream": stream, "data": line}) + "\n" + finally: + # Ensure task is cleaned up + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + return StreamingResponse( + generate(), + media_type="application/x-ndjson", + headers={ + "Content-Type": "application/x-ndjson", + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", # Disable buffering for nginx + }, + ) + + @index_router_v1.put("/serials", status_code=status.HTTP_200_OK) @index_to_http_exception def update_serials(serials: List[Serial] = Body(...)) -> Any: @@ -150,12 +238,16 @@ async def get_firmware_vehicle_type() -> Any: summary="Retrieve dictionary of available firmwares versions with their respective URL.", ) @index_to_http_exception -async def get_available_firmwares(vehicle: Vehicle, board_name: Optional[str] = None) -> Any: - return await autopilot.get_available_firmwares(vehicle, (await target_board(board_name)).platform) +async def get_available_firmwares( + vehicle: Vehicle, + board_name: Optional[str] = None, + board_id: Optional[int] = None, + firmware: Optional[str] = "Ardupilot", +) -> Any: + return await autopilot.get_available_firmwares(vehicle, await target_board(board_name, board_id), firmware) @index_router_v1.post("/install_firmware_from_url", summary="Install firmware for given URL.") -@index_to_http_exception @single_threaded(callback=raise_lock) async def install_firmware_from_url( url: str, @@ -164,28 +256,32 @@ async def install_firmware_from_url( parameters: Optional[Parameters] = None, auto_switch_board: bool = True, ) -> Any: - board = None - try: - await autopilot.kill_ardupilot() - board = await target_board(board_name) - await autopilot.install_firmware_from_url(url, board, make_default, parameters) - finally: - await autopilot.start_ardupilot() - - # In some cases user might install a firmware that implies in a board change but this is not reflected, - # so if the board is different from the current one, we change it. - if ( - auto_switch_board - and board - and autopilot.current_board - and autopilot.current_board.name != board.name - and FlightControllerFlags.is_bootloader not in board.flags - ): - await autopilot.change_board(board) + board = await target_board(board_name) + + async def post_install_callback(output_queue: asyncio.Queue[Tuple[str, str]]) -> None: + # In some cases user might install a firmware that implies in a board change but this is not reflected, + # so if the board is different from the current one, we change it. + if ( + auto_switch_board + and board + and autopilot.current_board + and autopilot.current_board.name != board.name + and FlightControllerFlags.is_bootloader not in board.flags + ): + await autopilot.change_board(board) + await output_queue.put(("stdout", f"Switched to board {board.name}")) + + return await streaming_firmware_operation( + autopilot.install_firmware_from_url, + url, + board, + make_default, + parameters, + post_install_callback=post_install_callback, + ) @index_router_v1.post("/install_firmware_from_file", summary="Install firmware from user file.") -@index_to_http_exception @single_threaded(callback=raise_lock) async def install_firmware_from_file( binary: UploadFile = File(...), @@ -196,25 +292,44 @@ async def install_firmware_from_file( custom_firmware = Path.joinpath(autopilot.settings.firmware_folder, "custom_firmware") with open(custom_firmware, "wb") as buffer: shutil.copyfileobj(binary.file, buffer) - logger.debug("Going to kill ardupilot") - await autopilot.kill_ardupilot() - logger.debug("Installing firmware from file") - await autopilot.install_firmware_from_file(custom_firmware, await target_board(board_name), parameters) - os.remove(custom_firmware) - except InvalidFirmwareFile as error: - raise StackedHTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, error=error) from error finally: binary.file.close() - logger.debug("Starting ardupilot again") - await autopilot.start_ardupilot() + + board = await target_board(board_name) + + async def pre_install_callback(output_queue: asyncio.Queue[Tuple[str, str]]) -> None: + await output_queue.put(("stdout", "Firmware file uploaded")) + + async def post_install_callback(output_queue: asyncio.Queue[Tuple[str, str]]) -> None: + if custom_firmware and os.path.exists(custom_firmware): + os.remove(custom_firmware) + await output_queue.put(("stdout", "Cleaned up temporary firmware file")) + + return await streaming_firmware_operation( + autopilot.install_firmware_from_file, + custom_firmware, + board, + parameters, + pre_install_callback=pre_install_callback, + post_install_callback=post_install_callback, + ) @index_router_v1.get( - "/board", response_model=Optional[FlightController], summary="Check what is the current running board." + "/board", response_model=Optional[FlightControllerV1], summary="Check what is the current running board." ) @index_to_http_exception def get_board() -> Any: - return autopilot.current_board + if autopilot.current_board is None: + return None + return { + "name": autopilot.current_board.name, + "manufacturer": autopilot.current_board.manufacturer, + "platform": autopilot.current_board.platform.name, + "platform_type": autopilot.current_board.platform.platform_type.value, + "ardupilot_board_id": autopilot.current_board.ardupilot_board_id, + "flags": autopilot.current_board.flags, + } @index_router_v1.post("/board", summary="Set board to be used.") @@ -271,13 +386,11 @@ async def stop() -> Any: @index_router_v1.post("/restore_default_firmware", summary="Restore default firmware.") @index_to_http_exception async def restore_default_firmware(board_name: Optional[str] = None) -> Any: - try: - await autopilot.kill_ardupilot() - await autopilot.restore_default_firmware(await target_board(board_name)) - except (NoDefaultFirmwareAvailable, ValueError) as error: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(error)) from error - finally: - await autopilot.start_ardupilot() + board = await target_board(board_name) + return await streaming_firmware_operation( + autopilot.restore_default_firmware, + board, + ) @index_router_v1.get( diff --git a/core/services/ardupilot_manager/api/v2/routers/index.py b/core/services/ardupilot_manager/api/v2/routers/index.py index a8a1a06f24..d7c76cbef6 100644 --- a/core/services/ardupilot_manager/api/v2/routers/index.py +++ b/core/services/ardupilot_manager/api/v2/routers/index.py @@ -1,6 +1,11 @@ +from typing import Any, Optional + +from api.v1.routers.index import index_to_http_exception +from autopilot_manager import AutoPilotManager from fastapi import APIRouter, status from fastapi.responses import HTMLResponse from fastapi_versioning import versioned_api_route +from typedefs import FlightController index_router_v2 = APIRouter( tags=["index_v2"], @@ -8,6 +13,8 @@ responses={status.HTTP_404_NOT_FOUND: {"description": "Not found"}}, ) +autopilot = AutoPilotManager() + @index_router_v2.get("/", status_code=status.HTTP_200_OK) async def root_v2() -> HTMLResponse: @@ -19,3 +26,11 @@ async def root_v2() -> HTMLResponse: """ return HTMLResponse(content=html_content, status_code=200) + + +@index_router_v2.get( + "/board", response_model=Optional[FlightController], summary="Check what is the current running board." +) +@index_to_http_exception +def get_board() -> Any: + return autopilot.current_board diff --git a/core/services/ardupilot_manager/autopilot_manager.py b/core/services/ardupilot_manager/autopilot_manager.py index 50ca2900b5..d273e5a89e 100644 --- a/core/services/ardupilot_manager/autopilot_manager.py +++ b/core/services/ardupilot_manager/autopilot_manager.py @@ -4,7 +4,7 @@ import subprocess import time from copy import deepcopy -from typing import Any, List, Optional, Set +from typing import Any, Awaitable, Callable, List, Optional, Set import psutil from commonwealth.mavlink_comm.VehicleManager import VehicleManager @@ -300,12 +300,12 @@ def get_default_params_cmdline(self, platform: Platform) -> str: async def start_linux_board(self, board: LinuxFlightController) -> None: self._current_board = board if not self.firmware_manager.is_firmware_installed(self._current_board): - if board.platform == Platform.Navigator: + if board.platform.name == "Navigator": await self.firmware_manager.install_firmware_from_file( pathlib.Path("/root/blueos-files/ardupilot-manager/default/ardupilot_navigator"), board, ) - elif board.platform == Platform.Navigator64: + elif board.platform.name == "Navigator64": await self.firmware_manager.install_firmware_from_file( pathlib.Path("/root/blueos-files/ardupilot-manager/default/ardupilot_navigator64"), board, @@ -316,7 +316,7 @@ async def start_linux_board(self, board: LinuxFlightController) -> None: ) firmware_path = self.firmware_manager.firmware_path(self._current_board.platform) - self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform) + self.firmware_manager.validate_firmware(firmware_path, self._current_board) # ArduPilot process will connect as a client on the UDP server created by the mavlink router master_endpoint = Endpoint( @@ -434,7 +434,7 @@ async def start_sitl(self) -> None: self.current_sitl_frame = frame firmware_path = self.firmware_manager.firmware_path(self._current_board.platform) - self.firmware_manager.validate_firmware(firmware_path, self._current_board.platform) + self.firmware_manager.validate_firmware(firmware_path, self._current_board) # ArduPilot SITL binary will bind TCP port 5760 (server) and the mavlink router will connect to it as a client master_endpoint = Endpoint( @@ -474,9 +474,10 @@ async def start_mavlink_manager(self, device: Endpoint) -> None: logger.warning(str(error)) await self.mavlink_manager.start(device) - @staticmethod - async def available_boards(include_bootloaders: bool = False) -> List[FlightController]: + async def available_boards(self, include_bootloaders: bool = False) -> List[FlightController]: all_boards = await BoardDetector.detect(True) + if self.current_board and self.current_board.path is not None and self.current_board not in all_boards: + all_boards.append(self.current_board) if include_bootloaders: return all_boards return [board for board in all_boards if FlightControllerFlags.is_bootloader not in board.flags] @@ -522,22 +523,23 @@ def get_board_to_be_used(self, boards: List[FlightController]) -> FlightControll real_boards = [board for board in boards if board.type not in [PlatformType.SITL, PlatformType.Manual]] if not real_boards: raise RuntimeError("No physical board detected and SITL/Manual board aren't explicitly chosen.") - real_boards.sort(key=lambda board: board.platform) + real_boards.sort(key=lambda board: board.platform.name) return real_boards[0] def running_ardupilot_processes(self) -> List[psutil.Process]: """Return list of all Ardupilot process running on system.""" def is_ardupilot_process(process: psutil.Process) -> bool: - """Checks if given process is using a Ardupilot's firmware file, for any known platform.""" - for platform in Platform: - firmware_path = self.firmware_manager.firmware_path(platform) - try: - if str(firmware_path) in " ".join(process.cmdline()): - return True - except psutil.NoSuchProcess: - # process may have died before we could call cmdline() - pass + """Checks if given process is using a Ardupilot's firmware file, for the current platform.""" + if not self._current_board: + return False + firmware_path = self.firmware_manager.firmware_path(self._current_board.platform) + try: + if str(firmware_path) in " ".join(process.cmdline()): + return True + except psutil.NoSuchProcess: + # process may have died before we could call cmdline() + pass return False return list(filter(is_ardupilot_process, psutil.process_iter())) @@ -577,7 +579,7 @@ async def prune_ardupilot_processes(self) -> None: async def kill_ardupilot(self) -> None: self.should_be_running = False - if not self.current_board or self.current_board.platform != Platform.SITL: + if not self.current_board or self.current_board.platform.platform_type != PlatformType.SITL: try: logger.info("Disarming vehicle.") await self.vehicle_manager.disarm_vehicle() @@ -620,15 +622,15 @@ async def start_ardupilot(self) -> None: flight_controller = self.get_board_to_be_used(available_boards) logger.info(f"Using {flight_controller.name} flight-controller.") - if flight_controller.platform.type == PlatformType.Linux: + if flight_controller.platform.platform_type == PlatformType.Linux: assert isinstance(flight_controller, LinuxFlightController) flight_controller.setup() await self.start_linux_board(flight_controller) - elif flight_controller.platform.type == PlatformType.Serial: + elif flight_controller.platform.platform_type == PlatformType.Serial: await self.start_serial(flight_controller) - elif flight_controller.platform == Platform.SITL: + elif flight_controller.platform.platform_type == PlatformType.SITL: await self.start_sitl() - elif flight_controller.platform == Platform.Manual: + elif flight_controller.platform.platform_type == PlatformType.Manual: await self.start_manual_board(flight_controller) else: raise RuntimeError(f"Invalid board type: {flight_controller}") @@ -689,25 +691,39 @@ async def update_endpoints(self, endpoints_to_update: Set[Endpoint]) -> None: self._save_current_endpoints() await self.mavlink_manager.restart() - async def get_available_firmwares(self, vehicle: Vehicle, platform: Platform) -> List[Firmware]: - return await self.firmware_manager.get_available_firmwares(vehicle, platform) + async def get_available_firmwares( + self, vehicle: Vehicle, board: FlightController, firmware_name: Optional[str] = "Ardupilot" + ) -> List[Firmware]: + return await self.firmware_manager.get_available_firmwares(vehicle, board, firmware_name) async def install_firmware_from_file( - self, firmware_path: pathlib.Path, board: FlightController, default_parameters: Optional[Parameters] = None + self, + firmware_path: pathlib.Path, + board: FlightController, + default_parameters: Optional[Parameters] = None, + output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None, ) -> None: - await self.firmware_manager.install_firmware_from_file(firmware_path, board, default_parameters) + await self.firmware_manager.install_firmware_from_file( + firmware_path, board, default_parameters, output_callback + ) + # pylint: disable=too-many-arguments async def install_firmware_from_url( self, url: str, board: FlightController, make_default: bool = False, default_parameters: Optional[Parameters] = None, + output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None, ) -> None: - await self.firmware_manager.install_firmware_from_url(url, board, make_default, default_parameters) + await self.firmware_manager.install_firmware_from_url( + url, board, make_default, default_parameters, output_callback + ) - async def restore_default_firmware(self, board: FlightController) -> None: - await self.firmware_manager.restore_default_firmware(board) + async def restore_default_firmware( + self, board: FlightController, output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None + ) -> None: + await self.firmware_manager.restore_default_firmware(board, output_callback) async def set_manual_board_master_endpoint(self, endpoint: Endpoint) -> bool: self.configuration["manual_board_master_endpoint"] = endpoint.as_dict() diff --git a/core/services/ardupilot_manager/firmware/FirmwareDownload.py b/core/services/ardupilot_manager/firmware/FirmwareDownload.py index a733431f3b..e63b43be4a 100644 --- a/core/services/ardupilot_manager/firmware/FirmwareDownload.py +++ b/core/services/ardupilot_manager/firmware/FirmwareDownload.py @@ -19,7 +19,7 @@ ) from loguru import logger from packaging.version import Version -from typedefs import FirmwareFormat, Platform, PlatformType, Vehicle +from typedefs import Firmware, FirmwareFormat, FlightController, PlatformType, Vehicle class FirmwareDownloader: @@ -114,7 +114,7 @@ async def download_manifest(self) -> bool: return True - async def _find_version_item(self, **args: str) -> List[Dict[str, Any]]: + async def _find_version_item(self, **args: str | int) -> List[Firmware]: """Find version objects in the manifest that match the specific case of **args The arguments should follow the same name described in the dictionary inside the manifest @@ -124,7 +124,7 @@ async def _find_version_item(self, **args: str) -> List[Dict[str, Any]]: E.g: `self._find_version_item(vehicletype="Sub", platform="Pixhawk1", mav_firmware_version_type="4.0.1")` Returns: - List[Dict[str, Any]]: A list of firmware items that match the arguments. + List[Firmware]: A list of firmware items that match the arguments. """ if not self._manifest and not await self.download_manifest(): raise ManifestUnavailable("Manifest file is not available. Cannot use it to find firmware candidates.") @@ -134,40 +134,58 @@ async def _find_version_item(self, **args: str) -> List[Dict[str, Any]]: # Make sure that the item matches all args value for item in self._manifest["firmware"]: for key, value in args.items(): - real_key = key.replace("_", "-") - if real_key not in item or item[real_key] != value: + if key == "platform" and "board_id" in args: + continue + real_key = key.replace("_", "-").lower() if key != "board_id" else "board_id" + if real_key not in item or str(item[real_key]).lower() != str(value).lower(): break else: found_version_item.append(item) - return found_version_item + return [ + Firmware( + board_id=item["board_id"] if "board_id" in item else None, + platform=item["platform"], + name=item["mav-firmware-version-type"], + url=item["url"], + ) + for item in found_version_item + ] @cached(ttl=3600, namespace="firmware_versions") - async def get_available_versions(self, vehicle: Vehicle, platform: Platform) -> List[str]: + async def get_available_versions( + self, vehicle: Vehicle, board: FlightController, _firmware_name: Optional[str] = "Ardupilot" + ) -> List[Firmware]: """Get available firmware versions for the specific plataform and vehicle Args: vehicle (Vehicle): Desired vehicle. - platform (Platform): Desired platform. + board (FlightController): Desired Flight Controller. + firmware (Optional[str]): Desired firmware ("Ardupilot" or "PX4"). Returns: - List[str]: List of available versions that match the specific desired configuration. + List[Firmware]: List of available versions that match the specific desired configuration. """ - available_versions: List[str] = [] - if not await self._manifest_is_valid(): raise InvalidManifest("Manifest file is invalid. Cannot use it to find available versions.") - items = await self._find_version_item(vehicletype=vehicle.value, platform=platform.value) - - for item in items: - if item["format"] == FirmwareDownloader._supported_firmware_formats[platform.type]: - available_versions.append(item["mav-firmware-version-type"]) - - return available_versions + # file format (elf/apj) + file_format = FirmwareDownloader._supported_firmware_formats[board.platform.platform_type].value + if board.ardupilot_board_id is not None: + return await self._find_version_item( + vehicletype=vehicle.value, + platform=board.platform.name, + format=file_format, + board_id=board.ardupilot_board_id, + ) + return await self._find_version_item( + vehicletype=vehicle.value, + platform=board.platform.name, + format=file_format, + ) @cached(ttl=3600, namespace="firmware_url") - async def get_download_url(self, vehicle: Vehicle, platform: Platform, version: str = "") -> str: + async def get_download_url(self, vehicle: Vehicle, board: FlightController, version: str = "") -> str: """Find a specific firmware URL from manifest that matches the arguments. Args: @@ -179,16 +197,16 @@ async def get_download_url(self, vehicle: Vehicle, platform: Platform, version: Returns: str: URL of valid firmware. """ - versions = await self.get_available_versions(vehicle, platform) - logger.debug(f"Got following versions for {vehicle} running {platform}: {versions}") + versions = await self.get_available_versions(vehicle, board) + logger.debug(f"Got following versions for {vehicle} running {board}: {versions}") if not versions: - raise NoVersionAvailable(f"Could not find available firmware versions for {platform}/{vehicle}.") + raise NoVersionAvailable(f"Could not find available firmware versions for {board}/{vehicle}.") - if version and version not in versions: - raise NoVersionAvailable(f"Version {version} was not found for {platform}/{vehicle}.") + if version and not any(version == found_version.name for found_version in versions): + raise NoVersionAvailable(f"Version {version} was not found for {board}/{vehicle}.") - firmware_format = FirmwareDownloader._supported_firmware_formats[platform.type] + firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.platform_type].value # Autodetect the latest supported version. # For .apj firmwares (e.g. Pixhawk), we use the latest STABLE version while for the others (e.g. SITL and @@ -196,28 +214,36 @@ async def get_download_url(self, vehicle: Vehicle, platform: Platform, version: # the BETA release allow us to track and fix introduced bugs faster. if not version: if firmware_format == FirmwareFormat.APJ: - supported_versions = [version for version in versions if "STABLE" in version] + supported_versions = [version for version in versions if "STABLE" in version.name] newest_version: Optional[str] = None for supported_version in supported_versions: - semver_version = supported_version.split("-")[1] + semver_version = supported_version.name.split("-")[1] if not newest_version or Version(newest_version) < Version(semver_version): newest_version = semver_version if not newest_version: - raise NoVersionAvailable(f"No firmware versions found for {platform}/{vehicle}.") + raise NoVersionAvailable(f"No firmware versions found for {board}/{vehicle}.") version = f"STABLE-{newest_version}" else: version = "BETA" - - items = await self._find_version_item( - vehicletype=vehicle.value, - platform=platform.value, - mav_firmware_version_type=version, - format=firmware_format, - ) + logger.debug(board) + if board.ardupilot_board_id is not None: + items = await self._find_version_item( + vehicletype=vehicle.value, + mav_firmware_version_type=version, + format=firmware_format, + board_id=board.ardupilot_board_id, + ) + else: + items = await self._find_version_item( + vehicletype=vehicle.value, + platform=board.platform.name, + mav_firmware_version_type=version, + format=firmware_format, + ) if len(items) == 0: raise NoCandidate( - f"Found no candidate for configuration: {vehicle=}, {platform=}, {version=}, {firmware_format=}" + f"Found no candidate for configuration: {vehicle=}, {board=}, {version=}, {firmware_format=}" ) if len(items) != 1: @@ -225,19 +251,19 @@ async def get_download_url(self, vehicle: Vehicle, platform: Platform, version: item = items[0] logger.debug(f"Downloading following firmware: {item}") - return str(item["url"]) + return item.url - async def download(self, vehicle: Vehicle, platform: Platform, version: str = "") -> pathlib.Path: + async def download(self, vehicle: Vehicle, board: FlightController, version: str = "") -> pathlib.Path: """Download a specific firmware that matches the arguments. Args: vehicle (Vehicle): Desired vehicle. - platform (Platform): Desired platform. + board (FlightController): Desired board. version (str, optional): Desired version, if None provided the latest stable will be used. Defaults to None. Returns: pathlib.Path: Temporary path for the firmware file. """ - url = await self.get_download_url(vehicle, platform, version) + url = await self.get_download_url(vehicle, board, version) return await FirmwareDownloader._download(url) diff --git a/core/services/ardupilot_manager/firmware/FirmwareInstall.py b/core/services/ardupilot_manager/firmware/FirmwareInstall.py index 77f75e106d..8607b2b3cc 100644 --- a/core/services/ardupilot_manager/firmware/FirmwareInstall.py +++ b/core/services/ardupilot_manager/firmware/FirmwareInstall.py @@ -4,7 +4,7 @@ import platform as system_platform import shutil import stat -from typing import Optional, Union +from typing import Awaitable, Callable, Optional, Union from ardupilot_fw_decoder import BoardSubType, BoardType, Decoder from elftools.elf.elffile import ELFFile @@ -15,17 +15,6 @@ from typedefs import FirmwareFormat, FlightController, Platform, PlatformType -def get_board_id(platform: Platform) -> int: - ardupilot_board_ids = { - Platform.Pixhawk1: 9, - Platform.Pixhawk4: 50, - Platform.Pixhawk6X: 53, - Platform.Pixhawk6C: 56, - Platform.CubeOrange: 140, - } - return ardupilot_board_ids.get(platform, -1) - - def is_valid_elf_type(elf_arch: str) -> bool: arch_mapping = {"i386": "x86", "x86_64": "x64", "armv7l": "ARM", "aarch64": "AArch64"} system_arch = system_platform.machine() @@ -38,14 +27,9 @@ def is_valid_elf_type(elf_arch: str) -> bool: return False -def get_correspondent_decoder_platform(current_platform: Platform) -> Union[BoardType, BoardSubType]: - correspondent_decoder_platform = { - Platform.SITL: BoardType.SITL, - Platform.Navigator: BoardSubType.LINUX_NAVIGATOR, - Platform.Argonot: BoardSubType.LINUX_NAVIGATOR, - Platform.Navigator64: BoardSubType.LINUX_NAVIGATOR, - } - return correspondent_decoder_platform.get(current_platform, BoardType.EMPTY) +def get_correspondent_decoder_platform(current_platform_type: PlatformType) -> Union[BoardType, BoardSubType]: + correspondent_decoder_platform = {PlatformType.SITL: BoardType.SITL, PlatformType.Linux: BoardType.LINUX} + return correspondent_decoder_platform.get(current_platform_type, BoardType.EMPTY) class FirmwareInstaller: @@ -61,12 +45,12 @@ def __init__(self) -> None: pass @staticmethod - def _validate_apj(firmware_path: pathlib.Path, platform: Platform) -> None: + def _validate_apj(firmware_path: pathlib.Path, board: FlightController) -> None: try: with open(firmware_path, "r", encoding="utf-8") as firmware_file: firmware_data = firmware_file.read() firm_board_id = int(json.loads(firmware_data).get("board_id", -1)) - expected_board_id = get_board_id(platform) + expected_board_id = board.ardupilot_board_id if expected_board_id == -1: raise UnsupportedPlatform("Firmware validation is not implemented for this board yet.") if firm_board_id == -1: @@ -97,7 +81,7 @@ def _validate_elf(firmware_path: pathlib.Path, platform: Platform) -> None: firm_decoder.process(firmware_path) firm_board = BoardType(firm_decoder.fwversion.board_type) firm_sub_board = BoardSubType(firm_decoder.fwversion.board_subtype) - current_decoder_platform = get_correspondent_decoder_platform(platform) + current_decoder_platform = get_correspondent_decoder_platform(platform.platform_type) logger.debug( f"firm_board: {firm_board}, firm_sub_board: {firm_sub_board}, current_decoder_platform: {current_decoder_platform}" ) @@ -112,18 +96,19 @@ def _validate_elf(firmware_path: pathlib.Path, platform: Platform) -> None: raise InvalidFirmwareFile("Given firmware is not a supported version.") from error @staticmethod - def validate_firmware(firmware_path: pathlib.Path, platform: Platform) -> None: + def validate_firmware(firmware_path: pathlib.Path, board: FlightController) -> None: """Check if given firmware is valid for given platform.""" - firmware_format = FirmwareDownloader._supported_firmware_formats[platform.type] - + logger.debug(f"Validating firmware for board {board}.") + logger.debug(f"platform type: {board.platform.platform_type}") + firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.platform_type] + logger.debug(f"Firmware format: {firmware_format}") if firmware_format == FirmwareFormat.APJ: - FirmwareInstaller._validate_apj(firmware_path, platform) + FirmwareInstaller._validate_apj(firmware_path, board) return if firmware_format == FirmwareFormat.ELF: - FirmwareInstaller._validate_elf(firmware_path, platform) + FirmwareInstaller._validate_elf(firmware_path, board.platform) return - raise UnsupportedPlatform("Firmware validation is not implemented for this platform.") @staticmethod @@ -142,29 +127,33 @@ async def install_firmware( new_firmware_path: pathlib.Path, board: FlightController, firmware_dest_path: Optional[pathlib.Path] = None, + output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None, ) -> None: """Install given firmware.""" if not new_firmware_path.is_file(): raise InvalidFirmwareFile("Given path is not a valid file.") - firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.type] + logger.debug(f"Installing firmware for board {board}, from {new_firmware_path}.") + firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.platform_type] if firmware_format == FirmwareFormat.ELF: self.add_run_permission(new_firmware_path) - self.validate_firmware(new_firmware_path, board.platform) + self.validate_firmware(new_firmware_path, board) if board.type == PlatformType.Serial: firmware_uploader = FirmwareUploader() if not board.path: raise ValueError("Board path not available.") firmware_uploader.set_autopilot_port(pathlib.Path(board.path)) - await firmware_uploader.upload(new_firmware_path) + await firmware_uploader.upload(new_firmware_path, output_callback) return if firmware_format == FirmwareFormat.ELF: # Using copy() instead of move() since the last can't handle cross-device properly (e.g. docker binds) if not firmware_dest_path: raise FirmwareInstallFail("Firmware file destination not provided.") shutil.copy(new_firmware_path, firmware_dest_path) + if output_callback: + await output_callback("stdout", f"Firmware copied to {firmware_dest_path}") return raise UnsupportedPlatform("Firmware install is not implemented for this platform.") diff --git a/core/services/ardupilot_manager/firmware/FirmwareManagement.py b/core/services/ardupilot_manager/firmware/FirmwareManagement.py index bde59602d0..071b7ae7d7 100644 --- a/core/services/ardupilot_manager/firmware/FirmwareManagement.py +++ b/core/services/ardupilot_manager/firmware/FirmwareManagement.py @@ -3,7 +3,7 @@ import subprocess import tempfile from pathlib import Path -from typing import List, Optional +from typing import Awaitable, Callable, List, Optional from exceptions import ( FirmwareInstallFail, @@ -38,7 +38,7 @@ def __init__( @staticmethod def firmware_name(platform: Platform) -> str: """Get consistent firmware name for given platform.""" - return f"ardupilot_{platform.value.lower()}" + return f"ardupilot_{platform.name.lower()}" def firmware_path(self, platform: Platform) -> pathlib.Path: """Get firmware's path for given platform. This is the path where we expect to find @@ -71,42 +71,38 @@ def is_firmware_installed(self, board: FlightController) -> bool: # TODO: Validate if properly. The uploader tool seems capable of doing this. return True - firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.type] + firmware_format = FirmwareDownloader._supported_firmware_formats[board.platform.platform_type] if firmware_format == FirmwareFormat.ELF: return pathlib.Path.is_file(self.firmware_path(board.platform)) raise UnsupportedPlatform("Install check is not implemented for this platform.") - async def get_available_firmwares(self, vehicle: Vehicle, platform: Platform) -> List[Firmware]: - firmwares = [] - versions = await self.firmware_download.get_available_versions(vehicle, platform) - if not versions: - raise NoVersionAvailable(f"Failed to find any version for vehicle {vehicle}.") - for version in versions: - try: - url = await self.firmware_download.get_download_url(vehicle, platform, version) - firmware = Firmware(name=version, url=url) - firmwares.append(firmware) - except Exception as error: - logger.debug(f"Error fetching URL for version {version} on vehicle {vehicle}: {error}") + async def get_available_firmwares( + self, vehicle: Vehicle, board: FlightController, firmware_name: Optional[str] = "Ardupilot" + ) -> List[Firmware]: + firmwares: List[Firmware] = await self.firmware_download.get_available_versions(vehicle, board, firmware_name) if not firmwares: - raise NoVersionAvailable(f"Failed do get any valid URL for vehicle {vehicle}.") + raise NoVersionAvailable(f"Failed to find any version for vehicle {vehicle}.") return firmwares async def install_firmware_from_file( - self, new_firmware_path: pathlib.Path, board: FlightController, default_parameters: Optional[Parameters] = None + self, + new_firmware_path: pathlib.Path, + board: FlightController, + default_parameters: Optional[Parameters] = None, + output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None, ) -> None: if default_parameters is not None: - if board.platform.type == PlatformType.Serial: + if board.platform.platform_type == PlatformType.Serial: self.embed_params_into_apj(new_firmware_path, default_parameters) else: self.save_params_to_default_linux_path(board.platform, default_parameters) try: if board.type == PlatformType.Serial: - await self.firmware_installer.install_firmware(new_firmware_path, board) + await self.firmware_installer.install_firmware(new_firmware_path, board, None, output_callback) else: await self.firmware_installer.install_firmware( - new_firmware_path, board, self.firmware_path(board.platform) + new_firmware_path, board, self.firmware_path(board.platform), output_callback ) logger.info(f"Succefully installed firmware for {board.name}.") except Exception as error: @@ -142,33 +138,41 @@ def save_params_to_default_linux_path(self, platform: Platform, default_paramete if self.default_user_params_path(platform).is_file(): self.default_user_params_path(platform).unlink() + # pylint: disable=too-many-arguments async def install_firmware_from_url( self, url: str, board: FlightController, makeDefault: bool = False, default_parameters: Optional[Parameters] = None, + output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None, ) -> None: + if output_callback: + await output_callback("stdout", f"Downloading firmware from {url}...") temporary_file = await self.firmware_download._download(url.strip()) + if output_callback: + await output_callback("stdout", "Firmware downloaded successfully") if default_parameters is not None: - if board.platform.type == PlatformType.Serial: + if board.platform.platform_type == PlatformType.Serial: self.embed_params_into_apj(temporary_file, default_parameters) else: self.save_params_to_default_linux_path(board.platform, default_parameters) if makeDefault: shutil.copy(temporary_file, self.default_user_firmware_path(board.platform)) - await self.install_firmware_from_file(temporary_file, board, default_parameters) + await self.install_firmware_from_file(temporary_file, board, default_parameters, output_callback) async def install_firmware_from_params(self, vehicle: Vehicle, board: FlightController, version: str = "") -> None: - url = await self.firmware_download.get_download_url(vehicle, board.platform, version) + url = await self.firmware_download.get_download_url(vehicle, board, version) await self.install_firmware_from_url(url, board) - async def restore_default_firmware(self, board: FlightController) -> None: + async def restore_default_firmware( + self, board: FlightController, output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None + ) -> None: if not self.is_default_firmware_available(board.platform): raise NoDefaultFirmwareAvailable(f"Default firmware not available for '{board.name}'.") - await self.install_firmware_from_file(self.default_firmware_path(board.platform), board) + await self.install_firmware_from_file(self.default_firmware_path(board.platform), board, None, output_callback) @staticmethod - def validate_firmware(firmware_path: pathlib.Path, platform: Platform) -> None: - FirmwareInstaller.validate_firmware(firmware_path, platform) + def validate_firmware(firmware_path: pathlib.Path, board: FlightController) -> None: + FirmwareInstaller.validate_firmware(firmware_path, board) diff --git a/core/services/ardupilot_manager/firmware/FirmwareUpload.py b/core/services/ardupilot_manager/firmware/FirmwareUpload.py index 9ee54606fd..0db07103ce 100644 --- a/core/services/ardupilot_manager/firmware/FirmwareUpload.py +++ b/core/services/ardupilot_manager/firmware/FirmwareUpload.py @@ -2,11 +2,101 @@ import pathlib import shutil import subprocess +import time +from typing import Awaitable, Callable, Optional from exceptions import FirmwareUploadFail, InvalidUploadTool, UploadToolNotFound from loguru import logger +class SharedActivityTracker: + """Tracks the last activity time across multiple stream readers.""" + + def __init__(self) -> None: + self.last_activity = time.monotonic() + + def mark_activity(self) -> None: + """Mark that activity occurred on any stream.""" + self.last_activity = time.monotonic() + + def seconds_since_last_activity(self) -> float: + """Return seconds since last activity on any stream.""" + return time.monotonic() - self.last_activity + + +class StreamReader: + """Reads a stream byte-by-byte, treating both \\n and \\r as line delimiters. + + Automatically sends buffered data after 1 second of inactivity. + """ + + # pylint: disable=too-many-arguments + def __init__( + self, + stream: asyncio.StreamReader, + stream_name: str, + output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None, + timeout: float = 1.0, + activity_tracker: Optional[SharedActivityTracker] = None, + ) -> None: + self.stream = stream + self.stream_name = stream_name + self.output_callback = output_callback + self.timeout = timeout + self.buffer = b"" + self.lines: list[str] = [] + self.activity_tracker = activity_tracker or SharedActivityTracker() + + async def _send_line(self, line: str) -> None: + """Send a decoded line through the callback and logger.""" + if line: + logger.debug(f"[{self.stream_name}] {line}") + self.lines.append(line) + if self.output_callback: + await self.output_callback(self.stream_name, line) + + async def _flush_buffer(self) -> None: + """Flush the current buffer and send as a line.""" + if self.buffer: + decoded_line = self.buffer.decode().rstrip("\r\n") + await self._send_line(decoded_line) + self.buffer = b"" + + async def read_all(self) -> None: + """Read all data from the stream until it closes.""" + max_idle_seconds = 10 + + while True: + try: + # Try to read one byte with timeout + chunk = await asyncio.wait_for(self.stream.read(1), timeout=self.timeout) + if not chunk: + # End of stream - send any remaining buffer + await self._flush_buffer() + break + + # Mark activity on successful read + self.activity_tracker.mark_activity() + self.buffer += chunk + + # Check if we hit a line delimiter (\n or \r) + if chunk in (b"\n", b"\r"): + decoded_line = self.buffer.decode().rstrip("\r\n") + await self._send_line(decoded_line) + self.buffer = b"" + except asyncio.TimeoutError: + # Timeout passed without new data - send buffer if non-empty + await self._flush_buffer() + + # Check if BOTH streams have been idle (using shared tracker) + idle_time = self.activity_tracker.seconds_since_last_activity() + if idle_time >= max_idle_seconds: + logger.debug( + f"[{self.stream_name}] All streams inactive for {idle_time:.1f} seconds, assuming closed" + ) + break + + class FirmwareUploader: def __init__(self) -> None: self._autopilot_port: pathlib.Path = pathlib.Path("/dev/autopilot") @@ -42,7 +132,20 @@ def set_baudrate_bootloader(self, baudrate: int) -> None: def set_baudrate_flightstack(self, baudrate: int) -> None: self._baudrate_flightstack = baudrate - async def upload(self, firmware_path: pathlib.Path) -> None: + async def _send_control_message( + self, + message: str, + output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None, + ) -> None: + """Send a control message through the output callback.""" + if output_callback: + await output_callback("control", message) + + async def upload( + self, + firmware_path: pathlib.Path, + output_callback: Optional[Callable[[str, str], Awaitable[None]]] = None, + ) -> None: logger.info("Starting upload of firmware to board.") process = await asyncio.create_subprocess_shell( @@ -55,34 +158,52 @@ async def upload(self, firmware_path: pathlib.Path) -> None: shell=True, ) - async def monitor_uploader_process() -> None: - if process.stdout: - while True: - line = await process.stdout.readline() - if not line: - break - logger.debug(line.decode().strip()) + # Create shared activity tracker for both streams + activity_tracker = SharedActivityTracker() + stdout_reader = ( + StreamReader(process.stdout, "stdout", output_callback, activity_tracker=activity_tracker) + if process.stdout + else None + ) + stderr_reader = ( + StreamReader(process.stderr, "stderr", output_callback, activity_tracker=activity_tracker) + if process.stderr + else None + ) + + async def read_stdout() -> None: + if stdout_reader: + await stdout_reader.read_all() - while True: - if process.returncode is not None: - break - logger.debug("Waiting for upload process to finish.") - await asyncio.sleep(1) + async def read_stderr() -> None: + if stderr_reader: + await stderr_reader.read_all() try: - await asyncio.wait_for(monitor_uploader_process(), timeout=180) - - return_code = await process.wait() - if return_code != 0: - raise FirmwareUploadFail(f"Upload process returned non-zero code {return_code}.") + # Run both stream readers and process wait concurrently with a single timeout + await asyncio.wait_for(asyncio.gather(read_stdout(), read_stderr(), process.wait()), timeout=180) logger.info("Successfully uploaded firmware to board.") except asyncio.TimeoutError as error: process.kill() + await self._send_control_message("error", output_callback) raise FirmwareUploadFail("Firmware upload timed out after 180 seconds.") from error except Exception as error: process.kill() + await self._send_control_message("error", output_callback) raise FirmwareUploadFail("Unable to upload firmware to board.") from error finally: + return_code = process.returncode + errors = stderr_reader.lines if stderr_reader else [] + if errors and return_code != 0: + await self._send_control_message("error", output_callback) + raise FirmwareUploadFail(f"Upload process returned errors: {errors} return code: {return_code}") + if return_code != 0: + await self._send_control_message("error", output_callback) + raise FirmwareUploadFail(f"Upload process returned non-zero code {return_code}.") + + # Send control message indicating successful completion + await self._send_control_message("done", output_callback) + # Give some time for the board to reboot (preventing fail reconnecting to it) await asyncio.sleep(10) diff --git a/core/services/ardupilot_manager/firmware/test_FirmwareDownload.py b/core/services/ardupilot_manager/firmware/test_FirmwareDownload.py index 4b7d12cf1d..956a7b8463 100644 --- a/core/services/ardupilot_manager/firmware/test_FirmwareDownload.py +++ b/core/services/ardupilot_manager/firmware/test_FirmwareDownload.py @@ -4,7 +4,30 @@ import pytest from firmware.FirmwareDownload import FirmwareDownloader -from typedefs import Platform, Vehicle +from typedefs import FlightController, Platform, PlatformType, Vehicle + +Pixhawk1 = FlightController( + name="Pixhawk1", + manufacturer="3DR", + platform=Platform(name="Pixhawk1", platform_type=PlatformType.Serial), + ardupilot_board_id=9, +) +Pixhawk4 = FlightController( + name="Pixhawk4", + manufacturer="Holybro", + platform=Platform(name="Pixhawk4", platform_type=PlatformType.Serial), + ardupilot_board_id=50, +) +SITL = FlightController( + name="SITL", + manufacturer="ArduPilot Team", + platform=Platform.SITL(), +) +Navigator = FlightController( + name="Navigator", + manufacturer="Blue Robotics", + platform=Platform(name="Navigator", platform_type=PlatformType.Linux), +) def test_static() -> None: @@ -25,17 +48,17 @@ async def firmware_download_wrapper() -> None: assert await firmware_download.download_manifest(), "Failed to download/validate manifest file." versions = await firmware_download._find_version_item( - vehicletype="Sub", format="apj", mav_firmware_version_type="STABLE-4.0.1", platform=Platform.Pixhawk1 + vehicletype="Sub", format="apj", mav_firmware_version_type="STABLE-4.0.1", platform=Pixhawk1.platform.name ) assert len(versions) == 1, "Failed to find a single firmware." versions = await firmware_download._find_version_item( - vehicletype="Sub", mav_firmware_version_type="STABLE-4.0.1", platform=Platform.Pixhawk1 + vehicletype="Sub", mav_firmware_version_type="STABLE-4.0.1", platform=Pixhawk1.platform.name ) # There are two versions, one for the firmware and one with the bootloader assert len(versions) == 2, "Failed to find multiple versions." - available_versions = await firmware_download.get_available_versions(Vehicle.Sub, Platform.Pixhawk1) + available_versions = await firmware_download.get_available_versions(Vehicle.Sub, Pixhawk1) assert len(available_versions) == len(set(available_versions)), "Available versions are not unique." test_available_versions = ["STABLE-4.0.1", "STABLE-4.0.0", "OFFICIAL", "DEV", "BETA"] @@ -44,29 +67,23 @@ async def firmware_download_wrapper() -> None: ), "Available versions are missing know versions." assert await firmware_download.download( - Vehicle.Sub, Platform.Pixhawk1, "STABLE-4.0.1" + Vehicle.Sub, Pixhawk1, "STABLE-4.0.1" ), "Failed to download a valid firmware file." - assert await firmware_download.download( - Vehicle.Sub, Platform.Pixhawk1 - ), "Failed to download latest valid firmware file." + assert await firmware_download.download(Vehicle.Sub, Pixhawk1), "Failed to download latest valid firmware file." - assert await firmware_download.download( - Vehicle.Sub, Platform.Pixhawk4 - ), "Failed to download latest valid firmware file." + assert await firmware_download.download(Vehicle.Sub, Pixhawk4), "Failed to download latest valid firmware file." - assert await firmware_download.download(Vehicle.Sub, Platform.SITL), "Failed to download SITL." + assert await firmware_download.download(Vehicle.Sub, SITL), "Failed to download SITL." # skipt these tests for MacOS if platform.system() == "Darwin": pytest.skip("Skipping test for MacOS") # It'll fail if running in an arch different of ARM if "x86" in os.uname().machine: - assert await firmware_download.download( - Vehicle.Sub, Platform.Navigator - ), "Failed to download navigator binary." + assert await firmware_download.download(Vehicle.Sub, Navigator), "Failed to download navigator binary." else: with pytest.raises(Exception): - await firmware_download.download(Vehicle.Sub, Platform.Navigator) + await firmware_download.download(Vehicle.Sub, Navigator) asyncio.run(firmware_download_wrapper()) diff --git a/core/services/ardupilot_manager/firmware/test_FirmwareInstall.py b/core/services/ardupilot_manager/firmware/test_FirmwareInstall.py index c5f10c6141..0b8a727ab8 100644 --- a/core/services/ardupilot_manager/firmware/test_FirmwareInstall.py +++ b/core/services/ardupilot_manager/firmware/test_FirmwareInstall.py @@ -6,7 +6,25 @@ from exceptions import InvalidFirmwareFile from firmware.FirmwareDownload import FirmwareDownloader from firmware.FirmwareInstall import FirmwareInstaller -from typedefs import FlightController, Platform, Vehicle +from typedefs import FlightController, Platform, PlatformType, Vehicle + +Pixhawk1 = FlightController( + name="Pixhawk1", + platform=Platform(name="Pixhawk1", platform_type=PlatformType.Serial), + ardupilot_board_id=9, +) +Pixhawk4 = FlightController( + name="Pixhawk4", + manufacturer="Holybro", + platform=Platform(name="Pixhawk4", platform_type=PlatformType.Serial), + ardupilot_board_id=50, +) +SITL = FlightController(name="SITL", manufacturer="ArduPilot Team", platform=Platform.SITL()) +Navigator = FlightController( + name="Navigator", + manufacturer="Blue Robotics", + platform=Platform(name="Navigator", platform_type=PlatformType.Linux), +) def test_firmware_validation() -> None: @@ -15,28 +33,27 @@ async def firmware_validation_wrapper() -> None: installer = FirmwareInstaller() # Pixhawk1 and Pixhawk4 APJ firmwares should always work - temporary_file = await downloader.download(Vehicle.Sub, Platform.Pixhawk1) - installer.validate_firmware(temporary_file, Platform.Pixhawk1) + temporary_file = await downloader.download(Vehicle.Sub, Pixhawk1) + installer.validate_firmware(temporary_file, Pixhawk1) - temporary_file = await downloader.download(Vehicle.Sub, Platform.Pixhawk4) - installer.validate_firmware(temporary_file, Platform.Pixhawk4) + temporary_file = await downloader.download(Vehicle.Sub, Pixhawk4) + installer.validate_firmware(temporary_file, Pixhawk4) # New SITL firmwares should always work, except for MacOS # there are no SITL builds for MacOS if platform.system() != "Darwin": - temporary_file = await downloader.download(Vehicle.Sub, Platform.SITL, version="DEV") - installer.validate_firmware(temporary_file, Platform.SITL) + temporary_file = await downloader.download(Vehicle.Sub, SITL, version="DEV") + installer.validate_firmware(temporary_file, SITL) # Raise when validating Navigator firmwares (as test platform is x86) - temporary_file = await downloader.download(Vehicle.Sub, Platform.Navigator) + temporary_file = await downloader.download(Vehicle.Sub, Navigator) with pytest.raises(InvalidFirmwareFile): - installer.validate_firmware(temporary_file, Platform.Navigator) + installer.validate_firmware(temporary_file, Navigator) # Install SITL firmware if platform.system() != "Darwin": # there are no SITL builds for MacOS - temporary_file = await downloader.download(Vehicle.Sub, Platform.SITL, version="DEV") - board = FlightController(name="SITL", manufacturer="ArduPilot Team", platform=Platform.SITL) - await installer.install_firmware(temporary_file, board, pathlib.Path(f"{temporary_file}_dest")) + temporary_file = await downloader.download(Vehicle.Sub, SITL, version="DEV") + await installer.install_firmware(temporary_file, SITL, pathlib.Path(f"{temporary_file}_dest")) asyncio.run(firmware_validation_wrapper()) diff --git a/core/services/ardupilot_manager/flight_controller_detector/Detector.py b/core/services/ardupilot_manager/flight_controller_detector/Detector.py index 369ed23c00..0e03a8f4ac 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/Detector.py +++ b/core/services/ardupilot_manager/flight_controller_detector/Detector.py @@ -1,11 +1,16 @@ import asyncio from typing import List, Optional +import serial +from commonwealth.utils.decorators import temporary_cache from commonwealth.utils.general import is_running_as_root -from flight_controller_detector.board_identification import identifiers +from flight_controller_detector.bootloader.px4_bootloader import PX4BootLoader from flight_controller_detector.linux.detector import LinuxFlightControllerDetector +from flight_controller_detector.mavlink_board_id import get_board_id +from loguru import logger +from serial import SerialException from serial.tools.list_ports_linux import SysFS, comports -from typedefs import FlightController, FlightControllerFlags, Platform +from typedefs import FlightController, FlightControllerFlags, Platform, PlatformType class Detector: @@ -33,47 +38,72 @@ def is_serial_bootloader(port: SysFS) -> bool: return port.product is not None and "BL" in port.product @staticmethod - def detect_serial_platform(port: SysFS) -> Optional[Platform]: - for identifier in identifiers: - port_attr = getattr(port, identifier.attribute) - if port_attr is not None and identifier.id_value in port_attr: - return identifier.platform + def _ask_bootloader_for_board_id_sync(port: SysFS) -> Optional[int]: + """ + Synchronous implementation of bootloader board_id retrieval. + Internal function - use ask_bootloader_for_board_id() instead. + """ + try: + logger.info(f"asking bootloader for board id on {port.device}") + with serial.Serial(port.device, 115200, timeout=0.2, write_timeout=0, exclusive=True) as ser: + bootloader = PX4BootLoader(ser) + board_info = bootloader.get_board_info() + return board_info.board_id + except SerialException as e: + logger.error(f"Error asking bootloader for board id on {port.device}: {e}") + return None - return None + @staticmethod + @temporary_cache( + timeout_seconds=300 + ) # what are the chances of someone switching between two boards in bootloader mode? + async def ask_bootloader_for_board_id(port: SysFS) -> Optional[int]: + return await asyncio.to_thread(Detector._ask_bootloader_for_board_id_sync, port) @staticmethod - def detect_serial_flight_controllers() -> List[FlightController]: - """Check if a Pixhawk1 or a Pixhawk4 is connected. + @temporary_cache(timeout_seconds=30) + async def detect_serial_flight_controllers() -> List[FlightController]: + """Check if a standalone flight controller is connected via usb/serial. Returns: List[FlightController]: List with connected serial flight controller. """ sorted_serial_ports = sorted(comports(), key=lambda port: port.name) # type: ignore - unique_serial_devices: List[SysFS] = [] + boards = [] for port in sorted_serial_ports: - # usb_device_path property will be the same for two serial connections using the same USB port - if port.usb_device_path not in [device.usb_device_path for device in unique_serial_devices]: - unique_serial_devices.append(port) - boards = [ - FlightController( - name=port.product or port.name, + board_id = None + if Detector.is_serial_bootloader(port): + board_id = await Detector.ask_bootloader_for_board_id(port) + # https://github.com/mavlink/qgroundcontrol/blob/f68674f47b0ca03f23a50753280516b6fa129545/src/Vehicle/VehicleSetup/FirmwareUpgradeController.cc#L43 + if board_id == 255: + board_id = 9 # px4_fmu-v3_default edge case + if board_id is None: + board_id = await get_board_id(port.device) + if board_id is None: + continue + + board_name = port.product or port.name + board = FlightController( + name=board_name, manufacturer=port.manufacturer, - platform=Detector.detect_serial_platform(port) - or Platform(), # this is just to make CI happy. check line 82 + platform=Platform(name=board_name, platform_type=PlatformType.Serial), path=port.device, + ardupilot_board_id=board_id, + flags=[FlightControllerFlags.is_bootloader] if Detector.is_serial_bootloader(port) else [], ) - for port in unique_serial_devices - if Detector.detect_serial_platform(port) is not None - ] - for port in unique_serial_devices: - for board in boards: - if board.path == port.device and Detector.is_serial_bootloader(port): - board.flags.append(FlightControllerFlags.is_bootloader) + boards.append(board) + + # if we have multiple boards with the same name, lets keep the one with the shortest platform name + if len(boards) > 1: + names = [board.platform.name for board in boards] + logger.info(f"multiple board type candidates: ({names})") + + logger.info(f"detected serial boards: {boards}") return boards @staticmethod def detect_sitl() -> FlightController: - return FlightController(name="SITL", manufacturer="ArduPilot Team", platform=Platform.SITL) + return FlightController(name="SITL", manufacturer="ArduPilot Team", platform=Platform.SITL()) @classmethod async def detect(cls, include_sitl: bool = True, include_manual: bool = True) -> List[FlightController]: @@ -86,19 +116,28 @@ async def detect(cls, include_sitl: bool = True, include_manual: bool = True) -> List[FlightController]: List of available flight controllers """ available: List[FlightController] = [] - if not is_running_as_root(): - return available - - linux_board = await cls.detect_linux_board() - if linux_board: - available.append(linux_board) - available.extend(cls().detect_serial_flight_controllers()) + available.extend(await cls().detect_serial_flight_controllers()) if include_sitl: available.append(Detector.detect_sitl()) if include_manual: - available.append(FlightController(name="Manual", manufacturer="Manual", platform=Platform.Manual)) + available.append( + FlightController( + name="Manual", + manufacturer="Manual", + platform=Platform(name="Manual", platform_type=PlatformType.Serial), + path="", + ardupilot_board_id=None, + ) + ) + + if not is_running_as_root(): + return available + + linux_board = await cls.detect_linux_board() + if linux_board: + available.append(linux_board) return available diff --git a/core/services/ardupilot_manager/flight_controller_detector/board_identification.py b/core/services/ardupilot_manager/flight_controller_detector/board_identification.py deleted file mode 100644 index 6f1434d1a6..0000000000 --- a/core/services/ardupilot_manager/flight_controller_detector/board_identification.py +++ /dev/null @@ -1,33 +0,0 @@ -from enum import Enum -from typing import List - -from pydantic import BaseModel -from typedefs import Platform - - -class SerialAttr(str, Enum): - product = "product" - manufacturer = "manufacturer" - - -class SerialBoardIdentifier(BaseModel): - attribute: SerialAttr - id_value: str - platform: Platform - - -identifiers: List[SerialBoardIdentifier] = [ - SerialBoardIdentifier(attribute=SerialAttr.product, id_value="Pixhawk1", platform=Platform.Pixhawk1), - SerialBoardIdentifier(attribute=SerialAttr.product, id_value="FMU v2.x", platform=Platform.Pixhawk1), - SerialBoardIdentifier(attribute=SerialAttr.product, id_value="FMU v3.x", platform=Platform.Pixhawk1), - SerialBoardIdentifier(attribute=SerialAttr.product, id_value="Pixhawk4", platform=Platform.Pixhawk4), - SerialBoardIdentifier(attribute=SerialAttr.product, id_value="FMU v5.x", platform=Platform.Pixhawk4), - SerialBoardIdentifier(attribute=SerialAttr.product, id_value="FMU v6X.x", platform=Platform.Pixhawk6X), - SerialBoardIdentifier(attribute=SerialAttr.product, id_value="FMU v6C.x", platform=Platform.Pixhawk6C), - SerialBoardIdentifier(attribute=SerialAttr.product, id_value="CubeOrange", platform=Platform.CubeOrange), - SerialBoardIdentifier(attribute=SerialAttr.manufacturer, id_value="ArduPilot", platform=Platform.GenericSerial), - SerialBoardIdentifier(attribute=SerialAttr.manufacturer, id_value="Arduino", platform=Platform.GenericSerial), - SerialBoardIdentifier(attribute=SerialAttr.manufacturer, id_value="3D Robotics", platform=Platform.GenericSerial), - SerialBoardIdentifier(attribute=SerialAttr.manufacturer, id_value="Hex/ProfiCNC", platform=Platform.GenericSerial), - SerialBoardIdentifier(attribute=SerialAttr.manufacturer, id_value="Holybro", platform=Platform.GenericSerial), -] diff --git a/core/services/ardupilot_manager/flight_controller_detector/bootloader/__init__.py b/core/services/ardupilot_manager/flight_controller_detector/bootloader/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/core/services/ardupilot_manager/flight_controller_detector/bootloader/px4_boards.py b/core/services/ardupilot_manager/flight_controller_detector/bootloader/px4_boards.py new file mode 100644 index 0000000000..71b5d50579 --- /dev/null +++ b/core/services/ardupilot_manager/flight_controller_detector/bootloader/px4_boards.py @@ -0,0 +1,63 @@ +class PX4Board: + boards_id_to_firmware_name_mapping = { + 9: "px4_fmu-v2_default", + 255: "px4_fmu-v3_default", + 11: "px4_fmu-v4_default", + 13: "px4_fmu-v4pro_default", + 20: "uvify_core_default", + 50: "px4_fmu-v5_default", + 51: "px4_fmu-v5x_default", + 52: "px4_fmu-v6_default", + 53: "px4_fmu-v6x_default", + 54: "px4_fmu-v6u_default", + 56: "px4_fmu-v6c_default", + 57: "ark_fmu-v6x_default", + 35: "px4_fmu-v6xrt_default", + 55: "sky-drones_smartap-airlink_default", + 88: "airmind_mindpx-v2_default", + 12: "bitcraze_crazyflie_default", + 14: "bitcraze_crazyflie21_default", + 42: "omnibus_f4sd_default", + 33: "mro_x21_default", + 65: "intel_aerofc-v1_default", + 123: "holybro_kakutef7_default", + 41775: "modalai_fc-v1_default", + 41776: "modalai_fc-v2_default", + 78: "holybro_pix32v5_default", + 79: "holybro_can-gps-v1_default", + 28: "nxp_fmuk66-v3_default", + 30: "nxp_fmuk66-e_default", + 31: "nxp_fmurt1062-v1_default", + 85: "freefly_can-rtk-gps_default", + 120: "cubepilot_cubeyellow_default", + 136: "mro_x21-777_default", + 139: "holybro_durandal-v1_default", + 140: "cubepilot_cubeorange_default", + 1063: "cubepilot_cubeorangeplus_default", + 141: "mro_ctrl-zero-f7_default", + 142: "mro_ctrl-zero-f7-oem_default", + 212: "thepeach_k1_default", + 213: "thepeach_r1_default", + 1009: "cuav_nora_default", + 1010: "cuav_x7pro_default", + 1017: "mro_pixracerpro_default", + 1022: "mro_ctrl-zero-classic_default", + 1023: "mro_ctrl-zero-h7_default", + 1024: "mro_ctrl-zero-h7-oem_default", + 1048: "holybro_kakuteh7_default", + 1053: "holybro_kakuteh7v2_default", + 1054: "holybro_kakuteh7mini_default", + 1110: "jfb_jfb110_default", + } + + def __init__(self, board_id: int, name: str) -> None: + self.id = board_id + self.name = name + + @classmethod + def from_id(cls, board_id: int) -> "PX4Board": + board_name = cls.boards_id_to_firmware_name_mapping.get(board_id, None) + if board_name is None: + raise ValueError(f"Board with id {board_id} not found") + + return cls(board_id, board_name) diff --git a/core/services/ardupilot_manager/flight_controller_detector/bootloader/px4_bootloader.py b/core/services/ardupilot_manager/flight_controller_detector/bootloader/px4_bootloader.py new file mode 100644 index 0000000000..18e817fae5 --- /dev/null +++ b/core/services/ardupilot_manager/flight_controller_detector/bootloader/px4_bootloader.py @@ -0,0 +1,151 @@ +import time +from dataclasses import dataclass +from enum import Enum + +import serial + + +class PX4BootLoaderCommands(int, Enum): + # Protocol Bytes + PROTO_INSYNC = 0x12 # 'in sync' byte sent before status + PROTO_BAD_SILICON_REV = 0x14 # Device is using silicon not suitable + PROTO_EOC = 0x20 # End of command + + # Reply bytes + PROTO_OK = 0x10 # 'ok' response + PROTO_FAILED = 0x11 # 'fail' response + PROTO_INVALID = 0x13 # 'invalid' response for bad commands + + # Command bytes + PROTO_GET_SYNC = 0x21 # NOP for re-establishing sync + PROTO_GET_DEVICE = 0x22 # get device ID bytes + PROTO_CHIP_ERASE = 0x23 # erase program area and reset program address + PROTO_LOAD_ADDRESS = 0x24 # set next programming address + PROTO_PROG_MULTI = 0x27 # write bytes at program address and increment + PROTO_GET_CRC = 0x29 # compute & return a CRC + PROTO_BOOT = 0x30 # boot the application + + # Command bytes - Rev 2 boootloader only + PROTO_CHIP_VERIFY = 0x24 # begin verify mode + PROTO_READ_MULTI = 0x28 # read bytes at programm address and increment + + INFO_BL_REV = 1 # bootloader protocol revision + BL_REV_MIN = 2 # Minimum supported bootlader protocol + BL_REV_MAX = 5 # Maximum supported bootloader protocol + INFO_BOARD_ID = 2 # board type + INFO_BOARD_REV = 3 # board revision + INFO_FLASH_SIZE = 4 # max firmware size in bytes + + PROG_MULTI_MAX = (64,) # write size for PROTO_PROG_MULTI, must be multiple of 4 + READ_MULTI_MAX = 0x28 # read size for PROTO_READ_MULTI, must be multiple of 4. Sik Radio max size is 0x28 + + +@dataclass +class PX4BootLoaderBoardInfo: + board_id: int + boot_loader_version: int + flash_size: int + + +class PX4BootLoader: + # Specific boards that need special handling when in certain versions of boot loader + _board_id_px4_fmu_v2 = 9 + _board_id_px4_fmu_v3 = 255 + _board_flash_size_small = 1032192 + _boot_loader_version_v2_correct_flash = 5 + + def __init__(self, port: serial.Serial): + self.port = port + + def get_board_info(self) -> PX4BootLoaderBoardInfo: + # Try getting in sync + self._sync() + + # Get bootloader version + bootloader_version = self._proto_get_device(PX4BootLoaderCommands.INFO_BL_REV) + + if ( + bootloader_version < PX4BootLoaderCommands.BL_REV_MIN + or bootloader_version > PX4BootLoaderCommands.BL_REV_MAX + ): + raise ValueError("Unsupported bootloader version") + + # Get board ID + board_id = self._proto_get_device(PX4BootLoaderCommands.INFO_BOARD_ID) + + # Get flash size + flash_size = self._proto_get_device(PX4BootLoaderCommands.INFO_FLASH_SIZE) + + # From QGC source code + # Older V2 boards have large flash space but silicon error which prevents it from being used. Bootloader v5 and above + # will correctly account/report for this. Older bootloaders will not. Newer V2 board which support larger flash space are + # reported as V3 board id. + if ( + board_id == self._board_id_px4_fmu_v2 + and bootloader_version >= self._boot_loader_version_v2_correct_flash + and flash_size > self._board_flash_size_small + ): + board_id = self._board_id_px4_fmu_v3 + + return PX4BootLoaderBoardInfo(board_id, bootloader_version, flash_size) + + def _safe_read(self, expected_bytes: int, timeout: int) -> bytearray: + started_reading_at = time.time() + while self.port.in_waiting < expected_bytes: + if time.time() - started_reading_at > timeout: + raise serial.SerialTimeoutException("Timeout while waiting bootloader response") + time.sleep(0.1) + + return bytearray(self.port.read(expected_bytes)) + + def _read_command_response(self, timeout: int) -> bytearray: + # By default PX4 responses are 2 bytes long + res = self._safe_read(2, timeout) + + if res[0] != PX4BootLoaderCommands.PROTO_INSYNC: + raise serial.SerialException("Unable to sync with bootloader") + if res[0] == PX4BootLoaderCommands.PROTO_INSYNC and res[1] == PX4BootLoaderCommands.PROTO_BAD_SILICON_REV: + raise serial.SerialException("Device is using silicon not suitable") + + if res[1] == PX4BootLoaderCommands.PROTO_FAILED: + raise serial.SerialException("Failed to execute command") + if res[1] == PX4BootLoaderCommands.PROTO_INVALID: + raise serial.SerialException("Invalid command") + + return res + + def _safe_serial_write(self, data: bytearray) -> None: + bytes_written = self.port.write(data) + if bytes_written != len(data): + raise serial.SerialException("Invalid number of bytes written to serial port") + + def _send_serial_command(self, command: PX4BootLoaderCommands, timeout: int = 1) -> None: + buffer = bytearray([command, PX4BootLoaderCommands.PROTO_EOC]) + + self._safe_serial_write(buffer) + self.port.flush() + + self._read_command_response(timeout) + + def _sync(self) -> None: + # Clear the buffer prior to syncing + self.port.read_all() + + # Getting in sync some times requires multiple attempts + for _ in range(10): + try: + self._send_serial_command(PX4BootLoaderCommands.PROTO_GET_SYNC) + return + except Exception: + time.sleep(0.1) + + raise RuntimeError("Failed to sync with bootloader") + + def _proto_get_device(self, command: PX4BootLoaderCommands) -> int: + buffer = bytearray([PX4BootLoaderCommands.PROTO_GET_DEVICE, command, PX4BootLoaderCommands.PROTO_EOC]) + + self._safe_serial_write(buffer) + val = self._safe_read(4, 1) + self._read_command_response(1) + + return int.from_bytes(val, byteorder="little") diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/argonot.py b/core/services/ardupilot_manager/flight_controller_detector/linux/argonot.py index e486d39429..d6dc8cf2f7 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/linux/argonot.py +++ b/core/services/ardupilot_manager/flight_controller_detector/linux/argonot.py @@ -1,11 +1,11 @@ from flight_controller_detector.linux.navigator import NavigatorPi4 -from typedefs import Platform +from typedefs import Platform, PlatformType class Argonot(NavigatorPi4): name = "Argonot" manufacturer = "SymbyTech" - platform = Platform.Argonot + platform = Platform(name="Argonot", platform_type=PlatformType.Linux) devices = { "swap_multiplexer": (0x77, 1), diff --git a/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py b/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py index 25dcab7250..9ef71028ea 100644 --- a/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py +++ b/core/services/ardupilot_manager/flight_controller_detector/linux/navigator.py @@ -5,7 +5,7 @@ from commonwealth.utils.general import CpuType, get_cpu_type from elftools.elf.elffile import ELFFile from flight_controller_detector.linux.linux_boards import LinuxFlightController -from typedefs import Platform, Serial +from typedefs import Platform, PlatformType, Serial class Navigator(LinuxFlightController): @@ -13,7 +13,8 @@ class Navigator(LinuxFlightController): def __init__(self, **data: Any) -> None: name = "Navigator" - plat = Platform.Navigator + plat = Platform(name="Navigator", platform_type=PlatformType.Linux) + ardupilot_board_id = None if platform.machine() == "aarch64": # edge case for 64-bit kernel on 32-bit userland... # let's check the arch for /usr/bin/ls @@ -23,8 +24,8 @@ def __init__(self, **data: Any) -> None: # from https://github.com/eliben/pyelftools/blob/main/elftools/elf/elffile.py#L513 if firm_arch == "AArch64": name = "Navigator64" - plat = Platform.Navigator64 - super().__init__(**data, name=name, platform=plat) + plat = Platform(name="Navigator64", platform_type=PlatformType.Linux) + super().__init__(**data, name=name, platform=plat, ardupilot_board_id=ardupilot_board_id) def detect(self) -> bool: return False diff --git a/core/services/ardupilot_manager/flight_controller_detector/mavlink_board_id.py b/core/services/ardupilot_manager/flight_controller_detector/mavlink_board_id.py new file mode 100644 index 0000000000..4303b24b78 --- /dev/null +++ b/core/services/ardupilot_manager/flight_controller_detector/mavlink_board_id.py @@ -0,0 +1,169 @@ +import asyncio +import struct +import time +from typing import Optional + +import serial +from loguru import logger + +# MAVLink protocol constants +MAVLINK_V2_MAGIC = 0xFD +MAVLINK_V1_MAGIC = 0xFE +AUTOPILOT_VERSION_MSG_ID = 148 +BOARD_ID_OFFSET = 28 # Offset in AUTOPILOT_VERSION payload where board_id is located +BOARD_ID_SHIFT = 16 # Shift to extract board type from full board_id (upper 16 bits) + +# Hardcoded MAVLink COMMAND_LONG message to request AUTOPILOT_VERSION +# This is MAVLink 2.0: system_id=255, component_id=0, requesting message ID 148 +REQUEST_AUTOPILOT_VERSION_MSG = bytes.fromhex( + "fd1e000000ff004c0000000014430000000000000000000000000000000000000000000000000002ec38" +) + + +def parse_autopilot_version_board_id(data: bytes) -> Optional[int]: + """ + Manually parse MAVLink AUTOPILOT_VERSION message to extract board_id field. + + The AUTOPILOT_VERSION message (ID 148) has the following structure: + - Header: 1 byte (magic byte 0xFD for MAVLink 2.0) + - Payload length: 1 byte + - Incompatibility flags: 1 byte + - Compatibility flags: 1 byte + - Sequence: 1 byte + - System ID: 1 byte + - Component ID: 1 byte + - Message ID: 3 bytes (little endian) + - Payload: variable + - Checksum: 2 bytes + + In the AUTOPILOT_VERSION payload, board_id is at offset 28 (uint32) + + Args: + data: Raw bytes from serial port + + Returns: + board_id as integer, or None if parsing fails + + Raises: + ValueError: If MAVLink 1.0 protocol is detected + """ + i = 0 + while i < len(data): + # Look for MAVLink 2.0 magic byte + if data[i] == MAVLINK_V2_MAGIC: + if i + 10 > len(data): + break + + msg_id = struct.unpack(" len(data): + break + msg_id = data[i + 5] + # AUTOPILOT_VERSION is usually MAVLink 2.0, but handle just in case + if msg_id == AUTOPILOT_VERSION_MSG_ID: + header_len = 6 + payload_start = i + header_len + if payload_start + 32 <= len(data): + board_id_bytes = data[payload_start + 28 : payload_start + 32] + board_id = struct.unpack(" Optional[int]: + """ + Synchronous implementation of board_id retrieval. + Internal function - use get_board_id() instead. + + Returns the board type ID (upper 16 bits of the full board_id). + + Raises: + ValueError: If MAVLink 1.0 protocol is detected + """ + try: + # Set a short timeout for individual reads + with serial.Serial(port_path, baudrate, timeout=0.2, exclusive=True, write_timeout=0) as ser: + # Send the hardcoded request message + ser.write(REQUEST_AUTOPILOT_VERSION_MSG) + # Read response with a maximum wait time of 200ms + response_data = b"" + start_time = time.time() + max_wait_time = 0.2 # 200ms + while time.time() - start_time < max_wait_time: + # Only read what's available in the buffer to avoid blocking + if ser.in_waiting > 0: + chunk = ser.read(ser.in_waiting) + response_data += chunk + # Try parsing what we have so far + board_id = parse_autopilot_version_board_id(response_data) + if board_id is not None: + # Return board type ID (upper 16 bits) + return board_id >> BOARD_ID_SHIFT + else: + time.sleep(0.01) + # Final attempt to parse all collected data + board_id = parse_autopilot_version_board_id(response_data) + if board_id is not None: + # Return board type ID (upper 16 bits) + return board_id >> BOARD_ID_SHIFT + logger.info(f"no board id found on {port_path}") + ser.reset_input_buffer() + ser.reset_output_buffer() + ser.flush() + return None + + except ValueError as e: + # Specifically handle MAVLink 1.0 detection + logger.error(f"Protocol error on {port_path}: {e}") + raise # Re-raise to inform caller about unsupported protocol + except serial.SerialException as e: + logger.error(f"Serial port error on {port_path}: {e}") + return None + except Exception as e: + logger.error(f"Error getting board version: {e}") + return None + + +async def get_board_id(port_path: str, baudrate: int = 115200) -> Optional[int]: + """ + Connect to serial port, request AUTOPILOT_VERSION, and extract board_id. + + This function does NOT require pymavlink - it uses hardcoded request bytes + and manually parses the response. + + This is an async function that runs the blocking serial I/O in a thread pool + to avoid blocking the event loop. + + Args: + port_path: Serial port path (e.g., '/dev/ttyUSB0', '/dev/ttyACM0') + baudrate: Baud rate for serial communication (default: 115200) + + Returns: + Board type ID (upper 16 bits of board_id) as integer, or None if failed + + Raises: + ValueError: If MAVLink 1.0 protocol is detected + """ + start = time.time() + board_id = await asyncio.to_thread(_get_board_id_sync, port_path, baudrate) + end = time.time() + logger.info(f"get_board_id to port {port_path} took {end - start} seconds") + return board_id diff --git a/core/services/ardupilot_manager/typedefs.py b/core/services/ardupilot_manager/typedefs.py index 4861693a02..4a439e864d 100644 --- a/core/services/ardupilot_manager/typedefs.py +++ b/core/services/ardupilot_manager/typedefs.py @@ -1,6 +1,6 @@ import ipaddress import re -from enum import Enum, auto +from enum import Enum from pathlib import Path from platform import machine from typing import Any, Dict, List, Optional @@ -75,9 +75,14 @@ def get_sitl_platform_name(machine_arch: str) -> str: class Firmware(BaseModel): """Simplified representation of a firmware, as available on Ardupilot's manifest.""" + board_id: Optional[int] + platform: str name: str url: str + def __hash__(self) -> int: + return hash(self.platform + self.name + str(self.board_id)) + class Vehicle(str, Enum): """Valid Ardupilot vehicle types. @@ -89,52 +94,24 @@ class Vehicle(str, Enum): Copter = "Copter" -# TODO: This class can be deprecated once we move to Python 3.11, which introduces the equivalent StrEnum -class LowerStringEnum(str, Enum): - def __str__(self) -> str: - return self.name.lower() - - -class PlatformType(LowerStringEnum): - Serial = auto() - Linux = auto() - SITL = auto() - Unknown = auto() - Manual = auto() +class PlatformType(str, Enum): + Serial = "Serial" + Linux = "Linux" + SITL = "SITL" + Unknown = "Unknown" + Manual = "Manual" -class Platform(str, Enum): +class Platform(BaseModel): """Valid Ardupilot platform types. - The Enum values are 1:1 representations of the platforms available on the ArduPilot manifest.""" - - Pixhawk1 = "Pixhawk1" - Pixhawk4 = "Pixhawk4" - Pixhawk6X = "Pixhawk6X" - Pixhawk6C = "Pixhawk6C" - CubeOrange = "CubeOrange" - GenericSerial = "GenericSerial" - Navigator = "navigator" - Navigator64 = "navigator64" - Argonot = "argonot" - SITL = get_sitl_platform_name(machine()) - Manual = "Manual" + The Names are a 1:1 representation of the platforms available on the ArduPilot manifest.""" - @property - def type(self) -> PlatformType: - platform_types = { - Platform.Pixhawk1: PlatformType.Serial, - Platform.Pixhawk4: PlatformType.Serial, - Platform.Pixhawk6X: PlatformType.Serial, - Platform.Pixhawk6C: PlatformType.Serial, - Platform.CubeOrange: PlatformType.Serial, - Platform.GenericSerial: PlatformType.Serial, - Platform.Navigator: PlatformType.Linux, - Platform.Navigator64: PlatformType.Linux, - Platform.Argonot: PlatformType.Linux, - Platform.SITL: PlatformType.SITL, - Platform.Manual: PlatformType.Manual, - } - return platform_types.get(self, PlatformType.Unknown) + name: str + platform_type: PlatformType + + @staticmethod + def SITL() -> "Platform": + return Platform(name=get_sitl_platform_name(machine()), platform_type=PlatformType.SITL) class FlightControllerFlags(str, Enum): @@ -150,15 +127,30 @@ class Parameters(BaseModel): class FlightController(BaseModel): """Flight-controller board.""" - name: str - manufacturer: Optional[str] - platform: Platform + name: str # Whatever we get on the usb description + manufacturer: Optional[str] # Whatever we get on the usb description + platform: Platform # The platform of the board according to the ardupilot's manifest. + ardupilot_board_id: Optional[int] path: Optional[str] flags: List[FlightControllerFlags] = [] @property def type(self) -> PlatformType: - return self.platform.type + return self.platform.platform_type + + def __hash__(self) -> int: + return hash(self.name + self.platform.name) + + +class FlightControllerV1(BaseModel): + """Flight-controller board.""" + + name: str # Whatever we get on the usb description + manufacturer: Optional[str] # Whatever we get on the usb description + platform: str # The platform of the board according to the ardupilot's manifest. + ardupilot_board_id: Optional[int] + path: Optional[str] + flags: List[FlightControllerFlags] = [] class AvailableBoards(BaseModel): From decb1df066e02ba990cdd0dc213647ac947fd230 Mon Sep 17 00:00:00 2001 From: Willian Galvani Date: Fri, 14 Nov 2025 14:48:51 -0300 Subject: [PATCH 4/6] frontend: add flashing streaming, board_id, and multiple platform support --- .../components/autopilot/FirmwareManager.vue | 245 +++++++++++++++--- .../vehiclesetup/overview/common.ts | 4 +- core/frontend/src/types/autopilot.ts | 21 +- 3 files changed, 222 insertions(+), 48 deletions(-) diff --git a/core/frontend/src/components/autopilot/FirmwareManager.vue b/core/frontend/src/components/autopilot/FirmwareManager.vue index cd63fba7c6..e981188226 100644 --- a/core/frontend/src/components/autopilot/FirmwareManager.vue +++ b/core/frontend/src/components/autopilot/FirmwareManager.vue @@ -113,7 +113,7 @@ label="Board" hint="If no board is chosen the system will try to flash the currently running board." class="ma-1 pa-0" - @change="chosen_vehicle = null" + @change="clearFirmwareSelection()" />
+
+ + Installing firmware + - Installing firmware. Please wait. +
+
+ {{ log.data.replace(/\r/g, '\n') }}
+
+
@@ -221,7 +247,6 @@