-
Notifications
You must be signed in to change notification settings - Fork 0
Integration of MEC workflow #110
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
c1375ab
9f608f2
e82bd94
c3ab651
7512d96
13301a5
e722e5f
3d9e3c1
918913f
179eb4d
e791a30
d197712
9568987
128eb91
6315afc
e028f59
5406777
8d01490
b7b1311
04c4cf1
b1959dc
23c9599
804455a
f793d85
3839476
5b58b7a
292878d
09f06da
6776572
5d381f1
acce2f7
f9a5889
99753e5
5fa2a34
4d7191b
87a4d07
8e87ea2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| !============================================================================== | ||
| ! namelist template for MEC | ||
| !============================================================================== | ||
|
|
||
| !=================== | ||
| ! general parameters | ||
| !=================== | ||
| &run | ||
| method = 'GMESTAT' ! Model Equivalent Calculator | ||
| model = 'ML' ! forecast model. One of "COSMO" "ICON" "ML" | ||
| input = './input_mod' ! input data path | ||
| data = '/oprusers/osm/opr.emme/data/' ! data path for auxiliary data | ||
| obsinput = './input_obs' ! observation input data path | ||
| output = '.' ! output data to working directory | ||
| time_ana = {{ init_time }}00 ! analysis date YYYYMMDDHHMMSS | ||
| read_fields = 'ps u t v q geof t2m td2m u_10m v_10m' | ||
| grib_edition = 2 | ||
| grib_library = 2 ! GRIB-API used: 1=GRIBEX 2=GRIB2-API | ||
| cosmo_refatm = 2 ! reference atmosphere to be used for COSMO: 1 or 2 | ||
| fc_hours = 0 ! Default is 3h. Has to be set to 0 if one wants to verify +0h leadtime | ||
| nproc1 = 1 | ||
| nproc2 = 1 | ||
| / | ||
|
|
||
| !=============================== | ||
| ! observation related parameters | ||
| !=============================== | ||
| &observations | ||
| !--------------------------------------------------- | ||
| ! read from CDFIN files (if not set use mon/cof/ekf) | ||
| !--------------------------------------------------- | ||
| read_cdfin = F ! (F): don't read COSMO CDFIN files; get obs from ekf | ||
| vint_lin_t = T ! linear vertical interpolation for temperature | ||
| vint_lin_z = T ! linear vertical interpolation for geopotential | ||
| vint_lin_uv = T ! linear vertical interpolation for wind | ||
| ptop_lapse = 850. | ||
| pbot_lapse = 950. | ||
| ! int_nn = T ! horizontal interpolation: nearest neighbor | ||
| / | ||
|
|
||
| !==================== | ||
| ! Ensemble parameters | ||
| !==================== | ||
| &ENKF | ||
| k_enkf = 0 ! ensemble size (0 for det. run) | ||
| det_run = 1 ! set to 1 for deterministic run, 0 for ensemble | ||
| / | ||
|
|
||
| !================================ | ||
| ! Verification related parameters | ||
| !================================ | ||
| &veri_obs | ||
| obstypes = "SYNOP" ! "SYNOP TEMP" | ||
| fc_times = {{ leadtimes }} ! forecast lead time at reference (hhmm) 0000,1200,2400,... | ||
| prefix_in = 'mon' ! prefix for input files. ekf or mon | ||
| prefix_out = 'ver' | ||
| rm_old = 2 ! overwrite entries in verification file ? | ||
| fc_file = 'fc__FCR_TIME_00' ! template for forecast file name | ||
| time_range = 1 | ||
| ekf_concat = F | ||
| ref_runtype = 'any' ! accept any runtype for the reference state | ||
| / | ||
|
|
||
| &report | ||
| time_b = -0029 ! (hhmm, inclusive) | ||
| time_e = 0030 ! (hhmm, exclusive) | ||
| / | ||
|
|
||
| &cosmo_obs | ||
| lcd187 = .true. ! use ground based wind lidar obs | ||
| verification_start = -29 ! (min, inclusive) | ||
| verification_end = 30 ! (min, inclusive) | ||
| / | ||
| &synop_obs | ||
| version = 1 | ||
| / |
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we move the |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,213 @@ | ||
| from pathlib import Path | ||
| from datetime import datetime, timedelta | ||
|
|
||
| EXPERIMENT_HASH = short_hash_config() | ||
|
|
||
|
|
||
| # TODO: merge _parse_steps from generate_mec_namelist.py and verif_single_init.py? | ||
| def _parse_steps(steps: str) -> list[int]: | ||
| # check that steps is in the format "start/stop/step" | ||
| if "/" not in steps: | ||
| raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") | ||
| if len(steps.split("/")) != 3: | ||
| raise ValueError(f"Expected steps in format 'start/stop/step', got '{steps}'") | ||
| start, end, step = map(int, steps.split("/")) | ||
| return list(range(start, end + 1, step)) | ||
|
|
||
|
|
||
| # TODO: merge with _ref_times from common.smk? | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not merged but perhaps could be moved to |
||
def _reftimes_mec():
    """
    Build the list of reference times used for MEC.

    If ``config["dates"]`` is a list, every entry is parsed as
    ``%Y-%m-%dT%H:%M`` and returned without further adjustment.
    Otherwise the configured start is moved forward by the largest lead
    time (in hours) of the first run's forecaster steps, and reference
    times are generated from that shifted start up to and including the
    configured end, spaced by the configured frequency.
    """
    stamp_format = "%Y-%m-%dT%H:%M"
    dates_cfg = config["dates"]

    # An explicit list of dates is taken as given.
    if isinstance(dates_cfg, list):
        return [datetime.strptime(stamp, stamp_format) for stamp in dates_cfg]

    first_init = datetime.strptime(dates_cfg["start"], stamp_format)
    lead_hours = _parse_steps(config["runs"][0]["forecaster"]["steps"])
    # The first MEC reference time lies one maximum lead time after the start.
    current = first_init + timedelta(hours=max(lead_hours))
    last = datetime.strptime(dates_cfg["end"], stamp_format)
    interval = _parse_timedelta(dates_cfg["frequency"])

    ref_times = []
    while current <= last:
        ref_times.append(current)
        current += interval
    return ref_times
