From 145905eeb27ddc6ce1571b2315a40e5ef4ee6112 Mon Sep 17 00:00:00 2001 From: alireza787b Date: Tue, 10 Mar 2026 05:20:36 +0000 Subject: [PATCH 1/4] examples: add vision odometry example Add a practical example showing how to send visual odometry data to PX4 via the Mocap plugin's Odometry message. Demonstrates connecting to PX4 SITL, configuring EKF2 for vision fusion (EKF2_EV_CTRL, EKF2_HGT_REF), sending ODOMETRY at 30 Hz, and monitoring health to verify fusion is active. --- examples/vision_odometry.py | 146 ++++++++++++++++++++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 examples/vision_odometry.py diff --git a/examples/vision_odometry.py b/examples/vision_odometry.py new file mode 100644 index 00000000..ef2376ca --- /dev/null +++ b/examples/vision_odometry.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 + +""" +Send visual odometry to PX4 and verify EKF2 fusion. + +Demonstrates how to feed external position estimates (from a VIO pipeline, +motion capture system, or visual odometry) to PX4 using the Mocap plugin's +set_odometry() API. + +What this example does: + 1. Connects to PX4 and configures EKF2 for external vision fusion + 2. Sends simulated ODOMETRY messages at 30 Hz (a slow circle) + 3. Reads back PX4's position estimate to verify fusion is working + 4. Prints both SENT and RECEIVED positions side-by-side + +What this example does NOT do: + - It does not arm or fly the drone + - The simulated positions are fake (in a real setup, they come from + a camera/VIO pipeline like T265, ORB-SLAM, OpenCV VO, etc.) + +Prerequisites: + - PX4 simulator running (e.g. make px4_sitl gz_x500) + - pip install mavsdk + +Usage: + python3 vision_odometry.py + +After running, you should see PX4's estimated position converge to match +the sent odometry within a few seconds. This confirms EKF2 is fusing +your external vision data. +""" + +import asyncio +import math +import time + +from mavsdk import System +from mavsdk.mocap import ( + Odometry, + PositionBody, + Quaternion, + SpeedBody, + AngularVelocityBody, + Covariance, +) + +# How long to run (seconds) and send rate (Hz) +DURATION_S = 20 +SEND_RATE_HZ = 30 + + +async def run(): + drone = System() + await drone.connect(system_address="udp://:14540") + + print("Waiting for drone to connect...") + async for state in drone.core.connection_state(): + if state.is_connected: + print("-- Connected to drone!") + break + + # Configure EKF2 to fuse external vision for position + velocity + yaw + print("Configuring EKF2 for vision fusion...") + await drone.param.set_param_int("EKF2_EV_CTRL", 15) # pos_xy + pos_z + vel + yaw + await drone.param.set_param_int("EKF2_HGT_REF", 3) # vision as height reference + print(" EKF2_EV_CTRL = 15 (fuse pos_xy + pos_z + vel + yaw)") + print(" EKF2_HGT_REF = 3 (vision height reference)") + + # Run all tasks in parallel + print() + print("Sending odometry and monitoring EKF2 output...") + print("-" * 70) + await asyncio.gather( + send_odometry(drone), + print_ekf2_position(drone), + wait_for_fusion(drone), + ) + + +async def wait_for_fusion(drone): + """Wait until EKF2 reports local position is OK (fusion active).""" + async for health in drone.telemetry.health(): + if health.is_local_position_ok: + print("[HEALTH] EKF2 fusion active -- local position OK!") + return + + +async def print_ekf2_position(drone): + """Read back PX4's EKF2 estimated position to verify fusion.""" + start = time.monotonic() + async for pos_vel in drone.telemetry.position_velocity_ned(): + elapsed = time.monotonic() - start + if elapsed > DURATION_S: + break + n = pos_vel.position.north_m + e = pos_vel.position.east_m + d = pos_vel.position.down_m + print(f" [EKF2 ] t={elapsed:5.1f}s pos=({n:+7.2f}, {e:+7.2f}, {d:+7.2f})") + await asyncio.sleep(3.0) + + +async def send_odometry(drone): + """Send simulated vision odometry at SEND_RATE_HZ for DURATION_S.""" + nan_cov = Covariance([float("nan")]) # unknown covariance + start_time = time.monotonic() + total_frames = DURATION_S * SEND_RATE_HZ + + for i in range(total_frames): + elapsed = time.monotonic() - start_time + + # Simulated trajectory: slow circle, 0.5 m radius, ~12.6 s period + x = 0.5 * math.cos(elapsed * 0.5) + y = 0.5 * math.sin(elapsed * 0.5) + z = 0.0 # NED: 0 means same altitude as start + + vx = -0.25 * math.sin(elapsed * 0.5) + vy = 0.25 * math.cos(elapsed * 0.5) + + odom = Odometry( + time_usec=0, # 0 = let autopilot timestamp it + frame_id=Odometry.MavFrame.MOCAP_NED, + position_body=PositionBody(x, y, z), + q=Quaternion(1.0, 0.0, 0.0, 0.0), # identity = no rotation + speed_body=SpeedBody(vx, vy, 0.0), + angular_velocity_body=AngularVelocityBody(0.0, 0.0, 0.0), + pose_covariance=nan_cov, + velocity_covariance=nan_cov, + ) + await drone.mocap.set_odometry(odom) + + # Print sent position every 3 seconds + if i % (SEND_RATE_HZ * 3) == 0: + print(f" [SENT ] t={elapsed:5.1f}s pos=({x:+7.2f}, {y:+7.2f}, {z:+7.2f})") + + await asyncio.sleep(1.0 / SEND_RATE_HZ) + + print() + print( + "-- Done. Sent {} frames over {:.0f}s at {} Hz".format( + total_frames, DURATION_S, SEND_RATE_HZ + ) + ) + + +if __name__ == "__main__": + asyncio.run(run()) From 347a68672be82dce70a93d2b73abb027ff762910 Mon Sep 17 00:00:00 2001 From: alireza787b Date: Tue, 10 Mar 2026 06:40:58 +0000 Subject: [PATCH 2/4] examples: rewrite vision_odometry as ground truth feedback loop Rewrites the vision odometry example to use a ground truth feedback approach: reads the drone's actual position in simulation, adds configurable noise and delay, and feeds it back as vision odometry. The drone takes off and hovers while the VIO feed runs, showing real-time EKF2 fusion status. Uses new Odometry fields (reset_counter, estimator_type, quality) from MAVSDK-Proto PR #395 / MAVSDK PR #2791. Includes a startup validation that prints a clear error if these fields are not yet available. Co-Authored-By: Claude Opus 4.6 --- examples/vision_odometry.py | 375 ++++++++++++++++++++++++++++-------- 1 file changed, 297 insertions(+), 78 deletions(-) diff --git a/examples/vision_odometry.py b/examples/vision_odometry.py index ef2376ca..e207aca7 100644 --- a/examples/vision_odometry.py +++ b/examples/vision_odometry.py @@ -1,145 +1,364 @@ #!/usr/bin/env python3 """ -Send visual odometry to PX4 and verify EKF2 fusion. +Vision Odometry Ground Truth Feedback -- MAVSDK Example -Demonstrates how to feed external position estimates (from a VIO pipeline, -motion capture system, or visual odometry) to PX4 using the Mocap plugin's -set_odometry() API. +Demonstrates VIO (Visual-Inertial Odometry) integration with PX4 using +the MAVSDK Mocap plugin. In simulation, it reads the drone's actual +position as "ground truth" and feeds it back to PX4 as external vision +data, with configurable noise and delay to simulate a real VIO sensor. -What this example does: - 1. Connects to PX4 and configures EKF2 for external vision fusion - 2. Sends simulated ODOMETRY messages at 30 Hz (a slow circle) - 3. Reads back PX4's position estimate to verify fusion is working - 4. Prints both SENT and RECEIVED positions side-by-side +How it works: + 1. Connects to PX4 and configures EKF2 for vision fusion + 2. Arms and takes off (using GPS in simulation) + 3. Reads position from the simulator as ground truth + 4. Adds configurable Gaussian noise and delay + 5. Sends the result to PX4 as ODOMETRY at 30 Hz via Mocap plugin + 6. Displays real-time EKF2 position so you can see fusion working -What this example does NOT do: - - It does not arm or fly the drone - - The simulated positions are fake (in a real setup, they come from - a camera/VIO pipeline like T265, ORB-SLAM, OpenCV VO, etc.) +Adjust the parameters below to experiment: + - Increase NOISE_POS_STDDEV_M to see EKF2 handle noisy data + - Increase DELAY_MS to see the effect of sensor latency + - Lower QUALITY_PERCENT with a nonzero EKF2_EV_QMIN to see rejection + - Set EKF2_GPS_CTRL = 0 to test pure vision navigation Prerequisites: - PX4 simulator running (e.g. make px4_sitl gz_x500) - pip install mavsdk +Dependencies: + This example requires extended Odometry fields (reset_counter, + estimator_type, quality) from: + - MAVSDK-Proto PR #395: add odometry fields to mocap proto + - MAVSDK-Proto PR #396: fix MavFrame enum values + - MAVSDK PR #2791: wire up new odometry fields in C++ + Install a MAVSDK build that includes these changes. + Usage: python3 vision_odometry.py - -After running, you should see PX4's estimated position converge to match -the sent odometry within a few seconds. This confirms EKF2 is fusing -your external vision data. """ import asyncio -import math +import inspect +import random import time +from collections import deque from mavsdk import System from mavsdk.mocap import ( + AngularVelocityBody, + Covariance, Odometry, PositionBody, Quaternion, SpeedBody, - AngularVelocityBody, - Covariance, ) -# How long to run (seconds) and send rate (Hz) -DURATION_S = 20 -SEND_RATE_HZ = 30 +# =================================================================== +# Configurable Parameters +# Adjust these to experiment with VIO behavior and EKF2 response. +# =================================================================== + +# -- VIO Sensor Simulation -- +SEND_RATE_HZ = 30 # VIO output rate. PX4 expects 30-50 Hz. +NOISE_POS_STDDEV_M = 0.02 # Position noise std dev (m). Real: 0.01-0.05 +NOISE_VEL_STDDEV_M_S = 0.05 # Velocity noise std dev (m/s). Real: 0.02-0.1 +DELAY_MS = 0 # Sensor delay (ms). Real: 10-50. 0 = off. +QUALITY_PERCENT = 75 # Confidence 0-100. See EKF2_EV_QMIN. + +# -- Flight -- +FLIGHT_ALT_M = 5.0 # Takeoff altitude (m) +HOVER_TIME_S = 30 # How long to feed VIO while hovering (s) +DISPLAY_INTERVAL_S = 2.0 # Status print interval (s) + +# -- EKF2 Vision Fusion -- +# EKF2_EV_CTRL bitmask: 1=hpos, 2=vpos, 4=vel, 8=yaw. 15=all. +EKF2_EV_CTRL = 15 +# EKF2_HGT_REF: 0=baro, 1=GPS, 2=range, 3=vision +EKF2_HGT_REF = 3 +# EKF2_GPS_CTRL: 0=off, 7=full. Keep 7 to compare GPS+VIO. +# Set to 0 for pure vision navigation (no GPS fallback). +EKF2_GPS_CTRL = 7 + +# -- Connection -- +CONNECT_ADDRESS = "udpin://0.0.0.0:14540" + + +# =================================================================== +# API Validation +# =================================================================== + + +def validate_odometry_api(): + """Verify that MAVSDK includes the extended Odometry fields. + + This example uses reset_counter, estimator_type, and quality + fields added by MAVSDK-Proto PR #395. If your MAVSDK version + does not include these, you will see a clear error here. + """ + # Check constructor accepts the three new fields + try: + sig = inspect.signature(Odometry.__init__) + # Original API: self + 8 fields = 9 params + # Extended API: self + 11 fields = 12 params + if len(sig.parameters) <= 9: + raise SystemExit( + "\n[ERROR] This example requires extended Odometry fields" + "\n(reset_counter, estimator_type, quality) that are not" + "\npresent in your MAVSDK installation.\n" + "\nThese fields are introduced by:" + "\n - MAVSDK-Proto PR #395" + "\n - MAVSDK-Proto PR #396" + "\n - MAVSDK PR #2791" + "\n\nPlease install a MAVSDK build that includes them." + ) + except (ValueError, TypeError) as exc: + raise SystemExit(f"[ERROR] Cannot inspect Odometry API: {exc}") from exc + + # Check MavEstimatorType enum exists + if not hasattr(Odometry, "MavEstimatorType"): + raise SystemExit( + "[ERROR] Odometry.MavEstimatorType not found.\n" + "Install MAVSDK with MAVSDK-Proto PR #395 support." + ) + + +# =================================================================== +# Main +# =================================================================== async def run(): + validate_odometry_api() + drone = System() - await drone.connect(system_address="udp://:14540") + await drone.connect(system_address=CONNECT_ADDRESS) print("Waiting for drone to connect...") async for state in drone.core.connection_state(): if state.is_connected: - print("-- Connected to drone!") + print("[OK] Connected to drone") break - # Configure EKF2 to fuse external vision for position + velocity + yaw - print("Configuring EKF2 for vision fusion...") - await drone.param.set_param_int("EKF2_EV_CTRL", 15) # pos_xy + pos_z + vel + yaw - await drone.param.set_param_int("EKF2_HGT_REF", 3) # vision as height reference - print(" EKF2_EV_CTRL = 15 (fuse pos_xy + pos_z + vel + yaw)") - print(" EKF2_HGT_REF = 3 (vision height reference)") + await configure_ekf2(drone) + + print("\nWaiting for position estimate...") + async for health in drone.telemetry.health(): + if health.is_global_position_ok and health.is_home_position_ok: + print("[OK] Position estimate ready") + break + + print("\n-- Arming") + await drone.action.arm() + + print(f"-- Taking off to {FLIGHT_ALT_M:.0f} m") + await drone.action.set_takeoff_altitude(FLIGHT_ALT_M) + await drone.action.takeoff() + + print("-- Waiting for takeoff...") + await asyncio.sleep(10) + print("[OK] Hovering -- starting VIO feed\n") + + print_settings_summary() + + # Run VIO feed, status monitor, and health watch concurrently + stop_event = asyncio.Event() + vio_task = asyncio.ensure_future(vio_feed_loop(drone, stop_event)) + monitor_task = asyncio.ensure_future(monitor_status(drone, stop_event)) + health_task = asyncio.ensure_future(watch_fusion_health(drone, stop_event)) + + await asyncio.sleep(HOVER_TIME_S) + stop_event.set() - # Run all tasks in parallel - print() - print("Sending odometry and monitoring EKF2 output...") - print("-" * 70) await asyncio.gather( - send_odometry(drone), - print_ekf2_position(drone), - wait_for_fusion(drone), + vio_task, + monitor_task, + health_task, + return_exceptions=True, ) + print("\n-- Landing") + await drone.action.land() -async def wait_for_fusion(drone): - """Wait until EKF2 reports local position is OK (fusion active).""" - async for health in drone.telemetry.health(): - if health.is_local_position_ok: - print("[HEALTH] EKF2 fusion active -- local position OK!") - return + async for in_air in drone.telemetry.in_air(): + if not in_air: + print("[OK] Landed") + break + print_footer() + + +async def configure_ekf2(drone): + """Set EKF2 parameters for external vision fusion.""" + print("\nConfiguring EKF2 for vision fusion:") + params = [ + ("EKF2_EV_CTRL", EKF2_EV_CTRL, "fuse hpos+vpos+vel+yaw"), + ("EKF2_HGT_REF", EKF2_HGT_REF, "vision height ref"), + ("EKF2_GPS_CTRL", EKF2_GPS_CTRL, "GPS fusion"), + ] + for name, value, desc in params: + await drone.param.set_param_int(name, value) + print(f" {name:20s} = {value:2d} ({desc})") + + +def print_settings_summary(): + """Print current VIO simulation settings.""" + print("VIO simulation settings:") + print(f" Rate: {SEND_RATE_HZ} Hz") + print(f" Noise: pos={NOISE_POS_STDDEV_M} m, vel={NOISE_VEL_STDDEV_M_S} m/s") + print(f" Delay: {DELAY_MS} ms") + print(f" Quality: {QUALITY_PERCENT}%") + if EKF2_GPS_CTRL == 0: + print(" Mode: VISION ONLY (GPS disabled)") + else: + print(" Mode: GPS + Vision (dual fusion)") + print() + + +# =================================================================== +# VIO Feed Loop +# =================================================================== + + +async def vio_feed_loop(drone, stop_event): + """Read ground truth, add noise/delay, send as vision odometry. + + In simulation, position_velocity_ned() returns the EKF2 estimate, + which closely matches ground truth when GPS is working. We add + configurable noise and delay, then feed this back to PX4 as + vision odometry via the Mocap plugin. + + In a real system, the position would come from a camera-based + VIO pipeline (e.g. RealSense T265, ORB-SLAM3, OpenCV VO). + """ + nan_cov = Covariance([float("nan")]) + frame_count = 0 + reset_counter = 0 + interval = 1.0 / SEND_RATE_HZ + + # Set up delay buffer if needed + if DELAY_MS > 0: + buf_size = max(1, DELAY_MS * SEND_RATE_HZ // 1000) + delay_buf = deque(maxlen=buf_size) + print(f"[VIO ] Delay buffer: {buf_size} samples (~{DELAY_MS} ms)") + else: + delay_buf = None -async def print_ekf2_position(drone): - """Read back PX4's EKF2 estimated position to verify fusion.""" - start = time.monotonic() async for pos_vel in drone.telemetry.position_velocity_ned(): - elapsed = time.monotonic() - start - if elapsed > DURATION_S: + if stop_event.is_set(): break + n = pos_vel.position.north_m e = pos_vel.position.east_m d = pos_vel.position.down_m - print(f" [EKF2 ] t={elapsed:5.1f}s pos=({n:+7.2f}, {e:+7.2f}, {d:+7.2f})") - await asyncio.sleep(3.0) - + vn = pos_vel.velocity.north_m_s + ve = pos_vel.velocity.east_m_s + vd = pos_vel.velocity.down_m_s -async def send_odometry(drone): - """Send simulated vision odometry at SEND_RATE_HZ for DURATION_S.""" - nan_cov = Covariance([float("nan")]) # unknown covariance - start_time = time.monotonic() - total_frames = DURATION_S * SEND_RATE_HZ - - for i in range(total_frames): - elapsed = time.monotonic() - start_time + # Add Gaussian noise to simulate VIO sensor imperfections + noisy = ( + n + random.gauss(0, NOISE_POS_STDDEV_M), + e + random.gauss(0, NOISE_POS_STDDEV_M), + d + random.gauss(0, NOISE_POS_STDDEV_M), + vn + random.gauss(0, NOISE_VEL_STDDEV_M_S), + ve + random.gauss(0, NOISE_VEL_STDDEV_M_S), + vd + random.gauss(0, NOISE_VEL_STDDEV_M_S), + ) - # Simulated trajectory: slow circle, 0.5 m radius, ~12.6 s period - x = 0.5 * math.cos(elapsed * 0.5) - y = 0.5 * math.sin(elapsed * 0.5) - z = 0.0 # NED: 0 means same altitude as start + # Simulate sensor delay using a ring buffer + if delay_buf is not None: + delay_buf.append(noisy) + noisy = delay_buf[0] # oldest = ~DELAY_MS ago - vx = -0.25 * math.sin(elapsed * 0.5) - vy = 0.25 * math.cos(elapsed * 0.5) + pn, pe, pd, svn, sve, svd = noisy + # Build and send ODOMETRY message with extended fields: + # reset_counter: increment on VIO tracking reset + # estimator_type: VIO (visual-inertial odometry) + # quality: confidence percentage (see EKF2_EV_QMIN) odom = Odometry( - time_usec=0, # 0 = let autopilot timestamp it + time_usec=0, frame_id=Odometry.MavFrame.MOCAP_NED, - position_body=PositionBody(x, y, z), - q=Quaternion(1.0, 0.0, 0.0, 0.0), # identity = no rotation - speed_body=SpeedBody(vx, vy, 0.0), + position_body=PositionBody(pn, pe, pd), + q=Quaternion(1.0, 0.0, 0.0, 0.0), + speed_body=SpeedBody(svn, sve, svd), angular_velocity_body=AngularVelocityBody(0.0, 0.0, 0.0), pose_covariance=nan_cov, velocity_covariance=nan_cov, + reset_counter=reset_counter, + estimator_type=Odometry.MavEstimatorType.VIO, + quality=QUALITY_PERCENT, ) await drone.mocap.set_odometry(odom) + frame_count += 1 - # Print sent position every 3 seconds - if i % (SEND_RATE_HZ * 3) == 0: - print(f" [SENT ] t={elapsed:5.1f}s pos=({x:+7.2f}, {y:+7.2f}, {z:+7.2f})") + await asyncio.sleep(interval) - await asyncio.sleep(1.0 / SEND_RATE_HZ) + print(f"[VIO ] Sent {frame_count} frames at {SEND_RATE_HZ} Hz") - print() + +# =================================================================== +# Status Monitoring +# =================================================================== + + +async def monitor_status(drone, stop_event): + """Print EKF2 position at regular intervals.""" print( - "-- Done. Sent {} frames over {:.0f}s at {} Hz".format( - total_frames, DURATION_S, SEND_RATE_HZ - ) + f"{'Time':>6s} {'North':>8s} {'East':>8s} {'Down':>8s} {'Vn':>6s} {'Ve':>6s} {'Vd':>6s}" ) + print("-" * 60) + + start = time.monotonic() + last_print = 0.0 + + async for pv in drone.telemetry.position_velocity_ned(): + if stop_event.is_set(): + break + + elapsed = time.monotonic() - start + if elapsed - last_print < DISPLAY_INTERVAL_S: + await asyncio.sleep(0.2) + continue + last_print = elapsed + + p = pv.position + v = pv.velocity + print( + f" {elapsed:5.1f}s" + f" {p.north_m:+8.3f} {p.east_m:+8.3f}" + f" {p.down_m:+8.3f}" + f" {v.north_m_s:+5.2f}" + f" {v.east_m_s:+5.2f}" + f" {v.down_m_s:+5.2f}" + ) + + +async def watch_fusion_health(drone, stop_event): + """Report EKF2 fusion health changes.""" + was_ok = False + + async for health in drone.telemetry.health(): + if stop_event.is_set(): + break + is_ok = health.is_local_position_ok + if is_ok and not was_ok: + print("[HEALTH] Local position OK -- vision fusion active") + elif not is_ok and was_ok: + print("[HEALTH] Local position LOST -- check VIO params") + was_ok = is_ok + + +def print_footer(): + """Print final guidance for the user.""" + print() + print("=" * 60) + print("Done! Adjust parameters at the top of this file:") + print(" NOISE_POS_STDDEV_M -- increase to stress EKF2") + print(" DELAY_MS -- simulate sensor latency") + print(" QUALITY_PERCENT -- lower with EKF2_EV_QMIN > 0") + print(" EKF2_GPS_CTRL -- set 0 for vision-only mode") + print("=" * 60) if __name__ == "__main__": From f0f5daf1ffbc00412400cbf197f9807df528b755 Mon Sep 17 00:00:00 2001 From: alireza787b Date: Tue, 10 Mar 2026 11:57:52 +0000 Subject: [PATCH 3/4] examples: remove auto-takeoff, make VIO feed a background tool The vision odometry example now does NOT arm, takeoff, or control the drone. It only feeds VIO data. The user flies the drone however they like (QGroundControl, RC, mission, another script) while this runs alongside. Press Ctrl+C to stop. Co-Authored-By: Claude Opus 4.6 --- examples/vision_odometry.py | 108 ++++++++++++++---------------------- 1 file changed, 41 insertions(+), 67 deletions(-) diff --git a/examples/vision_odometry.py b/examples/vision_odometry.py index e207aca7..dc3d1180 100644 --- a/examples/vision_odometry.py +++ b/examples/vision_odometry.py @@ -3,18 +3,24 @@ """ Vision Odometry Ground Truth Feedback -- MAVSDK Example -Demonstrates VIO (Visual-Inertial Odometry) integration with PX4 using -the MAVSDK Mocap plugin. In simulation, it reads the drone's actual -position as "ground truth" and feeds it back to PX4 as external vision -data, with configurable noise and delay to simulate a real VIO sensor. +Feeds simulated VIO (Visual-Inertial Odometry) data to PX4 using the +MAVSDK Mocap plugin. Reads the simulator's ground truth position, adds +configurable noise and delay, and sends it back as external vision +odometry. This is for testing and education -- the simulator looks +and behaves completely normally while this script runs alongside it. + +This script does NOT arm, takeoff, or control the drone in any way. +You fly the drone however you like (QGroundControl, RC, mission, or +another MAVSDK script) while this script feeds VIO data in the +background. Press Ctrl+C to stop. How it works: 1. Connects to PX4 and configures EKF2 for vision fusion - 2. Arms and takes off (using GPS in simulation) - 3. Reads position from the simulator as ground truth - 4. Adds configurable Gaussian noise and delay - 5. Sends the result to PX4 as ODOMETRY at 30 Hz via Mocap plugin - 6. Displays real-time EKF2 position so you can see fusion working + 2. Reads position from the simulator as ground truth + 3. Adds configurable Gaussian noise and delay + 4. Sends the result to PX4 as ODOMETRY at 30 Hz via Mocap plugin + 5. Displays real-time EKF2 position and fusion health status + 6. Runs until you press Ctrl+C Adjust the parameters below to experiment: - Increase NOISE_POS_STDDEV_M to see EKF2 handle noisy data @@ -35,7 +41,11 @@ Install a MAVSDK build that includes these changes. Usage: - python3 vision_odometry.py + 1. Start PX4 simulator + 2. Run: python3 vision_odometry.py + 3. Fly the drone normally (QGroundControl, RC, mission, etc.) + 4. Watch the VIO feed and EKF2 fusion status in the terminal + 5. Press Ctrl+C to stop """ import asyncio @@ -66,9 +76,7 @@ DELAY_MS = 0 # Sensor delay (ms). Real: 10-50. 0 = off. QUALITY_PERCENT = 75 # Confidence 0-100. See EKF2_EV_QMIN. -# -- Flight -- -FLIGHT_ALT_M = 5.0 # Takeoff altitude (m) -HOVER_TIME_S = 30 # How long to feed VIO while hovering (s) +# -- Display -- DISPLAY_INTERVAL_S = 2.0 # Status print interval (s) # -- EKF2 Vision Fusion -- @@ -96,7 +104,6 @@ def validate_odometry_api(): fields added by MAVSDK-Proto PR #395. If your MAVSDK version does not include these, you will see a clear error here. """ - # Check constructor accepts the three new fields try: sig = inspect.signature(Odometry.__init__) # Original API: self + 8 fields = 9 params @@ -115,7 +122,6 @@ def validate_odometry_api(): except (ValueError, TypeError) as exc: raise SystemExit(f"[ERROR] Cannot inspect Odometry API: {exc}") from exc - # Check MavEstimatorType enum exists if not hasattr(Odometry, "MavEstimatorType"): raise SystemExit( "[ERROR] Odometry.MavEstimatorType not found.\n" @@ -141,49 +147,20 @@ async def run(): break await configure_ekf2(drone) - - print("\nWaiting for position estimate...") - async for health in drone.telemetry.health(): - if health.is_global_position_ok and health.is_home_position_ok: - print("[OK] Position estimate ready") - break - - print("\n-- Arming") - await drone.action.arm() - - print(f"-- Taking off to {FLIGHT_ALT_M:.0f} m") - await drone.action.set_takeoff_altitude(FLIGHT_ALT_M) - await drone.action.takeoff() - - print("-- Waiting for takeoff...") - await asyncio.sleep(10) - print("[OK] Hovering -- starting VIO feed\n") - print_settings_summary() - # Run VIO feed, status monitor, and health watch concurrently - stop_event = asyncio.Event() - vio_task = asyncio.ensure_future(vio_feed_loop(drone, stop_event)) - monitor_task = asyncio.ensure_future(monitor_status(drone, stop_event)) - health_task = asyncio.ensure_future(watch_fusion_health(drone, stop_event)) - - await asyncio.sleep(HOVER_TIME_S) - stop_event.set() - - await asyncio.gather( - vio_task, - monitor_task, - health_task, - return_exceptions=True, - ) + print("VIO feed is running. Fly the drone normally") + print("(QGroundControl, RC, mission, etc.). Press Ctrl+C to stop.\n") - print("\n-- Landing") - await drone.action.land() + # Run VIO feed, status monitor, and health watch concurrently + vio_task = asyncio.ensure_future(vio_feed_loop(drone)) + monitor_task = asyncio.ensure_future(monitor_status(drone)) + health_task = asyncio.ensure_future(watch_fusion_health(drone)) - async for in_air in drone.telemetry.in_air(): - if not in_air: - print("[OK] Landed") - break + try: + await asyncio.gather(vio_task, monitor_task, health_task) + except asyncio.CancelledError: + pass print_footer() @@ -199,6 +176,7 @@ async def configure_ekf2(drone): for name, value, desc in params: await drone.param.set_param_int(name, value) print(f" {name:20s} = {value:2d} ({desc})") + print() def print_settings_summary(): @@ -220,7 +198,7 @@ def print_settings_summary(): # =================================================================== -async def vio_feed_loop(drone, stop_event): +async def vio_feed_loop(drone): """Read ground truth, add noise/delay, send as vision odometry. In simulation, position_velocity_ned() returns the EKF2 estimate, @@ -245,9 +223,6 @@ async def vio_feed_loop(drone, stop_event): delay_buf = None async for pos_vel in drone.telemetry.position_velocity_ned(): - if stop_event.is_set(): - break - n = pos_vel.position.north_m e = pos_vel.position.east_m d = pos_vel.position.down_m @@ -302,7 +277,7 @@ async def vio_feed_loop(drone, stop_event): # =================================================================== -async def monitor_status(drone, stop_event): +async def monitor_status(drone): """Print EKF2 position at regular intervals.""" print( f"{'Time':>6s} {'North':>8s} {'East':>8s} {'Down':>8s} {'Vn':>6s} {'Ve':>6s} {'Vd':>6s}" @@ -313,9 +288,6 @@ async def monitor_status(drone, stop_event): last_print = 0.0 async for pv in drone.telemetry.position_velocity_ned(): - if stop_event.is_set(): - break - elapsed = time.monotonic() - start if elapsed - last_print < DISPLAY_INTERVAL_S: await asyncio.sleep(0.2) @@ -334,13 +306,11 @@ async def monitor_status(drone, stop_event): ) -async def watch_fusion_health(drone, stop_event): +async def watch_fusion_health(drone): """Report EKF2 fusion health changes.""" was_ok = False async for health in drone.telemetry.health(): - if stop_event.is_set(): - break is_ok = health.is_local_position_ok if is_ok and not was_ok: print("[HEALTH] Local position OK -- vision fusion active") @@ -353,7 +323,8 @@ def print_footer(): """Print final guidance for the user.""" print() print("=" * 60) - print("Done! Adjust parameters at the top of this file:") + print("VIO feed stopped. Adjust parameters at the top of") + print("this file to experiment:") print(" NOISE_POS_STDDEV_M -- increase to stress EKF2") print(" DELAY_MS -- simulate sensor latency") print(" QUALITY_PERCENT -- lower with EKF2_EV_QMIN > 0") @@ -362,4 +333,7 @@ def print_footer(): if __name__ == "__main__": - asyncio.run(run()) + try: + asyncio.run(run()) + except KeyboardInterrupt: + print("\n[OK] Stopped by user") From fc98488562796b709ed6e627cf4cdd0ffc292977 Mon Sep 17 00:00:00 2001 From: alireza787b Date: Tue, 10 Mar 2026 15:55:27 +0000 Subject: [PATCH 4/4] examples: rename quality to quality_percent Match proto field rename from MAVSDK-Proto PR #395 review feedback. Co-Authored-By: Claude Opus 4.6 --- examples/vision_odometry.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/examples/vision_odometry.py b/examples/vision_odometry.py index dc3d1180..ce1f91fb 100644 --- a/examples/vision_odometry.py +++ b/examples/vision_odometry.py @@ -34,7 +34,7 @@ Dependencies: This example requires extended Odometry fields (reset_counter, - estimator_type, quality) from: + estimator_type, quality_percent) from: - MAVSDK-Proto PR #395: add odometry fields to mocap proto - MAVSDK-Proto PR #396: fix MavFrame enum values - MAVSDK PR #2791: wire up new odometry fields in C++ @@ -100,7 +100,7 @@ def validate_odometry_api(): """Verify that MAVSDK includes the extended Odometry fields. - This example uses reset_counter, estimator_type, and quality + This example uses reset_counter, estimator_type, and quality_percent fields added by MAVSDK-Proto PR #395. If your MAVSDK version does not include these, you will see a clear error here. """ @@ -111,7 +111,7 @@ def validate_odometry_api(): if len(sig.parameters) <= 9: raise SystemExit( "\n[ERROR] This example requires extended Odometry fields" - "\n(reset_counter, estimator_type, quality) that are not" + "\n(reset_counter, estimator_type, quality_percent) that are not" "\npresent in your MAVSDK installation.\n" "\nThese fields are introduced by:" "\n - MAVSDK-Proto PR #395" @@ -250,7 +250,7 @@ async def vio_feed_loop(drone): # Build and send ODOMETRY message with extended fields: # reset_counter: increment on VIO tracking reset # estimator_type: VIO (visual-inertial odometry) - # quality: confidence percentage (see EKF2_EV_QMIN) + # quality_percent: confidence 0-100, -1=unknown (EKF2_EV_QMIN) odom = Odometry( time_usec=0, frame_id=Odometry.MavFrame.MOCAP_NED, @@ -262,7 +262,7 @@ async def vio_feed_loop(drone): velocity_covariance=nan_cov, reset_counter=reset_counter, estimator_type=Odometry.MavEstimatorType.VIO, - quality=QUALITY_PERCENT, + quality_percent=QUALITY_PERCENT, ) await drone.mocap.set_odometry(odom) frame_count += 1