diff --git a/neo/rawio/intanrawio.py b/neo/rawio/intanrawio.py index be665cb51..b5192e520 100644 --- a/neo/rawio/intanrawio.py +++ b/neo/rawio/intanrawio.py @@ -566,6 +566,7 @@ def _demultiplex_digital_data(self, raw_digital_data, channel_ids, i_start, i_st set high and the rest low, the 16-bit word would be 2^0 + 2^4 + 2^5 = 1 + 16 + 32 = 49. The native_order property for each channel corresponds to its bit position in the packed word. + """ dtype = np.uint16 # We fix this to match the memmap dtype output = np.zeros((i_stop - i_start, len(channel_ids)), dtype=dtype) @@ -952,6 +953,7 @@ def read_rhs(filename, file_format: str): chan_info_dc = dict(chan_info) name = chan_info["native_channel_name"] chan_info_dc["native_channel_name"] = name + "_DC" + chan_info_dc["custom_channel_name"] = chan_info_dc["custom_channel_name"] + "_DC" chan_info_dc["sampling_rate"] = sr chan_info_dc["units"] = "mV" chan_info_dc["gain"] = 19.23 @@ -971,7 +973,6 @@ def read_rhs(filename, file_format: str): # Add stim channels for chan_info in stream_name_to_channel_info_list["RHS2000 amplifier channel"]: # stim channel presence is not indicated in the header so for some formats each amplifier channel has a stim channel, but for other formats this isn't the case. - if file_format == "one-file-per-channel": # Some amplifier channels don't have a corresponding stim channel, # so we need to make sure we don't add channel info for stim channels that don't exist. @@ -985,6 +986,7 @@ def read_rhs(filename, file_format: str): chan_info_stim = dict(chan_info) name = chan_info["native_channel_name"] chan_info_stim["native_channel_name"] = name + "_STIM" + chan_info_stim["custom_channel_name"] = chan_info_stim["custom_channel_name"] + "_STIM" chan_info_stim["sampling_rate"] = sr chan_info_stim["units"] = "A" # Amps chan_info_stim["gain"] = global_info["stim_step_size"] diff --git a/neo/test/rawiotest/test_intanrawio.py b/neo/test/rawiotest/test_intanrawio.py index 1ccf7b911..dc04a7ad3 100644 --- a/neo/test/rawiotest/test_intanrawio.py +++ b/neo/test/rawiotest/test_intanrawio.py @@ -23,7 +23,8 @@ class TestIntanRawIO( "intan/rhd_fpc_multistim_240514_082044/info.rhd", # Multiple digital channels one-file-per-channel rhd "intan/rhs_stim_data_single_file_format/intanTestFile.rhs", # header-attached rhs data with stimulus current "intan/test_fcs_dc_250327_154333/info.rhs", # this is an example of only having dc amp rather than amp files - #"intan/test_fpc_stim_250327_151617/info.rhs", # wrong files Heberto will fix + "intan/test_fpc_stim_250327_151617/info.rhs", # wrong files names Heberto will fix naimgin in the future + ] def test_annotations(self): @@ -70,7 +71,7 @@ def test_annotations(self): ) np.testing.assert_array_equal(signal_array_annotations["board_stream_num"][:10], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]) - def test_correct_reading_one_file_per_channel(self): + def test_correct_reading_one_file_per_channel_amplifiers(self): "Issue: https://github.com/NeuralEnsemble/python-neo/issues/1599" # Test reading of one-file-per-channel format file. The channels should match the raw files file_path = Path(self.get_local_path("intan/intan_fpc_test_231117_052630/info.rhd")) @@ -84,11 +85,84 @@ def test_correct_reading_one_file_per_channel(self): amplifier_file_paths = [path for path in folder_path.iterdir() if "amp" in path.name] channel_names = [path.name[4:-4] for path in amplifier_file_paths] + amplifier_stream_index = 0 for channel_name, amplifier_file_path in zip(channel_names, amplifier_file_paths): data_raw = np.fromfile(amplifier_file_path, dtype=np.int16).squeeze() - data_from_neo = intan_reader.get_analogsignal_chunk(channel_ids=[channel_name], stream_index=0).squeeze() + data_from_neo = intan_reader.get_analogsignal_chunk(channel_ids=[channel_name], stream_index=amplifier_stream_index).squeeze() np.testing.assert_allclose(data_raw, data_from_neo) + def test_correct_reading_one_file_per_channel_rhs_stim(self): + "Zach request for testing that the channel order is correct" + # Test reading of one-file-per-channel format file. The channels should match the raw files + file_path = Path(self.get_local_path("intan/test_fpc_stim_250327_151617/info.rhs")) + intan_reader = IntanRawIO(filename=file_path) + intan_reader.parse_header() + + # This should be the folder where the files of all the channels are stored + folder_path = file_path.parent + + # The paths for the stim channels are stim-A-000.dat, stim-A-001.dat, stim-A-002.dat, + # Whereas the ids are A-001_STIM, A-002_STIM, A-003_STIM, etc + stim_file_paths = [path for path in folder_path.iterdir() if "stim" in path.name] + channel_ids = [f"{p.stem[5:]}_STIM" for p in stim_file_paths] + + stim_stream_index = 2 + for channel_id, amplifier_file_path in zip(channel_ids, stim_file_paths): + data_raw = np.fromfile(amplifier_file_path, dtype=np.uint16) + decoded_data = intan_reader._decode_current_from_stim_data(data_raw, 0, data_raw.shape[0]) + data_from_neo = intan_reader.get_analogsignal_chunk(channel_ids=[channel_id], stream_index=stim_stream_index).squeeze() + np.testing.assert_allclose(decoded_data, data_from_neo) + + + def test_correct_decoding_of_stimulus_current(self): + # See https://github.com/NeuralEnsemble/python-neo/pull/1660 for discussion + # See https://gin.g-node.org/NeuralEnsemble/ephy_testing_data/src/master/intan/README.md#rhs_stim_data_single_file_format + # For a description of the data + + file_path = Path(self.get_local_path("intan/rhs_stim_data_single_file_format/intanTestFile.rhs")) + intan_reader = IntanRawIO(filename=file_path) + intan_reader.parse_header() + + signal_streams = intan_reader.header['signal_streams'] + stream_ids = signal_streams['id'].tolist() + stream_index = stream_ids.index('11') + sampling_rate = intan_reader.get_signal_sampling_rate(stream_index=stream_index) + sig_chunk = intan_reader.get_analogsignal_chunk(stream_index=stream_index, channel_ids=["D-016_STIM"]) + final_stim = intan_reader.rescale_signal_raw_to_float(sig_chunk, stream_index=stream_index, channel_ids=["D-016_STIM"]) + + # This contains only the first pulse and I got this by visual inspection + data_to_test = final_stim[200:250] + + positive_pulse_size = np.max(data_to_test).item() + negative_pulse_size = np.min(data_to_test).item() + + expected_value = 60 * 1e-6# 60 microamperes + + # Assert is close float + assert np.isclose(positive_pulse_size, expected_value) + assert np.isclose(negative_pulse_size, -expected_value) + + # Check that negative pulse is leading + argmin = np.argmin(data_to_test) + argmax = np.argmax(data_to_test) + assert argmin < argmax + + # Check that the negative pulse is 200 us long + negative_pulse_frames = np.where(data_to_test > 0)[0] + number_of_negative_frames = negative_pulse_frames.size + duration_of_negative_pulse = number_of_negative_frames / sampling_rate + + expected_duration = 200 * 1e-6 # 400 microseconds / 2 + assert np.isclose(duration_of_negative_pulse, expected_duration) + + # Check that the positive pulse is 200 us long + positive_pulse_frames = np.where(data_to_test > 0)[0] + number_of_positive_frames = positive_pulse_frames.size + duration_of_positive_pulse = number_of_positive_frames / sampling_rate + expected_duration = 200 * 1e-6 # 400 microseconds / 2 + + assert np.isclose(duration_of_positive_pulse, expected_duration) + def test_correct_decoding_of_stimulus_current(self): # See https://github.com/NeuralEnsemble/python-neo/pull/1660 for discussion