diff --git a/Cargo.lock b/Cargo.lock
index c552835a2cb6f..664cd46af74ae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2463,6 +2463,7 @@ version = "52.1.0"
 dependencies = [
  "ahash",
  "arrow",
+ "arrow-data",
  "arrow-ord",
  "arrow-schema",
  "async-trait",
diff --git a/Cargo.toml b/Cargo.toml
index f19d0fbfa0eb4..b2f106814ff1a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -96,6 +96,7 @@ arrow = { version = "57.3.0", features = [
     "chrono-tz",
 ] }
 arrow-buffer = { version = "57.2.0", default-features = false }
+arrow-data = { version = "57.3.0", default-features = false }
 arrow-flight = { version = "57.3.0", features = [
     "flight-sql-experimental",
 ] }
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 13f91fd7d4ea2..7d44057cdee9b 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -49,6 +49,7 @@ name = "datafusion_physical_plan"
 [dependencies]
 ahash = { workspace = true }
 arrow = { workspace = true }
+arrow-data = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-schema = { workspace = true }
 async-trait = { workspace = true }
@@ -73,6 +74,7 @@ pin-project-lite = "^0.2.7"
 tokio = { workspace = true }
 
 [dev-dependencies]
+arrow-data = { workspace = true }
 criterion = { workspace = true, features = ["async_futures"] }
 datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-window = { workspace = true }
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 55e1f460e1901..f23b855db4610 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -51,7 +51,7 @@ use crate::{
     Statistics,
 };
 
-use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
+use arrow::array::{RecordBatch, RecordBatchOptions};
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::SpillCompression;
@@ -402,8 +402,6 @@ impl ExternalSorter {
                 Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
         }
 
-        Self::organize_stringview_arrays(globally_sorted_batches)?;
-
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
         let batches_to_spill = std::mem::take(globally_sorted_batches);
@@ -447,71 +445,6 @@ impl ExternalSorter {
         Ok(())
     }
 
-    /// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
-    /// `StringViewArray` in sequential order by calling `gc()` on them.
-    ///
-    /// Note this is a workaround until is
-    /// available
-    ///
-    /// # Rationale
-    /// After (merge-based) sorting, all batches will be sorted into a single run,
-    /// but physically this sorted run is chunked into many small batches. For
-    /// `StringViewArray`s inside each sorted run, their inner buffers are not
-    /// re-constructed by default, leading to non-sequential payload locations
-    /// (permutated by `interleave()` Arrow kernel). A single payload buffer might
-    /// be shared by multiple `RecordBatch`es.
-    /// When writing each batch to disk, the writer has to write all referenced buffers,
-    /// because they have to be read back one by one to reduce memory usage. This
-    /// causes extra disk reads and writes, and potentially execution failure.
-    ///
-    /// # Example
-    /// Before sorting:
-    /// batch1 -> buffer1
-    /// batch2 -> buffer2
-    ///
-    /// sorted_batch1 -> buffer1
-    ///               -> buffer2
-    /// sorted_batch2 -> buffer1
-    ///               -> buffer2
-    ///
-    /// Then when spilling each batch, the writer has to write all referenced buffers
-    /// repeatedly.
-    fn organize_stringview_arrays(
-        globally_sorted_batches: &mut Vec<RecordBatch>,
-    ) -> Result<()> {
-        let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());
-
-        for batch in globally_sorted_batches.drain(..) {
-            let mut new_columns: Vec<Arc<dyn Array>> =
-                Vec::with_capacity(batch.num_columns());
-
-            let mut arr_mutated = false;
-            for array in batch.columns() {
-                if let Some(string_view_array) =
-                    array.as_any().downcast_ref::<StringViewArray>()
-                {
-                    let new_array = string_view_array.gc();
-                    new_columns.push(Arc::new(new_array));
-                    arr_mutated = true;
-                } else {
-                    new_columns.push(Arc::clone(array));
-                }
-            }
-
-            let organized_batch = if arr_mutated {
-                RecordBatch::try_new(batch.schema(), new_columns)?
-            } else {
-                batch
-            };
-
-            organized_batches.push(organized_batch);
-        }
-
-        *globally_sorted_batches = organized_batches;
-
-        Ok(())
-    }
-
     /// Sorts the in-memory batches and merges them into a single sorted run, then writes
     /// the result to spill files.
     async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
index 2666ab8822ed9..7aa927b725f26 100644
--- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
+++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -24,7 +24,7 @@ use arrow::array::RecordBatch;
 use datafusion_common::exec_datafusion_err;
 use datafusion_execution::disk_manager::RefCountedTempFile;
 
-use super::{IPCStreamWriter, spill_manager::SpillManager};
+use super::{IPCStreamWriter, gc_view_arrays, spill_manager::SpillManager};
 
 /// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
 /// Caller is able to use this struct to incrementally append in-memory batches to
@@ -50,6 +50,7 @@ impl InProgressSpillFile {
     }
 
     /// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
+    /// Performs garbage collection on StringView/BinaryView arrays to reduce spill file size.
     ///
     /// # Errors
     /// - Returns an error if the file is not active (has been finalized)
@@ -61,8 +62,11 @@ impl InProgressSpillFile {
                 "Append operation failed: No active in-progress file. The file may have already been finalized."
             ));
         }
+
+        let gc_batch = gc_view_arrays(batch)?;
+
         if self.writer.is_none() {
-            let schema = batch.schema();
+            let schema = gc_batch.schema();
             if let Some(in_progress_file) = &mut self.in_progress_file {
                 self.writer = Some(IPCStreamWriter::new(
                     in_progress_file.path(),
@@ -83,7 +87,7 @@ impl InProgressSpillFile {
             }
         }
         if let Some(writer) = &mut self.writer {
-            let (spilled_rows, _) = writer.write(batch)?;
+            let (spilled_rows, _) = writer.write(&gc_batch)?;
             if let Some(in_progress_file) = &mut self.in_progress_file {
                 let pre_size = in_progress_file.current_disk_usage();
                 in_progress_file.update_disk_usage()?;
diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs
index 4c93c03b342eb..cb7bab147363c 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -34,8 +34,10 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use arrow::array::{BufferSpec, layout};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::array::{
+    Array, BinaryViewArray, BufferSpec, GenericByteViewArray, StringViewArray, layout,
+};
+use arrow::datatypes::{ByteViewType, Schema, SchemaRef};
 use arrow::ipc::{
     MetadataVersion,
     reader::StreamReader,
@@ -344,6 +346,168 @@ fn get_max_alignment_for_schema(schema: &Schema) -> usize {
     max_alignment
 }
 
+/// Size of a single view structure in StringView/BinaryView arrays (in bytes).
+/// Each view is 16 bytes: 4 bytes length + 4 bytes prefix + 8 bytes buffer ID/offset.
+#[cfg(test)]
+const VIEW_SIZE_BYTES: usize = 16;
+
+/// Performs garbage collection on StringView and BinaryView arrays before spilling to reduce spill file size.
+///
+/// # Why GC is needed
+///
+/// StringView and BinaryView arrays can accumulate significant memory waste when sliced.
+/// When a large array is sliced (e.g., taking the first 100 rows of 1000), the view array
+/// still references the original data buffers containing all 1000 rows of data.
+///
+/// For example, in the ClickBench benchmark (issue #19414), repeated slicing of StringView
+/// arrays resulted in 820MB of spill files that could be reduced to just 33MB after GC -
+/// a 96% reduction in size.
+///
+/// # How it works
+///
+/// The GC process:
+/// 1. Identifies view arrays (StringView/BinaryView) in the batch
+/// 2. Checks if their data buffers exceed a memory threshold
+/// 3. If exceeded, calls the Arrow `gc()` method, which creates new compact buffers
+///    containing only the data referenced by the current views
+/// 4. Returns a new batch with GC'd arrays (or the original arrays if GC is not needed)
+///
+/// # When GC is triggered
+///
+/// GC is only performed when data buffers exceed a threshold (currently 10KB).
+/// This balances memory savings against the CPU overhead of garbage collection.
+/// Small arrays are passed through unchanged since the GC overhead would exceed
+/// any memory savings.
+///
+/// # Performance considerations
+///
+/// The function always returns a new RecordBatch for API consistency, but:
+/// - If no view arrays are present, it's a cheap clone (just Arc increments)
+/// - GC is skipped for small buffers to avoid unnecessary CPU overhead
+/// - The Arrow `gc()` method itself is optimized and only copies referenced data
+pub(crate) fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
+    // Early return optimization: skip GC entirely if the batch contains no view arrays.
+    // This avoids unnecessary processing for batches with only primitive types.
+    let has_view_arrays = batch.columns().iter().any(|array| {
+        matches!(
+            array.data_type(),
+            arrow::datatypes::DataType::Utf8View | arrow::datatypes::DataType::BinaryView
+        )
+    });
+
+    if !has_view_arrays {
+        // RecordBatch::clone() is cheap - just Arc reference count bumps
+        return Ok(batch.clone());
+    }
+
+    let mut new_columns: Vec<Arc<dyn Array>> = Vec::with_capacity(batch.num_columns());
+
+    for array in batch.columns() {
+        let gc_array = match array.data_type() {
+            arrow::datatypes::DataType::Utf8View => {
+                let string_view = array
+                    .as_any()
+                    .downcast_ref::<StringViewArray>()
+                    .expect("Utf8View array should downcast to StringViewArray");
+                // Only GC when the data buffers are large enough for compaction
+                // to pay off; see `should_gc_view_array` for the heuristic.
+                if should_gc_view_array(string_view) {
+                    Arc::new(string_view.gc()) as Arc<dyn Array>
+                } else {
+                    Arc::clone(array)
+                }
+            }
+            arrow::datatypes::DataType::BinaryView => {
+                let binary_view = array
+                    .as_any()
+                    .downcast_ref::<BinaryViewArray>()
+                    .expect("BinaryView array should downcast to BinaryViewArray");
+                // Only GC when the data buffers are large enough for compaction
+                // to pay off; see `should_gc_view_array` for the heuristic.
+                if should_gc_view_array(binary_view) {
+                    Arc::new(binary_view.gc()) as Arc<dyn Array>
+                } else {
+                    Arc::clone(array)
+                }
+            }
+            // Non-view arrays are passed through unchanged
+            _ => Arc::clone(array),
+        };
+        new_columns.push(gc_array);
+    }
+
+    // Always return a new batch for consistency
+    Ok(RecordBatch::try_new(batch.schema(), new_columns)?)
+}
+
+/// Determines whether a view array should be garbage collected.
+///
+/// This function checks that:
+/// 1. The array has data buffers (i.e., some values are not inlined)
+/// 2. The total buffer memory exceeds the GC threshold
+///    (large buffers serve as a heuristic for potential slicing waste)
+///
+/// The views themselves are not inspected; gating on buffer size here avoids
+/// the copy overhead of calling `gc()` on arrays with little to reclaim.
+///
+/// # Memory threshold rationale
+///
+/// We use a 10KB threshold based on:
+/// - Small arrays (< 10KB) have negligible memory impact even if sliced
+/// - GC has CPU overhead that isn't worth it for small arrays
+/// - This threshold captures pathological cases (e.g., ClickBench: 820MB -> 33MB)
+///   while avoiding unnecessary GC on small working sets
+fn should_gc_view_array<T: ByteViewType>(array: &GenericByteViewArray<T>) -> bool {
+    const MIN_BUFFER_SIZE_FOR_GC: usize = 10 * 1024; // 10KB threshold
+
+    let data_buffers = array.data_buffers();
+    if data_buffers.is_empty() {
+        // All strings/binaries are inlined (<= 12 bytes), no GC needed
+        return false;
+    }
+
+    // Calculate total buffer memory
+    let total_buffer_size: usize = data_buffers.iter().map(|b| b.capacity()).sum();
+
+    // Only GC if buffers exceed threshold
+    total_buffer_size > MIN_BUFFER_SIZE_FOR_GC
+}
+
+#[cfg(test)]
+fn calculate_string_view_waste_ratio(array: &StringViewArray) -> f64 {
+    use arrow_data::MAX_INLINE_VIEW_LEN;
+    calculate_view_waste_ratio(array.len(), array.data_buffers(), |i| {
+        if !array.is_null(i) {
+            let value = array.value(i);
+            if value.len() > MAX_INLINE_VIEW_LEN as usize {
+                return value.len();
+            }
+        }
+        0
+    })
+}
+
+#[cfg(test)]
+fn calculate_view_waste_ratio<F>(
+    len: usize,
+    data_buffers: &[arrow::buffer::Buffer],
+    get_value_size: F,
+) -> f64
+where
+    F: Fn(usize) -> usize,
+{
+    let total_buffer_size: usize = data_buffers.iter().map(|b| b.capacity()).sum();
+    if total_buffer_size == 0 {
+        return 0.0;
+    }
+
+    let mut actual_used_size = (0..len).map(get_value_size).sum::<usize>();
+    actual_used_size += len * VIEW_SIZE_BYTES;
+
+    let waste = total_buffer_size.saturating_sub(actual_used_size);
+    waste as f64 / total_buffer_size as f64
+}
+
 #[cfg(test)]
 mod tests {
     use super::in_progress_spill_file::InProgressSpillFile;
@@ -870,4 +1034,312 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_gc_string_view_before_spill() -> Result<()> {
+        use arrow::array::StringViewArray;
+
+        let strings: Vec<String> = (0..1000)
+            .map(|i| {
+                if i % 2 == 0 {
+                    "short_string".to_string()
+                } else {
+                    "this_is_a_much_longer_string_that_will_not_be_inlined".to_string()
+                }
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "strings",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(string_array) as ArrayRef],
+        )?;
+        let sliced_batch = batch.slice(0, 100);
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+        assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
+        assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_gc_binary_view_before_spill() -> Result<()> {
+        use arrow::array::BinaryViewArray;
+
+        let binaries: Vec<Vec<u8>> = (0..1000)
+            .map(|i| {
+                if i % 2 == 0 {
+                    vec![1, 2, 3, 4]
+                } else {
+                    vec![1; 50]
+                }
+            })
+            .collect();
+
+        let binary_array =
+            BinaryViewArray::from_iter(binaries.iter().map(|b| Some(b.as_slice())));
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "binaries",
+            DataType::BinaryView,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(binary_array) as ArrayRef],
+        )?;
+        let sliced_batch = batch.slice(0, 100);
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+        assert_eq!(gc_batch.num_rows(), sliced_batch.num_rows());
+        assert_eq!(gc_batch.num_columns(), sliced_batch.num_columns());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_gc_skips_small_arrays() -> Result<()> {
+        use arrow::array::StringViewArray;
+
+        let strings: Vec<String> = (0..10).map(|i| format!("string_{i}")).collect();
+
+        let string_array = StringViewArray::from(strings);
+        let array_ref: ArrayRef = Arc::new(string_array);
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "strings",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(Arc::clone(&schema), vec![array_ref])?;
+
+        // GC should return the original batch for small arrays
+        let gc_batch = gc_view_arrays(&batch)?;
+
+        // The batch should be unchanged (cloned, not GC'd)
+        assert_eq!(gc_batch.num_rows(), batch.num_rows());
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_gc_with_mixed_columns() -> Result<()> {
+        use arrow::array::{Int32Array, StringViewArray};
+
+        let strings: Vec<String> = (0..200)
+            .map(|i| format!("long_string_for_gc_testing_{i}"))
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let int_array = Int32Array::from((0..200).collect::<Vec<i32>>());
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("strings", DataType::Utf8View, false),
+            Field::new("ints", DataType::Int32, false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(string_array) as ArrayRef,
+                Arc::new(int_array) as ArrayRef,
+            ],
+        )?;
+
+        let sliced_batch = batch.slice(0, 50);
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+
+        assert_eq!(gc_batch.num_columns(), 2);
+        assert_eq!(gc_batch.num_rows(), 50);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_verify_gc_triggers_for_sliced_arrays() -> Result<()> {
+        let strings: Vec<String> = (0..1000)
+            .map(|i| {
+                format!(
+                    "http://example.com/very/long/path/that/exceeds/inline/threshold/{i}"
+                )
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "url",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(string_array.clone()) as ArrayRef],
+        )?;
+
+        let sliced = batch.slice(0, 100);
+
+        let sliced_array = sliced
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .unwrap();
+        let should_gc = should_gc_view_array(sliced_array);
+        let waste_ratio = calculate_string_view_waste_ratio(sliced_array);
+
+        assert!(
+            waste_ratio > 0.8,
+            "Waste ratio should be > 0.8 for sliced array"
+        );
+        assert!(
+            should_gc,
+            "GC should trigger for sliced array with high waste"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_reproduce_issue_19414_string_view_spill_without_gc() -> Result<()> {
+        use arrow::array::StringViewArray;
+        use std::fs;
+
+        let num_rows = 5000;
+        let mut strings = Vec::with_capacity(num_rows);
+
+        for i in 0..num_rows {
+            let url = match i % 5 {
+                0 => format!(
+                    "http://irr.ru/index.php?showalbum/login-leniya7777294,938303130/{i}"
+                ),
+                1 => format!("http://komme%2F27.0.1453.116/very/long/path/{i}"),
+                2 => format!("https://produkty%2Fproduct/category/item/{i}"),
+                3 => format!(
+                    "http://irr.ru/index.php?showalbum/login-kapusta-advert2668/{i}"
+                ),
+                4 => format!(
+                    "http://irr.ru/index.php?showalbum/login-kapustic/product/{i}"
+                ),
+                _ => unreachable!(),
+            };
+            strings.push(url);
+        }
+
+        let string_array = StringViewArray::from(strings);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "URL",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let original_batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(string_array.clone()) as ArrayRef],
+        )?;
+
+        let total_buffer_size: usize = string_array
+            .data_buffers()
+            .iter()
+            .map(|buffer| buffer.capacity())
+            .sum();
+
+        let mut sliced_batches = Vec::new();
+        let slice_size = 100;
+
+        for i in (0..num_rows).step_by(slice_size) {
+            let len = std::cmp::min(slice_size, num_rows - i);
+            let sliced = original_batch.slice(i, len);
+            sliced_batches.push(sliced);
+        }
+
+        let env = Arc::new(RuntimeEnv::default());
+        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let spill_manager = SpillManager::new(env, metrics, schema);
+
+        let mut in_progress_file = spill_manager.create_in_progress_file("Test GC")?;
+
+        for batch in &sliced_batches {
+            in_progress_file.append_batch(batch)?;
+        }
+
+        let spill_file = in_progress_file.finish()?.unwrap();
+        let file_size = fs::metadata(spill_file.path())?.len() as usize;
+
+        let theoretical_without_gc = total_buffer_size * sliced_batches.len();
+        let reduction_percent = ((theoretical_without_gc - file_size) as f64
+            / theoretical_without_gc as f64)
+            * 100.0;
+
+        assert!(
+            reduction_percent > 80.0,
+            "GC should reduce spill file size by >80%, got {reduction_percent:.1}%"
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_spill_with_and_without_gc_comparison() -> Result<()> {
+        let num_rows = 2000;
+        let strings: Vec<String> = (0..num_rows)
+            .map(|i| {
+                format!(
+                    "http://example.com/this/is/a/long/url/path/that/wont/be/inlined/{i}"
+                )
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "url",
+            DataType::Utf8View,
+            false,
+        )]));
+
+        let batch =
+            RecordBatch::try_new(schema, vec![Arc::new(string_array) as ArrayRef])?;
+
+        let sliced_batch = batch.slice(0, 200);
+
+        let array_without_gc = sliced_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .unwrap();
+        let size_without_gc: usize = array_without_gc
+            .data_buffers()
+            .iter()
+            .map(|buffer| buffer.capacity())
+            .sum();
+
+        let gc_batch = gc_view_arrays(&sliced_batch)?;
+        let array_with_gc = gc_batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringViewArray>()
+            .unwrap();
+        let size_with_gc: usize = array_with_gc
+            .data_buffers()
+            .iter()
+            .map(|buffer| buffer.capacity())
+            .sum();
+
+        let reduction_percent =
+            ((size_without_gc - size_with_gc) as f64 / size_without_gc as f64) * 100.0;
+
+        assert!(
+            reduction_percent > 85.0,
+            "Expected >85% reduction for 10% slice, got {reduction_percent:.1}%"
+        );
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs
index 6d931112ad888..d5b9bca35d62a 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -206,7 +206,7 @@ impl SpillManager {
 pub(crate) trait GetSlicedSize {
     /// Returns the size of the `RecordBatch` when sliced.
     /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.
-    /// Therefore, make sure we call gc() or organize_stringview_arrays() before using this method.
+    /// Therefore, make sure we call gc() or gc_view_arrays() before using this method.
     fn get_sliced_size(&self) -> Result<usize>;
 }
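
Reviewer note: the standalone sketch below is not part of the patch. It is a minimal demonstration, using only public arrow APIs that this patch itself relies on (`StringViewArray::from`, `slice`, `data_buffers`, `gc`), of the buffer waste the change targets: a zero-copy slice keeps the full data buffers alive, and `gc()` compacts them, which is what `gc_view_arrays` now does on the spill path (gated by the 10KB threshold). The string contents and sizes are illustrative only.

use arrow::array::StringViewArray;

fn main() {
    // 1000 values, each longer than the 12-byte inline limit, so the payloads
    // live in shared data buffers rather than inside the 16-byte views.
    let values: Vec<String> = (0..1000)
        .map(|i| format!("a_string_long_enough_to_live_in_a_data_buffer_{i}"))
        .collect();
    let array = StringViewArray::from(values);

    // Slicing is zero-copy: the 10-row slice still references the buffers
    // holding the payloads of all 1000 values.
    let sliced = array.slice(0, 10);
    let before: usize = sliced.data_buffers().iter().map(|b| b.capacity()).sum();

    // gc() rewrites the views into fresh, compact buffers containing only
    // the bytes the slice actually references.
    let compacted = sliced.gc();
    let after: usize = compacted.data_buffers().iter().map(|b| b.capacity()).sum();

    assert!(after < before);
    println!("data buffer bytes: {before} before gc, {after} after gc");
}

With 1000 values of roughly 47 bytes each, `before` is on the order of tens of kilobytes while `after` covers only the 10 retained values, mirroring the per-slice reduction the new spill tests assert.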