diff --git a/doc/changes/dev/13932.bugfix.rst b/doc/changes/dev/13932.bugfix.rst new file mode 100644 index 00000000000..bdc6cb9c2eb --- /dev/null +++ b/doc/changes/dev/13932.bugfix.rst @@ -0,0 +1 @@ +Use mffpy-backed parsing for EGI MFF event tracks while tolerating nanosecond timestamps that some files store in ``beginTime``, by `Pragnya Khandelwal`_. \ No newline at end of file diff --git a/mne/fixes.py b/mne/fixes.py index 9269a0989f4..cca5738cd12 100644 --- a/mne/fixes.py +++ b/mne/fixes.py @@ -20,6 +20,7 @@ import operator as operator_module import os import warnings +from datetime import datetime from math import log import numpy as np @@ -136,6 +137,37 @@ def _safe_svd(A, **kwargs): return linalg.svd(A, lapack_driver="gesvd", **kwargs) +def _parse_mffpy_datetime(time_str, *, tzinfo=None): + """Parse an MFF timestamp with nanosecond fractional seconds. + + TODO VERSION: Remove once BEL-Public/mffpy#133 is released. + Upstream issue: https://github.com/BEL-Public/mffpy/issues/138 + """ + if time_str is None: + return None + stripped = time_str.strip() + tz_pos = max(stripped.rfind("+"), stripped.rfind("-")) + tz = "" + core = stripped + if tz_pos > stripped.find("T"): + core = stripped[:tz_pos] + tz = stripped[tz_pos:] + if "." 
def _read_mff_events(filename, sfreq, start_time):
    """Extract the events with mffpy.

    ``mffpy.Reader`` is only used to locate and open the ``Events*.xml``
    files inside the MFF. The XML itself is parsed with defusedxml so that
    ``beginTime`` values carrying nanosecond fractions can be handled by
    ``_parse_mffpy_datetime`` instead of mffpy's strict datetime parsing.

    Parameters
    ----------
    filename : path-like
        File path.
    sfreq : float
        The sampling frequency
    start_time : datetime
        The recording start time used as the event anchor.

    Returns
    -------
    events_tims : dict
        Maps each event code to the list of sample indices (relative to
        ``start_time``) at which it occurs, in file order.
    code : list
        The event codes in order of first appearance.
    """
    _soft_import("mffpy", "reading EGI MFF data")
    _soft_import("defusedxml", "reading EGI MFF data")
    import defusedxml.ElementTree as DET
    import mffpy

    reader = mffpy.Reader(filename)
    try:
        files_list = sorted(reader.directory.listdir())
    except Exception as exc:
        # Best effort: carry on without events, but do not hide the failure.
        warn(
            f"Could not list the contents of {filename}: {exc}",
            RuntimeWarning,
        )
        files_list = []

    tracks = []
    for xml_name in files_list:
        stem, ext = splitext(basename(xml_name))
        # Only XML event tracks are of interest (as with the former
        # ``glob("*.xml")``); other entries must not be opened.
        if ext != ".xml" or not stem.startswith("Events"):
            continue
        with reader.directory.filepointer(stem) as fp:
            try:
                root = DET.parse(fp).getroot()
            except Exception:
                # fallback: re-read the whole content and parse it as a string
                try:
                    fp.seek(0)
                    root = DET.fromstring(fp.read())
                except Exception as exc2:
                    warn(
                        f"Could not parse the XML file {xml_name}: {exc2}",
                        RuntimeWarning,
                    )
                    continue
        # identify eventTrack root (namespace-insensitive)
        if _ns(root.tag) == "eventTrack":
            tracks.append(root)

    # Group sample indices per code. Dicts keep insertion order, so the keys
    # double as the list of codes in order of first appearance.
    events_tims = {}
    for root in tracks:
        for event_el in root.findall("{*}event"):
            # extract fields by tag name ignoring namespace
            ev = {_ns(child.tag): child.text for child in event_el}
            event_start = _parse_mffpy_datetime(
                ev.get("beginTime"), tzinfo=start_time.tzinfo
            )
            if event_start is None:
                # no beginTime: the event cannot be placed on the time axis
                continue
            start_sec = (event_start - start_time).total_seconds()
            events_tims.setdefault(ev.get("code", ""), []).append(
                int(np.trunc(start_sec * sfreq))
            )
    return events_tims, list(events_tims)
a/mne/io/egi/tests/test_egi.py b/mne/io/egi/tests/test_egi.py index 09d1946e108..6963394fe38 100644 --- a/mne/io/egi/tests/test_egi.py +++ b/mne/io/egi/tests/test_egi.py @@ -593,7 +593,7 @@ def test_egi_mff_bad_xml(tmp_path): """Test that corrupt XML files are gracefully handled.""" pytest.importorskip("defusedxml") mff_fname = copytree_rw(egi_mff_fname, tmp_path / "test_egi_bad_xml.mff") - bad_xml = mff_fname / "bad.xml" + bad_xml = mff_fname / "Events_bad.xml" bad_xml.write_text("", encoding="utf-8") # Missing coordinate file (mff_fname / "coordinates.xml").unlink()