diff --git a/bindings/python_bindings.cpp b/bindings/python_bindings.cpp index 9a0f50e..23ea867 100644 --- a/bindings/python_bindings.cpp +++ b/bindings/python_bindings.cpp @@ -8,6 +8,14 @@ namespace py = pybind11; PYBIND11_MODULE(_py_capio_cl, m) { m.doc() = "CAPIO-CL: Cross Application Programmable I/O - Coordination Language python bindings."; + + m.attr("MODE_UPDATE") = py::str(capiocl::MODE_UPDATE); + m.attr("MODE_NO_UPDATE") = py::str(capiocl::MODE_NO_UPDATE); + m.attr("COMMITTED_ON_CLOSE") = py::str(capiocl::COMMITTED_ON_CLOSE); + m.attr("COMMITTED_ON_FILE") = py::str(capiocl::COMMITTED_ON_FILE); + m.attr("COMMITTED_N_FILES") = py::str(capiocl::COMMITTED_N_FILES); + m.attr("COMMITTED_ON_TERMINATION") = py::str(capiocl::COMMITTED_ON_TERMINATION); + py::class_( m, "Engine", "The main CAPIO-CL engine for managing data communication and I/O operations.") .def(py::init<>()) @@ -54,20 +62,25 @@ PYBIND11_MODULE(_py_capio_cl, m) { return "(&e)) + ">"; }); - m.attr("MODE_UPDATE") = py::str(capiocl::MODE_UPDATE); - m.attr("MODE_NO_UPDATE") = py::str(capiocl::MODE_NO_UPDATE); - m.attr("COMMITTED_ON_CLOSE") = py::str(capiocl::COMMITTED_ON_CLOSE); - m.attr("COMMITTED_ON_FILE") = py::str(capiocl::COMMITTED_ON_FILE); - m.attr("COMMITTED_N_FILES") = py::str(capiocl::COMMITTED_N_FILES); - m.attr("COMMITTED_ON_TERMINATION") = py::str(capiocl::COMMITTED_ON_TERMINATION); - py::class_(m, "Parser", "The CAPIO-CL Parser component.") .def("parse", &capiocl::Parser::parse) .def("__str__", [](const capiocl::Parser &e) { return "(&e)) + ">"; }) - .def("__repr__", [](const capiocl::Engine &e) { + .def("__repr__", [](const capiocl::Parser &e) { return "(&e)) + ">"; }); + + py::class_(m, "Serializer", "The CAPIO-CL Serializer component.") + .def(py::init<>()) + .def("dump", &capiocl::Serializer::dump) + .def("__str__", + [](const capiocl::Serializer &e) { + return "(&e)) + + ">"; + }) + .def("__repr__", [](const capiocl::Serializer &e) { + return "(&e)) + ">"; + }); } \ No newline at end of file diff --git a/capiocl.hpp b/capiocl.hpp index 5558e5a..8459a1a 100644 --- a/capiocl.hpp +++ b/capiocl.hpp @@ -24,6 +24,7 @@ #endif namespace capiocl { +class Serializer; constexpr char CAPIO_CL_DEFAULT_WF_NAME[] = "CAPIO_CL"; @@ -78,7 +79,7 @@ inline void print_message(const std::string &message_type = "", * - Storage policy (in-memory or on filesystem) */ class Engine { - + friend class capiocl::Serializer; std::string node_name; /** @@ -433,9 +434,22 @@ class Parser { * the config file */ static std::tuple parse(const std::filesystem::path &source, - const std::filesystem::path &resolve_prexix = "", + std::filesystem::path &resolve_prexix, bool store_only_in_memory = false); }; + +class Serializer { + public: + /** + * Dump the current configuration loaded into the Engine to a CAPIO-CL configuration file. + * + * @param engine instance of @class capiocl::Engine to dump + * @param workflow_name Name of the current workflow + * @param filename path of output file @param filename + */ + static void dump(const Engine &engine, const std::string workflow_name, + const std::filesystem::path &filename); +}; } // namespace capiocl #endif // CAPIO_CL_CAPIOCL_HPP diff --git a/src/Engine.cpp b/src/Engine.cpp index 81eca5f..fe240d2 100644 --- a/src/Engine.cpp +++ b/src/Engine.cpp @@ -110,7 +110,7 @@ void capiocl::Engine::print() const { } else { line << std::setfill(' ') << std::setw(20) << "|" << std::setfill(' ') << std::setw(13) << "|" << std::setfill(' ') << std::setw(12) << "|" - << std::setfill(' ') << std::setw(10) << "|" << std::setw(10) << "|"; + << std::setfill(' ') << std::setw(10) << "|" << std::setw(11) << "|"; } print_message(CLI_LEVEL_JSON, line.str()); diff --git a/src/Parser.cpp b/src/Parser.cpp index 28d3679..76daaf4 100644 --- a/src/Parser.cpp +++ b/src/Parser.cpp @@ -21,13 +21,17 @@ bool capiocl::Parser::firstIsSubpathOfSecond(const std::filesystem::path &path, } std::tuple -capiocl::Parser::parse(const std::filesystem::path &source, - const std::filesystem::path &resolve_prefix, bool store_only_in_memory) { +capiocl::Parser::parse(const std::filesystem::path &source, std::filesystem::path &resolve_prefix, + bool store_only_in_memory) { std::string workflow_name = CAPIO_CL_DEFAULT_WF_NAME; auto locations = new Engine(); START_LOG(gettid(), "call(config_file='%s')", source.c_str()); + if (resolve_prefix.empty()) { + resolve_prefix = "."; + } + locations->newFile("*"); locations->setDirectory("*"); if (store_only_in_memory) { @@ -73,6 +77,7 @@ capiocl::Parser::parse(const std::filesystem::path &source, for (const auto &app : doc["IO_Graph"]) { if (!app.contains("name") || !app["name"].is_string()) { + print_message(CLI_LEVEL_ERROR, "Missing IO_Graph name or name is not a valid string!"); ERR_EXIT("Error: app name is mandatory"); } @@ -87,6 +92,7 @@ capiocl::Parser::parse(const std::filesystem::path &source, ERR_EXIT(msg.c_str()); } + print_message(CLI_LEVEL_JSON, "Parsing input_stream for app " + app_name); for (const auto &itm : app["input_stream"]) { std::filesystem::path file_path(itm.get()); if (file_path.is_relative()) { @@ -94,7 +100,7 @@ capiocl::Parser::parse(const std::filesystem::path &source, "Path : " + file_path.string() + " IS RELATIVE! resolving..."); file_path = resolve_prefix / file_path; } - locations->newFile(file_path.c_str()); + locations->newFile(file_path); locations->addConsumer(file_path, app_name); } @@ -104,7 +110,7 @@ capiocl::Parser::parse(const std::filesystem::path &source, print_message(CLI_LEVEL_ERROR, msg); ERR_EXIT(msg.c_str()); } - + print_message(CLI_LEVEL_JSON, "Parsing output_stream for app " + app_name); for (const auto &itm : app["output_stream"]) { std::filesystem::path file_path(itm.get()); if (file_path.is_relative()) { @@ -118,6 +124,7 @@ capiocl::Parser::parse(const std::filesystem::path &source, // ---- streaming ---- if (app.contains("streaming") && app["streaming"].is_array()) { + print_message(CLI_LEVEL_JSON, "Parsing streaming for app " + app_name); for (const auto &stream_item : app["streaming"]) { bool is_file = true; std::vector streaming_names; @@ -145,51 +152,79 @@ capiocl::Parser::parse(const std::filesystem::path &source, streaming_names.push_back(p); } } else { + print_message( + CLI_LEVEL_ERROR, + "Missing streaming name/dirname, or name/dirname is not an array for app " + + app_name); ERR_EXIT("error: either name or dirname in streaming section is required"); } - // committed - if (!stream_item.contains("committed") || !stream_item["committed"].is_string()) { - ERR_EXIT("commit rule is mandatory in streaming section"); - } - - std::string committed = stream_item["committed"].get(); - auto pos = committed.find(':'); - if (pos != std::string::npos) { - commit_rule = committed.substr(0, pos); - std::string count_str = committed.substr(pos + 1); - if (!isInteger(count_str)) { - ERR_EXIT("invalid number in commit rule"); + // Commit rule. Optional in nature, hence no check required! + if (stream_item.contains("committed")) { + if (!stream_item["committed"].is_string()) { + print_message(CLI_LEVEL_ERROR, "Error: invalid type for commit rule!"); + ERR_EXIT("Error: invalid type for commit rule!"); } - if (commit_rule == COMMITTED_ON_CLOSE) { - n_close = std::stol(count_str); - } else if (commit_rule == COMMITTED_N_FILES) { - n_files = std::stol(count_str); + + std::string committed = stream_item["committed"].get(); + auto pos = committed.find(':'); + if (pos != std::string::npos) { + commit_rule = committed.substr(0, pos); + std::string count_str = committed.substr(pos + 1); + if (!isInteger(count_str)) { + ERR_EXIT("invalid number in commit rule"); + } + if (commit_rule == COMMITTED_ON_CLOSE) { + n_close = std::stol(count_str); + } else if (commit_rule == COMMITTED_N_FILES) { + n_files = std::stol(count_str); + } else { + ERR_EXIT("invalid commit rule type"); + } } else { - ERR_EXIT("invalid commit rule type"); + commit_rule = committed; } - } else { - commit_rule = committed; - } - // file_deps - if (commit_rule == COMMITTED_ON_FILE) { - if (!stream_item.contains("file_deps") || - !stream_item["file_deps"].is_array()) { - ERR_EXIT("commit rule is on_file but no file_deps section found"); - } - for (const auto &dep : stream_item["file_deps"]) { - std::filesystem::path p(dep.get()); - if (p.is_relative()) { - p = resolve_prefix / p; + // file_deps + if (commit_rule == COMMITTED_ON_FILE) { + if (!stream_item.contains("file_deps") || + !stream_item["file_deps"].is_array()) { + ERR_EXIT("commit rule is on_file but no file_deps section found"); } - file_deps.push_back(p); + for (const auto &dep : stream_item["file_deps"]) { + std::filesystem::path p(dep.get()); + if (p.is_relative()) { + p = resolve_prefix / p; + } + file_deps.push_back(p); + } + } + + // check commit rule is one of the available + if (commit_rule != capiocl::COMMITTED_N_FILES && + commit_rule != capiocl::COMMITTED_ON_CLOSE && + commit_rule != capiocl::COMMITTED_ON_FILE && + commit_rule != capiocl::COMMITTED_ON_TERMINATION) { + print_message(CLI_LEVEL_ERROR, "Error: commit rule " + commit_rule + + " is not one of the allowed one!"); + ERR_EXIT("Unknown commit rule %s", commit_rule.c_str()); } } - // mode - if (stream_item.contains("mode") && stream_item["mode"].is_string()) { + // Firing rule. Optional in nature, hence no check required! + if (stream_item.contains("mode")) { + if (!stream_item["mode"].is_string()) { + print_message(CLI_LEVEL_ERROR, "Error: invalid type for mode"); + ERR_EXIT("Error: invalid type for mode"); + } mode = stream_item["mode"].get(); + + if (mode != capiocl::MODE_UPDATE && mode != capiocl::MODE_NO_UPDATE) { + print_message(CLI_LEVEL_ERROR, + "Error: invalid firing rule provided for app: " + app_name); + ERR_EXIT("Error: invalid firing rule provided for app: %s", + app_name.c_str()); + } } // n_files (optional) @@ -206,6 +241,14 @@ capiocl::Parser::parse(const std::filesystem::path &source, } else { locations->setDirectory(path); } + + print_message(CLI_LEVEL_INFO, + "App: " + app_name + " - " + "path: " + path.string() + " - " + + "committed: " + commit_rule + " - " + "mode: " + mode + + " - " + "n_files: " + std::to_string(n_files) + " - " + + "n_close: " + std::to_string(n_close)); + print_message(CLI_LEVEL_INFO, ""); + locations->setCommitRule(path, commit_rule); locations->setFireRule(path, mode); locations->setCommitedCloseNumber(path, n_close); diff --git a/src/Serializer.cpp b/src/Serializer.cpp new file mode 100644 index 0000000..89d989b --- /dev/null +++ b/src/Serializer.cpp @@ -0,0 +1,164 @@ +#include "capiocl.hpp" +#include +#include +#include + +void capiocl::Serializer::dump(const capiocl::Engine &engine, const std::string workflow_name, + const std::filesystem::path &filename) { + START_LOG(gettid(), "call(output='%s')", target.c_str()); + + nlohmann::json doc; + doc["name"] = workflow_name; + + // Retrieve the files map + const auto *files = engine.getLocations(); // adjust if it's a different getter + + // Build helper maps: app → inputs / outputs + std::unordered_map> app_inputs; + std::unordered_map> app_outputs; + + // For permanent/exclude/storage + std::vector permanent; + std::vector exclude; + std::vector memory_storage; + std::vector fs_storage; + + nlohmann::json io_graph = nlohmann::json::array(); + + // We'll also need a mapping of each file → its metadata for streaming reconstruction + for (const auto &[path, data] : *files) { + const auto &[producers, consumers, commit_rule, fire_rule, permanent_flag, excluded_flag, + is_file, n_close, n_dir_files, file_deps, store_in_memory] = data; + if (path == "*") { + LOG("Skipping * path"); + print_message(CLI_LEVEL_WARNING, "Skipping * path"); + continue; + } + + // Collect permanent/exclude info + if (permanent_flag) { + permanent.push_back(path); + } + if (excluded_flag) { + exclude.push_back(path); + } + + // Collect storage info + if (store_in_memory) { + memory_storage.push_back(path); + } else { + fs_storage.push_back(path); + } + + // Collect app relationships + for (const auto &p : producers) { + app_outputs[p].push_back(path); + } + for (const auto &c : consumers) { + app_inputs[c].push_back(path); + } + } + + // Construct IO_Graph section + for (const auto &[app_name, outputs] : app_outputs) { + nlohmann::json app; + app["name"] = app_name; + + if (app_inputs.count(app_name)) { + app["input_stream"] = app_inputs[app_name]; + } else { + app["input_stream"] = nlohmann::json::array(); + } + + app["output_stream"] = outputs; + + // ---- streaming ---- + nlohmann::json streaming = nlohmann::json::array(); + + for (const auto &path : outputs) { + const auto &data = files->at(path); + const auto &[producers, consumers, commit_rule, fire_rule, permanent_flag, + excluded_flag, is_file, n_close, n_dir_files, file_deps, store_in_fs] = + data; + + if (path == "*") { + LOG("Skipping * path"); + print_message(CLI_LEVEL_WARNING, "Skipping * path"); + continue; + } + + nlohmann::json sitem; + if (is_file) { + sitem["name"] = nlohmann::json::array({path}); + } else { + sitem["dirname"] = nlohmann::json::array({path}); + } + + // Commit rule + std::string committed = commit_rule; + if (commit_rule == capiocl::COMMITTED_ON_CLOSE && n_close > 0) { + committed += ":" + std::to_string(n_close); + } else if (commit_rule == capiocl::COMMITTED_N_FILES && n_dir_files > 0) { + committed += ":" + std::to_string(n_dir_files); + } + + sitem["committed"] = committed; + + // Mode / fire rule + sitem["mode"] = fire_rule; + + // File dependencies (for COMMITTED_ON_FILE) + if (commit_rule == capiocl::COMMITTED_ON_FILE && !file_deps.empty()) { + sitem["file_deps"] = file_deps; + } + + // Directory file count + if (n_dir_files > 0) { + sitem["n_files"] = n_dir_files; + } + + streaming.push_back(sitem); + } + + if (!streaming.empty()) { + app["streaming"] = streaming; + } + + io_graph.push_back(app); + } + + doc["IO_Graph"] = io_graph; + + // ---- permanent / exclude ---- + if (!permanent.empty()) { + doc["permanent"] = permanent; + } + if (!exclude.empty()) { + doc["exclude"] = exclude; + } + + // ---- storage ---- + nlohmann::json storage; + if (!memory_storage.empty()) { + storage["memory"] = memory_storage; + } + if (!fs_storage.empty()) { + storage["fs"] = fs_storage; + } + if (!storage.empty()) { + doc["storage"] = storage; + } + + // ---- Write JSON ---- + std::ofstream out(filename); + if (!out.is_open()) { + std::string msg = "Failed to open output file: " + filename.string(); + capiocl::print_message(capiocl::CLI_LEVEL_ERROR, msg); + ERR_EXIT(msg.c_str()); + } + + out << std::setw(4) << doc << std::endl; + + capiocl::print_message(capiocl::CLI_LEVEL_INFO, + "Configuration serialized to " + filename.string()); +}