20 commits
b1c9a0a
Add ECMWF AIFS deterministic forecast dataset integration (#448)
mrshll Feb 25, 2026
ff56174
Address PR #449 review feedback for ECMWF AIFS integration
mrshll Feb 26, 2026
98e8e93
Add integration guide sections to prevent common review issues
mrshll Feb 26, 2026
3c94115
Update src/reformatters/ecmwf/aifs_deterministic/forecast/template_co…
mrshll Feb 27, 2026
7a8cf82
Update docs/dataset_integration_guide.md
mrshll Feb 27, 2026
368683d
Update src/reformatters/ecmwf/aifs_deterministic/forecast/template_co…
mrshll Feb 27, 2026
dab3935
Update src/reformatters/ecmwf/aifs_deterministic/forecast/template_co…
mrshll Feb 27, 2026
477dade
Merge branch 'main' into ecmwf-aifs
aldenks Mar 6, 2026
6aa648f
Align ECMWF AIFS variable set and metadata with IFS ENS
aldenks Mar 6, 2026
ef03a1e
Add missing AIFS tests following GFS/HRRR patterns
aldenks Mar 6, 2026
494d15a
Address PR review comments
aldenks Mar 6, 2026
d8cd3ff
Rename EcmwfAifsForecast* to EcmwfAifsDeterministicForecast*, fix cro…
aldenks Mar 11, 2026
c8cdd4f
Fix latitude chunk/shard sizes to avoid tiny tail shards
aldenks Mar 11, 2026
9dd4933
Deduplicate _parse_index_file index column construction
aldenks Mar 12, 2026
445e625
Replace ecmwf_utils with vars_available in ecmwf_config_models
aldenks Mar 12, 2026
bfafca6
Minor AIFS cleanup: docstring, forecast_resolution wording, template …
aldenks Mar 12, 2026
951eba8
Improve AIFS URL tests, simplify integration guide, drop boto3 dev dep
aldenks Mar 12, 2026
7beb2cb
Merge branch 'main' into ecmwf-aifs
aldenks Mar 12, 2026
77fbc04
Move ecmwf_utils tests to test_ecmwf_config_models, add vars_availabl…
aldenks Mar 12, 2026
9b65ada
Deaccumulate AIFS precipitation and radiation variables
aldenks Mar 12, 2026
2 changes: 2 additions & 0 deletions .github/workflows/manual-create-job-from-cronjob.yml
@@ -11,6 +11,8 @@ name: 'Manual: Create Job from CronJob'
type: choice
options:
- dwd-icon-eu-forecast-archive-grib-files
- ecmwf-aifs-deterministic-forecast-update
- ecmwf-aifs-deterministic-forecast-validate
- ecmwf-ifs-ens-forecast-15-day-0-25-degree-update
- ecmwf-ifs-ens-forecast-15-day-0-25-degree-validate
- nasa-smap-level3-36km-v9-update
5 changes: 4 additions & 1 deletion AGENTS.md
@@ -39,6 +39,9 @@ docs/ # Documentation
deploy/ # Docker and kubernetes configs
```

- **Shared provider utilities:** Check `src/reformatters/<provider>/` for shared modules (e.g., `ecmwf/ecmwf_grib_index.py`, `noaa/noaa_utils.py`).
- **Common utilities:** Look for relevant utilities in `src/reformatters/common/` before implementing equivalent logic; a sketch of a few of them follows this list. An incomplete list: `download.py` (`http_download_to_disk`), `iterating.py` (`group_by`, `item`, `digest`), `logging.py` (`get_logger`), `pydantic.py` (`replace`, `FrozenBaseModel`), `retry.py`, `time_utils.py` (`whole_hours`).
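A minimal sketch of a few of these helpers in use (argument shapes mirror their call sites in this repo's region jobs; illustrative, not an API reference):

```python
from reformatters.common.download import http_download_to_disk
from reformatters.common.iterating import digest, group_by
from reformatters.common.logging import get_logger

log = get_logger(__name__)

# group_by buckets items by a key function.
groups = group_by(["tp", "t2m", "ssrd"], lambda name: name[0])

# digest produces a stable short hash from an iterable of strings,
# e.g. to name a cached partial download by its byte ranges.
suffix = digest(f"{start}-{end}" for start, end in [(0, 100), (250, 400)])

# http_download_to_disk fetches a URL to a local, dataset-scoped path.
path = http_download_to_disk(
    "https://example.com/data.grib2",  # illustrative URL
    "example-dataset-id",
    local_path_suffix=f"-{suffix}",
)
log.info(f"Downloaded to {path}")
```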

## Core classes

Integrating a dataset requires subclassing three base classes. For a step-by-step walkthrough, see [docs/dataset_integration_guide.md](docs/dataset_integration_guide.md); for complete details of what subclassers should implement and how, see the commented templates in `src/reformatters/example/{dynamical_dataset|template_config|region_job}.py`.
@@ -58,7 +61,7 @@ Run these tests after updating a template: `uv run pytest tests/common/common_te
#### Metadata conventions
Metadata attributes for variables and coordinates must follow CF Conventions.
The `standard_name` and `units` fields must match CF definitions if one exists for that variable; if one doesn't, use SI `units` and leave `standard_name` unset.
Use ECMWF variable name for `long_name` and ECMWF short name for `short_name`.
Use ECMWF variable name for `long_name` and ECMWF short name for `short_name`. When adding a variable, search to see if another dataset already has an equivalent variable (e.g. `temperature_2m`); if so, match its names and metadata exactly.
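For illustration, metadata for a hypothetical 2 m temperature variable following these conventions (example values to show the pattern, not definitions from this repo):

```python
temperature_2m_metadata = {
    "long_name": "2 metre temperature",  # ECMWF variable name
    "short_name": "2t",                  # ECMWF short name
    "standard_name": "air_temperature",  # set because a CF standard name exists
    "units": "C",                        # match the equivalent variable in existing datasets
}
```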


### RegionJob
20 changes: 14 additions & 6 deletions docs/dataset_integration_guide.md
@@ -59,11 +59,9 @@ If you plan to write this dataset to a location not maintained by dynamical.org,

### 3. Implement `TemplateConfig` subclass

Work through `src/reformatters/$DATASET_PATH/template_config.py`, setting the attributes and method definitions to describe the structure of your dataset.
Work through `src/reformatters/$DATASET_PATH/template_config.py`, setting the attributes and method definitions to describe the structure of your dataset. The report generated by following the [source_data_exploration_guide.md](source_data_exploration_guide.md) will be helpful here.

Providing an AI with 1) the example template config code to edit, 2) output of running `gdalinfo <example source data file>` and 3) any dataset documentation will help it give you a decent first implementation of your `TemplateConfig` subclass.

Use the [chunk/shard layout tool](./chunk_shard_layout_tool.md) to find chunk and shard sizes for your data variables.
Read the [chunk/shard layout tool](./chunk_shard_layout_tool.md) docs and use the tool to find chunk and shard sizes for your data variables.

Using the information in the `TemplateConfig`, `reformatters` writes the Zarr metadata for your dataset to `src/reformatters/$DATASET_PATH/templates/latest.zarr`. Run this command in your terminal to create or update the template based on your `TemplateConfig` subclass:

@@ -93,7 +91,7 @@ There are four required methods:

There are a few optional, additional methods which are described in the example code. Implement them if required for your dataset, otherwise remove them to use the base class `RegionJob` implementations.

Write a test or two for any custom logic you've created. Generally don't implement integration style tests that make network requests in your `region_job_test.py`, we'll do those in the `dynamical_dataset_test.py`.
Write tests for any custom logic you've created.

```bash
uv run pytest tests/$DATASET_PATH/region_job_test.py
@@ -111,7 +109,17 @@ Reformatting locally can be slow. Choosing an `<append_dim_end>` not long after

To operationalize your dataset and have the `update` and `validate` Kubernetes cron jobs deployed automatically by GitHub CI, implement the two methods in `src/reformatters/$DATASET_PATH/dynamical_dataset.py`.

In `dynamical_dataset_test.py` create a test that runs `backfill_local` followed by `update` for a couple data variables.
Kubernetes resource values (a worked example follows this list):
- shared memory: round the value calculated by the chunk/shard layout tool up to the nearest half GB.
- memory: 1.5x shared memory.
- cpu: the number of spatial dimension shards minus 1, leaving headroom for Kubernetes. E.g. if 2 latitude shards * 4 longitude shards = 8, choose 7 cpu to schedule on an 8 cpu node.
- ephemeral_storage: 20GB is a good starting point.
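For example, if the tool reports 2.7GB of shared memory and the grid shards as 2 latitude * 4 longitude (numbers here are illustrative): shared memory = 3G, memory = 4.5G (1.5 * 3G), cpu = 7 (8 shards minus 1), ephemeral_storage = 20G.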

The update cron schedule should run shortly after the source data is expected to be available, and the validate cron should run at `update cron start + update pod_active_deadline`.
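For example, a dataset whose update runs at `25 */6 * * *` with a 30 minute `pod_active_deadline` should schedule validation at `55 */6 * * *`.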

#### Integration test with snapshot values

In `dynamical_dataset_test.py` create a test that runs `backfill_local` followed by `update` for a couple of data variables and a minimal number of time steps, lead times, and ensemble members. Include snapshot value assertions for every data variable that the test processes: check specific known values at specific coordinates (e.g. `assert_allclose(point["temperature_2m"].values, [28.75, 29.23])`). Snapshot values catch silent regressions in data reading, unit conversion, or coordinate alignment that other tests miss.
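A rough shape for such a test; fixture names, method signatures, and the asserted values are illustrative, so follow an existing dataset's `dynamical_dataset_test.py` for the real pattern:

```python
import numpy as np
import xarray as xr

def test_backfill_and_operational_update(dataset, tmp_path):  # illustrative fixtures
    # Backfill a minimal slice, then exercise the operational update path.
    dataset.backfill_local(append_dim_end="2025-01-01T06:00")  # hypothetical call
    dataset.update("test-update")  # hypothetical call

    ds = xr.open_zarr(dataset.store_factory.store())  # hypothetical accessor
    point = ds.sel(latitude=40.0, longitude=255.0, method="nearest")
    # Snapshot assertion: known-good values at fixed coordinates.
    np.testing.assert_allclose(point["temperature_2m"].values, [28.75, 29.23])
```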

```bash
uv run pytest tests/$DATASET_PATH/dynamical_dataset_test.py
17 changes: 17 additions & 0 deletions src/reformatters/__main__.py
@@ -24,6 +24,9 @@
)
from reformatters.contrib.uarizona.swann.analysis import UarizonaSwannAnalysisDataset
from reformatters.dwd.icon_eu.forecast import DwdIconEuForecastDataset
from reformatters.ecmwf.aifs_deterministic.forecast import (
EcmwfAifsDeterministicForecastDataset,
)
from reformatters.ecmwf.ifs_ens.forecast_15_day_0_25_degree.dynamical_dataset import (
EcmwfIfsEnsForecast15Day025DegreeDataset,
)
@@ -79,6 +82,14 @@ class EcmwfIfsEnsIcechunkAwsOpenDataDatasetStorageConfig(StorageConfig):
format: DatasetFormat = DatasetFormat.ICECHUNK


class EcmwfAifsDeterministicIcechunkAwsOpenDataDatasetStorageConfig(StorageConfig):
"""ECMWF AIFS deterministic in Icechunk on AWS Open Data."""

base_path: str = "s3://dynamical-ecmwf-aifs-deterministic"
k8s_secret_name: str = "aws-open-data-icechunk-storage-options-key" # noqa: S105
format: DatasetFormat = DatasetFormat.ICECHUNK


class NoaaMrmsIcechunkAwsOpenDataDatasetStorageConfig(StorageConfig):
"""NOAA MRMS in Icechunk on AWS Open Data."""

@@ -144,6 +155,12 @@ class UpstreamGriddedZarrsDatasetStorageConfig(StorageConfig):
primary_storage_config=SourceCoopZarrDatasetStorageConfig(),
replica_storage_configs=[EcmwfIfsEnsIcechunkAwsOpenDataDatasetStorageConfig()],
),
EcmwfAifsDeterministicForecastDataset(
primary_storage_config=SourceCoopZarrDatasetStorageConfig(),
replica_storage_configs=[
EcmwfAifsDeterministicIcechunkAwsOpenDataDatasetStorageConfig()
],
),
# DWD
DwdIconEuForecastDataset(
primary_storage_config=SourceCoopZarrDatasetStorageConfig()
Empty file.
@@ -0,0 +1,3 @@
from .dynamical_dataset import (
EcmwfAifsDeterministicForecastDataset as EcmwfAifsDeterministicForecastDataset,
)
@@ -0,0 +1,59 @@
from collections.abc import Sequence
from datetime import timedelta

from reformatters.common import validation
from reformatters.common.dynamical_dataset import DynamicalDataset
from reformatters.common.kubernetes import CronJob, ReformatCronJob, ValidationCronJob
from reformatters.ecmwf.ecmwf_config_models import EcmwfDataVar

from .region_job import (
EcmwfAifsDeterministicForecastRegionJob,
EcmwfAifsDeterministicForecastSourceFileCoord,
)
from .template_config import EcmwfAifsDeterministicForecastTemplateConfig


class EcmwfAifsDeterministicForecastDataset(
DynamicalDataset[EcmwfDataVar, EcmwfAifsDeterministicForecastSourceFileCoord]
):
template_config: EcmwfAifsDeterministicForecastTemplateConfig = (
EcmwfAifsDeterministicForecastTemplateConfig()
)
region_job_class: type[EcmwfAifsDeterministicForecastRegionJob] = (
EcmwfAifsDeterministicForecastRegionJob
)

def operational_kubernetes_resources(self, image_tag: str) -> Sequence[CronJob]:
suspend = True
operational_update_cron_job = ReformatCronJob(
name=f"{self.dataset_id}-update",
schedule="25 */6 * * *",
suspend=suspend,
pod_active_deadline=timedelta(minutes=30),
image=image_tag,
dataset_id=self.dataset_id,
cpu="3.5",
memory="7G",
shared_memory="1.5G",
ephemeral_storage="20G",
secret_names=self.store_factory.k8s_secret_names(),
)
validation_cron_job = ValidationCronJob(
name=f"{self.dataset_id}-validate",
schedule="55 */6 * * *",
suspend=suspend,
pod_active_deadline=timedelta(minutes=10),
image=image_tag,
dataset_id=self.dataset_id,
cpu="1.3",
memory="7G",
secret_names=self.store_factory.k8s_secret_names(),
)

return [operational_update_cron_job, validation_cron_job]

def validators(self) -> Sequence[validation.DataValidator]:
return (
validation.check_forecast_current_data,
validation.check_forecast_recent_nans,
)
227 changes: 227 additions & 0 deletions src/reformatters/ecmwf/aifs_deterministic/forecast/region_job.py
@@ -0,0 +1,227 @@
import itertools
from collections.abc import Callable, Mapping, Sequence
from pathlib import Path
from typing import ClassVar

import numpy as np
import pandas as pd
import rasterio
import xarray as xr
from zarr.abc.store import Store

from reformatters.common.deaccumulation import deaccumulate_to_rates_inplace
from reformatters.common.download import http_download_to_disk
from reformatters.common.iterating import digest, group_by
from reformatters.common.logging import get_logger
from reformatters.common.region_job import (
CoordinateValueOrRange,
RegionJob,
SourceFileCoord,
)
from reformatters.common.time_utils import whole_hours
from reformatters.common.types import (
AppendDim,
ArrayFloat32,
DatetimeLike,
Dim,
Timedelta,
Timestamp,
)
from reformatters.ecmwf.ecmwf_config_models import EcmwfDataVar, vars_available
from reformatters.ecmwf.ecmwf_grib_index import get_message_byte_ranges_from_index

log = get_logger(__name__)

# Path changed from aifs/ to aifs-single/ on this date
AIFS_SINGLE_PATH_CHANGE_DATE = pd.Timestamp("2025-02-26T00:00")

# GRIB master table version changes caused metadata differences for precipitation.
# Early data (table v27): generic product template codes.
# Recent data (table v34+): specific parameter names with different step encoding.
# Maps grib_index_param -> (alt_grib_comment, alt_grib_description)
_PRECIP_ALT_GRIB_METADATA: dict[str, tuple[str, str]] = {
"tp": (
"Total precipitation rate [kg/(m^2*s)]",
'0[-] SFC="Ground or water surface"',
),
}


class EcmwfAifsDeterministicForecastSourceFileCoord(SourceFileCoord):
init_time: Timestamp
lead_time: Timedelta
data_var_group: Sequence[EcmwfDataVar]

def _get_base_url(self) -> str:
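        """Return the source URL for this (init_time, lead_time) pair, without file extension.

        Illustrative result for init_time 2025-03-01T00 and lead_time 12h:
        .../20250301/00z/aifs-single/0p25/oper/20250301000000-12h-oper-fc
        """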
root_url = "https://ecmwf-forecasts.s3.eu-central-1.amazonaws.com"

init_time_str = self.init_time.strftime("%Y%m%d")
init_hour_str = self.init_time.strftime("%H")
lead_time_hour_str = whole_hours(self.lead_time)

if self.init_time >= AIFS_SINGLE_PATH_CHANGE_DATE:
model_dir = "aifs-single"
else:
model_dir = "aifs"

directory_path = f"{init_time_str}/{init_hour_str}z/{model_dir}/0p25/oper"
filename = f"{init_time_str}{init_hour_str}0000-{lead_time_hour_str}h-oper-fc"
return f"{root_url}/{directory_path}/{filename}"

def get_url(self) -> str:
return self._get_base_url() + ".grib2"

def get_index_url(self) -> str:
return self._get_base_url() + ".index"

def out_loc(self) -> Mapping[Dim, CoordinateValueOrRange]:
return {
"init_time": self.init_time,
"lead_time": self.lead_time,
}


class EcmwfAifsDeterministicForecastRegionJob(
RegionJob[EcmwfDataVar, EcmwfAifsDeterministicForecastSourceFileCoord]
):
max_vars_per_download_group: ClassVar[int] = 10

@classmethod
def source_groups(
cls,
data_vars: Sequence[EcmwfDataVar],
) -> Sequence[Sequence[EcmwfDataVar]]:
return group_by(data_vars, lambda v: v.internal_attrs.date_available)

def generate_source_file_coords(
self,
processing_region_ds: xr.Dataset,
data_var_group: Sequence[EcmwfDataVar],
) -> Sequence[EcmwfAifsDeterministicForecastSourceFileCoord]:
coords = []
for init_time, lead_time in itertools.product(
processing_region_ds["init_time"].values,
processing_region_ds["lead_time"].values,
):
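            # Skip init times from before this variable group became available.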
if not vars_available(data_var_group, init_time):
continue

coords.append(
EcmwfAifsDeterministicForecastSourceFileCoord(
init_time=init_time,
lead_time=lead_time,
data_var_group=data_var_group,
)
)
return coords

def download_file(
self, coord: EcmwfAifsDeterministicForecastSourceFileCoord
) -> Path:
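        """Download only this coord's GRIB messages using byte-range requests.

        The sidecar .index file provides per-message byte offsets, so only
        the messages for coord.data_var_group are fetched from the file.
        """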
idx_url = coord.get_index_url()
idx_local_path = http_download_to_disk(idx_url, self.dataset_id)

byte_range_starts, byte_range_ends = get_message_byte_ranges_from_index(
idx_local_path,
coord.data_var_group,
)
suffix = digest(
f"{s}-{e}" for s, e in zip(byte_range_starts, byte_range_ends, strict=True)
)
return http_download_to_disk(
coord.get_url(),
self.dataset_id,
byte_ranges=(byte_range_starts, byte_range_ends),
local_path_suffix=f"-{suffix}",
)

def read_data(
self,
coord: EcmwfAifsDeterministicForecastSourceFileCoord,
data_var: EcmwfDataVar,
) -> ArrayFloat32:
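        """Read the single GRIB band whose metadata matches data_var.

        Alternate precipitation metadata from differing GRIB master table
        versions (_PRECIP_ALT_GRIB_METADATA) is also accepted.
        """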
expected_comment = data_var.internal_attrs.grib_comment
expected_description = data_var.internal_attrs.grib_description

alt_metadata = _PRECIP_ALT_GRIB_METADATA.get(
data_var.internal_attrs.grib_index_param
)
allowed_comments = {expected_comment}
allowed_descriptions = {expected_description}
if alt_metadata is not None:
allowed_comments.add(alt_metadata[0])
allowed_descriptions.add(alt_metadata[1])

with rasterio.open(coord.downloaded_path) as reader:
matching_bands: list[int] = []
for band_i in range(reader.count):
rasterio_band_i = band_i + 1
if (
reader.tags(rasterio_band_i)["GRIB_COMMENT"] in allowed_comments
and reader.descriptions[band_i] in allowed_descriptions
):
matching_bands.append(rasterio_band_i)

assert len(matching_bands) == 1, (
f"Expected exactly 1 matching band, found {len(matching_bands)}. "
f"{expected_comment=}, {expected_description=}, {coord.downloaded_path=}"
)
result: ArrayFloat32 = reader.read(matching_bands[0], out_dtype=np.float32)
return result

def apply_data_transformations(
self, data_array: xr.DataArray, data_var: EcmwfDataVar
) -> None:
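        """Scale values and deaccumulate accumulated variables to rates, in place."""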
if data_var.internal_attrs.scale_factor is not None:
data_array *= data_var.internal_attrs.scale_factor

if data_var.internal_attrs.deaccumulate_to_rate:
reset_freq = data_var.internal_attrs.window_reset_frequency
deaccumulation_invalid_below_threshold_rate = (
data_var.internal_attrs.deaccumulation_invalid_below_threshold_rate
)
assert deaccumulation_invalid_below_threshold_rate is not None
assert reset_freq is not None

try:
deaccumulate_to_rates_inplace(
data_array,
dim="lead_time",
reset_frequency=reset_freq,
invalid_below_threshold_rate=deaccumulation_invalid_below_threshold_rate,
)
except ValueError:
log.exception(f"Error deaccumulating {data_var.name}")

super().apply_data_transformations(data_array, data_var)

@classmethod
def operational_update_jobs(
cls,
primary_store: Store,
tmp_store: Path,
get_template_fn: Callable[[DatetimeLike], xr.Dataset],
append_dim: AppendDim,
all_data_vars: Sequence[EcmwfDataVar],
reformat_job_name: str,
) -> tuple[
Sequence[
"RegionJob[EcmwfDataVar, EcmwfAifsDeterministicForecastSourceFileCoord]"
],
xr.Dataset,
]:
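        """Build jobs appending from the latest init_time in the primary store through now."""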
existing_ds = xr.open_zarr(primary_store, chunks=None)
append_dim_start = existing_ds[append_dim].max()
append_dim_end = pd.Timestamp.now()
template_ds = get_template_fn(append_dim_end)

jobs = cls.get_jobs(
kind="operational-update",
tmp_store=tmp_store,
template_ds=template_ds,
append_dim=append_dim,
all_data_vars=all_data_vars,
reformat_job_name=reformat_job_name,
filter_start=append_dim_start,
)
return jobs, template_ds