dataframe_comparison_normalized.py
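"""
Pairwise comparison of 2D pose sequences stored as Parquet files.

Each skeleton frame is normalized (centered and scaled) before the sequences
are aligned with FastDTW, and the resulting similarity ranking is written to
comparativos_output/ranking_similaridade.csv.
"""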
import pandas as pd
import numpy as np
import os
import warnings
from fastdtw import fastdtw
from scipy.spatial.distance import euclidean
from itertools import combinations

# --- Configuration ---
DATA_DIR = 'data'
OUTPUT_DIR = 'comparativos_output'
os.makedirs(OUTPUT_DIR, exist_ok=True)


def get_all_2d_columns(df: pd.DataFrame) -> list[str]:
    return [col for col in df.columns if col.endswith(('_x', '_y'))]


def normalize_pose_scale(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    """
    Normalize the keypoints of each frame.

    Centers the skeleton at (0, 0) and scales it so that its maximum extent is 1.0.
    """
    df_norm = df.copy()
    x_cols = [c for c in cols if c.endswith('_x')]
    y_cols = [c for c in cols if c.endswith('_y')]

    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=RuntimeWarning)
        for i, row in df_norm.iterrows():
            x_vals = row[x_cols].values.astype(float)
            y_vals = row[y_cols].values.astype(float)

            # Skip empty frames
            if np.isnan(x_vals).all() or np.isnan(y_vals).all():
                continue

            # 1. Translation (center at 0, 0)
            x_center = np.nanmean(x_vals)
            y_center = np.nanmean(y_vals)
            x_centered = x_vals - x_center
            y_centered = y_vals - y_center

            # 2. Scale (maximum extent = 1.0)
            distances = np.sqrt(x_centered**2 + y_centered**2)
            max_dist = np.nanmax(distances)
            if max_dist > 0:
                x_scaled = x_centered / max_dist
                y_scaled = y_centered / max_dist
            else:
                x_scaled = x_centered
                y_scaled = y_centered

            df_norm.loc[i, x_cols] = x_scaled
            df_norm.loc[i, y_cols] = y_scaled
    return df_norm
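# Illustrative example (hypothetical keypoint values, not taken from the dataset):
# a frame with points (100, 200) and (140, 260) has centroid (120, 230); after
# centering, the points become (-20, -30) and (20, 30), and dividing by the
# largest centroid distance (~36.06) leaves every keypoint inside the unit
# circle, so skeletons of different sizes become directly comparable.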
def get_dtw_path(df1_norm: pd.DataFrame, df2_norm: pd.DataFrame, cols: list[str]):
    """Compute DTW on data that has already been normalized."""
    s1 = df1_norm[cols].ffill().bfill().fillna(0).values
    s2 = df2_norm[cols].ffill().bfill().fillna(0).values
    distance, path = fastdtw(s1, s2, dist=euclidean)
    return path, distance
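# Note: fastdtw returns the total alignment cost and the warping path as a list
# of (i, j) index pairs, one per aligned frame pair; compare_pair below divides
# the cost by the path length to obtain a duration-independent score.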
def compare_pair(file1_info, file2_info):
    df1 = pd.read_parquet(file1_info['parquet'])
    df2 = pd.read_parquet(file2_info['parquet'])

    # Get columns for both
    cols1 = set(get_all_2d_columns(df1))
    cols2 = set(get_all_2d_columns(df2))

    # Intersection: only use columns that exist in BOTH files
    common_cols = sorted(cols1.intersection(cols2))
    if not common_cols:
        print(f"⚠️ Skipping: no columns in common between {file1_info['name']} and {file2_info['name']}")
        return None

    # --- APPLY NORMALIZATION BEFORE THE DTW COMPUTATION ---
    df1_norm = normalize_pose_scale(df1, common_cols)
    df2_norm = normalize_pose_scale(df2, common_cols)

    # Compute DTW using only the NORMALIZED shared keypoints
    path, dist = get_dtw_path(df1_norm, df2_norm, common_cols)
    # Average cost per aligned frame pair, so clips of different lengths are comparable
    avg_dist = dist / len(path)

    pair_name = f"{file1_info['name']}_vs_{file2_info['name']}"
    print(f"Processing: {pair_name} | Keypoints used: {len(common_cols)//2} | Normalized distance: {avg_dist:.4f}")
    return {"pair": pair_name, "distance": dist, "avg_distance": avg_dist}
def main():
    # 1. Map all available signals
    signals_map = []
    # Scan the subfolders in /data
    for signal_name in os.listdir(DATA_DIR):
        signal_path = os.path.join(DATA_DIR, signal_name)
        if os.path.isdir(signal_path):
            # Find video/parquet pairs
            files = os.listdir(signal_path)
            parquets = [f for f in files if f.endswith('.parquet')]
            for p in parquets:
                base_name = p.replace('.parquet', '')
                video_file = base_name + '.mp4'
                if video_file in files:
                    signals_map.append({
                        'signal_group': signal_name,
                        'name': base_name,
                        'video': os.path.join(signal_path, video_file),
                        'parquet': os.path.join(signal_path, p)
                    })

    # 2. Generate all possible pairings (all-to-all)
    all_pairs = list(combinations(signals_map, 2))
    print(f"Total signals found: {len(signals_map)}")
    print(f"Total comparisons to run: {len(all_pairs)}")

    results = []
    for f1, f2 in all_pairs:
        res = compare_pair(f1, f2)
        if res is not None:
            results.append(res)

    # 3. Save the ranking report
    df_results = pd.DataFrame(results)
    # Sort by lowest average distance (most similar first)
    if not df_results.empty:
        df_results = df_results.sort_values(by='avg_distance', ascending=True)
    df_results.to_csv(os.path.join(OUTPUT_DIR, 'ranking_similaridade.csv'), index=False)
    print("\nSimilarity ranking saved and sorted in 'ranking_similaridade.csv'")


if __name__ == "__main__":
    main()