diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c96c93b27..8ba84add7e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -165,7 +165,12 @@ add_compile_definitions(NGEN_SHARED_LIB_EXTENSION) set(Boost_USE_STATIC_LIBS OFF) set(Boost_USE_MULTITHREADED ON) set(Boost_USE_STATIC_RUNTIME OFF) -find_package(Boost 1.86.0 REQUIRED) +if(CMAKE_CXX_STANDARD LESS 17) + # requires non-header filesystem for state saving if C++ 11 or lower + find_package(Boost 1.86.0 REQUIRED COMPONENTS system filesystem) +else() + find_package(Boost 1.86.0 REQUIRED) +endif() # ----------------------------------------------------------------------------- if(NGEN_WITH_SQLITE) @@ -318,6 +323,7 @@ add_subdirectory("src/geojson") add_subdirectory("src/bmi") add_subdirectory("src/realizations/catchment") add_subdirectory("src/forcing") +add_subdirectory("src/state_save_restore") add_subdirectory("src/utilities") add_subdirectory("src/utilities/mdarray") add_subdirectory("src/utilities/mdframe") @@ -337,7 +343,9 @@ target_link_libraries(ngen NGen::core_mediator NGen::logging NGen::parallel + NGen::state_save_restore NGen::bmi_protocols + NGen::state_save_restore ) if(NGEN_WITH_SQLITE) diff --git a/Dockerfile b/Dockerfile index 6ece41e410..93edd0d218 100644 --- a/Dockerfile +++ b/Dockerfile @@ -317,7 +317,8 @@ RUN --mount=type=cache,target=/root/.cache/cmake,id=cmake-soilfreezethaw \ RUN --mount=type=cache,target=/root/.cache/cmake,id=cmake-ueb-bmi \ set -eux && \ - cmake -B extern/ueb-bmi/cmake_build -S extern/ueb-bmi/ -DBMICXX_INCLUDE_DIRS=/ngen-app/ngen/extern/bmi-cxx/ -DBOOST_ROOT=/opt/boost && \ + cmake -B extern/ueb-bmi/cmake_build -S extern/ueb-bmi/ \ + -DUEB_SUPPRESS_OUTPUTS=ON -DBMICXX_INCLUDE_DIRS=/ngen-app/ngen/extern/bmi-cxx/ -DBOOST_ROOT=/opt/boost && \ cmake --build extern/ueb-bmi/cmake_build/ && \ find /ngen-app/ngen/extern/ueb-bmi/ -name '*.o' -exec rm -f {} + diff --git a/data/example_state_saving_config.json b/data/example_state_saving_config.json new file mode 100644 index 0000000000..cba48afcdb --- /dev/null +++ b/data/example_state_saving_config.json @@ -0,0 +1,109 @@ +{ + "global": { + "formulations": [ + { + "name": "bmi_c++", + "params": { + "model_type_name": "test_bmi_cpp", + "library_file": "./extern/test_bmi_cpp/cmake_build/libtestbmicppmodel.so", + "init_config": "./data/bmi/c/test/test_bmi_c_config.ini", + "main_output_variable": "OUTPUT_VAR_2", + "variables_names_map" : { + "INPUT_VAR_2": "TMP_2maboveground", + "INPUT_VAR_1": "precip_rate" + }, + "create_function": "bmi_model_create", + "destroy_function": "bmi_model_destroy", + "uses_forcing_file": false + } + } + ], + "forcing": { + "file_pattern": ".*{{id}}.*.csv", + "path": "./data/forcing/" + } + }, + "state_saving": { + "label": "end", + "path": "state_end", + "type": "FilePerUnit", + "when": "EndOfRun" + }, + "time": { + "start_time": "2015-12-01 00:00:00", + "end_time": "2015-12-30 23:00:00", + "output_interval": 3600 + }, + "output_root": "./output_dir/", + "catchments": { + "cat-27": { + "formulations": [ + { + "name": "bmi_c++", + "params": { + "model_type_name": "test_bmi_cpp", + "library_file": "./extern/test_bmi_cpp/cmake_build/libtestbmicppmodel.so", + "init_config": "./data/bmi/c/test/test_bmi_c_config.ini", + "main_output_variable": "OUTPUT_VAR_2", + "variables_names_map" : { + "INPUT_VAR_2": "TMP_2maboveground", + "INPUT_VAR_1": "precip_rate" + }, + "create_function": "bmi_model_create", + "destroy_function": "bmi_model_destroy", + "uses_forcing_file": false + } + } + ], + "forcing": { + "path": "./data/forcing/cat-27_2015-12-01 00_00_00_2015-12-30 23_00_00.csv" + } + }, + "cat-52": { + "formulations": [ + { + "name": "bmi_c++", + "params": { + "model_type_name": "test_bmi_cpp", + "library_file": "./extern/test_bmi_cpp/cmake_build/libtestbmicppmodel.so", + "init_config": "./data/bmi/c/test/test_bmi_c_config.ini", + "main_output_variable": "OUTPUT_VAR_2", + "variables_names_map" : { + "INPUT_VAR_2": "TMP_2maboveground", + "INPUT_VAR_1": "precip_rate" + }, + "create_function": "bmi_model_create", + "destroy_function": "bmi_model_destroy", + "uses_forcing_file": false + } + } + ], + "forcing": { + "path": "./data/forcing/cat-52_2015-12-01 00_00_00_2015-12-30 23_00_00.csv" + } + }, + "cat-67": { + "formulations": [ + { + "name": "bmi_c++", + "params": { + "model_type_name": "test_bmi_cpp", + "library_file": "./extern/test_bmi_cpp/cmake_build/libtestbmicppmodel.so", + "init_config": "./data/bmi/c/test/test_bmi_c_config.ini", + "main_output_variable": "OUTPUT_VAR_2", + "variables_names_map" : { + "INPUT_VAR_2": "TMP_2maboveground", + "INPUT_VAR_1": "precip_rate" + }, + "create_function": "bmi_model_create", + "destroy_function": "bmi_model_destroy", + "uses_forcing_file": false + } + } + ], + "forcing": { + "path": "./data/forcing/cat-67_2015-12-01 00_00_00_2015-12-30 23_00_00.csv" + } + } + } +} diff --git a/data/example_state_saving_config_multi.json b/data/example_state_saving_config_multi.json new file mode 100644 index 0000000000..ecfaa272fd --- /dev/null +++ b/data/example_state_saving_config_multi.json @@ -0,0 +1,117 @@ +{ + "global": { + "formulations": [ + { + "name": "bmi_multi", + "params": { + "model_type_name": "bmi_multi_noahowp_cfe", + "forcing_file": "", + "init_config": "", + "allow_exceed_end_time": true, + "main_output_variable": "Q_OUT", + "modules": [ + { + "name": "bmi_c++", + "params": { + "model_type_name": "bmi_c++_sloth", + "library_file": "./extern/sloth/cmake_build/libslothmodel.so", + "init_config": "/dev/null", + "allow_exceed_end_time": true, + "main_output_variable": "z", + "uses_forcing_file": false, + "model_params": { + "sloth_ice_fraction_schaake(1,double,m,node)": 0.0, + "sloth_ice_fraction_xinanjiang(1,double,1,node)": 0.0, + "sloth_smp(1,double,1,node)": 0.0 + } + } + }, + { + "name": "bmi_fortran", + "params": { + "model_type_name": "bmi_fortran_noahowp", + "library_file": "./extern/noah-owp-modular/cmake_build/libsurfacebmi", + "forcing_file": "", + "init_config": "./data/bmi/fortran/noah-owp-modular-init-{{id}}.namelist.input", + "allow_exceed_end_time": true, + "main_output_variable": "QINSUR", + "variables_names_map": { + "PRCPNONC": "atmosphere_water__liquid_equivalent_precipitation_rate", + "Q2": "atmosphere_air_water~vapor__relative_saturation", + "SFCTMP": "land_surface_air__temperature", + "UU": "land_surface_wind__x_component_of_velocity", + "VV": "land_surface_wind__y_component_of_velocity", + "LWDN": "land_surface_radiation~incoming~longwave__energy_flux", + "SOLDN": "land_surface_radiation~incoming~shortwave__energy_flux", + "SFCPRS": "land_surface_air__pressure" + }, + "uses_forcing_file": false + } + }, + { + "name": "bmi_c", + "params": { + "model_type_name": "bmi_c_pet", + "library_file": "./extern/evapotranspiration/evapotranspiration/cmake_build/libpetbmi", + "forcing_file": "", + "init_config": "./data/bmi/c/pet/{{id}}_bmi_config.ini", + "allow_exceed_end_time": true, + "main_output_variable": "water_potential_evaporation_flux", + "registration_function": "register_bmi_pet", + "variables_names_map": { + "water_potential_evaporation_flux": "potential_evapotranspiration" + }, + "uses_forcing_file": false + } + }, + { + "name": "bmi_c", + "params": { + "model_type_name": "bmi_c_cfe", + "library_file": "./extern/cfe/cmake_build/libcfebmi", + "forcing_file": "", + "init_config": "./data/bmi/c/cfe/{{id}}_bmi_config.ini", + "allow_exceed_end_time": true, + "main_output_variable": "Q_OUT", + "registration_function": "register_bmi_cfe", + "variables_names_map": { + "water_potential_evaporation_flux": "potential_evapotranspiration", + "atmosphere_air_water~vapor__relative_saturation": "SPFH_2maboveground", + "land_surface_air__temperature": "TMP_2maboveground", + "land_surface_wind__x_component_of_velocity": "UGRD_10maboveground", + "land_surface_wind__y_component_of_velocity": "VGRD_10maboveground", + "land_surface_radiation~incoming~longwave__energy_flux": "DLWRF_surface", + "land_surface_radiation~incoming~shortwave__energy_flux": "DSWRF_surface", + "land_surface_air__pressure": "PRES_surface", + "ice_fraction_schaake" : "sloth_ice_fraction_schaake", + "ice_fraction_xinanjiang" : "sloth_ice_fraction_xinanjiang", + "soil_moisture_profile" : "sloth_smp" + }, + "uses_forcing_file": false + } + } + ], + "uses_forcing_file": false + } + } + ], + "forcing": { + "file_pattern": ".*{{id}}.*..csv", + "path": "./data/forcing/", + "provider": "CsvPerFeature" + } + }, + "state_saving": [{ + "direction": "save", + "label": "end", + "path": "state_end", + "type": "FilePerUnit", + "when": "EndOfRun" + }], + "time": { + "start_time": "2015-12-01 00:00:00", + "end_time": "2015-12-30 23:00:00", + "output_interval": 3600 + }, + "output_root": "./output_dir/" +} diff --git a/extern/LASAM b/extern/LASAM index 764dc82e8f..be9146acc4 160000 --- a/extern/LASAM +++ b/extern/LASAM @@ -1 +1 @@ -Subproject commit 764dc82e8fb5a160e646a8f0fde35f964fd42234 +Subproject commit be9146acc442d3efeb0b59531b0af6e8ed75ba89 diff --git a/extern/SoilFreezeThaw/SoilFreezeThaw b/extern/SoilFreezeThaw/SoilFreezeThaw index ab641a8209..6d8b09bdf5 160000 --- a/extern/SoilFreezeThaw/SoilFreezeThaw +++ b/extern/SoilFreezeThaw/SoilFreezeThaw @@ -1 +1 @@ -Subproject commit ab641a820920acb788dc47513a1e0ccbf31483c2 +Subproject commit 6d8b09bdf5be7f85a8ab6de69ee0356164f1e12b diff --git a/extern/SoilMoistureProfiles/SoilMoistureProfiles b/extern/SoilMoistureProfiles/SoilMoistureProfiles index 41c802cb48..7d3899cec5 160000 --- a/extern/SoilMoistureProfiles/SoilMoistureProfiles +++ b/extern/SoilMoistureProfiles/SoilMoistureProfiles @@ -1 +1 @@ -Subproject commit 41c802cb4862e7bd01f6081816a093135bd51282 +Subproject commit 7d3899cec59d4fb5eee26b8082b451f1336c04e1 diff --git a/extern/cfe/cfe b/extern/cfe/cfe index f60afb6a8a..db54ba0ef3 160000 --- a/extern/cfe/cfe +++ b/extern/cfe/cfe @@ -1 +1 @@ -Subproject commit f60afb6a8a49e388e6263664239ff27742132d26 +Subproject commit db54ba0ef3707d87202d958ca1a075921fdb5855 diff --git a/extern/evapotranspiration/evapotranspiration b/extern/evapotranspiration/evapotranspiration index 096208ad62..456fc262a9 160000 --- a/extern/evapotranspiration/evapotranspiration +++ b/extern/evapotranspiration/evapotranspiration @@ -1 +1 @@ -Subproject commit 096208ad624e07216617f770a3447eb829266112 +Subproject commit 456fc262a9e78386b788b24f24c2b6c1a2ab7cf0 diff --git a/extern/lstm b/extern/lstm index 85a3301dae..d9e3102b62 160000 --- a/extern/lstm +++ b/extern/lstm @@ -1 +1 @@ -Subproject commit 85a3301daeff761a54b6ebda6fee7aac977a62ce +Subproject commit d9e3102b62ecd95037ca47a9a8d610e70ec588a5 diff --git a/extern/noah-owp-modular/noah-owp-modular b/extern/noah-owp-modular/noah-owp-modular index 25579b4948..713244b946 160000 --- a/extern/noah-owp-modular/noah-owp-modular +++ b/extern/noah-owp-modular/noah-owp-modular @@ -1 +1 @@ -Subproject commit 25579b4948e28e5afd0bed3e99e08a806fd9fc7c +Subproject commit 713244b94638224e3772ce96338e8c7bdaf44b09 diff --git a/extern/sac-sma/sac-sma b/extern/sac-sma/sac-sma index b40f61ca9e..e00a0117ba 160000 --- a/extern/sac-sma/sac-sma +++ b/extern/sac-sma/sac-sma @@ -1 +1 @@ -Subproject commit b40f61ca9e82d4c4d0fc6171b314714af0160ab3 +Subproject commit e00a0117ba08c20b48e389bd2a37c199d197c3a6 diff --git a/extern/sloth b/extern/sloth index ee0d982ccc..fc9c09823c 160000 --- a/extern/sloth +++ b/extern/sloth @@ -1 +1 @@ -Subproject commit ee0d982ccc07663cfea7bf0ac4d645841e19ccc1 +Subproject commit fc9c09823c90ca70a388f032fa19d88fc4a96091 diff --git a/extern/snow17 b/extern/snow17 index 10c2510bfa..70ff380df0 160000 --- a/extern/snow17 +++ b/extern/snow17 @@ -1 +1 @@ -Subproject commit 10c2510bfa45743a3828ea0fc890f79974b48390 +Subproject commit 70ff380df0a57dd175b007eda04ce71561f16dfa diff --git a/extern/t-route b/extern/t-route index 4c100cb7b9..c00f9fd16b 160000 --- a/extern/t-route +++ b/extern/t-route @@ -1 +1 @@ -Subproject commit 4c100cb7b94ff61b8028b4169571057ccdcea02f +Subproject commit c00f9fd16b0aa7b6de2bf9f9696166402291b743 diff --git a/extern/topmodel/topmodel b/extern/topmodel/topmodel index fa4f7e56db..6259302492 160000 --- a/extern/topmodel/topmodel +++ b/extern/topmodel/topmodel @@ -1 +1 @@ -Subproject commit fa4f7e56dbe46df8cc0d7ca9095102290170b866 +Subproject commit 6259302492941fbae086a685a80c5b4632cf822d diff --git a/extern/ueb-bmi b/extern/ueb-bmi index 299976367a..1b2804cbb0 160000 --- a/extern/ueb-bmi +++ b/extern/ueb-bmi @@ -1 +1 @@ -Subproject commit 299976367a5329602fc1443f932e9cbf6de4ace6 +Subproject commit 1b2804cbb04acbd38642bd63d213517b13c78df1 diff --git a/include/bmi/Bmi_Py_Adapter.hpp b/include/bmi/Bmi_Py_Adapter.hpp index d66124d534..2e48dc7845 100644 --- a/include/bmi/Bmi_Py_Adapter.hpp +++ b/include/bmi/Bmi_Py_Adapter.hpp @@ -541,33 +541,30 @@ namespace models { int itemSize = GetVarItemsize(name); std::string py_type = GetVarType(name); std::string cxx_type = get_analogous_cxx_type(py_type, (size_t) itemSize); - - if (cxx_type == "short") { - set_value(name, (short *) src); - } else if (cxx_type == "int") { - set_value(name, (int *) src); - } else if (cxx_type == "long") { - set_value(name, (long *) src); - } else if (cxx_type == "long long") { - //FIXME this gets dicey -- if a python numpy array is of type np.int64 (long long), - //but a c++ int* is passed to this function as src, it will fail in undefined ways... - //the template type overload may be perferred for doing SetValue from framework components - //such as forcing providers... - set_value(name, (long long *) src); - } else if (cxx_type == "float") { - set_value(name, (float *) src); - } else if (cxx_type == "double") { - set_value(name, (double *) src); - } else if (cxx_type == "long double") { - set_value(name, (long double *) src); - } else { + // macro for checking type and setting value + #define BMI_PY_SET_VALUE(type) if (cxx_type == #type) {\ + this->set_value(name, static_cast(src)); } + BMI_PY_SET_VALUE(signed char) + else BMI_PY_SET_VALUE(unsigned char) + else BMI_PY_SET_VALUE(short) + else BMI_PY_SET_VALUE(unsigned short) + else BMI_PY_SET_VALUE(int) + else BMI_PY_SET_VALUE(unsigned int) + else BMI_PY_SET_VALUE(long) + else BMI_PY_SET_VALUE(unsigned long) + else BMI_PY_SET_VALUE(long long) + else BMI_PY_SET_VALUE(unsigned long long) + else BMI_PY_SET_VALUE(float) + else BMI_PY_SET_VALUE(double) + else BMI_PY_SET_VALUE(long double) + else { std::string throw_msg; throw_msg.assign("Bmi_Py_Adapter cannot set values for variable '" + name + "' that has unrecognized C++ type '" + cxx_type + "'"); LOG(throw_msg, LogLevel::WARNING); throw std::runtime_error(throw_msg); } + #undef BMI_PY_SET_VALUE } - /** * Set the values of the given BMI variable for the model, sourcing new data from the provided vector. * @@ -597,6 +594,24 @@ namespace models { } } + /** + * Set the value of a variable. This version of setting a variable will send an array with the `size` specified instead of checking the BMI for its current size of the variable. + * Ownership of the pointer will remain in C++, so the consuming BMI should not maintain a reference to the values beyond the scope of its `set_value` method. + * + * @param name The name of the BMI variable. + * @param src Pointer to the data that will be sent to the BMI. + * @param size The number of items represented by the pointer. + */ + template + void set_value_unchecked(const std::string &name, T *src, size_t size) { + // declare readonly array info with the pointer and size + py::buffer_info info(src, static_cast(size), true); + // create the array with the info and NULL handler so python doesn't take ownership + py::array_t src_array(info, nullptr); + // pass the array to python to read; the BMI should not attempt to maintain a reference beyond the scope of this function to prevent trying to use freed memory + bmi_model->attr("set_value")(name, src_array); + } + /** * Set values for a model's BMI variable at specified indices. * diff --git a/include/core/Layer.hpp b/include/core/Layer.hpp index 5c3c4481fa..9a21401fb2 100644 --- a/include/core/Layer.hpp +++ b/include/core/Layer.hpp @@ -16,6 +16,9 @@ namespace hy_features class HY_Features_MPI; } +class State_Snapshot_Saver; +class State_Snapshot_Loader; + namespace ngen { @@ -110,6 +113,10 @@ namespace ngen std::unordered_map &nexus_indexes, int current_step); + virtual void save_state_snapshot(std::shared_ptr snapshot_saver); + virtual void load_state_snapshot(std::shared_ptr snapshot_loader); + virtual void load_hot_start(std::shared_ptr snapshot_loader); + protected: const LayerDescription description; diff --git a/include/core/NgenSimulation.hpp b/include/core/NgenSimulation.hpp index 00e5ef49eb..3189045edc 100644 --- a/include/core/NgenSimulation.hpp +++ b/include/core/NgenSimulation.hpp @@ -12,6 +12,13 @@ namespace hy_features class HY_Features_MPI; } +class State_Snapshot_Saver; +class State_Snapshot_Loader; + +#if NGEN_WITH_ROUTING +#include "bmi/Bmi_Py_Adapter.hpp" +#endif // NGEN_WITH_ROUTING + #include #include #include @@ -48,6 +55,9 @@ class NgenSimulation */ void run_catchments(); + // Tear down of any items stored on the NgenSimulation object that could throw errors and, thus, should be kept separate from the deconstructor. + void finalize(); + /** * Run t-route on the stored nexus outflow values for the full configured duration of the simulation */ @@ -59,6 +69,17 @@ class NgenSimulation size_t get_num_output_times() const; std::string get_timestamp_for_step(int step) const; + void save_state_snapshot(std::shared_ptr snapshot_saver); + void load_state_snapshot(std::shared_ptr snapshot_loader); + /** + * Saves a snapshot state that's intended to be run at the end of a simulation. + * + * This version of saving will include T-Route BMI data and exclude the nexus outflow data stored during the catchment processing. + */ + void save_end_of_run(std::shared_ptr snapshot_saver); + // Load a snapshot of the end of a previous run. This will create a T-Route python adapter if the loader finds a unit for it and the config path is not empty. + void load_hot_start(std::shared_ptr snapshot_loader, const std::string &t_route_config_file_with_path); + private: void advance_models_one_output_step(); @@ -74,6 +95,12 @@ class NgenSimulation std::vector catchment_outflows_; std::unordered_map nexus_indexes_; std::vector nexus_downstream_flows_; +#if NGEN_WITH_ROUTING + std::unique_ptr py_troute_; +#endif // NGEN_WITH_ROUTING + void make_troute(const std::string &t_route_config_file_with_path); + + std::string unit_name() const; int mpi_rank_; int mpi_num_procs_; diff --git a/include/core/nexus/HY_PointHydroNexusRemote.hpp b/include/core/nexus/HY_PointHydroNexusRemote.hpp index 34d98f9a96..ebac2e9ace 100644 --- a/include/core/nexus/HY_PointHydroNexusRemote.hpp +++ b/include/core/nexus/HY_PointHydroNexusRemote.hpp @@ -72,6 +72,7 @@ class HY_PointHydroNexusRemote : public HY_PointHydroNexus communication_type get_communicator_type() { return type; } private: + void post_receives(); void process_communications(); int world_rank; diff --git a/include/realizations/catchment/Bmi_Formulation.hpp b/include/realizations/catchment/Bmi_Formulation.hpp index f2a13074e8..91236fea42 100644 --- a/include/realizations/catchment/Bmi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Formulation.hpp @@ -41,6 +41,9 @@ class Bmi_Formulation_Test; class Bmi_C_Formulation_Test; class Bmi_C_Pet_IT; +class State_Snapshot_Saver; +class State_Snapshot_Loader; + namespace realization { /** @@ -68,6 +71,30 @@ namespace realization { virtual ~Bmi_Formulation() {}; + /** + * Passes a serialized representation of the model's state to ``saver`` + * + * Asks the model to serialize its state, queries the pointer + * and length, passes that to saver, and then releases it + */ + virtual void save_state(std::shared_ptr saver) = 0; + + /** + * Passes a serialized representation of the model's state to ``loader`` + * + * Asks saver to find data for the BMI and passes that data to the BMI for loading. + */ + virtual void load_state(std::shared_ptr loader) = 0; + + /** + * Passes a serialized representation of the model's state to ``loader`` + * + * Asks saver to find data for the BMI and passes that data to the BMI for loading. + * + * Differes from `load_state` by also restting the internal time value to its initial state. + */ + virtual void load_hot_start(std::shared_ptr loader) = 0; + /** * Convert a time value from the model to an epoch time in seconds. * diff --git a/include/realizations/catchment/Bmi_Fortran_Formulation.hpp b/include/realizations/catchment/Bmi_Fortran_Formulation.hpp index 4f6863b883..d37ca469e7 100644 --- a/include/realizations/catchment/Bmi_Fortran_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Fortran_Formulation.hpp @@ -5,6 +5,7 @@ #if NGEN_WITH_BMI_FORTRAN +#include #include "Bmi_Module_Formulation.hpp" #include @@ -23,6 +24,18 @@ namespace realization { std::string get_formulation_type() const override; + /** + * Requests the BMI to copy its current state into memory. The state will remain in memory until either a new state is made or `free_serialization_state` is called. + * Because the Fortran BMI has no messaging for 64-bit integers, this overload will use the 32-bit integer interface. + * + * @return Span of the serialized data. + */ + const boost::span get_serialization_state() override; + + void load_serialization_state(const boost::span state) override; + + void free_serialization_state() override; + protected: /** @@ -49,6 +62,10 @@ namespace realization { friend class ::Bmi_Multi_Formulation_Test; friend class ::Bmi_Formulation_Test; friend class ::Bmi_Fortran_Formulation_Test; + + private: + // location to store serialized state from the BMI because pointer interfaces are not available for Fotran + std::vector serialized_state; }; } diff --git a/include/realizations/catchment/Bmi_Module_Formulation.hpp b/include/realizations/catchment/Bmi_Module_Formulation.hpp index cd6dda4969..150bd2ac38 100644 --- a/include/realizations/catchment/Bmi_Module_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Module_Formulation.hpp @@ -7,7 +7,6 @@ #include "Bmi_Adapter.hpp" #include #include "bmi_utilities.hpp" -#include #include #include "bmi/protocols.hpp" @@ -49,12 +48,14 @@ namespace realization { void create_formulation(geojson::PropertyMap properties) override; /** - * Passes a serialized representation of the model's state to ``saver`` - * - * Asks the model to serialize its state, queries the pointer - * and length, passes that to saver, and then releases it + * Create a save state, save it using the `State_Snapshot_Saver`, then clear the save state from memory. + * `this->get_id()` will be used as the unique ID for the saver. */ - void save_state(std::shared_ptr saver) const; + void save_state(std::shared_ptr saver) override; + + void load_state(std::shared_ptr loader) override; + + void load_hot_start(std::shared_ptr loader) override; /** * Get the collection of forcing output property names this instance can provide. @@ -290,9 +291,21 @@ namespace realization { const std::vector get_bmi_input_variables() const override; const std::vector get_bmi_output_variables() const override; - const boost::span get_serialization_state() const; - void load_serialization_state(const boost::span state) const; - void free_serialization_state() const; + /** + * Requests the BMI to copy its current state into memory. The state will remain in memory until either a new state is made or `free_serialization_state` is called. + * + * @return Span of the serialized data. + */ + virtual const boost::span get_serialization_state(); + /** + * Requests the BMI to load data from a previously saved state. This has a side effect of freeing a current state if it currently exists. + */ + virtual void load_serialization_state(const boost::span state); + /** + * Requests the BMI to clear a currently saved state from memory. + * Existing state pointers should not be used as the stored data may be freed depending on implementation. + */ + virtual void free_serialization_state(); void set_realization_file_format(bool is_legacy_format); virtual void check_mass_balance(const int& iteration, const int& total_steps, const std::string& timestamp) const override { diff --git a/include/realizations/catchment/Bmi_Multi_Formulation.hpp b/include/realizations/catchment/Bmi_Multi_Formulation.hpp index 64ee733db0..20d9158b94 100644 --- a/include/realizations/catchment/Bmi_Multi_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Multi_Formulation.hpp @@ -15,6 +15,7 @@ #include "ConfigurationException.hpp" #include "ExternalIntegrationException.hpp" +#include #define BMI_REALIZATION_CFG_PARAM_REQ__MODULES "modules" #define BMI_REALIZATION_CFG_PARAM_OPT__DEFAULT_OUT_VALS "default_output_values" @@ -56,6 +57,12 @@ namespace realization { } }; + void save_state(std::shared_ptr saver) override; + + void load_state(std::shared_ptr loader) override; + + void load_hot_start(std::shared_ptr loader) override; + /** * Convert a time value from the model to an epoch time in seconds. * @@ -674,6 +681,7 @@ namespace realization { bool is_realization_legacy_format() const; private: + friend class boost::serialization::access; /** * Setup a deferred provider for a nested module, tracking the class as needed. @@ -773,6 +781,8 @@ namespace realization { friend Bmi_Multi_Formulation_Test; friend class ::Bmi_Cpp_Multi_Array_Test; + template + void serialize(Archive& ar, const unsigned int version); }; } diff --git a/include/realizations/catchment/Bmi_Py_Formulation.hpp b/include/realizations/catchment/Bmi_Py_Formulation.hpp index 76904165c9..d3830d6282 100644 --- a/include/realizations/catchment/Bmi_Py_Formulation.hpp +++ b/include/realizations/catchment/Bmi_Py_Formulation.hpp @@ -35,6 +35,13 @@ namespace realization { bool is_bmi_output_variable(const std::string &var_name) const override; + /** + * Requests the BMI to load data from a previously saved state. This has a side effect of freeing a current state if it currently exists. + * + * The python BMI requires additional messaging for pre-allocating memory for load + */ + void load_serialization_state(const boost::span state) override; + protected: std::shared_ptr construct_model(const geojson::PropertyMap &properties) override; diff --git a/include/state_save_restore/File_Per_Unit.hpp b/include/state_save_restore/File_Per_Unit.hpp new file mode 100644 index 0000000000..faec8d966a --- /dev/null +++ b/include/state_save_restore/File_Per_Unit.hpp @@ -0,0 +1,38 @@ +#ifndef NGEN_FILE_PER_UNIT_HPP +#define NGEN_FILE_PER_UNIT_HPP + +#include + +class File_Per_Unit_Saver : public State_Saver +{ +public: + File_Per_Unit_Saver(std::string base_path); + ~File_Per_Unit_Saver(); + + std::shared_ptr initialize_snapshot(State_Durability durability) override; + + std::shared_ptr initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) override; + + void finalize() override; + +private: + std::string base_path_; +}; + + +class File_Per_Unit_Loader : public State_Loader +{ +public: + File_Per_Unit_Loader(std::string dir_path); + ~File_Per_Unit_Loader() = default; + + void finalize() override { }; + + std::shared_ptr initialize_snapshot() override; + + std::shared_ptr initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) override; +private: + std::string dir_path_; +}; + +#endif // NGEN_FILE_PER_UNIT_HPP diff --git a/include/state_save_restore/State_Save_Restore.hpp b/include/state_save_restore/State_Save_Restore.hpp new file mode 100644 index 0000000000..57a88b5f3e --- /dev/null +++ b/include/state_save_restore/State_Save_Restore.hpp @@ -0,0 +1,183 @@ +#ifndef NGEN_STATE_SAVE_RESTORE_HPP +#define NGEN_STATE_SAVE_RESTORE_HPP + +#include + +#include +#include + +#include +#include +#include +#include +#include + +#include "State_Save_Utils.hpp" + +class State_Saver; +class State_Loader; +class State_Snapshot_Saver; + +class State_Save_Config +{ +public: + /** + * Expects the tree @param config that potentially contains a "state_saving" key + * + * + */ + State_Save_Config(boost::property_tree::ptree const& config); + + /** + * Get state loaders that perform before the catchments are run. + * + * @return `std::pair`s of the label from the config and an instance of the loader. + */ + std::vector>> start_of_run_loaders() const; + + /** + * Get state savers that perform after the catchments have run to completion. + * + * @return `std::pair`s of the label from the config and an instance of the saver. + */ + std::vector>> end_of_run_savers() const; + + /** + * Get state loader that is intended to be performed before catchment processing starts. + * + * The returned pointer may be NULL if no configuration was made for existing data. + */ + std::unique_ptr hot_start() const; + + struct instance + { + instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing); + + State_Save_Direction direction_; + State_Save_Mechanism mechanism_; + State_Save_When timing_; + std::string label_; + std::string path_; + + std::string mechanism_string() const; + }; + +private: + std::vector instances_; +}; + +class State_Saver +{ +public: + using snapshot_time_t = std::chrono::time_point; + + // Flag type to indicate whether state saving needs to ensure + // stability of saved data wherever it is stored before returning + // success + enum class State_Durability { relaxed, strict }; + + State_Saver() = default; + virtual ~State_Saver() = default; + + static snapshot_time_t snapshot_time_now(); + + /** + * Return an object suitable for saving a simulation state as of a + * particular moment in time, @param epoch + * + * @param durability indicates whether callers expect all + * potential errors to be checked and reported before finalize() + * and/or State_Snapshot_Saver::finish_saving() return + */ + virtual std::shared_ptr initialize_snapshot(State_Durability durability) = 0; + + virtual std::shared_ptr initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) = 0; + + /** + * Execute any logic necessary to cleanly finish usage, and + * potentially report errors, before destructors would + * execute. E.g. closing files opened in parallel across MPI + * ranks. + */ + virtual void finalize() = 0; +}; + +class State_Snapshot_Saver +{ +public: + State_Snapshot_Saver() = delete; + State_Snapshot_Saver(State_Saver::State_Durability durability); + virtual ~State_Snapshot_Saver() = default; + + /** + * Capture the data from a single unit of the simulation + */ + virtual void save_unit(std::string const& unit_name, boost::span data) = 0; + + /** + * Execute logic to complete the saving process + * + * Data may be flushed here, and delayed errors may be detected + * and reported here. With relaxed durability, error reports may + * not come until the parent State_Saver::finalize() call is made, + * or ever. + */ + virtual void finish_saving() = 0; + +protected: + State_Saver::State_Durability durability_; +}; + +class State_Snapshot_Loader; + +class State_Loader +{ +public: + State_Loader() = default; + virtual ~State_Loader() = default; + + /** + * Return an object suitable for loading a simulation state as of + * a particular moment in time, @param epoch + */ + virtual std::shared_ptr initialize_snapshot() = 0; + + virtual std::shared_ptr initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) = 0; + + /** + * Execute any logic necessary to cleanly finish usage, and + * potentially report errors, before destructors would + * execute. E.g. closing files opened in parallel across MPI + * ranks. + */ + virtual void finalize() = 0; +}; + +class State_Snapshot_Loader +{ +public: + State_Snapshot_Loader() = default; + virtual ~State_Snapshot_Loader() = default; + + /** + * Check if data of a unit name exists. + */ + virtual bool has_unit(const std::string &unit_name) = 0; + + /** + * Load data from whatever source, and pass it to @param unit_loader->load() + */ + virtual void load_unit(const std::string &unit_name, std::vector &data) = 0; + + /** + * Execute logic to complete the saving process + * + * Data may be flushed here, and delayed errors may be detected + * and reported here. With relaxed durability, error reports may + * not come until the parent State_Saver::finalize() call is made, + * or ever. + */ + virtual void finish_saving() = 0; +}; + +#endif // NGEN_STATE_SAVE_RESTORE_HPP diff --git a/include/state_save_restore/State_Save_Utils.hpp b/include/state_save_restore/State_Save_Utils.hpp new file mode 100644 index 0000000000..9713b660af --- /dev/null +++ b/include/state_save_restore/State_Save_Utils.hpp @@ -0,0 +1,30 @@ +#ifndef NGEN_STATE_SAVE_UTILS_HPP +#define NGEN_STATE_SAVE_UTILS_HPP + +namespace StateSaveNames { + const auto CREATE = "serialization_create"; + const auto STATE = "serialization_state"; + const auto FREE = "serialization_free"; + const auto SIZE = "serialization_size"; + const auto RESET = "reset_time"; +} + +enum class State_Save_Direction { + None = 0, + Save, + Load +}; + +enum class State_Save_Mechanism { + None = 0, + FilePerUnit +}; + +enum class State_Save_When { + None = 0, + EndOfRun, + FirstOfMonth, + StartOfRun +}; + +#endif diff --git a/include/state_save_restore/vecbuf.hpp b/include/state_save_restore/vecbuf.hpp new file mode 100644 index 0000000000..a20bc70e73 --- /dev/null +++ b/include/state_save_restore/vecbuf.hpp @@ -0,0 +1,125 @@ +#ifndef HPP_STRING_VECBUF +#define HPP_STRING_VECBUF +// https://gist.github.com/stephanlachnit/4a06f8475afd144e73235e2a2584b000 +// SPDX-FileCopyrightText: 2023 Stephan Lachnit +// SPDX-License-Identifier: MIT + +#include +#include +#include + +template> +class vecbuf : public std::basic_streambuf { +public: + using streambuf = std::basic_streambuf; + using char_type = typename streambuf::char_type; + using int_type = typename streambuf::int_type; + using traits_type = typename streambuf::traits_type; + using vector = std::vector; + using value_type = typename vector::value_type; + using size_type = typename vector::size_type; + + // Constructor for vecbuf with optional initial capacity + vecbuf(size_type capacity = 0) : vector_() { reserve(capacity); } + + // Forwarder for std::vector::shrink_to_fit() + constexpr void shrink_to_fit() { vector_.shrink_to_fit(); } + + // Forwarder for std::vector::clear() + constexpr void clear() { vector_.clear(); } + + // Forwarder for std::vector::resize(size) + constexpr void resize(size_type size) { vector_.resize(size); } + + // Forwarder for std::vector::reserve + constexpr void reserve(size_type capacity) { vector_.reserve(capacity); setp_from_vector(); } + + // Increase the capacity of the buffer by reserving the current_size + additional_capacity + constexpr void reserve_additional(size_type additional_capacity) { reserve(size() + additional_capacity); } + + // Forwarder for std::vector::data + constexpr value_type* data() { return vector_.data(); } + + // Forwarder for std::vector::size + constexpr size_type size() const { return vector_.size(); } + + // Forwarder for std::vector::capacity + constexpr size_type capacity() const { return vector_.capacity(); } + + // Implements std::basic_streambuf::xsputn + std::streamsize xsputn(const char_type* s, std::streamsize count) override { + try { + reserve_additional(count); + } + catch (const std::bad_alloc& error) { + // reserve did not work, use slow algorithm + return xsputn_slow(s, count); + } + // reserve worked, use fast algorithm + return xsputn_fast(s, count); + } + +protected: + // Calculates value to std::basic_streambuf::pbase from vector + constexpr value_type* pbase_from_vector() const { return const_cast(vector_.data()); } + + // Calculates value to std::basic_streambuf::pptr from vector + constexpr value_type* pptr_from_vector() const { return const_cast(vector_.data() + vector_.size()); } + + // Calculates value to std::basic_streambuf::epptr from vector + constexpr value_type* epptr_from_vector() const { return const_cast(vector_.data()) + vector_.capacity(); } + + // Sets the values for std::basic_streambuf::pbase, std::basic_streambuf::pptr and std::basic_streambuf::epptr from vector + constexpr void setp_from_vector() { streambuf::setp(pbase_from_vector(), epptr_from_vector()); streambuf::pbump(size()); } + +private: + // std::vector containing the data + vector vector_; + + // Fast implementation of std::basic_streambuf::xsputn if reserve_additional(count) succeeded + std::streamsize xsputn_fast(const char_type* s, std::streamsize count) { + // store current pptr (end of vector location) + auto* old_pptr = pptr_from_vector(); + // resize the vector, does not move since space already reserved + vector_.resize(vector_.size() + count); + // directly memcpy new content to old pptr (end of vector before it was resized) + traits_type::copy(old_pptr, s, count); + // reserve() already calls setp_from_vector(), only adjust pptr to new epptr + streambuf::pbump(count); + + return count; + } + + // Slow implementation of std::basic_streambuf::xsputn if reserve_additional(count) did not succeed, might calls std::basic_streambuf::overflow() + std::streamsize xsputn_slow(const char_type* s, std::streamsize count) { + // reserving entire vector failed, emplace char for char + std::streamsize written = 0; + while (written < count) { + try { + // copy one char, should throw eventually std::bad_alloc + vector_.emplace_back(s[written]); + } + catch (const std::bad_alloc& error) { + // try overflow(), if eof return, else continue + int_type c = this->overflow(traits_type::to_int_type(s[written])); + if (traits_type::eq_int_type(c, traits_type::eof())) { + return written; + } + } + // update pbase, pptr and epptr + setp_from_vector(); + written++; + } + return written; + } + +}; + +class membuf : public std::streambuf { +public: + membuf(char *begin, size_t size) { + this->setg(begin, begin, begin + size); + } +}; + +#endif diff --git a/include/utilities/bmi/mass_balance.hpp b/include/utilities/bmi/mass_balance.hpp index e8883232ff..046b1002e8 100644 --- a/include/utilities/bmi/mass_balance.hpp +++ b/include/utilities/bmi/mass_balance.hpp @@ -152,4 +152,3 @@ namespace models{ namespace bmi{ namespace protocols{ }; }}} - diff --git a/src/NGen.cpp b/src/NGen.cpp index 9fefc64468..79946fd64f 100644 --- a/src/NGen.cpp +++ b/src/NGen.cpp @@ -54,6 +54,8 @@ #include #include +#include + void ngen::exec_info::runtime_summary(std::ostream& stream) noexcept { stream << "Runtime configuration summary:\n"; @@ -514,9 +516,10 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { } auto simulation_time_config = realization::config::Time(*possible_simulation_time).make_params(); - sim_time = std::make_shared(simulation_time_config); + auto state_saving_config = State_Save_Config(realization_config); + ss << "Initializing formulations" << std::endl; LOG(ss.str(), LogLevel::INFO); ss.str(""); @@ -696,6 +699,15 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { std::chrono::duration time_elapsed_init = time_done_init - time_start; LOG("[TIMING]: Init: " + std::to_string(time_elapsed_init.count()), LogLevel::INFO); + { // optionally run hot start loader if set in state saving config + auto hot_start_loader = state_saving_config.hot_start(); + if (hot_start_loader) { + LOG(LogLevel::INFO, "Loading hot start data from prior snapshot."); + std::shared_ptr snapshot_loader = hot_start_loader->initialize_snapshot(); + simulation->load_hot_start(snapshot_loader, manager->get_t_route_config_file_with_path()); + } + } + simulation->run_catchments(); #if NGEN_WITH_MPI @@ -733,8 +745,6 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { std::chrono::duration time_elapsed_nexus_output = time_done_nexus_output - time_done_simulation; LOG("[TIMING]: Nexus outflow file writing: " + std::to_string(time_elapsed_nexus_output.count()), LogLevel::INFO); - manager->finalize(); - #if NGEN_WITH_MPI MPI_Barrier(MPI_COMM_WORLD); #endif @@ -761,6 +771,16 @@ int run_ngen(int argc, char* argv[], int mpi_num_procs, int mpi_rank) { LOG("[TIMING]: Coastal: " + std::to_string(time_elapsed_coastal.count()), LogLevel::INFO); #endif + // run any end-of-run state saving after T-Route has finished but before starting to tear down data structures + for (const auto& end_saver : state_saving_config.end_of_run_savers()) { + LOG(LogLevel::INFO, "Saving end of run simulation data for state saving config " + end_saver.first); + std::shared_ptr snapshot = end_saver.second->initialize_snapshot(State_Saver::State_Durability::strict); + simulation->save_end_of_run(snapshot); + } + + simulation->finalize(); + manager->finalize(); + auto time_done_total = std::chrono::steady_clock::now(); std::chrono::duration time_elapsed_total = time_done_total - time_start; diff --git a/src/bmi/Bmi_Py_Adapter.cpp b/src/bmi/Bmi_Py_Adapter.cpp index 7436e2b48c..5fdc3eb8ec 100644 --- a/src/bmi/Bmi_Py_Adapter.cpp +++ b/src/bmi/Bmi_Py_Adapter.cpp @@ -104,25 +104,35 @@ void Bmi_Py_Adapter::GetValue(std::string name, void *dest) { msg += e.what(); Logger::logMsgAndThrowError(msg); } - - if (cxx_type == "short") { - copy_to_array(name, (short *) dest); + if (cxx_type == "signed char") { + this->copy_to_array(name, static_cast(dest)); + } else if (cxx_type == "unsigned char") { + this->copy_to_array(name, static_cast(dest)); + } else if (cxx_type == "short") { + this->copy_to_array(name, static_cast(dest)); + } else if (cxx_type == "unsigned short") { + this->copy_to_array(name, static_cast(dest)); } else if (cxx_type == "int") { - copy_to_array(name, (int *) dest); + this->copy_to_array(name, static_cast(dest)); + } else if (cxx_type == "unsigned int") { + this->copy_to_array(name, static_cast(dest)); } else if (cxx_type == "long") { - copy_to_array(name, (long *) dest); + this->copy_to_array(name, static_cast(dest)); + } else if (cxx_type == "unsigned long") { + this->copy_to_array(name, static_cast(dest)); } else if (cxx_type == "long long") { - copy_to_array(name, (long long *) dest); + this->copy_to_array(name, static_cast(dest)); + } else if (cxx_type == "unsigned long long") { + this->copy_to_array(name, static_cast(dest)); } else if (cxx_type == "float") { - copy_to_array(name, (float *) dest); + this->copy_to_array(name, static_cast(dest)); } else if (cxx_type == "double") { - copy_to_array(name, (double *) dest); + this->copy_to_array(name, static_cast(dest)); } else if (cxx_type == "long double") { - copy_to_array(name, (long double *) dest); + this->copy_to_array(name, static_cast(dest)); } else { Logger::logMsgAndThrowError("Bmi_Py_Adapter can't get value of unsupported type: " + cxx_type); } - } void Bmi_Py_Adapter::GetValueAtIndices(std::string name, void *dest, int *inds, int count) { @@ -189,30 +199,30 @@ std::string Bmi_Py_Adapter::get_bmi_type_simple_name() const { void Bmi_Py_Adapter::SetValueAtIndices(std::string name, int *inds, int count, void *src) { std::string val_type = GetVarType(name); size_t val_item_size = (size_t)GetVarItemsize(name); - - // The available types and how they are handled here should match what is in get_value_at_indices - if (val_type == "int" && val_item_size == sizeof(short)) { - set_value_at_indices(name, inds, count, src, val_type); - } else if (val_type == "int" && val_item_size == sizeof(int)) { - set_value_at_indices(name, inds, count, src, val_type); - } else if (val_type == "int" && val_item_size == sizeof(long)) { - set_value_at_indices(name, inds, count, src, val_type); - } else if (val_type == "int" && val_item_size == sizeof(long long)) { - set_value_at_indices(name, inds, count, src, val_type); - } else if (val_type == "float" && val_item_size == sizeof(float)) { - set_value_at_indices(name, inds, count, src, val_type); - } else if (val_type == "float" && val_item_size == sizeof(double)) { - set_value_at_indices(name, inds, count, src, val_type); - } else if (val_type == "float64" && val_item_size == sizeof(double)) { - set_value_at_indices(name, inds, count, src, val_type); - } else if (val_type == "float" && val_item_size == sizeof(long double)) { - set_value_at_indices(name, inds, count, src, val_type); - } else { + std::string cxx_type = this->get_analogous_cxx_type(val_type, val_item_size); + + // macro for checking type and calling `set_value_at_indices` with that type + #define BMI_PY_SET_VALUE_INDEX(type) if (cxx_type == #type) { this->set_value_at_indices(name, inds, count, src, val_type); } + BMI_PY_SET_VALUE_INDEX(signed char) + else BMI_PY_SET_VALUE_INDEX(unsigned char) + else BMI_PY_SET_VALUE_INDEX(short) + else BMI_PY_SET_VALUE_INDEX(unsigned short) + else BMI_PY_SET_VALUE_INDEX(int) + else BMI_PY_SET_VALUE_INDEX(unsigned int) + else BMI_PY_SET_VALUE_INDEX(long) + else BMI_PY_SET_VALUE_INDEX(unsigned long) + else BMI_PY_SET_VALUE_INDEX(long long) + else BMI_PY_SET_VALUE_INDEX(unsigned long long) + else BMI_PY_SET_VALUE_INDEX(float) + else BMI_PY_SET_VALUE_INDEX(double) + else BMI_PY_SET_VALUE_INDEX(long double) + else { Logger::logMsgAndThrowError( "(Bmi_Py_Adapter) Failed attempt to SET values of BMI variable '" + name + "' from '" + model_name + "' model: model advertises unsupported combination of type (" + val_type + ") and size (" + std::to_string(val_item_size) + ")."); } + #undef BMI_PY_SET_VALUE_INDEX } void Bmi_Py_Adapter::Update() { diff --git a/src/core/Layer.cpp b/src/core/Layer.cpp index aac9ced099..432b918aa3 100644 --- a/src/core/Layer.cpp +++ b/src/core/Layer.cpp @@ -1,5 +1,6 @@ #include #include +#include #if NGEN_WITH_MPI #include "HY_Features_MPI.hpp" @@ -92,3 +93,36 @@ void ngen::Layer::update_models(boost::span catchment_outflows, simulation_time.advance_timestep(); } } + +void ngen::Layer::save_state_snapshot(std::shared_ptr snapshot_saver) +{ + // XXX Handle any of this class's own state as a meta-data unit + + for (auto const& id : processing_units) { + auto r = features.catchment_at(id); + auto r_c = std::dynamic_pointer_cast(r); + r_c->save_state(snapshot_saver); + } +} + +void ngen::Layer::load_state_snapshot(std::shared_ptr snapshot_loader) +{ + // XXX Handle any of this class's own state as a meta-data unit + + for (auto const& id : processing_units) { + auto r = features.catchment_at(id); + auto r_c = std::dynamic_pointer_cast(r); + r_c->load_state(snapshot_loader); + } +} + +void ngen::Layer::load_hot_start(std::shared_ptr snapshot_loader) +{ + // XXX Handle any of this class's own state as a meta-data unit + + for (auto const& id : processing_units) { + auto r = features.catchment_at(id); + auto r_c = std::dynamic_pointer_cast(r); + r_c->load_hot_start(snapshot_loader); + } +} diff --git a/src/core/NgenSimulation.cpp b/src/core/NgenSimulation.cpp index c5cce69103..05d4a8076a 100644 --- a/src/core/NgenSimulation.cpp +++ b/src/core/NgenSimulation.cpp @@ -8,12 +8,15 @@ #include "HY_Features.hpp" #endif -#if NGEN_WITH_ROUTING -#include "bmi/Bmi_Py_Adapter.hpp" -#endif // NGEN_WITH_ROUTING - +#include "state_save_restore/State_Save_Utils.hpp" +#include "state_save_restore/State_Save_Restore.hpp" #include "parallel_utils.h" +namespace { + const auto NGEN_UNIT_NAME = "ngen"; + const auto TROUTE_UNIT_NAME = "troute"; +} + NgenSimulation::NgenSimulation( Simulation_Time const& sim_time, std::vector> layers, @@ -54,6 +57,15 @@ void NgenSimulation::run_catchments() } } +void NgenSimulation::finalize() { +#if NGEN_WITH_ROUTING + if (this->py_troute_) { + this->py_troute_->Finalize(); + this->py_troute_.reset(); + } +#endif // NGEN_WITH_ROUTING +} + void NgenSimulation::advance_models_one_output_step() { // The Inner loop will advance all layers unless doing so will break one of two constraints @@ -108,6 +120,93 @@ void NgenSimulation::advance_models_one_output_step() } +void NgenSimulation::save_state_snapshot(std::shared_ptr snapshot_saver) +{ + // TODO: save the current nexus data + auto unit_name = this->unit_name(); + // XXX Handle self, then recursively pass responsibility to Layers + for (auto& layer : layers_) { + layer->save_state_snapshot(snapshot_saver); + } +} + +void NgenSimulation::save_end_of_run(std::shared_ptr snapshot_saver) +{ + for (auto& layer : layers_) { + layer->save_state_snapshot(snapshot_saver); + } +#if NGEN_WITH_ROUTING + if (this->mpi_rank_ == 0 && this->py_troute_) { + uint64_t serialization_size; + this->py_troute_->SetValue(StateSaveNames::CREATE, &serialization_size); + this->py_troute_->GetValue(StateSaveNames::SIZE, &serialization_size); + void *troute_state = this->py_troute_->GetValuePtr(StateSaveNames::STATE); + boost::span span(static_cast(troute_state), serialization_size); + snapshot_saver->save_unit(TROUTE_UNIT_NAME, span); + this->py_troute_->SetValue(StateSaveNames::FREE, &serialization_size); + } +#endif // NGEN_WITH_ROUTING +} + +void NgenSimulation::load_state_snapshot(std::shared_ptr snapshot_loader) { + // TODO: load the state data related to nexus outflows + auto unit_name = this->unit_name(); + for (auto& layer : layers_) { + layer->load_state_snapshot(snapshot_loader); + } +} + +void NgenSimulation::load_hot_start(std::shared_ptr snapshot_loader, const std::string &t_route_config_file_with_path) { + for (auto& layer : layers_) { + layer->load_hot_start(snapshot_loader); + } +#if NGEN_WITH_ROUTING + if (this->mpi_rank_ == 0) { + bool config_file_set = !t_route_config_file_with_path.empty(); + bool snapshot_exists = snapshot_loader->has_unit(TROUTE_UNIT_NAME); + if (config_file_set && snapshot_exists) { + LOG(LogLevel::DEBUG, "Loading T-Route data from snapshot."); + std::vector troute_data; + snapshot_loader->load_unit(TROUTE_UNIT_NAME, troute_data); + if (py_troute_ == NULL) { + this->make_troute(t_route_config_file_with_path); + } + py_troute_->set_value_unchecked(StateSaveNames::STATE, troute_data.data(), troute_data.size()); + double rt; // unused by the BMI but needed for messaging + py_troute_->SetValue(StateSaveNames::RESET, &rt); + } else if (!config_file_set && !snapshot_exists) { + LOG(LogLevel::DEBUG, "No data set for loading T-Route."); + } else if (config_file_set && !snapshot_exists) { + LOG(LogLevel::WARNING, "A T-Route config file was provided but the load data does not contain T-Route data. T-Route will be run as a cold start."); + } else if (!config_file_set && snapshot_exists) { + LOG(LogLevel::WARNING, "A T-Route hot start snapshot exists but no config file was provided. T-Route will not be loaded or run,"); + } + } +#endif // NGEN_WITH_ROUTING +} + + +void NgenSimulation::make_troute(const std::string &t_route_config_file_with_path) { +#if NGEN_WITH_ROUTING + this->py_troute_ = std::make_unique( + "T-Route", + t_route_config_file_with_path, + "troute_nwm_bmi.troute_bmi.BmiTroute", + true + ); +#endif // NGEN_WITH_ROUTING +} + + +std::string NgenSimulation::unit_name() const { +#if NGEN_WITH_MPI + return "ngen_" + std::to_string(this->mpi_rank_); +#else + return "ngen_0"; +#endif // NGEN_WITH_MPI +} + + int NgenSimulation::get_nexus_index(std::string const& nexus_id) const { auto iter = nexus_indexes_.find(nexus_id); @@ -203,14 +302,13 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s int delta_time = sim_time_->get_output_interval_seconds(); // model for routing - models::bmi::Bmi_Py_Adapter py_troute("T-Route", t_route_config_file_with_path, "troute_nwm_bmi.troute_bmi.BmiTroute", true); + if (this->py_troute_ == NULL) { + this->make_troute(t_route_config_file_with_path); + } + this->py_troute_->set_value_unchecked("ngen_dt", &delta_time, 1); - // tell BMI to resize nexus containers - int64_t nexus_count = routing_nexus_indexes->size(); - py_troute.SetValue("land_surface_water_source__volume_flow_rate__count", &nexus_count); - py_troute.SetValue("land_surface_water_source__id__count", &nexus_count); // set up nexus id indexes - std::vector nexus_df_index(nexus_count); + std::vector nexus_df_index(routing_nexus_indexes->size()); for (const auto& key_value : *routing_nexus_indexes) { int id_index = key_value.second; @@ -228,14 +326,11 @@ void NgenSimulation::run_routing(NgenSimulation::hy_features_t &features, std::s } nexus_df_index[id_index] = id_as_int; } - py_troute.SetValue("land_surface_water_source__id", nexus_df_index.data()); - for (int i = 0; i < number_of_timesteps; ++i) { - py_troute.SetValue("land_surface_water_source__volume_flow_rate", - routing_nexus_downflows->data() + (i * nexus_count)); - py_troute.Update(); - } - // Finalize will write the output file - py_troute.Finalize(); + // use unchecked messaging to allow the BMI to change its container size + py_troute_->set_value_unchecked("land_surface_water_source__id", nexus_df_index.data(), nexus_df_index.size()); + py_troute_->set_value_unchecked("land_surface_water_source__volume_flow_rate", routing_nexus_downflows->data(), routing_nexus_downflows->size()); + // run the T-Route model and create outputs through Update + py_troute_->Update(); } #endif // NGEN_WITH_ROUTING } diff --git a/src/forcing/NetCDFPerFeatureDataProvider.cpp b/src/forcing/NetCDFPerFeatureDataProvider.cpp index ad936ca5d0..bb8507145d 100644 --- a/src/forcing/NetCDFPerFeatureDataProvider.cpp +++ b/src/forcing/NetCDFPerFeatureDataProvider.cpp @@ -135,8 +135,6 @@ NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_pat // correct string release nc_free_string(num_ids,&string_buffers[0]); -// Modified code to handle units, epoch start, and reading all time values correctly - KSL - // Get the time variable - getVar collects all values at once and stores in memory // Extremely large timespans could be problematic, but for ngen use cases, this should not be a problem auto time_var = nc_file->getVar("Time"); @@ -147,8 +145,22 @@ NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_pat std::vector raw_time(num_times); try { - time_var.getVar(raw_time.data()); - } catch(const netCDF::exceptions::NcException& e) { + auto dim_count = time_var.getDimCount(); + // Old-format files have dimensions (catchment, time), new-format + // files generated by the forcings engine have just (time) + if (dim_count == 2) { + if (time_var.getDim(0).getName() != "catchment-id" || time_var.getDim(1).getName() != "time") { + Logger::logMsgAndThrowError("In NetCDF file '" + input_path + "', 'Time' variable dimensions don't match expectations"); + } + time_var.getVar({0ul, 0ul}, {1ul, num_times}, raw_time.data()); + } else if (dim_count == 1) { + time_var.getVar({0ul}, {num_times}, raw_time.data()); + } else { + throw std::runtime_error("Unexpected " + std::to_string(dim_count) + + " dimensions on Time variable in NetCDF file '" + + input_path + "'"); + } + } catch(const std::exception& e) { netcdf_ss << "Error reading time variable: " << e.what() << std::endl; LOG(netcdf_ss.str(), LogLevel::WARNING); netcdf_ss.str(""); throw; @@ -157,7 +169,6 @@ NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_pat std::string time_units; try { time_var.getAtt("units").getValues(time_units); - } catch(const netCDF::exceptions::NcException& e) { netcdf_ss << "Error reading time units: " << e.what() << std::endl; LOG(netcdf_ss.str(), LogLevel::WARNING); netcdf_ss.str(""); @@ -169,27 +180,43 @@ NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_pat double time_scale_factor = 1.0; std::time_t epoch_start_time = 0; - //The following makes some assumptions that NetCDF output from the forcing engine will be relatively uniform - //Specifically, it assumes time values are in units since the Unix Epoch. - //If the forcings engine outputs additional unit formats, this will need to be expanded - if (time_units.find("minutes since") != std::string::npos) { + std::string time_base_unit; + auto since_index = time_units.find("since"); + if (since_index != std::string::npos) { + time_base_unit = time_units.substr(0, since_index - 1); + + std::string datetime_str = time_units.substr(since_index + 6); + std::tm tm = {}; + std::istringstream ss(datetime_str); + ss >> std::get_time(&tm, "%Y-%m-%d %H:%M:%S"); // This may be particularly inflexible + epoch_start_time = timegm(&tm); // timegm may not be available in all environments/OSes ie: Windows + } else { + time_base_unit = time_units; + } + + if (time_base_unit == "minutes") { time_unit = TIME_MINUTES; time_scale_factor = 60.0; - } else if (time_units.find("hours since") != std::string::npos) { + } else if (time_base_unit == "hours") { time_unit = TIME_HOURS; time_scale_factor = 3600.0; - } else { + } else if (time_base_unit == "seconds" || time_base_unit == "s") { time_unit = TIME_SECONDS; time_scale_factor = 1.0; + } else if (time_base_unit == "milliseconds" || time_base_unit == "ms") { + time_unit = TIME_MILLISECONDS; + time_scale_factor = 1.0e-3; + } else if (time_base_unit == "microseconds" || time_base_unit == "us") { + time_unit = TIME_MICROSECONDS; + time_scale_factor = 1.0e-6; + } else if (time_base_unit == "nanoseconds" || time_base_unit == "ns") { + time_unit = TIME_NANOSECONDS; + time_scale_factor = 1.0e-9; + } else { + Logger::logMsgAndThrowError("In NetCDF file '" + input_path + "', time unit '" + time_base_unit + "' could not be converted"); } - //This is also based on the NetCDF from the forcings engine, and may not be super flexible - std::string datetime_str = time_units.substr(time_units.find("since") + 6); - std::tm tm = {}; - std::istringstream ss(datetime_str); - ss >> std::get_time(&tm, "%Y-%m-%d %H:%M:%S"); //This may be particularly inflexible - epoch_start_time = timegm(&tm); //timegm may not be available in all environments/OSes ie: Windows + time_vals.resize(raw_time.size()); -// End modification - KSL std::transform(raw_time.begin(), raw_time.end(), time_vals.begin(), [&](const auto& n) { @@ -214,13 +241,20 @@ NetCDFPerFeatureDataProvider::NetCDFPerFeatureDataProvider(std::string input_pat #endif netcdf_ss << "All time intervals are constant within tolerance." << std::endl; - LOG(netcdf_ss.str(), LogLevel::SEVERE); netcdf_ss.str(""); + LOG(netcdf_ss.str(), LogLevel::DEBUG); netcdf_ss.str(""); // determine start_time and stop_time; start_time = time_vals[0]; stop_time = time_vals.back() + time_stride; sim_to_data_time_offset = sim_start_date_time_epoch - start_time; + + netcdf_ss << "NetCDF Forcing from file '" << input_path << "'" + << "Start time " << (time_t)start_time + << ", Stop time " << (time_t)stop_time + << ", sim_start_date_time_epoch " << sim_start_date_time_epoch + ; + LOG(netcdf_ss.str(), LogLevel::DEBUG); netcdf_ss.str(""); } NetCDFPerFeatureDataProvider::~NetCDFPerFeatureDataProvider() = default; @@ -304,7 +338,8 @@ double NetCDFPerFeatureDataProvider::get_value(const CatchmentAggrDataSelector& auto stride = idx2 - idx1; - std::vector start, count; + std::vector start(2), count(2); + std::vector var_index_map(2); auto cat_pos = id_pos[selector.get_id()]; @@ -325,16 +360,29 @@ double NetCDFPerFeatureDataProvider::get_value(const CatchmentAggrDataSelector& //TODO: Currently assuming a whole variable cache slice across all catchments for a single timestep...but some stuff here to support otherwise. // For reference: https://stackoverflow.com/a/72030286 -//Modified to work with NetCDF dimension shapes and fix errors - KSL size_t cache_slices_t_n = (read_len + cache_slice_t_size - 1) / cache_slice_t_size; // Ceiling division to ensure remainders have a slice - - //Explicitly setting dimension shapes auto dims = ncvar.getDims(); - size_t catchment_dim_size = dims[1].getSize(); - size_t time_dim_size = dims[0].getSize(); - //Cache slicing - modified to work with dimensions structure + int dim_time, dim_catchment; + if (dims.size() != 2) { + Logger::logMsgAndThrowError("Variable dimension count isn't 2"); + } + if (dims[0].getName() == "time" && dims[1].getName() == "catchment-id") { + // Forcings Engine NetCDF output case + dim_time = 0; + dim_catchment = 1; + } else if (dims[1].getName() == "time" && dims[0].getName() == "catchment-id") { + // Classic NetCDF file case + dim_time = 1; + dim_catchment = 0; + } else { + Logger::logMsgAndThrowError("Variable dimensions aren't 'time' and 'catchment-id'"); + } + + size_t time_dim_size = dims[dim_time].getSize(); + size_t catchment_dim_size = dims[dim_catchment].getSize(); + for( size_t i = 0; i < cache_slices_t_n; i++ ) { std::shared_ptr> cached; size_t cache_t_idx = idx1 + i * cache_slice_t_size; @@ -345,14 +393,18 @@ double NetCDFPerFeatureDataProvider::get_value(const CatchmentAggrDataSelector& cached = value_cache.get(key).get(); } else { cached = std::make_shared>(catchment_dim_size * slice_size); - start.clear(); - start.push_back(cache_t_idx); // start from correct time index - start.push_back(0); // Start from the first catchment - count.clear(); - count.push_back(slice_size); // Read the calculated slice size for time - count.push_back(catchment_dim_size); // Read all catchments + start[dim_time] = cache_t_idx; // start from correct time index + start[dim_catchment] = 0; // Start from the first catchment + count[dim_time] = slice_size; // Read the calculated slice size for time + count[dim_catchment] = catchment_dim_size; // Read all catchments + // Whichever order the file stores the data in, the + // resulting array should have all catchments for a given + // time step contiguous + var_index_map[dim_time] = catchment_dim_size; + var_index_map[dim_catchment] = 1; + try { - ncvar.getVar(start,count,&(*cached)[0]); + ncvar.getVar(start,count, {1l, 1l}, var_index_map, cached->data()); value_cache.insert(key, cached); } catch (netCDF::exceptions::NcException& e) { netcdf_ss << "NetCDF exception: " << e.what() << std::endl; @@ -377,7 +429,7 @@ double NetCDFPerFeatureDataProvider::get_value(const CatchmentAggrDataSelector& } } } -// End modification + rvalue = 0.0; double a , b = 0.0; diff --git a/src/realizations/catchment/Bmi_Fortran_Formulation.cpp b/src/realizations/catchment/Bmi_Fortran_Formulation.cpp index 390a6fb4e7..bfd59b0795 100644 --- a/src/realizations/catchment/Bmi_Fortran_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Fortran_Formulation.cpp @@ -6,6 +6,7 @@ #include "Bmi_Fortran_Formulation.hpp" #include "Bmi_Fortran_Adapter.hpp" #include "Constants.h" +#include "state_save_restore/State_Save_Utils.hpp" using namespace realization; using namespace models::bmi; @@ -93,4 +94,46 @@ double Bmi_Fortran_Formulation::get_var_value_as_double(const int &index, const return 1.0; } +const boost::span Bmi_Fortran_Formulation::get_serialization_state() { + auto model = this->get_bmi_model(); + // create the serialized state on the Fortran BMI + int size_int = 0; + model->SetValue(StateSaveNames::CREATE, &size_int); + model->GetValue(StateSaveNames::SIZE, &size_int); + // resize the state to the array to the size of the Fortran's backing array + this->serialized_state.resize(size_int); + // since GetValuePtr on the Fortran BMI does not work currently, store the data on the formulation + model->GetValue(StateSaveNames::STATE, this->serialized_state.data()); + // the BMI can have its state freed immediately since the data is now stored on the formulation + model->SetValue(StateSaveNames::FREE, &size_int); + // return a span of the data stored on the formulation + const boost::span span(this->serialized_state.data(), this->serialized_state.size()); + return span; +} + +void Bmi_Fortran_Formulation::load_serialization_state(const boost::span state) { + auto model = this->get_bmi_model(); + int item_size = model->GetVarItemsize(StateSaveNames::STATE); + // assert the number of chars aligns with the storage array to prevent reading out of bounds + if (state.size() % item_size != 0) { + std::string error = "Fortran Deserialization: The number of bytes in the state (" + std::to_string(state.size()) + + ") must be a multiple of the size of the storage unit (" + std::to_string(item_size) + ")"; + LOG(LogLevel::SEVERE, error); + throw std::runtime_error(error); + } + // setting size is a workaround for loading the state. + // The BMI Fortran interface shapes the incoming pointer to the same size as the data currently backing the BMI's variable. + // By setting the size, the BMI can lie about the size of its state variable to that interface. + int false_nbytes = state.size(); + model->SetValue(StateSaveNames::SIZE, &false_nbytes); + model->SetValue(StateSaveNames::STATE, state.data()); +} + +void Bmi_Fortran_Formulation::free_serialization_state() { + // The serialized data needs to be stored on the formluation since GetValuePtr is not available on Fortran BMIs. + // The backing BMI's serialization data should already be freed during `get_serialization_state`, so clearing the formulation's data is all that is needed. + this->serialized_state.clear(); + this->serialized_state.shrink_to_fit(); +} + #endif // NGEN_WITH_BMI_FORTRAN diff --git a/src/realizations/catchment/Bmi_Module_Formulation.cpp b/src/realizations/catchment/Bmi_Module_Formulation.cpp index da478a294e..3fd0b3dcd7 100644 --- a/src/realizations/catchment/Bmi_Module_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Module_Formulation.cpp @@ -2,6 +2,8 @@ #include "utilities/logging_utils.h" #include #include "Logger.hpp" +#include "state_save_restore/State_Save_Utils.hpp" +#include std::stringstream bmiform_ss; @@ -15,19 +17,29 @@ namespace realization { inner_create_formulation(properties, true); } - void Bmi_Module_Formulation::save_state(std::shared_ptr saver) const { - auto model = get_bmi_model(); + void Bmi_Module_Formulation::save_state(std::shared_ptr saver) { + uint64_t size = 1; + boost::span data = this->get_serialization_state(); - size_t size = 1; - model->SetValue("serialization_create", &size); - model->GetValue("serialization_size", &size); + // Rely on Formulation_Manager also using this->get_id() + // as a unique key for the individual catchment + // formulations + saver->save_unit(this->get_id(), data); - auto serialization_state = static_cast(model->GetValuePtr("serialization_state")); - boost::span data(serialization_state, size); + this->free_serialization_state(); + } - saver->save(data); + void Bmi_Module_Formulation::load_state(std::shared_ptr loader) { + std::vector buffer; + loader->load_unit(this->get_id(), buffer); + boost::span data(buffer.data(), buffer.size()); + this->load_serialization_state(data); + } - model->SetValue("serialization_free", &size); + void Bmi_Module_Formulation::load_hot_start(std::shared_ptr loader) { + this->load_state(loader); + double rt; + this->get_bmi_model()->SetValue(StateSaveNames::FREE, &rt); } boost::span Bmi_Module_Formulation::get_available_variable_names() const { @@ -1082,28 +1094,28 @@ namespace realization { } - const boost::span Bmi_Module_Formulation::get_serialization_state() const { - auto bmi = this->bmi_model; - // create a new serialized state, getting the amount of data that was saved - uint64_t* size = (uint64_t*)bmi->GetValuePtr("serialization_create"); - // get the pointer of the new state - char* serialized = (char*)bmi->GetValuePtr("serialization_state"); - const boost::span span(serialized, *size); + const boost::span Bmi_Module_Formulation::get_serialization_state() { + auto model = get_bmi_model(); + uint64_t size = 0; + model->SetValue(StateSaveNames::CREATE, &size); + model->GetValue(StateSaveNames::SIZE, &size); + auto serialization_state = static_cast(model->GetValuePtr(StateSaveNames::STATE)); + const boost::span span(serialization_state, size); return span; } - void Bmi_Module_Formulation::load_serialization_state(const boost::span state) const { + void Bmi_Module_Formulation::load_serialization_state(const boost::span state) { auto bmi = this->bmi_model; // grab the pointer to the underlying state data void* data = (void*)state.data(); // load the state through SetValue - bmi->SetValue("serialization_state", data); + bmi->SetValue(StateSaveNames::STATE, data); } - void Bmi_Module_Formulation::free_serialization_state() const { + void Bmi_Module_Formulation::free_serialization_state() { auto bmi = this->bmi_model; // send message to clear memory associated with serialized data void* _; // this pointer will be unused by SetValue - bmi->SetValue("serialization_free", _); + bmi->SetValue(StateSaveNames::FREE, _); } } diff --git a/src/realizations/catchment/Bmi_Multi_Formulation.cpp b/src/realizations/catchment/Bmi_Multi_Formulation.cpp index 145b7dfb73..1f4fbbf245 100644 --- a/src/realizations/catchment/Bmi_Multi_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Multi_Formulation.cpp @@ -13,8 +13,53 @@ #include "Bmi_Py_Formulation.hpp" #include "Logger.hpp" +#include "state_save_restore/vecbuf.hpp" +#include "state_save_restore/State_Save_Utils.hpp" +#include + +#include +#include +#include + using namespace realization; + +void Bmi_Multi_Formulation::save_state(std::shared_ptr saver) { + LOG(LogLevel::DEBUG, "Saving state for Multi-BMI %s", this->get_id()); + vecbuf data; + boost::archive::binary_oarchive archive(data); + // serialization function handles freeing the sub-BMI states after archiving them + archive << (*this); + // it's recommended to keep data pointers around until serialization completes, + // so freeing the BMI states is done after the data buffer has been completely written to + for (const nested_module_ptr &m : modules) { + auto bmi = dynamic_cast(m.get()); + bmi->free_serialization_state(); + } + boost::span span(data.data(), data.size()); + saver->save_unit(this->get_id(), span); +} + +void Bmi_Multi_Formulation::load_state(std::shared_ptr loader) { + LOG(LogLevel::DEBUG, "Loading save state for Multi-BMI %s", this->get_id()); + std::vector data; + loader->load_unit(this->get_id(), data); + membuf stream(data.data(), data.size()); + boost::archive::binary_iarchive archive(stream); + archive >> (*this); +} + +void Bmi_Multi_Formulation::load_hot_start(std::shared_ptr loader) { + this->load_state(loader); + double rt; + LOG(LogLevel::DEBUG, "Resetting time for sub-BMIs"); + // Multi-BMI's current forwards its primary BMI's current time, so no additional action needed for the formulation's reset time + for (const nested_module_ptr &m : modules) { + auto bmi = dynamic_cast(m.get()); + bmi->get_bmi_model()->SetValue(StateSaveNames::RESET, &rt); + } +} + void Bmi_Multi_Formulation::create_multi_formulation(geojson::PropertyMap properties, bool needs_param_validation) { if (needs_param_validation) { validate_parameters(properties); @@ -609,6 +654,34 @@ void Bmi_Multi_Formulation::set_realization_file_format(bool is_legacy_format){ legacy_json_format = is_legacy_format; } +template +void Bmi_Multi_Formulation::serialize(Archive &ar, const unsigned int version) { + uint64_t data_size; + std::vector buffer; + for (const nested_module_ptr &m : modules) { + auto bmi = dynamic_cast(m.get()); + // if saving, make the BMI's state and record its size and data + if (Archive::is_saving::value) { + LOG(LogLevel::DEBUG, "Saving state from sub-BMI " + bmi->get_model_type_name()); + boost::span span = bmi->get_serialization_state(); + data_size = span.size(); + ar & data_size; + ar & boost::serialization::make_array(span.data(), data_size); + // it's recommended to keep raw pointers alive throughout the entire seiralization process, + // so responsibility for freeing the BMIs' state is left to the caller of this function + } + // if loading, get the current data size stored at the front, then load that much data as a char blob passed to the BMI + else { + LOG(LogLevel::DEBUG, "Loading state from sub-BMI " + bmi->get_model_type_name()); + ar & data_size; + buffer.resize(data_size); + ar & boost::serialization::make_array(buffer.data(), data_size); + boost::span span(buffer.data(), data_size); + bmi->load_serialization_state(span); + } + } +} + //Function to find whether any item in the string vector is empty or blank int find_empty_string_index(const std::vector& str_vector) { for (int i = 0; i < str_vector.size(); ++i) { diff --git a/src/realizations/catchment/Bmi_Py_Formulation.cpp b/src/realizations/catchment/Bmi_Py_Formulation.cpp index 7d266db9f8..1712ec9cc7 100644 --- a/src/realizations/catchment/Bmi_Py_Formulation.cpp +++ b/src/realizations/catchment/Bmi_Py_Formulation.cpp @@ -1,5 +1,6 @@ #include #include "Logger.hpp" +#include "state_save_restore/State_Save_Utils.hpp" #if NGEN_WITH_PYTHON @@ -53,50 +54,30 @@ double Bmi_Py_Formulation::get_var_value_as_double(const int &index, const std:: std::string val_type = model->GetVarType(var_name); size_t val_item_size = (size_t)model->GetVarItemsize(var_name); + std::string cxx_type = model->get_analogous_cxx_type(val_type, val_item_size); //void *dest; int indices[1]; indices[0] = index; - - // The available types and how they are handled here should match what is in SetValueAtIndices - if (val_type == "int" && val_item_size == sizeof(short)) { - short dest; - model->get_value_at_indices(var_name, &dest, indices, 1, false); - return (double)dest; - } - if (val_type == "int" && val_item_size == sizeof(int)) { - int dest; - model->get_value_at_indices(var_name, &dest, indices, 1, false); - return (double)dest; - } - if (val_type == "int" && val_item_size == sizeof(long)) { - long dest; - model->get_value_at_indices(var_name, &dest, indices, 1, false); - return (double)dest; - } - if (val_type == "int" && val_item_size == sizeof(long long)) { - long long dest; - model->get_value_at_indices(var_name, &dest, indices, 1, false); - return (double)dest; - } - if (val_type == "float" || val_type == "float16" || val_type == "float32" || val_type == "float64") { - if (val_item_size == sizeof(float)) { - float dest; - model->get_value_at_indices(var_name, &dest, indices, 1, false); - return (double) dest; - } - if (val_item_size == sizeof(double)) { - double dest; - model->get_value_at_indices(var_name, &dest, indices, 1, false); - return dest; - } - if (val_item_size == sizeof(long double)) { - long double dest; - model->get_value_at_indices(var_name, &dest, indices, 1, false); - return (double) dest; - } - } - + // macro for both checking and converting based on type from get_analogous_cxx_type +#define PY_BMI_DOUBLE_AT_INDEX(type) if (cxx_type == #type) {\ + type dest;\ + model->get_value_at_indices(var_name, &dest, indices, 1, false);\ + return static_cast(dest);} + PY_BMI_DOUBLE_AT_INDEX(signed char) + else PY_BMI_DOUBLE_AT_INDEX(unsigned char) + else PY_BMI_DOUBLE_AT_INDEX(short) + else PY_BMI_DOUBLE_AT_INDEX(unsigned short) + else PY_BMI_DOUBLE_AT_INDEX(int) + else PY_BMI_DOUBLE_AT_INDEX(unsigned int) + else PY_BMI_DOUBLE_AT_INDEX(long) + else PY_BMI_DOUBLE_AT_INDEX(unsigned long) + else PY_BMI_DOUBLE_AT_INDEX(long long) + else PY_BMI_DOUBLE_AT_INDEX(unsigned long long) + else PY_BMI_DOUBLE_AT_INDEX(float) + else PY_BMI_DOUBLE_AT_INDEX(double) + else PY_BMI_DOUBLE_AT_INDEX(long double) +#undef PY_BMI_DOUBLE_AT_INDEX Logger::logMsgAndThrowError("Unable to get value of variable " + var_name + " from " + get_model_type_name() + " as double: no logic for converting variable type " + val_type); @@ -117,4 +98,10 @@ bool Bmi_Py_Formulation::is_model_initialized() const { return get_bmi_model()->is_model_initialized(); } +void Bmi_Py_Formulation::load_serialization_state(const boost::span state) { + auto bmi = std::dynamic_pointer_cast(get_bmi_model()); + // load the state through the set value function that does not enforce the input size is the same as the current BMI's size + bmi->set_value_unchecked(StateSaveNames::STATE, state.data(), state.size()); +} + #endif //NGEN_WITH_PYTHON diff --git a/src/realizations/catchment/CMakeLists.txt b/src/realizations/catchment/CMakeLists.txt index 48771b18bf..6df221d6af 100644 --- a/src/realizations/catchment/CMakeLists.txt +++ b/src/realizations/catchment/CMakeLists.txt @@ -3,6 +3,15 @@ dynamic_sourced_cxx_library(realizations_catchment "${CMAKE_CURRENT_SOURCE_DIR}" add_library(NGen::realizations_catchment ALIAS realizations_catchment) +# ----------------------------------------------------------------------------- +# Find the Boost library and configure usage +set(Boost_USE_STATIC_LIBS OFF) +set(Boost_USE_MULTITHREADED ON) +set(Boost_USE_STATIC_RUNTIME OFF) +find_package(Boost 1.79.0 REQUIRED COMPONENTS serialization) + +target_link_libraries(realizations_catchment PRIVATE Boost::serialization) + target_include_directories(realizations_catchment PUBLIC ${PROJECT_SOURCE_DIR}/include/core ${PROJECT_SOURCE_DIR}/include/core/catchment diff --git a/src/state_save_restore/CMakeLists.txt b/src/state_save_restore/CMakeLists.txt new file mode 100644 index 0000000000..cde58c6073 --- /dev/null +++ b/src/state_save_restore/CMakeLists.txt @@ -0,0 +1,15 @@ +include(${PROJECT_SOURCE_DIR}/cmake/dynamic_sourced_library.cmake) +dynamic_sourced_cxx_library(state_save_restore "${CMAKE_CURRENT_SOURCE_DIR}") + +add_library(NGen::state_save_restore ALIAS state_save_restore) +target_link_libraries(state_save_restore PUBLIC + NGen::config_header + Boost::boost # Headers-only Boost + Boost::system + Boost::filesystem + ) + +target_include_directories(state_save_restore PUBLIC + ${PROJECT_SOURCE_DIR}/include + ) + diff --git a/src/state_save_restore/File_Per_Unit.cpp b/src/state_save_restore/File_Per_Unit.cpp new file mode 100644 index 0000000000..80bce91418 --- /dev/null +++ b/src/state_save_restore/File_Per_Unit.cpp @@ -0,0 +1,196 @@ +#include +#include + +#if __has_include() && __cpp_lib_filesystem >= 201703L + #include + using namespace std::filesystem; + #warning "Using STD Filesystem" +#elif __has_include() && defined(__cpp_lib_filesystem) + #include + using namespace std::experimental::filesystem; + #warning "Using Filesystem TS" +#elif __has_include() + #include + using namespace boost::filesystem; + #warning "Using Boost.Filesystem" +#else + #error "No Filesystem library implementation available" +#endif + +#include +#include + +namespace unit_saving_utils { + std::string format_epoch(State_Saver::snapshot_time_t epoch) + { + time_t t = std::chrono::system_clock::to_time_t(epoch); + std::tm tm = *std::gmtime(&t); + + std::stringstream tss; + tss << std::put_time(&tm, "%Y-%m-%dT%H:%M:%S"); + return tss.str(); + } +} + +// This class is only declared and defined here, in the .cpp file, +// because it is strictly an implementation detail of the top-level +// File_Per_Unit_Saver class +class File_Per_Unit_Snapshot_Saver : public State_Snapshot_Saver +{ + friend class File_Per_Unit_Saver; + + public: + File_Per_Unit_Snapshot_Saver() = delete; + File_Per_Unit_Snapshot_Saver(path base_path, State_Saver::State_Durability durability); + ~File_Per_Unit_Snapshot_Saver(); + +public: + void save_unit(std::string const& unit_name, boost::span data) override; + void finish_saving() override; + +private: + path dir_path_; +}; + +File_Per_Unit_Saver::File_Per_Unit_Saver(std::string base_path) + : base_path_(std::move(base_path)) +{ + auto dir_path = path(base_path_); + create_directories(dir_path); +} + +File_Per_Unit_Saver::~File_Per_Unit_Saver() = default; + +std::shared_ptr File_Per_Unit_Saver::initialize_snapshot(State_Durability durability) { + // TODO + return std::make_shared(path(this->base_path_), durability); +} + +std::shared_ptr File_Per_Unit_Saver::initialize_checkpoint_snapshot(snapshot_time_t epoch, State_Durability durability) +{ + path checkpoint_path = path(this->base_path_) / unit_saving_utils::format_epoch(epoch); + create_directory(checkpoint_path); + return std::make_shared(checkpoint_path, durability); +} + +void File_Per_Unit_Saver::finalize() +{ + // nothing to be done +} + +File_Per_Unit_Snapshot_Saver::File_Per_Unit_Snapshot_Saver(path base_path, State_Saver::State_Durability durability) + : State_Snapshot_Saver(durability) + , dir_path_(base_path) +{ + create_directory(dir_path_); +} + +File_Per_Unit_Snapshot_Saver::~File_Per_Unit_Snapshot_Saver() = default; + +void File_Per_Unit_Snapshot_Saver::save_unit(std::string const& unit_name, boost::span data) +{ + auto file_path = dir_path_ / unit_name; + try { + std::ofstream stream(file_path.string(), std::ios_base::out | std::ios_base::binary); + stream.write(data.data(), data.size()); + stream.close(); + } catch (std::exception &e) { + LOG("Failed to write state save data for unit '" + unit_name + "' in file '" + file_path.string() + "'", LogLevel::WARNING); + throw; + } +} + +void File_Per_Unit_Snapshot_Saver::finish_saving() +{ + if (durability_ == State_Saver::State_Durability::strict) { + // fsync() or whatever + } +} + + +// This class is only declared and defined here, in the .cpp file, +// because it is strictly an implementation detail of the top-level +// File_Per_Unit_Saver class +class File_Per_Unit_Snapshot_Loader : public State_Snapshot_Loader +{ + friend class State_Snapshot_Loader; +public: + File_Per_Unit_Snapshot_Loader() = default; + File_Per_Unit_Snapshot_Loader(path dir_path); + ~File_Per_Unit_Snapshot_Loader() override = default; + + bool has_unit(const std::string &unit_name) override; + + /** + * Load data from whatever source and store it in the `data` vector. + * + * @param data The location where the loaded data will be stored. This will be resized to the amount of data loaded. + */ + void load_unit(const std::string &unit_name, std::vector &data) override; + + /** + * Execute logic to complete the saving process + * + * Data may be flushed here, and delayed errors may be detected + * and reported here. With relaxed durability, error reports may + * not come until the parent State_Saver::finalize() call is made, + * or ever. + */ + void finish_saving() override { }; + +private: + path dir_path_; + std::vector data_; +}; + +File_Per_Unit_Snapshot_Loader::File_Per_Unit_Snapshot_Loader(path dir_path) + : dir_path_(dir_path) +{ + +} + +bool File_Per_Unit_Snapshot_Loader::has_unit(const std::string &unit_name) { + auto file_path = dir_path_ / unit_name; + return exists(file_path.string()); +} + +void File_Per_Unit_Snapshot_Loader::load_unit(std::string const& unit_name, std::vector &data) { + auto file_path = dir_path_ / unit_name; + std::uintmax_t size; + try { + size = file_size(file_path.string()); + } catch (std::exception &e) { + LOG("Failed to read state save data size for unit '" + unit_name + "' in file '" + file_path.string() + "'", LogLevel::WARNING); + throw; + } + std::ifstream stream(file_path.string(), std::ios_base::binary); + if (!stream) { + LOG("Failed to open state save data for unit '" + unit_name + "' in file '" + file_path.string() + "'", LogLevel::WARNING); + throw; + } + try { + data.resize(size); + stream.read(data.data(), size); + } catch (std::exception &e) { + LOG("Failed to read state save data for unit '" + unit_name + "' in file '" + file_path.string() + "'", LogLevel::WARNING); + throw; + } +} + +File_Per_Unit_Loader::File_Per_Unit_Loader(std::string dir_path) + : dir_path_(dir_path) +{ + +} + +std::shared_ptr File_Per_Unit_Loader::initialize_snapshot() +{ + return std::make_shared(path(dir_path_)); +} + +std::shared_ptr File_Per_Unit_Loader::initialize_checkpoint_snapshot(State_Saver::snapshot_time_t epoch) +{ + path checkpoint_path = path(dir_path_) / unit_saving_utils::format_epoch(epoch);; + return std::make_shared(checkpoint_path); +} + diff --git a/src/state_save_restore/State_Save_Restore.cpp b/src/state_save_restore/State_Save_Restore.cpp new file mode 100644 index 0000000000..ee3f5ae3c9 --- /dev/null +++ b/src/state_save_restore/State_Save_Restore.cpp @@ -0,0 +1,145 @@ +#include +#include + +#include + +#include +#include + +#include + +State_Save_Config::State_Save_Config(boost::property_tree::ptree const& tree) +{ + auto maybe = tree.get_child_optional("state_saving"); + + // Default initialization will represent the "not enabled" case + if (!maybe) { + LOG("State saving not configured", LogLevel::INFO); + return; + } + + bool hot_start = false; + for (const auto& saving_config : *maybe) { + try { + auto& subtree = saving_config.second; + auto direction = subtree.get("direction"); + auto what = subtree.get("label"); + auto where = subtree.get("path"); + auto how = subtree.get("type"); + auto when = subtree.get("when"); + + instance i{direction, what, where, how, when}; + if (i.timing_ == State_Save_When::StartOfRun && i.direction_ == State_Save_Direction::Load) { + if (hot_start) + throw std::runtime_error("Only one hot start state saving configuration is allowed."); + hot_start = true; + } + instances_.push_back(i); + } catch (std::exception &e) { + LOG("Bad state saving config: " + std::string(e.what()), LogLevel::WARNING); + throw; + } + } + + LOG("State saving configured", LogLevel::INFO); +} + +std::vector>> State_Save_Config::start_of_run_loaders() const { + std::vector>> loaders; + for (const auto &i : this->instances_) { + if (i.timing_ == State_Save_When::StartOfRun && i.direction_ == State_Save_Direction::Load) { + if (i.mechanism_ == State_Save_Mechanism::FilePerUnit) { + auto loader = std::make_shared(i.path_); + auto pair = std::make_pair(i.label_, loader); + loaders.push_back(pair); + } else { + LOG(LogLevel::WARNING, "State_Save_Config: Loading mechanism " + i.mechanism_string() + " is not supported for start of run loading."); + } + } + } + return loaders; +} + +std::vector>> State_Save_Config::end_of_run_savers() const { + std::vector>> savers; + for (const auto &i : this->instances_) { + if (i.timing_ == State_Save_When::EndOfRun && i.direction_ == State_Save_Direction::Save) { + if (i.mechanism_ == State_Save_Mechanism::FilePerUnit) { + auto saver = std::make_shared(i.path_); + auto pair = std::make_pair(i.label_, saver); + savers.push_back(pair); + } else { + LOG(LogLevel::WARNING, "State_Save_Config: Saving mechanism " + i.mechanism_string() + " is not supported for start of run saving."); + } + } + } + return savers; +} + +std::unique_ptr State_Save_Config::hot_start() const { + for (const auto &i : this->instances_) { + if (i.direction_ == State_Save_Direction::Load && i.timing_ == State_Save_When::StartOfRun) { + if (i.mechanism_ == State_Save_Mechanism::FilePerUnit) { + return std::make_unique(i.path_); + } else { + LOG(LogLevel::WARNING, "State_Save_Config: Saving mechanism " + i.mechanism_string() + " is not supported for start of run saving."); + } + } + } + return std::unique_ptr(); +} + +State_Save_Config::instance::instance(std::string const& direction, std::string const& label, std::string const& path, std::string const& mechanism, std::string const& timing) + : label_(label) + , path_(path) +{ + if (direction == "save") { + direction_ = State_Save_Direction::Save; + } else if (direction == "load") { + direction_ = State_Save_Direction::Load; + } else { + Logger::logMsgAndThrowError("Unrecognized state saving direction '" + direction + "'"); + } + + if (mechanism == "FilePerUnit") { + mechanism_ = State_Save_Mechanism::FilePerUnit; + } else { + Logger::logMsgAndThrowError("Unrecognized state saving mechanism '" + mechanism + "'"); + } + + if (timing == "EndOfRun") { + timing_ = State_Save_When::EndOfRun; + } else if (timing == "FirstOfMonth") { + timing_ = State_Save_When::FirstOfMonth; + } else if (timing == "StartOfRun") { + timing_ = State_Save_When::StartOfRun; + } else { + Logger::logMsgAndThrowError("Unrecognized state saving timing '" + timing + "'"); + } +} + +std::string State_Save_Config::instance::instance::mechanism_string() const { + switch (mechanism_) { + case State_Save_Mechanism::None: + return "None"; + case State_Save_Mechanism::FilePerUnit: + return "FilePerUnit"; + default: + return "Other"; + } +} + +State_Snapshot_Saver::State_Snapshot_Saver(State_Saver::State_Durability durability) + : durability_(durability) +{ + +} + +State_Saver::snapshot_time_t State_Saver::snapshot_time_now() { +#if __cplusplus < 201703L // C++ < 17 + auto now = std::chrono::system_clock::now(); + return std::chrono::time_point_cast(now); +#else + return std::chrono::floor(std::chrono::system_clock::now()); +#endif +}