Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/dev/13942.newfeature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for automatic ``cross_talk`` and ``fine_calibration`` adjustments in :func:`mne.preprocessing.maxwell_filter` when these are stored in :class:`mne.Info` during acquisition (e.g., by newer MEGIN acquisition software), by `Eric Larson`_.
98 changes: 73 additions & 25 deletions mne/preprocessing/maxwell.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright the MNE-Python contributors.

from collections import Counter
from copy import deepcopy
from functools import partial
from math import factorial
from os import path as op
Expand Down Expand Up @@ -498,6 +499,25 @@ def _prep_maxwell_filter(
add_channels = (head_pos is not None) and (not st_only)
head_pos = _check_pos(head_pos, coord_frame, raw, st_fixed)
mc = _MoveComp(head_pos, coord_frame, raw, mc_interp, reconstruct)

# cross_talk=None (or True) means "use built-in ones if in info"
_validate_type(cross_talk, (None, bool, dict, "path-like"))
_validate_type(calibration, (None, bool, dict, "path-like"))
if cross_talk is True or cross_talk is None:
need_cross_talk = cross_talk is True
cross_talk = raw.info.get("cross_talk")
if cross_talk is None and need_cross_talk:
raise RuntimeError(f"{cross_talk=} but info['cross_talk'] is None.")
elif cross_talk is False:
cross_talk = None
if calibration is True or calibration is None:
need_calibration = calibration is True
calibration = raw.info.get("fine_calibration")
if calibration is None and need_calibration:
raise RuntimeError(f"{calibration=} but info['fine_calibration'] is None.")
elif calibration is False:
calibration = None

_check_info(
raw.info,
sss=not st_only,
Expand Down Expand Up @@ -2174,6 +2194,31 @@ def _overlap_projector(data_int, data_res, corr):
return V_principal


def _reformat_fine_cal_dict(fine_cal):
# reformat a possible dict with "cal_chans", "cal_corrs" keys to more standard one
# with "ch_names", "locs", "imb_cals" keys
if "cal_chans" in fine_cal:
# Someday we might need to refactor this for other systems, but we should do
# that when we start building in fine cal to them during acq (probably never)
_GRAD_TYPES = tuple(
getattr(FIFF, f"FIFFV_COIL_VV_PLANAR_{t}")
for t in ("T1", "T2", "T3", "T4", "W")
)
ch_names = [f"MEG{ch_num:04d}" for ch_num in fine_cal["cal_chans"][:, 0]]
locs = fine_cal["cal_corrs"][:, -12:].astype(float)
coil_types = fine_cal["cal_chans"][:, 1]
is_grad = np.isin(coil_types, _GRAD_TYPES)
starts = np.where(is_grad, 1, 0)
stops = np.where(is_grad, fine_cal["cal_corrs"].shape[1] - 12, 1)
imb_cals = list(
fine_cal["cal_corrs"][ii, start:stop].astype(float)
for ii, (start, stop) in enumerate(zip(starts, stops))
)
return dict(ch_names=ch_names, locs=locs, imb_cals=imb_cals)
else:
return deepcopy(fine_cal)


def _prep_fine_cal(info, fine_cal, *, ignore_ref):
from ._fine_cal import read_fine_calibration

Expand All @@ -2183,6 +2228,7 @@ def _prep_fine_cal(info, fine_cal, *, ignore_ref):
fine_cal = read_fine_calibration(fine_cal)
else:
extra = "dict"
fine_cal = _reformat_fine_cal_dict(fine_cal)
logger.info(f" Using fine calibration {extra}")
ch_names = _clean_names(info["ch_names"], remove_whitespace=True)
info_to_cal = dict()
Expand Down Expand Up @@ -2230,12 +2276,12 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref):
)

# Replace sensor locations (and track differences) for fine calibration
ang_shift = list()
used = np.zeros(len(info["chs"]), bool)
cal_corrs = list()
cal_chans = list()
adjust_logged = False
for oi, ci in info_to_cal.items():
ang_shift = np.zeros(len(info_to_cal))
for ii, (oi, ci) in enumerate(info_to_cal.items()):
assert not used[oi]
used[oi] = True
info_ch = info["chs"][oi]
Expand All @@ -2250,50 +2296,47 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref):
# EX and EY are orthogonal to EZ. If not, we find the rotation between
# the original and fine-cal ez, and rotate EX and EY accordingly:
ch_coil_rot = _loc_to_coil_trans(info_ch["loc"])[:3, :3]
_normalize_vectors(ch_coil_rot.T) # column-wise
cal_loc = fine_cal["locs"][ci].copy()
cal_coil_rot = _loc_to_coil_trans(cal_loc)[:3, :3]
if (
np.max(
[
np.abs(np.dot(cal_coil_rot[:, ii], cal_coil_rot[:, 2]))
for ii in range(2)
]
)
> 1e-6
): # X or Y not orthogonal
_normalize_vectors(cal_coil_rot.T)
# X or Y not orthogonal to Z:
if np.max(np.abs(cal_coil_rot[:, 2] @ cal_coil_rot[:, :2])) > 1e-5:
if not adjust_logged:
logger.info(" Adjusting non-orthogonal EX and EY")
adjust_logged = True
# find the rotation matrix that goes from one to the other
this_trans = _find_vector_rotation(ch_coil_rot[:, 2], cal_coil_rot[:, 2])
cal_loc[3:] = np.dot(this_trans, ch_coil_rot).T.ravel()
R = _find_vector_rotation(ch_coil_rot[:, 2], cal_coil_rot[:, 2])
cal_coil_rot[:] = R @ ch_coil_rot
_normalize_vectors(cal_coil_rot.T)
cal_loc[3:9] = cal_coil_rot.T.ravel()[:6] # set just X and Y in output

