Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 110 additions & 44 deletions src/scippneutron/metadata/_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@
from __future__ import annotations

import enum
import warnings
from datetime import datetime
from typing import Annotated, Any
from typing import Annotated, Any, ClassVar

import scipp as sc
import scippnexus as snx
from dateutil.parser import parse as parse_datetime
from pydantic import BaseModel, BeforeValidator, EmailStr
from scippnexus.typing import H5Group

from ._orcid import ORCIDiD


# Defined at the top so that Pydantic validators can use it.
def _unpack_variable(value: object) -> Any:
"""Before validator to support passing scalar scipp variables as inputs."""
if isinstance(value, sc.Variable):
Expand Down Expand Up @@ -204,6 +207,42 @@ class Person(BaseModel):
affiliation: Annotated[str | None, BeforeValidator(_unpack_variable)] = None
"""Affiliation of the person."""

nx_class: ClassVar[type] = snx.NXuser

@classmethod
def from_nexus_user(cls, group: snx.Group) -> Person:
"""Construct a Person object from a Nexus NXuser group.

Parameters
----------
group:
ScippNexus group for a NeXus group.

Returns
-------
:
An Person object constructed from the given Nexus entry.
"""
return cls(
name=str(group['name'][()]),
orcid_id=_read_optional_nexus_string(group, 'ORCID'),
# User `or None` to convert empty strings to None to bypass validator
email=_read_optional_nexus_string(group, 'email') or None,
corresponding=False,
owner=True,
role=_read_optional_nexus_string(group, 'role'),
address=_read_optional_nexus_string(group, 'address'),
affiliation=_read_optional_nexus_string(group, 'affiliation'),
)

def __write_to_nexus_group__(self, group: H5Group) -> None:
snx.create_field(group, 'name', self.name)
_create_optional_nexus_field(group, 'address', self.address)
_create_optional_nexus_field(group, 'affiliation', self.affiliation)
_create_optional_nexus_field(group, 'email', self.email)
_create_optional_nexus_field(group, 'ORCID', self.orcid_id)
_create_optional_nexus_field(group, 'role', self.role)


class Software(BaseModel):
"""A piece of software.
Expand Down Expand Up @@ -251,6 +290,8 @@ class Software(BaseModel):
a general DOI for the software may be used.
"""

nx_class: ClassVar[str] = 'NXprogram'

