diff --git a/config/forecasters-co2.yaml b/config/forecasters-co2.yaml index ef881d49..d32aa125 100644 --- a/config/forecasters-co2.yaml +++ b/config/forecasters-co2.yaml @@ -1,17 +1,17 @@ # yaml-language-server: $schema=../workflow/tools/config.schema.json -description: | +description: Evaluate skill of COSMO-E emulator (M-1 forecaster). dates: - - 2020-02-03T00:00 # Storm Petra - - 2020-02-07T00:00 # Storm Sabine - - 2020-10-01T00:00 # Storm Brigitte + start: 2020-07-25T00:00 + end: 2020-07-27T00:00 + frequency: 12h runs: - forecaster: mlflow_id: d0846032fc7248a58b089cbe8fa4c511 label: M-1 forecaster - steps: 0/120/6 + steps: 0/24/12 config: resources/inference/configs/sgm-forecaster-global_trimedge.yaml extra_dependencies: - git+https://github.com/ecmwf/anemoi-inference.git@0.8.3 @@ -21,7 +21,7 @@ baselines: baseline_id: COSMO-E label: COSMO-E root: /store_new/mch/msopr/ml/COSMO-E - steps: 0/120/6 + steps: 0/24/12 analysis: label: COSMO KENDA diff --git a/resources/mec/namelist.jinja2 b/resources/mec/namelist.jinja2 new file mode 100644 index 00000000..e6fceba5 --- /dev/null +++ b/resources/mec/namelist.jinja2 @@ -0,0 +1,76 @@ +!============================================================================== +! namelist template for MEC +!============================================================================== + + !=================== + ! general parameters + !=================== + &run + method = 'GMESTAT' ! Model Equivalent Calculator + model = 'ML' ! forecast model. One of "COSMO" "ICON" "ML" + input = './input_mod' ! input data path + data = '/oprusers/osm/opr.emme/data/' ! data path for auxiliary data + obsinput = './input_obs' ! observation input data path + output = '.' ! output data to working directory + time_ana = {{ init_time }}00 ! analysis date YYYYMMDDHHMMSS + read_fields = 'ps u t v q geof t2m td2m u_10m v_10m' + grib_edition = 2 + grib_library = 2 ! GRIB-API used: 1=GRIBEX 2=GRIB2-API + cosmo_refatm = 2 ! 
reference atmosphere to be used for COSMO:1or2 + fc_hours = 0 ! Default is 3h. Has to be set to 0 if one wants to verify +0h leadtime + nproc1 = 1 + nproc2 = 1 + / + + !=============================== + ! observation related parameters + !=============================== + &observations + !--------------------------------------------------- + ! read from CDFIN files (if not set use mon/cof/ekf) + !--------------------------------------------------- + read_cdfin = F ! (F): dont read COSMO CDFIN files get obs from ekf + vint_lin_t = T ! linear vertical interpolation for temperature + vint_lin_z = T ! linear vertical interpolation for geopotential + vint_lin_uv = T ! linear vertical interpolation for wind + ptop_lapse = 850. + pbot_lapse = 950. +! int_nn = T ! horizontal interpolation: nearest neighbor + / + + !==================== + ! Ensemble parameters + !==================== + &ENKF + k_enkf = 0 ! ensemble size (0 for det. run) + det_run = 1 ! set to 1 for deterministic run, 0 for ensemble + / + + !================================ + ! Verification related parameters + !================================ + &veri_obs + obstypes = "SYNOP" ! "SYNOP TEMP" + fc_times = {{ leadtimes }} ! forecast lead time at reference (hhmm) 0000,1200,2400,... + prefix_in = 'mon' ! prefix for input files. ekf or mon + prefix_out = 'ver' + rm_old = 2 ! overwrite entries in verification file ? + fc_file = 'fc__FCR_TIME_00' ! template for forecast file name + time_range = 1 + ekf_concat = F + ref_runtype = 'any' ! accept any runtype for the reference state + / + + &report + time_b = -0029 ! (hhmm, inclusive) + time_e = 0030 ! (hhmm, exclusive) + / + + &cosmo_obs + lcd187 = .true. ! use ground based wind lidar obs + verification_start = -29 ! (min, inclusive) + verification_end = 30 ! 
(min, inclusive) + / + &synop_obs + version = 1 + / diff --git a/workflow/Snakefile b/workflow/Snakefile index e7594fc8..140b3c97 100644 --- a/workflow/Snakefile +++ b/workflow/Snakefile @@ -19,6 +19,7 @@ include: "rules/inference.smk" include: "rules/verif.smk" include: "rules/report.smk" include: "rules/plot.smk" +include: "rules/verif_obs.smk" # about workflow @@ -37,6 +38,10 @@ LOGS_DIR = OUT_ROOT / "logs" RESULTS_DIR = OUT_ROOT / "results" / EXPERIMENT_NAME +# prefer one rule because snakemake complains about ambiguous rules (same output) +ruleorder: prepare_inference_forecaster > prepare_inference_interpolator + + # optional messages, log and error handling # ----------------------------------------------------- @@ -126,6 +131,11 @@ rule experiment_all: rules.verif_metrics_plot.output, experiment=EXPERIMENT_NAME, ), + expand( + OUT_ROOT / "data/runs/{run_id}/fdbk_files/verSYNOP_{init_time}00.nc", + init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES_MEC], + run_id=CANDIDATES, + ), rule showcase_all: @@ -162,10 +172,17 @@ rule run_inference_all: """Run inference for all reference times as defined in the configuration.""" input: expand( - OUT_ROOT / "data/runs/{run_id}/{init_time}/raw", + rules.execute_inference.output.okfile, init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES], run_id=CANDIDATES, ), + output: + run_all_ok=touch(OUT_ROOT / "logs/run_inference_all.ok"), + shell: + """ + mkdir -p $(dirname {output.run_all_ok}) + touch {output.run_all_ok} + """ rule verif_metrics_all: @@ -184,3 +201,12 @@ rule verif_metrics_plot_all: rules.verif_metrics_plot.output, experiment=EXPERIMENT_NAME, ), + + +rule verif_obs_all: + input: + expand( + rules.run_mec.output, + init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES_MEC], + run_id=CANDIDATES, + ), diff --git a/workflow/rules/verif_obs.smk b/workflow/rules/verif_obs.smk new file mode 100644 index 00000000..1f22fa74 --- /dev/null +++ b/workflow/rules/verif_obs.smk @@ -0,0 +1,213 @@ +from pathlib import Path 
+from datetime import datetime, timedelta + +EXPERIMENT_HASH = short_hash_config() + + +# TODO: merge _parse_steps from generate_mec_namelist.py and verif_single_init.py? +def _parse_steps(steps: str) -> list[int]: + # check that steps is in the format "start/stop/step" + if "/" not in steps: + raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") + if len(steps.split("/")) != 3: + raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") + start, end, step = map(int, steps.split("/")) + return list(range(start, end + 1, step)) + + +# TODO: merge with _ref_times from common.smk? +def _reftimes_mec(): + """ + Construct ref times for MEC. Needs to be max of all + leadtimes shorter than ref times from the config. + """ + cfg = config["dates"] + if isinstance(cfg, list): + return [datetime.strptime(t, "%Y-%m-%dT%H:%M") for t in cfg] + start = datetime.strptime(cfg["start"], "%Y-%m-%dT%H:%M") + leads = _parse_steps(config["runs"][0]["forecaster"]["steps"]) + start_mec = start + timedelta(hours=max(leads)) + end = datetime.strptime(cfg["end"], "%Y-%m-%dT%H:%M") + freq = _parse_timedelta(cfg["frequency"]) + times = [] + t = start_mec + while t <= end: + times.append(t) + t += freq + return times + + +REFTIMES_MEC = _reftimes_mec() + + +def init_times_for_mec(wc): + """ + Return list of init times (YYYYMMDDHHMM) from init_time - lead ... init_time + stepping by configured frequency. 
+ """ + init = wc.init_time + base = datetime.strptime(init, "%Y%m%d%H%M") + + lt = get_leadtime(wc) # expects something like "48h" + lead_h = int(str(lt).rstrip("h")) + freq_td = _parse_timedelta(config["dates"]["frequency"]) + + # iterate from base - lead to base stepping by the parsed timedelta + t = base - timedelta(hours=lead_h) + times = [] + + while t <= base: + times.append(t.strftime("%Y%m%d%H%M")) + t += freq_td + return times + + +# prepare_mec_input: setup run dir, gather observations and model data in the run dir for the actual init time +rule prepare_mec_input: + input: + src_dir=OUT_ROOT / "data/runs/{run_id}/{init_time}/grib", + inference_ok=lambda wc: expand( + rules.execute_inference.output.okfile, + run_id=wc.run_id, + init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES], + ), + output: + run=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/mec"), + obs=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/input_obs"), + ekf_file=OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/input_obs/ekfSYNOP.nc", + fc_file=OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/fc_{init_time}", + log: + OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/prepare_mec_input.log", + shell: + """ + ( + set -euo pipefail + shopt -s nullglob + + mkdir -p {output.run} {output.obs} + src_dir="{input.src_dir}" + fc_file="{output.fc_file}" + + # extract YYYYMM from init_time (which is YYYYMMDDHHMM) + init="{wildcards.init_time}" + ym="${{init:0:6}}" + ymdh="${{init:0:10}}" + echo "init time: ${{init}}" + + # concatenate all grib files in src_dir into a single file fc_file + echo "grib files processed:" + files=( "$src_dir"/20*.grib ) + if (( ${{#files[@]}} )); then + printf '%s\n' "${{files[@]}}" + cat "${{files[@]}}" > "$fc_file" + else + echo "WARNING: no grib files found in $src_dir" >&2 + fi + + # collect observations (ekfSYNOP) and/or (monSYNOP from DWD; includes precip) files + cp /store_new/mch/msopr/osm/KENDA-1/EKF/${{ym}}/ekfSYNOP_${{init}}00.nc {output.ekf_file} + 
cp /scratch/mch/paa/mec/MEC_ML_input/monFiles2020/hpc/uwork/swahl/temp/feedback/monSYNOP.${{init:0:10}} {output.obs}/monSYNOP.nc + + ) > {log} 2>&1 + """ + + +# link_mec_input: create the input_mod dir with symlinks to all fc files from all source inits +rule link_mec_input: + input: + # list of source fc files produced by prepare_mec_input for each init in the window + fc_files=lambda wc: [ + OUT_ROOT / f"data/runs/{wc.run_id}/{t}/mec/fc_{t}" + for t in init_times_for_mec(wc) + ], + output: + # own the final input_mod directory for this init (and its contents) + mod=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/input_mod"), + log: + OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/link_mec_input.log", + shell: + """ + ( + set -euo pipefail + + mkdir -p {output.mod} + cd {output.mod}/../../.. + + # create symlinks for each source init into this init's input_mod + for src in {input.fc_files}; do + src_basename=$(basename "$src") + echo "Processing source fc file: $src_basename" + one_init_time="${{src_basename: -12}}" + realpath_src=$(realpath -m "$PWD/$one_init_time/mec/") + + echo "Linking $realpath_src/$src_basename to {wildcards.init_time}/mec/input_mod/$src_basename" + ln -s "$realpath_src/$src_basename" {wildcards.init_time}/mec/input_mod/"$src_basename" + done + ) > {log} 2>&1 + """ + + +rule generate_mec_namelist: + localrule: True + input: + script="workflow/scripts/generate_mec_namelist.py", + template="resources/mec/namelist.jinja2", + mod_dir=directory(rules.link_mec_input.output.mod), + output: + namelist=OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/namelist", + params: + steps=lambda wc: RUN_CONFIGS[wc.run_id]["steps"], + shell: + """ + uv run {input.script} \ + --steps {params.steps} \ + --init_time {wildcards.init_time} \ + --template {input.template} \ + --namelist {output.namelist} + """ + + +rule run_mec: + input: + namelist=rules.generate_mec_namelist.output.namelist, + run_dir=directory(rules.prepare_mec_input.output.run), + 
mod_dir=directory(rules.link_mec_input.output.mod),
    # one feedback (fdbk) file per init time, named to NWP conventions
    output:
        fdbk_file=OUT_ROOT / "data/runs/{run_id}/fdbk_files/verSYNOP_{init_time}00.nc",
    # init_time is always a 12-digit YYYYMMDDHHMM stamp
    wildcard_constraints:
        init_time=r"\d{12}",
    log:
        OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/run_mec.log",
    shell:
        """
        (
        set -euo pipefail

        # Run MEC inside sarus container
        # Note: pull command currently needed only once to download the container
        sarus pull container-registry.meteoswiss.ch/mecctr/mec-container:0.1.0-main
        abs_run_dir=$(realpath {input.run_dir})
        abs_mod_root=$(realpath {input.run_dir}/../..) # two levels up (so that all links are mounted to the container)

        # build mount options in a variable for readability
        MOUNTS="\
        --mount=type=bind,source=$abs_run_dir,destination=/src/bin2 \
        --mount=type=bind,source=$abs_mod_root,destination=$abs_mod_root,readonly \
        --mount=type=bind,source=/oprusers/osm/opr.emme/data/,destination=/oprusers/osm/opr.emme/data/ \
        "

        # run container (split over multiple lines for readability)
        sarus run $MOUNTS container-registry.meteoswiss.ch/mecctr/mec-container:0.1.0-main

        # Run MEC using local executable (Alternative to sarus container)
        #cd {input.run_dir}
        #export LM_HOST=balfrin-ln003
        #source /oprusers/osm/opr.emme/abs/mec.env
        #./mec > ./mec_out.log 2>&1

        # copy the output file to the final location for the Feedback files plus renaming to
        # match NWP naming conventions (verSYNOP_YYYYMMDDHHMMSS.nc)
        mkdir -p {input.run_dir}/../../fdbk_files
        cp {input.run_dir}/verSYNOP.nc {input.run_dir}/../../fdbk_files/verSYNOP_{wildcards.init_time}00.nc
        ) > {log} 2>&1
        """

diff --git a/workflow/scripts/generate_mec_namelist.py b/workflow/scripts/generate_mec_namelist.py
new file mode 100644
index 00000000..0bfe145a
--- /dev/null
+++ b/workflow/scripts/generate_mec_namelist.py
@@ -0,0 +1,71 @@
import logging
from argparse import ArgumentParser
from datetime import datetime
from pathlib import Path

import jinja2

LOG = 
logging.getLogger(__name__) +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) + + +def _parse_steps(steps: str) -> int: + # check that steps is in the format "start/stop/step" + if "/" not in steps: + raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") + if len(steps.split("/")) != 3: + raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") + start, end, step = map(int, steps.split("/")) + return list(range(start, end + 1, step)) + + +def main(args): + # Include stop_h (inclusive). Produce strings like 0000,0600,1200,...,12000 + lead_hours = args.steps + leadtimes = ",".join(f"{h:02d}00" for h in lead_hours) + + # Render template with init_time and computed leadtimes + context = {"init_time": f"{args.init_time:%Y%m%d%H%M}", "leadtimes": leadtimes} + template_path = Path(args.template) + env = jinja2.Environment(loader=jinja2.FileSystemLoader(str(template_path.parent))) + template = env.get_template(template_path.name) + namelist = template.render(**context) + # Ensure file ends with a newline (prevent editors/tools from removing final RETURN) + if not namelist.endswith("\n"): + namelist += "\n" + LOG.info("MEC namelist created:\n%s", namelist) + + out_path = Path(str(args.namelist)) + out_path.parent.mkdir(parents=True, exist_ok=True) + with out_path.open("w", encoding="utf-8") as f: + f.write(namelist) + + +if __name__ == "__main__": + parser = ArgumentParser() + + parser.add_argument("--steps", type=_parse_steps, default="0/120/6") + + parser.add_argument( + "--init_time", + type=lambda s: datetime.strptime(s, "%Y%m%d%H%M"), + default="202010010000", + help="Valid time for the data in ISO format.", + ) + + parser.add_argument( + "--template", + type=str, + ) + + parser.add_argument( + "--namelist", + type=str, + help="Anything useful", + ) + + args = parser.parse_args() + + main(args)