Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ baselines:
root: /store_new/mch/msopr/ml/COSMO-E
steps: 0/120/6

analysis:
truth:
label: COSMO KENDA
analysis_zarr: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-6h-v3-pl13.zarr
root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-6h-v3-pl13.zarr

locations:
output_root: output/
Expand Down
4 changes: 2 additions & 2 deletions config/forecasters-co1e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ baselines:
root: /store_new/mch/msopr/ml/COSMO-1E
steps: 0/33/6

analysis:
truth:
label: COSMO KENDA
analysis_zarr: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co1e-an-archive-0p01-2019-2024-1h-v1-pl13.zarr
root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co1e-an-archive-0p01-2019-2024-1h-v1-pl13.zarr

stratification:
regions:
Expand Down
4 changes: 2 additions & 2 deletions config/forecasters-co2-disentangled.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ baselines:
root: /store_new/mch/msopr/ml/COSMO-E
steps: 0/120/6

analysis:
truth:
label: COSMO KENDA
analysis_zarr: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-6h-v3-pl13.zarr
root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-6h-v3-pl13.zarr

stratification:
regions:
Expand Down
4 changes: 2 additions & 2 deletions config/forecasters-co2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ baselines:
root: /store_new/mch/msopr/ml/COSMO-E
steps: 0/120/6

analysis:
truth:
label: COSMO KENDA
analysis_zarr: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-6h-v3-pl13.zarr
root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-6h-v3-pl13.zarr

stratification:
regions:
Expand Down
14 changes: 9 additions & 5 deletions config/forecasters-ich1-oper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ dates:
- 2025-02-01T06:00
- 2025-03-01T12:00


runs:
- forecaster:
checkpoint: https://servicedepl.meteoswiss.ch/mlstore#/experiments/409/runs/b30acf68520a4bbd8324c44666561696
Expand All @@ -26,13 +25,18 @@ runs:
baselines:
- baseline:
baseline_id: ICON-CH1-EPS
label: ICON-CH1-EPS
root: /store_new/mch/msopr/ml/ICON-CH1-EPS
label: ICON-CH1-ctrl
root: /scratch/mch/cmerker/ICON-CH1-EPS
steps: 0/33/6
- baseline:
baseline_id: ICON-CH2-EPS
label: ICON-CH2-ctrl
root: /scratch/mch/cmerker/ICON-CH2-EPS
steps: 0/120/6

analysis:
truth:
label: KENDA-CH1
analysis_zarr: /store_new/mch/msopr/ml/datasets/mch-ich1-1km-2024-2025-1h-pl13-v1.0.zarr
root: /store_new/mch/msopr/ml/datasets/mch-ich1-1km-2024-2025-1h-pl13-v1.0.zarr

stratification:
regions:
Expand Down
14 changes: 7 additions & 7 deletions config/forecasters-ich1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ runs:

baselines:
- baseline:
baseline_id: ICON-CH1-EPS
label: ICON-CH1-EPS
root: /store_new/mch/msopr/ml/ICON-CH1-EPS
steps: 0/33/6
baseline_id: ICON-CH2-EPS
label: ICON-CH2-EPS
root: /scratch/mch/cmerker/ICON-CH2-EPS
steps: 0/120/6

analysis:
label: REA-L-CH1
analysis_zarr: /store_new/mch/msopr/ml/datasets/mch-realch1-fdb-1km-2005-2025-1h-pl13-v1.0.zarr
truth:
label: KENDA-CH1
root: /store_new/mch/msopr/ml/datasets/mch-ich1-1km-2024-2025-1h-pl13-v1.0.zarr

stratification:
regions:
Expand Down
4 changes: 2 additions & 2 deletions config/interpolators-co2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ baselines:
root: /store_new/mch/msopr/ml/COSMO-E_hourly
steps: 0/120/1

analysis:
truth:
label: COSMO KENDA
analysis_zarr: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-1h-v3-pl13.zarr
root: /scratch/mch/fzanetta/data/anemoi/datasets/mch-co2-an-archive-0p02-2015-2020-1h-v3-pl13.zarr

