Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions bindings/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ namespace py = pybind11;
PYBIND11_MODULE(_py_capio_cl, m) {
m.doc() =
"CAPIO-CL: Cross Application Programmable I/O - Coordination Language python bindings.";

m.attr("MODE_UPDATE") = py::str(capiocl::MODE_UPDATE);
m.attr("MODE_NO_UPDATE") = py::str(capiocl::MODE_NO_UPDATE);
m.attr("COMMITTED_ON_CLOSE") = py::str(capiocl::COMMITTED_ON_CLOSE);
m.attr("COMMITTED_ON_FILE") = py::str(capiocl::COMMITTED_ON_FILE);
m.attr("COMMITTED_N_FILES") = py::str(capiocl::COMMITTED_N_FILES);
m.attr("COMMITTED_ON_TERMINATION") = py::str(capiocl::COMMITTED_ON_TERMINATION);

py::class_<capiocl::Engine>(
m, "Engine", "The main CAPIO-CL engine for managing data communication and I/O operations.")
.def(py::init<>())
Expand Down Expand Up @@ -54,20 +62,25 @@ PYBIND11_MODULE(_py_capio_cl, m) {
return "<Engine repr at " + std::to_string(reinterpret_cast<uintptr_t>(&e)) + ">";
});

m.attr("MODE_UPDATE") = py::str(capiocl::MODE_UPDATE);
m.attr("MODE_NO_UPDATE") = py::str(capiocl::MODE_NO_UPDATE);
m.attr("COMMITTED_ON_CLOSE") = py::str(capiocl::COMMITTED_ON_CLOSE);
m.attr("COMMITTED_ON_FILE") = py::str(capiocl::COMMITTED_ON_FILE);
m.attr("COMMITTED_N_FILES") = py::str(capiocl::COMMITTED_N_FILES);
m.attr("COMMITTED_ON_TERMINATION") = py::str(capiocl::COMMITTED_ON_TERMINATION);

