Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions src/troute-bmi/src/troute_nwm_bmi/troute_bmi.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,30 @@

_VAR_NAME_UNITS_MAP = {
'land_surface_water_source__volume_flow_rate': ['streamflow_cms', 'm3 s-1'],
'channel_exit_water_x-section__volume_flow_rate': ['streamflow_cms', 'm3 s-1'],
'channel_water_flow__speed': ['streamflow_ms', 'm s-1'],
'channel_water__mean_depth': ['streamflow_m', 'm'],
'lake_water~incoming__volume_flow_rate': ['waterbody_cms', 'm3 s-1'],
'lake_water~outgoing__volume_flow_rate': ['waterbody_cms', 'm3 s-1'],
'lake_surface__elevation': ['waterbody_m', 'm'],
}

_OUTPUT_VAR_NAMES = []
_OUTPUT_VAR_NAMES = [
"channel_water__id",
"channel_exit_water_x-section__volume_flow_rate",
"channel_water_flow__speed",
"channel_water__mean_depth",
"lake_water__id",
"lake_water~incoming__volume_flow_rate",
"lake_water~outgoing__volume_flow_rate",
"lake_surface__elevation"
]

_INPUT_VAR_NAMES = [
"land_surface_water_source__id",
"land_surface_water_source__id" + _COUNT_SUFFIX,
"land_surface_water_source__volume_flow_rate",
"land_surface_water_source__volume_flow_rate" + _COUNT_SUFFIX,
"upstream_id",
]

Expand All @@ -33,10 +50,11 @@ def __init__(self):
"land_surface_water_source__volume_flow_rate" + _COUNT_SUFFIX: np.zeros(1, dtype=np.int64),
"upstream_id": np.zeros(0, dtype=int),
}
for var in _OUTPUT_VAR_NAMES:
self._values[var] = np.zeros(0, np.int32) # dtype subject to change after running model
self._var_loc = "node"
self._var_grid_id = 0
self._time_units = "s"
self._start_time = 0.0
self._end_time = np.finfo("d").max

def initialize(self, bmi_cfg_file):
Expand Down Expand Up @@ -95,25 +113,25 @@ def get_value_ptr(self, var_name: str):

def get_start_time(self):
"""Start time of model."""
return self._start_time
return 0.0

def get_end_time(self):
"""End time of model."""
return self._end_time
return self._model.ngen_dt * (self._model.nts - 1)

def get_current_time(self):
return self._model.time

def get_time_step(self):
return self._model.dt
return self._model.ngen_dt

def get_time_units(self):
return self._time_units

def finalize(self):
"""Finalize model."""
if self._model is not None:
self._model.run(self._values)
self._model.finalize(self._values)
self._model = None