stratification:
regions:
Expand Down
189 changes: 152 additions & 37 deletions src/data_input/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import logging
import os
import sys
from datetime import datetime
from datetime import datetime, timedelta
from pathlib import Path
from typing import Iterable

eccodes_definition_path = Path(sys.prefix) / "share/eccodes-cosmo-resources/definitions"
os.environ["ECCODES_DEFINITION_PATH"] = str(eccodes_definition_path)
Expand All @@ -16,8 +15,37 @@
LOG = logging.getLogger(__name__)


def _select_valid_times(ds, times: np.datetime64):
# (handle special case where some valid times are not in the dataset, e.g. at the end)
times_np = np.asarray(times, dtype="datetime64[ns]")
times_included = np.isin(times_np, ds.time.values)
if times_included.all():
return ds.sel(time=times_np)
elif times_included.any():
LOG.warning(
"Some valid times are not included in the dataset: \n%s",
times_np[~times_included],
)
return ds.sel(time=times_np[times_included])
else:
raise ValueError(
"Valid times are not included in the dataset. "
"Please check the valid times and the dataset."
)


def parse_steps(steps: str) -> list[int]:
    """Parse a ``"start/stop/step"`` specification into a list of steps.

    Parameters
    ----------
    steps : str
        Step range in the format ``"start/stop/step"``, e.g. ``"0/120/6"``.
        The stop value is inclusive when it falls on the step grid.

    Returns
    -------
    list[int]
        Steps from ``start`` up to and including ``stop``, spaced by ``step``.

    Raises
    ------
    ValueError
        If *steps* is not exactly three '/'-separated integers.
    """
    parts = steps.split("/")
    # A single field-count check covers both "no slash at all" and
    # "wrong number of fields" (the two previous checks were redundant).
    if len(parts) != 3:
        raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'")
    start, end, step = map(int, parts)
    # +1 so the stop value itself is included when it lies on the grid.
    return list(range(start, end + 1, step))


