diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..46b1534 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,47 @@ +name: CI + +on: + push: + branches: [main, master] + pull_request: + branches: [main, master] + +jobs: + lint: + name: Lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: pip install black ruff + + - name: Check formatting with black + run: black --check src/ tests/ + + - name: Lint with ruff + run: ruff check src/ tests/ + + test: + name: Test (Python ${{ matrix.python-version }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + python-version: ["3.10", "3.11", "3.12"] + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install package with dev dependencies + run: pip install -e ".[dev]" + + - name: Run tests + run: pytest tests/ -v --tb=short diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..5a650d2 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,33 @@ +# Instructions for AI Agents + +## Overview +NDR-python is a faithful Python port of NDR-matlab (Neuroscience Data Reader). + +## Architecture +- **Lead-Follow:** MATLAB is the source of truth. Python mirrors it exactly. +- **Bridge Contract:** Each sub-package has an `ndr_matlab_python_bridge.yaml` + defining the function names, arguments, and return types. +- **Naming:** Preserve MATLAB names exactly. Use `readchannels_epochsamples`, + not `read_channels_epoch_samples`. + +## Key Classes +- `ndr.reader.base` — Abstract base class. All readers inherit from this. +- `ndr.reader` (wrapper) — High-level interface that delegates to a base reader. +- `ndr.reader.intan_rhd`, `ndr.reader.axon_abf`, etc. — Format-specific readers. + +## Workflow +1. Check the bridge YAML in the target package. +2. If the function is missing, add it based on the MATLAB source. +3. Record the MATLAB git hash in `matlab_last_sync_hash`. +4. Implement the Python code. +5. Run `black` and `ruff check --fix` before committing. +6. Run `pytest` to verify. + +## Testing +- Unit tests: `pytest tests/` +- Symmetry tests: `pytest tests/symmetry/` (excluded from default run) + +## Environment +- Python 3.10+ +- NumPy for all numerical data +- Pydantic for input validation (`@validate_call`) diff --git a/README.md b/README.md index 796cd26..8e3e2c1 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,67 @@ # NDR-python -Neuroscience Data Readers, Python version + +Neuroscience Data Reader - Python implementation. + +A faithful Python port of [NDR-matlab](https://github.com/VH-Lab/NDR-matlab), following a lead-follow architecture where MATLAB is the source of truth. + +## Overview + +NDR (Neuroscience Data Reader) is a lower-level data-reading library used by [NDI](https://github.com/VH-Lab/NDI-matlab). It provides: + +- An abstract base reader class (`ndr.reader.base`) +- A high-level reader wrapper (`ndr.reader`) +- Format-specific reader subclasses (Intan RHD, Axon ABF, CED SMR, etc.) +- Format handler packages with low-level file I/O +- Time, string, data, and file utilities + +## Installation + +```bash +pip install -e ".[dev]" +``` + +## Quick Start + +```python +from ndr.reader_wrapper import Reader + +# Create a reader for Intan RHD files +r = Reader("intan") + +# Read channels from an epoch +epochfiles = ["/path/to/data.rhd"] +channels = r.getchannelsepoch(epochfiles) +data, time = r.read(epochfiles, "ai1-3", t0=0, t1=10) +``` + +## Supported Formats + +| Format | Reader Class | Status | +|--------|-------------|--------| +| Intan RHD | `ndr.reader.intan_rhd.IntanRHD` | Implemented | +| Axon ABF | `ndr.reader.axon_abf.AxonABF` | Implemented (requires `pyabf`) | +| CED SMR | `ndr.reader.ced_smr.CedSMR` | Implemented (requires `neo`) | +| SpikeGadgets REC | `ndr.reader.spikegadgets_rec.SpikeGadgetsRec` | Stub | +| TDT SEV | `ndr.reader.tdt_sev.TdtSev` | Stub | +| Neo | `ndr.reader.neo.NeoReader` | Stub | +| White Matter | `ndr.reader.whitematter.WhiteMatter` | Stub | +| BJG | `ndr.reader.bjg.BJG` | Stub | +| Dabrowska | `ndr.reader.dabrowska.Dabrowska` | Stub | + +## Testing + +```bash +# Run unit tests +pytest tests/ -v + +# Run symmetry tests (cross-language verification) +pytest tests/symmetry/ -v +``` + +## Architecture + +See [AGENTS.md](AGENTS.md) and [docs/developer_notes/](docs/developer_notes/) for details on the lead-follow architecture, bridge YAML protocol, and porting guidelines. + +## License + +CC-BY-NC-SA-4.0 diff --git a/docs/developer_notes/PYTHON_PORTING_GUIDE.md b/docs/developer_notes/PYTHON_PORTING_GUIDE.md new file mode 100644 index 0000000..f9953e4 --- /dev/null +++ b/docs/developer_notes/PYTHON_PORTING_GUIDE.md @@ -0,0 +1,44 @@ +# NDR MATLAB to Python Porting Guide + +## 1. The Core Philosophy: Lead-Follow Architecture +The MATLAB codebase is the **Source of Truth**. The Python version is a "faithful mirror." +When a conflict arises between "Pythonic" style and MATLAB symmetry, **symmetry wins**. + +- **Lead-Follow:** MATLAB defines the logic, hierarchy, and naming. +- **The Contract:** Every package contains an `ndr_matlab_python_bridge.yaml`. + This file is the binding contract for function names, arguments, and return types. + +## 2. Naming & Discovery (The Mirror Rule) +Function and class names must match MATLAB exactly. + +- **Naming Source:** Refer to the local `ndr_matlab_python_bridge.yaml`. +- **Missing Entries:** If a function is not in the bridge file, refer to the MATLAB + source, add the entry to the bridge file, and notify the user. +- **Case Preservation:** Use `readchannels_epochsamples`, not `read_channels_epoch_samples`. +- **Directory Parity:** Python file paths must mirror MATLAB `+namespace` paths + (e.g., `+ndr/+reader` -> `src/ndr/reader/`). + +## 3. The Porting Workflow (The Bridge Protocol) +1. **Check the Bridge:** Open the `ndr_matlab_python_bridge.yaml` in the target package. +2. **Sync the Interface:** If the function is missing or outdated, update the YAML first. +3. **Record the Sync Hash:** Store the short git hash of the MATLAB `.m` file: + `git log -1 --format="%h" -- ` +4. **Implement:** Write Python code to satisfy the interface defined in the YAML. +5. **Log & Notify:** Record the sync date in the YAML's `decision_log`. + +## 4. Input Validation: Pydantic is Mandatory +Use `@pydantic.validate_call` on all public-facing API functions. + +- MATLAB `double`/`numeric` -> Python `float | int` +- MATLAB `char`/`string` -> Python `str` +- MATLAB `{member1, member2}` -> Python `Literal["member1", "member2"]` + +## 5. Multiple Returns (Outputs) +Return multiple values as a **tuple** in the exact order defined in the YAML. + +## 6. Code Style & Linting +- **Black:** The sole code formatter. Line length 100. +- **Ruff:** The primary linter. Run `ruff check --fix` before committing. + +## 7. Error Handling +If MATLAB throws an `error`, Python MUST raise a corresponding Exception. diff --git a/docs/developer_notes/ndr_matlab_python_bridge.yaml b/docs/developer_notes/ndr_matlab_python_bridge.yaml new file mode 100644 index 0000000..dfa3985 --- /dev/null +++ b/docs/developer_notes/ndr_matlab_python_bridge.yaml @@ -0,0 +1,40 @@ +# The NDR Bridge Protocol: YAML Specification +# +# Name: ndr_matlab_python_bridge.yaml +# Location: One file per sub-package directory +# (e.g., src/ndr/reader/ndr_matlab_python_bridge.yaml). +# Role: Primary Contract. Defines how MATLAB names and types map to Python. + +project_metadata: + bridge_version: "1.1" + naming_policy: "Strict MATLAB Mirror" + indexing_policy: "Semantic Parity (1-based for user concepts, 0-based for internal data)" + +# When porting a function: +# 1. Check: Does the function/class exist in the YAML? +# 2. Add/Update: If missing or changed, update the YAML first. +# 3. Record Hash: git log -1 --format="%h" -- +# 4. Notify: Tell the user what was added/changed. + +# --- Example: Class --- +# - name: base +# type: class +# matlab_path: "+ndr/+reader/base.m" +# python_path: "ndr/reader/base.py" +# matlab_last_sync_hash: "a4c9e07" +# methods: +# - name: readchannels_epochsamples +# input_arguments: +# - name: channeltype +# type_python: "str" +# - name: channel +# type_python: "int | list[int]" +# - name: epoch +# type_python: "str | int" +# - name: s0 +# type_python: "int" +# - name: s1 +# type_python: "int" +# output_arguments: +# - name: data +# type_python: "numpy.ndarray" diff --git a/docs/developer_notes/ndr_xlang_principles.md b/docs/developer_notes/ndr_xlang_principles.md new file mode 100644 index 0000000..5335b97 --- /dev/null +++ b/docs/developer_notes/ndr_xlang_principles.md @@ -0,0 +1,26 @@ +# NDR Cross-Language (MATLAB/Python) Principles + +- **Status:** Active +- **Scope:** Universal (Applies to all NDR implementations) + +## 1. Indexing & Counting (The Semantic Parity Rule) +- Python uses 0-indexing internally. +- User-facing concepts (Epochs, Channels) use 1-based numbering in both languages. +- Python code accepts `channel_number=1` from user, maps to `data[0]` internally. + +## 2. Data Containers +- Prefer NumPy over lists for numerical data. +- MATLAB `double` array -> `numpy.ndarray` in Python. + +## 3. Multiple Returns +- Python returns multiple values as a tuple in MATLAB signature order. + +## 4. Booleans +- MATLAB `1`/`0` (logical) -> Python `True`/`False`. + +## 5. Strings +- MATLAB `char` and `string` -> Python `str`. +- MATLAB cell array of strings -> Python `list[str]`. + +## 6. Error Philosophy +- No silent failures. If MATLAB errors, Python raises an exception. diff --git a/docs/developer_notes/symmetry_tests.md b/docs/developer_notes/symmetry_tests.md new file mode 100644 index 0000000..fbab568 --- /dev/null +++ b/docs/developer_notes/symmetry_tests.md @@ -0,0 +1,53 @@ +# Cross-Language Symmetry Test Framework + +**Status:** Active +**Scope:** NDR-python <-> NDR-matlab parity + +## Purpose +Symmetry tests verify that data read by one language implementation matches +the other. This ensures the Python and MATLAB NDR stacks remain interoperable. + +## Architecture + +| Phase | Python location | MATLAB location | +|-------|----------------|-----------------| +| **makeArtifacts** | `tests/symmetry/make_artifacts/` | `tests/+ndr/+symmetry/+makeArtifacts/` | +| **readArtifacts** | `tests/symmetry/read_artifacts/` | `tests/+ndr/+symmetry/+readArtifacts/` | + +### Artifact Directory Layout + +``` +/NDR/symmetryTest/ +├── pythonArtifacts/ +│ └── /// +│ ├── readData.json # Channel data, timestamps, etc. +│ └── metadata.json # Channel list, sample rates, epoch info +└── matlabArtifacts/ + └── /// + └── ... (same structure) +``` + +### Workflow + +1. **makeArtifacts** (Python or MATLAB) reads example data files and writes + JSON artifacts containing: channel lists, sample rates, epoch clocks, + t0/t1 boundaries, and actual data samples. + +2. **readArtifacts** (the other language) loads the same example data files, + reads the same channels/epochs, and compares against the stored artifacts. + +Each `readArtifacts` test is parameterized over `{matlabArtifacts, pythonArtifacts}`. + +## Running + +```bash +# Generate artifacts +pytest tests/symmetry/make_artifacts/ -v + +# Verify artifacts +pytest tests/symmetry/read_artifacts/ -v +``` + +## Writing a New Symmetry Test +See NDI-python's `docs/developer_notes/symmetry_tests.md` for the full template. +Adapt the pattern for NDR's reader-centric API. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..1939014 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,62 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "ndr" +version = "0.1.0" +description = "Neuroscience Data Reader - Python implementation" +readme = "README.md" +license = {text = "CC-BY-NC-SA-4.0"} +authors = [{name = "VH-Lab", email = "vhlab@brandeis.edu"}] +maintainers = [{name = "Waltham Data Science"}] +requires-python = ">=3.10" +dependencies = [ + "numpy>=1.20.0", + "pydantic>=2.0", + "vhlab-toolbox-python @ git+https://github.com/VH-Lab/vhlab-toolbox-python.git@main", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", + "pytest-cov>=4.0.0", + "black>=23.0.0", + "ruff>=0.1.0", + "mypy>=1.0.0", +] +formats = [ + "pyabf>=2.3.0", + "neo>=0.12.0", + "tdt>=0.5.0", +] + +[tool.hatch.metadata] +allow-direct-references = true + +[tool.hatch.build.targets.wheel] +packages = ["src/ndr"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +addopts = "-v --tb=short --ignore=tests/symmetry" +markers = [ + "slow: marks tests as slow", + "symmetry: marks cross-language symmetry tests", +] + +[tool.black] +line-length = 100 +target-version = ["py310", "py311", "py312"] + +[tool.ruff] +line-length = 100 +target-version = "py310" + +[tool.ruff.lint] +select = ["E", "W", "F", "I", "B", "C4", "UP"] +ignore = ["E501", "B905", "E402", "B017", "B028"] + +[tool.ruff.lint.per-file-ignores] +"__init__.py" = ["F401", "UP035"] diff --git a/src/ndr/__init__.py b/src/ndr/__init__.py new file mode 100644 index 0000000..ab0204a --- /dev/null +++ b/src/ndr/__init__.py @@ -0,0 +1,8 @@ +"""NDR - Neuroscience Data Reader. + +A Python port of NDR-matlab (https://github.com/VH-Lab/NDR-matlab). +""" + +from ndr.reader_wrapper import Reader as reader + +__version__ = "0.1.0" diff --git a/src/ndr/data/__init__.py b/src/ndr/data/__init__.py new file mode 100644 index 0000000..447671f --- /dev/null +++ b/src/ndr/data/__init__.py @@ -0,0 +1,7 @@ +"""NDR data utilities. + +Port of +ndr/+data/ +""" + +from ndr.data.colvec import colvec +from ndr.data.rowvec import rowvec diff --git a/src/ndr/data/colvec.py b/src/ndr/data/colvec.py new file mode 100644 index 0000000..89d7662 --- /dev/null +++ b/src/ndr/data/colvec.py @@ -0,0 +1,25 @@ +"""Column vector utility. + +Port of +ndr/+data/colvec.m +""" + +from __future__ import annotations + +import numpy as np + + +def colvec(x: np.ndarray | list) -> np.ndarray: + """Reshape an array into a column vector. + + Parameters + ---------- + x : array-like + Input data. + + Returns + ------- + numpy.ndarray + Column vector (N, 1) shape. + """ + x = np.asarray(x) + return x.reshape(-1, 1) diff --git a/src/ndr/data/rowvec.py b/src/ndr/data/rowvec.py new file mode 100644 index 0000000..1ddd0cc --- /dev/null +++ b/src/ndr/data/rowvec.py @@ -0,0 +1,25 @@ +"""Row vector utility. + +Port of +ndr/+data/rowvec.m +""" + +from __future__ import annotations + +import numpy as np + + +def rowvec(x: np.ndarray | list) -> np.ndarray: + """Reshape an array into a row vector. + + Parameters + ---------- + x : array-like + Input data. + + Returns + ------- + numpy.ndarray + Row vector (1, N) shape. + """ + x = np.asarray(x) + return x.reshape(1, -1) diff --git a/src/ndr/file/__init__.py b/src/ndr/file/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/file/textfile2char.py b/src/ndr/file/textfile2char.py new file mode 100644 index 0000000..2b0f1e6 --- /dev/null +++ b/src/ndr/file/textfile2char.py @@ -0,0 +1,24 @@ +"""Text file reading utility. + +Port of +ndr/+file/textfile2char.m +""" + +from __future__ import annotations + +from pathlib import Path + + +def textfile2char(filepath: str | Path) -> str: + """Read a text file and return its contents as a string. + + Parameters + ---------- + filepath : str or Path + Path to the text file. + + Returns + ------- + str + File contents. + """ + return Path(filepath).read_text(encoding="utf-8") diff --git a/src/ndr/format/__init__.py b/src/ndr/format/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/format/axon/__init__.py b/src/ndr/format/axon/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/format/bjg/__init__.py b/src/ndr/format/bjg/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/format/ced/__init__.py b/src/ndr/format/ced/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/format/dabrowska/__init__.py b/src/ndr/format/dabrowska/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/format/intan/__init__.py b/src/ndr/format/intan/__init__.py new file mode 100644 index 0000000..fddefe1 --- /dev/null +++ b/src/ndr/format/intan/__init__.py @@ -0,0 +1,10 @@ +"""NDR Intan format handler. + +Port of +ndr/+format/+intan/ +""" + +from ndr.format.intan.read_Intan_RHD2000_datafile import ( + Intan_RHD2000_blockinfo, + read_Intan_RHD2000_datafile, +) +from ndr.format.intan.read_Intan_RHD2000_header import read_Intan_RHD2000_header diff --git a/src/ndr/format/intan/manufacturer/__init__.py b/src/ndr/format/intan/manufacturer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/format/intan/ndr_matlab_python_bridge.yaml b/src/ndr/format/intan/ndr_matlab_python_bridge.yaml new file mode 100644 index 0000000..568ef62 --- /dev/null +++ b/src/ndr/format/intan/ndr_matlab_python_bridge.yaml @@ -0,0 +1,53 @@ +project_metadata: + bridge_version: "1.1" + naming_policy: "Strict MATLAB Mirror" + indexing_policy: "Semantic Parity (1-based for user concepts, 0-based for internal data)" + +functions: + - name: read_Intan_RHD2000_header + matlab_path: "+ndr/+format/+intan/read_Intan_RHD2000_header.m" + python_path: "ndr/format/intan/read_Intan_RHD2000_header.py" + input_arguments: + - name: filename + type_python: "str | Path" + output_arguments: + - name: header + type_python: "dict" + + - name: read_Intan_RHD2000_datafile + matlab_path: "+ndr/+format/+intan/read_Intan_RHD2000_datafile.m" + python_path: "ndr/format/intan/read_Intan_RHD2000_datafile.py" + input_arguments: + - name: filename + type_python: "str | Path" + - name: header_filename + type_python: "str" + - name: channeltype + type_python: "str" + - name: channel + type_python: "int | list[int]" + - name: t0 + type_python: "float" + - name: t1 + type_python: "float" + output_arguments: + - name: data + type_python: "numpy.ndarray" + + - name: Intan_RHD2000_blockinfo + matlab_path: "+ndr/+format/+intan/Intan_RHD2000_blockinfo.m" + python_path: "ndr/format/intan/read_Intan_RHD2000_datafile.py" + input_arguments: + - name: filename + type_python: "str | Path" + - name: header + type_python: "dict" + output_arguments: + - name: blockinfo + type_python: "dict" + - name: bytes_per_block + type_python: "int" + - name: bytes_present + type_python: "int" + - name: num_data_blocks + type_python: "int" diff --git a/src/ndr/format/intan/read_Intan_RHD2000_datafile.py b/src/ndr/format/intan/read_Intan_RHD2000_datafile.py new file mode 100644 index 0000000..15d42fd --- /dev/null +++ b/src/ndr/format/intan/read_Intan_RHD2000_datafile.py @@ -0,0 +1,358 @@ +"""Read data from Intan RHD2000 single-file format. + +Port of +ndr/+format/+intan/read_Intan_RHD2000_datafile.m +""" + +from __future__ import annotations + +import struct +from pathlib import Path +from typing import Any + +import numpy as np + +from ndr.format.intan.read_Intan_RHD2000_header import read_Intan_RHD2000_header + + +def Intan_RHD2000_blockinfo( + filename: str | Path, header: dict[str, Any] +) -> tuple[dict[str, Any], int, int, int]: + """Get block structure information for an RHD file. + + Parameters + ---------- + filename : str or Path + Path to the .rhd file. + header : dict + Header from read_Intan_RHD2000_header. + + Returns + ------- + tuple of (blockinfo, bytes_per_block, bytes_present, num_data_blocks) + """ + filename = Path(filename) + filesize = filename.stat().st_size + + num_amplifier = len(header.get("amplifier_channels", [])) + num_aux = len(header.get("aux_input_channels", [])) + num_supply = len(header.get("supply_voltage_channels", [])) + num_adc = len(header.get("board_adc_channels", [])) + num_dig_in = len(header.get("board_dig_in_channels", [])) + num_dig_out = len(header.get("board_dig_out_channels", [])) + dc_amp_saved = header.get("dc_amplifier_data_saved", 0) + + samples_per_block = 60 # Intan RHD uses 60 samples per data block + + # Calculate bytes per data block + # timestamp: 4 bytes * 60 samples + bytes_per_block = 4 * samples_per_block + + # amplifier data: 2 bytes * num_channels * 60 samples + bytes_per_block += 2 * num_amplifier * samples_per_block + + # DC amplifier data (if saved): 2 bytes * num_channels * 60 samples + if dc_amp_saved: + bytes_per_block += 2 * num_amplifier * samples_per_block + + # Stim data (v3+): skip for now, most common is v1/v2 + + # aux data: 2 bytes * num_aux * 15 samples (sampled at 1/4 rate) + bytes_per_block += 2 * num_aux * (samples_per_block // 4) + + # supply voltage: 2 bytes * num_supply * 1 sample (sampled at 1/60 rate) + bytes_per_block += 2 * num_supply + + # temp sensor: 2 bytes * num_supply * 1 sample + bytes_per_block += 2 * num_supply + + # board ADC: 2 bytes * num_adc * 60 samples + bytes_per_block += 2 * num_adc * samples_per_block + + # board digital in: 2 bytes * 60 samples (one uint16 per sample for all channels) + if num_dig_in > 0: + bytes_per_block += 2 * samples_per_block + + # board digital out: 2 bytes * 60 samples + if num_dig_out > 0: + bytes_per_block += 2 * samples_per_block + + # Calculate header size (approximate - read until data starts) + # We need to find where the header ends + header_size = _get_header_size(filename, header) + + bytes_present = filesize - header_size + num_data_blocks = bytes_present // bytes_per_block + + blockinfo = { + "samples_per_block": samples_per_block, + "num_amplifier": num_amplifier, + "num_aux": num_aux, + "num_supply": num_supply, + "num_adc": num_adc, + "num_dig_in": num_dig_in, + "num_dig_out": num_dig_out, + "dc_amp_saved": dc_amp_saved, + "header_size": header_size, + } + + return blockinfo, bytes_per_block, bytes_present, num_data_blocks + + +def _get_header_size(filename: Path, header: dict[str, Any]) -> int: + """Determine the header size by reading and counting bytes.""" + # Re-read the file and track position after header parsing + with open(filename, "rb") as f: + # Magic number + version + f.read(4 + 2 + 2) + + main_version = header["data_file_main_version_number"] + + # Sample rate + f.read(4) + + # DSP fields + f.read(2 + 4 + 4) + if main_version > 0: + f.read(4) # lower_settle_bandwidth + f.read(4 + 4 + 4) + if main_version > 0: + f.read(4) # desired_lower_settle_bandwidth + f.read(4) + + # Notch filter + f.read(2 + 4 + 4) + + if main_version > 1: + f.read(2 + 2) # amp_settle_mode, charge_recovery_mode + + # Notes + if main_version > 0: + for _ in range(3): + _read_qstring_skip(f) + + # DC amplifier data saved + if main_version > 1: + f.read(2) + + # Eval board mode + if main_version > 1: + f.read(2) + + # Reference channel + if main_version > 0: + _read_qstring_skip(f) + + # Signal groups + num_signal_groups = struct.unpack(" 0: + f.read(2) # command_stream + f.read(2 + 2 + 2 + 2 + 2 + 4 + 4) + + return f.tell() + + +def _read_qstring_skip(f) -> None: + """Skip a Qt QString in a binary file.""" + length_bytes = f.read(4) + if len(length_bytes) < 4: + return + length = struct.unpack(" np.ndarray: + """Read data from an Intan RHD2000 single-file format. + + Parameters + ---------- + filename : str or Path + Path to the .rhd file. + header_filename : str + Unused (for API compatibility). + channeltype : str + Channel type: 'amp', 'aux', 'din', 'dout', 'adc', 'time'. + channel : int or list of int + Channel number(s) (1-based). + t0 : float + Start time in seconds. + t1 : float + End time in seconds. + + Returns + ------- + numpy.ndarray + Data array with one column per channel. + """ + filename = Path(filename) + header = read_Intan_RHD2000_header(filename) + blockinfo, bytes_per_block, bytes_present, num_data_blocks = Intan_RHD2000_blockinfo( + filename, header + ) + + if isinstance(channel, int): + channel = [channel] + + sr = header["frequency_parameters"]["amplifier_sample_rate"] + samples_per_block = blockinfo["samples_per_block"] + total_samples = samples_per_block * num_data_blocks + + if channeltype == "aux": + sr_actual = header["frequency_parameters"]["aux_input_sample_rate"] + else: + sr_actual = sr + + # Convert time to sample indices + s0 = max(0, int(round(t0 * sr_actual))) + if t1 == float("inf"): + if channeltype == "aux": + s1 = (samples_per_block // 4) * num_data_blocks - 1 + else: + s1 = total_samples - 1 + else: + s1 = int(round(t1 * sr_actual)) + + num_samples = s1 - s0 + 1 + if num_samples <= 0: + return np.array([]).reshape(0, len(channel)) + + # Read the entire data section and extract requested channels + num_amplifier = blockinfo["num_amplifier"] + num_aux = blockinfo["num_aux"] + num_supply = blockinfo["num_supply"] + num_adc = blockinfo["num_adc"] + num_dig_in = blockinfo["num_dig_in"] + num_dig_out = blockinfo["num_dig_out"] + dc_amp_saved = blockinfo["dc_amp_saved"] + header_size = blockinfo["header_size"] + + with open(filename, "rb") as f: + f.seek(header_size) + + # Pre-allocate output arrays + if channeltype == "time": + all_data = np.zeros(total_samples, dtype=np.float64) + elif channeltype == "amp": + all_data = np.zeros((total_samples, num_amplifier), dtype=np.float64) + elif channeltype == "aux": + aux_samples = (samples_per_block // 4) * num_data_blocks + all_data = np.zeros((aux_samples, num_aux), dtype=np.float64) + elif channeltype in ("din", "dout"): + all_data = np.zeros((total_samples, 16), dtype=np.float64) + elif channeltype == "adc": + all_data = np.zeros((total_samples, num_adc), dtype=np.float64) + else: + raise ValueError(f"Unknown channel type '{channeltype}'.") + + for block in range(num_data_blocks): + block_start = block * samples_per_block + + # Timestamps: int32 x 60 + ts_raw = np.frombuffer(f.read(4 * samples_per_block), dtype=np.int32) + + if channeltype == "time": + all_data[block_start : block_start + samples_per_block] = ts_raw / sr + + # Amplifier data: int16 x num_amplifier x 60 + if num_amplifier > 0: + amp_raw = ( + np.frombuffer(f.read(2 * num_amplifier * samples_per_block), dtype=np.uint16) + .reshape(num_amplifier, samples_per_block) + .T + ) + + if channeltype == "amp": + # Convert to microvolts: (raw - 32768) * 0.195 + all_data[block_start : block_start + samples_per_block, :] = ( + amp_raw.astype(np.float64) - 32768.0 + ) * 0.195 + + # DC amplifier data + if dc_amp_saved and num_amplifier > 0: + f.read(2 * num_amplifier * samples_per_block) + + # Stim data (for RHS files, not RHD - skip) + + # Aux input data: uint16 x num_aux x 15 + if num_aux > 0: + aux_per_block = samples_per_block // 4 + aux_raw = ( + np.frombuffer(f.read(2 * num_aux * aux_per_block), dtype=np.uint16) + .reshape(num_aux, aux_per_block) + .T + ) + + if channeltype == "aux": + aux_block_start = block * aux_per_block + all_data[aux_block_start : aux_block_start + aux_per_block, :] = ( + aux_raw.astype(np.float64) * 37.4e-6 + ) + + # Supply voltage: uint16 x num_supply x 1 + if num_supply > 0: + f.read(2 * num_supply) + + # Temp sensor + if num_supply > 0: + f.read(2 * num_supply) + + # Board ADC: uint16 x num_adc x 60 + if num_adc > 0: + adc_raw = ( + np.frombuffer(f.read(2 * num_adc * samples_per_block), dtype=np.uint16) + .reshape(num_adc, samples_per_block) + .T + ) + + if channeltype == "adc": + all_data[block_start : block_start + samples_per_block, :] = adc_raw.astype( + np.float64 + ) + + # Board digital inputs: uint16 x 60 + if num_dig_in > 0: + dig_in_raw = np.frombuffer(f.read(2 * samples_per_block), dtype=np.uint16) + if channeltype == "din": + for bit in range(16): + all_data[block_start : block_start + samples_per_block, bit] = ( + (dig_in_raw >> bit) & 1 + ).astype(np.float64) + + # Board digital outputs: uint16 x 60 + if num_dig_out > 0: + dig_out_raw = np.frombuffer(f.read(2 * samples_per_block), dtype=np.uint16) + if channeltype == "dout": + for bit in range(16): + all_data[block_start : block_start + samples_per_block, bit] = ( + (dig_out_raw >> bit) & 1 + ).astype(np.float64) + + # Extract the requested time range and channels + if channeltype == "time": + return all_data[s0 : s1 + 1].reshape(-1, 1) + else: + # Convert 1-based channel numbers to 0-based indices + ch_indices = [c - 1 for c in channel] + return all_data[s0 : s1 + 1, :][:, ch_indices] diff --git a/src/ndr/format/intan/read_Intan_RHD2000_header.py b/src/ndr/format/intan/read_Intan_RHD2000_header.py new file mode 100644 index 0000000..43cecf0 --- /dev/null +++ b/src/ndr/format/intan/read_Intan_RHD2000_header.py @@ -0,0 +1,171 @@ +"""Read Intan RHD2000 file headers. + +Port of +ndr/+format/+intan/read_Intan_RHD2000_header.m + +This module uses vhlab-toolbox-python's Intan reading capability when available, +and falls back to a native implementation otherwise. +""" + +from __future__ import annotations + +import struct +from pathlib import Path +from typing import Any + + +def _fread_QString(f) -> str: + """Read a Qt QString from a binary file (port of fread_QString.m).""" + length_bytes = f.read(4) + if len(length_bytes) < 4: + return "" + length = struct.unpack(" dict[str, Any]: + """Read a single channel header entry.""" + ch = {} + ch["native_channel_name"] = _fread_QString(f) + ch["custom_channel_name"] = _fread_QString(f) + ch["native_order"] = struct.unpack(" 0: + ch["command_stream"] = struct.unpack(" dict[str, Any]: + """Read the header from an Intan RHD2000 file. + + Parameters + ---------- + filename : str or Path + Path to the .rhd file. + + Returns + ------- + dict + Header information including frequency parameters and channel lists. + """ + filename = Path(filename) + header: dict[str, Any] = {} + + with open(filename, "rb") as f: + # Magic number + magic = struct.unpack(" 0: + freq["actual_lower_settle_bandwidth"] = struct.unpack(" 0: + freq["desired_lower_settle_bandwidth"] = struct.unpack(" 1: + freq["amp_settle_mode"] = struct.unpack(" 0: + header["note1"] = _fread_QString(f) + header["note2"] = _fread_QString(f) + header["note3"] = _fread_QString(f) + + # DC amplifier data saved + if main_version > 1: + header["dc_amplifier_data_saved"] = struct.unpack(" 1: + header["eval_board_mode"] = struct.unpack(" 0: + header["reference_channel"] = _fread_QString(f) + + # Number of signal groups + num_signal_groups = struct.unpack(" int: + """Return the bit depth for a given data type string. + + Parameters + ---------- + datatype : str + Data type string (e.g., 'uint16', 'float64'). + + Returns + ------- + int + Number of bits. + """ + bit_map = { + "int8": 8, + "uint8": 8, + "char": 8, + "int16": 16, + "uint16": 16, + "int32": 32, + "uint32": 32, + "float32": 32, + "single": 32, + "int64": 64, + "uint64": 64, + "float64": 64, + "double": 64, + } + if datatype not in bit_map: + raise ValueError(f"Unknown data type '{datatype}'.") + return bit_map[datatype] diff --git a/src/ndr/fun/getDataTypeString.py b/src/ndr/fun/getDataTypeString.py new file mode 100644 index 0000000..668f3b4 --- /dev/null +++ b/src/ndr/fun/getDataTypeString.py @@ -0,0 +1,29 @@ +"""Data type string utility. + +Port of +ndr/+fun/getDataTypeString.m +""" + +from __future__ import annotations + + +def getDataTypeString(bits: int, is_float: bool = False, is_signed: bool = True) -> str: + """Return a data type string for a given bit depth. + + Parameters + ---------- + bits : int + Number of bits. + is_float : bool + Whether the type is floating point. + is_signed : bool + Whether the type is signed (for integers). + + Returns + ------- + str + Data type string suitable for numpy (e.g., 'float64', 'uint16'). + """ + if is_float: + return f"float{bits}" + prefix = "int" if is_signed else "uint" + return f"{prefix}{bits}" diff --git a/src/ndr/fun/ndrpath.py b/src/ndr/fun/ndrpath.py new file mode 100644 index 0000000..25b0b4e --- /dev/null +++ b/src/ndr/fun/ndrpath.py @@ -0,0 +1,19 @@ +"""NDR path utilities. + +Port of +ndr/+fun/ndrpath.m +""" + +from __future__ import annotations + +from pathlib import Path + + +def ndrpath() -> Path: + """Return the root path of the NDR-python package. + + Returns + ------- + Path + The root directory of the NDR package (src/ndr). + """ + return Path(__file__).parent.parent diff --git a/src/ndr/fun/ndrresource.py b/src/ndr/fun/ndrresource.py new file mode 100644 index 0000000..266fde7 --- /dev/null +++ b/src/ndr/fun/ndrresource.py @@ -0,0 +1,34 @@ +"""NDR resource loading utilities. + +Port of +ndr/+fun/ndrresource.m +""" + +from __future__ import annotations + +import json +from typing import Any + +from ndr.fun.ndrpath import ndrpath + + +def ndrresource(resource_name: str) -> Any: + """Load a JSON resource file from the NDR resource directory. + + Parameters + ---------- + resource_name : str + Name of the resource file (e.g., 'ndr_reader_types.json'). + + Returns + ------- + Any + Parsed JSON content (typically a list or dict). + """ + resource_dir = ndrpath() / "resource" + filepath = resource_dir / resource_name + + if not filepath.exists(): + raise FileNotFoundError(f"NDR resource file '{resource_name}' not found at {filepath}.") + + text = filepath.read_text(encoding="utf-8") + return json.loads(text) diff --git a/src/ndr/globals.py b/src/ndr/globals.py new file mode 100644 index 0000000..1a1388f --- /dev/null +++ b/src/ndr/globals.py @@ -0,0 +1,32 @@ +"""NDR global variables and paths. + +Port of +ndr/globals.m +""" + +from __future__ import annotations + +import tempfile +from pathlib import Path + +from ndr.fun.ndrpath import ndrpath + + +class NDRGlobals: + """Container for NDR global path and configuration variables.""" + + def __init__(self) -> None: + ndr_root = ndrpath() + self.path = { + "path": str(ndr_root), + "preferences": str(ndr_root / "preferences"), + "filecachepath": str(ndr_root / "filecache"), + "temppath": str(Path(tempfile.gettempdir()) / "NDR"), + "testpath": str(Path(tempfile.gettempdir()) / "NDR" / "test"), + } + self.debug = { + "verbose": False, + } + + +# Module-level singleton +ndr_globals = NDRGlobals() diff --git a/src/ndr/known_readers.py b/src/ndr/known_readers.py new file mode 100644 index 0000000..152c3db --- /dev/null +++ b/src/ndr/known_readers.py @@ -0,0 +1,20 @@ +"""Known NDR reader types. + +Port of +ndr/known_readers.m +""" + +from __future__ import annotations + +from ndr.fun.ndrresource import ndrresource + + +def known_readers() -> list[list[str]]: + """Return all known reader file types for NDR readers. + + Returns + ------- + list of list of str + Each entry is the list of type aliases for one reader. + """ + j = ndrresource("ndr_reader_types.json") + return [entry["type"] for entry in j] diff --git a/src/ndr/reader/__init__.py b/src/ndr/reader/__init__.py new file mode 100644 index 0000000..a1b2104 --- /dev/null +++ b/src/ndr/reader/__init__.py @@ -0,0 +1,7 @@ +"""NDR reader classes. + +Port of +ndr/+reader/ +""" + +from ndr.reader.base import Base +from ndr.reader.intan_rhd import IntanRHD diff --git a/src/ndr/reader/axon_abf.py b/src/ndr/reader/axon_abf.py new file mode 100644 index 0000000..d6223b9 --- /dev/null +++ b/src/ndr/reader/axon_abf.py @@ -0,0 +1,129 @@ +"""Axon ABF reader class. + +Port of +ndr/+reader/axon_abf.m +Reads Axon Binary Format (.abf) files using pyabf when available. +""" + +from __future__ import annotations + +from typing import Any + +import numpy as np + +from ndr.reader.base import Base + + +class AxonABF(Base): + """Reader for Axon ABF (.abf) file format. + + Port of ndr.reader.axon_abf. + """ + + def __init__(self) -> None: + super().__init__() + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + try: + import pyabf + except ImportError as err: + raise ImportError( + "pyabf is required for reading ABF files. Install with: pip install pyabf" + ) from err + + if isinstance(channel, int): + channel = [channel] + + abf_file = self._filenamefromepochfiles(epochstreams) + abf = pyabf.ABF(abf_file) + + if channeltype in ("time", "timestamp", "t"): + abf.setSweep(sweepNumber=epoch_select - 1, channel=0) + t = abf.sweepX + return t[s0 - 1 : s1].reshape(-1, 1) + + data_list = [] + for ch in channel: + abf.setSweep(sweepNumber=epoch_select - 1, channel=ch - 1) + data_list.append(abf.sweepY[s0 - 1 : s1]) + + return np.column_stack(data_list) + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events (returns empty for ABF).""" + return np.array([]), np.array([]) + + def getchannelsepoch( + self, epochstreams: list[str], epoch_select: int = 1 + ) -> list[dict[str, Any]]: + """List channels available for a given epoch.""" + try: + import pyabf + except ImportError as err: + raise ImportError("pyabf is required for reading ABF files.") from err + + abf_file = self._filenamefromepochfiles(epochstreams) + abf = pyabf.ABF(abf_file) + + channels: list[dict[str, Any]] = [] + channels.append({"name": "t1", "type": "time", "time_channel": 1}) + + for i in range(abf.channelCount): + channels.append({"name": f"ai{i + 1}", "type": "analog_in", "time_channel": 1}) + + return channels + + def samplerate( + self, + epochstreams: list[str], + epoch_select: int, + channeltype: str, + channel: int | list[int], + ) -> float | np.ndarray: + """Get the sample rate.""" + try: + import pyabf + except ImportError as err: + raise ImportError("pyabf is required for reading ABF files.") from err + + abf_file = self._filenamefromepochfiles(epochstreams) + abf = pyabf.ABF(abf_file) + return float(abf.dataRate) + + def t0_t1(self, epochstreams: list[str], epoch_select: int = 1) -> list[list[float]]: + """Return the beginning and end times for an epoch.""" + try: + import pyabf + except ImportError as err: + raise ImportError("pyabf is required for reading ABF files.") from err + + abf_file = self._filenamefromepochfiles(epochstreams) + abf = pyabf.ABF(abf_file) + abf.setSweep(sweepNumber=epoch_select - 1) + t0 = abf.sweepX[0] + t1 = abf.sweepX[-1] + return [[float(t0), float(t1)]] + + @staticmethod + def _filenamefromepochfiles(filename_array: list[str]) -> str: + """Find the .abf file from epoch files.""" + abf_files = [f for f in filename_array if f.lower().endswith(".abf")] + if len(abf_files) != 1: + raise ValueError("Need exactly 1 .abf file per epoch.") + return abf_files[0] diff --git a/src/ndr/reader/base.py b/src/ndr/reader/base.py new file mode 100644 index 0000000..bed2542 --- /dev/null +++ b/src/ndr/reader/base.py @@ -0,0 +1,488 @@ +"""Abstract base class for all NDR readers. + +Port of +ndr/+reader/base.m +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import Any + +import numpy as np + +from ndr.time.clocktype import ClockType +from ndr.time.fun.samples2times import samples2times as _samples2times +from ndr.time.fun.times2samples import times2samples as _times2samples + + +class Base(ABC): + """Abstract base class for Neuroscience Data Readers. + + All format-specific readers inherit from this class and must implement + the abstract methods. + + Port of ndr.reader.base. + """ + + def __init__(self) -> None: + self.MightHaveTimeGaps: bool = False + + # ------------------------------------------------------------------ + # Concrete methods (base provides default implementations) + # ------------------------------------------------------------------ + + def canbereadtogether(self, channelstruct: list[dict[str, Any]]) -> tuple[bool, str]: + """Check if channels in a channel struct can be read in a single call. + + Parameters + ---------- + channelstruct : list of dict + Each dict has keys: internal_type, internal_number, + internal_channelname, ndr_type, samplerate. + + Returns + ------- + tuple of (bool, str) + (True, '') if channels can be read together, or + (False, error_message) otherwise. + """ + b = True + errormsg = "" + + sr = [ch["samplerate"] for ch in channelstruct] + sr_arr = np.array(sr, dtype=float) + + if not np.all(np.isnan(sr_arr)): + # If all are not NaN, then none can be + if np.any(np.isnan(sr_arr)): + b = False + errormsg = ( + "All samplerates must either be the same number or they must " + "all be NaN, indicating they are all not regularly sampled channels." + ) + else: + sr_unique = np.unique(sr_arr) + if len(sr_unique) != 1: + b = False + errormsg = ( + "All sample rates must be the same for all requested " + "regularly-sampled channels for a single function call." + ) + + return b, errormsg + + def daqchannels2internalchannels( + self, + channelprefix: list[str], + channelnumber: list[int] | np.ndarray, + epochstreams: list[str], + epoch_select: int = 1, + ) -> list[dict[str, Any]]: + """Convert DAQ channel prefixes and numbers to internal channel structures. + + Parameters + ---------- + channelprefix : list of str + Channel prefixes describing channels for this device. + channelnumber : array-like of int + Channel numbers, one per entry in channelprefix. + epochstreams : list of str + File paths comprising the epoch of data. + epoch_select : int + Which epoch in the file to access (usually 1). + + Returns + ------- + list of dict + Each dict has keys: internal_type, internal_number, + internal_channelname, ndr_type, samplerate. + """ + # Abstract class returns empty + return [] + + def epochclock(self, epochstreams: list[str], epoch_select: int = 1) -> list[ClockType]: + """Return the clock types available for this epoch. + + Parameters + ---------- + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + + Returns + ------- + list of ClockType + Clock types for this epoch. + """ + return [ClockType("dev_local_time")] + + def getchannelsepoch( + self, epochstreams: list[str], epoch_select: int = 1 + ) -> list[dict[str, Any]]: + """List channels available for a given epoch. + + Parameters + ---------- + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + + Returns + ------- + list of dict + Each dict has keys: name, type, time_channel. + """ + return [] + + def underlying_datatype( + self, + epochstreams: list[str], + epoch_select: int, + channeltype: str, + channel: int | list[int], + ) -> tuple[str, np.ndarray, int]: + """Get the underlying data type for a channel in an epoch. + + Parameters + ---------- + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + channeltype : str + The type of channel. + channel : int or list of int + Channel number(s). + + Returns + ------- + tuple of (str, numpy.ndarray, int) + (datatype, polynomial_coefficients, datasize_in_bits) + """ + if isinstance(channel, int): + n_channels = 1 + else: + n_channels = len(channel) + + if channeltype in ("analog_in", "analog_out", "auxiliary_in", "time"): + datatype = "float64" + datasize = 64 + p = np.tile([0, 1], (n_channels, 1)) + elif channeltype in ("digital_in", "digital_out"): + datatype = "char" + datasize = 8 + p = np.tile([0, 1], (n_channels, 1)) + elif channeltype in ("eventmarktext", "event", "marker", "text"): + datatype = "float64" + datasize = 64 + p = np.tile([0, 1], (n_channels, 1)) + else: + raise ValueError(f"Unknown channel type '{channeltype}'.") + + return datatype, p, datasize + + @abstractmethod + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels. + + Parameters + ---------- + channeltype : str + Type of channel to read. + channel : int or list of int + Channel number(s) to read (1-based). + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + s0 : int + Start sample number (1-based). + s1 : int + End sample number (1-based). + + Returns + ------- + numpy.ndarray + Data array with one column per channel. + """ + ... + + @abstractmethod + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events or markers for specified channels. + + Parameters + ---------- + channeltype : str + Type of channel to read. + channel : int or list of int + Channel number(s) to read. + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + t0 : float + Start time. + t1 : float + End time. + + Returns + ------- + tuple of (numpy.ndarray, numpy.ndarray) + (timestamps, data) + """ + ... + + def samplerate( + self, + epochstreams: list[str], + epoch_select: int, + channeltype: str, + channel: int | list[int], + ) -> np.ndarray | float: + """Get the sample rate for specific channels. + + Parameters + ---------- + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + channeltype : str + Type of channel. + channel : int or list of int + Channel number(s). + + Returns + ------- + numpy.ndarray or float + Sample rate(s) in Hz. + """ + return np.array([]) + + def t0_t1(self, epochstreams: list[str], epoch_select: int = 1) -> list[list[float]]: + """Return the beginning and end epoch times. + + Parameters + ---------- + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + + Returns + ------- + list of list + [[t0, t1]] for each clock type. Abstract class returns [[NaN, NaN]]. + """ + return [[float("nan"), float("nan")]] + + def samples2times( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s: np.ndarray | int | float, + ) -> np.ndarray: + """Convert sample numbers to time. + + Parameters + ---------- + channeltype : str + Type of channel. + channel : int or list of int + Channel number(s). + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + s : array-like + Sample numbers (1-based). + + Returns + ------- + numpy.ndarray + Times in seconds. + """ + sr = self.samplerate(epochstreams, epoch_select, channeltype, channel) + sr_arr = np.atleast_1d(np.asarray(sr, dtype=float)) + sr_unique = np.unique(sr_arr) + if len(sr_unique) != 1: + raise ValueError("Do not know how to handle different sampling rates across channels.") + t0t1 = self.t0_t1(epochstreams, epoch_select) + return _samples2times(s, t0t1[0], sr_unique[0]) + + def times2samples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t: np.ndarray | float, + ) -> np.ndarray: + """Convert time to sample numbers. + + Parameters + ---------- + channeltype : str + Type of channel. + channel : int or list of int + Channel number(s). + epochstreams : list of str + File paths comprising the epoch. + epoch_select : int + Which epoch to access. + t : array-like + Times in seconds. + + Returns + ------- + numpy.ndarray + Sample numbers (1-based). + """ + sr = self.samplerate(epochstreams, epoch_select, channeltype, channel) + sr_arr = np.atleast_1d(np.asarray(sr, dtype=float)) + sr_unique = np.unique(sr_arr) + if len(sr_unique) != 1: + raise ValueError("Do not know how to handle different sampling rates across channels.") + t0t1 = self.t0_t1(epochstreams, epoch_select) + return _times2samples(t, t0t1[0], sr_unique[0]) + + # ------------------------------------------------------------------ + # Static methods + # ------------------------------------------------------------------ + + @staticmethod + def mfdaq_channeltypes() -> list[str]: + """Return supported channel types for multifunction DAQ readers. + + Returns + ------- + list of str + Channel type strings. + """ + return [ + "analog_in", + "aux_in", + "analog_out", + "digital_in", + "digital_out", + "marker", + "event", + "time", + ] + + @staticmethod + def mfdaq_prefix(channeltype: str) -> str: + """Return the channel prefix for a given channel type. + + Parameters + ---------- + channeltype : str + The channel type string. + + Returns + ------- + str + The channel prefix (e.g., 'ai', 'di', 't'). + """ + prefix_map = { + "analog_in": "ai", + "ai": "ai", + "analog_out": "ao", + "ao": "ao", + "digital_in": "di", + "di": "di", + "digital_out": "do", + "do": "do", + "digital_in_event": "dep", + "digital_in_event_pos": "dep", + "de": "dep", + "dep": "dep", + "digital_in_event_neg": "den", + "den": "den", + "digital_in_mark": "dimp", + "digital_in_mark_pos": "dimp", + "dim": "dimp", + "dimp": "dimp", + "digital_in_mark_neg": "dimn", + "dimn": "dimn", + "time": "t", + "timestamp": "t", + "t": "t", + "auxiliary": "ax", + "aux": "ax", + "ax": "ax", + "auxiliary_in": "ax", + "marker": "mk", + "mark": "mk", + "mk": "mk", + "event": "e", + "e": "e", + "metadata": "md", + "md": "md", + "text": "text", + } + if channeltype not in prefix_map: + raise ValueError(f"Unknown channel type '{channeltype}'.") + return prefix_map[channeltype] + + @staticmethod + def mfdaq_type(channeltype: str) -> str: + """Return the preferred long channel type name for a given channel type. + + Parameters + ---------- + channeltype : str + The channel type string (short or long form). + + Returns + ------- + str + The canonical long channel type name. + """ + type_map = { + "analog_in": "analog_in", + "ai": "analog_in", + "analog_out": "analog_out", + "ao": "analog_out", + "digital_in": "digital_in", + "di": "digital_in", + "digital_out": "digital_out", + "do": "digital_out", + "time": "time", + "timestamp": "time", + "t": "time", + "auxiliary": "ax", + "aux": "ax", + "ax": "ax", + "auxiliary_in": "ax", + "marker": "mark", + "mark": "mark", + "mk": "mark", + "event": "event", + "e": "event", + "text": "text", + } + if channeltype not in type_map: + raise ValueError(f"Type '{channeltype}' is unknown.") + return type_map[channeltype] diff --git a/src/ndr/reader/bjg.py b/src/ndr/reader/bjg.py new file mode 100644 index 0000000..b343333 --- /dev/null +++ b/src/ndr/reader/bjg.py @@ -0,0 +1,44 @@ +"""BJG reader class. + +Port of +ndr/+reader/bjg.m +""" + +from __future__ import annotations + +import numpy as np + +from ndr.reader.base import Base + + +class BJG(Base): + """Reader for BJG file format. + + Port of ndr.reader.bjg. + """ + + def __init__(self) -> None: + super().__init__() + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + raise NotImplementedError("BJG reader not yet fully implemented.") + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events.""" + return np.array([]), np.array([]) diff --git a/src/ndr/reader/ced_smr.py b/src/ndr/reader/ced_smr.py new file mode 100644 index 0000000..ab8aa55 --- /dev/null +++ b/src/ndr/reader/ced_smr.py @@ -0,0 +1,180 @@ +"""CED SMR reader class. + +Port of +ndr/+reader/ced_smr.m +Reads Cambridge Electronic Design Spike2 (.smr) files using neo. +""" + +from __future__ import annotations + +from typing import Any + +import numpy as np + +from ndr.reader.base import Base + + +class CedSMR(Base): + """Reader for CED Spike2 (.smr) file format. + + Port of ndr.reader.ced_smr. + """ + + def __init__(self) -> None: + super().__init__() + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + try: + from neo.io import Spike2IO + except ImportError as err: + raise ImportError( + "neo is required for reading SMR files. Install with: pip install neo" + ) from err + + if isinstance(channel, int): + channel = [channel] + + smr_file = self._filenamefromepochfiles(epochstreams) + reader = Spike2IO(filename=smr_file) + block = reader.read_block(signal_group_mode="split-all") + seg = block.segments[epoch_select - 1] + + if channeltype in ("time", "timestamp", "t"): + sig = seg.analogsignals[0] + times = sig.times.magnitude + return times[s0 - 1 : s1].reshape(-1, 1) + + data_list = [] + for ch in channel: + sig = seg.analogsignals[ch - 1] + data_list.append(sig.magnitude[s0 - 1 : s1].flatten()) + + return np.column_stack(data_list) + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events from CED files.""" + try: + from neo.io import Spike2IO + except ImportError as err: + raise ImportError("neo is required for reading SMR files.") from err + + smr_file = self._filenamefromepochfiles(epochstreams) + reader = Spike2IO(filename=smr_file) + block = reader.read_block(signal_group_mode="split-all") + seg = block.segments[epoch_select - 1] + + timestamps_all = [] + data_all = [] + + for evt in seg.events: + times = evt.times.magnitude + mask = (times >= t0) & (times <= t1) + timestamps_all.append(times[mask]) + data_all.append(np.ones(np.sum(mask))) + + if timestamps_all: + return np.concatenate(timestamps_all), np.concatenate(data_all) + return np.array([]), np.array([]) + + def getchannelsepoch( + self, epochstreams: list[str], epoch_select: int = 1 + ) -> list[dict[str, Any]]: + """List channels available for a given epoch.""" + try: + from neo.io import Spike2IO + except ImportError as err: + raise ImportError("neo is required for reading SMR files.") from err + + smr_file = self._filenamefromepochfiles(epochstreams) + reader = Spike2IO(filename=smr_file) + block = reader.read_block(signal_group_mode="split-all") + seg = block.segments[epoch_select - 1] + + channels: list[dict[str, Any]] = [] + channels.append({"name": "t1", "type": "time", "time_channel": 1}) + + for i, _sig in enumerate(seg.analogsignals): + channels.append({"name": f"ai{i + 1}", "type": "analog_in", "time_channel": 1}) + + for i, _evt in enumerate(seg.events): + channels.append({"name": f"e{i + 1}", "type": "event", "time_channel": 1}) + + return channels + + def samplerate( + self, + epochstreams: list[str], + epoch_select: int, + channeltype: str, + channel: int | list[int], + ) -> float | np.ndarray: + """Get the sample rate.""" + try: + from neo.io import Spike2IO + except ImportError as err: + raise ImportError("neo is required for reading SMR files.") from err + + smr_file = self._filenamefromepochfiles(epochstreams) + reader = Spike2IO(filename=smr_file) + block = reader.read_block(signal_group_mode="split-all") + seg = block.segments[epoch_select - 1] + + if isinstance(channel, int): + channel = [channel] + + sr_list = [] + for ch in channel: + if ch - 1 < len(seg.analogsignals): + sr_list.append(float(seg.analogsignals[ch - 1].sampling_rate.magnitude)) + else: + sr_list.append(float("nan")) + + if len(sr_list) == 1: + return sr_list[0] + return np.array(sr_list) + + def t0_t1(self, epochstreams: list[str], epoch_select: int = 1) -> list[list[float]]: + """Return the beginning and end times for an epoch.""" + try: + from neo.io import Spike2IO + except ImportError as err: + raise ImportError("neo is required for reading SMR files.") from err + + smr_file = self._filenamefromepochfiles(epochstreams) + reader = Spike2IO(filename=smr_file) + block = reader.read_block(signal_group_mode="split-all") + seg = block.segments[epoch_select - 1] + + if seg.analogsignals: + sig = seg.analogsignals[0] + t0 = float(sig.t_start.magnitude) + t1 = float(sig.t_stop.magnitude) + else: + t0 = float("nan") + t1 = float("nan") + + return [[t0, t1]] + + @staticmethod + def _filenamefromepochfiles(filename_array: list[str]) -> str: + """Find the .smr file from epoch files.""" + smr_files = [f for f in filename_array if f.lower().endswith(".smr")] + if len(smr_files) != 1: + raise ValueError("Need exactly 1 .smr file per epoch.") + return smr_files[0] diff --git a/src/ndr/reader/dabrowska.py b/src/ndr/reader/dabrowska.py new file mode 100644 index 0000000..fff82bc --- /dev/null +++ b/src/ndr/reader/dabrowska.py @@ -0,0 +1,44 @@ +"""Dabrowska reader class. + +Port of +ndr/+reader/dabrowska.m +""" + +from __future__ import annotations + +import numpy as np + +from ndr.reader.base import Base + + +class Dabrowska(Base): + """Reader for Dabrowska file format. + + Port of ndr.reader.dabrowska. + """ + + def __init__(self) -> None: + super().__init__() + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + raise NotImplementedError("Dabrowska reader not yet fully implemented.") + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events.""" + return np.array([]), np.array([]) diff --git a/src/ndr/reader/intan_rhd.py b/src/ndr/reader/intan_rhd.py new file mode 100644 index 0000000..fa6df03 --- /dev/null +++ b/src/ndr/reader/intan_rhd.py @@ -0,0 +1,435 @@ +"""Intan RHD reader class. + +Port of +ndr/+reader/intan_rhd.m +""" + +from __future__ import annotations + +import re +from pathlib import Path +from typing import Any + +import numpy as np + +from ndr.format.intan.read_Intan_RHD2000_datafile import ( + Intan_RHD2000_blockinfo, + read_Intan_RHD2000_datafile, +) +from ndr.format.intan.read_Intan_RHD2000_header import read_Intan_RHD2000_header +from ndr.reader.base import Base + + +class IntanRHD(Base): + """Reader for Intan Technologies .RHD file format. + + Port of ndr.reader.intan_rhd. + """ + + def __init__(self) -> None: + super().__init__() + + def daqchannels2internalchannels( + self, + channelprefix: list[str], + channelnumber: list[int] | np.ndarray, + epochstreams: list[str], + epoch_select: int = 1, + ) -> list[dict[str, Any]]: + """Convert DAQ channel prefixes/numbers to internal channel structures.""" + channelstruct: list[dict[str, Any]] = [] + + filename = self.filenamefromepochfiles(epochstreams)[0] + header = read_Intan_RHD2000_header(filename) + + for c in range(len(channelnumber)): + intan_type, absolute = IntanRHD.intananychannelname2intanchanneltype(channelprefix[c]) + ndr_type = IntanRHD.intanchanneltype2mfdaqchanneltype(intan_type) + header_name = IntanRHD.mfdaqchanneltype2intanheadertype(ndr_type) + header_chunk = header.get(header_name, []) + + entry: dict[str, Any] = { + "internal_type": ndr_type, + "internal_number": 0, + "internal_channelname": "", + "ndr_type": ndr_type, + "samplerate": 0.0, + } + + if not absolute: + entry["internal_number"] = channelnumber[c] + native_names = [ch["native_channel_name"] for ch in header_chunk] + if channelnumber[c] - 1 < len(native_names): + entry["internal_channelname"] = native_names[channelnumber[c] - 1] + entry["samplerate"] = self.samplerate( + epochstreams, epoch_select, channelprefix[c], channelnumber[c] + ) + else: + chan_name = f"{channelprefix[c]}-{channelnumber[c]:03d}" + entry["internal_channelname"] = chan_name + native_names = [ch["native_channel_name"] for ch in header_chunk] + if chan_name in native_names: + entry["internal_number"] = native_names.index(chan_name) + 1 + else: + raise ValueError( + f"Requested channel {chan_name} was not recorded in this file." + ) + entry["samplerate"] = self.samplerate( + epochstreams, + epoch_select, + entry["ndr_type"], + entry["internal_number"], + ) + + channelstruct.append(entry) + + return channelstruct + + def t0_t1(self, epochstreams: list[str], epoch_select: int = 1) -> list[list[float]]: + """Return the beginning and end epoch times.""" + filename, parentdir, isdirectory = self.filenamefromepochfiles(epochstreams) + header = read_Intan_RHD2000_header(filename) + + if not isdirectory: + _blockinfo, _bpb, _bp, num_data_blocks = Intan_RHD2000_blockinfo(filename, header) + total_samples = 60 * num_data_blocks + else: + time_dat = Path(parentdir) / "time.dat" + if not time_dat.exists(): + raise FileNotFoundError( + f"File time.dat necessary in directory {parentdir} but not found." + ) + total_samples = time_dat.stat().st_size // 4 + + sr = header["frequency_parameters"]["amplifier_sample_rate"] + total_time = total_samples / sr + t0 = 0.0 + t1 = total_time - 1.0 / sr + + return [[t0, t1]] + + def getchannelsepoch( + self, epochstreams: list[str], epoch_select: int = 1 + ) -> list[dict[str, Any]]: + """List channels available for a given epoch.""" + intan_channel_types = [ + "amplifier_channels", + "aux_input_channels", + "board_dig_in_channels", + "board_dig_out_channels", + ] + + filename = self.filenamefromepochfiles(epochstreams)[0] + header = read_Intan_RHD2000_header(filename) + + channels: list[dict[str, Any]] = [] + + # Time channel 1 (amplifier rate) + channels.append({"name": "t1", "type": "time", "time_channel": 1}) + + # Time channel 2 for aux (if aux channels present) + if header.get("aux_input_channels"): + channels.append({"name": "t2", "type": "time", "time_channel": 2}) + + for intan_type in intan_channel_types: + if intan_type in header and header[intan_type]: + channel_type_entry = IntanRHD.intanheadertype2mfdaqchanneltype(intan_type) + for ch in header[intan_type]: + name = IntanRHD.intanname2mfdaqname(channel_type_entry, ch) + time_channel = 2 if channel_type_entry == "auxiliary_in" else 1 + channels.append( + { + "name": name, + "type": channel_type_entry, + "time_channel": time_channel, + } + ) + + return channels + + def underlying_datatype( + self, + epochstreams: list[str], + epoch_select: int, + channeltype: str, + channel: int | list[int], + ) -> tuple[str, np.ndarray, int]: + """Get the underlying data type for a channel.""" + if channeltype in ("analog_in", "analog_out"): + return "uint16", np.array([32768, 0.195]), 16 + elif channeltype == "auxiliary_in": + return "uint16", np.array([0, 3.74e-05]), 16 + elif channeltype == "time": + return "float64", np.array([0, 1]), 64 + elif channeltype in ("digital_in", "digital_out"): + return "char", np.array([0, 1]), 8 + elif channeltype in ("eventmarktext", "event", "marker", "text"): + return "float64", np.array([0, 1]), 64 + else: + raise ValueError(f"Unknown channel type '{channeltype}'.") + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + filename, parentdir, isdirectory = self.filenamefromepochfiles(epochstreams) + + intanchanneltype = IntanRHD.mfdaqchanneltype2intanchanneltype(channeltype) + + if isinstance(channel, int): + channel = [channel] + + sr = self.samplerate(epochstreams, epoch_select, channeltype, channel[0]) + sr_arr = np.atleast_1d(np.asarray(sr, dtype=float)) + sr_unique = np.unique(sr_arr) + if len(sr_unique) != 1: + raise ValueError("Do not know how to handle different sampling rates across channels.") + sr_val = float(sr_unique[0]) + + t0 = (s0 - 1) / sr_val + t1 = (s1 - 1) / sr_val + + if intanchanneltype == "time": + channel = [1] + + if not isdirectory: + data = read_Intan_RHD2000_datafile(filename, "", intanchanneltype, channel, t0, t1) + else: + # Directory-based reading (one-file-per-channel mode) + # For now, delegate to the single file reader + raise NotImplementedError("Directory-based Intan reading not yet implemented.") + + return data + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events (not supported for Intan RHD, returns empty).""" + return np.array([]), np.array([]) + + def samplerate( + self, + epochstreams: list[str], + epoch_select: int, + channeltype: str | list[str], + channel: int | list[int], + ) -> float | np.ndarray: + """Get the sample rate for specific channels.""" + if epoch_select != 1: + raise ValueError("Intan RHD files have 1 epoch per file.") + + filename = self.filenamefromepochfiles(epochstreams)[0] + header = read_Intan_RHD2000_header(filename) + + if isinstance(channel, int): + channel = [channel] + + sr_list = [] + for i in range(len(channel)): + if isinstance(channeltype, list): + ct = channeltype[i] + else: + ct = channeltype + + freq_field = IntanRHD.mfdaqchanneltype2intanfreqheader(ct) + sr_list.append(header["frequency_parameters"][freq_field]) + + if len(sr_list) == 1: + return sr_list[0] + return np.array(sr_list) + + def filenamefromepochfiles(self, filename_array: list[str]) -> tuple[str, str, bool]: + """Find the .rhd file from a list of epoch files. + + Returns + ------- + tuple of (filename, parentdir, isdirectory) + """ + rhd_files = [f for f in filename_array if f.lower().endswith(".rhd")] + + if len(rhd_files) > 1: + raise ValueError("Need only 1 .rhd file per epoch.") + elif len(rhd_files) == 0: + raise ValueError("Need 1 .rhd file per epoch.") + + filename = rhd_files[0] + parentdir = str(Path(filename).parent) + isdirectory = False + + # Check if this is the one-file-per-channel format + if Path(filename).stem == "info": + time_dat_files = [f for f in filename_array if f.endswith("time.dat")] + if time_dat_files: + isdirectory = True + + return filename, parentdir, isdirectory + + # ------------------------------------------------------------------ + # Static helper methods + # ------------------------------------------------------------------ + + @staticmethod + def mfdaqchanneltype2intanheadertype(channeltype: str) -> str: + """Convert ndr.reader.base channel types to Intan header field names.""" + mapping = { + "analog_in": "amplifier_channels", + "ai": "amplifier_channels", + "digital_in": "board_dig_in_channels", + "di": "board_dig_in_channels", + "digital_out": "board_dig_out_channels", + "do": "board_dig_out_channels", + "auxiliary": "aux_input_channels", + "aux": "aux_input_channels", + "ax": "aux_input_channels", + "auxiliary_in": "aux_input_channels", + "auxiliary_input": "aux_input_channels", + } + if channeltype not in mapping: + raise ValueError(f"Could not convert channeltype '{channeltype}'.") + return mapping[channeltype] + + @staticmethod + def intanheadertype2mfdaqchanneltype(intanchanneltype: str) -> str: + """Convert Intan header field names to ndr channel types.""" + mapping = { + "amplifier_channels": "analog_in", + "board_dig_in_channels": "digital_in", + "board_dig_out_channels": "digital_out", + "aux_input_channels": "auxiliary_in", + } + if intanchanneltype not in mapping: + raise ValueError(f"Could not convert channeltype '{intanchanneltype}'.") + return mapping[intanchanneltype] + + @staticmethod + def mfdaqchanneltype2intanchanneltype(channeltype: str) -> str: + """Convert ndr channel type to Intan internal channel type.""" + mapping = { + "analog_in": "amp", + "ai": "amp", + "digital_in": "din", + "di": "din", + "digital_out": "dout", + "do": "dout", + "time": "time", + "timestamp": "time", + "auxiliary": "aux", + "aux": "aux", + "auxiliary_input": "aux", + "auxiliary_in": "aux", + } + ct = channeltype.lower() + if ct not in mapping: + raise ValueError(f"Do not know how to convert channel type '{channeltype}'.") + return mapping[ct] + + @staticmethod + def intanchanneltype2mfdaqchanneltype(channeltype: str) -> str: + """Convert Intan internal channel type to ndr channel type.""" + mapping = { + "amp": "ai", + "din": "di", + "dout": "do", + "time": "time", + "aux": "ai", + } + ct = channeltype.lower() + if ct not in mapping: + raise ValueError(f"Do not know how to convert channel type '{channeltype}'.") + return mapping[ct] + + @staticmethod + def intanname2mfdaqname(channel_type: str, name_or_struct: str | dict[str, Any]) -> str: + """Convert Intan native channel name to ndr.reader.base format.""" + if isinstance(name_or_struct, dict): + name = name_or_struct["native_channel_name"] + chip_channel = name_or_struct.get("chip_channel") + else: + name = name_or_struct + chip_channel = None + + chan = None + + # Try to find separator '-' + sep_idx = name.rfind("-") + if sep_idx >= 0: + try: + chan_intan = int(name[sep_idx + 1 :]) + chan = chan_intan + 1 # Intan numbers from 0 + except ValueError: + pass + else: + # Try to find trailing digits + m = re.search(r"\d+$", name) + if m: + chan_intan = int(m.group()) + if channel_type.startswith("aux") or channel_type.startswith("ax"): + chan = chan_intan # Aux channels are 1-based + else: + chan = chan_intan + 1 + + if chan is None and chip_channel is not None: + chan = chip_channel + 1 + + prefix = Base.mfdaq_prefix(channel_type) + if chan is None: + return prefix + return f"{prefix}{chan}" + + @staticmethod + def mfdaqchanneltype2intanfreqheader(channeltype: str) -> str: + """Return the header field name for frequency info for a channel type.""" + mapping = { + "analog_in": "amplifier_sample_rate", + "ai": "amplifier_sample_rate", + "digital_in": "board_dig_in_sample_rate", + "di": "board_dig_in_sample_rate", + "time": "amplifier_sample_rate", + "timestamp": "amplifier_sample_rate", + "auxiliary": "aux_input_sample_rate", + "aux": "aux_input_sample_rate", + "auxiliary_in": "aux_input_sample_rate", + } + ct = channeltype.lower() + if ct not in mapping: + raise ValueError(f"Do not know frequency header for channel type '{channeltype}'.") + return mapping[ct] + + @staticmethod + def intananychannelname2intanchanneltype( + intananychannelname: str, + ) -> tuple[str, bool]: + """Convert any channel name to Intan channel type. + + Returns + ------- + tuple of (str, bool) + (intan_channel_type, is_absolute_reference) + """ + # First try as a standard ndr channel type + try: + return IntanRHD.mfdaqchanneltype2intanchanneltype(intananychannelname), False + except ValueError: + pass + + # Try as Intan bank name + name_lower = intananychannelname.lower() + if name_lower in ("a", "b", "c", "d"): + return "amp", True + elif name_lower in ("aaux", "baux", "caux", "daux"): + return "aux", True + elif name_lower in ("avdd1", "bvdd1", "cvdd1", "dvdd1"): + return "supply", True + else: + raise ValueError(f"Do not know how to convert channel bank '{intananychannelname}'.") diff --git a/src/ndr/reader/ndr_matlab_python_bridge.yaml b/src/ndr/reader/ndr_matlab_python_bridge.yaml new file mode 100644 index 0000000..f358e00 --- /dev/null +++ b/src/ndr/reader/ndr_matlab_python_bridge.yaml @@ -0,0 +1,212 @@ +project_metadata: + bridge_version: "1.1" + naming_policy: "Strict MATLAB Mirror" + indexing_policy: "Semantic Parity (1-based for user concepts, 0-based for internal data)" + +classes: + - name: base + type: class + matlab_path: "+ndr/+reader/base.m" + python_path: "ndr/reader/base.py" + python_class: "Base" + properties: + - name: MightHaveTimeGaps + type_python: "bool" + methods: + - name: canbereadtogether + input_arguments: + - name: channelstruct + type_python: "list[dict]" + output_arguments: + - name: b + type_python: "bool" + - name: errormsg + type_python: "str" + - name: daqchannels2internalchannels + input_arguments: + - name: channelprefix + type_python: "list[str]" + - name: channelnumber + type_python: "list[int]" + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + output_arguments: + - name: channelstruct + type_python: "list[dict]" + - name: epochclock + input_arguments: + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + output_arguments: + - name: ec + type_python: "list[ClockType]" + - name: getchannelsepoch + input_arguments: + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + output_arguments: + - name: channels + type_python: "list[dict]" + - name: underlying_datatype + input_arguments: + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + - name: channeltype + type_python: "str" + - name: channel + type_python: "int | list[int]" + output_arguments: + - name: datatype + type_python: "str" + - name: p + type_python: "numpy.ndarray" + - name: datasize + type_python: "int" + - name: readchannels_epochsamples + input_arguments: + - name: channeltype + type_python: "str" + - name: channel + type_python: "int | list[int]" + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + - name: s0 + type_python: "int" + - name: s1 + type_python: "int" + output_arguments: + - name: data + type_python: "numpy.ndarray" + - name: readevents_epochsamples_native + input_arguments: + - name: channeltype + type_python: "str" + - name: channel + type_python: "int | list[int]" + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + - name: t0 + type_python: "float" + - name: t1 + type_python: "float" + output_arguments: + - name: timestamps + type_python: "numpy.ndarray" + - name: data + type_python: "numpy.ndarray" + - name: samplerate + input_arguments: + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + - name: channeltype + type_python: "str" + - name: channel + type_python: "int | list[int]" + output_arguments: + - name: sr + type_python: "float | numpy.ndarray" + - name: t0_t1 + input_arguments: + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + output_arguments: + - name: t0t1 + type_python: "list[list[float]]" + - name: samples2times + input_arguments: + - name: channeltype + type_python: "str" + - name: channel + type_python: "int | list[int]" + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + - name: s + type_python: "numpy.ndarray | int" + output_arguments: + - name: t + type_python: "numpy.ndarray" + - name: times2samples + input_arguments: + - name: channeltype + type_python: "str" + - name: channel + type_python: "int | list[int]" + - name: epochstreams + type_python: "list[str]" + - name: epoch_select + type_python: "int" + - name: t + type_python: "numpy.ndarray | float" + output_arguments: + - name: s + type_python: "numpy.ndarray" + static_methods: + - name: mfdaq_channeltypes + output_arguments: + - name: ct + type_python: "list[str]" + - name: mfdaq_prefix + input_arguments: + - name: channeltype + type_python: "str" + output_arguments: + - name: prefix + type_python: "str" + - name: mfdaq_type + input_arguments: + - name: channeltype + type_python: "str" + output_arguments: + - name: type + type_python: "str" + + - name: intan_rhd + type: class + matlab_path: "+ndr/+reader/intan_rhd.m" + python_path: "ndr/reader/intan_rhd.py" + python_class: "IntanRHD" + inherits: base + methods: + - name: daqchannels2internalchannels + - name: t0_t1 + - name: getchannelsepoch + - name: underlying_datatype + - name: readchannels_epochsamples + - name: samplerate + - name: filenamefromepochfiles + input_arguments: + - name: filename_array + type_python: "list[str]" + output_arguments: + - name: filename + type_python: "str" + - name: parentdir + type_python: "str" + - name: isdirectory + type_python: "bool" + static_methods: + - name: mfdaqchanneltype2intanheadertype + - name: intanheadertype2mfdaqchanneltype + - name: mfdaqchanneltype2intanchanneltype + - name: intanchanneltype2mfdaqchanneltype + - name: intanname2mfdaqname + - name: mfdaqchanneltype2intanfreqheader + - name: intananychannelname2intanchanneltype diff --git a/src/ndr/reader/neo.py b/src/ndr/reader/neo.py new file mode 100644 index 0000000..fa909d6 --- /dev/null +++ b/src/ndr/reader/neo.py @@ -0,0 +1,80 @@ +"""Neo reader class. + +Port of +ndr/+reader/neo.m +Wraps the Python neo library to provide NDR-compatible interface. +""" + +from __future__ import annotations + +from typing import Any + +import numpy as np + +from ndr.reader.base import Base + + +class NeoReader(Base): + """Reader that wraps the Python neo library. + + Port of ndr.reader.neo. Provides NDR interface for any format + supported by the neo library. + """ + + def __init__(self) -> None: + super().__init__() + + def daqchannels2internalchannels( + self, + channelprefix: list[str], + channelstring: str | list[str], + epochstreams: list[str], + epoch_select: int = 1, + ) -> list[dict[str, Any]]: + """Convert channel names to internal structures. + + For Neo reader, channelstring is a list of native channel names. + """ + try: + import neo # noqa: F401 + except ImportError as err: + raise ImportError("neo is required for the Neo reader.") from err + + if isinstance(channelstring, str): + channelstring = [channelstring] + + channelstruct: list[dict[str, Any]] = [] + for name in channelstring: + channelstruct.append( + { + "internal_type": "analog_in", + "internal_number": 0, + "internal_channelname": name, + "ndr_type": "analog_in", + "samplerate": float("nan"), + } + ) + return channelstruct + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + raise NotImplementedError("Neo reader: use the specific neo IO classes directly.") + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events.""" + return np.array([]), np.array([]) diff --git a/src/ndr/reader/spikegadgets_rec.py b/src/ndr/reader/spikegadgets_rec.py new file mode 100644 index 0000000..bff6927 --- /dev/null +++ b/src/ndr/reader/spikegadgets_rec.py @@ -0,0 +1,44 @@ +"""SpikeGadgets REC reader class. + +Port of +ndr/+reader/spikegadgets_rec.m +""" + +from __future__ import annotations + +import numpy as np + +from ndr.reader.base import Base + + +class SpikeGadgetsRec(Base): + """Reader for SpikeGadgets .rec file format. + + Port of ndr.reader.spikegadgets_rec. + """ + + def __init__(self) -> None: + super().__init__() + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + raise NotImplementedError("SpikeGadgets reader not yet fully implemented.") + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events.""" + return np.array([]), np.array([]) diff --git a/src/ndr/reader/tdt_sev.py b/src/ndr/reader/tdt_sev.py new file mode 100644 index 0000000..3f7193f --- /dev/null +++ b/src/ndr/reader/tdt_sev.py @@ -0,0 +1,45 @@ +"""TDT SEV reader class. + +Port of +ndr/+reader/tdt_sev.m +Reads Tucker-Davis Technologies (.sev) files using the tdt library. +""" + +from __future__ import annotations + +import numpy as np + +from ndr.reader.base import Base + + +class TdtSev(Base): + """Reader for TDT .sev file format. + + Port of ndr.reader.tdt_sev. + """ + + def __init__(self) -> None: + super().__init__() + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + raise NotImplementedError("TDT reader not yet fully implemented.") + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events.""" + return np.array([]), np.array([]) diff --git a/src/ndr/reader/whitematter.py b/src/ndr/reader/whitematter.py new file mode 100644 index 0000000..2cbe9d1 --- /dev/null +++ b/src/ndr/reader/whitematter.py @@ -0,0 +1,44 @@ +"""White Matter reader class. + +Port of +ndr/+reader/whitematter.m +""" + +from __future__ import annotations + +import numpy as np + +from ndr.reader.base import Base + + +class WhiteMatter(Base): + """Reader for White Matter file format. + + Port of ndr.reader.whitematter. + """ + + def __init__(self) -> None: + super().__init__() + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read data from specified channels.""" + raise NotImplementedError("WhiteMatter reader not yet fully implemented.") + + def readevents_epochsamples_native( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[np.ndarray, np.ndarray]: + """Read events.""" + return np.array([]), np.array([]) diff --git a/src/ndr/reader_wrapper.py b/src/ndr/reader_wrapper.py new file mode 100644 index 0000000..3a66ed4 --- /dev/null +++ b/src/ndr/reader_wrapper.py @@ -0,0 +1,332 @@ +"""High-level NDR reader wrapper. + +Port of +ndr/reader.m + +This module provides the Reader class that wraps format-specific reader +subclasses and provides a unified interface for reading neural data. +""" + +from __future__ import annotations + +import importlib +from typing import Any + +import numpy as np + +from ndr.fun.ndrresource import ndrresource +from ndr.reader.base import Base +from ndr.string.channelstring2channels import channelstring2channels +from ndr.time.clocktype import ClockType + + +class Reader: + """High-level Neuroscience Data Reader. + + Wraps a format-specific reader (subclass of ndr.reader.base.Base) + and provides a unified interface. + + Port of ndr.reader (the MATLAB class, not the package). + + Parameters + ---------- + ndr_reader_type : str + Data format identifier (e.g., 'intan', 'rhd', 'abf', 'smr'). + """ + + def __init__(self, ndr_reader_type: str) -> None: + j = ndrresource("ndr_reader_types.json") + match = None + for entry in j: + if ndr_reader_type.lower() in [t.lower() for t in entry["type"]]: + match = entry + break + + if match is None: + raise ValueError(f"Do not know how to make a reader of type '{ndr_reader_type}'.") + + # Dynamically import and instantiate the reader class + classname = match["classname"] + parts = classname.rsplit(".", 1) + module_path = parts[0] + class_name = parts[1] + module = importlib.import_module(module_path) + cls = getattr(module, class_name) + self.ndr_reader_base: Base = cls() + + def read( + self, + epochstreams: list[str], + channelstring: str | list[str], + *, + t0: float = float("-inf"), + t1: float = float("inf"), + epoch_select: int = 1, + useSamples: bool = False, + s0: float = float("nan"), + s1: float = float("nan"), + ) -> tuple[np.ndarray, np.ndarray]: + """Read data or time information from specified channels and epoch. + + Parameters + ---------- + epochstreams : list of str + File paths comprising the epoch. + channelstring : str or list of str + Channel specification (e.g., 'ai1-3,5', 'ai1+di1'). + t0 : float + Start time in seconds (-inf for earliest). + t1 : float + End time in seconds (inf for latest). + epoch_select : int + Epoch index within the file (usually 1). + useSamples : bool + If True, interpret s0/s1 as sample numbers instead of times. + s0 : float + Start sample number (1-based) if useSamples is True. + s1 : float + End sample number (1-based) if useSamples is True. + + Returns + ------- + tuple of (numpy.ndarray, numpy.ndarray) + (data, time) + """ + is_neo = type(self.ndr_reader_base).__name__ == "NeoReader" + + if is_neo: + channelstruct = self.ndr_reader_base.daqchannels2internalchannels( + [], channelstring, epochstreams, epoch_select + ) + else: + channelprefix, channelnumber = channelstring2channels(channelstring) + channelstruct = self.ndr_reader_base.daqchannels2internalchannels( + channelprefix, channelnumber, epochstreams, epoch_select + ) + + b, errormsg = self.ndr_reader_base.canbereadtogether(channelstruct) + + if not b: + raise ValueError( + "Specified channels cannot be read in a single function call. " + f"Please split channel reading by similar channel types. {errormsg}" + ) + + ndr_type = channelstruct[0]["ndr_type"] + + if ndr_type in ( + "analog_input", + "analog_output", + "analog_in", + "analog_out", + "ai", + "ao", + ): + if not useSamples: + sr = channelstruct[0]["samplerate"] + s0 = round(1 + t0 * sr) + s1 = round(1 + t1 * sr) + + if is_neo: + channels = channelstring + else: + channels = [ch["internal_number"] for ch in channelstruct] + + data = self.readchannels_epochsamples( + channelstruct[0]["internal_type"], + channels, + epochstreams, + epoch_select, + int(s0), + int(s1), + ) + time = self.readchannels_epochsamples( + "time", + channels, + epochstreams, + epoch_select, + int(s0), + int(s1), + ) + else: + if is_neo: + channels = channelstring + else: + channels = [ch["internal_number"] for ch in channelstruct] + + data, time = self.readevents_epochsamples( + [ch["internal_type"] for ch in channelstruct], + channels, + epochstreams, + epoch_select, + t0, + t1, + ) + + return data, time + + def epochclock(self, epochstreams: list[str], epoch_select: int = 1) -> list[ClockType]: + """Return the clock types for an epoch.""" + return self.ndr_reader_base.epochclock(epochstreams, epoch_select) + + def t0_t1(self, epochstreams: list[str], epoch_select: int = 1) -> list[list[float]]: + """Return the beginning and end times for an epoch.""" + return self.ndr_reader_base.t0_t1(epochstreams, epoch_select) + + def getchannelsepoch( + self, epochstreams: list[str], epoch_select: int = 1 + ) -> list[dict[str, Any]]: + """List channels available for a given epoch.""" + return self.ndr_reader_base.getchannelsepoch(epochstreams, epoch_select) + + def underlying_datatype( + self, + epochstreams: list[str], + epoch_select: int, + channeltype: str, + channel: int | list[int], + ) -> tuple[str, np.ndarray, int]: + """Get the native data type for specified channels.""" + return self.ndr_reader_base.underlying_datatype( + epochstreams, epoch_select, channeltype, channel + ) + + def readchannels_epochsamples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s0: int, + s1: int, + ) -> np.ndarray: + """Read regularly sampled data channels.""" + return self.ndr_reader_base.readchannels_epochsamples( + channeltype, channel, epochstreams, epoch_select, s0, s1 + ) + + def readevents_epochsamples( + self, + channeltype: list[str], + channel: list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[Any, Any]: + """Read event/marker data or derive events from digital channels.""" + derived_types = {"dep", "den", "dimp", "dimn"} + + if set(channeltype) & derived_types: + timestamps_list: list[np.ndarray] = [] + data_list: list[np.ndarray] = [] + + for i, ch in enumerate(channel): + srd = self.samplerate(epochstreams, epoch_select, "di", ch) + s0d = 1 + round(srd * t0) + s1d = 1 + round(srd * t1) + + data_here = self.readchannels_epochsamples( + "di", [ch], epochstreams, epoch_select, s0d, s1d + ) + time_here = self.readchannels_epochsamples( + "time", [ch], epochstreams, epoch_select, s0d, s1d + ) + + data_flat = data_here.flatten() + time_flat = time_here.flatten() + + ct = channeltype[i] + if ct in ("dep", "dimp"): + on_samples = np.where((data_flat[:-1] == 0) & (data_flat[1:] == 1))[0] + if ct == "dimp": + off_samples = np.where((data_flat[:-1] == 1) & (data_flat[1:] == 0))[0] + 1 + else: + off_samples = np.array([], dtype=int) + elif ct in ("den", "dimn"): + on_samples = np.where((data_flat[:-1] == 1) & (data_flat[1:] == 0))[0] + if ct == "dimn": + off_samples = np.where((data_flat[:-1] == 0) & (data_flat[1:] == 1))[0] + 1 + else: + off_samples = np.array([], dtype=int) + else: + on_samples = np.array([], dtype=int) + off_samples = np.array([], dtype=int) + + ts = np.concatenate([time_flat[on_samples], time_flat[off_samples]]) + d = np.concatenate( + [ + np.ones(len(on_samples)), + -np.ones(len(off_samples)), + ] + ) + + if len(off_samples) > 0: + order = np.argsort(ts) + ts = ts[order] + d = d[order] + + timestamps_list.append(ts) + data_list.append(d) + + if len(channel) == 1: + return timestamps_list[0], data_list[0] + return timestamps_list, data_list + else: + return self.readevents_epochsamples_native( + channeltype, channel, epochstreams, epoch_select, t0, t1 + ) + + def readevents_epochsamples_native( + self, + channeltype: list[str], + channel: list[int], + epochstreams: list[str], + epoch_select: int, + t0: float, + t1: float, + ) -> tuple[Any, Any]: + """Read native event/marker channels.""" + return self.ndr_reader_base.readevents_epochsamples_native( + channeltype, channel, epochstreams, epoch_select, t0, t1 + ) + + def samplerate( + self, + epochstreams: list[str], + epoch_select: int, + channeltype: str, + channel: int | list[int], + ) -> Any: + """Get the sample rate for specific channels.""" + return self.ndr_reader_base.samplerate(epochstreams, epoch_select, channeltype, channel) + + def samples2times( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + s: np.ndarray | int | float, + ) -> np.ndarray: + """Convert sample numbers to time.""" + return self.ndr_reader_base.samples2times( + channeltype, channel, epochstreams, epoch_select, s + ) + + def times2samples( + self, + channeltype: str, + channel: int | list[int], + epochstreams: list[str], + epoch_select: int, + t: np.ndarray | float, + ) -> np.ndarray: + """Convert time to sample numbers.""" + return self.ndr_reader_base.times2samples( + channeltype, channel, epochstreams, epoch_select, t + ) + + @property + def MightHaveTimeGaps(self) -> bool: + """Whether the underlying reader might have time gaps.""" + return self.ndr_reader_base.MightHaveTimeGaps diff --git a/src/ndr/resource/ndr_reader_types.json b/src/ndr/resource/ndr_reader_types.json new file mode 100644 index 0000000..9740b5b --- /dev/null +++ b/src/ndr/resource/ndr_reader_types.json @@ -0,0 +1,38 @@ +[ + { + "type": ["intan_rhd", "intan", "rhd"], + "classname": "ndr.reader.intan_rhd.IntanRHD" + }, + { + "type": ["axon_abf", "axon", "abf"], + "classname": "ndr.reader.axon_abf.AxonABF" + }, + { + "type": ["ced_smr", "ced", "smr"], + "classname": "ndr.reader.ced_smr.CedSMR" + }, + { + "type": ["spikegadgets_rec", "spikegadgets", "rec"], + "classname": "ndr.reader.spikegadgets_rec.SpikeGadgetsRec" + }, + { + "type": ["tdt_sev", "tdt", "sev"], + "classname": "ndr.reader.tdt_sev.TdtSev" + }, + { + "type": ["neo"], + "classname": "ndr.reader.neo.NeoReader" + }, + { + "type": ["whitematter"], + "classname": "ndr.reader.whitematter.WhiteMatter" + }, + { + "type": ["bjg"], + "classname": "ndr.reader.bjg.BJG" + }, + { + "type": ["dabrowska"], + "classname": "ndr.reader.dabrowska.Dabrowska" + } +] diff --git a/src/ndr/string/__init__.py b/src/ndr/string/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/string/channelstring2channels.py b/src/ndr/string/channelstring2channels.py new file mode 100644 index 0000000..590bf81 --- /dev/null +++ b/src/ndr/string/channelstring2channels.py @@ -0,0 +1,64 @@ +"""Channel string parsing utilities. + +Port of +ndr/+string/channelstring2channels.m +""" + +from __future__ import annotations + +from ndr.string.str2intseq import str2intseq + + +def channelstring2channels(channelstring: str) -> tuple[list[str], list[int]]: + """Convert a channel string to arrays of channel prefixes and numbers. + + A channel string specifies channels with prefixes and numbers. + Use '-' for sequential ranges, ',' to enumerate, and '+' to separate + different channel prefix groups. + + Parameters + ---------- + channelstring : str + Channel specification string (e.g., 'ai1-3', 'ai1,3-5+di2-4'). + + Returns + ------- + tuple of (list[str], list[int]) + (channel_name_prefixes, channel_numbers) + + Examples + -------- + >>> channelstring2channels('a1,3-5,2') + (['a', 'a', 'a', 'a', 'a'], [1, 3, 4, 5, 2]) + >>> channelstring2channels('ai1-3+b2-4') + (['ai', 'ai', 'ai', 'b', 'b', 'b'], [1, 2, 3, 2, 3, 4]) + """ + channelstring = channelstring.strip() + block_separator = "+" + + blocks = channelstring.split(block_separator) + + channelnameprefix: list[str] = [] + channelnumber: list[int] = [] + + for block in blocks: + block = block.strip() + if not block: + continue + + # Find the first non-letter character + first_non_letter = None + for i, ch in enumerate(block): + if not ch.isalpha(): + first_non_letter = i + break + + if first_non_letter is None: + raise ValueError(f"No numbers provided in channel string segment '{block}'.") + + prefix = block[:first_non_letter] + numbers = str2intseq(block[first_non_letter:]) + + channelnameprefix.extend([prefix] * len(numbers)) + channelnumber.extend(numbers) + + return channelnameprefix, channelnumber diff --git a/src/ndr/string/intseq2str.py b/src/ndr/string/intseq2str.py new file mode 100644 index 0000000..e27414b --- /dev/null +++ b/src/ndr/string/intseq2str.py @@ -0,0 +1,41 @@ +"""Integer sequence to string conversion. + +Port of +ndr/+string/intseq2str.m +""" + +from __future__ import annotations + + +def intseq2str(seq: list[int]) -> str: + """Convert a list of integers to a compact range string. + + Parameters + ---------- + seq : list of int + Integer sequence. + + Returns + ------- + str + Compact string representation (e.g., [1,2,3,5] -> '1-3,5'). + """ + if not seq: + return "" + + sorted_seq = sorted(set(seq)) + parts: list[str] = [] + i = 0 + + while i < len(sorted_seq): + start = sorted_seq[i] + end = start + while i + 1 < len(sorted_seq) and sorted_seq[i + 1] == end + 1: + end = sorted_seq[i + 1] + i += 1 + if start == end: + parts.append(str(start)) + else: + parts.append(f"{start}-{end}") + i += 1 + + return ",".join(parts) diff --git a/src/ndr/string/str2intseq.py b/src/ndr/string/str2intseq.py new file mode 100644 index 0000000..e1f77ba --- /dev/null +++ b/src/ndr/string/str2intseq.py @@ -0,0 +1,40 @@ +"""String to integer sequence parsing. + +Port of +ndr/+string/str2intseq.m +""" + +from __future__ import annotations + + +def str2intseq(s: str) -> list[int]: + """Convert a string with comma-separated numbers and dash ranges to a list of ints. + + Parameters + ---------- + s : str + Number specification (e.g., '1,3-5,2' -> [1, 3, 4, 5, 2]). + + Returns + ------- + list of int + The parsed integer sequence. + """ + s = s.strip() + if not s: + return [] + + result: list[int] = [] + parts = s.split(",") + + for part in parts: + part = part.strip() + if "-" in part: + # Handle range like '3-5' + range_parts = part.split("-", 1) + start = int(range_parts[0].strip()) + end = int(range_parts[1].strip()) + result.extend(range(start, end + 1)) + else: + result.append(int(part)) + + return result diff --git a/src/ndr/time/__init__.py b/src/ndr/time/__init__.py new file mode 100644 index 0000000..dac1b64 --- /dev/null +++ b/src/ndr/time/__init__.py @@ -0,0 +1,6 @@ +"""NDR time utilities. + +Port of +ndr/+time/ +""" + +from ndr.time.clocktype import ClockType diff --git a/src/ndr/time/clocktype.py b/src/ndr/time/clocktype.py new file mode 100644 index 0000000..e3ad62e --- /dev/null +++ b/src/ndr/time/clocktype.py @@ -0,0 +1,62 @@ +"""NDR clock type definitions. + +Port of +ndr/+time/clocktype.m +""" + +from __future__ import annotations + +VALID_CLOCK_TYPES = ( + "utc", + "exp_global_time", + "dev_global_time", + "dev_local_time", + "approx_dev_local_time", + "dev_global_time_no_tz", + "exp_global_time_no_tz", + "no_time", + "inherited", +) + + +class ClockType: + """Represents a timing specification for neural data epochs. + + Port of ndr.time.clocktype. + """ + + def __init__(self, clock_type: str = "dev_local_time"): + self.type: str = "" + self.setclocktype(clock_type) + + def setclocktype(self, clock_type: str) -> None: + """Set the clock type, validating against known types.""" + if clock_type not in VALID_CLOCK_TYPES: + raise ValueError( + f"Unknown clock type '{clock_type}'. " + f"Valid types are: {', '.join(VALID_CLOCK_TYPES)}" + ) + self.type = clock_type + + def needsepoch(self) -> bool: + """Return True if this clock type requires an epoch reference.""" + return self.type == "dev_local_time" + + def ndr_clocktype2char(self) -> str: + """Return the clock type as a string.""" + return self.type + + def __eq__(self, other: object) -> bool: + if not isinstance(other, ClockType): + return NotImplemented + return self.type == other.type + + def __ne__(self, other: object) -> bool: + if not isinstance(other, ClockType): + return NotImplemented + return self.type != other.type + + def __repr__(self) -> str: + return f"ClockType('{self.type}')" + + def __str__(self) -> str: + return self.type diff --git a/src/ndr/time/fun/__init__.py b/src/ndr/time/fun/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/ndr/time/fun/samples2times.py b/src/ndr/time/fun/samples2times.py new file mode 100644 index 0000000..a6902cf --- /dev/null +++ b/src/ndr/time/fun/samples2times.py @@ -0,0 +1,43 @@ +"""Convert sample index numbers to sample times. + +Port of +ndr/+time/+fun/samples2times.m +""" + +from __future__ import annotations + +import numpy as np + + +def samples2times( + s: np.ndarray | int | float, + t0_t1: tuple[float, float] | list[float], + sr: float, +) -> np.ndarray: + """Convert sample index numbers to sample times. + + Parameters + ---------- + s : array-like + Sample index numbers (1-based, matching MATLAB convention). + t0_t1 : tuple of (t0, t1) + The beginning and end times of the recording. + sr : float + The fixed sample rate in Hz. + + Returns + ------- + numpy.ndarray + Times corresponding to each sample. + """ + s = np.asarray(s, dtype=float) + t = (s - 1) / sr + t0_t1[0] + + # Handle -inf samples -> t0 + neg_inf = np.isinf(s) & (s < 0) + t[neg_inf] = t0_t1[0] + + # Handle +inf samples -> t1 + pos_inf = np.isinf(s) & (s > 0) + t[pos_inf] = t0_t1[1] + + return t diff --git a/src/ndr/time/fun/times2samples.py b/src/ndr/time/fun/times2samples.py new file mode 100644 index 0000000..f6b71e6 --- /dev/null +++ b/src/ndr/time/fun/times2samples.py @@ -0,0 +1,43 @@ +"""Convert sample times to sample index numbers. + +Port of +ndr/+time/+fun/times2samples.m +""" + +from __future__ import annotations + +import numpy as np + + +def times2samples( + t: np.ndarray | float, + t0_t1: tuple[float, float] | list[float], + sr: float, +) -> np.ndarray: + """Convert sample times to sample index numbers. + + Parameters + ---------- + t : array-like + Times of samples in seconds. + t0_t1 : tuple of (t0, t1) + The beginning and end times of the recording. + sr : float + The fixed sample rate in Hz. + + Returns + ------- + numpy.ndarray + Sample index numbers (1-based, matching MATLAB convention). + """ + t = np.asarray(t, dtype=float) + s = 1 + np.round((t - t0_t1[0]) * sr) + + # Handle -inf times -> sample 1 + neg_inf = np.isinf(t) & (t < 0) + s[neg_inf] = 1 + + # Handle +inf times -> last sample + pos_inf = np.isinf(t) & (t > 0) + s[pos_inf] = 1 + sr * (t0_t1[1] - t0_t1[0]) + + return s diff --git a/src/ndr/time/ndr_matlab_python_bridge.yaml b/src/ndr/time/ndr_matlab_python_bridge.yaml new file mode 100644 index 0000000..b9af53c --- /dev/null +++ b/src/ndr/time/ndr_matlab_python_bridge.yaml @@ -0,0 +1,40 @@ +project_metadata: + bridge_version: "1.1" + naming_policy: "Strict MATLAB Mirror" + indexing_policy: "Semantic Parity (1-based for user concepts, 0-based for internal data)" + +classes: + - name: clocktype + type: class + matlab_path: "+ndr/+time/clocktype.m" + python_path: "ndr/time/clocktype.py" + python_class: "ClockType" + +functions: + - name: samples2times + matlab_path: "+ndr/+time/+fun/samples2times.m" + python_path: "ndr/time/fun/samples2times.py" + input_arguments: + - name: s + type_python: "numpy.ndarray | int | float" + - name: t0_t1 + type_python: "tuple[float, float]" + - name: sr + type_python: "float" + output_arguments: + - name: t + type_python: "numpy.ndarray" + + - name: times2samples + matlab_path: "+ndr/+time/+fun/times2samples.m" + python_path: "ndr/time/fun/times2samples.py" + input_arguments: + - name: t + type_python: "numpy.ndarray | float" + - name: t0_t1 + type_python: "tuple[float, float]" + - name: sr + type_python: "float" + output_arguments: + - name: s + type_python: "numpy.ndarray" diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/symmetry/__init__.py b/tests/symmetry/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/symmetry/conftest.py b/tests/symmetry/conftest.py new file mode 100644 index 0000000..531dad1 --- /dev/null +++ b/tests/symmetry/conftest.py @@ -0,0 +1,12 @@ +"""Shared fixtures and configuration for NDR symmetry tests.""" + +import tempfile +from pathlib import Path + +# Base directory where all symmetry artifacts live: +# /NDR/symmetryTest///// +SYMMETRY_BASE = Path(tempfile.gettempdir()) / "NDR" / "symmetryTest" +PYTHON_ARTIFACTS = SYMMETRY_BASE / "pythonArtifacts" +MATLAB_ARTIFACTS = SYMMETRY_BASE / "matlabArtifacts" + +SOURCE_TYPES = ["matlabArtifacts", "pythonArtifacts"] diff --git a/tests/symmetry/make_artifacts/__init__.py b/tests/symmetry/make_artifacts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/symmetry/make_artifacts/reader/__init__.py b/tests/symmetry/make_artifacts/reader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/symmetry/make_artifacts/reader/test_read_data.py b/tests/symmetry/make_artifacts/reader/test_read_data.py new file mode 100644 index 0000000..bb4c863 --- /dev/null +++ b/tests/symmetry/make_artifacts/reader/test_read_data.py @@ -0,0 +1,59 @@ +"""Generate symmetry artifacts for NDR reader tests. + +Reads example data files using NDR readers and exports: +- Channel metadata (names, types, sample rates) +- Epoch clock types and t0/t1 boundaries +- Actual data samples for comparison +""" + +import json +import shutil +from pathlib import Path + +import pytest + +from tests.symmetry.conftest import PYTHON_ARTIFACTS + +ARTIFACT_DIR = PYTHON_ARTIFACTS / "reader" / "readData" / "testReadDataArtifacts" +EXAMPLE_DATA = Path(__file__).parents[4] / "example_data" + + +class TestReadData: + @pytest.fixture(autouse=True) + def _setup(self): + rhd_file = EXAMPLE_DATA / "Intan_160317_125049_short.rhd" + if not rhd_file.exists(): + pytest.skip("Example RHD file not available") + + from ndr.reader.intan_rhd import IntanRHD + + self.reader = IntanRHD() + self.epochfiles = [str(rhd_file)] + + def test_read_data_artifacts(self): + if ARTIFACT_DIR.exists(): + shutil.rmtree(ARTIFACT_DIR) + ARTIFACT_DIR.mkdir(parents=True) + + # Export channel metadata + channels = self.reader.getchannelsepoch(self.epochfiles) + sr = self.reader.samplerate(self.epochfiles, 1, "ai", 1) + t0t1 = self.reader.t0_t1(self.epochfiles) + ec = self.reader.epochclock(self.epochfiles) + + metadata = { + "channels": channels, + "samplerate": sr, + "t0_t1": t0t1[0], + "epochclock": [str(c) for c in ec], + } + (ARTIFACT_DIR / "metadata.json").write_text( + json.dumps(metadata, indent=2, default=str), encoding="utf-8" + ) + + # Export a small data sample + data = self.reader.readchannels_epochsamples("ai", [1], self.epochfiles, 1, 1, 100) + (ARTIFACT_DIR / "readData.json").write_text( + json.dumps({"ai_channel_1_samples_1_100": data.flatten().tolist()}, indent=2), + encoding="utf-8", + ) diff --git a/tests/symmetry/read_artifacts/__init__.py b/tests/symmetry/read_artifacts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/symmetry/read_artifacts/reader/__init__.py b/tests/symmetry/read_artifacts/reader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/symmetry/read_artifacts/reader/test_read_data.py b/tests/symmetry/read_artifacts/reader/test_read_data.py new file mode 100644 index 0000000..1887b70 --- /dev/null +++ b/tests/symmetry/read_artifacts/reader/test_read_data.py @@ -0,0 +1,66 @@ +"""Read and verify symmetry artifacts for NDR reader tests.""" + +import json +from pathlib import Path + +import numpy as np +import pytest + +from tests.symmetry.conftest import SOURCE_TYPES, SYMMETRY_BASE + +EXAMPLE_DATA = Path(__file__).parents[4] / "example_data" + + +@pytest.fixture(params=SOURCE_TYPES) +def source_type(request): + return request.param + + +class TestReadData: + def _artifact_dir(self, source_type): + return SYMMETRY_BASE / source_type / "reader" / "readData" / "testReadDataArtifacts" + + def test_read_data_metadata(self, source_type): + artifact_dir = self._artifact_dir(source_type) + if not artifact_dir.exists(): + pytest.skip(f"No artifacts from {source_type}") + + metadata = json.loads((artifact_dir / "metadata.json").read_text()) + + rhd_file = EXAMPLE_DATA / "Intan_160317_125049_short.rhd" + if not rhd_file.exists(): + pytest.skip("Example RHD file not available") + + from ndr.reader.intan_rhd import IntanRHD + + reader = IntanRHD() + epochfiles = [str(rhd_file)] + + actual_sr = reader.samplerate(epochfiles, 1, "ai", 1) + assert ( + actual_sr == metadata["samplerate"] + ), f"Sample rate mismatch: {actual_sr} vs {metadata['samplerate']}" + + actual_t0_t1 = reader.t0_t1(epochfiles) + assert np.allclose(actual_t0_t1[0], metadata["t0_t1"], atol=1e-6) + + def test_read_data_samples(self, source_type): + artifact_dir = self._artifact_dir(source_type) + if not artifact_dir.exists(): + pytest.skip(f"No artifacts from {source_type}") + + read_data = json.loads((artifact_dir / "readData.json").read_text()) + expected = np.array(read_data["ai_channel_1_samples_1_100"]) + + rhd_file = EXAMPLE_DATA / "Intan_160317_125049_short.rhd" + if not rhd_file.exists(): + pytest.skip("Example RHD file not available") + + from ndr.reader.intan_rhd import IntanRHD + + reader = IntanRHD() + actual = reader.readchannels_epochsamples("ai", [1], [str(rhd_file)], 1, 1, 100) + + assert np.allclose( + actual.flatten(), expected, atol=1e-9 + ), f"Data mismatch for ai channel 1, samples 1-100 ({source_type})" diff --git a/tests/test_reader_base.py b/tests/test_reader_base.py new file mode 100644 index 0000000..61d38e5 --- /dev/null +++ b/tests/test_reader_base.py @@ -0,0 +1,154 @@ +"""Unit tests for ndr.reader.base.""" + +import numpy as np +import pytest + +from ndr.reader.base import Base +from ndr.time.clocktype import ClockType + + +class ConcreteReader(Base): + """Minimal concrete implementation for testing.""" + + def readchannels_epochsamples(self, channeltype, channel, epochstreams, epoch_select, s0, s1): + return np.zeros((s1 - s0 + 1, 1)) + + def readevents_epochsamples_native( + self, channeltype, channel, epochstreams, epoch_select, t0, t1 + ): + return np.array([]), np.array([]) + + +class TestBase: + def test_mfdaq_channeltypes(self): + ct = Base.mfdaq_channeltypes() + assert "analog_in" in ct + assert "digital_in" in ct + assert "time" in ct + assert len(ct) == 8 + + def test_mfdaq_prefix(self): + assert Base.mfdaq_prefix("analog_in") == "ai" + assert Base.mfdaq_prefix("ai") == "ai" + assert Base.mfdaq_prefix("digital_in") == "di" + assert Base.mfdaq_prefix("time") == "t" + assert Base.mfdaq_prefix("auxiliary") == "ax" + assert Base.mfdaq_prefix("event") == "e" + assert Base.mfdaq_prefix("marker") == "mk" + + def test_mfdaq_prefix_unknown(self): + with pytest.raises(ValueError, match="Unknown channel type"): + Base.mfdaq_prefix("nonexistent") + + def test_mfdaq_type(self): + assert Base.mfdaq_type("ai") == "analog_in" + assert Base.mfdaq_type("analog_in") == "analog_in" + assert Base.mfdaq_type("di") == "digital_in" + assert Base.mfdaq_type("time") == "time" + + def test_mfdaq_type_unknown(self): + with pytest.raises(ValueError, match="unknown"): + Base.mfdaq_type("nonexistent") + + def test_canbereadtogether_same_sr(self): + reader = ConcreteReader() + channels = [ + {"samplerate": 20000.0}, + {"samplerate": 20000.0}, + ] + b, msg = reader.canbereadtogether(channels) + assert b is True + assert msg == "" + + def test_canbereadtogether_different_sr(self): + reader = ConcreteReader() + channels = [ + {"samplerate": 20000.0}, + {"samplerate": 10000.0}, + ] + b, msg = reader.canbereadtogether(channels) + assert b is False + assert "same" in msg.lower() + + def test_canbereadtogether_all_nan(self): + reader = ConcreteReader() + channels = [ + {"samplerate": float("nan")}, + {"samplerate": float("nan")}, + ] + b, msg = reader.canbereadtogether(channels) + assert b is True + + def test_canbereadtogether_mixed_nan(self): + reader = ConcreteReader() + channels = [ + {"samplerate": 20000.0}, + {"samplerate": float("nan")}, + ] + b, msg = reader.canbereadtogether(channels) + assert b is False + + def test_epochclock_default(self): + reader = ConcreteReader() + ec = reader.epochclock(["test.rhd"]) + assert len(ec) == 1 + assert isinstance(ec[0], ClockType) + assert ec[0].type == "dev_local_time" + + def test_t0_t1_default(self): + reader = ConcreteReader() + t = reader.t0_t1(["test.rhd"]) + assert len(t) == 1 + assert np.isnan(t[0][0]) + assert np.isnan(t[0][1]) + + def test_getchannelsepoch_default(self): + reader = ConcreteReader() + channels = reader.getchannelsepoch(["test.rhd"]) + assert channels == [] + + def test_underlying_datatype(self): + reader = ConcreteReader() + dt, p, ds = reader.underlying_datatype(["test.rhd"], 1, "analog_in", 1) + assert dt == "float64" + assert ds == 64 + assert p.shape == (1, 2) + + def test_underlying_datatype_unknown(self): + reader = ConcreteReader() + with pytest.raises(ValueError, match="Unknown channel type"): + reader.underlying_datatype(["test.rhd"], 1, "nonexistent", 1) + + def test_might_have_time_gaps(self): + reader = ConcreteReader() + assert reader.MightHaveTimeGaps is False + + +class TestClockType: + def test_init_default(self): + ct = ClockType() + assert ct.type == "dev_local_time" + + def test_init_custom(self): + ct = ClockType("utc") + assert ct.type == "utc" + + def test_invalid_type(self): + with pytest.raises(ValueError, match="Unknown clock type"): + ClockType("invalid_type") + + def test_needsepoch(self): + assert ClockType("dev_local_time").needsepoch() is True + assert ClockType("utc").needsepoch() is False + + def test_equality(self): + a = ClockType("utc") + b = ClockType("utc") + c = ClockType("dev_local_time") + assert a == b + assert a != c + + def test_str_repr(self): + ct = ClockType("utc") + assert str(ct) == "utc" + assert "utc" in repr(ct) diff --git a/tests/test_readers.py b/tests/test_readers.py new file mode 100644 index 0000000..d6717e5 --- /dev/null +++ b/tests/test_readers.py @@ -0,0 +1,167 @@ +"""Unit tests for NDR readers.""" + +import numpy as np +import pytest + +from ndr.known_readers import known_readers +from ndr.reader.intan_rhd import IntanRHD +from ndr.string.channelstring2channels import channelstring2channels +from ndr.string.str2intseq import str2intseq +from ndr.time.fun.samples2times import samples2times +from ndr.time.fun.times2samples import times2samples + + +class TestIntanRHDStatic: + """Test static methods of IntanRHD that don't require data files.""" + + def test_mfdaqchanneltype2intanheadertype(self): + assert IntanRHD.mfdaqchanneltype2intanheadertype("analog_in") == "amplifier_channels" + assert IntanRHD.mfdaqchanneltype2intanheadertype("ai") == "amplifier_channels" + assert IntanRHD.mfdaqchanneltype2intanheadertype("digital_in") == "board_dig_in_channels" + + def test_intanheadertype2mfdaqchanneltype(self): + assert IntanRHD.intanheadertype2mfdaqchanneltype("amplifier_channels") == "analog_in" + assert IntanRHD.intanheadertype2mfdaqchanneltype("board_dig_in_channels") == "digital_in" + + def test_mfdaqchanneltype2intanchanneltype(self): + assert IntanRHD.mfdaqchanneltype2intanchanneltype("analog_in") == "amp" + assert IntanRHD.mfdaqchanneltype2intanchanneltype("digital_in") == "din" + assert IntanRHD.mfdaqchanneltype2intanchanneltype("time") == "time" + + def test_intanchanneltype2mfdaqchanneltype(self): + assert IntanRHD.intanchanneltype2mfdaqchanneltype("amp") == "ai" + assert IntanRHD.intanchanneltype2mfdaqchanneltype("din") == "di" + + def test_intananychannelname2intanchanneltype(self): + ct, absolute = IntanRHD.intananychannelname2intanchanneltype("ai") + assert ct == "amp" + assert absolute is False + + ct, absolute = IntanRHD.intananychannelname2intanchanneltype("A") + assert ct == "amp" + assert absolute is True + + def test_mfdaqchanneltype2intanfreqheader(self): + assert IntanRHD.mfdaqchanneltype2intanfreqheader("ai") == "amplifier_sample_rate" + assert IntanRHD.mfdaqchanneltype2intanfreqheader("auxiliary") == "aux_input_sample_rate" + + def test_intanname2mfdaqname_struct(self): + name = IntanRHD.intanname2mfdaqname( + "analog_in", + {"native_channel_name": "A-000", "chip_channel": 0}, + ) + assert name == "ai1" + + def test_intanname2mfdaqname_string(self): + name = IntanRHD.intanname2mfdaqname("analog_in", "A-000") + assert name == "ai1" + + def test_intanname2mfdaqname_aux(self): + name = IntanRHD.intanname2mfdaqname( + "auxiliary_in", + {"native_channel_name": "AUX1", "chip_channel": 0}, + ) + assert name == "ax1" + + def test_filenamefromepochfiles(self): + reader = IntanRHD() + fn, pdir, isdir = reader.filenamefromepochfiles(["/path/to/data.rhd"]) + assert fn == "/path/to/data.rhd" + assert isdir is False + + def test_filenamefromepochfiles_no_rhd(self): + reader = IntanRHD() + with pytest.raises(ValueError, match="Need 1 .rhd"): + reader.filenamefromepochfiles(["/path/to/data.abf"]) + + def test_filenamefromepochfiles_multiple_rhd(self): + reader = IntanRHD() + with pytest.raises(ValueError, match="Need only 1"): + reader.filenamefromepochfiles(["/path/a.rhd", "/path/b.rhd"]) + + +class TestKnownReaders: + def test_known_readers(self): + readers = known_readers() + assert len(readers) > 0 + # Check that intan is in there + flat = [t for entry in readers for t in entry] + assert "intan" in flat + assert "rhd" in flat + assert "abf" in flat + + +class TestChannelString: + def test_simple(self): + prefix, numbers = channelstring2channels("ai1") + assert prefix == ["ai"] + assert numbers == [1] + + def test_range(self): + prefix, numbers = channelstring2channels("ai1-3") + assert prefix == ["ai", "ai", "ai"] + assert numbers == [1, 2, 3] + + def test_comma(self): + prefix, numbers = channelstring2channels("ai1,3,5") + assert prefix == ["ai", "ai", "ai"] + assert numbers == [1, 3, 5] + + def test_mixed(self): + prefix, numbers = channelstring2channels("ai1-3+di2-4") + assert prefix == ["ai", "ai", "ai", "di", "di", "di"] + assert numbers == [1, 2, 3, 2, 3, 4] + + def test_no_numbers(self): + with pytest.raises(ValueError, match="No numbers"): + channelstring2channels("ai") + + +class TestStr2IntSeq: + def test_single(self): + assert str2intseq("5") == [5] + + def test_range(self): + assert str2intseq("1-3") == [1, 2, 3] + + def test_comma(self): + assert str2intseq("1,3,5") == [1, 3, 5] + + def test_mixed(self): + assert str2intseq("1,3-5,2") == [1, 3, 4, 5, 2] + + def test_empty(self): + assert str2intseq("") == [] + + +class TestTimeFunctions: + def test_samples2times(self): + t = samples2times(np.array([1, 2, 3]), (0.0, 1.0), 10.0) + np.testing.assert_allclose(t, [0.0, 0.1, 0.2]) + + def test_times2samples(self): + s = times2samples(np.array([0.0, 0.1, 0.2]), (0.0, 1.0), 10.0) + np.testing.assert_allclose(s, [1, 2, 3]) + + def test_samples2times_inf(self): + t = samples2times(np.array([-np.inf, np.inf]), (0.0, 1.0), 10.0) + assert t[0] == 0.0 + assert t[1] == 1.0 + + def test_times2samples_inf(self): + s = times2samples(np.array([-np.inf, np.inf]), (0.0, 1.0), 10.0) + assert s[0] == 1 + assert s[1] == 1 + 10.0 * 1.0 + + +class TestReaderWrapper: + def test_import(self): + import ndr + + assert hasattr(ndr, "reader") + + def test_unknown_type(self): + from ndr.reader_wrapper import Reader + + with pytest.raises(ValueError, match="Do not know"): + Reader("nonexistent_format")