Skip to content
171 changes: 169 additions & 2 deletions EDF.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@


def padtrim(buf, num):
""" Calibate the length w.r.t. buffer parameter and pad the input if the
calibrated length >= 0 (zero). Else trim (slice) the input using it.

Arguments:
buf (Any): Values to be inserted in EDF Header.
num (int): Value used for padding/trimming the buffer value.
"""

num -= len(buf)
if num >= 0:
# pad the input to the specified length
Expand All @@ -52,6 +60,15 @@ def padtrim(buf, num):
return buf[0:num]

def writebyte(file, content, encoding='ascii'):
""" Writes byte data into file. If string input is given, it will be
converted into byte data first before further operations

Arguments:
file (FILE): Value having a file pointer.
content (bytes/str): Value to be written in a file.
encoding(str): Value defines the byte encoding to use for strings.
"""

try:
# Py3 onwards bytes and strings are separate data format
content = bytes(content, encoding)
Expand All @@ -69,7 +86,12 @@ def writebyte(file, content, encoding='ascii'):
file.write(content)

def set_offset(chan_obj):
# return a tuple of offset value and calibrate value
""" Return a tuple of offset value and calibrate value.

Arguments:
chan_obj (dict): Dictionary containing channel information.
"""

physical_range = chan_obj['physical_max'] - chan_obj['physical_min']
digital_range = chan_obj['digital_max'] - chan_obj['digital_min']
calibrate = physical_range / digital_range
Expand All @@ -79,12 +101,38 @@ def set_offset(chan_obj):


class EDFWriter():
""" Writes EEG Data into BIDS-Standard Compliant EDF/EDF+/BDF/BDF+ files.

Attributes:
fname (str): Contains the filename.
meas_info (list): Dictionary having information of Measurement
chan_info (list) Dictionary having information of Channels
calibrate (float): Calibration Value for signals
offset (float): Offset Value for signals
n_records (int): number of data records

Methods:
_initial_state: Used to initialize/reset class attributes.
open: Creates a new file for writing data.
close: Closes the file after writing data.
write_header: Writes the header record to the file.
write_block: Writes the data records to the file.
"""

def __init__(self, fname=None):
""" Class Initializer. Calls _initial_state() method to initialize.

Arguments:
fname(str): Name of the to-be-created file.
"""

self._initial_state()
if fname:
self.open(fname)

def _initial_state(self):
""" Initializes/Resets Class Attributes."""

self.fname = None
self.meas_info = None
self.chan_info = None
Expand All @@ -93,11 +141,21 @@ def _initial_state(self):
self.n_records = 0

def open(self, fname):
""" Opens file to write.

Arguments:
fname(str): Name of the to-be-created file.
"""

with open(fname, 'wb') as fid:
assert fid.tell() == 0
self.fname = fname

def close(self):
""" Updates "Number of Data Records" field in header after writing
is complete and closes file.
"""

# it is still needed to update the number of records in the header
# this requires copying the whole file content
meas_info = self.meas_info
Expand Down Expand Up @@ -129,6 +187,26 @@ def close(self):
self._initial_state()

def write_header(self, header):
""" Writes the header record to the file.

Arguments:
header(tuple): the EDF header is represented as a tuple of
two dictionaries (meas_info{}, chan_info{}).

meas_info should have the following: {
'record_length', 'file_ver', 'hour',
'subject_id', 'recording_id', 'n_records',
'month', 'subtype', 'second', 'nchan',
'data_size', 'data_offset', 'lowpass',
'year', 'highpass', 'day', 'minute'}

chan_info should have the following: {
'physical_min', 'transducers',
'physical_max', 'digital_max',
'ch_names', 'n_samps', 'units',
'digital_min'}
"""

meas_info = header[0]
chan_info = header[1]
meas_size = 256
Expand Down Expand Up @@ -230,6 +308,13 @@ def write_header(self, header):
self.offset[ch] = 0

def write_block(self, data):
""" Writes list of data into file block by block.

Arguments:
data (list): A numpy array of 16-bit integer value
representation of signal records.
"""

meas_info = self.meas_info
chan_info = self.chan_info
with open(self.fname, 'ab') as fid:
Expand All @@ -254,29 +339,80 @@ def write_block(self, data):


