From 37e3acf4ae18759211adcf11907c991bfc4a7efe Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 5 Feb 2026 15:22:24 +0800 Subject: [PATCH 1/2] This is an automated cherry-pick of #10700 Signed-off-by: ti-chi-bot --- .../KVStore/MultiRaft/RaftCommands.cpp | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp index c8b9fd89c86..a845e68da04 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp @@ -348,6 +348,36 @@ std::pair Region::handleWriteRaftCmd( auto is_v2 = this->getClusterRaftstoreVer() == RaftstoreVer::V2; +<<<<<<< HEAD +======= + // After removing the logic of the read thread writing to the storage engine during learner reads, + // if we remove the locks before committed data written to storage, + // it would cause concurrency control problems that could result in missing some of the latest data. + // The concurrency logic is as follows: + // 1. **Raft thread**: Receives a write log and removes the lock. + // 2. **Read thread**: Detects no lock and begins reading (missing the data written in step 3). + // 3. **Raft thread**: Writes the data. + std::vector deleting_lock_keys; + const size_t lock_count + = std::count_if(cmds.cmd_cf, cmds.cmd_cf + cmds.len, [](auto cf) { return cf == ColumnFamilyType::Lock; }); + deleting_lock_keys.reserve(lock_count); + auto update_write_size = [&write_size](Int64 payload) { + if (payload > 0) + { + write_size += static_cast(payload); + } + else if (payload < 0) + { + // A lock being rewritten could lead to a negative payload. + // Try to turn the negative payload into a positive decrement on `write_size`. + const auto dec = static_cast(-(payload + 1)) + 1; // avoid INT64_MIN overflow + if (write_size >= dec) + write_size -= dec; + else + write_size = 0; + } + }; +>>>>>>> 74de3a584a (KVStore: fix tiflash_raft_throughput_bytes warp problem (#10700)) const auto handle_by_index_func = [&](auto i) { auto type = cmds.cmd_types[i]; auto cf = cmds.cmd_cf[i]; @@ -371,6 +401,7 @@ std::pair Region::handleWriteRaftCmd( } try { +<<<<<<< HEAD if unlikely (is_v2) { // There may be orphan default key in a snapshot. @@ -380,6 +411,12 @@ std::pair Region::handleWriteRaftCmd( { write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny); } +======= + // There may be orphan default key in a snapshot under raftstore v2. + auto dup_check = is_v2 ? DupCheck::AllowSame : DupCheck::Deny; + auto payload = doInsert(cf, std::move(tikv_key), std::move(tikv_value), dup_check).payload; + update_write_size(payload); +>>>>>>> 74de3a584a (KVStore: fix tiflash_raft_throughput_bytes warp problem (#10700)) } catch (Exception & e) { From bd6a58095d5f7661167e5a0775bdb392ff57fd07 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 5 Feb 2026 16:55:33 +0800 Subject: [PATCH 2/2] Resolve conflict Signed-off-by: JaySon-Huang --- .../KVStore/MultiRaft/RaftCommands.cpp | 30 ++----------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp index a845e68da04..4a81eefb50f 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/RaftCommands.cpp @@ -348,19 +348,6 @@ std::pair Region::handleWriteRaftCmd( auto is_v2 = this->getClusterRaftstoreVer() == RaftstoreVer::V2; -<<<<<<< HEAD -======= - // After removing the logic of the read thread writing to the storage engine during learner reads, - // if we remove the locks before committed data written to storage, - // it would cause concurrency control problems that could result in missing some of the latest data. - // The concurrency logic is as follows: - // 1. **Raft thread**: Receives a write log and removes the lock. - // 2. **Read thread**: Detects no lock and begins reading (missing the data written in step 3). - // 3. **Raft thread**: Writes the data. - std::vector deleting_lock_keys; - const size_t lock_count - = std::count_if(cmds.cmd_cf, cmds.cmd_cf + cmds.len, [](auto cf) { return cf == ColumnFamilyType::Lock; }); - deleting_lock_keys.reserve(lock_count); auto update_write_size = [&write_size](Int64 payload) { if (payload > 0) { @@ -377,7 +364,6 @@ std::pair Region::handleWriteRaftCmd( write_size = 0; } }; ->>>>>>> 74de3a584a (KVStore: fix tiflash_raft_throughput_bytes warp problem (#10700)) const auto handle_by_index_func = [&](auto i) { auto type = cmds.cmd_types[i]; auto cf = cmds.cmd_cf[i]; @@ -401,22 +387,10 @@ std::pair Region::handleWriteRaftCmd( } try { -<<<<<<< HEAD - if unlikely (is_v2) - { - // There may be orphan default key in a snapshot. - write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::AllowSame); - } - else - { - write_size += doInsert(cf, std::move(tikv_key), std::move(tikv_value), DupCheck::Deny); - } -======= // There may be orphan default key in a snapshot under raftstore v2. auto dup_check = is_v2 ? DupCheck::AllowSame : DupCheck::Deny; - auto payload = doInsert(cf, std::move(tikv_key), std::move(tikv_value), dup_check).payload; + auto payload = doInsert(cf, std::move(tikv_key), std::move(tikv_value), dup_check); update_write_size(payload); ->>>>>>> 74de3a584a (KVStore: fix tiflash_raft_throughput_bytes warp problem (#10700)) } catch (Exception & e) { @@ -560,4 +534,4 @@ RegionRaftCommandDelegate & Region::makeRaftCommandDelegate(const KVStoreTaskLoc std::ignore = lock; return static_cast(*this); } -} // namespace DB \ No newline at end of file +} // namespace DB