Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 107 additions & 13 deletions emioapi/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import emioapi
import sys

from emioapi._logging_config import logger

def calibrate():
"""
Calibrate the camera of the first Emio camera found
Calibrate the camera of the first Emio camera found. For more informations about the calibration process, please refer to the EmioCamera.calibrate() method documentation.
"""
import emioapi
from emioapi._logging_config import logger

camera = emioapi.EmioCamera(show=True)
print("Available cameras:", emioapi.EmioCamera.listCameras())

Expand All @@ -32,14 +32,108 @@ def calibrate():
else:
print("Failed to open camera.")

def startUDP(args):
    """
    Start a UDP bridge configured with the parameters found in args.

    A handshake is done at the beginning to ensure that the remote host is ready to receive data.
    It should follow the same protocol described below with dummy data.

    The sequence number is a simple counter that is incremented at each frame. It is used by the
    process_motors process to make sure that the remote is synchronized.

    The protocol is as follows:
    - The bridge sends a packet made of a sequence number followed by the marker(s) position(s) and the four motors positions
    - The remote host should reply with a packet containing the four motors positions to send to the Emio.

    Args:
        args: argparse.Namespace carrying the bridge parameters (fps, nb_markers, side, sort,
            remote_ip, remote_port, local_port, bind_port, recv_timeout, camera_only, motors_only),
            as produced by the `startUDP` subparser in parse_args().
    """
    # Imported lazily so commands that do not need the bridge avoid its import cost.
    import emioapi.udp_bridge.udp_bridge as udp_bridge

    config = udp_bridge.UDPBridgeConfig(
        fps=args.fps,
        nb_markers=args.nb_markers,
        side=args.side,
        sort=args.sort,
        remote_ip=args.remote_ip,
        remote_port=args.remote_port,
        local_port=args.local_port,
        bind_port=args.bind_port,
        recv_timeout=args.recv_timeout,
        camera_only=args.camera_only,
        motors_only=args.motors_only
    )

    print("-"*50)
    print(f"Starting UDP bridge with config: {config}")
    print("-"*50)

    udp_bridge.startUDPbridge(config)


