From 7e903d857b05ae5dcf84c22a33ad90b953e7d6cb Mon Sep 17 00:00:00 2001
From: Derrick Chambers
Date: Fri, 1 May 2026 14:19:53 +0200
Subject: [PATCH 1/8] Add MiniSEED read support

---
 dascore/data_registry.txt              |   1 +
 dascore/io/mseed/__init__.py           |   5 +
 dascore/io/mseed/core.py               |  68 +++++
 dascore/io/mseed/utils.py              | 309 ++++++++++++++++++++++
 pyproject.toml                         |   3 +
 tests/test_io/test_common_io.py        |  27 +-
 tests/test_io/test_mseed/test_mseed.py | 343 +++++++++++++++++++++++++
 7 files changed, 747 insertions(+), 9 deletions(-)
 create mode 100644 dascore/io/mseed/__init__.py
 create mode 100644 dascore/io/mseed/core.py
 create mode 100644 dascore/io/mseed/utils.py
 create mode 100644 tests/test_io/test_mseed/test_mseed.py

diff --git a/dascore/data_registry.txt b/dascore/data_registry.txt
index caf9c249..1e8d8eb2 100644
--- a/dascore/data_registry.txt
+++ b/dascore/data_registry.txt
@@ -6,6 +6,7 @@ terra15_v6_test_file.hdf5 63f130ed93c1fe4105f2a355272d136a51abfe5c212a40be3a2907
 iDAS005_hdf5_example.626.h5 28f4f9b1f2b248fa0419b41c11e971f2d395c3ed43afc1ef5d5c35f399e99190 https://github.com/dasdae/test_data/raw/master/das/iDAS005_hdf5_example.626.h5
 iDAS005_tdms_example.626.tdms 0c31f75015bc671d958967c7fdf32e2e64fdb467f997d331ae6a7d4c989ab380 https://github.com/dasdae/test_data/raw/master/das/iDAS005_tdms_example.626.tdms
 sample_tdms_file_v4713.tdms 22a79c4c3166ce3cc8467265540cdb3ad2b54460209d39641c1e8f20e64eca65 https://github.com/dasdae/test_data/raw/master/das/sample_tdms_file_v4713.tdms
+etna_9n_3chan_10s.mseed e79608a6aa47cd0b70fa227679bc5d266af117ea4bc3e02e0c336c563b35b280 https://github.com/DASDAE/test_data/raw/master/das/etna_9n_3chan_10s.mseed
 example_dasdae_event_1.h5 854530af7ea10bab54dd0f6458c5d67d8b334ee6e4032b75702fa408352b5007 https://github.com/dasdae/test_data/raw/master/das/example_dasdae_event_1.h5
 opta_sense_quantx_v2.h5 fca8c8b19a9e253f9252c14de3f2a7778391a763f39142fffc63ab0cc3fa9cc7 https://github.com/dasdae/test_data/raw/master/das/opta_sense_quantx_v2.h5
 prodml_2.0.h5 8f8eded106acab0b4266c5c752cc702363cb09391f5fddf1534429e94e4e9ab2 https://github.com/dasdae/test_data/raw/master/das/prodml_2.0.h5
diff --git a/dascore/io/mseed/__init__.py b/dascore/io/mseed/__init__.py
new file mode 100644
index 00000000..83c12b31
--- /dev/null
+++ b/dascore/io/mseed/__init__.py
@@ -0,0 +1,5 @@
+"""DASCore support for MiniSEED files."""
+
+from __future__ import annotations
+
+from dascore.io.mseed.core import MSeedV2, MSeedV3
diff --git a/dascore/io/mseed/core.py b/dascore/io/mseed/core.py
new file mode 100644
index 00000000..834f9dd7
--- /dev/null
+++ b/dascore/io/mseed/core.py
@@ -0,0 +1,68 @@
+"""IO support for MiniSEED files.
+
+DASCore maps MiniSEED source IDs into 2D ``("channel", "time")`` patches.
+MiniSEED source identity is preserved as per-channel coordinates instead of
+scalar attrs because one DASCore patch can contain many MiniSEED sources.
+
+The mapping follows the FDSN source identifier convention, where a SEED NSLC
+code maps to ``FDSN:<network>_<station>_<location>_<band>_<source>_<subsource>``.
+See https://docs.fdsn.org/projects/source-identifiers/en/latest/definition.html.
+
+For DAS MiniSEED, GEOFON recommends representing each fiber sampling point as
+a station because each sampling point has its own position; the channel code
+``HSF`` is recommended for fiber optic DAS. See
+https://geofon.gfz.de/redmine/projects/redmine/wiki/DAS.
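+
+A minimal usage sketch (assumes the optional PyMseed dependency is installed
+and that ``path`` points at any MiniSEED file)::
+
+    import dascore as dc
+
+    spool = dc.read(path, file_format="MSEED")
+    patch = spool[0]  # a ("channel", "time") patch
+    stations = patch.coords.get_array("station")  # per-channel SEED codes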
+""" + +from __future__ import annotations + +import dascore as dc +from dascore.constants import SpoolType, opt_timeable_types +from dascore.io import FiberIO, ScanPayload +from dascore.utils.io import LocalPath +from dascore.utils.misc import optional_import + +from .utils import _detect_format, _get_patches, _scan_patches + + +class MSeedV2(FiberIO): + """Support MiniSEED version 2 files.""" + + name = "MSEED" + preferred_extensions = ("mseed", "msd", "miniseed") + version = "2" + + def get_format(self, path: LocalPath, **kwargs) -> tuple[str, str] | bool: + """Determine if path is a MiniSEED file.""" + pymseed = optional_import("pymseed") + return _detect_format(path, pymseed) + + def scan(self, path: LocalPath, **kwargs) -> list[ScanPayload]: + """Scan a MiniSEED file.""" + pymseed = optional_import("pymseed") + return _scan_patches(path, pymseed) + + def read( + self, + path: LocalPath, + time: tuple[opt_timeable_types, opt_timeable_types] | None = None, + channel: tuple[int | None, int | None] | None = None, + source_patch_id=(), + **kwargs, + ) -> SpoolType: + """Read a MiniSEED file.""" + pymseed = optional_import("pymseed") + patches = _get_patches( + path, + pymseed, + time=time, + channel=channel, + source_patch_id=source_patch_id, + ) + return dc.spool(patches) + + +class MSeedV3(MSeedV2): + """Support MiniSEED version 3 files.""" + + version = "3" diff --git a/dascore/io/mseed/utils.py b/dascore/io/mseed/utils.py new file mode 100644 index 00000000..817fa5c8 --- /dev/null +++ b/dascore/io/mseed/utils.py @@ -0,0 +1,309 @@ +"""Utilities for MiniSEED IO.""" + +from __future__ import annotations + +from dataclasses import dataclass, replace +from itertools import groupby +from math import ceil, floor + +import numpy as np + +import dascore as dc +from dascore.constants import ONE_BILLION +from dascore.core import get_coord +from dascore.io import ScanPayload +from dascore.io.core import _patch_to_scan_payload +from dascore.utils.io import LocalPath, _normalize_source_patch_ids +from dascore.utils.time import to_datetime64, to_int + +_TimeLimits = tuple[int | None, int | None] + + +@dataclass(frozen=True) +class _TraceSegment: + """A decoded contiguous MiniSEED segment from one source ID.""" + + source_id: str + network: str + station: str + location: str + seed_channel: str + format_version: str + start_ns: int + sample_rate: float + data: np.ndarray + sample_type: str + encoding: str + publication_version: int + record_length: int + + @property + def sample_count(self) -> int: + """Return the number of samples in the segment.""" + return len(self.data) + + @property + def sample_step_ns(self) -> int: + """Return the sample spacing in nanoseconds.""" + return _sample_step_ns(self.sample_rate) + + @property + def next_start_ns(self) -> int: + """Return the expected start time of the next contiguous segment.""" + return self.start_ns + self.sample_count * self.sample_step_ns + + +def _sample_step_ns(sample_rate: float) -> int: + """Convert a MiniSEED sample rate to nanosecond spacing.""" + if sample_rate == 0: + msg = "MiniSEED sample rate cannot be zero." 
+ raise ValueError(msg) + seconds = 1 / sample_rate if sample_rate > 0 else abs(sample_rate) + return round(seconds * ONE_BILLION) + + +def _time_to_ns(value) -> int | None: + """Convert a time-like value to epoch nanoseconds.""" + if value is None or value is ...: + return None + return int(to_int(to_datetime64(value))) + + +def _get_time_limits(time=None) -> _TimeLimits: + """Return optional time limits as epoch nanoseconds.""" + if time is None or time is ...: + return None, None + return _time_to_ns(time[0]), _time_to_ns(time[1]) + + +def _source_id_to_nslc(pymseed, source_id: str) -> tuple[str, str, str, str]: + """Return NSLC codes from a MiniSEED source ID.""" + try: + nslc = pymseed.sourceid2nslc(source_id) + except Exception: + return "", source_id, "", "" + return tuple("" if x is None else str(x) for x in nslc) + + +def _iter_records(path: LocalPath, pymseed, unpack_data: bool = True): + """Yield records from a MiniSEED file.""" + yield from pymseed.MS3Record.from_file(str(path), unpack_data=unpack_data) + + +def _record_overlaps_time(record, time_limits: _TimeLimits) -> bool: + """Return True if a record overlaps a requested time range.""" + start, stop = time_limits + record_start = int(record.starttime) + record_stop = int(record.endtime) + after_start = stop is None or record_start <= stop + before_stop = start is None or record_stop >= start + return after_start and before_stop + + +def _record_to_segment( + record, pymseed, time_limits: _TimeLimits +) -> _TraceSegment | None: + """Convert a PyMseed record to a trace segment.""" + if not _record_overlaps_time(record, time_limits): + return None + record.unpack_data() + data = np.asarray(record.np_datasamples) + network, station, location, seed_channel = _source_id_to_nslc( + pymseed, str(record.sourceid) + ) + segment = _TraceSegment( + source_id=str(record.sourceid), + network=network, + station=station, + location=location, + seed_channel=seed_channel, + format_version=str(record.formatversion), + start_ns=int(record.starttime), + sample_rate=float(record.samprate), + data=data.copy(), + sample_type=str(record.sampletype or ""), + encoding=str(record.encoding), + publication_version=int(getattr(record, "pubversion", 0) or 0), + record_length=int(getattr(record, "reclen", 0) or 0), + ) + return _trim_segment_time(segment, time_limits) + + +def _trim_segment_time( + segment: _TraceSegment, time_limits: _TimeLimits +) -> _TraceSegment | None: + """Trim a segment to a requested time range.""" + start, stop = time_limits + if start is None and stop is None: + return segment + step = segment.sample_step_ns + start_index = ( + 0 if start is None else max(0, ceil((start - segment.start_ns) / step)) + ) + stop_index = ( + segment.sample_count + if stop is None + else min(segment.sample_count, floor((stop - segment.start_ns) / step) + 1) + ) + if stop_index <= start_index: + return None + return replace( + segment, + start_ns=segment.start_ns + start_index * step, + data=segment.data[start_index:stop_index].copy(), + ) + + +def _coalesce_source_segments(segments: list[_TraceSegment]) -> list[_TraceSegment]: + """Merge contiguous records for the same source ID.""" + segments = sorted(segments, key=lambda x: (x.source_id, x.start_ns)) + out = [] + for source_id, source_group in groupby(segments, key=lambda x: x.source_id): + pending = None + for seg in source_group: + if pending is None: + pending = seg + continue + can_merge = ( + pending.source_id == source_id + and pending.sample_rate == seg.sample_rate + and 
pending.format_version == seg.format_version + and pending.next_start_ns == seg.start_ns + and pending.data.dtype == seg.data.dtype + ) + if can_merge: + pending = replace( + pending, + data=np.concatenate([pending.data, seg.data]), + record_length=max(pending.record_length, seg.record_length), + ) + else: + out.append(pending) + pending = seg + if pending is not None: + out.append(pending) + return out + + +def _read_segments(path: LocalPath, pymseed, time=None) -> list[_TraceSegment]: + """Read and coalesce decoded MiniSEED records.""" + segments = [] + time_limits = _get_time_limits(time) + for record in _iter_records(path, pymseed, unpack_data=False): + segment = _record_to_segment(record, pymseed, time_limits) + if segment is not None and segment.sample_count: + segments.append(segment) + return _coalesce_source_segments(segments) + + +def _get_group_key(segment: _TraceSegment): + """Return the compatibility key used to merge traces into a patch.""" + return ( + segment.format_version, + segment.network, + segment.location, + segment.seed_channel, + segment.start_ns, + segment.sample_rate, + ) + + +def _group_segments(segments: list[_TraceSegment]): + """Yield compatible segment groups.""" + segments = sorted(segments, key=lambda x: (_get_group_key(x), x.source_id)) + for key, group in groupby(segments, key=_get_group_key): + yield key, list(group) + + +def _source_patch_id(group_key) -> str: + """Return a stable source patch ID for a MiniSEED group.""" + version, network, location, seed_channel, start_ns, sample_rate = group_key + source_key = ".".join((network, location, seed_channel)) + return f"v{version}:{source_key}:{start_ns}:{sample_rate:g}" + + +def _patch_from_segments(group_key, segments: list[_TraceSegment]) -> dc.Patch: + """Create a DASCore Patch from compatible MiniSEED trace segments.""" + segments = sorted(segments, key=lambda x: (x.station, x.source_id)) + first = segments[0] + sample_count = min(x.sample_count for x in segments) + source_ids = tuple(x.source_id for x in segments) + data = np.stack([x.data[:sample_count] for x in segments]) + step = np.timedelta64(first.sample_step_ns, "ns") + start = np.datetime64(first.start_ns, "ns") + stop = start + step * sample_count + coords = { + "channel": np.arange(len(segments)), + "time": get_coord(start=start, stop=stop, step=step), + "source_id": (("channel",), source_ids), + "network": (("channel",), tuple(x.network for x in segments)), + "station": (("channel",), tuple(x.station for x in segments)), + "location": (("channel",), tuple(x.location for x in segments)), + "seed_channel": (("channel",), tuple(x.seed_channel for x in segments)), + } + attrs = { + "data_type": "", + "tag": ".".join((first.network, first.location, first.seed_channel)), + "sample_rate": first.sample_rate, + "sample_type": first.sample_type, + "mseed_encoding": first.encoding, + "mseed_publication_version": first.publication_version, + "mseed_record_length": first.record_length, + "_source_patch_id": _source_patch_id(group_key), + } + return dc.Patch(data=data, dims=("channel", "time"), coords=coords, attrs=attrs) + + +def _get_patches( + path, pymseed, time=None, channel=None, source_patch_id=() +) -> list[dc.Patch]: + """Read MiniSEED patches from a path.""" + wanted_ids = _normalize_source_patch_ids(source_patch_id) + read_time = None if wanted_ids else time + trim_limits = _get_time_limits(time) if wanted_ids else (None, None) + patches = [] + segment_groups = _group_segments(_read_segments(path, pymseed, read_time)) + for group_key, 
segments in segment_groups: + patch_id = _source_patch_id(group_key) + if wanted_ids and patch_id not in wanted_ids: + continue + if wanted_ids: + segments = [ + trimmed + for segment in segments + if (trimmed := _trim_segment_time(segment, trim_limits)) is not None + ] + if not segments: + continue + patch = _patch_from_segments(group_key, segments) + if channel is not None: + patch = patch.select(channel=channel) + if patch.size: + patches.append(patch) + return patches + + +def _scan_patches(path, pymseed) -> list[ScanPayload]: + """Return scan payloads for MiniSEED patches.""" + out = [] + for group_key, segments in _group_segments(_read_segments(path, pymseed)): + patch = _patch_from_segments(group_key, segments) + out.append(_patch_to_scan_payload(patch)) + return out + + +def _detect_format(path: LocalPath, pymseed) -> tuple[str, str] | bool: + """Return the MiniSEED version for a path or False.""" + versions = set() + try: + for record in _iter_records(path, pymseed, unpack_data=False): + versions.add(str(record.formatversion)) + break + except Exception: + return False + if not versions: + return False + version = next(iter(versions)) + if version in {"2", "3"}: + return "MSEED", version + return False diff --git a/pyproject.toml b/pyproject.toml index 400ad97d..65e220e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ extras = [ "numba", "segyio", "bottleneck", + "pymseed[numpy]", ] docs = [ @@ -150,6 +151,8 @@ WAV = "dascore.io.wav.core:WavIO" XMLBINARY__V1 = "dascore.io.xml_binary.core:XMLBinaryV1" GDR_DAS__V1 = "dascore.io.gdr.core:GDR_V1" NETCDF_CF__V1_8 = "dascore.io.netcdf.core:NetCDFCFV18" +MSEED__V2 = "dascore.io.mseed.core:MSeedV2" +MSEED__V3 = "dascore.io.mseed.core:MSeedV3" [project.entry-points."dascore.patch_namespace"] io = "dascore.io:PatchIO" diff --git a/tests/test_io/test_common_io.py b/tests/test_io/test_common_io.py index d937c621..67cc8527 100644 --- a/tests/test_io/test_common_io.py +++ b/tests/test_io/test_common_io.py @@ -29,6 +29,7 @@ from dascore.io.febus import Febus1, Febus2 from dascore.io.gdr import GDR_V1 from dascore.io.h5simple import H5Simple +from dascore.io.mseed.core import MSeedV2 from dascore.io.netcdf import NetCDFCFV18 from dascore.io.neubrex import NeubrexDASV1, NeubrexRFSV1 from dascore.io.optodas import OptoDASV8 @@ -89,6 +90,7 @@ Terra15FormatterV5(): ("terra15_v5_test_file.hdf5",), Terra15FormatterV6(): ("terra15_v6_test_file.hdf5",), NetCDFCFV18(): ("xdas_netcdf.nc",), + MSeedV2(): ("etna_9n_3chan_10s.mseed",), } # This tuple is for fiber io which support a write method and can write @@ -202,6 +204,16 @@ def _assert_op_or_close(val1, val2, op): raise AssertionError(msg) +def _get_coord_trim_values(coord): + """Return existing coordinate values to use for read selections.""" + values = coord.values + if len(values) == 1: + return values[0], values[0] + start_ind = min(max(1, len(values) // 10), len(values) - 1) + stop_ind = min(max(start_ind, 2 * len(values) // 10), len(values) - 1) + return values[start_ind], values[stop_ind] + + # --- Tests @@ -299,14 +311,11 @@ def test_slice_single_dim_both_ends(self, io_path_tuple): with skip_missing(): summaries_from_file = [_scan_summary(x) for x in dc.scan(path)] assert len(summaries_from_file) - summary_init = summaries_from_file[0] - for dim in summary_init.dim_tuple: - summary_coord = summary_init.get_coord_summary(dim) - start = summary_coord.min - stop = summary_coord.max - duration = stop - start + patch_init = _cached_read(path, io=io)[0] + for dim in patch_init.dims: 
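+        # Trim with values pulled from the coordinate itself so the selection
+        # always lands on samples this format can actually return.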
+ trim_start, trim_stop = _get_coord_trim_values(patch_init.get_coord(dim)) # first test double ended query - trim_tuple = (start + duration / 10, start + 2 * duration / 10) + trim_tuple = (trim_start, trim_stop) spool = io.read(path, **{dim: trim_tuple}) assert len(spool) == 1 patch = spool[0] @@ -315,7 +324,7 @@ def test_slice_single_dim_both_ends(self, io_path_tuple): _assert_op_or_close(coord.min(), trim_tuple[0], ge) _assert_op_or_close(coord.max(), trim_tuple[1], le) # then single-ended query on start side - trim_tuple = (start + duration / 10, ...) + trim_tuple = (trim_start, ...) spool = io.read(path, **{dim: trim_tuple}) assert len(spool) == 1 patch = spool[0] @@ -325,7 +334,7 @@ def test_slice_single_dim_both_ends(self, io_path_tuple): _assert_op_or_close(coord.min(), trim_tuple[0], ge) _assert_op_or_close(coord.max(), summary.get_coord_summary(dim).max, eq) # then single-ended query on end side - trim_tuple = (None, start + duration / 10) + trim_tuple = (None, trim_start) spool = io.read(path, **{dim: trim_tuple}) assert len(spool) == 1 patch = spool[0] diff --git a/tests/test_io/test_mseed/test_mseed.py b/tests/test_io/test_mseed/test_mseed.py new file mode 100644 index 00000000..9f480fd9 --- /dev/null +++ b/tests/test_io/test_mseed/test_mseed.py @@ -0,0 +1,343 @@ +"""Tests for MiniSEED IO.""" + +from __future__ import annotations + +import numpy as np +import pytest + +import dascore as dc +from dascore.exceptions import MissingOptionalDependencyError +from dascore.io.mseed import utils as mseed_utils +from dascore.io.mseed.core import MSeedV2 + +pytest.importorskip("pymseed") + + +def _write_mseed( + path, + format_version=3, + starts=None, + sample_rates=None, + source_ids=None, +): + """Write a small MiniSEED file.""" + import pymseed + + source_ids = source_ids or [f"FDSN:XX_{ind:05d}__H_S_F" for ind in range(3)] + starts = starts or [pymseed.timestr2nstime("2024-01-01T00:00:00Z")] * len( + source_ids + ) + sample_rates = sample_rates or [10.0] * len(source_ids) + traces = pymseed.MS3TraceList() + for ind, (source_id, start, sample_rate) in enumerate( + zip(source_ids, starts, sample_rates, strict=True) + ): + data = np.arange(10, dtype=np.int32) + ind * 100 + traces.add_data( + sourceid=source_id, + data_samples=data, + sample_type="i", + sample_rate=sample_rate, + start_time=start, + ) + traces.to_file( + str(path), + overwrite=True, + format_version=format_version, + encoding=pymseed.DataEncoding.INT32, + ) + return path + + +@pytest.fixture +def mseed_v2_path(tmp_path): + """Return a small MiniSEED v2 path.""" + return _write_mseed(tmp_path / "test_v2.mseed", format_version=2) + + +@pytest.fixture +def mseed_v3_path(tmp_path): + """Return a small MiniSEED v3 path.""" + return _write_mseed(tmp_path / "test_v3.mseed", format_version=3) + + +class TestMiniSeedGetFormat: + """Tests for detecting MiniSEED files.""" + + def test_get_format_v2(self, mseed_v2_path): + """MiniSEED v2 files can be detected.""" + assert MSeedV2().get_format(mseed_v2_path) == ("MSEED", "2") + + def test_get_format_v3(self, mseed_v3_path): + """MiniSEED v3 files can be detected.""" + assert MSeedV2().get_format(mseed_v3_path) == ("MSEED", "3") + + def test_get_format_from_dascore(self, mseed_v3_path): + """DASCore can detect MiniSEED files through plugin discovery.""" + assert dc.get_format(mseed_v3_path) == ("MSEED", "3") + + def test_get_format_too_small(self, tmp_path): + """Small invalid files return False.""" + path = tmp_path / "bad.mseed" + path.write_bytes(b"abc") + assert 
MSeedV2().get_format(path) is False + + +class TestMiniSeedRead: + """Tests for reading MiniSEED files.""" + + def test_read_multichannel(self, mseed_v3_path): + """Compatible traces are merged into one channel-time patch.""" + patch = dc.read(mseed_v3_path, file_format="MSEED", file_version="3")[0] + assert patch.dims == ("channel", "time") + assert patch.shape == (3, 10) + assert np.all(patch.data[1] == np.arange(10) + 100) + assert patch.get_coord("time").step == np.timedelta64(100_000_000, "ns") + assert tuple(patch.coords.get_array("source_id")) == ( + "FDSN:XX_00000__H_S_F", + "FDSN:XX_00001__H_S_F", + "FDSN:XX_00002__H_S_F", + ) + assert tuple(patch.coords.get_array("network")) == ("XX", "XX", "XX") + assert tuple(patch.coords.get_array("station")) == ("00000", "00001", "00002") + assert tuple(patch.coords.get_array("location")) == ("", "", "") + assert tuple(patch.coords.get_array("seed_channel")) == ("HSF", "HSF", "HSF") + + def test_same_channel_different_stations_merge(self, tmp_path): + """Generic MiniSEED sources with compatible timing merge by station.""" + path = _write_mseed( + tmp_path / "multi_station.mseed", + source_ids=[ + "FDSN:XX_00066__H_S_F", + "FDSN:XX_00067__H_S_F", + "FDSN:XX_00068__H_S_F", + ], + ) + patch = dc.read(path, file_format="MSEED", file_version="3")[0] + assert patch.shape == (3, 10) + assert tuple(patch.coords.get_array("station")) == ("00066", "00067", "00068") + assert tuple(patch.coords.get_array("seed_channel")) == ("HSF", "HSF", "HSF") + + def test_different_seed_channels_are_separate_patches(self, tmp_path): + """Different physical components should not be merged into one patch.""" + path = _write_mseed( + tmp_path / "components.mseed", + source_ids=[ + "FDSN:XX_TEST__B_H_N", + "FDSN:XX_TEST__B_H_E", + "FDSN:XX_TEST__B_H_Z", + ], + ) + spool = dc.read(path, file_format="MSEED", file_version="3") + assert len(spool) == 3 + assert {x.coords.get_array("seed_channel")[0] for x in spool} == { + "BHE", + "BHN", + "BHZ", + } + + def test_read_select_time_and_channel(self, mseed_v3_path): + """Time and channel selections are applied to the output patch.""" + time_min = np.datetime64("2024-01-01T00:00:00.200000000") + time_max = np.datetime64("2024-01-01T00:00:00.500000000") + patch = dc.read( + mseed_v3_path, + file_format="MSEED", + file_version="3", + time=(time_min, time_max), + channel=(1, 3), + )[0] + assert patch.shape == (2, 4) + assert np.all(patch.get_coord("channel").values == np.array([1, 2])) + assert patch.get_coord("time").min() == time_min + + def test_incompatible_sample_rates_are_separate_patches(self, tmp_path): + """Incompatible traces are returned as separate patches.""" + path = _write_mseed( + tmp_path / "mixed.mseed", + format_version=3, + sample_rates=[10.0, 20.0, 10.0], + ) + spool = dc.read(path, file_format="MSEED", file_version="3") + assert len(spool) == 2 + assert sorted(x.shape[0] for x in spool) == [1, 2] + + def test_read_source_patch_id(self, tmp_path): + """source_patch_id filters logical MiniSEED groups.""" + path = _write_mseed( + tmp_path / "mixed.mseed", + format_version=3, + sample_rates=[10.0, 20.0, 10.0], + ) + summaries = dc.scan(path, file_format="MSEED", file_version="3") + target = summaries[0].source_patch_id + spool = dc.read( + path, file_format="MSEED", file_version="3", source_patch_id=target + ) + assert len(spool) == 1 + assert spool[0].attrs["_source_patch_id"] == target + + def test_read_source_patch_id_with_time(self, mseed_v3_path): + """source_patch_id from scan works with partial time 
reads.""" + target = dc.scan(mseed_v3_path, file_format="MSEED", file_version="3")[ + 0 + ].source_patch_id + time_min = np.datetime64("2024-01-01T00:00:00.200000000") + time_max = np.datetime64("2024-01-01T00:00:00.500000000") + spool = dc.read( + mseed_v3_path, + file_format="MSEED", + file_version="3", + source_patch_id=target, + time=(time_min, time_max), + ) + assert len(spool) == 1 + assert spool[0].attrs["_source_patch_id"] == target + assert spool[0].shape == (3, 4) + assert spool[0].get_coord("time").min() == time_min + + def test_read_source_patch_id_with_non_overlapping_time(self, mseed_v3_path): + """source_patch_id reads return empty spools for non-overlapping times.""" + target = dc.scan(mseed_v3_path, file_format="MSEED", file_version="3")[ + 0 + ].source_patch_id + spool = dc.read( + mseed_v3_path, + file_format="MSEED", + file_version="3", + source_patch_id=target, + time=( + np.datetime64("2024-01-01T01:00:00"), + np.datetime64("2024-01-01T01:00:01"), + ), + ) + assert len(spool) == 0 + + def test_discontinuous_source_records_are_separate_patches(self, tmp_path): + """Discontinuous records for one source ID are not coalesced.""" + import pymseed + + start = pymseed.timestr2nstime("2024-01-01T00:00:00Z") + path = _write_mseed( + tmp_path / "discontinuous.mseed", + source_ids=["FDSN:XX_TEST__H_S_F", "FDSN:XX_TEST__H_S_F"], + starts=[start, start + 2_000_000_000], + ) + spool = dc.read(path, file_format="MSEED", file_version="3") + assert len(spool) == 2 + + +class TestMiniSeedScan: + """Tests for scanning MiniSEED files.""" + + def test_scan(self, mseed_v3_path): + """Scan returns metadata for the logical MiniSEED patch.""" + summary = dc.scan(mseed_v3_path, file_format="MSEED", file_version="3")[0] + assert summary.dims == ("channel", "time") + assert summary.shape == (3, 10) + assert summary.dtype == "int32" + assert summary.source_format == "MSEED" + assert summary.source_version == "3" + assert summary.source_patch_id + + def test_missing_pymseed_raises(self, mseed_v3_path, monkeypatch): + """Explicit MiniSEED reads require PyMseed.""" + import dascore.io.mseed.core as mseed_core + + def _optional_import(*args, **kwargs): + raise MissingOptionalDependencyError("missing") + + monkeypatch.setattr(mseed_core, "optional_import", _optional_import) + with pytest.raises(MissingOptionalDependencyError): + dc.read(mseed_v3_path, file_format="MSEED", file_version="3") + + +class TestMiniSeedUtils: + """Focused tests for MiniSEED helper edge cases.""" + + def test_zero_sample_rate_raises(self): + """A zero sample rate cannot be converted to a sample step.""" + with pytest.raises(ValueError, match="sample rate"): + mseed_utils._sample_step_ns(0) + + def test_negative_sample_rate_is_sample_period(self): + """Negative MiniSEED sample rates are interpreted as sample periods.""" + assert mseed_utils._sample_step_ns(-0.1) == 100_000_000 + + def test_open_time_limits(self): + """None and ellipsis are open time bounds.""" + assert mseed_utils._get_time_limits(...) 
== (None, None)
+        assert mseed_utils._get_time_limits((None, ...)) == (None, None)
+
+    def test_bad_source_id_falls_back_to_station(self):
+        """Unparseable source IDs are still preserved."""
+
+        class BadPyMseed:
+            @staticmethod
+            def sourceid2nslc(source_id):
+                raise ValueError("bad source id")
+
+        assert mseed_utils._source_id_to_nslc(BadPyMseed, "not-fdsn") == (
+            "",
+            "not-fdsn",
+            "",
+            "",
+        )
+
+    def test_record_to_segment_skips_non_overlapping_records(self):
+        """Record metadata can be rejected before sample unpacking."""
+
+        class Record:
+            starttime = 0
+            endtime = 10
+
+            def unpack_data(self):
+                raise AssertionError("should not unpack")
+
+        assert mseed_utils._record_to_segment(Record(), None, (20, 30)) is None
+
+    def test_detect_format_no_records(self, tmp_path):
+        """Files with no records are not MiniSEED."""
+
+        class FakeMS3Record:
+            @staticmethod
+            def from_file(path, unpack_data=False):
+                return iter(())
+
+        class FakePyMseed:
+            MS3Record = FakeMS3Record
+
+        out = mseed_utils._detect_format(tmp_path / "empty.mseed", FakePyMseed)
+        assert out is False
+
+    def test_detect_format_unsupported_version(self, tmp_path):
+        """Unsupported MiniSEED versions are rejected."""
+
+        class Record:
+            formatversion = 4
+
+        class FakeMS3Record:
+            @staticmethod
+            def from_file(path, unpack_data=False):
+                yield Record()
+
+        class FakePyMseed:
+            MS3Record = FakeMS3Record
+
+        assert mseed_utils._detect_format(tmp_path / "v4.mseed", FakePyMseed) is False
+
+
+class TestRealMiniSeed:
+    """Tests using real DAS MiniSEED data from the test-data registry."""
+
+    def test_read_merged_das_station_channels(self):
+        """Etna DAS station-coded channels read as one channel-time patch."""
+        from dascore.utils.downloader import fetch
+
+        path = fetch("etna_9n_3chan_10s.mseed")
+        patch = dc.read(path)[0]
+        assert patch.dims == ("channel", "time")
+        assert patch.shape[0] == 3
+        assert tuple(patch.coords.get_array("network")) == ("9N", "9N", "9N")
+        assert tuple(patch.coords.get_array("station")) == ("00066", "00067", "00068")
+        assert tuple(patch.coords.get_array("seed_channel")) == ("HSF", "HSF", "HSF")

From ae45a49fb17940a6fbc9df6b8b1d7a6aeff4adbb Mon Sep 17 00:00:00 2001
From: Derrick Chambers
Date: Fri, 1 May 2026 17:54:40 +0200
Subject: [PATCH 2/8] Address MiniSEED review feedback

---
 dascore/io/mseed/core.py               |   7 +-
 dascore/io/mseed/utils.py              | 156 ++++++++++++++----
 dascore/utils/io.py                    |   9 ++
 tests/test_io/test_common_io.py        |  36 ++---
 tests/test_io/test_mseed/test_mseed.py | 205 +++++++++++++++++++++----
 5 files changed, 322 insertions(+), 91 deletions(-)

diff --git a/dascore/io/mseed/core.py b/dascore/io/mseed/core.py
index 834f9dd7..102438df 100644
--- a/dascore/io/mseed/core.py
+++ b/dascore/io/mseed/core.py
@@ -3,6 +3,10 @@
 DASCore maps MiniSEED source IDs into 2D ``("channel", "time")`` patches.
 MiniSEED source identity is preserved as per-channel coordinates instead of
 scalar attrs because one DASCore patch can contain many MiniSEED sources.
+Compatible MiniSEED sources share one patch only when they also have the same
+sample count. Unequal-length sources are returned as separate patches with
+stable ``channel`` coordinate values so selections still refer to the same
+source across split patches.
 
 The mapping follows the FDSN source identifier convention, where a SEED NSLC
 code maps to ``FDSN:<network>_<station>_<location>_<band>_<source>_<subsource>``.
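+
+A sketch of inspecting that behavior (assumes PyMseed is installed and
+``path`` names a MiniSEED file whose sources decode to unequal lengths)::
+
+    import dascore as dc
+
+    for patch in dc.read(path, file_format="MSEED"):
+        # Each source keeps one stable integer channel value in every patch.
+        print(patch.get_coord("channel").values)
+        print(patch.coords.get_array("source_id"))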
@@ -34,8 +38,7 @@ class MSeedV2(FiberIO): def get_format(self, path: LocalPath, **kwargs) -> tuple[str, str] | bool: """Determine if path is a MiniSEED file.""" - pymseed = optional_import("pymseed") - return _detect_format(path, pymseed) + return _detect_format(path) def scan(self, path: LocalPath, **kwargs) -> list[ScanPayload]: """Scan a MiniSEED file.""" diff --git a/dascore/io/mseed/utils.py b/dascore/io/mseed/utils.py index 817fa5c8..93cd1724 100644 --- a/dascore/io/mseed/utils.py +++ b/dascore/io/mseed/utils.py @@ -5,6 +5,7 @@ from dataclasses import dataclass, replace from itertools import groupby from math import ceil, floor +from struct import unpack import numpy as np @@ -13,7 +14,7 @@ from dascore.core import get_coord from dascore.io import ScanPayload from dascore.io.core import _patch_to_scan_payload -from dascore.utils.io import LocalPath, _normalize_source_patch_ids +from dascore.utils.io import LocalPath, _normalize_source_patch_ids, _read_file_header from dascore.utils.time import to_datetime64, to_int _TimeLimits = tuple[int | None, int | None] @@ -53,6 +54,19 @@ def next_start_ns(self) -> int: return self.start_ns + self.sample_count * self.sample_step_ns +@dataclass(frozen=True, order=True) +class _TraceGroupKey: + """Compatibility fields that define one MiniSEED output patch.""" + + format_version: str + network: str + location: str + seed_channel: str + start_ns: int + sample_rate: float + sample_count: int + + def _sample_step_ns(sample_rate: float) -> int: """Convert a MiniSEED sample rate to nanosecond spacing.""" if sample_rate == 0: @@ -196,15 +210,16 @@ def _read_segments(path: LocalPath, pymseed, time=None) -> list[_TraceSegment]: return _coalesce_source_segments(segments) -def _get_group_key(segment: _TraceSegment): +def _get_group_key(segment: _TraceSegment) -> _TraceGroupKey: """Return the compatibility key used to merge traces into a patch.""" - return ( - segment.format_version, - segment.network, - segment.location, - segment.seed_channel, - segment.start_ns, - segment.sample_rate, + return _TraceGroupKey( + format_version=segment.format_version, + network=segment.network, + location=segment.location, + seed_channel=segment.seed_channel, + start_ns=segment.start_ns, + sample_rate=segment.sample_rate, + sample_count=segment.sample_count, ) @@ -215,25 +230,46 @@ def _group_segments(segments: list[_TraceSegment]): yield key, list(group) -def _source_patch_id(group_key) -> str: +def _get_channel_map(segments: list[_TraceSegment]) -> dict[str, int]: + """Return stable channel indices for MiniSEED sources.""" + source_ids = { + x.source_id: (x.station, x.source_id) + for x in sorted(segments, key=lambda y: (y.station, y.source_id)) + } + return {source_id: ind for ind, source_id in enumerate(source_ids)} + + +def _source_patch_id(group_key: _TraceGroupKey) -> str: """Return a stable source patch ID for a MiniSEED group.""" - version, network, location, seed_channel, start_ns, sample_rate = group_key - source_key = ".".join((network, location, seed_channel)) - return f"v{version}:{source_key}:{start_ns}:{sample_rate:g}" + source_key = ".".join( + (group_key.network, group_key.location, group_key.seed_channel) + ) + rate_key = format(group_key.sample_rate, ".17g") + return ( + f"v{group_key.format_version}:{source_key}:" + f"{group_key.start_ns}:{rate_key}:{group_key.sample_count}" + ) -def _patch_from_segments(group_key, segments: list[_TraceSegment]) -> dc.Patch: +def _patch_from_segments( + group_key, segments: list[_TraceSegment], channel_map: dict[str, 
int] | None = None +) -> dc.Patch: """Create a DASCore Patch from compatible MiniSEED trace segments.""" segments = sorted(segments, key=lambda x: (x.station, x.source_id)) first = segments[0] - sample_count = min(x.sample_count for x in segments) + sample_count = first.sample_count source_ids = tuple(x.source_id for x in segments) - data = np.stack([x.data[:sample_count] for x in segments]) + channel_values = ( + tuple(channel_map[x.source_id] for x in segments) + if channel_map is not None + else tuple(range(len(segments))) + ) + data = np.stack([x.data for x in segments]) step = np.timedelta64(first.sample_step_ns, "ns") start = np.datetime64(first.start_ns, "ns") stop = start + step * sample_count coords = { - "channel": np.arange(len(segments)), + "channel": get_coord(data=np.asarray(channel_values)), "time": get_coord(start=start, stop=stop, step=step), "source_id": (("channel",), source_ids), "network": (("channel",), tuple(x.network for x in segments)), @@ -262,7 +298,9 @@ def _get_patches( read_time = None if wanted_ids else time trim_limits = _get_time_limits(time) if wanted_ids else (None, None) patches = [] - segment_groups = _group_segments(_read_segments(path, pymseed, read_time)) + all_segments = _read_segments(path, pymseed, read_time) + channel_map = _get_channel_map(all_segments) + segment_groups = _group_segments(all_segments) for group_key, segments in segment_groups: patch_id = _source_patch_id(group_key) if wanted_ids and patch_id not in wanted_ids: @@ -275,35 +313,81 @@ def _get_patches( ] if not segments: continue - patch = _patch_from_segments(group_key, segments) + patch = _patch_from_segments(group_key, segments, channel_map=channel_map) if channel is not None: patch = patch.select(channel=channel) if patch.size: patches.append(patch) - return patches + return sorted(patches, key=lambda x: x.get_coord("channel").min()) def _scan_patches(path, pymseed) -> list[ScanPayload]: """Return scan payloads for MiniSEED patches.""" - out = [] - for group_key, segments in _group_segments(_read_segments(path, pymseed)): - patch = _patch_from_segments(group_key, segments) - out.append(_patch_to_scan_payload(patch)) - return out + patches = [] + all_segments = _read_segments(path, pymseed) + channel_map = _get_channel_map(all_segments) + for group_key, segments in _group_segments(all_segments): + patch = _patch_from_segments(group_key, segments, channel_map=channel_map) + patches.append(patch) + patches = sorted(patches, key=lambda x: x.get_coord("channel").min()) + return [_patch_to_scan_payload(x) for x in patches] -def _detect_format(path: LocalPath, pymseed) -> tuple[str, str] | bool: - """Return the MiniSEED version for a path or False.""" - versions = set() +def _is_ascii_digit(value: int) -> bool: + """Return True if an integer byte is an ASCII digit.""" + return 48 <= value <= 57 + + +def _is_seed_code(value: bytes, *, allow_space: bool = True) -> bool: + """Return True if bytes are plausible SEED code bytes.""" + allowed = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + if allow_space: + allowed += b" " + return bool(value.strip()) and all(char in allowed for char in value) + + +def _detect_mseed_v2_header(header: bytes) -> bool: + """Return True if a fixed MiniSEED 2 header looks valid.""" + if len(header) < 48: + return False + if not all(_is_ascii_digit(char) for char in header[:6]): + return False + if header[6:7] not in b"DRQM": + return False + if header[7] not in (0, 32): + return False + if not _is_seed_code(header[8:13]): + return False + if not 
_is_seed_code(header[15:18], allow_space=False): + return False + if not _is_seed_code(header[18:20]): + return False try: - for record in _iter_records(path, pymseed, unpack_data=False): - versions.add(str(record.formatversion)) - break + year, day = unpack(">HH", header[20:24]) + hour, minute, second = header[24:27] + _unused, frac_seconds, sample_count = unpack(">BHH", header[27:32]) + data_offset, blockette_offset = unpack(">HH", header[44:48]) except Exception: return False - if not versions: - return False - version = next(iter(versions)) - if version in {"2", "3"}: - return "MSEED", version + valid_time = ( + 1900 <= year <= 2600 + and 1 <= day <= 366 + and hour <= 23 + and minute <= 59 + and second <= 60 + and frac_seconds <= 9_999 + ) + valid_offsets = data_offset >= 48 and ( + blockette_offset == 0 or blockette_offset >= 48 + ) + return valid_time and valid_offsets and sample_count > 0 + + +def _detect_format(path: LocalPath) -> tuple[str, str] | bool: + """Return the MiniSEED version for a path or False.""" + header = _read_file_header(path, 48) + if header.startswith(b"MS\x03"): + return "MSEED", "3" + if _detect_mseed_v2_header(header): + return "MSEED", "2" return False diff --git a/dascore/utils/io.py b/dascore/utils/io.py index 53ff30a5..f3a49223 100644 --- a/dascore/utils/io.py +++ b/dascore/utils/io.py @@ -85,6 +85,15 @@ def _normalize_source_patch_ids(source_patch_id) -> set[str]: return {str(value) for value in iterate(source_patch_id) if value not in (None, "")} +def _read_file_header(path, length: int) -> bytes: + """Return the first bytes from a file-like path or empty bytes on IO errors.""" + try: + with open(path, "rb") as fi: + return fi.read(length) + except OSError: + return b"" + + class BinaryReader(io.BytesIO): """Base file for reading binary files.""" diff --git a/tests/test_io/test_common_io.py b/tests/test_io/test_common_io.py index 67cc8527..4f6dcb73 100644 --- a/tests/test_io/test_common_io.py +++ b/tests/test_io/test_common_io.py @@ -214,6 +214,19 @@ def _get_coord_trim_values(coord): return values[start_ind], values[stop_ind] +def _assert_spool_dim_selection(spool, dim, trim_tuple, start_op, stop_op): + """Assert all patches in a selected spool respect a requested dim range.""" + assert len(spool) + for patch in spool: + _assert_coords_attrs_match(patch) + coord = patch.get_coord(dim) + summary = patch.summary.get_coord_summary(dim) + start = summary.min if trim_tuple[0] in (None, ...) else trim_tuple[0] + stop = summary.max if trim_tuple[1] in (None, ...) else trim_tuple[1] + _assert_op_or_close(coord.min(), start, start_op) + _assert_op_or_close(coord.max(), stop, stop_op) + + # --- Tests @@ -317,32 +330,15 @@ def test_slice_single_dim_both_ends(self, io_path_tuple): # first test double ended query trim_tuple = (trim_start, trim_stop) spool = io.read(path, **{dim: trim_tuple}) - assert len(spool) == 1 - patch = spool[0] - _assert_coords_attrs_match(patch) - coord = patch.get_coord(dim) - _assert_op_or_close(coord.min(), trim_tuple[0], ge) - _assert_op_or_close(coord.max(), trim_tuple[1], le) + _assert_spool_dim_selection(spool, dim, trim_tuple, ge, le) # then single-ended query on start side trim_tuple = (trim_start, ...) 
spool = io.read(path, **{dim: trim_tuple}) - assert len(spool) == 1 - patch = spool[0] - summary = patch.summary - _assert_coords_attrs_match(patch) - coord = patch.get_coord(dim) - _assert_op_or_close(coord.min(), trim_tuple[0], ge) - _assert_op_or_close(coord.max(), summary.get_coord_summary(dim).max, eq) + _assert_spool_dim_selection(spool, dim, trim_tuple, ge, eq) # then single-ended query on end side trim_tuple = (None, trim_start) spool = io.read(path, **{dim: trim_tuple}) - assert len(spool) == 1 - patch = spool[0] - summary = patch.summary - _assert_coords_attrs_match(patch) - coord = patch.get_coord(dim) - _assert_op_or_close(coord.min(), summary.get_coord_summary(dim).min, eq) - _assert_op_or_close(coord.max(), trim_tuple[1], le) + _assert_spool_dim_selection(spool, dim, trim_tuple, eq, le) def test_slice_out_all_patches_time(self, io_path_tuple): """Ensure slicing outside of file time range returns an empty spool.""" diff --git a/tests/test_io/test_mseed/test_mseed.py b/tests/test_io/test_mseed/test_mseed.py index 9f480fd9..e49136f6 100644 --- a/tests/test_io/test_mseed/test_mseed.py +++ b/tests/test_io/test_mseed/test_mseed.py @@ -10,7 +10,18 @@ from dascore.io.mseed import utils as mseed_utils from dascore.io.mseed.core import MSeedV2 -pytest.importorskip("pymseed") + +def _mseed_v2_header(): + """Return enough fixed header bytes to identify a MiniSEED 2 file.""" + header = bytearray(48) + header[0:8] = b"000001D " + header[8:13] = b"STAT " + header[13:15] = b" " + header[15:18] = b"HSF" + header[18:20] = b"XX" + header[20:32] = bytes.fromhex("07e800010000000000000001") + header[44:48] = bytes.fromhex("00300000") + return header def _write_mseed( @@ -19,20 +30,23 @@ def _write_mseed( starts=None, sample_rates=None, source_ids=None, + data_samples=None, ): """Write a small MiniSEED file.""" - import pymseed + pymseed = pytest.importorskip("pymseed") source_ids = source_ids or [f"FDSN:XX_{ind:05d}__H_S_F" for ind in range(3)] starts = starts or [pymseed.timestr2nstime("2024-01-01T00:00:00Z")] * len( source_ids ) sample_rates = sample_rates or [10.0] * len(source_ids) + data_samples = data_samples or [ + np.arange(10, dtype=np.int32) + ind * 100 for ind in range(len(source_ids)) + ] traces = pymseed.MS3TraceList() - for ind, (source_id, start, sample_rate) in enumerate( - zip(source_ids, starts, sample_rates, strict=True) + for source_id, start, sample_rate, data in zip( + source_ids, starts, sample_rates, data_samples, strict=True ): - data = np.arange(10, dtype=np.int32) + ind * 100 traces.add_data( sourceid=source_id, data_samples=data, @@ -49,6 +63,33 @@ def _write_mseed( return path +def _write_mseed_v2_header(path): + """Write enough fixed header bytes to identify a MiniSEED 2 file.""" + path.write_bytes(_mseed_v2_header()) + return path + + +def _trace_segment(**kwargs): + """Return a small decoded MiniSEED trace segment for helper tests.""" + defaults = dict( + source_id="FDSN:XX_00000__H_S_F", + network="XX", + station="00000", + location="", + seed_channel="HSF", + format_version="3", + start_ns=0, + sample_rate=10.0, + data=np.arange(3, dtype=np.int32), + sample_type="i", + encoding="11", + publication_version=0, + record_length=256, + ) + defaults.update(kwargs) + return mseed_utils._TraceSegment(**defaults) + + @pytest.fixture def mseed_v2_path(tmp_path): """Return a small MiniSEED v2 path.""" @@ -76,6 +117,20 @@ def test_get_format_from_dascore(self, mseed_v3_path): """DASCore can detect MiniSEED files through plugin discovery.""" assert dc.get_format(mseed_v3_path) 
== ("MSEED", "3") + def test_get_format_without_pymseed(self, tmp_path, monkeypatch): + """MiniSEED headers can be detected without PyMseed installed.""" + import dascore.io.mseed.core as mseed_core + + def _optional_import(*args, **kwargs): + raise MissingOptionalDependencyError("missing") + + monkeypatch.setattr(mseed_core, "optional_import", _optional_import) + v2_path = _write_mseed_v2_header(tmp_path / "test_v2.mseed") + v3_path = tmp_path / "test_v3.mseed" + v3_path.write_bytes(b"MS\x03" + bytes(45)) + assert MSeedV2().get_format(v2_path) == ("MSEED", "2") + assert MSeedV2().get_format(v3_path) == ("MSEED", "3") + def test_get_format_too_small(self, tmp_path): """Small invalid files return False.""" path = tmp_path / "bad.mseed" @@ -162,6 +217,23 @@ def test_incompatible_sample_rates_are_separate_patches(self, tmp_path): assert len(spool) == 2 assert sorted(x.shape[0] for x in spool) == [1, 2] + def test_unequal_sample_counts_are_separate_patches(self, tmp_path): + """Unequal trace lengths are preserved in separate patches.""" + data_samples = [ + np.arange(10, dtype=np.int32), + np.arange(7, dtype=np.int32) + 100, + np.arange(10, dtype=np.int32) + 200, + ] + path = _write_mseed(tmp_path / "unequal.mseed", data_samples=data_samples) + spool = dc.read(path, file_format="MSEED", file_version="3") + assert len(spool) == 2 + assert sorted(x.shape for x in spool) == [(1, 7), (2, 10)] + assert sorted(len(x.get_coord("time")) for x in spool) == [7, 10] + assert sorted(tuple(x.get_coord("channel").values) for x in spool) == [ + (0, 2), + (1,), + ] + def test_read_source_patch_id(self, tmp_path): """source_patch_id filters logical MiniSEED groups.""" path = _write_mseed( @@ -215,7 +287,7 @@ def test_read_source_patch_id_with_non_overlapping_time(self, mseed_v3_path): def test_discontinuous_source_records_are_separate_patches(self, tmp_path): """Discontinuous records for one source ID are not coalesced.""" - import pymseed + pymseed = pytest.importorskip("pymseed") start = pymseed.timestr2nstime("2024-01-01T00:00:00Z") path = _write_mseed( @@ -251,6 +323,9 @@ def _optional_import(*args, **kwargs): with pytest.raises(MissingOptionalDependencyError): dc.read(mseed_v3_path, file_format="MSEED", file_version="3") + with pytest.raises(MissingOptionalDependencyError): + dc.scan(mseed_v3_path, file_format="MSEED", file_version="3") + class TestMiniSeedUtils: """Focused tests for MiniSEED helper edge cases.""" @@ -296,48 +371,112 @@ def unpack_data(self): assert mseed_utils._record_to_segment(Record(), None, (20, 30)) is None + def test_patch_from_segments_defaults_to_local_channel_indices(self): + """Patches can still be built without a global channel map.""" + segment = _trace_segment() + group_key = mseed_utils._get_group_key(segment) + patch = mseed_utils._patch_from_segments(group_key, [segment]) + assert tuple(patch.get_coord("channel").values) == (0,) + + def test_source_patch_id_uses_group_key_fields(self): + """Source patch IDs are built from the typed MiniSEED group key.""" + group_key = mseed_utils._TraceGroupKey( + format_version="3", + network="XX", + location="", + seed_channel="HSF", + start_ns=10, + sample_rate=10.0, + sample_count=12, + ) + assert mseed_utils._source_patch_id(group_key) == "v3:XX..HSF:10:10:12" + def test_detect_format_no_records(self, tmp_path): """Files with no records are not MiniSEED.""" - - class FakeMS3Record: - @staticmethod - def from_file(path, unpack_data=False): - return iter(()) - - class FakePyMseed: - MS3Record = FakeMS3Record - - out = 
mseed_utils._detect_format(tmp_path / "empty.mseed", FakePyMseed) + path = tmp_path / "empty.mseed" + path.write_bytes(b"") + out = mseed_utils._detect_format(path) assert out is False + def test_detect_format_missing_file(self, tmp_path): + """Missing paths are not MiniSEED.""" + assert mseed_utils._detect_format(tmp_path / "missing.mseed") is False + def test_detect_format_unsupported_version(self, tmp_path): """Unsupported MiniSEED versions are rejected.""" + path = tmp_path / "v4.mseed" + path.write_bytes(b"MS\x04" + bytes(45)) + assert mseed_utils._detect_format(path) is False + + @pytest.mark.parametrize( + "field,value", + ( + ("sequence", b"A00001"), + ("record_type", b"X"), + ("reserved", b"X"), + ("station", b" "), + ("channel", b"H F"), + ("network", b" "), + ("year", bytes.fromhex("0700")), + ("data_offset", bytes.fromhex("002f")), + ("sample_count", bytes.fromhex("0000")), + ), + ) + def test_detect_mseed_v2_header_rejects_invalid_fields(self, field, value): + """Each fixed-header guard can reject invalid MiniSEED 2 headers.""" + header = _mseed_v2_header() + slices = { + "sequence": slice(0, 6), + "record_type": slice(6, 7), + "reserved": slice(7, 8), + "station": slice(8, 13), + "channel": slice(15, 18), + "network": slice(18, 20), + "year": slice(20, 22), + "data_offset": slice(44, 46), + "sample_count": slice(30, 32), + } + header[slices[field]] = value + assert not mseed_utils._detect_mseed_v2_header(header) - class Record: - formatversion = 4 + def test_detect_mseed_v2_header_rejects_short_header(self): + """Truncated fixed headers are not MiniSEED 2.""" + assert not mseed_utils._detect_mseed_v2_header(bytes(47)) - class FakeMS3Record: - @staticmethod - def from_file(path, unpack_data=False): - yield Record() + def test_detect_mseed_v2_header_handles_unpack_error(self, monkeypatch): + """Header unpack failures are treated as non-MiniSEED.""" - class FakePyMseed: - MS3Record = FakeMS3Record + def _raise_unpack(*args, **kwargs): + raise ValueError("bad unpack") - assert mseed_utils._detect_format(tmp_path / "v4.mseed", FakePyMseed) is False + monkeypatch.setattr(mseed_utils, "unpack", _raise_unpack) + assert not mseed_utils._detect_mseed_v2_header(_mseed_v2_header()) + + def test_close_sample_rates_have_distinct_source_patch_ids(self): + """Full precision sample rates avoid source patch ID collisions.""" + group_a = mseed_utils._TraceGroupKey("3", "XX", "", "HSF", 0, 1.00000001, 10) + group_b = mseed_utils._TraceGroupKey("3", "XX", "", "HSF", 0, 1.00000002, 10) + assert mseed_utils._source_patch_id(group_a) != mseed_utils._source_patch_id( + group_b + ) class TestRealMiniSeed: """Tests using real DAS MiniSEED data from the test-data registry.""" - def test_read_merged_das_station_channels(self): - """Etna DAS station-coded channels read as one channel-time patch.""" + def test_read_das_station_channels(self): + """Etna DAS station-coded channels preserve their full sample counts.""" + pytest.importorskip("pymseed") from dascore.utils.downloader import fetch path = fetch("etna_9n_3chan_10s.mseed") - patch = dc.read(path)[0] - assert patch.dims == ("channel", "time") - assert patch.shape[0] == 3 - assert tuple(patch.coords.get_array("network")) == ("9N", "9N", "9N") - assert tuple(patch.coords.get_array("station")) == ("00066", "00067", "00068") - assert tuple(patch.coords.get_array("seed_channel")) == ("HSF", "HSF", "HSF") + spool = dc.read(path) + assert len(spool) == 3 + assert sorted(x.shape for x in spool) == [(1, 13556), (1, 13729), (1, 13735)] + assert 
{x.coords.get_array("network")[0] for x in spool} == {"9N"} + assert {x.coords.get_array("station")[0] for x in spool} == { + "00066", + "00067", + "00068", + } + assert {x.coords.get_array("seed_channel")[0] for x in spool} == {"HSF"} From fdb6e0f1071e132c6c4119fb66e6a2c5c16eaabd Mon Sep 17 00:00:00 2001 From: Derrick Chambers Date: Sat, 2 May 2026 09:58:19 +0200 Subject: [PATCH 3/8] Refine MiniSEED scan handling --- dascore/io/mseed/core.py | 18 +- dascore/io/mseed/utils.py | 249 +++++++++++++++++++++---- tests/test_io/test_common_io.py | 3 +- tests/test_io/test_mseed/test_mseed.py | 158 +++++++++++++++- 4 files changed, 379 insertions(+), 49 deletions(-) diff --git a/dascore/io/mseed/core.py b/dascore/io/mseed/core.py index 102438df..a4aaba69 100644 --- a/dascore/io/mseed/core.py +++ b/dascore/io/mseed/core.py @@ -3,10 +3,20 @@ DASCore maps MiniSEED source IDs into 2D ``("channel", "time")`` patches. MiniSEED source identity is preserved as per-channel coordinates instead of scalar attrs because one DASCore patch can contain many MiniSEED sources. -Compatible MiniSEED sources share one patch only when they also have the same -sample count. Unequal-length sources are returned as separate patches with -stable ``channel`` coordinate values so selections still refer to the same -source across split patches. +The preserved source coordinates are ``source_id``, ``network``, ``station``, +``location``, and ``seed_channel``. + +Compatible MiniSEED sources are merged into one patch when they share the same +format version, network, location, SEED channel, start time, sample rate, and +sample count. Sources with different sample rates, start times, or sample +counts are returned as separate patches. When sources split across multiple +patches, DASCore keeps stable integer ``channel`` coordinate values so +selections still refer to the same source. + +The MiniSEED ``channel`` coordinate is an integer source index, not optical +distance. Optical distance is usually stored in companion DAS metadata rather +than MiniSEED record headers and should be attached from that metadata when +available. The mapping follows the FDSN source identifier convention, where a SEED NSLC code maps to ``FDSN:_____``. 
diff --git a/dascore/io/mseed/utils.py b/dascore/io/mseed/utils.py index 93cd1724..7820f2d1 100644 --- a/dascore/io/mseed/utils.py +++ b/dascore/io/mseed/utils.py @@ -11,18 +11,42 @@ import dascore as dc from dascore.constants import ONE_BILLION -from dascore.core import get_coord +from dascore.core import get_coord, get_coord_manager from dascore.io import ScanPayload -from dascore.io.core import _patch_to_scan_payload +from dascore.io.core import _make_scan_payload from dascore.utils.io import LocalPath, _normalize_source_patch_ids, _read_file_header from dascore.utils.time import to_datetime64, to_int _TimeLimits = tuple[int | None, int | None] +_TEXT_ENCODING = 0 +_INT16_ENCODING = 1 +_INT32_ENCODING = 3 +_FLOAT32_ENCODING = 4 +_FLOAT64_ENCODING = 5 +_STEIM1_ENCODING = 10 +_STEIM2_ENCODING = 11 + +_ENCODING_DTYPE_MAP = { + _TEXT_ENCODING: "S1", + _INT16_ENCODING: "int16", + _INT32_ENCODING: "int32", + _FLOAT32_ENCODING: "float32", + _FLOAT64_ENCODING: "float64", + _STEIM1_ENCODING: "int32", + _STEIM2_ENCODING: "int32", +} +_SAMPLE_TYPE_DTYPE_MAP = { + "i": "int32", + "f": "float32", + "d": "float64", + "t": "S1", +} + @dataclass(frozen=True) -class _TraceSegment: - """A decoded contiguous MiniSEED segment from one source ID.""" +class _TraceInfo: + """MiniSEED trace identity and timing metadata.""" source_id: str network: str @@ -32,17 +56,12 @@ class _TraceSegment: format_version: str start_ns: int sample_rate: float - data: np.ndarray + sample_count: int sample_type: str encoding: str publication_version: int record_length: int - @property - def sample_count(self) -> int: - """Return the number of samples in the segment.""" - return len(self.data) - @property def sample_step_ns(self) -> int: """Return the sample spacing in nanoseconds.""" @@ -54,6 +73,35 @@ def next_start_ns(self) -> int: return self.start_ns + self.sample_count * self.sample_step_ns +@dataclass(frozen=True) +class _TraceSegment: + """A decoded contiguous MiniSEED segment from one source ID.""" + + info: _TraceInfo + data: np.ndarray + + @property + def sample_count(self) -> int: + """Return the number of samples in the segment.""" + return len(self.data) + + def __getattr__(self, item): + """Delegate metadata lookups to the wrapped trace info.""" + return getattr(self.info, item) + + +@dataclass(frozen=True) +class _TraceSummary: + """Metadata for a contiguous MiniSEED segment from one source ID.""" + + info: _TraceInfo + dtype: str + + def __getattr__(self, item): + """Delegate metadata lookups to the wrapped trace info.""" + return getattr(self.info, item) + + @dataclass(frozen=True, order=True) class _TraceGroupKey: """Compatibility fields that define one MiniSEED output patch.""" @@ -92,13 +140,46 @@ def _get_time_limits(time=None) -> _TimeLimits: def _source_id_to_nslc(pymseed, source_id: str) -> tuple[str, str, str, str]: """Return NSLC codes from a MiniSEED source ID.""" + parse_errors = (ValueError, TypeError) + if source_id_error := getattr(pymseed, "SourceIDError", None): + parse_errors = (*parse_errors, source_id_error) try: nslc = pymseed.sourceid2nslc(source_id) - except Exception: + except parse_errors: return "", source_id, "", "" return tuple("" if x is None else str(x) for x in nslc) +def _record_dtype(record) -> str: + """Return the decoded NumPy dtype name from a MiniSEED record header.""" + if sample_type := getattr(record, "sampletype", None): + return _SAMPLE_TYPE_DTYPE_MAP.get(str(sample_type), "") + encoding = int(getattr(record, "encoding", -1)) + return _ENCODING_DTYPE_MAP.get(encoding, "") 
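+# Note on the lookup order in _record_dtype: decoded records report a
+# "sampletype" code ("i", "f", "d", or "t"), but records read with
+# unpack_data=False may only expose their wire encoding. Falling back to
+# _ENCODING_DTYPE_MAP lets scans report a dtype without decompressing any
+# samples (Steim1/Steim2, for example, always decode to int32).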
+ + +def _record_to_trace_info(record, pymseed, sample_count: int) -> _TraceInfo: + """Convert common PyMseed record fields to trace metadata.""" + network, station, location, seed_channel = _source_id_to_nslc( + pymseed, str(record.sourceid) + ) + return _TraceInfo( + source_id=str(record.sourceid), + network=network, + station=station, + location=location, + seed_channel=seed_channel, + format_version=str(record.formatversion), + start_ns=int(record.starttime), + sample_rate=float(record.samprate), + sample_count=sample_count, + sample_type=str(record.sampletype or ""), + encoding=str(record.encoding), + publication_version=int(getattr(record, "pubversion", 0) or 0), + record_length=int(getattr(record, "reclen", 0) or 0), + ) + + def _iter_records(path: LocalPath, pymseed, unpack_data: bool = True): """Yield records from a MiniSEED file.""" yield from pymseed.MS3Record.from_file(str(path), unpack_data=unpack_data) @@ -122,27 +203,21 @@ def _record_to_segment( return None record.unpack_data() data = np.asarray(record.np_datasamples) - network, station, location, seed_channel = _source_id_to_nslc( - pymseed, str(record.sourceid) - ) segment = _TraceSegment( - source_id=str(record.sourceid), - network=network, - station=station, - location=location, - seed_channel=seed_channel, - format_version=str(record.formatversion), - start_ns=int(record.starttime), - sample_rate=float(record.samprate), + info=_record_to_trace_info(record, pymseed, sample_count=len(data)), data=data.copy(), - sample_type=str(record.sampletype or ""), - encoding=str(record.encoding), - publication_version=int(getattr(record, "pubversion", 0) or 0), - record_length=int(getattr(record, "reclen", 0) or 0), ) return _trim_segment_time(segment, time_limits) +def _record_to_summary(record, pymseed) -> _TraceSummary: + """Convert a PyMseed record header to a trace summary.""" + return _TraceSummary( + info=_record_to_trace_info(record, pymseed, sample_count=int(record.samplecnt)), + dtype=_record_dtype(record), + ) + + def _trim_segment_time( segment: _TraceSegment, time_limits: _TimeLimits ) -> _TraceSegment | None: @@ -161,9 +236,14 @@ def _trim_segment_time( ) if stop_index <= start_index: return None + info = replace( + segment.info, + start_ns=segment.start_ns + start_index * step, + sample_count=stop_index - start_index, + ) return replace( segment, - start_ns=segment.start_ns + start_index * step, + info=info, data=segment.data[start_index:stop_index].copy(), ) @@ -188,8 +268,12 @@ def _coalesce_source_segments(segments: list[_TraceSegment]) -> list[_TraceSegme if can_merge: pending = replace( pending, + info=replace( + pending.info, + sample_count=pending.sample_count + seg.sample_count, + record_length=max(pending.record_length, seg.record_length), + ), data=np.concatenate([pending.data, seg.data]), - record_length=max(pending.record_length, seg.record_length), ) else: out.append(pending) @@ -199,6 +283,43 @@ def _coalesce_source_segments(segments: list[_TraceSegment]) -> list[_TraceSegme return out +def _coalesce_source_summaries(summaries: list[_TraceSummary]) -> list[_TraceSummary]: + """Merge contiguous record summaries for the same source ID.""" + summaries = sorted(summaries, key=lambda x: (x.source_id, x.start_ns)) + out = [] + for source_id, source_group in groupby(summaries, key=lambda x: x.source_id): + pending = None + for summary in source_group: + if pending is None: + pending = summary + continue + can_merge = ( + pending.source_id == source_id + and pending.sample_rate == summary.sample_rate + and 
pending.format_version == summary.format_version + and pending.next_start_ns == summary.start_ns + and pending.dtype == summary.dtype + and pending.encoding == summary.encoding + ) + if can_merge: + pending = replace( + pending, + info=replace( + pending.info, + sample_count=pending.sample_count + summary.sample_count, + record_length=max( + pending.record_length, summary.record_length + ), + ), + ) + else: + out.append(pending) + pending = summary + if pending is not None: + out.append(pending) + return out + + def _read_segments(path: LocalPath, pymseed, time=None) -> list[_TraceSegment]: """Read and coalesce decoded MiniSEED records.""" segments = [] @@ -210,6 +331,16 @@ def _read_segments(path: LocalPath, pymseed, time=None) -> list[_TraceSegment]: return _coalesce_source_segments(segments) +def _scan_segments(path: LocalPath, pymseed) -> list[_TraceSummary]: + """Read and coalesce MiniSEED record metadata without unpacking samples.""" + summaries = [] + for record in _iter_records(path, pymseed, unpack_data=False): + summary = _record_to_summary(record, pymseed) + if summary.sample_count: + summaries.append(summary) + return _coalesce_source_summaries(summaries) + + def _get_group_key(segment: _TraceSegment) -> _TraceGroupKey: """Return the compatibility key used to merge traces into a patch.""" return _TraceGroupKey( @@ -251,10 +382,8 @@ def _source_patch_id(group_key: _TraceGroupKey) -> str: ) -def _patch_from_segments( - group_key, segments: list[_TraceSegment], channel_map: dict[str, int] | None = None -) -> dc.Patch: - """Create a DASCore Patch from compatible MiniSEED trace segments.""" +def _get_coords(group_key, segments, channel_map: dict[str, int] | None = None): + """Return DASCore coordinates from compatible MiniSEED segments.""" segments = sorted(segments, key=lambda x: (x.station, x.source_id)) first = segments[0] sample_count = first.sample_count @@ -264,11 +393,10 @@ def _patch_from_segments( if channel_map is not None else tuple(range(len(segments))) ) - data = np.stack([x.data for x in segments]) step = np.timedelta64(first.sample_step_ns, "ns") start = np.datetime64(first.start_ns, "ns") stop = start + step * sample_count - coords = { + return { "channel": get_coord(data=np.asarray(channel_values)), "time": get_coord(start=start, stop=stop, step=step), "source_id": (("channel",), source_ids), @@ -277,7 +405,11 @@ def _patch_from_segments( "location": (("channel",), tuple(x.location for x in segments)), "seed_channel": (("channel",), tuple(x.seed_channel for x in segments)), } - attrs = { + + +def _get_attrs(group_key, first) -> dict: + """Return DASCore attrs from a MiniSEED segment group.""" + return { "data_type": "", "tag": ".".join((first.network, first.location, first.seed_channel)), "sample_rate": first.sample_rate, @@ -287,9 +419,43 @@ def _patch_from_segments( "mseed_record_length": first.record_length, "_source_patch_id": _source_patch_id(group_key), } + + +def _patch_from_segments( + group_key, segments: list[_TraceSegment], channel_map: dict[str, int] | None = None +) -> dc.Patch: + """Create a DASCore Patch from compatible MiniSEED trace segments.""" + segments = sorted(segments, key=lambda x: (x.station, x.source_id)) + first = segments[0] + data = np.stack([x.data for x in segments]) + coords = _get_coords(group_key, segments, channel_map=channel_map) + attrs = _get_attrs(group_key, first) return dc.Patch(data=data, dims=("channel", "time"), coords=coords, attrs=attrs) +def _scan_payload_from_segments( + group_key, + segments: list[_TraceSummary], + 
channel_map: dict[str, int] | None = None, +) -> ScanPayload: + """Create a DASCore scan payload from MiniSEED trace summaries.""" + segments = sorted(segments, key=lambda x: (x.station, x.source_id)) + first = segments[0] + coords = get_coord_manager( + _get_coords(group_key, segments, channel_map=channel_map), + dims=("channel", "time"), + ) + attrs = _get_attrs(group_key, first) + return _make_scan_payload( + attrs=attrs, + coords=coords, + dims=coords.dims, + shape=coords.shape, + dtype=first.dtype, + source_patch_id=attrs["_source_patch_id"], + ) + + def _get_patches( path, pymseed, time=None, channel=None, source_patch_id=() ) -> list[dc.Patch]: @@ -323,14 +489,15 @@ def _get_patches( def _scan_patches(path, pymseed) -> list[ScanPayload]: """Return scan payloads for MiniSEED patches.""" - patches = [] - all_segments = _read_segments(path, pymseed) + payloads = [] + all_segments = _scan_segments(path, pymseed) channel_map = _get_channel_map(all_segments) for group_key, segments in _group_segments(all_segments): - patch = _patch_from_segments(group_key, segments, channel_map=channel_map) - patches.append(patch) - patches = sorted(patches, key=lambda x: x.get_coord("channel").min()) - return [_patch_to_scan_payload(x) for x in patches] + payload = _scan_payload_from_segments( + group_key, segments, channel_map=channel_map + ) + payloads.append(payload) + return sorted(payloads, key=lambda x: x["coords"].get_coord("channel").min()) def _is_ascii_digit(value: int) -> bool: diff --git a/tests/test_io/test_common_io.py b/tests/test_io/test_common_io.py index 4f6dcb73..5d10fb43 100644 --- a/tests/test_io/test_common_io.py +++ b/tests/test_io/test_common_io.py @@ -29,7 +29,7 @@ from dascore.io.febus import Febus1, Febus2 from dascore.io.gdr import GDR_V1 from dascore.io.h5simple import H5Simple -from dascore.io.mseed.core import MSeedV2 +from dascore.io.mseed.core import MSeedV2, MSeedV3 from dascore.io.netcdf import NetCDFCFV18 from dascore.io.neubrex import NeubrexDASV1, NeubrexRFSV1 from dascore.io.optodas import OptoDASV8 @@ -91,6 +91,7 @@ Terra15FormatterV6(): ("terra15_v6_test_file.hdf5",), NetCDFCFV18(): ("xdas_netcdf.nc",), MSeedV2(): ("etna_9n_3chan_10s.mseed",), + MSeedV3(): ("etna_9n_3chan_10s.mseed",), } # This tuple is for fiber io which support a write method and can write diff --git a/tests/test_io/test_mseed/test_mseed.py b/tests/test_io/test_mseed/test_mseed.py index e49136f6..6db6555f 100644 --- a/tests/test_io/test_mseed/test_mseed.py +++ b/tests/test_io/test_mseed/test_mseed.py @@ -9,6 +9,7 @@ from dascore.exceptions import MissingOptionalDependencyError from dascore.io.mseed import utils as mseed_utils from dascore.io.mseed.core import MSeedV2 +from tests.test_io._common_io_test_utils import skip_timeout def _mseed_v2_header(): @@ -71,6 +72,7 @@ def _write_mseed_v2_header(path): def _trace_segment(**kwargs): """Return a small decoded MiniSEED trace segment for helper tests.""" + data = kwargs.pop("data", np.arange(3, dtype=np.int32)) defaults = dict( source_id="FDSN:XX_00000__H_S_F", network="XX", @@ -80,14 +82,24 @@ def _trace_segment(**kwargs): format_version="3", start_ns=0, sample_rate=10.0, - data=np.arange(3, dtype=np.int32), + sample_count=len(data), sample_type="i", encoding="11", publication_version=0, record_length=256, ) defaults.update(kwargs) - return mseed_utils._TraceSegment(**defaults) + info = mseed_utils._TraceInfo(**defaults) + return mseed_utils._TraceSegment(info=info, data=data) + + +def _trace_summary(**kwargs): + """Return a small MiniSEED trace 
summary for helper tests.""" + segment = _trace_segment(**kwargs) + return mseed_utils._TraceSummary( + info=segment.info, + dtype=str(segment.data.dtype), + ) @pytest.fixture @@ -312,6 +324,69 @@ def test_scan(self, mseed_v3_path): assert summary.source_version == "3" assert summary.source_patch_id + @pytest.mark.parametrize( + ("fixture_name", "file_version"), + (("mseed_v2_path", "2"), ("mseed_v3_path", "3")), + ) + def test_scan_matches_read_summary(self, request, fixture_name, file_version): + """Light scan metadata matches the corresponding read summary.""" + path = request.getfixturevalue(fixture_name) + summary = dc.scan(path, file_format="MSEED", file_version=file_version)[0] + patch_summary = dc.read(path, file_format="MSEED", file_version=file_version)[ + 0 + ].summary + assert summary.dims == patch_summary.dims + assert summary.shape == patch_summary.shape + assert summary.dtype == patch_summary.dtype + assert summary.source_patch_id == patch_summary.source_patch_id + for dim in summary.dims: + scan_coord = summary.get_coord_summary(dim) + read_coord = patch_summary.get_coord_summary(dim) + assert scan_coord.min == read_coord.min + assert scan_coord.max == read_coord.max + assert scan_coord.step == read_coord.step + + def test_scan_v2_metadata(self, mseed_v2_path): + """MiniSEED v2 scan reports expected metadata from headers.""" + summary = dc.scan(mseed_v2_path, file_format="MSEED", file_version="2")[0] + assert summary.shape == (3, 10) + assert summary.dtype == "int32" + assert summary.source_format == "MSEED" + assert summary.source_version == "2" + assert summary.source_patch_id.startswith("v2:") + + def test_scan_does_not_unpack_records(self, monkeypatch): + """Scan uses record headers without decoding sample payloads.""" + + class Record: + sourceid = "FDSN:XX_00000__H_S_F" + formatversion = 3 + starttime = 0 + samprate = 10.0 + samplecnt = 10 + sampletype = None + encoding = 3 + pubversion = 0 + reclen = 256 + + def unpack_data(self): + raise AssertionError("scan should not unpack samples") + + class PyMseed: + @staticmethod + def sourceid2nslc(source_id): + assert source_id == Record.sourceid + return "XX", "00000", "", "HSF" + + def _iter_records(*args, **kwargs): + assert kwargs["unpack_data"] is False + yield Record() + + monkeypatch.setattr(mseed_utils, "_iter_records", _iter_records) + payload = mseed_utils._scan_patches("unused", PyMseed)[0] + assert payload["shape"] == (1, 10) + assert payload["dtype"] == "int32" + def test_missing_pymseed_raises(self, mseed_v3_path, monkeypatch): """Explicit MiniSEED reads require PyMseed.""" import dascore.io.mseed.core as mseed_core @@ -359,6 +434,35 @@ def sourceid2nslc(source_id): "", ) + def test_source_id_error_falls_back_to_station(self): + """PyMseed source ID parser errors are treated as unparseable IDs.""" + + class BadPyMseed: + class SourceIDError(Exception): + pass + + @staticmethod + def sourceid2nslc(source_id): + raise BadPyMseed.SourceIDError("bad source id") + + assert mseed_utils._source_id_to_nslc(BadPyMseed, "not-fdsn") == ( + "", + "not-fdsn", + "", + "", + ) + + def test_unexpected_source_id_errors_propagate(self): + """Only expected source ID parser errors are swallowed.""" + + class BadPyMseed: + @staticmethod + def sourceid2nslc(source_id): + raise RuntimeError("unexpected bug") + + with pytest.raises(RuntimeError, match="unexpected bug"): + mseed_utils._source_id_to_nslc(BadPyMseed, "not-fdsn") + def test_record_to_segment_skips_non_overlapping_records(self): """Record metadata can be rejected before 
sample unpacking.""" @@ -371,6 +475,53 @@ def unpack_data(self): assert mseed_utils._record_to_segment(Record(), None, (20, 30)) is None + def test_record_dtype_from_sample_type(self): + """Populated sample types are preferred when inferring scan dtype.""" + + class Record: + sampletype = "d" + encoding = 3 + + assert mseed_utils._record_dtype(Record()) == "float64" + + @pytest.mark.parametrize( + ("encoding", "dtype"), + ( + (0, "S1"), + (1, "int16"), + (3, "int32"), + (4, "float32"), + (5, "float64"), + (10, "int32"), + (11, "int32"), + (999, ""), + ), + ) + def test_record_dtype_from_encoding(self, encoding, dtype): + """MiniSEED scan dtype can be inferred from known encodings.""" + + class Record: + sampletype = None + + Record.encoding = encoding + assert mseed_utils._record_dtype(Record()) == dtype + + def test_contiguous_source_summaries_are_coalesced(self): + """Contiguous scan summaries from one source are merged.""" + first = _trace_summary() + second = _trace_summary(start_ns=first.next_start_ns, record_length=512) + out = mseed_utils._coalesce_source_summaries([second, first]) + assert len(out) == 1 + assert out[0].sample_count == first.sample_count + second.sample_count + assert out[0].record_length == 512 + + def test_discontinuous_source_summaries_are_not_coalesced(self): + """Discontinuous scan summaries from one source stay separate.""" + first = _trace_summary() + second = _trace_summary(start_ns=first.next_start_ns + first.sample_step_ns) + out = mseed_utils._coalesce_source_summaries([first, second]) + assert len(out) == 2 + def test_patch_from_segments_defaults_to_local_channel_indices(self): """Patches can still be built without a global channel map.""" segment = _trace_segment() @@ -469,7 +620,8 @@ def test_read_das_station_channels(self): pytest.importorskip("pymseed") from dascore.utils.downloader import fetch - path = fetch("etna_9n_3chan_10s.mseed") + with skip_timeout(): + path = fetch("etna_9n_3chan_10s.mseed") spool = dc.read(path) assert len(spool) == 3 assert sorted(x.shape for x in spool) == [(1, 13556), (1, 13729), (1, 13735)] From 5bc7805a4d204c9c2a87d3185eb7d4f86c0c83f6 Mon Sep 17 00:00:00 2001 From: Derrick Chambers Date: Sat, 2 May 2026 17:09:55 +0200 Subject: [PATCH 4/8] Clean up MiniSEED trace helpers --- dascore/io/mseed/core.py | 16 +- dascore/io/mseed/utils.py | 262 ++++++++++++++++--------- tests/test_io/test_mseed/test_mseed.py | 12 +- 3 files changed, 182 insertions(+), 108 deletions(-) diff --git a/dascore/io/mseed/core.py b/dascore/io/mseed/core.py index a4aaba69..4f605064 100644 --- a/dascore/io/mseed/core.py +++ b/dascore/io/mseed/core.py @@ -31,7 +31,7 @@ from __future__ import annotations import dascore as dc -from dascore.constants import SpoolType, opt_timeable_types +from dascore.constants import opt_timeable_types from dascore.io import FiberIO, ScanPayload from dascore.utils.io import LocalPath from dascore.utils.misc import optional_import @@ -46,27 +46,27 @@ class MSeedV2(FiberIO): preferred_extensions = ("mseed", "msd", "miniseed") version = "2" - def get_format(self, path: LocalPath, **kwargs) -> tuple[str, str] | bool: + def get_format(self, resource: LocalPath, **kwargs) -> tuple[str, str] | bool: """Determine if path is a MiniSEED file.""" - return _detect_format(path) + return _detect_format(resource) - def scan(self, path: LocalPath, **kwargs) -> list[ScanPayload]: + def scan(self, resource: LocalPath, **kwargs) -> list[ScanPayload]: """Scan a MiniSEED file.""" pymseed = optional_import("pymseed") - return 
_scan_patches(path, pymseed) + return _scan_patches(resource, pymseed) def read( self, - path: LocalPath, + resource: LocalPath, time: tuple[opt_timeable_types, opt_timeable_types] | None = None, channel: tuple[int | None, int | None] | None = None, source_patch_id=(), **kwargs, - ) -> SpoolType: + ) -> dc.BaseSpool: """Read a MiniSEED file.""" pymseed = optional_import("pymseed") patches = _get_patches( - path, + resource, pymseed, time=time, channel=channel, diff --git a/dascore/io/mseed/utils.py b/dascore/io/mseed/utils.py index 7820f2d1..c3692563 100644 --- a/dascore/io/mseed/utils.py +++ b/dascore/io/mseed/utils.py @@ -2,10 +2,12 @@ from __future__ import annotations +from collections.abc import Callable, Sequence from dataclasses import dataclass, replace from itertools import groupby from math import ceil, floor from struct import unpack +from typing import TypeVar import numpy as np @@ -18,6 +20,7 @@ from dascore.utils.time import to_datetime64, to_int _TimeLimits = tuple[int | None, int | None] +_T = TypeVar("_T", bound="_TraceBase") _TEXT_ENCODING = 0 _INT16_ENCODING = 1 @@ -42,6 +45,8 @@ "d": "float64", "t": "S1", } +_SEED_CHARS = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +_SEED_CHARS_WITH_SPACE = _SEED_CHARS + b" " @dataclass(frozen=True) @@ -62,6 +67,77 @@ class _TraceInfo: publication_version: int record_length: int + +class _TraceBase: + """Base class exposing shared MiniSEED trace metadata.""" + + info: _TraceInfo + + @property + def source_id(self) -> str: + """Return the MiniSEED source ID.""" + return self.info.source_id + + @property + def network(self) -> str: + """Return the MiniSEED network code.""" + return self.info.network + + @property + def station(self) -> str: + """Return the MiniSEED station code.""" + return self.info.station + + @property + def location(self) -> str: + """Return the MiniSEED location code.""" + return self.info.location + + @property + def seed_channel(self) -> str: + """Return the SEED channel code.""" + return self.info.seed_channel + + @property + def format_version(self) -> str: + """Return the MiniSEED format version.""" + return self.info.format_version + + @property + def start_ns(self) -> int: + """Return the segment start time in epoch nanoseconds.""" + return self.info.start_ns + + @property + def sample_rate(self) -> float: + """Return the MiniSEED sample rate.""" + return self.info.sample_rate + + @property + def sample_count(self) -> int: + """Return the number of samples in the trace.""" + return self.info.sample_count + + @property + def sample_type(self) -> str: + """Return the MiniSEED sample type.""" + return self.info.sample_type + + @property + def encoding(self) -> str: + """Return the MiniSEED encoding.""" + return self.info.encoding + + @property + def publication_version(self) -> int: + """Return the MiniSEED publication version.""" + return self.info.publication_version + + @property + def record_length(self) -> int: + """Return the MiniSEED record length.""" + return self.info.record_length + @property def sample_step_ns(self) -> int: """Return the sample spacing in nanoseconds.""" @@ -74,33 +150,29 @@ def next_start_ns(self) -> int: @dataclass(frozen=True) -class _TraceSegment: +class _TraceSegment(_TraceBase): """A decoded contiguous MiniSEED segment from one source ID.""" info: _TraceInfo data: np.ndarray + def __post_init__(self) -> None: + """Validate that trace metadata matches the decoded samples.""" + assert len(self.data) == self.info.sample_count + @property def sample_count(self) -> int: """Return the 
number of samples in the segment.""" return len(self.data) - def __getattr__(self, item): - """Delegate metadata lookups to the wrapped trace info.""" - return getattr(self.info, item) - @dataclass(frozen=True) -class _TraceSummary: +class _TraceSummary(_TraceBase): """Metadata for a contiguous MiniSEED segment from one source ID.""" info: _TraceInfo dtype: str - def __getattr__(self, item): - """Delegate metadata lookups to the wrapped trace info.""" - return getattr(self.info, item) - @dataclass(frozen=True, order=True) class _TraceGroupKey: @@ -120,6 +192,7 @@ def _sample_step_ns(sample_rate: float) -> int: if sample_rate == 0: msg = "MiniSEED sample rate cannot be zero." raise ValueError(msg) + # MiniSEED negative sample rates encode the sample period in seconds. seconds = 1 / sample_rate if sample_rate > 0 else abs(sample_rate) return round(seconds * ONE_BILLION) @@ -147,7 +220,13 @@ def _source_id_to_nslc(pymseed, source_id: str) -> tuple[str, str, str, str]: nslc = pymseed.sourceid2nslc(source_id) except parse_errors: return "", source_id, "", "" - return tuple("" if x is None else str(x) for x in nslc) + network, station, location, seed_channel = nslc + return ( + "" if network is None else str(network), + "" if station is None else str(station), + "" if location is None else str(location), + "" if seed_channel is None else str(seed_channel), + ) def _record_dtype(record) -> str: @@ -180,11 +259,6 @@ def _record_to_trace_info(record, pymseed, sample_count: int) -> _TraceInfo: ) -def _iter_records(path: LocalPath, pymseed, unpack_data: bool = True): - """Yield records from a MiniSEED file.""" - yield from pymseed.MS3Record.from_file(str(path), unpack_data=unpack_data) - - def _record_overlaps_time(record, time_limits: _TimeLimits) -> bool: """Return True if a record overlaps a requested time range.""" start, stop = time_limits @@ -205,6 +279,7 @@ def _record_to_segment( data = np.asarray(record.np_datasamples) segment = _TraceSegment( info=_record_to_trace_info(record, pymseed, sample_count=len(data)), + # Detach from the PyMseed record buffer before records are advanced. data=data.copy(), ) return _trim_segment_time(segment, time_limits) @@ -212,6 +287,7 @@ def _record_to_segment( def _record_to_summary(record, pymseed) -> _TraceSummary: """Convert a PyMseed record header to a trace summary.""" + # Header-only scan trusts samplecnt; corrupted records may decode differently. 
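+    # Scan shapes can therefore disagree with read shapes for damaged files.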
return _TraceSummary( info=_record_to_trace_info(record, pymseed, sample_count=int(record.samplecnt)), dtype=_record_dtype(record), @@ -248,83 +324,85 @@ def _trim_segment_time( ) -def _coalesce_source_segments(segments: list[_TraceSegment]) -> list[_TraceSegment]: - """Merge contiguous records for the same source ID.""" - segments = sorted(segments, key=lambda x: (x.source_id, x.start_ns)) +def _coalesce_source_traces( + traces: Sequence[_T], + can_merge: Callable[[_T, _T], bool], + merge: Callable[[_T, _T], _T], +) -> list[_T]: + """Merge contiguous traces for each MiniSEED source ID.""" + traces = sorted(traces, key=lambda x: (x.source_id, x.start_ns)) out = [] - for source_id, source_group in groupby(segments, key=lambda x: x.source_id): + for _, source_group in groupby(traces, key=lambda x: x.source_id): pending = None - for seg in source_group: + for trace in source_group: if pending is None: - pending = seg + pending = trace continue - can_merge = ( - pending.source_id == source_id - and pending.sample_rate == seg.sample_rate - and pending.format_version == seg.format_version - and pending.next_start_ns == seg.start_ns - and pending.data.dtype == seg.data.dtype - ) - if can_merge: - pending = replace( - pending, - info=replace( - pending.info, - sample_count=pending.sample_count + seg.sample_count, - record_length=max(pending.record_length, seg.record_length), - ), - data=np.concatenate([pending.data, seg.data]), - ) + if can_merge(pending, trace): + pending = merge(pending, trace) else: out.append(pending) - pending = seg + pending = trace if pending is not None: out.append(pending) return out +def _coalesce_source_segments(segments: list[_TraceSegment]) -> list[_TraceSegment]: + """Merge contiguous decoded records for the same source ID.""" + + def can_merge(pending: _TraceSegment, seg: _TraceSegment) -> bool: + return ( + pending.sample_rate == seg.sample_rate + and pending.format_version == seg.format_version + and pending.next_start_ns == seg.start_ns + and pending.data.dtype == seg.data.dtype + ) + + def merge(pending: _TraceSegment, seg: _TraceSegment) -> _TraceSegment: + return replace( + pending, + info=replace( + pending.info, + sample_count=pending.sample_count + seg.sample_count, + record_length=max(pending.record_length, seg.record_length), + ), + data=np.concatenate([pending.data, seg.data]), + ) + + return _coalesce_source_traces(segments, can_merge, merge) + + def _coalesce_source_summaries(summaries: list[_TraceSummary]) -> list[_TraceSummary]: """Merge contiguous record summaries for the same source ID.""" - summaries = sorted(summaries, key=lambda x: (x.source_id, x.start_ns)) - out = [] - for source_id, source_group in groupby(summaries, key=lambda x: x.source_id): - pending = None - for summary in source_group: - if pending is None: - pending = summary - continue - can_merge = ( - pending.source_id == source_id - and pending.sample_rate == summary.sample_rate - and pending.format_version == summary.format_version - and pending.next_start_ns == summary.start_ns - and pending.dtype == summary.dtype - and pending.encoding == summary.encoding - ) - if can_merge: - pending = replace( - pending, - info=replace( - pending.info, - sample_count=pending.sample_count + summary.sample_count, - record_length=max( - pending.record_length, summary.record_length - ), - ), - ) - else: - out.append(pending) - pending = summary - if pending is not None: - out.append(pending) - return out + + def can_merge(pending: _TraceSummary, summary: _TraceSummary) -> bool: + return ( + 
pending.sample_rate == summary.sample_rate + and pending.format_version == summary.format_version + and pending.next_start_ns == summary.start_ns + and pending.dtype == summary.dtype + and pending.encoding == summary.encoding + ) + + def merge(pending: _TraceSummary, summary: _TraceSummary) -> _TraceSummary: + return replace( + pending, + info=replace( + pending.info, + sample_count=pending.sample_count + summary.sample_count, + record_length=max(pending.record_length, summary.record_length), + ), + ) + + return _coalesce_source_traces(summaries, can_merge, merge) def _read_segments(path: LocalPath, pymseed, time=None) -> list[_TraceSegment]: """Read and coalesce decoded MiniSEED records.""" segments = [] time_limits = _get_time_limits(time) - for record in _iter_records(path, pymseed, unpack_data=False): + for record in pymseed.MS3Record.from_file(str(path), unpack_data=False): segment = _record_to_segment(record, pymseed, time_limits) if segment is not None and segment.sample_count: segments.append(segment) @@ -334,14 +412,14 @@ def _read_segments(path: LocalPath, pymseed, time=None) -> list[_TraceSegment]: def _scan_segments(path: LocalPath, pymseed) -> list[_TraceSummary]: """Read and coalesce MiniSEED record metadata without unpacking samples.""" summaries = [] - for record in _iter_records(path, pymseed, unpack_data=False): + for record in pymseed.MS3Record.from_file(str(path), unpack_data=False): summary = _record_to_summary(record, pymseed) if summary.sample_count: summaries.append(summary) return _coalesce_source_summaries(summaries) -def _get_group_key(segment: _TraceSegment) -> _TraceGroupKey: +def _get_group_key(segment: _TraceBase) -> _TraceGroupKey: """Return the compatibility key used to merge traces into a patch.""" return _TraceGroupKey( format_version=segment.format_version, @@ -354,19 +432,18 @@ def _get_group_key(segment: _TraceSegment) -> _TraceGroupKey: ) -def _group_segments(segments: list[_TraceSegment]): +def _group_segments(segments: Sequence[_T]): """Yield compatible segment groups.""" segments = sorted(segments, key=lambda x: (_get_group_key(x), x.source_id)) for key, group in groupby(segments, key=_get_group_key): yield key, list(group) -def _get_channel_map(segments: list[_TraceSegment]) -> dict[str, int]: +def _get_channel_map(segments: Sequence[_TraceBase]) -> dict[str, int]: """Return stable channel indices for MiniSEED sources.""" - source_ids = { - x.source_id: (x.station, x.source_id) - for x in sorted(segments, key=lambda y: (y.station, y.source_id)) - } + source_ids = dict.fromkeys( + x.source_id for x in sorted(segments, key=lambda y: (y.station, y.source_id)) + ) return {source_id: ind for ind, source_id in enumerate(source_ids)} @@ -382,7 +459,9 @@ def _source_patch_id(group_key: _TraceGroupKey) -> str: ) -def _get_coords(group_key, segments, channel_map: dict[str, int] | None = None): +def _get_coords( + segments: Sequence[_TraceBase], channel_map: dict[str, int] | None = None +): """Return DASCore coordinates from compatible MiniSEED segments.""" segments = sorted(segments, key=lambda x: (x.station, x.source_id)) first = segments[0] @@ -428,7 +507,7 @@ def _patch_from_segments( segments = sorted(segments, key=lambda x: (x.station, x.source_id)) first = segments[0] data = np.stack([x.data for x in segments]) - coords = _get_coords(group_key, segments, channel_map=channel_map) + coords = _get_coords(segments, channel_map=channel_map) attrs = _get_attrs(group_key, first) return dc.Patch(data=data, dims=("channel", "time"), coords=coords, attrs=attrs) 
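+
+
+# Layout note (illustrative): three compatible sources of 1000 samples each
+# stack into a (3, 1000) patch; "channel" holds channel-map indices (or a
+# local 0..N-1 range when no map is given), "time" is an evenly sampled
+# coordinate, and per-source NSLC codes ride along on the channel dimension.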
@@ -442,7 +521,7 @@ def _scan_payload_from_segments( segments = sorted(segments, key=lambda x: (x.station, x.source_id)) first = segments[0] coords = get_coord_manager( - _get_coords(group_key, segments, channel_map=channel_map), + _get_coords(segments, channel_map=channel_map), dims=("channel", "time"), ) attrs = _get_attrs(group_key, first) @@ -500,16 +579,9 @@ def _scan_patches(path, pymseed) -> list[ScanPayload]: return sorted(payloads, key=lambda x: x["coords"].get_coord("channel").min()) -def _is_ascii_digit(value: int) -> bool: - """Return True if an integer byte is an ASCII digit.""" - return 48 <= value <= 57 - - def _is_seed_code(value: bytes, *, allow_space: bool = True) -> bool: """Return True if bytes are plausible SEED code bytes.""" - allowed = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - if allow_space: - allowed += b" " + allowed = _SEED_CHARS_WITH_SPACE if allow_space else _SEED_CHARS return bool(value.strip()) and all(char in allowed for char in value) @@ -517,9 +589,9 @@ def _detect_mseed_v2_header(header: bytes) -> bool: """Return True if a fixed MiniSEED 2 header looks valid.""" if len(header) < 48: return False - if not all(_is_ascii_digit(char) for char in header[:6]): + if not header[:6].isdigit(): return False - if header[6:7] not in b"DRQM": + if header[6] not in b"DRQM": return False if header[7] not in (0, 32): return False diff --git a/tests/test_io/test_mseed/test_mseed.py b/tests/test_io/test_mseed/test_mseed.py index 6db6555f..fedb5e78 100644 --- a/tests/test_io/test_mseed/test_mseed.py +++ b/tests/test_io/test_mseed/test_mseed.py @@ -373,16 +373,18 @@ def unpack_data(self): raise AssertionError("scan should not unpack samples") class PyMseed: + class MS3Record: + @staticmethod + def from_file(path, *, unpack_data): + assert path == "unused" + assert unpack_data is False + yield Record() + @staticmethod def sourceid2nslc(source_id): assert source_id == Record.sourceid return "XX", "00000", "", "HSF" - def _iter_records(*args, **kwargs): - assert kwargs["unpack_data"] is False - yield Record() - - monkeypatch.setattr(mseed_utils, "_iter_records", _iter_records) payload = mseed_utils._scan_patches("unused", PyMseed)[0] assert payload["shape"] == (1, 10) assert payload["dtype"] == "int32" From 45242d868acd0b487155b44dcb34ebcfc38c14db Mon Sep 17 00:00:00 2001 From: Derrick Chambers Date: Sat, 2 May 2026 17:14:43 +0200 Subject: [PATCH 5/8] Fix MiniSEED common IO version matrix --- tests/test_io/test_common_io.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_io/test_common_io.py b/tests/test_io/test_common_io.py index 5d10fb43..4f6dcb73 100644 --- a/tests/test_io/test_common_io.py +++ b/tests/test_io/test_common_io.py @@ -29,7 +29,7 @@ from dascore.io.febus import Febus1, Febus2 from dascore.io.gdr import GDR_V1 from dascore.io.h5simple import H5Simple -from dascore.io.mseed.core import MSeedV2, MSeedV3 +from dascore.io.mseed.core import MSeedV2 from dascore.io.netcdf import NetCDFCFV18 from dascore.io.neubrex import NeubrexDASV1, NeubrexRFSV1 from dascore.io.optodas import OptoDASV8 @@ -91,7 +91,6 @@ Terra15FormatterV6(): ("terra15_v6_test_file.hdf5",), NetCDFCFV18(): ("xdas_netcdf.nc",), MSeedV2(): ("etna_9n_3chan_10s.mseed",), - MSeedV3(): ("etna_9n_3chan_10s.mseed",), } # This tuple is for fiber io which support a write method and can write From 25d21542bdb6e4e4d6b30158f90a68e7b47b436a Mon Sep 17 00:00:00 2001 From: Derrick Chambers Date: Sat, 2 May 2026 20:14:40 +0200 Subject: [PATCH 6/8] performance review --- 
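Notes: selective reads (source_patch_id or channel selections) now run a
header-only scan first and derive per-source time windows, so the decode pass
can skip records outside the selection. A rough sketch of the intended flow,
using the helpers this patch introduces:

    channel_map, groups, source_windows = _get_read_plan(
        path, pymseed, wanted_ids, channel
    )
    segments = _read_segments(path, pymseed, time=time, source_windows=source_windows)

Coalescing also batches np.concatenate over each merged run of records instead
of concatenating once per record.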
dascore/io/mseed/utils.py | 305 ++++++++++++++----------- tests/test_io/test_mseed/test_mseed.py | 113 ++++++++- 2 files changed, 280 insertions(+), 138 deletions(-) diff --git a/dascore/io/mseed/utils.py b/dascore/io/mseed/utils.py index c3692563..17997682 100644 --- a/dascore/io/mseed/utils.py +++ b/dascore/io/mseed/utils.py @@ -20,7 +20,9 @@ from dascore.utils.time import to_datetime64, to_int _TimeLimits = tuple[int | None, int | None] -_T = TypeVar("_T", bound="_TraceBase") +_TraceWindow = tuple[int, int] +_SourceWindows = dict[str, list[_TimeLimits]] +_T = TypeVar("_T", bound="_TraceInfo") _TEXT_ENCODING = 0 _INT16_ENCODING = 1 @@ -67,77 +69,6 @@ class _TraceInfo: publication_version: int record_length: int - -class _TraceBase: - """Base class exposing shared MiniSEED trace metadata.""" - - info: _TraceInfo - - @property - def source_id(self) -> str: - """Return the MiniSEED source ID.""" - return self.info.source_id - - @property - def network(self) -> str: - """Return the MiniSEED network code.""" - return self.info.network - - @property - def station(self) -> str: - """Return the MiniSEED station code.""" - return self.info.station - - @property - def location(self) -> str: - """Return the MiniSEED location code.""" - return self.info.location - - @property - def seed_channel(self) -> str: - """Return the SEED channel code.""" - return self.info.seed_channel - - @property - def format_version(self) -> str: - """Return the MiniSEED format version.""" - return self.info.format_version - - @property - def start_ns(self) -> int: - """Return the segment start time in epoch nanoseconds.""" - return self.info.start_ns - - @property - def sample_rate(self) -> float: - """Return the MiniSEED sample rate.""" - return self.info.sample_rate - - @property - def sample_count(self) -> int: - """Return the number of samples in the trace.""" - return self.info.sample_count - - @property - def sample_type(self) -> str: - """Return the MiniSEED sample type.""" - return self.info.sample_type - - @property - def encoding(self) -> str: - """Return the MiniSEED encoding.""" - return self.info.encoding - - @property - def publication_version(self) -> int: - """Return the MiniSEED publication version.""" - return self.info.publication_version - - @property - def record_length(self) -> int: - """Return the MiniSEED record length.""" - return self.info.record_length - @property def sample_step_ns(self) -> int: """Return the sample spacing in nanoseconds.""" @@ -150,27 +81,20 @@ def next_start_ns(self) -> int: @dataclass(frozen=True) -class _TraceSegment(_TraceBase): +class _TraceSegment(_TraceInfo): """A decoded contiguous MiniSEED segment from one source ID.""" - info: _TraceInfo data: np.ndarray def __post_init__(self) -> None: """Validate that trace metadata matches the decoded samples.""" - assert len(self.data) == self.info.sample_count - - @property - def sample_count(self) -> int: - """Return the number of samples in the segment.""" - return len(self.data) + assert len(self.data) == self.sample_count @dataclass(frozen=True) -class _TraceSummary(_TraceBase): +class _TraceSummary(_TraceInfo): """Metadata for a contiguous MiniSEED segment from one source ID.""" - info: _TraceInfo dtype: str @@ -211,6 +135,24 @@ def _get_time_limits(time=None) -> _TimeLimits: return _time_to_ns(time[0]), _time_to_ns(time[1]) +def _trace_end_ns(trace: _TraceInfo) -> int: + """Return the time of the final sample in a trace.""" + return trace.start_ns + max(trace.sample_count - 1, 0) * trace.sample_step_ns + + +def 
_trace_time_window(trace: _TraceInfo) -> _TraceWindow: + """Return the inclusive time window covered by a trace.""" + return trace.start_ns, _trace_end_ns(trace) + + +def _time_windows_overlap(start: int, stop: int, limits: _TimeLimits) -> bool: + """Return True if an inclusive start/stop range overlaps optional limits.""" + limit_start, limit_stop = limits + after_start = limit_stop is None or start <= limit_stop + before_stop = limit_start is None or stop >= limit_start + return after_start and before_stop + + def _source_id_to_nslc(pymseed, source_id: str) -> tuple[str, str, str, str]: """Return NSLC codes from a MiniSEED source ID.""" parse_errors = (ValueError, TypeError) @@ -261,12 +203,19 @@ def _record_to_trace_info(record, pymseed, sample_count: int) -> _TraceInfo: def _record_overlaps_time(record, time_limits: _TimeLimits) -> bool: """Return True if a record overlaps a requested time range.""" - start, stop = time_limits + return _time_windows_overlap( + int(record.starttime), int(record.endtime), time_limits + ) + + +def _record_overlaps_source_windows(record, source_windows: _SourceWindows) -> bool: + """Return True if a record overlaps one selected source time window.""" record_start = int(record.starttime) record_stop = int(record.endtime) - after_start = stop is None or record_start <= stop - before_stop = start is None or record_stop >= start - return after_start and before_stop + windows = source_windows.get(str(record.sourceid), ()) + return any( + _time_windows_overlap(record_start, record_stop, window) for window in windows + ) def _record_to_segment( @@ -277,8 +226,9 @@ def _record_to_segment( return None record.unpack_data() data = np.asarray(record.np_datasamples) + info = _record_to_trace_info(record, pymseed, sample_count=len(data)) segment = _TraceSegment( - info=_record_to_trace_info(record, pymseed, sample_count=len(data)), + **info.__dict__, # Detach from the PyMseed record buffer before records are advanced. data=data.copy(), ) @@ -288,8 +238,9 @@ def _record_to_segment( def _record_to_summary(record, pymseed) -> _TraceSummary: """Convert a PyMseed record header to a trace summary.""" # Header-only scan trusts samplecnt; corrupted records may decode differently. 
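     # Scan shapes can therefore disagree with read shapes for damaged files.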
+ info = _record_to_trace_info(record, pymseed, sample_count=int(record.samplecnt)) return _TraceSummary( - info=_record_to_trace_info(record, pymseed, sample_count=int(record.samplecnt)), + **info.__dict__, dtype=_record_dtype(record), ) @@ -312,14 +263,10 @@ def _trim_segment_time( ) if stop_index <= start_index: return None - info = replace( - segment.info, - start_ns=segment.start_ns + start_index * step, - sample_count=stop_index - start_index, - ) return replace( segment, - info=info, + start_ns=segment.start_ns + start_index * step, + sample_count=stop_index - start_index, data=segment.data[start_index:stop_index].copy(), ) @@ -351,26 +298,56 @@ def _coalesce_source_traces( def _coalesce_source_segments(segments: list[_TraceSegment]) -> list[_TraceSegment]: """Merge contiguous decoded records for the same source ID.""" - def can_merge(pending: _TraceSegment, seg: _TraceSegment) -> bool: - return ( - pending.sample_rate == seg.sample_rate - and pending.format_version == seg.format_version - and pending.next_start_ns == seg.start_ns - and pending.data.dtype == seg.data.dtype - ) - - def merge(pending: _TraceSegment, seg: _TraceSegment) -> _TraceSegment: + def flush_pending( + pending: _TraceSegment, + data: list[np.ndarray], + sample_count: int, + record_length: int, + ) -> _TraceSegment: + """Return the current pending segment without repeated concatenation.""" + if len(data) == 1: + return pending return replace( pending, - info=replace( - pending.info, - sample_count=pending.sample_count + seg.sample_count, - record_length=max(pending.record_length, seg.record_length), - ), - data=np.concatenate([pending.data, seg.data]), + sample_count=sample_count, + record_length=record_length, + data=np.concatenate(data), ) - return _coalesce_source_traces(segments, can_merge, merge) + segments = sorted(segments, key=lambda x: (x.source_id, x.start_ns)) + out = [] + for _, source_group in groupby(segments, key=lambda x: x.source_id): + pending = None + data = [] + sample_count = 0 + record_length = 0 + for seg in source_group: + if pending is None: + pending = seg + data = [seg.data] + sample_count = seg.sample_count + record_length = seg.record_length + continue + can_merge = ( + pending.sample_rate == seg.sample_rate + and pending.format_version == seg.format_version + and pending.start_ns + sample_count * pending.sample_step_ns + == seg.start_ns + and pending.data.dtype == seg.data.dtype + ) + if can_merge: + data.append(seg.data) + sample_count += seg.sample_count + record_length = max(record_length, seg.record_length) + else: + out.append(flush_pending(pending, data, sample_count, record_length)) + pending = seg + data = [seg.data] + sample_count = seg.sample_count + record_length = seg.record_length + if pending is not None: + out.append(flush_pending(pending, data, sample_count, record_length)) + return out def _coalesce_source_summaries(summaries: list[_TraceSummary]) -> list[_TraceSummary]: @@ -388,21 +365,27 @@ def can_merge(pending: _TraceSummary, summary: _TraceSummary) -> bool: def merge(pending: _TraceSummary, summary: _TraceSummary) -> _TraceSummary: return replace( pending, - info=replace( - pending.info, - sample_count=pending.sample_count + summary.sample_count, - record_length=max(pending.record_length, summary.record_length), - ), + sample_count=pending.sample_count + summary.sample_count, + record_length=max(pending.record_length, summary.record_length), ) return _coalesce_source_traces(summaries, can_merge, merge) -def _read_segments(path: LocalPath, pymseed, time=None) -> 
list[_TraceSegment]: +def _read_segments( + path: LocalPath, + pymseed, + time=None, + source_windows: _SourceWindows | None = None, +) -> list[_TraceSegment]: """Read and coalesce decoded MiniSEED records.""" segments = [] time_limits = _get_time_limits(time) for record in pymseed.MS3Record.from_file(str(path), unpack_data=False): + if source_windows is not None and not _record_overlaps_source_windows( + record, source_windows + ): + continue segment = _record_to_segment(record, pymseed, time_limits) if segment is not None and segment.sample_count: segments.append(segment) @@ -419,7 +402,7 @@ def _scan_segments(path: LocalPath, pymseed) -> list[_TraceSummary]: return _coalesce_source_summaries(summaries) -def _get_group_key(segment: _TraceBase) -> _TraceGroupKey: +def _get_group_key(segment: _TraceInfo) -> _TraceGroupKey: """Return the compatibility key used to merge traces into a patch.""" return _TraceGroupKey( format_version=segment.format_version, @@ -439,7 +422,7 @@ def _group_segments(segments: Sequence[_T]): yield key, list(group) -def _get_channel_map(segments: Sequence[_TraceBase]) -> dict[str, int]: +def _get_channel_map(segments: Sequence[_TraceInfo]) -> dict[str, int]: """Return stable channel indices for MiniSEED sources.""" source_ids = dict.fromkeys( x.source_id for x in sorted(segments, key=lambda y: (y.station, y.source_id)) @@ -447,6 +430,39 @@ def _get_channel_map(segments: Sequence[_TraceBase]) -> dict[str, int]: return {source_id: ind for ind, source_id in enumerate(source_ids)} +def _get_selected_source_ids(channel_map: dict[str, int], channel) -> set[str]: + """Return source IDs selected by DASCore channel selector semantics.""" + if channel is None: + return set(channel_map) + source_ids = np.asarray(tuple(channel_map)) + channel_values = np.asarray([channel_map[x] for x in source_ids]) + channel_coord = get_coord(data=channel_values) + _, indexer = channel_coord.select(channel) + return {str(x) for x in np.atleast_1d(source_ids[indexer])} + + +def _get_read_plan(path, pymseed, wanted_ids, channel): + """Return scan-derived groups and source windows needed for a read.""" + summaries = _scan_segments(path, pymseed) + channel_map = _get_channel_map(summaries) + selected_source_ids = _get_selected_source_ids(channel_map, channel) + source_windows: _SourceWindows = {} + groups = [] + for group_key, group in _group_segments(summaries): + patch_id = _source_patch_id(group_key) + if wanted_ids and patch_id not in wanted_ids: + continue + group = [x for x in group if x.source_id in selected_source_ids] + if not group: + continue + groups.append((group_key, group)) + for summary in group: + source_windows.setdefault(summary.source_id, []).append( + _trace_time_window(summary) + ) + return channel_map, groups, source_windows + + def _source_patch_id(group_key: _TraceGroupKey) -> str: """Return a stable source patch ID for a MiniSEED group.""" source_key = ".".join( @@ -460,7 +476,7 @@ def _source_patch_id(group_key: _TraceGroupKey) -> str: def _get_coords( - segments: Sequence[_TraceBase], channel_map: dict[str, int] | None = None + segments: Sequence[_TraceInfo], channel_map: dict[str, int] | None = None ): """Return DASCore coordinates from compatible MiniSEED segments.""" segments = sorted(segments, key=lambda x: (x.station, x.source_id)) @@ -512,6 +528,25 @@ def _patch_from_segments( return dc.Patch(data=data, dims=("channel", "time"), coords=coords, attrs=attrs) +def _segments_from_summaries( + segments: Sequence[_TraceSegment], summaries: Sequence[_TraceSummary] +) -> 
list[_TraceSegment]: + """Return decoded segments that overlap selected scan summaries.""" + summaries_by_source = {} + for summary in summaries: + summaries_by_source.setdefault(summary.source_id, []).append(summary) + out = [] + for segment in segments: + segment_start, segment_stop = _trace_time_window(segment) + for summary in summaries_by_source.get(segment.source_id, ()): + if _time_windows_overlap( + segment_start, segment_stop, _trace_time_window(summary) + ): + out.append(segment) + break + return out + + def _scan_payload_from_segments( group_key, segments: list[_TraceSummary], @@ -540,27 +575,27 @@ def _get_patches( ) -> list[dc.Patch]: """Read MiniSEED patches from a path.""" wanted_ids = _normalize_source_patch_ids(source_patch_id) - read_time = None if wanted_ids else time - trim_limits = _get_time_limits(time) if wanted_ids else (None, None) patches = [] - all_segments = _read_segments(path, pymseed, read_time) - channel_map = _get_channel_map(all_segments) - segment_groups = _group_segments(all_segments) + use_scan_plan = bool(wanted_ids) or channel is not None + if use_scan_plan: + channel_map, segment_groups, source_windows = _get_read_plan( + path, pymseed, wanted_ids, channel + ) + if not segment_groups: + return [] + all_segments = _read_segments( + path, pymseed, time=time, source_windows=source_windows + ) + else: + all_segments = _read_segments(path, pymseed, time=time) + channel_map = _get_channel_map(all_segments) + segment_groups = list(_group_segments(all_segments)) for group_key, segments in segment_groups: - patch_id = _source_patch_id(group_key) - if wanted_ids and patch_id not in wanted_ids: - continue - if wanted_ids: - segments = [ - trimmed - for segment in segments - if (trimmed := _trim_segment_time(segment, trim_limits)) is not None - ] + if use_scan_plan: + segments = _segments_from_summaries(all_segments, segments) if not segments: continue patch = _patch_from_segments(group_key, segments, channel_map=channel_map) - if channel is not None: - patch = patch.select(channel=channel) if patch.size: patches.append(patch) return sorted(patches, key=lambda x: x.get_coord("channel").min()) diff --git a/tests/test_io/test_mseed/test_mseed.py b/tests/test_io/test_mseed/test_mseed.py index fedb5e78..115cae04 100644 --- a/tests/test_io/test_mseed/test_mseed.py +++ b/tests/test_io/test_mseed/test_mseed.py @@ -89,19 +89,71 @@ def _trace_segment(**kwargs): record_length=256, ) defaults.update(kwargs) - info = mseed_utils._TraceInfo(**defaults) - return mseed_utils._TraceSegment(info=info, data=data) + return mseed_utils._TraceSegment(**defaults, data=data) def _trace_summary(**kwargs): """Return a small MiniSEED trace summary for helper tests.""" segment = _trace_segment(**kwargs) + summary_kwargs = { + key: getattr(segment, key) for key in mseed_utils._TraceInfo.__annotations__ + } return mseed_utils._TraceSummary( - info=segment.info, + **summary_kwargs, dtype=str(segment.data.dtype), ) +class _MiniSeedRecord: + """Small record stub for exercising read-planning paths.""" + + def __init__( + self, + sourceid="FDSN:XX_00000__H_S_F", + data=None, + samprate=10.0, + starttime=0, + fail_unpack=False, + ): + self.sourceid = sourceid + self.formatversion = 3 + self.starttime = starttime + self.samprate = samprate + self.np_datasamples = np.arange(3, dtype=np.int32) if data is None else data + self.samplecnt = len(self.np_datasamples) + self.endtime = starttime + int((self.samplecnt - 1) * 1_000_000_000 / samprate) + self.sampletype = "i" + self.encoding = 3 + 
self.pubversion = 0 + self.reclen = 256 + self.fail_unpack = fail_unpack + + def unpack_data(self): + """Raise when a test expects this record to be skipped.""" + if self.fail_unpack: + raise AssertionError(f"{self.sourceid} should not unpack") + + +def _pymseed_for_records(records): + """Return a small PyMseed-like object for record iteration tests.""" + + class PyMseed: + class MS3Record: + @staticmethod + def from_file(path, *, unpack_data): + assert path == "unused" + assert unpack_data is False + yield from records + + @staticmethod + def sourceid2nslc(source_id): + _, rest = source_id.split(":", maxsplit=1) + network, station, location, band, source, subsource = rest.split("_") + return network, station, location, f"{band}{source}{subsource}" + + return PyMseed + + @pytest.fixture def mseed_v2_path(tmp_path): """Return a small MiniSEED v2 path.""" @@ -297,6 +349,56 @@ def test_read_source_patch_id_with_non_overlapping_time(self, mseed_v3_path): ) assert len(spool) == 0 + def test_source_patch_id_skips_unselected_record_unpack(self): + """source_patch_id reads do not decode records outside selected groups.""" + records = [ + _MiniSeedRecord("FDSN:XX_00000__H_S_F", samprate=10.0), + _MiniSeedRecord("FDSN:XX_00001__H_S_F", samprate=20.0, fail_unpack=True), + ] + target = "v3:XX..HSF:0:10:3" + patches = mseed_utils._get_patches( + "unused", + _pymseed_for_records(records), + source_patch_id=target, + ) + assert len(patches) == 1 + assert patches[0].attrs["_source_patch_id"] == target + assert tuple(patches[0].get_coord("channel").values) == (0,) + + def test_channel_filter_skips_unselected_record_unpack(self): + """Channel reads do not decode records outside selected channels.""" + records = [ + _MiniSeedRecord("FDSN:XX_00000__H_S_F"), + _MiniSeedRecord("FDSN:XX_00001__H_S_F", fail_unpack=True), + ] + patches = mseed_utils._get_patches( + "unused", + _pymseed_for_records(records), + channel=(0, 0), + ) + assert len(patches) == 1 + assert tuple(patches[0].get_coord("channel").values) == (0,) + + def test_unmatched_source_patch_id_returns_no_patches(self): + """Unknown source patch IDs produce no decoded patches.""" + records = [_MiniSeedRecord("FDSN:XX_00000__H_S_F", fail_unpack=True)] + patches = mseed_utils._get_patches( + "unused", + _pymseed_for_records(records), + source_patch_id="v3:XX..HSF:1:10:3", + ) + assert patches == [] + + def test_unmatched_channel_returns_no_patches(self): + """Channel selections with no sources produce no decoded patches.""" + records = [_MiniSeedRecord("FDSN:XX_00000__H_S_F", fail_unpack=True)] + patches = mseed_utils._get_patches( + "unused", + _pymseed_for_records(records), + channel=(10, 10), + ) + assert patches == [] + def test_discontinuous_source_records_are_separate_patches(self, tmp_path): """Discontinuous records for one source ID are not coalesced.""" pymseed = pytest.importorskip("pymseed") @@ -477,6 +579,11 @@ def unpack_data(self): assert mseed_utils._record_to_segment(Record(), None, (20, 30)) is None + def test_trim_segment_time_returns_none_for_empty_selection(self): + """Time trimming can remove all samples from a decoded segment.""" + segment = _trace_segment() + assert mseed_utils._trim_segment_time(segment, (25_000_000, 75_000_000)) is None + def test_record_dtype_from_sample_type(self): """Populated sample types are preferred when inferring scan dtype.""" From c2ed884e7f619c8d25335c9f4b1530f6427e96e9 Mon Sep 17 00:00:00 2001 From: Derrick Chambers Date: Sat, 2 May 2026 20:27:07 +0200 Subject: [PATCH 7/8] address review again, improve 
performance --- dascore/io/mseed/utils.py | 23 ++++++++++++++++++++--- tests/test_io/test_mseed/test_mseed.py | 17 ++++++++++++++++- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/dascore/io/mseed/utils.py b/dascore/io/mseed/utils.py index 17997682..0450a13f 100644 --- a/dascore/io/mseed/utils.py +++ b/dascore/io/mseed/utils.py @@ -2,8 +2,10 @@ from __future__ import annotations +import struct from collections.abc import Callable, Sequence from dataclasses import dataclass, replace +from fractions import Fraction from itertools import groupby from math import ceil, floor from struct import unpack @@ -77,7 +79,9 @@ def sample_step_ns(self) -> int: @property def next_start_ns(self) -> int: """Return the expected start time of the next contiguous segment.""" - return self.start_ns + self.sample_count * self.sample_step_ns + return self.start_ns + _sample_count_duration_ns( + self.sample_rate, self.sample_count + ) @dataclass(frozen=True) @@ -121,6 +125,17 @@ def _sample_step_ns(sample_rate: float) -> int: return round(seconds * ONE_BILLION) +def _sample_count_duration_ns(sample_rate: float, sample_count: int) -> int: + """Convert a sample count to nanoseconds without per-sample drift.""" + if sample_rate == 0: + msg = "MiniSEED sample rate cannot be zero." + raise ValueError(msg) + rate = Fraction(str(sample_rate)) + samples = Fraction(sample_count, 1) + seconds = samples / rate if rate > 0 else samples * abs(rate) + return round(seconds * ONE_BILLION) + + def _time_to_ns(value) -> int | None: """Convert a time-like value to epoch nanoseconds.""" if value is None or value is ...: @@ -331,9 +346,11 @@ def flush_pending( can_merge = ( pending.sample_rate == seg.sample_rate and pending.format_version == seg.format_version - and pending.start_ns + sample_count * pending.sample_step_ns + and pending.start_ns + + _sample_count_duration_ns(pending.sample_rate, sample_count) == seg.start_ns and pending.data.dtype == seg.data.dtype + and pending.encoding == seg.encoding ) if can_merge: data.append(seg.data) @@ -641,7 +658,7 @@ def _detect_mseed_v2_header(header: bytes) -> bool: hour, minute, second = header[24:27] _unused, frac_seconds, sample_count = unpack(">BHH", header[27:32]) data_offset, blockette_offset = unpack(">HH", header[44:48]) - except Exception: + except struct.error: return False valid_time = ( 1900 <= year <= 2600 diff --git a/tests/test_io/test_mseed/test_mseed.py b/tests/test_io/test_mseed/test_mseed.py index 115cae04..92a76481 100644 --- a/tests/test_io/test_mseed/test_mseed.py +++ b/tests/test_io/test_mseed/test_mseed.py @@ -513,11 +513,19 @@ def test_zero_sample_rate_raises(self): """A zero sample rate cannot be converted to a sample step.""" with pytest.raises(ValueError, match="sample rate"): mseed_utils._sample_step_ns(0) + with pytest.raises(ValueError, match="sample rate"): + mseed_utils._sample_count_duration_ns(0, 1) def test_negative_sample_rate_is_sample_period(self): """Negative MiniSEED sample rates are interpreted as sample periods.""" assert mseed_utils._sample_step_ns(-0.1) == 100_000_000 + def test_next_start_ns_avoids_sample_step_drift(self): + """Record adjacency uses total duration instead of rounded sample steps.""" + segment = _trace_segment(sample_rate=3.0, sample_count=3) + assert segment.sample_step_ns == 333_333_333 + assert segment.next_start_ns == 1_000_000_000 + def test_open_time_limits(self): """None and ellipsis are open time bounds.""" assert mseed_utils._get_time_limits(...) 

From a6f03ee74df5ed7de3b065f7ae17d23cfb976c91 Mon Sep 17 00:00:00 2001
From: Derrick Chambers
Date: Tue, 5 May 2026 12:21:35 +0200
Subject: [PATCH 8/8] another review and cleanup

---
 dascore/io/mseed/utils.py              | 104 ++++++++++++-------------
 tests/test_io/test_mseed/test_mseed.py |   6 +-
 2 files changed, 51 insertions(+), 59 deletions(-)
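
Why the shared _prepare_group helper: the read and scan paths previously each
sorted their segments, took the first element, and built coordinates and attrs
on their own. Factoring that into one frozen container, generic over the
element type, keeps decoded segments (read) and lightweight summaries (scan)
correctly typed while sharing the work. A minimal sketch of the pattern with
placeholder names (the committed container additionally carries the coords and
attrs dicts):

    from collections.abc import Sequence
    from dataclasses import dataclass
    from typing import Generic, TypeVar

    T = TypeVar("T")

    @dataclass(frozen=True)
    class Prepared(Generic[T]):
        # One canonical ordering, computed once, shared by both consumers.
        items: tuple[T, ...]
        first: T

    def prepare(items: Sequence[T]) -> Prepared[T]:
        ordered = tuple(sorted(items))  # sort key is illustrative only
        return Prepared(items=ordered, first=ordered[0])

    group = prepare([3, 1, 2])
    assert group.first == 1 and group.items == (1, 2, 3)
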
diff --git a/dascore/io/mseed/utils.py b/dascore/io/mseed/utils.py
index 0450a13f..252882dc 100644
--- a/dascore/io/mseed/utils.py
+++ b/dascore/io/mseed/utils.py
@@ -9,7 +9,7 @@
 from itertools import groupby
 from math import ceil, floor
 from struct import unpack
-from typing import TypeVar
+from typing import Generic, TypeVar
 
 import numpy as np
 
@@ -74,14 +74,12 @@ class _TraceInfo:
     @property
     def sample_step_ns(self) -> int:
         """Return the sample spacing in nanoseconds."""
-        return _sample_step_ns(self.sample_rate)
+        return _duration_ns(self.sample_rate)
 
     @property
     def next_start_ns(self) -> int:
         """Return the expected start time of the next contiguous segment."""
-        return self.start_ns + _sample_count_duration_ns(
-            self.sample_rate, self.sample_count
-        )
+        return self.start_ns + _duration_ns(self.sample_rate, self.sample_count)
 
 
 @dataclass(frozen=True)
@@ -115,49 +113,45 @@ class _TraceGroupKey:
     sample_count: int
 
 
-def _sample_step_ns(sample_rate: float) -> int:
-    """Convert a MiniSEED sample rate to nanosecond spacing."""
-    if sample_rate == 0:
-        msg = "MiniSEED sample rate cannot be zero."
-        raise ValueError(msg)
-    # MiniSEED negative sample rates encode the sample period in seconds.
-    seconds = 1 / sample_rate if sample_rate > 0 else abs(sample_rate)
-    return round(seconds * ONE_BILLION)
+@dataclass(frozen=True)
+class _PreparedGroup(Generic[_T]):
+    """Sorted group data shared by patch and scan payload creation."""
+
+    segments: Sequence[_T]
+    first: _T
+    coords: dict
+    attrs: dict
 
 
-def _sample_count_duration_ns(sample_rate: float, sample_count: int) -> int:
-    """Convert a sample count to nanoseconds without per-sample drift."""
+def _duration_ns(sample_rate: float, sample_count: int = 1) -> int:
+    """Convert a sample count and MiniSEED sample rate to nanoseconds."""
     if sample_rate == 0:
         msg = "MiniSEED sample rate cannot be zero."
         raise ValueError(msg)
+    # MiniSEED negative sample rates encode the sample period in seconds.
     rate = Fraction(str(sample_rate))
     samples = Fraction(sample_count, 1)
     seconds = samples / rate if rate > 0 else samples * abs(rate)
     return round(seconds * ONE_BILLION)
 
 
-def _time_to_ns(value) -> int | None:
-    """Convert a time-like value to epoch nanoseconds."""
-    if value is None or value is ...:
-        return None
-    return int(to_int(to_datetime64(value)))
-
-
 def _get_time_limits(time=None) -> _TimeLimits:
     """Return optional time limits as epoch nanoseconds."""
-    if time is None or time is ...:
-        return None, None
-    return _time_to_ns(time[0]), _time_to_ns(time[1])
+
+    def to_ns(value):
+        if value is None or value is ...:
+            return None
+        return int(to_int(to_datetime64(value)))
 
-
-def _trace_end_ns(trace: _TraceInfo) -> int:
-    """Return the time of the final sample in a trace."""
-    return trace.start_ns + max(trace.sample_count - 1, 0) * trace.sample_step_ns
+    if time is None or time is ...:
+        return None, None
+    return to_ns(time[0]), to_ns(time[1])
 
 
 def _trace_time_window(trace: _TraceInfo) -> _TraceWindow:
     """Return the inclusive time window covered by a trace."""
-    return trace.start_ns, _trace_end_ns(trace)
+    stop = trace.start_ns + max(trace.sample_count - 1, 0) * trace.sample_step_ns
+    return trace.start_ns, stop
@@ -216,13 +210,6 @@ def _record_to_trace_info(record, pymseed, sample_count: int) -> _TraceInfo:
     )
 
 
-def _record_overlaps_time(record, time_limits: _TimeLimits) -> bool:
-    """Return True if a record overlaps a requested time range."""
-    return _time_windows_overlap(
-        int(record.starttime), int(record.endtime), time_limits
-    )
-
-
 def _record_overlaps_source_windows(record, source_windows: _SourceWindows) -> bool:
     """Return True if a record overlaps one selected source time window."""
     record_start = int(record.starttime)
@@ -237,7 +224,9 @@ def _record_to_segment(
     record, pymseed, time_limits: _TimeLimits
 ) -> _TraceSegment | None:
     """Convert a PyMseed record to a trace segment."""
-    if not _record_overlaps_time(record, time_limits):
+    if not _time_windows_overlap(
+        int(record.starttime), int(record.endtime), time_limits
+    ):
         return None
     record.unpack_data()
     data = np.asarray(record.np_datasamples)
@@ -346,8 +335,7 @@ def flush_pending(
         can_merge = (
             pending.sample_rate == seg.sample_rate
             and pending.format_version == seg.format_version
-            and pending.start_ns
-            + _sample_count_duration_ns(pending.sample_rate, sample_count)
+            and pending.start_ns + _duration_ns(pending.sample_rate, sample_count)
             == seg.start_ns
             and pending.data.dtype == seg.data.dtype
             and pending.encoding == seg.encoding
@@ -519,9 +507,14 @@ def _get_coords(
     }
 
 
-def _get_attrs(group_key, first) -> dict:
-    """Return DASCore attrs from a MiniSEED segment group."""
-    return {
+def _prepare_group(
+    group_key, segments: Sequence[_T], channel_map=None
+) -> _PreparedGroup[_T]:
+    """Return shared sorted segments, coordinates, and attrs for one group."""
+    segments = tuple(sorted(segments, key=lambda x: (x.station, x.source_id)))
+    first = segments[0]
+    coords = _get_coords(segments, channel_map=channel_map)
+    attrs = {
         "data_type": "",
         "tag": ".".join((first.network, first.location, first.seed_channel)),
         "sample_rate": first.sample_rate,
@@ -531,18 +524,21 @@ def _get_attrs(group_key, first) -> dict:
         "mseed_record_length": first.record_length,
         "_source_patch_id": _source_patch_id(group_key),
     }
+    return _PreparedGroup(segments, first, coords, attrs)
 
 
 def _patch_from_segments(
     group_key, segments: list[_TraceSegment], channel_map: dict[str, int] | None = None
 ) -> dc.Patch:
     """Create a DASCore Patch from compatible MiniSEED trace segments."""
-    segments = sorted(segments, key=lambda x: (x.station, x.source_id))
-    first = segments[0]
-    data = np.stack([x.data for x in segments])
-    coords = _get_coords(segments, channel_map=channel_map)
-    attrs = _get_attrs(group_key, first)
-    return dc.Patch(data=data, dims=("channel", "time"), coords=coords, attrs=attrs)
+    prepared = _prepare_group(group_key, segments, channel_map)
+    data = np.stack([x.data for x in prepared.segments])
+    return dc.Patch(
+        data=data,
+        dims=("channel", "time"),
+        coords=prepared.coords,
+        attrs=prepared.attrs,
+    )
 
 
 def _segments_from_summaries(
@@ -570,20 +566,18 @@ def _scan_payload_from_segments(
     channel_map: dict[str, int] | None = None,
 ) -> ScanPayload:
     """Create a DASCore scan payload from MiniSEED trace summaries."""
-    segments = sorted(segments, key=lambda x: (x.station, x.source_id))
-    first = segments[0]
+    prepared = _prepare_group(group_key, segments, channel_map)
     coords = get_coord_manager(
-        _get_coords(segments, channel_map=channel_map),
+        prepared.coords,
         dims=("channel", "time"),
     )
-    attrs = _get_attrs(group_key, first)
     return _make_scan_payload(
-        attrs=attrs,
+        attrs=prepared.attrs,
         coords=coords,
         dims=coords.dims,
         shape=coords.shape,
-        dtype=first.dtype,
-        source_patch_id=attrs["_source_patch_id"],
+        dtype=prepared.first.dtype,
+        source_patch_id=prepared.attrs["_source_patch_id"],
     )
diff --git a/tests/test_io/test_mseed/test_mseed.py b/tests/test_io/test_mseed/test_mseed.py
index 92a76481..975ee101 100644
--- a/tests/test_io/test_mseed/test_mseed.py
+++ b/tests/test_io/test_mseed/test_mseed.py
@@ -512,13 +512,11 @@ class TestMiniSeedUtils:
     def test_zero_sample_rate_raises(self):
         """A zero sample rate cannot be converted to a sample step."""
         with pytest.raises(ValueError, match="sample rate"):
-            mseed_utils._sample_step_ns(0)
-        with pytest.raises(ValueError, match="sample rate"):
-            mseed_utils._sample_count_duration_ns(0, 1)
+            mseed_utils._duration_ns(0)
 
     def test_negative_sample_rate_is_sample_period(self):
         """Negative MiniSEED sample rates are interpreted as sample periods."""
-        assert mseed_utils._sample_step_ns(-0.1) == 100_000_000
+        assert mseed_utils._duration_ns(-0.1) == 100_000_000
 
     def test_next_start_ns_avoids_sample_step_drift(self):
         """Record adjacency uses total duration instead of rounded sample steps."""