Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1,673 changes: 217 additions & 1,456 deletions src/troute-network/troute/NHF.py

Large diffs are not rendered by default.

471 changes: 471 additions & 0 deletions src/troute-network/troute/nhf_discretize.py

Large diffs are not rendered by default.

578 changes: 578 additions & 0 deletions src/troute-network/troute/nhf_preprocess.py

Large diffs are not rendered by default.

434 changes: 434 additions & 0 deletions src/troute-network/troute/nhf_topology.py

Large diffs are not rendered by default.

19 changes: 12 additions & 7 deletions src/troute-nwm/src/nwm_routing/nhf_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,13 @@ def nhf_routing(argv):
})
routing_df["dx"] = routing_df["dx"] * 1000 # converted to meters

# Build flowveldepth_interorder for upstream inflow virtual segments
flowveldepth_interorder = network.build_flowveldepth_interorder(nts, qts_subdivisions)

run_results, subnetwork_list = nwm_route(
network.connections,
network.reverse_network,
network.waterbody_connections,
network.connections,
network.reverse_network,
network.waterbody_connections,
network.reaches_by_tailwater,
parallel_compute_method,
compute_kernel,
Expand All @@ -220,7 +222,7 @@ def nhf_routing(argv):
dt,
nts,
qts_subdivisions,
network.independent_networks,
network.independent_networks,
routing_df, # only routing where there are routing segments
network.q0,
network._qlateral,
Expand Down Expand Up @@ -254,7 +256,8 @@ def nhf_routing(argv):
network.coastal_boundary_depth_df,
network.unrefactored_topobathy_df,
firstRun,
logFileName
logFileName,
flowveldepth_interorder=flowveldepth_interorder,
)

route_end_time = time.time()
Expand Down Expand Up @@ -329,8 +332,10 @@ def nhf_routing(argv):
network.link_gage_df,
network.link_lake_crosswalk,
network.nexus_dict,
poi_crosswalk,
logFileName
poi_crosswalk,
logFileName,
fp_outlet_crosswalk=network.fp_outlet_crosswalk,
link_ids=network.links_df.index,
)


Expand Down
29 changes: 25 additions & 4 deletions src/troute-nwm/src/nwm_routing/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ def nwm_output_generator(
link_lake_crosswalk = None,
nexus_dict = None,
poi_crosswalk = None,
logFileName='NONE'
logFileName='NONE',
fp_outlet_crosswalk = None,
link_ids = None,
):

