diff --git a/beneuro_pose_estimation/anipose/aniposeTools.py b/beneuro_pose_estimation/anipose/aniposeTools.py index d6e75e7..5ad05f7 100644 --- a/beneuro_pose_estimation/anipose/aniposeTools.py +++ b/beneuro_pose_estimation/anipose/aniposeTools.py @@ -1,14 +1,12 @@ """ Module to carry out Anipose operations TBD: -- test evaluate_reprojection -- check logging +- needs checking after the last changes ------------------------------- conda activate bnp --> in dev: -python -m beneuro_pose_estimation.cli pose-estimation --sessions session_name(s) --> after package installation: -pose pose-estimation --sessions session_name(s) +bnp init # to create the .env file +bnp pose session_name(s) + """ @@ -35,6 +33,8 @@ import toml from anipose.compute_angles import compute_angles from aniposelib.cameras import CameraGroup +from beneuro_pose_estimation.config import _load_config +config = _load_config() def evaluate_reprojection(reprojection_path, predictions_2D_dir, histogram_path=None): @@ -91,7 +91,7 @@ def get_most_recent_calib(session): # Iterate over all calibration folders and extract timestamps calib_folders = [] - calib_vid_dir = Path(params.calib_vid_dir) + calib_vid_dir = config.calibration_videos for folder in calib_vid_dir.iterdir(): if folder.is_dir(): try: @@ -128,8 +128,8 @@ def get_most_recent_calib(session): calib_file_name = Path( f"calibration_{calib_datetime.strftime('%Y_%m_%d_%H_%M')}.toml" ) - calib_file_path = Path(params.calibration_dir) / calib_file_name - print(calib_file_path) + calib_file_path = config.calibration / calib_file_name + logging.debug(str(calib_file_path)) # Create calibration configuration if it doesn't exist if not calib_file_path.exists(): get_calib_file(recent_calib_folder, calib_file_path) @@ -140,7 +140,7 @@ def get_most_recent_calib(session): return calib_file_path -def get_calib_file(calib_videos_dir, calib_save_path, board=params.board): +def get_calib_file(calib_videos_dir, calib_save_path = None, board=params.board): """ Generates calibration file using ChArUco board videos. - get most recent calibration @@ -161,19 +161,20 @@ def get_calib_file(calib_videos_dir, calib_save_path, board=params.board): calib_videos_dir = next( calib_videos_dir.iterdir(), None ) # might want to change this - video_files = os.listdir(calib_videos_dir) + video_files = list(calib_videos_dir.iterdir()) cam_names, vidnames = [], [] reversed_mapping = {v: k for k, v in params.camera_name_mapping.items()} for video_file in video_files: - if video_file.endswith(".avi") or video_file.endswith(".mp4"): - # cam_name = "_".join(video_file.split('_')[:2]) - camera = Path(video_file).stem + if video_file.suffix in [".avi", ".mp4"]: # Check file extension + camera = video_file.stem if camera == "Camera_3": continue cam_name = reversed_mapping.get(camera, camera) - vidnames.append([f"{calib_videos_dir}/{video_file}"]) + vidnames.append([str(video_file)]) # Convert to str if required by downstream methods cam_names.append(cam_name) + if calib_save_path is None: + calib_save_path = config.calibration / "calibration.toml" # Initialize and configure CharucoBoard and CameraGroup cgroup = CameraGroup.from_names(cam_names, fisheye=params.fisheye) cgroup.calibrate_videos(vidnames, board) @@ -185,8 +186,8 @@ def get_calib_file(calib_videos_dir, calib_save_path, board=params.board): def convert_2Dpred_to_h5( sessions, cameras=params.default_cameras, - input_dir=params.predictions_dir, - output_dir=params.complete_projects_dir, + input_dir=None, + output_dir=None, ): """ Converts .slp.predictions.slp files to .h5 analysis files for each session and camera. @@ -195,14 +196,19 @@ def convert_2Dpred_to_h5( sessions = [sessions] if isinstance(cameras, str): cameras = [cameras] + if input_dir is None: + input_dir = config.predictions2D + if output_dir is None: + output_dir = config.predictions3D + for session in sessions: - session_dir = f"{output_dir}/{session}" - os.makedirs(session_dir, exist_ok=True) + session_dir = output_dir/session + session_dir.mkdir(parents=True, exist_ok=True) for camera in cameras: - input_file = f"{input_dir}/{session}_{camera}.slp.predictions.slp" - os.makedirs(f"{session_dir}/{camera}", exist_ok=True) - output_file = f"{session_dir}/{camera}/{session}_{camera}.analysis.h5" - if os.path.exists(output_file): + input_file = input_dir/f"{session}_{camera}.slp.predictions.slp" + output_file = session_dir/camera/f"{session}_{camera}.analysis.h5" + output_file.parent.mkdir(parents=True, exist_ok=True) + if output_file.exists(): logging.info(f"Output file {output_file} already exists. Skipping...") else: try: @@ -437,42 +443,42 @@ def extract_date(session_name): def run_pose_estimation( sessions, - log_file=None, - projects_dir=params.complete_projects_dir, + projects_dir = None, videos_folder=None, - eval=False, + eval=False ): """ Main routing from videos to 3D keypoints and angles. """ - # set_logging(log_file) + if isinstance(sessions, str): sessions = [sessions] + if projects_dir is None: + projects_dir = config.predictions3D for session in sessions: logging.info(f"Running pose estimation on {session}") - project_dir = f"{projects_dir}/{session}" - os.makedirs(project_dir, exist_ok=True) + project_dir = projects_dir / session + project_dir.mkdir(parents=True, exist_ok=True) sleapTools.get_2Dpredictions(session, input_file=videos_folder) - convert_2Dpred_to_h5(session) + convert_2Dpred_to_h5(session) ############################################### - # calib_file_path = get_most_recent_calib("M045_2024_11_20_11_35") calib_file_path = get_most_recent_calib(session) compute_3Dpredictions( session, calib_file_path=calib_file_path, project_dir=project_dir, eval=eval ) - labels_fname = f"{project_dir}/{session}_3d_predictions.csv" + labels_fname = project_dir/f"{session}_3d_predictions.csv" save_to_csv( session, - f"{project_dir}/{session}_pose_estimation_combined.h5", + project_dir/f"{session}_pose_estimation_combined.h5", labels_fname, ) - config_path = f"{project_dir}/config.toml" - if not os.path.exists(config_path): + config_path = project_dir/"config.toml" + if not config_path.exists(): config_path = create_config_file(config_path) config = toml.load(config_path) - angles_csv = f"{project_dir}/{session}_angles.csv" + angles_csv = project_dir/f"{session}_angles.csv" labels_data = pd.read_csv(labels_fname) - print(labels_data.columns) + logging.debug(labels_data.columns) compute_angles(config, labels_fname, angles_csv) logging.info(f"Pose estimation completed for {session}") pose_data = pd.read_csv(labels_fname) @@ -480,7 +486,7 @@ def run_pose_estimation( # Combine pose data and angles data combined_data = pd.concat([pose_data, angles_data], axis=1) - combined_csv = f"{project_dir}/{session}_pose_and_angles.csv" + combined_csv = project_dir/f"{session}_pose_and_angles.csv" # Save the updated CSV combined_data.to_csv(combined_csv, index=False) logging.info(f"Angles computed and combined CSV saved at {combined_csv}") diff --git a/beneuro_pose_estimation/cli.py b/beneuro_pose_estimation/cli.py index 81b8b5e..4355ad9 100644 --- a/beneuro_pose_estimation/cli.py +++ b/beneuro_pose_estimation/cli.py @@ -1,15 +1,15 @@ from pathlib import Path import typer - +from typing import List, Optional from rich import print from beneuro_pose_estimation import params, set_logging from beneuro_pose_estimation.config import _check_config, _get_package_path, \ _check_is_git_track, _check_root, _get_env_path -from beneuro_pose_estimation.sleap.sleapTools import annotate_videos +# from beneuro_pose_estimation.sleap.sleapTools import annotate_videos, get_2Dpredictions from beneuro_pose_estimation.update_bnp import check_for_updates, update_bnp - +from pathlib import Path # Create a Typer app app = typer.Typer( add_completion=False, # Disable the auto-completion options @@ -22,29 +22,75 @@ @app.command() def annotate( - session_name: str = typer.Argument(..., help="Session name to annotate"), + session: str = typer.Argument(..., help="Session name to annotate"), camera: str = typer.Argument(..., help=f"Camera name to annotate. Must be part of {params.default_cameras}"), - pred: bool = typer.Option(True, "--pred/--no-pred", help="Run annotation on prediction or not.", ), + pred: bool = typer.Option(True, "--pred/--no-pred", help="Run annotation on prediction or not." ) ): """ - Annotate sleap project + Create annotation project for the session if it doesn't exist and launch annotation GUI. """ + from beneuro_pose_estimation.sleap.sleapTools import annotate_videos annotate_videos( - sessions=session_name, + sessions=session, cameras=camera, pred=pred) return -def create_annotation_project(): +@app.command() +def create_annotation_projects( + sessions: List[str] = typer.Argument( + ..., help="Session name(s) to annotate. Provide as a single session name or a list of session names." + ), + cameras: List[str] = typer.Option( + None, "--cameras", "-c", help=f"Camera name(s) to annotate. Provide as a single camera name or a list of camera names. Defaults to {params.default_cameras} if not specified." + ), + pred: bool = typer.Option(True, "--pred/--no-pred", help="Run annotation on prediction or not." ) + ): + """ + Create annotation projects for a list of sessions and cameras without launching the GUI. + """ + from beneuro_pose_estimation.sleap.sleapTools import create_annotation_projects + create_annotation_projects(sessions, cameras,pred) return -def run_pose_estimation(): +@app.command() +def pose( + sessions: List[str] = typer.Argument( + ..., help="Session name(s) to run pose estimation on. Provide as a single session name or a list of session names." + ) +): + from beneuro_pose_estimation.anipose.aniposeTools import run_pose_estimation + + run_pose_estimation(sessions) + return -def get_2d_predictions(): +@app.command() +def track_2d( + sessions: List[str] = typer.Argument( + ..., help="Session name(s) to track. Provide as a single session name or a list of session names." + ), + cameras: List[str] = typer.Option( + None, "--cameras", "-c", help=f"Camera name(s) to track. Provide as a single camera name or a list of camera names. Defaults to {params.default_cameras} if not specified." + ), +): + """ + Get 2D predictions for a list of sessions and cameras + """ + if cameras is None: + cameras = params.default_cameras + logger.info(f"No cameras specified. Predictions will be run on all default cameras: {params.default_cameras}") + from beneuro_pose_estimation.sleap.sleapTools import get_2Dpredictions + get_2Dpredictions( + sessions=sessions, + cameras = cameras + + ) + return + # =================================== Updating ========================================== diff --git a/beneuro_pose_estimation/config.py b/beneuro_pose_estimation/config.py index 61c2af1..b8213e1 100644 --- a/beneuro_pose_estimation/config.py +++ b/beneuro_pose_estimation/config.py @@ -3,6 +3,7 @@ """ from pathlib import Path + from rich import print @@ -75,6 +76,21 @@ def load_env(self, env_path: Path): setattr(self, key, Path(value)) def assign_paths(self): + self.recordings_remote = self.REMOTE_PATH / "raw" + self.annotation_party = self.REMOTE_PATH / "processed" / "AnnotationParty" + self.annotations = self.annotation_party / "annotations" + self.models = ( + self.REMOTE_PATH / "raw" / "pose-estimation" / "models" / "h1_new_setup" + ) + self.skeleton_path = ( + self.REPO_PATH / "beneuro_pose_estimation" / "sleap" / "skeleton.json" + ) + self.recordings = self.annotation_party # can change to self.recordings_local + self.predictions2D = self.LOCAL_PATH / "predictions2D" + self.training = self.REMOTE_PATH / "pose-estimation" / "models" / "uren_setup" + self.predictions3D = self.LOCAL_PATH / "predictions3D" + self.calibration_videos = self.REMOTE_PATH / "raw" / "calibration_videos" + self.calibration = self.LOCAL_PATH / "calibration_config" return diff --git a/beneuro_pose_estimation/params.py b/beneuro_pose_estimation/params.py index 1dbdfd9..fc70ac6 100644 --- a/beneuro_pose_estimation/params.py +++ b/beneuro_pose_estimation/params.py @@ -2,40 +2,10 @@ from sleap.info.feature_suggestions import ( FeatureSuggestionPipeline, ) +import cv2 -############### CONFIGURATIONS -######### PATHS - -repo_dir = r"C:\repos-windows\beneuro_pose_estimation" -recordings_dir = r"Z:\live\raw" -# file format: "M043/M043_2024_10_23_11_15/M043_2024_10_23_11_15_cameras/M043_2024_10_23_11_15_camera_1.avi" -projects_dir = "/home/il620/beneuro_pose_estimation/projects" # ? - -## SLEAP paths -slp_annotations_dir = r"C:\repos-windows\beneuro_pose_estimation\projects\annotations" -slp_training_dir = r"C:\repos-windows\beneuro_pose_estimation\projects\training" -predictions_dir = r"C:\repos-windows\beneuro_pose_estimation\projects\predictions" # 2D -slp_models_dir = r"Z:\live\raw\pose-estimation\models\h1_new_setup" # will change this -slp_training_config_path = r"Z:\live\raw\pose-estimation\models\h1_new_setup" - -skeleton_path = rf"{repo_dir}\beneuro_pose_estimation\sleap\skeleton.json" -predicition_eval_dir = ( - r"C:\repos-windows\beneuro_pose_estimation\projects\predictions\evaluation" -) - -# input_2Dpred = slp_annotations_dir # can be recordings_dir or projects_dir or slp_annotations_dir -input_2Dpred = recordings_dir - -## Anipose paths -# path to 3D pose estimation directory -complete_projects_dir = ( - r"C:\repos-windows\beneuro_pose_estimation\projects\complete_projects" -) -# path to calibration videos directory -calib_vid_dir = r"Z:\live\raw\pose-estimation\calibration-videos" # ? -# path to the calibration output file directory -calibration_dir = f"{projects_dir}\calibrations" +############### CONFIGURATIONS #### CAMERAS default_cameras = [ @@ -60,26 +30,20 @@ #### SLEAP config -## SLEAP annotation -sessions_to_annotate = [] +## SLEAP annotation parameters frame_selection_pipeline = FeatureSuggestionPipeline( - per_video=50, + per_video=150, scale=0.25, sample_method="stride", - feature_type="hog", + feature_type="hog", # or brisk brisk_threshold=10, n_components=10, n_clusters=10, - per_cluster=5, + per_cluster=15, ) -## SLEAP training - -training_sessions = [] -## SLEAP 2D predictions -sessions_to_predict = [] -# SLEAP tracking +## SLEAP tracking frames_to_predict = None tracking_options = None @@ -149,9 +113,12 @@ [14, 15], ] -board = CharucoBoard( - 5, 4, square_length=10, marker_length=6, marker_bits=4, dict_size=250 -) +aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_250) + +board = cv2.aruco.CharucoBoard((5, 4), 10, 6, aruco_dict) +# board = CharucoBoard( +# 5, 4, square_length=10, marker_length=6, marker_bits=4, dict_size=250 +# ) fisheye = False diff --git a/beneuro_pose_estimation/sleap/sleapTools.py b/beneuro_pose_estimation/sleap/sleapTools.py index 02c02cc..d56e5fe 100644 --- a/beneuro_pose_estimation/sleap/sleapTools.py +++ b/beneuro_pose_estimation/sleap/sleapTools.py @@ -1,43 +1,21 @@ """ Module for SLEAP processing TBD: -- test evaluation functions - - - --> define paths in repo_path/beneuro_pose_estimation/params.py - --> to run as package (in dev stage): +- training and model evaluation might be better done from GUI +--------------------------------------------------------------- conda activate bnp -cd repo_path - -python -m beneuro_pose_estimation.cli annotate --sessions session_name --cameras camera_name --pred (to launch annotation GUI to annotate; -(if --pred, predictions are run on the annotation videos using the current model so anotations can be made by correcting predictions) -python -m beneuro_pose_estimation.cli predict-2D --sessions session_name --cameras camera_name (to get 2D predictions) -python -m beneuro_pose_estimation.cli visualize-2D --sessions session_name --cameras camera_name (to launch annotation GUI to visualize predictions) -python -m beneuro_pose_estimation.cli create-annotations --sessions session_name --cameras camera_name (to create annotation projects using frame selection pipeline without launching the GUI) -python -m beneuro_pose_estimation.cli train --sessions session_name --cameras camera_name (to train models) -# cameras argument optional - default = params.default_cameras - --> after package installation: -conda activate bnp - -bnp annotate --sessions session_name --cameras camera_name (to launch annotation GUI to annotate) -bnp predict-2D --sessions session_name --cameras camera_name (to get 2D predictions) -bnp visualize-2D --sessions session_name --cameras camera_name (to launch annotation GUI to visualize predictions) -bnp create-annotation-projects --sessions session_name --cameras camera_name (to create annotation projects using frame selection pipeline without launching the GUI) -bnp train --sessions session_name --cameras camera_name (to train models) -# cameras argument optional - default = params.default_cameras +bnp init (to create .env file) +bnp annotate session_name camera_name --pred/--no-pred (to launch annotation GUI to annotate) +bnp track-2d session_name(s) --cameras camera_name(s)(to get 2D predictions) +bnp visualize-2d session_name camera_name (to launch annotation GUI to visualize predictions) - add to cli """ +import json import logging import os import subprocess - -from beneuro_pose_estimation import params, set_logging - -import json +from pathlib import Path import cv2 import matplotlib.pyplot as plt @@ -45,30 +23,36 @@ import pandas as pd import seaborn as sns import sleap -from sleap import Instance, LabeledFrame, Labels, Skeleton, Video +from sleap import Instance, LabeledFrame, Labels, Skeleton, Video, load_file from sleap.io.video import Video +from beneuro_pose_estimation import params, set_logging +from beneuro_pose_estimation.config import _load_config + +config = _load_config() + logger = set_logging(__name__) + def compare_models(models_folder, test_gt_path=None): """ TBD - test """ metrics_list = [] + models_folder = Path(models_folder) - for folder in os.listdir(models_folder): - model_folder = os.path.join(models_folder, folder) - if os.path.isdir(model_folder): + for folder in models_folder.iterdir(): + if folder.is_dir(): try: # Load and evaluate model if test_gt_path is not None: - predictor = sleap.load_model(model_folder) + predictor = sleap.load_model(folder) labels_gt = sleap.load_file(test_gt_path) labels_pr = predictor.predict(labels_gt) metrics = sleap.nn.evals.evaluate(labels_gt, labels_pr) else: - metrics = sleap.load_metrics(model_folder, split="val") + metrics = sleap.load_metrics(folder, split="val") # Flatten metrics into a single row for DataFrame metrics_flat = { @@ -84,7 +68,9 @@ def compare_models(models_folder, test_gt_path=None): except Exception as e: print(f"Error evaluating model in folder {folder}: {e}") - output_csv = f"{models_folder}/metrics.csv" + + # use Path instead of strings + output_csv = models_folder / "metrics.csv" # Create DataFrame from collected metrics metrics_df = pd.DataFrame(metrics_list) metrics_df.to_csv(output_csv, index=False) @@ -131,6 +117,7 @@ def evaluate_model(model_path, test_gt_path=None): TBD - test """ + model_path = Path(model_path) if test_gt_path is not None: predictor = sleap.load_model(model_path) labels_gt = sleap.load_file(test_gt_path) @@ -203,23 +190,33 @@ def select_frames_to_annotate( # Define input video path animal = session.split("_")[0] # video_path = f"{params.recordings_dir}/{animal}/{session}/{session}_cameras/{session}_{params.camera_name_mapping.get(camera, camera)}.avi" - video_path = f"{params.recordings_dir}/{animal}/{session}/{session}_cameras/{session}_{params.camera_name_mapping.get(camera, camera)}.avi" + video_path = ( + config.recordings + / animal + / session + / f"{session}_cameras" + / f"{session}_{params.camera_name_mapping.get(camera, camera)}.avi" + ) try: - video = Video.from_filename(video_path) + video = Video.from_filename(str(video_path)) # Run frames selection pipeline pipeline.run_disk_stage([video]) frame_data = pipeline.run_processing_state() # Define selected frames path - frames_dir = f"{params.slp_annotations_dir}/{session}_annotations/{session}_{camera}_annotations" - os.makedirs(frames_dir, exist_ok=True) + frames_dir = ( + config.annotations + / f"{session}_annotations" + / f"{session}_{camera}_annotations" + ) + frames_dir.mkdir(parents=True, exist_ok=True) # Save selected frames as images in the frame directory for item in frame_data.items: frame_idx = item.frame_idx frame = video.get_frame(frame_idx) plt.imsave( - os.path.join(frames_dir, f"{session}_{camera}_frame_{frame_idx}.png"), + frames_dir / f"{session}_{camera}_frame_{frame_idx}.png", frame, ) @@ -231,7 +228,7 @@ def select_frames_to_annotate( # create new video from the selected frames if new_video_path is None: - new_video_path = f"{frames_dir}/{session}_{camera}_annotations.mp4" + new_video_path = frames_dir / f"{session}_{camera}_annotations.mp4" try: create_video_from_frames(frames_dir, new_video_path) @@ -243,38 +240,37 @@ def select_frames_to_annotate( return -def create_annotation_projects( - sessions=params.sessions_to_annotate, cameras=params.default_cameras -): +def create_annotation_projects(sessions, cameras=None, pred=False): """ create annotation projects for a list of sessions and cameras without launching GUI for annotation """ - cameras = cameras or params.default_cameras if isinstance(sessions, str): sessions = [sessions] + cameras = cameras or params.default_cameras if isinstance(cameras, str): cameras = [cameras] for session in sessions: for camera in cameras: - create_annotation_project(session, camera) + create_annotation_project(session, camera, pred) def create_video_from_frames( frames_dir, video_path, output_width=1280, output_height=720, fps=5 ): # Get a list of PNG image filenames - images = [img for img in os.listdir(frames_dir) if img.endswith(".png")] + images = [img for img in frames_dir.iterdir() if img.suffix == ".png"] # Sort the image filenames to ensure correct order - images = sorted(images) + images = sorted(images, key=lambda x: x.name) # Set the video codec and create VideoWriter object fourcc = cv2.VideoWriter_fourcc(*"mp4v") - video = cv2.VideoWriter(video_path, fourcc, fps, (output_width, output_height)) + video = cv2.VideoWriter(str(video_path), fourcc, fps, (output_width, output_height)) # Iterate through the PNG images and write them to the video for image in images: - frame = cv2.imread(os.path.join(frames_dir, image)) + # Read the image + frame = cv2.imread(str(image)) if frame is not None: # Resize the frame to the desired output size @@ -285,9 +281,9 @@ def create_video_from_frames( else: logging.info(f"Skipping image {image} due to reading error.") + # Delete the processed images for image in images: - image_path = os.path.join(frames_dir, image) - os.remove(image_path) + image.unlink() # This removes the file represented by the Path object # Release the VideoWriter object video.release() @@ -295,7 +291,7 @@ def create_video_from_frames( return -def create_annotation_project(session, camera): +def create_annotation_project(session, camera, pred): """ Create slp project for annotation to launch annotation GUI on * should we initialize instances for all the frames in the annotation video instead of just the first one? @@ -303,27 +299,54 @@ def create_annotation_project(session, camera): # Paths # video_path = f"{params.recordings_dir}/{animal}/{session}/{session}_cameras/{session}_{params.camera_name_mapping.get(camera, camera)}.avi" select_frames_to_annotate(session, camera, params.frame_selection_pipeline) - labels_output_path = f"{params.slp_annotations_dir}/{session}_annotations/{session}_{camera}_annotations/{session}_{camera}.slp" - annotations_dir_path = f"{params.slp_annotations_dir}/{session}_annotations/{session}_{camera}_annotations/" - videos = [vid for vid in os.listdir(annotations_dir_path) if vid.endswith(".mp4")] + annotations_dir = ( + config.annotations / f"{session}_annotations" / f"{session}_{camera}_annotations" + ) + labels_output_path = annotations_dir / f"{session}_{camera}.slp" + videos = [vid for vid in annotations_dir.glob("*.mp4")] # Load skeleton - with open(params.skeleton_path, "r") as f: + with config.skeleton_path.open("r") as f: skeleton_data = json.load(f) skeleton = Skeleton.from_dict(skeleton_data) # Initialize a list of labeled frames labeled_frames = [] for vid in videos: - video = Video.from_filename(annotations_dir_path + vid) + video = Video.from_filename(str(vid)) # Convert Path to string for compatibility instances = [Instance(skeleton=skeleton)] labeled_frame = LabeledFrame(video=video, frame_idx=0, instances=instances) labeled_frames.append(labeled_frame) + + # Create Labels and save the output labels = Labels(labeled_frames) - os.makedirs(os.path.dirname(labels_output_path), exist_ok=True) - labels.save(labels_output_path) - logging.info(f"Sleap project created for session {session},camera {camera}.") + annotations_dir.mkdir(parents=True, exist_ok=True) # Ensure directory exists + labels.save(str(labels_output_path)) # Save the Labels to the .slp file + + if pred: + model_dir = config.models / camera + if not model_dir.exists(): + logging.info(f"Model directory for {camera} does not exist, skipping tracking.") + + else: + model_path = model_dir / "training_config.json" + command = [ + "sleap-track", + str(labels_output_path), + "--video.index", + "0", + "-m", + str(model_path), + "-o", + str(labels_output_path), + ] + logging.info("Running sleap-track on annotation video") + # Run the sleap-track command using subprocess + subprocess.run(command, check=True) + logging.info("Tracking completed\n") + + return @@ -352,13 +375,9 @@ def create_annotation_project_inefficient(session, camera): for item in frame_data.items: frame_idx = item.frame_idx instances = [Instance(skeleton=skeleton)] # Empty instance - labeled_frame = LabeledFrame( - video=video, frame_idx=frame_idx, instances=instances - ) + labeled_frame = LabeledFrame(video=video, frame_idx=frame_idx, instances=instances) labeled_frames.append(labeled_frame) - logging.info( - f"Labeled frame created for {session}, {camera}, frame {frame_idx}" - ) + logging.info(f"Labeled frame created for {session}, {camera}, frame {frame_idx}") # Save the labeled frames to a .slp project file labels = Labels(labeled_frames) @@ -377,8 +396,7 @@ def annotate_videos(sessions, cameras=params.default_cameras, pred=False): ------ """ - sessions = sessions or params.sessions_to_annotate - cameras = cameras or params.default_cameras + if isinstance(sessions, str): sessions = [sessions] if isinstance(cameras, str): @@ -386,41 +404,42 @@ def annotate_videos(sessions, cameras=params.default_cameras, pred=False): # for each session, check if a sleap projects exists already or not for session in sessions: try: - session_dir = f"{params.slp_annotations_dir}/{session}_annotations" - os.makedirs(session_dir, exist_ok=True) + # use Path instead of strings + session_dir = config.annotations / f"{session}_annotations" + session_dir.mkdir(parents=True, exist_ok=True) for camera in cameras: try: - project_dir = f"{session_dir}/{session}_{camera}_annotations" - project_path = f"{project_dir}/{session}_{camera}.slp" - os.makedirs(project_dir, exist_ok=True) - if not os.path.exists(project_path): - create_annotation_project(session, camera) - if pred: - model_dir = f"{params.slp_models_dir}/{camera}" - if not os.path.exists(model_dir): - logging.info( - f"Model directory for {camera} does not exist, skipping." - ) - continue - model_path = f"{model_dir}/training_config.json" - command = [ - "sleap-track", - project_path, - "--video.index", - "0", - "-m", - model_path, - "-o", - project_path, - ] - logging.info("Running sleap-track on annotation video") - # Run the sleap-track command using subprocess - subprocess.run(command, check=True) - logging.info("Tracking completed\n") + project_dir = session_dir / f"{session}_{camera}_annotations" + project_path = project_dir / f"{session}_{camera}.slp" + session_dir.mkdir(parents=True, exist_ok=True) + if not project_path.exists(): + create_annotation_project(session, camera, pred) + # if pred: # moved to create_annotation_project + # model_dir = config.models/ camera + # if not model_dir.exists(): + # logging.info( + # f"Model directory for {camera} does not exist, skipping." + # ) + # continue + # model_path = model_dir/"training_config.json" + # command = [ + # "sleap-track", + # project_path, + # "--video.index", + # "0", + # "-m", + # model_path, + # "-o", + # project_path, + # ] + # logging.info("Running sleap-track on annotation video") + # # Run the sleap-track command using subprocess + # subprocess.run(command, check=True) + # logging.info("Tracking completed\n") logging.info("Launching annotation GUI...") subprocess.run( - ["sleap-label", project_path] + ["sleap-label", str(project_path)] ) # first test if the project is created except Exception as e: logging.error( @@ -435,17 +454,22 @@ def create_training_file(camera, sessions): .slp project for a specific camera - merging all projects for that camera """ # Path to save the combined training project - combined_project_path = f"{params.slp_training_dir}/{camera}.slp" + combined_project_path = config.training / camera / f"{camera}.slp" all_labeled_frames = [] for session in sessions: try: # Define path to the session-specific .slp file - session_slp_path = f"{params.slp_annotations_dir}/{session}_annotations/{session}_{camera}_annotations/{session}_{camera}.slp" + session_slp_path = ( + config.annotations + / f"{session}_annotations" + / f"{session}_{camera}_annotations" + / f"{session}_{camera}.slp" + ) # # Check if the .slp file exists for the session - if os.path.exists(f"{session_slp_path}"): - session_labels = sleap.load_file(session_slp_path) + if session_slp_path.exists(): + session_labels = sleap.load_file(str(session_slp_path)) session_labeled_frames = session_labels.labeled_frames all_labeled_frames.extend(session_labeled_frames) logging.info( @@ -464,8 +488,8 @@ def create_training_file(camera, sessions): # Create a new Labels object with the combined labeled frames combined_labels = Labels(labeled_frames=all_labeled_frames) - # Ensure the directory for the combined project exists - os.makedirs(os.path.dirname(combined_project_path), exist_ok=True) + # Ensure the directory exists + combined_project_path.parent.mkdir(parents=True, exist_ok=True) try: # Save the combined Labels object to a new .slp file combined_labels.save(combined_project_path) @@ -480,22 +504,21 @@ def create_training_projects(sessions, cameras=params.default_cameras): """ creates .slp training projects """ - cameras = cameras or params.default_cameras if isinstance(cameras, str): cameras = [cameras] for camera in cameras: - training_dir = f"{params.slp_training_dir}/{camera}" - os.makedirs(training_dir, exist_ok=True) - labels_file = f"{params.slp_training_dir}/{camera}/{camera}.slp" - config_file = f"{params.slp_training_dir}/{camera}/training_config.json" + training_dir = config.training / camera + training_dir.mkdir(parents=True, exist_ok=True) + labels_file = training_dir / f"{camera}.slp" + config_file = training_dir / "training_config.json" # Check if the .slp file exists; if not, run create_training_file - if not os.path.exists(labels_file): + if not labels_file.exists(): logging.info(f"{labels_file} does not exist. Creating training file...") - create_training_file(camera, sessions) + create_training_file(camera=camera, sessions=sessions) # Ensure configuration file exists - if not os.path.exists(config_file): + if not config_file.exists(): logging.info( f"Configuration file for {camera} does not exist, using default one" ) @@ -518,14 +541,13 @@ def create_training_config_file(config_file): return -def train_models(cameras=params.default_cameras, sessions=params.training_sessions): +def train_models(cameras=params.default_cameras, sessions=None): """ TBD - create config file with training parameters; check if config file exists, if not create it using the parameters in params - test creation of training project - can be done from GUI """ - cameras = cameras or params.default_cameras if isinstance(sessions, str): sessions = [sessions] @@ -537,19 +559,18 @@ def train_models(cameras=params.default_cameras, sessions=params.training_sessio for camera in cameras: # Define paths for model and labels # model_dir = os.path.join(params.slp_models_dir, camera) - training_dir = f"{params.slp_training_dir}/{camera}" - os.makedirs(training_dir, exist_ok=True) - - labels_file = f"{params.slp_training_dir}/{camera}.slp" - config_file = f"{params.slp_training_dir}/models/{camera}/training_config.json" + training_dir = config.training / camera + training_dir.mkdir(parents=True, exist_ok=True) + labels_file = training_dir / f"{camera}.slp" + config_file = training_dir / "training_config.json" # Check if the .slp file exists; if not, run create_training_file - if not os.path.exists(labels_file): + if not labels_file.exists(): logging.info(f"{labels_file} does not exist. Creating training file...") create_training_file(camera, sessions) # Ensure configuration file exists - if not os.path.exists(config_file): + if not config_file.exists(): create_training_config_file(config_file) logging.info(f"Configuration file for {camera} created.") @@ -571,7 +592,7 @@ def select_frames_to_predict(): def get_2Dpredictions( - sessions=params.sessions_to_predict, + sessions, cameras=params.default_cameras, frames=params.frames_to_predict, input_file=None, @@ -587,16 +608,16 @@ def get_2Dpredictions( ## check if the output folder exists # if the arguments are passes as None from cli: - sessions = sessions or params.sessions_to_predict - cameras = cameras or params.default_cameras - frames = frames or params.frames_to_predict + + # cameras = cameras or params.default_cameras + # frames = frames or params.frames_to_predict if isinstance(sessions, str): sessions = [sessions] if isinstance(cameras, str): cameras = [cameras] try: - os.makedirs(os.path.dirname(params.predictions_dir), exist_ok=True) + config.predictions2D.mkdir(parents=True, exist_ok=True) except Exception as e: logging.error(f"Failed to create predictions directory: {e}") return @@ -640,15 +661,15 @@ def get_2Dpredictions( for camera in cameras: model_dir = f"{params.slp_models_dir}/{camera}" if not os.path.exists(model_dir): - logging.info( - f"Model directory for {camera} does not exist, skipping." - ) + logging.info(f"Model directory for {camera} does not exist, skipping.") continue model_path = f"{model_dir}/training_config.json" input_file = ( f"{input_dir}/{params.camera_name_mapping.get(camera, camera)}.avi" ) - output_file = f"{params.predictions_dir}/{sessions[0]}_{camera}.slp.predictions.slp" + output_file = ( + f"{params.predictions_dir}/{sessions[0]}_{camera}.slp.predictions.slp" + ) logging.info(f"Running sleap-track for camera {camera}") logging.info(f"Input file: {input_file}") @@ -687,22 +708,30 @@ def get_2Dpredictions( animal = session.split("_")[0] for camera in cameras: try: - model_dir = f"{params.slp_models_dir}/{camera}" - if not os.path.exists(model_dir): + model_dir = config.models / camera + if not model_dir.exists(): logging.info( f"Model directory for {camera} does not exist, skipping." ) continue - model_path = f"{model_dir}/training_config.json" + model_path = model_dir / "training_config.json" # Different cases for different input directories because different saving formats are used - if "raw" in params.input_2Dpred: - input_file = f"{params.input_2Dpred}/{animal}/{session}/{session}_cameras/{session}_{params.camera_name_mapping.get(camera, camera)}.avi" - elif "annotations" in params.input_2Dpred: - input_file = f"{params.slp_annotations_dir}/{session}_annotations/{session}_{camera}_annotations/{session}_{camera}.slp" - else: - input_file = f"{params.input_2Dpred}/{session}/{camera}/{session}_{camera}.slp" - output_file = f"{params.predictions_dir}/{session}_{camera}.slp.predictions.slp" + input_file = ( + config.recordings + / animal + / session + / f"{session}_cameras" + / f"{session}_{params.camera_name_mapping.get(camera, camera)}.avi" + ) + # if config.recordings == config.recordings_local: # in case the videos are just copied in the local folder without the raw data structure + # input_file = config.recordings/f"{session}_{params.camera_name_mapping.get(camera, camera)}.avi" + # else: + # input_file = f"{params.input_2Dpred}/{session}/{camera}/{session}_{camera}.slp" + + output_file = ( + config.predictions2D / f"{session}_{camera}.slp.predictions.slp" + ) logging.info( f"Running sleap-track for session {session} and camera {camera}" @@ -757,7 +786,7 @@ def visualize_predictions(sessions, cameras=params.default_cameras): for session in sessions: for camera in cameras: predictions_path = ( - f"{params.predictions_dir}/{session}_{camera}.slp.predictions.slp" + config.predictions2D / f"{session}_{camera}.slp.predictions.slp" ) subprocess.run(["sleap-label", predictions_path]) return diff --git a/projects/complete_projects/M045_2024_11_20_11_15/config.toml b/projects/complete_projects/M045_2024_11_20_11_15/config.toml deleted file mode 100644 index 8ff8fde..0000000 --- a/projects/complete_projects/M045_2024_11_20_11_15/config.toml +++ /dev/null @@ -1,9 +0,0 @@ -[angles] -right_knee = [ "hip_center", "right_knee", "right_ankle",] -left_knee = [ "hip_center", "left_knee", "left_ankle",] -right_ankle = [ "right_knee", "right_ankle", "right_foot",] -left_ankle = [ "left_knee", "left_ankle", "left_foot",] -right_wrist = [ "right_elbow", "right_wrist", "right_paw",] -left_wrist = [ "left_elbow", "left_wrist", "left_paw",] -right_elbow = [ "right_shoulder", "right_elbow", "right_wrist",] -left_elbow = [ "left_shoulder", "left_elbow", "left_wrist",]