Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 83 additions & 10 deletions src/ess/bifrost/io/nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import scipp as sc
import scippnexus as snx

from ess.bifrost.types import DetectorAnalyzerMap
from ess.reduce.nexus import load_all_components, open_component_group
from ess.reduce.nexus.types import NeXusAllLocationSpec, NeXusLocationSpec
from ess.spectroscopy.types import (
Expand Down Expand Up @@ -59,20 +60,85 @@ def load_analyzers(file_spec: NeXusFileSpec[RunType]) -> Analyzers[RunType]:
)


def _get_analyzer_for_detector_name(
detector_name: str, analyzers: Analyzers[RunType]
) -> Analyzers[RunType]:
detector_index = int(detector_name.split('_', 1)[0])
analyzer_index = str(detector_index - 2)
for name, analyzer in analyzers.items():
if name.startswith(analyzer_index):
return analyzer
raise RuntimeError(f"No analyzer found for detector {detector_name}")
def _do_breadth_first_search(group, targets, obj_deque, obj_next):
"""
Look for a unique element of targets by following the 'next' for object in a queue

Parameters
----------
group: HDF5 Group
The group that contains all possible next named groups
targets:
A structure with named targets that supports `name in targets`
obj_deque:
A queue.deque of HDF5 Groups to be checked
obj_next:
A function that extracts a list of named groups to check from a given group
"""
while len(obj_deque) > 0:
check = obj_next(obj_deque.popleft())
matches = [element for element in check if element in targets]
if len(matches) > 1:
raise ValueError("Non-unique elmeent match")
if len(matches) == 1:
return matches[0]
for element in check:
obj_deque.append(group[element])
raise ValueError("No unique element found")


def analyzer_search(hdf5_instrument_group, analyzers, hdf5_detector_group):
"""
Use a NeXus Group's @inputs attribute to find an analyzer given a detector group

Parameters
----------
hdf5_instrument_group: hdf5.Group
works if inside of a context group
```
scippnexus.File(filename, 'r') as f:
hdf5_instrument_group = f['/entry/instrument']
```
analyzers: Anything with __contains__(str), e.g. dict[str, hdf5.Group]
Something to identify whether we've found a valid analyzer (by name)
hdf5_detector_group: hdf5.Group
any of f['/entry/detector'][scippnexus.NXdectector].values()
"""
from queue import deque

from h5py import Group

def obj_inputs(obj: Group) -> list[str]:
"""Return the specified preceding component(s) list"""
if 'inputs' not in obj.attrs:
raise ValueError('@inputs attribute required for this search to work')
val = obj.attrs['inputs']
# Deal with nexusformat (Python module) or kafka-to-nexus (filewriter)
# silently converting a len(list[str]) == 1 attribute to a str attribute:
return [val] if isinstance(val, str) else val

return _do_breadth_first_search(
hdf5_instrument_group, analyzers, deque([hdf5_detector_group]), obj_inputs
)


def get_detector_analyzer_map(file_spec: NeXusFileSpec[RunType]) -> DetectorAnalyzerMap:
"""Probably not the right sciline way to do this."""

from scippnexus import File, NXcrystal, NXdetector

filename = file_spec.value
with File(filename, 'r') as file:
inst = file['entry/instrument']
analyzers = inst[NXcrystal]
detectors = inst[NXdetector]
return {k: analyzer_search(inst, analyzers, v) for k, v in detectors.items()}


def analyzer_for_detector(
analyzers: Analyzers[RunType],
detector_location: NeXusComponentLocationSpec[snx.NXdetector, RunType],
detector_analyzer_map: DetectorAnalyzerMap,
) -> Analyzer[RunType]:
"""Extract the analyzer for a given detector.

Expand All @@ -98,8 +164,14 @@ def analyzer_for_detector(
"""
if detector_location.component_name is None:
raise ValueError("Detector component name is None")
if (
analyzer_name := detector_analyzer_map.get(detector_location.component_name)
) is None:
raise RuntimeError(
f"No analyzer found for detector {detector_location.component_name}"
)
analyzer = snx.compute_positions(
_get_analyzer_for_detector_name(detector_location.component_name, analyzers),
analyzers[analyzer_name],
store_transform='transform',
)
return Analyzer[RunType](
Expand All @@ -117,4 +189,5 @@ def analyzer_for_detector(
load_instrument_angle,
load_sample_angle,
moderator_class_for_source,
get_detector_analyzer_map,
)
3 changes: 3 additions & 0 deletions src/ess/bifrost/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,6 @@ class McStasRawDetector(sciline.Scope[RunType, sc.DataArray], sc.DataArray): ...


ArcEnergy = NewType('ArcEnergy', sc.Variable)


DetectorAnalyzerMap = NewType('DetectorAnalyzerMap', dict[str, str])
29 changes: 29 additions & 0 deletions src/ess/spectroscopy/indirect/kf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
)


def _no_time(x: sc.Variable | sc.DataArray) -> sc.Variable:
if isinstance(x, sc.DataArray) and 'time' in x.coords:
return x['time', 0].data
elif isinstance(x, sc.DataArray):
raise ValueError("Only `DataArray`s with a time-coordinate allowed")
return x


def sample_analyzer_vector(
sample_position: sc.Variable,
analyzer_position: sc.Variable,
Expand Down Expand Up @@ -52,6 +60,17 @@ def sample_analyzer_vector(
The vector from the sample position to the interaction point on the analyzer
for each detector element.
"""
# FIXME time-dependent depends-on chains produce positions and transformations
# which are `scipp.DataArray`s with a 'time' coordinate. We don't need the
# time-dependence here since all calculations are done in the rotating
# detector-tank coordinate system where these *have no* time-dependence
# TODO: Verify that we are actually using the rotating detector-tank coordinate
# frame, otherwise we will misidentify the Q vector for each detector
sample_position = _no_time(sample_position)
analyzer_position = _no_time(analyzer_position)
analyzer_transform = _no_time(analyzer_transform)
detector_position = _no_time(detector_position)

# Scipp does not distinguish between coordinates and directions, so we need to do
# some extra legwork to ensure we can apply the orientation transformation
# _and_ obtain a dimensionless direction vector
Expand Down Expand Up @@ -123,6 +142,16 @@ def analyzer_detector_vector(
detector_position: sc.Variable,
) -> sc.Variable:
"""Calculate the analyzer-detector vector"""
# FIXME time-dependent depends-on chains produce positions and transformations
# which are `scipp.DataArray`s with a 'time' coordinate. We don't need the
# time-dependence here since all calculations are done in the rotating
# detector-tank coordinate system where these *have no* time-dependence
# TODO: Verify that we are actually using the rotating detector-tank coordinate
# frame, otherwise we will misidentify the Q vector for each detector
sample_position = _no_time(sample_position)
sample_analyzer_vector = _no_time(sample_analyzer_vector) # FIXME unnecessary?
detector_position = _no_time(detector_position)

analyzer_position = sample_position + sample_analyzer_vector.to(
unit=sample_analyzer_vector.unit
)
Expand Down
Loading