# BMI functions that are not being used yet...
Expand All @@ -124,11 +142,7 @@ def update_frac(self, time_frac: float):
time_frac : float
Fraction fo a time step.
"""
time_step = self.get_time_step()
self._model.dt = int(time_frac * time_step)
if self._model.dt > 0:
self.update()
self._model.dt = time_step
raise NotImplementedError("update_frac")

def get_var_type(self, var_name):
"""Data type of variable.
Expand Down
159 changes: 126 additions & 33 deletions src/troute-bmi/src/troute_nwm_bmi/troute_model.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Basic Model Interface backing model for NGEN t-route."""
from __future__ import annotations
import logging
import time
import yaml
Expand All @@ -21,8 +22,6 @@


class Model:
dt: int

def __init__(self, config_file: str):
self._main_start_time = time.time()
self._time = 0.0
Expand All @@ -33,7 +32,18 @@ def __init__(self, config_file: str):

log_level_set(self.log_parameters)

self.dt = int(self.forcing_parameters["dt"])
output_type = self.stream_output.get("stream_output_type", None)
if output_type and (output_type != ".nc"):
error = "The stream output type can only be NetCDF. Current type: " + str(output_type)
LOG.error(error)
raise RuntimeError(error)
if self.compute_parameters.get("parallel_compute_method", "bmi") != "bmi":
LOG.warning(
"The parallel_compute_method is set to "
+ '"' + str(self.compute_parameters.get("parallel_compute_method")) + '"'
+ " in the configuration. To improve processing speed during catchment calculations,"
+ ' this will be set to "' + "bmi" '".'
)

LOG.info("Creating network of type " + self.supernetwork_parameters.get("network_type"))
network_start_time = time.time()
Expand Down Expand Up @@ -66,28 +76,28 @@ def __init__(self, config_file: str):
)
else:
raise Exception("Supernetwork network type must be HYFeaturesNetwork or NHDNetwork")
self.start_time = self._network.t0
self._network.assemble_coastal_coupling_data()
network_creation_time = time.time() - network_start_time

# Data data assimilation
LOG.debug("Creating DataAssimilation object")
forcing_start_time = time.time()
da_run = {}
run_sets = self._create_run_sets()
if self.data_assimilation_parameters:
run_set = {
"nts": self.nts,
"final_timestamp": self.t0 + timedelta(seconds=self.nts * self.dt)
}
da_sets = hnu.build_da_sets(self.data_assimilation_parameters, [run_set], self._network.t0)
if da_sets:
da_run = da_sets[0]
self._da_sets = hnu.build_da_sets(self.data_assimilation_parameters, run_sets, self._network.t0)
else:
self._da_sets = [{} for _ in run_sets]
self._da_index = 1

self._data_assimilation = DataAssimilation(
network=self._network,
data_assimilation_parameters=self.data_assimilation_parameters,
run_parameters={},
waterbody_parameters=self.waterbody_parameters,
from_files=True,
value_dict=None,
da_run=da_run,
da_run=self._da_sets[0],
)
forcing_time = time.time() - forcing_start_time

Expand All @@ -111,14 +121,20 @@ def update(self, bmi_values: dict):
step_time = self._network.t0 + timedelta(seconds=self.time)
timestamp = step_time.strftime("%Y%m%d%H%M")
self._df_data[timestamp] = np.array(qlat_values)
self._time += self.dt
self._time += self.ngen_dt
self._timings["forcing_time"] += time.time() - start
if len(self._df_data) >= self.max_timestep_buffer:
self.run(bmi_values)
self._df_data.clear()


def run(self, bmi_values: dict):
nts = self.nts
qts_subdivisions = self.forcing_parameters.get('qts_subdivisions', 12)

nts = len(self._df_data) * self.qts_subdivisions
run_params = {
"t0": self._network.t0,
"dt": self.dt,
"nts": nts,
}
LOG.debug("Assembling forcing dataframe")
forcing_start_time = time.time()
## setup the qlats dataframe from the update() data
Expand All @@ -136,26 +152,30 @@ def run(self, bmi_values: dict):
else:
flowveldepth_interorder = {}

usgs_df = self._data_assimilation.usgs_df
if not usgs_df.empty:
usgs_df = usgs_df.loc[:,self._network.t0:]

LOG.debug("Starting routing function")
route_start_time = time.time()
run_results, self._subnetwork = nwm_routing.nwm_route(
downstream_connections=self._network.connections,
upstream_connections=self._network.reverse_network,
waterbodies_in_connections=self._network.waterbody_connections,
reaches_bytw=self._network._reaches_by_tw,
parallel_compute_method=self.compute_parameters.get("parallel_compute_method", "serial"),
parallel_compute_method="bmi",
compute_kernel=self.compute_parameters.get("compute_kernel"),
subnetwork_target_size=self.compute_parameters.get('subnetwork_target_size'),
cpu_pool=self.cpu_pool,
t0=self.t0,
t0=self._network.t0,
dt=self.dt,
nts=nts,
qts_subdivisions=qts_subdivisions,
qts_subdivisions=self.qts_subdivisions,
independent_networks=self._network.independent_networks,
param_df=self._network.dataframe,
q0=self._network.q0,
qlats=qlats,
usgs_df=self._data_assimilation.usgs_df,
usgs_df=usgs_df,
lastobs_df=self._data_assimilation.lastobs_df,
reservoir_usgs_df=self._data_assimilation.reservoir_usgs_df,
reservoir_usgs_param_df=self._data_assimilation.reservoir_usgs_param_df,
Expand Down Expand Up @@ -185,20 +205,18 @@ def run(self, bmi_values: dict):
)
self._timings["route_time"] = time.time() - route_start_time

# create initial conditions for next loop iteration
# # create initial conditions for next loop iteration
self._network.new_q0(run_results)
self._network.update_waterbody_water_elevation()

# update reservoir parameters and lastobs_df
# # update reservoir parameters and lastobs_df
self._data_assimilation.update_after_compute(run_results, self.dt * nts)

LOG.debug("Generating output")
output_start_time = time.time()
run_params = {
"t0": self.t0,
"dt": self.dt,
"nts": nts,
}

# Note: After creating the output file the first run, all subsequent writes will append the results.
# TODO: Allow the results to be written into the middle of an existing file, e.g., NGEN loads a previous step and needs to re-write
nwm_output_generator(
run=run_params,
results=run_results,
Expand All @@ -207,7 +225,7 @@ def run(self, bmi_values: dict):
parity_parameters=self.parity_parameters,
restart_parameters=self.restart_parameters,
parity_set={},
qts_subdivisions=qts_subdivisions,
qts_subdivisions=self.qts_subdivisions,
return_courant=self.compute_parameters.get("return_courant", False),
cpu_pool=self.cpu_pool,
waterbodies_df=self._network.waterbody_dataframe,
Expand All @@ -219,9 +237,49 @@ def run(self, bmi_values: dict):
link_lake_crosswalk=self._network.link_lake_crosswalk,
nexus_dict=self._network.nexus_dict,
poi_crosswalk=self._network.poi_nex_dict or {},
filename_t0=self.start_time,
)

self._network.new_t0(self.dt, nts)
if self._da_index < len(self._da_sets):
self._data_assimilation.update_for_next_loop(
self._network,
self._da_sets[self._da_index]
)
self._da_index += 1

# compute BMI outputs
qvd_columns = pd.MultiIndex.from_product(
[range(nts), ["q", "v", "d"]]
).to_flat_index()
flowveldepth = pd.concat(
[pd.DataFrame(r[1], index=r[0], columns=qvd_columns) for r in run_results],
copy=False,
)
bmi_values["channel_exit_water_x-section__volume_flow_rate"] = flowveldepth.iloc[:,-3].to_numpy()
bmi_values["channel_water_flow__speed"] = flowveldepth.iloc[:,-2].to_numpy()
bmi_values["channel_water__mean_depth"] = flowveldepth.iloc[:,-1].to_numpy()
bmi_values["channel_water__id"] = flowveldepth.index.to_numpy()

i_columns = pd.MultiIndex.from_product(
[range(int(nts)), ["i"]]
).to_flat_index()
wbdy = pd.concat(
[pd.DataFrame(r[6], index=r[0], columns=i_columns) for r in run_results],
copy=False,
)

wbdy_id = self._network.waterbody_dataframe.index.values
bmi_values["lake_water__id"] = wbdy_id
bmi_values["lake_water~incoming__volume_flow_rate"] = wbdy.loc[wbdy_id].iloc[:,-1]
bmi_values["lake_water~outgoing__volume_flow_rate"] = flowveldepth.loc[wbdy_id].iloc[:,-3]
bmi_values["lake_surface__elevation"] = flowveldepth.loc[wbdy_id].iloc[:,-1]

self._timings["output_time"] = time.time() - output_start_time

def finalize(self, bmi_values: dict):
if len(self._df_data) > 0:
self.run(bmi_values)
if self.show_timing:
self._log_times()

Expand All @@ -243,7 +301,7 @@ def log_parameters(self) -> dict:

@property
def compute_parameters(self) -> dict:
return self._config.get("compute_parameters", {})
return self._config["compute_parameters"]

@property
def network_topology_parameters(self) -> dict:
Expand All @@ -253,6 +311,10 @@ def network_topology_parameters(self) -> dict:
def output_parameters(self) -> dict:
return self._config.get("output_parameters", {})

@property
def stream_output(self) -> dict:
return self.output_parameters.get("stream_output", {})

@property
def preprocessing_parameters(self) -> dict:
return self.network_topology_parameters.get("preprocessing_parameters", {})
Expand All @@ -267,7 +329,7 @@ def supernetwork_parameters(self) -> dict:

@property
def forcing_parameters(self) -> dict:
return self.compute_parameters.get("forcing_parameters", {})
return self.compute_parameters["forcing_parameters"]

@property
def restart_parameters(self) -> dict:
Expand All @@ -285,6 +347,10 @@ def data_assimilation_parameters(self) -> dict:
def parity_parameters(self) -> dict:
return self.output_parameters.get("wrf_hydro_parity_check", {})

@property
def max_timestep_buffer(self) -> int:
return self.bmi_parameters.get("max_timestep_buffer", 1000)

@property
def show_timing(self):
return bool(self.log_parameters.get("showtiming"))
Expand All @@ -303,15 +369,42 @@ def time(self) -> float:
return self._time

@property
def t0(self) -> datetime:
return self._network.t0
def qts_subdivisions(self) -> int:
return self.forcing_parameters["qts_subdivisions"]

@property
def dt(self) -> int:
return self.forcing_parameters["dt"]

@property
def ngen_dt(self) -> int:
return int(self.dt * self.qts_subdivisions)

def _create_run_sets(self):
nts = self.nts
ngen_dt = self.ngen_dt
max_buffer = self.max_timestep_buffer
run_sets = []
run_start = 0
while run_start < nts:
run_nts = max_buffer
if run_start + run_nts > nts:
run_nts = nts - run_start
run_t0 = self.start_time + timedelta(seconds=run_start * ngen_dt)
run_end = run_t0 + timedelta(seconds=(run_nts - 1) * ngen_dt)
run_sets.append({
"nts": run_nts * self.qts_subdivisions,
"final_timestamp": run_end
})
run_start += max_buffer
return run_sets

def _log_times(self):
def sec_and_per(title, key: str):
seconds = round(self._timings[key], 2)
percent = round(self._timings[key] / process_time * 100, 2)
LOG.info(f"{title}: {seconds} secs, {percent} %")
process_time = time.time() - self._main_start_time
process_time = sum(self._timings.values())
LOG.debug(f"Processes complete in {process_time} seconds.")
LOG.info('************ TIMING SUMMARY ************')
LOG.info('----------------------------------------')
Expand Down
Loading