From 22a4ad6e6082b97e4b5d5ea081b3927f3e5eda2f Mon Sep 17 00:00:00 2001 From: Phil Miller Date: Mon, 22 Dec 2025 09:47:59 -0800 Subject: [PATCH 1/4] WIP: Split apart t-route initialization/setup from execution --- include/core/NgenSimulation.hpp | 11 +++ src/core/NgenSimulation.cpp | 151 ++++++++++++++++++++------------ 2 files changed, 105 insertions(+), 57 deletions(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 00e5ef49eb..2870b3b1b8 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -12,6 +12,11 @@ namespace hy_features class HY_Features_MPI; } +namespace models::bmi +{ + class Bmi_Py_Adapter; +} + #include #include #include @@ -48,6 +53,7 @@ class NgenSimulation */ void run_catchments(); + void initialize_routing(std::string const& t_route_config_file_with_path); /** * Run t-route on the stored nexus outflow values for the full configured duration of the simulation */ @@ -75,6 +81,11 @@ class NgenSimulation std::unordered_map nexus_indexes_; std::vector nexus_downstream_flows_; + std::unique_ptr py_troute_ = nullptr; + size_t global_nexus_count_; + std::unordered_map global_nexus_indexes_; + std::unordered_map *routing_nexus_indexes_; + int mpi_rank_; int mpi_num_procs_; diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index c5cce69103..5afc1b491a 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -27,6 +27,7 @@ NgenSimulation::NgenSimulation( , layers_(std::move(layers)) , catchment_indexes_(std::move(catchment_indexes)) , nexus_indexes_(std::move(nexus_indexes)) + , routing_nexus_indexes_(&nexus_indexes) , mpi_rank_(mpi_rank) , mpi_num_procs_(mpi_num_procs) { @@ -119,23 +120,13 @@ double NgenSimulation::get_nexus_outflow(int nexus_index, int timestep_index) co return nexus_downstream_flows_[timestep_index * nexus_indexes_.size() + nexus_index]; } -void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::string const& t_route_config_file_with_path) +void NgenSimulation::initialize_routing(std::string const& t_route_config_file_with_path) { #if NGEN_WITH_ROUTING - std::vector *routing_nexus_downflows = &nexus_downstream_flows_; std::unordered_map *routing_nexus_indexes = &nexus_indexes_; - - size_t number_of_timesteps = sim_time_->get_total_output_times(); - if (nexus_downstream_flows_.size() != number_of_timesteps * nexus_indexes_.size()) { - std::string msg = "Routing input data in NgenSimulation::nexus_downstream_flows_ does not reflect a full-duration run"; - LOG(msg, LogLevel::FATAL); - throw std::runtime_error(msg); - } + global_nexus_count_ = nexus_indexes_.size(); #if NGEN_WITH_MPI - std::vector all_nexus_downflows; - std::unordered_map all_nexus_indexes; - if (mpi_num_procs_ > 1) { std::vector local_nexus_ids; for (const auto& nexus : nexus_indexes_) { @@ -154,17 +145,98 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s // MPI_Broadcast so all processes share the nexus IDs all_nexus_ids = std::move(parallel::broadcast_strings(all_nexus_ids, mpi_rank_, mpi_num_procs_)); + for (int i = 0; i < all_nexus_ids.size(); ++i) { + std::string const& nexus_id = all_nexus_ids[i]; + global_nexus_indexes_[nexus_id] = i; + } + + routing_nexus_indexes_ = &global_nexus_indexes_; + global_nexus_count_ = all_nexus_ids.size(); + } +#endif // NGEN_WITH_MPI + + if (mpi_rank_ == 0) { + py_troute_ = std::make_unique("T-Route", t_route_config_file_with_path, "troute_nwm_bmi.troute_bmi.BmiTroute", true); + + // Error/inconsistency checking + { + std::string time_units = py_troute_->GetTimeUnits(); + if (time_units != "s") { + Logger::logMsgAndThrowError("T-route units were expected to be 's', got " + time_units); + } + + // Note: Currently, delta_time is set in the t-route yaml configuration file, and the + // number_of_timesteps is determined from the total number of nexus outputs in t-route. + // It is recommended to still pass these values to the routing_py_adapter object in + // case a future implementation needs these two values from the ngen framework. + int delta_time = sim_time_->get_output_interval_seconds(); + double bmi_timestep = py_troute_->GetTimeStep(); + if (bmi_timestep != delta_time) { + Logger::logMsgAndThrowError("T-route timestep " + std::to_string(bmi_timestep) + " s" + + "doesn't match simulation timestep " + std::to_string(delta_time) + " s "); + } + } + + // tell BMI to resize nexus containers + py_troute_->SetValue("land_surface_water_source__volume_flow_rate__count", &global_nexus_count_); + py_troute_->SetValue("land_surface_water_source__id__count", &global_nexus_count_); + + // set up nexus id indexes + std::vector nexus_df_index(global_nexus_count_); + for (const auto& key_value : *routing_nexus_indexes_) { + int id_index = key_value.second; + + // Convert string ID into numbers for T-route index + int id_as_int = -1; + size_t sep_index = key_value.first.find(hy_features::identifiers::separator); + if (sep_index != std::string::npos) { + std::string numbers = key_value.first.substr(sep_index + hy_features::identifiers::separator.length()); + id_as_int = std::stoi(numbers); + } + if (id_as_int == -1) { + std::string error_msg = "Cannot convert the nexus ID to an integer: " + key_value.first; + LOG(LogLevel::FATAL, error_msg); + throw std::runtime_error(error_msg); + } + nexus_df_index[id_index] = id_as_int; + } + py_troute_->SetValue("land_surface_water_source__id", nexus_df_index.data()); + } +#endif // NGEN_WITH_ROUTING +} + +void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::string const& t_route_config_file_with_path) +{ +#if NGEN_WITH_ROUTING + initialize_routing(t_route_config_file_with_path); + + std::vector *routing_nexus_downflows = &nexus_downstream_flows_; + + size_t number_of_timesteps = sim_time_->get_total_output_times(); + if (nexus_downstream_flows_.size() != number_of_timesteps * nexus_indexes_.size()) { + std::string msg = "Routing input data in NgenSimulation::nexus_downstream_flows_ does not reflect a full-duration run"; + LOG(msg, LogLevel::FATAL); + throw std::runtime_error(msg); + } + +#if NGEN_WITH_MPI + std::vector all_nexus_downflows; + + if (mpi_num_procs_ > 1) { // MPI_Reduce to collect the results from processes if (mpi_rank_ == 0) { - all_nexus_downflows.resize(number_of_timesteps * all_nexus_ids.size(), 0.0); + all_nexus_downflows.resize(number_of_timesteps * global_nexus_count_, 0.0); } std::vector local_buffer(number_of_timesteps); std::vector receive_buffer(number_of_timesteps, 0.0); - for (int i = 0; i < all_nexus_ids.size(); ++i) { - std::string nexus_id = all_nexus_ids[i]; - if (nexus_indexes_.find(nexus_id) != nexus_indexes_.end() && !features.is_remote_sender_nexus(nexus_id)) { + for (auto const& global_nexus_iter : global_nexus_indexes_) { + std::string const& nexus_id = global_nexus_iter.first; + int global_nexus_index = global_nexus_iter.second; + + auto local_iter = nexus_indexes_.find(nexus_id); + if (local_iter != nexus_indexes_.end() && !features.is_remote_sender_nexus(nexus_id)) { // if this process has the id and receives/records data, copy the values to the buffer - int nexus_index = nexus_indexes_[nexus_id]; + int nexus_index = local_iter->second; for (int step = 0; step < number_of_timesteps; ++step) { int offset = step * nexus_indexes_.size() + nexus_index; local_buffer[step] = nexus_downstream_flows_[offset]; @@ -176,9 +248,8 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s MPI_Reduce(local_buffer.data(), receive_buffer.data(), number_of_timesteps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); if (mpi_rank_ == 0) { // copy reduce values to a combined downflows vector - all_nexus_indexes[nexus_id] = i; for (int step = 0; step < number_of_timesteps; ++step) { - int offset = step * all_nexus_ids.size() + i; + int offset = step * global_nexus_count_ + global_nexus_index; all_nexus_downflows[offset] = receive_buffer[step]; receive_buffer[step] = 0.0; } @@ -187,7 +258,6 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s if (mpi_rank_ == 0) { // update root's local data for running t-route below - routing_nexus_indexes = &all_nexus_indexes; routing_nexus_downflows = &all_nexus_downflows; } } @@ -196,46 +266,13 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s if (mpi_rank_ == 0) { // Run t-route from single process LOG(LogLevel::INFO, "Running T-Route on nexus outflows."); - // Note: Currently, delta_time is set in the t-route yaml configuration file, and the - // number_of_timesteps is determined from the total number of nexus outputs in t-route. - // It is recommended to still pass these values to the routing_py_adapter object in - // case a future implementation needs these two values from the ngen framework. - int delta_time = sim_time_->get_output_interval_seconds(); - - // model for routing - models::bmi::Bmi_Py_Adapter py_troute("T-Route", t_route_config_file_with_path, "troute_nwm_bmi.troute_bmi.BmiTroute", true); - - // tell BMI to resize nexus containers - int64_t nexus_count = routing_nexus_indexes->size(); - py_troute.SetValue("land_surface_water_source__volume_flow_rate__count", &nexus_count); - py_troute.SetValue("land_surface_water_source__id__count", &nexus_count); - // set up nexus id indexes - std::vector nexus_df_index(nexus_count); - for (const auto& key_value : *routing_nexus_indexes) { - int id_index = key_value.second; - - // Convert string ID into numbers for T-route index - int id_as_int = -1; - size_t sep_index = key_value.first.find(hy_features::identifiers::separator); - if (sep_index != std::string::npos) { - std::string numbers = key_value.first.substr(sep_index + hy_features::identifiers::separator.length()); - id_as_int = std::stoi(numbers); - } - if (id_as_int == -1) { - std::string error_msg = "Cannot convert the nexus ID to an integer: " + key_value.first; - LOG(LogLevel::FATAL, error_msg); - throw std::runtime_error(error_msg); - } - nexus_df_index[id_index] = id_as_int; - } - py_troute.SetValue("land_surface_water_source__id", nexus_df_index.data()); for (int i = 0; i < number_of_timesteps; ++i) { - py_troute.SetValue("land_surface_water_source__volume_flow_rate", - routing_nexus_downflows->data() + (i * nexus_count)); - py_troute.Update(); + py_troute_->SetValue("land_surface_water_source__volume_flow_rate", + routing_nexus_downflows->data() + (i * global_nexus_count_)); + py_troute_->Update(); } // Finalize will write the output file - py_troute.Finalize(); + py_troute_->Finalize(); } #endif // NGEN_WITH_ROUTING } From b52d85cef87009c48ceb826fe92982bdc3feec2b Mon Sep 17 00:00:00 2001 From: Phil Miller Date: Fri, 26 Dec 2025 13:40:30 -0800 Subject: [PATCH 2/4] Abstract pointer to router module to avoid compile-time coupling to complete class definition --- include/core/NgenSimulation.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 2870b3b1b8..1066a00170 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -5,6 +5,7 @@ #include #include +#include namespace hy_features { @@ -81,7 +82,7 @@ class NgenSimulation std::unordered_map nexus_indexes_; std::vector nexus_downstream_flows_; - std::unique_ptr py_troute_ = nullptr; + std::unique_ptr py_troute_ = nullptr; size_t global_nexus_count_; std::unordered_map global_nexus_indexes_; std::unordered_map *routing_nexus_indexes_; From 5a7d5b236803a2e63b2213d85cf130576d76ede5 Mon Sep 17 00:00:00 2001 From: Phil Miller Date: Tue, 30 Dec 2025 17:18:24 -0800 Subject: [PATCH 3/4] NgenSimulation: Pass HY_Features to constructor for later use in routing --- include/core/NgenSimulation.hpp | 14 ++++++++------ src/NGen.cpp | 1 + src/core/NgenSimulation.cpp | 2 ++ 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 1066a00170..6fd8aefea4 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -27,9 +27,16 @@ namespace models::bmi class NgenSimulation { public: +#if NGEN_WITH_MPI + using hy_features_t = hy_features::HY_Features_MPI; +#else + using hy_features_t = hy_features::HY_Features; +#endif + NgenSimulation( Simulation_Time const& sim_time, std::vector> layers, + hy_features_t *features, std::unordered_map catchment_indexes, std::unordered_map nexus_indexes, int mpi_rank, @@ -39,12 +46,6 @@ class NgenSimulation ~NgenSimulation(); -#if NGEN_WITH_MPI - using hy_features_t = hy_features::HY_Features_MPI; -#else - using hy_features_t = hy_features::HY_Features; -#endif - /** * Run the catchment formulations for the full configured duration of the simulation * @@ -75,6 +76,7 @@ class NgenSimulation // Catchment data std::vector> layers_; + hy_features_t *features_; // Routing data structured for t-route std::unordered_map catchment_indexes_; diff --git a/src/NGen.cpp b/src/NGen.cpp index c554ef751a..5a3d14c462 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -708,6 +708,7 @@ int main(int argc, char* argv[]) { auto simulation = std::make_unique(*sim_time, layers, + &features, std::move(catchment_indexes), std::move(nexus_indexes), mpi_rank, diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index 5afc1b491a..58a4b29436 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -17,6 +17,7 @@ NgenSimulation::NgenSimulation( Simulation_Time const& sim_time, std::vector> layers, + hy_features_t *features, std::unordered_map catchment_indexes, std::unordered_map nexus_indexes, int mpi_rank, @@ -25,6 +26,7 @@ NgenSimulation::NgenSimulation( : simulation_step_(0) , sim_time_(std::make_shared(sim_time)) , layers_(std::move(layers)) + , features_(features) , catchment_indexes_(std::move(catchment_indexes)) , nexus_indexes_(std::move(nexus_indexes)) , routing_nexus_indexes_(&nexus_indexes) From 5d708cdc9ebff02c9c3281f1759590f8db311329 Mon Sep 17 00:00:00 2001 From: Phil Miller Date: Tue, 30 Dec 2025 17:21:51 -0800 Subject: [PATCH 4/4] NgenSimulation: Restructure routing logic to gather and pass nexus flows, and call Update() at each timestep --- include/core/NgenSimulation.hpp | 5 +- src/NGen.cpp | 6 +- src/core/NgenSimulation.cpp | 122 ++++++++++++++++++-------------- 3 files changed, 76 insertions(+), 57 deletions(-) diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 6fd8aefea4..c9661d92ce 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -59,7 +59,7 @@ class NgenSimulation /** * Run t-route on the stored nexus outflow values for the full configured duration of the simulation */ - void run_routing(hy_features_t &features, std::string const& t_route_config_file_with_path); + void run_routing(); int get_nexus_index(std::string const& nexus_id) const; double get_nexus_outflow(int nexus_index, int timestep_index) const; @@ -70,6 +70,9 @@ class NgenSimulation private: void advance_models_one_output_step(); + void gather_flows_for_routing(size_t num_steps, boost::span local_downflows, boost::span gathered_downflows); + void advance_routing_one_step(boost::span downflows); + int simulation_step_; std::shared_ptr sim_time_; diff --git a/src/NGen.cpp b/src/NGen.cpp index 5a3d14c462..899a030521 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -714,6 +714,10 @@ int main(int argc, char* argv[]) { mpi_rank, mpi_num_procs); + if (manager->get_using_routing()) { + simulation->initialize_routing(manager->get_t_route_config_file_with_path()); + } + auto time_done_init = std::chrono::steady_clock::now(); std::chrono::duration time_elapsed_init = time_done_init - time_start; LOG("[TIMING]: Init: " + std::to_string(time_elapsed_init.count()), LogLevel::INFO); @@ -762,7 +766,7 @@ int main(int argc, char* argv[]) { #endif if (manager->get_using_routing()) { - simulation->run_routing(features, manager->get_t_route_config_file_with_path()); + simulation->run_routing(); } auto time_done_routing = std::chrono::steady_clock::now(); diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index 58a4b29436..f4cae4f8cb 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -44,6 +44,16 @@ void NgenSimulation::run_catchments() // Now loop some time, iterate catchments, do stuff for total number of output times auto num_times = get_num_output_times(); +#if NGEN_WITH_MPI + std::vector all_nexus_downflows; + + if (mpi_num_procs_ > 1) { + if (mpi_rank_ == 0) { + all_nexus_downflows.resize(global_nexus_count_); + } + } +#endif // NGEN_WITH_MPI + for (; simulation_step_ < num_times; simulation_step_++) { // Make room for this output step's results catchment_outflows_.resize(catchment_outflows_.size() + catchment_indexes_.size(), 0.0); @@ -51,6 +61,22 @@ void NgenSimulation::run_catchments() advance_models_one_output_step(); +#if NGEN_WITH_ROUTING + boost::span routing_nexus_downflows = {nexus_downstream_flows_.data() + nexus_indexes_.size() * simulation_step_, + nexus_indexes_.size()}; + +#if NGEN_WITH_MPI + if (mpi_num_procs_ > 1) { + gather_flows_for_routing(1, routing_nexus_downflows, all_nexus_downflows); + routing_nexus_downflows = all_nexus_downflows; + } +#endif // NGEN_WITH_MPI + + if (mpi_rank_ == 0) { // Run t-route from single process + advance_routing_one_step(routing_nexus_downflows); + } +#endif // NGEN_WITH_ROUTING + if (simulation_step_ + 1 < num_times) { sim_time_->advance_timestep(); } @@ -207,73 +233,59 @@ void NgenSimulation::initialize_routing(std::string const& t_route_config_file_w #endif // NGEN_WITH_ROUTING } -void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::string const& t_route_config_file_with_path) +// Only called in the mpi_num_procs_ > 1 case +void NgenSimulation::gather_flows_for_routing(size_t num_steps, + boost::span local_downflows, + boost::span gathered_downflows) { -#if NGEN_WITH_ROUTING - initialize_routing(t_route_config_file_with_path); - - std::vector *routing_nexus_downflows = &nexus_downstream_flows_; - - size_t number_of_timesteps = sim_time_->get_total_output_times(); - if (nexus_downstream_flows_.size() != number_of_timesteps * nexus_indexes_.size()) { - std::string msg = "Routing input data in NgenSimulation::nexus_downstream_flows_ does not reflect a full-duration run"; - LOG(msg, LogLevel::FATAL); - throw std::runtime_error(msg); - } - #if NGEN_WITH_MPI - std::vector all_nexus_downflows; - - if (mpi_num_procs_ > 1) { - // MPI_Reduce to collect the results from processes - if (mpi_rank_ == 0) { - all_nexus_downflows.resize(number_of_timesteps * global_nexus_count_, 0.0); - } - std::vector local_buffer(number_of_timesteps); - std::vector receive_buffer(number_of_timesteps, 0.0); - for (auto const& global_nexus_iter : global_nexus_indexes_) { - std::string const& nexus_id = global_nexus_iter.first; - int global_nexus_index = global_nexus_iter.second; - - auto local_iter = nexus_indexes_.find(nexus_id); - if (local_iter != nexus_indexes_.end() && !features.is_remote_sender_nexus(nexus_id)) { - // if this process has the id and receives/records data, copy the values to the buffer - int nexus_index = local_iter->second; - for (int step = 0; step < number_of_timesteps; ++step) { - int offset = step * nexus_indexes_.size() + nexus_index; - local_buffer[step] = nexus_downstream_flows_[offset]; - } - } else { - // if this process does not have the id, fill with 0 to make sure it doesn't affect reduce sum - std::fill(local_buffer.begin(), local_buffer.end(), 0.0); - } - MPI_Reduce(local_buffer.data(), receive_buffer.data(), number_of_timesteps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); - if (mpi_rank_ == 0) { - // copy reduce values to a combined downflows vector - for (int step = 0; step < number_of_timesteps; ++step) { - int offset = step * global_nexus_count_ + global_nexus_index; - all_nexus_downflows[offset] = receive_buffer[step]; - receive_buffer[step] = 0.0; - } + std::vector local_buffer(num_steps); + std::vector receive_buffer(num_steps, 0.0); + + for (auto const& global_nexus_iter : global_nexus_indexes_) { + std::string const& nexus_id = global_nexus_iter.first; + int global_nexus_index = global_nexus_iter.second; + + auto local_iter = nexus_indexes_.find(nexus_id); + if (local_iter != nexus_indexes_.end() && !features_->is_remote_sender_nexus(nexus_id)) { + // if this process has the id and receives/records data, copy the values to the buffer + int nexus_index = local_iter->second; + for (int step = 0; step < num_steps; ++step) { + int offset = step * nexus_indexes_.size() + nexus_index; + local_buffer[step] = local_downflows[offset]; } + } else { + // if this process does not have the id, fill with 0 to make sure it doesn't affect reduce sum + std::fill(local_buffer.begin(), local_buffer.end(), 0.0); } + MPI_Reduce(local_buffer.data(), receive_buffer.data(), num_steps, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); + if (mpi_rank_ == 0) { - // update root's local data for running t-route below - routing_nexus_downflows = &all_nexus_downflows; + // copy reduce values to a combined downflows vector + for (int step = 0; step < num_steps; ++step) { + int offset = step * global_nexus_count_ + global_nexus_index; + gathered_downflows[offset] = receive_buffer[step]; + receive_buffer[step] = 0.0; + } } } -#endif // NGEN_WITH_MPI +#endif +} + +void NgenSimulation::advance_routing_one_step(boost::span downflows) +{ + py_troute_->SetValue("land_surface_water_source__volume_flow_rate", downflows.data()); + py_troute_->Update(); +} +void NgenSimulation::run_routing() +{ +#if NGEN_WITH_ROUTING if (mpi_rank_ == 0) { // Run t-route from single process LOG(LogLevel::INFO, "Running T-Route on nexus outflows."); - for (int i = 0; i < number_of_timesteps; ++i) { - py_troute_->SetValue("land_surface_water_source__volume_flow_rate", - routing_nexus_downflows->data() + (i * global_nexus_count_)); - py_troute_->Update(); - } - // Finalize will write the output file + // Finalize will finish running the simulation and write the output file py_troute_->Finalize(); } #endif // NGEN_WITH_ROUTING