// Patch summary (commentary only; placed before the first `diff --git` header).
// Files touched: dbms/src/Common/BackgroundTask.cpp, MemoryAllocTrace.cpp/.h, MemoryTracker.cpp/.h, dbms/src/Server/Server.cpp.
//  * process_mem_usage() now reads /proc/self/status instead of /proc/self/stat. It scans each line for the
//    prefixes "VmRSS", "RssFile", "VmSize" (parsed into *_kb locals and multiplied by 1024) and "Threads".
//  * ProcessMemoryUsage gains an rss_file_bytes field; memCheckJob() copies it into the new global atomic rss_file.
//  * MemoryTracker::alloc compares memory_control_rss (from getMemoryControlRss()) against the limit instead of real_rss.
//    memory_control_rss equals real_rss unless the exclude flag is set; then it is real_rss - rss_file, clamped at 0.
//  * Server startup calls setExcludeRssFileFromMemoryControl(). The value is true only when tici.reader-node.addr is
//    non-empty or tici.reader-node.port > 0, AND tici.exclude-rss-file-from-memory-control is true (default: true).
// NOTE(review): process_mem_usage() returns true whenever the status file opens, even if none of the four fields
//    was parsed; the output arguments are then left untouched.
// NOTE(review): parseStatusFieldKb and parseStatusFieldInt differ only in the output type; candidate for one helper.
// NOTE(review): this text looks whitespace-collapsed, with template arguments / include targets missing
//    (e.g. `std::atomic real_rss`, bare `#include`) -- presumably an extraction artifact; confirm against the
//    original patch before applying.
diff --git a/dbms/src/Common/BackgroundTask.cpp b/dbms/src/Common/BackgroundTask.cpp index a3fc69bbeee..e1f7951863b 100644 --- a/dbms/src/Common/BackgroundTask.cpp +++ b/dbms/src/Common/BackgroundTask.cpp @@ -67,6 +67,7 @@ void CollectProcInfoBackgroundTask::memCheckJob() // Update the memory usage of the current process. Defined in Common/MemoryTracker.cpp auto res = get_process_mem_usage(); real_rss = res.resident_bytes; + rss_file = res.rss_file_bytes; proc_num_threads = res.cur_proc_num_threads; proc_virt_size = res.cur_virt_bytes; baseline_of_query_mem_tracker = root_of_query_mem_trackers->get(); diff --git a/dbms/src/Common/MemoryAllocTrace.cpp b/dbms/src/Common/MemoryAllocTrace.cpp index 1edc0c5c2bf..f10b8f50905 100644 --- a/dbms/src/Common/MemoryAllocTrace.cpp +++ b/dbms/src/Common/MemoryAllocTrace.cpp @@ -16,6 +16,8 @@ #include // Included for `USE_JEMALLOC` #include +#include +#include #if USE_JEMALLOC #include @@ -38,45 +40,76 @@ std::tuple getAllocDeallocPtr() #endif } -bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size) +static bool parseStatusFieldKb(const std::string & line, std::string_view field_name, UInt64 & out_kb) { - resident_set = 0.0; + if (line.rfind(field_name, 0) != 0) + return false; + + const auto colon_pos = line.find(':'); + if (colon_pos == std::string::npos) + return false; + + std::istringstream iss(line.substr(colon_pos + 1)); + iss >> out_kb; + return !iss.fail(); +} + +static bool parseStatusFieldInt(const std::string & line, std::string_view field_name, Int64 & out_value) +{ + if (line.rfind(field_name, 0) != 0) + return false; - // 'file' stat seems to give the most reliable results - std::ifstream stat_stream("/proc/self/stat", std::ios_base::in); - // if "/proc/self/stat" is not supported - if (!stat_stream.is_open()) + const auto colon_pos = line.find(':'); + if (colon_pos == std::string::npos) return false; - // dummy vars for leading entries in stat that we don't care about 
- std::string pid, comm, state, ppid, pgrp, session, tty_nr; - std::string tpgid, flags, minflt, cminflt, majflt, cmajflt; - std::string utime, stime, cutime, cstime, priority, nice; - std::string itrealvalue, starttime; + std::istringstream iss(line.substr(colon_pos + 1)); + iss >> out_value; + return !iss.fail(); +} - // the field we want - Int64 rss; +bool process_mem_usage( + UInt64 & resident_bytes, + UInt64 & rss_file_bytes, + Int64 & cur_proc_num_threads, + UInt64 & cur_virt_size) +{ + std::ifstream status_stream("/proc/self/status", std::ios_base::in); + if (!status_stream.is_open()) + return false; - stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt - >> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> cur_proc_num_threads - >> itrealvalue >> starttime >> cur_virt_size >> rss; // don't care about the rest + UInt64 vm_rss_kb = 0; + UInt64 rss_file_kb = 0; + UInt64 vm_size_kb = 0; + Int64 threads = 1; - stat_stream.close(); + std::string line; + while (std::getline(status_stream, line)) + { + if (parseStatusFieldKb(line, "VmRSS", vm_rss_kb)) + resident_bytes = vm_rss_kb * 1024; + if (parseStatusFieldKb(line, "RssFile", rss_file_kb)) + rss_file_bytes = rss_file_kb * 1024; + if (parseStatusFieldKb(line, "VmSize", vm_size_kb)) + cur_virt_size = vm_size_kb * 1024; + if (parseStatusFieldInt(line, "Threads", threads)) + cur_proc_num_threads = threads; + } - Int64 page_size_kb = sysconf(_SC_PAGE_SIZE) / 1024; // in case x86-64 is configured to use 2MB pages - resident_set = rss * page_size_kb; return true; } ProcessMemoryUsage get_process_mem_usage() { - double resident_set; + UInt64 raw_rss = 0; + UInt64 rss_file = 0; Int64 cur_proc_num_threads = 1; UInt64 cur_virt_size = 0; - process_mem_usage(resident_set, cur_proc_num_threads, cur_virt_size); - resident_set *= 1024; // transfrom from KB to bytes + process_mem_usage(raw_rss, rss_file, cur_proc_num_threads, cur_virt_size); 
+ return ProcessMemoryUsage{ - static_cast(resident_set), + raw_rss, + rss_file, cur_virt_size, cur_proc_num_threads, }; diff --git a/dbms/src/Common/MemoryAllocTrace.h b/dbms/src/Common/MemoryAllocTrace.h index d6af277a952..4570bffaae6 100644 --- a/dbms/src/Common/MemoryAllocTrace.h +++ b/dbms/src/Common/MemoryAllocTrace.h @@ -25,6 +25,7 @@ std::tuple getAllocDeallocPtr(); struct ProcessMemoryUsage { UInt64 resident_bytes; + UInt64 rss_file_bytes; UInt64 cur_virt_bytes; Int64 cur_proc_num_threads; }; diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index 7e875babc57..e4ebf0840f0 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -31,8 +31,40 @@ extern const Metric MemoryTrackingSharedColumnData; extern const Metric MemoryTrackingKVStore; } // namespace CurrentMetrics -std::atomic real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0}; +std::atomic real_rss{0}, rss_file{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0}; std::atomic proc_virt_size{0}; +std::atomic exclude_rss_file_from_memory_control{false}; + +void setExcludeRssFileFromMemoryControl(bool value) +{ + exclude_rss_file_from_memory_control.store(value, std::memory_order_relaxed); +} + +bool getExcludeRssFileFromMemoryControl() +{ + return exclude_rss_file_from_memory_control.load(std::memory_order_relaxed); +} + +struct MemoryControlRssInfo +{ + Int64 real_rss; + Int64 rss_file; + Int64 memory_control_rss; +}; + +static MemoryControlRssInfo getMemoryControlRss() +{ + const Int64 current_real_rss = real_rss.load(std::memory_order_relaxed); + const Int64 current_rss_file = rss_file.load(std::memory_order_relaxed); + if (!getExcludeRssFileFromMemoryControl()) + return {current_real_rss, current_rss_file, current_real_rss}; + return { + current_real_rss, + current_rss_file, + current_real_rss > current_rss_file ? 
current_real_rss - current_rss_file : 0, + }; +} + MemoryTracker::~MemoryTracker() { // Destruction of global root mem tracker means the process is shutting down, log and metrics models may have been released! @@ -133,9 +165,10 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) { Int64 current_limit = limit.load(std::memory_order_relaxed); Int64 current_accuracy_diff_for_test = accuracy_diff_for_test.load(std::memory_order_relaxed); + const auto rss_info = getMemoryControlRss(); if (unlikely( !next.load(std::memory_order_relaxed) && current_accuracy_diff_for_test && current_limit - && real_rss > current_accuracy_diff_for_test + current_limit)) + && rss_info.memory_control_rss > current_accuracy_diff_for_test + current_limit)) { DB::FmtBuffer fmt_buf; fmt_buf.append("Memory tracker accuracy "); @@ -144,10 +177,13 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) fmt_buf.fmtAppend(" {}", tmp_decr); fmt_buf.fmtAppend( - ": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, " + ": fault injected. memory_control_rss ({}) is much larger than limit ({}). Debug info, " + "real_rss: {}, rss_file: {}, threads of process: {}, " "memory usage tracked by ProcessList: peak {}, current {}. Virtual memory size: {}.", - formatReadableSizeWithBinarySuffix(real_rss), + formatReadableSizeWithBinarySuffix(rss_info.memory_control_rss), formatReadableSizeWithBinarySuffix(current_limit), + formatReadableSizeWithBinarySuffix(rss_info.real_rss), + formatReadableSizeWithBinarySuffix(rss_info.rss_file), proc_num_threads.load(), (root_of_query_mem_trackers ? 
formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->peak) : "0"), @@ -181,7 +217,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed); bool is_rss_too_large = (!next.load(std::memory_order_relaxed) && current_limit - && real_rss > current_limit + current_bytes_rss_larger_than_limit + && rss_info.memory_control_rss > current_limit + current_bytes_rss_larger_than_limit && will_be > baseline_of_query_mem_tracker); if (is_rss_too_large || unlikely(current_limit && will_be > current_limit)) { @@ -207,11 +243,14 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) else { // RSS too large fmt_buf.fmtAppend( - " exceeded caused by 'RSS(Resident Set Size) much larger than limit' : process memory size would " - "be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}.", - formatReadableSizeWithBinarySuffix(real_rss), + " exceeded caused by 'memory_control_rss much larger than limit' : memory_control_rss would " + "be {} for (attempt to allocate chunk of {} bytes), limit of memory for data computing : {}. 
" + "real_rss={}, rss_file={}.", + formatReadableSizeWithBinarySuffix(rss_info.memory_control_rss), size, - formatReadableSizeWithBinarySuffix(current_limit)); + formatReadableSizeWithBinarySuffix(current_limit), + formatReadableSizeWithBinarySuffix(rss_info.real_rss), + formatReadableSizeWithBinarySuffix(rss_info.rss_file)); } fmt_buf.fmtAppend(" Memory Usage of Storage: {}", storageMemoryUsageDetail()); diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 98775a7a13b..61d52bca79b 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -22,8 +22,11 @@ #include #include -extern std::atomic real_rss, proc_num_threads, baseline_of_query_mem_tracker; +extern std::atomic real_rss, rss_file, proc_num_threads, baseline_of_query_mem_tracker; extern std::atomic proc_virt_size; + +void setExcludeRssFileFromMemoryControl(bool value); +bool getExcludeRssFileFromMemoryControl(); namespace CurrentMetrics { extern const Metric MemoryTracking; diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 94c3fde88f5..cc194faf22b 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -1274,6 +1275,22 @@ try GRPCCompletionQueuePool::global_instance = std::make_unique(size); } + // TiCI reader uses mmap heavily; file-backed RSS (RssFile) should not trigger memory control. + // Effective value is true only when TiCI reader is enabled AND + // `tici.exclude-rss-file-from-memory-control` is true (default: true). + // If reader is disabled, this stays false regardless of the config item. 
+ const auto tici_reader_addr = config().getString("tici.reader-node.addr", ""); + const auto tici_reader_port = config().getInt("tici.reader-node.port", 0); + const bool tici_reader_enabled = !tici_reader_addr.empty() || tici_reader_port > 0; + const bool exclude_rss_file_from_memory_control + = tici_reader_enabled && config().getBool("tici.exclude-rss-file-from-memory-control", true); + setExcludeRssFileFromMemoryControl(exclude_rss_file_from_memory_control); + LOG_INFO( + log, + "TiCI memory control config: reader_enabled={} exclude_rss_file_from_memory_control={}", + tici_reader_enabled, + exclude_rss_file_from_memory_control); + /// startup flash service for handling coprocessor and MPP requests. FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log); @@ -1289,9 +1306,7 @@ try proxy_machine.runKVStore(tmt_context); - auto tici_reader_addr = config().getString("tici.reader-node.addr", ""); - auto tici_reader_port = config().getInt("tici.reader-node.port", 0); - if (!tici_reader_addr.empty() || tici_reader_port > 0) + if (tici_reader_enabled) { Stopwatch watch; auto service_addr = config().getString("flash.service_addr");