def load_analysis_data_from_zarr(
analysis_zarr: Path, times: Iterable[datetime], params: list[str]
root: Path, reftime: datetime, steps: list[int], params: list[str]
) -> xr.Dataset:
"""Load analysis data from an anemoi-generated Zarr dataset

Expand All @@ -36,9 +64,9 @@ def load_analysis_data_from_zarr(
PARAMS_MAP_COSMO1 = {
v: v.replace("TOT_PREC", "TOT_PREC_6H") for v in PARAMS_MAP_COSMO2.keys()
}
PARAMS_MAP = PARAMS_MAP_COSMO2 if "co2" in analysis_zarr.name else PARAMS_MAP_COSMO1
PARAMS_MAP = PARAMS_MAP_COSMO2 if "co2" in root.name else PARAMS_MAP_COSMO1

ds = xr.open_zarr(analysis_zarr, consolidated=False)
ds = xr.open_zarr(root, consolidated=False)

# rename "dates" to "time" and set it as index
ds = ds.set_index(time="dates")
Expand All @@ -59,8 +87,8 @@ def load_analysis_data_from_zarr(

# set lat lon as coords (optional)
if "latitudes" in ds and "longitudes" in ds:
ds = ds.rename({"latitudes": "latitude", "longitudes": "longitude"})
ds = ds.set_coords(["latitude", "longitude"])
ds = ds.rename({"latitudes": "lat", "longitudes": "lon"})
ds = ds.set_coords(["lat", "lon"])
ds = (
ds["data"]
.to_dataset("variable")
Expand All @@ -71,30 +99,15 @@ def load_analysis_data_from_zarr(
if "cell" in ds.dims:
ds = ds.rename({"cell": "values"})

# select valid times
# (handle special case where some valid times are not in the dataset, e.g. at the end)
times_included = times.isin(ds.time.values).values
if all(times_included):
ds = ds.sel(time=times)
elif np.sum(times_included) < len(times_included):
LOG.warning(
"Some valid times are not included in the dataset: \n%s",
times[~times_included].values,
)
ds = ds.sel(time=times[times_included])
else:
raise ValueError(
"Valid times are not included in the dataset. "
"Please check the valid times and the dataset."
)
return ds
times = np.datetime64(reftime) + np.asarray(steps, dtype="timedelta64[h]")
return _select_valid_times(ds, times)


def load_fct_data_from_grib(
grib_output_dir: Path, reftime: datetime, steps: list[int], params: list[str]
root: Path, reftime: datetime, steps: list[int], params: list[str]
) -> xr.Dataset:
"""Load forecast data from GRIB files for a specific valid time."""
files = sorted(grib_output_dir.glob("20*.grib"))
files = sorted(root.glob(f"{reftime:%Y%m%d%H%M}*.grib"))
fds = data_source.FileDataSource(datafiles=files)
ds = grib_decoder.load(fds, {"param": params, "step": steps})
for var, da in ds.items():
Expand Down Expand Up @@ -127,13 +140,13 @@ def load_fct_data_from_grib(


def load_baseline_from_zarr(
zarr_path: Path, reftime: datetime, steps: list[int], params: list[str]
root: Path, reftime: datetime, steps: list[int], params: list[str]
) -> xr.Dataset:
"""Load forecast data from a Zarr dataset."""
try:
baseline = xr.open_zarr(zarr_path, consolidated=True, decode_timedelta=True)
baseline = xr.open_zarr(root, consolidated=True, decode_timedelta=True)
except ValueError:
raise ValueError(f"Could not open baseline zarr at {zarr_path}")
raise ValueError(f"Could not open baseline zarr at {root}")

baseline = baseline.rename(
{"forecast_reference_time": "ref_time", "step": "lead_time"}
Expand All @@ -156,14 +169,116 @@ def load_baseline_from_zarr(
lead_time=np.array(steps, dtype="timedelta64[h]"),
)
baseline = baseline.assign_coords(time=baseline.ref_time + baseline.lead_time)
if "latitude" in baseline.coords and "longitude" in baseline:
baseline = baseline.rename({"latitude": "lat", "longitude": "lon"})
return baseline


def parse_steps(steps: str) -> list[int]:
# check that steps is in the format "start/stop/step"
if "/" not in steps:
raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'")
if len(steps.split("/")) != 3:
raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'")
start, end, step = map(int, steps.split("/"))
return list(range(start, end + 1, step))
def load_obs_data_from_peakweather(
    root, reftime: datetime, steps: list[int], params: list[str], freq: str = "1h"
) -> xr.Dataset:
    """Load PeakWeather station observations into an xarray Dataset.

    Returns a Dataset with dimensions `time` and `values`, values coordinates
    (`lat`, `lon`), and variables renamed to ICON parameter names.
    Temperatures are converted to Kelvin when present.

    Parameters
    ----------
    root
        Root location of the PeakWeather dataset (forwarded unchanged to
        ``PeakWeatherDataset``).
    reftime : datetime
        Forecast reference time from which the valid times are derived.
    steps : list[int]
        Lead times in hours relative to *reftime*.
    params : list[str]
        ICON parameter names; only observations with a known mapping to one
        of these names are loaded.
    freq : str
        Temporal frequency passed to ``PeakWeatherDataset`` (default "1h").
    """
    # Imported lazily so peakweather is only required when station
    # observations are actually used as ground truth.
    from peakweather.dataset import PeakWeatherDataset

    # PeakWeather observation name -> ICON parameter name, restricted to
    # the parameters that were actually requested.
    param_names = {
        "temperature": "T_2M",
        "wind_u": "U_10M",
        "wind_v": "V_10M",
    }
    param_names = {k: v for k, v in param_names.items() if v in params}

    start = reftime
    end = start + timedelta(hours=max(steps))
    if len(steps) > 1:
        end += timedelta(hours=steps[-1] - steps[-2])  # extend by 1 extra step
    # Load every calendar year touched by the [start, end] window.
    years = list(set([start.year, end.year]))
    pw = PeakWeatherDataset(root=root, years=years, freq=freq)
    ds, mask = pw.get_observations(
        parameters=[k for k in param_names.keys()],
        first_date=f"{start:%Y-%m-%d %H:%M}",
        last_date=f"{end:%Y-%m-%d %H:%M}",
        return_mask=True,
    )
    # Pivot the stacked (station, parameter) frames into xarray Datasets
    # with one variable per parameter.
    ds = (
        ds.stack(["nat_abbr", "name"], future_stack=True)
        .to_xarray()
        .to_dataset(dim="name")
    )
    mask = (
        mask.stack(["nat_abbr", "name"], future_stack=True)
        .to_xarray()
        .to_dataset(dim="name")
    )
    # Blank out entries where the accompanying mask is falsy
    # (presumably a validity/quality mask — TODO confirm against peakweather docs).
    ds = ds.where(mask)
    ds = ds.rename({"datetime": "time", "nat_abbr": "values"})
    ds = ds.rename(param_names)
    # Normalise timestamps to naive UTC.
    ds = ds.assign_coords(time=ds.indexes["time"].tz_convert("UTC").tz_localize(None))
    ds = ds.assign_coords(values=ds.indexes["values"])
    # Attach station coordinates from the station metadata table.
    ds = ds.assign_coords(lon=("values", pw.stations_table["longitude"]))
    ds = ds.assign_coords(lat=("values", pw.stations_table["latitude"]))
    if "T_2M" in ds:
        ds["T_2M"] = ds["T_2M"] + 273.15  # convert to Kelvin
    # Drop stations that have no valid observations at all.
    ds = ds.dropna("values", how="all")

    times = np.datetime64(reftime) + np.asarray(steps, dtype="timedelta64[h]")
    return _select_valid_times(ds, times)


def load_truth_data(
    root, reftime: datetime, steps: list[int], params: list[str]
) -> xr.Dataset:
    """Load truth data from an analysis Zarr store or PeakWeather observations.

    Dispatches on *root*: a ``.zarr`` suffix selects the analysis loader,
    a path containing ``peakweather`` selects the station-observation
    loader; anything else is rejected.
    """
    if root.suffix == ".zarr":
        LOG.info("Loading ground truth from an analysis zarr dataset...")
        analysis = load_analysis_data_from_zarr(
            root=root,
            reftime=reftime,
            steps=steps,
            params=params,
        )
        # Materialise the selection, then re-chunk along the spatial dims
        # (gridded y/x if present, otherwise the flat "values" dim).
        gridded = "y" in analysis.dims and "x" in analysis.dims
        chunks = {"y": -1, "x": -1} if gridded else {"values": -1}
        return analysis.compute().chunk(chunks)

    if "peakweather" in str(root):
        LOG.info("Loading ground truth from PeakWeather observations...")
        return load_obs_data_from_peakweather(
            root=root,
            reftime=reftime,
            steps=steps,
            params=params,
        )

    raise ValueError(f"Unsupported truth root: {root}")


def load_forecast_data(
    root, reftime: datetime, steps: list[int], params: list[str]
) -> xr.Dataset:
    """Load forecast data from GRIB files or a baseline Zarr dataset.

    If *root* contains any ``*.grib`` files, forecasts are decoded from
    GRIB; otherwise *root* is treated as a baseline zarr store.
    """
    has_grib = any(root.glob("*.grib"))
    if has_grib:
        LOG.info("Loading forecasts from GRIB files...")
    else:
        LOG.info("Loading baseline forecasts from zarr dataset...")

    loader = load_fct_data_from_grib if has_grib else load_baseline_from_zarr
    return loader(
        root=root,
        reftime=reftime,
        steps=steps,
        params=params,
    )
12 changes: 6 additions & 6 deletions src/evalml/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,18 @@ class BaselineConfig(BaseModel):
)


class AnalysisConfig(BaseModel):
"""Configuration for the analysis data used in the verification."""
class TruthConfig(BaseModel):
"""Configuration for the truth data used in the verification."""

label: str = Field(
...,
min_length=1,
description="Label for the analysis that will be used in experiment results such as reports and figures.",
description="Label that will be used in experiment results such as reports and figures.",
)
analysis_zarr: str = Field(
root: str = Field(
...,
min_length=1,
description="Path to the zarr dataset containing the analysis data.",
description="Path to the root of the dataset.",
)


Expand Down Expand Up @@ -306,7 +306,7 @@ class ConfigModel(BaseModel):
...,
description="Dictionary of baselines to include in the verification.",
)
analysis: AnalysisConfig
truth: TruthConfig | None
stratification: Stratification
locations: Locations
profile: Profile
Expand Down
Loading
Loading