diff --git a/EDF.py b/EDF.py index 17e4f79..7fa1560 100644 --- a/EDF.py +++ b/EDF.py @@ -43,6 +43,14 @@ def padtrim(buf, num): + """ Calibate the length w.r.t. buffer parameter and pad the input if the + calibrated length >= 0 (zero). Else trim (slice) the input using it. + + Arguments: + buf (Any): Values to be inserted in EDF Header. + num (int): Value used for padding/trimming the buffer value. + """ + num -= len(buf) if num >= 0: # pad the input to the specified length @@ -52,6 +60,15 @@ def padtrim(buf, num): return buf[0:num] def writebyte(file, content, encoding='ascii'): + """ Writes byte data into file. If string input is given, it will be + converted into byte data first before further operations + + Arguments: + file (FILE): Value having a file pointer. + content (bytes/str): Value to be written in a file. + encoding(str): Value defines the byte encoding to use for strings. + """ + try: # Py3 onwards bytes and strings are separate data format content = bytes(content, encoding) @@ -69,7 +86,12 @@ def writebyte(file, content, encoding='ascii'): file.write(content) def set_offset(chan_obj): - # return a tuple of offset value and calibrate value + """ Return a tuple of offset value and calibrate value. + + Arguments: + chan_obj (dict): Dictionary containing channel information. + """ + physical_range = chan_obj['physical_max'] - chan_obj['physical_min'] digital_range = chan_obj['digital_max'] - chan_obj['digital_min'] calibrate = physical_range / digital_range @@ -79,12 +101,38 @@ def set_offset(chan_obj): class EDFWriter(): + """ Writes EEG Data into BIDS-Standard Compliant EDF/EDF+/BDF/BDF+ files. + + Attributes: + fname (str): Contains the filename. + meas_info (list): Dictionary having information of Measurement + chan_info (list) Dictionary having information of Channels + calibrate (float): Calibration Value for signals + offset (float): Offset Value for signals + n_records (int): number of data records + + Methods: + _initial_state: Used to initialize/reset class attributes. + open: Creates a new file for writing data. + close: Closes the file after writing data. + write_header: Writes the header record to the file. + write_block: Writes the data records to the file. + """ + def __init__(self, fname=None): + """ Class Initializer. Calls _initial_state() method to initialize. + + Arguments: + fname(str): Name of the to-be-created file. + """ + self._initial_state() if fname: self.open(fname) def _initial_state(self): + """ Initializes/Resets Class Attributes.""" + self.fname = None self.meas_info = None self.chan_info = None @@ -93,11 +141,21 @@ def _initial_state(self): self.n_records = 0 def open(self, fname): + """ Opens file to write. + + Arguments: + fname(str): Name of the to-be-created file. + """ + with open(fname, 'wb') as fid: assert fid.tell() == 0 self.fname = fname def close(self): + """ Updates "Number of Data Records" field in header after writing + is complete and closes file. + """ + # it is still needed to update the number of records in the header # this requires copying the whole file content meas_info = self.meas_info @@ -129,6 +187,26 @@ def close(self): self._initial_state() def write_header(self, header): + """ Writes the header record to the file. + + Arguments: + header(tuple): the EDF header is represented as a tuple of + two dictionaries (meas_info{}, chan_info{}). + + meas_info should have the following: { + 'record_length', 'file_ver', 'hour', + 'subject_id', 'recording_id', 'n_records', + 'month', 'subtype', 'second', 'nchan', + 'data_size', 'data_offset', 'lowpass', + 'year', 'highpass', 'day', 'minute'} + + chan_info should have the following: { + 'physical_min', 'transducers', + 'physical_max', 'digital_max', + 'ch_names', 'n_samps', 'units', + 'digital_min'} + """ + meas_info = header[0] chan_info = header[1] meas_size = 256 @@ -230,6 +308,13 @@ def write_header(self, header): self.offset[ch] = 0 def write_block(self, data): + """ Writes list of data into file block by block. + + Arguments: + data (list): A numpy array of 16-bit integer value + representation of signal records. + """ + meas_info = self.meas_info chan_info = self.chan_info with open(self.fname, 'ab') as fid: @@ -254,12 +339,49 @@ def write_block(self, data): class EDFReader(): + """ Reads EEG Data from BIDS-Standard Compliant EDF/EDF+/BDF/BDF+ files. + + Attributes: + fname (str): Contains the filename. + meas_info (list): Dictionary having information of Measurement + chan_info (list) Dictionary having information of Channels + calibrate (float): Calibration Value for signals + offset (float): Offset Value for signals. + + Methods: + _initial_state: Used to initialize/reset class attributes. + open: Opens an existing file to read data. + close: Closes the file after reading data. + read_header: Reads the header record from the file. + read_block: Reads the data record blocks from the file. + read_samples: Reads the data samples from the file. + + Helper Methods: The following are a number of helper functions to make + the behaviour of this EDFReader class more similar to + https://bitbucket.org/cleemesser/python-edf/ + get_signal_text_labels: Convert Signal Text Labels from unicode + to strings. + get_n_signals: Get Number of Channels. + get_signal_freqs: Get Signal Frequencies. + get_n_samples: Get Total Number of Samples. + read_signal: Reads Entire Signal Record and returns the + values as an array. + """ + def __init__(self, fname=None): + """ Class Initializer. Calls _initial_state() method to initialize. + + Arguments: + fname(str): Name of the to-be-created file. + """ + self._initial_state() if fname: self.open(fname) def _initial_state(self): + """ Initializes/Resets Class Attributes.""" + self.fname = None self.meas_info = None self.chan_info = None @@ -267,6 +389,12 @@ def _initial_state(self): self.offset = None def open(self, fname): + """ Opens file to read. + + Arguments: + fname(str): Name of the to-be-opened existing file. + """ + with open(fname, 'rb') as fid: assert fid.tell() == 0 self.fname = fname @@ -274,9 +402,17 @@ def open(self, fname): return self.meas_info, self.chan_info def close(self): + """Closes opened file""" + self._initial_state() def read_header(self): + """ Reads header record from file. + + The contents were copied over from MNE-Python and subsequently + modified to closely reflect the native EDF File standard. + """ + meas_info = {} chan_info = {} with open(self.fname, 'rb') as fid: @@ -366,7 +502,7 @@ def read_header(self): fid.read(32 *meas_info['nchan']).decode() # reserved assert fid.tell() == header_nbyte - + if meas_info['n_records'] == -1: # happens if n_records is not updated at the end of recording file_size = os.path.getsize(self.fname) @@ -385,6 +521,15 @@ def read_header(self): return (meas_info, chan_info) def read_block(self, block): + """ Reads data records blockwise. + + Arguments: + block (int): indicates block number in file. + + Example: + If you want to read data block 63 from file, use read_block(63) + """ + assert block >= 0 meas_info = self.meas_info chan_info = self.chan_info @@ -404,6 +549,14 @@ def read_block(self, block): return data def read_samples(self, channel, begsample, endsample): + """ Reads data sample from data block in file. + + Arguments: + channel (int): Indicates channel number. + begsample (int): Value of beginning sample (to start reading) + endsample (int): Value of ending sample (to stop reading) + """ + meas_info = self.meas_info chan_info = self.chan_info n_samps = chan_info['n_samps'][channel] @@ -417,18 +570,32 @@ def read_samples(self, channel, begsample, endsample): return data[begsample:(endsample + 1)] def get_signal_text_labels(self): + """ Retieve and convert Signal Text Labels from unicode to string.""" + return [str(x) for x in self.chan_info['ch_names']] def get_n_signals(self): + """ Get number of signal channels.""" + return self.meas_info['nchan'] def get_signal_freqs(self): + """ Get signal frequencies.""" + return self.chan_info['n_samps'] / self.meas_info['record_length'] def get_n_samples(self): + """ Get total number of samples.""" + return self.chan_info['n_samps'] * self.meas_info['n_records'] def read_signal(self, chanindx): + """ Reads Entire Signal Record and returns the values as an array. + + Arguments: + chanindx: Indicates channel index. + """ + begsample = 0 endsample = (self.chan_info['n_samps'][chanindx] * self.meas_info['n_records']) - 1 return self.read_samples(chanindx, begsample, endsample)