diff --git a/.gitignore b/.gitignore index 68bc17f..6551ce8 100644 --- a/.gitignore +++ b/.gitignore @@ -158,3 +158,15 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. #.idea/ + + +*.env +*.vtp +*.trc +*.osim +*.csv +*.json +*.xml +*.yaml +*.yml +*.mot \ No newline at end of file diff --git a/README.md b/README.md index dc10dbe..bc02940 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ # opencap-analysis -This repository contains analyses to be deployed on OpenCap web application. +This repository contains analyses to be deployed on OpenCap web application. diff --git a/gait_analysis/function/handler.py b/gait_analysis/function/handler.py index 5bff5f2..76390a9 100644 --- a/gait_analysis/function/handler.py +++ b/gait_analysis/function/handler.py @@ -31,8 +31,24 @@ def handler(event, context): To invoke the function do POST request on the following url http://localhost:8080/2015-03-31/functions/function/invocations """ - # temporary placeholder - kwargs = json.loads(event['body']) + try: + return _handler(event, context) + except Exception as e: + return { + 'statusCode': 500, + 'headers': {'Content-Type': 'application/json'}, + 'body': {'error': str(e)} + } + + +def _handler(event, context): + body = event.get('body', None) + if isinstance(body, dict): + kwargs = body + elif isinstance(body, str) and body: + kwargs = json.loads(body) + else: + kwargs = event for field in ('session_id', 'specific_trial_names'): if field not in kwargs: @@ -79,14 +95,23 @@ def handler(event, context): # %% Process data. # Init gait analysis and get gait events. + # Try with end trimming first (removes HRNet artifacts when subject leaves + # frame); fall back to no trimming if not enough gait cycles are found. 
legs = ['r'] gait, gait_events = {}, {} for leg in legs: - gait[leg] = gait_analysis( - sessionDir, trial_name, leg=leg, - lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, - n_gait_cycles=n_gait_cycles, gait_style='overground', - trimming_start=0, trimming_end=0.5) + try: + gait[leg] = gait_analysis( + sessionDir, trial_name, leg=leg, + lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, + n_gait_cycles=n_gait_cycles, gait_style='overground', + trimming_start=0, trimming_end=0.5) + except Exception: + gait[leg] = gait_analysis( + sessionDir, trial_name, leg=leg, + lowpass_cutoff_frequency_for_coordinate_values=filter_frequency, + n_gait_cycles=n_gait_cycles, gait_style='overground', + trimming_start=0, trimming_end=0) gait_events[leg] = gait[leg].get_gait_events() # Select last leg. diff --git a/gait_analysis/function/marker_name_mapping.py b/gait_analysis/function/marker_name_mapping.py new file mode 100644 index 0000000..cf18e2a --- /dev/null +++ b/gait_analysis/function/marker_name_mapping.py @@ -0,0 +1,56 @@ +""" +Marker name mapping dictionary for converting from expected format (with '_study' suffix) +to actual format (without '_study' suffix, lowercase). + +This mapping is used to rename markers in TRC files to match the expected format +used by the gait_analysis class. 
+ +Expected format: markers end with '_study' (e.g., 'r_calc_study', 'r.ASIS_study') +Actual format: markers without '_study' suffix, lowercase (e.g., 'r_calc', 'r_ASIS') +""" + +MARKER_NAME_MAPPING = { + # Pelvis markers + 'r.ASIS_study': 'r_ASIS', + 'L.ASIS_study': 'l_ASIS', + 'r.PSIS_study': 'r_PSIS', + 'L.PSIS_study': 'l_PSIS', + + # Right leg markers + 'r_knee_study': 'r_knee', + 'r_mknee_study': 'r_mknee', + 'r_ankle_study': 'r_ankle', + 'r_mankle_study': 'r_mankle', + 'r_toe_study': 'r_toe', + 'r_5meta_study': 'r_5meta', + 'r_calc_study': 'r_calc', + + # Left leg markers + 'L_knee_study': 'l_knee', + 'L_mknee_study': 'l_mknee', + 'L_ankle_study': 'l_ankle', + 'L_mankle_study': 'l_mankle', + 'L_toe_study': 'l_toe', + 'L_calc_study': 'l_calc', + 'L_5meta_study': 'l_5meta', + + # Shoulder markers + 'r_shoulder_study': 'r_shoulder', + 'L_shoulder_study': 'l_shoulder', + + # Spine markers + 'C7_study': 'C7', + + # Hip joint centers + 'RHJC_study': 'RHJC', # Check if exists in actual file + 'LHJC_study': 'LHJC', # Check if exists in actual file + + # Elbow markers + 'r_melbow_study': 'r_melbow', + 'L_melbow_study': 'l_melbow', + +} + +# Reverse mapping (actual -> expected) for renaming markers in TRC files +REVERSE_MARKER_NAME_MAPPING = {v: k for k, v in MARKER_NAME_MAPPING.items()} + diff --git a/gait_analysis/function/utils.py b/gait_analysis/function/utils.py index 1e2b162..78932c4 100644 --- a/gait_analysis/function/utils.py +++ b/gait_analysis/function/utils.py @@ -34,7 +34,7 @@ from utilsAPI import get_api_url from utilsAuthentication import get_token import matplotlib.pyplot as plt -from scipy.signal import gaussian +from scipy.signal.windows import gaussian API_URL = get_api_url() @@ -80,6 +80,22 @@ def get_user_sessions_all(user_token=API_TOKEN): return sessions +# Returns a list of all subjects of the user. 
+def get_user_subjects(user_token=API_TOKEN): + subjects = requests.get( + API_URL + "subjects/", + headers = {"Authorization": "Token {}".format(user_token)}).json() + + return subjects + +# Returns a list of all sessions of a subject. +def get_subject_sessions(subject_id, user_token=API_TOKEN): + sessions = requests.get( + API_URL + "subjects/{}/".format(subject_id), + headers = {"Authorization": "Token {}".format(user_token)}).json()['sessions'] + + return sessions + def get_trial_json(trial_id): trialJson = requests.get( API_URL + "trials/{}/".format(trial_id), @@ -89,6 +105,8 @@ def get_trial_json(trial_id): def get_neutral_trial_id(session_id): session = get_session_json(session_id) + if session['isMono']: + return None neutral_ids = [t['id'] for t in session['trials'] if t['name']=='neutral'] if len(neutral_ids)>0: @@ -125,28 +143,36 @@ def get_camera_mapping(session_id, session_path): if not os.path.exists(mappingPath): mappingURL = trial['results'][resultTags.index('camera_mapping')]['media'] download_file(mappingURL, mappingPath) - -def get_model_and_metadata(session_id, session_path): - neutral_id = get_neutral_trial_id(session_id) - trial = get_trial_json(neutral_id) - resultTags = [res['tag'] for res in trial['results']] - - # Metadata. + +def get_metadata(session_path, trial, resultTags): metadataPath = os.path.join(session_path,'sessionMetadata.yaml') - if not os.path.exists(metadataPath) : + if not os.path.exists(metadataPath): metadataURL = trial['results'][resultTags.index('session_metadata')]['media'] download_file(metadataURL, metadataPath) - # Model. 
+ +def get_model(session_path, trial, resultTags, isMono=False): modelURL = trial['results'][resultTags.index('opensim_model')]['media'] modelName = modelURL[modelURL.rfind('-')+1:modelURL.rfind('?')] modelFolder = os.path.join(session_path, 'OpenSimData', 'Model') + if isMono: + modelFolder = os.path.join(modelFolder, trial['name']) modelPath = os.path.join(modelFolder, modelName) if not os.path.exists(modelPath): os.makedirs(modelFolder, exist_ok=True) download_file(modelURL, modelPath) - + return modelName + + +def get_model_and_metadata(session_id, session_path): + neutral_id = get_neutral_trial_id(session_id) + trial = get_trial_json(neutral_id) + resultTags = [res['tag'] for res in trial['results']] + + get_metadata(session_path, trial, resultTags) + modelName = get_model(session_path, trial, resultTags) + return modelName def get_main_settings(session_folder,trial_name): @@ -169,34 +195,41 @@ def get_model_name_from_metadata(sessionFolder,appendText='_scaled'): return modelName -def get_motion_data(trial_id, session_path): +def get_motion_data(trial_id, session_path, isMono=False): trial = get_trial_json(trial_id) trial_name = trial['name'] resultTags = [res['tag'] for res in trial['results']] # Marker data. - if 'ik_results' in resultTags: + if 'marker_data' in resultTags: markerFolder = os.path.join(session_path, 'MarkerData') markerPath = os.path.join(markerFolder, trial_name + '.trc') os.makedirs(markerFolder, exist_ok=True) - markerURL = trial['results'][resultTags.index('marker_data')]['media'] - download_file(markerURL, markerPath) + if not os.path.exists(markerPath): + markerURL = trial['results'][resultTags.index('marker_data')]['media'] + download_file(markerURL, markerPath) # IK data. 
if 'ik_results' in resultTags: ikFolder = os.path.join(session_path, 'OpenSimData', 'Kinematics') ikPath = os.path.join(ikFolder, trial_name + '.mot') os.makedirs(ikFolder, exist_ok=True) - ikURL = trial['results'][resultTags.index('ik_results')]['media'] - download_file(ikURL, ikPath) + if not os.path.exists(ikPath): + ikURL = trial['results'][resultTags.index('ik_results')]['media'] + download_file(ikURL, ikPath) + + # Model data if mono trial (isMono = True in session JSON) + if isMono: + get_model(session_path, trial, resultTags, isMono=True) # Main settings if 'main_settings' in resultTags: settingsFolder = os.path.join(session_path, 'MarkerData', 'Settings') settingsPath = os.path.join(settingsFolder, 'settings_' + trial_name + '.yaml') os.makedirs(settingsFolder, exist_ok=True) - settingsURL = trial['results'][resultTags.index('main_settings')]['media'] - download_file(settingsURL, settingsPath) + if not os.path.exists(settingsPath): + settingsURL = trial['results'][resultTags.index('main_settings')]['media'] + download_file(settingsURL, settingsPath) def get_geometries(session_path, modelName='LaiUhlrich2022_scaled'): @@ -253,34 +286,65 @@ def download_kinematics(session_id, folder=None, trialNames=None): if folder is None: folder = os.getcwd() os.makedirs(folder, exist_ok=True) + + sessionJson = get_session_json(session_id) + isMono = sessionJson['isMono'] - # Model and metadata. - neutral_id = get_neutral_trial_id(session_id) - get_motion_data(neutral_id, folder) - modelName = get_model_and_metadata(session_id, folder) - # Remove extension from modelName - modelName = modelName.replace('.osim','') + if not isMono: + # Model and metadata from neutral trial + neutral_id = get_neutral_trial_id(session_id) + get_motion_data(neutral_id, folder) + modelName = get_model_and_metadata(session_id, folder) + # Remove extension from modelName + modelName = modelName.replace('.osim','') # Session trial names. 
- sessionJson = get_session_json(session_id) sessionTrialNames = [t['name'] for t in sessionJson['trials']] if trialNames != None: [print(t + ' not in session trial names.') for t in trialNames if t not in sessionTrialNames] - # Motion data. + # Get dynamic trial IDs + dynamic_ids = [t['id'] for t in sessionJson['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] + + # Metadata for mono session + if isMono: + # Get metadata from the first dynamic trial + if dynamic_ids: + first_trial = get_trial_json(dynamic_ids[0]) + resultTags = [res['tag'] for res in first_trial['results']] + get_metadata(folder, first_trial, resultTags) + + # Motion data for all dynamic trials loadedTrialNames = [] for trialDict in sessionJson['trials']: if trialNames is not None and trialDict['name'] not in trialNames: continue trial_id = trialDict['id'] - get_motion_data(trial_id,folder) + get_motion_data(trial_id,folder, isMono=isMono) loadedTrialNames.append(trialDict['name']) # Remove 'calibration' and 'neutral' from loadedTrialNames. loadedTrialNames = [i for i in loadedTrialNames if i!='neutral' and i!='calibration'] - + # Geometries. 
+ if isMono: + # For mono sessions, find model names in subfolders + modelDir = os.path.join(folder, 'OpenSimData', 'Model') + if os.path.exists(modelDir): + modelNames = [] + for subfolder in os.listdir(modelDir): + subfolderPath = os.path.join(modelDir, subfolder) + if os.path.isdir(subfolderPath): + modelNames.extend([f for f in os.listdir(subfolderPath) if f.endswith('.osim')]) + if modelNames: + # Use first model name found (assuming same model type for all trials) + modelName = modelNames[0].replace('.osim', '') + else: + raise ValueError("No model files found in mono session subfolders") + else: + raise ValueError("Model directory does not exist") + get_geometries(folder, modelName=modelName) return loadedTrialNames, modelName @@ -293,12 +357,20 @@ def download_trial(trial_id, folder, session_id=None): session_id = trial['session_id'] os.makedirs(folder,exist_ok=True) + + # check if it is a mono trial + session = get_session_json(session_id) + isMono = session['isMono'] - # download model - get_model_and_metadata(session_id, folder) - - # download trc and mot - get_motion_data(trial_id,folder) + if isMono: + resultTags = [res['tag'] for res in trial['results']] + get_metadata(folder, trial, resultTags) + else: + # download model + get_model_and_metadata(session_id, folder) + + # download trc and mot + model if mono trial + get_motion_data(trial_id,folder, isMono=isMono) return trial['name'] @@ -475,8 +547,7 @@ def download_videos_from_server(session_id,trial_id, with open(os.path.join(session_path, "Videos", 'mappingCamDevice.pickle'), 'rb') as handle: mappingCamDevice = pickle.load(handle) # ensure upper on deviceID - for dID in mappingCamDevice.keys(): - mappingCamDevice[dID.upper()] = mappingCamDevice.pop(dID) + mappingCamDevice = {k.upper(): v for k, v in mappingCamDevice.items()} for video in trial["videos"]: k = mappingCamDevice[video["device_id"].replace('-', '').upper()] videoDir = os.path.join(session_path, "Videos", "Cam{}".format(k), "InputMedia", 
trial_name) @@ -555,7 +626,54 @@ def post_file_to_trial(filePath,trial_id,tag,device_id): requests.post("{}results/".format(API_URL), files=files, data=data, headers = {"Authorization": "Token {}".format(API_TOKEN)}) files["media"].close() + +def post_video_to_trial(filePath,trial_id,device_id,parameters): + files = {'video': open(filePath, 'rb')} + data = { + "trial": trial_id, + "device_id" : device_id, + "parameters": parameters + } + + requests.post("{}videos/".format(API_URL), files=files, data=data, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + files["video"].close() + +def delete_video_from_trial(video_id): + + requests.delete("{}videos/{}/".format(API_URL, video_id), + headers = {"Authorization": "Token {}".format(API_TOKEN)}) +def delete_results(trial_id, tag=None, resultNum=None): + # Delete specific result number, or all results with a specific tag, or all results if tag==None + if resultNum != None: + resultNums = [resultNum] + elif tag != None: + trial = get_trial_json(trial_id) + resultNums = [r['id'] for r in trial['results'] if r['tag']==tag] + + elif tag == None: + trial = get_trial_json(trial_id) + resultNums = [r['id'] for r in trial['results']] + + for rNum in resultNums: + requests.delete(API_URL + "results/{}/".format(rNum), + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + +def set_trial_status(trial_id, status): + + # Available statuses: 'done', 'error', 'stopped', 'reprocess' + # 'processing' and 'recording also exist, but it does not make sense to set them manually. + # Throw error if status is not one of the above. + if status not in ['done', 'error', 'stopped', 'reprocess']: + raise ValueError('Invalid status. 
Available statuses: done, error, stopped, reprocess') + + requests.patch(API_URL+"trials/{}/".format(trial_id), data={'status': status}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + +def set_session_subject(session_id, subject_id): + requests.patch(API_URL+"sessions/{}/".format(session_id), data={'subject': subject_id}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) def get_syncd_videos(trial_id,session_path): trial = requests.get("{}trials/{}/".format(API_URL,trial_id), @@ -584,44 +702,57 @@ def download_session(session_id, sessionBasePath= None, session = get_session_json(session_id) session_path = os.path.join(sessionBasePath,'OpenCapData_' + session_id) + + os.makedirs(session_path, exist_ok=True) + + isMono = session['isMono'] - calib_id = get_calibration_trial_id(session_id) - neutral_id = get_neutral_trial_id(session_id) + if not isMono: + calib_id = get_calibration_trial_id(session_id) + neutral_id = get_neutral_trial_id(session_id) + + # Calibration + try: + get_camera_mapping(session_id, session_path) + if downloadVideos: + download_videos_from_server(session_id,calib_id, + isCalibration=True,isStaticPose=False, + session_path = session_path) + + get_calibration(session_id,session_path) + except: + pass + + # Neutral + try: + modelName = get_model_and_metadata(session_id,session_path) + get_motion_data(neutral_id,session_path) + if downloadVideos: + download_videos_from_server(session_id,neutral_id, + isCalibration=False,isStaticPose=True, + session_path = session_path) + + get_syncd_videos(neutral_id,session_path) + except: + pass + dynamic_ids = [t['id'] for t in session['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] - - # Calibration - try: - get_camera_mapping(session_id, session_path) - if downloadVideos: - download_videos_from_server(session_id,calib_id, - isCalibration=True,isStaticPose=False, - session_path = session_path) - get_calibration(session_id,session_path) - except: - pass - - # 
Neutral - try: - modelName = get_model_and_metadata(session_id,session_path) - get_motion_data(neutral_id,session_path) - if downloadVideos: - download_videos_from_server(session_id,neutral_id, - isCalibration=False,isStaticPose=True, - session_path = session_path) - - get_syncd_videos(neutral_id,session_path) - except: - pass + # Metadata for mono session + if isMono: + # hand the first dynamic trial + first_trial = get_trial_json(dynamic_ids[0]) + resultTags = [res['tag'] for res in first_trial['results']] + get_metadata(session_path, first_trial, resultTags) # Dynamic for dynamic_id in dynamic_ids: try: - get_motion_data(dynamic_id,session_path) + get_motion_data(dynamic_id,session_path, isMono=isMono) if downloadVideos: download_videos_from_server(session_id,dynamic_id, isCalibration=False,isStaticPose=False, - session_path = session_path) + session_path=session_path) get_syncd_videos(dynamic_id,session_path) except: @@ -639,16 +770,33 @@ def download_session(session_id, sessionBasePath= None, # Geometry try: - if 'Lai' in modelName: - modelType = 'LaiArnold' + if isMono: + # get all names of .osim files in subfolders of Model folder + modelDir = os.path.join(session_path, 'OpenSimData', 'Model') + modelNames = [] + for subfolder in os.listdir(modelDir): + subfolderPath = os.path.join(modelDir, subfolder) + if os.path.isdir(subfolderPath): + modelNames.extend([f for f in os.listdir(subfolderPath) if f.endswith('.osim')]) + # check if any of the model names contain 'Lai', assuming the same model type is used for all trials of the session + if any('Lai' in name for name in modelNames): + modelType = 'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + modelName = modelNames[0] + else: - raise ValueError("Geometries not available for this model, please contact us") + if 'Lai' in modelName: + modelType = 'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + if 
platform.system() == 'Windows': geometryDir = os.path.join(repoDir, 'tmp', modelType, 'Geometry') else: geometryDir = "/tmp/{}/Geometry".format(modelType) - # If not in cache, download from s3. - if not os.path.exists(geometryDir): + # If not in cache or empty, download from s3. + if not os.path.exists(geometryDir) or not os.listdir(geometryDir): os.makedirs(geometryDir, exist_ok=True) get_geometries(session_path, modelName=modelName) geometryDirEnd = os.path.join(session_path, 'OpenSimData', 'Model', 'Geometry') diff --git a/gait_analysis/function/utilsKinematics.py b/gait_analysis/function/utilsKinematics.py index 2eb1b8c..f82bc37 100644 --- a/gait_analysis/function/utilsKinematics.py +++ b/gait_analysis/function/utilsKinematics.py @@ -32,6 +32,13 @@ import numpy as np from scipy.spatial.transform import Rotation +# Import marker name mapping for conversion +try: + from marker_name_mapping import REVERSE_MARKER_NAME_MAPPING +except ImportError: + # If mapping file doesn't exist, use empty dict (no conversion) + REVERSE_MARKER_NAME_MAPPING = {} + class kinematics: @@ -46,18 +53,50 @@ def __init__(self, sessionDir, trialName, opensim.Logger.setLevelString('error') modelBasePath = os.path.join(sessionDir, 'OpenSimData', 'Model') + + # Check if this is a mono session (models stored in trial subfolders) + # Check specifically for a subfolder matching the trial name + isMono = False + if os.path.exists(modelBasePath): + trialModelPath = os.path.join(modelBasePath, trialName) + if os.path.isdir(trialModelPath): + isMono = True + # Load model if specified, otherwise load the one that was on server if modelName is None: - modelName = utils.get_model_name_from_metadata(sessionDir) - modelPath = os.path.join(modelBasePath,modelName) + if isMono: + # For mono sessions, look in the trial subfolder + trialModelPath = os.path.join(modelBasePath, trialName) + if os.path.exists(trialModelPath): + # Find .osim file in the trial subfolder + osimFiles = [f for f in 
os.listdir(trialModelPath) if f.endswith('.osim')] + if osimFiles: + modelPath = os.path.join(trialModelPath, osimFiles[0]) + else: + raise Exception('No .osim file found in ' + trialModelPath) + else: + raise Exception('Trial model folder does not exist: ' + trialModelPath) + else: + modelName = utils.get_model_name_from_metadata(sessionDir) + modelPath = os.path.join(modelBasePath, modelName) else: - modelPath = os.path.join(modelBasePath, - '{}.osim'.format(modelName)) + if isMono: + # For mono sessions, look in the trial subfolder + trialModelPath = os.path.join(modelBasePath, trialName) + if not modelName.endswith('.osim'): + modelName = modelName + '.osim' + modelPath = os.path.join(trialModelPath, modelName) + else: + if not modelName.endswith('.osim'): + modelPath = os.path.join(modelBasePath, '{}.osim'.format(modelName)) + else: + modelPath = os.path.join(modelBasePath, modelName) # make sure model exists if not os.path.exists(modelPath): raise Exception('Model path: ' + modelPath + ' does not exist.') + self.modelPath = modelPath self.model = opensim.Model(modelPath) self.model.initSystem() @@ -175,6 +214,25 @@ def get_marker_dict(self, session_dir, trial_name, '{}.trc'.format(trial_name)) markerDict = trc_2_dict(trcFilePath) + + # Convert marker names from actual format to expected format (with _study suffix) + if REVERSE_MARKER_NAME_MAPPING: + converted_markers = {} + # First pass: add markers that are already in correct format (prioritize these) + for marker_name, marker_data in markerDict['markers'].items(): + if marker_name not in REVERSE_MARKER_NAME_MAPPING: + # Already in correct format or unknown marker - keep as-is + converted_markers[marker_name] = marker_data + # Second pass: convert markers that need renaming (only if target doesn't exist) + for marker_name, marker_data in markerDict['markers'].items(): + if marker_name in REVERSE_MARKER_NAME_MAPPING: + new_name = REVERSE_MARKER_NAME_MAPPING[marker_name] + # Only convert if the target name 
doesn't already exist + # (avoids overwriting markers already in correct format) + if new_name not in converted_markers: + converted_markers[new_name] = marker_data + markerDict['markers'] = converted_markers + if lowpass_cutoff_frequency > 0: markerDict['markers'] = { marker_name: lowPassFilter(self.time, data, lowpass_cutoff_frequency) @@ -211,7 +269,7 @@ def rotate_com(self, comValues, euler_angles): rotated_com = rotation.apply(comValuesArray) # turn back into a dataframe with time as first column - rotated_com = pd.DataFrame(data=np.concatenate((np.expand_dims(comValues['time'].to_numpy, axis=1), rotated_com), axis=1), + rotated_com = pd.DataFrame(data=np.concatenate((np.expand_dims(comValues['time'].to_numpy(), axis=1), rotated_com), axis=1), columns=['time','x','y','z']) return rotated_com diff --git a/max_centerofmass_vpos/function/handler.py b/max_centerofmass_vpos/function/handler.py index 1f3a2ea..7a6f698 100644 --- a/max_centerofmass_vpos/function/handler.py +++ b/max_centerofmass_vpos/function/handler.py @@ -32,8 +32,13 @@ def handler(event, context): To invoke the function do POST request on the following url http://localhost:8080/2015-03-31/functions/function/invocations """ - # temporary placeholder - kwargs = json.loads(event['body']) + body = event.get('body', None) + if isinstance(body, dict): + kwargs = body + elif isinstance(body, str) and body: + kwargs = json.loads(body) + else: + kwargs = event for field in ('session_id', 'specific_trial_names'): if field not in kwargs: diff --git a/max_centerofmass_vpos/function/marker_name_mapping.py b/max_centerofmass_vpos/function/marker_name_mapping.py new file mode 100644 index 0000000..cf18e2a --- /dev/null +++ b/max_centerofmass_vpos/function/marker_name_mapping.py @@ -0,0 +1,56 @@ +""" +Marker name mapping dictionary for converting from expected format (with '_study' suffix) +to actual format (without '_study' suffix, lowercase). 
+ +This mapping is used to rename markers in TRC files to match the expected format +used by the gait_analysis class. + +Expected format: markers end with '_study' (e.g., 'r_calc_study', 'r.ASIS_study') +Actual format: markers without '_study' suffix, lowercase (e.g., 'r_calc', 'r_ASIS') +""" + +MARKER_NAME_MAPPING = { + # Pelvis markers + 'r.ASIS_study': 'r_ASIS', + 'L.ASIS_study': 'l_ASIS', + 'r.PSIS_study': 'r_PSIS', + 'L.PSIS_study': 'l_PSIS', + + # Right leg markers + 'r_knee_study': 'r_knee', + 'r_mknee_study': 'r_mknee', + 'r_ankle_study': 'r_ankle', + 'r_mankle_study': 'r_mankle', + 'r_toe_study': 'r_toe', + 'r_5meta_study': 'r_5meta', + 'r_calc_study': 'r_calc', + + # Left leg markers + 'L_knee_study': 'l_knee', + 'L_mknee_study': 'l_mknee', + 'L_ankle_study': 'l_ankle', + 'L_mankle_study': 'l_mankle', + 'L_toe_study': 'l_toe', + 'L_calc_study': 'l_calc', + 'L_5meta_study': 'l_5meta', + + # Shoulder markers + 'r_shoulder_study': 'r_shoulder', + 'L_shoulder_study': 'l_shoulder', + + # Spine markers + 'C7_study': 'C7', + + # Hip joint centers + 'RHJC_study': 'RHJC', # Check if exists in actual file + 'LHJC_study': 'LHJC', # Check if exists in actual file + + # Elbow markers + 'r_melbow_study': 'r_melbow', + 'L_melbow_study': 'l_melbow', + +} + +# Reverse mapping (actual -> expected) for renaming markers in TRC files +REVERSE_MARKER_NAME_MAPPING = {v: k for k, v in MARKER_NAME_MAPPING.items()} + diff --git a/max_centerofmass_vpos/function/utils.py b/max_centerofmass_vpos/function/utils.py index a1db3ab..78932c4 100644 --- a/max_centerofmass_vpos/function/utils.py +++ b/max_centerofmass_vpos/function/utils.py @@ -29,9 +29,13 @@ import glob import zipfile import platform +import opensim from utilsAPI import get_api_url from utilsAuthentication import get_token +import matplotlib.pyplot as plt +from scipy.signal.windows import gaussian + API_URL = get_api_url() API_TOKEN = get_token() @@ -67,6 +71,31 @@ def get_user_sessions(): return sessions +# Returns a list 
of all sessions of the user. +# TODO: this also contains public sessions of other users. +def get_user_sessions_all(user_token=API_TOKEN): + sessions = requests.get( + API_URL + "sessions/", + headers = {"Authorization": "Token {}".format(user_token)}).json() + + return sessions + +# Returns a list of all subjects of the user. +def get_user_subjects(user_token=API_TOKEN): + subjects = requests.get( + API_URL + "subjects/", + headers = {"Authorization": "Token {}".format(user_token)}).json() + + return subjects + +# Returns a list of all sessions of a subject. +def get_subject_sessions(subject_id, user_token=API_TOKEN): + sessions = requests.get( + API_URL + "subjects/{}/".format(subject_id), + headers = {"Authorization": "Token {}".format(user_token)}).json()['sessions'] + + return sessions + def get_trial_json(trial_id): trialJson = requests.get( API_URL + "trials/{}/".format(trial_id), @@ -76,6 +105,8 @@ def get_trial_json(trial_id): def get_neutral_trial_id(session_id): session = get_session_json(session_id) + if session['isMono']: + return None neutral_ids = [t['id'] for t in session['trials'] if t['name']=='neutral'] if len(neutral_ids)>0: @@ -112,29 +143,45 @@ def get_camera_mapping(session_id, session_path): if not os.path.exists(mappingPath): mappingURL = trial['results'][resultTags.index('camera_mapping')]['media'] download_file(mappingURL, mappingPath) - -def get_model_and_metadata(session_id, session_path): - neutral_id = get_neutral_trial_id(session_id) - trial = get_trial_json(neutral_id) - resultTags = [res['tag'] for res in trial['results']] - - # Metadata. + +def get_metadata(session_path, trial, resultTags): metadataPath = os.path.join(session_path,'sessionMetadata.yaml') - if not os.path.exists(metadataPath) : + if not os.path.exists(metadataPath): metadataURL = trial['results'][resultTags.index('session_metadata')]['media'] download_file(metadataURL, metadataPath) - # Model. 
+ +def get_model(session_path, trial, resultTags, isMono=False): modelURL = trial['results'][resultTags.index('opensim_model')]['media'] modelName = modelURL[modelURL.rfind('-')+1:modelURL.rfind('?')] modelFolder = os.path.join(session_path, 'OpenSimData', 'Model') + if isMono: + modelFolder = os.path.join(modelFolder, trial['name']) modelPath = os.path.join(modelFolder, modelName) if not os.path.exists(modelPath): os.makedirs(modelFolder, exist_ok=True) download_file(modelURL, modelPath) - return modelName + + +def get_model_and_metadata(session_id, session_path): + neutral_id = get_neutral_trial_id(session_id) + trial = get_trial_json(neutral_id) + resultTags = [res['tag'] for res in trial['results']] + + get_metadata(session_path, trial, resultTags) + modelName = get_model(session_path, trial, resultTags) + + return modelName + +def get_main_settings(session_folder,trial_name): + settings_path = os.path.join(session_folder,'MarkerData', + 'Settings','settings_' + trial_name + '.yaml') + main_settings = import_metadata(settings_path) + + return main_settings + def get_model_name_from_metadata(sessionFolder,appendText='_scaled'): metadataPath = os.path.join(sessionFolder,'sessionMetadata.yaml') @@ -148,26 +195,41 @@ def get_model_name_from_metadata(sessionFolder,appendText='_scaled'): return modelName -def get_motion_data(trial_id, session_path): +def get_motion_data(trial_id, session_path, isMono=False): trial = get_trial_json(trial_id) trial_name = trial['name'] resultTags = [res['tag'] for res in trial['results']] # Marker data. 
- if 'ik_results' in resultTags: + if 'marker_data' in resultTags: markerFolder = os.path.join(session_path, 'MarkerData') markerPath = os.path.join(markerFolder, trial_name + '.trc') os.makedirs(markerFolder, exist_ok=True) - markerURL = trial['results'][resultTags.index('marker_data')]['media'] - download_file(markerURL, markerPath) + if not os.path.exists(markerPath): + markerURL = trial['results'][resultTags.index('marker_data')]['media'] + download_file(markerURL, markerPath) # IK data. if 'ik_results' in resultTags: ikFolder = os.path.join(session_path, 'OpenSimData', 'Kinematics') ikPath = os.path.join(ikFolder, trial_name + '.mot') os.makedirs(ikFolder, exist_ok=True) - ikURL = trial['results'][resultTags.index('ik_results')]['media'] - download_file(ikURL, ikPath) + if not os.path.exists(ikPath): + ikURL = trial['results'][resultTags.index('ik_results')]['media'] + download_file(ikURL, ikPath) + + # Model data if mono trial (isMono = True in session JSON) + if isMono: + get_model(session_path, trial, resultTags, isMono=True) + + # Main settings + if 'main_settings' in resultTags: + settingsFolder = os.path.join(session_path, 'MarkerData', 'Settings') + settingsPath = os.path.join(settingsFolder, 'settings_' + trial_name + '.yaml') + os.makedirs(settingsFolder, exist_ok=True) + if not os.path.exists(settingsPath): + settingsURL = trial['results'][resultTags.index('main_settings')]['media'] + download_file(settingsURL, settingsPath) def get_geometries(session_path, modelName='LaiUhlrich2022_scaled'): @@ -224,34 +286,65 @@ def download_kinematics(session_id, folder=None, trialNames=None): if folder is None: folder = os.getcwd() os.makedirs(folder, exist_ok=True) + + sessionJson = get_session_json(session_id) + isMono = sessionJson['isMono'] - # Model and metadata. 
- neutral_id = get_neutral_trial_id(session_id) - get_motion_data(neutral_id, folder) - modelName = get_model_and_metadata(session_id, folder) - # Remove extension from modelName - modelName = modelName.replace('.osim','') + if not isMono: + # Model and metadata from neutral trial + neutral_id = get_neutral_trial_id(session_id) + get_motion_data(neutral_id, folder) + modelName = get_model_and_metadata(session_id, folder) + # Remove extension from modelName + modelName = modelName.replace('.osim','') # Session trial names. - sessionJson = get_session_json(session_id) sessionTrialNames = [t['name'] for t in sessionJson['trials']] if trialNames != None: [print(t + ' not in session trial names.') for t in trialNames if t not in sessionTrialNames] - # Motion data. + # Get dynamic trial IDs + dynamic_ids = [t['id'] for t in sessionJson['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] + + # Metadata for mono session + if isMono: + # Get metadata from the first dynamic trial + if dynamic_ids: + first_trial = get_trial_json(dynamic_ids[0]) + resultTags = [res['tag'] for res in first_trial['results']] + get_metadata(folder, first_trial, resultTags) + + # Motion data for all dynamic trials loadedTrialNames = [] for trialDict in sessionJson['trials']: if trialNames is not None and trialDict['name'] not in trialNames: continue trial_id = trialDict['id'] - get_motion_data(trial_id,folder) + get_motion_data(trial_id,folder, isMono=isMono) loadedTrialNames.append(trialDict['name']) # Remove 'calibration' and 'neutral' from loadedTrialNames. loadedTrialNames = [i for i in loadedTrialNames if i!='neutral' and i!='calibration'] - + # Geometries. 
+ if isMono: + # For mono sessions, find model names in subfolders + modelDir = os.path.join(folder, 'OpenSimData', 'Model') + if os.path.exists(modelDir): + modelNames = [] + for subfolder in os.listdir(modelDir): + subfolderPath = os.path.join(modelDir, subfolder) + if os.path.isdir(subfolderPath): + modelNames.extend([f for f in os.listdir(subfolderPath) if f.endswith('.osim')]) + if modelNames: + # Use first model name found (assuming same model type for all trials) + modelName = modelNames[0].replace('.osim', '') + else: + raise ValueError("No model files found in mono session subfolders") + else: + raise ValueError("Model directory does not exist") + get_geometries(folder, modelName=modelName) return loadedTrialNames, modelName @@ -264,12 +357,20 @@ def download_trial(trial_id, folder, session_id=None): session_id = trial['session_id'] os.makedirs(folder,exist_ok=True) + + # check if it is a mono trial + session = get_session_json(session_id) + isMono = session['isMono'] - # download model - get_model_and_metadata(session_id, folder) - - # download trc and mot - get_motion_data(trial_id,folder) + if isMono: + resultTags = [res['tag'] for res in trial['results']] + get_metadata(folder, trial, resultTags) + else: + # download model + get_model_and_metadata(session_id, folder) + + # download trc and mot + model if mono trial + get_motion_data(trial_id,folder, isMono=isMono) return trial['name'] @@ -340,6 +441,21 @@ def storage_to_dataframe(storage_file, headers): return out +# %% Load storage and output as dataframe or numpy +def load_storage(file_path,outputFormat='numpy'): + table = opensim.TimeSeriesTable(file_path) + data = table.getMatrix().to_numpy() + time = np.asarray(table.getIndependentColumn()).reshape(-1, 1) + data = np.hstack((time,data)) + headers = ['time'] + list(table.getColumnLabels()) + + if outputFormat == 'numpy': + return data,headers + elif outputFormat == 'dataframe': + return pd.DataFrame(data, columns=headers) + else: + return None + # 
%% Numpy array to storage file. def numpy_to_storage(labels, data, storage_file, datatype=None): @@ -431,8 +547,7 @@ def download_videos_from_server(session_id,trial_id, with open(os.path.join(session_path, "Videos", 'mappingCamDevice.pickle'), 'rb') as handle: mappingCamDevice = pickle.load(handle) # ensure upper on deviceID - for dID in mappingCamDevice.keys(): - mappingCamDevice[dID.upper()] = mappingCamDevice.pop(dID) + mappingCamDevice = {k.upper(): v for k, v in mappingCamDevice.items()} for video in trial["videos"]: k = mappingCamDevice[video["device_id"].replace('-', '').upper()] videoDir = os.path.join(session_path, "Videos", "Cam{}".format(k), "InputMedia", trial_name) @@ -511,7 +626,54 @@ def post_file_to_trial(filePath,trial_id,tag,device_id): requests.post("{}results/".format(API_URL), files=files, data=data, headers = {"Authorization": "Token {}".format(API_TOKEN)}) files["media"].close() + +def post_video_to_trial(filePath,trial_id,device_id,parameters): + files = {'video': open(filePath, 'rb')} + data = { + "trial": trial_id, + "device_id" : device_id, + "parameters": parameters + } + + requests.post("{}videos/".format(API_URL), files=files, data=data, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + files["video"].close() + +def delete_video_from_trial(video_id): + + requests.delete("{}videos/{}/".format(API_URL, video_id), + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + +def delete_results(trial_id, tag=None, resultNum=None): + # Delete specific result number, or all results with a specific tag, or all results if tag==None + if resultNum != None: + resultNums = [resultNum] + elif tag != None: + trial = get_trial_json(trial_id) + resultNums = [r['id'] for r in trial['results'] if r['tag']==tag] + + elif tag == None: + trial = get_trial_json(trial_id) + resultNums = [r['id'] for r in trial['results']] + + for rNum in resultNums: + requests.delete(API_URL + "results/{}/".format(rNum), + headers = {"Authorization": "Token 
{}".format(API_TOKEN)}) + +def set_trial_status(trial_id, status): + + # Available statuses: 'done', 'error', 'stopped', 'reprocess' + # 'processing' and 'recording' also exist, but it does not make sense to set them manually. + # Throw error if status is not one of the above. + if status not in ['done', 'error', 'stopped', 'reprocess']: + raise ValueError('Invalid status. Available statuses: done, error, stopped, reprocess') + + requests.patch(API_URL+"trials/{}/".format(trial_id), data={'status': status}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) +def set_session_subject(session_id, subject_id): + requests.patch(API_URL+"sessions/{}/".format(session_id), data={'subject': subject_id}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) def get_syncd_videos(trial_id,session_path): trial = requests.get("{}trials/{}/".format(API_URL,trial_id), @@ -540,44 +702,57 @@ def download_session(session_id, sessionBasePath= None, session = get_session_json(session_id) session_path = os.path.join(sessionBasePath,'OpenCapData_' + session_id) + + os.makedirs(session_path, exist_ok=True) + + isMono = session['isMono'] - calib_id = get_calibration_trial_id(session_id) - neutral_id = get_neutral_trial_id(session_id) + if not isMono: + calib_id = get_calibration_trial_id(session_id) + neutral_id = get_neutral_trial_id(session_id) + + # Calibration + try: + get_camera_mapping(session_id, session_path) + if downloadVideos: + download_videos_from_server(session_id,calib_id, + isCalibration=True,isStaticPose=False, + session_path = session_path) + + get_calibration(session_id,session_path) + except: + pass + + # Neutral + try: + modelName = get_model_and_metadata(session_id,session_path) + get_motion_data(neutral_id,session_path) + if downloadVideos: + download_videos_from_server(session_id,neutral_id, + isCalibration=False,isStaticPose=True, + session_path = session_path) + + get_syncd_videos(neutral_id,session_path) + except: + pass + dynamic_ids = [t['id'] 
for t in session['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] - - # Calibration - try: - get_camera_mapping(session_id, session_path) - if downloadVideos: - download_videos_from_server(session_id,calib_id, - isCalibration=True,isStaticPose=False, - session_path = session_path) - get_calibration(session_id,session_path) - except: - pass - - # Neutral - try: - modelName = get_model_and_metadata(session_id,session_path) - get_motion_data(neutral_id,session_path) - if downloadVideos: - download_videos_from_server(session_id,neutral_id, - isCalibration=False,isStaticPose=True, - session_path = session_path) - - get_syncd_videos(neutral_id,session_path) - except: - pass + # Metadata for mono session + if isMono: + # handle the first dynamic trial + first_trial = get_trial_json(dynamic_ids[0]) + resultTags = [res['tag'] for res in first_trial['results']] + get_metadata(session_path, first_trial, resultTags) # Dynamic for dynamic_id in dynamic_ids: try: - get_motion_data(dynamic_id,session_path) + get_motion_data(dynamic_id,session_path, isMono=isMono) if downloadVideos: download_videos_from_server(session_id,dynamic_id, isCalibration=False,isStaticPose=False, - session_path = session_path) + session_path=session_path) get_syncd_videos(dynamic_id,session_path) except: @@ -595,16 +770,33 @@ def download_session(session_id, sessionBasePath= None, # Geometry try: - if 'Lai' in modelName: - modelType = 'LaiArnold' + if isMono: + # get all names of .osim files in subfolders of Model folder + modelDir = os.path.join(session_path, 'OpenSimData', 'Model') + modelNames = [] + for subfolder in os.listdir(modelDir): + subfolderPath = os.path.join(modelDir, subfolder) + if os.path.isdir(subfolderPath): + modelNames.extend([f for f in os.listdir(subfolderPath) if f.endswith('.osim')]) + # check if any of the model names contain 'Lai', assuming the same model type is used for all trials of the session + if any('Lai' in name for name in modelNames): + modelType = 
'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + modelName = modelNames[0] + else: - raise ValueError("Geometries not available for this model, please contact us") + if 'Lai' in modelName: + modelType = 'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + if platform.system() == 'Windows': geometryDir = os.path.join(repoDir, 'tmp', modelType, 'Geometry') else: geometryDir = "/tmp/{}/Geometry".format(modelType) - # If not in cache, download from s3. - if not os.path.exists(geometryDir): + # If not in cache or empty, download from s3. + if not os.path.exists(geometryDir) or not os.listdir(geometryDir): os.makedirs(geometryDir, exist_ok=True) get_geometries(session_path, modelName=modelName) geometryDirEnd = os.path.join(session_path, 'OpenSimData', 'Model', 'Geometry') @@ -632,4 +824,72 @@ def zipdir(path, ziph): if writeToDB: post_file_to_trial(session_zip,dynamic_ids[-1],tag='session_zip', device_id='all') - \ No newline at end of file + +def cross_corr(y1, y2,multCorrGaussianStd=None,visualize=False): + """Calculates the cross correlation and lags without normalization. + + The definition of the discrete cross-correlation is in: + https://www.mathworks.com/help/matlab/ref/xcorr.html + + Args: + y1, y2: Should have the same length. + + Returns: + max_corr: Maximum correlation without normalization. + lag: The lag in terms of the index. + """ + # Pad shorter signal with 0s + if len(y1) > len(y2): + temp = np.zeros(len(y1)) + temp[0:len(y2)] = y2 + y2 = np.copy(temp) + elif len(y2)>len(y1): + temp = np.zeros(len(y2)) + temp[0:len(y1)] = y1 + y1 = np.copy(temp) + + y1_auto_corr = np.dot(y1, y1) / len(y1) + y2_auto_corr = np.dot(y2, y2) / len(y1) + corr = np.correlate(y1, y2, mode='same') + # The unbiased sample size is N - lag. 
+ unbiased_sample_size = np.correlate(np.ones(len(y1)), np.ones(len(y1)), mode='same') + corr = corr / unbiased_sample_size / np.sqrt(y1_auto_corr * y2_auto_corr) + shift = len(y1) // 2 + max_corr = np.max(corr) + argmax_corr = np.argmax(corr) + + if visualize: + plt.figure() + plt.plot(corr) + plt.title('vertical velocity correlation') + + # Multiply correlation curve by gaussian (prioritizing lag solution closest to 0) + if multCorrGaussianStd is not None: + corr = np.multiply(corr,gaussian(len(corr),multCorrGaussianStd)) + if visualize: + plt.plot(corr,color=[.4,.4,.4]) + plt.legend(['corr','corr*gaussian']) + + argmax_corr = np.argmax(corr) + max_corr = np.nanmax(corr) + + lag = argmax_corr-shift + + return max_corr, lag + +def downsample(data,time,framerate_in,framerate_out): + # Calculate the downsampling factor + downsampling_factor = framerate_in / framerate_out + + # Create new indices for downsampling + original_indices = np.arange(len(data)) + new_indices = np.arange(0, len(data), downsampling_factor) + + # Perform downsampling with interpolation + downsampled_data = np.ndarray((len(new_indices), data.shape[1])) + for i in range(data.shape[1]): + downsampled_data[:,i] = np.interp(new_indices, original_indices, data[:,i]) + + downsampled_time = np.interp(new_indices, original_indices, time) + + return downsampled_time, downsampled_data \ No newline at end of file diff --git a/max_centerofmass_vpos/function/utilsKinematics.py b/max_centerofmass_vpos/function/utilsKinematics.py index 796e0d8..f82bc37 100644 --- a/max_centerofmass_vpos/function/utilsKinematics.py +++ b/max_centerofmass_vpos/function/utilsKinematics.py @@ -20,6 +20,7 @@ import os import opensim +import copy import utils import numpy as np import pandas as pd @@ -28,6 +29,15 @@ from utilsProcessing import lowPassFilter from utilsTRC import trc_2_dict +import numpy as np +from scipy.spatial.transform import Rotation + +# Import marker name mapping for conversion +try: + from marker_name_mapping 
import REVERSE_MARKER_NAME_MAPPING +except ImportError: + # If mapping file doesn't exist, use empty dict (no conversion) + REVERSE_MARKER_NAME_MAPPING = {} class kinematics: @@ -43,18 +53,50 @@ def __init__(self, sessionDir, trialName, opensim.Logger.setLevelString('error') modelBasePath = os.path.join(sessionDir, 'OpenSimData', 'Model') + + # Check if this is a mono session (models stored in trial subfolders) + # Check specifically for a subfolder matching the trial name + isMono = False + if os.path.exists(modelBasePath): + trialModelPath = os.path.join(modelBasePath, trialName) + if os.path.isdir(trialModelPath): + isMono = True + # Load model if specified, otherwise load the one that was on server if modelName is None: - modelName = utils.get_model_name_from_metadata(sessionDir) - modelPath = os.path.join(modelBasePath,modelName) + if isMono: + # For mono sessions, look in the trial subfolder + trialModelPath = os.path.join(modelBasePath, trialName) + if os.path.exists(trialModelPath): + # Find .osim file in the trial subfolder + osimFiles = [f for f in os.listdir(trialModelPath) if f.endswith('.osim')] + if osimFiles: + modelPath = os.path.join(trialModelPath, osimFiles[0]) + else: + raise Exception('No .osim file found in ' + trialModelPath) + else: + raise Exception('Trial model folder does not exist: ' + trialModelPath) + else: + modelName = utils.get_model_name_from_metadata(sessionDir) + modelPath = os.path.join(modelBasePath, modelName) else: - modelPath = os.path.join(modelBasePath, - '{}.osim'.format(modelName)) + if isMono: + # For mono sessions, look in the trial subfolder + trialModelPath = os.path.join(modelBasePath, trialName) + if not modelName.endswith('.osim'): + modelName = modelName + '.osim' + modelPath = os.path.join(trialModelPath, modelName) + else: + if not modelName.endswith('.osim'): + modelPath = os.path.join(modelBasePath, '{}.osim'.format(modelName)) + else: + modelPath = os.path.join(modelBasePath, modelName) # make sure model 
exists if not os.path.exists(modelPath): raise Exception('Model path: ' + modelPath + ' does not exist.') + self.modelPath = modelPath self.model = opensim.Model(modelPath) self.model.initSystem() @@ -172,13 +214,66 @@ def get_marker_dict(self, session_dir, trial_name, '{}.trc'.format(trial_name)) markerDict = trc_2_dict(trcFilePath) + + # Convert marker names from actual format to expected format (with _study suffix) + if REVERSE_MARKER_NAME_MAPPING: + converted_markers = {} + # First pass: add markers that are already in correct format (prioritize these) + for marker_name, marker_data in markerDict['markers'].items(): + if marker_name not in REVERSE_MARKER_NAME_MAPPING: + # Already in correct format or unknown marker - keep as-is + converted_markers[marker_name] = marker_data + # Second pass: convert markers that need renaming (only if target doesn't exist) + for marker_name, marker_data in markerDict['markers'].items(): + if marker_name in REVERSE_MARKER_NAME_MAPPING: + new_name = REVERSE_MARKER_NAME_MAPPING[marker_name] + # Only convert if the target name doesn't already exist + # (avoids overwriting markers already in correct format) + if new_name not in converted_markers: + converted_markers[new_name] = marker_data + markerDict['markers'] = converted_markers + if lowpass_cutoff_frequency > 0: markerDict['markers'] = { marker_name: lowPassFilter(self.time, data, lowpass_cutoff_frequency) for marker_name, data in markerDict['markers'].items()} return markerDict + + def rotate_marker_dict(self, markerDict, euler_angles): + # euler_angles is a dictionary with keys being the axes of rotation + # (x, y, z) and values being the angles in degrees. e.g. 
{'x': 90, 'y': 180} + + rotated_marker_dict = copy.deepcopy(markerDict) + rotated_marker_dict['markers'] = {} + + rotation = Rotation.from_euler(''.join(list(euler_angles.keys())), + list(euler_angles.values()), degrees=True) + + for marker, positions in markerDict['markers'].items(): + rotated_positions = rotation.apply(positions) + rotated_marker_dict['markers'][marker] = rotated_positions + + return rotated_marker_dict + + def rotate_com(self, comValues, euler_angles): + # euler_angles is a dictionary with keys being the axes of rotation + # (x, y, z) and values being the angles in degrees. e.g. {'x': 90, 'y': 180} + + rotation = Rotation.from_euler(''.join(list(euler_angles.keys())), + list(euler_angles.values()), degrees=True) + # turn the x, y, z dataframe entries into an Nx3 array + comValuesArray = comValues[['x','y','z']].to_numpy() + + rotated_com = rotation.apply(comValuesArray) + + # turn back into a dataframe with time as first column + rotated_com = pd.DataFrame(data=np.concatenate((np.expand_dims(comValues['time'].to_numpy(), axis=1), rotated_com), axis=1), + columns=['time','x','y','z']) + + return rotated_com + def get_coordinate_values(self, in_degrees=True, lowpass_cutoff_frequency=-1): @@ -201,9 +296,9 @@ def get_coordinate_values(self, in_degrees=True, data = np.concatenate( (np.expand_dims(self.time, axis=1), Qs), axis=1) columns = ['time'] + self.columnLabels - coordinate_values = pd.DataFrame(data=data, columns=columns) + self.coordinate_values = pd.DataFrame(data=data, columns=columns) - return coordinate_values + return self.coordinate_values def get_coordinate_speeds(self, in_degrees=True, lowpass_cutoff_frequency=-1): @@ -406,4 +501,67 @@ def get_center_of_mass_accelerations(self, lowpass_cutoff_frequency=-1): columns = ['time'] + ['x','y','z'] com_accelerations = pd.DataFrame(data=data, columns=columns) - return com_accelerations \ No newline at end of file + return com_accelerations + + def get_body_angular_velocity(self, 
body_names=None, lowpass_cutoff_frequency=-1, + expressed_in='body'): + + body_set = self.model.getBodySet() + if body_names is None: + body_names = [] + for i in range(body_set.getSize()): + print(i) + body = body_set.get(i) + body_names.append(body.getName()) + + bodies = [body_set.get(body_name) for body_name in body_names] + ground = self.model.getGround() + + angular_velocity = np.ndarray((self.table.getNumRows(), + len(body_names)*3)) # time x bodies x dim + + for i_time in range(self.table.getNumRows()): # loop over time + state = self.stateTrajectory()[i_time] + self.model.realizeVelocity(state) + + + for i_body,body in enumerate(bodies): + ang_vel_in_ground = body.getAngularVelocityInGround(state) + if expressed_in == 'body': + angular_velocity[i_time, i_body*3:i_body*3+3] = ground.expressVectorInAnotherFrame( + state, ang_vel_in_ground, body + ).to_numpy() + elif expressed_in == 'ground': + angular_velocity[i_time, i_body*3:i_body*3+3] = ang_vel_in_ground.to_numpy() + else: + raise Exception (expressed_in + ' is not a valid frame to express angular' + + ' velocity.') + + angular_velocity_filtered = lowPassFilter(self.time, angular_velocity, lowpass_cutoff_frequency) + + # Put into a dataframe + data = np.concatenate((np.expand_dims(self.time, axis=1), angular_velocity_filtered), axis=1) + columns = ['time'] + for i, body_name in enumerate(body_names): + columns += [f'{body_name}_x', f'{body_name}_y', f'{body_name}_z'] + angular_velocity_df = pd.DataFrame(data=data, columns=columns) + + return angular_velocity_df + + def get_ranges_of_motion(self, in_degrees=True, lowpass_cutoff_frequency=-1): + + self.get_coordinate_values( + in_degrees=in_degrees, + lowpass_cutoff_frequency=lowpass_cutoff_frequency) + + # Compute ranges of motion. 
+ ROM = {} + for c, coord in enumerate(self.coordinates): + ROM[coord] = {} + ROM[coord]['min'] = self.coordinate_values[coord].min() + ROM[coord]['max'] = self.coordinate_values[coord].max() + ROM[coord]['amplitude'] = ( + self.coordinate_values[coord].max() - + self.coordinate_values[coord].min()) + + return ROM diff --git a/squat_analysis/function/handler.py b/squat_analysis/function/handler.py index fd481fb..2c73b68 100644 --- a/squat_analysis/function/handler.py +++ b/squat_analysis/function/handler.py @@ -31,8 +31,13 @@ def handler(event, context): To invoke the function do POST request on the following url http://localhost:8080/2015-03-31/functions/function/invocations """ - # temporary placeholder - kwargs = json.loads(event['body']) + body = event.get('body', None) + if isinstance(body, dict): + kwargs = body + elif isinstance(body, str) and body: + kwargs = json.loads(body) + else: + kwargs = event for field in ('session_id', 'specific_trial_names'): if field not in kwargs: diff --git a/squat_analysis/function/marker_name_mapping.py b/squat_analysis/function/marker_name_mapping.py new file mode 100644 index 0000000..cf18e2a --- /dev/null +++ b/squat_analysis/function/marker_name_mapping.py @@ -0,0 +1,56 @@ +""" +Marker name mapping dictionary for converting from expected format (with '_study' suffix) +to actual format (without '_study' suffix, lowercase). + +This mapping is used to rename markers in TRC files to match the expected format +used by the gait_analysis class. 
+ +Expected format: markers end with '_study' (e.g., 'r_calc_study', 'r.ASIS_study') +Actual format: markers without '_study' suffix, lowercase (e.g., 'r_calc', 'r_ASIS') +""" + +MARKER_NAME_MAPPING = { + # Pelvis markers + 'r.ASIS_study': 'r_ASIS', + 'L.ASIS_study': 'l_ASIS', + 'r.PSIS_study': 'r_PSIS', + 'L.PSIS_study': 'l_PSIS', + + # Right leg markers + 'r_knee_study': 'r_knee', + 'r_mknee_study': 'r_mknee', + 'r_ankle_study': 'r_ankle', + 'r_mankle_study': 'r_mankle', + 'r_toe_study': 'r_toe', + 'r_5meta_study': 'r_5meta', + 'r_calc_study': 'r_calc', + + # Left leg markers + 'L_knee_study': 'l_knee', + 'L_mknee_study': 'l_mknee', + 'L_ankle_study': 'l_ankle', + 'L_mankle_study': 'l_mankle', + 'L_toe_study': 'l_toe', + 'L_calc_study': 'l_calc', + 'L_5meta_study': 'l_5meta', + + # Shoulder markers + 'r_shoulder_study': 'r_shoulder', + 'L_shoulder_study': 'l_shoulder', + + # Spine markers + 'C7_study': 'C7', + + # Hip joint centers + 'RHJC_study': 'RHJC', # Check if exists in actual file + 'LHJC_study': 'LHJC', # Check if exists in actual file + + # Elbow markers + 'r_melbow_study': 'r_melbow', + 'L_melbow_study': 'l_melbow', + +} + +# Reverse mapping (actual -> expected) for renaming markers in TRC files +REVERSE_MARKER_NAME_MAPPING = {v: k for k, v in MARKER_NAME_MAPPING.items()} + diff --git a/squat_analysis/function/utils.py b/squat_analysis/function/utils.py index 1e2b162..78932c4 100644 --- a/squat_analysis/function/utils.py +++ b/squat_analysis/function/utils.py @@ -34,7 +34,7 @@ from utilsAPI import get_api_url from utilsAuthentication import get_token import matplotlib.pyplot as plt -from scipy.signal import gaussian +from scipy.signal.windows import gaussian API_URL = get_api_url() @@ -80,6 +80,22 @@ def get_user_sessions_all(user_token=API_TOKEN): return sessions +# Returns a list of all subjects of the user. 
+def get_user_subjects(user_token=API_TOKEN): + subjects = requests.get( + API_URL + "subjects/", + headers = {"Authorization": "Token {}".format(user_token)}).json() + + return subjects + +# Returns a list of all sessions of a subject. +def get_subject_sessions(subject_id, user_token=API_TOKEN): + sessions = requests.get( + API_URL + "subjects/{}/".format(subject_id), + headers = {"Authorization": "Token {}".format(user_token)}).json()['sessions'] + + return sessions + def get_trial_json(trial_id): trialJson = requests.get( API_URL + "trials/{}/".format(trial_id), @@ -89,6 +105,8 @@ def get_trial_json(trial_id): def get_neutral_trial_id(session_id): session = get_session_json(session_id) + if session['isMono']: + return None neutral_ids = [t['id'] for t in session['trials'] if t['name']=='neutral'] if len(neutral_ids)>0: @@ -125,28 +143,36 @@ def get_camera_mapping(session_id, session_path): if not os.path.exists(mappingPath): mappingURL = trial['results'][resultTags.index('camera_mapping')]['media'] download_file(mappingURL, mappingPath) - -def get_model_and_metadata(session_id, session_path): - neutral_id = get_neutral_trial_id(session_id) - trial = get_trial_json(neutral_id) - resultTags = [res['tag'] for res in trial['results']] - - # Metadata. + +def get_metadata(session_path, trial, resultTags): metadataPath = os.path.join(session_path,'sessionMetadata.yaml') - if not os.path.exists(metadataPath) : + if not os.path.exists(metadataPath): metadataURL = trial['results'][resultTags.index('session_metadata')]['media'] download_file(metadataURL, metadataPath) - # Model. 
+ +def get_model(session_path, trial, resultTags, isMono=False): modelURL = trial['results'][resultTags.index('opensim_model')]['media'] modelName = modelURL[modelURL.rfind('-')+1:modelURL.rfind('?')] modelFolder = os.path.join(session_path, 'OpenSimData', 'Model') + if isMono: + modelFolder = os.path.join(modelFolder, trial['name']) modelPath = os.path.join(modelFolder, modelName) if not os.path.exists(modelPath): os.makedirs(modelFolder, exist_ok=True) download_file(modelURL, modelPath) - + return modelName + + +def get_model_and_metadata(session_id, session_path): + neutral_id = get_neutral_trial_id(session_id) + trial = get_trial_json(neutral_id) + resultTags = [res['tag'] for res in trial['results']] + + get_metadata(session_path, trial, resultTags) + modelName = get_model(session_path, trial, resultTags) + return modelName def get_main_settings(session_folder,trial_name): @@ -169,34 +195,41 @@ def get_model_name_from_metadata(sessionFolder,appendText='_scaled'): return modelName -def get_motion_data(trial_id, session_path): +def get_motion_data(trial_id, session_path, isMono=False): trial = get_trial_json(trial_id) trial_name = trial['name'] resultTags = [res['tag'] for res in trial['results']] # Marker data. - if 'ik_results' in resultTags: + if 'marker_data' in resultTags: markerFolder = os.path.join(session_path, 'MarkerData') markerPath = os.path.join(markerFolder, trial_name + '.trc') os.makedirs(markerFolder, exist_ok=True) - markerURL = trial['results'][resultTags.index('marker_data')]['media'] - download_file(markerURL, markerPath) + if not os.path.exists(markerPath): + markerURL = trial['results'][resultTags.index('marker_data')]['media'] + download_file(markerURL, markerPath) # IK data. 
if 'ik_results' in resultTags: ikFolder = os.path.join(session_path, 'OpenSimData', 'Kinematics') ikPath = os.path.join(ikFolder, trial_name + '.mot') os.makedirs(ikFolder, exist_ok=True) - ikURL = trial['results'][resultTags.index('ik_results')]['media'] - download_file(ikURL, ikPath) + if not os.path.exists(ikPath): + ikURL = trial['results'][resultTags.index('ik_results')]['media'] + download_file(ikURL, ikPath) + + # Model data if mono trial (isMono = True in session JSON) + if isMono: + get_model(session_path, trial, resultTags, isMono=True) # Main settings if 'main_settings' in resultTags: settingsFolder = os.path.join(session_path, 'MarkerData', 'Settings') settingsPath = os.path.join(settingsFolder, 'settings_' + trial_name + '.yaml') os.makedirs(settingsFolder, exist_ok=True) - settingsURL = trial['results'][resultTags.index('main_settings')]['media'] - download_file(settingsURL, settingsPath) + if not os.path.exists(settingsPath): + settingsURL = trial['results'][resultTags.index('main_settings')]['media'] + download_file(settingsURL, settingsPath) def get_geometries(session_path, modelName='LaiUhlrich2022_scaled'): @@ -253,34 +286,65 @@ def download_kinematics(session_id, folder=None, trialNames=None): if folder is None: folder = os.getcwd() os.makedirs(folder, exist_ok=True) + + sessionJson = get_session_json(session_id) + isMono = sessionJson['isMono'] - # Model and metadata. - neutral_id = get_neutral_trial_id(session_id) - get_motion_data(neutral_id, folder) - modelName = get_model_and_metadata(session_id, folder) - # Remove extension from modelName - modelName = modelName.replace('.osim','') + if not isMono: + # Model and metadata from neutral trial + neutral_id = get_neutral_trial_id(session_id) + get_motion_data(neutral_id, folder) + modelName = get_model_and_metadata(session_id, folder) + # Remove extension from modelName + modelName = modelName.replace('.osim','') # Session trial names. 
- sessionJson = get_session_json(session_id) sessionTrialNames = [t['name'] for t in sessionJson['trials']] if trialNames != None: [print(t + ' not in session trial names.') for t in trialNames if t not in sessionTrialNames] - # Motion data. + # Get dynamic trial IDs + dynamic_ids = [t['id'] for t in sessionJson['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] + + # Metadata for mono session + if isMono: + # Get metadata from the first dynamic trial + if dynamic_ids: + first_trial = get_trial_json(dynamic_ids[0]) + resultTags = [res['tag'] for res in first_trial['results']] + get_metadata(folder, first_trial, resultTags) + + # Motion data for all dynamic trials loadedTrialNames = [] for trialDict in sessionJson['trials']: if trialNames is not None and trialDict['name'] not in trialNames: continue trial_id = trialDict['id'] - get_motion_data(trial_id,folder) + get_motion_data(trial_id,folder, isMono=isMono) loadedTrialNames.append(trialDict['name']) # Remove 'calibration' and 'neutral' from loadedTrialNames. loadedTrialNames = [i for i in loadedTrialNames if i!='neutral' and i!='calibration'] - + # Geometries. 
+ if isMono: + # For mono sessions, find model names in subfolders + modelDir = os.path.join(folder, 'OpenSimData', 'Model') + if os.path.exists(modelDir): + modelNames = [] + for subfolder in os.listdir(modelDir): + subfolderPath = os.path.join(modelDir, subfolder) + if os.path.isdir(subfolderPath): + modelNames.extend([f for f in os.listdir(subfolderPath) if f.endswith('.osim')]) + if modelNames: + # Use first model name found (assuming same model type for all trials) + modelName = modelNames[0].replace('.osim', '') + else: + raise ValueError("No model files found in mono session subfolders") + else: + raise ValueError("Model directory does not exist") + get_geometries(folder, modelName=modelName) return loadedTrialNames, modelName @@ -293,12 +357,20 @@ def download_trial(trial_id, folder, session_id=None): session_id = trial['session_id'] os.makedirs(folder,exist_ok=True) + + # check if it is a mono trial + session = get_session_json(session_id) + isMono = session['isMono'] - # download model - get_model_and_metadata(session_id, folder) - - # download trc and mot - get_motion_data(trial_id,folder) + if isMono: + resultTags = [res['tag'] for res in trial['results']] + get_metadata(folder, trial, resultTags) + else: + # download model + get_model_and_metadata(session_id, folder) + + # download trc and mot + model if mono trial + get_motion_data(trial_id,folder, isMono=isMono) return trial['name'] @@ -475,8 +547,7 @@ def download_videos_from_server(session_id,trial_id, with open(os.path.join(session_path, "Videos", 'mappingCamDevice.pickle'), 'rb') as handle: mappingCamDevice = pickle.load(handle) # ensure upper on deviceID - for dID in mappingCamDevice.keys(): - mappingCamDevice[dID.upper()] = mappingCamDevice.pop(dID) + mappingCamDevice = {k.upper(): v for k, v in mappingCamDevice.items()} for video in trial["videos"]: k = mappingCamDevice[video["device_id"].replace('-', '').upper()] videoDir = os.path.join(session_path, "Videos", "Cam{}".format(k), "InputMedia", 
trial_name) @@ -555,7 +626,54 @@ def post_file_to_trial(filePath,trial_id,tag,device_id): requests.post("{}results/".format(API_URL), files=files, data=data, headers = {"Authorization": "Token {}".format(API_TOKEN)}) files["media"].close() + +def post_video_to_trial(filePath,trial_id,device_id,parameters): + files = {'video': open(filePath, 'rb')} + data = { + "trial": trial_id, + "device_id" : device_id, + "parameters": parameters + } + + requests.post("{}videos/".format(API_URL), files=files, data=data, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + files["video"].close() + +def delete_video_from_trial(video_id): + + requests.delete("{}videos/{}/".format(API_URL, video_id), + headers = {"Authorization": "Token {}".format(API_TOKEN)}) +def delete_results(trial_id, tag=None, resultNum=None): + # Delete specific result number, or all results with a specific tag, or all results if tag==None + if resultNum != None: + resultNums = [resultNum] + elif tag != None: + trial = get_trial_json(trial_id) + resultNums = [r['id'] for r in trial['results'] if r['tag']==tag] + + elif tag == None: + trial = get_trial_json(trial_id) + resultNums = [r['id'] for r in trial['results']] + + for rNum in resultNums: + requests.delete(API_URL + "results/{}/".format(rNum), + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + +def set_trial_status(trial_id, status): + + # Available statuses: 'done', 'error', 'stopped', 'reprocess' + # 'processing' and 'recording also exist, but it does not make sense to set them manually. + # Throw error if status is not one of the above. + if status not in ['done', 'error', 'stopped', 'reprocess']: + raise ValueError('Invalid status. 
Available statuses: done, error, stopped, reprocess') + + requests.patch(API_URL+"trials/{}/".format(trial_id), data={'status': status}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + +def set_session_subject(session_id, subject_id): + requests.patch(API_URL+"sessions/{}/".format(session_id), data={'subject': subject_id}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) def get_syncd_videos(trial_id,session_path): trial = requests.get("{}trials/{}/".format(API_URL,trial_id), @@ -584,44 +702,57 @@ def download_session(session_id, sessionBasePath= None, session = get_session_json(session_id) session_path = os.path.join(sessionBasePath,'OpenCapData_' + session_id) + + os.makedirs(session_path, exist_ok=True) + + isMono = session['isMono'] - calib_id = get_calibration_trial_id(session_id) - neutral_id = get_neutral_trial_id(session_id) + if not isMono: + calib_id = get_calibration_trial_id(session_id) + neutral_id = get_neutral_trial_id(session_id) + + # Calibration + try: + get_camera_mapping(session_id, session_path) + if downloadVideos: + download_videos_from_server(session_id,calib_id, + isCalibration=True,isStaticPose=False, + session_path = session_path) + + get_calibration(session_id,session_path) + except: + pass + + # Neutral + try: + modelName = get_model_and_metadata(session_id,session_path) + get_motion_data(neutral_id,session_path) + if downloadVideos: + download_videos_from_server(session_id,neutral_id, + isCalibration=False,isStaticPose=True, + session_path = session_path) + + get_syncd_videos(neutral_id,session_path) + except: + pass + dynamic_ids = [t['id'] for t in session['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] - - # Calibration - try: - get_camera_mapping(session_id, session_path) - if downloadVideos: - download_videos_from_server(session_id,calib_id, - isCalibration=True,isStaticPose=False, - session_path = session_path) - get_calibration(session_id,session_path) - except: - pass - - # 
Neutral - try: - modelName = get_model_and_metadata(session_id,session_path) - get_motion_data(neutral_id,session_path) - if downloadVideos: - download_videos_from_server(session_id,neutral_id, - isCalibration=False,isStaticPose=True, - session_path = session_path) - - get_syncd_videos(neutral_id,session_path) - except: - pass + # Metadata for mono session + if isMono: + # hand the first dynamic trial + first_trial = get_trial_json(dynamic_ids[0]) + resultTags = [res['tag'] for res in first_trial['results']] + get_metadata(session_path, first_trial, resultTags) # Dynamic for dynamic_id in dynamic_ids: try: - get_motion_data(dynamic_id,session_path) + get_motion_data(dynamic_id,session_path, isMono=isMono) if downloadVideos: download_videos_from_server(session_id,dynamic_id, isCalibration=False,isStaticPose=False, - session_path = session_path) + session_path=session_path) get_syncd_videos(dynamic_id,session_path) except: @@ -639,16 +770,33 @@ def download_session(session_id, sessionBasePath= None, # Geometry try: - if 'Lai' in modelName: - modelType = 'LaiArnold' + if isMono: + # get all names of .osim files in subfolders of Model folder + modelDir = os.path.join(session_path, 'OpenSimData', 'Model') + modelNames = [] + for subfolder in os.listdir(modelDir): + subfolderPath = os.path.join(modelDir, subfolder) + if os.path.isdir(subfolderPath): + modelNames.extend([f for f in os.listdir(subfolderPath) if f.endswith('.osim')]) + # check if any of the model names contain 'Lai', assuming the same model type is used for all trials of the session + if any('Lai' in name for name in modelNames): + modelType = 'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + modelName = modelNames[0] + else: - raise ValueError("Geometries not available for this model, please contact us") + if 'Lai' in modelName: + modelType = 'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + if 
platform.system() == 'Windows': geometryDir = os.path.join(repoDir, 'tmp', modelType, 'Geometry') else: geometryDir = "/tmp/{}/Geometry".format(modelType) - # If not in cache, download from s3. - if not os.path.exists(geometryDir): + # If not in cache or empty, download from s3. + if not os.path.exists(geometryDir) or not os.listdir(geometryDir): os.makedirs(geometryDir, exist_ok=True) get_geometries(session_path, modelName=modelName) geometryDirEnd = os.path.join(session_path, 'OpenSimData', 'Model', 'Geometry') diff --git a/squat_analysis/function/utilsKinematics.py b/squat_analysis/function/utilsKinematics.py index c7c5f96..7d63bd5 100644 --- a/squat_analysis/function/utilsKinematics.py +++ b/squat_analysis/function/utilsKinematics.py @@ -20,6 +20,7 @@ import os import opensim +import copy import utils import numpy as np import pandas as pd @@ -28,6 +29,15 @@ from utilsProcessing import lowPassFilter from utilsTRC import trc_2_dict +import numpy as np +from scipy.spatial.transform import Rotation + +# Import marker name mapping for conversion +try: + from marker_name_mapping import REVERSE_MARKER_NAME_MAPPING +except ImportError: + # If mapping file doesn't exist, use empty dict (no conversion) + REVERSE_MARKER_NAME_MAPPING = {} class kinematics: @@ -43,18 +53,50 @@ def __init__(self, sessionDir, trialName, opensim.Logger.setLevelString('error') modelBasePath = os.path.join(sessionDir, 'OpenSimData', 'Model') + + # Check if this is a mono session (models stored in trial subfolders) + # Check specifically for a subfolder matching the trial name + isMono = False + if os.path.exists(modelBasePath): + trialModelPath = os.path.join(modelBasePath, trialName) + if os.path.isdir(trialModelPath): + isMono = True + # Load model if specified, otherwise load the one that was on server if modelName is None: - modelName = utils.get_model_name_from_metadata(sessionDir) - modelPath = os.path.join(modelBasePath,modelName) + if isMono: + # For mono sessions, look in the trial 
subfolder + trialModelPath = os.path.join(modelBasePath, trialName) + if os.path.exists(trialModelPath): + # Find .osim file in the trial subfolder + osimFiles = [f for f in os.listdir(trialModelPath) if f.endswith('.osim')] + if osimFiles: + modelPath = os.path.join(trialModelPath, osimFiles[0]) + else: + raise Exception('No .osim file found in ' + trialModelPath) + else: + raise Exception('Trial model folder does not exist: ' + trialModelPath) + else: + modelName = utils.get_model_name_from_metadata(sessionDir) + modelPath = os.path.join(modelBasePath, modelName) else: - modelPath = os.path.join(modelBasePath, - '{}.osim'.format(modelName)) + if isMono: + # For mono sessions, look in the trial subfolder + trialModelPath = os.path.join(modelBasePath, trialName) + if not modelName.endswith('.osim'): + modelName = modelName + '.osim' + modelPath = os.path.join(trialModelPath, modelName) + else: + if not modelName.endswith('.osim'): + modelPath = os.path.join(modelBasePath, '{}.osim'.format(modelName)) + else: + modelPath = os.path.join(modelBasePath, modelName) # make sure model exists if not os.path.exists(modelPath): raise Exception('Model path: ' + modelPath + ' does not exist.') + self.modelPath = modelPath self.model = opensim.Model(modelPath) self.model.initSystem() @@ -62,7 +104,7 @@ def __init__(self, sessionDir, trialName, motionPath = os.path.join(sessionDir, 'OpenSimData', 'Kinematics', '{}.mot'.format(trialName)) - # Create time-series table with coordinate values. + # Create time-series table with coordinate values. self.table = opensim.TimeSeriesTable(motionPath) tableProcessor = opensim.TableProcessor(self.table) self.columnLabels = list(self.table.getColumnLabels()) @@ -102,7 +144,7 @@ def __init__(self, sessionDir, trialName, self.Qds[:,i] = splineD1(self.time) # Coordinate accelerations. splineD2 = spline.derivative(n=2) - self.Qdds[:,i] = splineD2(self.time) + self.Qdds[:,i] = splineD2(self.time) # Add coordinate speeds to table. 
columnLabel_speed = columnLabel[:-5] + 'speed' self.table.appendColumn( @@ -118,14 +160,14 @@ def __init__(self, sessionDir, trialName, existingLabels = self.table.getColumnLabels() for stateVariableNameStr in stateVariableNamesStr: if not stateVariableNameStr in existingLabels: - vec_0 = opensim.Vector([0] * self.table.getNumRows()) + vec_0 = opensim.Vector([0] * self.table.getNumRows()) self.table.appendColumn(stateVariableNameStr, vec_0) # Number of muscles. self.nMuscles = 0 self.forceSet = self.model.getForceSet() for i in range(self.forceSet.getSize()): - c_force_elt = self.forceSet.get(i) + c_force_elt = self.forceSet.get(i) if 'Muscle' in c_force_elt.getConcreteClassName(): self.nMuscles += 1 @@ -172,6 +214,25 @@ def get_marker_dict(self, session_dir, trial_name, '{}.trc'.format(trial_name)) markerDict = trc_2_dict(trcFilePath) + + # Convert marker names from actual format to expected format (with _study suffix) + if REVERSE_MARKER_NAME_MAPPING: + converted_markers = {} + # First pass: add markers that are already in correct format (prioritize these) + for marker_name, marker_data in markerDict['markers'].items(): + if marker_name not in REVERSE_MARKER_NAME_MAPPING: + # Already in correct format or unknown marker - keep as-is + converted_markers[marker_name] = marker_data + # Second pass: convert markers that need renaming (only if target doesn't exist) + for marker_name, marker_data in markerDict['markers'].items(): + if marker_name in REVERSE_MARKER_NAME_MAPPING: + new_name = REVERSE_MARKER_NAME_MAPPING[marker_name] + # Only convert if the target name doesn't already exist + # (avoids overwriting markers already in correct format) + if new_name not in converted_markers: + converted_markers[new_name] = marker_data + markerDict['markers'] = converted_markers + if lowpass_cutoff_frequency > 0: markerDict['markers'] = { marker_name: lowPassFilter(self.time, data, lowpass_cutoff_frequency) @@ -179,33 +240,40 @@ def get_marker_dict(self, session_dir, 
trial_name, return markerDict - def get_body_transform_dict(self): - - states_traj = self.stateTrajectory() - states_table = states_traj.exportToTable(self.model) - - body_dict = {} - body_dict['time'] = np.array(states_table.getIndependentColumn()) + def rotate_marker_dict(self, markerDict, euler_angles): + # euler_angles is a dictionary with keys being the axes of rotation + # (x, y, z) and values being the angles in degrees. e.g. {'x': 90, 'y': 180} - body_list = [] - body_transforms_dict = {} - for body in self.model.getBodySet(): - body_list.append(body.getName()) - body_transforms_dict[body.getName()] = [] - body_dict['body_names'] = body_list - - for i in range(self.table.getNumRows()): - this_state = states_traj[i] - self.model.realizePosition(this_state) - - for body in self.model.getBodySet(): - this_body_transform = body.getTransformInGround(this_state) - body_transforms_dict[body.getName()].append(this_body_transform) + rotated_marker_dict = copy.deepcopy(markerDict) + rotated_marker_dict['markers'] = {} - body_dict['body_transforms'] = body_transforms_dict + rotation = Rotation.from_euler(''.join(list(euler_angles.keys())), + list(euler_angles.values()), degrees=True) - return body_dict - + for marker, positions in markerDict['markers'].items(): + rotated_positions = rotation.apply(positions) + rotated_marker_dict['markers'][marker] = rotated_positions + + return rotated_marker_dict + + def rotate_com(self, comValues, euler_angles): + # euler_angles is a dictionary with keys being the axes of rotation + # (x, y, z) and values being the angles in degrees. e.g. 
{'x': 90, 'y': 180} + + rotation = Rotation.from_euler(''.join(list(euler_angles.keys())), + list(euler_angles.values()), degrees=True) + + # turn the x, y, z dataframe entries into a into a 3xN array + comValuesArray = comValues[['x','y','z']].to_numpy() + + rotated_com = rotation.apply(comValuesArray) + + # turn back into a dataframe with time as first column + rotated_com = pd.DataFrame(data=np.concatenate((np.expand_dims(comValues['time'].to_numpy(), axis=1), rotated_com), axis=1), + columns=['time','x','y','z']) + + return rotated_com + def get_coordinate_values(self, in_degrees=True, lowpass_cutoff_frequency=-1): @@ -228,9 +296,9 @@ def get_coordinate_values(self, in_degrees=True, data = np.concatenate( (np.expand_dims(self.time, axis=1), Qs), axis=1) columns = ['time'] + self.columnLabels - coordinate_values = pd.DataFrame(data=data, columns=columns) + self.coordinate_values = pd.DataFrame(data=data, columns=columns) - return coordinate_values + return self.coordinate_values def get_coordinate_speeds(self, in_degrees=True, lowpass_cutoff_frequency=-1): @@ -433,4 +501,95 @@ def get_center_of_mass_accelerations(self, lowpass_cutoff_frequency=-1): columns = ['time'] + ['x','y','z'] com_accelerations = pd.DataFrame(data=data, columns=columns) - return com_accelerations \ No newline at end of file + return com_accelerations + + def get_body_angular_velocity(self, body_names=None, lowpass_cutoff_frequency=-1, + expressed_in='body'): + + body_set = self.model.getBodySet() + if body_names is None: + body_names = [] + for i in range(body_set.getSize()): + print(i) + body = body_set.get(i) + body_names.append(body.getName()) + + bodies = [body_set.get(body_name) for body_name in body_names] + ground = self.model.getGround() + + angular_velocity = np.ndarray((self.table.getNumRows(), + len(body_names)*3)) # time x bodies x dim + + for i_time in range(self.table.getNumRows()): # loop over time + state = self.stateTrajectory()[i_time] + self.model.realizeVelocity(state) 
+ + + for i_body,body in enumerate(bodies): + ang_vel_in_ground = body.getAngularVelocityInGround(state) + if expressed_in == 'body': + angular_velocity[i_time, i_body*3:i_body*3+3] = ground.expressVectorInAnotherFrame( + state, ang_vel_in_ground, body + ).to_numpy() + elif expressed_in == 'ground': + angular_velocity[i_time, i_body*3:i_body*3+3] = ang_vel_in_ground.to_numpy() + else: + raise Exception (expressed_in + ' is not a valid frame to express angular' + + ' velocity.') + + angular_velocity_filtered = lowPassFilter(self.time, angular_velocity, lowpass_cutoff_frequency) + + # Put into a dataframe + data = np.concatenate((np.expand_dims(self.time, axis=1), angular_velocity_filtered), axis=1) + columns = ['time'] + for i, body_name in enumerate(body_names): + columns += [f'{body_name}_x', f'{body_name}_y', f'{body_name}_z'] + angular_velocity_df = pd.DataFrame(data=data, columns=columns) + + return angular_velocity_df + + def get_ranges_of_motion(self, in_degrees=True, lowpass_cutoff_frequency=-1): + + self.get_coordinate_values( + in_degrees=in_degrees, + lowpass_cutoff_frequency=lowpass_cutoff_frequency) + + # Compute ranges of motion. 
+ ROM = {} + for c, coord in enumerate(self.coordinates): + ROM[coord] = {} + ROM[coord]['min'] = self.coordinate_values[coord].min() + ROM[coord]['max'] = self.coordinate_values[coord].max() + ROM[coord]['amplitude'] = ( + self.coordinate_values[coord].max() - + self.coordinate_values[coord].min()) + + return ROM + + + def get_body_transform_dict(self): + + states_traj = self.stateTrajectory() + states_table = states_traj.exportToTable(self.model) + + body_dict = {} + body_dict['time'] = np.array(states_table.getIndependentColumn()) + + body_list = [] + body_transforms_dict = {} + for body in self.model.getBodySet(): + body_list.append(body.getName()) + body_transforms_dict[body.getName()] = [] + body_dict['body_names'] = body_list + + for i in range(self.table.getNumRows()): + this_state = states_traj[i] + self.model.realizePosition(this_state) + + for body in self.model.getBodySet(): + this_body_transform = body.getTransformInGround(this_state) + body_transforms_dict[body.getName()].append(this_body_transform) + + body_dict['body_transforms'] = body_transforms_dict + + return body_dict diff --git a/treadmill_gait_analysis/function/handler.py b/treadmill_gait_analysis/function/handler.py index 8c02074..ae29844 100644 --- a/treadmill_gait_analysis/function/handler.py +++ b/treadmill_gait_analysis/function/handler.py @@ -31,8 +31,13 @@ def handler(event, context): To invoke the function do POST request on the following url http://localhost:8080/2015-03-31/functions/function/invocations """ - # temporary placeholder - kwargs = json.loads(event['body']) + body = event.get('body', None) + if isinstance(body, dict): + kwargs = body + elif isinstance(body, str) and body: + kwargs = json.loads(body) + else: + kwargs = event for field in ('session_id', 'specific_trial_names'): if field not in kwargs: diff --git a/treadmill_gait_analysis/function/marker_name_mapping.py b/treadmill_gait_analysis/function/marker_name_mapping.py new file mode 100644 index 0000000..cf18e2a --- 
/dev/null +++ b/treadmill_gait_analysis/function/marker_name_mapping.py @@ -0,0 +1,56 @@ +""" +Marker name mapping dictionary for converting from expected format (with '_study' suffix) +to actual format (without '_study' suffix, lowercase). + +This mapping is used to rename markers in TRC files to match the expected format +used by the gait_analysis class. + +Expected format: markers end with '_study' (e.g., 'r_calc_study', 'r.ASIS_study') +Actual format: markers without '_study' suffix, lowercase (e.g., 'r_calc', 'r_ASIS') +""" + +MARKER_NAME_MAPPING = { + # Pelvis markers + 'r.ASIS_study': 'r_ASIS', + 'L.ASIS_study': 'l_ASIS', + 'r.PSIS_study': 'r_PSIS', + 'L.PSIS_study': 'l_PSIS', + + # Right leg markers + 'r_knee_study': 'r_knee', + 'r_mknee_study': 'r_mknee', + 'r_ankle_study': 'r_ankle', + 'r_mankle_study': 'r_mankle', + 'r_toe_study': 'r_toe', + 'r_5meta_study': 'r_5meta', + 'r_calc_study': 'r_calc', + + # Left leg markers + 'L_knee_study': 'l_knee', + 'L_mknee_study': 'l_mknee', + 'L_ankle_study': 'l_ankle', + 'L_mankle_study': 'l_mankle', + 'L_toe_study': 'l_toe', + 'L_calc_study': 'l_calc', + 'L_5meta_study': 'l_5meta', + + # Shoulder markers + 'r_shoulder_study': 'r_shoulder', + 'L_shoulder_study': 'l_shoulder', + + # Spine markers + 'C7_study': 'C7', + + # Hip joint centers + 'RHJC_study': 'RHJC', # Check if exists in actual file + 'LHJC_study': 'LHJC', # Check if exists in actual file + + # Elbow markers + 'r_melbow_study': 'r_melbow', + 'L_melbow_study': 'l_melbow', + +} + +# Reverse mapping (actual -> expected) for renaming markers in TRC files +REVERSE_MARKER_NAME_MAPPING = {v: k for k, v in MARKER_NAME_MAPPING.items()} + diff --git a/treadmill_gait_analysis/function/utils.py b/treadmill_gait_analysis/function/utils.py index 1e2b162..78932c4 100644 --- a/treadmill_gait_analysis/function/utils.py +++ b/treadmill_gait_analysis/function/utils.py @@ -34,7 +34,7 @@ from utilsAPI import get_api_url from utilsAuthentication import get_token import 
matplotlib.pyplot as plt -from scipy.signal import gaussian +from scipy.signal.windows import gaussian API_URL = get_api_url() @@ -80,6 +80,22 @@ def get_user_sessions_all(user_token=API_TOKEN): return sessions +# Returns a list of all subjects of the user. +def get_user_subjects(user_token=API_TOKEN): + subjects = requests.get( + API_URL + "subjects/", + headers = {"Authorization": "Token {}".format(user_token)}).json() + + return subjects + +# Returns a list of all sessions of a subject. +def get_subject_sessions(subject_id, user_token=API_TOKEN): + sessions = requests.get( + API_URL + "subjects/{}/".format(subject_id), + headers = {"Authorization": "Token {}".format(user_token)}).json()['sessions'] + + return sessions + def get_trial_json(trial_id): trialJson = requests.get( API_URL + "trials/{}/".format(trial_id), @@ -89,6 +105,8 @@ def get_trial_json(trial_id): def get_neutral_trial_id(session_id): session = get_session_json(session_id) + if session['isMono']: + return None neutral_ids = [t['id'] for t in session['trials'] if t['name']=='neutral'] if len(neutral_ids)>0: @@ -125,28 +143,36 @@ def get_camera_mapping(session_id, session_path): if not os.path.exists(mappingPath): mappingURL = trial['results'][resultTags.index('camera_mapping')]['media'] download_file(mappingURL, mappingPath) - -def get_model_and_metadata(session_id, session_path): - neutral_id = get_neutral_trial_id(session_id) - trial = get_trial_json(neutral_id) - resultTags = [res['tag'] for res in trial['results']] - - # Metadata. + +def get_metadata(session_path, trial, resultTags): metadataPath = os.path.join(session_path,'sessionMetadata.yaml') - if not os.path.exists(metadataPath) : + if not os.path.exists(metadataPath): metadataURL = trial['results'][resultTags.index('session_metadata')]['media'] download_file(metadataURL, metadataPath) - # Model. 
+ +def get_model(session_path, trial, resultTags, isMono=False): modelURL = trial['results'][resultTags.index('opensim_model')]['media'] modelName = modelURL[modelURL.rfind('-')+1:modelURL.rfind('?')] modelFolder = os.path.join(session_path, 'OpenSimData', 'Model') + if isMono: + modelFolder = os.path.join(modelFolder, trial['name']) modelPath = os.path.join(modelFolder, modelName) if not os.path.exists(modelPath): os.makedirs(modelFolder, exist_ok=True) download_file(modelURL, modelPath) - + return modelName + + +def get_model_and_metadata(session_id, session_path): + neutral_id = get_neutral_trial_id(session_id) + trial = get_trial_json(neutral_id) + resultTags = [res['tag'] for res in trial['results']] + + get_metadata(session_path, trial, resultTags) + modelName = get_model(session_path, trial, resultTags) + return modelName def get_main_settings(session_folder,trial_name): @@ -169,34 +195,41 @@ def get_model_name_from_metadata(sessionFolder,appendText='_scaled'): return modelName -def get_motion_data(trial_id, session_path): +def get_motion_data(trial_id, session_path, isMono=False): trial = get_trial_json(trial_id) trial_name = trial['name'] resultTags = [res['tag'] for res in trial['results']] # Marker data. - if 'ik_results' in resultTags: + if 'marker_data' in resultTags: markerFolder = os.path.join(session_path, 'MarkerData') markerPath = os.path.join(markerFolder, trial_name + '.trc') os.makedirs(markerFolder, exist_ok=True) - markerURL = trial['results'][resultTags.index('marker_data')]['media'] - download_file(markerURL, markerPath) + if not os.path.exists(markerPath): + markerURL = trial['results'][resultTags.index('marker_data')]['media'] + download_file(markerURL, markerPath) # IK data. 
if 'ik_results' in resultTags: ikFolder = os.path.join(session_path, 'OpenSimData', 'Kinematics') ikPath = os.path.join(ikFolder, trial_name + '.mot') os.makedirs(ikFolder, exist_ok=True) - ikURL = trial['results'][resultTags.index('ik_results')]['media'] - download_file(ikURL, ikPath) + if not os.path.exists(ikPath): + ikURL = trial['results'][resultTags.index('ik_results')]['media'] + download_file(ikURL, ikPath) + + # Model data if mono trial (isMono = True in session JSON) + if isMono: + get_model(session_path, trial, resultTags, isMono=True) # Main settings if 'main_settings' in resultTags: settingsFolder = os.path.join(session_path, 'MarkerData', 'Settings') settingsPath = os.path.join(settingsFolder, 'settings_' + trial_name + '.yaml') os.makedirs(settingsFolder, exist_ok=True) - settingsURL = trial['results'][resultTags.index('main_settings')]['media'] - download_file(settingsURL, settingsPath) + if not os.path.exists(settingsPath): + settingsURL = trial['results'][resultTags.index('main_settings')]['media'] + download_file(settingsURL, settingsPath) def get_geometries(session_path, modelName='LaiUhlrich2022_scaled'): @@ -253,34 +286,65 @@ def download_kinematics(session_id, folder=None, trialNames=None): if folder is None: folder = os.getcwd() os.makedirs(folder, exist_ok=True) + + sessionJson = get_session_json(session_id) + isMono = sessionJson['isMono'] - # Model and metadata. - neutral_id = get_neutral_trial_id(session_id) - get_motion_data(neutral_id, folder) - modelName = get_model_and_metadata(session_id, folder) - # Remove extension from modelName - modelName = modelName.replace('.osim','') + if not isMono: + # Model and metadata from neutral trial + neutral_id = get_neutral_trial_id(session_id) + get_motion_data(neutral_id, folder) + modelName = get_model_and_metadata(session_id, folder) + # Remove extension from modelName + modelName = modelName.replace('.osim','') # Session trial names. 
- sessionJson = get_session_json(session_id) sessionTrialNames = [t['name'] for t in sessionJson['trials']] if trialNames != None: [print(t + ' not in session trial names.') for t in trialNames if t not in sessionTrialNames] - # Motion data. + # Get dynamic trial IDs + dynamic_ids = [t['id'] for t in sessionJson['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] + + # Metadata for mono session + if isMono: + # Get metadata from the first dynamic trial + if dynamic_ids: + first_trial = get_trial_json(dynamic_ids[0]) + resultTags = [res['tag'] for res in first_trial['results']] + get_metadata(folder, first_trial, resultTags) + + # Motion data for all dynamic trials loadedTrialNames = [] for trialDict in sessionJson['trials']: if trialNames is not None and trialDict['name'] not in trialNames: continue trial_id = trialDict['id'] - get_motion_data(trial_id,folder) + get_motion_data(trial_id,folder, isMono=isMono) loadedTrialNames.append(trialDict['name']) # Remove 'calibration' and 'neutral' from loadedTrialNames. loadedTrialNames = [i for i in loadedTrialNames if i!='neutral' and i!='calibration'] - + # Geometries. 
+ if isMono: + # For mono sessions, find model names in subfolders + modelDir = os.path.join(folder, 'OpenSimData', 'Model') + if os.path.exists(modelDir): + modelNames = [] + for subfolder in os.listdir(modelDir): + subfolderPath = os.path.join(modelDir, subfolder) + if os.path.isdir(subfolderPath): + modelNames.extend([f for f in os.listdir(subfolderPath) if f.endswith('.osim')]) + if modelNames: + # Use first model name found (assuming same model type for all trials) + modelName = modelNames[0].replace('.osim', '') + else: + raise ValueError("No model files found in mono session subfolders") + else: + raise ValueError("Model directory does not exist") + get_geometries(folder, modelName=modelName) return loadedTrialNames, modelName @@ -293,12 +357,20 @@ def download_trial(trial_id, folder, session_id=None): session_id = trial['session_id'] os.makedirs(folder,exist_ok=True) + + # check if it is a mono trial + session = get_session_json(session_id) + isMono = session['isMono'] - # download model - get_model_and_metadata(session_id, folder) - - # download trc and mot - get_motion_data(trial_id,folder) + if isMono: + resultTags = [res['tag'] for res in trial['results']] + get_metadata(folder, trial, resultTags) + else: + # download model + get_model_and_metadata(session_id, folder) + + # download trc and mot + model if mono trial + get_motion_data(trial_id,folder, isMono=isMono) return trial['name'] @@ -475,8 +547,7 @@ def download_videos_from_server(session_id,trial_id, with open(os.path.join(session_path, "Videos", 'mappingCamDevice.pickle'), 'rb') as handle: mappingCamDevice = pickle.load(handle) # ensure upper on deviceID - for dID in mappingCamDevice.keys(): - mappingCamDevice[dID.upper()] = mappingCamDevice.pop(dID) + mappingCamDevice = {k.upper(): v for k, v in mappingCamDevice.items()} for video in trial["videos"]: k = mappingCamDevice[video["device_id"].replace('-', '').upper()] videoDir = os.path.join(session_path, "Videos", "Cam{}".format(k), "InputMedia", 
trial_name) @@ -555,7 +626,54 @@ def post_file_to_trial(filePath,trial_id,tag,device_id): requests.post("{}results/".format(API_URL), files=files, data=data, headers = {"Authorization": "Token {}".format(API_TOKEN)}) files["media"].close() + +def post_video_to_trial(filePath,trial_id,device_id,parameters): + files = {'video': open(filePath, 'rb')} + data = { + "trial": trial_id, + "device_id" : device_id, + "parameters": parameters + } + + requests.post("{}videos/".format(API_URL), files=files, data=data, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + files["video"].close() + +def delete_video_from_trial(video_id): + + requests.delete("{}videos/{}/".format(API_URL, video_id), + headers = {"Authorization": "Token {}".format(API_TOKEN)}) +def delete_results(trial_id, tag=None, resultNum=None): + # Delete specific result number, or all results with a specific tag, or all results if tag==None + if resultNum != None: + resultNums = [resultNum] + elif tag != None: + trial = get_trial_json(trial_id) + resultNums = [r['id'] for r in trial['results'] if r['tag']==tag] + + elif tag == None: + trial = get_trial_json(trial_id) + resultNums = [r['id'] for r in trial['results']] + + for rNum in resultNums: + requests.delete(API_URL + "results/{}/".format(rNum), + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + +def set_trial_status(trial_id, status): + + # Available statuses: 'done', 'error', 'stopped', 'reprocess' + # 'processing' and 'recording also exist, but it does not make sense to set them manually. + # Throw error if status is not one of the above. + if status not in ['done', 'error', 'stopped', 'reprocess']: + raise ValueError('Invalid status. 
Available statuses: done, error, stopped, reprocess') + + requests.patch(API_URL+"trials/{}/".format(trial_id), data={'status': status}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) + +def set_session_subject(session_id, subject_id): + requests.patch(API_URL+"sessions/{}/".format(session_id), data={'subject': subject_id}, + headers = {"Authorization": "Token {}".format(API_TOKEN)}) def get_syncd_videos(trial_id,session_path): trial = requests.get("{}trials/{}/".format(API_URL,trial_id), @@ -584,44 +702,57 @@ def download_session(session_id, sessionBasePath= None, session = get_session_json(session_id) session_path = os.path.join(sessionBasePath,'OpenCapData_' + session_id) + + os.makedirs(session_path, exist_ok=True) + + isMono = session['isMono'] - calib_id = get_calibration_trial_id(session_id) - neutral_id = get_neutral_trial_id(session_id) + if not isMono: + calib_id = get_calibration_trial_id(session_id) + neutral_id = get_neutral_trial_id(session_id) + + # Calibration + try: + get_camera_mapping(session_id, session_path) + if downloadVideos: + download_videos_from_server(session_id,calib_id, + isCalibration=True,isStaticPose=False, + session_path = session_path) + + get_calibration(session_id,session_path) + except: + pass + + # Neutral + try: + modelName = get_model_and_metadata(session_id,session_path) + get_motion_data(neutral_id,session_path) + if downloadVideos: + download_videos_from_server(session_id,neutral_id, + isCalibration=False,isStaticPose=True, + session_path = session_path) + + get_syncd_videos(neutral_id,session_path) + except: + pass + dynamic_ids = [t['id'] for t in session['trials'] if (t['name'] != 'calibration' and t['name'] !='neutral')] - - # Calibration - try: - get_camera_mapping(session_id, session_path) - if downloadVideos: - download_videos_from_server(session_id,calib_id, - isCalibration=True,isStaticPose=False, - session_path = session_path) - get_calibration(session_id,session_path) - except: - pass - - # 
Neutral - try: - modelName = get_model_and_metadata(session_id,session_path) - get_motion_data(neutral_id,session_path) - if downloadVideos: - download_videos_from_server(session_id,neutral_id, - isCalibration=False,isStaticPose=True, - session_path = session_path) - - get_syncd_videos(neutral_id,session_path) - except: - pass + # Metadata for mono session + if isMono: + # hand the first dynamic trial + first_trial = get_trial_json(dynamic_ids[0]) + resultTags = [res['tag'] for res in first_trial['results']] + get_metadata(session_path, first_trial, resultTags) # Dynamic for dynamic_id in dynamic_ids: try: - get_motion_data(dynamic_id,session_path) + get_motion_data(dynamic_id,session_path, isMono=isMono) if downloadVideos: download_videos_from_server(session_id,dynamic_id, isCalibration=False,isStaticPose=False, - session_path = session_path) + session_path=session_path) get_syncd_videos(dynamic_id,session_path) except: @@ -639,16 +770,33 @@ def download_session(session_id, sessionBasePath= None, # Geometry try: - if 'Lai' in modelName: - modelType = 'LaiArnold' + if isMono: + # get all names of .osim files in subfolders of Model folder + modelDir = os.path.join(session_path, 'OpenSimData', 'Model') + modelNames = [] + for subfolder in os.listdir(modelDir): + subfolderPath = os.path.join(modelDir, subfolder) + if os.path.isdir(subfolderPath): + modelNames.extend([f for f in os.listdir(subfolderPath) if f.endswith('.osim')]) + # check if any of the model names contain 'Lai', assuming the same model type is used for all trials of the session + if any('Lai' in name for name in modelNames): + modelType = 'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + modelName = modelNames[0] + else: - raise ValueError("Geometries not available for this model, please contact us") + if 'Lai' in modelName: + modelType = 'LaiArnold' + else: + raise ValueError("Geometries not available for this model, please contact us") + if 
platform.system() == 'Windows': geometryDir = os.path.join(repoDir, 'tmp', modelType, 'Geometry') else: geometryDir = "/tmp/{}/Geometry".format(modelType) - # If not in cache, download from s3. - if not os.path.exists(geometryDir): + # If not in cache or empty, download from s3. + if not os.path.exists(geometryDir) or not os.listdir(geometryDir): os.makedirs(geometryDir, exist_ok=True) get_geometries(session_path, modelName=modelName) geometryDirEnd = os.path.join(session_path, 'OpenSimData', 'Model', 'Geometry') diff --git a/treadmill_gait_analysis/function/utilsKinematics.py b/treadmill_gait_analysis/function/utilsKinematics.py index 2eb1b8c..f82bc37 100644 --- a/treadmill_gait_analysis/function/utilsKinematics.py +++ b/treadmill_gait_analysis/function/utilsKinematics.py @@ -32,6 +32,13 @@ import numpy as np from scipy.spatial.transform import Rotation +# Import marker name mapping for conversion +try: + from marker_name_mapping import REVERSE_MARKER_NAME_MAPPING +except ImportError: + # If mapping file doesn't exist, use empty dict (no conversion) + REVERSE_MARKER_NAME_MAPPING = {} + class kinematics: @@ -46,18 +53,50 @@ def __init__(self, sessionDir, trialName, opensim.Logger.setLevelString('error') modelBasePath = os.path.join(sessionDir, 'OpenSimData', 'Model') + + # Check if this is a mono session (models stored in trial subfolders) + # Check specifically for a subfolder matching the trial name + isMono = False + if os.path.exists(modelBasePath): + trialModelPath = os.path.join(modelBasePath, trialName) + if os.path.isdir(trialModelPath): + isMono = True + # Load model if specified, otherwise load the one that was on server if modelName is None: - modelName = utils.get_model_name_from_metadata(sessionDir) - modelPath = os.path.join(modelBasePath,modelName) + if isMono: + # For mono sessions, look in the trial subfolder + trialModelPath = os.path.join(modelBasePath, trialName) + if os.path.exists(trialModelPath): + # Find .osim file in the trial subfolder + 
osimFiles = [f for f in os.listdir(trialModelPath) if f.endswith('.osim')] + if osimFiles: + modelPath = os.path.join(trialModelPath, osimFiles[0]) + else: + raise Exception('No .osim file found in ' + trialModelPath) + else: + raise Exception('Trial model folder does not exist: ' + trialModelPath) + else: + modelName = utils.get_model_name_from_metadata(sessionDir) + modelPath = os.path.join(modelBasePath, modelName) else: - modelPath = os.path.join(modelBasePath, - '{}.osim'.format(modelName)) + if isMono: + # For mono sessions, look in the trial subfolder + trialModelPath = os.path.join(modelBasePath, trialName) + if not modelName.endswith('.osim'): + modelName = modelName + '.osim' + modelPath = os.path.join(trialModelPath, modelName) + else: + if not modelName.endswith('.osim'): + modelPath = os.path.join(modelBasePath, '{}.osim'.format(modelName)) + else: + modelPath = os.path.join(modelBasePath, modelName) # make sure model exists if not os.path.exists(modelPath): raise Exception('Model path: ' + modelPath + ' does not exist.') + self.modelPath = modelPath self.model = opensim.Model(modelPath) self.model.initSystem() @@ -175,6 +214,25 @@ def get_marker_dict(self, session_dir, trial_name, '{}.trc'.format(trial_name)) markerDict = trc_2_dict(trcFilePath) + + # Convert marker names from actual format to expected format (with _study suffix) + if REVERSE_MARKER_NAME_MAPPING: + converted_markers = {} + # First pass: add markers that are already in correct format (prioritize these) + for marker_name, marker_data in markerDict['markers'].items(): + if marker_name not in REVERSE_MARKER_NAME_MAPPING: + # Already in correct format or unknown marker - keep as-is + converted_markers[marker_name] = marker_data + # Second pass: convert markers that need renaming (only if target doesn't exist) + for marker_name, marker_data in markerDict['markers'].items(): + if marker_name in REVERSE_MARKER_NAME_MAPPING: + new_name = REVERSE_MARKER_NAME_MAPPING[marker_name] + # Only 
convert if the target name doesn't already exist + # (avoids overwriting markers already in correct format) + if new_name not in converted_markers: + converted_markers[new_name] = marker_data + markerDict['markers'] = converted_markers + if lowpass_cutoff_frequency > 0: markerDict['markers'] = { marker_name: lowPassFilter(self.time, data, lowpass_cutoff_frequency) @@ -211,7 +269,7 @@ def rotate_com(self, comValues, euler_angles): rotated_com = rotation.apply(comValuesArray) # turn back into a dataframe with time as first column - rotated_com = pd.DataFrame(data=np.concatenate((np.expand_dims(comValues['time'].to_numpy, axis=1), rotated_com), axis=1), + rotated_com = pd.DataFrame(data=np.concatenate((np.expand_dims(comValues['time'].to_numpy(), axis=1), rotated_com), axis=1), columns=['time','x','y','z']) return rotated_com