-
Notifications
You must be signed in to change notification settings - Fork 1
Track Masking & Edge Detection w/ Angle Calcs #32
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1a23677
81dc744
67b695c
7a60990
e9b7b6a
bd6ce0c
86a132a
72a130f
940e99d
1932386
397cc22
4dd9413
9681275
f5a9fb1
c567c27
bf649ac
9983c48
785f58a
cd4955c
683d961
a25a457
fedf3c4
482f05d
5ddef6a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,234 @@ | ||
| import sys | ||
| import os | ||
| sys.path.insert(0, os.path.dirname(__file__)) | ||
|
|
||
| import cv2 as cv | ||
| import numpy as np | ||
| import utils | ||
|
|
||
| KERNEL = np.ones((3,3), np.uint8) | ||
| LOWER_RED = np.array([0,0,70]) | ||
| UPPER_RED = np.array([200,50,255]) | ||
|
|
||
| class AngleFinder: | ||
| def __init__(self, node): | ||
| self.prev_right = None | ||
| self.prev_left = None | ||
| self.logger = node | ||
|
|
||
| # @param vid: Video | ||
| # @param debug: Draws lines on image | ||
| # @param percent: Percent of the road (top down) | ||
| # @param pixel_range: Range of pixels to check around previous pixel for O(1) lookup | ||
| # @param pic_offset: Pixel offset from edge of image | ||
| # @ret video or None | ||
| def get_video_mask(self, vid, debug=False, percent=0.0, pixel_range=3, pic_offset=5): | ||
| if vid is None: | ||
| self.logger.error("Error opening video") | ||
| return None | ||
|
|
||
| r, f = vid.read() | ||
| if not r: | ||
| self.logger.error("Can't get initial video frame") | ||
| return None | ||
| h, w = f.shape[:2] | ||
| height, width = h - 1 - pic_offset, w - 1 - pic_offset | ||
|
|
||
| fps = vid.get(cv.CAP_PROP_FPS) | ||
| if fps == 0: | ||
| fps = 30 | ||
|
|
||
| video = cv.VideoWriter("labeled_video.mp4", cv.VideoWriter_fourcc(*'mp4v'), fps, (w, h), isColor=False) | ||
| vid.set(cv.CAP_PROP_POS_FRAMES, 0) | ||
|
|
||
| while (True): | ||
| ret, frame = vid.read() | ||
|
|
||
| if not ret: | ||
| break | ||
|
|
||
| result, cur_right, cur_left = self.get_img_mask(frame, debug=debug, percent=percent, pic_offset=pic_offset, pixel_range=pixel_range) | ||
|
|
||
| presult = np.zeros((h, w), dtype=result.dtype) | ||
|
|
||
| height = h - 1 - pic_offset | ||
| width = w - 1 - pic_offset | ||
| y_index = int(height * percent) | ||
|
|
||
| presult[y_index:height, pic_offset:width] = result | ||
|
|
||
| # Write debug info on video | ||
| if debug: | ||
| right_deg = utils.get_angle((width, pic_offset), (width // 2, pic_offset), cur_right) | ||
| left_deg = utils.get_angle((pic_offset, pic_offset), (width // 2, pic_offset), cur_left) | ||
|
|
||
| if right_deg is None: | ||
| right_deg = -1 | ||
|
|
||
| if left_deg is None: | ||
| left_deg = -1 | ||
|
|
||
| cv.putText(presult, f"Previous left: {self.prev_left}", (300, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) | ||
| cv.putText(presult, f"Previous right: {self.prev_right}", (900, 200), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) | ||
|
|
||
| cv.putText(presult, f"Current left: {cur_left}", (300, 400), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) | ||
| cv.putText(presult, f"Current right: {cur_right}", (900, 400), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) | ||
|
|
||
|
|
||
| cv.putText(presult, f"{right_deg:.1f}", (1300, 100), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) | ||
| cv.putText(presult, f"{left_deg:.1f}", (100, 100), cv.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2) | ||
|
|
||
| video.write(presult) | ||
|
|
||
| vid.release() | ||
| video.release() | ||
| cv.destroyAllWindows() | ||
|
|
||
| # @param img: Normal image | ||
| # @param debug: Draws lines on image | ||
| # @param percent: Percent of the road to cutoff (top down) | ||
| # @param pixel_range: Range of pixels to check around previous pixel for O(1) lookup | ||
| # @param pic_offset: Pixel offset from edge of image | ||
| # @ret image angles or None | ||
| def get_img_angles(self, img, debug=False, percent=0.0, pixel_range=3, pic_offset=5): | ||
| image, right, left = self.get_img_mask(img, debug=debug, percent=percent, pixel_range=pixel_range, pic_offset=pic_offset) | ||
|
|
||
| if image is None or right is None or left is None: | ||
| return (None, None) | ||
|
|
||
| w = img.shape[1] | ||
| width = w - 1 - pic_offset | ||
|
|
||
| right_deg = utils.get_angle((width, pic_offset), (width // 2, pic_offset), right) | ||
| left_deg = utils.get_angle((pic_offset, pic_offset), (width // 2, pic_offset), left) | ||
|
|
||
| if right_deg is None or left_deg is None: | ||
| return (None, None) | ||
|
|
||
| return (right_deg, left_deg) | ||
|
|
||
|
|
||
| # @param img: Normal image | ||
| # @param debug: Draws lines on image | ||
| # @param percent: Percent of the road to cutoff (top down) | ||
| # @param pixel_range: Range of pixels to check around previous pixel for O(1) lookup | ||
| # @param pic_offset: Pixel offset from edge of image | ||
| # @ret Masked image mage or None | ||
| def get_img_mask(self, img: np.ndarray, debug=False, percent=0.0, pixel_range=3, pic_offset=5): | ||
| if img is None: | ||
| self.error.logger('Error opening image!') | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is incorrect and will crash in this case |
||
| return None, None, None | ||
|
|
||
| # Roi mask for a portion of image | ||
| h, w = img.shape[:2] | ||
| height, width = h - 1 - pic_offset, w - 1 - pic_offset | ||
| y_index = int(height * percent) | ||
|
|
||
| roi_image = img[y_index:height, pic_offset:width] | ||
|
|
||
| # Get hue mask | ||
| image_hsv = utils.convert_bgr_to_hsv(roi_image) | ||
| mask_red = cv.inRange(image_hsv, LOWER_RED, UPPER_RED) | ||
| result = cv.morphologyEx(mask_red, cv.MORPH_OPEN, KERNEL) | ||
|
|
||
| if self.prev_right is None: | ||
| self.prev_right = (width, height) | ||
|
|
||
| if self.prev_left is None: | ||
| self.prev_left = (pic_offset, height) | ||
|
|
||
| right = self.lookup_road_coord(result, self.prev_right, True, pic_offset=pic_offset, pixel_range=pixel_range) | ||
| left = self.lookup_road_coord(result, self.prev_left, False, pic_offset=pic_offset, pixel_range=pixel_range) | ||
|
|
||
| self.prev_right = right | ||
| self.prev_left = left | ||
|
|
||
| if not right or not left: | ||
| right = self.vectorized_road_coord(result, True) | ||
| left = self. vectorized_road_coord(result, False) | ||
|
|
||
| if debug: | ||
| utils.draw_lines(result, right, left) | ||
|
|
||
| return (result, right, left) | ||
|
|
||
| # Vectorized road lookup | ||
| # @param right_side: Determine what side of image | ||
| # @param pic_offset: Offset from edge of photo | ||
| def vectorized_road_coord(self, img, right_side, pic_offset=5): | ||
| h, w = img.shape[:2] | ||
| height, width = h - 1 - pic_offset, w - 1 - pic_offset | ||
|
|
||
| w_start = width if right_side else pic_offset | ||
| w_end = width // 2 | ||
| h_start = height | ||
| h_end = pic_offset | ||
|
|
||
| if img[h_start, w_start] != 0: | ||
| column = img[h_end:h_start + 1, w_start] | ||
| inv = np.logical_not(column[::-1]) | ||
| idx = np.argmax(inv) | ||
| return (w_start, h_start - idx) | ||
| else: | ||
| if right_side: | ||
| row = img[h_start, w_end:w_start + 1][::-1] | ||
| idx = np.argmax(row) | ||
| return (w_start - idx, h_start) | ||
| else: | ||
| row = img[h_start, w_start:w_end + 1] | ||
| idx = np.argmax(row) | ||
| return (w_start + idx, h_start) | ||
|
|
||
| # Give a previous pixel, look at at "pixel_range" number of | ||
| # pixel neighbors from previous pixel & determine if road exists | ||
| # | ||
| # param right_side: Bool determine if on right or left side of image | ||
| # param pixel_range: Number of neighbor pixels to check | ||
| # param pic_offset: Offset from edge of image | ||
| # ret: Pixel road coords otherwise None | ||
| def lookup_road_coord(self, img, prev_pixel, right_side, pixel_range=6, pic_offset=5): | ||
| h, w = img.shape[:2] | ||
| height, width = h - 1 - pic_offset, w - 1 - pic_offset | ||
| prev_width, prev_height, = prev_pixel[:2] | ||
|
|
||
| # Determine starting & ending pixels | ||
| if right_side: | ||
| h_start = max(pic_offset, prev_height - pixel_range) | ||
| h_end = min(height, prev_height + pixel_range) | ||
| h_step = 1 | ||
| w_start = min(width, prev_width + pixel_range) | ||
| w_end = max(width // 2, prev_width - pixel_range) | ||
| w_step = -1 | ||
| else: | ||
| h_start = max(pic_offset, prev_height - pixel_range) | ||
| h_end = min(height, prev_height + pixel_range) | ||
| h_step = 1 | ||
| w_start = max(pic_offset, prev_width - pixel_range) | ||
| w_end = min(width // 2, prev_width + pixel_range) | ||
| w_step = 1 | ||
|
|
||
| if (prev_height == height and prev_width == width): | ||
| if not img[height, w_end].any() and not img[h_start, width].any(): | ||
| return None | ||
|
|
||
|
|
||
| if (prev_width == width): | ||
| if img[h_start, width].any(): | ||
| return None | ||
| elif not img[h_end, width].any(): | ||
| return None | ||
| else: | ||
| for i in range(h_start, h_end + h_step, h_step): | ||
| if img[i, width].any(): | ||
| return (width, i) | ||
| elif (prev_height == height): | ||
| if img[height, w_start].any(): | ||
| return None | ||
| elif not img[height, w_end].any(): | ||
| return None | ||
| else: | ||
| for i in range(w_start, w_end + w_step, w_step): | ||
| if img[height, i].any(): | ||
| return (i, height) | ||
|
|
||
| return None | ||
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,7 @@ | ||
| import sys | ||
| import os | ||
| sys.path.insert(0, os.path.dirname(__file__)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not the ideal way of fixing the import issue. Take a look at pathfinder, it solves the same problem properly.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The issue with this is that it causes issues with colcon and ament. They move the files and expect a certain form, so this could cause unexpected errors later |
||
|
|
||
| import traceback | ||
|
|
||
| from cv_bridge import CvBridge | ||
|
|
@@ -10,9 +14,7 @@ | |
| from sensor_msgs.msg import Image | ||
| from std_msgs.msg import Float32MultiArray | ||
|
|
||
| from autonomous_kart.nodes.opencv_pathfinder.angle_calculator import ( | ||
| calculate_track_angles, | ||
| ) | ||
| from angle import AngleFinder | ||
|
|
||
|
|
||
| class OpenCVPathfinderNode(Node): | ||
|
|
@@ -51,6 +53,7 @@ def __init__(self): | |
| ) | ||
|
|
||
| self.logger.info("Pathfinder Node started - subscribed to /camera/image_raw") | ||
| self.angle_finder = AngleFinder(self) | ||
|
|
||
| def image_callback(self, msg): | ||
| frame = self.bridge.imgmsg_to_cv2(msg, "passthrough") | ||
|
|
@@ -70,13 +73,13 @@ def image_callback(self, msg): | |
| self.frames_since_last_log = 0 | ||
|
|
||
| # Publish angles | ||
| angles = calculate_track_angles(frame) | ||
| if not self.angle_msg: | ||
| self.angle_msg = Float32MultiArray(data=angles) | ||
| else: | ||
| # self.angle_pub.publish(Float32MultiArray(data=angles)) | ||
| self.angle_msg.data = angles | ||
| self.angle_pub.publish(self.angle_msg) | ||
| right_angle, left_angle = self.angle_finder.get_img_angles(frame, percent=0.0) | ||
| msg = Float32MultiArray() | ||
| msg.data = [ | ||
| float(right_angle) if right_angle is not None else float('nan'), | ||
| float(left_angle) if left_angle is not None else float('nan'), | ||
| ] | ||
| self.angle_pub.publish(msg) | ||
|
|
||
|
|
||
| def main(args=None): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,93 @@ | ||
| import cv2 as cv | ||
| import numpy as np | ||
| import math | ||
|
|
||
| # Angle calculation | ||
| def get_angle(zero_coord, middle_coord, road_coord): | ||
| a = get_distance(road_coord, zero_coord) | ||
| b = get_distance(zero_coord, middle_coord) | ||
| c = get_distance(middle_coord, road_coord) | ||
|
|
||
| numerator = a*a - c*c - b*b | ||
| denominator = -2 * c * b | ||
|
|
||
| if denominator == 0: | ||
| return None | ||
|
|
||
| rads = math.acos( numerator / denominator) | ||
| return math.degrees(rads) | ||
|
|
||
| # Distance calculation | ||
| def get_distance(point1, point2): | ||
| dx = point1[0] - point2[0] | ||
| dy = point1[1] - point2[1] | ||
| return math.sqrt(dx*dx + dy *dy) | ||
|
|
||
| # @param r_coord: Right road coord | ||
| # @param l_coord: Left road coord | ||
| # @ret: Return img with lines | ||
| def draw_lines(img, r_coord, l_coord, middle=None): | ||
| if not middle: | ||
| middle = img.shape[1] | ||
| middle: int = middle // 2 | ||
|
|
||
|
|
||
| cv.line(img, (middle, 0), r_coord, (200, 0, 200), 3) | ||
| cv.line(img, (middle, 0), l_coord, (200, 0, 200), 3) | ||
| return img | ||
|
|
||
|
|
||
| # @param: Image | ||
| # @ret: Adaptive Gaussain threshold | ||
| def get_threshold(img): | ||
| return cv.adaptiveThreshold(img,255,cv.ADAPTIVE_THRESH_GAUSSIAN_C, cv.THRESH_BINARY,11,2) | ||
|
|
||
|
|
||
| # @param: Threshold | ||
| # @ret: Traditional contours | ||
| def get_contours(thresh): | ||
| contours, im = cv.findContours(thresh, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE) | ||
|
|
||
| return contours | ||
|
|
||
| # @param: Photo matrix | ||
| # @ret: HSV representation | ||
| def convert_bgr_to_hsv(img: np.ndarray): | ||
| return cv.cvtColor(img, cv.COLOR_BGR2HSV) | ||
|
|
||
| # @param: Photo matrix | ||
| # @ret: Grayscale representation | ||
| def convert_bgr_to_greyscale(img: np.ndarray): | ||
| return cv.cvtColor(img, cv.COLOR_BGR2GRAY) | ||
|
|
||
| # @param: Takes a str file path | ||
| # @ret: Return a video | ||
| def get_video(path: str): | ||
| return cv.VideoCapture(path) | ||
|
|
||
| # @param: Takes a str file path | ||
| # @ret: Photo as a matrix | ||
| def get_image(path: str): | ||
| return cv.imread(cv.samples.findFile(path), cv.IMREAD_COLOR) | ||
|
|
||
| # Logic for closing image window | ||
| def display_img(img): | ||
| cv.imshow("Window", img) | ||
|
|
||
| while True: | ||
| if (cv.waitKey(1) == 13 or cv.getWindowProperty("Window", cv.WND_PROP_VISIBLE) < 1): | ||
| cv.destroyAllWindows() | ||
| return | ||
|
|
||
| # @param: Image | ||
| # @ret: Gaussian image | ||
| def get_gaussian_img(img): | ||
| h, w = img.shape[:2] | ||
| gradient = np.linspace(0, 1, h).reshape(h, 1) | ||
| gradient = np.repeat(gradient, w, axis=1) | ||
| gradient = cv.merge([gradient]*3).astype(np.float32) | ||
|
|
||
| img_float = img.astype(np.float32) | ||
| img_grad = img_float * gradient | ||
| img_grad = np.clip(img_grad, 0, 255).astype(np.uint8) | ||
| return img_grad |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will crash. Nodes don't have .error or .info. Look into how its done in other nodes