diff --git a/pyproject.toml b/pyproject.toml index 4b86e7a..a941a30 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -120,7 +120,7 @@ version = { attr = "zppy_interfaces.version.__version__" } [project.scripts] zi-global-time-series = "zppy_interfaces.global_time_series.__main__:main" zi-pcmdi-link-observation = "zppy_interfaces.pcmdi_diags.link_observation:main" -zi-pcmdi-mean-climate = "zppy_interfaces.pcmdi_diags.pcmdi_mean_cimate:main" +zi-pcmdi-mean-climate = "zppy_interfaces.pcmdi_diags.pcmdi_mean_climate:main" zi-pcmdi-variability-modes = "zppy_interfaces.pcmdi_diags.pcmdi_variability_modes:main" zi-pcmdi-enso = "zppy_interfaces.pcmdi_diags.pcmdi_enso:main" zi-pcmdi-synthetic-plots = "zppy_interfaces.pcmdi_diags.pcmdi_synthetic_plots:main" diff --git a/tests/unit/pcmdi_diags/test_pcmdi_mean_climate.py b/tests/unit/pcmdi_diags/test_pcmdi_mean_climate.py index ae2c6d6..df6ff1b 100644 --- a/tests/unit/pcmdi_diags/test_pcmdi_mean_climate.py +++ b/tests/unit/pcmdi_diags/test_pcmdi_mean_climate.py @@ -1,6 +1,6 @@ from typing import List -from zppy_interfaces.pcmdi_diags.pcmdi_mean_cimate import generate_mean_clim_cmds +from zppy_interfaces.pcmdi_diags.pcmdi_mean_climate import generate_mean_clim_cmds def test_generate_mean_clim_cmds(): diff --git a/tests/unit/pcmdi_diags/test_pcmdi_variability_modes.py b/tests/unit/pcmdi_diags/test_pcmdi_variability_modes.py index b3a2d81..8323702 100644 --- a/tests/unit/pcmdi_diags/test_pcmdi_variability_modes.py +++ b/tests/unit/pcmdi_diags/test_pcmdi_variability_modes.py @@ -60,7 +60,7 @@ def test_generate_varmode_cmds(): "v20250923", ) expected = [ - "variability_modes_driver.py -p parameterfile.py --variability_mode mode1 --eofn_mod 1 --eofn_obs 1 --varOBS varOBS --osyear reftyrs --oeyear reftyre --reference_data_name refname --reference_data_path refpath --case_id v20250923", - "variability_modes_driver.py -p parameterfile.py --variability_mode mode2 --eofn_mod 1 --eofn_obs 1 --varOBS varOBS --osyear reftyrs --oeyear reftyre --reference_data_name refname --reference_data_path refpath --case_id v20250923", + 'variability_modes_driver.py -p parameterfile.py --variability_mode mode1 --eofn_mod 1 --eofn_obs 1 --varOBS varOBS --osyear reftyrs --oeyear reftyre --reference_data_name refname --reference_data_path "refpath" --case_id v20250923', + 'variability_modes_driver.py -p parameterfile.py --variability_mode mode2 --eofn_mod 1 --eofn_obs 1 --varOBS varOBS --osyear reftyrs --oeyear reftyre --reference_data_name refname --reference_data_path "refpath" --case_id v20250923', ] assert actual == expected diff --git a/tests/unit/pcmdi_diags/test_synthetic_metrics_plotter.py b/tests/unit/pcmdi_diags/test_synthetic_metrics_plotter.py new file mode 100644 index 0000000..45dfdcd --- /dev/null +++ b/tests/unit/pcmdi_diags/test_synthetic_metrics_plotter.py @@ -0,0 +1,86 @@ +import pandas as pd + +from zppy_interfaces.pcmdi_diags.synthetic_plots import synthetic_metrics_plotter +from zppy_interfaces.pcmdi_diags.synthetic_plots.synthetic_metrics_plotter import ( + drop_vars, + mean_climate_plot_driver, +) + + +def _mean_climate_frame(**values): + data = { + "model": ["CMIP", "E3SM"], + "run": ["r1", "r1"], + "model_run": ["CMIP_r1", "E3SM_r1"], + } + data.update(values) + return pd.DataFrame(data) + + +def test_drop_vars_removes_requested_variables_missing_from_dataframe(): + data_dict, var_names, var_units = drop_vars( + _mean_climate_frame(pr=[1.0, 2.0]), + ["pr", "prw"], + ["mm/day", "kg/m2"], + ) + + assert "prw" not in data_dict.columns + assert var_names == ["pr"] + assert var_units == ["mm/day"] + + +def test_mean_climate_portrait_skips_region_missing_variables(monkeypatch, tmp_path): + captured = {} + + def fake_portrait_metric_plot( + region, + stat, + group, + data_dict, + stat_name, + model_name, + var_list, + model_list, + out_path, + fig_format, + ): + captured["region"] = region + captured["var_list"] = var_list + captured["data_dict"] = data_dict + + monkeypatch.setattr( + synthetic_metrics_plotter, + "portrait_metric_plot", + fake_portrait_metric_plot, + ) + + metric_dict = { + "type": ["portrait"], + "region": ["ocean"], + "season": ["djf", "mam", "jja", "son"], + "name": "Mean Bias", + } + df_dict = { + "djf": {"ocean": _mean_climate_frame(pr=[1.0, 2.0], prw=[3.0, 4.0])}, + "mam": {"ocean": _mean_climate_frame(pr=[1.0, 2.0])}, + "jja": {"ocean": _mean_climate_frame(pr=[1.0, 2.0], prw=[3.0, 4.0])}, + "son": {"ocean": _mean_climate_frame(pr=[1.0, 2.0], prw=[3.0, 4.0])}, + } + + mean_climate_plot_driver( + metric="mean_climate", + stat="mae_xy", + regions=["ocean"], + model_name=["E3SM"], + metric_dict=metric_dict, + df_dict=df_dict, + var_list=["pr", "prw"], + var_unit_list=["mm/day", "kg/m2"], + save_data=False, + out_path=str(tmp_path), + fig_format="png", + ) + + assert captured["region"] == "ocean" + assert captured["var_list"] == ["pr"] + assert all(values.shape == (1, 2) for values in captured["data_dict"].values()) diff --git a/tests/unit/pcmdi_diags/test_viewer.py b/tests/unit/pcmdi_diags/test_viewer.py index 0e72222..01cc21d 100644 --- a/tests/unit/pcmdi_diags/test_viewer.py +++ b/tests/unit/pcmdi_diags/test_viewer.py @@ -1,6 +1,126 @@ -from zppy_interfaces.pcmdi_diags.viewer import safe_join +from zppy_interfaces.pcmdi_diags.viewer import ( + CMVARGroupBuilder, + generate_cmvar_table, + generate_data_html, + generate_emovs_table, + safe_join, +) def test_safe_join(): assert safe_join("a", "b") == "a/b" assert safe_join("a/", "b") == "a/b" + + +def test_coupled_mov_compose_eof_uses_mode_eof(tmp_path): + fig_dir = tmp_path / "figures" + diag_dir = tmp_path / "viewer" + yearly_dir = fig_dir / "MOV_compose" / "yearly" + yearly_dir.mkdir(parents=True) + diag_dir.mkdir() + + (yearly_dir / "MOV_compose_AMO_yearly_eof1.png").touch() + (yearly_dir / "MOV_compose_PDO_yearly_eof1.png").touch() + (yearly_dir / "MOV_compose_NPGO_yearly_eof2.png").touch() + + builder = CMVARGroupBuilder() + + amo_row = builder.generate_mcpl_row("AMO", str(diag_dir), str(fig_dir)) + amo_compose_cell = amo_row[2]["content"] + assert "AMO (SST)" in amo_row[0]["content"] + assert "MOV_compose_AMO_yearly_eof1.png" in amo_compose_cell + + pdo_row = builder.generate_mcpl_row("PDO", str(diag_dir), str(fig_dir)) + pdo_compose_cell = pdo_row[2]["content"] + assert "PDO (SST)" in pdo_row[0]["content"] + assert "MOV_compose_PDO_yearly_eof1.png" in pdo_compose_cell + assert "MOV_compose_PDO_yearly_cbf.png" not in pdo_compose_cell + + npgo_row = builder.generate_mcpl_row("NPGO", str(diag_dir), str(fig_dir)) + npgo_compose_cell = npgo_row[2]["content"] + assert "NPGO (SST)" in npgo_row[0]["content"] + assert "MOV_compose_NPGO_yearly_eof2.png" in npgo_compose_cell + assert "MOV_compose_NPGO_yearly_eof1.png" not in npgo_compose_cell + + +def test_coupled_modes_are_normalized_from_config(tmp_path): + fig_dir = tmp_path / "figures" + diag_dir = tmp_path / "viewer" + yearly_dir = fig_dir / "MOV_metric" / "MOV_compose" / "yearly" + yearly_dir.mkdir(parents=True) + diag_dir.mkdir() + + (yearly_dir / "MOV_compose_NPGO_yearly_eof2.png").touch() + + table = generate_cmvar_table( + str(diag_dir), + str(fig_dir), + enso_show=False, + movc_show=True, + movc_modes=" npgo ", + ) + + assert len(table) == 1 + assert "NPGO (SST)" in table[0][0]["content"] + assert "MOV_compose_NPGO_yearly_eof2.png" in table[0][2]["content"] + assert "MOV_compose_NPGO_yearly_eof1.png" not in table[0][2]["content"] + + +def test_emovs_table_defaults_to_atmospheric_modes(tmp_path): + table = generate_emovs_table(str(tmp_path / "viewer"), str(tmp_path / "figures")) + + first_cells = [row[0]["content"] for row in table if row and "rowspan" in row[0]] + assert first_cells == [ + "NAM (PSL)", + "PNA (PSL)", + "NPO (PSL)", + "NAO (PSL)", + "SAM (PSL)", + "PSA1 (PSL)", + "PSA2 (PSL)", + ] + + +def test_emovs_modes_are_normalized_from_config(tmp_path): + fig_dir = tmp_path / "figures" + compose_dir = fig_dir / "MOV_metric" / "MOV_compose" / "DJF" + compose_dir.mkdir(parents=True) + (compose_dir / "MOV_compose_NPO_DJF_eof2.png").touch() + (compose_dir / "MOV_compose_PSA2_DJF_eof3.png").touch() + + table = generate_emovs_table( + str(tmp_path / "viewer"), + str(fig_dir), + modes=" npo, psa2 ", + ) + + first_cells = [row[0]["content"] for row in table if row and "rowspan" in row[0]] + assert first_cells == ["NPO (PSL)", "PSA2 (PSL)"] + assert "MOV_compose_NPO_DJF_eof2.png" in table[1][1]["content"] + assert "MOV_compose_PSA2_DJF_eof3.png" in table[10][1]["content"] + + +def test_generate_data_html_creates_out_dir_and_keeps_string_lists(tmp_path): + template_dir = tmp_path / "templates" + out_dir = tmp_path / "viewer" + template_dir.mkdir(parents=True) + (template_dir / "data_template.html").write_text( + "{% for section in sections %}" + "{% for row in section.rows %}{{ row.description }}\n{% endfor %}" + "{% endfor %}" + ) + + out_path = generate_data_html( + { + "template_dir": str(template_dir), + "out_dir": str(out_dir), + "clim_viewer": True, + "clim_vars": "pr,tas", + } + ) + + html = (out_dir / "diag_data.html").read_text() + assert out_path == str(out_dir / "diag_data.html") + assert out_dir.is_dir() + assert "pr,tas" in html + assert "p, r" not in html diff --git a/zppy_interfaces/pcmdi_diags/link_observation.py b/zppy_interfaces/pcmdi_diags/link_observation.py index 897ff34..e0ab206 100644 --- a/zppy_interfaces/pcmdi_diags/link_observation.py +++ b/zppy_interfaces/pcmdi_diags/link_observation.py @@ -44,7 +44,8 @@ def __init__( self.obs_sets = obs_sets self.ts_dir_ref_source = ts_dir_ref_source self.obstmp_dir = obstmp_dir - self.obs_dic = json.load(open(obs_alias_file)) + with open(obs_alias_file) as _f: + self.obs_dic = json.load(_f) self.altobs_dic = altobs_dic def _resolve_obs_file(self, varin, obsid): diff --git a/zppy_interfaces/pcmdi_diags/pcmdi_enso.py b/zppy_interfaces/pcmdi_diags/pcmdi_enso.py index 741077e..cc450ca 100644 --- a/zppy_interfaces/pcmdi_diags/pcmdi_enso.py +++ b/zppy_interfaces/pcmdi_diags/pcmdi_enso.py @@ -3,6 +3,7 @@ import json import os import re +import shutil import sys import time from collections import OrderedDict @@ -25,7 +26,10 @@ # Classes ##################################################################### class ENSOParameters(object): def __init__(self, args: Dict[str, str]): - self.enso_groups: str = args["enso_groups"] + enso_groups = args.get("enso_groups") + if not enso_groups: + raise ValueError("--enso_groups is required but was not provided.") + self.enso_groups: str = enso_groups class EnsoDiagnosticsCollector: @@ -34,6 +38,11 @@ def __init__( ): self.fig_format = fig_format self.refname = refname + if len(model_name_parts) != 4: + raise ValueError( + f"model_name must have 4 dot-separated parts (mip.exp.model.relm), " + f"got {len(model_name_parts)}: {model_name_parts}" + ) self.mip, self.exp, self.model, self.relm = model_name_parts self.case_id = case_id self.model_name = f"{self.mip}.{self.exp}.{self.model}_{self.relm}" @@ -45,17 +54,32 @@ def __init__( def collect_figures(self, groups) -> bool: logger.info("Entering EnsoDiagnosticsCollector.collect_figures") success: bool = True + for fset, (subdir, pattern) in self.fig_sets.items(): logger.info(f"Processing {fset}, ({subdir}, {pattern})") fdir = self.input_dir.replace("%(output_type)", subdir) logger.info(f"Processing fdir={fdir}") - found_groups: List[str] = os.listdir(fdir) - if sorted(groups) != sorted(found_groups): + + if not os.path.isdir(fdir): + logger.error(f"Expected output directory does not exist: {fdir}") + success = False + continue + + found_groups: List[str] = [ + name + for name in os.listdir(fdir) + if os.path.isdir(os.path.join(fdir, name)) + ] + missing_groups = sorted(set(groups) - set(found_groups)) + + if missing_groups: logger.error( - f"Groups mismatch: expected {sorted(groups)}, found {sorted(found_groups)} in {fdir}" + f"Missing expected group directories: {missing_groups}; " + f"found {sorted(found_groups)} in {fdir}" ) success = False continue + for group in groups: logger.info(f"Processing group={group}") template = os.path.join(fdir, group, f"{pattern}.{self.fig_format}") @@ -65,20 +89,48 @@ def collect_figures(self, groups) -> bool: fpaths = sorted(glob.glob(template)) if not fpaths: + group_dir = os.path.join(fdir, group) + dir_contents = ( + os.listdir(group_dir) + if os.path.isdir(group_dir) + else "" + ) logger.error( - f"fpaths={fpaths}, self.input_dir={self.input_dir}, template={os.path.abspath(template)}, files in template={os.listdir(os.path.join(fdir, group))}" + f"No figures found. input_dir={self.input_dir}, " + f"template={os.path.abspath(template)}, " + f"files in group dir={dir_contents}" ) success = False + for fpath in fpaths: logger.info(f"Processing fpath={fpath}") - tail = fpath.split("/")[-1].split(f"{self.model}_{self.relm}")[-1] + fname = os.path.basename(fpath) + marker = f"{self.model}_{self.relm}" + + if marker in fname: + tail = fname.split(marker, 1)[-1] + outfile = f"{group}{tail}" + else: + logger.warning( + f"Expected '{marker}' in filename '{fname}'; " + "using full filename as output." + ) + outfile = fname + outpath = os.path.join( self.output_dir.replace("%(group_type)", fset), group ) logger.info(f"outpath={outpath}") os.makedirs(outpath, exist_ok=True) - outfile = f"{group}{tail}" - os.rename(fpath, os.path.join(outpath, outfile)) + + dest = os.path.join(outpath, outfile) + if os.path.isdir(dest): + raise IsADirectoryError(f"Destination is a directory: {dest}") + if os.path.exists(dest): + logger.warning(f"Destination already exists, replacing: {dest}") + os.remove(dest) + shutil.move(fpath, dest) + return success def collect_metrics(self) -> bool: @@ -88,14 +140,19 @@ def collect_metrics(self) -> bool: fpaths = sorted(glob.glob(os.path.join(inpath, "*/*.json"))) if not fpaths: + dir_contents = ( + os.listdir(inpath) if os.path.isdir(inpath) else "" + ) logger.error( - f"fpaths={fpaths}, self.input_dir={self.input_dir}, inpath={os.path.abspath(inpath)}, files in inpath={os.listdir(inpath)}" + f"No metrics JSON files found. input_dir={self.input_dir}, " + f"inpath={os.path.abspath(inpath)}, contents={dir_contents}" ) success = False + for fpath in fpaths: logger.info(f"Processing fpath={fpath}") - refmode = fpath.split("/")[-2] - reffile = fpath.split("/")[-1] + refmode = os.path.basename(os.path.dirname(fpath)) + reffile = os.path.basename(fpath) outpath = os.path.join( self.output_dir.replace("%(group_type)", "metrics_data"), self.diag_metric, @@ -112,7 +169,52 @@ def collect_metrics(self) -> bool: if "diveDown" in reffile else base_filename ) - os.rename(fpath, os.path.join(outpath, outfile)) + dest = os.path.join(outpath, outfile) + + with open(fpath) as _f: + data = json.load(_f) + + try: + for _model, _members in data["RESULTS"]["model"].items(): + for _member, _entry in _members.items(): + value_block = _entry.get("value", {}) + incomplete = [ + m for m, v in value_block.items() if not v.get("metric") + ] + if incomplete: + logger.warning( + f"{reffile}: dropping {len(incomplete)} incomplete " + f"metric(s) with empty 'metric' dict: {incomplete}" + ) + for m in incomplete: + del value_block[m] + except (KeyError, AttributeError) as e: + logger.warning(f"Could not prune incomplete metrics in {fpath}: {e}") + + if os.path.isdir(dest): + raise IsADirectoryError(f"Destination is a directory: {dest}") + + if os.path.exists(dest): + logger.warning(f"Destination already exists, replacing: {dest}") + + tmp_dest = f"{dest}.tmp" + try: + with open(tmp_dest, "w") as _f: + json.dump( + data, + _f, + indent=4, + separators=(",", ": "), + sort_keys=False, + ) + os.replace(tmp_dest, dest) + except Exception: + if os.path.exists(tmp_dest): + os.remove(tmp_dest) + raise + + os.remove(fpath) + return success def collect_diags(self) -> bool: @@ -122,14 +224,18 @@ def collect_diags(self) -> bool: fpaths = sorted(glob.glob(os.path.join(inpath, "*/*.nc"))) if not fpaths: + dir_contents = ( + os.listdir(inpath) if os.path.isdir(inpath) else "" + ) logger.error( - f"fpaths={fpaths}, self.input_dir={self.input_dir}, inpath={os.path.abspath(inpath)}, files in inpath={os.listdir(inpath)}" + f"No diagnostic NetCDF files found. input_dir={self.input_dir}, " + f"inpath={os.path.abspath(inpath)}, contents={dir_contents}" ) success = False for fpath in fpaths: logger.info(f"Processing fpath={fpath}") - refmode = fpath.split("/")[-2] - reffile = fpath.split("/")[-1] + refmode = os.path.basename(os.path.dirname(fpath)) + reffile = os.path.basename(fpath) outpath = os.path.join( self.output_dir.replace("%(group_type)", "metrics_data"), self.diag_metric, @@ -138,7 +244,14 @@ def collect_diags(self) -> bool: logger.info(f"outpath={outpath}") os.makedirs(outpath, exist_ok=True) - os.rename(fpath, os.path.join(outpath, reffile)) + dest = os.path.join(outpath, reffile) + if os.path.isdir(dest): + raise IsADirectoryError(f"Destination is a directory: {dest}") + if os.path.exists(dest): + logger.warning(f"Destination already exists, replacing: {dest}") + os.remove(dest) + shutil.move(fpath, dest) + return success def run(self, groups): @@ -157,8 +270,8 @@ def run(self, groups): # Functions ################################################################### def main(): - logger.error("zi-pcmdi-enso is not yet supported. Exiting.") - sys.exit(1) + # logger.error("zi-pcmdi-enso is not yet supported. Exiting.") + # sys.exit(1) args: Dict[str, str] = _get_args() core_parameters = CoreParameters(args) enso_parameters = ENSOParameters(args) @@ -171,10 +284,12 @@ def main(): build_enso_obsvar_landmask(core_output.obs_dic, core_parameters.variables) # now start enso driver check_enso_input() + normalize_enso_model_catalogue(core_parameters.variables) lstcmd = generate_enso_cmds(enso_parameters.enso_groups, core_parameters.case_id) logger.info( - f"input_template={core_output.input_template}; if the directories based on this template are empty, lstcmd={lstcmd} failed to produce output." + f"input_template={core_output.input_template}; If directories derived from this template are empty, it may indicate that lstcmd did not produce output." ) + if (len(lstcmd) > 0) and core_parameters.multiprocessing: logger.info(f"Running parallel jobs for {lstcmd}") try: @@ -182,7 +297,7 @@ def main(): check_enso_output(results) except RuntimeError as e: logger.error(f"Execution failed: {e}") - raise e + raise elif (len(lstcmd) > 0) and not core_parameters.multiprocessing: logger.info(f"Running serial jobs for {lstcmd}") try: @@ -190,14 +305,19 @@ def main(): check_enso_output(results) except RuntimeError as e: logger.error(f"Execution failed: {e}") - raise e + raise else: logger.info("no jobs to run...") logger.info("successfully finish all jobs....") # time delay to ensure process completely finished time.sleep(5) # Initialize and run collector - obs_dict = json.load(open("obs_catalogue.json")) + with open("obs_catalogue.json") as _f: + obs_dict = json.load(_f) + if not obs_dict: + raise ValueError( + "obs_catalogue.json is empty; cannot determine observation name." + ) obs_name = list(obs_dict.keys())[0] collector = EnsoDiagnosticsCollector( fig_format=core_parameters.figure_format, @@ -207,7 +327,11 @@ def main(): input_dir=core_output.input_template, output_dir=core_output.out_path, ) - enso_groups: List[str] = enso_parameters.enso_groups.split(",") + enso_groups: List[str] = [ + group.strip() + for group in enso_parameters.enso_groups.split(",") + if group.strip() + ] collector.run(enso_groups) @@ -330,30 +454,184 @@ def build_enso_obsvar_landmask( logger.info(f"[INFO] Landmask mapping written to: {output_file}") +def normalize_enso_model_catalogue( + variables: List[str], + catalogue_file: str = "pcmdi_diags/ts_enso_catalogue.json", +) -> None: + """ + Ensure the ENSO model catalogue exposes both logical diagnostic names + and available source-variable names when aliases are used. + + Example: + EAM/EAMxx may provide SST as source variable 'ts', while ENSO + diagnostics may expect logical variable 'sst'. If either 'ts' or + 'sst' appears in the variable list/catalogue, make sure the logical + 'sst' catalogue entry exists and points to the available source data. + """ + if not os.path.exists(catalogue_file): + logger.warning( + f"ENSO model catalogue not found, skipping normalization: {catalogue_file}" + ) + return + + with open(catalogue_file) as f: + catalogue = json.load(f, object_pairs_hook=OrderedDict) + + changed = False + + # Normalize requested variables to simple variable keys. + requested_vars = { + re.split(r"[_-]", var)[0] if "_" in var or "-" in var else var + for var in variables + } + + for logical_var, source_var in ALT_OBS_MAP.items(): + # Only act when this alias is relevant to the requested variables + # or already present in the catalogue. + alias_is_relevant = ( + logical_var in requested_vars + or source_var in requested_vars + or logical_var in catalogue + or source_var in catalogue + ) + + if not alias_is_relevant: + continue + + # Case 1: + # The catalogue already has logical_var, e.g. "sst". + # Make sure it has the expected metadata. + if logical_var in catalogue: + refset = catalogue[logical_var].get("set") + model_name = catalogue[logical_var].get(refset) + + if refset and model_name and model_name in catalogue[logical_var]: + entry = catalogue[logical_var][model_name] + + if entry.get("var_name") != logical_var: + entry["var_name"] = logical_var + changed = True + + if source_var in catalogue and entry.get("var_in_file") not in { + logical_var, + source_var, + }: + entry["var_in_file"] = source_var + changed = True + + continue + + # Case 2: + # The catalogue lacks logical_var, e.g. no "sst", + # but has source_var, e.g. "ts". Add logical_var from source_var. + if source_var not in catalogue: + logger.warning( + f"Cannot add logical ENSO variable '{logical_var}' to catalogue " + f"because source variable '{source_var}' is missing from " + f"{catalogue_file}." + ) + continue + + logger.info( + f"Adding logical ENSO catalogue entry '{logical_var}' using source " + f"variable '{source_var}'." + ) + + catalogue[logical_var] = catalogue[source_var].copy() + + refset = catalogue[logical_var].get("set") + model_name = catalogue[logical_var].get(refset) + + if refset and model_name and model_name in catalogue[logical_var]: + catalogue[logical_var][model_name] = catalogue[logical_var][ + model_name + ].copy() + + entry = catalogue[logical_var][model_name] + entry["var_in_file"] = source_var + entry["var_name"] = logical_var + + old_file_path = entry.get("file_path", "") + old_template = entry.get("template", "") + + # Prefer the symlinked logical filename, e.g. *.sst.*.nc, + # because check_enso_input() creates this link before normalization. + entry["file_path"] = old_file_path.replace( + f".{source_var}.", f".{logical_var}." + ) + entry["template"] = old_template.replace( + f".{source_var}.", f".{logical_var}." + ) + + changed = True + + if changed: + with open(catalogue_file, "w") as f: + json.dump( + catalogue, + f, + indent=4, + sort_keys=False, + separators=(",", ": "), + ) + + logger.info(f"Normalized ENSO model catalogue: {catalogue_file}") + + def check_enso_input(): current_dir: str = os.path.abspath(os.getcwd()) ts_dir: str = os.path.join(current_dir, "ts") + if not os.path.exists(ts_dir): raise FileNotFoundError(f"{ts_dir} (input for enso_driver) does not exist.") + if not os.listdir(ts_dir): raise FileNotFoundError(f"{ts_dir} is empty.") - else: - for obs_var_name, cmip_var_name in ALT_OBS_MAP.items(): - logger.info( - f"Symlinking cmip-standard {cmip_var_name} to observational variable name {obs_var_name}, if present" - ) - found_nc_file = glob.glob(f"{ts_dir}/*.{cmip_var_name}.*.nc") - if found_nc_file: - source_file = found_nc_file[0] - link_name = found_nc_file[0].replace( + + for obs_var_name, cmip_var_name in ALT_OBS_MAP.items(): + logger.info( + f"Symlinking cmip-standard {cmip_var_name} to observational " + f"variable name {obs_var_name}, if present" + ) + + found_nc_file = glob.glob( + os.path.join(ts_dir, f"*.{cmip_var_name}.*.nc") + ) + glob.glob(os.path.join(ts_dir, f"{cmip_var_name}_*.nc")) + + if found_nc_file: + source_file = found_nc_file[0] + source_basename = os.path.basename(source_file) + + if f".{cmip_var_name}." in source_basename: + link_name = source_file.replace( f".{cmip_var_name}.", f".{obs_var_name}." ) + elif source_basename.startswith(f"{cmip_var_name}_"): + link_name = os.path.join( + ts_dir, + source_basename.replace(f"{cmip_var_name}_", f"{obs_var_name}_", 1), + ) + else: + logger.warning( + f"Could not infer symlink name for source file: {source_file}" + ) + link_name = None + + if link_name: + if not os.path.exists(link_name): + os.symlink(source_file, link_name) + else: + logger.info(f"Symlink already exists, skipping: {link_name}") + + found_txt_file = glob.glob(os.path.join(ts_dir, f"{cmip_var_name}_files.txt")) + if found_txt_file: + source_file = found_txt_file[0] + link_name = os.path.join(ts_dir, f"{obs_var_name}_files.txt") + + if not os.path.exists(link_name): os.symlink(source_file, link_name) - found_txt_file = glob.glob(f"{ts_dir}/{cmip_var_name}_files.txt") - if found_txt_file: - source_file = found_txt_file[0] - link_name = f"{ts_dir}/{obs_var_name}_files.txt" - os.symlink(source_file, link_name) + else: + logger.info(f"Symlink already exists, skipping: {link_name}") def generate_enso_cmds( @@ -362,28 +640,20 @@ def generate_enso_cmds( param_file="parameterfile.py", driver_script="enso_driver.py", ): - """ - Generate ENSO driver command-line strings for given metric groups. - - Parameters: - enso_groups_str: Comma-separated list of ENSO metric groups. - case_id: Case identifier. - param_file: Parameter file used by the driver script. - driver_script: ENSO driver script filename. - - Returns: - cmds: List of shell command strings to run. - """ - enso_groups = enso_groups_str.split(",") + enso_groups = [ + group.strip() for group in enso_groups_str.split(",") if group.strip() + ] commands = [ "{} -p {} --metricsCollection {} --case_id {}".format( driver_script, param_file, group, case_id ) for group in enso_groups ] + current_dir: str = os.path.abspath(os.getcwd()) logger.info(f"Commands will be run from current_dir={current_dir}") dir_contents: List[str] = os.listdir(current_dir) + if param_file not in dir_contents: logger.error( f"Parameter file '{param_file}' not found in current directory: {current_dir}" @@ -396,21 +666,30 @@ def generate_enso_cmds( def check_enso_output(results): logger.info("Checking ENSO output.") success: bool = True + for i, (stdout, stderr, return_code) in enumerate(results): logger.info(f"Command {i + 1} finished:") logger.info(f"STDOUT: {stdout}") logger.info(f"STDERR: {stderr}") logger.info(f"Return code: {return_code}") + + if return_code != 0: + logger.error(f"Command {i + 1} failed with return code {return_code}.") + success = False + if not check_vars(stdout): logger.error(f"Command {i + 1} failed to produce expected variables.") success = False + if not check_output_dirs(stdout): logger.error( f"Command {i + 1} failed to produce expected output directories." ) success = False + if not success: raise RuntimeError("ENSO output check failed.") + logger.info("ENSO output check passed.") @@ -422,82 +701,129 @@ def check_vars(stdout: str) -> bool: stdout (str): Standard output from the command execution. Returns: - bool: True if expected variables are found, False otherwise. + bool: True if expected variables are found, or if only optional + process-level variables are missing. False otherwise. """ - success: bool = True - match_object = re.search(r"list_variables:\s*\[(.*?)\]", stdout) - # Special-case "optional" missing variables (these quantities may not be frequently output) - optional_missing = {"ssh", "thf"} - if match_object: - variables_content = match_object.group(1) - # Split by comma and clean up each variable name - requested_variables = [] - for var in variables_content.split(","): - # Remove quotes, whitespace, and extract just the variable name - clean_var = re.sub(r"['\"\s]", "", var.strip()) - if clean_var: # Only care about non-empty strings - requested_variables.append(clean_var) - # Now, check if we actually have data for these variables - current_dir: str = os.path.abspath(os.getcwd()) - variables_missing_data: List[str] = [] - for var in requested_variables: - found_nc_file = glob.glob(f"ts/*.{var}.*.nc") - found_txt_file = glob.glob(f"ts/{var}_files.txt") - if (not found_nc_file) or (not found_txt_file): - variables_missing_data.append(var) - # Check for references - if var in ALT_OBS_MAP: - alt_var = ALT_OBS_MAP[var] - found_nc_file_alt = glob.glob(f"ts/*.{alt_var}.*.nc") - found_txt_file_alt = glob.glob(f"ts/{alt_var}_files.txt") - if found_nc_file_alt or found_txt_file_alt: - logger.error( - f"Found alternative variable '{alt_var}' for '{var}' in {current_dir}/ts. This indicates that the variable derivation/mapping has not been applied correctly." - ) - ts_dir = os.path.join(current_dir, "ts") - if variables_missing_data: - if set(variables_missing_data) <= optional_missing: - # Only ssh/thf are missing → warn but do not fail - logger.warning( - f"Optional variables missing: {variables_missing_data} in directory{ts_dir}" - ) - success = True - else: - # Other variables missing → error and fail - logger.error( - f"Variables missing data: {variables_missing_data} in directory {ts_dir}" + match_object = re.search(r"list_variables\s*[:=]\s*\[(.*?)\]", stdout, re.DOTALL) + + # Allow ENSO diagnostics to continue if only selected variables are missing. + # ssh and thf are rarely available process-level variables and are mainly used + # by selected ENSO process diagnostics, not the core ENSO performance diagnostics. + # + # v4 note: EAMxx does not always output taux and tauy by default, so we also + # treat them as soft-missing variables here to avoid stopping the full ENSO + # workflow when surface wind stress diagnostics are unavailable. + optional_missing = {"ssh", "thf", "taux", "tauy"} + + if not match_object: + logger.error("No variable list found in stdout.") + return False + + variables_content = match_object.group(1) + + requested_variables: List[str] = [] + for var in variables_content.split(","): + clean_var = re.sub(r"['\"\s]", "", var.strip()) + if clean_var: + requested_variables.append(clean_var) + + current_dir: str = os.path.abspath(os.getcwd()) + ts_dir = os.path.join(current_dir, "ts") + + variables_missing_data: List[str] = [] + + for var in requested_variables: + # Support both common filename conventions: + # ts/*..*.nc + # ts/_*.nc + found_nc_file = glob.glob(os.path.join(ts_dir, f"*.{var}.*.nc")) + glob.glob( + os.path.join(ts_dir, f"{var}_*.nc") + ) + found_txt_file = glob.glob(os.path.join(ts_dir, f"{var}_files.txt")) + + if (not found_nc_file) or (not found_txt_file): + variables_missing_data.append(var) + + # Check whether an alternative/source variable exists. + # This may indicate that variable derivation/mapping was not applied. + if var in ALT_OBS_MAP: + alt_var = ALT_OBS_MAP[var] + found_nc_file_alt = glob.glob( + os.path.join(ts_dir, f"*.{alt_var}.*.nc") + ) + glob.glob(os.path.join(ts_dir, f"{alt_var}_*.nc")) + found_txt_file_alt = glob.glob( + os.path.join(ts_dir, f"{alt_var}_files.txt") ) - logger.error(f"Full contents of {ts_dir}: {os.listdir(ts_dir)}") - success = False - else: - logger.info( - f"All requested variables {requested_variables} found in directory {ts_dir}" + if found_nc_file_alt or found_txt_file_alt: + logger.warning( + f"Found alternative variable '{alt_var}' for expected variable " + f"'{var}' in {ts_dir}. " + f"NetCDF found: {bool(found_nc_file_alt)}; " + f"txt found: {bool(found_txt_file_alt)}. " + "This indicates that the variable derivation/mapping may not " + "have been applied correctly." + ) + + if variables_missing_data: + if set(variables_missing_data) <= optional_missing: + logger.warning( + f"Optional process-level variables missing: {variables_missing_data} " + f"in directory {ts_dir}; continuing ENSO diagnostics." ) - else: - logger.error("No variable list found in stdout.") - success = False - return success + return True + + logger.error( + f"Variables missing data: {variables_missing_data} in directory {ts_dir}" + ) + + if os.path.isdir(ts_dir): + logger.error(f"Full contents of {ts_dir}: {os.listdir(ts_dir)}") + else: + logger.error(f"Directory does not exist: {ts_dir}") + + return False + + logger.info( + f"All requested variables {requested_variables} found in directory {ts_dir}" + ) + return True def check_output_dirs(stdout: str) -> bool: current_dir: str = os.path.abspath(os.getcwd()) success: bool = True + for output_type in ["graphics", "diagnostic_results", "metrics_results"]: - match_object = re.search(f"output directory for {output_type}:(.*)", stdout) - if match_object: - subdir = match_object.group(1).strip() - combined_dir = os.path.join(current_dir, subdir) - if not os.path.exists(combined_dir): - logger.error( - f"{output_type} output directory does not exist: {combined_dir}" - ) - success = False - else: - if not os.listdir(combined_dir): - logger.error( - f"{output_type} output directory is empty: {combined_dir}" - ) - success = False - # else: success = True - # else: success = True # Don't assume we have any particular directory + match_object = re.search( + rf"output directory for {re.escape(output_type)}:(.*)", + stdout, + ) + + if not match_object: + logger.warning( + f"No output directory line found for {output_type} in stdout." + ) + continue + + subdir = match_object.group(1).strip() + combined_dir = os.path.join(current_dir, subdir) + + if not os.path.exists(combined_dir): + logger.error( + f"{output_type} output directory does not exist: {combined_dir}" + ) + success = False + continue + + if not os.path.isdir(combined_dir): + logger.error( + f"{output_type} output path exists but is not a directory: {combined_dir}" + ) + success = False + continue + + if not os.listdir(combined_dir): + logger.error(f"{output_type} output directory is empty: {combined_dir}") + success = False + return success diff --git a/zppy_interfaces/pcmdi_diags/pcmdi_mean_cimate.py b/zppy_interfaces/pcmdi_diags/pcmdi_mean_climate.py similarity index 81% rename from zppy_interfaces/pcmdi_diags/pcmdi_mean_cimate.py rename to zppy_interfaces/pcmdi_diags/pcmdi_mean_climate.py index c29a713..8cc1023 100644 --- a/zppy_interfaces/pcmdi_diags/pcmdi_mean_cimate.py +++ b/zppy_interfaces/pcmdi_diags/pcmdi_mean_climate.py @@ -21,7 +21,10 @@ # Classes ##################################################################### class MeanClimateParameters(object): def __init__(self, args: Dict[str, str]): - self.regions: List[str] = args["regions"].split(",") + regions = args.get("regions") + if not regions: + raise ValueError("--regions is required but was not provided.") + self.regions: List[str] = regions.split(",") class MeanClimateMetricsCollector: @@ -38,6 +41,11 @@ def __init__( self.regions = regions self.variables = variables self.fig_format = fig_format + if len(model_info) != 4: + raise ValueError( + f"model_info must have 4 parts (mip, exp, model, relm), " + f"got {len(model_info)}: {model_info}" + ) self.mip, self.exp, self.model, self.relm = model_info self.case_id = case_id self.input_template = input_template @@ -53,7 +61,7 @@ def collect(self): def _collect_figures(self): fig_sets = OrderedDict() - fig_sets["CLIM_patttern"] = ["graphics", "*"] + fig_sets["CLIM_pattern"] = ["graphics", "*"] for fset, (fig_type, prefix) in fig_sets.items(): for region in self.regions: @@ -68,6 +76,11 @@ def _collect_figures(self): ) fpaths = sorted(glob.glob(search_path)) + if not fpaths: + logger.warning( + f"No figures found for var={var}, region={region}, " + f"season={season}: {search_path}" + ) for fpath in fpaths: refname = os.path.basename(fpath).split("_")[0] filname = f"{refname}_{region}_{season}.{self.fig_format}" @@ -106,8 +119,14 @@ def _collect_metrics(self): fpaths = sorted(glob.glob(os.path.join(inpath, "*.json"))) for fpath in fpaths: - refname = os.path.basename(fpath).split("_")[:2] - filname = f"{refname[0]}.{refname[1]}.{self.model_name}.{self.case_id}.json" + parts = os.path.basename(fpath).split("_") + if len(parts) < 2: + logger.error( + f"Unexpected metrics filename format (need at least 2 '_'-separated " + f"parts): {os.path.basename(fpath)}, skipping." + ) + continue + filname = f"{parts[0]}.{parts[1]}.{self.model_name}.{self.case_id}.json" outfile = os.path.join(outpath, filname) os.rename(fpath, outfile) @@ -134,25 +153,27 @@ def main(): try: results = run_parallel_jobs(lstcmd, core_parameters.num_workers) for i, (stdout, stderr, return_code) in enumerate(results): - print(f"\nCommand {i + 1} finished:") - print(f"STDOUT: {stdout}") - print(f"STDERR: {stderr}") - print(f"Return code: {return_code}") + logger.info(f"Command {i + 1} finished:") + logger.info(f"STDOUT: {stdout}") + logger.info(f"STDERR: {stderr}") + logger.info(f"Return code: {return_code}") except RuntimeError as e: - print(f"Execution failed: {e}") + logger.error(f"Execution failed: {e}") + raise elif len(lstcmd) > 0: try: results = run_serial_jobs(lstcmd) for i, (stdout, stderr, return_code) in enumerate(results): - print(f"\nCommand {i + 1} finished:") - print(f"STDOUT: {stdout}") - print(f"STDERR: {stderr}") - print(f"Return code: {return_code}") + logger.info(f"Command {i + 1} finished:") + logger.info(f"STDOUT: {stdout}") + logger.info(f"STDERR: {stderr}") + logger.info(f"Return code: {return_code}") except RuntimeError as e: - print(f"Execution failed: {e}") + logger.error(f"Execution failed: {e}") + raise else: - print("no jobs to run,continue....") - print("successfully finish all jobs....") + logger.info("no jobs to run, continuing...") + logger.info("successfully finished all jobs.") # time delay to ensure process completely finished time.sleep(5) # orgnize diagnostic output @@ -254,4 +275,9 @@ def generate_mean_clim_cmds(variables, obs_dic, case_id): ] ) commands.append(cmd) + else: + logger.warning( + f"Variable '{var_key}' not found in obs_dic; " + f"no mean_climate command will be generated for it." + ) return commands diff --git a/zppy_interfaces/pcmdi_diags/pcmdi_setup.py b/zppy_interfaces/pcmdi_diags/pcmdi_setup.py index 57bbc86..0185c22 100644 --- a/zppy_interfaces/pcmdi_diags/pcmdi_setup.py +++ b/zppy_interfaces/pcmdi_diags/pcmdi_setup.py @@ -18,8 +18,14 @@ # Classes ##################################################################### class CoreParameters(object): def __init__(self, args: Dict[str, str]): - self.num_workers: int = int(args["num_workers"]) - self.multiprocessing: bool = args["multiprocessing"].lower() == "true" + num_workers = args.get("num_workers") + if num_workers is None: + raise ValueError("--num_workers is required but was not provided.") + self.num_workers: int = int(num_workers) + multiprocessing = args.get("multiprocessing") + if multiprocessing is None: + raise ValueError("--multiprocessing is required but was not provided.") + self.multiprocessing: bool = multiprocessing.lower() == "true" self.subsection: str = args["subsection"] self.test_data_path: str = args["climo_ts_dir_primary"] self.reference_data_path: str = args["climo_ts_dir_ref"] @@ -31,7 +37,10 @@ def __init__(self, args: Dict[str, str]): self.model_name_ref: str = args[ "model_name_ref" ] # run_type == "model_vs_model" only - self.variables: List[str] = args["vars"].split(",") + vars_arg = args.get("vars") + if not vars_arg: + raise ValueError("--vars is required but was not provided.") + self.variables: List[str] = vars_arg.split(",") self.tableID_ref: str = args["tableID_ref"] # run_type == "model_vs_model" only # Whether to generate the land/sea mask self.generate_flag: str = args["generate_sftlf"] @@ -94,7 +103,7 @@ def build_catalogues(self) -> Tuple[OrderedDict, OrderedDict]: and os.path.exists(ref_files[0]) ): logger.info( - f"Extracting & assigining metadata for {varin}, the base var name of {var}" + f"Extracting & assigning metadata for {varin}, the base var name of {var}" ) for fileset, info_dict, dataset, dataset_set in [ (test_files[0], self.test_info, self.variables, self.test_set), @@ -106,7 +115,7 @@ def build_catalogues(self) -> Tuple[OrderedDict, OrderedDict]: ) else: logger.info( - f"NOT extracting & assigining metadata for {varin}, the base var name of {var}." + f"NOT extracting & assigning metadata for {varin}, the base var name of {var}." ) logger.info(f"test_files={test_files}") logger.info(f"ref_files={ref_files}") @@ -132,17 +141,17 @@ def _get_base_varname(self, var): def _extract_metadata(self, filepath, varin, var): filename = os.path.basename(filepath) - logger.info(f"Extracting metadata from {filename}, dervied from {filepath}") + logger.info(f"Extracting metadata from {filename}, derived from {filepath}") parts = filename.split(".") if len(parts) < 7: # Example file in tmp-dir/ts: # e3sm.amip.v3-LR.0101.Amon.ts.200501-201412.nc - logger.error( + raise ValueError( f"Filename {filename} does not have at least 7 parts when split by '.', unexpected format." ) yymm_range = parts[6].split("-") if len(yymm_range) != 2: - logger.error( + raise ValueError( f"Filename {filename} has unexpected date range format in part '{parts[6]}'." ) logger.info( @@ -209,8 +218,8 @@ def _process_group(self, group): ) if not os.path.exists(catalog_path): - print( - f"Warning: Catalogue not found at {catalog_path}, absolute path {os.path.abspath(catalog_path)}" + logger.warning( + f"Catalogue not found at {catalog_path}, absolute path {os.path.abspath(catalog_path)}" ) return @@ -235,10 +244,10 @@ def _generate_mask(self, input_path, output_path, model_name): try: mask = create_land_sea_mask(ds, method="regionmask") - print("Land mask estimated using regionmask method.") + logger.info("Land mask estimated using regionmask method.") except Exception: mask = create_land_sea_mask(ds, method="pcmdi") - print("Land mask estimated using PCMDI method.") + logger.info("Land mask estimated using PCMDI method.") mask = mask * 100.0 mask.attrs.update( @@ -276,12 +285,26 @@ def set_up(parameters: CoreParameters) -> CoreOutput: parameters.multiprocessing if parameters.num_workers >= 2 else False ) # Dataset identifiers - test_data_set: List[str] = [parameters.model_name.split(".")[1]] + model_name_parts = parameters.model_name.split(".") + if len(model_name_parts) < 2: + raise ValueError( + f"model_name must have at least 2 dot-separated parts, " + f"got: {parameters.model_name}" + ) + test_data_set: List[str] = [model_name_parts[1]] reference_data_set: List[str] if parameters.run_type == "model_vs_obs": reference_data_set = parameters.obs_sets.split(",") elif parameters.run_type == "model_vs_model": - reference_data_set = [parameters.model_name_ref.split(".")[1]] + if not parameters.model_name_ref: + raise ValueError("model_name_ref is required for run_type=model_vs_model") + ref_parts = parameters.model_name_ref.split(".") + if len(ref_parts) < 2: + raise ValueError( + f"model_name_ref must have at least 2 dot-separated parts, " + f"got: {parameters.model_name_ref}" + ) + reference_data_set = [ref_parts[1]] else: raise ValueError(f"Invalid run_type={parameters.run_type}") ############################################################### @@ -345,8 +368,8 @@ def set_up(parameters: CoreParameters) -> CoreOutput: "pcmdi_diags", "%(output_type)", "%(metric_type)", - parameters.model_name.split(".")[0], - parameters.model_name.split(".")[1], + model_name_parts[0], + model_name_parts[1], parameters.case_id, ) # Diagnostic output path templates @@ -424,6 +447,6 @@ def derive_missing_variable(varin, path, model_id): ) out_ds.to_netcdf(output_file) - print(f"Derived variable '{varin}' written to {output_file}") + logger.info(f"Derived variable '{varin}' written to {output_file}") return diff --git a/zppy_interfaces/pcmdi_diags/pcmdi_synthetic_plots.py b/zppy_interfaces/pcmdi_diags/pcmdi_synthetic_plots.py index aa24802..bc3d267 100644 --- a/zppy_interfaces/pcmdi_diags/pcmdi_synthetic_plots.py +++ b/zppy_interfaces/pcmdi_diags/pcmdi_synthetic_plots.py @@ -27,50 +27,46 @@ class SyntheticPlotsParameters(object): def __init__(self, args: Dict[str, str]): self.figure_format: str = args["figure_format"] self.www: str = args["www"] - self.save_all_data: bool = str(args["save_all_data"]).lower() in ( - "true", - "1", - "yes", - ) + self.save_all_data: bool = str2bool(args.get("save_all_data", False)) self.results_dir: str = args["results_dir"] self.case: str = args["case"] self.model_name: str = args["model_name"] self.model_tableID: str = args["model_tableID"] self.web_dir: str = args["web_dir"] - self.clim_viewer: bool = str(args["clim_viewer"]).lower() in ( - "true", - "1", - "yes", + self.clim_viewer: bool = str2bool(args.get("clim_viewer", False)) + self.clim_vars: List[str] = ( + (args.get("clim_vars") or "").split(",") if args.get("clim_vars") else [] ) - self.clim_vars: List[str] = args["clim_vars"].split(",") self.clim_years: str = args["clim_years"] - self.clim_regions: List[str] = args["clim_regions"].split(",") + self.clim_regions: List[str] = ( + (args.get("clim_regions") or "").split(",") + if args.get("clim_regions") + else [] + ) self.cmip_clim_dir: str = args["cmip_clim_dir"] self.cmip_clim_set: str = args["cmip_clim_set"] - self.mova_viewer: bool = str(args["mova_viewer"]).lower() in ( - "true", - "1", - "yes", + self.mova_viewer: bool = str2bool(args.get("mova_viewer", False)) + self.mova_modes: List[str] = ( + (args.get("mova_modes") or "").split(",") if args.get("mova_modes") else [] + ) + self.mova_vars: List[str] = ( + (args.get("mova_vars") or "").split(",") if args.get("mova_vars") else [] ) - self.mova_modes: List[str] = args["mova_modes"].split(",") - self.mova_vars: List[str] = args["mova_vars"].split(",") self.mova_years: str = args["mova_years"] - self.movc_viewer: bool = str(args["movc_viewer"]).lower() in ( - "true", - "1", - "yes", + self.movc_viewer: bool = str2bool(args.get("movc_viewer", False)) + self.movc_modes: List[str] = ( + (args.get("movc_modes") or "").split(",") if args.get("movc_modes") else [] + ) + self.movc_vars: List[str] = ( + (args.get("movc_vars") or "").split(",") if args.get("movc_vars") else [] ) - self.movc_modes: List[str] = args["movc_modes"].split(",") - self.movc_vars: List[str] = args["movc_vars"].split(",") self.movc_years: str = args["movc_years"] self.cmip_movs_dir: str = args["cmip_movs_dir"] self.cmip_movs_set: str = args["cmip_movs_set"] - self.enso_viewer: bool = str(args["enso_viewer"]).lower() in ( - "true", - "1", - "yes", + self.enso_viewer: bool = str2bool(args.get("enso_viewer", False)) + self.enso_vars: List[str] = ( + (args.get("enso_vars") or "").split(",") if args.get("enso_vars") else [] ) - self.enso_vars: List[str] = args["enso_vars"].split(",") self.enso_years: str = args["enso_years"] self.cmip_enso_dir: str = args["cmip_enso_dir"] self.cmip_enso_set: str = args["cmip_enso_set"] @@ -98,7 +94,8 @@ def main(): "metrics_data", "%(group_type)", ) - metric_dict = json.load(open("synthetic_metrics_list.json")) + with open("synthetic_metrics_list.json") as _f: + metric_dict = json.load(_f) plotter = SyntheticMetricsPlotter( # Core case_name=parameters.case, @@ -163,7 +160,10 @@ def main(): "e3sm_pmp_logo.png", ) web_logo_dst = os.path.join(out_dir, "e3sm_pmp_logo.png") - shutil.copy(web_logo_src, web_logo_dst) + if not os.path.exists(web_logo_src): + logger.warning(f"Logo file not found, skipping copy: {web_logo_src}") + else: + shutil.copy(web_logo_src, web_logo_dst) # Build config config = collect_config( title=parameters.pcmdi_webtitle, @@ -247,13 +247,13 @@ def _get_args() -> Dict[str, str]: parser.add_argument("--pcmdi_external_prefix", type=str) parser.add_argument("--pcmdi_viewer_template", type=str) parser.add_argument("--save_all_data", type=str2bool) - parser.add_argument("--debug", type=str) + parser.add_argument("--debug", type=str2bool, default=False) # Ignore the first arg # (zi-pcmdi-synthetic-plots) args: argparse.Namespace = parser.parse_args(sys.argv[1:]) - if args.debug and args.debug.lower() == "true": + if args.debug: logger.setLevel("DEBUG") logger.debug("Debug logging enabled") diff --git a/zppy_interfaces/pcmdi_diags/pcmdi_variability_modes.py b/zppy_interfaces/pcmdi_diags/pcmdi_variability_modes.py index 22ac7ad..3ac107e 100644 --- a/zppy_interfaces/pcmdi_diags/pcmdi_variability_modes.py +++ b/zppy_interfaces/pcmdi_diags/pcmdi_variability_modes.py @@ -19,9 +19,15 @@ # Classes ##################################################################### class VariabilityModesParameters(object): def __init__(self, args: Dict[str, str]): - self.var_modes: List[str] = args["var_modes"].split(",") + var_modes = args.get("var_modes") + if not var_modes: + raise ValueError("--var_modes is required but was not provided.") + self.var_modes: List[str] = var_modes.split(",") # self.vars is distinct from the list version in CoreParameters - self.vars: str = args["vars"] + vars_arg = args.get("vars") + if not vars_arg: + raise ValueError("--vars is required but was not provided.") + self.vars: str = vars_arg class VariabilityMetricsCollector: @@ -66,6 +72,11 @@ def _collect_figures(self): search_path = os.path.join(indir, mode, "*", template) matched_files = sorted(glob.glob(search_path)) + if not matched_files: + logger.warning( + f"No figures found for fig_set={fig_set}, mode={mode}, " + f"season={season}: {search_path}" + ) for fpath in matched_files: filename = os.path.basename(fpath) outfile = self._classify_output_name( @@ -91,6 +102,12 @@ def _classify_output_name(self, fig_set, mode, season, filename): suffix = "eof2" elif "EOF3" in filename: suffix = "eof3" + if suffix == "unknown": + logger.warning( + f"Could not classify output name for file '{filename}' " + f"(fig_set={fig_set}, mode={mode}, season={season}); " + f"using suffix 'unknown'." + ) return f"{fig_set}_{mode}_{season}_{suffix}.{self.fig_format}" def _collect_metrics(self): @@ -178,29 +195,36 @@ def main(): try: results = run_parallel_jobs(lstcmd, core_parameters.num_workers) for i, (stdout, stderr, return_code) in enumerate(results): - print(f"\nCommand {i + 1} finished:") - print(f"STDOUT: {stdout}") - print(f"STDERR: {stderr}") - print(f"Return code: {return_code}") + logger.info(f"Command {i + 1} finished:") + logger.info(f"STDOUT: {stdout}") + logger.info(f"STDERR: {stderr}") + logger.info(f"Return code: {return_code}") except RuntimeError as e: - print(f"Execution failed: {e}") + logger.error(f"Execution failed: {e}") + raise elif len(lstcmd) > 0: try: results = run_serial_jobs(lstcmd) for i, (stdout, stderr, return_code) in enumerate(results): - print(f"\nCommand {i + 1} finished:") - print(f"STDOUT: {stdout}") - print(f"STDERR: {stderr}") - print(f"Return code: {return_code}") + logger.info(f"Command {i + 1} finished:") + logger.info(f"STDOUT: {stdout}") + logger.info(f"STDERR: {stderr}") + logger.info(f"Return code: {return_code}") except RuntimeError as e: - print(f"Execution failed: {e}") + logger.error(f"Execution failed: {e}") + raise else: - print("no jobs to run,continue...") - print("successfully finish all jobs....") + logger.info("no jobs to run, continuing...") + logger.info("successfully finished all jobs.") # time delay to ensure process completely finished time.sleep(5) # Create the collector instance split_name: List[str] = core_parameters.model_name.split(".") + if len(split_name) != 4: + raise ValueError( + f"model_name must have 4 dot-separated parts (mip.exp.model.relm), " + f"got {len(split_name)}: {core_parameters.model_name}" + ) collector = VariabilityMetricsCollector( modes=variability_modes_parameters.var_modes, fig_format=core_parameters.figure_format, @@ -278,7 +302,7 @@ def generate_varmode_cmds(modes, varOBS, reftyrs, reftyre, refname, refpath, cas f"--osyear {reftyrs} " f"--oeyear {reftyre} " f"--reference_data_name {refname} " - f"--reference_data_path {refpath} " + f'--reference_data_path "{refpath}" ' f"--case_id {case_id}" ) commands.append(cmd) diff --git a/zppy_interfaces/pcmdi_diags/synthetic_plots/enso_metrics_reader.py b/zppy_interfaces/pcmdi_diags/synthetic_plots/enso_metrics_reader.py index b07d9f4..230d160 100644 --- a/zppy_interfaces/pcmdi_diags/synthetic_plots/enso_metrics_reader.py +++ b/zppy_interfaces/pcmdi_diags/synthetic_plots/enso_metrics_reader.py @@ -23,6 +23,16 @@ def __init__(self, parameter, stat, metric_dict, mips, collections): def run(self): """Collect paths to ENSO metrics JSON files and return the mapping.""" + if not self.mips: + raise ValueError( + "mips is empty; cannot retrieve ENSO metrics. " + "Check that cmip_name and model_name are configured." + ) + if not self.metrics_collections: + raise ValueError( + "metrics_collections is empty; cannot retrieve ENSO metrics. " + "Check that 'collection' is configured in the metric_dict." + ) for mip in self.mips: self.dict_json_path[mip] = {} for metrics_collection in self.metrics_collections: @@ -35,9 +45,9 @@ def run(self): self._get_test_json_path(mip, metrics_collection) ) - if len(self.dict_json_path[mip]) < 1: + if not self.dict_json_path[mip]: raise FileNotFoundError( - f"No Synthetic ENSO Metrics Data for {mip}, aborting..." + f"No ENSO metrics files were collected for mip '{mip}'." ) return self.dict_json_path @@ -79,7 +89,13 @@ def _get_test_json_path(self, mip, metrics_collection): with open(json_path) as ff: data_json = json.load(ff) - old_key = list(data_json["RESULTS"]["model"].keys())[0] + results_block = data_json.get("RESULTS", {}) + model_block = results_block.get("model", {}) + if not model_block: + raise KeyError( + f"Expected non-empty 'RESULTS.model' dict in {json_path}" + ) + old_key = list(model_block.keys())[0] data_json["RESULTS"]["model"][mip] = data_json["RESULTS"]["model"].pop( old_key diff --git a/zppy_interfaces/pcmdi_diags/synthetic_plots/synthetic_metrics_plotter.py b/zppy_interfaces/pcmdi_diags/synthetic_plots/synthetic_metrics_plotter.py index 86803d1..2d0da9d 100644 --- a/zppy_interfaces/pcmdi_diags/synthetic_plots/synthetic_metrics_plotter.py +++ b/zppy_interfaces/pcmdi_diags/synthetic_plots/synthetic_metrics_plotter.py @@ -316,12 +316,8 @@ def _handle_enso_metric(self, metric: str) -> None: ) # --- Collections (optional config) --- - enso_collections = self.metric_dict.get("collection", []) - if not isinstance(enso_collections, (list, tuple)): - logger.warning( - f"[enso] 'collection' should be list/tuple; got {type(enso_collections).__name__}. Using empty list." - ) - enso_collections = [] + # Resolved per-stat inside the loop below, as the JSON structure is + # metric_dict["enso_metric"][stat]["collection"]. # --- Validate metric entry --- if metric not in self.metric_dict or not isinstance( @@ -348,6 +344,14 @@ def _handle_enso_metric(self, metric: str) -> None: ) continue + enso_collections = self.metric_dict[metric][stat].get("collection", []) + if not isinstance(enso_collections, (list, tuple)): + logger.warning( + f"[enso] 'collection' for stat='{stat}' should be list/tuple; " + f"got {type(enso_collections).__name__}. Using empty list." + ) + enso_collections = [] + logger.debug( f"[enso] stat='{stat}', enso_mips={enso_mips}, collections={enso_collections}" ) @@ -375,6 +379,60 @@ def _handle_enso_metric(self, metric: str) -> None: logger.exception(f"[enso] Plot driver failed for stat='{stat}': {e}") +def _prepare_mean_climate_portrait_variables( + df_dict, + seasons, + region, + stat, + var_list, + var_unit_list, +): + var_names = sorted(var_list.copy()) + name_to_unit = dict(zip(var_list, var_unit_list)) + season_data = {} + season_var_sets = [] + + for season in seasons: + if season not in df_dict or region not in df_dict[season]: + logger.warning( + "[mean_climate] Missing data for season=%s region=%s; " + "skipping portrait plot for stat=%s.", + season, + region, + stat, + ) + return {}, [], [] + + data_dict, available_vars, _ = drop_vars( + df_dict[season][region].copy(), + var_names.copy(), + [name_to_unit[v] for v in var_names], + ) + season_data[season] = data_dict + season_var_sets.append(set(available_vars)) + + common_vars = [var for var in var_names if all(var in s for s in season_var_sets)] + skipped_vars = [var for var in var_names if var not in common_vars] + if skipped_vars: + logger.warning( + "[mean_climate] Variables unavailable for all seasons in " + "region=%s stat=%s and will be skipped: %s", + region, + stat, + skipped_vars, + ) + if not common_vars: + logger.warning( + "[mean_climate] No variables available for portrait plot " + "region=%s stat=%s; skipping.", + region, + stat, + ) + return {}, [], [] + + return season_data, common_vars, [name_to_unit[v] for v in common_vars] + + def mean_climate_plot_driver( metric, stat, @@ -406,15 +464,22 @@ def mean_climate_plot_driver( metric, region, stat ) ) - var_names = sorted(var_list.copy()) - # label information - var_units = [] - for i, var in enumerate(var_names): - index = var_list.index(var) - var_units.append(var_unit_list[index]) + season_data, var_names, var_units = ( + _prepare_mean_climate_portrait_variables( + df_dict, + metric_dict["season"], + region, + stat, + var_list, + var_unit_list, + ) + ) + if not season_data: + continue + data_nor = dict() for season in metric_dict["season"]: - data_dict = df_dict[season][region].copy() + data_dict = season_data[season] if stat == "cor_xy": data_nor[season] = data_dict[var_names].to_numpy().T else: @@ -852,6 +917,9 @@ def drop_vars(data_dict, var_names, var_units=None): protected_columns = {"model", "run", "model_run", "num_runs"} columns_to_drop = [] + missing_columns = [var for var in var_names if var not in data_dict.columns] + columns_to_drop.extend(missing_columns) + for column in data_dict.columns: if column in protected_columns: continue @@ -860,7 +928,7 @@ def drop_vars(data_dict, var_names, var_units=None): columns_to_drop.append(column) # Drop columns from DataFrame - data_dict = data_dict.drop(columns=columns_to_drop) + data_dict = data_dict.drop(columns=columns_to_drop, errors="ignore") # Update var_names and var_units if applicable updated_var_names = [v for v in var_names if v not in columns_to_drop] diff --git a/zppy_interfaces/pcmdi_diags/utils.py b/zppy_interfaces/pcmdi_diags/utils.py index 10c3290..1168b88 100644 --- a/zppy_interfaces/pcmdi_diags/utils.py +++ b/zppy_interfaces/pcmdi_diags/utils.py @@ -49,6 +49,9 @@ def run_parallel_jobs(cmds: List[str], num_workers: int) -> List[Tuple[str, str, Returns: - List of tuples: (stdout, stderr, return_code) for each command. """ + if num_workers < 1: + raise ValueError(f"num_workers must be >= 1, got {num_workers}") + results = [] procs = [] @@ -58,16 +61,20 @@ def run_parallel_jobs(cmds: List[str], num_workers: int) -> List[Tuple[str, str, # Run the batch if full or if it's the last command if len(procs) >= num_workers or i == len(cmds) - 1: - logger.info(f"Running {count_child_processes()} subprocesses...") - for cmd, proc in procs: - stdout, stderr = proc.communicate() - return_code = proc.returncode + logger.info(f"Running batch of {len(procs)} subprocesses...") + for batch_cmd, batch_proc in procs: + stdout, stderr = batch_proc.communicate() + return_code = batch_proc.returncode if return_code != 0: + # Terminate any remaining running processes in the batch + for _, remaining_proc in procs: + if remaining_proc.poll() is None: + remaining_proc.terminate() logger.error( - f"ERROR: Process failed: '{cmd}'\nError: {stderr.strip()}" + f"ERROR: Process failed: '{batch_cmd}'\nError: {stderr.strip()}" ) - raise RuntimeError(f"Subprocess failed: {cmd}") + raise RuntimeError(f"Subprocess failed: {batch_cmd}") results.append((stdout.strip(), stderr.strip(), return_code)) @@ -91,14 +98,26 @@ def run_serial_jobs(cmds: List[str]) -> List[Tuple[str, str, int]]: for i, cmd in enumerate(cmds): logger.info(f"Running [{i + 1}/{len(cmds)}]: {cmd}") + proc = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True, text=True) stdout, stderr = proc.communicate() return_code = proc.returncode - if return_code != 0: - logger.error(f"ERROR: Process failed: '{cmd}'\nError: {stderr.strip()}") - raise RuntimeError(f"Subprocess failed: {cmd}") + stdout = stdout.strip() + stderr = stderr.strip() - results.append((stdout.strip(), stderr.strip(), return_code)) + if return_code != 0: + logger.error( + f"ERROR: Process failed [{i + 1}/{len(cmds)}]: '{cmd}'\n" + f"Return code: {return_code}\n" + f"STDOUT:\n{stdout}\n" + f"STDERR:\n{stderr}" + ) + raise RuntimeError( + f"Subprocess failed [{i + 1}/{len(cmds)}] " + f"with return code {return_code}: {cmd}" + ) + + results.append((stdout, stderr, return_code)) return results diff --git a/zppy_interfaces/pcmdi_diags/viewer.py b/zppy_interfaces/pcmdi_diags/viewer.py index 98b7034..3a3f739 100644 --- a/zppy_interfaces/pcmdi_diags/viewer.py +++ b/zppy_interfaces/pcmdi_diags/viewer.py @@ -1,4 +1,5 @@ import glob +import logging import os from datetime import datetime from pathlib import Path @@ -6,6 +7,8 @@ from jinja2 import Environment, FileSystemLoader +logger = logging.getLogger(__name__) + def collect_config( title: str = "E3SM-PMP Diagnostics", @@ -169,6 +172,8 @@ def setup_jinja_env(template_dir): """ Set up the Jinja2 environment """ + if not os.path.isdir(template_dir): + raise FileNotFoundError(f"Jinja2 template directory not found: {template_dir}") return Environment(loader=FileSystemLoader(template_dir)) @@ -425,9 +430,10 @@ def cfg(key, default=None): # Render and write rendered_html = template.render(sections=sections) - out_path = os.path.join(cfg("out_dir"), "methodology.html") + Path(cfg("out_dir", ".")).mkdir(parents=True, exist_ok=True) + out_path = os.path.join(cfg("out_dir", "."), "methodology.html") Path(out_path).write_text(rendered_html) - print(f"HTML file written to: {cfg('out_dir')}") + logger.info(f"HTML file written to: {cfg('out_dir')}") return out_path @@ -444,6 +450,8 @@ def cfg(key, default=None): # Join lists safely def join_list(key): vals = cfg(key, []) or [] + if isinstance(vals, str): + return vals return ", ".join(vals) clim_vars = join_list("clim_vars") @@ -626,9 +634,10 @@ def join_list(key): title="E3SM-PMP Diagnostics Package", sections=sections ) - out_path = os.path.join(cfg("out_dir"), "diag_data.html") + Path(cfg("out_dir", ".")).mkdir(parents=True, exist_ok=True) + out_path = os.path.join(cfg("out_dir", "."), "diag_data.html") Path(out_path).write_text(output_html) - print(f"HTML file written to: {cfg('out_dir')}") + logger.info(f"HTML file written to: {cfg('out_dir')}") return out_path @@ -662,7 +671,7 @@ def create_image_link( ): sub_path = Path(*subdirs) search_path = Path(fig_dir) / sub_path / filename_pattern - matches = glob.glob(str(search_path)) + matches = sorted(glob.glob(str(search_path))) if matches: file_name = Path(matches[0]).name @@ -917,6 +926,10 @@ def __init__(self): "AUS": {"All": "07", "El/La": "12"}, } + @staticmethod + def map_coupled_mode_to_eof_tag(mode): + return {"NPGO": "eof2"}.get(str(mode).strip().upper(), "eof1") + def create_metric_group(self, *metrics): return { name: f"divedown{str(i + 1).zfill(2)}" for i, name in enumerate(metrics) @@ -1063,6 +1076,8 @@ def build_cmvar_cell(fig_dir, diag_dir, group, variable, keys_dict): return content def generate_mcpl_row(self, mode, diag_dir, fig_dir): + mode = str(mode).strip().upper() + eof_tag = self.map_coupled_mode_to_eof_tag(mode) groups = { "MOV_eoftest": { "EOF Spectr": { @@ -1074,8 +1089,8 @@ def generate_mcpl_row(self, mode, diag_dir, fig_dir): "EOF Compos": { "CBF(Yearly)": {"cbf": "yearly"}, "CBF(Monthly)": {"cbf": "monthly"}, - "EOF(Yearly)": {"cbf": "yearly"}, - "EOF(Monthly)": {"cbf": "monthly"}, + "EOF(Yearly)": {eof_tag: "yearly"}, + "EOF(Monthly)": {eof_tag: "monthly"}, } }, "MOV_pattern": { @@ -1267,7 +1282,8 @@ def map_modes(self, names): } if names is None: return default_modes - return {k: default_modes.get(k, "EOF1") for k in names} + modes = [str(name).strip().upper() for name in names if str(name).strip()] + return {mode: default_modes.get(mode, "EOF1") for mode in modes} def map_seasons(self, names): default_seasons = ["DJF", "MAM", "JJA", "SON", "yearly", "monthly"] @@ -1355,7 +1371,7 @@ def generate_emovs_table( diag_dir: Path to diagnostics directory (for relative links). fig_dir: Path to figures directory. show: Whether to build the table. If a string (e.g., "false"), it will be coerced. - modes: List of modes or comma-separated string (e.g., "PDO,NPGO,AMO"). + modes: List of modes or comma-separated string (e.g., "NAM,PNA,NPO"). Returns: list: HTML-ready rows (list of lists of cell dicts). @@ -1368,7 +1384,7 @@ def generate_emovs_table( # Accept comma-separated string for modes if modes is None: - modes = ["PDO", "NPGO", "AMO"] + modes = ["NAM", "PNA", "NPO", "NAO", "SAM", "PSA1", "PSA2"] elif isinstance(modes, str): modes = [s.strip() for s in modes.split(",") if s.strip()] @@ -1399,10 +1415,10 @@ def map_vars(self, names): def map_regions(self, names): default_regions = ["global", "land", "ocean", "TROPICS", "NHEX", "SHEX"] if names is None: - seasons = default_regions + regions = default_regions else: - seasons = names - return seasons + regions = names + return regions def map_seasons(self, names): default_seasons = ["DJF", "MAM", "JJA", "SON", "AC"] # AC = Annual Cycle @@ -1422,7 +1438,7 @@ def build_table(self): """ Constructs a list of table row dictionaries for use in an HTML diagnostic viewer. """ - clim_path = safe_join(str(self.fig_dir), "CLIM_patttern") + clim_path = safe_join(str(self.fig_dir), "CLIM_pattern") table = [] for var in self.variables: @@ -1563,6 +1579,7 @@ def generate_viewer_html(config): ) # Write the generated HTML to the specified file + Path(config["out_dir"]).mkdir(parents=True, exist_ok=True) Path(os.path.join(config["out_dir"], "index.html")).write_text(output_html) - print(f"HTML file written to: {config['out_dir']}") + logger.info(f"HTML file written to: {config['out_dir']}") return