Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 52 additions & 43 deletions datafusion/core/tests/execution/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,13 @@ use datafusion_physical_optimizer::ensure_coop::EnsureCooperative;
use datafusion_physical_plan::coop::make_cooperative;
use datafusion_physical_plan::filter::FilterExec;
use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec};
use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec};
use datafusion_physical_plan::memory::{LazyMemoryExec, LazyPartition};
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::union::InterleaveExec;
use futures::StreamExt;
use parking_lot::RwLock;
use futures::{StreamExt, stream};
use rstest::rstest;
use std::any::Any;
use std::error::Error;
Expand All @@ -64,23 +63,27 @@ use tokio::runtime::{Handle, Runtime};
use tokio::select;

#[derive(Debug, Clone)]
struct RangeBatchGenerator {
struct RangePartition {
schema: SchemaRef,
value_range: Range<i64>,
boundedness: Boundedness,
batch_size: usize,
poll_count: usize,
original_range: Range<i64>,
}

impl std::fmt::Display for RangeBatchGenerator {
impl std::fmt::Display for RangePartition {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
// Display current counter
write!(f, "InfiniteGenerator(counter={})", self.poll_count)
write!(f, "RangePartition")
}
}

impl LazyBatchGenerator for RangeBatchGenerator {
#[derive(Debug, Clone)]
struct RangeState {
schema: SchemaRef,
value_range: Range<i64>,
batch_size: usize,
}

impl LazyPartition for RangePartition {
fn as_any(&self) -> &dyn Any {
self
}
Expand All @@ -89,33 +92,40 @@ impl LazyBatchGenerator for RangeBatchGenerator {
self.boundedness
}

/// Generate the next RecordBatch.
fn generate_next_batch(&mut self) -> datafusion_common::Result<Option<RecordBatch>> {
self.poll_count += 1;
fn schema(&self) -> &SchemaRef {
&self.schema
}

let mut builder = Int64Array::builder(self.batch_size);
for _ in 0..self.batch_size {
match self.value_range.next() {
None => break,
Some(v) => builder.append_value(v),
fn execute(&self) -> datafusion_common::Result<SendableRecordBatchStream> {
let state = RangeState {
schema: Arc::clone(&self.schema),
value_range: self.value_range.clone(),
batch_size: self.batch_size,
};

let stream = stream::try_unfold(state, |mut state| async move {
let mut builder = Int64Array::builder(state.batch_size);
for _ in 0..state.batch_size {
match state.value_range.next() {
None => break,
Some(v) => builder.append_value(v),
}
}
}
let array = builder.finish();
let array = builder.finish();

if array.is_empty() {
return Ok(None);
}
if array.is_empty() {
return Ok(None);
}

let batch =
RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?;
Ok(Some(batch))
}
let batch =
RecordBatch::try_new(Arc::clone(&state.schema), vec![Arc::new(array)])?;
Ok(Some((batch, state)))
});

fn reset_state(&self) -> Arc<RwLock<dyn LazyBatchGenerator>> {
let mut new = self.clone();
new.poll_count = 0;
new.value_range = new.original_range.clone();
Arc::new(RwLock::new(new))
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.schema),
stream,
)))
}
}

Expand All @@ -142,21 +152,20 @@ fn make_lazy_exec_with_range(
Boundedness::Bounded
};

// Instantiate the generator with the batch and limit
let batch_gen = RangeBatchGenerator {
// Instantiate a partition with the batch size and range.
let partition = RangePartition {
schema: Arc::clone(&schema),
boundedness,
value_range: range.clone(),
value_range: range,
batch_size: 8192,
poll_count: 0,
original_range: range,
};

// Wrap the generator in a trait object behind Arc<RwLock<_>>
let generator: Arc<RwLock<dyn LazyBatchGenerator>> = Arc::new(RwLock::new(batch_gen));

// Create a LazyMemoryExec with one partition using our generator
let mut exec = LazyMemoryExec::try_new(schema, vec![generator]).unwrap();
// Create a LazyMemoryExec with one native partition
let mut exec = LazyMemoryExec::try_new_with_partitions(
schema,
vec![Arc::new(partition) as Arc<dyn LazyPartition>],
)
.unwrap();

exec.add_ordering(vec![PhysicalSortExpr::new(
Arc::new(Column::new(column_name, 0)),
Expand Down Expand Up @@ -286,7 +295,7 @@ async fn spill_reader_stream_yield() -> Result<(), Box<dyn Error>> {
let mut mock_stream = spawn_buffered(stream, buffer_capacity);
let schema = mock_stream.schema();

let consumer_stream = futures::stream::poll_fn(move |cx| {
let consumer_stream = stream::poll_fn(move |cx| {
let mut collected = vec![];
// To make sure that inner stream is polled multiple times, loop until the buffer is full
// Ideally, the stream will yield before the loop ends
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions-table/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ datafusion-catalog = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-physical-plan = { workspace = true }
futures = { workspace = true }
parking_lot = { workspace = true }
paste = { workspace = true }

Expand Down
Loading