diff --git a/.github/workflows/release_ci.yml b/.github/workflows/release_ci.yml new file mode 100644 index 0000000..511cb51 --- /dev/null +++ b/.github/workflows/release_ci.yml @@ -0,0 +1,161 @@ +--- + name: Build and publish release + run-name: ${{ github.actor }} is preparing the next release + + permissions: + pull-requests: write + contents: write + packages: write + + "on": + workflow_dispatch: + inputs: + rc: + description: "Release candidate number for this release." + required: true + type: number + push: + tags: + - "v*.*.*" + + jobs: + release-type-determination: + name: Determine if tag is pre-release or not + runs-on: ubuntu-latest + outputs: + is_prerelease: ${{ steps.check-prerelease.outputs.is_prerelease }} + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Python 3 + uses: actions/setup-python@v4 + with: + python-version: "3.x" + + - name: Installing dependencies + run: pip install git-python packaging tomli requests + + - name: Getting release job + run: curl -Ls -o rc.py https://raw.githubusercontent.com/freva-org/freva-admin/main/release.py + + - name: Check if tag is a pre-release + id: check-prerelease + run: | + RC="${{ github.event_name == 'workflow_dispatch' && inputs.rc || '' }}" + IS_PRERELEASE="false" + if [ "$RC" ]; then + python3 rc.py rc $RC xarray_prism -p . 
1> tag.txt + IS_PRERELEASE="true" + else + python3 -c "exec(open('src/xarray_prism/_version.py').read()); print(__version__)" 1> tag.txt + fi + echo "$IS_PRERELEASE" > is_prerelease.txt + echo "is_prerelease=$IS_PRERELEASE" >> "$GITHUB_OUTPUT" + + - name: Upload Outputs + uses: actions/upload-artifact@v4 + with: + name: release-metadata + path: | + tag.txt + is_prerelease.txt + + tests: + uses: ./.github/workflows/tests_ci.yml + + pypi: + name: Publish to PyPI + needs: [tests, release-type-determination] + if: needs.release-type-determination.outputs.is_prerelease == 'false' + runs-on: ubuntu-latest + permissions: + id-token: write + contents: read + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v4 + with: + python-version: "3.x" + - run: python -m pip install build + - run: python -m build + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + skip-existing: true + verbose: true + + pypi-prerelease: + name: Publish pre-release to PyPI + needs: [tests, release-type-determination] + if: needs.release-type-determination.outputs.is_prerelease == 'true' + runs-on: ubuntu-latest + permissions: + id-token: write + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Download Release Metadata + uses: actions/download-artifact@v4 + with: + name: release-metadata + + - name: Read tag version + run: | + TAG=$(tail -1 tag.txt) + echo "TAG=$TAG" >> $GITHUB_ENV + + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: "3.x" + + - name: Install build tools + run: python -m pip install build tomli + + - name: Update version for pre-release + run: | + sed -i "s/__version__ = .*/__version__ = \"${{ env.TAG }}\"/" src/xarray_prism/_version.py + + - name: Build package + run: python -m build + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + with: + password: ${{ secrets.TEST_PYPI_TOKEN }} + skip-existing: true + verbose: true + + github-release: + name: Create GitHub 
Release + needs: [tests, release-type-determination] + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Download Release Metadata + uses: actions/download-artifact@v4 + with: + name: release-metadata + path: release-metadata + + - name: Read Metadata + shell: bash + run: | + TAG="$(tail -n 1 release-metadata/tag.txt)" + IS_PRERELEASE="$(tail -n 1 release-metadata/is_prerelease.txt)" + echo "TAG=$TAG" >> "$GITHUB_ENV" + echo "IS_PRERELEASE=$IS_PRERELEASE" >> "$GITHUB_ENV" + + - name: Create GitHub Release + uses: softprops/action-gh-release@v1 + with: + tag_name: v${{ env.TAG }} + name: Release v${{ env.TAG }} + prerelease: ${{ env.IS_PRERELEASE == 'true' }} + generate_release_notes: true + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/tests_ci.yml b/.github/workflows/tests_ci.yml new file mode 100644 index 0000000..7da9e19 --- /dev/null +++ b/.github/workflows/tests_ci.yml @@ -0,0 +1,117 @@ +name: CI Tests + +permissions: + pull-requests: write + contents: write + packages: write + +on: [push, pull_request, workflow_call] + +jobs: + lint-and-types: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + python -m pip install tox + + - name: Run lint and type checks + run: tox run-parallel --parallel-no-spinner -e lint,types + + tests: + runs-on: ubuntu-latest + strategy: + max-parallel: 4 + matrix: + python-version: ["3.10", "3.11", "3.12", "3.13"] + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup micromamba + uses: mamba-org/setup-micromamba@v1 + with: + micromamba-version: "latest" + environment-name: xarray-prism + cache-environment: true + cache-downloads: true + create-args: >- + python=${{ matrix.python-version }} + hdf5 + netcdf4 + h5py + h5netcdf + eccodes + cfgrib 
+ rasterio + gdal + pip + tox + condarc: | + channels: + - conda-forge + channel_priority: strict + init-shell: bash + + - name: Set up services + shell: bash -l {0} + run: | + docker compose -f dev-env/docker-compose.yaml up -d --remove-orphans + + - name: Wait for MinIO + shell: bash -l {0} + run: | + echo "Waiting for MinIO to be ready..." + timeout 60 bash -c 'until curl -sf http://localhost:9000/minio/health/ready; do sleep 2; done' + echo "MinIO is ready" + + - name: Wait for THREDDS + shell: bash -l {0} + run: | + echo "Waiting for THREDDS to be ready..." + timeout 120 bash -c 'until curl -sf http://localhost:8088/thredds/catalog.html; do sleep 5; done' + echo "THREDDS is ready" + + - name: Run tests + shell: bash -l {0} + run: VIRTUALENV_SYSTEM_SITE_PACKAGES=1 tox -e test + + - name: Upload coverage to Codecov + if: matrix.python-version == '3.12' + uses: codecov/codecov-action@v3 + with: + token: ${{ secrets.CODECOV_TOKEN }} + verbose: true + + dependabot: + name: Merge PR by dependabot + runs-on: ubuntu-latest + needs: [tests, lint-and-types] + if: github.event.pull_request.user.login == 'dependabot[bot]' + steps: + - name: Dependabot metadata + id: metadata + uses: dependabot/fetch-metadata@v1 + with: + github-token: "${{ secrets.GITHUB_TOKEN }}" + + - name: Approve dependabot's PR + run: gh pr review --approve "$PR_URL" + env: + PR_URL: ${{ github.event.pull_request.html_url }} + GITHUB_TOKEN: ${{ secrets.TOKEN }} + + - name: Auto-merge for dependabot's PR + run: gh pr merge --merge --auto "$PR_URL" + env: + PR_URL: ${{ github.event.pull_request.html_url }} + GITHUB_TOKEN: ${{ secrets.TOKEN }} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0403113 --- /dev/null +++ b/.gitignore @@ -0,0 +1,139 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ 
+lib64/ +parts/ +sdist/ +var/ +wheels/ +pip-wheel-metadata/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +report.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +.python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# Other +dev-env/mongodb +dev-env/certs +*keycloakdb.lock.db +*keycloakdb.* +stac-catalog* +.DS_Store \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..a05616e --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,23 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +## [v2602.0.0] +### Added +- Initial release of the project. + + +# Template: +## [Unreleased] + +### Added +- New feature X. +- New feature Y. + +### Changed +- Improved performance in component A. +- Updated dependency B to version 2.0.0. + +### Fixed +- Fixed issue causing application crash on startup. +- Fixed bug preventing users from logging in. \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..945d0fb --- /dev/null +++ b/LICENSE @@ -0,0 +1,28 @@ +BSD 3-Clause License + +Copyright (c) 2023, Climate Informatics and Technologies (CLINT) + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +3. 
Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..859abfe --- /dev/null +++ b/README.md @@ -0,0 +1,278 @@ +# Xarray Prism Engine + +A multi-format and multi-storage xarray engine with automatic engine detection, +and the ability to register new data formats and URI types for climate data. + +> [!IMPORTANT] +> If you encounter a data format that the `prism` engine is not able to open, please +> file an issue report [here](https://github.com/freva-org/freva-xarray/issues/new). +> This helps us improve the engine, enabling users to work with different kinds of climate data. 
+ + +## Installation + +### Install via PyPI + +```bash +pip install xarray-prism +``` + +### Install via Conda + +```bash +conda install xarray-prism +``` + +## Quick Start + +### Using with xarray + +```python +import xarray as xr + +# Auto-detect format +ds = xr.open_dataset("my_data.unknown_fmt", engine="prism") + +# Remote Zarr on S3 +ds = xr.open_dataset( + "s3://freva/workshop/tas.zarr", + engine="prism", + storage_options={ + "anon": True, + "client_kwargs": { + "endpoint_url": "https://s3.eu-dkrz-1.dkrz.cloud" + } + } +) + +# Remote NetCDF3 on S3 +ds = xr.open_dataset( + "s3://freva/workshop/tas.nc", + engine="prism", + storage_options={ + "anon": True, + "client_kwargs": { + "endpoint_url": "https://s3.eu-dkrz-1.dkrz.cloud" + } + } +) + +# Remote NetCDF4 on S3 +ds = xr.open_dataset( + "s3://freva/workshop/tas.nc4", + engine="prism", + storage_options={ + "anon": True, + "client_kwargs": { + "endpoint_url": "https://s3.eu-dkrz-1.dkrz.cloud" + } + } +) + +# Remote Zarr on S3 - non-anon +ds = xr.open_dataset( + "s3://bucket/data.zarr", + engine="prism", + storage_options={ + "key": "YOUR_KEY", + "secret": "YOUR_SECRET", + "client_kwargs": { + "endpoint_url": "S3_ENDPOINT" + } + } +) + +# OPeNDAP from THREDDS +ds = xr.open_dataset( + "https://icdc.cen.uni-hamburg.de/thredds/dodsC/ftpthredds/ar5_sea_level_rise/gia_mean.nc", + engine="prism" +) + +# Local GRIB file +ds = xr.open_dataset("forecast.grib2", engine="prism") + +# GeoTIFF +ds = xr.open_dataset("satellite.tif", engine="prism") + +# Tip: you can manage the cache location yourself +xr.open_dataset( + "simplecache::s3://bucket/file.nc3", + engine="prism", + storage_options={ + "s3": {"anon": True, "client_kwargs": {"endpoint_url": "..."}}, + "simplecache": {"cache_storage": "/path/to/cache"} + } +) + +# Even for GeoTIFF files on S3 you can pass credentials through +# storage_options, which is not supported by rasterio: +xr.open_dataset( + "s3://bucket/file.tif", + engine="prism", + storage_options={ + 
"key": "YOUR_KEY", + "secret": "YOUR_SECRET", + "client_kwargs": { + "endpoint_url": "S3_ENDPOINT" + } + } +) +``` + +## Supported Formats + + +|Data format | Remote backend | Local FS | Cache| +|--------------|------------------------|-----------|-----------| +|GRIB | cfgrib + fsspec | cfgrib | fsspec simplecache (full-file)| +|Zarr | zarr + fsspec | zarr | chunked key/value store| +|NetCDF3 | scipy + fsspec | scipy | fsspec byte cache (5 MB blocks but full download)| +|NetCDF4/HDF5 | h5netcdf + fsspec | h5netcdf | fsspec byte cache (5 MB blocks)| +|GeoTIFF | rasterio + fsspec | rasterio | GDAL/rasterio block cache (5 MB blocks)| +|OPeNDAP/DODS | netCDF4 | n/a | n/a| + + +> [!WARNING] +> **Remote GRIB & NetCDF3 require full file download** +> +> Unlike Zarr or HDF5, these formats don't support partial/chunk reads over the network. +> +> By default, xarray-prism caches files in the system temp directory. +> This works well for most cases. +> If temp storage is a concern (e.g., limited space or cleared on reboot), +> you can specify a persistent cache: +> +> | Option | How | +> |--------|-----| +> | Environment variable | `export XARRAY_PRISM_CACHE=/path/to/cache` | +> | Per-call | `storage_options={"simplecache": {"cache_storage": "/path"}}` | +> | Default | System temp directory | + + +## Customization + +### Custom Format Detectors and URI Types + +You can extend **xarray-prism** with custom *format detectors*, *URI types*, and *open handlers* by providing a small plugin package. +Registration happens **at import time**, so importing the plugin activates it. 
+ +### Plugin structure + +```text +xarray_prism_myplugin/ + __init__.py # imports the plugin module (triggers registration) + plugin.py # detectors, URI types, and open handlers +pyproject.toml +``` + +### Plugin implementation + +`xarray_prism_myplugin/__init__.py` + +```python +from .plugin import * # noqa: F401,F403 +``` + +`xarray_prism_myplugin/plugin.py` + +```python +import xarray as xr +from xarray_prism import register_detector, register_uri_type, registry + + +@register_uri_type(priority=100) +def detect_myfs_uri(uri: str): + """Detect a custom filesystem URI.""" + if uri.lower().startswith("myfs://"): + return "myfs" + return None + + +@register_detector(priority=100) +def detect_foo_format(uri: str): + """Detect a custom file format.""" + if uri.lower().endswith(".foo"): + return "foo" + return None + + +@registry.register("foo", uri_type="myfs") +def open_foo_from_myfs(uri: str, **kwargs): + """Open .foo files from myfs:// URIs.""" + translated = uri.replace("myfs://", "https://my-gateway.example/") + return xr.open_dataset(translated, engine="h5netcdf", **kwargs) +``` + +### Plugin installation + +`pyproject.toml` + +```toml +[project] +name = "xarray-prism-myplugin" +version = "0.1.0" +dependencies = ["xarray-prism"] + +[project.entry-points."xarray_prism.plugins"] +myplugin = "xarray_prism_myplugin" +``` + +### Using the plugin + +After installing the plugin package, **import it once** to activate the registrations: + +```python +import xarray_prism_myplugin # activates detectors and handlers + +import xarray as xr +ds = xr.open_dataset("myfs://bucket/path/data.foo", engine="prism") +``` + + +## Development + +### Setup Development Environment + +```bash +# Start test services (MinIO, THREDDS) +docker-compose -f dev-env/docker-compose.yaml up -d --remove-orphans + +# Create conda environment +conda create -n xarray-prism python=3.12 -y +conda activate xarray-prism + +# Install package in editable mode with dev dependencies +pip install -e ".[dev]" 
+``` + +### Running Tests + +```bash +# Run tests +tox -e test + +# Run with coverage +tox -e test-cov + +# Lint +tox -e lint + +# Type checking +tox -e types + +# Auto-format code +tox -e format +``` + +### Creating a Release + +Releases are managed via GitHub Actions and tox: + +```bash +# Tag a new release (creates git tag) +tox -e release +``` + +The release workflow is triggered automatically when: +- A version tag (`v*.*.*`) is pushed -> Full release to PyPI +- Manual workflow dispatch with RC number -> Pre-release to PyPI diff --git a/data/geodata/TCD/2021/10m/districts/DE111/TCD_S2021_R10m_DE111.tif b/data/geodata/TCD/2021/10m/districts/DE111/TCD_S2021_R10m_DE111.tif new file mode 100644 index 0000000..80d79ae Binary files /dev/null and b/data/geodata/TCD/2021/10m/districts/DE111/TCD_S2021_R10m_DE111.tif differ diff --git a/data/grib_data/gfs/2025/11/25/test.grib2 b/data/grib_data/gfs/2025/11/25/test.grib2 new file mode 100644 index 0000000..9aa35f6 Binary files /dev/null and b/data/grib_data/gfs/2025/11/25/test.grib2 differ diff --git a/data/grib_data/gfs/2025/11/25/test.grib2.5b7b6.idx b/data/grib_data/gfs/2025/11/25/test.grib2.5b7b6.idx new file mode 100755 index 0000000..82e58d1 Binary files /dev/null and b/data/grib_data/gfs/2025/11/25/test.grib2.5b7b6.idx differ diff --git a/data/model/global/cmip6/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/amip/r1i1p1f1/Amon/ua/gn/v20191108/ua_Amon_ACCESS-CM2_amip_r1i1p1f1_gn_197001-201512.nc b/data/model/global/cmip6/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/amip/r1i1p1f1/Amon/ua/gn/v20191108/ua_Amon_ACCESS-CM2_amip_r1i1p1f1_gn_197001-201512.nc new file mode 100644 index 0000000..fa3f87e Binary files /dev/null and b/data/model/global/cmip6/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/amip/r1i1p1f1/Amon/ua/gn/v20191108/ua_Amon_ACCESS-CM2_amip_r1i1p1f1_gn_197001-201512.nc differ diff --git a/data/model/global/cmip6/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/amip/r1i1p1f1/Amon/ua/gn/v20201108/ua_Amon_ACCESS-CM2_amip_r1i1p1f1_gn_197901-201412.nc 
b/data/model/global/cmip6/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/amip/r1i1p1f1/Amon/ua/gn/v20201108/ua_Amon_ACCESS-CM2_amip_r1i1p1f1_gn_197901-201412.nc new file mode 100644 index 0000000..fa3f87e Binary files /dev/null and b/data/model/global/cmip6/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/amip/r1i1p1f1/Amon/ua/gn/v20201108/ua_Amon_ACCESS-CM2_amip_r1i1p1f1_gn_197901-201412.nc differ diff --git a/data/model/global/cmip6/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/amip/r2i1p1f1/Amon/ua/gn/v20190815/ua_mon_MPI-ESM1-2-LR_amip_r2i1p1f1_gn_197901-199812.nc b/data/model/global/cmip6/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/amip/r2i1p1f1/Amon/ua/gn/v20190815/ua_mon_MPI-ESM1-2-LR_amip_r2i1p1f1_gn_197901-199812.nc new file mode 100644 index 0000000..30362f5 Binary files /dev/null and b/data/model/global/cmip6/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/amip/r2i1p1f1/Amon/ua/gn/v20190815/ua_mon_MPI-ESM1-2-LR_amip_r2i1p1f1_gn_197901-199812.nc differ diff --git a/data/model/global/cmip6/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/amip/r2i1p1f1/Amon/ua/gn/v20190815/ua_mon_MPI-ESM1-2-LR_amip_r2i1p1f1_gn_197901-199812_another.nc b/data/model/global/cmip6/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/amip/r2i1p1f1/Amon/ua/gn/v20190815/ua_mon_MPI-ESM1-2-LR_amip_r2i1p1f1_gn_197901-199812_another.nc new file mode 100644 index 0000000..30362f5 Binary files /dev/null and b/data/model/global/cmip6/CMIP6/CMIP/MPI-M/MPI-ESM1-2-LR/amip/r2i1p1f1/Amon/ua/gn/v20190815/ua_mon_MPI-ESM1-2-LR_amip_r2i1p1f1_gn_197901-199812_another.nc differ diff --git a/data/model/obs/reanalysis/reanalysis/NOAA/NODC/OC5/mon/ocean/Omon/r1i1p1/v20200101/hc700/hc700_mon_NODC_OC5_r1i1p1_201201-201212.nc b/data/model/obs/reanalysis/reanalysis/NOAA/NODC/OC5/mon/ocean/Omon/r1i1p1/v20200101/hc700/hc700_mon_NODC_OC5_r1i1p1_201201-201212.nc new file mode 100644 index 0000000..ab03924 Binary files /dev/null and b/data/model/obs/reanalysis/reanalysis/NOAA/NODC/OC5/mon/ocean/Omon/r1i1p1/v20200101/hc700/hc700_mon_NODC_OC5_r1i1p1_201201-201212.nc differ diff --git 
a/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r0i0p0/CLMcom-CCLM4-8-17/v1/fx/orog/v20140515/orog_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_fx.nc b/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r0i0p0/CLMcom-CCLM4-8-17/v1/fx/orog/v20140515/orog_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_fx.nc new file mode 100644 index 0000000..d5bf4dd Binary files /dev/null and b/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r0i0p0/CLMcom-CCLM4-8-17/v1/fx/orog/v20140515/orog_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_fx.nc differ diff --git a/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r0i0p0/CLMcom-CCLM4-8-17/v1/fx/orog/v20140515/orog_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_fx_another.nc b/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r0i0p0/CLMcom-CCLM4-8-17/v1/fx/orog/v20140515/orog_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_fx_another.nc new file mode 100644 index 0000000..d5bf4dd Binary files /dev/null and b/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r0i0p0/CLMcom-CCLM4-8-17/v1/fx/orog/v20140515/orog_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_fx_another.nc differ diff --git a/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r1i1p1/CLMcom-CCLM4-8-17/v1/daypt/tas/v20140515/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_daypt_194912011200-194912101200.nc b/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r1i1p1/CLMcom-CCLM4-8-17/v1/daypt/tas/v20140515/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_daypt_194912011200-194912101200.nc new file mode 100644 index 0000000..d5bf4dd Binary files /dev/null and 
b/data/model/regional/cordex/output/EUR-11/CLMcom/MPI-M-MPI-ESM-LR/historical/r1i1p1/CLMcom-CCLM4-8-17/v1/daypt/tas/v20140515/tas_EUR-11_MPI-M-MPI-ESM-LR_historical_r1i1p1_CLMcom-CCLM4-8-17_v1_daypt_194912011200-194912101200.nc differ diff --git a/data/model/regional/cordex/output/EUR-11/GERICS/NCC-NorESM1-M/rcp85/r1i1p1/GERICS-REMO2015/v1/3hr/pr/v20181212/pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc b/data/model/regional/cordex/output/EUR-11/GERICS/NCC-NorESM1-M/rcp85/r1i1p1/GERICS-REMO2015/v1/3hr/pr/v20181212/pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc new file mode 100644 index 0000000..0290daf Binary files /dev/null and b/data/model/regional/cordex/output/EUR-11/GERICS/NCC-NorESM1-M/rcp85/r1i1p1/GERICS-REMO2015/v1/3hr/pr/v20181212/pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc differ diff --git a/dev-env/docker-compose.yaml b/dev-env/docker-compose.yaml new file mode 100644 index 0000000..3ddf47f --- /dev/null +++ b/dev-env/docker-compose.yaml @@ -0,0 +1,50 @@ +--- +services: + + minio: + networks: + - xarray-prism + image: quay.io/minio/minio + hostname: minio + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + command: server /data --console-address :9001 + ports: + - "9000:9000" + - "9001:9001" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/ready"] + interval: 2s + timeout: 1s + retries: 30 + + setup-minio: + networks: + - xarray-prism + image: quay.io/minio/mc + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + hostname: minio-setup + volumes: + - ../data:/seed:ro + - ./setup-minio.sh:/usr/local/bin/setup-minio:ro + entrypoint: ["sh", "/usr/local/bin/setup-minio"] + depends_on: + minio: + condition: service_healthy + thredds: + image: unidata/thredds-docker:5.8 + container_name: thredds + restart: unless-stopped + ports: + - "8088:8080" + 
environment: + - TZ=Europe/Berlin + volumes: + - ./thredds/content/catalog.xml:/usr/local/tomcat/content/thredds/catalog.xml + - ../data:/data:ro +networks: + xarray-prism: + driver: bridge diff --git a/dev-env/setup-minio.sh b/dev-env/setup-minio.sh new file mode 100644 index 0000000..5f85244 --- /dev/null +++ b/dev-env/setup-minio.sh @@ -0,0 +1,13 @@ +#!/bin/sh +set -eu + +mc alias set local http://minio:9000 minioadmin minioadmin +mc mb -p local/testdata || true + +mc cp /seed/grib_data/gfs/2025/11/25/test.grib2 local/testdata/ +mc cp /seed/model/regional/cordex/output/EUR-11/GERICS/NCC-NorESM1-M/rcp85/r1i1p1/GERICS-REMO2015/v1/3hr/pr/v20181212/pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc local/testdata/ +mc cp /seed/geodata/TCD/2021/10m/districts/DE111/TCD_S2021_R10m_DE111.tif local/testdata/ + +mc anonymous set public local/testdata + +mc ls local/testdata diff --git a/dev-env/thredds/content/catalog.xml b/dev-env/thredds/content/catalog.xml new file mode 100644 index 0000000..fa87e92 --- /dev/null +++ b/dev-env/thredds/content/catalog.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + all + + + + + + + + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..71e4865 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,215 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "xarray-prism" +dynamic = ["version"] +description = "A multi-format and multi-storage xarray engine with automatic engine detection, and ability to register new data format and uri type for climate data." 
+readme = "README.md" +authors = [{name = "DKRZ, Clint", email = "freva@dkrz.de"}] +license = {file = "LICENSE"} +requires-python = ">=3.9" +keywords = ["xarray", "climate", "netcdf", "zarr", "grib", "geotiff"] +classifiers = [ + "Development Status :: 4 - Beta", + "Environment :: Console", + "Intended Audience :: Developers", + "Intended Audience :: Science/Research", + "License :: OSI Approved :: BSD License", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3", +] + +dependencies = [ + "xarray", + "fsspec", + "h5py", + "h5netcdf", + "scipy", + "zarr", + "cfgrib", + "eccodes", + "rioxarray", + "rasterio", + "netCDF4", + "s3fs", + "gcsfs", + "adlfs", +] + +[project.optional-dependencies] + +dev = [ + "pytest", + "pytest-cov", + "mypy", + "black", + "isort", + "flake8", + "codespell", + "tox", + "ipython" +] + +[project.urls] +Issues = "https://github.com/freva-org/xarray-prism/issues" +Source = "https://github.com/freva-org/xarray-prism/" + +# IMPORTANT: Register as xarray backend, auto-discovered by xarray +[project.entry-points."xarray.backends"] +prism = "xarray_prism.entrypoint:PrismBackendEntrypoint" +[tool.codespell] +ignore-words-list = "iterm" +[tool.setuptools.dynamic] +version = {attr = "xarray_prism._version.__version__"} +[tool.setuptools.packages.find] +where = ["src"] +include = ["xarray_prism*"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +addopts = "-v" +env = [ + "MINIO_ENDPOINT=http://localhost:9000", + "MINIO_ACCESS_KEY=minioadmin", + "MINIO_SECRET_KEY=minioadmin", + "THREDDS_ENDPOINT=http://localhost:8088", +] +filterwarnings = [ + "ignore::UserWarning", + "ignore::DeprecationWarning", +] + +[tool.mypy] +python_version = "3.10" +warn_return_any = true +warn_unused_configs = true +ignore_missing_imports = true +files = "src/xarray_prism" +strict = false +follow_imports = "silent" + +[tool.black] +line-length = 88 +target-version = ["py39", "py310", "py311", "py312", "py313", 
"py314"] + +[tool.isort] +profile = "black" +line_length = 88 +known_first_party = ["xarray_prism"] + +[tool.flake8] +ignore = ["F405", "F403", "E704", "W503", "C901"] +max-line-length = 88 + + +[tool.tox] +legacy_tox_ini = """ +[tox] +min_version = 4.0 +env_list = test, lint, types +passenv = * + +[testenv] +passenv = * +parallel_show_output = false + +[testenv:test] +description = "Run the tests" +setenv = + MINIO_ENDPOINT = http://localhost:9000 + MINIO_ACCESS_KEY = minioadmin + MINIO_SECRET_KEY = minioadmin + THREDDS_ENDPOINT = http://localhost:8088 +deps = + pytest + pytest-cov + pytest-env +commands_pre = + python -m pip install --no-deps -e . +commands = + python3 -m pytest -vv tests/ {posargs} + +[testenv:test-cov] +description = "Run tests with coverage" +setenv = + MINIO_ENDPOINT = http://localhost:9000 + MINIO_ACCESS_KEY = minioadmin + MINIO_SECRET_KEY = minioadmin + THREDDS_ENDPOINT = http://localhost:8088 +deps = + -e .[dev] + pytest-env +commands = + python3 -m pytest -vv --cov=xarray_prism --cov-report=html:coverage_report --cov-report=xml --junitxml=report.xml tests/ {posargs} + python3 -m coverage report --fail-under=80 --precision=2 + +[testenv:lint] +description = "Check code quality." +skip_install = true +deps = + black + isort + flake8 + codespell +commands = + python3 -m isort --check --profile black -t py39 -l 88 src/xarray_prism + python3 -m black --check src/xarray_prism tests + python3 -m flake8 src/xarray_prism --count --max-complexity=10 --ignore=F405,F403,E704,W503,C901 --max-line-length=88 --statistics --show-source + codespell --quiet-level=2 src/xarray_prism + +[testenv:types] +description = "Static type checking." +skip_install = true +deps = + mypy + types-setuptools +commands = + python3 -m mypy src/xarray_prism --ignore-missing-imports + +[testenv:format] +description = "Auto-format code." 
+deps = + black + isort +commands = + python3 -m isort --profile black -t py39 -l 88 src/xarray_prism tests + python3 -m black src/xarray_prism tests + +[testenv:release] +description = "Tag a new release." +deps = + git-python + packaging + requests + tomli +commands = + python3 bump.py tag xarray_prism -p . +allowlist_externals = + rm + curl +commands_pre = + curl -H 'Cache-Control: no-cache' -Ls -o bump.py https://raw.githubusercontent.com/freva-org/freva-admin/main/release.py +commands_post = + rm bump.py + +[testenv:bump] +description = "Deploy version bump." +deps = + git-python + packaging + requests + tomli +commands = + python3 bump.py deploy xarray_prism -p . +allowlist_externals = + rm + curl +commands_pre = + curl -H 'Cache-Control: no-cache' -Ls -o bump.py https://raw.githubusercontent.com/freva-org/freva-admin/main/release.py +commands_post = + rm bump.py + +""" \ No newline at end of file diff --git a/src/xarray_prism/__init__.py b/src/xarray_prism/__init__.py new file mode 100644 index 0000000..fcf6d8f --- /dev/null +++ b/src/xarray_prism/__init__.py @@ -0,0 +1,40 @@ +# ---------------------------------------------------------------# +# Data format | Remote backend | Local FS | Cache +# ---------------------------------------------------------------# +# GRIB | cfgrib + fsspec | cfgrib | fsspec simplecache (full-file) +# Zarr | zarr + fsspec | zarr | chunked key/value store +# NetCDF3 | scipy + fsspec | scipy | fsspec byte cache (full-file) +# NetCDF4/HDF5 | h5netcdf + fsspec | h5netcdf | fsspec byte cache (5 MB blocks) +# GeoTIFF | rasterio + fsspec | rasterio | GDAL/rasterio block cache +# OPeNDAP/DODS | netCDF4 | n/a | n/a +# ---------------------------------------------------------------# + +# Important: GRIB and NetCDF3 files are not chunk-addressable. +# cfgrib and scipy typically must read the entire file (and build +# its index) even when only a small subset is requested. 
def register_detector(priority: int = 50):
    """
    Build a decorator that adds a function to the custom engine detectors.

    The decorated function is stored together with ``priority`` in the
    module-level detector list, which is kept ordered so that detectors
    with a larger priority are tried first. Registering a detector also
    empties the engine-detection cache so that earlier results cannot
    shadow the new detector.

    Parameters
    ----------
    priority:
        Ordering weight of the detector; larger values run earlier.

    Returns
    -------
    A decorator that registers its argument and returns it unchanged.
    """

    def _add(detector: Detector) -> Detector:
        entry = (priority, detector)
        _custom_detectors.append(entry)
        # Negated key => descending priority; ties keep registration order.
        _custom_detectors.sort(key=lambda item: -item[0])
        # Results memoised before this registration may now be stale.
        _detect_engine_cached.cache_clear()
        return detector

    return _add
