-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimport_os.py
More file actions
173 lines (143 loc) · 6.15 KB
/
import_os.py
File metadata and controls
173 lines (143 loc) · 6.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import os
import pandas as pd
import requests
import gzip
# Constants
# Latest ClinVar tab-delimited summary (refreshed by NCBI; always the current release).
URL = "https://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/variant_summary.txt.gz"
# Root of the dated snapshot archive; per-year subdirectories hold older snapshots.
ARCHIVE_BASE_URL = "https://ftp.ncbi.nlm.nih.gov/pub/clinvar/tab_delimited/archive"
# Local path for the downloaded gzip archive.
OUTPUT_GZ = "data/variant_summary.txt.gz"
# Local path for the decompressed tab-delimited table.
OUTPUT_TXT = "data/variant_summary.txt"
# Local path for the filtered output written by process_file().
PROCESSED_CSV = "data/processed_variants.csv"
# Download gzipped file
def download_file(url, output_path, *, timeout=60, chunk_size=8192):
    """Stream a remote file to ``output_path``, printing percent progress.

    Args:
        url: Remote URL to fetch.
        output_path: Local destination path (parent directory must exist).
        timeout: Connect/read timeout in seconds for ``requests.get``.
            The original call had no timeout, so a stalled server could
            hang the script indefinitely.
        chunk_size: Streaming chunk size in bytes.

    Returns:
        True on success, False on any request/HTTP error (the error is
        printed, not raised, so callers can continue with other files).
    """
    print(f"Downloading: {url}")
    try:
        response = requests.get(url, stream=True, timeout=timeout)
        response.raise_for_status()
        # Content-Length may be absent; 0 disables the progress display.
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0
        with open(output_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        percent = (downloaded / total_size) * 100
                        print(f"\rProgress: {percent:.1f}%", end="", flush=True)
        print(f"\nDownload completed: {output_path}")
        return True
    except requests.exceptions.RequestException as e:
        print(f"Error downloading {url}: {e}")
        return False
# Download historical files from archive
def download_historical_files(years, output_dir="data"):
    """Download the October snapshot for each year from the ClinVar archive.

    Args:
        years: Iterable of years (ints) to fetch, e.g. range(2015, 2024).
        output_dir: Directory to save files into (created if missing).

    Returns:
        List of local paths for files that exist after the call (already
        present or freshly downloaded). Failed years are skipped.
    """
    os.makedirs(output_dir, exist_ok=True)
    downloaded_files = []
    for year in years:
        # Snapshots are named by year-month; "-10" is the October release.
        filename = f"variant_summary_{year}-10.txt.gz"
        # BUG FIX: the URL previously ended in a literal "(unknown)"
        # placeholder instead of the snapshot filename, so every request 404'd.
        url = f"{ARCHIVE_BASE_URL}/{year}/{filename}"
        output_path = os.path.join(output_dir, filename)
        if os.path.exists(output_path):
            print(f"File already exists: {output_path}")
            downloaded_files.append(output_path)
            continue
        # BUG FIX: message said "December" but the file fetched is the
        # October (-10) snapshot.
        print(f"Downloading {year} October snapshot...")
        if download_file(url, output_path):
            downloaded_files.append(output_path)
        else:
            print(f"Failed to download {year} data")
    return downloaded_files
def download_current_file(output_dir="data"):
    """Fetch the latest variant_summary.txt.gz release.

    Args:
        output_dir: Directory to save into (created if missing).

    Returns:
        The local path of the downloaded file, or None on failure.
    """
    os.makedirs(output_dir, exist_ok=True)
    destination = os.path.join(output_dir, "variant_summary_current.txt.gz")
    print("Downloading current ClinVar data...")
    if not download_file(URL, destination):
        print("Failed to download current data")
        return None
    return destination
def download_recent_years(output_dir="data"):
    """Download 2024 and 2025 data from archive directory.

    Unlike older snapshots, these files sit directly under the archive
    root rather than in per-year subdirectories.

    Args:
        output_dir: Directory to save files into (created if missing).

    Returns:
        List of local paths for files present after the call.
    """
    os.makedirs(output_dir, exist_ok=True)
    downloaded_files = []
    # 2024 and 2025 files are directly in the archive directory
    recent_files = [
        "variant_summary_2024-10.txt.gz",
        "variant_summary_2025-10.txt.gz"
    ]
    for filename in recent_files:
        # BUG FIX: the URL previously ended in a literal "(unknown)"
        # placeholder instead of the filename, so every request 404'd.
        url = f"{ARCHIVE_BASE_URL}/{filename}"
        output_path = os.path.join(output_dir, filename)
        if os.path.exists(output_path):
            print(f"File already exists: {output_path}")
            downloaded_files.append(output_path)
            continue
        # Extract the year from "variant_summary_YYYY-MM.txt.gz".
        year = filename.split('_')[2].split('-')[0]
        print(f"Downloading {year} October snapshot...")
        if download_file(url, output_path):
            downloaded_files.append(output_path)
        else:
            print(f"Failed to download {year} data")
    return downloaded_files
# Parse and clean
def process_file(txt_path, csv_path):
    """Filter a ClinVar variant_summary table and plot significance counts.

    Keeps only SNVs and small indels (span <= 50 bp), writes the filtered
    table to ``csv_path``, then renders a stacked-area plot of clinical
    significance categories by year.

    Args:
        txt_path: Path to the decompressed tab-delimited variant_summary file.
        csv_path: Destination path for the filtered CSV.
    """
    df = pd.read_csv(txt_path, sep="\t", low_memory=False)
    print(f"Loaded {df.shape[0]} rows.")
    # Filter for indels (<=50bp) and SNVs
    df = df[df['Type'].str.contains('indel|single nucleotide variant', case=False, na=False)]
    df = df[df['Stop'] - df['Start'] <= 50]
    df.to_csv(csv_path, index=False)
    print(f"Saved filtered data to {csv_path}")
    _plot_clinical_significance(df)


def _plot_clinical_significance(df):
    """Save a stacked area plot of variant counts per significance category.

    Groups the filtered variants by the year of ``DateLastUpdated`` and by
    ``ClinicalSignificance``, then writes the figure to
    ``clinvar/clinvar_clinical_significance_oct2024.png``.
    """
    # matplotlib is imported lazily (as in the original) so the download-only
    # code paths do not require it; the redundant in-function `import os`
    # was dropped (os is imported at module level).
    import matplotlib.pyplot as plt
    from collections import defaultdict
    # Derive the year from the last-updated date; unparseable dates become NaT
    # and are excluded from the year axis.
    df['Year'] = pd.to_datetime(df['DateLastUpdated'], errors='coerce').dt.year
    years = sorted(df['Year'].dropna().unique())
    categories = ['Benign', 'Likely benign', 'Likely pathogenic', 'Pathogenic', 'Uncertain significance']
    colors = ['#00008B', '#ADD8E6', '#FF6347', '#8B0000', '#A9A9A9']
    y_data = defaultdict(list)
    # Count variants per (category, year); zero counts keep series aligned.
    for cat in categories:
        for year in years:
            count = df[(df['Year'] == year) & (df['ClinicalSignificance'] == cat)].shape[0]
            y_data[cat].append(count)
    x = years
    y_stack = [y_data[cat] for cat in categories]
    plt.figure(figsize=(8, 6))
    plt.stackplot(x, *y_stack, colors=colors, labels=categories)
    plt.legend(loc='upper left')
    plt.ylabel('Number of Variants (Millions)')
    plt.title('Clinical Significance of Variants in ClinVar')
    os.makedirs('clinvar', exist_ok=True)
    plt.savefig('clinvar/clinvar_clinical_significance_oct2024.png', dpi=1200, format='png')
    plt.close()
def download_all_historical_data():
    """Fetch every ClinVar snapshot set and print a summary.

    Downloads the 2015-2023 archive snapshots, the 2024-2025 snapshots,
    and the current release.

    Returns:
        Tuple of (historical_files, recent_files, current_file); the last
        element is None when the current download failed.
    """
    print("Starting ClinVar data download...")
    # Archive snapshots for 2015 through 2023 inclusive.
    archive_paths = download_historical_files(list(range(2015, 2024)))
    # 2024/2025 snapshots live directly in the archive root.
    recent_paths = download_recent_years()
    # Latest release.
    latest_path = download_current_file()
    print(f"\nDownload summary:")
    print(f"Historical files (2015-2023): {len(archive_paths)}")
    print(f"Recent files (2024-2025): {len(recent_paths)}")
    if latest_path:
        print(f"Current file downloaded: {latest_path}")
    return archive_paths, recent_paths, latest_path
if __name__ == "__main__":
    import sys
    import shutil
    if len(sys.argv) > 1 and sys.argv[1] == "--historical":
        # Download all historical data
        download_all_historical_data()
    else:
        # Original behavior - download and process current data only
        os.makedirs("data", exist_ok=True)
        # BUG FIX: the return value was ignored and the gzip archive was never
        # decompressed, so process_file() was called on OUTPUT_TXT, a file
        # that did not exist (the module-level `import gzip` was unused).
        if download_file(URL, OUTPUT_GZ):
            with gzip.open(OUTPUT_GZ, "rb") as src, open(OUTPUT_TXT, "wb") as dst:
                shutil.copyfileobj(src, dst)
            process_file(OUTPUT_TXT, PROCESSED_CSV)
        else:
            print("Download failed; skipping processing.")