From a0dad648fcd09f4bd3424eaa443e14934626fc53 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 09:05:41 -0600 Subject: [PATCH 01/15] outline proposed changes --- src/forcingprocessor/processor.py | 76 +++++++++++++++----- src/forcingprocessor/troute_restart_tools.py | 19 +++++ 2 files changed, 76 insertions(+), 19 deletions(-) create mode 100644 src/forcingprocessor/troute_restart_tools.py diff --git a/src/forcingprocessor/processor.py b/src/forcingprocessor/processor.py index 0bb21a2..c1a9d33 100644 --- a/src/forcingprocessor/processor.py +++ b/src/forcingprocessor/processor.py @@ -20,6 +20,7 @@ from forcingprocessor.plot_forcings import plot_ngen_forcings from forcingprocessor.utils import make_forcing_netcdf, get_window, log_time, convert_url2key, report_usage, nwm_variables, ngen_variables from forcingprocessor.channel_routing_tools import channelrouting_nwm2ngen, write_netcdf_chrt +from forcingprocessor.troute_restart_tools import make_troute_restart B2MB = 1048576 @@ -790,6 +791,7 @@ def prep_ngen_data(conf): else: gpkg_files = gpkg_file map_file_path = conf['forcing'].get("map_file",None) + restart_map_file_path = conf['forcings'].get("restart_map_file", None) if map_file_path: # NWM to NGEN channel routing processing requires json map data_source = "channel_routing" if "s3://" in map_file_path: @@ -799,7 +801,15 @@ def prep_ngen_data(conf): else: with open(map_file_path, "r", encoding="utf-8") as map_file: full_nwm_ngen_map = json.load(map_file) - + elif restart_map_file_path: + data_source = "troute_restarts" + if "s3://" in restart_map_file_path: + s3 = s3fs.S3FileSystem(anon=True) + with s3.open(restart_map_file_path, "r") as map_file: + restart_map = json.load(map_file) + else: + with open(map_file_path, "r", encoding="utf-8") as map_file: + restart_map = json.load(map_file) else: data_source = "forcings" @@ -815,8 +825,8 @@ def prep_ngen_data(conf): global ii_plot, nts_plot, ngen_vars_plot ii_plot = conf.get("plot",False) if ii_plot: - if 
data_source == "channel_routing": - raise RuntimeError("Plotting not supported for channel routing processing.") + if data_source == "channel_routing" or "troute_restarts": + raise RuntimeError("Plotting not supported for channel routing or restart processing.") nts_plot = conf["plot"].get("nts_plot",10) ngen_vars_plot = conf["plot"].get("ngen_vars",ngen_variables) @@ -880,7 +890,7 @@ def prep_ngen_data(conf): window = [x_max, x_min, y_max, y_min] weight_time = time.perf_counter() - tw log_time("CALC_WINDOW_END", log_file) - else: + elif data_source == "channel_routing": log_time("READMAP_START", log_file) tw = time.perf_counter() if ii_verbose: @@ -903,6 +913,8 @@ def prep_ngen_data(conf): output_path = Path(output_path) if data_source == "channel_routing": forcing_path = Path(output_path, 'outputs', 'ngen') + elif data_source == "troute_restarts": + forcing_path = Path(output_path, 'restart') else: forcing_path = Path(output_path, 'forcings') meta_path = Path(output_path, 'metadata') @@ -950,8 +962,12 @@ def prep_ngen_data(conf): # s3://noaa-nwm-pds/nwm.20241029/forcing_short_range/nwm.t00z.short_range.forcing.f001.conus.nc if data_source == "forcings": pattern = r"nwm\.(\d{8})/forcing_(\w+)/nwm\.(\w+)(\d{2})z\.\w+\.forcing\.(\w+)(\d{2})\.conus\.nc" - else: + elif data_source == "channel_routing": pattern = r"nwm\.(\d{8})/(\w+)/nwm\.(\w+)(\d{2})z\.\w+\.channel_rt[^\.]*\.(\w+)(\d{2})\.conus\.nc" + else: + #TODO: figure out what regex pattern goes here + #s3://noaa-nwm-pds/nwm.20241029/analysis_assim/nwm.t16z.analysis_assim.channel_rt.tm00.conus.nc + pass # Extract forecast cycle and lead time from the first and last file names global URLBASE, FCST_CYCLE, LEAD_START, LEAD_END @@ -998,13 +1014,14 @@ def prep_ngen_data(conf): # data_array=data_array[0][None,:] # t_ax = t_ax # nwm_data=nwm_data[0][None,:] - if data_source == "forcings": - data_array, t_ax, nwm_data, nwm_file_sizes_MB = multiprocess_data_extract(nwm_forcing_files,nprocs,weights_df,fs) - else: - 
data_array, t_ax, nwm_file_sizes_MB = multiprocess_chrt_extract( - nwm_forcing_files,nprocs,nwm_ngen_map,fs) + if data_source == "forcings" or data_source == "channel_routing": + if data_source == "forcings": + data_array, t_ax, nwm_data, nwm_file_sizes_MB = multiprocess_data_extract(nwm_forcing_files,nprocs,weights_df,fs) + else: + data_array, t_ax, nwm_file_sizes_MB = multiprocess_chrt_extract( + nwm_forcing_files,nprocs,nwm_ngen_map,fs) - if datetime.strptime(t_ax[0],'%Y-%m-%d %H:%M:%S') > datetime.strptime(t_ax[-1],'%Y-%m-%d %H:%M:%S'): + if datetime.strptime(t_ax[0],'%Y-%m-%d %H:%M:%S') > datetime.strptime(t_ax[-1],'%Y-%m-%d %H:%M:%S'): # Hack to ensure data is always written out with time moving forward. t_ax=list(reversed(t_ax)) data_array = np.flip(data_array,axis=0) @@ -1012,10 +1029,17 @@ def prep_ngen_data(conf): LEAD_START = LEAD_END LEAD_END = tmp - t_extract = time.perf_counter() - t0 - complexity = (nfiles * ncatchments) / 10000 - score = complexity / t_extract - if ii_verbose: print(f'Data extract processs: {nprocs:.2f}\nExtract time: {t_extract:.2f}\nComplexity: {complexity:.2f}\nScore: {score:.2f}\n', end=None,flush=True) + t_extract = time.perf_counter() - t0 + complexity = (nfiles * ncatchments) / 10000 + score = complexity / t_extract + if ii_verbose: print(f'Data extract processs: {nprocs:.2f}\nExtract time: {t_extract:.2f}\nComplexity: {complexity:.2f}\nScore: {score:.2f}\n', end=None,flush=True) + + else: + #TODO: make the troute restarts + data_array = make_troute_restart() + pass + + log_time("PROCESSING_END", log_file) log_time("FILEWRITING_START", log_file) @@ -1023,22 +1047,30 @@ def prep_ngen_data(conf): if "netcdf" in output_file_type: if data_source == "forcings": netcdf_cat_file_sizes_MB = multiprocess_write_netcdf(data_array, jcatchment_dict, t_ax) - else: + elif data_source == "channel_routing": if FCST_CYCLE is None: filename = 'qlaterals.nc' else: filename = 
f'ngen.{FCST_CYCLE}z.{URLBASE}.channel_routing.{LEAD_START}_{LEAD_END}.nc' netcdf_cat_file_sizes_MB = write_netcdf_chrt( storage_type, forcing_path, data_array, t_ax, filename) + else: + #TODO: write the troute restart out to a file + pass # write_netcdf(data_array,"1", t_ax, jcatchment_dict['1']) if ii_verbose: print(f'Writing catchment forcings to {output_path}!', end=None,flush=True) if ii_plot or ii_collect_stats or any(x in output_file_type for x in ["csv","parquet","tar"]): if data_source == "forcings": forcing_cat_ids, filenames, individual_cat_file_sizes_MB, individual_cat_file_sizes_MB_zipped, tar_buffs = multiprocess_write_df( data_array,t_ax,list(weights_df.index),nprocs,forcing_path,data_source) - else: + elif data_source == "channel_routing": forcing_cat_ids, filenames, individual_cat_file_sizes_MB, individual_cat_file_sizes_MB_zipped, tar_buffs = multiprocess_write_df( data_array,t_ax,list(nwm_ngen_map.keys()),nprocs,forcing_path,data_source) + else: + #TODO: figure out what goes here for troute restarts, pretty sure it's just a pass + # because it will never get written out as a dataframe + pass + write_time += time.perf_counter() - t0 write_rate = ncatchments / write_time if ii_verbose: print(f'\n\nWrite processs: {nprocs}\nWrite time: {write_time:.2f}\nWrite rate {write_rate:.2f} files/second\n', end=None,flush=True) @@ -1118,7 +1150,7 @@ def prep_ngen_data(conf): "netcdf_catch_file_size_med_MB" : [netcdf_catch_file_size_med], "netcdf_catch_file_size_std_MB" : [netcdf_catch_file_size_std] } - else: + elif data_source == "channel_routing": metadata = { "runtime_s" : [round(runtime,2)], "nvars_intput" : [1], @@ -1138,6 +1170,9 @@ def prep_ngen_data(conf): "netcdf_catch_file_size_med_MB" : [netcdf_catch_file_size_med], "netcdf_catch_file_size_std_MB" : [netcdf_catch_file_size_std] } + else: + #TODO figure out what metadata for troute restarts + pass if data_source == "forcings": data_avg = np.average(data_array,axis=0) @@ -1147,7 +1182,7 @@ def 
prep_ngen_data(conf): data_med = np.median(data_array,axis=0) med_df = pd.DataFrame(data_med.T,columns=ngen_variables) med_df.insert(0,"catchment id",forcing_cat_ids) - else: + elif data_source == "channel_routing": data_avg = np.average(data_array[:,:,1],axis=0) avg_df = pd.DataFrame(data_avg.T, columns=['q_lateral']) avg_df.insert(0,"nexus id",list(nwm_ngen_map.keys())) @@ -1155,6 +1190,9 @@ def prep_ngen_data(conf): data_med = np.median(data_array[:,:,1],axis=0) med_df = pd.DataFrame(data_med.T,columns=['q_lateral']) med_df.insert(0,"nexus id",list(nwm_ngen_map.keys())) + else: + #TODO pretty sure troute restarts won't need stats calculated for them + pass del data_array diff --git a/src/forcingprocessor/troute_restart_tools.py b/src/forcingprocessor/troute_restart_tools.py new file mode 100644 index 0000000..19d5388 --- /dev/null +++ b/src/forcingprocessor/troute_restart_tools.py @@ -0,0 +1,19 @@ +# TODO: define the functions + +def make_troute_restart(): + # TODO: define the function + """ + What the function will do: + 1. Define a depth solving function that takes streamflow, velocity, top width, bottom width, and + channel slope as parameters + 2. Read an NextGen catchment to NHD reach mapping, a "crosswalk" netcdf file that has all the + NextGen catchments in the order that the restart file will have them in, the NWM analysis + assimilation netcdf, and the NWM routelink file (contains channel geometries) + 3. Preprocess those files + 4. Take averages of all the depth function parameters. One NextGen catchment can be associated + with multiple NHD reaches, so we average the values for all the associated NHD reaches to get + the value for the NextGen catchment to be passed into the depth function + 5. Solve for depths + 6. 
Create xarray dataset + """ + pass \ No newline at end of file From 751bc558fe626207ff9dd9d08e71fdd8024d7763 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 10:27:22 -0600 Subject: [PATCH 02/15] troute restart generation functions --- src/forcingprocessor/processor.py | 4 +- src/forcingprocessor/troute_restart_tools.py | 306 +++++++++++++++++-- 2 files changed, 289 insertions(+), 21 deletions(-) diff --git a/src/forcingprocessor/processor.py b/src/forcingprocessor/processor.py index c1a9d33..ff61c8e 100644 --- a/src/forcingprocessor/processor.py +++ b/src/forcingprocessor/processor.py @@ -20,7 +20,7 @@ from forcingprocessor.plot_forcings import plot_ngen_forcings from forcingprocessor.utils import make_forcing_netcdf, get_window, log_time, convert_url2key, report_usage, nwm_variables, ngen_variables from forcingprocessor.channel_routing_tools import channelrouting_nwm2ngen, write_netcdf_chrt -from forcingprocessor.troute_restart_tools import make_troute_restart +from forcingprocessor.troute_restart_tools import create_restart B2MB = 1048576 @@ -1036,7 +1036,7 @@ def prep_ngen_data(conf): else: #TODO: make the troute restarts - data_array = make_troute_restart() + data_array = create_restart(cat_map_fp, crosswalk_fp, nwm_aa_fp, rtlink_fp) pass diff --git a/src/forcingprocessor/troute_restart_tools.py b/src/forcingprocessor/troute_restart_tools.py index 19d5388..a08c5fd 100644 --- a/src/forcingprocessor/troute_restart_tools.py +++ b/src/forcingprocessor/troute_restart_tools.py @@ -1,19 +1,287 @@ -# TODO: define the functions - -def make_troute_restart(): - # TODO: define the function - """ - What the function will do: - 1. Define a depth solving function that takes streamflow, velocity, top width, bottom width, and - channel slope as parameters - 2. 
Read an NextGen catchment to NHD reach mapping, a "crosswalk" netcdf file that has all the - NextGen catchments in the order that the restart file will have them in, the NWM analysis - assimilation netcdf, and the NWM routelink file (contains channel geometries) - 3. Preprocess those files - 4. Take averages of all the depth function parameters. One NextGen catchment can be associated - with multiple NHD reaches, so we average the values for all the associated NHD reaches to get - the value for the NextGen catchment to be passed into the depth function - 5. Solve for depths - 6. Create xarray dataset - """ - pass \ No newline at end of file +""" +Tools to extract and write streamflow and depth values into a restart format ingestible by +t-route. Translates between NWM and NGEN IDs! +""" + +import json +from pathlib import Path +import xarray as xr +import numpy as np +import pandas as pd + + +def read_files( + cat_map_fp: Path, crosswalk_fp: Path, nwm_aa_fp: Path, rtlink_fp: Path +) -> tuple[dict, xr.Dataset, xr.Dataset, xr.Dataset]: + """ + Open and process needed files + + Parameters: + - cat_map_fp: path to NGEN to NWM catchment json file (Path) + - crosswalk_fp: path to "crosswalk" NetCDF file that has all the + NextGen catchments in the order that the restart file will have them in (Path) + - nwm_aa_fp: path to NWM analysis/assimilation NetCDF (Path) + - rtlink_fp: path to NWM RouteLink channel geometry NetCDF (Path) + + Returns: + - cat_map, crosswalk_ds, nwm_ds, routelink_ds: (tuple[dict, xr.Dataset, xr.Dataset, xr.Dataset]) + """ + with open(cat_map_fp, "r", encoding="utf-8") as cat_map_file: + cat_map_temp = json.load(cat_map_file) + cat_map_temp = { + k[4:]: v for k, v in cat_map_temp.items() + } # remove "cat" prefix from keys + crosswalk_ds = xr.open_dataset(crosswalk_fp) + cat_map = {} + for link_id in crosswalk_ds["link"].values: + if cat_map_temp.get(str(link_id)) is None: + cat_map[link_id] = [] # add empty list for missing cat_id + else: + 
def flat_map(cat_map: dict) -> tuple[np.ndarray, np.ndarray]:
    """
    Flatten a one-to-many catchment mapping into two parallel arrays.

    Parameters:
    - cat_map: mapping of cat_id -> list of associated NWM ids (dict)

    Returns:
    - (nwm_ids, cat_ids): np.ndarrays of equal length; nwm_ids is float-typed,
      and cat_ids repeats each key once per associated NWM id
    """
    pairs = [
        (nwm_id, cat_id)
        for cat_id, nwm_ids in cat_map.items()
        for nwm_id in nwm_ids
    ]
    nwm_ids = np.array([nwm_id for nwm_id, _ in pairs], dtype=float)
    cat_ids = np.array([cat_id for _, cat_id in pairs])
    return nwm_ids, cat_ids
def quadratic_formula(b_coeff: np.ndarray, c_coeff: np.ndarray) -> np.ndarray:
    """
    Vectorized positive-root solver for the monic quadratic h**2 + b*h + c = 0.

    Parameters:
    - b_coeff: linear coefficients (np.ndarray)
    - c_coeff: constant coefficients (np.ndarray)

    Returns:
    - h_positive: the larger root, (-b + sqrt(b**2 - 4c)) / 2; NaN where the
      discriminant is negative (no real solution)
    """
    discriminant = b_coeff**2 - 4 * c_coeff
    # A negative discriminant yields NaN through np.sqrt; that NaN is the
    # documented "no solution" value, so suppress the RuntimeWarning.
    with np.errstate(invalid="ignore"):
        return (-b_coeff + np.sqrt(discriminant)) / 2


def solve_depth_geom(
    streamflow: np.ndarray,
    velocity: np.ndarray,
    tw: np.ndarray,
    bw: np.ndarray,
    cs: np.ndarray,
) -> np.ndarray:
    """
    Solves for depth h using CHRTOUT file variables and channel geometry variables.

    Parameters:
    - streamflow: Streamflow from CHRTOUT file. (m^3/s)
    - velocity: Velocity from CHRTOUT file. (m/s)
    - tw: Top width of the main channel. (m)
    - bw: Bottom width of the main channel. (m)
    - cs: Channel slope (dimensionless).

    Returns:
    - depths: depth per element; 0.0 where there is no flow (area == 0), NaN
      only where the below-bankfull quadratic has no real root.
    """
    # Cross-sectional area of the current flow; NaN/inf (e.g. zero or missing
    # velocity) are treated as "no flow" and zeroed out.
    with np.errstate(divide="ignore", invalid="ignore"):
        area = streamflow / velocity
    area = np.where(np.isfinite(area), area, 0.0)

    db = (cs * (tw - bw)) / 2  # bankfull depth (assumes trapezoidal channel)
    area_bankfull = (tw + bw) / 2 * db  # cross-sectional area at bankfull

    depths = np.zeros_like(area, dtype=float)

    # BUG FIX: rows with area == 0 and degenerate geometry (area_bankfull == 0,
    # e.g. catchments zero-filled upstream) previously fell into the flood
    # branch and computed 0/0 = NaN. Require positive flow area for either
    # branch so dry / zero-geometry rows keep depth 0.
    flowing = area > 0
    above_bankfull = flowing & (area >= area_bankfull)

    # Above bankfull: flood flow is assumed to spread over 3x the top width.
    area_flood = area[above_bankfull] - area_bankfull[above_bankfull]
    with np.errstate(divide="ignore", invalid="ignore"):
        df = area_flood / (tw[above_bankfull] * 3)
    depths[above_bankfull] = db[above_bankfull] + df

    # Below bankfull: solve h**2 + cs*bw*h - cs*area = 0 for the positive root.
    below_bankfull = flowing & ~above_bankfull
    depths[below_bankfull] = quadratic_formula(
        cs[below_bankfull] * bw[below_bankfull],
        -cs[below_bankfull] * area[below_bankfull],
    )

    return depths
result_df.join(nwm_agg, on="cat_id").join(rl_agg, on="cat_id").fillna(0) + + # depth calculation + depths = solve_depth_geom( + streamflow=np.array(result_df["streamflow"].values), + velocity=np.array(result_df["velocity"].values), + tw=np.array(result_df["TopWdth"].values), + bw=np.array(result_df["BtmWdth"].values), + cs=np.array(result_df["ChSlp"].values), + ) + result_df["depth"] = depths + + return result_df + + +def create_restart( + cat_map_fp: Path, crosswalk_fp: Path, nwm_aa_fp: Path, rtlink_fp: Path +) -> xr.Dataset: + """ + Creates t-route restart file. + + Parameters: + - cat_map_fp: path to NGEN to NWM catchment json file (Path) + - crosswalk_fp: path to "crosswalk" NetCDF file that has all the + NextGen catchments in the order that the restart file will have them in (Path) + - nwm_aa_fp: path to NWM analysis/assimilation NetCDF (Path) + - rtlink_fp: path to NWM RouteLink channel geometry NetCDF (Path) + + Returns: + - restart: t-route ingestible restart file (xr.Dataset) + """ + cat_map, crosswalk_ds, nwm_ds, routelink_ds = read_files( + cat_map_fp, crosswalk_fp, nwm_aa_fp, rtlink_fp + ) + + result_df = calculate_restart_values(cat_map, crosswalk_ds, nwm_ds, routelink_ds) + + # create netcdf + restart = xr.Dataset( + data_vars={ + "hlink": (["links"], result_df["depth"].values), + "qlink1": (["links"], result_df["streamflow"].values), + "qlink2": (["links"], result_df["streamflow"].values), + }, + coords={"links": range(len(result_df))}, + attrs={ + "Restart_Time": pd.Timestamp(nwm_ds["time"].values[0]).strftime( + "%Y-%m-%d_%H:%M:%S" + ) + }, + ) + + return restart From efbac7d560fbe95e22a27a6f6fe54c6e582bf2a1 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 11:10:06 -0600 Subject: [PATCH 03/15] added file writing, metadata --- src/forcingprocessor/processor.py | 79 ++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 23 deletions(-) diff --git a/src/forcingprocessor/processor.py b/src/forcingprocessor/processor.py index 
ff61c8e..735d803 100644 --- a/src/forcingprocessor/processor.py +++ b/src/forcingprocessor/processor.py @@ -965,8 +965,8 @@ def prep_ngen_data(conf): elif data_source == "channel_routing": pattern = r"nwm\.(\d{8})/(\w+)/nwm\.(\w+)(\d{2})z\.\w+\.channel_rt[^\.]*\.(\w+)(\d{2})\.conus\.nc" else: - #TODO: figure out what regex pattern goes here #s3://noaa-nwm-pds/nwm.20241029/analysis_assim/nwm.t16z.analysis_assim.channel_rt.tm00.conus.nc + pattern = r"nwm\.(\d{8})/analysis_assim/nwm\.t(\d{2})z\.analysis_assim\.channel_rt\.tm00\.conus\.nc" pass # Extract forecast cycle and lead time from the first and last file names @@ -975,17 +975,22 @@ def prep_ngen_data(conf): FCST_CYCLE=None LEAD_START=None LEAD_END=None - if match: - URLBASE = match.group(2) - FCST_CYCLE = match.group(3) + match.group(4) - LEAD_START = match.group(5) + match.group(6) - else: - print(f"Could not extract forecast cycle and lead start from the first NWM forcing file: {nwm_forcing_files[0]}") - match = re.search(pattern, nwm_forcing_files[-1]) - if match: - LEAD_END = match.group(5) + match.group(6) + if data_source != "troute_restarts": + if match: + URLBASE = match.group(2) + FCST_CYCLE = match.group(3) + match.group(4) + LEAD_START = match.group(5) + match.group(6) + else: + print(f"Could not extract forecast cycle and lead start from the first NWM forcing file: {nwm_forcing_files[0]}") + match = re.search(pattern, nwm_forcing_files[-1]) + if match: + LEAD_END = match.group(5) + match.group(6) + else: + print(f"Could not extract lead end from the last NWM forcing file: {nwm_forcing_files[-1]}") else: - print(f"Could not extract lead end from the last NWM forcing file: {nwm_forcing_files[-1]}") + if match: + restart_date = match.group(1) + restart_hour = match.group(2) # Determine the file system type based on the first NWM forcing file global fs_type @@ -1036,8 +1041,29 @@ def prep_ngen_data(conf): else: #TODO: make the troute restarts + nwm_aa_fp = nwm_forcing_files[0] data_array = 
create_restart(cat_map_fp, crosswalk_fp, nwm_aa_fp, rtlink_fp) - pass + nwm_file_sizes_MB = [] + if fs_type == 'google': + fs_arg = gcsfs.GCSFileSystem() + if fs_arg: + if nwm_file.find('https://') >= 0: + _, bucket_key = convert_url2key(nwm_file,fs_type) + else: + bucket_key = nwm_file + file_obj = fs_arg.open(bucket_key, mode='rb') + nwm_file_sizes_MB.append(file_obj.details['size']) + elif 'https://' in nwm_file: + response = requests.get(nwm_file, timeout=10) + + if response.status_code == 200: + file_obj = BytesIO(response.content) + else: + raise RuntimeError(f"{nwm_file} does not exist") + nwm_file_sizes_MB.append(len(response.content) / B2MB) + else: + file_obj = nwm_file + nwm_file_sizes_MB.append(os.path.getsize(nwm_file / B2MB)) log_time("PROCESSING_END", log_file) @@ -1055,8 +1081,9 @@ def prep_ngen_data(conf): netcdf_cat_file_sizes_MB = write_netcdf_chrt( storage_type, forcing_path, data_array, t_ax, filename) else: - #TODO: write the troute restart out to a file - pass + filename = "channel_restart" + restart_date + "_" + restart_hour + "0000.nc" + data_array.to_netcdf(filename) + netcdf_cat_file_sizes_MB = [os.path.getsize(filename) / B2MB] # write_netcdf(data_array,"1", t_ax, jcatchment_dict['1']) if ii_verbose: print(f'Writing catchment forcings to {output_path}!', end=None,flush=True) if ii_plot or ii_collect_stats or any(x in output_file_type for x in ["csv","parquet","tar"]): @@ -1067,9 +1094,7 @@ def prep_ngen_data(conf): forcing_cat_ids, filenames, individual_cat_file_sizes_MB, individual_cat_file_sizes_MB_zipped, tar_buffs = multiprocess_write_df( data_array,t_ax,list(nwm_ngen_map.keys()),nprocs,forcing_path,data_source) else: - #TODO: figure out what goes here for troute restarts, pretty sure it's just a pass - # because it will never get written out as a dataframe - pass + print("Dataframes don't get written for t-route restarts") write_time += time.perf_counter() - t0 write_rate = ncatchments / write_time @@ -1171,8 +1196,13 @@ def 
prep_ngen_data(conf): "netcdf_catch_file_size_std_MB" : [netcdf_catch_file_size_std] } else: - #TODO figure out what metadata for troute restarts - pass + # metadata for troute restart gen + metadata = { + "runtime_s" : [round(runtime,2)], + "nwmfiles_input" : [len(nwm_forcing_files)], + "nwm_file_size" : [nwm_file_size_avg], + "netcdf_catch_file_size_MB" : [netcdf_catch_file_size_avg], + } if data_source == "forcings": data_avg = np.average(data_array,axis=0) @@ -1191,8 +1221,9 @@ def prep_ngen_data(conf): med_df = pd.DataFrame(data_med.T,columns=['q_lateral']) med_df.insert(0,"nexus id",list(nwm_ngen_map.keys())) else: - #TODO pretty sure troute restarts won't need stats calculated for them - pass + # troute restarts won't need stats calculated for them since there's no time axis + avg_df = None + med_df = None del data_array @@ -1209,8 +1240,10 @@ def prep_ngen_data(conf): local_metapath = metaf_path write_df(metadata_df, "metadata.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3) - write_df(avg_df, "catchments_avg.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3) - write_df(med_df, "catchments_median.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3) + if avg_df != None: + write_df(avg_df, "catchments_avg.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3) + if med_df != None: + write_df(med_df, "catchments_median.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3) meta_time = time.perf_counter() - t000 log_time("METADATA_END", log_file) From 6152364caa0af544e47a483cd10c4ed64be52884 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 11:43:40 -0600 Subject: [PATCH 04/15] read input files, refactor restart 
gen tools --- src/forcingprocessor/processor.py | 28 ++++- src/forcingprocessor/troute_restart_tools.py | 105 +++---------------- 2 files changed, 37 insertions(+), 96 deletions(-) diff --git a/src/forcingprocessor/processor.py b/src/forcingprocessor/processor.py index 735d803..e35f795 100644 --- a/src/forcingprocessor/processor.py +++ b/src/forcingprocessor/processor.py @@ -792,6 +792,8 @@ def prep_ngen_data(conf): map_file_path = conf['forcing'].get("map_file",None) restart_map_file_path = conf['forcings'].get("restart_map_file", None) + crosswalk_file_path = conf['forcings'].get("crosswalk_file", None) + routelink_file_path = conf['forcings'].get("routelink_file", None) if map_file_path: # NWM to NGEN channel routing processing requires json map data_source = "channel_routing" if "s3://" in map_file_path: @@ -803,13 +805,29 @@ def prep_ngen_data(conf): full_nwm_ngen_map = json.load(map_file) elif restart_map_file_path: data_source = "troute_restarts" + if "s3://" in restart_map_file_path: s3 = s3fs.S3FileSystem(anon=True) with s3.open(restart_map_file_path, "r") as map_file: - restart_map = json.load(map_file) + cat_map = json.load(map_file) else: with open(map_file_path, "r", encoding="utf-8") as map_file: - restart_map = json.load(map_file) + cat_map = json.load(map_file) + + if "s3://" in crosswalk_file_path: + s3 = s3fs.S3FileSystem(anon=True) + with s3.open(crosswalk_file_path, "rb") as crosswalk_file: + crosswalk_ds = xr.open_dataset(crosswalk_file) + else: + crosswalk_ds = xr.open_dataset(crosswalk_file_path) + + if "s3://" in routelink_file_path: + s3 = s3fs.S3FileSystem(anon=True) + with s3.open(routelink_file_path, "rb") as routelink_file: + routelink_ds = xr.open_dataset(routelink_file) + else: + routelink_ds = xr.open_dataset(routelink_file_path) + else: data_source = "forcings" @@ -1040,9 +1058,7 @@ def prep_ngen_data(conf): if ii_verbose: print(f'Data extract processs: {nprocs:.2f}\nExtract time: {t_extract:.2f}\nComplexity: 
{complexity:.2f}\nScore: {score:.2f}\n', end=None,flush=True) else: - #TODO: make the troute restarts - nwm_aa_fp = nwm_forcing_files[0] - data_array = create_restart(cat_map_fp, crosswalk_fp, nwm_aa_fp, rtlink_fp) + nwm_file = nwm_forcing_files[0] nwm_file_sizes_MB = [] if fs_type == 'google': fs_arg = gcsfs.GCSFileSystem() @@ -1065,6 +1081,8 @@ def prep_ngen_data(conf): file_obj = nwm_file nwm_file_sizes_MB.append(os.path.getsize(nwm_file / B2MB)) + with xr.open_dataset(file_obj) as nwm_ds: + data_array = create_restart(cat_map, crosswalk_ds, nwm_ds, routelink_ds) log_time("PROCESSING_END", log_file) diff --git a/src/forcingprocessor/troute_restart_tools.py b/src/forcingprocessor/troute_restart_tools.py index a08c5fd..97dbabc 100644 --- a/src/forcingprocessor/troute_restart_tools.py +++ b/src/forcingprocessor/troute_restart_tools.py @@ -3,71 +3,11 @@ t-route. Translates between NWM and NGEN IDs! """ -import json -from pathlib import Path import xarray as xr import numpy as np import pandas as pd -def read_files( - cat_map_fp: Path, crosswalk_fp: Path, nwm_aa_fp: Path, rtlink_fp: Path -) -> tuple[dict, xr.Dataset, xr.Dataset, xr.Dataset]: - """ - Open and process needed files - - Parameters: - - cat_map_fp: path to NGEN to NWM catchment json file (Path) - - crosswalk_fp: path to "crosswalk" NetCDF file that has all the - NextGen catchments in the order that the restart file will have them in (Path) - - nwm_aa_fp: path to NWM analysis/assimilation NetCDF (Path) - - rtlink_fp: path to NWM RouteLink channel geometry NetCDF (Path) - - Returns: - - cat_map, crosswalk_ds, nwm_ds, routelink_ds: (tuple[dict, xr.Dataset, xr.Dataset, xr.Dataset]) - """ - with open(cat_map_fp, "r", encoding="utf-8") as cat_map_file: - cat_map_temp = json.load(cat_map_file) - cat_map_temp = { - k[4:]: v for k, v in cat_map_temp.items() - } # remove "cat" prefix from keys - crosswalk_ds = xr.open_dataset(crosswalk_fp) - cat_map = {} - for link_id in crosswalk_ds["link"].values: - if 
cat_map_temp.get(str(link_id)) is None: - cat_map[link_id] = [] # add empty list for missing cat_id - else: - cat_map[link_id] = cat_map_temp[str(link_id)] - nwm_ds = xr.open_dataset(nwm_aa_fp) - routelink_ds = xr.open_dataset(rtlink_fp) - - return cat_map, crosswalk_ds, nwm_ds, routelink_ds - - -def flat_map(cat_map: dict) -> tuple[np.ndarray, np.ndarray]: - """ - Build a flat mapping: nwm_id -> nexus_id - - Parameters: - - cat_map: a one-to-many mapping (dict) - - Returns: - - nwm_ids_flat, cat_ids_flat: np.ndarrays of the same length - - """ - nwm_ids_flat = [] - cat_ids_flat = [] - for cat_id, nwm_ids in cat_map.items(): - for nwm_id in nwm_ids: - nwm_ids_flat.append(nwm_id) - cat_ids_flat.append(cat_id) - - nwm_ids_flat = np.array(nwm_ids_flat, dtype=float) - cat_ids_flat = np.array(cat_ids_flat) - - return nwm_ids_flat, cat_ids_flat - - def average_nwm_variables( nwm_ids_flat: np.ndarray, cat_ids_flat: np.ndarray, nwm_ds: xr.Dataset ) -> tuple[pd.DataFrame, pd.DataFrame]: @@ -207,14 +147,14 @@ def solve_depth_geom( return depths -def calculate_restart_values( +def create_restart( cat_map: dict, crosswalk_ds: xr.Dataset, nwm_ds: xr.Dataset, routelink_ds: xr.Dataset, -) -> pd.DataFrame: +) -> xr.Dataset: """ - Average depth calculation params, execute depth calculation + Creates t-route restart file. 
Parameters: - cat_map: NGEN to NWM catchment json file (dict) @@ -224,11 +164,19 @@ def calculate_restart_values( - routelink_ds: NWM RouteLink channel geometry NetCDF (xr.Dataset) Returns: - - result_df: pd.DataFrame with calculated restart values - + - restart: t-route ingestible restart file (xr.Dataset) """ - nwm_ids_flat, cat_ids_flat = flat_map(cat_map) + nwm_ids_flat = [] + cat_ids_flat = [] + for cat_id, nwm_ids in cat_map.items(): + for nwm_id in nwm_ids: + nwm_ids_flat.append(nwm_id) + cat_ids_flat.append(cat_id) + + nwm_ids_flat = np.array(nwm_ids_flat, dtype=float) + cat_ids_flat = np.array(cat_ids_flat) + nwm_agg, mapping_df = average_nwm_variables(nwm_ids_flat, cat_ids_flat, nwm_ds) rl_agg = average_rtlink_variables(nwm_ids_flat, mapping_df, routelink_ds) result_df = pd.DataFrame({"cat_id": crosswalk_ds["link"].values}) @@ -244,31 +192,6 @@ def calculate_restart_values( ) result_df["depth"] = depths - return result_df - - -def create_restart( - cat_map_fp: Path, crosswalk_fp: Path, nwm_aa_fp: Path, rtlink_fp: Path -) -> xr.Dataset: - """ - Creates t-route restart file. 
- - Parameters: - - cat_map_fp: path to NGEN to NWM catchment json file (Path) - - crosswalk_fp: path to "crosswalk" NetCDF file that has all the - NextGen catchments in the order that the restart file will have them in (Path) - - nwm_aa_fp: path to NWM analysis/assimilation NetCDF (Path) - - rtlink_fp: path to NWM RouteLink channel geometry NetCDF (Path) - - Returns: - - restart: t-route ingestible restart file (xr.Dataset) - """ - cat_map, crosswalk_ds, nwm_ds, routelink_ds = read_files( - cat_map_fp, crosswalk_fp, nwm_aa_fp, rtlink_fp - ) - - result_df = calculate_restart_values(cat_map, crosswalk_ds, nwm_ds, routelink_ds) - # create netcdf restart = xr.Dataset( data_vars={ From a499e20527644751fb4ea54801b8d39954d12bf0 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 12:22:15 -0600 Subject: [PATCH 05/15] add example files, minor bug fixes --- .gitignore | 1 + .../RouteLink_CONUS_subset.nc | Bin 0 -> 26236 bytes .../channel_restart_20250718_000000.nc | Bin 0 -> 9007 bytes .../examples/troute-restart_example/conf.json | 20 ++++++++++++++ .../crosswalk_subset.nc | Bin 0 -> 6208 bytes .../hf2.2_subset_cat_map.json | 18 ++++++++++++ src/forcingprocessor/processor.py | 26 ++++++++++-------- src/forcingprocessor/troute_restart_tools.py | 13 +++++++-- 8 files changed, 65 insertions(+), 13 deletions(-) create mode 100644 docs/examples/troute-restart_example/RouteLink_CONUS_subset.nc create mode 100644 docs/examples/troute-restart_example/channel_restart_20250718_000000.nc create mode 100644 docs/examples/troute-restart_example/conf.json create mode 100644 docs/examples/troute-restart_example/crosswalk_subset.nc create mode 100644 docs/examples/troute-restart_example/hf2.2_subset_cat_map.json diff --git a/.gitignore b/.gitignore index 4a5c204..eb35b02 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ tests/__pycache__/ tests/data/ tests/*.txt docs/examples/*/forcings +docs/examples/*/restart *.txt \ No newline at end of file diff --git 
a/docs/examples/troute-restart_example/RouteLink_CONUS_subset.nc b/docs/examples/troute-restart_example/RouteLink_CONUS_subset.nc new file mode 100644 index 0000000000000000000000000000000000000000..9bc018850e152635601672a20d1ab2c406e7d8b9 GIT binary patch literal 26236 zcmeG^3v^UPmeuJFgwO(U2tmZ5Kn8-6n2_*`2KtjU3FL$62*_k;o9@?1OLxC)zwRVB zuA(EyBLfQ}>a4Q|$8}d{J&qq1J#Ud*sp);8DGwB(#| zoh4_6$TXj&-mNK>5A!aR@fsE9=P(oIOqhUwpU=g=j5W%LS!dycnT=(6%)*#ck928~ zSWu6!Z$SPi_~pRQ=_)BNU0^+*K~-jEm1P5(Hx`d-ts6TvQ8-@}js?S-TZ;xY!&;4> zo1wUDGKYI3e%=8!#5yz{&_f};8`|UESS%V`ABRS(cs}RQV`?pKXtHoq zUCqi++~8gF%K3~y2)@uFntujw*L1%(=3VbKG~QKKQN>lkRrAU{Oc}Y0_fddUQjgmSx^{4Zy4g zw5S&GX*>k;Tg4;s@OmxU94Idf=)Rx;QNQm>VtPqHjevXm3UObDH_{G+O0Lb}9ax)@sIE2W_l8P> zkw7Spqww=~Xm|irL@Df20}RrGqbaK>Uy!I|H6U?xz3$(b+!??@EgI0HVFE5UI$d@-YKneGFGYinBF=;^BP zl(mVOZ)>5?jkfwoXFTSijsW#}sH~?JU|ec+be00zQUO2|Nx!N=Kw^ItA4VmDDKJ^# z^G7DPOMb*mrinn(EXQOr5g()2JP|oi!Srd!VIXi;l*fv$hNoP?o6Px=H|?AvUbCGt zVue%12f*QoAEw60k&t_hlH9@BV0XI(OF&+SjH{-;v9`%w-`wPJJM9g%iV2>Dd<{Z} zj2Kpfd}9P@dI*YL0@?@au_Y}%y;%`pk#_*iSTV>vUM2&4lHfJn!w>rsC& z0$kDP*D$gNXix--F`*d182JYKB!YmsdOlus{SW8a5ay%A7)%zbBd@-uL5sAuYp1cZJcw`-_(w=$Frkr5Wb4{>2I;Nz%wqT~hvpD$_K^QUFb<^}-`Au-B)d{6ka zPqIVNk-_po91|h6y!vcFJw!LD<<)!*D6fcrKt6p>G4+$MDm7mXdq6#7)AZk(CLbpf zLu&oRV^Z^pt)}KHa1W?Q5a%HEBSK)HdI-VALY$OOa3~a|`^r=Pq%`@6HyEgXDnD6f z56D;9>}diBWn|+l1hNpwLLdu)ECjL;$U-0sfh+{F5XeFx3xTgB1W^A$8X?lSa@A8* zo|xGlzf3+Wl_rd>I(Y+G^XN>{3z7sSFo+c;g#zDaz1I3BZiOzsNpl+l~ zi=qOwdfr@6#eg2Sy0Ua}>Able{=od|^3w9k(n_HbVX3+5<-f-~cTYe3&KM5AJ1=mt z!V(wz7Cbxdy2^A8#1pQ)ibkDw(D*HurEgOUF8X+T?_u~&ov@>K#rPe)z3>a(zq9vs z%Z}dat{uJIUsO20lUwO1I#%IWW3F_(mRIT6+%?nj;nYe;3%slI9&{dj?ImZ+A8pbxjuhGeDt{S%K zO()wox`rKn4eCAU+}<0nPW|1t%v(*rE5{dA=WqJMMN6JDs_xxCdm;WdKbTu}>#+$J zy>S2Ai*{Y-Tw49uMN6mL`Tml-E8br6>el~V^5<1!mfrlkmzO;7+Z&b~+W+*@S;lXc z?*5P4ZLt^cvORR@Zrf8^Jhq$4p0T}Ky4n7RSC1VV>W6?)I`S_)N}ZG=wp=!T^(40J znj-drwTLxcQ^?-EzL529D`X$-En?ey{{*n?|GRJ5L4c+Ftv&Lo?Z`W$?VIm^%6{?p z4%pZK*GqQBes6!|%Fpcoc*R|goz1gt%jeIx-8U`Qz946;{g!ve+CQuNneDqjo^Aj2 
z(Kn!MHe?}?g+LYpUmpl$(x$dIjZJL`qp0Ay8M3JznklF4ZJNE2*wp@e)_H$K*-M(t z)~QczR27OC;3OpM1SSiaeJp{f9^Q4XEY#ES+M_~xTQh_`Z84NZF~tQR@`OfSU?f&Ew6SHzYK9%JbzU9ogVTYfa_mtV zF|_FozjusPubpy-tFGR`$fXPGnW`4z<=>XIl3-5pBYyu=@TTHW>3?PtdtW)V*t=Vy z+*@+cOj_Y7ah$j~a?fWtD}Zly5h)B#_0ezL`HO12*$u;km~{A#%O5m;1?9*;Ly|wzGpEj&=n$*a-51sf-$*?MjJQQ z;Mdxt8h9jVUL$6mi&KW~QDWz9#K1w)8};}3L*Uf+YZkFO7rs!QqQmfF7zL#>!;cZU z9$$S=z|mS)109Bb72tS^+sYw=Fb4j~aNVa@?-ixSU@0_OL#Yb+uCLVUtQf-^`^R1zEudvD1Kto?RV05w``j^R$}=D8O8F6Q)LTWHAJ2~ zrwf+D5}FpvaZCc;z?pJLB!{UO6v@f+hLAj>M`Io6iZ&#cFUPi$0TM2_%T8)nN?~uDVdhg8$$A@r;j7S; z=YzvBaPB1@=H-hQEds}Cd(Emvi{MCBjKj7giYFRiQ&ZgN?ZK+uVSUaJ+Y)5v*ozJ= z*xnHvWV3@%#sXn8>V)3z%=`TK@_AqCO6AxJKrKJylBub6KQs{3FrQk3shQ3nX)*m&>oqZI^ z83OhI1I03hLLctzIV5F4v(i?bs|rxOynE8kLgvVQ{L`8o_9(ob#O6Nq{@JV$s+`Oo zeEGw(SqVH&XREhXOk*ynZX)}^RSm^#hbVjTm@LWi#BP55*mkxSDl25OKg|0LdmSDpvyTgZdWc;Q zk5kx_8@_mp-2sm?*`HVJdY8?E&(3Df)g2!)U6kGWi$HKNR>FnA+dBEy_Hx(hgj6?CsYg) zTTtaDg2b3JDJe0R4z4IHqw^|?Na&b~A{4QU!tW){BBKyN;uoDoQ7CU&R*J@?QyEa^ zBwcfBib5i@`g@%EhY$L{ncU*UbL7~$0zqa^h#^WikJ{erldJz&BOCe=wO57QGmTgX z8*IT^9eOMVLUeb~4+0=)sPWKtL<@nW4Lj~7#HoqfR`*yq?&VaaH^9siC`^_abv~nwUkYSi`i@Lt)pxr z;8F6xm z4N-alY>5(Q%bj|-zUE(AsSJ_71&=y5!_EtX$wF)(SDd&%B9t=n9sQM3&$GurTRw;Z zCHahNvi&EPLX#x_q(7Pc)w2Hpe@jbXcz_;}ydLa1c+UV{k$_t-GnOW`jq7S^&f zHYb#7kTm_?jRuFyS$rB?LQ_^&QdSQC8~!c-AChY&E6qg;nXDmM>v&d%rsI}cS8Ype zle5+%&Ur6@dXYin9W#a&Ly1yqp%a%uusnP(`^X192Tt6V1l8F)zff&cRAD%S5}#kp zC_mnNn&a~loYyo0&olrYPI@0ccMDwihH<#e8(k?E9(z8CNf!)4-c-(^6f5>6x}rcnIV+9&}o4(k1b zA`VVuc+&Svnvcz#DIwRJ5y%lCRHkfePPa{F`4EJFyLp74Q)z9CAeiI~G?ku8IzV;C zH-5SKketmXlhhJnKh^DJ!%z`Oyx+47K7z1HM#Ew7HY|FgoEd#Gk-w*PvqbqF2ZVbe z>u^|=*ffEDEeu|qa8oCGfLdW@(aTNQJHg(np);pAKuH`9Zq5v8onVahfDdT94!6cd z3Km|_EFt^qP;@fvi^gJU=;P!Yp>CEy9@KrdO95D@uTU|WtkX)|mq^!54t}d+F?iR4 zw*cJP2xm~|@bz#Tn5IPra~Kf!jFQR>Z%c(;0Ioj6SA8IR%Z%y+CH>2HmoJrbTtBQi zMpVR&XKo%^#mwhn+O$d1(Dlp3OBR*UaA?SmMNinF!CEoI{Nm07drA6cpB{lLV!_@I zmy}wkOXv^qh(_HmeiRP#j*{|`u{b!duyw&IMw(ElMTE=#?)*z6qGGVfi0dfr7t&FA+r*q3?^p32Nqt(0r 
zS*NPVMqdI1BqeSciV}w{pzsuh)G#=y9fvp~PAYged_*x;Ur!x>9R4M zI|P43OD*z8U?6<8u#cl6?YLW6WSJ<|4^MVvtnKA-8iFtprfj*jM9$RjGcr>okzAIB zj!u`ksvcslaA$S literal 0 HcmV?d00001 diff --git a/docs/examples/troute-restart_example/channel_restart_20250718_000000.nc b/docs/examples/troute-restart_example/channel_restart_20250718_000000.nc new file mode 100644 index 0000000000000000000000000000000000000000..96916ea683bee0766c2df321e750e88f9a063388 GIT binary patch literal 9007 zcmeHNO=uHa6h4z_VyM&FrhDpYWzD87xj5EOA$EP~W??oX{lEq%~Zg?p2kbLZT9 z&pr3M-a;iJhRaLpB(&hDnEK{OvZsZ!jXn20aKAY%{BwfZW*E*)t-J(ejamq>; zXw;>qhR#8q&H&w9fYC-39jd?!6#&Ux+0K;g^j>Qy2dl7!4i_G+{?@i;|L@H$Tho63 zcJ}quqaL&hW*uwpgL$j)=N9TsWd#@PEGC8XQ+qxz_p9nPt(KZxnx^5rtVpRFO(b~NMRz=dO-sT4ztU?v|UJ-yL*swWXor^21FC>SQ` zb#U_m@y^~T>+cc$3A{4a--TZ1Bi4|W{nia^;bni_+HsB{%!OesA@++IoW?M)90F6g z5i_ym7AHr%E2*Yyz_t{?-*m63ySy+Eien*b(q;^KEH5m>q2EXz{fKeC<5ueeECrEq-1s z>YGbT{SkrQ6@ebdw2?5vG7I(l+Xnw2I`EF0#CA*YB}oLofs>@gCqqZ*PwUT^k)MJ0 zv`{x36^PvAz_8o!|BExYlN_^Hu*Zn*IzFgsDA>Q=uVX!_y+Bb$0U_gLwqR#L&!15& zaBh4adWA#kFRm85DV&^J>hapb$_9CpPWhdW6@*=SCYEL#Y2}boAPM+L<5n zy@Gs?Am1CXMs-pMCieEO@dvv{V_dCCoM%3!3wxjLGfOlM{qCG(|;--;gto>M4}pfw)&$c{oAsigVd-vgvE9gK^{R&o zdaV8d9=v$+BzWkhp8OjGk9rgj;>kjtc^`&`dXR#Myn)@>H*em|@BQ9P0`FPiXfVE9jtd3y`y{ftG(gD|>8+(i7|8VlFS#EFz0m(kGb9ofQh zMNOL~i>_%JWhRl@FbQ!1+2DjMLwb@d#zh*7RF62It^l;oB8)bY{_RH0I=LD!4JJO& zbAn{U?^mqBAojz^OM)Rw&B2ryKuMX@6u_LtePfsmf<#}dU3XoW!U^JuFg9ANw&S)s zPS3UKZ5zxBWH%tq7dZ7*oB5v*{v+nlu=B0@@-PfEa@pR$QREy>9h<}KNs5W5F3POL zQ~WB2)Zbu{NSVaLKo;d;G{UXWKXU*4)R#o5+!T0`R4MCkpYM09<~8oPBOITJX*xCI zPx1jQZ9KF9{BSFXhT$NBEFCZpdvtZko|1@OeSGnS5ScbvXs@hddXBXg5AFqV5@N|w z<;>timGdBoDN}86E@HJ=tCX*l7w7Ky{mZq5@EGwflRvG3dZQg)9%u~j$y<&H5$Fh4YV-?|3 zM&C=kVUUbcAA>&c*6lqEsYEgGe==~@>a datetime.strptime(t_ax[-1],'%Y-%m-%d %H:%M:%S'): - # Hack to ensure data is always written out with time moving forward. 
- t_ax=list(reversed(t_ax)) - data_array = np.flip(data_array,axis=0) - tmp = LEAD_START - LEAD_START = LEAD_END - LEAD_END = tmp + # Hack to ensure data is always written out with time moving forward. + t_ax=list(reversed(t_ax)) + data_array = np.flip(data_array,axis=0) + tmp = LEAD_START + LEAD_START = LEAD_END + LEAD_END = tmp t_extract = time.perf_counter() - t0 complexity = (nfiles * ncatchments) / 10000 @@ -1062,6 +1064,8 @@ def prep_ngen_data(conf): nwm_file_sizes_MB = [] if fs_type == 'google': fs_arg = gcsfs.GCSFileSystem() + else: + fs_arg = None if fs_arg: if nwm_file.find('https://') >= 0: _, bucket_key = convert_url2key(nwm_file,fs_type) @@ -1099,7 +1103,7 @@ def prep_ngen_data(conf): netcdf_cat_file_sizes_MB = write_netcdf_chrt( storage_type, forcing_path, data_array, t_ax, filename) else: - filename = "channel_restart" + restart_date + "_" + restart_hour + "0000.nc" + filename = "channel_restart_" + restart_date + "_" + restart_hour + "0000.nc" data_array.to_netcdf(filename) netcdf_cat_file_sizes_MB = [os.path.getsize(filename) / B2MB] # write_netcdf(data_array,"1", t_ax, jcatchment_dict['1']) diff --git a/src/forcingprocessor/troute_restart_tools.py b/src/forcingprocessor/troute_restart_tools.py index 97dbabc..4b5e5c4 100644 --- a/src/forcingprocessor/troute_restart_tools.py +++ b/src/forcingprocessor/troute_restart_tools.py @@ -148,7 +148,7 @@ def solve_depth_geom( def create_restart( - cat_map: dict, + cat_map_temp: dict, crosswalk_ds: xr.Dataset, nwm_ds: xr.Dataset, routelink_ds: xr.Dataset, @@ -166,7 +166,16 @@ def create_restart( Returns: - restart: t-route ingestible restart file (xr.Dataset) """ - + cat_map_temp = { + k[4:]: v for k, v in cat_map_temp.items() + } # remove "cat" prefix from keys + + cat_map = {} + for link_id in crosswalk_ds["link"].values: + if cat_map_temp.get(str(link_id)) is None: + cat_map[link_id] = [] # add empty list for missing cat_id + else: + cat_map[link_id] = cat_map_temp[str(link_id)] nwm_ids_flat = [] 
cat_ids_flat = [] for cat_id, nwm_ids in cat_map.items(): From c0da995f15b599724b666f34819e5261a9611d26 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 12:25:49 -0600 Subject: [PATCH 06/15] updated readme --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 19035cd..08e24c0 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,9 @@ where the list of `nwm-id`s are the NHD reaches associated with that NextGen hyd | nwm_file | Path to a text file containing nwm file names. One filename per line. [Tool](#nwm_file) to create this file | :white_check_mark: | | gpkg_file | Geopackage file to define spatial domain. Use [hfsubset](https://github.com/lynker-spatial/hfsubsetCLI) to generate a geopackage with a `forcing-weights` layer. Accepts local absolute path, s3 URI or URL. Also acceptable is a weights parquet generated with [weights_hf2ds.py](https://github.com/CIROH-UA/forcingprocessor/blob/main/src/forcingprocessor/weights_hf2ds.py), though the plotting option will no longer be available. | :white_check_mark: | | map_file | Path to a json containing the NWM to NGEN mapping for channel routing data extraction. Absolute path or s3 URI | | +| restart_map_file | Path to a json containing the NWM to NGEN catchment mapping for t-route restart generation. Absolute path or s3 URI | | +| crosswalk_file | Path to a netCDF containing the exact order of the catchments in the t-route restart file. Absolute path or s3 URI | | +| routelink_file | Path to a netCDF containing the NWM channel geometry data, needed for t-route restart generation. Absolute path or s3 URI | | ### 2. 
Storage From 1211f4dcebe7b15df079591f9171337187487e2a Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 13:18:51 -0600 Subject: [PATCH 07/15] fixes to maintain backwards compatibility --- src/forcingprocessor/processor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/forcingprocessor/processor.py b/src/forcingprocessor/processor.py index 2c60fba..4976e18 100644 --- a/src/forcingprocessor/processor.py +++ b/src/forcingprocessor/processor.py @@ -843,7 +843,7 @@ def prep_ngen_data(conf): global ii_plot, nts_plot, ngen_vars_plot ii_plot = conf.get("plot",False) if ii_plot: - if data_source == "channel_routing" or "troute_restarts": + if data_source == "channel_routing" or data_source == "troute_restarts": raise RuntimeError("Plotting not supported for channel routing or restart processing.") nts_plot = conf["plot"].get("nts_plot",10) @@ -1244,8 +1244,8 @@ def prep_ngen_data(conf): med_df.insert(0,"nexus id",list(nwm_ngen_map.keys())) else: # troute restarts won't need stats calculated for them since there's no time axis - avg_df = None - med_df = None + avg_df = pd.DataFrame() + med_df = pd.DataFrame() del data_array @@ -1262,9 +1262,9 @@ def prep_ngen_data(conf): local_metapath = metaf_path write_df(metadata_df, "metadata.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3) - if avg_df != None: + if not avg_df.empty: write_df(avg_df, "catchments_avg.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3) - if med_df != None: + if not med_df.empty: write_df(med_df, "catchments_median.csv", storage_type, data_source_arg="na", local_path=local_metapath, key_prefix=meta_key, bucket=meta_bucket, client=s3) meta_time = time.perf_counter() - t000 From 85b156b2f454ff8805e5e24e7ea613ad5fa4af3f Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 13:28:31 -0600 Subject: [PATCH 08/15] add 
missing file --- docs/examples/troute-restart_example/filenamelist.txt | 1 + 1 file changed, 1 insertion(+) create mode 100644 docs/examples/troute-restart_example/filenamelist.txt diff --git a/docs/examples/troute-restart_example/filenamelist.txt b/docs/examples/troute-restart_example/filenamelist.txt new file mode 100644 index 0000000..8394e8a --- /dev/null +++ b/docs/examples/troute-restart_example/filenamelist.txt @@ -0,0 +1 @@ +https://noaa-nwm-pds.s3.amazonaws.com/nwm.20250718/analysis_assim/nwm.t00z.analysis_assim.channel_rt.tm00.conus.nc \ No newline at end of file From 37f73b10fb0ff23aada1f04f375ee3b7d1464a7f Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 5 Mar 2026 14:40:25 -0600 Subject: [PATCH 09/15] fixed calculation --- src/forcingprocessor/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/forcingprocessor/processor.py b/src/forcingprocessor/processor.py index 4976e18..fc4ec35 100644 --- a/src/forcingprocessor/processor.py +++ b/src/forcingprocessor/processor.py @@ -1083,7 +1083,7 @@ def prep_ngen_data(conf): nwm_file_sizes_MB.append(len(response.content) / B2MB) else: file_obj = nwm_file - nwm_file_sizes_MB.append(os.path.getsize(nwm_file / B2MB)) + nwm_file_sizes_MB.append(os.path.getsize(nwm_file) / B2MB) with xr.open_dataset(file_obj) as nwm_ds: data_array = create_restart(cat_map, crosswalk_ds, nwm_ds, routelink_ds) From 3c4483fdca8d07e40a3847e35377dad9fe6cd57c Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Wed, 11 Mar 2026 16:19:43 -0500 Subject: [PATCH 10/15] add unit tests --- tests/test_trouterestarts.py | 138 +++++++++++++++++++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 tests/test_trouterestarts.py diff --git a/tests/test_trouterestarts.py b/tests/test_trouterestarts.py new file mode 100644 index 0000000..390bf41 --- /dev/null +++ b/tests/test_trouterestarts.py @@ -0,0 +1,138 @@ +""" +Unit tests for restart utility functions. 
+""" + +import numpy as np +import pandas as pd +import pytest +import xarray as xr + +from forcingprocessor.troute_restart_tools import ( + average_nwm_variables, + average_rtlink_variables, + create_restart, + quadratic_formula, + solve_depth_geom, +) + + +# --------------------------------------------------------------------------- +# minimum viable examples +# --------------------------------------------------------------------------- + +simple_nwm_ds = xr.Dataset( + { + "streamflow": ("feature_id", [10.0, 20.0, 30.0, 40.0]), + "velocity": ("feature_id", [1.0, 2.0, 3.0, 4.0]), + }, + coords={ + "feature_id": [101, 102, 103, 104], + "time": [np.datetime64("2024-01-15T12:00:00.000000000")], + }, +) + +simple_routelink_ds = xr.Dataset( + { + "link": ("feature_id", [101, 102, 103, 104]), + "TopWdth": ("feature_id", [10.0, 12.0, 14.0, 16.0]), + "BtmWdth": ("feature_id", [5.0, 6.0, 7.0, 8.0]), + "ChSlp": ("feature_id", [0.5, 0.5, 0.5, 0.5]), + }, + coords={}, +) + +simple_crosswalk_ds = xr.Dataset(coords={"link": [1, 2]}) + +simple_cat_map = { + "cat-1": [101.0, 102.0], + "cat-2": [103.0, 104.0], +} + + +# --------------------------------------------------------------------------- +# unit tests +# --------------------------------------------------------------------------- + + +def test_averages_streamflow_and_velocity(): + # cat 1 -> features 101 (sf=10, v=1) and 102 (sf=20, v=2) => mean sf=15, v=1.5 + # cat 2 -> feature 103 (sf=30, v=3) => mean sf=30, v=3.0 + nwm_ids = np.array([101.0, 102.0, 103.0]) + cat_ids = np.array([1, 1, 2]) + agg, mapping = average_nwm_variables(nwm_ids, cat_ids, simple_nwm_ds) + + assert agg.loc[1, "streamflow"] == pytest.approx(15.0) # test averaging + assert agg.loc[1, "velocity"] == pytest.approx(1.5) + assert agg.loc[2, "streamflow"] == pytest.approx(30.0) + assert agg.loc[2, "velocity"] == pytest.approx(3.0) + + assert len(mapping) == 3 # test mapping + assert set(mapping.columns) == {"feature_id", "cat_id"} + + assert 3 not in 
agg.index # test subsetting + assert len(agg) == 2 + + +def test_averages_routelink(): + nwm_ids = np.array([101.0, 102.0, 103.0, 104.0]) + mapping = pd.DataFrame( + {"feature_id": [101.0, 102.0, 103.0, 104.0], "cat_id": [1, 1, 2, 2]} + ) + agg = average_rtlink_variables(nwm_ids, mapping, simple_routelink_ds) + + assert agg.loc[1, "TopWdth"] == pytest.approx(11.0) # test averaging + assert agg.loc[2, "TopWdth"] == pytest.approx(15.0) + assert agg.loc[1, "BtmWdth"] == pytest.approx(5.5) + assert agg.loc[2, "BtmWdth"] == pytest.approx(7.5) + assert agg.loc[1, "ChSlp"] == pytest.approx(0.5) + assert agg.loc[2, "ChSlp"] == pytest.approx(0.5) + + assert len(agg) == 2 # test layout + + +def test_quadratic_formula(): + # Two equations: [x^2+2x-3, x^2-4] -> roots [1, 2] + b = np.array([2.0, 0.0]) + c = np.array([-3.0, -4.0]) + result = quadratic_formula(b, c) + np.testing.assert_allclose(result, [1.0, 2.0]) + + +def test_solve_depth_geom(): + sf = np.array([0.0, 5.0, 2.0, 8.0, 1000.0, np.nan]) + v = np.array([1.0, 0.0, 1.0, 1.0, 1.0, 1.0]) + tw = np.array([10.0, 10.0, 10.0, 10.0, 10.0, 10.0]) + bw = np.array([5.0, 5.0, 5.0, 5.0, 5.0, 5.0]) + cs = np.array([0.5, 0.5, 0.5, 0.5, 0.5, 0.5]) + depths = solve_depth_geom(sf, v, tw, bw, cs) + + assert depths.shape == (6,) + assert depths[0] == pytest.approx(0.0) + assert depths[1] == pytest.approx(0.0) + assert depths[2] == pytest.approx(0.35078106) + assert depths[3] == pytest.approx(1.10849528) + assert depths[4] == pytest.approx(34.27083333) + assert depths[5] == pytest.approx(0.0) + + +def test_restart(): + result = create_restart( + simple_cat_map, simple_crosswalk_ds, simple_nwm_ds, simple_routelink_ds + ) + + n_links = len(simple_crosswalk_ds["link"]) + assert result["hlink"].shape == (n_links,) + assert result["qlink1"].shape == (n_links,) + assert result["qlink2"].shape == (n_links,) + + assert result.attrs["Restart_Time"] == "2024-01-15_12:00:00" + + assert result["hlink"].values[0] == pytest.approx(1.25) + assert 
result["hlink"].values[1] == pytest.approx(1.04315438) + assert ( + result["qlink1"].values[0] == result["qlink2"].values[0] == pytest.approx(15.0) + ) + assert ( + result["qlink1"].values[1] == result["qlink2"].values[1] == pytest.approx(35.0) + ) + From f6a38824ddf1ca8b7f0ad8abd1d02b18792e7b01 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Wed, 11 Mar 2026 16:44:35 -0500 Subject: [PATCH 11/15] add docstrigns --- tests/test_trouterestarts.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_trouterestarts.py b/tests/test_trouterestarts.py index 390bf41..eb5a6d0 100644 --- a/tests/test_trouterestarts.py +++ b/tests/test_trouterestarts.py @@ -55,6 +55,7 @@ def test_averages_streamflow_and_velocity(): + """Test average_nwm_variables""" # cat 1 -> features 101 (sf=10, v=1) and 102 (sf=20, v=2) => mean sf=15, v=1.5 # cat 2 -> feature 103 (sf=30, v=3) => mean sf=30, v=3.0 nwm_ids = np.array([101.0, 102.0, 103.0]) @@ -74,6 +75,7 @@ def test_averages_streamflow_and_velocity(): def test_averages_routelink(): + """Test average_rtlink_variables""" nwm_ids = np.array([101.0, 102.0, 103.0, 104.0]) mapping = pd.DataFrame( {"feature_id": [101.0, 102.0, 103.0, 104.0], "cat_id": [1, 1, 2, 2]} @@ -91,6 +93,7 @@ def test_averages_routelink(): def test_quadratic_formula(): + """Test quadratic_formula""" # Two equations: [x^2+2x-3, x^2-4] -> roots [1, 2] b = np.array([2.0, 0.0]) c = np.array([-3.0, -4.0]) @@ -99,6 +102,7 @@ def test_quadratic_formula(): def test_solve_depth_geom(): + """Test solve_depth_geom""" sf = np.array([0.0, 5.0, 2.0, 8.0, 1000.0, np.nan]) v = np.array([1.0, 0.0, 1.0, 1.0, 1.0, 1.0]) tw = np.array([10.0, 10.0, 10.0, 10.0, 10.0, 10.0]) @@ -116,6 +120,7 @@ def test_solve_depth_geom(): def test_restart(): + """Test create_restart""" result = create_restart( simple_cat_map, simple_crosswalk_ds, simple_nwm_ds, simple_routelink_ds ) @@ -135,4 +140,3 @@ def test_restart(): assert ( result["qlink1"].values[1] == result["qlink2"].values[1] 
== pytest.approx(35.0) ) - From 60c37041f3dec7c4e1b7c7199309ccb97a9d3165 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Wed, 11 Mar 2026 17:21:23 -0500 Subject: [PATCH 12/15] full prep_ngen_conf test suite --- src/forcingprocessor/processor.py | 8 +- tests/test_trouterestarts.py | 162 ++++++++++++++++++++++++++++-- 2 files changed, 162 insertions(+), 8 deletions(-) diff --git a/src/forcingprocessor/processor.py b/src/forcingprocessor/processor.py index fc4ec35..36a3e4a 100644 --- a/src/forcingprocessor/processor.py +++ b/src/forcingprocessor/processor.py @@ -1011,6 +1011,8 @@ def prep_ngen_data(conf): if match: restart_date = match.group(1) restart_hour = match.group(2) + else: + print("Could not extract restart date and time") # Determine the file system type based on the first NWM forcing file global fs_type @@ -1064,6 +1066,8 @@ def prep_ngen_data(conf): nwm_file_sizes_MB = [] if fs_type == 'google': fs_arg = gcsfs.GCSFileSystem() + elif fs_type == 's3': + fs_arg = s3fs.S3FileSystem(anon=True) else: fs_arg = None if fs_arg: @@ -1104,8 +1108,8 @@ def prep_ngen_data(conf): storage_type, forcing_path, data_array, t_ax, filename) else: filename = "channel_restart_" + restart_date + "_" + restart_hour + "0000.nc" - data_array.to_netcdf(filename) - netcdf_cat_file_sizes_MB = [os.path.getsize(filename) / B2MB] + data_array.to_netcdf(forcing_path / Path(filename)) + netcdf_cat_file_sizes_MB = [os.path.getsize(forcing_path / filename) / B2MB] # write_netcdf(data_array,"1", t_ax, jcatchment_dict['1']) if ii_verbose: print(f'Writing catchment forcings to {output_path}!', end=None,flush=True) if ii_plot or ii_collect_stats or any(x in output_file_type for x in ["csv","parquet","tar"]): diff --git a/tests/test_trouterestarts.py b/tests/test_trouterestarts.py index eb5a6d0..936f5ba 100644 --- a/tests/test_trouterestarts.py +++ b/tests/test_trouterestarts.py @@ -2,6 +2,10 @@ Unit tests for restart utility functions. 
""" +import os +from pathlib import Path +from datetime import datetime, timedelta, timezone +import re import numpy as np import pandas as pd import pytest @@ -14,7 +18,8 @@ quadratic_formula, solve_depth_geom, ) - +from forcingprocessor.processor import prep_ngen_data +from forcingprocessor.nwm_filenames_generator import generate_nwmfiles # --------------------------------------------------------------------------- # minimum viable examples @@ -55,7 +60,6 @@ def test_averages_streamflow_and_velocity(): - """Test average_nwm_variables""" # cat 1 -> features 101 (sf=10, v=1) and 102 (sf=20, v=2) => mean sf=15, v=1.5 # cat 2 -> feature 103 (sf=30, v=3) => mean sf=30, v=3.0 nwm_ids = np.array([101.0, 102.0, 103.0]) @@ -75,7 +79,6 @@ def test_averages_streamflow_and_velocity(): def test_averages_routelink(): - """Test average_rtlink_variables""" nwm_ids = np.array([101.0, 102.0, 103.0, 104.0]) mapping = pd.DataFrame( {"feature_id": [101.0, 102.0, 103.0, 104.0], "cat_id": [1, 1, 2, 2]} @@ -93,7 +96,6 @@ def test_averages_routelink(): def test_quadratic_formula(): - """Test quadratic_formula""" # Two equations: [x^2+2x-3, x^2-4] -> roots [1, 2] b = np.array([2.0, 0.0]) c = np.array([-3.0, -4.0]) @@ -102,7 +104,6 @@ def test_quadratic_formula(): def test_solve_depth_geom(): - """Test solve_depth_geom""" sf = np.array([0.0, 5.0, 2.0, 8.0, 1000.0, np.nan]) v = np.array([1.0, 0.0, 1.0, 1.0, 1.0, 1.0]) tw = np.array([10.0, 10.0, 10.0, 10.0, 10.0, 10.0]) @@ -120,7 +121,6 @@ def test_solve_depth_geom(): def test_restart(): - """Test create_restart""" result = create_restart( simple_cat_map, simple_crosswalk_ds, simple_nwm_ds, simple_routelink_ds ) @@ -140,3 +140,153 @@ def test_restart(): assert ( result["qlink1"].values[1] == result["qlink2"].values[1] == pytest.approx(35.0) ) + + +# --------------------------------------------------------------------------- +# test prep_ngen_conf +# --------------------------------------------------------------------------- + 
+HF_VERSION="v2.2" +date = datetime.now(timezone.utc) +date = date.strftime('%Y%m%d') +HOURMINUTE = '0000' +TODAY_START = date + HOURMINUTE +yesterday = datetime.now(timezone.utc) - timedelta(hours=24) +yesterday = yesterday.strftime('%Y%m%d') +test_dir = Path(__file__).parent +data_dir = (test_dir/'data').resolve() +forcings_dir = (data_dir/'restart').resolve() +pwd = Path.cwd() +if os.path.exists(data_dir): + os.system(f"rm -rf {data_dir}") +os.system(f"mkdir {data_dir}") +FILENAMELIST = str((pwd/"filenamelist.txt").resolve()) +RETRO_FILENAMELIST = str((pwd/"retro_filenamelist.txt").resolve()) + +conf = { + "forcing" : { + "nwm_file" : FILENAMELIST, + "restart_map_file" : f"{pwd}/docs/examples/troute-restart_example/hf2.2_subset_cat_map.json", + "crosswalk_file" : f"{pwd}/docs/examples/troute-restart_example/crosswalk_subset.nc", + "routelink_file" : f"{pwd}/docs/examples/troute-restart_example/RouteLink_CONUS_subset.nc" + }, + + "storage":{ + "storage_type" : "local", + "output_path" : str(data_dir), + "output_file_type" : ["netcdf"] + }, + + "run" : { + "verbose" : False, + "collect_stats" : False, + "nprocs" : 1 + } + } + +nwmurl_conf = { + "forcing_type" : "operational_archive", + "start_date" : "", + "end_date" : "", + "runinput" : 5, + "varinput" : 1, + "geoinput" : 1, + "meminput" : 0, + "urlbaseinput" : 7, + "fcst_cycle" : [0], + "lead_time" : [0] + } + +@pytest.fixture +def clean_dir(autouse=True): + if os.path.exists(forcings_dir): + os.system(f'rm -rf {str(forcings_dir)}') + +def test_nomads_prod(): + nwmurl_conf['start_date'] = TODAY_START + nwmurl_conf['end_date'] = TODAY_START + nwmurl_conf["urlbaseinput"] = 1 + generate_nwmfiles(nwmurl_conf) + conf['run']['collect_stats'] = True # test metadata generation once + prep_ngen_data(conf) + conf['run']['collect_stats'] = False + assert_file=Path(data_dir / f"restart/channel_restart_{date}_{HOURMINUTE}00.nc").resolve() + assert assert_file.exists() + os.remove(assert_file) + +def test_nwm_google_apis(): + 
nwmurl_conf['start_date'] = TODAY_START + nwmurl_conf['end_date'] = TODAY_START + nwmurl_conf["urlbaseinput"] = 3 + generate_nwmfiles(nwmurl_conf) + prep_ngen_data(conf) + assert_file=Path(data_dir / f"restart/channel_restart_{date}_{HOURMINUTE}00.nc").resolve() + assert assert_file.exists() + os.remove(assert_file) + +def test_google_cloud_storage(): + nwmurl_conf['start_date'] = "202407100100" + nwmurl_conf['end_date'] = "202407100100" + nwmurl_conf["urlbaseinput"] = 4 + generate_nwmfiles(nwmurl_conf) + prep_ngen_data(conf) + assert_file=(data_dir/"restart/channel_restart_20240710_000000.nc").resolve() + assert assert_file.exists() + os.remove(assert_file) + +def test_gs(): + nwmurl_conf['start_date'] = TODAY_START + nwmurl_conf['end_date'] = TODAY_START + nwmurl_conf["urlbaseinput"] = 5 + generate_nwmfiles(nwmurl_conf) + assert_file=Path(data_dir / f"restart/channel_restart_{date}_{HOURMINUTE}00.nc").resolve() + prep_ngen_data(conf) + assert assert_file.exists() + os.remove(assert_file) + +def test_gcs(): + nwmurl_conf['start_date'] = "202407100100" + nwmurl_conf['end_date'] = "202407100100" + nwmurl_conf["urlbaseinput"] = 6 + generate_nwmfiles(nwmurl_conf) + prep_ngen_data(conf) + assert_file=(data_dir/"restart/channel_restart_20240710_000000.nc").resolve() + assert assert_file.exists() + os.remove(assert_file) + +def test_noaa_nwm_pds_https(): + nwmurl_conf['start_date'] = TODAY_START + nwmurl_conf['end_date'] = TODAY_START + nwmurl_conf["urlbaseinput"] = 7 + generate_nwmfiles(nwmurl_conf) + prep_ngen_data(conf) + assert_file=Path(data_dir / f"restart/channel_restart_{date}_{HOURMINUTE}00.nc").resolve() + assert assert_file.exists() + os.remove(assert_file) + +def test_noaa_nwm_pds_s3(): + nwmurl_conf['start_date'] = TODAY_START + nwmurl_conf['end_date'] = TODAY_START + nwmurl_conf["urlbaseinput"] = 8 + generate_nwmfiles(nwmurl_conf) + prep_ngen_data(conf) + assert_file=Path(data_dir / f"restart/channel_restart_{date}_{HOURMINUTE}00.nc").resolve() + assert 
assert_file.exists() + os.remove(assert_file) + +def test_s3_output(): + test_bucket = "ciroh-community-ngen-datastream" + conf['forcing']['nwm_file'] = RETRO_FILENAMELIST + conf['storage']['output_path'] = f's3://{test_bucket}/test/cicd/forcingprocessor/pytest' + nwmurl_conf["urlbaseinput"] = 4 + generate_nwmfiles(nwmurl_conf) + prep_ngen_data(conf) + conf['storage']['output_path'] = str(data_dir) + +def test_netcdf_output_type(): + generate_nwmfiles(nwmurl_conf) + conf['storage']['output_file_type'] = ["netcdf"] + prep_ngen_data(conf) + assert_file=(data_dir/"restart/channel_restart_20180101_000000.nc").resolve() + assert assert_file.exists() + os.remove(assert_file) From c56668c44303f0883a41d328dc0b72d444702748 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 12 Mar 2026 08:28:22 -0500 Subject: [PATCH 13/15] remove retrospective support --- tests/test_trouterestarts.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/test_trouterestarts.py b/tests/test_trouterestarts.py index 936f5ba..68a872c 100644 --- a/tests/test_trouterestarts.py +++ b/tests/test_trouterestarts.py @@ -161,7 +161,6 @@ def test_restart(): os.system(f"rm -rf {data_dir}") os.system(f"mkdir {data_dir}") FILENAMELIST = str((pwd/"filenamelist.txt").resolve()) -RETRO_FILENAMELIST = str((pwd/"retro_filenamelist.txt").resolve()) conf = { "forcing" : { @@ -276,7 +275,6 @@ def test_noaa_nwm_pds_s3(): def test_s3_output(): test_bucket = "ciroh-community-ngen-datastream" - conf['forcing']['nwm_file'] = RETRO_FILENAMELIST conf['storage']['output_path'] = f's3://{test_bucket}/test/cicd/forcingprocessor/pytest' nwmurl_conf["urlbaseinput"] = 4 generate_nwmfiles(nwmurl_conf) @@ -287,6 +285,6 @@ def test_netcdf_output_type(): generate_nwmfiles(nwmurl_conf) conf['storage']['output_file_type'] = ["netcdf"] prep_ngen_data(conf) - assert_file=(data_dir/"restart/channel_restart_20180101_000000.nc").resolve() + 
assert_file=(data_dir/"restart/channel_restart_{date}_{HOURMINUTE}00.nc").resolve() assert assert_file.exists() os.remove(assert_file) From c3294a1ea302859e62f43b882f721347cad38a62 Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 12 Mar 2026 08:49:48 -0500 Subject: [PATCH 14/15] fix s3 writing --- src/forcingprocessor/processor.py | 7 ++-- src/forcingprocessor/troute_restart_tools.py | 38 ++++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/src/forcingprocessor/processor.py b/src/forcingprocessor/processor.py index 36a3e4a..5ec9c72 100644 --- a/src/forcingprocessor/processor.py +++ b/src/forcingprocessor/processor.py @@ -20,7 +20,7 @@ from forcingprocessor.plot_forcings import plot_ngen_forcings from forcingprocessor.utils import make_forcing_netcdf, get_window, log_time, convert_url2key, report_usage, nwm_variables, ngen_variables from forcingprocessor.channel_routing_tools import channelrouting_nwm2ngen, write_netcdf_chrt -from forcingprocessor.troute_restart_tools import create_restart +from forcingprocessor.troute_restart_tools import create_restart, write_netcdf_restart B2MB = 1048576 @@ -1108,8 +1108,9 @@ def prep_ngen_data(conf): storage_type, forcing_path, data_array, t_ax, filename) else: filename = "channel_restart_" + restart_date + "_" + restart_hour + "0000.nc" - data_array.to_netcdf(forcing_path / Path(filename)) - netcdf_cat_file_sizes_MB = [os.path.getsize(forcing_path / filename) / B2MB] + netcdf_cat_file_sizes_MB = write_netcdf_restart( + storage_type, forcing_path, data_array, filename + ) # write_netcdf(data_array,"1", t_ax, jcatchment_dict['1']) if ii_verbose: print(f'Writing catchment forcings to {output_path}!', end=None,flush=True) if ii_plot or ii_collect_stats or any(x in output_file_type for x in ["csv","parquet","tar"]): diff --git a/src/forcingprocessor/troute_restart_tools.py b/src/forcingprocessor/troute_restart_tools.py index 4b5e5c4..922f5d0 100644 --- a/src/forcingprocessor/troute_restart_tools.py 
+++ b/src/forcingprocessor/troute_restart_tools.py @@ -3,9 +3,16 @@ t-route. Translates between NWM and NGEN IDs! """ +from pathlib import Path +import tempfile +import os import xarray as xr import numpy as np import pandas as pd +import boto3 +from forcingprocessor.utils import convert_url2key + +B2MB = 1048576 def average_nwm_variables( @@ -217,3 +224,34 @@ def create_restart( ) return restart + + +def write_netcdf_restart(storage_type: str, prefix: Path, ds: xr.Dataset, name: str): + """ + Write restart data to a NetCDF file. + + Parameters: + storage_type (str): s3 or local + prefix (Path): filename prefix + data (xr.Dataset): restart file + name (str): string for the filename + Returns: + netcdf_cat_file_size (list): file size of output netcdf + """ + if storage_type == "s3": + s3_client = boto3.session.Session().client("s3") + nc_filename = str(prefix) + "/" + name + bucket, key = convert_url2key(nc_filename, "s3") + with tempfile.NamedTemporaryFile(suffix=".nc") as tmpfile: + ds.to_netcdf(tmpfile.name, engine="netcdf4") + netcdf_cat_file_size = os.path.getsize(tmpfile.name) / B2MB + tmpfile.flush() + tmpfile.seek(0) + print(f"Uploading netcdf forcings to S3: bucket={bucket}, key={key}") + s3_client.upload_file(tmpfile.name, bucket, key) + else: + nc_filename = Path(prefix, name) + ds.to_netcdf(nc_filename, engine="netcdf4") + print(f"netcdf has been written to {nc_filename}") + netcdf_cat_file_size = os.path.getsize(nc_filename) / B2MB + return [netcdf_cat_file_size] From ccc17252d3b751f2e6b79d96f93ff00cfbf5ce3a Mon Sep 17 00:00:00 2001 From: Quinn Lee Date: Thu, 12 Mar 2026 08:58:01 -0500 Subject: [PATCH 15/15] fix test --- tests/test_trouterestarts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_trouterestarts.py b/tests/test_trouterestarts.py index 68a872c..3ad7326 100644 --- a/tests/test_trouterestarts.py +++ b/tests/test_trouterestarts.py @@ -285,6 +285,6 @@ def test_netcdf_output_type(): generate_nwmfiles(nwmurl_conf) 
conf['storage']['output_file_type'] = ["netcdf"] prep_ngen_data(conf) - assert_file=(data_dir/"restart/channel_restart_{date}_{HOURMINUTE}00.nc").resolve() + assert_file=Path(data_dir / f"restart/channel_restart_{date}_{HOURMINUTE}00.nc").resolve() assert assert_file.exists() os.remove(assert_file)