def detect_uri_type(uri: str) -> str:
    """
    Classify ``uri`` as ``"reference"``, ``"posix"``, ``"cloud"`` or a custom type.

    Custom URI-type detectors are consulted first; the first non-``None``
    answer wins. Otherwise the scheme decides: ``reference://`` maps to
    ``"reference"``, ``file://`` or no scheme at all maps to ``"posix"``,
    and any other ``scheme://`` maps to ``"cloud"``.
    """
    override = _run_custom_uri_type_detectors(uri)
    if override is not None:
        return override

    # Scheme prefixes are matched case-insensitively.
    scheme_view = uri.lower()
    if scheme_view.startswith("reference://"):
        return "reference"

    is_local = scheme_view.startswith("file://") or "://" not in uri
    return "posix" if is_local else "cloud"
def _read_magic_bytes(fs: Any, path: str) -> Any:
    """
    Read the first 512 bytes of ``path`` through the filesystem ``fs``.

    Parameters
    ----------
    fs:
        Object exposing ``cat_file(path, start=..., end=...)``.
    path:
        Path understood by ``fs``.

    Returns
    -------
    The bytes returned by ``fs.cat_file`` on success; the sentinel
    ``b"__OPENDAP__"`` when the request fails with a response error whose
    ``Content-Description`` header mentions ``dods``; ``None`` for any
    other failure.
    """
    # Lazy import: aiohttp only needed during actual I/O
    try:
        from aiohttp import ClientResponseError
    except ImportError:
        ClientResponseError = Exception  # type: ignore

    try:
        return fs.cat_file(path, start=0, end=512)
    except ClientResponseError as e:
        # ``headers`` may be missing or explicitly ``None`` on the exception;
        # only query it when it actually behaves like a mapping, otherwise
        # the handler itself would raise AttributeError.
        headers = getattr(e, "headers", None)
        description = (
            headers.get("Content-Description", "") if hasattr(headers, "get") else ""
        )
        if "dods" in str(description or "").lower():
            return b"__OPENDAP__"
        return None
    except (FileNotFoundError, IsADirectoryError, OSError):
        return None
    except Exception:
        # Detection is best-effort: an unreadable source is simply "unknown".
        return None