|
|
||
|
|
||
| REFTIMES_MEC = _reftimes_mec() | ||
|
|
||
|
|
||
def init_times_for_mec(wc):
    """
    List the init times (formatted YYYYMMDDHHMM) covering the window from
    ``wc.init_time`` minus the lead time up to and including
    ``wc.init_time``, spaced by the configured date frequency.
    """
    stamp_format = "%Y%m%d%H%M"
    window_end = datetime.strptime(wc.init_time, stamp_format)

    # get_leadtime(wc) is expected to give something like "48h"
    lead_hours = int(str(get_leadtime(wc)).rstrip("h"))
    interval = _parse_timedelta(config["dates"]["frequency"])

    # walk forward from the start of the window to the init time itself
    current = window_end - timedelta(hours=lead_hours)
    stamps = []
    while current <= window_end:
        stamps.append(current.strftime(stamp_format))
        current += interval
    return stamps
|
|
||
|
|
||
| # prepare_mec_input: setup run dir, gather observations and model data in the run dir for the actual init time | ||
| rule prepare_mec_input: | ||
| input: | ||
| src_dir=OUT_ROOT / "data/runs/{run_id}/{init_time}/grib", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no rule producing this as an output, so this should at the very least trigger some warnings from Snakemake. You could specify it as a parameter instead. |
||
| inference_ok=lambda wc: expand( | ||
| rules.execute_inference.output.okfile, | ||
| run_id=wc.run_id, | ||
| init_time=[t.strftime("%Y%m%d%H%M") for t in REFTIMES], | ||
| ), | ||
| output: | ||
| run=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/mec"), | ||
| obs=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/input_obs"), | ||
| ekf_file=OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/input_obs/ekfSYNOP.nc", | ||
| fc_file=OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/fc_{init_time}", | ||
| log: | ||
| OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/prepare_mec_input.log", | ||
| shell: | ||
| """ | ||
| ( | ||
| set -euo pipefail | ||
| shopt -s nullglob | ||
|
|
||
| mkdir -p {output.run} {output.obs} | ||
| src_dir="{input.src_dir}" | ||
| fc_file="{output.fc_file}" | ||
|
|
||
| # extract YYYYMM from init_time (which is YYYYMMDDHHMM) | ||
| init="{wildcards.init_time}" | ||
| ym="${{init:0:6}}" | ||
| ymdh="${{init:0:10}}" | ||
| echo "init time: ${{init}}" | ||
|
|
||
| # concatenate all grib files in src_dir into a single file fc_file | ||
| echo "grib files processed:" | ||
| files=( "$src_dir"/20*.grib ) | ||
| if (( ${{#files[@]}} )); then | ||
| printf '%s\n' "${{files[@]}}" | ||
| cat "${{files[@]}}" > "$fc_file" | ||
| else | ||
| echo "WARNING: no grib files found in $src_dir" >&2 | ||
| fi | ||
|
Comment on lines
+97
to
+105
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this really necessary? We are effectively duplicating the entire output data. |
||
|
|
||
| # collect observations (ekfSYNOP) and/or (monSYNOP from DWD; includes precip) files | ||
| cp /store_new/mch/msopr/osm/KENDA-1/EKF/${{ym}}/ekfSYNOP_${{init}}00.nc {output.ekf_file} | ||
| cp /scratch/mch/paa/mec/MEC_ML_input/monFiles2020/hpc/uwork/swahl/temp/feedback/monSYNOP.${{init:0:10}} {output.obs}/monSYNOP.nc | ||
|
|
||
| ) > {log} 2>&1 | ||
| """ | ||
|
|
||
|
|
||
| # link_mec_input: create the input_mod dir with symlinks to all fc files from all source inits | ||
rule link_mec_input:
    # Populate <run_id>/<init_time>/mec/input_mod with one symlink per
    # forecast file in the window returned by init_times_for_mec(wc).
    #
    # The shell block changes into the directory three levels above
    # input_mod (i.e. data/runs/<run_id>), takes the last 12 characters of
    # each source file's basename as that file's init time, resolves the
    # absolute path of <that init>/mec/, and links the file into this
    # init's input_mod directory. All output goes to the rule's log file.
    input:
        # list of source fc files produced by prepare_mec_input for each init in the window
        fc_files=lambda wc: [
            OUT_ROOT / f"data/runs/{wc.run_id}/{t}/mec/fc_{t}"
            for t in init_times_for_mec(wc)
        ],
    output:
        # own the final input_mod directory for this init (and its contents)
        mod=directory(OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/input_mod"),
    log:
        OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/link_mec_input.log",
    # NOTE(review): only the basename of each input path is used; the link
    # target is rebuilt from $PWD after the cd, so the links are absolute.
    shell:
        """
        (
        set -euo pipefail

        mkdir -p {output.mod}
        cd {output.mod}/../../..

        # create symlinks for each source init into this init's input_mod
        for src in {input.fc_files}; do
            src_basename=$(basename "$src")
            echo "Processing source fc file: $src_basename"
            one_init_time="${{src_basename: -12}}"
            realpath_src=$(realpath -m "$PWD/$one_init_time/mec/")

            echo "Linking $realpath_src/$src_basename to {wildcards.init_time}/mec/input_mod/$src_basename"
            ln -s "$realpath_src/$src_basename" {wildcards.init_time}/mec/input_mod/"$src_basename"
        done
        ) > {log} 2>&1
        """
|
|
||
|
|
||
rule generate_mec_namelist:
    # Render the MEC namelist for one run/init time by calling
    # generate_mec_namelist.py (via `uv run`) with the run's steps, the
    # init time wildcard, the Jinja2 template and the output path.
    localrule: True
    input:
        script="workflow/scripts/generate_mec_namelist.py",
        template="resources/mec/namelist.jinja2",
        # Not referenced in the shell command below; listing it makes this
        # rule depend on link_mec_input having produced input_mod.
        # NOTE(review): directory() is normally an output flag — confirm it
        # is needed (or harmless) on an input.
        mod_dir=directory(rules.link_mec_input.output.mod),
    output:
        namelist=OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/namelist",
    params:
        # presumably a "start/stop/step" string — verify against the script
        steps=lambda wc: RUN_CONFIGS[wc.run_id]["steps"],
    shell:
        """
        uv run {input.script} \
        --steps {params.steps} \
        --init_time {wildcards.init_time} \
        --template {input.template} \
        --namelist {output.namelist}
        """
|
|
||
|
|
||
| rule run_mec: | ||
| input: | ||
| namelist=rules.generate_mec_namelist.output.namelist, | ||
| run_dir=directory(rules.prepare_mec_input.output.run), | ||
| mod_dir=directory(rules.link_mec_input.output.mod), | ||
| output: | ||
| fdbk_file=OUT_ROOT / "data/runs/{run_id}/fdbk_files/verSYNOP_{init_time}00.nc", | ||
| wildcard_constraints: | ||
| init_time=r"\d{12}", | ||
| log: | ||
| OUT_ROOT / "data/runs/{run_id}/{init_time}/mec/run_mec.log", | ||
| shell: | ||
| """ | ||
| ( | ||
| set -euo pipefail | ||
|
|
||
| # Run MEC inside sarus container | ||
| # Note: pull command currently needed only once to download the container | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the pull command could then be factored out into a separate rule that is run only once before launching all the parallel MEC jobs |
||
| sarus pull container-registry.meteoswiss.ch/mecctr/mec-container:0.1.0-main | ||
| abs_run_dir=$(realpath {input.run_dir}) | ||
| abs_mod_root=$(realpath {input.run_dir}/../..) # two levels up (so that all links are mounted to the container) | ||
|
|
||
| # build mount options in a variable for readability | ||
| MOUNTS="\ | ||
| --mount=type=bind,source=$abs_run_dir,destination=/src/bin2 \ | ||
| --mount=type=bind,source=$abs_mod_root,destination=$abs_mod_root,readonly \ | ||
| --mount=type=bind,source=/oprusers/osm/opr.emme/data/,destination=/oprusers/osm/opr.emme/data/ \ | ||
| " | ||
|
|
||
| # run container (split over multiple lines for readability) | ||
| sarus run $MOUNTS container-registry.meteoswiss.ch/mecctr/mec-container:0.1.0-main | ||
|
|
||
| # Run MEC using local executable (Alternative to sarus container) | ||
| #cd {input.run_dir} | ||
| #export LM_HOST=balfrin-ln003 | ||
| #source /oprusers/osm/opr.emme/abs/mec.env | ||
| #./mec > ./mec_out.log 2>&1 | ||
|
|
||
| # copy the output file to the final location for the Feedback files plus renaming to | ||
| # match NWP naming conventions (verSYNOP_YYYYMMDDHHMMSS.nc) | ||
| mkdir -p {input.run_dir}/../../fdbk_files | ||
| cp {input.run_dir}/verSYNOP.nc {input.run_dir}/../../fdbk_files/verSYNOP_{wildcards.init_time}00.nc | ||
| ) > {log} 2>&1 | ||
| """ | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to have a closer look at this; I don't understand why this problem would appear with your changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without it snakemake complains. Thanks for having a closer look at it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was this clarified?