-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsave_datasets.py
More file actions
148 lines (115 loc) · 5.3 KB
/
save_datasets.py
File metadata and controls
148 lines (115 loc) · 5.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
from __future__ import annotations
import os
import logging
from pathlib import Path
from typing import Any
import pandas as pd
from tqdm import tqdm
from river_config import TRACKS
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def save_dataset(dataset: Any, dataset_name: str, track_name: str, output_dir: Path) -> None:
"""Sauvegarde un dataset River en format CSV local.
Args:
dataset: Dataset River (itérable de (x, y))
dataset_name: Nom du dataset
track_name: Nom du track (pour organisation)
output_dir: Répertoire de sortie
"""
# Créer le répertoire pour ce track
track_dir = output_dir / track_name.replace(" ", "_").lower()
track_dir.mkdir(parents=True, exist_ok=True)
# Nom du fichier
safe_name = dataset_name.replace(" ", "_").replace("/", "_").lower()
output_path = track_dir / f"{safe_name}.csv"
# Si le fichier existe déjà, on skip (pour éviter de re-télécharger)
if output_path.exists():
logger.info(f"Dataset {dataset_name} déjà sauvegardé, skip...")
return
logger.info(f"Sauvegarde de {dataset_name} ({track_name})...")
# Collecter toutes les données
rows = []
feature_names = None
try:
# Itérer sur le dataset
for x, y in tqdm(dataset, desc=f"Collecting {dataset_name}", leave=False):
# x est un dict de features, y est la target
row = dict(x)
row["target"] = y
# Stocker les noms de features au premier passage
if feature_names is None:
feature_names = list(x.keys())
rows.append(row)
# Créer un DataFrame
if not rows:
logger.warning(f"Dataset {dataset_name} est vide!")
return
df = pd.DataFrame(rows)
# Réorganiser les colonnes: features d'abord, target à la fin
cols = [c for c in df.columns if c != "target"] + ["target"]
df = df[cols]
# Sauvegarder en CSV
df.to_csv(output_path, index=False)
logger.info(f"✓ Sauvegardé: {output_path} ({len(df)} lignes, {len(df.columns)-1} features)")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde de {dataset_name}: {e}")
# Sauvegarder un fichier vide ou avec erreur pour indiquer le problème
error_path = track_dir / f"{safe_name}_ERROR.txt"
with open(error_path, "w") as f:
f.write(f"Erreur lors de la sauvegarde: {e}\n")
def save_all_datasets(output_dir: str = "./datasets") -> None:
"""Sauvegarde tous les datasets de tous les tracks.
Args:
output_dir: Répertoire de sortie pour les datasets
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
logger.info(f"Sauvegarde de tous les datasets dans {output_path.absolute()}")
# Télécharger tous les datasets d'abord
logger.info("Téléchargement des datasets...")
for track in TRACKS:
for dataset in track.datasets:
try:
dataset.download(verbose=False)
except Exception as e:
logger.warning(f"Erreur lors du téléchargement de {dataset.__class__.__name__}: {e}")
# Sauvegarder chaque dataset
total_datasets = sum(len(track.datasets) for track in TRACKS)
logger.info(f"Traitement de {total_datasets} datasets...")
for track in TRACKS:
track_name = track.name
logger.info(f"\n{'='*60}")
logger.info(f"Track: {track_name}")
logger.info(f"{'='*60}")
for dataset in track.datasets:
dataset_name = dataset.__class__.__name__
try:
# Créer une copie du dataset pour éviter les problèmes de réutilisation
# Certains datasets peuvent être des générateurs qui se consomment
save_dataset(dataset, dataset_name, track_name, output_path)
except Exception as e:
logger.error(f"Erreur avec {dataset_name}: {e}")
logger.info(f"\n{'='*60}")
logger.info(f"Terminé! Datasets sauvegardés dans {output_path.absolute()}")
# Créer un fichier récapitulatif
summary_path = output_path / "datasets_summary.txt"
with open(summary_path, "w") as f:
f.write("Résumé des datasets sauvegardés\n")
f.write("=" * 60 + "\n\n")
for track in TRACKS:
f.write(f"Track: {track.name}\n")
f.write("-" * 60 + "\n")
for dataset in track.datasets:
dataset_name = dataset.__class__.__name__
safe_name = dataset_name.replace(" ", "_").replace("/", "_").lower()
track_dir_name = track.name.replace(" ", "_").lower()
file_path = output_path / track_dir_name / f"{safe_name}.csv"
if file_path.exists():
df = pd.read_csv(file_path)
f.write(f" - {dataset_name}: {file_path.name} ({len(df)} lignes, {len(df.columns)-1} features)\n")
else:
f.write(f" - {dataset_name}: NON SAUVEGARDÉ\n")
f.write("\n")
logger.info(f"Résumé sauvegardé dans {summary_path}")
if __name__ == "__main__":
save_all_datasets()