-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfiletypes.py
More file actions
169 lines (144 loc) · 5.4 KB
/
filetypes.py
File metadata and controls
169 lines (144 loc) · 5.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
##############################################################################
# This module supports the .2bit genome data format
# See http://genome.ucsc.edu/FAQ/FAQformat.html#format7
import numpy as np
import struct
import os
import pyximport; pyximport.install()
from inner_loops import twobit_decode
class BinaryReader(object):
'''
This is a wrapper around struct for easy reading of sequential values in a
raw binary file.
'''
def __init__(self, data):
self.data = data
self.idx = 0
self.e = '<'
return
def reset(self):
self.idx = 0
return
def endian(self, e):
if e == 'little':
self.e = '<'
else:
self.e = '>'
return
def _parse(self, num, size, fmt_char):
fmt = self.e + fmt_char*num
ret = struct.unpack(fmt, self.data[self.idx:self.idx + (size*num)])
self.idx += size*num
if num == 1:
return ret[0]
else:
return ret
def uint32(self, num=1):
return self._parse(num, 4, 'I')
def uint8(self, num=1):
return self._parse(num, 1, 'B')
def char(self, num=1):
ret = self.data[self.idx:self.idx+num]
self.idx += num
return ret
class Sequence(object):
'''
Abstract class that represents a sequence as an array, with an underlying
memmapped numpy array backing it.
'''
def __getitem__(self):
raise NotImplementedError()
def __setitem__(self):
raise NotImplementedError()
class TwoBitSeq(Sequence):
# T C A G
base_lut = np.array([0,1,2,3], dtype=np.uint8)
#base_lut = np.array(map(ord, ['T', 'C', 'A', 'G']), dtype=np.uint8)
def __init__(self, data, offset, name):
self.name = name
self.data = data #
self._rec_start = offset # where this record starts in data
b_read = BinaryReader(data)
b_read.idx = offset
self.size = b_read.uint32() # number of bases
self.nBlockCount = b_read.uint32() # number of N blocks
self.nBlockStarts = [b_read.uint32() for i in xrange(self.nBlockCount)]
self.nBlockSizes = [b_read.uint32() for i in xrange(self.nBlockCount)]
self.maskBlockCount = b_read.uint32()
self.maskBlockStarts = [b_read.uint32() for i in xrange(self.nBlockCount)]
self.maskBlockSizes = [b_read.uint32() for i in xrange(self.nBlockCount)]
reserved = b_read.uint32()
# b_read.idx should now be pointing to the start of the packedDNA record
self._seq_start = b_read.idx
return
def __getitem__(self, name):
base_lut = self.base_lut
data = self.data
try:
if not name.start: start = 0
else: start = name.start
result = twobit_decode(self.data, self.base_lut, self._seq_start,
start, name.stop)
except:
next_byte = data[name/4 + self._seq_start]
bases = (base_lut[(next_byte & 0xC0) >> 6],
base_lut[(next_byte & 0x30) >> 4],
base_lut[(next_byte & 0x0C) >> 2],
base_lut[(next_byte & 0x03) >> 0])
result = np.array([bases[name % 4]])
return result
class TwoBit(object):
'''
This class lets one treat a .2bit file as an array of flat arrays, one for
each sequence. The metadata included in the file and each sequence record
are available as class members. It decompresses on the fly packed gene
data into FASTA. Any caching is up to the user of this class.
'''
def __init__(self, filename):
'''
filename - string
'''
self.filename = filename
parts = filename.split('/')
basename = '/'.join(parts[:-1])
self.name = parts[-1]
self.type = '.2bit'
data = np.memmap(filename, dtype=np.uint8, mode='r')
self.data = data
# Parse the .2bit header - 16 bytes = 4 32-bit fields
b_read = BinaryReader(data)
signature = b_read.uint32()
if signature != 0x1A412743: # compare against magic number
b_read.endian('big')
b_read.reset()
signature = b_read.uint32()
if signature != 0x1A412743:
raise ValueError(".2bit signature didn't match: %s"%hex(signature))
version = b_read.uint32()
if version != 0:
raise ValueError('Version not 0')
self.sequenceCount = b_read.uint32()
reserved = b_read.uint32()
# These (implicitly) maps sequence names to their offset in the file
self.sequences = []
self.sequence_map = {}
idx = 16
for seq_idx in range(self.sequenceCount):
nameSize = b_read.uint8()
name = ''.join(map(chr,b_read.char(nameSize)))
offset = b_read.uint32()
tbs = TwoBitSeq(data, offset, name)
self.sequences.append((name, tbs))
self.sequence_map[name] = tbs
self.total_size = sum(s[1].size for s in self.sequences)
return
##############################################################################
# Tests
if __name__ == "__main__":
data = TwoBit('sacCer3_yeast2011.2bit')
print data.sequenceCount
s1 = data.sequences[0][1]
print s1[0]
print s1[:3]
print s1[:20]
print ''.join(map(chr,s1[:80]))