def parse_args():
    """
    Parse the ``emioapi`` command-line arguments and run the selected command.

    Two sub-commands are available:
    - ``calibrate``: calibrate the Emio camera.
    - ``startUDP``: start a UDP bridge for motor/camera data.

    Returns:
        argparse.Namespace | None: the parsed arguments, or None when parsing
            failed before a namespace was produced (the help texts are then printed).
    """
    import argparse
    import emioapi.udp_bridge.udp_bridge_params as prm

    p = argparse.ArgumentParser(
        description="Emio API tools for Emio",
        prog="emioapi",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    subparsers = p.add_subparsers(
        title="Available Commands",
        dest="command",
        required=True
    )

    # --- Subparser for 'calibrate' command ---
    parser_calibrate = subparsers.add_parser("calibrate", help="Calibrate the Emio camera.",
                                             formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser_calibrate.description = calibrate.__doc__

    # --- Subparser for 'startUDP' command ---
    parser_udp = subparsers.add_parser("startUDP", help="Start a UDP bridge for motor/camera data.",
                                       formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser_udp.description = startUDP.__doc__

    # Add the specific arguments needed for startUDP here
    parser_udp.add_argument("--fps", type=int, default=prm.fps, help="Frames per second (e.g., 30)")
    parser_udp.add_argument("--nb-markers", type=int, default=prm.nb_markers, help="Number of markers to process")
    parser_udp.add_argument("--side", choices=["top", "front", "plan"], default=prm.side, help="Camera side view")
    parser_udp.add_argument("--sort", choices=["y", "z"], default=prm.sort, help="Sorting axis")
    parser_udp.add_argument("--remote-ip", type=str, default=prm.remote_ip, help="Remote IP address")
    parser_udp.add_argument("--remote-port", type=int, default=prm.remote_port, help="Remote Port")
    parser_udp.add_argument("--local-port", type=int, default=prm.local_port, help="Local Port")
    parser_udp.add_argument("--bind-port", type=int, default=prm.bind_port, help="Bind port for local communication")
    parser_udp.add_argument("--recv-timeout", type=float, default=prm.recv_timeout, help="Receive timeout in seconds")
    parser_udp.add_argument("--camera-only", action=argparse.BooleanOptionalAction, default=False, help="Only send the markers position without waiting for the motors command. The motors positions will be sent as 0. And no command will be applied to the motors.")
    parser_udp.add_argument("--motors-only", action=argparse.BooleanOptionalAction, default=False, help="Only send the motors position without waiting for the camera data")

    args = None
    try:
        # Parse the arguments (argparse raises SystemExit on invalid input or --help).
        args = p.parse_args()

        print(f"Command chosen: {args.command}" + (f" with parameters: {vars(args)}" if args.command == "startUDP" else ""))

        # Execute the function associated with the chosen command
        if args.command == "calibrate":
            calibrate()
        elif args.command == "startUDP":
            startUDP(args)
        else:
            print(f"Unknown command: {args.command}")
    except (SystemExit, Exception):
        # Narrowed from a bare `except:` so KeyboardInterrupt is no longer
        # swallowed; SystemExit (raised by argparse) is still caught so the
        # full help of every subcommand is printed on invalid input.
        p.print_help()
        print()
        parser_calibrate.print_help()
        print()
        parser_udp.print_help()
    return args


if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == 'calibrate':
calibrate()
else:
print("Available functions from emioapi tool:")
for name in dir(sys.modules[__name__]):
if not name.startswith("_"):
attr = getattr(sys.modules[__name__], name)
if callable(attr):
print(f" {name} - {attr.__doc__}")
try:
args = parse_args()
except Exception as e:
import traceback
print(f"An error happened: {traceback.format_exc()}")
32 changes: 20 additions & 12 deletions emioapi/_depthcamera.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import os
import json
from time import sleep
import time
from enum import Enum

Expand All @@ -9,7 +7,7 @@
import pyrealsense2 as rs

from ._camerafeedwindow import CameraFeedWindow
from ._positionestimation import PositionEstimation, image_pixel_to_mm, CONFIG_FILENAME
from ._positionestimation import PositionEstimation, CONFIG_FILENAME
from emioapi._logging_config import logger

DEFAULT_CAMERA_PARAMS = {"hue_h": 90, "hue_l": 36, "sat_h": 255, "sat_l": 138, "value_h": 255, "value_l": 35, "erosion_size": 0, "area": 100}
Expand Down Expand Up @@ -68,6 +66,7 @@ class DepthCamera:
parameter = {}
tracking = False
trackers_pos = []
trackers_pos_image = []
maskWindow = None
frameWindow = None
hsvWindow = None
Expand All @@ -77,6 +76,7 @@ class DepthCamera:
maskFrame = None
frame: np.ndarray = None
depth_frame: np.ndarray = None
depth_rsframe: np.ndarray = None
depth_max = 430
depth_min = 2
calibration_status = CalibrationStatusEnum.NOT_CALIBRATED
Expand Down Expand Up @@ -123,6 +123,7 @@ def __init__(self,
return

self.trackers_pos = []
self.trackers_pos_image = []

if parameter:
self.parameter = parameter
Expand Down Expand Up @@ -290,19 +291,24 @@ def get_frame(self):
color_frame = frames.get_color_frame()

if not depth_frame or not color_frame:
return False, color_frame, depth_frame
return False

# Convert images to numpy arrays
depth_image = np.asanyarray(depth_frame.get_data())
color_image = np.asanyarray(color_frame.get_data())
return True, color_image, depth_image, depth_frame
self.depth_frame = np.asanyarray(depth_frame.get_data())
self.frame = np.asanyarray(color_frame.get_data())
self.depth_rsframe = depth_frame

return True

def update(self):
ret, self.frame, self.depth_frame, depth_rsframe = self.get_frame()

def update(self):
ret = self.get_frame()
if ret is False:
return
self.process_frame()

def process_frame(self):

# if frame is read correctly ret is True

self.hsvFrame = cv.cvtColor(self.frame, cv.COLOR_BGR2HSV)
Expand Down Expand Up @@ -333,6 +339,7 @@ def update(self):
areas = [cv.contourArea(cnt) for cnt in contours]

self.trackers_pos = []
self.trackers_pos_image = []
for i, a in enumerate(areas):
if a > self.parameter['area']:
x, y = compute_contour_center(contours[i])
Expand All @@ -341,6 +348,7 @@ def update(self):
depth = compute_median_depth(contours[i], self.depth_frame) if self.depth_frame[y, x] == 0 else self.depth_frame[y, x]
worldx, worldy, worldz = self.position_estimator.camera_image_to_simulation(x, y, depth)
self.trackers_pos.append([worldx, worldy, worldz])
self.trackers_pos_image.append([x, y, depth])

cv.drawContours(marker_mask, [contours[i]], -1, color=255, thickness=-1)
for frame in [self.hsvFrame, self.frame]:
Expand All @@ -352,7 +360,7 @@ def update(self):
cv.drawContours(self.frame, [contours[i]], -1, (255, 255, 0), 3)

if self.compute_point_cloud:
points = self.pc.calculate(depth_rsframe)
points = self.pc.calculate(self.depth_rsframe)
v = points.get_vertices()
self.point_cloud = np.asanyarray(v).view(np.float32).reshape(-1, 3) # xyz

Expand All @@ -370,7 +378,7 @@ def update(self):
self.hsvWindow.set_frame(self.hsvFrame)

if self.depthWindow is not None and self.depthWindow.running:
colorized = np.asanyarray(rs.colorizer().colorize(depth_rsframe).get_data())
colorized = np.asanyarray(rs.colorizer().colorize(self.depth_rsframe).get_data())
self.depthWindow.set_frame(colorized)

self.rootWindow.update()
Expand All @@ -395,4 +403,4 @@ def run_loop(self):
self.rootWindow.update()
self.update()

self.close()
self.close()
29 changes: 29 additions & 0 deletions emioapi/_positionestimation.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,36 @@ def calibrate(self, frame, depth_image, aggregate, window=None)-> bool:
window.set_frame(frame)

return True


def camera_image_to_simulation_plane_intersection(self, x: int, y: int, plane_n: np.ndarray, plane_d: float) -> list[float]:
"""
Calculate the position of the object in our frame space by projecting the ray from the camera to the image point on a plane.

Args
x,y: int
The pixel coordinates

plane_n: np.ndarray
The normal vector of the plane

plane_d: float
The distance of the plane from the origin along its normal vector

Return:
position: numpy.ndarray
The real world coordinates of the object in the Emio frame space
"""
ray_world = self.camera_image_to_simulation(x, y, 1.0) - self.t
camera_pos_world = self.t
denom = plane_n.dot(ray_world)

if abs(denom) < 1e-6:
raise ValueError("Ray is parallel to the plane")
s = - (plane_n.dot(camera_pos_world) + plane_d) / denom
result = camera_pos_world + s * ray_world
return [result[0], result[1], result[2]]


def camera_image_to_simulation(self, x: int, y: int, depth: float) -> list[float]:
"""
Expand Down
29 changes: 27 additions & 2 deletions emioapi/emiocamera.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class EmioCamera:
_running: bool = False
_parameter: dict = None
_trackers_pos: list = []
_trackers_pos_image: list = []
_point_cloud: np.ndarray = None
_hsv_frame: np.ndarray = None
_mask_frame: np.ndarray = None
Expand Down Expand Up @@ -261,6 +262,19 @@ def trackers_pos(self) -> list:
else:
return []

@property
def trackers_pos_image(self) -> list:
"""
Get the positions of the trackers in the image frame.
Returns:
list: The positions of the trackers in the image frame as a list of lists.
"""
with self._lock:
if self._tracking:
return self._trackers_pos_image
else:
return []

@property
def point_cloud(self) -> np.ndarray:
"""
Expand Down Expand Up @@ -462,7 +476,17 @@ def update(self):
Update the camera frames and tracking elements (markers and point cloud)
"""
if self._camera is not None:
self._camera.update()
if self.get_frame():
self.process_frame()

def get_frame(self) -> bool:
if self._camera is not None:
return self._camera.get_frame()
return False

def process_frame(self):
if self._camera is not None:
self._camera.process_frame()
with self._lock:
self._hsv_frame = self._camera.hsvFrame
self._mask_frame = self._camera.maskFrame
Expand All @@ -471,6 +495,7 @@ def update(self):
for p_camera in self._camera.trackers_pos:
p_emio = [p_camera[0], p_camera[1], p_camera[2], 0, 0, 0, 1]
self._trackers_pos.append(p_emio[0:3])
self._trackers_pos_image = self._camera.trackers_pos_image.copy()
logger.debug(f"Trackers positions in camera frame: {self._camera.trackers_pos}, converted to Emio frame: {self._trackers_pos}")
if self._compute_point_cloud:
self._point_cloud = self._camera.point_cloud
Expand All @@ -482,4 +507,4 @@ def close(self):
self._running = False
if self._camera is not None:
self._camera.close()
#endregion
#endregion
Loading
Loading