From 78760a623590319799ca2132a710be352a2f7057 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 13 Mar 2026 18:03:59 -0700 Subject: [PATCH 1/2] chore: Memory scan log exec experiments --- native/core/src/execution/operators/mod.rs | 2 + .../execution/operators/scan_memory_log.rs | 267 ++++++++++++++++++ native/core/src/execution/planner.rs | 3 + 3 files changed, 272 insertions(+) create mode 100644 native/core/src/execution/operators/scan_memory_log.rs diff --git a/native/core/src/execution/operators/mod.rs b/native/core/src/execution/operators/mod.rs index 07ee995367..888c39af88 100644 --- a/native/core/src/execution/operators/mod.rs +++ b/native/core/src/execution/operators/mod.rs @@ -34,7 +34,9 @@ pub use parquet_writer::ParquetWriterExec; mod csv_scan; pub mod projection; mod scan; +mod scan_memory_log; pub use csv_scan::init_csv_datasource_exec; +pub use scan_memory_log::ScanMemoryLogExec; /// Error returned during executing operators. #[derive(thiserror::Error, Debug)] diff --git a/native/core/src/execution/operators/scan_memory_log.rs b/native/core/src/execution/operators/scan_memory_log.rs new file mode 100644 index 0000000000..f29f741cbd --- /dev/null +++ b/native/core/src/execution/operators/scan_memory_log.rs @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A transparent wrapper around a scan ExecutionPlan that logs memory stats +//! when the scan stream reaches EOF. Uses jemalloc stats (when available) +//! to track how much memory was allocated during the scan phase. +//! +//! Captures three jemalloc snapshots: +//! 1. Before `child.execute()` — baseline +//! 2. After `child.execute()` — cost of stream setup (file opens, metadata reads) +//! 3. On stream EOF — total cost of the entire scan including all data reads +//! +//! Note: jemalloc `allocated` is process-wide, so concurrent partitions will +//! see each other's allocations. The deltas are approximate but still useful +//! for identifying which partitions drive memory growth. + +use arrow::array::RecordBatch; +use arrow::datatypes::SchemaRef; +use datafusion::common::Result as DataFusionResult; +use datafusion::execution::TaskContext; +use datafusion::physical_expr::EquivalenceProperties; +use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + RecordBatchStream, SendableRecordBatchStream, +}; +use futures::Stream; +use std::any::Any; +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::task::{Context, Poll}; + +/// Global counters to aggregate stats across all partitions. +/// These are process-wide — multiple concurrent queries will aggregate together. +static TOTAL_DELTA_SUM: AtomicUsize = AtomicUsize::new(0); +static MAX_PEAK: AtomicUsize = AtomicUsize::new(0); +static COMPLETED_PARTITIONS: AtomicUsize = AtomicUsize::new(0); +static TOTAL_ROWS_SUM: AtomicUsize = AtomicUsize::new(0); + +/// Read jemalloc `stats::allocated` (bytes currently allocated by the application). +/// Returns 0 if jemalloc feature is not enabled. +#[cfg(feature = "jemalloc")] +fn jemalloc_allocated() -> usize { + use tikv_jemalloc_ctl::{epoch, stats}; + epoch::advance().ok(); + stats::allocated::read().unwrap_or(0) +} + +#[cfg(not(feature = "jemalloc"))] +fn jemalloc_allocated() -> usize { + 0 +} + +/// Wraps a child ExecutionPlan and logs memory stats when the child's stream +/// reaches EOF. Passes through all batches unchanged. +/// +/// `spark_partition` is the actual Spark partition index (not the DataFusion +/// partition, which is always 0 in Comet's per-partition execution model). +#[derive(Debug)] +pub struct ScanMemoryLogExec { + child: Arc, + spark_partition: i32, + cache: PlanProperties, + metrics: ExecutionPlanMetricsSet, +} + +impl ScanMemoryLogExec { + pub fn new(child: Arc, spark_partition: i32) -> Self { + let cache = PlanProperties::new( + EquivalenceProperties::new(child.schema()), + child.output_partitioning().clone(), + EmissionType::Final, + Boundedness::Bounded, + ); + Self { + child, + spark_partition, + cache, + metrics: ExecutionPlanMetricsSet::default(), + } + } +} + +impl DisplayAs for ScanMemoryLogExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!( + f, + "ScanMemoryLogExec: spark_partition={}", + self.spark_partition + ) + } + DisplayFormatType::TreeRender => unimplemented!(), + } + } +} + +impl ExecutionPlan for ScanMemoryLogExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.child.schema() + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.child] + } + + fn with_new_children( + self: Arc, + mut children: Vec>, + ) -> DataFusionResult> { + Ok(Arc::new(ScanMemoryLogExec::new( + children.remove(0), + self.spark_partition, + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> DataFusionResult { + // Snapshot before child.execute() — baseline + let allocated_before_execute = jemalloc_allocated(); + + let child_stream = self.child.execute(partition, Arc::clone(&context))?; + + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + Ok(Box::pin(ScanMemoryLogStream { + child: child_stream, + context, + spark_partition: self.spark_partition, + logged: false, + baseline_metrics, + allocated_before_scan: allocated_before_execute, + peak_allocated: allocated_before_execute, + batch_count: 0, + total_rows: 0, + })) + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + + fn name(&self) -> &str { + "ScanMemoryLogExec" + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } +} + +struct ScanMemoryLogStream { + child: SendableRecordBatchStream, + context: Arc, + spark_partition: i32, + logged: bool, + baseline_metrics: BaselineMetrics, + /// jemalloc allocated bytes captured before child.execute() + allocated_before_scan: usize, + /// High-water mark of jemalloc allocated bytes seen during polling + peak_allocated: usize, + /// Number of batches consumed through this stream + batch_count: usize, + /// Total rows consumed through this stream + total_rows: usize, +} + +impl Stream for ScanMemoryLogStream { + type Item = DataFusionResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let result = Pin::new(&mut self.child).poll_next(cx); + match &result { + Poll::Ready(Some(Ok(batch))) => { + self.batch_count += 1; + self.total_rows += batch.num_rows(); + self.baseline_metrics.record_output(batch.num_rows()); + // Track peak allocation across all batches + let current = jemalloc_allocated(); + if current > self.peak_allocated { + self.peak_allocated = current; + } + } + Poll::Ready(None) if !self.logged => { + self.logged = true; + let pool = self.context.memory_pool(); + let allocated_at_eof = jemalloc_allocated(); + let total_delta = + allocated_at_eof.saturating_sub(self.allocated_before_scan); + let peak_delta = + self.peak_allocated.saturating_sub(self.allocated_before_scan); + + // Accumulate into global counters + TOTAL_DELTA_SUM.fetch_add(total_delta, Ordering::Relaxed); + TOTAL_ROWS_SUM.fetch_add(self.total_rows, Ordering::Relaxed); + // Update max peak (atomic CAS loop) + let mut current_max = MAX_PEAK.load(Ordering::Relaxed); + while self.peak_allocated > current_max { + match MAX_PEAK.compare_exchange_weak( + current_max, + self.peak_allocated, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(actual) => current_max = actual, + } + } + let completed = COMPLETED_PARTITIONS.fetch_add(1, Ordering::Relaxed) + 1; + + log::info!( + "ScanMemoryLogExec spark_partition={}: scan complete, \ + batches={}, rows={}, \ + memory_pool_reserved={}, \ + jemalloc_allocated: before={}, at_eof={}, peak={}, \ + total_delta={}, peak_delta={} | \ + aggregate: completed_partitions={}, sum_total_delta={}, \ + max_peak={}, sum_rows={}", + self.spark_partition, + self.batch_count, + self.total_rows, + pool.reserved(), + self.allocated_before_scan, + allocated_at_eof, + self.peak_allocated, + total_delta, + peak_delta, + completed, + TOTAL_DELTA_SUM.load(Ordering::Relaxed), + MAX_PEAK.load(Ordering::Relaxed), + TOTAL_ROWS_SUM.load(Ordering::Relaxed), + ); + } + _ => {} + } + result + } +} + +impl RecordBatchStream for ScanMemoryLogStream { + fn schema(&self) -> SchemaRef { + self.child.schema() + } +} diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index b79b43f6c9..e35549445d 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -23,6 +23,7 @@ pub mod operator_registry; use crate::execution::operators::init_csv_datasource_exec; use crate::execution::operators::IcebergScanExec; +use crate::execution::operators::ScanMemoryLogExec; use crate::{ errors::ExpressionError, execution::{ @@ -1185,6 +1186,8 @@ impl PhysicalPlanner { self.session_ctx(), common.encryption_enabled, )?; + let scan: Arc = + Arc::new(ScanMemoryLogExec::new(scan, self.partition)); Ok(( vec![], Arc::new(SparkPlan::new(spark_plan.plan_id, scan, vec![])), From 7a99b4030a59041366ebab540a770e8a7c2a0861 Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 14 Mar 2026 11:52:14 -0700 Subject: [PATCH 2/2] chore: Memory scan log exec experiments --- .../execution/operators/scan_memory_log.rs | 181 ++++++++++-------- 1 file changed, 103 insertions(+), 78 deletions(-) diff --git a/native/core/src/execution/operators/scan_memory_log.rs b/native/core/src/execution/operators/scan_memory_log.rs index f29f741cbd..bc30c8fa5e 100644 --- a/native/core/src/execution/operators/scan_memory_log.rs +++ b/native/core/src/execution/operators/scan_memory_log.rs @@ -15,18 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! A transparent wrapper around a scan ExecutionPlan that logs memory stats -//! when the scan stream reaches EOF. Uses jemalloc stats (when available) -//! to track how much memory was allocated during the scan phase. +//! A transparent wrapper around a scan ExecutionPlan that tracks per-partition +//! memory allocation using jemalloc's per-thread counters. //! -//! Captures three jemalloc snapshots: -//! 1. Before `child.execute()` — baseline -//! 2. After `child.execute()` — cost of stream setup (file opens, metadata reads) -//! 3. On stream EOF — total cost of the entire scan including all data reads -//! -//! Note: jemalloc `allocated` is process-wide, so concurrent partitions will -//! see each other's allocations. The deltas are approximate but still useful -//! for identifying which partitions drive memory growth. +//! Uses `thread.allocatedp` / `thread.deallocatedp` which are thread-local +//! monotonic counters. By measuring deltas around each `poll_next()` call, +//! we get accurate per-partition allocation/deallocation numbers even with +//! concurrent partitions on different threads. use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; @@ -42,36 +37,57 @@ use datafusion::physical_plan::{ use futures::Stream; use std::any::Any; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; /// Global counters to aggregate stats across all partitions. -/// These are process-wide — multiple concurrent queries will aggregate together. -static TOTAL_DELTA_SUM: AtomicUsize = AtomicUsize::new(0); -static MAX_PEAK: AtomicUsize = AtomicUsize::new(0); -static COMPLETED_PARTITIONS: AtomicUsize = AtomicUsize::new(0); -static TOTAL_ROWS_SUM: AtomicUsize = AtomicUsize::new(0); - -/// Read jemalloc `stats::allocated` (bytes currently allocated by the application). -/// Returns 0 if jemalloc feature is not enabled. +static AGG_THREAD_ALLOCATED: AtomicU64 = AtomicU64::new(0); +static AGG_THREAD_DEALLOCATED: AtomicU64 = AtomicU64::new(0); +static AGG_PEAK_PROCESS_ALLOCATED: AtomicU64 = AtomicU64::new(0); +static AGG_COMPLETED_PARTITIONS: AtomicU64 = AtomicU64::new(0); +static AGG_TOTAL_ROWS: AtomicU64 = AtomicU64::new(0); + +/// Per-thread allocated/deallocated from jemalloc. +/// These are monotonically increasing counters scoped to the calling thread. +#[cfg(feature = "jemalloc")] +fn jemalloc_thread_stats() -> (u64, u64) { + use tikv_jemalloc_ctl::thread; + let allocated = thread::allocatedp::read().map(|p| p.get()).unwrap_or(0); + let deallocated = thread::deallocatedp::read().map(|p| p.get()).unwrap_or(0); + (allocated, deallocated) +} + +#[cfg(not(feature = "jemalloc"))] +fn jemalloc_thread_stats() -> (u64, u64) { + (0, 0) +} + +/// Process-wide jemalloc allocated bytes. #[cfg(feature = "jemalloc")] -fn jemalloc_allocated() -> usize { +fn jemalloc_process_allocated() -> u64 { use tikv_jemalloc_ctl::{epoch, stats}; epoch::advance().ok(); - stats::allocated::read().unwrap_or(0) + stats::allocated::read().unwrap_or(0) as u64 } #[cfg(not(feature = "jemalloc"))] -fn jemalloc_allocated() -> usize { +fn jemalloc_process_allocated() -> u64 { 0 } -/// Wraps a child ExecutionPlan and logs memory stats when the child's stream -/// reaches EOF. Passes through all batches unchanged. -/// -/// `spark_partition` is the actual Spark partition index (not the DataFusion -/// partition, which is always 0 in Comet's per-partition execution model). +fn update_atomic_max(atomic: &AtomicU64, value: u64) { + let mut current = atomic.load(Ordering::Relaxed); + while value > current { + match atomic.compare_exchange_weak(current, value, Ordering::Relaxed, Ordering::Relaxed) { + Ok(_) => break, + Err(actual) => current = actual, + } + } +} + +/// Wraps a child ExecutionPlan and tracks memory allocated/deallocated +/// during scan execution using jemalloc per-thread counters. #[derive(Debug)] pub struct ScanMemoryLogExec { child: Arc, @@ -140,11 +156,25 @@ impl ExecutionPlan for ScanMemoryLogExec { partition: usize, context: Arc, ) -> DataFusionResult { - // Snapshot before child.execute() — baseline - let allocated_before_execute = jemalloc_allocated(); + // Capture thread-local counters before execute() + let (thread_alloc_before, thread_dealloc_before) = jemalloc_thread_stats(); let child_stream = self.child.execute(partition, Arc::clone(&context))?; + // Capture after execute() to measure setup cost + let (thread_alloc_after, thread_dealloc_after) = jemalloc_thread_stats(); + let execute_allocated = thread_alloc_after - thread_alloc_before; + let execute_deallocated = thread_dealloc_after - thread_dealloc_before; + + log::info!( + "ScanMemoryLogExec spark_partition={}: execute() setup: \ + thread_allocated={}, thread_deallocated={}, net={}", + self.spark_partition, + execute_allocated, + execute_deallocated, + execute_allocated as i64 - execute_deallocated as i64, + ); + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); Ok(Box::pin(ScanMemoryLogStream { child: child_stream, @@ -152,8 +182,9 @@ impl ExecutionPlan for ScanMemoryLogExec { spark_partition: self.spark_partition, logged: false, baseline_metrics, - allocated_before_scan: allocated_before_execute, - peak_allocated: allocated_before_execute, + // Accumulate total thread-level alloc/dealloc across all polls + thread_total_allocated: execute_allocated, + thread_total_deallocated: execute_deallocated, batch_count: 0, total_rows: 0, })) @@ -178,13 +209,11 @@ struct ScanMemoryLogStream { spark_partition: i32, logged: bool, baseline_metrics: BaselineMetrics, - /// jemalloc allocated bytes captured before child.execute() - allocated_before_scan: usize, - /// High-water mark of jemalloc allocated bytes seen during polling - peak_allocated: usize, - /// Number of batches consumed through this stream + /// Accumulated bytes allocated on the thread during this scan + thread_total_allocated: u64, + /// Accumulated bytes deallocated on the thread during this scan + thread_total_deallocated: u64, batch_count: usize, - /// Total rows consumed through this stream total_rows: usize, } @@ -192,66 +221,62 @@ impl Stream for ScanMemoryLogStream { type Item = DataFusionResult; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Snapshot thread counters before polling child + let (alloc_before, dealloc_before) = jemalloc_thread_stats(); + let result = Pin::new(&mut self.child).poll_next(cx); + + // Snapshot after — delta is what this poll allocated/freed + let (alloc_after, dealloc_after) = jemalloc_thread_stats(); + self.thread_total_allocated += alloc_after - alloc_before; + self.thread_total_deallocated += dealloc_after - dealloc_before; + match &result { Poll::Ready(Some(Ok(batch))) => { self.batch_count += 1; self.total_rows += batch.num_rows(); self.baseline_metrics.record_output(batch.num_rows()); - // Track peak allocation across all batches - let current = jemalloc_allocated(); - if current > self.peak_allocated { - self.peak_allocated = current; - } } Poll::Ready(None) if !self.logged => { self.logged = true; let pool = self.context.memory_pool(); - let allocated_at_eof = jemalloc_allocated(); - let total_delta = - allocated_at_eof.saturating_sub(self.allocated_before_scan); - let peak_delta = - self.peak_allocated.saturating_sub(self.allocated_before_scan); - - // Accumulate into global counters - TOTAL_DELTA_SUM.fetch_add(total_delta, Ordering::Relaxed); - TOTAL_ROWS_SUM.fetch_add(self.total_rows, Ordering::Relaxed); - // Update max peak (atomic CAS loop) - let mut current_max = MAX_PEAK.load(Ordering::Relaxed); - while self.peak_allocated > current_max { - match MAX_PEAK.compare_exchange_weak( - current_max, - self.peak_allocated, - Ordering::Relaxed, - Ordering::Relaxed, - ) { - Ok(_) => break, - Err(actual) => current_max = actual, - } - } - let completed = COMPLETED_PARTITIONS.fetch_add(1, Ordering::Relaxed) + 1; + let process_allocated = jemalloc_process_allocated(); + let net = self.thread_total_allocated as i64 + - self.thread_total_deallocated as i64; + + // Accumulate into global aggregates + AGG_THREAD_ALLOCATED + .fetch_add(self.thread_total_allocated, Ordering::Relaxed); + AGG_THREAD_DEALLOCATED + .fetch_add(self.thread_total_deallocated, Ordering::Relaxed); + update_atomic_max(&AGG_PEAK_PROCESS_ALLOCATED, process_allocated); + AGG_TOTAL_ROWS.fetch_add(self.total_rows as u64, Ordering::Relaxed); + let completed = + AGG_COMPLETED_PARTITIONS.fetch_add(1, Ordering::Relaxed) + 1; log::info!( "ScanMemoryLogExec spark_partition={}: scan complete, \ batches={}, rows={}, \ memory_pool_reserved={}, \ - jemalloc_allocated: before={}, at_eof={}, peak={}, \ - total_delta={}, peak_delta={} | \ - aggregate: completed_partitions={}, sum_total_delta={}, \ - max_peak={}, sum_rows={}", + thread: allocated={}, deallocated={}, net={}, \ + process_allocated={} | \ + aggregate(n={}): thread_allocated={}, thread_deallocated={}, \ + thread_net={}, max_process_allocated={}, total_rows={}", self.spark_partition, self.batch_count, self.total_rows, pool.reserved(), - self.allocated_before_scan, - allocated_at_eof, - self.peak_allocated, - total_delta, - peak_delta, + self.thread_total_allocated, + self.thread_total_deallocated, + net, + process_allocated, completed, - TOTAL_DELTA_SUM.load(Ordering::Relaxed), - MAX_PEAK.load(Ordering::Relaxed), - TOTAL_ROWS_SUM.load(Ordering::Relaxed), + AGG_THREAD_ALLOCATED.load(Ordering::Relaxed), + AGG_THREAD_DEALLOCATED.load(Ordering::Relaxed), + AGG_THREAD_ALLOCATED.load(Ordering::Relaxed) as i64 + - AGG_THREAD_DEALLOCATED.load(Ordering::Relaxed) as i64, + AGG_PEAK_PROCESS_ALLOCATED.load(Ordering::Relaxed), + AGG_TOTAL_ROWS.load(Ordering::Relaxed), ); } _ => {}