Skip to content
82 changes: 44 additions & 38 deletions beneuro_pose_estimation/anipose/aniposeTools.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
"""
Module to carry out Anipose operations
TBD:
- test evaluate_reprojection
- check logging
- needs checking after the last changes
-------------------------------
conda activate bnp
-> in dev:
python -m beneuro_pose_estimation.cli pose-estimation --sessions session_name(s)
-> after package installation:
pose pose-estimation --sessions session_name(s)
bnp init # to create the .env file
bnp pose session_name(s)


"""

Expand All @@ -35,6 +33,8 @@
import toml
from anipose.compute_angles import compute_angles
from aniposelib.cameras import CameraGroup
from beneuro_pose_estimation.config import _load_config
config = _load_config()


def evaluate_reprojection(reprojection_path, predictions_2D_dir, histogram_path=None):
Expand Down Expand Up @@ -91,7 +91,7 @@ def get_most_recent_calib(session):

# Iterate over all calibration folders and extract timestamps
calib_folders = []
calib_vid_dir = Path(params.calib_vid_dir)
calib_vid_dir = config.calibration_videos
for folder in calib_vid_dir.iterdir():
if folder.is_dir():
try:
Expand Down Expand Up @@ -128,8 +128,8 @@ def get_most_recent_calib(session):
calib_file_name = Path(
f"calibration_{calib_datetime.strftime('%Y_%m_%d_%H_%M')}.toml"
)
calib_file_path = Path(params.calibration_dir) / calib_file_name
print(calib_file_path)
calib_file_path = config.calibration / calib_file_name
logging.debug(str(calib_file_path))
# Create calibration configuration if it doesn't exist
if not calib_file_path.exists():
get_calib_file(recent_calib_folder, calib_file_path)
Expand All @@ -140,7 +140,7 @@ def get_most_recent_calib(session):
return calib_file_path