# calculate shift angle
v1 = _loc_to_coil_trans(cal_loc)[:3, :3]
_normalize_vectors(v1)
v2 = _loc_to_coil_trans(info_ch["loc"])[:3, :3]
_normalize_vectors(v2)
ang_shift.append(np.sum(v1 * v2, axis=0))
v2 = _loc_to_coil_trans(info_ch["loc"])[:3, 2:]
_normalize_vectors(v2.T)
ang_shift[ii] = cal_coil_rot[:3, 2] @ v2[:, 0]
if oi in grad_picks:
extra = [1.0, fine_cal["imb_cals"][ci][0]]
else:
extra = [fine_cal["imb_cals"][ci][0], 0.0]
cal_corrs.append(np.concatenate([extra, cal_loc]))
# Adjust channel normal orientations with those from fine calibration
# Channel positions are not changed
info_ch["loc"][3:] = cal_loc[3:]
info_ch["loc"][3:] = cal_coil_rot.T.ravel()
assert info_ch["coord_frame"] == FIFF.FIFFV_COORD_DEVICE
meg_picks = pick_types(info, meg=True, exclude=(), ref_meg=not ignore_ref)
assert used[meg_picks].all()
assert not used[np.setdiff1d(np.arange(len(used)), meg_picks)].any()
# This gets written to the Info struct
sss_cal = dict(cal_corrs=np.array(cal_corrs), cal_chans=np.array(cal_chans))
sss_cal = dict(
cal_corrs=np.array(cal_corrs, float),
cal_chans=np.array(cal_chans, int),
)