class EDFReader():
""" Reads EEG Data from BIDS-Standard Compliant EDF/EDF+/BDF/BDF+ files.

Attributes:
fname (str): Contains the filename.
meas_info (list): Dictionary having information of Measurement
chan_info (list) Dictionary having information of Channels
calibrate (float): Calibration Value for signals
offset (float): Offset Value for signals.

Methods:
_initial_state: Used to initialize/reset class attributes.
open: Opens an existing file to read data.
close: Closes the file after reading data.
read_header: Reads the header record from the file.
read_block: Reads the data record blocks from the file.
read_samples: Reads the data samples from the file.

Helper Methods: The following are a number of helper functions to make
the behaviour of this EDFReader class more similar to
https://bitbucket.org/cleemesser/python-edf/
get_signal_text_labels: Convert Signal Text Labels from unicode
to strings.
get_n_signals: Get Number of Channels.
get_signal_freqs: Get Signal Frequencies.
get_n_samples: Get Total Number of Samples.
read_signal: Reads Entire Signal Record and returns the
values as an array.
"""

def __init__(self, fname=None):
""" Class Initializer. Calls _initial_state() method to initialize.

Arguments:
fname(str): Name of the to-be-created file.
"""

self._initial_state()
if fname:
self.open(fname)

def _initial_state(self):
""" Initializes/Resets Class Attributes."""

self.fname = None
self.meas_info = None
self.chan_info = None
self.calibrate = None
self.offset = None

def open(self, fname):
""" Opens file to read.

Arguments:
fname(str): Name of the to-be-opened existing file.
"""

with open(fname, 'rb') as fid:
assert fid.tell() == 0
self.fname = fname
self.read_header()
return self.meas_info, self.chan_info

def close(self):
"""Closes opened file"""

self._initial_state()

def read_header(self):
""" Reads header record from file.

The contents were copied over from MNE-Python and subsequently
modified to closely reflect the native EDF File standard.
"""

meas_info = {}
chan_info = {}
with open(self.fname, 'rb') as fid:
Expand Down Expand Up @@ -366,7 +502,7 @@ def read_header(self):

fid.read(32 *meas_info['nchan']).decode() # reserved
assert fid.tell() == header_nbyte

if meas_info['n_records'] == -1:
# happens if n_records is not updated at the end of recording
file_size = os.path.getsize(self.fname)
Expand All @@ -385,6 +521,15 @@ def read_header(self):
return (meas_info, chan_info)

def read_block(self, block):
""" Reads data records blockwise.

Arguments:
block (int): indicates block number in file.

Example:
If you want to read data block 63 from file, use read_block(63)
"""

assert block >= 0
meas_info = self.meas_info
chan_info = self.chan_info
Expand All @@ -404,6 +549,14 @@ def read_block(self, block):
return data

def read_samples(self, channel, begsample, endsample):
""" Reads data sample from data block in file.

Arguments:
channel (int): Indicates channel number.
begsample (int): Value of beginning sample (to start reading)
endsample (int): Value of ending sample (to stop reading)
"""

meas_info = self.meas_info
chan_info = self.chan_info
n_samps = chan_info['n_samps'][channel]
Expand All @@ -417,18 +570,32 @@ def read_samples(self, channel, begsample, endsample):
return data[begsample:(endsample + 1)]

def get_signal_text_labels(self):
""" Retieve and convert Signal Text Labels from unicode to string."""

return [str(x) for x in self.chan_info['ch_names']]

def get_n_signals(self):
""" Get number of signal channels."""

return self.meas_info['nchan']

def get_signal_freqs(self):
""" Get signal frequencies."""

return self.chan_info['n_samps'] / self.meas_info['record_length']

def get_n_samples(self):
""" Get total number of samples."""

return self.chan_info['n_samps'] * self.meas_info['n_records']

def read_signal(self, chanindx):
""" Reads Entire Signal Record and returns the values as an array.

Arguments:
chanindx: Indicates channel index.
"""

begsample = 0
endsample = (self.chan_info['n_samps'][chanindx] * self.meas_info['n_records']) - 1
return self.read_samples(chanindx, begsample, endsample)
Expand Down