From fb549a400786c62ec7882954f37834caab5bb5ef Mon Sep 17 00:00:00 2001 From: yongman Date: Tue, 26 May 2026 18:37:30 +0800 Subject: [PATCH 1/2] columnar: support bucket parallel read in region Signed-off-by: yongman --- .../ffi/src/RaftStoreProxyFFI/ProxyFFI.h | 2 + .../hub-runtime/src/cloud_helper.rs | 14 + .../hub-runtime/src/columnar_impls.rs | 20 +- .../hub-runtime/src/interfaces.rs | 7 + .../hub-runtime/src/run.rs | 5 +- .../Storages/StorageDisaggregatedColumnar.cpp | 663 ++++++++++++------ .../Storages/StorageDisaggregatedColumnar.h | 109 ++- 7 files changed, 565 insertions(+), 255 deletions(-) diff --git a/contrib/tiflash-columnar-hub/hub-runtime/ffi/src/RaftStoreProxyFFI/ProxyFFI.h b/contrib/tiflash-columnar-hub/hub-runtime/ffi/src/RaftStoreProxyFFI/ProxyFFI.h index ce4e60648a8..f4621ee0af6 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/ffi/src/RaftStoreProxyFFI/ProxyFFI.h +++ b/contrib/tiflash-columnar-hub/hub-runtime/ffi/src/RaftStoreProxyFFI/ProxyFFI.h @@ -228,6 +228,8 @@ struct SSTReaderInterfaces { struct CloudStorageEngineInterfaces { bool (*fn_get_keyspace_encryption)(RaftStoreProxyPtr, uint32_t); RawCppStringPtr (*fn_get_master_key)(RaftStoreProxyPtr); + RustStrWithViewVec (*fn_get_region_bucket_keys)(uint64_t, uint64_t, + RaftStoreProxyPtr); ColumnarReaderPtr (*fn_get_columnar_reader)(uint64_t, uint64_t, uint64_t, BaseBuffView, BaseBuffView, BaseBuffView, BaseBuffView, diff --git a/contrib/tiflash-columnar-hub/hub-runtime/src/cloud_helper.rs b/contrib/tiflash-columnar-hub/hub-runtime/src/cloud_helper.rs index dbbcb525000..03277c5227c 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/src/cloud_helper.rs +++ b/contrib/tiflash-columnar-hub/hub-runtime/src/cloud_helper.rs @@ -176,6 +176,16 @@ impl PdClientWithCache { pub fn get_security_mgr(&self) -> Arc { self.pd_client.get_security_mgr() } + + pub fn get_region_bucket_keys(&self, region_id: u64, region_ver: u64) -> Vec> { + let Some(bucket_stat) = self.pd_client.get_buckets(region_id) else { + return Vec::new(); + }; + if bucket_stat.meta.region_epoch.get_version() != region_ver { + return Vec::new(); + } + bucket_stat.meta.keys.clone() + } } #[derive(Clone)] @@ -444,6 +454,10 @@ impl CloudHelper { } } } + + pub fn get_region_bucket_keys(&self, region_id: u64, region_ver: u64) -> Vec> { + self.pd_client.get_region_bucket_keys(region_id, region_ver) + } } fn collect_ia_meta_files(meta_paths: &[PathBuf]) -> std::io::Result> { diff --git a/contrib/tiflash-columnar-hub/hub-runtime/src/columnar_impls.rs b/contrib/tiflash-columnar-hub/hub-runtime/src/columnar_impls.rs index 6316c2107a6..3d063267c3f 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/src/columnar_impls.rs +++ b/contrib/tiflash-columnar-hub/hub-runtime/src/columnar_impls.rs @@ -18,10 +18,10 @@ use kvengine::{CloudColumnarReaders, TableCtx}; use protobuf::{parse_from_bytes, Message}; use crate::{ - build_from_string, + build_from_string, build_from_vec_string, interfaces_ffi::{ BaseBuffView, ColumnarReaderErrorType, ColumnarReaderPtr, RaftStoreProxyPtr, RawRustPtr, - RawVoidPtr, RustStrWithView, + RawVoidPtr, RustStrWithView, RustStrWithViewVec, }, RawRustPtrType, }; @@ -73,6 +73,22 @@ impl From for ColumnarReaderPtr { } } +pub unsafe extern "C" fn ffi_get_region_bucket_keys( + region_id: u64, + region_ver: u64, + hub_ptr: RaftStoreProxyPtr, +) -> RustStrWithViewVec { + let hub = hub_ptr.as_ref(); + let bucket_keys = hub + .cloud_helper + .get_region_bucket_keys(region_id, region_ver); + if bucket_keys.is_empty() { + RustStrWithViewVec::default() + } else { + build_from_vec_string(bucket_keys) + } +} + pub unsafe extern "C" fn ffi_make_columnar_reader( shard_id: u64, shard_ver: u64, diff --git a/contrib/tiflash-columnar-hub/hub-runtime/src/interfaces.rs b/contrib/tiflash-columnar-hub/hub-runtime/src/interfaces.rs index a52c1b0233e..5f9c9485b37 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/src/interfaces.rs +++ b/contrib/tiflash-columnar-hub/hub-runtime/src/interfaces.rs @@ -355,6 +355,13 @@ pub mod root { arg1: root::DB::RaftStoreProxyPtr, ) -> root::DB::RawCppStringPtr, >, + pub fn_get_region_bucket_keys: ::std::option::Option< + unsafe extern "C" fn( + arg1: u64, + arg2: u64, + arg3: root::DB::RaftStoreProxyPtr, + ) -> root::DB::RustStrWithViewVec, + >, pub fn_get_columnar_reader: ::std::option::Option< unsafe extern "C" fn( arg1: u64, diff --git a/contrib/tiflash-columnar-hub/hub-runtime/src/run.rs b/contrib/tiflash-columnar-hub/hub-runtime/src/run.rs index 9d62d3b740a..b4b78a3e6d8 100644 --- a/contrib/tiflash-columnar-hub/hub-runtime/src/run.rs +++ b/contrib/tiflash-columnar-hub/hub-runtime/src/run.rs @@ -55,8 +55,8 @@ use tikv_util::{ use crate::{ cloud_helper::{CloudEngineBackends, CloudHelper}, columnar_impls::{ - ffi_make_columnar_reader, ffi_physical_table_id, ffi_read_block, ffi_read_column, - ffi_read_handle, ffi_read_version, + ffi_get_region_bucket_keys, ffi_make_columnar_reader, ffi_physical_table_id, + ffi_read_block, ffi_read_column, ffi_read_handle, ffi_read_version, }, domain_impls::ffi_gc_rust_ptr, engine_store_helper::{ @@ -1140,6 +1140,7 @@ fn build_hub_ffi_helper(hub: &ColumnarHub) -> RaftStoreProxyFFIHelper { cloud_storage_engine_interfaces: CloudStorageEngineInterfaces { fn_get_keyspace_encryption: Some(ffi_get_keyspace_encryption), fn_get_master_key: Some(ffi_get_master_key), + fn_get_region_bucket_keys: Some(ffi_get_region_bucket_keys), fn_get_columnar_reader: Some(ffi_make_columnar_reader), fn_read_block: Some(ffi_read_block), fn_read_handle: Some(ffi_read_handle), diff --git a/dbms/src/Storages/StorageDisaggregatedColumnar.cpp b/dbms/src/Storages/StorageDisaggregatedColumnar.cpp index 0bd82bf41eb..1ffe9dad4dc 100644 --- a/dbms/src/Storages/StorageDisaggregatedColumnar.cpp +++ b/dbms/src/Storages/StorageDisaggregatedColumnar.cpp @@ -55,6 +55,7 @@ #include #include +#include #include #include @@ -65,8 +66,151 @@ namespace ErrorCodes extern const int COLUMNAR_SNAPSHOT_ERROR; } // namespace ErrorCodes +struct RNProxyReaderSharedContext +{ + LoggerPtr log; + const Context * context = nullptr; + UInt64 start_ts = 0; + DM::ColumnDefinesPtr column_defines; + int extra_table_id_index = -1; + TableID logical_table_id = 0; + String executor_id; + String table_scan_data; + String filter_conditions_data; + String table_info_data; + String ann_query_info_data; + String fts_query_info_data; + std::shared_ptr output_lock = std::make_shared(); +}; + namespace { +using ProxyPhysicalTableRanges = std::vector>; +using BucketSplitUnit = std::pair; + +void normalizeTimestampCompareDateTimeLiteralToUTC(tipb::Expr & expr, const TimezoneInfo & timezone_info); + +struct BucketSplitResult +{ + bool has_bucket_split = false; + std::vector units; +}; + +struct RegionReaderPlan +{ + RegionID region_id; + pingcap::kv::RegionVerID region_ver_id; + ProxyPhysicalTableRanges physical_table_ranges; + std::vector bucket_units; +}; + +bool isBucketBoundaryInsideRange(const String & bucket_key, const pingcap::coprocessor::KeyRange & range) +{ + if (bucket_key.empty()) + return false; + if (!range.start_key.empty() && bucket_key <= range.start_key) + return false; + if (!range.end_key.empty() && bucket_key >= range.end_key) + return false; + return true; +} + +BucketSplitResult splitRangesByBucketKeys( + const ProxyPhysicalTableRanges & physical_table_ranges, + const std::vector & bucket_keys) +{ + BucketSplitResult result; + if (bucket_keys.size() <= 2) + return result; + + for (const auto & [table_id, ranges] : physical_table_ranges) + { + for (const auto & range : ranges) + { + String current_start = range.start_key; + bool current_range_split = false; + for (const auto & bucket_key : bucket_keys) + { + if (!isBucketBoundaryInsideRange(bucket_key, range)) + continue; + result.units.emplace_back(table_id, pingcap::coprocessor::KeyRange{current_start, bucket_key}); + current_start = bucket_key; + current_range_split = true; + } + if (!range.end_key.empty() && current_start >= range.end_key) + continue; + result.units.emplace_back(table_id, pingcap::coprocessor::KeyRange{current_start, range.end_key}); + result.has_bucket_split = result.has_bucket_split || current_range_split; + } + } + return result; +} + +void appendRangeToReaderRanges( + ProxyPhysicalTableRanges & reader_ranges, + TableID table_id, + pingcap::coprocessor::KeyRange range) +{ + auto it = std::find_if(reader_ranges.begin(), reader_ranges.end(), [&](const auto & entry) { + return std::get<0>(entry) == table_id; + }); + if (it == reader_ranges.end()) + { + reader_ranges.emplace_back(table_id, pingcap::coprocessor::KeyRanges{std::move(range)}); + return; + } + std::get<1>(*it).push_back(std::move(range)); +} + +std::vector packBucketUnitsIntoReaders( + const std::vector & units, + size_t reader_count) +{ + std::vector reader_groups; + if (units.empty() || reader_count == 0) + return reader_groups; + + reader_count = std::min(reader_count, units.size()); + reader_groups.resize(reader_count); + size_t base_unit_count = units.size() / reader_count; + size_t remainder = units.size() % reader_count; + size_t unit_index = 0; + for (size_t reader_index = 0; reader_index < reader_count; ++reader_index) + { + size_t current_unit_count = base_unit_count + (reader_index < remainder ? 1 : 0); + auto & reader_ranges = reader_groups[reader_index]; + for (size_t i = 0; i < current_unit_count; ++i) + { + const auto & [table_id, range] = units[unit_index++]; + appendRangeToReaderRanges(reader_ranges, table_id, range); + } + } + return reader_groups; +} + +std::vector getRegionBucketKeysFromProxy(const Context & context, RegionID region_id, UInt64 region_ver) +{ + const Context & global_ctx = context.getGlobalContext(); + const TiFlashRaftProxyHelper * proxy_helper = global_ctx.getSharedContextDisagg()->getColumnarProxyHelper(); + if (proxy_helper == nullptr || proxy_helper->cloud_storage_engine_interfaces.fn_get_region_bucket_keys == nullptr) + return {}; + + RustStrWithViewVec bucket_keys = proxy_helper->cloud_storage_engine_interfaces.fn_get_region_bucket_keys( + region_id, + region_ver, + proxy_helper->proxy_ptr); + SCOPE_EXIT({ + if (bucket_keys.inner.ptr != nullptr) + RustGcHelper::instance().gcRustPtr(bucket_keys.inner.ptr, bucket_keys.inner.type); + }); + + std::vector res; + res.reserve(static_cast(bucket_keys.len)); + for (size_t i = 0; i < bucket_keys.len; ++i) + res.emplace_back(bucket_keys.buffs[i].data, bucket_keys.buffs[i].len); + return res; +} + std::vector> genGeneratedColumnInfosForDisaggregatedRead( const TiDBTableScan & table_scan) { @@ -109,6 +253,87 @@ std::tuple genColumnDefinesForDisaggregatedReadThroug return {std::move(column_defines), extra_table_id_index}; } +std::shared_ptr buildProxyReaderSharedContext( + const LoggerPtr & log, + const Context & context, + UInt64 start_ts, + const TiDBTableScan & table_scan, + const FilterConditions & filter_conditions) +{ + auto shared_context = std::make_shared(); + shared_context->log = log; + shared_context->context = &context; + shared_context->start_ts = start_ts; + shared_context->logical_table_id = table_scan.getLogicalTableID(); + shared_context->executor_id = table_scan.getTableScanExecutorID(); + std::tie(shared_context->column_defines, shared_context->extra_table_id_index) + = genColumnDefinesForDisaggregatedReadThroughColumnar(table_scan); + + auto table_scan_pb = *table_scan.getTableScanPB(); + const auto & timezone_info = context.getTimezoneInfo(); + if (table_scan_pb.tp() == tipb::TypePartitionTableScan) + { + auto * pushed_down_filters + = table_scan_pb.mutable_partition_table_scan()->mutable_pushed_down_filter_conditions(); + for (int i = 0; i < pushed_down_filters->size(); ++i) + normalizeTimestampCompareDateTimeLiteralToUTC(*pushed_down_filters->Mutable(i), timezone_info); + } + else + { + auto * pushed_down_filters = table_scan_pb.mutable_tbl_scan()->mutable_pushed_down_filter_conditions(); + for (int i = 0; i < pushed_down_filters->size(); ++i) + normalizeTimestampCompareDateTimeLiteralToUTC(*pushed_down_filters->Mutable(i), timezone_info); + } + shared_context->table_scan_data = table_scan_pb.SerializeAsString(); + + auto conditions = filter_conditions.conditions; + for (int i = 0; i < conditions.size(); ++i) + normalizeTimestampCompareDateTimeLiteralToUTC(*conditions.Mutable(i), timezone_info); + for (const auto & condition : conditions) + { + auto data = condition.SerializeAsString(); + uint32_t len = data.size(); + shared_context->filter_conditions_data.append(reinterpret_cast(&len), sizeof(len)); + shared_context->filter_conditions_data.append(data.data(), data.size()); + } + + tipb::TableInfo table_info; + bool is_partition_scan = table_scan.isPartitionTableScan(); + const auto & tidb_columns = table_scan.getColumns(); + const auto should_skip_column_for_columnar_table_info = [&](ColumnID column_id) { + if (column_id == MutSup::extra_table_id_col_id) + return true; + for (const auto & ci : tidb_columns) + { + if (ci.id == column_id && ci.hasGeneratedColumnFlag()) + return true; + } + return false; + }; + if (is_partition_scan) + { + for (const auto & column : table_scan_pb.partition_table_scan().columns()) + { + if (should_skip_column_for_columnar_table_info(column.column_id())) + continue; + *table_info.add_columns() = column; + } + } + else + { + for (const auto & column : table_scan_pb.tbl_scan().columns()) + { + if (should_skip_column_for_columnar_table_info(column.column_id())) + continue; + *table_info.add_columns() = column; + } + } + shared_context->table_info_data = table_info.SerializeAsString(); + shared_context->ann_query_info_data = table_scan.getANNQueryInfo().SerializeAsString(); + shared_context->fts_query_info_data = table_scan.getFTSQueryInfo().SerializeAsString(); + return shared_context; +} + bool isProxyFilterComparableExpr(tipb::ScalarFuncSig sig) { // Keep this aligned with proxy columnar filter supported signatures: @@ -283,20 +508,15 @@ void StorageDisaggregated::readThroughColumnar( filter_conditions, remote_table_ranges, num_streams); - auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedReadThroughColumnar(table_scan); + const auto generated_column_infos = genGeneratedColumnInfosForDisaggregatedRead(table_scan); for (auto & task : read_proxy_tasks) { group_builder.addConcurrency(RNProxySourceOp::create({ - .context = context, - .debug_tag = log->identifier(), .exec_context = exec_context, - .columns_to_read = *column_defines, .task = task, - .extra_table_id_index = extra_table_id_index, })); } - const auto generated_column_infos = genGeneratedColumnInfosForDisaggregatedRead(table_scan); executeGeneratedColumnPlaceholder(exec_context, group_builder, generated_column_infos, log); NamesAndTypes source_columns; @@ -312,44 +532,14 @@ void StorageDisaggregated::readThroughColumnar( filterConditionsWithPushedDownFilters(exec_context, group_builder, *analyzer); } -// RNProxyReaderPtr -RNProxyReaderPtr RNProxyReader::createProxyReader( - const LoggerPtr & log, - const Context & context, - RegionID region_id, - RegionVersion region_ver, - UInt64 region_conf_ver, - const std::vector> & physical_table_ranges, - UInt64 start_ts, - const TiDBTableScan & table_scan, - const FilterConditions & filter_conditions, - std::mutex & output_lock) +ColumnarReaderPtr createProxyColumnarReader( + const RNProxyReaderSharedContext & shared_context, + const RNProxyReaderPlan & reader_plan) { - auto table_scan_pb = *table_scan.getTableScanPB(); - const auto & timezone_info = context.getTimezoneInfo(); - if (table_scan_pb.tp() == tipb::TypePartitionTableScan) - { - auto * pushed_down_filters - = table_scan_pb.mutable_partition_table_scan()->mutable_pushed_down_filter_conditions(); - for (int i = 0; i < pushed_down_filters->size(); ++i) - normalizeTimestampCompareDateTimeLiteralToUTC(*pushed_down_filters->Mutable(i), timezone_info); - } - else - { - auto * pushed_down_filters = table_scan_pb.mutable_tbl_scan()->mutable_pushed_down_filter_conditions(); - for (int i = 0; i < pushed_down_filters->size(); ++i) - normalizeTimestampCompareDateTimeLiteralToUTC(*pushed_down_filters->Mutable(i), timezone_info); - } - auto table_scan_data = table_scan_pb.SerializeAsString(); - auto table_scan_view = BaseBuffView{table_scan_data.data(), table_scan_data.size()}; - auto conditions = filter_conditions.conditions; - for (int i = 0; i < conditions.size(); ++i) - normalizeTimestampCompareDateTimeLiteralToUTC(*conditions.Mutable(i), timezone_info); - // Copy pushed down filters to filter_conditions to make filterConditions works properly. - // Proxy columnar reader use pushed down filters to reduce packs load from disk and has no - // guarantee to filter all useless data, so we rely on the filterConditions to filter data. + const auto & log = shared_context.log; + const auto & context = *shared_context.context; String tables_range_data; - for (const auto & [physical_table_id, ranges] : physical_table_ranges) + for (const auto & [physical_table_id, ranges] : reader_plan.physical_table_ranges) { tables_range_data.append(reinterpret_cast(&physical_table_id), sizeof(physical_table_id)); @@ -368,65 +558,24 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( tables_range_data.append(reinterpret_cast(&ranges_data_size), sizeof(ranges_data_size)); tables_range_data.append(ranges_data.data(), ranges_data.size()); } - auto tables_range_view = BaseBuffView{tables_range_data.data(), tables_range_data.size()}; - String filter_conditions_data; - for (const auto & condition : conditions) - { - auto data = condition.SerializeAsString(); - uint32_t len = data.size(); - filter_conditions_data.append(reinterpret_cast(&len), sizeof(len)); - filter_conditions_data.append(data.data(), data.size()); - } - tipb::TableInfo table_info; - bool is_partition_scan = table_scan.isPartitionTableScan(); - const auto & tidb_columns = table_scan.getColumns(); - const auto should_skip_column_for_columnar_table_info = [&](ColumnID column_id) { - // _tidb_tid is filled locally by TiFlash, consistent with genColumnDefinesForDisaggregatedRead(). - if (column_id == MutSup::extra_table_id_col_id) - return true; - // Generated columns are not stored in kvengine; executeGeneratedColumnPlaceholder fills them later. - for (const auto & ci : tidb_columns) - { - if (ci.id == column_id && ci.hasGeneratedColumnFlag()) - return true; - } - return false; - }; - if (is_partition_scan) - { - for (const auto & column : table_scan_pb.partition_table_scan().columns()) - { - if (should_skip_column_for_columnar_table_info(column.column_id())) - continue; - *table_info.add_columns() = column; - } - } - else - { - for (const auto & column : table_scan_pb.tbl_scan().columns()) - { - if (should_skip_column_for_columnar_table_info(column.column_id())) - continue; - *table_info.add_columns() = column; - } - } - auto table_info_data = table_info.SerializeAsString(); - auto columns = BaseBuffView{table_info_data.data(), table_info_data.size()}; - auto filter_conditions_view = BaseBuffView{filter_conditions_data.data(), filter_conditions_data.size()}; - const auto & ann_query_info_pb = table_scan.getANNQueryInfo(); - const auto & fts_query_info_pb = table_scan.getFTSQueryInfo(); - auto ann_query_info_data = ann_query_info_pb.SerializeAsString(); - auto fts_query_info_data = fts_query_info_pb.SerializeAsString(); - auto ann_query_info_view = BaseBuffView{ann_query_info_data.data(), ann_query_info_data.size()}; - auto fts_query_info_view = BaseBuffView{fts_query_info_data.data(), fts_query_info_data.size()}; + BaseBuffView tables_range_view = BaseBuffView{tables_range_data.data(), tables_range_data.size()}; + BaseBuffView columns = BaseBuffView{shared_context.table_info_data.data(), shared_context.table_info_data.size()}; + BaseBuffView filter_conditions_view + = BaseBuffView{shared_context.filter_conditions_data.data(), shared_context.filter_conditions_data.size()}; + BaseBuffView table_scan_view + = BaseBuffView{shared_context.table_scan_data.data(), shared_context.table_scan_data.size()}; + BaseBuffView ann_query_info_view + = BaseBuffView{shared_context.ann_query_info_data.data(), shared_context.ann_query_info_data.size()}; + BaseBuffView fts_query_info_view + = BaseBuffView{shared_context.fts_query_info_data.data(), shared_context.fts_query_info_data.size()}; const Context & global_ctx = context.getGlobalContext(); auto * cluster = global_ctx.getTMTContext().getKVCluster(); const TiFlashRaftProxyHelper * proxy_helper = global_ctx.getSharedContextDisagg()->getColumnarProxyHelper(); RUNTIME_CHECK_MSG(proxy_helper != nullptr, "columnar proxy helper is not initialized"); ColumnarReaderPtr columnar_reader = proxy_helper->cloud_storage_engine_interfaces.fn_get_columnar_reader( - region_id, - region_ver, - start_ts, + reader_plan.region_id, + reader_plan.region_ver, + shared_context.start_ts, std::move(tables_range_view), std::move(columns), std::move(table_scan_view), @@ -434,13 +583,14 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( std::move(ann_query_info_view), std::move(fts_query_info_view), proxy_helper->proxy_ptr); - bool reader_transferred = false; + bool reader_returned = false; SCOPE_EXIT({ - if (!reader_transferred) + if (!reader_returned && columnar_reader.inner.ptr != nullptr) RustGcHelper::instance().gcRustPtr(columnar_reader.inner.ptr, columnar_reader.inner.type); }); SCOPE_EXIT({ - if (!reader_transferred && columnar_reader.error_type != ColumnarReaderErrorType::OK) + if (!reader_returned && columnar_reader.error_type != ColumnarReaderErrorType::OK + && columnar_reader.error.inner.ptr != nullptr) RustGcHelper::instance().gcRustPtr(columnar_reader.error.inner.ptr, columnar_reader.error.inner.type); }); if (columnar_reader.error_type == ColumnarReaderErrorType::RegionError) @@ -448,26 +598,25 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( auto error_msg = String(columnar_reader.error.buff.data, columnar_reader.error.buff.len); errorpb::Error region_error; region_error.ParseFromString(error_msg); - auto region_ver_id = pingcap::kv::RegionVerID(region_id, region_conf_ver, region_ver); + auto region_ver_id + = pingcap::kv::RegionVerID(reader_plan.region_id, reader_plan.region_conf_ver, reader_plan.region_ver); // Refresh region cache and throw an exception for retrying. if (region_error.has_epoch_not_match()) { RegionException::UnavailableRegions unavailable_regions; String region_id_ver; // region_id:region_ver:conf_ver - std::unordered_set retry_regions; for (const auto & region : region_error.epoch_not_match().current_regions()) { unavailable_regions.insert(region.id()); - retry_regions.insert(region.id()); - region_id_ver = std::to_string(region.id()) + ":" + std::to_string(region_ver) + ":" + region_id_ver = std::to_string(region.id()) + ":" + std::to_string(reader_plan.region_ver) + ":" + std::to_string(region.region_epoch().conf_ver()); } - auto guard = std::lock_guard(output_lock); + auto _guard = std::lock_guard(*shared_context.output_lock); cluster->region_cache->dropRegion(region_ver_id); LOG_WARNING( log, "create columnar reader failed region_id={}, epoch not match {}", - std::to_string(region_id), + std::to_string(reader_plan.region_id), region_ver_id.toString()); throw RegionException( std::move(unavailable_regions), @@ -477,17 +626,15 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( else { RegionException::UnavailableRegions unavailable_regions; - std::unordered_set retry_regions; auto err_region_id = 0; if (region_error.has_region_not_found()) { err_region_id = region_error.region_not_found().region_id(); unavailable_regions.insert(err_region_id); - retry_regions.insert(err_region_id); LOG_WARNING( log, "create columnar reader failed region_id={}, region not found {}", - std::to_string(region_id), + std::to_string(reader_plan.region_id), std::to_string(err_region_id)); } else @@ -495,15 +642,15 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( LOG_WARNING( log, "create columnar reader failed region_id={}, {}", - std::to_string(region_id), + std::to_string(reader_plan.region_id), region_error.ShortDebugString()); } - auto guard = std::lock_guard(output_lock); + auto _guard = std::lock_guard(*shared_context.output_lock); cluster->region_cache->dropRegion(region_ver_id); throw RegionException( std::move(unavailable_regions), RegionException::RegionReadStatus::NOT_FOUND, - std::to_string(region_id).c_str()); + std::to_string(reader_plan.region_id).c_str()); } } else if (columnar_reader.error_type == ColumnarReaderErrorType::LockedError) @@ -515,8 +662,8 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( pingcap::kv::Backoffer bo(pingcap::kv::copNextMaxBackoff); std::vector pushed; std::vector locks{std::make_shared(lock_info)}; - auto guard = std::lock_guard(output_lock); - auto before_expired = cluster->lock_resolver->resolveLocks(bo, start_ts, locks, pushed); + auto _guard = std::lock_guard(*shared_context.output_lock); + auto before_expired = cluster->lock_resolver->resolveLocks(bo, shared_context.start_ts, locks, pushed); LOG_WARNING(log, "Finished resolve locks, before_expired={}", before_expired); throw Exception("lock error", ErrorCodes::COLUMNAR_SNAPSHOT_ERROR); } @@ -538,22 +685,99 @@ RNProxyReaderPtr RNProxyReader::createProxyReader( throw Exception(ErrorCodes::COLUMNAR_SNAPSHOT_ERROR, "{}", error_msg); } - // Create input stream. - auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedReadThroughColumnar(table_scan); - BlockInputStreamPtr input_stream = RNProxyInputStream::create({ - .context = context, - .debug_tag = log->identifier(), - .columns_to_read = *column_defines, - .reader = columnar_reader, - .extra_table_id_index = extra_table_id_index, - .table_id = table_scan.getLogicalTableID(), - .executor_id = table_scan.getTableScanExecutorID(), - }); - reader_transferred = true; - return std::make_shared(input_stream); + reader_returned = true; + return columnar_reader; } // RNProxyReadTask +RNProxyReadTask::RNProxyReadTask( + std::vector reader_plans_, + std::shared_ptr shared_reader_context_) + : reader_plans(std::move(reader_plans_)) + , shared_reader_context(std::move(shared_reader_context_)) +{} + +size_t RNProxyReadTask::getReaderCount() const +{ + return reader_plans.size(); +} + +const Context & RNProxyReadTask::getContext() const +{ + return *shared_reader_context->context; +} + +const LoggerPtr & RNProxyReadTask::getLog() const +{ + return shared_reader_context->log; +} + +const DM::ColumnDefines & RNProxyReadTask::getColumnsToRead() const +{ + return *shared_reader_context->column_defines; +} + +int RNProxyReadTask::getExtraTableIDIndex() const +{ + return shared_reader_context->extra_table_id_index; +} + +TableID RNProxyReadTask::getLogicalTableID() const +{ + return shared_reader_context->logical_table_id; +} + +const String & RNProxyReadTask::getExecutorID() const +{ + return shared_reader_context->executor_id; +} + +ColumnarReaderPtr RNProxyReadTask::createColumnarReaderWithBackoff(size_t reader_index) const +{ + RUNTIME_CHECK(reader_index < reader_plans.size()); + const auto & reader_plan = reader_plans[reader_index]; + pingcap::kv::Backoffer bo(pingcap::kv::copNextMaxBackoff); + while (true) + { + try + { + LOG_INFO( + getLog(), + "materialize proxy reader for tables in region, region_id={}, table_num={}", + reader_plan.region_id, + reader_plan.physical_table_ranges.size()); + return createProxyColumnarReader(*shared_reader_context, reader_plan); + } + catch (RegionException & e) + { + LOG_WARNING(getLog(), "create proxy reader failed, backoff and retry, {}", e.message()); + bo.backoff(pingcap::kv::boRegionMiss, pingcap::Exception(e.message(), e.code())); + } + catch (Exception & e) + { + if (e.code() != ErrorCodes::COLUMNAR_SNAPSHOT_ERROR) + throw; + LOG_WARNING(getLog(), "create proxy reader failed, backoff and retry, {}", e.message()); + bo.backoff(pingcap::kv::boRegionMiss, pingcap::Exception(e.message(), e.code())); + } + } +} + +BlockInputStreamPtr RNProxyReadTask::createInputStream(size_t reader_index) +{ + RUNTIME_CHECK(reader_index < reader_plans.size()); + return RNProxyInputStream::create({ + .context = getContext(), + .log = getLog(), + .task = shared_from_this(), + .reader_index = reader_index, + .columns_to_read = getColumnsToRead(), + .extra_table_id_index = getExtraTableIDIndex(), + .table_id = getLogicalTableID(), + .executor_id = getExecutorID(), + }); +} + std::vector RNProxyReadTask::buildProxyReadTaskWithBackoff( const LoggerPtr & log, const Context & context, @@ -608,6 +832,7 @@ std::vector RNProxyReadTask::buildProxyReadTask( auto scan_context = std::make_shared(dag_context->getKeyspaceID(), dag_context->getResourceGroupName()); dag_context->scan_context_map[table_scan.getTableScanExecutorID()] = scan_context; + auto shared_reader_context = buildProxyReaderSharedContext(log, context, start_ts, table_scan, filter_conditions); std::vector tasks; // Collect all regions in the table scan. @@ -663,102 +888,141 @@ std::vector RNProxyReadTask::buildProxyReadTask( } unsigned region_num = all_remote_regions_by_region.size(); unsigned physical_table_num = physical_table_ids.size(); - unsigned real_num_streams = std::min(num_streams, region_num); - // Regions per RNProxyReader, it should be ceil of region_num / real_num_streams. - // `regions_per_reader` is the ceil of the division, so the concurrency may be less than `real_num_streams`. - unsigned regions_per_reader = (region_num + real_num_streams - 1) / real_num_streams; + const bool enable_bucket_parallel = !table_scan.keepOrder() && num_streams > region_num; + std::vector region_reader_plans; + region_reader_plans.reserve(region_num); + size_t total_max_reader_num = region_num; + for (const auto & [region_id, physical_table_ranges] : all_remote_regions_by_region) + { + RegionReaderPlan plan{ + .region_id = region_id, + .region_ver_id = region_ver_ids[region_id], + .physical_table_ranges = physical_table_ranges, + }; + if (enable_bucket_parallel) + { + auto bucket_keys = getRegionBucketKeysFromProxy(context, region_id, plan.region_ver_id.ver); + auto split_result = splitRangesByBucketKeys(physical_table_ranges, bucket_keys); + if (split_result.has_bucket_split && split_result.units.size() > 1) + { + total_max_reader_num += split_result.units.size() - 1; + plan.bucket_units = std::move(split_result.units); + } + } + region_reader_plans.emplace_back(std::move(plan)); + } + + std::vector reader_count_per_region(region_reader_plans.size(), 1); + if (enable_bucket_parallel) + { + size_t target_reader_num = std::min(total_max_reader_num, static_cast(num_streams)); + size_t extra_reader_budget = target_reader_num > region_num ? target_reader_num - region_num : 0; + while (extra_reader_budget > 0) + { + bool allocated = false; + for (size_t i = 0; i < region_reader_plans.size() && extra_reader_budget > 0; ++i) + { + const auto max_reader_count + = region_reader_plans[i].bucket_units.empty() ? 1 : region_reader_plans[i].bucket_units.size(); + if (reader_count_per_region[i] >= max_reader_count) + continue; + ++reader_count_per_region[i]; + --extra_reader_budget; + allocated = true; + } + if (!allocated) + break; + } + } + + size_t planned_reader_num = 0; + for (auto reader_count : reader_count_per_region) + planned_reader_num += reader_count; LOG_INFO( log, - "region_num={}, table_num={}, num_streams={}, real_num_streams={}, regions_per_reader={}", + "region_num={}, table_num={}, num_streams={}, keep_order={}, bucket_parallel={}, planned_reader_num={}, " + "max_reader_num={}", region_num, physical_table_num, num_streams, - real_num_streams, - regions_per_reader); - unsigned reader_idx = 0; - std::vector all_readers; - std::mutex output_lock; - auto thread_manager = newThreadManager(); + table_scan.keepOrder(), + enable_bucket_parallel, + planned_reader_num, + total_max_reader_num); - for (const auto & [region_id, physical_table_ranges] : all_remote_regions_by_region) + std::vector all_reader_plans; + all_reader_plans.reserve(planned_reader_num); + + for (size_t i = 0; i < region_reader_plans.size(); ++i) { - auto region_ver = region_ver_ids[region_id].ver; - auto region_conf_ver = region_ver_ids[region_id].conf_ver; - thread_manager->schedule( - true, - "createProxyReader", - [log, - &context, - region_id, - region_ver, - region_conf_ver, - physical_table_ranges, - start_ts, - &table_scan, - &filter_conditions, - &output_lock, - &all_readers] { - LOG_INFO( - log, - "create proxy reader for tables in region, region_id={}, table_num={}", - region_id, - physical_table_ranges.size()); - auto reader_ptr = RNProxyReader::createProxyReader( - log, - context, - region_id, - region_ver, - region_conf_ver, - physical_table_ranges, - start_ts, - table_scan, - filter_conditions, - output_lock); - { - std::lock_guard lock(output_lock); - all_readers.push_back(reader_ptr); - } + const auto & plan = region_reader_plans[i]; + auto reader_groups = plan.bucket_units.empty() || reader_count_per_region[i] <= 1 + ? std::vector{plan.physical_table_ranges} + : packBucketUnitsIntoReaders(plan.bucket_units, reader_count_per_region[i]); + for (const auto & physical_table_ranges : reader_groups) + { + all_reader_plans.push_back(RNProxyReaderPlan{ + .region_id = plan.region_id, + .region_ver = plan.region_ver_id.ver, + .region_conf_ver = plan.region_ver_id.conf_ver, + .physical_table_ranges = physical_table_ranges, }); + } } - thread_manager->wait(); - - std::vector readers; - for (auto & reader : all_readers) + unsigned reader_num = all_reader_plans.size(); + if (reader_num == 0) + return tasks; + unsigned real_num_streams = std::min(num_streams, reader_num); + // Readers per RNProxyReadTask, it should be ceil of reader_num / real_num_streams. + unsigned readers_per_task = (reader_num + real_num_streams - 1) / real_num_streams; + unsigned reader_idx = 0; + std::vector readers; + for (auto & reader_plan : all_reader_plans) { ++reader_idx; - readers.push_back(reader); - if (reader_idx == regions_per_reader) + readers.push_back(std::move(reader_plan)); + if (reader_idx == readers_per_task) { reader_idx = 0; - tasks.push_back(std::make_shared(std::move(readers))); + tasks.push_back(std::make_shared(std::move(readers), shared_reader_context)); readers.clear(); } } if (!readers.empty()) { - tasks.push_back(std::make_shared(std::move(readers))); + tasks.push_back(std::make_shared(std::move(readers), shared_reader_context)); } return tasks; } -BlockInputStreams RNProxyReadTask::getInputStreams() const +BlockInputStreams RNProxyReadTask::getInputStreams() { BlockInputStreams streams; - streams.reserve(proxy_readers.size()); - for (const auto & reader : proxy_readers) + streams.reserve(reader_plans.size()); + for (size_t reader_index = 0; reader_index < reader_plans.size(); ++reader_index) { - streams.push_back(reader->getInputStream()); + streams.push_back(createInputStream(reader_index)); } return streams; } // RNProxyInputStream +void RNProxyInputStream::ensureReader() +{ + if (reader.has_value()) + return; + reader.emplace(task->createColumnarReaderWithBackoff(reader_index)); +} + RNProxyInputStream::~RNProxyInputStream() { - SCOPE_EXIT({ RustGcHelper::instance().gcRustPtr(reader.inner.ptr, reader.inner.type); }); + SCOPE_EXIT({ + if (reader.has_value() && reader->inner.ptr != nullptr) + RustGcHelper::instance().gcRustPtr(reader->inner.ptr, reader->inner.type); + }); try { LOG_INFO( @@ -800,11 +1064,12 @@ Block RNProxyInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[ma { if (done) return {}; + ensureReader(); const Context & global_ctx = context.getGlobalContext(); const TiFlashRaftProxyHelper * proxy_helper = global_ctx.getSharedContextDisagg()->getColumnarProxyHelper(); RUNTIME_CHECK_MSG(proxy_helper != nullptr, "columnar proxy helper is not initialized"); Stopwatch w{CLOCK_MONOTONIC_COARSE}; - UInt64 rows = proxy_helper->cloud_storage_engine_interfaces.fn_read_block(reader, batch_size); + UInt64 rows = proxy_helper->cloud_storage_engine_interfaces.fn_read_block(reader.value(), batch_size); duration_read_sec += w.elapsedSecondsFromLastTime(); LOG_DEBUG(log, "Read {} rows from proxy", rows); if (rows == std::numeric_limits::max()) @@ -813,7 +1078,10 @@ Block RNProxyInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[ma throw Exception("read_block failed in tiflash-proxy", ErrorCodes::LOGICAL_ERROR); } if (rows == 0) + { + done = true; return {}; + } TableID physical_table_id = -1; Block header = getHeader(); @@ -832,9 +1100,9 @@ Block RNProxyInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[ma Int64 col_id = col_type_and_name[i].column_id; if (col_id == MutSup::extra_handle_id) { - RustStrWithView col_data = proxy_helper->cloud_storage_engine_interfaces.fn_read_handle(reader); + RustStrWithView col_data = proxy_helper->cloud_storage_engine_interfaces.fn_read_handle(reader.value()); SCOPE_EXIT({ RustGcHelper::instance().gcRustPtr(col_data.inner.ptr, col_data.inner.type); }); - physical_table_id = proxy_helper->cloud_storage_engine_interfaces.fn_physical_table_id(reader); + physical_table_id = proxy_helper->cloud_storage_engine_interfaces.fn_physical_table_id(reader.value()); ReadBufferFromMemory buf(col_data.buff.data, static_cast(col_data.buff.len)); auto & col = *columns[i]; col_type_and_name[i].type->deserializeBinaryBulkWithMultipleStreams( @@ -851,9 +1119,10 @@ Block RNProxyInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[ma } else { - RustStrWithView col_data = proxy_helper->cloud_storage_engine_interfaces.fn_read_column(reader, col_id); + RustStrWithView col_data + = proxy_helper->cloud_storage_engine_interfaces.fn_read_column(reader.value(), col_id); SCOPE_EXIT({ RustGcHelper::instance().gcRustPtr(col_data.inner.ptr, col_data.inner.type); }); - physical_table_id = proxy_helper->cloud_storage_engine_interfaces.fn_physical_table_id(reader); + physical_table_id = proxy_helper->cloud_storage_engine_interfaces.fn_physical_table_id(reader.value()); ReadBufferFromMemory buf(col_data.buff.data, static_cast(col_data.buff.len)); auto & col = *columns[i]; col_type_and_name[i].type->deserializeBinaryBulkWithMultipleStreams( @@ -940,9 +1209,12 @@ OperatorStatus RNProxySourceOp::executeIOImpl() return awaitImpl(); } + if (!current_input_stream) + current_input_stream = task->createInputStream(static_cast(current_reader_idx)); + FilterPtr filter_ignored = nullptr; Stopwatch w{CLOCK_MONOTONIC_COARSE}; - Block block = task->getProxyReaders()[current_reader_idx]->getInputStream()->read(filter_ignored, false); + Block block = current_input_stream->read(filter_ignored, false); duration_read_sec += w.elapsedSeconds(); if likely (block && block.rows() > 0) { @@ -952,11 +1224,12 @@ OperatorStatus RNProxySourceOp::executeIOImpl() } else { - if (current_reader_idx == static_cast(task->getProxyReaders().size() - 1)) + current_input_stream.reset(); + if (current_reader_idx == static_cast(task->getReaderCount() - 1)) { done = true; } - else if (current_reader_idx < static_cast(task->getProxyReaders().size() - 1)) + else if (current_reader_idx < static_cast(task->getReaderCount() - 1)) { ++current_reader_idx; } diff --git a/dbms/src/Storages/StorageDisaggregatedColumnar.h b/dbms/src/Storages/StorageDisaggregatedColumnar.h index 64a93e5775a..80a33946f56 100644 --- a/dbms/src/Storages/StorageDisaggregatedColumnar.h +++ b/dbms/src/Storages/StorageDisaggregatedColumnar.h @@ -48,49 +48,24 @@ class RSOperator; using RSOperatorPtr = std::shared_ptr; } // namespace DM -class RNProxyReader; -using RNProxyReaderPtr = std::shared_ptr; -class RNProxyReader : boost::noncopyable -{ -public: - static RNProxyReaderPtr createProxyReader( - const LoggerPtr & log, - const Context & context, - RegionID region_id, - RegionVersion region_ver, - UInt64 region_conf_ver, - const std::vector> & physical_table_ranges, - UInt64 start_ts, - const TiDBTableScan & table_scan, - const FilterConditions & filter_conditions, - std::mutex & output_lock); - - BlockInputStreamPtr getInputStream() const - { - RUNTIME_CHECK(input_stream != nullptr); - return input_stream; - } +struct RNProxyReaderSharedContext; - RNProxyReader(BlockInputStreamPtr input_stream) - : input_stream(input_stream) - {} - -private: - BlockInputStreamPtr input_stream; +struct RNProxyReaderPlan +{ + RegionID region_id; + RegionVersion region_ver; + UInt64 region_conf_ver; + std::vector> physical_table_ranges; }; class RNProxyReadTask; using RNProxyReadTaskPtr = std::shared_ptr; -class RNProxyReadTask : boost::noncopyable +class RNProxyReadTask + : public boost::noncopyable + , public std::enable_shared_from_this { public: using RemoteTableRange = std::pair; - const std::vector proxy_readers; - - static RNProxyReadTaskPtr create(const std::vector & proxy_readers) - { - return std::shared_ptr(new RNProxyReadTask(proxy_readers)); - } static std::vector buildProxyReadTaskWithBackoff( const LoggerPtr & log, @@ -110,13 +85,33 @@ class RNProxyReadTask : boost::noncopyable const std::vector & remote_table_ranges, unsigned num_streams); - BlockInputStreams getInputStreams() const; + BlockInputStreams getInputStreams(); + + BlockInputStreamPtr createInputStream(size_t reader_index); + + ColumnarReaderPtr createColumnarReaderWithBackoff(size_t reader_index) const; + + size_t getReaderCount() const; - std::vector getProxyReaders() { return proxy_readers; } + const Context & getContext() const; - RNProxyReadTask(const std::vector & proxy_readers) - : proxy_readers(proxy_readers) - {} + const LoggerPtr & getLog() const; + + const DM::ColumnDefines & getColumnsToRead() const; + + int getExtraTableIDIndex() const; + + TableID getLogicalTableID() const; + + const String & getExecutorID() const; + + RNProxyReadTask( + std::vector reader_plans, + std::shared_ptr shared_reader_context); + +private: + std::vector reader_plans; + std::shared_ptr shared_reader_context; }; class RNProxyInputStream : public IProfilingBlockInputStream @@ -139,9 +134,10 @@ class RNProxyInputStream : public IProfilingBlockInputStream struct Options { const Context & context; - std::string_view debug_tag; + LoggerPtr log; + RNProxyReadTaskPtr task; + size_t reader_index; const DM::ColumnDefines & columns_to_read; - ColumnarReaderPtr reader; int extra_table_id_index; TableID table_id; const String & executor_id; @@ -149,8 +145,9 @@ class RNProxyInputStream : public IProfilingBlockInputStream explicit RNProxyInputStream(const Options & options) : context(options.context) - , log(Logger::get(options.debug_tag)) - , reader(options.reader) + , log(options.log) + , task(options.task) + , reader_index(options.reader_index) , action(options.columns_to_read, options.extra_table_id_index) , table_id(options.table_id) , executor_id(options.executor_id) @@ -162,9 +159,13 @@ class RNProxyInputStream : public IProfilingBlockInputStream static BlockInputStreamPtr create(const Options & options) { return std::make_shared(options); } private: + void ensureReader(); + const Context & context; const LoggerPtr log; - ColumnarReaderPtr reader; + RNProxyReadTaskPtr task; + size_t reader_index; + std::optional reader; AddExtraTableIDColumnTransformAction action; TableID table_id; const String & executor_id; @@ -185,23 +186,19 @@ class RNProxySourceOp : public SourceOp public: struct Options { - const Context & context; - std::string_view debug_tag; PipelineExecutorContext & exec_context; - const DM::ColumnDefines & columns_to_read; RNProxyReadTaskPtr task; - int extra_table_id_index; }; explicit RNProxySourceOp(const Options & options) - : SourceOp(options.exec_context, String(options.debug_tag)) - , context(options.context) - , log(Logger::get(options.debug_tag)) + : SourceOp(options.exec_context, options.task->getLog()->identifier()) + , context(options.task->getContext()) + , log(options.task->getLog()) , task(options.task) - , action(options.columns_to_read, options.extra_table_id_index) { - // Keep header aligned with genNamesAndTypesForTableScan when TiDB requests _tidb_tid on partition scans. - setHeader(action.getHeader()); + setHeader(AddExtraTableIDColumnTransformAction::buildHeader( + options.task->getColumnsToRead(), + options.task->getExtraTableIDIndex())); } static SourceOpPtr create(const Options & options) { return std::make_unique(options); } @@ -225,10 +222,10 @@ class RNProxySourceOp : public SourceOp const Context & context; const LoggerPtr log; RNProxyReadTaskPtr task; - AddExtraTableIDColumnTransformAction action; size_t total_rows = 0; Int32 current_reader_idx = -1; + BlockInputStreamPtr current_input_stream; // Temporarily store the block read from current_seg_task->stream and pass it to downstream operators in readImpl. std::optional t_block = std::nullopt; From 3989666e23613da08897464867edf510d23a7144 Mon Sep 17 00:00:00 2001 From: yongman Date: Tue, 2 Jun 2026 21:45:26 +0800 Subject: [PATCH 2/2] optimize tasks dispatch Signed-off-by: yongman --- .../Storages/StorageDisaggregatedColumnar.cpp | 241 ++++++++++++++---- .../Storages/StorageDisaggregatedColumnar.h | 42 ++- 2 files changed, 229 insertions(+), 54 deletions(-) diff --git a/dbms/src/Storages/StorageDisaggregatedColumnar.cpp b/dbms/src/Storages/StorageDisaggregatedColumnar.cpp index 1ffe9dad4dc..90e196d2fb7 100644 --- a/dbms/src/Storages/StorageDisaggregatedColumnar.cpp +++ b/dbms/src/Storages/StorageDisaggregatedColumnar.cpp @@ -509,12 +509,22 @@ void StorageDisaggregated::readThroughColumnar( remote_table_ranges, num_streams); const auto generated_column_infos = genGeneratedColumnInfosForDisaggregatedRead(table_scan); - for (auto & task : read_proxy_tasks) + if (!read_proxy_tasks.empty()) { - group_builder.addConcurrency(RNProxySourceOp::create({ - .exec_context = exec_context, - .task = task, - })); + auto & task_pool = read_proxy_tasks.front(); + const size_t source_num = std::min(num_streams, task_pool->getReaderCount()); + LOG_INFO( + log, + "use shared proxy reader task pool, reader_num={}, source_num={}", + task_pool->getReaderCount(), + source_num); + for (size_t i = 0; i < source_num; ++i) + { + group_builder.addConcurrency(RNProxySourceOp::create({ + .exec_context = exec_context, + .task = task_pool, + })); + } } executeGeneratedColumnPlaceholder(exec_context, group_builder, generated_column_infos, log); @@ -690,12 +700,22 @@ ColumnarReaderPtr createProxyColumnarReader( } // RNProxyReadTask +RNProxyReaderSlot::~RNProxyReaderSlot() +{ + if (reader.has_value() && reader->inner.ptr != nullptr) + RustGcHelper::instance().gcRustPtr(reader->inner.ptr, reader->inner.type); +} + RNProxyReadTask::RNProxyReadTask( std::vector reader_plans_, std::shared_ptr shared_reader_context_) : reader_plans(std::move(reader_plans_)) , shared_reader_context(std::move(shared_reader_context_)) -{} +{ + reader_slots.reserve(reader_plans.size()); + for (size_t i = 0; i < reader_plans.size(); ++i) + reader_slots.emplace_back(std::make_shared()); +} size_t RNProxyReadTask::getReaderCount() const { @@ -763,6 +783,134 @@ ColumnarReaderPtr RNProxyReadTask::createColumnarReaderWithBackoff(size_t reader } } +ColumnarReaderPtr RNProxyReadTask::getOrCreateReader(size_t reader_index) +{ + RUNTIME_CHECK(reader_index < reader_slots.size()); + auto slot = reader_slots[reader_index]; + bool should_create_inline = false; + { + std::unique_lock lock(slot->mutex); + switch (slot->state) + { + case RNProxyReaderMaterializeState::Ready: + { + auto reader = std::move(slot->reader); + slot->reader.reset(); + slot->state = RNProxyReaderMaterializeState::Consumed; + return reader.value(); + } + case RNProxyReaderMaterializeState::Failed: + std::rethrow_exception(slot->exception); + case RNProxyReaderMaterializeState::Consumed: + throw Exception(ErrorCodes::LOGICAL_ERROR, "proxy reader {} is already consumed", reader_index); + case RNProxyReaderMaterializeState::Creating: + slot->cv.wait(lock, [&] { return slot->state != RNProxyReaderMaterializeState::Creating; }); + if (slot->state == RNProxyReaderMaterializeState::Ready) + { + auto reader = std::move(slot->reader); + slot->reader.reset(); + slot->state = RNProxyReaderMaterializeState::Consumed; + return reader.value(); + } + if (slot->state == RNProxyReaderMaterializeState::Failed) + std::rethrow_exception(slot->exception); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "proxy reader {} becomes invalid after wait, state={}", + reader_index, + static_cast(slot->state)); + case RNProxyReaderMaterializeState::NotStarted: + slot->state = RNProxyReaderMaterializeState::Creating; + should_create_inline = true; + break; + } + } + + RUNTIME_CHECK(should_create_inline); + LOG_INFO( + getLog(), + "materialize proxy reader synchronously, reader_index={}, region_id={}", + reader_index, + reader_plans[reader_index].region_id); + try + { + auto reader = createColumnarReaderWithBackoff(reader_index); + { + auto guard = std::lock_guard(slot->mutex); + slot->state = RNProxyReaderMaterializeState::Consumed; + } + slot->cv.notify_all(); + return reader; + } + catch (...) + { + { + auto guard = std::lock_guard(slot->mutex); + slot->exception = std::current_exception(); + slot->state = RNProxyReaderMaterializeState::Failed; + } + slot->cv.notify_all(); + throw; + } +} + +void RNProxyReadTask::prefetchReader(size_t reader_index) +{ + if (reader_index >= reader_slots.size()) + return; + + std::call_once(prefetch_thread_manager_once, [&] { prefetch_thread_manager = newThreadManager(); }); + + auto slot = reader_slots[reader_index]; + { + auto guard = std::lock_guard(slot->mutex); + if (slot->state != RNProxyReaderMaterializeState::NotStarted) + return; + slot->state = RNProxyReaderMaterializeState::Creating; + } + + prefetch_thread_manager->scheduleThenDetach( + true, + "PrefetchRNProxyReader", + [self = shared_from_this(), slot, reader_index] { + LOG_INFO( + self->getLog(), + "materialize proxy reader asynchronously, reader_index={}, region_id={}", + reader_index, + self->reader_plans[reader_index].region_id); + try + { + auto reader = self->createColumnarReaderWithBackoff(reader_index); + { + auto guard = std::lock_guard(slot->mutex); + if (slot->state == RNProxyReaderMaterializeState::Consumed) + return; + slot->reader.emplace(std::move(reader)); + slot->state = RNProxyReaderMaterializeState::Ready; + } + } + catch (...) + { + { + auto guard = std::lock_guard(slot->mutex); + if (slot->state == RNProxyReaderMaterializeState::Consumed) + return; + slot->exception = std::current_exception(); + slot->state = RNProxyReaderMaterializeState::Failed; + } + } + slot->cv.notify_all(); + }); +} + +std::optional RNProxyReadTask::tryAcquireReaderIndex() +{ + const size_t reader_index = next_reader_index.fetch_add(1, std::memory_order_relaxed); + if (reader_index >= reader_plans.size()) + return std::nullopt; + return reader_index; +} + BlockInputStreamPtr RNProxyReadTask::createInputStream(size_t reader_index) { RUNTIME_CHECK(reader_index < reader_plans.size()); @@ -970,31 +1118,9 @@ std::vector RNProxyReadTask::buildProxyReadTask( } } - unsigned reader_num = all_reader_plans.size(); - if (reader_num == 0) + if (all_reader_plans.empty()) return tasks; - unsigned real_num_streams = std::min(num_streams, reader_num); - // Readers per RNProxyReadTask, it should be ceil of reader_num / real_num_streams. - unsigned readers_per_task = (reader_num + real_num_streams - 1) / real_num_streams; - unsigned reader_idx = 0; - std::vector readers; - for (auto & reader_plan : all_reader_plans) - { - ++reader_idx; - readers.push_back(std::move(reader_plan)); - if (reader_idx == readers_per_task) - { - reader_idx = 0; - tasks.push_back(std::make_shared(std::move(readers), shared_reader_context)); - readers.clear(); - } - } - - if (!readers.empty()) - { - tasks.push_back(std::make_shared(std::move(readers), shared_reader_context)); - } - + tasks.push_back(std::make_shared(std::move(all_reader_plans), shared_reader_context)); return tasks; } @@ -1014,7 +1140,7 @@ void RNProxyInputStream::ensureReader() { if (reader.has_value()) return; - reader.emplace(task->createColumnarReaderWithBackoff(reader_index)); + reader.emplace(task->getOrCreateReader(reader_index)); } RNProxyInputStream::~RNProxyInputStream() @@ -1156,11 +1282,28 @@ Block RNProxyInputStream::readImpl([[maybe_unused]] FilterPtr & res_filter, [[ma void RNProxySourceOp::operateSuffixImpl() { UNUSED(context); - LOG_INFO(log, "Finished reading proxy snapshots, rows={} cost={:.3f}s", total_rows, duration_read_sec); + const double total_cost_sec = total_cost_watch.elapsedSeconds(); + const UInt64 rows_per_sec + = total_cost_sec > 0 ? static_cast(static_cast(total_rows) / total_cost_sec) : 0; + const UInt64 bytes_per_sec + = total_cost_sec > 0 ? static_cast(static_cast(total_bytes) / total_cost_sec) : 0; + LOG_INFO( + log, + "Finished reading proxy snapshots, task_pool_worker_total_cost={:.3f}s claimed_streams={} rows={} " + "rows_per_sec={} " + "bytes={} bytes_per_sec={} read_cost={:.3f}s", + total_cost_sec, + total_streams, + total_rows, + rows_per_sec, + total_bytes, + bytes_per_sec, + duration_read_sec); } void RNProxySourceOp::operatePrefixImpl() { + total_cost_watch.restart(); LOG_INFO(log, "Begin reading proxy snapshots"); } @@ -1179,7 +1322,7 @@ OperatorStatus RNProxySourceOp::readImpl(Block & block) return OperatorStatus::HAS_OUTPUT; } - return current_reader_idx < 0 ? OperatorStatus::IO_IN : awaitImpl(); + return awaitImpl(); } OperatorStatus RNProxySourceOp::awaitImpl() @@ -1189,11 +1332,6 @@ OperatorStatus RNProxySourceOp::awaitImpl() return OperatorStatus::HAS_OUTPUT; } - if (unlikely(current_reader_idx < 0)) - { - current_reader_idx = 0; - } - return OperatorStatus::IO_IN; } @@ -1204,14 +1342,20 @@ OperatorStatus RNProxySourceOp::executeIOImpl() return OperatorStatus::HAS_OUTPUT; } - if (unlikely(current_reader_idx < 0)) + if (!current_input_stream) { - return awaitImpl(); + auto next_reader_idx = task->tryAcquireReaderIndex(); + if (!next_reader_idx.has_value()) + { + done = true; + return OperatorStatus::HAS_OUTPUT; + } + current_reader_idx = next_reader_idx; + current_input_stream = task->createInputStream(current_reader_idx.value()); + ++total_streams; + task->prefetchReader(current_reader_idx.value() + 1); } - if (!current_input_stream) - current_input_stream = task->createInputStream(static_cast(current_reader_idx)); - FilterPtr filter_ignored = nullptr; Stopwatch w{CLOCK_MONOTONIC_COARSE}; Block block = current_input_stream->read(filter_ignored, false); @@ -1219,21 +1363,14 @@ OperatorStatus RNProxySourceOp::executeIOImpl() if likely (block && block.rows() > 0) { total_rows += block.rows(); + total_bytes += block.bytes(); t_block.emplace(std::move(block)); return OperatorStatus::HAS_OUTPUT; } else { current_input_stream.reset(); - if (current_reader_idx == static_cast(task->getReaderCount() - 1)) - { - done = true; - } - else if (current_reader_idx < static_cast(task->getReaderCount() - 1)) - { - ++current_reader_idx; - } - // Current stream is drained, try to read from next stream. + current_reader_idx.reset(); return awaitImpl(); } } diff --git a/dbms/src/Storages/StorageDisaggregatedColumnar.h b/dbms/src/Storages/StorageDisaggregatedColumnar.h index 80a33946f56..9a31996d4df 100644 --- a/dbms/src/Storages/StorageDisaggregatedColumnar.h +++ b/dbms/src/Storages/StorageDisaggregatedColumnar.h @@ -35,12 +35,18 @@ #include #include +#include +#include +#include +#include +#include #include #pragma GCC diagnostic pop namespace DB { class DAGContext; +class ThreadManager; namespace DM { @@ -58,6 +64,26 @@ struct RNProxyReaderPlan std::vector> physical_table_ranges; }; +enum class RNProxyReaderMaterializeState +{ + NotStarted, + Creating, + Ready, + Failed, + Consumed, +}; + +struct RNProxyReaderSlot +{ + ~RNProxyReaderSlot(); + + std::mutex mutex; + std::condition_variable cv; + RNProxyReaderMaterializeState state = RNProxyReaderMaterializeState::NotStarted; + std::optional reader; + std::exception_ptr exception; +}; + class RNProxyReadTask; using RNProxyReadTaskPtr = std::shared_ptr; class RNProxyReadTask @@ -91,6 +117,12 @@ class RNProxyReadTask ColumnarReaderPtr createColumnarReaderWithBackoff(size_t reader_index) const; + ColumnarReaderPtr getOrCreateReader(size_t reader_index); + + void prefetchReader(size_t reader_index); + + std::optional tryAcquireReaderIndex(); + size_t getReaderCount() const; const Context & getContext() const; @@ -112,6 +144,10 @@ class RNProxyReadTask private: std::vector reader_plans; std::shared_ptr shared_reader_context; + std::vector> reader_slots; + std::atomic_size_t next_reader_index = 0; + std::once_flag prefetch_thread_manager_once; + std::shared_ptr prefetch_thread_manager; }; class RNProxyInputStream : public IProfilingBlockInputStream @@ -222,9 +258,11 @@ class RNProxySourceOp : public SourceOp const Context & context; const LoggerPtr log; RNProxyReadTaskPtr task; + UInt64 total_bytes = 0; size_t total_rows = 0; + size_t total_streams = 0; - Int32 current_reader_idx = -1; + std::optional current_reader_idx; BlockInputStreamPtr current_input_stream; // Temporarily store the block read from current_seg_task->stream and pass it to downstream operators in readImpl. @@ -233,7 +271,7 @@ class RNProxySourceOp : public SourceOp bool done = false; // Count the time spent waiting for segment tasks to be ready. //double duration_wait_ready_task_sec = 0; - Stopwatch wait_stop_watch{CLOCK_MONOTONIC_COARSE}; + Stopwatch total_cost_watch{CLOCK_MONOTONIC_COARSE}; // Count the time consumed by reading blocks in the stream of segment tasks. double duration_read_sec = 0;