dt = run.get("dt")
Expand Down Expand Up @@ -253,9 +255,19 @@ def nwm_output_generator(
# replace waterbody lake_ids with outlet link ids
if (link_lake_crosswalk):
flowveldepth = _reindex_lake_to_link_id(flowveldepth, link_lake_crosswalk)


# reindex from link_id to fp_id, keeping outlet links + non-link rows (VFPs)
if fp_outlet_crosswalk:
outlet_link_ids = set(fp_outlet_crosswalk.keys())
all_link_ids = set(link_ids) if link_ids is not None else outlet_link_ids
# Keep outlet links and any non-link rows (e.g. VFP flow-scaled)
flowveldepth = flowveldepth.loc[
flowveldepth.index.isin(outlet_link_ids) | ~flowveldepth.index.isin(all_link_ids)
]
flowveldepth = flowveldepth.rename(index=fp_outlet_crosswalk)

# todo: create a unit test by saving FVD array to disk and then checking that
# it matches FVD array from parent branch or other configurations.
# it matches FVD array from parent branch or other configurations.
# flowveldepth.to_pickle(output_parameters['test_output'])

if return_courant:
Expand All @@ -274,7 +286,16 @@ def nwm_output_generator(
if link_lake_crosswalk:
# (re) set the flowveldepth index
courant.set_index(fvdidxs, inplace = True)


# reindex courant from link_id to fp_id
if fp_outlet_crosswalk:
outlet_link_ids_c = set(fp_outlet_crosswalk.keys())
all_link_ids_c = set(link_ids) if link_ids is not None else outlet_link_ids_c
courant = courant.loc[
courant.index.isin(outlet_link_ids_c) | ~courant.index.isin(all_link_ids_c)
]
courant = courant.rename(index=fp_outlet_crosswalk)

LOG.debug("Constructing the FVD DataFrame took %s seconds." % (time.time() - start))

if stream_output:
Expand Down
84 changes: 63 additions & 21 deletions src/troute-routing/troute/routing/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -709,26 +709,41 @@ def compute_nhd_routing_v02(
# if 1 == 1:
with Parallel(n_jobs=cpu_pool, backend="loky") as parallel:
results_subn = defaultdict(list)
flowveldepth_interorder = {}
# Preserve any pre-populated entries (e.g., upstream inflow
# virtual segments from NHF catchment discharge routing).
_prepopulated_fvd = dict(flowveldepth_interorder)
flowveldepth_interorder = dict(_prepopulated_fvd)

for order in range(max(subnetworks_only_ordered_jit.keys()), -1, -1):
jobs = []
for cluster, clustered_subns in reaches_ordered_bysubntw_clustered[
order
].items():
segs = clustered_subns["segs"]

# Identify virtual segments (those with pre-filled FVD
# data) that were absorbed into this cluster. They must
# be treated as offnetwork_upstreams so the kernel
# injects their flow via flowveldepth_interorder instead
# of routing through them as regular segments.
fvd_keys = set(flowveldepth_interorder.keys())
cluster_virtual_segs = fvd_keys & set(segs)
if cluster_virtual_segs:
segs = [s for s in segs if s not in cluster_virtual_segs]

offnetwork_upstreams = set()
segs_set = set(segs)
for seg in segs:
for us in rconn[seg]:
if us not in segs_set:
offnetwork_upstreams.add(us)

offnetwork_upstreams |= cluster_virtual_segs
segs.extend(offnetwork_upstreams)

common_segs = list(param_df.index.intersection(segs))
wbodies_segs = set(segs).symmetric_difference(common_segs)

#Declare empty dataframe
waterbody_types_df_sub = pd.DataFrame()

Expand All @@ -750,7 +765,7 @@ def compute_nhd_routing_v02(
"h0",
],
]

#If reservoir types other than Level Pool are active
if not waterbody_types_df.empty:
waterbody_types_df_sub = waterbody_types_df.loc[
Expand All @@ -768,21 +783,29 @@ def compute_nhd_routing_v02(
common_segs,
["dt", "bw", "tw", "twcc", "dx", "n", "ncc", "cs", "s0", "alt"],
].sort_index()

param_df_sub_super = param_df_sub.reindex(
param_df_sub.index.tolist() + lake_segs
).sort_index()
if order < max(subnetworks_only_ordered_jit.keys()):
for us_subn_tw in offnetwork_upstreams:

for us_subn_tw in offnetwork_upstreams:
if us_subn_tw in flowveldepth_interorder:
subn_tw_sortposition = param_df_sub_super.index.get_loc(
us_subn_tw
)
flowveldepth_interorder[us_subn_tw][
"position_index"
] = subn_tw_sortposition

subn_reach_list = clustered_subns["subn_reach_list"]
# Remove virtual segments from reach lists so the kernel
# doesn't route through them (they get flow from FVD)
if cluster_virtual_segs:
subn_reach_list = [
[s for s in reach if s not in cluster_virtual_segs]
for reach in subn_reach_list
]
subn_reach_list = [r for r in subn_reach_list if r]
upstreams = clustered_subns["upstreams"]

subn_reach_list_with_type = _build_reach_type_list(subn_reach_list, wbodies_segs)
Expand Down Expand Up @@ -946,7 +969,8 @@ def compute_nhd_routing_v02(
results_subn[order] = parallel(jobs)

if order > 0: # This is not needed for the last rank of subnetworks
flowveldepth_interorder = {}
# Start with pre-populated entries (e.g., catchment inflow virtual segments)
flowveldepth_interorder = dict(_prepopulated_fvd)
for ci, (cluster, clustered_subns) in enumerate(
reaches_ordered_bysubntw_clustered[order].items()
):
Expand Down Expand Up @@ -1036,7 +1060,10 @@ def compute_nhd_routing_v02(
start_para_time = time.time()
with Parallel(n_jobs=cpu_pool, backend="loky") as parallel:
results_subn = defaultdict(list)
flowveldepth_interorder = {}
# Preserve any pre-populated entries (e.g., upstream inflow
# virtual segments from NHF catchment discharge routing).
_prepopulated_fvd = dict(flowveldepth_interorder)
flowveldepth_interorder = dict(_prepopulated_fvd)

for order in range(max(subnetworks_only_ordered_jit.keys()), -1, -1):
jobs = []
Expand All @@ -1046,21 +1073,35 @@ def compute_nhd_routing_v02(
# TODO: Confirm that a list here is best -- we are sorting,
# so a set might be sufficient/better
segs = list(chain.from_iterable(subn_reach_list))

# Identify virtual segments (those with pre-filled FVD
# data) that were absorbed into this subnetwork.
fvd_keys = set(flowveldepth_interorder.keys())
cluster_virtual_segs = fvd_keys & set(segs)
if cluster_virtual_segs:
segs = [s for s in segs if s not in cluster_virtual_segs]
subn_reach_list = [
[s for s in reach if s not in cluster_virtual_segs]
for reach in subn_reach_list
]
subn_reach_list = [r for r in subn_reach_list if r]

offnetwork_upstreams = set()
segs_set = set(segs)
for seg in segs:
for us in rconn[seg]:
if us not in segs_set:
offnetwork_upstreams.add(us)

offnetwork_upstreams |= cluster_virtual_segs
segs.extend(offnetwork_upstreams)

common_segs = list(param_df.index.intersection(segs))
wbodies_segs = set(segs).symmetric_difference(common_segs)

#Declare empty dataframe
waterbody_types_df_sub = pd.DataFrame()

if not waterbodies_df.empty:
lake_segs = list(waterbodies_df.index.intersection(segs))
waterbodies_df_sub = waterbodies_df.loc[
Expand All @@ -1079,7 +1120,7 @@ def compute_nhd_routing_v02(
"h0",
],
]

#If reservoir types other than Level Pool are active
if not waterbody_types_df.empty:
waterbody_types_df_sub = waterbody_types_df.loc[
Expand All @@ -1092,18 +1133,18 @@ def compute_nhd_routing_v02(
else:
lake_segs = []
waterbodies_df_sub = pd.DataFrame()

param_df_sub = param_df.loc[
common_segs,
["dt", "bw", "tw", "twcc", "dx", "n", "ncc", "cs", "s0", "alt"],
].sort_index()

param_df_sub_super = param_df_sub.reindex(
param_df_sub.index.tolist() + lake_segs
).sort_index()
if order < max(subnetworks_only_ordered_jit.keys()):
for us_subn_tw in offnetwork_upstreams:

for us_subn_tw in offnetwork_upstreams:
if us_subn_tw in flowveldepth_interorder:
subn_tw_sortposition = param_df_sub_super.index.get_loc(
us_subn_tw
)
Expand Down Expand Up @@ -1269,7 +1310,8 @@ def compute_nhd_routing_v02(
results_subn[order] = parallel(jobs)

if order > 0: # This is not needed for the last rank of subnetworks
flowveldepth_interorder = {}
# Start with pre-populated entries (e.g., catchment inflow virtual segments)
flowveldepth_interorder = dict(_prepopulated_fvd)
for twi, subn_tw in enumerate(reaches_ordered_bysubntw[order]):
# TODO: This index step is necessary because we sort the segment index
# TODO: I think there are a number of ways we could remove the sorting step
Expand Down
Loading