diff --git a/CMakeLists.txt b/CMakeLists.txt index 0fe9d22..c5112a6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,9 +39,9 @@ endif () ##################################### # simdjson FetchContent_Declare( - simdjson - GIT_REPOSITORY https://github.com/simdjson/simdjson.git - GIT_TAG v4.0.7 + nlohmann_json + GIT_REPOSITORY https://github.com/nlohmann/json.git + GIT_TAG v3.12.0 ) FetchContent_Declare( pybind11 @@ -49,8 +49,7 @@ FetchContent_Declare( GIT_TAG v3.0.1 ) -set(SIMDJSON_IMPLEMENTATION fallback CACHE STRING "" FORCE) -FetchContent_MakeAvailable(simdjson pybind11) +FetchContent_MakeAvailable(nlohmann_json pybind11) ##################################### # Sources and headers @@ -64,10 +63,9 @@ add_library(libcapio_cl STATIC ${CAPIO_SRC} ${CAPIO_CL_HEADERS}) target_include_directories(libcapio_cl PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/src - ${simdjson_SOURCE_DIR} + ${nlohmann_json_SOURCE_DIR}/include ) -target_link_libraries(libcapio_cl PUBLIC simdjson) ##################################### # Install rules diff --git a/README.md b/README.md index 1cab53e..831070e 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ At runtime, CAPIO-CL’s parser and engine components analyze, track, and manage ### Requirements & dependencies - C++17 or greater - Cmake 3.15 or newer -- [simdjson](https://github.com/simdjson/simdjson) to parse JSON config files +- [nlohmann/json](https://github.com/nlohmann/json) to parse JSON config files - [GoogleTest](https://github.com/google/googletest) for automated testing All dependencies are fetched automatically by CMake — no manual setup required. diff --git a/bindings/python_bindings.cpp b/bindings/python_bindings.cpp index 6525a8f..c612bba 100644 --- a/bindings/python_bindings.cpp +++ b/bindings/python_bindings.cpp @@ -48,6 +48,7 @@ PYBIND11_MODULE(py_capio_cl, m) { .def("isDirectory", &capiocl::Engine::isDirectory) .def("isStoredInMemory", &capiocl::Engine::isStoredInMemory) .def("isPermanent", &capiocl::Engine::isPermanent) + .def("setAllStoreInMemory", &capiocl::Engine::setAllStoreInMemory) .def("__str__", &capiocl::Engine::print) .def("__repr__", [](const capiocl::Engine &e) { return "(&e)) + ">"; diff --git a/capiocl.hpp b/capiocl.hpp index 21c3bc9..5558e5a 100644 --- a/capiocl.hpp +++ b/capiocl.hpp @@ -288,6 +288,9 @@ class Engine { */ void setStoreFileInMemory(const std::filesystem::path &path); + /// @brief set all files to be stored in memory + void setAllStoreInMemory(); + /** * @brief Store the file on the file system. * @@ -430,7 +433,7 @@ class Parser { * the config file */ static std::tuple parse(const std::filesystem::path &source, - const std::filesystem::path &resolve_prexix, + const std::filesystem::path &resolve_prexix = "", bool store_only_in_memory = false); }; } // namespace capiocl diff --git a/src/Engine.cpp b/src/Engine.cpp index 98a080c..81eca5f 100644 --- a/src/Engine.cpp +++ b/src/Engine.cpp @@ -479,6 +479,11 @@ void capiocl::Engine::setStoreFileInMemory(const std::filesystem::path &path) { this->newFile(path); std::get<10>(_locations.at(path)) = true; } +void capiocl::Engine::setAllStoreInMemory() { + for (const auto &[fst, snd] : _locations) { + this->setStoreFileInMemory(fst); + } +} void capiocl::Engine::setStoreFileInFileSystem(const std::filesystem::path &path) { this->newFile(path); diff --git a/src/Parser.cpp b/src/Parser.cpp index 4afaf4a..28d3679 100644 --- a/src/Parser.cpp +++ b/src/Parser.cpp @@ -1,6 +1,7 @@ #include "capiocl.hpp" -#include "singleheader/simdjson.h" #include +#include +#include bool capiocl::Parser::isInteger(const std::string &s) { START_LOG(gettid(), "call(%s)", s.c_str()); @@ -21,18 +22,12 @@ bool capiocl::Parser::firstIsSubpathOfSecond(const std::filesystem::path &path, std::tuple capiocl::Parser::parse(const std::filesystem::path &source, - const std::filesystem::path &resolve_prexix, bool store_only_in_memory) { + const std::filesystem::path &resolve_prefix, bool store_only_in_memory) { std::string workflow_name = CAPIO_CL_DEFAULT_WF_NAME; auto locations = new Engine(); START_LOG(gettid(), "call(config_file='%s')", source.c_str()); - /* - * Before here a call to get_capio_dir() was issued. However, to support multiple CAPIO_DIRs - * there is no difference to use the wildcard * instead of CAPIO_DIR. This is true as only - * paths relative to the capio_dir directory are forwarded to the server, and as such, there - * is no difference that to create a ROOT dir equal to CAPIO_DIR compared to the wildcard *. - */ locations->newFile("*"); locations->setDirectory("*"); if (store_only_in_memory) { @@ -43,382 +38,232 @@ capiocl::Parser::parse(const std::filesystem::path &source, return {workflow_name, locations}; } - simdjson::ondemand::parser parser; - simdjson::padded_string json; - simdjson::ondemand::document entries; - simdjson::ondemand::array input_stream, output_stream, streaming, permanent_files, - exclude_files, storage_memory, storage_fs; - simdjson::ondemand::object storage_section; - simdjson::error_code error; + // ---- Load JSON ---- + std::ifstream file(source); + if (!file.is_open()) { + std::string msg = "Failed to open config file: " + source.string(); + print_message(CLI_LEVEL_ERROR, msg); + ERR_EXIT(msg.c_str()); + } + nlohmann::json doc; try { - json = simdjson::padded_string::load(source.c_str()); - } catch (const simdjson::simdjson_error &e) { - - const std::string message_line = - std::string("Exception thrown while opening config file:") + e.what(); - print_message(CLI_LEVEL_ERROR, message_line); - LOG(errmessg.c_str()); - ERR_EXIT(errmessg.c_str()); + file >> doc; + } catch (const std::exception &e) { + std::string err = "JSON parse error: "; + err += e.what(); + print_message(CLI_LEVEL_ERROR, err); + ERR_EXIT(err.c_str()); } - entries = parser.iterate(json); - std::string_view wf_name; - error = entries["name"].get_string().get(wf_name); - if (error) { + // ---- workflow name ---- + if (!doc.contains("name") || !doc["name"].is_string()) { + print_message(CLI_LEVEL_ERROR, "Missing workflow name!"); ERR_EXIT("Error: workflow name is mandatory"); } - workflow_name = std::string(wf_name); + workflow_name = doc["name"].get(); print_message(CLI_LEVEL_JSON, "Parsing configuration for workflow: " + workflow_name); + LOG("Parsing configuration for workflow: %s", workflow_name.c_str()); - LOG("Parsing configuration for workflow: %s", std::string(workflow_name).c_str()); - - print_message(CLI_LEVEL_JSON, ""); - - auto io_graph = entries["IO_Graph"]; + // ---- IO_Graph ---- + if (!doc.contains("IO_Graph") || !doc["IO_Graph"].is_array()) { + ERR_EXIT("Error: IO_Graph section missing or invalid"); + } - for (auto app : io_graph) { - std::string_view app_name; - error = app["name"].get_string().get(app_name); - if (error) { + for (const auto &app : doc["IO_Graph"]) { + if (!app.contains("name") || !app["name"].is_string()) { ERR_EXIT("Error: app name is mandatory"); } - print_message(CLI_LEVEL_JSON, "Parsing config for app " + std::string(app_name)); - LOG("Parsing config for app %s", std::string(app_name).c_str()); + std::string app_name = app["name"].get(); + print_message(CLI_LEVEL_JSON, "Parsing config for app " + app_name); + LOG("Parsing config for app %s", app_name.c_str()); - if (app["input_stream"].get_array().get(input_stream)) { - print_message(CLI_LEVEL_ERROR, - "No input_stream section found for app " + std::string(app_name)); - ERR_EXIT("No input_stream section found for app %s", std::string(app_name).c_str()); + // ---- input_stream ---- + if (!app.contains("input_stream") || !app["input_stream"].is_array()) { + std::string msg = "No input_stream section found for app " + app_name; + print_message(CLI_LEVEL_ERROR, msg); + ERR_EXIT(msg.c_str()); } - for (auto itm : input_stream) { - std::filesystem::path file(itm.get_string().take_value()); - - print_message(CLI_LEVEL_JSON, "Found file " + std::string(file)); - - if (file.is_relative()) { + for (const auto &itm : app["input_stream"]) { + std::filesystem::path file_path(itm.get()); + if (file_path.is_relative()) { print_message(CLI_LEVEL_WARNING, - "Path : " + std::string(file) + - " IS RELATIVE! using cwd() of server to compute abs path."); - file = resolve_prexix / file; + "Path : " + file_path.string() + " IS RELATIVE! resolving..."); + file_path = resolve_prefix / file_path; } - std::string appname(app_name); - - print_message(CLI_LEVEL_JSON, "Path : " + std::string(file) + - " added to app: " + std::string(app_name)); - - locations->newFile(file.c_str()); - locations->addConsumer(file, appname); + locations->newFile(file_path.c_str()); + locations->addConsumer(file_path, app_name); } - print_message(CLI_LEVEL_JSON, - "Completed input_stream parsing for app: " + std::string(app_name)); - - LOG("Completed input_stream parsing for app: %s", std::string(app_name).c_str()); - - if (app["output_stream"].get_array().get(output_stream)) { - - print_message(CLI_LEVEL_ERROR, - "No output_stream section found for app " + std::string(app_name)); - ERR_EXIT("No output_stream section found for app %s", std::string(app_name).c_str()); + // ---- output_stream ---- + if (!app.contains("output_stream") || !app["output_stream"].is_array()) { + std::string msg = "No output_stream section found for app " + app_name; + print_message(CLI_LEVEL_ERROR, msg); + ERR_EXIT(msg.c_str()); } - for (auto itm : output_stream) { - std::filesystem::path file(itm.get_string().take_value()); - if (file.is_relative()) { - if (file.is_relative()) { - print_message(CLI_LEVEL_WARNING, - "Path : " + std::string(file) + - " IS RELATIVE! using cwd() of server to compute abs path."); - file = resolve_prexix / file; - } - } - std::string appname(app_name); - - print_message(CLI_LEVEL_JSON, - "Adding file: " + std::string(file) + " to app: " + appname); - locations->newFile(file); - locations->addProducer(file, appname); + for (const auto &itm : app["output_stream"]) { + std::filesystem::path file_path(itm.get()); + if (file_path.is_relative()) { + print_message(CLI_LEVEL_WARNING, + "Path : " + file_path.string() + " IS RELATIVE! resolving..."); + file_path = resolve_prefix / file_path; + } + locations->newFile(file_path); + locations->addProducer(file_path, app_name); } - print_message(CLI_LEVEL_JSON, - "Completed output_stream parsing for app: " + std::string(app_name)); - LOG("Completed output_stream parsing for app: %s", std::string(app_name).c_str()); - - // PARSING STREAMING FILES - if (app["streaming"].get_array().get(streaming)) { - print_message(CLI_LEVEL_WARNING, - "No Streaming section found for app " + std::string(app_name)); - LOG("No streaming section found for app: %s", std::string(app_name).c_str()); - } else { - LOG("Began parsing streaming section for app %s", std::string(app_name).c_str()); - for (auto file : streaming) { - std::string_view committed, mode, commit_rule; + // ---- streaming ---- + if (app.contains("streaming") && app["streaming"].is_array()) { + for (const auto &stream_item : app["streaming"]) { + bool is_file = true; std::vector streaming_names; std::vector file_deps; + std::string commit_rule = COMMITTED_ON_TERMINATION, mode = MODE_UPDATE; long int n_close = -1; int64_t n_files = -1; - bool is_file = true; - - simdjson::ondemand::array name; - error = file["name"].get_array().get(name); - if (error || name.is_empty()) { - error = file["dirname"].get_array().get(name); - if (error || name.is_empty()) { - print_message( - CLI_LEVEL_ERROR, - "error: either name or dirname in streaming section is required"); - ERR_EXIT("error: either name or dirname in streaming section is required"); + + // name or dirname + if (stream_item.contains("name") && stream_item["name"].is_array()) { + for (const auto &nm : stream_item["name"]) { + std::filesystem::path p(nm.get()); + if (p.is_relative()) { + p = resolve_prefix / p; + } + streaming_names.push_back(p); } + } else if (stream_item.contains("dirname") && stream_item["dirname"].is_array()) { is_file = false; - } - - for (auto item : name) { - std::string_view elem = item.get_string().value(); - LOG("Found name: %s", std::string(elem).c_str()); - std::filesystem::path file_fs(elem); - if (file_fs.is_relative()) { - print_message( - CLI_LEVEL_WARNING, - "Path : " + std::string(file_fs) + - " IS RELATIVE! using cwd() of server to compute abs path."); - file_fs = resolve_prexix / file_fs; + for (const auto &nm : stream_item["dirname"]) { + std::filesystem::path p(nm.get()); + if (p.is_relative()) { + p = resolve_prefix / p; + } + streaming_names.push_back(p); } - LOG("Saving file %s to locations", std::string(elem).c_str()); - streaming_names.emplace_back(file_fs); + } else { + ERR_EXIT("error: either name or dirname in streaming section is required"); } - // PARSING COMMITTED - error = file["committed"].get_string().get(committed); - if (error) { - print_message(CLI_LEVEL_ERROR, "commit rule is mandatory in streaming section"); - ERR_EXIT("error commit rule is mandatory in streaming section"); + // committed + if (!stream_item.contains("committed") || !stream_item["committed"].is_string()) { + ERR_EXIT("commit rule is mandatory in streaming section"); } - auto pos = committed.find(':'); + + std::string committed = stream_item["committed"].get(); + auto pos = committed.find(':'); if (pos != std::string::npos) { - commit_rule = committed.substr(0, pos); - std::string count_str(committed.substr(pos + 1, committed.length())); + commit_rule = committed.substr(0, pos); + std::string count_str = committed.substr(pos + 1); if (!isInteger(count_str)) { - print_message(CLI_LEVEL_ERROR, - "commit rule on_close/n_files invalid number"); - ERR_EXIT("error commit rule on_close invalid number: !is_int()"); + ERR_EXIT("invalid number in commit rule"); } - if (commit_rule == COMMITTED_ON_CLOSE) { n_close = std::stol(count_str); } else if (commit_rule == COMMITTED_N_FILES) { n_files = std::stol(count_str); } else { - print_message(CLI_LEVEL_ERROR, - "Invalid commit rule: " + std::string(commit_rule)); - ERR_EXIT("error commit rule: %s for operand ':'. Either on_close or " - "n_files is expected", - std::string(commit_rule).c_str()); + ERR_EXIT("invalid commit rule type"); } - } else { commit_rule = committed; } - // check for committed on file: + // file_deps if (commit_rule == COMMITTED_ON_FILE) { - simdjson::ondemand::array file_deps_tmp; - error = file["file_deps"].get_array().get(file_deps_tmp); - - if (error) { - print_message(CLI_LEVEL_ERROR, - "commit rule is on_file but no file_deps section found"); + if (!stream_item.contains("file_deps") || + !stream_item["file_deps"].is_array()) { ERR_EXIT("commit rule is on_file but no file_deps section found"); } - - std::string_view name_tmp; - for (auto itm : file_deps_tmp) { - name_tmp = itm.get_string().value(); - std::filesystem::path computed_path(name_tmp); - if (computed_path.is_relative()) { - print_message( - CLI_LEVEL_WARNING, - "Path : " + std::string(computed_path) + - " IS RELATIVE! using cwd() of server to compute abs path."); - computed_path = resolve_prexix / computed_path; + for (const auto &dep : stream_item["file_deps"]) { + std::filesystem::path p(dep.get()); + if (p.is_relative()) { + p = resolve_prefix / p; } - print_message(CLI_LEVEL_JSON, "Adding file: " + std::string(computed_path) + - " to file dependencies: "); - - file_deps.emplace_back(computed_path); + file_deps.push_back(p); } } - LOG("Committed: %s", std::string(committed).c_str()); - // END PARSING COMMITTED - - error = file["mode"].get_string().get(mode); - if (error) { - mode = MODE_UPDATE; + // mode + if (stream_item.contains("mode") && stream_item["mode"].is_string()) { + mode = stream_item["mode"].get(); } - LOG("Mode: %s", std::string(mode).c_str()); - if (n_files == -1) { - error = file["n_files"].get_int64().get(n_files); - if (error && n_files != -1) { - n_files = -1; - } + // n_files (optional) + if (stream_item.contains("n_files") && stream_item["n_files"].is_number_integer()) { + n_files = stream_item["n_files"].get(); } - LOG("n_files: %d", n_files); - - for (auto path : streaming_names) { - print_message(CLI_LEVEL_JSON, - "Updating metadata for path: " + std::string(path)); - - if (path.is_relative()) { - print_message( - CLI_LEVEL_WARNING, - "Path : " + std::string(path) + - " IS RELATIVE! using cwd() of server to compute abs path."); - path = resolve_prexix / path; - } - LOG("path: %s", path.c_str()); + for (auto &path : streaming_names) { if (n_files != -1) { - - print_message(CLI_LEVEL_JSON, "Setting path: " + std::string(path) + - " n_files to " + std::to_string(n_files)); locations->setDirectoryFileCount(path, n_files); } - - is_file ? locations->setFile(path) : locations->setDirectory(path); - locations->setCommitRule(path, commit_rule.data()); - locations->setFireRule(path, mode.data()); + if (is_file) { + locations->setFile(path); + } else { + locations->setDirectory(path); + } + locations->setCommitRule(path, commit_rule); + locations->setFireRule(path, mode); locations->setCommitedCloseNumber(path, n_close); locations->setFileDeps(path, file_deps); } } + } + } - print_message(CLI_LEVEL_JSON, "completed parsing of streaming section for app: " + - std::string(app_name)); - LOG("completed parsing of streaming section for app: %s", - std::string(app_name).c_str()); - } // END PARSING STREAMING FILES - print_message(CLI_LEVEL_JSON, ""); - } // END OF APP MAIN LOOPS - LOG("Completed parsing of io_graph app main loops"); - - print_message(CLI_LEVEL_JSON, "Completed parsing of io_graph"); - LOG("Completed parsing of io_graph"); - - if (entries["permanent"].get_array().get(permanent_files)) { - // PARSING PERMANENT FILES - print_message(CLI_LEVEL_WARNING, - "No permanent section found for workflow: " + workflow_name); - LOG("No permanent section found for workflow: %s", workflow_name.c_str()); - } else { - for (auto file : permanent_files) { - std::string_view name; - error = file.get_string().get(name); - if (error) { - ERR_EXIT("error name for permanent section is mandatory"); - } - LOG("Permanent name: %s", std::string(name).c_str()); - - std::filesystem::path path(name); - + // ---- permanent ---- + if (doc.contains("permanent") && doc["permanent"].is_array()) { + for (const auto &item : doc["permanent"]) { + std::filesystem::path path(item.get()); if (path.is_relative()) { - print_message(CLI_LEVEL_WARNING, - "Path : " + std::string(path) + - " IS RELATIVE! using cwd() of server to compute abs path."); - path = resolve_prexix / path; + path = resolve_prefix / path; } - + locations->newFile(path); locations->setPermanent(path, true); } - print_message(CLI_LEVEL_JSON, "Completed parsing of permanent files"); - LOG("Completed parsing of permanent files"); - } // END PARSING PERMANENT FILES - - print_message(CLI_LEVEL_JSON, ""); - - if (entries["exclude"].get_array().get(exclude_files)) { - // PARSING PERMANENT FILES - print_message(CLI_LEVEL_WARNING, "No exclude section found for workflow: " + workflow_name); - LOG("No exclude section found for workflow: %s", std::string(workflow_name).c_str()); - } else { - for (auto file : exclude_files) { - std::string_view name; - error = file.get_string().get(name); - if (error) { - ERR_EXIT("error name for exclude section is mandatory"); - } - LOG("exclude name: %s", std::string(name).c_str()); - - std::filesystem::path path(name); + } + // ---- exclude ---- + if (doc.contains("exclude") && doc["exclude"].is_array()) { + for (const auto &item : doc["exclude"]) { + std::filesystem::path path(item.get()); if (path.is_relative()) { - print_message(CLI_LEVEL_WARNING, - "Path : " + std::string(path) + - " IS RELATIVE! using cwd() of server to compute abs path."); - path = resolve_prexix / path; + path = resolve_prefix / path; } locations->newFile(path); locations->setExclude(path, true); } - - print_message(CLI_LEVEL_JSON, "Completed parsing of exclude files"); - LOG("Completed parsing of exclude files"); - } // END PARSING PERMANENT FILES - - print_message(CLI_LEVEL_JSON, ""); - - auto home_node_policies = entries["home_node_policy"].error(); - if (!home_node_policies) { - print_message(CLI_LEVEL_WARNING, - "Warning: capio does not support home node policies yet! skipping section "); } - if (entries["storage"].get_object().get(storage_section)) { - print_message(CLI_LEVEL_WARNING, "No storage section found for workflow: " + workflow_name); - LOG("No storage section found for workflow: %s", std::string(workflow_name).c_str()); - } else { - if (storage_section["memory"].get_array().get(storage_memory)) { - print_message(CLI_LEVEL_WARNING, - "No files listed in memory storage section for workflow: " + - workflow_name); - LOG("No files listed in memory storage section for workflow: %s", - std::string(workflow_name).c_str()); - } else { - for (auto file : storage_memory) { - if (std::string_view file_str; !file.get_string().get(file_str)) { - print_message(CLI_LEVEL_JSON, "Setting file " + std::string(file_str) + - " to be stored in memory"); - locations->setStoreFileInMemory(file_str); - } else { - print_message(CLI_LEVEL_WARNING, "Unbale to obtain file"); - } + // ---- storage ---- + if (doc.contains("storage") && doc["storage"].is_object()) { + const auto &storage = doc["storage"]; + + if (storage.contains("memory") && storage["memory"].is_array()) { + for (const auto &f : storage["memory"]) { + auto file_str = f.get(); + locations->setStoreFileInMemory(file_str); } } - if (storage_section["fs"].get_array().get(storage_fs)) { - print_message(CLI_LEVEL_WARNING, - "No files listed in fs storage section for workflow: " + workflow_name); - LOG("No files listed in fs storage section for workflow: %s", - std::string(workflow_name).c_str()); - } else { - for (auto file : storage_fs) { - if (std::string_view file_str; !file.get_string().get(file_str)) { - print_message(CLI_LEVEL_JSON, "Setting file " + std::string(file_str) + - " to be stored on file system"); - locations->setStoreFileInFileSystem(file_str); - } else { - print_message(CLI_LEVEL_WARNING, "Unbale to obtain file"); - } + if (storage.contains("fs") && storage["fs"].is_array()) { + for (const auto &f : storage["fs"]) { + auto file_str = f.get(); + locations->setStoreFileInFileSystem(file_str); } } - print_message(CLI_LEVEL_JSON, "Completed parsing of memory storage directives"); } - print_message(CLI_LEVEL_JSON, ""); + // ---- Store only in memory ---- + + if (store_only_in_memory) { + print_message(CLI_LEVEL_INFO, "Storing all files in memory"); + locations->setAllStoreInMemory(); + } return {workflow_name, locations}; -} \ No newline at end of file +}