From 76828665d8de9924b3d4a59f08cde34cec5905b8 Mon Sep 17 00:00:00 2001 From: Yura Sorokin Date: Mon, 19 Jan 2026 11:45:38 +0100 Subject: [PATCH] PS-10320 fix: Local binary log data is not uploaded to S3 bucket upon disconnection due to read_timeout https://perconadev.atlassian.net/browse/PS-10320 Added additional storage internal buffer flushing at the point where we detect connection termination due to timeout. This can be considered the third kind of checkpoiting (in addition to size-based and time-based ones). --- src/app.cpp | 7 +++++-- src/binsrv/storage.cpp | 16 +++++++++------- src/binsrv/storage.hpp | 3 ++- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/app.cpp b/src/app.cpp index ed8a1be..8634eb0 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -633,8 +633,11 @@ void receive_binlog_events( storage.discard_incomplete_transaction_events(); } - // TODO: here (upon timing out) we also need to flush internal buffers in - // the storage + // connection termination is a good place to flush any remaining data + // in the event buffer - this can be considered the third kind of + // checkpointing (in addition to size-based and time-based ones) + storage.flush_event_buffer(); + logger.log(binsrv::log_severity::info, "timed out waiting for events and disconnected"); } diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index f751acc..2bd0fa8 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -107,9 +107,7 @@ storage::storage(const storage_config &config, storage::~storage() { // bugprone-empty-catch should not be that strict in destructors try { - if (has_event_data_to_flush()) { - flush_event_buffer(); - } + flush_event_buffer(); } catch (...) { // NOLINT(bugprone-empty-catch) } } @@ -255,7 +253,7 @@ void storage::write_event(util::const_byte_span event_data, checkpoint_interval_seconds_)); } if (needs_flush) { - flush_event_buffer(); + flush_event_buffer_internal(); last_checkpoint_position_ = ready_to_flush_position; last_checkpoint_timestamp_ = now_ts; @@ -264,9 +262,7 @@ void storage::write_event(util::const_byte_span event_data, } void storage::close_binlog() { - if (has_event_data_to_flush()) { - flush_event_buffer(); - } + flush_event_buffer(); event_buffer_.clear(); event_buffer_.shrink_to_fit(); @@ -289,6 +285,12 @@ void storage::discard_incomplete_transaction_events() { } void storage::flush_event_buffer() { + if (has_event_data_to_flush()) { + flush_event_buffer_internal(); + } +} + +void storage::flush_event_buffer_internal() { assert(!event_buffer_.empty()); assert(last_transaction_boundary_position_in_event_buffer_ <= std::size(event_buffer_)); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 4ce277a..491ad50 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -88,6 +88,7 @@ class [[nodiscard]] storage { void close_binlog(); void discard_incomplete_transaction_events(); + void flush_event_buffer(); private: basic_storage_backend_ptr backend_; @@ -129,7 +130,7 @@ class [[nodiscard]] storage { return get_flushed_position() + last_transaction_boundary_position_in_event_buffer_; } - void flush_event_buffer(); + void flush_event_buffer_internal(); void load_binlog_index(); void validate_binlog_index(