diff --git a/dascore/data_registry.txt b/dascore/data_registry.txt
index caf9c249..1e8d8eb2 100644
--- a/dascore/data_registry.txt
+++ b/dascore/data_registry.txt
@@ -6,6 +6,7 @@ terra15_v6_test_file.hdf5 63f130ed93c1fe4105f2a355272d136a51abfe5c212a40be3a2907
 iDAS005_hdf5_example.626.h5 28f4f9b1f2b248fa0419b41c11e971f2d395c3ed43afc1ef5d5c35f399e99190 https://github.com/dasdae/test_data/raw/master/das/iDAS005_hdf5_example.626.h5
 iDAS005_tdms_example.626.tdms 0c31f75015bc671d958967c7fdf32e2e64fdb467f997d331ae6a7d4c989ab380 https://github.com/dasdae/test_data/raw/master/das/iDAS005_tdms_example.626.tdms
 sample_tdms_file_v4713.tdms 22a79c4c3166ce3cc8467265540cdb3ad2b54460209d39641c1e8f20e64eca65 https://github.com/dasdae/test_data/raw/master/das/sample_tdms_file_v4713.tdms
+etna_9n_3chan_10s.mseed e79608a6aa47cd0b70fa227679bc5d266af117ea4bc3e02e0c336c563b35b280 https://github.com/DASDAE/test_data/raw/master/das/etna_9n_3chan_10s.mseed
 example_dasdae_event_1.h5 854530af7ea10bab54dd0f6458c5d67d8b334ee6e4032b75702fa408352b5007 https://github.com/dasdae/test_data/raw/master/das/example_dasdae_event_1.h5
 opta_sense_quantx_v2.h5 fca8c8b19a9e253f9252c14de3f2a7778391a763f39142fffc63ab0cc3fa9cc7 https://github.com/dasdae/test_data/raw/master/das/opta_sense_quantx_v2.h5
 prodml_2.0.h5 8f8eded106acab0b4266c5c752cc702363cb09391f5fddf1534429e94e4e9ab2 https://github.com/dasdae/test_data/raw/master/das/prodml_2.0.h5
diff --git a/dascore/io/mseed/__init__.py b/dascore/io/mseed/__init__.py
new file mode 100644
index 00000000..83c12b31
--- /dev/null
+++ b/dascore/io/mseed/__init__.py
@@ -0,0 +1,5 @@
+"""DASCore support for MiniSEED files."""
+
+from __future__ import annotations
+
+from dascore.io.mseed.core import MSeedV2, MSeedV3
diff --git a/dascore/io/mseed/core.py b/dascore/io/mseed/core.py
new file mode 100644
index 00000000..4f605064
--- /dev/null
+++ b/dascore/io/mseed/core.py
@@ -0,0 +1,93 @@
+"""IO support for MiniSEED files.
+
+DASCore maps MiniSEED source IDs into 2D ``("channel", "time")`` patches.
+MiniSEED source identity is preserved as per-channel coordinates instead of
+scalar attrs because one DASCore patch can contain many MiniSEED sources.
+The preserved source coordinates are ``source_id``, ``network``, ``station``,
+``location``, and ``seed_channel``.
+
+Compatible MiniSEED sources are merged into one patch when they share the same
+format version, network, location, SEED channel, start time, sample rate, and
+sample count. Sources with different sample rates, start times, or sample
+counts are returned as separate patches. When sources split across multiple
+patches, DASCore keeps stable integer ``channel`` coordinate values so
+selections still refer to the same source.
+
+The MiniSEED ``channel`` coordinate is an integer source index, not optical
+distance. Optical distance is usually stored in companion DAS metadata rather
+than MiniSEED record headers and should be attached from that metadata when
+available.
+
+The mapping follows the FDSN source identifier convention, where a SEED NSLC
+code maps to ``FDSN:NET_STA_LOC_BAND_SOURCE_SUBSOURCE``.
+See https://docs.fdsn.org/projects/source-identifiers/en/latest/definition.html.
+
+For DAS MiniSEED, GEOFON recommends representing each fiber sampling point as
+a station because each sampling point has its own position; the channel code
+``HSF`` is recommended for fiber optic DAS. See
+https://geofon.gfz.de/redmine/projects/redmine/wiki/DAS.
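+
+Examples
+--------
+A minimal sketch of reading a MiniSEED file into a spool of 2D patches
+(the file path here is illustrative)::
+
+    import dascore as dc
+
+    spool = dc.read("example.mseed", file_format="MSEED", file_version="3")
+    patch = spool[0]
+    # Each row of the ("channel", "time") patch is one MiniSEED source.
+    source_ids = patch.coords.get_array("source_id")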
+""" + +from __future__ import annotations + +import dascore as dc +from dascore.constants import opt_timeable_types +from dascore.io import FiberIO, ScanPayload +from dascore.utils.io import LocalPath +from dascore.utils.misc import optional_import + +from .utils import _detect_format, _get_patches, _scan_patches + + +class MSeedV2(FiberIO): + """Support MiniSEED version 2 files.""" + + name = "MSEED" + preferred_extensions = ("mseed", "msd", "miniseed") + version = "2" + + def get_format(self, resource: LocalPath, **kwargs) -> tuple[str, str] | bool: + """Determine if path is a MiniSEED file.""" + return _detect_format(resource) + + def scan(self, resource: LocalPath, **kwargs) -> list[ScanPayload]: + """Scan a MiniSEED file.""" + pymseed = optional_import("pymseed") + return _scan_patches(resource, pymseed) + + def read( + self, + resource: LocalPath, + time: tuple[opt_timeable_types, opt_timeable_types] | None = None, + channel: tuple[int | None, int | None] | None = None, + source_patch_id=(), + **kwargs, + ) -> dc.BaseSpool: + """Read a MiniSEED file.""" + pymseed = optional_import("pymseed") + patches = _get_patches( + resource, + pymseed, + time=time, + channel=channel, + source_patch_id=source_patch_id, + ) + return dc.spool(patches) + + +class MSeedV3(MSeedV2): + """Support MiniSEED version 3 files.""" + + version = "3" diff --git a/dascore/io/mseed/utils.py b/dascore/io/mseed/utils.py new file mode 100644 index 00000000..252882dc --- /dev/null +++ b/dascore/io/mseed/utils.py @@ -0,0 +1,678 @@ +"""Utilities for MiniSEED IO.""" + +from __future__ import annotations + +import struct +from collections.abc import Callable, Sequence +from dataclasses import dataclass, replace +from fractions import Fraction +from itertools import groupby +from math import ceil, floor +from struct import unpack +from typing import Generic, TypeVar + +import numpy as np + +import dascore as dc +from dascore.constants import ONE_BILLION +from dascore.core import get_coord, get_coord_manager +from dascore.io import ScanPayload +from dascore.io.core import _make_scan_payload +from dascore.utils.io import LocalPath, _normalize_source_patch_ids, _read_file_header +from dascore.utils.time import to_datetime64, to_int + +_TimeLimits = tuple[int | None, int | None] +_TraceWindow = tuple[int, int] +_SourceWindows = dict[str, list[_TimeLimits]] +_T = TypeVar("_T", bound="_TraceInfo") + +_TEXT_ENCODING = 0 +_INT16_ENCODING = 1 +_INT32_ENCODING = 3 +_FLOAT32_ENCODING = 4 +_FLOAT64_ENCODING = 5 +_STEIM1_ENCODING = 10 +_STEIM2_ENCODING = 11 + +_ENCODING_DTYPE_MAP = { + _TEXT_ENCODING: "S1", + _INT16_ENCODING: "int16", + _INT32_ENCODING: "int32", + _FLOAT32_ENCODING: "float32", + _FLOAT64_ENCODING: "float64", + _STEIM1_ENCODING: "int32", + _STEIM2_ENCODING: "int32", +} +_SAMPLE_TYPE_DTYPE_MAP = { + "i": "int32", + "f": "float32", + "d": "float64", + "t": "S1", +} +_SEED_CHARS = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +_SEED_CHARS_WITH_SPACE = _SEED_CHARS + b" " + + +@dataclass(frozen=True) +class _TraceInfo: + """MiniSEED trace identity and timing metadata.""" + + source_id: str + network: str + station: str + location: str + seed_channel: str + format_version: str + start_ns: int + sample_rate: float + sample_count: int + sample_type: str + encoding: str + publication_version: int + record_length: int + + @property + def sample_step_ns(self) -> int: + """Return the sample spacing in nanoseconds.""" + return _duration_ns(self.sample_rate) + + @property + def next_start_ns(self) -> int: + """Return the expected start 
time of the next contiguous segment.""" + return self.start_ns + _duration_ns(self.sample_rate, self.sample_count) + + +@dataclass(frozen=True) +class _TraceSegment(_TraceInfo): + """A decoded contiguous MiniSEED segment from one source ID.""" + + data: np.ndarray + + def __post_init__(self) -> None: + """Validate that trace metadata matches the decoded samples.""" + assert len(self.data) == self.sample_count + + +@dataclass(frozen=True) +class _TraceSummary(_TraceInfo): + """Metadata for a contiguous MiniSEED segment from one source ID.""" + + dtype: str + + +@dataclass(frozen=True, order=True) +class _TraceGroupKey: + """Compatibility fields that define one MiniSEED output patch.""" + + format_version: str + network: str + location: str + seed_channel: str + start_ns: int + sample_rate: float + sample_count: int + + +@dataclass(frozen=True) +class _PreparedGroup(Generic[_T]): + """Sorted group data shared by patch and scan payload creation.""" + + segments: Sequence[_T] + first: _T + coords: dict + attrs: dict + + +def _duration_ns(sample_rate: float, sample_count: int = 1) -> int: + """Convert a sample count and MiniSEED sample rate to nanoseconds.""" + if sample_rate == 0: + msg = "MiniSEED sample rate cannot be zero." + raise ValueError(msg) + # MiniSEED negative sample rates encode the sample period in seconds. + rate = Fraction(str(sample_rate)) + samples = Fraction(sample_count, 1) + seconds = samples / rate if rate > 0 else samples * abs(rate) + return round(seconds * ONE_BILLION) + + +def _get_time_limits(time=None) -> _TimeLimits: + """Return optional time limits as epoch nanoseconds.""" + + def to_ns(value): + if value is None or value is ...: + return None + return int(to_int(to_datetime64(value))) + + if time is None or time is ...: + return None, None + return to_ns(time[0]), to_ns(time[1]) + + +def _trace_time_window(trace: _TraceInfo) -> _TraceWindow: + """Return the inclusive time window covered by a trace.""" + stop = trace.start_ns + max(trace.sample_count - 1, 0) * trace.sample_step_ns + return trace.start_ns, stop + + +def _time_windows_overlap(start: int, stop: int, limits: _TimeLimits) -> bool: + """Return True if an inclusive start/stop range overlaps optional limits.""" + limit_start, limit_stop = limits + after_start = limit_stop is None or start <= limit_stop + before_stop = limit_start is None or stop >= limit_start + return after_start and before_stop + + +def _source_id_to_nslc(pymseed, source_id: str) -> tuple[str, str, str, str]: + """Return NSLC codes from a MiniSEED source ID.""" + parse_errors = (ValueError, TypeError) + if source_id_error := getattr(pymseed, "SourceIDError", None): + parse_errors = (*parse_errors, source_id_error) + try: + nslc = pymseed.sourceid2nslc(source_id) + except parse_errors: + return "", source_id, "", "" + network, station, location, seed_channel = nslc + return ( + "" if network is None else str(network), + "" if station is None else str(station), + "" if location is None else str(location), + "" if seed_channel is None else str(seed_channel), + ) + + +def _record_dtype(record) -> str: + """Return the decoded NumPy dtype name from a MiniSEED record header.""" + if sample_type := getattr(record, "sampletype", None): + return _SAMPLE_TYPE_DTYPE_MAP.get(str(sample_type), "") + encoding = int(getattr(record, "encoding", -1)) + return _ENCODING_DTYPE_MAP.get(encoding, "") + + +def _record_to_trace_info(record, pymseed, sample_count: int) -> _TraceInfo: + """Convert common PyMseed record fields to trace metadata.""" + network, 
station, location, seed_channel = _source_id_to_nslc( + pymseed, str(record.sourceid) + ) + return _TraceInfo( + source_id=str(record.sourceid), + network=network, + station=station, + location=location, + seed_channel=seed_channel, + format_version=str(record.formatversion), + start_ns=int(record.starttime), + sample_rate=float(record.samprate), + sample_count=sample_count, + sample_type=str(record.sampletype or ""), + encoding=str(record.encoding), + publication_version=int(getattr(record, "pubversion", 0) or 0), + record_length=int(getattr(record, "reclen", 0) or 0), + ) + + +def _record_overlaps_source_windows(record, source_windows: _SourceWindows) -> bool: + """Return True if a record overlaps one selected source time window.""" + record_start = int(record.starttime) + record_stop = int(record.endtime) + windows = source_windows.get(str(record.sourceid), ()) + return any( + _time_windows_overlap(record_start, record_stop, window) for window in windows + ) + + +def _record_to_segment( + record, pymseed, time_limits: _TimeLimits +) -> _TraceSegment | None: + """Convert a PyMseed record to a trace segment.""" + if not _time_windows_overlap( + int(record.starttime), int(record.endtime), time_limits + ): + return None + record.unpack_data() + data = np.asarray(record.np_datasamples) + info = _record_to_trace_info(record, pymseed, sample_count=len(data)) + segment = _TraceSegment( + **info.__dict__, + # Detach from the PyMseed record buffer before records are advanced. + data=data.copy(), + ) + return _trim_segment_time(segment, time_limits) + + +def _record_to_summary(record, pymseed) -> _TraceSummary: + """Convert a PyMseed record header to a trace summary.""" + # Header-only scan trusts samplecnt; corrupted records may decode differently. + info = _record_to_trace_info(record, pymseed, sample_count=int(record.samplecnt)) + return _TraceSummary( + **info.__dict__, + dtype=_record_dtype(record), + ) + + +def _trim_segment_time( + segment: _TraceSegment, time_limits: _TimeLimits +) -> _TraceSegment | None: + """Trim a segment to a requested time range.""" + start, stop = time_limits + if start is None and stop is None: + return segment + step = segment.sample_step_ns + start_index = ( + 0 if start is None else max(0, ceil((start - segment.start_ns) / step)) + ) + stop_index = ( + segment.sample_count + if stop is None + else min(segment.sample_count, floor((stop - segment.start_ns) / step) + 1) + ) + if stop_index <= start_index: + return None + return replace( + segment, + start_ns=segment.start_ns + start_index * step, + sample_count=stop_index - start_index, + data=segment.data[start_index:stop_index].copy(), + ) + + +def _coalesce_source_traces( + traces: Sequence[_T], + can_merge: Callable[[_T, _T], bool], + merge: Callable[[_T, _T], _T], +) -> list[_T]: + """Merge contiguous traces for each MiniSEED source ID.""" + traces = sorted(traces, key=lambda x: (x.source_id, x.start_ns)) + out = [] + for _, source_group in groupby(traces, key=lambda x: x.source_id): + pending = None + for trace in source_group: + if pending is None: + pending = trace + continue + if can_merge(pending, trace): + pending = merge(pending, trace) + else: + out.append(pending) + pending = trace + if pending is not None: + out.append(pending) + return out + + +def _coalesce_source_segments(segments: list[_TraceSegment]) -> list[_TraceSegment]: + """Merge contiguous decoded records for the same source ID.""" + + def flush_pending( + pending: _TraceSegment, + data: list[np.ndarray], + sample_count: int, + 
record_length: int, + ) -> _TraceSegment: + """Return the current pending segment without repeated concatenation.""" + if len(data) == 1: + return pending + return replace( + pending, + sample_count=sample_count, + record_length=record_length, + data=np.concatenate(data), + ) + + segments = sorted(segments, key=lambda x: (x.source_id, x.start_ns)) + out = [] + for _, source_group in groupby(segments, key=lambda x: x.source_id): + pending = None + data = [] + sample_count = 0 + record_length = 0 + for seg in source_group: + if pending is None: + pending = seg + data = [seg.data] + sample_count = seg.sample_count + record_length = seg.record_length + continue + can_merge = ( + pending.sample_rate == seg.sample_rate + and pending.format_version == seg.format_version + and pending.start_ns + _duration_ns(pending.sample_rate, sample_count) + == seg.start_ns + and pending.data.dtype == seg.data.dtype + and pending.encoding == seg.encoding + ) + if can_merge: + data.append(seg.data) + sample_count += seg.sample_count + record_length = max(record_length, seg.record_length) + else: + out.append(flush_pending(pending, data, sample_count, record_length)) + pending = seg + data = [seg.data] + sample_count = seg.sample_count + record_length = seg.record_length + if pending is not None: + out.append(flush_pending(pending, data, sample_count, record_length)) + return out + + +def _coalesce_source_summaries(summaries: list[_TraceSummary]) -> list[_TraceSummary]: + """Merge contiguous record summaries for the same source ID.""" + + def can_merge(pending: _TraceSummary, summary: _TraceSummary) -> bool: + return ( + pending.sample_rate == summary.sample_rate + and pending.format_version == summary.format_version + and pending.next_start_ns == summary.start_ns + and pending.dtype == summary.dtype + and pending.encoding == summary.encoding + ) + + def merge(pending: _TraceSummary, summary: _TraceSummary) -> _TraceSummary: + return replace( + pending, + sample_count=pending.sample_count + summary.sample_count, + record_length=max(pending.record_length, summary.record_length), + ) + + return _coalesce_source_traces(summaries, can_merge, merge) + + +def _read_segments( + path: LocalPath, + pymseed, + time=None, + source_windows: _SourceWindows | None = None, +) -> list[_TraceSegment]: + """Read and coalesce decoded MiniSEED records.""" + segments = [] + time_limits = _get_time_limits(time) + for record in pymseed.MS3Record.from_file(str(path), unpack_data=False): + if source_windows is not None and not _record_overlaps_source_windows( + record, source_windows + ): + continue + segment = _record_to_segment(record, pymseed, time_limits) + if segment is not None and segment.sample_count: + segments.append(segment) + return _coalesce_source_segments(segments) + + +def _scan_segments(path: LocalPath, pymseed) -> list[_TraceSummary]: + """Read and coalesce MiniSEED record metadata without unpacking samples.""" + summaries = [] + for record in pymseed.MS3Record.from_file(str(path), unpack_data=False): + summary = _record_to_summary(record, pymseed) + if summary.sample_count: + summaries.append(summary) + return _coalesce_source_summaries(summaries) + + +def _get_group_key(segment: _TraceInfo) -> _TraceGroupKey: + """Return the compatibility key used to merge traces into a patch.""" + return _TraceGroupKey( + format_version=segment.format_version, + network=segment.network, + location=segment.location, + seed_channel=segment.seed_channel, + start_ns=segment.start_ns, + sample_rate=segment.sample_rate, + 
sample_count=segment.sample_count, + ) + + +def _group_segments(segments: Sequence[_T]): + """Yield compatible segment groups.""" + segments = sorted(segments, key=lambda x: (_get_group_key(x), x.source_id)) + for key, group in groupby(segments, key=_get_group_key): + yield key, list(group) + + +def _get_channel_map(segments: Sequence[_TraceInfo]) -> dict[str, int]: + """Return stable channel indices for MiniSEED sources.""" + source_ids = dict.fromkeys( + x.source_id for x in sorted(segments, key=lambda y: (y.station, y.source_id)) + ) + return {source_id: ind for ind, source_id in enumerate(source_ids)} + + +def _get_selected_source_ids(channel_map: dict[str, int], channel) -> set[str]: + """Return source IDs selected by DASCore channel selector semantics.""" + if channel is None: + return set(channel_map) + source_ids = np.asarray(tuple(channel_map)) + channel_values = np.asarray([channel_map[x] for x in source_ids]) + channel_coord = get_coord(data=channel_values) + _, indexer = channel_coord.select(channel) + return {str(x) for x in np.atleast_1d(source_ids[indexer])} + + +def _get_read_plan(path, pymseed, wanted_ids, channel): + """Return scan-derived groups and source windows needed for a read.""" + summaries = _scan_segments(path, pymseed) + channel_map = _get_channel_map(summaries) + selected_source_ids = _get_selected_source_ids(channel_map, channel) + source_windows: _SourceWindows = {} + groups = [] + for group_key, group in _group_segments(summaries): + patch_id = _source_patch_id(group_key) + if wanted_ids and patch_id not in wanted_ids: + continue + group = [x for x in group if x.source_id in selected_source_ids] + if not group: + continue + groups.append((group_key, group)) + for summary in group: + source_windows.setdefault(summary.source_id, []).append( + _trace_time_window(summary) + ) + return channel_map, groups, source_windows + + +def _source_patch_id(group_key: _TraceGroupKey) -> str: + """Return a stable source patch ID for a MiniSEED group.""" + source_key = ".".join( + (group_key.network, group_key.location, group_key.seed_channel) + ) + rate_key = format(group_key.sample_rate, ".17g") + return ( + f"v{group_key.format_version}:{source_key}:" + f"{group_key.start_ns}:{rate_key}:{group_key.sample_count}" + ) + + +def _get_coords( + segments: Sequence[_TraceInfo], channel_map: dict[str, int] | None = None +): + """Return DASCore coordinates from compatible MiniSEED segments.""" + segments = sorted(segments, key=lambda x: (x.station, x.source_id)) + first = segments[0] + sample_count = first.sample_count + source_ids = tuple(x.source_id for x in segments) + channel_values = ( + tuple(channel_map[x.source_id] for x in segments) + if channel_map is not None + else tuple(range(len(segments))) + ) + step = np.timedelta64(first.sample_step_ns, "ns") + start = np.datetime64(first.start_ns, "ns") + stop = start + step * sample_count + return { + "channel": get_coord(data=np.asarray(channel_values)), + "time": get_coord(start=start, stop=stop, step=step), + "source_id": (("channel",), source_ids), + "network": (("channel",), tuple(x.network for x in segments)), + "station": (("channel",), tuple(x.station for x in segments)), + "location": (("channel",), tuple(x.location for x in segments)), + "seed_channel": (("channel",), tuple(x.seed_channel for x in segments)), + } + + +def _prepare_group( + group_key, segments: Sequence[_T], channel_map=None +) -> _PreparedGroup[_T]: + """Return shared sorted segments, coordinates, and attrs for one group.""" + segments = 
tuple(sorted(segments, key=lambda x: (x.station, x.source_id))) + first = segments[0] + coords = _get_coords(segments, channel_map=channel_map) + attrs = { + "data_type": "", + "tag": ".".join((first.network, first.location, first.seed_channel)), + "sample_rate": first.sample_rate, + "sample_type": first.sample_type, + "mseed_encoding": first.encoding, + "mseed_publication_version": first.publication_version, + "mseed_record_length": first.record_length, + "_source_patch_id": _source_patch_id(group_key), + } + return _PreparedGroup(segments, first, coords, attrs) + + +def _patch_from_segments( + group_key, segments: list[_TraceSegment], channel_map: dict[str, int] | None = None +) -> dc.Patch: + """Create a DASCore Patch from compatible MiniSEED trace segments.""" + prepared = _prepare_group(group_key, segments, channel_map) + data = np.stack([x.data for x in prepared.segments]) + return dc.Patch( + data=data, + dims=("channel", "time"), + coords=prepared.coords, + attrs=prepared.attrs, + ) + + +def _segments_from_summaries( + segments: Sequence[_TraceSegment], summaries: Sequence[_TraceSummary] +) -> list[_TraceSegment]: + """Return decoded segments that overlap selected scan summaries.""" + summaries_by_source = {} + for summary in summaries: + summaries_by_source.setdefault(summary.source_id, []).append(summary) + out = [] + for segment in segments: + segment_start, segment_stop = _trace_time_window(segment) + for summary in summaries_by_source.get(segment.source_id, ()): + if _time_windows_overlap( + segment_start, segment_stop, _trace_time_window(summary) + ): + out.append(segment) + break + return out + + +def _scan_payload_from_segments( + group_key, + segments: list[_TraceSummary], + channel_map: dict[str, int] | None = None, +) -> ScanPayload: + """Create a DASCore scan payload from MiniSEED trace summaries.""" + prepared = _prepare_group(group_key, segments, channel_map) + coords = get_coord_manager( + prepared.coords, + dims=("channel", "time"), + ) + return _make_scan_payload( + attrs=prepared.attrs, + coords=coords, + dims=coords.dims, + shape=coords.shape, + dtype=prepared.first.dtype, + source_patch_id=prepared.attrs["_source_patch_id"], + ) + + +def _get_patches( + path, pymseed, time=None, channel=None, source_patch_id=() +) -> list[dc.Patch]: + """Read MiniSEED patches from a path.""" + wanted_ids = _normalize_source_patch_ids(source_patch_id) + patches = [] + use_scan_plan = bool(wanted_ids) or channel is not None + if use_scan_plan: + channel_map, segment_groups, source_windows = _get_read_plan( + path, pymseed, wanted_ids, channel + ) + if not segment_groups: + return [] + all_segments = _read_segments( + path, pymseed, time=time, source_windows=source_windows + ) + else: + all_segments = _read_segments(path, pymseed, time=time) + channel_map = _get_channel_map(all_segments) + segment_groups = list(_group_segments(all_segments)) + for group_key, segments in segment_groups: + if use_scan_plan: + segments = _segments_from_summaries(all_segments, segments) + if not segments: + continue + patch = _patch_from_segments(group_key, segments, channel_map=channel_map) + if patch.size: + patches.append(patch) + return sorted(patches, key=lambda x: x.get_coord("channel").min()) + + +def _scan_patches(path, pymseed) -> list[ScanPayload]: + """Return scan payloads for MiniSEED patches.""" + payloads = [] + all_segments = _scan_segments(path, pymseed) + channel_map = _get_channel_map(all_segments) + for group_key, segments in _group_segments(all_segments): + payload = 
_scan_payload_from_segments( + group_key, segments, channel_map=channel_map + ) + payloads.append(payload) + return sorted(payloads, key=lambda x: x["coords"].get_coord("channel").min()) + + +def _is_seed_code(value: bytes, *, allow_space: bool = True) -> bool: + """Return True if bytes are plausible SEED code bytes.""" + allowed = _SEED_CHARS_WITH_SPACE if allow_space else _SEED_CHARS + return bool(value.strip()) and all(char in allowed for char in value) + + +def _detect_mseed_v2_header(header: bytes) -> bool: + """Return True if a fixed MiniSEED 2 header looks valid.""" + if len(header) < 48: + return False + if not header[:6].isdigit(): + return False + if header[6] not in b"DRQM": + return False + if header[7] not in (0, 32): + return False + if not _is_seed_code(header[8:13]): + return False + if not _is_seed_code(header[15:18], allow_space=False): + return False + if not _is_seed_code(header[18:20]): + return False + try: + year, day = unpack(">HH", header[20:24]) + hour, minute, second = header[24:27] + _unused, frac_seconds, sample_count = unpack(">BHH", header[27:32]) + data_offset, blockette_offset = unpack(">HH", header[44:48]) + except struct.error: + return False + valid_time = ( + 1900 <= year <= 2600 + and 1 <= day <= 366 + and hour <= 23 + and minute <= 59 + and second <= 60 + and frac_seconds <= 9_999 + ) + valid_offsets = data_offset >= 48 and ( + blockette_offset == 0 or blockette_offset >= 48 + ) + return valid_time and valid_offsets and sample_count > 0 + + +def _detect_format(path: LocalPath) -> tuple[str, str] | bool: + """Return the MiniSEED version for a path or False.""" + header = _read_file_header(path, 48) + if header.startswith(b"MS\x03"): + return "MSEED", "3" + if _detect_mseed_v2_header(header): + return "MSEED", "2" + return False diff --git a/dascore/utils/io.py b/dascore/utils/io.py index 53ff30a5..f3a49223 100644 --- a/dascore/utils/io.py +++ b/dascore/utils/io.py @@ -85,6 +85,15 @@ def _normalize_source_patch_ids(source_patch_id) -> set[str]: return {str(value) for value in iterate(source_patch_id) if value not in (None, "")} +def _read_file_header(path, length: int) -> bytes: + """Return the first bytes from a file-like path or empty bytes on IO errors.""" + try: + with open(path, "rb") as fi: + return fi.read(length) + except OSError: + return b"" + + class BinaryReader(io.BytesIO): """Base file for reading binary files.""" diff --git a/pyproject.toml b/pyproject.toml index 400ad97d..65e220e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -70,6 +70,7 @@ extras = [ "numba", "segyio", "bottleneck", + "pymseed[numpy]", ] docs = [ @@ -150,6 +151,8 @@ WAV = "dascore.io.wav.core:WavIO" XMLBINARY__V1 = "dascore.io.xml_binary.core:XMLBinaryV1" GDR_DAS__V1 = "dascore.io.gdr.core:GDR_V1" NETCDF_CF__V1_8 = "dascore.io.netcdf.core:NetCDFCFV18" +MSEED__V2 = "dascore.io.mseed.core:MSeedV2" +MSEED__V3 = "dascore.io.mseed.core:MSeedV3" [project.entry-points."dascore.patch_namespace"] io = "dascore.io:PatchIO" diff --git a/tests/test_io/test_common_io.py b/tests/test_io/test_common_io.py index d937c621..4f6dcb73 100644 --- a/tests/test_io/test_common_io.py +++ b/tests/test_io/test_common_io.py @@ -29,6 +29,7 @@ from dascore.io.febus import Febus1, Febus2 from dascore.io.gdr import GDR_V1 from dascore.io.h5simple import H5Simple +from dascore.io.mseed.core import MSeedV2 from dascore.io.netcdf import NetCDFCFV18 from dascore.io.neubrex import NeubrexDASV1, NeubrexRFSV1 from dascore.io.optodas import OptoDASV8 @@ -89,6 +90,7 @@ Terra15FormatterV5(): 
("terra15_v5_test_file.hdf5",), Terra15FormatterV6(): ("terra15_v6_test_file.hdf5",), NetCDFCFV18(): ("xdas_netcdf.nc",), + MSeedV2(): ("etna_9n_3chan_10s.mseed",), } # This tuple is for fiber io which support a write method and can write @@ -202,6 +204,29 @@ def _assert_op_or_close(val1, val2, op): raise AssertionError(msg) +def _get_coord_trim_values(coord): + """Return existing coordinate values to use for read selections.""" + values = coord.values + if len(values) == 1: + return values[0], values[0] + start_ind = min(max(1, len(values) // 10), len(values) - 1) + stop_ind = min(max(start_ind, 2 * len(values) // 10), len(values) - 1) + return values[start_ind], values[stop_ind] + + +def _assert_spool_dim_selection(spool, dim, trim_tuple, start_op, stop_op): + """Assert all patches in a selected spool respect a requested dim range.""" + assert len(spool) + for patch in spool: + _assert_coords_attrs_match(patch) + coord = patch.get_coord(dim) + summary = patch.summary.get_coord_summary(dim) + start = summary.min if trim_tuple[0] in (None, ...) else trim_tuple[0] + stop = summary.max if trim_tuple[1] in (None, ...) else trim_tuple[1] + _assert_op_or_close(coord.min(), start, start_op) + _assert_op_or_close(coord.max(), stop, stop_op) + + # --- Tests @@ -299,41 +324,21 @@ def test_slice_single_dim_both_ends(self, io_path_tuple): with skip_missing(): summaries_from_file = [_scan_summary(x) for x in dc.scan(path)] assert len(summaries_from_file) - summary_init = summaries_from_file[0] - for dim in summary_init.dim_tuple: - summary_coord = summary_init.get_coord_summary(dim) - start = summary_coord.min - stop = summary_coord.max - duration = stop - start + patch_init = _cached_read(path, io=io)[0] + for dim in patch_init.dims: + trim_start, trim_stop = _get_coord_trim_values(patch_init.get_coord(dim)) # first test double ended query - trim_tuple = (start + duration / 10, start + 2 * duration / 10) + trim_tuple = (trim_start, trim_stop) spool = io.read(path, **{dim: trim_tuple}) - assert len(spool) == 1 - patch = spool[0] - _assert_coords_attrs_match(patch) - coord = patch.get_coord(dim) - _assert_op_or_close(coord.min(), trim_tuple[0], ge) - _assert_op_or_close(coord.max(), trim_tuple[1], le) + _assert_spool_dim_selection(spool, dim, trim_tuple, ge, le) # then single-ended query on start side - trim_tuple = (start + duration / 10, ...) + trim_tuple = (trim_start, ...) 
spool = io.read(path, **{dim: trim_tuple}) - assert len(spool) == 1 - patch = spool[0] - summary = patch.summary - _assert_coords_attrs_match(patch) - coord = patch.get_coord(dim) - _assert_op_or_close(coord.min(), trim_tuple[0], ge) - _assert_op_or_close(coord.max(), summary.get_coord_summary(dim).max, eq) + _assert_spool_dim_selection(spool, dim, trim_tuple, ge, eq) # then single-ended query on end side - trim_tuple = (None, start + duration / 10) + trim_tuple = (None, trim_start) spool = io.read(path, **{dim: trim_tuple}) - assert len(spool) == 1 - patch = spool[0] - summary = patch.summary - _assert_coords_attrs_match(patch) - coord = patch.get_coord(dim) - _assert_op_or_close(coord.min(), summary.get_coord_summary(dim).min, eq) - _assert_op_or_close(coord.max(), trim_tuple[1], le) + _assert_spool_dim_selection(spool, dim, trim_tuple, eq, le) def test_slice_out_all_patches_time(self, io_path_tuple): """Ensure slicing outside of file time range returns an empty spool.""" diff --git a/tests/test_io/test_mseed/test_mseed.py b/tests/test_io/test_mseed/test_mseed.py new file mode 100644 index 00000000..975ee101 --- /dev/null +++ b/tests/test_io/test_mseed/test_mseed.py @@ -0,0 +1,756 @@ +"""Tests for MiniSEED IO.""" + +from __future__ import annotations + +import numpy as np +import pytest + +import dascore as dc +from dascore.exceptions import MissingOptionalDependencyError +from dascore.io.mseed import utils as mseed_utils +from dascore.io.mseed.core import MSeedV2 +from tests.test_io._common_io_test_utils import skip_timeout + + +def _mseed_v2_header(): + """Return enough fixed header bytes to identify a MiniSEED 2 file.""" + header = bytearray(48) + header[0:8] = b"000001D " + header[8:13] = b"STAT " + header[13:15] = b" " + header[15:18] = b"HSF" + header[18:20] = b"XX" + header[20:32] = bytes.fromhex("07e800010000000000000001") + header[44:48] = bytes.fromhex("00300000") + return header + + +def _write_mseed( + path, + format_version=3, + starts=None, + sample_rates=None, + source_ids=None, + data_samples=None, +): + """Write a small MiniSEED file.""" + pymseed = pytest.importorskip("pymseed") + + source_ids = source_ids or [f"FDSN:XX_{ind:05d}__H_S_F" for ind in range(3)] + starts = starts or [pymseed.timestr2nstime("2024-01-01T00:00:00Z")] * len( + source_ids + ) + sample_rates = sample_rates or [10.0] * len(source_ids) + data_samples = data_samples or [ + np.arange(10, dtype=np.int32) + ind * 100 for ind in range(len(source_ids)) + ] + traces = pymseed.MS3TraceList() + for source_id, start, sample_rate, data in zip( + source_ids, starts, sample_rates, data_samples, strict=True + ): + traces.add_data( + sourceid=source_id, + data_samples=data, + sample_type="i", + sample_rate=sample_rate, + start_time=start, + ) + traces.to_file( + str(path), + overwrite=True, + format_version=format_version, + encoding=pymseed.DataEncoding.INT32, + ) + return path + + +def _write_mseed_v2_header(path): + """Write enough fixed header bytes to identify a MiniSEED 2 file.""" + path.write_bytes(_mseed_v2_header()) + return path + + +def _trace_segment(**kwargs): + """Return a small decoded MiniSEED trace segment for helper tests.""" + data = kwargs.pop("data", np.arange(3, dtype=np.int32)) + defaults = dict( + source_id="FDSN:XX_00000__H_S_F", + network="XX", + station="00000", + location="", + seed_channel="HSF", + format_version="3", + start_ns=0, + sample_rate=10.0, + sample_count=len(data), + sample_type="i", + encoding="11", + publication_version=0, + record_length=256, + ) + 
defaults.update(kwargs) + return mseed_utils._TraceSegment(**defaults, data=data) + + +def _trace_summary(**kwargs): + """Return a small MiniSEED trace summary for helper tests.""" + segment = _trace_segment(**kwargs) + summary_kwargs = { + key: getattr(segment, key) for key in mseed_utils._TraceInfo.__annotations__ + } + return mseed_utils._TraceSummary( + **summary_kwargs, + dtype=str(segment.data.dtype), + ) + + +class _MiniSeedRecord: + """Small record stub for exercising read-planning paths.""" + + def __init__( + self, + sourceid="FDSN:XX_00000__H_S_F", + data=None, + samprate=10.0, + starttime=0, + fail_unpack=False, + ): + self.sourceid = sourceid + self.formatversion = 3 + self.starttime = starttime + self.samprate = samprate + self.np_datasamples = np.arange(3, dtype=np.int32) if data is None else data + self.samplecnt = len(self.np_datasamples) + self.endtime = starttime + int((self.samplecnt - 1) * 1_000_000_000 / samprate) + self.sampletype = "i" + self.encoding = 3 + self.pubversion = 0 + self.reclen = 256 + self.fail_unpack = fail_unpack + + def unpack_data(self): + """Raise when a test expects this record to be skipped.""" + if self.fail_unpack: + raise AssertionError(f"{self.sourceid} should not unpack") + + +def _pymseed_for_records(records): + """Return a small PyMseed-like object for record iteration tests.""" + + class PyMseed: + class MS3Record: + @staticmethod + def from_file(path, *, unpack_data): + assert path == "unused" + assert unpack_data is False + yield from records + + @staticmethod + def sourceid2nslc(source_id): + _, rest = source_id.split(":", maxsplit=1) + network, station, location, band, source, subsource = rest.split("_") + return network, station, location, f"{band}{source}{subsource}" + + return PyMseed + + +@pytest.fixture +def mseed_v2_path(tmp_path): + """Return a small MiniSEED v2 path.""" + return _write_mseed(tmp_path / "test_v2.mseed", format_version=2) + + +@pytest.fixture +def mseed_v3_path(tmp_path): + """Return a small MiniSEED v3 path.""" + return _write_mseed(tmp_path / "test_v3.mseed", format_version=3) + + +class TestMiniSeedGetFormat: + """Tests for detecting MiniSEED files.""" + + def test_get_format_v2(self, mseed_v2_path): + """MiniSEED v2 files can be detected.""" + assert MSeedV2().get_format(mseed_v2_path) == ("MSEED", "2") + + def test_get_format_v3(self, mseed_v3_path): + """MiniSEED v3 files can be detected.""" + assert MSeedV2().get_format(mseed_v3_path) == ("MSEED", "3") + + def test_get_format_from_dascore(self, mseed_v3_path): + """DASCore can detect MiniSEED files through plugin discovery.""" + assert dc.get_format(mseed_v3_path) == ("MSEED", "3") + + def test_get_format_without_pymseed(self, tmp_path, monkeypatch): + """MiniSEED headers can be detected without PyMseed installed.""" + import dascore.io.mseed.core as mseed_core + + def _optional_import(*args, **kwargs): + raise MissingOptionalDependencyError("missing") + + monkeypatch.setattr(mseed_core, "optional_import", _optional_import) + v2_path = _write_mseed_v2_header(tmp_path / "test_v2.mseed") + v3_path = tmp_path / "test_v3.mseed" + v3_path.write_bytes(b"MS\x03" + bytes(45)) + assert MSeedV2().get_format(v2_path) == ("MSEED", "2") + assert MSeedV2().get_format(v3_path) == ("MSEED", "3") + + def test_get_format_too_small(self, tmp_path): + """Small invalid files return False.""" + path = tmp_path / "bad.mseed" + path.write_bytes(b"abc") + assert MSeedV2().get_format(path) is False + + +class TestMiniSeedRead: + """Tests for reading MiniSEED files.""" + + def 
test_read_multichannel(self, mseed_v3_path): + """Compatible traces are merged into one channel-time patch.""" + patch = dc.read(mseed_v3_path, file_format="MSEED", file_version="3")[0] + assert patch.dims == ("channel", "time") + assert patch.shape == (3, 10) + assert np.all(patch.data[1] == np.arange(10) + 100) + assert patch.get_coord("time").step == np.timedelta64(100_000_000, "ns") + assert tuple(patch.coords.get_array("source_id")) == ( + "FDSN:XX_00000__H_S_F", + "FDSN:XX_00001__H_S_F", + "FDSN:XX_00002__H_S_F", + ) + assert tuple(patch.coords.get_array("network")) == ("XX", "XX", "XX") + assert tuple(patch.coords.get_array("station")) == ("00000", "00001", "00002") + assert tuple(patch.coords.get_array("location")) == ("", "", "") + assert tuple(patch.coords.get_array("seed_channel")) == ("HSF", "HSF", "HSF") + + def test_same_channel_different_stations_merge(self, tmp_path): + """Generic MiniSEED sources with compatible timing merge by station.""" + path = _write_mseed( + tmp_path / "multi_station.mseed", + source_ids=[ + "FDSN:XX_00066__H_S_F", + "FDSN:XX_00067__H_S_F", + "FDSN:XX_00068__H_S_F", + ], + ) + patch = dc.read(path, file_format="MSEED", file_version="3")[0] + assert patch.shape == (3, 10) + assert tuple(patch.coords.get_array("station")) == ("00066", "00067", "00068") + assert tuple(patch.coords.get_array("seed_channel")) == ("HSF", "HSF", "HSF") + + def test_different_seed_channels_are_separate_patches(self, tmp_path): + """Different physical components should not be merged into one patch.""" + path = _write_mseed( + tmp_path / "components.mseed", + source_ids=[ + "FDSN:XX_TEST__B_H_N", + "FDSN:XX_TEST__B_H_E", + "FDSN:XX_TEST__B_H_Z", + ], + ) + spool = dc.read(path, file_format="MSEED", file_version="3") + assert len(spool) == 3 + assert {x.coords.get_array("seed_channel")[0] for x in spool} == { + "BHE", + "BHN", + "BHZ", + } + + def test_read_select_time_and_channel(self, mseed_v3_path): + """Time and channel selections are applied to the output patch.""" + time_min = np.datetime64("2024-01-01T00:00:00.200000000") + time_max = np.datetime64("2024-01-01T00:00:00.500000000") + patch = dc.read( + mseed_v3_path, + file_format="MSEED", + file_version="3", + time=(time_min, time_max), + channel=(1, 3), + )[0] + assert patch.shape == (2, 4) + assert np.all(patch.get_coord("channel").values == np.array([1, 2])) + assert patch.get_coord("time").min() == time_min + + def test_incompatible_sample_rates_are_separate_patches(self, tmp_path): + """Incompatible traces are returned as separate patches.""" + path = _write_mseed( + tmp_path / "mixed.mseed", + format_version=3, + sample_rates=[10.0, 20.0, 10.0], + ) + spool = dc.read(path, file_format="MSEED", file_version="3") + assert len(spool) == 2 + assert sorted(x.shape[0] for x in spool) == [1, 2] + + def test_unequal_sample_counts_are_separate_patches(self, tmp_path): + """Unequal trace lengths are preserved in separate patches.""" + data_samples = [ + np.arange(10, dtype=np.int32), + np.arange(7, dtype=np.int32) + 100, + np.arange(10, dtype=np.int32) + 200, + ] + path = _write_mseed(tmp_path / "unequal.mseed", data_samples=data_samples) + spool = dc.read(path, file_format="MSEED", file_version="3") + assert len(spool) == 2 + assert sorted(x.shape for x in spool) == [(1, 7), (2, 10)] + assert sorted(len(x.get_coord("time")) for x in spool) == [7, 10] + assert sorted(tuple(x.get_coord("channel").values) for x in spool) == [ + (0, 2), + (1,), + ] + + def test_read_source_patch_id(self, tmp_path): + """source_patch_id filters 
logical MiniSEED groups.""" + path = _write_mseed( + tmp_path / "mixed.mseed", + format_version=3, + sample_rates=[10.0, 20.0, 10.0], + ) + summaries = dc.scan(path, file_format="MSEED", file_version="3") + target = summaries[0].source_patch_id + spool = dc.read( + path, file_format="MSEED", file_version="3", source_patch_id=target + ) + assert len(spool) == 1 + assert spool[0].attrs["_source_patch_id"] == target + + def test_read_source_patch_id_with_time(self, mseed_v3_path): + """source_patch_id from scan works with partial time reads.""" + target = dc.scan(mseed_v3_path, file_format="MSEED", file_version="3")[ + 0 + ].source_patch_id + time_min = np.datetime64("2024-01-01T00:00:00.200000000") + time_max = np.datetime64("2024-01-01T00:00:00.500000000") + spool = dc.read( + mseed_v3_path, + file_format="MSEED", + file_version="3", + source_patch_id=target, + time=(time_min, time_max), + ) + assert len(spool) == 1 + assert spool[0].attrs["_source_patch_id"] == target + assert spool[0].shape == (3, 4) + assert spool[0].get_coord("time").min() == time_min + + def test_read_source_patch_id_with_non_overlapping_time(self, mseed_v3_path): + """source_patch_id reads return empty spools for non-overlapping times.""" + target = dc.scan(mseed_v3_path, file_format="MSEED", file_version="3")[ + 0 + ].source_patch_id + spool = dc.read( + mseed_v3_path, + file_format="MSEED", + file_version="3", + source_patch_id=target, + time=( + np.datetime64("2024-01-01T01:00:00"), + np.datetime64("2024-01-01T01:00:01"), + ), + ) + assert len(spool) == 0 + + def test_source_patch_id_skips_unselected_record_unpack(self): + """source_patch_id reads do not decode records outside selected groups.""" + records = [ + _MiniSeedRecord("FDSN:XX_00000__H_S_F", samprate=10.0), + _MiniSeedRecord("FDSN:XX_00001__H_S_F", samprate=20.0, fail_unpack=True), + ] + target = "v3:XX..HSF:0:10:3" + patches = mseed_utils._get_patches( + "unused", + _pymseed_for_records(records), + source_patch_id=target, + ) + assert len(patches) == 1 + assert patches[0].attrs["_source_patch_id"] == target + assert tuple(patches[0].get_coord("channel").values) == (0,) + + def test_channel_filter_skips_unselected_record_unpack(self): + """Channel reads do not decode records outside selected channels.""" + records = [ + _MiniSeedRecord("FDSN:XX_00000__H_S_F"), + _MiniSeedRecord("FDSN:XX_00001__H_S_F", fail_unpack=True), + ] + patches = mseed_utils._get_patches( + "unused", + _pymseed_for_records(records), + channel=(0, 0), + ) + assert len(patches) == 1 + assert tuple(patches[0].get_coord("channel").values) == (0,) + + def test_unmatched_source_patch_id_returns_no_patches(self): + """Unknown source patch IDs produce no decoded patches.""" + records = [_MiniSeedRecord("FDSN:XX_00000__H_S_F", fail_unpack=True)] + patches = mseed_utils._get_patches( + "unused", + _pymseed_for_records(records), + source_patch_id="v3:XX..HSF:1:10:3", + ) + assert patches == [] + + def test_unmatched_channel_returns_no_patches(self): + """Channel selections with no sources produce no decoded patches.""" + records = [_MiniSeedRecord("FDSN:XX_00000__H_S_F", fail_unpack=True)] + patches = mseed_utils._get_patches( + "unused", + _pymseed_for_records(records), + channel=(10, 10), + ) + assert patches == [] + + def test_discontinuous_source_records_are_separate_patches(self, tmp_path): + """Discontinuous records for one source ID are not coalesced.""" + pymseed = pytest.importorskip("pymseed") + + start = pymseed.timestr2nstime("2024-01-01T00:00:00Z") + path = _write_mseed( + 
tmp_path / "discontinuous.mseed", + source_ids=["FDSN:XX_TEST__H_S_F", "FDSN:XX_TEST__H_S_F"], + starts=[start, start + 2_000_000_000], + ) + spool = dc.read(path, file_format="MSEED", file_version="3") + assert len(spool) == 2 + + +class TestMiniSeedScan: + """Tests for scanning MiniSEED files.""" + + def test_scan(self, mseed_v3_path): + """Scan returns metadata for the logical MiniSEED patch.""" + summary = dc.scan(mseed_v3_path, file_format="MSEED", file_version="3")[0] + assert summary.dims == ("channel", "time") + assert summary.shape == (3, 10) + assert summary.dtype == "int32" + assert summary.source_format == "MSEED" + assert summary.source_version == "3" + assert summary.source_patch_id + + @pytest.mark.parametrize( + ("fixture_name", "file_version"), + (("mseed_v2_path", "2"), ("mseed_v3_path", "3")), + ) + def test_scan_matches_read_summary(self, request, fixture_name, file_version): + """Light scan metadata matches the corresponding read summary.""" + path = request.getfixturevalue(fixture_name) + summary = dc.scan(path, file_format="MSEED", file_version=file_version)[0] + patch_summary = dc.read(path, file_format="MSEED", file_version=file_version)[ + 0 + ].summary + assert summary.dims == patch_summary.dims + assert summary.shape == patch_summary.shape + assert summary.dtype == patch_summary.dtype + assert summary.source_patch_id == patch_summary.source_patch_id + for dim in summary.dims: + scan_coord = summary.get_coord_summary(dim) + read_coord = patch_summary.get_coord_summary(dim) + assert scan_coord.min == read_coord.min + assert scan_coord.max == read_coord.max + assert scan_coord.step == read_coord.step + + def test_scan_v2_metadata(self, mseed_v2_path): + """MiniSEED v2 scan reports expected metadata from headers.""" + summary = dc.scan(mseed_v2_path, file_format="MSEED", file_version="2")[0] + assert summary.shape == (3, 10) + assert summary.dtype == "int32" + assert summary.source_format == "MSEED" + assert summary.source_version == "2" + assert summary.source_patch_id.startswith("v2:") + + def test_scan_does_not_unpack_records(self, monkeypatch): + """Scan uses record headers without decoding sample payloads.""" + + class Record: + sourceid = "FDSN:XX_00000__H_S_F" + formatversion = 3 + starttime = 0 + samprate = 10.0 + samplecnt = 10 + sampletype = None + encoding = 3 + pubversion = 0 + reclen = 256 + + def unpack_data(self): + raise AssertionError("scan should not unpack samples") + + class PyMseed: + class MS3Record: + @staticmethod + def from_file(path, *, unpack_data): + assert path == "unused" + assert unpack_data is False + yield Record() + + @staticmethod + def sourceid2nslc(source_id): + assert source_id == Record.sourceid + return "XX", "00000", "", "HSF" + + payload = mseed_utils._scan_patches("unused", PyMseed)[0] + assert payload["shape"] == (1, 10) + assert payload["dtype"] == "int32" + + def test_missing_pymseed_raises(self, mseed_v3_path, monkeypatch): + """Explicit MiniSEED reads require PyMseed.""" + import dascore.io.mseed.core as mseed_core + + def _optional_import(*args, **kwargs): + raise MissingOptionalDependencyError("missing") + + monkeypatch.setattr(mseed_core, "optional_import", _optional_import) + with pytest.raises(MissingOptionalDependencyError): + dc.read(mseed_v3_path, file_format="MSEED", file_version="3") + + with pytest.raises(MissingOptionalDependencyError): + dc.scan(mseed_v3_path, file_format="MSEED", file_version="3") + + +class TestMiniSeedUtils: + """Focused tests for MiniSEED helper edge cases.""" + + def 
test_zero_sample_rate_raises(self): + """A zero sample rate cannot be converted to a sample step.""" + with pytest.raises(ValueError, match="sample rate"): + mseed_utils._duration_ns(0) + + def test_negative_sample_rate_is_sample_period(self): + """Negative MiniSEED sample rates are interpreted as sample periods.""" + assert mseed_utils._duration_ns(-0.1) == 100_000_000 + + def test_next_start_ns_avoids_sample_step_drift(self): + """Record adjacency uses total duration instead of rounded sample steps.""" + segment = _trace_segment(sample_rate=3.0, sample_count=3) + assert segment.sample_step_ns == 333_333_333 + assert segment.next_start_ns == 1_000_000_000 + + def test_open_time_limits(self): + """None and ellipsis are open time bounds.""" + assert mseed_utils._get_time_limits(...) == (None, None) + assert mseed_utils._get_time_limits((None, ...)) == (None, None) + + def test_bad_source_id_falls_back_to_station(self): + """Unparseable source IDs are still preserved.""" + + class BadPyMseed: + @staticmethod + def sourceid2nslc(source_id): + raise ValueError("bad source id") + + assert mseed_utils._source_id_to_nslc(BadPyMseed, "not-fdsn") == ( + "", + "not-fdsn", + "", + "", + ) + + def test_source_id_error_falls_back_to_station(self): + """PyMseed source ID parser errors are treated as unparseable IDs.""" + + class BadPyMseed: + class SourceIDError(Exception): + pass + + @staticmethod + def sourceid2nslc(source_id): + raise BadPyMseed.SourceIDError("bad source id") + + assert mseed_utils._source_id_to_nslc(BadPyMseed, "not-fdsn") == ( + "", + "not-fdsn", + "", + "", + ) + + def test_unexpected_source_id_errors_propagate(self): + """Only expected source ID parser errors are swallowed.""" + + class BadPyMseed: + @staticmethod + def sourceid2nslc(source_id): + raise RuntimeError("unexpected bug") + + with pytest.raises(RuntimeError, match="unexpected bug"): + mseed_utils._source_id_to_nslc(BadPyMseed, "not-fdsn") + + def test_record_to_segment_skips_non_overlapping_records(self): + """Record metadata can be rejected before sample unpacking.""" + + class Record: + starttime = 0 + endtime = 10 + + def unpack_data(self): + raise AssertionError("should not unpack") + + assert mseed_utils._record_to_segment(Record(), None, (20, 30)) is None + + def test_trim_segment_time_returns_none_for_empty_selection(self): + """Time trimming can remove all samples from a decoded segment.""" + segment = _trace_segment() + assert mseed_utils._trim_segment_time(segment, (25_000_000, 75_000_000)) is None + + def test_record_dtype_from_sample_type(self): + """Populated sample types are preferred when inferring scan dtype.""" + + class Record: + sampletype = "d" + encoding = 3 + + assert mseed_utils._record_dtype(Record()) == "float64" + + @pytest.mark.parametrize( + ("encoding", "dtype"), + ( + (0, "S1"), + (1, "int16"), + (3, "int32"), + (4, "float32"), + (5, "float64"), + (10, "int32"), + (11, "int32"), + (999, ""), + ), + ) + def test_record_dtype_from_encoding(self, encoding, dtype): + """MiniSEED scan dtype can be inferred from known encodings.""" + + class Record: + sampletype = None + + Record.encoding = encoding + assert mseed_utils._record_dtype(Record()) == dtype + + def test_contiguous_source_summaries_are_coalesced(self): + """Contiguous scan summaries from one source are merged.""" + first = _trace_summary() + second = _trace_summary(start_ns=first.next_start_ns, record_length=512) + out = mseed_utils._coalesce_source_summaries([second, first]) + assert len(out) == 1 + assert out[0].sample_count == 
first.sample_count + second.sample_count + assert out[0].record_length == 512 + + def test_discontinuous_source_summaries_are_not_coalesced(self): + """Discontinuous scan summaries from one source stay separate.""" + first = _trace_summary() + second = _trace_summary(start_ns=first.next_start_ns + first.sample_step_ns) + out = mseed_utils._coalesce_source_summaries([first, second]) + assert len(out) == 2 + + def test_source_segments_with_different_encodings_are_not_coalesced(self): + """Read and scan coalescing both preserve MiniSEED encoding changes.""" + first = _trace_segment(encoding="3") + second = _trace_segment(start_ns=first.next_start_ns, encoding="11") + out = mseed_utils._coalesce_source_segments([first, second]) + assert len(out) == 2 + + def test_patch_from_segments_defaults_to_local_channel_indices(self): + """Patches can still be built without a global channel map.""" + segment = _trace_segment() + group_key = mseed_utils._get_group_key(segment) + patch = mseed_utils._patch_from_segments(group_key, [segment]) + assert tuple(patch.get_coord("channel").values) == (0,) + + def test_source_patch_id_uses_group_key_fields(self): + """Source patch IDs are built from the typed MiniSEED group key.""" + group_key = mseed_utils._TraceGroupKey( + format_version="3", + network="XX", + location="", + seed_channel="HSF", + start_ns=10, + sample_rate=10.0, + sample_count=12, + ) + assert mseed_utils._source_patch_id(group_key) == "v3:XX..HSF:10:10:12" + + def test_detect_format_no_records(self, tmp_path): + """Files with no records are not MiniSEED.""" + path = tmp_path / "empty.mseed" + path.write_bytes(b"") + out = mseed_utils._detect_format(path) + assert out is False + + def test_detect_format_missing_file(self, tmp_path): + """Missing paths are not MiniSEED.""" + assert mseed_utils._detect_format(tmp_path / "missing.mseed") is False + + def test_detect_format_unsupported_version(self, tmp_path): + """Unsupported MiniSEED versions are rejected.""" + path = tmp_path / "v4.mseed" + path.write_bytes(b"MS\x04" + bytes(45)) + assert mseed_utils._detect_format(path) is False + + @pytest.mark.parametrize( + "field,value", + ( + ("sequence", b"A00001"), + ("record_type", b"X"), + ("reserved", b"X"), + ("station", b" "), + ("channel", b"H F"), + ("network", b" "), + ("year", bytes.fromhex("0700")), + ("data_offset", bytes.fromhex("002f")), + ("sample_count", bytes.fromhex("0000")), + ), + ) + def test_detect_mseed_v2_header_rejects_invalid_fields(self, field, value): + """Each fixed-header guard can reject invalid MiniSEED 2 headers.""" + header = _mseed_v2_header() + slices = { + "sequence": slice(0, 6), + "record_type": slice(6, 7), + "reserved": slice(7, 8), + "station": slice(8, 13), + "channel": slice(15, 18), + "network": slice(18, 20), + "year": slice(20, 22), + "data_offset": slice(44, 46), + "sample_count": slice(30, 32), + } + header[slices[field]] = value + assert not mseed_utils._detect_mseed_v2_header(header) + + def test_detect_mseed_v2_header_rejects_short_header(self): + """Truncated fixed headers are not MiniSEED 2.""" + assert not mseed_utils._detect_mseed_v2_header(bytes(47)) + + def test_detect_mseed_v2_header_handles_unpack_error(self, monkeypatch): + """Header unpack failures are treated as non-MiniSEED.""" + + def _raise_unpack(*args, **kwargs): + raise mseed_utils.struct.error("bad unpack") + + monkeypatch.setattr(mseed_utils, "unpack", _raise_unpack) + assert not mseed_utils._detect_mseed_v2_header(_mseed_v2_header()) + + def 
test_close_sample_rates_have_distinct_source_patch_ids(self): + """Full precision sample rates avoid source patch ID collisions.""" + group_a = mseed_utils._TraceGroupKey("3", "XX", "", "HSF", 0, 1.00000001, 10) + group_b = mseed_utils._TraceGroupKey("3", "XX", "", "HSF", 0, 1.00000002, 10) + assert mseed_utils._source_patch_id(group_a) != mseed_utils._source_patch_id( + group_b + ) + + +class TestRealMiniSeed: + """Tests using real DAS MiniSEED data from the test-data registry.""" + + def test_read_das_station_channels(self): + """Etna DAS station-coded channels preserve their full sample counts.""" + pytest.importorskip("pymseed") + from dascore.utils.downloader import fetch + + with skip_timeout(): + path = fetch("etna_9n_3chan_10s.mseed") + spool = dc.read(path) + assert len(spool) == 3 + assert sorted(x.shape for x in spool) == [(1, 13556), (1, 13729), (1, 13735)] + assert {x.coords.get_array("network")[0] for x in spool} == {"9N"} + assert {x.coords.get_array("station")[0] for x in spool} == { + "00066", + "00067", + "00068", + } + assert {x.coords.get_array("seed_channel")[0] for x in spool} == {"HSF"}
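+
+
+class TestMiniSeedHelperExamples:
+    """Illustrative sketches of MiniSEED helper ordering and arithmetic.
+
+    These tests restate documented helper behavior as runnable examples and
+    assume only the private helpers already imported in this module.
+    """
+
+    def test_channel_indices_follow_station_order(self):
+        """Channel indices are assigned by (station, source_id) sort order."""
+        segments = [
+            _trace_segment(source_id="FDSN:XX_00002__H_S_F", station="00002"),
+            _trace_segment(source_id="FDSN:XX_00000__H_S_F", station="00000"),
+            _trace_segment(source_id="FDSN:XX_00001__H_S_F", station="00001"),
+        ]
+        channel_map = mseed_utils._get_channel_map(segments)
+        # Sources sort by (station, source ID), so the integer channel
+        # values stay stable regardless of record order in the file.
+        assert channel_map == {
+            "FDSN:XX_00000__H_S_F": 0,
+            "FDSN:XX_00001__H_S_F": 1,
+            "FDSN:XX_00002__H_S_F": 2,
+        }
+
+    def test_duration_ns_uses_exact_fractions(self):
+        """Fraction-based duration math avoids float drift in nanoseconds."""
+        # Three samples at 3 Hz span exactly one second.
+        assert mseed_utils._duration_ns(3.0, 3) == 1_000_000_000
+        # Negative rates encode the sample period: -0.1 means 0.1 s/sample.
+        assert mseed_utils._duration_ns(-0.1, 2) == 200_000_000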