diff --git a/src/app.cpp b/src/app.cpp index ed8a1be..8634eb0 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -633,8 +633,11 @@ void receive_binlog_events( storage.discard_incomplete_transaction_events(); } - // TODO: here (upon timing out) we also need to flush internal buffers in - // the storage + // connection termination is a good place to flush any remaining data + // in the event buffer - this can be considered the third kind of + // checkpointing (in addition to size-based and time-based ones) + storage.flush_event_buffer(); + logger.log(binsrv::log_severity::info, "timed out waiting for events and disconnected"); } diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index f751acc..2bd0fa8 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -107,9 +107,7 @@ storage::storage(const storage_config &config, storage::~storage() { // bugprone-empty-catch should not be that strict in destructors try { - if (has_event_data_to_flush()) { - flush_event_buffer(); - } + flush_event_buffer(); } catch (...) { // NOLINT(bugprone-empty-catch) } } @@ -255,7 +253,7 @@ void storage::write_event(util::const_byte_span event_data, checkpoint_interval_seconds_)); } if (needs_flush) { - flush_event_buffer(); + flush_event_buffer_internal(); last_checkpoint_position_ = ready_to_flush_position; last_checkpoint_timestamp_ = now_ts; @@ -264,9 +262,7 @@ void storage::write_event(util::const_byte_span event_data, } void storage::close_binlog() { - if (has_event_data_to_flush()) { - flush_event_buffer(); - } + flush_event_buffer(); event_buffer_.clear(); event_buffer_.shrink_to_fit(); @@ -289,6 +285,12 @@ void storage::discard_incomplete_transaction_events() { } void storage::flush_event_buffer() { + if (has_event_data_to_flush()) { + flush_event_buffer_internal(); + } +} + +void storage::flush_event_buffer_internal() { assert(!event_buffer_.empty()); assert(last_transaction_boundary_position_in_event_buffer_ <= std::size(event_buffer_)); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 4ce277a..491ad50 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -88,6 +88,7 @@ class [[nodiscard]] storage { void close_binlog(); void discard_incomplete_transaction_events(); + void flush_event_buffer(); private: basic_storage_backend_ptr backend_; @@ -129,7 +130,7 @@ class [[nodiscard]] storage { return get_flushed_position() + last_transaction_boundary_position_in_event_buffer_; } - void flush_event_buffer(); + void flush_event_buffer_internal(); void load_binlog_index(); void validate_binlog_index(