diff --git a/doc/changes/dev/13942.newfeature.rst b/doc/changes/dev/13942.newfeature.rst new file mode 100644 index 00000000000..c67e568d6e0 --- /dev/null +++ b/doc/changes/dev/13942.newfeature.rst @@ -0,0 +1 @@ +Add support for automatic ``cross_talk`` and ``fine_calibration`` adjustments in :func:`mne.preprocessing.maxwell_filter` when these are stored in :class:`mne.Info` during acquisition (e.g., by newer MEGIN acquisition software), by `Eric Larson`_. \ No newline at end of file diff --git a/mne/preprocessing/maxwell.py b/mne/preprocessing/maxwell.py index 059a9a39158..427b1b9b0ef 100644 --- a/mne/preprocessing/maxwell.py +++ b/mne/preprocessing/maxwell.py @@ -3,6 +3,7 @@ # Copyright the MNE-Python contributors. from collections import Counter +from copy import deepcopy from functools import partial from math import factorial from os import path as op @@ -498,6 +499,25 @@ def _prep_maxwell_filter( add_channels = (head_pos is not None) and (not st_only) head_pos = _check_pos(head_pos, coord_frame, raw, st_fixed) mc = _MoveComp(head_pos, coord_frame, raw, mc_interp, reconstruct) + + # cross_talk=None (or True) means "use built-in ones if in info" + _validate_type(cross_talk, (None, bool, dict, "path-like")) + _validate_type(calibration, (None, bool, dict, "path-like")) + if cross_talk is True or cross_talk is None: + need_cross_talk = cross_talk is True + cross_talk = raw.info.get("cross_talk") + if cross_talk is None and need_cross_talk: + raise RuntimeError(f"{cross_talk=} but info['cross_talk'] is None.") + elif cross_talk is False: + cross_talk = None + if calibration is True or calibration is None: + need_calibration = calibration is True + calibration = raw.info.get("fine_calibration") + if calibration is None and need_calibration: + raise RuntimeError(f"{calibration=} but info['fine_calibration'] is None.") + elif calibration is False: + calibration = None + _check_info( raw.info, sss=not st_only, @@ -2174,6 +2194,31 @@ def _overlap_projector(data_int, data_res, corr): return V_principal +def _reformat_fine_cal_dict(fine_cal): + # reformat a possible dict with "cal_chans", "cal_corrs" keys to more standard one + # with "ch_names", "locs", "imb_cals" keys + if "cal_chans" in fine_cal: + # Someday we might need to refactor this for other systems, but we should do + # that when we start building in fine cal to them during acq (probably never) + _GRAD_TYPES = tuple( + getattr(FIFF, f"FIFFV_COIL_VV_PLANAR_{t}") + for t in ("T1", "T2", "T3", "T4", "W") + ) + ch_names = [f"MEG{ch_num:04d}" for ch_num in fine_cal["cal_chans"][:, 0]] + locs = fine_cal["cal_corrs"][:, -12:].astype(float) + coil_types = fine_cal["cal_chans"][:, 1] + is_grad = np.isin(coil_types, _GRAD_TYPES) + starts = np.where(is_grad, 1, 0) + stops = np.where(is_grad, fine_cal["cal_corrs"].shape[1] - 12, 1) + imb_cals = list( + fine_cal["cal_corrs"][ii, start:stop].astype(float) + for ii, (start, stop) in enumerate(zip(starts, stops)) + ) + return dict(ch_names=ch_names, locs=locs, imb_cals=imb_cals) + else: + return deepcopy(fine_cal) + + def _prep_fine_cal(info, fine_cal, *, ignore_ref): from ._fine_cal import read_fine_calibration @@ -2183,6 +2228,7 @@ def _prep_fine_cal(info, fine_cal, *, ignore_ref): fine_cal = read_fine_calibration(fine_cal) else: extra = "dict" + fine_cal = _reformat_fine_cal_dict(fine_cal) logger.info(f" Using fine calibration {extra}") ch_names = _clean_names(info["ch_names"], remove_whitespace=True) info_to_cal = dict() @@ -2230,12 +2276,12 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref): ) # Replace sensor locations (and track differences) for fine calibration - ang_shift = list() used = np.zeros(len(info["chs"]), bool) cal_corrs = list() cal_chans = list() adjust_logged = False - for oi, ci in info_to_cal.items(): + ang_shift = np.zeros(len(info_to_cal)) + for ii, (oi, ci) in enumerate(info_to_cal.items()): assert not used[oi] used[oi] = True info_ch = info["chs"][oi] @@ -2250,30 +2296,25 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref): # EX and EY are orthogonal to EZ. If not, we find the rotation between # the original and fine-cal ez, and rotate EX and EY accordingly: ch_coil_rot = _loc_to_coil_trans(info_ch["loc"])[:3, :3] + _normalize_vectors(ch_coil_rot.T) # column-wise cal_loc = fine_cal["locs"][ci].copy() cal_coil_rot = _loc_to_coil_trans(cal_loc)[:3, :3] - if ( - np.max( - [ - np.abs(np.dot(cal_coil_rot[:, ii], cal_coil_rot[:, 2])) - for ii in range(2) - ] - ) - > 1e-6 - ): # X or Y not orthogonal + _normalize_vectors(cal_coil_rot.T) + # X or Y not orthogonal to Z: + if np.max(np.abs(cal_coil_rot[:, 2] @ cal_coil_rot[:, :2])) > 1e-5: if not adjust_logged: logger.info(" Adjusting non-orthogonal EX and EY") adjust_logged = True # find the rotation matrix that goes from one to the other - this_trans = _find_vector_rotation(ch_coil_rot[:, 2], cal_coil_rot[:, 2]) - cal_loc[3:] = np.dot(this_trans, ch_coil_rot).T.ravel() + R = _find_vector_rotation(ch_coil_rot[:, 2], cal_coil_rot[:, 2]) + cal_coil_rot[:] = R @ ch_coil_rot + _normalize_vectors(cal_coil_rot.T) + cal_loc[3:9] = cal_coil_rot.T.ravel()[:6] # set just X and Y in output # calculate shift angle - v1 = _loc_to_coil_trans(cal_loc)[:3, :3] - _normalize_vectors(v1) - v2 = _loc_to_coil_trans(info_ch["loc"])[:3, :3] - _normalize_vectors(v2) - ang_shift.append(np.sum(v1 * v2, axis=0)) + v2 = _loc_to_coil_trans(info_ch["loc"])[:3, 2:] + _normalize_vectors(v2.T) + ang_shift[ii] = cal_coil_rot[:3, 2] @ v2[:, 0] if oi in grad_picks: extra = [1.0, fine_cal["imb_cals"][ci][0]] else: @@ -2281,19 +2322,21 @@ def _update_sensor_geometry(info, fine_cal, ignore_ref): cal_corrs.append(np.concatenate([extra, cal_loc])) # Adjust channel normal orientations with those from fine calibration # Channel positions are not changed - info_ch["loc"][3:] = cal_loc[3:] + info_ch["loc"][3:] = cal_coil_rot.T.ravel() assert info_ch["coord_frame"] == FIFF.FIFFV_COORD_DEVICE meg_picks = pick_types(info, meg=True, exclude=(), ref_meg=not ignore_ref) assert used[meg_picks].all() assert not used[np.setdiff1d(np.arange(len(used)), meg_picks)].any() # This gets written to the Info struct - sss_cal = dict(cal_corrs=np.array(cal_corrs), cal_chans=np.array(cal_chans)) + sss_cal = dict( + cal_corrs=np.array(cal_corrs, float), + cal_chans=np.array(cal_chans, int), + ) # Log quantification of sensor changes # Deal with numerical precision giving absolute vals slightly more than 1. - ang_shift = np.array(ang_shift) - np.clip(ang_shift, -1.0, 1.0, ang_shift) - np.rad2deg(np.arccos(ang_shift), ang_shift) # Convert to degrees + np.clip(ang_shift, -1.0, 1.0, out=ang_shift) + np.rad2deg(np.arccos(ang_shift), out=ang_shift) # Convert to degrees logger.info( " Adjusted coil orientations by (μ ± σ): " f"{np.mean(ang_shift):0.1f}° ± {np.std(ang_shift):0.1f}° " @@ -2899,8 +2942,13 @@ def find_bad_channels_maxwell( def _read_cross_talk(cross_talk, ch_names): sss_ctc = dict() ctc = None - if cross_talk is not None: - sss_ctc = _read_ctc(cross_talk) + if cross_talk: + if not isinstance(cross_talk, dict): + sss_ctc = _read_ctc(cross_talk) + else: + sss_ctc = deepcopy(cross_talk) + # the way it is on disk + sss_ctc["decoupler"] = sss_ctc["decoupler"].T.tocsc() ctc_chs = sss_ctc["ch_names"] # checking for extra space ambiguity in channel names # between old and new fif files diff --git a/mne/preprocessing/tests/test_maxwell.py b/mne/preprocessing/tests/test_maxwell.py index 3cfd5e547c7..d5988825436 100644 --- a/mne/preprocessing/tests/test_maxwell.py +++ b/mne/preprocessing/tests/test_maxwell.py @@ -2,7 +2,6 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. -import pathlib import re from contextlib import contextmanager from functools import partial @@ -35,6 +34,7 @@ compute_maxwell_basis, find_bad_channels_maxwell, maxwell_filter_prepare_emptyroom, + read_fine_calibration, ) from mne.preprocessing import ( maxwell_filter as _maxwell_filter_ola, @@ -588,7 +588,12 @@ def test_basic(): assert len(raw.info["projs"]) == 12 # 11 MEG projs + 1 AVG EEG with use_coil_def(elekta_def_fname): raw_sss = maxwell_filter( - raw, origin=mf_head_origin, regularize=None, bad_condition="ignore" + raw, + origin=mf_head_origin, + regularize=None, + bad_condition="ignore", + calibration=False, # just for completeness + cross_talk=False, ) assert len(raw_sss.info["projs"]) == 1 # avg EEG assert raw_sss.info["projs"][0]["desc"] == "Average EEG reference" @@ -848,28 +853,49 @@ def test_fine_calibration(): sss_fine_cal = read_crop(sss_fine_cal_fname) # Test 1D SSS fine calibration - with use_coil_def(elekta_def_fname): - with catch_logging() as log: - raw_sss = maxwell_filter( - raw, - calibration=fine_cal_fname, - origin=mf_head_origin, - regularize=None, - bad_condition="ignore", - verbose=True, - ) + kwargs = dict( + origin=mf_head_origin, + regularize=None, + bad_condition="ignore", + verbose=True, + ) + with use_coil_def(elekta_def_fname), catch_logging() as log: + raw_sss = maxwell_filter(raw, calibration=fine_cal_fname, **kwargs) log = log.getvalue() assert "Using fine calibration" in log + adj_msg = "Adjusting non-orthogonal EX and EY" + assert adj_msg in log + assert "(max: 1.4" in log assert fine_cal_fname.stem in log assert_meg_snr(raw_sss, sss_fine_cal, 1.3, 180) # similar to MaxFilter py_cal = raw_sss.info["proc_history"][0]["max_info"]["sss_cal"] - assert py_cal is not None - assert len(py_cal) > 0 + assert isinstance(py_cal, dict) and len(py_cal) mf_cal = sss_fine_cal.info["proc_history"][0]["max_info"]["sss_cal"] + assert isinstance(mf_cal, dict) and len(mf_cal) # we identify these differently mf_cal["cal_chans"][mf_cal["cal_chans"][:, 1] == 3022, 1] = 3024 + fc = read_fine_calibration(fine_cal_fname) + assert_allclose(fc["locs"][:, :3], mf_cal["cal_corrs"][:, -12:-9]) # pos + assert_allclose(fc["locs"][:, :3], py_cal["cal_corrs"][:, -12:-9]) + assert_allclose(mf_cal["cal_corrs"][:, -3:], fc["locs"][:, -3:]) # ez + assert_allclose(py_cal["cal_corrs"][:, -3:], fc["locs"][:, -3:]) + assert_allclose( # ex, ey get updated by rotation, which doesn't exactly match + py_cal["cal_corrs"][:, -9:-3], + mf_cal["cal_corrs"][:, -9:-3], + atol=1e-3, + ) assert_allclose(py_cal["cal_chans"], mf_cal["cal_chans"]) - assert_allclose(py_cal["cal_corrs"], mf_cal["cal_corrs"], rtol=1e-3, atol=1e-3) + with pytest.raises(RuntimeError, match=r"info\['fine_calibration'\] is None"): + maxwell_filter(raw, calibration=True, **kwargs) + with raw.info._unlock(): + raw.info["fine_calibration"] = py_cal + with use_coil_def(elekta_def_fname), catch_logging() as log: + raw_sss_builtin = maxwell_filter(raw, **kwargs) + log = log.getvalue() + assert adj_msg not in log + assert_allclose(raw_sss_builtin[:][0], raw_sss[:][0]) + with raw.info._unlock(): + del raw.info["fine_calibration"] # with missing channels raw_missing = raw.copy().load_data() raw_missing.info["bads"] = ["MEG0111", "MEG0943"] # 1 mag, 1 grad @@ -958,17 +984,28 @@ def test_cross_talk(tmp_path): raw = read_crop(raw_fname, (0.0, 1.0)) raw.info["bads"] = bads sss_ctc = read_crop(sss_ctc_fname) + kwargs = dict( + origin=mf_head_origin, + regularize=None, + bad_condition="ignore", + ) with use_coil_def(elekta_def_fname): - raw_sss = maxwell_filter( - raw, - cross_talk=pathlib.Path(ctc_fname), - origin=mf_head_origin, - regularize=None, - bad_condition="ignore", - ) + raw_sss = maxwell_filter(raw, cross_talk=ctc_fname, **kwargs) assert_meg_snr(raw_sss, sss_ctc, 275.0) py_ctc = raw_sss.info["proc_history"][0]["max_info"]["sss_ctc"] assert len(py_ctc) > 0 + + # using built-in cross-talk + with pytest.raises(RuntimeError, match=r"info\['cross_talk'\] is None"): + maxwell_filter(raw, cross_talk=True, **kwargs) + with raw.info._unlock(): + raw.info["cross_talk"] = py_ctc + with use_coil_def(elekta_def_fname): + raw_sss_builtin = maxwell_filter(raw, **kwargs) + assert len(raw_sss_builtin.info["proc_history"][0]["max_info"]["sss_ctc"]) > 0 + assert_allclose(raw_sss[:][0], raw_sss_builtin[:][0]) + del raw_sss_builtin, raw_sss + with pytest.raises(TypeError, match="path-like"): maxwell_filter(raw, cross_talk=raw) with pytest.raises(ValueError, match="Invalid cross-talk FIF"): @@ -1236,7 +1273,7 @@ def test_shielding_factor(tmp_path): coord_frame="meg", regularize=None, origin=mf_meg_origin, - calibration=pathlib.Path(fine_cal_fname), + calibration=fine_cal_fname, ) _assert_shielding(raw_sss, erm_power, 12, 13) # 2.0) @@ -1479,7 +1516,7 @@ def test_all(): coord_frames = ("head", "head", "meg", "head") ctcs = (ctc_fname, ctc_fname, ctc_fname, ctc_mgh_fname) mins = (3.5, 3.5, 1.2, 0.9) - meds = (10.8, 10.2, 3.2, 5.9) + meds = (10.7, 10.1, 3.2, 5.9) st_durs = (1.0, 1.0, 1.0, None) destinations = (None, sample_fname, None, None) origins = (mf_head_origin, mf_head_origin, mf_meg_origin, mf_head_origin) @@ -1496,7 +1533,7 @@ def test_all(): origin=origins[ii], ) sss_mf = read_crop(sss_fnames[ii]) - assert_meg_snr(sss_py, sss_mf, mins[ii], meds[ii], msg=rf) + assert_meg_snr(sss_py, sss_mf, mins[ii], meds[ii], msg=f"{ii=} {rf=!s}") @pytest.mark.slowtest diff --git a/mne/transforms.py b/mne/transforms.py index 76289504785..73241df34f2 100644 --- a/mne/transforms.py +++ b/mne/transforms.py @@ -1464,6 +1464,8 @@ def _find_vector_rotation(a, b): # Rodrigues' rotation formula: # https://en.wikipedia.org/wiki/Rodrigues%27_rotation_formula # http://math.stackexchange.com/a/476311 + assert np.isclose(np.linalg.norm(a), 1.0), np.linalg.norm(a) + assert np.isclose(np.linalg.norm(b), 1.0), np.linalg.norm(b) R = np.eye(3) v = np.cross(a, b) if np.allclose(v, 0.0): # identical @@ -1472,6 +1474,7 @@ def _find_vector_rotation(a, b): c = np.dot(a, b) # cosine of the angle between them vx = _skew_symmetric_cross(v) R += vx + np.dot(vx, vx) * (1 - c) / s + # Now we have: np.allclose(R @ a, b) return R diff --git a/mne/utils/docs.py b/mne/utils/docs.py index ee3d9e67d28..1a86747427f 100644 --- a/mne/utils/docs.py +++ b/mne/utils/docs.py @@ -644,10 +644,16 @@ def _reflow_param_docstring(docstring, has_first_line=True, width=75): # C docdict["calibration_maxwell_cal"] = """ -calibration : str | None - Path to the ``'.dat'`` file with fine calibration coefficients. - File can have 1D or 3D gradiometer imbalance correction. - This file is machine/site-specific. +calibration : path-like | bool | None + Path to the .dat file with fine calibration information. + If None, will use the ``info["fine_calibration"]`` entry if present. + If True, this entry must be present in the info and will be used. + If ``False``, no calibration will be applied. + + .. versionchanged:: 1.13 + Support for ``bool`` to explicitly control calibration using + ``info["fine_calibration"]``, and ``None`` now uses ``info["fine_calibration"]`` + if available. """ docdict["cbar_fmt_topomap"] = """\ @@ -1048,8 +1054,16 @@ def _reflow_param_docstring(docstring, has_first_line=True, width=75): """ docdict["cross_talk_maxwell"] = """ -cross_talk : str | None +cross_talk : path-like | bool | None Path to the FIF file with cross-talk correction information. + If None, will use the ``info["cross_talk"]`` entry if present. + If True, this entry must be present in the info and will be used. + If ``False``, no cross-talk correction will be applied. + + .. versionchanged:: 1.13 + Support for ``bool`` to explicitly control cross-talk correction using + ``info["cross_talk"]``, and ``None`` now uses ``info["cross_talk"]`` + if available. """ # %%