py::class_<capiocl::Parser>(m, "Parser", "The CAPIO-CL Parser component.")
.def("parse", &capiocl::Parser::parse)
.def("__str__",
[](const capiocl::Parser &e) {
return "<Parser repr at " + std::to_string(reinterpret_cast<uintptr_t>(&e)) + ">";
})
.def("__repr__", [](const capiocl::Engine &e) {
.def("__repr__", [](const capiocl::Parser &e) {
return "<Parser repr at " + std::to_string(reinterpret_cast<uintptr_t>(&e)) + ">";
});

py::class_<capiocl::Serializer>(m, "Serializer", "The CAPIO-CL Serializer component.")
.def(py::init<>())
.def("dump", &capiocl::Serializer::dump)
.def("__str__",
[](const capiocl::Serializer &e) {
return "<Serializer repr at " + std::to_string(reinterpret_cast<uintptr_t>(&e)) +
">";
})
.def("__repr__", [](const capiocl::Serializer &e) {
return "<Serializer repr at " + std::to_string(reinterpret_cast<uintptr_t>(&e)) + ">";
});
}
18 changes: 16 additions & 2 deletions capiocl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#endif

namespace capiocl {
class Serializer;

constexpr char CAPIO_CL_DEFAULT_WF_NAME[] = "CAPIO_CL";

Expand Down Expand Up @@ -78,7 +79,7 @@ inline void print_message(const std::string &message_type = "",
* - Storage policy (in-memory or on filesystem)
*/
class Engine {

friend class capiocl::Serializer;
std::string node_name;

/**
Expand Down Expand Up @@ -433,9 +434,22 @@ class Parser {
* the config file
*/
static std::tuple<std::string, Engine *> parse(const std::filesystem::path &source,
const std::filesystem::path &resolve_prexix = "",
std::filesystem::path &resolve_prexix,
bool store_only_in_memory = false);
};

class Serializer {
public:
/**
* Dump the current configuration loaded into the Engine to a CAPIO-CL configuration file.
*
* @param engine instance of @class capiocl::Engine to dump
* @param workflow_name Name of the current workflow
* @param filename path of output file @param filename
*/
static void dump(const Engine &engine, const std::string workflow_name,
const std::filesystem::path &filename);
};
} // namespace capiocl

#endif // CAPIO_CL_CAPIOCL_HPP
2 changes: 1 addition & 1 deletion src/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void capiocl::Engine::print() const {
} else {
line << std::setfill(' ') << std::setw(20) << "|" << std::setfill(' ')
<< std::setw(13) << "|" << std::setfill(' ') << std::setw(12) << "|"
<< std::setfill(' ') << std::setw(10) << "|" << std::setw(10) << "|";
<< std::setfill(' ') << std::setw(10) << "|" << std::setw(11) << "|";
}

print_message(CLI_LEVEL_JSON, line.str());
Expand Down
117 changes: 80 additions & 37 deletions src/Parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ bool capiocl::Parser::firstIsSubpathOfSecond(const std::filesystem::path &path,
}

std::tuple<std::string, capiocl::Engine *>
capiocl::Parser::parse(const std::filesystem::path &source,
const std::filesystem::path &resolve_prefix, bool store_only_in_memory) {
capiocl::Parser::parse(const std::filesystem::path &source, std::filesystem::path &resolve_prefix,
bool store_only_in_memory) {

std::string workflow_name = CAPIO_CL_DEFAULT_WF_NAME;
auto locations = new Engine();
START_LOG(gettid(), "call(config_file='%s')", source.c_str());

if (resolve_prefix.empty()) {
resolve_prefix = ".";
}

locations->newFile("*");
locations->setDirectory("*");
if (store_only_in_memory) {
Expand Down Expand Up @@ -73,6 +77,7 @@ capiocl::Parser::parse(const std::filesystem::path &source,

for (const auto &app : doc["IO_Graph"]) {
if (!app.contains("name") || !app["name"].is_string()) {
print_message(CLI_LEVEL_ERROR, "Missing IO_Graph name or name is not a valid string!");
ERR_EXIT("Error: app name is mandatory");
}

Expand All @@ -87,14 +92,15 @@ capiocl::Parser::parse(const std::filesystem::path &source,
ERR_EXIT(msg.c_str());
}

print_message(CLI_LEVEL_JSON, "Parsing input_stream for app " + app_name);
for (const auto &itm : app["input_stream"]) {
std::filesystem::path file_path(itm.get<std::string>());
if (file_path.is_relative()) {
print_message(CLI_LEVEL_WARNING,
"Path : " + file_path.string() + " IS RELATIVE! resolving...");
file_path = resolve_prefix / file_path;
}
locations->newFile(file_path.c_str());
locations->newFile(file_path);
locations->addConsumer(file_path, app_name);
}

Expand All @@ -104,7 +110,7 @@ capiocl::Parser::parse(const std::filesystem::path &source,
print_message(CLI_LEVEL_ERROR, msg);
ERR_EXIT(msg.c_str());
}

print_message(CLI_LEVEL_JSON, "Parsing output_stream for app " + app_name);
for (const auto &itm : app["output_stream"]) {
std::filesystem::path file_path(itm.get<std::string>());
if (file_path.is_relative()) {
Expand All @@ -118,6 +124,7 @@ capiocl::Parser::parse(const std::filesystem::path &source,

// ---- streaming ----
if (app.contains("streaming") && app["streaming"].is_array()) {
print_message(CLI_LEVEL_JSON, "Parsing streaming for app " + app_name);
for (const auto &stream_item : app["streaming"]) {
bool is_file = true;
std::vector<std::filesystem::path> streaming_names;
Expand Down Expand Up @@ -145,51 +152,79 @@ capiocl::Parser::parse(const std::filesystem::path &source,
streaming_names.push_back(p);
}
} else {
print_message(
CLI_LEVEL_ERROR,
"Missing streaming name/dirname, or name/dirname is not an array for app " +
app_name);
ERR_EXIT("error: either name or dirname in streaming section is required");
}

// committed
if (!stream_item.contains("committed") || !stream_item["committed"].is_string()) {
ERR_EXIT("commit rule is mandatory in streaming section");
}

std::string committed = stream_item["committed"].get<std::string>();
auto pos = committed.find(':');
if (pos != std::string::npos) {
commit_rule = committed.substr(0, pos);
std::string count_str = committed.substr(pos + 1);
if (!isInteger(count_str)) {
ERR_EXIT("invalid number in commit rule");
// Commit rule. Optional in nature, hence no check required!
if (stream_item.contains("committed")) {
if (!stream_item["committed"].is_string()) {
print_message(CLI_LEVEL_ERROR, "Error: invalid type for commit rule!");
ERR_EXIT("Error: invalid type for commit rule!");
}
if (commit_rule == COMMITTED_ON_CLOSE) {
n_close = std::stol(count_str);
} else if (commit_rule == COMMITTED_N_FILES) {
n_files = std::stol(count_str);

std::string committed = stream_item["committed"].get<std::string>();
auto pos = committed.find(':');
if (pos != std::string::npos) {
commit_rule = committed.substr(0, pos);
std::string count_str = committed.substr(pos + 1);
if (!isInteger(count_str)) {
ERR_EXIT("invalid number in commit rule");
}
if (commit_rule == COMMITTED_ON_CLOSE) {
n_close = std::stol(count_str);
} else if (commit_rule == COMMITTED_N_FILES) {
n_files = std::stol(count_str);
} else {
ERR_EXIT("invalid commit rule type");
}
} else {
ERR_EXIT("invalid commit rule type");
commit_rule = committed;
}
} else {
commit_rule = committed;
}

// file_deps
if (commit_rule == COMMITTED_ON_FILE) {
if (!stream_item.contains("file_deps") ||
!stream_item["file_deps"].is_array()) {
ERR_EXIT("commit rule is on_file but no file_deps section found");
}
for (const auto &dep : stream_item["file_deps"]) {
std::filesystem::path p(dep.get<std::string>());
if (p.is_relative()) {
p = resolve_prefix / p;
// file_deps
if (commit_rule == COMMITTED_ON_FILE) {
if (!stream_item.contains("file_deps") ||
!stream_item["file_deps"].is_array()) {
ERR_EXIT("commit rule is on_file but no file_deps section found");
}
file_deps.push_back(p);
for (const auto &dep : stream_item["file_deps"]) {
std::filesystem::path p(dep.get<std::string>());
if (p.is_relative()) {
p = resolve_prefix / p;
}
file_deps.push_back(p);
}
}

// check commit rule is one of the available
if (commit_rule != capiocl::COMMITTED_N_FILES &&
commit_rule != capiocl::COMMITTED_ON_CLOSE &&
commit_rule != capiocl::COMMITTED_ON_FILE &&
commit_rule != capiocl::COMMITTED_ON_TERMINATION) {
print_message(CLI_LEVEL_ERROR, "Error: commit rule " + commit_rule +
" is not one of the allowed one!");
ERR_EXIT("Unknown commit rule %s", commit_rule.c_str());
}
}

// mode
if (stream_item.contains("mode") && stream_item["mode"].is_string()) {
// Firing rule. Optional in nature, hence no check required!
if (stream_item.contains("mode")) {
if (!stream_item["mode"].is_string()) {
print_message(CLI_LEVEL_ERROR, "Error: invalid type for mode");
ERR_EXIT("Error: invalid type for mode");
}
mode = stream_item["mode"].get<std::string>();

if (mode != capiocl::MODE_UPDATE && mode != capiocl::MODE_NO_UPDATE) {
print_message(CLI_LEVEL_ERROR,
"Error: invalid firing rule provided for app: " + app_name);
ERR_EXIT("Error: invalid firing rule provided for app: %s",
app_name.c_str());
}
}

// n_files (optional)
Expand All @@ -206,6 +241,14 @@ capiocl::Parser::parse(const std::filesystem::path &source,
} else {
locations->setDirectory(path);
}

print_message(CLI_LEVEL_INFO,
"App: " + app_name + " - " + "path: " + path.string() + " - " +
"committed: " + commit_rule + " - " + "mode: " + mode +
" - " + "n_files: " + std::to_string(n_files) + " - " +
"n_close: " + std::to_string(n_close));
print_message(CLI_LEVEL_INFO, "");

locations->setCommitRule(path, commit_rule);
locations->setFireRule(path, mode);
locations->setCommitedCloseNumber(path, n_close);
Expand Down
Loading
Loading