Skip to content

Commit b073185

Browse files
Merge pull request #17 from VH-Lab/claude/pip-installable-intan-rhd-9sc0o
Add Intan RHD2000 file header reader
2 parents cf3d5cd + 0595819 commit b073185

7 files changed

Lines changed: 542 additions & 0 deletions

File tree

.github/workflows/ci.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
name: CI
2+
3+
on:
4+
push:
5+
branches: [main, master]
6+
pull_request:
7+
branches: [main, master]
8+
9+
jobs:
10+
lint:
11+
runs-on: ubuntu-latest
12+
steps:
13+
- uses: actions/checkout@v4
14+
- uses: actions/setup-python@v5
15+
with:
16+
python-version: "3.12"
17+
- run: pip install ruff
18+
- run: ruff check vlt/ tests/
19+
20+
test:
21+
runs-on: ubuntu-latest
22+
strategy:
23+
matrix:
24+
python-version: ["3.10", "3.11", "3.12"]
25+
steps:
26+
- uses: actions/checkout@v4
27+
- uses: actions/setup-python@v5
28+
with:
29+
python-version: ${{ matrix.python-version }}
30+
- run: pip install -e ".[test]"
31+
- run: python -m pytest tests/ -v

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ __pycache__/
44
venv/
55
.pytest_cache/
66
test_logs/
7+
*.egg-info/

pyproject.toml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
[build-system]
2+
requires = ["setuptools>=64"]
3+
build-backend = "setuptools.build_meta"
4+
5+
[project]
6+
name = "vhlab-toolbox-python"
7+
version = "0.1.0"
8+
description = "Python port of vhlab-toolbox-matlab"
9+
requires-python = ">=3.9"
10+
dependencies = [
11+
"numpy",
12+
"pandas",
13+
"scipy",
14+
"h5py",
15+
"matplotlib",
16+
]
17+
18+
[project.optional-dependencies]
19+
test = ["pytest"]
20+
21+
[project.urls]
22+
Repository = "https://github.com/VH-Lab/vhlab-toolbox-python"
23+
24+
[tool.pytest.ini_options]
25+
addopts = "--import-mode=importlib"
26+
27+
[tool.ruff.lint]
28+
select = ["E", "F", "W"]
29+
ignore = [
30+
"E501", # line too long — existing code
31+
"E701", # multiple statements on one line — existing style
32+
"E741", # ambiguous variable name — existing style
33+
"F401", # unused imports — many in existing code
34+
"F841", # local variable assigned but never used — existing code
35+
"E722", # bare except — existing code
36+
"E402", # module-level import not at top — existing code
37+
"W293", # blank line contains whitespace — existing code
38+
"W605", # invalid escape sequence — existing code
39+
]
40+
41+
[tool.setuptools.packages.find]
42+
include = ["vlt*"]

tests/vlt/hardware/__init__.py

Whitespace-only changes.