# Log quantification of sensor changes
# Deal with numerical precision giving absolute vals slightly more than 1.
ang_shift = np.array(ang_shift)
np.clip(ang_shift, -1.0, 1.0, ang_shift)
np.rad2deg(np.arccos(ang_shift), ang_shift) # Convert to degrees
np.clip(ang_shift, -1.0, 1.0, out=ang_shift)
np.rad2deg(np.arccos(ang_shift), out=ang_shift) # Convert to degrees
logger.info(
" Adjusted coil orientations by (μ ± σ): "
f"{np.mean(ang_shift):0.1f}° ± {np.std(ang_shift):0.1f}° "
Expand Down Expand Up @@ -2899,8 +2942,13 @@ def find_bad_channels_maxwell(
def _read_cross_talk(cross_talk, ch_names):
sss_ctc = dict()
ctc = None
if cross_talk is not None:
sss_ctc = _read_ctc(cross_talk)
if cross_talk:
if not isinstance(cross_talk, dict):
sss_ctc = _read_ctc(cross_talk)
else:
sss_ctc = deepcopy(cross_talk)
# the way it is on disk
sss_ctc["decoupler"] = sss_ctc["decoupler"].T.tocsc()
ctc_chs = sss_ctc["ch_names"]
# checking for extra space ambiguity in channel names
# between old and new fif files
Expand Down
87 changes: 62 additions & 25 deletions mne/preprocessing/tests/test_maxwell.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# License: BSD-3-Clause
# Copyright the MNE-Python contributors.

import pathlib
import re
from contextlib import contextmanager
from functools import partial
Expand Down Expand Up @@ -35,6 +34,7 @@
compute_maxwell_basis,
find_bad_channels_maxwell,
maxwell_filter_prepare_emptyroom,
read_fine_calibration,
)
from mne.preprocessing import (
maxwell_filter as _maxwell_filter_ola,
Expand Down Expand Up @@ -588,7 +588,12 @@ def test_basic():
assert len(raw.info["projs"]) == 12 # 11 MEG projs + 1 AVG EEG
with use_coil_def(elekta_def_fname):
raw_sss = maxwell_filter(
raw, origin=mf_head_origin, regularize=None, bad_condition="ignore"
raw,
origin=mf_head_origin,
regularize=None,
bad_condition="ignore",
calibration=False, # just for completeness
cross_talk=False,
)
assert len(raw_sss.info["projs"]) == 1 # avg EEG
assert raw_sss.info["projs"][0]["desc"] == "Average EEG reference"
Expand Down Expand Up @@ -848,28 +853,49 @@ def test_fine_calibration():
sss_fine_cal = read_crop(sss_fine_cal_fname)

# Test 1D SSS fine calibration
with use_coil_def(elekta_def_fname):
with catch_logging() as log:
raw_sss = maxwell_filter(
raw,
calibration=fine_cal_fname,
origin=mf_head_origin,
regularize=None,
bad_condition="ignore",
verbose=True,
)
kwargs = dict(
origin=mf_head_origin,
regularize=None,
bad_condition="ignore",
verbose=True,
)
with use_coil_def(elekta_def_fname), catch_logging() as log:
raw_sss = maxwell_filter(raw, calibration=fine_cal_fname, **kwargs)
log = log.getvalue()
assert "Using fine calibration" in log
adj_msg = "Adjusting non-orthogonal EX and EY"
assert adj_msg in log
assert "(max: 1.4" in log
assert fine_cal_fname.stem in log
assert_meg_snr(raw_sss, sss_fine_cal, 1.3, 180) # similar to MaxFilter
py_cal = raw_sss.info["proc_history"][0]["max_info"]["sss_cal"]
assert py_cal is not None
assert len(py_cal) > 0
assert isinstance(py_cal, dict) and len(py_cal)
mf_cal = sss_fine_cal.info["proc_history"][0]["max_info"]["sss_cal"]
assert isinstance(mf_cal, dict) and len(mf_cal)
# we identify these differently
mf_cal["cal_chans"][mf_cal["cal_chans"][:, 1] == 3022, 1] = 3024
fc = read_fine_calibration(fine_cal_fname)
assert_allclose(fc["locs"][:, :3], mf_cal["cal_corrs"][:, -12:-9]) # pos
assert_allclose(fc["locs"][:, :3], py_cal["cal_corrs"][:, -12:-9])
assert_allclose(mf_cal["cal_corrs"][:, -3:], fc["locs"][:, -3:]) # ez
assert_allclose(py_cal["cal_corrs"][:, -3:], fc["locs"][:, -3:])
assert_allclose( # ex, ey get updated by rotation, which doesn't exactly match
py_cal["cal_corrs"][:, -9:-3],
mf_cal["cal_corrs"][:, -9:-3],
atol=1e-3,
)
assert_allclose(py_cal["cal_chans"], mf_cal["cal_chans"])
assert_allclose(py_cal["cal_corrs"], mf_cal["cal_corrs"], rtol=1e-3, atol=1e-3)
with pytest.raises(RuntimeError, match=r"info\['fine_calibration'\] is None"):
maxwell_filter(raw, calibration=True, **kwargs)
with raw.info._unlock():
raw.info["fine_calibration"] = py_cal
with use_coil_def(elekta_def_fname), catch_logging() as log:
raw_sss_builtin = maxwell_filter(raw, **kwargs)
log = log.getvalue()
assert adj_msg not in log
assert_allclose(raw_sss_builtin[:][0], raw_sss[:][0])
with raw.info._unlock():
del raw.info["fine_calibration"]
# with missing channels
raw_missing = raw.copy().load_data()
raw_missing.info["bads"] = ["MEG0111", "MEG0943"] # 1 mag, 1 grad
Expand Down Expand Up @@ -958,17 +984,28 @@ def test_cross_talk(tmp_path):
raw = read_crop(raw_fname, (0.0, 1.0))
raw.info["bads"] = bads
sss_ctc = read_crop(sss_ctc_fname)
kwargs = dict(
origin=mf_head_origin,
regularize=None,
bad_condition="ignore",
)
with use_coil_def(elekta_def_fname):
raw_sss = maxwell_filter(
raw,
cross_talk=pathlib.Path(ctc_fname),
origin=mf_head_origin,
regularize=None,
bad_condition="ignore",
)
raw_sss = maxwell_filter(raw, cross_talk=ctc_fname, **kwargs)
assert_meg_snr(raw_sss, sss_ctc, 275.0)
py_ctc = raw_sss.info["proc_history"][0]["max_info"]["sss_ctc"]
assert len(py_ctc) > 0

# using built-in cross-talk
with pytest.raises(RuntimeError, match=r"info\['cross_talk'\] is None"):
maxwell_filter(raw, cross_talk=True, **kwargs)
with raw.info._unlock():
raw.info["cross_talk"] = py_ctc
with use_coil_def(elekta_def_fname):
raw_sss_builtin = maxwell_filter(raw, **kwargs)
assert len(raw_sss_builtin.info["proc_history"][0]["max_info"]["sss_ctc"]) > 0
assert_allclose(raw_sss[:][0], raw_sss_builtin[:][0])
del raw_sss_builtin, raw_sss

with pytest.raises(TypeError, match="path-like"):
maxwell_filter(raw, cross_talk=raw)
with pytest.raises(ValueError, match="Invalid cross-talk FIF"):
Expand Down Expand Up @@ -1236,7 +1273,7 @@ def test_shielding_factor(tmp_path):
coord_frame="meg",
regularize=None,
origin=mf_meg_origin,
calibration=pathlib.Path(fine_cal_fname),
calibration=fine_cal_fname,
)
_assert_shielding(raw_sss, erm_power, 12, 13) # 2.0)

Expand Down Expand Up @@ -1479,7 +1516,7 @@ def test_all():
coord_frames = ("head", "head", "meg", "head")
ctcs = (ctc_fname, ctc_fname, ctc_fname, ctc_mgh_fname)
mins = (3.5, 3.5, 1.2, 0.9)
meds = (10.8, 10.2, 3.2, 5.9)
meds = (10.7, 10.1, 3.2, 5.9)
st_durs = (1.0, 1.0, 1.0, None)
destinations = (None, sample_fname, None, None)
origins = (mf_head_origin, mf_head_origin, mf_meg_origin, mf_head_origin)
Expand All @@ -1496,7 +1533,7 @@ def test_all():
origin=origins[ii],
)
sss_mf = read_crop(sss_fnames[ii])
assert_meg_snr(sss_py, sss_mf, mins[ii], meds[ii], msg=rf)
assert_meg_snr(sss_py, sss_mf, mins[ii], meds[ii], msg=f"{ii=} {rf=!s}")


