forked from phbradley/tcr-dist
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutil.py
More file actions
226 lines (180 loc) · 7.78 KB
/
util.py
File metadata and controls
226 lines (180 loc) · 7.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
from basic import *
import cdr3s_human
from scipy.cluster import hierarchy
from scipy.spatial import distance
import os
import html_colors
verbose = __name__ == '__main__'
def get_rep( gene, organism ):
assert gene.startswith('TR')
vj = gene[3]
if vj == 'V':
rep = cdr3s_human.all_loopseq_representative[ organism ][ gene ]
else:
rep = cdr3s_human.all_jseq_representative[ organism ][ gene ]
return rep
def get_mm1_rep( gene, organism ):
assert gene.startswith('TR')
vj = gene[3]
if vj == 'V':
rep = cdr3s_human.all_loopseq_representative_mm1[ organism ][ gene ]
else:
rep = cdr3s_human.all_jseq_representative[ organism ][ gene ]
return rep
def get_rep_ignoring_allele( gene, organism ):
rep = get_rep( gene, organism )
rep = rep[:rep.index('*')]
return rep
def tree_sort( old_l, distances, return_leaves=True ): ## average linkage
assert len(distances) == len(old_l)
if len(old_l)==1:
leaves = [0]
else:
y = distance.squareform( distances, checks=True )
Z = hierarchy.average( y )
#c,coph_dists = hierarchy.cophenet(Z,y)
leaves = hierarchy.leaves_list( Z )
new_l = [ old_l[x] for x in leaves ]
if not return_leaves:
return new_l
else:
return new_l, leaves
def get_top_genes( blast_hits_string ):
hits = dict( [ ( x.split(':')[0], int( x.split(':')[1] ) ) for x in blast_hits_string.split(';') ] )
top_score = max( hits.values() )
return set( [ x for x,y in hits.iteritems() if y >= top_score ] )
def get_top_reps( blast_hits_string, organism ):
hits = dict( [ ( x.split(':')[0], int( x.split(':')[1] ) ) for x in blast_hits_string.split(';') ] )
top_score = max( hits.values() )
vj = hits.keys()[0][3]
if vj == 'V':
rep_map = cdr3s_human.all_loopseq_representative[ organism ]
else:
assert vj == 'J'
rep_map = cdr3s_human.all_jseq_representative[ organism ]
return set( [ rep_map[x] for x,y in hits.iteritems() if y >= top_score ] )
def reps_from_genes( genes, organism, mm1=False, trim_allele=False ):
## if genes is a set we can't index into it
vj = [ x[3] for x in genes ][0]
if vj == 'V':
if mm1:
rep_map = cdr3s_human.all_loopseq_representative_mm1[ organism ]
else:
rep_map = cdr3s_human.all_loopseq_representative[ organism ]
else:
assert vj == 'J'
rep_map = cdr3s_human.all_jseq_representative[ organism ]
reps = set( [ rep_map[x] for x in genes ] )
if trim_allele:
reps = set( ( x[:x.index('*')] for x in reps ) )
return reps
def readme( pngfile, text ):
"""Generate some readme text associated to an image file, that will be incorporated into the
big html results file by run_basic_analysis.py"""
out = open(pngfile+'.readme','w')
cmd = ' '.join(argv)
out.write("""
<u>Command</u>:
{}
<br><br>
<u>Filename</u>:
{}
<br><br>
<u>Readme</u>:
{}
<br><br>
""".format(cmd, pngfile, text))
out.close()
## setup a mapping that we can use for counting when allowing mm1s and also ignoring alleles
allele2mm1_rep_gene_for_counting = {}
def get_mm1_rep_ignoring_allele( gene, organism ): # helper fxn
rep = get_mm1_rep( gene, organism )
rep = rep[:rep.index('*')]
return rep
for organism in ['human','mouse']:
allele2mm1_rep_gene_for_counting[ organism ] = {}
for chain in 'AB':
## look at gene/allele maps
## V
vj_alleles = { 'V': [ x for x in cdr3s_human.all_loopseq_representative[ organism ].keys() if x[2] == chain ],
'J': [ x for x in cdr3s_human.all_jseq_representative[ organism ].keys() if x[2] == chain ] }
for vj, alleles in vj_alleles.iteritems():
gene2rep = {}
gene2alleles = {}
rep_gene2alleles = {}
for allele in alleles:
assert allele[2] == chain
gene = allele[:allele.index('*')]
rep_gene = get_mm1_rep_ignoring_allele( allele, organism )
if rep_gene not in rep_gene2alleles:
rep_gene2alleles[ rep_gene ] = []
rep_gene2alleles[ rep_gene ].append( allele )
if gene not in gene2rep:
gene2rep[gene] = set()
gene2alleles[gene] = []
gene2rep[ gene ].add( rep_gene )
gene2alleles[gene].append( allele )
merge_rep_genes = {}
for gene,reps in gene2rep.iteritems():
if len(reps)>1:
assert vj=='V'
if verbose:
print 'multireps:',organism, gene, reps
for allele in gene2alleles[gene]:
print cdr3s_human.all_merged_loopseqs[organism][allele], allele, \
get_rep(allele,organism), get_mm1_rep(allele,organism)
## we are going to merge these reps
## which one should we choose?
l = [ (len(rep_gene2alleles[rep]), rep ) for rep in reps ]
l.sort()
l.reverse()
assert l[0][0] > l[1][0]
toprep = l[0][1]
for (count,rep) in l:
if rep in merge_rep_genes:
assert rep == toprep and merge_rep_genes[rep] == rep
merge_rep_genes[ rep ] = toprep
for allele in alleles:
count_rep = get_mm1_rep_ignoring_allele( allele, organism )
if count_rep in merge_rep_genes:
count_rep = merge_rep_genes[ count_rep ]
allele2mm1_rep_gene_for_counting[ organism ][ allele] = count_rep
if verbose:
print 'allele2mm1_rep_gene_for_counting:',organism, allele, count_rep
def get_mm1_rep_gene_for_counting( allele, organism ):
global allele2mm1_rep_gene_for_counting
return allele2mm1_rep_gene_for_counting[ organism ][ allele ]
def countreps_from_genes( genes, organism ):
global allele2mm1_rep_gene_for_counting
reps = set( ( allele2mm1_rep_gene_for_counting(x,organism) for x in genes ) )
return reps
def assign_label_reps_and_colors_based_on_most_common_genes_in_repertoire( tcr_infos, organism ):
## assumes that each element of tcr_infos is a dictionary with fields that would have come from parse_tsv_line
## uses the *_countreps info that was filled in by read_pair_seqs.py
## the _label_rep* fields get over-written if they were present
for segtype in segtypes_lowercase:
countreps_tag = segtype+'_countreps'
rep_tag = segtype+'_label_rep'
color_tag = segtype+'_label_rep_color' ## where we will store the rep info
counts = {}
for tcr_info in tcr_infos:
reps = tcr_info[countreps_tag].split(';')
for rep in reps:
counts[rep] = counts.get(rep,0)+1
newcounts = {}
for tcr_info in tcr_infos:
reps = tcr_info[countreps_tag].split(';')
toprep = max( [ ( counts[x],x) for x in reps ] )[1]
tcr_info[rep_tag] = toprep ## doesnt have allele info anymore
newcounts[toprep] = newcounts.get(toprep,0)+1
l = [(y,x) for x,y in newcounts.iteritems()]
l.sort()
l.reverse()
rep_colors = dict( zip( [x[1] for x in l], html_colors.get_rank_colors_no_lights(len(l)) ) )
for tcr_info in tcr_infos:
tcr_info[ color_tag ] = rep_colors[ tcr_info[ rep_tag ] ]
return ## we modified the elements of the tcr_infos list in place
# if __name__ == '__main__':
# for organism in allele2mm1_rep_gene_for_counting:
# for allele in allele2mm1_rep_gene_for_counting[ organism ]:
# print 'get_mm1_rep_gene_for_counting