class BackendRegistry:
    """Lookup table of custom open functions keyed by ``(engine, uri_type)``."""

    def __init__(self):
        # Maps (engine, uri_type) -> open function.
        self._handlers: "Dict[tuple, OpenFunc]" = {}

    def register(
        self,
        engine: str,
        uri_type: str = "both",
    ) -> "Callable[[OpenFunc], OpenFunc]":
        """
        Return a decorator that stores a function as the handler for ``engine``.

        With ``uri_type="both"`` the function is stored for the ``"posix"``
        and the ``"cloud"`` URI types; any other value is used as the single
        URI type to register under. The decorated function is returned
        unchanged.
        """
        targets = ("posix", "cloud") if uri_type == "both" else (uri_type,)

        def _bind(handler: "OpenFunc") -> "OpenFunc":
            for target in targets:
                self._handlers[(engine, target)] = handler
            return handler

        return _bind

    def get(self, engine: str, uri_type: str) -> "Optional[OpenFunc]":
        """Return the handler stored for the pair, or ``None`` if there is none."""
        key = (engine, uri_type)
        return self._handlers.get(key)

    def has(self, engine: str, uri_type: str) -> bool:
        """Tell whether a handler is stored for the pair."""
        key = (engine, uri_type)
        return key in self._handlers
