diff --git a/.github/workflows/python-bindings.yml b/.github/workflows/python-bindings.yml index 8b1f866..e66a301 100644 --- a/.github/workflows/python-bindings.yml +++ b/.github/workflows/python-bindings.yml @@ -14,7 +14,7 @@ jobs: name: "Test Python bindings" strategy: matrix: - on: [ 'ubuntu-24.04', 'macos-15-intel', 'macos-26' ] + on: [ 'ubuntu-24.04', 'macos-15-intel', 'macos-15' ] python: [ '3.10', '3.11', '3.12', '3.13', '3.14' ] runs-on: ${{ matrix.on }} diff --git a/.gitignore b/.gitignore index 194cd58..8a4f4c0 100644 --- a/.gitignore +++ b/.gitignore @@ -47,4 +47,6 @@ dist/** # debug information files *.dwo -**.DS_Store \ No newline at end of file +**.DS_Store + +*build* \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index fb5cdd4..bb45ddc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,11 +57,18 @@ FetchContent_Declare( GIT_TAG v1.8.1 ) +FetchContent_Declare( + calf + GIT_REPOSITORY https://github.com/High-Performance-IO/calf.git + GIT_TAG main +) + +# JSONCONS build flags set(JSONCONS_BUILD_TESTS OFF CACHE BOOL "" FORCE) set(JSONCONS_BUILD_EXAMPLES OFF CACHE BOOL "" FORCE) set(JSONCONS_BUILD_FUZZERS OFF CACHE BOOL "" FORCE) -FetchContent_MakeAvailable(jsoncons tomlplusplus) +FetchContent_MakeAvailable(jsoncons tomlplusplus calf) if (BUILD_PYTHON_BINDINGS) FetchContent_Declare( @@ -129,24 +136,33 @@ set(CAPIO_CL_HEADERS capiocl.hpp) # Library target add_library(libcapio_cl STATIC ${CAPIO_SRC} ${CAPIO_CL_HEADERS}) + target_include_directories(libcapio_cl PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/src ${jsoncons_SOURCE_DIR}/include ${CAPIOCL_JSON_SCHEMAS_DIRECTORY} ${TOMLPLUSPLUS_SOURCE_DIR}/include + ${CALF_SOURCE_DIR} ) target_link_libraries(libcapio_cl PUBLIC) target_link_libraries(libcapio_cl PRIVATE - tomlplusplus::tomlplusplus -) + tomlplusplus::tomlplusplus) find_library(LIBANL anl) -if(LIBANL) +if (LIBANL) target_link_libraries(libcapio_cl PRIVATE ${LIBANL}) endif () +##################################### +# set CALF logger component name +##################################### +calf_enable_log(libcapio_cl $,ON,OFF>) +calf_set_component(libcapio_cl "capiocl") +calf_set_default_log_dir(libcapio_cl "./capiocl_logs") +target_include_directories(libcapio_cl PRIVATE ${CALF_SOURCE_DIR}) + ##################################### # Install rules ##################################### @@ -202,7 +218,7 @@ if (CAPIO_CL_BUILD_TESTS) CURL::libcurl ) - if(LIBANL) + if (LIBANL) target_link_libraries(CAPIO_CL_tests PRIVATE ${LIBANL}) endif () @@ -296,11 +312,11 @@ if (ENABLE_COVERAGE_PIPELINE) COMMAND ${LCOV_BIN} --remove ${COVERAGE_INFO} - "*jsoncons*" - "/usr/include/*" - "*googletest*" - "*tests/*" - ${COVERAGE_REMOVE_PATTERNS} + "*jsoncons*" + "/usr/include/*" + "*googletest*" + "*tests/*" + ${COVERAGE_REMOVE_PATTERNS} --ignore-errors unused --ignore-errors inconsistent --output-file ${COVERAGE_INFO} diff --git a/capiocl/printer.h b/capiocl/printer.h deleted file mode 100644 index 1069bf6..0000000 --- a/capiocl/printer.h +++ /dev/null @@ -1,42 +0,0 @@ -#ifndef CAPIO_CL_PRINTER_H -#define CAPIO_CL_PRINTER_H -#include -#include - -#ifndef HOST_NAME_MAX -#define HOST_NAME_MAX 1024 -#endif - -/// @brief Namespace containing the CAPIO-CL print utilities -namespace capiocl::printer { - -/// @brief CLI print constant -constexpr char CLI_LEVEL_INFO[] = "[\033[1;32mCAPIO-CL\033[0m"; -/// @brief CLI print constant -constexpr char CLI_LEVEL_WARNING[] = "[\033[1;33mCAPIO-CL\033[0m"; -/// @brief CLI print constant -constexpr char CLI_LEVEL_ERROR[] = "[\033[1;31mCAPIO-CL\033[0m"; -/// @brief CLI print constant -constexpr char CLI_LEVEL_JSON[] = "[\033[1;34mCAPIO-CL\033[0m"; - -/** - * Print a message to standard out. Used to log messages related to the CAPIO-CL Engine - * @param message_type Type of message to print. - * @param message_line - */ -inline void print(const std::string &message_type = "", const std::string &message_line = "") { - static std::string *node_name = nullptr; - if (node_name == nullptr) { - node_name = new std::string(HOST_NAME_MAX, ' '); // LCOV_EXCL_LINE - gethostname(node_name->data(), HOST_NAME_MAX); - } - if (message_type.empty()) { - std::cout << std::endl; - } else { - std::cout << message_type << " " << node_name->c_str() << "] " << message_line << std::endl - << std::flush; - } -} -} // namespace capiocl::printer - -#endif // CAPIO_CL_PRINTER_H \ No newline at end of file diff --git a/src/Engine.cpp b/src/Engine.cpp index d6951fc..d98bc03 100644 --- a/src/Engine.cpp +++ b/src/Engine.cpp @@ -3,11 +3,11 @@ #include #include +#include "calf/StdOutLogger.h" #include "capiocl.hpp" #include "capiocl/configuration.h" #include "capiocl/engine.h" #include "capiocl/monitor.h" -#include "capiocl/printer.h" /// @brief Class to implement a shared mutex lock guard template class shared_lock_guard { @@ -26,47 +26,48 @@ template class shared_lock_guard { }; void capiocl::engine::Engine::print() const { - + UPDATE_CALF_WORKFLOW_NAME(workflow_name); // First message - printer::print(printer::CLI_LEVEL_JSON, ""); - printer::print(printer::CLI_LEVEL_JSON, "Composition of expected CAPIO FS: "); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, " "); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Composition of expected CAPIO FS: "); // Table header lines - printer::print(printer::CLI_LEVEL_JSON, "*" + std::string(134, '=') + "*"); - - printer::print(printer::CLI_LEVEL_JSON, "|" + std::string(134, ' ') + "|"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "*%s*", std::string(134, '=').c_str()); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "|%s|", std::string(134, ' ').c_str()); - { - std::ostringstream oss; - oss << "| Parsed configuration file for workflow: \033[1;36m" << workflow_name - << std::setw(94 - workflow_name.length()) << "\033[0m |"; - printer::print(printer::CLI_LEVEL_JSON, oss.str()); - } + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, + "| Parsed configuration file for workflow: \033[1;36m %s%s \033[0m |", + workflow_name.c_str(), std::string(86 - workflow_name.length(), ' ').c_str()); - printer::print(printer::CLI_LEVEL_JSON, "|" + std::string(134, ' ') + "|"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "|%s|", std::string(134, ' ').c_str()); - std::string msg = "| File color legend: \033[48;5;034m \033[0m File stored in memory"; - msg += std::string(82, ' ') + "|"; - printer::print(printer::CLI_LEVEL_JSON, msg); + CALF_PRINT_COLOR( + CALF_CLI_LEVEL_INFO, + "| File color legend: \033[48;5;034m \033[0m File stored in memory%s|", + std::string(82, ' ').c_str()); - printer::print( - printer::CLI_LEVEL_JSON, // LCOV_EXCL_LINE - "| \033[48;5;172m \033[0m File stored on file system" + - std::string(77, ' ') + "|"); + CALF_PRINT_COLOR( + CALF_CLI_LEVEL_INFO, + "| \033[48;5;172m \033[0m File stored on file system%s|", + std::string(77, ' ').c_str()); - printer::print(printer::CLI_LEVEL_JSON, "|" + std::string(134, '=') + "|"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "|%s|", std::string(134, ' ').c_str()); - std::string line = "|======|===================|===================|===================="; - line += "|====================|============|===========|=========|==========|"; - printer::print(printer::CLI_LEVEL_JSON, line); + CALF_PRINT_COLOR( + CALF_CLI_LEVEL_INFO, "|%s|%s|%s|%s|%s|%s|%s|%s|%s|", std::string(6, '=').c_str(), + std::string(19, '=').c_str(), std::string(19, '=').c_str(), std::string(20, '=').c_str(), + std::string(20, '=').c_str(), std::string(12, '=').c_str(), std::string(11, '=').c_str(), + std::string(9, '=').c_str(), std::string(10, '=').c_str()); - line = "| Kind | Filename | Producer step | Consumer step | "; - line += "Commit Rule | Fire Rule | Permanent | Exclude | n_files |"; - printer::print(printer::CLI_LEVEL_JSON, line); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, + "| Kind | Filename | Producer step | Consumer step | " + "Commit Rule | Fire Rule | Permanent | Exclude | n_files |"); - line = "|======|===================|===================|====================|========"; - line += "============|============|===========|=========|==========|"; - printer::print(printer::CLI_LEVEL_JSON, line); + CALF_PRINT_COLOR( + CALF_CLI_LEVEL_INFO, "|%s|%s|%s|%s|%s|%s|%s|%s|%s|", std::string(6, '=').c_str(), + std::string(19, '=').c_str(), std::string(19, '=').c_str(), std::string(20, '=').c_str(), + std::string(20, '=').c_str(), std::string(12, '=').c_str(), std::string(11, '=').c_str(), + std::string(9, '=').c_str(), std::string(10, '=').c_str()); // Iterate over _locations for (auto &itm : _capio_cl_entries) { @@ -136,13 +137,13 @@ void capiocl::engine::Engine::print() const { line << std::setfill(' ') << std::setw(10) << "|" << std::setw(11) << "|"; } - printer::print(printer::CLI_LEVEL_JSON, line.str()); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "%s", line.str().c_str()); } - printer::print(printer::CLI_LEVEL_JSON, "*" + std::string(134, '~') + "*"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "*%s*", std::string(134, '~').c_str()); } - printer::print(printer::CLI_LEVEL_JSON, ""); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, " "); } capiocl::engine::Engine::Engine(const bool use_default_settings) { @@ -899,6 +900,7 @@ bool capiocl::engine::Engine::operator==(const Engine &other) const { return true; } void capiocl::engine::Engine::loadConfiguration(const std::string &path) { + UPDATE_CALF_WORKFLOW_NAME(workflow_name); configuration.load(path); std::string multicast_monitor_enabled, fs_monitor_enabled; @@ -912,7 +914,7 @@ void capiocl::engine::Engine::loadConfiguration(const std::string &path) { if (multicast_monitor_enabled == "true") { monitor.registerMonitorBackend(new monitor::MulticastMonitor(configuration)); } else { - printer::print(printer::CLI_LEVEL_WARNING, "Skipping registration of MulticastMonitor"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "Skipping registration of MulticastMonitor"); } try { @@ -924,7 +926,7 @@ void capiocl::engine::Engine::loadConfiguration(const std::string &path) { if (fs_monitor_enabled == "true") { monitor.registerMonitorBackend(new monitor::FileSystemMonitor()); } else { - printer::print(printer::CLI_LEVEL_WARNING, "Skipping registration of FileSystemMonitor"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "Skipping registration of FileSystemMonitor"); } } void capiocl::engine::Engine::useDefaultConfiguration() { diff --git a/src/Monitor.cpp b/src/Monitor.cpp index 75e9f89..cbde029 100644 --- a/src/Monitor.cpp +++ b/src/Monitor.cpp @@ -1,9 +1,9 @@ #include "capiocl/monitor.h" #include "capiocl.hpp" -#include "capiocl/printer.h" + capiocl::monitor::MonitorException::MonitorException(const std::string &msg) : message(msg) { - printer::print(printer::CLI_LEVEL_ERROR, msg); + std::cerr << msg << std::endl; } bool capiocl::monitor::Monitor::isCommitted(const std::filesystem::path &path) const { @@ -19,6 +19,7 @@ void capiocl::monitor::Monitor::setCommitted(std::filesystem::path path) const { void capiocl::monitor::Monitor::registerMonitorBackend(const MonitorInterface *interface) { interfaces.emplace_back(interface); } + void capiocl::monitor::Monitor::setHomeNode(const std::filesystem::path &path) const { std::for_each(interfaces.begin(), interfaces.end(), [&path](const auto &interface) { interface->setHomeNode(path); }); @@ -27,7 +28,7 @@ void capiocl::monitor::Monitor::setHomeNode(const std::filesystem::path &path) c std::set capiocl::monitor::Monitor::getHomeNode(const std::filesystem::path &path) const { std::set home_nodes; - for (const auto &interface : interfaces) { + for (const auto &interface: interfaces) { const auto &node = interface->getHomeNode(path); if (node == NO_HOME_NODE) { continue; @@ -38,7 +39,7 @@ capiocl::monitor::Monitor::getHomeNode(const std::filesystem::path &path) const } capiocl::monitor::Monitor::~Monitor() { - for (const auto &interface : interfaces) { + for (const auto &interface: interfaces) { delete interface; } -} \ No newline at end of file +} diff --git a/src/Parser.cpp b/src/Parser.cpp index cc612ea..0d80c78 100644 --- a/src/Parser.cpp +++ b/src/Parser.cpp @@ -1,13 +1,14 @@ #include #include +#include "calf/StdOutLogger.h" #include "capiocl.hpp" #include "capiocl/engine.h" #include "capiocl/parser.h" -#include "capiocl/printer.h" capiocl::parser::ParserException::ParserException(const std::string &msg) : message(msg) { - printer::print(printer::CLI_LEVEL_ERROR, msg); + UPDATE_CALF_WORKFLOW_NAME(""); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_ERROR, "%s", msg.c_str()); } jsoncons::jsonschema::json_schema @@ -17,6 +18,7 @@ capiocl::parser::Parser::loadSchema(const char *data) { std::filesystem::path capiocl::parser::Parser::resolve(std::filesystem::path path, const std::filesystem::path &prefix) { + UPDATE_CALF_WORKFLOW_NAME(""); if (prefix.empty()) { return path; } @@ -25,9 +27,9 @@ std::filesystem::path capiocl::parser::Parser::resolve(std::filesystem::path pat return path; } - auto resolved = prefix / path; - const auto msg = "Path : " + path.string() + " IS RELATIVE! Resolved to: " + resolved.string(); - printer::print(printer::CLI_LEVEL_WARNING, msg); + auto resolved = prefix / path; + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "Path %s IS RELATIVE! Resolved to: %s", path.c_str(), + resolved.c_str()); return resolved; } @@ -38,7 +40,8 @@ void capiocl::parser::Parser::validate_json(const jsoncons::json &doc, const cha // throws jsoncons::jsonschema::validation_error on failure schema.validate(doc); } catch (const jsoncons::jsonschema::validation_error &e) { - printer::print(printer::CLI_LEVEL_ERROR, e.what()); + UPDATE_CALF_WORKFLOW_NAME(""); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_ERROR, "%s", e.what()); throw ParserException("JSON validation failed!"); } } @@ -66,8 +69,10 @@ capiocl::engine::Engine *capiocl::parser::Parser::parse(const std::filesystem::p } file.close(); - printer::print(printer::CLI_LEVEL_INFO, - "Parsing CAPIO-CL config file for version: " + capio_cl_release); + + UPDATE_CALF_WORKFLOW_NAME(""); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing CAPIO-CL config file for version: %s", + capio_cl_release.c_str()); if (capio_cl_release == CAPIO_CL_VERSION::V1) { return available_parsers::parse_v1(source, resolve_prefix, store_only_in_memory); diff --git a/src/Serializer.cpp b/src/Serializer.cpp index 3af869e..38afd54 100644 --- a/src/Serializer.cpp +++ b/src/Serializer.cpp @@ -2,19 +2,20 @@ #include #include +#include "calf/StdOutLogger.h" #include "capiocl.hpp" #include "capiocl/engine.h" -#include "capiocl/printer.h" #include "capiocl/serializer.h" void capiocl::serializer::Serializer::dump(const engine::Engine &engine, const std::filesystem::path &filename, const std::string &version) { + UPDATE_CALF_WORKFLOW_NAME(engine.getWorkflowName()); if (version == CAPIO_CL_VERSION::V1) { - printer::print(printer::CLI_LEVEL_INFO, "Serializing engine with V1 specification"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Serializing engine with V1 specification"); available_serializers::serialize_v1(engine, filename); } else if (version == CAPIO_CL_VERSION::V1_1) { - printer::print(printer::CLI_LEVEL_INFO, "Serializing engine with V1.1 specification"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Serializing engine with V1.1 specification"); available_serializers::serialize_v1_1(engine, filename); } else { const auto message = "No serializer available for CAPIO-CL version: " + version; @@ -24,5 +25,6 @@ void capiocl::serializer::Serializer::dump(const engine::Engine &engine, capiocl::serializer::SerializerException::SerializerException(const std::string &msg) : message(msg) { - printer::print(printer::CLI_LEVEL_ERROR, msg); + UPDATE_CALF_WORKFLOW_NAME(""); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_ERROR, "%s", msg.c_str()); } \ No newline at end of file diff --git a/src/api.cpp b/src/api.cpp index 6adebf9..8c73e2c 100644 --- a/src/api.cpp +++ b/src/api.cpp @@ -9,9 +9,9 @@ #include #include +#include "calf/StdOutLogger.h" #include "capiocl/api.h" #include "capiocl/engine.h" -#include "capiocl/printer.h" std::mutex _setupMtx; std::condition_variable _setupCv; @@ -21,6 +21,8 @@ bool thread_ready = false; void server(const std::string &address, const int port, capiocl::engine::Engine *engine, std::atomic *terminate) { + UPDATE_CALF_WORKFLOW_NAME(engine->getWorkflowName()); + constexpr int RECV_BUF_SIZE = 65535; const auto &wf_name = engine->getWorkflowName(); @@ -93,8 +95,8 @@ void server(const std::string &address, const int port, capiocl::engine::Engine } } catch (const jsoncons::json_exception &e) { - capiocl::printer::print(capiocl::printer::CLI_LEVEL_ERROR, - "APIServer: Received invalid json: " + std::string(e.what())); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_ERROR, "APIServer: Received invalid json: %s", + e.what()); } } @@ -105,6 +107,8 @@ capiocl::api::CapioClApiServer::CapioClApiServer(engine::Engine *engine, configuration::CapioClConfiguration &config) : capiocl_configuration(config) { + UPDATE_CALF_WORKFLOW_NAME(engine->getWorkflowName()); + std::string address; int port; try { @@ -124,7 +128,7 @@ capiocl::api::CapioClApiServer::CapioClApiServer(engine::Engine *engine, std::unique_lock lock(_setupMtx); _setupCv.wait(lock, [] { return thread_ready; }); - printer::print(printer::CLI_LEVEL_INFO, "API server @ " + address + ":" + std::to_string(port)); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "API server @ %s:%d", address.c_str(), port); } capiocl::api::CapioClApiServer::~CapioClApiServer() { diff --git a/src/configuration.cpp b/src/configuration.cpp index ad62d09..4d7eb35 100644 --- a/src/configuration.cpp +++ b/src/configuration.cpp @@ -1,8 +1,8 @@ #include #include +#include "calf/StdOutLogger.h" #include "capiocl/configuration.h" -#include "capiocl/printer.h" #include "toml++/toml.hpp" void load_config_to_memory(const toml::table &tbl, @@ -91,5 +91,6 @@ void capiocl::configuration::CapioClConfiguration::getParameter(const std::strin capiocl::configuration::CapioClConfigurationException::CapioClConfigurationException( const std::string &msg) : message(msg) { - printer::print(printer::CLI_LEVEL_ERROR, msg); + UPDATE_CALF_WORKFLOW_NAME(""); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_ERROR, "%s", msg.c_str()); } \ No newline at end of file diff --git a/src/monitors/Multicast.cpp b/src/monitors/Multicast.cpp index 1f82fc3..42da9bf 100644 --- a/src/monitors/Multicast.cpp +++ b/src/monitors/Multicast.cpp @@ -2,17 +2,17 @@ #include #include #include +#include #include "capiocl.hpp" #include "capiocl/monitor.h" -#include "capiocl/printer.h" static std::tuple outgoing_socket_multicast(const std::string &address, const int port) { sockaddr_in addr{}; - addr.sin_family = AF_INET; + addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr(address.c_str()); - addr.sin_port = htons(port); + addr.sin_port = htons(port); const int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); // LCOV_EXCL_START @@ -27,16 +27,16 @@ static std::tuple outgoing_socket_multicast(const std::string static int incoming_socket_multicast(const std::string &address_ip, const int port, sockaddr_in &addr, socklen_t &addrlen) { - constexpr int loopback = 1; // enable reception of loopback messages + constexpr int loopback = 1; // enable reception of loopback messages constexpr int multi_bind = 1; // enable multiple sockets on same address - addr = {}; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); + addr = {}; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(address_ip.c_str()); - addrlen = sizeof(addr); + addrlen = sizeof(addr); - ip_mreq mreq = {}; + ip_mreq mreq = {}; mreq.imr_multiaddr.s_addr = inet_addr(address_ip.c_str()); mreq.imr_interface.s_addr = htonl(INADDR_ANY); @@ -82,14 +82,14 @@ void capiocl::monitor::MulticastMonitor::commit_listener(std::vector *terminate) { pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); sockaddr_in addr_in = {}; - socklen_t addr_len = {}; - const auto socket = incoming_socket_multicast(ip_addr, ip_port, addr_in, addr_len); - const auto addr = reinterpret_cast(&addr_in); + socklen_t addr_len = {}; + const auto socket = incoming_socket_multicast(ip_addr, ip_port, addr_in, addr_len); + const auto addr = reinterpret_cast(&addr_in); char incoming_message[MESSAGE_SIZE] = {0}; // Polling for non blocking pollfd pfd = {}; - pfd.fd = socket; + pfd.fd = socket; pfd.events = POLLIN | POLLPRI; do { @@ -108,12 +108,17 @@ void capiocl::monitor::MulticastMonitor::commit_listener(std::vector(incoming_size)); + if (msg.size() < 2) { + continue; + } + const auto path = msg.substr(2); if (const char command = incoming_message[0]; command == SET) { // Received an advert for a committed file @@ -142,10 +147,10 @@ void capiocl::monitor::MulticastMonitor::home_node_listener( gethostname(this_hostname, HOST_NAME_MAX); sockaddr_in addr_in = {}; - socklen_t addr_len = {}; - const auto socket = incoming_socket_multicast(ip_addr, ip_port, addr_in, addr_len); + socklen_t addr_len = {}; + const auto socket = incoming_socket_multicast(ip_addr, ip_port, addr_in, addr_len); - const auto addr = reinterpret_cast(&addr_in); + const auto addr = reinterpret_cast(&addr_in); char incoming_message[MESSAGE_SIZE] = {0}; do { @@ -153,7 +158,7 @@ void capiocl::monitor::MulticastMonitor::home_node_listener( // Polling for non blocking pollfd pfd = {}; - pfd.fd = socket; + pfd.fd = socket; pfd.events = POLLIN | POLLPRI; // TODO: migrate to epoll for linux and kqueue on MacOS @@ -169,12 +174,13 @@ void capiocl::monitor::MulticastMonitor::home_node_listener( } // LCOV_EXCL_START - if (recvfrom(socket, incoming_message, MESSAGE_SIZE, MSG_DONTWAIT, addr, &addr_len) < 0) { + const auto incoming_size = recvfrom(socket, incoming_message, MESSAGE_SIZE, MSG_DONTWAIT, addr, &addr_len); + if (incoming_size < 0) { continue; } // LCOV_EXCL_STOP - std::string incoming_message_str(incoming_message); + std::string incoming_message_str(incoming_message, incoming_size); std::vector tokens; size_t start = 0, end = incoming_message_str.find(' '); @@ -183,28 +189,38 @@ void capiocl::monitor::MulticastMonitor::home_node_listener( tokens.push_back(incoming_message_str.substr(start, end - start)); } start = end + 1; - end = incoming_message_str.find(' ', start); + end = incoming_message_str.find(' ', start); } if (start < incoming_message_str.length()) { tokens.push_back(incoming_message_str.substr(start)); } - const auto &path = tokens[1]; - if (tokens[0].c_str()[0] == SET) { - // Received an advert for a committed file - std::lock_guard lg(lock); + // Drop anything that isn't a well-formed message. + if (tokens.empty()) { + continue; + } + if (const char command = tokens[0].c_str()[0]; command == SET) { + if (tokens.size() < 3) { + // need "! " -> malformed message, skip + continue; + } + const auto &path = tokens[1]; const auto &home_node = tokens[2]; - home_nodes[path] = home_node; + std::lock_guard lg(lock); + home_nodes[path] = home_node; } else { // Received a query for a home node, Message begins with capiocl::Monitor::REQUEST + if (tokens.size() < 2) { + // need "? " -> malformed message, skip + continue; + } + const auto &path = tokens[1]; std::lock_guard lg(lock); - if (home_nodes.find(path) == home_nodes.end()) { continue; } - if (home_nodes[path] == this_hostname) { _send_message(ip_addr, ip_port, path + " " + this_hostname, SET); } @@ -231,31 +247,23 @@ capiocl::monitor::MulticastMonitor::MulticastMonitor( config.getParameter("monitor.mcast.delay_ms", &MULTICAST_DELAY_MILLIS); commit_thread = - std::thread(&commit_listener, std::ref(_committed_files), std::ref(committed_lock), - MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT, &this->terminate); + std::thread(&commit_listener, std::ref(_committed_files), std::ref(committed_lock), + MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT, &this->terminate); home_node_thread = - std::thread(&home_node_listener, std::ref(_home_nodes), std::ref(home_node_lock), - MULTICAST_HOME_NODE_ADDR, MULTICAST_HOME_NODE_PORT, &this->terminate); + std::thread(&home_node_listener, std::ref(_home_nodes), std::ref(home_node_lock), + MULTICAST_HOME_NODE_ADDR, MULTICAST_HOME_NODE_PORT, &this->terminate); gethostname(_hostname, HOST_NAME_MAX); } capiocl::monitor::MulticastMonitor::~MulticastMonitor() { - terminate = true; - - if (commit_thread.joinable()) { - commit_thread.join(); - } - if (home_node_thread.joinable()) { - home_node_thread.join(); - } + commit_thread.join(); + home_node_thread.join(); } -bool capiocl::monitor::MulticastMonitor::isCommitted(const std::filesystem::path &path) const { - - { +bool capiocl::monitor::MulticastMonitor::isCommitted(const std::filesystem::path &path) const { { const std::lock_guard lg(committed_lock); if (std::find(_committed_files.begin(), _committed_files.end(), path) != _committed_files.end()) { @@ -264,9 +272,7 @@ bool capiocl::monitor::MulticastMonitor::isCommitted(const std::filesystem::path } _send_message(MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT, path, GET); - std::this_thread::sleep_for(std::chrono::milliseconds(MULTICAST_DELAY_MILLIS)); - - { + std::this_thread::sleep_for(std::chrono::milliseconds(MULTICAST_DELAY_MILLIS)); { const std::lock_guard lg(committed_lock); return std::find(_committed_files.begin(), _committed_files.end(), path) != _committed_files.end(); @@ -291,9 +297,7 @@ void capiocl::monitor::MulticastMonitor::setHomeNode(const std::filesystem::path } const std::string & -capiocl::monitor::MulticastMonitor::getHomeNode(const std::filesystem::path &path) const { - - { +capiocl::monitor::MulticastMonitor::getHomeNode(const std::filesystem::path &path) const { { const std::lock_guard lg(home_node_lock); if (const auto itm = _home_nodes.find(path); itm != _home_nodes.end()) { return itm->second; @@ -309,4 +313,4 @@ capiocl::monitor::MulticastMonitor::getHomeNode(const std::filesystem::path &pat } else { return NO_HOME_NODE; } -} \ No newline at end of file +} diff --git a/src/parsers/v1.1.cpp b/src/parsers/v1.1.cpp index 070dc58..318d512 100644 --- a/src/parsers/v1.1.cpp +++ b/src/parsers/v1.1.cpp @@ -1,10 +1,10 @@ #include +#include "calf/StdOutLogger.h" #include "capio_cl_json_schemas.hpp" #include "capiocl.hpp" #include "capiocl/engine.h" #include "capiocl/parser.h" -#include "capiocl/printer.h" capiocl::engine::Engine * capiocl::parser::Parser::available_parsers::parse_v1_1(const std::filesystem::path &source, @@ -22,12 +22,15 @@ capiocl::parser::Parser::available_parsers::parse_v1_1(const std::filesystem::pa // ---- workflow name ---- workflow_name = doc["name"].as(); engine->setWorkflowName(workflow_name); - printer::print(printer::CLI_LEVEL_JSON, "Parsing configuration for workflow: " + workflow_name); + UPDATE_CALF_WORKFLOW_NAME(workflow_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing configuration for workflow: %s", + workflow_name.c_str()); // ---- CAPIO-CL TOML CONFIGURATION ---- if (doc.contains("configuration")) { auto toml_config_path = doc["configuration"].as(); - printer::print(printer::CLI_LEVEL_JSON, "Using configuration file : " + toml_config_path); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Using TOML configuration file: %s", + toml_config_path.c_str()); engine->loadConfiguration(toml_config_path); } else { engine->useDefaultConfiguration(); @@ -36,10 +39,10 @@ capiocl::parser::Parser::available_parsers::parse_v1_1(const std::filesystem::pa // ---- IO_Graph ---- for (const auto &app : doc["IO_Graph"].array_range()) { std::string app_name = app["name"].as(); - printer::print(printer::CLI_LEVEL_JSON, "Parsing config for app " + app_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing config for app: %s", app_name.c_str()); // ---- input_stream ---- - printer::print(printer::CLI_LEVEL_JSON, "Parsing input_stream for app " + app_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing input_stream for app: %s", app_name.c_str()); for (const auto &itm : app["input_stream"].array_range()) { auto file_path = resolve(itm.as(), resolve_prefix); engine->newFile(file_path); @@ -47,7 +50,8 @@ capiocl::parser::Parser::available_parsers::parse_v1_1(const std::filesystem::pa } // ---- output_stream ---- - printer::print(printer::CLI_LEVEL_JSON, "Parsing output_stream for app " + app_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing output_stream for app: %s", + app_name.c_str()); for (const auto &itm : app["output_stream"].array_range()) { auto file_path = resolve(itm.as(), resolve_prefix); engine->newFile(file_path); @@ -56,7 +60,8 @@ capiocl::parser::Parser::available_parsers::parse_v1_1(const std::filesystem::pa // ---- streaming ---- if (app.contains("streaming")) { - printer::print(printer::CLI_LEVEL_JSON, "Parsing streaming for app " + app_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing streaming section for app: %s", + app_name.c_str()); for (const auto &stream_item : app["streaming"].array_range()) { bool is_file = true; std::vector streaming_names; @@ -169,7 +174,7 @@ capiocl::parser::Parser::available_parsers::parse_v1_1(const std::filesystem::pa engine->setStoreFileInMemory(file_str); } } else { - printer::print(printer::CLI_LEVEL_INFO, "No MEM storage section found"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "No MEM storage section found"); } if (storage.contains("fs")) { @@ -178,15 +183,15 @@ capiocl::parser::Parser::available_parsers::parse_v1_1(const std::filesystem::pa engine->setStoreFileInFileSystem(file_str); } } else { - printer::print(printer::CLI_LEVEL_INFO, "No FS storage section found"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "No FS storage section found"); } } else { - printer::print(printer::CLI_LEVEL_INFO, "No storage section found"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "No STORAGE storage section found"); } // ---- Store only in memory ---- if (store_only_in_memory) { - printer::print(printer::CLI_LEVEL_INFO, "Storing all files in memory"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Storing all files in memory"); engine->setAllStoreInMemory(); } diff --git a/src/parsers/v1.cpp b/src/parsers/v1.cpp index af2e645..0f83994 100644 --- a/src/parsers/v1.cpp +++ b/src/parsers/v1.cpp @@ -1,15 +1,16 @@ #include +#include "calf/StdOutLogger.h" #include "capio_cl_json_schemas.hpp" #include "capiocl.hpp" #include "capiocl/engine.h" #include "capiocl/parser.h" -#include "capiocl/printer.h" capiocl::engine::Engine * capiocl::parser::Parser::available_parsers::parse_v1(const std::filesystem::path &source, const std::filesystem::path &resolve_prefix, bool store_only_in_memory) { + std::string workflow_name = CAPIO_CL_DEFAULT_WF_NAME; auto engine = new engine::Engine(true); @@ -24,15 +25,17 @@ capiocl::parser::Parser::available_parsers::parse_v1(const std::filesystem::path // ---- workflow name ---- workflow_name = doc["name"].as(); engine->setWorkflowName(workflow_name); - printer::print(printer::CLI_LEVEL_JSON, "Parsing configuration for workflow: " + workflow_name); + UPDATE_CALF_WORKFLOW_NAME(workflow_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing configuration for workflow: %s", + workflow_name.c_str()); // ---- IO_Graph ---- for (const auto &app : doc["IO_Graph"].array_range()) { std::string app_name = app["name"].as(); - printer::print(printer::CLI_LEVEL_JSON, "Parsing config for app " + app_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing config for app: %s", app_name.c_str()); // ---- input_stream ---- - printer::print(printer::CLI_LEVEL_JSON, "Parsing input_stream for app " + app_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing input_stream for app: %s", app_name.c_str()); for (const auto &itm : app["input_stream"].array_range()) { auto file_path = resolve(itm.as(), resolve_prefix); engine->newFile(file_path); @@ -40,7 +43,8 @@ capiocl::parser::Parser::available_parsers::parse_v1(const std::filesystem::path } // ---- output_stream ---- - printer::print(printer::CLI_LEVEL_JSON, "Parsing output_stream for app " + app_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing output_stream for app: %s", + app_name.c_str()); for (const auto &itm : app["output_stream"].array_range()) { auto file_path = resolve(itm.as(), resolve_prefix); engine->newFile(file_path); @@ -49,7 +53,8 @@ capiocl::parser::Parser::available_parsers::parse_v1(const std::filesystem::path // ---- streaming ---- if (app.contains("streaming")) { - printer::print(printer::CLI_LEVEL_JSON, "Parsing streaming for app " + app_name); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Parsing streaming section for app: %s", + app_name.c_str()); for (const auto &stream_item : app["streaming"].array_range()) { bool is_file = true; std::vector streaming_names; @@ -162,7 +167,7 @@ capiocl::parser::Parser::available_parsers::parse_v1(const std::filesystem::path engine->setStoreFileInMemory(file_str); } } else { - printer::print(printer::CLI_LEVEL_INFO, "No MEM storage section found"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "No MEM storage section found"); } if (storage.contains("fs")) { @@ -171,15 +176,15 @@ capiocl::parser::Parser::available_parsers::parse_v1(const std::filesystem::path engine->setStoreFileInFileSystem(file_str); } } else { - printer::print(printer::CLI_LEVEL_INFO, "No FS storage section found"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "No FS storage section found"); } } else { - printer::print(printer::CLI_LEVEL_INFO, "No storage section found"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "No STORAGE storage section found"); } // ---- Store only in memory ---- if (store_only_in_memory) { - printer::print(printer::CLI_LEVEL_INFO, "Storing all files in memory"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Storing all files in memory"); engine->setAllStoreInMemory(); } diff --git a/src/serializers/v1.1.cpp b/src/serializers/v1.1.cpp index b862711..ad6fd00 100644 --- a/src/serializers/v1.1.cpp +++ b/src/serializers/v1.1.cpp @@ -1,12 +1,13 @@ #include +#include "calf/StdOutLogger.h" #include "capiocl.hpp" #include "capiocl/engine.h" -#include "capiocl/printer.h" #include "capiocl/serializer.h" void capiocl::serializer::Serializer::available_serializers::serialize_v1_1( const engine::Engine &engine, const std::filesystem::path &filename) { + UPDATE_CALF_WORKFLOW_NAME(engine.getWorkflowName()); jsoncons::json doc; doc["version"] = 1.1; doc["name"] = engine.getWorkflowName(); @@ -58,9 +59,9 @@ void capiocl::serializer::Serializer::available_serializers::serialize_v1_1( const auto close_count = std::to_string(entry.commit_on_close_count); streaming_item["committed"] = entry.commit_rule + ":" + close_count; } else { - const auto msg = "Commit rule is not ON_CLOSE but close count > 0"; - printer::print(printer::CLI_LEVEL_WARNING, msg); - printer::print(printer::CLI_LEVEL_WARNING, "Setting commit rule = ON_CLOSE"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, + "Commit rule is not ON_CLOSE but close count > 0"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "Setting commit rule = ON_CLOSE"); streaming_item["committed"] = std::string(commitRules::ON_CLOSE) + ":" + std::to_string(entry.commit_on_close_count); } @@ -124,5 +125,6 @@ void capiocl::serializer::Serializer::available_serializers::serialize_v1_1( } out << jsoncons::pretty_print(doc) << std::endl; - printer::print(printer::CLI_LEVEL_INFO, "Configuration serialized to " + filename.string()); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Configuration serialized to %s", + filename.string().c_str()); } \ No newline at end of file diff --git a/src/serializers/v1.cpp b/src/serializers/v1.cpp index f43a09d..ec76f16 100644 --- a/src/serializers/v1.cpp +++ b/src/serializers/v1.cpp @@ -1,12 +1,13 @@ #include +#include "calf/StdOutLogger.h" #include "capiocl.hpp" #include "capiocl/engine.h" -#include "capiocl/printer.h" #include "capiocl/serializer.h" void capiocl::serializer::Serializer::available_serializers::serialize_v1( const engine::Engine &engine, const std::filesystem::path &filename) { + UPDATE_CALF_WORKFLOW_NAME(engine.getWorkflowName()); jsoncons::json doc; doc["name"] = engine.getWorkflowName(); @@ -57,9 +58,9 @@ void capiocl::serializer::Serializer::available_serializers::serialize_v1( const auto close_count = std::to_string(entry.commit_on_close_count); streaming_item["committed"] = entry.commit_rule + ":" + close_count; } else { - const auto msg = "Commit rule is not ON_CLOSE but close count > 0"; - printer::print(printer::CLI_LEVEL_WARNING, msg); - printer::print(printer::CLI_LEVEL_WARNING, "Setting commit rule = ON_CLOSE"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, + "Commit rule is not ON_CLOSE but close count > 0"); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_WARNING, "Setting commit rule = ON_CLOSE"); streaming_item["committed"] = std::string(commitRules::ON_CLOSE) + ":" + std::to_string(entry.commit_on_close_count); } @@ -123,5 +124,6 @@ void capiocl::serializer::Serializer::available_serializers::serialize_v1( } out << jsoncons::pretty_print(doc) << std::endl; - printer::print(printer::CLI_LEVEL_INFO, "Configuration serialized to " + filename.string()); + CALF_PRINT_COLOR(CALF_CLI_LEVEL_INFO, "Configuration serialized to %s", + filename.string().c_str()); } \ No newline at end of file diff --git a/tests/cpp/main.cpp b/tests/cpp/main.cpp index 29f742f..1163cb9 100644 --- a/tests/cpp/main.cpp +++ b/tests/cpp/main.cpp @@ -18,7 +18,6 @@ template std::string demangled_name(const T &obj) { #include "capiocl/engine.h" #include "capiocl/monitor.h" #include "capiocl/parser.h" -#include "capiocl/printer.h" #include "capiocl/serializer.h" #include "test_apis.hpp" diff --git a/tests/cpp/test_exceptions.hpp b/tests/cpp/test_exceptions.hpp index 9356939..c4bcd63 100644 --- a/tests/cpp/test_exceptions.hpp +++ b/tests/cpp/test_exceptions.hpp @@ -43,8 +43,8 @@ TEST(EXCEPTION_SUITE_NAME, testFailedserializeVersion) { TEST(EXCEPTION_SUITE_NAME, testParserException) { std::filesystem::path JSON_DIR = "/tmp/capio_cl_jsons/"; - capiocl::printer::print(capiocl::printer::CLI_LEVEL_INFO, - "Loading jsons from " + JSON_DIR.string()); + + std::cout << "Loading jsons from " << JSON_DIR << std::endl; std::vector test_filenames = { "", @@ -77,8 +77,7 @@ TEST(EXCEPTION_SUITE_NAME, testParserException) { for (const auto &version : CAPIO_CL_AVAIL_VERSIONS) { for (const auto &test : test_filenames) { const auto test_file_path = test.empty() ? test : JSON_DIR / ("V" + version) / test; - capiocl::printer::print(capiocl::printer::CLI_LEVEL_WARNING, - "Testing on file " + test_file_path.string()); + std::cout << "Testing on file " << test_file_path << std::endl; EXPECT_THROW(capiocl::parser::Parser::parse(test_file_path), capiocl::parser::ParserException); diff --git a/tests/cpp/test_serialize_deserialize.hpp b/tests/cpp/test_serialize_deserialize.hpp index 8c0cd1a..96eadb2 100644 --- a/tests/cpp/test_serialize_deserialize.hpp +++ b/tests/cpp/test_serialize_deserialize.hpp @@ -48,7 +48,6 @@ TEST(SERIALIZE_DESERIALIZE_SUITE_NAME, testSerializeParseCAPIOCLV1) { auto new_engine = capiocl::parser::Parser::parse(path, resolve); EXPECT_TRUE(new_engine->getWorkflowName() == workflow_name); - capiocl::printer::print("", ""); EXPECT_TRUE(engine == *new_engine); auto new_engine1 = capiocl::parser::Parser::parse(path, resolve, true); @@ -80,7 +79,6 @@ TEST(SERIALIZE_DESERIALIZE_SUITE_NAME, testSerializeParseCAPIOCLV1NcloseNfiles) auto new_engine = capiocl::parser::Parser::parse(path, resolve); EXPECT_TRUE(new_engine->getWorkflowName() == workflow_name); - capiocl::printer::print("", ""); EXPECT_TRUE(engine == *new_engine); std::filesystem::remove(path); @@ -116,7 +114,6 @@ TEST(SERIALIZE_DESERIALIZE_SUITE_NAME, testSerializeParseCAPIOCLV1FileDeps) { auto new_engine = capiocl::parser::Parser::parse(path, resolve); EXPECT_TRUE(new_engine->getWorkflowName() == workflow_name); - capiocl::printer::print("", ""); EXPECT_TRUE(engine == *new_engine); std::filesystem::remove(path); @@ -146,7 +143,6 @@ TEST(SERIALIZE_DESERIALIZE_SUITE_NAME, testSerializeCommitOnCloseCountNoCommitRu EXPECT_TRUE(new_engine->getWorkflowName() == workflow_name); EXPECT_FALSE(engine == *new_engine); - capiocl::printer::print("", ""); engine.setCommitRule(file_1_name, capiocl::commitRules::ON_CLOSE); EXPECT_TRUE(engine == *new_engine);