def get_calib_file(calib_videos_dir, calib_save_path, board=params.board):
def get_calib_file(calib_videos_dir, calib_save_path = None, board=params.board):
"""
Generates calibration file using ChArUco board videos. - get most recent calibration

Expand All @@ -161,19 +161,20 @@ def get_calib_file(calib_videos_dir, calib_save_path, board=params.board):
calib_videos_dir = next(
calib_videos_dir.iterdir(), None
) # might want to change this
video_files = os.listdir(calib_videos_dir)
video_files = list(calib_videos_dir.iterdir())
cam_names, vidnames = [], []
reversed_mapping = {v: k for k, v in params.camera_name_mapping.items()}
for video_file in video_files:
if video_file.endswith(".avi") or video_file.endswith(".mp4"):
# cam_name = "_".join(video_file.split('_')[:2])
camera = Path(video_file).stem
if video_file.suffix in [".avi", ".mp4"]: # Check file extension
camera = video_file.stem
if camera == "Camera_3":
continue
cam_name = reversed_mapping.get(camera, camera)
vidnames.append([f"{calib_videos_dir}/{video_file}"])
vidnames.append([str(video_file)]) # Convert to str if required by downstream methods
cam_names.append(cam_name)

if calib_save_path is None:
calib_save_path = config.calibration / "calibration.toml"
# Initialize and configure CharucoBoard and CameraGroup
cgroup = CameraGroup.from_names(cam_names, fisheye=params.fisheye)
cgroup.calibrate_videos(vidnames, board)
Expand All @@ -185,8 +186,8 @@ def get_calib_file(calib_videos_dir, calib_save_path, board=params.board):
def convert_2Dpred_to_h5(
sessions,
cameras=params.default_cameras,
input_dir=params.predictions_dir,
output_dir=params.complete_projects_dir,
input_dir=None,
output_dir=None,
):
"""
Converts .slp.predictions.slp files to .h5 analysis files for each session and camera.
Expand All @@ -195,14 +196,19 @@ def convert_2Dpred_to_h5(
sessions = [sessions]
if isinstance(cameras, str):
cameras = [cameras]
if input_dir is None:
input_dir = config.predictions2D
if output_dir is None:
output_dir = config.predictions3D

for session in sessions:
session_dir = f"{output_dir}/{session}"
os.makedirs(session_dir, exist_ok=True)
session_dir = output_dir/session
session_dir.mkdir(parents=True, exist_ok=True)
for camera in cameras:
input_file = f"{input_dir}/{session}_{camera}.slp.predictions.slp"
os.makedirs(f"{session_dir}/{camera}", exist_ok=True)
output_file = f"{session_dir}/{camera}/{session}_{camera}.analysis.h5"
if os.path.exists(output_file):
input_file = input_dir/f"{session}_{camera}.slp.predictions.slp"
output_file = session_dir/camera/f"{session}_{camera}.analysis.h5"
output_file.parent.mkdir(parents=True, exist_ok=True)
if output_file.exists():
logging.info(f"Output file {output_file} already exists. Skipping...")
else:
try:
Expand Down Expand Up @@ -437,50 +443,50 @@ def extract_date(session_name):

def run_pose_estimation(
sessions,
log_file=None,
projects_dir=params.complete_projects_dir,
projects_dir = None,
videos_folder=None,
eval=False,
eval=False
):
"""
Main routing from videos to 3D keypoints and angles.
"""
# set_logging(log_file)

if isinstance(sessions, str):
sessions = [sessions]
if projects_dir is None:
projects_dir = config.predictions3D
for session in sessions:
logging.info(f"Running pose estimation on {session}")
project_dir = f"{projects_dir}/{session}"
os.makedirs(project_dir, exist_ok=True)
project_dir = projects_dir / session
project_dir.mkdir(parents=True, exist_ok=True)
sleapTools.get_2Dpredictions(session, input_file=videos_folder)
convert_2Dpred_to_h5(session)
convert_2Dpred_to_h5(session)
###############################################
# calib_file_path = get_most_recent_calib("M045_2024_11_20_11_35")
calib_file_path = get_most_recent_calib(session)
compute_3Dpredictions(
session, calib_file_path=calib_file_path, project_dir=project_dir, eval=eval
)
labels_fname = f"{project_dir}/{session}_3d_predictions.csv"
labels_fname = project_dir/f"{session}_3d_predictions.csv"
save_to_csv(
session,
f"{project_dir}/{session}_pose_estimation_combined.h5",
project_dir/f"{session}_pose_estimation_combined.h5",
labels_fname,
)
config_path = f"{project_dir}/config.toml"
if not os.path.exists(config_path):
config_path = project_dir/"config.toml"
if not config_path.exists():
config_path = create_config_file(config_path)
config = toml.load(config_path)
angles_csv = f"{project_dir}/{session}_angles.csv"
angles_csv = project_dir/f"{session}_angles.csv"
labels_data = pd.read_csv(labels_fname)
print(labels_data.columns)
logging.debug(labels_data.columns)
compute_angles(config, labels_fname, angles_csv)
logging.info(f"Pose estimation completed for {session}")
pose_data = pd.read_csv(labels_fname)
angles_data = pd.read_csv(angles_csv)

# Combine pose data and angles data
combined_data = pd.concat([pose_data, angles_data], axis=1)
combined_csv = f"{project_dir}/{session}_pose_and_angles.csv"
combined_csv = project_dir/f"{session}_pose_and_angles.csv"
# Save the updated CSV
combined_data.to_csv(combined_csv, index=False)
logging.info(f"Angles computed and combined CSV saved at {combined_csv}")
66 changes: 56 additions & 10 deletions beneuro_pose_estimation/cli.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from pathlib import Path

import typer

from typing import List, Optional
from rich import print

from beneuro_pose_estimation import params, set_logging
from beneuro_pose_estimation.config import _check_config, _get_package_path, \
_check_is_git_track, _check_root, _get_env_path
from beneuro_pose_estimation.sleap.sleapTools import annotate_videos
# from beneuro_pose_estimation.sleap.sleapTools import annotate_videos, get_2Dpredictions
from beneuro_pose_estimation.update_bnp import check_for_updates, update_bnp

from pathlib import Path
# Create a Typer app
app = typer.Typer(
add_completion=False, # Disable the auto-completion options
Expand All @@ -22,29 +22,75 @@

@app.command()
def annotate(
session_name: str = typer.Argument(..., help="Session name to annotate"),
session: str = typer.Argument(..., help="Session name to annotate"),
camera: str = typer.Argument(..., help=f"Camera name to annotate. Must be part of {params.default_cameras}"),
pred: bool = typer.Option(True, "--pred/--no-pred", help="Run annotation on prediction or not.", ),
pred: bool = typer.Option(True, "--pred/--no-pred", help="Run annotation on prediction or not." )
):
"""
Annotate sleap project
Create annotation project for the session if it doesn't exist and launch annotation GUI.
"""
from beneuro_pose_estimation.sleap.sleapTools import annotate_videos
annotate_videos(
sessions=session_name,
sessions=session,
cameras=camera,
pred=pred)

return

def create_annotation_project():
@app.command()
def create_annotation_projects(
sessions: List[str] = typer.Argument(
..., help="Session name(s) to annotate. Provide as a single session name or a list of session names."
),
cameras: List[str] = typer.Option(
None, "--cameras", "-c", help=f"Camera name(s) to annotate. Provide as a single camera name or a list of camera names. Defaults to {params.default_cameras} if not specified."
),
pred: bool = typer.Option(True, "--pred/--no-pred", help="Run annotation on prediction or not." )
):
"""
Create annotation projects for a list of sessions and cameras without launching the GUI.
"""
from beneuro_pose_estimation.sleap.sleapTools import create_annotation_projects
create_annotation_projects(sessions, cameras,pred)
return

def run_pose_estimation():
@app.command()
def pose(
sessions: List[str] = typer.Argument(
..., help="Session name(s) to run pose estimation on. Provide as a single session name or a list of session names."
)
):
from beneuro_pose_estimation.anipose.aniposeTools import run_pose_estimation

run_pose_estimation(sessions)

return

def get_2d_predictions():
@app.command()
def track_2d(
sessions: List[str] = typer.Argument(
..., help="Session name(s) to track. Provide as a single session name or a list of session names."
),
cameras: List[str] = typer.Option(
None, "--cameras", "-c", help=f"Camera name(s) to track. Provide as a single camera name or a list of camera names. Defaults to {params.default_cameras} if not specified."
),
):
"""
Get 2D predictions for a list of sessions and cameras
"""
if cameras is None:
cameras = params.default_cameras
logger.info(f"No cameras specified. Predictions will be run on all default cameras: {params.default_cameras}")
from beneuro_pose_estimation.sleap.sleapTools import get_2Dpredictions
get_2Dpredictions(
sessions=sessions,
cameras = cameras

)

return


# =================================== Updating ==========================================


Expand Down
16 changes: 16 additions & 0 deletions beneuro_pose_estimation/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

from pathlib import Path

from rich import print


Expand Down Expand Up @@ -75,6 +76,21 @@ def load_env(self, env_path: Path):
setattr(self, key, Path(value))

def assign_paths(self):
self.recordings_remote = self.REMOTE_PATH / "raw"
self.annotation_party = self.REMOTE_PATH / "processed" / "AnnotationParty"
self.annotations = self.annotation_party / "annotations"
self.models = (
self.REMOTE_PATH / "raw" / "pose-estimation" / "models" / "h1_new_setup"
)
self.skeleton_path = (
self.REPO_PATH / "beneuro_pose_estimation" / "sleap" / "skeleton.json"
)
self.recordings = self.annotation_party # can change to self.recordings_local
self.predictions2D = self.LOCAL_PATH / "predictions2D"
self.training = self.REMOTE_PATH / "pose-estimation" / "models" / "uren_setup"
self.predictions3D = self.LOCAL_PATH / "predictions3D"
self.calibration_videos = self.REMOTE_PATH / "raw" / "calibration_videos"
self.calibration = self.LOCAL_PATH / "calibration_config"
return


Expand Down
Loading
Loading