diff --git a/.gitignore b/.gitignore index 7b6caf3..ba6b398 100644 --- a/.gitignore +++ b/.gitignore @@ -160,3 +160,4 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ +projects/ \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..f2d22e8 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,14 @@ +repos: +- repo: https://github.com/astral-sh/ruff-pre-commit + # Ruff version. + rev: v0.6.8 + hooks: + # Run the linter. + - id: ruff + args: [--select, "E1,E4,F,I,W", --extend-ignore, "W5,W6,F841,W291", --fix, --line-length, "92"] + types: + - python + # Run the formatter. + - id: ruff-format + types: + - python \ No newline at end of file diff --git a/README.md b/README.md index 2f0d225..5a384ea 100644 --- a/README.md +++ b/README.md @@ -19,33 +19,42 @@ environment and separate from `bnd`. # Miniconda $ curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Windows-x86_64.exe -o miniconda.exe; Start-Process -FilePath ".\miniconda.exe" -ArgumentList "/S" -Wait; del miniconda.exe ``` +3. Clone repo and navigate to folder: + ```shell + git clone git@github.com:BeNeuroLab/beneuro_pose_estimation.git + cd ./beneuro_pose_estimation + ``` +4. 
Creating the conda environment + +```shell +conda create -y -n bnp -c conda-forge -c nvidia -c sleap/label/dev -c sleap -c anaconda sleap=1.4.1 + +# Activate the Conda environment +conda activate bnp + +# Remove opencv pypi version to avoid conflicts +pip uninstall -y opencv-python-headless + +# Install the required version of OpenCV +pip install "opencv-contrib-python<4.7.0" + +# Install sleap_anipose and the required version of anipose +pip install sleap_anipose +pip install "anipose<1.1" + +# Upgrade apptools to the latest version +pip install --upgrade apptools + +# Install package in editable form +pip install -e .\ # Windows +``` -3. Creating the conda environment - - This seems to be working: - ```shell - # Create the environment called bnp and install sleap - $ conda create -y -n bnp -c conda-forge -c nvidia -c sleap -c anaconda sleap - $ conda activate bnp - - # Remove opencv pypi version because it conflicts with sleap-anipose and anipose - $ pip uninstall -y opencv-python-headless - - # Install required version - $ pip install "opencv-contrib-python<4.7.0" - - # Install sleap anipose and anipose version 1.0 because we cannot use 1.1 - $ pip install sleap_anipose - $ pip install "anipose<1.1" - $ pip install --upgrade apptools - ``` - - The key package versions are: - ```text - # Name Version Build Channel - anipose 1.0.1 pypi_0 pypi - aniposelib 0.5.1 pypi_0 pypi - sleap-anipose 0.1.8 pypi_0 pypi - opencv-contrib-python 4.6.0.66 pypi_0 pypi - opencv-python 4.10.0.84 pypi_0 pypi - ``` \ No newline at end of file +The key package versions are: +```text +# Name Version Build Channel +anipose 1.0.1 pypi_0 pypi +aniposelib 0.5.1 pypi_0 pypi +sleap-anipose 0.1.8 pypi_0 pypi +opencv-contrib-python 4.6.0.66 pypi_0 pypi +opencv-python 4.10.0.84 pypi_0 pypi +``` \ No newline at end of file diff --git a/beneuro_pose_estimation/__init__.py b/beneuro_pose_estimation/__init__.py index 4a7a74d..b572c68 100644 --- a/beneuro_pose_estimation/__init__.py +++ 
b/beneuro_pose_estimation/__init__.py @@ -1,27 +1,39 @@ -""" -Initialize macro variables and functions -""" import logging -from pathlib import Path - -from rich.logging import RichHandler - -def set_logging(file_path = None, overwrite = True): - frmt = '%(asctime)s - %(levelname)s - %(message)s' - - if file_path is not None: - file_path = Path(file_path) - if overwrite is True and file_path.exists() is True: - file_path.unlink() - logging.basicConfig( - filename=file_path, - level=logging.INFO, - format=frmt, - datefmt='%Y-%m-%d %H:%M:%S' - ) - else: - logging.basicConfig( - handlers=[RichHandler(level="NOTSET")], - level=logging.INFO, - format=frmt, datefmt='%Y-%m-%d %H:%M:%S' - ) +import warnings + + +# Create a logger for the package +def set_logging( + file_name: str, +) -> logging.Logger: + """ + Set project-wide logging + + Parameters + ---------- + file_name: str + Name of the module being logged + + Returns + ------- + logger: logging.Logger + logger object + """ + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + logging.captureWarnings(True) + + logger = logging.getLogger(file_name) + + def custom_warning_handler( + message, category, filename, lineno, file=None, line=None + ): + logger.warning(f"{category.__name__}: {message}") + + # Set the custom handler + warnings.showwarning = custom_warning_handler + + return logger diff --git a/beneuro_pose_estimation/anipose/aniposeTools.py b/beneuro_pose_estimation/anipose/aniposeTools.py index 194f832..d6e75e7 100644 --- a/beneuro_pose_estimation/anipose/aniposeTools.py +++ b/beneuro_pose_estimation/anipose/aniposeTools.py @@ -1,41 +1,43 @@ """ -Module to carry out Anipose operations +Module to carry out Anipose operations TBD: - test evaluate_reprojection - check logging ------------------------------- conda activate bnp -> in dev: -python -m beneuro_pose_estimation.cli pose-estimation --sessions session_name(s) 
+python -m beneuro_pose_estimation.cli pose-estimation --sessions session_name(s) -> after package installation: -pose pose-estimation --sessions session_name(s) +pose pose-estimation --sessions session_name(s) """ -import json -import matplotlib as plt + import logging -from beneuro_pose_estimation import params, set_logging + +import matplotlib as plt + import beneuro_pose_estimation.sleap.sleapTools as sleapTools +from beneuro_pose_estimation import params + if not logging.getLogger().hasHandlers(): - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -from pathlib import Path -import sleap -import numpy as np + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) import os -import h5py -from aniposelib.boards import CharucoBoard -from aniposelib.cameras import CameraGroup -import sleap_anipose as slap -import argparse -import pandas as pd import subprocess from datetime import datetime -from anipose.compute_angles import compute_angles -import toml +from pathlib import Path +import h5py +import numpy as np +import pandas as pd +import sleap_anipose as slap +import toml +from anipose.compute_angles import compute_angles +from aniposelib.cameras import CameraGroup -def evaluate_reprojection(reprojection_path, predictions_2D_dir,histogram_path = None): +def evaluate_reprojection(reprojection_path, predictions_2D_dir, histogram_path=None): """ Plots histogram of reprojection error. 
Not tested @@ -45,21 +47,22 @@ def evaluate_reprojection(reprojection_path, predictions_2D_dir,histogram_path = reprojection_data = np.load(reprojection_path) slap.make_histogram(detection_data, reprojection_data, save_path=histogram_path) logging.info(f"Reprojection histogram saved at {histogram_path}") - - return + + return + def load_2D_predictions(predictions_2D_dir): - ''' + """ TBD - need to check that the order of cameras is the same as in reprojections - ''' + """ predictions_list = [] cameras = params.default_cameras session = os.path.basename(predictions_2D_dir) for camera in cameras: h5_file = f"{predictions_2D_dir}/{session}_{camera}.analysis.h5" - with h5py.File(h5_file, 'r') as f: + with h5py.File(h5_file, "r") as f: # Shape of 'tracks': (1, 2, n_nodes, n_frames) - tracks = f['tracks'][:] # Read the data into memory + tracks = f["tracks"][:] # Read the data into memory tracks = np.squeeze(tracks) # Remove the leading dimension if it is 1 # Rearrange the dimensions to (n_frames, n_nodes, 2) tracks = np.moveaxis(tracks, 1, -1) @@ -72,13 +75,16 @@ def load_2D_predictions(predictions_2D_dir): def get_frame_count(h5_analysis_file): - with h5py.File(h5_analysis_file, 'r') as f: - return f['tracks'].shape[-1] + with h5py.File(h5_analysis_file, "r") as f: + return f["tracks"].shape[-1] + def get_most_recent_calib(session): # Parse session date and time try: - session_datetime = datetime.strptime("_".join(session.split('_')[1:]), "%Y_%m_%d_%H_%M") + session_datetime = datetime.strptime( + "_".join(session.split("_")[1:]), "%Y_%m_%d_%H_%M" + ) except ValueError: logging.error(f"Invalid session format: {session}") return None @@ -90,10 +96,14 @@ def get_most_recent_calib(session): if folder.is_dir(): try: # Extract the datetime from the "Recording_" format - calib_datetime = datetime.strptime("_".join(folder.name.split('_')[2:]), "%Y_%m_%d_%H_%M") + calib_datetime = datetime.strptime( + "_".join(folder.name.split("_")[2:]), "%Y_%m_%d_%H_%M" + ) 
calib_folders.append((calib_datetime, folder)) except ValueError: - logging.warning(f"Invalid calibration folder format: {folder.name}. Skipping.") + logging.warning( + f"Invalid calibration folder format: {folder.name}. Skipping." + ) continue # Sort calibration folders by datetime in descending order @@ -105,15 +115,19 @@ def get_most_recent_calib(session): if calib_datetime < session_datetime: recent_calib_folder = folder break - + if recent_calib_folder is None: logging.warning(f"No valid calibration folders found before session {session}.") return None - logging.info(f"Using calibration folder: {recent_calib_folder} for session {session}") + logging.info( + f"Using calibration folder: {recent_calib_folder} for session {session}" + ) # Generate calibration file path - calib_file_name = Path(f"calibration_{calib_datetime.strftime('%Y_%m_%d_%H_%M')}.toml") + calib_file_name = Path( + f"calibration_{calib_datetime.strftime('%Y_%m_%d_%H_%M')}.toml" + ) calib_file_path = Path(params.calibration_dir) / calib_file_name print(calib_file_path) # Create calibration configuration if it doesn't exist @@ -125,15 +139,16 @@ def get_most_recent_calib(session): return calib_file_path + def get_calib_file(calib_videos_dir, calib_save_path, board=params.board): """ - Generates calibration file using ChArUco board videos. - get most recent calibration + Generates calibration file using ChArUco board videos. 
- get most recent calibration Parameters ---------- calib_videos_dir : str - Directory path containing ChArUco videos for calibration + Directory path containing ChArUco videos for calibration calib_save_path : str Path to save the calibration file board : CharucoBoard @@ -142,30 +157,37 @@ def get_calib_file(calib_videos_dir, calib_save_path, board=params.board): ------- """ - - calib_videos_dir = next(calib_videos_dir.iterdir(), None) # might want to change this - video_files = os.listdir(calib_videos_dir) + + calib_videos_dir = next( + calib_videos_dir.iterdir(), None + ) # might want to change this + video_files = os.listdir(calib_videos_dir) cam_names, vidnames = [], [] reversed_mapping = {v: k for k, v in params.camera_name_mapping.items()} for video_file in video_files: - if video_file.endswith('.avi') or video_file.endswith('.mp4'): + if video_file.endswith(".avi") or video_file.endswith(".mp4"): # cam_name = "_".join(video_file.split('_')[:2]) camera = Path(video_file).stem - if camera =="Camera_3": + if camera == "Camera_3": continue cam_name = reversed_mapping.get(camera, camera) vidnames.append([f"{calib_videos_dir}/{video_file}"]) cam_names.append(cam_name) - + # Initialize and configure CharucoBoard and CameraGroup cgroup = CameraGroup.from_names(cam_names, fisheye=params.fisheye) cgroup.calibrate_videos(vidnames, board) cgroup.dump(calib_save_path) logging.info(f"Calibration file saved at {calib_save_path}") - return + return -def convert_2Dpred_to_h5(sessions, cameras=params.default_cameras, input_dir=params.predictions_dir, output_dir=params.complete_projects_dir): +def convert_2Dpred_to_h5( + sessions, + cameras=params.default_cameras, + input_dir=params.predictions_dir, + output_dir=params.complete_projects_dir, +): """ Converts .slp.predictions.slp files to .h5 analysis files for each session and camera. 
""" @@ -185,20 +207,31 @@ def convert_2Dpred_to_h5(sessions, cameras=params.default_cameras, input_dir=par else: try: subprocess.run( - ["sleap-convert", "--format", "analysis", "-o", output_file, input_file], - check=True + [ + "sleap-convert", + "--format", + "analysis", + "-o", + output_file, + input_file, + ], + check=True, ) logging.info(f"Converted {input_file} to {output_file}") except subprocess.CalledProcessError as e: logging.error(f"Error during conversion for {input_file}: {e}") -def compute_3Dpredictions(session, project_dir, calib_file_path, frame_window=params.frame_window,eval = False): +def compute_3Dpredictions( + session, project_dir, calib_file_path, frame_window=params.frame_window, eval=False +): """ Triangulates 3D predictions from 2D predictions for each session in windows and then combines them. """ cgroup = CameraGroup.load(calib_file_path) - n_frames = get_frame_count(f"{project_dir}/{params.default_cameras[0]}/{session}_{params.default_cameras[0]}.analysis.h5") + n_frames = get_frame_count( + f"{project_dir}/{params.default_cameras[0]}/{session}_{params.default_cameras[0]}.analysis.h5" + ) windows = np.arange(0, n_frames, frame_window) windows = np.append(windows, n_frames) reprojections_list = [] # List to store reprojections from each window @@ -206,7 +239,7 @@ def compute_3Dpredictions(session, project_dir, calib_file_path, frame_window=pa for start, end in zip(windows[:-1], windows[1:]): print(f"Processing frames {start} to {end}") output_file = f"{project_dir}/{session}_triangulation_{start}_{end}.h5" - + slap.triangulate( p2d=project_dir, calib=calib_file_path, @@ -217,57 +250,67 @@ def compute_3Dpredictions(session, project_dir, calib_file_path, frame_window=pa scale_smooth=params.triangulation_params["scale_smooth"], scale_length=params.triangulation_params["scale_length"], scale_length_weak=params.triangulation_params["scale_length_weak"], - reproj_error_threshold=params.triangulation_params["reproj_error_threshold"], + 
reproj_error_threshold=params.triangulation_params[ + "reproj_error_threshold" + ], reproj_loss=params.triangulation_params["reproj_loss"], - n_deriv_smooth=params.triangulation_params["n_deriv_smooth"] + n_deriv_smooth=params.triangulation_params["n_deriv_smooth"], ) - + logging.info(f"3D prediction file created: {output_file}") if eval: reproj_output = slap.reproject( - p3d=output_file, - calib=calib_file_path, - frames=(start, end) + p3d=output_file, calib=calib_file_path, frames=(start, end) ) logging.info(f"Reprojection created for frames {start} to {end}") reprojections_list.append(reproj_output) if reprojections_list: - reprojections_array = np.concatenate(reprojections_list, axis=1) # Concatenate along the frames axis - logging.info(f"Reprojections concatenated with shape: {reprojections_array.shape}") + reprojections_array = np.concatenate( + reprojections_list, axis=1 + ) # Concatenate along the frames axis + logging.info( + f"Reprojections concatenated with shape: {reprojections_array.shape}" + ) save_path = f"{project_dir}/{session}_reprojections.npy" np.save(save_path, reprojections_array) logging.info(f"Reprojections saved to {save_path}") histogram_path = f"{project_dir}/{session}_reprojection_histogram.pdf" - evaluate_reprojection(reprojection_path = save_path,predictions_2D_dir = project_dir, histogram_path = histogram_path) - + evaluate_reprojection( + reprojection_path=save_path, + predictions_2D_dir=project_dir, + histogram_path=histogram_path, + ) - combine_h5_files(session, windows, project_dir,eval) - - - + combine_h5_files(session, windows, project_dir, eval) -def combine_h5_files(session, windows, project_dir,eval = False): +def combine_h5_files(session, windows, project_dir, eval=False): """ Combines multiple .h5 files into one. 
""" combined_file = f"{project_dir}/{session}_pose_estimation_combined.h5" - with h5py.File(combined_file, 'w') as combined_h5: + with h5py.File(combined_file, "w") as combined_h5: for start, end in zip(windows[:-1], windows[1:]): fname = f"{project_dir}/{session}_triangulation_{start}_{end}.h5" if os.path.exists(fname): - with h5py.File(fname, 'r') as f: + with h5py.File(fname, "r") as f: # points3d_data = f['points3d'][:] - points3d_data = f['tracks'] - if 'points3d' not in combined_h5: - combined_dataset = combined_h5.create_dataset('points3d', data=points3d_data, maxshape=(None,) + points3d_data.shape[1:]) + points3d_data = f["tracks"] + if "points3d" not in combined_h5: + combined_dataset = combined_h5.create_dataset( + "points3d", + data=points3d_data, + maxshape=(None,) + points3d_data.shape[1:], + ) else: - combined_dataset.resize((combined_dataset.shape[0] + points3d_data.shape[0]), axis=0) - combined_dataset[-points3d_data.shape[0]:] = points3d_data - + combined_dataset.resize( + (combined_dataset.shape[0] + points3d_data.shape[0]), axis=0 + ) + combined_dataset[-points3d_data.shape[0] :] = points3d_data + logging.info(f"Combined 3D predictions saved at {combined_file}") @@ -276,20 +319,26 @@ def save_to_csv(session, h5_file_path, csv_path): Saves 3D prediction data to CSV format. 
add _error, _score, _ncams,add fnum """ - points3d = h5py.File(h5_file_path, 'r')['points3d'][:] + points3d = h5py.File(h5_file_path, "r")["points3d"][:] fnum = np.arange(points3d.shape[0]) # Generate frame numbers points3d_flat = points3d.reshape((points3d.shape[0], -1)) # Prepare column names for the 3D points - columns = [f"{part}_{axis}" for part in params.body_parts for axis in ("x", "y", "z")] + columns = [ + f"{part}_{axis}" for part in params.body_parts for axis in ("x", "y", "z") + ] # Create the base DataFrame with points3d and frame numbers df = pd.DataFrame(points3d_flat, columns=columns) - df.insert(0, 'fnum', fnum) # Add 'fnum' column as the first column + df.insert(0, "fnum", fnum) # Add 'fnum' column as the first column # Prepare '_error' and '_score' columns for all body parts in bulk - error_columns = {f"{part}_error": 0 for part in params.body_parts} # All '_error' set to 0 - score_columns = {f"{part}_score": 1 for part in params.body_parts} # All '_score' set to 1 + error_columns = { + f"{part}_error": 0 for part in params.body_parts + } # All '_error' set to 0 + score_columns = { + f"{part}_score": 1 for part in params.body_parts + } # All '_score' set to 1 # Create DataFrames for error and score columns error_df = pd.DataFrame(error_columns, index=df.index) @@ -304,9 +353,8 @@ def save_to_csv(session, h5_file_path, csv_path): def get_body_part_connections(constraints, keypoints_dict): - return [ - [keypoints_dict[start], keypoints_dict[end]] for start, end in constraints - ] + return [[keypoints_dict[start], keypoints_dict[end]] for start, end in constraints] + # def create_config_file(project_dir, config_file_name="config.toml", body_parts = params.body_parts, constraints = params.constraints, triangulation_params = params.triangulation_params,angles = params.angles, frame_window = params.frame_window): # """ @@ -329,7 +377,7 @@ def get_body_part_connections(constraints, keypoints_dict): # "angles": angles, # "frame_window": frame_window, # } 
- + # # Save configuration to a JSON file # with open(config_path, "w") as config_file: # json.dump(config_dict, config_file, indent=4) @@ -337,16 +385,15 @@ def get_body_part_connections(constraints, keypoints_dict): # return config_path -def create_config_file(config_path, angles = params.angles): + +def create_config_file(config_path, angles=params.angles): """ Creates a configuration file in the project directory if it doesn't already exist. Uses parameters defined in params.py for setup. """ if not os.path.isfile(config_path): - config = { - "angles": angles - } + config = {"angles": angles} with open(config_path, "w") as f: toml.dump(config, f) @@ -357,19 +404,25 @@ def create_config_file(config_path, angles = params.angles): return config_path -def plot_behaviour(data_path, columns_to_plot, windwow = params.frame_window, frame_start = None, frame_end = None): +def plot_behaviour( + data_path, + columns_to_plot, + windwow=params.frame_window, + frame_start=None, + frame_end=None, +): """ Plots selected body points or angles over time from a CSV file. """ data = pd.read_csv(data_path) - + plt.figure(figsize=(10, 6)) for column in columns_to_plot: if column in data.columns: plt.plot(data.index, data[column], label=column) else: logging.warning(f"Column {column} not found in data.") - + plt.xlabel("Frame Index") plt.ylabel("Value") plt.title("Behaviour over time") @@ -382,7 +435,13 @@ def extract_date(session_name): return "".join(session_name.split("_")[1:4]) -def run_pose_estimation(sessions, log_file = None, projects_dir=params.complete_projects_dir,videos_folder = None, eval = False): +def run_pose_estimation( + sessions, + log_file=None, + projects_dir=params.complete_projects_dir, + videos_folder=None, + eval=False, +): """ Main routing from videos to 3D keypoints and angles. 
""" @@ -393,14 +452,20 @@ def run_pose_estimation(sessions, log_file = None, projects_dir=params.complete_ logging.info(f"Running pose estimation on {session}") project_dir = f"{projects_dir}/{session}" os.makedirs(project_dir, exist_ok=True) - sleapTools.get_2Dpredictions(session,input_file = videos_folder) + sleapTools.get_2Dpredictions(session, input_file=videos_folder) convert_2Dpred_to_h5(session) ############################################### # calib_file_path = get_most_recent_calib("M045_2024_11_20_11_35") calib_file_path = get_most_recent_calib(session) - compute_3Dpredictions(session, calib_file_path=calib_file_path, project_dir = project_dir,eval = eval) + compute_3Dpredictions( + session, calib_file_path=calib_file_path, project_dir=project_dir, eval=eval + ) labels_fname = f"{project_dir}/{session}_3d_predictions.csv" - save_to_csv(session,f"{project_dir}/{session}_pose_estimation_combined.h5", labels_fname) + save_to_csv( + session, + f"{project_dir}/{session}_pose_estimation_combined.h5", + labels_fname, + ) config_path = f"{project_dir}/config.toml" if not os.path.exists(config_path): config_path = create_config_file(config_path) @@ -408,7 +473,7 @@ def run_pose_estimation(sessions, log_file = None, projects_dir=params.complete_ angles_csv = f"{project_dir}/{session}_angles.csv" labels_data = pd.read_csv(labels_fname) print(labels_data.columns) - compute_angles(config,labels_fname, angles_csv ) + compute_angles(config, labels_fname, angles_csv) logging.info(f"Pose estimation completed for {session}") pose_data = pd.read_csv(labels_fname) angles_data = pd.read_csv(angles_csv) @@ -419,5 +484,3 @@ def run_pose_estimation(sessions, log_file = None, projects_dir=params.complete_ # Save the updated CSV combined_data.to_csv(combined_csv, index=False) logging.info(f"Angles computed and combined CSV saved at {combined_csv}") - - diff --git a/beneuro_pose_estimation/cli.py b/beneuro_pose_estimation/cli.py index 28c98b8..81b8b5e 100644 --- 
a/beneuro_pose_estimation/cli.py +++ b/beneuro_pose_estimation/cli.py @@ -1,84 +1,114 @@ -# command line interface -import argparse -from beneuro_pose_estimation import set_logging -from beneuro_pose_estimation.sleap.sleapTools import ( - annotate_videos, - create_annotation_projects, - get_2Dpredictions, - train_models, - visualize_predictions, +from pathlib import Path + +import typer + +from rich import print + +from beneuro_pose_estimation import params, set_logging +from beneuro_pose_estimation.config import _check_config, _get_package_path, \ + _check_is_git_track, _check_root, _get_env_path +from beneuro_pose_estimation.sleap.sleapTools import annotate_videos +from beneuro_pose_estimation.update_bnp import check_for_updates, update_bnp + +# Create a Typer app +app = typer.Typer( + add_completion=False, # Disable the auto-completion options ) -from beneuro_pose_estimation.anipose.aniposeTools import run_pose_estimation - - -def main(): - # Initialize logging - set_logging() - - # Set up main parser - parser = argparse.ArgumentParser( - description="Beneuro Pose Estimation Toolkit", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - subparsers = parser.add_subparsers(dest="command", required=True, help="Available commands") - - # Subcommand: Annotate video - annotate_parser = subparsers.add_parser("annotate", help="Annotate video frames for SLEAP") - annotate_parser.add_argument("--sessions", nargs="+", required=True, help="List of sessions to annotate") - annotate_parser.add_argument("--cameras", nargs="*", help="List of cameras to annotate") - annotate_parser.add_argument( - "--pred", - action="store_true", - help="Run predictions before annotating using an existing model" - ) - # Subcommand: Create annotation projects - annotation_parser = subparsers.add_parser("create-annotations", help="Create annotation projects for SLEAP") - annotation_parser.add_argument("--sessions", nargs="+", required=True, help="List of sessions for annotation 
projects") - annotation_parser.add_argument("--cameras", nargs="*", help="List of cameras for annotation projects") - - # Subcommand: Run pose estimation - pose_parser = subparsers.add_parser("pose-estimation", help="Run full 3D pose estimation") - pose_parser.add_argument("--sessions", nargs="+", required=True, help="List of sessions to process") - - # Subcommand: Run 2D predictions - predict_parser = subparsers.add_parser("predict-2D", help="Run 2D predictions with SLEAP") - predict_parser.add_argument("--sessions", nargs="+", required=True, help="List of sessions to process") - predict_parser.add_argument("--cameras", nargs="*", help="List of cameras to process") - predict_parser.add_argument("--frames", nargs="*", help="Specific frames to predict on") - predict_parser.add_argument("--input_file", type=str, help="Input file to video or directory") - predict_parser.add_argument("--output_file", type=str, help="Output file for predictions") - predict_parser.add_argument("--model_path", type=str, help="Model configuration file path") - - - # Subcommand: Train SLEAP models - train_parser = subparsers.add_parser("train", help="Train SLEAP models") - train_parser.add_argument("--sessions", nargs="+", required=True, help="List of sessions for training") - train_parser.add_argument("--cameras", nargs="*", help="List of cameras for training") - - # Subcommand: Visualize predictions - visualize_parser = subparsers.add_parser("visualize-2D", help="Launch SLEAP label to visualize predictions") - visualize_parser.add_argument("--sessions", nargs="+", required=True, help="List of sessions to process") - visualize_parser.add_argument("--cameras", nargs="*", help="List of cameras to process") - - # Parse arguments - args = parser.parse_args() - - # Dispatch to the appropriate function - if args.command == "annotate": - annotate_videos(sessions=args.sessions, cameras=args.cameras,pred = args.pred) - elif args.command == "create-annotations": - 
create_annotation_projects(sessions=args.sessions, cameras=args.cameras) - elif args.command == "pose-estimation": - run_pose_estimation(sessions=args.sessions) - elif args.command == "predict-2D": - get_2Dpredictions(sessions=args.sessions, cameras=args.cameras, frames=args.frames, input_file =args.input_file, output_file = args.output_file, model_path = args.model_path) - elif args.command == "train": - train_models(sessions=args.sessions, cameras=args.cameras) - elif args.command == "visualize-2D": - visualize_predictions(sessions=args.sessions, cameras=args.cameras) + +logger = set_logging(__name__) + +# ================================== Functionality ========================================= + + +@app.command() +def annotate( + session_name: str = typer.Argument(..., help="Session name to annotate"), + camera: str = typer.Argument(..., help=f"Camera name to annotate. Must be part of {params.default_cameras}"), + pred: bool = typer.Option(True, "--pred/--no-pred", help="Run annotation on prediction or not.", ), +): + """ + Annotate sleap project + """ + annotate_videos( + sessions=session_name, + cameras=camera, + pred=pred) + + return + +def create_annotation_project(): + return + +def run_pose_estimation(): + return + +def get_2d_predictions(): + return + +# =================================== Updating ========================================== + + +@app.command() +def check_updates(): + """ + Check if there are any new commits on the repo's main branch. + """ + logger.info('test_message') + + check_for_updates() + + +@app.command() +def self_update(): + """ + Update the bnd tool by pulling the latest commits from the repo's main branch. + """ + update_bnp() + +# ================================= Initialization ========================================= + +@app.command() +def init(): + """ + Create a .env file to store the paths to the local and remote data storage. 
+ """ + + # check if the file exists + env_path = _get_env_path() + + if env_path.exists(): + print("\n[yellow]Config file already exists.\n") + + _check_config() + else: - parser.print_help() + print("\nConfig file doesn't exist. Let's create one.") + repo_path = _get_package_path() + _check_is_git_track(repo_path) + + local_path = Path( + typer.prompt( + "Enter the absolute path to the root of the local data storage" + ) + ) + _check_root(local_path) + + remote_path = Path( + typer.prompt("Enter the absolute path to the root of remote data storage") + ) + _check_root(remote_path) + + with open(env_path, "w") as f: + f.write(f"REPO_PATH = {repo_path}\n") + f.write(f"LOCAL_PATH = {local_path}\n") + f.write(f"REMOTE_PATH = {remote_path}\n") + + # make sure that it works + _check_config() + + print("\n[green]Config file created successfully.\n") +# Main Entry Point if __name__ == "__main__": - main() + app() diff --git a/beneuro_pose_estimation/config.py b/beneuro_pose_estimation/config.py new file mode 100644 index 0000000..61c2af1 --- /dev/null +++ b/beneuro_pose_estimation/config.py @@ -0,0 +1,88 @@ +""" +Initialize macro variables and functions +""" + +from pathlib import Path +from rich import print + + +def _get_package_path() -> Path: + """ + Returns the path to the package directory. + """ + return Path(__file__).absolute().parent.parent + + +def _get_env_path() -> Path: + """ + Returns the path to the .env file containing the configuration settings. + """ + package_path = _get_package_path() + return package_path / ".env" + + +def _check_is_git_track(repo_path): + folder = Path(repo_path) # Convert to Path object + assert (folder / ".git").is_dir() + + +def _check_root(root_path: Path): + assert root_path.exists(), f"{root_path} does not exist." + assert root_path.is_dir(), f"{root_path} is not a directory." 
+ + files_in_root = [f.stem for f in root_path.iterdir()] + + assert "raw" in files_in_root, f"No raw folder in {root_path}" + + +def _check_config(): + """ + Check that the local and remote root folders have the expected raw and processed folders. + """ + config = _load_config() + + print( + "Checking that local and remote root folders have the expected raw and processed folders..." + ) + + _check_root(config.LOCAL_PATH) + _check_root(config.REMOTE_PATH) + + print("[green]Config looks good.") + + +class Config: + """ + Class to load local configuration + """ + + def __init__(self, env_path=_get_env_path()): + self.load_env(env_path) + self.assign_paths() + + def load_env(self, env_path: Path): + with open(env_path, "r") as file: + for line in file: + # Ignore comments and empty lines + line = line.strip() + if not line or line.startswith("#"): + continue + + # Parse key-value pairs + key, value = map(str.strip, line.split("=", 1)) + + # Set as environment variable + setattr(self, key, Path(value)) + + def assign_paths(self): + return + + +def _load_config() -> Config: + """ + Loads the configuration settings from the .env file and returns it as a Config object. + """ + if not _get_env_path().exists(): + raise FileNotFoundError("Config file not found. 
def main():
    """Entry point for the legacy argparse-based CLI.

    Builds a parser with one sub-command per workflow step (annotation,
    training, 2D prediction, 3D pose estimation, visualization) and
    dispatches to the matching library function.
    """
    # Initialize logging before any command runs.
    set_logging()

    # Set up main parser
    parser = argparse.ArgumentParser(
        description="Beneuro Pose Estimation Toolkit",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # required=True makes argparse itself reject a missing sub-command,
    # so args.command is always one of the names registered below.
    subparsers = parser.add_subparsers(
        dest="command", required=True, help="Available commands"
    )

    # Subcommand: Annotate video
    annotate_parser = subparsers.add_parser(
        "annotate", help="Annotate video frames for SLEAP"
    )
    annotate_parser.add_argument(
        "--sessions", nargs="+", required=True, help="List of sessions to annotate"
    )
    annotate_parser.add_argument(
        "--cameras", nargs="*", help="List of cameras to annotate"
    )
    annotate_parser.add_argument(
        "--pred",
        action="store_true",
        help="Run predictions before annotating using an existing model",
    )

    # Subcommand: Create annotation projects
    annotation_parser = subparsers.add_parser(
        "create-annotations", help="Create annotation projects for SLEAP"
    )
    annotation_parser.add_argument(
        "--sessions",
        nargs="+",
        required=True,
        help="List of sessions for annotation projects",
    )
    annotation_parser.add_argument(
        "--cameras", nargs="*", help="List of cameras for annotation projects"
    )

    # Subcommand: Run full 3D pose estimation
    pose_parser = subparsers.add_parser(
        "pose-estimation", help="Run full 3D pose estimation"
    )
    pose_parser.add_argument(
        "--sessions", nargs="+", required=True, help="List of sessions to process"
    )

    # Subcommand: Run 2D predictions
    predict_parser = subparsers.add_parser(
        "predict-2D", help="Run 2D predictions with SLEAP"
    )
    predict_parser.add_argument(
        "--sessions", nargs="+", required=True, help="List of sessions to process"
    )
    predict_parser.add_argument(
        "--cameras", nargs="*", help="List of cameras to process"
    )
    predict_parser.add_argument(
        "--frames", nargs="*", help="Specific frames to predict on"
    )
    predict_parser.add_argument(
        "--input_file", type=str, help="Input file to video or directory"
    )
    predict_parser.add_argument(
        "--output_file", type=str, help="Output file for predictions"
    )
    predict_parser.add_argument(
        "--model_path", type=str, help="Model configuration file path"
    )

    # Subcommand: Train SLEAP models
    train_parser = subparsers.add_parser("train", help="Train SLEAP models")
    train_parser.add_argument(
        "--sessions", nargs="+", required=True, help="List of sessions for training"
    )
    train_parser.add_argument(
        "--cameras", nargs="*", help="List of cameras for training"
    )

    # Subcommand: Visualize predictions
    visualize_parser = subparsers.add_parser(
        "visualize-2D", help="Launch SLEAP label to visualize predictions"
    )
    visualize_parser.add_argument(
        "--sessions", nargs="+", required=True, help="List of sessions to process"
    )
    visualize_parser.add_argument(
        "--cameras", nargs="*", help="List of cameras to process"
    )

    # Parse arguments
    args = parser.parse_args()

    # Dispatch to the appropriate function
    if args.command == "annotate":
        annotate_videos(sessions=args.sessions, cameras=args.cameras, pred=args.pred)
    elif args.command == "create-annotations":
        create_annotation_projects(sessions=args.sessions, cameras=args.cameras)
    elif args.command == "pose-estimation":
        run_pose_estimation(sessions=args.sessions)
    elif args.command == "predict-2D":
        get_2Dpredictions(
            sessions=args.sessions,
            cameras=args.cameras,
            frames=args.frames,
            input_file=args.input_file,
            output_file=args.output_file,
            model_path=args.model_path,
        )
    elif args.command == "train":
        train_models(sessions=args.sessions, cameras=args.cameras)
    elif args.command == "visualize-2D":
        visualize_predictions(sessions=args.sessions, cameras=args.cameras)
    else:
        # Unreachable with required=True above; kept as a defensive default.
        parser.print_help()


if __name__ == "__main__":
    # Fixed: main() was commented out and leftover debug code
    # (`config = Config(); print(type(config.REPO_PATH))`) ran instead,
    # so invoking the module as a script did nothing useful.
    main()
## SLEAP paths -slp_annotations_dir = "/home/il620/beneuro_pose_estimation/projects/annotations" -slp_training_dir = "/home/il620/beneuro_pose_estimation/projects/training" -predictions_dir = "/home/il620/beneuro_pose_estimation/projects/predictions" #2D -slp_models_dir = "/mnt/rds/bb2020/projects/beneuro/live/raw/pose-estimation/models/h1_new_setup" # will change this -slp_training_config_path = "/mnt/rds/bb2020/projects/beneuro/live/raw/pose-estimation/models/h1_new_setup" - -skeleton_path = f"{repo_dir}/beneuro_pose_estimation/sleap/skeleton.json" -predicition_eval_dir = "/home/il620/beneuro_pose_estimation/projects/predictions/evaluation" +slp_annotations_dir = r"C:\repos-windows\beneuro_pose_estimation\projects\annotations" +slp_training_dir = r"C:\repos-windows\beneuro_pose_estimation\projects\training" +predictions_dir = r"C:\repos-windows\beneuro_pose_estimation\projects\predictions" # 2D +slp_models_dir = r"Z:\live\raw\pose-estimation\models\h1_new_setup" # will change this +slp_training_config_path = r"Z:\live\raw\pose-estimation\models\h1_new_setup" + +skeleton_path = rf"{repo_dir}\beneuro_pose_estimation\sleap\skeleton.json" +predicition_eval_dir = ( + r"C:\repos-windows\beneuro_pose_estimation\projects\predictions\evaluation" +) # input_2Dpred = slp_annotations_dir # can be recordings_dir or projects_dir or slp_annotations_dir input_2Dpred = recordings_dir ## Anipose paths # path to 3D pose estimation directory -complete_projects_dir = "/home/il620/beneuro_pose_estimation/projects/complete_projects" +complete_projects_dir = ( + r"C:\repos-windows\beneuro_pose_estimation\projects\complete_projects" +) # path to calibration videos directory -calib_vid_dir = "/mnt/rds/bb2020/projects/beneuro/live/raw/pose-estimation/calibration-videos" #? +calib_vid_dir = r"Z:\live\raw\pose-estimation\calibration-videos" # ? 
# path to the calibration output file directory -calibration_dir = f"{projects_dir}/calibrations" - +calibration_dir = f"{projects_dir}/calibrations" #### CAMERAS default_cameras = [ - "Camera_Top_Left", - "Camera_Side_Left", - "Camera_Front_Right", - "Camera_Front_Left", - "Camera_Side_Right", - "Camera_Back_Right" - ] - + "Camera_Top_Left", + "Camera_Side_Left", + "Camera_Front_Right", + "Camera_Front_Left", + "Camera_Side_Right", + "Camera_Back_Right", +] + camera_name_mapping = { @@ -53,7 +55,7 @@ "Camera_Face": "camera_3", "Camera_Front_Left": "camera_4", "Camera_Side_Right": "camera_5", - "Camera_Back_Right": "camera_6" + "Camera_Back_Right": "camera_6", } #### SLEAP config @@ -69,7 +71,7 @@ n_components=10, n_clusters=10, per_cluster=5, - ) +) ## SLEAP training training_sessions = [] @@ -77,33 +79,33 @@ ## SLEAP 2D predictions sessions_to_predict = [] -# SLEAP tracking +# SLEAP tracking frames_to_predict = None tracking_options = None #### ANIPOSE config body_parts = [ - "shoulder_center", - "left_shoulder", - "left_paw", - "right_shoulder", - "right_elbow", - "right_paw", - "hip_center", - "left_knee", - "left_ankle", - "left_foot", - "right_knee", - "right_ankle", - "right_foot", - "tail_base", - "tail_middle", - "tail_tip", - "left_elbow", - "left_wrist", - "right_wrist" - ] + "shoulder_center", + "left_shoulder", + "left_paw", + "right_shoulder", + "right_elbow", + "right_paw", + "hip_center", + "left_knee", + "left_ankle", + "left_foot", + "right_knee", + "right_ankle", + "right_foot", + "tail_base", + "tail_middle", + "tail_tip", + "left_elbow", + "left_wrist", + "right_wrist", +] keypoints_dict = { 0: "shoulder_center", @@ -124,13 +126,32 @@ 15: "tail_tip", 16: "left_elbow", 17: "left_wrist", - 18: "right_wrist" + 18: "right_wrist", } -constraints = [[0,1],[0,3],[2,17],[16,17],[1,16],[5,18],[4,18],[6,7],[6,10],[7,8],[8,9],[10,11],[11,12],[6,13],[13,14],[14,15]] - -board = CharucoBoard(5, 4, square_length=10,
marker_length=6, marker_bits=4, dict_size=250) +constraints = [ + [0, 1], + [0, 3], + [2, 17], + [16, 17], + [1, 16], + [5, 18], + [4, 18], + [6, 7], + [6, 10], + [7, 8], + [8, 9], + [10, 11], + [11, 12], + [6, 13], + [13, 14], + [14, 15], +] + +board = CharucoBoard( + 5, 4, square_length=10, marker_length=6, marker_bits=4, dict_size=250 +) fisheye = False @@ -141,21 +162,19 @@ "scale_length_weak": 1, "reproj_error_threshold": 5, "reproj_loss": "l2", - "n_deriv_smooth": 2 + "n_deriv_smooth": 2, } frame_window = 1000 # Angle calculation config angles = { - "right_knee": ["hip_center", "right_knee", "right_ankle"], - "left_knee": ["hip_center", "left_knee", "left_ankle"], - "right_ankle": ["right_knee", "right_ankle", "right_foot"], - "left_ankle": ["left_knee", "left_ankle", "left_foot"], - "right_wrist": ["right_elbow", "right_wrist", "right_paw"], - "left_wrist": ["left_elbow", "left_wrist", "left_paw"], - "right_elbow": ["right_shoulder", "right_elbow", "right_wrist"], - "left_elbow": ["left_shoulder", "left_elbow", "left_wrist"] - } - - + "right_knee": ["hip_center", "right_knee", "right_ankle"], + "left_knee": ["hip_center", "left_knee", "left_ankle"], + "right_ankle": ["right_knee", "right_ankle", "right_foot"], + "left_ankle": ["left_knee", "left_ankle", "left_foot"], + "right_wrist": ["right_elbow", "right_wrist", "right_paw"], + "left_wrist": ["left_elbow", "left_wrist", "left_paw"], + "right_elbow": ["right_shoulder", "right_elbow", "right_wrist"], + "left_elbow": ["left_shoulder", "left_elbow", "left_wrist"], +} diff --git a/beneuro_pose_estimation/sleap/sleapTools.py b/beneuro_pose_estimation/sleap/sleapTools.py index 3d68ef3..02c02cc 100644 --- a/beneuro_pose_estimation/sleap/sleapTools.py +++ b/beneuro_pose_estimation/sleap/sleapTools.py @@ -6,12 +6,12 @@ -> define paths in repo_path/beneuro_pose_estimation/params.py - + -> to run as package (in dev stage): conda activate bnp cd repo_path -python -m beneuro_pose_estimation.cli annotate --sessions 
session_name --cameras camera_name --pred (to launch annotation GUI to annotate; +python -m beneuro_pose_estimation.cli annotate --sessions session_name --cameras camera_name --pred (to launch annotation GUI to annotate; (if --pred, predictions are run on the annotation videos using the current model so anotations can be made by correcting predictions) python -m beneuro_pose_estimation.cli predict-2D --sessions session_name --cameras camera_name (to get 2D predictions) python -m beneuro_pose_estimation.cli visualize-2D --sessions session_name --cameras camera_name (to launch annotation GUI to visualize predictions) @@ -30,42 +30,33 @@ # cameras argument optional - default = params.default_cameras """ -import sys + +import logging import os import subprocess -from pathlib import Path + from beneuro_pose_estimation import params, set_logging -import logging -if not logging.getLogger().hasHandlers(): - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') -import attr -import numpy as np -import random -import json -import matplotlib.pyplot as plt -import cv2 + import json -from typing import Dict, List, Optional, Union -from sleap.io.video import Video -from sleap import Labels, Video, LabeledFrame, Instance, Skeleton -from sleap.info.feature_suggestions import ( - FeatureSuggestionPipeline, - ParallelFeaturePipeline, -) -import sleap -import seaborn as sns +import cv2 +import matplotlib.pyplot as plt +import numpy as np import pandas as pd -# import argparse +import seaborn as sns +import sleap +from sleap import Instance, LabeledFrame, Labels, Skeleton, Video +from sleap.io.video import Video +logger = set_logging(__name__) -def compare_models(models_folder,test_gt_path = None): +def compare_models(models_folder, test_gt_path=None): """ TBD - test """ metrics_list = [] - + for folder in os.listdir(models_folder): model_folder = os.path.join(models_folder, folder) if os.path.isdir(model_folder): @@ -78,7 +69,7 @@ def 
compare_models(models_folder,test_gt_path = None): metrics = sleap.nn.evals.evaluate(labels_gt, labels_pr) else: metrics = sleap.load_metrics(model_folder, split="val") - + # Flatten metrics into a single row for DataFrame metrics_flat = { "Model": folder, @@ -88,9 +79,9 @@ def compare_models(models_folder,test_gt_path = None): metrics_flat[key] = value elif isinstance(value, (list, np.ndarray)): metrics_flat[key] = np.mean(value) # Use mean for lists/arrays - + metrics_list.append(metrics_flat) - + except Exception as e: print(f"Error evaluating model in folder {folder}: {e}") output_csv = f"{models_folder}/metrics.csv" @@ -98,14 +89,15 @@ def compare_models(models_folder,test_gt_path = None): metrics_df = pd.DataFrame(metrics_list) metrics_df.to_csv(output_csv, index=False) print(f"Metrics comparison saved to {output_csv}") - - return metrics_df -def find_best_models(metrics_df,metric = None): + return metrics_df + + +def find_best_models(metrics_df, metric=None): """ Print the best model for each metric. TBD - test - + Args: metrics_df (pd.DataFrame): DataFrame containing metrics for all models. 
""" @@ -114,7 +106,7 @@ def find_best_models(metrics_df,metric = None): if metric not in metrics_df.columns: print(f"Metric '{metric}' not found in the DataFrame.") return - + if pd.api.types.is_numeric_dtype(metrics_df[metric]): best_index = metrics_df[metric].idxmax() best_model = metrics_df.iloc[best_index]["Model"] @@ -128,12 +120,13 @@ def find_best_models(metrics_df,metric = None): if column != "Model" and pd.api.types.is_numeric_dtype(metrics_df[column]): best_index = metrics_df[column].idxmax() best_models[column] = metrics_df.iloc[best_index]["Model"] - + print("Best models for each metric:") for metric_name, model in best_models.items(): print(f"{metric_name}: {model}") -def evaluate_model(model_path,test_gt_path = None): + +def evaluate_model(model_path, test_gt_path=None): """ TBD - test @@ -146,82 +139,116 @@ def evaluate_model(model_path,test_gt_path = None): else: metrics = sleap.load_metrics(model_path, split="val") plt.figure(figsize=(6, 3), dpi=150, facecolor="w") - sns.histplot(metrics["dist.dists"].flatten(), binrange=(0, 20), kde=True, kde_kws={"clip": (0, 20)}, stat="probability") + sns.histplot( + metrics["dist.dists"].flatten(), + binrange=(0, 20), + kde=True, + kde_kws={"clip": (0, 20)}, + stat="probability", + ) plt.xlabel("Localization error (px)") plt.figure(figsize=(6, 3), dpi=150, facecolor="w") - sns.histplot(metrics["oks_voc.match_scores"].flatten(), binrange=(0, 1), kde=True, kde_kws={"clip": (0, 1)}, stat="probability") + sns.histplot( + metrics["oks_voc.match_scores"].flatten(), + binrange=(0, 1), + kde=True, + kde_kws={"clip": (0, 1)}, + stat="probability", + ) plt.xlabel("Object Keypoint Similarity") plt.figure(figsize=(4, 4), dpi=150, facecolor="w") - for precision, thresh in zip(metrics["oks_voc.precisions"][::2], metrics["oks_voc.match_score_thresholds"][::2]): - plt.plot(metrics["oks_voc.recall_thresholds"], precision, "-", label=f"OKS @ {thresh:.2f}") + for precision, thresh in zip( + metrics["oks_voc.precisions"][::2], + 
def select_frames_to_annotate(
    session, camera, pipeline=params.frame_selection_pipeline, new_video_path=None
):
    """Select informative frames for annotation and build an annotation video.

    - Runs the SLEAP feature-suggestion pipeline on the session/camera video,
    - Saves each suggested frame as a .png under the annotations directory,
    - Concatenates the saved frames into a small .mp4 for faster annotation.

    Parameters
    ----------
    session : str
        Session name, e.g. "M043_2024_10_23_11_15"; the animal id is taken
        as the prefix before the first underscore.
    camera : str
        Camera name (translated through params.camera_name_mapping).
    pipeline
        Frame-selection pipeline. NOTE(review): the default is a module-level
        object shared across calls — presumably safe to reuse; confirm the
        pipeline holds no per-video state between runs.
    new_video_path : str | None
        Target path for the annotation video; defaults to a file inside the
        frames directory.

    Errors in either stage are logged and swallowed (best-effort behavior).
    """
    # Define input video path
    animal = session.split("_")[0]
    video_path = (
        f"{params.recordings_dir}/{animal}/{session}/{session}_cameras/"
        f"{session}_{params.camera_name_mapping.get(camera, camera)}.avi"
    )
    # Fixed: frames_dir is computed before the try-block. Previously it was
    # assigned inside the try, so a failure while loading the video caused a
    # NameError below instead of the intended logged error + best-effort exit.
    frames_dir = (
        f"{params.slp_annotations_dir}/{session}_annotations/"
        f"{session}_{camera}_annotations"
    )
    try:
        video = Video.from_filename(video_path)

        # Run frames selection pipeline
        pipeline.run_disk_stage([video])
        frame_data = pipeline.run_processing_state()

        os.makedirs(frames_dir, exist_ok=True)

        # Save selected frames as images in the frame directory
        for item in frame_data.items:
            frame_idx = item.frame_idx
            frame = video.get_frame(frame_idx)
            plt.imsave(
                os.path.join(frames_dir, f"{session}_{camera}_frame_{frame_idx}.png"),
                frame,
            )

        logging.info(f"Selected frames saved for {session}, {camera}")
    except Exception as e:
        logging.error(
            f"An error occurred while selecting frames to annotate for session {session}, camera {camera}: {e}"
        )

    # create new video from the selected frames
    if new_video_path is None:
        new_video_path = f"{frames_dir}/{session}_{camera}_annotations.mp4"

    try:
        create_video_from_frames(frames_dir, new_video_path)
    except Exception as e:
        logging.error(
            f"An error occurred while creating the annotation video for session {session}, camera {camera}: {e}"
        )

    return
-229,10 +256,12 @@ def create_annotation_projects(sessions = params.sessions_to_annotate,cameras = cameras = [cameras] for session in sessions: for camera in cameras: - create_annotation_project(session,camera) - + create_annotation_project(session, camera) + -def create_video_from_frames(frames_dir, video_path, output_width=1280, output_height=720, fps=5): +def create_video_from_frames( + frames_dir, video_path, output_width=1280, output_height=720, fps=5 +): # Get a list of PNG image filenames images = [img for img in os.listdir(frames_dir) if img.endswith(".png")] @@ -240,7 +269,7 @@ def create_video_from_frames(frames_dir, video_path, output_width=1280, output_h images = sorted(images) # Set the video codec and create VideoWriter object - fourcc = cv2.VideoWriter_fourcc(*'mp4v') + fourcc = cv2.VideoWriter_fourcc(*"mp4v") video = cv2.VideoWriter(video_path, fourcc, fps, (output_width, output_height)) # Iterate through the PNG images and write them to the video @@ -266,25 +295,23 @@ def create_video_from_frames(frames_dir, video_path, output_width=1280, output_h return - def create_annotation_project(session, camera): - ''' + """ Create slp project for annotation to launch annotation GUI on * should we initialize instances for all the frames in the annotation video instead of just the first one? 
- ''' + """ # Paths # video_path = f"{params.recordings_dir}/{animal}/{session}/{session}_cameras/{session}_{params.camera_name_mapping.get(camera, camera)}.avi" - select_frames_to_annotate(session,camera,params.frame_selection_pipeline) + select_frames_to_annotate(session, camera, params.frame_selection_pipeline) labels_output_path = f"{params.slp_annotations_dir}/{session}_annotations/{session}_{camera}_annotations/{session}_{camera}.slp" annotations_dir_path = f"{params.slp_annotations_dir}/{session}_annotations/{session}_{camera}_annotations/" videos = [vid for vid in os.listdir(annotations_dir_path) if vid.endswith(".mp4")] - # Load skeleton - with open(params.skeleton_path, 'r') as f: + with open(params.skeleton_path, "r") as f: skeleton_data = json.load(f) skeleton = Skeleton.from_dict(skeleton_data) - + # Initialize a list of labeled frames labeled_frames = [] for vid in videos: @@ -296,17 +323,16 @@ def create_annotation_project(session, camera): os.makedirs(os.path.dirname(labels_output_path), exist_ok=True) labels.save(labels_output_path) logging.info(f"Sleap project created for session {session},camera {camera}.") - - return + return def create_annotation_project_inefficient(session, camera): - ''' + """ create annotation video using the full video (without creating a new video from the selected frames) - ''' + """ logging.info(f"Creating SLEAP project for session {session} and camera {camera}...") - + animal = session.split("_")[0] # Paths video_path = f"{params.recordings_dir}/{animal}/{session}/{session}_cameras/{session}_{params.camera_name_mapping.get(camera, camera)}.avi" @@ -314,7 +340,7 @@ def create_annotation_project_inefficient(session, camera): # Load video and skeleton video = Video.from_filename(video_path) - with open(params.skeleton_path, 'r') as f: + with open(params.skeleton_path, "r") as f: skeleton_data = json.load(f) skeleton = Skeleton.from_dict(skeleton_data) @@ -326,21 +352,24 @@ def create_annotation_project_inefficient(session, 
camera): for item in frame_data.items: frame_idx = item.frame_idx instances = [Instance(skeleton=skeleton)] # Empty instance - labeled_frame = LabeledFrame(video=video, frame_idx=frame_idx, instances=instances) + labeled_frame = LabeledFrame( + video=video, frame_idx=frame_idx, instances=instances + ) labeled_frames.append(labeled_frame) - logging.info(f"Labeled frame created for {session}, {camera}, frame {frame_idx}") + logging.info( + f"Labeled frame created for {session}, {camera}, frame {frame_idx}" + ) # Save the labeled frames to a .slp project file labels = Labels(labeled_frames) os.makedirs(os.path.dirname(labels_output_path), exist_ok=True) labels.save(labels_output_path) logging.info(f"Sleap project created for session {session},camera {camera}.") - - return + return -def annotate_videos(sessions, cameras = params.default_cameras, pred = False): +def annotate_videos(sessions, cameras=params.default_cameras, pred=False): """ creates slp project using selected frames from raw video and launches annotation GUI if pred, runs predictions on the selected frames with existing model, @@ -365,39 +394,49 @@ def annotate_videos(sessions, cameras = params.default_cameras, pred = False): project_path = f"{project_dir}/{session}_{camera}.slp" os.makedirs(project_dir, exist_ok=True) if not os.path.exists(project_path): - create_annotation_project(session,camera) + create_annotation_project(session, camera) if pred: model_dir = f"{params.slp_models_dir}/{camera}" if not os.path.exists(model_dir): - logging.info(f"Model directory for {camera} does not exist, skipping.") + logging.info( + f"Model directory for {camera} does not exist, skipping." 
+ ) continue model_path = f"{model_dir}/training_config.json" command = [ - "sleap-track", - project_path, - "--video.index", "0", - "-m", model_path, - "-o", project_path + "sleap-track", + project_path, + "--video.index", + "0", + "-m", + model_path, + "-o", + project_path, ] - logging.info(f"Running sleap-track on annotation video") + logging.info("Running sleap-track on annotation video") # Run the sleap-track command using subprocess subprocess.run(command, check=True) - logging.info(f"Tracking completed\n") + logging.info("Tracking completed\n") - logging.info(f"Launching annotation GUI...") - subprocess.run(["sleap-label", project_path]) # first test if the project is created + logging.info("Launching annotation GUI...") + subprocess.run( + ["sleap-label", project_path] + ) # first test if the project is created except Exception as e: - logging.error(f"Failed to process camera {camera} for session {session}: {e}") + logging.error( + f"Failed to process camera {camera} for session {session}: {e}" + ) except Exception as e: logging.error(f"Failed to process session {session}: {e}") + def create_training_file(camera, sessions): """ .slp project for a specific camera - merging all projects for that camera """ # Path to save the combined training project combined_project_path = f"{params.slp_training_dir}/{camera}.slp" - + all_labeled_frames = [] for session in sessions: @@ -409,12 +448,18 @@ def create_training_file(camera, sessions): session_labels = sleap.load_file(session_slp_path) session_labeled_frames = session_labels.labeled_frames all_labeled_frames.extend(session_labeled_frames) - logging.info(f"Added {len(session_labeled_frames)} frames from session {session} for camera {camera}.") + logging.info( + f"Added {len(session_labeled_frames)} frames from session {session} for camera {camera}." + ) else: - logging.info(f"SLP annotation project for {session}, {camera} does not exist. 
Skipping.") + logging.info( + f"SLP annotation project for {session}, {camera} does not exist. Skipping." + ) continue except Exception as e: - logging.error(f"An error occurred while processing session {session} for camera {camera}: {e}") + logging.error( + f"An error occurred while processing session {session} for camera {camera}: {e}" + ) # Create a new Labels object with the combined labeled frames combined_labels = Labels(labeled_frames=all_labeled_frames) @@ -430,7 +475,8 @@ def create_training_file(camera, sessions): return -def create_training_projects(sessions, cameras = params.default_cameras): + +def create_training_projects(sessions, cameras=params.default_cameras): """ creates .slp training projects """ @@ -450,15 +496,17 @@ def create_training_projects(sessions, cameras = params.default_cameras): # Ensure configuration file exists if not os.path.exists(config_file): - logging.info(f"Configuration file for {camera} does not exist, using default one") + logging.info( + f"Configuration file for {camera} does not exist, using default one" + ) source_path = params.training_config_path # Read JSON data from the source file - with open(source_path, 'r') as src_file: + with open(source_path, "r") as src_file: data = json.load(src_file) # Write JSON data to the destination file - with open(config_file, 'w') as dest_file: + with open(config_file, "w") as dest_file: json.dump(data, dest_file, indent=4) logging.info(f"File copied from {source_path} to {config_file}") @@ -469,13 +517,14 @@ def create_training_projects(sessions, cameras = params.default_cameras): def create_training_config_file(config_file): return -def train_models(cameras = params.default_cameras, sessions = params.training_sessions): - ''' - TBD + +def train_models(cameras=params.default_cameras, sessions=params.training_sessions): + """ + TBD - create config file with training parameters; check if config file exists, if not create it using the parameters in params - test creation of training 
project - can be done from GUI - ''' + """ cameras = cameras or params.default_cameras if isinstance(sessions, str): @@ -484,7 +533,6 @@ def train_models(cameras = params.default_cameras, sessions = params.training_se if isinstance(cameras, str): cameras = [cameras] - # Run sleap-train for each session and camera combination for camera in cameras: # Define paths for model and labels @@ -504,12 +552,12 @@ def train_models(cameras = params.default_cameras, sessions = params.training_se if not os.path.exists(config_file): create_training_config_file(config_file) logging.info(f"Configuration file for {camera} created.") - + # Run sleap-train command logging.info(f"Training model for {camera}...") command = ["sleap-train", config_file, labels_file] result = subprocess.run(command, cwd=training_dir) - + if result.returncode == 0: logging.info(f"Finished training for {camera}.") else: @@ -518,12 +566,18 @@ def train_models(cameras = params.default_cameras, sessions = params.training_se logging.info("All training has been executed.") - def select_frames_to_predict(): - return + return -def get_2Dpredictions(sessions = params.sessions_to_predict, cameras = params.default_cameras,frames = params.frames_to_predict,input_file = None, output_file = None, model_path = None): +def get_2Dpredictions( + sessions=params.sessions_to_predict, + cameras=params.default_cameras, + frames=params.frames_to_predict, + input_file=None, + output_file=None, + model_path=None, +): """ Runs sleap-track on a list of sessions and cameras. 
------- @@ -547,73 +601,86 @@ def get_2Dpredictions(sessions = params.sessions_to_predict, cameras = params.de logging.error(f"Failed to create predictions directory: {e}") return - tracking_options = params.tracking_options ## If input, model, output files are specified, run directly - could remove this if input_file is not None: - if os.path.isfile(input_file) and output_file is not None and model_path is not None: + if ( + os.path.isfile(input_file) + and output_file is not None + and model_path is not None + ): command = [ - "sleap-track", - input_file, - "--video.index", "0", - "-m", model_path, - "-o", output_file - ] - + "sleap-track", + input_file, + "--video.index", + "0", + "-m", + model_path, + "-o", + output_file, + ] + # Add frames to predict on if specified - otherwise all frames if frames: command.extend(["--frames", frames]) - + # Add tracking options if specified if tracking_options: command.extend(tracking_options.split()) - logging.info(f"Running sleap-track") + logging.info("Running sleap-track") logging.info(f"Input file: {input_file}") logging.info(f"Output file: {output_file}") # Run the sleap-track command using subprocess subprocess.run(command, check=True) - logging.info(f"Tracking completed\n") - + logging.info("Tracking completed\n") + else: input_dir = input_file for camera in cameras: model_dir = f"{params.slp_models_dir}/{camera}" if not os.path.exists(model_dir): - logging.info(f"Model directory for {camera} does not exist, skipping.") + logging.info( + f"Model directory for {camera} does not exist, skipping." 
+ ) continue model_path = f"{model_dir}/training_config.json" - input_file = f"{input_dir}/{params.camera_name_mapping.get(camera, camera)}.avi" + input_file = ( + f"{input_dir}/{params.camera_name_mapping.get(camera, camera)}.avi" + ) output_file = f"{params.predictions_dir}/{sessions[0]}_{camera}.slp.predictions.slp" - logging.info(f"Running sleap-track for camera {camera}") logging.info(f"Input file: {input_file}") logging.info(f"Output file: {output_file}") - # construct sleap-track command + # construct sleap-track command command = [ "sleap-track", input_file, - "--video.index", "0", - "-m", model_path, - "-o", output_file + "--video.index", + "0", + "-m", + model_path, + "-o", + output_file, ] - + # Add frames to predict on if specified - otherwise all frames if frames: command.extend(["--frames", frames]) - + # Add tracking options if specified if tracking_options: command.extend(tracking_options.split()) # Run the sleap-track command using subprocess subprocess.run(command, check=True) - logging.info(f"Tracking completed for session {sessions[0]}, camera {camera}\n") + logging.info( + f"Tracking completed for session {sessions[0]}, camera {camera}\n" + ) - ## Otherwise go through the list of sessions and cameras else: for session in sessions: @@ -622,12 +689,14 @@ def get_2Dpredictions(sessions = params.sessions_to_predict, cameras = params.de try: model_dir = f"{params.slp_models_dir}/{camera}" if not os.path.exists(model_dir): - logging.info(f"Model directory for {camera} does not exist, skipping.") + logging.info( + f"Model directory for {camera} does not exist, skipping." 
+ ) continue model_path = f"{model_dir}/training_config.json" - # Different cases for different input directories because different saving formats are used - if "raw" in params.input_2Dpred: + # Different cases for different input directories because different saving formats are used + if "raw" in params.input_2Dpred: input_file = f"{params.input_2Dpred}/{animal}/{session}/{session}_cameras/{session}_{params.camera_name_mapping.get(camera, camera)}.avi" elif "annotations" in params.input_2Dpred: input_file = f"{params.slp_annotations_dir}/{session}_annotations/{session}_{camera}_annotations/{session}_{camera}.slp" @@ -635,41 +704,48 @@ def get_2Dpredictions(sessions = params.sessions_to_predict, cameras = params.de input_file = f"{params.input_2Dpred}/{session}/{camera}/{session}_{camera}.slp" output_file = f"{params.predictions_dir}/{session}_{camera}.slp.predictions.slp" - - logging.info(f"Running sleap-track for session {session} and camera {camera}") + logging.info( + f"Running sleap-track for session {session} and camera {camera}" + ) logging.info(f"Input file: {input_file}") logging.info(f"Output file: {output_file}") - # construct sleap-track command + # construct sleap-track command command = [ "sleap-track", input_file, - "--video.index", "0", - "-m", model_path, - "-o", output_file + "--video.index", + "0", + "-m", + model_path, + "-o", + output_file, ] - + # Add frames to predict on if specified - otherwise all frames if frames: command.extend(["--frames", frames]) - + # Add tracking options if specified if tracking_options: command.extend(tracking_options.split()) # Run the sleap-track command using subprocess subprocess.run(command, check=True) - logging.info(f"Tracking completed for session {session}, camera {camera}\n") + logging.info( + f"Tracking completed for session {session}, camera {camera}\n" + ) except Exception as e: - logging.error(f"Failed to process session {session}, camera {camera}: {e}") - + logging.error( + f"Failed to process session 
{session}, camera {camera}: {e}" + ) return -def visualize_predictions(sessions, cameras = params.default_cameras): +def visualize_predictions(sessions, cameras=params.default_cameras): """ Launches SLEAP GUI for the predictions slp project for a list of sessions and cameras """ @@ -680,7 +756,8 @@ def visualize_predictions(sessions, cameras = params.default_cameras): cameras = [cameras] for session in sessions: for camera in cameras: - predictions_path = f"{params.predictions_dir}/{session}_{camera}.slp.predictions.slp" + predictions_path = ( + f"{params.predictions_dir}/{session}_{camera}.slp.predictions.slp" + ) subprocess.run(["sleap-label", predictions_path]) return - diff --git a/beneuro_pose_estimation/update_bnp.py b/beneuro_pose_estimation/update_bnp.py new file mode 100644 index 0000000..0aa08e2 --- /dev/null +++ b/beneuro_pose_estimation/update_bnp.py @@ -0,0 +1,119 @@ +import subprocess +from pathlib import Path + +from typing import List + +from beneuro_pose_estimation import set_logging +from beneuro_pose_estimation.config import _load_config + +logger = set_logging(__name__) + +def _run_git_command(repo_path: Path, command: List[str]) -> str: + """ + Run a git command in the specified repository and return its output + + Parameters + ---------- + repo_path : Path + Path to the git repository to run the command in. + command : list[str] + Git command to run, as a list of strings. + E.g. ["log", "HEAD..origin/main", "--oneline"] + + Returns + ------- + The output of the git command as a string. 
+ """ + repo_path = Path(repo_path) + if not repo_path.is_absolute(): + raise ValueError(f"{repo_path} is not an absolute path") + + if not (repo_path / ".git").exists(): + raise ValueError(f"{repo_path} is not a git repository") + + result = subprocess.run( + ["git", "-C", str(repo_path.absolute())] + command, capture_output=True, text=True + ) + if result.returncode != 0: + raise Exception(f"Git command failed: {result.stderr}") + + return result.stdout.strip() + + +def _get_new_commits(repo_path: Path) -> List[str]: + """ + Check for new commits from origin/main of the specified repository. + + Parameters + ---------- + repo_path : Path + Path to the git repository. + + Returns + ------- + Each new commit as a string in a list. + """ + repo_path = Path(repo_path) + + # Fetch the latest changes from the remote repository + _run_git_command(repo_path, ["fetch"]) + + # Check if origin/main has new commits compared to the local branch + new_commits = _run_git_command(repo_path, ["log", "HEAD..origin/main", "--oneline"]) + + # filter empty lines and strip whitespaces + return [ + commit.strip() for commit in new_commits.split("\n") if commit.strip() != "" + ] + +def check_for_updates() -> bool: + """ + Check if the package has new commits on the origin/main branch. + + Returns True if new commits are found, False otherwise. 
+ """ + config = _load_config() + package_path = config.REPO_PATH + + new_commits = _get_new_commits(package_path) + + if len(new_commits) > 0: + print("New commits found, run `bnd self-update` to update the package.") + for commit in new_commits: + print(f" - {commit}") + + return True + + print("No new commits found, package is up to date.") + +def update_bnp(print_new_commits: bool = True) -> None: + """ + Update bnd if it was installed with conda + + Parameters + ---------- + install_method + print_new_commits + + """ + config = _load_config() + + new_commits = _get_new_commits(config.REPO_PATH) + + if len(new_commits) > 0: + print("New commits found, pulling changes...") + + print(1 * "\n") + + _run_git_command(config.REPO_PATH, ["pull", "origin", "main"]) + + print(1 * "\n") + print("Package updated successfully.") + print("\n") + + if print_new_commits: + print("New commits:") + for commit in new_commits: + print(f" - {commit}") + else: + print("Package appears to be up to date, no new commits found.") \ No newline at end of file diff --git a/scripts/test_calib.py b/scripts/test_calib.py index 9c415be..826dd32 100644 --- a/scripts/test_calib.py +++ b/scripts/test_calib.py @@ -1,11 +1,11 @@ -import sys import os - +import sys # Add the parent directory of 'scripts' to sys.path parent_dir = os.path.abspath(os.path.join(os.getcwd(), os.pardir)) sys.path.append(parent_dir) import beneuro_pose_estimation.anipose.aniposeTools as aniposeTools + recent_calib_folder = "/mnt/rds/bb2020/projects/beneuro/live/raw/pose-estimation/calibration-videos/camera_calibration_2024_11_20_11_25/Recording_2024-11-20T113135" calib_file_path = "/home/il620/beneuro_pose_estimation/projects/calibrations/calibration_2024_11_20_11_25.toml" -aniposeTools.get_calib_file(recent_calib_folder, calib_file_path) \ No newline at end of file +aniposeTools.get_calib_file(recent_calib_folder, calib_file_path) diff --git a/setup.py b/setup.py index fb88c7e..b2f48d6 100644 --- a/setup.py +++ 
b/setup.py @@ -1,21 +1,16 @@ -from setuptools import setup, find_packages +from setuptools import find_packages, setup setup( - name="beneuro_pose_estimation", # The name of your package - version="1.0", # Version of your package + name="bnp", # The name of your package + version="0.1.0", # Version of your package packages=find_packages(), # Automatically find all packages in your project - install_requires=[ - # Add any additional Python dependencies here - # These should match what you've included in your environment file - "opencv-contrib-python<4.7.0", - "sleap_anipose", - "anipose<1.1", - "apptools", - ], + install_requires=[], entry_points={ "console_scripts": [ - "bnp=beneuro_pose_estimation.cli:main", # Register the CLI command + "bnp=beneuro_pose_estimation.cli:app", # Register the CLI command ], }, python_requires=">=3.7", ) + +