@classmethod
def from_package_metadata(cls, package_name: str) -> Software:
"""Construct a Software instance from the metadata of an installed package.
Expand Down Expand Up @@ -289,48 +330,11 @@ def compact_repr(self) -> str:
return f'{self.name_version} ({self.url})'
return self.name_version


def _deduce_package_version(package_name: str) -> str | None:
from importlib.metadata import PackageNotFoundError, version

try:
return version(package_name)
except PackageNotFoundError:
# Either the package is not installed or has no metadata.
from importlib import import_module

try:
package = import_module(package_name)
except ModuleNotFoundError as e:
raise e from None

try:
return package.__version__
except AttributeError:
raise RuntimeError(
f"Package '{package_name}' has no metadata and no "
f"__version__ attribute. Specify the version manually."
) from None


def _deduce_package_source_url(package_name: str) -> str | None:
from importlib.metadata import PackageNotFoundError, metadata

try:
meta = metadata(package_name)
except PackageNotFoundError:
# Either the package is not installed or has no metadata.
return None

if not (urls := meta.get_all("project-url")):
return None

try:
return next(
url.split(',')[-1].strip() for url in urls if url.startswith("Source")
)
except StopIteration:
return None
def __write_to_nexus_group__(self, group: H5Group) -> None:
field = snx.create_field(group, 'program', self.name)
field.attrs['version'] = self.version
if self.url:
field.attrs['url'] = self.url


class SourceType(enum.Enum):
Expand Down Expand Up @@ -372,6 +376,13 @@ class Source(BaseModel):
probe: RadiationProbe
"""Radiation probe of the source."""

nx_class: ClassVar[type] = snx.NXsource

def __write_to_nexus_group__(self, group: H5Group) -> None:
_create_optional_nexus_field(group, 'name', self.name)
snx.create_field(group, 'type', self.source_type.value)
snx.create_field(group, 'probe', self.probe.value)


ESS_SOURCE = Source(
name="ESS Butterfly",
Expand All @@ -385,7 +396,10 @@ def _read_optional_nexus_string(group: snx.Group | None, key: str) -> str | None
if group is None:
return None
if (ds := group.get(key)) is not None:
return ds[()]
data = ds[()]
if not isinstance(data, str):
warnings.warn(f"NeXus field '{key}' is not a string", stacklevel=3)
return str(data)
return None


Expand Down Expand Up @@ -450,3 +464,55 @@ def _guess_facility_and_site(
return facility, site
case facility:
return facility, facility


def _deduce_package_version(package_name: str) -> str | None:
from importlib.metadata import PackageNotFoundError, version

try:
return version(package_name)
except PackageNotFoundError:
# Either the package is not installed or has no metadata.
from importlib import import_module

try:
package = import_module(package_name)
except ModuleNotFoundError as e:
raise e from None

try:
return package.__version__
except AttributeError:
raise RuntimeError(
f"Package '{package_name}' has no metadata and no "
f"__version__ attribute. Specify the version manually."
) from None


def _deduce_package_source_url(package_name: str) -> str | None:
from importlib.metadata import PackageNotFoundError, metadata

try:
meta = metadata(package_name)
except PackageNotFoundError:
# Either the package is not installed or has no metadata.
return None

if not (urls := meta.get_all("project-url")):
return None

try:
return next(
url.split(',')[-1].strip() for url in urls if url.startswith("Source")
)
except StopIteration:
return None


def _create_optional_nexus_field(
group: H5Group, name: str, value: object | None
) -> None:
if value is not None:
if not isinstance(value, str):
value = str(value)
snx.create_field(group, name, value)
116 changes: 116 additions & 0 deletions tests/metadata/metadata_model_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2025 Scipp contributors (https://github.com/scipp)

from typing import cast

import h5py as h5
import pytest
import scipp as sc
import scippnexus as snx
Expand All @@ -11,6 +14,15 @@
from scippneutron import metadata


@pytest.fixture
def nxroot() -> snx.Group:
"""Yield NXroot containing a single NXentry named 'entry'"""
with h5.File('dummy.nxs', mode='w', driver="core", backing_store=False) as f:
root = snx.Group(f, definitions=snx.base_definitions())
root.create_class('entry', snx.NXentry)
yield root


def test_measurement_from_nexus_entry() -> None:
with snx.File(scn.data.get_path('PG3_4844_event.nxs')) as f:
experiment = metadata.Measurement.from_nexus_entry(f['entry'])
Expand Down Expand Up @@ -43,6 +55,73 @@ def test_beamline_from_nexus_entry() -> None:
assert beamline.revision is None


def test_person_from_nexus_group(nxroot: snx.Group) -> None:
user_group = nxroot.create_class("user_Testo", snx.NXuser)
user_group['name'] = "Testo Prøvegaard"
user_group['email'] = "" # empty to check validator bypass
user_group['affiliation'] = 'Fakington University'
user_group['address'] = 'Fakington University, Back Alley 3, Fakington'
user_group['ORCID'] = 'https://orcid.org/0000-0000-0000-0001'
user_group['role'] = 'Principal Investigator'
# Exists in files by we cannot represent it in `Person`:
user_group['facility_user_id'] = 'testo.faking'

person = metadata.Person.from_nexus_user(user_group)
assert person.name == "Testo Prøvegaard"
assert person.email is None
assert person.affiliation == 'Fakington University'
assert person.orcid_id == 'https://orcid.org/0000-0000-0000-0001'
assert person.role == 'Principal Investigator'
assert person.address == 'Fakington University, Back Alley 3, Fakington'

# We have no logic for deducing these from NeXus
assert not person.corresponding
assert person.owner


def test_person_write_to_nexus(nxroot: snx.Group) -> None:
person = metadata.Person(
name='Testo Prøvegaard',
email='testo.prove@fake.uni',
affiliation='Fakington University',
address='Fakington University, Back Alley 3, Fakington',
orcid_id='https://orcid.org/0000-0000-0000-0001',
role='Principal Investigator',
)

nxroot['user_Testo'] = person

assert nxroot['user_Testo'].attrs['NX_class'] == 'NXuser'
loaded = cast(sc.DataGroup[str], nxroot['user_Testo'][()])
assert loaded.keys() == {
'address',
'affiliation',
'email',
'name',
'ORCID',
'role',
}
assert loaded['name'] == 'Testo Prøvegaard'
assert loaded['email'] == 'testo.prove@fake.uni'
assert loaded['affiliation'] == 'Fakington University'
assert loaded['address'] == 'Fakington University, Back Alley 3, Fakington'
assert loaded['ORCID'] == 'https://orcid.org/0000-0000-0000-0001'
assert loaded['role'] == 'Principal Investigator'


def test_person_write_to_nexus_empty_optional(nxroot: snx.Group) -> None:
person = metadata.Person(
name='Testo Prøvegaard',
)

nxroot['user_Testo'] = person

assert nxroot['user_Testo'].attrs['NX_class'] == 'NXuser'
loaded = cast(sc.DataGroup[str], nxroot['user_Testo'][()])
assert loaded.keys() == {'name'}
assert loaded['name'] == 'Testo Prøvegaard'


def test_software_from_from_package_metadata_first_party() -> None:
software = metadata.Software.from_package_metadata('scippneutron')
expected = metadata.Software(
Expand All @@ -68,3 +147,40 @@ def test_software_from_from_package_metadata_third_party() -> None:
def test_software_from_from_package_metadata_fails_when_package_not_installed() -> None:
with pytest.raises(ModuleNotFoundError):
metadata.Software.from_package_metadata('not-a-package')


def test_software_write_to_nexus(nxroot: snx.Group) -> None:
software = metadata.Software(
name='scippneutron',
version='0.1.0',
url='https://github.com/scipp/scippneutron',
doi='10.1007/s11224-022-02522-2',
)

nxroot['program'] = software

assert nxroot['program'].attrs['NX_class'] == 'NXprogram'
loaded = cast(sc.DataGroup[str], nxroot['program'][()])
assert loaded.keys() == {'program'}
assert loaded['program'] == 'scippneutron'
prog = nxroot['program']['program']
assert prog.attrs.keys() == {'version', 'url'}
assert prog.attrs['version'] == '0.1.0'
assert prog.attrs['url'] == 'https://github.com/scipp/scippneutron'


def test_source_write_to_nexus(nxroot: snx.Group) -> None:
source = metadata.Source(
name="Test source",
source_type=metadata.SourceType.SpallationNeutronSource,
probe=metadata.RadiationProbe.Neutron,
)

nxroot['source'] = source

assert nxroot['source'].attrs['NX_class'] == 'NXsource'
loaded = cast(sc.DataGroup[str], nxroot['source'][()])
assert loaded.keys() == {'name', 'type', 'probe'}
assert loaded['name'] == 'Test source'
assert loaded['type'] == 'Spallation Neutron Source'
assert loaded['probe'] == 'neutron'
Loading