diff --git a/CMakeLists.txt b/CMakeLists.txt index d98f543..d154823 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -215,6 +215,21 @@ set(source_files src/binsrv/gtids/uuid.hpp src/binsrv/gtids/uuid.cpp + # models files + src/binsrv/models/binlog_file_record_fwd.hpp + src/binsrv/models/binlog_file_record.hpp + + src/binsrv/models/error_response_fwd.hpp + src/binsrv/models/error_response.hpp + src/binsrv/models/error_response.cpp + + src/binsrv/models/response_status_type_fwd.hpp + src/binsrv/models/response_status_type.hpp + + src/binsrv/models/search_by_timestamp_response_fwd.hpp + src/binsrv/models/search_by_timestamp_response.hpp + src/binsrv/models/search_by_timestamp_response.cpp + # binlog files src/binsrv/basic_logger_fwd.hpp src/binsrv/basic_logger.hpp @@ -231,6 +246,14 @@ set(source_files src/binsrv/cout_logger.hpp src/binsrv/cout_logger.cpp + src/binsrv/ctime_timestamp_fwd.hpp + src/binsrv/ctime_timestamp.hpp + src/binsrv/ctime_timestamp.cpp + + src/binsrv/ctime_timestamp_range_fwd.hpp + src/binsrv/ctime_timestamp_range.hpp + src/binsrv/ctime_timestamp_range.cpp + src/binsrv/exception_handling_helpers.hpp src/binsrv/exception_handling_helpers.cpp diff --git a/README.md b/README.md index 00bc945..921b591 100644 --- a/README.md +++ b/README.md @@ -146,17 +146,18 @@ The result binary can be found under the following path `ws/percona-binlog-serve Please run ```bash -./binlog_server [ ] +./binlog_server [ [ ] ] ``` where -`` can be either `version`, `fetch`, or `pull` -and -`` is an optional parameter (required only when `` is not `version`) that represents a path to a JSON configuration file (described below). +`` can be either `version`, `search_by_timestamp`, `fetch`, or `pull`, +`` is an optional parameter (required when `` is not `version`) that represents a path to a JSON configuration file (described below), +and `` is an optional parameter (required only when `` is `search_by_timestamp`), that represents a valid timestamp in ISO format (e.g. `2026-02-10T14:30:00`) ### Operation modes Percona Binary Log Server utility can operate in three modes: - 'version' +- 'search_by_timestamp' - 'fetch' - 'pull' @@ -172,6 +173,48 @@ may print 0.1.0 ``` +#### 'search_by_timestamp' operation mode + +In this mode the utility requires one additional command line parameter `` and will print to the standard output the list of binlog files stored in the Binary Log Server data directory that have at least one event whose timestamp is less or equal to the provided ``. +Along with the file name the output will also return its current size in bytes, timestamps and URI. +For instance, +```bash +./binlog_server search_by_timestamp config.json 2026-02-10T14:30:00 +``` +may print +```json +{ + "status": "success", + "result": [ + { + "name": "binlog.000001", + "size": 134217728, + "uri": "s3://binsrv-bucket/storage/binlog.000001", + "min_timestamp":"2026-02-09T17:22:01", + "max_timestamp":"2026-02-09T17:22:08" + }, + { + "name": "binlog.000002", + "size": 134217728, + "uri": "s3://binsrv-bucket/storage/binlog.000002", + "min_timestamp":"2026-02-09T17:22:08", + "max_timestamp":"2026-02-09T17:22:09" + } + ] +} +``` +If an error occurs, +```json +{ + "status": "error", + "message": "" +} +``` +The `` may be one of the following (but not limited to): +- `Invalid timestamp format` +- `Binlog storage is empty` +- `Timestamp is too old` + #### 'fetch' operation mode In this mode the utility tries to connect to a remote MySQL server, switch connection to replication mode and read events from all available binary logs already stored on the server. After reading the very last event, the utility gracefully disconnects and exits. diff --git a/mtr/binlog_streaming/r/search_by_timestamp.result b/mtr/binlog_streaming/r/search_by_timestamp.result new file mode 100644 index 0000000..e03a183 --- /dev/null +++ b/mtr/binlog_streaming/r/search_by_timestamp.result @@ -0,0 +1,62 @@ +*** Resetting replication at the very beginning of the test. + +*** Generating a configuration file in JSON format for the Binlog +*** Server utility. + +*** Determining binlog file directory from the server. + +*** Creating a temporary directory for storing +*** binlog files downloaded via the Binlog Server utility. + +*** 1. Executing the Binlog Server utility in the 'search_by_timestamp' +*** mode with an invalid timestamp +include/read_file_to_var.inc + +*** 2. Executing the Binlog Server utility in the 'search_by_timestamp' +*** mode with the beginning of Epoch (Jan 1, 1970, 00:00:00) on an +*** empty storage +include/read_file_to_var.inc + +*** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +*** Filling the table with some data. +INSERT INTO t1 VALUES(); + +*** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +*** Filling the table with more data. +INSERT INTO t1 VALUES(); + +*** Executing the Binlog Server utility and fetching all events. + +*** 3. Executing the Binlog Server utility in the 'search_by_timestamp' +*** mode with the beginning of Epoch (Jan 1, 1970, 00:00:00) on an +*** non-empty storage +include/read_file_to_var.inc + +*** 4. Executing the Binlog Server utility in the 'search_by_timestamp' +*** mode with the +include/read_file_to_var.inc + +*** 5. Executing the Binlog Server utility in the 'search_by_timestamp' +*** mode with the and expecting one binlog +*** file to be returned +include/read_file_to_var.inc + +*** 6. Executing the Binlog Server utility in the 'search_by_timestamp' +*** mode with the and expecting two binlog +*** files to be returned +include/read_file_to_var.inc + +*** Removing the search result file. + +*** Dropping the table. +DROP TABLE t1; + +*** Removing the Binlog Server utility storage directory. + +*** Removing the Binlog Server utility log file. + +*** Removing the Binlog Server utility configuration file. diff --git a/mtr/binlog_streaming/t/search_by_timestamp.combinations b/mtr/binlog_streaming/t/search_by_timestamp.combinations new file mode 100644 index 0000000..411ce97 --- /dev/null +++ b/mtr/binlog_streaming/t/search_by_timestamp.combinations @@ -0,0 +1,5 @@ +[position] + +[gtid] +gtid-mode=on +enforce-gtid-consistency diff --git a/mtr/binlog_streaming/t/search_by_timestamp.test b/mtr/binlog_streaming/t/search_by_timestamp.test new file mode 100644 index 0000000..4173f91 --- /dev/null +++ b/mtr/binlog_streaming/t/search_by_timestamp.test @@ -0,0 +1,120 @@ +--source ../include/have_binsrv.inc + +--source ../include/v80_v84_compatibility_defines.inc + +# in case of --repeat=N, we need to start from a fresh binary log to make +# this test deterministic +--echo *** Resetting replication at the very beginning of the test. +--disable_query_log +eval $stmt_reset_binary_logs_and_gtids; +--enable_query_log + +# identifying backend storage type ('file' or 's3') +--source ../include/identify_storage_backend.inc + +# creating data directory, configuration file, etc. +--let $binsrv_connect_timeout = 20 +--let $binsrv_read_timeout = 60 +--let $binsrv_idle_time = 10 +--let $binsrv_verify_checksum = TRUE +--let $binsrv_replication_mode = `SELECT IF(@@global.gtid_mode, 'gtid', 'position')` +--let $binsrv_checkpoint_size = 1 +--source ../include/set_up_binsrv_environment.inc + +--let $timestamp_query = SELECT DATE_FORMAT(CONVERT_TZ(NOW(), @@session.time_zone, '+00:00'),'%Y-%m-%dT%H:%i:%s') + +--let $read_from_file = $MYSQL_TMP_DIR/search_result.json + +--echo +--echo *** 1. Executing the Binlog Server utility in the 'search_by_timestamp' +--echo *** mode with an invalid timestamp +--error 1 +--exec $BINSRV search_by_timestamp $binsrv_config_file_path 20000101T000000 > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') = 'Invalid timestamp format'`) + +--echo +--echo *** 2. Executing the Binlog Server utility in the 'search_by_timestamp' +--echo *** mode with the beginning of Epoch (Jan 1, 1970, 00:00:00) on an +--echo *** empty storage +--error 1 +--exec $BINSRV search_by_timestamp $binsrv_config_file_path 1970-01-01T00:00:00 > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') = 'Binlog storage is empty'`) + +--let $before_activity_timestamp = 2000-01-01T00:00:00 + +--echo +--echo *** Creating a simple table. +CREATE TABLE t1(id INT UNSIGNED NOT NULL AUTO_INCREMENT, PRIMARY KEY(id)) ENGINE=InnoDB; + +--echo +--echo *** Filling the table with some data. +INSERT INTO t1 VALUES(); + +--let $inside_activity_timestamp = `$timestamp_query` +--sleep 2 + +--echo +--echo *** Flushing the first binary log and switching to the second one. +FLUSH BINARY LOGS; + +--echo +--echo *** Filling the table with more data. +INSERT INTO t1 VALUES(); + +--let $after_activity_timestamp = `$timestamp_query` + +--echo +--echo *** Executing the Binlog Server utility and fetching all events. +--exec $BINSRV fetch $binsrv_config_file_path > /dev/null + +--echo +--echo *** 3. Executing the Binlog Server utility in the 'search_by_timestamp' +--echo *** mode with the beginning of Epoch (Jan 1, 1970, 00:00:00) on an +--echo *** non-empty storage +--error 1 +--exec $BINSRV search_by_timestamp $binsrv_config_file_path 1970-01-01T00:00:00 > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') = 'Timestamp is too old'`) + +--echo +--echo *** 4. Executing the Binlog Server utility in the 'search_by_timestamp' +--echo *** mode with the +--error 1 +--exec $BINSRV search_by_timestamp $binsrv_config_file_path $before_activity_timestamp > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'error'`) +--assert(`SELECT JSON_EXTRACT('$result', '$.message') = 'Timestamp is too old'`) + +--echo +--echo *** 5. Executing the Binlog Server utility in the 'search_by_timestamp' +--echo *** mode with the and expecting one binlog +--echo *** file to be returned +--exec $BINSRV search_by_timestamp $binsrv_config_file_path $inside_activity_timestamp > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 1`) + +--echo +--echo *** 6. Executing the Binlog Server utility in the 'search_by_timestamp' +--echo *** mode with the and expecting two binlog +--echo *** files to be returned +--exec $BINSRV search_by_timestamp $binsrv_config_file_path $after_activity_timestamp > $read_from_file +--source include/read_file_to_var.inc +--assert(`SELECT JSON_EXTRACT('$result', '$.status') = 'success'`) +--assert(`SELECT JSON_LENGTH(JSON_EXTRACT('$result', '$.result')) = 2`) + +--echo +--echo *** Removing the search result file. +--remove_file $read_from_file + +--echo +--echo *** Dropping the table. +DROP TABLE t1; + +# cleaning up +--source ../include/tear_down_binsrv_environment.inc diff --git a/src/app.cpp b/src/app.cpp index 8634eb0..d660388 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -39,6 +40,7 @@ #include "app_version.hpp" #include "binsrv/basic_logger.hpp" +#include "binsrv/ctime_timestamp.hpp" #include "binsrv/exception_handling_helpers.hpp" #include "binsrv/log_severity.hpp" #include "binsrv/logger_factory.hpp" @@ -54,6 +56,9 @@ #include "binsrv/gtids/common_types.hpp" #include "binsrv/gtids/gtid_set.hpp" +#include "binsrv/models/error_response.hpp" +#include "binsrv/models/search_by_timestamp_response.hpp" + #include "binsrv/event/code_type.hpp" #include "binsrv/event/common_header_flag_type.hpp" #include "binsrv/event/event.hpp" @@ -75,9 +80,12 @@ namespace { -[[nodiscard]] bool check_cmd_args(const util::command_line_arg_view &cmd_args, - binsrv::operation_mode_type &operation_mode, - std::string_view &config_file_path) noexcept { +[[nodiscard]] bool +check_cmd_args(const util::command_line_arg_view &cmd_args, + binsrv::operation_mode_type &operation_mode, + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + std::string_view &config_file_path, + std::string_view &subcommand_value) noexcept { const auto number_of_cmd_args = std::size(cmd_args); static constexpr std::size_t expected_number_of_cmd_args_min{2U}; @@ -97,6 +105,8 @@ namespace { } static constexpr std::size_t expected_number_of_cmd_args_with_config{3U}; + static constexpr std::size_t + expected_number_of_cmd_args_with_config_and_value{4U}; switch (operation_mode) { case binsrv::operation_mode_type::fetch: @@ -106,6 +116,15 @@ namespace { } config_file_path = cmd_args[expected_number_of_cmd_args_with_config - 1U]; return true; + case binsrv::operation_mode_type::search_by_timestamp: + if (number_of_cmd_args != + expected_number_of_cmd_args_with_config_and_value) { + return false; + } + config_file_path = cmd_args[expected_number_of_cmd_args_with_config - 1U]; + subcommand_value = + cmd_args[expected_number_of_cmd_args_with_config_and_value - 1U]; + return true; case binsrv::operation_mode_type::version: return number_of_cmd_args == expected_number_of_cmd_args_min; break; @@ -237,7 +256,7 @@ void log_storage_info(binsrv::basic_logger &logger, logger.log(binsrv::log_severity::info, msg); msg.clear(); - if (storage.has_current_binlog_name()) { + if (storage.is_empty()) { msg = "binlog storage initialized at \""; msg += storage.get_current_binlog_name(); msg += "\":"; @@ -297,7 +316,7 @@ void log_replication_info( : "non-blocking"); msg += ", starting from "; if (replication_mode == binsrv::replication_mode_type::position) { - if (storage.has_current_binlog_name()) { + if (storage.is_empty()) { msg += storage.get_current_binlog_name(); msg += ":"; msg += std::to_string(storage.get_current_position()); @@ -475,7 +494,8 @@ void process_binlog_event(const binsrv::event::event ¤t_event, // checking if the event needs to be written to the binlog if (!context.is_event_info_only()) { storage.write_event(portion, context.is_at_transaction_boundary(), - context.get_transaction_gtid()); + context.get_transaction_gtid(), + current_common_header.get_timestamp()); } // processing the very last event in the sequence - either a non-artificial @@ -526,7 +546,7 @@ bool open_connection_and_switch_to_replication( server_id, util::const_byte_span{encoded_gtids_buffer}, verify_checksum, blocking_mode); } else { - if (storage.has_current_binlog_name()) { + if (storage.is_empty()) { connection.switch_to_position_replication( server_id, storage.get_current_binlog_name(), storage.get_current_position(), verify_checksum, blocking_mode); @@ -662,6 +682,61 @@ bool wait_for_interruptable(std::uint32_t idle_time_seconds, return !termination_flag.test(); } +bool handle_version() { + std::cout << app_version.get_string() << '\n'; + return true; +} + +// NOLINTNEXTLINE(bugprone-easily-swappable-parameters) +bool handle_search_by_timestamp(std::string_view config_file_path, + std::string_view subcommand_value) { + bool operation_successful{false}; + std::string result; + + try { + binsrv::ctime_timestamp timestamp; + if (!binsrv::ctime_timestamp::try_parse(subcommand_value, timestamp)) { + throw std::runtime_error("Invalid timestamp format"); + } + + const binsrv::main_config config{config_file_path}; + const auto &storage_config = config.root().get<"storage">(); + const auto &replication_config = config.root().get<"replication">(); + const auto replication_mode{replication_config.get<"mode">()}; + + const binsrv::storage storage{ + storage_config, binsrv::storage_construction_mode_type::querying_only, + replication_mode}; + + binsrv::models::search_by_timestamp_response response; + const auto &binlog_records{storage.get_binlog_records()}; + if (binlog_records.empty()) { + throw std::runtime_error("Binlog storage is empty"); + } + for (const auto &record : binlog_records) { + // break when we find a binlog file with min timestamp greater + // than the provided one + if (record.timestamps.get_min_timestamp() > timestamp) { + break; + } + response.add_record(record.name, record.size, + storage.get_binlog_uri(record.name), + record.timestamps.get_min_timestamp().get_value(), + record.timestamps.get_max_timestamp().get_value()); + } + if (response.root().get<"result">().empty()) { + throw std::runtime_error("Timestamp is too old"); + } + result = response.str(); + operation_successful = true; + } catch (const std::exception &e) { + const binsrv::models::error_response response{e.what()}; + result = response.str(); + } + std::cout << result << '\n'; + return operation_successful; +} + // since c++20 it is no longer needed to initialize std::atomic_flag with // ATOMIC_FLAG_INIT as this flag is modified from a signal handler it is marked // as volatile to make sure optimizer do optimizations which will be unsafe for @@ -683,18 +758,28 @@ int main(int argc, char *argv[]) { binsrv::operation_mode_type operation_mode{ binsrv::operation_mode_type::delimiter}; std::string_view config_file_path; - const auto cmd_args_checked{ - check_cmd_args(cmd_args, operation_mode, config_file_path)}; + std::string_view subcommand_value; + const auto cmd_args_checked{check_cmd_args( + cmd_args, operation_mode, config_file_path, subcommand_value)}; if (!cmd_args_checked) { std::cerr << "usage: " << executable_name << " (fetch|pull)) \n" + << " " << executable_name + << " search_by_timestamp \n" << " " << executable_name << " version\n"; return EXIT_FAILURE; } + // handling the 'version' command if (operation_mode == binsrv::operation_mode_type::version) { - std::cout << app_version.get_string() << '\n'; - return EXIT_SUCCESS; + return handle_version() ? EXIT_SUCCESS : EXIT_FAILURE; + } + + // handling the 'search_by_timestamp' command + if (operation_mode == binsrv::operation_mode_type::search_by_timestamp) { + return handle_search_by_timestamp(config_file_path, subcommand_value) + ? EXIT_SUCCESS + : EXIT_FAILURE; } int exit_code = EXIT_FAILURE; @@ -715,13 +800,11 @@ int main(int argc, char *argv[]) { logger->log(binsrv::log_severity::delimiter, util::get_readable_command_line_arguments(cmd_args)); - binsrv::main_config_ptr config; logger->log(binsrv::log_severity::delimiter, "reading configuration from the JSON file."); - config = std::make_shared(config_file_path); - assert(config); + const binsrv::main_config config{config_file_path}; - const auto &logger_config = config->root().get<"logger">(); + const auto &logger_config = config.root().get<"logger">(); if (!logger_config.has_file()) { logger->set_min_level(logger_config.get<"level">()); } else { @@ -763,13 +846,13 @@ int main(int argc, char *argv[]) { "set custom handlers for SIGINT and SIGTERM signals"); const volatile std::atomic_flag &termination_flag{global_termination_flag}; - const auto &storage_config = config->root().get<"storage">(); + const auto &storage_config = config.root().get<"storage">(); log_storage_config_info(*logger, storage_config); - const auto &connection_config = config->root().get<"connection">(); + const auto &connection_config = config.root().get<"connection">(); log_connection_config_info(*logger, connection_config); - const auto &replication_config = config->root().get<"replication">(); + const auto &replication_config = config.root().get<"replication">(); log_replication_config_info(*logger, replication_config); const auto server_id{replication_config.get<"server_id">()}; @@ -777,7 +860,9 @@ int main(int argc, char *argv[]) { const auto verify_checksum{replication_config.get<"verify_checksum">()}; const auto replication_mode{replication_config.get<"mode">()}; - binsrv::storage storage{storage_config, replication_mode}; + binsrv::storage storage{storage_config, + binsrv::storage_construction_mode_type::streaming, + replication_mode}; log_storage_info(*logger, storage); const easymysql::library mysql_lib; diff --git a/src/binsrv/basic_storage_backend.cpp b/src/binsrv/basic_storage_backend.cpp index 3159923..28e80a3 100644 --- a/src/binsrv/basic_storage_backend.cpp +++ b/src/binsrv/basic_storage_backend.cpp @@ -74,4 +74,9 @@ void basic_storage_backend::close_stream() { return do_get_description(); } +[[nodiscard]] std::string +basic_storage_backend::get_object_uri(std::string_view name) const { + return do_get_object_uri(name); +} + } // namespace binsrv diff --git a/src/binsrv/basic_storage_backend.hpp b/src/binsrv/basic_storage_backend.hpp index bc09690..04d4f2f 100644 --- a/src/binsrv/basic_storage_backend.hpp +++ b/src/binsrv/basic_storage_backend.hpp @@ -46,6 +46,7 @@ class basic_storage_backend { void close_stream(); [[nodiscard]] std::string get_description() const; + [[nodiscard]] std::string get_object_uri(std::string_view name) const; private: bool stream_open_{false}; @@ -62,6 +63,8 @@ class basic_storage_backend { virtual void do_close_stream() = 0; [[nodiscard]] virtual std::string do_get_description() const = 0; + [[nodiscard]] virtual std::string + do_get_object_uri(std::string_view name) const = 0; }; } // namespace binsrv diff --git a/src/binsrv/binlog_file_metadata.cpp b/src/binsrv/binlog_file_metadata.cpp index a9f2678..2e575cf 100644 --- a/src/binsrv/binlog_file_metadata.cpp +++ b/src/binsrv/binlog_file_metadata.cpp @@ -39,7 +39,7 @@ namespace binsrv { binlog_file_metadata::binlog_file_metadata() - : impl_{{expected_binlog_file_metadata_version}, {}, {}} {} + : impl_{{expected_binlog_file_metadata_version}, {}, {}, {}, {}} {} binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { auto json_value = boost::json::parse(data); @@ -55,7 +55,7 @@ binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { return boost::json::serialize(json_value); } -[[nodiscard]] gtids::gtid_set binlog_file_metadata::get_gtids() const { +[[nodiscard]] gtids::optional_gtid_set binlog_file_metadata::get_gtids() const { const auto &optional_gtids{root().get<"gtids">()}; if (!optional_gtids.has_value()) { return {}; @@ -64,20 +64,26 @@ binlog_file_metadata::binlog_file_metadata(std::string_view data) : impl_{} { const auto &encoded_gtids{optional_gtids.value()}; std::string decoded_gtids(std::size(encoded_gtids) / 2U, 'x'); boost::algorithm::unhex(encoded_gtids, std::data(decoded_gtids)); - return gtids::gtid_set{util::as_const_byte_span(decoded_gtids)}; + return gtids::optional_gtid_set{std::in_place, + util::as_const_byte_span(decoded_gtids)}; } -void binlog_file_metadata::set_gtids(const gtids::gtid_set >ids) { - const auto encoded_size{gtids.calculate_encoded_size()}; +void binlog_file_metadata::set_gtids(const gtids::optional_gtid_set >ids) { + auto &optional_gtids{root().get<"gtids">()}; + if (!gtids.has_value()) { + optional_gtids.reset(); + return; + } + const auto encoded_size{gtids.value().calculate_encoded_size()}; gtids::gtid_set_storage buffer(encoded_size); util::byte_span destination{buffer}; - gtids.encode_to(destination); + gtids.value().encode_to(destination); std::string encoded_gtids(std::size(buffer) * 2U, 'x'); const auto buffer_sv{util::as_string_view(std::as_const(buffer))}; boost::algorithm::hex_lower(buffer_sv, std::data(encoded_gtids)); - root().get<"gtids">().emplace(std::move(encoded_gtids)); + optional_gtids.emplace(std::move(encoded_gtids)); } void binlog_file_metadata::validate() const { diff --git a/src/binsrv/binlog_file_metadata.hpp b/src/binsrv/binlog_file_metadata.hpp index 30d6e84..f2007fe 100644 --- a/src/binsrv/binlog_file_metadata.hpp +++ b/src/binsrv/binlog_file_metadata.hpp @@ -22,6 +22,8 @@ #include #include +#include "binsrv/ctime_timestamp.hpp" + #include "binsrv/gtids/gtid_set.hpp" #include "util/common_optional_types.hpp" @@ -35,7 +37,9 @@ class [[nodiscard]] binlog_file_metadata { // clang-format off util::nv<"version", std::uint32_t>, util::nv<"size", std::uint64_t>, - util::nv<"gtids", util::optional_string> + util::nv<"gtids", util::optional_string>, + util::nv<"min_timestamp", ctime_timestamp>, + util::nv<"max_timestamp", ctime_timestamp> // clang-format on >; @@ -49,8 +53,11 @@ class [[nodiscard]] binlog_file_metadata { [[nodiscard]] auto &root() noexcept { return impl_; } [[nodiscard]] const auto &root() const noexcept { return impl_; } - [[nodiscard]] gtids::gtid_set get_gtids() const; - void set_gtids(const gtids::gtid_set >ids); + [[nodiscard]] bool has_gtids() const noexcept { + return impl_.get<"gtids">().has_value(); + } + [[nodiscard]] gtids::optional_gtid_set get_gtids() const; + void set_gtids(const gtids::optional_gtid_set >ids); private: impl_type impl_; diff --git a/src/binsrv/ctime_timestamp.cpp b/src/binsrv/ctime_timestamp.cpp new file mode 100644 index 0000000..78e92f9 --- /dev/null +++ b/src/binsrv/ctime_timestamp.cpp @@ -0,0 +1,72 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/ctime_timestamp.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace binsrv { + +ctime_timestamp::ctime_timestamp(std::string_view value_sv) + : value_{boost::posix_time::to_time_t( + boost::posix_time::from_iso_extended_string(std::string{value_sv}))} { +} + +[[nodiscard]] bool +ctime_timestamp::try_parse(std::string_view value_sv, + ctime_timestamp ×tamp) noexcept { + bool result{true}; + try { + timestamp = ctime_timestamp{value_sv}; + } catch (const std::exception &) { + result = false; + } + return result; +} + +[[nodiscard]] std::string ctime_timestamp::str() const { + return boost::posix_time::to_iso_extended_string( + boost::posix_time::from_time_t(get_value())); +} + +std::ostream &operator<<(std::ostream &output, + const ctime_timestamp ×tamp) { + return output << timestamp.str(); +} + +std::istream &operator>>(std::istream &input, ctime_timestamp ×tamp) { + std::string timestamp_str; + input >> timestamp_str; + if (!input) { + return input; + } + try { + timestamp = ctime_timestamp{timestamp_str}; + } catch (const std::exception &) { + input.setstate(std::ios_base::failbit); + } + return input; +} + +} // namespace binsrv diff --git a/src/binsrv/ctime_timestamp.hpp b/src/binsrv/ctime_timestamp.hpp new file mode 100644 index 0000000..e004b22 --- /dev/null +++ b/src/binsrv/ctime_timestamp.hpp @@ -0,0 +1,54 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_CTIME_TIMESTAMP_HPP +#define BINSRV_CTIME_TIMESTAMP_HPP + +#include "binsrv/ctime_timestamp_fwd.hpp" // IWYU pragma: export + +#include +#include +#include +#include + +namespace binsrv { + +class [[nodiscard]] ctime_timestamp { +public: + ctime_timestamp() noexcept = default; + explicit ctime_timestamp(std::time_t value) noexcept : value_{value} {} + explicit ctime_timestamp(std::string_view value_sv); + + [[nodiscard]] static bool try_parse(std::string_view value_sv, + ctime_timestamp ×tamp) noexcept; + + [[nodiscard]] bool is_epoch() const noexcept { + return value_ == std::time_t{}; + } + void reset_to_epoch() noexcept { value_ = std::time_t{}; } + + [[nodiscard]] std::time_t get_value() const noexcept { return value_; } + [[nodiscard]] std::string str() const; + + friend auto operator<=>(const ctime_timestamp &first, + const ctime_timestamp &second) = default; + +private: + std::time_t value_{}; +}; + +} // namespace binsrv + +#endif // BINSRV_CTIME_TIMESTAMP_HPP diff --git a/src/binsrv/ctime_timestamp_fwd.hpp b/src/binsrv/ctime_timestamp_fwd.hpp new file mode 100644 index 0000000..85a23a0 --- /dev/null +++ b/src/binsrv/ctime_timestamp_fwd.hpp @@ -0,0 +1,38 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_CTIME_TIMESTAMP_FWD_HPP +#define BINSRV_CTIME_TIMESTAMP_FWD_HPP + +#include +#include + +#include "util/nv_tuple_json_support.hpp" + +namespace binsrv { + +class ctime_timestamp; + +std::ostream &operator<<(std::ostream &output, + const ctime_timestamp ×tamp); + +std::istream &operator>>(std::istream &input, ctime_timestamp ×tamp); + +} // namespace binsrv + +template <> +struct util::is_string_convertable : std::true_type {}; + +#endif // BINSRV_CTIME_TIMESTAMP_FWD_HPP diff --git a/src/binsrv/ctime_timestamp_range.cpp b/src/binsrv/ctime_timestamp_range.cpp new file mode 100644 index 0000000..d5cb7ce --- /dev/null +++ b/src/binsrv/ctime_timestamp_range.cpp @@ -0,0 +1,64 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/ctime_timestamp_range.hpp" + +#include + +#include "binsrv/ctime_timestamp.hpp" + +namespace binsrv { + +ctime_timestamp_range::ctime_timestamp_range( + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + const ctime_timestamp &min_timestamp, + const ctime_timestamp &max_timestamp) noexcept + : min_timestamp_{min_timestamp}, max_timestamp_{max_timestamp} { + if (min_timestamp_.is_epoch() && !max_timestamp_.is_epoch()) { + min_timestamp_ = max_timestamp_; + } + if (!min_timestamp_.is_epoch() && max_timestamp_.is_epoch()) { + max_timestamp_ = min_timestamp_; + } +} + +void ctime_timestamp_range::add_timestamp( + const ctime_timestamp ×tamp) noexcept { + if (timestamp.is_epoch()) { + return; + } + if (is_empty()) { + min_timestamp_ = timestamp; + max_timestamp_ = timestamp; + return; + } + min_timestamp_ = std::min(min_timestamp_, timestamp); + max_timestamp_ = std::max(max_timestamp_, timestamp); +} + +void ctime_timestamp_range::add_range( + const ctime_timestamp_range &range) noexcept { + if (range.is_empty()) { + return; + } + if (is_empty()) { + *this = range; + return; + } + min_timestamp_ = std::min(min_timestamp_, range.get_min_timestamp()); + max_timestamp_ = std::max(max_timestamp_, range.get_max_timestamp()); +} + +} // namespace binsrv diff --git a/src/binsrv/ctime_timestamp_range.hpp b/src/binsrv/ctime_timestamp_range.hpp new file mode 100644 index 0000000..545b114 --- /dev/null +++ b/src/binsrv/ctime_timestamp_range.hpp @@ -0,0 +1,60 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_CTIME_TIMESTAMP_RANGE_HPP +#define BINSRV_CTIME_TIMESTAMP_RANGE_HPP + +#include "binsrv/ctime_timestamp_range_fwd.hpp" // IWYU pragma: export + +#include + +#include "binsrv/ctime_timestamp.hpp" + +namespace binsrv { + +class [[nodiscard]] ctime_timestamp_range { +public: + ctime_timestamp_range() = default; + + ctime_timestamp_range(const ctime_timestamp &min_timestamp, + const ctime_timestamp &max_timestamp) noexcept; + + [[nodiscard]] bool is_empty() const noexcept { + return min_timestamp_.is_epoch(); + } + [[nodiscard]] const ctime_timestamp &get_min_timestamp() const noexcept { + return min_timestamp_; + } + [[nodiscard]] const ctime_timestamp &get_max_timestamp() const noexcept { + return max_timestamp_; + } + + void clear() noexcept { + min_timestamp_.reset_to_epoch(); + max_timestamp_.reset_to_epoch(); + } + + void add_timestamp(const ctime_timestamp ×tamp) noexcept; + + void add_range(const ctime_timestamp_range &range) noexcept; + +private: + ctime_timestamp min_timestamp_{}; + ctime_timestamp max_timestamp_{}; +}; + +} // namespace binsrv + +#endif // BINSRV_CTIME_TIMESTAMP_RANGE_HPP diff --git a/src/binsrv/ctime_timestamp_range_fwd.hpp b/src/binsrv/ctime_timestamp_range_fwd.hpp new file mode 100644 index 0000000..90a3e60 --- /dev/null +++ b/src/binsrv/ctime_timestamp_range_fwd.hpp @@ -0,0 +1,25 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_CTIME_TIMESTAMP_RANGE_FWD_HPP +#define BINSRV_CTIME_TIMESTAMP_RANGE_FWD_HPP + +namespace binsrv { + +class ctime_timestamp_range; + +} // namespace binsrv + +#endif // BINSRV_CTIME_TIMESTAMP_RANGE_FWD_HPP diff --git a/src/binsrv/event/common_header.cpp b/src/binsrv/event/common_header.cpp index ee88c88..0cca433 100644 --- a/src/binsrv/event/common_header.cpp +++ b/src/binsrv/event/common_header.cpp @@ -15,6 +15,7 @@ #include "binsrv/event/common_header.hpp" +#include #include #include #include @@ -22,8 +23,7 @@ #include -#include -#include +#include "binsrv/ctime_timestamp.hpp" #include "binsrv/event/code_type.hpp" #include "binsrv/event/common_header_flag_type.hpp" @@ -96,9 +96,12 @@ common_header::common_header(util::const_byte_span portion) { // TODO: check if flags are valid (all the bits have corresponding enum) } +[[nodiscard]] ctime_timestamp common_header::get_timestamp() const noexcept { + return ctime_timestamp{static_cast(get_timestamp_raw())}; +} + [[nodiscard]] std::string common_header::get_readable_timestamp() const { - return boost::posix_time::to_simple_string( - boost::posix_time::from_time_t(get_timestamp())); + return get_timestamp().str(); } [[nodiscard]] std::string_view diff --git a/src/binsrv/event/common_header.hpp b/src/binsrv/event/common_header.hpp index c878930..39ac2e1 100644 --- a/src/binsrv/event/common_header.hpp +++ b/src/binsrv/event/common_header.hpp @@ -23,6 +23,8 @@ #include #include +#include "binsrv/ctime_timestamp_fwd.hpp" + #include "binsrv/event/code_type_fwd.hpp" #include "binsrv/event/common_header_flag_type_fwd.hpp" #include "binsrv/event/protocol_traits_fwd.hpp" @@ -40,9 +42,8 @@ class [[nodiscard]] common_header { [[nodiscard]] std::uint32_t get_timestamp_raw() const noexcept { return timestamp_; } - [[nodiscard]] std::time_t get_timestamp() const noexcept { - return static_cast(get_timestamp_raw()); - } + [[nodiscard]] ctime_timestamp get_timestamp() const noexcept; + [[nodiscard]] std::string get_readable_timestamp() const; [[nodiscard]] std::uint8_t get_type_code_raw() const noexcept { diff --git a/src/binsrv/filesystem_storage_backend.cpp b/src/binsrv/filesystem_storage_backend.cpp index 7638e6a..631ea7f 100644 --- a/src/binsrv/filesystem_storage_backend.cpp +++ b/src/binsrv/filesystem_storage_backend.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include "binsrv/storage_config.hpp" @@ -202,6 +203,17 @@ filesystem_storage_backend::do_get_description() const { return "local filesystem - " + root_path_.generic_string(); } +[[nodiscard]] std::string +filesystem_storage_backend::do_get_object_uri(std::string_view name) const { + boost::urls::url result; + result.set_scheme_id(boost::urls::scheme::file); + // set_encoded_authority() is needed to create proper 'file:///path' + // instead of 'file:/path' + result.set_encoded_authority(""); + result.set_path(get_object_path(name).generic_string()); + return result.c_str(); +} + [[nodiscard]] std::filesystem::path filesystem_storage_backend::get_object_path(std::string_view name) const { auto result{root_path_}; diff --git a/src/binsrv/filesystem_storage_backend.hpp b/src/binsrv/filesystem_storage_backend.hpp index b83c8bc..dd2dbaf 100644 --- a/src/binsrv/filesystem_storage_backend.hpp +++ b/src/binsrv/filesystem_storage_backend.hpp @@ -55,6 +55,8 @@ class [[nodiscard]] filesystem_storage_backend final void do_close_stream() override; [[nodiscard]] std::string do_get_description() const override; + [[nodiscard]] std::string + do_get_object_uri(std::string_view name) const override; [[nodiscard]] std::filesystem::path get_object_path(std::string_view name) const; diff --git a/src/binsrv/gtids/gtid_set_fwd.hpp b/src/binsrv/gtids/gtid_set_fwd.hpp index 18ccf11..4f915a1 100644 --- a/src/binsrv/gtids/gtid_set_fwd.hpp +++ b/src/binsrv/gtids/gtid_set_fwd.hpp @@ -17,10 +17,12 @@ #define BINSRV_GTIDS_GTID_SET_FWD_HPP #include +#include namespace binsrv::gtids { class gtid_set; +using optional_gtid_set = std::optional; std::ostream &operator<<(std::ostream &output, const gtid_set &obj); diff --git a/src/binsrv/models/binlog_file_record.hpp b/src/binsrv/models/binlog_file_record.hpp new file mode 100644 index 0000000..ec17ea7 --- /dev/null +++ b/src/binsrv/models/binlog_file_record.hpp @@ -0,0 +1,43 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_MODELS_BINLOG_FILE_RECORD_HPP +#define BINSRV_MODELS_BINLOG_FILE_RECORD_HPP + +#include "binsrv/models/binlog_file_record_fwd.hpp" // IWYU pragma: export + +#include +#include + +#include "binsrv/ctime_timestamp.hpp" + +#include "util/nv_tuple.hpp" + +namespace binsrv::models { + +struct [[nodiscard]] binlog_file_record + : util::nv_tuple< + // clang-format off + util::nv<"name", std::string>, + util::nv<"size", std::uint64_t>, + util::nv<"uri", std::string>, + util::nv<"min_timestamp", ctime_timestamp>, + util::nv<"max_timestamp", ctime_timestamp> + // clang-format on + > {}; + +} // namespace binsrv::models + +#endif // BINSRV_MODELS_BINLOG_FILE_RECORD_HPP diff --git a/src/binsrv/models/binlog_file_record_fwd.hpp b/src/binsrv/models/binlog_file_record_fwd.hpp new file mode 100644 index 0000000..dae4f8e --- /dev/null +++ b/src/binsrv/models/binlog_file_record_fwd.hpp @@ -0,0 +1,25 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_MODELS_BINLOG_FILE_RECORD_FWD_HPP +#define BINSRV_MODELS_BINLOG_FILE_RECORD_FWD_HPP + +namespace binsrv::models { + +struct binlog_file_record; + +} // namespace binsrv::models + +#endif // BINSRV_MODELS_BINLOG_FILE_RECORD_FWD_HPP diff --git a/src/binsrv/models/error_response.cpp b/src/binsrv/models/error_response.cpp new file mode 100644 index 0000000..66b6975 --- /dev/null +++ b/src/binsrv/models/error_response.cpp @@ -0,0 +1,42 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/models/error_response.hpp" + +#include +#include + +#include +#include + +#include "binsrv/models/response_status_type.hpp" + +#include "util/nv_tuple_to_json.hpp" + +namespace binsrv::models { + +error_response::error_response(std::string_view message) + : impl_{{expected_error_response_version}, + {response_status_type::error}, + {std::string{message}}} {} + +[[nodiscard]] std::string error_response::str() const { + boost::json::value json_value; + util::nv_tuple_to_json(json_value, impl_); + + return boost::json::serialize(json_value); +} + +} // namespace binsrv::models diff --git a/src/binsrv/models/error_response.hpp b/src/binsrv/models/error_response.hpp new file mode 100644 index 0000000..e6a8fc5 --- /dev/null +++ b/src/binsrv/models/error_response.hpp @@ -0,0 +1,54 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_MODELS_ERROR_RESPONSE_HPP +#define BINSRV_MODELS_ERROR_RESPONSE_HPP + +#include "binsrv/models/error_response_fwd.hpp" // IWYU pragma: export + +#include +#include +#include + +#include "binsrv/models/response_status_type_fwd.hpp" + +#include "util/nv_tuple.hpp" + +namespace binsrv::models { + +class [[nodiscard]] error_response { +private: + using impl_type = util::nv_tuple< + // clang-format off + util::nv<"version", std::uint32_t>, + util::nv<"status", response_status_type>, + util::nv<"message", std::string> + // clang-format on + >; + +public: + explicit error_response(std::string_view message); + + [[nodiscard]] std::string str() const; + + [[nodiscard]] auto &root() noexcept { return impl_; } + +private: + impl_type impl_; +}; + +} // namespace binsrv::models + +#endif // BINSRV_MODELS_ERROR_RESPONSE_HPP diff --git a/src/binsrv/models/error_response_fwd.hpp b/src/binsrv/models/error_response_fwd.hpp new file mode 100644 index 0000000..3a1c33f --- /dev/null +++ b/src/binsrv/models/error_response_fwd.hpp @@ -0,0 +1,29 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_MODELS_ERROR_RESPONSE_FWD_HPP +#define BINSRV_MODELS_ERROR_RESPONSE_FWD_HPP + +#include + +namespace binsrv::models { + +class error_response; + +inline constexpr std::uint32_t expected_error_response_version{1U}; + +} // namespace binsrv::models + +#endif // BINSRV_MODELS_ERROR_RESPONSE_FWD_HPP diff --git a/src/binsrv/models/response_status_type.hpp b/src/binsrv/models/response_status_type.hpp new file mode 100644 index 0000000..e42e69d --- /dev/null +++ b/src/binsrv/models/response_status_type.hpp @@ -0,0 +1,96 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_MODELS_RESPONSE_STATUS_TYPE_HPP +#define BINSRV_MODELS_RESPONSE_STATUS_TYPE_HPP + +#include "binsrv/models/response_status_type_fwd.hpp" // IWYU pragma: export + +#include +#include +#include +#include +#include +#include +#include + +#include "util/conversion_helpers.hpp" + +namespace binsrv::models { + +// NOLINTBEGIN(cppcoreguidelines-macro-usage) +// clang-format off +#define BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_SEQUENCE() \ + BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO(error ), \ + BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO(success) +// clang-format on + +#define BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO(X) X +enum class response_status_type : std::uint8_t { + BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_SEQUENCE(), + delimiter +}; +#undef BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO + +inline std::string_view to_string_view(response_status_type level) noexcept { + using namespace std::string_view_literals; +#define BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO(X) #X##sv + static constexpr std::array labels{ + BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_SEQUENCE(), ""sv}; +#undef BINSRV_MODELS_RESPONSE_STATUS_TYPE_X_MACRO + const auto index{ + util::enum_to_index(std::min(response_status_type::delimiter, level))}; + // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) + return labels[index]; +} +#undef BINSRV_LOG_SEVERITY_X_SEQUENCE +// NOLINTEND(cppcoreguidelines-macro-usage) + +template + requires std::same_as +std::basic_ostream & +operator<<(std::basic_ostream &output, + response_status_type level) { + return output << to_string_view(level); +} + +template + requires std::same_as +std::basic_istream & +operator>>(std::basic_istream &input, + response_status_type &level) { + std::string level_str; + input >> level_str; + if (!input) { + return input; + } + std::size_t index{0U}; + const auto max_index = util::enum_to_index(response_status_type::delimiter); + while (index < max_index && + to_string_view(util::index_to_enum(index)) != + level_str) { + ++index; + } + if (index < max_index) { + level = util::index_to_enum(index); + } else { + input.setstate(std::ios_base::failbit); + } + return input; +} + +} // namespace binsrv::models + +#endif // BINSRV_MODELS_RESPONSE_STATUS_TYPE_HPP diff --git a/src/binsrv/models/response_status_type_fwd.hpp b/src/binsrv/models/response_status_type_fwd.hpp new file mode 100644 index 0000000..136c493 --- /dev/null +++ b/src/binsrv/models/response_status_type_fwd.hpp @@ -0,0 +1,47 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_MODELS_RESPONSE_STATUS_TYPE_FWD_HPP +#define BINSRV_MODELS_RESPONSE_STATUS_TYPE_FWD_HPP + +#include +#include +#include + +#include "util/nv_tuple_json_support.hpp" + +namespace binsrv::models { + +enum class response_status_type : std::uint8_t; + +template + requires std::same_as +std::basic_ostream & +operator<<(std::basic_ostream &output, + response_status_type level); + +template + requires std::same_as +std::basic_istream & +operator>>(std::basic_istream &input, + response_status_type &level); + +} // namespace binsrv::models + +template <> +struct util::is_string_convertable + : std::true_type {}; + +#endif // BINSRV_MODELS_RESPONSE_STATUS_TYPE_FWD_HPP diff --git a/src/binsrv/models/search_by_timestamp_response.cpp b/src/binsrv/models/search_by_timestamp_response.cpp new file mode 100644 index 0000000..63eae75 --- /dev/null +++ b/src/binsrv/models/search_by_timestamp_response.cpp @@ -0,0 +1,69 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/models/search_by_timestamp_response.hpp" + +#include +#include +#include +#include +#include + +#include +#include + +#include "binsrv/models/binlog_file_record.hpp" +#include "binsrv/models/response_status_type.hpp" + +#include "util/nv_tuple_to_json.hpp" + +namespace binsrv::models { + +search_by_timestamp_response::search_by_timestamp_response() + : impl_{{expected_search_by_timestamp_response_version}, + {response_status_type::success}, + {}} {} + +search_by_timestamp_response::search_by_timestamp_response( + const search_by_timestamp_response &) = default; +search_by_timestamp_response::search_by_timestamp_response( + search_by_timestamp_response &&) noexcept = default; +search_by_timestamp_response &search_by_timestamp_response::operator=( + const search_by_timestamp_response &) = default; +search_by_timestamp_response &search_by_timestamp_response::operator=( + search_by_timestamp_response &&) noexcept = default; +search_by_timestamp_response::~search_by_timestamp_response() = default; + +[[nodiscard]] std::string search_by_timestamp_response::str() const { + boost::json::value json_value; + util::nv_tuple_to_json(json_value, impl_); + + return boost::json::serialize(json_value); +} + +void search_by_timestamp_response::add_record(std::string_view name, + std::uint64_t size, + std::string_view uri, + std::time_t min_timestamp, + std::time_t max_timestamp) { + binlog_file_record record{{{std::string{name}}, + {size}, + {std::string{uri}}, + {ctime_timestamp{min_timestamp}}, + {ctime_timestamp{max_timestamp}}}}; + impl_.template get<"result">().emplace_back(std::move(record)); +} + +} // namespace binsrv::models diff --git a/src/binsrv/models/search_by_timestamp_response.hpp b/src/binsrv/models/search_by_timestamp_response.hpp new file mode 100644 index 0000000..08c3c89 --- /dev/null +++ b/src/binsrv/models/search_by_timestamp_response.hpp @@ -0,0 +1,69 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_MODELS_SEARCH_BY_TIMESTAMP_RESPONSE_HPP +#define BINSRV_MODELS_SEARCH_BY_TIMESTAMP_RESPONSE_HPP + +#include "binsrv/models/search_by_timestamp_response_fwd.hpp" // IWYU pragma: export + +#include +#include +#include +#include +#include + +#include "binsrv/models/binlog_file_record_fwd.hpp" +#include "binsrv/models/response_status_type_fwd.hpp" + +#include "util/nv_tuple.hpp" + +namespace binsrv::models { + +class [[nodiscard]] search_by_timestamp_response { +private: + using binlog_file_record_container = std::vector; + + using impl_type = util::nv_tuple< + // clang-format off + util::nv<"version", std::uint32_t>, + util::nv<"status", response_status_type>, + util::nv<"result", binlog_file_record_container> + // clang-format on + >; + +public: + explicit search_by_timestamp_response(); + search_by_timestamp_response(const search_by_timestamp_response &); + search_by_timestamp_response(search_by_timestamp_response &&) noexcept; + search_by_timestamp_response &operator=(const search_by_timestamp_response &); + search_by_timestamp_response & + operator=(search_by_timestamp_response &&) noexcept; + ~search_by_timestamp_response(); + + [[nodiscard]] std::string str() const; + + [[nodiscard]] auto &root() noexcept { return impl_; } + + void add_record(std::string_view name, std::uint64_t size, + std::string_view uri, std::time_t min_timestamp, + std::time_t max_timestamp); + +private: + impl_type impl_; +}; + +} // namespace binsrv::models + +#endif // BINSRV_MODELS_SEARCH_BY_TIMESTAMP_RESPONSE_HPP diff --git a/src/binsrv/models/search_by_timestamp_response_fwd.hpp b/src/binsrv/models/search_by_timestamp_response_fwd.hpp new file mode 100644 index 0000000..4cdc86a --- /dev/null +++ b/src/binsrv/models/search_by_timestamp_response_fwd.hpp @@ -0,0 +1,30 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_MODELS_SEARCH_BY_TIMESTAMP_RESPONSE_FWD_HPP +#define BINSRV_MODELS_SEARCH_BY_TIMESTAMP_RESPONSE_FWD_HPP + +#include + +namespace binsrv::models { + +class search_by_timestamp_response; + +inline constexpr std::uint32_t expected_search_by_timestamp_response_version{ + 1U}; + +} // namespace binsrv::models + +#endif // BINSRV_MODELS_SEARCH_BY_TIMESTAMP_RESPONSE_FWD_HPP diff --git a/src/binsrv/operation_mode_type.hpp b/src/binsrv/operation_mode_type.hpp index d22821b..ee9a8e6 100644 --- a/src/binsrv/operation_mode_type.hpp +++ b/src/binsrv/operation_mode_type.hpp @@ -32,9 +32,10 @@ namespace binsrv { // NOLINTBEGIN(cppcoreguidelines-macro-usage) // clang-format off #define BINSRV_OPERATION_MODE_TYPE_X_SEQUENCE() \ - BINSRV_OPERATION_MODE_TYPE_X_MACRO(fetch ), \ - BINSRV_OPERATION_MODE_TYPE_X_MACRO(pull ), \ - BINSRV_OPERATION_MODE_TYPE_X_MACRO(version) + BINSRV_OPERATION_MODE_TYPE_X_MACRO(fetch ), \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(pull ), \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(search_by_timestamp), \ + BINSRV_OPERATION_MODE_TYPE_X_MACRO(version ) // clang-format on #define BINSRV_OPERATION_MODE_TYPE_X_MACRO(X) X diff --git a/src/binsrv/s3_storage_backend.cpp b/src/binsrv/s3_storage_backend.cpp index 6344f3e..45986fb 100644 --- a/src/binsrv/s3_storage_backend.cpp +++ b/src/binsrv/s3_storage_backend.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include @@ -731,6 +732,23 @@ void s3_storage_backend::do_close_stream() { return res; } +[[nodiscard]] std::string +s3_storage_backend::do_get_object_uri(std::string_view name) const { + boost::urls::url result; + if (impl_->has_endpoint()) { + result.set_scheme(impl_->get_scheme_label()); + result.set_encoded_authority(impl_->get_endpoint()); + std::filesystem::path result_path{result.path()}; + result_path /= get_object_path(name); + result.set_path(result_path.generic_string()); + } else { + result.set_scheme(original_uri_schema); + result.set_host(get_bucket()); + result.set_path(get_object_path(name).generic_string()); + } + return result.c_str(); +} + [[nodiscard]] std::filesystem::path s3_storage_backend::get_object_path(std::string_view name) const { auto result{root_path_}; diff --git a/src/binsrv/s3_storage_backend.hpp b/src/binsrv/s3_storage_backend.hpp index 421f097..e4bf039 100644 --- a/src/binsrv/s3_storage_backend.hpp +++ b/src/binsrv/s3_storage_backend.hpp @@ -80,6 +80,8 @@ class [[nodiscard]] s3_storage_backend final : public basic_storage_backend { void do_close_stream() override; [[nodiscard]] std::string do_get_description() const override; + [[nodiscard]] std::string + do_get_object_uri(std::string_view name) const override; [[nodiscard]] std::filesystem::path get_object_path(std::string_view name) const; diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index 2bd0fa8..326c3b9 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -29,6 +29,7 @@ #include "binsrv/basic_storage_backend.hpp" #include "binsrv/binlog_file_metadata.hpp" +#include "binsrv/ctime_timestamp.hpp" #include "binsrv/replication_mode_type.hpp" #include "binsrv/storage_backend_factory.hpp" #include "binsrv/storage_config.hpp" @@ -45,8 +46,10 @@ namespace binsrv { storage::storage(const storage_config &config, + storage_construction_mode_type construction_mode, replication_mode_type replication_mode) - : backend_{}, replication_mode_{replication_mode}, binlog_names_{} { + : construction_mode_{construction_mode}, backend_{}, + replication_mode_{replication_mode} { const auto &checkpoint_size_opt{config.get<"checkpoint_size">()}; if (checkpoint_size_opt.has_value()) { checkpoint_size_bytes_ = checkpoint_size_opt.value().get_value(); @@ -63,7 +66,9 @@ storage::storage(const storage_config &config, auto storage_objects{backend_->list_objects()}; if (storage_objects.empty()) { // initialized on a new / empty storage - just save metadata and return - save_metadata(); + if (construction_mode_ == storage_construction_mode_type::streaming) { + save_metadata(); + } return; } @@ -105,10 +110,12 @@ storage::storage(const storage_config &config, } storage::~storage() { - // bugprone-empty-catch should not be that strict in destructors - try { - flush_event_buffer(); - } catch (...) { // NOLINT(bugprone-empty-catch) + if (construction_mode_ == storage_construction_mode_type::streaming) { + // bugprone-empty-catch should not be that strict in destructors + try { + flush_event_buffer(); + } catch (...) { // NOLINT(bugprone-empty-catch) + } } } @@ -137,6 +144,8 @@ storage::check_binlog_name(std::string_view binlog_name) noexcept { [[nodiscard]] open_binlog_status storage::open_binlog(std::string_view binlog_name) { + ensure_streaming_mode(); + auto result{open_binlog_status::opened_with_data_present}; if (!check_binlog_name(binlog_name)) { @@ -145,13 +154,14 @@ storage::open_binlog(std::string_view binlog_name) { } // here we either create a new binlog file if its name is not presen in the - // "binlog_names_", or we open an existing one and append to it, in which + // "binlog_records_", or we open an existing one and append to it, in which // case we need to make sure that the current position is properly set - const bool binlog_exists{std::ranges::find(binlog_names_, binlog_name) != - std::end(binlog_names_)}; + const bool binlog_exists{ + std::ranges::find(binlog_records_, binlog_name, &binlog_record::name) != + std::end(binlog_records_)}; // in the case when binlog exists, the name must be equal to the last item in - // "binlog_names_" list and "position_" must be set to a non-zero value + // "binlog_records_" list and "position_" must be set to a non-zero value if (binlog_exists) { if (binlog_name != get_current_binlog_name()) { util::exception_location().raise( @@ -173,16 +183,22 @@ storage::open_binlog(std::string_view binlog_name) { // created file backend_->write_data_to_stream(event::magic_binlog_payload); - binlog_names_.emplace_back(binlog_name); + gtids::optional_gtid_set previous_binlog_gtids{}; + if (is_in_gtid_replication_mode()) { + previous_binlog_gtids = get_gtids(); + } + + binlog_records_.emplace_back( + std::string{binlog_name}, event::magic_binlog_offset, + std::move(previous_binlog_gtids), ctime_timestamp_range{}); + save_binlog_metadata(get_current_binlog_record()); save_binlog_index(); - position_ = event::magic_binlog_offset; - save_binlog_metadata(get_current_binlog_name()); result = open_binlog_status::created; } else { - assert(position_ == open_stream_offset); + assert(get_current_position() == open_stream_offset); if (open_stream_offset == 0ULL) { backend_->write_data_to_stream(event::magic_binlog_payload); - position_ = event::magic_binlog_offset; + get_current_binlog_record().size = event::magic_binlog_offset; result = open_binlog_status::opened_empty; } else if (open_stream_offset == event::magic_binlog_offset) { result = open_binlog_status::opened_at_magic_paylod_offset; @@ -192,37 +208,38 @@ storage::open_binlog(std::string_view binlog_name) { result = open_binlog_status::opened_with_data_present; } } - if (size_checkpointing_enabled()) { - last_checkpoint_position_ = get_current_position(); - } - if (interval_checkpointing_enabled()) { - last_checkpoint_timestamp_ = std::chrono::steady_clock::now(); - } + update_last_checkpoint_info(); assert(std::size(event_buffer_) == 0U); event_buffer_.reserve(default_event_buffer_size_in_bytes); assert(!has_event_data_to_flush()); assert(gtids_in_event_buffer_.is_empty()); + assert(ready_to_flush_timestamps_.is_empty()); + assert(incomplete_transaction_timestamps_.is_empty()); return result; } void storage::write_event(util::const_byte_span event_data, bool at_transaction_boundary, - const gtids::gtid &transaction_gtid) { + const gtids::gtid &transaction_gtid, + const ctime_timestamp &event_timestamp) { + ensure_streaming_mode(); + event_buffer_.insert(std::end(event_buffer_), std::cbegin(event_data), std::cend(event_data)); + incomplete_transaction_timestamps_.add_timestamp(event_timestamp); + if (at_transaction_boundary) { last_transaction_boundary_position_in_event_buffer_ = std::size(event_buffer_); if (is_in_gtid_replication_mode() && !transaction_gtid.is_empty()) { gtids_in_event_buffer_ += transaction_gtid; } + ready_to_flush_timestamps_.add_range(incomplete_transaction_timestamps_); + incomplete_transaction_timestamps_.clear(); } - const auto event_data_size{std::size(event_data)}; - position_ += event_data_size; - // now we are writing data from the event buffer to the storage backend if // event buffer has some data in it that can be considered a complete // transaction and a checkpoint event (either size-based or time-based) @@ -262,34 +279,52 @@ void storage::write_event(util::const_byte_span event_data, } void storage::close_binlog() { + ensure_streaming_mode(); + flush_event_buffer(); event_buffer_.clear(); event_buffer_.shrink_to_fit(); backend_->close_stream(); - position_ = 0ULL; - if (size_checkpointing_enabled()) { - last_checkpoint_position_ = get_current_position(); - } - if (interval_checkpointing_enabled()) { - last_checkpoint_timestamp_ = std::chrono::steady_clock::now(); - } + update_last_checkpoint_info(); } void storage::discard_incomplete_transaction_events() { - const std::size_t bytes_to_discard{ - std::size(event_buffer_) - - last_transaction_boundary_position_in_event_buffer_}; - position_ -= bytes_to_discard; + ensure_streaming_mode(); + event_buffer_.resize(last_transaction_boundary_position_in_event_buffer_); + incomplete_transaction_timestamps_.clear(); } void storage::flush_event_buffer() { + ensure_streaming_mode(); + if (has_event_data_to_flush()) { flush_event_buffer_internal(); } } +[[nodiscard]] std::string +storage::get_binlog_uri(std::string_view binlog_name) const { + return backend_->get_object_uri(binlog_name); +} + +void storage::ensure_streaming_mode() const { + if (construction_mode_ != storage_construction_mode_type::streaming) { + util::exception_location().raise( + "operation requires storage to be constructed in streaming mode"); + } +} + +void storage::update_last_checkpoint_info() { + if (size_checkpointing_enabled()) { + last_checkpoint_position_ = get_current_position(); + } + if (interval_checkpointing_enabled()) { + last_checkpoint_timestamp_ = std::chrono::steady_clock::now(); + } +} + void storage::flush_event_buffer_internal() { assert(!event_buffer_.empty()); assert(last_transaction_boundary_position_in_event_buffer_ <= @@ -301,10 +336,17 @@ void storage::flush_event_buffer_internal() { // writing bytes from // the beginning of the event buffer backend_->write_data_to_stream(transactions_data); + get_current_binlog_record().size += + last_transaction_boundary_position_in_event_buffer_; if (is_in_gtid_replication_mode()) { - gtids_ += gtids_in_event_buffer_; + auto &optional_gtids{get_current_binlog_record().gtids}; + if (optional_gtids.has_value()) { + optional_gtids.value() += gtids_in_event_buffer_; + } } - save_binlog_metadata(get_current_binlog_name()); + get_current_binlog_record().timestamps.add_range(ready_to_flush_timestamps_); + + save_binlog_metadata(get_current_binlog_record()); const auto begin_it{std::begin(event_buffer_)}; const auto portion_it{std::next( @@ -317,6 +359,7 @@ void storage::flush_event_buffer_internal() { if (is_in_gtid_replication_mode()) { gtids_in_event_buffer_.clear(); } + ready_to_flush_timestamps_.clear(); } void storage::load_binlog_index() { @@ -344,29 +387,29 @@ void storage::load_binlog_index() { "binlog index contains a reference to a binlog with invalid " "name"); } - if (std::ranges::find(binlog_names_, current_binlog_name) != - std::end(binlog_names_)) { + if (std::ranges::find(binlog_records_, current_binlog_name, + &binlog_record::name) != std::end(binlog_records_)) { util::exception_location().raise( "binlog index contains a duplicate entry"); } - binlog_names_.emplace_back(std::move(current_binlog_name)); + binlog_records_.emplace_back(current_binlog_name, 0ULL, gtids::gtid_set{}, + ctime_timestamp_range{}); } } void storage::validate_binlog_index( const storage_object_name_container &object_names) const { - for (auto const &[object_name, object_size] : object_names) { - if (std::ranges::find(binlog_names_, object_name) == - std::end(binlog_names_)) { + for (auto const &record : binlog_records_) { + if (!object_names.contains(record.name)) { util::exception_location().raise( - "storage contains an object that is not " - "referenced in the binlog index"); + "binlog index contains a reference to a non-existing object"); } } - if (std::size(object_names) != std::size(binlog_names_)) { + if (std::size(object_names) != std::size(binlog_records_)) { util::exception_location().raise( - "binlog index contains a reference to a non-existing object"); + "storage contains an object that is not " + "referenced in the binlog index"); } // TODO: add integrity checks (parsing + checksumming) for the binlog @@ -375,9 +418,9 @@ void storage::validate_binlog_index( void storage::save_binlog_index() const { std::ostringstream oss; - for (const auto &binlog_name : binlog_names_) { + for (const auto &record : binlog_records_) { std::filesystem::path binlog_path{default_binlog_index_entry_path}; - binlog_path /= binlog_name; + binlog_path /= record.name; oss << binlog_path.generic_string() << '\n'; } const auto content{oss.str()}; @@ -413,24 +456,45 @@ storage::generate_binlog_metadata_name(std::string_view binlog_name) { return binlog_metadata_name; } -void storage::load_binlog_metadata(std::string_view binlog_name) { +[[nodiscard]] storage::binlog_record +storage::load_binlog_metadata(std::string_view binlog_name) const { const auto content{ backend_->get_object(generate_binlog_metadata_name(binlog_name))}; binlog_file_metadata metadata{content}; - position_ = metadata.root().get<"size">(); + + return binlog_record{.name = std::string(binlog_name), + .size = metadata.root().get<"size">(), + .gtids = metadata.get_gtids(), + .timestamps = {metadata.root().get<"min_timestamp">(), + metadata.root().get<"max_timestamp">()}}; +} + +void storage::validate_binlog_metadata(const binlog_record &record) const { if (is_in_gtid_replication_mode()) { - gtids_ = metadata.get_gtids(); + if (!record.gtids.has_value()) { + util::exception_location().raise( + "missing GTID set in the binlog metadata while in GTID replication " + "mode"); + } + } else { + if (record.gtids.has_value()) { + util::exception_location().raise( + "found GTID set in the binlog metadata while in position " + "replication mode"); + } } } -void storage::save_binlog_metadata(std::string_view binlog_name) const { +void storage::save_binlog_metadata(const binlog_record &record) const { binlog_file_metadata metadata{}; - metadata.root().get<"size">() = get_ready_to_flush_position(); - if (is_in_gtid_replication_mode()) { - metadata.set_gtids(get_gtids()); - } + metadata.root().get<"size">() = record.size; + metadata.set_gtids(record.gtids); + metadata.root().get<"min_timestamp">() = + ctime_timestamp{record.timestamps.get_min_timestamp()}; + metadata.root().get<"max_timestamp">() = + ctime_timestamp{record.timestamps.get_max_timestamp()}; const auto content{metadata.str()}; - backend_->put_object(generate_binlog_metadata_name(binlog_name), + backend_->put_object(generate_binlog_metadata_name(record.name), util::as_const_byte_span(content)); } @@ -438,28 +502,26 @@ void storage::load_and_validate_binlog_metadata_set( // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) const storage_object_name_container &object_names, const storage_object_name_container &object_metadata_names) { - for (const auto &binlog_name : binlog_names_) { - const auto binlog_metadata_name{generate_binlog_metadata_name(binlog_name)}; + for (auto &record : binlog_records_) { + const auto binlog_metadata_name{generate_binlog_metadata_name(record.name)}; if (!object_metadata_names.contains(binlog_metadata_name)) { util::exception_location().raise( "missing metadata for a binlog listed in the binlog index"); } - load_binlog_metadata(binlog_name); - if (get_current_position() != object_names.at(binlog_name)) { + auto loaded_binlog_metadata{load_binlog_metadata(record.name)}; + validate_binlog_metadata(loaded_binlog_metadata); + // validating that the size stored in the metadata matches the actual size + if (loaded_binlog_metadata.size != object_names.at(record.name)) { util::exception_location().raise( "size from the binlog metadata does not match the actual binlog " "size"); } - if (!is_in_gtid_replication_mode() && !gtids_.is_empty()) { - util::exception_location().raise( - "found non-empty GTID set in the binlog metadata while in position " - "replication mode"); - } + record = std::move(loaded_binlog_metadata); } // after this loop position_ and gtids_ should store the values from the last // binlog file metadata - if (std::size(object_metadata_names) != std::size(binlog_names_)) { + if (std::size(object_metadata_names) != std::size(binlog_records_)) { util::exception_location().raise( "found metadata for a non-existing binlog"); } diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 491ad50..a0af80a 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -24,6 +24,8 @@ #include #include "binsrv/basic_storage_backend_fwd.hpp" +#include "binsrv/ctime_timestamp_fwd.hpp" +#include "binsrv/ctime_timestamp_range.hpp" #include "binsrv/replication_mode_type_fwd.hpp" #include "binsrv/storage_config_fwd.hpp" @@ -35,6 +37,15 @@ namespace binsrv { class [[nodiscard]] storage { +private: + struct binlog_record { + std::string name; + std::uint64_t size{0ULL}; + gtids::optional_gtid_set gtids{}; + ctime_timestamp_range timestamps{}; + }; + using binlog_record_container = std::vector; + public: static constexpr std::string_view default_binlog_index_name{"binlog.index"}; static constexpr std::string_view default_binlog_index_entry_path{"."}; @@ -44,7 +55,9 @@ class [[nodiscard]] storage { static constexpr std::size_t default_event_buffer_size_in_bytes{16384U}; // passing by value as we are going to move from this unique_ptr - storage(const storage_config &config, replication_mode_type replication_mode); + storage(const storage_config &config, + storage_construction_mode_type construction_mode, + replication_mode_type replication_mode); storage(const storage &) = delete; storage &operator=(const storage &) = delete; @@ -61,19 +74,29 @@ class [[nodiscard]] storage { } [[nodiscard]] bool is_in_gtid_replication_mode() const noexcept; - [[nodiscard]] bool has_current_binlog_name() const noexcept { - return !binlog_names_.empty(); + [[nodiscard]] const binlog_record_container & + get_binlog_records() const noexcept { + return binlog_records_; + } + [[nodiscard]] bool is_empty() const noexcept { + return !binlog_records_.empty(); } [[nodiscard]] std::string_view get_current_binlog_name() const noexcept { - return has_current_binlog_name() ? binlog_names_.back() - : std::string_view{}; + return is_empty() ? get_current_binlog_record().name : std::string_view{}; } [[nodiscard]] std::uint64_t get_current_position() const noexcept { - return position_; + return get_flushed_position() + std::size(event_buffer_); } [[nodiscard]] const gtids::gtid_set &get_gtids() const noexcept { - return gtids_; + if (is_empty()) { + return empty_gtids_; + } + const auto &optional_gtids{get_current_binlog_record().gtids}; + if (!optional_gtids.has_value()) { + return empty_gtids_; + } + return *optional_gtids; } [[nodiscard]] static bool @@ -84,21 +107,21 @@ class [[nodiscard]] storage { [[nodiscard]] open_binlog_status open_binlog(std::string_view binlog_name); void write_event(util::const_byte_span event_data, bool at_transaction_boundary, - const gtids::gtid &transaction_gtid); + const gtids::gtid &transaction_gtid, + const ctime_timestamp &event_timestamp); void close_binlog(); void discard_incomplete_transaction_events(); void flush_event_buffer(); + [[nodiscard]] std::string get_binlog_uri(std::string_view binlog_name) const; + private: + storage_construction_mode_type construction_mode_; basic_storage_backend_ptr backend_; replication_mode_type replication_mode_; - using binlog_name_container = std::vector; - binlog_name_container binlog_names_; - std::uint64_t position_{0ULL}; - - gtids::gtid_set gtids_{}; + binlog_record_container binlog_records_{}; std::uint64_t checkpoint_size_bytes_{0ULL}; std::uint64_t last_checkpoint_position_{0ULL}; @@ -106,10 +129,24 @@ class [[nodiscard]] storage { std::chrono::steady_clock::duration checkpoint_interval_seconds_{}; std::chrono::steady_clock::time_point last_checkpoint_timestamp_{}; + gtids::gtid_set empty_gtids_{}; + using event_buffer_type = std::vector; event_buffer_type event_buffer_{}; std::size_t last_transaction_boundary_position_in_event_buffer_{}; gtids::gtid_set gtids_in_event_buffer_{}; + ctime_timestamp_range ready_to_flush_timestamps_{}; + ctime_timestamp_range incomplete_transaction_timestamps_{}; + + void ensure_streaming_mode() const; + + [[nodiscard]] const binlog_record & + get_current_binlog_record() const noexcept { + return binlog_records_.back(); + } + [[nodiscard]] binlog_record &get_current_binlog_record() noexcept { + return binlog_records_.back(); + } [[nodiscard]] bool size_checkpointing_enabled() const noexcept { return checkpoint_size_bytes_ != 0ULL; @@ -119,12 +156,13 @@ class [[nodiscard]] storage { return checkpoint_interval_seconds_ != std::chrono::steady_clock::duration{}; } + void update_last_checkpoint_info(); [[nodiscard]] bool has_event_data_to_flush() const noexcept { return last_transaction_boundary_position_in_event_buffer_ != 0ULL; } [[nodiscard]] std::uint64_t get_flushed_position() const noexcept { - return get_current_position() - std::size(event_buffer_); + return is_empty() ? get_current_binlog_record().size : 0ULL; } [[nodiscard]] std::uint64_t get_ready_to_flush_position() const noexcept { return get_flushed_position() + @@ -143,8 +181,10 @@ class [[nodiscard]] storage { [[nodiscard]] static std::string generate_binlog_metadata_name(std::string_view binlog_name); - void load_binlog_metadata(std::string_view binlog_name); - void save_binlog_metadata(std::string_view binlog_name) const; + [[nodiscard]] binlog_record + load_binlog_metadata(std::string_view binlog_name) const; + void validate_binlog_metadata(const binlog_record &record) const; + void save_binlog_metadata(const binlog_record &record) const; void load_and_validate_binlog_metadata_set( const storage_object_name_container &object_names, diff --git a/src/binsrv/storage_fwd.hpp b/src/binsrv/storage_fwd.hpp index 84b0268..6adcb2e 100644 --- a/src/binsrv/storage_fwd.hpp +++ b/src/binsrv/storage_fwd.hpp @@ -20,6 +20,11 @@ namespace binsrv { +enum class storage_construction_mode_type : std::uint8_t { + querying_only, + streaming +}; + enum class open_binlog_status : std::uint8_t { created, opened_empty, diff --git a/src/easymysql/ssl_mode_type.hpp b/src/easymysql/ssl_mode_type.hpp index eae8baf..882f95b 100644 --- a/src/easymysql/ssl_mode_type.hpp +++ b/src/easymysql/ssl_mode_type.hpp @@ -62,7 +62,7 @@ inline std::string_view to_string_view(ssl_mode_type level) noexcept { // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-constant-array-index) return labels[index]; } -#undef BINSRV_LOG_SEVERITY_X_SEQUENCE +#undef EASYMYSQL_SSL_MODE_TYPE_X_SEQUENCE // NOLINTEND(cppcoreguidelines-macro-usage) template