From ff3f190b4c9064ee240a590e2d6c43dfd354be3b Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Sat, 14 Feb 2026 21:30:31 -0500 Subject: [PATCH 1/4] Feat physical plan add LazyPartition and wire LazyMemoryExec to partitions Signed-off-by: Ethan Urbanski --- datafusion/physical-plan/src/memory.rs | 567 ++++++++++++++++++++----- 1 file changed, 465 insertions(+), 102 deletions(-) diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index a58abe20a23e..a25d865cd92e 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -25,6 +25,7 @@ use std::task::{Context, Poll}; use crate::coop::cooperative; use crate::execution_plan::{Boundedness, EmissionType, SchedulingType}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::stream::{ObservedStream, RecordBatchStreamAdapter}; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -38,7 +39,7 @@ use datafusion_execution::memory_pool::MemoryReservation; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; -use futures::Stream; +use futures::{Stream, StreamExt, stream}; use parking_lot::RwLock; /// Iterator over batches @@ -133,6 +134,9 @@ impl RecordBatchStream for MemoryStream { } } +#[deprecated( + note = "Use LazyPartition with LazyMemoryExec::try_new_with_partitions instead" +)] pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display { /// Returns the generator as [`Any`] so that it can be /// downcast to a specific implementation. @@ -149,6 +153,122 @@ pub trait LazyBatchGenerator: Send + Sync + fmt::Debug + fmt::Display { fn reset_state(&self) -> Arc>; } +/// A partition that lazily produces record batches via [`SendableRecordBatchStream`]. +/// +/// Each call to [`execute`](Self::execute) must return an independent stream +/// starting from the beginning of the partition's data. Implementations must +/// be safe to call `execute` multiple times (replay semantics). +/// +/// Used with [`LazyMemoryExec::try_new_with_partitions`] to create execution +/// plans that generate data on-demand without buffering all batches in memory. +pub trait LazyPartition: fmt::Debug + fmt::Display + Send + Sync { + /// Returns the partition as [`Any`] so that it can be + /// downcast to a specific implementation. + fn as_any(&self) -> &dyn Any; + + /// Schema produced by this partition. + fn schema(&self) -> &SchemaRef; + + /// Returns the boundedness of this partition. + fn boundedness(&self) -> Boundedness { + Boundedness::Bounded + } + + /// Creates a fresh stream for this partition. + fn execute(&self) -> Result; +} + +/// Compatibility adapter for legacy [`LazyBatchGenerator`]. +#[derive(Debug)] +#[expect(deprecated)] +pub struct LazyBatchGeneratorPartition { + schema: SchemaRef, + generator: Arc>, +} + +#[expect(deprecated)] +impl LazyBatchGeneratorPartition { + pub fn new( + schema: SchemaRef, + generator: Arc>, + ) -> Self { + Self { schema, generator } + } + + pub fn generator(&self) -> &Arc> { + &self.generator + } +} + +impl fmt::Display for LazyBatchGeneratorPartition { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.generator.read().fmt(f) + } +} + +#[expect(deprecated)] +impl LazyPartition for LazyBatchGeneratorPartition { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn boundedness(&self) -> Boundedness { + self.generator.read().boundedness() + } + + fn execute(&self) -> Result { + let schema = Arc::clone(&self.schema); + let generator = self.generator.read().reset_state(); + let stream = stream::try_unfold(generator, |generator| async move { + let batch = generator.write().generate_next_batch()?; + Ok(batch.map(|batch| (batch, generator))) + }); + + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } +} + +fn aggregate_boundedness(boundedness: impl Iterator) -> Boundedness { + boundedness + .reduce(|acc, b| match acc { + Boundedness::Bounded => b, + Boundedness::Unbounded { + requires_infinite_memory, + } => { + let acc_infinite_memory = requires_infinite_memory; + match b { + Boundedness::Bounded => acc, + Boundedness::Unbounded { + requires_infinite_memory, + } => Boundedness::Unbounded { + requires_infinite_memory: requires_infinite_memory + || acc_infinite_memory, + }, + } + } + }) + .unwrap_or(Boundedness::Bounded) +} + +#[expect(deprecated)] +fn collect_legacy_generators( + partitions: &[Arc], +) -> Vec>> { + partitions + .iter() + .filter_map(|partition| { + partition + .as_any() + .downcast_ref::() + .map(|adapter| Arc::clone(adapter.generator())) + }) + .collect() +} + /// Execution plan for lazy in-memory batches of data /// /// This plan generates output batches lazily, it doesn't have to buffer all batches @@ -158,8 +278,11 @@ pub struct LazyMemoryExec { schema: SchemaRef, /// Optional projection for which columns to load projection: Option>, - /// Functions to generate batches for each partition - batch_generators: Vec>>, + /// Partition implementations for each output partition + partitions: Vec>, + /// Legacy generator compatibility cache for deprecated APIs. + #[expect(deprecated)] + legacy_generators: Vec>>, /// Plan properties cache storing equivalence properties, partitioning, and execution mode cache: PlanProperties, /// Execution metrics @@ -167,36 +290,26 @@ pub struct LazyMemoryExec { } impl LazyMemoryExec { - /// Create a new lazy memory execution plan - pub fn try_new( + /// Create a new lazy memory execution plan from partition implementations. + pub fn try_new_with_partitions( schema: SchemaRef, - generators: Vec>>, + partitions: Vec>, ) -> Result { - let boundedness = generators - .iter() - .map(|g| g.read().boundedness()) - .reduce(|acc, b| match acc { - Boundedness::Bounded => b, - Boundedness::Unbounded { - requires_infinite_memory, - } => { - let acc_infinite_memory = requires_infinite_memory; - match b { - Boundedness::Bounded => acc, - Boundedness::Unbounded { - requires_infinite_memory, - } => Boundedness::Unbounded { - requires_infinite_memory: requires_infinite_memory - || acc_infinite_memory, - }, - } - } - }) - .unwrap_or(Boundedness::Bounded); + for partition in &partitions { + assert_eq_or_internal_err!( + partition.schema().as_ref(), + schema.as_ref(), + "Partition schema must match LazyMemoryExec schema" + ); + } + + let boundedness = + aggregate_boundedness(partitions.iter().map(|p| p.boundedness())); + let legacy_generators = collect_legacy_generators(&partitions); let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&schema)), - Partitioning::RoundRobinBatch(generators.len()), + Partitioning::RoundRobinBatch(partitions.len()), EmissionType::Incremental, boundedness, ) @@ -205,12 +318,32 @@ impl LazyMemoryExec { Ok(Self { schema, projection: None, - batch_generators: generators, + partitions, + legacy_generators, cache, metrics: ExecutionPlanMetricsSet::new(), }) } + /// Create a new lazy memory execution plan + #[deprecated(note = "Use LazyMemoryExec::try_new_with_partitions instead")] + #[expect(deprecated)] + pub fn try_new( + schema: SchemaRef, + generators: Vec>>, + ) -> Result { + let partitions = generators + .into_iter() + .map(|generator| { + Arc::new(LazyBatchGeneratorPartition::new( + Arc::clone(&schema), + generator, + )) as Arc + }) + .collect::>(); + Self::try_new_with_partitions(schema, partitions) + } + pub fn with_projection(mut self, projection: Option>) -> Self { match projection.as_ref() { Some(columns) => { @@ -228,11 +361,11 @@ impl LazyMemoryExec { pub fn try_set_partitioning(&mut self, partitioning: Partitioning) -> Result<()> { let partition_count = partitioning.partition_count(); - let generator_count = self.batch_generators.len(); + let generator_count = self.partitions.len(); assert_eq_or_internal_err!( partition_count, generator_count, - "Partition count must match generator count: {} != {}", + "Partitioning count must match number of partitions: {} != {}", partition_count, generator_count ); @@ -246,9 +379,16 @@ impl LazyMemoryExec { .add_orderings(std::iter::once(ordering)); } + /// Get the partitions. + pub fn partitions(&self) -> &[Arc] { + &self.partitions + } + /// Get the batch generators + #[deprecated(note = "Use LazyMemoryExec::partitions instead")] + #[expect(deprecated)] pub fn generators(&self) -> &Vec>> { - &self.batch_generators + &self.legacy_generators } } @@ -256,7 +396,7 @@ impl fmt::Debug for LazyMemoryExec { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("LazyMemoryExec") .field("schema", &self.schema) - .field("batch_generators", &self.batch_generators) + .field("partitions", &self.partitions) .finish() } } @@ -268,10 +408,10 @@ impl DisplayAs for LazyMemoryExec { write!( f, "LazyMemoryExec: partitions={}, batch_generators=[{}]", - self.batch_generators.len(), - self.batch_generators + self.partitions.len(), + self.partitions .iter() - .map(|g| g.read().to_string()) + .map(|partition| partition.to_string()) .collect::>() .join(", ") ) @@ -281,9 +421,9 @@ impl DisplayAs for LazyMemoryExec { writeln!( f, "batch_generators={}", - self.batch_generators + self.partitions .iter() - .map(|g| g.read().to_string()) + .map(|partition| partition.to_string()) .collect::>() .join(", ") )?; @@ -331,20 +471,29 @@ impl ExecutionPlan for LazyMemoryExec { _context: Arc, ) -> Result { assert_or_internal_err!( - partition < self.batch_generators.len(), + partition < self.partitions.len(), "Invalid partition {} for LazyMemoryExec with {} partitions", partition, - self.batch_generators.len() + self.partitions.len() ); let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); - - let stream = LazyMemoryStream { - schema: Arc::clone(&self.schema), - projection: self.projection.clone(), - generator: Arc::clone(&self.batch_generators[partition]), - baseline_metrics, + let stream = self.partitions[partition].execute()?; + let stream = match self.projection.as_ref() { + Some(columns) => { + let columns = Arc::new(columns.clone()); + let schema = Arc::clone(&self.schema); + let stream = stream.map(move |batch| { + batch.and_then(|batch| { + batch.project(columns.as_ref()).map_err(Into::into) + }) + }); + Box::pin(RecordBatchStreamAdapter::new(schema, stream)) + as SendableRecordBatchStream + } + None => stream, }; + let stream = ObservedStream::new(stream, baseline_metrics, None); Ok(Box::pin(cooperative(stream))) } @@ -352,15 +501,31 @@ impl ExecutionPlan for LazyMemoryExec { Some(self.metrics.clone_inner()) } + #[expect(deprecated)] fn reset_state(self: Arc) -> Result> { - let generators = self - .generators() + let partitions = self + .partitions .iter() - .map(|g| g.read().reset_state()) + .map(|partition| { + partition + .as_any() + .downcast_ref::() + .map_or_else( + || Arc::clone(partition), + |adapter| { + Arc::new(LazyBatchGeneratorPartition::new( + Arc::clone(adapter.schema()), + adapter.generator().read().reset_state(), + )) as Arc + }, + ) + }) .collect::>(); + let legacy_generators = collect_legacy_generators(&partitions); Ok(Arc::new(LazyMemoryExec { schema: Arc::clone(&self.schema), - batch_generators: generators, + partitions, + legacy_generators, cache: self.cache.clone(), metrics: ExecutionPlanMetricsSet::new(), projection: self.projection.clone(), @@ -368,63 +533,15 @@ impl ExecutionPlan for LazyMemoryExec { } } -/// Stream that generates record batches on demand -pub struct LazyMemoryStream { - schema: SchemaRef, - /// Optional projection for which columns to load - projection: Option>, - /// Generator to produce batches - /// - /// Note: Idiomatically, DataFusion uses plan-time parallelism - each stream - /// should have a unique `LazyBatchGenerator`. Use RepartitionExec or - /// construct multiple `LazyMemoryStream`s during planning to enable - /// parallel execution. - /// Sharing generators between streams should be used with caution. - generator: Arc>, - /// Execution metrics - baseline_metrics: BaselineMetrics, -} - -impl Stream for LazyMemoryStream { - type Item = Result; - - fn poll_next( - self: std::pin::Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - let _timer_guard = self.baseline_metrics.elapsed_compute().timer(); - let batch = self.generator.write().generate_next_batch(); - - let poll = match batch { - Ok(Some(batch)) => { - // return just the columns requested - let batch = match self.projection.as_ref() { - Some(columns) => batch.project(columns)?, - None => batch, - }; - Poll::Ready(Some(Ok(batch))) - } - Ok(None) => Poll::Ready(None), - Err(e) => Poll::Ready(Some(Err(e))), - }; - - self.baseline_metrics.record_poll(poll) - } -} - -impl RecordBatchStream for LazyMemoryStream { - fn schema(&self) -> SchemaRef { - Arc::clone(&self.schema) - } -} - #[cfg(test)] +#[expect(deprecated)] mod lazy_memory_tests { use super::*; use crate::common::collect; + use crate::stream::RecordBatchStreamAdapter; use arrow::array::Int64Array; use arrow::datatypes::{DataType, Field, Schema}; - use futures::StreamExt; + use futures::{StreamExt, stream}; #[derive(Debug, Clone)] struct TestGenerator { @@ -475,6 +592,57 @@ mod lazy_memory_tests { } } + #[derive(Debug, Clone)] + struct TestPartition { + schema: SchemaRef, + batches: Arc>, + error_at_batch: Option, + } + + impl fmt::Display for TestPartition { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "TestPartition: batches={}, error_at_batch={:?}", + self.batches.len(), + self.error_at_batch + ) + } + } + + impl LazyPartition for TestPartition { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn execute(&self) -> Result { + let schema = Arc::clone(&self.schema); + let batches = Arc::clone(&self.batches); + let error_at_batch = self.error_at_batch; + let stream = stream::try_unfold(0usize, move |index| { + let batches = Arc::clone(&batches); + async move { + if error_at_batch.is_some_and(|fail_at| fail_at == index) { + return Err(datafusion_common::internal_datafusion_err!( + "injected partition error at batch {index}" + )); + } + + let Some(batch) = batches.get(index).cloned() else { + return Ok(None); + }; + Ok(Some((batch, index + 1))) + } + }); + + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } + } + #[tokio::test] async fn test_lazy_memory_exec() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); @@ -526,6 +694,137 @@ mod lazy_memory_tests { Ok(()) } + #[tokio::test] + async fn test_lazy_memory_exec_native_partition_replay_without_reset() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![1, 2, 3]))], + )?; + let partition = Arc::new(TestPartition { + schema: Arc::clone(&schema), + batches: Arc::new(vec![batch]), + error_at_batch: None, + }); + + let exec = LazyMemoryExec::try_new_with_partitions(schema, vec![partition])?; + + let batches_first = + collect(exec.execute(0, Arc::new(TaskContext::default()))?).await?; + let batches_second = + collect(exec.execute(0, Arc::new(TaskContext::default()))?).await?; + + assert_eq!(batches_first, batches_second); + Ok(()) + } + + #[tokio::test] + async fn test_lazy_memory_exec_partition_projection() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int64, false), + Field::new("b", DataType::Int64, false), + ])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int64Array::from(vec![10, 20])), + Arc::new(Int64Array::from(vec![1, 2])), + ], + )?; + let partition = Arc::new(TestPartition { + schema: Arc::clone(&schema), + batches: Arc::new(vec![batch]), + error_at_batch: None, + }); + + let exec = LazyMemoryExec::try_new_with_partitions(schema, vec![partition])? + .with_projection(Some(vec![1])); + + let batches = collect(exec.execute(0, Arc::new(TaskContext::default()))?).await?; + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].schema().field(0).name(), "b"); + let values = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(values.values(), &[1, 2]); + Ok(()) + } + + #[tokio::test] + async fn test_lazy_memory_exec_deprecated_try_new_replays_on_execute() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let generator = TestGenerator { + counter: 0, + max_batches: 2, + batch_size: 2, + schema: Arc::clone(&schema), + }; + + let exec = + LazyMemoryExec::try_new(schema, vec![Arc::new(RwLock::new(generator))])?; + let task_ctx = Arc::new(TaskContext::default()); + + let first = collect(exec.execute(0, Arc::clone(&task_ctx))?).await?; + let second = collect(exec.execute(0, task_ctx)?).await?; + + assert_eq!(first, second); + Ok(()) + } + + #[test] + fn test_lazy_memory_exec_generators_compatibility_accessor() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + + let legacy_exec = LazyMemoryExec::try_new( + Arc::clone(&schema), + vec![Arc::new(RwLock::new(TestGenerator { + counter: 0, + max_batches: 1, + batch_size: 1, + schema: Arc::clone(&schema), + }))], + )?; + assert_eq!(legacy_exec.generators().len(), 1); + + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![1]))], + )?; + let native_exec = LazyMemoryExec::try_new_with_partitions( + schema, + vec![Arc::new(TestPartition { + schema: legacy_exec.schema(), + batches: Arc::new(vec![batch]), + error_at_batch: None, + })], + )?; + assert!(native_exec.generators().is_empty()); + Ok(()) + } + + #[tokio::test] + async fn test_lazy_memory_exec_error_then_end() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![1]))], + )?; + let partition = Arc::new(TestPartition { + schema: Arc::clone(&schema), + batches: Arc::new(vec![batch]), + error_at_batch: Some(1), + }); + let exec = LazyMemoryExec::try_new_with_partitions(schema, vec![partition])?; + + let mut stream = exec.execute(0, Arc::new(TaskContext::default()))?; + assert!(matches!(stream.next().await, Some(Ok(_)))); + assert!(matches!(stream.next().await, Some(Err(_)))); + assert!(stream.next().await.is_none()); + Ok(()) + } + #[tokio::test] async fn test_lazy_memory_exec_invalid_partition() -> Result<()> { let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); @@ -552,7 +851,7 @@ mod lazy_memory_tests { } #[tokio::test] - async fn test_generate_series_metrics_integration() -> Result<()> { + async fn test_lazy_memory_exec_metrics() -> Result<()> { // Test LazyMemoryExec metrics with different configurations let test_cases = vec![ (10, 2, 10), // 10 rows, batch size 2, expected 10 rows @@ -618,4 +917,68 @@ mod lazy_memory_tests { Ok(()) } + + #[tokio::test] + async fn test_lazy_memory_exec_multi_partition() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch_a = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![1, 2]))], + )?; + let batch_b = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![10, 20]))], + )?; + let p0 = Arc::new(TestPartition { + schema: Arc::clone(&schema), + batches: Arc::new(vec![batch_a.clone()]), + error_at_batch: None, + }); + let p1 = Arc::new(TestPartition { + schema: Arc::clone(&schema), + batches: Arc::new(vec![batch_b.clone()]), + error_at_batch: None, + }); + + let exec = LazyMemoryExec::try_new_with_partitions( + schema, + vec![p0 as Arc, p1 as Arc], + )?; + + let ctx = Arc::new(TaskContext::default()); + let batches_0 = collect(exec.execute(0, Arc::clone(&ctx))?).await?; + let batches_1 = collect(exec.execute(1, Arc::clone(&ctx))?).await?; + + assert_eq!(batches_0, vec![batch_a]); + assert_eq!(batches_1, vec![batch_b]); + Ok(()) + } + + #[tokio::test] + async fn test_lazy_memory_exec_native_partition_reset_state() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![1, 2, 3]))], + )?; + let partition = Arc::new(TestPartition { + schema: Arc::clone(&schema), + batches: Arc::new(vec![batch]), + error_at_batch: None, + }); + + let exec = Arc::new(LazyMemoryExec::try_new_with_partitions( + schema, + vec![partition], + )?); + + let ctx = Arc::new(TaskContext::default()); + let batches_before = collect(exec.execute(0, Arc::clone(&ctx))?).await?; + + let exec_reset = exec.reset_state()?; + let batches_after = collect(exec_reset.execute(0, ctx)?).await?; + + assert_eq!(batches_before, batches_after); + Ok(()) + } } From 0d6abb907fee34f8c38707c3ece603ba8e9c516e Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Sat, 14 Feb 2026 21:30:58 -0500 Subject: [PATCH 2/4] Refactor functions table migrate generate_series range to LazyPartition Signed-off-by: Ethan Urbanski --- Cargo.lock | 1 + datafusion/functions-table/Cargo.toml | 1 + .../functions-table/src/generate_series.rs | 358 +++++++++++++----- 3 files changed, 275 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c552835a2cb6..358a7775a6cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2317,6 +2317,7 @@ dependencies = [ "datafusion-common", "datafusion-expr", "datafusion-physical-plan", + "futures", "parking_lot", "paste", ] diff --git a/datafusion/functions-table/Cargo.toml b/datafusion/functions-table/Cargo.toml index aa401fbd7d4e..8d2aa7cf303a 100644 --- a/datafusion/functions-table/Cargo.toml +++ b/datafusion/functions-table/Cargo.toml @@ -47,6 +47,7 @@ datafusion-catalog = { workspace = true } datafusion-common = { workspace = true } datafusion-expr = { workspace = true } datafusion-physical-plan = { workspace = true } +futures = { workspace = true } parking_lot = { workspace = true } paste = { workspace = true } diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index 342269fbc299..2fa69636e6c8 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -28,8 +28,12 @@ use datafusion_catalog::TableFunctionImpl; use datafusion_catalog::TableProvider; use datafusion_common::{Result, ScalarValue, plan_err}; use datafusion_expr::{Expr, TableType}; -use datafusion_physical_plan::ExecutionPlan; -use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; +#[expect(deprecated)] +use datafusion_physical_plan::memory::LazyBatchGenerator; +use datafusion_physical_plan::memory::{LazyMemoryExec, LazyPartition}; +use datafusion_physical_plan::stream::RecordBatchStreamAdapter; +use datafusion_physical_plan::{ExecutionPlan, SendableRecordBatchStream}; +use futures::stream; use parking_lot::RwLock; use std::any::Any; use std::fmt; @@ -48,6 +52,7 @@ impl Empty { } } +#[expect(deprecated)] impl LazyBatchGenerator for Empty { fn as_any(&self) -> &dyn Any { self @@ -238,100 +243,213 @@ impl GenerateSeriesTable { Self { schema, args } } + pub fn as_partition(&self, batch_size: usize) -> Result> { + Ok(Arc::new(GenerateSeriesPartition::new( + self.schema(), + self.args.clone(), + batch_size, + ))) + } + + #[deprecated( + note = "Use GenerateSeriesPartition via GenerateSeriesTable::as_partition instead" + )] + #[expect(deprecated)] pub fn as_generator( &self, batch_size: usize, ) -> Result>> { - let generator: Arc> = match &self.args { - GenSeriesArgs::ContainsNull { name } => Arc::new(RwLock::new(Empty { name })), - GenSeriesArgs::Int64Args { - start, - end, - step, - include_end, - name, - } => Arc::new(RwLock::new(GenericSeriesState { - schema: self.schema(), - start: *start, - end: *end, - step: *step, - current: *start, + let generator: Arc> = + match build_generate_series_state( + Arc::clone(&self.schema), + &self.args, batch_size, - include_end: *include_end, - name, - })), - GenSeriesArgs::TimestampArgs { - start, - end, - step, - tz, - include_end, - name, + )? { + GenerateSeriesState::Empty { name } => { + Arc::new(RwLock::new(Empty { name })) + } + GenerateSeriesState::Int64(state) => Arc::new(RwLock::new(state)), + GenerateSeriesState::Timestamp(state) => Arc::new(RwLock::new(state)), + }; + + Ok(generator) + } +} + +#[derive(Debug, Clone)] +pub struct GenerateSeriesPartition { + schema: SchemaRef, + args: GenSeriesArgs, + batch_size: usize, +} + +impl GenerateSeriesPartition { + pub fn new(schema: SchemaRef, args: GenSeriesArgs, batch_size: usize) -> Self { + Self { + schema, + args, + batch_size, + } + } + + pub fn args(&self) -> &GenSeriesArgs { + &self.args + } + + pub fn batch_size(&self) -> usize { + self.batch_size + } +} + +impl fmt::Display for GenerateSeriesPartition { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self.args { + GenSeriesArgs::ContainsNull { name } => write!(f, "{name}: empty"), + GenSeriesArgs::Int64Args { + start, end, name, .. + } + | GenSeriesArgs::TimestampArgs { + start, end, name, .. + } + | GenSeriesArgs::DateArgs { + start, end, name, .. } => { - let parsed_tz = tz - .as_ref() - .map(|s| Tz::from_str(s.as_ref())) - .transpose() - .map_err(|e| { - datafusion_common::internal_datafusion_err!( - "Failed to parse timezone: {e}" - ) - })? - .unwrap_or_else(|| Tz::from_str("+00:00").unwrap()); - Arc::new(RwLock::new(GenericSeriesState { - schema: self.schema(), - start: TimestampValue { - value: *start, - parsed_tz: Some(parsed_tz), - tz_str: tz.clone(), - }, - end: TimestampValue { - value: *end, - parsed_tz: Some(parsed_tz), - tz_str: tz.clone(), - }, - step: *step, - current: TimestampValue { - value: *start, - parsed_tz: Some(parsed_tz), - tz_str: tz.clone(), - }, - batch_size, - include_end: *include_end, - name, - })) + write!( + f, + "{name}: start={start}, end={end}, batch_size={}", + self.batch_size + ) } - GenSeriesArgs::DateArgs { - start, - end, - step, - include_end, - name, - } => Arc::new(RwLock::new(GenericSeriesState { - schema: self.schema(), + } + } +} + +#[derive(Debug, Clone)] +enum GenerateSeriesState { + Empty { name: &'static str }, + Int64(GenericSeriesState), + Timestamp(GenericSeriesState), +} + +impl GenerateSeriesState { + fn generate_next_batch(&mut self) -> Result> { + match self { + Self::Empty { .. } => Ok(None), + Self::Int64(state) => state.next_batch(), + Self::Timestamp(state) => state.next_batch(), + } + } +} + +fn build_generate_series_state( + schema: SchemaRef, + args: &GenSeriesArgs, + batch_size: usize, +) -> Result { + match args { + GenSeriesArgs::ContainsNull { name } => Ok(GenerateSeriesState::Empty { name }), + GenSeriesArgs::Int64Args { + start, + end, + step, + include_end, + name, + } => Ok(GenerateSeriesState::Int64(GenericSeriesState { + schema, + start: *start, + end: *end, + step: *step, + current: *start, + batch_size, + include_end: *include_end, + name, + })), + GenSeriesArgs::TimestampArgs { + start, + end, + step, + tz, + include_end, + name, + } => { + let parsed_tz = parse_timezone(tz.as_ref())?; + Ok(GenerateSeriesState::Timestamp(GenericSeriesState { + schema, start: TimestampValue { value: *start, - parsed_tz: None, - tz_str: None, + parsed_tz: Some(parsed_tz), + tz_str: tz.clone(), }, end: TimestampValue { value: *end, - parsed_tz: None, - tz_str: None, + parsed_tz: Some(parsed_tz), + tz_str: tz.clone(), }, step: *step, current: TimestampValue { value: *start, - parsed_tz: None, - tz_str: None, + parsed_tz: Some(parsed_tz), + tz_str: tz.clone(), }, batch_size, include_end: *include_end, name, - })), - }; + })) + } + GenSeriesArgs::DateArgs { + start, + end, + step, + include_end, + name, + } => Ok(GenerateSeriesState::Timestamp(GenericSeriesState { + schema, + start: TimestampValue { + value: *start, + parsed_tz: None, + tz_str: None, + }, + end: TimestampValue { + value: *end, + parsed_tz: None, + tz_str: None, + }, + step: *step, + current: TimestampValue { + value: *start, + parsed_tz: None, + tz_str: None, + }, + batch_size, + include_end: *include_end, + name, + })), + } +} - Ok(generator) +impl LazyPartition for GenerateSeriesPartition { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn execute(&self) -> Result { + let state = build_generate_series_state( + Arc::clone(&self.schema), + &self.args, + self.batch_size, + )?; + let schema = Arc::clone(&self.schema); + + let stream = stream::try_unfold(state, |mut state| async move { + let batch = state.generate_next_batch()?; + Ok(batch.map(|batch| (batch, state))) + }); + + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) } } @@ -375,14 +493,8 @@ impl GenericSeriesState { pub fn current(&self) -> &T { &self.current } -} -impl LazyBatchGenerator for GenericSeriesState { - fn as_any(&self) -> &dyn Any { - self - } - - fn generate_next_batch(&mut self) -> Result> { + pub fn next_batch(&mut self) -> Result> { let mut buf = Vec::with_capacity(self.batch_size); while buf.len() < self.batch_size @@ -402,6 +514,17 @@ impl LazyBatchGenerator for GenericSeriesState { let batch = RecordBatch::try_new(Arc::clone(&self.schema), vec![array])?; Ok(Some(batch)) } +} + +#[expect(deprecated)] +impl LazyBatchGenerator for GenericSeriesState { + fn as_any(&self) -> &dyn Any { + self + } + + fn generate_next_batch(&mut self) -> Result> { + self.next_batch() + } fn reset_state(&self) -> Arc> { let mut new = self.clone(); @@ -441,6 +564,19 @@ fn validate_interval_step(step: IntervalMonthDayNano) -> Result<()> { Ok(()) } +fn parse_timezone(tz: Option<&Arc>) -> Result { + match tz { + Some(tz) => Tz::from_str(tz.as_ref()).map_err(|e| { + datafusion_common::internal_datafusion_err!("Failed to parse timezone: {e}") + }), + None => Tz::from_str("+00:00").map_err(|e| { + datafusion_common::internal_datafusion_err!( + "Failed to parse default timezone: {e}" + ) + }), + } +} + #[async_trait] impl TableProvider for GenerateSeriesTable { fn as_any(&self) -> &dyn Any { @@ -463,10 +599,10 @@ impl TableProvider for GenerateSeriesTable { _limit: Option, ) -> Result> { let batch_size = state.config_options().execution.batch_size; - let generator = self.as_generator(batch_size)?; + let partition = self.as_partition(batch_size)?; Ok(Arc::new( - LazyMemoryExec::try_new(self.schema(), vec![generator])? + LazyMemoryExec::try_new_with_partitions(self.schema(), vec![partition])? .with_projection(projection.cloned()), )) } @@ -760,14 +896,20 @@ impl TableFunctionImpl for RangeFunc { } #[cfg(test)] +#[expect(deprecated)] mod generate_series_tests { use std::sync::Arc; + use arrow::array::Int64Array; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::Result; + #[expect(deprecated)] use datafusion_physical_plan::memory::LazyBatchGenerator; + use futures::TryStreamExt; - use crate::generate_series::GenericSeriesState; + use crate::generate_series::{ + GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, + }; #[test] fn test_generic_series_state_reset() -> Result<()> { @@ -794,4 +936,50 @@ mod generate_series_tests { Ok(()) } + + #[test] + fn test_generate_series_partition_execute() -> Result<()> { + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Int64, + false, + )])); + let table = GenerateSeriesTable::new( + Arc::clone(&schema), + GenSeriesArgs::Int64Args { + start: 1, + end: 5, + step: 1, + include_end: true, + name: "generate_series", + }, + ); + let partition = table.as_partition(2)?; + assert_eq!( + partition.to_string(), + "generate_series: start=1, end=5, batch_size=2" + ); + + let batches = + futures::executor::block_on(partition.execute()?.try_collect::>())?; + assert_eq!(batches.len(), 3); + assert_eq!( + batches + .iter() + .flat_map(|batch| { + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .values() + .iter() + .copied() + .collect::>() + }) + .collect::>(), + vec![1, 2, 3, 4, 5] + ); + Ok(()) + } } From 6e32a2417a60db0df088ffe31dda934e41d3916b Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Sat, 14 Feb 2026 21:31:23 -0500 Subject: [PATCH 3/4] Test core migrate coop lazy source test to LazyPartition Signed-off-by: Ethan Urbanski --- datafusion/core/tests/execution/coop.rs | 95 ++++++++++++++----------- 1 file changed, 52 insertions(+), 43 deletions(-) diff --git a/datafusion/core/tests/execution/coop.rs b/datafusion/core/tests/execution/coop.rs index 9818d9d98f6b..0a7354b2e877 100644 --- a/datafusion/core/tests/execution/coop.rs +++ b/datafusion/core/tests/execution/coop.rs @@ -44,14 +44,13 @@ use datafusion_physical_optimizer::ensure_coop::EnsureCooperative; use datafusion_physical_plan::coop::make_cooperative; use datafusion_physical_plan::filter::FilterExec; use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode, SortMergeJoinExec}; -use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; +use datafusion_physical_plan::memory::{LazyMemoryExec, LazyPartition}; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; use datafusion_physical_plan::repartition::RepartitionExec; use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::stream::RecordBatchStreamAdapter; use datafusion_physical_plan::union::InterleaveExec; -use futures::StreamExt; -use parking_lot::RwLock; +use futures::{StreamExt, stream}; use rstest::rstest; use std::any::Any; use std::error::Error; @@ -64,23 +63,27 @@ use tokio::runtime::{Handle, Runtime}; use tokio::select; #[derive(Debug, Clone)] -struct RangeBatchGenerator { +struct RangePartition { schema: SchemaRef, value_range: Range, boundedness: Boundedness, batch_size: usize, - poll_count: usize, - original_range: Range, } -impl std::fmt::Display for RangeBatchGenerator { +impl std::fmt::Display for RangePartition { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { - // Display current counter - write!(f, "InfiniteGenerator(counter={})", self.poll_count) + write!(f, "RangePartition") } } -impl LazyBatchGenerator for RangeBatchGenerator { +#[derive(Debug, Clone)] +struct RangeState { + schema: SchemaRef, + value_range: Range, + batch_size: usize, +} + +impl LazyPartition for RangePartition { fn as_any(&self) -> &dyn Any { self } @@ -89,33 +92,40 @@ impl LazyBatchGenerator for RangeBatchGenerator { self.boundedness } - /// Generate the next RecordBatch. - fn generate_next_batch(&mut self) -> datafusion_common::Result> { - self.poll_count += 1; + fn schema(&self) -> &SchemaRef { + &self.schema + } - let mut builder = Int64Array::builder(self.batch_size); - for _ in 0..self.batch_size { - match self.value_range.next() { - None => break, - Some(v) => builder.append_value(v), + fn execute(&self) -> datafusion_common::Result { + let state = RangeState { + schema: Arc::clone(&self.schema), + value_range: self.value_range.clone(), + batch_size: self.batch_size, + }; + + let stream = stream::try_unfold(state, |mut state| async move { + let mut builder = Int64Array::builder(state.batch_size); + for _ in 0..state.batch_size { + match state.value_range.next() { + None => break, + Some(v) => builder.append_value(v), + } } - } - let array = builder.finish(); + let array = builder.finish(); - if array.is_empty() { - return Ok(None); - } + if array.is_empty() { + return Ok(None); + } - let batch = - RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?; - Ok(Some(batch)) - } + let batch = + RecordBatch::try_new(Arc::clone(&state.schema), vec![Arc::new(array)])?; + Ok(Some((batch, state))) + }); - fn reset_state(&self) -> Arc> { - let mut new = self.clone(); - new.poll_count = 0; - new.value_range = new.original_range.clone(); - Arc::new(RwLock::new(new)) + Ok(Box::pin(RecordBatchStreamAdapter::new( + Arc::clone(&self.schema), + stream, + ))) } } @@ -142,21 +152,20 @@ fn make_lazy_exec_with_range( Boundedness::Bounded }; - // Instantiate the generator with the batch and limit - let batch_gen = RangeBatchGenerator { + // Instantiate a partition with the batch size and range. + let partition = RangePartition { schema: Arc::clone(&schema), boundedness, - value_range: range.clone(), + value_range: range, batch_size: 8192, - poll_count: 0, - original_range: range, }; - // Wrap the generator in a trait object behind Arc> - let generator: Arc> = Arc::new(RwLock::new(batch_gen)); - - // Create a LazyMemoryExec with one partition using our generator - let mut exec = LazyMemoryExec::try_new(schema, vec![generator]).unwrap(); + // Create a LazyMemoryExec with one native partition + let mut exec = LazyMemoryExec::try_new_with_partitions( + schema, + vec![Arc::new(partition) as Arc], + ) + .unwrap(); exec.add_ordering(vec![PhysicalSortExpr::new( Arc::new(Column::new(column_name, 0)), @@ -286,7 +295,7 @@ async fn spill_reader_stream_yield() -> Result<(), Box> { let mut mock_stream = spawn_buffered(stream, buffer_capacity); let schema = mock_stream.schema(); - let consumer_stream = futures::stream::poll_fn(move |cx| { + let consumer_stream = stream::poll_fn(move |cx| { let mut collected = vec![]; // To make sure that inner stream is polled multiple times, loop until the buffer is full // Ideally, the stream will yield before the loop ends From ebd25e4cc1517d75a6ab9948c6c8f3fc65af13c0 Mon Sep 17 00:00:00 2001 From: Ethan Urbanski Date: Sat, 14 Feb 2026 21:31:52 -0500 Subject: [PATCH 4/4] Feat proto support generate_series native partition roundtrip Signed-off-by: Ethan Urbanski --- datafusion/proto/src/physical_plan/mod.rs | 122 ++++++++++++++++-- .../tests/cases/roundtrip_physical_plan.rs | 63 +++++++++ 2 files changed, 175 insertions(+), 10 deletions(-) diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index bfba715b9124..59152455219e 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -48,7 +48,7 @@ use datafusion_datasource_parquet::source::ParquetSource; use datafusion_execution::{FunctionRegistry, TaskContext}; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; use datafusion_functions_table::generate_series::{ - Empty, GenSeriesArgs, GenerateSeriesTable, GenericSeriesState, TimestampValue, + Empty, GenSeriesArgs, GenerateSeriesPartition, GenericSeriesState, TimestampValue, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; @@ -73,7 +73,7 @@ use datafusion_physical_plan::joins::{ StreamJoinPartitionMode, SymmetricHashJoinExec, }; use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; -use datafusion_physical_plan::memory::LazyMemoryExec; +use datafusion_physical_plan::memory::{LazyBatchGeneratorPartition, LazyMemoryExec}; use datafusion_physical_plan::metrics::MetricType; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr}; @@ -2153,10 +2153,16 @@ impl protobuf::PhysicalPlanNode { None => return internal_err!("Missing args in GenerateSeriesNode"), }; - let table = GenerateSeriesTable::new(Arc::clone(&schema), args); - let generator = table.as_generator(generate_series.target_batch_size as usize)?; + let partition = Arc::new(GenerateSeriesPartition::new( + Arc::clone(&schema), + args, + generate_series.target_batch_size as usize, + )); - Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?)) + Ok(Arc::new(LazyMemoryExec::try_new_with_partitions( + schema, + vec![partition], + )?)) } fn try_into_cooperative_physical_plan( @@ -3451,22 +3457,118 @@ impl protobuf::PhysicalPlanNode { } } + fn serialize_generate_series_args( + args: &GenSeriesArgs, + ) -> Result { + match args { + GenSeriesArgs::ContainsNull { name } => { + Ok(protobuf::generate_series_node::Args::ContainsNull( + protobuf::GenerateSeriesArgsContainsNull { + name: Self::str_to_generate_series_name(name)? as i32, + }, + )) + } + GenSeriesArgs::Int64Args { + start, + end, + step, + include_end, + name, + } => Ok(protobuf::generate_series_node::Args::Int64Args( + protobuf::GenerateSeriesArgsInt64 { + start: *start, + end: *end, + step: *step, + include_end: *include_end, + name: Self::str_to_generate_series_name(name)? as i32, + }, + )), + GenSeriesArgs::TimestampArgs { + start, + end, + step, + tz, + include_end, + name, + } => Ok(protobuf::generate_series_node::Args::TimestampArgs( + protobuf::GenerateSeriesArgsTimestamp { + start: *start, + end: *end, + step: Some(datafusion_proto_common::IntervalMonthDayNanoValue { + months: step.months, + days: step.days, + nanos: step.nanoseconds, + }), + include_end: *include_end, + name: Self::str_to_generate_series_name(name)? as i32, + tz: tz.as_ref().map(ToString::to_string), + }, + )), + GenSeriesArgs::DateArgs { + start, + end, + step, + include_end, + name, + } => Ok(protobuf::generate_series_node::Args::DateArgs( + protobuf::GenerateSeriesArgsDate { + start: *start, + end: *end, + step: Some(datafusion_proto_common::IntervalMonthDayNanoValue { + months: step.months, + days: step.days, + nanos: step.nanoseconds, + }), + include_end: *include_end, + name: Self::str_to_generate_series_name(name)? as i32, + }, + )), + } + } + + #[allow(deprecated)] fn try_from_lazy_memory_exec(exec: &LazyMemoryExec) -> Result> { - let generators = exec.generators(); + let partitions = exec.partitions(); - // ensure we only have one generator - let [generator] = generators.as_slice() else { + // ensure we only have one partition + let [partition] = partitions else { + return Ok(None); + }; + + if let Some(generate_series_partition) = + partition.as_any().downcast_ref::() + { + let schema = exec.schema(); + let node = protobuf::GenerateSeriesNode { + schema: Some(schema.as_ref().try_into()?), + target_batch_size: generate_series_partition.batch_size() as u32, + args: Some(Self::serialize_generate_series_args( + generate_series_partition.args(), + )?), + }; + + return Ok(Some(protobuf::PhysicalPlanNode { + physical_plan_type: Some(PhysicalPlanType::GenerateSeries(node)), + })); + } + + let Some(adapter) = partition + .as_any() + .downcast_ref::() + else { return Ok(None); }; - let generator_guard = generator.read(); + let generator_guard = adapter.generator().read(); // Try to downcast to different generate_series types if let Some(empty_gen) = generator_guard.as_any().downcast_ref::() { let schema = exec.schema(); let node = protobuf::GenerateSeriesNode { schema: Some(schema.as_ref().try_into()?), - target_batch_size: 8192, // Default batch size + // Empty generator produces no rows, so batch_size is irrelevant. + // The legacy Empty type doesn't expose batch_size(), so use a default. + target_batch_size: 8192, args: Some(protobuf::generate_series_node::Args::ContainsNull( protobuf::GenerateSeriesArgsContainsNull { name: Self::str_to_generate_series_name(empty_gen.name())? as i32, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index bc310150d898..da9d24bb161d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1851,6 +1851,69 @@ async fn roundtrip_generate_series() -> Result<()> { Ok(()) } +#[tokio::test] +#[expect(deprecated)] +async fn roundtrip_generate_series_legacy_generator_adapter() -> Result<()> { + use datafusion_functions_table::generate_series::{ + GenSeriesArgs, GenerateSeriesPartition, GenerateSeriesTable, + }; + use datafusion_physical_plan::memory::LazyMemoryExec; + + let schema = Arc::new(Schema::new(Fields::from([Arc::new(Field::new( + "value", + DataType::Int64, + false, + ))]))); + let table = GenerateSeriesTable::new( + Arc::clone(&schema), + GenSeriesArgs::Int64Args { + start: 1, + end: 10, + step: 1, + include_end: true, + name: "generate_series", + }, + ); + let generator = table.as_generator(4)?; + let plan = Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?); + + let node = PhysicalPlanNode::try_from_physical_plan( + plan.clone(), + &DefaultPhysicalExtensionCodec {}, + )?; + let Some(protobuf::physical_plan_node::PhysicalPlanType::GenerateSeries( + generate_series_node, + )) = &node.physical_plan_type + else { + return internal_err!("Expected GenerateSeries physical plan node"); + }; + assert_eq!(generate_series_node.target_batch_size, 4); + + let encoded = node.encode_to_vec(); + let decoded = PhysicalPlanNode::decode(encoded.as_slice()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let ctx = SessionContext::new(); + let restored = decoded + .try_into_physical_plan(&ctx.task_ctx(), &DefaultPhysicalExtensionCodec {})?; + + assert_eq!(plan.schema(), restored.schema()); + + let Some(lazy_exec) = restored.as_any().downcast_ref::() else { + return internal_err!("Expected LazyMemoryExec after roundtrip"); + }; + assert_eq!(lazy_exec.partitions().len(), 1); + + let Some(partition) = lazy_exec.partitions()[0] + .as_any() + .downcast_ref::() + else { + return internal_err!("Expected GenerateSeriesPartition after roundtrip"); + }; + assert_eq!(partition.batch_size(), 4); + + Ok(()) +} + #[tokio::test] async fn roundtrip_projection_source() -> Result<()> { let schema = Arc::new(Schema::new(Fields::from([