for xarray-prism.""" + +__version__ = "2602.0.0" diff --git a/src/xarray_prism/backends/__init__.py b/src/xarray_prism/backends/__init__.py new file mode 100644 index 0000000..11335e8 --- /dev/null +++ b/src/xarray_prism/backends/__init__.py @@ -0,0 +1,4 @@ +from .cloud import open_cloud +from .posix import open_posix + +__all__ = ["open_posix", "open_cloud"] diff --git a/src/xarray_prism/backends/cloud.py b/src/xarray_prism/backends/cloud.py new file mode 100644 index 0000000..86db2b7 --- /dev/null +++ b/src/xarray_prism/backends/cloud.py @@ -0,0 +1,194 @@ +"""Cloud backend for xarray datasets.""" + +from __future__ import annotations + +import logging +import os +import sys +import tempfile +from hashlib import md5 +from pathlib import Path +from typing import Any, Dict, Optional +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + + +def _get_cache_dir(storage_options: Optional[Dict] = None) -> Path: + """Get cache directory.""" + env_cache = os.environ.get("XARRAY_PRISM_CACHE") + # 1. Environment variable + if env_cache: + cache_root = Path(env_cache) + cache_root.mkdir(parents=True, exist_ok=True) + return cache_root + # 2. User-defined storage option + if storage_options: + user_cache = storage_options.get("simplecache", {}).get("cache_storage") + if user_cache: + cache_root = Path(user_cache) + cache_root.mkdir(parents=True, exist_ok=True) + return cache_root + # 3. 
def _cache_remote_file(
    uri: str,
    engine: str,
    storage_options: Optional[Dict] = None,
    show_progress: bool = True,
    lines_above: int = 0,
) -> str:
    """
    Download ``uri`` into the local cache directory and return the local path.

    The cache file name is the MD5 hex digest of ``uri`` followed by ``_``
    and the file name taken from the URI path. If that file already exists
    it is returned without any network access.

    The download is written to a temporary file next to the target and moved
    into place with ``os.replace`` only after it has completed, so an
    interrupted transfer can never be picked up later as a valid cache entry.

    Parameters
    ----------
    uri:
        Remote location of the file.
    engine:
        Detected engine; only used to word the warning message
        (``"cfgrib"`` -> GRIB, anything else -> NetCDF3).
    storage_options:
        Passed to ``fsspec.core.url_to_fs`` and to the cache-directory lookup.
    show_progress:
        When true, log a warning and render a progress bar while downloading.
    lines_above:
        Number of terminal lines printed by the caller that should be erased.

    Returns
    -------
    The path of the cached file as a string.
    """
    cache_root = _get_cache_dir(storage_options)
    parsed = urlparse(uri)
    filename = Path(parsed.path).name
    cache_name = md5(uri.encode()).hexdigest() + "_" + filename
    local_path = cache_root / cache_name

    if local_path.exists():
        if show_progress and lines_above > 0:
            # Erase the caller's status lines: cursor up, then clear the line.
            for _ in range(lines_above):
                sys.stdout.write("\033[A")
                sys.stdout.write("\033[K")
            sys.stdout.flush()
        return str(local_path)

    # Lazy import: fsspec is only needed when something must be downloaded.
    import fsspec

    extra_lines = 0
    if show_progress:
        fmt = "GRIB" if engine == "cfgrib" else "NetCDF3"
        logger.warning("Remote %s requires full file download", fmt)
        extra_lines = 2

    fs, path = fsspec.core.url_to_fs(uri, **(storage_options or {}))

    # Unique temporary target inside the cache directory, so the final
    # os.replace() stays on one filesystem and is atomic.
    fd, tmp_name = tempfile.mkstemp(
        dir=str(cache_root), prefix=cache_name + ".", suffix=".part"
    )
    os.close(fd)

    try:
        if show_progress:
            from ..utils import ProgressBar

            size = 0
            try:
                size = fs.size(path) or 0
            except Exception:
                # Size is cosmetic; the bar falls back to a spinner when 0.
                pass

            display_name = Path(parsed.path).name
            if len(display_name) > 35:
                display_name = display_name[:32] + "..."
            desc = f" Downloading {display_name}"

            with ProgressBar(
                desc=desc, lines_above=lines_above + extra_lines
            ) as progress:
                progress.set_size(size)
                with fs.open(path, "rb") as src, open(tmp_name, "wb") as dst:
                    while True:
                        chunk = src.read(512 * 1024)
                        if not chunk:
                            break
                        dst.write(chunk)
                        progress.update(len(chunk))
        else:
            fs.get(path, tmp_name)

        os.replace(tmp_name, local_path)
    except BaseException:
        # Never leave a partial download behind, not even on Ctrl-C.
        try:
            os.unlink(tmp_name)
        except OSError:
            pass
        raise

    return str(local_path)
def open_posix(
    uri: str,
    engine: str,
    drop_variables: Optional[Any] = None,
    backend_kwargs: Optional[Dict[str, Any]] = None,
    **kwargs,
) -> Any:
    """
    Open a local file with ``xarray.open_dataset`` using the given engine.

    For the ``rasterio`` engine the extra keyword arguments are first passed
    through ``sanitize_rasterio_kwargs``. For the ``h5netcdf`` engine the
    opened dataset is passed through ``sanitize_dataset_attrs`` before it is
    returned. An empty ``backend_kwargs`` is forwarded as ``None``.
    """
    import xarray as xr

    open_kwargs = kwargs
    if engine == "rasterio":
        from ..utils import sanitize_rasterio_kwargs

        open_kwargs = sanitize_rasterio_kwargs(kwargs)

    dataset = xr.open_dataset(
        uri,
        engine=engine,
        drop_variables=drop_variables,
        backend_kwargs=backend_kwargs if backend_kwargs else None,
        **open_kwargs,
    )

    if engine != "h5netcdf":
        return dataset

    from ..utils import sanitize_dataset_attrs

    return sanitize_dataset_attrs(dataset)
class PrismBackendEntrypoint(BackendEntrypoint):
    """
    Multi-format backend with automatic engine detection for climate data.

    Handling:
    - a handler registered in the custom registry for the detected
      ``(engine, uri_type)`` pair takes precedence
    - ``posix`` URIs are opened by ``backends.posix.open_posix``
    - ``cloud`` and ``reference`` URIs are opened by ``backends.cloud.open_cloud``
    """

    description = (
        "Prism multi-format/multi-storage engine with auto-detection "
        "and entrypoint registry for new formats and URI types."
    )

    url = "https://github.com/freva-org/xarray-prism"
    open_dataset_parameters = ("filename_or_obj", "drop_variables")

    # Engines this backend routes to without emitting a warning.
    ENGINE_MAP: Dict[str, str] = {
        "zarr": "zarr",
        "cfgrib": "cfgrib",
        "h5netcdf": "h5netcdf",
        "scipy": "scipy",
        "rasterio": "rasterio",
        "netcdf4": "netcdf4",
    }

    def open_dataset(
        self,
        filename_or_obj: Any,
        *,
        drop_variables: Optional[Any] = None,
        **kwargs: Any,
    ) -> Any:
        """
        Open a dataset after detecting its engine and URI type.

        Parameters
        ----------
        filename_or_obj:
            A string or ``os.PathLike``; anything else is rejected.
        drop_variables:
            Forwarded unchanged to the selected handler.
        **kwargs:
            ``xarray_engine`` (explicit engine override) and
            ``backend_kwargs`` are consumed here; everything else is
            forwarded to the selected handler.

        Raises
        ------
        ValueError
            If ``filename_or_obj`` is not a path or URL, if the format cannot
            be detected, or if the URI type is custom and no handler is
            registered for it.
        """
        if not isinstance(filename_or_obj, (str, os.PathLike)):
            raise ValueError(
                f"Prism backend requires a file path or URL, "
                f"got {type(filename_or_obj).__name__}"
            )

        uri = str(filename_or_obj)

        is_remote = "://" in uri and not uri.startswith("file://")
        lines_printed = 0

        if is_remote:
            logger.info("Detecting format...")
            sys.stdout.flush()

        engine, uri_type = self._detect(uri, **kwargs)

        if is_remote and engine:
            logger.info("Detected: %s", engine)
            lines_printed = 1
            # Blank out the current terminal line and return to its start.
            sys.stdout.write("\r" + " " * 25 + "\r")
            sys.stdout.flush()

        if engine is None:
            if is_remote:
                from urllib.parse import urlencode, urlparse

                filename = Path(urlparse(uri).path).name
                if len(filename) > 50:
                    filename = filename[:47] + "..."

                # Pre-filled "new issue" form so users can report the URI.
                issue_params = urlencode(
                    {
                        "title": f"[Detection] Cannot detect format: {filename}",
                        "body": (
                            f"**File/URL:**\n```\n{uri}\n```\n\n"
                            f"**Expected format:** (e.g., NetCDF4, GRIB, Zarr)\n\n"
                        ),
                        "labels": "bug",
                    }
                )

                issue_url = f"{self.url}/issues/new?{issue_params}"
                link = _make_link(issue_url, "🔗 Click here to report")

                raise ValueError(
                    f"Xarray Prism: cannot detect format for {uri!r}\n\n"
                    f" 💡 Help us improve! This takes 10 seconds:\n"
                    f" {link}\n\n"
                    f" Or specify manually if you know the engine already:\n"
                    f" xarray.open_dataset(uri, engine='ENGINE_NAME')\n"
                )
            else:
                raise ValueError(
                    f"Xarray Prism: cannot detect format for {uri!r}\n"
                    f" Specify manually: "
                    f" xarray.open_dataset(uri, engine='ENGINE_NAME')"
                )

        # Pop prism-specific kwargs
        kwargs.pop("xarray_engine", None)
        backend_kwargs = kwargs.pop("backend_kwargs", None) or {}

        # Check custom registry first (handles custom uri_types too)
        custom_handler = registry.get(engine, uri_type)
        if custom_handler:
            return custom_handler(
                uri,
                drop_variables=drop_variables,
                backend_kwargs=backend_kwargs,
                **kwargs,
            )
        # Route to POSIX or cloud handler (built-in uri_types only)
        # Lazy imports: backends only loaded when actually opening a dataset
        if uri_type == "posix":
            from .backends.posix import open_posix

            return open_posix(
                uri,
                engine=engine,
                drop_variables=drop_variables,
                backend_kwargs=backend_kwargs,
                **kwargs,
            )
        elif uri_type in ("cloud", "reference"):
            from .backends.cloud import open_cloud

            return open_cloud(
                uri,
                engine=engine,
                drop_variables=drop_variables,
                backend_kwargs=backend_kwargs,
                lines_above=lines_printed,
                **kwargs,
            )
        else:
            # Custom uri_type without registered handler
            raise ValueError(
                f"No handler registered for uri_type={uri_type!r} "
                f"with engine={engine!r}. "
                f"Use @registry.register('{engine}', uri_type='{uri_type}')"
                f" to add one."
            )

    def _detect(self, uri: str, **kwargs: Any) -> Tuple[Optional[str], str]:
        """
        Detect engine and URI type.

        Order of precedence: an ``xarray_engine`` keyword naming an entry of
        ``ENGINE_MAP``; ``reference://`` URIs (zarr); HTTP(S) URLs that look
        like OPeNDAP (netcdf4); finally ``detect_engine``, which receives
        ``storage_options`` when given.

        Returns
        -------
        ``(engine, uri_type)`` where ``engine`` is ``None`` when detection
        reported ``"unknown"``.
        """
        uri_type = detect_uri_type(uri)
        # Get storage_options for detection
        storage_options = kwargs.get("storage_options")

        # Allow explicit override
        forced = kwargs.get("xarray_engine")
        if forced and forced in self.ENGINE_MAP:
            return self.ENGINE_MAP[forced], uri_type

        # Reference URIs -> zarr
        if is_reference_uri(uri):
            return "zarr", uri_type

        # OPeNDAP detection (before magic bytes)
        if is_http_url(uri) and looks_like_opendap_url(uri):
            return "netcdf4", uri_type

        # Magic byte detection (with storage_options)
        detected = detect_engine(uri, storage_options=storage_options)
        if detected == "unknown":
            return None, uri_type

        engine = self.ENGINE_MAP.get(detected, detected)

        if detected not in self.ENGINE_MAP:
            # The engine name is passed through as-is; warn because it did
            # not come from the built-in table.
            import warnings

            warnings.warn(
                f"Detected engine '{detected}' is not a built-in engine. "
                f"Ensure it's registered with xarray or use registry.register() "
                f"to add a custom handler.",
                UserWarning,
                stacklevel=3,
            )

        return engine, uri_type

    def guess_can_open(self, filename_or_obj: Any) -> bool:
        """
        Xarray Generic Function: cheap check without I/O.

        Returns ``True`` for strings/paths that look like Zarr stores,
        ``reference://`` URIs, files with a GRIB, GeoTIFF or NetCDF
        extension, or OPeNDAP-style HTTP(S) URLs; ``False`` otherwise.
        """
        if not isinstance(filename_or_obj, (str, os.PathLike)):
            return False

        u = str(filename_or_obj).lower()

        # Zarr
        if u.endswith(".zarr") or ".zarr/" in u:
            return True
        if u.startswith("reference://"):
            return True
        # common climate data formats by extension
        if u.endswith(
            (".grib", ".grib2", ".grb", ".grb2", ".tif", ".tiff", ".nc", ".nc4")
        ):
            return True
        # OPeNDAP
        if is_http_url(u) and looks_like_opendap_url(u):
            return True

        return False
+from typing import Any, Dict, Iterator, Optional + +logger = logging.getLogger(__name__) + +STORAGE_OPTIONS_TO_GDAL: Dict[str, str] = { + "key": "AWS_ACCESS_KEY_ID", + "secret": "AWS_SECRET_ACCESS_KEY", + "token": "AWS_SESSION_TOKEN", + "aws_access_key_id": "AWS_ACCESS_KEY_ID", + "aws_secret_access_key": "AWS_SECRET_ACCESS_KEY", + "aws_session_token": "AWS_SESSION_TOKEN", + "region": "AWS_DEFAULT_REGION", + "region_name": "AWS_DEFAULT_REGION", + "endpoint_url": "AWS_S3_ENDPOINT", + "client_kwargs.endpoint_url": "AWS_S3_ENDPOINT", + "anon": "AWS_NO_SIGN_REQUEST", + "profile": "AWS_PROFILE", +} + + +class ProgressBar: + """Progress bar to display cache download progress.""" + + def __init__( + self, desc: str = "Downloading", width: int = 40, lines_above: int = 0 + ): + self.desc = desc + self.width = width + self._total = 0 + self._current = 0 + self._spinner = 0 + self._spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏" + self._last_line_len = 0 + self._lines_above = lines_above + + def set_size(self, size: int) -> None: + self._total = size if size else 0 + + def update(self, inc: int) -> None: + self._current += inc + self._render() + + def _render(self) -> None: + mb = self._current / 1024**2 + + if self._total == 0: + spinner = self._spinner_chars[self._spinner % len(self._spinner_chars)] + self._spinner += 1 + line = f"{self.desc} {spinner} {mb:.1f} MB" + else: + pct = min(self._current / self._total, 1.0) + filled = int(self.width * pct) + bar = "█" * filled + "░" * (self.width - filled) + total_mb = self._total / 1024**2 + line = f"{self.desc} |{bar}| {pct * 100:.0f}% ({mb:.1f}/{total_mb:.1f} MB)" + + clear = " " * self._last_line_len + sys.stdout.write(f"\r{clear}\r{line}") + sys.stdout.flush() + self._last_line_len = len(line) + + def __enter__(self): + return self + + def __exit__(self, *args): + # Clear the progress line + sys.stdout.write("\r" + " " * self._last_line_len + "\r") + + # Clear detection + warning messages + for _ in range(self._lines_above): + 
@contextmanager
def gdal_env(storage_options: Optional[Dict[str, Any]] = None) -> Iterator[None]:
    """
    Temporarily export fsspec-style ``storage_options`` as GDAL environment
    variables, for rasterio S3 access, since rasterio does not accept
    storage_options directly.

    Nested option dicts are flattened with dot notation and translated via
    ``STORAGE_OPTIONS_TO_GDAL``. When an endpoint URL is supplied,
    ``AWS_HTTPS`` (from the URL scheme) and ``AWS_VIRTUAL_HOSTING=FALSE`` are
    exported as well. On exit every touched variable is put back to the
    value it had before entering the context, or removed if it was unset.

    With empty or missing ``storage_options`` the environment is left alone.
    """
    if not storage_options:
        yield
        return

    flat_opts = _flatten_dict(storage_options)

    # Variable name -> value before this context touched it (None = unset).
    saved_env: Dict[str, Optional[str]] = {}

    def _export(name: str, value: str) -> None:
        # Remember the pre-existing value only on the FIRST write. Several
        # options can map to the same variable (e.g. "key" and
        # "aws_access_key_id"); re-saving on a later write would record the
        # value set here and leave it in the environment after exit.
        if name not in saved_env:
            saved_env[name] = os.environ.get(name)
        os.environ[name] = value

    endpoint_url = flat_opts.get("client_kwargs.endpoint_url") or flat_opts.get(
        "endpoint_url"
    )

    try:
        for opt_key, gdal_key in STORAGE_OPTIONS_TO_GDAL.items():
            if opt_key not in flat_opts:
                continue
            value = _convert_value(gdal_key, flat_opts[opt_key])
            if value is not None:
                _export(gdal_key, value)

        if endpoint_url:
            if "AWS_HTTPS" not in saved_env:
                _export(
                    "AWS_HTTPS",
                    "YES" if str(endpoint_url).startswith("https://") else "NO",
                )
            if "AWS_VIRTUAL_HOSTING" not in saved_env:
                _export("AWS_VIRTUAL_HOSTING", "FALSE")

        yield

    finally:
        for name, previous in saved_env.items():
            if previous is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = previous
def _clean_surrogates_str(s: str) -> str:
    """Round-trip ``s`` through UTF-8, substituting characters that cannot be encoded."""
    # errors="replace" swaps each unencodable character (such as a lone
    # surrogate) for "?" instead of raising UnicodeEncodeError.
    encoded = s.encode("utf-8", errors="replace")
    return encoded.decode("utf-8")
@pytest.fixture
def s3_env(s3_storage_options: dict) -> Generator[dict, None, None]:
    """
    Export the S3 test credentials and endpoint as environment variables.

    Sets the standard AWS variables and the GDAL-specific ones derived from
    the ``client_kwargs.endpoint_url`` entry of ``s3_storage_options``, so
    code that reads credentials from the environment can reach the test
    bucket. Yields the storage options unchanged.

    On teardown every variable is restored to the value it had before the
    fixture ran (or removed if it was not set), so a developer's own AWS
    configuration is not wiped by running the tests.
    """
    endpoint_url = s3_storage_options["client_kwargs"]["endpoint_url"]
    # Extract host:port from endpoint URL for GDAL
    endpoint_host = endpoint_url.replace("http://", "").replace("https://", "")

    env_vars = {
        # Standard AWS env vars
        "AWS_ACCESS_KEY_ID": s3_storage_options["key"],
        "AWS_SECRET_ACCESS_KEY": s3_storage_options["secret"],
        "AWS_ENDPOINT_URL": endpoint_url,
        # GDAL-specific env vars
        "AWS_S3_ENDPOINT": endpoint_host,
        "AWS_VIRTUAL_HOSTING": "FALSE",
        # Follow the endpoint's scheme instead of assuming plain HTTP.
        "AWS_HTTPS": "YES" if endpoint_url.startswith("https://") else "NO",
    }

    # Snapshot before overwriting so teardown can restore, not just delete.
    previous = {key: os.environ.get(key) for key in env_vars}
    os.environ.update(env_vars)

    try:
        yield s3_storage_options
    finally:
        for key, old_value in previous.items():
            if old_value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = old_value
-> Path: + """Return path to a sample NetCDF4 file.""" + nc_path = ( + data_dir + / "model/global/cmip6/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/amip/r1i1p1f1" + / "Amon/ua/gn/v20191108/ua_Amon_ACCESS-CM2_amip_r1i1p1f1_gn_197001-201512.nc" + ) + return nc_path + + +@pytest.fixture +def sample_grib_path(data_dir: Path) -> Path: + """Return path to a sample GRIB file.""" + return data_dir / "grib_data/gfs/2025/11/25/test.grib2" + + +@pytest.fixture +def sample_geotiff_path(data_dir: Path) -> Path: + """Return path to a sample GeoTIFF file.""" + return data_dir / "geodata/TCD/2021/10m/districts/DE111/TCD_S2021_R10m_DE111.tif" + + +@pytest.fixture +def sample_cordex_path(data_dir: Path) -> Path: + """Return path to a sample CORDEX NetCDF file.""" + return ( + data_dir + / "model/regional/cordex/output/EUR-11/GERICS/NCC-NorESM1-M/rcp85/r1i1p1" + / "GERICS-REMO2015/v1/3hr/pr/v20181212" + / "pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc" + ) + + +def pytest_configure(config): + """Register custom markers.""" + config.addinivalue_line( + "markers", "requires_minio: mark test as requiring MinIO service" + ) + config.addinivalue_line( + "markers", "requires_thredds: mark test as requiring THREDDS service" + ) + config.addinivalue_line( + "markers", "requires_data: mark test as requiring local test data" + ) + + +def pytest_collection_modifyitems(config, items): + """Skip tests based on available services and data.""" + import socket + + def is_service_available(host: str, port: int, timeout: float = 1.0) -> bool: + """Check if a service is available.""" + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + result = sock.connect_ex((host, port)) + sock.close() + return result == 0 + except (socket.error, OSError): + return False + + minio_available = is_service_available("localhost", 9000) + thredds_available = is_service_available("localhost", 8088) + data_available = DATA_DIR.exists() + + 
skip_minio = pytest.mark.skip(reason="MinIO service not available") + skip_thredds = pytest.mark.skip(reason="THREDDS service not available") + skip_data = pytest.mark.skip(reason="Test data directory not found") + + for item in items: + if "requires_minio" in item.keywords and not minio_available: + item.add_marker(skip_minio) + if "requires_thredds" in item.keywords and not thredds_available: + item.add_marker(skip_thredds) + if "requires_data" in item.keywords and not data_available: + item.add_marker(skip_data) diff --git a/tests/test_backends.py b/tests/test_backends.py new file mode 100644 index 0000000..9aa1beb --- /dev/null +++ b/tests/test_backends.py @@ -0,0 +1,203 @@ +"""Tests for POSIX and cloud backends.""" + +from __future__ import annotations + +import os +from pathlib import Path + +import pytest +import xarray as xr + +from xarray_prism.backends import open_cloud, open_posix + + +class TestPosixBackend: + """Tests for local filesystem backend.""" + + @pytest.mark.requires_data + def test_open_netcdf4_local(self, sample_netcdf_path: Path): + """Open a local NetCDF4 file.""" + if not sample_netcdf_path.exists(): + pytest.skip(f"Test file not found: {sample_netcdf_path}") + + ds = open_posix(str(sample_netcdf_path), engine="h5netcdf") + assert isinstance(ds, xr.Dataset) + assert len(ds.data_vars) > 0 + ds.close() + + @pytest.mark.requires_data + def test_open_grib_local(self, sample_grib_path: Path): + """Open a local GRIB file.""" + if not sample_grib_path.exists(): + pytest.skip(f"Test file not found: {sample_grib_path}") + ds = open_posix(str(sample_grib_path), engine="cfgrib") + assert isinstance(ds, xr.Dataset) + ds.close() + + @pytest.mark.requires_data + def test_open_geotiff_local(self, sample_geotiff_path: Path): + """Open a local GeoTIFF file.""" + if not sample_geotiff_path.exists(): + pytest.skip(f"Test file not found: {sample_geotiff_path}") + + ds = open_posix(str(sample_geotiff_path), engine="rasterio") + assert isinstance(ds, 
xr.Dataset) + ds.close() + + @pytest.mark.requires_data + def test_open_with_drop_variables(self, sample_netcdf_path: Path): + """Test drop_variables parameter.""" + if not sample_netcdf_path.exists(): + pytest.skip(f"Test file not found: {sample_netcdf_path}") + + ds_full = open_posix(str(sample_netcdf_path), engine="h5netcdf") + var_name = list(ds_full.data_vars)[0] if ds_full.data_vars else None + ds_full.close() + + if var_name: + ds_partial = open_posix( + str(sample_netcdf_path), + engine="h5netcdf", + drop_variables=[var_name], + ) + assert var_name not in ds_partial.data_vars + ds_partial.close() + + @pytest.mark.requires_data + def test_open_with_backend_kwargs(self, sample_netcdf_path: Path): + """Test backend_kwargs passthrough.""" + if not sample_netcdf_path.exists(): + pytest.skip(f"Test file not found: {sample_netcdf_path}") + + ds = open_posix( + str(sample_netcdf_path), + engine="h5netcdf", + backend_kwargs={"phony_dims": "sort"}, + ) + assert isinstance(ds, xr.Dataset) + ds.close() + + +class TestCloudBackend: + """Tests for remote/cloud backend.""" + + @pytest.mark.requires_minio + def test_open_netcdf_from_s3(self, s3_env: dict): + """Open a NetCDF file from S3 (MinIO).""" + uri = "s3://testdata/pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc" + + try: + ds = open_cloud( + uri, + engine="h5netcdf", + storage_options=s3_env, + ) + assert isinstance(ds, xr.Dataset) + ds.close() + except FileNotFoundError: + pytest.skip("Test file not found in MinIO") + except Exception as e: + err_str = str(e).lower() + if any(x in err_str for x in ("s3fs", "credentials", "nosuchbucket")): + pytest.skip(f"S3 setup issue: {e}") + raise + + @pytest.mark.requires_minio + def test_open_grib_from_s3_with_cache(self, s3_env: dict, temp_cache_dir: Path): + """Open a GRIB file from S3 with local caching.""" + uri = "s3://testdata/test.grib2" + os.environ["XARRAY_PRISM_CACHE"] = str(temp_cache_dir) + + try: + ds = open_cloud( + uri, 
+ engine="cfgrib", + storage_options=s3_env, + ) + assert isinstance(ds, xr.Dataset) + ds.close() + cached_files = list(temp_cache_dir.glob("*")) + assert len(cached_files) >= 0 + except FileNotFoundError: + pytest.skip("Test file not found in MinIO") + except Exception as e: + err_str = str(e).lower() + if any( + x in err_str for x in ("cfgrib", "s3fs", "credentials", "nosuchbucket") + ): + pytest.skip(f"S3/cfgrib setup issue: {e}") + raise + finally: + os.environ.pop("XARRAY_PRISM_CACHE", None) + + @pytest.mark.requires_minio + def test_open_geotiff_from_s3(self, s3_env: dict): + """Open a GeoTIFF from S3.""" + uri = "s3://testdata/TCD_S2021_R10m_DE111.tif" + + try: + ds = open_cloud( + uri, + engine="rasterio", + storage_options=s3_env, + ) + assert isinstance(ds, xr.Dataset) + ds.close() + except FileNotFoundError: + pytest.skip("Test file not found in MinIO") + except Exception as e: + err_str = str(e).lower() + if any( + x in err_str + for x in ("rasterio", "s3fs", "credentials", "nosuchbucket") + ): + pytest.skip(f"S3/rasterio setup issue: {e}") + raise + + @pytest.mark.requires_thredds + def test_open_opendap(self, thredds_endpoint: str): + """Open a dataset via OPeNDAP from THREDDS.""" + opendap_url = ( + f"{thredds_endpoint}/thredds/dodsC/alldata/model/regional/cordex/output/" + "EUR-11/GERICS/NCC-NorESM1-M/rcp85/r1i1p1/GERICS-REMO2015/v1/3hr/pr/v20181212/" + "pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc" + ) + ds = open_cloud(opendap_url, engine="netcdf4") + assert isinstance(ds, xr.Dataset) + ds.close() + + +class TestCacheConfiguration: + """Tests for cache directory configuration.""" + + def test_cache_dir_from_env(self, temp_cache_dir: Path): + """Cache directory should be configurable via environment.""" + from xarray_prism.backends.cloud import _get_cache_dir + + os.environ["XARRAY_PRISM_CACHE"] = str(temp_cache_dir) + try: + cache_dir = _get_cache_dir() + assert cache_dir == temp_cache_dir + finally: + 
os.environ.pop("XARRAY_PRISM_CACHE", None) + + def test_cache_dir_from_storage_options(self, temp_cache_dir: Path): + """Cache directory should be configurable via storage_options.""" + from xarray_prism.backends.cloud import _get_cache_dir + + storage_options = { + "simplecache": {"cache_storage": str(temp_cache_dir)}, + } + cache_dir = _get_cache_dir(storage_options) + assert cache_dir == temp_cache_dir + + def test_cache_dir_default(self): + """Default cache should be in temp directory.""" + from xarray_prism.backends.cloud import _get_cache_dir + import tempfile + + os.environ.pop("XARRAY_PRISM_CACHE", None) + + cache_dir = _get_cache_dir() + assert cache_dir.parent == Path(tempfile.gettempdir()) + assert "xarray-prism-cache" in str(cache_dir) diff --git a/tests/test_detection.py b/tests/test_detection.py new file mode 100644 index 0000000..484e772 --- /dev/null +++ b/tests/test_detection.py @@ -0,0 +1,287 @@ +"""Tests for URI and format detection.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from xarray_prism._detection import ( + detect_engine, + detect_uri_type, + is_http_url, + is_reference_uri, + is_remote_uri, + looks_like_opendap_url, + register_detector, + register_uri_type, + unregister_detector, + unregister_uri_type, +) + + +class TestURITypeDetection: + """Tests for URI type detection.""" + + def test_posix_local_path(self): + """Local paths should be detected as posix.""" + assert detect_uri_type("/path/to/file.nc") == "posix" + assert detect_uri_type("./relative/path.zarr") == "posix" + assert detect_uri_type("data.grib2") == "posix" + + def test_posix_file_uri(self): + """file:// URIs should be detected as posix.""" + assert detect_uri_type("file:///path/to/file.nc") == "posix" + assert detect_uri_type("file://localhost/data.zarr") == "posix" + + def test_cloud_s3_uri(self): + """S3 URIs should be detected as cloud.""" + assert detect_uri_type("s3://bucket/path/data.nc") == "cloud" + assert 
detect_uri_type("s3://my-bucket/nested/path.zarr") == "cloud" + + def test_cloud_gcs_uri(self): + """GCS URIs should be detected as cloud.""" + assert detect_uri_type("gs://bucket/data.nc") == "cloud" + assert detect_uri_type("gcs://bucket/path.zarr") == "cloud" + + def test_cloud_azure_uri(self): + """Azure URIs should be detected as cloud.""" + assert detect_uri_type("az://container/blob.nc") == "cloud" + assert detect_uri_type("abfs://container/data.zarr") == "cloud" + + def test_cloud_http_uri(self): + """HTTP(S) URIs should be detected as cloud.""" + assert detect_uri_type("http://example.com/data.nc") == "cloud" + assert detect_uri_type("https://server.org/path/file.zarr") == "cloud" + + def test_reference_uri(self): + """reference:// URIs should be detected as reference.""" + assert detect_uri_type("reference://path/to/refs.json") == "reference" + assert detect_uri_type("REFERENCE://upper/case.json") == "reference" + + +class TestHelperFunctions: + """Tests for helper detection functions.""" + + def test_is_http_url(self): + """Test HTTP URL detection.""" + assert is_http_url("http://example.com/data.nc") is True + assert is_http_url("https://server.org/file.zarr") is True + assert is_http_url("HTTP://UPPERCASE.com/file") is True + assert is_http_url("s3://bucket/path") is False + assert is_http_url("/local/path.nc") is False + + def test_is_remote_uri(self): + """Test remote URI detection.""" + assert is_remote_uri("s3://bucket/path") is True + assert is_remote_uri("https://server.org/file") is True + assert is_remote_uri("file:///local/path") is False + assert is_remote_uri("/local/path.nc") is False + + def test_is_reference_uri(self): + """Test reference URI detection.""" + assert is_reference_uri("reference://path/refs.json") is True + assert is_reference_uri("REFERENCE://upper.json") is True + assert is_reference_uri("s3://bucket/path") is False + + def test_looks_like_opendap_url(self): + """Test OPeNDAP URL heuristics.""" + assert 
looks_like_opendap_url("http://server/thredds/dodsC/data") is True + assert looks_like_opendap_url("http://server/opendap/dataset") is True + assert looks_like_opendap_url("http://server/thredds/dods/data") is True + assert looks_like_opendap_url("http://example.com/data.nc") is False + + +class TestEngineDetection: + """Tests for format/engine detection.""" + + def test_zarr_extension(self): + """Zarr should be detected by extension.""" + assert detect_engine("/path/to/data.zarr") == "zarr" + assert detect_engine("s3://bucket/nested.zarr/") == "zarr" + assert detect_engine("http://server/data.zarr/subpath") == "zarr" + + def test_reference_uri_maps_to_zarr(self): + """Reference URIs should map to zarr engine.""" + assert detect_engine("reference://path/to/refs.json") == "zarr" + + def test_opendap_detection(self): + """OPeNDAP URLs should detect as netcdf4.""" + assert detect_engine("http://server/thredds/dodsc/dataset") == "netcdf4" + assert detect_engine("http://server/opendap/data") == "netcdf4" + + @pytest.mark.requires_data + def test_netcdf4_detection_local(self, sample_netcdf_path: Path): + """NetCDF4 files should be detected via magic bytes.""" + if sample_netcdf_path.exists(): + engine = detect_engine(str(sample_netcdf_path)) + assert engine in ("h5netcdf", "scipy") + + @pytest.mark.requires_data + def test_grib_detection_local(self, sample_grib_path: Path): + """GRIB files should be detected via magic bytes.""" + if sample_grib_path.exists(): + assert detect_engine(str(sample_grib_path)) == "cfgrib" + + @pytest.mark.requires_data + def test_geotiff_detection_local(self, sample_geotiff_path: Path): + """GeoTIFF files should be detected via magic bytes.""" + if sample_geotiff_path.exists(): + assert detect_engine(str(sample_geotiff_path)) == "rasterio" + + +class TestCustomDetectors: + """Tests for custom detector registration.""" + + def test_register_and_use_custom_detector(self): + """Custom detectors should be called in priority order.""" + + 
@register_detector(priority=100) + def my_detector(uri: str): + if uri.endswith(".myformat"): + return "my_custom_engine" + return None + + try: + assert detect_engine("file.myformat") == "my_custom_engine" + assert detect_engine("data.zarr") == "zarr" + finally: + unregister_detector(my_detector) + + def test_unregister_detector(self): + """Detectors should be removable.""" + + @register_detector(priority=100) + def temp_detector(uri: str): + if uri.endswith(".temp"): + return "temp_engine" + return None + + assert detect_engine("file.temp") == "temp_engine" + + result = unregister_detector(temp_detector) + assert result is True + assert detect_engine("file.temp") == "unknown" + + def test_detector_priority_order(self): + """Higher priority detectors should run first.""" + + @register_detector(priority=50) + def low_priority(uri: str): + if "test" in uri: + return "low" + return None + + @register_detector(priority=150) + def high_priority(uri: str): + if "test" in uri: + return "high" + return None + + try: + assert detect_engine("test_file") == "high" + finally: + unregister_detector(low_priority) + unregister_detector(high_priority) + + def test_failing_detector_skipped(self): + """Detectors that raise exceptions should be skipped.""" + + @register_detector(priority=100) + def failing_detector(uri: str): + raise ValueError("Intentional failure") + + try: + result = detect_engine("data.zarr") + assert result == "zarr" + finally: + unregister_detector(failing_detector) + + def test_unknown_engine_warning(self): + """Custom detector returning unknown engine should warn.""" + from xarray_prism import PrismBackendEntrypoint + import warnings + + @register_detector(priority=100) + def fake_engine_detector(uri: str): + if uri.endswith(".fake"): + return "nonexistent_engine" + return None + + try: + entrypoint = PrismBackendEntrypoint() + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + engine, uri_type = 
entrypoint._detect("test.fake") + assert engine == "nonexistent_engine" + assert len(w) == 1 + assert "not a built-in engine" in str(w[0].message) + finally: + unregister_detector(fake_engine_detector) + + +class TestCustomURITypeDetectors: + """Tests for custom URI type detector registration.""" + + def test_register_and_use_custom_uri_type(self): + """Custom URI type detectors should be called.""" + + @register_uri_type(priority=100) + def detect_tape(uri: str): + if uri.startswith("tape://"): + return "tape" + return None + + try: + assert detect_uri_type("tape://archive/data.nc") == "tape" + assert detect_uri_type("s3://bucket/data.nc") == "cloud" + finally: + unregister_uri_type(detect_tape) + + def test_unregister_uri_type(self): + """URI type detectors should be removable.""" + + @register_uri_type(priority=100) + def detect_custom(uri: str): + if uri.startswith("custom://"): + return "custom" + return None + + assert detect_uri_type("custom://path/data") == "custom" + + result = unregister_uri_type(detect_custom) + assert result is True + assert detect_uri_type("custom://path/data") == "cloud" + + def test_uri_type_priority_order(self): + """Higher priority URI type detectors should run first.""" + + @register_uri_type(priority=50) + def low_priority(uri: str): + if "special" in uri: + return "low_type" + return None + + @register_uri_type(priority=150) + def high_priority(uri: str): + if "special" in uri: + return "high_type" + return None + + try: + assert detect_uri_type("special://data") == "high_type" + finally: + unregister_uri_type(low_priority) + unregister_uri_type(high_priority) + + def test_failing_uri_type_detector_skipped(self): + """URI type detectors that raise exceptions should be skipped.""" + + @register_uri_type(priority=100) + def failing_detector(uri: str): + raise ValueError("Intentional failure") + + try: + assert detect_uri_type("s3://bucket/data") == "cloud" + finally: + unregister_uri_type(failing_detector) diff --git 
a/tests/test_entrypoint.py b/tests/test_entrypoint.py new file mode 100644 index 0000000..b1aef6d --- /dev/null +++ b/tests/test_entrypoint.py @@ -0,0 +1,298 @@ +"""Tests for the xarray backend entrypoint.""" + +from __future__ import annotations + +import os +from pathlib import Path +from unittest.mock import patch + +import pytest +import xarray as xr + +from xarray_prism import PrismBackendEntrypoint +from xarray_prism._registry import registry + + +class TestPrismBackendEntrypoint: + """Tests for the xarray backend entrypoint.""" + + @pytest.fixture + def entrypoint(self): + """Create a backend entrypoint instance.""" + return PrismBackendEntrypoint() + + def test_entrypoint_attributes(self, entrypoint: PrismBackendEntrypoint): + """Entrypoint should have required attributes.""" + assert hasattr(entrypoint, "open_dataset") + assert hasattr(entrypoint, "guess_can_open") + assert entrypoint.description is not None + + def test_guess_can_open_zarr(self, entrypoint: PrismBackendEntrypoint): + """Should recognize Zarr paths.""" + assert entrypoint.guess_can_open("data.zarr") is True + assert entrypoint.guess_can_open("s3://bucket/data.zarr") is True + assert entrypoint.guess_can_open("/path/to/store.zarr/") is True + + def test_guess_can_open_netcdf(self, entrypoint: PrismBackendEntrypoint): + """Should recognize NetCDF paths.""" + assert entrypoint.guess_can_open("data.nc") is True + assert entrypoint.guess_can_open("data.nc4") is True + assert entrypoint.guess_can_open("/path/to/file.nc") is True + + def test_guess_can_open_grib(self, entrypoint: PrismBackendEntrypoint): + """Should recognize GRIB paths.""" + assert entrypoint.guess_can_open("forecast.grib") is True + assert entrypoint.guess_can_open("forecast.grib2") is True + assert entrypoint.guess_can_open("data.grb") is True + assert entrypoint.guess_can_open("data.grb2") is True + + def test_guess_can_open_geotiff(self, entrypoint: PrismBackendEntrypoint): + """Should recognize GeoTIFF paths.""" + assert 
entrypoint.guess_can_open("image.tif") is True + assert entrypoint.guess_can_open("image.tiff") is True + + def test_guess_can_open_reference(self, entrypoint: PrismBackendEntrypoint): + """Should recognize reference:// URIs.""" + assert entrypoint.guess_can_open("reference://path/to/refs.json") is True + + def test_guess_can_open_opendap(self, entrypoint: PrismBackendEntrypoint): + """Should recognize OPeNDAP URLs.""" + assert entrypoint.guess_can_open("http://server/thredds/dodsC/data") is True + assert entrypoint.guess_can_open("http://server/opendap/dataset") is True + + def test_guess_can_open_rejects_unknown(self, entrypoint: PrismBackendEntrypoint): + """Should reject unknown formats.""" + assert entrypoint.guess_can_open("document.pdf") is False + assert entrypoint.guess_can_open("image.jpg") is False + assert entrypoint.guess_can_open(123) is False # type: ignore + + @pytest.mark.requires_data + def test_open_dataset_local_netcdf( + self, entrypoint: PrismBackendEntrypoint, sample_netcdf_path: Path + ): + """Open a local NetCDF file via entrypoint.""" + if not sample_netcdf_path.exists(): + pytest.skip(f"Test file not found: {sample_netcdf_path}") + + ds = entrypoint.open_dataset(str(sample_netcdf_path)) + assert isinstance(ds, xr.Dataset) + ds.close() + + @pytest.mark.requires_data + def test_open_dataset_local_grib( + self, entrypoint: PrismBackendEntrypoint, sample_grib_path: Path + ): + """Open a local GRIB file via entrypoint.""" + if not sample_grib_path.exists(): + pytest.skip(f"Test file not found: {sample_grib_path}") + + try: + ds = entrypoint.open_dataset(str(sample_grib_path)) + assert isinstance(ds, xr.Dataset) + ds.close() + except Exception as e: + if "cfgrib" in str(e).lower() or "eccodes" in str(e).lower(): + pytest.skip("cfgrib/eccodes not installed") + raise + + @pytest.mark.requires_data + def test_open_dataset_with_drop_variables( + self, entrypoint: PrismBackendEntrypoint, sample_netcdf_path: Path + ): + """Test drop_variables 
parameter via entrypoint.""" + if not sample_netcdf_path.exists(): + pytest.skip(f"Test file not found: {sample_netcdf_path}") + + ds_full = entrypoint.open_dataset(str(sample_netcdf_path)) + var_names = list(ds_full.data_vars) + ds_full.close() + + if var_names: + ds_partial = entrypoint.open_dataset( + str(sample_netcdf_path), drop_variables=[var_names[0]] + ) + assert var_names[0] not in ds_partial.data_vars + ds_partial.close() + + @pytest.mark.requires_data + def test_open_dataset_explicit_engine( + self, entrypoint: PrismBackendEntrypoint, sample_netcdf_path: Path + ): + """Test explicit engine override.""" + if not sample_netcdf_path.exists(): + pytest.skip(f"Test file not found: {sample_netcdf_path}") + + ds = entrypoint.open_dataset(str(sample_netcdf_path), xarray_engine="h5netcdf") + assert isinstance(ds, xr.Dataset) + ds.close() + + def test_open_dataset_invalid_type(self, entrypoint: PrismBackendEntrypoint): + """Should raise for invalid input types.""" + with pytest.raises(ValueError, match="file path or URL"): + entrypoint.open_dataset(123) # type: ignore + + def test_open_dataset_unknown_format(self, entrypoint: PrismBackendEntrypoint): + """Should raise for undetectable formats.""" + import tempfile + + with tempfile.NamedTemporaryFile(suffix=".xyz", delete=False) as f: + f.write(b"unknown content") + temp_path = f.name + + try: + with pytest.raises(ValueError): + entrypoint.open_dataset(temp_path) + finally: + os.unlink(temp_path) + + @pytest.mark.requires_minio + def test_open_dataset_s3(self, entrypoint: PrismBackendEntrypoint, s3_env: dict): + """Open a dataset from S3 via entrypoint.""" + uri = "s3://testdata/pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc" + + try: + ds = entrypoint.open_dataset(uri, storage_options=s3_env) + assert isinstance(ds, xr.Dataset) + ds.close() + except FileNotFoundError: + pytest.skip("Test file not found in MinIO") + except Exception as e: + err_str = str(e).lower() + if 
any(x in err_str for x in ("s3fs", "credentials", "nosuchbucket")): + pytest.skip(f"S3 setup issue: {e}") + raise + + @pytest.mark.requires_thredds + def test_open_dataset_opendap( + self, entrypoint: PrismBackendEntrypoint, thredds_endpoint: str + ): + """Open a dataset via OPeNDAP through entrypoint.""" + opendap_url = ( + f"{thredds_endpoint}/thredds/dodsC/alldata/model/regional/cordex/output/" + "EUR-11/GERICS/NCC-NorESM1-M/rcp85/r1i1p1/GERICS-REMO2015/v1/3hr/pr/v20181212/" + "pr_EUR-11_NCC-NorESM1-M_rcp85_r1i1p1_GERICS-REMO2015_v2_3hr_200701020130-200701020430.nc" + ) + + try: + ds = entrypoint.open_dataset(opendap_url) + assert isinstance(ds, xr.Dataset) + ds.close() + except Exception as e: + if "netcdf4" in str(e).lower() or "connection" in str(e).lower(): + pytest.skip(f"OPeNDAP access failed: {e}") + raise + + def test_open_dataset_custom_uri_type_no_handler( + self, entrypoint: PrismBackendEntrypoint + ): + """Should raise helpful error for custom uri_type without handler.""" + from xarray_prism._detection import register_uri_type, unregister_uri_type + + @register_uri_type(priority=100) + def detect_tape(uri: str): + if uri.startswith("tape://"): + return "tape" + return None + + try: + with pytest.raises(ValueError, match="No handler registered"): + entrypoint.open_dataset("tape://archive/data.zarr") + finally: + unregister_uri_type(detect_tape) + + +class TestXarrayIntegration: + """Tests for xarray.open_dataset integration.""" + + @pytest.mark.requires_data + def test_xarray_open_with_freva_engine(self, sample_netcdf_path: Path): + """xarray should recognize 'prism' as a valid engine.""" + if not sample_netcdf_path.exists(): + pytest.skip(f"Test file not found: {sample_netcdf_path}") + + ds = xr.open_dataset(str(sample_netcdf_path), engine="prism") + assert isinstance(ds, xr.Dataset) + ds.close() + + +class TestCustomRegistry: + """Tests for custom backend registration.""" + + def test_register_custom_handler(self): + """Register and use a custom 
handler.""" + + @registry.register("custom_format", uri_type="posix") + def custom_handler(uri, **kwargs): + return xr.Dataset({"test": (["x"], [1, 2, 3])}) + + assert registry.has("custom_format", "posix") + handler = registry.get("custom_format", "posix") + assert handler is not None + + ds = handler("any_uri.custom") + assert "test" in ds.data_vars + ds.close() + + def test_register_handler_for_both_uri_types(self): + """Register handler for both posix and cloud.""" + + @registry.register("universal_format", uri_type="both") + def universal_handler(uri, **kwargs): + return xr.Dataset({"data": (["y"], [4, 5, 6])}) + + assert registry.has("universal_format", "posix") + assert registry.has("universal_format", "cloud") + + def test_custom_handler_used_by_entrypoint(self): + """Custom handlers should be used by the entrypoint.""" + from xarray_prism._detection import register_detector, unregister_detector + + @register_detector(priority=200) + def detect_myformat(uri): + if uri.endswith(".mydata"): + return "myformat" + return None + + @registry.register("myformat", uri_type="posix") + def myformat_handler(uri, **kwargs): + return xr.Dataset({"custom": (["z"], [7, 8, 9])}) + + try: + entrypoint = PrismBackendEntrypoint() + + import tempfile + + with tempfile.NamedTemporaryFile(suffix=".mydata", delete=False) as f: + f.write(b"dummy content") + temp_path = f.name + + try: + ds = entrypoint.open_dataset(temp_path) + assert "custom" in ds.data_vars + ds.close() + finally: + os.unlink(temp_path) + finally: + unregister_detector(detect_myformat) + + def test_custom_uri_type_with_handler(self): + """Custom URI type with registered handler should work.""" + from xarray_prism._detection import register_uri_type, unregister_uri_type + + @register_uri_type(priority=100) + def detect_tape(uri: str): + if uri.startswith("tape://"): + return "tape" + return None + + @registry.register("zarr", uri_type="tape") + def tape_handler(uri, **kwargs): + return xr.Dataset({"staged": 
(["t"], [10, 20, 30])}) + + try: + entrypoint = PrismBackendEntrypoint() + ds = entrypoint.open_dataset("tape://archive/data.zarr") + assert "staged" in ds.data_vars + ds.close() + finally: + unregister_uri_type(detect_tape)