diff --git a/EDF.py b/EDF.py index 7f08e02..a53ce0c 100644 --- a/EDF.py +++ b/EDF.py @@ -8,15 +8,17 @@ import re import warnings + def padtrim(buf, num): num -= len(buf) - if num>=0: + if num >= 0: # pad the input to the specified length return str(buf) + ' ' * num else: # trim the input to the specified length return buf[0:num] + #################################################################################################### # the EDF header is represented as a tuple of (meas_info, chan_info) # meas_info should have ['record_length', 'magic', 'hour', 'subject_id', 'recording_id', 'n_records', 'month', 'subtype', 'second', 'nchan', 'data_size', 'data_offset', 'lowpass', 'year', 'highpass', 'day', 'minute'] @@ -65,7 +67,6 @@ def close(self): self.calibrate = None self.offset = None self.n_records = 0 - return def writeHeader(self, header): meas_info = header[0] @@ -76,18 +77,18 @@ def writeHeader(self, header): assert(fid.tell() == 0) # fill in the missing or incomplete information - if not 'subject_id' in meas_info: + if 'subject_id' not in meas_info: meas_info['subject_id'] = '' - if not 'recording_id' in meas_info: + if 'recording_id' not in meas_info: meas_info['recording_id'] = '' - if not 'subtype' in meas_info: + if 'subtype' not in meas_info: meas_info['subtype'] = 'edf' nchan = meas_info['nchan'] - if not 'ch_names' in chan_info or len(chan_info['ch_names'])chan_info['physical_max'][i]: - warnings.warn('Value exceeds physical_max: '+ str(max(raw))); + assert(len(raw) == chan_info['n_samps'][i]) + if min(raw) < chan_info['physical_min'][i]: + warnings.warn('Value exceeds physical_min: ' + str(min(raw))) + if max(raw) > chan_info['physical_max'][i]: + warnings.warn('Value exceeds physical_max: ' + str(max(raw))) raw -= self.offset[i] # FIXME I am not sure about the order of calibrate and offset raw /= self.calibrate[i] @@ -167,6 +168,7 @@ def writeBlock(self, data): #################################################################################################### + class EDFReader(): def __init__(self, fname=None): self.fname = None @@ -203,8 +205,8 @@ def readHeader(self): meas_info['subject_id'] = fid.read(80).strip().decode() # subject id meas_info['recording_id'] = fid.read(80).strip().decode() # recording id - day, month, year = [int(x) for x in re.findall('(\d+)', fid.read(8).decode())] - hour, minute, second = [int(x) for x in re.findall('(\d+)', fid.read(8).decode())] + day, month, year = [int(x) for x in re.findall(r'(\d+)', fid.read(8).decode())] + hour, minute, second = [int(x) for x in re.findall(r'(\d+)', fid.read(8).decode())] meas_info['day'] = day meas_info['month'] = month meas_info['year'] = year @@ -249,14 +251,14 @@ def readHeader(self): chan_info['digital_max'] = digital_max = np.array([float(fid.read(8).decode()) for ch in channels]) prefiltering = [fid.read(80).strip().decode() for ch in channels][:-1] - highpass = np.ravel([re.findall('HP:\s+(\w+)', filt) for filt in prefiltering]) - lowpass = np.ravel([re.findall('LP:\s+(\w+)', filt) for filt in prefiltering]) - high_pass_default = 0. + highpass = np.ravel([re.findall(r'HP:\s+(\w+)', filt) for filt in prefiltering]) + lowpass = np.ravel([re.findall(r'LP:\s+(\w+)', filt) for filt in prefiltering]) + highpass_default = 0. if highpass.size == 0: - meas_info['highpass'] = high_pass_default + meas_info['highpass'] = highpass_default elif all(highpass): if highpass[0] == 'NaN': - meas_info['highpass'] = high_pass_default + meas_info['highpass'] = highpass_default elif highpass[0] == 'DC': meas_info['highpass'] = 0. else: @@ -280,27 +282,27 @@ def readHeader(self): # number of samples per record chan_info['n_samps'] = n_samps = np.array([int(fid.read(8).decode()) for ch in channels]) - fid.read(32 *meas_info['nchan']).decode() # reserved + fid.read(32 * meas_info['nchan']).decode() # reserved assert fid.tell() == header_nbytes - if meas_info['n_records']==-1: + if meas_info['n_records'] == -1: # this happens if the n_records is not updated at the end of recording - tot_samps = (os.path.getsize(self.fname)-meas_info['data_offset'])/meas_info['data_size'] - meas_info['n_records'] = tot_samps/sum(n_samps) + tot_samps = (os.path.getsize(self.fname) - meas_info['data_offset']) / meas_info['data_size'] + meas_info['n_records'] = tot_samps / sum(n_samps) - self.calibrate = (chan_info['physical_max'] - chan_info['physical_min'])/(chan_info['digital_max'] - chan_info['digital_min']); - self.offset = chan_info['physical_min'] - self.calibrate * chan_info['digital_min']; + self.calibrate = (chan_info['physical_max'] - chan_info['physical_min']) / (chan_info['digital_max'] - chan_info['digital_min']) + self.offset = chan_info['physical_min'] - self.calibrate * chan_info['digital_min'] for ch in channels: - if self.calibrate[ch]<0: - self.calibrate[ch] = 1; - self.offset[ch] = 0; + if self.calibrate[ch] < 0: + self.calibrate[ch] = 1 + self.offset[ch] = 0 self.meas_info = meas_info self.chan_info = chan_info return (meas_info, chan_info) def readBlock(self, block): - assert(block>=0) + assert(block >= 0) meas_info = self.meas_info chan_info = self.chan_info data = [] @@ -309,7 +311,7 @@ def readBlock(self, block): blocksize = np.sum(chan_info['n_samps']) * meas_info['data_size'] fid.seek(meas_info['data_offset'] + block * blocksize) for i in range(meas_info['nchan']): - buf = fid.read(chan_info['n_samps'][i]*meas_info['data_size']) + buf = fid.read(chan_info['n_samps'][i] * meas_info['data_size']) raw = np.asarray(unpack('<{}h'.format(chan_info['n_samps'][i]), buf), dtype=np.float32) raw *= self.calibrate[i] raw += self.offset[i] # FIXME I am not sure about the order of calibrate and offset @@ -325,8 +327,8 @@ def readSamples(self, channel, begsample, endsample): data = self.readBlock(begblock)[channel] for block in range(begblock+1, endblock+1): data = np.append(data, self.readBlock(block)[channel]) - begsample -= begblock*n_samps - endsample -= begblock*n_samps + begsample -= begblock * n_samps + endsample -= begblock * n_samps return data[begsample:(endsample+1)] #################################################################################################### @@ -348,12 +350,13 @@ def getNSamples(self): return self.chan_info['n_samps'] * self.meas_info['n_records'] def readSignal(self, chanindx): - begsample = 0; - endsample = self.chan_info['n_samps'][chanindx] * self.meas_info['n_records'] - 1; + begsample = 0 + endsample = self.chan_info['n_samps'][chanindx] * self.meas_info['n_records'] - 1 return self.readSamples(chanindx, begsample, endsample) #################################################################################################### + if False: file_in = EDFReader() file_in.open('/Users/roboos/day 01[10.03].edf')