tests/vlt/hardware/test_intan.py

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
"""Tests for vlt.hardware.intan module."""
2+
3+
import os
4+
import struct
5+
import tempfile
6+
7+
import pytest
8+
9+
from vlt.hardware.intan import read_Intan_RHD2000_header
10+
11+
12+
def _write_qstring(fid, s):
13+
"""Write a Qt-style QString to a binary file."""
14+
if not s:
15+
fid.write(struct.pack('<I', 0xFFFFFFFF))
16+
else:
17+
encoded = s.encode('utf-16-le')
18+
fid.write(struct.pack('<I', len(encoded)))
19+
fid.write(encoded)
20+
21+
22+
def _write_channel(fid, native_name, custom_name, native_order, custom_order,
23+
signal_type, channel_enabled, chip_channel, board_stream):
24+
"""Write a single channel definition to the binary file."""
25+
_write_qstring(fid, native_name)
26+
_write_qstring(fid, custom_name)
27+
fid.write(struct.pack('<hhhhhh', native_order, custom_order,
28+
signal_type, channel_enabled, chip_channel, board_stream))
29+
# Trigger settings
30+
fid.write(struct.pack('<hhhh', 0, 0, 0, 0))
31+
# Impedance
32+
fid.write(struct.pack('<ff', 0.0, 0.0))
33+
34+
35+
def create_synthetic_rhd(filepath, sample_rate=20000.0, num_amp_channels=2,
36+
num_aux_channels=3, num_supply_channels=1,
37+
num_adc_channels=0, num_dig_in_channels=0,
38+
num_dig_out_channels=0, num_data_blocks=1000,
39+
version_major=1, version_minor=3):
40+
"""Create a synthetic .rhd file for testing.
41+
42+
With version 1.x, each data block contains 60 samples.
43+
"""
44+
num_samples_per_block = 60 if version_major <= 1 else 128
45+
46+
with open(filepath, 'wb') as fid:
47+
# Magic number
48+
fid.write(struct.pack('<I', 0xC6912702))
49+
# Version
50+
fid.write(struct.pack('<hh', version_major, version_minor))
51+
# Sample rate
52+
fid.write(struct.pack('<f', sample_rate))
53+
# Frequency parameters: dsp_enabled(h) + 6 floats
54+
fid.write(struct.pack('<hffffff', 1, 1.0, 1.0, 7500.0, 1.0, 1.0, 7500.0))
55+
# Notch filter mode
56+
fid.write(struct.pack('<h', 2)) # 60 Hz
57+
# Impedance test frequencies
58+
fid.write(struct.pack('<ff', 1000.0, 1000.0))
59+
# Notes
60+
_write_qstring(fid, '')
61+
_write_qstring(fid, '')
62+
_write_qstring(fid, '')
63+
# Num temp sensor channels (version >= 1.1)
64+
num_temp_channels = 1
65+
fid.write(struct.pack('<h', num_temp_channels))
66+
# Eval board mode (version >= 1.3)
67+
fid.write(struct.pack('<h', 0))
68+
# Reference channel (version >= 2.0)
69+
if version_major > 1:
70+
_write_qstring(fid, '')
71+
72+
# Signal groups: put all channels in one group
73+
total_channels = (num_amp_channels + num_aux_channels +
74+
num_supply_channels + num_adc_channels +
75+
num_dig_in_channels + num_dig_out_channels)
76+
fid.write(struct.pack('<h', 1)) # 1 signal group
77+
78+
_write_qstring(fid, 'Port A')
79+
_write_qstring(fid, 'A')
80+
fid.write(struct.pack('<hhh', 1, total_channels, 0))
81+
82+
order = 0
83+
for i in range(num_amp_channels):
84+
_write_channel(fid, f'A-{i:03d}', f'A-{i:03d}', order, order, 0, 1, i, 0)
85+
order += 1
86+
for i in range(num_aux_channels):
87+
_write_channel(fid, f'AUX-{i}', f'AUX-{i}', order, order, 1, 1, i, 0)
88+
order += 1
89+
for i in range(num_supply_channels):
90+
_write_channel(fid, f'VDD-{i}', f'VDD-{i}', order, order, 2, 1, i, 0)
91+
order += 1
92+
for i in range(num_adc_channels):
93+
_write_channel(fid, f'ADC-{i}', f'ADC-{i}', order, order, 3, 1, i, 0)
94+
order += 1
95+
for i in range(num_dig_in_channels):
96+
_write_channel(fid, f'DIN-{i}', f'DIN-{i}', order, order, 4, 1, i, 0)
97+
order += 1
98+
for i in range(num_dig_out_channels):
99+
_write_channel(fid, f'DOUT-{i}', f'DOUT-{i}', order, order, 5, 1, i, 0)
100+
order += 1
101+
102+
header_size = fid.tell()
103+
104+
# Compute bytes per data block and write dummy data
105+
bytes_per_block = 0
106+
bytes_per_block += num_samples_per_block * 4 # timestamps
107+
bytes_per_block += num_samples_per_block * num_amp_channels * 2 # amplifier
108+
bytes_per_block += (num_samples_per_block // 4) * num_aux_channels * 2 # aux
109+
bytes_per_block += 1 * num_supply_channels * 2 # supply voltage
110+
bytes_per_block += 1 * num_temp_channels * 2 # temp sensors
111+
bytes_per_block += num_samples_per_block * num_adc_channels * 2 # ADC
112+
if num_dig_in_channels > 0:
113+
bytes_per_block += num_samples_per_block * 2 # digital in
114+
if num_dig_out_channels > 0:
115+
bytes_per_block += num_samples_per_block * 2 # digital out
116+
117+
# Write dummy data blocks
118+
fid.write(b'\x00' * (bytes_per_block * num_data_blocks))
119+
120+
return header_size
121+
122+
123+
class TestReadIntanRHD2000Header:
124+
"""Tests for read_Intan_RHD2000_header."""
125+
126+
def test_basic_header_reading(self, tmp_path):
127+
"""Test reading a synthetic RHD file with known parameters."""
128+
filepath = str(tmp_path / 'test.rhd')
129+
create_synthetic_rhd(filepath, sample_rate=20000.0,
130+
num_amp_channels=2, num_data_blocks=1000)
131+
132+
header = read_Intan_RHD2000_header(filepath)
133+
134+
assert header['sample_rate'] == 20000.0
135+
assert header['num_amplifier_channels'] == 2
136+
assert header['num_aux_input_channels'] == 3
137+
assert header['num_supply_voltage_channels'] == 1
138+
# 1000 blocks * 60 samples/block = 60000 samples
139+
assert header['num_samples'] == 60000
140+
141+
def test_sample_rate_and_num_samples(self, tmp_path):
142+
"""Test that sample_rate and num_samples are correct for t0_t1 computation.
143+
144+
With 1000 data blocks at 60 samples/block = 60000 samples,
145+
t0 = 0, t1 = (60000 - 1) / 20000 = 2.99995
146+
"""
147+
filepath = str(tmp_path / 'test.rhd')
148+
create_synthetic_rhd(filepath, sample_rate=20000.0, num_data_blocks=1000)
149+
150+
header = read_Intan_RHD2000_header(filepath)
151+
152+
t0 = 0
153+
t1 = (header['num_samples'] - 1) / header['sample_rate']
154+
assert t0 == 0
155+
assert abs(t1 - 2.99995) < 1e-10
156+
157+
def test_channel_info_populated(self, tmp_path):
158+
"""Test that channel info dicts contain expected fields."""
159+
filepath = str(tmp_path / 'test.rhd')
160+
create_synthetic_rhd(filepath, num_amp_channels=4)
161+
162+
header = read_Intan_RHD2000_header(filepath)
163+
164+
assert len(header['amplifier_channels']) == 4
165+
ch = header['amplifier_channels'][0]
166+
assert 'native_channel_name' in ch
167+
assert 'custom_channel_name' in ch
168+
assert 'chip_channel' in ch
169+
assert ch['signal_type'] == 0
170+
171+
def test_frequency_parameters(self, tmp_path):
172+
"""Test that frequency parameters are read correctly."""
173+
filepath = str(tmp_path / 'test.rhd')
174+
create_synthetic_rhd(filepath, sample_rate=20000.0)
175+
176+
header = read_Intan_RHD2000_header(filepath)
177+
178+
freq = header['frequency_parameters']
179+
assert freq['amplifier_sample_rate'] == 20000.0
180+
assert freq['aux_input_sample_rate'] == 5000.0
181+
assert freq['notch_filter_frequency'] == 60
182+
assert freq['dsp_enabled'] == 1
183+
184+
def test_invalid_magic_number(self, tmp_path):
185+
"""Test that an invalid magic number raises ValueError."""
186+
filepath = str(tmp_path / 'bad.rhd')
187+
with open(filepath, 'wb') as f:
188+
f.write(struct.pack('<I', 0xDEADBEEF))
189+
f.write(b'\x00' * 100)
190+
191+
with pytest.raises(ValueError, match='Not a valid Intan RHD file'):
192+
read_Intan_RHD2000_header(filepath)
193+
194+
def test_version_2(self, tmp_path):
195+
"""Test reading a version 2.x file (128 samples per block)."""
196+
filepath = str(tmp_path / 'test_v2.rhd')
197+
create_synthetic_rhd(filepath, sample_rate=30000.0,
198+
num_amp_channels=1, num_aux_channels=1,
199+
num_supply_channels=1, num_data_blocks=100,
200+
version_major=2, version_minor=0)
201+
202+
header = read_Intan_RHD2000_header(filepath)
203+
204+
assert header['sample_rate'] == 30000.0
205+
assert header['num_amplifier_channels'] == 1
206+
# 100 blocks * 128 samples/block = 12800 samples
207+
assert header['num_samples'] == 12800
208+
assert header['num_samples_per_data_block'] == 128
209+
210+
def test_no_data_blocks(self, tmp_path):
211+
"""Test reading a file with header only (no data)."""
212+
filepath = str(tmp_path / 'header_only.rhd')
213+
create_synthetic_rhd(filepath, num_data_blocks=0)
214+
215+
header = read_Intan_RHD2000_header(filepath)
216+
217+
assert header['num_samples'] == 0
218+
assert header['sample_rate'] == 20000.0

vlt/hardware/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)