diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a1865ae --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +week11/calibration/__pycache__/intrinsic_calibrator.cpython-310.pyc +week11/calibration/__pycache__/stereo_calibrator.cpython-310.pyc diff --git a/calibration.md b/calibration.md new file mode 100644 index 0000000..38bc438 --- /dev/null +++ b/calibration.md @@ -0,0 +1,44 @@ +# Calibration + +## Intrinsic Calibration + +❗️**LEFT: 1, RIGHT: 0**❗️ + +왼쪽 카메라 Intrinsic Calibration +```bash +python3 intrinsic_capture.py --cam_id 1 # Left +``` +![](./week11/calibration/asset/Intrinsic.png) + +`C`를 누르면 캡쳐가 되고, `Q`를 누르면 종료 및 캘리브레이션 실행 +다양한 거리, 각도로 캡쳐하여 최소 20장의 사진을 취득하세요. + +완료 되었으면, 반대쪽도 마찬가지로 캘리브레이션을 수행하세요. +```bash +python3 intrinsic_capture.py --cam_id 0 # Right +``` + +## Extrinsic Calibration + +아래 명령어를 실행하여 extrinsic calibration을 완료하세요. +```bash +python3 stereo_capture.py +``` +![](./week11/calibration/asset/Extrinsic.png) + + +## Rectification + +캘리브레이션이 성공적으로 완료되었는지 확인하세요. +아래 명령어로, rectify된 이미지를 실시간으로 확인할 수 있습니다. +```bash +python3 stereo_preview.py +``` + +아래 명령어로 이미 저장된 이미지를 rectify할 수 있습니다. +파일을 열어 이미지가 있는 경로를 수정하세요. + +```bash +python3 rectify_images.py +``` +![](./week11/calibration/asset/Rectify.png) \ No newline at end of file diff --git a/week11/calibration/asset/Extrinsic.png b/week11/calibration/asset/Extrinsic.png new file mode 100644 index 0000000..a193114 Binary files /dev/null and b/week11/calibration/asset/Extrinsic.png differ diff --git a/week11/calibration/asset/Intrinsic.png b/week11/calibration/asset/Intrinsic.png new file mode 100644 index 0000000..b3a2b5c Binary files /dev/null and b/week11/calibration/asset/Intrinsic.png differ diff --git a/week11/calibration/asset/Rectify.png b/week11/calibration/asset/Rectify.png new file mode 100644 index 0000000..c4d9b2b Binary files /dev/null and b/week11/calibration/asset/Rectify.png differ diff --git a/week11/calibration/intrinsic_calibrator.py b/week11/calibration/intrinsic_calibrator.py new file mode 100644 index 0000000..7f1f03e --- /dev/null +++ b/week11/calibration/intrinsic_calibrator.py @@ -0,0 +1,344 @@ +# Filename: intrinsic_calibrator.py +# Description: Intrinsic Calibration from captured single camera images, saves result to YAML. + +import sys +import os +import glob +import argparse +import numpy as np +import cv2 +import yaml +from pathlib import Path + +class IntrinsicCalibrator: + """ + 단일 카메라의 intrinsic calibration을 수행하고 결과를 관리하는 클래스. + """ + def __init__(self, image_dir: str, grid_x: int, grid_y: int, grid_size: float, camera_id: int): + """ + IntrinsicCalibrator 클래스를 초기화합니다. + + Args: + image_dir (str): 캘리브레이션 이미지가 저장된 디렉터리 경로. + grid_x (int): 체커보드 내부 코너의 x 방향 개수. + grid_y (int): 체커보드 내부 코너의 y 방향 개수. + grid_size (float): 한 체커보드 정사각형의 실제 크기 (단위: 사용자 정의). + camera_id (int): 이 캘리브레이션이 속한 카메라의 ID. + """ + if not os.path.isdir(image_dir): + raise FileNotFoundError(f"Image directory not found: {image_dir}") + + self.image_dir = image_dir + self.grid_x = grid_x + self.grid_y = grid_y + self.grid_size = grid_size + self.camera_id = camera_id + + # 캘리브레이션 결과 저장 변수 + self.camera_matrix = None + self.distortion_coefficients = None + self.image_size = None + self.num_images_used = 0 + self.reprojection_error = None + + def detect_corners(self, use_sb_alg=True): + """ + 초기화된 이미지 디렉터리에서 체커보드 코너를 검출합니다. + + Args: + use_sb_alg (bool): findChessboardCornersSB 알고리즘 사용 여부. + + Returns: + tuple: (objpoints, imgpoints, imgsize) + objpoints: 각 이미지에 대한 3D 월드 좌표 리스트. + imgpoints: 각 이미지에 대한 검출된 2D 이미지 좌표 리스트. + imgsize: 첫 번째 이미지 기준 해상도 (width, height). + """ + objp = np.zeros((self.grid_x * self.grid_y, 3), np.float32) + objp[:, :2] = np.mgrid[0:self.grid_x, 0:self.grid_y].T.reshape(-1, 2) * self.grid_size + + objpoints = [] + imgpoints = [] + + images = sorted(glob.glob(os.path.join(self.image_dir, "*.png"))) + if not images: + images = sorted(glob.glob(os.path.join(self.image_dir, "*.jpg"))) + + if not images: + print(f"Warning: No images found in directory: {self.image_dir}") + return [], [], (0, 0) + + imgsize = (0, 0) + print(f"Found {len(images)} images in {self.image_dir}. Processing...") + + for fname in images: + # print(f"Processing {os.path.basename(fname)} ...", end=" ") + img = cv2.imread(fname) + if img is None: + # print("Failed to load") # 너무 많이 출력될 수 있으므로 주석 처리 + continue + + if imgsize == (0, 0): + imgsize = (img.shape[1], img.shape[0]) + print(f"\nDetected image size: {imgsize}") + elif (img.shape[1], img.shape[0]) != imgsize: + # print(f"Warning: Image size mismatch for {os.path.basename(fname)}. Expected {imgsize}, skipping.") + continue # 크기 불일치 이미지 건너뛰기 + + + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + + # 체커보드 코너 검출 + if use_sb_alg: + ret, corners = cv2.findChessboardCornersSB(gray, (self.grid_x, self.grid_y)) + else: + flags = cv2.CALIB_CB_ADAPTIVE_THRESH | cv2.CALIB_CB_NORMALIZE_IMAGE + ret, corners = cv2.findChessboardCorners(gray, (self.grid_x, self.grid_y), flags) + if ret: + criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 300, 1e-6) + corners = cv2.cornerSubPix(gray, corners, (11, 11), (-1, -1), criteria) + + if ret: + # print("Success") + objpoints.append(objp) + imgpoints.append(corners) + # else: + # print("Failed") # 너무 많이 출력될 수 있으므로 주석 처리 + + print(f"Finished corner detection. Found corners in {len(objpoints)} images.") + return objpoints, imgpoints, imgsize + + def calibrate_camera(self, objpoints, imgpoints, imgsize): + """ + 수집된 데이터로 단일 카메라 intrinsic calibration을 수행합니다. + + Args: + objpoints: 3D 월드 좌표 리스트. + imgpoints: 2D 이미지 좌표 리스트. + imgsize: 이미지 해상도 (width, height). + + Returns: + tuple: (mtx, dist, mean_error) + mtx: 카메라 매트릭스 (intrinsic matrix). + dist: 왜곡 계수. + mean_error: 평균 reprojection error. + """ + if len(objpoints) == 0: + print("Error: No points to calibrate.") + return None, None, None + + print("Performing camera calibration...") + flags = cv2.CALIB_RATIONAL_MODEL if len(imgpoints) > 0 and len(imgpoints[0]) is not None and len(imgpoints[0][0]) > 5 else cv2.CALIB_TILTED_MODEL + # Add CALIB_ZERO_TANGENT_DIST, CALIB_FIX_PRINCIPAL_POINT if applicable + # flags |= cv2.CALIB_ZERO_TANGENT_DIST # 접선 왜곡 무시 + # flags |= cv2.CALIB_FIX_PRINCIPAL_POINT # 주점 고정 (보통 이미지 중앙) + + + ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(objpoints, imgpoints, imgsize, None, None, flags=flags, criteria=(cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 300, 1e-6)) + + if not ret: + raise RuntimeError("Calibration failed.") + + # 캘리브레이션 결과의 평균 reprojection error 계산 + mean_error = 0 + for i in range(len(objpoints)): + # None 체크 추가: rvecs 또는 tvecs가 None일 수 있는 경우 대비 + if rvecs is not None and tvecs is not None and i < len(rvecs) and i < len(tvecs): + imgpoints2, _ = cv2.projectPoints(objpoints[i], rvecs[i], tvecs[i], mtx, dist) + # imgpoints[i]가 None이 아니고 imgpoints2도 None이 아닌 경우에만 norm 계산 + if imgpoints[i] is not None and imgpoints2 is not None and len(imgpoints[i]) > 0: + error = cv2.norm(imgpoints[i], imgpoints2, cv2.NORM_L2) / len(imgpoints[i]) + mean_error += error + # else: + # print(f"Warning: Skipping error calculation for image {i} due to empty points or projection failure.") + # else: + # print(f"Warning: Skipping error calculation for image {i} due to missing rvec/tvec.") + + + if len(objpoints) > 0: + mean_error /= len(objpoints) + else: + mean_error = float('inf') # 점이 없으면 에러 무한대 + + print(f"\nAverage reprojection error: {mean_error}") + if mean_error > 1.0: + print("Warning: High reprojection error. Calibration results may not be accurate.") + + + return mtx, dist, mean_error + + def calibrate(self, use_sb_alg=True): + """ + 코너 검출부터 캘리브레이션 계산까지 전체 과정을 실행하고 결과를 클래스 내부에 저장합니다. + """ + print(f"--- Starting Intrinsic Calibration for camera {self.camera_id} from images in: {self.image_dir} ---") + objpoints, imgpoints, imgsize = self.detect_corners(use_sb_alg) + + if len(objpoints) == 0: + print("Error: No valid chessboard corners were detected in any image. Calibration skipped.") + self.camera_matrix = None + self.distortion_coefficients = None + self.image_size = imgsize # 이미지 사이즈는 찾았을 수 있으므로 저장 + self.num_images_used = 0 + self.reprojection_error = None + return # 캘리브레이션 수행 불가 + + mtx, dist, mean_error = self.calibrate_camera(objpoints, imgpoints, imgsize) + + if mtx is not None: + self.camera_matrix = mtx + self.distortion_coefficients = dist + self.image_size = imgsize + self.num_images_used = len(objpoints) + self.reprojection_error = mean_error + print("\nCalibration results stored in the object.") + else: + print("\nCalibration calculation failed. Results not stored.") + self.camera_matrix = None + self.distortion_coefficients = None + # 이미지 사이즈 등은 detect_corners에서 가져온 값 유지 + self.num_images_used = 0 + self.reprojection_error = None + + + def save_to_yaml(self, output_file: str = "intrinsic_calibration.yaml"): + """ + 클래스 내부에 저장된 intrinsic calibration 결과를 YAML 파일로 저장합니다. + """ + if self.camera_matrix is None or self.distortion_coefficients is None: + print("Error: No valid calibration data to save. Run calibrate() first.") + return False + + def convert_numpy(obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + return obj + + calib_data = { + "camera_id": self.camera_id, + "camera_matrix": convert_numpy(self.camera_matrix), + "distortion_coefficients": convert_numpy(self.distortion_coefficients), + "image_width": self.image_size[0], + "image_height": self.image_size[1], + "pattern_size_width": self.grid_x, + "pattern_size_height": self.grid_y, + "square_size_meters": self.grid_size, # 실제 단위는 사용자에 따라 다름 + "num_images_used": self.num_images_used, + "reprojection_error": self.reprojection_error, + } + + output_path = Path(output_file) + output_path.parent.mkdir(parents=True, exist_ok=True) + try: + with open(output_path, 'w') as outfile: + yaml.dump(calib_data, outfile, default_flow_style=False, indent=4, sort_keys=False) + print(f"Intrinsic calibration results saved successfully to {output_path}") + return True + except Exception as e: + print(f"Error saving intrinsic calibration data to {output_path}: {e}") + return False + + @classmethod + def load_from_yaml(cls, yaml_file: str): + """ + YAML 파일에서 intrinsic calibration 결과를 불러와 IntrinsicCalibrator 객체를 생성합니다. + (로드된 데이터만 포함하며, 이미지 디렉터리 정보 등은 포함되지 않음) + """ + yaml_path = Path(yaml_file) + if not yaml_path.exists(): + print(f"Error: Intrinsic calibration file not found at {yaml_file}") + return None + + try: + with open(yaml_path, 'r') as f: + calib_data = yaml.safe_load(f) + + camera_id = calib_data.get('camera_id') + camera_matrix = np.array(calib_data.get('camera_matrix')) + distortion_coefficients = np.array(calib_data.get('distortion_coefficients')) + image_width = calib_data.get('image_width') + image_height = calib_data.get('image_height') + grid_x = calib_data.get('pattern_size_width') + grid_y = calib_data.get('pattern_size_height') + grid_size = calib_data.get('square_size_meters') + num_images_used = calib_data.get('num_images_used') + reprojection_error = calib_data.get('reprojection_error') + + + if camera_matrix is None or distortion_coefficients is None or image_width is None or image_height is None: + print(f"Error: Missing essential data in intrinsic calibration file {yaml_file}") + return None + + if distortion_coefficients.ndim == 1: + distortion_coefficients = distortion_coefficients.reshape(1, -1) + + print(f"Intrinsic calibration data loaded successfully from {yaml_file}.") + + # IntrinsicCalibrator 객체를 생성하고 로드된 데이터를 채웁니다. + # 이미지 디렉터리는 이 시점에서는 알 수 없으므로 None 또는 빈 문자열로 설정합니다. + loaded_calibrator = cls( + image_dir="", # 이미지 디렉터리 정보는 파일에 저장되지 않으므로 빈 값 + grid_x=grid_x if grid_x is not None else 0, # 저장되지 않았다면 기본값 또는 0 + grid_y=grid_y if grid_y is not None else 0, + grid_size=grid_size if grid_size is not None else 0.0, + camera_id=camera_id if camera_id is not None else -1 # 저장되지 않았다면 -1 + ) + loaded_calibrator.camera_matrix = camera_matrix + loaded_calibrator.distortion_coefficients = distortion_coefficients + loaded_calibrator.image_size = (image_width, image_height) + loaded_calibrator.num_images_used = num_images_used if num_images_used is not None else 0 + loaded_calibrator.reprojection_error = reprojection_error if reprojection_error is not None else float('inf') + + + return loaded_calibrator + + except Exception as e: + print(f"Error loading intrinsic calibration data from {yaml_file}: {e}") + print("This error likely means the YAML file is not in the format saved by this script.") + return None + + +def main(): + """ + 스크립트 단독 실행 시 Intrinsic Calibration을 수행하는 main 함수. + """ + parser = argparse.ArgumentParser(description="Intrinsic Calibration from Captured Images (YAML Output)") + parser.add_argument('--image_dir', type=str, required=True, + help="Directory containing captured chessboard images") + parser.add_argument('--camera_id', type=int, default=0, + help="ID of the camera these images belong to (saved in YAML)") + parser.add_argument('--grid_x', type=int, default=10, + help="Number of internal corners in x dimension") + parser.add_argument('--grid_y', type=int, default=7, + help="Number of internal corners in y dimension") + parser.add_argument('--grid_size', type=float, default=0.025, + help="Size of one square on the chessboard (in real-world units)") + parser.add_argument('--use_sb_alg', action='store_true', + help="Use the findChessboardCornersSB algorithm") + parser.add_argument('--output_yaml', type=str, default="intrinsic_calibration.yaml", + help="Output YAML file to save calibration results") + args = parser.parse_args() + + # Check if image directory exists + if not os.path.isdir(args.image_dir): + print(f"Error: Image directory not found at {args.image_dir}") + sys.exit(1) + + # IntrinsicCalibrator 객체 생성 및 캘리브레이션 실행 + calibrator = IntrinsicCalibrator( + image_dir=args.image_dir, + grid_x=args.grid_x, + grid_y=args.grid_y, + grid_size=args.grid_size, + camera_id=args.camera_id # camera_id 전달 + ) + + calibrator.calibrate(args.use_sb_alg) # 캘리브레이션 수행 + + # 결과 저장 + if calibrator.camera_matrix is not None: # 캘리브레이션 성공 시 + calibrator.save_to_yaml(args.output_yaml) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/week11/calibration/intrinsic_capture.py b/week11/calibration/intrinsic_capture.py new file mode 100644 index 0000000..221ae0b --- /dev/null +++ b/week11/calibration/intrinsic_capture.py @@ -0,0 +1,403 @@ +# Filename: intrinsic_capture.py +# Description: Capture images for intrinsic calibration from a single camera and auto-calibrate. + +import sys +import os +import time +import argparse +from datetime import datetime + +import cv2 +import numpy as np + +import gi +gi.require_version('Gst', '1.0') +gi.require_version('GstApp', '1.0') +from gi.repository import Gst, GLib +from pathlib import Path + +# GStreamer 초기화는 main에서 수행합니다. + +# intrinsic_calibrator.py 스크립트에서 IntrinsicCalibrator 클래스를 가져옵니다. +# 이 스크립트와 intrinsic_calibrator.py는 같은 디렉터리에 있어야 합니다. +try: + from intrinsic_calibrator import IntrinsicCalibrator +except ImportError: + print("Error: Could not import IntrinsicCalibrator from intrinsic_calibrator.py.") + print("Please ensure intrinsic_calibrator.py is in the same directory.") + IntrinsicCalibrator = None # 임포트 실패 시 클래스를 None으로 설정 + + +class IntrinsicCapturer: + """ + 단일 카메라에서 intrinsic calibration용 이미지를 캡쳐하고 저장하는 클래스. + """ + def __init__(self, camera_id: int, cam_mode: int, hflip: bool, vflip: bool, + width: int, height: int, fps: int, output_dir: str, + pattern_width: int, pattern_height: int, grid_size: float): + """ + IntrinsicCapturer 클래스를 초기화합니다. + + Args: + camera_id (int): 사용할 카메라의 sensor-id. + cam_mode (int): GStreamer 카메라 센서 모드. + hflip (bool): 수평 반전 적용 여부. + vflip (bool): 수직 반전 적용 여부. + width (int): 이미지/파이프라인 해상도 너비. + height (int): 이미지/파이프라인 해상도 높이. + fps (int): 파이프라인 프레임 레이트. + output_dir (str): 캡쳐 이미지 저장 디렉터리 경로. + pattern_width (int): 체커보드 내부 코너 x 개수 (미리보기 표시용). + pattern_height (int): 체커보드 내부 코너 y 개수 (미리보기 표시용). + grid_size (float): 체커보드 한 칸의 실제 크기 (캘리브레이터 전달용). + """ + self.camera_id = camera_id + self.cam_mode = cam_mode + self.hflip = hflip + self.vflip = vflip + self.width = width + self.height = height + self.fps = fps + self.output_dir = output_dir + self.pattern_size = (pattern_width, pattern_height) + self.grid_size = grid_size # 캘리브레이터에 전달할 값 + self.img_size = (width, height) + + self.pipeline = None + self.appsink = None + + def _link_elements(self, *elements): + """여러 GStreamer 요소를 순차적으로 연결하는 헬퍼 함수.""" + for i in range(len(elements) - 1): + if not elements[i].link(elements[i+1]): + print(f"Failed to link {elements[i].name} to {elements[i+1].name}") + return False + return True + + def build_pipeline(self): + """GStreamer 파이프라인을 생성하고 앱싱크를 반환합니다.""" + # 1) DeepStream 전용 nvvideoconvert 유무 검사 + if Gst.ElementFactory.find('nvvideoconvert'): + convert_plugin = 'nvvideoconvert' + deepstream_available = True + elif Gst.ElementFactory.find('nvvidconv'): + convert_plugin = 'nvvidconv' + deepstream_available = False + else: + print("Error: Neither 'nvvideoconvert' nor 'nvvidconv' plugins found! " + "Please install DeepStream or Jetson GStreamer extensions.") + return False + + print(f"🔌 Video converter plugin: {convert_plugin} " + f"(DeepStream available: {deepstream_available})") + + try: + # GStreamer 요소 생성 + pipeline_name = f"pipeline_cam{self.camera_id}_capture_intrinsic" + self.pipeline = Gst.Pipeline.new(pipeline_name) + + src = Gst.ElementFactory.make('nvarguscamerasrc', f'source_{self.camera_id}') + queue1 = Gst.ElementFactory.make('queue', f'queue1_{self.camera_id}') + caps_filter = Gst.ElementFactory.make('capsfilter', f'caps_filter_{self.camera_id}') + queue2 = Gst.ElementFactory.make('queue', f'queue2_{self.camera_id}') + # 여기서 검사한 플러그인 이름을 사용 + video_convert = Gst.ElementFactory.make(convert_plugin, f'video_convert_{self.camera_id}') + queue3 = Gst.ElementFactory.make('queue', f'queue3_{self.camera_id}') + caps_filter2 = Gst.ElementFactory.make('capsfilter', f'caps_filter2_{self.camera_id}') + queue4 = Gst.ElementFactory.make('queue', f'queue4_{self.camera_id}') + self.appsink = Gst.ElementFactory.make('appsink', f'appsink_{self.camera_id}') + + + # src 설정 + Gst.util_set_object_arg(src, "sensor-id", f"{self.camera_id}") + Gst.util_set_object_arg(src, "bufapi-version", "true") + Gst.util_set_object_arg(src, "sensor-mode", f"{self.cam_mode}") + + # 첫번째 capsfilter: NVMM 메모리, 해상도, FPS 설정 + caps_str = f"video/x-raw(memory:NVMM), width=(int){self.width}, height=(int){self.height}, framerate=(fraction){self.fps}/1" + Gst.util_set_object_arg(caps_filter, "caps", caps_str) + + # nvvideoconvert: flip 옵션 설정 + if self.hflip and self.vflip: + Gst.util_set_object_arg(video_convert, "flip-method", "2") # 180도 회전 + elif self.hflip: + Gst.util_set_object_arg(video_convert, "flip-method", "4") # 수평 반전 + elif self.vflip: + Gst.util_set_object_arg(video_convert, "flip-method", "6") # 수직 반전 + else: + Gst.util_set_object_arg(video_convert, "flip-method", "0") # no flip + + # 두번째 capsfilter: OpenCV와 호환되는 포맷 (BGRx) + caps_str2 = "video/x-raw, format=(string)BGRx" + Gst.util_set_object_arg(caps_filter2, "caps", caps_str2) + + # appsink 설정: 항상 최신 프레임만 보관 (max-buffers=1, drop=True) + self.appsink.set_property("emit-signals", False) + self.appsink.set_property("max-buffers", 1) + self.appsink.set_property("drop", True) + # appsink의 caps를 설정하여 원하는 최종 포맷을 보장 (필수) + appsink_caps = Gst.Caps.from_string(f"video/x-raw, format=(string)BGRx, width=(int){self.width}, height=(int){self.height}") + self.appsink.set_property("caps", appsink_caps) + + + # 요소들을 파이프라인에 추가 + self.pipeline.add(src) + self.pipeline.add(queue1) + self.pipeline.add(caps_filter) + self.pipeline.add(queue2) + self.pipeline.add(video_convert) + self.pipeline.add(queue3) + self.pipeline.add(caps_filter2) + self.pipeline.add(queue4) + self.pipeline.add(self.appsink) + + + # 요소들을 순차적으로 연결 + if not self._link_elements(src, queue1, caps_filter, queue2, video_convert, queue3, caps_filter2, queue4, self.appsink): + print(f"GStreamer 요소들을 연결하지 못했습니다 (카메라 ID: {self.camera_id}).") + self.pipeline.set_state(Gst.State.NULL) + self.pipeline = None + self.appsink = None + return False # 파이프라인 빌드 실패 + + print(f"GStreamer pipeline built for camera {self.camera_id}.") + return True # 파이프라인 빌드 성공 + except Exception as e: + print(f"파이프라인 생성 중 오류 발생 (카메라 ID: {self.camera_id}): {e}") + self.pipeline = None + self.appsink = None + return False # 파이프라인 빌드 실패 + + + def start_pipeline(self): + """GStreamer 파이프라인을 PLAYING 상태로 시작합니다.""" + if self.pipeline is None: + print("Error: Pipeline not built. Cannot start.") + return False + print(f"Setting pipeline {self.pipeline.get_name()} to PLAYING state...") + ret = self.pipeline.set_state(Gst.State.PLAYING) + if ret == Gst.StateChangeReturn.FAILURE: + print(f"Error: Failed to set pipeline {self.pipeline.get_name()} to PLAYING state.") + return False + # PLAYING 상태로 전환될 때까지 약간 대기 + time.sleep(1.0) + print(f"Pipeline {self.pipeline.get_name()} is PLAYING.") + return True + + def stop_pipeline(self): + """GStreamer 파이프라인을 NULL 상태로 중지하고 해제합니다.""" + if self.pipeline: + print(f"Setting pipeline {self.pipeline.get_name()} to NULL state...") + self.pipeline.set_state(Gst.State.NULL) + self.pipeline = None + self.appsink = None + print(f"Pipeline set to NULL state.") + else: + print("Pipeline is already stopped or not built.") + + + def run_capture_loop(self): + """이미지 캡쳐 및 미리보기 루프를 실행합니다.""" + if self.appsink is None: + print("Error: Appsink not available. Cannot run capture loop.") + return 0 + + window_name = "Intrinsic Capture Preview (Press 'c' to Save, 'q' or ESC to Quit)" + cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) + cv2.resizeWindow(window_name, self.img_size[0], self.img_size[1]) # 단일 이미지 보여줌 + + captured_count = 0 + instructions = "Press 'c' to capture, 'q' or ESC to quit." + print("\n--- Starting Intrinsic Capture Loop ---") + print(instructions) + + try: + while True: + sample = self.appsink.emit("pull-sample") + if sample is None: + time.sleep(0.001) + continue + + buffer = sample.get_buffer() + success, map_info = buffer.map(Gst.MapFlags.READ) + if not success: + print("Error mapping buffer") + buffer.unmap(map_info) + continue + + try: + # BGRx 포맷에서 BGR로 변환 (원본 프레임) + frame_bgr_raw = np.frombuffer(map_info.data, dtype=np.uint8).reshape((self.img_size[1], self.img_size[0], 4)) + frame_bgr_raw = cv2.cvtColor(frame_bgr_raw, cv2.COLOR_BGRA2BGR) + except Exception as e: + print("Frame processing error:", e) + buffer.unmap(map_info) + continue + buffer.unmap(map_info) + + # --- 미리보기 화면 구성 --- + frame_display = frame_bgr_raw.copy() + + # 체커보드 검출 (미리보기 표시용 - 원본 이미지에서 빠르게 시도) + gray_raw = cv2.cvtColor(frame_bgr_raw, cv2.COLOR_BGR2GRAY) + flags_find_corners = cv2.CALIB_CB_ADAPTIVE_THRESH | cv2.CALIB_CB_NORMALIZE_IMAGE | cv2.CALIB_CB_FAST_CHECK + ret_preview, corners_preview = cv2.findChessboardCorners(gray_raw, self.pattern_size, flags_find_corners) + + if ret_preview: + cv2.drawChessboardCorners(frame_display, self.pattern_size, corners_preview, ret_preview) + + # 코너 찾기 상태 텍스트 오버레이 (미리보기용) + corner_status_text = "Chessboard: FOUND" if ret_preview else "Chessboard: NOT FOUND" + corner_status_color = (0, 255, 0) if ret_preview else (0, 0, 255) + + # 텍스트 오버레이 + cv2.putText(frame_display, instructions, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2) + cv2.putText(frame_display, f"Captured: {captured_count}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2) + cv2.putText(frame_display, corner_status_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, corner_status_color, 2) + + cv2.imshow(window_name, frame_display) + + key = cv2.waitKey(1) & 0xFF + + if key == ord('c'): + print(f"Capturing frame {captured_count+1}...") + timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] + filename = os.path.join(self.output_dir, f"frame_{timestamp_str}.png") + + try: + cv2.imwrite(filename, frame_bgr_raw) # <-- 원본 이미지 저장 + captured_count += 1 + print(f"Saved frame to {filename}. Total captured: {captured_count}") + except Exception as e: + print(f"Error saving image: {e}") + + elif key == ord('q') or key == 27: # 'q' 또는 ESC + print("Quit signal received. Exiting capture loop.") + break + + except Exception as e: + print(f"An error occurred during the capture loop: {e}") + import traceback + traceback.print_exc() + + finally: + cv2.destroyAllWindows() + print("Capture loop finished.") + + return captured_count + + def capture_and_calibrate(self, output_yaml_file: str): + """ + 이미지 캡쳐를 실행하고, 완료 후 자동으로 캘리브레이션을 수행합니다. + """ + print(f"--- Intrinsic Capture and Calibration for Camera {self.camera_id} ---") + + # 1. 파이프라인 빌드 및 시작 + if not self.build_pipeline(): + print("Failed to build pipeline. Cannot proceed with capture or calibration.") + return + + if not self.start_pipeline(): + print("Failed to start pipeline. Cannot proceed with capture or calibration.") + self.stop_pipeline() + return + + # 2. 캡쳐 루프 실행 + captured_count = self.run_capture_loop() + + # 3. 파이프라인 중지 + self.stop_pipeline() + + # 4. 자동 캘리브레이션 실행 + if captured_count > 0: # 캡쳐된 이미지가 하나라도 있을 경우에만 캘리브레이션 시도 + print(f"\n--- Capture finished. Automatically starting Intrinsic Calibration ---") + if IntrinsicCalibrator is None: + print("Cannot perform automatic calibration: IntrinsicCalibrator class not found (check intrinsic_calibrator.py).") + return + + try: + # IntrinsicCalibrator 객체 생성 및 실행 + # 캡쳐된 이미지가 있는 디렉터리, 체커보드 정보, 카메라 ID 사용 + calibrator = IntrinsicCalibrator( + image_dir=self.output_dir, + grid_x=self.pattern_size[0], # 캘리브레이터는 grid_x/y를 사용 + grid_y=self.pattern_size[1], + grid_size=self.grid_size, + camera_id=self.camera_id + ) + calibrator.calibrate() # 캘리브레이션 수행 + + # 캘리브레이션 결과 저장 + if calibrator.camera_matrix is not None: # 캘리브레이션 성공 시 + calibrator.save_to_yaml(output_yaml_file) + else: + print("Automatic intrinsic calibration calculation failed.") + + except Exception as e: + print(f"An unexpected error occurred during automatic intrinsic calibration: {e}") + import traceback + traceback.print_exc() + else: + print("\nNo images captured. Skipping automatic intrinsic calibration.") + + +def main(): + parser = argparse.ArgumentParser(description="단일 카메라 Intrinsic Calibration 이미지 캡쳐 및 캘리브레이션") + + Gst.init(sys.argv) + + parser.add_argument('--camera_id', type=int, default=1, help="사용할 카메라 번호 (sensor-id)") + parser.add_argument('--mode', type=int, default=2, help="카메라 센서 모드 (예: 2)") + parser.add_argument('--hflip', action='store_true', help="수평 반전 활성화") + parser.add_argument('--vflip', action='store_true', help="수직 반전 활성화") + parser.add_argument('--width', type=int, default=320, help="이미지/파이프라인 해상도 너비") + parser.add_argument('--height', type=int, default=256, help="이미지/파이프라인 해상도 높이") + parser.add_argument('--fps', type=int, default=15, help="파이프라인 프레임 레이트") + + parser.add_argument('--output_dir', type=str, default="intrinsic_calib_images", help="캡쳐 이미지 저장 디렉터리 (모든 카메라 공통)") + + parser.add_argument('--output_yaml_base', type=str, default="intrinsic_param", help="Intrinsic 캘리브레이션 결과 YAML 파일 기본 이름") + + # 체커보드 패턴 인자 + parser.add_argument('--pattern_width', type=int, default=10, help="체커보드 내부 코너 수 (가로)") + parser.add_argument('--pattern_height', type=int, default=7, help="체커보드 내부 코너 수 (세로)") + parser.add_argument('--grid_size', type=float, default=0.025, help="체커보드 한 칸의 실제 크기 (미터 단위 - 캘리브레이션 전달용)") + + args = parser.parse_args() + + final_output_dir = args.output_dir + final_output_dir = Path(final_output_dir) / f'cam{args.camera_id}' + + # 카메라별 캘리브레이션 결과가 저장될 최종 YAML 파일 이름 + final_output_yaml = f"params/{args.output_yaml_base}_cam{args.camera_id}.yaml" + + # 출력 디렉터리 생성 (모든 카메라가 이 디렉토리에 이미지 저장) + os.makedirs(final_output_dir, exist_ok=True) + print(f"Capture images will be saved to: {final_output_dir}") + print(f"Calibration result for Camera {args.camera_id} will be saved to: {final_output_yaml}") + + + # IntrinsicCapturer 객체 생성 시 파싱된 args 값을 사용 + capturer = IntrinsicCapturer( + camera_id=args.camera_id, + cam_mode=args.mode, + hflip=args.hflip, + vflip=args.vflip, + width=args.width, + height=args.height, + fps=args.fps, + output_dir=final_output_dir, # 이미지는 공통 디렉토리에 저장 + pattern_width=args.pattern_width, + pattern_height=args.pattern_height, + grid_size=args.grid_size + ) + + # 캡쳐 및 자동 캘리브레이션 실행, 결과는 카메라별 고유 YAML 파일에 저장 + capturer.capture_and_calibrate(final_output_yaml) + + print(f"\n✅ Intrinsic Capture and Calibration process finished for Camera {args.camera_id}.") + + +if __name__ == "__main__": + main() + diff --git a/week11/calibration/rectify_images.py b/week11/calibration/rectify_images.py new file mode 100644 index 0000000..04e00af --- /dev/null +++ b/week11/calibration/rectify_images.py @@ -0,0 +1,338 @@ +# Filename: rectify_saved_images.py +# Description: Rectify saved stereo image pairs from multiple directories using a stereo calibration file and save the rectified images. +# (Argparse removed, configuration is set within the code) + +import sys +import os +import glob +# import argparse # Argparse 제거 +import numpy as np +import cv2 +import yaml +from pathlib import Path +# from datetime import datetime # 파일 이름 처리에 필요할 수 있음 (현재 사용 안함) + +# --- Helper Functions (Included for self-containment) --- + +# stereo_preview.py 및 stereo_calibrator.py에서 사용된 로드 함수와 동일합니다. +def load_stereo_calibration_yaml_standard(yaml_file_path: Path): + """ + 표준 YAML 형식으로 저장된 스테레오 캘리브레이션 결과를 불러옵니다. + """ + if not yaml_file_path.exists(): + print(f"Error: Calibration file not found at {yaml_file_path}") + return None + + try: + with open(yaml_file_path, 'r') as f: + print(f"Attempting to load stereo YAML from {yaml_file_path} using yaml.safe_load...") + calib_data = yaml.safe_load(f) + + # 렉티피케이션에 필요한 데이터 로드 및 numpy 배열로 변환 + cameraMatrix1 = np.array(calib_data.get('cameraMatrix1_result')) + distCoeffs1 = np.array(calib_data.get('distCoeffs1_result')) + cameraMatrix2 = np.array(calib_data.get('cameraMatrix2_result')) + distCoeffs2 = np.array(calib_data.get('distCoeffs2_result')) + R1 = np.array(calib_data.get('R1')) + R2 = np.array(calib_data.get('R2')) + P1 = np.array(calib_data.get('P1')) + P2 = np.array(calib_data.get('P2')) + + img_width = calib_data.get('image_width') + img_height = calib_data.get('image_height') + img_size_calibrated = (img_width, img_height) + + validPixROI1 = calib_data.get('validPixROI1') # 튜플 또는 List 형태로 저장되어 있다면 그대로 로드 + validPixROI2 = calib_data.get('validPixROI2') # 튜플 또는 List 형태로 저장되어 있다면 그대로 로드 + + + # Check if essential data is loaded + if cameraMatrix1 is None or distCoeffs1 is None or cameraMatrix2 is None or distCoeffs2 is None or \ + R1 is None or R2 is None or P1 is None or P2 is None or img_width is None or img_height is None: + print("Error: Missing essential calibration data (matrices, image size) in YAML file for rectification.") + missing_keys = [k for k in ['cameraMatrix1_result', 'distCoeffs1_result', 'cameraMatrix2_result', 'distCoeffs2_result', + 'R1', 'R2', 'P1', 'P2', 'image_width', 'image_height'] if calib_data.get(k) is None] + print(f"Missing keys: {missing_keys}") + + return None + + print("Standard YAML calibration data loaded successfully.") + + # 렉티피케이션 맵 생성에 필요한 모든 데이터를 반환 + loaded_data = { + 'cameraMatrix1_result': cameraMatrix1, + 'distCoeffs1_result': distCoeffs1, + 'cameraMatrix2_result': cameraMatrix2, + 'distCoeffs2_result': distCoeffs2, + 'R1': R1, + 'R2': R2, + 'P1': P1, + 'P2': P2, + 'image_size_calibrated': img_size_calibrated, # 캘리브레이션 시 사용된 이미지 크기 반환 + 'validPixROI1': validPixROI1, + 'validPixROI2': validPixROI2, + } + + return loaded_data + + except Exception as e: + print(f"Error loading calibration data from {yaml_file_path}: {e}") + print("This error likely means the YAML file is not in the standard format saved by stereo_calibrator.py.") + return None + +# stereo_preview.py에서 사용된 맵 생성 함수와 동일합니다. +def create_rectification_maps(cameraMatrix1, distCoeffs1, R1, P1, + cameraMatrix2, distCoeffs2, R2, P2, img_size_output): + """ + 주어진 캘리브레이션 파라미터와 출력 이미지 크기로 렉티피케이션 매핑 테이블을 생성합니다. + """ + print(f"Creating rectification maps for output size: {img_size_output}") + try: + map1_left, map2_left = cv2.initUndistortRectifyMap( + cameraMatrix1, distCoeffs1, R1, P1, img_size_output, cv2.CV_32FC1 + ) + map1_right, map2_right = cv2.initUndistortRectifyMap( + cameraMatrix2, distCoeffs2, R2, P2, img_size_output, cv2.CV_32FC1 + ) + print("Rectification maps created.") + return map1_left, map2_left, map1_right, map2_right + except Exception as e: + print(f"Error creating rectification maps: {e}") + return None, None, None, None + + +def derive_output_dir(input_dir: str, prefix: str = "rectified_"): + """ + 입력 디렉터리 경로로부터 출력 디렉터리 경로를 생성합니다. + (예: /path/to/images_left -> /path/to/rectified_images_left) + """ + input_path = Path(input_dir) + parent_dir = input_path.parent + dir_name = input_path.name + output_dir_name = f"{prefix}{dir_name}" + output_path = parent_dir / output_dir_name + return str(output_path) + + +def process_image_pair_directory(left_input_dir: str, right_input_dir: str, + left_output_dir: str, right_output_dir: str, + calib_data, img_size_input, img_size_output, + map1_left, map2_left, map1_right, map2_right, + crop_to_valid_roi: bool): + """ + 하나의 좌/우 이미지 디렉터리 쌍에 대해 렉티피케이션을 수행합니다. + """ + print(f"\nProcessing directory pair: {left_input_dir} and {right_input_dir}") + print(f"Saving rectified images to: {left_output_dir} and {right_output_dir}") + + # --- 출력 디렉터리 생성 --- + try: + os.makedirs(left_output_dir, exist_ok=True) + os.makedirs(right_output_dir, exist_ok=True) + except Exception as e: + print(f"Error creating output directories {left_output_dir}, {right_output_dir}: {e}") + return 0 # 처리 실패 + + # --- 이미지 파일 목록 가져오기 및 처리 --- + left_images = sorted(glob.glob(os.path.join(left_input_dir, "*.png"))) # 예시로 png만 + right_images = sorted(glob.glob(os.path.join(right_input_dir, "*.png"))) + + print(f" Found {len(left_images)} left images and {len(right_images)} right images.") + + if len(left_images) == 0 or len(left_images) != len(right_images): + print(" Error: Image counts do not match or no images found in input directories. Skipping this pair.") + if len(left_images) > 0 and len(right_images) > 0: + print(f" Left: {len(left_images)}, Right: {len(right_images)}") + return 0 # 처리 실패 + + # validPixROI 정보 로드 (자르기 옵션 사용 시 필요) + validPixROI1 = calib_data.get('validPixROI1') + validPixROI2 = calib_data.get('validPixROI2') + + processed_count = 0 + total_images_in_pair = len(left_images) + print(f" Processing {total_images_in_pair} image pairs...") + + for i in range(total_images_in_pair): + left_path = left_images[i] + right_path = right_images[i] + + # 파일명 일치 여부 간단히 확인 (timestamps) + left_filename = os.path.basename(left_path) + right_filename = os.path.basename(right_path) + # 'left_' 또는 'right_' 접두사를 제거하고 나머지 파일 이름이 동일한지 확인 + left_base_name = left_filename.split('_', 1)[-1] if '_' in left_filename else left_filename + right_base_name = right_filename.split('_', 1)[-1] if '_' in right_filename else right_filename + + if left_base_name != right_base_name: + print(f" Warning: Filenames do not match for pair {i+1}: {left_filename} vs {right_filename}. Skipping pair.") + continue + + # 이미지 로드 (원본 왜곡 이미지) + img_left_raw = cv2.imread(left_path) + img_right_raw = cv2.imread(right_path) + + if img_left_raw is None or img_right_raw is None: + print(f" Error: Could not read image pair {i+1} ({left_filename}). Skipping.") + continue + + # 이미지 크기 확인 (지정된 입력 크기와 일치해야 함) + if img_left_raw.shape[:2][::-1] != img_size_input: + print(f" Warning: Left image size mismatch for {left_filename}. Expected {img_size_input}, got {img_left_raw.shape[:2][::-1]}. Skipping pair.") + continue + if img_right_raw.shape[:2][::-1] != img_size_input: + print(f" Warning: Right image size mismatch for {right_filename}. Expected {img_size_input}, got {img_right_raw.shape[:2][::-1]}. Skipping pair.") + continue + + # --- 이미지 렉티피케이션 적용 --- + rectified_left = cv2.remap(img_left_raw, map1_left, map2_left, cv2.INTER_LINEAR) + rectified_right = cv2.remap(img_right_raw, map1_right, map2_right, cv2.INTER_LINEAR) + + # --- 유효 영역으로 자르기 (옵션) --- + if crop_to_valid_roi and validPixROI1 and validPixROI2: + try: + # validPixROI는 튜플 또는 리스트로 로드됩니다. + x1, y1, w1, h1 = validPixROI1 if isinstance(validPixROI1, tuple) else tuple(validPixROI1) + x2, y2, w2, h2 = validPixROI2 if isinstance(validPixROI2, tuple) else tuple(validPixROI2) + + # 자르기 영역 계산 (간단하게 ROI1 기준) + rectified_left = rectified_left[y1:y1+h1, x1:x1+w1] + rectified_right = rectified_right[y2:y2+h2, x2:x2+w2] + + except Exception as e: + print(f" Warning: Failed to crop image pair {i+1} to valid ROI: {e}. Saving full rectified image.") + + + # --- 렉티피케이션된 이미지 저장 --- + # 출력 파일 이름 생성 (예: rectified_left_frame_timestamp.png) + output_left_filename = os.path.join(left_output_dir, f"rectified_{left_filename}") + output_right_filename = os.path.join(right_output_dir, f"rectified_{right_filename}") + + try: + cv2.imwrite(output_left_filename, rectified_left) + cv2.imwrite(output_right_filename, rectified_right) + processed_count += 1 + except Exception as e: + print(f" Error: Failed to save rectified image pair {i+1}: {e}") + + # 진행 상황 표시 (선택 사항) + if (i + 1) % 50 == 0 or (i + 1) == total_images_in_pair: + print(f" Processed {i + 1}/{total_images_in_pair} pairs.") + + + print(f"Finished processing directory pair. Successfully rectified and saved {processed_count} pairs from {left_input_dir}/{right_input_dir}.") + return processed_count + + +def main(): + # --- 설정 구간: 여기에 값을 직접 입력하세요 --- + config = { + # 원본(왜곡된) 왼쪽 이미지가 저장된 디렉터리 목록 (순서 중요) + 'right_dirs': [ + 'stereo_calib_images/cam0', + ], + # 원본(왜곡된) 오른쪽 이미지가 저장된 디렉터리 목록 (left_dirs와 순서 및 개수 일치) + 'left_dirs': [ + 'stereo_calib_images/cam1', + ], + # 스테레오 캘리브레이션 결과 YAML 파일 경로 (모든 폴더 쌍에 동일 적용) + 'stereo_calib_yaml': 'params/stereo_calibration_results.yaml', # 실제 파일 경로로 변경하세요 + + # 이미지 너비와 높이 (캘리브레이션 및 원본 이미지 해상도와 일치해야 함) + 'width': 320, # 실제 해상도로 변경하세요 + 'height': 256, # 실제 해상도로 변경하세요 + + # 렉티피케이션된 이미지를 유효 영역(validPixROI)으로 자를지 여부 (True/False) + 'crop_to_valid_roi': False, # 필요에 따라 변경하세요 + + # 출력 폴더 이름에 사용할 접두사 (예: rectified_calib_images_cam0_set1) + 'output_prefix': 'rectified_new_', # 필요에 따라 변경하세요 + } + # --- 설정 구간 끝 --- + + + # --- 설정 유효성 검사 --- + if len(config['left_dirs']) != len(config['right_dirs']): + print("Error: The number of left directories and right directories in the config must be the same.") + sys.exit(1) + + # 각 입력 디렉터리가 실제로 존재하는지 사전 확인 + print("--- Checking Input Directories ---") + for d in config['left_dirs'] + config['right_dirs']: + if not os.path.isdir(d): + print(f"Error: Input directory not found at {d}") + sys.exit(1) + print(f"Found input directory: {d}") + + # 캘리브레이션 파일 존재 확인 + if not os.path.exists(config['stereo_calib_yaml']): + print(f"Error: Stereo calibration file not found at {config['stereo_calib_yaml']}") + sys.exit(1) + print(f"Found calibration file: {config['stereo_calib_yaml']}") + + + # --- 캘리브레이션 데이터 로드 (한 번만) --- + print("\n--- Loading Stereo Calibration Data ---") + calib_data = load_stereo_calibration_yaml_standard(Path(config['stereo_calib_yaml'])) + + if calib_data is None: + print("Failed to load stereo calibration data. Cannot rectify images.") + sys.exit(1) + + # 캘리브레이션 이미지 사이즈와 현재 지정된 이미지 사이즈 비교 + img_size_calibrated = calib_data.get('image_size_calibrated') # 로드된 튜플 형태 + img_size_input = (config['width'], config['height']) # 사용자가 지정한 입력 이미지 크기 + + if img_size_input != img_size_calibrated: + print(f"Error: Specified image size ({img_size_input}) in config does not match calibration file size ({img_size_calibrated}).") + print("Calibration and image sizes must match for accurate rectification.") + sys.exit(1) + print(f"Image size validation successful: {img_size_input}") + + + # --- 렉티피케이션 맵 생성 (한 번만) --- + # 맵 생성 시 사용자가 지정한 입력 이미지 크기 (출력 이미지 크기) 사용 + map1_left, map2_left, map1_right, map2_right = create_rectification_maps( + calib_data.get('cameraMatrix1_result'), calib_data.get('distCoeffs1_result'), + calib_data.get('R1'), calib_data.get('P1'), + calib_data.get('cameraMatrix2_result'), calib_data.get('distCoeffs2_result'), + calib_data.get('R2'), calib_data.get('P2'), + img_size_input # 맵 생성 시 사용될 출력 이미지 크기 + ) + + if map1_left is None: # 맵 생성 실패 시 + print("Failed to create rectification maps. Cannot rectify images.") + sys.exit(1) + + # --- 각 디렉터리 쌍 순차적으로 처리 --- + total_processed_pairs_count = 0 + num_directory_pairs = len(config['left_dirs']) + + print(f"\n--- Starting Rectification of {num_directory_pairs} Directory Pairs ---") + + for i in range(num_directory_pairs): + left_input_dir = config['left_dirs'][i] + right_input_dir = config['right_dirs'][i] + + # 출력 디렉터리 경로 자동 생성 + left_output_dir = derive_output_dir(left_input_dir, config['output_prefix']) + right_output_dir = derive_output_dir(right_input_dir, config['output_prefix']) + + print(f"\nProcessing pair {i+1}/{num_directory_pairs}:") + + # 현재 디렉터리 쌍 처리 함수 호출 + processed_count_in_pair = process_image_pair_directory( + left_input_dir, right_input_dir, + left_output_dir, right_output_dir, + calib_data, img_size_input, img_size_input, # 입력 이미지 크기와 출력 이미지 크기는 같음 + map1_left, map2_left, map1_right, map2_right, + config['crop_to_valid_roi'] + ) + total_processed_pairs_count += processed_count_in_pair + + print(f"\n--- Rectification Process Finished ---") + print(f"Successfully processed {total_processed_pairs_count} image pairs in total across all directories.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/week11/calibration/stereo_calibrator.py b/week11/calibration/stereo_calibrator.py new file mode 100644 index 0000000..2bd82b9 --- /dev/null +++ b/week11/calibration/stereo_calibrator.py @@ -0,0 +1,525 @@ +# Filename: stereo_calibrator.py +# Description: Perform stereo calibration from saved stereo image pairs and intrinsic calibration files. + +import sys +import os +import glob +import argparse +import numpy as np +import cv2 +import yaml +from pathlib import Path + +# intrinsic_calibrator.py 스크립트에서 사용된 로드 함수를 포함합니다. +# 이는 좌우 intrinsic 데이터를 로드하기 위함입니다. +def load_intrinsic_calibration_yaml(yaml_file: Path): + """ + Intrinsic calibration 결과를 YAML 파일에서 불러옵니다. + """ + # (load_intrinsic_calibration_yaml 함수 코드는 이전 intrinsic_calibrator.py의 + # load_calibration_yaml 메서드와 동일합니다. 여기에 다시 포함시키거나 + # 별도 유틸리티 파일에서 임포트할 수 있습니다. 여기서는 편의상 포함시킵니다.) + if not yaml_file.exists(): + print(f"Error: Intrinsic calibration file not found at {yaml_file}") + return None, None, None, None + + try: + with open(yaml_file, 'r') as f: + # print(f"Attempting to load intrinsic YAML from {yaml_file} using yaml.safe_load...") + calib_data = yaml.safe_load(f) + + camera_id = calib_data.get('camera_id') # 카메라 ID 로드 + camera_matrix = np.array(calib_data.get('camera_matrix')) + distortion_coefficients = np.array(calib_data.get('distortion_coefficients')) + image_width = calib_data.get('image_width') + image_height = calib_data.get('image_height') + # 그리드 정보 등 다른 intrinsic 정보도 필요에 따라 로드 가능 + + if camera_matrix is None or distortion_coefficients is None or image_width is None or image_height is None: + print(f"Error: Missing essential data in intrinsic calibration file {yaml_file}") + return None, None, None, None + + if distortion_coefficients.ndim == 1: + distortion_coefficients = distortion_coefficients.reshape(1, -1) + + # print(f"Intrinsic calibration data loaded successfully from {yaml_file}.") + return camera_id, camera_matrix, distortion_coefficients, (image_width, image_height) + + except Exception as e: + print(f"Error loading intrinsic calibration data from {yaml_file}: {e}") + print("This error likely means the intrinsic YAML file is not in the format saved by intrinsic_calibrator.py.") + return None, None, None, None + + +class StereoCalibrationCalculator: + """ + 스테레오 캘리브레이션 계산 과정을 캡슐화하는 클래스. + """ + def __init__(self, left_image_dir: str, right_image_dir: str, + left_intrinsics_yaml: str, right_intrinsics_yaml: str, + pattern_width: int, pattern_height: int, square_size: float, + image_width: int, image_height: int, fix_intrinsics: bool = True): + """ + StereoCalibrationCalculator 클래스를 초기화합니다. + + Args: + left_image_dir (str): 왼쪽 카메라 캘리브레이션 이미지가 있는 디렉터리. + right_image_dir (str): 오른쪽 카메라 캘리브레이션 이미지가 있는 디렉터리. + left_intrinsics_yaml (str): 왼쪽 intrinsic YAML 파일 경로. + right_intrinsics_yaml (str): 오른쪽 intrinsic YAML 파일 경로. + pattern_width (int): 체커보드 내부 코너 x 개수. + pattern_height (int): 체커보드 내부 코너 y 개수. + square_size (float): 체커보드 한 칸의 실제 크기. + image_width (int): 캘리브레이션 이미지의 너비 (해상도). + image_height (int): 캘리브레이션 이미지의 높이 (해상도). + fix_intrinsics (bool): stereoCalibrate 시 intrinsic 고정 여부 (CALIB_FIX_INTRINSIC 사용). + """ + if not os.path.isdir(left_image_dir): + raise FileNotFoundError(f"Left image directory not found: {left_image_dir}") + if not os.path.isdir(right_image_dir): + raise FileNotFoundError(f"Right image directory not found: {right_image_dir}") + if not os.path.exists(left_intrinsics_yaml): + raise FileNotFoundError(f"Left intrinsics file not found: {left_intrinsics_yaml}") + if not os.path.exists(right_intrinsics_yaml): + raise FileNotFoundError(f"Right intrinsics file not found: {right_intrinsics_yaml}") + + self.left_image_dir = left_image_dir + self.right_image_dir = right_image_dir + self.left_intrinsics_yaml = left_intrinsics_yaml + self.right_intrinsics_yaml = right_intrinsics_yaml + self.pattern_size = (pattern_width, pattern_height) + self.square_size = square_size + self.image_size = (image_width, image_height) + self.fix_intrinsics = fix_intrinsics + + # 로드된 Intrinsic 결과 저장 변수 + self.left_cam_id = None + self.left_intrinsic_mtx = None + self.left_dist_coeffs = None + self.left_img_size_calibrated = None + + self.right_cam_id = None + self.right_intrinsic_mtx = None + self.right_dist_coeffs = None + self.right_img_size_calibrated = None + + # 수집된 데이터 저장 변수 + self.objpoints = [] + self.imgpoints_left = [] + self.imgpoints_right = [] + self.collected_count = 0 + + # Stereo 캘리브레이션 결과 저장 변수 + self.stereo_reprojection_error = None + self.R = None + self.T = None + self.E = None + self.F = None + self.R1 = None + self.R2 = None + self.P1 = None + self.P2 = None + self.Q = None + self.validPixROI1 = None + self.validPixROI2 = None + # 최종 stereoCalibrate 결과 Intrinsic (fix 시 로드된 값, use_guess 시 최적화된 값) + self.cameraMatrix1_result = None + self.distCoeffs1_result = None + self.cameraMatrix2_result = None + self.distCoeffs2_result = None + + + def _load_intrinsics(self): + """Intrinsic calibration YAML 파일들을 로드합니다.""" + print("\n--- Loading Intrinsic Calibration Data ---") + self.left_cam_id, self.left_intrinsic_mtx, self.left_dist_coeffs, self.left_img_size_calibrated = load_intrinsic_calibration_yaml(Path(self.left_intrinsics_yaml)) + self.right_cam_id, self.right_intrinsic_mtx, self.right_dist_coeffs, self.right_img_size_calibrated = load_intrinsic_calibration_yaml(Path(self.right_intrinsics_yaml)) + + if self.left_intrinsic_mtx is None or self.right_intrinsic_mtx is None: + print("Error: Failed to load one or both intrinsic calibration files.") + return False # 로드 실패 + + # Intrinsic 캘리브레이션 이미지 사이즈와 현재 지정된 이미지 사이즈 비교 + if self.image_size != self.left_img_size_calibrated or self.image_size != self.right_img_size_calibrated: + print(f"Warning: Specified image size ({self.image_size}) does not match intrinsic calibration size (Left: {self.left_img_size_calibrated}, Right: {self.right_img_size_calibrated}).") + print("Proceeding with stereo calibration using specified image size, but results may be inaccurate if sizes differ significantly.") + # 이 경우, CALIB_FIX_INTRINSIC 보다는 CALIB_USE_INTRINSIC_GUESS가 권장됩니다. + + print("Intrinsic data loaded successfully.") + return True # 로드 성공 + + + def _collect_stereo_data(self): + """ + 지정된 좌/우 이미지 디렉터리에서 스테레오 이미지 파일을 읽어 코너를 찾고 데이터를 수집합니다. + """ + print(f"\n--- Collecting Stereo Data from Saved Images ---") + print(f"Loading images from: {self.left_image_dir} and {self.right_image_dir}") + print(f"Looking for chessboard pattern: {self.pattern_size[0]}x{self.pattern_size[1]}") + print(f"Expected image size: {self.image_size}") + + # Assume filenames correspond based on sorting + left_images = sorted(glob.glob(os.path.join(self.left_image_dir, '*.png'))) # 예시로 png만 + right_images = sorted(glob.glob(os.path.join(self.right_image_dir, '*.png'))) + + print(f"Found {len(left_images)} left images and {len(right_images)} right images.") + + # Check file count and potential pairing issues + if len(left_images) == 0 or len(left_images) != len(right_images): + print("Error: Image counts do not match or no images found. Cannot proceed with data collection.") + if len(left_images) > 0 and len(right_images) > 0: + print(f"Left: {len(left_images)}, Right: {len(right_images)}") + self.collected_count = 0 + return False # 데이터 수집 실패 + + # 3D object points 준비 (체커보드 좌표계) + objp = np.zeros((self.pattern_size[0] * self.pattern_size[1], 3), np.float32) + objp[:, :2] = np.mgrid[0:self.pattern_size[0], 0:self.pattern_size[1]].T.reshape(-1, 2) * self.square_size + + objpoints = [] + imgpoints_left = [] + imgpoints_right = [] + processed_count = 0 + + # 체커보드 검출 플래그 (원본 이미지에서 코너 찾기) + flags = cv2.CALIB_CB_ADAPTIVE_THRESH | cv2.CALIB_CB_NORMALIZE_IMAGE + + print(f"Processing {len(left_images)} image pairs...") + # (Optional) 디버그 창을 여기서 열어 코너 인식 실패 이미지 확인 가능 + # cv2.namedWindow("Corner Detection Debug", cv2.WINDOW_NORMAL) + # cv2.resizeWindow("Corner Detection Debug", self.image_size[0] * 2, self.image_size[1]) + + + for i in range(len(left_images)): + left_path = left_images[i] + right_path = right_images[i] + + # 파일명 일치 여부 간단히 확인 (timestamps) + left_id = os.path.basename(left_path)[len("left_"):] + right_id = os.path.basename(right_path)[len("right_"):] + # left_id = os.path.basename(left_path)[len("right_"):] + # right_id = os.path.basename(right_path)[len("left_"):] + + if left_id != right_id: + # print(f"Warning: Filenames do not match for pair {i+1}: {os.path.basename(left_path)} vs {os.path.basename(right_path)}. Skipping pair.") + continue + + # 이미지 로드 (원본 왜곡 이미지) + img_left_raw = cv2.imread(left_path) + img_right_raw = cv2.imread(right_path) + + if img_left_raw is None or img_right_raw is None: + # print(f"Error: Could not read image pair {i+1}. Skipping.") + continue + + # 이미지 크기 확인 (모든 이미지가 동일 크기여야 함) + if img_left_raw.shape[:2][::-1] != self.image_size: + # print(f"Warning: Left image size mismatch for pair {i+1}. Expected {self.image_size}, got {img_left_raw.shape[:2][::-1]}. Skipping pair.") + continue + if img_right_raw.shape[:2][::-1] != self.image_size: + # print(f"Warning: Right image size mismatch for pair {i+1}. Expected {self.image_size}, got {img_right_raw.shape[:2][::-1]}. Skipping pair.") + continue + + # 체커보드 검출 (GRAY 이미지에서 수행) + gray_left_raw = cv2.cvtColor(img_left_raw, cv2.COLOR_BGR2GRAY) + gray_right_raw = cv2.cvtColor(img_right_raw, cv2.COLOR_BGR2GRAY) + + # findChessboardCornersSB 사용 시 (일반적으로 더 정확) + # ret_left, corners_left = cv2.findChessboardCornersSB(gray_left_raw, self.pattern_size) + # ret_right, corners_right = cv2.findChessboardCornersSB(gray_right_raw, self.pattern_size) + + # 일반 findChessboardCorners 사용 시 (더 많은 이미지에서 작동할 수 있음) + ret_left, corners_left = cv2.findChessboardCorners(gray_left_raw, self.pattern_size, flags) + ret_right, corners_right = cv2.findChessboardCorners(gray_right_raw, self.pattern_size, flags) + + if ret_left and ret_right: + # 코너 정밀화 (원본 이미지에서 찾은 코너를 원본 이미지에서 정밀화) + criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 300, 1e-6) + corners_left_refined = cv2.cornerSubPix(gray_left_raw, corners_left, (11, 11), (-1, -1), criteria) + corners_right_refined = cv2.cornerSubPix(gray_right_raw, corners_right, (11, 11), (-1, -1), criteria) + + # 데이터 저장 + objpoints.append(objp) + imgpoints_left.append(corners_left_refined) + imgpoints_right.append(corners_right_refined) + processed_count += 1 + # print(f" Collected pair {processed_count}.") + + # (Optional) 디버그 창에 성공 이미지와 코너 표시 + # img_debug = np.hstack((cv2.drawChessboardCorners(img_left_raw.copy(), self.pattern_size, corners_left_refined, ret_left), + # cv2.drawChessboardCorners(img_right_raw.copy(), self.pattern_size, corners_right_refined, ret_right))) + # cv2.imshow("Corner Detection Debug", img_debug) + # cv2.waitKey(50) # 짧게 대기 + + # else: + # (Optional) 디버그 창에 실패 이미지와 상태 표시 + # img_debug_fail = np.hstack((img_left_raw.copy(), img_right_raw.copy())) + # cv2.putText(img_debug_fail, f"Pair {i+1}: Left Found: {ret_left}, Right Found: {ret_right}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) + # cv2.imshow("Corner Detection Debug", img_debug_fail) + # key = cv2.waitKey(0) # 실패 시 키 입력 대기 + # if key == ord('q') or key == 27: # ESC or 'q' + # break # 루프 중단 + + + # cv2.destroyAllWindows() # 디버그 창 닫기 + + self.objpoints = objpoints + self.imgpoints_left = imgpoints_left + self.imgpoints_right = imgpoints_right + self.collected_count = processed_count + + print(f"Finished collecting data. Successfully collected {self.collected_count} stereo pairs.") + + return self.collected_count > 0 # 데이터 수집 성공 여부 반환 + + + def _perform_calibration(self): + """수집된 데이터와 로드된 intrinsic으로 스테레오 캘리브레이션을 수행합니다.""" + print("\n--- Performing Stereo Calibration Calculation ---") + min_pairs_needed = 5 + if self.collected_count < min_pairs_needed: + print(f"Error: Need at least {min_pairs_needed} captured pairs for stereo calibration, but only {self.collected_count} pairs collected. Cannot perform calculation.") + return False # 캘리브레이션 실패 + + if self.left_intrinsic_mtx is None or self.left_dist_coeffs is None or self.right_intrinsic_mtx is None or self.right_dist_coeffs is None: + print("Error: Intrinsic calibration data not loaded for both cameras. Cannot perform stereo calibration.") + return False # 캘리브레이션 실패 + + # 캘리브레이션 플래그 설정 + calibration_flags = cv2.CALIB_ZERO_DISPARITY # 왼쪽 카메라의 주점을 (0,0)으로 설정 (렉티피케이션에 유용) + + if self.fix_intrinsics: + calibration_flags |= cv2.CALIB_FIX_INTRINSIC + # 만약 로드된 intrinsic 이미지 사이즈와 현재 지정 사이즈가 다르다면, + # CALIB_FIX_INTRINSIC 사용 시 오류가 발생하거나 결과가 틀릴 수 있습니다. + # 이때는 CALIB_USE_INTRINSIC_GUESS를 사용하는 것이 더 나을 수 있습니다. + if self.image_size != self.left_img_size_calibrated or self.image_size != self.right_img_size_calibrated: + print("Warning: CALIB_FIX_INTRINSIC is used, but image sizes mismatch intrinsic calibration. Consider setting fix_intrinsics=False.") + print("Stereo calibration flags: CALIB_FIX_INTRINSIC | CALIB_ZERO_DISPARITY") + else: + calibration_flags |= cv2.CALIB_USE_INTRINSIC_GUESS + print("Stereo calibration flags: CALIB_USE_INTRINSIC_GUESS | CALIB_ZERO_DISPARITY") + + try: + # cv2.stereoCalibrate 호출 + ret, cameraMatrix1_res, distCoeffs1_res, cameraMatrix2_res, distCoeffs2_res, R, T, E, F = cv2.stereoCalibrate( + self.objpoints, self.imgpoints_left, self.imgpoints_right, + self.left_intrinsic_mtx, self.left_dist_coeffs, # <-- 로드된 intrinsic을 초기값/고정값으로 사용 + self.right_intrinsic_mtx, self.right_dist_coeffs, # <-- 로드된 intrinsic을 초기값/고정값으로 사용 + self.image_size, + flags=calibration_flags, + criteria=(cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 300, 1e-6) # 캘리브레이션 정밀도 기준 + ) + + self.stereo_reprojection_error = ret + print(f"Stereo calibration reprojection error: {self.stereo_reprojection_error}") + if self.stereo_reprojection_error > 1.0: # 일반적인 기준 (1.0보다 크면 결과가 좋지 않다고 판단) + print("Warning: High reprojection error. Calibration results may not be accurate.") + + + # cv2.stereoRectify 호출: 스테레오 이미지를 같은 평면에 오도록 회전시켜 렉티피케이션 변환을 계산 + R1, R2, P1, P2, Q, validPixROI1, validPixROI2 = cv2.stereoRectify( + cameraMatrix1_res, distCoeffs1_res, cameraMatrix2_res, distCoeffs2_res, # 최적화된(또는 고정된) intrinsic 사용 + self.image_size, R, T, + flags=cv2.CALIB_ZERO_DISPARITY, + alpha=0 # 0이면 유효 픽셀만 포함, 1이면 모든 픽셀 포함 (검은 테두리 생김) + ) + + # 결과 변수에 저장 + self.R = R + self.T = T + self.E = E + self.F = F + self.R1 = R1 + self.R2 = R2 + self.P1 = P1 + self.P2 = P2 + self.Q = Q + # validPixROI는 튜플 형태이므로, YAML 저장을 위해 리스트로 변환 필요 + self.validPixROI1 = validPixROI1 + self.validPixROI2 = validPixROI2 + self.cameraMatrix1_result = cameraMatrix1_res + self.distCoeffs1_result = distCoeffs1_res + self.cameraMatrix2_result = cameraMatrix2_res + self.distCoeffs2_result = distCoeffs2_res + + + print("Stereo calibration calculation successful.") + return True # 캘리브레이션 성공 + + except Exception as e: + print(f"An error occurred during stereo calibration calculation: {e}") + import traceback + traceback.print_exc() + return False # 캘리브레이션 실패 + + + def calibrate(self): + """ + Intrinsic 로드, 데이터 수집, 스테레오 캘리브레이션 계산의 전체 과정을 실행합니다. + """ + print("\n--- Starting Stereo Calibration Process ---") + + # 1. Intrinsic 로드 + if not self._load_intrinsics(): + print("Failed to load intrinsic calibration data. Stereo calibration aborted.") + return False # 전체 프로세스 실패 + + # 2. 데이터 수집 + if not self._collect_stereo_data(): + print("Failed to collect enough stereo data from images. Stereo calibration aborted.") + return False # 전체 프로세스 실패 + + # 3. 캘리브레이션 계산 + if not self._perform_calibration(): + print("Stereo calibration calculation failed. Stereo calibration aborted.") + return False # 전체 프로세스 실패 + + print("\nStereo Calibration Process Completed Successfully.") + return True # 전체 프로세스 성공 + + + def save_to_yaml(self, output_file: str = "stereo_calibration_results.yaml"): + """ + 클래스 내부에 저장된 스테레오 캘리브레이션 결과를 YAML 파일로 저장합니다. + """ + if self.R is None or self.T is None or self.R1 is None or self.P1 is None: + print("Error: No valid stereo calibration data to save. Run calibrate() first.") + return False + + def convert_numpy(obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + return obj + + # validPixROI는 튜플이므로 .tolist()가 필요 없을 수 있습니다. + # 확실하게 리스트로 변환하거나 튜플 상태로 저장합니다. + # PyYAML은 기본적으로 튜플을 YAML 시퀀스로 저장합니다. + # 따라서 그대로 저장해도 괜찮습니다. + validPixROI1_serializable = self.validPixROI1 if self.validPixROI1 is None else list(self.validPixROI1) + validPixROI2_serializable = self.validPixROI2 if self.validPixROI2 is None else list(self.validPixROI2) + + + calib_data = { + "stereo_reprojection_error": self.stereo_reprojection_error, + "R": convert_numpy(self.R), + "T": convert_numpy(self.T), + "E": convert_numpy(self.E), + "F": convert_numpy(self.F), + "R1": convert_numpy(self.R1), + "R2": convert_numpy(self.R2), + "P1": convert_numpy(self.P1), + "P2": convert_numpy(self.P2), + "Q": convert_numpy(self.Q), + "image_width": self.image_size[0], + "image_height": self.image_size[1], + "pattern_size_width": self.pattern_size[0], + "pattern_size_height": self.pattern_size[1], + "square_size_meters": self.square_size, + "num_images_used": self.collected_count, + "cameraMatrix1_result": convert_numpy(self.cameraMatrix1_result), + "distCoeffs1_result": convert_numpy(self.distCoeffs1_result), + "cameraMatrix2_result": convert_numpy(self.cameraMatrix2_result), + "distCoeffs2_result": convert_numpy(self.distCoeffs2_result), + "validPixROI1": validPixROI1_serializable, # 튜플 그대로 저장 + "validPixROI2": validPixROI2_serializable, # 튜플 그대로 저장 + # 로드된 intrinsic 파일 경로도 저장하면 유용할 수 있습니다. + "left_intrinsics_yaml_used": str(Path(self.left_intrinsics_yaml).name), # 파일 이름만 저장 + "right_intrinsics_yaml_used": str(Path(self.right_intrinsics_yaml).name), + # 로드된 intrinsic 파일의 카메라 ID도 저장 + "left_camera_id_used": self.left_cam_id, + "right_camera_id_used": self.right_cam_id, + } + + def convert_Coefficients(coeffs): + if coeffs is None: + return None + if isinstance(coeffs, np.ndarray): + # (1, N) 형태이면 N개짜리 리스트로 변환 + return coeffs.flatten().tolist() + return coeffs # 이미 리스트 등 다른 형태일 경우 그대로 반환 + + + output_path = Path(output_file) + try: + with open(output_path, 'w') as outfile: + yaml.dump(calib_data, outfile, default_flow_style=False, indent=4, sort_keys=False) + print(f"Stereo calibration results saved successfully to {output_path}") + return True + except Exception as e: + print(f"Error saving stereo calibration data to {output_path}: {e}") + return False + + # @classmethod + # def load_from_yaml(cls, yaml_file: str): + # """ + # YAML 파일에서 스테레오 캘리브레이션 결과를 불러와 클래스 객체를 생성합니다. + # (이 객체는 이미지 디렉터리 등 계산에 필요한 원본 정보는 포함하지 않습니다.) + # Previewer에서 로딩하는 것과는 역할이 다릅니다. + # """ + # # 필요하다면 구현합니다. Previewer에서는 다른 로딩 함수를 사용합니다. + # pass + + +def main(): + """ + 스크립트 단독 실행 시 스테레오 캘리브레이션 계산을 수행하는 main 함수. + """ + parser = argparse.ArgumentParser(description="스테레오 카메라 캘리브레이션 계산 (YAML 입/출력)") + + parser.add_argument('--left_image_dir', type=str, default='stereo_calib_images_cam1', + help="왼쪽 카메라 캘리브레이션 이미지가 저장된 디렉터리") + parser.add_argument('--right_image_dir', type=str, default='stereo_calib_images_cam0', + help="오른쪽 카메라 캘리브레이션 이미지가 저장된 디렉터리") + parser.add_argument('--left_intrinsics_yaml', type=str, default='intrinsic_calibration_cam1.yaml', + help="왼쪽 카메라 intrinsic calibration YAML 파일 경로") + parser.add_argument('--right_intrinsics_yaml', type=str, default='intrinsic_calibration_cam0.yaml', + help="오른쪽 카메라 intrinsic calibration YAML 파일 경로") + parser.add_argument('--width', type=int, default=1280, + help="캘리브레이션 이미지의 너비 (해상도)") + parser.add_argument('--height', type=int, default=720, + help="캘리브레이션 이미지의 높이 (해상도)") + parser.add_argument('--pattern_width', type=int, default=10, + help="체커보드 내부 코너 수 (가로)") + parser.add_argument('--pattern_height', type=int, default=7, + help="체커보드 내부 코너 수 (세로)") + parser.add_argument('--square_size', type=float, default=0.025, + help="체커보드 한 칸의 실제 크기 (단위: intrinsic calib와 동일하게)") + parser.add_argument('--fix_intrinsics', action='store_true', default=True, + help="Stereo calibration 시 intrinsic parameters를 고정 (기본값: True - CALIB_FIX_INTRINSIC). " + "False로 설정 시 intrinsic parameters도 함께 최적화 (CALIB_USE_INTRINSIC_GUESS 사용).") + parser.add_argument('--output_yaml', type=str, default="stereo_calibration_results.yaml", + help="스테레오 캘리브레이션 결과 저장 YAML 파일 경로") + + + args = parser.parse_args() + + # 필수 입력 파일/디렉터리 존재 확인은 __init__에서 수행됩니다. + + # StereoCalibrationCalculator 객체 생성 + calibrator = StereoCalibrationCalculator( + left_image_dir=args.left_image_dir, + right_image_dir=args.right_image_dir, + left_intrinsics_yaml=args.left_intrinsics_yaml, + right_intrinsics_yaml=args.right_intrinsics_yaml, + pattern_width=args.pattern_width, + pattern_height=args.pattern_height, + square_size=args.square_size, + image_width=args.width, + image_height=args.height, + fix_intrinsics=args.fix_intrinsics + ) + + # 캘리브레이션 전체 과정 실행 (로드, 수집, 계산) + calibration_successful = calibrator.calibrate() + + # 결과 저장 + if calibration_successful: # 캘리브레이션 계산까지 성공했으면 + calibrator.save_to_yaml(args.output_yaml) + else: + print("\nStereo calibration process did not complete successfully.") + # sys.exit(1) # 에러 발생 시 종료하려면 주석 해제 + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/week11/calibration/stereo_capture.py b/week11/calibration/stereo_capture.py new file mode 100644 index 0000000..6eb0363 --- /dev/null +++ b/week11/calibration/stereo_capture.py @@ -0,0 +1,415 @@ +# Filename: stereo_capture.py (Modified to auto-calibrate stereo after capture) + +import sys +import os +import time +import argparse +from datetime import datetime +import glob # stereo_calibrator에서 필요하므로 미리 임포트 +import yaml # stereo_calibrator에서 필요하므로 미리 임포트 +from pathlib import Path # stereo_calibrator에서 필요하므로 미리 임포트 + +import cv2 +import numpy as np + +import gi +gi.require_version('Gst', '1.0') +gi.require_version('GstApp', '1.0') +from gi.repository import Gst, GLib + +# GStreamer 초기화는 main에서 수행합니다. + +# stereo_calibrator.py 스크립트에서 필요한 함수들을 가져옵니다. +# 따라서 이 스크립트와 stereo_calibrator.py는 같은 디렉터리에 있어야 합니다. + +try: + from stereo_calibrator import load_intrinsic_calibration_yaml, StereoCalibrationCalculator +except ImportError: + print("Error: Could not import load_intrinsic_calibration_yaml or StereoCalibrationCalculator from stereo_calibrator.py.") + print("Please ensure stereo_calibrator.py is in the same directory and has no syntax errors.") + load_intrinsic_calibration_yaml = None # 임포트 실패 시 None 설정 + StereoCalibrationCalculator = None # 임포트 실패 시 None 설정 + # 이 경우 자동 캘리브레이션은 불가능함을 알림 + print("Automatic stereo calibration after capture is disabled.") +print("Automatic stereo calibration after capture is disabled because stereo_calibrator.py could not be imported.") + + +def link_elements(*elements): + """여러 GStreamer 요소를 순차적으로 연결하는 헬퍼 함수.""" + for i in range(len(elements) - 1): + if not elements[i].link(elements[i+1]): + print(f"Failed to link {elements[i].name} to {elements[i+1].name}") + return False + return True + +def build_gst_pipeline(camera_id, cam_mode, hflip, vflip, width, height, fps): + """ + 주어진 카메라 ID와 설정으로 GStreamer 파이프라인을 생성합니다. + Appsink를 포함하여 OpenCV로 프레임을 가져올 수 있도록 구성합니다. + """ + """GStreamer 파이프라인을 생성하고 앱싱크를 반환합니다.""" + # 1) DeepStream 전용 nvvideoconvert 유무 검사 + if Gst.ElementFactory.find('nvvideoconvert'): + convert_plugin = 'nvvideoconvert' + deepstream_available = True + elif Gst.ElementFactory.find('nvvidconv'): + convert_plugin = 'nvvidconv' + deepstream_available = False + else: + print("Error: Neither 'nvvideoconvert' nor 'nvvidconv' plugins found! " + "Please install DeepStream or Jetson GStreamer extensions.") + return False + + print(f"🔌 Video converter plugin: {convert_plugin} " + f"(DeepStream available: {deepstream_available})") + try: + # ... (build_gst_pipeline 함수 내용은 이전 stereo_capture.py와 동일) + # GStreamer 요소 생성 + pipeline_name = f"pipeline_cam{camera_id}" + pipeline = Gst.Pipeline.new(pipeline_name) + + src = Gst.ElementFactory.make('nvarguscamerasrc', f'source_{camera_id}') + queue1 = Gst.ElementFactory.make('queue', f'queue1_{camera_id}') + caps_filter = Gst.ElementFactory.make('capsfilter', f'caps_filter_{camera_id}') + queue2 = Gst.ElementFactory.make('queue', f'queue2_{camera_id}') + video_convert = Gst.ElementFactory.make(convert_plugin, f'video_convert_{camera_id}') + queue3 = Gst.ElementFactory.make('queue', f'queue3_{camera_id}') + caps_filter2 = Gst.ElementFactory.make('capsfilter', f'caps_filter2_{camera_id}') + queue4 = Gst.ElementFactory.make('queue', f'queue4_{camera_id}') + appsink = Gst.ElementFactory.make('appsink', f'appsink_{camera_id}') + + if not all([pipeline, src, queue1, caps_filter, queue2, video_convert, queue3, caps_filter2, queue4, appsink]): + print(f"GStreamer 요소 생성에 실패했습니다 (카메라 ID: {camera_id}). 필요한 플러그인이 설치되었는지 확인하세요.") + return None, None + + # src 설정 + Gst.util_set_object_arg(src, "sensor-id", f"{camera_id}") + Gst.util_set_object_arg(src, "bufapi-version", "true") + Gst.util_set_object_arg(src, "sensor-mode", f"{cam_mode}") + + # 첫번째 capsfilter: NVMM 메모리, 해상도, FPS 설정 + caps_str = f"video/x-raw(memory:NVMM), width=(int){width}, height=(int){height}, framerate=(fraction){fps}/1" + Gst.util_set_object_arg(caps_filter, "caps", caps_str) + + # nvvideoconvert: flip 옵션 설정 + if hflip and vflip: + Gst.util_set_object_arg(video_convert, "flip-method", "2") # 180도 회전 + elif hflip: + Gst.util_set_object_arg(video_convert, "flip-method", "4") # 수평 반전 + elif vflip: + Gst.util_set_object_arg(video_convert, "flip-method", "6") # 수직 반전 + else: + Gst.util_set_object_arg(video_convert, "flip-method", "0") # no flip + + # 두번째 capsfilter: OpenCV와 호환되는 포맷 (BGRx) + caps_str2 = "video/x-raw, format=(string)BGRx" + Gst.util_set_object_arg(caps_filter2, "caps", caps_str2) + + # appsink 설정: 항상 최신 프레임만 보관 (max-buffers=1, drop=True) + appsink.set_property("emit-signals", False) + appsink.set_property("max-buffers", 1) + appsink.set_property("drop", True) + # appsink의 caps를 설정하여 원하는 최종 포맷을 보장 (필수) + appsink_caps = Gst.Caps.from_string(f"video/x-raw, format=(string)BGRx, width=(int){width}, height=(int){height}") + appsink.set_property("caps", appsink_caps) + + + # 요소들을 파이프라인에 추가 + pipeline.add(src) + pipeline.add(queue1) + pipeline.add(caps_filter) + pipeline.add(queue2) + pipeline.add(video_convert) + pipeline.add(queue3) + pipeline.add(caps_filter2) + pipeline.add(queue4) + pipeline.add(appsink) + + + # 요소들을 순차적으로 연결 + if not link_elements(src, queue1, caps_filter, queue2, video_convert, queue3, caps_filter2, queue4, appsink): + print(f"GStreamer 요소들을 연결하지 못했습니다 (카메라 ID: {camera_id}).") + pipeline.set_state(Gst.State.NULL) + return None, None + + print(f"GStreamer pipeline built for camera {camera_id}.") + return pipeline, appsink + except Exception as e: + print(f"파이프라인 생성 중 오류 발생 (카메라 ID: {camera_id}): {e}") + return None, None + + +def run_stereo_capture_loop(left_pipeline, left_appsink, right_pipeline, right_appsink, + left_output_dir, right_output_dir, + img_size, pattern_size): # pattern_size는 미리보기 표시용 + """ + 스테레오 이미지를 실시간으로 캡쳐하고 지정할 때 파일로 저장하는 루프. + """ + window_name = "Stereo Capture Preview (Press 'c' to Save, 'q' or ESC to Quit)" + cv2.namedWindow(window_name, cv2.WINDOW_NORMAL) + cv2.resizeWindow(window_name, img_size[0] * 2, img_size[1]) # 좌우 이미지 합쳐서 보여줌 + + captured_count = 0 + instructions = "Press 'c' to capture, 'q' or ESC to quit." + print("\n--- Starting Stereo Capture ---") + print(instructions) + + + try: + while True: + # 양쪽 appsink에서 최신 프레임을 가져옴 (동기화되지 않을 수 있습니다) + # 정확한 동기화를 위해서는 GStreamer 레벨에서 별도 처리가 필요할 수 있습니다. + # 여기서는 단순히 각 파이프라인에서 준비된 최신 프레임을 가져옵니다. + left_sample = left_appsink.emit("pull-sample") + right_sample = right_appsink.emit("pull-sample") + + if left_sample is None or right_sample is None: + # 프레임이 아직 준비되지 않았거나 오류 발생 + if left_sample is None and right_sample is None: + time.sleep(0.001) + continue # 한쪽이라도 프레임이 있으면 계속 진행 + + # 왼쪽 프레임 처리 + left_buffer = left_sample.get_buffer() + success_l, map_l = left_buffer.map(Gst.MapFlags.READ) + if not success_l: + print("Error mapping left buffer") + left_buffer.unmap(map_l) # 매핑 실패 시에도 언매핑 시도 + continue + try: + # BGRx 포맷에서 BGR로 변환 (원본 이미지) + left_frame_bgr_raw = np.frombuffer(map_l.data, dtype=np.uint8).reshape((img_size[1], img_size[0], 4)) + left_frame_bgr_raw = cv2.cvtColor(left_frame_bgr_raw, cv2.COLOR_BGRA2BGR) + except Exception as e: + print("Left frame processing error:", e) + left_buffer.unmap(map_l) + continue + left_buffer.unmap(map_l) + + # 오른쪽 프레임 처리 + right_buffer = right_sample.get_buffer() + success_r, map_r = right_buffer.map(Gst.MapFlags.READ) + if not success_r: + print("Error mapping right buffer") + right_buffer.unmap(map_r) # 매핑 실패 시에도 언매핑 시도 + continue + try: + # BGRx 포맷에서 BGR로 변환 (원본 이미지) + right_frame_bgr_raw = np.frombuffer(map_r.data, dtype=np.uint8).reshape((img_size[1], img_size[0], 4)) + right_frame_bgr_raw = cv2.cvtColor(right_frame_bgr_raw, cv2.COLOR_BGRA2BGR) + except Exception as e: + print("Right frame processing error:", e) + right_buffer.unmap(map_r) + continue + right_buffer.unmap(map_r) + + + # --- 미리보기 화면 구성 --- + # 미리보기에는 코너 찾기 결과 등을 표시하기 위해 원본 이미지를 복사하여 사용 + left_display = left_frame_bgr_raw.copy() + right_display = right_frame_bgr_raw.copy() + + # 체커보드 검출 (미리보기 표시용 - 원본 이미지에서 빠르게 시도) + # 실제 캘리브레이션 코너 찾기는 stereo_calibrator.py에서 수행 + left_gray_raw = cv2.cvtColor(left_frame_bgr_raw, cv2.COLOR_BGR2GRAY) + right_gray_raw = cv2.cvtColor(right_frame_bgr_raw, cv2.COLOR_BGR2GRAY) + + flags_find_corners = cv2.CALIB_CB_ADAPTIVE_THRESH | cv2.CALIB_CB_NORMALIZE_IMAGE | cv2.CALIB_CB_FAST_CHECK # 미리보기는 빠르게 + ret_left_preview, corners_left_preview = cv2.findChessboardCorners(left_gray_raw, pattern_size, flags_find_corners) + ret_right_preview, corners_right_preview = cv2.findChessboardCorners(right_gray_raw, pattern_size, flags_find_corners) + + if ret_left_preview: + cv2.drawChessboardCorners(left_display, pattern_size, corners_left_preview, ret_left_preview) + if ret_right_preview: + cv2.drawChessboardCorners(right_display, pattern_size, corners_right_preview, ret_right_preview) + + # 코너 찾기 상태 텍스트 오버레이 (미리보기용) + if ret_left_preview and ret_right_preview: + corner_status_text = "Chessboard: FOUND (Press 'c')" + corner_status_color = (0, 255, 0) # Green + else: + corner_status_text = "Chessboard: NOT FOUND" + corner_status_color = (0, 0, 255) # Red + + # 좌/우 이미지를 좌우 결합하여 미리보기 + preview = np.hstack((left_display, right_display)) + + # 텍스트 오버레이 + cv2.putText(preview, instructions, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2) + cv2.putText(preview, f"Captured pairs: {captured_count}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2) + cv2.putText(preview, corner_status_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, corner_status_color, 2) + + + cv2.imshow(window_name, preview) + + key = cv2.waitKey(1) & 0xFF + + if key == ord('c'): + # 코너 찾기 여부와 무관하게 저장 (필요하다면 코너 찾았을 때만 저장하도록 수정 가능) + # 여기서는 사용자가 원할 때 언제든 캡쳐하도록 구현 + print(f"Capturing pair {captured_count+1}...") + + # !!! 원본 프레임을 파일로 저장 !!! + timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 밀리초까지 포함 + left_filename = os.path.join(left_output_dir, f"left_{timestamp_str}.png") + right_filename = os.path.join(right_output_dir, f"right_{timestamp_str}.png") + + try: + cv2.imwrite(left_filename, left_frame_bgr_raw) # <-- 원본 이미지 저장 + cv2.imwrite(right_filename, right_frame_bgr_raw) # <-- 원본 이미지 저장 + captured_count += 1 + print(f"Saved pair {captured_count}. (Raw images saved)") + except Exception as e: + print(f"Error saving image pair: {e}") + + + elif key == ord('q') or key == 27: # 'q' 또는 ESC + print("Quit signal received. Exiting capture loop.") + break + + except Exception as e: + print(f"An error occurred during the capture loop: {e}") + import traceback + traceback.print_exc() + + finally: + cv2.destroyAllWindows() + print("Capture loop finished.") + + return captured_count # 캡쳐된 이미지 쌍 개수 반환 + + +def main(): + parser = argparse.ArgumentParser(description="스테레오 카메라 캡쳐 시스템 및 선택적 자동 캘리브레이션") + Gst.init(sys.argv) # GStreamer 초기화 + + parser.add_argument('--camera_mode', type=int, default=2, help="카메라 센서 모드 (예: 2)") + parser.add_argument('--hflip', action='store_true', help="수평 반전 활성화") + parser.add_argument('--vflip', action='store_true', help="수직 반전 활성화") + parser.add_argument('--left_camera', type=int, default=1, help="왼쪽 카메라 번호 (sensor-id)") + parser.add_argument('--right_camera', type=int, default=0, help="오른쪽 카메라 번호 (sensor-id)") + parser.add_argument('--width', type=int, default=320, help="이미지/파이프라인 해상도 너비") + parser.add_argument('--height', type=int, default=256, help="이미지/파이프라인 해상도 높이") + parser.add_argument('--fps', type=int, default=10, help="파이프라인 프레임 레이트") + parser.add_argument('--left_output_dir', type=str, default="stereo_calib_images/cam1", help="왼쪽 이미지 저장 디렉터리") + parser.add_argument('--right_output_dir', type=str, default="stereo_calib_images/cam0", help="오른쪽 이미지 저장 디렉터리") + parser.add_argument('--pattern_width', type=int, default=10, help="체커보드 내부 코너 수 (가로 - 미리보기 표시 및 캘리브레이션용)") + parser.add_argument('--pattern_height', type=int, default=7, help="체커보드 내부 코너 수 (세로 - 미리보기 표시 및 캘리브레이션용)") + parser.add_argument('--square_size', type=float, default=0.025, help="체커보드 한 칸의 실제 크기 (미터 단위 - 캘리브레이션용)") + + # --- 자동 캘리브레이션 관련 인자 --- + parser.add_argument('--auto_calibrate', action='store_true', default=True, + help="캡쳐 종료 후 스테레오 캘리브레이션을 자동으로 수행합니다.") + parser.add_argument('--left_intrinsics_yaml', type=str, default='params/intrinsic_param_cam1.yaml', + help="[--auto_calibrate 시 필수] 왼쪽 카메라 intrinsic calibration YAML 파일 경로") + parser.add_argument('--right_intrinsics_yaml', type=str, default="params/intrinsic_param_cam0.yaml", + help="[--auto_calibrate 시 필수] 오른쪽 카메라 intrinsic calibration YAML 파일 경로") + parser.add_argument('--stereo_calib_output_yaml', type=str, default="params/stereo_calibration_results.yaml", + help="[--auto_calibrate 시 사용] 스테레오 캘리브레이션 결과 저장 YAML 파일 경로") + parser.add_argument('--fix_intrinsics', action='store_true', default=True, + help="[--auto_calibrate 시 사용] Stereo calibration 시 intrinsic parameters를 고정 (기본값: True - CALIB_FIX_INTRINSIC). " + "False로 설정 시 intrinsic parameters도 함께 최적화 (CALIB_USE_INTRINSIC_GUESS 사용).") + + + args = parser.parse_args() + + # 출력 디렉터리 생성 + os.makedirs(args.left_output_dir, exist_ok=True) + os.makedirs(args.right_output_dir, exist_ok=True) + print(f"Left images will be saved to: {args.left_output_dir}") + print(f"Right images will be saved to: {args.right_output_dir}") + + img_size = (args.width, args.height) + pattern_size = (args.pattern_width, args.pattern_height) + + # GStreamer 파이프라인 생성 + print("\nBuilding GStreamer pipelines...") + left_pipeline, left_appsink = build_gst_pipeline( + args.left_camera, args.camera_mode, args.hflip, args.vflip, + args.width, args.height, args.fps + ) + right_pipeline, right_appsink = build_gst_pipeline( + args.right_camera, args.camera_mode, args.hflip, args.vflip, + args.width, args.height, args.fps + ) + + if left_pipeline is None or right_pipeline is None: + print("Failed to build one or both GStreamer pipelines. Exiting.") + sys.exit(1) + print("Pipelines built.") + + # 파이프라인 실행 + print("Setting pipelines to PLAYING state...") + left_pipeline.set_state(Gst.State.PLAYING) + right_pipeline.set_state(Gst.State.PLAYING) + + # PLAYING 상태로 전환될 때까지 약간 대기 + time.sleep(1.0) + print("Pipelines are PLAYING.") + + # 캡쳐 루프 실행 + captured_count = run_stereo_capture_loop( + left_pipeline, left_appsink, right_pipeline, right_appsink, # right_pipeline은 루프에서 직접 사용 안 함 + args.left_output_dir, args.right_output_dir, + img_size, pattern_size + ) + + # GStreamer 파이프라인 정리 + print("\nSetting GStreamer pipelines to NULL state...") + if left_pipeline: + left_pipeline.set_state(Gst.State.NULL) + if right_pipeline: + right_pipeline.set_state(Gst.State.NULL) + print("GStreamer pipelines set to NULL state.") + + # --- 캡쳐 종료 후 스테레오 캘리브레이션 자동 실행 --- + if args.auto_calibrate: + print(f"\n--- Capture finished. Automatically starting Stereo Calibration ---") + min_pairs_needed = 5 + # 임포트 성공 여부 및 캡쳐 이미지 수 확인 + if StereoCalibrationCalculator is None: # <-- 임포트 실패했는지 확인 + print("Cannot perform automatic calibration: StereoCalibrationCalculator class not found.") + elif captured_count < min_pairs_needed: + print(f"Not enough captured pairs ({captured_count}) for stereo calibration. Need at least {min_pairs_needed}. Skipping automatic calibration.") + else: + try: + # StereoCalibrationCalculator 객체 생성 및 실행 + # 이 객체는 캡쳐된 이미지가 있는 디렉터리, intrinsic 파일 경로, 체커보드 정보를 사용합니다. + calibrator = StereoCalibrationCalculator( + left_image_dir=args.left_output_dir, + right_image_dir=args.right_output_dir, + left_intrinsics_yaml=args.left_intrinsics_yaml, # 기본값 또는 지정된 경로 사용 + right_intrinsics_yaml=args.right_intrinsics_yaml, # 기본값 또는 지정된 경로 사용 + pattern_width=args.pattern_width, + pattern_height=args.pattern_height, + square_size=args.square_size, + image_width=args.width, + image_height=args.height, + fix_intrinsics=args.fix_intrinsics + ) + + # 캘리브레이션 전체 과정 실행 (로드, 수집, 계산) + calibration_successful = calibrator.calibrate() + + # 결과 저장 + if calibration_successful: # 캘리브레이션 계산까지 성공했으면 + calibrator.save_to_yaml(args.stereo_calib_output_yaml) # 기본값 또는 지정된 경로 사용 + else: + print("Automatic stereo calibration calculation failed.") + + except FileNotFoundError as e: + # StereoCalibrationCalculator 초기화 중 발생한 FileNotFoundError 처리 + print(f"Error during automatic calibration setup: {e}") + print("Please ensure intrinsic YAML files and image directories exist.") + except Exception as e: + print(f"An unexpected error occurred during automatic stereo calibration: {e}") + import traceback + traceback.print_exc() + + else: + print("\nAutomatic stereo calibration is disabled. To run, use --auto_calibrate.") + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/week11/calibration/stereo_preview.py b/week11/calibration/stereo_preview.py new file mode 100644 index 0000000..1601cdb --- /dev/null +++ b/week11/calibration/stereo_preview.py @@ -0,0 +1,605 @@ +# Filename: stereo_preview.py +# Description: Load stereo calibration results, display rectified stereo stream, and capture rectified images. + +import sys +import os +import time +import argparse +import numpy as np +import cv2 +import yaml +from pathlib import Path +from datetime import datetime # 캡쳐 시간 기록용 + +import gi +gi.require_version('Gst', '1.0') +gi.require_version('GstApp', '1.0') +from gi.repository import Gst, GLib + +# GStreamer 초기화는 main에서 수행합니다. + +# --- Helper Functions (Included for self-containment) --- + +def link_elements(*elements): + """여러 GStreamer 요소를 순차적으로 연결하는 헬퍼 함수.""" + for i in range(len(elements) - 1): + if not elements[i].link(elements[i+1]): + print(f"Failed to link {elements[i].name} to {elements[i+1].name}") + return False + return True + +# --- Stereo Calibration Data Load Function (Included for self-containment) --- +# stereo_calibrator.py에서 사용된 로드 함수와 동일합니다. +def load_stereo_calibration_yaml_standard(yaml_file_path: Path): + """ + 표준 YAML 형식으로 저장된 스테레오 캘리브레이션 결과를 불러옵니다. + """ + if not yaml_file_path.exists(): + print(f"Error: Calibration file not found at {yaml_file_path}") + return None + + try: + with open(yaml_file_path, 'r') as f: + print(f"Attempting to load stereo YAML from {yaml_file_path} using yaml.safe_load...") + calib_data = yaml.safe_load(f) + + # 필요한 데이터 로드 및 numpy 배열로 변환 + # stereo_calibrator.py의 save_to_yaml 메서드에서 저장된 키 이름과 일치해야 합니다. + cameraMatrix1 = np.array(calib_data.get('cameraMatrix1_result')) + distCoeffs1 = np.array(calib_data.get('distCoeffs1_result')) + cameraMatrix2 = np.array(calib_data.get('cameraMatrix2_result')) + distCoeffs2 = np.array(calib_data.get('distCoeffs2_result')) + R = np.array(calib_data.get('R')) + T = np.array(calib_data.get('T')) + R1 = np.array(calib_data.get('R1')) + R2 = np.array(calib_data.get('R2')) + P1 = np.array(calib_data.get('P1')) + P2 = np.array(calib_data.get('P2')) + Q = np.array(calib_data.get('Q')) + img_width = calib_data.get('image_width') + img_height = calib_data.get('image_height') + img_size = (img_width, img_height) + + validPixROI1 = calib_data.get('validPixROI1') # 튜플 또는 List 형태로 저장되어 있다면 그대로 로드 + validPixROI2 = calib_data.get('validPixROI2') # 튜플 또는 List 형태로 저장되어 있다면 그대로 로드 + + # Check if essential data is loaded + if cameraMatrix1 is None or distCoeffs1 is None or cameraMatrix2 is None or distCoeffs2 is None or \ + R1 is None or R2 is None or P1 is None or P2 is None or Q is None or img_width is None or img_height is None: + print("Error: Missing essential calibration data (matrices, image size) in YAML file.") + missing_keys = [k for k in ['cameraMatrix1_result', 'distCoeffs1_result', 'cameraMatrix2_result', 'distCoeffs2_result', + 'R1', 'R2', 'P1', 'P2', 'Q', 'image_width', 'image_height'] if calib_data.get(k) is None] + print(f"Missing keys: {missing_keys}") + return None + + print("Standard YAML calibration data loaded successfully.") + + # 반환 값 구조를 캘리브레이터 클래스에서 저장된 내용과 일치시킵니다. + loaded_data = { + 'cameraMatrix1_result': cameraMatrix1, + 'distCoeffs1_result': distCoeffs1, + 'cameraMatrix2_result': cameraMatrix2, + 'distCoeffs2_result': distCoeffs2, + 'R': R, + 'T': T, + 'R1': R1, + 'R2': R2, + 'P1': P1, + 'P2': P2, + 'Q': Q, + 'image_width': img_width, + 'image_height': img_height, + 'validPixROI1': validPixROI1, # 튜플 그대로 저장 + 'validPixROI2': validPixROI2, # 튜플 그대로 저장 + # 다른 저장된 정보 (reprojection_error, pattern_size, square_size 등)도 필요하면 로드하여 반환 가능 + } + + return loaded_data + + except Exception as e: + print(f"Error loading calibration data from {yaml_file_path}: {e}") + print("This error likely means the YAML file is not in the standard format saved by stereo_calibrator.py.") + return None + + +class StereoPreviewer: + """ + 스테레오 캘리브레이션 결과를 사용하여 실시간 렉티피케이션 미리보기를 제공하고, + 렉티피케이션된 이미지를 캡쳐/저장하는 클래스. + """ + def __init__(self, stereo_calib_yaml: str, left_camera_id: int, right_camera_id: int, + cam_mode: int, hflip: bool, vflip: bool, width: int, height: int, fps: int, + capture_mode: str = 'none', output_rectified_dir_left: str = None, + output_rectified_dir_right: str = None, capture_period_sec: float = 1.0): + """ + StereoPreviewer 클래스를 초기화합니다. + + Args: + stereo_calib_yaml (str): 스테레오 캘리브레이션 결과 YAML 파일 경로. + left_camera_id (int): 왼쪽 카메라 sensor-id. + right_camera_id (int): 오른쪽 카메라 sensor-id. + cam_mode (int): GStreamer 카메라 센서 모드. + hflip (bool): 수평 반전 적용 여부. + vflip (bool): 수직 반전 적용 여부. + width (int): 이미지/파이프라인 해상도 너비 (캘리브레이션 해상도와 일치해야 함). + height (int): 이미지/파이플라인 해상도 높이 (캘리브레이션 해상도와 일치해야 함). + fps (int): 파이프라인 프레임 레이트. + capture_mode (str): 'none', 'manual' ('c' 키), 'periodic' (주기적). 기본값 'none'. + output_rectified_dir_left (str): 렉티피케이션 왼쪽 이미지 저장 디렉터리. + output_rectified_dir_right (str): 렉티피케이션 오른쪽 이미지 저장 디렉터리. + capture_period_sec (float): 'periodic' 모드 시 캡쳐 주기 (초). + """ + self.stereo_calib_yaml = stereo_calib_yaml + self.left_camera_id = left_camera_id + self.right_camera_id = right_camera_id + self.cam_mode = cam_mode + self.hflip = hflip + self.vflip = vflip + self.width = width + self.height = height + self.fps = fps + self.img_size = (width, height) + + self.capture_mode = capture_mode.lower() + self.output_rectified_dir_left = output_rectified_dir_left + self.output_rectified_dir_right = output_rectified_dir_right + self.capture_period_sec = capture_period_sec + + # 캘리브레이션 데이터 저장 변수 + self.calib_data = None + self.map1_left = None + self.map2_left = None + self.map1_right = None + self.map2_right = None + + # GStreamer 파이프라인 및 앱싱크 저장 변수 + self.left_pipeline = None + self.left_appsink = None + self.right_pipeline = None + self.right_appsink = None + + # 캡쳐 디렉터리 생성 (캡쳐 모드일 경우) + if self.capture_mode in ['manual', 'periodic']: + if not self.output_rectified_dir_left or not self.output_rectified_dir_right: + print("Error: Capture mode requires output directories (--output_rectified_dir_left, --output_rectified_dir_right).") + # 이 시점에서 종료하는 대신, run_preview에서 오류를 처리합니다. + self.capture_mode = 'none' # 캡쳐 모드 비활성화 + else: + try: + os.makedirs(self.output_rectified_dir_left, exist_ok=True) + os.makedirs(self.output_rectified_dir_right, exist_ok=True) + print(f"Rectified images will be saved to: {self.output_rectified_dir_left} and {self.output_rectified_dir_right}") + except Exception as e: + print(f"Error creating output directories: {e}. Disabling capture mode.") + self.capture_mode = 'none' # 캡쳐 모드 비활성화 + + + def _load_calibration_data(self): + """스테레오 캘리브레이션 YAML 파일을 로드합니다.""" + # (load_stereo_calibration_yaml_standard 함수는 클래스 외부에 정의되어 있다고 가정) + print("\n--- Loading Stereo Calibration Data ---") + calib_file_path = Path(self.stereo_calib_yaml) + self.calib_data = load_stereo_calibration_yaml_standard(calib_file_path) + + if self.calib_data is None: + print("Failed to load stereo calibration data.") + return False # 로드 실패 + + # 로드된 캘리브레이션 이미지 사이즈와 현재 미리보기 해상도 비교 + img_size_calibrated = (self.calib_data.get('image_width'), self.calib_data.get('image_height')) + if self.img_size != img_size_calibrated: + print(f"Warning: Current preview size ({self.img_size}) does not match calibration file size ({img_size_calibrated}).") + print("Rectification maps will be created for the current preview size, but results may be inaccurate.") + print(f"Consider running the preview with --width {img_size_calibrated[0]} --height {img_size_calibrated[1]}.") + + + print("Stereo calibration data loaded successfully.") + return True # 로드 성공 + + + def _build_pipelines(self): + """양쪽 카메라의 GStreamer 파이프라인을 생성합니다.""" + print("\nBuilding GStreamer pipelines for preview...") + # build_gst_pipeline 함수는 이전에 정의된 helper 함수입니다. + # 여기에 포함시키거나 별도 유틸리티 파일에서 임포트합니다. 포함시키는 것으로 합니다. + + if Gst.ElementFactory.find('nvvideoconvert'): + convert_plugin = 'nvvideoconvert' + deepstream_available = True + elif Gst.ElementFactory.find('nvvidconv'): + convert_plugin = 'nvvidconv' + deepstream_available = False + else: + print("Error: Neither 'nvvideoconvert' nor 'nvvidconv' plugins found! " + "Please install DeepStream or Jetson GStreamer extensions.") + return False + + print(f"🔌 Video converter plugin: {convert_plugin} " + f"(DeepStream available: {deepstream_available})") + + def build_single_pipeline(camera_id, cam_mode, hflip, vflip, width, height, fps): + pipeline_name = f"pipeline_cam{camera_id}_preview" + pipeline = Gst.Pipeline.new(pipeline_name) + src = Gst.ElementFactory.make('nvarguscamerasrc', f'source_{camera_id}_preview') + queue1 = Gst.ElementFactory.make('queue', f'queue1_{camera_id}_preview') + caps_filter = Gst.ElementFactory.make('capsfilter', f'caps_filter_{camera_id}_preview') + queue2 = Gst.ElementFactory.make('queue', f'queue2_{camera_id}_preview') + video_convert = Gst.ElementFactory.make(convert_plugin, f'video_convert_{camera_id}_preview') + queue3 = Gst.ElementFactory.make('queue', f'queue3_{camera_id}_preview') + caps_filter2 = Gst.ElementFactory.make('capsfilter', f'caps_filter2_{camera_id}_preview') + queue4 = Gst.ElementFactory.make('queue', f'queue4_{camera_id}_preview') + appsink = Gst.ElementFactory.make('appsink', f'appsink_{camera_id}_preview') + + if not all([pipeline, src, queue1, caps_filter, queue2, video_convert, queue3, caps_filter2, queue4, appsink]): + print(f"GStreamer 요소 생성 실패 (카메라 ID: {camera_id}).") + return None, None + + Gst.util_set_object_arg(src, "sensor-id", f"{camera_id}") + Gst.util_set_object_arg(src, "bufapi-version", "true") + Gst.util_set_object_arg(src, "sensor-mode", f"{cam_mode}") + + caps_str = f"video/x-raw(memory:NVMM), width=(int){width}, height=(int){height}, framerate=(fraction){fps}/1" + Gst.util_set_object_arg(caps_filter, "caps", caps_str) + + if hflip and vflip: flip_method = "2" + elif hflip: flip_method = "4" + elif vflip: flip_method = "6" + else: flip_method = "0" + Gst.util_set_object_arg(video_convert, "flip-method", flip_method) + + caps_str2 = "video/x-raw, format=(string)BGRx" + Gst.util_set_object_arg(caps_filter2, "caps", caps_str2) + + appsink.set_property("emit-signals", False) + appsink.set_property("max-buffers", 1) + appsink.set_property("drop", True) + appsink_caps = Gst.Caps.from_string(f"video/x-raw, format=(string)BGRx, width=(int){width}, height=(int){height}") + appsink.set_property("caps", appsink_caps) + + pipeline.add(src) + pipeline.add(queue1) + pipeline.add(caps_filter) + pipeline.add(queue2) + pipeline.add(video_convert) + pipeline.add(queue3) + pipeline.add(caps_filter2) + pipeline.add(queue4) + pipeline.add(appsink) + + if not link_elements(src, queue1, caps_filter, queue2, video_convert, queue3, caps_filter2, queue4, appsink): + print(f"GStreamer 요소 연결 실패 (카메라 ID: {camera_id}).") + pipeline.set_state(Gst.State.NULL) + return None, None + + # print(f"GStreamer pipeline built for camera {camera_id}.") + return pipeline, appsink + + + self.left_pipeline, self.left_appsink = build_single_pipeline( + self.left_camera_id, self.cam_mode, self.hflip, self.vflip, + self.width, self.height, self.fps + ) + self.right_pipeline, self.right_appsink = build_single_pipeline( + self.right_camera_id, self.cam_mode, self.hflip, self.vflip, + self.width, self.height, self.fps + ) + + if self.left_pipeline is None or self.right_pipeline is None: + print("Failed to build one or both GStreamer pipelines for preview.") + return False # 파이프라인 빌드 실패 + print("Pipelines built successfully.") + return True # 파이프라인 빌드 성공 + + + def _start_pipelines(self): + """양쪽 GStreamer 파이프라인을 PLAYING 상태로 시작합니다.""" + if self.left_pipeline is None or self.right_pipeline is None: + print("Error: Pipelines not built. Cannot start.") + return False + + print("Setting pipelines to PLAYING state...") + ret_left = self.left_pipeline.set_state(Gst.State.PLAYING) + ret_right = self.right_pipeline.set_state(Gst.State.PLAYING) + + if ret_left == Gst.StateChangeReturn.FAILURE or ret_right == Gst.StateChangeReturn.FAILURE: + print("Error: Failed to set one or both pipelines to PLAYING state.") + return False + + # PLAYING 상태로 전환될 때까지 약간 대기 + time.sleep(1.0) + print("Pipelines are PLAYING.") + return True + + def _stop_pipelines(self): + """양쪽 GStreamer 파이프라인을 NULL 상태로 중지하고 해제합니다.""" + print("\nSetting GStreamer pipelines to NULL state...") + if self.left_pipeline: + self.left_pipeline.set_state(Gst.State.NULL) + self.left_pipeline = None + self.left_appsink = None + if self.right_pipeline: + self.right_pipeline.set_state(Gst.State.NULL) + self.right_pipeline = None + self.right_appsink = None + print("GStreamer pipelines set to NULL state.") + + def _create_rectification_maps(self): + """로드된 캘리브레이션 데이터로 렉티피케이션 매핑 테이블을 생성합니다.""" + if self.calib_data is None: + print("Error: Calibration data not loaded. Cannot create rectification maps.") + return False + + # calib_data에서 필요한 값 추출 + cameraMatrix1 = self.calib_data.get('cameraMatrix1_result') + distCoeffs1 = self.calib_data.get('distCoeffs1_result') + cameraMatrix2 = self.calib_data.get('cameraMatrix2_result') + distCoeffs2 = self.calib_data.get('distCoeffs2_result') + R1 = self.calib_data.get('R1') + P1 = self.calib_data.get('P1') + R2 = self.calib_data.get('R2') + P2 = self.calib_data.get('P2') + # 맵 생성은 현재 미리보기 해상도 (self.img_size) 기준으로 이루어집니다. + + if cameraMatrix1 is None or distCoeffs1 is None or cameraMatrix2 is None or distCoeffs2 is None or \ + R1 is None or P1 is None or R2 is None or P2 is None: + print("Error: Missing essential calibration data for map creation.") + return False + + print("Creating rectification maps for size:", self.img_size) + try: + # cv2.initUndistortRectifyMap(cameraMatrix, distCoeffs, R, P, newImageSize, m1type) + self.map1_left, self.map2_left = cv2.initUndistortRectifyMap( + cameraMatrix1, distCoeffs1, R1, P1, self.img_size, cv2.CV_32FC1 + ) + self.map1_right, self.map2_right = cv2.initUndistortRectifyMap( + cameraMatrix2, distCoeffs2, R2, P2, self.img_size, cv2.CV_32FC1 + ) + print("Rectification maps created.") + return True # 맵 생성 성공 + except Exception as e: + print(f"Error creating rectification maps: {e}") + return False # 맵 생성 실패 + + def _save_rectified_pair(self, left_img, right_img): + """렉티피케이션된 이미지 쌍을 파일로 저장합니다.""" + if not self.output_rectified_dir_left or not self.output_rectified_dir_right: + print("Error: Output directories not specified for saving rectified images.") + return False # 저장 실패 + + timestamp_str = datetime.now().strftime('%Y%m%d_%H%M%S_%f')[:-3] # 밀리초까지 포함 + left_filename = os.path.join(self.output_rectified_dir_left, f"rectified_left_{timestamp_str}.png") + right_filename = os.path.join(self.output_rectified_dir_right, f"rectified_right_{timestamp_str}.png") + + try: + cv2.imwrite(left_filename, left_img) + cv2.imwrite(right_filename, right_img) + print(f"Saved rectified pair: {os.path.basename(left_filename)}, {os.path.basename(right_filename)}") + return True # 저장 성공 + except Exception as e: + print(f"Error saving rectified image pair: {e}") + return False # 저장 실패 + + + def run_preview_loop(self): + """실시간으로 렉티피케이션된 스테레오 미리보기를 보여주고 이미지를 캡쳐/저장하는 루프.""" + if self.left_appsink is None or self.right_appsink is None: + print("Error: Appsinks not available. Cannot run preview loop.") + return + + if self.map1_left is None or self.map1_right is None: + print("Error: Rectification maps not created. Cannot run preview.") + return + + print(f"\n--- Starting Rectified Stream Preview ({self.capture_mode.capitalize()} capture) ---") + print("Press 'q' or ESC to quit.") + if self.capture_mode == 'manual': + print("Press 'c' to capture a rectified image pair.") + elif self.capture_mode == 'periodic': + print(f"Capturing rectified images every {self.capture_period_sec:.2f} seconds.") + print(f"Output directories: {self.output_rectified_dir_left}, {self.output_rectified_dir_right}") + + + window_name_rect = "Rectified Stereo Preview" + cv2.namedWindow(window_name_rect, cv2.WINDOW_NORMAL) + cv2.resizeWindow(window_name_rect, self.img_size[0] * 2, self.img_size[1]) # 좌우 이미지 합쳐서 보여줌 + + # validPixROI 정보를 calib_data에서 가져옵니다. + validPixROI1 = self.calib_data.get('validPixROI1') + validPixROI2 = self.calib_data.get('validPixROI2') + + last_capture_time = time.time() # 주기적 캡쳐를 위한 타이머 초기화 + captured_count = 0 # 캡쳐된 이미지 쌍 카운트 + + try: + while True: + # 양쪽 appsink에서 최신 프레임을 가져옴 + left_sample = self.left_appsink.emit("pull-sample") + right_sample = self.right_appsink.emit("pull-sample") + + if left_sample is None or right_sample is None: + if left_sample is None and right_sample is None: + time.sleep(0.001) + continue + + # GStreamer buffer에서 OpenCV 이미지로 변환 (BGRx -> BGR) + try: + # 왼쪽 프레임 + left_buffer = left_sample.get_buffer() + success_l, map_l = left_buffer.map(Gst.MapFlags.READ) + if not success_l: print("Error mapping left buffer"); left_buffer.unmap(map_l); continue + left_frame_bgr = np.frombuffer(map_l.data, dtype=np.uint8).reshape((self.img_size[1], self.img_size[0], 4)) + left_frame_bgr = cv2.cvtColor(left_frame_bgr, cv2.COLOR_BGRA2BGR) + left_buffer.unmap(map_l) + + # 오른쪽 프레임 + right_buffer = right_sample.get_buffer() + success_r, map_r = right_buffer.map(Gst.MapFlags.READ) + if not success_r: print("Error mapping right buffer"); right_buffer.unmap(map_r); continue + right_frame_bgr = np.frombuffer(map_r.data, dtype=np.uint8).reshape((self.img_size[1], self.img_size[0], 4)) + right_frame_bgr = cv2.cvtColor(right_frame_bgr, cv2.COLOR_BGRA2BGR) + right_buffer.unmap(map_r) + + except Exception as e: + print(f"Error processing GStreamer buffer: {e}") + continue + + + # --- 이미지 렉티피케이션 적용 --- + rectified_left = cv2.remap(left_frame_bgr, self.map1_left, self.map2_left, cv2.INTER_LINEAR) + rectified_right = cv2.remap(right_frame_bgr, self.map1_right, self.map2_right, cv2.INTER_LINEAR) + + # 유효 픽셀 영역 표시 (선택 사항) + # if validPixROI1 and validPixROI2: + # x1, y1, w1, h1 = validPixROI1 if isinstance(validPixROI1, tuple) else tuple(validPixROI1) + # x2, y2, w2, h2 = validPixROI2 if isinstance(validPixROI2, tuple) else tuple(validPixROI2) + # cv2.rectangle(rectified_left, (x1, y1), (x1+w1, y1+h1), (0, 0, 255), 2) + # cv2.rectangle(rectified_right, (x2, y2), (x2+w2, y2+h2), (255, 0, 0), 2) + + # 렉티피케이션된 이미지 합쳐서 미리보기 + preview_rectified = np.hstack((rectified_left, rectified_right)) + + # 텍스트 오버레이 + cv2.putText(preview_rectified, "Rectified Stream (Press 'q' or ESC to quit)", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2) + if self.capture_mode != 'none': + cv2.putText(preview_rectified, f"Captured: {captured_count}", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2) + if self.capture_mode == 'periodic': + cv2.putText(preview_rectified, f"Period: {self.capture_period_sec:.2f}s", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255,255,255), 2) + + + cv2.imshow(window_name_rect, preview_rectified) + + # --- 이미지 캡쳐 로직 --- + current_time = time.time() + trigger_capture = False + + if self.capture_mode == 'manual': + key = cv2.waitKey(1) & 0xFF + if key == ord('c'): + trigger_capture = True + elif key == ord('q') or key == 27: # 'q' 또는 ESC + print("Quit signal received. Exiting rectified stream.") + break # 루프 종료 + elif self.capture_mode == 'periodic': + key = cv2.waitKey(1) & 0xFF # 주기 모드에서도 키 입력 확인 + if key == ord('q') or key == 27: # 'q' 또는 ESC + print("Quit signal received. Exiting rectified stream.") + break # 루프 종료 + + if (current_time - last_capture_time) >= self.capture_period_sec: + trigger_capture = True + last_capture_time = current_time # 타이머 리셋 + + # 캡쳐 트리거 발생 시 이미지 저장 + if trigger_capture: + if self._save_rectified_pair(rectified_left, rectified_right): + captured_count += 1 + + + except Exception as e: + print(f"An error occurred during the rectified stream loop: {e}") + import traceback + traceback.print_exc() + + finally: + cv2.destroyAllWindows() + print("Rectified stream preview ended.") + + + def run_preview(self): + """ + 미리보기 실행의 전체 과정을 조율합니다. + """ + print(f"--- Starting Stereo Preview Process ---") + + # 1. 캘리브레이션 데이터 로드 + if not self._load_calibration_data(): + print("Failed to load calibration data. Preview aborted.") + return # 전체 프로세스 실패 + + # 2. GStreamer 파이프라인 빌드 및 시작 + if not self._build_pipelines(): + print("Failed to build pipelines. Preview aborted.") + return # 전체 프로세스 실패 + + if not self._start_pipelines(): + print("Failed to start pipelines. Preview aborted.") + self._stop_pipelines() + return # 전체 프로세스 실패 + + # 3. 렉티피케이션 맵 생성 + if not self._create_rectification_maps(): + print("Failed to create rectification maps. Preview aborted.") + self._stop_pipelines() + return # 전체 프로세스 실패 + + # 4. 미리보기 루프 실행 및 캡쳐 (루프 안에 캡쳐 로직 포함) + self.run_preview_loop() + + # 5. 파이프라인 중지 + self._stop_pipelines() + + print("\nStereo Preview Process Completed.") + + +def main(): + """ + 스크립트 단독 실행 시 스테레오 렉티피케이션 미리보기를 수행하며 선택적으로 이미지를 캡쳐하는 main 함수. + """ + parser = argparse.ArgumentParser(description="스테레오 카메라 렉티피케이션 미리보기 및 이미지 캡쳐") + Gst.init(sys.argv) # GStreamer 초기화 + + parser.add_argument('--stereo_calib_yaml', type=str, default='params/stereo_calibration_results.yaml', + help="스테레오 캘리브레이션 결과 YAML 파일 경로") + parser.add_argument('--left_camera', type=int, default=1, help="왼쪽 카메라 번호 (sensor-id)") + parser.add_argument('--right_camera', type=int, default=0, help="오른쪽 카메라 번호 (sensor-id)") + # 미리보기 해상도는 캘리브레이션 해상도와 일치해야 합니다. + parser.add_argument('--width', type=int, default=320, help="미리보기/파이프라인 해상도 너비 (캘리브레이션 해상도와 일치해야 함)") + parser.add_argument('--height', type=int, default=256, help="미리보기/파이프라인 해상도 높이 (캘리브레이션 해상도와 일치해야 함)") + parser.add_argument('--fps', type=int, default=15, help="파이프라인 프레임 레이트") + parser.add_argument('--camera_mode', type=int, default=2, help="카메라 센서 모드 (예: 2)") # 필요한 경우 + parser.add_argument('--hflip', action='store_true', help="수평 반전 활성화") # 필요한 경우 + parser.add_argument('--vflip', action='store_true', help="수직 반전 활성화") # 필요한 경우 + + + # --- 렉티피케이션 이미지 캡쳐 관련 인자 --- + parser.add_argument('--capture_mode', type=str, default='manual', + choices=['none', 'manual', 'periodic'], + help="렉티피케이션 이미지 캡쳐 모드: 'none' (캡쳐 안함), 'manual' ('c' 키), 'periodic' (주기적).") + parser.add_argument('--output_rectified_dir_left', type=str, default='rect_images_0', + help="[capture_mode 'manual' 또는 'periodic' 시 필수] 렉티피케이션 왼쪽 이미지 저장 디렉터리.") + parser.add_argument('--output_rectified_dir_right', type=str, default='rect_images_1', + help="[capture_mode 'manual' 또는 'periodic' 시 필수] 렉티피케이션 오른쪽 이미지 저장 디렉터리.") + parser.add_argument('--capture_period_sec', type=float, default=0.3, + help="[capture_mode 'periodic' 시 사용] 이미지 캡쳐 주기 (초).") + + + args = parser.parse_args() + + # 캡쳐 모드 시 출력 디렉터리 필수 확인은 클래스 __init__에서 수행합니다. + # 여기서 기본값을 지정할 수도 있습니다. (예: --output_rectified_dir_left calib_rectified_cam0) + + # StereoPreviewer 객체 생성 + previewer = StereoPreviewer( + stereo_calib_yaml=args.stereo_calib_yaml, + left_camera_id=args.left_camera, + right_camera_id=args.right_camera, + cam_mode=args.camera_mode, + hflip=args.hflip, + vflip=args.vflip, + width=args.width, + height=args.height, + fps=args.fps, + capture_mode=args.capture_mode, # 캡쳐 관련 인자 전달 + output_rectified_dir_left=args.output_rectified_dir_left, + output_rectified_dir_right=args.output_rectified_dir_right, + capture_period_sec=args.capture_period_sec + ) + + # 미리보기 실행 (캡쳐 로직 포함) + previewer.run_preview() + + print("\nStereo Preview and/or Capture process finished.") + + +if __name__ == "__main__": + main() \ No newline at end of file