1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -96,6 +96,7 @@ arrow = { version = "57.3.0", features = [
"chrono-tz",
] }
arrow-buffer = { version = "57.2.0", default-features = false }
arrow-data = { version = "57.3.0", default-features = false }
arrow-flight = { version = "57.3.0", features = [
"flight-sql-experimental",
] }
2 changes: 2 additions & 0 deletions datafusion/physical-plan/Cargo.toml
@@ -49,6 +49,7 @@ name = "datafusion_physical_plan"
[dependencies]
ahash = { workspace = true }
arrow = { workspace = true }
arrow-data = { workspace = true }
arrow-ord = { workspace = true }
arrow-schema = { workspace = true }
async-trait = { workspace = true }
@@ -73,6 +74,7 @@ pin-project-lite = "^0.2.7"
tokio = { workspace = true }

[dev-dependencies]
arrow-data = { workspace = true }
Contributor

Is this an unintentional change / leftover git diff?

Contributor Author

This is intentional: arrow-data is added as a dev-dependency because the test helper calculate_string_view_waste_ratio uses arrow_data::MAX_INLINE_VIEW_LEN to determine whether a string value would be inlined in a StringView array.
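For reference, a minimal sketch of how such a helper could use that constant. The body below is an illustrative assumption (the real calculate_string_view_waste_ratio is not shown in this diff); only arrow_data::MAX_INLINE_VIEW_LEN and the public StringViewArray APIs are taken as given:

```rust
use arrow::array::{Array, StringViewArray};
use arrow_data::MAX_INLINE_VIEW_LEN;

/// Illustrative only: ratio of payload-buffer bytes that are NOT referenced
/// by any view. Values of at most MAX_INLINE_VIEW_LEN (12) bytes are stored
/// inline in the view struct and occupy no buffer space at all.
fn string_view_waste_ratio(array: &StringViewArray) -> f64 {
    // Bytes actually referenced by non-inlined (longer than 12 bytes) values.
    let referenced: usize = (0..array.len())
        .filter(|&i| array.is_valid(i))
        .map(|i| array.value(i).len())
        .filter(|&len| len > MAX_INLINE_VIEW_LEN as usize)
        .sum();

    // Total capacity of the variable-length payload buffers backing the array.
    let allocated: usize = array.data_buffers().iter().map(|b| b.len()).sum();

    if allocated == 0 {
        0.0
    } else {
        1.0 - referenced as f64 / allocated as f64
    }
}
```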

criterion = { workspace = true, features = ["async_futures"] }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-window = { workspace = true }
Expand Down
69 changes: 1 addition & 68 deletions datafusion/physical-plan/src/sorts/sort.rs
@@ -51,7 +51,7 @@ use crate::{
Statistics,
};

use arrow::array::{Array, RecordBatch, RecordBatchOptions, StringViewArray};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays};
use arrow::datatypes::SchemaRef;
use datafusion_common::config::SpillCompression;
@@ -402,8 +402,6 @@ impl ExternalSorter {
Some((self.spill_manager.create_in_progress_file("Sorting")?, 0));
}

Self::organize_stringview_arrays(globally_sorted_batches)?;

debug!("Spilling sort data of ExternalSorter to disk whilst inserting");

let batches_to_spill = std::mem::take(globally_sorted_batches);
@@ -447,71 +445,6 @@ impl ExternalSorter {
Ok(())
}

/// Reconstruct `globally_sorted_batches` to organize the payload buffers of each
/// `StringViewArray` in sequential order by calling `gc()` on them.
///
/// Note this is a workaround until <https://github.com/apache/arrow-rs/issues/7185> is
/// available
///
/// # Rationale
/// After (merge-based) sorting, all batches will be sorted into a single run,
/// but physically this sorted run is chunked into many small batches. For
/// `StringViewArray`s inside each sorted run, their inner buffers are not
/// re-constructed by default, leading to non-sequential payload locations
/// (permutated by `interleave()` Arrow kernel). A single payload buffer might
/// be shared by multiple `RecordBatch`es.
/// When writing each batch to disk, the writer has to write all referenced buffers,
/// because they have to be read back one by one to reduce memory usage. This
/// causes extra disk reads and writes, and potentially execution failure.
///
/// # Example
/// Before sorting:
/// batch1 -> buffer1
/// batch2 -> buffer2
///
/// sorted_batch1 -> buffer1
/// -> buffer2
/// sorted_batch2 -> buffer1
/// -> buffer2
///
/// Then when spilling each batch, the writer has to write all referenced buffers
/// repeatedly.
fn organize_stringview_arrays(
globally_sorted_batches: &mut Vec<RecordBatch>,
) -> Result<()> {
let mut organized_batches = Vec::with_capacity(globally_sorted_batches.len());

for batch in globally_sorted_batches.drain(..) {
let mut new_columns: Vec<Arc<dyn Array>> =
Vec::with_capacity(batch.num_columns());

let mut arr_mutated = false;
for array in batch.columns() {
if let Some(string_view_array) =
array.as_any().downcast_ref::<StringViewArray>()
{
let new_array = string_view_array.gc();
new_columns.push(Arc::new(new_array));
arr_mutated = true;
} else {
new_columns.push(Arc::clone(array));
}
}

let organized_batch = if arr_mutated {
RecordBatch::try_new(batch.schema(), new_columns)?
} else {
batch
};

organized_batches.push(organized_batch);
}

*globally_sorted_batches = organized_batches;

Ok(())
}

/// Sorts the in-memory batches and merges them into a single sorted run, then writes
/// the result to spill files.
async fn sort_and_spill_in_mem_batches(&mut self) -> Result<()> {
10 changes: 7 additions & 3 deletions datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -24,7 +24,7 @@ use arrow::array::RecordBatch;
use datafusion_common::exec_datafusion_err;
use datafusion_execution::disk_manager::RefCountedTempFile;

use super::{IPCStreamWriter, spill_manager::SpillManager};
use super::{IPCStreamWriter, gc_view_arrays, spill_manager::SpillManager};

/// Represents an in-progress spill file used for writing `RecordBatch`es to disk, created by `SpillManager`.
/// Caller is able to use this struct to incrementally append in-memory batches to
@@ -50,6 +50,7 @@ impl InProgressSpillFile {
}

/// Appends a `RecordBatch` to the spill file, initializing the writer if necessary.
/// Performs garbage collection on StringView/BinaryView arrays to reduce spill file size.
Contributor

I recommend adding more comments to explain the rationale for the views gc; perhaps just copy and paste from

fn organize_stringview_arrays(

Contributor

@EeshanBembi could you enhance the comment here?

///
/// # Errors
/// - Returns an error if the file is not active (has been finalized)
@@ -61,8 +62,11 @@ impl InProgressSpillFile {
"Append operation failed: No active in-progress file. The file may have already been finalized."
));
}

let gc_batch = gc_view_arrays(batch)?;

if self.writer.is_none() {
let schema = batch.schema();
let schema = gc_batch.schema();
if let Some(in_progress_file) = &mut self.in_progress_file {
self.writer = Some(IPCStreamWriter::new(
in_progress_file.path(),
Expand All @@ -83,7 +87,7 @@ impl InProgressSpillFile {
}
}
if let Some(writer) = &mut self.writer {
let (spilled_rows, _) = writer.write(batch)?;
let (spilled_rows, _) = writer.write(&gc_batch)?;
if let Some(in_progress_file) = &mut self.in_progress_file {
let pre_size = in_progress_file.current_disk_usage();
in_progress_file.update_disk_usage()?;
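The gc_view_arrays helper that append now calls is not included in this diff excerpt. As a rough sketch of what it could look like, adapted from the removed organize_stringview_arrays (the name comes from the diff, but the body below is an assumption and may differ from the actual implementation, e.g. in return type or in which view types it handles):

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, BinaryViewArray, RecordBatch, StringViewArray};
use datafusion_common::Result;

/// Sketch: compact the payload buffers of any StringView/BinaryView columns
/// with `gc()` so each spilled batch only references the data it actually
/// uses; all other columns are passed through untouched.
fn gc_view_arrays(batch: &RecordBatch) -> Result<RecordBatch> {
    let mut mutated = false;
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|array| {
            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
                mutated = true;
                Arc::new(sv.gc()) as ArrayRef
            } else if let Some(bv) = array.as_any().downcast_ref::<BinaryViewArray>() {
                mutated = true;
                Arc::new(bv.gc()) as ArrayRef
            } else {
                Arc::clone(array)
            }
        })
        .collect();

    if mutated {
        // Rebuilding with the same schema keeps metadata and column order intact.
        Ok(RecordBatch::try_new(batch.schema(), columns)?)
    } else {
        Ok(batch.clone())
    }
}
```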