@pytest.mark.slowtest
Expand Down
3 changes: 3 additions & 0 deletions mne/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,8 @@ def _find_vector_rotation(a, b):
# Rodrigues' rotation formula:
# https://en.wikipedia.org/wiki/Rodrigues%27_rotation_formula
# http://math.stackexchange.com/a/476311
assert np.isclose(np.linalg.norm(a), 1.0), np.linalg.norm(a)
assert np.isclose(np.linalg.norm(b), 1.0), np.linalg.norm(b)
R = np.eye(3)
v = np.cross(a, b)
if np.allclose(v, 0.0): # identical
Expand All @@ -1472,6 +1474,7 @@ def _find_vector_rotation(a, b):
c = np.dot(a, b) # cosine of the angle between them
vx = _skew_symmetric_cross(v)
R += vx + np.dot(vx, vx) * (1 - c) / s
# Now we have: np.allclose(R @ a, b)
return R


Expand Down
24 changes: 19 additions & 5 deletions mne/utils/docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,10 +644,16 @@ def _reflow_param_docstring(docstring, has_first_line=True, width=75):
# C

docdict["calibration_maxwell_cal"] = """
calibration : str | None
Path to the ``'.dat'`` file with fine calibration coefficients.
File can have 1D or 3D gradiometer imbalance correction.
This file is machine/site-specific.
calibration : path-like | bool | None
Path to the .dat file with fine calibration information.
If None, will use the ``info["fine_calibration"]`` entry if present.
If True, this entry must be present in the info and will be used.
Comment on lines +649 to +650
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
If None, will use the ``info["fine_calibration"]`` entry if present.
If True, this entry must be present in the info and will be used.
If ``None``, will use the ``info["fine_calibration"]`` entry if present.
If ``True``, this entry must be present in the info and will be used.

If ``False``, no calibration will be applied.
.. versionchanged:: 1.13
Support for ``bool`` to explicitly control calibration using
``info["fine_calibration"]``, and ``None`` now uses ``info["fine_calibration"]``
if available.
"""

docdict["cbar_fmt_topomap"] = """\
Expand Down Expand Up @@ -1048,8 +1054,16 @@ def _reflow_param_docstring(docstring, has_first_line=True, width=75):
"""

docdict["cross_talk_maxwell"] = """
cross_talk : str | None
cross_talk : path-like | bool | None
Path to the FIF file with cross-talk correction information.
If None, will use the ``info["cross_talk"]`` entry if present.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
If None, will use the ``info["cross_talk"]`` entry if present.
If ``None``, will use the ``info["cross_talk"]`` entry if present.

If True, this entry must be present in the info and will be used.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
If True, this entry must be present in the info and will be used.
If ``True``, this entry must be present in the info and will be used.

If ``False``, no cross-talk correction will be applied.
.. versionchanged:: 1.13
Support for ``bool`` to explicitly control cross-talk correction using
``info["cross_talk"]``, and ``None`` now uses ``info["cross_talk"]``
if available.
"""

# %%
Expand Down
Loading